案例需求:我们的监测系统会定期的检查配置文件的变动,这些配置文件放置在一个独立的文件夹下面,我们可以通过对于整个的文件夹内所有文件进行md5的计算来完成监测,本文就通过Go语言实现了一个命令行工具,完成上述的需求。
1. 单一文件的md5计算
我们首先将需求任务进行分解,既然需要计算文件夹下的所有文件md5值,我们必须先考虑如何实现单一文件的md5值计算。
下面就是一个简单的md5求值程序,这里我们通过参数传递进去需要计算的文件,然后调用go语言提供的内置的crypto包中的函数来完成取值,计算得出的结果使用16进制的方式打印出来。
package main
import (
"crypto/md5"
"fmt"
"io/ioutil"
"os"
)
func Md5SumFile(file string) (value [md5.Size]byte, err error) {
data, err := ioutil.ReadFile(file)
if err != nil {
return
}
value = md5.Sum(data)
return
}
func main() {
if len(os.Args) < 2 {
fmt.Println("Usage: ./md5file yourfile")
return
}
md5Value, err := Md5SumFile(os.Args[1])
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Printf("%x %s\n", md5Value, os.Args[1])
}
执行程序输出如下面所示:
$ go run src/md5_files.go src/test.txt
fc3ff98e8c6a0d3087d515c0473f8677 src/test.txt
2 文件的遍历与计算
2.1 参数处理
如果我们传递进去的是一个文件夹则如何处理呢?这里我们先通过改造原始的程序使得程序可以接收不同的参数类型比如-d代表了后面的为一个目录文件,-f代表了传递的是一个文件,这里我们需要借助于golang的flags包来完成参数的解析,解析完毕后我们就可以按照对应的方式处理参数了。
var directory, file *string
func init() {
directory = flag.String("d", "", "The directory contains all the files that need to calculate the md5 value")
file = flag.String("f", "", "The file that need to caclulate the md5 value")
}
func main() {
flag.Parse()
if *directory == "" && *file == "" {
flag.Usage()
return
}
if *file != "" {
md5Value, err := Md5SumFile(*file)
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Printf("%x %s\n", md5Value, *file)
return
}
if *directory != "" {
result, err := Md5SumFolder(*directory)
if err != nil {
fmt.Println(err.Error())
return
}
var paths []string
for path := range result {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", result[path], path)
}
}
}
这里我们根据不同的参数类型进行处理,对于文件的话,我们按照第一个例子中的处理方式调用Md5SumFile()函数获得文件相应的md5值,而如果是文件夹类型的话,我们则需要新建一个处理函数,并且这个处理函数的返回值应该包含文件夹中所有文件和对应的md5值。
2.2 文件夹函数处理
整个的文件夹的处理函数如下面所示,包含了文件的遍历和调用md5取值的过程,最终的结果保存在一个map类型中返回。
func Md5SumFolder(folder string) (map[string][md5.Size]byte, error) {
result := make(map[string][md5.Size]byte)
err := filepath.Walk(folder, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return err
}
result[path] = md5.Sum(data)
return nil
})
if err != nil {
return nil, err
}
return result, nil
}
2.3 汇总处理结果
我们还需要对于所有的文件md5值进行合并并且计算一个最终的md5值,这样我们只需要监测这个md5值是否发生变化,就能知道这个文件夹是否发生了变化。这里我们新加入一个-merge参数代表了输出单一的合并后的值,否则则输出整个文件夹下所有的md5值。
//init函数
merge = flag.Bool("merge", false, "Merging all md5 values to one (Folder type only)")
...
//main函数
if *merge == true {
var md5value string
for _, path := range paths {
md5value += fmt.Sprintf("%x", result[path])
}
fmt.Printf("%x %s\n", md5.Sum([]byte(md5value)), *directory)
} else {
for _, path := range paths {
fmt.Printf("%x %s\n", result[path], path)
}
}
这样我们可以测试函数执行情况如下,分别对于不同的类型参数进行取值:
$ go run src/md5_folder_support.go -f src/test.txt
01f73475c6d686cc2efd74a7c2a96df5 src/test.txt
$ go run src/md5_folder_support.go -d src
4adea23a39f58672f0258980d6c4208 src/md5_folder_support.go
fc3ff98e8c6a0d3087d515c0473f8677 src/test.txt
c7a96525778ec2daefb3e57c99b8c6c2 src/test/hello
7b9e14080b072115f287a7608d3b4aff src/test/test_sub/hello
de8ce01552ee663717249956f664c1f6 src/version/md5_files.go
$ go run src/md5_folder_support.go -d src -merge
b38c907db04043c7508b464378a5f632 src
3并行化取值
3.1 串行测试
如果只是使用上述的执行程序其实与直接编写一个shell脚本没什么太多区别,我们实测一个大小500Mb的文件夹的执行时间:
$ ./md5_folder_support -d /home/mike/Calibre\ 书库 -merge
d9c08b4a6149cb8d57740bd4482a820e /home/mike/Calibre 书库
1.36595528s
如果使用shell来直接执行的话,测试脚本如下:
$ start=$(date +'%s%N');find /home/mike/Calibre\ 书库 -type f -exec md5sum {} \;|sort -k 2|md5sum ;eclipe=$((($(date +'%s%N')-$start)));echo "$eclipe/1000000000"|bc -l
1.40261921400000000000
由于shell传递的是整个的sort产生的结果到md5所以显示的会不一致,但是流程基本上都涉及了,对于单个文件的取值,对于结果的排序,和汇总输出。我们可以看到基本上输出的花费时间是差不多的。
3.2 并行程序执行
我们将上面的串行执行程序,利用go的并行处理方式进行改写,改写后的文件夹处理函数代码如下:
func Md5SumFolder(folder string) (map[string][md5.Size]byte, error) {
returnValue := make(map[string][md5.Size]byte)
done := make(chan struct{})
defer close(done)
c := make(chan result)
errc := make(chan error, 1)
var wg sync.WaitGroup
go func() {
err := filepath.Walk(folder, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path: path, md5Sum: md5.Sum(data), err: err}:
case <-done:
}
wg.Done()
}()
select {
case <-done:
return errors.New("Canceled")
default:
return nil
}
})
errc <- err
go func() {
wg.Wait()
close(c)
}()
}()
for r := range c {
if r.err != nil {
return nil, r.err
}
returnValue[r.path] = r.md5Sum
}
if err := <-errc; err != nil {
return nil, err
}
return returnValue, nil
}
我们利用了sync包中的函数来完成等待过程,防止并行程序的提前退出。利用了一个名为done的channel来管理程序的退出,不管任何情况下,该通道的都会随着函数的退出而执行关闭操作,不再接收任何的输入进来。程序如下片段中,我们对于每一个新的文件,利用wg.Add()添加一个占位,这样如果我们不去手动的执行一个wg.Done()则主程序就会停止在wg.Wait()处。
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path: path, md5Sum: md5.Sum(data), err: err}:
case <-done:
}
wg.Done()
}()
对于程序执行一次跟之前相同的运算可以看出,新的并行程序的执行速度已经明显得到提升
go run src/main.go -d /home/mike/Calibre\ 书库 -merge
d9c08b4a6149cb8d57740bd4482a820e /home/mike/Calibre 书库
279.154729ms
3.3 并行限制
上述的例子中我们并未限制打开的文件数量,比如我们文件夹下有上万个文件,这样读入内存中的数据可能会非常的大,计算起来也要耗尽所有的内存,因此我们限制我们最大的读取数量比如限制最大打开10个文件进行计算,这样我们的程序需要重新设计一下。
我们先看一下程序最终实际的运行效果,基本上按照预期的时间处理完毕:
$ go run src/main.go -d /home/mike/Calibre\ 书库 -merge -max 1
d9c08b4a6149cb8d57740bd4482a820e /home/mike/Calibre 书库
1.241248438s
$ go run src/main.go -d /home/mike/Calibre\ 书库 -merge -max 2
d9c08b4a6149cb8d57740bd4482a820e /home/mike/Calibre 书库
831.018648ms
$ go run src/main.go -d /home/mike/Calibre\ 书库 -merge -max 3
d9c08b4a6149cb8d57740bd4482a820e /home/mike/Calibre 书库
520.293084ms
$ go run src/main.go -d /home/mike/Calibre\ 书库 -merge
d9c08b4a6149cb8d57740bd4482a820e /home/mike/Calibre 书库
228.959296ms
4 最终程序清单如下
package main
import (
"crypto/md5"
"errors"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"sync"
"time"
)
var directory, file *string
var merge *bool
var limit *int
func init() {
directory = flag.String("d", "", "The directory contains all the files that need to calculate the md5 value")
file = flag.String("f", "", "The file that need to caclulate the md5 value")
merge = flag.Bool("merge", false, "Merging all md5 values to one (Folder type only)")
limit = flag.Int("max", 0, "limit the max files to caclulate.")
}
func Md5SumFile(file string) (value [md5.Size]byte, err error) {
data, err := ioutil.ReadFile(file)
if err != nil {
return
}
value = md5.Sum(data)
return
}
type result struct {
path string
md5Sum [md5.Size]byte
err error
}
func Md5SumFolder(folder string, limit int) (map[string][md5.Size]byte, error) {
returnValue := make(map[string][md5.Size]byte)
var limitChannel chan (struct{})
if limit != 0 {
limitChannel = make(chan struct{}, limit)
}
done := make(chan struct{})
defer close(done)
c := make(chan result)
errc := make(chan error, 1)
var wg sync.WaitGroup
go func() {
err := filepath.Walk(folder, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
if limit != 0 {
//如果已经满了则阻塞在这里
limitChannel <- struct{}{}
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path: path, md5Sum: md5.Sum(data), err: err}:
case <-done:
}
if limit != 0 {
//读出数据,这样就有新的文件可以处理
<-limitChannel
}
wg.Done()
}()
select {
case <-done:
return errors.New("Canceled")
default:
return nil
}
})
errc <- err
go func() {
wg.Wait()
close(c)
}()
}()
for r := range c {
if r.err != nil {
return nil, r.err
}
returnValue[r.path] = r.md5Sum
}
if err := <-errc; err != nil {
return nil, err
}
return returnValue, nil
}
func main() {
timeStart := time.Now()
flag.Parse()
if *directory == "" && *file == "" {
flag.Usage()
return
}
if *file != "" {
md5Value, err := Md5SumFile(*file)
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Printf("%x %s\n", md5Value, *file)
return
}
if *directory != "" {
result, err := Md5SumFolder(*directory, *limit)
if err != nil {
fmt.Println(err.Error())
return
}
var paths []string
for path := range result {
paths = append(paths, path)
}
sort.Strings(paths)
if *merge == true {
var md5value string
for _, path := range paths {
md5value += fmt.Sprintf("%x", result[path])
}
fmt.Printf("%x %s\n", md5.Sum([]byte(md5value)), *directory)
} else {
for _, path := range paths {
fmt.Printf("%x %s\n", result[path], path)
}
}
}
fmt.Println(time.Since(timeStart).String())
}
这就是整个的程序的执行流程,如果有任何问题,请留言告诉我或者email给我,我的个人网站jsmean.com欢迎大家访问。