用golang遍历证券软件通达信日线目录,把所有日线数据发送到mongodb数据库中

wangyanlb · · 3416 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

想就证券行情做一个金融互联网的项目,后台服务器选择了mongodb。 为解决历史行情问题,编写了一个遍历通达信日线软件目录,把所有日线数据解析上传到monggodb的程序。 大家在使用时注意: 1、先在通达信中补足历史数据 2、把程序中的遍历目录分别修改为通达信软件中的上海和深圳的股票行情的目录

下步计划:

1、解决实时行情的问题。预计下周会有阶段性成果。

2、解决复权、复息的问题。预计下周解决。

3、完善自动转分钟线、日线、周线、月线

4、封装一些基础的程序化交易的函数。

5、编写一些模拟清算系统,用于程序化交易策略的历史数据测试

6、就行情数据展开一个大数据、人工智能判断的研究——希望有兴趣的朋友共同参与

7、利用语意理解的技术让爬虫自动去爬取上市公司新闻与公告,就新闻资讯做一些聚类的研究。

高频交易、算法交易是大势所趋,征集有兴趣的朋友共同参与。

邮件: 764238714@qq.com

上代码:server.go

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "gopkg.in/mgo.v2"
    //"gopkg.in/mgo.v2/bson"
    //"log"
    "io"
    "net"
    "os"
    "time"
)

const (
    BZ         = "PUTDATA"
    MongodbUrl = "localhost:27017"
)

type StockFile struct {
    Date, Open, High, Low, Close int32
    Amount                       float32
    Vol                          int32
    Preclose                     int32
}

type StockData struct {
    Code                           string
    Date                           int32
    Open, High, Low, Close, Amount float32
    Vol, Preclose                  int32
}

func main() {
    //在7070端口监听
    tcpAddr, err := net.ResolveTCPAddr("tcp", ":7076")
    checkError(err)
    listener, err1 := net.ListenTCP("tcp", tcpAddr)
    checkError(err1)
    fmt.Println("服务器准备就绪")

    session, err := mgo.Dial(MongodbUrl)
    if err != nil {
        panic(err)
    }
    // Optional. Switch the session to a monotonic behavior.
    session.SetMode(mgo.Monotonic, true)
    defer session.Close()
    //c := session.DB("stock").C("day")

    for {
        //等待客户端的连接
        conn, err2 := listener.Accept()
        if err != nil {
            /*通常服务端为一个服务,不会因为错误而退出。出错后,继续等待下一个连接请求*/
            fmt.Println(err2)
            continue
        }
        fmt.Println("收到客户端的请求")
        go ServeClient(conn, session)
    }

}

func ServeClient(conn net.Conn, session *mgo.Session) {
    defer conn.Close()
    defer fmt.Println("接收数据完成")

    str := ReadData(conn)
    if str == BZ {
        SendData(conn, "OK")
        fmt.Println("开始接受数据...")
        Code := ReadData(conn)
        c := session.DB("stock").C(Code)
        SendData(conn, "CODEOK")
        st := time.Now().UnixNano()
        //显示掺入前集合中元素数目
        countNum, err := c.Count()
        if err != nil {
            panic(err)
        }
        fmt.Println("mongodb连接成功,这个数据库的对象总计: ", countNum)
        ReadHq(conn, c, Code)
        fmt.Printf("执行程序花费的时间为(毫秒): %d \r\n", (time.Now().UnixNano()-st)/int64(time.Millisecond))

    } else {
        SendData(conn, "bad connetct")

    }

    return
}

/*读取验证数据*/
func ReadData(conn net.Conn) string {
    var data bytes.Buffer
    var buf [512]byte
    for {
        n, err := conn.Read(buf[0:])
        if err != nil {
            fmt.Println(err)
            return ""
        }
        //我们的数据以0做为结束的标记
        if buf[n-1] == 0 {
            //n-1去掉结束标记0
            data.Write(buf[0 : n-1])
            break
        } else {
            data.Write(buf[0:n])
        }
    }
    return string(data.Bytes())
}

func ReadHq(conn net.Conn, c *mgo.Collection, Code string) {

    var buf [32]byte
    for {
        _, err := conn.Read(buf[0:32])
        if err == io.EOF {
            fmt.Println("此个文件传输结束")
            break
        }
        if err != nil {
            fmt.Println(err)
            return
        }
        b_buf := bytes.NewBuffer(buf[0:32])
        var x StockFile
        var y StockData
        binary.Read(b_buf, binary.LittleEndian, &x) //binary.LittleEndian  是内存中的字节序的概念,就是把低字节的放到了后面。网络传输一般用BigEndian,内存字节序和cpu有关,编程时要转化。
        y.Date = x.Date
        y.Open = float32(x.Open) / 100
        y.High = float32(x.High) / 100
        y.Low = float32(x.Low) / 100
        y.Close = float32(x.Close) / 100
        y.Preclose = x.Preclose
        y.Amount = x.Amount
        y.Vol = x.Vol
        //y.Code = Code

        fmt.Println(y)
        err = c.Insert(&y)
        if err != nil {
            panic(err)
        }

    }
    return
}

func SendData(conn net.Conn, data string) {
    buf := []byte(data)
    /*向 byte 字节里添加结束标记*/
    buf = append(buf, 0)
    _, err := conn.Write(buf)
    if err != nil {
        fmt.Println(err)
    }
}

func checkError(err error) {
    if err != nil {
        fmt.Println(err)
        os.Exit(0)
    }
}
client

package main

import (
    "bytes"
    "fmt"
    "io"
    //"io/ioutil"
    "net"
    "os"
    "strings"
    "time"
)

const (
    BZ = "PUTDATA"
)

func main() {

    tcpAddr, err := net.ResolveTCPAddr("tcp", "192.168.1.6:7076")
    checkError(err)
    //遍历目录
    dir, err := os.OpenFile("/Users/wangyan/go/src/hangqing/day", os.O_RDONLY, 0666)
    if err != nil {
        fmt.Println(err.Error())
        return
    }
    arrFile, err1 := dir.Readdir(0)
    if err1 != nil {
        fmt.Println(err1.Error())
        return
    }
    for _, v := range arrFile {

        //tcp 握手
        conn, err := net.DialTCP("tcp", nil, tcpAddr)
        checkError(err)
        defer conn.Close()
        SendData(conn, BZ)
        s := ReadData(conn)

        if s == "OK" {
            fmt.Println("连接成功")
            Code := strings.TrimRight(v.Name(), ".day")
            SendData(conn, Code)
            if ReadData(conn) == "CODEOK" {
                fmt.Println("传输证券代码完成")
                f, err := os.OpenFile("/Users/wangyan/go/src/hangqing/day/"+v.Name(), os.O_RDONLY, 0666)
                checkError(err)
                defer f.Close()

                //每次传递32字节
                buf := make([]byte, 32)
                //重置文件指针,否则读不到内容的。
                f.Seek(0, os.SEEK_SET)
                for {
                    n, ferr := f.Read(buf)
                    if ferr != nil && ferr != io.EOF {
                        fmt.Println(ferr.Error())
                        break
                    }
                    if ferr == io.EOF {
                        fmt.Println(ferr.Error())
                        break
                    }
                    if n == 0 {
                        break
                    }
                    //str += string(buf[0:n])
                    SendHq(conn, buf)
                }

                /*一次读完整个文件
                buf, err := ioutil.ReadAll(f)
                checkError(err)
                SendHq(conn, buf)
                */
                fmt.Println("传输数据完成:", v.Name())

                conn.Close()
                f.Close()
            } else {
                fmt.Println("code not translat")
                conn.Close()
            }
        } else {
            fmt.Println("连接失败")
            conn.Close()
        }
        time.Sleep(1 * time.Second)
    }
}

/*读取数据*/
func ReadData(conn net.Conn) string {
    var data bytes.Buffer
    var buf [512]byte
    for {
        n, err := conn.Read(buf[0:])
        if err != nil {
            fmt.Println(err)
            return ""
        }

        //我们的数据以0做为结束的标记
        if buf[n-1] == 0 {
            //n-1去掉结束标记0
            data.Write(buf[0 : n-1])
            break
        } else {
            data.Write(buf[0:n])
        }
    }
    return string(data.Bytes())
}

func SendData(conn net.Conn, data string) {
    buf := []byte(data)
    /*向 byte 字节里添加结束标记*/
    buf = append(buf, 0)
    _, err := conn.Write(buf)
    if err != nil {
        fmt.Println(err)
    }
}

func SendHq(conn net.Conn, data []byte) {
    _, err := conn.Write(data)
    if err != nil {
        fmt.Println(err)
    }
}

func checkError(err error) {
    if err != nil {
        fmt.Println(err)
        os.Exit(0)
    }
}

client.go

 


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:701969077

3416 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传