diff --git a/.travis.yml b/.travis.yml index e0feeaaddf29..feffed468f0a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -40,11 +40,13 @@ before_script: - if [ ! -f "/home/travis/.m2/install.sh" ];then wget -O /home/travis/.m2/install.sh https://tiup-mirrors.pingcap.com/install.sh; fi - if [ ! -f "/home/travis/.m2/rclone-v1.57.0-linux-amd64.zip" ];then wget -O /home/travis/.m2/rclone-v1.57.0-linux-amd64.zip --no-check-certificate https://downloads.rclone.org/v1.57.0/rclone-v1.57.0-linux-amd64.zip && unzip /home/travis/.m2/rclone-v1.57.0-linux-amd64.zip -d /home/travis/.m2; fi - if [ ! -f "/home/travis/.m2/litmus-0.13.tar.g" ];then wget -O /home/travis/.m2/litmus-0.13.tar.gz http://www.webdav.org/neon/litmus/litmus-0.13.tar.gz; tar -zxvf /home/travis/.m2/litmus-0.13.tar.gz -C /home/travis/.m2;cd /home/travis/.m2/litmus-0.13;./configure;make;cd -; fi + - if [ ! -f "/home/travis/.m2/etcd-v3.5.2-linux-amd64.tar.gz" ];then wget -O /home/travis/.m2/etcd-v3.5.2-linux-amd64.tar.gz https://github.com/etcd-io/etcd/releases/download/v3.5.2/etcd-v3.5.2-linux-amd64.tar.gz; tar -zxf /home/travis/.m2/etcd-v3.5.2-linux-amd64.tar.gz -C /home/travis/.m2; fi - docker images - sh /home/travis/.m2/install.sh && source ~/.bash_profile && nohup tiup playground --mode tikv-slim >> output.log 2>&1 & - docker run -d -p 9000:9000 -p 9001:9001 -e "MINIO_ROOT_USER=testUser" -e "MINIO_ROOT_PASSWORD=testUserPassword" quay.io/minio/minio:RELEASE.2022-01-25T19-56-04Z server /data --console-address ":9001" - go install github.com/minio/mc@RELEASE.2022-01-07T06-01-38Z && mc config host add local http://127.0.0.1:9000 testUser testUserPassword && mc mb local/testbucket - nohup /home/travis/.m2/rclone-v1.57.0-linux-amd64/rclone serve webdav local --addr 127.0.0.1:9007 >> rclone.log 2>&1 & + - # nohup /home/travis/.m2/etcd-v3.5.2-linux-amd64/etcd --unsafe-no-fsync --listen-client-urls http://127.0.0.1:2389 --advertise-client-urls http://127.0.0.1:2389 & - sudo chmod 777 /usr/local/maven-3.6.3/conf/settings.xml - sudo sed -i "s??/home/travis/.m2/repository?" /usr/local/maven-3.6.3/conf/settings.xml - docker run -d --name sftp -p 2222:22 juicedata/ci-sftp @@ -52,7 +54,7 @@ before_script: - sudo make -C fstests setup - chmod +x travis-setup-hdfs.sh - ./travis-setup-hdfs.sh - - for i in {2222,3306,5432,8020,9000,9007} ; do echo "lsof port:"$i;sudo lsof -i :$i;if [ $? != 0 ];then sleep 5; else continue; fi;sudo lsof -i :$i; if [ $? != 0 ];then echo "service not ready, port:"$i; exit 1;fi; done + - for i in {2222,3306,5432,8020,9000,9007,2379} ; do echo "lsof port:"$i;sudo lsof -i :$i;if [ $? != 0 ];then sleep 5; else continue; fi;sudo lsof -i :$i; if [ $? != 0 ];then echo "service not ready, port:"$i; exit 1;fi; done - sudo lsof -i :2379;if [ $? != 0 ];then echo "tikv is not ready";cat output.log;exit 1; fi script: - sudo chmod 777 /var/jfsCache diff --git a/Makefile b/Makefile index 4d806426f079..1d96a05f9a00 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ juicefs: Makefile cmd/*.go pkg/*/*.go go.* go build -ldflags="$(LDFLAGS)" -o juicefs ./cmd juicefs.lite: Makefile cmd/*.go pkg/*/*.go - go build -tags nogateway,nowebdav,nocos,nobos,nohdfs,noibmcos,noobs,nooss,noqingstor,noscs,nosftp,noswift,noupyun,noazure,nogs,noufile,nob2,nosqlite,nomysql,nopg,notikv,nobadger \ + go build -tags nogateway,nowebdav,nocos,nobos,nohdfs,noibmcos,noobs,nooss,noqingstor,noscs,nosftp,noswift,noupyun,noazure,nogs,noufile,nob2,nosqlite,nomysql,nopg,notikv,nobadger,noetcd \ -ldflags="$(LDFLAGS)" -o juicefs.lite ./cmd juicefs.ceph: Makefile cmd/*.go pkg/*/*.go diff --git a/go.mod b/go.mod index f2de776b21eb..aa2bf99403df 100644 --- a/go.mod +++ b/go.mod @@ -61,12 +61,14 @@ require ( github.com/urfave/cli/v2 v2.4.0 github.com/vbauerster/mpb/v7 v7.0.3 github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8 - golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064 - golang.org/x/net v0.0.0-20220325170049-de3da57026de - golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a - golang.org/x/sys v0.0.0-20220327210214-530d0810a4d0 + go.etcd.io/etcd v3.3.27+incompatible + go.etcd.io/etcd/client/v3 v3.5.2 + golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce + golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd + golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 + golang.org/x/sys v0.0.0-20220209214540-3681064d5158 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 - google.golang.org/api v0.73.0 + google.golang.org/api v0.70.0 gopkg.in/kothar/go-backblaze.v0 v0.0.0-20210124194846-35409b867216 xorm.io/xorm v1.0.7 ) @@ -98,8 +100,11 @@ require ( github.com/cheggaaa/pb v1.0.29 // indirect github.com/clbanning/mxj v1.8.4 // indirect github.com/coredns/coredns v1.4.0 // indirect + github.com/coreos/etcd v3.3.10+incompatible // indirect github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect + github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect github.com/dchest/siphash v1.2.1 // indirect @@ -214,7 +219,6 @@ require ( github.com/willf/bloom v2.0.3+incompatible // indirect go.etcd.io/etcd/api/v3 v3.5.2 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.2 // indirect - go.etcd.io/etcd/client/v3 v3.5.2 // indirect go.opencensus.io v0.23.0 // indirect go.opentelemetry.io/otel v0.14.0 // indirect go.uber.org/atomic v1.9.0 // indirect diff --git a/go.sum b/go.sum index 5c18a58ab961..4d3952b7ced6 100644 --- a/go.sum +++ b/go.sum @@ -171,13 +171,18 @@ github.com/colinmarc/hdfs/v2 v2.2.0 h1:4AaIlTq+/sWmeqYhI0dX8bD4YrMQM990tRjm636Fk github.com/colinmarc/hdfs/v2 v2.2.0/go.mod h1:Wss6n3mtaZyRwWaqtSH+6ge01qT0rw9dJJmvoUnIQ/E= github.com/coredns/coredns v1.4.0 h1:RubBkYmkByUqZWWkjRHvNLnUHgkRVqAWgSMmRFvpE1A= github.com/coredns/coredns v1.4.0/go.mod h1:zASH/MVDgR6XZTbxvOnsZfffS+31vg6Ackf/wo1+AM0= +github.com/coreos/etcd v3.3.10+incompatible h1:jFneRYjIvLMLhDLCzuTuU4rSJUjRplcJQ7pD7MnhC04= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pqaw19uhpSCzhj44qo35pNgKFGqzDKkU= +github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= @@ -830,6 +835,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= +go.etcd.io/etcd v3.3.27+incompatible h1:5hMrpf6REqTHV2LW2OclNpRtxI0k9ZplMemJsMSWju0= +go.etcd.io/etcd v3.3.27+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.etcd.io/etcd/api/v3 v3.5.2 h1:tXok5yLlKyuQ/SXSjtqHc4uzNaMqZi2XsoSPr/LlJXI= go.etcd.io/etcd/api/v3 v3.5.2/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/client/pkg/v3 v3.5.2 h1:4hzqQ6hIb3blLyQ8usCU4h3NghkqcsohEQ3o3VetYxE= @@ -892,9 +899,8 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce h1:Roh6XWxHFKrPgC/EQhVubSAGQ6Ozk6IdxHSzt1mR0EI= golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064 h1:S25/rfnfsMVgORT4/J61MJ7rdyseOZOyvLIrZEZ7s6s= -golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -977,10 +983,8 @@ golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220325170049-de3da57026de h1:pZB1TWnKi+o4bENlbzAgLrEbY4RMYmUIRobMcSmfeYc= -golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -996,9 +1000,8 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 h1:RerP+noqYHUQ8CMRcPlC2nvTa4dcBIjegkuWdcUDuqg= golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a h1:qfl7ob3DIEs3Ml9oLuPwY2N04gymzAW04WsUQHIClgM= -golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1098,10 +1101,8 @@ golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220327210214-530d0810a4d0 h1:G6WAvvcMaaFYQhMbC0L5ZWNExEcJ3j3yFTxx4mwOHtM= -golang.org/x/sys v0.0.0-20220327210214-530d0810a4d0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1225,9 +1226,8 @@ google.golang.org/api v0.64.0/go.mod h1:931CdxA8Rm4t6zqTFGSsgwbAEZ2+GMYurbndwSim google.golang.org/api v0.66.0/go.mod h1:I1dmXYpX7HGwz/ejRxwQp2qj5bFAz93HiCU1C1oYd9M= google.golang.org/api v0.67.0/go.mod h1:ShHKP8E60yPsKNw/w8w+VYaj9H6buA5UqDp8dhbQZ6g= google.golang.org/api v0.69.0/go.mod h1:boanBiw+h5c3s+tBPgEzLDRHfFLWV0qXxRHz3ws7C80= +google.golang.org/api v0.70.0 h1:67zQnAE0T2rB0A3CwLSas0K+SbVzSxP+zTLkQLexeiw= google.golang.org/api v0.70.0/go.mod h1:Bs4ZM2HGifEvXwd50TtW70ovgJffJYw2oRCOFU/SkfA= -google.golang.org/api v0.73.0 h1:O9bThUh35K1rvUrQwTUQ1eqLC/IYyzUpWavYIO2EXvo= -google.golang.org/api v0.73.0/go.mod h1:lbd/q6BRFJbdpV6OUCXstVeiI5mL/d3/WifG7iNKnjI= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index fd198e802f4a..535384552455 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -1810,6 +1810,9 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) { return } + if len(buf) > sliceBytes*100 { + buf = buf[:sliceBytes*100] + } ss := readSliceBuf(buf) skipped := skipSome(ss) ss = ss[skipped:] diff --git a/pkg/meta/tkv_etcd.go b/pkg/meta/tkv_etcd.go new file mode 100644 index 000000000000..de2b272eecbd --- /dev/null +++ b/pkg/meta/tkv_etcd.go @@ -0,0 +1,334 @@ +//go:build !noetcd +// +build !noetcd + +/* + * JuiceFS, Copyright 2022 Juicedata, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package meta + +import ( + "bytes" + "context" + "crypto/tls" + "fmt" + "net/url" + "strings" + "time" + + "github.com/pkg/errors" + etcd "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/pkg/transport" +) + +type etcdTxn struct { + ctx context.Context + kv etcd.KV + observed map[string]int64 + buffer map[string][]byte +} + +func (tx *etcdTxn) get(key []byte) []byte { + k := string(key) + if v, ok := tx.buffer[k]; ok { + return v + } + resp, err := tx.kv.Get(tx.ctx, k, etcd.WithLimit(1)) + if err != nil { + panic(fmt.Errorf("get %v: %s", k, err)) + } + if resp.Count == 0 { + tx.observed[k] = 0 + return nil + } + if resp.Count > 1 { + panic(fmt.Errorf("expect 1 keys but got %d", resp.Count)) + } + for _, pair := range resp.Kvs { + if bytes.Equal(pair.Key, key) { + tx.observed[k] = pair.ModRevision + return pair.Value + } else { + panic(fmt.Errorf("expect key %v, but got %v", k, string(pair.Key))) + } + } + panic("unreachable") +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func (tx *etcdTxn) gets(keys ...[]byte) [][]byte { + if len(keys) > 128 { + var rs = make([][]byte, 0, len(keys)) + for i := 0; i < len(keys); i += 128 { + rs = append(rs, tx.gets(keys[i:min(i+128, len(keys))]...)...) + } + return rs + } + ops := make([]etcd.Op, len(keys)) + for i, key := range keys { + ops[i] = etcd.OpGet(string(key)) + } + r, err := tx.kv.Do(tx.ctx, etcd.OpTxn(nil, ops, nil)) + if err != nil { + panic(fmt.Errorf("batch get with %d keys: %s", len(keys), err)) + } + rs := make(map[string][]byte) + for _, res := range r.Txn().Responses { + for _, p := range res.GetResponseRange().Kvs { + k := string(p.Key) + tx.observed[k] = p.ModRevision + rs[k] = p.Value + } + } + values := make([][]byte, len(keys)) + for i, key := range keys { + k := string(key) + if v, ok := tx.buffer[k]; ok { + values[i] = v + continue + } + values[i] = rs[k] + if len(values[i]) == 0 { + tx.observed[k] = 0 + } + } + return values +} + +func (tx *etcdTxn) scanRange(begin_, end_ []byte) map[string][]byte { + resp, err := tx.kv.Get(tx.ctx, string(begin_), etcd.WithRange(string(end_))) + if err != nil { + panic(fmt.Errorf("get range [%v-%v): %s", string(begin_), string(end_), err)) + } + ret := make(map[string][]byte) + for _, kv := range resp.Kvs { + k := string(kv.Key) + tx.observed[k] = kv.ModRevision + ret[k] = kv.Value + } + return ret +} + +func (tx *etcdTxn) scan(prefix []byte, handler func(key []byte, value []byte)) { + resp, err := tx.kv.Get(tx.ctx, + string(prefix), + etcd.WithPrefix(), + etcd.WithSerializable()) + if err != nil { + panic(fmt.Errorf("get prefix %v: %s", string(prefix), err)) + } + for _, kv := range resp.Kvs { + tx.observed[string(kv.Key)] = kv.ModRevision + handler(kv.Key, kv.Value) + } +} + +func (tx *etcdTxn) scanKeys(prefix []byte) [][]byte { + resp, err := tx.kv.Get(tx.ctx, string(prefix), etcd.WithPrefix(), etcd.WithKeysOnly()) + if err != nil { + panic(fmt.Errorf("get prefix %v with keys only: %s", string(prefix), err)) + } + var keys [][]byte + for _, kv := range resp.Kvs { + tx.observed[string(kv.Key)] = kv.ModRevision + keys = append(keys, kv.Key) + } + return keys +} + +func (tx *etcdTxn) scanValues(prefix []byte, limit int, filter func(k, v []byte) bool) map[string][]byte { + if limit == 0 { + return nil + } + resp, err := tx.kv.Get(tx.ctx, string(prefix), etcd.WithPrefix()) + if err != nil { + panic(fmt.Errorf("get prefix %s: %s", string(prefix), err)) + } + ret := make(map[string][]byte) + for _, kv := range resp.Kvs { + if filter == nil || filter(kv.Key, kv.Value) { + k := string(kv.Key) + tx.observed[k] = kv.ModRevision + ret[k] = kv.Value + if limit > 0 && len(ret) >= limit { + break + } + } + } + return ret +} + +func (tx *etcdTxn) exist(prefix []byte) bool { + resp, err := tx.kv.Get(tx.ctx, string(prefix), etcd.WithPrefix(), etcd.WithCountOnly()) + if err != nil { + panic(fmt.Errorf("get prefix %v with count only: %s", string(prefix), err)) + } + return resp.Count > 0 +} + +func (tx *etcdTxn) set(key, value []byte) { + tx.buffer[string(key)] = value +} + +func (tx *etcdTxn) append(key []byte, value []byte) []byte { + new := append(tx.get(key), value...) + tx.set(key, new) + return new +} + +func (tx *etcdTxn) incrBy(key []byte, value int64) int64 { + buf := tx.get(key) + new := parseCounter(buf) + if value != 0 { + new += value + tx.set(key, packCounter(new)) + } + return new +} + +func (tx *etcdTxn) dels(keys ...[]byte) { + for _, key := range keys { + tx.buffer[string(key)] = nil + } +} + +type etcdClient struct { + client *etcd.Client + kv etcd.KV +} + +func (c *etcdClient) name() string { + return "etcd" +} + +func (c *etcdClient) shouldRetry(err error) bool { + return errors.Is(err, conflicted) +} + +func (c *etcdClient) txn(f func(kvTxn) error) (err error) { + ctx := context.Background() + tx := &etcdTxn{ + ctx, + c.kv, + make(map[string]int64), + make(map[string][]byte), + } + start := time.Now() + defer func() { + if r := recover(); r != nil { + fe, ok := r.(error) + if ok { + err = fe + } else { + panic(r) + } + } + }() + err = f(tx) + if err != nil { + return err + } + if len(tx.buffer) == 0 { + return nil // read only + } + var conds []etcd.Cmp + var ops []etcd.Op + for k, v := range tx.observed { + conds = append(conds, etcd.Compare(etcd.ModRevision(k), "=", v)) + } + for k, v := range tx.buffer { + var op etcd.Op + if v == nil { + op = etcd.OpDelete(string(k)) + } else { + op = etcd.OpPut(string(k), string(v)) + } + ops = append(ops, op) + } + resp, err := c.kv.Txn(ctx).If(conds...).Then(ops...).Commit() + if time.Since(start) > time.Millisecond*10 { + logger.Debugf("txn with %d conds and %d ops took %s", len(conds), len(ops), time.Since(start)) + } + if err != nil { + return err + } + if resp.Succeeded { + return nil + } + return conflicted +} + +var conflicted = errors.New("conflicted transaction") + +func (c *etcdClient) reset(prefix []byte) error { + _, err := c.kv.Delete(context.Background(), string(prefix), etcd.WithPrefix()) + return err +} + +func (c *etcdClient) close() error { + return c.client.Close() +} + +func buildTlsConfig(u *url.URL) (*tls.Config, error) { + var tsinfo transport.TLSInfo + q := u.Query() + tsinfo.CAFile = q.Get("cacert") + tsinfo.CertFile = q.Get("cert") + tsinfo.KeyFile = q.Get("key") + tsinfo.ServerName = q.Get("server-name") + tsinfo.InsecureSkipVerify = q.Get("insecure-skip-verify") != "" + if tsinfo.CAFile != "" || tsinfo.CertFile != "" || tsinfo.KeyFile != "" || tsinfo.ServerName != "" { + return tsinfo.ClientConfig() + } + return nil, nil +} + +func newEtcdClient(addr string) (tkvClient, error) { + if !strings.Contains(addr, "://") { + addr = "http://" + addr + } + u, err := url.Parse(addr) + if err != nil { + return nil, fmt.Errorf("parse %s: %s", addr, err) + } + passwd, _ := u.User.Password() + conf := etcd.Config{ + Endpoints: strings.Split(u.Host, ","), + Username: u.User.Username(), + Password: passwd, + AutoSyncInterval: time.Minute, + } + conf.TLS, err = buildTlsConfig(u) + if err != nil { + return nil, fmt.Errorf("build tls config from %s: %s", u.RawQuery, err) + } + c, err := etcd.New(conf) + if err != nil { + return nil, err + } + var prefix string = u.Path + "\xFD" + return withPrefix(&etcdClient{c, etcd.NewKV(c)}, []byte(prefix)), nil +} + +func init() { + Register("etcd", newKVMeta) + drivers["etcd"] = newEtcdClient +} diff --git a/pkg/meta/tkv_prefix.go b/pkg/meta/tkv_prefix.go index ea3aaa71e44f..1a1c2dedae8b 100644 --- a/pkg/meta/tkv_prefix.go +++ b/pkg/meta/tkv_prefix.go @@ -24,7 +24,10 @@ type prefixTxn struct { } func (tx *prefixTxn) realKey(key []byte) []byte { - return append(tx.prefix, key...) + k := make([]byte, len(tx.prefix)+len(key)) + copy(k, tx.prefix) + copy(k[len(tx.prefix):], key) + return k } func (tx *prefixTxn) origKey(key []byte) []byte { @@ -113,7 +116,7 @@ func (c *prefixClient) txn(f func(kvTxn) error) error { func (c *prefixClient) reset(prefix []byte) error { if prefix != nil { - return fmt.Errorf("prefix must be nil") + return fmt.Errorf("prefix must be nil, but got %v", prefix) } return c.tkvClient.reset(c.prefix) } diff --git a/pkg/meta/tkv_test.go b/pkg/meta/tkv_test.go index 6e9da80705ec..ecbbbc6342e6 100644 --- a/pkg/meta/tkv_test.go +++ b/pkg/meta/tkv_test.go @@ -48,6 +48,14 @@ func TestBadgerClient(t *testing.T) { testMeta(t, m) } +func TestEtcdClient(t *testing.T) { + m, err := newKVMeta("etcd", "localhost:2379", &Config{}) + if err != nil { + t.Fatalf("create meta: %s", err) + } + testMeta(t, m) +} + func testTKV(t *testing.T, c tkvClient) { txn := func(f func(kt kvTxn)) { if err := c.txn(func(kt kvTxn) error { @@ -86,7 +94,7 @@ func testTKV(t *testing.T, c tkvClient) { var ks [][]byte txn(func(kt kvTxn) { ks = kt.gets([]byte("k1"), []byte("k2")) }) if ks[0] != nil || string(ks[1]) != "value" { - t.Fatalf("gets k1,k2: %+v", ks) + t.Fatalf("gets k1,k2: %+v != %+v", ks, [][]byte{nil, []byte("value")}) } var keys [][]byte @@ -154,6 +162,19 @@ func testTKV(t *testing.T, c tkvClient) { if count != 1 { t.Fatalf("counter should be 1, but got %d", count) } + + // key with zeros + k = []byte("k\x001") + txn(func(kt kvTxn) { + kt.set(k, v) + }) + var v2 []byte + txn(func(kt kvTxn) { + v2 = kt.get(k) + }) + if !bytes.Equal(v2, v) { + t.Fatalf("expect %v but got %v", v, v2) + } } func TestBadgerKV(t *testing.T) { @@ -164,6 +185,14 @@ func TestBadgerKV(t *testing.T) { testTKV(t, c) } +func TestEtcd(t *testing.T) { + c, err := newEtcdClient("localhost:2379/jfs") + if err != nil { + t.Fatal(err) + } + testTKV(t, c) +} + func TestMemKV(t *testing.T) { c, _ := newTkvClient("memkv", "") c = withPrefix(c, []byte("jfs"))