[server][common][vpj] Introduce ComplexVenicePartitioner to materialized view #1509
Just some early thoughts... did not read the whole PR yet. But hopefully useful in terms of discussing the API changes.
…zed view

The change will not work if the record is actually large and chunked. Proper chunking support is needed and will be addressed in a separate PR.

1. Introduced VeniceComplexPartitioner, which extends VenicePartitioner and offers a new API to partition by value and provide a possible one-to-many partition mapping.
2. Added a value provider of type Lazy<GenericRecord> to VeniceViewWriter's processRecord API to access the deserialized value if needed, e.g. when a VeniceComplexPartitioner is involved.
3. MergeConflictResult will now provide the deserialized value on a best-effort basis. This is useful when we have already deserialized the value for a partial update operation, so that the deserialized value can be provided directly to the materialized view writer.
4. Refactored VeniceWriter to expose an API to write to a desired partition with new DIV. For now this is only used by the new method writeWithComplexPartitioner to handle the partitioning and the writes of the same value to multiple partitions. However, this newly exposed API should also come in handy when we build proper chunking support to forward chunks to predetermined view topic partitions.
5. writeWithComplexPartitioner in VeniceWriter will re-chunk when writing to each partition. This should be optimized when we build proper chunking support.
Code change looks good overall.
I do think we need to take care of the comment I just left, which is very tricky as it is a race condition.
Lazy<GenericRecord> oldValueProvider = Lazy.of(() -> {
  ChunkedValueManifestContainer oldValueManifestContainer = new ChunkedValueManifestContainer();
  int oldValueReaderSchemaId = schemaRepository.getSupersetSchema(storeName).getId();
  return readStoredValueRecord(
The `readStoredValueRecord` method reads the most recent data, which means it tries the transient record cache first and then RocksDB.
For a WC-enabled store, `delete` will update the transient record to `null` for the key, so I think this method will always return `null`.
Even if we perform the lookup before updating the transient record cache, it will still be wrong because it is a `lazy` function: when ViewWriter tries to produce to view topics, it will read the most recent value, which is `null`. The situation gets worse when parallel compute for AA/WC workloads is enabled, because all the updates to the same key in the same batch are executed (updating the transient cache) before producing to version/view topics. That means for the `delete` operation, the `lazy` function can read the most recent value, which may have been populated by a later `put` in the same batch.
Can we always do a non-lazy lookup until we find a more optimized solution?
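To make the concern concrete, here is a minimal sketch of the hazard, not Venice code. A plain `HashMap` stands in for the transient record cache, and `lazyOldValue` / `eagerOldValue` are hypothetical helpers that only illustrate when the read happens. The lazy provider resolves after later operations in the batch have already mutated the cache, while the eager one captures the value at lookup time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Illustration only: a HashMap stands in for the transient record cache. */
public class LazyLookupRace {
  public static final Map<String, String> transientCache = new HashMap<>();

  /** Reads whatever the cache holds for the key at the moment of the call. */
  public static String readMostRecent(String key) {
    return transientCache.get(key);
  }

  /** Lazy provider: the read is deferred until get() is invoked. */
  public static Supplier<String> lazyOldValue(String key) {
    return () -> readMostRecent(key);
  }

  /** Eager provider: the read happens now and the result is captured. */
  public static Supplier<String> eagerOldValue(String key) {
    String snapshot = readMostRecent(key);
    return () -> snapshot;
  }

  public static void main(String[] args) {
    transientCache.put("k", "v1");

    // Both providers are created while processing a DELETE for "k".
    Supplier<String> lazy = lazyOldValue("k");
    Supplier<String> eager = eagerOldValue("k");

    // The DELETE marks the key as removed in the cache ...
    transientCache.put("k", null);
    // ... and a later PUT in the same batch repopulates it,
    // all before anything is produced to the view topic.
    transientCache.put("k", "v2");

    // Only now does the view writer ask for the "old" value.
    System.out.println("lazy  = " + lazy.get());  // prints "lazy  = v2"
    System.out.println("eager = " + eager.get()); // prints "eager = v1"
  }
}
```

In this sketch the eager provider still returns the value that existed before the delete, which is what the view writer needs. The trade-off is that the lookup cost is paid even when no view writer ends up using the value.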
[server][common][vpj] Introduce ComplexVenicePartitioner to materialized view
The change will not work if the record is actually large and chunked. Proper chunking support is needed and will be addressed in a separate PR.
Introduced ComplexVenicePartitioner, which extends VenicePartitioner and offers a new API to partition by value and provide a possible one-to-many partition mapping.
Added a value provider of type Lazy<GenericRecord> to VeniceViewWriter's processRecord API to access the deserialized value if needed, e.g. when a ComplexVenicePartitioner is involved.
MergeConflictResultWrapper and WriteComputeResultWrapper will now provide the deserialized value on a best-effort basis. This is useful when we have already deserialized the value for a partial update operation, so that the deserialized value can be provided directly to the materialized view writer.
Refactored VeniceWriter to expose some APIs to child classes. Introduced ComplexVeniceWriter, which extends VeniceWriter. The reasoning is that ComplexVeniceWriter will have different APIs, used in MaterializedViewWriter and CompositeVeniceWriter, to write to materialized view partition(s), potentially involving a ComplexVenicePartitioner. Alternatively, we could push common logic from VeniceWriter to AbstractVeniceWriter. However, ComplexVeniceWriter shares so much logic with VeniceWriter (chunking, DIV support, pubSubAdapter, etc.) that doing so would make AbstractVeniceWriter too specialized and unable to offer the flexibility needed to support something like the CompositeVeniceWriter.
Overrode putLargeValue in ComplexVeniceWriter to skip chunking and writing large messages. Once we have proper chunking support, we need to be careful not to re-chunk when writing the same value to different partitions in ComplexVeniceWriter.
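As a rough illustration of the one-to-many mapping and the lazy value provider described above, the following sketch uses hypothetical interfaces (`KeyPartitioner`, `ValueAwarePartitioner`) in place of VenicePartitioner and ComplexVenicePartitioner. The method names, signatures, and the tag-based routing are assumptions made for the example and are not taken from this PR. The value is only resolved when the partitioner actually needs it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** Illustration only: hypothetical types, not the Venice API. */
public class ComplexPartitionerSketch {
  /** Stand-in for a key-only partitioner (one key, one partition). */
  public interface KeyPartitioner {
    int getPartitionId(byte[] key, int numPartitions);
  }

  /** Stand-in for a value-aware partitioner (one record, many partitions). */
  public interface ValueAwarePartitioner<V> extends KeyPartitioner {
    int[] getPartitionIds(byte[] key, V value, int numPartitions);
  }

  /** Example policy: send the record to one partition per tag in its value. */
  public static class TagPartitioner implements ValueAwarePartitioner<List<String>> {
    @Override
    public int getPartitionId(byte[] key, int numPartitions) {
      return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }

    @Override
    public int[] getPartitionIds(byte[] key, List<String> tags, int numPartitions) {
      return tags.stream()
          .mapToInt(tag -> Math.floorMod(tag.hashCode(), numPartitions))
          .distinct()
          .sorted()
          .toArray();
    }
  }

  /**
   * Resolves the target partitions for a record. The value provider is
   * invoked only when the partitioner is value-aware.
   */
  public static <V> int[] resolvePartitions(
      KeyPartitioner partitioner, byte[] key, Supplier<V> valueProvider, int numPartitions) {
    if (partitioner instanceof ValueAwarePartitioner) {
      @SuppressWarnings("unchecked")
      ValueAwarePartitioner<V> complex = (ValueAwarePartitioner<V>) partitioner;
      return complex.getPartitionIds(key, valueProvider.get(), numPartitions);
    }
    return new int[] { partitioner.getPartitionId(key, numPartitions) };
  }

  public static void main(String[] args) {
    int[] targets = resolvePartitions(
        new TagPartitioner(), new byte[] { 1 }, () -> Arrays.asList("a", "b"), 4);
    System.out.println(Arrays.toString(targets)); // prints "[1, 2]"
  }
}
```

A writer built on top of `resolvePartitions` would then produce the same value once per returned partition, which is where the re-chunking concern for large values comes from.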
How was this PR tested?
Added new integration test with A/A, W/C and a new test value based partitioner.
Will add new unit tests once we have some consensus on the API changes.
Does this PR introduce any user-facing changes?