前篇
一个TCP长连接设备管理后台工程(一)
一个TCP长连接设备管理后台工程(二)
一个TCP长连接设备管理后台工程(三)
一个TCP长连接设备管理后台工程(四)
一个TCP长连接设备管理后台工程(五)
封包器
上面介绍了过滤器,过滤器实际就是一个能够处理粘包和拆包的解析器,和封包器的作用正好相反。但是封包器会很简单,因为封包没有粘包和拆包的处理。
代码如下:
//Packer is proto Packer api
func Packer(msg Message) []byte {
data := make([]byte, 0)
tempbytes := codec.Word2Bytes(msg.HEADER.MID)
data = append(data, tempbytes...)
datalen := uint16(len(msg.BODY)) & 0x03FF
datalen = datalen | 0x4000
tempbytes = utils.Word2Bytes(datalen)
data = append(data, tempbytes...)
data = append(data, msg.HEADER.Version)
if len(msg.HEADER.PhoneNum) < 10 {
data = append(data, make([]byte, 10-len(msg.HEADER.PhoneNum))...)
data = append(data, msg.HEADER.PhoneNum...)
} else {
data = append(data, msg.HEADER.PhoneNum[:10]...)
}
tempbytes = utils.Word2Bytes(msg.HEADER.SeqNum)
data = append(data, tempbytes...)
if msg.HEADER.IsMulti() {
data = append(data, utils.Word2Bytes(msg.HEADER.MutilFlag.MsgSum)...)
data = append(data, utils.Word2Bytes(msg.HEADER.MutilFlag.MsgIndex)...)
}
data = append(data, msg.BODY...)
csdata := byte(checkSum(data[:]))
data = append(data, csdata)
//添加头尾
var tmpdata []byte = []byte{0x7e}
for _, item := range data {
if item == 0x7d {
tmpdata = append(tmpdata, 0x7d, 0x01)
} else if item == 0x7e {
tmpdata = append(tmpdata, 0x7d, 0x02)
} else {
tmpdata = append(tmpdata, item)
}
}
tmpdata = append(tmpdata, 0x7e)
return tmpdata
}
处理器
处理器用来处理接收到的有效TCP数据包,它应该是比过滤器更上层的一个模块。因为我们是用来管理TCP连接的,一个tcp连接代表着一个终端设备,这个终端设备有各种属性和操作逻辑,这些东西都是依附于TCP的长连接。我们单独定义一个包来组织这部分内容:
package term
而我们的处理器就存在于这个包中。由于这个模块是tcp数据的实际处理模块,所以会牵扯到许多相关连的包,比如前面的codec、proto等,还有数据库的操作。
这一部分我们主要只介绍处理器的逻辑。前面我们说了,我们要处理的包有:
- 平台通用应答
- 终端通用应答
- 终端注册
- 终端注册应答
- 终端鉴权
- 心跳
- 位置上报处理
通过proto的filter我们得到了各个Message,并且获取了其中的帧头信息,BODY部分还没有处理。而我们的codec正是用来处理BODY部分的编/解码器。
所以处理器的基本流程就是根据Message中Header信息,分别处理其Body数据,然后返回处理的结果。这个处理的结果往往就是需要响应的数据流。所以我们的处理器函数的样子大概就是这样的:
func (t *Terminal) Handler(msg proto.Message) []byte{
}
传入一个Message,入后输出需要响应的数据,如果返回nil则表明没有数据需要响应。
其中Terminal这个结构体我们在后端模型这个装接中有提及到:
type Terminal struct {
authkey string
imei string
iccid string
vin string
tboxver string
loginTime time.Time
seqNum uint16
phoneNum []byte
Conn net.Conn
Engine *xorm.Engine
Ch chan int
}
同时为了使用codec的序列化和反序列化,我们还需要定义如下结构体:
type TermAckBody struct {
AckSeqNum uint16
AckID uint16
AckResult uint8
}
type PlatAckBody struct {
AckSeqNum uint16
AckID uint16
AckResult uint8
}
type RegisterBody struct {
ProID uint16
CityID uint16
ManufID []byte `len:"11"`
TermType []byte `len:"30"`
TermID []byte `len:"30"`
LicPlateColor uint8
LicPlate string
}
type RegisterAckBody struct {
AckSeqNum uint16
AckResult uint8
AuthKey string
}
type AuthBody struct {
AuthKeyLen uint8
AuthKey string
Imei []byte `len:"15"`
Version []byte `len:"20"`
}
type GPSInfoBody struct {
WarnFlag uint32
State uint32
Lat uint32
Lng uint32
Alt uint16
Speed uint16
Dir uint16
Time []byte `len:"6"`
}
type CtrlBody struct {
Cmd uint8
Param string
}
下面就来正式讲解Handler的实现。
首先获取保存Header中的电话号和流水号到Terminal中:
if t.phoneNum == nil {
t.phoneNum = make([]byte, 10)
}
copy(t.phoneNum, []byte(msg.HEADER.PhoneNum))
t.seqNum = msg.HEADER.SeqNum
然后通过switch来匹配消息id,并对其body部分做相关处理:
switch msg.HEADER.MID {
case proto.TermAck:
//
case proto.Register:
//
case proto.Login:
//
case proto.Heartbeat:
//
case proto.Gpsinfo:
//
}
return nil
我们先说注册,我们使用帧头中的手机号,在数据库中查找对应的鉴权码。然后从msg中获取body部分,通过codec反序列话得到RegisterBody实例。为了简单,我们此处不做其他数据验证,直接做出数据响应即可。生成需要响应的RegisterAckBody实例,然后序列化为body切片,然后生成响应的Message,再通过封包器封包为数据流返回:
devinfo := new(DevInfo)
devinfo.PhoneNum = strings.TrimLeft(utils.HexBuffToString(t.phoneNum), "0")
is, _ := t.Engine.Get(devinfo)
if !is {
return []byte{}
}
var reg RegisterBody
_, err := codec.Unmarshal(msg.BODY, ®)
if err != nil {
fmt.Println("err:", err)
}
var body []byte
body, err = codec.Marshal(&RegisterAckBody{
AckSeqNum: msg.HEADER.SeqNum,
AckResult: 0,
AuthKey: devinfo.Authkey,
})
if err != nil {
fmt.Println("err:", err)
}
msgAck := proto.Message{
HEADER: proto.Header{
MID: proto.RegisterAck,
Attr: proto.MakeAttr(1, false, 0, uint16(len(body))),
Version: 1,
PhoneNum: string(t.phoneNum),
SeqNum: t.seqNum,
},
BODY: body,
}
return proto.Packer(msgAck)
上面有涉及到数据库的查询操作,这部分使用了xorm,具体的参考xorm官方文档:xorm官方文档
上面涉及一个utils.HexBuffToString函数,这个函数会将字符串转换为16进制格式的字符串,本身是基于strconv.FormatUint(uint64(value), 16)完成的,但是这个函数会没有办法指定转换后的填充值,比如0x0A会直接转换成"A"而不是"0A",所以需要做一点特殊处理:
func HexBuffToString(hex []byte) string {
var ret string
for _, value := range hex {
str := strconv.FormatUint(uint64(value), 16)
if len([]rune(str)) == 1 {
ret = ret + "0" + str
} else {
ret = ret + str
}
}
return ret
}
Handler其他部分的流程大体差不多,就不做过多讲解了,完整代码:
//Handler is proto Handler api
func (t *Terminal) Handler(msg proto.Message) []byte {
if t.phoneNum == nil {
t.phoneNum = make([]byte, 10)
}
copy(t.phoneNum, []byte(msg.HEADER.PhoneNum))
t.seqNum = msg.HEADER.SeqNum
switch msg.HEADER.MID {
case proto.TermAck:
reqID := codec.Bytes2Word(msg.BODY[2:4])
if reqID == proto.UpdateReq {
//ch <- 1
//升级命令
}
case proto.Register:
devinfo := new(DevInfo)
devinfo.PhoneNum = strings.TrimLeft(utils.HexBuffToString(t.phoneNum), "0")
is, _ := t.Engine.Get(devinfo)
if !is {
return []byte{}
}
var reg RegisterBody
_, err := codec.Unmarshal(msg.BODY, ®)
if err != nil {
fmt.Println("err:", err)
}
var body []byte
body, err = codec.Marshal(&RegisterAckBody{
AckSeqNum: msg.HEADER.SeqNum,
AckResult: 0,
AuthKey: devinfo.Authkey,
})
if err != nil {
fmt.Println("err:", err)
}
msgAck := proto.Message{
HEADER: proto.Header{
MID: proto.RegisterAck,
Attr: proto.MakeAttr(1, false, 0, uint16(len(body))),
Version: 1,
PhoneNum: string(t.phoneNum),
SeqNum: t.seqNum,
},
BODY: body,
}
return proto.Packer(msgAck)
case proto.Login:
var auth AuthBody
_, err := codec.Unmarshal(msg.BODY, &auth)
if err != nil {
fmt.Println("err:", err)
}
t.authkey = auth.AuthKey
t.imei = string(auth.Imei)
t.tboxver = string(auth.Version)
var body []byte
body, err = codec.Marshal(&PlatAckBody{
AckSeqNum: msg.HEADER.SeqNum,
AckID: msg.HEADER.MID,
AckResult: 0,
})
if err != nil {
fmt.Println("err:", err)
}
msgAck := proto.Message{
HEADER: proto.Header{
MID: proto.PlatAck,
Attr: proto.MakeAttr(1, false, 0, uint16(len(body))),
Version: 1,
PhoneNum: string(t.phoneNum),
SeqNum: t.seqNum,
},
BODY: body,
}
return proto.Packer(msgAck)
case proto.Heartbeat:
var err error
var body []byte
body, err = codec.Marshal(&PlatAckBody{
AckSeqNum: msg.HEADER.SeqNum,
AckID: msg.HEADER.MID,
AckResult: 0,
})
if err != nil {
fmt.Println("err:", err)
}
msgAck := proto.Message{
HEADER: proto.Header{
MID: proto.PlatAck,
Attr: proto.MakeAttr(1, false, 0, uint16(len(body))),
Version: 1,
PhoneNum: string(t.phoneNum),
SeqNum: t.seqNum,
},
BODY: body,
}
return proto.Packer(msgAck)
case proto.Gpsinfo:
var gpsInfo GPSInfoBody
_, err := codec.Unmarshal(msg.BODY, &gpsInfo)
if err != nil {
fmt.Println("err:", err)
}
gpsdata := new(GPSData)
gpsdata.Imei = t.imei
gpsdata.Stamp = time.Now()
gpsdata.WarnFlag = gpsInfo.WarnFlag
gpsdata.State = gpsInfo.State
gpsdata.Latitude = gpsInfo.Lat
gpsdata.Longitude = gpsInfo.Lng
gpsdata.Altitude = gpsInfo.Alt
gpsdata.Speed = gpsInfo.Speed
gpsdata.Direction = gpsInfo.Dir
if (gpsdata.State & 0x00000001) > 0 {
gpsdata.AccState = 1
} else {
gpsdata.AccState = 0
}
if (gpsdata.State & 0x00000002) > 0 {
gpsdata.GpsState = 1
} else {
gpsdata.GpsState = 0
}
_, err = t.Engine.Insert(gpsdata)
if err != nil {
fmt.Println("insert gps err:", err)
}
var body []byte
body, err = codec.Marshal(&PlatAckBody{
AckSeqNum: msg.HEADER.SeqNum,
AckID: msg.HEADER.MID,
AckResult: 0,
})
if err != nil {
fmt.Println("err:", err)
}
msgAck := proto.Message{
HEADER: proto.Header{
MID: proto.PlatAck,
Attr: proto.MakeAttr(1, false, 0, uint16(len(body))),
Version: 1,
PhoneNum: string(t.phoneNum),
SeqNum: t.seqNum,
},
BODY: body,
}
return proto.Packer(msgAck)
}
return nil
}
有疑问加站长微信联系(非本文作者)