Skip to content

Commit fe03f40

Browse files
authored
Merge branch 'main' into simple_java_client
2 parents dfda220 + 742c3e9 commit fe03f40

File tree

115 files changed

+5536
-949
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

115 files changed

+5536
-949
lines changed

README.md

+5-3
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,16 @@
99

1010
Layotto is an application runtime developed using Golang, which provides various distributed capabilities for applications, such as state management, configuration management, and event pub/sub capabilities to simplify application development.
1111

12-
Layotto uses the open source [MOSN](https://github.com/mosn/mosn) as the base, in addition to providing distributed capabilities, it also provides Service Mesh's ability to control traffic.
12+
Layotto is built on the open source data plane [MOSN](https://github.com/mosn/mosn) .In addition to providing distributed building blocks, Layotto can also serve as the data plane of Service Mesh and has the ability to control traffic.
1313

1414
## Motivation
1515

16-
Layotto aims to combine Runtime with Service Mesh into one sidecar. No matter which product you are using as the Service Mesh data plane (e.g. MOSN,Envoy or any other product), you can always attach Layotto to it and add Multi-Runtime capabilities without adding new sidecars.
16+
Layotto aims to combine [Multi-Runtime](https://www.infoq.com/articles/multi-runtime-microservice-architecture/) with Service Mesh into one sidecar. No matter which product you are using as the Service Mesh data plane (e.g. MOSN,Envoy or any other product), you can always attach Layotto to it and add Multi-Runtime capabilities without adding new sidecars.
1717

1818
For example, by adding Runtime capabilities to MOSN, a Layotto process can both [serve as the data plane of istio](https://mosn.io/layotto/#/en/start/istio/start.md) and provide various Runtime APIs (such as Configuration API, Pub/Sub API, etc.)
1919

20+
In addition, we were surprised to find that a sidecar can do much more than that. We are trying to make Layotto even the runtime container of FaaS (Function as a service) and [reloadable sdk](https://github.com/mosn/layotto/issues/166) with the magic power of [WebAssembly](https://en.wikipedia.org/wiki/WebAssembly) .
21+
2022
## Features
2123

2224
- Service Communication
@@ -32,7 +34,7 @@ For example, by adding Runtime capabilities to MOSN, a Layotto process can both
3234

3335
As shown in the architecture diagram below, Layotto uses the open source MOSN as the base to provide network layer management capabilities while providing distributed capabilities. The business logic can directly interact with Layotto through a lightweight SDK without paying attention to the specific back-end infrastructure.
3436

35-
Layotto provides sdk in various languages. The sdk interacts with Layotto through grpc. Application developers only need to specify their own infrastructure type through the configuration file [configure file](./configs/runtime_config.json) provided by Layotto. No coding changes are required, which greatly improves the portability of the program.
37+
Layotto provides sdks in various languages. The sdk interacts with Layotto through grpc. Application developers only need to specify their own infrastructure type through the configuration file [configure file](./configs/runtime_config.json) provided by Layotto. No coding changes are required, which greatly improves the portability of the program.
3638

3739
![Architecture](https://raw.githubusercontent.com/mosn/layotto/main/docs/img/runtime-architecture.png)
3840

cmd/layotto/main.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ import (
8888

8989
// Sequencer
9090
sequencer_etcd "mosn.io/layotto/components/sequencer/etcd"
91-
91+
sequencer_redis "mosn.io/layotto/components/sequencer/redis"
92+
sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper"
9293
// Actuator
9394
_ "mosn.io/layotto/pkg/actuator"
9495
"mosn.io/layotto/pkg/actuator/health"
@@ -263,6 +264,12 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
263264
runtime_sequencer.NewFactory("etcd", func() sequencer.Store {
264265
return sequencer_etcd.NewEtcdSequencer(log.DefaultLogger)
265266
}),
267+
runtime_sequencer.NewFactory("redis", func() sequencer.Store {
268+
return sequencer_redis.NewStandaloneRedisSequencer(log.DefaultLogger)
269+
}),
270+
runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store {
271+
return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger)
272+
}),
266273
))
267274
// 4. check if unhealthy
268275
if err != nil {

components/lock/etcd/etcd_lock.go

+5-144
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,17 @@ package etcd
22

33
import (
44
"context"
5-
"crypto/tls"
6-
"crypto/x509"
7-
"errors"
85
"fmt"
9-
"io/ioutil"
10-
"strconv"
11-
"strings"
12-
"time"
13-
146
"go.etcd.io/etcd/client/v3"
7+
"mosn.io/layotto/components/pkg/utils"
158

169
"mosn.io/layotto/components/lock"
1710
"mosn.io/pkg/log"
1811
)
1912

20-
const (
21-
defaultDialTimeout = 5
22-
defaultKeyPrefix = "/layotto/"
23-
24-
prefixKey = "keyPrefixPath"
25-
usernameKey = "username"
26-
passwordKey = "password"
27-
dialTimeoutKey = "dialTimeout"
28-
endpointsKey = "endpoints"
29-
tlsCertPathKey = "tlsCert"
30-
tlsCertKeyPathKey = "tlsCertKey"
31-
tlsCaPathKey = "tlsCa"
32-
)
33-
3413
type EtcdLock struct {
3514
client *clientv3.Client
36-
metadata metadata
15+
metadata utils.EtcdMetadata
3716

3817
features []lock.Feature
3918
logger log.ErrorLogger
@@ -54,13 +33,13 @@ func NewEtcdLock(logger log.ErrorLogger) *EtcdLock {
5433

5534
func (e *EtcdLock) Init(metadata lock.Metadata) error {
5635
// 1. parse config
57-
m, err := parseEtcdMetadata(metadata)
36+
m, err := utils.ParseEtcdMetadata(metadata.Properties)
5837
if err != nil {
5938
return err
6039
}
6140
e.metadata = m
6241
// 2. construct client
63-
if e.client, err = e.newClient(m); err != nil {
42+
if e.client, err = utils.NewEtcdClient(m); err != nil {
6443
return err
6544
}
6645

@@ -134,130 +113,12 @@ func (e *EtcdLock) Close() error {
134113
return e.client.Close()
135114
}
136115

137-
func (e *EtcdLock) newClient(meta metadata) (*clientv3.Client, error) {
138-
139-
config := clientv3.Config{
140-
Endpoints: meta.endpoints,
141-
DialTimeout: time.Second * time.Duration(meta.dialTimeout),
142-
Username: meta.username,
143-
Password: meta.password,
144-
}
145-
146-
if meta.tlsCa != "" || meta.tlsCert != "" || meta.tlsCertKey != "" {
147-
//enable tls
148-
cert, err := tls.LoadX509KeyPair(meta.tlsCert, meta.tlsCertKey)
149-
if err != nil {
150-
return nil, fmt.Errorf("error reading tls certificate, cert: %s, certKey: %s, err: %s", meta.tlsCert, meta.tlsCertKey, err)
151-
}
152-
153-
caData, err := ioutil.ReadFile(meta.tlsCa)
154-
if err != nil {
155-
return nil, fmt.Errorf("error reading tls ca %s, err: %s", meta.tlsCa, err)
156-
}
157-
158-
pool := x509.NewCertPool()
159-
pool.AppendCertsFromPEM(caData)
160-
161-
tlsConfig := &tls.Config{
162-
Certificates: []tls.Certificate{cert},
163-
RootCAs: pool,
164-
}
165-
config.TLS = tlsConfig
166-
}
167-
168-
if client, err := clientv3.New(config); err != nil {
169-
return nil, err
170-
} else {
171-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(meta.dialTimeout))
172-
defer cancel()
173-
//ping
174-
_, err = client.Get(ctx, "ping")
175-
if err != nil {
176-
return nil, fmt.Errorf("etcd lock error: connect to etcd timeoout %s", meta.endpoints)
177-
}
178-
179-
return client, nil
180-
}
181-
}
182-
183116
func (e *EtcdLock) getKey(resourceId string) string {
184-
return fmt.Sprintf("%s%s", e.metadata.keyPrefix, resourceId)
117+
return fmt.Sprintf("%s%s", e.metadata.KeyPrefix, resourceId)
185118
}
186119

187120
func newInternalErrorUnlockResponse() *lock.UnlockResponse {
188121
return &lock.UnlockResponse{
189122
Status: lock.INTERNAL_ERROR,
190123
}
191124
}
192-
193-
func parseEtcdMetadata(meta lock.Metadata) (metadata, error) {
194-
m := metadata{}
195-
var err error
196-
197-
if val, ok := meta.Properties[endpointsKey]; ok && val != "" {
198-
m.endpoints = strings.Split(val, ";")
199-
} else {
200-
return m, errors.New("etcd lock error: missing endpoints address")
201-
}
202-
203-
if val, ok := meta.Properties[dialTimeoutKey]; ok && val != "" {
204-
if m.dialTimeout, err = strconv.Atoi(val); err != nil {
205-
return m, fmt.Errorf("etcd lock error: ncorrect dialTimeout value %s", val)
206-
}
207-
} else {
208-
m.dialTimeout = defaultDialTimeout
209-
}
210-
211-
if val, ok := meta.Properties[prefixKey]; ok && val != "" {
212-
m.keyPrefix = addPathSeparator(val)
213-
} else {
214-
m.keyPrefix = defaultKeyPrefix
215-
}
216-
217-
if val, ok := meta.Properties[usernameKey]; ok && val != "" {
218-
m.username = val
219-
}
220-
221-
if val, ok := meta.Properties[passwordKey]; ok && val != "" {
222-
m.password = val
223-
}
224-
225-
if val, ok := meta.Properties[tlsCaPathKey]; ok && val != "" {
226-
m.tlsCa = val
227-
}
228-
229-
if val, ok := meta.Properties[tlsCertPathKey]; ok && val != "" {
230-
m.tlsCert = val
231-
}
232-
233-
if val, ok := meta.Properties[tlsCertKeyPathKey]; ok && val != "" {
234-
m.tlsCertKey = val
235-
}
236-
237-
return m, nil
238-
}
239-
240-
func addPathSeparator(p string) string {
241-
if p == "" {
242-
return "/"
243-
}
244-
if p[0] != '/' {
245-
p = "/" + p
246-
}
247-
if p[len(p)-1] != '/' {
248-
p = p + "/"
249-
}
250-
return p
251-
}
252-
253-
type metadata struct {
254-
keyPrefix string
255-
dialTimeout int
256-
endpoints []string
257-
username string
258-
password string
259-
260-
tlsCa string
261-
tlsCert string
262-
tlsCertKey string
263-
}

components/lock/redis/standalone_redis_lock.go

+5-88
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,18 @@ package redis
22

33
import (
44
"context"
5-
"crypto/tls"
6-
"errors"
75
"fmt"
86
"github.com/go-redis/redis/v8"
97
"mosn.io/layotto/components/lock"
8+
"mosn.io/layotto/components/pkg/utils"
109
"mosn.io/pkg/log"
11-
"strconv"
1210
"time"
1311
)
1412

15-
const (
16-
host = "redisHost"
17-
password = "redisPassword"
18-
enableTLS = "enableTLS"
19-
maxRetries = "maxRetries"
20-
maxRetryBackoff = "maxRetryBackoff"
21-
defaultBase = 10
22-
defaultBitSize = 0
23-
defaultDB = 0
24-
defaultMaxRetries = 3
25-
defaultMaxRetryBackoff = time.Second * 2
26-
defaultEnableTLS = false
27-
)
28-
2913
// Standalone Redis lock store.Any fail-over related features are not supported,such as Sentinel and Redis Cluster.
3014
type StandaloneRedisLock struct {
3115
client *redis.Client
32-
metadata metadata
16+
metadata utils.RedisMetadata
3317
replicas int
3418

3519
features []lock.Feature
@@ -51,37 +35,21 @@ func NewStandaloneRedisLock(logger log.ErrorLogger) *StandaloneRedisLock {
5135

5236
func (p *StandaloneRedisLock) Init(metadata lock.Metadata) error {
5337
// 1. parse config
54-
m, err := parseRedisMetadata(metadata)
38+
m, err := utils.ParseRedisMetadata(metadata.Properties)
5539
if err != nil {
5640
return err
5741
}
5842
p.metadata = m
5943
// 2. construct client
60-
p.client = p.newClient(m)
44+
p.client = utils.NewRedisClient(m)
6145
p.ctx, p.cancel = context.WithCancel(context.Background())
6246
// 3. connect to redis
6347
if _, err = p.client.Ping(p.ctx).Result(); err != nil {
64-
return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", m.host, err)
48+
return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", m.Host, err)
6549
}
6650
return err
6751
}
6852

69-
func (p *StandaloneRedisLock) newClient(m metadata) *redis.Client {
70-
opts := &redis.Options{
71-
Addr: m.host,
72-
Password: m.password,
73-
DB: defaultDB,
74-
MaxRetries: m.maxRetries,
75-
MaxRetryBackoff: m.maxRetryBackoff,
76-
}
77-
if m.enableTLS {
78-
opts.TLSConfig = &tls.Config{
79-
InsecureSkipVerify: m.enableTLS,
80-
}
81-
}
82-
return redis.NewClient(opts)
83-
}
84-
8553
func (p *StandaloneRedisLock) Features() []lock.Feature {
8654
return p.features
8755
}
@@ -145,54 +113,3 @@ func (p *StandaloneRedisLock) Close() error {
145113

146114
return p.client.Close()
147115
}
148-
149-
func parseRedisMetadata(meta lock.Metadata) (metadata, error) {
150-
m := metadata{}
151-
152-
if val, ok := meta.Properties[host]; ok && val != "" {
153-
m.host = val
154-
} else {
155-
return m, errors.New("redis store error: missing host address")
156-
}
157-
158-
if val, ok := meta.Properties[password]; ok && val != "" {
159-
m.password = val
160-
}
161-
162-
m.enableTLS = defaultEnableTLS
163-
if val, ok := meta.Properties[enableTLS]; ok && val != "" {
164-
tls, err := strconv.ParseBool(val)
165-
if err != nil {
166-
return m, fmt.Errorf("redis store error: can't parse enableTLS field: %s", err)
167-
}
168-
m.enableTLS = tls
169-
}
170-
171-
m.maxRetries = defaultMaxRetries
172-
if val, ok := meta.Properties[maxRetries]; ok && val != "" {
173-
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)
174-
if err != nil {
175-
return m, fmt.Errorf("redis store error: can't parse maxRetries field: %s", err)
176-
}
177-
m.maxRetries = int(parsedVal)
178-
}
179-
180-
m.maxRetryBackoff = defaultMaxRetryBackoff
181-
if val, ok := meta.Properties[maxRetryBackoff]; ok && val != "" {
182-
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)
183-
if err != nil {
184-
return m, fmt.Errorf("redis store error: can't parse maxRetryBackoff field: %s", err)
185-
}
186-
m.maxRetryBackoff = time.Duration(parsedVal)
187-
}
188-
189-
return m, nil
190-
}
191-
192-
type metadata struct {
193-
host string
194-
password string
195-
maxRetries int
196-
maxRetryBackoff time.Duration
197-
enableTLS bool
198-
}

0 commit comments

Comments
 (0)