diff --git a/internal/storage/compact/compact.go b/internal/storage/compact/compact.go index c5fb35d2..309fe9f0 100644 --- a/internal/storage/compact/compact.go +++ b/internal/storage/compact/compact.go @@ -115,9 +115,13 @@ func (s *Storage) Compact(ctx context.Context) (interface{}, error) { queue <- s.merge(merged, blocks, schema) // Reset both the schema and the set of blocks - schema = make(typeof.Schema, len(schema)) blocks = make([]block.Block, 0, 16) merged = make([]key.Key, 0, 16) + + // append the last value that didn't merge in previous merge + schema = input.Schema() + blocks = append(blocks, input) + merged = append(merged, key.Clone(k)) return false }); err != nil { return nil, err diff --git a/internal/storage/compact/compact_test.go b/internal/storage/compact/compact_test.go index ec17b2ac..77c487b6 100644 --- a/internal/storage/compact/compact_test.go +++ b/internal/storage/compact/compact_test.go @@ -86,9 +86,11 @@ func TestRange(t *testing.T) { // Insert out of order _ = store.Append(key.New("A", time.Unix(0, 0)), input, 60*time.Second) _ = store.Append(key.New("A", time.Unix(1, 0)), input, 60*time.Second) + _ = store.Append(key.New("C", time.Unix(1, 0)), input, 60*time.Second) _ = store.Append(key.New("B", time.Unix(0, 0)), input, 60*time.Second) _ = store.Append(key.New("B", time.Unix(1, 0)), input, 60*time.Second) _ = store.Append(key.New("B", time.Unix(2, 0)), input, 60*time.Second) + _ = store.Append(key.New("D", time.Unix(2, 0)), input, 60*time.Second) // Iterate in order var values [][]byte @@ -99,14 +101,14 @@ func TestRange(t *testing.T) { // Must be in order assert.NoError(t, err) - assert.Equal(t, 5, len(values)) + assert.Equal(t, 7, len(values)) for _, v := range values { assert.EqualValues(t, input, v) } // Manually compact, the final count should be 2 (given we have 2 keys) store.Compact(context.Background()) - assert.Equal(t, int64(2), count) + assert.Equal(t, int64(4), count) }) } diff --git a/internal/storage/flush/flush.go b/internal/storage/flush/flush.go index f875e685..b6e65fe6 100644 --- a/internal/storage/flush/flush.go +++ b/internal/storage/flush/flush.go @@ -5,7 +5,6 @@ package flush import ( "bytes" - "compress/flate" "sync" "time" @@ -69,7 +68,7 @@ func (s *Storage) Merge(blocks []block.Block, schema typeof.Schema) ([]byte, []b buffer := s.memoryPool.Get().(*bytes.Buffer) writer, err := eorc.NewWriter(buffer, eorc.SetSchema(orcSchema), - eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression})) + eorc.SetCompression(eorc.CompressionSnappy{})) for _, blk := range blocks { rows, err := blk.Select(blk.Schema()) diff --git a/internal/storage/flush/flush_test.go b/internal/storage/flush/flush_test.go index 4b5520d6..4521f08d 100644 --- a/internal/storage/flush/flush_test.go +++ b/internal/storage/flush/flush_test.go @@ -5,7 +5,6 @@ package flush import ( "bytes" - "compress/flate" "io/ioutil" "testing" @@ -16,7 +15,7 @@ import ( "github.com/kelindar/talaria/internal/encoding/orc" "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/kelindar/talaria/internal/monitor" - "github.com/kelindar/talaria/internal/scripting" + script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/storage/writer/noop" "github.com/stretchr/testify/assert" ) @@ -75,7 +74,7 @@ func TestMerge(t *testing.T) { orcBuffer := &bytes.Buffer{} writer, _ = eorc.NewWriter(orcBuffer, eorc.SetSchema(orcSchema), - eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression})) + eorc.SetCompression(eorc.CompressionSnappy{})) _ = writer.Write("eventName", 1, 1.0) _ = writer.Write("eventName", 2, 2.0) _ = writer.Close() @@ -153,7 +152,7 @@ func TestMerge_DifferentSchema(t *testing.T) { orcBuffer := &bytes.Buffer{} writer, _ = eorc.NewWriter(orcBuffer, eorc.SetSchema(orcSchema2), - eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression})) + eorc.SetCompression(eorc.CompressionSnappy{})) _ = writer.Write("eventName", 1, 1.0, nil) _ = writer.Write("eventName", 2, 2.0, "s") _ = writer.Close()