Skip to content

Commit 8086c19

Browse files
committed
add more comments
1 parent eabedb6 commit 8086c19

File tree

8 files changed

+141
-50
lines changed

8 files changed

+141
-50
lines changed

cmd/mqtt-executor/log.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package main
33
import "go.uber.org/zap"
44

55
func init() {
6+
//initialise our global logger
7+
68
logger, _ := zap.NewDevelopment(
7-
zap.AddStacktrace(zap.FatalLevel),
9+
zap.AddStacktrace(zap.FatalLevel), //disable stacktrace for level lower than fatal
810
)
911
zap.ReplaceGlobals(logger)
1012
defer zap.L().Sync()

cmd/mqtt-executor/main.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ func main() {
2424
trigger.Executor = commandExecutor
2525
sensorWorker.Executor = commandExecutor
2626

27+
//reacting to signals (interrupt)
2728
signals := make(chan os.Signal, 1)
2829
defer close(signals)
2930
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
@@ -37,6 +38,7 @@ func main() {
3738
zap.L().Fatal("Error while connecting to mqtt broker: %s", zap.Error(token.Error()))
3839
}
3940

41+
//if hassio is enabled -> publish the hassio mqtt-discovery configs
4042
if *Config.HomeassistantEnable {
4143
haClient := hassio.Client{
4244
DeviceName: *Config.DeviceName,
@@ -69,6 +71,8 @@ func shutdown(client MQTT.Client) {
6971
}
7072
closeables := []closable{&statusWorker, &sensorWorker, &trigger, commandExecutor}
7173

74+
//most operating systems wait a maximum of 30 seconds
75+
7276
wg := sync.WaitGroup{}
7377
wg.Add(len(closeables))
7478
timeout := 20 * time.Second
@@ -84,5 +88,6 @@ func shutdown(client MQTT.Client) {
8488
}
8589
wg.Wait()
8690

87-
client.Disconnect(20 * 1000) //wait 10sek at most
91+
//we have to disconnect at last because one closeable unsubscripe all topics
92+
client.Disconnect(10 * 1000) //wait 10sek at most
8893
}

internal/cmd/executor.go

+8
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func (c *CommandExecutor) ExecuteCommand(cmd string, args []string) ([]byte, err
2727
}
2828

2929
func (c *CommandExecutor) ExecuteCommandWithContext(cmd string, args []string, executionContext context.Context) ([]byte, error) {
30+
//register the context so that we have a chance to cancel the commands later
3031
ctx := c.registerContext(executionContext)
3132
c.openExecutions.Add(1)
3233
defer c.openExecutions.Done()
@@ -46,16 +47,19 @@ func (c *CommandExecutor) ExecuteCommandWithContext(cmd string, args []string, e
4647
}
4748

4849
func (c *CommandExecutor) registerContext(parentContext context.Context) context.Context {
50+
//lock to ensure the map is thread-safe
4951
c.lock.Lock()
5052
defer c.lock.Unlock()
5153

54+
//wrap the given context so that we can later cancel the context (see Close func)
5255
ctx, cancelFunc := context.WithCancel(parentContext)
5356
c.usedContext[ctx] = cancelFunc
5457

5558
return ctx
5659
}
5760

5861
func (c *CommandExecutor) releaseContext(ctx context.Context) {
62+
//lock to ensure the map is thread-safe
5963
c.lock.Lock()
6064
defer c.lock.Unlock()
6165

@@ -65,19 +69,23 @@ func (c *CommandExecutor) releaseContext(ctx context.Context) {
6569
func (c *CommandExecutor) Close(timeout time.Duration) error {
6670
wg := sync.WaitGroup{}
6771

72+
//this lock ensures that no new commands can start
6873
c.lock.Lock()
6974
defer c.lock.Unlock()
7075

76+
//cancel all in parallel
7177
for ctx, cancelFunc := range c.usedContext {
7278
wg.Add(1)
7379

80+
//trigger cancel-func and wait for it
7481
go func(c context.Context, cf context.CancelFunc) {
7582
defer wg.Done()
7683
cf() //call cancel
7784
<-ctx.Done() //wait for cancellation
7885
}(ctx, cancelFunc)
7986
}
8087

88+
//wait for all commands to be stopped
8189
wgChan := make(chan bool)
8290
go func() {
8391
wg.Wait()

internal/mqtt/config/topic.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Availability struct {
2626
type Trigger struct {
2727
Name string `json:"name"`
2828
Topic string `json:"topic"`
29+
Icon string `json:"icon"`
2930
Command Command `json:"command"`
3031
}
3132

@@ -34,6 +35,7 @@ type Sensor struct {
3435
ResultTopic string `json:"topic"`
3536
Interval Interval `json:"interval"`
3637
Unit string `json:"unit"`
38+
Icon string `json:"icon"`
3739
Command Command `json:"command"`
3840
}
3941

@@ -42,15 +44,15 @@ type Command struct {
4244
Arguments []string `json:"arguments"`
4345
}
4446

45-
func LoadTopicConfiguration(configFile, deviceId string) (TopicConfigurations, error) {
46-
file, err := os.Open(configFile)
47+
func LoadTopicConfiguration(configFilePath, deviceId string) (TopicConfigurations, error) {
48+
configFile, err := os.Open(configFilePath)
4749
if err != nil {
4850
return TopicConfigurations{}, fmt.Errorf("error while opening topic configuration file: %w", err)
4951
}
50-
defer file.Close()
52+
defer configFile.Close()
5153

5254
var topicConfig TopicConfigurations
53-
err = json.NewDecoder(file).Decode(&topicConfig)
55+
err = json.NewDecoder(configFile).Decode(&topicConfig)
5456
if err != nil {
5557
return TopicConfigurations{}, fmt.Errorf("could not read topic configuration file: %w", err)
5658
}

internal/mqtt/hassio/client.go

+86-32
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,20 @@ import (
1313
)
1414

1515
type generalConfig struct {
16-
Name string `json:"name"`
17-
AvailabilityTopic string `json:"avty_t,omitempty"`
18-
PayloadAvailable string `json:"pl_avail,omitempty"`
19-
PayloadNotAvailable string `json:"pl_not_avail,omitempty"`
20-
UniqueId string `json:"uniq_id"`
21-
Device *device `json:"dev,omitempty"`
16+
Name string `json:"name"`
17+
AvailabilityTopic string `json:"avty_t,omitempty"`
18+
PayloadAvailable string `json:"pl_avail,omitempty"`
19+
PayloadNotAvailable string `json:"pl_not_avail,omitempty"`
20+
UniqueId string `json:"uniq_id"`
21+
Icon string `json:"ic,omitempty"`
22+
Device device `json:"dev,omitempty"`
2223
}
2324

2425
type sensorConfig struct {
2526
generalConfig
2627

2728
StateTopic string `json:"stat_t"`
2829
MeasurementUnit string `json:"unit_of_meas,omitempty"`
29-
Icon string `json:"ic,omitempty"`
3030
ForceUpdate *bool `json:"frc_upd,omitempty"`
3131
}
3232

@@ -82,7 +82,17 @@ func (c *Client) PublishDiscoveryConfig(config config.TopicConfigurations) {
8282
//trigger
8383
for _, trigger := range config.Trigger {
8484
targetTopic := fmt.Sprintf("%sswitch/%s/%s/config", c.TopicPrefix, c.DeviceId, friendlyName(trigger.Name))
85-
payload := c.generatePayloadForTriggerAction(config.Availability, trigger)
85+
payload := c.generateSwitchPayloadForTriggerAction(config.Availability, trigger)
86+
c.MqttClient.Publish(targetTopic, byte(0), false, payload)
87+
88+
//publish the trigger-result as sensor data
89+
targetTopic = fmt.Sprintf("%ssensor/%s_%s/result/config", c.TopicPrefix, c.DeviceId, friendlyName(trigger.Name))
90+
payload = c.generateResultPayloadForTriggerAction(config.Availability, trigger)
91+
c.MqttClient.Publish(targetTopic, byte(0), false, payload)
92+
93+
//publish the trigger-state as sensor data
94+
targetTopic = fmt.Sprintf("%ssensor/%s_%s/state/config", c.TopicPrefix, c.DeviceId, friendlyName(trigger.Name))
95+
payload = c.generateStatePayloadForTriggerAction(config.Availability, trigger)
8696
c.MqttClient.Publish(targetTopic, byte(0), false, payload)
8797
}
8898
}
@@ -91,20 +101,24 @@ func friendlyName(name string) string {
91101
return strings.Replace(name, " ", "_", -1)
92102
}
93103

104+
func (c *Client) buildDevice() device {
105+
return device{
106+
Name: c.DeviceName,
107+
Ids: []string{c.DeviceId},
108+
Manufacturer: "rainu",
109+
Model: runtime.GOOS,
110+
Version: "mqtt-executor",
111+
}
112+
}
113+
94114
func (c *Client) generatePayloadForStatus(availability *config.Availability) []byte {
95115
conf := sensorConfig{
96116
generalConfig: generalConfig{
97117
Name: "Status",
98118
PayloadAvailable: availability.Payload.Available,
99119
PayloadNotAvailable: availability.Payload.Unavailable,
100120
UniqueId: fmt.Sprintf("%s_status", c.DeviceId),
101-
Device: &device{
102-
Name: c.DeviceName,
103-
Ids: []string{c.DeviceId},
104-
Manufacturer: "rainu",
105-
Model: runtime.GOOS,
106-
Version: "mqtt-executor",
107-
},
121+
Device: c.buildDevice(),
108122
},
109123
StateTopic: availability.Topic,
110124
}
@@ -115,6 +129,7 @@ func (c *Client) generatePayloadForStatus(availability *config.Availability) []b
115129

116130
payload, err := json.Marshal(conf)
117131
if err != nil {
132+
//the "marshalling" is relatively safe - it should never appear at runtime
118133
panic(err)
119134
}
120135
return payload
@@ -125,37 +140,31 @@ func (c *Client) generatePayloadForSensor(availability *config.Availability, sen
125140
conf := sensorConfig{
126141
generalConfig: generalConfig{
127142
Name: sensor.Name,
143+
Icon: sensor.Icon,
128144
UniqueId: fmt.Sprintf("%s_%s", c.DeviceId, friendlyName(sensor.Name)),
129-
Device: &device{
130-
Ids: []string{c.DeviceId},
131-
},
145+
Device: c.buildDevice(),
132146
},
133147
StateTopic: sensor.ResultTopic,
134148
MeasurementUnit: sensor.Unit,
135-
Icon: "",
136149
ForceUpdate: &bTrue,
137150
}
138-
if availability != nil {
139-
conf.AvailabilityTopic = availability.Topic
140-
conf.PayloadAvailable = availability.Payload.Available
141-
conf.PayloadNotAvailable = availability.Payload.Unavailable
142-
}
151+
addAvailability(&conf.generalConfig, availability)
143152

144153
payload, err := json.Marshal(conf)
145154
if err != nil {
155+
//the "marshalling" is relatively safe - it should never appear at runtime
146156
panic(err)
147157
}
148158
return payload
149159
}
150160

151-
func (c *Client) generatePayloadForTriggerAction(availability *config.Availability, trigger config.Trigger) []byte {
161+
func (c *Client) generateSwitchPayloadForTriggerAction(availability *config.Availability, trigger config.Trigger) []byte {
152162
conf := triggerConfig{
153163
generalConfig: generalConfig{
154164
Name: fmt.Sprintf("%s", trigger.Name),
165+
Icon: trigger.Icon,
155166
UniqueId: fmt.Sprintf("%s_%s", c.DeviceId, friendlyName(trigger.Name)),
156-
Device: &device{
157-
Ids: []string{c.DeviceId},
158-
},
167+
Device: c.buildDevice(),
159168
},
160169
CommandTopic: trigger.Topic,
161170
PayloadStart: mqtt.PayloadStart,
@@ -164,15 +173,60 @@ func (c *Client) generatePayloadForTriggerAction(availability *config.Availabili
164173
StateRunning: mqtt.PayloadStatusRunning,
165174
StateStopped: mqtt.PayloadStatusStopped,
166175
}
167-
if availability != nil {
168-
conf.AvailabilityTopic = availability.Topic
169-
conf.PayloadAvailable = availability.Payload.Available
170-
conf.PayloadNotAvailable = availability.Payload.Unavailable
176+
addAvailability(&conf.generalConfig, availability)
177+
178+
payload, err := json.Marshal(conf)
179+
if err != nil {
180+
//the "marshalling" is relatively safe - it should never appear at runtime
181+
panic(err)
171182
}
183+
return payload
184+
}
185+
186+
func (c *Client) generateResultPayloadForTriggerAction(availability *config.Availability, trigger config.Trigger) []byte {
187+
conf := sensorConfig{
188+
generalConfig: generalConfig{
189+
Name: fmt.Sprintf("%s - Result", trigger.Name),
190+
Icon: trigger.Icon,
191+
UniqueId: fmt.Sprintf("%s_%s_result", c.DeviceId, friendlyName(trigger.Name)),
192+
Device: c.buildDevice(),
193+
},
194+
StateTopic: fmt.Sprintf("%s/%s", trigger.Topic, mqtt.TopicSuffixResult),
195+
}
196+
addAvailability(&conf.generalConfig, availability)
197+
198+
payload, err := json.Marshal(conf)
199+
if err != nil {
200+
//the "marshalling" is relatively safe - it should never appear at runtime
201+
panic(err)
202+
}
203+
return payload
204+
}
205+
206+
func (c *Client) generateStatePayloadForTriggerAction(availability *config.Availability, trigger config.Trigger) []byte {
207+
conf := sensorConfig{
208+
generalConfig: generalConfig{
209+
Name: fmt.Sprintf("%s - State", trigger.Name),
210+
Icon: trigger.Icon,
211+
UniqueId: fmt.Sprintf("%s_%s_state", c.DeviceId, friendlyName(trigger.Name)),
212+
Device: c.buildDevice(),
213+
},
214+
StateTopic: fmt.Sprintf("%s/%s", trigger.Topic, mqtt.TopicSuffixState),
215+
}
216+
addAvailability(&conf.generalConfig, availability)
172217

173218
payload, err := json.Marshal(conf)
174219
if err != nil {
220+
//the "marshalling" is relatively safe - it should never appear at runtime
175221
panic(err)
176222
}
177223
return payload
178224
}
225+
226+
func addAvailability(config *generalConfig, availability *config.Availability) {
227+
if availability != nil {
228+
config.AvailabilityTopic = availability.Topic
229+
config.PayloadAvailable = availability.Payload.Available
230+
config.PayloadNotAvailable = availability.Payload.Unavailable
231+
}
232+
}

internal/mqtt/sensor.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type SensorWorker struct {
2020

2121
func (s *SensorWorker) Initialise(publishQOS byte, sensorConfigs []config.Sensor) {
2222

23+
//generate a context so that we can cancel it later (see Close func)
2324
var ctx context.Context
2425
ctx, s.cancelFunc = context.WithCancel(context.Background())
2526

@@ -32,24 +33,25 @@ func (s *SensorWorker) Initialise(publishQOS byte, sensorConfigs []config.Sensor
3233
func (s *SensorWorker) runSensor(ctx context.Context, publishQOS byte, sensorConf config.Sensor) {
3334
defer s.waitGroup.Done()
3435

35-
//first one
36-
s.executeCommand(publishQOS, sensorConf)
36+
//first execution
37+
s.executeCommand(ctx, publishQOS, sensorConf)
3738

3839
ticker := time.Tick(time.Duration(sensorConf.Interval))
3940
for {
41+
//wait until next tick or shutdown
4042
select {
4143
case <-ticker:
42-
s.executeCommand(publishQOS, sensorConf)
44+
s.executeCommand(ctx, publishQOS, sensorConf)
4345
case <-ctx.Done():
4446
return
4547
}
4648
}
4749
}
4850

49-
func (s *SensorWorker) executeCommand(publishQOS byte, sensorConf config.Sensor) {
50-
output, execErr := s.Executor.ExecuteCommand(sensorConf.Command.Name, sensorConf.Command.Arguments)
51+
func (s *SensorWorker) executeCommand(ctx context.Context, publishQOS byte, sensorConf config.Sensor) {
52+
output, execErr := s.Executor.ExecuteCommandWithContext(sensorConf.Command.Name, sensorConf.Command.Arguments, ctx)
5153
if execErr != nil {
52-
s.MqttClient.Publish(sensorConf.ResultTopic, publishQOS, false, "<FAILED> "+execErr.Error())
54+
s.MqttClient.Publish(sensorConf.ResultTopic, publishQOS, false, "<FAILED>;"+execErr.Error())
5355
return
5456
}
5557

@@ -58,6 +60,7 @@ func (s *SensorWorker) executeCommand(publishQOS byte, sensorConf config.Sensor)
5860

5961
func (s *SensorWorker) Close(timeout time.Duration) error {
6062
if s.cancelFunc != nil {
63+
//close the context to interrupt possible running commands
6164
s.cancelFunc()
6265
}
6366

0 commit comments

Comments
 (0)