k8s logstash多管道配置

avatar
作者
猴君
阅读量:0

背景

采用的是标准的ELK+filebeat架构

ES版本:7.17.15

logstash版本:7.17.15

filebeat版本: 7.17.15

helm版本:7.17.3,官方地址:elastic/helm-charts

说一下为什么会想到使用多管道的原因

我们刚开始部署的是单管道,里面有多种类型的日志需要传输,比如埋点日志、系统日志、日志推送至kafka、日志推送至阿里云sls

后来在系统运行中,开发人员不够细心,在配置埋点日志的时候,出现了部分语法错误,导致整个日志系统受到影响,logstash整个推送都不可用,相当于没有隔离性,所以,在调研的时候,看到了多管道,多实例,所以来进行研究一下。

为什么使用多管道?

优点

  • 隔离性:每个管道可以独立处理不同类型的日志或数据流,有助于数据的分类和管理。
  • 灵活性:可以在同一实例中灵活配置和管理多个管道,以适应不同的需求。
  • 资源共享:多个管道共享同一个Logstash实例的资源,提高资源利用率。

缺点

  • 复杂性:配置和管理多个管道会增加复杂性,特别是在调试和维护时。
  • 性能瓶颈:当管道数量过多或数据量很大时,可能导致性能瓶颈,影响整体处理效率。

适用于需要在同一Logstash实例中处理不同类型数据的场景,提高资源利用率,但可能增加配置和管理的复杂性。

相较于单管道

单管道配置简单,适用于数据流较少或需求简单的场景,并且他的处理流程单一,性能更容易预测和管理

同时,他的缺点也很明显,不具备隔离性,正如我背景里面遇到的问题,灵活性,拓展性也比较差。

适用于简单数据流的处理

相较于多实例

优点:

  1. 高可用性:通过部署多个Logstash实例,可以提高系统的高可用性,减少单点故障的风险。
  2. 扩展性强:可以根据需求增加实例,水平扩展系统处理能力。
  3. 独立性:每个实例可以独立处理特定的数据流或任务,减少相互影响。

缺点:

  1. 资源开销大:需要更多的资源(内存、CPU等)来运行多个实例,增加运维成本。
  2. 管理复杂:需要配置和管理多个实例,增加了运维的复杂性。

适用于高可用性和高扩展性需求的场景,独立性强,但资源开销大,管理复杂

helm的方式配置

整体分为两部分,主管道和子管道

主管道用来接受数据来源,以及公共的处理,通过filter处理之后,在将信息分发给子管道,子管道来控制输出源

在logstash需要配置两个地方:

  • 一个是pipeline.yml文件,配置需要加载的管道文件以及id;
  • 一个是管道文件配置内容,确定输入源以及输出源头;

下面的方式采用的helm的方式,docker方式或者其他方式可参考一下

示例如下:

filebeat配置

filebeat.inputs: - type: log   paths:   - /tmp/logs/biz/*.log   fields:     fb_collect_app: xxx-xxx-test-biz     fb_collect_type: bizlog     system_env: dev  # 如果不包含 "prod",设置为test     send_kafka: "false"   fields_under_root: true - type: log   paths:   - /tmp/logs/sys/*.log   multiline.pattern: '^\s|^"|^Caused by:'   multiline.match: after   fields:     fb_collect_app: xxx-xxx-test-sys     fb_collect_type: syslog     system_env: dev  # 如果不包含 "prod",设置为test   fields_under_root: true output.logstash:   hosts:   - "xxx"  

logstash配置

values.yaml

# 配置文件 logstashConfig:   logstash.yml: |     # 如果处理的字符中含有\t\n等字符,是不生效的,我们需要开启logstash的字符转义功能,config.support_escapes: true     config.support_escapes: true     http.host: 0.0.0.0     pipeline.ecs_compatibility: v1   pipelines.yml: |     - pipeline.id: base-processing       path.config: "/usr/share/logstash/pipeline/base-processing.conf"     - pipeline.id: syslog-processing       path.config: "/usr/share/logstash/pipeline/syslog-processing.conf"     - pipeline.id: bizlog-processing       path.config: "/usr/share/logstash/pipeline/bizlog-processing.conf"        # 管道内容   base-processing.conf: |     input {        beats{          port => "5055"        }     }      output {       if [fb_collect_type] == "bizlog" {         pipeline {           send_to => bizlogs         }         if [send_kafka] == "true" {           stdout { codec => rubydebug }           pipeline {             send_to => kafkalogs           }         }       }else if [fb_collect_type] == "syslog" {         pipeline {           send_to => syslogs         }       }     }   bizlog-processing.conf: |     input {        pipeline {          address => bizlogs        }     }     filter {         ruby {           code => "event.cancel if (Time.now.to_f - event.get('@timestamp').to_f) > (60 * 60 * 24 * 3)"         }         json {           source => "message"           skip_on_invalid_json => true         }         date {           match => ["business_time","yyyy-MM-dd HH:mm:ss.SSS"]           target => "@timestamp"         }     }       output {         elasticsearch {           hosts => ["elasticsearch-master.business:9200"]           index => "%{fb_collect_app}-%{+YYYY.MM.dd}"           user => elastic           password => "xxx"        }     }    syslog-processing.conf: |     input {        pipeline {          address => syslogs        }     }     filter {         ruby {           code => "event.cancel if (Time.now.to_f - event.get('@timestamp').to_f) > (60 * 60 * 24 * 3)"         }         mutate{           strip => ["message"]           gsub => [ "message", "\r", " " ]           gsub => [ "message", "\t", " " ]           gsub => [ "message", "\n", " " ]           gsub => [ "message", "\u0000", " " ]         }         json {           source => "message"           skip_on_invalid_json => true         }         date {           match => ["timestamp","yyyy-MM-dd HH:mm:ss.SSS"]           target => "@timestamp"         }     }       output {         elasticsearch {           hosts => ["elasticsearch-master.business:9200"]           index => "%{fb_collect_app}-%{+YYYY.MM.dd}"           user => elastic           password => "xxx"        }     }   kafka-processing.conf: |       input {            pipeline {              address => kafkalogs            }         }       output {         stdout { codec => rubydebug }       }  

其中,base-processing.conf为主管道,用来确定接收源的,然后在根据条件,将数据输入到某个pipeline中,

最后pipeline来觉得输出到es那个索引下

最后重启logstash下即可生效

踩坑点

ECS Compatibility mode

部署之后,logstash一直报错:

Relying on default value of pipeline.ecs_compatibility, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode 

这个是因为Logstash 正在使用默认的 ECS(Elastic Common Schema)兼容性模式,这可能在未来的版本中发生变化。为了避免升级时出现意外变化,你可以在 Logstash 的配置文件中显式声明所需的 ECS 兼容性模式。

解决方案:

logstash.yml 文件中添加或修改 pipeline.ecs_compatibility 参数。你可以选择以下几种模式之一:

  • disabled: 不使用 ECS 兼容性模式。
  • v1: 使用 ECS 1.0 兼容性模式。
  • v8: 使用 ECS 8.0 兼容性模式(如果你的 Logstash 版本支持)。

例如,设置 ECS 兼容性模式为 disabled

pipeline.ecs_compatibility: disabled 

或者设置为 ECS 1.0 兼容性模式:

pipeline.ecs_compatibility: v1 

我的logstash版本是7.17.15,选择的是ECS 1.0兼容模式

配置完成后,重启logstash

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!