Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Snappy compression #146

Merged
merged 7 commits into from
May 25, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ Target "Build" (fun _ ->
Target "RunTests" (fun _ ->
!! testAssemblies
|> NUnit3 (fun p ->
{ p with TimeOut = TimeSpan.FromMinutes 20.})
{ p with TimeOut = TimeSpan.FromMinutes 20.
ProcessModel = NUnit3ProcessModel.SingleProcessModel })
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you want this change - but I couldn't get the tests to run otherwise on my machine.

)

#if MONO
Expand Down
1 change: 1 addition & 0 deletions paket.dependencies
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
source https://api.nuget.org/v3/index.json
nuget FSharp.Control.AsyncSeq
nuget Snappy.NET

group Build
source https://api.nuget.org/v3/index.json
Expand Down
4 changes: 4 additions & 0 deletions paket.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
NUGET
remote: https://api.nuget.org/v3/index.json
Crc32C.NET (1.0.5)
Snappy.NET (1.1.1.8)
Crc32C.NET (>= 1.0.5)
remote: http://api.nuget.org/v3/index.json
FSharp.Control.AsyncSeq (2.0.10)

Expand Down
10 changes: 5 additions & 5 deletions src/kafunk/AssemblyInfo.fs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
namespace System
namespace System
open System.Reflection

[<assembly: AssemblyTitleAttribute("kafunk")>]
[<assembly: AssemblyProductAttribute("kafunk")>]
[<assembly: AssemblyDescriptionAttribute("F# client for Kafka")>]
[<assembly: AssemblyVersionAttribute("0.0.40")>]
[<assembly: AssemblyFileVersionAttribute("0.0.40")>]
[<assembly: AssemblyVersionAttribute("0.1.0")>]
[<assembly: AssemblyFileVersionAttribute("0.1.0")>]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your fork might have been an older version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it just wasn't bumped, the fork was created this weekend?

do ()

module internal AssemblyVersionInformation =
let [<Literal>] Version = "0.0.40"
let [<Literal>] InformationalVersion = "0.0.40"
let [<Literal>] Version = "0.1.0"
let [<Literal>] InformationalVersion = "0.1.0"
193 changes: 147 additions & 46 deletions src/kafunk/Compression.fs
Original file line number Diff line number Diff line change
@@ -1,80 +1,178 @@
[<Compile(Module)>]
module internal Kafunk.Compression

open System.IO
open System.IO.Compression

open Kafunk

let private createMessage (value:Value) (compression:byte) =
let attrs = compression |> int8
Message.create value Binary.empty (Some attrs)

[<Compile(Module)>]
module GZip =

open System.IO
open System.IO.Compression

[<RequireQualifiedAccess>]
module internal Stream =
// The only thing that can be compressed is a MessageSet, not a single Message; this results in a message containing the compressed set
let compress (messageVer:ApiVersion) (ms:MessageSet) =
let compress (codec: CompressionCodec) (makeStream: MemoryStream -> Stream) (messageVer:ApiVersion) (ms:MessageSet) =
// TODO: pool MemoryStreams
use outputStream = new MemoryStream()
use gZipStream = new GZipStream(outputStream, CompressionMode.Compress)
do
let inputBytes = MessageSet.Size (messageVer,ms) |> Array.zeroCreate
let buf = Binary.ofArray inputBytes
MessageSet.Write (messageVer,ms,BinaryZipper(buf))
//MessageSet.write messageVer ms buf |> ignore
try
gZipStream.Write(inputBytes, 0, inputBytes.Length)
gZipStream.Close()
with :? IOException as _ex ->
// TODO: log this
//printfn "Couldn't write to gzip stream: %A" ex
reraise()
createMessage (outputStream.ToArray() |> Binary.ofArray) CompressionCodec.GZIP
use compStream = makeStream outputStream
compStream.Write(inputBytes, 0, inputBytes.Length)
createMessage (outputStream.ToArray() |> Binary.ofArray) codec

let decompress (messageVer:ApiVersion) (m:Message) =
let decompress (makeStream: MemoryStream -> Stream) (messageVer:ApiVersion) (m:Message) =
let inputBytes = m.value |> Binary.toArray
// TODO: pool MemoryStreams
use outputStream = new MemoryStream()
do
use inputStream = new MemoryStream(inputBytes)
use gzipInputStream = new GZipStream(inputStream, CompressionMode.Decompress)
try
gzipInputStream.CopyTo(outputStream)
gzipInputStream.Close()
with :? IOException as _ex ->
// TODO: log this
//printfn "Couldn't read from gzip stream: %A" ex
reraise()
use compStream = makeStream inputStream
compStream.CopyTo(outputStream)
outputStream.Position <- 0L
let output = outputStream.ToArray()
// size is output array size divided by message set element size
let bz = BinaryZipper(output |> Binary.ofArray)
MessageSet.Read (messageVer, 0, 0s, output.Length, bz)

[<Compile(Module)>]
module GZip =

open System.IO
open System.IO.Compression

let compress =
Stream.compress CompressionCodec.GZIP <| fun memStream ->
upcast new GZipStream(memStream, CompressionMode.Compress)

let decompress =
Stream.decompress <| fun memStream ->
upcast new GZipStream(memStream, CompressionMode.Decompress)

[<Compile(Module)>]
module Snappy =

open System
open Snappy

module internal Binary =

let writeBlock (bytes: Binary.Segment) (buf : Binary.Segment) =
Buffer.BlockCopy(bytes.Array, bytes.Offset, buf.Array, buf.Offset, bytes.Count)
Binary.shiftOffset bytes.Count buf

let readBlock (length: int) (buf : Binary.Segment) =
let arr = ArraySegment<byte>(buf.Array, buf.Offset, length)
arr, Binary.shiftOffset length buf

let truncateIfSmaller actualLength maxLength (array: byte []) =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than copying an array, you can truncate by adjusting the offset on an ArraySegment (Binary module has some helper for this).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed what Snappy.NET did, but I guess it can be avoided. Made both compress and decompress here be Binary.Segment - > Binary.Segment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah looks good. Also here:

let buf = Array.zeroCreate uncompressedLength
let actualLength = SnappyCodec.Uncompress(content.Array, content.Offset, content.Count, buf, 0)
Binary.truncateIfSmaller actualLength uncompressedLength buf

The call to Binary.truncateIfSmaller should operate on ArraySegment<byte> rather than array directly. This way, when it actually has to truncate, it doesn't call Binary.toArray which creates a new array, but simply adjusts the offsets on the ArraySegment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't call Binary.toArray anymore - it does take an array, but it will return a Segment of appropriate length.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry I misread ofArray

if actualLength < maxLength
then Binary.toArray <| Binary.Segment(array, 0, actualLength)
else array

type internal SnappyBinaryZipper (buf: Binary.Segment) =

let mutable buffer = buf

member this.Read<'a> (reader: Binary.Reader<'a>) =
let res, updatedBuffer = reader buffer
buffer <- updatedBuffer
res

member this.Write<'a> (writer: Binary.Segment -> Binary.Segment) =
buffer <- writer buffer

member this.Buffer = buffer

member this.ShiftOffset(by: int) = this.Write(Binary.shiftOffset by)

member this.Seek(offset: int) =
buffer <- Binary.Segment(buffer.Array, offset, buffer.Count)

member this.WriteInt32(x) = this.Write(Binary.writeInt32 x)
member this.WriteBlock(block) = this.Write(Binary.writeBlock block)

member this.ReadInt32() = this.Read(Binary.readInt32)
member this.ReadBlock(length) = this.Read(Binary.readBlock length)

module internal CompressedMessage =

module private Header =
// Magic string used by snappy-java.
let magic = [| byte -126; byte 'S'; byte 'N'; byte 'A'; byte 'P'; byte 'P'; byte 'Y'; byte 0 |]
// Current version number taken from snappy-java repo as of 22/05/2017.
let currentVer = 1
// Minimum compatible version number taken from snappy-java repo as of 22/05/2017.
let minimumVer = 1
// Total size of the header (magic string + two version ints + content length int)
let size = magic.Length + Binary.sizeInt32 currentVer + Binary.sizeInt32 minimumVer + Binary.sizeInt32 0

let compress (bytes: byte []) : byte [] =
let maxLength = SnappyCodec.GetMaxCompressedLength(bytes.Length)

let buf = Array.zeroCreate (Header.size + maxLength)
let bz = SnappyBinaryZipper(Binary.ofArray buf)

// write header compatible with snappy-java.
bz.WriteBlock (Binary.ofArray Header.magic)
bz.WriteInt32 (Header.currentVer)
bz.WriteInt32 (Header.minimumVer)

// move forward to write compressed content, then go back to write the actual compressed content length.
bz.ShiftOffset (Binary.sizeInt32 0)

let length = SnappyCodec.Compress(bytes, 0, bytes.Length, bz.Buffer.Array, bz.Buffer.Offset)

bz.Seek (Header.size - Binary.sizeInt32 length)
bz.WriteInt32 (length)

Binary.truncateIfSmaller (Header.size + length) (Header.size + maxLength) buf

let decompress (bytes: byte []) : byte [] =
let bz = SnappyBinaryZipper(Binary.ofArray bytes)

// TODO: do we want to validate these?
let magic = bz.ReadBlock(Header.magic.Length)
let currentVer = bz.ReadInt32()
let minimumVer = bz.ReadInt32()

let contentLength = bz.ReadInt32()
let content = bz.ReadBlock(contentLength)

let uncompressedLength = SnappyCodec.GetUncompressedLength(content.Array, content.Offset, content.Count)

let buf = Array.zeroCreate uncompressedLength
let actualLength = SnappyCodec.Uncompress(content.Array, content.Offset, content.Count, buf, 0)

Binary.truncateIfSmaller actualLength uncompressedLength buf

let compress (messageVer:ApiVersion) (ms:MessageSet) =
let inputBytes = MessageSet.Size (messageVer,ms) |> Array.zeroCreate
let buf = Binary.ofArray inputBytes
MessageSet.Write (messageVer,ms,BinaryZipper(buf))

let output = CompressedMessage.compress inputBytes

createMessage (Binary.ofArray output) CompressionCodec.Snappy

let decompress (messageVer:ApiVersion) (m:Message) =
let inputBytes = m.value |> Binary.toArray
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this copy be avoided by passing m.value directly into the binary zipper?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - the same fix can be applied in the gzip version.


let output = CompressedMessage.decompress inputBytes

let bz = BinaryZipper(Binary.ofArray output)
MessageSet.Read (messageVer, 0, 0s, output.Length, bz)

let compress (messageVer:int16) (compression:byte) (ms:MessageSet) =
match compression with
| CompressionCodec.None -> ms
| CompressionCodec.GZIP -> MessageSet.ofMessage messageVer (GZip.compress messageVer ms)
| CompressionCodec.Snappy -> MessageSet.ofMessage messageVer (Snappy.compress messageVer ms)
| _ -> failwithf "Incorrect compression codec %A" compression

//let decompress (messageVer:int16) (ms:MessageSet) =
// if ms.messages.Length = 0 then ms
// else
// let rs = ResizeArray<_>(ms.messages.Length)
// for i = 0 to ms.messages.Length - 1 do
// let msi = ms.messages.[i]
// match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with
// | CompressionCodec.None ->
// rs.Add msi
// | CompressionCodec.GZIP ->
// let ms' = GZip.decompress messageVer msi.message
// for j = 0 to ms'.messages.Length - 1 do
// rs.Add(ms'.messages.[j])
// | c ->
// failwithf "compression_code=%i not supported" c
// MessageSet(rs.ToArray())


let decompress (messageVer:int16) (ms:MessageSet) =
if ms.messages.Length = 0 then ms
else
Expand All @@ -83,8 +181,11 @@ let decompress (messageVer:int16) (ms:MessageSet) =
match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with
| CompressionCodec.None -> [|msi|]
| CompressionCodec.GZIP ->
let ms' = GZip.decompress messageVer msi.message
ms'.messages
let decompressed = GZip.decompress messageVer msi.message
decompressed.messages
| CompressionCodec.Snappy ->
let decompressed = Snappy.decompress messageVer msi.message
decompressed.messages
| c -> failwithf "compression_code=%i not supported" c)
|> MessageSet

4 changes: 4 additions & 0 deletions src/kafunk/Protocol.fs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ module Protocol =
/// The timestamp of a message.
type Timestamp = int64

/// Byte flag indicating compression codec in use.
type CompressionCodec = byte

[<Compile(Module)>]
module CompressionCodec =

[<Literal>]
Expand Down
35 changes: 33 additions & 2 deletions src/kafunk/kafunk.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,18 @@
<Reference Include="System.ServiceModel" />
</ItemGroup>
<Choose>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3')">
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v2.0' Or $(TargetFrameworkVersion) == 'v3.0' Or $(TargetFrameworkVersion) == 'v3.5' Or $(TargetFrameworkVersion) == 'v4.0' Or $(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3' Or $(TargetFrameworkVersion) == 'v4.7')">
<ItemGroup>
<Reference Include="Crc32C.NET">
<HintPath>..\..\packages\Crc32C.NET\lib\net20\Crc32C.NET.dll</HintPath>
<Private>True</Private>
<Paket>True</Paket>
</Reference>
</ItemGroup>
</When>
</Choose>
<Choose>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3' Or $(TargetFrameworkVersion) == 'v4.7')">
<ItemGroup>
<Reference Include="FSharp.Control.AsyncSeq">
<HintPath>..\..\packages\FSharp.Control.AsyncSeq\lib\net45\FSharp.Control.AsyncSeq.dll</HintPath>
Expand All @@ -106,7 +117,7 @@
</Reference>
</ItemGroup>
</When>
<When Condition="($(TargetFrameworkIdentifier) == 'WindowsPhoneApp') Or ($(TargetFrameworkIdentifier) == '.NETCore') Or ($(TargetFrameworkIdentifier) == '.NETStandard' And ($(TargetFrameworkVersion) == 'v1.1' Or $(TargetFrameworkVersion) == 'v1.2' Or $(TargetFrameworkVersion) == 'v1.3' Or $(TargetFrameworkVersion) == 'v1.4' Or $(TargetFrameworkVersion) == 'v1.5' Or $(TargetFrameworkVersion) == 'v1.6')) Or ($(TargetFrameworkIdentifier) == '.NETCoreApp' And $(TargetFrameworkVersion) == 'v1.0') Or ($(TargetFrameworkIdentifier) == 'MonoAndroid') Or ($(TargetFrameworkIdentifier) == 'MonoTouch') Or ($(TargetFrameworkIdentifier) == 'Xamarin.iOS') Or ($(TargetFrameworkIdentifier) == 'Xamarin.Mac') Or ($(TargetFrameworkProfile) == 'Profile7') Or ($(TargetFrameworkProfile) == 'Profile44')">
<When Condition="($(TargetFrameworkIdentifier) == 'WindowsPhoneApp') Or ($(TargetFrameworkIdentifier) == '.NETCore') Or ($(TargetFrameworkIdentifier) == '.NETStandard' And ($(TargetFrameworkVersion) == 'v1.1' Or $(TargetFrameworkVersion) == 'v1.2' Or $(TargetFrameworkVersion) == 'v1.3' Or $(TargetFrameworkVersion) == 'v1.4' Or $(TargetFrameworkVersion) == 'v1.5' Or $(TargetFrameworkVersion) == 'v1.6' Or $(TargetFrameworkVersion) == 'v2.0')) Or ($(TargetFrameworkIdentifier) == '.NETCoreApp' And ($(TargetFrameworkVersion) == 'v1.0' Or $(TargetFrameworkVersion) == 'v1.1' Or $(TargetFrameworkVersion) == 'v2.0')) Or ($(TargetFrameworkIdentifier) == 'MonoAndroid') Or ($(TargetFrameworkIdentifier) == 'MonoTouch') Or ($(TargetFrameworkIdentifier) == 'Xamarin.iOS') Or ($(TargetFrameworkIdentifier) == 'Xamarin.Mac') Or ($(TargetFrameworkProfile) == 'Profile7') Or ($(TargetFrameworkProfile) == 'Profile44')">
<ItemGroup>
<Reference Include="FSharp.Control.AsyncSeq">
<HintPath>..\..\packages\FSharp.Control.AsyncSeq\lib\portable-net45+netcore45+MonoAndroid1+MonoTouch1\FSharp.Control.AsyncSeq.dll</HintPath>
Expand All @@ -116,4 +127,24 @@
</ItemGroup>
</When>
</Choose>
<Choose>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v2.0' Or $(TargetFrameworkVersion) == 'v3.0' Or $(TargetFrameworkVersion) == 'v3.5' Or $(TargetFrameworkVersion) == 'v4.0')">
<ItemGroup>
<Reference Include="Snappy.NET">
<HintPath>..\..\packages\Snappy.NET\lib\net20\Snappy.NET.dll</HintPath>
<Private>True</Private>
<Paket>True</Paket>
</Reference>
</ItemGroup>
</When>
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3' Or $(TargetFrameworkVersion) == 'v4.7')">
<ItemGroup>
<Reference Include="Snappy.NET">
<HintPath>..\..\packages\Snappy.NET\lib\net45\Snappy.NET.dll</HintPath>
<Private>True</Private>
<Paket>True</Paket>
</Reference>
</ItemGroup>
</When>
</Choose>
</Project>
3 changes: 2 additions & 1 deletion src/kafunk/paket.references
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
FSharp.Control.AsyncSeq
FSharp.Control.AsyncSeq
Snappy.NET
30 changes: 0 additions & 30 deletions tests/kafunk.Tests/CompressionGzipTests.fs

This file was deleted.

Loading