建立连接,参数配置(说明:下文中golangConfig.InitConfigMap为项目启动时加载的配置文件,读取成全局的Map)
package cassandra import ( "fmt" "github.com/gocql/gocql" "time" "src/golangConfig" ) var ( // connect to the cluster CassandraSession *gocql.Session cluster *gocql.ClusterConfig CassandraSessionBatch *gocql.Batch clusterCreateSessionErr error CassandraInsertCql string ) func CassandraConnectInit() { CassandraInsertCql = "INSERT INTO " + golangConfig.InitConfigMap["cassandraTable"].(string) + " (srcIp, .....,uuid) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) " cluster = gocql.NewCluster() cluster.Keyspace = golangConfig.InitConfigMap["cassandraKeyspace"].(string) hosts := []string{golangConfig.InitConfigMap["cassandraServiceUrl"].(string)} cluster.Hosts = hosts cluster.ConnectTimeout = golangConfig.InitConfigMap["cassandraConnectTimeout"].(time.Duration) cluster.Timeout = golangConfig.InitConfigMap["cassandraTimeout"].(time.Duration) cluster.Consistency = gocql.Quorum cluster.NumConns = golangConfig.InitConfigMap["cassandraNumConns"].(int) cluster.Consistency = gocql.One CassandraSession, clusterCreateSessionErr = cluster.CreateSession() if clusterCreateSessionErr != nil { panic(clusterCreateSessionErr) } } func CassandraSessionBatchInit() *gocql.Batch { CassandraSessionBatch = CassandraSession.NewBatch(gocql.UnloggedBatch) return CassandraSessionBatch } func InsertCassandraMany(batch1 *gocql.Batch) { if err := CassandraSession.ExecuteBatch(batch1); err != nil { fmt.Println("execute batch:", err) } }
连接初始化,在Main方法中调用
dbwrite.CassandraInit()
以下代码为批量插入数据库(到达预设长度后,立即持久化。当最后长度不满足后,到达一定时间插入数据库),通过通道eventQueue1操作数据
package dbwrite import ( "fmt" "github.com/gocql/gocql" uuid "github.com/satori/go.uuid" "time" "src/golangConfig" "src/cassandra" ) type CassandraTable struct { cassandraInsertCql string srcIp string .....表字段 func NewCassandraTable(cassandraInsertCql string, srcIp string, .....表字段) *CassandraTable { var cassandraTable = new(CassandraTable) cassandraTable.cassandraInsertCql = cassandraInsertCql cassandraTable.srcIp = srcIp .....表字段 return cassandraTable } var ( eventQueue1 chan *CassandraTable BatchWriteSize1 int Workers1 int lingerTime1 time.Duration batchProcessor1 = func(batch *gocql.Batch) error { yxCassandra.InsertCassandraMany(batch) return nil } errHandler1 = func(err error, batch *gocql.Batch) { fmt.Println("some error happens") } ) func setWorkers1() { for i := 0; i < Workers1; i++ { go func() { batch := cassandra.CassandraSessionBatchInit() lingerTimer := time.NewTimer(5 * time.Second) if !lingerTimer.Stop() { <-lingerTimer.C } defer lingerTimer.Stop() for { select { case cassandraTable := <-eventQueue1: ul, _ := uuid.NewV4() batch.Query(cassandraTable.cassandraInsertCql, cassandraTable.srcIp, .....表字段, ul.String()) if batch.Size() != BatchWriteSize1 { if batch.Size() == 1 { lingerTimer.Reset(lingerTime1) } break } if err := batchProcessor1(batch); err != nil { errHandler1(err, batch) } if !lingerTimer.Stop() { <-lingerTimer.C } batch = cassandra.CassandraSessionBatchInit() case <-lingerTimer.C: if err := batchProcessor1(batch); err != nil { errHandler1(err, batch) } batch = cassandra.CassandraSessionBatchInit() } } }() } } func PushCassandraElement(e *CassandraTable) { eventQueue1 <- e } func CassandraInit() { eventQueue1 = make(chan *CassandraTable, golangConfig.InitConfigMap["cassandraEventQueueCache"].(int)) BatchWriteSize1 = golangConfig.InitConfigMap["cassandraBatchWriteSize"].(int) Workers1 = golangConfig.InitConfigMap["cassandraWorkers"].(int) lingerTime1 = golangConfig.InitConfigMap["cassandraLingerTime"].(time.Duration) cassandra.CassandraConnectInit() setWorkers1() }
向通道存入值
cassandraTable:=dbwrite.NewCassandraTable(yxCassandra.CassandraInsertCql, packetInfo.srcIp....) dbwrite.PushCassandraElement(cassandraTable)
有疑问加站长微信联系(非本文作者)