63 changes: 63 additions & 0 deletions api/python/test/canary/group_bys/gcp/purchases.py
@@ -16,8 +16,71 @@
time_column="ts") # The event time
))

view_source = Source(
events=EventSource(
table="data.purchases_native_view", # This points to the log table in the warehouse with historical purchase events, updated in batch daily
topic=None, # No streaming topic for this source; see the 'returns' GroupBy for an example with a streaming source, where this field holds the topic to listen to for realtime events
query=Query(
selects=selects("user_id","purchase_price"), # Select the fields we care about
time_column="ts") # The event time
))
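As a rough sketch of that streaming variant (the table and topic names below are hypothetical, and the actual 'returns' example in this repo may differ), a streaming-backed EventSource simply sets the topic field alongside the batch log table:

# Hypothetical illustration only: an EventSource with both a batch log table
# (for backfills) and a streaming topic (for realtime updates). It reuses the
# same Source/EventSource/Query/selects imports as the definitions above.
streaming_source = Source(
events=EventSource(
table="data.purchases_stream_log", # Batch log table used for backfills (made-up name)
topic="purchases_stream_topic", # Streaming topic listened to for realtime events (made-up name)
query=Query(
selects=selects("user_id", "purchase_price"),
time_column="ts")
))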

window_sizes = [Window(length=day, time_unit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below

v1_view_dev = GroupBy(
backfill_start_date="2023-11-01",
sources=[view_source],
keys=["user_id"], # We are aggregating by user
online=True,
aggregations=[Aggregation(
input_column="purchase_price",
operation=Operation.SUM,
windows=window_sizes
), # The sum of purchase prices in various windows
Aggregation(
input_column="purchase_price",
operation=Operation.COUNT,
windows=window_sizes
), # The count of purchases in various windows
Aggregation(
input_column="purchase_price",
operation=Operation.AVERAGE,
windows=window_sizes
), # The average purchase price per user in various windows
Aggregation(
input_column="purchase_price",
operation=Operation.LAST_K(10),
), # The last 10 purchase prices per user
],
)

v1_view_test = GroupBy(
backfill_start_date="2023-11-01",
sources=[view_source],
keys=["user_id"], # We are aggregating by user
online=True,
aggregations=[Aggregation(
input_column="purchase_price",
operation=Operation.SUM,
windows=window_sizes
), # The sum of purchase prices in various windows
Aggregation(
input_column="purchase_price",
operation=Operation.COUNT,
windows=window_sizes
), # The count of purchases in various windows
Aggregation(
input_column="purchase_price",
operation=Operation.AVERAGE,
windows=window_sizes
), # The average purchase price per user in various windows
Aggregation(
input_column="purchase_price",
operation=Operation.LAST_K(10),
), # The last 10 purchase prices per user
],
)
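The two GroupBys above (v1_view_dev and v1_view_test) define identical aggregations over the view-backed source. As a rough, illustrative sketch only — not how Chronon executes them, and using a made-up toy DataFrame — the per-user windowed features they describe look roughly like this in pandas:

import pandas as pd

# Toy purchases table with the same columns as the source above (values are made up).
purchases = pd.DataFrame({
"user_id": ["u1", "u1", "u2"],
"purchase_price": [10.0, 25.0, 7.5],
"ts": pd.to_datetime(["2023-11-02", "2023-11-20", "2023-11-15"]),
}).sort_values("ts").set_index("ts")

for days in [3, 14, 30]:
rolled = purchases.groupby("user_id")["purchase_price"].rolling(f"{days}D")
print(f"--- {days}-day window ---")
print(rolled.sum()) # SUM of purchase prices per user in the trailing window
print(rolled.count()) # COUNT of purchases per user in the trailing window
print(rolled.mean()) # AVERAGE purchase price per user in the trailing window

# LAST_K(10): the ten most recent purchase prices per user (frame is sorted by ts).
print(purchases.groupby("user_id")["purchase_price"].apply(lambda s: s.tail(10).tolist()))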

v1_dev = GroupBy(
backfill_start_date="2023-11-01",
sources=[source],
@@ -34,7 +34,7 @@ case object BigQueryNative extends Format {
|
|""".stripMargin

val partColName = sparkSession.read
val pColOption = sparkSession.read
.format(bqFormat)
.option("project", providedProject)
// See: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/434#issuecomment-886156191
@@ -45,37 +45,47 @@
.as[String]
.collect
.headOption
.getOrElse(
throw new UnsupportedOperationException(s"No partition column for table ${tableName} found.")
) // TODO: support unpartitioned tables (uncommon case).

// Next, we query the BQ table using the requested partitionFilter to grab all the distinct partition values that match the filter.
val partitionWheres = if (partitionFilters.nonEmpty) s"WHERE ${partitionFilters}" else partitionFilters
val partitionFormat = TableUtils(sparkSession).partitionFormat
val select =
s"SELECT distinct(${partColName}) AS ${internalBQPartitionCol} FROM ${bqFriendlyName} ${partitionWheres}"
val selectedParts = sparkSession.read
.format(bqFormat)
.option("viewsEnabled", true)
.option("materializationDataset", bqTableId.getDataset)
.load(select)
.select(date_format(col(internalBQPartitionCol), partitionFormat))
.as[String]
.collect
.toList
logger.info(s"Part values: ${selectedParts}")

// Finally, we query the BQ table for each of the selected partition values and union them together.
selectedParts
.map((partValue) => {
val pFilter = f"${partColName} = '${partValue}'"
pColOption match {
case Some(partColName) => {
// Next, we query the BQ table using the requested partitionFilter to grab all the distinct partition values that match the filter.
val partitionFormat = TableUtils(sparkSession).partitionFormat
val select =
s"SELECT distinct(${partColName}) AS ${internalBQPartitionCol} FROM ${bqFriendlyName} ${partitionWheres}"
logger.info(s"Listing in scope BQ native table partitions: ${select}")
val selectedParts = sparkSession.read
.format(bqFormat)
.option("viewsEnabled", true)
.option("materializationDataset", bqTableId.getDataset)
.load(select)
.select(date_format(col(internalBQPartitionCol), partitionFormat))
.as[String]
.collect
.toList
logger.info(s"Part values: ${selectedParts}")

// Finally, we query the BQ table for each of the selected partition values and union them together.
selectedParts
.map((partValue) => {
val pFilter = f"${partColName} = '${partValue}'"
sparkSession.read
.format(bqFormat)
.option("filter", pFilter)
.load(bqFriendlyName)
.withColumn(partColName, lit(partValue))
}) // todo: make it nullable
.reduce(_ unionByName _)
}
case None =>
val select = s"SELECT * FROM ${bqFriendlyName} ${partitionWheres}"
logger.info(s"BQ Query: ${select}")
sparkSession.read
.option("viewsEnabled", true)
.option("materializationDataset", bqTableId.getDataset)
.format(bqFormat)
.load(select)
}
}

override def primaryPartitions(tableName: String,
19 changes: 19 additions & 0 deletions scripts/distribution/run_gcp_quickstart.sh
@@ -69,9 +69,11 @@ set -xo pipefail
# Delete gcp tables to start from scratch
if [[ "$ENVIRONMENT" == "canary" ]]; then
bq rm -f -t canary-443022:data.gcp_purchases_v1_test
bq rm -f -t canary-443022:data.gcp_purchases_v1_view_test
bq rm -f -t canary-443022:data.gcp_purchases_v1_test_upload
else
bq rm -f -t canary-443022:data.gcp_purchases_v1_dev
bq rm -f -t canary-443022:data.gcp_purchases_v1_view_dev
bq rm -f -t canary-443022:data.gcp_purchases_v1_dev_upload
fi
#TODO: delete bigtable rows
@@ -132,6 +134,15 @@ fi

fail_if_bash_failed $?

echo -e "${GREEN}<<<<<.....................................BACKFILL-VIEW.....................................>>>>>\033[0m"
if [[ "$ENVIRONMENT" == "canary" ]]; then
zipline run --repo=$CHRONON_ROOT --version $VERSION --mode backfill --conf compiled/group_bys/gcp/purchases.v1_view_test
else
zipline run --repo=$CHRONON_ROOT --version $VERSION --mode backfill --conf compiled/group_bys/gcp/purchases.v1_view_dev
fi

fail_if_bash_failed $?

echo -e "${GREEN}<<<<<.....................................CHECK-PARTITIONS.....................................>>>>>\033[0m"
EXPECTED_PARTITION="2023-11-30"
if [[ "$ENVIRONMENT" == "canary" ]]; then
@@ -149,6 +160,14 @@ else
fi
fail_if_bash_failed

echo -e "${GREEN}<<<<<.....................................GROUP-BY-UPLOAD.....................................>>>>>\033[0m"
if [[ "$ENVIRONMENT" == "canary" ]]; then
zipline run --repo=$CHRONON_ROOT --version $VERSION --mode upload --conf compiled/group_bys/gcp/purchases.v1_view_test --ds 2023-12-01
else
zipline run --repo=$CHRONON_ROOT --version $VERSION --mode upload --conf compiled/group_bys/gcp/purchases.v1_view_dev --ds 2023-12-01
fi
fail_if_bash_failed

# Need to wait for upload to finish
echo -e "${GREEN}<<<<<.....................................UPLOAD-TO-KV.....................................>>>>>\033[0m"
if [[ "$ENVIRONMENT" == "canary" ]]; then