Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,66 @@
*/
package org.apache.gravitino.lance.common.ops;

import com.lancedb.lance.namespace.model.CreateTableRequest;
import com.lancedb.lance.namespace.model.CreateTableResponse;
import com.lancedb.lance.namespace.model.DeregisterTableResponse;
import com.lancedb.lance.namespace.model.DescribeTableResponse;
import com.lancedb.lance.namespace.model.RegisterTableRequest;
import com.lancedb.lance.namespace.model.RegisterTableResponse;
import java.util.Map;

public interface LanceTableOperations {

DescribeTableResponse describeTable(String tableId, String delimiter);
/**
* Describe the details of a table.
*
* @param tableId table ids are in the format of "namespace/delimiter/table_name"
Comment thread
mchades marked this conversation as resolved.
Outdated
* @param delimiter the delimiter used in the namespace
* @param version the version of the table to describe, if null, describe the latest version
* @return the table description
*/
DescribeTableResponse describeTable(String tableId, String delimiter, Long version);
Comment thread
mchades marked this conversation as resolved.
Outdated

/**
* Create a new table.
*
* @param tableId table ids are in the format of "namespace/delimiter/table_name"
Comment thread
yuqi1129 marked this conversation as resolved.
Outdated
* @param mode it can be CREATE, OVERWRITE, or EXIST_OK
* @param delimiter the delimiter used in the namespace
* @param tableLocation the location where the table data will be stored
* @param tableProperties the properties of the table
* @param arrowStreamBody the arrow stream bytes containing the schema and data
* @return the response of the create table operation
*/
CreateTableResponse createTable(
String tableId,
String mode,
CreateTableRequest.ModeEnum mode,
String delimiter,
String tableLocation,
Map<String, String> tableProperties,
byte[] arrowStreamBody);

/**
* Register an existing table.
*
* @param tableId table ids are in the format of "namespace/delimiter/table_name"
Comment thread
yuqi1129 marked this conversation as resolved.
Outdated
* @param mode it can be REGISTER or OVERWRITE.
* @param delimiter the delimiter used in the namespace
* @param tableProperties the properties of the table, it should contain the table location
* @return the response of the register table operation
Comment thread
yuqi1129 marked this conversation as resolved.
Outdated
*/
RegisterTableResponse registerTable(
String tableId, String mode, String delimiter, Map<String, String> tableProperties);
String tableId,
RegisterTableRequest.ModeEnum mode,
String delimiter,
Map<String, String> tableProperties);

/**
* Deregister a table. It will not delete the underlying lance data.
*
* @param tableId table ids are in the format of "namespace/delimiter/table_name"
Comment thread
yuqi1129 marked this conversation as resolved.
Outdated
* @param delimiter the delimiter used in the namespace
* @return the response of the deregister table operation
*/
DeregisterTableResponse deregisterTable(String tableId, String delimiter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import com.lancedb.lance.namespace.LanceNamespaceException;
import com.lancedb.lance.namespace.ObjectIdentifier;
import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
import com.lancedb.lance.namespace.model.CreateNamespaceRequest.ModeEnum;
import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
import com.lancedb.lance.namespace.model.CreateTableRequest;
import com.lancedb.lance.namespace.model.CreateTableResponse;
import com.lancedb.lance.namespace.model.DeregisterTableResponse;
import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
Expand All @@ -48,7 +48,6 @@
import com.lancedb.lance.namespace.util.CommonUtil;
import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter;
import com.lancedb.lance.namespace.util.PageUtil;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -61,9 +60,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
Expand All @@ -84,6 +80,7 @@
import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations;
import org.apache.gravitino.lance.common.ops.LanceTableOperations;
import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
import org.apache.gravitino.lance.common.utils.ArrowUtils;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.slf4j.Logger;
Expand Down Expand Up @@ -525,7 +522,8 @@ public ListTablesResponse listTables(
}

@Override
public DescribeTableResponse describeTable(String tableId, String delimiter) {
public DescribeTableResponse describeTable(String tableId, String delimiter, Long version) {
// TODO Currently we do not support versioned table description.
Comment thread
mchades marked this conversation as resolved.
Outdated
ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
Preconditions.checkArgument(
nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels());
Expand All @@ -546,7 +544,7 @@ public DescribeTableResponse describeTable(String tableId, String delimiter) {
@Override
public CreateTableResponse createTable(
String tableId,
String mode,
CreateTableRequest.ModeEnum mode,
String delimiter,
String tableLocation,
Map<String, String> tableProperties,
Expand All @@ -558,7 +556,8 @@ public CreateTableResponse createTable(
// Parse column information.
List<Column> columns = Lists.newArrayList();
if (arrowStreamBody != null) {
org.apache.arrow.vector.types.pojo.Schema schema = parseArrowIpcStream(arrowStreamBody);
org.apache.arrow.vector.types.pojo.Schema schema =
ArrowUtils.parseArrowIpcStream(arrowStreamBody);
columns = extractColumns(schema);
}

Expand All @@ -570,11 +569,10 @@ public CreateTableResponse createTable(

Map<String, String> createTableProperties = Maps.newHashMap(tableProperties);
createTableProperties.put("location", tableLocation);
createTableProperties.put("mode", mode);
// TODO considering the mode (create, exist_ok, overwrite)
createTableProperties.put("mode", mode.getValue());
Comment thread
yuqi1129 marked this conversation as resolved.
Outdated
createTableProperties.put("format", "lance");

ModeEnum createMode = ModeEnum.fromValue(mode.toLowerCase());
switch (createMode) {
switch (mode) {
case EXIST_OK:
if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
CreateTableResponse response = new CreateTableResponse();
Expand Down Expand Up @@ -607,10 +605,7 @@ public CreateTableResponse createTable(
catalog
.asTableCatalog()
.createTable(
tableIdentifier,
columns.toArray(new Column[0]),
tableLocation,
createTableProperties);
tableIdentifier, columns.toArray(new Column[0]), null, createTableProperties);

CreateTableResponse response = new CreateTableResponse();
response.setProperties(t.properties());
Expand All @@ -621,7 +616,10 @@ public CreateTableResponse createTable(

@Override
public RegisterTableResponse registerTable(
String tableId, String mode, String delimiter, Map<String, String> tableProperties) {
String tableId,
RegisterTableRequest.ModeEnum mode,
String delimiter,
Map<String, String> tableProperties) {
ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
Preconditions.checkArgument(
nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels());
Expand All @@ -632,9 +630,7 @@ public RegisterTableResponse registerTable(
NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));

// TODO Support real register API
RegisterTableRequest.ModeEnum createMode =
RegisterTableRequest.ModeEnum.fromValue(mode.toUpperCase());
if (createMode == RegisterTableRequest.ModeEnum.CREATE
if (mode == RegisterTableRequest.ModeEnum.CREATE
&& catalog.asTableCatalog().tableExists(tableIdentifier)) {
throw LanceNamespaceException.conflict(
"Table already exists: " + tableId,
Expand All @@ -643,14 +639,16 @@ public RegisterTableResponse registerTable(
CommonUtil.formatCurrentStackTrace());
}

if (createMode == RegisterTableRequest.ModeEnum.OVERWRITE
if (mode == RegisterTableRequest.ModeEnum.OVERWRITE
&& catalog.asTableCatalog().tableExists(tableIdentifier)) {
LOG.info("Overwriting existing table: {}", tableId);
catalog.asTableCatalog().dropTable(tableIdentifier);
catalog.asTableCatalog().purgeTable(tableIdentifier);
Comment thread
yuqi1129 marked this conversation as resolved.
Outdated
}

Table t =
catalog.asTableCatalog().createTable(tableIdentifier, new Column[] {}, "", tableProperties);
catalog
.asTableCatalog()
.createTable(tableIdentifier, new Column[] {}, null, tableProperties);

RegisterTableResponse response = new RegisterTableResponse();
response.setProperties(t.properties());
Expand All @@ -660,7 +658,6 @@ public RegisterTableResponse registerTable(

@Override
public DeregisterTableResponse deregisterTable(String tableId, String delimiter) {

ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
Preconditions.checkArgument(
nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels());
Expand All @@ -673,7 +670,14 @@ public DeregisterTableResponse deregisterTable(String tableId, String delimiter)
Table t = catalog.asTableCatalog().loadTable(tableIdentifier);
Map<String, String> properties = t.properties();
// TODO Support real deregister API.
catalog.asTableCatalog().purgeTable(tableIdentifier);
boolean result = catalog.asTableCatalog().purgeTable(tableIdentifier);
Comment thread
yuqi1129 marked this conversation as resolved.
Outdated
if (!result) {
throw LanceNamespaceException.notFound(
"Table not found: " + tableId,
NoSuchSchemaException.class.getSimpleName(),
Comment thread
yuqi1129 marked this conversation as resolved.
Outdated
tableId,
CommonUtil.formatCurrentStackTrace());
}

DeregisterTableResponse response = new DeregisterTableResponse();
response.setProperties(properties);
Expand All @@ -691,21 +695,6 @@ private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
new org.apache.arrow.vector.types.pojo.Schema(fields));
}

@VisibleForTesting
org.apache.arrow.vector.types.pojo.Schema parseArrowIpcStream(byte[] stream) {
org.apache.arrow.vector.types.pojo.Schema schema;
try (BufferAllocator allocator = new RootAllocator();
ByteArrayInputStream bais = new ByteArrayInputStream(stream);
ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
schema = reader.getVectorSchemaRoot().getSchema();
} catch (Exception e) {
throw new RuntimeException("Failed to parse Arrow IPC stream", e);
}

Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC stream");
return schema;
}

private List<Column> extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
List<Column> columns = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.lance.common.utils;

import com.google.common.base.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Schema;

/**
 * Static helpers for converting between Arrow {@link Schema} objects and Arrow IPC stream bytes.
 *
 * <p>This class only exposes static methods and is not meant to be instantiated.
 */
public final class ArrowUtils {

  private ArrowUtils() {
    // Utility class: static methods only.
  }

  /**
   * Serializes the given schema into an Arrow IPC stream that carries one batch with zero rows.
   *
   * @param arrowSchema the Arrow schema to serialize; must not be null
   * @return the bytes of the generated Arrow IPC stream
   * @throws IOException if the stream cannot be produced for any reason (including a null schema);
   *     the underlying failure is attached as the cause
   */
  public static byte[] generateIpcStream(Schema arrowSchema) throws IOException {
    try (BufferAllocator allocator = new RootAllocator()) {
      // Checked inside the try so that a null schema is still reported as an IOException, as it
      // was before this check existed, but now with an explicit message.
      Preconditions.checkArgument(arrowSchema != null, "Arrow schema must not be null");

      // Create an empty VectorSchemaRoot with the schema
      try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
        // Allocate empty vectors (0 rows)
        root.allocateNew();
        root.setRowCount(0);

        // Write to IPC stream
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        try (ArrowStreamWriter writer =
            new ArrowStreamWriter(root, null, Channels.newChannel(outputStream))) {
          writer.start();
          writer.writeBatch();
          writer.end();
        }

        // Read the bytes only after the writer is closed so everything has been flushed.
        return outputStream.toByteArray();
      }
    } catch (Exception e) {
      throw new IOException("Failed to create empty Arrow IPC stream: " + e.getMessage(), e);
    }
  }

  /**
   * Reads the schema from the bytes of an Arrow IPC stream.
   *
   * @param stream the Arrow IPC stream bytes; must not be null
   * @return the schema read from the stream
   * @throws IllegalArgumentException if {@code stream} is null, cannot be read as an Arrow IPC
   *     stream, or yields no schema
   */
  public static Schema parseArrowIpcStream(byte[] stream) {
    Preconditions.checkArgument(stream != null, "Arrow IPC stream must not be null");

    Schema schema;
    try (BufferAllocator allocator = new RootAllocator();
        ByteArrayInputStream bais = new ByteArrayInputStream(stream);
        ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
      schema = reader.getVectorSchemaRoot().getSchema();
    } catch (Exception e) {
      // Any failure while reading the supplied bytes is reported as an invalid argument.
      throw new IllegalArgumentException("Failed to parse Arrow IPC stream", e);
    }

    Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC stream");
    return schema;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,11 @@
*/
package org.apache.gravitino.lance.common.ops.gravitino;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.Arrays;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.gravitino.lance.common.utils.ArrowUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -41,36 +35,9 @@ public void testParseArrowIpcStream() throws Exception {
Arrays.asList(
Field.nullable("id", new ArrowType.Int(32, true)),
Field.nullable("value", new ArrowType.Utf8())));

GravitinoLanceNamespaceWrapper wrapper = new GravitinoLanceNamespaceWrapper();
byte[] ipcStream = generateIpcStream(schema);
Schema parsedSchema = wrapper.parseArrowIpcStream(ipcStream);
byte[] ipcStream = ArrowUtils.generateIpcStream(schema);
Schema parsedSchema = ArrowUtils.parseArrowIpcStream(ipcStream);

Assertions.assertEquals(schema, parsedSchema);
}

/**
 * Produces the bytes of an Arrow IPC stream for {@code arrowSchema} containing one batch with
 * zero rows.
 *
 * @param arrowSchema the schema to write into the stream
 * @return the serialized stream bytes
 * @throws IOException if anything fails while building or writing the stream
 */
private byte[] generateIpcStream(Schema arrowSchema) throws IOException {
  // Resources are closed in reverse order: the root first, then the allocator that backs it.
  try (BufferAllocator bufferAllocator = new RootAllocator();
      VectorSchemaRoot emptyRoot = VectorSchemaRoot.create(arrowSchema, bufferAllocator)) {
    // No data is written; the vectors are allocated and the row count pinned to zero.
    emptyRoot.allocateNew();
    emptyRoot.setRowCount(0);

    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (ArrowStreamWriter streamWriter =
        new ArrowStreamWriter(emptyRoot, null, Channels.newChannel(sink))) {
      streamWriter.start();
      streamWriter.writeBatch();
      streamWriter.end();
    }

    // The writer is closed at this point, so the sink holds the complete stream.
    return sink.toByteArray();
  } catch (Exception cause) {
    throw new IOException(
        "Failed to create empty Arrow IPC stream: " + cause.getMessage(), cause);
  }
}
}
Loading