-
Notifications
You must be signed in to change notification settings - Fork 840
Support Snappy compression for gRPC #2940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
270a06d
5df98c9
835a6d1
5dd90d3
c775a23
6cae329
87bcb12
0f7a209
97bd88d
0978fec
2f15674
47d77a0
6b4b2bd
fee4259
88d34a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1774,10 +1774,15 @@ bigtable: | |
| # CLI flag: -bigtable.grpc-max-send-msg-size | ||
| [max_send_msg_size: <int> | default = 16777216] | ||
|
|
||
| # Use compression when sending messages. | ||
| # Deprecated: Use gzip compression when sending messages. | ||
| # CLI flag: -bigtable.grpc-use-gzip-compression | ||
| [use_gzip_compression: <boolean> | default = false] | ||
|
|
||
| # Use compression when sending messages. Supported values are: 'gzip', | ||
| # 'snappy' and '' (disable compression) | ||
| # CLI flag: -bigtable.grpc-compression | ||
| [compression: <string> | default = ""] | ||
|
||
|
|
||
| # Rate limit for gRPC client; 0 means disabled. | ||
| # CLI flag: -bigtable.grpc-client-rate-limit | ||
| [rate_limit: <float> | default = 0] | ||
|
|
@@ -2265,10 +2270,15 @@ grpc_client_config: | |
| # CLI flag: -ingester.client.grpc-max-send-msg-size | ||
| [max_send_msg_size: <int> | default = 16777216] | ||
|
|
||
| # Use compression when sending messages. | ||
| # Deprecated: Use gzip compression when sending messages. | ||
| # CLI flag: -ingester.client.grpc-use-gzip-compression | ||
| [use_gzip_compression: <boolean> | default = false] | ||
|
|
||
| # Use compression when sending messages. Supported values are: 'gzip', | ||
| # 'snappy' and '' (disable compression) | ||
| # CLI flag: -ingester.client.grpc-compression | ||
| [compression: <string> | default = ""] | ||
|
|
||
| # Rate limit for gRPC client; 0 means disabled. | ||
| # CLI flag: -ingester.client.grpc-client-rate-limit | ||
| [rate_limit: <float> | default = 0] | ||
|
|
@@ -2338,10 +2348,15 @@ grpc_client_config: | |
| # CLI flag: -querier.frontend-client.grpc-max-send-msg-size | ||
| [max_send_msg_size: <int> | default = 16777216] | ||
|
|
||
| # Use compression when sending messages. | ||
| # Deprecated: Use gzip compression when sending messages. | ||
| # CLI flag: -querier.frontend-client.grpc-use-gzip-compression | ||
| [use_gzip_compression: <boolean> | default = false] | ||
|
|
||
| # Use compression when sending messages. Supported values are: 'gzip', | ||
| # 'snappy' and '' (disable compression) | ||
| # CLI flag: -querier.frontend-client.grpc-compression | ||
| [compression: <string> | default = ""] | ||
|
|
||
| # Rate limit for gRPC client; 0 means disabled. | ||
| # CLI flag: -querier.frontend-client.grpc-client-rate-limit | ||
| [rate_limit: <float> | default = 0] | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -105,6 +105,9 @@ func (cfg *Config) Validate() error { | |||||
| if err := cfg.CassandraStorageConfig.Validate(); err != nil { | ||||||
| return errors.Wrap(err, "invalid Cassandra Storage config") | ||||||
| } | ||||||
| if err := cfg.GCPStorageConfig.Validate(util.Logger); err != nil { | ||||||
| return errors.Wrap(err, "invalid GCP Storage Storage config") | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| } | ||||||
| if err := cfg.Swift.Validate(); err != nil { | ||||||
| return errors.Wrap(err, "invalid Swift Storage config") | ||||||
| } | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| package snappy | ||
pstibrany marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| import ( | ||
| "io" | ||
| "sync" | ||
|
|
||
| "github.com/golang/snappy" | ||
| "google.golang.org/grpc/encoding" | ||
| ) | ||
|
|
||
| // Name is the name registered for the snappy compressor. | ||
| const Name = "snappy" | ||
|
|
||
| func init() { | ||
| encoding.RegisterCompressor(newCompressor()) | ||
| } | ||
|
|
||
| type compressor struct { | ||
pracucci marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| writersPool sync.Pool | ||
Wing924 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| readersPool sync.Pool | ||
| } | ||
|
|
||
| func newCompressor() *compressor { | ||
| c := &compressor{} | ||
| c.readersPool = sync.Pool{ | ||
| New: func() interface{} { | ||
| return &reader{ | ||
| compressor: c, | ||
| Reader: snappy.NewReader(nil), | ||
| } | ||
| }, | ||
| } | ||
| c.writersPool = sync.Pool{ | ||
| New: func() interface{} { | ||
| return &writeCloser{ | ||
| compressor: c, | ||
| Writer: snappy.NewBufferedWriter(nil), | ||
| } | ||
| }, | ||
| } | ||
| return c | ||
| } | ||
|
|
||
| func (c *compressor) Name() string { | ||
| return Name | ||
| } | ||
|
|
||
| func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { | ||
| wr := c.writersPool.Get().(*writeCloser) | ||
| wr.Reset(w) | ||
| return wr, nil | ||
| } | ||
|
|
||
| func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { | ||
| dr := c.readersPool.Get().(*reader) | ||
| dr.Reset(r) | ||
| return dr, nil | ||
| } | ||
|
|
||
| type writeCloser struct { | ||
| *compressor | ||
| *snappy.Writer | ||
|
||
| } | ||
|
|
||
| func (w *writeCloser) Close() error { | ||
| defer w.writersPool.Put(w) | ||
| return w.Writer.Close() | ||
| } | ||
|
|
||
| type reader struct { | ||
| *compressor | ||
| *snappy.Reader | ||
|
||
| } | ||
|
|
||
| func (r *reader) Read(p []byte) (n int, err error) { | ||
| n, err = r.Reader.Read(p) | ||
| if err == io.EOF { | ||
| r.readersPool.Put(r) | ||
| } | ||
| return n, err | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -2,11 +2,16 @@ package grpcclient | |||||
|
|
||||||
| import ( | ||||||
| "flag" | ||||||
| "fmt" | ||||||
|
|
||||||
| "github.com/go-kit/kit/log" | ||||||
| "github.com/go-kit/kit/log/level" | ||||||
| grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" | ||||||
| "github.com/pkg/errors" | ||||||
| "google.golang.org/grpc" | ||||||
|
|
||||||
| "github.com/cortexproject/cortex/pkg/util" | ||||||
| "github.com/cortexproject/cortex/pkg/util/flagext" | ||||||
| "github.com/cortexproject/cortex/pkg/util/tls" | ||||||
| ) | ||||||
|
|
||||||
|
|
@@ -15,11 +20,14 @@ type Config struct { | |||||
| MaxRecvMsgSize int `yaml:"max_recv_msg_size"` | ||||||
| MaxSendMsgSize int `yaml:"max_send_msg_size"` | ||||||
| UseGzipCompression bool `yaml:"use_gzip_compression"` | ||||||
|
||||||
| UseGzipCompression bool `yaml:"use_gzip_compression"` | |
| UseGzipCompression bool `yaml:"use_gzip_compression"` // TODO: Remove this deprecated option in v1.6.0. |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CLI flags and YAML config option names should match.
| Compression string `yaml:"compression"` | |
| Compression string `yaml:"grpc_compression"` |
pstibrany marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
pstibrany marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be ... use -%s.grpc-compression now.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would use snappy.Name instead of the hardcoded "snappy" given we have the constant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this made me go back and look why I didn't use gzip.Name, and it turns out it didn't exist at the time.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| compression = "gzip" | |
| compression = gzip.Name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Note that they should be moved to their respective sections)