听说go在系统性能方面有很大的优势,最近对go语言产生了极大的兴趣,想对现有的项目用go改造。原有的项目是用ace框架编写的通信框架,在目前的移动通信网中,忙时有的时候处理不过来,于是先研究并试着测试一下socket。由于对go刚刚入门,有些不正确的地方还请高人指点。
由于我们的系统通常是不同语言之间通信(之前的系统客户端和服务端都用c++),这里客户端采用java+mina编写,服务端采用go编写。最初的设计是想借用go语言中的gob进行编解码,但是经过测试后发现行不通;经过网友以及一些高人的指点才知道,gob其实是针对go语言之间的编解码的,跨语言还真不灵光。有同事建议我用protocolbuffer,我一看这个又要定义类似idl的文件(之前做了几年的corba技术,对这样的东西有点抵触了,因为有的时候项目合作方已经定下的方案或者已经完成的项目,很难配合你用一种新的技术或者新协议重新修改),没有办法,我只能先采用硬编码的方式实现。下一步我会采用protocolbuffer技术实现跨语言之间的通信,如果您有更好的方式,希望能交流。下面我把部分代码贴一下,仅供参考;如果需要全部可测试的程序,可以留下联系方式。
客户端主要代码:
主入口类:
/**
 * Demo entry point: connects through {@link MinaClient} and sends one
 * {@link Ss7LspMsg} test frame to the server.
 */
public class Client {
    /** Number of payload bytes carried by the test message. */
    private static final int PAYLOAD_LENGTH = 20;

    /**
     * Connects, sends a single test message, prints "OVER!" and closes the client.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MinaClient client = new MinaClient();
        if (client.connect()) {
            System.out.println("连接服务器成功!");
            boolean flag = false;
            int i = 0;
            while (!flag) {
                i++;
                Ss7LspMsg msg = new Ss7LspMsg();
                msg.setSeq(231115);
                msg.setProtocoltype(1);
                msg.setTime(System.currentTimeMillis());
                msg.setLsp(123);
                msg.setLen(PAYLOAD_LENGTH);
                byte[] bytes = new byte[PAYLOAD_LENGTH];
                for (int j = 0; j < PAYLOAD_LENGTH; j++) {
                    // Index with j (the payload position), not the outer message counter i;
                    // indexing with i wrote every value into the same single slot.
                    bytes[j] = (byte) j;
                }
                msg.setBytes(bytes);
                client.send(msg);
                // Stop after the first message.
                if (i == 1) {
                    flag = true;
                }
            }
        }
        System.out.println("OVER!");
        client.close();
    }
}
MinaClient类:
public class MinaClient {
private SocketConnector connector;
private ConnectFuture future;
private IoSession session;
public boolean connect() {
// 创建一个socket连接
connector = new NioSocketConnector();
// 设置链接超时时间
connector.setConnectTimeoutMillis(3000);
// 获取过滤器链
DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
// 添加编码过滤器 处理乱码、编码问题
filterChain.addLast("codec", new ProtocolCodecFilter(new CharsetCodecFactory()));
// 消息核心处理器
connector.setHandler(new ClientMessageHandlerAdapter());
// 连接服务器,知道端口、地址
future = connector.connect(new InetSocketAddress("127.0.0.1",22345));
// 等待连接创建完成
future.awaitUninterruptibly();
// 获取当前session
session = future.getSession();
return true;
}
public void setAttribute(Object key, Object value) {
session.setAttribute(key, value);
}
public void send(String message) {
session.write(message);
}
public void send(Ss7LspMsg message) {
session.write(message);
}
public boolean close() {
CloseFuture future = session.getCloseFuture();
future.awaitUninterruptibly(1000);
connector.dispose();
return true;
}
public SocketConnector getConnector() {
return connector;
}
public IoSession getSession() {
return session;
}
消息基础类(消息头)
/**
 * Base class of every protocol message. It owns the fixed header fields and the
 * helpers used to encode and decode them.
 *
 * <p>Header layout as written by {@link #encodeHeader(IoBuffer)}: seq (int),
 * protocol type (int), length (int), time (long).
 */
public abstract class MsgHeader implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Total message length, header included. Starts at 20, matching the three ints
     * and one long that {@link #encodeHeader(IoBuffer)} writes.
     */
    int length = 20;
    /** Protocol type. */
    int protocoltype;
    /** Sequence number. */
    int seq;
    /** Timestamp; the three-argument constructor fills it from System.currentTimeMillis(). */
    long time;

    public MsgHeader() {
        super();
    }

    public MsgHeader(int length, int protocoltype, int seq) {
        super();
        this.length = length;
        this.protocoltype = protocoltype;
        this.seq = seq;
        this.time = System.currentTimeMillis();
    }

    /**
     * Writes seq, protocol type, length and time to {@code buf}, in that order.
     * Subclasses must have finalized {@code length} before calling this.
     */
    public void encodeHeader(IoBuffer buf) {
        buf.putInt(seq);
        buf.putInt(protocoltype);
        buf.putInt(length);
        System.out.println("len is "+length);
        buf.putLong(time);
    }

    /** Reads seq, protocol type, length and time from {@code buf}, in that order. */
    public void decodeHeader(IoBuffer buf) {
        seq = buf.getInt();
        protocoltype = buf.getInt();
        length = buf.getInt();
        time = buf.getLong();
    }

    public abstract boolean encodeBody(IoBuffer bt);

    public abstract boolean decodeBody(byte[] body);

    public int getLength() {
        return length;
    }

    /**
     * Despite its name this ADDS {@code length} to the current total rather than
     * replacing it. The additive behavior is kept because subclasses pass only their
     * body size and rely on the header size already being included.
     */
    public void setLength(int length) {
        this.length += length;
    }

    public int getProtocoltype() {
        return protocoltype;
    }

    public void setProtocoltype(int protocoltype) {
        this.protocoltype = protocoltype;
    }

    public int getSeq() {
        return seq;
    }

    public void setSeq(int seq) {
        this.seq = seq;
    }

    /**
     * Encodes {@code str} into a fixed-size field of {@code len} bytes padded with 0xff.
     * Input longer than the field is truncated to {@code len} bytes.
     *
     * @param len size of the returned array
     * @param str text to encode (platform default charset)
     * @return a new array of exactly {@code len} bytes
     */
    public byte[] strToBytes(int len, String str) {
        byte[] bytes = new byte[len];
        java.util.Arrays.fill(bytes, (byte) 0xff);
        byte[] encoded = str.getBytes();
        // Bound the copy by both arrays; the previous hard-coded 15/16 truncation and
        // the char-count based copy could index past the end of either one.
        System.arraycopy(encoded, 0, bytes, 0, Math.min(len, encoded.length));
        return bytes;
    }

    /**
     * Decodes a byte field into a string after stripping leading and trailing zero bytes.
     * Note that {@link #strToBytes(int, String)} pads with 0xff, not zero, so the two
     * methods are not inverses of each other.
     *
     * @param bt raw field bytes
     * @return the decoded text, or an empty string when {@code bt} is empty or all zeros
     */
    public String decOctetString(byte[] bt) {
        int begin = 0;
        while (begin < bt.length && bt[begin] == 0) {
            begin++;
        }
        if (begin == bt.length) {
            // Nothing but zeros (or no bytes at all).
            return "";
        }
        int end = bt.length - 1;
        // Terminates at 'begin' at the latest, since bt[begin] is non-zero.
        while (bt[end] == 0) {
            end--;
        }
        return new String(bt, begin, end - begin + 1);
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    /**
     * Converts a dotted-quad IPv4 string to its numeric value.
     *
     * @param ip address such as "127.0.0.1"
     * @return the address as a long, first octet most significant
     * @throws IllegalArgumentException when fewer than four dot-separated parts are
     *         present or a part is not a number
     */
    public static long getStringIpToLong(String ip) {
        String[] ips = ip.trim().split("[.]");
        if (ips.length < 4) {
            throw new IllegalArgumentException("not a dotted-quad IPv4 address: " + ip);
        }
        long num = 16777216L*Long.parseLong(ips[0]) + 65536L*Long.parseLong(ips[1]) + 256*Long.parseLong(ips[2]) + Long.parseLong(ips[3]);
        return num;
    }

    /**
     * Converts a numeric IPv4 address to dotted-quad text.
     *
     * @param ipLong address as a long, first octet most significant
     * @return text such as "127.0.0.1"
     */
    public static String getLongIpToString(long ipLong) {
        StringBuilder ipInfo = new StringBuilder();
        // Shift first, then mask with 0xFF: an int literal mask like 0xFF000000
        // sign-extends when widened to long and lets high bits leak into the octet.
        for (int shift = 24; shift >= 0; shift -= 8) {
            if (ipInfo.length() > 0) {
                ipInfo.append('.');
            }
            ipInfo.append((ipLong >> shift) & 0xFF);
        }
        return ipInfo.toString();
    }
}
抽象消息类(请求类、反馈类)
/**
 * Marker base class for request messages. It adds nothing to {@link MsgHeader}
 * beyond its own serialization version id.
 */
public abstract class BaseReq extends MsgHeader {

    /** Serialization version id. */
    private static final long serialVersionUID = 1L;

}
/**
 * Base class for response messages: a {@link MsgHeader} plus a result code and a
 * reason code.
 */
public abstract class BaseRsp extends MsgHeader {

    /** Serialization version id. */
    private static final long serialVersionUID = 1L;

    /** Result code of the response. */
    protected int result;

    /** Reason code of the response. */
    protected int reason;

    /** @return the result code */
    public int getResult() {
        return this.result;
    }

    /** @param result the result code to store */
    public void setResult(int result) {
        this.result = result;
    }

    /** @return the reason code */
    public int getReason() {
        return this.reason;
    }

    /** @param reason the reason code to store */
    public void setReason(int reason) {
        this.reason = reason;
    }
}
测试消息类
/**
 * Test request message: the common header followed by {@code len} (int),
 * {@code lsp} (int) and a raw byte payload.
 */
public class Ss7LspMsg extends BaseReq {
    private static final long serialVersionUID = 1L;

    /** Size in bytes of the two int fields (len, lsp) that precede the payload. */
    private static final int FIXED_BODY_BYTES = 8;

    private int lsp;
    /** Declared payload length; must equal {@code bytes.length} when encoding. */
    private int len;
    private byte[] bytes;

    public int getLsp() {
        return lsp;
    }

    public void setLsp(int lsp) {
        this.lsp = lsp;
    }

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getBytes() {
        return bytes;
    }

    public void setBytes(byte[] bytes) {
        this.bytes = bytes;
    }

    /**
     * Writes header, len, lsp and payload to {@code bt}.
     *
     * <p>{@code setLength} in the base class adds to the current length, so each call
     * of this method grows the header length again; encode a given instance only once.
     *
     * @param bt destination buffer
     * @return {@code true} when the message was written; {@code false}, with nothing
     *         written, when the payload is missing or its size differs from {@code len}
     */
    @Override
    public boolean encodeBody(IoBuffer bt) {
        // Validate before touching the buffer or the length: a null payload used to
        // fail only after the header was already written, and a len/payload mismatch
        // produced a frame whose header length disagreed with its actual size.
        if (bytes == null || bytes.length != len) {
            return false;
        }
        this.setLength(len + FIXED_BODY_BYTES);
        encodeHeader(bt);
        bt.putInt(len);
        bt.putInt(lsp);
        bt.put(bytes);
        return true;
    }

    /**
     * Not implemented.
     *
     * @return always {@code false}
     */
    @Override
    public boolean decodeBody(byte[] body) {
        return false;
    }
}
服务端代码:
负责通信的go文件
package main
import (
"fmt"
//"github.com/bbangert/toml"
"bytes"
"encoding/binary"
"encoding/gob"
"io"
"net"
)
// Constant definitions.

// VERSION is the version string.
const VERSION = "0.1.0"

// TCP and UDP are the network type names compared against IpTransType.Type.
const (
	TCP = "tcp"
	UDP = "udp"
)

// RECV_BUF_LEN is the size, in bytes, of the buffer Receiver reads into.
const RECV_BUF_LEN = 1024
type IpTransType struct {
Type string //网络类型tcp/udp
Addr string //ip地址 默认 127.0.0.1
Port int32
}
// InitServer validates transType, starts listening, and hands every accepted
// connection to Receiver in its own goroutine. It only returns on a validation
// or listen failure; otherwise it loops forever accepting connections.
//
// NOTE(review): the listen address is still the hard-coded "127.0.0.1:22345"
// and ignores transType.Addr/Port. It is left that way on purpose because the
// caller in this program passes a different port than the one the client dials;
// switch to the configured address once both sides agree.
func InitServer(transType IpTransType) (err error) {
	if transType.Addr == "" {
		err = fmt.Errorf("transType.Addr is nil,please check the configuration file")
		return
	}
	if transType.Port < 1 || transType.Port > 65535 {
		err = fmt.Errorf("transType.Port must be in (1 ~ 65535")
		return
	}
	if !(transType.Type == TCP || transType.Type == UDP) {
		err = fmt.Errorf("transType.Type only be 'tcp' or 'udp' ")
		return
	}
	listener, listenErr := net.Listen(transType.Type, "127.0.0.1:22345")
	if listenErr != nil {
		// Previously ignored: a failed Listen left listener nil and the deferred
		// Close panicked. net.Listen also rejects "udp", which now surfaces here
		// as an error instead of a crash.
		return listenErr
	}
	defer listener.Close()
	for {
		conn, acceptErr := listener.Accept()
		if acceptErr != nil {
			// Keep serving; a failed accept does not stop the server.
			continue
		}
		fmt.Println("conn is coming")
		go Receiver(conn)
	}
}
// LspMsg is the hand-decoded form of one frame, filled field by field in
// Receiver: three int32 header values, an int64 timestamp, two int32 body
// values and a fixed 20-byte payload. All fields are unexported.
type LspMsg struct {
	seq, protocol, length int32
	times                 int64
	lens, lsp             int32
	bytes                 [20]byte
}
// LspMsgBig has the same fields as LspMsg but exported, which is what the gob
// decoder needs; Receiver passes it to Decode.
type LspMsgBig struct {
	Seq, Protocol, Length int32
	Times                 int64
	Lens, Lsp             int32
	Bytes                 [20]byte
}
func Decode(data []byte, to interface{}) error {
buf := bytes.NewBuffer(data)
dec := gob.NewDecoder(buf)
return dec.Decode(to)
}
// BytesToInt32 interprets the first four bytes of bytes as a big-endian value
// and returns it as an int32.
func BytesToInt32(bytes []byte) int32 {
	word := binary.BigEndian.Uint32(bytes)
	return int32(word)
}
// BytesToInt8 returns the first byte of bytes as an int8; any further bytes
// are ignored.
func BytesToInt8(bytes []byte) int8 {
	first := bytes[0]
	return int8(first)
}
// BytesToInt16 interprets the first two bytes of bytes as a big-endian value
// and returns it as an int16.
func BytesToInt16(bytes []byte) int16 {
	half := binary.BigEndian.Uint16(bytes)
	return int16(half)
}
// BytesToInt64 interprets the first eight bytes of bytes as a big-endian value
// and returns it as an int64.
func BytesToInt64(bytes []byte) int64 {
	quad := binary.BigEndian.Uint64(bytes)
	return int64(quad)
}
func Receiver(conn net.Conn) (err error) {
buf := make([]byte, RECV_BUF_LEN)
//buf bytes.Buffer
defer conn.Close()
for {
n, err1 := conn.Read(buf)
switch err1 {
case nil:
//n, _ := conn.Write(buf[0:n])
var out LspMsg
//Decode(b, &out)
var outout LspMsgBig
if err := Decode(buf, &outout); err != nil {
fmt.Println("decode fail: " + err.Error())
}
fmt.Println("outout is ", outout)
fmt.Println("Byte2Int32 is ", BytesToInt32(buf[0:4]))
fmt.Println("length is ", buf[0:n])
fmt.Println("length is ", buf[0:4])
fmt.Println("length is ", BytesToInt8(buf[1:4]))
out.seq = BytesToInt32(buf[0:4])
out.protocol = BytesToInt32(buf[4:8])
out.length = BytesToInt32(buf[8:12])
out.times = BytesToInt64(buf[12:20])
out.lens = BytesToInt32(buf[20:24])
out.lsp = BytesToInt32(buf[24:28])
bytes := out.bytes[0:20]
copy(bytes, buf[28:n])
//out.bytes = &(buf[28:n])
fmt.Println(out.bytes)
/*
for j := 0; j < 20; j++ {
out.bytes[j] = buf[j+28]
}
*/
fmt.Println("length is ", out)
case io.EOF: //当对方断开连接时触发该方法
fmt.Printf("Warning: End of data: %s \n", err1)
err = err1
return
default: //当对方断开连接时触发该方法
fmt.Printf("Error: Reading data: %s \n", err1)
err = err1
return
}
}
return
}程序主入口:
package main
import (
"fmt"
//"net"
"bytes"
"encoding/gob"
//"C"
)
// P is the value that main gob-encodes: three ints and a name.
type P struct {
	X    int
	Y    int
	Z    int
	Name string
}
// Q is the type main gob-decodes into: X and Y as *int32 plus a name.
type Q struct {
	X    *int32
	Y    *int32
	Name string
}
// main first runs a small in-memory gob round trip (encode a P, decode it into
// a Q) and prints the outcome, then starts the TCP server via InitServer.
func main() {
	var network bytes.Buffer        // Stand-in for a network connection
	enc := gob.NewEncoder(&network) // Will write to network.
	dec := gob.NewDecoder(&network) // Will read from network.
	err := enc.Encode(P{3, 4, 5, "Pythagoras"})
	if err != nil {
		fmt.Println("encode error:", err)
	}
	var q Q
	err = dec.Decode(&q)
	fmt.Println("ENC IS ", enc)
	fmt.Println("dec IS ", dec)
	if err != nil {
		fmt.Println("decode error:", err)
	} else if q.X != nil && q.Y != nil {
		// Only dereference when decoding succeeded and set both pointers; after a
		// failed decode they are nil and the old unconditional print panicked.
		fmt.Printf("%q: {%d,%d}\n", q.Name, *q.X, *q.Y)
	}
	fmt.Println("Hello World!")
	var transType IpTransType
	transType.Addr = "127.0.0.1"
	// Same port the Java client dials and InitServer listens on (was 12345).
	transType.Port = 22345
	transType.Type = TCP
	// InitServer only returns on failure; report it instead of exiting silently.
	if err := InitServer(transType); err != nil {
		fmt.Println("server error:", err)
	}
}
测试结果:
conn is coming 48 decode fail: EOF outout is {0 0 0 0 0 0 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]} Byte2Int32 is 231115 length is [0 3 134 203 0 0 0 1 0 0 0 48 0 0 1 63 38 140 96 48 0 0 0 20 0 0 0 123 0 19 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] length is [0 3 134 203] length is 3 [0 19 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0] length is {231115 1 48 1370741301296 20 123 [0 19 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]} FUCK22 FUCK22 0 Warning: End of data: EOF
到目前为止我还没有找到一种解决go跨语言通信编解码问题的办法,所以只能硬编解码了。
有疑问加站长微信联系(非本文作者)