Configure alternative BigNumeric precision and scale defaults (#1281)

isha97 authored Aug 28, 2024
1 parent b7a22cb commit 924cb13
Showing 6 changed files with 118 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -2,6 +2,8 @@

## Next

+* PR #1281 : Configure alternative BigNumeric precision and scale defaults

## 0.40.0 - 2024-08-05

* PR #1259 : Encode snapshotTimeMillis in view materialization query. Thanks @tom-s-powell !
20 changes: 20 additions & 0 deletions README-template.md
@@ -914,6 +914,26 @@ word-break:break-word
</td>
<td>Read</td>
</tr>
+<tr>
+  <td><code>bigNumericDefaultPrecision</code>
+  </td>
+  <td>An alternative default precision for BigNumeric fields, as the BigQuery default is too wide for Spark. Values can be between 1 and 38.
+      This default is used only when the field has an unparameterized BigNumeric type.
+      Please note that there might be data loss if the actual data's precision is more than what is specified.
+      <br/> (Optional)
+  </td>
+  <td>Read</td>
+</tr>
+<tr>
+  <td><code>bigNumericDefaultScale</code>
+  </td>
+  <td>An alternative default scale for BigNumeric fields. Values can be between 0 and 38, and less than <code>bigNumericDefaultPrecision</code>.
+      This default is used only when the field has an unparameterized BigNumeric type.
+      Please note that there might be data loss if the actual data's scale is more than what is specified.
+      <br/> (Optional)
+  </td>
+  <td>Read</td>
+</tr>
</table>
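
For example, the two options can be passed per read. A minimal sketch in Java; the table name is a placeholder and the precision/scale values are illustrative:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BigNumericReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("bignumeric-read").getOrCreate();

    // Unparameterized BIGNUMERIC columns will be read as DecimalType(38, 10);
    // values with more precision or scale than this may lose data.
    Dataset<Row> df =
        spark.read()
            .format("bigquery")
            .option("bigNumericDefaultPrecision", "38")
            .option("bigNumericDefaultScale", "10")
            .load("my-project.my_dataset.my_table"); // placeholder table

    df.printSchema();
  }
}
```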

Options can also be set outside of the code, using the `--conf` parameter of `spark-submit` or `--properties` parameter
SchemaConverters.java
@@ -379,18 +379,30 @@ private DataType getStandardDataType(Field field) {
      int precision =
          Optional.ofNullable(field.getPrecision())
              .map(Long::intValue)
-             .orElse(BigQueryUtil.DEFAULT_BIG_NUMERIC_PRECISION);
+             .orElse(configuration.getBigNumericDefaultPrecision());
      if (precision > DecimalType.MAX_PRECISION()) {
        throw new IllegalArgumentException(
            String.format(
-               "BigNumeric precision is too wide (%d), Spark can only handle decimal types with max precision of %d",
+               "BigNumeric precision is too wide (%d), Spark can only handle decimal types with max precision of %d. "
+                   + "If your data is within Spark's precision, you can set it using bigNumericDefaultPrecision",
                precision, DecimalType.MAX_PRECISION()));
      }
+     int scale =
+         Optional.ofNullable(field.getScale())
+             .map(Long::intValue)
+             .orElse(configuration.getBigNumericDefaultScale());
+     if (scale > DecimalType.MAX_SCALE()) {
+       throw new IllegalArgumentException(
+           String.format(
+               "BigNumeric scale is too wide (%d), Spark can only handle decimal types with max scale of %d. "
+                   + "If your data is within Spark's scale, you can set it using bigNumericDefaultScale",
+               scale, DecimalType.MAX_SCALE()));
+     }
      return createDecimalTypeFromNumericField(
          field,
          LegacySQLTypeName.BIGNUMERIC,
-         BigQueryUtil.DEFAULT_BIG_NUMERIC_PRECISION,
-         BigQueryUtil.DEFAULT_BIG_NUMERIC_SCALE);
+         configuration.getBigNumericDefaultPrecision(),
+         configuration.getBigNumericDefaultScale());
    } else if (LegacySQLTypeName.STRING.equals(field.getType())) {
      return DataTypes.StringType;
    } else if (LegacySQLTypeName.BOOLEAN.equals(field.getType())) {
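
To see the new fallback in action, here is a minimal sketch that mirrors the unit tests added at the bottom of this commit. The field name `amount` and the 38/10 and 40/10 values are illustrative:

```java
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.spark.bigquery.SchemaConverters;
import com.google.cloud.spark.bigquery.SchemaConvertersConfiguration;
import org.apache.spark.sql.types.StructField;

public class BigNumericConversionDemo {
  public static void main(String[] args) {
    // An unparameterized BIGNUMERIC field, as it arrives from the BigQuery API.
    Field bigNumeric = Field.newBuilder("amount", LegacySQLTypeName.BIGNUMERIC).build();

    // With alternative defaults (precision 38, scale 10), the field maps to
    // DecimalType(38, 10) instead of the connector-wide BigQueryUtil constants.
    StructField converted =
        SchemaConverters.from(SchemaConvertersConfiguration.of(false, 38, 10))
            .convert(bigNumeric);
    System.out.println(converted.dataType()); // decimal(38,10)

    // A default precision above Spark's DecimalType.MAX_PRECISION() (38) is rejected.
    try {
      SchemaConverters.from(SchemaConvertersConfiguration.of(false, 40, 10)).convert(bigNumeric);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "BigNumeric precision is too wide (40), ..."
    }
  }
}
```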
SchemaConvertersConfiguration.java
@@ -15,44 +15,76 @@
*/
package com.google.cloud.spark.bigquery;

+import com.google.cloud.bigquery.connector.common.BigQueryUtil;
import com.google.common.base.Objects;
import java.io.Serializable;

public class SchemaConvertersConfiguration implements Serializable {

private final boolean allowMapTypeConversion;
+  private int bigNumericDefaultPrecision;
+  private int bigNumericDefaultScale;

-  private SchemaConvertersConfiguration(boolean allowMapTypeConversion) {
+  private SchemaConvertersConfiguration(
+      boolean allowMapTypeConversion, int bigNumericDefaultPrecision, int bigNumericDefaultScale) {
    this.allowMapTypeConversion = allowMapTypeConversion;
+    this.bigNumericDefaultPrecision = bigNumericDefaultPrecision;
+    this.bigNumericDefaultScale = bigNumericDefaultScale;
}

public static SchemaConvertersConfiguration from(SparkBigQueryConfig config) {
-    return SchemaConvertersConfiguration.of(config.getAllowMapTypeConversion());
+    return SchemaConvertersConfiguration.of(
+        config.getAllowMapTypeConversion(),
+        config.getBigNumericDefaultPrecision(),
+        config.getBigNumericDefaultScale());
}

public static SchemaConvertersConfiguration of(boolean allowMapTypeConversion) {
-    return new SchemaConvertersConfiguration(allowMapTypeConversion);
+    return new SchemaConvertersConfiguration(
+        allowMapTypeConversion,
+        BigQueryUtil.DEFAULT_BIG_NUMERIC_PRECISION,
+        BigQueryUtil.DEFAULT_BIG_NUMERIC_SCALE);
}

+  public static SchemaConvertersConfiguration of(
+      boolean allowMapTypeConversion, int bigNumericDefaultPrecision, int bigNumericDefaultScale) {
+    return new SchemaConvertersConfiguration(
+        allowMapTypeConversion, bigNumericDefaultPrecision, bigNumericDefaultScale);
+  }

public static SchemaConvertersConfiguration createDefault() {
-    return new SchemaConvertersConfiguration(SparkBigQueryConfig.ALLOW_MAP_TYPE_CONVERSION_DEFAULT);
+    return new SchemaConvertersConfiguration(
+        SparkBigQueryConfig.ALLOW_MAP_TYPE_CONVERSION_DEFAULT,
+        BigQueryUtil.DEFAULT_BIG_NUMERIC_PRECISION,
+        BigQueryUtil.DEFAULT_BIG_NUMERIC_SCALE);
}

public boolean getAllowMapTypeConversion() {
return allowMapTypeConversion;
}

+  public int getBigNumericDefaultPrecision() {
+    return bigNumericDefaultPrecision;
+  }
+
+  public int getBigNumericDefaultScale() {
+    return bigNumericDefaultScale;
+  }

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SchemaConvertersConfiguration that = (SchemaConvertersConfiguration) o;
-    return Objects.equal(allowMapTypeConversion, that.allowMapTypeConversion);
+    return Objects.equal(allowMapTypeConversion, that.allowMapTypeConversion)
+        && Objects.equal(bigNumericDefaultPrecision, that.bigNumericDefaultPrecision)
+        && Objects.equal(bigNumericDefaultScale, that.bigNumericDefaultScale);
}

@Override
public int hashCode() {
-    return Objects.hashCode(allowMapTypeConversion);
+    return Objects.hashCode(
+        allowMapTypeConversion, bigNumericDefaultPrecision, bigNumericDefaultScale);
}

@Override
SparkBigQueryConfig.java
@@ -46,6 +46,7 @@
import com.google.cloud.bigquery.connector.common.BigQueryConnectorException;
import com.google.cloud.bigquery.connector.common.BigQueryCredentialsSupplier;
import com.google.cloud.bigquery.connector.common.BigQueryProxyConfig;
+import com.google.cloud.bigquery.connector.common.BigQueryUtil;
import com.google.cloud.bigquery.connector.common.MaterializationConfiguration;
import com.google.cloud.bigquery.connector.common.ReadSessionCreatorConfig;
import com.google.cloud.bigquery.connector.common.ReadSessionCreatorConfigBuilder;
@@ -167,6 +168,9 @@ public static WriteMethod from(@Nullable String writeMethod) {

public static final String GPN_ATTRIBUTION = "GPN";

+  public static final String BIG_NUMERIC_DEFAULT_PRECISION = "bigNumericDefaultPrecision";
+  public static final String BIG_NUMERIC_DEFAULT_SCALE = "bigNumericDefaultScale";

TableId tableId;
// as the config needs to be Serializable, internally it uses
// com.google.common.base.Optional<String> but externally it uses the regular java.util.Optional
@@ -245,6 +249,8 @@ public static WriteMethod from(@Nullable String writeMethod) {
private boolean allowMapTypeConversion = ALLOW_MAP_TYPE_CONVERSION_DEFAULT;
private long bigQueryJobTimeoutInMinutes = BIGQUERY_JOB_TIMEOUT_IN_MINUTES_DEFAULT;
private com.google.common.base.Optional<String> gpn;
+  private int bigNumericDefaultPrecision;
+  private int bigNumericDefaultScale;

@VisibleForTesting
SparkBigQueryConfig() {
@@ -611,6 +617,14 @@ public static SparkBigQueryConfig from(

config.snapshotTimeMillis =
getOption(options, "snapshotTimeMillis").transform(Long::valueOf).orNull();
+    config.bigNumericDefaultPrecision =
+        getAnyOption(globalOptions, options, BIG_NUMERIC_DEFAULT_PRECISION)
+            .transform(Integer::parseInt)
+            .or(BigQueryUtil.DEFAULT_BIG_NUMERIC_PRECISION);
+    config.bigNumericDefaultScale =
+        getAnyOption(globalOptions, options, BIG_NUMERIC_DEFAULT_SCALE)
+            .transform(Integer::parseInt)
+            .or(BigQueryUtil.DEFAULT_BIG_NUMERIC_SCALE);

return config;
}
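
Since the values are resolved through `getAnyOption(globalOptions, options, ...)`, they can also be supplied once per session instead of per read. A minimal sketch, assuming the connector's documented `spark.datasource.bigquery.` prefix for global options:

```java
import org.apache.spark.sql.SparkSession;

public class GlobalBigNumericDefaults {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("bignumeric-defaults").getOrCreate();

    // Session-wide defaults; the "spark.datasource.bigquery." prefix is the
    // connector's convention for global options (an assumption in this sketch).
    spark.conf().set("spark.datasource.bigquery.bigNumericDefaultPrecision", "38");
    spark.conf().set("spark.datasource.bigquery.bigNumericDefaultScale", "9");

    // Subsequent reads pick these up without per-read .option(...) calls.
    spark.read().format("bigquery").load("my-project.my_dataset.my_table").printSchema();
  }
}
```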
@@ -1074,6 +1088,14 @@ public OptionalLong getSnapshotTimeMillis() {
return snapshotTimeMillis == null ? OptionalLong.empty() : OptionalLong.of(snapshotTimeMillis);
}

+  public int getBigNumericDefaultPrecision() {
+    return bigNumericDefaultPrecision;
+  }
+
+  public int getBigNumericDefaultScale() {
+    return bigNumericDefaultScale;
+  }

public ReadSessionCreatorConfig toReadSessionCreatorConfig() {
return new ReadSessionCreatorConfigBuilder()
.setViewsEnabled(viewsEnabled)
SchemaConverters unit tests
Expand Up @@ -484,6 +484,26 @@ public void testCreateDecimalTypeFromNumericField() throws Exception {
assertDecimal(numeric().setPrecision(20L).setScale(5L), 20, 5);
}

+  @Test
+  public void testCreateDecimalTypeFromCustomBigNumericField() throws Exception {
+    Field customBigNumeric = Field.newBuilder("foo", LegacySQLTypeName.BIGNUMERIC).build();
+    StructField field =
+        SchemaConverters.from(SchemaConvertersConfiguration.of(false, 38, 10))
+            .convert(customBigNumeric);
+    assertThat(field.dataType()).isEqualTo(DataTypes.createDecimalType(38, 10));
+  }
+
+  @Test
+  public void testCreateDecimalTypeFromCustomBigNumericField_wide() throws Exception {
+    Field customBigNumeric = Field.newBuilder("foo", LegacySQLTypeName.BIGNUMERIC).build();
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          SchemaConverters.from(SchemaConvertersConfiguration.of(false, 40, 10))
+              .convert(customBigNumeric);
+        });
+  }

private Field.Builder numeric() {
return Field.newBuilder("foo", LegacySQLTypeName.NUMERIC);
}