Draft
Changes from all commits
31 commits
310c614
Add the spark-operator crds
fabian-amrc Mar 19, 2025
e6402c4
Add spark-operator to the helm chart
fabian-amrc Mar 19, 2025
43dd150
Updated MinIO to more recent version
fabian-amrc Mar 21, 2025
7a3915f
Updated tenant deployment for better performance and scalability
fabian-amrc Mar 21, 2025
360b3e7
Add minio deltalake bucket for delta table data
fabian-amrc Mar 21, 2025
057d798
Add kafka helm dependency
fabian-amrc Mar 21, 2025
b9febbc
Add push to kafka topic from mqtt broker
fabian-amrc Mar 21, 2025
22c2cc1
Add base dockerfile for spark-applications
fabian-amrc Mar 25, 2025
1193870
Add ingester to helm deployment
fabian-amrc Mar 25, 2025
4b8546e
Deleted unnecessary spark operator crds
fabian-amrc Mar 26, 2025
2493c77
Change minio ingress to use templates
fabian-amrc Mar 27, 2025
8efc7c5
Disabled spark application ingress
fabian-amrc Mar 27, 2025
28f4519
Fix linting error
fabian-amrc Mar 31, 2025
f33092f
Merge remote-tracking branch 'origin/testing/v4' into fje/data-lake
fabian-amrc Mar 31, 2025
39f459a
Increase partitions and replication factor for kafka
fabian-amrc Mar 31, 2025
50df5d7
Create separate historian module for kafka
fabian-amrc Mar 31, 2025
cf69857
Revert "Add push to kafka topic from mqtt broker"
fabian-amrc Mar 31, 2025
d7fb562
Update example env file for historian-uns-kafka
fabian-amrc Mar 31, 2025
f924630
Fix errors getting process.env variables
fabian-amrc Mar 31, 2025
8b794e0
Rename KafkaClient to MqttClient
fabian-amrc Mar 31, 2025
97d6000
Add kafka ingress
fabian-amrc Mar 31, 2025
a0b5bd5
Add kafkaUns to historians values
fabian-amrc Mar 31, 2025
8922b39
Add kafka kerberos key
fabian-amrc Mar 31, 2025
8a46173
Add Kafka group uuid
fabian-amrc Mar 31, 2025
e0ea057
Remove unnecessary dependencies from kafka-uns
fabian-amrc Mar 31, 2025
7af03ed
Fix kafka module naming and description
fabian-amrc Mar 31, 2025
73db45c
Add historian-uns-kafka to github workflow
fabian-amrc Mar 31, 2025
aed8abc
Add historian-uns-kafka to makefile
fabian-amrc Mar 31, 2025
df36f0e
Add historian-uns-kafka deployment
fabian-amrc Mar 31, 2025
b25e69e
Update mqtt-ingestor to match kafka schema
fabian-amrc Mar 31, 2025
e525d06
Merge remote-tracking branch 'origin/testing/v4' into fje/data-lake
fabian-amrc Apr 1, 2025
1 change: 1 addition & 0 deletions .github/workflows/publish.yml
@@ -167,6 +167,7 @@ jobs:
- edge-helm-charts
- historian-sparkplug
- historian-uns
- historian-uns-kafka
- uns-ingester-sparkplug
permissions:
contents: read
1 change: 1 addition & 0 deletions Makefile
@@ -59,6 +59,7 @@ subdirs+= edge-test
subdirs+= edge-tplink-smartplug
subdirs+= historian-sparkplug
subdirs+= historian-uns
subdirs+= historian-uns-kafka
subdirs+= uns-ingester-sparkplug

recurse= env MAKE="${MAKE}" ${tools}/recurse
1 change: 1 addition & 0 deletions acs-service-setup/lib/uuids.js
@@ -391,6 +391,7 @@ export const UNS = {
Group: {
Sparkplug: "e414d355-b991-429b-8f5d-97e823ff71f5",
Historian: "03f5f08a-f61e-4134-8f66-b2951e3bbb69",
Kafka: "98b4c9cf-d283-4fe1-a33c-8bdf48fe54bc",

Ingester: "e03152b8-cdab-11ef-8ee9-77393064507e",
Reader: "d6a4d87c-cd02-11ef-9a87-2f86ebe5ee08",
3 changes: 3 additions & 0 deletions acs-spark/applications/mqtt-ingester/dockerfile
@@ -0,0 +1,3 @@
FROM local/spark:3.5.3
WORKDIR /app
COPY ./mqtt-ingester.py .
68 changes: 68 additions & 0 deletions acs-spark/applications/mqtt-ingester/mqtt-ingester.py
@@ -0,0 +1,68 @@
import os
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# Create SparkSession with Delta Lake and S3 (Minio) configurations
spark = SparkSession.builder \
.appName("KafkaToDeltaMinio") \
.config("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID")) \
.config("fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY")) \
.config("fs.s3a.endpoint", "https://minio.factory-plus.svc.cluster.local") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

# Read streaming data from Kafka
kafka_df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "acs-kafka.factory-plus.svc.cluster.local:9092") \
.option("subscribe", "mqtt_data") \
.load()

# Define the schema of your JSON data coming from Kafka to match the posted data structure
json_schema = StructType([
StructField("measurement", StringType(), True),
StructField("type", StringType(), True),
StructField("value", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("tags", StructType([
StructField("metricName", StringType(), True),
StructField("topLevelInstance", StringType(), True),
StructField("bottomLevelInstance", StringType(), True),
StructField("usesInstances", StringType(), True),
StructField("topLevelSchema", StringType(), True),
StructField("bottomLevelSchema", StringType(), True),
StructField("usesSchemas", StringType(), True),
StructField("enterprise", StringType(), True),
StructField("site", StringType(), True),
StructField("area", StringType(), True),
StructField("workCenter", StringType(), True),
StructField("workUnit", StringType(), True),
StructField("path", StringType(), True),
StructField("unit", StringType(), True)
]), True)
])

parsed_df = kafka_df.selectExpr("CAST(value AS STRING) as json_str") \
.select(from_json("json_str", json_schema).alias("data")) \
.select("data.*")

# Write the stream to a Delta table stored in Minio (S3a)
query = parsed_df.writeStream \
.format("delta") \
.option("checkpointLocation", "s3a://deltalake/delta-checkpoint/") \
.outputMode("append") \
.start("s3a://deltalake/bronze")

query.awaitTermination()

if __name__ == "__main__":
main()
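
As a quick sanity check of the pipeline above, a small batch job can read the bronze Delta table back out of MinIO and inspect a few rows. This is a minimal sketch, not part of the change set: the endpoint, bucket path and credential variables simply mirror the assumptions made in mqtt-ingester.py and may differ in a real deployment.

import os
from pyspark.sql import SparkSession

# Reuse the same MinIO/Delta settings assumed by the ingester above.
spark = (SparkSession.builder
    .appName("BronzeTableCheck")
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.endpoint", "https://minio.factory-plus.svc.cluster.local")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

# Read the table the streaming query writes to and show a handful of records,
# including a couple of the nested tag fields parsed from the Kafka JSON.
df = spark.read.format("delta").load("s3a://deltalake/bronze")
df.select("measurement", "value", "timestamp", "tags.path", "tags.unit") \
  .show(10, truncate=False)
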
20 changes: 20 additions & 0 deletions acs-spark/dockerfile
@@ -0,0 +1,20 @@
FROM bitnami/spark:3.5.3

USER root

WORKDIR /app
RUN pip install pyspark==3.5.3

# Delta Lake jars
ADD https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.3.0/delta-spark_2.12-3.3.0.jar /opt/bitnami/spark/jars/
ADD https://repo1.maven.org/maven2/io/delta/delta-standalone_2.12/3.3.0/delta-standalone_2.12-3.3.0.jar /opt/bitnami/spark/jars/
ADD https://repo1.maven.org/maven2/io/delta/delta-storage/3.3.0/delta-storage-3.3.0.jar /opt/bitnami/spark/jars/

# Kafka jars
ADD https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.3/spark-sql-kafka-0-10_2.12-3.5.3.jar /opt/bitnami/spark/jars/
ADD https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/4.0.0/kafka-clients-4.0.0.jar /opt/bitnami/spark/jars/
ADD https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.5.3/spark-streaming-kafka-0-10-assembly_2.12-3.5.3.jar /opt/bitnami/spark/jars/
ADD https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.5.3/spark-token-provider-kafka-0-10_2.12-3.5.3.jar /opt/bitnami/spark/jars/

# Apache jars
ADD https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.12.1/commons-pool2-2.12.1.jar /opt/bitnami/spark/jars/
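
A hypothetical smoke test, assuming an image built from this dockerfile, is to start a PySpark session inside it and touch both integrations the bundled jars provide; a missing or mismatched jar fails as soon as the corresponding data source is resolved. The broker address below is a placeholder and is never contacted at this stage.

# Hypothetical smoke test for the bundled jars (not part of this change set).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("JarSmokeTest")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())

# Delta: writing a tiny table exercises delta-spark and delta-storage.
spark.range(5).write.format("delta").mode("overwrite").save("/tmp/delta-smoke")

# Kafka: resolving the source raises "Failed to find data source: kafka"
# if spark-sql-kafka-0-10 and its dependencies are not on the classpath.
# No connection is made until a streaming query is actually started.
spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mqtt_data") \
    .load()

print("delta and kafka data sources resolved")
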
2 changes: 1 addition & 1 deletion deploy/.gitignore
@@ -6,4 +6,4 @@
*.swp
*.tgz
/.idea
/charts/*
/charts/*
16 changes: 11 additions & 5 deletions deploy/Chart.lock
@@ -6,16 +6,22 @@ dependencies:
repository: https://grafana.github.io/helm-charts
version: 6.52.4
- name: operator
repository: https://alexgodbehere.github.io/helm-repository
version: 5.0.4
repository: file://../patched-charts/operator/
version: 7.0.0
- name: tenant
repository: https://operator.min.io
version: 5.0.3
version: 7.0.0
- name: influxdb2
repository: https://helm.influxdata.com/
version: 2.1.1
- name: cert-manager
repository: https://charts.jetstack.io
version: v1.14.4
digest: sha256:5e6b7a20a96c54dba427cd3913ff2f8246e6a1125b82e2cafd6eb710b722d77d
generated: "2024-04-19T12:00:46.600094+01:00"
- name: spark-operator
repository: https://kubeflow.github.io/spark-operator
version: 2.1.0
- name: kafka
repository: oci://registry-1.docker.io/bitnamicharts
version: 31.5.0
digest: sha256:692e988329975b18480d5fd556385013081ba471e6b965cd813320c83f27082c
generated: "2025-03-21T13:09:57.074248Z"
19 changes: 14 additions & 5 deletions deploy/Chart.yaml
@@ -24,15 +24,15 @@ dependencies:
version: 6.52.4
repository: https://grafana.github.io/helm-charts
condition: grafana.enabled
# This pulls from my personal repository because the official helm chart doesn't have CRDs in the `crds` directory
# https://github.com/minio/operator/pull/1564
# This pulls from our patched operator chart because the official helm chart doesn't have CRDs in the `crds` directory
# https://github.com/minio/operator/pull/1564
- name: operator
repository: https://alexgodbehere.github.io/helm-repository
version: 5.0.4
repository: file://../patched-charts/operator/
version: 7.0.0
condition: minio.enabled
- name: tenant
repository: https://operator.min.io
version: 5.0.3
version: 7.0.0
condition: minio.enabled
- name: influxdb2
version: 2.1.1
@@ -42,3 +42,12 @@
repository: https://charts.jetstack.io
version: v1.14.4
condition: acs.letsEncrypt.enabled
- name: spark-operator
version: 2.1.0
repository: https://kubeflow.github.io/spark-operator
condition: spark-operator.enabled
- name: kafka
repository: oci://registry-1.docker.io/bitnamicharts
version: 31.5.0
condition: kafka.enabled

18 changes: 18 additions & 0 deletions deploy/templates/auth/principals/service-clients.yaml
@@ -168,3 +168,21 @@ spec:
- 03f5f08a-f61e-4134-8f66-b2951e3bbb69

{{- end }}
---
{{- if .Values.historians.unsKafka.enabled }}
apiVersion: factoryplus.app.amrc.co.uk/v1
kind: KerberosKey
metadata:
name: sv1historianunskafka
namespace: {{ .Release.Namespace }}
spec:
type: Random
principal: sv1historianunskafka@{{ .Values.identity.realm | required "values.identity.realm is required!" }}
secret: historian-uns-kafka-keytabs/client
account:
class: e463b4ae-a322-46cc-8976-4ba76838e908
name: Historian UNS (Kafka)
groups:
- 98b4c9cf-d283-4fe1-a33c-8bdf48fe54bc

{{- end }}
60 changes: 60 additions & 0 deletions deploy/templates/historians/uns-kafka.yaml
@@ -0,0 +1,60 @@
# In the future if we have any more ingesters then this should be
# enabled if any of the ingesters are enabled.
{{- if .Values.historians.unsKafka.enabled }}
apiVersion: apps/v1
kind: Deployment
metadata:
name: historian-uns-kafka
namespace: {{ .Release.Namespace }}
labels:
component: historian-uns-kafka
spec:
replicas: 1
selector:
matchLabels:
component: historian-uns-kafka
template:
metadata:
labels:
component: historian-uns-kafka
factory-plus.service: historian-uns-kafka
spec:
{{- with .Values.acs.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
volumes:
- name: krb5-conf
configMap:
name: krb5-conf
- name: keytabs
secret:
secretName: historian-uns-keytabs
containers:
- name: historian-uns-kafka
{{ include "amrc-connectivity-stack.image" (list . .Values.historians.unsKafka) | indent 10 }}
command: [ "/usr/bin/k5start", "-Uf", "/keytabs/client" ]
args: [ "node", "--es-module-specifier-resolution=node", "app.js" ]
env:
- name: KRB5_CONFIG
value: /config/krb5-conf/krb5.conf
- name: CLIENT_KEYTAB
value: /keytabs/client
- name: LOG_LEVEL
value: {{ .Values.historians.unsKafka.logLevel | required "values.historians.unsKafka.logLevel" }}
- name: BATCH_SIZE
value: {{ .Values.historians.unsKafka.batchSize | quote | required "values.historians.unsKafka.batchSize" }}
- name: FLUSH_INTERVAL
value: {{ .Values.historians.unsKafka.flushInterval | quote | required "values.historians.unsKafka.flushInterval" }}
- name: KAFKA_TOPIC
value: {{ (index .Values.kafka.provisioning.topics 0).name }}
- name: KAFKA_BROKER_URL
value: http://acs-kafka.{{ .Release.Namespace }}.svc.cluster.local
- name: DIRECTORY_URL
value: http://directory.{{ .Release.Namespace }}.svc.cluster.local
volumeMounts:
- mountPath: /config/krb5-conf
name: krb5-conf
- mountPath: /keytabs
name: keytabs
{{- end -}}
24 changes: 24 additions & 0 deletions deploy/templates/kafka/ingress.yaml
@@ -0,0 +1,24 @@
{{ if .Values.kafka.enabled }}
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
name: kafka-ingressroute
namespace: {{ .Release.Namespace }}
spec:
entryPoints:
- {{ .Values.acs.secure | ternary "websecure" "web" }}
routes:
- match: Host(`kafka.{{.Values.acs.baseUrl | required "values.acs.baseUrl is required"}}`)
kind: Rule
services:
- name: kafka
port: 9092
scheme: https
namespace: {{ .Release.Namespace }}
{{- if .Values.acs.secure }}
tls:
secretName: {{ coalesce .Values.minio.tlsSecretName .Values.acs.tlsSecretName }}
domains:
- main: kafka.{{.Values.acs.baseUrl | required "values.acs.baseUrl is required"}}
{{- end -}}
{{- end -}}
4 changes: 3 additions & 1 deletion deploy/templates/minio/ingress.yaml
@@ -1,4 +1,5 @@
{{ if .Values.minio.enabled }}
{{ if and .Values.tenant.exposeApi .Values.tenant.tenant.ingress .Values.tenant.tenant.ingress.api (not (default false .Values.tenant.tenant.ingress.api.enabled)) }}
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
@@ -21,8 +22,9 @@ spec:
domains:
- main: data.{{.Values.acs.baseUrl | required "values.acs.baseUrl is required"}}
{{- end -}}
{{- end -}}
---
{{if .Values.minio.exposeConsole}}
{{ if and .Values.tenant.exposeConsole .Values.tenant.tenant.ingress .Values.tenant.tenant.ingress.console (not (default false .Values.tenant.tenant.ingress.console.enabled)) }}
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
55 changes: 55 additions & 0 deletions deploy/templates/spark/kafka-ingester-sparkjob.yaml
@@ -0,0 +1,55 @@
{{ if and .Values.minio.enabled .Values.spark.enabled .Values.kafka.enabled }}
{{- if not (lookup "v1" "SparkApplication" .Release.Namespace "kafka-ingester") }}

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: kafka-ingester
namespace: {{ .Release.Namespace }}
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: ghcr.io/factory-plus/kafka-ingester:v4.0.0-dev1
imagePullPolicy: IfNotPresent
mainApplicationFile: local:///app/mqtt-ingester.py
sparkVersion: 3.5.3
restartPolicy:
type: Always
driver:
cores: 1
memory: 1024m
labels:
version: 3.5.3
serviceAccount: spark-operator
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: manager-minio-secret
key: CONSOLE_ACCESS_KEY
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: manager-minio-secret
key: CONSOLE_SECRET_KEY
executor:
cores: 1
instances: 1
memory: 1024m
labels:
version: 3.5.3
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: manager-minio-secret
key: CONSOLE_ACCESS_KEY
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: manager-minio-secret
key: CONSOLE_SECRET_KEY

{{- end }}
{{- end -}}