diff --git a/bin/gravitino-lance-rest-server.sh.template b/bin/gravitino-lance-rest-server.sh.template new file mode 100644 index 00000000000..17f098903e7 --- /dev/null +++ b/bin/gravitino-lance-rest-server.sh.template @@ -0,0 +1,206 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#set -ex +USAGE="-e Usage: bin/gravitino-lance-rest-server.sh [--config ]\n\t + {start|run|stop|restart|status}" + +if [[ "$1" == "--config" ]]; then + shift + conf_dir="$1" + if [[ ! -d "${conf_dir}" ]]; then + echo "ERROR : ${conf_dir} is not a directory" + echo ${USAGE} + exit 1 + else + export GRAVITINO_CONF_DIR="${conf_dir}" + fi + shift +fi + +bin="$(dirname "${BASH_SOURCE-$0}")" +bin="$(cd "${bin}">/dev/null; pwd)" + +. "${bin}/common.sh" + +check_java_version + +function check_process_status() { + local pid=$(found_lance_rest_server_pid) + + if [[ -z "${pid}" ]]; then + echo "GravitinoLanceRESTServer is not running" + else + printArt + echo "GravitinoLanceRESTServer is running[PID:$pid]" + fi +} + +function found_lance_rest_server_pid() { + process_name='GravitinoLanceRESTServer'; + RUNNING_PIDS=$(ps x | grep ${process_name} | grep -v grep | awk '{print $1}'); + + if [[ -z "${RUNNING_PIDS}" ]]; then + return + fi + + if ! kill -0 ${RUNNING_PIDS} > /dev/null 2>&1; then + echo "GravitinoLanceRESTServer running but process is dead" + fi + + echo "${RUNNING_PIDS}" +} + +function wait_for_lance_rest_server_to_die() { + timeout=10 + timeoutTime=$(date "+%s") + let "timeoutTime+=$timeout" + currentTime=$(date "+%s") + forceKill=1 + + while [[ $currentTime -lt $timeoutTime ]]; do + local pid=$(found_lance_rest_server_pid) + if [[ -z "${pid}" ]]; then + forceKill=0 + break + fi + + kill ${pid} > /dev/null 2> /dev/null + if kill -0 ${pid} > /dev/null 2>&1; then + sleep 3 + else + forceKill=0 + break + fi + currentTime=$(date "+%s") + done + + if [[ $forceKill -ne 0 ]]; then + kill -9 ${pid} > /dev/null 2> /dev/null + fi +} + +function start() { + local pid=$(found_lance_rest_server_pid) + + if [[ ! -z "${pid}" ]]; then + if kill -0 ${pid} >/dev/null 2>&1; then + echo "GravitinoLanceRESTServer is already running" + return 0; + fi + fi + + if [[ ! -d "${GRAVITINO_LOG_DIR}" ]]; then + echo "Log dir doesn't exist, create ${GRAVITINO_LOG_DIR}" + mkdir -p "${GRAVITINO_LOG_DIR}" + fi + + nohup ${JAVA_RUNNER} ${JAVA_OPTS} ${GRAVITINO_DEBUG_OPTS} -cp ${GRAVITINO_CLASSPATH} ${GRAVITINO_SERVER_NAME} >> "${GRAVITINO_OUTFILE}" 2>&1 & + + pid=$! + if [[ -z "${pid}" ]]; then + echo "GravitinoLanceRESTServer start error!" + return 1; + fi + + sleep 2 + check_process_status +} + +function run() { + ${JAVA_RUNNER} ${JAVA_OPTS} ${GRAVITINO_DEBUG_OPTS} -cp ${GRAVITINO_CLASSPATH} ${GRAVITINO_SERVER_NAME} +} + +function stop() { + local pid + + pid=$(found_lance_rest_server_pid) + + if [[ -z "${pid}" ]]; then + echo "GravitinoLanceRESTServer is not running" + else + wait_for_lance_rest_server_to_die + echo "GravitinoLanceRESTServer stop" + fi +} + +HOSTNAME=$(hostname) +GRAVITINO_OUTFILE="${GRAVITINO_LOG_DIR}/gravitino-lance-rest-server.out" +GRAVITINO_SERVER_NAME=org.apache.gravitino.lance.server.GravitinoLanceRESTServer +GRAVITINO_SIMPLE_SERVER_NAME=gravitino-lance-rest-server + +JAVA_OPTS+=" -Dfile.encoding=UTF-8" +JAVA_OPTS+=" -Dlog4j2.configurationFile=file://${GRAVITINO_CONF_DIR}/log4j2.properties" +JAVA_OPTS+=" -Dgravitino.log.path=${GRAVITINO_LOG_DIR} ${GRAVITINO_MEM}" +JAVA_OPTS+=" -Dgravitino.server.name=${GRAVITINO_SIMPLE_SERVER_NAME}" +if [ "$JVM_VERSION" -eq 17 ]; then + JAVA_OPTS+=" -XX:+IgnoreUnrecognizedVMOptions" + JAVA_OPTS+=" --add-opens java.base/java.io=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.lang.invoke=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.lang.reflect=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.lang=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.math=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.net=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.nio=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.text=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.time=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.util.concurrent=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.util.regex=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/java.util=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/jdk.internal.ref=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/jdk.internal.reflect=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.sql/java.sql=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/sun.util.calendar=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/sun.nio.ch=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/sun.nio.cs=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/sun.security.action=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.base/sun.util.calendar=ALL-UNNAMED" + JAVA_OPTS+=" --add-opens java.security.jgss/sun.security.krb5=ALL-UNNAMED" +fi + +#JAVA_OPTS+=" -Djava.security.krb5.conf=/etc/krb5.conf" + +if [ -d "${GRAVITINO_HOME}/lance-rest-server/libs" ]; then + addJarInDir "${GRAVITINO_HOME}/lance-rest-server/libs" + addDirToClasspath "${GRAVITINO_HOME}/lance-rest-server/conf" +else + addJarInDir "${GRAVITINO_HOME}/libs" +fi + +case "${1}" in + start) + start + ;; + run) + run + ;; + stop) + stop + ;; + restart) + stop + start + ;; + status) + check_process_status + ;; + *) + echo ${USAGE} +esac diff --git a/build.gradle.kts b/build.gradle.kts index 8ad42871680..62c6d9a6dd9 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -683,6 +683,7 @@ tasks { "copyCliLib", ":authorizations:copyLibAndConfig", ":iceberg:iceberg-rest-server:copyLibAndConfigs", + ":lance:lance-rest-server:copyLibAndConfigs", ":web:web:build" ) @@ -772,6 +773,51 @@ tasks { } } + val compileLanceRESTServer by registering { + dependsOn("lance:lance-rest-server:copyLibAndConfigsToStandalonePackage") + group = "gravitino distribution" + outputs.dir(projectDir.dir("distribution/${rootProject.name}-lance-rest-server")) + doLast { + copy { + from(projectDir.dir("conf")) { + include( + "${rootProject.name}-lance-rest-server.conf.template", + "${rootProject.name}-env.sh.template", + "log4j2.properties.template" + ) + into("${rootProject.name}-lance-rest-server/conf") + } + from(projectDir.dir("bin")) { + include("common.sh.template", "${rootProject.name}-lance-rest-server.sh.template") + into("${rootProject.name}-lance-rest-server/bin") + } + into(outputDir) + rename { fileName -> + fileName.replace(".template", "") + } + eachFile { + if (name == "gravitino-env.sh") { + filter { line -> + line.replace("GRAVITINO_VERSION_PLACEHOLDER", "$version") + } + } + } + fileMode = 0b111101101 + } + + copy { + from(projectDir.dir("licenses")) { into("${rootProject.name}-lance-rest-server/licenses") } + from(projectDir.file("LICENSE.rest")) { into("${rootProject.name}-lance-rest-server") } + from(projectDir.file("NOTICE.rest")) { into("${rootProject.name}-lance-rest-server") } + from(projectDir.file("README.md")) { into("${rootProject.name}-lance-rest-server") } + into(outputDir) + rename { fileName -> + fileName.replace(".rest", "") + } + } + } + } + val compileTrinoConnector by registering { dependsOn("trino-connector:trino-connector:copyLibs") group = "gravitino distribution" @@ -791,7 +837,7 @@ tasks { } val assembleDistribution by registering(Tar::class) { - dependsOn("assembleTrinoConnector", "assembleIcebergRESTServer") + dependsOn("assembleTrinoConnector", "assembleIcebergRESTServer", "assembleLanceRESTServer") group = "gravitino distribution" finalizedBy("checksumDistribution") into("${rootProject.name}-$version-bin") @@ -823,6 +869,17 @@ tasks { destinationDirectory.set(projectDir.dir("distribution")) } + val assembleLanceRESTServer by registering(Tar::class) { + dependsOn("compileLanceRESTServer") + group = "gravitino distribution" + finalizedBy("checksumLanceRESTServerDistribution") + into("${rootProject.name}-lance-rest-server-$version-bin") + from(compileLanceRESTServer.map { it.outputs.files.single() }) + compression = Compression.GZIP + archiveFileName.set("${rootProject.name}-lance-rest-server-$version-bin.tar.gz") + destinationDirectory.set(projectDir.dir("distribution")) + } + register("checksumIcebergRESTServerDistribution") { group = "gravitino distribution" dependsOn(assembleIcebergRESTServer) @@ -839,9 +896,30 @@ tasks { } } + register("checksumLanceRESTServerDistribution") { + group = "gravitino distribution" + dependsOn(assembleLanceRESTServer) + val archiveFile = assembleLanceRESTServer.flatMap { it.archiveFile } + val checksumFile = archiveFile.map { archive -> + archive.asFile.let { it.resolveSibling("${it.name}.sha256") } + } + inputs.file(archiveFile) + outputs.file(checksumFile) + doLast { + checksumFile.get().writeText( + serviceOf().sha256(archiveFile.get().asFile).toString() + ) + } + } + register("checksumDistribution") { group = "gravitino distribution" - dependsOn(assembleDistribution, "checksumTrinoConnector", "checksumIcebergRESTServerDistribution") + dependsOn( + assembleDistribution, + "checksumTrinoConnector", + "checksumIcebergRESTServerDistribution", + "checksumLanceRESTServerDistribution" + ) val archiveFile = assembleDistribution.flatMap { it.archiveFile } val checksumFile = archiveFile.map { archive -> archive.asFile.let { it.resolveSibling("${it.name}.sha256") } @@ -885,6 +963,7 @@ tasks { !it.name.startsWith("filesystem") && !it.name.startsWith("flink") && !it.name.startsWith("iceberg") && + !it.name.startsWith("lance") && !it.name.startsWith("spark") && it.name != "hadoop-common" && it.name != "hive-metastore-common" && @@ -916,6 +995,7 @@ tasks { !it.name.startsWith("filesystem") && !it.name.startsWith("flink") && !it.name.startsWith("iceberg") && + !it.name.startsWith("lance") && !it.name.startsWith("integration-test") && !it.name.startsWith("spark") && !it.name.startsWith("trino-connector") && diff --git a/conf/gravitino-lance-rest-server.conf.template b/conf/gravitino-lance-rest-server.conf.template new file mode 100644 index 00000000000..32609bffcaa --- /dev/null +++ b/conf/gravitino-lance-rest-server.conf.template @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# THE CONFIGURATION FOR Lance REST SERVER +gravitino.lance-rest.shutdown.timeout = 3000 + +# THE CONFIGURATION FOR Lance REST WEB SERVER +# The host name of the built-in web server +gravitino.lance-rest.host = 0.0.0.0 +# The http port number of the built-in web server +gravitino.lance-rest.httpPort = 9101 +# The min thread size of the built-in web server +gravitino.lance-rest.minThreads = 24 +# The max thread size of the built-in web server +gravitino.lance-rest.maxThreads = 200 +# The stop timeout of the built-in web server +gravitino.lance-rest.stopTimeout = 30000 +# The timeout of idle connections +gravitino.lance-rest.idleTimeout = 30000 +# The executor thread pool work queue size of the built-in web server +gravitino.lance-rest.threadPoolWorkQueueSize = 100 +# The request header size of the built-in web server +gravitino.lance-rest.requestHeaderSize = 131072 +# The response header size of the built-in web server +gravitino.lance-rest.responseHeaderSize = 131072 + +# THE CONFIGURATION FOR Lance CATALOG +# The logical Lance catalog served by this REST endpoint +gravitino.lance-rest.catalog-name = default diff --git a/core/src/main/java/org/apache/gravitino/cache/EntityCacheWeigher.java b/core/src/main/java/org/apache/gravitino/cache/EntityCacheWeigher.java index 768f60eb647..edc3ca6b9b4 100644 --- a/core/src/main/java/org/apache/gravitino/cache/EntityCacheWeigher.java +++ b/core/src/main/java/org/apache/gravitino/cache/EntityCacheWeigher.java @@ -40,24 +40,36 @@ * or manually cleared. *
  • Catalog: 0, which means that it will never be evicted from the cache unless timeout occurs * or manually cleared. - *
  • Schema: 10 - *
  • Other: 100 + *
  • Schema: 500 + *
  • Tag: 100 + *
  • Policy: 100 + *
  • Other: 200 * */ public class EntityCacheWeigher implements Weigher> { - public static final int METALAKE_WEIGHT = 0; + public static final int METALAKE_WEIGHT = 0; // 0 means never evict public static final int CATALOG_WEIGHT = 0; - public static final int SCHEMA_WEIGHT = 10; - public static final int OTHER_WEIGHT = 100; + public static final int SCHEMA_WEIGHT = 500; // higher weight means it will less likely be evicted + public static final int OTHER_WEIGHT = 200; + public static final int TAG_WEIGHT = 100; + public static final int POLICY_WEIGHT = 100; private static final Logger LOG = LoggerFactory.getLogger(EntityCacheWeigher.class.getName()); private static final EntityCacheWeigher INSTANCE = new EntityCacheWeigher(); private static final Map ENTITY_WEIGHTS = ImmutableMap.of( Entity.EntityType.METALAKE, METALAKE_WEIGHT, Entity.EntityType.CATALOG, CATALOG_WEIGHT, - Entity.EntityType.SCHEMA, SCHEMA_WEIGHT); + Entity.EntityType.SCHEMA, SCHEMA_WEIGHT, + Entity.EntityType.TAG, TAG_WEIGHT, + Entity.EntityType.POLICY, POLICY_WEIGHT); private static final long MAX_WEIGHT = - 2 * (METALAKE_WEIGHT * 10 + CATALOG_WEIGHT * (10 * 200) + SCHEMA_WEIGHT * (10 * 200 * 1000)); + 2 + * (METALAKE_WEIGHT * 10 + + CATALOG_WEIGHT * 100 + + SCHEMA_WEIGHT * 1000 + + OTHER_WEIGHT * 10000 + + TAG_WEIGHT * 10000 + + POLICY_WEIGHT * 10000); @VisibleForTesting protected EntityCacheWeigher() {} diff --git a/core/src/test/java/org/apache/gravitino/cache/TestCacheConfig.java b/core/src/test/java/org/apache/gravitino/cache/TestCacheConfig.java index 55c62a1ae92..3944d9a282b 100644 --- a/core/src/test/java/org/apache/gravitino/cache/TestCacheConfig.java +++ b/core/src/test/java/org/apache/gravitino/cache/TestCacheConfig.java @@ -21,19 +21,25 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.ImmutableMap; import java.time.Duration; import java.util.List; +import java.util.stream.IntStream; import org.apache.gravitino.Catalog; import org.apache.gravitino.Config; import org.apache.gravitino.Configs; import org.apache.gravitino.Entity; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; +import org.apache.gravitino.file.Fileset; import org.apache.gravitino.meta.AuditInfo; import org.apache.gravitino.meta.BaseMetalake; import org.apache.gravitino.meta.CatalogEntity; +import org.apache.gravitino.meta.FilesetEntity; import org.apache.gravitino.meta.SchemaEntity; import org.apache.gravitino.meta.SchemaVersion; +import org.apache.gravitino.meta.TagEntity; +import org.apache.gravitino.utils.NameIdentifierUtil; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -47,14 +53,127 @@ void testDefaultCacheConfig() { Assertions.assertTrue(config.get(Configs.CACHE_WEIGHER_ENABLED)); Assertions.assertEquals(10_000, config.get(Configs.CACHE_MAX_ENTRIES)); Assertions.assertEquals(3_600_000L, config.get(Configs.CACHE_EXPIRATION_TIME)); - Assertions.assertEquals(40_000_000L, EntityCacheWeigher.getMaxWeight()); + Assertions.assertEquals(9_000_000L, EntityCacheWeigher.getMaxWeight()); Assertions.assertEquals("caffeine", config.get(Configs.CACHE_IMPLEMENTATION)); } + @Test + void testPolicyAndTagCacheWeigher() throws InterruptedException { + Caffeine builder = Caffeine.newBuilder(); + builder.maximumWeight(2000); + builder.weigher(EntityCacheWeigher.getInstance()); + Cache> cache = builder.build(); + + BaseMetalake baseMetalake = + BaseMetalake.builder() + .withName("metalake1") + .withId(1L) + .withVersion(SchemaVersion.V_0_1) + .withAuditInfo(AuditInfo.EMPTY) + .build(); + cache.put( + EntityCacheRelationKey.of(NameIdentifier.of("metalake1"), Entity.EntityType.METALAKE), + List.of(baseMetalake)); + CatalogEntity catalogEntity = + CatalogEntity.builder() + .withNamespace(Namespace.of("metalake1")) + .withName("catalog1") + .withProvider("provider") + .withAuditInfo(AuditInfo.EMPTY) + .withId(100L) + .withType(Catalog.Type.RELATIONAL) + .build(); + cache.put( + EntityCacheRelationKey.of( + NameIdentifier.of(new String[] {"metalake1", "catalog1"}), Entity.EntityType.CATALOG), + List.of(catalogEntity)); + + SchemaEntity schemaEntity = + SchemaEntity.builder() + .withNamespace(Namespace.of("metalake1", "catalog1")) + .withName("schema1") + .withAuditInfo(AuditInfo.EMPTY) + .withId(1000L) + .build(); + cache.put( + EntityCacheRelationKey.of( + NameIdentifier.of(new String[] {"metalake1", "catalog1", "schema1"}), + Entity.EntityType.SCHEMA), + List.of(schemaEntity)); + + for (int i = 0; i < 5; i++) { + String filesetName = "fileset" + i; + FilesetEntity fileset = + FilesetEntity.builder() + .withNamespace(Namespace.of("metalake1", "catalog1", "schema1")) + .withName(filesetName) + .withAuditInfo(AuditInfo.EMPTY) + .withStorageLocations(ImmutableMap.of("default", "s3://bucket/path")) + .withId((long) (i + 1) * 10_000) + .withFilesetType(Fileset.Type.MANAGED) + .build(); + cache.put( + EntityCacheRelationKey.of( + NameIdentifier.of(new String[] {"metalake1", "catalog1", "schema1", filesetName}), + Entity.EntityType.FILESET), + List.of(fileset)); + } + + for (int i = 0; i < 10; i++) { + String tagName = "tag" + i; + NameIdentifier tagNameIdent = NameIdentifierUtil.ofTag("metalake", tagName); + TagEntity tagEntity = + TagEntity.builder() + .withNamespace(tagNameIdent.namespace()) + .withName(tagName) + .withAuditInfo(AuditInfo.EMPTY) + .withId((long) (i + 1) * 100_000) + .build(); + cache.put(EntityCacheRelationKey.of(tagNameIdent, Entity.EntityType.TAG), List.of(tagEntity)); + } + + // The weight of the cache has exceeded 2000, some entities will be evicted if we continue to + // add fileset entities. + for (int i = 5; i < 15; i++) { + String filesetName = "fileset" + i; + FilesetEntity fileset = + FilesetEntity.builder() + .withNamespace(Namespace.of("metalake1", "catalog1", "schema1")) + .withName(filesetName) + .withAuditInfo(AuditInfo.EMPTY) + .withStorageLocations(ImmutableMap.of("default", "s3://bucket/path")) + .withId((long) (i + 1) * 10_000) + .withFilesetType(Fileset.Type.MANAGED) + .build(); + cache.put( + EntityCacheRelationKey.of( + NameIdentifier.of(new String[] {"metalake1", "catalog1", "schema1", filesetName}), + Entity.EntityType.FILESET), + List.of(fileset)); + } + + Thread.sleep(1000); + + // There should no tag entities in the cache, because the weight of each tag entity is 100 that + // is higher than the maximum weight of the fileset entity which is 200. + Awaitility.await() + .atMost(Duration.ofSeconds(5)) + .pollInterval(Duration.ofMillis(10)) + .until( + () -> + IntStream.of(0, 1, 2, 3) + .mapToObj(i -> NameIdentifierUtil.ofTag("metalake", "tag" + i)) + .allMatch( + tagNameIdent -> + cache.getIfPresent( + EntityCacheRelationKey.of(tagNameIdent, Entity.EntityType.TAG)) + == null)); + } + @Test void testCaffeineCacheWithWeight() throws Exception { Caffeine builder = Caffeine.newBuilder(); - builder.maximumWeight(500); + builder.maximumWeight(5000); builder.weigher(EntityCacheWeigher.getInstance()); Cache> cache = builder.build(); @@ -121,11 +240,11 @@ void testCaffeineCacheWithWeight() throws Exception { NameIdentifier.of("metalake1.catalog" + i), Entity.EntityType.CATALOG))); } - // Only some of the 100 schemas are still in the cache, to be exact, 500 / 10 = 50 schemas. + // Only some of the 100 schemas are still in the cache, to be exact, 5000 / 500 = 10 schemas. Awaitility.await() .atMost(Duration.ofSeconds(5)) .pollInterval(Duration.ofMillis(10)) - .until(() -> cache.asMap().size() == 10 + 3 + 500 / 10); + .until(() -> cache.asMap().size() == 10 + 3 + 5000 / 500); } @Test diff --git a/lance/build.gradle.kts b/lance/build.gradle.kts new file mode 100644 index 00000000000..fa6eb7d5ef9 --- /dev/null +++ b/lance/build.gradle.kts @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +tasks.all { + enabled = false +} diff --git a/lance/lance-common/build.gradle.kts b/lance/lance-common/build.gradle.kts new file mode 100644 index 00000000000..5048d274f66 --- /dev/null +++ b/lance/lance-common/build.gradle.kts @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +description = "lance-common" + +plugins { + `maven-publish` + id("java") + id("idea") +} + +dependencies { + implementation(project(":api")) + implementation(project(":catalogs:catalog-common")) + implementation(project(":common")) { + exclude("*") + } + implementation(project(":core")) { + exclude("*") + } + + implementation(libs.guava) + implementation(libs.commons.lang3) + implementation(libs.slf4j.api) + + testImplementation(libs.junit.jupiter.api) + testRuntimeOnly(libs.junit.jupiter.engine) +} diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/config/LanceConfig.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/config/LanceConfig.java new file mode 100644 index 00000000000..f2d7e748cf8 --- /dev/null +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/config/LanceConfig.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.lance.common.config; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.Config; +import org.apache.gravitino.OverwriteDefaultConfig; +import org.apache.gravitino.config.ConfigBuilder; +import org.apache.gravitino.config.ConfigConstants; +import org.apache.gravitino.config.ConfigEntry; + +/** Base Lance REST configuration. */ +public class LanceConfig extends Config implements OverwriteDefaultConfig { + + public static final String LANCE_CONFIG_PREFIX = "gravitino.lance-rest."; + + public static final int DEFAULT_LANCE_REST_SERVICE_HTTP_PORT = 9101; + public static final int DEFAULT_LANCE_REST_SERVICE_HTTPS_PORT = 9533; + + public static final ConfigEntry CATALOG_NAME = + new ConfigBuilder(LANCE_CONFIG_PREFIX + "catalog-name") + .doc("Logical Lance catalog served by the REST endpoint") + .version(ConfigConstants.VERSION_0_1_0) + .stringConf() + .createWithDefault("default"); + + public LanceConfig(Map properties) { + super(false); + loadFromMap(properties, key -> true); + } + + public LanceConfig() { + super(false); + } + + public String getCatalogName() { + return get(CATALOG_NAME); + } + + @Override + public Map getOverwriteDefaultConfig() { + return ImmutableMap.of( + ConfigConstants.WEBSERVER_HTTP_PORT, + String.valueOf(DEFAULT_LANCE_REST_SERVICE_HTTP_PORT), + ConfigConstants.WEBSERVER_HTTPS_PORT, + String.valueOf(DEFAULT_LANCE_REST_SERVICE_HTTPS_PORT)); + } +} diff --git a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceCatalogService.java b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceCatalogService.java new file mode 100644 index 00000000000..67dd4c2d226 --- /dev/null +++ b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceCatalogService.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.lance.common.ops; + +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.lance.common.config.LanceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thin placeholder that will later bridge Lance catalog metadata into Gravitino. + * + *

    The current implementation keeps an in-memory catalog view so the REST surface mirrors the + * Iceberg catalog experience while the Lance integration is built out for real. + */ +public class LanceCatalogService implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(LanceCatalogService.class); + + private final LanceConfig config; + private final ConcurrentMap namespaces; + + public LanceCatalogService(LanceConfig config) { + this.config = config; + this.namespaces = new ConcurrentHashMap<>(); + seedSampleMetadata(); + } + + public String catalogName() { + return config.getCatalogName(); + } + + public boolean namespaceExists(String namespace) { + return namespaces.containsKey(namespace); + } + + public Map> listNamespaces() { + Map> result = new ConcurrentHashMap<>(); + namespaces.forEach( + (name, state) -> + result.put( + name, Collections.unmodifiableMap(new ConcurrentHashMap<>(state.properties)))); + return Map.copyOf(result); + } + + public List listNamespaceNames() { + return namespaces.keySet().stream() + .sorted(Comparator.naturalOrder()) + .collect(Collectors.toUnmodifiableList()); + } + + public NamespaceListingResult listChildNamespaces( + String parentId, String delimiter, String pageToken, Integer limit) { + String normalizedParent = StringUtils.trimToEmpty(parentId); + String effectiveDelimiter = StringUtils.isBlank(delimiter) ? "$" : delimiter; + + List sortedNamespaces = listNamespaceNames(); + List filtered = filterChildren(sortedNamespaces, normalizedParent, effectiveDelimiter); + + int startingOffset = parsePageToken(pageToken, filtered.size()); + int pageLimit = limit == null ? filtered.size() : validatePositiveLimit(limit, filtered.size()); + int endIndex = Math.min(filtered.size(), startingOffset + pageLimit); + + List page = filtered.subList(startingOffset, endIndex); + String nextToken = endIndex < filtered.size() ? String.valueOf(endIndex) : null; + return new NamespaceListingResult(normalizedParent, effectiveDelimiter, page, nextToken); + } + + public boolean createNamespace(String namespace) { + if (StringUtils.isBlank(namespace)) { + throw new IllegalArgumentException("Namespace must be non-empty"); + } + NamespaceState state = new NamespaceState(Collections.emptyMap()); + NamespaceState existing = namespaces.putIfAbsent(namespace, state); + if (existing == null) { + LOG.info("Created Lance namespace {}", namespace); + return true; + } + return false; + } + + public boolean dropNamespace(String namespace) { + NamespaceState state = namespaces.get(namespace); + if (state == null) { + return false; + } + if (!state.tables.isEmpty()) { + LOG.info("Refusing to drop Lance namespace {} because it still owns tables", namespace); + return false; + } + boolean removed = namespaces.remove(namespace, state); + if (removed) { + LOG.info("Dropped Lance namespace {}", namespace); + } + return removed; + } + + public List listTables(String namespace) { + NamespaceState state = namespaces.get(namespace); + if (state == null) { + throw new IllegalArgumentException("Unknown namespace: " + namespace); + } + return state.tables.keySet().stream() + .sorted(Comparator.naturalOrder()) + .collect(Collectors.toUnmodifiableList()); + } + + public Optional> loadTable(String namespace, String table) { + NamespaceState state = namespaces.get(namespace); + if (state == null) { + return Optional.empty(); + } + LanceTableEntry tableEntry = state.tables.get(table); + if (tableEntry == null) { + return Optional.empty(); + } + return Optional.of(tableEntry.describe()); + } + + public TableListingResult listTables( + String namespaceId, String delimiter, String pageToken, Integer limit) { + String normalizedNamespace = StringUtils.trimToEmpty(namespaceId); + if (StringUtils.isBlank(normalizedNamespace)) { + throw new IllegalArgumentException("Namespace id must be provided"); + } + + String effectiveDelimiter = StringUtils.isBlank(delimiter) ? "$" : delimiter; + + NamespaceState state = namespaces.get(normalizedNamespace); + if (state == null) { + throw new NoSuchElementException("Unknown namespace: " + normalizedNamespace); + } + + List sortedTables = + state.tables.keySet().stream() + .sorted(Comparator.naturalOrder()) + .collect(Collectors.toList()); + + int startingOffset = parsePageToken(pageToken, sortedTables.size()); + int pageLimit = + limit == null ? sortedTables.size() : validatePositiveLimit(limit, sortedTables.size()); + int endIndex = Math.min(sortedTables.size(), startingOffset + pageLimit); + + List page = sortedTables.subList(startingOffset, endIndex); + String nextToken = endIndex < sortedTables.size() ? String.valueOf(endIndex) : null; + + return new TableListingResult(normalizedNamespace, effectiveDelimiter, page, nextToken); + } + + @Override + public void close() { + namespaces.clear(); + } + + private void seedSampleMetadata() { + NamespaceState defaultNamespace = + namespaces.computeIfAbsent("default", key -> new NamespaceState(Collections.emptyMap())); + defaultNamespace.tables.put( + "sample_table", + new LanceTableEntry( + "sample_table", + "default", + ImmutableMap.of( + "format", "lance", + "uri", "file:///tmp/sample_table.lance", + "summary", "Placeholder Lance table metadata"))); + } + + private static final class NamespaceState { + private final Map properties; + private final ConcurrentMap tables; + + NamespaceState(Map properties) { + this.properties = new ConcurrentHashMap<>(properties); + this.tables = new ConcurrentHashMap<>(); + } + } + + private static final class LanceTableEntry { + private final String name; + private final String namespace; + private final Map metadata; + + LanceTableEntry(String name, String namespace, Map metadata) { + this.name = name; + this.namespace = namespace; + this.metadata = new ConcurrentHashMap<>(metadata); + } + + Map describe() { + Map result = new ConcurrentHashMap<>(metadata); + result.put("name", name); + result.put("namespace", namespace); + return Collections.unmodifiableMap(result); + } + } + + private List filterChildren(List namespaces, String parentId, String delimiter) { + boolean rootRequest = StringUtils.isBlank(parentId) || "root".equalsIgnoreCase(parentId); + if (rootRequest) { + return namespaces; + } + + String parentPrefix = parentId + delimiter; + return namespaces.stream() + .filter(ns -> ns.startsWith(parentPrefix)) + .map( + ns -> { + String remainder = ns.substring(parentPrefix.length()); + int nextDelimiter = remainder.indexOf(delimiter); + if (nextDelimiter >= 0) { + return remainder.substring(0, nextDelimiter); + } + return remainder; + }) + .filter(child -> !child.isEmpty()) + .distinct() + .sorted(Comparator.naturalOrder()) + .collect(Collectors.toUnmodifiableList()); + } + + private int parsePageToken(String pageToken, int size) { + if (StringUtils.isBlank(pageToken)) { + return 0; + } + try { + int parsed = Integer.parseInt(pageToken); + if (parsed < 0 || parsed > size) { + throw new IllegalArgumentException("Invalid page_token value"); + } + return parsed; + } catch (NumberFormatException nfe) { + throw new IllegalArgumentException("Invalid page_token value", nfe); + } + } + + private int validatePositiveLimit(int limit, int size) { + if (limit <= 0) { + throw new IllegalArgumentException("limit must be greater than 0"); + } + return Math.min(limit, Math.max(size, 0)); + } + + public static final class NamespaceListingResult { + private final String parentId; + private final String delimiter; + private final List namespaces; + private final String nextPageToken; + + NamespaceListingResult( + String parentId, String delimiter, List namespaces, String nextPageToken) { + this.parentId = parentId; + this.delimiter = delimiter; + this.namespaces = List.copyOf(namespaces); + this.nextPageToken = nextPageToken; + } + + public String getParentId() { + return parentId; + } + + public String getDelimiter() { + return delimiter; + } + + public List getNamespaces() { + return namespaces; + } + + public Optional getNextPageToken() { + return Optional.ofNullable(nextPageToken); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof NamespaceListingResult)) { + return false; + } + NamespaceListingResult that = (NamespaceListingResult) o; + return Objects.equals(parentId, that.parentId) + && Objects.equals(delimiter, that.delimiter) + && Objects.equals(namespaces, that.namespaces) + && Objects.equals(nextPageToken, that.nextPageToken); + } + + @Override + public int hashCode() { + return Objects.hash(parentId, delimiter, namespaces, nextPageToken); + } + } + + public static final class TableListingResult { + private final String namespaceId; + private final String delimiter; + private final List tables; + private final String nextPageToken; + + TableListingResult( + String namespaceId, String delimiter, List tables, String nextPageToken) { + this.namespaceId = namespaceId; + this.delimiter = delimiter; + this.tables = List.copyOf(tables); + this.nextPageToken = nextPageToken; + } + + public String getNamespaceId() { + return namespaceId; + } + + public String getDelimiter() { + return delimiter; + } + + public List getTables() { + return tables; + } + + public Optional getNextPageToken() { + return Optional.ofNullable(nextPageToken); + } + } +} diff --git a/lance/lance-rest-server/build.gradle.kts b/lance/lance-rest-server/build.gradle.kts new file mode 100644 index 00000000000..03376095935 --- /dev/null +++ b/lance/lance-rest-server/build.gradle.kts @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +description = "lance-rest-server" + +plugins { + `maven-publish` + id("java") + id("idea") +} + +dependencies { + implementation(project(":api")) + implementation(project(":catalogs:catalog-common")) + implementation(project(":common")) { + exclude("*") + } + implementation(project(":core")) { + exclude("*") + } + implementation(project(":server-common")) { + exclude("*") + } + implementation(project(":lance:lance-common")) + + implementation(libs.bundles.jetty) + implementation(libs.bundles.jersey) + implementation(libs.bundles.log4j) + implementation(libs.bundles.metrics) + implementation(libs.bundles.prometheus) + implementation(libs.metrics.jersey2) + implementation(libs.guava) + implementation(libs.jackson.annotations) + implementation(libs.jackson.databind) + implementation(libs.jackson.datatype.jdk8) + implementation(libs.jackson.datatype.jsr310) + + testImplementation(libs.junit.jupiter.api) + testRuntimeOnly(libs.junit.jupiter.engine) +} + +tasks { + val copyDepends by registering(Copy::class) { + from(configurations.runtimeClasspath) + into("build/libs") + } + + jar { + finalizedBy(copyDepends) + } + + register("copyLibs", Copy::class) { + dependsOn(copyDepends, "build") + from("build/libs") + into("$rootDir/distribution/package/lance-rest-server/libs") + } + + register("copyLibsToStandalonePackage", Copy::class) { + dependsOn(copyDepends, "build") + from("build/libs") + into("$rootDir/distribution/gravitino-lance-rest-server/libs") + } + + register("copyLibAndConfigs", Copy::class) { + dependsOn("copyLibs") + } + + register("copyLibAndConfigsToStandalonePackage", Copy::class) { + dependsOn("copyLibsToStandalonePackage") + } + + named("generateMetadataFileForMavenJavaPublication") { + dependsOn(copyDepends) + } +} diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/LanceRESTService.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/LanceRESTService.java new file mode 100644 index 00000000000..e85dc37b4a3 --- /dev/null +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/LanceRESTService.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.lance; + +import java.util.Map; +import javax.servlet.Servlet; +import org.apache.gravitino.auxiliary.GravitinoAuxiliaryService; +import org.apache.gravitino.lance.common.config.LanceConfig; +import org.apache.gravitino.lance.common.ops.LanceCatalogService; +import org.apache.gravitino.lance.service.rest.LanceNamespaceOperations; +import org.apache.gravitino.server.web.JettyServer; +import org.apache.gravitino.server.web.JettyServerConfig; +import org.glassfish.jersey.jackson.JacksonFeature; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.servlet.ServletContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Thin REST service shell for Lance metadata. */ +public class LanceRESTService implements GravitinoAuxiliaryService { + + private static final Logger LOG = LoggerFactory.getLogger(LanceRESTService.class); + + public static final String SERVICE_NAME = "lance-rest"; + public static final String LANCE_SPEC = "/lance/*"; + + private JettyServer server; + private LanceCatalogService catalogService; + + @Override + public String shortName() { + return SERVICE_NAME; + } + + @Override + public void serviceInit(Map properties) { + LanceConfig lanceConfig = new LanceConfig(properties); + JettyServerConfig serverConfig = JettyServerConfig.fromConfig(lanceConfig); + + server = new JettyServer(); + server.initialize(serverConfig, SERVICE_NAME, false); + + catalogService = new LanceCatalogService(lanceConfig); + + ResourceConfig resourceConfig = new ResourceConfig(); + resourceConfig.register(JacksonFeature.class); + resourceConfig.register(new LanceNamespaceOperations(catalogService)); + + Servlet container = new ServletContainer(resourceConfig); + server.addServlet(container, LANCE_SPEC); + server.addCustomFilters(LANCE_SPEC); + server.addSystemFilters(LANCE_SPEC); + + LOG.info("Initialized Lance REST service for catalog {}", lanceConfig.getCatalogName()); + } + + @Override + public void serviceStart() { + if (server != null) { + server.start(); + LOG.info("Lance REST service started"); + } + } + + @Override + public void serviceStop() throws Exception { + if (server != null) { + server.stop(); + LOG.info("Lance REST service stopped"); + } + if (catalogService != null) { + catalogService.close(); + } + } + + public void join() { + if (server != null) { + server.join(); + } + } +} diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/server/GravitinoLanceRESTServer.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/server/GravitinoLanceRESTServer.java new file mode 100644 index 00000000000..e28bdd5c175 --- /dev/null +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/server/GravitinoLanceRESTServer.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.lance.server; + +import org.apache.gravitino.Config; +import org.apache.gravitino.GravitinoEnv; +import org.apache.gravitino.lance.LanceRESTService; +import org.apache.gravitino.lance.common.config.LanceConfig; +import org.apache.gravitino.server.ServerConfig; +import org.apache.gravitino.server.authentication.ServerAuthenticator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Bootstrap entry point for the Lance REST facade. */ +public class GravitinoLanceRESTServer { + + private static final Logger LOG = LoggerFactory.getLogger(GravitinoLanceRESTServer.class); + + public static final String CONF_FILE = "gravitino-lance-rest-server.conf"; + + private final Config serverConfig; + + private LanceRESTService lanceRESTService; + private GravitinoEnv gravitinoEnv; + + public GravitinoLanceRESTServer(Config config) { + this.serverConfig = config; + this.gravitinoEnv = GravitinoEnv.getInstance(); + this.lanceRESTService = new LanceRESTService(); + } + + private void initialize() { + gravitinoEnv.initializeBaseComponents(serverConfig); + lanceRESTService.serviceInit( + serverConfig.getConfigsWithPrefix(LanceConfig.LANCE_CONFIG_PREFIX)); + ServerAuthenticator.getInstance().initialize(serverConfig); + } + + private void start() { + gravitinoEnv.start(); + lanceRESTService.serviceStart(); + } + + private void join() { + lanceRESTService.join(); + } + + private void stop() throws Exception { + lanceRESTService.serviceStop(); + LOG.info("Gravitino Lance REST service stopped"); + } + + public static void main(String[] args) { + LOG.info("Starting Gravitino Lance REST Server"); + String confPath = System.getenv("GRAVITINO_TEST") == null ? "" : args[0]; + ServerConfig serverConfig = ServerConfig.loadConfig(confPath, CONF_FILE); + GravitinoLanceRESTServer lanceRESTServer = new GravitinoLanceRESTServer(serverConfig); + lanceRESTServer.initialize(); + + try { + lanceRESTServer.start(); + } catch (Exception e) { + LOG.error("Error while running lance REST server", e); + System.exit(-1); + } + LOG.info("Done, Gravitino Lance REST server started."); + + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + try { + Thread.sleep(serverConfig.get(ServerConfig.SERVER_SHUTDOWN_TIMEOUT)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Interrupted exception:", e); + } catch (Exception e) { + LOG.error("Error while running clean-up tasks in shutdown hook", e); + } + })); + lanceRESTServer.join(); + + LOG.info("Shutting down Gravitino Lance REST Server ... "); + try { + lanceRESTServer.stop(); + LOG.info("Gravitino Lance REST Server has shut down."); + } catch (Exception e) { + LOG.error("Error while stopping Gravitino Lance REST Server", e); + } + } +} diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceListNamespacesResponse.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceListNamespacesResponse.java new file mode 100644 index 00000000000..11ec7d3c3c5 --- /dev/null +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceListNamespacesResponse.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.lance.service.rest; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +@JsonInclude(JsonInclude.Include.NON_NULL) +public class LanceListNamespacesResponse { + + @JsonProperty("id") + private final String id; + + @JsonProperty("delimiter") + private final String delimiter; + + @JsonProperty("namespaces") + private final List namespaces; + + @JsonProperty("next_page_token") + private final String nextPageToken; + + public LanceListNamespacesResponse( + String id, String delimiter, List namespaces, String nextPageToken) { + this.id = id; + this.delimiter = delimiter; + this.namespaces = List.copyOf(namespaces); + this.nextPageToken = nextPageToken; + } + + public String getId() { + return id; + } + + public String getDelimiter() { + return delimiter; + } + + public List getNamespaces() { + return namespaces; + } + + public String getNextPageToken() { + return nextPageToken; + } +} diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceListTablesResponse.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceListTablesResponse.java new file mode 100644 index 00000000000..82e2a909787 --- /dev/null +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceListTablesResponse.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.lance.service.rest; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +@JsonInclude(JsonInclude.Include.NON_NULL) +public class LanceListTablesResponse { + + @JsonProperty("id") + private final String namespaceId; + + @JsonProperty("delimiter") + private final String delimiter; + + @JsonProperty("tables") + private final List tables; + + @JsonProperty("next_page_token") + private final String nextPageToken; + + public LanceListTablesResponse( + String namespaceId, String delimiter, List tables, String nextPageToken) { + this.namespaceId = namespaceId; + this.delimiter = delimiter; + this.tables = List.copyOf(tables); + this.nextPageToken = nextPageToken; + } + + public String getNamespaceId() { + return namespaceId; + } + + public String getDelimiter() { + return delimiter; + } + + public List getTables() { + return tables; + } + + public String getNextPageToken() { + return nextPageToken; + } +} diff --git a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java new file mode 100644 index 00000000000..0ac9457eff9 --- /dev/null +++ b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.lance.service.rest; + +import java.util.NoSuchElementException; +import javax.ws.rs.BadRequestException; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.Encoded; +import javax.ws.rs.GET; +import javax.ws.rs.NotFoundException; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.gravitino.lance.common.ops.LanceCatalogService; + +@Path("/v1/namespace") +@Produces(MediaType.APPLICATION_JSON) +public class LanceNamespaceOperations { + + private final LanceCatalogService catalogService; + + public LanceNamespaceOperations(LanceCatalogService catalogService) { + this.catalogService = catalogService; + } + + @GET + @Path("/{id}/list") + public Response listNamespaces( + @Encoded @PathParam("id") String namespaceId, + @DefaultValue("$") @QueryParam("delimiter") String delimiter, + @QueryParam("page_token") String pageToken, + @QueryParam("limit") Integer limit) { + try { + LanceCatalogService.NamespaceListingResult result = + catalogService.listChildNamespaces(namespaceId, delimiter, pageToken, limit); + LanceListNamespacesResponse payload = + new LanceListNamespacesResponse( + result.getParentId(), + result.getDelimiter(), + result.getNamespaces(), + result.getNextPageToken().orElse(null)); + return Response.ok(payload).build(); + } catch (NoSuchElementException nse) { + throw new NotFoundException(nse.getMessage(), nse); + } catch (IllegalArgumentException iae) { + throw new BadRequestException(iae.getMessage(), iae); + } + } + + @GET + @Path("/{id}/table/list") + public Response listTables( + @Encoded @PathParam("id") String namespaceId, + @DefaultValue("$") @QueryParam("delimiter") String delimiter, + @QueryParam("page_token") String pageToken, + @QueryParam("limit") Integer limit) { + try { + LanceCatalogService.TableListingResult result = + catalogService.listTables(namespaceId, delimiter, pageToken, limit); + LanceListTablesResponse payload = + new LanceListTablesResponse( + result.getNamespaceId(), + result.getDelimiter(), + result.getTables(), + result.getNextPageToken().orElse(null)); + return Response.ok(payload).build(); + } catch (NoSuchElementException nse) { + throw new NotFoundException(nse.getMessage(), nse); + } catch (IllegalArgumentException iae) { + throw new BadRequestException(iae.getMessage(), iae); + } + } +} diff --git a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java index e435bac4a1c..874424feb27 100644 --- a/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java +++ b/server-common/src/main/java/org/apache/gravitino/server/authorization/MetadataFilterHelper.java @@ -156,6 +156,35 @@ public static E[] filterByExpression( Entity.EntityType entityType, E[] entities, Function toNameIdentifier) { + GravitinoAuthorizer authorizer = + GravitinoAuthorizerProvider.getInstance().getGravitinoAuthorizer(); + Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal(); + return filterByExpression( + metalake, expression, entityType, entities, toNameIdentifier, currentPrincipal, authorizer); + } + + /** + * Call {@link AuthorizationExpressionEvaluator} and use specified Principal and + * GravitinoAuthorizer to filter the metadata list + * + * @param metalake metalake name + * @param expression authorization expression + * @param entityType entity type + * @param entities metadata entities + * @param toNameIdentifier function to convert entity to NameIdentifier + * @param currentPrincipal current principal + * @param authorizer authorizer to filter metadata + * @return Filtered Metadata Entity + * @param Entity class + */ + public static E[] filterByExpression( + String metalake, + String expression, + Entity.EntityType entityType, + E[] entities, + Function toNameIdentifier, + Principal currentPrincipal, + GravitinoAuthorizer authorizer) { if (!enableAuthorization()) { return entities; } @@ -163,7 +192,6 @@ public static E[] filterByExpression( AuthorizationRequestContext authorizationRequestContext = new AuthorizationRequestContext(); List> futures = new ArrayList<>(); for (E entity : entities) { - Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal(); futures.add( CompletableFuture.supplyAsync( () -> { @@ -172,7 +200,7 @@ public static E[] filterByExpression( currentPrincipal, () -> { AuthorizationExpressionEvaluator authorizationExpressionEvaluator = - new AuthorizationExpressionEvaluator(expression); + new AuthorizationExpressionEvaluator(expression, authorizer); NameIdentifier nameIdentifier = toNameIdentifier.apply(entity); Map nameIdentifierMap = spiltMetadataNames(metalake, entityType, nameIdentifier); diff --git a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java index 78ddf58d44d..02c455e887b 100644 --- a/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java +++ b/server-common/src/main/java/org/apache/gravitino/server/authorization/expression/AuthorizationExpressionEvaluator.java @@ -39,6 +39,7 @@ public class AuthorizationExpressionEvaluator { private final String ognlAuthorizationExpression; + private final GravitinoAuthorizer authorizer; /** * Use {@link AuthorizationExpressionConverter} to convert the authorization expression into an @@ -47,8 +48,19 @@ public class AuthorizationExpressionEvaluator { * @param expression authorization expression */ public AuthorizationExpressionEvaluator(String expression) { + this(expression, GravitinoAuthorizerProvider.getInstance().getGravitinoAuthorizer()); + } + + /** + * Constructor of AuthorizationExpressionEvaluator + * + * @param expression authorization expression + * @param authorizer GravitinoAuthorizer instance + */ + public AuthorizationExpressionEvaluator(String expression, GravitinoAuthorizer authorizer) { this.ognlAuthorizationExpression = AuthorizationExpressionConverter.convertToOgnlExpression(expression); + this.authorizer = authorizer; } /** @@ -61,7 +73,24 @@ public AuthorizationExpressionEvaluator(String expression) { public boolean evaluate( Map metadataNames, AuthorizationRequestContext requestContext) { - return evaluate(metadataNames, new HashMap<>(), requestContext); + Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal(); + return evaluate(metadataNames, new HashMap<>(), requestContext, currentPrincipal); + } + + /** + * Use OGNL expressions to invoke GravitinoAuthorizer for authorizing multiple types of metadata + * IDs. + * + * @param metadataNames key-metadata type, value-metadata NameIdentifier + * @param requestContext authorization request context + * @param principal current principal + * @return authorization result + */ + public boolean evaluate( + Map metadataNames, + AuthorizationRequestContext requestContext, + Principal principal) { + return evaluate(metadataNames, new HashMap<>(), requestContext, principal); } /** @@ -77,11 +106,27 @@ public boolean evaluate( Map pathParams, AuthorizationRequestContext requestContext) { Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal(); - GravitinoAuthorizer gravitinoAuthorizer = - GravitinoAuthorizerProvider.getInstance().getGravitinoAuthorizer(); + return evaluate(metadataNames, pathParams, requestContext, currentPrincipal); + } + + /** + * Use OGNL expressions to invoke GravitinoAuthorizer for authorizing multiple types of metadata + * IDs. + * + * @param metadataNames key-metadata type, value-metadata NameIdentifier + * @param pathParams params from request path + * @param requestContext authorization request context + * @param currentPrincipal current principal + * @return authorization result + */ + private boolean evaluate( + Map metadataNames, + Map pathParams, + AuthorizationRequestContext requestContext, + Principal currentPrincipal) { OgnlContext ognlContext = Ognl.createDefaultContext(null); ognlContext.put("principal", currentPrincipal); - ognlContext.put("authorizer", gravitinoAuthorizer); + ognlContext.put("authorizer", authorizer); ognlContext.put("authorizationContext", requestContext); ognlContext.putAll(pathParams); metadataNames.forEach( diff --git a/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionEvaluator.java b/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionEvaluator.java index 59def1e5365..ed3f291dc87 100644 --- a/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionEvaluator.java +++ b/server-common/src/test/java/org/apache/gravitino/server/authorization/expression/TestAuthorizationExpressionEvaluator.java @@ -42,8 +42,6 @@ public class TestAuthorizationExpressionEvaluator { public void testEvaluator() { String expression = "CATALOG::USE_CATALOG && SCHEMA::USE_SCHEMA && (TABLE::SELECT_TABLE || TABLE::MODIFY_TABLE)"; - AuthorizationExpressionEvaluator authorizationExpressionEvaluator = - new AuthorizationExpressionEvaluator(expression); try (MockedStatic principalUtilsMocked = mockStatic(PrincipalUtils.class); MockedStatic mockStatic = mockStatic(GravitinoAuthorizerProvider.class)) { @@ -53,6 +51,9 @@ public void testEvaluator() { GravitinoAuthorizerProvider mockedProvider = mock(GravitinoAuthorizerProvider.class); mockStatic.when(GravitinoAuthorizerProvider::getInstance).thenReturn(mockedProvider); when(mockedProvider.getGravitinoAuthorizer()).thenReturn(new MockGravitinoAuthorizer()); + AuthorizationExpressionEvaluator authorizationExpressionEvaluator = + new AuthorizationExpressionEvaluator(expression); + Map metadataNames = new HashMap<>(); metadataNames.put(Entity.EntityType.METALAKE, NameIdentifierUtil.ofMetalake("testMetalake")); metadataNames.put( @@ -79,17 +80,19 @@ public void testEvaluator() { @Test public void testEvaluatorWithOwner() { String expression = "METALAKE::OWNER || CATALOG::CREATE_CATALOG"; - AuthorizationExpressionEvaluator authorizationExpressionEvaluator = - new AuthorizationExpressionEvaluator(expression); try (MockedStatic principalUtilsMocked = mockStatic(PrincipalUtils.class); MockedStatic mockStatic = mockStatic(GravitinoAuthorizerProvider.class)) { - principalUtilsMocked - .when(PrincipalUtils::getCurrentPrincipal) - .thenReturn(new UserPrincipal("tester")); GravitinoAuthorizerProvider mockedProvider = mock(GravitinoAuthorizerProvider.class); mockStatic.when(GravitinoAuthorizerProvider::getInstance).thenReturn(mockedProvider); when(mockedProvider.getGravitinoAuthorizer()).thenReturn(new MockGravitinoAuthorizer()); + + AuthorizationExpressionEvaluator authorizationExpressionEvaluator = + new AuthorizationExpressionEvaluator(expression); + principalUtilsMocked + .when(PrincipalUtils::getCurrentPrincipal) + .thenReturn(new UserPrincipal("tester")); + Map metadataNames = new HashMap<>(); metadataNames.put( Entity.EntityType.METALAKE, NameIdentifierUtil.ofMetalake("metalakeWithOutOwner")); diff --git a/settings.gradle.kts b/settings.gradle.kts index 21245ecf8bb..20c8e014cdb 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -58,6 +58,8 @@ if (gradle.startParameter.projectProperties["enableFuse"]?.toBoolean() == true) } include("iceberg:iceberg-common") include("iceberg:iceberg-rest-server") +include("lance:lance-common") +include("lance:lance-rest-server") include("authorizations:authorization-ranger", "authorizations:authorization-common", "authorizations:authorization-chain") include("trino-connector:trino-connector", "trino-connector:integration-test") include("spark-connector:spark-common")