Skip to content

Commit

Permalink
Migrate BINARY, VARBINARY, CHAR, VARCHAR jdbc logical types to portab…
Browse files Browse the repository at this point in the history
…le (apache#23548)

* Migrate BINARY, VARBINARY, CHAR, VARCHAR jdbc logical types to portable

* Move jdbc logical type to portable logical types in Java

* Create portable logical types in Python

* Support value_from_runner_api and value_to_runner_api in Python
  SchemaTransform (currently only support atomic type values)

Fix nullable/test/leftovers

* Fix typos

* Add standard coder test

* Fix RowCoderImpl cannot encode bytes column in cython compiled

* Set coder_impl.is_compiled=True when running on compiled stream
  module

* Add docstring, add todo and warnings for unsupported
  • Loading branch information
Abacn authored and ryanthompson592 committed Oct 28, 2022
1 parent e69bd6e commit 635c41c
Show file tree
Hide file tree
Showing 17 changed files with 900 additions and 431 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,17 @@ examples:

---

coder:
urn: "beam:coder:row:v1"
# f_char: logical(fixed_char(5)), f_varchar: logical(var_char(5)), f_bytes: logical(fixed_bytes(5)), f_varbytes: logical(var_bytes(5))
payload: "\n=\n\x06f_char\x1a3\x08\x01:/\n\x1fbeam:logical_type:fixed_char:v1\x1a\x02\x10\x07\"\x02\x10\x03*\x04\n\x02\x18\x05\nB\n\tf_varchar\x1a1\x08\x01:-\n\x1dbeam:logical_type:var_char:v1\x1a\x02\x10\x07\"\x02\x10\x03*\x04\n\x02\x18\n \x01(\x01\nC\n\x07f_bytes\x1a4\x08\x01:0\n beam:logical_type:fixed_bytes:v1\x1a\x02\x10\t\"\x02\x10\x03*\x04\n\x02\x18\x05 \x02(\x02\nD\n\nf_varbytes\x1a2\x08\x01:.\n\x1ebeam:logical_type:var_bytes:v1\x1a\x02\x10\t\"\x02\x10\x03*\x04\n\x02\x18\n \x03(\x03\x12$f0ffb3a4-f46f-41ca-a942-85e3e939452a"
examples:
"\x04\x00\x05ABCDE\x05ABCDE\x05ABCDE\x05ABCDE": {f_char: "ABCDE", f_varchar: "ABCDE", f_bytes: "ABCDE", f_varbytes: "ABCDE"}
"\x04\x00\x05A\n \x02A\n\x05A\n\x00\x00\x00\x02A\n": {f_char: "A\n ", f_varchar: "A\n", f_bytes: "A\n\x00\x00\x00", f_varbytes: "A\n"}
"\x04\x01\x06\x05null?\x04null": {f_char: "null?", f_varchar: null, f_bytes: null, f_varbytes: "null"}

---

coder:
urn: "beam:coder:sharded_key:v1"
components: [{urn: "beam:coder:string_utf8:v1"}]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,34 @@ message LogicalTypes {
// two's complement encoded big integer.
DECIMAL = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:logical_type:decimal:v1"];

// A URN for FixedLengthBytes type
// - Representation type: BYTES
// - Argument type: INT32.
// A fixed-length bytes with its length as the argument.
FIXED_BYTES = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:logical_type:fixed_bytes:v1"];

// A URN for VariableLengthBytes type
// - Representation type: BYTES
// - Argument type: INT32.
// A variable-length bytes with its maximum length as the argument.
VAR_BYTES = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:logical_type:var_bytes:v1"];

// A URN for FixedLengthString type
// - Representation type: STRING
// - Argument type: INT32.
// A fixed-length string with its length as the argument.
FIXED_CHAR = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:logical_type:fixed_char:v1"];

// A URN for VariableLengthString type
// - Representation type: STRING
// - Argument type: INT32.
// A variable-length string with its maximum length as the argument.
VAR_CHAR = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:logical_type:var_char:v1"];
}
}

Expand Down
1 change: 1 addition & 0 deletions sdks/go/test/regression/coders/fromyaml/fromyaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var filteredCases = []struct{ filter, reason string }{
{"30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9", "https://github.com/apache/beam/issues/21206: Support encoding position."},
{"80be749a-5700-4ede-89d8-dd9a4433a3f8", "https://github.com/apache/beam/issues/19817: Support millis_instant."},
{"800c44ae-a1b7-4def-bbf6-6217cca89ec4", "https://github.com/apache/beam/issues/19817: Support decimal."},
{"f0ffb3a4-f46f-41ca-a942-85e3e939452a", "https://github.com/apache/beam/issues/23526: Support char/varchar, binary/varbinary."},
}

// Coder is a representation a serialized beam coder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,15 @@
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable;
import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.UnknownLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
Expand All @@ -57,6 +61,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.apache.commons.lang3.ClassUtils;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Utility methods for translating schemas. */
Expand Down Expand Up @@ -85,6 +90,10 @@ public class SchemaTranslation {
.put(MicrosInstant.IDENTIFIER, MicrosInstant.class)
.put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class)
.put(PythonCallable.IDENTIFIER, PythonCallable.class)
.put(FixedBytes.IDENTIFIER, FixedBytes.class)
.put(VariableBytes.IDENTIFIER, VariableBytes.class)
.put(FixedString.IDENTIFIER, FixedString.class)
.put(VariableString.IDENTIFIER, VariableString.class)
.build();

public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) {
Expand Down Expand Up @@ -350,7 +359,10 @@ private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType p
Object fieldValue =
Objects.requireNonNull(fieldValueFromProto(fieldType, logicalType.getArgument()));
Class clazz = fieldValue.getClass();
if (fieldValue instanceof List) {
if (ClassUtils.isPrimitiveWrapper(clazz)) {
// argument is a primitive wrapper type (e.g. Integer)
clazz = ClassUtils.wrapperToPrimitive(clazz);
} else if (fieldValue instanceof List) {
// argument is ArrayValue or iterableValue
clazz = List.class;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,67 +20,68 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.Arrays;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.checkerframework.checker.nullness.qual.Nullable;

/** A LogicalType representing a fixed-size byte array. */
@Experimental(Kind.SCHEMAS)
public class FixedBytes implements LogicalType<byte[], byte[]> {
public static final String IDENTIFIER = "FixedBytes";
private final int byteArraySize;
/** A LogicalType representing a fixed-length byte array. */
public class FixedBytes extends PassThroughLogicalType<byte[]> {
public static final String IDENTIFIER =
SchemaApi.LogicalTypes.Enum.FIXED_BYTES
.getValueDescriptor()
.getOptions()
.getExtension(RunnerApi.beamUrn);

private FixedBytes(int byteArraySize) {
this.byteArraySize = byteArraySize;
}
private final @Nullable String name;
private final int byteArrayLength;

public static FixedBytes of(int byteArraySize) {
return new FixedBytes(byteArraySize);
/**
* Return an instance of FixedBytes with specified byte array length.
*
* <p>The name, if set, refers to the TYPE name in the underlying database, for example, BINARY.
*/
public static FixedBytes of(@Nullable String name, int byteArrayLength) {
return new FixedBytes(name, byteArrayLength);
}

public int getLength() {
return byteArraySize;
/** Return an instance of FixedBytes with specified byte array length. */
public static FixedBytes of(int byteArrayLength) {
return of(null, byteArrayLength);
}

@Override
public String getIdentifier() {
return IDENTIFIER;
private FixedBytes(@Nullable String name, int byteArrayLength) {
super(IDENTIFIER, FieldType.INT32, byteArrayLength, FieldType.BYTES);
this.name = name;
this.byteArrayLength = byteArrayLength;
}

@Override
public FieldType getArgumentType() {
return FieldType.INT32;
}

@Override
public Integer getArgument() {
return byteArraySize;
public int getLength() {
return byteArrayLength;
}

@Override
public FieldType getBaseType() {
return FieldType.BYTES;
public @Nullable String getName() {
return name;
}

@Override
public byte[] toBaseType(byte[] input) {
checkArgument(input.length == byteArraySize);
checkArgument(input.length == byteArrayLength);
return input;
}

@Override
public byte[] toInputType(byte[] base) {
checkArgument(base.length <= byteArraySize);
if (base.length == byteArraySize) {
checkArgument(base.length <= byteArrayLength);
if (base.length == byteArrayLength) {
return base;
} else {
return Arrays.copyOf(base, byteArraySize);
return Arrays.copyOf(base, byteArrayLength);
}
}

@Override
public String toString() {
return "FixedBytes: " + byteArraySize;
return "FixedBytes: " + byteArrayLength;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.logicaltypes;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.commons.lang3.StringUtils;
import org.checkerframework.checker.nullness.qual.Nullable;

/** A LogicalType representing a fixed-length string. */
public class FixedString extends PassThroughLogicalType<String> {
public static final String IDENTIFIER =
SchemaApi.LogicalTypes.Enum.FIXED_CHAR
.getValueDescriptor()
.getOptions()
.getExtension(RunnerApi.beamUrn);
private final @Nullable String name;
private final int stringLength;

/**
* Return an instance of FixedString with specified string length.
*
* <p>The name, if set, refers to the TYPE name in the underlying database, for example, CHAR.
*/
public static FixedString of(@Nullable String name, int stringLength) {
return new FixedString(name, stringLength);
}

/** Return an instance of FixedString with specified string length. */
public static FixedString of(int stringLength) {
return new FixedString(null, stringLength);
}

private FixedString(@Nullable String name, int stringLength) {
super(IDENTIFIER, FieldType.INT32, stringLength, FieldType.STRING);
this.name = name;
this.stringLength = stringLength;
}

public int getLength() {
return stringLength;
}

public @Nullable String getName() {
return name;
}

@Override
public String toInputType(String base) {
checkArgument(base.length() <= stringLength);

return StringUtils.rightPad(base, stringLength);
}

@Override
public String toString() {
return "FixedString: " + stringLength;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.logicaltypes;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.checkerframework.checker.nullness.qual.Nullable;

/** A LogicalType representing a variable-length byte array with specified maximum length. */
public class VariableBytes extends PassThroughLogicalType<byte[]> {
public static final String IDENTIFIER =
SchemaApi.LogicalTypes.Enum.VAR_BYTES
.getValueDescriptor()
.getOptions()
.getExtension(RunnerApi.beamUrn);
private final @Nullable String name;
private final int maxByteArrayLength;

/**
* Return an instance of VariableBytes with specified max byte array length.
*
* <p>The name, if set, refers to the TYPE name in the underlying database, for example, VARBINARY
* and LONGVARBINARY.
*/
public static VariableBytes of(@Nullable String name, int maxByteArrayLength) {
return new VariableBytes(name, maxByteArrayLength);
}

/** Return an instance of VariableBytes with specified max byte array length. */
public static VariableBytes of(int maxByteArrayLength) {
return of(null, maxByteArrayLength);
}

private VariableBytes(@Nullable String name, int maxByteArrayLength) {
super(IDENTIFIER, FieldType.INT32, maxByteArrayLength, FieldType.BYTES);
this.name = name;
this.maxByteArrayLength = maxByteArrayLength;
}

public int getMaxLength() {
return maxByteArrayLength;
}

public @Nullable String getName() {
return name;
}

@Override
public byte[] toInputType(byte[] base) {
checkArgument(base.length <= maxByteArrayLength);
return base;
}

@Override
public String toString() {
return "VariableBytes: " + maxByteArrayLength;
}
}
Loading

0 comments on commit 635c41c

Please sign in to comment.