Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Meta
func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string, ops ...*storage.Operation) error {
// Use protobuf if feature gate is enabled
if metadata.FilelogProtobufCheckpointEncodingFeatureGate.IsEnabled() {
return SaveKeyProto(ctx, persister, rmds, key, ops...)
return saveKeyProto(ctx, persister, rmds, key, ops...)
}

// Otherwise use JSON (default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@ func tryLoadProtobuf(encoded []byte) ([]*reader.Metadata, error) {
return rmds, nil
}

// SaveProto syncs the most recent set of files to the database using protobuf encoding
func SaveProto(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata) error {
return SaveKeyProto(ctx, persister, rmds, knownFilesKey)
}

func SaveKeyProto(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string, ops ...*storage.Operation) error {
func saveKeyProto(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string, ops ...*storage.Operation) error {
pbList := &pb.MetadataList{
Metadata: make([]*pb.Metadata, 0, len(rmds)),
}
Expand Down Expand Up @@ -73,40 +68,6 @@ func SaveKeyProto(ctx context.Context, persister operator.Persister, rmds []*rea
return errs
}

// LoadProto loads the most recent set of files from the database using protobuf encoding
func LoadProto(ctx context.Context, persister operator.Persister) ([]*reader.Metadata, error) {
return LoadKeyProto(ctx, persister, knownFilesKey)
}

func LoadKeyProto(ctx context.Context, persister operator.Persister, key string) ([]*reader.Metadata, error) {
encoded, err := persister.Get(ctx, key)
if err != nil {
return nil, err
}

if encoded == nil {
return []*reader.Metadata{}, nil
}

pbList := &pb.MetadataList{}
if err := proto.Unmarshal(encoded, pbList); err != nil {
return nil, fmt.Errorf("unmarshal protobuf: %w", err)
}

rmds := make([]*reader.Metadata, 0, len(pbList.Metadata))
var errs error
for _, pbMeta := range pbList.Metadata {
rmd, err := pbToMetadata(pbMeta)
if err != nil {
errs = multierr.Append(errs, fmt.Errorf("convert protobuf to metadata: %w", err))
continue
}
rmds = append(rmds, rmd)
}

return rmds, errs
}

// metadataToPb converts reader.Metadata to protobuf Metadata
func metadataToPb(rmd *reader.Metadata) (*pb.Metadata, error) {
pbMeta := &pb.Metadata{
Expand Down
Loading