package my_bufio import ( "bytes" "errors" "io" "unicode/utf8" ) const ( defaultBufSize = 4096 ) var ( // 非法使用UnreadByte ErrInvalidUnreadByte=errors.New("bufio: invalid use of UnreadByte") // 非法使用UnreadRune ErrInvalidUnreadRune = errors.New("bufio: invalid use of UnreadRune") // buffer已满 ErrBufferFull = errors.New("bufio: buffer full") // count为负 ErrNegativeCount = errors.New("bufio: negative count") ) // 缓冲的输入 // Reader为io.Reader对象实现了缓冲 type Reader struct { buf []byte //缓冲区 rd io.Reader // 由客户端提供reader r,w int // buf的读写位置 err error // 内部保留的错误 lastByte int // 为UnreadByte设置的上一次读取的字节;-1表示非法 lastRuneSize int //为UnreadRunes设置的上一次读取的rune大小;-1表示非法 } const minReadBufferSize = 16 const maxConsecutiveEmptyReads = 100 // NewReaderSize返回一个大小最少为指定size的新的Reader,如果参数rd已经是一个拥有更大size的Reader,它会返回潜在Reader func NewReaderSize(rd io.Reader, size int) *Reader { // rd是否已经是一个Reader b,ok:=rd.(*Reader) if ok && len(b.buf)>=size{ return b } if size<minReadBufferSize{ size=minReadBufferSize } r:=new(Reader) r.reset(make([]byte,size),rd) return r } // NewReader返回一个缓冲区为默认大小的Reader func NewReader(rd io.Reader) *Reader { return NewReaderSize(rd,defaultBufSize) } // Size返回潜在缓冲区的字节大小 func (b *Reader) Size() int { return len(b.buf) } // Reset丢弃任何缓冲的数据,重置所有的状态,并且转换缓冲的reader从r的开始读取 func (b *Reader) Reset(r io.Reader) { b.reset(b.buf,r) } func (b *Reader) reset(buf []byte, r io.Reader) { *b=Reader{ buf: buf, rd: r, lastByte: -1, lastRuneSize: -1, } } // 返回的已读取字节数为负数 var errNegativeRead = errors.New("bufio: reader returned negative count from Read") // fill把一个新的chunk(数据块)读取到缓冲区中 func (b *Reader) fill() { // 将已经存在的数据滑动到开始位置 if b.r>0{ copy(b.buf,b.buf[b.r:b.w]) b.w-=b.r b.r=0 } // 如果w大于缓冲区的长达,代表已经超过缓冲区的容量 if b.w>len(b.buf){ panic("bufio: tried to fill full buffer") } // 读取新数据:尝试有限次数 for i:=maxConsecutiveEmptyReads;i>0;i--{ n,err:=b.rd.Read(b.buf[b.w:]) if n<0{ panic(errNegativeRead) } // 先处理读取的字节,后处理错误原则 b.w+=n if err!=nil{ b.err=err return } if n>0 { return } } // 超过限制次数都读取到空数据 b.err = io.ErrNoProgress } // 获取结构体内部保存的错误后,并将结构体内的错误置为nil func (b *Reader) readErr() error { err := b.err b.err = nil return err } // Peek会返回后面的n字节而不会使reader进度向前(r增加)。Peek返回的字节切片在下次调用的时候不再有效。 // 如果Peek返回的字节数少于n,它也会返回一个错误来解释为什么少了。如果n大于Reader的缓冲区大小,返回ErrBufferFull的错误。 // 调用Peek会阻止UnreadByte或UnreadRune的调用直到下一次read操作。 func (b *Reader) Peek(n int) ([]byte, error) { if n<0 { return nil,ErrNegativeCount } b.lastByte=-1 b.lastRuneSize=-1 //如果数据不够且缓冲区不满且没有错误,就填充 for b.w-b.r<n&&b.w-b.r<len(b.buf)&&b.err==nil{ b.fill() // b.w-b.r<len(b.buf) => buffer不满 } if n>len(b.buf){ return b.buf[b.r:b.w],ErrBufferFull } // 0<n<len(b.buf) var err error if avail:=b.w-b.r;avail<n{ // 在缓冲区的数据不够 n=avail err=b.readErr() if err==nil{ err=ErrBufferFull } } return b.buf[b.r:b.r+n],err } // Discard跳过接下来的n个字节,返回丢弃的字节数 // 如果丢弃的字节数少于n,也会返回一个错误 // 如果0<=n<=b.Buffered(),Discard不需要从潜在的io.Reader读取就可以保证成功 func (b *Reader) Discard(n int) (discard int, err error) { if n<0 { return 0,ErrNegativeCount } if n==0 { return } remain:=n for { skip:=b.Buffered() if skip==0{ b.fill() skip=b.Buffered() } if skip > remain{ skip=remain } b.r+=skip remain-=skip if remain==0{ return n,nil } if b.err!=nil{ return n-remain,b.readErr() } } } // Read把数据读取到p中 // 它返回读取到p中的字节数 // 在潜在的Reader上,bytes来自最多一个Read,因此n可能小于len(p) // 要精确读取len(p)的bytes,使用io.ReadFull(b,p) // 在EOF,数量为0,err为io.EOF func (b *Reader) Read(p []byte) (n int, err error) { n = len(p) // 如果p的长度为0, if n==0{ if b.Buffered()>0{ return 0,nil } return 0,b.readErr() } // 如果缓冲区已经没有缓冲的数据了 if b.r==b.w{ // 如果有错误,就返回 if b.err!=nil{ return 0,b.readErr() } if len(p)>=len(b.buf){ // 如果要读取到的p大于缓冲区,空的缓冲区,则直接读取到p避免拷贝 n,b.err=b.rd.Read(p) if n<0{ panic(ErrNegativeCount) } if n>0{ b.lastByte = int(p[n-1]) b.lastRuneSize = -1 } return n,b.readErr() } // 一次读取 // 不要使用b.fill,那将会造成循环 // 表示缓冲区没有可读的了,就把已读和已写置为0,填充缓冲区,再把已写置为填充数,(相当于fill),至于为什么b.fill会造成循环,不知道。。。 b.r=0 b.w=0 n,b.err=b.rd.Read(b.buf) if n<0 { panic(ErrNegativeCount) } if n==0{ return 0,b.readErr() } b.w+=n } // 尽可能多的拷贝 n=copy(p,b.buf[b.r:b.w]) b.r+=n b.lastByte=int(b.buf[b.r-1]) b.lastRuneSize=-1 return n,nil } // ReadByte读取并返回一个单字节 // 如果没有字节可以获得,返回错误 func (b *Reader) ReadByte() (byte, error) { b.lastRuneSize=-1 for b.r==b.w{ if b.err!=nil{ return 0,b.readErr() } b.fill() // 缓冲区为空 } c:=b.buf[b.r] b.r++ b.lastByte=int(c) return c,nil } // UnreadByte将将最后读取的一个字节设为未读。只有最近读过的字节可以回退。 // 如果最近Reader调用的方法不是读取操作UnreadByte将会返回一个错误。注意Peek不是读取操作 func (b *Reader) UnreadByte() error { // 如果上一次不是读取操作 || 还没有读取过并且有可以读取的 if b.lastByte<0 || b.r==0&&b.w>0{ return ErrInvalidUnreadByte } // b.r>0 || b.w==0 if b.r>0{ b.r-- }else { // b.r==0 && b.w==0 b.w=1 } b.buf[b.r]=byte(b.lastByte) b.lastByte=-1 b.lastRuneSize=-1 return nil } // ReadRune读取一个单个UTF-8编码的unicode字符并且返回这个字符和它所占字节数。如果这个字符是非法的, // 它消费1个字节并返回占用1字节的unicode.ReplacementChar(U+FFFD) func (b *Reader) ReadRune() (r rune, size int, err error) { // 如果未读字节不足于容纳一个最大字符字节 未读部分第一个字符不是完整编码 没有错误 缓冲区未满 都满足的情况下就填充 for b.r+utf8.UTFMax>b.w && !utf8.FullRune(b.buf[b.r:b.w])&&b.err == nil && b.w - b.r<len(b.buf){ b.fill() } b.lastRuneSize=-1 if b.r==b.w{ return 0,0,b.readErr() } r,size = rune(b.buf[b.r]),1 if r>=utf8.RuneSelf{ r,size=utf8.DecodeRune(b.buf[b.r:b.w]) } b.r+=size b.lastByte=int(b.buf[b.r-1]) b.lastRuneSize=size return r,size,nil } // UnreadRune回退最后一个读取的字符。如果最近在Reader上调用的方法不是ReadRune,UnreadRune返回一个错误。 //(这比UnreadByte严格,UnreadByte只要求最近是读操作) func (b *Reader) UnreadRune() error { if b.lastRuneSize<0 || b.r < b.lastRuneSize { return ErrInvalidUnreadRune } b.r -= b.lastRuneSize b.lastByte =-1 b.lastRuneSize = -1 return nil } // Buffered返回能从目前的缓冲区读取到的字节数 func (b *Reader) Buffered() int { return b.w - b.r } // ReadSlice从输入中读取数据知道遇到第一个delim,返回指向缓冲区中字节的切片。 // 在下次读取时,缓冲区的数据不再有效 // 如果读取过程中在遇到delim之前就发生错误,则返回缓冲区中所有的数据和遇到的错误(经常是io.EOF) // 如果缓冲区满了也没有读取到delim,则ReadSlice将会以ErrBufferFull失败. // 因为从ReadSlice返回的数据在下次I/O操作时会被复写,所以大多数客户端应该使用ReadBytes或者ReadString来代替. // 只有当读取的行不是以delim结尾的,ReadSlice才会返回err!=nil func (b *Reader) ReadSlice(delim byte) (line []byte, err error) { s:=0 // 搜索开始索引 for { // 搜索缓冲区 // 如果搜索到了 if i:=bytes.IndexByte(b.buf[b.r+s:b.w],delim);i>=0{ i+=s line=b.buf[b.r:b.r+i+1] b.r+=i+1 break } // 如果没有搜索到 // 是否有挂起的error? // 如果有,则返回所有缓冲的数据和错误 if b.err!=nil{ line=b.buf[b.r:b.w] b.r=b.w err=b.readErr() break } // Buffer是否已经满了? // 将整个缓冲区数据返回 if b.Buffered() >= len(b.buf){ b.r=b.w line=b.buf err=ErrBufferFull break } s=b.w-b.r // 不要重新扫描之前我们已经扫描过的部分 b.fill() // 将缓冲区填满 } // 处理最后的字节,如果有的话。 if i:=len(line)-1;i>=0{ b.lastByte=int(line[i]) b.lastRuneSize=-1 } return } // ReadLine是一个低级的原始行读取。大多数调用者应该使用ReadBytes('\n')或者ReadString('\n')而不是使用一个Scanner // ReadLine尝试返回一个单行,但不包括行尾字节。 // 如果一行对于缓冲区来说太长,然后isPrefix就会被设置,然后此行开始部分被返回。剩下的部分将会在下次调用时返回。 // 当此行最后一部分被返回后,isPrefix会被置为true // 返回的缓冲区只有在下一次调用ReadLine前有效 // ReadLine要么返回一个非空的行,要么返回一个错误,不会同时返回这两个。 // // 从ReadLine返回的文本不会包括行结尾("\r\n"或者"\n") // 如果输入没有以最终的行结尾,则不会有提示和错误给出。 // 在ReadLine后调用UnreadByte将总是回退读取的最后一个字节(可能是行结尾字符),即使那个字符不是ReadLine返回行的一部分 func (b *Reader) ReadLine() (line []byte, isPrefix bool, err error) { line,err=b.ReadSlice('\n') if err==ErrBufferFull{ // 处理'\r\n'跨越两个缓冲区的情况 if len(line)>0&&line[len(line)-1]=='\r'{ // 把'\r'放在buf的后面,并把'\r'从line中去掉,让下一次调用ReadLine去检查'\r\n' if b.r==0{ // 应该是不会遇到的 panic("bufio: tried to rewind past start of buffer") } b.r-- line=line[:len(line)-1] } return line,true,nil } if len(line)==0{ if err!=nil{ line=nil } return } err=nil if line[len(line)-1]=='\n'{ drop:=1 if len(line)>1&&line[len(line)-2]=='\r'{ drop=2 } line=line[:len(line)-drop] } return } // ReadBytes读取到输入的分隔符位置,返回读取到的包括分隔符在内的数据 // 如果ReadBytes在遇到分隔符之前遇到一个错误,在错误之前会返回数据,并返回错误和自身错误(经常是io.EOF) // 只有在返回的数据不包含分隔符时才会返回错误 func (b *Reader) ReadBytes(delim byte) ([]byte, error) { // 用ReadSlice去寻找数组,积累至缓冲区满 var frag []byte var full [][]byte var err error for { var e error frag,e=b.ReadSlice(delim) if e==nil{ // 获取到最终片段 break } if e!=ErrBufferFull{ // 不是期望的错误 err=e break } // 建立一个缓冲区的副本 buf:=make([]byte,len(frag)) copy(buf,frag) full=append(full,buf) } // 分配一个新的缓冲区去持有完整的碎片和片段 n:=0 for i:=range full{ n+=len(full[i]) } n+=len(frag) // 把完整的片段拷贝进来 buf:=make([]byte,n) n=0 for i:=range full{ n+=copy(buf[n:],full[i]) } copy(buf[n:],frag) return buf,err } // ReadString从输入中读取分隔符之前的数据,返回一个包含分隔符的字符串。 // 如果ReadString在读到分隔符之前发生了错误,则返回之前读到的数据和遇到的错误和自身错误(经常是io.EOF) // ReadString只有在返回的数据不是以分隔符结尾的时候,才返回错误 func (b *Reader) ReadString(delim byte) (string,error) { bytes,err:=b.ReadBytes(delim) return string(bytes),err } // WriteTo实现了io.WriteTo接口 // 这可能对潜在的Reader的Read方法进行多次调用 // 如果潜在的Reader纸质WriteTo方法,这会直接调用潜在的WriteTo而不会缓冲 func (b *Reader) WriteTo(w io.Writer) (n int64, err error) { // 首先把缓冲区的数据写入w n,err=b.writeBuf(w) if err!=nil{ return } // 如果b潜在的Reader实现了WriterTo接口,那么久调用WriteTo方法将潜在Reader的数据全部写入w,然后返回 if r,ok:=b.rd.(io.WriterTo);ok{ m,err:=r.WriteTo(w) n+=m return n,err } // 如果w实现了ReaderFrom接口,那么久调用ReadFrom从潜在Reader中读取全部读取出来,然后返回 if w,ok:=w.(io.ReaderFrom);ok{ m,err:=w.ReadFrom(b.rd) n+=m return n,err } // 如果既没有实现WriterTo接口,也没有实现ReaderFrom接口,那么久只能不断将缓冲区数据写入w,然后填充缓冲区,然后继续写入,如此循环,知道全部数据写入w // 如果缓冲区未满,填充缓冲区 if b.w-b.r<len(b.buf){ b.fill() } // 循环写入数据到w for b.r<b.w{ m,err:=b.writeBuf(w) n+=m if err!=nil{ return n,err } b.fill() } // 如果数据全部写完,将错误置为nil if b.err == io.EOF{ b.err=nil } return n,b.readErr() } // 写入数据返回的字节数为负数的错误 var ErrNegativeWrite=errors.New("bufio: writer returned negative count from Write") // writeBuf把Reader中缓冲区的数据写入到writer中 func (b *Reader) writeBuf(w io.Writer) (int64, error) { n,err:=w.Write(b.buf[b.r:b.w]) if n<0 { panic(ErrNegativeWrite) } b.r+=n return int64(n),err } // 输出缓冲 // Writer为io.Writer对象实现了缓冲 // 当向一个Writer写入数据过程中发生了一个错误,那么将不会再接收任何数据,并且接下来的Writes,Flush都会返回这个错误 // 当所有数据都被写入Writer后,客户端应该调用Flush方法已保证所有数据都已经被送往了潜在的io.Writer type Writer struct { err error buf []byte n int // 已缓冲的字节数(可写数据) wr io.Writer } // NewWriterSize返回一个具有有至少指定大小的缓冲区的Writer // 如果参数w已经是一个拥有更大size缓冲区的Writer,那么就返回这个潜在的Writer func NewWriterSize(w io.Writer, size int) *Writer { // 判断w是否已经是一个Writer了 b,ok:=w.(*Writer) if ok&&len(b.buf)>=size{ return b } if size<=0{ size=defaultBufSize } return &Writer{ buf: make([]byte,size), wr: w, } } // NewWriter返回一个拥有默认大小缓冲区的新Writer func NewWriter(w io.Writer) *Writer { return NewWriterSize(w,defaultBufSize) } // Size返回潜在buffer的大小 func (b *Writer) Size() int { return len(b.buf) } // Reset丢弃任何未flush的缓冲数据,清空所有的错误,并将底层的io.Writer重置为w func (b *Writer) Reset(w io.Writer) { b.err=nil b.n=0 b.wr=w } // Flush把任何已经缓冲的数据写入到潜在io.Writer中去 func (b *Writer) Flush() error { // 如果已经有错误就直接返回已经存在的错误 if b.err!=nil{ return b.err } // 如果没有可写的数据,那么直接返回 if b.n==0{ return nil } // 尝试将可写的数据全部写入b.wr n,err:=b.wr.Write(b.buf[0:b.n]) // 如果没有将可写的数据全部写入并且没有错误的话,将错误置为io.ErrShortWrite if n<b.n&&err==nil{ err=io.ErrShortWrite } // 如果有错误的话 if err!=nil{ // 如果已经写入了数据,但是没有写完,那么将没写完的数据拷贝到buf的头部,并将未写入数据置为可写数据n if n>0&&n<b.n{ copy(b.buf[0:b.n-n],b.buf[n:b.n]) } b.n-=n b.err=err return err } // 如果没有错误的话,代表缓冲区数据全部写入了b.wr,将可写数据置为0 b.n=0 return nil } // Available返回缓冲区未使用的字节数 func (b *Writer) Available() int { return len(b.buf)-b.n } // Buffered返回已经写入到现有buf的字节数 func (b *Writer) Buffered() int { return b.n } // Write将p中的数据写入b的buf // 它返回写入到buf中的字节数 // 如果nn<len(p),那么它也会返回一个错误来解释为什么写入的数据少于len(p) func (b *Writer) Write(p []byte) (nn int, err error) { // 如果p大于缓冲区可用空间并且没有错误 for len(p)>b.Available()&&b.err==nil{ var n int // 如果以缓冲数据为0(也就是说p比整个缓冲区还要大),那么就不通过缓冲区直接写入到底层的wr if b.Buffered()==0{ n,err =b.wr.Write(p) }else { // 否则的话讲缓冲区填满,然后一次性执行Flush n=copy(b.buf[b.n:],p) b.n+=n b.Flush() } nn+=n p=p[n:] } if b.err!=nil{ return nn,b.err } n:=copy(b.buf[b.n:],p) b.n+=n nn+=n return nn,nil } // WriteByte写入一个单字节 func (b *Writer) WriteByte(c byte) error { // 如果有错误就直接返回 if b.err!=nil{ return b.err } // 如果缓冲区已经满了,就执行Flush,如果Flush出错,就直接返回 if b.Available()<=0&&b.Flush()!=nil{ return b.err } b.buf[b.n]=c b.n++ return nil } // WriteRune写入一个unicode字符,返回写入的字节数和任何错误 func (b *Writer) WriteRune(r rune) (size int, err error) { // 如果r是一个单字节,那么就直接调用WriteByte写入 if r<utf8.RuneSelf{ err=b.WriteByte(byte(r)) if err!=nil{ return 0,err } return 1,nil } if b.err!=nil{ return 0,err } // 如果buffer剩余空间不足以容纳一个最大utf8字符字节,那么先Flush腾出空间,如果整个缓冲区都容纳不下最大utf8字符字节,那么就把rune转换成字符串调用WriteString方法写入 n:=b.Available() if n<utf8.UTFMax{ if b.Flush();b.err!=nil{ return 0,b.err } n=b.Available() if n<utf8.UTFMax{ // 只有当buffer真特么太小了才会发生 return b.WriteString(string(r)) } } // 如果剩余空间可以容纳最大utf8字符字节,那么就调用utf8.EncodeRune写入 size=utf8.EncodeRune(b.buf[b.n:],r) b.n+=size return size,nil } // WriteString写入一个字符串 // 它返回已经写入的字节数 // 如果返回的已写入的字节数少于len(s),那么它也会返回一个错误来解释为什么少了 func (b *Writer) WriteString(s string) (int, error) { nn:=0 for len(s)>b.Available()&&b.err!=nil{ n:=copy(b.buf[b.n:],s) b.n+=n nn+=n s=s[n:] b.Flush() } if b.err!=nil{ return nn,b.err } n:=copy(b.buf[b.n:],s) b.n+=n nn+=n return nn,nil } // ReadFrom实现了io.ReaderFrom接口 // 如果底层的writer支持WriteFrom方法,并且b的缓冲区还没有数据,那么就不适用缓冲区而直接调用底层的ReadFrom方法 func (b *Writer) ReadFrom(r io.Reader) (n int64, err error) { // 先检查本身有没有错误,有就直接返回 if b.err!=nil{ return 0,b.err } // 如果底层的writer支持WriteFrom方法,并且b的缓冲区还没有数据,那么就不适用缓冲区而直接调用底层的ReadFrom方法 if b.Buffered()==0{ if w,ok:=b.wr.(io.ReaderFrom);ok{ n,err=w.ReadFrom(r) b.err=err return n,err } } // m是从r中读取的字节数 var m int for { // 如果缓冲区已经满了,就先调用Flush,如果Flush出错就直接返回 if b.Available()==0{ if err1:=b.Flush();err1!=nil{ return n,err1 } } // nr记录循环读取空数据的次数 nr:=0 for nr<maxConsecutiveEmptyReads{ m,err=r.Read(b.buf[b.n:]) // 如果读取到了数据或者读取出错,就跳出循环 if m!=0 ||err!=nil{ break } nr++ } if nr==maxConsecutiveEmptyReads{ return n,io.ErrNoProgress } b.n+=m n+=int64(m) if err!=nil{ break } } if err==io.EOF{ // 如果我们之前确实填充过缓冲区,那么要抢先Flush if b.Available()==0{ err=b.Flush() }else { err=nil } } return n,err } // 带缓冲的输入和输出 // ReadWriter存储着Reader和Writer的指针 // 它实现了io.ReadWriter接口 type ReadWriter struct { *Reader *Writer } // NewReadWriter配置一个新的ReadWriter用来调度r和w func NewReadWriter(r *Reader, w *Writer) *ReadWriter { return &ReadWriter{ Reader: r, Writer: w, } }
有疑问加站长微信联系(非本文作者)