Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.exceptions.NotImplementedException;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
Expand Down Expand Up @@ -323,6 +324,8 @@ private static List<Pair<Table, TableSnapshot>> toTableAndSnapshots(
snapshot = optional.get();
}
} catch (Catalog.TableNotExistException ignored) {
} catch (NotImplementedException ignored) {
// does not support supportsVersionManagement for external paimon table
}
}
tableAndSnapshots.add(Pair.of(table, snapshot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.PagedList;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TableType;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
Expand All @@ -33,6 +34,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.function.Function;
import org.apache.paimon.function.FunctionChange;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
Expand All @@ -48,6 +50,7 @@
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
Expand All @@ -71,12 +74,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.BRANCH;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
Expand Down Expand Up @@ -443,7 +448,8 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
checkNotSystemTable(identifier, "createTable");
validateCreateTable(schema);
createExternalTablePathIfNotExist(schema);
api.createTable(identifier, schema);
Schema newSchema = inferSchemaIfExternalPaimonTable(schema);
api.createTable(identifier, newSchema);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(identifier);
Expand Down Expand Up @@ -998,4 +1004,30 @@ private void createExternalTablePathIfNotExist(Schema schema) throws IOException
}
}
}

private Schema inferSchemaIfExternalPaimonTable(Schema schema) throws Exception {
TableType tableType = Options.fromMap(schema.options()).get(TYPE);
String externalLocation = schema.options().get(PATH.key());

if (TableType.TABLE.equals(tableType) && Objects.nonNull(externalLocation)) {
Path externalPath = new Path(externalLocation);
SchemaManager schemaManager =
new SchemaManager(fileIOFromOptions(externalPath), externalPath);
Optional<TableSchema> latest = schemaManager.latest();
if (latest.isPresent()) {
// Note we just validate schema here, will not create a new table
schemaManager.createTable(schema, true);
Schema existsSchema = latest.get().toSchema();
// use `owner` and `path` from the user provide schema
if (Objects.nonNull(schema.options().get(Catalog.OWNER_PROP))) {
existsSchema
.options()
.put(Catalog.OWNER_PROP, schema.options().get(Catalog.OWNER_PROP));
}
existsSchema.options().put(PATH.key(), schema.options().get(PATH.key()));
return existsSchema;
}
}
return schema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,19 @@ private MockResponse getDataTokenHandle(Identifier tableIdentifier) throws Excep
}

private MockResponse snapshotHandle(Identifier identifier) throws Exception {
if (!tableMetadataStore.containsKey(identifier.getFullName())) {
throw new Catalog.TableNotExistException(identifier);
}
TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName());
if (tableMetadata.isExternal()) {
ErrorResponse response =
new ErrorResponse(
ErrorResponse.RESOURCE_TYPE_TABLE,
identifier.getFullName(),
"external paimon table does not support get table snapshot in rest server",
501);
return mockResponse(response, 404);
}
RESTResponse response;
Optional<TableSnapshot> snapshotOptional =
Optional.ofNullable(tableLatestSnapshotStore.get(identifier.getFullName()));
Expand Down Expand Up @@ -714,6 +727,7 @@ private MockResponse listSnapshots(Identifier identifier) throws Exception {
}

private MockResponse loadSnapshot(Identifier identifier, String version) throws Exception {

FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
SnapshotManager snapshotManager = table.snapshotManager();
Snapshot snapshot = null;
Expand Down Expand Up @@ -1279,13 +1293,16 @@ private MockResponse tablesHandle(
tableMetadata = createObjectTable(identifier, schema);
} else {
catalog.createTable(identifier, schema, false);
boolean isExternal =
schema.options() != null
&& schema.options().containsKey(PATH.key());
tableMetadata =
createTableMetadata(
requestBody.getIdentifier(),
0L,
requestBody.getSchema(),
UUID.randomUUID().toString(),
false);
isExternal);
}
tableMetadataStore.put(
requestBody.getIdentifier().getFullName(), tableMetadata);
Expand Down Expand Up @@ -1510,10 +1527,16 @@ private MockResponse tableHandle(String method, String data, Identifier identifi
alterTableImpl(identifier, requestBody.getChanges());
return new MockResponse().setResponseCode(200);
case "DELETE":
try {
catalog.dropTable(identifier, false);
} catch (Exception e) {
System.out.println(e.getMessage());
if (!tableMetadataStore.containsKey(identifier.getFullName())) {
return new MockResponse().setResponseCode(404);
}
tableMetadata = tableMetadataStore.get(identifier.getFullName());
if (!tableMetadata.isExternal()) {
try {
catalog.dropTable(identifier, false);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
tableMetadataStore.remove(identifier.getFullName());
tableLatestSnapshotStore.remove(identifier.getFullName());
Expand All @@ -1532,7 +1555,7 @@ private MockResponse renameTableHandle(String data) throws Exception {
throw new Catalog.TableNoPermissionException(fromTable);
} else if (tableMetadataStore.containsKey(fromTable.getFullName())) {
TableMetadata tableMetadata = tableMetadataStore.get(fromTable.getFullName());
if (!isFormatTable(tableMetadata.schema().toSchema())) {
if (!isFormatTable(tableMetadata.schema().toSchema()) && !tableMetadata.isExternal()) {
catalog.renameTable(requestBody.getSource(), requestBody.getDestination(), false);
}
if (tableMetadataStore.containsKey(toTable.getFullName())) {
Expand Down Expand Up @@ -2066,6 +2089,17 @@ private MockResponse commitSnapshot(
Snapshot snapshot,
List<PartitionStatistics> statistics)
throws Catalog.TableNotExistException {
if (!tableMetadataStore.containsKey(identifier.getFullName())) {
throw new Catalog.TableNotExistException(identifier);
}
boolean isExternal = tableMetadataStore.get(identifier.getFullName()).isExternal();
if (isExternal) {
new ErrorResponse(
ErrorResponse.RESOURCE_TYPE_TABLE,
identifier.getFullName(),
"external paimon table does not support commit in rest server",
501);
}
FileStoreTable table = getFileTable(identifier);
if (!tableId.equals(table.catalogEnvironment().uuid())) {
throw new Catalog.TableNotExistException(identifier);
Expand Down Expand Up @@ -2223,7 +2257,10 @@ private MockResponse mockResponse(RESTResponse response, int httpCode) {
private TableMetadata createTableMetadata(
Identifier identifier, long schemaId, Schema schema, String uuid, boolean isExternal) {
Map<String, String> options = new HashMap<>(schema.options());
Path path = catalog.getTableLocation(identifier);
Path path =
isExternal && Objects.nonNull(schema.options().get(PATH.key()))
? new Path(schema.options().get(PATH.key()))
: catalog.getTableLocation(identifier);
String restPath = path.toString();
if (this.configResponse
.getDefaults()
Expand Down Expand Up @@ -2261,27 +2298,22 @@ private TableMetadata createObjectTable(Identifier identifier, Schema schema) {
return createTableMetadata(identifier, 1L, newSchema, UUID.randomUUID().toString(), false);
}

private FileStoreTable getFileTable(Identifier identifier)
throws Catalog.TableNotExistException {
if (tableMetadataStore.containsKey(identifier.getFullName())) {
TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName());
TableSchema schema = tableMetadata.schema();
CatalogEnvironment catalogEnv =
new CatalogEnvironment(
identifier,
tableMetadata.uuid(),
catalog.catalogLoader(),
catalog.lockFactory().orElse(null),
catalog.lockContext().orElse(null),
catalogContext,
false);
Path path = new Path(schema.options().get(PATH.key()));
FileIO dataFileIO = catalog.fileIO();
FileStoreTable table =
FileStoreTableFactory.create(dataFileIO, path, schema, catalogEnv);
return table;
}
throw new Catalog.TableNotExistException(identifier);
private FileStoreTable getFileTable(Identifier identifier) {
TableMetadata tableMetadata = tableMetadataStore.get(identifier.getFullName());
TableSchema schema = tableMetadata.schema();
CatalogEnvironment catalogEnv =
new CatalogEnvironment(
identifier,
tableMetadata.uuid(),
catalog.catalogLoader(),
catalog.lockFactory().orElse(null),
catalog.lockContext().orElse(null),
catalogContext,
false);
Path path = new Path(schema.options().get(PATH.key()));
FileIO dataFileIO = catalog.fileIO();
FileStoreTable table = FileStoreTableFactory.create(dataFileIO, path, schema, catalogEnv);
return table;
}

private static int getMaxResults(Map<String, String> parameters) {
Expand Down
Loading
Loading