Skip to content

[feature] ab-cast etcd #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@ jobs:
- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Setup RabbitMQ with username and password
uses: getong/[email protected]
with:
rabbitmq version: '3.8.2-management-alpine'
host port: 5672
rabbitmq user: 'guest'
rabbitmq password: 'guest'
rabbitmq vhost: '/'

- name: All
run: make ci
- name: Setup etcd server and make
run: |
ETCD_VER=$(curl --silent https://api.github.com/repos/etcd-io/etcd/releases/latest | grep "tag_name" | cut -d ' ' -f4 | awk -F'"' '$0=$2')
curl -sL https://storage.googleapis.com/etcd/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o ./etcd.tar.gz
mkdir etcd && tar xzvf etcd.tar.gz -C etcd --strip-components=1
etcd/etcd > /dev/null &
sleep 30
make ci
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ endif

test_rule: # @HELP execute tests
@echo "executing tests"
GOTRACEBACK=all go test $(TESTARGS) -count=5 -timeout=120s -race ./test/...
GOTRACEBACK=all go test $(TESTARGS) -count=5 -timeout=120s -tags batchtest -race ./test/...
GOTRACEBACK=all go test $(TESTARGS) -count=1 -timeout=60s -race ./test/...
GOTRACEBACK=all go test $(TESTARGS) -count=1 -timeout=60s -tags batchtest -race ./test/...

lint: # @HELP lint files and format if possible
@echo "executing linter"
Expand Down
16 changes: 11 additions & 5 deletions _examples/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,32 @@ import (
"os/signal"
)

func produce(r *relt.Relt, reader io.Reader) {
func produce(r *relt.Relt, reader io.Reader, ctx context.Context) {
for {
println("Write message:")
scan := bufio.NewScanner(reader)
for scan.Scan() {
message := relt.Send{
Address: relt.DefaultExchangeName,
Data: scan.Bytes(),
}
log.Infof("Publishing message %s to group %s", string(message.Data), message.Address)
if err := r.Broadcast(message); err != nil {
if err := r.Broadcast(ctx, message); err != nil {
log.Errorf("failed sending %#v: %v", message, err)
}
}
}
}

func consume(r *relt.Relt, ctx context.Context) {
listener, err := r.Consume()
if err != nil {
return
}

for {
select {
case message := <-r.Consume():
case message := <-listener:
if message.Error != nil {
log.Errorf("message with error: %#v", message)
}
Expand All @@ -43,11 +49,11 @@ func consume(r *relt.Relt, ctx context.Context) {
func main() {
conf := relt.DefaultReltConfiguration()
conf.Name = "local-test"
relt := relt.NewRelt(*conf)
relt, _ := relt.NewRelt(*conf)
ctx, done := context.WithCancel(context.Background())

go func() {
produce(relt, os.Stdin)
produce(relt, os.Stdin, ctx)
}()

go func() {
Expand Down
32 changes: 26 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,30 @@ module github.com/jabolina/relt
go 1.14

require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/streadway/amqp v1.0.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
github.com/coreos/bbolt v0.0.0-00010101000000-000000000000 // indirect
github.com/coreos/etcd v3.3.25+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/google/uuid v1.2.0
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/prometheus/client_golang v1.9.0 // indirect
github.com/prometheus/common v0.15.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.16.0 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
google.golang.org/genproto v0.0.0-20210212180131-e7f2df4ecc2d // indirect
google.golang.org/grpc v1.35.0 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace (
github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.5
google.golang.org/grpc v1.35.0 => google.golang.org/grpc v1.26.0
)
533 changes: 507 additions & 26 deletions go.sum

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions internal/atomic_flag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package internal

import "sync/atomic"

const (
// Constant to represent the `active` state on the Flag.
active = 0x0

// Constant to represent the `inactive` state on the Flag.
inactive = 0x1
)

// An atomic boolean implementation, to act specifically as a flag.
type Flag struct {
flag int32
}

// Verify if the flag still on `active` state.
func (f *Flag) IsActive() bool {
return atomic.LoadInt32(&f.flag) == active
}

// Verify if the flag is on `inactive` state.
func (f *Flag) IsInactive() bool {
return atomic.LoadInt32(&f.flag) == inactive
}

// Transition the flag from `active` to `inactive`.
func (f *Flag) Inactivate() bool {
return atomic.CompareAndSwapInt32(&f.flag, active, inactive)
}
177 changes: 177 additions & 0 deletions internal/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package internal

import (
"context"
"github.com/coreos/etcd/clientv3"
"io"
"time"
)

// A single write requests to be applied to etcd.
type request struct {
// Issuer writer context.
ctx context.Context

// Event to be sent to etcd.
event Event

// Channel to send response back.
response chan error
}

// Configuration for the coordinator.
type CoordinatorConfiguration struct {
// Each Coordinator will handle only a single partition.
// This will avoid peers with overlapping partitions.
Partition string

// Address for etcd server.
Server string

// Parent context that the Coordinator will derive it's own context.
Ctx context.Context

// Handler for managing goroutines.
Handler *GoRoutineHandler
}

// Coordinator interface that should be implemented by the
// atomic broadcast handler.
// Commands should be issued through the coordinator to be delivered
// to other peers
type Coordinator interface {
io.Closer

// Watch for changes on the partition.
// After called, this method will start a new goroutine that only
// returns when the Coordinator context is done.
Watch(received chan<- Event) error

// Issues an Event.
Write(ctx context.Context, event Event) <-chan error
}

// Create a new Coordinator using the given configuration.
// The current implementation is the EtcdCoordinator, backed by etcd.
func NewCoordinator(configuration CoordinatorConfiguration) (Coordinator, error) {
cli, err := clientv3.New(clientv3.Config{
DialTimeout: 30 * time.Second,
Endpoints: []string{configuration.Server},
})
if err != nil {
return nil, err
}
kv := clientv3.NewKV(cli)
ctx, cancel := context.WithCancel(configuration.Ctx)
coord := &EtcdCoordinator{
configuration: configuration,
cli: cli,
kv: kv,
ctx: ctx,
cancel: cancel,
writeChan: make(chan request),
}
configuration.Handler.Spawn(coord.writer)
return coord, nil
}

// EtcdCoordinator will use etcd for atomic broadcast.
type EtcdCoordinator struct {
// Configuration parameters.
configuration CoordinatorConfiguration

// Current Coordinator context, created from the parent context.
ctx context.Context

// Function to cancel the current context.
cancel context.CancelFunc

// A client for the etcd server.
cli *clientv3.Client

// The key-value entry point for issuing requests.
kv clientv3.KV

// Channel to receive write requests.
writeChan chan request
}

// Listen and apply write requests.
// This will keep running while the application context is available.
// Receiving commands through the channel will ensure that they are
// applied synchronously to the etcd.
func (e *EtcdCoordinator) writer() {
for {
select {
case <-e.ctx.Done():
return
case req := <-e.writeChan:
_, err := e.kv.Put(req.ctx, req.event.Key, string(req.event.Value))
req.response <- err
}
}
}

// Starts a new coroutine for watching the Coordinator partition.
// All received information will be published back through the channel
// received as parameter.
//
// After calling a routine will run bounded to the application lifetime.
func (e *EtcdCoordinator) Watch(received chan<- Event) error {
watchChan := e.cli.Watch(e.ctx, e.configuration.Partition)
watchChanges := func() {
for response := range watchChan {
select {
case <-e.ctx.Done():
return
default:
e.handleResponse(response, received)
}
}
}
e.configuration.Handler.Spawn(watchChanges)
return nil
}

// Write the given event using the KV interface.
func (e *EtcdCoordinator) Write(ctx context.Context, event Event) <-chan error {
res := make(chan error)
e.writeChan <- request{
ctx: ctx,
event: event,
response: res,
}
return res
}

// Stop the etcd client connection.
func (e *EtcdCoordinator) Close() error {
e.cancel()
return e.cli.Close()
}

// This method is responsible for handling events from the etcd client.
//
// This method will transform each received event into Event object and
// publish it back using the given channel. A buffered channel will be created
// and a goroutine will be spawned, so we can publish the received messages
// asynchronously without blocking. This can cause the Close to hold, if there
// exists pending messages to be consumed by the channel, this method can cause a deadlock.
func (e *EtcdCoordinator) handleResponse(response clientv3.WatchResponse, received chan<- Event) {
buffered := make(chan Event, len(response.Events))
defer close(buffered)

e.configuration.Handler.Spawn(func() {
for ev := range buffered {
received <- ev
}
})

for _, event := range response.Events {
buffered <- Event{
Key: string(event.Kv.Key),
Value: event.Kv.Value,
Error: nil,
}
}
}
Loading