Skip to content

Commit

Permalink
feat(source): source event
Browse files Browse the repository at this point in the history
  • Loading branch information
alishazaee committed Jan 14, 2025
1 parent 6549a56 commit c03fcf8
Show file tree
Hide file tree
Showing 33 changed files with 865 additions and 168 deletions.
101 changes: 101 additions & 0 deletions cmd/scheduler/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"context"
"log/slog"
"os"
"os/signal"
"sync"
"syscall"

"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/logger"
"github.com/ormushq/ormus/pkg/channel"
"github.com/ormushq/ormus/pkg/channel/adapter/rabbitmqchannel"
"github.com/ormushq/ormus/scheduler"
sourceevent "github.com/ormushq/ormus/source/eventhandler"
"github.com/ormushq/ormus/source/repository/scylladb"
schrepo "github.com/ormushq/ormus/source/repository/scylladb/scheduler"
sourcescheduler "github.com/ormushq/ormus/source/scheduler"
)

// @termsOfService http://swagger.io/terms/

// @contact.name API Support
// @contact.url http://www.swagger.io/support
// @contact.email [email protected]

// @license.name Apache 2.0
// @license.url http://www.apache.org/licenses/LICENSE-2.0.html

// @securityDefinitions.apikey JWTToken
// @in header
// @name Authorization

func main() {
done := make(chan bool)
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
//----------------- Setup Logger -----------------//
fileMaxSizeInMB := 10
fileMaxAgeInDays := 30

loggercfg := logger.Config{
FilePath: "./destination/logs.json",
UseLocalTime: false,
FileMaxSizeInMB: fileMaxSizeInMB,
FileMaxAgeInDays: fileMaxAgeInDays,
}

logLevel := slog.LevelInfo
if config.C().Destination.DebugMode {
logLevel = slog.LevelDebug
}

opt := slog.HandlerOptions{
// todo should level debug be read from config?
Level: logLevel,
}
l := logger.New(loggercfg, &opt)

// use slog as default logger.
slog.SetDefault(l)

cfg := config.C()
sourceSch := SetupSourceServices(cfg, done, wg)
sch := scheduler.New(done, wg)
wg.Add(1)
go func() {

sch.Start(ctx, cfg.Source.UndeliveredEventRetransmitPeriod, sourceSch.PublishUndeliveredEvents)
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT)
<-quit
logger.L().Info("Received shutdown signal, shutting down gracefully...")
cancel()
<-done
logger.L().Info("system scheduler successfully shut down gracefully")
wg.Wait()
}

func SetupSourceServices(cfg config.Config, done chan bool, wg *sync.WaitGroup) sourcescheduler.Scheduler {

inputAdapter := rabbitmqchannel.New(done, wg, cfg.RabbitMq)
err := inputAdapter.NewChannel(cfg.Source.NewEventQueueName, channel.InputOnlyMode, cfg.Source.BufferSize, cfg.Source.MaxRetry)
if err != nil {
panic(err)
}

Publisher := sourceevent.NewPublisher(inputAdapter)

DB, err := scylladb.New(cfg.Source.ScyllaDBConfig)
if err != nil {
panic(err)
}

schRepo := schrepo.New(DB)
schSvc := sourcescheduler.New(schRepo, Publisher, cfg.Source, wg)

return schSvc
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func main() {

bufferSize := cfg.Source.BufferSize
maxRetryPolicy := cfg.Source.MaxRetry
eventName := cfg.Source.NewSourceEventName
eventName := cfg.Source.NewEventQueueName
err := otela.Configure(wg, done, otela.Config{Exporter: otela.ExporterConsole})
if err != nil {
panic(err.Error())
Expand All @@ -39,8 +39,8 @@ func main() {
go func() {
defer wg.Done()
for msg := range outputChannel {
m := encoder.DecodeNewSourceEvent(string(msg.Body))
log.Info(m.WriteKey)
m := encoder.DecodeNewEvent(string(msg.Body))
log.Info(m)

if err := msg.Ack(); err != nil {
panic(err)
Expand Down
File renamed without changes.
14 changes: 11 additions & 3 deletions cmd/source/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event
panic(err)
}

inputAdapter := rabbitmqchannel.New(done, wg, cfg.RabbitMq)
err = inputAdapter.NewChannel(cfg.Source.NewEventQueueName, channel.InputOnlyMode, cfg.Source.BufferSize, cfg.Source.MaxRetry)
if err != nil {
panic(err)
}

Publisher := sourceevent.NewPublisher(inputAdapter)

redisAdapter, err := redis.New(cfg.Redis)
if err != nil {
panic(err)
Expand All @@ -127,14 +135,14 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event

writeKeyRepo := writekeyrepo.New(redisAdapter, *ManagerAdapter)
writeKeySvc = writekey.New(&writeKeyRepo, cfg.Source)
eventHandler = *sourceevent.New(outputAdapter, writeKeySvc)
eventHandler = *sourceevent.NewConsumer(outputAdapter, writeKeySvc)

DB, err := scylladb.New(cfg.Source.ScyllaDBConfig)
if err != nil {
panic(err)
}
eventRepo := eventrepo.New(DB)
eventSvc = *eventsvc.New(eventRepo)
eventRepo := eventrepo.New(DB, Publisher)
eventSvc = *eventsvc.New(eventRepo, cfg.Source, wg)

eventValidator = eventvalidator.New(&writeKeyRepo, cfg.Source)

Expand Down
2 changes: 2 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ source:
port: 8082
network: "tcp"
write_key_validation_address: "127.0.0.1:8081"
new_event_queue_name: "new-event-received"
write_key_expiration: 120
undelivered_event_retransmit_period: 1
new_source_event_name: "new-source-event"
buffersize: 100
number_instants: 10
Expand Down
7 changes: 0 additions & 7 deletions config/test.yml

This file was deleted.

Loading

0 comments on commit c03fcf8

Please sign in to comment.