想就证券行情做一个金融互联网的项目,后台服务器选择了mongodb。 为解决历史行情问题,编写了一个遍历通达信日线软件目录,把所有日线数据解析上传到monggodb的程序。 大家在使用时注意:
1、先在通达信中补足历史数据
2、把程序中的遍历目录分别修改为通达信软件中的上海和深圳的股票行情的目录
下步计划:
1、解决实时行情的问题。预计下周会有阶段性成果。
2、解决复权、复息的问题。预计下周解决。
3、完善自动转分钟线、日线、周线、月线
4、封装一些基础的程序化交易的函数。
5、编写一些模拟清算系统,用于程序化交易策略的历史数据测试
6、就行情数据展开一个大数据、人工智能判断的研究——希望有兴趣的朋友共同参与
7、利用语意理解的技术让爬虫自动去爬取上市公司新闻与公告,就新闻资讯做一些聚类的研究。
高频交易、算法交易是大势所趋,征集有兴趣的朋友共同参与。
邮件: 764238714@qq.com
上代码:
server
```go
package main
import (
"bytes"
"encoding/binary"
"fmt"
"gopkg.in/mgo.v2"
//"gopkg.in/mgo.v2/bson"
//"log"
"io"
"net"
"os"
"time"
)
const (
BZ = "PUTDATA"
MongodbUrl = "localhost:27017"
)
type StockFile struct {
Date, Open, High, Low, Close int32
Amount float32
Vol int32
Preclose int32
}
type StockData struct {
Code string
Date int32
Open, High, Low, Close, Amount float32
Vol, Preclose int32
}
func main() {
//在7070端口监听
tcpAddr, err := net.ResolveTCPAddr("tcp", ":7076")
checkError(err)
listener, err1 := net.ListenTCP("tcp", tcpAddr)
checkError(err1)
fmt.Println("服务器准备就绪")
session, err := mgo.Dial(MongodbUrl)
if err != nil {
panic(err)
}
// Optional. Switch the session to a monotonic behavior.
session.SetMode(mgo.Monotonic, true)
defer session.Close()
//c := session.DB("stock").C("day")
for {
//等待客户端的连接
conn, err2 := listener.Accept()
if err != nil {
/*通常服务端为一个服务,不会因为错误而退出。出错后,继续等待下一个连接请求*/
fmt.Println(err2)
continue
}
fmt.Println("收到客户端的请求")
go ServeClient(conn, session)
}
}
func ServeClient(conn net.Conn, session *mgo.Session) {
defer conn.Close()
defer fmt.Println("接收数据完成")
str := ReadData(conn)
if str == BZ {
SendData(conn, "OK")
fmt.Println("开始接受数据...")
Code := ReadData(conn)
c := session.DB("stock").C(Code)
SendData(conn, "CODEOK")
st := time.Now().UnixNano()
//显示掺入前集合中元素数目
countNum, err := c.Count()
if err != nil {
panic(err)
}
fmt.Println("mongodb连接成功,这个数据库的对象总计: ", countNum)
ReadHq(conn, c, Code)
fmt.Printf("执行程序花费的时间为(毫秒): %d \r\n", (time.Now().UnixNano()-st)/int64(time.Millisecond))
} else {
SendData(conn, "bad connetct")
}
return
}
/*读取验证数据*/
func ReadData(conn net.Conn) string {
var data bytes.Buffer
var buf [512]byte
for {
n, err := conn.Read(buf[0:])
if err != nil {
fmt.Println(err)
return ""
}
//我们的数据以0做为结束的标记
if buf[n-1] == 0 {
//n-1去掉结束标记0
data.Write(buf[0 : n-1])
break
} else {
data.Write(buf[0:n])
}
}
return string(data.Bytes())
}
func ReadHq(conn net.Conn, c *mgo.Collection, Code string) {
var buf [32]byte
for {
_, err := conn.Read(buf[0:32])
if err == io.EOF {
fmt.Println("此个文件传输结束")
break
}
if err != nil {
fmt.Println(err)
return
}
b_buf := bytes.NewBuffer(buf[0:32])
var x StockFile
var y StockData
binary.Read(b_buf, binary.LittleEndian, &x) //binary.LittleEndian 是内存中的字节序的概念,就是把低字节的放到了后面。网络传输一般用BigEndian,内存字节序和cpu有关,编程时要转化。
y.Date = x.Date
y.Open = float32(x.Open) / 100
y.High = float32(x.High) / 100
y.Low = float32(x.Low) / 100
y.Close = float32(x.Close) / 100
y.Preclose = x.Preclose
y.Amount = x.Amount
y.Vol = x.Vol
//y.Code = Code
fmt.Println(y)
err = c.Insert(&y)
if err != nil {
panic(err)
}
}
return
}
func SendData(conn net.Conn, data string) {
buf := []byte(data)
/*向 byte 字节里添加结束标记*/
buf = append(buf, 0)
_, err := conn.Write(buf)
if err != nil {
fmt.Println(err)
}
}
func checkError(err error) {
if err != nil {
fmt.Println(err)
os.Exit(0)
}
}
```
client
```go
package main
import (
"bytes"
"fmt"
"io"
//"io/ioutil"
"net"
"os"
"strings"
"time"
)
const (
BZ = "PUTDATA"
)
func main() {
tcpAddr, err := net.ResolveTCPAddr("tcp", "192.168.1.6:7076")
checkError(err)
//遍历目录
dir, err := os.OpenFile("/Users/wangyan/go/src/hangqing/day", os.O_RDONLY, 0666)
if err != nil {
fmt.Println(err.Error())
return
}
arrFile, err1 := dir.Readdir(0)
if err1 != nil {
fmt.Println(err1.Error())
return
}
for _, v := range arrFile {
//tcp 握手
conn, err := net.DialTCP("tcp", nil, tcpAddr)
checkError(err)
defer conn.Close()
SendData(conn, BZ)
s := ReadData(conn)
if s == "OK" {
fmt.Println("连接成功")
Code := strings.TrimRight(v.Name(), ".day")
SendData(conn, Code)
if ReadData(conn) == "CODEOK" {
fmt.Println("传输证券代码完成")
f, err := os.OpenFile("/Users/wangyan/go/src/hangqing/day/"+v.Name(), os.O_RDONLY, 0666)
checkError(err)
defer f.Close()
//每次传递32字节
buf := make([]byte, 32)
//重置文件指针,否则读不到内容的。
f.Seek(0, os.SEEK_SET)
for {
n, ferr := f.Read(buf)
if ferr != nil && ferr != io.EOF {
fmt.Println(ferr.Error())
break
}
if ferr == io.EOF {
fmt.Println(ferr.Error())
break
}
if n == 0 {
break
}
//str += string(buf[0:n])
SendHq(conn, buf)
}
/*一次读完整个文件
buf, err := ioutil.ReadAll(f)
checkError(err)
SendHq(conn, buf)
*/
fmt.Println("传输数据完成:", v.Name())
conn.Close()
f.Close()
} else {
fmt.Println("code not translat")
conn.Close()
}
} else {
fmt.Println("连接失败")
conn.Close()
}
time.Sleep(1 * time.Second)
}
}
/*读取数据*/
func ReadData(conn net.Conn) string {
var data bytes.Buffer
var buf [512]byte
for {
n, err := conn.Read(buf[0:])
if err != nil {
fmt.Println(err)
return ""
}
//我们的数据以0做为结束的标记
if buf[n-1] == 0 {
//n-1去掉结束标记0
data.Write(buf[0 : n-1])
break
} else {
data.Write(buf[0:n])
}
}
return string(data.Bytes())
}
func SendData(conn net.Conn, data string) {
buf := []byte(data)
/*向 byte 字节里添加结束标记*/
buf = append(buf, 0)
_, err := conn.Write(buf)
if err != nil {
fmt.Println(err)
}
}
func SendHq(conn net.Conn, data []byte) {
_, err := conn.Write(data)
if err != nil {
fmt.Println(err)
}
}
func checkError(err error) {
if err != nil {
fmt.Println(err)
os.Exit(0)
}
}
```
有疑问加站长微信联系(非本文作者)