5.10 Go语言文本大数据处理(1):读入、清洗、分类

Amiee7 · · 5247 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

本篇文章IT兄弟连GO语言学院小美 给读者们分享一下5.10 Go语言文本大数据处理(1):读入、清洗、分类 对GO语言感兴趣想要学习Golang开发技术的小伙伴就随小编来了解一下吧。 **需求分析** ![在这里插入图片描述](https://img-blog.csdnimg.cn/20190218145213515.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTA5ODY3NzY=,size_16,color_FFFFFF,t_70) - 如图是一个大小达到1.3G的纯文本大数据,其中记录了情人节当天帝各大宾馆的开房数据~ - 我们对这一大数据文本进行读取、清洗、分类、入库; **读取文本大数据** - 通过`ioutil.ReadFile(filename)`API可以傻瓜式地一次性读入全部数据 ``` //一次性将全部数据载入内存(不可取) func main011() { //一次性将全部数据载入内存(不可取) contentBytes, err := ioutil.ReadFile(`D:\GoIP\腾讯课堂公开课2019\数据\kaifangX.txt`) if err != nil { fmt.Println("读入失败", err) } contentStr := string(contentBytes) //以换行符为定界符,将大文本炸碎为行的切片 lineStrs := strings.Split(contentStr, "\n\r") //逐行打印 for _, lineStr := range lineStrs { //fmt.Println(lineStr) newStr, _ := ConvertEncoding(lineStr, "GBK", "UTF-8") fmt.Println(newStr) } } ``` - 然而考虑到数据总量在1G以上,一次性读入内存,不但增加内存压力,产生爆内存的风险,还会长时间阻塞,效率极低; - 正确的做法是,使用缓冲区进行逐行读取: ``` //缓冲式读取 func main012() { //打开文件 file, _ := os.Open(`D:\GoIP\腾讯课堂公开课2019\数据\kaifangX.txt`) defer file.Close() //创建文件的缓冲读取器 reader := bufio.NewReader(file) for { //逐行读取 lineBytes, _, err := reader.ReadLine() //读到文件末尾时退出循环 if err == io.EOF { break } //输出当前行内容 gbkStr := string(lineBytes) utfStr, _ := ConvertEncoding(gbkStr, "GBK", "UTF-8") fmt.Println(utfStr) } } ``` - 该大数据文本的原始内容是GBK编码,为了在Goland中便于输出查看,我们在输出时对其进行了编码转换,这里我们用到了一个第三方包`github.com/axgle/mahonia` ``` /* 将文本转换字符编码并返回 srcStr 待转码的原始字符串 srcEncoding 原始字符串的编码(字符集) dstEncoding 目标编码(字符集) */ func ConvertEncoding(srcStr string, srcEncoding string, dstEncoding string) (dstStr string, err error) { //创建指定字符集的解码器 srcDecoder := mahonia.NewDecoder(srcEncoding) dstDecoder := mahonia.NewDecoder(dstEncoding) //将内容转换为UTF-8字符串 utfStr := srcDecoder.ConvertString(srcStr) //将UTF-8字节转换为目标字符集的字节 _, dstBytes, err := dstDecoder.Translate([]byte(utfStr), true) if err != nil { return } //还原为字符串并返回 dstStr = string(dstBytes) return } ``` **数据清洗** - 数据的来源各式各样,数据质量良莠不齐,在入库之前,我们需要对数据进行必要的清洗工作; - 在本例中,部分数据的身份证号码信息是错误非法的(例如长度),我们通过清洗对这部分数据进行筛除; ``` func main021() { //打开原始文本 srcFile, _ := os.Open(`D:\GoIP\腾讯课堂公开课2019\数据\kaifangX.txt`) defer srcFile.Close() //准备一个优质数据文件 goodFile, _ := os.OpenFile(`D:\GoIP\腾讯课堂公开课2019\数据\kaifang_good.txt`, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) defer goodFile.Close() //准备一个不良数据文件 badFile, _ := os.OpenFile(`D:\GoIP\腾讯课堂公开课2019\数据\kaifang_bad.txt`, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) defer badFile.Close() //创建原始文件的缓冲式读取器 reader := bufio.NewReader(srcFile) //循环读取 for { //一次读取一行数据 lineBytes, _, err := reader.ReadLine() //读到文件末尾时退出循环 if err == io.EOF { break } //转码得到当前行的UTF-8字符串 gbkStr := string(lineBytes) lineStr, _ := ConvertEncoding(gbkStr, "GBK", "UTF-8") //fmt.Println(lineStr) //使用逗号将当前行打散为子串 fields := strings.Split(lineStr, ",") //如果第二个子串的长度为18位,我们认为它是一个合法的身份证信息 if len(fields) > 1 && len(fields[1])==18{ //摘取到另一个优质的数据文件中 goodFile.WriteString(lineStr+"\n") fmt.Println("Good:",lineStr) }else{ //不合法的写入不良数据文件中 badFile.WriteString(lineStr+"\n") fmt.Println("Bad:",lineStr) } } } ``` - 技术栈主要是文件读写和字符串处理 **数据分类** - 此处我们根据身份证中的省份信息,将不同省份的开房者存入不同的文件中 - 主要技术栈依然包含文件读写和字符串处理 - 另外还用到了并发和管道,因为并发向34个省份文件中写入数据的效率显然是要高于一个线程的; 整体思路如下: - 主协程负责逐行读取文本大数据 - 另外开设34条子协程,负责对不同省份文件进行写入,从34个不同的管道中扫描数据并写出文件; - 主协程根据身份证号反映的不同省份,将读入的信息丢入不同的管道,由对应的子协程进行文件写出; - 当文件读取完毕时,关闭所有的数据管道(通知子协程停止数据扫描); - 主协程通过等待组等待所有子协程完成任务; 代码如下: ``` package main import ( "bufio" "fmt" "io" "os" "strings" "sync" ) /*省份结构体*/ type Province struct { //省的Id(身份证号的前两位) Id string //省份名称 Name string //省份对应的文件 File *os.File //写入该文件的数据管道 chanData chan string } /*全局等待组*/ var ( wg sync.WaitGroup ) func main031() { //创建省份数据map,通过Id查询省信息 pMap := make(map[string]*Province) /* 为每个省创建一个文件 为每个省创建一个Province实例,丢入map */ ps := []string{"北京市11", "天津市12", "河北省13", "山西省14", "内蒙古自治区15", "辽宁省21", "吉林省22", "黑龙江省23", "上海市31", "江苏省32", "浙江省33", "安徽省34", "福建省35", "江西省36", "山东省37", "河南省41", "湖北省42", "湖南省43", "广东省44", "广西壮族自治区45", "海南省46", "重庆市50", "四川省51", "贵州省52", "云南省53", "西藏自治区54", "陕西省61", "甘肃省62", "青海省63", "宁夏回族自治区64", "新疆维吾尔自治区65", "台湾省71", "香港特别行政区81", "澳门特别行政区91",} for _, p := range ps { //截取省份名称和省份Id name := p[:len(p)-2] id := p[len(p)-2:] //创建省份对象,并丢入map province := Province{Id: id, Name: name} pMap[id] = &province //为每个省份关联一个预备写入的数据文件 file, _ := os.OpenFile(`D:\GoIP\腾讯课堂公开课2019\数据\`+province.Name+".txt", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) province.File = file defer file.Close() //为当前省份关联换一个数据管道 province.chanData = make(chan string, 100) } /*为每个省创建一条数据写入协程,从各自的管道中读取数据并写入对应的文件*/ for _, province := range pMap { //为当前省份创建一条数据写入协程 wg.Add(1) go func(p *Province) { //执行数据写入 writeFile(p) //标记协程结束 wg.Done() }(province) } /*主协程读入数据,将不同省份的记录丢入对应的管道*/ file, _ := os.Open(`D:\GoIP\腾讯课堂公开课2019\数据\kaifang_good.txt`) defer file.Close() //创建缓冲读取器 reader := bufio.NewReader(file) for { //读取一行 lineBytes, _, err := reader.ReadLine() //读取完毕时,关闭所有数据管道,并退出读取 if err == io.EOF { fmt.Println("已经读到文件末尾!") /*遍历关闭所有数据管道(以通知子协程停止数据扫描)*/ for _, province := range pMap { close(province.chanData) fmt.Println(province.Name, "管道已关闭", province.chanData) } break } //拿出省份ID lineStr := string(lineBytes) fieldsSlice := strings.Split(lineStr, ",") id := fieldsSlice[1][0:2] //根据Id查询得到省份,进而向其管道中写入当前行 if province, ok := pMap[id]; ok { province.chanData <- (lineStr + "\n") } else { //这里其实也是不合法的数据 fmt.Println("莫名其妙的省", id) } } //阻塞等待所有协程结束任务 wg.Wait() fmt.Println("main over!") } /*扫描指定省份的管道,取出数据并写出到对应的文件中*/ func writeFile(province *Province) { //扫描管道中的数据,管道关闭时循环结束 for lineStr := range province.chanData { province.File.WriteString(lineStr) //fmt.Print(province.Name, "写入", lineStr) } fmt.Println(province.Name, "管道遍历已结束", province.chanData) } ``` 想要了解更多关于GO语言开发方面内容的小伙伴, 请关注IT兄弟连官网、公众号:GO语言研习社, IT兄弟连教育有专业的微软、谷歌讲师为您指导, 此外IT兄弟连老师精心推出的GO语言教程定能让你快速掌握GO语言从入门到精通开发实战技能。

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

5247 次点击  
加入收藏 微博
1 回复  |  直到 2020-03-22 10:22:08
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传