Skip to content

Commit

Permalink
Update to libraries-bom 26.31.0 , Add support for protos and enums fo…
Browse files Browse the repository at this point in the history
…r spanner dataflow templates (#30181)

* Update to libraries-bom 26.31.0

* Support for proto & enum types in SpannerSchema & MutationSizeEstimator

* Support for proto & enum types in SpannerSchema & MutationSizeEstimator

* Upgrade dependencies for libraries bom to be consistent with libraries bom, upgrade arrow version to latest, bug fix for spanner proto type

* Spotless Apply

* Entry in CHANGES.md for arrow version bump, comment to keep the arrow version consistent with google_cloud_bigquery
  • Loading branch information
dhruvdua authored Feb 10, 2024
1 parent aefcada commit 34cfcc3
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Arrow version was bumped to 15.0.0 from 5.0.0 ([#30181](https://github.com/apache/beam/pull/30181)).

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,12 +602,12 @@ class BeamModulePlugin implements Plugin<Project> {
def dbcp2_version = "2.9.0"
def errorprone_version = "2.10.0"
// Try to keep gax_version consistent with gax-grpc version in google_cloud_platform_libraries_bom
def gax_version = "2.39.0"
def gax_version = "2.41.0"
def google_ads_version = "26.0.0"
def google_clients_version = "2.0.0"
def google_cloud_bigdataoss_version = "2.2.16"
// Try to keep google_cloud_spanner_version consistent with google_cloud_spanner_bom in google_cloud_platform_libraries_bom
def google_cloud_spanner_version = "6.56.0"
def google_cloud_spanner_version = "6.57.0"
def google_code_gson_version = "2.10.1"
def google_oauth_clients_version = "1.34.1"
// Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom
Expand All @@ -626,7 +626,7 @@ class BeamModulePlugin implements Plugin<Project> {
def log4j2_version = "2.20.0"
def nemo_version = "0.1"
// Try to keep netty_version consistent with the netty version in grpc_bom (includes grpc_netty) in google_cloud_platform_libraries_bom
def netty_version = "4.1.87.Final"
def netty_version = "4.1.100.Final"
def postgres_version = "42.2.16"
def powermock_version = "2.0.9"
// Try to keep protobuf_version consistent with the protobuf version in google_cloud_platform_libraries_bom
Expand All @@ -640,7 +640,8 @@ class BeamModulePlugin implements Plugin<Project> {
def spark3_version = "3.2.2"
def spotbugs_version = "4.0.6"
def testcontainers_version = "1.17.3"
def arrow_version = "5.0.0"
// Try to keep arrow_version consistent with the arrow version in google_cloud_bigquery, managed by google_cloud_platform_libraries_bom
def arrow_version = "15.0.0"
def jmh_version = "1.34"
def jupiter_version = "5.7.0"

Expand Down Expand Up @@ -756,9 +757,9 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version
google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version
// The release notes shows the versions set by the BOM:
// https://github.com/googleapis/java-cloud-bom/releases/tag/v26.30.0
// https://github.com/googleapis/java-cloud-bom/releases/tag/v26.31.0
// Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.30.0",
google_cloud_platform_libraries_bom : "com.google.cloud:libraries-bom:26.31.0",
google_cloud_spanner : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
google_cloud_spanner_test : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
google_code_gson : "com.google.code.gson:gson:$google_code_gson_version",
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/container/license_scripts/dep_urls_java.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jaxen:
'1.1.6':
type: "3-Clause BSD"
libraries-bom:
'26.30.0':
'26.31.0':
license: "https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-opensource-java/master/LICENSE"
type: "Apache License 2.0"
paranamer:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ private static long estimatePrimitiveValue(Value v) {
return 1;
case INT64:
case FLOAT64:
case ENUM:
return 8;
case DATE:
case TIMESTAMP:
Expand All @@ -116,6 +117,7 @@ private static long estimatePrimitiveValue(Value v) {
case PG_NUMERIC:
return v.isNull() ? 0 : v.getString().length();
case BYTES:
case PROTO:
return v.isNull() ? 0 : v.getBytes().length();
case NUMERIC:
// see
Expand All @@ -141,6 +143,7 @@ private static long estimateArrayValue(Value v) {
case BOOL:
return v.getBoolArray().size();
case INT64:
case ENUM:
return 8L * v.getInt64Array().size();
case FLOAT64:
return 8L * v.getFloat64Array().size();
Expand All @@ -155,6 +158,7 @@ private static long estimateArrayValue(Value v) {
}
return totalLength;
case BYTES:
case PROTO:
totalLength = 0;
for (ByteArray bytes : v.getBytesArray()) {
if (bytes == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public abstract class SpannerSchema implements Serializable {

abstract ImmutableList<String> tables();

abstract Dialect dialect();
Expand Down Expand Up @@ -161,6 +162,7 @@ static Column create(String name, String spannerType, Dialect dialect) {
public abstract Type getType();

private static Type parseSpannerType(String spannerType, Dialect dialect) {
String originalSpannerType = spannerType;
spannerType = spannerType.toUpperCase();
switch (dialect) {
case GOOGLE_STANDARD_SQL:
Expand Down Expand Up @@ -193,10 +195,23 @@ private static Type parseSpannerType(String spannerType, Dialect dialect) {
}
if (spannerType.startsWith("ARRAY")) {
// Substring "ARRAY<xxx>"
String spannerArrayType = spannerType.substring(6, spannerType.length() - 1);
String spannerArrayType =
originalSpannerType.substring(6, originalSpannerType.length() - 1);
Type itemType = parseSpannerType(spannerArrayType, dialect);
return Type.array(itemType);
}
if (spannerType.startsWith("PROTO")) {
// Substring "PROTO<xxx>"
String spannerProtoType =
originalSpannerType.substring(6, originalSpannerType.length() - 1);
return Type.proto(spannerProtoType);
}
if (spannerType.startsWith("ENUM")) {
// Substring "ENUM<xxx>"
String spannerEnumType =
originalSpannerType.substring(5, originalSpannerType.length() - 1);
return Type.protoEnum(spannerEnumType);
}
throw new IllegalArgumentException("Unknown spanner type " + spannerType);
case POSTGRESQL:
if (spannerType.endsWith("[]")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,20 @@ public void primitiveArrays() throws Exception {
"{\"key123\":\"value123\", \"key321\":\"value321\"}",
"{\"key456\":\"value456\", \"key789\":600}"))
.build();
Mutation protoEnum =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.toProtoEnumArray(ImmutableList.of(1L, 2L, 3L), "customer.app.TestEnum")
.build();
Mutation protos =
Mutation.newInsertOrUpdateBuilder("test")
.set("bytes")
.toProtoMessageArray(
ImmutableList.of(
ByteArray.copyFrom("some_bytes".getBytes(UTF_8)),
ByteArray.copyFrom("some_bytes".getBytes(UTF_8))),
"customer.app.TestMessage")
.build();
assertThat(MutationSizeEstimator.sizeOf(int64), is(24L));
assertThat(MutationSizeEstimator.sizeOf(float64), is(16L));
assertThat(MutationSizeEstimator.sizeOf(bool), is(4L));
Expand All @@ -153,12 +167,19 @@ public void primitiveArrays() throws Exception {
assertThat(MutationSizeEstimator.sizeOf(json), is(62L));
assertThat(MutationSizeEstimator.sizeOf(bytes), is(20L));
assertThat(MutationSizeEstimator.sizeOf(jsonb), is(77L));
assertThat(MutationSizeEstimator.sizeOf(protoEnum), is(24L));
assertThat(MutationSizeEstimator.sizeOf(protos), is(20L));
}

@Test
public void nullPrimitiveArrays() throws Exception {
Mutation int64 =
Mutation.newInsertOrUpdateBuilder("test").set("one").toInt64Array((long[]) null).build();
Mutation protoEnum =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.toProtoEnumArray(null, "customer.app.TestEnum")
.build();
Mutation float64 =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
Expand Down Expand Up @@ -187,6 +208,7 @@ public void nullPrimitiveArrays() throws Exception {
assertThat(MutationSizeEstimator.sizeOf(pgNumeric), is(0L));
assertThat(MutationSizeEstimator.sizeOf(json), is(0L));
assertThat(MutationSizeEstimator.sizeOf(jsonb), is(0L));
assertThat(MutationSizeEstimator.sizeOf(protoEnum), is(0L));
}

@Test
Expand Down Expand Up @@ -235,6 +257,38 @@ public void bytes() throws Exception {
assertThat(MutationSizeEstimator.sizeOf(deleteBytes), is(10L));
}

@Test
public void protos() throws Exception {
Mutation empty =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.to(ByteArray.fromBase64(""), "customer.app.TestMessage")
.build();
Mutation nullValue =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.to((ByteArray) null, "customer.app.TestMessage")
.build();
Mutation sample =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.to(ByteArray.fromBase64("abcdabcd"), "customer.app.TestMessage")
.build();
Mutation nullArray =
Mutation.newInsertOrUpdateBuilder("test")
.set("one")
.toProtoMessageArray(null, "customer.app.TestMessage")
.build();
Mutation deleteBytes =
Mutation.delete("test", Key.of(ByteArray.copyFrom("some_bytes".getBytes(UTF_8))));

assertThat(MutationSizeEstimator.sizeOf(empty), is(0L));
assertThat(MutationSizeEstimator.sizeOf(nullValue), is(0L));
assertThat(MutationSizeEstimator.sizeOf(sample), is(6L));
assertThat(MutationSizeEstimator.sizeOf(nullArray), is(0L));
assertThat(MutationSizeEstimator.sizeOf(deleteBytes), is(10L));
}

@Test
public void jsons() throws Exception {
Mutation empty =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,18 @@ public void testSingleTable() throws Exception {
.addColumn("test", "maxKey", "STRING(MAX)")
.addColumn("test", "numericVal", "NUMERIC")
.addColumn("test", "jsonVal", "JSON")
.addColumn("test", "protoVal", "PROTO<customer.app.TestMessage>")
.addColumn("test", "enumVal", "ENUM<customer.app.TestEnum>")
.build();

assertEquals(1, schema.getTables().size());
assertEquals(4, schema.getColumns("test").size());
assertEquals(6, schema.getColumns("test").size());
assertEquals(1, schema.getKeyParts("test").size());
assertEquals(Type.json(), schema.getColumns("test").get(3).getType());
assertEquals(
Type.proto("customer.app.TestMessage"), schema.getColumns("test").get(4).getType());
assertEquals(
Type.protoEnum("customer.app.TestEnum"), schema.getColumns("test").get(5).getType());
}

@Test
Expand Down

0 comments on commit 34cfcc3

Please sign in to comment.