书接上回,我们一起体验了promise模式,也了解到了其解决什么场景下的问题。
本篇文章的目的之一即回答好两个问题:
- reactor模式解决什么场景下的问题
- reactor解决问题的场景与promise模式有哪些不同,有哪些重叠
另外在概念层面,本篇文章希望能够解释清楚reactor领域一些常见的概念
在实践层面,带读者体验一下reactor模式的写法,抛砖引玉
案例实现
还是promise文章里的案例:基于计算器服务,实现一个接口,接口实现计算 a + ((b -c)+ d) -e -f + g
本文实现依然是基于vertx和kotlin来做,只是异步结果编排部分替换为reactor实现:
class ReactorVerticle : AbstractVerticle(){
lateinit var webClient: WebClient
override fun start() {
webClient = WebClient.create(vertx)
var eventBus = vertx.eventBus()
eventBus.consumer<JsonObject>("calc.reactor"){ msg ->
var msgBody = msg.body()
var a = msgBody.getInteger("a", 0)
var b = msgBody.getInteger("b", 0)
var c = msgBody.getInteger("c", 0)
var d = msgBody.getInteger("d", 0)
var e = msgBody.getInteger("e", 0)
var f = msgBody.getInteger("f", 0)
var g = msgBody.getInteger("g", 0)
(b asyncSub c)
.flatMap { it asyncAdd d }
.flatMap { it asyncAdd a }
.flatMap { it asyncSub e }
.flatMap { it asyncSub f }
.flatMap { it asyncAdd g }
.doOnError { msg.fail(500, it.message) }
.subscribe { msg.reply(it.toString()) }
}
}
infix fun Int.asyncAdd(input : Int) : Mono<Int> {
return calc(this, input, CalcOperator.add)
}
infix fun Int.asyncSub(input : Int) : Mono<Int> {
return calc(this, input, CalcOperator.sub)
}
/**
* 所有异常必须被处理
*/
fun calc(a: Int, b: Int, operator: CalcOperator) : Mono<Int> {
return Mono.create { sink ->
webClient.get(7777, "pi", "/${operator.name}?a=$a&b=$b")
.expect(ResponsePredicate.SC_OK).send{
if (it.succeeded()) {
try{
var addResult = it.result().bodyAsString().toInt()
sink.success(addResult)
println("reactor calc: $a - $b = $addResult")
} catch (e: Exception) {
sink.error(e)
}
} else {
sink.error(it.cause())
}
}
}
}
}
与promise模式实现的代码风格很像,在当前案例这种场景下可以说reactor模式可以做到和promise模式一样的效果。
但是reactor不仅仅解决promise的场景。
案例变种一
实现计算 a + ((b -c)+ d) -e -f + g
,当 b-c > 5 时取b-c的结果,否则以6作为b-c的结果
以reactor模式实现代码如下:
(b asyncSub c)
.filter{ it > 5 }
.switchIfEmpty(Mono.just(6))
.flatMap { it asyncAdd d }
.flatMap { it asyncAdd a }
.flatMap { it asyncSub e }
.flatMap { it asyncSub f }
.flatMap { it asyncAdd g }
.doOnError { msg.fail(500, it.message) }
.subscribe { msg.reply(it.toString()) }
当然以promise模式来做的话,代码优化一下,应该也不会太丑,比如:
(b asyncSub c).thenCompose {
var promise = CompletableFuture<Int>()
if(it > 5){
promise.complete(it)
} else {
promise.complete(6)
}
promise
}
.thenCompose { it asyncAdd d }
.thenCompose { it asyncAdd a }
.thenCompose { it asyncSub e }
.thenCompose { it asyncSub f }
.thenCompose { it asyncAdd g }
.thenAccept { msg.reply(it.toString()) }
.exceptionally {
msg.fail(500, it.message)
null
}
但是上面用到的filter和switchIfEmpty组合只是reactor里的两个操作符号而已,reactor里还有很多很丰富的各种操作符。
最初的问题
下面来解答最开始的两个问题
- reactor模式解决什么场景下的问题
- reactor解决问题的场景与promise模式有哪些不同,有哪些重叠
reactor可以解决promise场景下的问题,而且解决方案更加优雅,并形成标准;有很多现成的轮子(各种操作符)拿来即用。但reactor不只是为了解决promise面对的问题的,他解决的问题笔者归纳如下(个人观点,欢迎讨论):
- reactor是一种编程模式,与面向对象编程是一个级别的,他的存在是为了解决具体的某一类问题,但又不是解决特定问题的(能简单说一下面向对象编程解决什么问题吗?);简单来说reactor模式关注的是对数据的处理,把一坨数据通过各种操作符号转换为另一坨数据;数据在处理过程中又支持各种异步处理,支持对异步处理的结果进行编排,同时编程风格上推崇链式,所以看上去代码干净一些
- 上面一点里的
支持对异步结果的编排
顺手解决异步编程模式下,的回调地狱问题(与promise一样)
说法有点绕,reactor和promise本身是两个东西,promise顶多算一种设计模式,而reactor是一种编程风格;因为本系列文章从异步编程角度谈起,所以故事一路讲来把reactor和promise进行了一轮比较。
下面顺便引用两处权威文档:
asynchronous stream processing with non-blocking back pressure
-- https://www.reactive-streams.org/
Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).
-- https://en.wikipedia.org/wiki/Reactive_programming
reactor核心概念串讲
下面用两张图来讲一下reactor模式;第一张图,两个工人,左边的是publisher
、右边的是subscriber
,中间是一个流水线, 流水线上有四道工序:
- 变蓝色
- 变三角形
- 过滤尺寸太大的长方形
-
只要前九个
那么经过如上的流水线,最终结果会是什么样子呢?
其中流水线中每一道工序都可以是异步的,那么流水线相当于对异步处理的结果按照预先设定好的逻辑进行了编排。
下面把图中的角色对应到reactor模式里去: - publisher: publisher
- subscriber: subscriber
- operator: 各个工序都属于operator
reactive stream
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure
-- https://www.reactive-streams.org/
reactor stream 定义了一套 接口, project reactor 实现了这套接口并进行了一些拓展
所以基于reactor开发的代码,各种方法返回的Mono
或者Flux
都是属于publisher,publisher会被调用者subscribe到具体的subscriber,这个过程就是搭建流水线的过程。如果有0到1个货物要处理就用Mono;如果有0到n个货物要处理就用Flux。
这里只大概讲一下概念,更深入的内容建议熟读五遍 projectreactor文档,这个文档里会介绍为什么要用reactor,如何使用,如何选择操作符等各种问题。
reactor模式与http服务
java生态这么多年一路走来,做web服务,最原始大部分基于servlet模型,都是阻塞式io来做,与异步编程扯不上关系。随着时代的发展,对性能要求越来越高,高并发的web服务似乎是java的一个软肋。但基于异步编程,也衍生出了一些解决方案,可以与reactor模式一起玩的,列举常用的三种:
- spring webflux (听这名字 web的flux!)
- vertx 上面的示例代码就是基于vertx来写的
- reactor netty,这个笔者并没有体验过
上面的三个每一个都可以单独列一个话题谈好多,暂不展开
缺点
reactor模式虽然看起来狂拽酷炫,漫不经心就把回调地狱
给解决了,但是在复杂业务场景下,比如各种...if...else...
嵌套的场景,如果想要代码保持清晰性、可读性,是非常考验设计功底和编码功底的,当然对笔者来说这一点亦是异步编程的魅力所在。
对程序员个人来讲,多接触一些思想和模式,有助于开阔思路,融会贯通;
但是对于公司来讲,做项目讲究性价比,采用入门门槛高的编程方式,就需要招能力强的程序员,反之只需要招一些刚刚毕业的实习生即可。
异步编程,我们从回调、promise聊到reactor,有没有其他可选方案呢?
协程,支持以同步的方式写异步的代码
笔者认为这也是最近这几年golang火起来的一个主要原因。
另外jvm体系kotlin
也支持协程,与golang的协程在玩法上大不相同。
下一篇文章,我们来一起聊一聊协程。
参考文章
知乎上关于函数式编程的讨论
ReactiveX 文档翻译
projectreactor文档
有疑问加站长微信联系(非本文作者)