Reading a file concurrently using bufio.Scanner: need help improving performance.

**xuanbao:**

I am trying to speed up a program that reads a nearly 3 MB text file, picks out the words that begin with an uppercase letter, and prints them to the screen. Here is my first version:

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"time"
	"unicode"
)

func main() {
	start := time.Now().UnixNano()
	f, err := os.Open("./output.txt")
	defer f.Close()
	if err != nil {
		panic(err)
	}
	scanner := bufio.NewScanner(f)
	scanner.Split(bufio.ScanWords)
	done := make(chan bool)
	word := make(chan string)
	upperCaseWordChan := make(chan string)
	go func(word chan string, done chan bool) {
		for scanner.Scan() {
			word <- scanner.Text()
		}
		done <- true
	}(word, done)
	go func(word chan string, upperCaseWordChan chan string) {
		for data := range word {
			if unicode.IsUpper(rune(data[0])) {
				upperCaseWordChan <- data
			}
		}
	}(word, upperCaseWordChan)
	go func(upperCaseWordChan chan string) {
		for data := range upperCaseWordChan {
			fmt.Println(data)
		}
		done <- true
	}(upperCaseWordChan)
	<-done
	end := time.Now().UnixNano()
	fmt.Println((end - start) / int64(time.Millisecond))
}
```

I converted the above program to work with two goroutines and a channel:

1. Get the uppercase words and put them on a channel (first goroutine).
2. Print them by receiving from the channel (second goroutine).

I am not seeing more than a 50-100 ms change in the runtime. How can I reduce the reading time by making changes to the program below?

Kindly help.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"runtime"
	"time"
	"unicode"
)

var scanner *bufio.Scanner

func main() {
	start := time.Now().UnixNano()
	runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
	f, err := os.Open("./output.txt")
	defer f.Close()
	if err != nil {
		panic(err)
	}
	scanner = bufio.NewScanner(f)
	scanner.Split(bufio.ScanWords)
	done := make(chan bool)
	word := make(chan string)
	go getWords(word, done)
	go printer(word)
	<-done
	end := time.Now().UnixNano()
	fmt.Println((end - start) / int64(time.Millisecond))
}

func getWords(word chan string, done chan bool) {
	for scanner.Scan() {
		data := scanner.Text()
		if unicode.IsUpper(rune(data[0])) {
			word <- data
		}
	}
	close(word)
	done <- true
}

func printer(word <-chan string) {
	for data := range word {
		fmt.Println(data)
	}
}
```

---

**Comments:**

**TheMerovius:**

Are you sure that your program is CPU-constrained? Because I *highly* doubt that it is (i.e. I consider it basically impossible). If it isn't CPU-constrained, making it concurrent won't get you anywhere. In fact, it is probably slowing you down, as you are now adding a whole lot of synchronization overhead to something that doesn't actually need it.

Indeed, I get exactly the same time as with your program if I just use this simple one:

```go
func main() {
	start := time.Now()
	f, err := os.Open("pg10.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	s := bufio.NewScanner(f)
	s.Split(bufio.ScanWords)
	for s.Scan() {
		word := s.Bytes()
		r, _ := utf8.DecodeRune(word)
		if unicode.IsUpper(r) {
			fmt.Println(string(word))
		}
	}
	if err := s.Err(); err != nil {
		log.Fatal(err)
	}
	end := time.Now()
	log.Println(end.Sub(start))
}
```

What makes your program "slow" isn't your CPU (the task you are making it do is far too trivial for that) but your I/O. I got a little bit of speedup with this:

```go
start := time.Now()
content, err := ioutil.ReadFile("pg10.txt")
if err != nil {
	log.Fatal(err)
}
w := bufio.NewWriter(os.Stdout)
s := bufio.NewScanner(bytes.NewReader(content))
s.Split(bufio.ScanWords)
for s.Scan() {
	word := s.Bytes()
	r, _ := utf8.DecodeRune(word)
	if unicode.IsUpper(r) {
		fmt.Fprintln(w, string(word))
	}
}
if err := s.Err(); err != nil {
	log.Fatal(err)
}
w.Flush()
end := time.Now()
log.Println(end.Sub(start))
```

i.e. by a) reading the file into memory in one go, which saves syscalls and makes reads non-blocking (gives a tiny boost), and b) using a buffered writer for output instead of dumping directly to the console, which gave a significant boost.

Concurrency isn't a magic wand that makes things go faster. If your code is too slow, you should figure out where the bottlenecks are. Use top or the like to figure out the CPU usage: adding concurrency will only help you if the CPU usage is less than N*100% (with N being the number of cores in your ~~program~~ computer). Use dstat or the like to figure out whether I/O is your bottleneck (it more than likely is) and if so, try improving the I/O patterns of your program. But don't just guess what's going on (or guess better) :)
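The advice above to measure rather than guess also fits Go's built-in benchmark tooling, which is steadier than a single `time.Now()` delta. Below is a minimal sketch (not from the thread); it assumes the same `pg10.txt` file sits in the working directory, and it writes into an in-memory buffer so that terminal speed doesn't pollute the numbers:

```go
// bench_test.go: a hedged sketch, assuming pg10.txt is present.
// Run with: go test -bench=.
package main

import (
	"bufio"
	"bytes"
	"io/ioutil"
	"testing"
	"unicode"
	"unicode/utf8"
)

func BenchmarkUppercaseWords(b *testing.B) {
	content, err := ioutil.ReadFile("pg10.txt")
	if err != nil {
		b.Fatal(err)
	}
	b.ResetTimer() // exclude the one-time file read from the measurement
	for i := 0; i < b.N; i++ {
		var out bytes.Buffer // in-memory sink instead of os.Stdout
		w := bufio.NewWriter(&out)
		s := bufio.NewScanner(bytes.NewReader(content))
		s.Split(bufio.ScanWords)
		for s.Scan() {
			word := s.Bytes()
			if r, _ := utf8.DecodeRune(word); unicode.IsUpper(r) {
				w.Write(word)
				w.WriteByte('\n')
			}
		}
		w.Flush()
	}
}
```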
**StoicalSayWhat:**

Thank you very much, TheMerovius. I realise now that the many I/O calls are what take the time, and that buffered writing helps compared to writing straight to the console. This helps.

**StoicalSayWhat:**

Using:

```go
bufio.NewWriter(os.Stdout)
```

did help. I replaced fmt.Println with fmt.Fprintln and saw roughly a 300 ms improvement.

Also, I am not sure, but I might be able to improve things further if I read lines instead of words, split on the spaces, and filter out the uppercase words at that point instead of using bufio.ScanWords. Will try this.

Thank you.

**yannisl:**

I get a big difference in performance if I put the `fmt.Fprintln()` outside the `scan` loop and concatenate the words inside the loop.

**Emacs24:**

File reading in Go is synchronous, so the proper word here is parallelism.

Something like this should work:

1. We have an 8-core CPU, so we run 8 goroutines and thus try to read in 8 threads in parallel.
2. We have a file and can find its length.
3. We can split the file (logically) into 8 equal pieces (±1 byte).
4. But the split must not cut through words, so each piece should actually start after a separator found near the boundary: https://imgur.com/a/4QIxn
   1. The first goroutine simply starts at the beginning of its chunk.
   2. Every other goroutine looks for the first separator, saves its position somewhere for the "previous" goroutine, and starts scanning from there.
   3. The last goroutine simply exits when it reaches the end of its chunk.
   4. Every other goroutine checks where the next goroutine started its chunk and reads up to that point.

This would require a custom, fairly complex reader with a shared context to store the bounds, with synchronization primitives underneath. Meh.

The other approach is to introduce two steps:

1. Find the chunk bounds.
2. Run goroutines over those chunks.

In this case you will be fine with [LimitedReader](https://golang.org/pkg/io/#LimitedReader) and [Seek](https://golang.org/pkg/os/#File.Seek).

**PS** I have yet to see a real need for such an approach. Parallelization is usually done at the process level, where independent processes read their own files.

**PPS** I believe sending the data over a channel will eliminate any speedup you would get from the parallel algorithm.

**PPPS** `r, _ := utf8.DecodeRune(word)` can be too slow because of GC.
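For the curious, here is one possible sketch of the two-step variant just described, using `os.File.Seek` and `io.LimitReader` as suggested. The file name and the chunk count of 8 are placeholders, separators are simplified to spaces and newlines, and it counts uppercase words instead of printing them, since synchronizing output would reintroduce the channel overhead mentioned in the PPS:

```go
// A rough sketch, not a definitive implementation: find word-aligned
// chunk bounds first, then scan each chunk in its own goroutine with
// its own file handle.
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
	"sync"
	"unicode"
	"unicode/utf8"
)

// boundary returns the first offset >= pos that follows a space or
// newline, so no chunk starts in the middle of a word. (bufio.ScanWords
// accepts more separators; this is a simplification.)
func boundary(f *os.File, pos int64) (int64, error) {
	if pos == 0 {
		return 0, nil
	}
	if _, err := f.Seek(pos, io.SeekStart); err != nil {
		return 0, err
	}
	buf := make([]byte, 1)
	for {
		n, err := f.Read(buf)
		if err == io.EOF {
			return pos, nil
		}
		if err != nil {
			return 0, err
		}
		pos += int64(n)
		if buf[0] == ' ' || buf[0] == '\n' {
			return pos, nil
		}
	}
}

func main() {
	const chunks = 8 // placeholder: one chunk per core
	info, err := os.Stat("output.txt")
	if err != nil {
		log.Fatal(err)
	}
	size := info.Size()

	// Step 1: compute word-aligned bounds.
	f, err := os.Open("output.txt")
	if err != nil {
		log.Fatal(err)
	}
	bounds := make([]int64, chunks+1)
	bounds[chunks] = size
	for i := 1; i < chunks; i++ {
		if bounds[i], err = boundary(f, size*int64(i)/chunks); err != nil {
			log.Fatal(err)
		}
	}
	f.Close()

	// Step 2: scan the chunks in parallel; each goroutine writes only
	// its own counts[i] slot, so no further synchronization is needed.
	counts := make([]int, chunks)
	var wg sync.WaitGroup
	for i := 0; i < chunks; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			f, err := os.Open("output.txt")
			if err != nil {
				log.Fatal(err)
			}
			defer f.Close()
			f.Seek(bounds[i], io.SeekStart)
			s := bufio.NewScanner(io.LimitReader(f, bounds[i+1]-bounds[i]))
			s.Split(bufio.ScanWords)
			for s.Scan() {
				if r, _ := utf8.DecodeRune(s.Bytes()); unicode.IsUpper(r) {
					counts[i]++
				}
			}
		}(i)
	}
	wg.Wait()
	total := 0
	for _, c := range counts {
		total += c
	}
	fmt.Println("uppercase words:", total)
}
```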
**Rudd-X:**

This isn't concurrency-safe:

```go
go func(word chan string, done chan bool) {
	for scanner.Scan() {
		word <- scanner.Text()
	}
	done <- true
}(word, done)
```
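The quoted pipeline has a related shutdown problem: `word` and `upperCaseWordChan` are never closed, and `main` returns after the first value arrives on `done`, so the printer can be cut off mid-stream and the other goroutines leak. Below is a sketch of one way to restructure the original three stages so they shut down deterministically, with each stage closing its output channel and `done` signalled only by the final stage (buffered output included, per TheMerovius). This is an illustration, not the only fix:

```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"unicode"
	"unicode/utf8"
)

func main() {
	f, err := os.Open("./output.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	words := make(chan string)
	upper := make(chan string)
	done := make(chan struct{})

	// Stage 1: scan words; closing the channel signals the next stage.
	go func() {
		defer close(words)
		s := bufio.NewScanner(f)
		s.Split(bufio.ScanWords)
		for s.Scan() {
			words <- s.Text()
		}
		if err := s.Err(); err != nil {
			log.Println(err)
		}
	}()

	// Stage 2: keep only words starting with an uppercase rune.
	go func() {
		defer close(upper)
		for w := range words {
			if r, _ := utf8.DecodeRuneInString(w); unicode.IsUpper(r) {
				upper <- w
			}
		}
	}()

	// Stage 3: print; done is closed only after everything is flushed.
	go func() {
		defer close(done)
		out := bufio.NewWriter(os.Stdout)
		defer out.Flush()
		for w := range upper {
			fmt.Fprintln(out, w)
		}
	}()

	<-done
}
```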
