athena audit logs - sqs receive #24038
Conversation
@hugoShaka @rosstimothy PTAL

@hugoShaka friendly ping
// TODO(tobiaszheller): come back at some point and rework configuration of runWhileLocked.
// Now it tries every 250ms to acquire lock which can cause pressure on backend.
err = backend.RunWhileLocked(ctx, c.backend, lockName, lockTTL, func(ctx context.Context) error {
Is there any reason a single Auth can't process multiple batches in a row? Can we use a longer TTL and just delete the lock if there is no more work?
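A minimal sketch of that idea, assuming a hypothetical processBatch helper (the real code passes its callback to backend.RunWhileLocked, as shown in the quoted code above): the callback keeps draining batches until the queue is empty, so one Auth instance can process several batches under a single, longer-TTL lock acquisition instead of re-acquiring the lock every 250ms.

```go
package athena

import "context"

// drainWhileLocked is a hypothetical sketch of the suggestion above.
// It would be passed as the callback to backend.RunWhileLocked with a
// longer TTL, and it keeps processing batches until there is no more
// work, at which point returning lets the caller release the lock.
func drainWhileLocked(ctx context.Context, processBatch func(context.Context) (int, error)) error {
	for {
		n, err := processBatch(ctx) // n = number of messages handled in this batch
		if err != nil {
			return err
		}
		if n == 0 {
			// Queue is drained; stop and let the lock be released.
			return nil
		}
	}
}
```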
I don't have much to say about the code. My biggest concern is how this thing will fail and how we will know and react. I don't think the RFD discussed backpressure and failure modes; if it did, please point me to the discussion. You can disregard or postpone addressing this comment, I'll approve the PR tomorrow anyway.
We have no guarantee that Auth can consume items faster than they pile up in the queue. That's not an issue per se, but when it happens we need to know how the system is doing: whether it is consuming faster or slower than the event input, is stopped, cannot acquire the lock, etc. I think the following metrics would be a solid starting point:
- batch processing duration (histogram)
- batch size (histogram)
- batch count (histogram) (size vs. count because we can have a lot of small events or a few XXL ones; large batches can lead to memory pressure)
- batch processed (counter)
- last event seen (gauge/timestamp)
This will also allow tuning the batch size and flush interval, and measuring how the system behaves under load.
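For illustration, here is a minimal sketch of how such metrics could be declared with the Prometheus Go client. The package name, metric names, and bucket choices are hypothetical placeholders, not what the PR ships.

```go
package athena

import "github.com/prometheus/client_golang/prometheus"

// Hypothetical metric definitions covering the list above.
var (
	batchProcessingDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "audit_sqs_batch_processing_seconds",
		Help:    "Time spent processing a single batch of SQS messages.",
		Buckets: prometheus.DefBuckets,
	})
	batchSizeBytes = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "audit_sqs_batch_size_bytes",
		Help:    "Total size in bytes of the events in a processed batch.",
		Buckets: prometheus.ExponentialBuckets(1024, 4, 10),
	})
	batchEventCount = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "audit_sqs_batch_event_count",
		Help:    "Number of events in a processed batch.",
		Buckets: prometheus.ExponentialBuckets(1, 2, 12),
	})
	batchesProcessed = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "audit_sqs_batches_processed_total",
		Help: "Total number of batches processed.",
	})
	lastEventSeen = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "audit_sqs_last_event_seen_timestamp_seconds",
		Help: "Unix timestamp of the most recently received event.",
	})
)

func init() {
	// Register everything with the default registry so it is exposed
	// on the metrics endpoint.
	prometheus.MustRegister(batchProcessingDuration, batchSizeBytes,
		batchEventCount, batchesProcessed, lastEventSeen)
}
```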
maxWaitTimeOnReceiveMessageFromSQS = 5 * time.Second
// maxNumberOfWorkers defines how many workers are processing messages
// from queue or writing parquet files to s3.
maxNumberOfWorkers = 5
Why was the number 5 chosen?
I hardcoded 5 for now based on gut feeling. In the future this number should probably depend on how many items are in the queue. 5 workers on my dev machine were enough to handle the max load defined in the cloud RFD (250 events/s if I remember correctly).
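As a rough illustration of making that configurable (names here are hypothetical; the PR itself just uses the maxNumberOfWorkers constant), the worker count could become a parameter that is later derived from queue depth:

```go
package athena

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// runWorkers is a hypothetical sketch: the worker count is a parameter
// (which could later be derived from queue depth) instead of a
// hardcoded constant. receiveAndProcess stands in for the real
// per-worker receive loop.
func runWorkers(ctx context.Context, numWorkers int, receiveAndProcess func(context.Context) error) error {
	g, ctx := errgroup.WithContext(ctx)
	for i := 0; i < numWorkers; i++ {
		g.Go(func() error {
			return receiveAndProcess(ctx)
		})
	}
	// Wait returns the first error from any worker; the shared ctx is
	// canceled when that happens.
	return g.Wait()
}
```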
@hugoShaka thanks for raising it, and sorry for not making it clear in the description. I am planning to come back in later PRs to add multiple metrics and replace the debug messages. For the metrics PRs I was planning to involve the Cloud team and get their insights, because they will be monitoring those. Some metrics are available from AWS out of the box, but I am not sure whether the Cloud team prefers to use the AWS ones or whether we should publish our own. We will also utilize a dead-letter queue for messages that cannot be processed. This PR is already complex, so I think pushing metrics to another one is reasonable.
@russjones @rosstimothy I have decided to move the "locking" part into another PR and keep only the reading from SQS here.
@rosstimothy |
@tobiaszheller See the table below for backport results.
Part of https://github.com/gravitational/teleport.e/issues/894
RFD: #23700
This PR adds a receiver of SQS messages for audit logs.
It uses a channel to send messages from "receiver workers" to "s3 workers", which will be responsible for writing parquet files to s3. Note that the "s3 workers" are not part of this PR and will be added in a separate one.
A channel is used because we want to start writing the parquet file as soon as we receive the first events, even though we keep listening for the whole batch interval.
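As a rough illustration of that flow (the message type, runReceiver, and receive below are hypothetical placeholders, not the PR's actual identifiers):

```go
package athena

import "context"

// message is a placeholder for a parsed audit event received from SQS.
type message struct {
	payload []byte
}

// runReceiver is a hypothetical sketch of the pipeline: receiver
// workers push into a shared channel as soon as messages arrive, so
// the (future) s3 workers can start writing parquet data before the
// whole batch interval has elapsed.
func runReceiver(ctx context.Context, receive func(context.Context) ([]message, error)) <-chan message {
	messagesChan := make(chan message, 100)
	go func() {
		defer close(messagesChan)
		for ctx.Err() == nil {
			msgs, err := receive(ctx)
			if err != nil {
				continue // real code would log the error and back off
			}
			for _, m := range msgs {
				select {
				case messagesChan <- m:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return messagesChan
}
```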