pips/pip_prime.go
package pips
import (
"sync"
)
// PrimePip is the receiver for the prime-search pipeline stages defined in
// this package. It has no fields, so its zero value is ready to use.
type PrimePip struct{}
// NewPrimePip returns a pointer to a newly created, empty PrimePip.
func NewPrimePip() *PrimePip {
	return new(PrimePip)
}
// RepeatFn starts a goroutine that calls fn again and again and publishes
// every result on the returned unbuffered channel.
//
// The goroutine exits, closing the returned channel, as soon as a receive
// from done succeeds (done was closed or a value was sent on it).
func (primePip *PrimePip) RepeatFn(
	done <-chan interface{},
	fn func() interface{},
) <-chan interface{} {
	out := make(chan interface{})

	produce := func() {
		defer close(out)
		for {
			// fn runs before the select is entered, so a value is computed
			// on every iteration even if done ends up being chosen.
			next := fn()
			select {
			case out <- next:
			case <-done:
				return
			}
		}
	}
	go produce()

	return out
}
// Take forwards at most num values from valueStream to the returned
// unbuffered channel and then closes that channel.
//
// The stage stops early, still closing the returned channel, when a receive
// from done succeeds or when valueStream is closed before num values have
// arrived. A num of zero or less produces a stream that is closed without
// emitting anything.
func (primePip *PrimePip) Take(
	done <-chan interface{},
	valueStream <-chan interface{},
	num int,
) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ {
			// Receive as its own select case. Writing
			// `takeStream <- <-valueStream` would perform the receive
			// before the select arbitrates, which blocks regardless of
			// done and turns a closed upstream into a run of nil values.
			var value interface{}
			select {
			case <-done:
				return
			case v, ok := <-valueStream:
				if !ok {
					// Upstream is exhausted; nothing more to take.
					return
				}
				value = v
			}

			// Hand the value downstream, again giving up if cancelled.
			select {
			case <-done:
				return
			case takeStream <- value:
			}
		}
	}()
	return takeStream
}
// ToInt adapts a stream of interface{} values to a stream of int.
//
// Every received value is type-asserted to int; a value with any other
// dynamic type makes the stage's goroutine panic. The returned unbuffered
// channel is closed when valueStream is closed, or when a receive from done
// succeeds while a converted value is waiting to be sent.
func (primePip *PrimePip) ToInt(
	done <-chan interface{},
	valueStream <-chan interface{},
) <-chan int {
	ints := make(chan int)

	go func() {
		defer close(ints)
		for {
			raw, open := <-valueStream
			if !open {
				return
			}
			// The assertion happens before done is consulted.
			n := raw.(int)
			select {
			case ints <- n:
			case <-done:
				return
			}
		}
	}()

	return ints
}
// PrimeFinder reads integers from intStream and emits, on the returned
// unbuffered channel, those that are prime. Emitted values have dynamic
// type int.
//
// Each received value is tested as-is, and values below 2 (0, 1 and
// negatives) are never reported as prime. The returned channel is closed
// when intStream is closed, or when a receive from done succeeds while a
// prime is waiting to be sent.
func (primePip *PrimePip) PrimeFinder(
	done <-chan interface{},
	intStream <-chan int,
) <-chan interface{} {
	primeStream := make(chan interface{})
	go func() {
		defer close(primeStream)
		for integer := range intStream {
			// By definition primes start at 2; without this guard the
			// divisor loop below never runs and 0, 1 and negative numbers
			// would fall through as "prime".
			prime := integer >= 2

			// Plain trial division over every candidate from integer-1
			// down to 2.
			// NOTE(review): left linear on purpose, presumably as the
			// costly stage that callers fan out; confirm before
			// tightening the bound to sqrt(integer).
			if prime {
				for divisor := integer - 1; divisor > 1; divisor-- {
					if integer%divisor == 0 {
						prime = false
						break
					}
				}
			}
			if !prime {
				continue
			}

			select {
			case <-done:
				return
			case primeStream <- integer:
			}
		}
	}()
	return primeStream
}
// FanIn merges any number of input channels into the single unbuffered
// channel it returns.
//
// One goroutine per input forwards that input's values. A forwarder ends
// when its input is closed, or when a receive from done succeeds while it
// has a value waiting to be sent. The returned channel is closed after all
// forwarders have ended; with no inputs it is closed without emitting
// anything. Values from different inputs are interleaved in no particular
// order.
func (primePip *PrimePip) FanIn(
	done <-chan interface{},
	channels ...<-chan interface{},
) <-chan interface{} {
	merged := make(chan interface{})
	var pending sync.WaitGroup

	for _, src := range channels {
		pending.Add(1)
		go func(in <-chan interface{}) {
			defer pending.Done()
			for {
				v, open := <-in
				if !open {
					return
				}
				select {
				case merged <- v:
				case <-done:
					return
				}
			}
		}(src)
	}

	// Close the merged stream only after every forwarder is finished, so
	// nobody can send on a closed channel.
	go func() {
		defer close(merged)
		pending.Wait()
	}()

	return merged
}
fanin2.go
// fanin2
package main
import (
"fanin2/pips"
"fmt"
"math/rand"
"runtime"
"time"
)
// main wires up the prime-search pipeline and prints the first ten primes
// it produces, followed by the elapsed time.
//
// Random integers below 50000000 are generated continuously, converted to
// int, and handed to one PrimeFinder stage per CPU, all reading from the
// same input stream. The finders' outputs are merged with FanIn and Take
// limits the result to ten values. Closing done on return tells every stage
// to stop.
func main() {
	done := make(chan interface{})
	defer close(done)

	start := time.Now()

	// Named so that it does not shadow the math/rand package.
	randomInt := func() interface{} {
		return rand.Intn(50000000)
	}

	primeP := pips.NewPrimePip()
	randIntStream := primeP.ToInt(done, primeP.RepeatFn(done, randomInt))

	numFinders := runtime.NumCPU()
	fmt.Printf("Spinning up %d prime Finders \n", numFinders)
	finders := make([]<-chan interface{}, numFinders)
	fmt.Println("Primes:")
	for i := range finders {
		finders[i] = primeP.PrimeFinder(done, randIntStream)
	}

	merged := primeP.FanIn(done, finders...)
	firstTen := primeP.Take(done, merged, 10)
	for prime := range firstTen {
		fmt.Printf("\t%d \n", prime)
	}

	fmt.Printf("Search Took: %v \n", time.Since(start))
}
程序输出如下。可知相比于不使用扇出/扇入的写法,运行耗时从 25s 降至 5s,减少了五分之四。
有疑问加站长微信联系(非本文作者)