From 5a3d847eb71c652728733ad358292b38d3508653 Mon Sep 17 00:00:00 2001 From: mchades Date: Wed, 29 Oct 2025 21:48:39 +0800 Subject: [PATCH 1/2] support Arrow type to Gravitino dataType --- .../build.gradle.kts | 1 + .../lance/LanceCatalogOperations.java | 1 + lance/lance-common/build.gradle.kts | 4 + .../common/ops/LanceNamespaceOperations.java | 3 +- .../ops/arrow/ArrowRecordBatchList.java | 40 ---- .../GravitinoLanceNamespaceWrapper.java | 217 +++--------------- .../gravitino}/LanceDataTypeConverter.java | 136 ++++++++++- .../gravitino/lance/common/TestArrowIPC.java | 83 ------- .../TestLanceDataTypeConverter.java | 156 ++++++++++++- .../rest/LanceNamespaceOperations.java | 2 +- .../service/rest/LanceTableOperations.java | 13 +- 11 files changed, 326 insertions(+), 330 deletions(-) delete mode 100644 lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java rename {catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance => lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino}/LanceDataTypeConverter.java (61%) delete mode 100644 lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java rename {catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/lance => lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino}/TestLanceDataTypeConverter.java (69%) diff --git a/catalogs/catalog-generic-lakehouse/build.gradle.kts b/catalogs/catalog-generic-lakehouse/build.gradle.kts index 704dbda7e36..7dd0a4bc169 100644 --- a/catalogs/catalog-generic-lakehouse/build.gradle.kts +++ b/catalogs/catalog-generic-lakehouse/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { implementation(project(":core")) { exclude("*") } + implementation(project(":lance:lance-common")) implementation(libs.bundles.log4j) implementation(libs.cglib) diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java index e27f8032abf..0ed83457a48 100644 --- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java +++ b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java @@ -57,6 +57,7 @@ import org.apache.gravitino.exceptions.NoSuchSchemaException; import org.apache.gravitino.exceptions.NoSuchTableException; import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter; import org.apache.gravitino.meta.AuditInfo; import org.apache.gravitino.meta.TableEntity; import org.apache.gravitino.rel.Column; diff --git a/lance/lance-common/build.gradle.kts b/lance/lance-common/build.gradle.kts index 43cf5f42b85..6e18f3981fb 100644 --- a/lance/lance-common/build.gradle.kts +++ b/lance/lance-common/build.gradle.kts @@ -26,6 +26,9 @@ plugins { dependencies { implementation(project(":clients:client-java-runtime", configuration = "shadow")) + implementation(project(":common")) { + exclude("*") + } implementation(project(":core")) { exclude("*") } @@ -37,5 +40,6 @@ dependencies { testImplementation(project(":server-common")) testImplementation(libs.junit.jupiter.api) + testImplementation(libs.junit.jupiter.params) testRuntimeOnly(libs.junit.jupiter.engine) } diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java index 49141665a53..36d55c03e9f 100644 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java +++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java @@ -49,5 +49,6 @@ DropNamespaceResponse dropNamespace( void namespaceExists(String namespaceId, String delimiter) throws LanceNamespaceException; - ListTablesResponse listTables(String id, String delimiter, String pageToken, Integer limit); + ListTablesResponse listTables( + String namespaceId, String delimiter, String pageToken, Integer limit); } diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java deleted file mode 100644 index b0c6a089d53..00000000000 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.gravitino.lance.common.ops.arrow; - -import java.util.List; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.types.pojo.Schema; - -public class ArrowRecordBatchList { - private final Schema schema; - - @SuppressWarnings("unused") - private final List recordBatches; - - public Schema getSchema() { - return schema; - } - - public ArrowRecordBatchList(Schema schema, List recordBatches) { - this.schema = schema; - this.recordBatches = recordBatches; - } -} diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java index fe6404a424c..51a8151cd1e 100644 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java @@ -20,6 +20,7 @@ import static org.apache.gravitino.lance.common.config.LanceConfig.METALAKE_NAME; import static org.apache.gravitino.lance.common.config.LanceConfig.NAMESPACE_BACKEND_URI; +import static org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter.CONVERTER; import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET; import com.google.common.base.Joiner; @@ -37,12 +38,11 @@ import com.lancedb.lance.namespace.model.DescribeTableResponse; import com.lancedb.lance.namespace.model.DropNamespaceRequest; import com.lancedb.lance.namespace.model.DropNamespaceResponse; -import com.lancedb.lance.namespace.model.JsonArrowDataType; -import com.lancedb.lance.namespace.model.JsonArrowField; import com.lancedb.lance.namespace.model.JsonArrowSchema; import com.lancedb.lance.namespace.model.ListNamespacesResponse; import com.lancedb.lance.namespace.model.ListTablesResponse; import 
com.lancedb.lance.namespace.util.CommonUtil; +import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter; import com.lancedb.lance.namespace.util.PageUtil; import java.io.ByteArrayInputStream; import java.util.ArrayList; @@ -59,17 +59,8 @@ import java.util.stream.Stream; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowStreamReader; -import org.apache.arrow.vector.types.DateUnit; -import org.apache.arrow.vector.types.FloatingPointPrecision; -import org.apache.arrow.vector.types.TimeUnit; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.ArrowType.Bool; -import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint; -import org.apache.arrow.vector.types.pojo.ArrowType.Int; import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Catalog; import org.apache.gravitino.CatalogChange; @@ -84,18 +75,12 @@ import org.apache.gravitino.exceptions.NonEmptyCatalogException; import org.apache.gravitino.exceptions.NonEmptySchemaException; import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; -import org.apache.gravitino.json.JsonUtils; import org.apache.gravitino.lance.common.config.LanceConfig; import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations; import org.apache.gravitino.lance.common.ops.LanceTableOperations; import org.apache.gravitino.lance.common.ops.NamespaceWrapper; -import org.apache.gravitino.lance.common.ops.arrow.ArrowRecordBatchList; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.Table; -import org.apache.gravitino.rel.types.Type; -import org.apache.gravitino.rel.types.Types; -import org.apache.gravitino.rel.types.Types.FixedType; -import org.apache.gravitino.rel.types.Types.UnparsedType; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -498,8 +483,8 @@ private T[] buildChanges( @Override public ListTablesResponse listTables( - String id, String delimiter, String pageToken, Integer limit) { - ObjectIdentifier nsId = ObjectIdentifier.of(id, Pattern.quote(delimiter)); + String namespaceId, String delimiter, String pageToken, Integer limit) { + ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, Pattern.quote(delimiter)); Preconditions.checkArgument( nsId.levels() <= 2, "Expected at most 2-level namespace but got: %s", nsId.levels()); String catalogName = nsId.levelAtListPos(0); @@ -508,9 +493,9 @@ public ListTablesResponse listTables( List tables = Arrays.stream(catalog.asTableCatalog().listTables(Namespace.of(schemaName))) .map(ident -> Joiner.on(delimiter).join(catalogName, schemaName, ident.name())) + .sorted() .collect(Collectors.toList()); - Collections.sort(tables); PageUtil.Page page = PageUtil.splitPage(tables, pageToken, PageUtil.normalizePageSize(limit)); ListNamespacesResponse response = new ListNamespacesResponse(); response.setNamespaces(Sets.newHashSet(page.items())); @@ -540,29 +525,6 @@ public DescribeTableResponse describeTable(String tableId, String delimiter) { return response; } - private JsonArrowSchema toJsonArrowSchema(Column[] columns) { - List fields = new ArrayList<>(); - for (Column column : columns) { - ArrowType arrowType = fromGravitinoType(column.dataType()); - FieldType fieldType = new FieldType(column.nullable(), arrowType, null, null); - Field field = new Field(column.name(), fieldType, null); - - JsonArrowDataType jsonArrowDataType = new JsonArrowDataType(); - // other filed needs to be set accordingly such as list, map, struct - jsonArrowDataType.setType(arrowType.toString()); - - JsonArrowField arrowField = new JsonArrowField(); - arrowField.setName(field.getName()); - arrowField.setType(jsonArrowDataType); - - fields.add(arrowField); - } - - JsonArrowSchema jsonArrowSchema = new JsonArrowSchema(); - 
jsonArrowSchema.setFields(fields); - return jsonArrowSchema; - } - @Override public CreateTableResponse createTable( String tableId, @@ -585,8 +547,8 @@ public CreateTableResponse createTable( // Parser column information. List columns = Lists.newArrayList(); if (arrowStreamBody != null) { - ArrowRecordBatchList recordBatchList = parseArrowIpcStream(arrowStreamBody); - columns = extractColumns(recordBatchList); + org.apache.arrow.vector.types.pojo.Schema schema = parseArrowIpcStream(arrowStreamBody); + columns = extractColumns(schema); } String catalogName = nsId.levelAtListPos(0); @@ -646,157 +608,44 @@ public CreateTableResponse createTable( return response; } - private ArrowRecordBatchList parseArrowIpcStream(byte[] stream) { + private JsonArrowSchema toJsonArrowSchema(Column[] columns) { + List fields = + Arrays.stream(columns) + .map(col -> CONVERTER.toArrowField(col.name(), col.dataType(), col.nullable())) + .collect(Collectors.toList()); + + return JsonArrowSchemaConverter.convertToJsonArrowSchema( + new org.apache.arrow.vector.types.pojo.Schema(fields)); + } + + private org.apache.arrow.vector.types.pojo.Schema parseArrowIpcStream(byte[] stream) { + org.apache.arrow.vector.types.pojo.Schema schema; + try (BufferAllocator allocator = new RootAllocator(); ByteArrayInputStream bais = new ByteArrayInputStream(stream); ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) { - - org.apache.arrow.vector.types.pojo.Schema schema = reader.getVectorSchemaRoot().getSchema(); - List batches = new ArrayList<>(); - - while (reader.loadNextBatch()) { - VectorSchemaRoot root = reader.getVectorSchemaRoot(); - if (root.getRowCount() > 0) { - batches.add(root); - } - } - return new ArrowRecordBatchList(schema, batches); + schema = reader.getVectorSchemaRoot().getSchema(); } catch (Exception e) { throw new RuntimeException("Failed to parse Arrow IPC stream", e); } + + Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC stream"); + return 
schema; } - private List extractColumns(ArrowRecordBatchList recordBatchList) { + private List extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) { List columns = new ArrayList<>(); - org.apache.arrow.vector.types.pojo.Schema arrowSchema = recordBatchList.getSchema(); for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) { - columns.add(toGravitinoColumn(field)); + columns.add( + Column.of( + field.getName(), + CONVERTER.toGravitino(field), + null, + field.isNullable(), + false, + DEFAULT_VALUE_NOT_SET)); } return columns; } - - private Column toGravitinoColumn(Field field) { - return Column.of( - field.getName(), - toGravitinoType(field), - field.getMetadata().get("comment"), - field.isNullable(), - false, - DEFAULT_VALUE_NOT_SET); - } - - private ArrowType fromGravitinoType(Type type) { - switch (type.name()) { - case BOOLEAN: - return Bool.INSTANCE; - case BYTE: - return new Int(8, true); - case SHORT: - return new Int(16, true); - case INTEGER: - return new Int(32, true); - case LONG: - return new Int(64, true); - case FLOAT: - return new FloatingPoint(FloatingPointPrecision.SINGLE); - case DOUBLE: - return new FloatingPoint(FloatingPointPrecision.DOUBLE); - case DECIMAL: - // Lance uses FIXED_SIZE_BINARY for decimal types - return new ArrowType.FixedSizeBinary(16); // assuming 16 bytes for decimal - case DATE: - return new ArrowType.Date(DateUnit.DAY); - case TIME: - return new ArrowType.Time(TimeUnit.MILLISECOND, 32); - case TIMESTAMP: - return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); - case VARCHAR: - case STRING: - return new ArrowType.Utf8(); - case FIXED: - FixedType fixedType = (FixedType) type; - return new ArrowType.FixedSizeBinary(fixedType.length()); - case BINARY: - return new ArrowType.Binary(); - case UNPARSED: - String typeStr = ((UnparsedType) type).unparsedType().toString(); - try { - Type t = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class); - if (t instanceof Types.ListType) { - 
return ArrowType.List.INSTANCE; - } else if (t instanceof Types.MapType) { - return new ArrowType.Map(false); - } else if (t instanceof Types.StructType) { - return ArrowType.Struct.INSTANCE; - } else { - throw new UnsupportedOperationException( - "Unsupported UnparsedType conversion: " + t.simpleString()); - } - } catch (Exception e) { - // FixedSizeListArray(integer, 3) - if (typeStr.startsWith("FixedSizeListArray")) { - int size = - Integer.parseInt( - typeStr.substring(typeStr.indexOf(',') + 1, typeStr.indexOf(')')).trim()); - return new ArrowType.FixedSizeList(size); - } - throw new UnsupportedOperationException("Failed to parse UnparsedType: " + typeStr, e); - } - default: - throw new UnsupportedOperationException("Unsupported Gravitino type: " + type.name()); - } - } - - private Type toGravitinoType(Field field) { - FieldType parentType = field.getFieldType(); - ArrowType arrowType = parentType.getType(); - if (arrowType instanceof Bool) { - return Types.BooleanType.get(); - } else if (arrowType instanceof Int) { - Int intType = (Int) arrowType; - switch (intType.getBitWidth()) { - case 8 -> { - return Types.ByteType.get(); - } - case 16 -> { - return Types.ShortType.get(); - } - case 32 -> { - return Types.IntegerType.get(); - } - case 64 -> { - return Types.LongType.get(); - } - default -> throw new UnsupportedOperationException( - "Unsupported Int bit width: " + intType.getBitWidth()); - } - } else if (arrowType instanceof FloatingPoint) { - FloatingPoint floatingPoint = (FloatingPoint) arrowType; - switch (floatingPoint.getPrecision()) { - case SINGLE: - return Types.FloatType.get(); - case DOUBLE: - return Types.DoubleType.get(); - default: - throw new UnsupportedOperationException( - "Unsupported FloatingPoint precision: " + floatingPoint.getPrecision()); - } - } else if (arrowType instanceof ArrowType.FixedSizeBinary) { - ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary) arrowType; - return 
Types.FixedType.of(fixedSizeBinary.getByteWidth()); - } else if (arrowType instanceof ArrowType.Date) { - return Types.DateType.get(); - } else if (arrowType instanceof ArrowType.Time) { - return Types.TimeType.get(); - } else if (arrowType instanceof ArrowType.Timestamp) { - return Types.TimestampType.withoutTimeZone(); - } else if (arrowType instanceof ArrowType.Utf8) { - return Types.StringType.get(); - } else if (arrowType instanceof ArrowType.Binary) { - return Types.BinaryType.get(); - } else { - return Types.UnparsedType.of(arrowType.toString()); - } - } } diff --git a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/LanceDataTypeConverter.java similarity index 61% rename from catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java rename to lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/LanceDataTypeConverter.java index 9cd5783de1b..52d52d38fbd 100644 --- a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/LanceDataTypeConverter.java @@ -17,9 +17,9 @@ * under the License. 
*/ -package org.apache.gravitino.catalog.lakehouse.lance; +package org.apache.gravitino.lance.common.ops.gravitino; -import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import java.util.Arrays; @@ -37,14 +37,14 @@ import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.gravitino.connector.DataTypeConverter; -import org.apache.gravitino.json.JsonUtils; import org.apache.gravitino.rel.types.Type; import org.apache.gravitino.rel.types.Types; import org.apache.gravitino.rel.types.Types.FixedType; -public class LanceDataTypeConverter implements DataTypeConverter { +public class LanceDataTypeConverter implements DataTypeConverter { public static final LanceDataTypeConverter CONVERTER = new LanceDataTypeConverter(); + private static final ObjectMapper mapper = new ObjectMapper(); public Field toArrowField(String name, Type type, boolean nullable) { switch (type.name()) { @@ -115,8 +115,8 @@ public Field toArrowField(String name, Type type, boolean nullable) { Types.ExternalType externalType = (Types.ExternalType) type; Field field; try { - field = JsonUtils.anyFieldMapper().readValue(externalType.catalogString(), Field.class); - } catch (JsonProcessingException e) { + field = mapper.readValue(externalType.catalogString(), Field.class); + } catch (Exception e) { throw new RuntimeException( "Failed to parse external type catalog string: " + externalType.catalogString(), e); } @@ -202,9 +202,125 @@ public ArrowType fromGravitino(Type type) { } @Override - public Type toGravitino(ArrowType arrowType) { - // since the table metadata will load from Gravitino storage directly, we don't need to - // implement this method for now. 
- throw new UnsupportedOperationException("toGravitino is not implemented yet."); + public Type toGravitino(Field arrowField) { + FieldType fieldType = arrowField.getFieldType(); + switch (fieldType.getType().getTypeID()) { + case Map: + Field structField = arrowField.getChildren().get(0); + Type keyType = toGravitino(structField.getChildren().get(0)); + Type valueType = toGravitino(structField.getChildren().get(1)); + boolean valueNullable = structField.getChildren().get(1).isNullable(); + return Types.MapType.of(keyType, valueType, valueNullable); + + case List: + Type elementType = toGravitino(arrowField.getChildren().get(0)); + boolean containsNull = arrowField.getChildren().get(0).isNullable(); + return Types.ListType.of(elementType, containsNull); + + case Struct: + Types.StructType.Field[] fields = + arrowField.getChildren().stream() + .map( + child -> + Types.StructType.Field.of( + child.getName(), + toGravitino(child), + child.isNullable(), + null /*comment*/)) + .toArray(Types.StructType.Field[]::new); + return Types.StructType.of(fields); + + case Union: + List types = arrowField.getChildren().stream().map(this::toGravitino).toList(); + return Types.UnionType.of(types.toArray(new Type[0])); + + case Bool: + return Types.BooleanType.get(); + + case Int: + Int intType = (Int) fieldType.getType(); + switch (intType.getBitWidth()) { + case 8: + return intType.getIsSigned() ? Types.ByteType.get() : Types.ByteType.unsigned(); + case 8 * 2: + return intType.getIsSigned() ? Types.ShortType.get() : Types.ShortType.unsigned(); + case 8 * 4: + return intType.getIsSigned() ? Types.IntegerType.get() : Types.IntegerType.unsigned(); + case 8 * 8: + return intType.getIsSigned() ? 
Types.LongType.get() : Types.LongType.unsigned(); + } + break; + + case FloatingPoint: + FloatingPoint floatingPoint = (FloatingPoint) fieldType.getType(); + switch (floatingPoint.getPrecision()) { + case SINGLE: + return Types.FloatType.get(); + case DOUBLE: + return Types.DoubleType.get(); + default: + // fallthrough + } + break; + + case Utf8: + return Types.StringType.get(); + case Binary: + return Types.BinaryType.get(); + case Decimal: + ArrowType.Decimal decimalType = (ArrowType.Decimal) fieldType.getType(); + return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale()); + case Date: + if (((ArrowType.Date) fieldType.getType()).getUnit() == DateUnit.DAY) { + return Types.DateType.get(); + } + break; + case Timestamp: + ArrowType.Timestamp timestampType = (ArrowType.Timestamp) fieldType.getType(); + int precision = + switch (timestampType.getUnit()) { + case SECOND -> 0; + case MILLISECOND -> 3; + case MICROSECOND -> 6; + case NANOSECOND -> 9; + }; + boolean hasTimeZone = timestampType.getTimezone() != null; + return hasTimeZone + ? 
Types.TimestampType.withTimeZone(precision) + : Types.TimestampType.withoutTimeZone(precision); + case Time: + ArrowType.Time timeType = (ArrowType.Time) fieldType.getType(); + if (timeType.getUnit() == TimeUnit.NANOSECOND && timeType.getBitWidth() == 8 * 8) { + return Types.TimeType.get(); + } + break; + case Null: + return Types.NullType.get(); + case Interval: + IntervalUnit intervalUnit = ((ArrowType.Interval) fieldType.getType()).getUnit(); + if (intervalUnit == IntervalUnit.YEAR_MONTH) { + return Types.IntervalYearType.get(); + } + break; + case Duration: + TimeUnit timeUnit = ((ArrowType.Duration) fieldType.getType()).getUnit(); + if (timeUnit == TimeUnit.MICROSECOND) { + return Types.IntervalDayType.get(); + } + break; + case FixedSizeBinary: + ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary) fieldType.getType(); + return Types.FixedType.of(fixedSizeBinary.getByteWidth()); + default: + // fallthrough + } + + String typeString; + try { + typeString = mapper.writeValueAsString(arrowField); + } catch (Exception e) { + throw new RuntimeException("Failed to serialize Arrow field to string.", e); + } + return Types.ExternalType.of(typeString); } } diff --git a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java deleted file mode 100644 index 71f1bfc587c..00000000000 --- a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.gravitino.lance.common; - -import java.io.File; -import java.io.FileOutputStream; -import java.util.Arrays; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.IntVector; -import org.apache.arrow.vector.VarCharVector; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.ipc.ArrowStreamWriter; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.types.pojo.Schema; -import org.apache.arrow.vector.util.Text; -import org.junit.jupiter.api.Test; - -public class TestArrowIPC { - - private static final String FILENAME = "/tmp/initial_data.arrow"; - private static final int RECORD_COUNT = 3; - - @Test - void testIPC() throws Exception { - try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { - - Schema schema = - new Schema( - Arrays.asList( - Field.nullable("id", new ArrowType.Int(32, true)), - Field.nullable("value", new ArrowType.Utf8()))); - - try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { - IntVector idVector = (IntVector) root.getVector("id"); - VarCharVector valueVector = (VarCharVector) root.getVector("value"); - - idVector.allocateNew(); - valueVector.allocateNew(); - - for (int i = 0; i < RECORD_COUNT; i++) { - idVector.setSafe(i, i + 1); - valueVector.setSafe(i, new Text("Row_" + (i + 1))); - } - - idVector.setValueCount(RECORD_COUNT); - valueVector.setValueCount(RECORD_COUNT); - 
root.setRowCount(RECORD_COUNT); - - File outFile = new File(FILENAME); - try (FileOutputStream fos = new FileOutputStream(outFile); - ArrowStreamWriter writer = new ArrowStreamWriter(root, null, fos)) { - - writer.start(); - writer.writeBatch(); - writer.end(); - } - - System.out.println( - "✅ Successfully generated Arrow IPC Stream file: " + outFile.getAbsolutePath()); - System.out.println("--- Ready for cURL test ---"); - } - } - } -} diff --git a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/TestLanceDataTypeConverter.java b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestLanceDataTypeConverter.java similarity index 69% rename from catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/TestLanceDataTypeConverter.java rename to lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestLanceDataTypeConverter.java index cf28ee74342..9908f8feffd 100644 --- a/catalogs/catalog-generic-lakehouse/src/test/java/org/apache/gravitino/catalog/lakehouse/lance/TestLanceDataTypeConverter.java +++ b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestLanceDataTypeConverter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.gravitino.catalog.lakehouse.lance; +package org.apache.gravitino.lance.common.ops.gravitino; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -24,13 +24,18 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.Arrays; +import java.util.Collections; import java.util.function.Consumer; import java.util.stream.Stream; import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.UnionMode; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.gravitino.rel.types.Type; import org.apache.gravitino.rel.types.Types; import org.junit.jupiter.api.DisplayName; @@ -270,10 +275,11 @@ void testUnsupportedTypeThrowsException() { assertThrows(UnsupportedOperationException.class, () -> CONVERTER.fromGravitino(unparsedType)); } - @Test - void testToGravitinoNotImplemented() { - assertThrows( - UnsupportedOperationException.class, () -> CONVERTER.toGravitino(ArrowType.Utf8.INSTANCE)); + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("toGravitinoArguments") + void testToGravitino(String testName, Field arrowField, Type expectedGravitinoType) { + Type convertedType = CONVERTER.toGravitino(arrowField); + assertEquals(expectedGravitinoType, convertedType); } private static Stream toArrowFieldArguments() { @@ -324,4 +330,144 @@ private static Stream toArrowFieldArguments() { true, UNION_VALIDATOR)); } + + private static Stream toGravitinoArguments() { + return Stream.of( + // Simple Types + Arguments.of( + "Boolean", + new Field("bool_col", new FieldType(true, ArrowType.Bool.INSTANCE, null), null), 
+ Types.BooleanType.get()), + Arguments.of( + "Byte", + new Field("byte_col", new FieldType(true, new ArrowType.Int(8, true), null), null), + Types.ByteType.get()), + Arguments.of( + "Short", + new Field("short_col", new FieldType(true, new ArrowType.Int(16, true), null), null), + Types.ShortType.get()), + Arguments.of( + "Integer", + new Field("int_col", new FieldType(true, new ArrowType.Int(32, true), null), null), + Types.IntegerType.get()), + Arguments.of( + "Long", + new Field("long_col", new FieldType(true, new ArrowType.Int(64, true), null), null), + Types.LongType.get()), + Arguments.of( + "Float", + new Field( + "float_col", + new FieldType( + true, new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), null), + null), + Types.FloatType.get()), + Arguments.of( + "Double", + new Field( + "double_col", + new FieldType( + true, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), null), + null), + Types.DoubleType.get()), + Arguments.of( + "String", + new Field("string_col", new FieldType(true, ArrowType.Utf8.INSTANCE, null), null), + Types.StringType.get()), + Arguments.of( + "Binary", + new Field("binary_col", new FieldType(true, ArrowType.Binary.INSTANCE, null), null), + Types.BinaryType.get()), + Arguments.of( + "Decimal", + new Field( + "decimal_col", new FieldType(true, new ArrowType.Decimal(10, 2, 128), null), null), + Types.DecimalType.of(10, 2)), + Arguments.of( + "Date", + new Field( + "date_col", new FieldType(true, new ArrowType.Date(DateUnit.DAY), null), null), + Types.DateType.get()), + Arguments.of( + "Timestamp without TZ", + new Field( + "ts_col", + new FieldType(true, new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), null), + null), + Types.TimestampType.withoutTimeZone(6)), + Arguments.of( + "Timestamp with TZ", + new Field( + "tstz_col", + new FieldType(true, new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), null), + null), + Types.TimestampType.withTimeZone(3)), + Arguments.of( + "Time", + new Field( + "time_col", + new 
FieldType(true, new ArrowType.Time(TimeUnit.NANOSECOND, 64), null), + null), + Types.TimeType.get()), + Arguments.of( + "Fixed", + new Field( + "fixed_col", new FieldType(true, new ArrowType.FixedSizeBinary(16), null), null), + Types.FixedType.of(16)), + // Complex Types + Arguments.of( + "List", + new Field( + "list_col", + new FieldType(false, ArrowType.List.INSTANCE, null), + Collections.singletonList( + new Field( + "element", new FieldType(true, new ArrowType.Int(32, true), null), null))), + Types.ListType.of(Types.IntegerType.get(), true)), + Arguments.of( + "Map", + new Field( + "map_col", + new FieldType(true, new ArrowType.Map(false), null), + Collections.singletonList( + new Field( + MapVector.DATA_VECTOR_NAME, + new FieldType(false, ArrowType.Struct.INSTANCE, null), + Arrays.asList( + new Field( + MapVector.KEY_NAME, + new FieldType(false, ArrowType.Utf8.INSTANCE, null), + null), + new Field( + MapVector.VALUE_NAME, + new FieldType(true, new ArrowType.Int(32, true), null), + null))))), + Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), true)), + Arguments.of( + "Struct", + new Field( + "struct_col", + new FieldType(true, ArrowType.Struct.INSTANCE, null), + Arrays.asList( + new Field("id", new FieldType(false, new ArrowType.Int(64, true), null), null), + new Field("name", new FieldType(true, ArrowType.Utf8.INSTANCE, null), null))), + SIMPLE_STRUCT), + Arguments.of( + "Union", + new Field( + "union_col", + new FieldType(true, new ArrowType.Union(UnionMode.Sparse, new int[] {1, 2}), null), + Arrays.asList( + new Field( + "integer", new FieldType(true, new ArrowType.Int(32, true), null), null), + new Field("string", new FieldType(true, ArrowType.Utf8.INSTANCE, null), null))), + Types.UnionType.of(Types.IntegerType.get(), Types.StringType.get())), + // External Type + Arguments.of( + "External (LargeUtf8)", + new Field( + "external_col", new FieldType(true, ArrowType.LargeUtf8.INSTANCE, null), null), + Types.ExternalType.of( + 
"{\"name\":\"external_col\",\"nullable\":true,\"type\":{\"name\":\"largeutf8\"},\"children\":[]}"))); + } } diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java index 4c3b96ce9ad..1ae9637b202 100644 --- a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java @@ -167,7 +167,7 @@ public Response namespaceExists( @ResponseMetered(name = "list-tables", absolute = true) public Response listTables( @PathParam("id") String namespaceId, - @DefaultValue("$") @QueryParam("delimiter") String delimiter, + @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") String delimiter, @QueryParam("page_token") String pageToken, @QueryParam("limit") Integer limit) { try { diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java index dd10a31534a..690cb8759e2 100644 --- a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java @@ -18,6 +18,8 @@ */ package org.apache.gravitino.lance.service.rest; +import static org.apache.gravitino.lance.common.ops.NamespaceWrapper.NAMESPACE_DELIMITER_DEFAULT; + import com.codahale.metrics.annotation.ResponseMetered; import com.codahale.metrics.annotation.Timed; import com.fasterxml.jackson.core.type.TypeReference; @@ -59,7 +61,7 @@ public LanceTableOperations(NamespaceWrapper lanceNamespace) { @ResponseMetered(name = "describe-table", absolute = true) public Response describeTable( 
@PathParam("id") String tableId, - @DefaultValue("$") @QueryParam("delimiter") String delimiter) { + @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") String delimiter) { try { DescribeTableResponse response = lanceNamespace.asTableOps().describeTable(tableId, delimiter); @@ -78,14 +80,14 @@ public Response describeTable( public Response createTable( @PathParam("id") String tableId, @QueryParam("mode") @DefaultValue("create") String mode, // create, exist_ok, overwrite - @QueryParam("delimiter") @DefaultValue("$") String delimiter, + @QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) String delimiter, @HeaderParam("x-lance-table-location") String tableLocation, @HeaderParam("x-lance-table-properties") String tableProperties, @HeaderParam("x-lance-root-catalog") String rootCatalog, byte[] arrowStreamBody) { try { Map props = - JsonUtil.mapper().readValue(tableProperties, new TypeReference>() {}); + JsonUtil.mapper().readValue(tableProperties, new TypeReference<>() {}); CreateTableResponse response = lanceNamespace .asTableOps() @@ -104,7 +106,7 @@ public Response createTable( public Response createEmptyTable( @PathParam("id") String tableId, @QueryParam("mode") @DefaultValue("create") String mode, // create, exist_ok, overwrite - @QueryParam("delimiter") @DefaultValue("$") String delimiter, + @QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) String delimiter, @HeaderParam("x-lance-table-location") String tableLocation, @HeaderParam("x-lance-root-catalog") String rootCatalog, @HeaderParam("x-lance-table-properties") String tableProperties) { @@ -112,8 +114,7 @@ public Response createEmptyTable( Map props = StringUtils.isBlank(tableProperties) ? 
Map.of() - : JsonUtil.mapper() - .readValue(tableProperties, new TypeReference>() {}); + : JsonUtil.mapper().readValue(tableProperties, new TypeReference<>() {}); CreateTableResponse response = lanceNamespace .asTableOps() From 0cbd4b0d5238bd5d9a7e8d83e0e8d4b41d899d99 Mon Sep 17 00:00:00 2001 From: mchades Date: Thu, 30 Oct 2025 15:56:15 +0800 Subject: [PATCH 2/2] address comments --- .../GravitinoLanceNamespaceWrapper.java | 9 ++- .../TestGravitinoLanceNamespaceWrapper.java | 76 +++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java index 51a8151cd1e..d3ddbb0edee 100644 --- a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java @@ -23,6 +23,7 @@ import static org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter.CONVERTER; import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -90,6 +91,11 @@ public class GravitinoLanceNamespaceWrapper extends NamespaceWrapper private static final Logger LOG = LoggerFactory.getLogger(GravitinoLanceNamespaceWrapper.class); private GravitinoClient client; + @VisibleForTesting + GravitinoLanceNamespaceWrapper() { + super(null); + } + public GravitinoLanceNamespaceWrapper(LanceConfig config) { super(config); } @@ -618,7 +624,8 @@ private 
JsonArrowSchema toJsonArrowSchema(Column[] columns) { new org.apache.arrow.vector.types.pojo.Schema(fields)); } - private org.apache.arrow.vector.types.pojo.Schema parseArrowIpcStream(byte[] stream) { + @VisibleForTesting + org.apache.arrow.vector.types.pojo.Schema parseArrowIpcStream(byte[] stream) { org.apache.arrow.vector.types.pojo.Schema schema; try (BufferAllocator allocator = new RootAllocator(); diff --git a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java new file mode 100644 index 00000000000..b0ddb980ab0 --- /dev/null +++ b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.gravitino.lance.common.ops.gravitino;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.Arrays;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Round-trip test for {@link GravitinoLanceNamespaceWrapper#parseArrowIpcStream(byte[])}. */
+public class TestGravitinoLanceNamespaceWrapper {
+
+  /** Serializes a two-column schema and expects the parser to return an equal schema. */
+  @Test
+  public void testParseArrowIpcStream() throws Exception {
+    Field idField = Field.nullable("id", new ArrowType.Int(32, true));
+    Field valueField = Field.nullable("value", new ArrowType.Utf8());
+    Schema expected = new Schema(Arrays.asList(idField, valueField));
+
+    GravitinoLanceNamespaceWrapper namespaceWrapper = new GravitinoLanceNamespaceWrapper();
+    byte[] serialized = generateIpcStream(expected);
+
+    Assertions.assertEquals(expected, namespaceWrapper.parseArrowIpcStream(serialized));
+  }
+
+  /**
+   * Writes {@code arrowSchema} and one zero-row batch to an in-memory Arrow IPC stream.
+   *
+   * @param arrowSchema schema to serialize
+   * @return the bytes of the resulting stream
+   * @throws IOException wrapping any exception raised while the stream is built
+   */
+  private byte[] generateIpcStream(Schema arrowSchema) throws IOException {
+    try (BufferAllocator memory = new RootAllocator();
+        VectorSchemaRoot emptyRoot = VectorSchemaRoot.create(arrowSchema, memory)) {
+      emptyRoot.allocateNew();
+      emptyRoot.setRowCount(0); // the batch written below carries no rows
+      ByteArrayOutputStream sink = new ByteArrayOutputStream();
+      // The writer is closed before the buffered bytes are copied out of the sink.
+      try (ArrowStreamWriter ipcWriter =
+          new ArrowStreamWriter(emptyRoot, null, Channels.newChannel(sink))) {
+        ipcWriter.start();
+        ipcWriter.writeBatch();
+        ipcWriter.end();
+      }
+      return sink.toByteArray();
+    } catch (Exception e) {
+      throw new IOException("Failed to create empty Arrow IPC stream: " + e.getMessage(), e);
+    }
+  }
+}