diff --git a/pom.xml b/pom.xml index 0a04b958c6e1e..2b73d65744a1a 100644 --- a/pom.xml +++ b/pom.xml @@ -34,13 +34,14 @@ + 1.8 ${project.basedir} true true src/checkstyle/presto-checks.xml - 1.8.0-151 + 11 3.3.9 4.7.1 @@ -190,6 +191,7 @@ redis-hbo-provider presto-singlestore presto-hana + presto-lance @@ -2277,6 +2279,7 @@ org/joda/time/.* + 8 diff --git a/presto-lance/pom.xml b/presto-lance/pom.xml new file mode 100644 index 0000000000000..0186b545741bd --- /dev/null +++ b/presto-lance/pom.xml @@ -0,0 +1,181 @@ + + + 4.0.0 + + com.facebook.presto + presto-root + 0.287-SNAPSHOT + + + presto-lance + Presto - LanceDB Connector + presto-plugin + + + ${project.parent.basedir} + 15.0.0 + + + + + com.facebook.airlift + bootstrap + + + + com.facebook.airlift + configuration + + + + com.facebook.airlift + log + + + + com.google.guava + guava + + + + com.google.inject + guice + + + + + com.facebook.presto + presto-spi + provided + + + + com.facebook.presto + presto-common + provided + + + + io.airlift + slice + provided + + + + io.airlift + units + provided + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + com.fasterxml.jackson.core + jackson-core + + + + javax.inject + javax.inject + + + + javax.validation + validation-api + + + + com.lancedb + lance-core + 0.11.0-SNAPSHOT + + + org.slf4j + slf4j-api + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + + + + + com.lancedb + lancedb-core + 0.1-SNAPSHOT + + + + org.apache.arrow + arrow-memory-core + ${arrow.version} + + + org.slf4j + slf4j-api + + + + + + org.apache.arrow + arrow-vector + ${arrow.version} + + + org.slf4j + slf4j-api + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + + + + + + com.facebook.presto + presto-testng-services + test + + + org.testng + testng + test + + + com.facebook.presto + presto-tests + test + + + com.facebook.presto + presto-main + test + + + + + + + org.basepom.maven + duplicate-finder-maven-plugin + + + LICENSE-EDL-1.0.txt + LICENSE-EPL-1.0.txt + arrow-git.properties + about.html + + + + + + + diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/LanceConfig.java b/presto-lance/src/main/java/com/facebook/presto/lance/LanceConfig.java new file mode 100644 index 0000000000000..71fc6b0042d5c --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/LanceConfig.java @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance; + +import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.ConfigDescription; + +import javax.validation.constraints.NotNull; + +public class LanceConfig +{ + private String rootUrl = ""; + @NotNull + public String getRootUrl() + { + return rootUrl; + } + + @Config("lance.root-url") + @ConfigDescription("Lance root url") + public LanceConfig setRootUrl(String rootUrl) + { + this.rootUrl = rootUrl; + return this; + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/LanceConnector.java b/presto-lance/src/main/java/com/facebook/presto/lance/LanceConnector.java new file mode 100644 index 0000000000000..82493debbc78b --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/LanceConnector.java @@ -0,0 +1,211 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance; + +import com.facebook.presto.lance.metadata.LanceMetadata; +import com.facebook.presto.lance.metadata.LanceTransactionHandle; +import com.facebook.presto.lance.ingestion.LancePageSinkProvider; +import com.facebook.presto.lance.scan.LancePageSourceProvider; +import com.facebook.presto.lance.splits.LanceSplitManager; +import com.facebook.presto.spi.SystemTable; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorAccessControl; +import com.facebook.presto.spi.connector.ConnectorCapabilities; +import com.facebook.presto.spi.connector.ConnectorCommitHandle; +import com.facebook.presto.spi.connector.ConnectorIndexProvider; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.facebook.presto.spi.connector.ConnectorMetadataUpdaterProvider; +import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider; +import com.facebook.presto.spi.connector.ConnectorPageSinkProvider; +import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; +import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider; +import com.facebook.presto.spi.connector.ConnectorRecordSetProvider; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.spi.connector.ConnectorTypeSerdeProvider; +import com.facebook.presto.spi.procedure.Procedure; +import com.facebook.presto.spi.session.PropertyMetadata; +import com.facebook.presto.spi.transaction.IsolationLevel; +import com.google.inject.Inject; + +import java.util.List; +import java.util.Set; + +import static java.util.Objects.requireNonNull; + +public class LanceConnector + implements Connector +{ + private final LanceMetadata metadata; + private final LanceSplitManager splitManager; + private final LancePageSourceProvider pageSourceProvider; + private final LancePageSinkProvider pageSinkProvider; + + @Inject + public LanceConnector( + LanceMetadata metadata, + LanceSplitManager splitManager, + LancePageSourceProvider pageSourceProvider, + LancePageSinkProvider pageSinkProvider) + { + this.metadata = requireNonNull(metadata, "metadata is null"); + this.splitManager = requireNonNull(splitManager, "splitManager is null"); + this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null"); + this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvide is null"); + } + + @Override + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) + { + return LanceTransactionHandle.INSTANCE; + } + + @Override + public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) + { + return metadata; + } + + @Override + public ConnectorSplitManager getSplitManager() + { + return splitManager; + } + + @Override + public ConnectorPageSourceProvider getPageSourceProvider() + { + return pageSourceProvider; + } + + @Override + public ConnectorRecordSetProvider getRecordSetProvider() + { + return Connector.super.getRecordSetProvider(); + } + + @Override + public ConnectorPageSinkProvider getPageSinkProvider() + { + return pageSinkProvider; + } + + @Override + public ConnectorIndexProvider getIndexProvider() + { + return Connector.super.getIndexProvider(); + } + + @Override + public ConnectorNodePartitioningProvider getNodePartitioningProvider() + { + return Connector.super.getNodePartitioningProvider(); + } + + @Override + public ConnectorPlanOptimizerProvider getConnectorPlanOptimizerProvider() + { + return Connector.super.getConnectorPlanOptimizerProvider(); + } + + @Override + public ConnectorMetadataUpdaterProvider getConnectorMetadataUpdaterProvider() + { + return Connector.super.getConnectorMetadataUpdaterProvider(); + } + + @Override + public ConnectorTypeSerdeProvider getConnectorTypeSerdeProvider() + { + return Connector.super.getConnectorTypeSerdeProvider(); + } + + @Override + public Set getSystemTables() + { + return Connector.super.getSystemTables(); + } + + @Override + public Set getProcedures() + { + return Connector.super.getProcedures(); + } + + @Override + public List> getSessionProperties() + { + return Connector.super.getSessionProperties(); + } + + @Override + public List> getSchemaProperties() + { + return Connector.super.getSchemaProperties(); + } + + @Override + public List> getAnalyzeProperties() + { + return Connector.super.getAnalyzeProperties(); + } + + @Override + public List> getTableProperties() + { + return Connector.super.getTableProperties(); + } + + @Override + public List> getColumnProperties() + { + return Connector.super.getColumnProperties(); + } + + @Override + public ConnectorAccessControl getAccessControl() + { + return Connector.super.getAccessControl(); + } + + @Override + public ConnectorCommitHandle commit(ConnectorTransactionHandle transactionHandle) + { + return Connector.super.commit(transactionHandle); + } + + @Override + public void rollback(ConnectorTransactionHandle transactionHandle) + { + Connector.super.rollback(transactionHandle); + } + + @Override + public boolean isSingleStatementWritesOnly() + { + return Connector.super.isSingleStatementWritesOnly(); + } + + @Override + public void shutdown() + { + Connector.super.shutdown(); + } + + @Override + public Set getCapabilities() + { + return Connector.super.getCapabilities(); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/LanceConnectorFactory.java b/presto-lance/src/main/java/com/facebook/presto/lance/LanceConnectorFactory.java new file mode 100644 index 0000000000000..1e66371ceb946 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/LanceConnectorFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.PageIndexerFactory; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorContext; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.facebook.presto.spi.function.FunctionMetadataManager; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.plan.FilterStatsCalculatorService; +import com.facebook.presto.spi.relation.RowExpressionService; +import com.google.inject.Injector; + +import java.util.Map; + +public class LanceConnectorFactory + implements ConnectorFactory +{ + @Override + public String getName() + { + return "lance"; + } + + @Override + public ConnectorHandleResolver getHandleResolver() + { + return new LanceHandleResolver(); + } + + @Override + public Connector create(String catalogName, Map config, ConnectorContext context) + { + { + ClassLoader classLoader = LanceConnectorFactory.class.getClassLoader(); + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + Bootstrap app = new Bootstrap( + new LanceModule(), + binder -> { + binder.bind(NodeManager.class).toInstance(context.getNodeManager()); + binder.bind(TypeManager.class).toInstance(context.getTypeManager()); + binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory()); + binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution()); + binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager()); + binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService()); + binder.bind(FilterStatsCalculatorService.class).toInstance(context.getFilterStatsCalculatorService()); + }); + + Injector injector = app + .doNotInitializeLogging() + .setRequiredConfigurationProperties(config) + .initialize(); + + return injector.getInstance(LanceConnector.class); + } + } + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/LanceHandleResolver.java b/presto-lance/src/main/java/com/facebook/presto/lance/LanceHandleResolver.java new file mode 100644 index 0000000000000..a9f6c22fcbc72 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/LanceHandleResolver.java @@ -0,0 +1,96 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance; + +import com.facebook.presto.lance.ingestion.LanceIngestionTableHandle; +import com.facebook.presto.lance.metadata.LanceColumnHandle; +import com.facebook.presto.lance.metadata.LanceTableHandle; +import com.facebook.presto.lance.metadata.LanceTableLayoutHandle; +import com.facebook.presto.lance.metadata.LanceTransactionHandle; +import com.facebook.presto.lance.splits.LanceSplit; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.ConnectorIndexHandle; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorMetadataUpdateHandle; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorPartitioningHandle; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +public class LanceHandleResolver + implements ConnectorHandleResolver +{ + @Override + public Class getTableHandleClass() + { + return LanceTableHandle.class; + } + + @Override + public Class getTableLayoutHandleClass() + { + return LanceTableLayoutHandle.class; + } + + @Override + public Class getColumnHandleClass() + { + return LanceColumnHandle.class; + } + + @Override + public Class getSplitClass() + { + return LanceSplit.class; + } + + @Override + public Class getIndexHandleClass() + { + return ConnectorHandleResolver.super.getIndexHandleClass(); + } + + @Override + public Class getOutputTableHandleClass() + { + return LanceIngestionTableHandle.class; + } + + @Override + public Class getInsertTableHandleClass() + { + return LanceIngestionTableHandle.class; + } + + @Override + public Class getPartitioningHandleClass() + { + return ConnectorHandleResolver.super.getPartitioningHandleClass(); + } + + @Override + public Class getTransactionHandleClass() + { + return LanceTransactionHandle.class; + } + + @Override + public Class getMetadataUpdateHandleClass() + { + return ConnectorHandleResolver.super.getMetadataUpdateHandleClass(); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/LanceModule.java b/presto-lance/src/main/java/com/facebook/presto/lance/LanceModule.java new file mode 100644 index 0000000000000..fe5f6d20c9be0 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/LanceModule.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance; + +import com.facebook.presto.lance.client.LanceClient; +import com.facebook.presto.lance.ingestion.LancePageWriter; +import com.facebook.presto.lance.metadata.LanceMetadata; +import com.facebook.presto.lance.ingestion.LancePageSinkProvider; +import com.facebook.presto.lance.scan.LancePageSourceProvider; +import com.facebook.presto.lance.splits.LanceSplitManager; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; + +public class LanceModule + implements Module +{ + @Override + public void configure(Binder binder) + { + configBinder(binder).bindConfig(LanceConfig.class); + binder.bind(LanceClient.class).in(Scopes.SINGLETON); + binder.bind(LanceConnector.class).in(Scopes.SINGLETON); + binder.bind(LanceMetadata.class).in(Scopes.SINGLETON); + binder.bind(LanceHandleResolver.class).in(Scopes.SINGLETON); + binder.bind(LanceSplitManager.class).in(Scopes.SINGLETON); + binder.bind(LancePageSourceProvider.class).in(Scopes.SINGLETON); + binder.bind(LancePageSinkProvider.class).in(Scopes.SINGLETON); + binder.bind(LancePageWriter.class).in(Scopes.SINGLETON); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/LancePlugin.java b/presto-lance/src/main/java/com/facebook/presto/lance/LancePlugin.java new file mode 100644 index 0000000000000..882c3d88f271d --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/LancePlugin.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance; + +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.google.common.collect.ImmutableList; + +public class LancePlugin + implements Plugin +{ + @Override + public Iterable getConnectorFactories() + { + return ImmutableList.of(new LanceConnectorFactory()); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/client/LanceClient.java b/presto-lance/src/main/java/com/facebook/presto/lance/client/LanceClient.java new file mode 100644 index 0000000000000..a8a6481c61a5f --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/client/LanceClient.java @@ -0,0 +1,118 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.client; + +import com.facebook.presto.lance.LanceConfig; +import com.lancedb.lance.Dataset; +import com.lancedb.lance.DatasetFragment; +import com.lancedb.lance.FragmentMetadata; +import com.lancedb.lance.FragmentOperation; +import com.lancedb.lance.WriteParams; +import com.lancedb.lancedb.Connection; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.types.pojo.Schema; + +import javax.inject.Inject; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class LanceClient +{ + private final LanceConfig config; + private final Connection conn; + private final RootAllocator arrowRootAllocator; + + @Inject + public LanceClient(LanceConfig config) + { + this.config = requireNonNull(config, "config is null"); + conn = Connection.connect(config.getRootUrl()); + arrowRootAllocator = new RootAllocator(); + } + + public Connection getConn() + { + return conn; + } + + public List getFragments(String tableName) + { + try (Dataset dataset = Dataset.open(getTablePath(tableName), arrowRootAllocator)) { + return dataset.getFragments(); + } + } + + public RootAllocator getArrowRootAllocator() + { + return arrowRootAllocator; + } + + public void createTable(String tableName, Schema schema) + { + String tablePath = getTablePath(tableName); + //Create the directory for the table if it's on local file system + if (tablePath.startsWith("file:")) { + try { + new File(new URI(tablePath)).mkdir(); + } + catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + Dataset dataset = Dataset.create(arrowRootAllocator, tablePath, schema, new WriteParams.Builder().build()); + dataset.close(); + } + + public Schema getSchema(String tableName) + { + try (Dataset dataset = Dataset.open(getTablePath(tableName), arrowRootAllocator)) { + return dataset.getSchema(); + } + } + + public String getTablePath(String tableName) + { + return Paths.get(config.getRootUrl(), tableName + ".lance").toUri().toString(); + } + + public long appendAndCommit(String tableName, List fragmentMetadataList, long tableReadVersion) + { + FragmentOperation.Append appendOp = new FragmentOperation.Append(fragmentMetadataList); + try (Dataset dataset = Dataset.commit(arrowRootAllocator, getTablePath(tableName), appendOp, Optional.of(tableReadVersion))) { + return dataset.version(); + } + } + + public long getTableVersion(String tableName) + { + try (Dataset dataset = Dataset.open(getTablePath(tableName), arrowRootAllocator)) { + return dataset.version(); + } + } + + public Dataset open(String tableName) { + return Dataset.open(getTablePath(tableName), arrowRootAllocator); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/fragments/FragmentInfo.java b/presto-lance/src/main/java/com/facebook/presto/lance/fragments/FragmentInfo.java new file mode 100644 index 0000000000000..c43531e912161 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/fragments/FragmentInfo.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.fragments; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public final class FragmentInfo +{ + private final int fragmentId; + + @JsonCreator + public FragmentInfo( + @JsonProperty("id") int fragmentId) + { + this.fragmentId = fragmentId; + } + + @JsonProperty + public int getFragmentId() + { + return fragmentId; + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LanceIngestionTableHandle.java b/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LanceIngestionTableHandle.java new file mode 100644 index 0000000000000..3285f8f7d9def --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LanceIngestionTableHandle.java @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.ingestion; + +import com.facebook.presto.lance.metadata.LanceColumnInfo; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class LanceIngestionTableHandle + implements ConnectorInsertTableHandle, ConnectorOutputTableHandle +{ + private final String schemaName; + private final String tableName; + private final List columns; + + @JsonCreator + public LanceIngestionTableHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName, + @JsonProperty("columns") List columns) + { + this.schemaName = requireNonNull(schemaName, "schemaName is null"); + this.tableName = requireNonNull(tableName, "tableName is null"); + this.columns = requireNonNull(columns, "columns is null"); + } + + @JsonProperty + public String getSchemaName() + { + return schemaName; + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @JsonProperty + public List getColumns() + { + return columns; + } + + @Override + public int hashCode() + { + return Objects.hash(schemaName, tableName); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + LanceIngestionTableHandle other = (LanceIngestionTableHandle) obj; + return Objects.equals(this.schemaName, other.schemaName) && + Objects.equals(this.tableName, other.tableName); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("schemaName", schemaName) + .add("tableName", tableName) + .toString(); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LancePageSink.java b/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LancePageSink.java new file mode 100644 index 0000000000000..d04cb1bcd1c6e --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LancePageSink.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.ingestion; + +import com.facebook.presto.common.Page; +import com.facebook.presto.lance.LanceConfig; +import com.facebook.presto.spi.ConnectorPageSink; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.lancedb.lance.FragmentMetadata; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.completedFuture; + +public class LancePageSink + implements ConnectorPageSink +{ + private final LanceConfig lanceConfig; + private final LanceIngestionTableHandle tableHandle; + private final LancePageWriter lancePageWriter; + private final Schema arrowSchema; + private final HashSet fragmentMetaSet; + + public LancePageSink( + LanceConfig lanceConfig, + LanceIngestionTableHandle tableHandle, + LancePageWriter lancePageWriter, Schema arrowSchema) + { + this.lanceConfig = requireNonNull(lanceConfig, "LanceConfig is null"); + this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); + this.lancePageWriter = requireNonNull(lancePageWriter, "pageWriter is null"); + this.arrowSchema = requireNonNull(arrowSchema, "arrowSchema is null"); + this.fragmentMetaSet = new HashSet<>(); + } + + @Override + public CompletableFuture appendPage(Page page) + { + fragmentMetaSet.add(lancePageWriter.append(page, tableHandle, arrowSchema)); + return NOT_BLOCKED; + } + + @Override + public CompletableFuture> finish() + { + ImmutableCollection.Builder fragmentSliceList = ImmutableList.builder(); + for (FragmentMetadata fragmentMetadata : fragmentMetaSet) { + fragmentSliceList.add(Slices.wrappedBuffer(fragmentMetadata.getJsonMetadata().getBytes())); + } + return completedFuture(fragmentSliceList.build()); + } + + @Override + public void abort() + { + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LancePageSinkProvider.java b/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LancePageSinkProvider.java new file mode 100644 index 0000000000000..d9ba33b8d06dd --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LancePageSinkProvider.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.ingestion; + +import com.facebook.presto.lance.LanceConfig; +import com.facebook.presto.lance.ingestion.LanceIngestionTableHandle; +import com.facebook.presto.lance.ingestion.LancePageSink; +import com.facebook.presto.lance.ingestion.LancePageWriter; +import com.facebook.presto.lance.metadata.LanceColumnInfo; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorPageSink; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.PageSinkContext; +import com.facebook.presto.spi.connector.ConnectorPageSinkProvider; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.google.common.collect.ImmutableList; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +import javax.inject.Inject; + +import static java.util.Objects.requireNonNull; + +public class LancePageSinkProvider + implements ConnectorPageSinkProvider +{ + private final LanceConfig lanceConfig; + private final LancePageWriter lancePageWriter; + + @Inject + public LancePageSinkProvider( + LanceConfig lanceConfig, + LancePageWriter lancePageWriter) + { + this.lanceConfig = requireNonNull(lanceConfig, "lance config is null"); + this.lancePageWriter = requireNonNull(lancePageWriter, "page writer is null"); + } + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, PageSinkContext pageSinkContext) + { + LanceIngestionTableHandle tableHandle = (LanceIngestionTableHandle) outputTableHandle; + ImmutableList.Builder arrowFieldBuilder = ImmutableList.builder(); + for (LanceColumnInfo column : tableHandle.getColumns()) { + arrowFieldBuilder.add(Field.nullable(column.getColumnName(), column.getDataType().getArrowType())); + } + Schema arrowSchema = new Schema(arrowFieldBuilder.build()); + return new LancePageSink(lanceConfig, tableHandle, lancePageWriter, arrowSchema); + } + + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle insertTableHandle, PageSinkContext pageSinkContext) + { + LanceIngestionTableHandle tableHandle = (LanceIngestionTableHandle) insertTableHandle; + ImmutableList.Builder arrowFieldBuilder = ImmutableList.builder(); + for (LanceColumnInfo column : tableHandle.getColumns()) { + arrowFieldBuilder.add(Field.nullable(column.getColumnName(), column.getDataType().getArrowType())); + } + Schema arrowSchema = new Schema(arrowFieldBuilder.build()); + return new LancePageSink(lanceConfig, tableHandle, lancePageWriter, arrowSchema); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LancePageWriter.java b/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LancePageWriter.java new file mode 100644 index 0000000000000..c69bcc5ad4011 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/ingestion/LancePageWriter.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.ingestion; + +import com.facebook.presto.common.Page; +import com.facebook.presto.common.block.Block; +import com.facebook.presto.lance.LanceConfig; +import com.facebook.presto.lance.client.LanceClient; +import com.facebook.presto.lance.metadata.LanceColumnInfo; +import com.fasterxml.jackson.core.JsonFactory; +import com.lancedb.lance.Fragment; +import com.lancedb.lance.FragmentMetadata; +import com.lancedb.lance.WriteParams; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Schema; + +import javax.inject.Inject; + +import java.util.Optional; + +import static com.facebook.presto.common.type.IntegerType.INTEGER; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; + +public class LancePageWriter +{ + public static final JsonFactory JSON_FACTORY = new JsonFactory(); + + private final LanceConfig lanceConfig; + private final LanceClient lanceClient; + + @Inject + public LancePageWriter(LanceConfig lanceConfig, LanceClient lanceClient) + { + this.lanceConfig = requireNonNull(lanceConfig, "lanceConfig is null"); + this.lanceClient = requireNonNull(lanceClient, "lanceClient is null"); + } + + public FragmentMetadata append(Page page, LanceIngestionTableHandle tableHandle, Schema arrowSchema) + { + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, lanceClient.getArrowRootAllocator())) { + root.allocateNew(); //TODO: merge fragment + for (int position = 0; position < page.getPositionCount(); position++) { + for (int channel = 0; channel < page.getChannelCount(); channel++) { + LanceColumnInfo column = tableHandle.getColumns().get(channel); + Block block = page.getBlock(channel); + FieldVector vector = root.getVector(column.getColumnName()); //TODO: cache vector + switch (column.getDataType()) { + case INTEGER: + IntVector intVector = (IntVector) vector; + //TODO: we should avoid use setSafe which might triger reallocate + intVector.setSafe(position, (int) INTEGER.getLong(block, position)); + break; + case VARCHAR: + VarCharVector varcharVector = (VarCharVector) vector; + varcharVector.setSafe(position, VARCHAR.getSlice(block, position).getBytes()); + break; + default: + throw new IllegalArgumentException("unsupported type: " + column.getDataType()); + } + } + // writeFieldValue(jsonGen, column.getDataType(), block, position); + } + root.setRowCount(page.getPositionCount()); + return Fragment.create(lanceClient.getTablePath(tableHandle.getTableName()), + lanceClient.getArrowRootAllocator(), root, Optional.empty(), new WriteParams.Builder().build()); + } + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceColumnHandle.java b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceColumnHandle.java new file mode 100644 index 0000000000000..b8a35c0f3916b --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceColumnHandle.java @@ -0,0 +1,119 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.metadata; + +import com.facebook.presto.common.type.Type; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class LanceColumnHandle + implements ColumnHandle +{ + private final String columnName; + private final Type columnType; + private final LanceColumnHandleType type; + + public LanceColumnHandle( + VariableReferenceExpression variable, + LanceColumnHandleType type) + { + this(variable.getName(), variable.getType(), type); + } + + public LanceColumnHandle( + String columnName, + Type columnType) + { + this(columnName, columnType, LanceColumnHandleType.REGULAR); + } + + @JsonCreator + public LanceColumnHandle( + @JsonProperty("columnName") String columnName, + @JsonProperty("columnType") Type columnType, + @JsonProperty("type") LanceColumnHandleType type) + { + this.columnName = requireNonNull(columnName, "columnName is null"); + this.columnType = requireNonNull(columnType, "columnType is null"); + this.type = requireNonNull(type, "type is null"); + } + + @JsonProperty + public String getColumnName() + { + return columnName; + } + + @JsonProperty + public Type getColumnType() + { + return columnType; + } + + @JsonProperty + public LanceColumnHandleType getType() + { + return type; + } + + public ColumnMetadata getColumnMetadata() + { + return new ColumnMetadata(getColumnName(), getColumnType()); + } + + @Override + public int hashCode() + { + return Objects.hash(columnName); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + LanceColumnHandle other = (LanceColumnHandle) obj; + return Objects.equals(this.columnName, other.columnName) && + Objects.equals(this.columnType, other.columnType); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("columnName", columnName) + .add("columnType", columnType) + .add("type", type) + .toString(); + } + + public enum LanceColumnHandleType + { + REGULAR, // refers to the column in table + DERIVED, // refers to a derived column that is created after a pushdown expression + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceColumnInfo.java b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceColumnInfo.java new file mode 100644 index 0000000000000..faff239685d81 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceColumnInfo.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class LanceColumnInfo +{ + private final String columnName; + private final LanceColumnType dataType; + + @JsonCreator + public LanceColumnInfo( + @JsonProperty("COLUMN_NAME") String columnName, + @JsonProperty("DATA_TYPE") LanceColumnType dataType) + { + this.columnName = requireNonNull(columnName, "columnName is null"); + this.dataType = requireNonNull(dataType, "dataType is null"); + } + + @JsonProperty + public String getColumnName() + { + return columnName; + } + + @JsonProperty + public LanceColumnType getDataType() + { + return dataType; + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("columnName", columnName) + .add("dataType", dataType) + .toString(); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceColumnType.java b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceColumnType.java new file mode 100644 index 0000000000000..2f8cfc025d110 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceColumnType.java @@ -0,0 +1,105 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.metadata; + +import com.facebook.presto.common.type.BigintType; +import com.facebook.presto.common.type.BooleanType; +import com.facebook.presto.common.type.DoubleType; +import com.facebook.presto.common.type.IntegerType; +import com.facebook.presto.common.type.RealType; +import com.facebook.presto.common.type.TimestampType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.VarcharType; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; + +import static java.util.Objects.requireNonNull; + +public enum LanceColumnType +{ + BIGINT(BigintType.BIGINT, new ArrowType.Int(64, true)), + INTEGER(IntegerType.INTEGER, new ArrowType.Int(32, true)), + DOUBLE(DoubleType.DOUBLE, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + FLOAT(RealType.REAL, new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), + VARCHAR(VarcharType.VARCHAR, new ArrowType.Utf8()), + TIMESTAMP(TimestampType.TIMESTAMP, new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), + BOOLEAN(BooleanType.BOOLEAN, new ArrowType.Bool()), + OTHER(VarcharType.VARCHAR, new ArrowType.Utf8()); + + private final ArrowType arrowType; + private final Type prestoType; + + LanceColumnType(Type prestoType, ArrowType arrowType) + { + this.prestoType = requireNonNull(prestoType, "prestoType is null"); + this.arrowType = requireNonNull(arrowType, "arrowType is null"); + } + + public ArrowType getArrowType() + { + return arrowType; + } + + public Type getPrestoType() + { + return prestoType; + } + + public static LanceColumnType fromPrestoType(Type type) + { + if (type instanceof BigintType) { + return BIGINT; + } + if (type instanceof IntegerType) { + return INTEGER; + } + if (type instanceof DoubleType) { + return DOUBLE; + } + if (type instanceof RealType) { + return FLOAT; + } + if (type instanceof TimestampType) { + return TIMESTAMP; + } + if (type instanceof VarcharType) { + return VARCHAR; + } + return OTHER; + } + + public static LanceColumnType fromArrowType(ArrowType type) + { + if (type instanceof ArrowType.Bool) { + return BOOLEAN; + } + else if (type instanceof ArrowType.Int) { + ArrowType.Int intType = (ArrowType.Int) type; + if (intType.getBitWidth() == 32) { + return INTEGER; + } + else if (intType.getBitWidth() == 64) { + return BIGINT; + } + } + else if (type instanceof ArrowType.FloatingPoint) { + return DOUBLE; + } + else if (type instanceof ArrowType.Utf8) { + return VARCHAR; + } + return OTHER; + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceMetadata.java b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceMetadata.java new file mode 100644 index 0000000000000..ad4f25d336d1e --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceMetadata.java @@ -0,0 +1,243 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.metadata; + +import com.facebook.presto.lance.client.LanceClient; +import com.facebook.presto.lance.ingestion.LanceIngestionTableHandle; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorInsertTableHandle; +import com.facebook.presto.spi.ConnectorNewTableLayout; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayout; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.ConnectorTableLayoutResult; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.Constraint; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.facebook.presto.spi.connector.ConnectorOutputMetadata; +import com.facebook.presto.spi.connector.ConnectorTableVersion; +import com.facebook.presto.spi.statistics.ComputedStatistics; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.lancedb.lance.FragmentMetadata; +import io.airlift.slice.Slice; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +import javax.inject.Inject; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Collections.singletonList; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; + +public class LanceMetadata + implements ConnectorMetadata +{ + public static final String LANCE_DEFAULT_SCHEMA = "default"; + private final LanceClient lanceClient; + + @Inject + public LanceMetadata(LanceClient client) + { + lanceClient = requireNonNull(client, "client is null"); + } + @Override + public boolean schemaExists(ConnectorSession session, String schemaName) + { + return LANCE_DEFAULT_SCHEMA.equals(schemaName); + } + + @Override + public List listSchemaNames(ConnectorSession session) + { + return ImmutableList.of(LANCE_DEFAULT_SCHEMA); + } + + @Override + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) + { + return getTableHandle(session, tableName, Optional.empty()); + } + + @Override + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional tableVersion) + { + try { + if(lanceClient.getConn().tableNames().contains(tableName.getTableName())) { + return new LanceTableHandle(tableName.getSchemaName(), tableName.getTableName()); + } + return null; + } + catch (Exception ex) { + return null; + } + } + + @Override + public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) + { + return new ConnectorTableLayout(handle); + } + + @Override + public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) + { + LanceTableHandle lanceTableHandle = (LanceTableHandle) table; + Schema arrowSchema = lanceClient.getSchema(((LanceTableHandle) table).getTableName()); + SchemaTableName schemaTableName = + new SchemaTableName(lanceTableHandle.getSchemaName(), lanceTableHandle.getTableName()); + ImmutableList.Builder columnsMetadataListBuilder = + ImmutableList.builderWithExpectedSize(arrowSchema.getFields().size()); + for (Field field : arrowSchema.getFields()) { + columnsMetadataListBuilder.add(new ColumnMetadata(field.getName(), + LanceColumnType.fromArrowType(field.getType()).getPrestoType())); + } + return new ConnectorTableMetadata(schemaTableName, columnsMetadataListBuilder.build()); + } + + @Override + public Optional getInfo(ConnectorTableLayoutHandle layoutHandle) + { + return ConnectorMetadata.super.getInfo(layoutHandle); + } + + @Override + public List listTables(ConnectorSession session, Optional schemaName) + { + return lanceClient.getConn().tableNames().stream() + .map(tableName -> new SchemaTableName(schemaName.orElse(LANCE_DEFAULT_SCHEMA), tableName)) + .collect(toList()); + } + + @Override + public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) + { + LanceTableHandle lanceTableHandle = (LanceTableHandle) tableHandle; + Schema arrowSchema = lanceClient.getSchema(lanceTableHandle.getTableName()); + ImmutableMap.Builder columnHandleMapBuilder = ImmutableMap.builder(); + for (Field field : arrowSchema.getFields()) { + LanceColumnHandle columnHandle = + new LanceColumnHandle(field.getName(), LanceColumnType.fromArrowType(field.getType()).getPrestoType()); + columnHandleMapBuilder.put(field.getName(), columnHandle); + } + return columnHandleMapBuilder.build(); + } + + @Override + public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) + { + LanceColumnHandle lanceColumnHandle = (LanceColumnHandle) columnHandle; + return new ColumnMetadata(lanceColumnHandle.getColumnName(), lanceColumnHandle.getColumnType()); + } + + @Override + public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) + { + List tables = prefix.getTableName() != null ? + singletonList(prefix.toSchemaTableName()) : listTables(session, Optional.ofNullable(prefix.getSchemaName())); + ImmutableMap.Builder> columns = ImmutableMap.builder(); + for (SchemaTableName tableName : tables) { + ConnectorTableMetadata tableMetadata = getTableMetadata(session, getTableHandle(session, tableName)); + if (tableMetadata != null) { + columns.put(tableName, tableMetadata.getColumns()); + } + } + return columns.build(); + } + + @Override + public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout) + { + ImmutableList.Builder arrowFieldBuilder = ImmutableList.builder(); + ImmutableList.Builder columnInfoBuilder = ImmutableList.builder(); + for (ColumnMetadata column : tableMetadata.getColumns()) { + LanceColumnType lanceColumnType = LanceColumnType.fromPrestoType(column.getType()); + arrowFieldBuilder.add(Field.nullable(column.getName(), lanceColumnType.getArrowType())); + columnInfoBuilder.add(new LanceColumnInfo(column.getName(), lanceColumnType)); + } + Schema schema = new Schema(arrowFieldBuilder.build()); + lanceClient.createTable(tableMetadata.getTable().getTableName(), schema); + return new LanceIngestionTableHandle( + tableMetadata.getTable().getSchemaName(), + tableMetadata.getTable().getTableName(), + columnInfoBuilder.build()); + } + + @Override + public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) + { + LanceIngestionTableHandle lanceTableHandle = (LanceIngestionTableHandle) tableHandle; + List fragmentMetadataList = fragments.stream() + .map(fragmentSlice -> FragmentMetadata.fromJson(new String(fragmentSlice.getBytes()))) + .collect(Collectors.toList()); + long tableReadVersion = lanceClient.getTableVersion(lanceTableHandle.getTableName()); + lanceClient.appendAndCommit(lanceTableHandle.getTableName(), fragmentMetadataList, tableReadVersion); + return Optional.empty(); + } + + @Override + public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle) + { + LanceTableHandle lanceTableHandle = (LanceTableHandle) tableHandle; + Schema arrowSchema = lanceClient.getSchema(lanceTableHandle.getTableName()); + ImmutableList.Builder columnsInfoListBuilder = + ImmutableList.builderWithExpectedSize(arrowSchema.getFields().size()); + for (Field field : arrowSchema.getFields()) { + columnsInfoListBuilder.add(new LanceColumnInfo(field.getName(), + LanceColumnType.fromArrowType(field.getType()))); + } + return new LanceIngestionTableHandle( + lanceTableHandle.getSchemaName(), + lanceTableHandle.getTableName(), + columnsInfoListBuilder.build()); + } + + @Override + public Optional finishInsert(ConnectorSession session, + ConnectorInsertTableHandle tableHandle, + Collection fragments, + Collection computedStatistics) + { + LanceIngestionTableHandle lanceTableHandle = (LanceIngestionTableHandle) tableHandle; + List fragmentMetadataList = fragments.stream() + .map(fragmentSlice -> FragmentMetadata.fromJson(new String(fragmentSlice.getBytes()))) + .collect(Collectors.toList()); + long tableReadVersion = lanceClient.getTableVersion(lanceTableHandle.getTableName()); + lanceClient.appendAndCommit(lanceTableHandle.getTableName(), fragmentMetadataList, tableReadVersion); + return Optional.empty(); + } + + @Override + public ConnectorTableLayoutResult getTableLayoutForConstraint(ConnectorSession session, + ConnectorTableHandle table, + Constraint constraint, + Optional> desiredColumns) + { + LanceTableHandle lanceTableHandle = (LanceTableHandle) table; + ConnectorTableLayout layout = new ConnectorTableLayout(new LanceTableLayoutHandle(lanceTableHandle, constraint.getSummary())); + return new ConnectorTableLayoutResult(layout, constraint.getSummary()); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceTableHandle.java b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceTableHandle.java new file mode 100644 index 0000000000000..fe6f8eff142a6 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceTableHandle.java @@ -0,0 +1,81 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.metadata; + +import com.facebook.presto.spi.ConnectorTableHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class LanceTableHandle + implements ConnectorTableHandle +{ + private final String schemaName; + private final String tableName; + + @JsonCreator + public LanceTableHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName) + { + this.schemaName = requireNonNull(schemaName, "schemaName is null"); + this.tableName = requireNonNull(tableName, "tableName is null"); + } + + @JsonProperty + public String getSchemaName() + { + return schemaName; + } + + @JsonProperty + public String getTableName() + { + return tableName; + } + + @Override + public int hashCode() + { + return Objects.hash(schemaName, tableName); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + LanceTableHandle other = (LanceTableHandle) obj; + return Objects.equals(this.schemaName, other.schemaName) && + Objects.equals(this.tableName, other.tableName); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("schemaName", schemaName) + .add("tableName", tableName) + .toString(); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceTableLayoutHandle.java b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceTableLayoutHandle.java new file mode 100644 index 0000000000000..38aab4a5a5303 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceTableLayoutHandle.java @@ -0,0 +1,78 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.metadata; + +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class LanceTableLayoutHandle + implements ConnectorTableLayoutHandle +{ + private final LanceTableHandle table; + private final TupleDomain tupleDomain; + + @JsonCreator + public LanceTableLayoutHandle( + @JsonProperty("table") LanceTableHandle table, + @JsonProperty("tupleDomain") TupleDomain domain) + { + this.table = requireNonNull(table, "table is null"); + this.tupleDomain = requireNonNull(domain, "tupleDomain is null"); + } + + @JsonProperty + public LanceTableHandle getTable() + { + return table; + } + + @JsonProperty + public TupleDomain getTupleDomain() + { + return tupleDomain; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LanceTableLayoutHandle that = (LanceTableLayoutHandle) o; + return Objects.equals(table, that.table) && + Objects.equals(tupleDomain, that.tupleDomain); + } + + @Override + public int hashCode() + { + return Objects.hash(table, tupleDomain); + } + + @Override + public String toString() + { + return table.toString(); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceTransactionHandle.java b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceTransactionHandle.java new file mode 100644 index 0000000000000..ddcabed799106 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/metadata/LanceTransactionHandle.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.metadata; + +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +public enum LanceTransactionHandle + implements ConnectorTransactionHandle +{ + INSTANCE +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/scan/ArrowVectorPageBuilder.java b/presto-lance/src/main/java/com/facebook/presto/lance/scan/ArrowVectorPageBuilder.java new file mode 100644 index 0000000000000..c50e1cf876342 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/scan/ArrowVectorPageBuilder.java @@ -0,0 +1,71 @@ +package com.facebook.presto.lance.scan; + +import com.facebook.presto.common.block.BlockBuilder; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.lance.metadata.LanceColumnType; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; + +import static io.airlift.slice.Slices.wrappedBuffer; + +public class ArrowVectorPageBuilder +{ + private final Type columnType; + private final BlockBuilder blockBuilder; + private final FieldVector arrowVector; + + private final ColumnProcessor columnProcessor; + + interface ColumnProcessor { + void write(int index); + } + + private ArrowVectorPageBuilder(Type columnType, BlockBuilder blockBuilder, FieldVector arrowVector, ColumnProcessor columnProcessor) + { + this.columnType = columnType; + this.blockBuilder = blockBuilder; + this.arrowVector = arrowVector; + this.columnProcessor = columnProcessor; + } + + public static ArrowVectorPageBuilder create(Type columnType, BlockBuilder blockBuilder, FieldVector arrowVector) { + ColumnProcessor columnProcessor = createColumnProcessor(columnType, blockBuilder, arrowVector); + return new ArrowVectorPageBuilder(columnType, blockBuilder, arrowVector, columnProcessor); + } + + private static ColumnProcessor createColumnProcessor(Type columnType, BlockBuilder blockBuilder, FieldVector arrowVector) { + LanceColumnType lanceColumnType = LanceColumnType.fromPrestoType(columnType); + switch (lanceColumnType){ + case BIGINT: + return index -> columnType.writeLong(blockBuilder, ((BigIntVector) arrowVector).get(index)); + case INTEGER: + return index -> columnType.writeLong(blockBuilder, ((IntVector) arrowVector).get(index)); + case DOUBLE: + case FLOAT: + return index -> columnType.writeDouble(blockBuilder, ((Float8Vector) arrowVector).get(index)); + case VARCHAR: + return index -> columnType.writeSlice(blockBuilder, wrappedBuffer(((VarCharVector) arrowVector).get(index))); + case BOOLEAN: + return index -> columnType.writeBoolean(blockBuilder, ((BitVector) arrowVector).get(index) == 1); + case TIMESTAMP: + case OTHER: + default: + throw new RuntimeException("unsupported type: " + lanceColumnType); + } + } + public void build() { + int valueCount = arrowVector.getValueCount(); + for (int index = 0; index < valueCount; index++) { + if (arrowVector.isNull(index)) { + blockBuilder.appendNull(); + } + else { + columnProcessor.write(index); + } + } + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/scan/LanceFragmentPageSource.java b/presto-lance/src/main/java/com/facebook/presto/lance/scan/LanceFragmentPageSource.java new file mode 100644 index 0000000000000..31288376b0cab --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/scan/LanceFragmentPageSource.java @@ -0,0 +1,190 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.scan; + +import com.facebook.presto.common.Page; +import com.facebook.presto.common.PageBuilder; +import com.facebook.presto.common.block.BlockBuilder; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.lance.client.LanceClient; +import com.facebook.presto.lance.fragments.FragmentInfo; +import com.facebook.presto.lance.metadata.LanceColumnHandle; +import com.facebook.presto.lance.metadata.LanceTableHandle; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPageSource; +import com.google.common.collect.ImmutableList; +import com.lancedb.lance.Dataset; +import com.lancedb.lance.DatasetFragment; +import com.lancedb.lance.ipc.LanceScanner; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; + +public class LanceFragmentPageSource + implements ConnectorPageSource +{ + private final LanceClient lanceClient; + private final List fragmentInfos; + private final List columns; + private final Dataset dataset; + private final List fragments; + private final PageBuilder pageBuilder; + private LanceScanner scanner; + private int fragmentIndex = 0; + private ArrowReader arrowReader; + private boolean isFinished; + private int completedPositions; + private long completedBytes; + + public LanceFragmentPageSource(LanceClient lanceClient, List fragmentInfos, LanceTableHandle table, List columns) + { + this.lanceClient = requireNonNull(lanceClient, "lanceClient is null"); + this.fragmentInfos = requireNonNull(fragmentInfos, "fragmentInfos is null"); + this.columns = requireNonNull(columns, "columns is null"); + dataset = lanceClient.open(table.getTableName()); + List allFragments = dataset.getFragments(); + this.fragments = allFragments.stream().map(fragmentInfo -> allFragments.get(fragmentInfo.getId())).collect(Collectors.toList()); + this.pageBuilder = new PageBuilder(columns.stream() + .map(column -> ((LanceColumnHandle) column).getColumnType()) + .collect(toImmutableList())); + } + + @Override + public long getCompletedBytes() + { + return completedBytes; + } + + @Override + public long getCompletedPositions() + { + return completedPositions; + } + + @Override + public long getReadTimeNanos() + { + return 0; + } + + @Override + public boolean isFinished() + { + return isFinished; + } + + @Override + public Page getNextPage() + { + if (!hasNextBatch()) { + return buildPage(); //there might be data remaining in the buffer of page builder, so we build and reset + } + try { + VectorSchemaRoot vectorSchemaRoot = arrowReader.getVectorSchemaRoot(); + List fieldVectors = vectorSchemaRoot.getFieldVectors(); + for (int column = 0; column < columns.size(); column++) { + LanceColumnHandle lanceColumn = ((LanceColumnHandle) columns.get(column)); + BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(column); + Type columnType = lanceColumn.getColumnType(); + FieldVector arrowVector = vectorSchemaRoot.getVector(lanceColumn.getColumnName()); + ArrowVectorPageBuilder.create(columnType, blockBuilder, arrowVector).build(); + } + vectorSchemaRoot.clear(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + pageBuilder.declarePosition(); + return buildPage(); + } + + private Page buildPage() + { + if (pageBuilder.isEmpty()) { + return null; + } + Page page = pageBuilder.build(); + completedPositions += page.getPositionCount(); + completedBytes += page.getSizeInBytes(); + pageBuilder.reset(); + return page; + } + + private boolean hasNextBatch() + { + if (isFinished) { + return false; + } + if (scanner == null) { + scanner = fragments.get(fragmentIndex).newScan(); + } + if (arrowReader == null) { + arrowReader = scanner.scanBatches(); + } + try { + if (arrowReader.loadNextBatch()) { + return true; + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + //no next batch, close resource and go to next fragments + closeResources(); + if (fragmentIndex + 1 < fragments.size()) { + fragmentIndex++; + return hasNextBatch(); + } + isFinished = true; + return false; + } + + private void closeResources() + { + try { + arrowReader.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + arrowReader = null; + try { + scanner.close(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + scanner = null; + } + + @Override + public long getSystemMemoryUsage() + { + return 0; + } + + @Override + public void close() + throws IOException + { + dataset.close(); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/scan/LancePageSourceProvider.java b/presto-lance/src/main/java/com/facebook/presto/lance/scan/LancePageSourceProvider.java new file mode 100644 index 0000000000000..445e7c5cca513 --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/scan/LancePageSourceProvider.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.scan; + +import com.facebook.presto.lance.client.LanceClient; +import com.facebook.presto.lance.metadata.LanceTableLayoutHandle; +import com.facebook.presto.lance.splits.LanceSplit; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.SplitContext; +import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +import javax.inject.Inject; + +import java.util.List; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +public class LancePageSourceProvider + implements ConnectorPageSourceProvider +{ + private final LanceClient lanceClient; + + @Inject + public LancePageSourceProvider(LanceClient lanceClient) + { + this.lanceClient = requireNonNull(lanceClient, "lanceClient is null"); + } + @Override + public ConnectorPageSource createPageSource( + ConnectorTransactionHandle transactionHandle, + ConnectorSession session, + ConnectorSplit split, + ConnectorTableLayoutHandle layout, + List columns, + SplitContext splitContext) + { + LanceSplit lanceSplit = (LanceSplit) split; + checkState(lanceSplit.getFragments().isPresent()); + LanceTableLayoutHandle lanceTableLayout = (LanceTableLayoutHandle) layout; + return new LanceFragmentPageSource(lanceClient, + lanceSplit.getFragments().get(), + lanceTableLayout.getTable(), + columns); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/splits/LanceSplit.java b/presto-lance/src/main/java/com/facebook/presto/lance/splits/LanceSplit.java new file mode 100644 index 0000000000000..9e1b4e780d5ca --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/splits/LanceSplit.java @@ -0,0 +1,106 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.splits; + +import com.facebook.presto.lance.fragments.FragmentInfo; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.NodeProvider; +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE; +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public class LanceSplit + implements ConnectorSplit +{ + private final SplitType splitType; + private final Optional> fragments; + + @JsonCreator + public LanceSplit( + @JsonProperty("splitType") SplitType splitType, + @JsonProperty("fragments") Optional> fragments) + { + this.splitType = requireNonNull(splitType, "splitType id is null"); + this.fragments = requireNonNull(fragments, "fragments is null"); + } + + public static LanceSplit createBrokerSplit() + { + return new LanceSplit( + SplitType.BROKER, + Optional.empty()); + } + + public static LanceSplit createFragmentSplit(List fragments) + { + return new LanceSplit( + SplitType.FRAGMENT, + Optional.of(fragments)); + } + + @JsonProperty + public SplitType getSplitType() + { + return splitType; + } + + @JsonProperty + public Optional> getFragments() + { + return fragments; + } + + @Override + public NodeSelectionStrategy getNodeSelectionStrategy() + { + return NO_PREFERENCE; + } + + @Override + public List getPreferredNodes(NodeProvider nodeProvider) + { + return ImmutableList.of(); + } + + @Override + public Object getInfo() + { + return this; + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("splitType", splitType) + .add("fragments", fragments) + .toString(); + } + + public enum SplitType + { + FRAGMENT, + BROKER, + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/splits/LanceSplitManager.java b/presto-lance/src/main/java/com/facebook/presto/lance/splits/LanceSplitManager.java new file mode 100644 index 0000000000000..c3d340532880f --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/splits/LanceSplitManager.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.splits; + +import com.facebook.presto.lance.metadata.LanceTableLayoutHandle; +import com.facebook.presto.lance.client.LanceClient; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +import javax.inject.Inject; + +import static java.util.Objects.requireNonNull; + +public class LanceSplitManager + implements ConnectorSplitManager +{ + private final LanceClient lanceClient; + + @Inject + public LanceSplitManager(LanceClient lanceClient) { + this.lanceClient = requireNonNull(lanceClient, "lanceClient is null"); + } + @Override + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, + ConnectorSession session, + ConnectorTableLayoutHandle layout, + SplitSchedulingContext splitSchedulingContext) + { + LanceTableLayoutHandle layoutHandle = (LanceTableLayoutHandle) layout; + return new LanceSplitSource(lanceClient, layoutHandle.getTable()); + } +} diff --git a/presto-lance/src/main/java/com/facebook/presto/lance/splits/LanceSplitSource.java b/presto-lance/src/main/java/com/facebook/presto/lance/splits/LanceSplitSource.java new file mode 100644 index 0000000000000..027c123fd11eb --- /dev/null +++ b/presto-lance/src/main/java/com/facebook/presto/lance/splits/LanceSplitSource.java @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance.splits; + +import com.facebook.presto.lance.metadata.LanceTableHandle; +import com.facebook.presto.lance.client.LanceClient; +import com.facebook.presto.lance.fragments.FragmentInfo; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.connector.ConnectorPartitionHandle; +import com.google.common.collect.ImmutableList; +import com.lancedb.lance.DatasetFragment; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.CompletableFuture.completedFuture; + +public class LanceSplitSource implements ConnectorSplitSource +{ + private final LanceClient lanceClient; + private final LanceTableHandle lanceTable; + private boolean isFinished; + + LanceSplitSource(LanceClient lanceClient, LanceTableHandle lanceTable) { + this.lanceClient = requireNonNull(lanceClient, "lanceClient is null"); + this.lanceTable = requireNonNull(lanceTable, "lanceTable is null"); + } + @Override + public CompletableFuture getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize) + { + ImmutableList.Builder splitListBuilder = ImmutableList.builder(); + List fragments = lanceClient.getFragments(lanceTable.getTableName()); + for (DatasetFragment fragment : fragments) { + FragmentInfo fragmentInfo = new FragmentInfo(fragment.getId()); + splitListBuilder.add(LanceSplit.createFragmentSplit(ImmutableList.of(fragmentInfo))); + } + isFinished = true; //TODO: streaming the splits + return completedFuture(new ConnectorSplitBatch(splitListBuilder.build(), isFinished())); + } + + @Override + public void close() + { + + } + + @Override + public boolean isFinished() + { + return isFinished; + } +} diff --git a/presto-lance/src/test/java/com/facebook/presto/lance/LanceQueryRunner.java b/presto-lance/src/test/java/com/facebook/presto/lance/LanceQueryRunner.java new file mode 100644 index 0000000000000..e4286d2054e87 --- /dev/null +++ b/presto-lance/src/test/java/com/facebook/presto/lance/LanceQueryRunner.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.lance; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.Session; +import com.facebook.presto.tests.DistributedQueryRunner; +import com.google.common.collect.ImmutableMap; + +import java.util.HashMap; +import java.util.Map; + +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static java.lang.String.format; + +public class LanceQueryRunner +{ + private static final Logger log = Logger.get(LanceQueryRunner.class); + private static final String DEFAULT_SOURCE = "test"; + private static final String DEFAULT_CATALOG = "lance"; + private static final String DEFAULT_SCHEMA = "lance"; + + private LanceQueryRunner() + { + } + + public static DistributedQueryRunner createLanceQueryRunner(Map connectorProperties) + throws Exception + { + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(createSession()) + .setExtraProperties(ImmutableMap.of("http-server.http.port", "8080")).build(); + try { + queryRunner.installPlugin(new LancePlugin()); + connectorProperties = new HashMap<>(ImmutableMap.copyOf(connectorProperties)); + connectorProperties.putIfAbsent("lance.root-url", "/tmp"); + + queryRunner.createCatalog(DEFAULT_CATALOG, "lance", connectorProperties); + return queryRunner; + } + catch (Exception e) { + queryRunner.close(); + throw e; + } + } + + public static Session createSession() + { + return testSessionBuilder() + .setSource(DEFAULT_SOURCE) + .setCatalog(DEFAULT_CATALOG) + .setSchema(DEFAULT_SCHEMA) + .build(); + } + + public static void main(String[] args) + throws Exception + { + DistributedQueryRunner queryRunner = createLanceQueryRunner(ImmutableMap.of()); + log.info(format("Presto server started: %s", queryRunner.getCoordinator().getBaseUrl())); + } +}