diff --git a/deploy/samples/venicedb.yaml b/deploy/samples/venicedb.yaml index f5be95d..8091f9a 100644 --- a/deploy/samples/venicedb.yaml +++ b/deploy/samples/venicedb.yaml @@ -1,10 +1,10 @@ apiVersion: hoptimator.linkedin.com/v1alpha1 kind: Database metadata: - name: venice-cluster0 + name: venice spec: - schema: VENICE-CLUSTER0 - url: jdbc:venice://cluster=venice-cluster0;router.url=http://localhost:7777 + schema: VENICE + url: jdbc:venice://clusters=venice-cluster0;router.url=http://localhost:7777 dialect: Calcite --- @@ -12,10 +12,10 @@ spec: apiVersion: hoptimator.linkedin.com/v1alpha1 kind: TableTemplate metadata: - name: venice-template-cluster0 + name: venice-template spec: databases: - - venice-cluster0 + - venice connector: | connector = venice storeName = {{table}} diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java index f83afc4..f326cec 100644 --- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java +++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java @@ -1,7 +1,9 @@ package com.linkedin.hoptimator.venice; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -35,8 +37,10 @@ public ClusterSchema(Properties properties) { public void populate() throws InterruptedException, ExecutionException, IOException { tableMap.clear(); - String cluster = properties.getProperty("cluster"); - log.info("Loading Venice stores for cluster {}", cluster); + String clusterStr = properties.getProperty("clusters"); + List clusters = Arrays.asList(clusterStr.split(",")); + + log.info("Loading Venice stores for cluster {}", clusters); String sslConfigPath = properties.getProperty("ssl-config-path"); Optional sslFactory = Optional.empty(); @@ -47,12 +51,14 @@ public void populate() throws InterruptedException, ExecutionException, IOExcept sslFactory = Optional.of(SslUtils.getSSLFactory(sslProperties, sslFactoryClassName)); } - try (ControllerClient controllerClient = createControllerClient(cluster, sslFactory)) { - String[] stores = controllerClient.queryStoreList(false).getStores(); - log.info("Loaded {} Venice stores.", stores.length); - for (String store : stores) { - StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(store); - tableMap.put(store, createVeniceStore(storeSchemaFetcher)); + for (String cluster : clusters) { + try (ControllerClient controllerClient = createControllerClient(cluster, sslFactory)) { + String[] stores = controllerClient.queryStoreList(false).getStores(); + log.info("Loaded {} Venice stores.", stores.length); + for (String store : stores) { + StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(store); + tableMap.put(store, createVeniceStore(storeSchemaFetcher)); + } } } } diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java index 90d018b..e182737 100644 --- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java +++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java @@ -3,7 +3,6 @@ import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; -import java.util.Locale; import java.util.Properties; import org.apache.calcite.avatica.ConnectStringParser; @@ -16,7 +15,6 @@ /** JDBC driver for Venice stores. */ public class VeniceDriver extends Driver { public static final String CATALOG_NAME = "VENICE"; - public static final String CONFIG_NAME = "venice.config"; static { new VeniceDriver().register(); @@ -41,14 +39,7 @@ public Connection connect(String url, Properties props) throws SQLException { Properties properties = new Properties(); properties.putAll(props); // in case the driver is loaded via getConnection() properties.putAll(ConnectStringParser.parse(url.substring(getConnectStringPrefix().length()))); - String cluster = properties.getProperty("cluster"); - if (cluster == null) { - throw new IllegalArgumentException("Missing required cluster property. Need: jdbc:venice://cluster=..."); - } - cluster = cluster.toUpperCase(Locale.ROOT); - if (!cluster.startsWith(CATALOG_NAME)) { - cluster = CATALOG_NAME + "-" + cluster; - } + try { Connection connection = super.connect(url, props); if (connection == null) { @@ -60,7 +51,7 @@ public Connection connect(String url, Properties props) throws SQLException { SchemaPlus rootSchema = calciteConnection.getRootSchema(); ClusterSchema schema = createClusterSchema(properties); schema.populate(); - rootSchema.add(cluster.toUpperCase(Locale.ROOT), schema); + rootSchema.add(CATALOG_NAME, schema); return connection; } catch (Exception e) { throw new SQLException("Problem loading " + url, e); diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id index e1ca7d4..6c249ba 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id @@ -1,22 +1,22 @@ !set outputformat mysql !use k8s -insert into "VENICE-CLUSTER0"."test-store-1" select * from "VENICE-CLUSTER0"."test-store"; +insert into "VENICE"."test-store-1" select * from "VENICE"."test-store"; apiVersion: flink.apache.org/v1beta1 kind: FlinkSessionJob metadata: - name: venice-cluster0-test-store-1 + name: venice-test-store-1 namespace: flink spec: deploymentName: basic-session-deployment job: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') - - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - - INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store` + - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () + - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') + - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () + - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') + - INSERT INTO `VENICE`.`test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE`.`test-store` jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id index ada0b4c..256a02d 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id @@ -1,22 +1,22 @@ !set outputformat mysql !use k8s -insert into "VENICE-CLUSTER0"."test-store-1" ("KEY_id", "intField") select "KEY_id", "stringField" from "VENICE-CLUSTER0"."test-store"; +insert into "VENICE"."test-store-1" ("KEY_id", "intField") select "KEY_id", "stringField" from "VENICE"."test-store"; apiVersion: flink.apache.org/v1beta1 kind: FlinkSessionJob metadata: - name: venice-cluster0-test-store-1 + name: venice-test-store-1 namespace: flink spec: deploymentName: basic-session-deployment job: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') - - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - - INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`) SELECT `KEY_id`, CAST(`stringField` AS SIGNED) AS `intField` FROM `VENICE-CLUSTER0`.`test-store` + - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () + - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') + - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () + - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') + - INSERT INTO `VENICE`.`test-store-1` (`KEY_id`, `intField`) SELECT `KEY_id`, CAST(`stringField` AS SIGNED) AS `intField` FROM `VENICE`.`test-store` jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless diff --git a/hoptimator-venice/src/test/resources/venice-ddl-select.id b/hoptimator-venice/src/test/resources/venice-ddl-select.id index 7c0b538..59611d3 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-select.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-select.id @@ -1,7 +1,7 @@ !set outputformat mysql !use k8s -select * from "VENICE-CLUSTER0"."test-store-1"; +select * from "VENICE"."test-store-1"; apiVersion: flink.apache.org/v1beta1 kind: FlinkSessionJob metadata: @@ -12,11 +12,11 @@ spec: job: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') + - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () + - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - CREATE DATABASE IF NOT EXISTS `PIPELINE` WITH () - CREATE TABLE IF NOT EXISTS `PIPELINE`.`SINK` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH () - - INSERT INTO `PIPELINE`.`SINK` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1` + - INSERT INTO `PIPELINE`.`SINK` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE`.`test-store-1` jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless