From a427eecd6b6623990ac538bad4c34c453ca6fde6 Mon Sep 17 00:00:00 2001
From: Kai-Sern Lim
Date: Mon, 6 Jan 2025 10:42:19 -0800
Subject: [PATCH] =?UTF-8?q?Revert=20"Renamed=20`StorePartitionDataReceiver?=
 =?UTF-8?q?`=20to=20`ConsumedPartitionReceiver`.=20=E2=9C=8D=EF=B8=8F"?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This reverts commit a5f550f34762aa214244322f2a85b5aa77c576ed.
---
 .../consumer/AggKafkaConsumerService.java     |  2 +-
 .../LeaderFollowerStoreIngestionTask.java     |  2 +-
 .../consumer/StorageUtilizationManager.java   |  2 +-
 .../kafka/consumer/StoreIngestionTask.java    |  6 ++--
 ...r.java => StorePartitionDataReceiver.java} |  4 +--
 .../ActiveActiveStoreIngestionTaskTest.java   | 36 +++++++++----------
 .../consumer/AggKafkaConsumerServiceTest.java |  4 +--
 .../consumer/StoreIngestionTaskTest.java      | 16 ++++-----
 .../venice/kafka/KafkaConsumptionTest.java    |  6 ++--
 9 files changed, 39 insertions(+), 39 deletions(-)
 rename clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/{ConsumedPartitionReceiver.java => StorePartitionDataReceiver.java} (99%)

diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java
index 35439d244bc..decf08933d3 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java
@@ -397,7 +397,7 @@ public ConsumedDataReceiver
     >> dataReceiver =
-        new ConsumedPartitionReceiver(
+        new StorePartitionDataReceiver(
             storeIngestionTask,
             pubSubTopicPartition,
             kafkaURL, // do not resolve and let it pass downstream for offset tracking purpose
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
index 63249e18592..da010ba9c75 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
@@ -663,7 +663,7 @@ protected void checkLongRunningTaskState() throws InterruptedException {
         currentLeaderTopic = versionTopic;
       }
       /**
-       * The flag is turned on in {@link ConsumedPartitionReceiver#shouldProcessRecord} avoid consuming
+       * The flag is turned on in {@link StorePartitionDataReceiver#shouldProcessRecord} avoid consuming
        * unwanted messages after EOP in remote VT, such as SOBR. Now that the leader switches to consume locally,
        * it should not skip any message.
        */
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManager.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManager.java
index c1546dfe027..cc83fd1128a 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManager.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorageUtilizationManager.java
@@ -216,7 +216,7 @@ public void removePartition(int partition) {
   /**
    * Enforce partition level quota for the map.
    * This function could be invoked by multiple threads when shared consumer is being used.
-   * Check {@link ConsumedPartitionReceiver#produceToStoreBufferServiceOrKafka} and
+   * {@link StoreIngestionTask#checkIngestionProgress} to find more details.
    */
  public void checkAllPartitionsQuota() {
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
index 94e60163442..450c41527b4 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
@@ -1265,7 +1265,7 @@ protected void checkIngestionProgress(Store store) throws InterruptedException {
   /**
    * While using the shared consumer, we still need to check hybrid quota here since the actual disk usage could change
    * because of compaction or the disk quota could be adjusted even there is no record write.
-   * Since {@link ConsumedPartitionReceiver#produceToStoreBufferServiceOrKafka} is only being invoked by
+   * Since {@link StorePartitionDataReceiver#produceToStoreBufferServiceOrKafka} is only being invoked by
    * {@link KafkaConsumerService} when there are available records, this function needs to check whether we need to
    * resume the consumption when there are paused consumption because of hybrid quota violation.
    */
@@ -2769,7 +2769,7 @@ private boolean processControlMessage(
       case START_OF_PUSH:
         /**
          * N.B.: The processing for SOP happens at the very beginning of the pipeline, in:
-         * {@link ConsumedPartitionReceiver#produceToStoreBufferServiceOrKafka}
+         * {@link StorePartitionDataReceiver#produceToStoreBufferServiceOrKafka}
          */
         break;
       case END_OF_PUSH:
@@ -2778,7 +2778,7 @@ private boolean processControlMessage(
       case START_OF_SEGMENT:
       case END_OF_SEGMENT:
         /**
-         * Nothing to do here as all the processing is being done in {@link ConsumedPartitionReceiver#delegateConsumerRecord}.
+         * Nothing to do here as all the processing is being done in {@link StorePartitionDataReceiver#delegateConsumerRecord}.
          */
         break;
       case START_OF_INCREMENTAL_PUSH:
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumedPartitionReceiver.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorePartitionDataReceiver.java
similarity index 99%
rename from clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumedPartitionReceiver.java
rename to clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorePartitionDataReceiver.java
index 79e18425c81..8db6f4767b9 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumedPartitionReceiver.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorePartitionDataReceiver.java
@@ -81,7 +81,7 @@ import org.apache.logging.log4j.Logger;
-public class ConsumedPartitionReceiver
+public class StorePartitionDataReceiver
     implements ConsumedDataReceiver>> {
   private final Logger LOGGER;
   private static final byte[] BINARY_DECODER_PARAM = new byte[16];
@@ -105,7 +105,7 @@ private static class ReusableObjects {
   private final ThreadLocal threadLocalReusableObjects = ThreadLocal.withInitial(ReusableObjects::new);

-  public ConsumedPartitionReceiver(
+  public StorePartitionDataReceiver(
       StoreIngestionTask storeIngestionTask,
       PubSubTopicPartition topicPartition,
       String kafkaUrl,
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java
index c8ecfbac3a8..41c5bd5e271 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java
@@ -135,8 +135,8 @@ public void testHandleDeleteBeforeEOP() {
     when(serverConfig.isComputeFastAvroEnabled()).thenReturn(false);
     when(ingestionTask.getServerConfig()).thenReturn(serverConfig);
     when(ingestionTask.getHostLevelIngestionStats()).thenReturn(mock(HostLevelIngestionStats.class));
-    ConsumedPartitionReceiver consumedPartitionReceiver =
-        spy(new ConsumedPartitionReceiver(ingestionTask, topicPartition, "dummyUrl", 0));
+    StorePartitionDataReceiver storePartitionDataReceiver =
+        spy(new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0));
     PartitionConsumptionState pcs = mock(PartitionConsumptionState.class);
     when(pcs.isEndOfPushReceived()).thenReturn(false);
     when(pcs.getVeniceWriterLazyRef()).thenReturn(Lazy.of(() -> mock(VeniceWriter.class)));
@@ -157,8 +157,8 @@ public void testHandleDeleteBeforeEOP() {
     resultWrapper.setProcessedResult(result);
     ArgumentCaptor leaderProducedRecordContextArgumentCaptor =
         ArgumentCaptor.forClass(LeaderProducedRecordContext.class);
-    consumedPartitionReceiver.processMessageAndMaybeProduceToKafka(resultWrapper, pcs, 0, "dummyUrl", 0, 0L, 0L);
-    verify(consumedPartitionReceiver, times(1)).produceToLocalKafka(
+    storePartitionDataReceiver.processMessageAndMaybeProduceToKafka(resultWrapper, pcs, 0, "dummyUrl", 0, 0L, 0L);
+    verify(storePartitionDataReceiver, times(1)).produceToLocalKafka(
         any(),
         any(),
         leaderProducedRecordContextArgumentCaptor.capture(),
@@ -178,8 +178,8 @@ public void testGetValueBytesFromTransientRecords(CompressionStrategy strategy)
     when(ingestionTask.getCompressor()).thenReturn(Lazy.of(() -> compressor));
     when(ingestionTask.getCompressionStrategy()).thenReturn(strategy);
     PubSubTopicPartition topicPartition = mock(PubSubTopicPartition.class);
-    ConsumedPartitionReceiver consumedPartitionReceiver =
-        new ConsumedPartitionReceiver(ingestionTask, topicPartition, "dummyUrl", 0);
+    StorePartitionDataReceiver storePartitionDataReceiver =
+        new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0);

     byte[] dataBytes = "Hello World".getBytes();
     byte[] transientRecordValueBytes = dataBytes;
@@ -194,7 +194,7 @@ public void testGetValueBytesFromTransientRecords(CompressionStrategy strategy)
     when(transientRecord.getValue()).thenReturn(transientRecordValueBytes);
     when(transientRecord.getValueOffset()).thenReturn(startPosition);
     when(transientRecord.getValueLen()).thenReturn(dataLength);
-    ByteBuffer result = consumedPartitionReceiver.getCurrentValueFromTransientRecord(transientRecord);
+    ByteBuffer result = storePartitionDataReceiver.getCurrentValueFromTransientRecord(transientRecord);
     Assert.assertEquals(result.remaining(), dataBytes.length);
     byte[] resultByteArray = new byte[result.remaining()];
     result.get(resultByteArray);
@@ -337,8 +337,8 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio
     PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
     PubSubTopicPartition topicPartition =
         new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(testTopic), partition);
-    ConsumedPartitionReceiver consumedPartitionReceiver =
-        new ConsumedPartitionReceiver(ingestionTask, topicPartition, kafkaUrl, kafkaClusterId);
+    StorePartitionDataReceiver storePartitionDataReceiver =
+        new StorePartitionDataReceiver(ingestionTask, topicPartition, kafkaUrl, kafkaClusterId);

     byte[] key = "foo".getBytes();
     byte[] updatedKeyBytes = ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(key);
@@ -408,11 +408,11 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio
     KafkaKey kafkaKey = mock(KafkaKey.class);
     when(consumerRecord.getKey()).thenReturn(kafkaKey);
     when(kafkaKey.getKey()).thenReturn(new byte[] { 0xa });
-    consumedPartitionReceiver.produceToLocalKafka(
+    storePartitionDataReceiver.produceToLocalKafka(
         consumerRecord,
         partitionConsumptionState,
         leaderProducedRecordContext,
-        consumedPartitionReceiver.getProduceToTopicFunction(
+        storePartitionDataReceiver.getProduceToTopicFunction(
             partitionConsumptionState,
             updatedKeyBytes,
             updatedValueBytes,
@@ -534,10 +534,10 @@ public void testReadingChunkedRmdFromStorage() {
     when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(topLevelKey1)))
         .thenReturn(expectedNonChunkedValue);
     PubSubTopicPartition topicPartition = mock(PubSubTopicPartition.class);
-    ConsumedPartitionReceiver consumedPartitionReceiver =
-        new ConsumedPartitionReceiver(ingestionTask, topicPartition, "dummyUrl", 0);
+    StorePartitionDataReceiver storePartitionDataReceiver =
+        new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0);
     byte[] result =
-        consumedPartitionReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key1, container, 0L);
+        storePartitionDataReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key1, container, 0L);
     Assert.assertNotNull(result);
     Assert.assertNull(container.getManifest());
     Assert.assertEquals(result, expectedNonChunkedValue);
@@ -568,7 +568,7 @@ public void testReadingChunkedRmdFromStorage() {
         .thenReturn(chunkedManifestBytes.array());
     when(storageEngine.getReplicationMetadata(partition,
         ByteBuffer.wrap(chunkedKey1InKey2))).thenReturn(chunkedValue1);
     byte[] result2 =
-        consumedPartitionReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key2, container, 0L);
+        storePartitionDataReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key2, container, 0L);
     Assert.assertNotNull(result2);
     Assert.assertNotNull(container.getManifest());
     Assert.assertEquals(container.getManifest().getKeysWithChunkIdSuffix().size(), 1);
@@ -605,7 +605,7 @@ public void testReadingChunkedRmdFromStorage() {
     when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(chunkedKey1InKey3))).thenReturn(chunkedValue1);
     when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(chunkedKey2InKey3))).thenReturn(chunkedValue2);
     byte[] result3 =
-        consumedPartitionReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key3, container, 0L);
+        storePartitionDataReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key3, container, 0L);
     Assert.assertNotNull(result3);
     Assert.assertNotNull(container.getManifest());
     Assert.assertEquals(container.getManifest().getKeysWithChunkIdSuffix().size(), 2);
@@ -614,11 +614,11 @@
   @Test
   public void testUnwrapByteBufferFromOldValueProvider() {
-    Lazy lazyBB = ConsumedPartitionReceiver.unwrapByteBufferFromOldValueProvider(Lazy.of(() -> null));
+    Lazy lazyBB = StorePartitionDataReceiver.unwrapByteBufferFromOldValueProvider(Lazy.of(() -> null));
     assertNotNull(lazyBB);
     assertNull(lazyBB.get());

-    lazyBB = ConsumedPartitionReceiver.unwrapByteBufferFromOldValueProvider(
+    lazyBB = StorePartitionDataReceiver.unwrapByteBufferFromOldValueProvider(
         Lazy.of(() -> new ByteBufferValueRecord<>(ByteBuffer.wrap(new byte[1]), 1)));
     assertNotNull(lazyBB);
     assertNotNull(lazyBB.get());
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java
index e72f5c5fef0..1a3193238c8 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerServiceTest.java
@@ -123,14 +123,14 @@ public void testSubscribeConsumerFor() {
         topicPartition,
         PartitionReplicaIngestionContext.VersionRole.CURRENT,
         PartitionReplicaIngestionContext.WorkloadType.NON_AA_OR_WRITE_COMPUTE);
-    ConsumedPartitionReceiver dataReceiver = (ConsumedPartitionReceiver) aggKafkaConsumerServiceSpy
+    StorePartitionDataReceiver dataReceiver = (StorePartitionDataReceiver) aggKafkaConsumerServiceSpy
        .subscribeConsumerFor(PUBSUB_URL, storeIngestionTask, partitionReplicaIngestionContext, -1);
     // regular pubsub url uses the default cluster id
     Assert.assertEquals(dataReceiver.getKafkaClusterId(), 0);
     verify(topicManager).prefetchAndCacheLatestOffset(topicPartition);

-    dataReceiver = (ConsumedPartitionReceiver) aggKafkaConsumerServiceSpy
+    dataReceiver = (StorePartitionDataReceiver) aggKafkaConsumerServiceSpy
        .subscribeConsumerFor(PUBSUB_URL_SEP, storeIngestionTask, partitionReplicaIngestionContext, -1);
     // pubsub url for sep topic uses a different cluster id
     Assert.assertEquals(dataReceiver.getKafkaClusterId(), 1);
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
index 5d8fe660b83..9ddc61f10d0 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
@@ -375,8 +375,8 @@ public static Object[][] sortedInputAndAAConfigProvider() {
   private KafkaConsumerService localKafkaConsumerService;
   private KafkaConsumerService remoteKafkaConsumerService;

-  private ConsumedPartitionReceiver localConsumedDataReceiver;
-  private ConsumedPartitionReceiver remoteConsumedDataReceiver;
+  private StorePartitionDataReceiver localConsumedDataReceiver;
+  private StorePartitionDataReceiver remoteConsumedDataReceiver;

   private static String storeNameWithoutVersionInfo;
   private StoreInfo storeInfo;
@@ -1147,7 +1147,7 @@ private void prepareAggKafkaConsumerServiceMock() {
         kafkaConsumerService = remoteKafkaConsumerService;
         kafkaClusterId = 1;
       }
-      ConsumedPartitionReceiver dataReceiver = new ConsumedPartitionReceiver(
+      StorePartitionDataReceiver dataReceiver = new StorePartitionDataReceiver(
           storeIngestionTask,
           partitionReplicaIngestionContext.getPubSubTopicPartition(),
           kafkaUrl,
@@ -3330,7 +3330,7 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT
         null);

     if (hybridConfig.equals(HYBRID) && nodeType.equals(LEADER) && isAaWCParallelProcessingEnabled()) {
-      localConsumedDataReceiver = new ConsumedPartitionReceiver(
+      localConsumedDataReceiver = new StorePartitionDataReceiver(
           storeIngestionTaskUnderTest,
           fooTopicPartition,
           inMemoryLocalKafkaBroker.getKafkaBootstrapServer(),
@@ -3656,8 +3656,8 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node
         0,
         0,
         pubSubMessageHeaders);
-    ConsumedPartitionReceiver consumedPartitionReceiver =
-        new ConsumedPartitionReceiver(ingestionTask, topicPartition, "dummyUrl", 0);
+    StorePartitionDataReceiver storePartitionDataReceiver =
+        new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0);

     assertEquals(partitionConsumptionState.getLeaderCompleteState(), LEADER_NOT_COMPLETED);
     assertEquals(partitionConsumptionState.getLastLeaderCompleteStateUpdateInMs(), 0L);
@@ -3668,7 +3668,7 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node

     if (nodeType != DA_VINCI) {
       partitionConsumptionState.setLeaderFollowerState(LeaderFollowerStateType.LEADER);
-      consumedPartitionReceiver.getAndUpdateLeaderCompletedState(
+      storePartitionDataReceiver.getAndUpdateLeaderCompletedState(
           kafkaKey,
           kafkaValue,
           controlMessage,
@@ -3679,7 +3679,7 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node
     }

     partitionConsumptionState.setLeaderFollowerState(STANDBY);
-    consumedPartitionReceiver.getAndUpdateLeaderCompletedState(
+    storePartitionDataReceiver.getAndUpdateLeaderCompletedState(
         kafkaKey,
         kafkaValue,
         controlMessage,
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java
index 4a3d715d497..0e6400ec75b 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/kafka/KafkaConsumptionTest.java
@@ -9,13 +9,13 @@
 import com.github.benmanes.caffeine.cache.Cache;
 import com.linkedin.davinci.config.VeniceServerConfig;
 import com.linkedin.davinci.kafka.consumer.AggKafkaConsumerService;
-import com.linkedin.davinci.kafka.consumer.ConsumedPartitionReceiver;
 import com.linkedin.davinci.kafka.consumer.IngestionThrottler;
 import com.linkedin.davinci.kafka.consumer.KafkaClusterBasedRecordThrottler;
 import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
 import com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator;
 import com.linkedin.davinci.kafka.consumer.PartitionReplicaIngestionContext;
 import com.linkedin.davinci.kafka.consumer.StoreIngestionTask;
+import com.linkedin.davinci.kafka.consumer.StorePartitionDataReceiver;
 import com.linkedin.davinci.kafka.consumer.TopicExistenceChecker;
 import com.linkedin.venice.integration.utils.PubSubBrokerConfigs;
 import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
@@ -220,7 +220,7 @@ public void testLocalAndRemoteConsumption(boolean isTopicWiseSharedConsumerAssig
         pubSubTopicPartition,
         PartitionReplicaIngestionContext.VersionRole.CURRENT,
         PartitionReplicaIngestionContext.WorkloadType.AA_OR_WRITE_COMPUTE);
-    ConsumedPartitionReceiver localDataReceiver = (ConsumedPartitionReceiver) aggKafkaConsumerService
+    StorePartitionDataReceiver localDataReceiver = (StorePartitionDataReceiver) aggKafkaConsumerService
        .subscribeConsumerFor(localKafkaUrl, storeIngestionTask, partitionReplicaIngestionContext, -1);
     Assert
        .assertTrue(aggKafkaConsumerService.hasConsumerAssignedFor(localKafkaUrl, versionTopic, pubSubTopicPartition));
@@ -232,7 +232,7 @@ public void testLocalAndRemoteConsumption(boolean isTopicWiseSharedConsumerAssig
     consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
     consumerProperties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
     aggKafkaConsumerService.createKafkaConsumerService(consumerProperties);
-    ConsumedPartitionReceiver remoteDataReceiver = (ConsumedPartitionReceiver) aggKafkaConsumerService
+    StorePartitionDataReceiver remoteDataReceiver = (StorePartitionDataReceiver) aggKafkaConsumerService
        .subscribeConsumerFor(remoteKafkaUrl, storeIngestionTask, partitionReplicaIngestionContext, -1);
     Assert
        .assertTrue(aggKafkaConsumerService.hasConsumerAssignedFor(remoteKafkaUrl, versionTopic, pubSubTopicPartition));