From 27214149d3423fd2b9e9d410bd758dcb97024f9c Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Date: Wed, 24 Jan 2024 14:05:15 -0500
Subject: [PATCH] Support dynamic destinations with Python Storage API (#30045)

* support dynamic destinations and add tests

* put all relevant logic in StorageWriteToBigQuery
---
 ..._PostCommit_Python_Xlang_Gcp_Dataflow.json |   0
 ...am_PostCommit_Python_Xlang_Gcp_Direct.json |   1 +
 CHANGES.md                                    |   1 +
 ...torageWriteApiSchemaTransformProvider.java |  59 +++-
 ...geWriteApiSchemaTransformProviderTest.java |  54 +++-
 .../io/external/xlang_bigqueryio_it_test.py   |  77 ++++-
 sdks/python/apache_beam/io/gcp/bigquery.py    | 293 ++++++++++--------
 7 files changed, 334 insertions(+), 151 deletions(-)
 create mode 100644 .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
 create mode 100644 .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
new file mode 100644
index 000000000000..8b137891791f
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -0,0 +1 @@
+
diff --git a/CHANGES.md b/CHANGES.md
index dbad15f3dba4..d4b9b7d77e07 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,6 +63,7 @@
 ## I/Os
 
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Added support for writing to BigQuery dynamic destinations with Python's Storage Write API ([#30045](https://github.com/apache/beam/pull/30045))
 * Adding support for Tuples DataType in ClickHouse (Java) ([#29715](https://github.com/apache/beam/pull/29715)).
 * Added support for handling bad records to FileIO, TextIO, AvroIO ([#29670](https://github.com/apache/beam/pull/29670)).
 * Added support for handling bad records to BigtableIO ([#29885](https://github.com/apache/beam/pull/29885)).
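The changelog entry above is exercised end to end by the new `test_write_to_dynamic_destinations` case in `xlang_bigqueryio_it_test.py` further down in this patch: the user passes a callable as `table` to the existing `WriteToBigQuery` entry point and selects the Storage Write API method. A minimal usage sketch in the same spirit follows; the project, dataset, table prefix, element payloads, and schema are placeholders, and omitting `expansion_service` falls back to the bundled GCP expansion service jar.

```python
import apache_beam as beam

# Placeholder data and schema; the callable passed as `table` routes each
# element to its own table, which the Storage Write API sink now supports.
elements = [{'name': 'a', 'number': 1}, {'name': 'b', 'number': 2}]
schema = 'name:STRING,number:INT64'

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create(elements)
      | beam.io.WriteToBigQuery(
          # e.g. my_project:my_dataset.dynamic_dest_1, dynamic_dest_2, ...
          table=lambda record: 'my_project:my_dataset.dynamic_dest_%d' %
          record['number'],
          schema=schema,
          method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
          use_at_least_once=False))
```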
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 52ce97294aa1..d0951cdad1a6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; +import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.util.Arrays; @@ -35,6 +36,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; +import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration; import org.apache.beam.sdk.metrics.Counter; @@ -56,6 +59,7 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -81,6 +85,8 @@ public class BigQueryStorageWriteApiSchemaTransformProvider private static final String INPUT_ROWS_TAG = "input"; private static final String FAILED_ROWS_TAG = "FailedRows"; private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors"; + // magic string that tells us to write to dynamic destinations + protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"; @Override protected Class configurationClass() { @@ -161,7 +167,11 @@ public void validate() { checkArgument( !Strings.isNullOrEmpty(this.getTable()), invalidConfigMessage + "Table spec for a BigQuery Write must be specified."); - checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable())); + + // if we have an input table spec, validate it + if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) { + checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable())); + } // validate create and write dispositions if (!Strings.isNullOrEmpty(this.getCreateDisposition())) { @@ -337,13 +347,36 @@ private static class NoOutputDoFn extends DoFn { public void process(ProcessContext c) {} } + private static class RowDynamicDestinations extends DynamicDestinations { + Schema schema; + + RowDynamicDestinations(Schema schema) { + this.schema = schema; + } + + @Override + public String getDestination(ValueInSingleWindow element) { + return element.getValue().getString("destination"); + } + + @Override + public 
TableDestination getTable(String destination) { + return new TableDestination(destination, null); + } + + @Override + public TableSchema getSchema(String destination) { + return BigQueryUtils.toTableSchema(schema); + } + } + @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { // Check that the input exists checkArgument(input.has(INPUT_ROWS_TAG), "Missing expected input tag: %s", INPUT_ROWS_TAG); PCollection inputRows = input.get(INPUT_ROWS_TAG); - BigQueryIO.Write write = createStorageWriteApiTransform(); + BigQueryIO.Write write = createStorageWriteApiTransform(inputRows.getSchema()); if (inputRows.isBounded() == IsBounded.UNBOUNDED) { Long triggeringFrequency = configuration.getTriggeringFrequencySeconds(); @@ -358,9 +391,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } boolean useAtLeastOnceSemantics = - configuration.getUseAtLeastOnceSemantics() == null - ? false - : configuration.getUseAtLeastOnceSemantics(); + configuration.getUseAtLeastOnceSemantics() != null + && configuration.getUseAtLeastOnceSemantics(); // Triggering frequency is only applicable for exactly-once if (!useAtLeastOnceSemantics) { write = @@ -433,7 +465,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - BigQueryIO.Write createStorageWriteApiTransform() { + BigQueryIO.Write createStorageWriteApiTransform(Schema schema) { Method writeMethod = configuration.getUseAtLeastOnceSemantics() != null && configuration.getUseAtLeastOnceSemantics() @@ -442,12 +474,23 @@ BigQueryIO.Write createStorageWriteApiTransform() { BigQueryIO.Write write = BigQueryIO.write() - .to(configuration.getTable()) .withMethod(writeMethod) - .useBeamSchema() .withFormatFunction(BigQueryUtils.toTableRow()) .withWriteDisposition(WriteDisposition.WRITE_APPEND); + if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { + checkArgument( + schema.getFieldNames().equals(Arrays.asList("destination", "record")), + "When writing to dynamic destinations, we expect Row Schema with a " + + "\"destination\" string field and a \"record\" Row field."); + write = + write + .to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema())) + .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record"))); + } else { + write = write.to(configuration.getTable()).useBeamSchema(); + } + if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { CreateDisposition createDisposition = BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java index 54c636bde5fe..64ea0b11d1b9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.List; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform; @@ -136,8 +137,8 @@ public PCollectionRowTuple runWithConfig( writeTransform.setBigQueryServices(fakeBigQueryServices); String tag = provider.inputCollectionNames().get(0); - - PCollection rows = p.apply(Create.of(inputRows).withRowSchema(SCHEMA)); + PCollection rows = + p.apply(Create.of(inputRows).withRowSchema(inputRows.get(0).getSchema())); PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows); PCollectionRowTuple result = input.apply(writeTransform); @@ -155,16 +156,20 @@ public Boolean rowsEquals(List expectedRows, List actualRows) { TableRow actualRow = actualRows.get(i); Row expectedRow = expectedRows.get(Integer.parseInt(actualRow.get("number").toString()) - 1); - if (!expectedRow.getValue("name").equals(actualRow.get("name")) - || !expectedRow - .getValue("number") - .equals(Long.parseLong(actualRow.get("number").toString()))) { + if (!rowEquals(expectedRow, actualRow)) { return false; } } return true; } + public boolean rowEquals(Row expectedRow, TableRow actualRow) { + return expectedRow.getValue("name").equals(actualRow.get("name")) + && expectedRow + .getValue("number") + .equals(Long.parseLong(actualRow.get("number").toString())); + } + @Test public void testSimpleWrite() throws Exception { String tableSpec = "project:dataset.simple_write"; @@ -179,6 +184,43 @@ public void testSimpleWrite() throws Exception { rowsEquals(ROWS, fakeDatasetService.getAllRows("project", "dataset", "simple_write"))); } + @Test + public void testWriteToDynamicDestinations() throws Exception { + String dynamic = BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS; + BigQueryStorageWriteApiSchemaTransformConfiguration config = + BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(dynamic).build(); + + String baseTableSpec = "project:dataset.dynamic_write_"; + + Schema schemaWithDestinations = + Schema.builder().addStringField("destination").addRowField("record", SCHEMA).build(); + List rowsWithDestinations = + ROWS.stream() + .map( + row -> + Row.withSchema(schemaWithDestinations) + .withFieldValue("destination", baseTableSpec + row.getInt64("number")) + .withFieldValue("record", row) + .build()) + .collect(Collectors.toList()); + + runWithConfig(config, rowsWithDestinations); + p.run().waitUntilFinish(); + + assertTrue( + rowEquals( + ROWS.get(0), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_1").get(0))); + assertTrue( + rowEquals( + ROWS.get(1), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_2").get(0))); + assertTrue( + rowEquals( + ROWS.get(2), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_3").get(0))); + } + @Test public void testInputElementCount() throws Exception { String tableSpec = "project:dataset.input_count"; diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index 5917ca4dc729..c1e9754526e8 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -28,6 +28,7 @@ import pytest from hamcrest.core import assert_that as hamcrest_assert +from hamcrest.core.core.allof import all_of import apache_beam as beam from apache_beam.io.gcp.bigquery import StorageWriteToBigQuery @@ -52,9 +53,6 @@ @pytest.mark.uses_gcp_java_expansion_service -# @unittest.skipUnless( -# 
os.environ.get('EXPANSION_PORT'), -# "EXPANSION_PORT environment var is not provided.") class BigQueryXlangStorageWriteIT(unittest.TestCase): BIGQUERY_DATASET = 'python_xlang_storage_write' @@ -114,7 +112,8 @@ def setUp(self): _LOGGER.info( "Created dataset %s in project %s", self.dataset_id, self.project) - _LOGGER.info("expansion port: %s", os.environ.get('EXPANSION_PORT')) + self.assertTrue( + os.environ.get('EXPANSION_PORT'), "Expansion service port not found!") self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT')) def tearDown(self): @@ -132,6 +131,8 @@ def tearDown(self): self.project) def parse_expected_data(self, expected_elements): + if not isinstance(expected_elements, list): + expected_elements = [expected_elements] data = [] for row in expected_elements: values = list(row.values()) @@ -246,6 +247,66 @@ def test_write_with_beam_rows(self): table=table_id, expansion_service=self.expansion_service)) hamcrest_assert(p, bq_matcher) + def test_write_to_dynamic_destinations(self): + base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id) + spec_with_project = '{}:{}'.format(self.project, base_table_spec) + tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS] + + bq_matchers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % tables[i], + data=self.parse_expected_data(self.ELEMENTS[i])) + for i in range(len(tables)) + ] + + with beam.Pipeline(argv=self.args) as p: + _ = ( + p + | beam.Create(self.ELEMENTS) + | beam.io.WriteToBigQuery( + table=lambda record: spec_with_project + str(record['int']), + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + schema=self.ALL_TYPES_SCHEMA, + use_at_least_once=False, + expansion_service=self.expansion_service)) + hamcrest_assert(p, all_of(*bq_matchers)) + + def test_write_to_dynamic_destinations_with_beam_rows(self): + base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id) + spec_with_project = '{}:{}'.format(self.project, base_table_spec) + tables = [base_table_spec + str(record['int']) for record in self.ELEMENTS] + + bq_matchers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM %s" % tables[i], + data=self.parse_expected_data(self.ELEMENTS[i])) + for i in range(len(tables)) + ] + + row_elements = [ + beam.Row( + my_int=e['int'], + my_float=e['float'], + my_numeric=e['numeric'], + my_string=e['str'], + my_bool=e['bool'], + my_bytes=e['bytes'], + my_timestamp=e['timestamp']) for e in self.ELEMENTS + ] + + with beam.Pipeline(argv=self.args) as p: + _ = ( + p + | beam.Create(row_elements) + | beam.io.WriteToBigQuery( + table=lambda record: spec_with_project + str(record.my_int), + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + use_at_least_once=False, + expansion_service=self.expansion_service)) + hamcrest_assert(p, all_of(*bq_matchers)) + def run_streaming(self, table_name, num_streams=0, use_at_least_once=False): elements = self.ELEMENTS.copy() schema = self.ALL_TYPES_SCHEMA @@ -278,13 +339,16 @@ def run_streaming(self, table_name, num_streams=0, use_at_least_once=False): expansion_service=self.expansion_service)) hamcrest_assert(p, bq_matcher) - def test_streaming_with_fixed_num_streams(self): + def skip_if_not_dataflow_runner(self) -> bool: # skip if dataflow runner is not specified if not self._runner or "dataflowrunner" not in self._runner.lower(): self.skipTest( - "The exactly-once route has the requirement " + "Streaming with exactly-once route has the requirement " 
"`beam:requirement:pardo:on_window_expiration:v1`, " "which is currently only supported by the Dataflow runner") + + def test_streaming_with_fixed_num_streams(self): + self.skip_if_not_dataflow_runner() table = 'streaming_fixed_num_streams' self.run_streaming(table_name=table, num_streams=4) @@ -292,6 +356,7 @@ def test_streaming_with_fixed_num_streams(self): "Streaming to the Storage Write API sink with autosharding is broken " "with Dataflow Runner V2.") def test_streaming_with_auto_sharding(self): + self.skip_if_not_dataflow_runner() table = 'streaming_with_auto_sharding' self.run_streaming(table_name=table) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index ac06425e95a9..4643c8ddf0a5 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2222,73 +2222,17 @@ def find_in_nested_dict(schema): BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS]) elif method_to_use == WriteToBigQuery.Method.STORAGE_WRITE_API: - if self.schema is None: - try: - schema = schema_from_element_type(pcoll.element_type) - is_rows = True - except TypeError as exn: - raise ValueError( - "A schema is required in order to prepare rows" - "for writing with STORAGE_WRITE_API.") from exn - elif callable(self.schema): - raise NotImplementedError( - "Writing to dynamic destinations is not" - "supported for this write method.") - elif isinstance(self.schema, vp.ValueProvider): - schema = self.schema.get() - is_rows = False - else: - schema = self.schema - is_rows = False - - table = bigquery_tools.get_hashable_destination(self.table_reference) - # None type is not supported - triggering_frequency = self.triggering_frequency or 0 - # SchemaTransform expects Beam Rows, so map to Rows first - if is_rows: - input_beam_rows = pcoll - else: - input_beam_rows = ( - pcoll - | "Convert dict to Beam Row" >> beam.Map( - lambda row: bigquery_tools.beam_row_from_dict(row, schema) - ).with_output_types( - RowTypeConstraint.from_fields( - bigquery_tools.get_beam_typehints_from_tableschema(schema))) - ) - output_beam_rows = ( - input_beam_rows - | StorageWriteToBigQuery( - table=table, - create_disposition=self.create_disposition, - write_disposition=self.write_disposition, - triggering_frequency=triggering_frequency, - use_at_least_once=self.use_at_least_once, - with_auto_sharding=self.with_auto_sharding, - num_storage_api_streams=self._num_storage_api_streams, - expansion_service=self.expansion_service)) - - if is_rows: - failed_rows = output_beam_rows[StorageWriteToBigQuery.FAILED_ROWS] - failed_rows_with_errors = output_beam_rows[ - StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS] - else: - # return back from Beam Rows to Python dict elements - failed_rows = ( - output_beam_rows[StorageWriteToBigQuery.FAILED_ROWS] - | beam.Map(lambda row: row.as_dict())) - failed_rows_with_errors = ( - output_beam_rows[StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS] - | beam.Map( - lambda row: { - "error_message": row.error_message, - "failed_row": row.failed_row.as_dict() - })) - - return WriteResult( - method=WriteToBigQuery.Method.STORAGE_WRITE_API, - failed_rows=failed_rows, - failed_rows_with_errors=failed_rows_with_errors) + return pcoll | StorageWriteToBigQuery( + table=self.table_reference, + schema=self.schema, + table_side_inputs=self.table_side_inputs, + create_disposition=self.create_disposition, + write_disposition=self.write_disposition, + triggering_frequency=self.triggering_frequency, + use_at_least_once=self.use_at_least_once, 
+ with_auto_sharding=self.with_auto_sharding, + num_storage_api_streams=self._num_storage_api_streams, + expansion_service=self.expansion_service) else: raise ValueError(f"Unsupported method {method_to_use}") @@ -2382,7 +2326,7 @@ class WriteResult: """ def __init__( self, - method: WriteToBigQuery.Method = None, + method: str = None, destination_load_jobid_pairs: PCollection[Tuple[str, JobReference]] = None, destination_file_pairs: PCollection[Tuple[str, Tuple[str, int]]] = None, @@ -2505,24 +2449,26 @@ def __getitem__(self, key): return self.attributes[key].__get__(self, WriteResult) -def _default_io_expansion_service(append_args=None): - return BeamJarExpansionService( - 'sdks:java:io:google-cloud-platform:expansion-service:build', - append_args=append_args) - - class StorageWriteToBigQuery(PTransform): """Writes data to BigQuery using Storage API. + Supports dynamic destinations. Dynamic schemas are not supported yet. Experimental; no backwards compatibility guarantees. """ - URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v2" + IDENTIFIER = "beam:schematransform:org.apache.beam:bigquery_storage_write:v2" FAILED_ROWS = "FailedRows" FAILED_ROWS_WITH_ERRORS = "FailedRowsWithErrors" + # fields for rows sent to Storage API with dynamic destinations + DESTINATION = "destination" + RECORD = "record" + # magic string to tell Java that these rows are going to dynamic destinations + DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS" def __init__( self, table, + table_side_inputs=None, + schema=None, create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=BigQueryDisposition.WRITE_APPEND, triggering_frequency=0, @@ -2530,71 +2476,156 @@ def __init__( with_auto_sharding=False, num_storage_api_streams=0, expansion_service=None): - """Initialize a StorageWriteToBigQuery transform. - - :param table: - Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``. - :param create_disposition: - String specifying the strategy to take when the table doesn't - exist. Possible values are: - * ``'CREATE_IF_NEEDED'``: create if does not exist. - * ``'CREATE_NEVER'``: fail the write if does not exist. - :param write_disposition: - String specifying the strategy to take when the table already - contains data. Possible values are: - * ``'WRITE_TRUNCATE'``: delete existing rows. - * ``'WRITE_APPEND'``: add to existing rows. - * ``'WRITE_EMPTY'``: fail the write if table not empty. - :param triggering_frequency: - The time in seconds between write commits. Should only be specified - for streaming pipelines. Defaults to 5 seconds. - :param use_at_least_once: - Use at-least-once semantics. Is cheaper and provides lower latency, - but will potentially duplicate records. - :param with_auto_sharding: - Experimental. If true, enables using a dynamically determined number of - shards to write to BigQuery. Only applicable to unbounded input. - :param expansion_service: - The address (host:port) of the expansion service. If no expansion - service is provided, will attempt to run the default GCP expansion - service. 
- """ - super().__init__() self._table = table + self._table_side_inputs = table_side_inputs + self._schema = schema self._create_disposition = create_disposition self._write_disposition = write_disposition self._triggering_frequency = triggering_frequency self._use_at_least_once = use_at_least_once self._with_auto_sharding = with_auto_sharding self._num_storage_api_streams = num_storage_api_streams - self._expansion_service = ( - expansion_service or _default_io_expansion_service()) - self.schematransform_config = SchemaAwareExternalTransform.discover_config( - self._expansion_service, self.URN) + self._expansion_service = expansion_service or BeamJarExpansionService( + 'sdks:java:io:google-cloud-platform:expansion-service:build') def expand(self, input): - external_storage_write = SchemaAwareExternalTransform( - identifier=self.schematransform_config.identifier, - expansion_service=self._expansion_service, - rearrange_based_on_discovery=True, - autoSharding=self._with_auto_sharding, - numStreams=self._num_storage_api_streams, - createDisposition=self._create_disposition, - table=self._table, - triggeringFrequencySeconds=self._triggering_frequency, - useAtLeastOnceSemantics=self._use_at_least_once, - writeDisposition=self._write_disposition, - errorHandling={ - 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS - }) - - input_tag = self.schematransform_config.inputs[0] - - result = {input_tag: input} | external_storage_write - result[StorageWriteToBigQuery.FAILED_ROWS] = result[ - StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS] | beam.Map( - lambda row_and_error: row_and_error[0]) - return result + if self._schema is None: + try: + schema = schema_from_element_type(input.element_type) + is_rows = True + except TypeError as exn: + raise ValueError( + "A schema is required in order to prepare rows" + "for writing with STORAGE_WRITE_API.") from exn + elif callable(self._schema): + raise NotImplementedError( + "Writing with dynamic schemas is not" + "supported for this write method.") + elif isinstance(self._schema, vp.ValueProvider): + schema = self._schema.get() + is_rows = False + else: + schema = self._schema + is_rows = False + + table = bigquery_tools.get_hashable_destination(self._table) + + # if writing to one destination, just convert to Beam rows and send over + if not callable(table): + if is_rows: + input_beam_rows = input + else: + input_beam_rows = ( + input + | "Convert dict to Beam Row" >> self.ConvertToBeamRows( + schema, False).with_output_types()) + + # For dynamic destinations, we first figure out where each row is going. + # Then we send (destination, record) rows over to Java SchemaTransform. 
+ # We need to do this here because there are obstacles to passing the + # destinations function to Java + else: + # call function and append destination to each row + input_rows = ( + input + | "Append dynamic destinations" >> beam.ParDo( + bigquery_tools.AppendDestinationsFn(table), + *self._table_side_inputs)) + # if input type is Beam Row, just wrap everything in another Row + if is_rows: + input_beam_rows = ( + input_rows + | "Wrap in Beam Row" >> beam.Map( + lambda row: beam.Row( + **{ + StorageWriteToBigQuery.DESTINATION: row[0], + StorageWriteToBigQuery.RECORD: row[1] + })).with_output_types( + RowTypeConstraint.from_fields([ + (StorageWriteToBigQuery.DESTINATION, str), + (StorageWriteToBigQuery.RECORD, input.element_type) + ]))) + # otherwise, convert to Beam Rows + else: + input_beam_rows = ( + input_rows + | "Convert dict to Beam Row" >> self.ConvertToBeamRows( + schema, True).with_output_types()) + # communicate to Java that this write should use dynamic destinations + table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS + + output = ( + input_beam_rows + | SchemaAwareExternalTransform( + identifier=StorageWriteToBigQuery.IDENTIFIER, + expansion_service=self._expansion_service, + rearrange_based_on_discovery=True, + table=table, + createDisposition=self._create_disposition, + writeDisposition=self._write_disposition, + triggeringFrequencySeconds=self._triggering_frequency, + autoSharding=self._with_auto_sharding, + numStreams=self._num_storage_api_streams, + useAtLeastOnceSemantics=self._use_at_least_once, + errorHandling={ + 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS + })) + + failed_rows_with_errors = output[ + StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS] + failed_rows = failed_rows_with_errors | beam.Map( + lambda row_and_error: row_and_error[0]) + if not is_rows: + # return back from Beam Rows to Python dict elements + failed_rows = failed_rows | beam.Map(lambda row: row.as_dict()) + failed_rows_with_errors = failed_rows_with_errors | beam.Map( + lambda row: { + "error_message": row.error_message, + "failed_row": row.failed_row.as_dict() + }) + + return WriteResult( + method=WriteToBigQuery.Method.STORAGE_WRITE_API, + failed_rows=failed_rows, + failed_rows_with_errors=failed_rows_with_errors) + + class ConvertToBeamRows(PTransform): + def __init__(self, schema, dynamic_destinations): + self.schema = schema + self.dynamic_destinations = dynamic_destinations + + def expand(self, input_dicts): + if self.dynamic_destinations: + return ( + input_dicts + | "Convert dict to Beam Row" >> beam.Map( + lambda row: beam.Row( + **{ + StorageWriteToBigQuery.DESTINATION: row[0], + StorageWriteToBigQuery.RECORD: bigquery_tools. + beam_row_from_dict(row[1], self.schema) + }))) + else: + return ( + input_dicts + | "Convert dict to Beam Row" >> beam.Map( + lambda row: bigquery_tools.beam_row_from_dict(row, self.schema)) + ) + + def with_output_types(self): + row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema( + self.schema) + if self.dynamic_destinations: + type_hint = RowTypeConstraint.from_fields([ + (StorageWriteToBigQuery.DESTINATION, str), + ( + StorageWriteToBigQuery.RECORD, + RowTypeConstraint.from_fields(row_type_hints)) + ]) + else: + type_hint = RowTypeConstraint.from_fields(row_type_hints) + + return super().with_output_types(type_hint) class ReadFromBigQuery(PTransform):
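Taken together, the Java and Python changes define a small cross-language contract: when `table` is a callable, the Python transform tags each element with its destination via `AppendDestinationsFn`, wraps it as a Beam Row with `destination` and `record` fields, and configures the Java `BigQueryStorageWriteApiSchemaTransformProvider` with the magic table value `DYNAMIC_DESTINATIONS`; on the Java side, `RowDynamicDestinations` routes on the `destination` string and derives the table schema from the `record` sub-row. A minimal sketch of that intermediate row shape is below; the field values and the `name`/`number` payload schema are illustrative only.

```python
import apache_beam as beam
from apache_beam.typehints.row_type import RowTypeConstraint

# Illustrative payload row; in the transform above the record schema comes
# from the input element type (Beam Rows) or from beam_row_from_dict (dicts).
record = beam.Row(name='a', number=1)

# Shape of each element handed to the Java SchemaTransform when the
# configured table is the magic value "DYNAMIC_DESTINATIONS".
wrapped = beam.Row(
    destination='my_project:my_dataset.dynamic_dest_1',  # placeholder spec
    record=record)

# Corresponding type hint, mirroring the RowTypeConstraint built in expand().
hint = RowTypeConstraint.from_fields([
    ('destination', str),
    ('record', RowTypeConstraint.from_fields([('name', str),
                                              ('number', int)])),
])
```

This is the same two-field shape that `createStorageWriteApiTransform` validates on the Java side before installing `RowDynamicDestinations`, so any row that does not carry exactly a `destination` string field and a `record` Row field is rejected at expansion time.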