共计 4890 个字符,预计需要花费 13 分钟才能阅读完成。
在之前的文章 日志系统 EFK 后续: fluent-bit 服务独立 中完成了 fluent-bit 采集, fluentd 转发至 kafka, 再到 elasticsearch 的全部过程. 后面提到要将服务器的日志同步到 fluent-bit 所在环境, 这个通过 rsync 做增量同步即可完成, 详细的便不做记录了. 现在主要记录一下 kafka 中告警日志监控并发送消息通知的过程.
fluentd 配置过滤 error 级别日志
<filter fb.dapeng>
@type record_transformer
enable_ruby
<record>
tag ${record["fbKey"].split('/')[3]}
</record>
remove_keys fbKey
</filter>
<match fb.dapeng>
@type copy
<store ignore_error>
@type rewrite_tag_filter
<rule>
key level
pattern /^ERROR$/
tag error.fb.dapeng
</rule>
</store>
<store ignore_error>
@type kafka_buffered
brokers kafka 服务器 ip:9092
topic_key efk
buffer_type file
buffer_path /tmp/buffer
flush_interval 5s
default_topic efk
output_data_type json
compression_codec gzip
max_send_retries 3
required_acks -1
discard_kafka_delivery_failed true
</store>
</match>
<match error.fb.dapeng>
@type kafka_buffered
brokers kafka 服务器 ip:9092
topic_key efk_error
buffer_type file
buffer_path /tmp/buffer_error
flush_interval 5s
default_topic efk_error
output_data_type json
compression_codec gzip
max_send_retries 3
required_acks -1
discard_kafka_delivery_failed true
</match>
-
copy
: 将每个 event 复制到多个输出, store 相当于 match -
rewrite_tag_filter
: 根据 rule 规则为匹配的 event 重写 tag, 并以新的 tag 发出消息, 并重新从上往下处理, 所以注意重写的 tag 不要与当前所在的 match 匹配, 否则会陷入死循环 …
此处匹配 tag 为 fb.dapeng 的消息:
- 匹配 level 为 ERROR 的消息重写 tag 为 error.fb.dapeng,
- 消息直接发到 kafka 的 topic: efk
对于 1 中 tag 重写为 error.fb.dapeng 的消息, 将消息发到 kafka 的 topic: efk_error
这样 elasticsearch 只用消费 kafka 的 topic: efk, 其中包含全部级别的日志信息,
而对于告警监控 monitor 则只用消费 kafka 的 topic: efk_error, 其中都是 ERROR 级别的日志
注意: rewrite_tag_filter
插件需要安装, 修改 fluentd 的 Dockerfile 重新构建镜像
FROM fluent/fluentd:v1.2
#增加 es 插件, kafka 插件, rewrite_tag_filter 插件
RUN fluent-gem install fluent-plugin-elasticsearch
RUN fluent-gem install fluent-plugin-kafka
RUN fluent-gem install fluent-plugin-rewrite-tag-filter
CMD exec fluentd -c /fluentd/etc/${FLUENTD_CONF} -p /fluentd/plugins $FLUENTD_OPT
monitor 项目消费 kafka 消息
kafka 消费处理逻辑如下:
- 首先解析消息 json, 取出 sessionTid(标记服务调用链, 同一次调用操作, sessionTid 相同),
- 对于相同的 sessionTid, 只记录第一条 error 消息 (sessionTid 缓存 10s, 之后清除, 一般同一次调用报错不会间隔 10s 之久)
- 钉钉群消息通知
@Component
@Slf4j
public class EfkAlarmConsumer {
@Resource
EfkAlarmApp efkAlarmApp;
private final Map<String, Long> cache = new ConcurrentHashMap<>();
private final Timer timer = new Timer();
@KafkaListener(topics = {"efk_error"}, groupId = "efk_error_consumer")
public void processEfkAlarm(ConsumerRecord<String, String> record) {String json = record.value();
Log l = resolveLog(json);
if (null == l) {log.error("非法消息: {}", json);
} else {log.debug("接收消息 Log: {}", l);
processLog(l);
}
}
private void processLog(Log l) {final String tid = l.getSessionTid();
Long t = cache.get(tid);
if (t == null) {cache.put(tid, System.currentTimeMillis());
// 10s 之后 tid 的数据清除
timer.schedule(new TimerTask() {
@Override
public void run() {cache.remove(tid);
}
}, 10000);
String currIndex = String.format("dapeng_log_index-%s", new SimpleDateFormat("yyyy.MM.dd").format(new Date()));
// 发钉钉 ...
String text = String.format("%s", l.getMessage());
String title = String.format("[%s] %s: %s[%s]", efkAlarmApp.getDingTag(), l.getLogtime(), l.getTag(), l.getHostname());
String url = String.format(AppConst.ESHEAD_LINK_URI, currIndex, l.getSessionTid());
DingService.send(efkAlarmApp.getDingWebHook(), msg(text, title, url));
}
}
private Log resolveLog(String json) {
Log l = null;
try {l = JSON.parseObject(json, Log.class);
} catch (Throwable e) {log.error("消息转换异常", e);
}
return l;
}
private String msg(String text, String title, String url) {
return String.format(
"{\n" +
"\"msgtype\": \"link\", \n" +
"\"link\": {\n" +
"\"text\": \"%s\", \n" +
"\"title\": \"%s\", \n" +
"\"picUrl\": \"\",\n" +
"\"messageUrl\": \"%s\"\n" +
"}\n" +
"}",
text, title, url);
}
}
消息链接跳转
链接 link 格式 ESHEAD_LINK_URI
:
public class AppConst {
public static final String ES_BASE_URI = "elasticsearch 服务器访问地址";
public static final String ESHEAD_BASE_URI = "elasticsearch-head 网页访问地址";
public static final String ESHEAD_LINK_URI = ESHEAD_BASE_URI + "?curr_index=%s&sessionTid=%s&base_uri=" + ES_BASE_URI;
}
修改 elasticsearch-head 项目中的 index.html
<script>
window.onload = function() {if(location.href.contains("/_plugin/")) {var base_uri = location.href.replace(/_plugin\/.*/, '');
}
var args = location.search.substring(1).split("&").reduce(function(r, p) {r[decodeURIComponent(p.split("=")[0])] = decodeURIComponent(p.split("=")[1]); return r;
}, {});
new app.App("body", {
id: "es",
base_uri: args["base_uri"] || base_uri,
auth_user : args["auth_user"] || "",
auth_password : args["auth_password"],
dashboard: args["dashboard"]
});
<!-- 如果参数带 sessionTid, 说明是从钉钉消息链接跳转过来的, 页面做特殊操作 -->
if ("sessionTid" in args) {$(".uiApp-headerNewMenuItem")[0].click();
var t1 = setInterval(function() {if ($(".uiIndexSelector-select > select").length > 0) {clearInterval(t1);
$(".uiIndexSelector-select > select").last()[0].value = args["curr_index"];
$(".uiIndexSelector-select > select").last().change();
var t2 = setInterval(function() {if ($(".field").length > 0) {clearInterval(t2);
$(".field").last()[0].value = "dapeng_log.sessionTid";
$(".field").last().change();
var t3 = setInterval(function() {if ($(".qual").length > 0) {clearInterval(t3);
$(".qual").last()[0].value = args["sessionTid"];
$("button").last().click();
}
}, 20);
}
}, 20);
}
}, 20);
}
};
</script>
修改 dc-all.yml 文件中配置 elasticsearch-head, 新增 index.html 的挂载
- /data/workspace/elasticsearch-head/index.html:/usr/src/app/index.html
OK, 重启 elasticsearch-head 即可
钉钉消息如下:
点击消息跳转 es-head: