golang mongo

Feng_Sir · · 1185 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

package main

import (
    "encoding/json"
    "gopkg.in/mgo.v2"
    "jy/mongodbutil"
    "log"
    mu "mfw/util"
    "net"
    "net/rpc"
    "path"
    qu "qfw/util"
    "strings"

    "gopkg.in/mgo.v2/bson"
)

var udpclient mu.UdpClient //udp对象
var Sysconfig map[string]interface{}
var MgoIP, MgoDB, MgoC, MgoFileFiled string
var ChanB chan bool

func init() {
    qu.ReadConfig(&Sysconfig)
    MgoIP = qu.ObjToString(Sysconfig["mongodb_one_ip"])
    MgoDB = qu.ObjToString(Sysconfig["mongodb_one_db"])
    MgoC = qu.ObjToString(Sysconfig["mongodb_one_c"])
    MgoFileFiled = qu.ObjToStringDef(Sysconfig["mongodb_one_filefiled"], "projectinfo")
    if strings.TrimSpace(MgoIP) == "" || strings.TrimSpace(MgoDB) == "" || strings.TrimSpace(MgoC) == "" {
        log.Println("获取配置文件参数失败", Sysconfig)
        return
    }
    mongodbutil.Mgo = mongodbutil.MgoFactory(qu.IntAllDef(Sysconfig["dbsize"], 5), 10, 120, MgoIP, MgoDB)
    ChanB = make(chan bool, qu.IntAllDef(Sysconfig["channelsize"], 5))
}

func main() {
    log.Println(Sysconfig)
    udpclient = mu.UdpClient{Local: Sysconfig["udpip"].(string) + ":" + Sysconfig["udpport"].(string), BufSize: 1024}
    udpclient.Listen(processUdpMsg)
    log.Printf("Udp listening port: %s:%s\n", Sysconfig["udpip"], Sysconfig["udpport"])
    b := make(chan bool, 1)
    <-b
}
//  "file2text": "192.168.3.207:1234",
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
    defer qu.Catch()
    switch act {
    case mu.OP_TYPE_DATA:
        var mapInfo map[string]interface{}
        err := json.Unmarshal(data, &mapInfo)
        if err != nil {
            log.Println("json err :", err, string(data))
            return
        }
        log.Println(mapInfo)
        gid := strings.TrimSpace(mapInfo["gtid"].(string))
        lid := strings.TrimSpace(mapInfo["lteid"].(string))
        if bson.IsObjectIdHex(gid) && bson.IsObjectIdHex(lid) {

            MgoSession, err := mgo.Dial(MgoIP)
            defer MgoSession.Close()
            if err != nil {
                log.Println("mongo err:",err)
                return
            }
            iter := MgoSession.DB(MgoDB).C(MgoC).Find(
                bson.M{
                    "_id": bson.M{
                        "$gte": bson.ObjectIdHex(gid),
                        "$lte": bson.ObjectIdHex(lid),
                    },
                    MgoFileFiled: bson.M{
                        "$ne": nil,
                    },
                },).Select(bson.M{"_id": 1,MgoFileFiled:1}).Iter()

            //if findAll, b := mongodbutil.Mgo.Find(MgoC,
            //  bson.M{
            //      "_id": bson.M{
            //          "$gte": bson.ObjectIdHex(gid),
            //          "$lte": bson.ObjectIdHex(lid),
            //      },
            //      MgoFileFiled: bson.M{
            //          "$ne": nil,
            //      },
            //  },
            //  //if findAll, b := mongodbutil.Mgo.Find(MgoC, bson.M{"_id": bson.M{"$gte": bson.ObjectIdHex(gid), "$lte": bson.ObjectIdHex(lid)}},
            //  nil, `{"_id":"1",`+MgoFileFiled+`:"1"}`, false, -1, -1); !b {
            //  log.Println("查询数据失败 :", string(data))
            //} else {
            var result *map[string]interface{}
            for iter.Next(&result){
                //for _, v := range *result {
                    qmap := *qu.ObjToMap(result)
                    mid := qmap["_id"]
                    if v, ok := qmap[MgoFileFiled].(map[string]interface{}); !ok {
                        mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
                            "$set": bson.M{
                                "updatefileErr": 1,
                            },})
                        //log.Println(mid, "mgo 转换异常", MgoFileFiled)
                        continue
                    } else {
                        switch v["attachments"].(type) {
                        case map[string]interface{}:
                            att := v["attachments"].(map[string]interface{})
                            for _, vaatt := range att {
                                if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
                                    mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
                                        "$set": bson.M{
                                            "updatefileErr": 1,
                                        },})
                                    //log.Println(mid, "mgo 结构体转换失败", vaatt)
                                    continue
                                } else {
                                    ChanB <- true
                                    go save(mid, qmap, fileinfo)

                                }
                            }
                        }
                    }
                    //fileMap := *qu.ObjToMap(qmap["projectinfo"])
                    //fmt.Println(fileMap["attachments"])
                }
            //}
            defer iter.Close()

            //fmt.Println(len(*findAll))
                //if len(*findAll) <= 0 {
                //  log.Println("查询数据为空 :", string(data))
                //  return
                //}
                //for _, v := range *findAll {
                //  qmap := *qu.ObjToMap(v)
                //  mid := qmap["_id"]
                //  if v, ok := qmap[MgoFileFiled].(map[string]interface{}); !ok {
                //      log.Println(mid, "mgo 转换异常", MgoFileFiled)
                //      continue
                //  } else {
                //      switch v["attachments"].(type) {
                //      case map[string]interface{}:
                //          att := v["attachments"].(map[string]interface{})
                //          for _, vaatt := range att {
                //              if fileinfo, ok := vaatt.(map[string]interface{}); !ok {
                //                  log.Println(mid, "mgo 结构体转换失败", vaatt)
                //                  continue
                //              } else {
                //                  ChanB <- true
                //                  go save(mid, qmap, fileinfo)
                //
                //              }
                //          }
                //      }
                //  }
                //  //fileMap := *qu.ObjToMap(qmap["projectinfo"])
                //  //fmt.Println(fileMap["attachments"])
                //}
            //}
        } else {
            log.Println("开始id或结束id参数错误:", string(data))
        }

    case mu.OP_NOOP: //下个节点回应
        log.Println("接收成功", string(data))

    }

}
func save(mid interface{}, qmap, fileinfo map[string]interface{}) {
    defer qu.Catch()
    defer func() {
        <-ChanB
    }()
    type FileData struct {
        Fid     string
        Name    string
        Type    string //文件类型png、jpg、tif、swf(ocr识别);pdf,doc,docx,xls
        Content string //识别内容
    }
    client, err := rpc.DialHTTP("tcp", qu.ObjToString(Sysconfig["file2text"]))
    if err != nil {
        mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
            "$set": bson.M{
                "updatefileErr": 1,
            },})
        log.Println(mid, "rpc err :", err)
        return
    }
    defer client.Close()
    var reply []byte
    //bs, _ := ioutil.ReadFile("1.docx")
    fileData := &FileData{
        Name: qu.ObjToString(fileinfo["filename"]),
        Fid:  qu.ObjToString(fileinfo["fid"]), //附件id
        Type: path.Ext(qu.ObjToString(fileinfo["filename"]))[1:],
    }
    //log.Println(mid, fileData)
    err = client.Call("FileToText.FileToContext", fileData, &reply)
    if err != nil {
        mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
            "$set": bson.M{
                "updatefileErr": 1,
            },})
        log.Println(mid, "call ocr error:", err)
        return
    }
    //fileinfo["ftype"] = "doc"
    //reply = []byte("jdsfkldasjflkj")
    //fileinfo["ftype"] = "zip"
    //testfiles := []map[string]interface {
    //}{
    //  {"Name": "test4.doc", "Content": "test4context", "Type": "doc", "Size": "40M"},
    //  {"Name": "test5.pdf", "Content": "test5context", "Type": "pdf", "Size": "50M"},
    //  {"Name": "test6.xlsx", "Content": "test6context", "Type": "xlsx", "Size": "60M"},
    //}
    //reply, _ = json.Marshal(testfiles)
    if len(reply) == 0{
        mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
            "$set": bson.M{
                "updatefileErr": 1,
            },})
        log.Println(mid, "rpc返回数据为空:", string(reply))
        return
    }
    log.Println(mid, string(reply))
    rdata := make(map[string]interface{})
    if err := json.Unmarshal(reply, &rdata); err != nil {
        mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
            "$set": bson.M{
                "updatefileErr": 1,
            },})
        log.Println(mid, "rpc返回数据解析失败:", err)
        return
    }
    if rdata["err"] == nil || rdata["err"] == "null" || rdata["err"] == "" {
        if qu.ObjToString(fileinfo["ftype"]) == "rar" || qu.ObjToString(fileinfo["ftype"]) == "zip" {
            fileinfo["content"] = rdata["contextc"]
        } else {
            fileinfo["content"] = rdata["context"]
        }
        if !mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
            "$set": bson.M{
                MgoFileFiled: qmap[MgoFileFiled],
                "updatefileErr":0,
            },
        }) {
            mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
                "$set": bson.M{
                    "updatefileErr": 1,
                },})
            log.Println(mid, "mongo更新数据失败")
        } else {
            log.Println(mid, "mongo更新数据成功")
        }
    } else {
        mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
            "$set": bson.M{
                "updatefileErr": 1,
            },})
        log.Println(mid, "调用rpc服务解析异常:", rdata["err"])
    }
    //if qu.ObjToString(fileinfo["ftype"]) == "zip" || qu.ObjToString(fileinfo["ftype"]) == "rar" {
    //  fileDatas := make([]map[string]interface{}, 0)
    //  if err := json.Unmarshal(reply, &fileDatas); err != nil {
    //      log.Println("json转换错误", mid, err)
    //      return
    //  }
    //  fileinfo["content"] = fileDatas
    //} else {
    //  fileinfo["content"] = string(reply)
    //}
    //if !mongodbutil.Mgo.UpdateById(MgoC, mid, bson.M{
    //  "$set": bson.M{
    //      MgoFileFiled: qmap[MgoFileFiled],
    //  },
    //}) {
    //  log.Println(mid, "更新数据失败")
    //} else {
    //  log.Println(mid, "更新数据成功")
    //}

}


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:Feng_Sir

查看原文:golang mongo

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1185 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传