Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,26 @@
package io.trino.plugin.hive.containers;

import com.google.common.collect.ImmutableMap;
import com.google.common.reflect.ClassPath;
import io.trino.testing.containers.Minio;
import io.trino.testing.minio.MinioClient;
import io.trino.util.AutoCloseableCloser;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.testcontainers.containers.Network;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.testing.containers.TestContainers.getPathFromClassPathResource;
import static java.time.temporal.ChronoUnit.MINUTES;
import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.Objects.requireNonNull;
import static java.util.regex.Matcher.quoteReplacement;
import static org.testcontainers.containers.Network.newNetwork;

public class HiveMinioDataLake
implements AutoCloseable
{
public static final String MINIO_ACCESS_KEY = "accesskey";
public static final String MINIO_SECRET_KEY = "secretkey";
@Deprecated
public static final String MINIO_ACCESS_KEY = Minio.MINIO_ACCESS_KEY;
@Deprecated
public static final String MINIO_SECRET_KEY = Minio.MINIO_SECRET_KEY;

private final String bucketName;
private final Minio minio;
Expand Down Expand Up @@ -88,7 +80,8 @@ public void start()
state = State.STARTING;
minio.start();
hiveHadoop.start();
minioClient = initMinioClient();
minioClient = closer.register(minio.createMinioClient());
minio.createBucket(bucketName);
state = State.STARTED;
}

Expand All @@ -107,23 +100,12 @@ public MinioClient getMinioClient()

public void copyResources(String resourcePath, String target)
{
try {
for (ClassPath.ResourceInfo resourceInfo : ClassPath.from(MinioClient.class.getClassLoader())
.getResources()) {
if (resourceInfo.getResourceName().startsWith(resourcePath)) {
String fileName = resourceInfo.getResourceName().replaceFirst("^" + Pattern.quote(resourcePath), quoteReplacement(target));
writeFile(resourceInfo.asByteSource().read(), fileName);
}
}
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
minio.copyResources(resourcePath, bucketName, target);
}

public void writeFile(byte[] contents, String target)
{
getMinioClient().putObject(getBucketName(), contents, target);
minio.writeFile(contents, bucketName, target);
}

public List<String> listFiles(String targetDirectory)
Expand All @@ -146,9 +128,10 @@ public String getBucketName()
return bucketName;
}

@Deprecated
public String getMinioAddress()
{
return "http://" + getMinio().getMinioApiEndpoint();
return getMinio().getMinioAddress();
}

@Override
Expand All @@ -158,22 +141,6 @@ public void close()
stop();
}

// Creates a MinioClient bound to this data lake's Minio endpoint, registers it for
// cleanup, and creates the test bucket (with retries) before returning the client.
private MinioClient initMinioClient()
{
MinioClient minioClient = new MinioClient(getMinioAddress(), MINIO_ACCESS_KEY, MINIO_SECRET_KEY);
// Closing this HiveMinioDataLake also closes the client (registered with the shared closer).
closer.register(minioClient);

// use retry loop for minioClient.makeBucket as minio container tends to return "Server not initialized, please try again" error
// for some time after starting up
RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
.withMaxDuration(Duration.of(2, MINUTES))
.withMaxAttempts(Integer.MAX_VALUE) // limited by MaxDuration
.withDelay(Duration.of(10, SECONDS));
Failsafe.with(retryPolicy).run(() -> minioClient.makeBucket(bucketName));

return minioClient;
}

private enum State
{
INITIAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure;
import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure;
import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure;
import io.trino.plugin.iceberg.procedure.RegisterTableProcedure;
import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
Expand Down Expand Up @@ -80,6 +81,7 @@ public void configure(Binder binder)

Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON);

Multibinder<TableProcedureMetadata> tableProcedures = newSetBinder(binder, TableProcedureMetadata.class);
tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceUtf8;
import io.airlift.slice.Slices;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform;
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
Expand Down Expand Up @@ -53,14 +57,18 @@
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.math.BigDecimal;
import java.math.BigInteger;
Expand All @@ -73,6 +81,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -83,9 +92,12 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_COLUMNS_KEY;
import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_FPP_KEY;
Expand All @@ -107,6 +119,7 @@
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
Expand All @@ -127,6 +140,7 @@
import static java.lang.Double.parseDouble;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.parseFloat;
import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;
import static java.lang.String.format;
import static java.util.Comparator.comparing;
Expand All @@ -143,11 +157,16 @@
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.types.Type.TypeID.BINARY;
import static org.apache.iceberg.types.Type.TypeID.FIXED;
import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash;

public final class IcebergUtil
{
public static final String METADATA_FOLDER_NAME = "metadata";
private static final String METADATA_FILE_EXTENSION = ".metadata.json";
private static final Pattern SIMPLE_NAME = Pattern.compile("[a-z][a-z0-9]*");

private static final Logger log = Logger.get(IcebergUtil.class);

private IcebergUtil() {}

public static boolean isIcebergTable(io.trino.plugin.hive.metastore.Table table)
Expand Down Expand Up @@ -623,4 +642,89 @@ private static void validateOrcBloomFilterColumns(ConnectorTableMetadata tableMe
throw new TrinoException(INVALID_TABLE_PROPERTY, format("Orc bloom filter columns %s not present in schema", Sets.difference(ImmutableSet.copyOf(orcBloomFilterColumns), allColumns)));
}
}

/**
 * Finds the latest versioned metadata file ("NNNNN-*.metadata.json") under the table's
 * metadata folder.
 *
 * @throws TrinoException {@code ICEBERG_INVALID_METADATA} when no versioned metadata file
 *         exists, or when two files share the highest version; {@code ICEBERG_FILESYSTEM_ERROR}
 *         when listing the location fails
 */
public static Optional<String> getLatestMetadataLocation(TrinoFileSystem fileSystem, String location)
{
    String metadataDirectoryLocation = format("%s/%s", stripTrailingSlash(location), METADATA_FOLDER_NAME);
    List<String> newestLocations = new ArrayList<>();
    try {
        int newestVersion = -1;
        FileIterator iterator = fileSystem.listFiles(metadataDirectoryLocation);
        while (iterator.hasNext()) {
            FileEntry entry = iterator.next();
            String path = entry.path();
            if (!path.endsWith(METADATA_FILE_EXTENSION)) {
                continue;
            }
            OptionalInt version = parseVersion(path);
            if (version.isEmpty()) {
                // file name did not carry a parseable version prefix; ignore it
                continue;
            }
            int parsed = version.getAsInt();
            if (parsed > newestVersion) {
                // strictly newer: this file supersedes everything collected so far
                newestVersion = parsed;
                newestLocations.clear();
                newestLocations.add(path);
            }
            else if (parsed == newestVersion) {
                // duplicate of the current newest version — ambiguity reported below
                newestLocations.add(path);
            }
        }
        if (newestLocations.isEmpty()) {
            throw new TrinoException(ICEBERG_INVALID_METADATA, "No versioned metadata file exists at location: " + metadataDirectoryLocation);
        }
        if (newestLocations.size() > 1) {
            throw new TrinoException(ICEBERG_INVALID_METADATA, format("More than one latest metadata file found at location: %s, latest metadata files are %s",
                    metadataDirectoryLocation, newestLocations));
        }
    }
    catch (IOException e) {
        throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking table's location: " + location, e);
    }
    return Optional.of(getOnlyElement(newestLocations));
}

/**
 * Resolves the metadata file location for a table: when {@code metadataFileName} is
 * supplied, builds {@code <location>/metadata/<metadataFileName>} directly; otherwise
 * scans the metadata folder for the latest versioned metadata file.
 */
public static Optional<String> getMetadataLocation(TrinoFileSystem fileSystem, String location, Optional<String> metadataFileName)
{
    return metadataFileName
            .map(fileName -> format("%s/%s/%s", stripTrailingSlash(location), METADATA_FOLDER_NAME, fileName))
            .or(() -> getLatestMetadataLocation(fileSystem, location));
}

/**
 * Extracts the numeric version prefix from a metadata file location such as
 * {@code .../metadata/00003-<uuid>.metadata.json}.
 *
 * @return the parsed version, or empty when the file name has no parseable version
 *         prefix (no '-' separator, or a non-numeric prefix)
 */
public static OptionalInt parseVersion(String metadataLocation)
{
    int start = metadataLocation.lastIndexOf('/') + 1; // lastIndexOf yields -1 when absent, so start falls back to 0
    int end = metadataLocation.indexOf('-', start);
    try {
        String versionText = metadataLocation.substring(start, end);
        return OptionalInt.of(parseInt(versionText));
    }
    catch (NumberFormatException | IndexOutOfBoundsException e) {
        // substring throws when no '-' was found (end == -1); parseInt throws on a
        // non-numeric prefix — both mean "not a versioned metadata file name"
        log.warn(e, "Unable to parse version from metadata location: %s", metadataLocation);
        return OptionalInt.empty();
    }
}

/**
 * Parses Iceberg table metadata from the given metadata file location. Callers must
 * pass a location known to reference a valid metadata file.
 */
public static TableMetadata readMetadata(TrinoFileSystem fileSystem, String metadataLocation)
        throws TrinoException
{
    FileIO io = fileSystem.toFileIo();
    return TableMetadataParser.read(io, io.newInputFile(metadataLocation));
}

/**
 * Verifies that the given location exists.
 *
 * @throws TrinoException {@code GENERIC_USER_ERROR} when the location does not exist;
 *         {@code ICEBERG_FILESYSTEM_ERROR} when the existence check itself fails
 */
public static void validateLocation(TrinoFileSystem fileSystem, String location)
{
    boolean exists;
    try {
        exists = fileSystem.newInputFile(location).exists();
    }
    catch (IOException e) {
        throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format("Invalid location: %s", location), e);
    }
    // TrinoException is unchecked and was never caught by the IOException handler,
    // so raising it outside the try block preserves the original behavior.
    if (!exists) {
        throw new TrinoException(GENERIC_USER_ERROR, format("Location %s does not exist", location));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.plugin.iceberg.catalog;

import io.airlift.log.Logger;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.StorageFormat;
import io.trino.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -42,8 +41,9 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveType.toHiveType;
import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME;
import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
import static java.lang.Integer.parseInt;
import static io.trino.plugin.iceberg.IcebergUtil.parseVersion;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
Expand All @@ -57,10 +57,6 @@
public abstract class AbstractIcebergTableOperations
implements IcebergTableOperations
{
private static final Logger log = Logger.get(AbstractIcebergTableOperations.class);

protected static final String METADATA_FOLDER_NAME = "metadata";

protected static final StorageFormat STORAGE_FORMAT = StorageFormat.create(
LazySimpleSerDe.class.getName(),
FileInputFormat.class.getName(),
Expand Down Expand Up @@ -101,7 +97,7 @@ public void initializeFromMetadata(TableMetadata tableMetadata)
currentMetadata = tableMetadata;
currentMetadataLocation = tableMetadata.metadataFileLocation();
shouldRefresh = false;
version = parseVersion(currentMetadataLocation);
version = parseVersion(currentMetadataLocation).orElse(-1);
}

@Override
Expand Down Expand Up @@ -231,7 +227,7 @@ protected void refreshFromMetadataLocation(String newLocation)

currentMetadata = newMetadata.get();
currentMetadataLocation = newLocation;
version = parseVersion(newLocation);
version = parseVersion(newLocation).orElse(-1);
shouldRefresh = false;
}

Expand All @@ -250,19 +246,6 @@ protected static String metadataFileLocation(TableMetadata metadata, String file
return format("%s/%s/%s", stripTrailingSlash(metadata.location()), METADATA_FOLDER_NAME, filename);
}

// Extracts the numeric version prefix from a metadata file path such as
// ".../metadata/00003-<uuid>.metadata.json"; returns -1 when the name cannot be parsed
// (no '-' separator triggers IndexOutOfBoundsException, a non-numeric prefix triggers
// NumberFormatException — both are treated as "unversioned").
protected static int parseVersion(String metadataLocation)
{
int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
int versionEnd = metadataLocation.indexOf('-', versionStart);
try {
return parseInt(metadataLocation.substring(versionStart, versionEnd));
}
catch (NumberFormatException | IndexOutOfBoundsException e) {
log.warn(e, "Unable to parse version from metadata location: %s", metadataLocation);
return -1;
}
}

protected static List<Column> toHiveColumns(List<NestedField> columns)
{
return columns.stream()
Expand Down
Loading