[ML] handles compressed model stream from native process #58009
Conversation
Force-pushed c772433 to f3ccd19 (…ics-handle-compressed-model-stream)
Pinging @elastic/ml-core (:ml)
…ics-handle-compressed-model-stream
run elasticsearch-ci/2
hendrikmuhs left a comment:
added some comments
Resolved review threads:
- ...ClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedMoodelPersisterIT.java (outdated)
- ...rc/main/java/org/elasticsearch/xpack/ml/inference/persistence/TrainedModelDefinitionDoc.java
- ...src/main/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersister.java (outdated)
    Consumer<Exception> failureHandler,
    ExtractedFields extractedFields) {
        this.provider = provider;
        this.currentModelId = new AtomicReference<>("");
Why not initialize it empty?
It is empty?
Well... I mean `new AtomicReference<>()`.
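The distinction being discussed is between seeding the AtomicReference with an empty string and leaving it unset. A minimal sketch (class name hypothetical, not the PR's code):

```java
import java.util.concurrent.atomic.AtomicReference;

public class AtomicRefInitSketch {
    public static void main(String[] args) {
        // Seeded with an empty string: get() returns "" until a model id is set.
        AtomicReference<String> seeded = new AtomicReference<>("");
        // No-arg constructor: get() returns null until a model id is set.
        AtomicReference<String> unseeded = new AtomicReference<>();

        System.out.println(seeded.get().isEmpty());
        System.out.println(unseeded.get() == null);
    }
}
```

With the no-arg form, callers must null-check before any string operation on the current model id, which is presumably why the seeded form was chosen.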
Resolved review threads:
- ...src/main/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersister.java (outdated)
- ...src/main/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersister.java (outdated)
    try {
        readyToStoreNewModel = false;
        if (latch.await(30, TimeUnit.SECONDS) == false) {
            LOGGER.error("[{}] Timed out (30s) waiting for inference model metadata to be stored", analytics.getId());
If this happens, it seems the persister can get stuck, because the doc never gets stored and readyToStoreNewModel is never reset? Correct me if I am wrong.
Yeah, it should reset. Good catch.
I do not think it should switch back in this timeout check. If it times out, it just took a long time to persist.
If the persistence itself fails, then I will reset the boolean flag.
Similar behavior for the persistence of the definition docs, the exception being: if the definition doc is the eos, then I will reset the flag.
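The behavior agreed on above can be sketched as follows: a timeout alone does not flip the flag back; only a persistence failure (or the eos doc) does. Names are hypothetical, not the PR's actual code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class FlagResetSketch {
    final AtomicBoolean readyToStoreNewModel = new AtomicBoolean(true);

    // Returns true if the store completed before the timeout.
    boolean awaitStore(CountDownLatch latch, long timeoutSec) throws InterruptedException {
        readyToStoreNewModel.set(false);
        if (latch.await(timeoutSec, TimeUnit.SECONDS) == false) {
            // Timed out: persistence may still finish later, so do NOT
            // reset the flag here; it just took a long time.
            return false;
        }
        return true;
    }

    // Called by the failure handler (or on the eos doc): only here does
    // the flag go back to true so a new model can be stored.
    void onPersistenceFailureOrEos() {
        readyToStoreNewModel.set(true);
    }
}
```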
tveasey left a comment:
Looks good (the expected format all looks correct from the C++ side), just a few minor comments.
Resolved review threads:
- ...ClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedMoodelPersisterIT.java (outdated)
- ...ClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedMoodelPersisterIT.java (outdated)
- ...src/main/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersister.java (outdated)
- ...est/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersisterTests.java
- ...ClusterTest/java/org/elasticsearch/xpack/ml/integration/ChunkedTrainedMoodelPersisterIT.java (outdated)
- ...src/main/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersister.java (outdated)
- ...src/main/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersister.java (outdated)
- ...src/main/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersister.java (outdated)
- .../ml/src/main/java/org/elasticsearch/xpack/ml/inference/persistence/TrainedModelProvider.java (outdated)
…ics-handle-compressed-model-stream
tveasey left a comment:
Nice tidy up! However, I think the readyToStoreNewModel flag gets reset too soon.
Resolved review thread:
- ...src/main/java/org/elasticsearch/xpack/ml/dataframe/process/ChunkedTrainedModelPersister.java (outdated)
tveasey left a comment:
LGTM
davidkyle left a comment:
LGTM
    .setCompressedString(chunks.get(i))
    .setCompressionVersion(TrainedModelConfig.CURRENT_DEFINITION_COMPRESSION_VERSION)
    .setDefinitionLength(chunks.get(i).length())
    .setEos(i == chunks.size() - 1)
IntStream.range is end-exclusive, so we will never get to i == chunks.size() - 1.
Should it be set to false, since eos is set on the last doc in the list on line 223?
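The end-exclusive behavior of IntStream.range can be checked directly; a small sketch with hypothetical chunk values:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RangeExclusivitySketch {
    public static void main(String[] args) {
        List<String> chunks = List.of("c0", "c1", "c2");

        // range(0, n) yields indices 0 .. n-1; the end bound itself is excluded.
        List<Integer> full = IntStream.range(0, chunks.size())
            .boxed().collect(Collectors.toList());

        // With an upper bound of chunks.size() - 1, the last index (2) is
        // never produced, so a predicate like i == chunks.size() - 1 can
        // never be true inside that loop.
        List<Integer> truncated = IntStream.range(0, chunks.size() - 1)
            .boxed().collect(Collectors.toList());

        System.out.println(full);      // indices 0, 1, 2
        System.out.println(truncated); // indices 0, 1
    }
}
```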
@elasticmachine update branch
…ics-handle-compressed-model-stream
hendrikmuhs left a comment:
LGTM, two more suggestions.
    CountDownLatch latch = storeTrainedModelDoc(trainedModelDefinitionDoc);
    try {
        if (latch.await(STORE_TIMEOUT_SEC, TimeUnit.SECONDS) == false) {
            LOGGER.error("[{}] Timed out (30s) waiting for chunked inference definition to be stored", analytics.getId());
Nit: now that STORE_TIMEOUT_SEC is a constant, the log message can use it as an argument (in other places, too).
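The nit amounts to interpolating the constant instead of hardcoding "30s". A minimal sketch using String.format rather than the project's actual logger (class and method names hypothetical):

```java
public class TimeoutMessageSketch {
    static final long STORE_TIMEOUT_SEC = 30;

    // Builds the message from the constant so the text stays in sync
    // if the timeout value is ever changed.
    static String timeoutMessage(String jobId) {
        return String.format(
            "[%s] Timed out (%ds) waiting for chunked inference definition to be stored",
            jobId, STORE_TIMEOUT_SEC);
    }

    public static void main(String[] args) {
        System.out.println(timeoutMessage("my-analytics-job"));
    }
}
```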
    private final String definition;
    private final int docNum;
    private final Boolean eos;
Do we need a 3rd state (null)?
In the code it looks like null and false are both treated as false.
It seems simpler to me to use boolean and handle null as part of parsing.
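The suggestion amounts to collapsing the three states at parse time; a hypothetical sketch:

```java
public class EosParseSketch {
    // Collapse the Boolean's three states (null/false/true) into a primitive
    // boolean at parse time: an absent eos field is treated as false, so the
    // rest of the code only ever sees two states.
    static boolean parseEos(Boolean rawEos) {
        return rawEos != null && rawEos;
    }

    public static void main(String[] args) {
        System.out.println(parseEos(null));
        System.out.println(parseEos(Boolean.FALSE));
        System.out.println(parseEos(Boolean.TRUE));
    }
}
```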
This moves model storage from handling the fully parsed JSON string to handling two separate types of documents:
1. ModelSizeInfo, which contains model size information.
2. TrainedModelDefinitionChunk, which contains a particular chunk of the compressed model definition string.
`model_size_info` is assumed to be handled first. This will generate the model_id and store the initial trained model config object. Then each chunk is assumed to be in the correct order for concatenating the chunks to get a compressed definition.
Native side change: elastic/ml-cpp#1349
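The ordering assumption in the description above can be sketched as follows (class names, method names, and chunk values are hypothetical, not the PR's actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkAssemblySketch {
    // The size-info document arrives first and establishes the model id;
    // definition chunks then arrive in order and are concatenated to
    // rebuild the full compressed definition string.
    private final List<String> chunks = new ArrayList<>();

    void onDefinitionChunk(String compressedChunk) {
        chunks.add(compressedChunk);
    }

    String assembleCompressedDefinition() {
        StringBuilder definition = new StringBuilder();
        for (String chunk : chunks) {
            definition.append(chunk);
        }
        return definition.toString();
    }

    public static void main(String[] args) {
        ChunkAssemblySketch persister = new ChunkAssemblySketch();
        persister.onDefinitionChunk("H4sIA");
        persister.onDefinitionChunk("AAAAA");
        persister.onDefinitionChunk("A==");
        System.out.println(persister.assembleCompressedDefinition());
    }
}
```

Because the chunks are appended in arrival order, any reordering upstream would corrupt the compressed definition, which is why the description stresses the ordering assumption.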
… (#58836) * [ML] handles compressed model stream from native process (#58009)