Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
nllarson committed Aug 30, 2024
1 parent c2594a0 commit fada7f5
Show file tree
Hide file tree
Showing 22 changed files with 280 additions and 4,328 deletions.
4,017 changes: 0 additions & 4,017 deletions assets/demo-env.excalidraw

This file was deleted.

Binary file removed assets/demo-env.png
Binary file not shown.
76 changes: 0 additions & 76 deletions assets/flink-notes.md

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
INSERT INTO `demo.customers.golden`
SELECT
CAST(idm.id AS BYTES) AS key,

IFNULL (sa.first_name, SPLIT_INDEX(sb.name, ' ', 0)) AS first_name,
IFNULL (sa.last_name, SPLIT_INDEX(sb.name, ' ', 1)) AS last_name,
sb.birthday AS date_of_birth,
IFNULL (sa.email, sb.email_address) AS email,
IFNULL (sa.phone, sb.contact_number) AS phone,
sa.address AS address,

CASE
WHEN sa.loyalty_tier = 'diamond' THEN sa.loyalty_tier
WHEN sb.membership_level = 'gold' THEN 'diamond'
WHEN sa.loyalty_tier = 'sapphire' THEN sa.loyalty_tier
ELSE 'emerald'
END AS loyalty_tier,

IFNULL(sa.loyalty_points, 0) + (IFNULL(sb.membership_points, 0) * 5) AS loyalty_points,
sb.favorite_drink AS favorite_drink,
CAST(sa.key AS STRING) AS system_a_customer_id,
CAST(sb.key AS STRING) AS system_b_customer_id,
IFNULL(sa.last_modified_date, sb.last_modified_date) AS last_modified_date

FROM `demo.customers.id-mapping` AS idm
JOIN `demo.customers.system-a` AS sa ON idm.aCustomerID = CAST(sa.key AS STRING)
JOIN `demo.customers.system-b` AS sb ON idm.bCustomerID = CAST(sb.key AS STRING)

This file was deleted.

This file was deleted.

36 changes: 36 additions & 0 deletions template/kafka-environment/flink.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
resource "confluent_flink_statement" "random_int_table" {
organization {
id = data.confluent_organization.this.id
}
environment {
id = module.staging_environment.id
}
compute_pool {
id = module.customer_cluster.flink_compute_pool_id
}
principal {
id = module.customer_cluster.developer_write_service_account.id
}
statement = file("./flink-statements/create-combined-customer-record.sql")
statement_name = "create-customer-golden-record"

properties = {
# SET TABLE 'staging';
"sql.current-catalog" = module.staging_environment.confluent_environment.display_name
# SET DATABASE 'inventory';
"sql.current-database" = module.customer_cluster.confluent_kafka_cluster.display_name
}

rest_endpoint = module.customer_cluster.flink_rest_endpoint
credentials {
key = module.customer_cluster.flink_admin_api_key.id
secret = module.customer_cluster.flink_admin_api_key.secret
}

depends_on = [
module.customers_a_topic,
module.customers_b_topic,
module.customers_mapping_topic,
module.customers_golden_topic
]
}
8 changes: 3 additions & 5 deletions template/kafka-environment/schemas/avro/customer-a.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
"name": "CustomerA",
"namespace": "com.improving.customer",
"fields": [
{ "name": "store_id", "type": "string" },
{ "name": "customer_id", "type": "string" },
{ "name": "first_name", "type": "string" },
{ "name": "last_name", "type": "string" },
{ "name": "email", "type": "string" },
{ "name": "phone", "type": "string" },
{ "name": "address", "type": "string" },
{ "name": "loyalty_points", "type": "int" },
{ "name": "preferred_store", "type": "string" },
{ "name": "last_modified_date", "type": "string" } // ISO 8601 date format
{ "name": "loyalty_tier", "type": "string" },
{ "name": "loyalty_points", "type": "int" },
{ "name": "last_modified_date", "type": "string" }
]
}
9 changes: 4 additions & 5 deletions template/kafka-environment/schemas/avro/customer-b.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
"name": "CustomerB",
"namespace": "com.improving.customer",
"fields": [
{ "name": "store_id", "type": "string" },
{ "name": "customer_id", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "email_address", "type": "string" },
{ "name": "contact_number", "type": "string" },
{ "name": "location", "type": "string" },
{ "name": "favorite_drink", "type": "string" },
{ "name": "last_purchase", "type": "string" },
{ "name": "last_modified_date", "type": "string" } // ISO 8601 date format
{ "name": "birthday", "type": "string" },
{ "name": "membership_level", "type": "string" },
{ "name": "membership_points", "type": "int" },
{ "name": "last_modified_date", "type": "string" }
]
}
30 changes: 7 additions & 23 deletions template/kafka-environment/schemas/avro/customer-golden.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,17 @@
"name": "Customer",
"namespace": "com.improving.model",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "first_name", "type": "string" },
{ "name": "last_name", "type": "string" },
{ "name": "date_of_birth", "type": "string" },
{ "name": "email", "type": "string" },
{ "name": "phone", "type": "string" },
{ "name": "address", "type": "string" },
{ "name": "last_modified_date", "type": "string" }, // ISO 8601 date format
{
"name": "customer_info",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "CustomerInfo",
"fields": [
{ "name": "store_id", "type": "string" },
{ "name": "customer_id", "type": "string" },
{ "name": "store_name", "type": "string" },
{ "name": "store_type", "type": "string" },
{ "name": "loyalty_points", "type": ["null", "int"], "default": null },
{ "name": "preferred_store", "type": ["null", "string"], "default": null },
{ "name": "favorite_drink", "type": ["null", "string"], "default": null },
{ "name": "last_purchase", "type": ["null", "string"], "default": null }
]
}
}
}
{ "name": "address", "type": "string" },
{ "name": "loyalty_tier", "type": "string" },
{ "name": "loyalty_points", "type": "int" },
{ "name": "favorite_drink", "type": "string" },
{ "name": "system_a_customer_id", "type": "string" },
{ "name": "system_bcustomer_id", "type": "string" },
{ "name": "last_modified_date", "type": "string" }
]
}
26 changes: 12 additions & 14 deletions template/kafka-environment/schemas/avro/customer-mapping.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@
"name": "IdMapping",
"namespace": "com.improving.customer",
"fields": [
{ "name": "id", "type": "string" },
{
"name": "id",
"type": "string"
},
{
"name": "linked_customers",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "LinkedCustomer",
"fields": [
{ "name": "store_id", "type": "string" },
{ "name": "customer_id", "type": "string" }
]
}
}
}
"name": "aCustomerID",
"type": "string"
},
{
"name": "bCustomerID",
"type": "string"
},
{ "name": "last_modified_date", "type": "string" }
]
}
28 changes: 4 additions & 24 deletions template/kafka-environment/topics.tf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module "customers_a_topic" {
source = "../modules/product_team_avro_topic"

topic_name = "shadowtraffic.customers.system-a"
topic_name = "demo.customers.system-a"
value_schema_file = file("./schemas/avro/customer-a.avsc")

kafka_id = module.customer_cluster.confluent_kafka_cluster.id
Expand All @@ -21,7 +21,7 @@ module "customers_a_topic" {
module "customers_b_topic" {
source = "../modules/product_team_avro_topic"

topic_name = "shadowtraffic.customers.system-b"
topic_name = "demo.customers.system-b"
value_schema_file = file("./schemas/avro/customer-b.avsc")

kafka_id = module.customer_cluster.confluent_kafka_cluster.id
Expand All @@ -40,7 +40,7 @@ module "customers_b_topic" {
module "customers_mapping_topic" {
source = "../modules/product_team_avro_topic"

topic_name = "shadowtraffic.customers.id-mapping"
topic_name = "demo.customers.id-mapping"
value_schema_file = file("./schemas/avro/customer-mapping.avsc")

kafka_id = module.customer_cluster.confluent_kafka_cluster.id
Expand All @@ -59,7 +59,7 @@ module "customers_mapping_topic" {
module "customers_golden_topic" {
source = "../modules/product_team_avro_topic"

topic_name = "shadowtraffic.customers.golden"
topic_name = "demo.customers.golden"
value_schema_file = file("./schemas/avro/customer-golden.avsc")

kafka_id = module.customer_cluster.confluent_kafka_cluster.id
Expand All @@ -74,23 +74,3 @@ module "customers_golden_topic" {

depends_on = [ module.customer_cluster ]
}

# resource "confluent_flink_statement" "customer_order_join" {
# statement = file("./flink-statements/customer-mapping-join.sql")

# statement_name = "customer-order-item-customer-join"

# properties = local.flink_default_properties

# depends_on = [
# # source
# module.customers_a_topic,
# module.customers_b_topic,
# module.customers_mapping_topic,

# # sink
# module.customers_golden_topic
# ]

# provider = confluent.customer_cluster
# }
8 changes: 0 additions & 8 deletions template/shadowtraffic/customers/config/config-avro.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@
],
"schedule": {
"stages": [
{
"_gen": "loadJsonFile",
"file": "/config/stages/seed-customers.json"
},
{
"_gen": "loadJsonFile",
"file": "/config/stages/seed-id-mappings.json"
},
{
"_gen": "loadJsonFile",
"file": "/config/stages/live.json"
Expand Down
Loading

0 comments on commit fada7f5

Please sign in to comment.