
producer.close hangs when the cluster is unreachable #268

Closed
dmariassy opened this issue Jul 7, 2023 · 19 comments
@dmariassy

👋🏻 Hi - I'm not sure if this should be a question raised on librdkafka or here, so apologies if this is the wrong place to report this.

Expected behaviour

When producer.close is called, the producer will eventually either close all connections or give up. In either scenario, all producer threads should be cleaned up once close completes.

Actual behaviour

When close is called on a producer with a non-empty buffer, and the target cluster is unreachable, close will hang indefinitely. The producer thread is never properly terminated and cleaned up.

Steps to reproduce

producer = Rdkafka::Config.new("bootstrap.servers": "some-host-where-kafka-is-not-running:9092").producer
producer.produce(topic: "topic", payload: "payload")
producer.close # This hangs
@mensfeld mensfeld self-assigned this Jul 7, 2023
@mensfeld mensfeld added the bug label Jul 7, 2023
@mensfeld
Member

mensfeld commented Jul 7, 2023

Yeah, that's because we wait on the queue being empty. I will take a shot at this and maybe it can be mitigated on our side.

One thing that needs to happen though, is an error callback trigger on failed delivery for instrumentation reasons.

@dmariassy
Author

One thing that needs to happen though, is an error callback trigger on failed delivery for instrumentation reasons.

💯 agree

@mensfeld
Member

mensfeld commented Jul 9, 2023

OK, after a set of tests: this is not a bug, and it does not hang forever.

ref: https://karafka.io/docs/FAQ/#how-can-i-make-sure-that-karafkaproducer-does-not-blockdelay-my-processing

It will close after 5 minutes, which is the default message.timeout.ms (300000 ms). This is consistent and expected.
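The five-minute wait corresponds to librdkafka's default message.timeout.ms of 300000 ms; lowering it bounds how long close can block on an undeliverable buffer. A minimal configuration sketch (assumes the rdkafka gem; the bootstrap host is a deliberately unreachable placeholder, and the exact timing depends on librdkafka internals):

```ruby
require "rdkafka"

# Evict undeliverable messages after ~5s instead of the 300s default,
# so producer.close returns quickly when the cluster is unreachable.
producer = Rdkafka::Config.new(
  "bootstrap.servers": "some-host-where-kafka-is-not-running:9092",
  "message.timeout.ms": 5_000
).producer
```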

@mensfeld mensfeld closed this as completed Jul 9, 2023
@mensfeld
Member

mensfeld commented Jul 9, 2023

On top of that, upon eviction from the queue, a proper instrumentation callback is emitted. An example from WaterDrop (as it has callbacks hooked up):

producer = WaterDrop::Producer.new

producer.setup do |config|
  config.deliver = true
  config.kafka = {
    'bootstrap.servers': 'localhost:9093',
    'request.required.acks': 1,
    'message.timeout.ms': 5_000
  }
end

producer.monitor.subscribe('error.occurred') do |event|
  error = event[:error]

  p "WaterDrop error occurred: #{error}"
end

irb(main):029:0> producer.produce_async(topic: 'test', payload: 'test')
=> #<Rdkafka::Producer::DeliveryHandle:0x00007f41cf6156d0>
irb(main):030:0> "WaterDrop error occurred: Local: Broker transport failure (transport)"

@mensfeld
Member

mensfeld commented Jul 9, 2023

When you close a producer whose buffered messages have already been evicted, it closes without waiting. So this works exactly as expected as it is now.

@mensfeld mensfeld added invalid and removed bug labels Jul 9, 2023

@jychen7
Contributor

jychen7 commented Jul 10, 2023

Thank you for the detailed explanation. The config message.timeout.ms (or its alias delivery.timeout.ms) makes sense.

I tried the following script in Google Cloud Shell and found something interesting:

  • message.timeout.ms = 999 → close takes ~1 s
  • message.timeout.ms = 1000 → close takes ~2 s

Just curious: do you know where the extra 1 s comes from? ❓

require "rdkafka"

message_timeout_ms = 999

producer = Rdkafka::Config.new(
    "bootstrap.servers": "some-host-where-kafka-is-not-running:9092",
    "message.timeout.ms": message_timeout_ms,
).producer

producer.produce(topic: "topic", payload: "payload")

t1 = Time.now
producer.close
t2 = Time.now
puts "close spend: #{t2 - t1} seconds"

@mensfeld
Member

After a message is evicted, there is still a delay while the failed messages and the failure reason are constructed and shipped via the error callback, plus the delay of polling for this and passing it on the bindings side. I cannot give you the exact location of the extra second, though. Sorry.

@dmariassy
Author

Thanks for looking into this @mensfeld! I understand the semantics of rdkafka-ruby better now.

IMO, it could still make sense to allow separate timeouts for steady state (producer.produce) and special ops (producer.close). AFAICT, flush already works this way: it accepts a custom timeout value which overrides the client's default timeout settings. I think close might benefit from something similar. Maybe I'm happy to retry for 15 minutes while my app is running normally, but I want a snappier response when I'm trying to shut down my app.
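In the meantime, a bounded close can be approximated on the application side with Ruby's stdlib Timeout. A minimal, runnable sketch with a stand-in slow operation in place of producer.close (the close_with_deadline helper is hypothetical, not part of rdkafka-ruby). Note the caveat: Timeout raises via a watchdog thread and cannot interrupt a thread blocked inside a native librdkafka call, so this bounds the wait only at the Ruby level.

```ruby
require "timeout"

# Hypothetical helper (not part of rdkafka-ruby): bound a potentially
# slow shutdown call and give up once the deadline passes.
def close_with_deadline(max_seconds)
  Timeout.timeout(max_seconds) { yield }
  :closed
rescue Timeout::Error
  :gave_up
end

# Stand-in slow close; in a real app the block would call producer.close.
result = close_with_deadline(0.1) { sleep 5 }
# => :gave_up after ~0.1s
```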

@mensfeld
Member

@dmariassy but flush is something else. Flush bypasses the wait time for message sets to be constructed and then attempts to dispatch. A failed dispatch on flush still obeys the eviction policies and keeps the messages there. It's just that you do not wait for the retries to kick in in a blocking manner:

ERR__TIMED_OUT if timeout_ms was reached before all outstanding requests were completed, else ERR_NO_ERROR

So it does not alter this behaviour.

We could add the ability to drop the messages via purge, but then none of them would propagate to the error callbacks on delivery eviction.

@dmariassy
Author

dmariassy commented Jul 10, 2023

Flush bypasses the wait time for message sets to be constructed and then attempts to dispatch.

About that 😄 I'm playing around with flush and that's not what I'm seeing (but I might be missing something):

config = {
  "bootstrap.servers": "localhost:9092",
  "linger.ms": 5000,
}

producer = Rdkafka::Config.new(config).producer
producer.produce(topic: "test", payload: "1")
producer.flush(1000)
sleep 1

The event never lands in the topic ☝🏻 Am I missing something? It appears that flush isn't transmitting message sets that aren't ready yet (given the linger.ms config).

We could add the ability to drop the messages via purge, but then none of them would propagate to the error callbacks on delivery eviction.

Gotcha. I see the tradeoff 🤔

@mensfeld
Member

Give it a millisecond to actually reach the internal queue (produce is async, and librdkafka uses multiple queues and buffers down the road):

producer = Rdkafka::Config.new(config).producer
producer.produce(topic: "test-me", payload: "1")
sleep(0.001)
producer.flush(1000)

then it goes where expected (after a few attempts):

[screenshot: the message visible in the test-me topic]

This is why we use rd_kafka_outq_len. It may be that there are one or two more settings involved.

So my guess here is that this message is not yet ready to be shipped. It may be that there are other things happening as well.

@mensfeld
Member

The question is: is linger.ms ignored? My experience shows it should not be, and your example confirms that (a 1 ms sleep is well under the 5000 ms linger). The real question is why flush did not flush that one message. It may be worth looking into, though close behaves as expected, and you always want to close.

@dmariassy
Author

dmariassy commented Jul 10, 2023

Give it a ms to actually go where it should (as it's async and librdkafka uses multiple queues and buffers down the road)

I actually sleep for a second

and you always want to close

So flush without close is not recommended?

@mensfeld
Member

Let me correct myself:

This effectively makes Flush unusable, as the producer would produce at that time anyways.

flush() is mainly used to wait for outstanding messages to be delivered before terminating the producer, it is not aimed to be used in a synchronous produce+flush cycle. See https://github.com/edenhill/librdkafka/wiki/FAQ#why-is-there-no-sync-produce-interface

confluentinc/confluent-kafka-go#1013

So flush without close is not recommended?

You always want to use close. If you are implementing stuff like this, look at https://github.com/karafka/waterdrop and read my FAQ on that matter. There are several recommendations there for production-grade, stable system operations.
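The shutdown sequence that follows from the librdkafka FAQ guidance is a bounded flush followed by close. A minimal sketch, assuming an already-created Rdkafka producer (not runnable without a reachable broker; the 10-second timeout is an arbitrary example value):

```ruby
# Bounded shutdown: wait up to 10s for in-flight messages,
# then release the native handle and its background thread.
producer.flush(10_000)
producer.close
```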

@mensfeld
Member

rdkafka-ruby does not operate well across forks, and we had many memory leaks down the road (all known ones are fixed, as far as I remember, by the last round of fixes in this area). Dangling producers may not flush everything upon termination, or may not have enough time to run all the callbacks before GC kicks in. I recall we had some problems with finalizers, GC, and FFI; making sure your stuff is closed is the easiest way not to be surprised.

@mensfeld
Member

I actually sleep for a second

Sleep between produce and flush ;)

@dmariassy
Author

Sleep between produce and flush ;)

I'm still not seeing the data. But the example works on my co-workers machine, so it's possible that my local env is borked.

Thanks for all your answers!

@mensfeld
Member

I do not know what you aim to achieve, but if you are looking into advanced cases with guarantees, instrumentation and monitoring, support for forking, the ability to distinguish in-process producer instances, and other things like that, you may be better off with a higher-level abstraction like WaterDrop. rdkafka-ruby is supposed to provide level 1: the same behaviors and similar APIs as librdkafka.

There are many edge cases here, and I would always be happy to accept help in other parts of the Ruby Kafka ecosystem :)
