From aea9b433d4e067709d4e04b240d7e03c1f26db15 Mon Sep 17 00:00:00 2001 From: stulzq Date: Mon, 28 Feb 2022 23:17:04 +0800 Subject: [PATCH 1/4] add in-memory sequencer --- cmd/layotto/main.go | 4 + cmd/layotto_multiple_api/main.go | 4 + components/go.mod | 5 +- components/go.sum | 5 -- .../in-memory/in_memory_sequencer.go | 45 ++++++++++++ .../in-memory/in_memory_sequencer_test.go | 73 +++++++++++++++++++ configs/config_in_memory.json | 5 ++ demo/sequencer/in-memory/client.go | 38 ++++++++++ .../zh/component_specs/sequencer/in-memory.md | 25 +++++++ 9 files changed, 197 insertions(+), 7 deletions(-) create mode 100644 components/sequencer/in-memory/in_memory_sequencer.go create mode 100644 components/sequencer/in-memory/in_memory_sequencer_test.go create mode 100644 demo/sequencer/in-memory/client.go create mode 100644 docs/zh/component_specs/sequencer/in-memory.md diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index 5968f460e6..d6acab399d 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -30,6 +30,7 @@ import ( "mosn.io/api" "mosn.io/layotto/components/file/s3/tencentcloud" component_actuators "mosn.io/layotto/components/pkg/actuators" + in_memory "mosn.io/layotto/components/sequencer/in-memory" "mosn.io/layotto/diagnostics" "mosn.io/layotto/pkg/grpc/default_api" secretstores_loader "mosn.io/layotto/pkg/runtime/secretstores" @@ -369,6 +370,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp runtime_sequencer.NewFactory("mongo", func() sequencer.Store { return sequencer_mongo.NewMongoSequencer(log.DefaultLogger) }), + runtime_sequencer.NewFactory("in-memory", func() sequencer.Store { + return in_memory.NewInMemorySequencer() + }), ), // secretstores runtime.WithSecretStoresFactory( diff --git a/cmd/layotto_multiple_api/main.go b/cmd/layotto_multiple_api/main.go index 66596a1c71..e89f1d8c4a 100644 --- a/cmd/layotto_multiple_api/main.go +++ b/cmd/layotto_multiple_api/main.go @@ -22,6 +22,7 @@ import ( helloworld_api "mosn.io/layotto/cmd/layotto_multiple_api/helloworld" "mosn.io/layotto/components/file/s3/tencentcloud" component_actuators "mosn.io/layotto/components/pkg/actuators" + in_memory "mosn.io/layotto/components/sequencer/in-memory" l8_grpc "mosn.io/layotto/pkg/grpc" "mosn.io/layotto/pkg/grpc/dapr" "mosn.io/layotto/pkg/grpc/default_api" @@ -360,6 +361,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store { return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger) }), + runtime_sequencer.NewFactory("in-memory", func() sequencer.Store { + return in_memory.NewInMemorySequencer() + }), )) return server, err } diff --git a/components/go.mod b/components/go.mod index d5120a2ccd..0903484f1e 100644 --- a/components/go.mod +++ b/components/go.mod @@ -18,11 +18,11 @@ require ( github.com/hashicorp/consul/api v1.3.0 github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/minio/minio-go/v7 v7.0.15 - github.com/mitchellh/mapstructure v1.4.1 + github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/pkg/errors v0.9.1 github.com/spf13/afero v1.2.2 // indirect github.com/stretchr/testify v1.7.0 - github.com/tencentyun/cos-go-sdk-v5 v0.7.33 // indirect + github.com/tencentyun/cos-go-sdk-v5 v0.7.33 github.com/valyala/fasthttp v1.26.0 github.com/zouyx/agollo/v4 v4.0.7 go.beyondstorage.io/services/hdfs v0.3.0 @@ -31,6 +31,7 @@ require ( go.etcd.io/etcd/client/v3 v3.5.0 go.etcd.io/etcd/server/v3 v3.5.0 go.mongodb.org/mongo-driver v1.8.0 + go.uber.org/atomic v1.7.0 golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 // indirect google.golang.org/grpc v1.38.0 diff --git a/components/go.sum b/components/go.sum index eb3d2a40b7..7479651907 100644 --- a/components/go.sum +++ b/components/go.sum @@ -506,13 +506,8 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ= github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/klauspost/compress v1.13.5/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= -github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s= github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4= diff --git a/components/sequencer/in-memory/in_memory_sequencer.go b/components/sequencer/in-memory/in_memory_sequencer.go new file mode 100644 index 0000000000..f8742d5820 --- /dev/null +++ b/components/sequencer/in-memory/in_memory_sequencer.go @@ -0,0 +1,45 @@ +package in_memory + +import ( + "go.uber.org/atomic" + "mosn.io/layotto/components/sequencer" + "sync" +) + +type InMemorySequencer struct { + data *sync.Map +} + +func NewInMemorySequencer() *InMemorySequencer { + return &InMemorySequencer{ + data: &sync.Map{}, + } +} + +func (s *InMemorySequencer) Init(_ sequencer.Configuration) error { + return nil +} + +func (s *InMemorySequencer) GetNextId(req *sequencer.GetNextIdRequest) (*sequencer.GetNextIdResponse, error) { + seed, ok := s.data.Load(req.Key) + if !ok { + seed, _ = s.data.LoadOrStore(req.Key, &atomic.Int64{}) + } + + nextId := seed.(*atomic.Int64).Inc() + return &sequencer.GetNextIdResponse{NextId: nextId}, nil + +} + +func (s *InMemorySequencer) GetSegment(req *sequencer.GetSegmentRequest) (bool, *sequencer.GetSegmentResponse, error) { + seed, ok := s.data.Load(req.Key) + if !ok { + seed, _ = s.data.LoadOrStore(req.Key, &atomic.Int64{}) + } + + res := seed.(*atomic.Int64).Add(int64(req.Size)) + return true, &sequencer.GetSegmentResponse{ + From: res - int64(req.Size) + 1, + To: res, + }, nil +} diff --git a/components/sequencer/in-memory/in_memory_sequencer_test.go b/components/sequencer/in-memory/in_memory_sequencer_test.go new file mode 100644 index 0000000000..238217d07f --- /dev/null +++ b/components/sequencer/in-memory/in_memory_sequencer_test.go @@ -0,0 +1,73 @@ +package in_memory + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "mosn.io/layotto/components/sequencer" +) + +func TestNew(t *testing.T) { + s := NewInMemorySequencer() + + assert.NotNil(t, s) +} + +func TestInit(t *testing.T) { + s := NewInMemorySequencer() + assert.NotNil(t, s) + + err := s.Init(sequencer.Configuration{}) + assert.NoError(t, err) +} + +func TestGetNextId(t *testing.T) { + s := NewInMemorySequencer() + assert.NotNil(t, s) + + err := s.Init(sequencer.Configuration{}) + assert.NoError(t, err) + + var resp *sequencer.GetNextIdResponse + resp, err = s.GetNextId(&sequencer.GetNextIdRequest{Key: "666"}) + assert.NoError(t, err) + assert.Equal(t, int64(1), resp.NextId) + + resp, err = s.GetNextId(&sequencer.GetNextIdRequest{Key: "666"}) + assert.NoError(t, err) + assert.Equal(t, int64(2), resp.NextId) + + resp, err = s.GetNextId(&sequencer.GetNextIdRequest{Key: "777"}) + assert.NoError(t, err) + assert.Equal(t, int64(1), resp.NextId) +} + +func TestGetSegment(t *testing.T) { + s := NewInMemorySequencer() + assert.NotNil(t, s) + + err := s.Init(sequencer.Configuration{}) + assert.NoError(t, err) + + var resp *sequencer.GetSegmentResponse + var res bool + res, resp, err = s.GetSegment(&sequencer.GetSegmentRequest{Key: "666", Size: 5}) + assert.NoError(t, err) + assert.Equal(t, int64(1), resp.From) + assert.Equal(t, int64(5), resp.To) + assert.True(t, res) + + res, resp, err = s.GetSegment(&sequencer.GetSegmentRequest{Key: "666", Size: 5}) + assert.NoError(t, err) + assert.Equal(t, int64(6), resp.From) + assert.Equal(t, int64(10), resp.To) + assert.True(t, res) + + res, resp, err = s.GetSegment(&sequencer.GetSegmentRequest{Key: "777", Size: 5}) + assert.NoError(t, err) + assert.Equal(t, int64(1), resp.From) + assert.Equal(t, int64(5), resp.To) + assert.True(t, res) + +} diff --git a/configs/config_in_memory.json b/configs/config_in_memory.json index d1a01c7a74..7bcee2a06e 100644 --- a/configs/config_in_memory.json +++ b/configs/config_in_memory.json @@ -39,6 +39,11 @@ } } }, + "sequencer": { + "in-memory": { + "metadata": {} + } + }, "app": { "app_id": "app1", "grpc_callback_port": 9999 diff --git a/demo/sequencer/in-memory/client.go b/demo/sequencer/in-memory/client.go new file mode 100644 index 0000000000..13b8634532 --- /dev/null +++ b/demo/sequencer/in-memory/client.go @@ -0,0 +1,38 @@ +package main + +import ( + "context" + "fmt" + client "mosn.io/layotto/sdk/go-sdk/client" + runtimev1pb "mosn.io/layotto/spec/proto/runtime/v1" +) + +const ( + key = "key666" + storeName = "in-memory" +) + +func main() { + + cli, err := client.NewClient() + if err != nil { + panic(err) + } + defer cli.Close() + ctx := context.Background() + fmt.Printf("Try to get next id.Key:%s \n", key) + for i := 0; i < 10; i++ { + id, err := cli.GetNextId(ctx, &runtimev1pb.GetNextIdRequest{ + StoreName: storeName, + Key: key, + Options: nil, + Metadata: nil, + }) + if err != nil { + fmt.Print(err) + return + } + fmt.Printf("Next id:%v \n", id) + } + fmt.Println("Demo success!") +} diff --git a/docs/zh/component_specs/sequencer/in-memory.md b/docs/zh/component_specs/sequencer/in-memory.md new file mode 100644 index 0000000000..f5d430310f --- /dev/null +++ b/docs/zh/component_specs/sequencer/in-memory.md @@ -0,0 +1,25 @@ +# In-Memory + +## 配置项说明 + +直接使用配置:configs/config_in_memory.json + + +## 启动 layotto + +````shell +cd ${projectpath}/cmd/layotto +go build +```` +编译成功后执行: +````shell +./layotto start -c ../../configs/config_in_memory.json +```` + +## 运行 Demo + +````shell +cd ${projectpath}/demo/sequencer/in-memory/ + go build -o client + ./client +```` \ No newline at end of file From f9cb3453ea6716a5736b9460747635d278b843f6 Mon Sep 17 00:00:00 2001 From: stulzq Date: Mon, 28 Feb 2022 23:23:18 +0800 Subject: [PATCH 2/4] add license header --- .../sequencer/in-memory/in_memory_sequencer.go | 16 ++++++++++++++++ .../in-memory/in_memory_sequencer_test.go | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/components/sequencer/in-memory/in_memory_sequencer.go b/components/sequencer/in-memory/in_memory_sequencer.go index f8742d5820..0883e92268 100644 --- a/components/sequencer/in-memory/in_memory_sequencer.go +++ b/components/sequencer/in-memory/in_memory_sequencer.go @@ -1,3 +1,19 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package in_memory import ( diff --git a/components/sequencer/in-memory/in_memory_sequencer_test.go b/components/sequencer/in-memory/in_memory_sequencer_test.go index 238217d07f..dc62b7dd35 100644 --- a/components/sequencer/in-memory/in_memory_sequencer_test.go +++ b/components/sequencer/in-memory/in_memory_sequencer_test.go @@ -1,3 +1,19 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package in_memory import ( From ec3510fd44734868acb24841e2207f6348686203 Mon Sep 17 00:00:00 2001 From: stulzq Date: Tue, 1 Mar 2022 01:32:07 +0800 Subject: [PATCH 3/4] fmt code --- cmd/layotto_multiple_api/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/layotto_multiple_api/main.go b/cmd/layotto_multiple_api/main.go index e89f1d8c4a..88514b1652 100644 --- a/cmd/layotto_multiple_api/main.go +++ b/cmd/layotto_multiple_api/main.go @@ -22,7 +22,6 @@ import ( helloworld_api "mosn.io/layotto/cmd/layotto_multiple_api/helloworld" "mosn.io/layotto/components/file/s3/tencentcloud" component_actuators "mosn.io/layotto/components/pkg/actuators" - in_memory "mosn.io/layotto/components/sequencer/in-memory" l8_grpc "mosn.io/layotto/pkg/grpc" "mosn.io/layotto/pkg/grpc/dapr" "mosn.io/layotto/pkg/grpc/default_api" @@ -110,6 +109,7 @@ import ( // Sequencer sequencer_etcd "mosn.io/layotto/components/sequencer/etcd" + sequencer_inmemory "mosn.io/layotto/components/sequencer/in-memory" sequencer_redis "mosn.io/layotto/components/sequencer/redis" sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper" @@ -362,7 +362,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger) }), runtime_sequencer.NewFactory("in-memory", func() sequencer.Store { - return in_memory.NewInMemorySequencer() + return sequencer_inmemory.NewInMemorySequencer() }), )) return server, err From ccd1eac88c58593ae0efd19b19128fb9026d7e0f Mon Sep 17 00:00:00 2001 From: stulzq Date: Tue, 1 Mar 2022 11:44:18 +0800 Subject: [PATCH 4/4] fmt code --- cmd/layotto/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index d6acab399d..96e1590790 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -30,7 +30,6 @@ import ( "mosn.io/api" "mosn.io/layotto/components/file/s3/tencentcloud" component_actuators "mosn.io/layotto/components/pkg/actuators" - in_memory "mosn.io/layotto/components/sequencer/in-memory" "mosn.io/layotto/diagnostics" "mosn.io/layotto/pkg/grpc/default_api" secretstores_loader "mosn.io/layotto/pkg/runtime/secretstores" @@ -117,6 +116,7 @@ import ( // Sequencer sequencer_etcd "mosn.io/layotto/components/sequencer/etcd" + sequencer_inmemory "mosn.io/layotto/components/sequencer/in-memory" sequencer_mongo "mosn.io/layotto/components/sequencer/mongo" sequencer_redis "mosn.io/layotto/components/sequencer/redis" sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper" @@ -371,7 +371,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp return sequencer_mongo.NewMongoSequencer(log.DefaultLogger) }), runtime_sequencer.NewFactory("in-memory", func() sequencer.Store { - return in_memory.NewInMemorySequencer() + return sequencer_inmemory.NewInMemorySequencer() }), ), // secretstores