大家好,请教一个关于channel问题,内附代码,求大神指教

jiang_qingtian · · 1145 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

背景:由于需要清理mongodb的数据,需要写一个简单的程序来复制一个集合的数据,集合大概有2亿条数据,有效数据只有50多万,这50条的主键都写在txt文件中,所以我想写一个productor从txt文件中读数据,然后写5个consumed来做查询和存储工作。但是代码完成了之后,没次都不能读完txt,大概几千条之后就卡了。 附带问题: consumed是怎么结束的,chan没数据了自动结束么?我打印的日志有时候会显示某个consumed结束了。 最后万分感谢各位大神,小白接触go才2周,比较紧要没有系统的去学习go,也没用过这个markdown格式,排版很丑所以给大家添麻烦了。 package main import ( "io/ioutil" "log" "strings" "fmt" "gopkg.in/mgo.v2" "os" "bufio" "io" "errors" "regexp" "time" ) var MongodbAddress string var fileDir string //初始化conf文件以及对应变量 ```func init() {``` content, err := ioutil.ReadFile("config/conf") if err != nil { log.Fatal("Cant't open config/conf:", err) } lines := strings.Split(string(content), "\n") section := "" for _, line := range (lines) { if strings.HasPrefix(line, "[") { section = strings.Trim(strings.TrimSpace(line), "[]") continue } if section == "" { fmt.Printf("Line \"%s\" has no section", line) continue } token := strings.Split(line, "=") if len(token) < 2 { fmt.Printf("Line \"%s\" has equation", line) continue } field := strings.TrimSpace(token[0]) value := "" if len(token) > 2 { value = strings.TrimSpace(strings.Join(token[1:], "=")) fmt.Println(value) } else { value = strings.TrimSpace(token[1]) } switch section { case "go": switch field { case "MongodbUrl": MongodbAddress = value fmt.Println(MongodbAddress) case "fileDir": fileDir = value fmt.Println(fileDir) } } } } var AdCollection *mgo.Collection //ad 集合 var AdTempCollection *mgo.Collection //ad_temp 用于存放有效数据 var waitQ = true ```func main() {``` session, err := mgo.Dial(MongodbAddress) if err != nil { log.Fatalln("Failed connect to mongoDB ........." + err.Error()) } defer session.Close() session.SetMode(mgo.Monotonic, true) AdCollection = session.DB("faceless").C("ad") AdTempCollection = session.DB("faceless").C("adTemp") log.Println("get collection ad and ad_temp now ,start to read file of pks............") ch := make(chan string,5) defer close(ch) go produce("生产者",ch) time.Sleep(1000*time.Millisecond) go consumed("消费者1",ch) go consumed("消费者2",ch) go consumed("消费者3",ch) go consumed("消费者4",ch) go consumed("消费者5",ch) for { if waitQ == false{ break } } fmt.Println(waitQ) } //生产者 逐行读取日志文件 ```func produce(line string,ch chan string )error{``` f, err := os.Open(fileDir) if err != nil { fmt.Println("生产者已经关闭") return err } buf := bufio.NewReader(f) for { line, _, err := buf.ReadLine() l := strings.TrimSpace(string(line)) if err != nil { if err == io.EOF { fmt.Println(err) waitQ = true fmt.Println("生产者已经关闭") return nil } fmt.Println(err) fmt.Println("生产者已经关闭") return err } l, err = DealLine(l) if err != nil { //log.Println("failed to format " + " with :" + l +" and error: "+err.Error()) }else { ch <- l } } fmt.Println("生产者已经关闭") return nil } //消费者 ```func consumed(name string, ch chan string){``` for i:= range ch{ log.Println(name+": "+i) err:= InsertDB(i) if err!=nil{ //fmt.Println("消费者已经关闭"+name) //return err } } fmt.Println("消费者已经关闭:"+name) //return nil } type Url struct { L string W int V int } type Ad struct { Pk string "_id" Urls []Url ",omitempty" City2 string ",omitempty" Urls2 map[string][]Url ",omitempty" UserId int "userId" Site string City string ",omitempty" ActId int "actId,omitempty" ActName string "actName,omitempty" Domain string Kind string Data map[string]string Status bool Ctime int64 } //插入新的表 ```func InsertDB(line string) error {``` ad := new(Ad) AdCollection.FindId(line).One(&ad) if ad.Pk =="" { //log.Println("发生一个错误,未找到数据: "+line) return nil } err:=AdTempCollection.Insert(ad) return err } //正则 2-8位任意数字或者字母 var reg = regexp.MustCompile(`^[A-Za-z0-9]{2,8}$`) //格式化处理id ```func DealLine(l string) (string, error) {``` if strings.Index(l," ") < 0 { return l,errors.New("老铁你这居然不含空格") } ls := strings.Split(l," ") if reg.MatchString(ls[0]) { return ls[0],nil } return ls[0], errors.New("老铁你格式明显不对啊: " + ls[0]) }

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1145 次点击  
加入收藏 微博
5 回复  |  直到 2018-06-20 12:10:50
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传