Consuming Kafka messages with Logstash and sending them to Elasticsearch
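Start Logstash with a configuration file:
bin/logstash -f config/logstash.conf
Start with automatic config reloading:
./bin/logstash -f configfile.conf --config.reload.automatic
Start in the background with automatic config reloading
Use nohup to push the process into the background. The command below does not pass --config.reload.automatic itself, so reloading has to be enabled through config.reload.automatic: true in logstash.yml.
nohup /usr/local/ELk/logstash-5.1.1/bin/logstash -f /usr/local/ELk/logstash-5.1.1/config/conf.d/webnginx.conf >/dev/null &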
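The background start above can be sketched with a stand-in command, which also shows how to keep the process ID for later checks. This is only an illustration: `sleep 30` replaces the real bin/logstash command line, and the temporary directory and file names are hypothetical.

```shell
#!/bin/sh
# Scratch directory for the output and PID files (hypothetical names).
workdir=$(mktemp -d)

# "sleep 30" is a stand-in for the real bin/logstash -f ... command line.
# nohup makes the job ignore the hangup signal; & returns the prompt immediately.
nohup sleep 30 >"$workdir/logstash.out" 2>&1 &
pid=$!
echo "$pid" >"$workdir/logstash.pid"

# kill -0 sends no signal; it only reports whether the process still exists.
if kill -0 "$pid" 2>/dev/null; then
    status="running"
else
    status="stopped"
fi
echo "background job $pid is $status"

# Stop the stand-in job so the sketch leaves nothing behind.
kill "$pid"
```

Saving `$!` to a PID file makes it possible to stop or inspect the process later without searching the process list.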
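On Unix/Linux we often want a program to run in the background, and the usual way is to append & to the command. Some programs do not keep running that way once the terminal is gone, so this project starts them with nohup instead.

nohup command
Purpose: run a command so that it is not terminated by a hangup.
Syntax: nohup Command [ Arg … ] [ & ]
Example: nohup java -jar BiuBiuBiu.jar >output 2>&1 &

Explanation:
1. Started with nohup and a trailing &, the program keeps running even if the terminal is closed or your own computer crashes (provided the program was submitted to a server). The & alone only puts the job in the background; it is nohup that makes it ignore the hangup signal sent when the terminal closes.
2. 2>&1 redirects standard error (2) to wherever standard output (1) currently points. Standard output has already been redirected to the file output, so both standard error and standard output end up in output.

Why redirect standard error to standard output instead of naming the file twice? With >output 2>output the file output is opened twice, so stdout and stderr each write at their own file offset and overwrite each other's data. The problem is made worse because stdout is typically buffered while stderr is not. That is certainly not what we want, and it is why nohup ./command.sh >output 2>output goes wrong.

Finally, a word on /dev/null. It is a bottomless pit: anything can be redirected to it, but nothing written there can be read back. When stdout and stderr are large and you do not care about them, redirect both there:
./start.sh >/dev/null 2>&1
If the logs never need to be looked at, simply throw them away:
nohup java -jar BiuBiuBiu.jar >/dev/null 2>&1 &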
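The redirections described above can be tried out with a small script. This is a minimal sketch: `emit` is a hypothetical helper that writes one line to each stream, and the log file names are made up for the demonstration.

```shell
#!/bin/sh
# Hypothetical helper: one line to stdout, one line to stderr.
emit() {
    echo "to stdout"
    echo "to stderr" >&2
}

workdir=$(mktemp -d)

# stdout is sent to the file first, then stderr is pointed at the same
# open file, so both lines are appended one after the other.
emit >"$workdir/both.log" 2>&1

# Here only stdout is thrown away; stderr is still kept in its own file.
emit >/dev/null 2>"$workdir/err.log"

both_lines=$(wc -l <"$workdir/both.log" | tr -d ' ')
err_text=$(cat "$workdir/err.log")
echo "both.log lines: $both_lines"
echo "err.log: $err_text"
```

The order matters: `2>&1` must come after `>file`, because it copies whatever stdout points to at that moment.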
1. Download and unpack Logstash, or install it with Docker or yum.
2. Edit the logstash.yml settings file.
3. Write a custom pipeline file (logstash.conf) that defines the input, filter and output stages, for example kafka-logstash-es-connect.conf.

Start Logstash:
/opt/software/logstash-7.0.1/bin/logstash -f /opt/software/logstash-7.0.1/config/kafka-logstash-es-connect.conf

Example: consume JSON messages from Kafka and store the data in Elasticsearch

input {
  kafka {
    bootstrap_servers => "192.168.1.135:9092,192.168.1.135:9092"
    group_id => "dofun-score"
    auto_offset_reset => "earliest"
    topics => ["score_statistic"]
    consumer_threads => 10
    codec => json { charset => "UTF-8" }
  }
}
filter {
  # Parse the message field as JSON. The json codec above already decodes
  # the Kafka record, so this only matters when the payload itself carries
  # a JSON string in its "message" field.
  json {
    source => "message"
    target => "message"
  }
}
output {
  elasticsearch {
    hosts => ["192.168.1.135:9200","192.168.1.136:9200"]
    index => "score-statistics-%{+YYYY.MM}"
  }
}

Settings file logstash.yml

# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
#
# ------------ Node identity ------------
#
# Use a descriptive name for the node:
#
# Set the node name, usually the host name
# node.name: test
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
# Persistent data directory used by Logstash and its plugins
# path.data:
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
# pipeline.workers: 2
#
# How many events to retrieve from inputs before sending to filters+workers
#
# pipeline.batch.size: 125
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
# pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: enabling this can lead to data loss during shutdown
#
# pipeline.unsafe_shutdown: false
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
# Enable automatic reloading of the configuration file
config.reload.automatic: true
#
# How often to check if the pipeline configuration has changed (in seconds)
# Interval between configuration reload checks
# config.reload.interval: 20s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false
#
# ------------ Module Settings ---------------
# Define modules here. Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
#
# queue.type: memory
#
# If using queue.type: persisted, the directory path where the data files will be stored.
# Default is path.data/queue
#
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#
# queue.max_bytes: 1024mb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint
#
# Host name to bind to, usually a domain name or an IP address
#http.host: "127.0.0.1"
#
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
#
# http.port: 9600-9700
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
# log.level: info
# path.logs:
#
# ------------ Other Settings --------------
#
# Where to find custom plugins
# path.plugins: []
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
#xpack.monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#xpack.management.enabled: false
#xpack.management.pipeline.id: ["main", "apache_logs"]
#xpack.management.elasticsearch.username: logstash_admin_user
#xpack.management.elasticsearch.password: password
#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
#xpack.management.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file
#xpack.management.elasticsearch.ssl.truststore.password: password
#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.management.elasticsearch.ssl.keystore.password: password
#xpack.management.elasticsearch.ssl.verification_mode: certificate
#xpack.management.elasticsearch.sniffing: false
#xpack.management.logstash.poll_interval: 5s

############# Logstash consuming Kafka

Production configuration

input {
  kafka {
    bootstrap_servers => "172.18.215.23:9092,172.18.215.24:9092"
    group_id => "dofun-connect"
    auto_offset_reset => "earliest"
    topics => ["dofun-cloud-connect"]
    consumer_threads => 10
    codec => "json"
  }
}
filter {
  # Look up GeoIP data only for addresses outside the loopback and private ranges
  if [ipaddress] {
    if [ipaddress] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {
      geoip {
        source => "ipaddress"
        fields => ["country_name", "region_name", "city_name", "location"]
      }
    }
  }
  # Use the UNIX timestamp in the ts field as the event time
  date { match => ["ts", "UNIX"] }
  # Shift @timestamp forward by 8 hours (UTC+8) through a temporary field
  ruby { code => "event.set('timetemp', event.get('@timestamp').time.localtime + 8*60*60)" }
  ruby { code => "event.set('@timestamp',event.get('timetemp'))" }
  mutate {
    remove_field => ["timetemp", "@version", "proto_name", "keepalive", "proto_ver", "connack", "clean_start"]
  }
}
output {
  elasticsearch {
    hosts => ["172.18.215.18:9200", "172.18.215.19:9200", "172.18.215.20:9200"]
    index => "car-connect-%{+YYYY.MM.dd}"
    manage_template => true
    template_overwrite => true
    template_name => "car_connect_template"
    template => "/opt/software/logstash-7.0.1/config/carconnecttemplate.json"
  }
}

***********************************

Test environment configuration

input {
  kafka {
    bootstrap_servers => "192.168.1.135:9092,192.168.1.135:9092"
    group_id => "dofun-score"
    auto_offset_reset => "earliest"
    topics => ["score_statistic"]
    consumer_threads => 10
    codec => json { charset => "UTF-8" }
  }
}
filter {
  json {
    source => "message"
    target => "message"
  }
  date { match => ["ts", "UNIX"] }
  ## Correct Logstash's @timestamp to UTC+8 before writing to ES
  ruby { code => "event.set('temptime', event.get('@timestamp').time.localtime + 8*60*60)" }
  ruby { code => "event.set('@timestamp',event.get('temptime'))" }
  mutate {
    remove_field => ["temptime", "temptimestamp", "@version"]
  }
}
output {
  elasticsearch {
    hosts => ["192.168.1.135:9200","192.168.1.136:9200"]
    index => "score-statistics-%{+YYYY.MM}"
  }
}

## Convert the timestamp to a time before writing to ES (the same shift, using a temptimestamp field)
ruby { code => "event.set('temptimestamp', event.get('@timestamp').time.localtime + 8*60*60)" }
ruby { code => "event.set('@timestamp',event.get('temptimestamp'))" }
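One visible effect of the 8-hour shift is on the date-based index name, since the %{+YYYY.MM.dd} pattern is filled in from @timestamp in UTC. The arithmetic can be checked outside Logstash with a rough sketch like the one below. It assumes GNU coreutils `date` (the `-d "@epoch"` form), and the `ts` value is a made-up sample, not data from the original post.

```shell
#!/bin/sh
# Assumes GNU coreutils date; BSD/macOS date uses different flags.
ts=1700000000                    # hypothetical "ts" field value, in UNIX seconds
shift_seconds=$((8 * 60 * 60))   # the 8*60*60 used in the ruby filters

# Day stamp for the instant as the date filter stores it (UTC).
utc_day=$(date -u -d "@$ts" +%Y.%m.%d)
# Day stamp after the instant is pushed 8 hours forward, still rendered as UTC.
shifted_day=$(date -u -d "@$((ts + shift_seconds))" +%Y.%m.%d)

echo "index without shift: car-connect-$utc_day"
echo "index with shift:    car-connect-$shifted_day"
```

For this sample the event falls at 22:13:20 UTC, so the shift moves it into the next day's index. Note that the shifted value is still stored as if it were UTC, so any tool that applies its own time zone conversion on top would show the time 8 hours late.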