Skip to content

Commit

Permalink
cdc readme
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Jun 18, 2021
1 parent 0cd613b commit 7c6791a
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 3 deletions.
48 changes: 48 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ This package contains a **high-performance, columnar, in-memory storage engine**
* Support for **transaction isolation**, allowing you to create transactions and commit/rollback.
* Support for **expiration** of rows based on time-to-live or expiration column.
* Support for **atomic increment/decrement** of numerical values, transactionally.
* Support for **change data stream** that streams all commits consistently.

## Documentation

Expand All @@ -35,6 +36,7 @@ The general idea is to leverage cache-friendly ways of organizing data in [struc
- [Updating Values](#updating-values)
- [Expiring Values](#expiring-values)
- [Transaction Commit and Rollback](#transaction-commit-and-rollback)
- [Streaming Changes](#streaming-changes)
- [Complete Example](#complete-example)
- [Benchmarks](#benchmarks)
- [Contributing](#contributing)
Expand Down Expand Up @@ -308,6 +310,52 @@ players.Query(func(txn *column.Txn) error {
})
```

## Streaming Changes

This library also supports streaming out all transaction commits consistently, as they happen. This allows you to implement your own change data capture (CDC) listeners, stream data into kafka or into a remote database for durability. In order to enable it, you can simply provide an implementation of a `commit.Writer` interface during the creation of the collection.

In the example below we take advantage of the `commit.Channel` implementation of a `commit.Writer` which simply publishes the commits into a go channel. Here we create a buffered channel and keep consuming the commits with a separate goroutine, allowing us to view transactions as they happen in the store.

```go
// Create a new commit writer (simple channel) and a new collection
writer := make(commit.Channel, 1024)
players := NewCollection(column.Options{
Writer: writer,
})

// Read the changes from the channel
go func(){
for commit := writer{
println("commit", commit.Type.String())
}
}()

// ... insert, update or delete
```

On a separate note, this change stream is guaranteed to be consistent and serialized. This means that you can also replicate those changes on another database and synchronize both. In fact, this library also provides `Replay()` method on the collection that allows to do just that. In the example below we create two collections `primary` and `replica` and asychronously replicating all of the commits from the `primary` to the `replica` using the `Replay()` method together with the change stream.

```go
// Create a p rimary collection
writer := make(commit.Channel, 1024)
primary := column.NewCollection(column.Options{
Writer: &writer,
})
primary.CreateColumnsOf(object)

// Replica with the same schema
replica := column.NewCollection()
replica.CreateColumnsOf(object)

// Keep 2 collections in sync
go func() {
for change := range writer {
replica.Replay(change)
}
}()
```


## Complete Example

```go
Expand Down
14 changes: 14 additions & 0 deletions commit/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ const (
TypeDelete // Delete deletes a set of entries in the collection
)

// String returns the string representation of the type
func (t Type) String() string {
switch t {
case TypeStore:
return "store"
case TypeDelete:
return "delete"
case TypeInsert:
return "insert"
default:
return "invalid"
}
}

// --------------------------- Commit ----------------------------

// Commit represents an individual transaction commit. If multiple columns are committed
Expand Down
7 changes: 7 additions & 0 deletions commit/commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,10 @@ func TestCommits(t *testing.T) {
assert.NotEqual(t, unsafe.Pointer(&commit2.Updates), unsafe.Pointer(&clone2.Updates))
assert.NotEqual(t, unsafe.Pointer(&commit3.Inserts), unsafe.Pointer(&clone3.Inserts))
}

func TestType(t *testing.T) {
assert.Equal(t, "store", TypeStore.String())
assert.Equal(t, "delete", TypeDelete.String())
assert.Equal(t, "insert", TypeInsert.String())
assert.Equal(t, "invalid", Type(10).String())
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/kelindar/column
go 1.16

require (
github.com/cheekybits/genny v1.0.0
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kelindar/bitmap v1.0.5
github.com/stretchr/testify v1.7.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE=
github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down

0 comments on commit 7c6791a

Please sign in to comment.