go cassandra 示例2

mb6018e97449ea1 · · 578 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

建立连接,参数配置(说明:下文中golangConfig.InitConfigMap为项目启动时加载的配置文件,读取成全局的Map)

package cassandra

import (
	"fmt"
	"github.com/gocql/gocql"
	"time"
	"src/golangConfig"
)

var (
	// connect to the cluster
	CassandraSession        *gocql.Session
	cluster                 *gocql.ClusterConfig
	CassandraSessionBatch   *gocql.Batch
	clusterCreateSessionErr error
	CassandraInsertCql      string
)

func CassandraConnectInit() {
	CassandraInsertCql = "INSERT INTO " + golangConfig.InitConfigMap["cassandraTable"].(string) + " (srcIp, .....,uuid) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) "
	cluster = gocql.NewCluster()
	cluster.Keyspace = golangConfig.InitConfigMap["cassandraKeyspace"].(string)
	hosts := []string{golangConfig.InitConfigMap["cassandraServiceUrl"].(string)}
	cluster.Hosts = hosts
	cluster.ConnectTimeout = golangConfig.InitConfigMap["cassandraConnectTimeout"].(time.Duration)
	cluster.Timeout = golangConfig.InitConfigMap["cassandraTimeout"].(time.Duration)
	cluster.Consistency = gocql.Quorum
	cluster.NumConns = golangConfig.InitConfigMap["cassandraNumConns"].(int)
	cluster.Consistency = gocql.One
	CassandraSession, clusterCreateSessionErr = cluster.CreateSession()
	if clusterCreateSessionErr != nil {
		panic(clusterCreateSessionErr)
	}

}

func CassandraSessionBatchInit() *gocql.Batch {
	CassandraSessionBatch = CassandraSession.NewBatch(gocql.UnloggedBatch)
	return CassandraSessionBatch
}

func InsertCassandraMany(batch1 *gocql.Batch) {
	if err := CassandraSession.ExecuteBatch(batch1); err != nil {
		fmt.Println("execute batch:", err)
	}
}

连接初始化,在Main方法中调用

dbwrite.CassandraInit()

以下代码为批量插入数据库(到达预设长度后,立即持久化。当最后长度不满足后,到达一定时间插入数据库),通过通道eventQueue1操作数据

package dbwrite

import (
	"fmt"
	"github.com/gocql/gocql"
	uuid "github.com/satori/go.uuid"
	"time"
	"src/golangConfig"
	"src/cassandra"
)

type CassandraTable struct {
	cassandraInsertCql  string
	srcIp               string
	.....表字段

func NewCassandraTable(cassandraInsertCql string, srcIp string, .....表字段) *CassandraTable {
	var cassandraTable = new(CassandraTable)
	cassandraTable.cassandraInsertCql = cassandraInsertCql
	cassandraTable.srcIp = srcIp
	.....表字段
	return cassandraTable
}

var (
	eventQueue1     chan *CassandraTable
	BatchWriteSize1 int
	Workers1        int
	lingerTime1     time.Duration

	batchProcessor1 = func(batch *gocql.Batch) error {
		yxCassandra.InsertCassandraMany(batch)
		return nil
	}
	errHandler1 = func(err error, batch *gocql.Batch) {
		fmt.Println("some error happens")
	}
)

func setWorkers1() {
	for i := 0; i < Workers1; i++ {
		go func() {
			batch := cassandra.CassandraSessionBatchInit()
			lingerTimer := time.NewTimer(5 * time.Second)
			if !lingerTimer.Stop() {
				<-lingerTimer.C
			}
			defer lingerTimer.Stop()

			for {
				select {
				case cassandraTable := <-eventQueue1:
					ul, _ := uuid.NewV4()
					batch.Query(cassandraTable.cassandraInsertCql, cassandraTable.srcIp, .....表字段, ul.String())
					if batch.Size() != BatchWriteSize1 {
						if batch.Size() == 1 {
							lingerTimer.Reset(lingerTime1)
						}
						break
					}
					if err := batchProcessor1(batch); err != nil {
						errHandler1(err, batch)
					}

					if !lingerTimer.Stop() {
						<-lingerTimer.C
					}

					batch = cassandra.CassandraSessionBatchInit()
				case <-lingerTimer.C:
					if err := batchProcessor1(batch); err != nil {
						errHandler1(err, batch)
					}
					batch = cassandra.CassandraSessionBatchInit()
				}
			}
		}()
	}
}

func PushCassandraElement(e *CassandraTable) {
	eventQueue1 <- e
}

func CassandraInit() {
	eventQueue1 = make(chan *CassandraTable, golangConfig.InitConfigMap["cassandraEventQueueCache"].(int))
	BatchWriteSize1 = golangConfig.InitConfigMap["cassandraBatchWriteSize"].(int)
	Workers1 = golangConfig.InitConfigMap["cassandraWorkers"].(int)
	lingerTime1 = golangConfig.InitConfigMap["cassandraLingerTime"].(time.Duration)
	cassandra.CassandraConnectInit()
	setWorkers1()
}

向通道存入值

cassandraTable:=dbwrite.NewCassandraTable(yxCassandra.CassandraInsertCql, packetInfo.srcIp....)
dbwrite.PushCassandraElement(cassandraTable)

 


有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:mb6018e97449ea1

查看原文:go cassandra 示例2

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

578 次点击  
加入收藏 微博
上一篇:go mongodb
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传