Skip to content

Commit

Permalink
add requestbasedmetarepository integration testing
Browse files Browse the repository at this point in the history
  • Loading branch information
pthirun committed Feb 19, 2025
1 parent 70236be commit 445c790
Show file tree
Hide file tree
Showing 7 changed files with 393 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public class DaVinciConfig {
*/
private int largeBatchRequestSplitThreshold = AvroGenericDaVinciClient.DEFAULT_CHUNK_SPLIT_THRESHOLD;

/**
* Determines whether to enable request-based metadata retrieval directly from the Venice Server.
* By default, metadata is retrieved from a system store via a thin client.
*/
private boolean useRequestBasedMetaRepository = false;

public DaVinciConfig() {
}

Expand Down Expand Up @@ -147,4 +153,13 @@ public DaVinciConfig setLargeBatchRequestSplitThreshold(int largeBatchRequestSpl
this.largeBatchRequestSplitThreshold = largeBatchRequestSplitThreshold;
return this;
}

public boolean isUseRequestBasedMetaRepository() {
return useRequestBasedMetaRepository;
}

public DaVinciConfig setUseRequestBasedMetaRepository(boolean useRequestBasedMetaRepository) {
this.useRequestBasedMetaRepository = useRequestBasedMetaRepository;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ protected synchronized DaVinciClient getClient(
ClientConfig clientConfig = new ClientConfig(internalStoreName).setD2Client(d2Client)
.setD2ServiceName(clusterDiscoveryD2ServiceName)
.setMetricsRepository(metricsRepository)
.setSpecificValueClass(valueClass);
.setSpecificValueClass(valueClass)
.setUseRequestBasedMetaRepository(config.isUseRequestBasedMetaRepository());

DaVinciClient client;
if (config.isIsolated()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public static NativeMetadataRepository getInstance(
ClientConfig clientConfig,
VeniceProperties backendConfig,
ICProvider icProvider) {

NativeMetadataRepository nativeMetadataRepository;
if (clientConfig.isUseRequestBasedMetaRepository()) {
nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,15 @@ protected int getMaxValueSchemaId(String storeName) {
}

protected void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) {

if (!storeSchemaMap.containsKey(storeName)) {
// New schema data
// New store
Map.Entry<CharSequence, CharSequence> keySchemaEntry =
record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().entrySet().iterator().next();
SchemaData schemaData = new SchemaData(
storeName,
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()));
storeSchemaMap.put(storeName, schemaData);
}

// Store Value Schemas
for (Map.Entry<CharSequence, CharSequence> entry: record.getStoreMetaValue()
.getStoreValueSchemas()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.repository.NativeMetadataRepository;
import com.linkedin.davinci.repository.RequestBasedMetaRepository;
import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
Expand Down Expand Up @@ -289,50 +288,6 @@ public void testThinClientMetaStoreBasedRepository() throws InterruptedException
}
}

// TODO PRANAV move this test and use the full DVC
// Can we add a new test file where we run a more
// comprehensive integration test with a DVC? i.e
// push some new versions or make some store config
// changes and make sure the DVC pick up those changes.
// You can see examples like the recently added
// testBatchOnlyMaterializedViewDVCConsumer.
// You probably don't need a VeniceTwoLayerMultiRegionMultiClusterWrapper,
// a single region will be sufficient.
@Test(timeOut = 120 * Time.MS_PER_SECOND)
public void testRequestBasedMetaStoreBasedRepository() throws InterruptedException {
String regularVeniceStoreName = Utils.getUniqueString("venice_store");
createStoreAndMaterializeMetaSystemStore(regularVeniceStoreName);
D2Client d2Client = null;
NativeMetadataRepository nativeMetadataRepository = null;
try {
d2Client = D2TestUtils.getAndStartD2Client(veniceLocalCluster.getZk().getAddress());
ClientConfig<StoreMetaValue> clientConfig =
getClientConfig(regularVeniceStoreName, d2Client).setUseRequestBasedMetaRepository(true);
// Not providing a CLIENT_META_SYSTEM_STORE_VERSION_MAP, should use the default value of 1 for system store
// current version.
VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
nativeMetadataRepository = NativeMetadataRepository.getInstance(clientConfig, backendConfig);
nativeMetadataRepository.start();
// RequestBasedMetaRepository implementation should be used since
Assert.assertTrue(nativeMetadataRepository instanceof RequestBasedMetaRepository);
verifyRepository(nativeMetadataRepository, regularVeniceStoreName);
} finally {
if (d2Client != null) {
D2ClientUtils.shutdownClient(d2Client);
}
if (nativeMetadataRepository != null) {
// Calling clear explicitly here because if the NativeMetadataRepository implementation used happens to
// initialize
// a new DaVinciBackend then calling clear will trigger the cleanup logic to ensure the DaVinciBackend is not
// leaked
// into other tests.
nativeMetadataRepository.clear();
}
}
}

@Test(timeOut = 60 * Time.MS_PER_SECOND)
public void testThinClientMetaStoreBasedRepositoryWithLargeValueSchemas() throws InterruptedException {
String regularVeniceStoreName = Utils.getUniqueString("venice_store");
Expand Down Expand Up @@ -509,15 +464,8 @@ private void createStoreAndMaterializeMetaSystemStore(String storeName, String v
if (parentControllerClient.getStore(storeName).getStore() == null) {
NewStoreResponse resp =
parentControllerClient.createNewStore(storeName, "test_owner", INT_KEY_SCHEMA, valueSchema);
if (resp.isError()) {
System.out.println("Create new store failed: " + resp.getError());
}
assertFalse(resp.isError());
assertFalse(resp.isError(), "Create new store failed: " + resp.getError());
assertFalse(parentControllerClient.emptyPush(storeName, "test-push-job", 100).isError());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
}
String metaSystemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
TestUtils.waitForNonDeterministicPushCompletion(
Expand Down
Loading

0 comments on commit 445c790

Please sign in to comment.