Skip to content

Commit

Permalink
Apply some Zeppelin fixes for kafka (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
jogrogan authored Feb 14, 2025
1 parent 547e5c7 commit c182634
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 15 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ build-zeppelin: build
# attaches to terminal (not run as daemon)
run-zeppelin: build-zeppelin
kubectl apply -f deploy/docker/zeppelin/zeppelin-flink-engine.yaml
kubectl apply -f deploy/docker/zeppelin/zeppelin-kafkadb.yaml
docker run --rm -p 8080:8080 \
--volume=${HOME}/.kube/config:/opt/zeppelin/.kube/config \
--add-host=docker-for-desktop:host-gateway \
Expand Down
1 change: 1 addition & 0 deletions deploy/dev/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ spec:
nodePort: 31092
brokers:
- broker: 0
# advertisedHost: host.docker.internal # to enable Zeppelin, uncomment this line and comment out the `advertisedHost: 127.0.0.1` line below; TODO: figure out a way around this
advertisedHost: 127.0.0.1
nodePort: 31234
config:
Expand Down
8 changes: 8 additions & 0 deletions deploy/docker/zeppelin/zeppelin-kafkadb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Database
metadata:
name: kafka-database
spec:
schema: KAFKA
url: jdbc:kafka://bootstrap.servers=host.docker.internal:9092
dialect: Calcite
4 changes: 2 additions & 2 deletions deploy/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ metadata:
name: hoptimator-operator
rules:
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["acls", "kafkatopics", "subscriptions", "sqljobs", "pipelines"]
resources: ["acls", "databases", "engines", "jobtemplates", "kafkatopics", "pipelines", "sqljobs", "subscriptions", "tabletemplates", "views"]
verbs: ["get", "watch", "list", "update", "create"]
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["kafkatopics/status", "subscriptions/status", "acls/status", "sqljobs/status", "pipelines/status"]
resources: ["acls/status", "kafkatopics/status", "pipelines/status", "sqljobs/status", "subscriptions/status"]
verbs: ["get", "patch"]
- apiGroups: ["flink.apache.org"]
resources: ["flinkdeployments", "flinksessionjobs"]
Expand Down
1 change: 1 addition & 0 deletions deploy/samples/kafkadb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ spec:
value.format = json
scan.startup.mode = earliest-offset
key.fields = KEY
key.format = raw
value.fields-include = EXCEPT_KEY
---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,18 @@ public Connection connect(String url, Properties props) throws SQLException {
return null;
}
try {
// Load properties from the URL and from getConnection()'s properties.
// URL properties take precedence.
Properties properties = new Properties();
properties.putAll(props); // via getConnection()
properties.putAll(ConnectStringParser.parse(url.substring(getConnectStringPrefix().length())));

if (prepareFactory == null) {
// funky way of extending Driver with a custom Prepare:
return withPrepareFactory(() -> new Prepare(props))
.connect(url, props);
return withPrepareFactory(() -> new Prepare(properties))
.connect(url, properties);
}
Connection connection = super.connect(url, props);
Connection connection = super.connect(url, properties);
if (connection == null) {
throw new IOException("Could not connect to " + url);
}
Expand All @@ -76,12 +82,6 @@ public Connection connect(String url, Properties props) throws SQLException {
calciteConnection.setSchema("DEFAULT");

WrappedSchemaPlus wrappedRootSchema = new WrappedSchemaPlus(rootSchema);

// Load properties from the URL and from getConnection()'s properties.
// URL properties take precedence.
Properties properties = new Properties();
properties.putAll(props); // via getConnection()
properties.putAll(ConnectStringParser.parse(url.substring(getConnectStringPrefix().length())));
String[] catalogs = properties.getProperty("catalogs", "").split(",");

if (catalogs.length == 0 || catalogs[0].length() == 0) {
Expand All @@ -92,7 +92,7 @@ public Connection connect(String url, Properties props) throws SQLException {
} else {
// load specific catalogs when loaded as `jdbc:hoptimator://catalogs=foo,bar`
for (String catalog : catalogs) {
CatalogService.catalog(catalog).register(wrappedRootSchema, props);
CatalogService.catalog(catalog).register(wrappedRootSchema, properties);
}
}

Expand All @@ -104,7 +104,7 @@ public Connection connect(String url, Properties props) throws SQLException {

@Override
public Driver withPrepareFactory(Supplier<CalcitePrepare> prepareFactory) {
return new HoptimatorDriver(prepareFactory);
return new HoptimatorDriver(prepareFactory);
}

public static class Prepare extends CalcitePrepareImpl {
Expand Down
4 changes: 2 additions & 2 deletions hoptimator-kafka/src/test/resources/kafka-ddl.id
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ spec:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE DATABASE IF NOT EXISTS `KAFKA` WITH ()
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'key.fields'='KEY', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
- CREATE DATABASE IF NOT EXISTS `KAFKA` WITH ()
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'key.fields'='KEY', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
- INSERT INTO `KAFKA`.`existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
Expand Down

0 comments on commit c182634

Please sign in to comment.