Skip to content

Commit

Permalink
feat: support contract listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
SupernaviX committed Feb 7, 2025
1 parent 179ebf4 commit 4c550a2
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 20 deletions.
186 changes: 166 additions & 20 deletions internal/blockchain/cardano/cardano.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Cardano struct {
callbacks common.BlockchainCallbacks
client *resty.Client
streams *streamManager
streamID string
wsconn wsclient.WSClient
cardanoconnectConf config.Section
subs common.FireflySubscriptions
Expand All @@ -67,8 +68,10 @@ type ffiMethodAndErrors struct {
}

type cardanoWSCommandPayload struct {
Type string `json:"type"`
Topic string `json:"topic,omitempty"`
Type string `json:"type"`
Topic string `json:"topic,omitempty"`
BatchNumber int64 `json:"batchNumber,omitempty"`
Message string `json:"message,omitempty"`
}

type Location struct {
Expand Down Expand Up @@ -136,6 +139,7 @@ func (c *Cardano) Init(ctx context.Context, cancelCtx context.CancelFunc, conf c
}

log.L(c.ctx).Infof("Event stream: %s (topic=%s)", stream.ID, c.pluginTopic)
c.streamID = stream.ID

go c.eventLoop()

Expand Down Expand Up @@ -262,7 +266,11 @@ func (c *Cardano) ParseInterface(ctx context.Context, method *fftypes.FFIMethod,
}

func (c *Cardano) NormalizeContractLocation(ctx context.Context, ntype blockchain.NormalizeType, location *fftypes.JSONAny) (result *fftypes.JSONAny, err error) {
return location, nil
parsed, err := c.parseContractLocation(ctx, location)
if err != nil {
return nil, err
}
return c.encodeContractLocation(ctx, parsed)
}

func (c *Cardano) CheckOverlappingLocations(ctx context.Context, left *fftypes.JSONAny, right *fftypes.JSONAny) (bool, error) {
Expand Down Expand Up @@ -297,6 +305,10 @@ func (c *Cardano) parseContractLocation(ctx context.Context, location *fftypes.J
}

func (c *Cardano) encodeContractLocation(ctx context.Context, location *Location) (result *fftypes.JSONAny, err error) {
location.Address, err = formatCardanoAddress(ctx, location.Address)
if err != nil {
return nil, err
}
normalized, err := json.Marshal(location)
if err == nil {
result = fftypes.JSONAnyPtrBytes(normalized)
Expand All @@ -305,15 +317,27 @@ func (c *Cardano) encodeContractLocation(ctx context.Context, location *Location
}

func (c *Cardano) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) (err error) {
return errors.New("AddContractListener not supported")
subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID)
firstEvent := string(core.SubOptsFirstEventNewest)
if listener.Options != nil {
firstEvent = listener.Options.FirstEvent
}

result, err := c.streams.createListener(ctx, c.streamID, subName, firstEvent, &listener.Filters)
listener.BackendID = result.ID
return err
}

func (c *Cardano) DeleteContractListener(ctx context.Context, subscription *core.ContractListener, okNotFound bool) error {
return errors.New("DeleteContractListener not supported")
return c.streams.deleteListener(ctx, c.streamID, subscription.BackendID)
}

func (c *Cardano) GetContractListenerStatus(ctx context.Context, namespace, subID string, okNotFound bool) (found bool, detail interface{}, status core.ContractListenerStatus, err error) {
return false, nil, core.ContractListenerStatusUnknown, errors.New("GetContractListenerStatus not supported")
l, err := c.streams.getListener(ctx, c.streamID, subID)
if err != nil || l == nil {
return false, nil, core.ContractListenerStatusUnknown, err
}
return true, nil, core.ContractListenerStatusUnknown, nil
}

func (c *Cardano) GetFFIParamValidator(ctx context.Context) (fftypes.FFIParamValidator, error) {
Expand All @@ -322,11 +346,25 @@ func (c *Cardano) GetFFIParamValidator(ctx context.Context) (fftypes.FFIParamVal
}

func (c *Cardano) GenerateEventSignature(ctx context.Context, event *fftypes.FFIEventDefinition) (string, error) {
return "", errors.New("GenerateEventSignature not supported")
params := []string{}
for _, param := range event.Params {
params = append(params, param.Schema.JSONObject().GetString("type"))
}
return fmt.Sprintf("%s(%s)", event.Name, strings.Join(params, ",")), nil
}

func (c *Cardano) GenerateEventSignatureWithLocation(ctx context.Context, event *fftypes.FFIEventDefinition, location *fftypes.JSONAny) (string, error) {
return "", errors.New("GenerateEventSignatureWithLocation not supported")
eventSignature, _ := c.GenerateEventSignature(ctx, event)

if location == nil {
return fmt.Sprintf("*:%s", eventSignature), nil
}

parsed, err := c.parseContractLocation(ctx, location)
if err != nil {
return "", err
}
return fmt.Sprintf("%s:%s", parsed.Address, eventSignature), nil
}

func (c *Cardano) GenerateErrorSignature(ctx context.Context, event *fftypes.FFIErrorDefinition) string {
Expand Down Expand Up @@ -449,19 +487,45 @@ func (c *Cardano) eventLoop() {
}
switch msgTyped := msgParsed.(type) {
case []interface{}:
// TODO: handle this
ack, _ := json.Marshal(&cardanoWSCommandPayload{
Type: "ack",
Topic: c.pluginTopic,
})
err = c.wsconn.Send(ctx, ack)
err = c.handleMessageBatch(ctx, 0, msgTyped)
if err == nil {
ack, _ := json.Marshal(&cardanoWSCommandPayload{
Type: "ack",
Topic: c.pluginTopic,
})
err = c.wsconn.Send(ctx, ack)
}
case map[string]interface{}:
var receipt common.BlockchainReceiptNotification
_ = json.Unmarshal(msgBytes, &receipt)

err := common.HandleReceipt(ctx, "", c, &receipt, c.callbacks)
if err != nil {
l.Errorf("Failed to process receipt: %+v", msgTyped)
isBatch := false
if batchNumber, ok := msgTyped["batchNumber"].(float64); ok {
if events, ok := msgTyped["events"].([]interface{}); ok {
// FFTM delivery with a batch number to use in the ack
isBatch = true
err = c.handleMessageBatch(ctx, (int64)(batchNumber), events)
// Errors processing messages are converted into nacks
ackOrNack := &cardanoWSCommandPayload{
Topic: c.pluginTopic,
BatchNumber: int64(batchNumber),
}
if err == nil {
ackOrNack.Type = "ack"
} else {
log.L(ctx).Errorf("Rejecting batch due error: %s", err)
ackOrNack.Type = "error"
ackOrNack.Message = err.Error()
}
b, _ := json.Marshal(&ackOrNack)
err = c.wsconn.Send(ctx, b)
}
}
if !isBatch {
var receipt common.BlockchainReceiptNotification
_ = json.Unmarshal(msgBytes, &receipt)

err := common.HandleReceipt(ctx, "", c, &receipt, c.callbacks)
if err != nil {
l.Errorf("Failed to process receipt: %+v", msgTyped)
}
}
default:
l.Errorf("Message unexpected: %+v", msgTyped)
Expand All @@ -477,6 +541,88 @@ func (c *Cardano) eventLoop() {
}
}

func (c *Cardano) handleMessageBatch(ctx context.Context, batchID int64, messages []interface{}) error {
events := make(common.EventsToDispatch)
count := len(messages)
for i, msgI := range messages {
msgMap, ok := msgI.(map[string]interface{})
if !ok {
log.L(ctx).Errorf("Message cannot be parsed as JSON: %+v", msgI)
return nil
}
msgJSON := fftypes.JSONObject(msgMap)

signature := msgJSON.GetString("signature")

logger := log.L(ctx)
logger.Infof("[Cardano:%d:%d/%d]: '%s'", batchID, i+1, count, signature)
logger.Tracef("Message: %+v", msgJSON)
if err := c.processContractEvent(ctx, events, msgJSON); err != nil {
return err
}
}

// Dispatch all the events from this patch that were successfully parsed and routed to namespaces
// (could be zero - that's ok)
return c.callbacks.DispatchBlockchainEvents(ctx, events)
}

func (c *Cardano) processContractEvent(ctx context.Context, events common.EventsToDispatch, msgJSON fftypes.JSONObject) error {
listenerID := msgJSON.GetString("listenerId")
listener, err := c.streams.getListener(ctx, c.streamID, listenerID)
if err != nil {
return err
}
namespace := common.GetNamespaceFromSubName(listener.Name)
event := c.parseBlockchainEvent(ctx, msgJSON)
if event != nil {
c.callbacks.PrepareBlockchainEvent(ctx, events, namespace, &blockchain.EventForListener{
Event: event,
ListenerID: listenerID,
})
}
return nil
}

func (c *Cardano) parseBlockchainEvent(ctx context.Context, msgJSON fftypes.JSONObject) *blockchain.Event {
sBlockNumber := msgJSON.GetString("blockNumber")
sTransactionHash := msgJSON.GetString("transactionHash")
blockNumber := msgJSON.GetInt64("blockNumber")
txIndex := msgJSON.GetInt64("transactionIndex")
logIndex := msgJSON.GetInt64("logIndex")
dataJSON := msgJSON.GetObject("data")
signature := msgJSON.GetString("signature")
name := strings.SplitN(signature, "(", 2)[0]
timestampStr := msgJSON.GetString("timestamp")
timestamp, err := fftypes.ParseTimeString(timestampStr)
if err != nil {
log.L(ctx).Errorf("Blockchain event is not valid - missing timestamp: %+v", msgJSON)
return nil // move on
}

if sBlockNumber == "" || sTransactionHash == "" {
log.L(ctx).Errorf("Blockchain event is not valid - missing data: %+v", msgJSON)
return nil // move on
}

delete(msgJSON, "data")
return &blockchain.Event{
BlockchainTXID: sTransactionHash,
Source: c.Name(),
Name: name,
ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, txIndex, logIndex),
Output: dataJSON,
Info: msgJSON,
Timestamp: timestamp,
Location: c.buildEventLocationString(msgJSON),
Signature: signature,
}
}

func (c *Cardano) buildEventLocationString(msgJSON fftypes.JSONObject) string {
return fmt.Sprintf("address=%s", msgJSON.GetString("address"))
}

func formatCardanoAddress(ctx context.Context, key string) (string, error) {
// TODO: this could be much stricter validation
if key != "" {
Expand Down
70 changes: 70 additions & 0 deletions internal/blockchain/cardano/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package cardano

import (
"context"
"fmt"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-common/pkg/ffresty"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/pkg/core"
)

type streamManager struct {
Expand All @@ -40,6 +42,20 @@ type eventStream struct {
Timestamps bool `json:"timestamps"`
}

type listener struct {
ID string `json:"id"`
Name string `json:"name,omitempty"`
}

type filter struct {
Event eventfilter `json:"event"`
}

type eventfilter struct {
Contract string `json:"contract"`
EventPath string `json:"eventPath"`
}

func newStreamManager(client *resty.Client, batchSize, batchTimeout uint) *streamManager {
return &streamManager{
client: client,
Expand Down Expand Up @@ -112,3 +128,57 @@ func (s *streamManager) ensureEventStream(ctx context.Context, topic string) (*e
}
return s.createEventStream(ctx, topic)
}

func (s *streamManager) getListener(ctx context.Context, streamID string, listenerID string) (listener *listener, err error) {
res, err := s.client.R().
SetContext(ctx).
SetResult(&listener).
Get(fmt.Sprintf("/eventstreams/%s/listeners/%s", streamID, listenerID))
if err != nil || !res.IsSuccess() {
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgCardanoconnectRESTErr)
}
return listener, nil
}

func (s *streamManager) createListener(ctx context.Context, streamID, name, lastEvent string, filters *core.ListenerFilters) (listener *listener, err error) {
cardanoFilters := []filter{}
for _, f := range *filters {
address := f.Location.JSONObject().GetString("address")
cardanoFilters = append(cardanoFilters, filter{
Event: eventfilter{
Contract: address,
EventPath: f.Event.Name,
},
})
}

body := map[string]interface{}{
"name": name,
"type": "events",
"fromBlock": lastEvent,
"filters": cardanoFilters,
}

res, err := s.client.R().
SetContext(ctx).
SetBody(body).
SetResult(&listener).
Post(fmt.Sprintf("/eventstreams/%s/listeners", streamID))

if err != nil || !res.IsSuccess() {
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgCardanoconnectRESTErr)
}

return listener, nil
}

func (s *streamManager) deleteListener(ctx context.Context, streamID, listenerID string) error {
res, err := s.client.R().
SetContext(ctx).
Delete(fmt.Sprintf("/eventstreams/%s/listeners/%s", streamID, listenerID))

if err != nil || !res.IsSuccess() {
return ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgCardanoconnectRESTErr)
}
return nil
}

0 comments on commit 4c550a2

Please sign in to comment.