add requestbasedmetarepository integration testing
pthirun committed Feb 13, 2025
1 parent 70236be commit 06c75e5
Showing 11 changed files with 534 additions and 67 deletions.
@@ -47,6 +47,12 @@ public class DaVinciConfig {
*/
private int largeBatchRequestSplitThreshold = AvroGenericDaVinciClient.DEFAULT_CHUNK_SPLIT_THRESHOLD;

/**
* Determines whether to enable request-based metadata retrieval directly from the Venice Server.
* By default, metadata is retrieved from a system store via a thin client.
*/
private boolean useRequestBasedMetaRepository = false;

public DaVinciConfig() {
}

@@ -147,4 +153,13 @@ public DaVinciConfig setLargeBatchRequestSplitThreshold(int largeBatchRequestSplitThreshold) {
this.largeBatchRequestSplitThreshold = largeBatchRequestSplitThreshold;
return this;
}

public boolean isUseRequestBasedMetaRepository() {
return useRequestBasedMetaRepository;
}

public DaVinciConfig setUseRequestBasedMetaRepository(boolean useRequestBasedMetaRepository) {
this.useRequestBasedMetaRepository = useRequestBasedMetaRepository;
return this;
}
}
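
For context, a minimal usage sketch of the new flag (not part of this commit): the store name and the d2Client/clusterDiscoveryD2ServiceName/metricsRepository/backendConfig values are hypothetical placeholders, while the config and factory calls all appear elsewhere in this diff.

// Enable request-based metadata retrieval instead of the default thin-client system-store path.
DaVinciConfig daVinciConfig = new DaVinciConfig().setUseRequestBasedMetaRepository(true);

CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
    d2Client, clusterDiscoveryD2ServiceName, metricsRepository, backendConfig);
DaVinciClient<String, String> client = factory.getAndStartGenericAvroClient("my_store", daVinciConfig);
client.subscribeAll().get(); // store metadata is now fetched directly from the Venice Server
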
@@ -383,7 +383,8 @@ protected synchronized DaVinciClient getClient(
ClientConfig clientConfig = new ClientConfig(internalStoreName).setD2Client(d2Client)
.setD2ServiceName(clusterDiscoveryD2ServiceName)
.setMetricsRepository(metricsRepository)
.setSpecificValueClass(valueClass);
.setSpecificValueClass(valueClass)
.setUseRequestBasedMetaRepository(config.isUseRequestBasedMetaRepository());

DaVinciClient client;
if (config.isIsolated()) {
@@ -117,7 +117,6 @@ public static NativeMetadataRepository getInstance(
ClientConfig clientConfig,
VeniceProperties backendConfig,
ICProvider icProvider) {

NativeMetadataRepository nativeMetadataRepository;
if (clientConfig.isUseRequestBasedMetaRepository()) {
nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig);
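
The tail of this hunk is truncated, but the factory presumably falls back to the existing thin-client implementation when the flag is off. A sketch of the full selection under that assumption (the ThinClientMetaStoreBasedRepository constructor arguments are guessed from getInstance's parameters):

NativeMetadataRepository nativeMetadataRepository;
if (clientConfig.isUseRequestBasedMetaRepository()) {
    // New path: fetch store metadata directly from the Venice Server.
    nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig);
} else {
    // Existing default: read metadata from the meta system store via a thin client.
    nativeMetadataRepository = new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider);
}
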
@@ -130,17 +130,15 @@ protected int getMaxValueSchemaId(String storeName) {
}

protected void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) {

if (!storeSchemaMap.containsKey(storeName)) {
// New schema data
// New store
Map.Entry<CharSequence, CharSequence> keySchemaEntry =
record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().entrySet().iterator().next();
SchemaData schemaData = new SchemaData(
storeName,
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()));
storeSchemaMap.put(storeName, schemaData);
}

// Store Value Schemas
for (Map.Entry<CharSequence, CharSequence> entry: record.getStoreMetaValue()
.getStoreValueSchemas()
@@ -15,7 +15,6 @@

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.repository.NativeMetadataRepository;
import com.linkedin.davinci.repository.RequestBasedMetaRepository;
import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
@@ -289,50 +288,6 @@ public void testThinClientMetaStoreBasedRepository() throws InterruptedException
}
}

// TODO PRANAV: move this test and use the full DVC.
// Can we add a new test file where we run a more comprehensive integration test with a DVC?
// i.e., push some new versions or make some store config changes, and make sure the DVC
// picks up those changes. You can see examples like the recently added
// testBatchOnlyMaterializedViewDVCConsumer. You probably don't need a
// VeniceTwoLayerMultiRegionMultiClusterWrapper; a single region will be sufficient.
@Test(timeOut = 120 * Time.MS_PER_SECOND)
public void testRequestBasedMetaStoreBasedRepository() throws InterruptedException {
String regularVeniceStoreName = Utils.getUniqueString("venice_store");
createStoreAndMaterializeMetaSystemStore(regularVeniceStoreName);
D2Client d2Client = null;
NativeMetadataRepository nativeMetadataRepository = null;
try {
d2Client = D2TestUtils.getAndStartD2Client(veniceLocalCluster.getZk().getAddress());
ClientConfig<StoreMetaValue> clientConfig =
getClientConfig(regularVeniceStoreName, d2Client).setUseRequestBasedMetaRepository(true);
// No CLIENT_META_SYSTEM_STORE_VERSION_MAP is provided, so the default value of 1 should be used as the
// system store's current version.
VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
nativeMetadataRepository = NativeMetadataRepository.getInstance(clientConfig, backendConfig);
nativeMetadataRepository.start();
// RequestBasedMetaRepository implementation should be used since useRequestBasedMetaRepository is enabled
// in the ClientConfig.
Assert.assertTrue(nativeMetadataRepository instanceof RequestBasedMetaRepository);
verifyRepository(nativeMetadataRepository, regularVeniceStoreName);
} finally {
if (d2Client != null) {
D2ClientUtils.shutdownClient(d2Client);
}
if (nativeMetadataRepository != null) {
// Calling clear explicitly here because if the NativeMetadataRepository implementation used happens to
// initialize a new DaVinciBackend, then calling clear will trigger the cleanup logic to ensure the
// DaVinciBackend is not leaked into other tests.
nativeMetadataRepository.clear();
}
}
}

@Test(timeOut = 60 * Time.MS_PER_SECOND)
public void testThinClientMetaStoreBasedRepositoryWithLargeValueSchemas() throws InterruptedException {
String regularVeniceStoreName = Utils.getUniqueString("venice_store");
@@ -0,0 +1,266 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
import static org.testng.Assert.assertNotNull;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class TestDaVinciRequestBasedMetaRepository {
private static final int TEST_TIMEOUT = 2 * Time.MS_PER_MINUTE;

private static final String CLUSTER_NAME = "venice-cluster";
private VeniceClusterWrapper clusterWrapper;

private static final String storeNameStringToString = "store-name-string-to-string";
private static final String storeNameStringToNameRecord = "store-name-string-to-name-record";

// StoreName -> ControllerClient
// Using the map to track which stores have already been created
private final Map<String, ControllerClient> controllerClients = new HashMap<>();
// StoreName -> Directory
private final Map<String, File> pushJobAvroDataDirs = new HashMap<>();

private DaVinciConfig daVinciConfig;
private MetricsRepository dvcMetricsRepo;
private D2Client daVinciD2RemoteFabric;
private CachingDaVinciClientFactory daVinciClientFactory;

@BeforeClass(alwaysRun = true)
public void setUp() throws IOException {

VeniceClusterCreateOptions.Builder options = new VeniceClusterCreateOptions.Builder().clusterName(CLUSTER_NAME)
.numberOfRouters(1)
.numberOfServers(2)
.numberOfControllers(2)
.replicationFactor(2)
.forkServer(false);
clusterWrapper = ServiceFactory.getVeniceCluster(options.build());

// Create stores
runPushJob( // String to String
storeNameStringToString,
TestWriteUtils
.writeSimpleAvroFileWithStringToStringSchema(getPushJobAvroFileDirectory(storeNameStringToString)));
runPushJob( // String to Name Record
storeNameStringToNameRecord,
TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV1Schema(
getPushJobAvroFileDirectory(storeNameStringToNameRecord)));

// Set up DVC Client Factory
VeniceProperties backendConfig =
new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB)
.put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
daVinciConfig = new DaVinciConfig();
daVinciConfig.setUseRequestBasedMetaRepository(true);
daVinciD2RemoteFabric = D2TestUtils.getAndStartD2Client(clusterWrapper.getZk().getAddress());
dvcMetricsRepo = new MetricsRepository();
daVinciClientFactory = new CachingDaVinciClientFactory(
daVinciD2RemoteFabric,
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
dvcMetricsRepo,
backendConfig);
}

@AfterClass(alwaysRun = true)
public void cleanUp() {

// Shutdown remote fabric
D2ClientUtils.shutdownClient(daVinciD2RemoteFabric);

// Close client factory
daVinciClientFactory.close();

// Close controller clients
for (Map.Entry<String, ControllerClient> entry: controllerClients.entrySet()) {
entry.getValue().close();
}

// Close cluster wrapper
clusterWrapper.close();
}

@Test(timeOut = TEST_TIMEOUT)
public void testDVCRequestBasedMetaRepositoryStringToString()
throws IOException, ExecutionException, InterruptedException {

try (DaVinciClient<String, Object> storeClient =
daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToString, daVinciConfig)) {
storeClient.subscribeAll().get();

int recordCount = TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
for (int i = 1; i <= recordCount; i++) {
Assert.assertEquals(
storeClient.get(Integer.toString(i)).get().toString(),
TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i);
}
Assert
.assertEquals(getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToString), (double) 1);

// Run new push job
recordCount = 200;
runPushJob(
storeNameStringToString,
TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(
getPushJobAvroFileDirectory(storeNameStringToString),
recordCount));

// Verify version swap
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, false, () -> {
Assert.assertEquals(
getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToString),
(double) 2);
});

for (int i = 1; i <= recordCount; i++) {
Assert.assertEquals(
storeClient.get(Integer.toString(i)).get().toString(),
TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i);
}
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testDVCRequestBasedMetaRepositoryStringToNameRecord() throws ExecutionException, InterruptedException {

try (DaVinciClient<String, Object> storeClient =
daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToNameRecord, daVinciConfig)) {
storeClient.subscribeAll().get();

int recordCount = TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
for (int i = 1; i <= recordCount; i++) {
Assert.assertEquals(
storeClient.get(Integer.toString(i)).get().toString(),
TestWriteUtils.renderNameRecord(TestWriteUtils.STRING_TO_NAME_RECORD_V1_SCHEMA, i)
.get(DEFAULT_VALUE_FIELD_PROP)
.toString());
}
Assert.assertEquals(
getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToNameRecord),
(double) 1);
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testDVCRequestBasedMetaRepositoryStringToNameRecordVersions()
throws IOException, ExecutionException, InterruptedException {

try (DaVinciClient<String, Object> storeClient =
daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToNameRecord, daVinciConfig)) {
storeClient.subscribeAll().get();

for (int i = 0; i < TestWriteUtils.countStringToNameRecordSchemas(); i++) {
Schema schema = TestWriteUtils.getStringToNameRecordSchema(i);
int currentValueVersion = i + 2;

// Run new push job with new version
int recordCount = currentValueVersion * TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
runPushJob(
storeNameStringToNameRecord,
TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordSchema(
getPushJobAvroFileDirectory(storeNameStringToNameRecord),
schema,
recordCount));

// Verify version swap
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, false, () -> {
Assert.assertEquals(
getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToNameRecord),
(double) (currentValueVersion));
});

// Verify storeClient can read all
for (int j = 1; j <= recordCount; j++) {
Assert.assertEquals(
storeClient.get(Integer.toString(j)).get().toString(),
TestWriteUtils.renderNameRecord(schema, j).get(DEFAULT_VALUE_FIELD_PROP).toString());
}
}
}
}

private double getMetric(MetricsRepository metricsRepository, String metricName, String storeName) {
Metric metric = metricsRepository.getMetric("." + storeName + "--" + metricName);
assertNotNull(metric, "Expected metric " + metricName + " not found.");
return metric.value();
}

private File getPushJobAvroFileDirectory(String storeName) {
if (!pushJobAvroDataDirs.containsKey(storeName)) {
pushJobAvroDataDirs.put(storeName, getTempDataDirectory());
}

return pushJobAvroDataDirs.get(storeName);
}

private void runPushJob(String storeName, Schema schema) {

ControllerClient controllerClient;
File dataDir = getPushJobAvroFileDirectory(storeName);
String dataDirPath = "file:" + dataDir.getAbsolutePath();

if (!controllerClients.containsKey(storeName)) {
// Init store
controllerClient = IntegrationTestPushUtils.createStoreForJob(
CLUSTER_NAME,
schema,
TestWriteUtils.defaultVPJProps(
clusterWrapper.getVeniceControllers().get(0).getControllerUrl(),
dataDirPath,
storeName));
controllerClients.put(storeName, controllerClient);
} else {
controllerClient = controllerClients.get(storeName);

// Add new schema
Schema valueSchema = schema.getField(DEFAULT_VALUE_FIELD_PROP).schema();
SchemaResponse schemaResponse = controllerClient.addValueSchema(storeName, valueSchema.toString());
Assert.assertFalse(schemaResponse.isError(), schemaResponse.getError());
}

Properties props =
TestWriteUtils.defaultVPJProps(controllerClient.getLeaderControllerUrl(), dataDirPath, storeName);
TestWriteUtils.runPushJob(storeName + "_" + Utils.getUniqueString("push_job"), props);
}
}