每分钟百万级请求
比如这段是这样子的回调,http.HandleFunc("/", payloadHandler), 只是把一个 http 请求塞进 chan chan Job 里,可能并没有等到处理就返回给请求 200 了。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader( http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader( http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue <- work
}
w.WriteHeader( http.StatusOK)
}
到了从双层队通道取 Job 时候,处理的结果并不会通知给 http 请求方,
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
我是想实现能实时支持的方式,还把w http.ResponseWriter也套进了 Job 里面,想等真正有结果在w.Write。 结果发现接受的不到 Response.Body。应该是返回比处理要早。 所以不知道是不是这种模式 ,只能实现异步。
payLoadHandler在w.WriteHeader( http.StatusOK)之后是立即return了么?是的话正常的http请求就到此结束了吧,后边再写数据应该是会返回错误
#1
更多评论
一开始看错乐以为是每秒百万请求,想这是挑战 techempower的跑分了…………
在通道没有满的时候, JobQueue <- work 并不阻塞。
所以直接就w.WriteHeader( http.StatusOK)了。
而且不太能明白你需要干的是什么。
如果只是并发处理你的所有payload的话,不是应该用sync的waitgroup么,example就是一个类似于你的代码。
#2
这段代码,原文作者的使用场景是收集终端所提交的信息,并不需要和终端有交互。
你的是api server吗?是不是就可以在每个http请求里直接处理就可以了?
#3