背景
作为一位刚进公司的小白,参与到项目的第一个任务是为操作记录的存储增加消息队列,为什么我们要这么做呢?原因如下:在现有系统中我们直接将用户的操作记录增加到mongodb数据库中,但是在我们的系统出现峰值的时候,发现mongodb受不了,为此我们要做到削峰这个功能,按照惯例我们想到了使用消息队列,同时由于我们在项目中普遍采用aws的云服务,为此我们采用了aws的消息队列。
注意事项
- aws sqs 收费是按照请求次数收费所以要尽量使用批量操作
- aws sqs 的消费上线是12000次,最多允许12000个在传递的数据
- aws sqs 容量无限大
- aws sqs 的批量操作的上限是10条数据(毕竟是按次数收费)
- aws sqs并行取数据的过程中可能会出现重复,我们利用数据库的ID来去重,注意我们在生产id的时候使用mongodb自己的库来生成,原因是依照mongodb生成的id比较均匀,存入的数据库中的树形结构也比较平衡,效率比较高
操作步骤
使用aws sqs和使用其他的消息队列基本步骤一致,aws sqs的官方已经给出了非常详尽的使用说明,尽量参考官方文档,下面给出简单的操作步骤,以及示例代码,代码是用go写的,其他的语言可以参考go的官方文档
- 配置aws sqs的连接信息
awsSqs := AwsSQS{}
creds := credentials.NewStaticCredentials("key", "secret", "")
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String("region"),
Credentials: creds,
}))
awsSqs.svc = sqs.New(sess)
- 向aws sqs发送数据
// 将消息发送给队列
func (awsSqs *AwsSQS) SendMessage(record string, qURL string) *Error {
_, err := awsSqs.svc.SendMessage(&sqs.SendMessageInput{
MessageBody: aws.String(record),
QueueUrl: &qURL,
})
if err != nil {
Errorf("Error Send Message to sqs: err = %v", err)
return NewError(ErrorCodeInnerError, err.Error())
}
return nil
}
- 从aws sqs 获取数据
// 从队列中获取消息
func (awsSqs *AwsSQS) ReserveMessage(qURL string) (*sqs.ReceiveMessageOutput, *Error) {
result, err := awsSqs.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: &qURL,
MaxNumberOfMessages: aws.Int64(10),
WaitTimeSeconds: aws.Int64(10),
})
if err != nil {
Errorf("Error aws sqs ReceiveMessage : err=%v ", err)
return nil, NewError(ErrorCodeInnerError, err.Error())
}
return result, nil
}
- 从aws sqs 删除数据
deleteMessageList := make([]*sqs.DeleteMessageBatchRequestEntry, 0)
deleteMessage := sqs.DeleteMessageBatchRequestEntry{Id: message.MessageId, ReceiptHandle: message.ReceiptHandle}
deleteMessageList = append(deleteMessageList, &deleteMessage)
// 将队列中的消息删除(批量删除)
func (awsSqs *AwsSQS) DeleteMessage(list []*sqs.DeleteMessageBatchRequestEntry, qURL string) *Error {
// delete message
_, err := awsSqs.svc.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
QueueUrl: &qURL,
Entries: list,
})
if err != nil {
Errorf("Delete Message error:error =%v", err)
return NewError(ErrorCodeInnerError, err.Error())
}
return nil
}
- 在存储到moongodb的过程中防止重复
// 自定义mongodb的_id,使用mongodb的库来生成id
id := bson.NewObjectId().Hex()
entity.id = id
type entity struct {
Id string `bson:"_id,omitempty"`
}
参考资料
有疑问加站长微信联系(非本文作者)