Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.gravitino.Catalog;
Expand Down Expand Up @@ -129,7 +128,7 @@ public Table createTable(
Dataset.create(
new RootAllocator(),
location,
convertColumnsToSchema(columns),
convertColumnsToArrowSchema(columns),
new WriteParams.Builder().withStorageOptions(storageProps).build())) {
GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder();
return builder
Expand All @@ -151,39 +150,13 @@ public Table createTable(
}
}

private org.apache.arrow.vector.types.pojo.Schema convertColumnsToSchema(Column[] columns) {
LanceDataTypeConverter converter = new LanceDataTypeConverter();
private org.apache.arrow.vector.types.pojo.Schema convertColumnsToArrowSchema(Column[] columns) {
List<Field> fields =
Arrays.stream(columns)
.map(
col -> {
boolean nullable = col.nullable();
ArrowType parentType = converter.fromGravitino(col.dataType());
List<ArrowType> childTypes = converter.getChildTypes(col.dataType());
List<Field> childFields =
childTypes.stream()
.map(
childType ->
new org.apache.arrow.vector.types.pojo.Field(
"",
org.apache.arrow.vector.types.pojo.FieldType.nullable(
childType),
null))
.collect(Collectors.toList());

if (nullable) {
return new org.apache.arrow.vector.types.pojo.Field(
col.name(),
org.apache.arrow.vector.types.pojo.FieldType.nullable(parentType),
childFields);
}

// not nullable
return new org.apache.arrow.vector.types.pojo.Field(
col.name(),
org.apache.arrow.vector.types.pojo.FieldType.notNullable(parentType),
childFields);
})
col ->
LanceDataTypeConverter.CONVERTER.toArrowField(
col.name(), col.dataType(), col.nullable()))
.collect(Collectors.toList());
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,174 +19,192 @@

package org.apache.gravitino.catalog.lakehouse.lance;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.IntervalUnit;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.UnionMode;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
import org.apache.arrow.vector.types.pojo.ArrowType.Int;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.gravitino.connector.DataTypeConverter;
import org.apache.gravitino.json.JsonUtils;
import org.apache.gravitino.rel.types.Type;
import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.rel.types.Types.FixedType;
import org.apache.gravitino.rel.types.Types.UnparsedType;

public class LanceDataTypeConverter implements DataTypeConverter<ArrowType, ArrowType> {

public static final LanceDataTypeConverter CONVERTER = new LanceDataTypeConverter();

/**
 * Converts a Gravitino type into an Arrow {@link Field} with the given name and nullability.
 *
 * <p>LIST, STRUCT, MAP and UNION types are converted recursively, so their element, member or
 * key/value types become child fields of the returned field. EXTERNAL types are deserialized from
 * their catalog string. Every other type becomes a leaf field whose Arrow type is produced by
 * {@link #fromGravitino(Type)}.
 *
 * @param name the name given to the resulting Arrow field
 * @param type the Gravitino type to convert
 * @param nullable whether the resulting Arrow field is nullable
 * @return the Arrow field, including child fields for nested types
 * @throws RuntimeException if the catalog string of an EXTERNAL type cannot be parsed
 */
public Field toArrowField(String name, Type type, boolean nullable) {
switch (type.name()) {
case LIST:
Types.ListType listType = (Types.ListType) type;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arrow has a type named FixedSizeList, which Lance commonly uses: it is how Lance stores vector data. Do you handle it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please see the doc in this PR.

// List: the Arrow list field gets exactly one child, named "element", which is built
// recursively from the list's element type and element nullability.
FieldType listField = new FieldType(nullable, ArrowType.List.INSTANCE, null);
return new Field(
name,
listField,
Lists.newArrayList(
toArrowField("element", listType.elementType(), listType.elementNullable())));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the child field's name always the constant value `element`, or can it be an arbitrary name?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either is fine; the name does not take effect in the actual parsing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.


// Struct: one Arrow child field per Gravitino struct field, each converted recursively and
// keeping its own name and nullability.
case STRUCT:
Types.StructType structType = (Types.StructType) type;
FieldType structField = new FieldType(nullable, ArrowType.Struct.INSTANCE, null);
return new Field(
name,
structField,
Arrays.stream(structType.fields())
.map(field -> toArrowField(field.name(), field.type(), field.nullable()))
.toList());

// Map: the map field has a single non-nullable struct child named MapVector.DATA_VECTOR_NAME.
// That child is built by recursing on a synthetic Gravitino struct holding a non-nullable key
// field and a value field that keeps the map's value nullability.
// (The `false` passed to ArrowType.Map is presumably Arrow's keysSorted flag; confirm against
// the Arrow API.)
case MAP:
Types.MapType mapType = (Types.MapType) type;
FieldType mapField = new FieldType(nullable, new ArrowType.Map(false), null);
return new Field(
name,
mapField,
Lists.newArrayList(
toArrowField(
MapVector.DATA_VECTOR_NAME,
Types.StructType.of(
Types.StructType.Field.of(
// Note: Arrow MapVector requires key field to be non-nullable
MapVector.KEY_NAME,
mapType.keyType(),
false /*nullable*/,
null /*comment*/),
Types.StructType.Field.of(
MapVector.VALUE_NAME,
mapType.valueType(),
mapType.valueNullable(),
null)),
false /*nullable*/)));

// Union: each member type becomes a nullable child field named after the member's
// simpleString(). The union's type ids are the ordinals of the children's Arrow minor types.
// NOTE(review): two members that map to the same Arrow minor type would produce duplicate type
// ids, and ordinal() values depend on the enum's declaration order. Confirm this is acceptable.
case UNION:
Types.UnionType unionType = (Types.UnionType) type;
List<Field> types =
Arrays.stream(unionType.types())
.map(
t ->
toArrowField(
t.simpleString(), t, true /*nullable*/) // union members are nullable
)
.toList();
int[] typeIds =
types.stream()
.mapToInt(
f ->
org.apache.arrow.vector.types.Types.getMinorTypeForArrowType(f.getType())
.ordinal())
.toArray();
FieldType unionField =
new FieldType(nullable, new ArrowType.Union(UnionMode.Sparse, typeIds), null);
return new Field(name, unionField, types);

// External: the catalog string is read as a JSON-serialized Arrow Field and returned as-is,
// after checking that its name and nullability agree with the requested ones.
case EXTERNAL:
Types.ExternalType externalType = (Types.ExternalType) type;
Field field;
try {
field = JsonUtils.anyFieldMapper().readValue(externalType.catalogString(), Field.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(
"Failed to parse external type catalog string: " + externalType.catalogString(), e);
}
Preconditions.checkArgument(
name.equals(field.getName()),
"expected field name %s but got %s",
name,
field.getName());
Preconditions.checkArgument(
nullable == field.isNullable(),
"expected field nullable %s but got %s",
nullable,
field.isNullable());
return field;

default:
// Non-complex type: a leaf field with no children; the Arrow type comes from fromGravitino.
FieldType fieldType = new FieldType(nullable, fromGravitino(type), null);
return new Field(name, fieldType, null);
}
}

@Override
public ArrowType fromGravitino(Type type) {
switch (type.name()) {
case BOOLEAN:
return Bool.INSTANCE;
case BYTE:
return new Int(8, true);
return new Int(8, ((Types.ByteType) type).signed());
case SHORT:
return new Int(16, true);
return new Int(8 * 2, ((Types.ShortType) type).signed());
case INTEGER:
return new Int(32, true);
return new Int(8 * 4, ((Types.IntegerType) type).signed());
case LONG:
return new Int(64, true);
return new Int(8 * 8, ((Types.LongType) type).signed());
case FLOAT:
return new FloatingPoint(FloatingPointPrecision.SINGLE);
case DOUBLE:
return new FloatingPoint(FloatingPointPrecision.DOUBLE);
case STRING:
return ArrowType.Utf8.INSTANCE;
case BINARY:
return ArrowType.Binary.INSTANCE;
case DECIMAL:
// Lance uses FIXED_SIZE_BINARY for decimal types
return new ArrowType.FixedSizeBinary(16); // assuming 16 bytes for decimal
Types.DecimalType decimalType = (Types.DecimalType) type;
return new ArrowType.Decimal(decimalType.precision(), decimalType.scale(), 8 * 16);
case DATE:
return new ArrowType.Date(DateUnit.DAY);
case TIME:
return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
case TIMESTAMP:
return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
case VARCHAR:
case STRING:
return new ArrowType.Utf8();
Types.TimestampType timestampType = (Types.TimestampType) type;
TimeUnit timeUnit = TimeUnit.MICROSECOND;
if (timestampType.hasPrecisionSet()) {
timeUnit =
switch (timestampType.precision()) {
case 0 -> TimeUnit.SECOND;
case 3 -> TimeUnit.MILLISECOND;
case 6 -> TimeUnit.MICROSECOND;
case 9 -> TimeUnit.NANOSECOND;
default -> throw new UnsupportedOperationException(
"Expected precision to be one of 0, 3, 6, 9 but got: "
+ timestampType.precision());
};
}
if (timestampType.hasTimeZone()) {
// todo: need timeZoneId for timestamp with time zone
return new ArrowType.Timestamp(timeUnit, "UTC");
}
return new ArrowType.Timestamp(timeUnit, null);
case TIME:
return new ArrowType.Time(TimeUnit.NANOSECOND, 8 * 8);
case NULL:
return ArrowType.Null.INSTANCE;
case INTERVAL_YEAR:
return new ArrowType.Interval(IntervalUnit.YEAR_MONTH);
case INTERVAL_DAY:
return new ArrowType.Duration(TimeUnit.MICROSECOND);
case FIXED:
FixedType fixedType = (FixedType) type;
return new ArrowType.FixedSizeBinary(fixedType.length());
case BINARY:
return new ArrowType.Binary();
case UNPARSED:
String typeStr = ((UnparsedType) type).unparsedType().toString();
try {
Type t = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class);
if (t instanceof Types.ListType) {
return ArrowType.List.INSTANCE;
} else if (t instanceof Types.MapType) {
return new ArrowType.Map(false);
} else if (t instanceof Types.StructType) {
return ArrowType.Struct.INSTANCE;
} else {
throw new UnsupportedOperationException(
"Unsupported UnparsedType conversion: " + t.simpleString());
}
} catch (Exception e) {
// FixedSizeListArray(integer, 3)
if (typeStr.startsWith("FixedSizeListArray")) {
int size =
Integer.parseInt(
typeStr.substring(typeStr.indexOf(',') + 1, typeStr.indexOf(')')).trim());
return new ArrowType.FixedSizeList(size);
}
throw new UnsupportedOperationException("Failed to parse UnparsedType: " + typeStr, e);
}
default:
throw new UnsupportedOperationException("Unsupported Gravitino type: " + type.name());
}
}

@Override
public Type toGravitino(ArrowType arrowType) {
if (arrowType instanceof Bool) {
return Types.BooleanType.get();
} else if (arrowType instanceof Int intType) {
switch (intType.getBitWidth()) {
case 8 -> {
return Types.ByteType.get();
}
case 16 -> {
return Types.ShortType.get();
}
case 32 -> {
return Types.IntegerType.get();
}
case 64 -> {
return Types.LongType.get();
}
default -> throw new UnsupportedOperationException(
"Unsupported Int bit width: " + intType.getBitWidth());
}
} else if (arrowType instanceof FloatingPoint floatingPoint) {
switch (floatingPoint.getPrecision()) {
case SINGLE:
return Types.FloatType.get();
case DOUBLE:
return Types.DoubleType.get();
default:
throw new UnsupportedOperationException(
"Unsupported FloatingPoint precision: " + floatingPoint.getPrecision());
}
} else if (arrowType instanceof ArrowType.FixedSizeBinary) {
ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary) arrowType;
return Types.FixedType.of(fixedSizeBinary.getByteWidth());
} else if (arrowType instanceof ArrowType.Date) {
return Types.DateType.get();
} else if (arrowType instanceof ArrowType.Time) {
return Types.TimeType.get();
} else if (arrowType instanceof ArrowType.Timestamp) {
return Types.TimestampType.withoutTimeZone();
} else if (arrowType instanceof ArrowType.Utf8) {
return Types.StringType.get();
} else if (arrowType instanceof ArrowType.Binary) {
return Types.BinaryType.get();
// TODO handle complex types like List, Map, Struct
} else {
throw new UnsupportedOperationException("Unsupported Arrow type: " + arrowType);
}
}

public List<ArrowType> getChildTypes(Type parentType) {
if (parentType.name() != Type.Name.UNPARSED) {
return List.of();
}

List<ArrowType> arrowTypes = Lists.newArrayList();
String typeStr = ((UnparsedType) parentType).unparsedType().toString();
try {
Type t = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class);
if (t instanceof Types.ListType listType) {
arrowTypes.add(fromGravitino(listType.elementType()));
} else if (t instanceof Types.MapType mapType) {
arrowTypes.add(fromGravitino(mapType.keyType()));
arrowTypes.add(fromGravitino(mapType.valueType()));
} else {
// TODO support struct type.
throw new UnsupportedOperationException(
"Unsupported UnparsedType conversion: " + t.simpleString());
}

return arrowTypes;
} catch (Exception e) {
// FixedSizeListArray(integer, 3)

try {
if (typeStr.startsWith("FixedSizeListArray")) {
String type = typeStr.substring(typeStr.indexOf('(') + 1, typeStr.indexOf(',')).trim();
Type childType = JsonUtils.anyFieldMapper().readValue("\"" + type + "\"", Type.class);
arrowTypes.add(fromGravitino(childType));

return arrowTypes;
}
} catch (Exception e1) {
throw new UnsupportedOperationException("Failed to parse UnparsedType: " + typeStr, e1);
}

throw new UnsupportedOperationException("Failed to parse UnparsedType: " + typeStr, e);
}
// since the table metadata will load from Gravitino storage directly, we don't need to
// implement this method for now.
throw new UnsupportedOperationException("toGravitino is not implemented yet.");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need such a conversion when a request is sent from the Lance REST server to Gravitino?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will implement this method when I refine the Lance REST server type conversion.

}
}
Loading