Tech Stack

OS: Ubuntu 20.04 LTS
docker: 20.10.12
docker-compose: 1.25.0
Elasticsearch: 7.16.3
Logstash: 7.16.3
kafka: 2.13-2.8.1
Python: 3.8.2
kafka-python: 2.0.2

Setting up Logstash with Docker

Official documentation

  • Pulling the Docker image: https://www.elastic.co/guide/...
  • Configuring the Docker image: https://www.elastic.co/guide/...
  • Directory layout of the Docker image: https://www.elastic.co/guide/...

Configuration steps

  • Pull the image
docker pull docker.elastic.co/logstash/logstash:7.16.3
  • Logstash config file /home/qbit/logstash/settings/logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://192.168.1.46:9200" ]
  • Pipeline config file /home/qbit/logstash/pipeline/es-pipeline.conf (the host directory /home/qbit/logstash/pipeline/ is mounted to /usr/share/logstash/pipeline/ in the container)
input {
    kafka {
        codec => json
        bootstrap_servers => "192.168.1.46:9092"
        topics => ["coder_topic"]
    }
}
filter {
    mutate {
        add_field => { "timestamp" => "%{@timestamp}" }
        remove_field => ["@version"]
    }
    date {
        match => [ "timestamp", "ISO8601" ]     # parsing @timestamp directly here would fail
        target => "time0"
    }
    ruby {
        code => "
            time1 = event.get('@timestamp').time.getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08')
            time2 = Time.parse(event.get('timestamp')).getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08')
            time3 = Time.now.getlocal('+08:00').strftime('%Y-%m-%dT%H:%M:%S+08')
            event.set('time1', time1)
            event.set('time2', time2)
            event.set('time3', time3)
        "
    }
}
output {
    stdout {
        codec => json_lines
    }
    elasticsearch {
        hosts => ["192.168.1.46:9200"]
        index => "coder_index"
        document_id => "%{id}"
    }
}
  • Create the container
docker run --rm -it --name logstash \
    -v /home/qbit/logstash/pipeline/:/usr/share/logstash/pipeline/ \
    -v /home/qbit/logstash/settings/logstash.yml:/usr/share/logstash/config/logstash.yml \
    docker.elastic.co/logstash/logstash:7.16.3
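Since the tech stack lists docker-compose 1.25.0, the same container can also be declared in a compose file. This is a hypothetical sketch (the file name docker-compose.yml and service name are assumptions); the image and volume mounts mirror the docker run command above:

```yaml
version: "2.4"
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:7.16.3
    container_name: logstash
    volumes:
      # pipeline directory and logstash.yml, same host paths as above
      - /home/qbit/logstash/pipeline/:/usr/share/logstash/pipeline/
      - /home/qbit/logstash/settings/logstash.yml:/usr/share/logstash/config/logstash.yml
```

Start it with `docker-compose up`; unlike `docker run --rm -it`, compose manages the container lifecycle for you.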

Sending a message with Python

  • producer.py
# encoding: utf-8
# author: qbit
# date: 2022-01-28
# summary: send a message to kafka
import json
from kafka import KafkaProducer

def producer():
    producer = KafkaProducer(
        bootstrap_servers="192.168.1.46:9092",
        key_serializer=lambda k: json.dumps(k).encode('utf8'),
        value_serializer=lambda v: json.dumps(v).encode('utf8'),
    )
    id = 'qbit'
    dic = {'id': f"{id}", 'age': '23'}
    producer.send(topic="coder_topic", key=id, value=dic)
    producer.flush()  # send() is asynchronous; flush so the message goes out before the script exits
    print(f"send key: {id}, value: {dic}")

if __name__ == "__main__":
    producer()
  • Run result
# python3 producer.py
send key: qbit, value: {'id': 'qbit', 'age': '23'}
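The serializers in producer.py JSON-encode both key and value before UTF-8 encoding them to bytes, which is exactly what the `codec => json` in the kafka input expects. The round trip can be checked without a broker (the decoding step below imitates what the json codec does on the Logstash side):

```python
import json

# Same serializers as in producer.py: JSON-encode, then UTF-8 encode to bytes.
key_serializer = lambda k: json.dumps(k).encode('utf8')
value_serializer = lambda v: json.dumps(v).encode('utf8')

key_bytes = key_serializer('qbit')
value_bytes = value_serializer({'id': 'qbit', 'age': '23'})

# What Logstash's json codec effectively does when consuming the message:
decoded = json.loads(value_bytes.decode('utf8'))
print(key_bytes)  # b'"qbit"'
print(decoded)    # {'id': 'qbit', 'age': '23'}
```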

Viewing the data in ES with Kibana

GET coder_index/_search
{    "_index": "coder_index",    "_type": "_doc",    "_id": "qbit",    "_score": 1.0,    "_source": {        "id": "qbit",        "age": "23",        "@timestamp": "2022-01-28T01:03:40.733Z",   // logstash event 工夫戳        "timestamp":  "2022-01-28T01:03:40.733Z",        "time0":      "2022-01-28T01:03:40.733Z",        "time1":      "2022-01-28T09:03:40+08",        "time2":      "2022-01-28T09:03:40+08",        "time3":      "2022-01-28T09:03:40+08"      // filter 中 ruby 代码生成的工夫戳    }}
This article is from qbit snap