diff --git a/build.fsx b/build.fsx
index 961c33e..68b5dd8 100644
--- a/build.fsx
+++ b/build.fsx
@@ -146,7 +146,8 @@ Target "Build" (fun _ ->
 Target "RunTests" (fun _ ->
     !! testAssemblies
     |> NUnit3 (fun p ->
-        { p with TimeOut = TimeSpan.FromMinutes 20.})
+        { p with TimeOut = TimeSpan.FromMinutes 20.
+                 ProcessModel = NUnit3ProcessModel.SingleProcessModel })
 )
 
 #if MONO
diff --git a/paket.dependencies b/paket.dependencies
index c47b116..8f4e9a5 100644
--- a/paket.dependencies
+++ b/paket.dependencies
@@ -1,5 +1,6 @@
 source https://api.nuget.org/v3/index.json
 nuget FSharp.Control.AsyncSeq
+nuget Snappy.NET
 
 group Build
   source https://api.nuget.org/v3/index.json
diff --git a/paket.lock b/paket.lock
index 069ffdc..6adab20 100644
--- a/paket.lock
+++ b/paket.lock
@@ -1,4 +1,8 @@
 NUGET
+  remote: https://api.nuget.org/v3/index.json
+    Crc32C.NET (1.0.5)
+    Snappy.NET (1.1.1.8)
+      Crc32C.NET (>= 1.0.5)
   remote: http://api.nuget.org/v3/index.json
     FSharp.Control.AsyncSeq (2.0.10)
 
diff --git a/src/kafunk/AssemblyInfo.fs b/src/kafunk/AssemblyInfo.fs
index 5dcbd4e..5187b9b 100644
--- a/src/kafunk/AssemblyInfo.fs
+++ b/src/kafunk/AssemblyInfo.fs
@@ -1,13 +1,13 @@
-namespace System
+namespace System
 open System.Reflection
 
 []
 []
 []
-[]
-[]
+[]
+[]
 do ()
 
 module internal AssemblyVersionInformation =
-    let [] Version = "0.0.40"
-    let [] InformationalVersion = "0.0.40"
+    let [] Version = "0.1.0"
+    let [] InformationalVersion = "0.1.0"
diff --git a/src/kafunk/Compression.fs b/src/kafunk/Compression.fs
index fd10859..3f1be81 100644
--- a/src/kafunk/Compression.fs
+++ b/src/kafunk/Compression.fs
@@ -1,80 +1,216 @@
 []
 module internal Kafunk.Compression
 
+open System.IO
+open System.IO.Compression
+
 open Kafunk
 
 let private createMessage (value:Value) (compression:byte) =
   let attrs = compression |> int8
   Message.create value Binary.empty (Some attrs)
 
-[]
-module GZip =
-
-  open System.IO
-  open System.IO.Compression
-
+/// Codec-agnostic compression: the caller supplies the codec flag and a function wrapping a MemoryStream in a (de)compressing stream.
+[]
+module internal Stream =
   // The only thing that can be compressed is a MessageSet, not a single Message; this results in a message containing the compressed set
-  let compress (messageVer:ApiVersion) (ms:MessageSet) =
+  let compress (codec: CompressionCodec) (makeStream: MemoryStream -> Stream) (messageVer:ApiVersion) (ms:MessageSet) =
     // TODO: pool MemoryStreams
     use outputStream = new MemoryStream()
-    use gZipStream = new GZipStream(outputStream, CompressionMode.Compress)
     do
       let inputBytes = MessageSet.Size (messageVer,ms) |> Array.zeroCreate
       let buf = Binary.ofArray inputBytes
       MessageSet.Write (messageVer,ms,BinaryZipper(buf))
-      //MessageSet.write messageVer ms buf |> ignore
-      try
-        gZipStream.Write(inputBytes, 0, inputBytes.Length)
-        gZipStream.Close()
-      with :? IOException as _ex ->
-        // TODO: log this
-        //printfn "Couldn't write to gzip stream: %A" ex
-        reraise()
-    createMessage (outputStream.ToArray() |> Binary.ofArray) CompressionCodec.GZIP
+      // compStream is scoped to this do-block so that it is disposed before outputStream.ToArray() is called below
+      use compStream = makeStream outputStream
+      compStream.Write(inputBytes, 0, inputBytes.Length)
+    createMessage (outputStream.ToArray() |> Binary.ofArray) codec
 
-  let decompress (messageVer:ApiVersion) (m:Message) =
+  /// Copies m.value through the stream built by makeStream and reads the resulting bytes as a MessageSet.
+  let decompress (makeStream: MemoryStream -> Stream) (messageVer:ApiVersion) (m:Message) =
     let inputBytes = m.value |> Binary.toArray
     // TODO: pool MemoryStreams
     use outputStream = new MemoryStream()
     do
       use inputStream = new MemoryStream(inputBytes)
-      use gzipInputStream = new GZipStream(inputStream, CompressionMode.Decompress)
-      try
-        gzipInputStream.CopyTo(outputStream)
-        gzipInputStream.Close()
-      with :? IOException as _ex ->
-        // TODO: log this
-        //printfn "Couldn't read from gzip stream: %A" ex
-        reraise()
+      use compStream = makeStream inputStream
+      compStream.CopyTo(outputStream)
     outputStream.Position <- 0L
     let output = outputStream.ToArray()
     // size is output array size divided by message set element size
     let bz = BinaryZipper(output |> Binary.ofArray)
     MessageSet.Read (messageVer, 0, 0s, output.Length, bz)
 
+/// GZIP codec: Stream.compress / Stream.decompress specialised with GZipStream.
+[]
+module GZip =
+
+  open System.IO
+  open System.IO.Compression
+
+  let compress =
+    Stream.compress CompressionCodec.GZIP <| fun memStream ->
+      upcast new GZipStream(memStream, CompressionMode.Compress)
+
+  let decompress =
+    Stream.decompress <| fun memStream ->
+      upcast new GZipStream(memStream, CompressionMode.Decompress)
+
+/// Snappy codec; payloads use the snappy-java framing (magic string, two version ints, length-prefixed blocks).
+[]
+module Snappy =
+
+  open System
+  open Snappy
+
+  module internal Binary =
+
+    /// Copies bytes into buf at buf's offset and returns buf advanced past the copied bytes.
+    let writeBlock (bytes: Binary.Segment) (buf : Binary.Segment) =
+      Buffer.BlockCopy(bytes.Array, bytes.Offset, buf.Array, buf.Offset, bytes.Count)
+      Binary.shiftOffset bytes.Count buf
+
+    /// Returns a view (no copy) of the next length bytes of buf, paired with buf advanced past them.
+    let readBlock (length: int) (buf : Binary.Segment) =
+      let arr = ArraySegment(buf.Array, buf.Offset, length)
+      arr, Binary.shiftOffset length buf
+
+    /// Views only the first actualLength bytes of array when that is less than maxLength; otherwise the whole array.
+    let truncateIfSmaller actualLength maxLength (array: byte []) =
+      if actualLength < maxLength
+      then Binary.Segment(array, 0, actualLength)
+      else Binary.ofArray array
+
+  /// Mutable cursor over a Binary.Segment; each read or write replaces the held segment with the one returned.
+  type internal SnappyBinaryZipper (buf: Binary.Segment) =
+
+    let mutable buffer = buf
+
+    member this.Read<'a> (reader: Binary.Reader<'a>) =
+      let res, updatedBuffer = reader buffer
+      buffer <- updatedBuffer
+      res
+
+    member this.Write<'a> (writer: Binary.Segment -> Binary.Segment) =
+      buffer <- writer buffer
+
+    member this.Buffer = buffer
+
+    member this.ShiftOffset(by: int) = this.Write(Binary.shiftOffset by)
+
+    /// Repositions the cursor at an absolute offset of the backing array; Count is carried over unchanged.
+    member this.Seek(offset: int) =
+      buffer <- Binary.Segment(buffer.Array, offset, buffer.Count)
+
+    member this.WriteInt32(x) = this.Write(Binary.writeInt32 x)
+    member this.WriteBlock(block) = this.Write(Binary.writeBlock block)
+
+    member this.ReadInt32() = this.Read(Binary.readInt32)
+    member this.ReadBlock(length) = this.Read(Binary.readBlock length)
+
+  module internal CompressedMessage =
+
+    module private Header =
+      // Magic string used by snappy-java.
+      let magic = [| byte -126; byte 'S'; byte 'N'; byte 'A'; byte 'P'; byte 'P'; byte 'Y'; byte 0 |]
+      // Current version number taken from snappy-java repo as of 22/05/2017.
+      let currentVer = 1
+      // Minimum compatible version number taken from snappy-java repo as of 22/05/2017.
+      let minimumVer = 1
+      // Total size of the header (magic string + two version ints + content length int)
+      let size = magic.Length + Binary.sizeInt32 currentVer + Binary.sizeInt32 minimumVer + Binary.sizeInt32 0
+
+    /// Compresses bytes as one snappy block written after the snappy-java header and its length prefix.
+    let compress (bytes: Binary.Segment) : Binary.Segment =
+      let maxLength = SnappyCodec.GetMaxCompressedLength(bytes.Count)
+
+      let buf = Array.zeroCreate (Header.size + maxLength)
+      let bz = SnappyBinaryZipper(Binary.ofArray buf)
+
+      // write header compatible with snappy-java.
+      bz.WriteBlock (Binary.ofArray Header.magic)
+      bz.WriteInt32 (Header.currentVer)
+      bz.WriteInt32 (Header.minimumVer)
+
+      // move forward to write compressed content, then go back to write the actual compressed content length.
+      bz.ShiftOffset (Binary.sizeInt32 0)
+
+      let length = SnappyCodec.Compress(bytes.Array, bytes.Offset, bytes.Count, bz.Buffer.Array, bz.Buffer.Offset)
+
+      bz.Seek (Header.size - Binary.sizeInt32 length)
+      bz.WriteInt32 (length)
+
+      Binary.truncateIfSmaller (Header.size + length) (Header.size + maxLength) buf
+
+    /// True when bytes begins with the snappy-java magic string and is long enough to hold both version ints.
+    let private isFramed (bytes: Binary.Segment) =
+      let matchesMagic () =
+        Header.magic |> Array.mapi (fun i b -> bytes.Array.[bytes.Offset + i] = b) |> Array.forall id
+      bytes.Count >= Header.size - Binary.sizeInt32 0 && matchesMagic ()
+
+    /// Inflates a single raw snappy block into a freshly allocated array.
+    let private uncompressBlock (block: Binary.Segment) : Binary.Segment =
+      let uncompressedLength = SnappyCodec.GetUncompressedLength(block.Array, block.Offset, block.Count)
+      let buf = Array.zeroCreate uncompressedLength
+      let actualLength = SnappyCodec.Uncompress(block.Array, block.Offset, block.Count, buf, 0)
+      Binary.truncateIfSmaller actualLength uncompressedLength buf
+
+    /// Decompresses a payload written by compress or by a producer using the same framing. Every length-prefixed block
+    /// after the header is inflated and concatenated; a payload without the magic string is treated as one raw snappy block.
+    let decompress (bytes: Binary.Segment) : Binary.Segment =
+      if not (isFramed bytes) then
+        // No snappy-java header: assume the whole payload is a single unframed snappy block.
+        uncompressBlock bytes
+      else
+        let bz = SnappyBinaryZipper(bytes)
+        // Absolute end of the payload inside the backing array.
+        let endOffset = bytes.Offset + bytes.Count
+
+        // Skip the magic string and both version ints; the version numbers are not validated.
+        bz.ShiftOffset (Header.size - Binary.sizeInt32 0)
+
+        // The framing allows several length-prefixed blocks after the header, so read until the payload is exhausted.
+        let blocks = ResizeArray<Binary.Segment>()
+        while bz.Buffer.Offset < endOffset do
+          let blockLength = bz.ReadInt32()
+          if blockLength < 0 || blockLength > endOffset - bz.Buffer.Offset then
+            failwithf "Invalid snappy block length %i" blockLength
+          blocks.Add(uncompressBlock (bz.ReadBlock(blockLength)))
+
+        if blocks.Count = 1 then
+          blocks.[0]
+        else
+          // Stitch the inflated blocks back together in the order they were read.
+          let output : byte[] = Array.zeroCreate (blocks |> Seq.sumBy (fun b -> b.Count))
+          let mutable position = 0
+          for block in blocks do
+            Buffer.BlockCopy(block.Array, block.Offset, output, position, block.Count)
+            position <- position + block.Count
+          Binary.ofArray output
+
+  /// Serializes ms and wraps the Snappy-compressed bytes in a single Message flagged with the Snappy codec.
+  let compress (messageVer:ApiVersion) (ms:MessageSet) =
+    let inputBytes = MessageSet.Size (messageVer,ms) |> Array.zeroCreate
+    let buf = Binary.ofArray inputBytes
+    MessageSet.Write (messageVer,ms,BinaryZipper(buf))
+
+    let output = CompressedMessage.compress buf
+
+    createMessage output CompressionCodec.Snappy
+
+  /// Decompresses m.value and reads the resulting bytes as a MessageSet.
+  let decompress (messageVer:ApiVersion) (m:Message) =
+    let output = CompressedMessage.decompress m.value
+    let bz = BinaryZipper(output)
+    MessageSet.Read (messageVer, 0, 0s, output.Count, bz)
+
 let compress (messageVer:int16) (compression:byte) (ms:MessageSet) =
   match compression with
   | CompressionCodec.None -> ms
   | CompressionCodec.GZIP -> MessageSet.ofMessage messageVer (GZip.compress messageVer ms)
+  | CompressionCodec.Snappy -> MessageSet.ofMessage messageVer (Snappy.compress messageVer ms)
   | _ -> failwithf "Incorrect compression codec %A" compression
-
-//let decompress (messageVer:int16) (ms:MessageSet) =
-//  if ms.messages.Length = 0 then ms
-//  else
-//    let rs = ResizeArray<_>(ms.messages.Length)
-//    for i = 0 to ms.messages.Length - 1 do
-//      let msi = ms.messages.[i]
-//      match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with
-//      | CompressionCodec.None ->
-//        rs.Add msi
-//      | CompressionCodec.GZIP ->
-//        let ms' = GZip.decompress messageVer msi.message
-//        for j = 0 to ms'.messages.Length - 1 do
-//          rs.Add(ms'.messages.[j])
-//      | c ->
-//        failwithf "compression_code=%i not supported" c
-//
MessageSet(rs.ToArray()) - + let decompress (messageVer:int16) (ms:MessageSet) = if ms.messages.Length = 0 then ms else @@ -83,8 +178,11 @@ let decompress (messageVer:int16) (ms:MessageSet) = match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with | CompressionCodec.None -> [|msi|] | CompressionCodec.GZIP -> - let ms' = GZip.decompress messageVer msi.message - ms'.messages + let decompressed = GZip.decompress messageVer msi.message + decompressed.messages + | CompressionCodec.Snappy -> + let decompressed = Snappy.decompress messageVer msi.message + decompressed.messages | c -> failwithf "compression_code=%i not supported" c) |> MessageSet \ No newline at end of file diff --git a/src/kafunk/Protocol.fs b/src/kafunk/Protocol.fs index ca6f658..26ad62b 100644 --- a/src/kafunk/Protocol.fs +++ b/src/kafunk/Protocol.fs @@ -84,6 +84,10 @@ module Protocol = /// The timestamp of a message. type Timestamp = int64 + /// Byte flag indicating compression codec in use. + type CompressionCodec = byte + + [] module CompressionCodec = [] diff --git a/src/kafunk/kafunk.fsproj b/src/kafunk/kafunk.fsproj index fcdb76b..8e32fcc 100644 --- a/src/kafunk/kafunk.fsproj +++ b/src/kafunk/kafunk.fsproj @@ -97,7 +97,18 @@ - + + + + ..\..\packages\Crc32C.NET\lib\net20\Crc32C.NET.dll + True + True + + + + + + ..\..\packages\FSharp.Control.AsyncSeq\lib\net45\FSharp.Control.AsyncSeq.dll @@ -106,7 +117,7 @@ - + ..\..\packages\FSharp.Control.AsyncSeq\lib\portable-net45+netcore45+MonoAndroid1+MonoTouch1\FSharp.Control.AsyncSeq.dll @@ -116,4 +127,24 @@ + + + + + ..\..\packages\Snappy.NET\lib\net20\Snappy.NET.dll + True + True + + + + + + + ..\..\packages\Snappy.NET\lib\net45\Snappy.NET.dll + True + True + + + + \ No newline at end of file diff --git a/src/kafunk/paket.references b/src/kafunk/paket.references index dbfd40a..2b895ee 100644 --- a/src/kafunk/paket.references +++ b/src/kafunk/paket.references @@ -1 +1,2 @@ -FSharp.Control.AsyncSeq \ No newline at end of file 
+FSharp.Control.AsyncSeq
+Snappy.NET
\ No newline at end of file
diff --git a/tests/kafunk.Tests/CompressionGzipTests.fs b/tests/kafunk.Tests/CompressionGzipTests.fs
deleted file mode 100644
index e16ff35..0000000
--- a/tests/kafunk.Tests/CompressionGzipTests.fs
+++ /dev/null
@@ -1,30 +0,0 @@
-module CompressionGzipTests
-
-open Kafunk
-open NUnit.Framework
-open System
-open System.Text
-
-[]
-[]
-let ``Compression.GZip should work`` () =
-
-  let messageBytes = [| 1uy; 2uy; 3uy; 4uy; 2uy; 6uy; 8uy |]
-  let message2Bytes = [| 1uy; 2uy; 3uy; 2uy |]
-
-  let message = Message.create (Binary.ofArray messageBytes) (Binary.empty) None
-  let message2 = Message.create (Binary.ofArray message2Bytes) (Binary.empty) None
-
-  let inputMessage =
-    Compression.GZip.compress 0s (MessageSet.ofMessages 0s [message; message2])
-
-  let outputMessageSet =
-    Compression.GZip.decompress 0s inputMessage
-
-  let messages = outputMessageSet.messages
-  Assert.IsTrue (messages.Length = 2)
-  let (offset, size, msg) = let x = messages.[0] in x.offset, x.messageSize, x.message
-  let (offset2, size2, msg2) = let x = messages.[1] in x.offset, x.messageSize, x.message
-  Assert.IsTrue (msg.value |> Binary.toArray = messageBytes)
-  Assert.IsTrue (msg2.value |> Binary.toArray = message2Bytes)
-
diff --git a/tests/kafunk.Tests/CompressionTests.fs b/tests/kafunk.Tests/CompressionTests.fs
new file mode 100644
index 0000000..f2be93b
--- /dev/null
+++ b/tests/kafunk.Tests/CompressionTests.fs
@@ -0,0 +1,80 @@
+module CompressionTests
+
+open Kafunk
+open NUnit.Framework
+open System
+open System.Text
+
+/// Compresses two small messages with the supplied codec functions, decompresses the result and
+/// asserts that both payloads come back unchanged and in their original order.
+let private assertRoundTrip compress decompress =
+  let payloads =
+    [ [| 1uy; 2uy; 3uy; 4uy; 2uy; 6uy; 8uy |]
+      [| 1uy; 2uy; 3uy; 2uy |] ]
+
+  let messages =
+    payloads |> List.map (fun bytes -> Message.create (Binary.ofArray bytes) (Binary.empty) None)
+
+  let compressed : Message = compress 0s (MessageSet.ofMessages 0s messages)
+  let decompressed : MessageSet = decompress 0s compressed
+
+  Assert.AreEqual (payloads.Length, decompressed.messages.Length)
+  payloads
+  |> List.iteri (fun i expected ->
+    // AreEqual compares the arrays element by element, so a failure reports where the bytes differ
+    Assert.AreEqual (expected, decompressed.messages.[i].message.value |> Binary.toArray))
+
+[]
+[]
+let ``Compression.GZip should work`` () =
+  assertRoundTrip Compression.GZip.compress Compression.GZip.decompress
+
+[]
+[]
+let ``Compression.Snappy should work`` () =
+  assertRoundTrip Compression.Snappy.compress Compression.Snappy.decompress
+
+module Samples =
+  /// Payload of the message sent using kafka-console-producer.bat with snappy compression on.
+  let consoleProducerMsg =
+    [|
+      // header
+      130uy; 83uy; 78uy; 65uy; 80uy; 80uy; 89uy; 0uy;
+      0uy; 0uy; 0uy; 1uy; 0uy; 0uy; 0uy; 1uy;
+      // length
+      0uy; 0uy; 0uy; 25uy;
+      // content
+      30uy; 0uy; 0uy; 25uy; 1uy; 72uy; 18uy; 89uy; 42uy;
+      71uy; 135uy; 0uy; 0uy; 255uy; 255uy; 255uy; 255uy;
+      0uy; 0uy; 0uy; 4uy;
+      // "test"
+      116uy; 101uy; 115uy; 116uy
+    |]
+
+[]
+[]
+let ``Compression.Snappy writes messages that are compatible with reference implementation``() =
+  let payload = "test"
+
+  let msg = Message.create (Binary.ofArray (Encoding.ASCII.GetBytes(payload))) (Binary.empty) None
+  let msgSet = MessageSet.ofMessage 0s msg
+
+  let outputMsg = Compression.Snappy.compress 0s msgSet
+
+  Assert.AreEqual (Samples.consoleProducerMsg, Binary.toArray outputMsg.value)
+
+[]
+[]
+let ``Compression.Snappy reads messages that are compatible with reference implementation``() =
+  let msg = Message.create (Binary.ofArray Samples.consoleProducerMsg) (Binary.empty) None
+
+  let outputMsgSet =
+    Compression.Snappy.decompress 0s msg
+
+  Assert.AreEqual (1, outputMsgSet.messages.Length)
+
+  let payload =
+    let outputMsg = outputMsgSet.messages.[0]
+    Encoding.ASCII.GetString(outputMsg.message.value |> Binary.toArray)
+
+  Assert.AreEqual ("test", payload)
diff --git a/tests/kafunk.Tests/kafunk.Tests.fsproj b/tests/kafunk.Tests/kafunk.Tests.fsproj
index ea3256f..cec7b54 100644
--- a/tests/kafunk.Tests/kafunk.Tests.fsproj
+++ b/tests/kafunk.Tests/kafunk.Tests.fsproj
@@ -72,7 +72,7 @@
 
 
 
-
+
 
 
 
@@ -104,7 +104,7 @@
 
 
 
-
+
 
 
 ..\..\packages\FSharp.Control.AsyncSeq\lib\net45\FSharp.Control.AsyncSeq.dll
@@ -113,7 +113,7 @@
 
 
 
-
+
 
 
 ..\..\packages\FSharp.Control.AsyncSeq\lib\portable-net45+netcore45+MonoAndroid1+MonoTouch1\FSharp.Control.AsyncSeq.dll
@@ -156,7 +156,7 @@
 
 
 
-
+
 
 
 ..\..\packages\test\NUnit\lib\net45\nunit.framework.dll
@@ -165,7 +165,7 @@
 
 
 
-
+
 
 
 ..\..\packages\test\NUnit\lib\portable-net45+win8+wp8+wpa81+Xamarin.Mac+MonoAndroid10+MonoTouch10+Xamarin.iOS10\nunit.framework.dll