-
Notifications
You must be signed in to change notification settings - Fork 838
Support Snappy compression for gRPC #2940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
270a06d
5df98c9
835a6d1
5dd90d3
c775a23
6cae329
87bcb12
0f7a209
97bd88d
0978fec
2f15674
47d77a0
6b4b2bd
fee4259
88d34a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -105,6 +105,9 @@ func (cfg *Config) Validate() error { | |||||
| if err := cfg.CassandraStorageConfig.Validate(); err != nil { | ||||||
| return errors.Wrap(err, "invalid Cassandra Storage config") | ||||||
| } | ||||||
| if err := cfg.GCPStorageConfig.Validate(util.Logger); err != nil { | ||||||
| return errors.Wrap(err, "invalid GCP Storage Storage config") | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| } | ||||||
| if err := cfg.Swift.Validate(); err != nil { | ||||||
| return errors.Wrap(err, "invalid Swift Storage config") | ||||||
| } | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| package snappy | ||
pstibrany marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| import ( | ||
| "io" | ||
| "sync" | ||
|
|
||
| "github.com/golang/snappy" | ||
| "google.golang.org/grpc/encoding" | ||
| ) | ||
|
|
||
| // Name is the name registered for the snappy compressor. | ||
| const Name = "snappy" | ||
|
|
||
| func init() { | ||
| encoding.RegisterCompressor(newCompressor()) | ||
| } | ||
|
|
||
| type compressor struct { | ||
pracucci marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| writersPool sync.Pool | ||
Wing924 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| readersPool sync.Pool | ||
| } | ||
|
|
||
| func newCompressor() *compressor { | ||
| c := &compressor{} | ||
| c.readersPool = sync.Pool{ | ||
| New: func() interface{} { | ||
| return &reader{ | ||
| pool: &c.readersPool, | ||
| Reader: snappy.NewReader(nil), | ||
| } | ||
| }, | ||
| } | ||
| c.writersPool = sync.Pool{ | ||
| New: func() interface{} { | ||
| return &writeCloser{ | ||
| pool: &c.writersPool, | ||
| Writer: snappy.NewBufferedWriter(nil), | ||
| } | ||
| }, | ||
| } | ||
| return c | ||
| } | ||
|
|
||
| func (c *compressor) Name() string { | ||
| return Name | ||
| } | ||
|
|
||
| func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { | ||
| wr := c.writersPool.Get().(*writeCloser) | ||
| wr.Reset(w) | ||
| return wr, nil | ||
| } | ||
|
|
||
| func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { | ||
| dr := c.readersPool.Get().(*reader) | ||
| dr.Reset(r) | ||
| return dr, nil | ||
| } | ||
|
|
||
| type writeCloser struct { | ||
| *snappy.Writer | ||
pstibrany marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| pool *sync.Pool | ||
| } | ||
|
|
||
| func (w *writeCloser) Close() error { | ||
| defer w.pool.Put(w) | ||
| return w.Writer.Close() | ||
| } | ||
|
|
||
| type reader struct { | ||
| *snappy.Reader | ||
|
||
| pool *sync.Pool | ||
| } | ||
|
|
||
| func (r *reader) Read(p []byte) (n int, err error) { | ||
| n, err = r.Reader.Read(p) | ||
| if err == io.EOF { | ||
| r.pool.Put(r) | ||
|
||
| } | ||
| return n, err | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| package snappy | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "io" | ||
| "io/ioutil" | ||
| "strings" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestSnappy(t *testing.T) { | ||
| c := newCompressor() | ||
| assert.Equal(t, "snappy", c.Name()) | ||
|
|
||
| tests := []struct { | ||
| test string | ||
| input string | ||
| }{ | ||
| {"empty", ""}, | ||
| {"short", "hello world"}, | ||
| {"long", strings.Repeat("123456789", 1024)}, | ||
| } | ||
| for _, test := range tests { | ||
| t.Run(test.test, func(t *testing.T) { | ||
| var buf bytes.Buffer | ||
| // Compress | ||
| w, err := c.Compress(&buf) | ||
| require.NoError(t, err) | ||
| n, err := w.Write([]byte(test.input)) | ||
| require.NoError(t, err) | ||
| assert.Len(t, test.input, n) | ||
| err = w.Close() | ||
| require.NoError(t, err) | ||
| // Decompress | ||
| r, err := c.Decompress(&buf) | ||
| require.NoError(t, err) | ||
| out, err := ioutil.ReadAll(r) | ||
| require.NoError(t, err) | ||
| assert.Equal(t, test.input, string(out)) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func BenchmarkSnappyCompress(b *testing.B) { | ||
| data := []byte(strings.Repeat("123456789", 1024)) | ||
| c := newCompressor() | ||
| w, _ := c.Compress(ioutil.Discard) | ||
| b.ResetTimer() | ||
| for i := 0; i < b.N; i++ { | ||
| _, _ = w.Write(data) | ||
| _ = w.Close() | ||
| } | ||
| } | ||
|
|
||
| func BenchmarkSnappyDecompress(b *testing.B) { | ||
| data := []byte(strings.Repeat("123456789", 1024)) | ||
| c := newCompressor() | ||
| var buf bytes.Buffer | ||
| w, _ := c.Compress(&buf) | ||
| _, _ = w.Write(data) | ||
| reader := bytes.NewReader(buf.Bytes()) | ||
| b.ResetTimer() | ||
| for i := 0; i < b.N; i++ { | ||
| r, _ := c.Decompress(reader) | ||
| _, _ = ioutil.ReadAll(r) | ||
| _, _ = reader.Seek(0, io.SeekStart) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -3,10 +3,16 @@ package grpcclient | |||||
| import ( | ||||||
| "flag" | ||||||
|
|
||||||
| "github.com/go-kit/kit/log" | ||||||
| "github.com/go-kit/kit/log/level" | ||||||
| grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||||||
| "github.com/pkg/errors" | ||||||
| "google.golang.org/grpc" | ||||||
| "google.golang.org/grpc/encoding/gzip" | ||||||
|
|
||||||
| "github.com/cortexproject/cortex/pkg/util" | ||||||
| "github.com/cortexproject/cortex/pkg/util/flagext" | ||||||
| "github.com/cortexproject/cortex/pkg/util/grpc/encoding/snappy" | ||||||
| "github.com/cortexproject/cortex/pkg/util/tls" | ||||||
| ) | ||||||
|
|
||||||
|
|
@@ -15,6 +21,7 @@ type Config struct { | |||||
| MaxRecvMsgSize int `yaml:"max_recv_msg_size"` | ||||||
| MaxSendMsgSize int `yaml:"max_send_msg_size"` | ||||||
| UseGzipCompression bool `yaml:"use_gzip_compression"` | ||||||
|
||||||
| UseGzipCompression bool `yaml:"use_gzip_compression"` | |
| UseGzipCompression bool `yaml:"use_gzip_compression"` // TODO: Remove this deprecated option in v1.6.0. |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| compression = "gzip" | |
| compression = gzip.Name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would expect this to be
grpc_compression. I'm not sure why the CI linter passed. Could you runmake docagain, please?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as you may know, CI don't run.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I haven't noticed! Can't understand why the CI is not running on this specific PR 👀