连接参数配置
package mongod import ( "context" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "log" "time" "src/golangConfig" ) var ( MongoServiceUrl = "172.168.1.153:27017" MongoDataBaseName = "baseName" MongoColPacketLogName = "logName" MongoDataBase *mongo.Database = nil MongoPacketLogCol *mongo.Collection = nil MongoClient *mongo.Client = nil MongoContext context.Context = nil ) type Yxtranslog struct { Id primitive.ObjectID `json:"id" bson:"_id"` name string `json:"name" bson:"name"` ......... } func Connect() (context.Context, error) { MongoServiceUrl = golangConfig.InitConfigMap["mongoServiceUrl"].(string) MongoDataBaseName = golangConfig.InitConfigMap["mongoDataBaseName"].(string) MongoColPacketLogName = golangConfig.InitConfigMap["mongoColPacketLogName"].(string) ctx, _ := context.WithTimeout(context.Background(), 120*time.Second) client, err := mongo.Connect(ctx, &options.ClientOptions{Hosts: []string{MongoServiceUrl}}) if err != nil { log.Fatal(err) return nil, err } // Check the connection err = client.Ping(context.TODO(), nil) if err != nil { log.Fatal(err) return nil, err } MongoContext = ctx MongoClient = client MongoDataBase = MongoClient.Database(MongoDataBaseName) MongoPacketLogCol = MongoDataBase.Collection(MongoColPacketLogName) return ctx, err } func Disconnect() error { err := MongoClient.Disconnect(context.TODO()) if err != nil { log.Fatal(err) return err } return nil } func GetTransLogCollection() *mongo.Collection { return MongoPacketLogCol } func insertMany(arr []interface{}) *mongo.InsertManyResult { collection := GetTransLogCollection() resm, err := collection.InsertMany(MongoContext, arr) if err != nil { log.Fatal(err) } return resm } func InsertMongodb(batch []interface{}) { go insertMany(batch) //Disconnect() }
连接初始化
dbwrite.MongodInit()
以下代码为批量插入数据库(到达预设长度后,立即持久化。当最后长度不满足后,到达一定时间插入数据库),通过通道eventQueue操作数据
package dbwrite import ( "fmt" "time" "src/golangConfig" "src/mongod" ) var ( eventQueue chan interface{} BatchWriteSize int Workers int lingerTime time.Duration batchProcessor = func(batch []interface{}) error { mongod.InsertMongodb(batch) return nil } errHandler = func(err error, batch []interface{}) { fmt.Println("some error happens") } ) func setWorkers() { for i := 0; i < Workers; i++ { go func() { var batch []interface{} lingerTimer := time.NewTimer(5 * time.Second) if !lingerTimer.Stop() { <-lingerTimer.C } defer lingerTimer.Stop() for { select { case msg := <-eventQueue: batch = append(batch, msg) if len(batch) != BatchWriteSize { if len(batch) == 1 { lingerTimer.Reset(lingerTime) } break } if err := batchProcessor(batch); err != nil { errHandler(err, batch) } if !lingerTimer.Stop() { <-lingerTimer.C } batch = make([]interface{}, 0) case <-lingerTimer.C: if err := batchProcessor(batch); err != nil { errHandler(err, batch) } batch = make([]interface{}, 0) } } }() } } func PushMongodElement(e interface{}) { eventQueue <- e } func MongodInit() { eventQueue = make(chan interface{}, golangConfig.InitConfigMap["mongodEventQueueCache"].(int)) BatchWriteSize = golangConfig.InitConfigMap["mongodBatchWriteSize"].(int) Workers = golangConfig.InitConfigMap["mongodWorkers"].(int) lingerTime = golangConfig.InitConfigMap["mongodLingerTime"].(time.Duration) mongod.Connect() setWorkers() }
向通道eventQueue存入值
json := bson.M{"name": name.....} dbwrite.PushMongodElement(json)
有疑问加站长微信联系(非本文作者)