From 1d44c1b83c000a6ddcb63373353b22612957d681 Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Mon, 6 Jan 2025 11:32:01 +0530
Subject: [PATCH 1/6] Add verb to bulk load GBU data to KV store

---
 .../main/scala/ai/chronon/spark/Driver.scala | 33 +++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala
index 539ce9e192..0adfb40349 100644
--- a/spark/src/main/scala/ai/chronon/spark/Driver.scala
+++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -714,6 +714,35 @@ object Driver {
     }
   }
 
+  object GroupByUploadToKVBulkLoad {
+    @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
+    class Args extends Subcommand("groupby-upload-bulk-load") with OnlineSubcommand {
+      val srcOfflineTable: ScallopOption[String] =
+        opt[String](required = true, descr = "Name of the source GroupBy Upload table")
+
+      val groupbyName: ScallopOption[String] =
+        opt[String](required = true, descr = "Name of the GroupBy that we're triggering this upload for")
+
+      val partitionString: ScallopOption[String] =
+        opt[String](required = true, descr = s"Partition string (in 'yyyy-MM-dd' format) that we are uploading")
+    }
+
+    def run(args: Args): Unit = {
+      logger.info(s"Triggering bulk load for GroupBy: ${args.groupbyName()} for partition: ${args.partitionString()} from table: ${args.srcOfflineTable()}")
+      val kvStore = args.api.genKvStore
+      val startTime = System.currentTimeMillis()
+      try {
+        kvStore.bulkPut(args.srcOfflineTable(), args.groupbyName(), args.partitionString())
+      } catch {
+        case e: Exception =>
+          logger.error(s"Failed to upload GroupBy: ${args.groupbyName()} for partition: ${args.partitionString()} from table: ${args.srcOfflineTable()}", e)
+          throw e
+      }
+      logger.info(
+        s"Uploaded GroupByUpload data to KV store for GroupBy: ${args.groupbyName()}; partition: ${args.partitionString()} in ${(System.currentTimeMillis() - startTime)/1000} seconds")
+    }
+  }
+
   object LogFlattener {
     class Args extends Subcommand("log-flattener") with OfflineSubcommand {
       val logTable: ScallopOption[String] =
@@ -913,6 +942,8 @@ object Driver {
     addSubcommand(FetcherCliArgs)
     object MetadataUploaderArgs extends MetadataUploader.Args
     addSubcommand(MetadataUploaderArgs)
+    object GroupByUploadToKVBulkLoadArgs extends GroupByUploadToKVBulkLoad.Args
+    addSubcommand(GroupByUploadToKVBulkLoadArgs)
     object GroupByStreamingArgs extends GroupByStreaming.Args
     addSubcommand(GroupByStreamingArgs)
     object AnalyzerArgs extends Analyzer.Args
@@ -959,6 +990,8 @@ object Driver {
           GroupByStreaming.run(args.GroupByStreamingArgs)
 
         case args.MetadataUploaderArgs => MetadataUploader.run(args.MetadataUploaderArgs)
+        case args.GroupByUploadToKVBulkLoadArgs =>
+          GroupByUploadToKVBulkLoad.run(args.GroupByUploadToKVBulkLoadArgs)
         case args.FetcherCliArgs => FetcherCli.run(args.FetcherCliArgs)
         case args.LogFlattenerArgs => LogFlattener.run(args.LogFlattenerArgs)
         case args.ConsistencyMetricsArgs => ConsistencyMetricsCompute.run(args.ConsistencyMetricsArgs)
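
Usage note (illustrative, not part of the patch): the new verb simply dispatches to `kvStore.bulkPut(srcOfflineTable, groupbyName, partitionString)` behind a Scallop subcommand. A minimal sketch of exercising it through the Driver's main entrypoint follows; the flag values are placeholders, and `--online-jar` / `--online-class` are the OnlineSubcommand options shown later in this series, so treat the exact invocation as an assumption rather than documented interface.

    // Hypothetical invocation of the new subcommand. Flag names follow the Args
    // class above (Scallop kebab-cases the option vals); all values are placeholders.
    import ai.chronon.spark.Driver

    Driver.main(
      Array(
        "groupby-upload-bulk-load",
        "--online-jar=/path/to/online-impl-assembly.jar",
        "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
        "--src-offline-table=db.my_groupby_upload",
        "--groupby-name=my_team.my_groupby.v1",
        "--partition-string=2024-01-01"
      ))
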
From 712f86f48595f4a1dd61cbd0e44a7d0af670061f Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Mon, 6 Jan 2025 11:32:44 +0530
Subject: [PATCH 2/6] style: Apply scalafix and scalafmt changes

---
 .../src/main/scala/ai/chronon/spark/Driver.scala | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala
index 0adfb40349..baec658200 100644
--- a/spark/src/main/scala/ai/chronon/spark/Driver.scala
+++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -724,22 +724,25 @@ object Driver {
         opt[String](required = true, descr = "Name of the GroupBy that we're triggering this upload for")
 
       val partitionString: ScallopOption[String] =
-        opt[String](required = true, descr = s"Partition string (in 'yyyy-MM-dd' format) that we are uploading")
+        opt[String](required = true, descr = "Partition string (in 'yyyy-MM-dd' format) that we are uploading")
     }
 
     def run(args: Args): Unit = {
-      logger.info(s"Triggering bulk load for GroupBy: ${args.groupbyName()} for partition: ${args.partitionString()} from table: ${args.srcOfflineTable()}")
+      logger.info(s"Triggering bulk load for GroupBy: ${args.groupbyName()} for partition: ${args
+        .partitionString()} from table: ${args.srcOfflineTable()}")
       val kvStore = args.api.genKvStore
       val startTime = System.currentTimeMillis()
       try {
         kvStore.bulkPut(args.srcOfflineTable(), args.groupbyName(), args.partitionString())
       } catch {
         case e: Exception =>
-          logger.error(s"Failed to upload GroupBy: ${args.groupbyName()} for partition: ${args.partitionString()} from table: ${args.srcOfflineTable()}", e)
+          logger.error(s"Failed to upload GroupBy: ${args.groupbyName()} for partition: ${args
+            .partitionString()} from table: ${args.srcOfflineTable()}",
+                       e)
           throw e
       }
-      logger.info(
-        s"Uploaded GroupByUpload data to KV store for GroupBy: ${args.groupbyName()}; partition: ${args.partitionString()} in ${(System.currentTimeMillis() - startTime)/1000} seconds")
+      logger.info(s"Uploaded GroupByUpload data to KV store for GroupBy: ${args.groupbyName()}; partition: ${args
+        .partitionString()} in ${(System.currentTimeMillis() - startTime) / 1000} seconds")
     }
   }
 
@@ -989,7 +992,7 @@ object Driver {
           shouldExit = false
           GroupByStreaming.run(args.GroupByStreamingArgs)
 
-        case args.MetadataUploaderArgs => MetadataUploader.run(args.MetadataUploaderArgs)
+        case args.MetadataUploaderArgs          => MetadataUploader.run(args.MetadataUploaderArgs)
         case args.GroupByUploadToKVBulkLoadArgs =>
           GroupByUploadToKVBulkLoad.run(args.GroupByUploadToKVBulkLoadArgs)
         case args.FetcherCliArgs => FetcherCli.run(args.FetcherCliArgs)

From aa71445491f3b5dc3792854c4396edf69ececc2d Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Tue, 7 Jan 2025 11:04:31 +0530
Subject: [PATCH 3/6] Fix query to not add col family for value

---
 .../integrations/cloud_gcp/BigTableKVStoreImpl.scala | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
index 8227578ecf..c0c8dad9b4 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
@@ -304,7 +304,6 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
       s"""
          |EXPORT DATA OPTIONS (
          |  format='CLOUD_BIGTABLE',
-         |  auto_create_column_families=true,
          |  overwrite=true,
          |  uri="https://bigtable.googleapis.com/projects/${adminClient.getProjectId}/instances/${adminClient.getInstanceId}/appProfiles/GROUPBY_INGEST/tables/$batchTable",
          |  bigtable_options='''{
          |    "columnFamilies": [
          |      {
          |        "familyId": "cf",
          |        "encoding": "BINARY",
          |        "columns": [
-         |          {"qualifierString": "value", "fieldName": "value_bytes"}
-         |
+         |          {"qualifierString": "value", "fieldName": ""}
          |        ]
          |      }
          |    ]
@@ -322,7 +320,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
          |) AS
          |SELECT
          |  CONCAT(CAST(CONCAT('$datasetName', '#') AS BYTES), key_bytes) as rowkey,
-         |  value_bytes as value,
+         |  value_bytes as cf,
          |  TIMESTAMP_MILLIS($endDsPlusOne) as _CHANGE_TIMESTAMP
          |FROM $sourceOfflineTable
          |WHERE ds = '$partition'
@@ -334,8 +332,9 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
       .newBuilder(exportQuery)
       .build()
 
-    val jobId = JobId.of(adminClient.getProjectId, s"export_${sourceOfflineTable.sanitize}_to_bigtable_$partition")
     val startTs = System.currentTimeMillis()
+    // we append the timestamp to the jobID as BigQuery doesn't allow us to re-run the same job
+    val jobId = JobId.of(adminClient.getProjectId, s"export_${sourceOfflineTable.sanitize}_to_bigtable_${partition}_$startTs")
     val job: Job = bigQueryClient.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build())
     logger.info(s"Export job started with Id: $jobId and link: ${job.getSelfLink}")
     val retryConfig =

From 078d37827e61d2e47737d707c6a880f76fe5f65a Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Tue, 7 Jan 2025 11:05:19 +0530
Subject: [PATCH 4/6] style: Apply scalafix and scalafmt changes

---
 .../chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
index c0c8dad9b4..7438830387 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala
@@ -334,7 +334,8 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
 
     val startTs = System.currentTimeMillis()
     // we append the timestamp to the jobID as BigQuery doesn't allow us to re-run the same job
-    val jobId = JobId.of(adminClient.getProjectId, s"export_${sourceOfflineTable.sanitize}_to_bigtable_${partition}_$startTs")
+    val jobId =
+      JobId.of(adminClient.getProjectId, s"export_${sourceOfflineTable.sanitize}_to_bigtable_${partition}_$startTs")
     val job: Job = bigQueryClient.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build())
     logger.info(s"Export job started with Id: $jobId and link: ${job.getSelfLink}")
     val retryConfig =
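
Side note on the export layout (illustrative, not part of the patch): after PATCH 3 the EXPORT DATA query writes each value into column family `cf` under qualifier `value`, keyed by the dataset name, a `#` separator, and the raw key bytes. A small sketch of that row-key construction; the names `datasetName`, `keyBytes`, and the example dataset string are assumptions, not values taken from the codebase.

    // Row-key layout produced by the export query: "<datasetName>#" ++ key_bytes.
    import java.nio.charset.StandardCharsets

    def bigtableRowKey(datasetName: String, keyBytes: Array[Byte]): Array[Byte] =
      (datasetName + "#").getBytes(StandardCharsets.UTF_8) ++ keyBytes

    // e.g. the row a point read would target for an assumed batch dataset and an encoded entity key
    val rowKey = bigtableRowKey("MY_GROUPBY_BATCH", "some-entity-key".getBytes(StandardCharsets.UTF_8))
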
From a7a8bf32f32fd247df3d8b7eced6c6d56bd7a7b5 Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Wed, 8 Jan 2025 12:43:18 +0530
Subject: [PATCH 5/6] Add support to configure api impl using conf + submitter test

---
 .../integrations/cloud_gcp/GcpApiImpl.scala        |  9 +++++++--
 .../cloud_gcp/DataprocSubmitterTest.scala          | 17 +++++++++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala
index e5aab3d751..ac434dc2e4 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala
@@ -21,9 +21,14 @@ class GcpApiImpl(conf: Map[String, String]) extends Api(conf) {
 
   override def genKvStore: KVStore = {
     val projectId = sys.env
-      .getOrElse("GCP_PROJECT_ID", throw new IllegalArgumentException("GCP_PROJECT_ID environment variable not set"))
+      .get("GCP_PROJECT_ID")
+      .orElse(conf.get("GCP_PROJECT_ID"))
+      .getOrElse(throw new IllegalArgumentException("GCP_PROJECT_ID environment variable not set"))
+
     val instanceId = sys.env
-      .getOrElse("GCP_INSTANCE_ID", throw new IllegalArgumentException("GCP_INSTANCE_ID environment variable not set"))
+      .get("GCP_INSTANCE_ID")
+      .orElse(conf.get("GCP_INSTANCE_ID"))
+      .getOrElse(throw new IllegalArgumentException("GCP_INSTANCE_ID environment variable not set"))
 
     // Create settings builder based on whether we're in emulator mode (e.g. docker) or not
     val (dataSettingsBuilder, adminSettingsBuilder, maybeBQClient) = sys.env.get("BIGTABLE_EMULATOR_HOST") match {
diff --git a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
index 490d3c425e..6451f44f3e 100644
--- a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
+++ b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
@@ -58,4 +58,21 @@ class DataprocSubmitterTest extends AnyFunSuite with MockitoSugar {
     println(submittedJobId)
   }
 
+  ignore("Used to test GBU bulk load locally. Do not enable this in CI/CD!") {
+
+    val submitter = DataprocSubmitter()
+    val submittedJobId =
+      submitter.submit(List.empty,
+                       "groupby-upload-bulk-load",
+                       "-ZGCP_PROJECT_ID=canary-443022",
+                       "-ZGCP_INSTANCE_ID=zipline-canary-instance",
+                       "--online-jar=cloud_gcp-assembly-0.1.0-SNAPSHOT.jar",
+                       "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
+                       "--src-offline-table=data.test_gbu",
+                       "--groupby-name=quickstart.purchases.v1",
+                       "--partition-string=2024-01-01")
+    println(submittedJobId)
+    assertEquals(submittedJobId, "mock-job-id")
+  }
+
 }

From 79e703f7c5817439372263be82827f5fa5ad7d4b Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Wed, 8 Jan 2025 12:49:53 +0530
Subject: [PATCH 6/6] Use mocks

---
 .../integrations/cloud_gcp/DataprocSubmitterTest.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
index 6451f44f3e..ec1edd9fec 100644
--- a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
+++ b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
@@ -64,8 +64,8 @@ class DataprocSubmitterTest extends AnyFunSuite with MockitoSugar {
     val submittedJobId =
       submitter.submit(List.empty,
                        "groupby-upload-bulk-load",
-                       "-ZGCP_PROJECT_ID=canary-443022",
-                       "-ZGCP_INSTANCE_ID=zipline-canary-instance",
+                       "-ZGCP_PROJECT_ID=bigtable-project-id",
+                       "-ZGCP_INSTANCE_ID=bigtable-instance-id",
                        "--online-jar=cloud_gcp-assembly-0.1.0-SNAPSHOT.jar",
                        "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
                        "--src-offline-table=data.test_gbu",
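
Closing note (illustrative, not part of the series): the `-Z` properties in the test above are the online API props, and PATCH 5 makes `GcpApiImpl` fall back to them when the `GCP_PROJECT_ID` / `GCP_INSTANCE_ID` environment variables are absent. A minimal sketch of that configuration path, assuming the `-Z` props end up in the `conf` map passed to the Api constructor; the values are the placeholders from PATCH 6.

    // Sketch only: constructing the API impl directly with the conf fallback.
    import ai.chronon.integrations.cloud_gcp.GcpApiImpl

    val conf = Map(
      "GCP_PROJECT_ID" -> "bigtable-project-id",
      "GCP_INSTANCE_ID" -> "bigtable-instance-id"
    )
    val api = new GcpApiImpl(conf) // env vars, when set, still take precedence over conf
    val kvStore = api.genKvStore   // the store the groupby-upload-bulk-load verb calls bulkPut on
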