我在不同公司从事反爬虫、反病毒、反恶意程序已经有15年了,我知道,由于每天需要处理和应对的大量数据,这些系统最终会因此变得十分复杂。
目前我是smsjunk.com的CEO以及KnowBe4的首席架构师,两家公司都是活跃与网络安全行业。
有趣的是在过去10年作为一名软件工程师,几乎所有我参与的后端开发项目里面都是用Ruby on Rails来完成的。可是你不要误会,我热爱Ruby on Rails并且我认为它是一个非常出色的开发环境,但当你用ruby的思路在设计和开发系统一段时间以后,你往往会忘记,其实你还可以利用多线程,并行化,高速执行以及更小的内存开销来开发系统。我是一名c/c++,Delphi以及c#的开发人员已经很多年了,然后我开始慢慢意识到,使用合适的工具让系统变得更加简单明了才是一件正确的事情。
编程界对于编程语言以及框架的争论从未停歇,而我并不想参与到其中去。我相信效率高低,生产力大小以及代码的可维护性很大一部分取决于你所设计的架构是否足够简单。
要解决的问题
当我们开发一个匿名遥测以及数据分析系统的时候,其中一个需求是能够处理和应付百万数量级的POST请求,网络请求处理器会接收一个POST过来JSON,这个JSON里面会包含许多需要写入到Amazon S3的数据集合,以便我们的map-reduce系统可以在后续来处理这些数据。
一般情况下我们会考虑构建一个worker分层的结构,并且利用一些中间件,例如:
- Sidekiq
- Resque
- DelayedJob
- Elasticbeanstalk Worker Tier
- RabbitMQ
- 等等..
然后设立两个不同的集群,一个是给web客户端,另一个是给worker,然后我们可以将worker扩容到我们处理业务时所需要的数量。
但在最开始的时候,我们的团队就意识到可以用Go来实现所有这些,因为在讨论期间我们认为这将会是一个非常高访问量的系统。我利用Go来开发也已经有两年了,用它来开发过一些系统,但是负载规模远没有此次的需求这么大。
我们先定义一些struct来规定我们POST接收的请求体,以及定义一个上传到S3 bucket的方法UploadToS3
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
复制代码
幼稚地使用Go runtines
最开始的时候我们非常天真地实现一个POST的钩子方法如下,只是简单地将每个请求体的上传动作放到Go rutinues中让他们并行执行:
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
复制代码
在中等规模的负载情况下,这种方法对大部分人都是没有问题的,但在应对更大规模的请求量时候,我们很快就招架不住了。当我们把这个版本的代码部署到生产环境以后,我们期待能有大量的请求进来但实际还不能达到百万级别的数量级。我们完全低估了这个系统要处理的流量数。
但不管怎么说上面的方法都是欠妥的。因为它没有任何方法让我们去控制Go runtinues启动的数量。所以当我们的系统在面对每分钟百万级POST请求的时候很快就垮掉了。
再战
我们需要找到另外的方法。在一开始我们就在讨论如何让我们的请求处理程序的生命周期尽可能地缩短以及上传到S3的操作能在后台或者异步运行。当然,在Ruby on Rails里面你必须这么做,否则你将会阻塞到所有其他的网络请求处理程序。无论您使用的是美洲狮,独角兽还是过路人(请不要参与JRuby讨论)。然后我们想到使用消息队列这种比较常见的方法来处理来达到我们的目的,例如Resque, Sidekiq, SQS等等,还有数不清的工具因为实在有太多方法来实现这个功能。
所以在第二次迭代的时候,我们需要创建一个缓冲队列,我们会将任务放入队列里面然后再一个个地上传到S3上,但由于我们希望达到能够控制这个队列的最大容量的目的,并且我们有足够的RAM来允许我们将请求体储存到内存当中,所以我们认为直接使用了Go提供的channel,然后将我们的请求直接入队到channel中处理就可以了。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
复制代码
我们会从channel中获取任务并且执行他们的上传操作
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
复制代码
但说句老实话,我并不知道这是在干嘛。肯定是因为那时已经太晚还有我们已经喝了太多的红牛。????????
这个改动并没有让我们的困境得到任何改善,我们将并发任务放到了队列中执行仅仅是看上去好像解决了问题。但是我们的异步程序一次只会上传一个请求体到S3上面,但是我们的请求数此时远远大于我们上传到S3的数量,可想而知我们的缓冲队列很快就到达了他的极限爆满了,然后它阻挡了其他网络请求的入队操作。
相当于我们仅仅回避了问题,并且让我们的系统的崩溃时间进入了倒数。我们这个缺陷的版本发布以后,整个系统的延迟率在持续性地每分钟在上涨。
更加好的解决办法
我们决定采用协同的方式来改进我们的Go channel,通过建立一个带有2个的channel处理系统,一个用于将请求体入队,另一个是负责控制worker
在JobQueue
中并发运行时的数量。
这个想法的核心是以一个相对稳定的频率去并行上传数据到S3,这样的话既不会把我们的服务器弄垮,也不会因为连接过多造成很多S3的连接错误。所以我们开始着手于Job/Worker模式。这个对于熟悉Java,c#开发 来说并不陌生,你可以理解为这是Go利用channel来实现worker线程池的方法。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
复制代码
接下来修改我们网络请求的钩子函数,负责创建一个Job的结构体的实例然后将其放入JobQueue channel中等待worker来获取执行。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue <- work
}
w.WriteHeader(http.StatusOK)
}
复制代码
在我们网络服务初始化的时候创建一个Dispather
并且调用Run()
创建一个装有一定数量worker的线程池,用来接收和处理来自JobQueue
的Job
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
复制代码
下面是我们Dispather
的实现
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
复制代码
注意我们限制了worker在线程池的最大数量。我们的应用运行在一个docker化的Go环境中,部署在Amazon的Elasticbeanstalk上,并且尽量遵循12要素原则来配置我们的生产环境,在环境变量中获取对应的参数值,这样我们就可以控制worker的数量以及JobQueue
的最大容量通过直接修改对应的值而不需要重新去部署我们的应用。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
复制代码
当我们将这个版本发布到生产环境以后我们的延迟率马上有明显的下降,我们处理请求的能力有一个质的飞跃。
在一分钟以后等我们的负载均衡器完全启动起来以后,可以看到ElasticBeanstalk上服务器接收的请求数将近一百万次每分钟。通常我们早上都有几个小时的流量高峰期,那时甚至会超过百万请求次数每分钟。而且当我们发布完新代码以后服务器的数量就从100台下降到并稳定在了20台。
当给集群加上合适的配置以及设置自动伸缩以后,甚至可以降到仅仅用4台c4.Large的EC2实例来处理日常业务。并且集群会自动增加新的实例当CPU使用率持续5分钟达到90%时。
总结
简洁化设计永远是我所追求的东西。我们可以设计一个复杂的系统用很多的队列,后台运行worker,复杂的部署等等,但取而代之我们决定利用Elasticbeanstalk强大的自动伸缩功能以及Go所提供开箱即用的并发特性。
总会有一个工具适合你的工作,在有的时候当你Ruby on Rails系统需要一个强大的网络请求处理功能的时候,可以试着考虑一下除了ruby生态圈以外的更加强大和简洁的替代方案。
在你走之前
如果你能关注一下我的Twittwer并且分享给身边的朋友的话,我会非常感谢的!我的Twitter是 twitter.com/mcastilho
有疑问加站长微信联系(非本文作者)