Skip to content
This repository has been archived by the owner on Jan 20, 2023. It is now read-only.

Commit

Permalink
Add binary serialization of TDigests
Browse files Browse the repository at this point in the history
In order to support aggregation of TDigests from across multiple
processes, they need to be able to be serialized. This commit adds
serialization which follows the scheme used in the Java reference
implementation at github.com/tdunning/t-digest.
  • Loading branch information
spenczar committed Oct 5, 2017
1 parent c49e31b commit 51c81e0
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 0 deletions.
93 changes: 93 additions & 0 deletions serde.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package tdigest

import (
"bytes"
"encoding/binary"
"fmt"
"io"
)

// encodingVersion is the format version tag. marshalBinary writes it as the
// first value of every payload, and unmarshalBinary rejects any payload whose
// leading int32 differs from it.
const encodingVersion = int32(1)

// marshalBinary encodes d into a freshly allocated byte slice.
//
// Values are emitted through binaryBufferWriter in this order:
//
//	encodingVersion          int32
//	mean of first centroid   float64 (0 when there are no centroids)
//	mean of last centroid    float64 (0 when there are no centroids)
//	d.compression
//	number of centroids      int32
//	count, mean              repeated once per centroid
//
// The first error reported by the writer is returned together with a nil
// slice.
func marshalBinary(d *TDigest) ([]byte, error) {
	var out bytes.Buffer
	enc := &binaryBufferWriter{buf: &out}

	// The header carries the means of the first and last centroids as the
	// digest's min and max; presumably the centroids are kept ordered by
	// mean (confirm against the TDigest implementation).
	var lo, hi float64
	if k := len(d.centroids); k > 0 {
		lo, hi = d.centroids[0].mean, d.centroids[k-1].mean
	}

	enc.writeValue(encodingVersion)
	enc.writeValue(lo)
	enc.writeValue(hi)
	enc.writeValue(d.compression)
	enc.writeValue(int32(len(d.centroids)))
	for i := range d.centroids {
		enc.writeValue(d.centroids[i].count)
		enc.writeValue(d.centroids[i].mean)
	}

	if enc.err != nil {
		return nil, enc.err
	}
	return out.Bytes(), nil
}

// unmarshalBinary decodes p, a payload in the layout produced by
// marshalBinary, and stores the result in d.
//
// The payload is read as: an int32 encoding version, two float64 values (the
// serialized min and max, which are consumed but not stored on d), the
// compression, an int32 centroid count, and then a count and a mean for each
// centroid.
//
// d is modified only when the whole payload decodes successfully; on any
// error it is left exactly as it was. On success d.compression and
// d.centroids are replaced and d.countTotal is recomputed from the decoded
// centroids, so decoding into a previously used digest does not inherit its
// old total.
//
// It returns the underlying read error for short or empty input, and a
// descriptive error for an unknown encoding version or a negative centroid
// count.
func unmarshalBinary(d *TDigest, p []byte) error {
	var (
		ev     int32
		lo, hi float64
		n      int32
	)
	src := bytes.NewReader(p)
	r := &binaryReader{r: src}

	// Check the read error before looking at the version: on short input ev
	// is still zero and would otherwise be misreported as a bad version.
	r.readValue(&ev)
	if r.err != nil {
		return r.err
	}
	if ev != encodingVersion {
		return fmt.Errorf("invalid encoding version: %d", ev)
	}

	// Decode into locals so that a failure part-way through leaves d intact.
	compression := d.compression
	r.readValue(&lo)
	r.readValue(&hi)
	r.readValue(&compression)
	r.readValue(&n)
	if r.err != nil {
		return r.err
	}
	if n < 0 {
		// A negative length would make the slice allocation below panic.
		return fmt.Errorf("invalid centroid count: %d", n)
	}

	// Every centroid occupies at least one byte of input, so never reserve
	// room for more centroids than there are bytes left. This keeps a
	// corrupt or hostile count from forcing a huge allocation up front; the
	// loop below still reports the read error when the data runs out.
	capacity := int(n)
	if remaining := src.Len(); capacity > remaining {
		capacity = remaining
	}
	centroids := make([]*centroid, 0, capacity)
	for i := 0; i < int(n); i++ {
		c := new(centroid)
		r.readValue(&c.count)
		r.readValue(&c.mean)
		if r.err != nil {
			return r.err
		}
		centroids = append(centroids, c)
	}

	d.compression = compression
	d.centroids = centroids
	d.countTotal = 0
	for _, c := range centroids {
		d.countTotal += c.count
	}
	return nil
}

// binaryBufferWriter appends little-endian encoded values to a bytes.Buffer
// and remembers the first error, so callers can issue a series of writes and
// inspect err once at the end.
type binaryBufferWriter struct {
	buf *bytes.Buffer // destination for the encoded bytes
	err error         // first error returned by binary.Write; nil until one occurs
}

// writeValue encodes v in little-endian order and appends it to the buffer.
// Once an error has been recorded every later call is a no-op, which leaves
// the buffer and the stored error unchanged.
func (bw *binaryBufferWriter) writeValue(v interface{}) {
	if bw.err == nil {
		bw.err = binary.Write(bw.buf, binary.LittleEndian, v)
	}
}

// binaryReader decodes little-endian values from an io.Reader and remembers
// the first error, so callers can issue a series of reads and inspect err
// once at the end.
type binaryReader struct {
	r   io.Reader // source of the encoded bytes
	err error     // first error returned by binary.Read; nil until one occurs
}

// readValue decodes the next little-endian value from the reader into v,
// which is handed to binary.Read as its destination. Once an error has been
// recorded every later call is a no-op and consumes no input.
func (br *binaryReader) readValue(v interface{}) {
	if br.err == nil {
		br.err = binary.Read(br.r, binary.LittleEndian, v)
	}
}
30 changes: 30 additions & 0 deletions serde_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package tdigest

import (
"reflect"
"testing"
)

// TestMarshalRoundTrip checks that a digest passed through MarshalBinary and
// then UnmarshalBinary into a zero-value TDigest is reflect.DeepEqual to the
// original, for an empty digest and for digests built by simpleTDigest with
// arguments 1 and 1000.
func TestMarshalRoundTrip(t *testing.T) {
	cases := []struct {
		name  string
		build func() *TDigest
	}{
		{name: "empty", build: func() *TDigest { return New() }},
		{name: "1 value", build: func() *TDigest { return simpleTDigest(1) }},
		{name: "1000 values", build: func() *TDigest { return simpleTDigest(1000) }},
	}

	for _, tc := range cases {
		// Build each input right before its subtest runs.
		in := tc.build()
		t.Run(tc.name, func(t *testing.T) {
			encoded, err := in.MarshalBinary()
			if err != nil {
				t.Fatalf("MarshalBinary err: %v", err)
			}

			out := new(TDigest)
			if err = out.UnmarshalBinary(encoded); err != nil {
				t.Fatalf("UnmarshalBinary err: %v", err)
			}

			if reflect.DeepEqual(in, out) {
				return
			}
			t.Errorf("marshaling round trip resulted in changes")
			t.Logf("in: %+v", in)
			t.Logf("out: %+v", out)
		})
	}
}
12 changes: 12 additions & 0 deletions tdigest.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,15 @@ func (d *TDigest) MergeInto(other *TDigest) {
other.Add(c.mean, c.count)
}
}

// MarshalBinary returns d encoded as a byte slice that UnmarshalBinary can
// later decode. It delegates to marshalBinary and returns its result and
// error unchanged.
func (d *TDigest) MarshalBinary() ([]byte, error) {
	encoded, err := marshalBinary(d)
	return encoded, err
}

// UnmarshalBinary decodes p, which should be the output of an earlier
// MarshalBinary call, into d. It delegates to unmarshalBinary and returns
// that function's error, if any.
func (d *TDigest) UnmarshalBinary(p []byte) error {
	if err := unmarshalBinary(d, p); err != nil {
		return err
	}
	return nil
}

0 comments on commit 51c81e0

Please sign in to comment.