1.0 API Feedback #614
Super exciting to see it getting close to completion! "It gives a lot of flexibility", to quote this blog post: https://blogs.msdn.microsoft.com/dotnet/2018/07/09/system-io-pipelines-high-performance-io-in-net/
but since
I've updated my software today to the 1.0Experimental-2 version because I needed the AdminClient. Thanks in advance.
@xXCiccioXx - first of all, check out Reasons for this change:
@xqrzd - thanks for your input here - you're pointing out some important things I've yet to research :-). Updating the serializer interface is in scope for 1.0 because it's a breaking change to the API, but it's a little way down the list because it's fairly well hidden (most people won't touch this part of the API) and there are more visible things to implement / bugs to fix first. So in summary: thanks for your work, it's useful! And sorry for the delay in getting to this (but we will).
Testing nuget 1.0.0-experimental-12
ErrorCode.Local_ValueDeserialization is a manifestation of this problem. There are two ways to handle errors: exceptions and error codes. ErrorCode.Local_ValueDeserialization combines the two - you have an exception with an error code :) - so now I cannot apply the standard exception-handling pattern. My point is that forcing IDeserializer does not solve any problem in a Linq-enabled world and brings design challenges. Exposing IEnumerable and letting the user deal with deserialization is arguably easier and definitely more flexible. I understand that "no generics" is too aggressive a move, so maybe adding Raw[Consumer|Producer] and String[Consumer|Producer] would allow a zero-config solution for the majority of cases?
IDeserializer inherits from IDisposable, which means the Kafka client demands that serializers be disposable. But why? I see no use of serializer.Dispose from the client. The serializer object is created outside the client, and the client is provided with an instance of it, so it is the application's responsibility to call Dispose. The client has no knowledge of when it is safe to call Dispose, nor of whether IDisposable even makes sense. If the client doesn't know whether it's required or when to call it, why impose the requirement at all? If you remove IDisposable from I[De]Serializer, then application code can implement IDisposable only in the cases where it is needed, simplifying 99% of serializer classes.
I[De]Serializer.Configure
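To make the IDisposable point concrete, here is a minimal sketch of what dropping IDisposable from the serde interfaces could look like. The interface shapes and class names below are simplified illustrations, not the library's actual definitions.

```csharp
// Illustrative sketch only - these interface shapes are simplified, not the
// library's actual definitions.
using System;
using System.Text;

public interface IDeserializer<T>   // note: no ": IDisposable" here
{
    T Deserialize(byte[] data);
}

// The common case: stateless, nothing to dispose, no boilerplate Dispose().
public class Utf8Deserializer : IDeserializer<string>
{
    public string Deserialize(byte[] data) => Encoding.UTF8.GetString(data);
}

// The rare case opts in to IDisposable itself; the application created the
// instance, so the application decides when to dispose it.
public class ResourceOwningDeserializer : IDeserializer<string>, IDisposable
{
    public string Deserialize(byte[] data) => Encoding.UTF8.GetString(data);
    public void Dispose() { /* release whatever this instance owns */ }
}
```

With this shape, only serializers that actually own resources carry a Dispose method, and ownership stays with the code that constructed them.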
No googling "is timeout int or int64" anymore, and one less runtime exception.
Is there a reason that the callback passed in the 'error_cb' setting is inconsistent between Producers (where it has to be an
@awhittier-metabolon - thanks! That's an error (fixed to be consistent in #623). Notes: 1. that PR still requires more work, including addressing the comment by @qed-. 2. the exact delegate type to use here is actually still under consideration.
great feedback @vchekan - I've now actioned a lot of it. I'm replacing an old comment with this more succinct one for the benefit of anyone reading along:
- Decision to drop the raw (non-generic) consumer and producer.
- Exceptions for every librdkafka error.
- ISerializer interface vs functions.
- Provide headers to deserializer.
De/serializers: there are serdes for most of the .NET intrinsic types, but the Producer/Consumer classes only recognize a couple from the P/C generic type definition. Maybe add a
In the release note for 1.0.0-beta, I'm not able to find any reference to ConsumerAsync in the codebase. Are there plans for async consumers?
Previously, I was going to have a completely async API, but the reality is it's not going to be possible to implement this 'properly' in the foreseeable future due to the way librdkafka works. I could have faked it using
Excellent, looking forward to that.
Is there a way to tell from the return value of Consume() if we are at partition EOF?
@qed- no, but i'll re-consider it.
@qed- currently consume can return one of two things - a message or null. if that is expanded to be three things (partition eof event), it makes writing the consume loop messier - even for people not concerned with partition eof. for that reason i exposed this as an event. a flag on message indicating this is the last message is another possibility, but as @edenhill just pointed out to me, that won't tell you you're at partition eof if there is no message. can you just write something like?:
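A loop along the lines being suggested might look like the following sketch. The OnPartitionEOF event name, its handler signature, and the Process helper are assumptions for illustration only.

```csharp
// Hypothetical sketch: partition EOF is surfaced via an event, so the main
// consume path stays a simple null check. OnPartitionEOF is an assumed name.
consumer.OnPartitionEOF += (_, tpo) =>
    Console.WriteLine($"reached end of {tpo}");

while (!cancelled)
{
    var msg = consumer.Consume(TimeSpan.FromMilliseconds(100));
    if (msg == null)
        continue;          // timeout (or EOF, already handled by the event)
    Process(msg);          // Process is a placeholder for app logic
}
```

The point of this shape is that consumers who don't care about partition EOF never have to branch on a third return state.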
What I see in library version 1.0.0-beta2 is two overloads of Consume(). The one that takes a timeout will return null in the case of both no message and partition EOF (also firing the event for partition EOF). The one that takes a cancellation token will block in both of those cases, so you'd never see null returned from it. This seems a bit inconsistent. Your example code would work for the former case but not the latter; you'd need to put any code that wants to handle EOF directly in the event handler. Another option would be to add a partition EOF boolean to ConsumeResult. This wouldn't require people not interested in this flag to do anything different, but would allow clients that do care to know the difference between no message and EOF.
To make my use case more concrete: I am using a consumer to persist the latest value per key in a compacted topic to a database. The persistence is triggered by reaching EOF, so it will only write the "current" value.
What about returning a sentinel value?

```csharp
var msg = consumer.Consume(...);
if (msg == Message.Eof) { /* handle eof */ }
else if (msg != null) { /* process message */ }
```
On Wed, 31 Oct 2018 at 17:14, David Chapman wrote:

> To make my use case more concrete: I am using a consumer to persist the latest value per key in a compacted topic to a database. The persistence is triggered by reaching EOF, so it will only write the "current" value. With the beta2 API I'd have to put logic to update the dictionary in the polling loop, and logic to write to the database in the EOF event handler. This is fine as the EOF handler gets invoked on the polling thread, but it doesn't seem as neat as it could be.
thanks for input on this - you're right, it's not quite right. my additional thoughts: @qed- - i actually think it's fine (good) that the timeout variant can return null. @AndyPook - thanks for adding another alternative into the mix. i think it doesn't help though, since it still adds a requirement for additional checking even in the case where people don't care. here's another idea:
quite liking that idea.
update to the above (which was written in haste as i was about to board a plane): librdkafka already has the
If you can target language version 7+ you could do this with pattern matching:

```csharp
public abstract class ConsumeResult { }
public class Message : ConsumeResult { }
public class PartitionEOF : ConsumeResult { }

var result = consumer.Consume();
switch (result)
{
    case PartitionEOF eof:
        // ...
}
```
What's the current status on this EOF idea? Testing-wise I would really like to work with the interface, so mocking is possible. But working with the interface at this moment doesn't allow any EOF detection. (working with beta2)
Update on partition EOFs: in librdkafka v1.0, the default value for I like the subclass idea (@qed- ), except in practice the generic parameters get in the way. You'd end up with I like the message sentinel idea (@AndyPook ) except what is it? A boolean flag on |
I don't get the implication. Do you mean that when set to false there will be a ConsumeResult with a boolean flag when EOF is reached? But how do you know EOF is reached when you set the option to false? Don't you need the setting to be true?
by default you don't get any eof notification. this means the code handling the
what i'm concerned about is making the code as short as possible in the case people don't want eof notifications, which is most of the time.
Ok I get it, thx |
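For what it's worth, the flag-on-result approach discussed here can be sketched as follows. The EnablePartitionEof / IsPartitionEOF names follow the direction of this thread, but treat the whole snippet as illustrative rather than exact API; latest, cancellationToken, and WriteSnapshotToDatabase are placeholders.

```csharp
// Illustrative sketch: opt in via config, then branch on a flag per result.
// People who leave EnablePartitionEof at its default (false) never see EOF
// results, so their consume loop stays unchanged.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "compacted-topic-snapshot",
    EnablePartitionEof = true
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("my-compacted-topic");

var latest = new Dictionary<string, string>();
while (true)
{
    var result = consumer.Consume(cancellationToken);
    if (result.IsPartitionEOF)
    {
        WriteSnapshotToDatabase(latest);   // placeholder for the use case above
        continue;
    }
    latest[result.Message.Key] = result.Message.Value;
}
```

This keeps the default consume loop identical for the majority who never enable the option, which is the constraint stated above.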
Hello, me again :) Raising the "Consumer.Consume methods should be async" question again... I see that a
But putting that to one side... why not just make this method async (as I previously tried to argue)? This would make it just the same pattern as how the Producer handles the same situation. I realise that this is quite a breaking change this late in the process.
would you accept a PR that tidies up a bunch of things?
welcome back! we merge into 1.0.x, then up into master. we'll implement
absolutely feel free to open PRs like you propose on 1.0.x. If they're small and obviously good, they can go in before 1.0. err on the side of more small and simple PRs. thanks :-) We're going to release 1.0 asap. Nearly did it already, but see open PR + i'm reviewing some code that uses the library which may be enlightening, so going to do that first as well, just in case.
I get that librdkafka is not async. But... I cannot understand why it can be done correctly in the Producer but not in the Consumer. BTW: ConfigureAwait(false) does not need to be done at every level. Making the Consume methods async (using the same pattern as
Lots of nuance, most of which is resolved by making these methods
I'm thinking that the consumer consume call is blocking and the produce call is not - I don't think the
Without the
Here we go again... This "blocking" concern just isn't one. It is completely typical (especially with interop things). But we've had this discussion before. I get, and appreciate, that you're trying to do "the right thing" and factor things in a compact way. But it leads you into an async pickle.
So, we keep Consumer as pure sync and just wrap the async serde with the standard sync-over-async pattern. The Builder can deal with the wrapping and the Consumer is none the wiser. Yes this is just a different method of hiding the async'ness of the async serdes, but, at least it vastly simplifies the Consumer. |
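A sketch of that wrapping idea follows. The interface shapes are deliberately simplified for illustration and are not the library's actual definitions; the adapter class name is invented.

```csharp
// Illustrative sketch: the builder wraps an async serde in a sync adapter so
// the Consumer itself stays purely synchronous. Interfaces are simplified.
using System;
using System.Threading.Tasks;

public interface IDeserializer<T>
{
    T Deserialize(ReadOnlyMemory<byte> data);
}

public interface IAsyncDeserializer<T>
{
    Task<T> DeserializeAsync(ReadOnlyMemory<byte> data);
}

public class SyncOverAsyncDeserializer<T> : IDeserializer<T>
{
    private readonly IAsyncDeserializer<T> inner;

    public SyncOverAsyncDeserializer(IAsyncDeserializer<T> inner)
        => this.inner = inner;

    // The standard sync-over-async bridge; safe so long as the async path
    // doesn't block on a captured context.
    public T Deserialize(ReadOnlyMemory<byte> data)
        => inner.DeserializeAsync(data).GetAwaiter().GetResult();
}
```

The builder would construct this adapter when handed an async serde, and the Consumer would only ever see the sync interface.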
yep, you get no benefit. In fact, the implementation forces the serde to be run on a different thread (where there is no async context) to ensure deadlocks can't happen, which is far from ideal from an efficiency perspective. it's a complete hack. a further possible downside from doing this is if the async method suspends, it could resume on a different thread so the deserializer can't make any assumptions about this - but in practice, no deserializer is going to. this is an interim measure. you are right that it was perhaps a bad idea to have provided |
Note: I'm going to consult an expert on this issue on Thursday. Thanks for persisting. |
btw: If you provided an example program that I could run, which demonstrates a scenario in which the current implementation does not work, that would really grab my attention. |
i think the scenario under 'Real-World Example' in https://devblogs.microsoft.com/pfxteam/should-i-expose-synchronous-wrappers-for-asynchronous-methods/ might apply to the current implementation. something that is unique to our scenario is the async deserializers will almost never need to wait on IO - they will almost always run without suspending. Also, in most scenarios, we won't want them to be executing in parallel if they do need to wait on IO (since they are probably looking up the same schema in schema registry - wasted effort and additional load). maybe it's possible to leverage this and force them to run serially. the above also has a lot to do with why I think what i'm doing is likely legitimate - our scenario is very atypical.
you're right - there will be changes, stay tuned. |
My suggestion is not that the current impl will break, but that it is "wrong" in other ways. Doing Task.Run will create another Task that "might" run on another thread, leaving the current one blocked until it returns. Note that Task has a bool property
So I wouldn't say that this is that atypical. I've written many things that had to be async (due to an interface requirement) that are entirely sync code and just return Task.CompletedTask. Also methods that do a bunch of sync stuff and optionally make choices about whether to do async things, so they might complete synchronously or asynchronously. Even if you can't buy into that, please remove the Task.Run. Just call
Yep, I've seen Toub's article (he is def one of the authoritative sources on this stuff). But... at that time people were asking if they should be creating async versions of every method in their systems. This is what he's advising against; it doesn't make sense. Elsewhere, people like him say the equivalent of "do not do sync over async" and "async things should only be called from async methods".
Just one guy's opinion. Hoping your discussion tomorrow correlates.
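The "remove the Task.Run" suggestion can be sketched like this, assuming a runtime where Task.IsCompletedSuccessfully is available (.NET Core 2.0+); asyncDeserializer and data are placeholders.

```csharp
// Illustrative sketch: call the async serde directly and only block when the
// task didn't complete synchronously. Most deserializers return an
// already-completed task, so the fast path is just a property check.
Task<T> task = asyncDeserializer.DeserializeAsync(data);

T value = task.IsCompletedSuccessfully
    ? task.Result                      // completed synchronously: no blocking
    : task.GetAwaiter().GetResult();   // slow path: block for the result
```

When the task completes synchronously there is no extra thread, no context capture, and no blocking wait at all.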
I'm quite sure the current implementation can deadlock due to thread pool exhaustion (not to mention it's inefficient), which counts it out as a possibility. I didn't realize that before. Reverting to
Re: I'm still not sure of the best way to resolve this. The leading candidate in my mind right now is re-instating the
We could also make a
Re: multiple Consumer classes - tried it, very much don't like the api complexity / surface area it brings. I'd prefer to have runtime errors if one class is used inappropriately.
"reverting to asyncValueDeserializer.DeserializeAsync(...).ConfigureAwait(false).GetAwaiter().GetResult(); isn't acceptable either because it requires all async calls in the DeserializeAsync implementation to have ConfigureAwait(false) applied"
Another thing to note is that using CA(f) means that any "ambient" values held in things like
The guidance is actually for library developers to handle any CA(f) requirements as low as possible and within an async method. This protects any callers from needing to worry about CA(f). I hope I've explained that well enough. Suffice to say that you only need CA(f) at some defined boundary, and not all the way down the stack.
... base class + ... even if the only descendant "you" provide contains the current thing. It leaves the system open for library users to provide their own customisations - ie the Open-Closed principle.
As I'm on a roll :) I'll try to do a PR for this first thing tomorrow |
woah, that's a good catch. thanks for that. |
btw: do you want some confluent/kafka swag? (t-shirt and/or book or something), if so let me know via email (look in my profile) and I'll see what I can do. |
Ha! feels like I've just been a pita! Is that a bribe? 😄 ... I'll take it 😉 |
#878 for the "use interfaces" thing from above |
This issue is to track community feedback on the new 1.0 API and document changes that are likely to be made before its release. We'll continue to update this first comment to reflect open changes currently under consideration & progress.
For problems, please open another issue.
Comments / suggestions? Add them below!
Current changes under consideration for 1.0:
- async serializers/deserializers (serdes that return Tasks) in addition to simple serde types.
- ProduceAsync - correctly implement a CancellationToken parameter.
- AdminClient.GetMetadata - remove this method and add methods analogous to the Java client.
- ListGroups - parse the group data and change the return type.
- Span<T> serializers.
- AdminClient methods: even though it's not useful now, a return type from e.g. CreateTopicsAsync would allow for easier future extensibility.
- ErrorCodes.
- Consumer<,> implement IEnumerable.
- Commit to throw CommitException, not TopicPartitionOffsetException.

Review TODOs:
- SCRAM-SHA-1 sasl mechanism.
- Sasl_Plaintext, Sasl_Ssl, Roundrobin, Ssl.
- Dictionary<K,V>.
- IPAddress.HostToNetworkOrder.