- Node event stream
- The node event stream publishes important domain events about state transitions of the blockchain to subscribed clients. It is the mechanism that notifies external applications about state changes on the blockchain, such as the confirmation of a new block and the confirmation of every transaction in the confirmed block. Every node on the blockchain publishes important domain events to its node event stream. The node event stream is multiplexed over the subscribed clients, so each subscribed client receives a copy of either all events published to the node event stream or a subset of events matching the requested event types, e.g. only confirmed blocks or only confirmed transactions. When a client subscribes to the node event stream, the client specifies the types of events it is interested in. The client can stop consuming events from the node event stream at any time, in which case the node stops forwarding domain events on that specific channel. Multiplexing the node event stream over client subscriptions decouples the node infrastructure from external applications, takes advantage of the scalability and resilience of the publish-subscribe style of communication, and isolates each subscribed client in a dedicated execution flow, so that the failure of one subscription does not impact other subscribed clients.
- Event type
- The Event type represents a domain event that informs about a state transition on the blockchain node in particular and on the blockchain network in general. The event type carries the type of event that specifies the event object, e.g. a transaction or a block; the action taken on the event object, e.g. validated; and the encoded body of the event object that provides further details about the event object. The set of supported types of events is defined by the EventType type. The currently supported events are all known events, only transactions, and only blocks.
  - Type EventType: type of event of the event object
  - Action string: action taken on the event object
  - Body []byte: encoded body of the event object

type EventType uint64

const (
    EvAll EventType = 0
    EvTx EventType = 1
    EvBlock EventType = 2
)

type Event struct {
    Type EventType `json:"type"`
    Action string `json:"action"`
    Body []byte `json:"body"`
}

func NewEvent(evType EventType, action string, body []byte) Event {
    return Event{Type: evType, Action: action, Body: body}
}
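The following is a minimal sketch of how an Event could be encoded to JSON and decoded back, which is the encoding the StreamSubscribe method applies before sending an event to a client. The `roundTrip` helper and the sample body are assumptions made for illustration and are not part of the original code. Note that `encoding/json` encodes a `[]byte` field as a base64 string, so the body is not human-readable in the JSON payload.

```go
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
)

// EventType and Event mirror the definitions from the section above.
type EventType uint64

const (
    EvAll   EventType = 0
    EvTx    EventType = 1
    EvBlock EventType = 2
)

type Event struct {
    Type   EventType `json:"type"`
    Action string    `json:"action"`
    Body   []byte    `json:"body"`
}

func NewEvent(evType EventType, action string, body []byte) Event {
    return Event{Type: evType, Action: action, Body: body}
}

// roundTrip is a hypothetical helper: it encodes the event to JSON,
// decodes the JSON back, and returns both the decoded event and the JSON.
func roundTrip(ev Event) (Event, []byte, error) {
    jev, err := json.Marshal(ev)
    if err != nil {
        return Event{}, nil, err
    }
    var dec Event
    if err := json.Unmarshal(jev, &dec); err != nil {
        return Event{}, nil, err
    }
    return dec, jev, nil
}

func main() {
    // The body stands in for an encoded transaction (assumed content).
    ev := NewEvent(EvTx, "validated", []byte(`{"value":2}`))
    dec, jev, err := roundTrip(ev)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(string(jev)) // the body field is a base64 string
    fmt.Println(dec.Type == ev.Type, dec.Action == ev.Action,
        bytes.Equal(dec.Body, ev.Body))
}
```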
- Event publisher interface
- The EventPublisher interface provides uniform access for any node component to publish different types of domain events to the node event stream. The event publisher interface decouples each node component, e.g. the block proposer, from a concrete implementation of the node event stream. This design allows the node event stream implementation to evolve independently without impacting the node components that publish domain events to the node event stream.

type EventPublisher interface {
    PublishEvent(event Event)
}
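One way to see the decoupling in practice is a component that holds only an EventPublisher, so a test can substitute an in-memory publisher for the real event stream. In this sketch the `recorder` and `proposer` types and the `confirmBlock` method are hypothetical names introduced for illustration; only the EventPublisher interface and the Event type come from the text above.

```go
package main

import "fmt"

type EventType uint64

const (
    EvAll   EventType = 0
    EvTx    EventType = 1
    EvBlock EventType = 2
)

type Event struct {
    Type   EventType `json:"type"`
    Action string    `json:"action"`
    Body   []byte    `json:"body"`
}

type EventPublisher interface {
    PublishEvent(event Event)
}

// recorder is a hypothetical in-memory publisher that keeps every
// published event, which is convenient in unit tests.
type recorder struct {
    events []Event
}

func (r *recorder) PublishEvent(event Event) {
    r.events = append(r.events, event)
}

// proposer is a hypothetical stand-in for a node component. It depends
// on the interface only, never on a concrete event stream.
type proposer struct {
    pub EventPublisher
}

// confirmBlock publishes a block event for the given encoded block.
func (p *proposer) confirmBlock(body []byte) {
    p.pub.PublishEvent(Event{Type: EvBlock, Action: "validated", Body: body})
}

func main() {
    rec := &recorder{}
    p := &proposer{pub: rec}
    p.confirmBlock([]byte("encoded block"))
    fmt.Println(len(rec.events), rec.events[0].Action)
}
```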
- Event stream type
- The EventStream type connects the single inbound event channel with the set of outbound event stream channels, each dedicated to a subscribed client. The event stream type receives domain events published by node components through the event publisher interface and multiplexes the received domain events to every subscribed client through the dedicated outbound event stream channel. The event stream is concurrency safe. The single inbound event channel serializes concurrent requests from node components to publish events. A mutex guards concurrent requests to add new client subscriptions and to remove closed client subscriptions. The event stream type is fully integrated into the node graceful shutdown mechanism. The node shared context hierarchy signals a graceful shutdown. The node shared wait group gives node components time to terminate gracefully before the node's main goroutine terminates. The event stream type implements the event publisher interface. This allows any node component that publishes domain events to depend only on the event publisher interface, rather than on the entire event stream type. The event stream type contains the node shared context hierarchy and the node shared wait group for the node graceful shutdown mechanism, the single inbound event channel that receives all domain events published by all node components, the mapping of outbound channels that represent client subscriptions as clients subscribe to and unsubscribe from the node event stream, and the mutex that makes the addition and removal of client subscriptions concurrency safe. Each client subscription is identified by a randomly generated number. The mapping relates each subscription identifier to the corresponding outbound event stream channel. The events published to the single inbound event channel are multiplexed by the event stream type to each outbound event stream channel and then concurrently delivered to subscribed clients through gRPC server streaming.
  - ctx context.Context: node shared context hierarchy
  - wg *sync.WaitGroup: node shared wait group
  - chEvent chan chain.Event: node inbound event channel
  - mtx sync.Mutex: event streams mutex
  - chStreams map[string]chan chain.Event: client outbound event stream channels

type EventStream struct {
    ctx context.Context
    wg *sync.WaitGroup
    chEvent chan chain.Event
    mtx sync.Mutex
    chStreams map[string]chan chain.Event
}

func NewEventStream(
    ctx context.Context, wg *sync.WaitGroup, cap int,
) *EventStream {
    return &EventStream{
        ctx: ctx, wg: wg, chEvent: make(chan chain.Event, cap),
        chStreams: make(map[string]chan chain.Event),
    }
}

func (s *EventStream) PublishEvent(event chain.Event) {
    s.chEvent <- event
}
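The interplay of the shared context and the shared wait group during graceful shutdown can be sketched in isolation. The `consume` and `demo` functions below are hypothetical and deliberately simplified (one consumer, no subscriptions); they only illustrate the pattern of registering a goroutine with the wait group, canceling the context, and waiting for the goroutine to finish.

```go
package main

import (
    "context"
    "fmt"
    "sync"
)

type Event struct {
    Action string
}

// consume is a hypothetical stand-in for the event stream goroutine: it
// handles inbound events until the shared context is canceled.
func consume(
    ctx context.Context, wg *sync.WaitGroup, chEvent <-chan Event,
    handled *[]string,
) {
    defer wg.Done() // signal termination to the shared wait group
    for {
        select {
        case <-ctx.Done():
            return
        case ev := <-chEvent:
            *handled = append(*handled, ev.Action)
        }
    }
}

// demo starts the consumer, publishes one event, and shuts down.
func demo() []string {
    ctx, cancel := context.WithCancel(context.Background())
    wg := new(sync.WaitGroup)
    chEvent := make(chan Event) // unbuffered: send returns after receipt
    var handled []string
    wg.Add(1) // register the goroutine before starting it
    go consume(ctx, wg, chEvent, &handled)
    chEvent <- Event{Action: "validated"}
    cancel()  // signal graceful shutdown
    wg.Wait() // wait for the consumer to terminate
    return handled
}

func main() {
    fmt.Println(demo())
}
```

Reading `handled` only after `wg.Wait()` returns is what makes the access safe without an extra lock in this sketch.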
- Add and remove subscription
- The addition of new client subscriptions to the node event stream and the removal of closed subscriptions happen concurrently with the publishing of domain events and their multiplexing to the outbound event stream channels. The mapping of client subscriptions to the corresponding outbound event stream channels is protected with the mutex. Each client subscription is identified by a randomly generated number that is the key in the mapping. When a client subscription is closed, the corresponding outbound event stream channel is closed, and the client subscription identified by the randomly generated number is removed from the mapping. This design contributes to the resilient handling of client subscriptions that come and go concurrently, while maintaining reliable delivery of published domain events to active client subscriptions, without impacting other concurrent processes on the node.
- Add client subscription
- To add a new client subscription to the node event stream, the mapping of outbound event stream channels is locked for writing, and a new outbound event stream channel is created and added to the mapping under the randomly generated number that identifies the client subscription. The add client subscription process:
  - Lock the mapping of outbound event stream channels for writing
  - Create a new outbound event stream channel
  - Add the new outbound event stream channel to the mapping under the randomly generated number that identifies the client subscription

func (s *EventStream) AddSubscriber(sub string) chan chain.Event {
    s.mtx.Lock()
    defer s.mtx.Unlock()
    chStream := make(chan chain.Event)
    s.chStreams[sub] = chStream
    fmt.Printf("<~> Stream: %v\n", sub)
    return chStream
}
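Because AddSubscriber stores the channel under the given key without checking for an existing entry, two subscriptions with the same identifier would overwrite one another. The StreamSubscribe method shown later draws identifiers with `rand.Intn(999999)`, so a collision is unlikely but possible. A possible alternative, sketched here under the assumption that a longer identifier is acceptable, draws 128 random bits from `crypto/rand`. The `newSubID` helper is hypothetical and not part of the original code.

```go
package main

import (
    "crypto/rand"
    "encoding/hex"
    "fmt"
)

// newSubID is a hypothetical helper that returns a 128-bit random
// subscription identifier encoded as 32 hexadecimal characters.
func newSubID() (string, error) {
    buf := make([]byte, 16)
    if _, err := rand.Read(buf); err != nil {
        return "", err
    }
    return hex.EncodeToString(buf), nil
}

func main() {
    sub, err := newSubID()
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(len(sub), sub)
}
```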
- Remove client subscription
- To remove a closed client subscription from the node event stream, the mapping of outbound event stream channels is locked for writing and the corresponding outbound event stream channel is located under the client subscription identifier. Then the outbound event stream channel is closed and the client subscription is removed from the mapping of active client subscriptions. The remove client subscription process:
  - Lock the mapping of outbound event stream channels for writing
  - Locate the outbound event stream channel in the mapping of active client subscriptions using the subscription identifier as a key
  - Close the outbound event stream channel
  - Remove the client subscription from the mapping of active client subscriptions

func (s *EventStream) RemoveSubscriber(sub string) {
    s.mtx.Lock()
    defer s.mtx.Unlock()
    chStream, exist := s.chStreams[sub]
    if exist {
        close(chStream)
        delete(s.chStreams, sub)
        fmt.Printf("<~> Unsubscribe: %v\n", sub)
    }
}
- Multiplexing events to outbound channels
- Each domain event published by any node component is received on the single inbound event channel. Every domain event is forwarded to all active client subscriptions by multiplexing the single inbound event channel to all outbound event stream channels that represent active client subscriptions. The multiplexing of the node event stream is fully integrated with the node graceful shutdown mechanism. When the node shared context is canceled, all active client subscriptions are closed and removed from the mapping of active client subscriptions. When a new domain event is published to the inbound event channel, the domain event is forwarded to all outbound event stream channels of all active client subscriptions. The process of multiplexing events to outbound channels:
  - Combine the node shared context cancellation channel with the node inbound event channel
  - When the node shared context hierarchy is canceled, close all active client subscriptions and stop forwarding domain events to subscribed clients
  - When a new domain event is published by any node component through the event publisher interface, forward the domain event to all active client subscriptions

func (s *EventStream) StreamEvents() {
    defer s.wg.Done()
    for {
        select {
        case <- s.ctx.Done():
            for sub := range s.chStreams {
                s.RemoveSubscriber(sub)
            }
            return
        case event := <- s.chEvent:
            for _, chStream := range s.chStreams {
                chStream <- event
            }
        }
    }
}
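In the StreamEvents code above, the forwarding loop reads the mapping without taking the mutex and sends on unbuffered channels, so a slow client can delay delivery to the other clients. One possible variation, which is not the author's implementation, holds the mutex while iterating and uses a non-blocking send on buffered channels. The trade-off is that an event is dropped for a subscriber whose buffer is full. The `fanOut` type and its methods are hypothetical names used only in this sketch.

```go
package main

import (
    "fmt"
    "sync"
)

type Event struct {
    Type   uint64
    Action string
}

// fanOut is a hypothetical, simplified event stream that keeps only the
// mutex and the mapping of outbound channels.
type fanOut struct {
    mtx     sync.Mutex
    streams map[string]chan Event
}

func newFanOut() *fanOut {
    return &fanOut{streams: make(map[string]chan Event)}
}

// add registers a subscriber with a buffered outbound channel.
func (f *fanOut) add(sub string, size int) chan Event {
    f.mtx.Lock()
    defer f.mtx.Unlock()
    ch := make(chan Event, size)
    f.streams[sub] = ch
    return ch
}

// forward delivers the event to every subscriber while holding the
// mutex. A subscriber whose buffer is full is skipped, so one slow
// client cannot block the others. It returns the number of drops.
func (f *fanOut) forward(ev Event) int {
    f.mtx.Lock()
    defer f.mtx.Unlock()
    dropped := 0
    for _, ch := range f.streams {
        select {
        case ch <- ev:
        default:
            dropped++
        }
    }
    return dropped
}

func main() {
    f := newFanOut()
    fast := f.add("fast", 2)
    f.add("slow", 1) // this subscriber never reads in the sketch
    f.forward(Event{Type: 2, Action: "validated"})
    dropped := f.forward(Event{Type: 1, Action: "validated"})
    fmt.Println(len(fast), dropped)
}
```

Whether dropping events is acceptable depends on the application. A client that must see every event would need a different policy, such as disconnecting the slow subscriber.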
The gRPC Node service provides the StreamSubscribe method that lets clients subscribe to the stream of domain events provided by the node, optionally specifying a subset of event types of interest. The domain events published by the node components are delivered to subscribed clients through gRPC server streaming. The interface of the service:
message StreamSubscribeReq {
    repeated uint64 EventTypes = 1;
}

message StreamSubscribeRes {
    bytes Event = 1;
}

service Node {
    rpc StreamSubscribe(StreamSubscribeReq) returns (stream StreamSubscribeRes);
}
The implementation of the StreamSubscribe method:
- Generate a random identifier for each new client subscription
- Add the new client subscription to the mapping of active client subscriptions
- Create the outbound event stream channel for the client subscription
- Defer removal of the client subscription from the mapping of active client subscriptions when the subscription is closed by the client
- Combine the cancellation channel of the node shared context hierarchy with the outbound event stream channel
- When the node shared context hierarchy is canceled, stop forwarding domain events to the client
- When a new domain event is published, check if the event type is requested by the client, and, if requested, encode and send the domain event to the subscribed client through the gRPC server stream
func (s *NodeSrv) StreamSubscribe(
    req *StreamSubscribeReq, stream grpc.ServerStreamingServer[StreamSubscribeRes],
) error {
    sub := fmt.Sprint(rand.Intn(999999))
    chStream := s.evStreamer.AddSubscriber(sub)
    defer s.evStreamer.RemoveSubscriber(sub)
    for {
        select {
        case <- stream.Context().Done():
            return nil
        case event, open := <- chStream:
            if !open {
                return nil
            }
            // The event type 0 (EvAll) requests all events
            if slices.Contains(req.EventTypes, uint64(0)) ||
                slices.Contains(req.EventTypes, uint64(event.Type)) {
                jev, err := json.Marshal(event)
                if err != nil {
                    fmt.Println(err)
                    continue
                }
                res := &StreamSubscribeRes{Event: jev}
                err = stream.Send(res)
                if err != nil {
                    // status.Error avoids treating the message as a format string
                    return status.Error(codes.Internal, err.Error())
                }
            }
        }
    }
}
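The filter condition in StreamSubscribe can be isolated into a small function to make its behavior easier to reason about and test. The `wantsEvent` helper below is hypothetical and only restates the condition from the code above. One consequence worth noting is that a request with an empty list of event types matches no events, because the list contains neither 0 nor the type of any event.

```go
package main

import (
    "fmt"
    "slices"
)

// wantsEvent is a hypothetical helper that restates the filter from
// StreamSubscribe: the event is delivered when the client requested
// all events (type 0) or the specific type of the event.
func wantsEvent(requested []uint64, evType uint64) bool {
    return slices.Contains(requested, uint64(0)) ||
        slices.Contains(requested, evType)
}

func main() {
    const evTx, evBlock = uint64(1), uint64(2)
    fmt.Println(wantsEvent([]uint64{0}, evBlock))    // all events requested
    fmt.Println(wantsEvent([]uint64{evTx}, evBlock)) // only transactions requested
    fmt.Println(wantsEvent(nil, evTx))               // nothing requested
}
```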
The TestStreamSubscribe testing process:
- Create and start the event stream on the node
- Set up the gRPC server and client
- Create the gRPC node client
- Call the StreamSubscribe method to subscribe to the node event stream and establish the gRPC server stream of domain events
- Start publishing domain events to the node event stream through the event publisher interface
- Start consuming events from the gRPC server stream of domain events. For each received domain event
  - Decode the received domain event
  - Verify that the type and the action of the domain event are correct
go test -v -cover -coverprofile=coverage.cov ./... -run StreamSubscribe
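The decode and verify steps of the test can be sketched independently of the gRPC plumbing. The sketch assumes that the payload is the JSON produced by StreamSubscribe. The `decodeEvent` and `checkEvent` helpers and the sample payload are hypothetical and are not taken from the original test code.

```go
package main

import (
    "encoding/json"
    "fmt"
)

type EventType uint64

const (
    EvAll   EventType = 0
    EvTx    EventType = 1
    EvBlock EventType = 2
)

type Event struct {
    Type   EventType `json:"type"`
    Action string    `json:"action"`
    Body   []byte    `json:"body"`
}

// decodeEvent is a hypothetical helper that decodes the JSON payload
// carried in the Event field of a StreamSubscribeRes message.
func decodeEvent(payload []byte) (Event, error) {
    var ev Event
    err := json.Unmarshal(payload, &ev)
    return ev, err
}

// checkEvent is a hypothetical helper that verifies the type and the
// action of a decoded event against the expected values.
func checkEvent(ev Event, wantType EventType, wantAction string) error {
    if ev.Type != wantType {
        return fmt.Errorf("event type: got %v, want %v", ev.Type, wantType)
    }
    if ev.Action != wantAction {
        return fmt.Errorf("event action: got %v, want %v", ev.Action, wantAction)
    }
    return nil
}

func main() {
    // A sample payload in the shape produced by json.Marshal(event)
    payload := []byte(`{"type":1,"action":"validated","body":"YWJj"}`)
    ev, err := decodeEvent(payload)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(checkEvent(ev, EvTx, "validated"))
}
```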
The TestEventStream testing process:
- Set up the bootstrap node
- Create the peer discovery without starting for the bootstrap node
- Initialize the state on the bootstrap node by creating the genesis
- Create and start the block relay for the bootstrap node
- Re-create the authority account from the genesis to sign blocks
- Create and start the block proposer on the bootstrap node
- Create and start the event stream on the bootstrap node
- Start the gRPC server on the bootstrap node
- Wait for the gRPC server of the bootstrap node to start
- Get the initial owner account and its balance from the genesis
- Re-create the initial owner account from the genesis
- Sign and send several signed transactions to the bootstrap node
- Set up the client that subscribes to the node event stream
- Set up a gRPC client connection with the bootstrap node
- Create the gRPC node client
- Call the StreamSubscribe method to subscribe to the node event stream and establish the gRPC server stream of domain events
- Define the expected events to receive after the successful block proposal and the successful block confirmation
- Start consuming domain events from the gRPC server stream of domain events. For each received event
  - Decode the received domain event
  - Verify that the type and the action of the domain event are correct
go test -v -cover -coverprofile=coverage.cov ./... -run EventStream
The gRPC StreamSubscribe method is exposed through the CLI. Subscribe the client to the node event stream and consume domain events:
- Initialize the blockchain by starting the bootstrap node with parameters for the blockchain initial configuration

set boot localhost:1122
set authpass password
set ownerpass password
rm -rf .keystore* .blockstore* # cleanup if necessary
./bcn node start --node $boot --bootstrap --authpass $authpass \
  --ownerpass $ownerpass --balance 1000
- Create and persist a new account to the local key store of the bootstrap node (in a new terminal)

./bcn account create --node $boot --ownerpass $ownerpass
# acc 0a6c57d451f561d6baefe35bba47f8dd682b31da27f0dfdedc646648ea5d12ba
- Subscribe the client to the node event stream (in a new terminal)
  - --node specifies the node address where the client subscribes
  - --events specifies the event types of interest. Supported values are all, blk, tx

./bcn node subscribe --node $boot --events blk,tx
# <~> blk validated
# blk 2: 12b381b -> 50de747 mrk 78d77fd
# tx e67188b: 66d6141 -> 0a6c57d 2 2
# tx c23ecff: 0a6c57d -> 66d6141 1 2
# <~> tx validated
# tx e67188b: 66d6141 -> 0a6c57d 2 2
# <~> tx validated
# tx c23ecff: 0a6c57d -> 66d6141 1 2
- Define a shell function to create, sign, and send a transaction

function txSignAndSend -a node from to value ownerpass
  set tx (./bcn tx sign --node $node --from $from --to $to --value $value \
    --ownerpass $ownerpass)
  echo SigTx $tx
  ./bcn tx send --node $node --sigtx $tx
end
- Create, sign, and send a transaction transferring funds between the initial owner account from the genesis and the new account (in a new terminal)

set acc1 66d614174909403746df7c3222cd74ca386995e4de11cfc99ca1efe548d33105
set acc2 0a6c57d451f561d6baefe35bba47f8dd682b31da27f0dfdedc646648ea5d12ba
txSignAndSend $boot $acc1 $acc2 2 $ownerpass
# SigTx {"from":"66d614174909403746df7c3222cd74ca386995e4de11cfc99ca1efe548d33105","to":"0a6c57d451f561d6baefe35bba47f8dd682b31da27f0dfdedc646648ea5d12ba","value":2,"nonce":2,"time":"2024-11-09T21:57:02.679527351+01:00","sig":"F09AtLMjFMBrz0D3IXZNx60tyHRD9+Ko8tqC4PMGYFwNMqNJ3oBlpNh4UFRBitwpjIoyV7OYPoGwW4lSaaHD6QA="}
# tx e67188b8056a0455e9088b739fd2c5649a4c8f50633b04241bab36cc75a769e2
txSignAndSend $boot $acc2 $acc1 1 $ownerpass
# SigTx {"from":"0a6c57d451f561d6baefe35bba47f8dd682b31da27f0dfdedc646648ea5d12ba","to":"66d614174909403746df7c3222cd74ca386995e4de11cfc99ca1efe548d33105","value":1,"nonce":2,"time":"2024-11-09T21:57:02.722528306+01:00","sig":"ckaWSJcOr4Qm0R+mE7zDWejAMmOKucpQY38hzzewb7Q7/zFzxIslG+WaHlUdnmFPkvmtzBFQ4fOfQvlMfMxf0gE="}
# tx c23ecff0405ddb91aa8dc03c1f8f60c82d319b0266f97afb473ebd5f3c5db70b
- Wait for the transactions to be validated, the new block to be proposed, and the proposed block to be confirmed
- Confirm the delivery of the domain events of the confirmed block and the confirmed transactions to the subscribed client