Skip to content

Commit 7c2e57f

Browse files
committed
Initial commit
0 parents  commit 7c2e57f

File tree

9 files changed

+394
-0
lines changed

9 files changed

+394
-0
lines changed

.gitignore

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
.idea
2+
3+
# Binaries for programs and plugins
4+
*.exe
5+
*.exe~
6+
*.dll
7+
*.so
8+
*.dylib
9+
10+
# Test binary, built with `go test -c`
11+
*.test
12+
13+
# Output of the go coverage tool, specifically when used with LiteIDE
14+
*.out
15+
16+
# Dependency directories (remove the comment below to include it)
17+
# vendor/

LICENSE

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2020 rainu
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

README.md

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# mqtt-executor
2+
3+
A simple MQTT client written in go that subscribes to a configurable list of MQTT topics on the specified broker and
4+
executes a given shell script/command whenever a message arrives.
5+
6+
# Configuration
7+
Create a configuration file named "config.json"
8+
```json
9+
{
10+
"sys/file": {
11+
"create": ["/usr/bin/touch", "/tmp/example"],
12+
"remove": ["/bin/rm", "-f", "/tmp/example"],
13+
"info": ["/bin/ls", "-l", "/tmp/example"]
14+
}
15+
}
16+
```
17+
18+
# Usage
19+
20+
Start the tool with the path to the config file and the URL of the MQTT broker
21+
```bash
22+
mqtt-executor -broker tcp://127.0.0.1:1883 -config /path/to/config.json
23+
```
24+
25+
# Trigger command execution
26+
27+
```bash
28+
mosquitto_pub -d -t sys/file -m "create"
29+
mosquitto_pub -d -t sys/file -m "info"
30+
mosquitto_pub -d -t sys/file -m "remove"
31+
```
32+
33+
# Get the command results
34+
35+
Each command result will be written in **<incomingTopicName>/RESULT**. For example:
36+
37+
```bash
38+
mosquitto_sub -d -t sys/file/RESULT
39+
```

cmd/mqtt-executor/cmd_executor.go

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
MQTT "github.com/eclipse/paho.mqtt.golang"
8+
"go.uber.org/zap"
9+
"os/exec"
10+
"sync"
11+
"time"
12+
)
13+
14+
type CommandExecutor struct {
15+
lock sync.RWMutex
16+
usedContext map[context.Context]context.CancelFunc
17+
openExecutions sync.WaitGroup
18+
}
19+
20+
var commandExecutor CommandExecutor
21+
22+
func init() {
23+
commandExecutor = CommandExecutor{
24+
lock: sync.RWMutex{},
25+
usedContext: map[context.Context]context.CancelFunc{},
26+
}
27+
}
28+
29+
func (c *CommandExecutor) ExecuteCommand(client MQTT.Client, message MQTT.Message, commandAndArgs []string) {
30+
ctx := c.createContext()
31+
c.openExecutions.Add(1)
32+
defer c.openExecutions.Done()
33+
34+
command := exec.CommandContext(ctx, commandAndArgs[0], commandAndArgs[1:]...)
35+
out, execErr := command.CombinedOutput()
36+
if execErr != nil {
37+
zap.L().Error("Command execution failed.", zap.Error(execErr))
38+
client.Publish(fmt.Sprintf("%s/RESULT", message.Topic()), byte(*Config.PublishQOS), false, "<FAILED> "+execErr.Error())
39+
return
40+
}
41+
42+
client.Publish(fmt.Sprintf("%s/RESULT", message.Topic()), byte(*Config.PublishQOS), false, out)
43+
c.releaseContext(ctx)
44+
}
45+
46+
func (c *CommandExecutor) createContext() context.Context {
47+
c.lock.Lock()
48+
defer c.lock.Unlock()
49+
50+
ctx, cancelFunc := context.WithCancel(context.Background())
51+
c.usedContext[ctx] = cancelFunc
52+
53+
return ctx
54+
}
55+
56+
func (c *CommandExecutor) releaseContext(ctx context.Context) {
57+
c.lock.Lock()
58+
defer c.lock.Unlock()
59+
60+
delete(c.usedContext, ctx)
61+
}
62+
63+
func (c *CommandExecutor) Close(timeout time.Duration) error {
64+
wg := sync.WaitGroup{}
65+
66+
c.lock.Lock()
67+
defer c.lock.Unlock()
68+
69+
for ctx, cancelFunc := range c.usedContext {
70+
wg.Add(1)
71+
72+
go func(c context.Context, cf context.CancelFunc) {
73+
defer wg.Done()
74+
cf() //call cancel
75+
<-ctx.Done() //wait for cancellation
76+
}(ctx, cancelFunc)
77+
}
78+
79+
wgChan := make(chan bool)
80+
go func() {
81+
wg.Wait()
82+
c.openExecutions.Wait()
83+
84+
wgChan <- true
85+
}()
86+
87+
//wait for WaitGroup or Timeout
88+
select {
89+
case <-wgChan:
90+
return nil
91+
case <-time.After(timeout):
92+
return errors.New("timeout exceeded")
93+
}
94+
}

cmd/mqtt-executor/config.go

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"flag"
6+
MQTT "github.com/eclipse/paho.mqtt.golang"
7+
"go.uber.org/zap"
8+
"os"
9+
)
10+
11+
type config struct {
12+
Broker *string
13+
SubscribeQOS *int
14+
PublishQOS *int
15+
Username *string
16+
Password *string
17+
ClientId *string
18+
19+
TopicConfigFile *string
20+
TopicConfigurations TopicConfigurations
21+
}
22+
23+
var Config config
24+
25+
type TopicConfigurations map[string]TopicConfiguration
26+
type TopicConfiguration map[string][]string
27+
28+
func LoadConfig() {
29+
Config = config{
30+
Broker: flag.String("broker", "", "The broker URI. ex: tcp://127.0.0.1:1883"),
31+
SubscribeQOS: flag.Int("sub-qos", 0, "The Quality of Service for subscription 0,1,2 (default 0)"),
32+
PublishQOS: flag.Int("pub-qos", 0, "The Quality of Service for publishing 0,1,2 (default 0)"),
33+
Username: flag.String("user", "", "The User (optional)"),
34+
Password: flag.String("password", "", "The password (optional)"),
35+
ClientId: flag.String("client-id", "mqtt-executor", "The ClientID (optional)"),
36+
37+
TopicConfigFile: flag.String("config", "./config.json", "The topic configuration file"),
38+
}
39+
flag.Parse()
40+
41+
if *Config.Broker == "" {
42+
zap.L().Fatal("Broker is missing!")
43+
}
44+
if *Config.SubscribeQOS != 0 && *Config.SubscribeQOS != 1 && *Config.SubscribeQOS != 2 {
45+
zap.L().Fatal("Invalid qos level!")
46+
}
47+
if *Config.PublishQOS != 0 && *Config.PublishQOS != 1 && *Config.PublishQOS != 2 {
48+
zap.L().Fatal("Invalid qos level!")
49+
}
50+
if *Config.TopicConfigFile == "" {
51+
zap.L().Fatal("Topic configuration file is missing!")
52+
} else {
53+
file, err := os.Open(*Config.TopicConfigFile)
54+
if err != nil {
55+
zap.L().Fatal("Error while opening topic configuration file: %s", zap.Error(err))
56+
}
57+
defer file.Close()
58+
59+
err = json.NewDecoder(file).Decode(&Config.TopicConfigurations)
60+
if err != nil {
61+
zap.L().Fatal("Could not read topic configuration file: %s", zap.Error(err))
62+
}
63+
}
64+
}
65+
66+
func (c *config) GetMQTTOpts() *MQTT.ClientOptions {
67+
opts := MQTT.NewClientOptions()
68+
69+
opts.AddBroker(*c.Broker)
70+
if *c.Username != "" {
71+
opts.SetUsername(*c.Username)
72+
}
73+
if *c.Password != "" {
74+
opts.SetPassword(*c.Password)
75+
}
76+
if *c.ClientId != "" {
77+
opts.SetClientID(*c.ClientId)
78+
}
79+
80+
return opts
81+
}

cmd/mqtt-executor/log.go

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package main
2+
3+
import "go.uber.org/zap"
4+
5+
func init() {
6+
logger, _ := zap.NewDevelopment()
7+
zap.ReplaceGlobals(logger)
8+
defer zap.L().Sync()
9+
}

cmd/mqtt-executor/main.go

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package main
2+
3+
import (
4+
MQTT "github.com/eclipse/paho.mqtt.golang"
5+
"go.uber.org/zap"
6+
"os"
7+
"os/signal"
8+
"syscall"
9+
"time"
10+
)
11+
12+
func main() {
13+
LoadConfig()
14+
15+
signals := make(chan os.Signal, 1)
16+
defer close(signals)
17+
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
18+
19+
client := MQTT.NewClient(Config.GetMQTTOpts())
20+
21+
if token := client.Connect(); token.Wait() && token.Error() != nil {
22+
zap.L().Fatal("Error while connecting to mqtt broker: %s", zap.Error(token.Error()))
23+
}
24+
25+
for topicName, config := range Config.TopicConfigurations {
26+
client.Subscribe(topicName, byte(*Config.SubscribeQOS), createMessageHandler(config))
27+
}
28+
29+
// wait for interrupt
30+
<-signals
31+
32+
shutdown(client)
33+
}
34+
35+
func createMessageHandler(topicConfig TopicConfiguration) MQTT.MessageHandler {
36+
return func(client MQTT.Client, message MQTT.Message) {
37+
zap.L().Info("Incoming message: ",
38+
zap.String("topic", message.Topic()),
39+
zap.ByteString("payload", message.Payload()),
40+
)
41+
42+
cmdArgs, exists := topicConfig[string(message.Payload())]
43+
if !exists {
44+
zap.L().Warn("Command is not configured")
45+
return
46+
}
47+
48+
go commandExecutor.ExecuteCommand(client, message, cmdArgs)
49+
}
50+
}
51+
52+
func shutdown(client MQTT.Client) {
53+
zap.L().Info("Shutting down...")
54+
for topicName, _ := range Config.TopicConfigurations {
55+
client.Unsubscribe(topicName)
56+
}
57+
58+
err := commandExecutor.Close(20 * time.Second)
59+
if err != nil {
60+
zap.L().Error("Timeout while waiting command execution!", zap.Error(err))
61+
}
62+
63+
client.Disconnect(20 * 1000) //wait 10sek at most
64+
}

go.mod

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module github.com/rainu/mqtt-executor
2+
3+
go 1.14
4+
5+
require (
6+
github.com/eclipse/paho.mqtt.golang v1.2.0
7+
go.uber.org/zap v1.15.0
8+
golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5 // indirect
9+
)

0 commit comments

Comments
 (0)