Cassandra go语言client使用

zhangqingping · · 9546 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

关于什么是cassandra,可以参考:

http://blog.csdn.net/zyz511919766/article/details/38683219

http://cassandra.apache.org/

比较了HBASE、mongodb 和 cassandra

1)HBASE 和 cassandra 都是列式存储,但是 cassandra部署方便,扩展容易
2) mongodb 并不是真正的列式存储,数据扩容比较麻烦,需要提前做好集群分区

casandra是 p2p(gossip)实现的bigtable,  数据一致性可以通过参数配置(R+W >N),  写操作完成是all node,还是指定的node个数,才进行返回。

 

数据模型:

 

尝试了cassandra的两个client。

1. "github.com/gocql/gocql"

2."github.com/hailocab/gocassa"

gocassa是在gocql上面的封装,提供更方便的操作。

在用cassandra之前,建议先熟悉一下CQL,类似 SQL语句的语法。

 

作为一个client, 我们需要考虑的点:

1)连接池

2)批量操作

3)可能还会考虑同步操作(同时更新两个table中的数据)

 

cassandra部署和使用都还算简单,比较困难的是,要摆脱传统的db设计范式思维,要根据后续的数据查询来设计你的bigtable结构,方便将来的查询。

贴上几个相关的参考资料:

http://www.slideshare.net/yukim/cql3-in-depth (CQL相关介绍)
http://www.slideshare.net/jaykumarpatel/cassandra-data-modeling-best-practices
http://www.infoq.com/cn/articles/best-practices-cassandra-data-model-design-part2 (ebay的cassandra实践)

 

然后,贴上两个client使用示例:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/gocql/gocql"
)

func main() {
    // connect to the cluster
    cluster := gocql.NewCluster("127.0.0.1")
    cluster.Keyspace = "demo"
    cluster.Consistency = gocql.Quorum
    //设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
    cluster.NumConns = 3

    session, _ := cluster.CreateSession()
    time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
    fmt.Println(session.Pool.Size())
    defer session.Close()

    //unlogged batch, 进行批量插入,最好是partition key 一致的情况
    t := time.Now()
    batch := session.NewBatch(gocql.UnloggedBatch)
    for i := 0; i < 100; i++ {
        batch.Query(`INSERT INTO bigrow (rowname, iplist) VALUES (?,?)`, fmt.Sprintf("name_%d", i), fmt.Sprintf("ip_%d", i))
    }
    if err := session.ExecuteBatch(batch); err != nil {
        fmt.Println("execute batch:", err)
    }
    bt := time.Now().Sub(t).Nanoseconds()

    t = time.Now()
    for i := 0; i < 100; i++ {
        session.Query(`INSERT INTO bigrow (rowname, iplist) VALUES (?,?)`, fmt.Sprintf("name_%d", i), fmt.Sprintf("ip_%d", i))
    }
    nt := time.Now().Sub(t).Nanoseconds()

    t = time.Now()
    sbatch := session.NewBatch(gocql.UnloggedBatch)
    for i := 0; i < 100; i++ {
        sbatch.Query(`INSERT INTO bigrow (rowname, iplist) VALUES (?,?)`, "samerow", fmt.Sprintf("ip_%d", i))
    }
    if err := session.ExecuteBatch(sbatch); err != nil {
        fmt.Println("execute batch:", err)
    }
    sbt := time.Now().Sub(t).Nanoseconds()
    fmt.Println("bt:", bt, "sbt:", sbt, "nt:", nt)

    //----------out put------------------
    // ./rawtest
    // bt: 5795593 sbt: 3003774 nt: 261775
    //------------------------------------

    // insert a tweet
    if err := session.Query(`INSERT INTO tweet (timeline, id, text) VALUES (?, ?, ?)`,
        "me", gocql.TimeUUID(), "hello world").Exec(); err != nil {
        log.Fatal(err)
    }

    var id gocql.UUID
    var text string

    /* Search for a specific set of records whose 'timeline' column matches
     * the value 'me'. The secondary index that we created earlier will be
     * used for optimizing the search */
    if err := session.Query(`SELECT id, text FROM tweet WHERE timeline = ? LIMIT 1`,
        "me").Consistency(gocql.One).Scan(&id, &text); err != nil {
        log.Fatal(err)
    }
    fmt.Println("Tweet:", id, text)

    // list all tweets
    iter := session.Query(`SELECT id, text FROM tweet WHERE timeline = ?`, "me").Iter()
    for iter.Scan(&id, &text) {
        fmt.Println("Tweet:", id, text)
    }
    if err := iter.Close(); err != nil {
        log.Fatal(err)
    }

    query := session.Query(`SELECT * FROM bigrow where rowname = ?`, "30")
    // query := session.Query(`SELECT * FROM bigrow `)

    var m map[string]interface{}
    m = make(map[string]interface{}, 10)
    err := query.Consistency(gocql.One).MapScan(m)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%#v", m)
}
package main

import (
    "fmt"
    "time"

    "github.com/hailocab/gocassa"
)

// This test assumes that cassandra is running on default port locally and
// that the keySpace called 'test' already exists.

type Sale struct {
    Id         string
    CustomerId string
    SellerId   string
    Price      int
    Created    time.Time
}

func main() {

    keySpace, err := gocassa.ConnectToKeySpace("not_exist_demo", []string{"127.0.0.1"}, "", "")

    if err != nil {
        panic(err)
    }
    salesTable := keySpace.Table("sale", Sale{}, gocassa.Keys{
        PartitionKeys: []string{"Id"},
    })

    // Create the table - we ignore error intentionally
    err = salesTable.Create()
    fmt.Println(err)
    // We insert the first record into our table - yay!
    err = salesTable.Set(Sale{
        Id:         "sale-1",
        CustomerId: "customer-1",
        SellerId:   "seller-1",
        Price:      42,
        Created:    time.Now(),
    }).Run()
    if err != nil {
        panic(err)
    }

    result := Sale{}
    if err := salesTable.Where(gocassa.Eq("Id", "sale-1")).ReadOne(&result).Run(); err != nil {
        panic(err)
    }
    fmt.Println(result)
}

 

更多配置可参考:

https://github.com/gocql/gocql/blob/master/cluster.go#L57

// ClusterConfig is a struct to configure the default cluster implementation
// of gocoql. It has a varity of attributes that can be used to modify the
// behavior to fit the most common use cases. Applications that requre a
// different setup must implement their own cluster.
type ClusterConfig struct {
    Hosts             []string          // addresses for the initial connections
    CQLVersion        string            // CQL version (default: 3.0.0)
    ProtoVersion      int               // version of the native protocol (default: 2)
    Timeout           time.Duration     // connection timeout (default: 600ms)
    Port              int               // port (default: 9042)
    Keyspace          string            // initial keyspace (optional)
    NumConns          int               // number of connections per host (default: 2)
    NumStreams        int               // number of streams per connection (default: max per protocol, either 128 or 32768)
    Consistency       Consistency       // default consistency level (default: Quorum)
    Compressor        Compressor        // compression algorithm (default: nil)
    Authenticator     Authenticator     // authenticator (default: nil)
    RetryPolicy       RetryPolicy       // Default retry policy to use for queries (default: 0)
    SocketKeepalive   time.Duration     // The keepalive period to use, enabled if > 0 (default: 0)
    ConnPoolType      NewPoolFunc       // The function used to create the connection pool for the session (default: NewSimplePool)
    DiscoverHosts     bool              // If set, gocql will attempt to automatically discover other members of the Cassandra cluster (default: false)
    MaxPreparedStmts  int               // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
    MaxRoutingKeyInfo int               // Sets the maximum cache size for query info about statements for each session (default: 1000)
    PageSize          int               // Default page size to use for created sessions (default: 5000)
    SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset)
    Discovery         DiscoveryConfig
    SslOpts           *SslOptions
    DefaultTimestamp  bool // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. (default: true, only enabled for protocol 3 and above)
}
连接池,默认采用是NewSimplePool,这里用的是roud robin
源码:https://github.com/gocql/gocql/blob/master/connectionpool.go#L454
ConnPoolType 是一个接口,可以自己实现接口来定制话自己的策略。
数据一致性策略,通过Consistency配置,默认是Quorum(大部分节点)
type Consistency uint16

const (
    Any         Consistency = 0x00
    One         Consistency = 0x01
    Two         Consistency = 0x02
    Three       Consistency = 0x03
    Quorum      Consistency = 0x04
    All         Consistency = 0x05
    LocalQuorum Consistency = 0x06
    EachQuorum  Consistency = 0x07
    LocalOne    Consistency = 0x0A
)

 

 

有疑问加站长微信联系(非本文作者)

本文来自:博客园

感谢作者:zhangqingping

查看原文:Cassandra go语言client使用

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

9546 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传