Skip to content

Commit

Permalink
Fix the issue of compact for last key (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
a9kitkumarsinha committed Jun 9, 2020
1 parent bdd1aeb commit c5167c6
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 9 deletions.
6 changes: 5 additions & 1 deletion internal/storage/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,13 @@ func (s *Storage) Compact(ctx context.Context) (interface{}, error) {
queue <- s.merge(merged, blocks, schema)

// Reset both the schema and the set of blocks
schema = make(typeof.Schema, len(schema))
blocks = make([]block.Block, 0, 16)
merged = make([]key.Key, 0, 16)

// append the last value that didn't merge in previous merge
schema = input.Schema()
blocks = append(blocks, input)
merged = append(merged, key.Clone(k))
return false
}); err != nil {
return nil, err
Expand Down
6 changes: 4 additions & 2 deletions internal/storage/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ func TestRange(t *testing.T) {
// Insert out of order
_ = store.Append(key.New("A", time.Unix(0, 0)), input, 60*time.Second)
_ = store.Append(key.New("A", time.Unix(1, 0)), input, 60*time.Second)
_ = store.Append(key.New("C", time.Unix(1, 0)), input, 60*time.Second)
_ = store.Append(key.New("B", time.Unix(0, 0)), input, 60*time.Second)
_ = store.Append(key.New("B", time.Unix(1, 0)), input, 60*time.Second)
_ = store.Append(key.New("B", time.Unix(2, 0)), input, 60*time.Second)
_ = store.Append(key.New("D", time.Unix(2, 0)), input, 60*time.Second)

// Iterate in order
var values [][]byte
Expand All @@ -99,14 +101,14 @@ func TestRange(t *testing.T) {

// Must be in order
assert.NoError(t, err)
assert.Equal(t, 5, len(values))
assert.Equal(t, 7, len(values))
for _, v := range values {
assert.EqualValues(t, input, v)
}

// Manually compact, the final count should be 2 (given we have 2 keys)
store.Compact(context.Background())
assert.Equal(t, int64(2), count)
assert.Equal(t, int64(4), count)
})
}

Expand Down
3 changes: 1 addition & 2 deletions internal/storage/flush/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package flush

import (
"bytes"
"compress/flate"
"sync"
"time"

Expand Down Expand Up @@ -69,7 +68,7 @@ func (s *Storage) Merge(blocks []block.Block, schema typeof.Schema) ([]byte, []b
buffer := s.memoryPool.Get().(*bytes.Buffer)
writer, err := eorc.NewWriter(buffer,
eorc.SetSchema(orcSchema),
eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression}))
eorc.SetCompression(eorc.CompressionSnappy{}))

for _, blk := range blocks {
rows, err := blk.Select(blk.Schema())
Expand Down
7 changes: 3 additions & 4 deletions internal/storage/flush/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package flush

import (
"bytes"
"compress/flate"
"io/ioutil"
"testing"

Expand All @@ -16,7 +15,7 @@ import (
"github.com/kelindar/talaria/internal/encoding/orc"
"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/scripting"
script "github.com/kelindar/talaria/internal/scripting"
"github.com/kelindar/talaria/internal/storage/writer/noop"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -75,7 +74,7 @@ func TestMerge(t *testing.T) {
orcBuffer := &bytes.Buffer{}
writer, _ = eorc.NewWriter(orcBuffer,
eorc.SetSchema(orcSchema),
eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression}))
eorc.SetCompression(eorc.CompressionSnappy{}))
_ = writer.Write("eventName", 1, 1.0)
_ = writer.Write("eventName", 2, 2.0)
_ = writer.Close()
Expand Down Expand Up @@ -153,7 +152,7 @@ func TestMerge_DifferentSchema(t *testing.T) {
orcBuffer := &bytes.Buffer{}
writer, _ = eorc.NewWriter(orcBuffer,
eorc.SetSchema(orcSchema2),
eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression}))
eorc.SetCompression(eorc.CompressionSnappy{}))
_ = writer.Write("eventName", 1, 1.0, nil)
_ = writer.Write("eventName", 2, 2.0, "s")
_ = writer.Close()
Expand Down

0 comments on commit c5167c6

Please sign in to comment.