Shipping nginx logs for specific API endpoints to ELK

Author: 猴君

Configure filebeat

Show the effective configuration with comments and blank lines stripped:

grep -Ev "^\s*#|^\s*$" /data/filebeat/filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data/nginx_logs/nginx-access-*.log
  fields:                               # extra fields attached to every event; logstash uses them to route logs to the right ES index
    app_name: nginx-appname
    profiles_active: pro
    app_node: nginx_hostname
  fields_under_root: true
  tail_files: true
  include_lines: ['/apis/order/save']   # only collect log lines matching this pattern (the target API)
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
output.kafka:                           # ship events to the Kafka cluster
  enabled: true
  hosts: ["kafka1:9092","kafka2:9092","kafka3:9092"]    # Kafka broker addresses
  topic: 'nginx_appname_topic'          # Kafka topic to write to; the topic can also reference a field defined under inputs, so logs from different paths land in different topics (see the sketch below)
  username: kafka_user
  password: kafka_password
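The topic above is a literal string. To get the per-path routing mentioned in the comment, filebeat's Kafka output also accepts a format string that resolves a field per event. A minimal sketch, assuming each input defines its own log_topic field (the field name is illustrative, not part of the config above):

# Sketch only, not the config used above: route each input to its own topic.
filebeat.inputs:
- type: log
  paths: ['/data/nginx_logs/nginx-access-*.log']
  fields:
    log_topic: nginx_access            # hypothetical routing field
  fields_under_root: true
output.kafka:
  hosts: ["kafka1:9092","kafka2:9092","kafka3:9092"]
  topic: '%{[log_topic]}'              # resolved per event from the field above

Before starting filebeat it is worth validating the config, then confirming that only /apis/order/save requests actually reach the topic. A quick check, assuming the Kafka CLI tools live under /opt/kafka (that path is an assumption):

# Validate the config file and the Kafka output connectivity.
/data/filebeat/filebeat test config -c /data/filebeat/filebeat.yml
/data/filebeat/filebeat test output -c /data/filebeat/filebeat.yml

# SASL/PLAIN client settings for the console consumer (credentials as configured above).
cat > /tmp/client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka_user" password="kafka_password";
EOF

# Tail the topic; only lines containing /apis/order/save should appear.
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
  --topic nginx_appname_topic --consumer.config /tmp/client.properties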

Configure logstash

cat /usr/local/app/logstash/config/logstash.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
    kafka {                             # consume from Kafka
        bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"    # Kafka cluster address (a comma-separated string, not an array)
        group_id => "logstash_groupname_consumer"
        topics => ["pro_log_topic","test_log_topic","uat_log_topic","nginx_appname_topic"]    # Kafka topics to read
        client_id => "appname_pro_logs"
        consumer_threads => 3
        sasl_mechanism => "PLAIN"
        security_protocol => "SASL_PLAINTEXT"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka_user' password='kafka_password';"
        codec => "json"                 # events arrive as JSON
        auto_offset_reset => "latest"   # only consume messages produced after startup
    }
    kafka {                             # second Kafka source
        bootstrap_servers => "kafkaip:9092"
        group_id => "logstash_groupname_consumer2"
        topics => ["topic"]
        client_id => "appname_test_logs"
        consumer_threads => 3
        sasl_mechanism => "PLAIN"
        security_protocol => "SASL_PLAINTEXT"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka_user' password='kafka_password';"
        codec => "json"
        auto_offset_reset => "latest"
    }
}

filter {
    mutate {
        lowercase => ["app_name"]
        remove_field => ["_index","_id","_type","_version","_score","referer","agent","@version"]    # drop unused fields
    }
    date {
        match => ["date", "yyyy-MM-dd HH:mm:ss.SSS"]
        target => "@timestamp"
        timezone => "Asia/Shanghai"
    }
    ruby {
        code => "event.set('index_day', (event.get('@timestamp').time.localtime).strftime('%Y.%m.%d'))"
    }
}

output {
    elasticsearch {
        hosts => ["172.19.189.179:9200","172.19.38.38:9200","172.19.38.39:9200"]
        index => "%{[app_name]}-%{[profiles_active]}-%{index_day}"
        #index => "%{[app_name]}-%{[profiles_active]}-%{+YYYY.MM.dd}"
        codec => "json"
        user => "elastic"
        password => "esappname0227"
    }
}
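A note on the index name: the ruby filter computes index_day in local time (Asia/Shanghai), while the commented-out %{+YYYY.MM.dd} form formats @timestamp in UTC, so the ruby approach keeps the daily indices aligned with the local calendar day. To bring the pipeline up and confirm the daily indices are being created (paths and credentials taken from the config above; the date in the example output is illustrative):

# Run logstash with this pipeline in the foreground.
/usr/local/app/logstash/bin/logstash -f /usr/local/app/logstash/config/logstash.conf

# List the resulting indices, e.g. nginx-appname-pro-2024.05.20
# (name pattern: <app_name>-<profiles_active>-<index_day>).
curl -s -u elastic:esappname0227 'http://172.19.189.179:9200/_cat/indices/nginx-appname-*?v'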
