From 924cb13b02fdba1cac01c3ac6709c6aacb9d4bbe Mon Sep 17 00:00:00 2001
From: Isha Tarte
Date: Wed, 28 Aug 2024 08:34:01 -0700
Subject: [PATCH] Configure alternative BigNumeric precision and scale defaults (#1281)

---
 CHANGES.md                                  |  2 +
 README-template.md                          | 20 +++++++++
 .../spark/bigquery/SchemaConverters.java    | 20 +++++++--
 .../SchemaConvertersConfiguration.java      | 44 ++++++++++++++++---
 .../spark/bigquery/SparkBigQueryConfig.java | 22 ++++++++++
 .../spark/bigquery/SchemaConverterTest.java | 20 +++++++++
 6 files changed, 118 insertions(+), 10 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index c4006b7af..b619bbec0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,8 @@
 
 ## Next
 
+* PR #1281 : Configure alternative BigNumeric precision and scale defaults
+
 ## 0.40.0 - 2024-08-05
 
 * PR #1259 : Encode snapshotTimeMillis in view materialization query. Thanks @tom-s-powell !

diff --git a/README-template.md b/README-template.md
index d392a1e53..9a5ae6010 100644
--- a/README-template.md
+++ b/README-template.md
@@ -914,6 +914,26 @@ word-break:break-word
      Read
    </td>
  </tr>
+ <tr valign="top">
+   <td><code>bigNumericDefaultPrecision</code>
+   </td>
+   <td>An alternative default precision for BigNumeric fields, as the BigQuery default is too wide for Spark. Values can be between 1 and 38.
+       This default is used only when the field has an unparameterized BigNumeric type.
+       Note that data loss may occur if the actual precision of the data exceeds the specified value.
+       <br/>(Optional)
+   </td>
+   <td>Read</td>
+ </tr>
+ <tr valign="top">
+   <td><code>bigNumericDefaultScale</code>
+   </td>
+   <td>An alternative default scale for BigNumeric fields. Values can be between 0 and 38, and must be less than <code>bigNumericDefaultPrecision</code>.
+       This default is used only when the field has an unparameterized BigNumeric type.
+       Note that data loss may occur if the actual scale of the data exceeds the specified value.
+       <br/>(Optional)
+   </td>
+   <td>Read</td>
+ </tr>
 </table>
 
 Options can also be set outside of the code, using the `--conf` parameter of `spark-submit` or `--properties` parameter
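Taken together, the two README rows amount to the following read-time usage. A minimal sketch, assuming a running Spark session and a table with an unparameterized BIGNUMERIC column; the class name and the `dataset.table` reference are placeholders:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BigNumericReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("bignumeric-defaults").getOrCreate();

    // Override the BigNumeric defaults for this read; without them an
    // unparameterized BIGNUMERIC column is too wide for Spark's DecimalType.
    Dataset<Row> df =
        spark.read()
            .format("bigquery")
            .option("bigNumericDefaultPrecision", "38")
            .option("bigNumericDefaultScale", "10")
            .load("dataset.table"); // placeholder table reference

    df.printSchema(); // the BIGNUMERIC column should surface as decimal(38,10)
  }
}
```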
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java
index 984b0b2df..4400d30e2 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java
@@ -379,18 +379,30 @@ private DataType getStandardDataType(Field field) {
       int precision =
           Optional.ofNullable(field.getPrecision())
               .map(Long::intValue)
-              .orElse(BigQueryUtil.DEFAULT_BIG_NUMERIC_PRECISION);
+              .orElse(configuration.getBigNumericDefaultPrecision());
       if (precision > DecimalType.MAX_PRECISION()) {
         throw new IllegalArgumentException(
             String.format(
-                "BigNumeric precision is too wide (%d), Spark can only handle decimal types with max precision of %d",
+                "BigNumeric precision is too wide (%d), Spark can only handle decimal types with max precision of %d. "
+                    + "If your data is within Spark's precision, you can set it using bigNumericDefaultPrecision",
                 precision, DecimalType.MAX_PRECISION()));
       }
+      int scale =
+          Optional.ofNullable(field.getScale())
+              .map(Long::intValue)
+              .orElse(configuration.getBigNumericDefaultScale());
+      if (scale > DecimalType.MAX_SCALE()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "BigNumeric scale is too wide (%d), Spark can only handle decimal types with max scale of %d. "
+                    + "If your data is within Spark's scale, you can set it using bigNumericDefaultScale",
+                scale, DecimalType.MAX_SCALE()));
+      }
       return createDecimalTypeFromNumericField(
           field,
           LegacySQLTypeName.BIGNUMERIC,
-          BigQueryUtil.DEFAULT_BIG_NUMERIC_PRECISION,
-          BigQueryUtil.DEFAULT_BIG_NUMERIC_SCALE);
+          configuration.getBigNumericDefaultPrecision(),
+          configuration.getBigNumericDefaultScale());
     } else if (LegacySQLTypeName.STRING.equals(field.getType())) {
       return DataTypes.StringType;
     } else if (LegacySQLTypeName.BOOLEAN.equals(field.getType())) {
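To make the fallback above concrete: an unparameterized BIGNUMERIC field has null precision and scale, so the converter now falls back to the configured defaults instead of BigQuery's. A minimal sketch using the converter API exercised by the tests later in this patch; the class name is hypothetical and the (38, 9) values are arbitrary:

```java
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.spark.bigquery.SchemaConverters;
import com.google.cloud.spark.bigquery.SchemaConvertersConfiguration;
import org.apache.spark.sql.types.StructField;

public class BigNumericFallbackDemo {
  public static void main(String[] args) {
    // Unparameterized BIGNUMERIC: getPrecision()/getScale() return null, so the
    // converter falls back to the configured defaults.
    Field foo = Field.newBuilder("foo", LegacySQLTypeName.BIGNUMERIC).build();

    // (38, 9) fits Spark's DecimalType, so conversion succeeds instead of throwing.
    StructField converted =
        SchemaConverters.from(SchemaConvertersConfiguration.of(false, 38, 9)).convert(foo);

    System.out.println(converted.dataType()); // DecimalType(38,9)
  }
}
```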
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SchemaConvertersConfiguration.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SchemaConvertersConfiguration.java
index c3f53f5f5..b62567261 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SchemaConvertersConfiguration.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SchemaConvertersConfiguration.java
@@ -15,44 +15,76 @@
  */
 package com.google.cloud.spark.bigquery;
 
+import com.google.cloud.bigquery.connector.common.BigQueryUtil;
 import com.google.common.base.Objects;
 import java.io.Serializable;
 
 public class SchemaConvertersConfiguration implements Serializable {
 
   private final boolean allowMapTypeConversion;
+  private int bigNumericDefaultPrecision;
+  private int bigNumericDefaultScale;
 
-  private SchemaConvertersConfiguration(boolean allowMapTypeConversion) {
+  private SchemaConvertersConfiguration(
+      boolean allowMapTypeConversion, int bigNumericDefaultPrecision, int bigNumericDefaultScale) {
     this.allowMapTypeConversion = allowMapTypeConversion;
+    this.bigNumericDefaultPrecision = bigNumericDefaultPrecision;
+    this.bigNumericDefaultScale = bigNumericDefaultScale;
   }
 
   public static SchemaConvertersConfiguration from(SparkBigQueryConfig config) {
-    return SchemaConvertersConfiguration.of(config.getAllowMapTypeConversion());
+    return SchemaConvertersConfiguration.of(
+        config.getAllowMapTypeConversion(),
+        config.getBigNumericDefaultPrecision(),
+        config.getBigNumericDefaultScale());
   }
 
   public static SchemaConvertersConfiguration of(boolean allowMapTypeConversion) {
-    return new SchemaConvertersConfiguration(allowMapTypeConversion);
+    return new SchemaConvertersConfiguration(
+        allowMapTypeConversion,
+        BigQueryUtil.DEFAULT_BIG_NUMERIC_PRECISION,
+        BigQueryUtil.DEFAULT_BIG_NUMERIC_SCALE);
+  }
+
+  public static SchemaConvertersConfiguration of(
+      boolean allowMapTypeConversion, int bigNumericDefaultPrecision, int bigNumericDefaultScale) {
+    return new SchemaConvertersConfiguration(
+        allowMapTypeConversion, bigNumericDefaultPrecision, bigNumericDefaultScale);
   }
 
   public static SchemaConvertersConfiguration createDefault() {
-    return new SchemaConvertersConfiguration(SparkBigQueryConfig.ALLOW_MAP_TYPE_CONVERSION_DEFAULT);
+    return new SchemaConvertersConfiguration(
+        SparkBigQueryConfig.ALLOW_MAP_TYPE_CONVERSION_DEFAULT,
+        BigQueryUtil.DEFAULT_BIG_NUMERIC_PRECISION,
+        BigQueryUtil.DEFAULT_BIG_NUMERIC_SCALE);
   }
 
   public boolean getAllowMapTypeConversion() {
     return allowMapTypeConversion;
   }
 
+  public int getBigNumericDefaultPrecision() {
+    return bigNumericDefaultPrecision;
+  }
+
+  public int getBigNumericDefaultScale() {
+    return bigNumericDefaultScale;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
     SchemaConvertersConfiguration that = (SchemaConvertersConfiguration) o;
-    return Objects.equal(allowMapTypeConversion, that.allowMapTypeConversion);
+    return Objects.equal(allowMapTypeConversion, that.allowMapTypeConversion)
+        && Objects.equal(bigNumericDefaultPrecision, that.bigNumericDefaultPrecision)
+        && Objects.equal(bigNumericDefaultScale, that.bigNumericDefaultScale);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(allowMapTypeConversion);
+    return Objects.hashCode(
+        allowMapTypeConversion, bigNumericDefaultPrecision, bigNumericDefaultScale);
   }
 
   @Override
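The two `of` overloads keep existing callers on the library-wide defaults while letting new callers pin their own. A small sketch of the difference; the class name is hypothetical and the values are arbitrary:

```java
import com.google.cloud.spark.bigquery.SchemaConvertersConfiguration;

public class ConfigurationOverloadsDemo {
  public static void main(String[] args) {
    // Single-argument overload: falls back to BigQueryUtil.DEFAULT_BIG_NUMERIC_PRECISION
    // and DEFAULT_BIG_NUMERIC_SCALE, so call sites predating this patch are unchanged.
    SchemaConvertersConfiguration legacy = SchemaConvertersConfiguration.of(true);

    // Three-argument overload: pins explicit BigNumeric defaults.
    SchemaConvertersConfiguration custom = SchemaConvertersConfiguration.of(true, 38, 10);

    // equals()/hashCode() now include the new fields, so these differ.
    System.out.println(legacy.equals(custom)); // false
  }
}
```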
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
index addee07de..f826e146f 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
@@ -46,6 +46,7 @@
 import com.google.cloud.bigquery.connector.common.BigQueryConnectorException;
 import com.google.cloud.bigquery.connector.common.BigQueryCredentialsSupplier;
 import com.google.cloud.bigquery.connector.common.BigQueryProxyConfig;
+import com.google.cloud.bigquery.connector.common.BigQueryUtil;
 import com.google.cloud.bigquery.connector.common.MaterializationConfiguration;
 import com.google.cloud.bigquery.connector.common.ReadSessionCreatorConfig;
 import com.google.cloud.bigquery.connector.common.ReadSessionCreatorConfigBuilder;
@@ -167,6 +168,9 @@ public static WriteMethod from(@Nullable String writeMethod) {
 
   public static final String GPN_ATTRIBUTION = "GPN";
 
+  public static final String BIG_NUMERIC_DEFAULT_PRECISION = "bigNumericDefaultPrecision";
+  public static final String BIG_NUMERIC_DEFAULT_SCALE = "bigNumericDefaultScale";
+
   TableId tableId;
   // as the config needs to be Serializable, internally it uses
   // com.google.common.base.Optional but externally it uses the regular java.util.Optional
@@ -245,6 +249,8 @@ public static WriteMethod from(@Nullable String writeMethod) {
   private boolean allowMapTypeConversion = ALLOW_MAP_TYPE_CONVERSION_DEFAULT;
   private long bigQueryJobTimeoutInMinutes = BIGQUERY_JOB_TIMEOUT_IN_MINUTES_DEFAULT;
   private com.google.common.base.Optional<String> gpn;
+  private int bigNumericDefaultPrecision;
+  private int bigNumericDefaultScale;
 
   @VisibleForTesting
   SparkBigQueryConfig() {
@@ -611,6 +617,14 @@ public static SparkBigQueryConfig from(
     config.snapshotTimeMillis =
         getOption(options, "snapshotTimeMillis").transform(Long::valueOf).orNull();
+    config.bigNumericDefaultPrecision =
+        getAnyOption(globalOptions, options, BIG_NUMERIC_DEFAULT_PRECISION)
+            .transform(Integer::parseInt)
+            .or(BigQueryUtil.DEFAULT_BIG_NUMERIC_PRECISION);
+    config.bigNumericDefaultScale =
+        getAnyOption(globalOptions, options, BIG_NUMERIC_DEFAULT_SCALE)
+            .transform(Integer::parseInt)
+            .or(BigQueryUtil.DEFAULT_BIG_NUMERIC_SCALE);
     return config;
   }
@@ -1074,6 +1088,14 @@ public OptionalLong getSnapshotTimeMillis() {
     return snapshotTimeMillis == null ? OptionalLong.empty() : OptionalLong.of(snapshotTimeMillis);
   }
 
+  public int getBigNumericDefaultPrecision() {
+    return bigNumericDefaultPrecision;
+  }
+
+  public int getBigNumericDefaultScale() {
+    return bigNumericDefaultScale;
+  }
+
   public ReadSessionCreatorConfig toReadSessionCreatorConfig() {
     return new ReadSessionCreatorConfigBuilder()
         .setViewsEnabled(viewsEnabled)

diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SchemaConverterTest.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SchemaConverterTest.java
index e4c6f6628..295b6389b 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SchemaConverterTest.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/SchemaConverterTest.java
@@ -484,6 +484,26 @@ public void testCreateDecimalTypeFromNumericField() throws Exception {
     assertDecimal(numeric().setPrecision(20L).setScale(5L), 20, 5);
   }
 
+  @Test
+  public void testCreateDecimalTypeFromCustomBigNumericField() throws Exception {
+    Field customBigNumeric = Field.newBuilder("foo", LegacySQLTypeName.BIGNUMERIC).build();
+    StructField field =
+        SchemaConverters.from(SchemaConvertersConfiguration.of(false, 38, 10))
+            .convert(customBigNumeric);
+    assertThat(field.dataType()).isEqualTo(DataTypes.createDecimalType(38, 10));
+  }
+
+  @Test
+  public void testCreateDecimalTypeFromCustomBigNumericField_wide() throws Exception {
+    Field customBigNumeric = Field.newBuilder("foo", LegacySQLTypeName.BIGNUMERIC).build();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          SchemaConverters.from(SchemaConvertersConfiguration.of(false, 40, 10))
+              .convert(customBigNumeric);
+        });
+  }
+
   private Field.Builder numeric() {
     return Field.newBuilder("foo", LegacySQLTypeName.NUMERIC);
   }
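The precision guard gets a negative test above, but the new scale guard does not. A hedged sketch of a companion test for SchemaConverterTest, following the same pattern as the existing cases; the test name is hypothetical:

```java
@Test
public void testCreateDecimalTypeFromCustomBigNumericField_scaleTooWide() throws Exception {
  Field customBigNumeric = Field.newBuilder("foo", LegacySQLTypeName.BIGNUMERIC).build();
  // Precision (38) is valid, but scale (40) exceeds DecimalType.MAX_SCALE(),
  // so the new scale check in SchemaConverters should throw.
  assertThrows(
      IllegalArgumentException.class,
      () ->
          SchemaConverters.from(SchemaConvertersConfiguration.of(false, 38, 40))
              .convert(customBigNumeric));
}
```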