Go 1.18发布了!基于 Go 1.18 Generics (泛型) 进行流式处理

xyctruth · 2022-03-14 14:44:29 · 2237 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2022-03-14 14:44:29 的主题,其中的信息可能已经有所发展或是发生改变。

Stream 是一个基于 Go 1.18+ 泛型的流式处理库, 它支持并行处理流中的数据. 并行流会将元素平均划分多个的分区, 并创建相同数量的 goroutine 执行, 并且会保证处理完成后流中元素保持原始顺序.

GitHub - xyctruth/stream: A Stream library based on Go 1.18+ Generics (Support Parallel Stream)

安装

需要安装 Go 1.18+ 版本

$ go get github.com/xyctruth/stream

在代码中导入它

import "github.com/xyctruth/stream"

基础

s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
    Filter(func(s string) bool { return s != "b" }).
    Map(func(s string) string { return "class_" + s }).
    Sort().
    Distinct().
    ToSlice()

// 需要转换切片元素的类型
s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}).
    Filter(func(v int) bool { return v >3 }).
    Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }).
    Reduce(func(r string, v string) string { return r + v })

类型约束

any 接受任何类型的元素, 所以不能使用 == != > < 比较元素, 导致你不能使用 Sort(), Find()...等函数 ,但是你可以使用 SortFunc(fn), FindFunc(fn)... 代替

type SliceStream[Elem any] struct {
    slice      []Elem
}

stream.NewSlice([]int{1, 2, 3, 7, 1})

comparable 接收的类型可以使用 == != 比较元素, 但仍然不能使用 > < 比较元素, 因此你不能使用 Sort(), Min()...等函数 ,但是你可以使用 SortFunc(fn), MinFunc()... 代替

type SliceComparableStream[Elem comparable] struct {
    SliceStream[Elem]
}

stream.NewSliceByComparable([]int{1, 2, 3, 7, 1})

constraints.Ordered 接收的类型可以使用 == != > <, 所以可以使用所有的函数

type SliceOrderedStream[Elem constraints.Ordered] struct {
    SliceComparableStream[Elem]
}

stream.NewSliceByOrdered([]int{1, 2, 3, 7, 1})

类型转换

有些时候我们需要使用 Map ,Reduce 转换切片元素的类型,但是很遗憾目前 Golang 并不支持结构体的方法有额外的类型参数,所有类型参数必须在结构体中声明。在 Golang 支持之前我们暂时使用临时方案解决这个问题。

type SliceMappingStream[Elem any, MapElem any, ReduceElem any] struct {
    SliceStream[Elem]
}

s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}).
    Filter(func(v int) bool { return v >3 }).
    Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }).
    Reduce(func(r string, v string) string { return r + v })

并行

Parallel 函数接收一个 goroutines int 参数. 如果 goroutines>1 则开启并行, 否则关闭并行, 默认流是关闭并行的。

并行会将流中的元素平均划分多个的分区, 并创建相同数量的 goroutine 执行, 并且会保证处理完成后流中元素保持原始顺序.

s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
    Parallel(10).
    Filter(func(s string) bool {
      // 一些耗时操作
      return s != "b"
    }).
    Map(func(s string) string {
      // 一些耗时操作
      return "class_" + s
    }).
    ForEach(
      func(index int, s string) {
      // 一些耗时操作
      }).
    ToSlice()

并行类型

  • First: 一旦获得第一个返回值,并行处理就结束. For: AllMatch, AnyMatch, FindFunc
  • ALL: 所有元素都需要并行处理,得到所有返回值,然后并行结束. For: Map, Filter
  • Action: 所有元素需要并行处理,不需要返回值. For: ForEach, Action

并行 goroutines

开启并行 goroutine 数量在面对 CPU 操作与 IO 操作有着不同的选择。 一般面对 CPU 操作时 goroutine 数量不需要设置大于 CPU 核心数,而 IO 操作时 goroutine 数量可以设置远远大于 CPU 核心数.

CPU 操作

BenchmarkParallelByCPU

NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
    sort.Ints(newArray(1000)) //  模拟 CPU 耗时操作
})

使用6个cpu核心进行基准测试

go test -run=^$ -benchtime=5s -cpu=6  -bench=^BenchmarkParallelByCPU

goos: darwin
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByCPU/no_Parallel(0)-6                  710       8265106 ns/op
BenchmarkParallelByCPU/goroutines(2)-6                  1387       4333929 ns/op
BenchmarkParallelByCPU/goroutines(4)-6                  2540       2361783 ns/op
BenchmarkParallelByCPU/goroutines(6)-6                  3024       2100158 ns/op
BenchmarkParallelByCPU/goroutines(8)-6                  2347       2531435 ns/op
BenchmarkParallelByCPU/goroutines(10)-6                 2622       2306752 ns/op

IO 操作

BenchmarkParallelByIO

NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
    time.Sleep(time.Millisecond) // 模拟 IO 耗时操作
})

使用6个cpu核心进行基准测试

go test -run=^$ -benchtime=5s -cpu=6  -bench=^BenchmarkParallelByIO

goos: darwin
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByIO/no_parallel(0)-6                    52     102023558 ns/op
BenchmarkParallelByIO/goroutines(2)-6                    100      55807303 ns/op
BenchmarkParallelByIO/goroutines(4)-6                    214      27868725 ns/op
BenchmarkParallelByIO/goroutines(6)-6                    315      18925789 ns/op
BenchmarkParallelByIO/goroutines(8)-6                    411      14439700 ns/op
BenchmarkParallelByIO/goroutines(10)-6                   537      11164758 ns/op
BenchmarkParallelByIO/goroutines(50)-6                  2629       2310602 ns/op
BenchmarkParallelByIO/goroutines(100)-6                 5094       1221887 ns/op

项目地址

https://github.com/xyctruth/stream


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2237 次点击  
加入收藏 微博
6 回复  |  直到 2022-03-19 11:30:44
zzustu
zzustu · #1 · 3年之前

这种项目一般在泛型发布初期练手时自己写写玩玩,等泛型普及开后,估计官方会有在 golang.com/x 里面加入类似的库。代码中对泛型的声明太冗长了: Elem Result ,可以改成 E R K V T

xyctruth
xyctruth · #2 · 3年之前
zzustuzzustu #1 回复

这种项目一般在泛型发布初期练手时自己写写玩玩,等泛型普及开后,估计官方会有在 golang.com/x 里面加入类似的库。代码中对泛型的声明太冗长了: `Elem` `Result` ,可以改成 `E` `R` `K` `V` `T` 等

golang.org/x/exp/slices包中已有切片泛型相关函数。

“代码中对泛型的声明太冗长了” ,对于这个问题最初其实很自然也是想用 E,R 等缩写。但是直到我看到 golang.org/x/exp/slices 包中的一些方法:https://github.com/golang/exp/blob/master/slices/sort.go 没使用缩写。

golang.org/x/exp/slices 片段代码

// Sort sorts a slice of any ordered type in ascending order.
func Sort[Elem constraints.Ordered](x []Elem) {
    n := len(x)
    quickSortOrdered(x, 0, n, maxDepth(n))
}

// Sort sorts the slice x in ascending order as determined by the less function.
// This sort is not guaranteed to be stable.
func SortFunc[Elem any](x []Elem, less func(a, b Elem) bool) {
    n := len(x)
    quickSortLessFunc(x, 0, n, maxDepth(n), less)
}
xyctruth
xyctruth · #3 · 3年之前
zzustuzzustu #1 回复

这种项目一般在泛型发布初期练手时自己写写玩玩,等泛型普及开后,估计官方会有在 golang.com/x 里面加入类似的库。代码中对泛型的声明太冗长了: `Elem` `Result` ,可以改成 `E` `R` `K` `V` `T` 等

就在刚刚5分钟前,golang.org/x/exp/slices 包更新 slices: consistently use E rather than Elem 😂

alex_my
alex_my · #4 · 3年之前

你们动手真快,刚还看到一个路由的。

xyctruth
xyctruth · #5 · 3年之前

我是打开代码看到 Elem, 在刷新就没了,就是这么巧。我以为我眼花了。

panjunjie
panjunjie · #6 · 3年之前

不得不承认,泛型加入,Go 变复杂了很多!

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传