5.10 Go语言文本大数据处理(1):读入、清洗、分类

Amiee7 · 2019-03-18 09:11:34 · 5473 次点击 · 大约8小时之前 开始浏览    置顶
这是一个创建于 2019-03-18 09:11:34 的主题,其中的信息可能已经有所发展或是发生改变。

本篇文章IT兄弟连GO语言学院小美

给读者们分享一下5.10 Go语言文本大数据处理(1):读入、清洗、分类

对GO语言感兴趣想要学习Golang开发技术的小伙伴就随小编来了解一下吧。 需求分析 在这里插入图片描述

  • 如图是一个大小达到1.3G的纯文本大数据,其中记录了情人节当天帝各大宾馆的开房数据~
  • 我们对这一大数据文本进行读取、清洗、分类、入库;

读取文本大数据

  • 通过ioutil.ReadFile(filename)API可以傻瓜式地一次性读入全部数据
//一次性将全部数据载入内存(不可取)
func main011() {
    //一次性将全部数据载入内存(不可取)
    contentBytes, err := ioutil.ReadFile(`D:\GoIP\腾讯课堂公开课2019\数据\kaifangX.txt`)
    if err != nil {
        fmt.Println("读入失败", err)
    }
    contentStr := string(contentBytes)

    //以换行符为定界符,将大文本炸碎为行的切片
    lineStrs := strings.Split(contentStr, "\n\r")

    //逐行打印
    for _, lineStr := range lineStrs {
        //fmt.Println(lineStr)
        newStr, _ := ConvertEncoding(lineStr, "GBK", "UTF-8")
        fmt.Println(newStr)
    }
}
  • 然而考虑到数据总量在1G以上,一次性读入内存,不但增加内存压力,产生爆内存的风险,还会长时间阻塞,效率极低;
  • 正确的做法是,使用缓冲区进行逐行读取:
//缓冲式读取
func main012() {

    //打开文件
    file, _ := os.Open(`D:\GoIP\腾讯课堂公开课2019\数据\kaifangX.txt`)
    defer file.Close()

    //创建文件的缓冲读取器
    reader := bufio.NewReader(file)

    for {

        //逐行读取
        lineBytes, _, err := reader.ReadLine()

        //读到文件末尾时退出循环
        if err == io.EOF {
            break
        }

        //输出当前行内容
        gbkStr := string(lineBytes)
        utfStr, _ := ConvertEncoding(gbkStr, "GBK", "UTF-8")
        fmt.Println(utfStr)
    }

}
  • 该大数据文本的原始内容是GBK编码,为了在Goland中便于输出查看,我们在输出时对其进行了编码转换,这里我们用到了一个第三方包github.com/axgle/mahonia
/*
将文本转换字符编码并返回
srcStr        待转码的原始字符串
srcEncoding    原始字符串的编码(字符集)
dstEncoding    目标编码(字符集)
*/
func ConvertEncoding(srcStr string, srcEncoding string, dstEncoding string) (dstStr string, err error) {

    //创建指定字符集的解码器
    srcDecoder := mahonia.NewDecoder(srcEncoding)
    dstDecoder := mahonia.NewDecoder(dstEncoding)

    //将内容转换为UTF-8字符串
    utfStr := srcDecoder.ConvertString(srcStr)

    //将UTF-8字节转换为目标字符集的字节
    _, dstBytes, err := dstDecoder.Translate([]byte(utfStr), true)
    if err != nil {
        return
    }

    //还原为字符串并返回
    dstStr = string(dstBytes)
    return
}

数据清洗

  • 数据的来源各式各样,数据质量良莠不齐,在入库之前,我们需要对数据进行必要的清洗工作;
  • 在本例中,部分数据的身份证号码信息是错误非法的(例如长度),我们通过清洗对这部分数据进行筛除;
func main021() {

    //打开原始文本
    srcFile, _ := os.Open(`D:\GoIP\腾讯课堂公开课2019\数据\kaifangX.txt`)
    defer srcFile.Close()

    //准备一个优质数据文件
    goodFile, _ := os.OpenFile(`D:\GoIP\腾讯课堂公开课2019\数据\kaifang_good.txt`, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
    defer goodFile.Close()

    //准备一个不良数据文件
    badFile, _ := os.OpenFile(`D:\GoIP\腾讯课堂公开课2019\数据\kaifang_bad.txt`, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
    defer badFile.Close()

    //创建原始文件的缓冲式读取器
    reader := bufio.NewReader(srcFile)

    //循环读取
    for {

        //一次读取一行数据
        lineBytes, _, err := reader.ReadLine()

        //读到文件末尾时退出循环
        if err == io.EOF {
            break
        }

        //转码得到当前行的UTF-8字符串
        gbkStr := string(lineBytes)
        lineStr, _ := ConvertEncoding(gbkStr, "GBK", "UTF-8")
        //fmt.Println(lineStr)

        //使用逗号将当前行打散为子串
        fields := strings.Split(lineStr, ",")

        //如果第二个子串的长度为18位,我们认为它是一个合法的身份证信息
        if len(fields) > 1 && len(fields[1])==18{
            //摘取到另一个优质的数据文件中
            goodFile.WriteString(lineStr+"\n")
            fmt.Println("Good:",lineStr)
        }else{
            //不合法的写入不良数据文件中
            badFile.WriteString(lineStr+"\n")
            fmt.Println("Bad:",lineStr)
        }
    }

}
  • 技术栈主要是文件读写和字符串处理

数据分类

  • 此处我们根据身份证中的省份信息,将不同省份的开房者存入不同的文件中
  • 主要技术栈依然包含文件读写和字符串处理
  • 另外还用到了并发和管道,因为并发向34个省份文件中写入数据的效率显然是要高于一个线程的;

整体思路如下:

  • 主协程负责逐行读取文本大数据
  • 另外开设34条子协程,负责对不同省份文件进行写入,从34个不同的管道中扫描数据并写出文件;
  • 主协程根据身份证号反映的不同省份,将读入的信息丢入不同的管道,由对应的子协程进行文件写出;
  • 当文件读取完毕时,关闭所有的数据管道(通知子协程停止数据扫描);
  • 主协程通过等待组等待所有子协程完成任务; 代码如下:
package main

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "strings"
    "sync"
)

/*省份结构体*/
type Province struct {
    //省的Id(身份证号的前两位)
    Id string

    //省份名称
    Name string

    //省份对应的文件
    File *os.File

    //写入该文件的数据管道
    chanData chan string
}

/*全局等待组*/
var (
    wg sync.WaitGroup
)

func main031() {

    //创建省份数据map,通过Id查询省信息
    pMap := make(map[string]*Province)

    /*
    为每个省创建一个文件
    为每个省创建一个Province实例,丢入map
    */
    ps := []string{"北京市11", "天津市12", "河北省13", "山西省14", "内蒙古自治区15", "辽宁省21", "吉林省22", "黑龙江省23", "上海市31", "江苏省32", "浙江省33", "安徽省34", "福建省35", "江西省36", "山东省37", "河南省41", "湖北省42", "湖南省43", "广东省44", "广西壮族自治区45", "海南省46", "重庆市50", "四川省51", "贵州省52", "云南省53", "西藏自治区54", "陕西省61", "甘肃省62", "青海省63", "宁夏回族自治区64", "新疆维吾尔自治区65", "台湾省71", "香港特别行政区81", "澳门特别行政区91",}
    for _, p := range ps {

        //截取省份名称和省份Id
        name := p[:len(p)-2]
        id := p[len(p)-2:]

        //创建省份对象,并丢入map
        province := Province{Id: id, Name: name}
        pMap[id] = &province

        //为每个省份关联一个预备写入的数据文件
        file, _ := os.OpenFile(`D:\GoIP\腾讯课堂公开课2019\数据\`+province.Name+".txt", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
        province.File = file
        defer file.Close()

        //为当前省份关联换一个数据管道
        province.chanData = make(chan string, 100)
    }

    /*为每个省创建一条数据写入协程,从各自的管道中读取数据并写入对应的文件*/
    for _, province := range pMap {

        //为当前省份创建一条数据写入协程
        wg.Add(1)
        go func(p *Province) {

            //执行数据写入
            writeFile(p)

            //标记协程结束
            wg.Done()
        }(province)
    }

    /*主协程读入数据,将不同省份的记录丢入对应的管道*/
    file, _ := os.Open(`D:\GoIP\腾讯课堂公开课2019\数据\kaifang_good.txt`)
    defer file.Close()

    //创建缓冲读取器
    reader := bufio.NewReader(file)
    for {

        //读取一行
        lineBytes, _, err := reader.ReadLine()

        //读取完毕时,关闭所有数据管道,并退出读取
        if err == io.EOF {
            fmt.Println("已经读到文件末尾!")

            /*遍历关闭所有数据管道(以通知子协程停止数据扫描)*/
            for _, province := range pMap {
                close(province.chanData)
                fmt.Println(province.Name, "管道已关闭", province.chanData)
            }
            break
        }

        //拿出省份ID
        lineStr := string(lineBytes)
        fieldsSlice := strings.Split(lineStr, ",")
        id := fieldsSlice[1][0:2]

        //根据Id查询得到省份,进而向其管道中写入当前行
        if province, ok := pMap[id]; ok {
            province.chanData <- (lineStr + "\n")
        } else {
            //这里其实也是不合法的数据
            fmt.Println("莫名其妙的省", id)
        }
    }

    //阻塞等待所有协程结束任务
    wg.Wait()
    fmt.Println("main over!")

}

/*扫描指定省份的管道,取出数据并写出到对应的文件中*/
func writeFile(province *Province) {
    //扫描管道中的数据,管道关闭时循环结束
    for lineStr := range province.chanData {
        province.File.WriteString(lineStr)
        //fmt.Print(province.Name, "写入", lineStr)
    }
    fmt.Println(province.Name, "管道遍历已结束", province.chanData)
}

想要了解更多关于GO语言开发方面内容的小伙伴,

请关注IT兄弟连官网、公众号:GO语言研习社,

IT兄弟连教育有专业的微软、谷歌讲师为您指导,

此外IT兄弟连老师精心推出的GO语言教程定能让你快速掌握GO语言从入门到精通开发实战技能。


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

5473 次点击  
加入收藏 微博
1 回复  |  直到 2020-03-22 10:22:08
tianly123
tianly123 · #1 · 5年之前

有资料码

添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传