add requestbasedmetarepository integration testing
pthirun committed Feb 13, 2025
1 parent 70236be commit ea4f353
Showing 7 changed files with 375 additions and 67 deletions.
@@ -47,6 +47,12 @@ public class DaVinciConfig {
*/
private int largeBatchRequestSplitThreshold = AvroGenericDaVinciClient.DEFAULT_CHUNK_SPLIT_THRESHOLD;

/**
* Determines whether to enable request-based metadata retrieval directly from the Venice Server.
* By default, metadata is retrieved from a system store via a thin client.
*/
private boolean useRequestBasedMetaRepository = false;

public DaVinciConfig() {
}

@@ -147,4 +153,13 @@ public DaVinciConfig setLargeBatchRequestSplitThreshold(int largeBatchRequestSplitThreshold) {
this.largeBatchRequestSplitThreshold = largeBatchRequestSplitThreshold;
return this;
}

public boolean isUseRequestBasedMetaRepository() {
return useRequestBasedMetaRepository;
}

public DaVinciConfig setUseRequestBasedMetaRepository(boolean useRequestBasedMetaRepository) {
this.useRequestBasedMetaRepository = useRequestBasedMetaRepository;
return this;
}
}
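The new flag is opt-in and chains like the other DaVinciConfig setters. A minimal sketch of how a caller might enable it (the store name and factory variable are hypothetical placeholders; getAndStartGenericAvroClient matches the usage in the new test file below):

// Sketch: opting in to request-based metadata retrieval.
// "my-store" and daVinciClientFactory are hypothetical placeholders.
DaVinciConfig daVinciConfig = new DaVinciConfig().setUseRequestBasedMetaRepository(true);
DaVinciClient<String, Object> client =
    daVinciClientFactory.getAndStartGenericAvroClient("my-store", daVinciConfig);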
@@ -383,7 +383,8 @@ protected synchronized DaVinciClient getClient(
ClientConfig clientConfig = new ClientConfig(internalStoreName).setD2Client(d2Client)
.setD2ServiceName(clusterDiscoveryD2ServiceName)
.setMetricsRepository(metricsRepository)
.setSpecificValueClass(valueClass);
.setSpecificValueClass(valueClass)
.setUseRequestBasedMetaRepository(config.isUseRequestBasedMetaRepository());

DaVinciClient client;
if (config.isIsolated()) {
@@ -117,7 +117,6 @@ public static NativeMetadataRepository getInstance(
ClientConfig clientConfig,
VeniceProperties backendConfig,
ICProvider icProvider) {

NativeMetadataRepository nativeMetadataRepository;
if (clientConfig.isUseRequestBasedMetaRepository()) {
nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig);
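This hunk shows only the new branch. Judging from the ThinClientMetaStoreBasedRepository import retained in the test file below, the full selection presumably reads roughly as follows; the else branch and its constructor arguments are assumptions, not shown in this diff:

NativeMetadataRepository nativeMetadataRepository;
if (clientConfig.isUseRequestBasedMetaRepository()) {
  // New path: fetch store metadata directly from the Venice Server.
  nativeMetadataRepository = new RequestBasedMetaRepository(clientConfig, backendConfig);
} else {
  // Assumed default path: read metadata from the meta system store via a thin client.
  nativeMetadataRepository =
      new ThinClientMetaStoreBasedRepository(clientConfig, backendConfig, icProvider);
}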
@@ -130,17 +130,15 @@ protected int getMaxValueSchemaId(String storeName) {
}

protected void cacheStoreSchema(String storeName, StorePropertiesResponseRecord record) {

if (!storeSchemaMap.containsKey(storeName)) {
// New schema data
// New store
Map.Entry<CharSequence, CharSequence> keySchemaEntry =
record.getStoreMetaValue().getStoreKeySchemas().getKeySchemaMap().entrySet().iterator().next();
SchemaData schemaData = new SchemaData(
storeName,
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString()));
storeSchemaMap.put(storeName, schemaData);
}

// Store Value Schemas
for (Map.Entry<CharSequence, CharSequence> entry: record.getStoreMetaValue()
.getStoreValueSchemas()
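The hunk cuts off inside the value-schema loop. A hypothetical reconstruction of the remainder, assuming the StoreValueSchemas accessor mirrors the getKeySchemaMap() call above and that SchemaData exposes addValueSchema (neither is confirmed by this diff):

SchemaData schemaData = storeSchemaMap.get(storeName);
for (Map.Entry<CharSequence, CharSequence> entry: record.getStoreMetaValue()
    .getStoreValueSchemas()
    .getValueSchemaMap() // assumed accessor, by analogy with getKeySchemaMap()
    .entrySet()) {
  int valueSchemaId = Integer.parseInt(entry.getKey().toString());
  // addValueSchema is assumed; the diff only shows the SchemaData constructor.
  schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString()));
}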
@@ -15,7 +15,6 @@

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.repository.NativeMetadataRepository;
import com.linkedin.davinci.repository.RequestBasedMetaRepository;
import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
@@ -289,50 +288,6 @@ public void testThinClientMetaStoreBasedRepository() throws InterruptedException
}
}

// TODO PRANAV: move this test and use the full DVC.
// Can we add a new test file where we run a more
// comprehensive integration test with a DVC? e.g.,
// push some new versions or make some store config
// changes and make sure the DVC picks up those changes.
// See examples like the recently added
// testBatchOnlyMaterializedViewDVCConsumer.
// You probably don't need a VeniceTwoLayerMultiRegionMultiClusterWrapper;
// a single region will be sufficient.
@Test(timeOut = 120 * Time.MS_PER_SECOND)
public void testRequestBasedMetaStoreBasedRepository() throws InterruptedException {
String regularVeniceStoreName = Utils.getUniqueString("venice_store");
createStoreAndMaterializeMetaSystemStore(regularVeniceStoreName);
D2Client d2Client = null;
NativeMetadataRepository nativeMetadataRepository = null;
try {
d2Client = D2TestUtils.getAndStartD2Client(veniceLocalCluster.getZk().getAddress());
ClientConfig<StoreMetaValue> clientConfig =
getClientConfig(regularVeniceStoreName, d2Client).setUseRequestBasedMetaRepository(true);
// Not providing a CLIENT_META_SYSTEM_STORE_VERSION_MAP; the default value of 1 should be used for the system
// store's current version.
VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
nativeMetadataRepository = NativeMetadataRepository.getInstance(clientConfig, backendConfig);
nativeMetadataRepository.start();
// RequestBasedMetaRepository implementation should be used since useRequestBasedMetaRepository is enabled
Assert.assertTrue(nativeMetadataRepository instanceof RequestBasedMetaRepository);
verifyRepository(nativeMetadataRepository, regularVeniceStoreName);
} finally {
if (d2Client != null) {
D2ClientUtils.shutdownClient(d2Client);
}
if (nativeMetadataRepository != null) {
// Calling clear explicitly here because if the NativeMetadataRepository implementation used happens to
// initialize a new DaVinciBackend, then calling clear will trigger the cleanup logic to ensure the
// DaVinciBackend is not leaked into other tests.
nativeMetadataRepository.clear();
}
}
}

@Test(timeOut = 60 * Time.MS_PER_SECOND)
public void testThinClientMetaStoreBasedRepositoryWithLargeValueSchemas() throws InterruptedException {
String regularVeniceStoreName = Utils.getUniqueString("venice_store");
@@ -0,0 +1,269 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
import static org.testng.Assert.assertNotNull;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class TestDaVinciRequestBasedMetaRepository {
private static final int TEST_TIMEOUT = 2 * Time.MS_PER_MINUTE;

private static final String CLUSTER_NAME = "venice-cluster";
private VeniceClusterWrapper clusterWrapper;

private static final String storeNameStringToString = "store-name-string-to-string";
private static final String storeNameStringToNameRecord = "store-name-string-to-name-record";

// StoreName -> ControllerClient
// Using a map to track which stores have been created
private final Map<String, ControllerClient> controllerClients = new HashMap<>();
// StoreName -> Directory
private final Map<String, File> pushJobAvroDataDirs = new HashMap<>();

private DaVinciConfig daVinciConfig;
private MetricsRepository dvcMetricsRepo;
private D2Client daVinciD2RemoteFabric;
private CachingDaVinciClientFactory daVinciClientFactory;

@BeforeClass(alwaysRun = true)
public void setUp() throws IOException {

VeniceClusterCreateOptions.Builder options = new VeniceClusterCreateOptions.Builder().clusterName(CLUSTER_NAME)
.numberOfRouters(1)
.numberOfServers(2)
.numberOfControllers(2)
.replicationFactor(2)
.forkServer(false);
clusterWrapper = ServiceFactory.getVeniceCluster(options.build());

// Create stores
runPushJob( // String to String
storeNameStringToString,
TestWriteUtils
.writeSimpleAvroFileWithStringToStringSchema(getPushJobAvroFileDirectory(storeNameStringToString)));
runPushJob( // String to Name Record
storeNameStringToNameRecord,
TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV1Schema(
getPushJobAvroFileDirectory(storeNameStringToNameRecord)));

// Set up DVC Client Factory
VeniceProperties backendConfig =
new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB)
.put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
daVinciConfig = new DaVinciConfig();
daVinciConfig.setUseRequestBasedMetaRepository(true);
daVinciD2RemoteFabric = D2TestUtils.getAndStartD2Client(clusterWrapper.getZk().getAddress());
dvcMetricsRepo = new MetricsRepository();
daVinciClientFactory = new CachingDaVinciClientFactory(
daVinciD2RemoteFabric,
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
dvcMetricsRepo,
backendConfig);
}

@AfterClass(alwaysRun = true)
public void cleanUp() {

// Shutdown remote fabric
D2ClientUtils.shutdownClient(daVinciD2RemoteFabric);

// Close client factory
daVinciClientFactory.close();

// Close controller clients
for (Map.Entry<String, ControllerClient> entry: controllerClients.entrySet()) {
entry.getValue().close();
}

// Close cluster wrapper
clusterWrapper.close();
}

@Test(timeOut = TEST_TIMEOUT)
public void testDVCRequestBasedMetaRepositoryStringToString()
throws IOException, ExecutionException, InterruptedException {

try (DaVinciClient<String, Object> storeClient =
daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToString, daVinciConfig)) {
storeClient.subscribeAll().get();

int recordCount = TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
for (int i = 1; i <= recordCount; i++) {
Assert.assertEquals(
storeClient.get(Integer.toString(i)).get().toString(),
TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i);
}
Assert
.assertEquals(getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToString), (double) 1);

// Run new push job
recordCount = 200;
runPushJob(
storeNameStringToString,
TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(
getPushJobAvroFileDirectory(storeNameStringToString),
recordCount));

// Verify version swap
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, false, () -> {
Assert.assertEquals(
getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToString),
(double) 2);
});

for (int i = 1; i <= recordCount; i++) {
Assert.assertEquals(
storeClient.get(Integer.toString(i)).get().toString(),
TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i);
}
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testDVCRequestBasedMetaRepositoryStringToNameRecord() throws ExecutionException, InterruptedException {

try (DaVinciClient<String, Object> storeClient =
daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToNameRecord, daVinciConfig)) {
storeClient.subscribeAll().get();

int recordCount = TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
for (int i = 1; i <= recordCount; i++) {
// Verify storeClient can read
Assert.assertEquals(
storeClient.get(Integer.toString(i)).get().toString(),
TestWriteUtils.renderNameRecord(TestWriteUtils.STRING_TO_NAME_RECORD_V1_SCHEMA, i)
.get(DEFAULT_VALUE_FIELD_PROP)
.toString());
}

// Verify version
Assert.assertEquals(
getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToNameRecord),
(double) 1);
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testDVCRequestBasedMetaRepositoryStringToNameRecordVersions()
throws IOException, ExecutionException, InterruptedException {

try (DaVinciClient<String, Object> storeClient =
daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToNameRecord, daVinciConfig)) {
storeClient.subscribeAll().get();

for (int i = 0; i < TestWriteUtils.countStringToNameRecordSchemas(); i++) {
Schema schema = TestWriteUtils.getStringToNameRecordSchema(i);
int currentValueVersion = i + 2;

// Run new push job with new version
int recordCount = currentValueVersion * TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
runPushJob(
storeNameStringToNameRecord,
TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordSchema(
getPushJobAvroFileDirectory(storeNameStringToNameRecord),
schema,
recordCount));

// Verify version swap
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, false, () -> {
Assert.assertEquals(
getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToNameRecord),
(double) (currentValueVersion));
});

// Verify storeClient can read all
for (int j = 1; j <= recordCount; j++) {
Assert.assertEquals(
storeClient.get(Integer.toString(j)).get().toString(),
TestWriteUtils.renderNameRecord(schema, j).get(DEFAULT_VALUE_FIELD_PROP).toString());
}
}
}
}

private double getMetric(MetricsRepository metricsRepository, String metricName, String storeName) {
Metric metric = metricsRepository.getMetric("." + storeName + "--" + metricName);
assertNotNull(metric, "Expected metric " + metricName + " not found.");
return metric.value();
}

private File getPushJobAvroFileDirectory(String storeName) {
if (!pushJobAvroDataDirs.containsKey(storeName)) {
pushJobAvroDataDirs.put(storeName, getTempDataDirectory());
}

return pushJobAvroDataDirs.get(storeName);
}

private void runPushJob(String storeName, Schema schema) {

ControllerClient controllerClient;
File dataDir = getPushJobAvroFileDirectory(storeName);
String dataDirPath = "file:" + dataDir.getAbsolutePath();

if (!controllerClients.containsKey(storeName)) {
// Init store
controllerClient = IntegrationTestPushUtils.createStoreForJob(
CLUSTER_NAME,
schema,
TestWriteUtils.defaultVPJProps(
clusterWrapper.getVeniceControllers().get(0).getControllerUrl(),
dataDirPath,
storeName));
controllerClients.put(storeName, controllerClient);
} else {
controllerClient = controllerClients.get(storeName);

// Add new schema
Schema valueSchema = schema.getField(DEFAULT_VALUE_FIELD_PROP).schema();
SchemaResponse schemaResponse = controllerClient.addValueSchema(storeName, valueSchema.toString());
Assert.assertFalse(schemaResponse.isError(), schemaResponse.getError());
}

Properties props =
TestWriteUtils.defaultVPJProps(controllerClient.getLeaderControllerUrl(), dataDirPath, storeName);
TestWriteUtils.runPushJob(storeName + "_" + Utils.getUniqueString("push_job"), props);
}
}