我们在上一篇,谈到了数据传递者Deliver。那么还差一个Subscriber没讲,这个实现其实已经没什么好讲的了,可以直接看源码。因为Deliver里面蕴含了对Observable的订阅过程,而Subscriber的主要功能就是这个,相当于去掉被订阅功能的Deliver。
Reactive 编程就是把Observable、Deliver、Subscriber串起来变成一个单向流动的数据管道。所以必须设计一个串起来的方式。
Pipe编程模式
RxJS 6.0 的时候引入了pipe模式。所以我们的实现是基于pipe模式的。
func Pipe(source Observable, cbs ...Deliver) Observable {
for _, cb := range cbs {
source = cb(source)
}
return source
}
这时候我们可以将使用这个函数来组合所有的Rx对象
Pipe(FromArray(...),Filter(...),...)
这个函数返回的仍然是Observable,所以可以继续使用Pipe
ob1:=Pipe(FromArray(...),Filter(...),...)
Pipe(ob1,Map(...),SwitchMap(...),...)
当然最后必须得有人订阅这个Observable
Subscribe(...)(observable)
这么设计的原因是golang是强类型语言,pipe无法兼容observer类型,除非有泛型。否则Subscriber就可以放到pipe函数参数末尾传入了。
下面我们回到标题说的链式编程的实现
链式编程实现
所谓链式编程,就是一个对象的方法返回值是对象自身,这样可以接着调用对象的其他方法,行程一个链条,Rx早期的实现都是这么做的。
最终我们可以如此调用:
rx.FromArray(...).Filter(...).Subscribe(...)
那么如何实现呢?
package rx
import (
p "github.com/langhuihui/gorx/pipe"
)
type Observable struct {
source p.Observable
}
我们所有的Observable和Deliver包括Subscriber以及Pipe函数等定义全部都在github.com/langhuihui/gorx/pipe
这个包里面
那么我们在外层的rx包里面就定义上面这个Observable,名称是相同的,但在不同包里面。
在pipe包里面,Observable是一个函数,而在rx包里面Observable是一个结构体,目的是实现链式编程。这个结构体只有一个成员就是source,类型是pipe包里面的Observable。魔法就此展开了。
func FromArray(array []interface{}) *Observable {
return &Observable{p.FromArray(array)}
}
当我们调用rx.FromArray(...)
的时候,会返回一个rx.Observable 的对象指针,这个对象里面的source属性就是pipe包里面的FromArray函数调用后的Observable
当我们继续调用操作符Filter的时候,rx.FromArray(...).Filter(...)
,就会调用rx.Observable结构体的Filter方法,这时候我们只需要定义这个成员函数即可。
func (observable *Observable) Filter(f func(interface{}) bool) *Observable {
return &Observable{p.Filter(f)(observable.source)}
}
其他操作符以此类推,我写了一个脚本用来生成一系列这个定义,省去手工抄写的重复劳动。
可以瞬间从源码生成一堆成员方法
//TakeUntil
func (observable *Observable) TakeUntil(sSrc Observable, delivers ...p.Deliver) *Observable {
return &Observable{p.TakeUntil(sSrc.source, delivers...)(observable.source)}
}
//TakeLast
func (observable *Observable) TakeLast(count int) *Observable {
return &Observable{p.TakeLast(count)(observable.source)}
}
//Skip
func (observable *Observable) Skip(count int) *Observable {
return &Observable{p.Skip(count)(observable.source)}
}
//SkipWhile
func (observable *Observable) SkipWhile(f func(interface{}) bool) *Observable {
return &Observable{p.SkipWhile(f)(observable.source)}
}
//SkipUntil
func (observable *Observable) SkipUntil(sSrc Observable, delivers ...p.Deliver) *Observable {
return &Observable{p.SkipUntil(sSrc.source, delivers...)(observable.source)}
}
链式编程就算大工告成了。下面就是愉快的Rx编程了。
import "github.com/langhuihui/gorx"
rx.Interval(1000).SkipUntil(rx.Of(1).Delay(3000)).Subscribe(func(x interface{}, dispose func()) {
fmt.Print(x)
}, nil, nil)
有疑问加站长微信联系(非本文作者)