# logstash.conf — place this file in the {logstash}/conf.d directory

# Sample Logstash configuration for a simple
# file/HTTP -> Logstash -> Elasticsearch pipeline.

input {
    # tcp {
    #     mode => "server"
    #     host => "0.0.0.0"
    #     port => 5044
    #     tcp_keep_alive => true
    #     codec => json_lines
    # }

    file {
        # Path(s) of the log file(s) to watch.
        path => ["/opt/logs"]
        # Event delimiter. NOTE(review): escape sequences in config strings
        # are only interpreted when config.support_escapes is enabled;
        # "\n" is the plugin default anyway, so this setting is redundant.
        delimiter => "\n"
        # How often (seconds) to scan for new files under `path`.
        discover_interval => 15
        # How often (seconds) to check watched files for modifications.
        stat_interval => 3
        # Read newly discovered files from the beginning (default is "end").
        # Fixed: the value must be the quoted string "beginning", not a bareword.
        start_position => "beginning"
        # How often (seconds) to persist the current read position (sincedb).
        sincedb_write_interval => 5
        # Fixed: the file input is line-oriented, so use the `json` codec.
        # The `json_lines` codec is documented as unsuitable for line-oriented
        # inputs and can withhold the last event until the next delimiter arrives.
        codec => json
    }

    http {
        host => "0.0.0.0"
        port => 8080
        # Decode JSON request bodies based on their Content-Type header.
        additional_codecs => { "application/json" => "json" }
        codec => "json_lines"
        threads => 4
        ssl => false
    }

    # kafka {
    #     bootstrap_servers => ["x.x.x.x:9092"]
    #     auto_offset_reset => "latest"
    #     consumer_threads => 5
    #     decorate_events => false
    #     # Topics to subscribe to.
    #     topics => ["spy_admin_topic", "sd_topic"]
    #     codec => json_lines
    # }
}

filter {
#    date {
#        timezone => "Asia/Shanghai"
#        match => ["timestamp", "yyyy-MM-dd HH:mm:ss S"]
#    }
    # https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match
    date {
        # NOTE(review): this matches against @timestamp itself, which Logstash
        # already manages as a timestamp object — confirm the intended source
        # field (the commented-out variant above parsed "timestamp" instead).
        match => [ "@timestamp", "ISO8601" ]
        # Fixed: locale must be an IETF-BCP47 / POSIX language tag ("cn" is
        # not one). The locale is only consulted for month/weekday names,
        # which ISO8601 parsing never needs.
        locale => "zh-CN"
        target => "@timestamp"
    }

    # ad_id values of the form "<prefix>_<id>" are reduced to the <id> part.
    if "_" in [ad_id] {
        mutate {
            split => { "ad_id" => "_" }
            add_field => { "t_ad_id" => "%{[ad_id][1]}" }
        }
        mutate {
            rename => { "t_ad_id" => "ad_id" }
        }
    }

    mutate {
        convert => {
            "client_id" => "integer"
            "@version" => "integer"
            "app_name" => "string"
            "file" => "string"
            "host" => "string"
            "status_code" => "integer"
            "duration" => "integer"
            "size" => "integer"
            "cnt7" => "integer"
            "cnt6" => "integer"
            "cnt5" => "integer"
            "cnt4" => "integer"
            "cnt3" => "integer"
            "cnt2" => "integer"
            "cnt1" => "integer"
        }
        remove_field => [ "@version", "_score", "_source", "_type", "timestamp", "level_value", "logger_name" ]
    }
}

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        # One index per application, named after the event's app_name field.
        index => "%{[app_name]}"
        user => "elastic"
        # NOTE(review): plaintext credentials in the config file — move them
        # into the Logstash keystore (e.g. "${ES_PWD}") before committing.
        password => "1qaz@WSX3edc"
        pool_max => 500
        pool_max_per_route => 2000
        # Exponential backoff bounds (seconds) for failed bulk requests.
        retry_initial_interval => 4
        retry_max_interval => 16
    }

    # if [level] == "ERROR" {
    #     mongodb {
    #         collection => "%{[app_name]}"
    #         database => "logstash"
    #         isodate => true
    #         uri => "mongodb://spy_user:L2LRi7BAAP163fii@107.150.33.170:33017/?authSource=admin"
    #         bulk => true
    #         codec => json
    #     }
    # }
}