1.进程、线程、协程区别
a.各自特点
参考《详细介绍 进程、线程和协程的区别》
- 进程:拥有自己独立的堆和栈,既不共享堆,也不共享栈,进程由操作系统调度;
- 线程:拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度;
- 协程:拥有自己独立的栈和共享的堆,共享堆,不共享栈,协程由程序员在协程的代码里显式调度。
协程与线程:
每个单位时间内,一个CPU核心只能执行一个线程(操作系统层面的thread),线程是CPU调度的基本单位,底层资源占用中等(比进程少)。单个线程内的代码是同步阻塞的(依代码编写顺序依次执行);线程之间的切换由操作系统抢占式调度,代码本身不需要主动让出CPU。开发上比较清晰明了。
协程是“用户级”的线程:把一个线程上的工作分段运行,通过主动暂停、主动恢复在各个逻辑点之间切换。针对I/O请求,可以把建立连接、等待对方处理等中间环节的等待时间让给其他协程,一个线程上可以跑多个协程。协程中的程序执行是触发、跳转式的,异步非阻塞的(事件触发);协程之间是协作式(非抢占式)调度,由协程在等待响应时主动让出执行权。开发上很复杂。
channel(信道)是go用于在goroutine之间传递数据的机制,下面通过channel的例子观察线程与协程的使用情况
b.上代码一:
使用一个无缓存channel时:
package main
import (
"fmt"
"time"
)
// waitc is the unbuffered channel on which every routine hands its id to
// main; a send on it blocks until main performs the matching receive.
var waitc = make(chan int)

// routine waits 200 microseconds, then sends id on waitc. It prints one line
// just before the send and one just after it, so the output shows how long
// the goroutine stayed blocked waiting for a receiver.
func routine(id int) {
	const pause = 200 * time.Microsecond

	time.Sleep(pause)
	fmt.Println("this is routine", id, "before.")
	waitc <- id
	fmt.Println("this is routine", id, "after.")
}
// main starts five routine goroutines (ids 0, 1, 4, 9, 16) and then performs
// five receives on waitc, printing a line before and after each receive.
func main() {
	const rounds = 5

	for n := 0; n < rounds; n++ {
		go routine(n * n)
	}

	for n := 0; n < rounds; n++ {
		fmt.Println("--this is main routine", n, "before.")
		<-waitc
		fmt.Println("--this is main routine", n, "after.")
	}

	// Brief grace period so routines released by the last receives get a
	// chance to print their "after" line before the program exits. This is
	// best-effort only: nothing waits for them explicitly.
	time.Sleep(200 * time.Microsecond)
}
/*
--this is main routine 0 before.
this is routine 1 before.
this is routine 1 after.
this is routine 9 before.
this is routine 16 before.
this is routine 4 before.
this is routine 0 before.
--this is main routine 0 after.
--this is main routine 1 before.
--this is main routine 1 after.
--this is main routine 2 before.
--this is main routine 2 after.
--this is main routine 3 before.
--this is main routine 3 after.
--this is main routine 4 before.
--this is main routine 4 after.
this is routine 4 after.
this is routine 9 after.
this is routine 16 after.
原文地址 https://blog.csdn.net/kjfcpua/article/details/18265441
-----------------------------------------------------------------------
解析:
发送方遇到信道阻塞时会被挂起,调度器转去执行其他goroutine;主循环每接收一次,就有一个被阻塞的发送方得以继续执行,如此循环,直到全部完成
原理上与 yield中断相同
*/
百度查找关于go的多线程,写法也跟协程没有明显区别。参照上面的特点来看,线程部分:go func(){}()另起一个线程,变量继承自当前父进程/主线程,运行空间为{},内部顺序执行,遇到数据流时会阻塞;协程部分:对线程添加异步代码,实现事件驱动的执行状态切换,运行空间为当前线程,由数据流(输出、输入)驱动而不阻塞。
参考 上官二狗《Go 缓冲 channel 和 非缓冲 channel 的区别》
c、代码二:
使用一个缓存channel + 一个无缓存channel时:
package main
import (
"fmt"
"math/rand"
"strconv"
"time"
)
// array2j accumulates what main receives from its two channels.
type array2j struct {
	// a holds, in arrival order, every value received from the buffered channel.
	a []string
	// b holds the most recent value received from the unbuffered channel.
	b string
}
// main demonstrates a buffered channel (capacity 3) that has more senders
// than slots, next to an unbuffered channel that is only ever read.
//
// Seven goroutines send on ch: five send a random payload after a short
// random delay, and two send the literal "c2" after one second. Nothing in
// this program sends on c2, so its select case never fires and queue.b stays
// empty.
//
// The receive loop runs exactly once per sender. The previous version
// stopped when len(ch)+len(c2) was zero and did an extra bare receive on ch
// in every iteration; that extra receive silently kept values out of queue
// and made the loop block forever when the number of messages was even.
func main() {
	const (
		randomSenders = 5 // goroutines sending "<i>_ch_<random>" on ch
		lateSenders   = 2 // goroutines sending "c2" on ch after one second
	)

	ch := make(chan string, 3)
	c2 := make(chan string)
	var queue array2j

	for i := 1; i <= randomSenders; i++ {
		go func(i int) {
			time.Sleep(time.Duration(rand.Intn(100)) * time.Microsecond)
			fmt.Println("go func:" + strconv.Itoa(i))
			ch <- strconv.Itoa(i) + "_ch_" + strconv.Itoa(rand.Int())
		}(i)
	}
	for j := 1; j <= lateSenders; j++ {
		go func() {
			time.Sleep(1 * time.Second)
			ch <- "c2"
		}()
	}

	// Give the senders a head start so the buffer visibly fills up and the
	// remaining senders block before the first receive. Correctness does not
	// depend on this pause.
	time.Sleep(1 * time.Second)

	// One receive per sender: every value ends up in queue and the loop
	// terminates without inspecting channel lengths.
	for received := 0; received < randomSenders+lateSenders; received++ {
		select {
		case a, ok := <-ch:
			fmt.Println(a, ok)
			queue.a = append(queue.a, a)
		case b, ok := <-c2:
			fmt.Println(b, ok)
			queue.b = b
		}
	}
	fmt.Println("queue", queue)
	fmt.Println("hello go!")
}
/**
go func:1
go func:4
go func:2
go func:3
go func:5
1_ch_5577006791947779410 true
4_ch_8674665223082153551 true
2_ch_6129484611666145821 true
3_ch_4037200794235010051 true
5_ch_3916589616287113937 true
c2 true
c2 true
queue {[1_ch_5577006791947779410 4_ch_8674665223082153551 2_ch_6129484611666145821 3_ch_4037200794235010051 5_ch_3916589616287113937 c2 c2] }
hello go!
*/
这里信道ch的缓冲容量是3,有5个goroutine写入随机值(另有2个goroutine延迟写入"c2"),对应共有7个输出,运行输出正常。说明:带缓冲的信道在写满时发送方会自动阻塞,当其中的数值被读出后,又可以继续写入。
2.应用测试
a、数据库的批量写入
实际操作只有看到线程和异步, 协程是线程的一个异步表现。
准备测试环境,使用php进行建表、生成10w测试数据sql.data的准备略。
package main
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
"fmt"
"io"
"os"
"path"
"strconv"
_ "strings"
"bufio"
_ "fmt"
_ "io"
_ "io/ioutil"
"time"
)
// ReadFile reads the file at filePath line by line and passes each line,
// without its line terminator, to handle.
//
// When reading stops (end of file or a read error) handle is called one last
// time with the empty string; callers use that call as the end-of-input
// signal. statistic.readLine is incremented once per handle call.
//
// A line longer than the reader's buffer is reassembled and delivered as a
// single line instead of being passed on in buffer-sized fragments.
//
// It returns the error from os.Open if the file cannot be opened (handle is
// not called in that case), nil when the end of the file is reached, and the
// read error otherwise.
func ReadFile(filePath string, handle func(string)) error {
	f, err := os.Open(filePath)
	if err != nil {
		return err
	}
	// Registered only after a successful open, so Close never runs on a nil file.
	defer f.Close()

	// 255*100 bytes: with the default 4K buffer, longer lines were cut off in testing.
	buf := bufio.NewReaderSize(f, 25500)

	emit := func(line string) {
		statistic.readLine++
		handle(line)
	}

	// partial collects the fragments of a line that does not fit in the buffer.
	var partial []byte
	for {
		chunk, isPrefix, readErr := buf.ReadLine()
		if readErr != nil {
			// Deliver a dangling over-long fragment before the terminator call.
			if len(partial) > 0 {
				emit(string(partial))
			}
			emit("")
			if readErr == io.EOF {
				return nil
			}
			return readErr
		}
		if isPrefix || len(partial) > 0 {
			// chunk is only valid until the next read, so copy it.
			partial = append(partial, chunk...)
			if isPrefix {
				continue
			}
			emit(string(partial))
			partial = partial[:0]
			continue
		}
		emit(string(chunk))
	}
}
// buildQuery is the per-line callback passed to ReadFile. Each non-empty
// line is treated as the value list of one row and appended to the
// multi-row INSERT statement being assembled in curSql. When appending a row
// would push the statement past maxSqlLen, the rows collected so far are
// dispatched first and the new row starts the next batch.
//
// An empty line flushes whatever is pending; ReadFile sends one at the end
// of its input. A flush with no pending rows does nothing, so an empty data
// file or a blank line right after a flush never produces a malformed
// statement.
//
// NOTE(review): line is inserted into the statement verbatim. That is only
// acceptable while the data file is generated by a trusted tool.
func buildQuery(line string) {
	if len(line) == 0 {
		if hasPendingRows() {
			fmt.Println("==> EOF队列:"+strconv.Itoa(statistic.sqlLineNum), line)
			dispatchPending()
		}
		return
	}

	row := "(" + line + "),"
	// A single row larger than maxSqlLen cannot be split; it is sent as a
	// batch of its own on the next flush.
	if hasPendingRows() && len(curSql)+len(row) > maxSqlLen {
		fmt.Println("任务队列:" + strconv.Itoa(statistic.sqlLineNum))
		dispatchPending()
	}
	curSql += row
}

// hasPendingRows reports whether curSql holds at least one row after the
// INSERT prefix stored in sqlBuild.
func hasPendingRows() bool {
	return len(curSql) > len(sqlBuild)
}

// dispatchPending hands the pending statement to a new execQuery goroutine
// and resets curSql to the bare INSERT prefix. It must only be called when
// hasPendingRows is true. The dispatch counter is updated here, on the
// caller's goroutine, so main can later wait for exactly that many results
// without racing the workers.
func dispatchPending() {
	query := curSql[:len(curSql)-1] // drop the trailing comma
	curSql = sqlBuild
	statistic.sqlLineNum++
	go execQuery(query)
}

// execQuery runs one INSERT statement on myDb and sends the number of
// affected rows on channelResult. A failed Exec is printed and then panics,
// which stops the whole program. If the affected-row count cannot be
// obtained, the error is printed and the value returned alongside it is
// still sent, so the number of results always matches the number of
// dispatched statements.
func execQuery(query string) {
	res, err := myDb.Exec(query)
	if err != nil {
		fmt.Println(err.Error())
		panic(err.Error())
	}
	affected, err := res.RowsAffected()
	if err != nil {
		fmt.Println(err)
	}
	channelResult <- int(affected)
}

// statistics holds the counters reported at the end of the run.
type statistics struct {
	execDoneNum int // sum of the affected-row counts received by main
	sqlLineNum  int // number of INSERT statements dispatched
	chanRecNum  int // number of results received from channelResult
	readLine    int // number of handler calls made by ReadFile
}

var (
	// sqlBuild is the "INSERT INTO ... VALUES " prefix shared by every batch.
	sqlBuild string
	// curSql is the statement being assembled: sqlBuild followed by zero or
	// more "(...)," rows.
	curSql string
	// myDb is the database handle opened in main.
	myDb *sql.DB
	// maxSqlLen is the size in bytes at which a pending statement is sent.
	maxSqlLen = 1024 * 1024 * 2
	// statistic collects the run's counters.
	statistic statistics
	// channelResult carries affected-row counts from execQuery to main.
	// Original author's note: the containerised MySQL allows at most 150
	// connections (it crashed at 200). The buffer size of 20 does not limit
	// how many Exec calls run at once, because each worker executes its
	// statement before sending here.
	channelResult = make(chan int, 20)
)

// countRows returns the current number of rows in t10_5. A query failure is
// printed and then panics, like the other fatal errors in this program.
func countRows() int {
	var n int
	if err := myDb.QueryRow("SELECT COUNT(id) as count FROM t10_5").Scan(&n); err != nil {
		fmt.Println(err.Error())
		panic(err.Error())
	}
	return n
}

// main bulk-loads sql.data from the working directory into t10_5: it counts
// the rows before, streams the file through buildQuery, waits for every
// dispatched statement to report back, then counts again and prints how many
// rows are missing compared with the reported affected-row total.
func main() {
	var err error
	myDb, err = sql.Open("mysql", "root:123456@tcp(172.1.11.11:3306)/testdb?charset=utf8")
	if err != nil {
		fmt.Println(err.Error())
		panic(err.Error())
	}
	defer myDb.Close()

	count := countRows()
	fmt.Println(count)
	fmt.Println()

	// Build the INSERT prefix for columns field_1 .. field_99.
	sqlBuild = "INSERT INTO `t10_5` ("
	for i := 1; i < 100; i++ {
		sqlBuild += "`field_" + strconv.Itoa(i) + "`,"
	}
	sqlBuild = sqlBuild[:len(sqlBuild)-1] + ") VALUES "

	pwd, err := os.Getwd()
	if err != nil {
		fmt.Println(err.Error())
		panic(err.Error())
	}
	dataPath := path.Join(pwd, "sql.data")
	fmt.Println(dataPath)
	curSql = sqlBuild

	start := time.Now()
	if err := ReadFile(dataPath, buildQuery); err != nil {
		fmt.Println(err.Error())
		panic(err.Error())
	}

	// statistic.sqlLineNum is final once ReadFile has returned, because all
	// dispatching happens on this goroutine. Receiving exactly that many
	// results waits for every statement without sleeping or polling the
	// channel length.
	for received := 0; received < statistic.sqlLineNum; received++ {
		x, ok := <-channelResult
		statistic.execDoneNum += x
		statistic.chanRecNum++
		fmt.Println(statistic.sqlLineNum, statistic.execDoneNum, ok, len(channelResult))
	}
	print("---------- 完成 ----------")
	fmt.Println(statistic.execDoneNum, statistic.sqlLineNum)
	fmt.Println("elapsed:", time.Since(start))

	fmt.Println("chanRecNum=", statistic.chanRecNum, "sqlLineNum=", statistic.sqlLineNum, "readLine=", statistic.readLine, statistic.execDoneNum, len(channelResult))

	count2 := countRows()
	fmt.Println(count2)
	fmt.Println(count2-count, statistic.execDoneNum, "缺失行:", count2-count-statistic.execDoneNum)
}
/**
...
523 100000 true 0
---------- 完成 ----------100000 523
chanRecNum= 523 sqlLineNum= 523 readLine= 100001 100000 0
100000
100000 100000 缺失行: 0
...
523 100000 true 0
---------- 完成 ----------100000 523
chanRecNum= 523 sqlLineNum= 523 readLine= 100001 100000 0
200000
100000 100000 缺失行: 0
real 1m4.293s
user 0m28.764s
sys 0m1.944s
*/
在主从库环境下进行大量写入时,会占用大量内存,曾多次导致主机内存和磁盘空间不足,需要注意。
有疑问加站长微信联系(非本文作者)