聊聊kotlin.coroutines【java协程】(1)

null_zhou · · 1518 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

这个夏天java最有竞争力的语言。关于它的语法糖在这就不一一阐述了,毕竟它能甜死你。
先说说什么是协程吧,用户态的子线程,轻量级,进程->线程->协程。

进程、线程、协程的关系和区别:
进程拥有自己独立的堆和栈,既不共享堆,亦不共享栈,进程由操作系统调度。
线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,线程亦由操作系统调度(标准线程是的)。
协程和线程一样共享堆,不共享栈,协程由程序员在协程的代码里显示调度。

协程的好处如下:
1.减少cpu线程上下文切换的开销
2.降低了内存消耗;
3.提高了cpu缓存命中率;
4.整体上提高了性能;
5.不提高硬件的前提下,提升了系统的负载能力。

只需要极少的栈内存(大概是4~5KB),默认情况下,线程栈的大小为1MB,一个线程可以开启数十万的协程,线程占用的内存开销远比协程要大得多。
golang原生就实现了协程,由runtime自行管理,一个go关键字就能开启goroutine。简直完美,但是今天要讲的不是golang。

总之,协程就是便宜,廉价,高效的代名词。

java里面要拥护这种高性能的协程,要通过第三方包来实现quasarcomsatkilim

上面这三位,就是目前所有java里面能快速实现coroutines的jar。
quasar:通过织入java字节码的方式,改变字节码结果,来使用,javaagent的方式
comsat:quasar的包装版本,提供轻量化的包装能快速使用。
kilim:和quasar一样,也要织入字节码来使用

但都有一个问题,必须预先给到注解,以上都能通过编译,但是到了linux环境,需要通过javaagent,因字节码被改写,无法追踪具体问题。协程管理是个大问题,会被线程kill,无故消失,笔者通过大半个月的实验,发现它们无法通过大部分环境,因而放弃。

kotlin.corouties

kotlin.corouties真是个非常好的api。语法简化,可以和golang的go关键字有得一拼。但在目前的kotlin api中是实验性质,不过已经具备上生产环境的能力,预计会在1.1.5中正式发布。因kotlin和java可以混编,所以coroutines是个下个高并发必备的知识点了。

kotlin.corouties调度器

CommonPool 调度器默认是通过fork/join的方式实现,目前还不提供接口,做自定义实现
launch(CommonPool)

Represents common pool of shared threads as coroutine dispatcher for compute-intensive tasks.
It uses[java.util.concurrent.ForkJoinPool]when available, which implements efficient work-stealing algorithm for its queues, so every coroutine resumption is dispatched as a separate task even when it already executes inside the pool.When available, it wraps ForkJoinPool.commonPool and provides a similar shared pool where not.

也就是说,kotlin的协程是并行调度的,关于fork/join也可以单独开一章讲了,暂不表。

Unconfined 调度器,默认是主线程调度 ,无限制启动协程,一旦协程睡了或者挂了,会启动新的协程

launch(Unconfined)

A coroutine dispatcher that is not confined to any specific thread.
It executes initial continuation of the coroutine right here in the current call-frame
and let the coroutine resume in whatever thread that is used by the corresponding suspending function, without
mandating any specific threading policy.

Note, that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption,
but still want to execute it in the current call-frame until its first suspension, then you can use
an optional [CoroutineStart] parameter in coroutine builders like [launch] and [async] setting it to the
the value of [CoroutineStart.UNDISPATCHED].

ThreadPoolDispatcher.newSingleThreadContext调度器,单个线程的调度器

launch(newSingleThreadContext("MyOwnThread"))

Creates new coroutine execution context with the a single thread and built-in [yield] and [delay] support.
All continuations are dispatched immediately when invoked inside the thread of this context.
Resources of this pool (its thread) are reclaimed when job of this context is cancelled.
The specified [name] defines the name of the new thread.
An optional [parent] job may be specified upon creation.

launch(newFixedThreadPoolContext(100,"MyOwnThread")) 调度器,指定线程数量的调度器

Creates new coroutine execution context with the fixed-size thread-pool and built-in [yield] and [delay] support.
All continuations are dispatched immediately when invoked inside the threads of this context.
Resources of this pool (its threads) are reclaimed when job of this context is cancelled.
The specified [name] defines the names of the threads.
An optional [parent] job may be specified upon creation.

默认请全部使用launch(CommonPool),有特殊的限制问题,再考虑其他的调度器

launch(CommonPool) 异步协程开启

async(CommonPool) 同步协程开启

官方示例的Hello,World!,欢迎进入kotlin协程的世界

fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

kotlin中的sleep将不能暂停协程了,是个大坑,后面会讲到。

launch 启动协程,默认情况下直接开始执行,也可以显式执行

var job= launch(CommonPool) 
  if(job.isActive){
          job.cancel()
       }else{
            job.start()
     }

job任务可以根据需要什么时候开始执行,是否存活,取消等,提供了一系列api
有个小事,kotlin去掉了; 估计这个又可以引发一波大战

CommonPool 调度器
delay将会暂停1秒协程运行,
printlin是kotlin的打印方法,等同于System.out.printlin
Thread.sleep 这句只能暂停启动协程的线程

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}

runBlocking<Unit> 启动一个非阻塞并且无返回值的任务
job.join() 等待协程任务完成

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
            doWorld() 
    }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

这个讲suspend 关键字,为的是代码分离,不然就只能在 launch(CommonPool){}内部用delay来睡协程了,去掉了suspend是无法在其他方法调用delay睡协程了,直接编译错误。

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}

这个例子比较搞,启动100K的协程,如果你像作者一样,2G内存的渣机可能直接out-of-memory error,像笔者这样的8G大内存,是没有一点问题的。轻松愉快500ms执行完毕。

这个例子也是为了展示协程的轻量级和强悍,线程别说100K,就算10K,你的CPU和内存分分钟炸了,只能重启。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}

delay的例子太多,单独讲一个。启动了一个协程任务去计算当前的时间,然后你会发现协程内置了一个isActive属性,这也是线程内部唯三的三大内置属性之一。其他的两个为context和coroutineContext,不过context已经被放弃了,大概是作者觉得context,词不达意吧,从这点也可以发现kotlin不会随意的删除api,而是通过重命名,重载的方式提供新的。

isActive:如果协程处于存活或任务未完成,状态就返回true,如果取消或已完成,则返回false

例子的意思也很明显告诉你如果任务在delay时间内未被cancel则一直计算下去并打印三次I'm sleeping,然后任务被cancel,协程取消。主线程输出main: Now I can quit

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}

这个例子讲的不可取消, run(NonCancellable)+finally=绝对执行的代码
run(NonCancellable)协程内部启动一个新的协程,并且不能取消,霸道总裁般的代码
run...{}内可以使用coroutineContext,跟上一级的协程块代码做交互。

fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}

repeat(1000) :迭代器,输入要迭代的次数:1000次
withTimeout(1300L) 时间1.3秒。
这里讲这个wiathTimeout主要是为了控制协程的超时时间,避免协程,一直在活动。虽然便宜,不代表能让任务一直执行下去,到了超时的时间会直接抛出异常

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

三个例子
measureTimeMillis :返回代码块的执行耗时,比起java,话就是少,就是这么屌

CoroutineStart:协程的执行模式(async和launch都可以用)

LAZY
懒加载

DEFAULT 默认的模式
默认 - 根据其上下文立即执行。

ATOMIC
根据其上下文原则(不可取消)计划协调执行。
跟[DEFAULT]类似,但协程在开始执行前无法取消。

UNDISPATCHED
未分派:暂不明白用途

println("The answer is ${one.await() + two.await()}")
kotlin执行计算可在字符串中一起计算
.await实际拿到的是协程返回的值,在例子中也就是13和29


suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 20
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 20
}


// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool)  {
    doSomethingUsefulTwo()
}


// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}

runBlocking{}是个同步非阻塞的代码块执行器,能统一拿到coroutines的返回值,支持泛型和接受返回参,多个或单个协程一旦启动后我们要拿返回值不仅可以用await,也可以用runBlocking

      var result= runBlocking<Int> {
            var resultint = one.await() + two.await()
            println("The answer is resultint="+resultint)
            //基本类型直接这样写就可以
            resultint
        }
     println(result)

============================================================================

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println("      'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println("          'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

介绍了多个调度器

launch(Unconfined) 
launch(coroutineContext):这个调度器只有在runBlocking内部才能用,严格来说不算调度器,内部协程的下上文中,继续启动协程

launch(CommonPool) 
launch(newSingleThreadContext("MyOwnThread")) 

具体解释看开篇的说明

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("      'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
这一句将会在新的协程中打印出来,因为协程本身被delay了

private val log = LoggerFactory.getLogger(X::class.java)

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(coroutineContext) {
        log.info("I'm computing a piece of the answer")
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(coroutineContext) {
        log.info("I'm computing another piece of the answer")
        log("I'm computing a piece of the answer")
        7
    }
    log.info("The answer is ${a.await() * b.await()}")
}

这里要讲的是日志:如果你是lombok的使用者,那么很遗憾,lombox现在暂不支持在kotlin使用@Slf4j或者@Log4j

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
这一句是官方示例给的,最好别用

private val log = LoggerFactory.getLogger(X::class.java)

跟以前一样用LoggerFactory拿就好了

fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${coroutineContext.get(Job.Key)}")
    println("My job is ${coroutineContext.get(Job)}")
    println("My job is ${coroutineContext[Job]}")
}

runBlocking<Unit> 这个老伙计了,老伙计本身其实也是coroutines启动的没想到吧,惊不惊喜,意不意外。这种设计就跟golang一样,有个统一的runtime管理器,但这里是显式的。
它被设计出来最大的原因就是阻塞执行了,在它内部可以启动多个async协程,然后共同计算出一个复杂的对象,然后统一返回给runBlocking,外部就可以直接接收

今天就聊到这,delay一下。欢迎加群交流关于kotlin.coroutines

QQ群:641163584

下一章,应该是下周

转载请联系我本人授权


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:null_zhou

查看原文:聊聊kotlin.coroutines【java协程】(1)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:1006366459

1518 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传