-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhance kafka broker matching #3129
Conversation
return b | ||
} | ||
|
||
if b.Addr() == addr { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this one always going to match as one of the b.Addr() is passed as one of the addr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The address returned by b.Addr()
is the advertised hostname by current broker in metadata (b is not the broker we connect to, but a broker in meta.Brokers).
The advertised hostname must not match the hostname configured. e.g. kafka broker advertises hostname kafkaX.mydomain.com:9092
and addr could be localhost:9092
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We pass b.Addr()
of our broker we selected to findMatchingBroker
. So is the b.Addr()
from our broker the one we configured or the one fetched from the meta data? I assume it is the one we configured, as otherwise it would always match with at least on the brokers or otherwise the broker connection would not work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
b.Addr()
will be the address we configured (defaults to localhost:9092
). the meta.Brokers
will contains the addressed as advertised by kafka broker itself. Might be different.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should have an error message in case there is no broker with the same advertised address as configured?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
findMatchingBroker
is for trying to match one broker to the one we conncet to. The error if no broker is found is returned in connect
method.
@@ -69,7 +71,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { | |||
BaseMetricSet: base, | |||
broker: broker, | |||
cfg: cfg, | |||
id: noID, | |||
id: config.ID, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is the id from the configuration exactly going to be used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ID
sets the broker id. With broker id being configured, the broker address matching will not be executed. id
should only be configured if number of hosts being monitored is 1 (e.g. localhost:9092
only).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The advertised
config also requires number of hosts being just 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need both config options? Implementation wise for matching id is probably much easier. And the id of a broker should never change over the lifetime of the broker I assume?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm actually considering to remove id
and advertised
settings. As these settings require hosts
to be exactly one host.
Right, you can lookup the id. But I think it's much more convenient to not have to configure id
or advertised
. Problem with these options is, if they are wrongly configured (by accident), the kafka module will not work correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 in removing again id and advertised if this prevents hosts
to be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I'm not sure if we go too far to get the right broker matching, but now that we have it ...
} | ||
|
||
return b, nil | ||
// no broker if found | ||
closeBroker(b) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we turn that around, that we have if other == nil we return the error and close the broker and move the non error case tot he bottom? Would make it easier readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@urso LGTM. Could you squash, update the commit message and add a CHANGELOG entry? |
- compare broker names to hostname - try to lookup metricbeat host machine fqdn and compare to broker name - compare all ips of local machine with resolved broker name ips
ad74410
to
9bf4517
Compare
- compare broker names to hostname - try to lookup metricbeat host machine fqdn and compare to broker name - compare all ips of local machine with resolved broker name ips
* Rewrite elasticsearch connection URL (#3058) * Fix metricbeat service times-out at startup (#3056) * remove init collecting of processes * add changelog entry * Clarify that json.message_key is optional in Filebeat (#3055) I reordered the options based on importance (I put the optional config setting at the end). And I changed the wording to further clarify that the `json.message_key` setting is optional. Fixes #2864 * Document add_cloud_metadata processor (#3054) Fixes #2791 * Remove process.GetProcStatsEvents as not needed anymore (#3066) * Fix testing for 2x releases (#3057) * Update docker files to the last major with the most recent minor and bugfix version * Renamed files to Dockerfile-2x to not have to be renamed every time a new bugfix is released * Remove scripts and config files which are not needed anymore To run testsuite for 2x releases, run: `TESTING_ENVIRONMENT=2x make testsuite` * Remove old release notes files from packetbeat docs (#3067) * Update go-ucfg (#3045) - Update go-ucfg - add support for parsing lists/dictionaries from environment variables and via `-E` flag * Parse elasticsearch URL before logging it (#3075) * Fix the total CPU time in the Docker dashboard (#3085) (#3086) Part of #2629. The name of the field was changed, but not in the dashboard. (cherry picked from commit e271d9f) * Switch partition metricset from client to broker (#3029) Update kafka broker query - Switch paritition metricset from client to broker - on connect try to find the broker id (address must match advertised host). - check broker is leader before querying offsets - query offsets for all replicas - remove 'isr' from event, and replace with boolean flag `insync_replica` - replace `replicas` from event with per event `replica`-id - update sarama to get offset per replica id * Make error fields optional in partition event (#3089) * Update data.json * Make it clear in the docs that publish_async is still experimental (#3096) Remove example for publish_async from the docs * Remove metadata prefix from config as not needed (#3095) * Remove left over string in template test (#3102) * Fix typo in Dockerfile comment (#3105) * Document batch_read_size is experimental in Winlogbeat * Add benchmark test for batch_read_size in Winlogbeat (#3107) * Fix ES 2.x integration test (#3115) There was a test that was loading a mock template, and this template was assuming 5.x. * Pass `--always-copy` to virtualenv (#3082) virtualenv creates symlinks so `make setup` fails when ran on a network mounted fs. `--always-copy` copies files to the destination dir rather than symlinking. * Add project prefix for composer environment (#3116) This prefix is need to run tests with different environments in parallel so one does not affect the other. Like this 2x and snapshot builds should be able to coexist * Reduce allocations in UTF16 conversion (#3113) When decoding a UTF16 string contained in a buffer larger than just the string, more space was allocated than required. ``` BenchmarkUTF16BytesToString/simple_string-4 2000000 846 ns/op 384 B/op 3 allocs/op BenchmarkUTF16BytesToString/larger_buffer-4 2000000 874 ns/op 384 B/op 3 allocs/op BenchmarkUTF16BytesToString_Original/simple_string-4 2000000 840 ns/op 384 B/op 3 allocs/op BenchmarkUTF16BytesToString_Original/larger_buffer-4 1000000 3055 ns/op 8720 B/op 3 allocs/op ``` ``` PS C:\Gopath\src\github.com\elastic\beats\winlogbeat> go test -v github.com/elastic/beats/winlogbeat/eventlog -run ^TestBenchmarkBatchReadSize$ -benchmem -benchtime 10s -benchtest === RUN TestBenchmarkBatchReadSize --- PASS: TestBenchmarkBatchReadSize (68.04s) bench_test.go:100: batch_size=10, total_events=20000, batch_time=5.682627ms, events_per_sec=1759.7494961397256, bytes_alloced_per_event=44 kB, total_allocs=4923840 bench_test.go:100: batch_size=100, total_events=30000, batch_time=53.850879ms, events_per_sec=1856.9799018508127, bytes_alloced_per_event=44 kB, total_allocs=7354285 bench_test.go:100: batch_size=500, total_events=25000, batch_time=271.118774ms, events_per_sec=1844.2101689350366, bytes_alloced_per_event=43 kB, total_allocs=6125665 bench_test.go:100: batch_size=1000, total_events=30000, batch_time=558.03918ms, events_per_sec=1791.9888707455987, bytes_alloced_per_event=43 kB, total_allocs=7350324 PASS ok github.com/elastic/beats/winlogbeat/eventlog 68.095s PS C:\Gopath\src\github.com\elastic\beats\winlogbeat> go test -v github.com/elastic/beats/winlogbeat/eventlog -run ^TestBenchmarkBatchReadSize$ -benchmem -benchtime 10s -benchtest === RUN TestBenchmarkBatchReadSize --- PASS: TestBenchmarkBatchReadSize (71.85s) bench_test.go:100: batch_size=10, total_events=30000, batch_time=5.713873ms, events_per_sec=1750.1264028794478, bytes_alloced_per_event=25 kB, total_allocs=7385820 bench_test.go:100: batch_size=100, total_events=30000, batch_time=52.454484ms, events_per_sec=1906.4147118480853, bytes_alloced_per_event=24 kB, total_allocs=7354318 bench_test.go:100: batch_size=500, total_events=25000, batch_time=260.56659ms, events_per_sec=1918.8952812407758, bytes_alloced_per_event=24 kB, total_allocs=6125688 bench_test.go:100: batch_size=1000, total_events=30000, batch_time=530.468816ms, events_per_sec=1885.124949550286, bytes_alloced_per_event=24 kB, total_allocs=7350360 PASS ok github.com/elastic/beats/winlogbeat/eventlog 71.908s ``` * Fix for errno 1734 when calling EvtNext (#3112) When reading a batch of large event log records the Windows function EvtNext returns errno 1734 (0x6C6) which is RPC_S_INVALID_BOUND ("The array bounds are invalid."). This seems to be a bug in Windows because there is no documentation about this behavior. This fix handles the error by resetting the event log subscription handle (so events are not lost) and then retries the EvtNext call with maxHandles/2. Fixes #3076 * Fetch container stats in parallel (#3127) Currently fetching container stats is very slow as each request takes up to 2 seconds. To improve the fetching time if lots of containers are around, this creates the rrequests in parallel. The main downside is that this opens lots of connections. This fix should only temporary until the bulk api is available: moby/moby#25361 * Fix heartbeat not accepting `mode` parameter (#3128) * Remove fixed container names as not needed (#3122) Add beat name to project namespace * This makes sure different beats environment do not affect each other for example when Kafka is used * It also allows to run the testsuites of all the beats in parallel Introduce `stop-environment` command to stop all containers * Add doc for decode_json_fields processor (#3110) * Add doc for decode_json_fields processor * Use changed param names * Add example of decode_json_fields processor * Fix intro language about processors * Adding AmazonBeat to community beats (#3125) I created a basic version of amazonbeat, which reads data from an amazon product periodically. This beat does not yet publish to elasticsearch. * Reuse a byte buffer for holding XML (#3118) Previously the data was read into a []byte encoded as UTF16. Then that data was converted to []uint16 so that we can use utf16.Decode(). Then the []rune slice was converted to a string which did another data copy. The XML was unmarshalled from the string. This PR changes the code to convert the UTF16 []byte directly to UTF8 and puts the result into a reusable bytes.Buffer. The XML is then unmarshalled directly from the data in buffer. ``` BenchmarkUTF16ToUTF8-4 2000000 1044 ns/op 4 B/op 1 allocs/op ``` ``` git checkout 6ba7700 PS > go test github.com/elastic/beats/winlogbeat/eventlog -run TestBenc -benchtest -benchtime 10s -v === RUN TestBenchmarkBatchReadSize --- PASS: TestBenchmarkBatchReadSize (67.89s) bench_test.go:100: batch_size=10, total_events=30000, batch_time=5.119626ms, events_per_sec=1953.2676801000696, bytes_alloced_per_event=44 kB, total_allocs=7385952 bench_test.go:100: batch_size=100, total_events=30000, batch_time=51.366271ms, events_per_sec=1946.802795943665, bytes_alloced_per_event=44 kB, total_allocs=7354448 bench_test.go:100: batch_size=500, total_events=25000, batch_time=250.974356ms, events_per_sec=1992.2354138842775, bytes_alloced_per_event=43 kB, total_allocs=6125812 bench_test.go:100: batch_size=1000, total_events=30000, batch_time=514.796113ms, events_per_sec=1942.5166094834128, bytes_alloced_per_event=43 kB, total_allocs=7350550 PASS ok github.com/elastic/beats/winlogbeat/eventlog 67.950s git checkout 833a806 (#3113) PS > go test github.com/elastic/beats/winlogbeat/eventlog -run TestBenc -benchtest -benchtime 10s -v === RUN TestBenchmarkBatchReadSize --- PASS: TestBenchmarkBatchReadSize (65.69s) bench_test.go:100: batch_size=10, total_events=30000, batch_time=4.858277ms, events_per_sec=2058.3429063431336, bytes_alloced_per_event=25 kB, total_allocs=7385847 bench_test.go:100: batch_size=100, total_events=30000, batch_time=51.612952ms, events_per_sec=1937.49816906423, bytes_alloced_per_event=24 kB, total_allocs=7354362 bench_test.go:100: batch_size=500, total_events=25000, batch_time=241.713826ms, events_per_sec=2068.561853801445, bytes_alloced_per_event=24 kB, total_allocs=6125757 bench_test.go:100: batch_size=1000, total_events=30000, batch_time=494.961643ms, events_per_sec=2020.3585755431961, bytes_alloced_per_event=24 kB, total_allocs=7350474 PASS ok github.com/elastic/beats/winlogbeat/eventlog 65.747s This PR (#3118) PS > go test github.com/elastic/beats/winlogbeat/eventlog -run TestBenc -benchtest -benchtime 10s -v === RUN TestBenchmarkBatchReadSize --- PASS: TestBenchmarkBatchReadSize (65.80s) bench_test.go:100: batch_size=10, total_events=30000, batch_time=4.925281ms, events_per_sec=2030.341009985014, bytes_alloced_per_event=14 kB, total_allocs=7295817 bench_test.go:100: batch_size=100, total_events=30000, batch_time=48.976134ms, events_per_sec=2041.8108134055658, bytes_alloced_per_event=14 kB, total_allocs=7264329 bench_test.go:100: batch_size=500, total_events=25000, batch_time=250.314316ms, events_per_sec=1997.4886294557757, bytes_alloced_per_event=14 kB, total_allocs=6050719 bench_test.go:100: batch_size=1000, total_events=30000, batch_time=499.861923ms, events_per_sec=2000.5524605641945, bytes_alloced_per_event=14 kB, total_allocs=7260400 PASS ok github.com/elastic/beats/winlogbeat/eventlog 65.856s ``` * Fix make package for community beats (#3094) gopkg.in needs to be copied from the vendor directory of libbeat in the vendor directory * Auto generate modules list (#3131) This is to ensure no modules are forgotten in the future * Remove duplicated enabled entry from redis config (#3132) * Remove --always-copy from virtualenv and make it a param (#3136) In #3082 `--always-copy` was introduced. This caused issue on build on some operating systems. This PR reverts the change but makes `VIRTUALENV_PARAMS` a variable which can be passed to the Makefile. This allows anyone to set `--always-copy` if needed. * Adjust script to generate fields of type geo_point (#3147) * Fix for broken dashboard dependency in Cassandra Dashboard (#3146) The Cassandra Dashboard was linking to the wrong Cassandra visualisation. Some left over with : in the names were still inside Closes #3140 * Fix quotes (#3142) * Fix a print statement to be python 3 compliant (#3144) * Remove -prerelease from the repo names (#3153) * Add mongobeat to list of community beats (#3156) Mongobeat discovers instances in a mongo cluster and can be configured to ship multiple document types - from the commands db.stats() and db.serverStatus() * Update to most recent latest builds (#3161) * Merge snapshot and latest build for Logstash into 1 docker file * Pass certificate options to import dashboards script (#3139) * Pass certificate options to import dashboards script -cert for client certificate -key for client certificate key -cacert for certificate authority * Add -insecure flag to import_dashboards (#3163) * Improve speed and stability of CI builds (#3162) Loading and creating docker images takes quite a bit of time on the travis builds. Especially calls like apt-get update and install take lots of time and bandwidth and fail from time to time, as a host is not available. Following actions were taken: * Fake Kibana container is now based on alpine * Redis stunnel container was also switched to alpine * Add enabled config for prospectors (#3157) The enabled config allows easily to enable and disable a specific prospector. This is consistent with metricbeat where each modules has an enabled config. By default enabled is set to true. * Prototype Filebeat modules implementation (#3158) Contains the Nginx module, including the fields.yml and several pipelines. * Add edits for docker module docs (#3176) * Restructure and edit processors content (#3160) * Cleaned up Changelog in master (#3181) Added the 5.1.0 and 5.1.1 sections, removed duplicates. * metricbeat: enhance kafka broker matching (#3129) - compare broker names to hostname - try to lookup metricbeat host machine fqdn and compare to broker name - compare all ips of local machine with resolved broker name ips * Filebeat MySQL module (#3171) * Contains slowlog and errors filesets * Test files for two mysql versions (5.5 and 5.7) * Add support for built-in variables (e.g. `builtin.hostname`) * Contains a sample Kibana dashboard Part of #3159. * Fix #3167 change ownership of files in build/ (#3168) Add a new Makefile rule: fix-permissions fix-permissions runs a docker container that changes the ownership of all files from root to the user that runs the Makefile * Updating documentation to add udplogbeat (#3190) * Packer customize package info (#3188) * packer: Enable overriding of vendor and license * packer: customize URL of documentation link * packer: location of readme.md.j2 folder can be specified with PACKER_TEMPLATES_DIR * Filebeat syslog module (#3191) * Basic parsing of syslog fields * Supports multiline messages if the lines after the first one start with a space. * Contains a simple Kibana dashboard * Deprecate filters option in metrictbeat (#3173) * Add support for multiple paths per fileset (#3195) We generally need more than one path per OS, because the logs location is not always the same. For example, depending on the linux distribution and how you installed it, MySQL can have it's error logs in a number of default "paths". The solution is to configure them all, which means that Filebeat might try to access unexisting folders. This also improves the python prototype to accept multiple modules and to accept namespaced parameters. E.g.: ./filebeat.py --modules=nginx,syslog -M nginx.access.paths=... * case insensitive hostname comparison in kafka broker matching (#3193) - re-use common.LocalIPAddrs in partition module for resolving IPs - add missing net.IPAddr type switch to common.LocalIPAddrs - update matching to extract addresses early on using strings.ToLower => ensure case insensitive matching by lowercasing * Adds a couchbase module for metricbeat (#3081) * Export cpu cores (#3192) * Fix: Request headers with split_cookies enabled (#3065) * Add 3140 to changelog (#3207) (#3208) (cherry picked from commit 0f4103f)
Enhance bootstrap broker matching:
advertised
address for matching