Golang CSP模型基础库

筑梦之队 · · 507 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

Go语言的哲学思想之一为:Do not communicate by sharing memory; instead, share memory by communicating. 翻译成中文即是:不要通过共享内存进行通信;取而代之,应该通过通信来共享内存。
为了支持这种哲学,Go语言提供了channel(通道)。
想当初,我刚看到这种哲学的时候觉得特别好,但是在具体实现代码的时候,对其理解却不是很深刻。在项目中,我总是把channel当作生产者-消费者模型中的消息队列来进行使用。这样对于在不同的goroutine中传输数据是很方便的。
但是对于共享内存数据,我一直使用的是传统的锁方式,而在Go语言中,常用的就是Mutex和RWMutex。对于从C/C++/Java/C#转型到Go语言的程序员而言,使用Mutex/RWMutex是非常直观的。我也尝试过使用channel来共享数据,但是觉得代码非常地繁琐,于是就放弃了。
但是随着在越来越多的项目中使用Go,以及越来越多的同事使用Go;在其它语言中使用锁的常见问题也在Go里面暴露出来了。于是,我转而继续思想使用channel来实现内存数据的共享。
最近在使用Erlang,我从中得到不少的灵感。因为Erlang是一门只提供通过发送消息来共享内存的语言,而没有提供传统的锁方式。在Erlang中,OTP框架中提供了一个叫做gen_server的behaviour,而这可以大大地简化逻辑的实现。
于是,我使用Go语言的channel实现了一个类似的版本。
代码如下:
baseModel.go

/*
This package is designed to simplify the use of csp model.
In csp model, we use channel to communicate among goroutines.
But it's not easy and error prone to implement the logic every time.
So, this package is designed as a template to use.
*/
package cspBaseModel

import (
    "context"
    "fmt"
)

// Request type is defined as the message exchanged between higher layer and the base model.
type Request struct {
    Name      string      // The identity of a request. Which is used to find the callback function.
    Parameter interface{} // The data sent to the callback function.
    ReturnCh  interface{} // A channel used to pass message between the request method and the callback function.
}

func NewRequest(name string, parameter, returnCh interface{}) *Request {
    return &Request{
        Name:      name,
        Parameter: parameter,
        ReturnCh:  returnCh,
    }
}

// BaseModel type is the core type
type BaseModel struct {
    callbackMap    map[string]func(*Request) // This callbackMap stores all the name and callback function pairs.
    RequestChannel chan *Request             // This channel is used to receive all the request from higher layer functions.
}

// Higher layer modules can register name and callback function pairs to base model.
func (this *BaseModel) Register(name string, callback func(*Request)) {
    if _, exists := this.callbackMap[name]; exists {
        panic(fmt.Sprintf("Item with name:%s has already existed.", name))
    }
    this.callbackMap[name] = callback
}

// Higher layer modules start base model to handle all the requests.
// ctx is used to stop this goroutine, to avoid goroutine leak.
func (this *BaseModel) Start(ctx context.Context) {
    go func() {
        for {
            select {
            case requestObj := <-this.RequestChannel:
                if callback, exists := this.callbackMap[requestObj.Name]; !exists {
                    panic(fmt.Sprintf("There is no callback related to %s", requestObj.Name))
                } else {
                    callback(requestObj)
                }
            case <-ctx.Done():
                return
            }
        }
    }()
}

func NewBaseModel() *BaseModel {
    return &BaseModel{
        callbackMap:    make(map[string]func(*Request), 16),
        RequestChannel: make(chan *Request, 64),
    }
}

baseModel_test.go

package cspBaseModel

import (
    "context"
    "fmt"
    "sync"
    "testing"
)

type Player struct {
    Id    int
    Name  string
    Level int
}

func (this *Player) Equal(other *Player) bool {
    return this.Id == other.Id && this.Name == other.Name && this.Level == other.Level
}

func (this *Player) String() string {
    return fmt.Sprintf("{Id:%d, Name:%s, Level:%d}", this.Id, this.Name, this.Level)
}

func NewPlayer(id int, name string, level int) *Player {
    return &Player{
        Id:    id,
        Name:  name,
        Level: level,
    }
}

// Use mutex as the way to prevent concurrency
type MutexPlayer struct {
    mutex     sync.RWMutex
    playerMap map[int]*Player
}

func (this *MutexPlayer) GetPlayerById(id int) (*Player, bool) {
    this.mutex.RLock()
    defer this.mutex.RUnlock()
    playerObj, exists := this.playerMap[id]
    return playerObj, exists
}

func (this *MutexPlayer) GetPlayerList() []*Player {
    this.mutex.RLock()
    defer this.mutex.RUnlock()
    playerList := make([]*Player, 0, len(this.playerMap))
    for _, value := range this.playerMap {
        playerList = append(playerList, value)
    }
    return playerList
}

func (this *MutexPlayer) AddPlayer(playerObj *Player) {
    this.mutex.Lock()
    defer this.mutex.Unlock()
    this.playerMap[playerObj.Id] = playerObj
}

func NewMutexPlayer() *MutexPlayer {
    return &MutexPlayer{
        playerMap: make(map[int]*Player, 1024),
    }
}

// Use csp as the way to prevent concurrency
type CspPlayer struct {
    playerMap    map[int]*Player
    baseModelObj *BaseModel
    cancel       context.CancelFunc
}

func NewCspPlayer() *CspPlayer {
    ctx, cancel := context.WithCancel(context.Background())
    cspPlayerObj := &CspPlayer{
        playerMap:    make(map[int]*Player, 1024),
        baseModelObj: NewBaseModel(),
        cancel:       cancel,
    }
    cspPlayerObj.baseModelObj.Start(ctx)

    // Register callback
    cspPlayerObj.baseModelObj.Register("GetPlayerById", cspPlayerObj.getPlayerByIdCallback)
    cspPlayerObj.baseModelObj.Register("GetPlayerList", cspPlayerObj.getPlayerListCallback)
    cspPlayerObj.baseModelObj.Register("AddPlayer", cspPlayerObj.addPlayerCallback)

    return cspPlayerObj
}

func (this *CspPlayer) Stop() {
    this.cancel()
}

type GetPlayerByIdResponse struct {
    Value  *Player
    Exists bool
}

func (this *CspPlayer) GetPlayerById(id int) (*Player, bool) {
    retCh := make(chan *GetPlayerByIdResponse)
    this.baseModelObj.RequestChannel <- NewRequest("GetPlayerById", id, retCh)
    responseObj := <-retCh
    return responseObj.Value, responseObj.Exists
}

func (this *CspPlayer) getPlayerByIdCallback(requestObj *Request) {
    id, _ := requestObj.Parameter.(int)
    playerObj, exists := this.playerMap[id]
    getPlayerByIdResponseObj := &GetPlayerByIdResponse{
        Value:  playerObj,
        Exists: exists,
    }
    retCh, _ := requestObj.ReturnCh.(chan *GetPlayerByIdResponse)
    retCh <- getPlayerByIdResponseObj
}

type GetPlayerListResponse struct {
    Value []*Player
}

func (this *CspPlayer) GetPlayerList() []*Player {
    retCh := make(chan *GetPlayerListResponse)
    this.baseModelObj.RequestChannel <- NewRequest("GetPlayerList", nil, retCh)
    responseObj := <-retCh
    return responseObj.Value
}

func (this *CspPlayer) getPlayerListCallback(requestObj *Request) {
    playerList := make([]*Player, 0, len(this.playerMap))
    for _, value := range this.playerMap {
        playerList = append(playerList, value)
    }
    getPlayerListResponseObj := &GetPlayerListResponse{
        Value: playerList,
    }

    retCh, _ := requestObj.ReturnCh.(chan *GetPlayerListResponse)
    retCh <- getPlayerListResponseObj
}

func (this *CspPlayer) AddPlayer(playerObj *Player) {
    this.baseModelObj.RequestChannel <- NewRequest("AddPlayer", playerObj, nil)
}

func (this *CspPlayer) addPlayerCallback(requestObj *Request) {
    playerObj, _ := requestObj.Parameter.(*Player)
    this.playerMap[playerObj.Id] = playerObj
}

func TestMutexPlayer(t *testing.T) {
    mutexPlayerObj := NewMutexPlayer()
    playerObj := NewPlayer(1, "Jordan", 100)
    mutexPlayerObj.AddPlayer(playerObj)
    playerList := mutexPlayerObj.GetPlayerList()
    if len(playerList) != 1 {
        t.Fatalf("Expected %d items in the list, now got:%d", 1, len(playerList))
    }

    gotPlayerObj, exists := mutexPlayerObj.GetPlayerById(1)
    if !exists {
        t.Fatalf("Expected:%v, Got:%v", true, exists)
    }
    if gotPlayerObj.Equal(playerObj) == false {
        t.Fatalf("Expected:%s, Got:%s", playerObj, gotPlayerObj)
    }
}

func TestCspPlayer(t *testing.T) {
    cspPlayerObj := NewCspPlayer()
    playerObj := NewPlayer(1, "Jordan", 100)
    cspPlayerObj.AddPlayer(playerObj)
    playerList := cspPlayerObj.GetPlayerList()
    if len(playerList) != 1 {
        t.Fatalf("Expected %d items in the list, now got:%d", 1, len(playerList))
    }

    gotPlayerObj, exists := cspPlayerObj.GetPlayerById(1)
    if !exists {
        t.Fatalf("Expected:%v, Got:%v", true, exists)
    }
    if gotPlayerObj.Equal(playerObj) == false {
        t.Fatalf("Expected:%s, Got:%s", playerObj, gotPlayerObj)
    }
}

func BenchmarkMutexPlayer(b *testing.B) {
    mutexPlayerObj := NewMutexPlayer()
    for i := 0; i < b.N; i++ {
        mutexPlayerObj.AddPlayer(NewPlayer(i, fmt.Sprintf("Player%d", i), i))
        mutexPlayerObj.GetPlayerById(i)
        mutexPlayerObj.GetPlayerList()
    }
}

func BenchmarkCspPlayer(b *testing.B) {
    cspPlayerObj := NewCspPlayer()
    for i := 0; i < b.N; i++ {
        cspPlayerObj.AddPlayer(NewPlayer(i, fmt.Sprintf("Player%d", i), i))
        cspPlayerObj.GetPlayerById(i)
        cspPlayerObj.GetPlayerList()
    }
    cspPlayerObj.Stop()
}

对于外部的使用者,使用Mutex/RWMutex和使用channel,两者没有任何区别。所以可以很方便地进行实现,以及内部的改造。

详细代码,可以访问:https://github.com/Jordanzuo/goutil/tree/master/cspBaseModel


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:筑梦之队

查看原文:Golang CSP模型基础库

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

507 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传