-
Notifications
You must be signed in to change notification settings - Fork 17
/
services.go
119 lines (105 loc) · 3.2 KB
/
services.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package nexnode
import (
"fmt"
"log/slog"
"github.com/nats-io/nats.go"
hs "github.com/synadia-io/nex/host-services"
"github.com/synadia-io/nex/host-services/builtins"
"github.com/synadia-io/nex/internal/models"
"go.opentelemetry.io/otel/trace"
)
// Names of the built-in host services. Each name is used both as the key
// looked up in the host services configuration and as the name under which
// the service is added to the host services server.
const (
	hostServiceHTTP        = "http"
	hostServiceKeyValue    = "kv"
	hostServiceMessaging   = "messaging"
	hostServiceObjectStore = "objectstore"
)
// HostServices wraps the host services server, which implements select
// functionality that is exposed to workloads by way of the agent; the agent
// makes RPC calls via the internal NATS connection.
type HostServices struct {
// config holds the host services settings; init looks each built-in
// service up by name in config.Services.
config *models.HostServicesConfig
// log receives init diagnostics and is handed to each built-in service
// constructor.
log *slog.Logger
// ncint is the internal NATS connection (not the external/control one).
ncint *nats.Conn
// server is the host services server that enabled services are added to
// and that init starts.
server *hs.HostServicesServer
}
// NewHostServices builds a HostServices value from the given internal NATS
// connection, host services configuration and logger. It also creates the
// underlying host services server from the connection, logger and tracer.
// No services are added and nothing is started here; that happens in init.
func NewHostServices(
	ncint *nats.Conn,
	config *models.HostServicesConfig,
	log *slog.Logger,
	tracer trace.Tracer,
) *HostServices {
	// IMPORTANT: the host services server must be given the -internal- NATS
	// connection and -not- the external/control one. Mixing them up has
	// already cost a full day of troubleshooting.
	server := hs.NewHostServicesServer(ncint, log, tracer)

	services := new(HostServices)
	services.config = config
	services.log = log
	services.ncint = ncint
	services.server = server
	return services
}
// init constructs every built-in host service that is both present and
// enabled in h.config.Services, adds it to the host services server under its
// service name together with its configuration, and finally starts the server.
//
// Services are handled in a fixed order: http, key/value, messaging, object
// store. A service that is missing from the configuration or not enabled is
// skipped. The first construction or registration failure is logged and
// returned unchanged, in which case the server is not started. Otherwise the
// result of starting the server is returned.
func (h *HostServices) init() error {
	if cfg, ok := h.config.Services[hostServiceHTTP]; ok && cfg.Enabled {
		svc, err := builtins.NewHTTPService(h.log)
		if err != nil {
			h.log.Error(fmt.Sprintf("failed to initialize http host service: %s", err.Error()))
			return err
		}
		h.log.Debug("initialized http host service")

		// Registration failures are logged like construction failures so the
		// failing service can be identified from the logs.
		if err = h.server.AddService(hostServiceHTTP, svc, cfg.Configuration); err != nil {
			h.log.Error(fmt.Sprintf("failed to add http host service: %s", err.Error()))
			return err
		}
	}

	if cfg, ok := h.config.Services[hostServiceKeyValue]; ok && cfg.Enabled {
		svc, err := builtins.NewKeyValueService(h.log)
		if err != nil {
			h.log.Error(fmt.Sprintf("failed to initialize key/value host service: %s", err.Error()))
			return err
		}
		h.log.Debug("initialized key/value host service")

		if err = h.server.AddService(hostServiceKeyValue, svc, cfg.Configuration); err != nil {
			h.log.Error(fmt.Sprintf("failed to add key/value host service: %s", err.Error()))
			return err
		}
	}

	if cfg, ok := h.config.Services[hostServiceMessaging]; ok && cfg.Enabled {
		svc, err := builtins.NewMessagingService(h.log)
		if err != nil {
			h.log.Error(fmt.Sprintf("failed to initialize messaging host service: %s", err.Error()))
			return err
		}
		h.log.Debug("initialized messaging host service")

		if err = h.server.AddService(hostServiceMessaging, svc, cfg.Configuration); err != nil {
			h.log.Error(fmt.Sprintf("failed to add messaging host service: %s", err.Error()))
			return err
		}
	}

	if cfg, ok := h.config.Services[hostServiceObjectStore]; ok && cfg.Enabled {
		svc, err := builtins.NewObjectStoreService(h.log)
		if err != nil {
			h.log.Error(fmt.Sprintf("failed to initialize object store host service: %s", err.Error()))
			return err
		}
		h.log.Debug("initialized object store host service")

		if err = h.server.AddService(hostServiceObjectStore, svc, cfg.Configuration); err != nil {
			h.log.Error(fmt.Sprintf("failed to add object store host service: %s", err.Error()))
			return err
		}
	}

	h.log.Debug("Host services configured", slog.Any("services", h.server.Services()))
	return h.server.Start()
}