Consumer using assign() does not re-query coordinator after NOT_COORD_FOR_GROUP #2791

Closed · 7 tasks done
mlongob opened this issue Mar 31, 2020 · 5 comments

mlongob commented Mar 31, 2020

Description

Consumers using the assign() API instead of the subscribe() API can get into a state where they keep unsuccessfully trying to commit offsets to a broker that is no longer the group coordinator. They stay in this state until they're restarted.

This might be related to #2630

How to reproduce

librdkafka client code excerpts:

    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

[...]

    conf->set("metadata.broker.list", brokerList, errstr);
    conf->set("client.id", "my-consumer", errstr);
    conf->set("enable.auto.offset.store", "false", errstr);
    conf->set("log.connection.close", "false", errstr);
    conf->set("enable.partition.eof", "false", errstr);
    conf->set("group.id", "mygroup", errstr);
    conf->set("auto.offset.reset", "largest", errstr);
    conf->set("debug", "consumer,cgrp,topic,fetch,broker", errstr);

    RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    RdKafka::Topic* topic = RdKafka::Topic::create(consumer, "mytopic", NULL, errstr);
    RdKafka::TopicPartition* partition = RdKafka::TopicPartition::create("mytopic", 0);
    std::vector<RdKafka::TopicPartition*> partitions;
    partitions.push_back(partition);
    consumer->assign(partitions);

[...]

    RdKafka::Message* message = consumer->consume(1000);
    message->topic()->offset_store(message->partition(), message->offset());
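
Not part of the original excerpt, but to make the failing commits visible in application code (rather than only in the debug log), an offset commit callback can be registered on the same conf object before creating the consumer. This is a minimal sketch; the CommitLogger name is illustrative:

    #include <iostream>
    #include <librdkafka/rdkafkacpp.h>

    // Logs every commit result; highlights coordinator-related failures.
    class CommitLogger : public RdKafka::OffsetCommitCb {
     public:
      void offset_commit_cb(RdKafka::ErrorCode err,
                            std::vector<RdKafka::TopicPartition*>& offsets) override {
        if (err == RdKafka::ERR_NOT_COORDINATOR_FOR_GROUP ||
            err == RdKafka::ERR__WAIT_COORD)
          std::cerr << "Commit failed (coordinator issue): "
                    << RdKafka::err2str(err) << std::endl;
        else if (err != RdKafka::ERR_NO_ERROR)
          std::cerr << "Commit failed: " << RdKafka::err2str(err) << std::endl;
      }
    };

    // Before RdKafka::KafkaConsumer::create():
    //   CommitLogger commit_logger;
    //   conf->set("offset_commit_cb", &commit_logger, errstr);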

Sequence of events to reproduce issue:

  1. Bring consumer online while all brokers are up
  2. Keep a constant stream of messages going into the partition we're assigning (1 msg / 10 seconds)
  3. Consumer successfully commits offset to group coordinator broker A
  4. Bring down broker A
  5. Consumer successfully switches to coordinator broker B and commits successfully
  6. Bring broker A back up
  7. Broker B returns NOT_COORD_FOR_GROUP and consumer keeps failing to commit (broker A is the coordinator for the group again, but librdkafka doesn't seem to detect it)

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): 1.3.0
  • Apache Kafka version: 2.2.0
  • librdkafka client configuration
  • Operating system: Red Hat Enterprise Linux Server release 7.6 (Maipo)
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue: consumers will not commit until they're restarted

librdkafka client configuration dump:

builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer
client.id=my-consumer
metadata.broker.list=mybroker:9092
message.max.bytes=1000000
message.copy.max.bytes=65535
receive.message.max.bytes=100000000
max.in.flight.requests.per.connection=1000000
metadata.request.timeout.ms=60000
topic.metadata.refresh.interval.ms=300000
metadata.max.age.ms=900000
topic.metadata.refresh.fast.interval.ms=250
topic.metadata.refresh.fast.cnt=10
topic.metadata.refresh.sparse=true
debug=broker,topic,cgrp,fetch,consumer
socket.timeout.ms=60000
socket.blocking.max.ms=1000
socket.send.buffer.bytes=0
socket.receive.buffer.bytes=0
socket.keepalive.enable=false
socket.nagle.disable=false
socket.max.fails=1
broker.address.ttl=1000
broker.address.family=any
enable.sparse.connections=true
reconnect.backoff.jitter.ms=0
reconnect.backoff.ms=100
reconnect.backoff.max.ms=10000
statistics.interval.ms=0
enabled_events=0
log_cb=0x472690
log_level=6
log.queue=false
log.thread.name=true
log.connection.close=false
socket_cb=0x484620
open_cb=0x4a0cc0
default_topic_conf=0x15ac4a0
internal.termination.signal=0
api.version.request=true
api.version.request.timeout.ms=10000
api.version.fallback.ms=0
broker.version.fallback=0.10.0
security.protocol=plaintext
enable.ssl.certificate.verification=true
ssl.endpoint.identification.algorithm=none
sasl.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.principal=kafkaclient
sasl.kerberos.kinit.cmd=kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} || kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal}
sasl.kerberos.min.time.before.relogin=60000
enable.sasl.oauthbearer.unsecure.jwt=false
test.mock.num.brokers=0
group.id=mygroup
partition.assignment.strategy=range,roundrobin
session.timeout.ms=10000
heartbeat.interval.ms=3000
group.protocol.type=consumer
coordinator.query.interval.ms=600000
max.poll.interval.ms=300000
enable.auto.commit=true
auto.commit.interval.ms=5000
enable.auto.offset.store=false
queued.min.messages=100000
queued.max.messages.kbytes=1048576
fetch.wait.max.ms=100
fetch.message.max.bytes=1048576
fetch.max.bytes=52428800
fetch.min.bytes=1
fetch.error.backoff.ms=500
offset.store.method=broker
isolation.level=read_committed
enable.partition.eof=false
check.crcs=false
client.rack=
enable.idempotence=false
enable.gapless.guarantee=false
queue.buffering.max.messages=100000
queue.buffering.max.kbytes=1048576
queue.buffering.max.ms=0.5
message.send.max.retries=2
retry.backoff.ms=100
queue.buffering.backpressure.threshold=1
compression.codec=none
batch.num.messages=10000
delivery.report.only.error=false

Full librdkafka debug logs: https://gist.github.com/mlongob/4ecfe850cbdacc24b53e789be016e371

librdkafka debug logs excerpts:

%7|1585687171.363|MEMBERID|my-consumer#consumer-1| [thrd:app]: Group "mygroup": updating member id "(not-set)" -> ""
%7|1585687171.363|BROKER|my-consumer#consumer-1| [thrd:app]: GroupCoordinator: Added new broker with NodeId -1
%7|1585687171.363|BRKMAIN|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
%7|1585687171.364|CGRPOP|my-consumer#consumer-1| [thrd:main]: Group "mygroup" received op ASSIGN (v0) in state init (join state init, v1 vs 0)
%7|1585687171.364|ASSIGN|my-consumer#consumer-1| [thrd:main]: Group "mygroup": new assignment of 1 partition(s) in join state init
%7|1585687171.364|BARRIER|my-consumer#consumer-1| [thrd:main]: Group "mygroup": rd_kafka_cgrp_assign:2432: new version barrier v2
%7|1585687171.364|ASSIGN|my-consumer#consumer-1| [thrd:main]: Group "mygroup": assigning 1 partition(s) in join state init
%7|1585687171.364|CGRPJOINSTATE|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changed join state init -> assigned (v2, state init)
%7|1585687171.364|BARRIER|my-consumer#consumer-1| [thrd:main]: Group "mygroup": rd_kafka_cgrp_partitions_fetch_start0:1736: new version barrier v3
%7|1585687171.364|FETCHSTART|my-consumer#consumer-1| [thrd:main]: Group "mygroup": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2477)
%7|1585687171.364|CGRPSTATE|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changed state init -> query-coord (v3, join-state assigned)
%7|1585687171.364|CGRPQUERY|my-consumer#consumer-1| [thrd:main]: Group "mygroup": no broker available for coordinator query: intervaled in state query-coord
%7|1585687171.411|FEATURE|my-consumer#consumer-1| [thrd:mybroker:9092/boots]: mybroker:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,UnitTest
%7|1585687171.412|FEATURE|my-consumer#consumer-1| [thrd:mybroker-577:9092/27416]: mybroker-577:9092/27416: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,UnitTest
%7|1585687172.363|CGRPQUERY|my-consumer#consumer-1| [thrd:main]: mybroker:9092/bootstrap: Group "mygroup": querying for coordinator: intervaled in state query-coord
%7|1585687172.363|CGRPSTATE|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changed state query-coord -> wait-coord (v3, join-state assigned)
%7|1585687172.364|CGRPCOORD|my-consumer#consumer-1| [thrd:main]: mybroker:9092/bootstrap: Group "mygroup" coordinator is mybroker-231:9092 id 20245
%7|1585687172.364|CGRPCOORD|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changing coordinator -1 -> 20245
%7|1585687172.364|COORDSET|my-consumer#consumer-1| [thrd:main]: Group "mygroup" coordinator set to broker mybroker-231:9092/20245
%7|1585687172.364|CGRPSTATE|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changed state wait-coord -> wait-broker-transport (v3, join-state assigned)
%7|1585687172.364|NODENAME|my-consumer#consumer-1| [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "mybroker-231:9092"
%7|1585687172.364|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Received CONNECT op
%7|1585687172.364|CGRPQUERY|my-consumer#consumer-1| [thrd:main]: mybroker:9092/bootstrap: Group "mygroup": querying for coordinator: intervaled in state wait-broker-transport
%7|1585687172.364|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1585687172.364|BROKERFAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker node update: (errno: Success)
%7|1585687172.364|FAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 0ms in state TRY_CONNECT)
%7|1585687172.364|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> DOWN
%7|1585687172.364|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1585687172.364|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1585687172.364|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/20245: broker in state TRY_CONNECT connecting
%7|1585687172.364|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1585687172.364|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/20245: Connecting to ipv4#10.34.192.190:9092 (plaintext) with socket 24
%7|1585687172.364|CGRPCOORD|my-consumer#consumer-1| [thrd:main]: mybroker:9092/bootstrap: Group "mygroup" coordinator is mybroker-231:9092 id 20245
%7|1585687172.366|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/20245: Connected to ipv4#10.34.192.190:9092
%7|1585687172.366|CONNECTED|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/20245: Connected (#1)
%7|1585687172.366|FEATURE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/20245: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1585687172.366|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1585687172.368|FEATURE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/20245: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,UnitTest
%7|1585687172.368|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state APIVERSION_QUERY -> UP
%7|1585687173.363|CGRPSTATE|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changed state wait-broker-transport -> up (v3, join-state assigned)
%7|1585687173.363|BARRIER|my-consumer#consumer-1| [thrd:main]: Group "mygroup": rd_kafka_cgrp_partitions_fetch_start0:1736: new version barrier v4
%7|1585687173.363|FETCHSTART|my-consumer#consumer-1| [thrd:main]: Group "mygroup": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=no, v4, line 3269)
%7|1585687173.363|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: OffsetFetchRequest(v1) for 1/1 partition(s)
%7|1585687173.363|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: Fetch committed offsets for 1/1 partition(s)
%7|1585687173.365|OFFSETFETCH|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: OffsetFetchResponse: mytopic [0] offset -1, metadata 0 byte(s)
%7|1585687173.365|OFFFETCH|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: OffsetFetch for 1/1 partition(s) returned Success
%7|1585687173.365|BARRIER|my-consumer#consumer-1| [thrd:main]: Group "mygroup": rd_kafka_cgrp_partitions_fetch_start0:1736: new version barrier v5
%7|1585687173.365|FETCHSTART|my-consumer#consumer-1| [thrd:main]: Group "mygroup": starting fetchers for 1 assigned partition(s) in join-state assigned (usable_offsets=yes, v5, line 1668)
%7|1585687173.365|CGRPJOINSTATE|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changed join state assigned -> started (v5, state up)
%7|1585687173.365|CGRPOP|my-consumer#consumer-1| [thrd:main]: Group "mygroup" received op PARTITION_JOIN in state up (join state started, v5) for mytopic [0]
%7|1585687173.365|PARTADD|my-consumer#consumer-1| [thrd:main]: Group "mygroup": add mytopic [0]
%7|1585687211.364|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687211.364|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer
%7|1585687211.380|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Success
%7|1585687216.364|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687216.364|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer
%7|1585687216.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Success

[...]

%7|1585687751.367|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687751.367|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer
%7|1585687751.372|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Success
%7|1585687756.367|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687756.367|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer
%7|1585687756.372|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Success
%7|1585687758.716|BROKERFAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker transport failure: (errno: Connection reset by peer)
%7|1585687758.716|FAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Disconnected (after 586348ms in state UP)
%7|1585687758.716|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state UP -> DOWN
%7|1585687758.716|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1585687758.716|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1585687758.716|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/20245: broker in state TRY_CONNECT connecting
%7|1585687758.716|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1585687758.735|CGRPSTATE|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changed state up -> query-coord (v5, join-state started)
%7|1585687758.735|CGRPQUERY|my-consumer#consumer-1| [thrd:main]: mybroker-577:9092/27416: Group "mygroup": querying for coordinator: intervaled in state query-coord
%7|1585687758.735|CGRPSTATE|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changed state query-coord -> wait-coord (v5, join-state started)
%7|1585687758.815|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/20245: Connecting to ipv4#10.34.192.190:9092 (plaintext) with socket 24
%7|1585687758.817|BROKERFAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker transport failure: (errno: Connection refused)
%3|1585687758.817|FAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.34.192.190:9092 failed: Connection refused (after 100ms in state CONNECT)
%3|1585687758.817|ERROR|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.34.192.190:9092 failed: Connection refused (after 100ms in state CONNECT)
%7|1585687758.817|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> DOWN
%7|1585687758.817|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1585687758.817|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1585687758.817|RECONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/20245: Delaying next reconnect by 46ms
%7|1585687758.840|CGRPCOORD|my-consumer#consumer-1| [thrd:main]: mybroker-577:9092/27416: Group "mygroup" coordinator is mybroker-577:9092 id 27416
%7|1585687758.840|CGRPCOORD|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changing coordinator 20245 -> 27416
%7|1585687758.840|COORDCLEAR|my-consumer#consumer-1| [thrd:main]: Group "mygroup" broker mybroker-231:9092/20245 is no longer coordinator
%7|1585687758.840|NODENAME|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/20245: Broker nodename changed from "mybroker-231:9092" to ""
%7|1585687758.840|COORDSET|my-consumer#consumer-1| [thrd:main]: Group "mygroup" coordinator set to broker mybroker-577:9092/27416
%7|1585687758.840|CGRPSTATE|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changed state wait-coord -> wait-broker-transport (v5, join-state started)
%7|1585687758.840|NODENAME|my-consumer#consumer-1| [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "mybroker-577:9092"
%7|1585687758.840|BROKERFAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker node update: (errno: Connection refused)
%7|1585687758.840|CGRPQUERY|my-consumer#consumer-1| [thrd:main]: mybroker-577:9092/27416: Group "mygroup": querying for coordinator: intervaled in state wait-broker-transport
%7|1585687758.840|FAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 23ms in state TRY_CONNECT)
%7|1585687758.840|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> DOWN
%7|1585687758.840|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1585687758.840|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1585687758.840|RECONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: Delaying next reconnect by 22ms
%7|1585687758.862|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: broker in state TRY_CONNECT connecting
%7|1585687758.862|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1585687758.862|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: Connecting to ipv4#10.34.192.190:9092 (plaintext) with socket 24
%7|1585687758.864|BROKERFAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1585687758.864|FAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.34.192.190:9092 failed: Connection refused (after 1ms in state CONNECT)
%7|1585687758.864|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> DOWN
%7|1585687758.864|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1585687758.864|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1585687758.864|RECONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: Delaying next reconnect by 180ms
%7|1585687758.940|CGRPCOORD|my-consumer#consumer-1| [thrd:main]: mybroker-577:9092/27416: Group "mygroup" coordinator is mybroker-577:9092 id 27416
%7|1585687759.044|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: broker in state TRY_CONNECT connecting
%7|1585687759.044|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1585687759.044|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: Connecting to ipv4#10.34.192.190:9092 (plaintext) with socket 24
%7|1585687759.046|BROKERFAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1585687759.046|FAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.34.192.190:9092 failed: Connection refused (after 1ms in state CONNECT)
%7|1585687759.046|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> DOWN
%7|1585687759.046|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1585687759.046|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1585687759.046|RECONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: Delaying next reconnect by 492ms
%7|1585687759.539|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: broker in state TRY_CONNECT connecting
%7|1585687759.539|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1585687759.539|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: Connecting to ipv4#10.34.192.190:9092 (plaintext) with socket 24
%7|1585687759.540|BROKERFAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1585687759.540|FAIL|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Connect to ipv4#10.34.192.190:9092 failed: Connection refused (after 1ms in state CONNECT)
%7|1585687759.540|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> DOWN
%7|1585687759.540|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1585687759.540|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1585687759.540|RECONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: Delaying next reconnect by 695ms
%7|1585687759.901|CGRPQUERY|my-consumer#consumer-1| [thrd:main]: mybroker-577:9092/27416: Group "mygroup": querying for coordinator: intervaled in state wait-broker-transport
%7|1585687759.943|CGRPCOORD|my-consumer#consumer-1| [thrd:main]: mybroker-577:9092/27416: Group "mygroup" coordinator is mybroker-577:9092 id 27416
%7|1585687760.236|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: broker in state TRY_CONNECT connecting
%7|1585687760.236|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1585687760.282|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: Connecting to ipv4#10.122.76.209:9092 (plaintext) with socket 24
%7|1585687760.282|CONNECT|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: Connected to ipv4#10.122.76.209:9092
%7|1585687760.282|CONNECTED|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/27416: Connected (#2)
%7|1585687760.282|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1585687760.283|STATE|my-consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state APIVERSION_QUERY -> UP
%7|1585687760.369|CGRPSTATE|my-consumer#consumer-1| [thrd:main]: Group "mygroup" changed state wait-broker-transport -> up (v5, join-state started)
%7|1585687761.367|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687761.367|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer
%7|1585687761.378|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Success
%7|1585687766.367|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687766.367|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer

[...]

%7|1585687816.371|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Success
%7|1585687821.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687821.368|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer
%7|1585687821.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Broker: Not coordinator
%7|1585687821.368|COMMIT|my-consumer#consumer-1| [thrd:main]: Group "mygroup": unable to OffsetCommit in state up: Broker: Not coordinator: coordinator (mybroker-577:9092/27416) is unavailable: retrying later
%7|1585687821.368|CGRPOP|my-consumer#consumer-1| [thrd:main]: Group "mygroup" received op OFFSET_COMMIT (v0) in state up (join state started, v5 vs 0)
%7|1585687821.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687821.368|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer
%7|1585687821.369|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Broker: Not coordinator
%7|1585687826.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687826.368|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer
%7|1585687826.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Broker: Not coordinator
%7|1585687826.368|COMMIT|my-consumer#consumer-1| [thrd:main]: Group "mygroup": unable to OffsetCommit in state up: Broker: Not coordinator: coordinator (mybroker-577:9092/27416) is unavailable: retrying later
%7|1585687826.368|CGRPOP|my-consumer#consumer-1| [thrd:main]: Group "mygroup" received op OFFSET_COMMIT (v0) in state up (join state started, v5 vs 0)
%7|1585687826.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687826.368|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer
%7|1585687826.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Broker: Not coordinator
%7|1585687831.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687831.368|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer
%7|1585687831.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Broker: Not coordinator
%7|1585687831.368|COMMIT|my-consumer#consumer-1| [thrd:main]: Group "mygroup": unable to OffsetCommit in state up: Broker: Not coordinator: coordinator (mybroker-577:9092/27416) is unavailable: retrying later
%7|1585687831.368|CGRPOP|my-consumer#consumer-1| [thrd:main]: Group "mygroup" received op OFFSET_COMMIT (v0) in state up (join state started, v5 vs 0)
%7|1585687831.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Committing offsets for 1 partition(s): cgrp auto commit timer
%7|1585687831.368|OFFSET|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: Enqueue OffsetCommitRequest(v6, 1/1 partition(s))): cgrp auto commit timer
%7|1585687831.368|COMMIT|my-consumer#consumer-1| [thrd:main]: GroupCoordinator/27416: OffsetCommit for 1 partition(s): cgrp auto commit timer: returned: Broker: Not coordinator

[...]

Broker logs excerpts:

[2020-03-31 16:49:18,614] INFO [GroupCoordinator 20245]: Unloading group metadata for mygroup with generation 0 (kafka.coordinator.group.GroupCoordinator)
[2020-03-31 16:49:18,617] INFO [GroupCoordinator 27416]: Loading group metadata for mygroup with generation 0 (kafka.coordinator.group.GroupCoordinator)
[2020-03-31 16:50:19,818] INFO [GroupCoordinator 27416]: Unloading group metadata for mygroup with generation 0 (kafka.coordinator.group.GroupCoordinator)
[2020-03-31 16:50:19,880] INFO [GroupCoordinator 20245]: Loading group metadata for mygroup with generation 0 (kafka.coordinator.group.GroupCoordinator)

Series of events:

  1. 4:39:31 PM - Consumer is started / broker.id 20245 is the group coordinator for mygroup
  2. 4:49:18 PM - broker.id 20245 is stopped. broker.id 27416 is now coordinator for mygroup
  3. 4:50:19 PM - broker.id 27416 is restarted. broker.id 27416 is now coordinator for mygroup

The consumer is able to recover from event (2) but not from event (3).

edenhill added a commit that referenced this issue Apr 3, 2020
For instance NOT_COORD_FOR_GROUP would not trigger a coordinator query.
edenhill added this to the v1.5.0 milestone Apr 6, 2020
edenhill added a commit that referenced this issue Apr 20, 2020
For instance NOT_COORD_FOR_GROUP would not trigger a coordinator query.
edenhill added a commit that referenced this issue Apr 23, 2020
For instance NOT_COORD_FOR_GROUP would not trigger a coordinator query.
@MaximGurschi

Hi Magnus. Thank you for looking into fixing this. What are your thoughts on adding a fallback to my consumer that traps _WAIT_COORD (-180) and NOT_COORDINATOR_FOR_GROUP (16), destroys the old connection handle, and creates a new one (similar to the workaround in #2630)?

Is that worthwhile, or will the imminent fix you are working on make it unnecessary? Do you think trapping those errors would lead to unnecessary, expensive, and possibly frequent reconnects?
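
A minimal sketch of the fallback described above (illustrative only; the function name and trigger condition are hypothetical, not from this thread): trap consecutive commit failures with ERR__WAIT_COORD / ERR_NOT_COORDINATOR_FOR_GROUP, then destroy the old handle and create a new one with the same assignment:

    // Recreate the consumer and re-apply the manual assignment.
    bool recreate_consumer(RdKafka::KafkaConsumer*& consumer,
                           RdKafka::Conf* conf,
                           const std::vector<RdKafka::TopicPartition*>& partitions,
                           std::string& errstr) {
      consumer->close();   // leave the group and wind down the old handle
      delete consumer;
      consumer = RdKafka::KafkaConsumer::create(conf, errstr);
      if (!consumer)
        return false;
      return consumer->assign(partitions) == RdKafka::ERR_NO_ERROR;
    }

    // Called e.g. after N consecutive commit errors where
    //   err == RdKafka::ERR__WAIT_COORD ||
    //   err == RdKafka::ERR_NOT_COORDINATOR_FOR_GROUP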

@edenhill
Contributor

v1.4.2 with this fix included will be released later this week.

@MaximGurschi

Thanks @edenhill, sounds great! That said, would you advise still keeping the reconnect fallback in case consecutive errors of this form occur?

@edenhill
Contributor

The workaround should no longer be needed.

@RaajBadra

@edenhill I'm using node-rdkafka version 2.7.4 with the consumer assign() API and auto commit enabled, and I'm seeing the error below. If I upgrade to 2.9.x, will this issue be fixed automatically, i.e. will the consumer rejoin the group without any workaround?

Error:

{"severity":4,"fac":"COMMITFAIL","message":"[thrd:main]: Offset commit (cgrp auto commit timer) failed for 0/1 partition(s): mytopic[0]@8747(Broker: Unknown member)"}
