Simple Golang implementation for transactional outbox pattern for PostgreSQL using jackc/pgx driver

nikolayk812/pgx-outbox

pgx-outbox

This is a simple Go implementation of the transactional outbox pattern for PostgreSQL using the jackc/pgx driver.

More advanced options are described in the article "Revisiting the Outbox Pattern" by Gunnar Morling.

The motivation behind this library is to provide a generic, extensible implementation that avoids boilerplate code in projects.

Note: this is not a general-purpose Postgres queue, even though the internal implementation is based on a table with a queue-like structure.

Diagram

How to use

1. Add a database migration to your project:

CREATE TABLE IF NOT EXISTS outbox_messages
(
    id           BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,

    broker       TEXT                                NOT NULL,
    topic        TEXT                                NOT NULL,
    metadata     JSONB,
    payload      JSONB                               NOT NULL,

    created_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
    published_at TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_outbox_messages_published_at_null ON outbox_messages (published_at) WHERE published_at IS NULL;

The outbox table name can be customized, but the table structure should remain exactly the same.

2. Add outbox.Writer to the repository layer:

type repo struct {
	pool *pgxpool.Pool
	
	writer outbox.Writer
	messageMapper types.ToMessageFunc[User]
}

To map a domain model, e.g. User, to an outbox message, implement a types.ToMessageFunc function in the service layer and pass it to the repository, either in the New function or as a repository method parameter.

Call writer.Write in every repository method that should produce outbox messages.
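As an illustration of the mapping step, here is a minimal sketch of a mapper for a User model. To stay self-contained it defines a local Message type as a stand-in for types.Message: the Topic and Payload fields appear elsewhere in this README, while Broker and Metadata are assumed from the table columns. The User fields, the broker name and the topic name are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a local stand-in for types.Message so the sketch compiles on its own.
// Broker and Metadata are assumptions derived from the outbox table columns.
type Message struct {
	Broker   string
	Topic    string
	Metadata map[string]any
	Payload  []byte
}

// User is a hypothetical domain model.
type User struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// userToMessage has the shape of a ToMessageFunc[User]:
// it takes a domain model and returns an outbox message or an error.
func userToMessage(user User) (Message, error) {
	payload, err := json.Marshal(user)
	if err != nil {
		return Message{}, fmt.Errorf("json.Marshal: %w", err)
	}

	return Message{
		Broker:  "sns",          // hypothetical broker name
		Topic:   "user-created", // hypothetical topic name
		Payload: payload,
	}, nil
}

func main() {
	message, err := userToMessage(User{ID: "42", Name: "Alice"})
	if err != nil {
		panic(err)
	}

	fmt.Println(message.Topic, string(message.Payload))
}
```

Keeping the mapper in the service layer means the repository does not need to know about topics or serialization formats.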

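func (r *repo) CreateUser(ctx context.Context, user User) (u User, txErr error) {
	// begin a transaction on the pool
	tx, err := r.pool.Begin(ctx)
	if err != nil {
		return u, fmt.Errorf("pool.Begin: %w", err)
	}

	// commit or roll back depending on txErr
	defer func() {
		if txErr != nil {
			_ = tx.Rollback(ctx)
			return
		}
		txErr = tx.Commit(ctx)
	}()

	// insert the user within the transaction
	user, err = r.createUser(ctx, tx, user)
	if err != nil {
		return u, fmt.Errorf("createUser: %w", err)
	}

	// map the domain model to an outbox message
	message, err := r.messageMapper(user)
	if err != nil {
		return u, fmt.Errorf("messageMapper: %w", err)
	}

	// write the outbox message in the same transaction as the user
	if _, err := r.writer.Write(ctx, tx, message); err != nil {
		return u, fmt.Errorf("writer.Write: %w", err)
	}

	return user, nil
}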

See the outbox.Writer example in repo.go of the 01_sns directory.

3. Add outbox.Forwarder to a cron job:

forwarder, err := outbox.NewForwarderFromPool("outbox_messages", pool, publisher)
// handle err

stats, err := forwarder.Forward(ctx, 10)
// handle err
slog.Info("forwarded", "stats", stats)

where pool is a pgxpool.Pool and publisher is an implementation of outbox.Publisher.

This library provides a reference publisher implementation for AWS SNS in the sns module.

publisher, err := outboxSns.NewPublisher(awsSnsCli, messageTransformer{})

where messageTransformer is an implementation of the outboxSns.MessageTransformer interface, for example:

func (mt messageTransformer) Transform(message types.Message) (*awsSns.PublishInput, error) {
	// tc is not defined in this snippet; it stands for a config value
	// that holds the AWS region and account ID
	topicARN := fmt.Sprintf("arn:aws:sns:%s:%s:%s", tc.region, tc.accountID, message.Topic)

	return &awsSns.PublishInput{
		Message:  aws.String(string(message.Payload)),
		TopicArn: &topicARN,
	}, nil
}

See the outbox.Forwarder example in main.go of the 01_sns directory.

Examples

1. SNS

Source code and instructions for the example are located in the examples/01_sns directory.

Example 1 diagram

Alternatives