Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fluentd (td-agent) 阿里云部署 #78

Open
moooofly opened this issue Mar 13, 2019 · 1 comment
Open

fluentd (td-agent) 阿里云部署 #78

moooofly opened this issue Mar 13, 2019 · 1 comment

Comments

@moooofly
Copy link
Owner

moooofly commented Mar 13, 2019

大部分内容同 #65 (comment) ,这里将步骤进行简化处理


前置条件

  • 确保 NTP 时间准确性
  • 增加最大可用文件句柄数量
  • 优化网路相关的内核参数

Please follow the Preinstallation Guide to configure your OS properly. This will prevent many unnecessary problems.

安装

针对 Ubuntu 18.04 的安装命令

安装 td-agent

curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-bionic-td-agent3.sh | sh

安装 http output 插件

td-agent-gem install fluent-plugin-out-http

配置调整

/lib/systemd/system/td-agent.service

root@proxy-beijing:/etc/td-agent# cat /lib/systemd/system/td-agent.service
[Unit]
Description=td-agent: Fluentd based data collector for Treasure Data
Documentation=https://docs.treasuredata.com/articles/td-agent
After=network-online.target
Wants=network-online.target

[Service]
User=root  -- 1
Group=root  -- 2
LimitNOFILE=65536
Environment=LD_PRELOAD=/opt/td-agent/embedded/lib/libjemalloc.so
Environment=GEM_HOME=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=GEM_PATH=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=FLUENT_CONF=/etc/td-agent/td-agent.conf
Environment=FLUENT_PLUGIN=/etc/td-agent/plugin
Environment=FLUENT_SOCKET=/var/run/td-agent/td-agent.sock
Environment=TD_AGENT_OPTIONS=
PIDFile=/var/run/td-agent/td-agent.pid
RuntimeDirectory=td-agent
Type=forking
ExecStart=/opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid $TD_AGENT_OPTIONS
ExecStop=/bin/kill -TERM ${MAINPID}
ExecReload=/bin/kill -HUP ${MAINPID}
Restart=always
TimeoutStopSec=120

[Install]
WantedBy=multi-user.target

td-agent.conf

root@proxy-beijing:/etc/td-agent# cat td-agent.conf
<source>
  @type tail
  path /opt/apps/proxy.log
  pos_file /opt/apps/proxy.log.pos
  tag aliyun.proxy.beijing
  <parse>
    @type json
  </parse>
</source>

<filter aliyun.**>
  @type record_transformer
  <record>
    _hostname "#{Socket.gethostname}"
    _source ${tag}
  </record>
</filter>

<match aliyun.**>
  @type http
  endpoint_url    http://dev-kafka-front.thellsapi.com/front/logs-tmp
  http_method     post   # default: post
  serializer      json   # default: form
  rate_limit_msec 0      # default: 0 = no rate limiting
  raise_on_error  false  # default: true
  authentication  none   # default: none
  username        ''     # default: ''
  password        ''     # default: '', secret: true
  buffered        false  # default: false. Switch non-buffered/buffered mode
  cacert_file     ''     # default: ''
  token           ''     # default: ''
  custom_headers  ''     # default: nil
</match>

<match aliyun.**>
  @type stdout
  @id output_stdout
</match>
root@proxy-beijing:/etc/td-agent#

其他

systemctl status td-agent.service
systemctl restart td-agent.service
tail -f /var/log/td-agent/td-agent.log
@moooofly moooofly added the ELK label Mar 13, 2019
@moooofly moooofly changed the title fluentd (td-agent) 阿里云环境部署 fluentd (td-agent) 阿里云部署 Mar 15, 2019
@moooofly moooofly added 日志系统 and removed ELK labels Mar 15, 2019
@moooofly
Copy link
Owner Author

moooofly commented Apr 4, 2019

排查 fluent/fluent-plugin-kafka 插件的 bug

背景

  • 业务需要把用户操作的日志以 protobuf 格式发送到对应的 topic 中(russell__user_operation_history)为数据方提供服务,使用的是 github.com/Shopify/sarama 开源库。
  • 业务方的设计是系统通过 russell__user_operation_history 这个 topic 将日志提供给数据分析的一方,而不是进入 elk 系统;
  • fluentd 负载采集 container 中业务输出到 stdout 上的日志,并见将其打入到 logs.k8s 这个 topic 上,最终 log 会进入 k8s elk 系统;
  • 通过 kafka 的调试工具(其实就是一个实现了 kafka consumer 的客户端)查看(直接从 kafka 上进行消息 dump),发现 russell__user_operation_history 这个 topic 上混入了(可以消费到)Grpc 的 log ,而这些 log 本该是打到 ELK 的;

问题根本原因

fluentd 使用的 kafka plugin 插件有坑,当 json 格式的消息体中包含有 topic 字段时,会覆盖 default_topic 设置的值;

细节

用于收集 k8s 容器日志的 fluentd 配置

# This configuration file for Fluentd / td-agent is used
# to watch changes to Docker log files. The kubelet creates symlinks that
# capture the pod name, namespace, container name & Docker container ID
# to the docker logs for pods in the /var/log/containers directory on the host.
# If running this fluentd configuration in a Docker container, the /var/log
# directory should be mounted in the container.
#
# These logs are then submitted to Elasticsearch which assumes the
# installation of the fluent-plugin-elasticsearch & the
# fluent-plugin-kubernetes_metadata_filter plugins.
# See https://github.com/uken/fluent-plugin-elasticsearch &
# https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter for
# more information about the plugins.
# Maintainer: Jimmi Dyson <[email protected]>
#
# Example
# =======
# A line in the Docker log file might look like this JSON:
#
# {"log":"2014/09/25 21:15:03 Got request with path wombat\n",
#  "stream":"stderr",
#   "time":"2014-09-25T21:15:03.499185026Z"}
#
# The time_format specification below makes sure we properly
# parse the time format produced by Docker. This will be
# submitted to Elasticsearch and should appear like:
# $ curl 'http://elasticsearch-logging:9200/_search?pretty'
# ...
# {
#      "_index" : "logstash-2014.09.25",
#      "_type" : "fluentd",
#      "_id" : "VBrbor2QTuGpsQyTCdfzqA",
#      "_score" : 1.0,
#      "_source":{"log":"2014/09/25 22:45:50 Got request with path wombat\n",
#                 "stream":"stderr","tag":"docker.container.all",
#                 "@timestamp":"2014-09-25T22:45:50+00:00"}
#    },
# ...
#
# The Kubernetes fluentd plugin is used to write the Kubernetes metadata to the log
# record & add labels to the log record if properly configured. This enables users
# to filter & search logs on any metadata.
# For example a Docker container's logs might be in the directory:
#
#  /var/lib/docker/containers/997599971ee6366d4a5920d25b79286ad45ff37a74494f262e3bc98d909d0a7b
#
# and in the file:
#
#  997599971ee6366d4a5920d25b79286ad45ff37a74494f262e3bc98d909d0a7b-json.log
#
# where 997599971ee6... is the Docker ID of the running container.
# The Kubernetes kubelet makes a symbolic link to this file on the host machine
# in the /var/log/containers directory which includes the pod name and the Kubernetes
# container name:
#
#    synthetic-logger-0.25lps-pod_default_synth-lgr-997599971ee6366d4a5920d25b79286ad45ff37a74494f262e3bc98d909d0a7b.log 
#    ->
#    /var/lib/docker/containers/997599971ee6366d4a5920d25b79286ad45ff37a74494f262e3bc98d909d0a7b/997599971ee6366d4a5920d25b79286ad45ff37a74494f262e3bc98d909d0a7b-json.log
#
# The /var/log directory on the host is mapped to the /var/log directory in the container
# running this instance of Fluentd and we end up collecting the file:
#
#   /var/log/containers/synthetic-logger-0.25lps-pod_default_synth-lgr-997599971ee6366d4a5920d25b79286ad45ff37a74494f262e3bc98d909d0a7b.log
#
# This results in the tag:
#
#  var.log.containers.synthetic-logger-0.25lps-pod_default_synth-lgr-997599971ee6366d4a5920d25b79286ad45ff37a74494f262e3bc98d909d0a7b.log
#
# The Kubernetes fluentd plugin is used to extract the namespace, pod name & container name
# which are added to the log message as a kubernetes field object & the Docker container ID
# is also added under the docker field object.
# The final tag is:
#
#   kubernetes.var.log.containers.synthetic-logger-0.25lps-pod_default_synth-lgr-997599971ee6366d4a5920d25b79286ad45ff37a74494f262e3bc98d909d0a7b.log
#
# And the final log record look like:
#
# {
#   "log":"2014/09/25 21:15:03 Got request with path wombat\n",
#   "stream":"stderr",
#   "time":"2014-09-25T21:15:03.499185026Z",
#   "kubernetes": {
#     "namespace": "default",
#     "pod_name": "synthetic-logger-0.25lps-pod",
#     "container_name": "synth-lgr"
#   },
#   "docker": {
#     "container_id": "997599971ee6366d4a5920d25b79286ad45ff37a74494f262e3bc98d909d0a7b"
#   }
# }
#
# This makes it easier for users to search for logs by pod name or by
# the name of the Kubernetes container regardless of how many times the
# Kubernetes pod has been restarted (resulting in a several Docker container IDs).
#
# TODO: Propagate the labels associated with a container along with its logs
# so users can query logs using labels as well as or instead of the pod name
# and container name. This is simply done via configuration of the Kubernetes
# fluentd plugin but requires secrets to be enabled in the fluent pod. This is a
# problem yet to be solved as secrets are not usable in static pods which the fluentd
# pod must be until a per-node controller is available in Kubernetes.


# Example:
# {"log":"[info:2016-02-16T16:04:05.930-08:00] Some log text here\n","stream":"stdout","time":"2016-02-17T00:04:05.931087621Z"}
<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/es-containers.log.pos
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  keep_time_key
  tag kubernetes.*
  format json
  read_from_head true
</source>

# add kubernetes meta to log
<filter kubernetes.**>
  @type kubernetes_metadata
</filter>

# Example:
# 2015-12-21 23:17:22,066 [salt.state       ][INFO    ] Completed state [net.ipv4.ip_forward] at time 23:17:22.066081
<source>
  @type tail
  format /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<message>.*)$/
  time_format %Y-%m-%d %H:%M:%S
  path /var/log/salt/minion
  pos_file /var/log/es-salt.pos
  tag salt
</source>

# Example:
# Dec 21 23:17:22 gke-foo-1-1-4b5cbd14-node-4eoj startupscript: Finished running startup script /var/run/google.startup.script
<source>
  @type tail
  format syslog
  path /var/log/startupscript.log
  pos_file /var/log/es-startupscript.log.pos
  tag startupscript
</source>

# Examples:
# time="2016-02-04T06:51:03.053580605Z" level=info msg="GET /containers/json"
# time="2016-02-04T07:53:57.505612354Z" level=error msg="HTTP Error" err="No such image: -f" statusCode=404
<source>
  @type tail
  format /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=($<status_code>\d+))?/
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  path /var/log/docker.log
  pos_file /var/log/es-docker.log.pos
  tag docker
</source>

# Example:
# 2016/02/04 06:52:38 filePurge: successfully removed file /var/etcd/data/member/wal/00000000000006d0-00000000010a23d1.wal
<source>
  @type tail
  # Not parsing this, because it doesn't have anything particularly useful to
  # parse out of it (like severities).
  format none
  path /var/log/etcd.log
  pos_file /var/log/es-etcd.log.pos
  tag etcd
</source>

# Multi-line parsing is required for all the kube logs because very large log
# statements, such as those that include entire object bodies, get split into
# multiple lines by glog.

# Example:
# I0204 07:32:30.020537    3368 server.go:1048] POST /stats/container/: (13.972191ms) 200 [[Go-http-client/1.1] 10.244.1.3:40537]
<source>
  @type tail
  format multiline
  format_firstline /^\w\d{4}/
  format1 /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/
  time_format %m%d %H:%M:%S.%N
  path /var/log/kubelet.log
  pos_file /var/log/es-kubelet.log.pos
  tag kubelet
</source>

# Example:
# I0204 07:00:19.604280       5 handlers.go:131] GET /api/v1/nodes: (1.624207ms) 200 [[kube-controller-manager/v1.1.3 (linux/amd64) kubernetes/6a81b50] 127.0.0.1:38266]
<source>
  @type tail
  format multiline
  format_firstline /^\w\d{4}/
  format1 /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/
  time_format %m%d %H:%M:%S.%N
  path /var/log/kube-apiserver.log
  pos_file /var/log/es-kube-apiserver.log.pos
  tag kube-apiserver
</source>

# Example:
# I0204 06:55:31.872680       5 servicecontroller.go:277] LB already exists and doesn't need update for service kube-system/kube-ui
<source>
  @type tail
  format multiline
  format_firstline /^\w\d{4}/
  format1 /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/
  time_format %m%d %H:%M:%S.%N
  path /var/log/kube-controller-manager.log
  pos_file /var/log/es-kube-controller-manager.log.pos
  tag kube-controller-manager
</source>

# Example:
# W0204 06:49:18.239674       7 reflector.go:245] pkg/scheduler/factory/factory.go:193: watch of *api.Service ended with: 401: The event in requested index is outdated and cleared (the requested history has been cleared [2578313/2577886]) [2579312]
<source>
  @type tail
  format multiline
  format_firstline /^\w\d{4}/
  format1 /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/
  time_format %m%d %H:%M:%S.%N
  path /var/log/kube-scheduler.log
  pos_file /var/log/es-kube-scheduler.log.pos
  tag kube-scheduler
</source>

# Example:
# I0603 15:31:05.793605       6 cluster_manager.go:230] Reading config from path /etc/gce.conf
<source>
  @type tail
  format multiline
  multiline_flush_interval 5s
  format_firstline /^\w\d{4}/
  format1 /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/
  time_format %m%d %H:%M:%S.%N
  path /var/log/glbc.log
  pos_file /var/log/es-glbc.log.pos
  tag glbc
</source>

# Example:
# I0603 15:31:05.793605       6 cluster_manager.go:230] Reading config from path /etc/gce.conf
<source>
  @type tail
  format multiline
  multiline_flush_interval 5s
  format_firstline /^\w\d{4}/
  format1 /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/
  time_format %m%d %H:%M:%S.%N
  path /var/log/cluster-autoscaler.log
  pos_file /var/log/es-cluster-autoscaler.log.pos
  tag cluster-autoscaler
</source>

<match kubernetes.**>
  @type rewrite_tag_filter
  <rule>
    # if teelog pod, change tag to "teelog" for latter process
    key    $.kubernetes.pod_name
    pattern teelog.*
    tag teelog
  </rule>
  <rule>
    # if fluentd pod, change tag to "fluentd" for latter process
    key    $.kubernetes.pod_name
    pattern fluentd.*
    tag fluentd
  </rule>
  <rule>
    # for other pods, we need retag, or else it will be dropped
    key    $.kubernetes.pod_name
    pattern .+
    tag others
  </rule>
</match>

<match teelog>
  @type kafka_buffered

  # Brokers: you can choose either brokers or zookeeper.
  brokers     10.1.8.95:9092,10.1.15.25:9092,10.1.3.161:9092 # Set brokers directly
  # send client logs to special kafka topic
  default_topic   logs.client
  kafka_agg_max_bytes 819200
  max_send_limit_bytes 1000000

  <buffer>
    @type file
    path /var/log/fluentd/buffer-teelog
  </buffer>
</match>

# ignore fluentd generated logs
<match fluentd>
  @type null
</match>

<match **>
  @type kafka_buffered

  # Brokers: you can choose either brokers or zookeeper.
  brokers     10.1.8.95:9092,10.1.15.25:9092,10.1.3.161:9092 # Set brokers directly
  # send service logs to default kafka topic
  default_topic   logs.k8s
  kafka_agg_max_bytes 819200
  max_send_limit_bytes 1000000

  <buffer>
    @type file
    path /var/log/fluentd/buffer-k8s
  </buffer>
</match>

和问题直接相关的是

<match **>
  @type kafka_buffered

  # Brokers: you can choose either brokers or zookeeper.
  brokers     10.1.8.95:9092,10.1.15.25:9092,10.1.3.161:9092 # Set brokers directly
  # send service logs to default kafka topic
  default_topic   logs.k8s
  kafka_agg_max_bytes 819200
  max_send_limit_bytes 1000000

  <buffer>
    @type file
    path /var/log/fluentd/buffer-k8s
  </buffer>
</match>

其中看到 default_topic logs.k8s,查看文档,并没有找到什么特别说明的内容;


问题验证:

  • 创建 kafka 集群;
  • 创建两个 kafka consumer 分别用于消费两个测试用 topic :tp_default 和 tp_custom
  • 安装 fluentd 和 fluent-plugin-kafka 插件,并设置其 default_topic 为 tp_default ;
  • 向 fluentd 所采集的日志文件中写入包含和不包含 topic: "tp_custom" 的消息,并观察该消息最终被 kafka consumer 从哪个 topic 中消费得到

安装插件

td-agent-gem install fluent-plugin-kafka

创建对应的 topic

echo "custom" | kafkacat -P -b 47.98.126.155:32776,47.98.126.155:32777,47.98.126.155:32778 -t tp_custom
echo "default" | kafkacat -P -b 47.98.126.155:32776,47.98.126.155:32777,47.98.126.155:32778 -t tp_default

确认 topic 已经存在

root@proxy-hangzhou:~# kafkacat -L -b 47.98.126.155:32776,47.98.126.155:32777,47.98.126.155:32778
Metadata for all topics (from broker 1003: 47.98.126.155:32778/1003):
 3 brokers:
  broker 1001 at 47.98.126.155:32776
  broker 1003 at 47.98.126.155:32778
  broker 1002 at 47.98.126.155:32777
 5 topics:
  topic "tp_custom" with 1 partitions:
    partition 0, leader 1002, replicas: 1002, isrs: 1002
  topic "beats" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
  topic "tp_default" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
  topic "test_topic" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
  topic "test_a" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
root@proxy-hangzhou:~#

创建消费者

kafkacat -C -b 47.98.126.155:32776 -t tp_custom
kafkacat -C -b 47.98.126.155:32776 -t tp_default

调整 td-agent 的配置文件

root@proxy-hangzhou:/opt/apps# cat /etc/td-agent/td-agent.conf
<source>
  @type tail
  path /opt/apps/kafka_test.log
  pos_file /opt/apps/kafka_test.log.pos
  tag kafka.test
  <parse>
    @type json
  </parse>
</source>

<filter kafka.test>
  @type record_transformer
  <record>
    _hostname "#{Socket.gethostname}"
    _source ${tag}
  </record>
</filter>

<match kafka.test>
  @type kafka_buffered

  brokers 47.98.126.155:32776,47.98.126.155:32777,47.98.126.155:32778
  default_topic tp_default
  kafka_agg_max_messages 1
</match>
root@proxy-hangzhou:/opt/apps#

重启 td-agent 服务

root@proxy-hangzhou:/opt/apps# systemctl stop td-agent.service
root@proxy-hangzhou:/opt/apps# systemctl start td-agent.service
root@proxy-hangzhou:/opt/apps# systemctl status td-agent.service
● td-agent.service - td-agent: Fluentd based data collector for Treasure Data
   Loaded: loaded (/lib/systemd/system/td-agent.service; disabled; vendor preset: enabled)
   Active: active (running) since Thu 2019-04-04 16:20:49 CST; 2s ago
     Docs: https://docs.treasuredata.com/articles/td-agent
  Process: 31708 ExecStart=/opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent
 Main PID: 31722 (fluentd)
    Tasks: 9 (limit: 4915)
   CGroup: /system.slice/td-agent.service
           ├─31722 /opt/td-agent/embedded/bin/ruby /opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /v
           └─31730 /opt/td-agent/embedded/bin/ruby -Eascii-8bit:ascii-8bit /opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/

Apr 04 16:20:49 proxy-hangzhou systemd[1]: Starting td-agent: Fluentd based data collector for Treasure Data...
Apr 04 16:20:49 proxy-hangzhou systemd[1]: Started td-agent: Fluentd based data collector for Treasure Data.
root@proxy-hangzhou:/opt/apps#
root@proxy-hangzhou:/opt/apps#
root@proxy-hangzhou:/opt/apps# tail -f /var/log/td-agent/td-agent.log
2019-04-04 16:20:49 +0800 [info]: gem 'fluent-plugin-webhdfs' version '1.2.3'
2019-04-04 16:20:49 +0800 [info]: gem 'fluentd' version '1.3.3'
2019-04-04 16:20:49 +0800 [info]: adding filter pattern="kafka.test" type="record_transformer"
2019-04-04 16:20:49 +0800 [info]: adding match pattern="kafka.test" type="kafka_buffered"
2019-04-04 16:20:49 +0800 [info]: #0 brokers has been set directly: ["47.98.126.155:32776", "47.98.126.155:32777", "47.98.126.155:32778"]
2019-04-04 16:20:49 +0800 [info]: adding source type="tail"
2019-04-04 16:20:49 +0800 [info]: #0 starting fluentd worker pid=31730 ppid=31722 worker=0
2019-04-04 16:20:49 +0800 [info]: #0 initialized kafka producer: kafka
2019-04-04 16:20:49 +0800 [info]: #0 following tail of /opt/apps/kafka_test.log
2019-04-04 16:20:49 +0800 [info]: #0 fluentd worker is now running worker=0


向 fluentd 采集文件中写入日志

root@proxy-hangzhou:/opt/apps# echo '{"message":"This is a test!", "topic":"tp_custom"}' >> kafka_test.log
root@proxy-hangzhou:/opt/apps# echo '{"message":"This is a test!", "topic":"tp_custom"}' >> kafka_test.log
root@proxy-hangzhou:/opt/apps# echo '{"message":"This is a test!", "topic":"tp_custom"}' >> kafka_test.log
root@proxy-hangzhou:/opt/apps#
root@proxy-hangzhou:/opt/apps# echo '{"message":"This is a test!"}' >> kafka_test.log
root@proxy-hangzhou:/opt/apps# echo '{"message":"This is a test!"}' >> kafka_test.log
root@proxy-hangzhou:/opt/apps#

观察两个 consumer 的消费情况

root@proxy-hangzhou:~# kafkacat -C -b 47.98.126.155:32776 -t tp_custom
...
{"message":"This is a test!","topic":"tp_custom","_hostname":"proxy-hangzhou","_source":"kafka.test"}
{"message":"This is a test!","topic":"tp_custom","_hostname":"proxy-hangzhou","_source":"kafka.test"}
{"message":"This is a test!","topic":"tp_custom","_hostname":"proxy-hangzhou","_source":"kafka.test"}
{"message":"This is a test!","topic":"tp_custom","_hostname":"proxy-hangzhou","_source":"kafka.test"}
% Reached end of topic tp_custom [0] at offset 12
...
root@proxy-hangzhou:~# kafkacat -C -b 47.98.126.155:32776,47.98.126.155:32777,47.98.126.155:32778 -t tp_default
...
{"message":"This is a test!","_hostname":"proxy-hangzhou","_source":"kafka.test"}
{"message":"This is a test!","_hostname":"proxy-hangzhou","_source":"kafka.test"}
% Reached end of topic tp_default [0] at offset 8
...

到此,已经验证上述结论;如果 message 中没有 topic 字段,则使用 default_topic 中定义的值,否则使用 topic 字段的值(优先级更高)

最后看一下官方提供的配置说明

<match app.**>
  @type kafka_buffered

  # Brokers: you can choose either brokers or zookeeper. If you are not familiar with zookeeper, use brokers parameters.
  brokers             <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
  zookeeper           <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
  zookeeper_path      <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka

  topic_key             (string) :default => 'topic'   -- 这里可以看到,如果不配置 topic_key ,则默认使用 message 中的 topic 字段的值
  partition_key         (string) :default => 'partition'
  partition_key_key     (string) :default => 'partition_key'
  message_key_key       (string) :default => 'message_key'
  default_topic         (string) :default => nil   -- 这里可以看到,如果没有配置 default_topic ,则默认为空
  default_partition_key (string) :default => nil
  default_message_key   (string) :default => nil
  output_data_type      (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
  output_include_tag    (bool) :default => false
  output_include_time   (bool) :default => false
  exclude_topic_key     (bool) :default => false
  exclude_partition_key (bool) :default => false
  get_kafka_client_log  (bool) :default => false

  # See fluentd document for buffer related parameters: http://docs.fluentd.org/articles/buffer-plugin-overview

  # ruby-kafka producer options
  max_send_retries             (integer)     :default => 1
  required_acks                (integer)     :default => -1
  ack_timeout                  (integer)     :default => nil (Use default of ruby-kafka)
  compression_codec            (gzip|snappy) :default => nil (No compression)
  kafka_agg_max_bytes          (integer)     :default => 4096
  kafka_agg_max_messages       (integer)     :default => nil (No limit)
  max_send_limit_bytes         (integer)     :default => nil (No drop)
  discard_kafka_delivery_failed   (bool)     :default => false (No discard)
  monitoring_list              (array)       :default => []
</match>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant