Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProvider;
import io.trino.plugin.iceberg.functions.tables.IcebergTablesFunctionProvider;
import io.trino.plugin.iceberg.procedure.AddFilesTableFromTableProcedure;
import io.trino.plugin.iceberg.procedure.AddFilesTableProcedure;
import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure;
Expand Down Expand Up @@ -138,9 +137,7 @@ public void configure(Binder binder)
tableProcedures.addBinding().toProvider(AddFilesTableProcedure.class).in(Scopes.SINGLETON);
tableProcedures.addBinding().toProvider(AddFilesTableFromTableProcedure.class).in(Scopes.SINGLETON);

Multibinder<ConnectorTableFunction> tableFunctions = newSetBinder(binder, ConnectorTableFunction.class);
tableFunctions.addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
tableFunctions.addBinding().toProvider(IcebergTablesFunctionProvider.class).in(Scopes.SINGLETON);
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
binder.bind(FunctionProvider.class).to(IcebergFunctionProvider.class).in(Scopes.SINGLETON);
binder.bind(TableChangesFunctionProcessorProviderFactory.class).in(Scopes.SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesSplitSource;
import io.trino.plugin.iceberg.functions.tables.IcebergTablesFunction.IcebergTables;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
Expand Down Expand Up @@ -158,9 +157,6 @@ public ConnectorSplitSource getSplits(
.toSnapshot(functionHandle.endSnapshotId()));
return new ClassLoaderSafeConnectorSplitSource(tableChangesSplitSource, IcebergSplitManager.class.getClassLoader());
}
if (function instanceof IcebergTables icebergTables) {
return new ClassLoaderSafeConnectorSplitSource(new FixedSplitSource(icebergTables), IcebergSplitManager.class.getClassLoader());
}

throw new IllegalStateException("Unknown table function: " + function);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,12 @@
package io.trino.plugin.iceberg.functions;

import com.google.inject.Inject;
import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProvider;
import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProviderFactory;
import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionSplitProcessor;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
import io.trino.plugin.iceberg.functions.tables.IcebergTablesFunction;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.function.FunctionProvider;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.function.table.TableFunctionProcessorProvider;
import io.trino.spi.function.table.TableFunctionProcessorProviderFactory;
import io.trino.spi.function.table.TableFunctionSplitProcessor;

import static java.util.Objects.requireNonNull;

Expand All @@ -48,28 +40,6 @@ public TableFunctionProcessorProviderFactory getTableFunctionProcessorProviderFa
if (functionHandle instanceof TableChangesFunctionHandle) {
return new ClassLoaderSafeTableFunctionProcessorProviderFactory(tableChangesFunctionProcessorProviderFactory, getClass().getClassLoader());
}
if (functionHandle instanceof IcebergTablesFunction.IcebergTables) {
ClassLoader classLoader = getClass().getClassLoader();
return new TableFunctionProcessorProviderFactory()
{
@Override
public TableFunctionProcessorProvider createTableFunctionProcessorProvider()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return new ClassLoaderSafeTableFunctionProcessorProvider(new TableFunctionProcessorProvider()
{
@Override
public TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, ConnectorTableFunctionHandle handle, ConnectorSplit split)
{
return new ClassLoaderSafeTableFunctionSplitProcessor(
new IcebergTablesFunction.IcebergTablesProcessor(((IcebergTablesFunction.IcebergTables) split).tables()),
getClass().getClassLoader());
}
}, classLoader);
}
}
};
}

throw new UnsupportedOperationException("Unsupported function: " + functionHandle);
}
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.testing.BaseConnectorSmokeTest;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.sql.TestTable;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -71,7 +70,6 @@
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

Expand Down Expand Up @@ -838,56 +836,6 @@ public void testCreateOrReplaceWithTableChangesFunction()
}
}

@Test
public void testIcebergTablesFunction()
throws Exception
{
String schemaName = getSession().getSchema().orElseThrow();
String firstSchema = "first_schema_" + randomNameSuffix();
String secondSchema = "second_schema_" + randomNameSuffix();
String firstSchemaLocation = schemaPath().replaceAll(schemaName, firstSchema);
String secondSchemaLocation = schemaPath().replaceAll(schemaName, secondSchema);
assertQuerySucceeds("CREATE SCHEMA " + firstSchema + " WITH (location = '%s')".formatted(firstSchemaLocation));
assertQuerySucceeds("CREATE SCHEMA " + secondSchema + " WITH (location = '%s')".formatted(secondSchemaLocation));
QueryRunner queryRunner = getQueryRunner();
Session firstSchemaSession = Session.builder(queryRunner.getDefaultSession()).setSchema(firstSchema).build();
Session secondSchemaSession = Session.builder(queryRunner.getDefaultSession()).setSchema(secondSchema).build();

try (TestTable _ = new TestTable(
sql -> getQueryRunner().execute(firstSchemaSession, sql),
"first_schema_table1_",
"(id int)");
TestTable _ = new TestTable(
sql -> getQueryRunner().execute(firstSchemaSession, sql),
"first_schema_table2_",
"(id int)");
TestTable secondSchemaTable = new TestTable(
sql -> queryRunner.execute(secondSchemaSession, sql),
"second_schema_table_",
"(id int)");
AutoCloseable _ = createAdditionalTables(firstSchema)) {
String firstSchemaTablesValues = "VALUES " + getQueryRunner()
.execute("SELECT table_schema, table_name FROM iceberg.information_schema.tables WHERE table_schema='%s'".formatted(firstSchema))
.getMaterializedRows().stream()
.map(row -> "('%s', '%s')".formatted(row.getField(0), row.getField(1)))
.collect(joining(", "));
String bothSchemasTablesValues = firstSchemaTablesValues + ", ('%s', '%s')".formatted(secondSchema, secondSchemaTable.getName());
assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables(SCHEMA_NAME => '%s'))".formatted(firstSchema), firstSchemaTablesValues);
assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables(null)) WHERE table_schema = '%s'".formatted(firstSchema), firstSchemaTablesValues);
assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables()) WHERE table_schema in ('%s', '%s')".formatted(firstSchema, secondSchema), bothSchemasTablesValues);
assertQuery("SELECT * FROM TABLE(iceberg.system.iceberg_tables(null)) WHERE table_schema in ('%s', '%s')".formatted(firstSchema, secondSchema), bothSchemasTablesValues);
}
finally {
assertQuerySucceeds("DROP SCHEMA " + firstSchema);
assertQuerySucceeds("DROP SCHEMA " + secondSchema);
}
}

protected AutoCloseable createAdditionalTables(String schema)
{
return () -> {};
}

@Test
public void testMetadataDeleteAfterCommitEnabled()
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,10 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.minio.messages.Event;
import io.trino.Session;
import io.trino.metastore.Column;
import io.trino.metastore.HiveMetastore;
import io.trino.metastore.HiveType;
import io.trino.metastore.Table;
import io.trino.plugin.hive.containers.Hive3MinioDataLake;
import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore;
import io.trino.testing.QueryRunner;
Expand All @@ -32,25 +28,18 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.metastore.PrincipalPrivileges.NO_PRIVILEGES;
import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE;
import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore;
import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
import static io.trino.testing.containers.Minio.MINIO_REGION;
import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;

Expand Down Expand Up @@ -245,25 +234,6 @@ public void testPathContainsSpecialCharacter()
assertUpdate("DROP TABLE " + tableName);
}

@Override
protected AutoCloseable createAdditionalTables(String schema)
{
HiveMetastore metastore = getHiveMetastore(getQueryRunner());
// simulate iceberg table created by spark with lowercase table type
Table lowerCaseTableType = io.trino.metastore.Table.builder()
.setDatabaseName(schema)
.setTableName("lowercase_type_" + randomNameSuffix())
.setOwner(Optional.empty())
.setDataColumns(ImmutableList.of(new Column("id", HiveType.HIVE_STRING, Optional.empty(), ImmutableMap.of())))
.setTableType(EXTERNAL_TABLE.name())
.withStorage(storage -> storage.setStorageFormat(ICEBERG_METASTORE_STORAGE_FORMAT))
.setParameter("EXTERNAL", "TRUE")
.setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toLowerCase(ENGLISH))
.build();
metastore.createTable(lowerCaseTableType, NO_PRIVILEGES);
return () -> metastore.dropTable(lowerCaseTableType.getDatabaseName(), lowerCaseTableType.getTableName(), true);
}

private String onMetastore(@Language("SQL") String sql)
{
return hiveMinioDataLake.getHiveHadoop().runOnMetastore(sql);
Expand Down
Loading
Loading