config.ini 配置文件
[stomp] ;activemq的IP地址 host:192.168.7.85 ;activemq的端口 port:61613 ;activemq的队列 queue:/queue/bbg_ordercache [php] ;php的执行路径 phpbin:php.exe ;被执行的文件的路径 filepath:D:/jianguo/command/application/cli ;传递给被执行文件的参数 params:show
main.go 代码文件:
package main import ( "bytes" "fmt" "github.com/gmallard/stompngo" "github.com/unknwon/goconfig" "log" "net" "os" "os/exec" ) // 存储配置信息的变量 var config *goconfig.ConfigFile // 存储日志信息的变量 var mylog *log.Logger // 启动初始化 func init() { // 加载配置文件 configPath := "./config.ini" conf, err := goconfig.LoadConfigFile(configPath) if err != nil { fmt.Println(err) } config = conf // @todo 强化按日期写日志文件 file := "./log.txt" //t := time.Now() //file := "./log_" + strings.Replace(t.String()[:19], ":", "_", 3) + ".txt" hander, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666) if err != nil { log.Println(err) } mylog = log.New(hander, "\r\n", log.Ldate|log.Ltime|log.Llongfile) } // 主程序 func main() { host, _ := config.GetValue("stomp", "host") port, _ := config.GetValue("stomp", "port") n, e := net.Dial("tcp", net.JoinHostPort(host, port)) if e != nil { fmt.Println(e) } // STOMP 1.0 的标准头 //h := stompngo.Headers{} // STOMP 1.1 的标准头 h := stompngo.Headers{"accept-version", "1.1"} // @todo 强化网络断开之后重试 c, e := stompngo.Connect(n, h) if e != nil { fmt.Println(e) } // 必须客户端响应才可以删除MQ队列数据 f := stompngo.Headers{"destination", "/queue/bbg_ordercache", "ack", "client"} // 自动删除MQ队列的数据 //f := stompngo.Headers{"destination", "/queue/bbg_ordercache"} s, e := c.Subscribe(f) if e != nil { fmt.Println(e) } // 设置通道的容量 //fmt.Println(c.SubChanCap()) //c.SetSubChanCap(1) for { //r := <-s //fmt.Println(r) run(c, s) } } // 外部shell脚本调用,成功处理删除相应队列 func run(c *stompngo.Connection, s <-chan stompngo.MessageData) { phproot, _ := config.GetValue("php", "phpbin") filepath, _ := config.GetValue("php", "filepath") params, _ := config.GetValue("php", "params") r := <-s // 记录结果 mylog.Println(r) order_id := r.Message.Headers.Value("order_id") //fmt.Println(order_id) cmd := exec.Command(phproot, filepath, params, "order_id", order_id) var out bytes.Buffer cmd.Stdout = &out err := cmd.Start() if err != nil { log.Fatal(err) } mylog.Println("Waiting for command to finish...") err = cmd.Wait() if err != nil { mylog.Printf("Command finished with error: %v", err) } str := out.String() //fmt.Println(str) if str == "success" { e := c.Ack(r.Message.Headers) if e != nil { mylog.Println(e) } } }