Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for reading replication slot #171

Merged
merged 3 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion source.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/conduitio/conduit-commons/csync"
"github.com/conduitio/conduit-connector-postgres/source"
"github.com/conduitio/conduit-connector-postgres/source/cpool"
"github.com/conduitio/conduit-connector-postgres/source/logrepl"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error {
}

func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
pool, err := pgxpool.New(ctx, s.config.URL)
pool, err := cpool.New(ctx, s.config.URL)
if err != nil {
return fmt.Errorf("failed to create a connection pool to database: %w", err)
}
Expand Down
72 changes: 72 additions & 0 deletions source/cpool/cpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cpool

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

type replicationCtxKey struct{}

func WithReplication(ctx context.Context) context.Context {
return context.WithValue(ctx, replicationCtxKey{}, true)
}

// New returns new pgxpool.Pool with added hooks.
func New(ctx context.Context, conninfo string) (*pgxpool.Pool, error) {
config, err := pgxpool.ParseConfig(conninfo)
if err != nil {
return nil, fmt.Errorf("failed to parse pool config: %w", err)
}

config.BeforeAcquire = beforeAcquireHook
config.BeforeConnect = beforeConnectHook
config.AfterRelease = afterReleaseHook

pool, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
return nil, err
}

return pool, nil
}

// beforeAcquireHook ensures purpose specific connections are returned:
// * If a replication connection is requested, ensure the connection has replication enabled.
// * If a regular connection is requested, return non-replication connections.
func beforeAcquireHook(ctx context.Context, conn *pgx.Conn) bool {
replReq := ctx.Value(replicationCtxKey{}) != nil
replOn := conn.Config().RuntimeParams["replication"] != ""

return replReq == replOn
}

// beforeConnectHook customizes the configuration of the new connection.
func beforeConnectHook(ctx context.Context, config *pgx.ConnConfig) error {
if v := ctx.Value(replicationCtxKey{}); v != nil {
config.RuntimeParams["replication"] = "database"
}

return nil
}

// afterReleaseHook marks all replication connections for disposal.
func afterReleaseHook(conn *pgx.Conn) bool {
return conn.Config().RuntimeParams["replication"] == ""
}
42 changes: 7 additions & 35 deletions source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,12 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
"github.com/conduitio/conduit-connector-postgres/source/position"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgconn"
)

const (
subscriberDoneTimeout = time.Second * 2
"github.com/jackc/pgx/v5/pgxpool"
)

// Config holds configuration values for CDCIterator.
Expand All @@ -45,21 +40,15 @@ type CDCConfig struct {
type CDCIterator struct {
config CDCConfig
records chan sdk.Record
pgconn *pgconn.PgConn

sub *internal.Subscription
}

// NewCDCIterator initializes logical replication by creating the publication and subscription manager.
func NewCDCIterator(ctx context.Context, pgconf *pgconn.Config, c CDCConfig) (*CDCIterator, error) {
conn, err := pgconn.ConnectConfig(ctx, withReplication(pgconf))
if err != nil {
return nil, fmt.Errorf("could not establish replication connection: %w", err)
}

func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCIterator, error) {
if err := internal.CreatePublication(
ctx,
conn,
pool,
c.PublicationName,
internal.CreatePublicationOptions{Tables: c.Tables},
); err != nil {
Expand All @@ -77,7 +66,7 @@ func NewCDCIterator(ctx context.Context, pgconf *pgconn.Config, c CDCConfig) (*C

sub, err := internal.CreateSubscription(
ctx,
conn,
pool,
c.SlotName,
c.PublicationName,
c.Tables,
Expand All @@ -91,7 +80,6 @@ func NewCDCIterator(ctx context.Context, pgconf *pgconn.Config, c CDCConfig) (*C
return &CDCIterator{
config: c,
records: records,
pgconn: conn,
sub: sub,
}, nil
}
Expand Down Expand Up @@ -181,14 +169,11 @@ func (i *CDCIterator) Ack(_ context.Context, sdkPos sdk.Position) error {
// or the context gets canceled. If the subscription stopped with an unexpected
// error, the error is returned.
func (i *CDCIterator) Teardown(ctx context.Context) error {
defer i.pgconn.Close(ctx)

if !i.subscriberReady() {
return nil
if i.sub != nil {
return i.sub.Teardown(ctx)
}

i.sub.Stop()
return i.sub.Wait(ctx, subscriberDoneTimeout)
return nil
}

// subscriberReady returns true when the subscriber is running.
Expand All @@ -207,16 +192,3 @@ func (i *CDCIterator) subscriberReady() bool {
func (i *CDCIterator) TXSnapshotID() string {
return i.sub.TXSnapshotID
}

// withReplication adds the `replication` parameter to the connection config.
// This will uprgade a regular command connection to accept replication commands.
func withReplication(pgconf *pgconn.Config) *pgconn.Config {
c := pgconf.Copy()
if c.RuntimeParams == nil {
c.RuntimeParams = make(map[string]string)
}
// enable replication on connection
c.RuntimeParams["replication"] = "database"

return c
}
40 changes: 8 additions & 32 deletions source/logrepl/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/matryer/is"
)
Expand All @@ -40,12 +39,10 @@ func TestCDCIterator_New(t *testing.T) {
tests := []struct {
name string
setup func(t *testing.T) CDCConfig
pgconf *pgconn.Config
wantErr error
}{
{
name: "publication already exists",
pgconf: &pool.Config().ConnConfig.Config,
name: "publication already exists",
setup: func(t *testing.T) CDCConfig {
is := is.New(t)
table := test.SetupTestTable(ctx, t, pool)
Expand All @@ -66,21 +63,7 @@ func TestCDCIterator_New(t *testing.T) {
},
},
{
name: "fails to connect",
pgconf: func() *pgconn.Config {
c := pool.Config().ConnConfig.Config
c.Port = 31337

return &c
}(),
setup: func(*testing.T) CDCConfig {
return CDCConfig{}
},
wantErr: errors.New("could not establish replication connection"),
},
{
name: "fails to create publication",
pgconf: &pool.Config().ConnConfig.Config,
name: "fails to create publication",
setup: func(*testing.T) CDCConfig {
return CDCConfig{
PublicationName: "foobar",
Expand All @@ -89,8 +72,7 @@ func TestCDCIterator_New(t *testing.T) {
wantErr: errors.New("requires at least one table"),
},
{
name: "fails to create subscription",
pgconf: &pool.Config().ConnConfig.Config,
name: "fails to create subscription",
setup: func(t *testing.T) CDCConfig {
is := is.New(t)
table := test.SetupTestTable(ctx, t, pool)
Expand Down Expand Up @@ -118,7 +100,7 @@ func TestCDCIterator_New(t *testing.T) {

config := tt.setup(t)

_, err := NewCDCIterator(ctx, tt.pgconf, config)
i, err := NewCDCIterator(ctx, pool, config)
if tt.wantErr != nil {
if match := strings.Contains(err.Error(), tt.wantErr.Error()); !match {
t.Logf("%s != %s", err.Error(), tt.wantErr.Error())
Expand All @@ -127,6 +109,9 @@ func TestCDCIterator_New(t *testing.T) {
} else {
is.NoErr(err)
}
if i != nil {
is.NoErr(i.Teardown(ctx))
}
})
}
}
Expand Down Expand Up @@ -276,8 +261,6 @@ func TestCDCIterator_Next_Fail(t *testing.T) {
}

func TestCDCIterator_EnsureLSN(t *testing.T) {
// t.Skip()

ctx := context.Background()
is := is.New(t)

Expand Down Expand Up @@ -370,13 +353,6 @@ func TestCDCIterator_Ack(t *testing.T) {
}
}

func Test_withReplication(t *testing.T) {
is := is.New(t)

c := withReplication(&pgconn.Config{})
is.Equal(c.RuntimeParams["replication"], "database")
}

func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, table string, start bool) *CDCIterator {
is := is.New(t)
config := CDCConfig{
Expand All @@ -386,7 +362,7 @@ func testCDCIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, tabl
SlotName: table, // table is random, reuse for slot name
}

i, err := NewCDCIterator(ctx, &pool.Config().ConnConfig.Config, config)
i, err := NewCDCIterator(ctx, pool, config)
is.NoErr(err)

i.sub.StatusTimeout = 1 * time.Second
Expand Down
35 changes: 11 additions & 24 deletions source/logrepl/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (

"github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)

type CleanupConfig struct {
Expand All @@ -36,21 +35,11 @@ type CleanupConfig struct {
func Cleanup(ctx context.Context, c CleanupConfig) error {
logger := sdk.Logger(ctx)

pgconfig, err := pgconn.ParseConfig(c.URL)
pool, err := pgxpool.New(ctx, c.URL)
if err != nil {
return fmt.Errorf("failed to parse config URL: %w", err)
return fmt.Errorf("failed to connect to database: %w", err)
}

if pgconfig.RuntimeParams == nil {
pgconfig.RuntimeParams = make(map[string]string)
}
pgconfig.RuntimeParams["replication"] = "database"

conn, err := pgconn.ConnectConfig(ctx, pgconfig)
if err != nil {
return fmt.Errorf("could not establish replication connection: %w", err)
}
defer conn.Close(ctx)
defer pool.Close()

var errs []error

Expand All @@ -61,18 +50,16 @@ func Cleanup(ctx context.Context, c CleanupConfig) error {

if c.SlotName != "" {
// Terminate any outstanding backends which are consuming the slot before deleting it.
mrr := conn.Exec(ctx, fmt.Sprintf(
"SELECT pg_terminate_backend(active_pid) FROM pg_replication_slots WHERE slot_name='%s' AND active=true", c.SlotName,
))
if err := mrr.Close(); err != nil {
if _, err := pool.Exec(
ctx,
"SELECT pg_terminate_backend(active_pid) FROM pg_replication_slots WHERE slot_name=$1 AND active=true", c.SlotName,
); err != nil {
errs = append(errs, fmt.Errorf("failed to terminate active backends on slot: %w", err))
}

if err := pglogrepl.DropReplicationSlot(
if _, err := pool.Exec(
ctx,
conn,
c.SlotName,
pglogrepl.DropReplicationSlotOptions{},
"SELECT pg_drop_replication_slot($1)", c.SlotName,
); err != nil {
errs = append(errs, fmt.Errorf("failed to clean up replication slot %q: %w", c.SlotName, err))
}
Expand All @@ -83,7 +70,7 @@ func Cleanup(ctx context.Context, c CleanupConfig) error {
if c.PublicationName != "" {
if err := internal.DropPublication(
ctx,
conn,
pool,
c.PublicationName,
internal.DropPublicationOptions{IfExists: true},
); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion source/logrepl/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (c *CombinedIterator) initCDCIterator(ctx context.Context, pos position.Pos
return fmt.Errorf("failed to parse LSN in position: %w", err)
}

cdcIterator, err := NewCDCIterator(ctx, &c.pool.Config().ConnConfig.Config, CDCConfig{
cdcIterator, err := NewCDCIterator(ctx, c.pool, CDCConfig{
LSN: lsn,
SlotName: c.conf.SlotName,
PublicationName: c.conf.PublicationName,
Expand Down
3 changes: 2 additions & 1 deletion source/logrepl/combined_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestCombinedIterator_New(t *testing.T) {
}

func TestCombinedIterator_Next(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

is := is.New(t)
Expand Down Expand Up @@ -220,6 +220,7 @@ func TestCombinedIterator_Next(t *testing.T) {
WithSnapshot: true,
})
is.NoErr(err)

_, err = pool.Exec(ctx, fmt.Sprintf(
`INSERT INTO %s (id, column1, column2, column3, column4, column5)
VALUES (7, 'buzz', 10101, true, 121.9, 51)`,
Expand Down
Loading