db: support multiple KeySchemas
Adapt the configuration of block-columnar KeySchemas to allow the user to
configure a set of known KeySchemas, in addition to specifying the one used
when constructing new sstables. If a user wishes to change their key schema,
they may introduce a new KeySchema while continuing to use the old KeySchema
to interpret existing sstables.
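
As a rough sketch of how a caller might use the new configuration (not part of this commit, and based only on the Options, sstable, and colblk APIs visible in the diff below): register every schema the database has ever used in KeySchemas, and name the schema to write with in KeySchema. The directory name and the use of two DefaultKeySchema variants as an old/new pair are purely illustrative:

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/sstable"
	"github.com/cockroachdb/pebble/sstable/colblk"
)

func main() {
	// The schema that existing sstables were written with.
	oldSchema := colblk.DefaultKeySchema(pebble.DefaultComparer, 16 /* bundleSize */)
	// Stand-in for a new, hand-written colblk.KeySchema; reusing
	// DefaultKeySchema with a different bundle size keeps the sketch compilable.
	newSchema := colblk.DefaultKeySchema(pebble.DefaultComparer, 32 /* bundleSize */)

	opts := &pebble.Options{
		Comparer: pebble.DefaultComparer,
		// Columnar blocks require a sufficiently recent format
		// (see FormatColumnarBlocks).
		FormatMajorVersion: pebble.FormatNewest,
		// Write new sstables with the new schema...
		KeySchema: newSchema.Name,
		// ...while keeping the old schema registered so existing
		// sstables remain readable.
		KeySchemas: sstable.KeySchemas{
			oldSchema.Name: oldSchema,
			newSchema.Name: newSchema,
		},
	}

	db, err := pebble.Open("demo-db", opts) // hypothetical directory
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}

As the ingest.go hunk below shows, ingesting a block-columnar sstable whose key schema name is not present in KeySchemas now fails with an explicit "table uses key schema %q unknown to the database" error.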

In future work we may consider allowing users to partition the LSM, specifying
separate KeySchemas for distinct portions of the keyspace.

Close #4045.
jbowens committed Oct 16, 2024
1 parent 8b6d64f commit c719c4e
Showing 45 changed files with 605 additions and 466 deletions.
4 changes: 3 additions & 1 deletion cmd/pebble/db.go
@@ -12,6 +12,7 @@ import (
 	"github.com/cockroachdb/pebble/internal/bytealloc"
 	"github.com/cockroachdb/pebble/internal/crdbtest"
 	"github.com/cockroachdb/pebble/objstorage/remote"
+	"github.com/cockroachdb/pebble/sstable"
 	"github.com/cockroachdb/pebble/vfs"
 )
 
@@ -61,7 +62,8 @@ func newPebbleDB(dir string) DB {
 		Comparer:              &crdbtest.Comparer,
 		DisableWAL:            disableWAL,
 		FormatMajorVersion:    pebble.FormatNewest,
-		KeySchema:             crdbtest.KeySchema,
+		KeySchema:             crdbtest.KeySchema.Name,
+		KeySchemas:            sstable.MakeKeySchemas(crdbtest.KeySchema),
 		L0CompactionThreshold: 2,
 		L0StopWritesThreshold: 1000,
 		LBaseMaxBytes:         64 << 20, // 64 MB
2 changes: 0 additions & 2 deletions compaction_test.go
@@ -33,7 +33,6 @@ import (
 	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
 	"github.com/cockroachdb/pebble/objstorage/remote"
 	"github.com/cockroachdb/pebble/sstable"
-	"github.com/cockroachdb/pebble/sstable/colblk"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/cockroachdb/pebble/vfs/errorfs"
 	"github.com/stretchr/testify/require"
@@ -617,7 +616,6 @@ func TestCompaction(t *testing.T) {
 		Comparer:              testkeys.Comparer,
 		DebugCheck:            DebugCheckLevels,
 		FS:                    mem,
-		KeySchema:             colblk.DefaultKeySchema(testkeys.Comparer, 16),
 		L0CompactionThreshold: 8,
 		MemTableSize:          memTableSize,
 	}
7 changes: 7 additions & 0 deletions ingest.go
@@ -273,6 +273,13 @@ func ingestLoad1(
 			tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
 		)
 	}
+	if tf.BlockColumnar() {
+		if _, ok := opts.KeySchemas[r.Properties.KeySchemaName]; !ok {
+			return nil, errors.Newf(
+				"pebble: table uses key schema %q unknown to the database",
+				r.Properties.KeySchemaName)
+		}
+	}
 
 	meta := &fileMetadata{}
 	meta.FileNum = fileNum
10 changes: 4 additions & 6 deletions ingest_test.go
@@ -38,7 +38,6 @@ import (
 	"github.com/cockroachdb/pebble/record"
 	"github.com/cockroachdb/pebble/sstable"
 	"github.com/cockroachdb/pebble/sstable/block"
-	"github.com/cockroachdb/pebble/sstable/colblk"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/cockroachdb/pebble/vfs/errorfs"
 	"github.com/kr/pretty"
@@ -218,10 +217,9 @@ func TestIngestLoadRand(t *testing.T) {
 	}
 
 	opts := (&Options{
-		Comparer:  base.DefaultComparer,
-		KeySchema: colblk.DefaultKeySchema(base.DefaultComparer, 16),
-		FS:        mem,
-	}).WithFSDefaults()
+		Comparer: base.DefaultComparer,
+		FS:       mem,
+	}).WithFSDefaults().EnsureDefaults()
 	lr, err := ingestLoad(context.Background(), opts, version, paths, nil, nil, 0, pending)
 	require.NoError(t, err)
 
@@ -3671,7 +3669,7 @@ func TestIngestValidation(t *testing.T) {
 		BlockSize:   blockSize, // Create many smaller blocks.
 		Comparer:    opts.Comparer,
 		Compression: NoCompression, // For simpler debugging.
-		KeySchema:   opts.KeySchema,
+		KeySchema:   opts.KeySchemas[opts.KeySchema],
 	})
 	for _, kv := range keyVals {
 		require.NoError(t, w.Set(kv.key, kv.val))
4 changes: 2 additions & 2 deletions internal/compact/spans_test.go
@@ -132,8 +132,8 @@ func TestSplitAndEncodeSpan(t *testing.T) {
 	require.NoError(t, SplitAndEncodeSpan(base.DefaultComparer.Compare, &span, upToKey, tw))
 	require.NoError(t, tw.Close())
 	_, rangeDels, rangeKeys := sstable.ReadAll(obj, sstable.ReaderOptions{
-		Comparer:  wo.Comparer,
-		KeySchema: wo.KeySchema,
+		Comparer:   wo.Comparer,
+		KeySchemas: sstable.MakeKeySchemas(wo.KeySchema),
 	})
 	require.LessOrEqual(t, len(rangeDels)+len(rangeKeys), 1)
 	s := "."
1 change: 1 addition & 0 deletions internal/crdbtest/crdbtest.go
@@ -501,6 +501,7 @@ const (
 )
 
 var KeySchema = colblk.KeySchema{
+	Name: "crdb1",
 	ColumnTypes: []colblk.DataType{
 		cockroachColRoachKey:     colblk.DataTypePrefixBytes,
 		cockroachColMVCCWallTime: colblk.DataTypeUint,
4 changes: 2 additions & 2 deletions level_checker_test.go
@@ -236,8 +236,8 @@ func TestCheckLevelsCornerCases(t *testing.T) {
 	}
 	// Set FileNum for logging purposes.
 	readerOpts := sstable.ReaderOptions{
-		Comparer:  testkeys.Comparer,
-		KeySchema: writerOpts.KeySchema,
+		Comparer:   testkeys.Comparer,
+		KeySchemas: sstable.KeySchemas{writerOpts.KeySchema.Name: writerOpts.KeySchema},
 	}
 	readerOpts.SetInternalCacheOpts(sstableinternal.CacheOptions{FileNum: base.DiskFileNum(fileNum - 1)})
 	r, err := sstable.NewReader(context.Background(), readable, readerOpts)
1 change: 1 addition & 0 deletions metamorphic/options_test.go
@@ -65,6 +65,7 @@ func TestOptionsRoundtrip(t *testing.T) {
 		"Cache:",
 		"Cache.",
 		"FS:",
+		"KeySchemas[",
 		"TableCache:",
 		// Function pointers
 		"BlockPropertyCollectors:",
94 changes: 77 additions & 17 deletions options.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/pebble/internal/humanize"
 	"github.com/cockroachdb/pebble/internal/keyspan"
 	"github.com/cockroachdb/pebble/internal/manifest"
+	"github.com/cockroachdb/pebble/internal/testkeys"
 	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
 	"github.com/cockroachdb/pebble/objstorage/remote"
 	"github.com/cockroachdb/pebble/rangekey"
@@ -61,6 +62,9 @@ type FilterWriter = base.FilterWriter
 // FilterPolicy exports the base.FilterPolicy type.
 type FilterPolicy = base.FilterPolicy
 
+// KeySchema exports the colblk.KeySchema type.
+type KeySchema = colblk.KeySchema
+
 // BlockPropertyCollector exports the sstable.BlockPropertyCollector type.
 type BlockPropertyCollector = sstable.BlockPropertyCollector
 
@@ -890,11 +894,22 @@ type Options struct {
 	// The default value uses the underlying operating system's file system.
 	FS vfs.FS
 
-	// KeySchema defines the schema of a user key. When columnar blocks are in
-	// use (see FormatColumnarBlocks), the user may specify how a key should be
-	// decomposed into columns. If not set, colblk.DefaultKeySchema is used to
-	// construct a default key schema.
-	KeySchema colblk.KeySchema
+	// KeySchema is the name of the key schema that should be used when writing
+	// new sstables. There must be a key schema with this name defined in
+	// KeySchemas. If not set, colblk.DefaultKeySchema is used to construct a
+	// default key schema.
+	KeySchema string
+
+	// KeySchemas defines the set of known schemas of user keys. When columnar
+	// blocks are in use (see FormatColumnarBlocks), the user may specify how a
+	// key should be decomposed into columns. Each KeySchema must have a unique
+	// name. The schema named by Options.KeySchema is used while writing
+	// sstables during flushes and compactions.
+	//
+	// Multiple KeySchemas may be used over the lifetime of a database. Once a
+	// KeySchema is used, it must be provided in KeySchemas in subsequent calls
+	// to Open for perpetuity.
+	KeySchemas sstable.KeySchemas
 
 	// Lock, if set, must be a database lock acquired through LockDirectory for
 	// the same directory passed to Open. If provided, Open will skip locking
@@ -1207,8 +1222,10 @@ func (o *Options) EnsureDefaults() *Options {
 	if o.Experimental.KeyValidationFunc == nil {
 		o.Experimental.KeyValidationFunc = func([]byte) error { return nil }
 	}
-	if len(o.KeySchema.ColumnTypes) == 0 {
-		o.KeySchema = colblk.DefaultKeySchema(o.Comparer, 16 /* bundleSize */)
+	if o.KeySchema == "" && len(o.KeySchemas) == 0 {
+		ks := colblk.DefaultKeySchema(o.Comparer, 16 /* bundleSize */)
+		o.KeySchema = ks.Name
+		o.KeySchemas = sstable.MakeKeySchemas(ks)
 	}
 	if o.L0CompactionThreshold <= 0 {
 		o.L0CompactionThreshold = 4
@@ -1433,6 +1450,7 @@ func (o *Options) String() string {
 	fmt.Fprintf(&buf, " flush_delay_range_key=%s\n", o.FlushDelayRangeKey)
 	fmt.Fprintf(&buf, " flush_split_bytes=%d\n", o.FlushSplitBytes)
 	fmt.Fprintf(&buf, " format_major_version=%d\n", o.FormatMajorVersion)
+	fmt.Fprintf(&buf, " key_schema=%s\n", o.KeySchema)
 	fmt.Fprintf(&buf, " l0_compaction_concurrency=%d\n", o.Experimental.L0CompactionConcurrency)
 	fmt.Fprintf(&buf, " l0_compaction_file_threshold=%d\n", o.L0CompactionFileThreshold)
 	fmt.Fprintf(&buf, " l0_compaction_threshold=%d\n", o.L0CompactionThreshold)
@@ -1576,6 +1594,7 @@ type ParseHooks struct {
 	NewCleaner      func(name string) (Cleaner, error)
 	NewComparer     func(name string) (*Comparer, error)
 	NewFilterPolicy func(name string) (FilterPolicy, error)
+	NewKeySchema    func(name string) (KeySchema, error)
 	NewMerger       func(name string) (*Merger, error)
 	SkipUnknown     func(name, value string) bool
 }
@@ -1590,6 +1609,20 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
 	// a backwards incompatible change. Instead, leave in support for parsing the
 	// key but simply don't parse the value.
 
+	parseComparer := func(name string) (*Comparer, error) {
+		switch name {
+		case DefaultComparer.Name:
+			return DefaultComparer, nil
+		case testkeys.Comparer.Name:
+			return testkeys.Comparer, nil
+		default:
+			if hooks != nil && hooks.NewComparer != nil {
+				return hooks.NewComparer(name)
+			}
+			return nil, nil
+		}
+	}
+
 	switch {
 	case section == "Version":
 		switch key {
@@ -1631,14 +1664,7 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
 				}
 			}
 		case "comparer":
-			switch value {
-			case "leveldb.BytewiseComparator":
-				o.Comparer = DefaultComparer
-			default:
-				if hooks != nil && hooks.NewComparer != nil {
-					o.Comparer, err = hooks.NewComparer(value)
-				}
-			}
+			o.Comparer, err = parseComparer(value)
		case "compaction_debt_concurrency":
 			o.Experimental.CompactionDebtConcurrency, err = strconv.ParseUint(value, 10, 64)
 		case "delete_range_flush_delay":
@@ -1678,6 +1704,32 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
 			if err == nil {
 				o.FormatMajorVersion = FormatMajorVersion(v)
 			}
+		case "key_schema":
+			o.KeySchema = value
+			if o.KeySchemas == nil {
+				o.KeySchemas = make(map[string]KeySchema)
+			}
+			if _, ok := o.KeySchemas[o.KeySchema]; !ok {
+				if strings.HasPrefix(value, "DefaultKeySchema(") && strings.HasSuffix(value, ")") {
+					argsStr := strings.TrimSuffix(strings.TrimPrefix(value, "DefaultKeySchema("), ")")
+					args := strings.FieldsFunc(argsStr, func(r rune) bool {
+						return unicode.IsSpace(r) || r == ','
+					})
+					var comparer *base.Comparer
+					var bundleSize int
+					comparer, err = parseComparer(args[0])
+					if err == nil {
+						bundleSize, err = strconv.Atoi(args[1])
+					}
+					if err == nil {
+						schema := colblk.DefaultKeySchema(comparer, bundleSize)
+						o.KeySchema = schema.Name
+						o.KeySchemas[o.KeySchema] = schema
+					}
+				} else if hooks != nil && hooks.NewKeySchema != nil {
+					o.KeySchemas[value], err = hooks.NewKeySchema(value)
+				}
+			}
 		case "l0_compaction_concurrency":
 			o.Experimental.L0CompactionConcurrency, err = strconv.Atoi(value)
 		case "l0_compaction_file_threshold":
@@ -2006,6 +2058,14 @@ func (o *Options) Validate() error {
 	if o.TableCache != nil && o.Cache != o.TableCache.cache {
 		fmt.Fprintf(&buf, "underlying cache in the TableCache and the Cache dont match\n")
 	}
+	if len(o.KeySchemas) > 0 {
+		if o.KeySchema == "" {
+			fmt.Fprintf(&buf, "KeySchemas is set but KeySchema is not\n")
+		}
+		if _, ok := o.KeySchemas[o.KeySchema]; !ok {
+			fmt.Fprintf(&buf, "KeySchema %q not found in KeySchemas\n", o.KeySchema)
+		}
+	}
 	if buf.Len() == 0 {
 		return nil
 	}
@@ -2019,7 +2079,7 @@ func (o *Options) MakeReaderOptions() sstable.ReaderOptions {
 	if o != nil {
 		readerOpts.Comparer = o.Comparer
 		readerOpts.Filters = o.Filters
-		readerOpts.KeySchema = o.KeySchema
+		readerOpts.KeySchemas = o.KeySchemas
 		readerOpts.LoadBlockSema = o.LoadBlockSema
 		readerOpts.LoggerAndTracer = o.LoggerAndTracer
 		readerOpts.Merger = o.Merger
@@ -2054,7 +2114,7 @@ func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstab
 	writerOpts.FilterPolicy = levelOpts.FilterPolicy
 	writerOpts.FilterType = levelOpts.FilterType
 	writerOpts.IndexBlockSize = levelOpts.IndexBlockSize
-	writerOpts.KeySchema = o.KeySchema
+	writerOpts.KeySchema = o.KeySchemas[o.KeySchema]
 	writerOpts.AllocatorSizeClasses = o.AllocatorSizeClasses
 	writerOpts.NumDeletionsThreshold = o.Experimental.NumDeletionsThreshold
 	writerOpts.DeletionSizeRatioThreshold = o.Experimental.DeletionSizeRatioThreshold
1 change: 1 addition & 0 deletions options_test.go
@@ -90,6 +90,7 @@ func TestOptionsString(t *testing.T) {
   flush_delay_range_key=0s
   flush_split_bytes=4194304
   format_major_version=13
+  key_schema=DefaultKeySchema(leveldb.BytewiseComparator,16)
   l0_compaction_concurrency=10
   l0_compaction_file_threshold=500
   l0_compaction_threshold=4
5 changes: 3 additions & 2 deletions replay/testdata/replay
@@ -11,7 +11,7 @@ tree
 614 000007.sst
 0 LOCK
 133 MANIFEST-000001
-1359 OPTIONS-000003
+1418 OPTIONS-000003
 0 marker.format-version.000001.013
 0 marker.manifest.000001.MANIFEST-000001
 simple/
@@ -21,7 +21,7 @@ tree
 25 000004.log
 586 000005.sst
 85 MANIFEST-000001
-1359 OPTIONS-000003
+1418 OPTIONS-000003
 0 marker.format-version.000001.013
 0 marker.manifest.000001.MANIFEST-000001
 
@@ -42,6 +42,7 @@ cat build/OPTIONS-000003
   flush_delay_range_key=0s
   flush_split_bytes=4194304
   format_major_version=13
+  key_schema=DefaultKeySchema(pebble.internal.testkeys,16)
   l0_compaction_concurrency=10
   l0_compaction_file_threshold=500
   l0_compaction_threshold=4
4 changes: 2 additions & 2 deletions replay/testdata/replay_paced
@@ -14,7 +14,7 @@ tree
 0 LOCK
 133 MANIFEST-000001
 205 MANIFEST-000010
-1359 OPTIONS-000003
+1418 OPTIONS-000003
 0 marker.format-version.000001.013
 0 marker.manifest.000002.MANIFEST-000010
 high_read_amp/
@@ -26,7 +26,7 @@ tree
 39 000008.log
 560 000009.sst
 157 MANIFEST-000010
-1359 OPTIONS-000003
+1418 OPTIONS-000003
 0 marker.format-version.000001.013
 0 marker.manifest.000001.MANIFEST-000010
 
2 changes: 2 additions & 0 deletions sstable/colblk/data_block.go
@@ -34,6 +34,7 @@ import (
 // attributes inlined within the data block. For inlined-values, the
 // user-defined value columns would be implicitly null.
 type KeySchema struct {
+	Name         string
 	ColumnTypes  []DataType
 	NewKeyWriter func() KeyWriter
 	NewKeySeeker func() KeySeeker
@@ -161,6 +162,7 @@ var defaultKeySeekerPool = sync.Pool{
 // into its prefix and suffix. Prefixes are sorted in lexicographical order.
 func DefaultKeySchema(comparer *base.Comparer, prefixBundleSize int) KeySchema {
 	return KeySchema{
+		Name:        fmt.Sprintf("DefaultKeySchema(%s,%d)", comparer.Name, prefixBundleSize),
 		ColumnTypes: defaultSchemaColumnTypes,
 		NewKeyWriter: func() KeyWriter {
 			kw := &defaultKeyWriter{comparer: comparer}
1 change: 1 addition & 0 deletions sstable/colblk_writer.go
@@ -178,6 +178,7 @@ func newColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumn
 
 	w.props.ComparerName = o.Comparer.Name
 	w.props.CompressionName = o.Compression.String()
+	w.props.KeySchemaName = o.KeySchema.Name
 	w.props.MergerName = o.MergerName
 
 	w.writeQueue.ch = make(chan *compressedBlock)
4 changes: 2 additions & 2 deletions sstable/colblk_writer_test.go
@@ -57,8 +57,8 @@ func TestColumnarWriter(t *testing.T) {
 	}
 	var err error
 	r, err = NewReader(context.Background(), obj, ReaderOptions{
-		Comparer:  testkeys.Comparer,
-		KeySchema: keySchema,
+		Comparer:   testkeys.Comparer,
+		KeySchemas: KeySchemas{keySchema.Name: keySchema},
 	})
 	require.NoError(t, err)
 	return "ok"