关于存储:分布式日志存储架构代码实践

35次阅读

共计 4015 个字符,预计需要花费 11 分钟才能阅读完成。

上一篇,咱们针对分布式日志存储方案设计做了一个实践上的剖析与总结,文章地址。本文咱们将联合其中的一种计划进行实战代码的演示。另外一种计划,将在下一篇文章进行分享,此篇文章分享的是 MongoDB 架构模式。在知乎上公布该文章时,有人提到应用 opentelemtry+tsdb,感兴趣的能够去理解一下。

架构模式

通过上一篇的剖析,咱们大抵总结出这样的一个架构设计,架构图如下:

  1. 业务 A、业务 B、业务 C 和业务 D 示意咱们理论的接口地址。当客户端发送申请时,间接的解决模块。系统日志的生成也是在该模块中进行生成。
  2. MQ 服务,则是作为日志队列,长期存储日志音讯。这样是为了进步日志的解决能力。在高并发的业务场景中,如果实时的将日志写入到 MongoDB 中,这样难免会升高业务解决的速度。
  3. MongoDB 服务,则是最终的日志落地。也就是说将咱们的日志存储到磁盘,以达到数据的长久化,防止数据失落。
  4. 对于零碎的日志查看,咱们能够间接登录 MongoDB 服务进行 SQL 查问。个别为了效率、平安等起因,会提供一个治理界面来实时查看 MongoDB 的日志。这里就是咱们的 web 展现界面。能够通过 web 界面对日志做查问、筛选、删除等操作。

下面提到的是一个架构的大抵流程图。上面将具体的代码演示,须要查看代码的能够通过 Github 仓库地址获取。

代码演示

代码中要操作 RabbitMQ 服务、MongoDB 服务、API 业务逻辑解决和其余的服务,我这里将代码调用逻辑设计为如下构造。

magin.go(入口文件)->api(业务解决)->rabbitmq(日志生产者、消费者)->MongoDB(日志长久化)。
整顿代码架构如下:

代码阐明

上面列举几个应用到的技术栈以及对应的版本,可能须要在应用本代码时,须要留神一下这些服务的版本兼容,防止代码无奈运行。

  1. Go version 1.16。
  2. RabbitMQ version 3.10.0。
  3. MongoDB version v5.0.7。

上面对几个略微重要的代码段,进行简略阐明,残缺代码间接查看 Github 仓库即可。

入口文件

package main

import (
    "fmt"
    "net/http"

    "github.com/gin-gonic/gin"

    "gologs/api"
)

func main() {r := gin.Default()

    // 定义一个 order-api 的路由地址,并做对应的接口返回
    r.GET("/order", func(ctx *gin.Context) {orderApi, err := api.OrderApi()
        if err != nil {
            ctx.JSON(http.StatusInternalServerError, gin.H{
                "code": 1,
                "msg":  orderApi,
                "data": map[string]interface{}{},
            })
        }
        ctx.JSON(http.StatusOK, gin.H{
            "code": 1,
            "msg":  orderApi,
            "data": map[string]interface{}{},
        })
    })
    // 指定服务地址和端口号
    err := r.Run(":8081")
    if err != nil {fmt.Println("gin server fail, fail reason is", err)
    }
}

订单业务逻辑

package api

import (
    "time"

    "gologs/rabbit"
)
// 订单业务逻辑解决,并调用 Rabbit 服务投递 order 日志
func OrderApi() (string, error) {orderMsg := make(map[string]interface{})
    orderMsg["time"] = time.Now()
    orderMsg["type"] = "order"
    err := rabbit.SendMessage(orderMsg)
    if err != nil {return "write rabbitmq log fail", err}
    return "", nil
}

RabbitMQ 解决日志

package rabbit

import (
    "encoding/json"

    "github.com/streadway/amqp"

    "gologs/com"
)

func SendMessage(msg map[string]interface{}) error {channel := Connection()
    declare, err := channel.QueueDeclare("logs", false, false, false, false, nil)
    if err != nil {com.FailOnError(err, "RabbitMQ declare queue fail!")
        return err
    }

    marshal, err := json.Marshal(msg)
    if err != nil {return err}
    err = channel.Publish(
        "",
        declare.Name,
        false,
        false,
        amqp.Publishing{
            ContentType:  "text/plain", // message type
            Body:         marshal,      // message body
            DeliveryMode: amqp.Persistent,
        })
    if err != nil {com.FailOnError(err, "rabbitmq send message fail!")
        return err
    }
    return nil
}

消费者生产音讯

package rabbit

import (
    "encoding/json"
    "fmt"
    "time"

    "gologs/com"
    "gologs/mongo"
)

func ConsumerMessage() {channel := Connection()

    declare, err := channel.QueueDeclare("logs", false, false, false, false, nil)
    if err != nil {com.FailOnError(err, "queue declare fail")
    }

    consume, err := channel.Consume(
        declare.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {com.FailOnError(err, "message consumer failt")
    }

    for d := range consume {msg := make(map[string]interface{})
        err := json.Unmarshal(d.Body, &msg)
        fmt.Println(msg)
        if err != nil {com.FailOnError(err, "json parse error")
        }
        one, err := mongo.InsertOne(msg["type"].(string), msg)
        if err != nil {com.FailOnError(err, "mongodb insert fail")
        }
        fmt.Println(one)
        time.Sleep(time.Second * 10)
    }
}

调用 MongoDB 长久化日志

package mongo

import (
    "context"
    "errors"

    "gologs/com"
)

func InsertOne(collectionName string, logs map[string]interface{}) (interface{}, error) {collection := Connection().Database("logs").Collection(collectionName)
    one, err := collection.InsertOne(context.TODO(), logs)

    if err != nil {com.FailOnError(err, "write mongodb log fail")
        return "", errors.New(err.Error())
    }

    return one.InsertedID, nil
}

实战演示

下面大抵分享了代码逻辑,接下来演示代码的运行成果。

启动服务

启动服务,须要进入到 log 是目录上面,main.go 就是理论的入口文件。

启动日志消费者

启动日志消费者,保障一旦有日志,消费者能把日志实时存储到 MongoDB 中。同样的须要到 logs 目录下执行该命令。

go run rabbit_consumer.go

调用 API 服务

为了演示,这里间接应用浏览器去拜访该 order 对应的接口地址。http://127.0.0.1:8081/order。接口返回如下信息:

如果 code 是 1 则示意接口胜利,反之是不胜利,须要在调用的时候留神一下。

这里能够多拜访几次,查看 RabbitMQ 中的队列信息。如果消费者生产的比较慢,应该能够看到如下信息:

消费者监控

因为咱们在启动服务时,就独自开启了一个消费者线程,这个线程失常状况下时始终作为后台程序在运行。咱们能够查看大抵的生产数据内容,如下图:

MongoDB 查看数据

RabbitMQ 消费者将日志信息存储到 MongoDB 中,接下来间接通过 MongoDB 进行查问。

db.order.find();
[
  {"_id": {"$oid": "627675df5f796f95ddb9bbf4"},
    "time": "2022-05-07T21:36:02.374928+08:00",
    "type": "order"
  },
  {"_id": {"$oid": "627675e95f796f95ddb9bbf6"},
    "time": "2022-05-07T21:36:02.576065+08:00",
    "type": "order"
  }
  ................
]

文末总结

对于该架构的总体演示,就到此结束。当然还有很多细节须要欠缺,此篇内容次要是分享一个大抵的流程。下一篇咱们将分享如何在 Linux 上大家 ELK 环境,以便咱们前期做理论代码演示。

正文完
 0