Skip to content

Commit

Permalink
feat: add internal api
Browse files Browse the repository at this point in the history
  • Loading branch information
lekotros committed Aug 29, 2022
1 parent 4769148 commit e570fbd
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 0 deletions.
26 changes: 26 additions & 0 deletions internal/dir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package internal

import (
"os"
"strings"
)

func DirExpand(v string) string {
vs := strings.Split(v, "/")

for i, vp := range vs {
if vp == "" {
continue
}
if vp[0] == '$' {
vs[i] = os.Getenv(vp[1:])
}
if vp[0] == '~' {
if dir, err := os.UserHomeDir(); err == nil {
vs[i] = dir
}
}
}

return strings.Join(vs, "/")
}
117 changes: 117 additions & 0 deletions internal/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package internal

import (
"sync"
"time"
)

type EventType string

const (
EventTypeLog EventType = "LOG"
EventTypeValidationStart EventType = "VALIDATION_START"
EventTypeValidationStop EventType = "VALIDATION_STOP"
EventTypeScriptStart EventType = "SCRIPT_START"
EventTypeScriptStop EventType = "SCRIPT_STOP"
EventTypeWorkerStart EventType = "WORKER_START"
EventTypeWorkerStop EventType = "WORKER_STOP"
)

type Event struct {
ContextID string `json:"c"`
Sequence int `json:"s"`
Type EventType `json:"t"`
Timestamp time.Time `json:"ts"`
Data map[string]interface{} `json:"d,omitempty"`
}

type EventHandler func(event Event)

type eventHandler struct {
id int
handler EventHandler
}

type Emitter struct {
sync.Mutex
wg sync.WaitGroup
id string
queue chan Event
handlers []eventHandler
inc int
seq int
}

func (e *Emitter) Subscribe(handler EventHandler) int {
e.Lock()
defer e.Unlock()

id := e.inc
e.handlers = append(e.handlers, eventHandler{
id: id,
handler: handler,
})
e.inc += 1

return id
}

func (e *Emitter) Unsubscribe(id int) {
e.Lock()
defer e.Unlock()

for i, h := range e.handlers {
if h.id == id {
e.handlers = append(e.handlers[:i], e.handlers[i+1:]...)
return
}
}
}

func (e *Emitter) Emit(t EventType, data map[string]interface{}) Event {
e.Lock()
defer e.Unlock()

event := Event{
ContextID: e.id,
Sequence: e.seq,
Type: t,
Timestamp: time.Now(),
Data: data,
}
e.wg.Add(1)
e.queue <- event
e.seq += 1

return event
}

func (e *Emitter) Start() {
for {
event, ok := <-e.queue
if !ok {
return // channel closed
}

for _, h := range e.handlers {
h.handler(event)
}

e.wg.Done()
}
}

func (e *Emitter) Close() {
close(e.queue)
e.wg.Wait()
}

func NewEmitter(id string) *Emitter {
return &Emitter{
id: id,
queue: make(chan Event),
handlers: []eventHandler{},
inc: 0,
seq: 0,
}
}
57 changes: 57 additions & 0 deletions internal/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package internal

import (
"runtime"
"sync"
)

type Task func(int) Result

type Queue struct {
concurrency int
tasks chan Task
}

func (q *Queue) Add(task Task) {
q.tasks <- task
}

func (q *Queue) Run() []Result {
var wg sync.WaitGroup
n := len(q.tasks)
res := []Result{}

wg.Add(n)

for i := 0; i < q.concurrency; i++ {
go func(id int) {
for task := range q.tasks {
res = append(res, task(id))
wg.Done()
}
}(i + 1)
}

wg.Wait()

return res
}

func NewQueue(n, size int) *Queue {
var tasks chan Task
if size > 0 {
tasks = make(chan Task, size)
} else {
tasks = make(chan Task)
}

concurrency := n
if n <= 0 {
concurrency = runtime.NumCPU()
}

return &Queue{
concurrency: concurrency,
tasks: tasks,
}
}
38 changes: 38 additions & 0 deletions internal/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package internal

type M map[string]interface{}

type ValueHandler func(v interface{}) interface{}
type ErrorHandler func(err error) interface{}

type Result struct {
value interface{}
err error
}

func (r Result) IsErr() bool { return r.err != nil }

func (r Result) Get() interface{} { return r.value }

func (r Result) GetOrElse(op ErrorHandler) interface{} {
if r.err != nil {
return op(r.err)
}

return r.value
}

func (r Result) Map(op ValueHandler) Result {
if r.err == nil {
r.value = op(r.value)
}

return r
}

func NewResult(v interface{}, err error) Result {
return Result{
value: v,
err: err,
}
}

0 comments on commit e570fbd

Please sign in to comment.