add requestbasedmetarepository integration testing
Showing 7 changed files with 387 additions and 28 deletions.
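For context, the new tests exercise DaVinci clients with DaVinciConfig#setUseRequestBasedMetaRepository enabled. Below is a minimal sketch of the client-side wiring the tests rely on; the Venice classes and methods are the ones used in the diff, while the helper class itself, its name, and its parameters (D2 client, cluster-discovery service name, store name, data path) are hypothetical placeholders, not part of the commit.

import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;

// Hypothetical helper, not part of this commit: shows the client-side wiring the tests use.
public class RequestBasedMetaRepositoryClientSketch {
  public static DaVinciClient<String, Object> startClient(
      D2Client d2Client, String clusterDiscoveryServiceName, String storeName, String dataBasePath) {
    // Backend config mirroring the test setup: RocksDB persistence plus the system store repository.
    VeniceProperties backendConfig = new PropertyBuilder().put(DATA_BASE_PATH, dataBasePath)
        .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB)
        .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
        .build();

    DaVinciConfig daVinciConfig = new DaVinciConfig();
    // The behavior under test: resolve store and schema metadata via the request-based meta repository.
    daVinciConfig.setUseRequestBasedMetaRepository(true);

    CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
        d2Client, clusterDiscoveryServiceName, new MetricsRepository(), backendConfig);
    return factory.getAndStartGenericAvroClient(storeName, daVinciConfig);
  }
}

The tests below obtain clients in essentially this way, call subscribeAll(), and then assert on reads and on the per-store current_version_number.Gauge metric across pushes.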
273 changes: 273 additions & 0 deletions
...egrationTest/java/com/linkedin/venice/endToEnd/TestDaVinciRequestBasedMetaRepository.java
@@ -0,0 +1,273 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
import static org.testng.Assert.assertNotNull;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

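/**
 * Integration tests for DaVinci clients configured with {@link DaVinciConfig#setUseRequestBasedMetaRepository}:
 * covers initial ingestion, future-version ingestion with version swap, and value schema evolution against
 * string-to-string and string-to-NameRecord stores.
 */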
public class TestDaVinciRequestBasedMetaRepository {
  private static final int TEST_TIMEOUT = 2 * Time.MS_PER_MINUTE;

  private static final String CLUSTER_NAME = "venice-cluster";
  private VeniceClusterWrapper clusterWrapper;

  private static final String storeNameStringToString = "store-name-string-to-string";
  private static final String storeNameStringToNameRecord = "store-name-string-to-name-record";

  // StoreName -> ControllerClient
  // Used to track which stores have already been created
  private final Map<String, ControllerClient> controllerClients = new HashMap<>();
  // StoreName -> Directory
  private final Map<String, File> pushJobAvroDataDirs = new HashMap<>();

  private DaVinciConfig daVinciConfig;
  private MetricsRepository dvcMetricsRepo;
  private D2Client daVinciD2RemoteFabric;
  private CachingDaVinciClientFactory daVinciClientFactory;

  @BeforeClass(alwaysRun = true)
  public void setUp() throws IOException {

    VeniceClusterCreateOptions.Builder options = new VeniceClusterCreateOptions.Builder().clusterName(CLUSTER_NAME)
        .numberOfRouters(1)
        .numberOfServers(2)
        .numberOfControllers(2)
        .replicationFactor(2)
        .forkServer(false);
    clusterWrapper = ServiceFactory.getVeniceCluster(options.build());

    // Create stores
    runPushJob( // String to String
        storeNameStringToString,
        TestWriteUtils
            .writeSimpleAvroFileWithStringToStringSchema(getPushJobAvroFileDirectory(storeNameStringToString)));
    runPushJob( // String to Name Record
        storeNameStringToNameRecord,
        TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV1Schema(
            getPushJobAvroFileDirectory(storeNameStringToNameRecord)));

    // Set up DVC Client Factory
    VeniceProperties backendConfig =
        new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
            .put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB)
            .put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
            .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
            .build();
    daVinciConfig = new DaVinciConfig();
    daVinciConfig.setUseRequestBasedMetaRepository(true);
    daVinciD2RemoteFabric = D2TestUtils.getAndStartD2Client(clusterWrapper.getZk().getAddress());
    dvcMetricsRepo = new MetricsRepository();
    daVinciClientFactory = new CachingDaVinciClientFactory(
        daVinciD2RemoteFabric,
        VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
        dvcMetricsRepo,
        backendConfig);
  }

  @AfterClass(alwaysRun = true)
  public void cleanUp() {

    // Shutdown remote fabric
    D2ClientUtils.shutdownClient(daVinciD2RemoteFabric);

    // Close client factory
    daVinciClientFactory.close();

    // Close controller clients
    for (Map.Entry<String, ControllerClient> entry: controllerClients.entrySet()) {
      entry.getValue().close();
    }

    // Close cluster wrapper
    clusterWrapper.close();
  }

  @Test(timeOut = TEST_TIMEOUT)
  public void testDVCRequestBasedMetaRepositoryStringToString()
      throws IOException, ExecutionException, InterruptedException {

    try (DaVinciClient<String, Object> storeClient =
        daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToString, daVinciConfig)) {
      storeClient.subscribeAll().get();

      int recordCount = TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
      for (int i = 1; i <= recordCount; i++) {
        Assert.assertEquals(
            storeClient.get(Integer.toString(i)).get().toString(),
            TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i);
      }
      Assert
          .assertEquals(getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToString), (double) 1);

      // Perform another push with 200 keys to verify future version ingestion
      recordCount = 200;
      runPushJob(
          storeNameStringToString,
          TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(
              getPushJobAvroFileDirectory(storeNameStringToString),
              recordCount));

      // Wait for the new version to be ingested and swapped in
      TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, false, () -> {
        Assert.assertEquals(
            getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToString),
            (double) 2);
      });

      for (int i = 1; i <= recordCount; i++) {
        Assert.assertEquals(
            storeClient.get(Integer.toString(i)).get().toString(),
            TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX + i);
      }
    }
  }

  @Test(timeOut = TEST_TIMEOUT)
  public void testDVCRequestBasedMetaRepositoryStringToNameRecord() throws ExecutionException, InterruptedException {

    try (DaVinciClient<String, Object> storeClient =
        daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToNameRecord, daVinciConfig)) {
      storeClient.subscribeAll().get();

      int recordCount = TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
      for (int i = 1; i <= recordCount; i++) {
        Assert.assertEquals(
            storeClient.get(Integer.toString(i)).get().toString(),
            TestWriteUtils.renderNameRecord(TestWriteUtils.STRING_TO_NAME_RECORD_V1_SCHEMA, i)
                .get(DEFAULT_VALUE_FIELD_PROP)
                .toString());
      }
      Assert.assertEquals(
          getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToNameRecord),
          (double) 1);
    }
  }

  @Test(timeOut = TEST_TIMEOUT)
  public void testDVCRequestBasedMetaRepositoryStringToNameRecordVersions()
      throws IOException, ExecutionException, InterruptedException {

    try (DaVinciClient<String, Object> storeClient =
        daVinciClientFactory.getAndStartGenericAvroClient(storeNameStringToNameRecord, daVinciConfig)) {
      storeClient.subscribeAll().get();

      for (int i = 0; i < TestWriteUtils.STRING_TO_NAME_RECORD_SCHEMAS.length; i++) {
        Schema schema = TestWriteUtils.STRING_TO_NAME_RECORD_SCHEMAS[i];
        int currentValueVersion = i + 2;

        int recordCount = currentValueVersion * TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT;
        runPushJob(
            storeNameStringToNameRecord,
            TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordSchema(
                getPushJobAvroFileDirectory(storeNameStringToNameRecord),
                schema,
                recordCount));

        // Wait for the new version to be ingested and swapped in
        TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, false, () -> {
          Assert.assertEquals(
              getMetric(dvcMetricsRepo, "current_version_number.Gauge", storeNameStringToNameRecord),
              (double) currentValueVersion);
        });

        for (int j = 1; j <= recordCount; j++) {
          Assert.assertEquals(
              storeClient.get(Integer.toString(j)).get().toString(),
              TestWriteUtils.renderNameRecord(schema, j).get(DEFAULT_VALUE_FIELD_PROP).toString());
        }
      }
    }
  }

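  // DVC per-store metrics are registered under "." + storeName + "--" + metricName in the metrics repository.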
  private double getMetric(MetricsRepository metricsRepository, String metricName, String storeName) {
    Metric metric = metricsRepository.getMetric("." + storeName + "--" + metricName);
    assertNotNull(metric, "Expected metric " + metricName + " not found.");
    return metric.value();
  }

  private File getPushJobAvroFileDirectory(String storeName) {
    if (!pushJobAvroDataDirs.containsKey(storeName)) {
      pushJobAvroDataDirs.put(storeName, getTempDataDirectory());
    }

    return pushJobAvroDataDirs.get(storeName);
  }

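  // Creates the store on the first push for a given store name; subsequent pushes register the
  // file's value schema with the controller before running the push job.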
  private void runPushJob(String storeName, Schema schema) {

    ControllerClient controllerClient = null;
    File dataDir = getPushJobAvroFileDirectory(storeName);
    String dataDirPath = "file:" + dataDir.getAbsolutePath();

    if (!controllerClients.containsKey(storeName)) {
      // Init store
      controllerClient = IntegrationTestPushUtils.createStoreForJob(
          CLUSTER_NAME,
          schema,
          TestWriteUtils.defaultVPJProps(
              clusterWrapper.getVeniceControllers().get(0).getControllerUrl(),
              dataDirPath,
              storeName));
      controllerClients.put(storeName, controllerClient);
    } else {
      controllerClient = controllerClients.get(storeName);

      // Add new schema
      Schema valueSchema = schema.getField(DEFAULT_VALUE_FIELD_PROP).schema();
      SchemaResponse schemaResponse = controllerClient.addValueSchema(storeName, valueSchema.toString());
      Assert.assertFalse(schemaResponse.isError(), schemaResponse.getError());
    }

    Properties props =
        TestWriteUtils.defaultVPJProps(controllerClient.getLeaderControllerUrl(), dataDirPath, storeName);
    TestWriteUtils.runPushJob(storeName + "_" + Utils.getUniqueString("push_job"), props);
  }
}