今天有一个从mongodb读取数据,然后放到sqlserver的工作,当然这等程序必须用go来完成啊。
先准备mongodb的第三方驱动包 http://labix.org/mgo
odbc的第三方驱动包 https://bitbucket.org/miquella/mgodbc
读取mongodb的数据的条件是根据日期范围,以及字符串条件。上代码。
package main import ( _ "bitbucket.org/miquella/mgodbc" "database/sql" "encoding/json" "fmt" "labix.org/v2/mgo" "labix.org/v2/mgo/bson" "os" "runtime" "time" ) type serverslice struct { Servers []string Sqlconn string Start string End string } type user struct { UserName string "UserName" Password string "Password" Email string "Email" Phone string "Phone" SubTime time.Time "SubTime" } var config serverslice var worker = runtime.NumCPU() //初始化配置 func init() { file, _ := os.Open("config.json") defer file.Close() buf := make([]byte, 2048) n, _ := file.Read(buf) err := json.Unmarshal(buf[:n], &config) if err != nil { panic(err) fmt.Println(err) } } func main() { runtime.GOMAXPROCS(runtime.NumCPU()) var chanUser = make(chan user) // 标记完成 dones := make(chan struct{}, worker) go readmongodb(chanUser) for i := 0; i < worker; i++ { go writesql(chanUser, dones) } awaitForCloseResult(dones) fmt.Println("完成") } //读取mongodb数据 func readmongodb(chanUser chan<- user) { start, _ := time.Parse(layout, config.Start) end, _ := time.Parse(layout, config.End) for _, server := range config.Servers { session, err := mgo.Dial(server) if err != nil { fmt.Println("打开", server, "失败") panic(err) } defer session.Close() var regex = bson.RegEx{} regex.Pattern = "^139.*" //查询条件是 start<=subtime<=end;email=nil;phone 不以139开头 var query = bson.M{"SubTime": bson.M{"$gte": start, "$lte": end}, "Email": nil, "Phone": bson.M{"$not": regex}} //sqlreader类似 iter := session.DB("db").C("Users").Find(query).Iter() message := user{} for iter.Next(&message) { chanUser <- message } //关闭通道 close(chanUser) } } //写入sqlserver func writesql(chanUser <-chan user, dones chan<- struct{}) { con, err := sql.Open("mgodbc", config.Sqlconn) if err != nil { fmt.Println(err) return } defer con.Close() //con.Exec(prepareSQL) for message := range chanUser { //组装sql var sql = fmt.Sprintf(insertSql, message.UserName, message.Password, message.Email, message.Phone, message.SubTime.Format(layout)) _, err = con.Exec(sql) if err != nil 
{ fmt.Println(err) } } dones <- struct{}{} } /*等待操作完成*/ func awaitForCloseResult(dones <-chan struct{}) { for { <-dones worker-- if worker <= 0 { return } } } var layout = "2006-01-02 15:04:05" var insertSql string = `INSERT INTO [dbo].[User] ([UserName] ,[Password] ,[Email] ,[Phone] ,[SubTime] ) VALUES ('%s','%s','%s','%s','%s'); `
对应的配置文件config.json
{ "servers": [ "127.0.0.1:27017" ], "sqlconn":"driver={SQL Server};SERVER=localhost;UID=aa;PWD=bbb;DATABASE=auth", "start":"2013-05-17 00:00:00", "end":"2013-06-01 00:00:00" }
有疑问加站长微信联系(非本文作者)