golang连接activemq

chen yuwen · · 11498 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

config.ini 配置文件

[stomp]
;activemq的IP地址
host:192.168.7.85 
;activemq的端口
port:61613 
;activemq的队列
queue:/queue/bbg_ordercache 
[php]
;php的执行路径
phpbin:php.exe
;被执行的文件的路径 
filepath:D:/jianguo/command/application/cli  
;传递给被执行文件的参数
params:show


main.go 代码文件:

package main
import (
	"bytes"
	"fmt"
	"github.com/gmallard/stompngo"
	"github.com/unknwon/goconfig"
	"log"
	"net"
	"os"
	"os/exec"
)
// 存储配置信息的变量
var config *goconfig.ConfigFile
// 存储日志信息的变量
var mylog *log.Logger
// 启动初始化
func init() {
	// 加载配置文件
	configPath := "./config.ini"
	conf, err := goconfig.LoadConfigFile(configPath)
	if err != nil {
		fmt.Println(err)
	}
	config = conf
	// @todo 强化按日期写日志文件
	file := "./log.txt"
	//t := time.Now()
	//file := "./log_" + strings.Replace(t.String()[:19], ":", "_", 3) + ".txt"
	hander, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
	if err != nil {
		log.Println(err)
	}
	mylog = log.New(hander, "\r\n", log.Ldate|log.Ltime|log.Llongfile)
}
// 主程序
func main() {
	host, _ := config.GetValue("stomp", "host")
	port, _ := config.GetValue("stomp", "port")
	n, e := net.Dial("tcp", net.JoinHostPort(host, port))
	if e != nil {
		fmt.Println(e)
	}
	// STOMP 1.0 的标准头
	//h := stompngo.Headers{}
	// STOMP 1.1 的标准头
	h := stompngo.Headers{"accept-version", "1.1"}
	// @todo 强化网络断开之后重试
	c, e := stompngo.Connect(n, h)
	if e != nil {
		fmt.Println(e)
	}
	// 必须客户端响应才可以删除MQ队列数据
	f := stompngo.Headers{"destination", "/queue/bbg_ordercache", "ack", "client"}
	// 自动删除MQ队列的数据
	//f := stompngo.Headers{"destination", "/queue/bbg_ordercache"}
	s, e := c.Subscribe(f)
	if e != nil {
		fmt.Println(e)
	}
	// 设置通道的容量
	//fmt.Println(c.SubChanCap())
	//c.SetSubChanCap(1)
	for {
		//r := <-s
		//fmt.Println(r)
		run(c, s)
	}
}
// 外部shell脚本调用,成功处理删除相应队列
func run(c *stompngo.Connection, s <-chan stompngo.MessageData) {
	phproot, _ := config.GetValue("php", "phpbin")
	filepath, _ := config.GetValue("php", "filepath")
	params, _ := config.GetValue("php", "params")
	r := <-s
	// 记录结果
	mylog.Println(r)
	order_id := r.Message.Headers.Value("order_id")
	//fmt.Println(order_id)
	cmd := exec.Command(phproot, filepath, params, "order_id", order_id)
	var out bytes.Buffer
	cmd.Stdout = &out
	err := cmd.Start()
	if err != nil {
		log.Fatal(err)
	}
	mylog.Println("Waiting for command to finish...")
	err = cmd.Wait()
	if err != nil {
		mylog.Printf("Command finished with error: %v", err)
	}
	str := out.String()
	//fmt.Println(str)
	if str == "success" {
		e := c.Ack(r.Message.Headers)
		if e != nil {
			mylog.Println(e)
		}
	}
}



有疑问加站长微信联系(非本文作者)

本文来自:开源中国博客

感谢作者:chen yuwen

查看原文:golang连接activemq

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

11498 次点击  
加入收藏 微博
2 回复  |  直到 2019-10-08 21:23:26
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传