我需求是写一个exporter,输出一些监控指标。通过调用多个shell命令得到返回结果,再通过golang的http.HandleFunc来实现一个http服务。
现在我通过go metricsProducer(metricsChannel)实现了并发执行,并将输出写入到metricsChannel中,但是输出的时候却被阻塞了,http服务一直无返回,以下是源码:小弟刚学go,不太明白channel以及并发这块,还请大神们多指教。
```go
var metricsChannel = make(chan string, 10)
func exec_shell(s string) (string){
cmd := exec.Command("/bin/bash", "-c", s)
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
log.Fatal(err)
}
return out.String()
}
func metricsConsumer(metricsChannel chan string, w http.ResponseWriter) {
for metrics := range metricsChannel {
fmt.Println(metrics)
io.WriteString(w, metrics)
}
defer close(metricsChannel)
}
func ExporterHandler(w http.ResponseWriter, r *http.Request){
metricsConsumer(metricsChannel, w)
}
func main(){
port := flag.String("port", "30083", "Input your exporter port")
flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU())
go metricsProducer(metricsChannel)
http.HandleFunc("/metrics", ExporterHandler)
url := fmt.Sprintf("%s:%s", "127.0.0.1", *port)
fmt.Println("url=", url+"/metrics")
err := http.ListenAndServe(url, nil)
if err != nil {
log.Fatal("ListenAndServe:", err)
}
}
func metricsProducer(metricsChannel chan<- string) {
commands := [3]string{"ls", "ls -lrt", "hostname -i"}
for _, cmd := range commands {
metricsChannel <- fetchMetrics(cmd, "this is test")
}
}
func fetchMetrics(command, describe string) (string) {
metrics := fmt.Sprintf("%s", exec_shell(command))
return "#" + describe + "\n" + metrics
}
```
package main
import (
"os/exec"
"bytes"
"log"
"net/http"
"runtime"
"io/ioutil"
"io"
"fmt"
"encoding/json"
"strings"
)
type HttpHandler struct {
MetricsChannel chan string
DataChannel chan string
}
func NewHttpHandler() *HttpHandler {
return &HttpHandler{
MetricsChannel: nil,
DataChannel: nil,
}
}
func(handler *HttpHandler) ExecShell(s string) (string){
var out bytes.Buffer
cmd := exec.Command("/bin/bash", "-c", s)
cmd.Stdout = &out
if err := cmd.Run();err != nil {
return err.Error()
}
return out.String()
}
func(handler *HttpHandler) Consume(data []string) {
defer close(handler.MetricsChannel)
for _, cmd := range data {
handler.MetricsChannel <- cmd
}
}
func(handler *HttpHandler) Product() {
defer close(handler.DataChannel)
for {
cmd,ok := <- handler.MetricsChannel
if !ok {
break
}
r := handler.ExecShell(cmd)
fmt.Println("process:",cmd,r)
handler.DataChannel <- r
}
}
func(handler *HttpHandler) Process(data []string) (string, error) {
handler.MetricsChannel = make(chan string,10)
handler.DataChannel = make(chan string,10)
go handler.Consume(data)
go handler.Product()
result := make([]string,0)
for {
data,ok := <- handler.DataChannel
if !ok {
break
}
result = append(result,data)
}
return strings.Join(result,"\n"),nil
}
func(handler *HttpHandler) Metrics(w http.ResponseWriter, r *http.Request) {
var err error
body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Fatal(err)
}
response, err := handler.Process(handler.ParseRequest(body))
if err != nil {
io.WriteString(w,fmt.Sprintf("error happened:%s",err.Error()))
}
io.WriteString(w,response)
}
func (handler *HttpHandler) ParseRequest(str []byte) []string {
var data []string
json.Unmarshal(str,&data)
return data
}
func main(){
runtime.GOMAXPROCS(runtime.NumCPU())
httpHandler := NewHttpHandler()
http.HandleFunc("/metrics",httpHandler.Metrics)
err := http.ListenAndServe(":8888", nil)
if err != nil {
log.Fatal("ListenAndServe:", err)
}
}
#6
更多评论
看代码看得眼睛疼,这个必然阻塞啊。
你需要理解下 for metrics := range metricsChannel 是什么……
chan是个进程见通行的缓存。
正常情况请按 <-和 ->的方式来操作。
需要做消费者的话,应该在主进程里起一go用select来读数据。
另外不太明白你现在这个用途为什么需要用到chan.
#1
抱歉,不会排版,耽误你时间了。
非常感谢你的回复,我初衷是觉得我执行command的时候,有些command可能执行时间长,所以我想着只要command执行完了就把结果丢到chan去,然后我消费端把它消费。
#2