Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Snappy compression #146

Merged
merged 7 commits into from
May 25, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ Target "Build" (fun _ ->
Target "RunTests" (fun _ ->
!! testAssemblies
|> NUnit3 (fun p ->
{ p with TimeOut = TimeSpan.FromMinutes 20.})
{ p with TimeOut = TimeSpan.FromMinutes 20.
ProcessModel = NUnit3ProcessModel.SingleProcessModel })
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you want this change - but I couldn't get the tests to run otherwise on my machine.

)

#if MONO
Expand Down
1 change: 1 addition & 0 deletions paket.dependencies
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
source https://api.nuget.org/v3/index.json
nuget FSharp.Control.AsyncSeq
nuget Snappy.NET

group Build
source https://api.nuget.org/v3/index.json
Expand Down
4 changes: 4 additions & 0 deletions paket.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
NUGET
remote: https://api.nuget.org/v3/index.json
Crc32C.NET (1.0.5)
Snappy.NET (1.1.1.8)
Crc32C.NET (>= 1.0.5)
remote: http://api.nuget.org/v3/index.json
FSharp.Control.AsyncSeq (2.0.10)

Expand Down
10 changes: 5 additions & 5 deletions src/kafunk/AssemblyInfo.fs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
namespace System
namespace System
open System.Reflection

[<assembly: AssemblyTitleAttribute("kafunk")>]
[<assembly: AssemblyProductAttribute("kafunk")>]
[<assembly: AssemblyDescriptionAttribute("F# client for Kafka")>]
[<assembly: AssemblyVersionAttribute("0.0.40")>]
[<assembly: AssemblyFileVersionAttribute("0.0.40")>]
[<assembly: AssemblyVersionAttribute("0.1.0")>]
[<assembly: AssemblyFileVersionAttribute("0.1.0")>]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your fork might have been an older version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it just wasn't bumped; the fork was only created this weekend.

do ()

module internal AssemblyVersionInformation =
let [<Literal>] Version = "0.0.40"
let [<Literal>] InformationalVersion = "0.0.40"
let [<Literal>] Version = "0.1.0"
let [<Literal>] InformationalVersion = "0.1.0"
84 changes: 48 additions & 36 deletions src/kafunk/Compression.fs
Original file line number Diff line number Diff line change
@@ -1,47 +1,43 @@
[<Compile(Module)>]
module internal Kafunk.Compression

open System.IO
open System.IO.Compression

open Kafunk

/// Builds a message from `value`, passing `Binary.empty` as the second
/// argument of `Message.create` and the codec byte, converted to a signed
/// 8-bit integer, as the optional attributes.
let private createMessage (value:Value) (compression:byte) =
  Message.create value Binary.empty (Some (int8 compression))

[<Compile(Module)>]
module GZip =

open System.IO
open System.IO.Compression

[<RequireQualifiedAccess>]
module internal Stream =
// The only thing that can be compressed is a MessageSet, not a single Message; this results in a message containing the compressed set
let compress (messageVer:ApiVersion) (ms:MessageSet) =
let compress (codec: CompressionCodec) (makeStream: MemoryStream -> Stream) (messageVer:ApiVersion) (ms:MessageSet) =
// TODO: pool MemoryStreams
use outputStream = new MemoryStream()
use gZipStream = new GZipStream(outputStream, CompressionMode.Compress)
do
let inputBytes = MessageSet.Size (messageVer,ms) |> Array.zeroCreate
let buf = Binary.ofArray inputBytes
MessageSet.Write (messageVer,ms,BinaryZipper(buf))
//MessageSet.write messageVer ms buf |> ignore
use compStream = makeStream outputStream
try
gZipStream.Write(inputBytes, 0, inputBytes.Length)
gZipStream.Close()
compStream.Write(inputBytes, 0, inputBytes.Length)
with :? IOException as _ex ->
// TODO: log this
//printfn "Couldn't write to gzip stream: %A" ex
reraise()
createMessage (outputStream.ToArray() |> Binary.ofArray) CompressionCodec.GZIP
createMessage (outputStream.ToArray() |> Binary.ofArray) codec

let decompress (messageVer:ApiVersion) (m:Message) =
let decompress (makeStream: MemoryStream -> Stream) (messageVer:ApiVersion) (m:Message) =
let inputBytes = m.value |> Binary.toArray
// TODO: pool MemoryStreams
use outputStream = new MemoryStream()
do
use inputStream = new MemoryStream(inputBytes)
use gzipInputStream = new GZipStream(inputStream, CompressionMode.Decompress)
use compStream = makeStream inputStream
try
gzipInputStream.CopyTo(outputStream)
gzipInputStream.Close()
compStream.CopyTo(outputStream)
with :? IOException as _ex ->
// TODO: log this
//printfn "Couldn't read from gzip stream: %A" ex
Expand All @@ -52,29 +48,42 @@ module GZip =
let bz = BinaryZipper(output |> Binary.ofArray)
MessageSet.Read (messageVer, 0, 0s, output.Length, bz)

/// GZip codec: the shared `Stream` helpers specialised with `GZipStream`.
[<Compile(Module)>]
module GZip =

  open System.IO
  open System.IO.Compression

  /// Compresses a message set by writing it through a `GZipStream` layered
  /// over the output buffer; the resulting message is tagged with
  /// `CompressionCodec.GZIP`.
  let compress =
    Stream.compress
      CompressionCodec.GZIP
      (fun memStream -> new GZipStream(memStream, CompressionMode.Compress) :> Stream)

  /// Reads a message set back out of a message value by copying it through a
  /// `GZipStream` opened in decompression mode.
  let decompress =
    Stream.decompress
      (fun memStream -> new GZipStream(memStream, CompressionMode.Decompress) :> Stream)

/// Snappy codec: the shared `Stream` helpers specialised with `SnappyStream`.
// NOTE(review): other Kafka clients may expect a specific Snappy framing
// (e.g. the snappy-java/xerial stream header) - confirm that the bytes
// produced by SnappyStream interoperate with them.
[<Compile(Module)>]
module Snappy =

  open System.IO
  open System.IO.Compression
  open Snappy

  /// Compresses a message set by writing it through a `SnappyStream` layered
  /// over the output buffer; the resulting message is tagged with
  /// `CompressionCodec.Snappy`.
  let compress =
    Stream.compress
      CompressionCodec.Snappy
      (fun memStream -> new SnappyStream(memStream, CompressionMode.Compress) :> Stream)

  /// Reads a message set back out of a message value by copying it through a
  /// `SnappyStream` opened in decompression mode.
  let decompress =
    Stream.decompress
      (fun memStream -> new SnappyStream(memStream, CompressionMode.Decompress) :> Stream)

/// Compresses `ms` with the codec identified by `compression`.
/// `CompressionCodec.None` returns the set unchanged; GZIP and Snappy each
/// produce one compressed message, which is wrapped in a new message set via
/// `MessageSet.ofMessage`. Any other codec byte fails with
/// "Incorrect compression codec".
let compress (messageVer:int16) (compression:byte) (ms:MessageSet) =
  match compression with
  | CompressionCodec.None ->
    ms
  | CompressionCodec.GZIP ->
    GZip.compress messageVer ms |> MessageSet.ofMessage messageVer
  | CompressionCodec.Snappy ->
    Snappy.compress messageVer ms |> MessageSet.ofMessage messageVer
  | _ ->
    failwithf "Incorrect compression codec %A" compression

//let decompress (messageVer:int16) (ms:MessageSet) =
// if ms.messages.Length = 0 then ms
// else
// let rs = ResizeArray<_>(ms.messages.Length)
// for i = 0 to ms.messages.Length - 1 do
// let msi = ms.messages.[i]
// match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with
// | CompressionCodec.None ->
// rs.Add msi
// | CompressionCodec.GZIP ->
// let ms' = GZip.decompress messageVer msi.message
// for j = 0 to ms'.messages.Length - 1 do
// rs.Add(ms'.messages.[j])
// | c ->
// failwithf "compression_code=%i not supported" c
// MessageSet(rs.ToArray())


let decompress (messageVer:int16) (ms:MessageSet) =
if ms.messages.Length = 0 then ms
else
Expand All @@ -83,8 +92,11 @@ let decompress (messageVer:int16) (ms:MessageSet) =
match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with
| CompressionCodec.None -> [|msi|]
| CompressionCodec.GZIP ->
let ms' = GZip.decompress messageVer msi.message
ms'.messages
let decompressed = GZip.decompress messageVer msi.message
decompressed.messages
| CompressionCodec.Snappy ->
let decompressed = Snappy.decompress messageVer msi.message
decompressed.messages
| c -> failwithf "compression_code=%i not supported" c)
|> MessageSet

4 changes: 4 additions & 0 deletions src/kafunk/Protocol.fs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ module Protocol =
/// The timestamp of a message.
type Timestamp = int64

/// Byte flag indicating compression codec in use.
type CompressionCodec = byte

[<Compile(Module)>]
module CompressionCodec =

[<Literal>]
Expand Down
35 changes: 33 additions & 2 deletions src/kafunk/kafunk.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,18 @@
<Reference Include="System.ServiceModel" />
</ItemGroup>
<Choose>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3')">
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v2.0' Or $(TargetFrameworkVersion) == 'v3.0' Or $(TargetFrameworkVersion) == 'v3.5' Or $(TargetFrameworkVersion) == 'v4.0' Or $(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3' Or $(TargetFrameworkVersion) == 'v4.7')">
<ItemGroup>
<Reference Include="Crc32C.NET">
<HintPath>..\..\packages\Crc32C.NET\lib\net20\Crc32C.NET.dll</HintPath>
<Private>True</Private>
<Paket>True</Paket>
</Reference>
</ItemGroup>
</When>
</Choose>
<Choose>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3' Or $(TargetFrameworkVersion) == 'v4.7')">
<ItemGroup>
<Reference Include="FSharp.Control.AsyncSeq">
<HintPath>..\..\packages\FSharp.Control.AsyncSeq\lib\net45\FSharp.Control.AsyncSeq.dll</HintPath>
Expand All @@ -106,7 +117,7 @@
</Reference>
</ItemGroup>
</When>
<When Condition="($(TargetFrameworkIdentifier) == 'WindowsPhoneApp') Or ($(TargetFrameworkIdentifier) == '.NETCore') Or ($(TargetFrameworkIdentifier) == '.NETStandard' And ($(TargetFrameworkVersion) == 'v1.1' Or $(TargetFrameworkVersion) == 'v1.2' Or $(TargetFrameworkVersion) == 'v1.3' Or $(TargetFrameworkVersion) == 'v1.4' Or $(TargetFrameworkVersion) == 'v1.5' Or $(TargetFrameworkVersion) == 'v1.6')) Or ($(TargetFrameworkIdentifier) == '.NETCoreApp' And $(TargetFrameworkVersion) == 'v1.0') Or ($(TargetFrameworkIdentifier) == 'MonoAndroid') Or ($(TargetFrameworkIdentifier) == 'MonoTouch') Or ($(TargetFrameworkIdentifier) == 'Xamarin.iOS') Or ($(TargetFrameworkIdentifier) == 'Xamarin.Mac') Or ($(TargetFrameworkProfile) == 'Profile7') Or ($(TargetFrameworkProfile) == 'Profile44')">
<When Condition="($(TargetFrameworkIdentifier) == 'WindowsPhoneApp') Or ($(TargetFrameworkIdentifier) == '.NETCore') Or ($(TargetFrameworkIdentifier) == '.NETStandard' And ($(TargetFrameworkVersion) == 'v1.1' Or $(TargetFrameworkVersion) == 'v1.2' Or $(TargetFrameworkVersion) == 'v1.3' Or $(TargetFrameworkVersion) == 'v1.4' Or $(TargetFrameworkVersion) == 'v1.5' Or $(TargetFrameworkVersion) == 'v1.6' Or $(TargetFrameworkVersion) == 'v2.0')) Or ($(TargetFrameworkIdentifier) == '.NETCoreApp' And ($(TargetFrameworkVersion) == 'v1.0' Or $(TargetFrameworkVersion) == 'v1.1' Or $(TargetFrameworkVersion) == 'v2.0')) Or ($(TargetFrameworkIdentifier) == 'MonoAndroid') Or ($(TargetFrameworkIdentifier) == 'MonoTouch') Or ($(TargetFrameworkIdentifier) == 'Xamarin.iOS') Or ($(TargetFrameworkIdentifier) == 'Xamarin.Mac') Or ($(TargetFrameworkProfile) == 'Profile7') Or ($(TargetFrameworkProfile) == 'Profile44')">
<ItemGroup>
<Reference Include="FSharp.Control.AsyncSeq">
<HintPath>..\..\packages\FSharp.Control.AsyncSeq\lib\portable-net45+netcore45+MonoAndroid1+MonoTouch1\FSharp.Control.AsyncSeq.dll</HintPath>
Expand All @@ -116,4 +127,24 @@
</ItemGroup>
</When>
</Choose>
<Choose>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v2.0' Or $(TargetFrameworkVersion) == 'v3.0' Or $(TargetFrameworkVersion) == 'v3.5' Or $(TargetFrameworkVersion) == 'v4.0')">
<ItemGroup>
<Reference Include="Snappy.NET">
<HintPath>..\..\packages\Snappy.NET\lib\net20\Snappy.NET.dll</HintPath>
<Private>True</Private>
<Paket>True</Paket>
</Reference>
</ItemGroup>
</When>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3' Or $(TargetFrameworkVersion) == 'v4.7')">
<ItemGroup>
<Reference Include="Snappy.NET">
<HintPath>..\..\packages\Snappy.NET\lib\net45\Snappy.NET.dll</HintPath>
<Private>True</Private>
<Paket>True</Paket>
</Reference>
</ItemGroup>
</When>
</Choose>
</Project>
3 changes: 2 additions & 1 deletion src/kafunk/paket.references
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
FSharp.Control.AsyncSeq
FSharp.Control.AsyncSeq
Snappy.NET
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module CompressionGzipTests
module CompressionTests

open Kafunk
open NUnit.Framework
Expand Down Expand Up @@ -28,3 +28,26 @@ let ``Compression.GZip should work`` () =
Assert.IsTrue (msg.value |> Binary.toArray = messageBytes)
Assert.IsTrue (msg2.value |> Binary.toArray = message2Bytes)


/// Round-trips a two-message set through `Compression.Snappy.compress` and
/// `Compression.Snappy.decompress` and checks that both payloads come back
/// unchanged and in their original order.
[<Test>]
[<Category("Compression")>]
let ``Compression.Snappy should work`` () =

  let messageBytes = [| 1uy; 2uy; 3uy; 4uy; 2uy; 6uy; 8uy |]
  let message2Bytes = [| 1uy; 2uy; 3uy; 2uy |]

  let message = Message.create (Binary.ofArray messageBytes) (Binary.empty) None
  let message2 = Message.create (Binary.ofArray message2Bytes) (Binary.empty) None

  let inputMessage =
    Compression.Snappy.compress 0s (MessageSet.ofMessages 0s [message; message2])

  let outputMessageSet =
    Compression.Snappy.decompress 0s inputMessage

  let messages = outputMessageSet.messages
  // AreEqual reports the expected and actual counts when the check fails.
  Assert.AreEqual (2, messages.Length)

  // Only the payloads are verified; offsets and sizes are not asserted on.
  let msg = messages.[0].message
  let msg2 = messages.[1].message
  Assert.IsTrue (msg.value |> Binary.toArray = messageBytes)
  Assert.IsTrue (msg2.value |> Binary.toArray = message2Bytes)
Loading