Set up Flink SQL Gateway (#85)
jogrogan authored Jan 15, 2025
1 parent 574be65 commit 7c0455f
Showing 19 changed files with 123 additions and 130 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/integration-tests.yml
@@ -32,6 +32,7 @@ jobs:
run: |
kind load docker-image hoptimator
kind load docker-image hoptimator-flink-runner
kind load docker-image hoptimator-flink-operator
- name: Deploy Dev Environment
run: make deploy-dev-environment
- name: Deploy Hoptimator
@@ -64,4 +65,4 @@ jobs:
- name: Capture Flink Job Logs
if: always()
run: kubectl logs $(kubectl get pods -l app.kubernetes.io/name=flink-kubernetes-operator -o name)
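
The same flow can be reproduced locally against a kind cluster (a sketch; assumes a cluster already created with `kind create cluster`):

```
$ make build
$ kind load docker-image hoptimator
$ kind load docker-image hoptimator-flink-runner
$ kind load docker-image hoptimator-flink-operator
$ make deploy-dev-environment
```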

24 changes: 15 additions & 9 deletions Makefile
@@ -8,7 +8,8 @@ test:
build:
./gradlew build
docker build . -t hoptimator
docker build hoptimator-flink-runner -t hoptimator-flink-runner
docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-runner -t hoptimator-flink-runner
docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-operator -t hoptimator-flink-operator

bounce: build undeploy deploy deploy-samples deploy-config deploy-demo

@@ -47,26 +48,32 @@ deploy-samples: deploy
undeploy-samples: undeploy
kubectl delete -f ./deploy/samples || echo "skipping"

deploy-flink:
deploy-flink: deploy
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
helm upgrade --install --atomic --set webhook.create=false,image.pullPolicy=Never,image.repository=docker.io/library/hoptimator-flink-operator,image.tag=latest flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
kubectl apply -f deploy/samples/flinkDeployment.yaml
kubectl apply -f deploy/samples/flinkSessionJob.yaml
docker compose -f ./deploy/docker/flink/docker-compose-sql-gateway.yaml up -d --wait

undeploy-flink:
kubectl delete flinkdeployments.flink.apache.org --all || echo "skipping"
docker compose -f ./deploy/docker/flink/docker-compose-sql-gateway.yaml down
kubectl delete flinksessionjobs.flink.apache.org --all || echo "skipping"
kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping"
kubectl delete flinkdeployments.flink.apache.org --all || echo "skipping"
kubectl delete crd flinksessionjobs.flink.apache.org || echo "skipping"
kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping"
helm uninstall flink-kubernetes-operator || echo "skipping"
helm repo remove flink-operator-repo || echo "skipping"

deploy-kafka: deploy deploy-flink
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
kubectl create namespace kafka || echo "skipping"
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy/dev
kubectl apply -f ./deploy/samples/demodb.yaml
kubectl apply -f ./deploy/samples/kafkadb.yaml
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"

undeploy-kafka:
kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping"
@@ -78,18 +85,17 @@ undeploy-kafka:
kubectl delete -f ./deploy/dev || echo "skipping"
kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"
kubectl delete namespace kafka || echo "skipping"
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"

# Deploys Venice cluster in docker and creates two stores in Venice. Stores are not managed via K8s for now.
deploy-venice: deploy deploy-flink
docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml up -d --wait
docker compose -f ./deploy/docker/venice/docker-compose-single-dc-setup.yaml up -d --wait
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store schemas/keySchema.avsc schemas/valueSchema.avsc
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store-1 schemas/keySchema.avsc schemas/valueSchema.avsc
kubectl apply -f ./deploy/samples/venicedb.yaml

undeploy-venice:
kubectl delete -f ./deploy/samples/venicedb.yaml || echo "skipping"
docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml down
docker compose -f ./deploy/docker/venice/docker-compose-single-dc-setup.yaml down

deploy-dev-environment: deploy deploy-flink deploy-kafka deploy-venice

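Putting the new targets together, a typical local cycle looks like this (a sketch using only targets defined in the Makefile above):

```
$ make build                    # builds hoptimator, hoptimator-flink-runner, hoptimator-flink-operator
$ make deploy-dev-environment   # Flink + SQL Gateway, Kafka, and Venice
$ make undeploy-flink           # tear the Flink pieces back down
```
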
24 changes: 22 additions & 2 deletions README.md
@@ -48,9 +48,9 @@ The below setup will install two local demo DBs, ads and profiles.
> !intro
```

## Set up Kafka & Flink clusters
## Set up dev environment

The below setup will install a Kafka and Flink cluster within Kubernetes.
The below setup will create a dev environment with various resources (Kafka, Flink, Venice) within Kubernetes.

```
$ make install # build and install SQL CLI
@@ -60,6 +60,26 @@ The below setup will install a Kafka and Flink cluster within Kubernetes.
> !intro
```

The `deploy-kafka`, `deploy-venice`, `deploy-flink`, etc. targets also exist in isolation to deploy individual components, as shown below.
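
For example, to stand up and tear down just the Flink pieces:

```
$ make deploy-flink     # operator, session cluster, and SQL Gateway
$ make undeploy-flink
```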

### Flink

```
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
basic-session-deployment-7b94b98b6b-d6jt5 1/1 Running 0 43s
```

Once the Flink deployment pod has STATUS 'Running', you can forward port 8081 and connect to http://localhost:8081/
to access the Flink dashboard.

```
$ kubectl port-forward basic-session-deployment-7b94b98b6b-d6jt5 8081 &
```
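
With the port forwarded, the standard Flink REST API is reachable on the same port; for example, to list jobs (a sketch against a stock Flink endpoint):

```
$ curl http://localhost:8081/jobs/overview
```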

See the [Flink SQL Gateway Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql-gateway/overview/)
for running ad-hoc queries through Flink.
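
As a sketch of what an ad-hoc query looks like against the gateway started by `make deploy-flink` (port 8083 in this setup; endpoint paths per the Flink 1.18 SQL Gateway REST API; `jq` assumed for JSON parsing):

```
$ SESSION=$(curl -s -X POST http://localhost:8083/v1/sessions | jq -r .sessionHandle)
$ OP=$(curl -s -X POST http://localhost:8083/v1/sessions/$SESSION/statements \
    -H 'Content-Type: application/json' -d '{"statement": "SELECT 1;"}' | jq -r .operationHandle)
$ curl -s http://localhost:8083/v1/sessions/$SESSION/operations/$OP/result/0
```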

## The SQL CLI

The `./hoptimator` script launches the [sqlline](https://github.com/julianhyde/sqlline) SQL CLI pre-configured to connect to `jdbc:hoptimator://`. The CLI includes some additional commands. See `!intro`.
12 changes: 12 additions & 0 deletions deploy/docker/flink/docker-compose-sql-gateway.yaml
@@ -0,0 +1,12 @@
services:
flink-sql-gateway:
image: flink:1.18.1
restart: unless-stopped
entrypoint: >
/bin/sh -c "./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest.address=localhost"
ports:
- 8083:8083
deploy:
resources:
limits:
memory: 1024M
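
One way to sanity-check the gateway once the compose service is up (a sketch; `/v1/info` is the gateway's standard info endpoint):

```
$ docker compose -f ./deploy/docker/flink/docker-compose-sql-gateway.yaml up -d --wait
$ curl http://localhost:8083/v1/info
```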
2 changes: 1 addition & 1 deletion deploy/docker/venice/docker-compose-single-dc-setup.yaml
@@ -78,7 +78,7 @@ services:
hostname: venice-client
tty: true
volumes:
- ./venice:/opt/venice/schemas
- ./schemas:/opt/venice/schemas
depends_on:
venice-router:
condition: service_healthy
File renamed without changes.
File renamed without changes.
36 changes: 0 additions & 36 deletions deploy/samples/flink.yaml

This file was deleted.

18 changes: 18 additions & 0 deletions deploy/samples/flinkDeployment.yaml
@@ -0,0 +1,18 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-session-deployment
spec:
image: flink:1.18
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 0.1
taskManager:
resource:
memory: "2048m"
cpu: 0.1
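
To apply this session cluster by hand and watch it come up (`make deploy-flink` above runs the same apply):

```
$ kubectl apply -f deploy/samples/flinkDeployment.yaml
$ kubectl get flinkdeployment basic-session-deployment -w   # watch until the deployment reports a stable/running status
```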
22 changes: 22 additions & 0 deletions deploy/samples/flinkSessionJob.yaml
@@ -0,0 +1,22 @@
## This template adds Flink support.

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: JobTemplate
metadata:
name: flink-template
spec:
yaml: |
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: {{name}}
spec:
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- {{flinksql}}
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
state: running
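
Hoptimator renders this template with `{{name}}` and `{{flinksql}}` substituted, producing a `FlinkSessionJob` that runs on the session cluster named by `deploymentName` (see the kafka-ddl fixture below for a rendered example). One way to inspect submitted jobs (a sketch):

```
$ kubectl get flinksessionjobs
$ kubectl get flinksessionjob kafka-database-existing-topic-1 -o yaml
```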
24 changes: 12 additions & 12 deletions gradle/libs.versions.toml
@@ -4,24 +4,24 @@ avro = "org.apache.avro:avro:1.10.2"
calcite-avatica = "org.apache.calcite.avatica:avatica:1.23.0"
calcite-core = "org.apache.calcite:calcite-core:1.34.0"
calcite-server = "org.apache.calcite:calcite-server:1.34.0"
flink-clients = "org.apache.flink:flink-clients:1.16.2"
flink-connector-base = "org.apache.flink:flink-connector-base:1.16.2"
flink-core = "org.apache.flink:flink-core:1.16.2"
flink-csv = "org.apache.flink:flink-csv:1.16.2"
flink-streaming-java = "org.apache.flink:flink-streaming-java:1.16.2"
flink-table-api-java = "org.apache.flink:flink-table-api-java:1.16.2"
flink-table-api-java-bridge = "org.apache.flink:flink-table-api-java-bridge:1.16.2"
flink-table-common = "org.apache.flink:flink-table-common:1.16.2"
flink-table-planner = "org.apache.flink:flink-table-planner_2.12:1.16.2"
flink-table-runtime = "org.apache.flink:flink-table-runtime:1.16.2"
flink-connector-kafka = "org.apache.flink:flink-sql-connector-kafka:1.16.2"
flink-clients = "org.apache.flink:flink-clients:1.18.1"
flink-connector-base = "org.apache.flink:flink-connector-base:1.18.1"
flink-core = "org.apache.flink:flink-core:1.18.1"
flink-csv = "org.apache.flink:flink-csv:1.18.1"
flink-streaming-java = "org.apache.flink:flink-streaming-java:1.18.1"
flink-table-api-java = "org.apache.flink:flink-table-api-java:1.18.1"
flink-table-api-java-bridge = "org.apache.flink:flink-table-api-java-bridge:1.18.1"
flink-table-common = "org.apache.flink:flink-table-common:1.18.1"
flink-table-planner = "org.apache.flink:flink-table-planner_2.12:1.18.1"
flink-table-runtime = "org.apache.flink:flink-table-runtime:1.18.1"
flink-connector-kafka = "org.apache.flink:flink-sql-connector-kafka:3.2.0-1.18"
flink-connector-mysql-cdc = "com.ververica:flink-sql-connector-mysql-cdc:2.3.0"
gson = "com.google.code.gson:gson:2.9.0"
jackson = "com.fasterxml.jackson.core:jackson-core:2.14.1"
jackson-dataformat-yaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.1"
javax-annotation-api = "javax.annotation:javax.annotation-api:1.3.2"
junit = "junit:junit:4.12"
kafka-clients = "org.apache.kafka:kafka-clients:2.7.1"
kafka-clients = "org.apache.kafka:kafka-clients:3.2.0"
kubernetes-client = "io.kubernetes:client-java:16.0.2"
kubernetes-extended-client = "io.kubernetes:client-java-extended:16.0.2"
slf4j-simple = "org.slf4j:slf4j-simple:1.7.30"
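
Note the Kafka connector coordinate `3.2.0-1.18`: the connector is now released separately from Flink, so its version pairs a connector release (3.2.0) with a target Flink release (1.18). To confirm what actually resolves on the classpath (a sketch using Gradle's standard dependency report; module and configuration names may differ):

```
$ ./gradlew :hoptimator-flink-runner:dependencies --configuration runtimeClasspath | grep flink
```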
@@ -6,7 +6,7 @@ metadata:
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
@@ -6,7 +6,7 @@ metadata:
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
2 changes: 2 additions & 0 deletions hoptimator-flink-runner/Dockerfile-flink-operator
@@ -0,0 +1,2 @@
FROM apache/flink-kubernetes-operator:1.9.0
COPY ./build/libs/hoptimator-flink-runner-all.jar /opt/hoptimator-flink-runner.jar
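This image layers the runner jar onto the stock Flink Kubernetes operator image. To build it and load it into a local kind cluster, mirroring the CI steps above (a sketch):

```
$ ./gradlew build
$ docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-operator -t hoptimator-flink-operator
$ kind load docker-image hoptimator-flink-operator
```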
2 changes: 1 addition & 1 deletion hoptimator-flink-runner/Dockerfile-flink-runner
@@ -1,2 +1,2 @@
FROM flink:1.16
FROM flink:1.18.1
COPY ./build/libs/hoptimator-flink-runner-all.jar /opt/hoptimator-flink-runner.jar
17 changes: 2 additions & 15 deletions hoptimator-kafka/src/test/resources/kafka-ddl.id
@@ -3,24 +3,11 @@

insert into kafka."existing-topic-1" select * from kafka."existing-topic-2";
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
kind: FlinkSessionJob
metadata:
name: kafka-database-existing-topic-1
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 0.1
taskManager:
resource:
memory: "2048m"
cpu: 0.1
deploymentName: basic-session-deployment-example
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
21 changes: 4 additions & 17 deletions hoptimator-venice/src/test/resources/venice-ddl-insert-all.id
@@ -3,29 +3,16 @@

insert into "VENICE-CLUSTER0"."test-store-1" select * from "VENICE-CLUSTER0"."test-store";
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
kind: FlinkSessionJob
metadata:
name: venice-cluster0-test-store-1
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 0.1
taskManager:
resource:
memory: "2048m"
cpu: 0.1
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
jarURI: local:///opt/hoptimator-flink-runner.jar
parallelism: 1
23 changes: 5 additions & 18 deletions hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id
@@ -3,30 +3,17 @@

insert into "VENICE-CLUSTER0"."test-store-1" ("KEY$id", "intField") select "KEY$id", "stringField" from "VENICE-CLUSTER0"."test-store";
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
kind: FlinkSessionJob
metadata:
name: venice-cluster0-test-store-1
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_16
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 0.1
taskManager:
resource:
memory: "2048m"
cpu: 0.1
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS INTEGER) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store`
- CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store`
jarURI: local:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless