diff --git a/docs/en_US/api/restapi/connection.md b/docs/en_US/api/restapi/connection.md index 3d6ed5c480..1dda4e980e 100644 --- a/docs/en_US/api/restapi/connection.md +++ b/docs/en_US/api/restapi/connection.md @@ -30,7 +30,7 @@ Return all connections' information and status. ### Get a single connection status ```shell -GET http://localhost:9081/connection/{id} +GET http://localhost:9081/connections/{id} ``` ### Delete a single connection @@ -38,7 +38,7 @@ GET http://localhost:9081/connection/{id} When deleting a connection, it will check whether there are rules using the connection. If there are rules using the connection, the connection cannot be deleted. ```shell -DELETE http://localhost:9081/connection/{id} +DELETE http://localhost:9081/connections/{id} ``` ## Connectivity check diff --git a/docs/zh_CN/api/restapi/connection.md b/docs/zh_CN/api/restapi/connection.md index 4def1f4e7c..2e046f4249 100644 --- a/docs/zh_CN/api/restapi/connection.md +++ b/docs/zh_CN/api/restapi/connection.md @@ -30,7 +30,7 @@ GET http://localhost:9081/connections ### 获取单个连接状态 ```shell -GET http://localhost:9081/connection/{id} +GET http://localhost:9081/connections/{id} ``` ### 删除单个连接 @@ -38,7 +38,7 @@ GET http://localhost:9081/connection/{id} 删除连接时会是否有规则正在使用连接,如果存在规则正在使用连接,那么连接将无法被删除。 ```shell -DELETE http://localhost:9081/connection/{id} +DELETE http://localhost:9081/connections/{id} ``` ## 连通性检查 diff --git a/extensions/impl/influx/influx.go b/extensions/impl/influx/influx.go index ac35cf355f..caf08b1291 100644 --- a/extensions/impl/influx/influx.go +++ b/extensions/impl/influx/influx.go @@ -93,7 +93,7 @@ func (m *influxSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) if err != nil { sch(api.ConnectionDisconnected, err.Error()) } else { - sch(api.ConnectionConnecting, "") + sch(api.ConnectionConnected, "") } }() m.cli, err = client.NewHTTPClient(client.HTTPConfig{ diff --git a/extensions/impl/influx2/influx2.go b/extensions/impl/influx2/influx2.go index fba53fa872..27f22f38a0 100644 --- a/extensions/impl/influx2/influx2.go +++ b/extensions/impl/influx2/influx2.go @@ -142,7 +142,7 @@ func (m *influxSink2) Connect(ctx api.StreamContext, sch api.StatusChangeHandler if err != nil { sch(api.ConnectionDisconnected, err.Error()) } else { - sch(api.ConnectionConnecting, "") + sch(api.ConnectionConnected, "") } }() m.cli = client.NewClientWithOptions(m.conf.Addr, m.conf.Token, options) diff --git a/extensions/impl/kafka/sink.go b/extensions/impl/kafka/sink.go index 5e25da48de..868485a097 100644 --- a/extensions/impl/kafka/sink.go +++ b/extensions/impl/kafka/sink.go @@ -136,7 +136,7 @@ func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) if err != nil { sch(api.ConnectionDisconnected, err.Error()) } else { - sch(api.ConnectionConnecting, "") + sch(api.ConnectionConnected, "") } return err } diff --git a/extensions/impl/kafka/source.go b/extensions/impl/kafka/source.go index a8bc852915..99956cf50b 100644 --- a/extensions/impl/kafka/source.go +++ b/extensions/impl/kafka/source.go @@ -149,7 +149,7 @@ func (k *KafkaSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler if err != nil { sch(api.ConnectionDisconnected, err.Error()) } else { - sch(api.ConnectionConnecting, "") + sch(api.ConnectionConnected, "") } return nil } diff --git a/extensions/impl/zmq/sink.go b/extensions/impl/zmq/sink.go index 9318c79113..9514ede244 100644 --- a/extensions/impl/zmq/sink.go +++ b/extensions/impl/zmq/sink.go @@ -44,7 +44,7 @@ func (m *zmqSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) (e if err != nil { sch(api.ConnectionDisconnected, err.Error()) } else { - sch(api.ConnectionConnecting, "") + sch(api.ConnectionConnected, "") } }() m.publisher, err = zmq.NewSocket(zmq.PUB) diff --git a/extensions/impl/zmq/source.go b/extensions/impl/zmq/source.go index d4b799ea76..fb1e48b717 100644 --- a/extensions/impl/zmq/source.go +++ b/extensions/impl/zmq/source.go @@ -49,7 +49,7 @@ func (s *zmqSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) if err != nil { sch(api.ConnectionDisconnected, err.Error()) } else { - sch(api.ConnectionConnecting, "") + sch(api.ConnectionConnected, "") } }() // Create a new ZeroMQ context diff --git a/fvt/conn_test.go b/fvt/conn_test.go new file mode 100644 index 0000000000..7dd88ce770 --- /dev/null +++ b/fvt/conn_test.go @@ -0,0 +1,253 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fvt + +import ( + "fmt" + "net/http" + "testing" + "time" + + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/hooks/auth" + "github.com/mochi-mqtt/server/v2/listeners" + "github.com/stretchr/testify/suite" +) + +type ConnectionTestSuite struct { + suite.Suite +} + +func TestConnectionTestSuite(t *testing.T) { + suite.Run(t, new(ConnectionTestSuite)) +} + +func (s *ConnectionTestSuite) TestConnStatus() { + // Connect when broker is not started + s.Run("create rule when broker is not started", func() { + // create connection + connStr := `{ + "id": "conn1", + "typ":"mqtt", + "props": { + "server": "tcp://127.0.0.1:3883" + } + }` + resp, err := client.Post("connections", connStr) + s.Require().NoError(err) + fmt.Println(GetResponseText(resp)) + s.Require().Equal(http.StatusCreated, resp.StatusCode) + conf := map[string]any{ + "connectionSelector": "conn1", + } + resp, err = client.CreateConf("sources/mqtt/confKeys/ttt", conf) + s.Require().NoError(err) + s.Require().Equal(http.StatusOK, resp.StatusCode) + + streamSql := `{"sql": "create stream tttStream () WITH (TYPE=\"mqtt\", DATASOURCE=\"ttt\", FORMAT=\"json\", CONF_KEY=\"ttt\", SHARED=\"true\")"}` + resp, err = client.CreateStream(streamSql) + s.Require().NoError(err) + s.T().Log(GetResponseText(resp)) + s.Require().Equal(http.StatusCreated, resp.StatusCode) + + ruleSql := `{ + "id": "ruleTTT1", + "sql": "SELECT * FROM tttStream", + "actions": [ + { + "nop": { + } + } + ] + }` + resp, err = client.CreateRule(ruleSql) + s.Require().NoError(err) + s.T().Log(GetResponseText(resp)) + s.Require().Equal(http.StatusCreated, resp.StatusCode) + // Assert connection status + r := TryAssert(10, ConstantInterval, func() bool { + get, e := client.Get("connections/conn1") + s.Require().NoError(e) + resultMap, e := GetResponseResultMap(get) + fmt.Println(resultMap) + s.Require().NoError(e) + return resultMap["status"] == "disconnected" + }) + s.Require().True(r) + // Assert rule metrics + r = TryAssert(10, ConstantInterval, func() bool { + metrics, e := client.GetRuleStatus("ruleTTT1") + s.Require().NoError(e) + fmt.Println(metrics) + return metrics["source_conn1/ttt_0_connection_status"] == -1.0 + }) + s.Require().True(r) + }) + var ( + server *mqtt.Server + tcp *listeners.TCP + ) + s.Run("start broker, automatically connected", func() { + // Create the new MQTT Server. + server = mqtt.New(nil) + // Allow all connections. + _ = server.AddHook(new(auth.AllowHook), nil) + // Create a TCP listener on a standard port. + tcp = listeners.NewTCP(listeners.Config{ID: "testcon", Address: ":3883"}) + err := server.AddListener(tcp) + s.Require().NoError(err) + go func() { + err = server.Serve() + fmt.Println(err) + }() + fmt.Println(tcp.Address()) + // Assert rule metrics + r := TryAssert(10, ConstantInterval, func() bool { + metrics, e := client.GetRuleStatus("ruleTTT1") + s.Require().NoError(e) + fmt.Println(metrics) + return metrics["source_conn1/ttt_0_connection_status"] == 1.0 + }) + s.Require().True(r) + // Assert connection status + r = TryAssert(10, ConstantInterval, func() bool { + get, e := client.Get("connections/conn1") + s.Require().NoError(e) + resultMap, e := GetResponseResultMap(get) + fmt.Println(resultMap) + s.Require().NoError(e) + return resultMap["status"] == "connected" + }) + s.Require().True(r) + }) + s.Run("attach rule, get status", func() { + ruleSql := `{ + "id": "ruleTTT2", + "sql": "SELECT * FROM tttStream", + "actions": [ + { + "mqtt": { + "connectionSelector": "conn1", + "topic":"result" + } + } + ] + }` + resp, err := client.CreateRule(ruleSql) + s.Require().NoError(err) + s.T().Log(GetResponseText(resp)) + s.Require().Equal(http.StatusCreated, resp.StatusCode) + // Assert rule2 metrics + r := TryAssert(10, ConstantInterval, func() bool { + metrics, e := client.GetRuleStatus("ruleTTT2") + s.Require().NoError(e) + fmt.Println(metrics) + return metrics["source_conn1/ttt_0_connection_status"] == 1.0 + }) + s.Require().True(r) + }) + s.Run("stop broker, check status update", func() { + err := server.Close() + tcp.Close(nil) + s.Require().NoError(err) + // Assert rule1 metrics + r := TryAssert(10, ConstantInterval, func() bool { + metrics, e := client.GetRuleStatus("ruleTTT1") + s.Require().NoError(e) + fmt.Println(metrics) + return metrics["source_conn1/ttt_0_connection_status"] == 0.0 + }) + s.Require().True(r) + // Assert rule1 metrics + r = TryAssert(10, ConstantInterval, func() bool { + metrics, e := client.GetRuleStatus("ruleTTT2") + s.Require().NoError(e) + fmt.Println(metrics) + return metrics["source_conn1/ttt_0_connection_status"] == 0.0 + }) + s.Require().True(r) + // Assert connection status + r = TryAssert(10, ConstantInterval, func() bool { + get, e := client.Get("connections/conn1") + s.Require().NoError(e) + resultMap, e := GetResponseResultMap(get) + fmt.Println(resultMap) + s.Require().NoError(e) + return resultMap["status"] == "connecting" + }) + s.Require().True(r) + }) + s.Run("restart broker, check status update", func() { + // Create the new MQTT Server. + server = mqtt.New(nil) + // Allow all connections. + _ = server.AddHook(new(auth.AllowHook), nil) + // Create a TCP listener on a standard port. + tcp = listeners.NewTCP(listeners.Config{ID: "testcon", Address: ":3883"}) + err := server.AddListener(tcp) + s.Require().NoError(err) + go func() { + err = server.Serve() + fmt.Println(err) + }() + fmt.Println(tcp.Address()) + // Assert rule2 metrics + r := TryAssert(10, time.Second, func() bool { + metrics, e := client.GetRuleStatus("ruleTTT2") + s.Require().NoError(e) + fmt.Println(metrics) + return metrics["source_conn1/ttt_0_connection_status"] == 1.0 + }) + s.Require().True(r) + // Assert connection status + r = TryAssert(10, time.Second, func() bool { + get, e := client.Get("connections/conn1") + s.Require().NoError(e) + resultMap, e := GetResponseResultMap(get) + fmt.Println(resultMap) + s.Require().NoError(e) + return resultMap["status"] == "connected" + }) + s.Require().True(r) + // Assert rule1 metrics + r = TryAssert(10, time.Second, func() bool { + metrics, e := client.GetRuleStatus("ruleTTT1") + s.Require().NoError(e) + fmt.Println(metrics) + return metrics["source_conn1/ttt_0_connection_status"] == 1.0 + }) + s.Require().True(r) + }) + s.Run("clean", func() { + res, e := client.Delete("rules/ruleTTT1") + s.NoError(e) + s.Equal(http.StatusOK, res.StatusCode) + + res, e = client.Delete("rules/ruleTTT2") + s.NoError(e) + s.Equal(http.StatusOK, res.StatusCode) + + res, e = client.Delete("streams/tttStream") + s.NoError(e) + s.Equal(http.StatusOK, res.StatusCode) + + r := TryAssert(10, ConstantInterval, func() bool { + res, e = client.Delete("connections/conn1") + s.NoError(e) + return res.StatusCode == http.StatusOK + }) + s.Require().True(r) + }) +} diff --git a/fvt/sdk.go b/fvt/sdk.go index 1569596ef3..d71f0e9ec0 100644 --- a/fvt/sdk.go +++ b/fvt/sdk.go @@ -21,6 +21,7 @@ import ( "io" "net/http" "net/url" + "time" ) const ContentTypeJson = "application/json" @@ -113,6 +114,7 @@ func GetResponseResultMap(resp *http.Response) (result map[string]any, err error fmt.Println(err) return } + fmt.Println(string(body)) err = json.Unmarshal(body, &result) if err != nil { fmt.Println(err) @@ -120,3 +122,28 @@ func GetResponseResultMap(resp *http.Response) (result map[string]any, err error } return } + +func (sdk *SDK) CreateConf(confpath string, conf map[string]any) (resp *http.Response, err error) { + cc, err := json.Marshal(conf) + if err != nil { + fmt.Println(err) + return nil, err + } + req, err := http.NewRequest(http.MethodPut, sdk.baseUrl.JoinPath("metadata", confpath).String(), bytes.NewBuffer(cc)) + if err != nil { + fmt.Println(err) + return + } + return sdk.httpClient.Do(req) +} + +func TryAssert(count int, interval time.Duration, tryFunc func() bool) bool { + for count > 0 { + time.Sleep(interval) + if tryFunc() { + return true + } + count-- + } + return false +} diff --git a/internal/server/rest.go b/internal/server/rest.go index 21d080e47e..1c2f3e81d8 100644 --- a/internal/server/rest.go +++ b/internal/server/rest.go @@ -201,7 +201,7 @@ func createRestServer(ip string, port int, needToken bool) *http.Server { r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost) r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet) r.HandleFunc("/connections", connectionsHandler).Methods(http.MethodGet, http.MethodPost) - r.HandleFunc("/connection/{id}", connectionHandler).Methods(http.MethodGet, http.MethodDelete) + r.HandleFunc("/connections/{id}", connectionHandler).Methods(http.MethodGet, http.MethodDelete) r.HandleFunc("/ruletest", testRuleHandler).Methods(http.MethodPost) r.HandleFunc("/ruletest/{name}/start", testRuleStartHandler).Methods(http.MethodPost) r.HandleFunc("/ruletest/{name}", testRuleStopHandler).Methods(http.MethodDelete) diff --git a/internal/topo/subtopo.go b/internal/topo/subtopo.go index 24fd9de60f..38d50f1e5b 100644 --- a/internal/topo/subtopo.go +++ b/internal/topo/subtopo.go @@ -183,6 +183,9 @@ func (s *SrcSubTopo) Close(ctx api.StreamContext, ruleId string, runId int) { if s.cancel != nil { s.cancel() } + if ss, ok := s.source.(*SrcSubTopo); ok { + ss.Close(ctx, "$$subtopo_"+s.name, runId) + } RemoveSubTopo(s.name) } for _, op := range s.ops { diff --git a/pkg/connection/conn.go b/pkg/connection/conn.go index 6ab56bd5db..74f936dc19 100644 --- a/pkg/connection/conn.go +++ b/pkg/connection/conn.go @@ -97,10 +97,32 @@ func (meta *Meta) NotifyStatus(status string, s string) { }) } +func (meta *Meta) AddRef(refId string, sc api.StatusChangeHandler) { + s, e := meta.GetStatus() + if sc != nil { + sc(s, e) + } + meta.ref.Store(refId, sc) + meta.refCount.Add(1) +} + +func (meta *Meta) DeRef(refId string) { + meta.ref.Delete(refId) + meta.refCount.Add(-1) +} + func (meta *Meta) GetRefCount() int { return int(meta.refCount.Load()) } +func (meta *Meta) GetRefNames() (result []string) { + meta.ref.Range(func(key, _ any) bool { + result = append(result, key.(string)) + return true + }) + return +} + func (meta *Meta) GetStatus() (s string, e string) { ee := meta.lastError.Load() if ee != nil { diff --git a/pkg/connection/pool.go b/pkg/connection/pool.go index e307bf060a..5048c6815b 100644 --- a/pkg/connection/pool.go +++ b/pkg/connection/pool.go @@ -196,8 +196,8 @@ func DropNameConnection(ctx api.StreamContext, selId string) error { if !ok { return nil } - if meta.refCount.Load() > 0 { - return fmt.Errorf("connection %s can't be dropped due to reference", selId) + if meta.GetRefCount() > 0 { + return fmt.Errorf("connection %s can't be dropped due to references %v", selId, meta.GetRefNames()) } err := dropConnectionStore(meta.Typ, selId) if err != nil { @@ -254,19 +254,18 @@ func attachConnection(conId string, refId string, sc api.StatusChangeHandler) (* if !ok { return nil, fmt.Errorf("connection %s not existed", conId) } - meta.ref.Store(refId, sc) - meta.refCount.Add(1) + meta.AddRef(refId, sc) return meta.cw, nil } func detachConnection(ctx api.StreamContext, conId string) error { meta, ok := globalConnectionManager.connectionPool[conId] if !ok { + conf.Log.Infof("detachConnection not found:%v", conId) return nil } refId := extractRefId(ctx) - meta.ref.Delete(refId) - meta.refCount.Add(-1) + meta.DeRef(refId) globalConnectionManager.connectionPool[conId] = meta conf.Log.Infof("detachConnection remove conn:%v,ref:%v", conId, refId) if !meta.Named && meta.refCount.Load() == 0 {