日志系统EFK后续-monitor告警监控

5次阅读

共计 4890 个字符,预计需要花费 13 分钟才能阅读完成。

在之前的文章 日志系统 EFK 后续: fluent-bit 服务独立 中完成了 fluent-bit 采集, fluentd 转发至 kafka, 再到 elasticsearch 的全部过程. 后面提到要将服务器的日志同步到 fluent-bit 所在环境, 这个通过 rsync 做增量同步即可完成, 详细的便不做记录了. 现在主要记录一下 kafka 中告警日志监控并发送消息通知的过程.

fluentd 配置过滤 error 级别日志

<filter fb.dapeng>
  @type record_transformer
  enable_ruby
  <record>
    tag ${record["fbKey"].split('/')[3]}
  </record>
  remove_keys fbKey
</filter>

<match fb.dapeng>
  @type copy
  <store ignore_error>
    @type rewrite_tag_filter
    <rule>
      key     level
      pattern /^ERROR$/
      tag     error.fb.dapeng
    </rule>
  </store>
  <store ignore_error>
    @type kafka_buffered
    brokers kafka 服务器 ip:9092
    topic_key efk
    buffer_type file
    buffer_path /tmp/buffer
    flush_interval 5s
    default_topic efk
    output_data_type json
    compression_codec gzip
    max_send_retries 3
    required_acks -1
    discard_kafka_delivery_failed true
  </store>
</match>

<match error.fb.dapeng>
    @type kafka_buffered
    brokers kafka 服务器 ip:9092
    topic_key efk_error
    buffer_type file
    buffer_path /tmp/buffer_error
    flush_interval 5s
    default_topic efk_error
    output_data_type json
    compression_codec gzip
    max_send_retries 3
    required_acks -1
    discard_kafka_delivery_failed true
</match>
  • copy: 将每个 event 复制到多个输出, store 相当于 match
  • rewrite_tag_filter: 根据 rule 规则为匹配的 event 重写 tag, 并以新的 tag 发出消息, 并重新从上往下处理, 所以注意重写的 tag 不要与当前所在的 match 匹配, 否则会陷入死循环 …

此处匹配 tag 为 fb.dapeng 的消息:

  1. 匹配 level 为 ERROR 的消息重写 tag 为 error.fb.dapeng,
  2. 消息直接发到 kafka 的 topic: efk

对于 1 中 tag 重写为 error.fb.dapeng 的消息, 将消息发到 kafka 的 topic: efk_error

这样 elasticsearch 只用消费 kafka 的 topic: efk, 其中包含全部级别的日志信息,
而对于告警监控 monitor 则只用消费 kafka 的 topic: efk_error, 其中都是 ERROR 级别的日志

注意: rewrite_tag_filter 插件需要安装, 修改 fluentd 的 Dockerfile 重新构建镜像

FROM fluent/fluentd:v1.2
#增加 es 插件, kafka 插件, rewrite_tag_filter 插件
RUN  fluent-gem install fluent-plugin-elasticsearch
RUN  fluent-gem install fluent-plugin-kafka
RUN  fluent-gem install fluent-plugin-rewrite-tag-filter
CMD exec fluentd -c /fluentd/etc/${FLUENTD_CONF} -p /fluentd/plugins $FLUENTD_OPT

monitor 项目消费 kafka 消息

kafka 消费处理逻辑如下:

  1. 首先解析消息 json, 取出 sessionTid(标记服务调用链, 同一次调用操作, sessionTid 相同),
  2. 对于相同的 sessionTid, 只记录第一条 error 消息 (sessionTid 缓存 10s, 之后清除, 一般同一次调用报错不会间隔 10s 之久)
  3. 钉钉群消息通知
@Component
@Slf4j
public class EfkAlarmConsumer {

  @Resource
  EfkAlarmApp efkAlarmApp;

  private final Map<String, Long> cache = new ConcurrentHashMap<>();

  private final Timer timer = new Timer();

  @KafkaListener(topics = {"efk_error"}, groupId = "efk_error_consumer")
  public void processEfkAlarm(ConsumerRecord<String, String> record) {String json = record.value();

    Log l = resolveLog(json);
    if (null == l) {log.error("非法消息: {}", json);
    } else {log.debug("接收消息 Log: {}", l);
      processLog(l);
    }
  }

  private void processLog(Log l) {final String tid = l.getSessionTid();
    Long t = cache.get(tid);
    if (t == null) {cache.put(tid, System.currentTimeMillis());

      // 10s 之后 tid 的数据清除
      timer.schedule(new TimerTask() {
        @Override
        public void run() {cache.remove(tid);
        }
      }, 10000);

      String currIndex = String.format("dapeng_log_index-%s", new SimpleDateFormat("yyyy.MM.dd").format(new Date()));
      // 发钉钉 ...
      String text = String.format("%s", l.getMessage());
      String title = String.format("[%s] %s: %s[%s]", efkAlarmApp.getDingTag(), l.getLogtime(), l.getTag(), l.getHostname());
      String url = String.format(AppConst.ESHEAD_LINK_URI, currIndex, l.getSessionTid());
      
      DingService.send(efkAlarmApp.getDingWebHook(), msg(text, title, url));
    }
  }

  private Log resolveLog(String json) {
    Log l = null;
    try {l = JSON.parseObject(json, Log.class);
    } catch (Throwable e) {log.error("消息转换异常", e);
    }
    return l;
  }

  private String msg(String text, String title, String url) {
    return String.format(
        "{\n" +
        "\"msgtype\": \"link\", \n" +
        "\"link\": {\n" +
        "\"text\": \"%s\", \n" +
        "\"title\": \"%s\", \n" +
        "\"picUrl\": \"\",\n" +
        "\"messageUrl\": \"%s\"\n" +
        "}\n" +
        "}",
        text, title, url);
  }

}

消息链接跳转

链接 link 格式 ESHEAD_LINK_URI :

public class AppConst {

  public static final String ES_BASE_URI = "elasticsearch 服务器访问地址";
  public static final String ESHEAD_BASE_URI = "elasticsearch-head 网页访问地址";
  public static final String ESHEAD_LINK_URI = ESHEAD_BASE_URI + "?curr_index=%s&sessionTid=%s&base_uri=" + ES_BASE_URI;
}

修改 elasticsearch-head 项目中的 index.html

                <script>
                        window.onload = function() {if(location.href.contains("/_plugin/")) {var base_uri = location.href.replace(/_plugin\/.*/, '');
                                }
                                var args = location.search.substring(1).split("&").reduce(function(r, p) {r[decodeURIComponent(p.split("=")[0])] = decodeURIComponent(p.split("=")[1]); return r;
                                }, {});
                                new app.App("body", {
                                        id: "es",
                                        base_uri: args["base_uri"] || base_uri,
                                        auth_user : args["auth_user"] || "",
                                        auth_password : args["auth_password"],
                                        dashboard: args["dashboard"]
                                });

<!-- 如果参数带 sessionTid, 说明是从钉钉消息链接跳转过来的, 页面做特殊操作 -->
if ("sessionTid" in args) {$(".uiApp-headerNewMenuItem")[0].click();

  var t1 = setInterval(function() {if ($(".uiIndexSelector-select > select").length > 0) {clearInterval(t1);

      $(".uiIndexSelector-select > select").last()[0].value = args["curr_index"];
      $(".uiIndexSelector-select > select").last().change();

      var t2 = setInterval(function() {if ($(".field").length > 0) {clearInterval(t2);
          $(".field").last()[0].value = "dapeng_log.sessionTid";
          $(".field").last().change();

          var t3 = setInterval(function() {if ($(".qual").length > 0) {clearInterval(t3);
              $(".qual").last()[0].value =  args["sessionTid"];
              $("button").last().click();
            }
          }, 20);
        }
      }, 20);
    }

  }, 20);
}
                        };
                </script>

修改 dc-all.yml 文件中配置 elasticsearch-head, 新增 index.html 的挂载

- /data/workspace/elasticsearch-head/index.html:/usr/src/app/index.html

OK, 重启 elasticsearch-head 即可

钉钉消息如下:

点击消息跳转 es-head:

正文完
 0