用Go语言实现ReactiveX(三)——链式编程

一个灰 · · 989 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

接上一篇用Go语言实现ReactiveX(二)——Deliver

我们在上一篇,谈到了数据传递者Deliver。那么还差一个Subscriber没讲,这个实现其实已经没什么好讲的了,可以直接看源码。因为Deliver里面蕴含了对Observable的订阅过程,而Subscriber的主要功能就是这个,相当于去掉被订阅功能的Deliver。

Reactive 编程就是把Observable、Deliver、Subscriber串起来变成一个单向流动的数据管道。所以必须设计一个串起来的方式。

Pipe编程模式

RxJS 6.0 的时候引入了pipe模式。所以我们的实现是基于pipe模式的。

func Pipe(source Observable, cbs ...Deliver) Observable {
    for _, cb := range cbs {
        source = cb(source)
    }
    return source
}

这时候我们可以将使用这个函数来组合所有的Rx对象

Pipe(FromArray(...),Filter(...),...)

这个函数返回的仍然是Observable,所以可以继续使用Pipe

ob1:=Pipe(FromArray(...),Filter(...),...)
Pipe(ob1,Map(...),SwitchMap(...),...)

当然最后必须得有人订阅这个Observable

Subscribe(...)(observable)

这么设计的原因是golang是强类型语言,pipe无法兼容observer类型,除非有泛型。否则Subscriber就可以放到pipe函数参数末尾传入了。

下面我们回到标题说的链式编程的实现

链式编程实现

所谓链式编程,就是一个对象的方法返回值是对象自身,这样可以接着调用对象的其他方法,行程一个链条,Rx早期的实现都是这么做的。

最终我们可以如此调用:

rx.FromArray(...).Filter(...).Subscribe(...)

那么如何实现呢?

package rx
import (
    p "github.com/langhuihui/gorx/pipe"
)
type Observable struct {
    source p.Observable
}

我们所有的Observable和Deliver包括Subscriber以及Pipe函数等定义全部都在github.com/langhuihui/gorx/pipe这个包里面
那么我们在外层的rx包里面就定义上面这个Observable,名称是相同的,但在不同包里面。
在pipe包里面,Observable是一个函数,而在rx包里面Observable是一个结构体,目的是实现链式编程。这个结构体只有一个成员就是source,类型是pipe包里面的Observable。魔法就此展开了。

func FromArray(array []interface{}) *Observable {
    return &Observable{p.FromArray(array)}
}

当我们调用rx.FromArray(...)的时候,会返回一个rx.Observable 的对象指针,这个对象里面的source属性就是pipe包里面的FromArray函数调用后的Observable

当我们继续调用操作符Filter的时候,rx.FromArray(...).Filter(...),就会调用rx.Observable结构体的Filter方法,这时候我们只需要定义这个成员函数即可。

func (observable *Observable) Filter(f func(interface{}) bool) *Observable {
    return &Observable{p.Filter(f)(observable.source)}
}

其他操作符以此类推,我写了一个脚本用来生成一系列这个定义,省去手工抄写的重复劳动。
可以瞬间从源码生成一堆成员方法

//TakeUntil 
func (observable *Observable) TakeUntil(sSrc Observable, delivers ...p.Deliver) *Observable {
    return &Observable{p.TakeUntil(sSrc.source, delivers...)(observable.source)}
}

//TakeLast 
func (observable *Observable) TakeLast(count int) *Observable {
    return &Observable{p.TakeLast(count)(observable.source)}
}

//Skip 
func (observable *Observable) Skip(count int) *Observable {
    return &Observable{p.Skip(count)(observable.source)}
}

//SkipWhile 
func (observable *Observable) SkipWhile(f func(interface{}) bool) *Observable {
    return &Observable{p.SkipWhile(f)(observable.source)}
}

//SkipUntil 
func (observable *Observable) SkipUntil(sSrc Observable, delivers ...p.Deliver) *Observable {
    return &Observable{p.SkipUntil(sSrc.source, delivers...)(observable.source)}
}

链式编程就算大工告成了。下面就是愉快的Rx编程了。

import "github.com/langhuihui/gorx"
rx.Interval(1000).SkipUntil(rx.Of(1).Delay(3000)).Subscribe(func(x interface{}, dispose func()) {
        fmt.Print(x)
    }, nil, nil)

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:一个灰

查看原文:用Go语言实现ReactiveX(三)——链式编程

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

989 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传