本篇文章IT兄弟连GO语言学院小美
给读者们分享一下5.10 Go语言文本大数据处理(1):读入、清洗、分类
对GO语言感兴趣想要学习Golang开发技术的小伙伴就随小编来了解一下吧。
**需求分析**
![在这里插入图片描述](https://img-blog.csdnimg.cn/20190218145213515.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTA5ODY3NzY=,size_16,color_FFFFFF,t_70)
- 如图是一个大小达到1.3G的纯文本大数据,其中记录了情人节当天帝各大宾馆的开房数据~
- 我们对这一大数据文本进行读取、清洗、分类、入库;
**读取文本大数据**
- 通过`ioutil.ReadFile(filename)`API可以傻瓜式地一次性读入全部数据
```
//一次性将全部数据载入内存(不可取)
func main011() {
//一次性将全部数据载入内存(不可取)
contentBytes, err := ioutil.ReadFile(`D:\GoIP\腾讯课堂公开课2019\数据\kaifangX.txt`)
if err != nil {
fmt.Println("读入失败", err)
}
contentStr := string(contentBytes)
//以换行符为定界符,将大文本炸碎为行的切片
lineStrs := strings.Split(contentStr, "\n\r")
//逐行打印
for _, lineStr := range lineStrs {
//fmt.Println(lineStr)
newStr, _ := ConvertEncoding(lineStr, "GBK", "UTF-8")
fmt.Println(newStr)
}
}
```
- 然而考虑到数据总量在1G以上,一次性读入内存,不但增加内存压力,产生爆内存的风险,还会长时间阻塞,效率极低;
- 正确的做法是,使用缓冲区进行逐行读取:
```
//缓冲式读取
func main012() {
//打开文件
file, _ := os.Open(`D:\GoIP\腾讯课堂公开课2019\数据\kaifangX.txt`)
defer file.Close()
//创建文件的缓冲读取器
reader := bufio.NewReader(file)
for {
//逐行读取
lineBytes, _, err := reader.ReadLine()
//读到文件末尾时退出循环
if err == io.EOF {
break
}
//输出当前行内容
gbkStr := string(lineBytes)
utfStr, _ := ConvertEncoding(gbkStr, "GBK", "UTF-8")
fmt.Println(utfStr)
}
}
```
- 该大数据文本的原始内容是GBK编码,为了在Goland中便于输出查看,我们在输出时对其进行了编码转换,这里我们用到了一个第三方包`github.com/axgle/mahonia`
```
/*
将文本转换字符编码并返回
srcStr 待转码的原始字符串
srcEncoding 原始字符串的编码(字符集)
dstEncoding 目标编码(字符集)
*/
func ConvertEncoding(srcStr string, srcEncoding string, dstEncoding string) (dstStr string, err error) {
//创建指定字符集的解码器
srcDecoder := mahonia.NewDecoder(srcEncoding)
dstDecoder := mahonia.NewDecoder(dstEncoding)
//将内容转换为UTF-8字符串
utfStr := srcDecoder.ConvertString(srcStr)
//将UTF-8字节转换为目标字符集的字节
_, dstBytes, err := dstDecoder.Translate([]byte(utfStr), true)
if err != nil {
return
}
//还原为字符串并返回
dstStr = string(dstBytes)
return
}
```
**数据清洗**
- 数据的来源各式各样,数据质量良莠不齐,在入库之前,我们需要对数据进行必要的清洗工作;
- 在本例中,部分数据的身份证号码信息是错误非法的(例如长度),我们通过清洗对这部分数据进行筛除;
```
func main021() {
//打开原始文本
srcFile, _ := os.Open(`D:\GoIP\腾讯课堂公开课2019\数据\kaifangX.txt`)
defer srcFile.Close()
//准备一个优质数据文件
goodFile, _ := os.OpenFile(`D:\GoIP\腾讯课堂公开课2019\数据\kaifang_good.txt`, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
defer goodFile.Close()
//准备一个不良数据文件
badFile, _ := os.OpenFile(`D:\GoIP\腾讯课堂公开课2019\数据\kaifang_bad.txt`, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
defer badFile.Close()
//创建原始文件的缓冲式读取器
reader := bufio.NewReader(srcFile)
//循环读取
for {
//一次读取一行数据
lineBytes, _, err := reader.ReadLine()
//读到文件末尾时退出循环
if err == io.EOF {
break
}
//转码得到当前行的UTF-8字符串
gbkStr := string(lineBytes)
lineStr, _ := ConvertEncoding(gbkStr, "GBK", "UTF-8")
//fmt.Println(lineStr)
//使用逗号将当前行打散为子串
fields := strings.Split(lineStr, ",")
//如果第二个子串的长度为18位,我们认为它是一个合法的身份证信息
if len(fields) > 1 && len(fields[1])==18{
//摘取到另一个优质的数据文件中
goodFile.WriteString(lineStr+"\n")
fmt.Println("Good:",lineStr)
}else{
//不合法的写入不良数据文件中
badFile.WriteString(lineStr+"\n")
fmt.Println("Bad:",lineStr)
}
}
}
```
- 技术栈主要是文件读写和字符串处理
**数据分类**
- 此处我们根据身份证中的省份信息,将不同省份的开房者存入不同的文件中
- 主要技术栈依然包含文件读写和字符串处理
- 另外还用到了并发和管道,因为并发向34个省份文件中写入数据的效率显然是要高于一个线程的;
整体思路如下:
- 主协程负责逐行读取文本大数据
- 另外开设34条子协程,负责对不同省份文件进行写入,从34个不同的管道中扫描数据并写出文件;
- 主协程根据身份证号反映的不同省份,将读入的信息丢入不同的管道,由对应的子协程进行文件写出;
- 当文件读取完毕时,关闭所有的数据管道(通知子协程停止数据扫描);
- 主协程通过等待组等待所有子协程完成任务;
代码如下:
```
package main
import (
"bufio"
"fmt"
"io"
"os"
"strings"
"sync"
)
/*省份结构体*/
type Province struct {
//省的Id(身份证号的前两位)
Id string
//省份名称
Name string
//省份对应的文件
File *os.File
//写入该文件的数据管道
chanData chan string
}
/*全局等待组*/
var (
wg sync.WaitGroup
)
func main031() {
//创建省份数据map,通过Id查询省信息
pMap := make(map[string]*Province)
/*
为每个省创建一个文件
为每个省创建一个Province实例,丢入map
*/
ps := []string{"北京市11", "天津市12", "河北省13", "山西省14", "内蒙古自治区15", "辽宁省21", "吉林省22", "黑龙江省23", "上海市31", "江苏省32", "浙江省33", "安徽省34", "福建省35", "江西省36", "山东省37", "河南省41", "湖北省42", "湖南省43", "广东省44", "广西壮族自治区45", "海南省46", "重庆市50", "四川省51", "贵州省52", "云南省53", "西藏自治区54", "陕西省61", "甘肃省62", "青海省63", "宁夏回族自治区64", "新疆维吾尔自治区65", "台湾省71", "香港特别行政区81", "澳门特别行政区91",}
for _, p := range ps {
//截取省份名称和省份Id
name := p[:len(p)-2]
id := p[len(p)-2:]
//创建省份对象,并丢入map
province := Province{Id: id, Name: name}
pMap[id] = &province
//为每个省份关联一个预备写入的数据文件
file, _ := os.OpenFile(`D:\GoIP\腾讯课堂公开课2019\数据\`+province.Name+".txt", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
province.File = file
defer file.Close()
//为当前省份关联换一个数据管道
province.chanData = make(chan string, 100)
}
/*为每个省创建一条数据写入协程,从各自的管道中读取数据并写入对应的文件*/
for _, province := range pMap {
//为当前省份创建一条数据写入协程
wg.Add(1)
go func(p *Province) {
//执行数据写入
writeFile(p)
//标记协程结束
wg.Done()
}(province)
}
/*主协程读入数据,将不同省份的记录丢入对应的管道*/
file, _ := os.Open(`D:\GoIP\腾讯课堂公开课2019\数据\kaifang_good.txt`)
defer file.Close()
//创建缓冲读取器
reader := bufio.NewReader(file)
for {
//读取一行
lineBytes, _, err := reader.ReadLine()
//读取完毕时,关闭所有数据管道,并退出读取
if err == io.EOF {
fmt.Println("已经读到文件末尾!")
/*遍历关闭所有数据管道(以通知子协程停止数据扫描)*/
for _, province := range pMap {
close(province.chanData)
fmt.Println(province.Name, "管道已关闭", province.chanData)
}
break
}
//拿出省份ID
lineStr := string(lineBytes)
fieldsSlice := strings.Split(lineStr, ",")
id := fieldsSlice[1][0:2]
//根据Id查询得到省份,进而向其管道中写入当前行
if province, ok := pMap[id]; ok {
province.chanData <- (lineStr + "\n")
} else {
//这里其实也是不合法的数据
fmt.Println("莫名其妙的省", id)
}
}
//阻塞等待所有协程结束任务
wg.Wait()
fmt.Println("main over!")
}
/*扫描指定省份的管道,取出数据并写出到对应的文件中*/
func writeFile(province *Province) {
//扫描管道中的数据,管道关闭时循环结束
for lineStr := range province.chanData {
province.File.WriteString(lineStr)
//fmt.Print(province.Name, "写入", lineStr)
}
fmt.Println(province.Name, "管道遍历已结束", province.chanData)
}
```
想要了解更多关于GO语言开发方面内容的小伙伴,
请关注IT兄弟连官网、公众号:GO语言研习社,
IT兄弟连教育有专业的微软、谷歌讲师为您指导,
此外IT兄弟连老师精心推出的GO语言教程定能让你快速掌握GO语言从入门到精通开发实战技能。