DjanFey的基础库解读--io包(pipe.go)

DjanFy · · 1411 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

 

// Pipe适配器是用来连接期望一个io.Reader的代码和期望一个io.Writer的代码

package my_io

import (
   "errors"
   "sync"
)

// onceError是一个只会存储一次错误的对象
type onceError struct {
   sync.Mutex //解析见下面
   err error
}

func (a *onceError) Store(err error) {
   a.Lock()
   defer a.Unlock()
   if a.err!=nil{
      return
   }
   a.err=err
}

func (a *onceError) Load() error {
   a.Lock()
   defer a.Unlock()
   return a.err
}

// ErrClosedPipe是一个用来表示在一个已经关闭的pipe上进行读写操作的错误
var ErrClosePipe=errors.New("io: read/write on closed pipe")

// 一个pipe是PipeReader和PipeWriter潜在的共享pipe结构体
type pipe struct {
   wrMu sync.Mutex // 串行化写操作
   wrCh chan []byte
   rdCh chan int

   once sync.Once // 保护关闭操作,确保只关闭一次
   done chan struct{}
   rerr onceError
   werr onceError
}

func (p *pipe) Read(b []byte) (n int, err error) {
   select {
   case <-p.done:
      return 0,p.readCloseError()
   default:
   }
   select {
   // 从wrCh读取一个,然后拷贝到b,并向rdCh中写入拷贝的字节,返回拷贝的字节数和错误nil。如果p.done有值(读取关闭)的话,返回0和readCloseError读到的错误
   case bw:=<-p.wrCh:
      nr:=copy(b,bw)
      p.rdCh<-nr
      return nr,nil
   case <-p.done:
      return 0,p.readCloseError()
   }
}

// 如果rerr==nil而werr不为nil,则返回werr,否则返回ErrClosePipe错误
func (p *pipe) readCloseError() error {
   rerr:=p.rerr.Load()
   if werr:=p.werr.Load();rerr==nil&&werr!=nil{
      return werr
   }
   return ErrClosePipe
}

// 关闭读取
// 如果传入的错误为nil,则错误置为ErrClosePipe。否则存储err,关闭p.done通道,返回nil
func (p *pipe) CloseRead(err error) error {
   if err==nil{
      err=ErrClosePipe
   }
   p.rerr.Store(err)
   p.once.Do(func() {
      close(p.done)
   })
   return nil
}

func (p *pipe) Write(b []byte) (n int,err error) {
   select {
   case <-p.done:
      return 0,p.writeCloseError()
   default:
      p.wrMu.Lock()
      defer p.wrMu.Unlock()
   }
   for once:=true;once || len(b)>0;once=false{
      select {
      case p.wrCh <- b:
         nw:=<-p.rdCh
         b=b[nw:]
         n+=nw
      case <-p.done:
         return n,p.writeCloseError()
      }
   }
   return n,nil
}

func (p *pipe) writeCloseError() error {
   werr:=p.werr.Load()
   if rerr:=p.rerr.Load();werr==nil&&rerr!=nil{
      return rerr
   }
   return ErrClosePipe
}

func (p *pipe) CloseWrite(err error) error {
   if err==nil{
      err=EOF
   }
   p.werr.Store(err)
   p.once.Do(func() {
      close(p.done)
   })
   return nil
}

// PipeReader是一个pipe读的那一半
type PipeReader struct {
   p *pipe
}

// Read实现了标准的Read接口:
// 它从pipe中读取数据,阻塞直到一个writer到来或者写操作后面被关闭
// 如果写操作后面写操作后面关闭时有错误,那么这个错误会被返回;否则如果没有错误,那么错误将被置为EOF
func (r *PipeReader) Read(data []byte) (n int, err error) {
   return r.p.Read(data)
}

// Close关闭这个reader;接下来对这个pipe写半部分的写操作将会返回ErrClosePipe错误
func (r *PipeReader) Close() error {
   return r.CloseWithError(nil)
}

// CloseWithError关闭这个reader;接下来对这个pipe写半部分的写操作将会返回一个错误err
//
// 如果pipe已经有错误了,那么CloseWithError永远不会覆写先前的那个错误并且总是返回nil
func (r *PipeReader) CloseWithError(err error) error {
   return r.p.CloseRead(err)
}

// PipeWriter是一个pipe的写半部分
type PipeWriter struct {
   p *pipe
}

// Write实现了标准的Write接口:
// 它写入数据到pipe,然后阻塞,直到一个或更多reader消费掉所有的数据或者读最后关闭
// 如果读最后关闭时有错误,那么这个错误会被返回。否则如果没有错误的话,返回错误为ErrClosePipe
func (w *PipeWriter) Write(data []byte) (n int, err error) {
   return w.p.Write(data)
}

func (w *PipeWriter) Close() error {
   return w.CloseWithError(nil)
}

// CloseWithError关闭这个writer;接下来从这个pipe读半部分读取操作将会读取到0字节和传入的这个错误或者EOF(如果传入的错误为nil的话)
//
// 如果pipe已经有错误了的话,CloseWithError永远不会覆写先前的错误,并且总是返回nil
func (w *PipeWriter) CloseWithError(err error) error {
   return w.p.CloseWrite(err)
}


// Pipe创建了一个同步的内存中的pipe
// 它可以用来连接期望一个io.Reader的代码和期望一个io.Writer的代码
//
// 在pipe上读取和写入时一对一的关系,除非多个读需要消费一个写
// 也就是说每个对PipeWriter的写操作都会被阻塞直到满足一个或多个从PipeReader的读操作消费掉了所有已经写入的数据
// 数据直接从写拷贝到一个或多个读;内部没有缓冲区
//
// 可以并发安全地互相调用Read Write或者Close
// 单线程调用则会串行化执行
func Pipe() (*PipeReader, *PipeWriter) {
   p:=&pipe{
      wrCh:make(chan []byte),
      rdCh:make(chan int),
      done:make(chan struct{}),
   }
   return &PipeReader{p},&PipeWriter{p}
}

 


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1411 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传