producer write broken pipe #1565

Closed
chowyu08 opened this issue Dec 25, 2019 · 24 comments
Labels
stale Issues and pull requests without any recent activity

Comments

@chowyu08

Versions

V1.24.1

Sarama: 1.24.1
Kafka: 2.1.1
Go: 1.13.1
Configuration

What configuration values are you using for Sarama and Kafka?

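conf := sarama.NewConfig()
conf.Version = sarama.V2_0_0_0                  // Kafka protocol version to speak
conf.Producer.RequiredAcks = sarama.WaitForLocal // wait for the partition leader's ack only
conf.ChannelBufferSize = 1024                   // buffer size of Sarama's internal channels
conf.Net.KeepAlive = 30 * time.Second           // TCP keep-alive period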
Logs

time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 state change to [closing] because write tcp xxxxx:59382->xxxx:9092: write: broken pipe\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [closing] because write tcp xxxx:33336->xxxx:9092: write: broken pipe\n"
time="2019-12-25T09:15:51Z" level=info msg="Closed connection to broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [retrying-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 abandoning broker 3\n"
time="2019-12-25T09:15:51Z" level=info msg="Closed connection to broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 input chan closed\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 shut down\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [retrying-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 abandoning broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [retrying-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 abandoning broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 input chan closed\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 shut down\n"
time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [metric.transfer] from broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [msg.republish] from broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="client/metadata fetching metadata for [metric.hit] from broker xxxx:9092\n"
time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092"
time="2019-12-25T09:15:51Z" level=info msg="ClientID is the default of 'sarama', you should consider setting it to something application-specific."
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 starting up\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/3 state change to [open] on metric.transfer/0\n"
time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092"
time="2019-12-25T09:15:51Z" level=info msg="ClientID is the default of 'sarama', you should consider setting it to something application-specific."
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 selected broker 3\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [flushing-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.transfer/0 state change to [normal]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 starting up\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [open] on metric.ruleengine.hit/0\n"
time="2019-12-25T09:15:51Z" level=info msg="client/brokers replaced registered broker #1 with xxxx:9092"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 selected broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 selected broker 2\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/broker/2 state change to [open] on msg.republish/1\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [flushing-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/msg.republish/1 state change to [normal]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [flushing-1]\n"
time="2019-12-25T09:15:51Z" level=info msg="producer/leader/metric.hit/0 state change to [normal]\n"
time="2019-12-25T09:15:52Z" level=info msg="Connected to broker at xxxx:9092 (registered as #2)\n"
time="2019-12-25T09:15:53Z" level=info msg="Connected to broker at xxxx:9092 (registered as #3)\n"

Problem Description

After 10 minutes, the producer's socket write fails with a broken pipe error.

@talbspx

talbspx commented Jan 28, 2020

👍
Experiencing similar behaviour with
kafka : 0.10.2.1
go : 1.13
sarama : 1.24.1 SyncProducer

@talbspx

talbspx commented Apr 13, 2020

Update:
This comes from Kafka: the connections.max.idle.ms broker config defaults to 10 minutes.
Kafka closes producer connections that have been idle for longer than connections.max.idle.ms.
There are two possible ways to overcome this:

  1. Feature request to Sarama: since Kafka 0.11 it is possible to send a metadata request, which would act as a heartbeat from the producer to the broker and keep the connection alive.
  2. Set connections.max.idle.ms on the broker side to a value higher than the longest expected gap between messages from your producer, so that the connection stays alive all the time.
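
To make the mechanism described above concrete, here is a minimal sketch that uses only the Go standard library (no Sarama, no Kafka). A toy server closes a connection once it has been silent for a configurable time, roughly the way a broker enforces connections.max.idle.ms, and a client then tries to write on it. The names serveWithIdleTimeout and writeAfterIdle and all the timings are made up for illustration; this is not how Kafka implements its idle check.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// serveWithIdleTimeout accepts one connection and closes it once no bytes
// have arrived for maxIdle, loosely imitating a broker-side idle reaper.
func serveWithIdleTimeout(ln net.Listener, maxIdle time.Duration) {
	conn, err := ln.Accept()
	if err != nil {
		return
	}
	defer conn.Close()
	buf := make([]byte, 64)
	for {
		conn.SetReadDeadline(time.Now().Add(maxIdle))
		if _, err := conn.Read(buf); err != nil {
			return // idle timeout or client hang-up: drop the connection
		}
	}
}

// writeAfterIdle connects to a toy server, stays silent for idleMs, then
// writes a few times. It returns the first error seen (nil if none).
func writeAfterIdle(maxIdleMs, idleMs int) error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	defer ln.Close()
	go serveWithIdleTimeout(ln, time.Duration(maxIdleMs)*time.Millisecond)

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return err
	}
	defer conn.Close()

	time.Sleep(time.Duration(idleMs) * time.Millisecond)

	// The first write after the peer has closed often succeeds locally;
	// the failure tends to surface on a later write, so try a few times.
	for i := 0; i < 5; i++ {
		if _, err := conn.Write([]byte("ping")); err != nil {
			return err
		}
		time.Sleep(20 * time.Millisecond)
	}
	return nil
}

func main() {
	fmt.Println("idle longer than the limit: ", writeAfterIdle(50, 300))
	fmt.Println("idle shorter than the limit:", writeAfterIdle(1000, 10))
}
```

The first call is expected to come back with a non-nil write error and the second with nil. The exact error text depends on the operating system, so treat this as an illustration of the failure mode rather than a diagnostic tool.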

@dnwe
Collaborator

dnwe commented Apr 13, 2020

The client already runs a periodic metadata refresh: the backgroundMetadata goroutine in client.go ticks on client.conf.Metadata.RefreshFrequency, which defaults to 10m, so you could reduce that to make the refresh happen more frequently.

https://github.com/Shopify/sarama/blob/6159078aacb29adb9ecbd2a83ab9c2732c80add2/client.go#L768

However, that request is sent to a random broker each time, so it won't necessarily keep alive your idle connections to all of the brokers in the cluster.
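
One way to picture the caveat above: a heartbeat only helps if it reaches every broker connection, not one picked at random. The sketch below is plain Go with a hypothetical pinger interface and a fake broker for the demo; it is not Sarama's API. How Ping would be implemented against a real broker connection (for example with a cheap metadata request) is an assumption left open here.

```go
package main

import (
	"errors"
	"fmt"
)

// errClosed simulates the error seen on a connection the broker has dropped.
var errClosed = errors.New("EOF")

// pinger is a hypothetical stand-in for one broker connection.
// It is NOT a Sarama type; Ping could wrap any cheap request.
type pinger interface {
	ID() int32
	Ping() error
}

// pingAll sends one heartbeat to every broker, instead of a single random
// one, and returns the IDs of the brokers whose heartbeat failed.
func pingAll(brokers []pinger) []int32 {
	var failed []int32
	for _, b := range brokers {
		if err := b.Ping(); err != nil {
			failed = append(failed, b.ID())
		}
	}
	return failed
}

// fakeBroker is a test double that counts pings and returns a fixed error.
type fakeBroker struct {
	id    int32
	err   error
	pings int
}

func (f *fakeBroker) ID() int32   { return f.id }
func (f *fakeBroker) Ping() error { f.pings++; return f.err }

func main() {
	brokers := []pinger{
		&fakeBroker{id: 1},
		&fakeBroker{id: 2, err: errClosed},
		&fakeBroker{id: 3},
	}
	fmt.Println("brokers needing a reconnect:", pingAll(brokers))
}
```

In a real service something like pingAll would be driven by a time.Ticker whose interval is comfortably below the broker's connections.max.idle.ms.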

@talbspx

talbspx commented Apr 13, 2020

thanks for the quick reply 👍

I have seen this configuration and set it to 5 min (the Kafka broker stays at 10 min for idle connections).
This is on a local single Kafka broker, and it is still happening.

LOGS
{"level":"info","version":"dev","time":"2020-04-13T14:15:59+03:00","message":"Scout service starting"}
{"level":"info","time":"2020-04-13T14:15:59+03:00","message":"starting to init server config"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[Initializing new client]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"Connected to broker at [localhost:9092] (unregistered)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"client/brokers registered new broker #[1001 %!d(string=localhost:9092)] at %!s(MISSING)"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[Successfully initialized new client]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:15:59+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"producer/broker/[1001] starting up\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"producer/broker/[1001 %!d(string=serverconfigurations) 0] state change to [open] on %!s(MISSING)/%!d(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:15:59+03:00","message":"Connected to broker at [localhost:9092 %!s(int32=1001)] (registered as #%!d(MISSING))\n"}

{"level":"debug","entity_id":"5e8056bb89b48fcb0bc16737","caller":"/Users/talbenshabtay/Desktop/workspace/dev/pxScout-serverconfigurations/db/server_config_source.go:142","time":"2020-04-13T14:16:18+03:00","message":"received entity update"} <-- trigger produce to kafka

{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:16:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:17:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:18:59+03:00","message":"client/metadata fetching metadata for all topics from broker [localhost:9092]\n"}

<--------- waited 3 minutes so connection is now idle and disconnected ------------------------>

{"level":"debug","entity_id":"5e8056bb89b48fcb0bc16737","caller":"/Users/talbenshabtay/Desktop/workspace/dev/pxScout-serverconfigurations/db/server_config_source.go:142","time":"2020-04-13T14:19:12+03:00","message":"received entity update"} <-- trigger produce to kafka

{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001 824634384640] state change to [closing] because %!s(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"Closed connection to broker [localhost:9092]\n"}
{"level":"error","error":"EOF","entity_name":"serverconfigurations","entity_id":"5e8056bb89b48fcb0bc16737","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/perimeterx/[email protected]!r!c7/scout/entity_producer.go:71","time":"2020-04-13T14:19:12+03:00","message":"failed to produce entity"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/leader/[serverconfigurations %!s(int32=0) %!s(int32=1001)]/%!d(MISSING) abandoning broker %!d(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001] input chan closed\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001] shut down\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"client/metadata fetching metadata for [[serverconfigurations] localhost:9092] from broker %!s(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:383","time":"2020-04-13T14:19:12+03:00","message":"[ClientID is the default of 'sarama', you should consider setting it to something application-specific.]"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001] starting up\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/broker/[1001 %!d(string=serverconfigurations) 0] state change to [open] on %!s(MISSING)/%!d(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"producer/leader/[serverconfigurations %!s(int32=0) %!s(int32=1001)]/%!d(MISSING) selected broker %!d(MISSING)\n"}
{"level":"debug","caller":"/Users/talbenshabtay/Desktop/workspace/dev/go/pkg/mod/github.com/rs/[email protected]/log.go:391","time":"2020-04-13T14:19:12+03:00","message":"Connected to broker at [localhost:9092 %!s(int32=1001)] (registered as #%!d(MISSING))\n"}

@dnwe updated with logs

@dnwe
Collaborator

dnwe commented Apr 13, 2020

@talbspx interesting. If it's just local single broker instance can you change the idle broker config to 2m, set Sarama to 60s refresh and enable debug logs on the broker side? I'll also try and reproduce in the week if I can

@talbspx

talbspx commented Apr 13, 2020

@dnwe yeah np :)
will try to have it later on today.

@d1egoaz
Contributor

d1egoaz commented Apr 13, 2020

Interested, we've seen lots of these lately.

@talbspx

talbspx commented Apr 16, 2020

@d1egoaz @dnwe
To me it seems like there is an issue with the way the channel keys are handled in the Kafka broker on v0.10.x (the code looks a bit different on trunk).

This is where the server socket polls the Processor channels and updates its state using the processor.Selector:
https://github.com/apache/kafka/blob/0.10.2.2/core/src/main/scala/kafka/network/SocketServer.scala#L494

Then the Selector iterates over the channels and tries to update each channel's state:
https://github.com/apache/kafka/blob/0.10.2.2/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L312-L314

It updates the idle manager for each channel key:
https://github.com/apache/kafka/blob/0.10.2.2/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L339

And eventually it releases the channel if needed:
https://github.com/apache/kafka/blob/0.10.2.2/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L324

So I believe the issue is with the SelectedKeys that are retrieved (managed) using the java.nio.channels.Selector:
https://docs.oracle.com/javase/7/docs/api/java/nio/channels/Selector.html

pierDipi added a commit to pierDipi/eventing-kafka-broker that referenced this issue Jul 27, 2020
- Fix sarama issue: IBM/sarama#1565

Signed-off-by: Pierangelo Di Pilato <[email protected]>
knative-prow-robot pushed a commit to knative-extensions/eventing-kafka-broker that referenced this issue Jul 28, 2020
* Create a new ClusterAdmin at each loop

- Fix sarama issue: IBM/sarama#1565

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Remove error return value from SetBootstrapServer

Signed-off-by: Pierangelo Di Pilato <[email protected]>
@sshahar1

I am also interested in this issue. I am experiencing it with:
Sarama version v1.26.3
Kafka version 2.6.0
Go version 1.12

This is quite a mission critical application, so I do need to fix this.

@DasTushar

Interested in this issue. I'm experiencing it with:

Sarama version v1.27.2
Go version 1.14

@caiofralmeida

Interested in this issue. I'm experiencing it with:

Sarama version v1.28.0
Go version 1.15

@mster429

mster429 commented Aug 31, 2021

Experiencing the same issue after the recent Sarama upgrade to v1.29.1, with go-1.15.6. Any help in figuring out the cause is appreciated. The producer client is running in a k8s container with 1 core and 2 GiB memory. Could this be something on the producer client end, since the only change made was the Sarama version upgrade? Thanks in advance :)

@dnwe
Collaborator

dnwe commented Sep 8, 2021

Yes, this is a bug. I believe the issue is that the producer dispatcher func simply iterates over the Input channel, blocking until new messages are put on the Input queue to send. If Kafka has closed the underlying broker connection in the meantime, the producer currently won't realise until it next tries to send a message.

Update: hmm after looking into this some more, the brokerProducer should account for the dropped connection and re-establish it fairly transparently anyway. Needs more investigation
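
For applications that surface the first failed send to the caller, a retry wrapper is one possible stopgap. The following is a minimal sketch in plain Go, not Sarama's own retry logic: it retries a send only when the error looks like a write on a connection the peer already closed ("EOF" or "broken pipe", the two errors reported in this thread). The names isStaleConnErr, sendWithRetry, and failNTimes are hypothetical, and send stands in for whatever call your producer makes.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"syscall"
)

// isStaleConnErr reports whether err looks like a write to a connection
// that the peer has already closed (EOF or broken pipe).
func isStaleConnErr(err error) bool {
	return errors.Is(err, io.EOF) || errors.Is(err, syscall.EPIPE)
}

// sendWithRetry calls send and retries up to maxRetries more times, but
// only for stale-connection errors. It returns the number of attempts
// made and the last error (nil on success).
func sendWithRetry(send func() error, maxRetries int) (attempts int, err error) {
	for attempts = 1; ; attempts++ {
		err = send()
		if err == nil || !isStaleConnErr(err) || attempts > maxRetries {
			return attempts, err
		}
	}
}

// failNTimes returns a fake send function that fails with a wrapped
// broken-pipe error for the first n calls and succeeds afterwards.
func failNTimes(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return fmt.Errorf("write tcp: %w", syscall.EPIPE)
		}
		return nil
	}
}

func main() {
	attempts, err := sendWithRetry(failNTimes(1), 3)
	fmt.Println("attempts:", attempts, "err:", err)
}
```

Retrying blindly can duplicate messages if the first attempt actually reached the broker, so whether this is acceptable depends on the delivery guarantees the application needs.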

@DesmonPong

Yes this is a bug. I believe the issue will be that the producer dispatcher func simply iterates over the Input channel blocking until new messages are put on the Input queue to send. If Kafka has closed the underlying broker connection in the mean time the producer currently won't realise until it next tries to send a message.

Update: hmm after looking into this some more, the brokerProducer should account for the dropped connection and re-establish it fairly transparently anyway. Needs more investigation

any update?

@baoxc-shopee

update: this comes from kafka - connections.max.idle.ms broker config that defaults to 10 minutes. kafka removes idle connections for producers that longer than connections.max.idle.ms. two possible ways to overcome this -

  1. Feature request to sarama - since kafka 0.11 it is possible to send a metadata request which will mimic a heartbeat from the producer to the broker in which the connection remains alive.
  2. set connections.max.idle.ms config on the broker side to be high enough value to the expected time your producer should produce a message thus keeping the connection alive all the time.

Hi, I hit the same issue. Could you share how I can send a metadata refresh as you described? Thanks a lot!

@king-freshket

Yes this is a bug. I believe the issue will be that the producer dispatcher func simply iterates over the Input channel blocking until new messages are put on the Input queue to send. If Kafka has closed the underlying broker connection in the mean time the producer currently won't realise until it next tries to send a message.

Update: hmm after looking into this some more, the brokerProducer should account for the dropped connection and re-establish it fairly transparently anyway. Needs more investigation

any update

@ghost

ghost commented Oct 14, 2022

Yes this is a bug. I believe the issue will be that the producer dispatcher func simply iterates over the Input channel blocking until new messages are put on the Input queue to send. If Kafka has closed the underlying broker connection in the mean time the producer currently won't realise until it next tries to send a message.

Update: hmm after looking into this some more, the brokerProducer should account for the dropped connection and re-establish it fairly transparently anyway. Needs more investigation

we are facing the same issue, any update?

@david-bergman

david-bergman commented Nov 3, 2022

Interested in this issue. I'm experiencing it with:

Sarama version v1.37.2
Go version 1.18
Confluent Cloud Kafka

@boscar

boscar commented Jan 30, 2023

Any update on this?

Sarama version v1.37.2
Go version 1.19
Confluent Cloud Kafka

@3AceShowHand

Any update on this?

Sarama version v1.36.0
Go version 1.19
Kafka 3.x

@HY1310

HY1310 commented Feb 10, 2023 via email

@david-bergman

I am still experiencing this issue with the latest sarama
v1.38.2-0.20230327141928-9127f1c432c0
It happens periodically, once every week or so, seemingly when Confluent Kafka does rolling upgrades on the cluster: I get this error, and then the worker hangs indefinitely.
Are there any plans to address this bug in future releases?

@dnwe
Collaborator

dnwe commented Aug 31, 2023

With connections.max.idle.ms enabled server side, this should have been fixed with #2197 and #2234 both merged

@david-bergman it sounds like you are still seeing issues — I wonder if you were inadvertently using sasl v0 in v1.38.1 and missing out on the re-auth. Since #2572 the default was (put back) to sasl v1 and we also have a lot better protocol support. Can you re-test with the latest release?


Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.
