Schema support (#205)
hariso authored Sep 16, 2024
1 parent 1326459 commit a9ac304
Showing 13 changed files with 441 additions and 131 deletions.
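In short: the source now extracts Avro schemas for record keys and payloads and attaches the schema subject and version to every record's metadata (the schemas themselves are retrievable through the connector SDK's schema support, as the new tests below show); the separate WithAvroSchema configuration option is removed. The sketch below is not part of this commit (the function name is mine); it shows how a record's payload schema can be resolved from that metadata and mirrors the getPayloadSchema helper added in cdc_test.go.

// Illustrative sketch only: resolve and print the payload schema attached to a record.
package example

import (
	"context"
	"fmt"

	"github.com/conduitio/conduit-commons/opencdc"
	sdkschema "github.com/conduitio/conduit-connector-sdk/schema"
	"github.com/hamba/avro/v2"
)

func printPayloadSchema(ctx context.Context, r opencdc.Record) error {
	subject, err := r.Metadata.GetPayloadSchemaSubject()
	if err != nil {
		return fmt.Errorf("GetPayloadSchemaSubject failed: %w", err)
	}
	version, err := r.Metadata.GetPayloadSchemaVersion()
	if err != nil {
		return fmt.Errorf("GetPayloadSchemaVersion failed: %w", err)
	}
	// Fetch the schema that was registered for this subject and version.
	sch, err := sdkschema.Get(ctx, subject, version)
	if err != nil {
		return fmt.Errorf("failed getting schema: %w", err)
	}
	// sch.Bytes holds the Avro schema definition.
	parsed, err := avro.ParseBytes(sch.Bytes)
	if err != nil {
		return fmt.Errorf("failed parsing schema: %w", err)
	}
	fmt.Println(parsed.String())
	return nil
}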
2 changes: 1 addition & 1 deletion .golangci.yml
@@ -43,7 +43,7 @@ linters:
# - errorlint
# - exhaustive
# - exhaustivestruct
-   - exportloopref
+   - copyloopvar
- forbidigo
# - forcetypeassert
# - funlen
4 changes: 2 additions & 2 deletions source.go
@@ -102,7 +102,8 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error {

switch s.config.CDCMode {
case source.CDCModeAuto:
- // TODO add logic that checks if the DB supports logical replication (since that's the only thing we support at the moment)
+ // TODO add logic that checks if the DB supports logical replication
+ // (since that's the only thing we support at the moment)
fallthrough
case source.CDCModeLogrepl:
i, err := logrepl.NewCombinedIterator(ctx, s.pool, logrepl.Config{
@@ -113,7 +114,6 @@ func (s *Source) Open(ctx context.Context, pos opencdc.Position) error {
TableKeys: s.tableKeys,
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
SnapshotFetchSize: s.config.SnapshotFetchSize,
- WithAvroSchema: s.config.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create logical replication iterator: %w", err)
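A note on the TODO in the source.go hunk above: CDCModeAuto still falls through to logical replication unconditionally. A minimal sketch of the missing check (not part of this commit; it assumes the same pgxpool.Pool the source already holds and the imports already present in source.go) would read the server's wal_level setting, which must be "logical" for logical replication to work:

// Sketch only: check whether the connected Postgres server supports logical replication.
func supportsLogicalReplication(ctx context.Context, pool *pgxpool.Pool) (bool, error) {
	var walLevel string
	// Logical replication requires the server to run with wal_level=logical.
	if err := pool.QueryRow(ctx, "SHOW wal_level").Scan(&walLevel); err != nil {
		return false, fmt.Errorf("failed to read wal_level: %w", err)
	}
	return walLevel == "logical", nil
}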
4 changes: 2 additions & 2 deletions source/logrepl/cdc.go
@@ -27,7 +27,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
)

- // Config holds configuration values for CDCIterator.
+ // CDCConfig holds configuration values for CDCIterator.
type CDCConfig struct {
LSN pglogrepl.LSN
SlotName string
@@ -65,7 +65,7 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
}

records := make(chan opencdc.Record)
- handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, c.WithAvroSchema, records)
+ handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records)

sub, err := internal.CreateSubscription(
ctx,
164 changes: 159 additions & 5 deletions source/logrepl/cdc_test.go
@@ -23,10 +23,13 @@ import (
"time"

"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-commons/schema"
"github.com/conduitio/conduit-connector-postgres/source/position"
"github.com/conduitio/conduit-connector-postgres/test"
sdkschema "github.com/conduitio/conduit-connector-sdk/schema"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hamba/avro/v2"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/matryer/is"
@@ -146,7 +149,11 @@ func TestCDCIterator_Next(t *testing.T) {
want: opencdc.Record{
Operation: opencdc.OperationCreate,
Metadata: map[string]string{
- opencdc.MetadataCollection: table,
+ opencdc.MetadataCollection:           table,
+ opencdc.MetadataKeySchemaSubject:     table + "_key",
+ opencdc.MetadataKeySchemaVersion:     "1",
+ opencdc.MetadataPayloadSchemaSubject: table + "_payload",
+ opencdc.MetadataPayloadSchemaVersion: "1",
},
Key: opencdc.StructuredData{"id": int64(6)},
Payload: opencdc.Change{
@@ -175,7 +182,11 @@
want: opencdc.Record{
Operation: opencdc.OperationUpdate,
Metadata: map[string]string{
- opencdc.MetadataCollection: table,
+ opencdc.MetadataCollection:           table,
+ opencdc.MetadataKeySchemaSubject:     table + "_key",
+ opencdc.MetadataKeySchemaVersion:     "1",
+ opencdc.MetadataPayloadSchemaSubject: table + "_payload",
+ opencdc.MetadataPayloadSchemaVersion: "1",
},
Key: opencdc.StructuredData{"id": int64(1)},
Payload: opencdc.Change{
@@ -205,7 +216,11 @@
want: opencdc.Record{
Operation: opencdc.OperationUpdate,
Metadata: map[string]string{
- opencdc.MetadataCollection: table,
+ opencdc.MetadataCollection:           table,
+ opencdc.MetadataKeySchemaSubject:     table + "_key",
+ opencdc.MetadataKeySchemaVersion:     "1",
+ opencdc.MetadataPayloadSchemaSubject: table + "_payload",
+ opencdc.MetadataPayloadSchemaVersion: "1",
},
Key: opencdc.StructuredData{"id": int64(1)},
Payload: opencdc.Change{
@@ -244,7 +259,11 @@
want: opencdc.Record{
Operation: opencdc.OperationDelete,
Metadata: map[string]string{
- opencdc.MetadataCollection: table,
+ opencdc.MetadataCollection:           table,
+ opencdc.MetadataKeySchemaSubject:     table + "_key",
+ opencdc.MetadataKeySchemaVersion:     "1",
+ opencdc.MetadataPayloadSchemaSubject: table + "_payload",
+ opencdc.MetadataPayloadSchemaVersion: "1",
},
Key: opencdc.StructuredData{"id": int64(4)},
Payload: opencdc.Change{
@@ -274,7 +293,11 @@
want: opencdc.Record{
Operation: opencdc.OperationDelete,
Metadata: map[string]string{
- opencdc.MetadataCollection: table,
+ opencdc.MetadataCollection:           table,
+ opencdc.MetadataKeySchemaSubject:     table + "_key",
+ opencdc.MetadataKeySchemaVersion:     "1",
+ opencdc.MetadataPayloadSchemaSubject: table + "_payload",
+ opencdc.MetadataPayloadSchemaVersion: "1",
},
Key: opencdc.StructuredData{"id": int64(3)},
Payload: opencdc.Change{
@@ -497,3 +520,134 @@ func fetchSlotStats(t *testing.T, c test.Querier, slotName string) (pglogrepl.LS
time.Sleep(100 * time.Millisecond)
}
}

func TestCDCIterator_Schema(t *testing.T) {
ctx := context.Background()

pool := test.ConnectPool(ctx, t, test.RepmgrConnString)
table := test.SetupTestTable(ctx, t, pool)

i := testCDCIterator(ctx, t, pool, table, true)
<-i.sub.Ready()

t.Run("initial table schema", func(t *testing.T) {
is := is.New(t)

_, err := pool.Exec(
ctx,
fmt.Sprintf(`INSERT INTO %s (id, column1, column2, column3, column4, column5)
VALUES (6, 'bizz', 456, false, 12.3, 14)`, table),
)
is.NoErr(err)

r, err := i.Next(ctx)
is.NoErr(err)

assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV1, table, r)
assertKeySchemaOK(ctx, is, table, r)
})

t.Run("column added", func(t *testing.T) {
is := is.New(t)

_, err := pool.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN column6 timestamp;`, table))
is.NoErr(err)

_, err = pool.Exec(
ctx,
fmt.Sprintf(`INSERT INTO %s (id, key, column1, column2, column3, column4, column5, column6)
VALUES (7, decode('aabbcc', 'hex'), 'example data 1', 100, true, 12345.678, 12345, '2023-09-09 10:00:00');`, table),
)
is.NoErr(err)

r, err := i.Next(ctx)
is.NoErr(err)

assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV2, table, r)
assertKeySchemaOK(ctx, is, table, r)
})

t.Run("column removed", func(t *testing.T) {
is := is.New(t)

_, err := pool.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s DROP COLUMN column4, DROP COLUMN column5;`, table))
is.NoErr(err)

_, err = pool.Exec(
ctx,
fmt.Sprintf(`INSERT INTO %s (id, key, column1, column2, column3, column6)
VALUES (8, decode('aabbcc', 'hex'), 'example data 1', 100, true, '2023-09-09 10:00:00');`, table),
)
is.NoErr(err)

r, err := i.Next(ctx)
is.NoErr(err)

assertPayloadSchemaOK(ctx, is, test.TestTableAvroSchemaV3, table, r)
assertKeySchemaOK(ctx, is, table, r)
})
}

func assertPayloadSchemaOK(ctx context.Context, is *is.I, wantSchemaTemplate string, table string, r opencdc.Record) {
gotConduitSch, err := getPayloadSchema(ctx, r)
is.NoErr(err)

want, err := avro.Parse(fmt.Sprintf(wantSchemaTemplate, table+"_payload"))
is.NoErr(err)

got, err := avro.ParseBytes(gotConduitSch.Bytes)
is.NoErr(err)

is.Equal(want.String(), got.String())
}

func assertKeySchemaOK(ctx context.Context, is *is.I, table string, r opencdc.Record) {
gotConduitSch, err := getKeySchema(ctx, r)
is.NoErr(err)

want, err := avro.Parse(fmt.Sprintf(test.TestTableKeyAvroSchema, table+"_key"))
is.NoErr(err)

got, err := avro.ParseBytes(gotConduitSch.Bytes)
is.NoErr(err)

is.Equal(want.String(), got.String())
}
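
The schema templates used above (test.TestTableAvroSchemaV1/V2/V3 and test.TestTableKeyAvroSchema) are defined in the test helpers and are not part of this diff. Purely as a hypothetical illustration of their shape: since the test table's key column id arrives as an int64, the key template would be a one-field Avro record whose %s placeholder is filled with table + "_key", roughly like this:

// Hypothetical approximation only; the real template lives in the test package.
const exampleKeyAvroSchema = `{
  "type": "record",
  "name": "%s",
  "fields": [
    {"name": "id", "type": "long"}
  ]
}`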

func getPayloadSchema(ctx context.Context, r opencdc.Record) (schema.Schema, error) {
payloadSubj, err := r.Metadata.GetPayloadSchemaSubject()
if err != nil {
return schema.Schema{}, fmt.Errorf("GetPayloadSchemaSubject failed: %w", err)
}

payloadV, err := r.Metadata.GetPayloadSchemaVersion()
if err != nil {
return schema.Schema{}, fmt.Errorf("GetPayloadSchemaVersion failed: %w", err)
}

payloadSch, err := sdkschema.Get(ctx, payloadSubj, payloadV)
if err != nil {
return schema.Schema{}, fmt.Errorf("failed getting schema: %w", err)
}

return payloadSch, nil
}

func getKeySchema(ctx context.Context, r opencdc.Record) (schema.Schema, error) {
keySubj, err := r.Metadata.GetKeySchemaSubject()
if err != nil {
return schema.Schema{}, fmt.Errorf("GetKeySchemaSubject failed: %w", err)
}

keyV, err := r.Metadata.GetKeySchemaVersion()
if err != nil {
return schema.Schema{}, fmt.Errorf("GetKeySchemaVersion failed: %w", err)
}

keySch, err := sdkschema.Get(ctx, keySubj, keyV)
if err != nil {
return schema.Schema{}, fmt.Errorf("failed getting schema: %w", err)
}

return keySch, nil
}
13 changes: 5 additions & 8 deletions source/logrepl/combined.go
@@ -49,7 +49,6 @@ type Config struct {
TableKeys map[string]string
WithSnapshot bool
SnapshotFetchSize int
- WithAvroSchema bool
}

// Validate performs validation tasks on the config.
@@ -179,7 +178,6 @@ func (c *CombinedIterator) initCDCIterator(ctx context.Context, pos position.Pos
PublicationName: c.conf.PublicationName,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
- WithAvroSchema: c.conf.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create CDC iterator: %w", err)
@@ -201,12 +199,11 @@ func (c *CombinedIterator) initSnapshotIterator(ctx context.Context, pos positio
}

snapshotIterator, err := snapshot.NewIterator(ctx, c.pool, snapshot.Config{
- Position: c.conf.Position,
- Tables: c.conf.Tables,
- TableKeys: c.conf.TableKeys,
- TXSnapshotID: c.cdcIterator.TXSnapshotID(),
- FetchSize: c.conf.SnapshotFetchSize,
- WithAvroSchema: c.conf.WithAvroSchema,
+ Position: c.conf.Position,
+ Tables: c.conf.Tables,
+ TableKeys: c.conf.TableKeys,
+ TXSnapshotID: c.cdcIterator.TXSnapshotID(),
+ FetchSize: c.conf.SnapshotFetchSize,
})
if err != nil {
return fmt.Errorf("failed to create snapshot iterator: %w", err)