Skip to content

Commit

Permalink
feat: add publishers and upgrade GoAkt to 3.0.0 (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Feb 17, 2025
1 parent 6f217d5 commit 6fa3384
Show file tree
Hide file tree
Showing 80 changed files with 5,338 additions and 577 deletions.
6 changes: 0 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,3 @@ jobs:
files: ./coverage.out # optional
fail_ci_if_error: false # optional (default = false)
verbose: false # optional (default = false)
# - uses: go-semantic-release/action@v1
# id: semver
# with:
# github-token: ${{ secrets.GITHUB_TOKEN }}
# allow-initial-development-versions: true
# force-bump-patch-version: true
74 changes: 74 additions & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"

on:
push:
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
schedule:
- cron: '28 20 * * 0'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write

strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://git.io/codeql-language-support

steps:
- name: Checkout repository
uses: actions/checkout@v4

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main

# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v3

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl

# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language

#- run: |
# make bootstrap
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ issues:
- revive

linters-settings:
gosec:
excludes:
- G115
misspell:
locale: US
ignore-words:
Expand Down
6 changes: 5 additions & 1 deletion Earthfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
VERSION 0.8

FROM tochemey/docker-go:1.22.0-3.0.0
FROM tochemey/docker-go:1.23.4-5.1.1

RUN go install github.com/ory/go-acc@latest
# install vektra/mockery
RUN go install github.com/vektra/mockery/[email protected]

protogen:
# copy the proto files to generate
Expand Down Expand Up @@ -92,6 +94,8 @@ mock:
# generate the mocks
RUN mockery --dir persistence --all --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/persistence --case snake
RUN mockery --dir offsetstore --name OffsetStore --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/offsetstore --case snake
RUN mockery --dir . --name EventPublisher --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/ego --case snake
RUN mockery --dir . --name StatePublisher --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/ego --case snake


SAVE ARTIFACT ./mocks mocks AS LOCAL mocks
2 changes: 1 addition & 1 deletion behavior.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022-2025 Arsene Tochemey Gandote
* Copyright (c) 2023-2025 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down
22 changes: 12 additions & 10 deletions durable_state_actor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022-2025 Arsene Tochemey Gandote
* Copyright (c) 2023-2025 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -30,8 +30,8 @@ import (
"math"
"time"

"github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/goaktpb"
goakt "github.com/tochemey/goakt/v3/actor"
"github.com/tochemey/goakt/v3/goaktpb"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
Expand All @@ -55,11 +55,11 @@ type durableStateActor struct {
currentVersion uint64
lastCommandTime time.Time
eventsStream eventstream.Stream
actorSystem actors.ActorSystem
actorSystem goakt.ActorSystem
}

// implements the actors.Actor interface
var _ actors.Actor = (*durableStateActor)(nil)
// implements the goakt.Actor interface
var _ goakt.Actor = (*durableStateActor)(nil)

// newDurableStateActor creates an instance of actor provided the DurableStateBehavior
func newDurableStateActor(behavior DurableStateBehavior, stateStore persistence.StateStore, eventsStream eventstream.Stream) *durableStateActor {
Expand All @@ -81,7 +81,7 @@ func (entity *durableStateActor) PreStart(ctx context.Context) error {
}

// Receive processes any message dropped into the actor mailbox.
func (entity *durableStateActor) Receive(ctx *actors.ReceiveContext) {
func (entity *durableStateActor) Receive(ctx *goakt.ReceiveContext) {
switch command := ctx.Message().(type) {
case *goaktpb.PostStart:
entity.actorSystem = ctx.ActorSystem()
Expand Down Expand Up @@ -125,7 +125,7 @@ func (entity *durableStateActor) recoverFromStore(ctx context.Context) error {
}

// processCommand processes the incoming command
func (entity *durableStateActor) processCommand(receiveContext *actors.ReceiveContext, command Command) {
func (entity *durableStateActor) processCommand(receiveContext *goakt.ReceiveContext, command Command) {
ctx := receiveContext.Context()
newState, newVersion, err := entity.HandleCommand(ctx, command, entity.currentVersion, entity.currentState)
if err != nil {
Expand Down Expand Up @@ -153,7 +153,7 @@ func (entity *durableStateActor) processCommand(receiveContext *actors.ReceiveCo
}

// sendStateReply sends a state reply message
func (entity *durableStateActor) sendStateReply(ctx *actors.ReceiveContext) {
func (entity *durableStateActor) sendStateReply(ctx *goakt.ReceiveContext) {
state, _ := anypb.New(entity.currentState)
ctx.Response(&egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
Expand All @@ -168,7 +168,7 @@ func (entity *durableStateActor) sendStateReply(ctx *actors.ReceiveContext) {
}

// sendErrorReply sends an error as a reply message
func (entity *durableStateActor) sendErrorReply(ctx *actors.ReceiveContext, err error) {
func (entity *durableStateActor) sendErrorReply(ctx *goakt.ReceiveContext, err error) {
ctx.Response(&egopb.CommandReply{
Reply: &egopb.CommandReply_ErrorReply{
ErrorReply: &egopb.ErrorReply{
Expand Down Expand Up @@ -211,6 +211,8 @@ func (entity *durableStateActor) persistStateAndPublish(ctx context.Context) err
shardNumber := entity.actorSystem.GetPartition(entity.ID())
topic := fmt.Sprintf(statesTopic, shardNumber)

entity.actorSystem.Logger().Debugf("publishing durableState to topic: %s", topic)

durableState := &egopb.DurableState{
PersistenceId: entity.ID(),
VersionNumber: entity.currentVersion,
Expand Down
44 changes: 22 additions & 22 deletions durable_state_actor_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022-2025 Arsene Tochemey Gandote
* Copyright (c) 2023-2025 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -32,8 +32,8 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/log"
goakt "github.com/tochemey/goakt/v3/actor"
"github.com/tochemey/goakt/v3/log"
"google.golang.org/protobuf/proto"

"github.com/tochemey/ego/v3/egopb"
Expand All @@ -47,10 +47,10 @@ func TestDurableStateBehavior(t *testing.T) {
t.Run("with state reply", func(t *testing.T) {
ctx := context.TODO()
// create an actor system
actorSystem, err := actors.NewActorSystem("TestActorSystem",
actors.WithPassivationDisabled(),
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(3))
actorSystem, err := goakt.NewActorSystem("TestActorSystem",
goakt.WithPassivationDisabled(),
goakt.WithLogger(log.DiscardLogger),
goakt.WithActorInitMaxRetries(3))
require.NoError(t, err)
assert.NotNil(t, actorSystem)

Expand Down Expand Up @@ -80,7 +80,7 @@ func TestDurableStateBehavior(t *testing.T) {

command = &testpb.CreateAccount{AccountBalance: 500.00}
// send the command to the actor
reply, err := actors.Ask(ctx, pid, command, 5*time.Second)
reply, err := goakt.Ask(ctx, pid, command, 5*time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand All @@ -107,7 +107,7 @@ func TestDurableStateBehavior(t *testing.T) {
AccountId: persistenceID,
Balance: 250,
}
reply, err = actors.Ask(ctx, pid, command, 5*time.Second)
reply, err = goakt.Ask(ctx, pid, command, 5*time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand Down Expand Up @@ -143,10 +143,10 @@ func TestDurableStateBehavior(t *testing.T) {
ctx := context.TODO()

// create an actor system
actorSystem, err := actors.NewActorSystem("TestActorSystem",
actors.WithPassivationDisabled(),
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(3))
actorSystem, err := goakt.NewActorSystem("TestActorSystem",
goakt.WithPassivationDisabled(),
goakt.WithLogger(log.DiscardLogger),
goakt.WithActorInitMaxRetries(3))
require.NoError(t, err)
assert.NotNil(t, actorSystem)

Expand Down Expand Up @@ -182,7 +182,7 @@ func TestDurableStateBehavior(t *testing.T) {

command = &testpb.CreateAccount{AccountBalance: 500.00}
// send the command to the actor
reply, err := actors.Ask(ctx, pid, command, time.Second)
reply, err := goakt.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand All @@ -209,7 +209,7 @@ func TestDurableStateBehavior(t *testing.T) {
AccountId: "different-id",
Balance: 250,
}
reply, err = actors.Ask(ctx, pid, command, time.Second)
reply, err = goakt.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand All @@ -231,10 +231,10 @@ func TestDurableStateBehavior(t *testing.T) {
})
t.Run("with state recovery from state store", func(t *testing.T) {
ctx := context.TODO()
actorSystem, err := actors.NewActorSystem("TestActorSystem",
actors.WithPassivationDisabled(),
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(3),
actorSystem, err := goakt.NewActorSystem("TestActorSystem",
goakt.WithPassivationDisabled(),
goakt.WithLogger(log.DiscardLogger),
goakt.WithActorInitMaxRetries(3),
)
require.NoError(t, err)
assert.NotNil(t, actorSystem)
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestDurableStateBehavior(t *testing.T) {

command = &testpb.CreateAccount{AccountBalance: 500.00}

reply, err := actors.Ask(ctx, pid, command, time.Second)
reply, err := goakt.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand All @@ -298,7 +298,7 @@ func TestDurableStateBehavior(t *testing.T) {
AccountId: persistenceID,
Balance: 250,
}
reply, err = actors.Ask(ctx, pid, command, time.Second)
reply, err = goakt.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestDurableStateBehavior(t *testing.T) {

// fetch the current state
command = &egopb.GetStateCommand{}
reply, err = actors.Ask(ctx, pid, command, time.Second)
reply, err = goakt.Ask(ctx, pid, command, time.Second)
require.NoError(t, err)
require.NotNil(t, reply)
require.IsType(t, new(egopb.CommandReply), reply)
Expand Down
Loading

0 comments on commit 6fa3384

Please sign in to comment.