-
Notifications
You must be signed in to change notification settings - Fork 63
Conversation
@@ -146,7 +146,8 @@ Target "Build" (fun _ -> | |||
Target "RunTests" (fun _ -> | |||
!! testAssemblies | |||
|> NUnit3 (fun p -> | |||
{ p with TimeOut = TimeSpan.FromMinutes 20.}) | |||
{ p with TimeOut = TimeSpan.FromMinutes 20. | |||
ProcessModel = NUnit3ProcessModel.SingleProcessModel }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if you want this change - but I couldn't get the tests to run otherwise on my machine.
Seems this will be a bit more tricky than that. Kafka uses non-standard framing (snappy-java Currently Snappy.NET uses a hardcoded value for that ("sNaPpY"). |
@scrwtp do we need framing for this? |
@eulerfx I've just updated the PR with new changes. I've switched from SnappyStream to plain Compress/Decompress calls, and we also now put the same headers on the message as snappy-java does. Checked compatibility with reference kafka-console-producer/kafka-console-consumer, looks good both ways. |
[<assembly: AssemblyVersionAttribute("0.0.40")>] | ||
[<assembly: AssemblyFileVersionAttribute("0.0.40")>] | ||
[<assembly: AssemblyVersionAttribute("0.1.0")>] | ||
[<assembly: AssemblyFileVersionAttribute("0.1.0")>] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think your fork might have been an older version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it just wasn't bumped, the fork was created this weekend?
src/kafunk/Compression.fs
Outdated
let compressed = SnappyCodec.Compress(inputBytes) | ||
let output = CompressedMessage.pack compressed | ||
createMessage (Binary.ofArray output) CompressionCodec.Snappy | ||
with :? IOException as _ex -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say lets just drop this with
here and above since we're not adding any context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine dropping it. Don't you want logging there though?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception would be caught at a higher level in the code anyway. Adding some context here might be useful, but not necessary IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/kafunk/Compression.fs
Outdated
MessageSet.Write (messageVer,ms,BinaryZipper(buf)) | ||
try | ||
let compressed = SnappyCodec.Compress(inputBytes) | ||
let output = CompressedMessage.pack compressed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be possible to avoid copying the array. Looking at SnappyCodec.Compress, it uses GetMaxCompressedLength to estimate the size of the array. What if you use that to allocate the array with the header bytes added to it, then use the other overload of Compress to write into that array?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll take a stab at it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/kafunk/Compression.fs
Outdated
|
||
// write content | ||
// NOTE: this will also write content length in the first 4 bytes, this is expected. | ||
bz.WriteBytes(Binary.ofArray bytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be worth using an internally copied version of this. BinaryZipper was designed specifically for the Kafka protocol and its possible it will cause an issue down the road if its refactored.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can do a WriteInt32 followed by WriteBlock, but I'd think one of the tests added would fail if WriteBytes changed. (Compression.Snappy reads messages that are compatible with reference implementation
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant to create a function called writeBytes local to the snappy compressor, that would simply do the same thing as WriteBytes. The reason is that WriteBytes might change for another reason at some point, driven by a need from the Kafka protocol, and it can cause an issue here down the road.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, no more dependency on BinaryZipper.
# Conflicts: # src/kafunk/Compression.fs
src/kafunk/Compression.fs
Outdated
let output = CompressedMessage.compress inputBytes | ||
|
||
createMessage (Binary.ofArray output) CompressionCodec.Snappy | ||
|
||
let decompress (messageVer:ApiVersion) (m:Message) = | ||
let inputBytes = m.value |> Binary.toArray |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this copy be avoided by passing m.value directly into the binary zipper?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done - the same fix can be applied in the gzip version.
let arr = ArraySegment<byte>(buf.Array, buf.Offset, length) | ||
arr, Binary.shiftOffset length buf | ||
|
||
let truncateIfSmaller actualLength maxLength (array: byte []) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than copying an array, you can truncate by adjusting the offset on an ArraySegment (Binary module has some helper for this).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I followed what Snappy.NET did, but I guess it can be avoided. Made both compress and decompress here be Binary.Segment - > Binary.Segment
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah looks good. Also here:
let buf = Array.zeroCreate uncompressedLength
let actualLength = SnappyCodec.Uncompress(content.Array, content.Offset, content.Count, buf, 0)
Binary.truncateIfSmaller actualLength uncompressedLength buf
The call to Binary.truncateIfSmaller
should operate on ArraySegment<byte>
rather than array directly. This way, when it actually has to truncate, it doesn't call Binary.toArray
which creates a new array, but simply adjusts the offsets on the ArraySegment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't call Binary.toArray
anymore - it does take an array, but it will return a Segment
of appropriate length.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah sorry I misread ofArray
Looks good. I've tried this out and it appears to be working, and performs well. I'm going to merge, and the only thing I'll do is I'll "unroll" the Write/Read methods on the SnappyBinaryZipper (much like BinaryZipper is unrolled) to avoid FSharpFunc allocations and virtual calls - this is a critical enough area where we should get low-level. |
I like the way they read, how about making them inline? |
In critical paths you often forego readability/reusability for the sake of performance. Making them inline won't help in this case - the calls to the underlying operations on the Btw, if you look at the IL for the former implementation of WriteInt32, it looks like this:
The |
Adding Snappy compression using Snappy.NET.