接着上一篇的golang分布式存储 读书笔记(1)——流操作之GetStream封装,这次要讲的是上传文件并保存,使用restful
的PUT
方法,书中封装了PutStream
结构。
接口设计
客户端上传数据时向接口服务器发送PUT
请求,请求的url
为/objects/<object_name>
。
同样接口服务器向数据服务器转发PUT
请求,请求的url
为/objects/<object_name>
。数据服务器在本地指定文件夹(D:/objects/
)下创建<object_name>
文件,将PUT
的内容写入文件中。
目录结构
在GOPATH/src
目录下,目录结构为:
go-storage
apiServer
objects
get.go
put.go
apiServer.go
dataServer
dataServer.go
数据服务器实现
dataServer代码
数据服务器的put
接口和get
接口很类似,只不过将读文件改为了写文件。
package main
import (
"net/http"
"io"
"os"
"log"
"strings"
)
const (
objectDir = "D:/objects/"
)
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
log.Println(m)
if m == http.MethodGet {
// get(w, r)
return
} else if m == http.MethodPut {
put(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}
func put(w http.ResponseWriter, r *http.Request) {
// 提取文件名
fname := strings.Split(r.URL.EscapedPath(), "/")[2]
log.Println(fname)
// 创建文件
f, e := os.Create(objectDir + fname)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
defer f.Close()
// 往文件写入数据
io.Copy(f, r.Body)
}
func main() {
http.HandleFunc("/objects/", Handler)
http.ListenAndServe(":8889", nil)
}
io.Copy(f, r.Body)
将r.body
的数据写入f
中,其中r.Body
实现了io.ReadCloser
接口,f
实现了io.Writer
接口。最后要记得关闭文件。
测试
使用Restlet Client
发送PUT
请求进行测试。
往http://localhost:8889/objects/1.txt
发送PUT
请求,可以看到本地确实生成了1.txt
文件。
接口服务器实现
版本一
实现PUT请求
可以使用http.NewRequest
构造一个PUT
请求,使用http.Client
构造一个客户端进行发送。
例如:
request, _ := http.NewRequest("PUT","http://127.0.0.1:8889/objects/"+object, reader)
client := http.Client{}
r, e := client.Do(request) // 发送并接受请求
完整代码如下:
package main
import (
"net/http"
"io"
"strings"
"go-storage/apiServer/objects"
"log"
)
const dataServerAddr = "http://localhost:8889/objects/"
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m == http.MethodGet {
// get(w, r)
return
} else if m == http.MethodPut {
put(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}
func put(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
object := strings.Split(r.URL.EscapedPath(), "/")[2]
request, _ := http.NewRequest("PUT",dataServerAddr + object, r.Body)
client := http.Client{}
resp, e := client.Do(request) // 发送并接受请求
if e == nil && resp.StatusCode != http.StatusOK {
w.WriteHeader(http.StatusNotFound)
log.Printf("dataServer return http code %d", resp.StatusCode)
return
}
defer resp.Body.Close()
io.Copy(w, resp.Body)
}
func main() {
http.HandleFunc("/objects/", Handler)
http.ListenAndServe(":8888", nil)
}
测试
往http://localhost:8888/objects/2.txt
发送PUT
请求,可以看到本地确实生成了2.txt
文件。
版本二——封装版本
书上的代码实现的相对复杂一点,将整个操作封装成了一个PutStream
的结构体,先看结构体的具体成员:
type PutStream struct {
writer *io.PipeWriter
c chan error
}
可以看到其中一个成员是io.PipeWriter
类型,这类似于linux
中的管道,一端写入,一读取读取,这里是为了在两个协程之间建立连接,这里使用channel
并不好使。
另一个成员是channel
类型,因为子协程不能有返回值,所以这里用通道传递错误。
PutStream
的构造函数如下:
func NewPutStream(server, object string) *PutStream {
reader, writer := io.Pipe() // 通过管道将 两个协程 联系起来 (用channel应该也可以把?)
c := make(chan error)
go func() {
request, _ := http.NewRequest("PUT", "http://"+server+"/objects/"+object, reader)
client := http.Client{}
r, e := client.Do(request) // 如果 reader一直没有数据,是不是 Do就会阻塞?
if e == nil && r.StatusCode != http.StatusOK {
e = fmt.Errorf("dataServer return http code %d", r.StatusCode)
}
c <- e
}()
return &PutStream{writer, c}
}
先使用io.Pipe()
构造一个writer
和reader
,再初始化一个通道c
,最后使用writer
和c
构造一个PutStream
指针对象返回。
中间开启了一个子协程,该协程构造一个http.NewRequest
,并使用http.Client
构造客户端发送请求。其中request
构造的时候使用了管道的读端reader
,此时该管道的读端并没有数据,但是http.NewRequest
构造request
的时候并不会阻塞,而是会阻塞在client.Do(request)
这句代码,直到管道的写端写入数据。
如果得到的响应出错了,将错误写入管道c
中。
同时这个PutStream
需要实现io.Writer
和io.Writer
接口:
// 实现了 io.Writer接口
func (w *PutStream) Write(p []byte) (n int, err error) {
return w.writer.Write(p)
}
// 关闭流并得到错误
func (w *PutStream) Close() error {
w.writer.Close() // io.PipeWriter 关闭, reader也会关闭? client.Do(request)才能结束?
return <-w.c
}
由于功能是要上传并保存一个对象,所以实现一个StoreObject
方法来调用PutStream
。
func StoreObject(r io.Reader, object string) (int, error) {
stream := NewPutStream(data_server, object)
// 会阻塞,直到r中收到 EOF,stream实现了io.Writer接口
io.Copy(stream, r) // 将r的内容拷贝的 stream 中,stream有数据的时候,他对应的reader也就有了数据
// 会阻塞到 stream中的c channel收到消息
e := stream.Close()
if e != nil {
return http.StatusInternalServerError, e
}
return http.StatusOK, nil
}
新建一个PutStream
指针类型的stream
,由于它实现了io.Writer
接口,所以可以调用io.Copy
将r
中的内容复制到stream
中,其实就是写入到管道的写端。最后关闭流,管道写端写入也就结束,读端也读取结束,子协程的发送也就结束了。
其实该版本的实现和版本一是一样的,只不过多了一个子协程,多使用了io.Pipe
管道。
看起来其实版本一更加简单、直接。暂时也看不出封装的优势,也许在后面功能越来越复杂的时候,就可以体现这个优势。个人感觉这个封装还是比较优雅。
完整代码
package objects, put.go
package objects
import (
"net/http"
"fmt"
"io"
)
type PutStream struct {
writer *io.PipeWriter
c chan error
}
const (
data_server = "127.0.0.1:8889"
)
func StoreObject(r io.Reader, object string) (int, error) {
stream := NewPutStream(data_server, object)
// 会阻塞到 r中收到 EOF stream实现了io.Writer接口
io.Copy(stream, r) // 将r的内容拷贝的 stream 中,stream有数据的时候,他对应的reader也就有了数据
// 会阻塞到 stream中的c channel收到消息
e := stream.Close()
if e != nil {
return http.StatusInternalServerError, e
}
return http.StatusOK, nil
}
func NewPutStream(server, object string) *PutStream {
reader, writer := io.Pipe() // 通过管道将 两个协程 联系起来 (用channel应该也可以把?)
c := make(chan error)
go func() {
request, _ := http.NewRequest("PUT", "http://"+server+"/objects/"+object, reader)
client := http.Client{}
r, e := client.Do(request) // 如果 reader一直没有数据,是不是 Do就会阻塞?
if e == nil && r.StatusCode != http.StatusOK {
e = fmt.Errorf("dataServer return http code %d", r.StatusCode)
}
c <- e
}()
return &PutStream{writer, c}
}
// 实现了 io.Writer接口
func (w *PutStream) Write(p []byte) (n int, err error) {
return w.writer.Write(p)
}
func (w *PutStream) Close() error {
w.writer.Close() // io.PipeWriter 关闭, reader也会关闭? client.Do(request)才能结束?
return <-w.c
}
package main, apiServer.go
package main
import (
"net/http"
"io"
"strings"
"go-storage/apiServer/objects"
"log"
)
const dataServerAddr = "http://localhost:8889/objects/"
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m == http.MethodGet {
// get(w, r)
return
} else if m == http.MethodPut {
put(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}
func put(w http.ResponseWriter, r *http.Request) {
// object 要保存的对象名
object := strings.Split(r.URL.EscapedPath(), "/")[2]
c, e := objects.StoreObject(r.Body, object)
if e != nil {
log.Println(e)
}
w.WriteHeader(c)
}
func main() {
http.HandleFunc("/objects/", Handler)
http.ListenAndServe(":8888", nil)
}
测试过程同版本一。
疑问
- 如果传输一个
4g
的文件,到底需不需要占用4g
内容?在上一篇文章中我认为这种流操作可以减小内存占用,但是现在不太确定。
参考
有疑问加站长微信联系(非本文作者)