乐趣区

使用aws-sqs-做缓冲队列go

背景

作为一位刚进公司的小白,参与到项目的第一个任务是为操作记录的存储增加消息队列,为什么我们要这么做呢?原因如下:在现有系统中我们直接将用户的操作记录增加到 mongodb 数据库中,但是在我们的系统出现峰值的时候,发现 mongodb 受不了,为此我们要做到削峰这个功能,按照惯例我们想到了使用消息队列,同时由于我们在项目中普遍采用 aws 的云服务,为此我们采用了 aws 的消息队列。

注意事项

  1. aws sqs 收费是按照请求次数收费所以要尽量使用批量操作
  2. aws sqs 的消费上线是 12000 次,最多允许 12000 个在传递的数据
  3. aws sqs 容量无限大
  4. aws sqs 的批量操作的上限是 10 条数据(毕竟是按次数收费)
  5. aws sqs 并行取数据的过程中可能会出现重复,我们利用数据库的 ID 来去重,注意我们在生产 id 的时候使用 mongodb 自己的库来生成,原因是依照 mongodb 生成的 id 比较均匀,存入的数据库中的树形结构也比较平衡,效率比较高

操作步骤

使用 aws sqs 和使用其他的消息队列基本步骤一致,aws sqs 的官方已经给出了非常详尽的使用说明,尽量参考官方文档,下面给出简单的操作步骤,以及示例代码, 代码是用 go 写的,其他的语言可以参考 go 的官方文档

  1. 配置 aws sqs 的连接信息
awsSqs := AwsSQS{}

creds := credentials.NewStaticCredentials("key", "secret", "")
sess := session.Must(session.NewSession(&aws.Config{Region:      aws.String("region"),
    Credentials: creds,
}))

awsSqs.svc = sqs.New(sess)
  1. 向 aws sqs 发送数据
// 将消息发送给队列
func (awsSqs *AwsSQS) SendMessage(record string, qURL string) *Error {
    _, err := awsSqs.svc.SendMessage(&sqs.SendMessageInput{MessageBody: aws.String(record),
        QueueUrl:    &qURL,
    })
    if err != nil {Errorf("Error Send Message to sqs: err = %v", err)
        return NewError(ErrorCodeInnerError, err.Error())
    }
    return nil
}
  1. 从 aws sqs 获取数据
// 从队列中获取消息
func (awsSqs *AwsSQS) ReserveMessage(qURL string) (*sqs.ReceiveMessageOutput, *Error) {
    result, err := awsSqs.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
        QueueUrl:            &qURL,
        MaxNumberOfMessages: aws.Int64(10),
        WaitTimeSeconds:     aws.Int64(10),
    })

    if err != nil {Errorf("Error aws sqs ReceiveMessage : err=%v", err)
        return nil, NewError(ErrorCodeInnerError, err.Error())
    }

    return result, nil
}
  1. 从 aws sqs 删除数据
deleteMessageList := make([]*sqs.DeleteMessageBatchRequestEntry, 0)
deleteMessage := sqs.DeleteMessageBatchRequestEntry{Id: message.MessageId, ReceiptHandle: message.ReceiptHandle}
deleteMessageList = append(deleteMessageList, &deleteMessage)
// 将队列中的消息删除 (批量删除)
func (awsSqs *AwsSQS) DeleteMessage(list []*sqs.DeleteMessageBatchRequestEntry, qURL string) *Error {
    // delete message
    _, err := awsSqs.svc.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
        QueueUrl: &qURL,
        Entries:  list,
    })

    if err != nil {Errorf("Delete Message error:error =%v", err)
        return NewError(ErrorCodeInnerError, err.Error())
    }
    return nil
}
  1. 在存储到 moongodb 的过程中防止重复
// 自定义 mongodb 的_id,使用 mongodb 的库来生成 id
id := bson.NewObjectId().Hex()
entity.id = id
type entity struct {Id                 string `bson:"_id,omitempty"`}

参考资料

官方代码示例
aws 限制

退出移动版