Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add events publishers and upgrade GoAkt to 3.0.0 #125

Merged
merged 20 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,3 @@ jobs:
files: ./coverage.out # optional
fail_ci_if_error: false # optional (default = false)
verbose: false # optional (default = false)
# - uses: go-semantic-release/action@v1
# id: semver
# with:
# github-token: ${{ secrets.GITHUB_TOKEN }}
# allow-initial-development-versions: true
# force-bump-patch-version: true
74 changes: 74 additions & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"

on:
push:
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
schedule:
- cron: '28 20 * * 0'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write

strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://git.io/codeql-language-support

steps:
- name: Checkout repository
uses: actions/checkout@v4

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main

# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v3

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl

# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language

#- run: |
# make bootstrap
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ issues:
- revive

linters-settings:
gosec:
excludes:
- G115
misspell:
locale: US
ignore-words:
Expand Down
6 changes: 5 additions & 1 deletion Earthfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
VERSION 0.8

FROM tochemey/docker-go:1.22.0-3.0.0
FROM tochemey/docker-go:1.23.4-5.1.1

RUN go install github.com/ory/go-acc@latest
# install vektra/mockery
RUN go install github.com/vektra/mockery/[email protected]

protogen:
# copy the proto files to generate
Expand Down Expand Up @@ -92,6 +94,8 @@ mock:
# generate the mocks
RUN mockery --dir persistence --all --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/persistence --case snake
RUN mockery --dir offsetstore --name OffsetStore --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/offsetstore --case snake
RUN mockery --dir . --name EventPublisher --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/ego --case snake
RUN mockery --dir . --name StatePublisher --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/ego --case snake


SAVE ARTIFACT ./mocks mocks AS LOCAL mocks
2 changes: 1 addition & 1 deletion behavior.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022-2025 Arsene Tochemey Gandote
* Copyright (c) 2023-2025 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down
22 changes: 12 additions & 10 deletions durable_state_actor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022-2025 Arsene Tochemey Gandote
* Copyright (c) 2023-2025 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -30,8 +30,8 @@ import (
"math"
"time"

"github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/goaktpb"
goakt "github.com/tochemey/goakt/v3/actor"
"github.com/tochemey/goakt/v3/goaktpb"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
Expand All @@ -55,11 +55,11 @@ type durableStateActor struct {
currentVersion uint64
lastCommandTime time.Time
eventsStream eventstream.Stream
actorSystem actors.ActorSystem
actorSystem goakt.ActorSystem
}

// implements the actors.Actor interface
var _ actors.Actor = (*durableStateActor)(nil)
// implements the goakt.Actor interface
var _ goakt.Actor = (*durableStateActor)(nil)

// newDurableStateActor creates an instance of actor provided the DurableStateBehavior
func newDurableStateActor(behavior DurableStateBehavior, stateStore persistence.StateStore, eventsStream eventstream.Stream) *durableStateActor {
Expand All @@ -81,7 +81,7 @@ func (entity *durableStateActor) PreStart(ctx context.Context) error {
}

// Receive processes any message dropped into the actor mailbox.
func (entity *durableStateActor) Receive(ctx *actors.ReceiveContext) {
func (entity *durableStateActor) Receive(ctx *goakt.ReceiveContext) {
switch command := ctx.Message().(type) {
case *goaktpb.PostStart:
entity.actorSystem = ctx.ActorSystem()
Expand Down Expand Up @@ -125,7 +125,7 @@ func (entity *durableStateActor) recoverFromStore(ctx context.Context) error {
}

// processCommand processes the incoming command
func (entity *durableStateActor) processCommand(receiveContext *actors.ReceiveContext, command Command) {
func (entity *durableStateActor) processCommand(receiveContext *goakt.ReceiveContext, command Command) {
ctx := receiveContext.Context()
newState, newVersion, err := entity.HandleCommand(ctx, command, entity.currentVersion, entity.currentState)
if err != nil {
Expand Down Expand Up @@ -153,7 +153,7 @@ func (entity *durableStateActor) processCommand(receiveContext *actors.ReceiveCo
}

// sendStateReply sends a state reply message
func (entity *durableStateActor) sendStateReply(ctx *actors.ReceiveContext) {
func (entity *durableStateActor) sendStateReply(ctx *goakt.ReceiveContext) {
state, _ := anypb.New(entity.currentState)
ctx.Response(&egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
Expand All @@ -168,7 +168,7 @@ func (entity *durableStateActor) sendStateReply(ctx *actors.ReceiveContext) {
}

// sendErrorReply sends an error as a reply message
func (entity *durableStateActor) sendErrorReply(ctx *actors.ReceiveContext, err error) {
func (entity *durableStateActor) sendErrorReply(ctx *goakt.ReceiveContext, err error) {
ctx.Response(&egopb.CommandReply{
Reply: &egopb.CommandReply_ErrorReply{
ErrorReply: &egopb.ErrorReply{
Expand Down Expand Up @@ -211,6 +211,8 @@ func (entity *durableStateActor) persistStateAndPublish(ctx context.Context) err
shardNumber := entity.actorSystem.GetPartition(entity.ID())
topic := fmt.Sprintf(statesTopic, shardNumber)

entity.actorSystem.Logger().Debugf("publishing durableState to topic: %s", topic)

durableState := &egopb.DurableState{
PersistenceId: entity.ID(),
VersionNumber: entity.currentVersion,
Expand Down
44 changes: 22 additions & 22 deletions durable_state_actor_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022-2025 Arsene Tochemey Gandote
* Copyright (c) 2023-2025 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -32,8 +32,8 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/log"
goakt "github.com/tochemey/goakt/v3/actor"
"github.com/tochemey/goakt/v3/log"
"google.golang.org/protobuf/proto"

"github.com/tochemey/ego/v3/egopb"
Expand All @@ -47,10 +47,10 @@ func TestDurableStateBehavior(t *testing.T) {
t.Run("with state reply", func(t *testing.T) {
ctx := context.TODO()
// create an actor system
actorSystem, err := actors.NewActorSystem("TestActorSystem",
actors.WithPassivationDisabled(),
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(3))
actorSystem, err := goakt.NewActorSystem("TestActorSystem",
goakt.WithPassivationDisabled(),
goakt.WithLogger(log.DiscardLogger),
goakt.WithActorInitMaxRetries(3))
require.NoError(t, err)
assert.NotNil(t, actorSystem)

Expand Down Expand Up @@ -80,7 +80,7 @@ func TestDurableStateBehavior(t *testing.T) {

command = &testpb.CreateAccount{AccountBalance: 500.00}
// send the command to the actor
reply, err := actors.Ask(ctx, pid, command, 5*time.Second)
reply, err := goakt.Ask(ctx, pid, command, 5*time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand All @@ -107,7 +107,7 @@ func TestDurableStateBehavior(t *testing.T) {
AccountId: persistenceID,
Balance: 250,
}
reply, err = actors.Ask(ctx, pid, command, 5*time.Second)
reply, err = goakt.Ask(ctx, pid, command, 5*time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand Down Expand Up @@ -143,10 +143,10 @@ func TestDurableStateBehavior(t *testing.T) {
ctx := context.TODO()

// create an actor system
actorSystem, err := actors.NewActorSystem("TestActorSystem",
actors.WithPassivationDisabled(),
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(3))
actorSystem, err := goakt.NewActorSystem("TestActorSystem",
goakt.WithPassivationDisabled(),
goakt.WithLogger(log.DiscardLogger),
goakt.WithActorInitMaxRetries(3))
require.NoError(t, err)
assert.NotNil(t, actorSystem)

Expand Down Expand Up @@ -182,7 +182,7 @@ func TestDurableStateBehavior(t *testing.T) {

command = &testpb.CreateAccount{AccountBalance: 500.00}
// send the command to the actor
reply, err := actors.Ask(ctx, pid, command, time.Second)
reply, err := goakt.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand All @@ -209,7 +209,7 @@ func TestDurableStateBehavior(t *testing.T) {
AccountId: "different-id",
Balance: 250,
}
reply, err = actors.Ask(ctx, pid, command, time.Second)
reply, err = goakt.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand All @@ -231,10 +231,10 @@ func TestDurableStateBehavior(t *testing.T) {
})
t.Run("with state recovery from state store", func(t *testing.T) {
ctx := context.TODO()
actorSystem, err := actors.NewActorSystem("TestActorSystem",
actors.WithPassivationDisabled(),
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(3),
actorSystem, err := goakt.NewActorSystem("TestActorSystem",
goakt.WithPassivationDisabled(),
goakt.WithLogger(log.DiscardLogger),
goakt.WithActorInitMaxRetries(3),
)
require.NoError(t, err)
assert.NotNil(t, actorSystem)
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestDurableStateBehavior(t *testing.T) {

command = &testpb.CreateAccount{AccountBalance: 500.00}

reply, err := actors.Ask(ctx, pid, command, time.Second)
reply, err := goakt.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand All @@ -298,7 +298,7 @@ func TestDurableStateBehavior(t *testing.T) {
AccountId: persistenceID,
Balance: 250,
}
reply, err = actors.Ask(ctx, pid, command, time.Second)
reply, err = goakt.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestDurableStateBehavior(t *testing.T) {

// fetch the current state
command = &egopb.GetStateCommand{}
reply, err = actors.Ask(ctx, pid, command, time.Second)
reply, err = goakt.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand Down
Loading