用Go语言实现ReactiveX有很大的挑战,Go语言本身没有类的继承,所以无法采用基类来做一些封装操作。不过好在Go语言是有闭包和匿名函数。所以可以实现ReactiveX
https://github.com/langhuihui/GoRx
影响设计ReactiveX的要素
- 没有类的继承
- 有匿名函数
- 有闭包
- 强类型,没有泛型
- goroutine代替异步
实现生产者Observable
- 发送数据
- 完成事件
- error事件
- 被订阅
- 被取消订阅
发送数据功能
有两种方式可以实现,一种是直接调用回调函数,和javascript一样。这种方式的局限性在于代码相对啰嗦,因为golang的函数定义必须是有类型的,会涉及到更多的类型断言的操作,匿名函数使用起来也比javascript的要更麻烦一些。第二种方式是采用channel来传递数据,这种方式更加go方式一点。所以我后来采取了第二种方式实现。(第一种也尝试过)
简而言之,核心就是一个chan interface{},一个无缓冲的channel用来发送数据。这个channel是由Observer传递进来的(类似于回调的概念)
type Next chan interface{}
Observable <------Next----- Observer //subscribe
Observable
Next-----data----> Observer //next
被订阅
当Observable接收到用于发送数据的channel的时候,就是被订阅的时候。见上图。
完成事件
利用close一个channel会产生一个事件的方式进行触发。
Observable close(Next) ------> Observer (complete)
Observer通过对channel读取操作,如果第二个参数返回false(channel已经被关闭)代表complete
data,ok:=<-next
if !ok{
//complete
}
error事件
由于golang对异常捕获目前上不健全,所以暂时就通过next channel发送错误对象,在Observer中对数据类型进行类型断言,如果是error类型,则认为收到了错误事件。
被取消订阅(dispose)
这个事件是由Observer向Observable发出的
我们定义了一个新的channel :chan bool。成为stop channel专门用来做这个事情,这个channel不发送任何数据,只用来close的时候广播这个事件。
type Stop chan bool
channel在close的时候,所有等待接受数据的goroutine均能接受到这个关闭事件,这是其他语言不具备的优势。
Obserable <-------Next、Stop---------- Observer //subscribe
<--------- close stop ----------- Observer //dispose
案例:FromArray
func FromArray(array []interface{}) Observable {
return func(n Next, s Stop) {
for _, item := range array {
select {
case <-s:
return
default:
n <- item
}
}
close(n)
}
}
我们看到FromArray是一个函数,调用FromArray(数组或切片),会返回一个Observable。Observable是一个函数
type Obserable func(Next, Stop)
我们遍历传入的数组或切片,然后向Next管道传入数组中的元素,假如Stop被关闭,我们也能即使取消数据发送。
当所有数据发送完毕我们关闭Next管道,发出complete信号。
(未完待续)
有疑问加站长微信联系(非本文作者)