背景:由于需要清理mongodb的数据,需要写一个简单的程序来复制一个集合的数据,集合大概有2亿条数据,有效数据只有50多万,这50条的主键都写在txt文件中,所以我想写一个productor从txt文件中读数据,然后写5个consumed来做查询和存储工作。但是代码完成了之后,没次都不能读完txt,大概几千条之后就卡了。
附带问题: consumed是怎么结束的,chan没数据了自动结束么?我打印的日志有时候会显示某个consumed结束了。
最后万分感谢各位大神,小白接触go才2周,比较紧要没有系统的去学习go,也没用过这个markdown格式,排版很丑所以给大家添麻烦了。
package main
import (
"io/ioutil"
"log"
"strings"
"fmt"
"gopkg.in/mgo.v2"
"os"
"bufio"
"io"
"errors"
"regexp"
"time"
)
var MongodbAddress string
var fileDir string
//初始化conf文件以及对应变量
```func init() {```
content, err := ioutil.ReadFile("config/conf")
if err != nil {
log.Fatal("Cant't open config/conf:", err)
}
lines := strings.Split(string(content), "\n")
section := ""
for _, line := range (lines) {
if strings.HasPrefix(line, "[") {
section = strings.Trim(strings.TrimSpace(line), "[]")
continue
}
if section == "" {
fmt.Printf("Line \"%s\" has no section", line)
continue
}
token := strings.Split(line, "=")
if len(token) < 2 {
fmt.Printf("Line \"%s\" has equation", line)
continue
}
field := strings.TrimSpace(token[0])
value := ""
if len(token) > 2 {
value = strings.TrimSpace(strings.Join(token[1:], "="))
fmt.Println(value)
} else {
value = strings.TrimSpace(token[1])
}
switch section {
case "go":
switch field {
case "MongodbUrl":
MongodbAddress = value
fmt.Println(MongodbAddress)
case "fileDir":
fileDir = value
fmt.Println(fileDir)
}
}
}
}
var AdCollection *mgo.Collection //ad 集合
var AdTempCollection *mgo.Collection //ad_temp 用于存放有效数据
var waitQ = true
```func main() {```
session, err := mgo.Dial(MongodbAddress)
if err != nil {
log.Fatalln("Failed connect to mongoDB ........." + err.Error())
}
defer session.Close()
session.SetMode(mgo.Monotonic, true)
AdCollection = session.DB("faceless").C("ad")
AdTempCollection = session.DB("faceless").C("adTemp")
log.Println("get collection ad and ad_temp now ,start to read file of pks............")
ch := make(chan string,5)
defer close(ch)
go produce("生产者",ch)
time.Sleep(1000*time.Millisecond)
go consumed("消费者1",ch)
go consumed("消费者2",ch)
go consumed("消费者3",ch)
go consumed("消费者4",ch)
go consumed("消费者5",ch)
for {
if waitQ == false{
break
}
}
fmt.Println(waitQ)
}
//生产者 逐行读取日志文件
```func produce(line string,ch chan string )error{```
f, err := os.Open(fileDir)
if err != nil {
fmt.Println("生产者已经关闭")
return err
}
buf := bufio.NewReader(f)
for {
line, _, err := buf.ReadLine()
l := strings.TrimSpace(string(line))
if err != nil {
if err == io.EOF {
fmt.Println(err)
waitQ = true
fmt.Println("生产者已经关闭")
return nil
}
fmt.Println(err)
fmt.Println("生产者已经关闭")
return err
}
l, err = DealLine(l)
if err != nil {
//log.Println("failed to format " + " with :" + l +" and error: "+err.Error())
}else {
ch <- l
}
}
fmt.Println("生产者已经关闭")
return nil
}
//消费者
```func consumed(name string, ch chan string){```
for i:= range ch{
log.Println(name+": "+i)
err:= InsertDB(i)
if err!=nil{
//fmt.Println("消费者已经关闭"+name)
//return err
}
}
fmt.Println("消费者已经关闭:"+name)
//return nil
}
type Url struct {
L string
W int
V int
}
type Ad struct {
Pk string "_id"
Urls []Url ",omitempty"
City2 string ",omitempty"
Urls2 map[string][]Url ",omitempty"
UserId int "userId"
Site string
City string ",omitempty"
ActId int "actId,omitempty"
ActName string "actName,omitempty"
Domain string
Kind string
Data map[string]string
Status bool
Ctime int64
}
//插入新的表
```func InsertDB(line string) error {```
ad := new(Ad)
AdCollection.FindId(line).One(&ad)
if ad.Pk =="" {
//log.Println("发生一个错误,未找到数据: "+line)
return nil
}
err:=AdTempCollection.Insert(ad)
return err
}
//正则 2-8位任意数字或者字母
var reg = regexp.MustCompile(`^[A-Za-z0-9]{2,8}$`)
//格式化处理id
```func DealLine(l string) (string, error) {```
if strings.Index(l," ") < 0 {
return l,errors.New("老铁你这居然不含空格")
}
ls := strings.Split(l," ")
if reg.MatchString(ls[0]) {
return ls[0],nil
}
return ls[0], errors.New("老铁你格式明显不对啊: " + ls[0])
}
阻塞在什么地方可以用pprof来看:
https://golang.google.cn/pkg/net/http/pprof/
https://studygolang.com/articles/12314
#5
更多评论