// Pipe适配器是用来连接期望一个io.Reader的代码和期望一个io.Writer的代码 package my_io import ( "errors" "sync" ) // onceError是一个只会存储一次错误的对象 type onceError struct { sync.Mutex //解析见下面 err error } func (a *onceError) Store(err error) { a.Lock() defer a.Unlock() if a.err!=nil{ return } a.err=err } func (a *onceError) Load() error { a.Lock() defer a.Unlock() return a.err } // ErrClosedPipe是一个用来表示在一个已经关闭的pipe上进行读写操作的错误 var ErrClosePipe=errors.New("io: read/write on closed pipe") // 一个pipe是PipeReader和PipeWriter潜在的共享pipe结构体 type pipe struct { wrMu sync.Mutex // 串行化写操作 wrCh chan []byte rdCh chan int once sync.Once // 保护关闭操作,确保只关闭一次 done chan struct{} rerr onceError werr onceError } func (p *pipe) Read(b []byte) (n int, err error) { select { case <-p.done: return 0,p.readCloseError() default: } select { // 从wrCh读取一个,然后拷贝到b,并向rdCh中写入拷贝的字节,返回拷贝的字节数和错误nil。如果p.done有值(读取关闭)的话,返回0和readCloseError读到的错误 case bw:=<-p.wrCh: nr:=copy(b,bw) p.rdCh<-nr return nr,nil case <-p.done: return 0,p.readCloseError() } } // 如果rerr==nil而werr不为nil,则返回werr,否则返回ErrClosePipe错误 func (p *pipe) readCloseError() error { rerr:=p.rerr.Load() if werr:=p.werr.Load();rerr==nil&&werr!=nil{ return werr } return ErrClosePipe } // 关闭读取 // 如果传入的错误为nil,则错误置为ErrClosePipe。否则存储err,关闭p.done通道,返回nil func (p *pipe) CloseRead(err error) error { if err==nil{ err=ErrClosePipe } p.rerr.Store(err) p.once.Do(func() { close(p.done) }) return nil } func (p *pipe) Write(b []byte) (n int,err error) { select { case <-p.done: return 0,p.writeCloseError() default: p.wrMu.Lock() defer p.wrMu.Unlock() } for once:=true;once || len(b)>0;once=false{ select { case p.wrCh <- b: nw:=<-p.rdCh b=b[nw:] n+=nw case <-p.done: return n,p.writeCloseError() } } return n,nil } func (p *pipe) writeCloseError() error { werr:=p.werr.Load() if rerr:=p.rerr.Load();werr==nil&&rerr!=nil{ return rerr } return ErrClosePipe } func (p *pipe) CloseWrite(err error) error { if err==nil{ err=EOF } p.werr.Store(err) p.once.Do(func() { close(p.done) }) return nil } // PipeReader是一个pipe读的那一半 type PipeReader struct { p *pipe } // Read实现了标准的Read接口: // 它从pipe中读取数据,阻塞直到一个writer到来或者写操作后面被关闭 // 如果写操作后面写操作后面关闭时有错误,那么这个错误会被返回;否则如果没有错误,那么错误将被置为EOF func (r *PipeReader) Read(data []byte) (n int, err error) { return r.p.Read(data) } // Close关闭这个reader;接下来对这个pipe写半部分的写操作将会返回ErrClosePipe错误 func (r *PipeReader) Close() error { return r.CloseWithError(nil) } // CloseWithError关闭这个reader;接下来对这个pipe写半部分的写操作将会返回一个错误err // // 如果pipe已经有错误了,那么CloseWithError永远不会覆写先前的那个错误并且总是返回nil func (r *PipeReader) CloseWithError(err error) error { return r.p.CloseRead(err) } // PipeWriter是一个pipe的写半部分 type PipeWriter struct { p *pipe } // Write实现了标准的Write接口: // 它写入数据到pipe,然后阻塞,直到一个或更多reader消费掉所有的数据或者读最后关闭 // 如果读最后关闭时有错误,那么这个错误会被返回。否则如果没有错误的话,返回错误为ErrClosePipe func (w *PipeWriter) Write(data []byte) (n int, err error) { return w.p.Write(data) } func (w *PipeWriter) Close() error { return w.CloseWithError(nil) } // CloseWithError关闭这个writer;接下来从这个pipe读半部分读取操作将会读取到0字节和传入的这个错误或者EOF(如果传入的错误为nil的话) // // 如果pipe已经有错误了的话,CloseWithError永远不会覆写先前的错误,并且总是返回nil func (w *PipeWriter) CloseWithError(err error) error { return w.p.CloseWrite(err) } // Pipe创建了一个同步的内存中的pipe // 它可以用来连接期望一个io.Reader的代码和期望一个io.Writer的代码 // // 在pipe上读取和写入时一对一的关系,除非多个读需要消费一个写 // 也就是说每个对PipeWriter的写操作都会被阻塞直到满足一个或多个从PipeReader的读操作消费掉了所有已经写入的数据 // 数据直接从写拷贝到一个或多个读;内部没有缓冲区 // // 可以并发安全地互相调用Read Write或者Close // 单线程调用则会串行化执行 func Pipe() (*PipeReader, *PipeWriter) { p:=&pipe{ wrCh:make(chan []byte), rdCh:make(chan int), done:make(chan struct{}), } return &PipeReader{p},&PipeWriter{p} }
有疑问加站长微信联系(非本文作者)