Intake protocol v2 #1260
Conversation
I started the APM Server and sent one metadata and one transaction request.
After the transaction showed up in ES, the CPU usage of the APM Server went up to 200% although idle. I haven't looked into the reasons for this.
EDIT: I printed the size of the publisher's queue p.pendingRequests, and the queue keeps getting filled up with entries. They are all non-transformable, though, so nothing can really get processed, and CPU load increases.
beater/route_config.go
sourcemapRouteType = routeType{
	sourcemapHandler,
	backendMetadataDecoder,
	rumTransformConfig,
Why would the sourcemapRouteType need the sourcemap config? This is for uploading, afaik.
It just needs config.SmapMapper to be set; rumTransformConfig does that.
beater/route_config.go
	authHandler(beaterConfig.SecretToken, h)))
}

func backendMetadataDecoder(beaterConfig *Config, d decoder.ReqDecoder) decoder.ReqDecoder {
How about calling these systemMetadataDecoder and userMetaDataDecoder instead of backend and rum?
type v2Route struct {
	routeType
}
I think the type definition and the methods on it should be in the same file.
Update: @graphaelli and I looked into the issues, and it seems that the server is not handling the read_timeout properly. The requeuing of non-existent requests starts when the read timeout is hit.
var err error
var rawModel map[string]interface{}

eventables := []transform.Transformable{}
If you use var eventables []transform.Transformable instead of creating an empty array, then the previously observed CPU load and queuing of non-existent events disappear. I still think there are more issues here, though, as EOF seems not to be read properly.
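For context, here is a minimal standalone sketch of the nil-slice vs. empty-slice distinction behind this suggestion (a generic illustration, not the handler's actual code):

```go
package main

import "fmt"

func main() {
	var a []int  // nil slice: declared but never allocated
	b := []int{} // empty but non-nil slice literal

	fmt.Println(a == nil, len(a)) // true 0
	fmt.Println(b == nil, len(b)) // false 0

	// Code that gates publishing on `s != nil` treats the two
	// differently: the empty literal is still enqueued as a
	// zero-event batch, while the nil slice is skipped.
	// Checking len(s) > 0 handles both cases.
	for _, s := range [][]int{a, b} {
		if len(s) > 0 {
			fmt.Println("publish batch of", len(s), "events")
		}
	}
}
```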
Thanks to @graphaelli for pointing out that the CPU usage occurs in this method.
Thanks @roncohen for putting up this WIP PR to get some early feedback!
I appreciate the cleanness of the PR, and I really like having the route_config separated and how few changes are necessary in the models!
As this is WIP, I assume you're going to add processor/package_tests for v2.
Are you planning on introducing JSON schemas for v2? E.g. in v1, spans were nested in transactions, which would not be allowed any longer in v2. With the current schemas this would still be possible.
Afaik the plan is to open a new HTTP connection per bulk request. In any case, one bulk request potentially holds a lot of data, which is currently read, decoded and validated one item after the other by a single thread. The reading and decoding part has a high impact on throughput. Have you thought about something like using one goroutine to read from the stream but multiple goroutines to do the decoding and validation, passing the results back to the one blocking goroutine? (Of course this does not need to be included in this PR, but I would like to hear your thoughts about it.)
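A rough, self-contained sketch of that shape, assuming newline-delimited JSON input (all names here are made up; a real version would also need per-line error reporting and, if required, result ordering):

```go
package main

import (
	"bufio"
	"encoding/json"
	"runtime"
	"strings"
	"sync"
)

// decodeParallel reads lines on a single goroutine, fans them out to
// NumCPU decoder workers, and collects results on the caller.
func decodeParallel(sc *bufio.Scanner) []map[string]interface{} {
	lines := make(chan []byte)
	decoded := make(chan map[string]interface{})

	var wg sync.WaitGroup
	for i := 0; i < runtime.NumCPU(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for raw := range lines {
				var m map[string]interface{}
				if json.Unmarshal(raw, &m) == nil { // validation would also happen here
					decoded <- m
				}
			}
		}()
	}

	go func() { // single reader goroutine feeding the workers
		for sc.Scan() {
			buf := make([]byte, len(sc.Bytes()))
			copy(buf, sc.Bytes()) // Scanner reuses its buffer
			lines <- buf
		}
		close(lines)
		wg.Wait()
		close(decoded)
	}()

	var out []map[string]interface{}
	for m := range decoded {
		out = append(out, m)
	}
	return out
}

func main() {
	sc := bufio.NewScanner(strings.NewReader("{\"a\":1}\n{\"b\":2}\n"))
	_ = decodeParallel(sc)
}
```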
beater/v2_handler.go
for k, v := range reqMeta {
	utility.MergeAdd(rawMetadata, k, v.(map[string]interface{}))
}
Why would this be necessary here? I understood that the requestDecoder takes care of bringing data into a map[string]interface{} format and augmenting it if necessary. In the next step we pass those data on to validation and Go struct decoding.
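To make that division of labor concrete, here is a hypothetical standalone sketch of a decorating request decoder (the ReqDecoder type mirrors the shape implied above; withSystemMetadata and the augmented keys are invented for illustration):

```go
package main

import (
	"fmt"
	"net/http"
)

// ReqDecoder turns an incoming request into a raw event map
// (a standalone stand-in for the decoder package's type).
type ReqDecoder func(req *http.Request) (map[string]interface{}, error)

// withSystemMetadata augments the decoded map with request-derived
// metadata before validation and struct decoding run.
func withSystemMetadata(d ReqDecoder) ReqDecoder {
	return func(req *http.Request) (map[string]interface{}, error) {
		m, err := d(req)
		if err != nil {
			return nil, err
		}
		m["system"] = map[string]interface{}{"ip": req.RemoteAddr}
		return m, nil
	}
}

func main() {
	base := func(*http.Request) (map[string]interface{}, error) {
		return map[string]interface{}{}, nil
	}
	req, _ := http.NewRequest("POST", "/intake/v2/events", nil)
	m, _ := withSystemMetadata(base)(req)
	fmt.Println(m)
}
```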
We discussed offline and decided to defer this to subsequent PRs.
Great feedback, thanks @simitt! I think parallelizing decoding makes sense, but I might leave it for a follow-up PR. When we do it, we will need to benchmark it to make sure there are significant speedups. I'll open an issue if it doesn't go into this PR.
Created a follow-up task for the v2 JSON spec changes: #1276.
model/span/span.go
@@ -56,7 +65,7 @@ type Span struct {
	TransactionId string
}

-func DecodeSpan(input interface{}, err error) (*Span, error) {
+func DecodeSpan(input interface{}, err error) (transform.Transformable, error) {
You need to add setting the Timestamp to the DecodeSpan method, as for v2 the spans are not sent within transactions any more.
There was an agreement that if the Timestamp is not set, it should be set to the time that the HTTP request from the agent was first received.
Instead of setting the request time during decoding, I'm now doing it during transformation. I added a request time to the transformation context, which is used to set the timestamp if it's absent.
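Schematically, the fallback could look like this (Context, RequestTime, and the Transform signature are assumptions for illustration, not the PR's actual definitions):

```go
package main

import (
	"fmt"
	"time"
)

// Context carries request-scoped data through transformation.
type Context struct {
	RequestTime time.Time // when the agent's HTTP request was first received
}

type Span struct {
	Timestamp time.Time
}

// Transform falls back to the request time when the agent did not
// send a timestamp of its own.
func (s *Span) Transform(ctx *Context) {
	if s.Timestamp.IsZero() {
		s.Timestamp = ctx.RequestTime
	}
}

func main() {
	s := &Span{} // no timestamp sent
	s.Transform(&Context{RequestTime: time.Now()})
	fmt.Println(s.Timestamp)
}
```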
beater/v2_handler.go
for {
	transformables, done := v.readBatch(batchSize, ndReader, resp)
	if transformables != nil {
This would still report empty arrays (if no data were read or a read error occurred, an empty array can be returned), which doesn't cause errors but is unnecessary; please see https://github.com/elastic/apm-server/pull/1260/files/fb77d985e9ee8623290bc23839542e9b318472c8#r208983595
{"metadata.json", "model/metadata/generated/schema/metadata.go", "ModelSchema"}, | ||
{"errors/error.json", "model/error/generated/schema/error.go", "ModelSchema"}, | ||
{"transactions/transaction.json", "model/transaction/generated/schema/transaction.go", "ModelSchema"}, | ||
{"transactions/span.json", "model/span/generated/schema/transaction.go", "ModelSchema"}, |
This should be span: the generated schema path points at transaction.go.
I suggest creating a separate v2 branch upstream and pointing this PR against it. There are a couple of open tasks that need to be solved before v2 should be used. While I think this PR introduces a lot of good changes, merging it into master does not feel right. This is such a big undertaking that having a separate feature branch that we keep up to date with changes in master seems like the right way forward.
docs/spec/span/span.json
@@ -1,5 +1,5 @@
{
Can you please rename this to docs/spec/spans/span.json, as the other folders are also pluralized.
beater/stream_response_test.go
sr.add(QueueFullErr, 23)

jsonOut, err := sr.marshal()
assert.NoError(t, err)
Could you use require.xxx instead of assert.xxx wherever the test should stop if the condition is not fulfilled?
(see https://godoc.org/github.com/stretchr/testify/require)
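For illustration, a small self-contained test showing the difference (the test name and payload are made up):

```go
package beater

import (
	"encoding/json"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestRequireVsAssert(t *testing.T) {
	out, err := json.Marshal(map[string]int{"accepted": 1})
	// require aborts the test on failure, so the code below never
	// runs against invalid output.
	require.NoError(t, err)

	var back map[string]int
	// assert records a failure but keeps going, which can cascade
	// into confusing follow-on failures.
	assert.NoError(t, json.Unmarshal(out, &back))
	assert.Equal(t, 1, back["accepted"])
}
```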
beater/stream_response_test.go
	}
}
}`
expectedJSON = strings.Replace(strings.Replace(expectedJSON, "\n", "", -1), "\t", "", -1)
I don't think we should defer this, and you already introduce the tests here. Changing this to use the approvals would require something like:

	...
	var jsonOut map[string]interface{}
	outByte, err := sr.marshal()
	require.NoError(t, err)
	err = json.Unmarshal(outByte, &jsonOut)
	require.NoError(t, err)
	verifyErr := tests.ApproveJson(jsonOut, "testStreamResponseSimple", nil)
	if verifyErr != nil {
		assert.Fail(t, fmt.Sprintf("Test %s failed with error: %s", "testStreamResponseSimple", verifyErr.Error()))
	}

As this is not a huge effort, I'd appreciate having it changed.
beater/route_config.go
func v2backendHandler(beaterConfig *Config, h http.Handler) http.Handler {
	return logHandler(
		requestTimeHandler(
			authHandler(beaterConfig.SecretToken, h)))
The concurrencyLimitHandler ensured that the server could not get overwhelmed with requests. While I see that the handling needs to change for v2, I am not sure it can simply be removed.
What if an agent is configured to create a separate HTTP request at very short intervals and sends huge payloads per event (e.g. large stack traces)? This could still cause severe issues on the server, especially when running on a small box.
I see what you mean. However, the current default of 5 makes it impossible to support more than 5 agents in v2 out of the box without changing the default value, which would affect v1 negatively. I think the solution is to create a limited number (runtime.NumCPU) of separate goroutines that read from the streams. That would also parallelize decoding and validation. I'll take a look.
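For reference, a generic standalone sketch of such a concurrency cap (a buffered channel used as a counting semaphore; the names and the reject-with-429 behavior are illustrative, not apm-server's actual middleware):

```go
package main

import (
	"net/http"
	"runtime"
)

// limitConcurrency caps how many requests are handled at once;
// excess requests are rejected immediately.
func limitConcurrency(limit int, h http.Handler) http.Handler {
	sem := make(chan struct{}, limit)
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case sem <- struct{}{}: // acquire a slot
			defer func() { <-sem }() // release it when done
			h.ServeHTTP(w, r)
		default:
			http.Error(w, "too many requests", http.StatusTooManyRequests)
		}
	})
}

func main() {
	ok := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok"))
	})
	http.ListenAndServe(":8200", limitConcurrency(runtime.NumCPU(), ok))
}
```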
I've now changed the base branch to a v2 branch. With that in mind, I would like to defer this to a separate PR. v2 is already much safer memory-wise than v1, even in the "edge case" you mention here. I don't think this particular issue should hold up the many other issues that are waiting for this PR.
Merging this to a v2 branch instead of master relaxes this, and I agree we can defer it. Could you please create and link the issue so we don't forget?
Added it to this one: #1285
Let me know if you think there's more we need to do here, @simitt.
Could you link all unchecked notes to the corresponding GitHub tasks, please?
@@ -0,0 +1,328 @@
// Licensed to Elasticsearch B.V. under one or more contributor
errors and metrics are missing completely from these tests. There is an open issue for handling errors, but none for metrics. Please add an issue or some tests.
Missed this one. We have an issue for adding tests for all the different event types: #1288. I don't know if that's what you had in mind, though.
Errors and metrics are tested here by the way: https://github.com/elastic/apm-server/pull/1260/files/0d3c4cadfc672abafbf469ca5e3b960babfca8ad#diff-44977a4ead5422850c78d1922785dc2bR168
Is that what you had in mind?
This introduces two new endpoints, one for backend systems and one for RUM, which use the new intake v2 format.
TODO:
- Finalize processor/package_tests style tests for v2 (deferred: Intake v2: Finalize processor/package_tests style tests #1288)
- Investigate "full queue" behavior (reporting timeout is 1s atm., which is probably too high) (deferred: v2: Investigate "full queue" behavior #1298)
- Investigate which limits/configs are useful for intake v2 (deferred: v2: Investigate which limits/configs are useful for intake v2 #1299)
- Consider unsetting/increasing read_timeout when using v2 to avoid timeouts (also handled by #1299)

Closes #1238
Config options of note:
On the server there are some config options that are interesting for v2:
- read_timeout: if the agent doesn't complete its request within the read_timeout, atm. APM Server will force close the connection with an internal error
- max_unzipped_size: if the unzipped size surpasses max_unzipped_size, the rest will be rejected
- the concurrent requests limit: for v2, this will be the maximum number of agents that we can receive data from. We should increase it significantly or disable it entirely for v2, as we no longer need to keep it low to conserve memory