Configure Filebeat
grep -Ev "^\s*#|^\s*$" /data/filebeat/filebeat.yml   # show the config without comment and blank lines
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data/nginx_logs/nginx-access-*.log
  fields:                               # add custom fields to every event; Logstash later uses app_name and profiles_active to pick the ES index
    app_name: nginx-appname
    profiles_active: pro
    app_node: nginx_hostname
  fields_under_root: true
  tail_files: true
  include_lines: ['/apis/order/save']   # only collect log lines matching this pattern
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
output.kafka:                           # ship events to the Kafka cluster
  enabled: true
  hosts: ["kafka1:9092","kafka2:9092","kafka3:9092"]   # Kafka broker addresses
  topic: 'nginx_appname_topic'          # Kafka topic to publish to; the value may also reference a field defined in inputs (e.g. '%{[app_name]}') so that logs from different paths go to different topics
  username: kafka_user
  password: kafka_password
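Before starting Filebeat it can be worth syntax-checking the file and then confirming that events actually reach Kafka by consuming a few messages from the topic. The commands below are a minimal sketch, not part of the original setup: the /tmp/kafka-client.properties path is made up, the SASL credentials and broker address are reused from the config above, and the Kafka CLI tools are assumed to be on the PATH.

filebeat test config -c /data/filebeat/filebeat.yml    # syntax-check the Filebeat config

# SASL client settings matching the Filebeat output (hypothetical file path)
cat > /tmp/kafka-client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka_user" password="kafka_password";
EOF

# Print a few events from the topic Filebeat writes to, then exit
kafka-console-consumer.sh \
  --bootstrap-server kafka1:9092 \
  --topic nginx_appname_topic \
  --consumer.config /tmp/kafka-client.properties \
  --max-messages 5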
Configure Logstash
cat /usr/local/app/logstash/config/logstash.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
  kafka {                                        # read events from Kafka
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"   # Kafka cluster addresses
    group_id => "logstash_groupname_consumer"
    topics => ["pro_log_topic","test_log_topic","uat_log_topic","nginx_appname_topic"]   # Kafka topics to consume
    client_id => "appname_pro_logs"
    consumer_threads => 3
    sasl_mechanism => "PLAIN"
    security_protocol => "SASL_PLAINTEXT"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka_user' password='kafka_password';"
    codec => "json"                              # parse JSON-formatted messages
    auto_offset_reset => "latest"                # only consume the newest Kafka messages
  }
  kafka {                                        # a second Kafka source
    bootstrap_servers => "kafkaip:9092"          # Kafka cluster address
    group_id => "logstash_groupname_consumer2"
    topics => ["topic"]                          # Kafka topics to consume
    client_id => "appname_test_logs"
    consumer_threads => 3
    sasl_mechanism => "PLAIN"
    security_protocol => "SASL_PLAINTEXT"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka_user' password='kafka_password';"
    codec => "json"                              # parse JSON-formatted messages
    auto_offset_reset => "latest"                # only consume the newest Kafka messages
  }
}
filter {
  mutate {
    lowercase => ["app_name"]
    remove_field => ["_index","_id","_type","_version","_score","referer","agent","@version"]   # drop fields we do not need
  }
  date {
    match => ["date", "yyyy-MM-dd HH:mm:ss.SSS"]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }
  ruby {
    # derive an index_day field (local date) used to build daily index names
    code => "event.set('index_day', (event.get('@timestamp').time.localtime).strftime('%Y.%m.%d'))"
  }
}
output {
  elasticsearch {
    hosts => ["172.19.189.179:9200","172.19.38.38:9200","172.19.38.39:9200"]
    index => "%{[app_name]}-%{[profiles_active]}-%{index_day}"
    #index => "%{[app_name]}-%{[profiles_active]}-%{+YYYY.MM.dd}"
    codec => "json"
    user => "elastic"
    password => "esappname0227"
  }
}
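Once the pipeline is in place, you can validate it and then check that the daily indices are being created in Elasticsearch. A minimal sketch follows: the Logstash binary path is an assumption based on the config path above (only /usr/local/app/logstash/config is given in the original), and the curl call reuses the Elasticsearch address and credentials from the output block; the index pattern matches the app_name and profiles_active values set in Filebeat.

# Validate the pipeline config without starting the service (assumed install path)
/usr/local/app/logstash/bin/logstash -f /usr/local/app/logstash/config/logstash.conf --config.test_and_exit

# List the daily indices written by the elasticsearch output
curl -s -u elastic:esappname0227 "http://172.19.189.179:9200/_cat/indices/nginx-appname-pro-*?v"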