Add rate limiter option to Cassandra writer (WIP) #1147
8dcca83
to
879b240
Compare
Few questions:
|
I had considered that as well. I think it makes sense. The only issue is that config tends to be database specific, so the main change will probably still be in the Cassandra config (unless you'd like me to add to all database implementations immediately).
I'd love to use uber-go/ratelimit, but it only supports integer rates, not floating-point rates (not sure why). If I can get a PR through to them, that would be the best solution. Then again, it would probably be a breaking change to their API.
What makes floating-point rates indispensable for us?
I don't envision us having a rate per ingester of less than one message per second, so integer rates seem fine.
OK, cool. I can remove the rate limiter too.
Actually, ratelimit is broken. Here is the error:
Looks like the package is not really maintained based on the history and lack of versions. |
Codecov Report
@@ Coverage Diff @@
## master #1147 +/- ##
======================================
Coverage 100% 100%
======================================
Files 159 160 +1
Lines 7145 7162 +17
======================================
+ Hits 7145 7162 +17
Continue to review full report at Codecov.
|
Can someone please rerun the tests? The ES unit test failed.
I find that hard to believe given that it's being used by multiple other teams at Uber. Looks like the issue is more of a vendoring issue.
Interesting thought. I'm surprised it works. It seems to use internal package types in its public interface. I'll try again and let you know.
// rate limiter before each write.
func (r *rateLimitedWriter) WriteSpan(s *model.Span) error {
	const cost = 1
	for ok, waitTime := r.l.CheckCredit(cost); !ok; ok, waitTime = r.l.CheckCredit(cost) {
Won't
for ok, waitTime := r.l.CheckCredit(cost); !ok {
time.Sleep(waitTime)
work? Not sure why the second call is needed
Well, let's imagine Java, C++, or whatever other language:
for (bool ok = r.l.CheckCredit(cost); !ok; ok = r.l.CheckCredit(cost)) {
// ...
}
The first statement is a declaration/initialization statement that runs once. The last statement runs after every iteration, so it is what refreshes ok before the condition is checked again. Not 100% sure, but I imagine Go works the same way.
Ya, this explains it: https://tour.golang.org/flowcontrol/1.
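To make the point about the second call concrete, here is a minimal sketch of the three-clause loop. The fakeLimiter type is hypothetical and only imitates the CheckCredit shape quoted in the diff above; it is not the limiter used in this PR.

```go
package main

import (
	"fmt"
	"time"
)

// fakeLimiter is a hypothetical stand-in for the limiter under review.
// It refuses credit for the first `denials` calls and grants it afterwards.
type fakeLimiter struct {
	denials int
	calls   int
}

// CheckCredit reports whether credit is available and, if not, how long to wait.
func (l *fakeLimiter) CheckCredit(cost float64) (bool, time.Duration) {
	l.calls++
	if l.calls <= l.denials {
		return false, time.Millisecond
	}
	return true, 0
}

// waitForCredit blocks until credit is granted and returns how often it slept.
func waitForCredit(l *fakeLimiter, cost float64) int {
	sleeps := 0
	// The init statement runs once. The post statement runs after every
	// iteration and refreshes ok and waitTime before the condition is
	// evaluated again. Without it, ok would never change.
	for ok, waitTime := l.CheckCredit(cost); !ok; ok, waitTime = l.CheckCredit(cost) {
		time.Sleep(waitTime)
		sleeps++
	}
	return sleeps
}

func main() {
	l := &fakeLimiter{denials: 2}
	fmt.Println(waitForCredit(l, 1), l.calls) // prints "2 3"
}
```

Two denials lead to two sleeps and three CheckCredit calls in total: one from the init statement and two from the post statement.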
plugin/storage/cassandra/options.go
Outdated
@@ -48,6 +48,8 @@ const (

	// common storage settings
	suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
	suffixWritesPerSecond = ".writer-rate-limit.operations-per-second"
I'm OK with this for now, but I think these configs should be storage-type agnostic (i.e., ES can also use the rate limiter, but we shouldn't have to pollute ES storage configs with .writer-rate-limit.operations-per-second and .writer-rate-limit.max-burst).
I wish the configs generally were less storage specific. I'm not sure if we share any config at the moment.
plugin/storage/cassandra/options.go
Outdated
// rateLimitConfig defines common arguments to a rate limiter.
type rateLimitConfig struct {
	operationsPerSecond float64
	maxBurst float64
Is this config required?
Meant to delete this. Thanks for the reminder.
	require.Nil(t, decoratedWriter)
}

func TestRateLimitedWriterUnlimitedWritesPerSecond(t *testing.T) {
I feel that this test doesn't test anything additional over TestRateLimitedWriter. Could this be made better?
(If not, it may make sense to implement a no-op Limiter inside our decorator so that we can test it better.)
Ya. I didn't want to rely on timing alone, seeing how flaky that can be. Will try mocking soon.
Not a huge fan of type testing, but tried it out in latest revision.
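For illustration, one way to test the decorator without relying on wall-clock time is to inject a recording test double. This is only a sketch: Span, Writer, and Limiter below are simplified, hypothetical stand-ins, and the Limiter interface is assumed to expose just Take().

```go
package main

import "fmt"

// Span and Writer are simplified stand-ins for model.Span and spanstore.Writer.
type Span struct{ OperationName string }

type Writer interface {
	WriteSpan(s *Span) error
}

// Limiter is an assumed minimal interface; the real limiter API may differ.
type Limiter interface {
	Take()
}

// recorder acts as both limiter and writer and logs the order of events,
// so a test can assert on behavior without measuring elapsed time.
type recorder struct{ events []string }

func (r *recorder) Take() { r.events = append(r.events, "take") }

func (r *recorder) WriteSpan(s *Span) error {
	r.events = append(r.events, "write:"+s.OperationName)
	return nil
}

// rateLimitedWriter consults the limiter before delegating each write.
type rateLimitedWriter struct {
	writer  Writer
	limiter Limiter
}

func (w *rateLimitedWriter) WriteSpan(s *Span) error {
	w.limiter.Take()
	return w.writer.WriteSpan(s)
}

func main() {
	rec := &recorder{}
	w := &rateLimitedWriter{writer: rec, limiter: rec}
	_ = w.WriteSpan(&Span{OperationName: "a"})
	fmt.Println(rec.events) // prints "[take write:a]"
}
```

The assertion is on call order (limiter first, then the inner writer), which stays deterministic regardless of scheduler timing.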
plugin/storage/cassandra/factory.go
Outdated
@@ -28,6 +28,7 @@ import (
	"github.com/jaegertracing/jaeger/storage"
	"github.com/jaegertracing/jaeger/storage/dependencystore"
	"github.com/jaegertracing/jaeger/storage/spanstore"
	"github.com/jaegertracing/jaeger/storage/spanstore/decorator"
Why the "decorator" package? Metrics is also a decorator, and it's in its own package. I suggest spanstore/ratelimited instead.
plugin/storage/cassandra/factory.go
Outdated
	return decorator.NewRateLimitedWriter(
		writer,
		f.Options.writesPerSecond,
if f.Options.writesPerSecond == 0, do we need to wrap?
Lol see what @vprithvi suggested earlier. We are wrapping with noop rate limiter in that case.
if we have an if statement, I'd rather have it here and not add two more layers of indirection. Then in your rate limited writer, return an error if rate < 1.
Oh man. I want to wait on @vprithvi's response because this is literally what I was wondering above.
Good idea. I wasn't sure if a passthrough would hurt performance due to the indirection.
I have a preference towards having this logic be in the rate limiter because it means that it doesn't need to be in every storage implementation. However, it's not a strong preference.
That's a separate concern though, the config and application of rate limiter is currently in the cassandra factory, but we could easily pull it up to the higher level (meta) factory (I just don't think we need to do it right now). This keeps the rate limiter simpler - you can only instantiate it with a positive rate.
I'm wondering when we care about performance of indirection, etc. But I'll go back to the original if statement here.
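The option settled on in this thread could look roughly like the sketch below: the factory skips wrapping when the rate is zero, and the constructor rejects non-positive rates. All types and the createSpanWriter helper are hypothetical simplifications, not the code in this PR, and the actual limiter call is omitted.

```go
package main

import (
	"errors"
	"fmt"
)

// Span and Writer are simplified stand-ins for model.Span and spanstore.Writer.
type Span struct{}

type Writer interface {
	WriteSpan(s *Span) error
}

// plainWriter stands in for the unwrapped storage writer.
type plainWriter struct{}

func (plainWriter) WriteSpan(*Span) error { return nil }

var errInvalidRate = errors.New("rate must be a positive integer")

type rateLimitedWriter struct {
	writer Writer
	rate   int
}

func (w *rateLimitedWriter) WriteSpan(s *Span) error {
	// A real implementation would block on a limiter here before delegating.
	return w.writer.WriteSpan(s)
}

// NewRateLimitedWriter can only be instantiated with a positive rate.
func NewRateLimitedWriter(writer Writer, rate int) (Writer, error) {
	if rate <= 0 {
		return nil, errInvalidRate
	}
	return &rateLimitedWriter{writer: writer, rate: rate}, nil
}

// createSpanWriter mirrors the factory-side check: a rate of zero means
// "no limit", so the writer is returned unwrapped.
func createSpanWriter(writesPerSecond int) (Writer, error) {
	var writer Writer = plainWriter{}
	if writesPerSecond == 0 {
		return writer, nil
	}
	return NewRateLimitedWriter(writer, writesPerSecond)
}

func main() {
	w, _ := createSpanWriter(0)
	_, limited := w.(*rateLimitedWriter)
	fmt.Println(limited) // prints "false"

	_, err := createSpanWriter(-1)
	fmt.Println(err) // prints "rate must be a positive integer"
}
```

Keeping the zero check in the factory avoids a pass-through layer, at the cost of repeating the check in any other storage factory that adopts the limiter, which is the trade-off debated above.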
type rateLimitedWriter struct {
	w spanstore.Writer
	l ratelimit.Limiter
Please use full field names. One letter names are ok for local variables when there is context.
Got it
ES integration test again! |
Job restarted |
plugin/storage/cassandra/options.go
Outdated
@@ -111,6 +113,9 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
	flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL,
		opt.SpanStoreWriteCacheTTL,
		"The duration to wait before rewriting an existing service or operation name")
	flagSet.Int(opt.primary.namespace+suffixWritesPerSecond,
		opt.writesPerSecond,
		"The number of writes per second using rate limiter")
Optional upper limit on the number of span writes per second.
That's better.
@@ -57,6 +58,7 @@ type Options struct {
	primary *namespaceConfig
	others map[string]*namespaceConfig
	SpanStoreWriteCacheTTL time.Duration
	writesPerSecond int
Shouldn't this be public? And qualified with the SpanStore prefix.
IDK why SpanStoreWriteCacheTTL is public either. There is no serialization here. I think it might be a workaround to pass options to another package that needs to read SpanStoreWriteCacheTTL, but that doesn't make much sense given that passing the int is just as effective.
How are we using this internally? SpanStoreWriteCacheTTL is an externally settable option.
Both of these are initialized by command-line flags, so neither should be public. I did a code search and could not find a reference to SpanStoreWriteCacheTTL (not even in our internal Uber repo). Seems to be vestigial from some point where we needed access from our internal repo into this part of the OSS.
We don't use cli flags internally. If we need to set these two fields how would we do it without them being public? Did you try your branch without our internal main?
$ git grep -l SpanStoreWriteCacheTTL
plugin/storage/cassandra/factory.go
plugin/storage/cassandra/options.go
@@ -82,4 +83,6 @@ func TestOptionsWithFlags(t *testing.T) {
	assert.Equal(t, "", aux.Consistency, "aux storage does not inherit consistency from primary")
	assert.Equal(t, 3, aux.ProtoVersion)
	assert.Equal(t, 42*time.Second, aux.SocketKeepAlive)

	assert.Equal(t, 10, opts.writesPerSecond)
move to L73
	"github.com/jaegertracing/jaeger/storage/spanstore"
)

var errInvalidRate = errors.New("rate must be a non-zero positive integer")
positive already implies non-zero
couldn't we do the metrics in a separate PR? This is a scope creep.
	if rate <= 0 {
		return nil, errInvalidRate
	}

	m := metricsFactory.Namespace("rate-limited-writer", nil)
is using dashes in metric names consistent with the rest of the code?
I copied the format from the storage/spanstore/metrics/writer_metrics.go file, but I cannot guarantee that file is correct.
// WriteSpan wraps the write span method of the inner writer, but invokes the
// rate limiter before each write.
func (r *rateLimitedWriter) WriteSpan(s *model.Span) error {
	r.limiter.Take()
	const threshold = 100 * time.Microsecond
This looks like a complete hack with an arbitrary threshold. We should already have a write latency metric from the main writer. You can add another timer here that would measure full latency (wait + actual write) and emit that.
As for the counter, unless the uber-go limiter returns a value indicating whether there was any sleep, I don't think we should be trying to reverse-engineer it with some arbitrary thresholds.
I happen to agree wholeheartedly. Team suggested I add a metric with number of delayed spans. The current rate limiter API makes this essentially impossible. IMO the real issue is that the rate limiter API has limited use in its current form. I believe it would have made more sense to return the duration slept as opposed to the timestamp of the wake time.
I just added metrics here while testing. I understand the concern regarding scope creep, and will move it to another PR soon. |
ade3fde
to
24aea8b
Compare
24aea8b
to
891eef8
Compare
Signed-off-by: Isaac Hier <[email protected]>
Signed-off-by: Isaac Hier <[email protected]>
Signed-off-by: Isaac Hier <[email protected]>
0b60e6f
to
2f11922
Compare
I think this is solved differently by #1353 |