Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@

import javax.inject.Inject;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class HdfsEnvironment
implements Externalizable
{
static {
HadoopNative.requireHadoopNative();
Expand Down Expand Up @@ -102,7 +106,29 @@ public void doAs(ConnectorIdentity identity, Runnable action)
hdfsAuthentication.doAs(identity, action);
}

public HdfsEnvironment()
{
    // No-arg constructor required by the java.io.Externalizable contract.
    // NOTE(review): all collaborators are left null/false and readExternal()
    // never repopulates them, so a deserialized instance is unusable — the
    // Externalizable implementation on this class was flagged as incorrect
    // and undesired in review.
    this.hdfsConfiguration = null;
    this.hdfsAuthentication = null;
    this.newDirectoryPermissions = null;
    this.newFileInheritOwnership = false;
    this.verifyChecksum = false;
}

@Override
public void writeExternal(ObjectOutput out)
        throws IOException
{
    // Intentionally empty: no state is written, so serialization discards the
    // entire environment configuration.
    // NOTE(review): flagged as incorrect — this class should not implement
    // Externalizable at all.
}

@Override
public void readExternal(ObjectInput in)
        throws IOException, ClassNotFoundException
{
    // Intentionally empty: nothing is read back, leaving the fields at the
    // null/false values set by the no-arg constructor.
    // NOTE(review): flagged as incorrect — this class should not implement
    // Externalizable at all.
}
Comment on lines +119 to +128
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's unlikely to be a correct serialization of the HdfsEnvironment state, since it doesn't seem to store anything.

Anyway, we should not make this class serializable (nor Externalizable) at all


public static class HdfsContext
implements Externalizable
{
private final ConnectorIdentity identity;

Expand Down Expand Up @@ -130,5 +156,22 @@ public String toString()
.add("user", identity)
.toString();
}

public HdfsContext()
{
    // No-arg constructor required by the java.io.Externalizable contract.
    // NOTE(review): identity stays null because readExternal() is empty, so a
    // deserialized context carries no identity — flagged as incorrect in review.
    identity = null;
}

@Override
public void writeExternal(ObjectOutput out)
        throws IOException
{
    // Intentionally empty: the identity is never serialized.
    // NOTE(review): flagged as neither correct nor desired.
}

@Override
public void readExternal(ObjectInput in)
        throws IOException, ClassNotFoundException
{
    // Intentionally empty: nothing is read back, leaving identity null.
    // NOTE(review): flagged as neither correct nor desired.
}
Comment on lines +166 to +175
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above. This is neither correct nor desired.

}
}
6 changes: 6 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${dep.iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive-metastore</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.iceberg.FileContent;

import java.util.Optional;

Expand All @@ -27,18 +28,21 @@ public class CommitTaskData
private final long fileSizeInBytes;
private final MetricsWrapper metrics;
private final Optional<String> partitionDataJson;
private final FileContent content;

@JsonCreator
public CommitTaskData(
        @JsonProperty("path") String path,
        @JsonProperty("fileSizeInBytes") long fileSizeInBytes,
        @JsonProperty("metrics") MetricsWrapper metrics,
        @JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
        @JsonProperty("content") FileContent content)
{
    // Validate all arguments before assigning any field, so a failed
    // precondition never leaves a partially-initialized instance.
    // (Previously fileSizeInBytes was range-checked only after every field
    // had already been assigned.)
    checkArgument(fileSizeInBytes >= 0, "fileSizeInBytes is negative");
    this.path = requireNonNull(path, "path is null");
    this.fileSizeInBytes = fileSizeInBytes;
    this.metrics = requireNonNull(metrics, "metrics is null");
    this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
    this.content = requireNonNull(content, "content is null");
}

Expand All @@ -65,4 +69,10 @@ public Optional<String> getPartitionDataJson()
{
return partitionDataJson;
}

@JsonProperty
public FileContent getContent()
{
    // Iceberg file-content kind recorded for this commit task; never null
    // (enforced by requireNonNull in the constructor).
    return content;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,26 @@
import java.util.Objects;
import java.util.Optional;

import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataColumns.IS_DELETED;
import static org.apache.iceberg.MetadataColumns.ROW_POSITION;

public class IcebergColumnHandle
implements ColumnHandle
{
public static final IcebergColumnHandle ROW_POSITION_HANDLE = new IcebergColumnHandle(
primitiveColumnIdentity(ROW_POSITION.fieldId(), ROW_POSITION.name()),
BIGINT,
ImmutableList.of(),
BIGINT,
Optional.empty());

// use Integer.MIN_VALUE as $row_id field ID, which is currently not reserved by Iceberg
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would we know if iceberg starts using this id internally? Would sth break?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because an Iceberg column's name and type can change, comparing column IDs is needed for the equality check. Using a duplicated column ID would cause issues in that area.

All the Iceberg metadata columns have IDs counting down from Integer.MAX_VALUE. That's why I am choosing Integer.MIN_VALUE here. If there is any risk of conflicting IDs, Iceberg will inform the Trino community, and switching to another ID is fully backwards compatible.

public static final int TRINO_ROW_ID_COLUMN_ID = Integer.MIN_VALUE;
public static final String TRINO_ROW_ID_COLUMN_NAME = "$row_id";

private final ColumnIdentity baseColumnIdentity;
private final Type baseType;
// The list of field ids to indicate the projected part of the top-level column represented by baseColumnIdentity
Expand Down Expand Up @@ -138,6 +153,21 @@ public boolean isBaseColumn()
return path.isEmpty();
}

public boolean isIcebergRowPositionMetadataColumn()
{
    // This handle represents Iceberg's reserved row-position metadata column
    // when its field ID matches MetadataColumns.ROW_POSITION.
    return ROW_POSITION.fieldId() == id;
}

public boolean isIcebergIsDeletedMetadataColumn()
{
    // This handle represents Iceberg's reserved is-deleted metadata column
    // when its field ID matches MetadataColumns.IS_DELETED.
    return IS_DELETED.fieldId() == id;
}

public boolean isTrinoRowIdColumn()
{
    // Trino's synthetic $row_id column uses the sentinel field ID
    // TRINO_ROW_ID_COLUMN_ID (Integer.MIN_VALUE), which Iceberg does not reserve.
    return TRINO_ROW_ID_COLUMN_ID == id;
}

@Override
public int hashCode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import org.apache.iceberg.FileFormat;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

Expand All @@ -29,6 +30,9 @@

public class IcebergConfig
{
public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
Comment on lines +33 to +34
Copy link
Copy Markdown
Member

@losipiuk losipiuk Nov 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: MIN/MAX_SUPPORTED_VERSION


private IcebergFileFormat fileFormat = ORC;
private HiveCompressionCodec compressionCodec = GZIP;
private boolean useFileSizeFromMetadata = true;
Expand All @@ -38,6 +42,7 @@ public class IcebergConfig
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
private boolean projectionPushdownEnabled = true;
private int formatVersion = FORMAT_VERSION_SUPPORT_MIN;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not 2? Please document the choice.
I believe we should create tables with format version 2 sooner rather than later.


public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -167,4 +172,19 @@ public IcebergConfig setProjectionPushdownEnabled(boolean projectionPushdownEnab
this.projectionPushdownEnabled = projectionPushdownEnabled;
return this;
}

@Min(FORMAT_VERSION_SUPPORT_MIN)
@Max(FORMAT_VERSION_SUPPORT_MAX)
public int getFormatVersion()
{
    // Iceberg table format version used when creating tables; the
    // bean-validation annotations above bound it to the supported range.
    return formatVersion;
}

@Config("iceberg.format-version")
@ConfigDescription("Iceberg table format version to use when creating a table")
public IcebergConfig setFormatVersion(int formatVersion)
{
    // Fluent setter: returns this so config properties can be chained.
    this.formatVersion = formatVersion;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -110,11 +111,12 @@ public IcebergFileWriter createFileWriter(
JobConf jobConf,
ConnectorSession session,
HdfsContext hdfsContext,
FileFormat fileFormat)
FileFormat fileFormat,
FileContent fileContent)
{
switch (fileFormat) {
case PARQUET:
return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext);
return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext, fileContent);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why fileContent is ignored for ORC?

at least add verify(fileContent==DATA, "...") in ORC branch

case ORC:
return createOrcWriter(outputPath, icebergSchema, jobConf, session);
default:
Expand All @@ -127,7 +129,8 @@ private IcebergFileWriter createParquetWriter(
Schema icebergSchema,
JobConf jobConf,
ConnectorSession session,
HdfsContext hdfsContext)
HdfsContext hdfsContext,
FileContent fileContent)
{
List<String> fileColumnNames = icebergSchema.columns().stream()
.map(Types.NestedField::name)
Expand Down Expand Up @@ -162,7 +165,8 @@ private IcebergFileWriter createParquetWriter(
nodeVersion.toString(),
outputPath,
hdfsEnvironment,
hdfsContext);
hdfsContext,
fileContent);
}
catch (IOException e) {
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Parquet file", e);
Expand Down
Loading