
Commit a427eec
Revert "Renamed StorePartitionDataReceiver to `ConsumedPartitionRec…
Browse files Browse the repository at this point in the history
…eiver`. ✍️"

This reverts commit a5f550f.
KaiSernLim committed Jan 6, 2025
1 parent a5f550f commit a427eec
Showing 9 changed files with 39 additions and 39 deletions.
@@ -397,7 +397,7 @@ public ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, L
}

ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> dataReceiver =
- new ConsumedPartitionReceiver(
+ new StorePartitionDataReceiver(
storeIngestionTask,
pubSubTopicPartition,
kafkaURL, // do not resolve and let it pass downstream for offset tracking purpose
@@ -663,7 +663,7 @@ protected void checkLongRunningTaskState() throws InterruptedException {
currentLeaderTopic = versionTopic;
}
/**
- * The flag is turned on in {@link ConsumedPartitionReceiver#shouldProcessRecord} avoid consuming
+ * The flag is turned on in {@link StorePartitionDataReceiver#shouldProcessRecord} avoid consuming
* unwanted messages after EOP in remote VT, such as SOBR. Now that the leader switches to consume locally,
* it should not skip any message.
*/
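
The Javadoc in the hunk above describes a guard flag: once END_OF_PUSH has been seen on the remote version topic, shouldProcessRecord drops late-arriving messages such as SOBR, and the flag is cleared when the leader switches to local consumption. A minimal, self-contained sketch of that pattern follows; the class and method names are invented for illustration and are not Venice's actual implementation.

```java
// Hypothetical sketch of the skip-after-EOP guard described in the Javadoc above.
public class RemoteTopicSkipGuard {
  // Set once END_OF_PUSH is observed while consuming the remote version topic (VT).
  private volatile boolean skipRemoteAfterEop = false;

  public void onEndOfPushFromRemoteTopic() {
    skipRemoteAfterEop = true;
  }

  // Called when the leader switches to consume the local VT; nothing should be skipped afterwards.
  public void onLeaderSwitchedToLocalConsumption() {
    skipRemoteAfterEop = false;
  }

  // Simplified analogue of shouldProcessRecord: filter out post-EOP records from the remote topic.
  public boolean shouldProcessRecord(boolean recordIsFromRemoteTopic) {
    return !(recordIsFromRemoteTopic && skipRemoteAfterEop);
  }
}
```
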
@@ -216,7 +216,7 @@ public void removePartition(int partition) {
/**
* Enforce partition level quota for the map.
* This function could be invoked by multiple threads when shared consumer is being used.
- * Check {@link ConsumedPartitionReceiver#produceToStoreBufferServiceOrKafka} and
+ * Check {@link StorePartitionDataReceiver#produceToStoreBufferServiceOrKafka} and
* {@link StoreIngestionTask#checkIngestionProgress} to find more details.
*/
public void checkAllPartitionsQuota() {
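
As the comment in the hunk above notes, checkAllPartitionsQuota() can be hit by several threads when a shared consumer is in use, so any per-partition accounting needs to be safe under concurrent callers. Below is a rough, hypothetical sketch of that shape (invented names, not the Venice implementation) using a concurrent map of per-partition counters.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of thread-safe partition-level quota tracking.
public class PartitionQuotaTracker {
  private final long perPartitionQuotaBytes;
  private final Map<Integer, AtomicLong> usageByPartition = new ConcurrentHashMap<>();

  public PartitionQuotaTracker(long perPartitionQuotaBytes) {
    this.perPartitionQuotaBytes = perPartitionQuotaBytes;
  }

  // Safe to call from multiple consumer threads; each partition gets its own counter.
  public void recordBytes(int partition, long bytes) {
    usageByPartition.computeIfAbsent(partition, p -> new AtomicLong()).addAndGet(bytes);
  }

  // Analogue of a per-partition check inside checkAllPartitionsQuota().
  public boolean isOverQuota(int partition) {
    AtomicLong usage = usageByPartition.get(partition);
    return usage != null && usage.get() > perPartitionQuotaBytes;
  }
}
```
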
@@ -1265,7 +1265,7 @@ protected void checkIngestionProgress(Store store) throws InterruptedException {
/**
* While using the shared consumer, we still need to check hybrid quota here since the actual disk usage could change
* because of compaction or the disk quota could be adjusted even there is no record write.
- * Since {@link ConsumedPartitionReceiver#produceToStoreBufferServiceOrKafka} is only being invoked by
+ * Since {@link StorePartitionDataReceiver#produceToStoreBufferServiceOrKafka} is only being invoked by
* {@link KafkaConsumerService} when there are available records, this function needs to check whether we need to
* resume the consumption when there are paused consumption because of hybrid quota violation.
*/
@@ -2769,7 +2769,7 @@ private boolean processControlMessage(
case START_OF_PUSH:
/**
* N.B.: The processing for SOP happens at the very beginning of the pipeline, in:
- * {@link ConsumedPartitionReceiver#produceToStoreBufferServiceOrKafka}
+ * {@link StorePartitionDataReceiver#produceToStoreBufferServiceOrKafka}
*/
break;
case END_OF_PUSH:
@@ -2778,7 +2778,7 @@
case START_OF_SEGMENT:
case END_OF_SEGMENT:
/**
- * Nothing to do here as all the processing is being done in {@link ConsumedPartitionReceiver#delegateConsumerRecord}.
+ * Nothing to do here as all the processing is being done in {@link StorePartitionDataReceiver#delegateConsumerRecord}.
*/
break;
case START_OF_INCREMENTAL_PUSH:
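
The control-message switch above relies on earlier pipeline stages: SOP is handled at the head of produceToStoreBufferServiceOrKafka and segment markers in delegateConsumerRecord, so the drainer intentionally treats them as no-ops. A stripped-down sketch of that dispatch shape, with a hypothetical enum standing in for Venice's control message types:

```java
// Hypothetical sketch of the dispatch pattern; the real handling lives elsewhere in the ingestion pipeline.
enum ControlKind {
  START_OF_PUSH, END_OF_PUSH, START_OF_SEGMENT, END_OF_SEGMENT, START_OF_INCREMENTAL_PUSH
}

final class ControlMessageDispatchSketch {
  // Returns true if this stage actually did any work for the message.
  boolean process(ControlKind kind) {
    switch (kind) {
      case START_OF_PUSH:
      case START_OF_SEGMENT:
      case END_OF_SEGMENT:
        // Already processed earlier in the pipeline (receiver side); nothing to do here.
        return false;
      case END_OF_PUSH:
        handleEndOfPush();
        return true;
      default:
        return false;
    }
  }

  private void handleEndOfPush() {
    // Placeholder for the real END_OF_PUSH bookkeeping.
  }
}
```
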
@@ -81,7 +81,7 @@
import org.apache.logging.log4j.Logger;


- public class ConsumedPartitionReceiver
+ public class StorePartitionDataReceiver
implements ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> {
private final Logger LOGGER;
private static final byte[] BINARY_DECODER_PARAM = new byte[16];
@@ -105,7 +105,7 @@ private static class ReusableObjects {

private final ThreadLocal<ReusableObjects> threadLocalReusableObjects = ThreadLocal.withInitial(ReusableObjects::new);

- public ConsumedPartitionReceiver(
+ public StorePartitionDataReceiver(
StoreIngestionTask storeIngestionTask,
PubSubTopicPartition topicPartition,
String kafkaUrl,
@@ -135,8 +135,8 @@ public void testHandleDeleteBeforeEOP() {
when(serverConfig.isComputeFastAvroEnabled()).thenReturn(false);
when(ingestionTask.getServerConfig()).thenReturn(serverConfig);
when(ingestionTask.getHostLevelIngestionStats()).thenReturn(mock(HostLevelIngestionStats.class));
- ConsumedPartitionReceiver consumedPartitionReceiver =
- spy(new ConsumedPartitionReceiver(ingestionTask, topicPartition, "dummyUrl", 0));
+ StorePartitionDataReceiver storePartitionDataReceiver =
+ spy(new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0));
PartitionConsumptionState pcs = mock(PartitionConsumptionState.class);
when(pcs.isEndOfPushReceived()).thenReturn(false);
when(pcs.getVeniceWriterLazyRef()).thenReturn(Lazy.of(() -> mock(VeniceWriter.class)));
@@ -157,8 +157,8 @@ public void testHandleDeleteBeforeEOP() {
resultWrapper.setProcessedResult(result);
ArgumentCaptor<LeaderProducedRecordContext> leaderProducedRecordContextArgumentCaptor =
ArgumentCaptor.forClass(LeaderProducedRecordContext.class);
- consumedPartitionReceiver.processMessageAndMaybeProduceToKafka(resultWrapper, pcs, 0, "dummyUrl", 0, 0L, 0L);
- verify(consumedPartitionReceiver, times(1)).produceToLocalKafka(
+ storePartitionDataReceiver.processMessageAndMaybeProduceToKafka(resultWrapper, pcs, 0, "dummyUrl", 0, 0L, 0L);
+ verify(storePartitionDataReceiver, times(1)).produceToLocalKafka(
any(),
any(),
leaderProducedRecordContextArgumentCaptor.capture(),
@@ -178,8 +178,8 @@ public void testGetValueBytesFromTransientRecords(CompressionStrategy strategy)
when(ingestionTask.getCompressor()).thenReturn(Lazy.of(() -> compressor));
when(ingestionTask.getCompressionStrategy()).thenReturn(strategy);
PubSubTopicPartition topicPartition = mock(PubSubTopicPartition.class);
- ConsumedPartitionReceiver consumedPartitionReceiver =
- new ConsumedPartitionReceiver(ingestionTask, topicPartition, "dummyUrl", 0);
+ StorePartitionDataReceiver storePartitionDataReceiver =
+ new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0);

byte[] dataBytes = "Hello World".getBytes();
byte[] transientRecordValueBytes = dataBytes;
@@ -194,7 +194,7 @@ public void testGetValueBytesFromTransientRecords(CompressionStrategy strategy)
when(transientRecord.getValue()).thenReturn(transientRecordValueBytes);
when(transientRecord.getValueOffset()).thenReturn(startPosition);
when(transientRecord.getValueLen()).thenReturn(dataLength);
- ByteBuffer result = consumedPartitionReceiver.getCurrentValueFromTransientRecord(transientRecord);
+ ByteBuffer result = storePartitionDataReceiver.getCurrentValueFromTransientRecord(transientRecord);
Assert.assertEquals(result.remaining(), dataBytes.length);
byte[] resultByteArray = new byte[result.remaining()];
result.get(resultByteArray);
@@ -337,8 +337,8 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio
PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
PubSubTopicPartition topicPartition =
new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(testTopic), partition);
- ConsumedPartitionReceiver consumedPartitionReceiver =
- new ConsumedPartitionReceiver(ingestionTask, topicPartition, kafkaUrl, kafkaClusterId);
+ StorePartitionDataReceiver storePartitionDataReceiver =
+ new StorePartitionDataReceiver(ingestionTask, topicPartition, kafkaUrl, kafkaClusterId);

byte[] key = "foo".getBytes();
byte[] updatedKeyBytes = ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(key);
@@ -408,11 +408,11 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio
KafkaKey kafkaKey = mock(KafkaKey.class);
when(consumerRecord.getKey()).thenReturn(kafkaKey);
when(kafkaKey.getKey()).thenReturn(new byte[] { 0xa });
- consumedPartitionReceiver.produceToLocalKafka(
+ storePartitionDataReceiver.produceToLocalKafka(
consumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
- consumedPartitionReceiver.getProduceToTopicFunction(
+ storePartitionDataReceiver.getProduceToTopicFunction(
partitionConsumptionState,
updatedKeyBytes,
updatedValueBytes,
@@ -534,10 +534,10 @@ public void testReadingChunkedRmdFromStorage() {
when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(topLevelKey1)))
.thenReturn(expectedNonChunkedValue);
PubSubTopicPartition topicPartition = mock(PubSubTopicPartition.class);
- ConsumedPartitionReceiver consumedPartitionReceiver =
- new ConsumedPartitionReceiver(ingestionTask, topicPartition, "dummyUrl", 0);
+ StorePartitionDataReceiver storePartitionDataReceiver =
+ new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0);
byte[] result =
- consumedPartitionReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key1, container, 0L);
+ storePartitionDataReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key1, container, 0L);
Assert.assertNotNull(result);
Assert.assertNull(container.getManifest());
Assert.assertEquals(result, expectedNonChunkedValue);
@@ -568,7 +568,7 @@ public void testReadingChunkedRmdFromStorage() {
.thenReturn(chunkedManifestBytes.array());
when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(chunkedKey1InKey2))).thenReturn(chunkedValue1);
byte[] result2 =
- consumedPartitionReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key2, container, 0L);
+ storePartitionDataReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key2, container, 0L);
Assert.assertNotNull(result2);
Assert.assertNotNull(container.getManifest());
Assert.assertEquals(container.getManifest().getKeysWithChunkIdSuffix().size(), 1);
@@ -605,7 +605,7 @@ public void testReadingChunkedRmdFromStorage() {
when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(chunkedKey1InKey3))).thenReturn(chunkedValue1);
when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(chunkedKey2InKey3))).thenReturn(chunkedValue2);
byte[] result3 =
- consumedPartitionReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key3, container, 0L);
+ storePartitionDataReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key3, container, 0L);
Assert.assertNotNull(result3);
Assert.assertNotNull(container.getManifest());
Assert.assertEquals(container.getManifest().getKeysWithChunkIdSuffix().size(), 2);
@@ -614,11 +614,11 @@

@Test
public void testUnwrapByteBufferFromOldValueProvider() {
- Lazy<ByteBuffer> lazyBB = ConsumedPartitionReceiver.unwrapByteBufferFromOldValueProvider(Lazy.of(() -> null));
+ Lazy<ByteBuffer> lazyBB = StorePartitionDataReceiver.unwrapByteBufferFromOldValueProvider(Lazy.of(() -> null));
assertNotNull(lazyBB);
assertNull(lazyBB.get());

- lazyBB = ConsumedPartitionReceiver.unwrapByteBufferFromOldValueProvider(
+ lazyBB = StorePartitionDataReceiver.unwrapByteBufferFromOldValueProvider(
Lazy.of(() -> new ByteBufferValueRecord<>(ByteBuffer.wrap(new byte[1]), 1)));
assertNotNull(lazyBB);
assertNotNull(lazyBB.get());
@@ -123,14 +123,14 @@ public void testSubscribeConsumerFor() {
topicPartition,
PartitionReplicaIngestionContext.VersionRole.CURRENT,
PartitionReplicaIngestionContext.WorkloadType.NON_AA_OR_WRITE_COMPUTE);
- ConsumedPartitionReceiver dataReceiver = (ConsumedPartitionReceiver) aggKafkaConsumerServiceSpy
+ StorePartitionDataReceiver dataReceiver = (StorePartitionDataReceiver) aggKafkaConsumerServiceSpy
.subscribeConsumerFor(PUBSUB_URL, storeIngestionTask, partitionReplicaIngestionContext, -1);

// regular pubsub url uses the default cluster id
Assert.assertEquals(dataReceiver.getKafkaClusterId(), 0);
verify(topicManager).prefetchAndCacheLatestOffset(topicPartition);

- dataReceiver = (ConsumedPartitionReceiver) aggKafkaConsumerServiceSpy
+ dataReceiver = (StorePartitionDataReceiver) aggKafkaConsumerServiceSpy
.subscribeConsumerFor(PUBSUB_URL_SEP, storeIngestionTask, partitionReplicaIngestionContext, -1);
// pubsub url for sep topic uses a different cluster id
Assert.assertEquals(dataReceiver.getKafkaClusterId(), 1);
@@ -375,8 +375,8 @@ public static Object[][] sortedInputAndAAConfigProvider() {
private KafkaConsumerService localKafkaConsumerService;
private KafkaConsumerService remoteKafkaConsumerService;

- private ConsumedPartitionReceiver localConsumedDataReceiver;
- private ConsumedPartitionReceiver remoteConsumedDataReceiver;
+ private StorePartitionDataReceiver localConsumedDataReceiver;
+ private StorePartitionDataReceiver remoteConsumedDataReceiver;

private static String storeNameWithoutVersionInfo;
private StoreInfo storeInfo;
@@ -1147,7 +1147,7 @@ private void prepareAggKafkaConsumerServiceMock() {
kafkaConsumerService = remoteKafkaConsumerService;
kafkaClusterId = 1;
}
- ConsumedPartitionReceiver dataReceiver = new ConsumedPartitionReceiver(
+ StorePartitionDataReceiver dataReceiver = new StorePartitionDataReceiver(
storeIngestionTask,
partitionReplicaIngestionContext.getPubSubTopicPartition(),
kafkaUrl,
@@ -3330,7 +3330,7 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT
null);

if (hybridConfig.equals(HYBRID) && nodeType.equals(LEADER) && isAaWCParallelProcessingEnabled()) {
- localConsumedDataReceiver = new ConsumedPartitionReceiver(
+ localConsumedDataReceiver = new StorePartitionDataReceiver(
storeIngestionTaskUnderTest,
fooTopicPartition,
inMemoryLocalKafkaBroker.getKafkaBootstrapServer(),
@@ -3656,8 +3656,8 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node
0,
0,
pubSubMessageHeaders);
- ConsumedPartitionReceiver consumedPartitionReceiver =
- new ConsumedPartitionReceiver(ingestionTask, topicPartition, "dummyUrl", 0);
+ StorePartitionDataReceiver storePartitionDataReceiver =
+ new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0);

assertEquals(partitionConsumptionState.getLeaderCompleteState(), LEADER_NOT_COMPLETED);
assertEquals(partitionConsumptionState.getLastLeaderCompleteStateUpdateInMs(), 0L);
@@ -3668,7 +3668,7 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node

if (nodeType != DA_VINCI) {
partitionConsumptionState.setLeaderFollowerState(LeaderFollowerStateType.LEADER);
- consumedPartitionReceiver.getAndUpdateLeaderCompletedState(
+ storePartitionDataReceiver.getAndUpdateLeaderCompletedState(
kafkaKey,
kafkaValue,
controlMessage,
@@ -3679,7 +3679,7 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node
}

partitionConsumptionState.setLeaderFollowerState(STANDBY);
- consumedPartitionReceiver.getAndUpdateLeaderCompletedState(
+ storePartitionDataReceiver.getAndUpdateLeaderCompletedState(
kafkaKey,
kafkaValue,
controlMessage,
@@ -9,13 +9,13 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.kafka.consumer.AggKafkaConsumerService;
- import com.linkedin.davinci.kafka.consumer.ConsumedPartitionReceiver;
import com.linkedin.davinci.kafka.consumer.IngestionThrottler;
import com.linkedin.davinci.kafka.consumer.KafkaClusterBasedRecordThrottler;
import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
import com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator;
import com.linkedin.davinci.kafka.consumer.PartitionReplicaIngestionContext;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTask;
+ import com.linkedin.davinci.kafka.consumer.StorePartitionDataReceiver;
import com.linkedin.davinci.kafka.consumer.TopicExistenceChecker;
import com.linkedin.venice.integration.utils.PubSubBrokerConfigs;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
@@ -220,7 +220,7 @@ public void testLocalAndRemoteConsumption(boolean isTopicWiseSharedConsumerAssig
pubSubTopicPartition,
PartitionReplicaIngestionContext.VersionRole.CURRENT,
PartitionReplicaIngestionContext.WorkloadType.AA_OR_WRITE_COMPUTE);
- ConsumedPartitionReceiver localDataReceiver = (ConsumedPartitionReceiver) aggKafkaConsumerService
+ StorePartitionDataReceiver localDataReceiver = (StorePartitionDataReceiver) aggKafkaConsumerService
.subscribeConsumerFor(localKafkaUrl, storeIngestionTask, partitionReplicaIngestionContext, -1);
Assert
.assertTrue(aggKafkaConsumerService.hasConsumerAssignedFor(localKafkaUrl, versionTopic, pubSubTopicPartition));
@@ -232,7 +232,7 @@
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProperties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
aggKafkaConsumerService.createKafkaConsumerService(consumerProperties);
- ConsumedPartitionReceiver remoteDataReceiver = (ConsumedPartitionReceiver) aggKafkaConsumerService
+ StorePartitionDataReceiver remoteDataReceiver = (StorePartitionDataReceiver) aggKafkaConsumerService
.subscribeConsumerFor(remoteKafkaUrl, storeIngestionTask, partitionReplicaIngestionContext, -1);
Assert
.assertTrue(aggKafkaConsumerService.hasConsumerAssignedFor(remoteKafkaUrl, versionTopic, pubSubTopicPartition));