背景
作为一位刚进公司的小白,参与到项目的第一个任务是为操作记录的存储增加消息队列,为什么我们要这么做呢?原因如下:在现有系统中我们直接将用户的操作记录增加到 mongodb 数据库中,但是在我们的系统出现峰值的时候,发现 mongodb 受不了,为此我们要做到削峰这个功能,按照惯例我们想到了使用消息队列,同时由于我们在项目中普遍采用 aws 的云服务,为此我们采用了 aws 的消息队列。
注意事项
- aws sqs 收费是按照请求次数收费所以要尽量使用批量操作
- aws sqs 的消费上线是 12000 次,最多允许 12000 个在传递的数据
- aws sqs 容量无限大
- aws sqs 的批量操作的上限是 10 条数据(毕竟是按次数收费)
- aws sqs 并行取数据的过程中可能会出现重复,我们利用数据库的 ID 来去重,注意我们在生产 id 的时候使用 mongodb 自己的库来生成,原因是依照 mongodb 生成的 id 比较均匀,存入的数据库中的树形结构也比较平衡,效率比较高
操作步骤
使用 aws sqs 和使用其他的消息队列基本步骤一致,aws sqs 的官方已经给出了非常详尽的使用说明,尽量参考官方文档,下面给出简单的操作步骤,以及示例代码, 代码是用 go 写的,其他的语言可以参考 go 的官方文档
- 配置 aws sqs 的连接信息
awsSqs := AwsSQS{}
creds := credentials.NewStaticCredentials("key", "secret", "")
sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("region"),
Credentials: creds,
}))
awsSqs.svc = sqs.New(sess)
- 向 aws sqs 发送数据
// 将消息发送给队列
func (awsSqs *AwsSQS) SendMessage(record string, qURL string) *Error {
_, err := awsSqs.svc.SendMessage(&sqs.SendMessageInput{MessageBody: aws.String(record),
QueueUrl: &qURL,
})
if err != nil {Errorf("Error Send Message to sqs: err = %v", err)
return NewError(ErrorCodeInnerError, err.Error())
}
return nil
}
- 从 aws sqs 获取数据
// 从队列中获取消息
func (awsSqs *AwsSQS) ReserveMessage(qURL string) (*sqs.ReceiveMessageOutput, *Error) {
result, err := awsSqs.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: &qURL,
MaxNumberOfMessages: aws.Int64(10),
WaitTimeSeconds: aws.Int64(10),
})
if err != nil {Errorf("Error aws sqs ReceiveMessage : err=%v", err)
return nil, NewError(ErrorCodeInnerError, err.Error())
}
return result, nil
}
- 从 aws sqs 删除数据
deleteMessageList := make([]*sqs.DeleteMessageBatchRequestEntry, 0)
deleteMessage := sqs.DeleteMessageBatchRequestEntry{Id: message.MessageId, ReceiptHandle: message.ReceiptHandle}
deleteMessageList = append(deleteMessageList, &deleteMessage)
// 将队列中的消息删除 (批量删除)
func (awsSqs *AwsSQS) DeleteMessage(list []*sqs.DeleteMessageBatchRequestEntry, qURL string) *Error {
// delete message
_, err := awsSqs.svc.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
QueueUrl: &qURL,
Entries: list,
})
if err != nil {Errorf("Delete Message error:error =%v", err)
return NewError(ErrorCodeInnerError, err.Error())
}
return nil
}
- 在存储到 moongodb 的过程中防止重复
// 自定义 mongodb 的_id,使用 mongodb 的库来生成 id
id := bson.NewObjectId().Hex()
entity.id = id
type entity struct {Id string `bson:"_id,omitempty"`}
参考资料
官方代码示例
aws 限制