Skip to content
This repository was archived by the owner on Nov 20, 2020. It is now read-only.

Commit 308f3a6

Browse files
authored
Merge pull request #146 from scrwtp/master
Snappy compression
2 parents 03528c4 + a4168e7 commit 308f3a6

11 files changed

+298
-90
lines changed

Diff for: build.fsx

+2 -1
Original file line number | Diff line number | Diff line change
@@ -146,7 +146,8 @@ Target "Build" (fun _ ->
146146
Target "RunTests" (fun _ ->
147147
!! testAssemblies
148148
|> NUnit3 (fun p ->
149-
{ p with TimeOut = TimeSpan.FromMinutes 20.})
149+
{ p with TimeOut = TimeSpan.FromMinutes 20.
150+
ProcessModel = NUnit3ProcessModel.SingleProcessModel })
150151
)
151152

152153
#if MONO

Diff for: paket.dependencies

+1
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
source https://api.nuget.org/v3/index.json
22
nuget FSharp.Control.AsyncSeq
3+
nuget Snappy.NET
34

45
group Build
56
source https://api.nuget.org/v3/index.json

Diff for: paket.lock

+4
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,8 @@
11
NUGET
2+
remote: https://api.nuget.org/v3/index.json
3+
Crc32C.NET (1.0.5)
4+
Snappy.NET (1.1.1.8)
5+
Crc32C.NET (>= 1.0.5)
26
remote: http://api.nuget.org/v3/index.json
37
FSharp.Control.AsyncSeq (2.0.10)
48

Diff for: src/kafunk/AssemblyInfo.fs

+5 -5
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,13 @@
1-
namespace System
1+
namespace System
22
open System.Reflection
33

44
[<assembly: AssemblyTitleAttribute("kafunk")>]
55
[<assembly: AssemblyProductAttribute("kafunk")>]
66
[<assembly: AssemblyDescriptionAttribute("F# client for Kafka")>]
7-
[<assembly: AssemblyVersionAttribute("0.0.40")>]
8-
[<assembly: AssemblyFileVersionAttribute("0.0.40")>]
7+
[<assembly: AssemblyVersionAttribute("0.1.0")>]
8+
[<assembly: AssemblyFileVersionAttribute("0.1.0")>]
99
do ()
1010

1111
module internal AssemblyVersionInformation =
12-
let [<Literal>] Version = "0.0.40"
13-
let [<Literal>] InformationalVersion = "0.0.40"
12+
let [<Literal>] Version = "0.1.0"
13+
let [<Literal>] InformationalVersion = "0.1.0"

Diff for: src/kafunk/Compression.fs

+144 -46
Original file line number | Diff line number | Diff line change
@@ -1,80 +1,175 @@
11
[<Compile(Module)>]
22
module internal Kafunk.Compression
33

4+
open System.IO
5+
open System.IO.Compression
6+
47
open Kafunk
58

69
let private createMessage (value:Value) (compression:byte) =
710
let attrs = compression |> int8
811
Message.create value Binary.empty (Some attrs)
912

10-
[<Compile(Module)>]
11-
module GZip =
12-
13-
open System.IO
14-
open System.IO.Compression
15-
13+
[<RequireQualifiedAccess>]
14+
module internal Stream =
1615
// The only thing that can be compressed is a MessageSet, not a single Message; this results in a message containing the compressed set
17-
let compress (messageVer:ApiVersion) (ms:MessageSet) =
16+
let compress (codec: CompressionCodec) (makeStream: MemoryStream -> Stream) (messageVer:ApiVersion) (ms:MessageSet) =
1817
// TODO: pool MemoryStreams
1918
use outputStream = new MemoryStream()
20-
use gZipStream = new GZipStream(outputStream, CompressionMode.Compress)
2119
do
2220
let inputBytes = MessageSet.Size (messageVer,ms) |> Array.zeroCreate
2321
let buf = Binary.ofArray inputBytes
2422
MessageSet.Write (messageVer,ms,BinaryZipper(buf))
25-
//MessageSet.write messageVer ms buf |> ignore
26-
try
27-
gZipStream.Write(inputBytes, 0, inputBytes.Length)
28-
gZipStream.Close()
29-
with :? IOException as _ex ->
30-
// TODO: log this
31-
//printfn "Couldn't write to gzip stream: %A" ex
32-
reraise()
33-
createMessage (outputStream.ToArray() |> Binary.ofArray) CompressionCodec.GZIP
23+
use compStream = makeStream outputStream
24+
compStream.Write(inputBytes, 0, inputBytes.Length)
25+
createMessage (outputStream.ToArray() |> Binary.ofArray) codec
3426

35-
let decompress (messageVer:ApiVersion) (m:Message) =
27+
let decompress (makeStream: MemoryStream -> Stream) (messageVer:ApiVersion) (m:Message) =
3628
let inputBytes = m.value |> Binary.toArray
3729
// TODO: pool MemoryStreams
3830
use outputStream = new MemoryStream()
3931
do
4032
use inputStream = new MemoryStream(inputBytes)
41-
use gzipInputStream = new GZipStream(inputStream, CompressionMode.Decompress)
42-
try
43-
gzipInputStream.CopyTo(outputStream)
44-
gzipInputStream.Close()
45-
with :? IOException as _ex ->
46-
// TODO: log this
47-
//printfn "Couldn't read from gzip stream: %A" ex
48-
reraise()
33+
use compStream = makeStream inputStream
34+
compStream.CopyTo(outputStream)
4935
outputStream.Position <- 0L
5036
let output = outputStream.ToArray()
5137
// size is output array size divided by message set element size
5238
let bz = BinaryZipper(output |> Binary.ofArray)
5339
MessageSet.Read (messageVer, 0, 0s, output.Length, bz)
5440

41+
[<Compile(Module)>]
42+
module GZip =
43+
44+
open System.IO
45+
open System.IO.Compression
46+
47+
let compress =
48+
Stream.compress CompressionCodec.GZIP <| fun memStream ->
49+
upcast new GZipStream(memStream, CompressionMode.Compress)
50+
51+
let decompress =
52+
Stream.decompress <| fun memStream ->
53+
upcast new GZipStream(memStream, CompressionMode.Decompress)
54+
55+
[<Compile(Module)>]
56+
module Snappy =
57+
58+
open System
59+
open Snappy
60+
61+
module internal Binary =
62+
63+
let writeBlock (bytes: Binary.Segment) (buf : Binary.Segment) =
64+
Buffer.BlockCopy(bytes.Array, bytes.Offset, buf.Array, buf.Offset, bytes.Count)
65+
Binary.shiftOffset bytes.Count buf
66+
67+
let readBlock (length: int) (buf : Binary.Segment) =
68+
let arr = ArraySegment<byte>(buf.Array, buf.Offset, length)
69+
arr, Binary.shiftOffset length buf
70+
71+
let truncateIfSmaller actualLength maxLength (array: byte []) =
72+
if actualLength < maxLength
73+
then Binary.Segment(array, 0, actualLength)
74+
else Binary.ofArray array
75+
76+
type internal SnappyBinaryZipper (buf: Binary.Segment) =
77+
78+
let mutable buffer = buf
79+
80+
member this.Read<'a> (reader: Binary.Reader<'a>) =
81+
let res, updatedBuffer = reader buffer
82+
buffer <- updatedBuffer
83+
res
84+
85+
member this.Write<'a> (writer: Binary.Segment -> Binary.Segment) =
86+
buffer <- writer buffer
87+
88+
member this.Buffer = buffer
89+
90+
member this.ShiftOffset(by: int) = this.Write(Binary.shiftOffset by)
91+
92+
member this.Seek(offset: int) =
93+
buffer <- Binary.Segment(buffer.Array, offset, buffer.Count)
94+
95+
member this.WriteInt32(x) = this.Write(Binary.writeInt32 x)
96+
member this.WriteBlock(block) = this.Write(Binary.writeBlock block)
97+
98+
member this.ReadInt32() = this.Read(Binary.readInt32)
99+
member this.ReadBlock(length) = this.Read(Binary.readBlock length)
100+
101+
module internal CompressedMessage =
102+
103+
module private Header =
104+
// Magic string used by snappy-java.
105+
let magic = [| byte -126; byte 'S'; byte 'N'; byte 'A'; byte 'P'; byte 'P'; byte 'Y'; byte 0 |]
106+
// Current version number taken from snappy-java repo as of 22/05/2017.
107+
let currentVer = 1
108+
// Minimum compatible version number taken from snappy-java repo as of 22/05/2017.
109+
let minimumVer = 1
110+
// Total size of the header (magic string + two version ints + content length int)
111+
let size = magic.Length + Binary.sizeInt32 currentVer + Binary.sizeInt32 minimumVer + Binary.sizeInt32 0
112+
113+
let compress (bytes: Binary.Segment) : Binary.Segment =
114+
let maxLength = SnappyCodec.GetMaxCompressedLength(bytes.Count)
115+
116+
let buf = Array.zeroCreate (Header.size + maxLength)
117+
let bz = SnappyBinaryZipper(Binary.ofArray buf)
118+
119+
// write header compatible with snappy-java.
120+
bz.WriteBlock (Binary.ofArray Header.magic)
121+
bz.WriteInt32 (Header.currentVer)
122+
bz.WriteInt32 (Header.minimumVer)
123+
124+
// move forward to write compressed content, then go back to write the actual compressed content length.
125+
bz.ShiftOffset (Binary.sizeInt32 0)
126+
127+
let length = SnappyCodec.Compress(bytes.Array, bytes.Offset, bytes.Count, bz.Buffer.Array, bz.Buffer.Offset)
128+
129+
bz.Seek (Header.size - Binary.sizeInt32 length)
130+
bz.WriteInt32 (length)
131+
132+
Binary.truncateIfSmaller (Header.size + length) (Header.size + maxLength) buf
133+
134+
let decompress (bytes: Binary.Segment) : Binary.Segment =
135+
let bz = SnappyBinaryZipper(bytes)
136+
137+
// TODO: do we want to validate these?
138+
let magic = bz.ReadBlock(Header.magic.Length)
139+
let currentVer = bz.ReadInt32()
140+
let minimumVer = bz.ReadInt32()
141+
142+
let contentLength = bz.ReadInt32()
143+
let content = bz.ReadBlock(contentLength)
144+
145+
let uncompressedLength = SnappyCodec.GetUncompressedLength(content.Array, content.Offset, content.Count)
146+
147+
let buf = Array.zeroCreate uncompressedLength
148+
let actualLength = SnappyCodec.Uncompress(content.Array, content.Offset, content.Count, buf, 0)
149+
150+
Binary.truncateIfSmaller actualLength uncompressedLength buf
151+
152+
let compress (messageVer:ApiVersion) (ms:MessageSet) =
153+
let inputBytes = MessageSet.Size (messageVer,ms) |> Array.zeroCreate
154+
let buf = Binary.ofArray inputBytes
155+
MessageSet.Write (messageVer,ms,BinaryZipper(buf))
156+
157+
let output = CompressedMessage.compress buf
158+
159+
createMessage output CompressionCodec.Snappy
160+
161+
let decompress (messageVer:ApiVersion) (m:Message) =
162+
let output = CompressedMessage.decompress m.value
163+
let bz = BinaryZipper(output)
164+
MessageSet.Read (messageVer, 0, 0s, output.Count, bz)
165+
55166
let compress (messageVer:int16) (compression:byte) (ms:MessageSet) =
56167
match compression with
57168
| CompressionCodec.None -> ms
58169
| CompressionCodec.GZIP -> MessageSet.ofMessage messageVer (GZip.compress messageVer ms)
170+
| CompressionCodec.Snappy -> MessageSet.ofMessage messageVer (Snappy.compress messageVer ms)
59171
| _ -> failwithf "Incorrect compression codec %A" compression
60-
61-
//let decompress (messageVer:int16) (ms:MessageSet) =
62-
// if ms.messages.Length = 0 then ms
63-
// else
64-
// let rs = ResizeArray<_>(ms.messages.Length)
65-
// for i = 0 to ms.messages.Length - 1 do
66-
// let msi = ms.messages.[i]
67-
// match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with
68-
// | CompressionCodec.None ->
69-
// rs.Add msi
70-
// | CompressionCodec.GZIP ->
71-
// let ms' = GZip.decompress messageVer msi.message
72-
// for j = 0 to ms'.messages.Length - 1 do
73-
// rs.Add(ms'.messages.[j])
74-
// | c ->
75-
// failwithf "compression_code=%i not supported" c
76-
// MessageSet(rs.ToArray())
77-
172+
78173
let decompress (messageVer:int16) (ms:MessageSet) =
79174
if ms.messages.Length = 0 then ms
80175
else
@@ -83,8 +178,11 @@ let decompress (messageVer:int16) (ms:MessageSet) =
83178
match (msi.message.attributes &&& (sbyte CompressionCodec.Mask)) |> byte with
84179
| CompressionCodec.None -> [|msi|]
85180
| CompressionCodec.GZIP ->
86-
let ms' = GZip.decompress messageVer msi.message
87-
ms'.messages
181+
let decompressed = GZip.decompress messageVer msi.message
182+
decompressed.messages
183+
| CompressionCodec.Snappy ->
184+
let decompressed = Snappy.decompress messageVer msi.message
185+
decompressed.messages
88186
| c -> failwithf "compression_code=%i not supported" c)
89187
|> MessageSet
90188

Diff for: src/kafunk/Protocol.fs

+4
Original file line number | Diff line number | Diff line change
@@ -84,6 +84,10 @@ module Protocol =
8484
/// The timestamp of a message.
8585
type Timestamp = int64
8686

87+
/// Byte flag indicating compression codec in use.
88+
type CompressionCodec = byte
89+
90+
[<Compile(Module)>]
8791
module CompressionCodec =
8892

8993
[<Literal>]

Diff for: src/kafunk/kafunk.fsproj

+33 -2
Original file line number | Diff line number | Diff line change
@@ -97,7 +97,18 @@
9797
<Reference Include="System.ServiceModel" />
9898
</ItemGroup>
9999
<Choose>
100-
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3')">
100+
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v2.0' Or $(TargetFrameworkVersion) == 'v3.0' Or $(TargetFrameworkVersion) == 'v3.5' Or $(TargetFrameworkVersion) == 'v4.0' Or $(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3' Or $(TargetFrameworkVersion) == 'v4.7')">
101+
<ItemGroup>
102+
<Reference Include="Crc32C.NET">
103+
<HintPath>..\..\packages\Crc32C.NET\lib\net20\Crc32C.NET.dll</HintPath>
104+
<Private>True</Private>
105+
<Paket>True</Paket>
106+
</Reference>
107+
</ItemGroup>
108+
</When>
109+
</Choose>
110+
<Choose>
111+
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3' Or $(TargetFrameworkVersion) == 'v4.7')">
101112
<ItemGroup>
102113
<Reference Include="FSharp.Control.AsyncSeq">
103114
<HintPath>..\..\packages\FSharp.Control.AsyncSeq\lib\net45\FSharp.Control.AsyncSeq.dll</HintPath>
@@ -106,7 +117,7 @@
106117
</Reference>
107118
</ItemGroup>
108119
</When>
109-
<When Condition="($(TargetFrameworkIdentifier) == 'WindowsPhoneApp') Or ($(TargetFrameworkIdentifier) == '.NETCore') Or ($(TargetFrameworkIdentifier) == '.NETStandard' And ($(TargetFrameworkVersion) == 'v1.1' Or $(TargetFrameworkVersion) == 'v1.2' Or $(TargetFrameworkVersion) == 'v1.3' Or $(TargetFrameworkVersion) == 'v1.4' Or $(TargetFrameworkVersion) == 'v1.5' Or $(TargetFrameworkVersion) == 'v1.6')) Or ($(TargetFrameworkIdentifier) == '.NETCoreApp' And $(TargetFrameworkVersion) == 'v1.0') Or ($(TargetFrameworkIdentifier) == 'MonoAndroid') Or ($(TargetFrameworkIdentifier) == 'MonoTouch') Or ($(TargetFrameworkIdentifier) == 'Xamarin.iOS') Or ($(TargetFrameworkIdentifier) == 'Xamarin.Mac') Or ($(TargetFrameworkProfile) == 'Profile7') Or ($(TargetFrameworkProfile) == 'Profile44')">
120+
<When Condition="($(TargetFrameworkIdentifier) == 'WindowsPhoneApp') Or ($(TargetFrameworkIdentifier) == '.NETCore') Or ($(TargetFrameworkIdentifier) == '.NETStandard' And ($(TargetFrameworkVersion) == 'v1.1' Or $(TargetFrameworkVersion) == 'v1.2' Or $(TargetFrameworkVersion) == 'v1.3' Or $(TargetFrameworkVersion) == 'v1.4' Or $(TargetFrameworkVersion) == 'v1.5' Or $(TargetFrameworkVersion) == 'v1.6' Or $(TargetFrameworkVersion) == 'v2.0')) Or ($(TargetFrameworkIdentifier) == '.NETCoreApp' And ($(TargetFrameworkVersion) == 'v1.0' Or $(TargetFrameworkVersion) == 'v1.1' Or $(TargetFrameworkVersion) == 'v2.0')) Or ($(TargetFrameworkIdentifier) == 'MonoAndroid') Or ($(TargetFrameworkIdentifier) == 'MonoTouch') Or ($(TargetFrameworkIdentifier) == 'Xamarin.iOS') Or ($(TargetFrameworkIdentifier) == 'Xamarin.Mac') Or ($(TargetFrameworkProfile) == 'Profile7') Or ($(TargetFrameworkProfile) == 'Profile44')">
110121
<ItemGroup>
111122
<Reference Include="FSharp.Control.AsyncSeq">
112123
<HintPath>..\..\packages\FSharp.Control.AsyncSeq\lib\portable-net45+netcore45+MonoAndroid1+MonoTouch1\FSharp.Control.AsyncSeq.dll</HintPath>
@@ -116,4 +127,24 @@
116127
</ItemGroup>
117128
</When>
118129
</Choose>
130+
<Choose>
131+
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v2.0' Or $(TargetFrameworkVersion) == 'v3.0' Or $(TargetFrameworkVersion) == 'v3.5' Or $(TargetFrameworkVersion) == 'v4.0')">
132+
<ItemGroup>
133+
<Reference Include="Snappy.NET">
134+
<HintPath>..\..\packages\Snappy.NET\lib\net20\Snappy.NET.dll</HintPath>
135+
<Private>True</Private>
136+
<Paket>True</Paket>
137+
</Reference>
138+
</ItemGroup>
139+
</When>
140+
<When Condition="$(TargetFrameworkIdentifier) == '.NETFramework' And ($(TargetFrameworkVersion) == 'v4.5' Or $(TargetFrameworkVersion) == 'v4.5.1' Or $(TargetFrameworkVersion) == 'v4.5.2' Or $(TargetFrameworkVersion) == 'v4.5.3' Or $(TargetFrameworkVersion) == 'v4.6' Or $(TargetFrameworkVersion) == 'v4.6.1' Or $(TargetFrameworkVersion) == 'v4.6.2' Or $(TargetFrameworkVersion) == 'v4.6.3' Or $(TargetFrameworkVersion) == 'v4.7')">
141+
<ItemGroup>
142+
<Reference Include="Snappy.NET">
143+
<HintPath>..\..\packages\Snappy.NET\lib\net45\Snappy.NET.dll</HintPath>
144+
<Private>True</Private>
145+
<Paket>True</Paket>
146+
</Reference>
147+
</ItemGroup>
148+
</When>
149+
</Choose>
119150
</Project>

Diff for: src/kafunk/paket.references

+2 -1
Original file line number | Diff line number | Diff line change
@@ -1 +1,2 @@
1-
FSharp.Control.AsyncSeq
1+
FSharp.Control.AsyncSeq
2+
Snappy.NET

Diff for: tests/kafunk.Tests/CompressionGzipTests.fs

-30
This file was deleted.

0 commit comments

Comments
 (0)