Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public void start() throws Exception {
this.port = jettyServerConfig.getHttpPort();
String URI = String.format("http://%s:%d", host, port);

// Add for lance rest service uri
if (!context.ignoreAuxRestService) {
properties.put("gravitino.lance-rest.gravitino.uri", URI);
serverConfig.loadFromProperties(properties);
ITUtils.overwriteConfigFile(
ITUtils.joinPath(mockConfDir.getAbsolutePath(), "gravitino.conf"), properties);
}

List<String> authenticators = new ArrayList<>();
String authenticatorStr = context.customConfig.get(Configs.AUTHENTICATORS.getKey());
if (authenticatorStr != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void registerCustomConfigs(Map<String, String> configs) {
customConfigs.putAll(configs);
}

private void rewriteGravitinoServerConfig() throws IOException {
protected void rewriteGravitinoServerConfig() throws IOException {
String gravitinoHome = System.getenv("GRAVITINO_HOME");
Path configPath = Paths.get(gravitinoHome, "conf", GravitinoServer.CONF_FILE);
if (originConfig == null) {
Expand Down Expand Up @@ -421,7 +421,7 @@ protected String readGitCommitIdFromGitFile() {
}
}

private static boolean isDeploy() {
public static boolean isDeploy() {
String mode =
System.getProperty(ITUtils.TEST_MODE) == null
? ITUtils.EMBEDDED_TEST_MODE
Expand Down
13 changes: 13 additions & 0 deletions lance/lance-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,16 @@ dependencies {
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
}

// Registers a Jar task named "testJar" that packages the compiled output of the
// "test" source set; the "tests" classifier is set on the resulting archive.
val testJar by tasks.registering(Jar::class) {
archiveClassifier.set("tests")
from(sourceSets["test"].output)
}

// Declares a dedicated "testArtifacts" configuration to carry the test jar.
// NOTE(review): presumably consumed by other modules that reuse this module's
// test classes - confirm against the dependent build files.
configurations {
create("testArtifacts")
}

// Attaches the test jar to the "testArtifacts" configuration declared above.
artifacts {
add("testArtifacts", testJar)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,66 @@
*/
package org.apache.gravitino.lance.common.ops;

import com.lancedb.lance.namespace.model.CreateTableRequest;
import com.lancedb.lance.namespace.model.CreateTableResponse;
import com.lancedb.lance.namespace.model.DeregisterTableResponse;
import com.lancedb.lance.namespace.model.DescribeTableResponse;
import com.lancedb.lance.namespace.model.RegisterTableRequest;
import com.lancedb.lance.namespace.model.RegisterTableResponse;
import java.util.Map;

public interface LanceTableOperations {

DescribeTableResponse describeTable(String tableId, String delimiter);
/**
* Describe the details of a table.
*
* @param tableId table ids are in the format of "namespace/delimiter/table_name"
* @param delimiter the delimiter used in the namespace
* @param version the version of the table to describe, if null, describe the latest version
* @return the table description
*/
DescribeTableResponse describeTable(String tableId, String delimiter, Long version);

/**
* Create a new table.
*
* @param tableId table ids are in the format of "namespace/delimiter/table_name"
* @param mode it can be CREATE, OVERWRITE, or EXIST_OK
* @param delimiter the delimiter used in the namespace
* @param tableLocation the location where the table data will be stored
* @param tableProperties the properties of the table
* @param arrowStreamBody the arrow stream bytes containing the schema and data
* @return the response of the create table operation
*/
CreateTableResponse createTable(
String tableId,
String mode,
CreateTableRequest.ModeEnum mode,
String delimiter,
String tableLocation,
Map<String, String> tableProperties,
byte[] arrowStreamBody);

/**
* Register an existing table.
*
* @param tableId table ids are in the format of "namespace/delimiter/table_name"
* @param mode it can be REGISTER or OVERWRITE.
* @param delimiter the delimiter used in the namespace
* @param tableProperties the properties of the table, it should contain the table location
* @return the response of the register table operation
*/
RegisterTableResponse registerTable(
String tableId, String mode, String delimiter, Map<String, String> tableProperties);
String tableId,
RegisterTableRequest.ModeEnum mode,
String delimiter,
Map<String, String> tableProperties);

/**
* Deregister a table. It will not delete the underlying lance data.
*
* @param tableId table ids are in the format of "namespace/delimiter/table_name"
* @param delimiter the delimiter used in the namespace
* @return the response of the deregister table operation
*/
DeregisterTableResponse deregisterTable(String tableId, String delimiter);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import com.lancedb.lance.namespace.LanceNamespaceException;
import com.lancedb.lance.namespace.ObjectIdentifier;
import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
import com.lancedb.lance.namespace.model.CreateNamespaceRequest.ModeEnum;
import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
import com.lancedb.lance.namespace.model.CreateTableRequest;
import com.lancedb.lance.namespace.model.CreateTableResponse;
import com.lancedb.lance.namespace.model.DeregisterTableResponse;
import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
Expand All @@ -48,7 +48,6 @@
import com.lancedb.lance.namespace.util.CommonUtil;
import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter;
import com.lancedb.lance.namespace.util.PageUtil;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -61,9 +60,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
Expand All @@ -83,6 +79,7 @@
import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations;
import org.apache.gravitino.lance.common.ops.LanceTableOperations;
import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
import org.apache.gravitino.lance.common.utils.ArrowUtils;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.slf4j.Logger;
Expand Down Expand Up @@ -516,7 +513,8 @@ public ListTablesResponse listTables(
}

@Override
public DescribeTableResponse describeTable(String tableId, String delimiter) {
public DescribeTableResponse describeTable(String tableId, String delimiter, Long version) {
// TODO Currently we do not support versioned table description.
ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
Preconditions.checkArgument(
nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels());
Expand All @@ -537,7 +535,7 @@ public DescribeTableResponse describeTable(String tableId, String delimiter) {
@Override
public CreateTableResponse createTable(
String tableId,
String mode,
CreateTableRequest.ModeEnum mode,
String delimiter,
String tableLocation,
Map<String, String> tableProperties,
Expand All @@ -549,7 +547,8 @@ public CreateTableResponse createTable(
// Parse column information.
List<Column> columns = Lists.newArrayList();
if (arrowStreamBody != null) {
org.apache.arrow.vector.types.pojo.Schema schema = parseArrowIpcStream(arrowStreamBody);
org.apache.arrow.vector.types.pojo.Schema schema =
ArrowUtils.parseArrowIpcStream(arrowStreamBody);
columns = extractColumns(schema);
}

Expand All @@ -561,11 +560,10 @@ public CreateTableResponse createTable(

Map<String, String> createTableProperties = Maps.newHashMap(tableProperties);
createTableProperties.put("location", tableLocation);
createTableProperties.put("mode", mode);
// TODO considering the mode (create, exist_ok, overwrite)
createTableProperties.put("mode", mode.getValue());
createTableProperties.put("format", "lance");

ModeEnum createMode = ModeEnum.fromValue(mode.toLowerCase());
switch (createMode) {
switch (mode) {
case EXIST_OK:
if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
CreateTableResponse response = new CreateTableResponse();
Expand Down Expand Up @@ -598,10 +596,7 @@ public CreateTableResponse createTable(
catalog
.asTableCatalog()
.createTable(
tableIdentifier,
columns.toArray(new Column[0]),
tableLocation,
createTableProperties);
tableIdentifier, columns.toArray(new Column[0]), null, createTableProperties);

CreateTableResponse response = new CreateTableResponse();
response.setProperties(t.properties());
Expand All @@ -612,7 +607,10 @@ public CreateTableResponse createTable(

@Override
public RegisterTableResponse registerTable(
String tableId, String mode, String delimiter, Map<String, String> tableProperties) {
String tableId,
RegisterTableRequest.ModeEnum mode,
String delimiter,
Map<String, String> tableProperties) {
ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
Preconditions.checkArgument(
nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels());
Expand All @@ -623,9 +621,7 @@ public RegisterTableResponse registerTable(
NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));

// TODO Support real register API
RegisterTableRequest.ModeEnum createMode =
RegisterTableRequest.ModeEnum.fromValue(mode.toUpperCase());
if (createMode == RegisterTableRequest.ModeEnum.CREATE
if (mode == RegisterTableRequest.ModeEnum.CREATE
&& catalog.asTableCatalog().tableExists(tableIdentifier)) {
throw LanceNamespaceException.conflict(
"Table already exists: " + tableId,
Expand All @@ -634,14 +630,16 @@ public RegisterTableResponse registerTable(
CommonUtil.formatCurrentStackTrace());
}

if (createMode == RegisterTableRequest.ModeEnum.OVERWRITE
if (mode == RegisterTableRequest.ModeEnum.OVERWRITE
&& catalog.asTableCatalog().tableExists(tableIdentifier)) {
LOG.info("Overwriting existing table: {}", tableId);
catalog.asTableCatalog().dropTable(tableIdentifier);
catalog.asTableCatalog().purgeTable(tableIdentifier);
}

Table t =
catalog.asTableCatalog().createTable(tableIdentifier, new Column[] {}, "", tableProperties);
catalog
.asTableCatalog()
.createTable(tableIdentifier, new Column[] {}, null, tableProperties);

RegisterTableResponse response = new RegisterTableResponse();
response.setProperties(t.properties());
Expand All @@ -651,7 +649,6 @@ public RegisterTableResponse registerTable(

@Override
public DeregisterTableResponse deregisterTable(String tableId, String delimiter) {

ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
Preconditions.checkArgument(
nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels());
Expand All @@ -664,7 +661,14 @@ public DeregisterTableResponse deregisterTable(String tableId, String delimiter)
Table t = catalog.asTableCatalog().loadTable(tableIdentifier);
Map<String, String> properties = t.properties();
// TODO Support real deregister API.
catalog.asTableCatalog().purgeTable(tableIdentifier);
boolean result = catalog.asTableCatalog().purgeTable(tableIdentifier);
if (!result) {
throw LanceNamespaceException.notFound(
"Table not found: " + tableId,
NoSuchSchemaException.class.getSimpleName(),
tableId,
CommonUtil.formatCurrentStackTrace());
}

DeregisterTableResponse response = new DeregisterTableResponse();
response.setProperties(properties);
Expand All @@ -682,24 +686,8 @@ private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
new org.apache.arrow.vector.types.pojo.Schema(fields));
}

@VisibleForTesting
org.apache.arrow.vector.types.pojo.Schema parseArrowIpcStream(byte[] stream) {
org.apache.arrow.vector.types.pojo.Schema schema;
try (BufferAllocator allocator = new RootAllocator();
ByteArrayInputStream bais = new ByteArrayInputStream(stream);
ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
schema = reader.getVectorSchemaRoot().getSchema();
} catch (Exception e) {
throw new RuntimeException("Failed to parse Arrow IPC stream", e);
}

Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC stream");
return schema;
}

private List<Column> extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
List<Column> columns = new ArrayList<>();

for (org.apache.arrow.vector.types.pojo.Field field : arrowSchema.getFields()) {
columns.add(
Column.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.lance.common.utils;

import com.google.common.base.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Schema;

/**
 * Static helpers for converting between Arrow {@link Schema} objects and Arrow IPC stream bytes.
 *
 * <p>Every method creates its own {@link RootAllocator} and closes it before returning, so no
 * Arrow resources created here outlive a call.
 */
public final class ArrowUtils {

  private ArrowUtils() {
    // Utility class with static methods only; not meant to be instantiated.
  }

  /**
   * Encodes the given schema as an Arrow IPC stream that contains the schema and a single record
   * batch with zero rows.
   *
   * @param arrowSchema the Arrow schema to encode
   * @return the bytes of the generated IPC stream
   * @throws IOException if anything fails while building or writing the stream; the original
   *     exception is attached as the cause
   */
  public static byte[] generateIpcStream(Schema arrowSchema) throws IOException {
    // Resources are closed in reverse order: the root is released before its allocator.
    try (BufferAllocator allocator = new RootAllocator();
        VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
      // Allocate empty vectors (0 rows) so the written batch carries no data.
      root.allocateNew();
      root.setRowCount(0);

      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
      // The second constructor argument (dictionary provider) is intentionally null.
      try (ArrowStreamWriter writer =
          new ArrowStreamWriter(root, null, Channels.newChannel(outputStream))) {
        writer.start();
        writer.writeBatch();
        writer.end();
      }

      return outputStream.toByteArray();
    } catch (Exception e) {
      throw new IOException("Failed to create empty Arrow IPC stream: " + e.getMessage(), e);
    }
  }

  /**
   * Reads the schema from the beginning of an Arrow IPC stream.
   *
   * @param stream the raw bytes of an Arrow IPC stream; must not be null
   * @return the schema declared by the stream
   * @throws IllegalArgumentException if {@code stream} is null, cannot be parsed as an Arrow IPC
   *     stream, or yields no schema
   */
  public static Schema parseArrowIpcStream(byte[] stream) {
    // Reject null up front with a clear message instead of a wrapped NullPointerException.
    Preconditions.checkArgument(stream != null, "Arrow IPC stream must not be null");

    Schema schema;
    try (BufferAllocator allocator = new RootAllocator();
        ByteArrayInputStream bais = new ByteArrayInputStream(stream);
        ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
      schema = reader.getVectorSchemaRoot().getSchema();
    } catch (Exception e) {
      throw new IllegalArgumentException("Failed to parse Arrow IPC stream", e);
    }

    Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC stream");
    return schema;
  }
}
Loading