Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(conn): connection status for shared stream #3231

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/en_US/api/restapi/connection.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ Return all connections' information and status.
### Get a single connection status

```shell
GET http://localhost:9081/connection/{id}
GET http://localhost:9081/connections/{id}
```

### Delete a single connection

When deleting a connection, it will check whether there are rules using the connection. If there are rules using the connection, the connection cannot be deleted.

```shell
DELETE http://localhost:9081/connection/{id}
DELETE http://localhost:9081/connections/{id}
```

## Connectivity check
Expand Down
4 changes: 2 additions & 2 deletions docs/zh_CN/api/restapi/connection.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ GET http://localhost:9081/connections
### 获取单个连接状态

```shell
GET http://localhost:9081/connection/{id}
GET http://localhost:9081/connections/{id}
```

### 删除单个连接

删除连接时会是否有规则正在使用连接,如果存在规则正在使用连接,那么连接将无法被删除。

```shell
DELETE http://localhost:9081/connection/{id}
DELETE http://localhost:9081/connections/{id}
```

## 连通性检查
Expand Down
2 changes: 1 addition & 1 deletion extensions/impl/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
if err != nil {
sch(api.ConnectionDisconnected, err.Error())
} else {
sch(api.ConnectionConnecting, "")
sch(api.ConnectionConnected, "")

Check warning on line 96 in extensions/impl/influx/influx.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/influx/influx.go#L96

Added line #L96 was not covered by tests
}
}()
m.cli, err = client.NewHTTPClient(client.HTTPConfig{
Expand Down
2 changes: 1 addition & 1 deletion extensions/impl/influx2/influx2.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
if err != nil {
sch(api.ConnectionDisconnected, err.Error())
} else {
sch(api.ConnectionConnecting, "")
sch(api.ConnectionConnected, "")

Check warning on line 145 in extensions/impl/influx2/influx2.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/influx2/influx2.go#L145

Added line #L145 was not covered by tests
}
}()
m.cli = client.NewClientWithOptions(m.conf.Addr, m.conf.Token, options)
Expand Down
2 changes: 1 addition & 1 deletion extensions/impl/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler)
if err != nil {
sch(api.ConnectionDisconnected, err.Error())
} else {
sch(api.ConnectionConnecting, "")
sch(api.ConnectionConnected, "")
}
return err
}
Expand Down
2 changes: 1 addition & 1 deletion extensions/impl/kafka/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (k *KafkaSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler
if err != nil {
sch(api.ConnectionDisconnected, err.Error())
} else {
sch(api.ConnectionConnecting, "")
sch(api.ConnectionConnected, "")
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion extensions/impl/zmq/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (m *zmqSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) (e
if err != nil {
sch(api.ConnectionDisconnected, err.Error())
} else {
sch(api.ConnectionConnecting, "")
sch(api.ConnectionConnected, "")
}
}()
m.publisher, err = zmq.NewSocket(zmq.PUB)
Expand Down
2 changes: 1 addition & 1 deletion extensions/impl/zmq/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *zmqSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler)
if err != nil {
sch(api.ConnectionDisconnected, err.Error())
} else {
sch(api.ConnectionConnecting, "")
sch(api.ConnectionConnected, "")
}
}()
// Create a new ZeroMQ context
Expand Down
253 changes: 253 additions & 0 deletions fvt/conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fvt

import (
"fmt"
"net/http"
"testing"
"time"

mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/stretchr/testify/suite"
)

type ConnectionTestSuite struct {
suite.Suite
}

func TestConnectionTestSuite(t *testing.T) {
suite.Run(t, new(ConnectionTestSuite))
}

func (s *ConnectionTestSuite) TestConnStatus() {
// Connect when broker is not started
s.Run("create rule when broker is not started", func() {
// create connection
connStr := `{
"id": "conn1",
"typ":"mqtt",
"props": {
"server": "tcp://127.0.0.1:3883"
}
}`
resp, err := client.Post("connections", connStr)
s.Require().NoError(err)
fmt.Println(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)
conf := map[string]any{
"connectionSelector": "conn1",
}
resp, err = client.CreateConf("sources/mqtt/confKeys/ttt", conf)
s.Require().NoError(err)
s.Require().Equal(http.StatusOK, resp.StatusCode)

streamSql := `{"sql": "create stream tttStream () WITH (TYPE=\"mqtt\", DATASOURCE=\"ttt\", FORMAT=\"json\", CONF_KEY=\"ttt\", SHARED=\"true\")"}`
resp, err = client.CreateStream(streamSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)

ruleSql := `{
"id": "ruleTTT1",
"sql": "SELECT * FROM tttStream",
"actions": [
{
"nop": {
}
}
]
}`
resp, err = client.CreateRule(ruleSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)
// Assert connection status
r := TryAssert(10, ConstantInterval, func() bool {
get, e := client.Get("connections/conn1")
s.Require().NoError(e)
resultMap, e := GetResponseResultMap(get)
fmt.Println(resultMap)
s.Require().NoError(e)
return resultMap["status"] == "disconnected"
})
s.Require().True(r)
// Assert rule metrics
r = TryAssert(10, ConstantInterval, func() bool {
metrics, e := client.GetRuleStatus("ruleTTT1")
s.Require().NoError(e)
fmt.Println(metrics)
return metrics["source_conn1/ttt_0_connection_status"] == -1.0
})
s.Require().True(r)
})
var (
server *mqtt.Server
tcp *listeners.TCP
)
s.Run("start broker, automatically connected", func() {
// Create the new MQTT Server.
server = mqtt.New(nil)
// Allow all connections.
_ = server.AddHook(new(auth.AllowHook), nil)
// Create a TCP listener on a standard port.
tcp = listeners.NewTCP(listeners.Config{ID: "testcon", Address: ":3883"})
err := server.AddListener(tcp)
s.Require().NoError(err)
go func() {
err = server.Serve()
fmt.Println(err)
}()
fmt.Println(tcp.Address())
// Assert rule metrics
r := TryAssert(10, ConstantInterval, func() bool {
metrics, e := client.GetRuleStatus("ruleTTT1")
s.Require().NoError(e)
fmt.Println(metrics)
return metrics["source_conn1/ttt_0_connection_status"] == 1.0
})
s.Require().True(r)
// Assert connection status
r = TryAssert(10, ConstantInterval, func() bool {
get, e := client.Get("connections/conn1")
s.Require().NoError(e)
resultMap, e := GetResponseResultMap(get)
fmt.Println(resultMap)
s.Require().NoError(e)
return resultMap["status"] == "connected"
})
s.Require().True(r)
})
s.Run("attach rule, get status", func() {
ruleSql := `{
"id": "ruleTTT2",
"sql": "SELECT * FROM tttStream",
"actions": [
{
"mqtt": {
"connectionSelector": "conn1",
"topic":"result"
}
}
]
}`
resp, err := client.CreateRule(ruleSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)
// Assert rule2 metrics
r := TryAssert(10, ConstantInterval, func() bool {
metrics, e := client.GetRuleStatus("ruleTTT2")
s.Require().NoError(e)
fmt.Println(metrics)
return metrics["source_conn1/ttt_0_connection_status"] == 1.0
})
s.Require().True(r)
})
s.Run("stop broker, check status update", func() {
err := server.Close()
tcp.Close(nil)
s.Require().NoError(err)
// Assert rule1 metrics
r := TryAssert(10, ConstantInterval, func() bool {
metrics, e := client.GetRuleStatus("ruleTTT1")
s.Require().NoError(e)
fmt.Println(metrics)
return metrics["source_conn1/ttt_0_connection_status"] == 0.0
})
s.Require().True(r)
// Assert rule1 metrics
r = TryAssert(10, ConstantInterval, func() bool {
metrics, e := client.GetRuleStatus("ruleTTT2")
s.Require().NoError(e)
fmt.Println(metrics)
return metrics["source_conn1/ttt_0_connection_status"] == 0.0
})
s.Require().True(r)
// Assert connection status
r = TryAssert(10, ConstantInterval, func() bool {
get, e := client.Get("connections/conn1")
s.Require().NoError(e)
resultMap, e := GetResponseResultMap(get)
fmt.Println(resultMap)
s.Require().NoError(e)
return resultMap["status"] == "connecting"
})
s.Require().True(r)
})
s.Run("restart broker, check status update", func() {
// Create the new MQTT Server.
server = mqtt.New(nil)
// Allow all connections.
_ = server.AddHook(new(auth.AllowHook), nil)
// Create a TCP listener on a standard port.
tcp = listeners.NewTCP(listeners.Config{ID: "testcon", Address: ":3883"})
err := server.AddListener(tcp)
s.Require().NoError(err)
go func() {
err = server.Serve()
fmt.Println(err)
}()
fmt.Println(tcp.Address())
// Assert rule2 metrics
r := TryAssert(10, time.Second, func() bool {
metrics, e := client.GetRuleStatus("ruleTTT2")
s.Require().NoError(e)
fmt.Println(metrics)
return metrics["source_conn1/ttt_0_connection_status"] == 1.0
})
s.Require().True(r)
// Assert connection status
r = TryAssert(10, time.Second, func() bool {
get, e := client.Get("connections/conn1")
s.Require().NoError(e)
resultMap, e := GetResponseResultMap(get)
fmt.Println(resultMap)
s.Require().NoError(e)
return resultMap["status"] == "connected"
})
s.Require().True(r)
// Assert rule1 metrics
r = TryAssert(10, time.Second, func() bool {
metrics, e := client.GetRuleStatus("ruleTTT1")
s.Require().NoError(e)
fmt.Println(metrics)
return metrics["source_conn1/ttt_0_connection_status"] == 1.0
})
s.Require().True(r)
})
s.Run("clean", func() {
res, e := client.Delete("rules/ruleTTT1")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)

res, e = client.Delete("rules/ruleTTT2")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)

res, e = client.Delete("streams/tttStream")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)

r := TryAssert(10, ConstantInterval, func() bool {
res, e = client.Delete("connections/conn1")
s.NoError(e)
return res.StatusCode == http.StatusOK
})
s.Require().True(r)
})
}
27 changes: 27 additions & 0 deletions fvt/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"net/http"
"net/url"
"time"
)

const ContentTypeJson = "application/json"
Expand Down Expand Up @@ -113,10 +114,36 @@ func GetResponseResultMap(resp *http.Response) (result map[string]any, err error
fmt.Println(err)
return
}
fmt.Println(string(body))
err = json.Unmarshal(body, &result)
if err != nil {
fmt.Println(err)
return
}
return
}

func (sdk *SDK) CreateConf(confpath string, conf map[string]any) (resp *http.Response, err error) {
cc, err := json.Marshal(conf)
if err != nil {
fmt.Println(err)
return nil, err
}
req, err := http.NewRequest(http.MethodPut, sdk.baseUrl.JoinPath("metadata", confpath).String(), bytes.NewBuffer(cc))
if err != nil {
fmt.Println(err)
return
}
return sdk.httpClient.Do(req)
}

func TryAssert(count int, interval time.Duration, tryFunc func() bool) bool {
for count > 0 {
time.Sleep(interval)
if tryFunc() {
return true
}
count--
}
return false
}
2 changes: 1 addition & 1 deletion internal/server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
r.HandleFunc("/connections", connectionsHandler).Methods(http.MethodGet, http.MethodPost)
r.HandleFunc("/connection/{id}", connectionHandler).Methods(http.MethodGet, http.MethodDelete)
r.HandleFunc("/connections/{id}", connectionHandler).Methods(http.MethodGet, http.MethodDelete)
r.HandleFunc("/ruletest", testRuleHandler).Methods(http.MethodPost)
r.HandleFunc("/ruletest/{name}/start", testRuleStartHandler).Methods(http.MethodPost)
r.HandleFunc("/ruletest/{name}", testRuleStopHandler).Methods(http.MethodDelete)
Expand Down
Loading
Loading