Skip to content

Commit 666a7c3

Browse files
committed
add dynamodb state store
1 parent 4c39445 commit 666a7c3

File tree

1 file changed

+141
-0
lines changed

1 file changed

+141
-0
lines changed

Diff for: plugins/statestore/dynamodb/dynamodb.go

+141
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package dynamodb
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/aws/aws-sdk-go-v2/aws"
8+
"github.com/aws/aws-sdk-go-v2/config"
9+
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
10+
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
11+
12+
"github.com/tochemey/ego/v3/egopb"
13+
"github.com/tochemey/ego/v3/persistence"
14+
"google.golang.org/protobuf/proto"
15+
)
16+
17+
// No sort key is needed because we are only storing the latest state
18+
type StateItem struct {
19+
PersistenceID string // Partition key
20+
VersionNumber uint64
21+
StatePayload []byte
22+
StateManifest string
23+
Timestamp int64
24+
ShardNumber uint64
25+
}
26+
27+
const (
28+
tableName = "states_store"
29+
)
30+
31+
// DynamoDurableStore implements the DurableStore interface
32+
// and helps persist states in a DynamoDB
33+
type DynamoDurableStore struct {
34+
client *dynamodb.Client
35+
}
36+
37+
// enforce interface implementation
38+
var _ persistence.StateStore = (*DynamoDurableStore)(nil)
39+
40+
func NewStateStore() *DynamoDurableStore {
41+
cfg, err := config.LoadDefaultConfig(context.Background())
42+
if err != nil {
43+
return nil
44+
}
45+
46+
return &DynamoDurableStore{
47+
client: dynamodb.NewFromConfig(cfg),
48+
}
49+
}
50+
51+
// Connect connects to the journal store
52+
// No connection is needed because the client is stateless
53+
func (d DynamoDurableStore) Connect(ctx context.Context) error {
54+
return nil
55+
}
56+
57+
// Disconnect disconnect the journal store
58+
// There is no need to disconnect because the client is stateless
59+
func (DynamoDurableStore) Disconnect(ctx context.Context) error {
60+
return nil
61+
}
62+
63+
// Ping verifies a connection to the database is still alive, establishing a connection if necessary.
64+
// There is no need to ping because the client is stateless
65+
func (d DynamoDurableStore) Ping(ctx context.Context) error {
66+
_, err := d.client.ListTables(ctx, &dynamodb.ListTablesInput{})
67+
if err != nil {
68+
return fmt.Errorf("failed to fetch tables in the dynamodb: %w", err)
69+
}
70+
return nil
71+
}
72+
73+
// WriteState persist durable state for a given persistenceID.
74+
func (d DynamoDurableStore) WriteState(ctx context.Context, state *egopb.DurableState) error {
75+
76+
bytea, _ := proto.Marshal(state.GetResultingState())
77+
manifest := string(state.GetResultingState().ProtoReflect().Descriptor().FullName())
78+
79+
// Define the item to upsert
80+
item := map[string]types.AttributeValue{
81+
"PersistenceID": &types.AttributeValueMemberS{Value: state.GetPersistenceId()}, // Partition key
82+
"StatePayload": &types.AttributeValueMemberB{Value: bytea},
83+
"StateManifest": &types.AttributeValueMemberS{Value: manifest},
84+
"Timestamp": &types.AttributeValueMemberS{Value: string(state.GetTimestamp())},
85+
}
86+
87+
_, err := d.client.PutItem(ctx, &dynamodb.PutItemInput{
88+
TableName: aws.String(tableName),
89+
Item: item,
90+
})
91+
if err != nil {
92+
return fmt.Errorf("failed to upsert state into the dynamodb: %w", err)
93+
}
94+
95+
return nil
96+
}
97+
98+
// GetLatestState fetches the latest durable state
99+
func (d DynamoDurableStore) GetLatestState(ctx context.Context, persistenceID string) (*egopb.DurableState, error) {
100+
// Get criteria
101+
key := map[string]types.AttributeValue{
102+
"PersistenceID": &types.AttributeValueMemberS{Value: persistenceID},
103+
}
104+
105+
// Perform the GetItem operation
106+
resp, err := d.client.GetItem(ctx, &dynamodb.GetItemInput{
107+
TableName: aws.String(tableName),
108+
Key: key,
109+
})
110+
if err != nil {
111+
return nil, fmt.Errorf("failed to fetch the latest state from the dynamodb: %w", err)
112+
}
113+
114+
// Check if item exists
115+
if resp.Item == nil {
116+
return nil, nil
117+
}
118+
119+
item := &StateItem{
120+
PersistenceID: persistenceID,
121+
VersionNumber: parseDynamoUint64(resp.Item["VersionNumber"]),
122+
StatePayload: resp.Item["StatePayload"].(*types.AttributeValueMemberB).Value,
123+
StateManifest: resp.Item["StateManifest"].(*types.AttributeValueMemberS).Value,
124+
Timestamp: parseDynamoInt64(resp.Item["Timestamp"]),
125+
ShardNumber: parseDynamoUint64(resp.Item["ShardNumber"]),
126+
}
127+
128+
// unmarshal the event and the state
129+
state, err := toProto(item.StateManifest, item.StatePayload)
130+
if err != nil {
131+
return nil, fmt.Errorf("failed to unmarshal the durable state: %w", err)
132+
}
133+
134+
return &egopb.DurableState{
135+
PersistenceId: persistenceID,
136+
VersionNumber: item.VersionNumber,
137+
ResultingState: state,
138+
Timestamp: item.Timestamp,
139+
Shard: item.ShardNumber,
140+
}, nil
141+
}

0 commit comments

Comments
 (0)