Skip to content
This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Snappy compression #146

Merged
merged 7 commits into from
May 25, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 81 additions & 58 deletions src/kafunk/Compression.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@ module internal Stream =
let buf = Binary.ofArray inputBytes
MessageSet.Write (messageVer,ms,BinaryZipper(buf))
use compStream = makeStream outputStream
try
compStream.Write(inputBytes, 0, inputBytes.Length)
with :? IOException as _ex ->
// TODO: log this
//printfn "Couldn't write to gzip stream: %A" ex
reraise()
compStream.Write(inputBytes, 0, inputBytes.Length)
createMessage (outputStream.ToArray() |> Binary.ofArray) codec

let decompress (makeStream: MemoryStream -> Stream) (messageVer:ApiVersion) (m:Message) =
Expand All @@ -36,12 +31,7 @@ module internal Stream =
do
use inputStream = new MemoryStream(inputBytes)
use compStream = makeStream inputStream
try
compStream.CopyTo(outputStream)
with :? IOException as _ex ->
// TODO: log this
//printfn "Couldn't read from gzip stream: %A" ex
reraise()
compStream.CopyTo(outputStream)
outputStream.Position <- 0L
let output = outputStream.ToArray()
// size is output array size divided by message set element size
Expand All @@ -67,81 +57,114 @@ module Snappy =

open System
open Snappy

module CompressedMessage =

type BinaryZipper with
module internal Binary =

let writeBlock (bytes: Binary.Segment) (buf : Binary.Segment) =
Buffer.BlockCopy(bytes.Array, bytes.Offset, buf.Array, buf.Offset, bytes.Count)
Binary.shiftOffset bytes.Count buf

let readBlock (length: int) (buf : Binary.Segment) =
let arr = ArraySegment<byte>(buf.Array, buf.Offset, length)
arr, Binary.shiftOffset length buf

let truncateIfSmaller actualLength maxLength (array: byte []) =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than copying an array, you can truncate by adjusting the offset on an ArraySegment (Binary module has some helper for this).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed what Snappy.NET did, but I guess it can be avoided. Made both compress and decompress here be Binary.Segment - > Binary.Segment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah looks good. Also here:

let buf = Array.zeroCreate uncompressedLength
let actualLength = SnappyCodec.Uncompress(content.Array, content.Offset, content.Count, buf, 0)
Binary.truncateIfSmaller actualLength uncompressedLength buf

The call to Binary.truncateIfSmaller should operate on ArraySegment<byte> rather than array directly. This way, when it actually has to truncate, it doesn't call Binary.toArray which creates a new array, but simply adjusts the offsets on the ArraySegment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't call Binary.toArray anymore - it does take an array, but it will return a Segment of appropriate length.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry I misread ofArray

if actualLength < maxLength
then Binary.toArray <| Binary.Segment(array, 0, actualLength)
else array

type internal SnappyBinaryZipper (buf: Binary.Segment) =

member this.WriteBlock (bytes: ArraySegment<byte>) =
Buffer.BlockCopy(bytes.Array, bytes.Offset, this.Buffer.Array, this.Buffer.Offset, bytes.Count)
this.ShiftOffset bytes.Count
let mutable buffer = buf

member this.Read<'a> (reader: Binary.Reader<'a>) =
let res, updatedBuffer = reader buffer
buffer <- updatedBuffer
res

member this.Write<'a> (writer: Binary.Segment -> Binary.Segment) =
buffer <- writer buffer

member this.Buffer = buffer

member this.ShiftOffset(by: int) = this.Write(Binary.shiftOffset by)

member this.Seek(offset: int) =
buffer <- Binary.Segment(buffer.Array, offset, buffer.Count)

member this.WriteInt32(x) = this.Write(Binary.writeInt32 x)
member this.WriteBlock(block) = this.Write(Binary.writeBlock block)

member this.ReadInt32() = this.Read(Binary.readInt32)
member this.ReadBlock(length) = this.Read(Binary.readBlock length)

member this.ReadBlock (length: int) =
let arr = ArraySegment<byte>(this.Buffer.Array, this.Buffer.Offset, length)
this.ShiftOffset length
arr
module internal CompressedMessage =

module private Header =
// Combined size of header in bytes.
let size = 16
// Magic string used by snappy-java.
let magic = [| byte -126; byte 'S'; byte 'N'; byte 'A'; byte 'P'; byte 'P'; byte 'Y'; byte 0 |]
// Current version number taken from snappy-java repo as of 22/05/2017.
let currentVer = 1
// Minimum compatible version number taken from snappy-java repo as of 22/05/2017.
let minimumVer = 1
// Total size of the header (magic string + two version ints + content length int)
let size = magic.Length + Binary.sizeInt32 currentVer + Binary.sizeInt32 minimumVer + Binary.sizeInt32 0

let pack (bytes: byte []) =
let buf = Array.zeroCreate (Header.size + 4 + bytes.Length)
let bz = BinaryZipper(Binary.ofArray buf)

// write header compatible with snappy-java
bz.WriteBlock(Binary.ofArray Header.magic)
bz.WriteInt32(Header.currentVer)
bz.WriteInt32(Header.minimumVer)

// write content
// NOTE: this will also write content length in the first 4 bytes, this is expected.
bz.WriteBytes(Binary.ofArray bytes)
let compress (bytes: byte []) : byte [] =
let maxLength = SnappyCodec.GetMaxCompressedLength(bytes.Length)

buf
let buf = Array.zeroCreate (Header.size + maxLength)
let bz = SnappyBinaryZipper(Binary.ofArray buf)

// write header compatible with snappy-java.
bz.WriteBlock (Binary.ofArray Header.magic)
bz.WriteInt32 (Header.currentVer)
bz.WriteInt32 (Header.minimumVer)

// move forward to write compressed content, then go back to write the actual compressed content length.
bz.ShiftOffset (Binary.sizeInt32 0)

let length = SnappyCodec.Compress(bytes, 0, bytes.Length, bz.Buffer.Array, bz.Buffer.Offset)

bz.Seek (Header.size - Binary.sizeInt32 length)
bz.WriteInt32 (length)

Binary.truncateIfSmaller (Header.size + length) (Header.size + maxLength) buf

let unpack (bytes: byte []) =
let bz = BinaryZipper(Binary.ofArray bytes)
let decompress (bytes: byte []) : byte [] =
let bz = SnappyBinaryZipper(Binary.ofArray bytes)

// TODO: do we want to validate these?
let magic = bz.ReadBlock(Header.magic.Length)
let currentVer = bz.ReadInt32()
let minimumVer = bz.ReadInt32()

let content = bz.ReadBytes()
let contentLength = bz.ReadInt32()
let content = bz.ReadBlock(contentLength)

let uncompressedLength = SnappyCodec.GetUncompressedLength(content.Array, content.Offset, content.Count)

let buf = Array.zeroCreate uncompressedLength
let actualLength = SnappyCodec.Uncompress(content.Array, content.Offset, content.Count, buf, 0)

let buf = Array.zeroCreate<byte> content.Count
Buffer.BlockCopy(content.Array, content.Offset, buf, 0, content.Count)
buf
Binary.truncateIfSmaller actualLength uncompressedLength buf

let compress (messageVer:ApiVersion) (ms:MessageSet) =
let inputBytes = MessageSet.Size (messageVer,ms) |> Array.zeroCreate
let buf = Binary.ofArray inputBytes
MessageSet.Write (messageVer,ms,BinaryZipper(buf))
try
let compressed = SnappyCodec.Compress(inputBytes)
let output = CompressedMessage.pack compressed
createMessage (Binary.ofArray output) CompressionCodec.Snappy
with :? IOException as _ex ->
// TODO: log this
reraise()

let output = CompressedMessage.compress inputBytes

createMessage (Binary.ofArray output) CompressionCodec.Snappy

let decompress (messageVer:ApiVersion) (m:Message) =
let inputBytes = m.value |> Binary.toArray
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this copy be avoided by passing m.value directly into the binary zipper?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - the same fix can be applied in the gzip version.

try
let compressed = CompressedMessage.unpack inputBytes
let output = SnappyCodec.Uncompress(compressed)
let bz = BinaryZipper(Binary.ofArray output)
MessageSet.Read (messageVer, 0, 0s, output.Length, bz)
with :? IOException as _ex ->
// TODO: log this
reraise()

let output = CompressedMessage.decompress inputBytes

let bz = BinaryZipper(Binary.ofArray output)
MessageSet.Read (messageVer, 0, 0s, output.Length, bz)

let compress (messageVer:int16) (compression:byte) (ms:MessageSet) =
match compression with
Expand Down
1 change: 0 additions & 1 deletion tests/kafunk.Tests/CompressionTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,3 @@ let ``Compression.Snappy reads messages that are compatible with reference imple
Encoding.ASCII.GetString(outputMsg.message.value |> Binary.toArray)

Assert.AreEqual ("test", payload)