最近阅读到miner部分代码,看到里面的事件订阅提起了兴趣就研究了一下,以后做项目也许会用到。
以太坊提供了两种event事件模型,typeMux和feed,两者在使用上会有一点区别,前者可以订阅任意类型的消息,而后者由于subscribe的入参为chan,故类型在第一次使用时就确定,也就是一个feed只能对应一个类型的chan。前者如果订阅者始终没有读取chan消息,那么整个流程会阻塞,影响其他订阅者正常读取消息,性能低下,故以太坊中只有极少地方使用typeMux,主要是feed。而feed比较灵活,如果chan设置了缓冲,那么就可以异步,没有缓冲则效果和typeMux一样。
package main
import (
"fmt"
"github.com/ethereum/go-ethereum/event"
"time"
)
type test struct {
data int
}
type test1 struct {
data int
}
func main() {
//subMux()
feed()
select {}
}
//typeMux 同步事件模型
func subMux() {
var mux event.TypeMux
go func() {
ch := mux.Subscribe(test{})
for v := range ch.Chan() {
fmt.Println("test:", v.Data.(test))
}
}()
go func() {
sub := mux.Subscribe(test1{})
loop:
time.Sleep(time.Second * 7) //模拟test1读取管道阻塞,test等待
v := <-sub.Chan()
fmt.Println("test1:", v.Data.(test1))
goto loop
}()
go func() {
for i := 0; i < 10000; i++ {
time.Sleep(time.Second)
_ = mux.Post(test1{i})
_ = mux.Post(test{i})
}
}()
}
//可异步事件模型,feed管理的管道类型固定。
func feed() {
var fee event.Feed
var scope event.SubscriptionScope //统一管理sub,统一关闭通道及取消订阅
go func() {
ch := make(chan test, 100)
sub := fee.Subscribe(ch)
scope.Track(sub)
for {
select {
case v := <-ch:
fmt.Println("ch test1:", v)
case <-sub.Err():
fmt.Println("sub closed")
return
}
}
}()
go func() {
ch := make(chan test, 100) //添加带缓冲的管道,异步模式
sub := fee.Subscribe(ch)
scope.Track(sub)
loop:
time.Sleep(time.Second * 5) //模拟读取管道阻塞
select {
case v := <-ch:
fmt.Println("ch test1:", v)
case <-sub.Err():
fmt.Println("sub1 closed")
return
}
goto loop
}()
go func() {
for i := 0; i < 1000; i++ {
time.Sleep(time.Second)
fee.Send(test{i})
}
}()
go func() {
time.Sleep(time.Second * 10)
scope.Close()
}()
}
附别人的源码分析,http://lessisbetter.site/2018/10/18/ethereum-code-event-framework/,讲得很好。
有疑问加站长微信联系(非本文作者)