7
7
"github.com/rainu/mqtt-executor/internal/cmd"
8
8
"github.com/rainu/mqtt-executor/internal/mqtt/config"
9
9
"go.uber.org/zap"
10
+ "strings"
11
+ "sync"
10
12
"time"
11
13
)
12
14
@@ -15,58 +17,129 @@ const (
15
17
TopicSuffixResult = "RESULT"
16
18
PayloadStatusRunning = "RUNNING"
17
19
PayloadStatusStopped = "STOPPED"
20
+ PayloadStart = "START"
18
21
PayloadStop = "STOP"
19
22
)
20
23
21
24
type Trigger struct {
22
- triggerConfigs []config.Trigger
25
+ lock sync.RWMutex
26
+ runningCommands map [string ]context.CancelFunc
27
+ triggerConfigs []config.Trigger
28
+ publishQOS byte
23
29
24
30
Executor * cmd.CommandExecutor
25
31
MqttClient MQTT.Client
26
32
}
27
33
28
34
func (t * Trigger ) Initialise (subscribeQOS , publishQOS byte , triggerConfigs []config.Trigger ) {
35
+ t .publishQOS = publishQOS
36
+ t .runningCommands = map [string ]context.CancelFunc {}
29
37
30
38
for _ , triggerConf := range triggerConfigs {
31
39
t .triggerConfigs = append (t .triggerConfigs , triggerConf )
32
- t .MqttClient .Subscribe (triggerConf .Topic , subscribeQOS , t .createTriggerHandler (publishQOS , triggerConf ))
40
+ t .MqttClient .Subscribe (triggerConf .Topic , subscribeQOS , t .createTriggerHandler (triggerConf ))
41
+ t .publishStatus (triggerConf .Topic , PayloadStatusStopped )
33
42
}
34
-
35
- //TODO: publish stopped state for each command (inital state)
36
43
}
37
44
38
- func (t * Trigger ) createTriggerHandler (publishQOS byte , triggerConfig config.Trigger ) MQTT.MessageHandler {
45
+ func (t * Trigger ) createTriggerHandler (triggerConfig config.Trigger ) MQTT.MessageHandler {
39
46
return func (client MQTT.Client , message MQTT.Message ) {
40
47
zap .L ().Info ("Incoming message: " ,
41
48
zap .String ("topic" , message .Topic ()),
42
49
zap .ByteString ("payload" , message .Payload ()),
43
50
)
44
51
45
- action , exists := triggerConfig .Actions [string (message .Payload ())]
46
- if ! exists {
47
- zap .L ().Warn ("Command is not configured" )
48
- return
52
+ action := strings .ToUpper (string (message .Payload ()))
53
+
54
+ switch action {
55
+ case PayloadStart :
56
+ if t .isCommandRunning (triggerConfig ) {
57
+ zap .L ().Warn ("Command is already running. Skip execution!" , zap .String ("trigger" , triggerConfig .Name ))
58
+ return
59
+ }
60
+
61
+ go t .executeCommand (message .Topic (), triggerConfig )
62
+ case PayloadStop :
63
+ if ! t .isCommandRunning (triggerConfig ) {
64
+ return
65
+ }
66
+ t .interruptCommand (triggerConfig )
67
+ t .unregisterCommand (triggerConfig )
68
+ default :
69
+ zap .L ().Warn ("Invalid payload" )
49
70
}
50
- cmd := action .Command
51
-
52
- go t .executeCommand (publishQOS , message .Topic (), string (message .Payload ()), cmd )
53
71
}
54
72
}
55
73
56
- func (t * Trigger ) executeCommand (publishQOS byte , topic , action string , command config.Command ) {
57
- stateTopic := fmt .Sprintf ("%s/%s/%s" , topic , action , TopicSuffixState )
58
- resultTopic := fmt .Sprintf ("%s/%s/%s" , topic , action , TopicSuffixResult )
74
+ func (t * Trigger ) isCommandRunning (trigger config.Trigger ) bool {
75
+ t .lock .RLock ()
76
+ defer t .lock .RUnlock ()
77
+
78
+ _ , exist := t .runningCommands [trigger .Name ]
79
+ return exist
80
+ }
81
+
82
+ func (t * Trigger ) registerCommand (trigger config.Trigger ) context.Context {
83
+ t .lock .Lock ()
84
+ defer t .lock .Unlock ()
85
+
86
+ ctx , cancelFunc := context .WithCancel (context .Background ())
87
+ t .runningCommands [trigger .Name ] = cancelFunc
88
+
89
+ return ctx
90
+ }
91
+
92
+ func (t * Trigger ) unregisterCommand (trigger config.Trigger ) {
93
+ t .lock .Lock ()
94
+ defer t .lock .Unlock ()
95
+
96
+ delete (t .runningCommands , trigger .Name )
97
+ }
98
+
99
+ func (t * Trigger ) interruptCommand (trigger config.Trigger ) {
100
+ t .lock .RLock ()
101
+ defer t .lock .RUnlock ()
59
102
60
- t .MqttClient .Publish (stateTopic , publishQOS , false , PayloadStatusRunning )
61
- defer t .MqttClient .Publish (stateTopic , publishQOS , false , PayloadStatusStopped )
103
+ //execute corresponding cancel func
104
+ t .runningCommands [trigger.Name ]()
105
+ }
106
+
107
+ func (t * Trigger ) executeCommand (topic string , trigger config.Trigger ) {
108
+ ctx := t .registerCommand (trigger )
109
+ defer t .unregisterCommand (trigger )
110
+
111
+ t .publishStatus (topic , PayloadStatusRunning )
112
+ defer t .publishStatus (topic , PayloadStatusStopped )
62
113
63
- output , execErr := t .Executor .ExecuteCommandWithContext (command . Name , command . Arguments , context . Background () )
114
+ output , execErr := t .Executor .ExecuteCommandWithContext (trigger . Command . Name , trigger . Command . Arguments , ctx )
64
115
if execErr != nil {
65
- t .MqttClient .Publish (resultTopic , publishQOS , false , "<FAILED> " + execErr .Error ())
116
+ if execErr == context .Canceled {
117
+ t .publishResult (topic , "<INTERRUPTED>" )
118
+ } else {
119
+ t .publishResult (topic , "<FAILED>;" + execErr .Error ())
120
+ }
66
121
return
67
122
}
68
123
69
- t .MqttClient .Publish (resultTopic , publishQOS , false , output )
124
+ t .publishResult (topic , output )
125
+ }
126
+
127
+ func (t * Trigger ) publishStatus (parentTopic , status string ) MQTT.Token {
128
+ stateTopic := t .buildStateTopic (parentTopic )
129
+ return t .MqttClient .Publish (stateTopic , t .publishQOS , false , status )
130
+ }
131
+
132
+ func (t * Trigger ) publishResult (parentTopic string , result interface {}) MQTT.Token {
133
+ resultTopic := t .buildResultTopic (parentTopic )
134
+ return t .MqttClient .Publish (resultTopic , t .publishQOS , false , result )
135
+ }
136
+
137
+ func (t * Trigger ) buildStateTopic (parentTopic string ) string {
138
+ return fmt .Sprintf ("%s/%s" , parentTopic , TopicSuffixState )
139
+ }
140
+
141
+ func (t * Trigger ) buildResultTopic (parentTopic string ) string {
142
+ return fmt .Sprintf ("%s/%s" , parentTopic , TopicSuffixResult )
70
143
}
71
144
72
145
func (t * Trigger ) Close (timeout time.Duration ) error {
0 commit comments