Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions .github/ISSUE_TEMPLATE/iceberg_bug_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ body:
description: What Apache Iceberg version are you using?
multiple: false
options:
- "1.4.3 (latest release)"
- "1.5.0 (latest release)"
- "1.4.3"
- "1.4.2"
- "1.4.1"
- "1.4.0"
Expand All @@ -38,14 +39,6 @@ body:
- "1.2.0"
- "1.1.0"
- "1.0.0"
- "0.14.1"
- "0.14.0"
- "0.13.1"
- "0.13.0"
- "0.12.1"
- "0.12.0"
- "0.11.1"
- "0.11.0"
- "main (development)"
validations:
required: false
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/PositionOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,15 @@ public abstract class PositionOutputStream extends OutputStream {
* @throws IOException If the underlying stream throws IOException
*/
public abstract long getPos() throws IOException;

/**
* Return the current stored length of the output. Can differ from the current position for
* encrypting streams, and for other non-length-preserving streams.
*
* @return current stored length in bytes
* @throws IOException If the underlying stream throws IOException
*/
public long storedLength() throws IOException {
return getPos();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public String oauth2ServerUri() {
return properties().getOrDefault(OAuth2Properties.OAUTH2_SERVER_URI, ResourcePaths.tokens());
}

@Value.Lazy
public Map<String, String> optionalOAuthParams() {
return OAuth2Util.buildOptionalParam(properties());
}

/** A Bearer token supplier which will be used for interaction with the server. */
@Value.Default
public Supplier<String> token() {
Expand Down Expand Up @@ -207,7 +212,13 @@ private AuthSession authSession() {
token,
expiresAtMillis(properties()),
new AuthSession(
ImmutableMap.of(), token, null, credential(), SCOPE, oauth2ServerUri())));
ImmutableMap.of(),
token,
null,
credential(),
SCOPE,
oauth2ServerUri(),
optionalOAuthParams())));
}

if (credentialProvided()) {
Expand All @@ -217,11 +228,22 @@ private AuthSession authSession() {
id -> {
AuthSession session =
new AuthSession(
ImmutableMap.of(), null, null, credential(), SCOPE, oauth2ServerUri());
ImmutableMap.of(),
null,
null,
credential(),
SCOPE,
oauth2ServerUri(),
optionalOAuthParams());
long startTimeMillis = System.currentTimeMillis();
OAuthTokenResponse authResponse =
OAuth2Util.fetchToken(
httpClient(), session.headers(), credential(), SCOPE, oauth2ServerUri());
httpClient(),
session.headers(),
credential(),
SCOPE,
oauth2ServerUri(),
optionalOAuthParams());
return AuthSession.fromTokenResponse(
httpClient(), tokenRefreshExecutor(), authResponse, startTimeMillis, session);
});
Expand Down
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ subprojects {
revapi {
oldGroup = project.group
oldName = project.name
oldVersion = "1.4.0"
oldVersion = "1.5.0"
}

tasks.register('showDeprecationRulesOnRevApiFailure') {
Expand Down Expand Up @@ -440,7 +440,8 @@ project(':iceberg-aliyun') {
exclude group: 'com.google.code.gson', module: 'gson'
}

testImplementation libs.jackson.dataformat.xml
testImplementation platform(libs.jackson.bom)
testImplementation "com.fasterxml.jackson.dataformat:jackson-dataformat-xml"
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation libs.spring.web
testImplementation(libs.spring.boot.starter.jetty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -154,8 +154,8 @@ public RewriteManifests addManifest(ManifestFile manifest) {

private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest.path());
OutputFile newFile = newManifestOutput();
InputFile toCopy = ops.io().newInputFile(manifest);
EncryptedOutputFile newFile = newManifestOutputFile();
return ManifestFiles.copyRewriteManifest(
current.formatVersion(),
manifest.partitionSpecId(),
Expand Down
36 changes: 26 additions & 10 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -446,17 +446,20 @@ public static MetricsReporter loadMetricsReporter(Map<String, String> properties
}

LOG.info("Loading custom MetricsReporter implementation: {}", impl);
DynConstructors.Ctor<MetricsReporter> ctor;
try {
ctor =
DynConstructors.builder(MetricsReporter.class)
.loader(CatalogUtil.class.getClassLoader())
.impl(impl)
.buildChecked();
} catch (NoSuchMethodException e) {

DynConstructors.Ctor<MetricsReporter> ctor =
tryLoadCtor(impl, CatalogUtil.class.getClassLoader());

if (ctor == null) {
LOG.warn(
"Could not find '{}' with the CatalogUtil class loader, falling back to the thread's context class loader",
impl);
ctor = tryLoadCtor(impl, Thread.currentThread().getContextClassLoader());
}

if (ctor == null) {
throw new IllegalArgumentException(
String.format("Cannot initialize MetricsReporter, missing no-arg constructor: %s", impl),
e);
String.format("Cannot initialize MetricsReporter, missing no-arg constructor: %s", impl));
}

MetricsReporter reporter;
Expand All @@ -473,4 +476,17 @@ public static MetricsReporter loadMetricsReporter(Map<String, String> properties

return reporter;
}

private static DynConstructors.Ctor<MetricsReporter> tryLoadCtor(
String impl, ClassLoader loader) {
try {
return DynConstructors.builder(MetricsReporter.class)
.loader(loader)
.impl(impl)
.buildChecked();
} catch (NoSuchMethodException e) {
LOG.warn("Failed to load constructor for {} using class loader {}", impl, loader, e);
return null;
}
}
}
8 changes: 4 additions & 4 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -121,14 +121,14 @@ public FastAppend appendManifest(ManifestFile manifest) {

private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest.path());
OutputFile newManifestPath = newManifestOutput();
InputFile toCopy = ops.io().newInputFile(manifest);
EncryptedOutputFile newManifestFile = newManifestOutputFile();
return ManifestFiles.copyAppendManifest(
current.formatVersion(),
manifest.partitionSpecId(),
toCopy,
current.specsById(),
newManifestPath,
newManifestFile,
snapshotId(),
summaryBuilder);
}
Expand Down
45 changes: 40 additions & 5 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.iceberg.ManifestReader.FileType;
import org.apache.iceberg.avro.AvroEncoderUtil;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.ContentCache;
Expand Down Expand Up @@ -157,11 +159,29 @@ public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outp
*/
public static ManifestWriter<DataFile> write(
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
return write(
formatVersion, spec, EncryptedFiles.plainAsEncryptedOutput(outputFile), snapshotId);
}

/**
* Create a new {@link ManifestWriter} for the given format version.
*
* @param formatVersion a target format version
* @param spec a {@link PartitionSpec}
* @param encryptedOutputFile an {@link EncryptedOutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @return a manifest writer
*/
public static ManifestWriter<DataFile> write(
int formatVersion,
PartitionSpec spec,
EncryptedOutputFile encryptedOutputFile,
Long snapshotId) {
switch (formatVersion) {
case 1:
return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
return new ManifestWriter.V1Writer(spec, encryptedOutputFile, snapshotId);
case 2:
return new ManifestWriter.V2Writer(spec, outputFile, snapshotId);
return new ManifestWriter.V2Writer(spec, encryptedOutputFile, snapshotId);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -198,6 +218,21 @@ public static ManifestReader<DeleteFile> readDeleteManifest(
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
return writeDeleteManifest(
formatVersion, spec, EncryptedFiles.plainAsEncryptedOutput(outputFile), snapshotId);
}

/**
* Create a new {@link ManifestWriter} for the given format version.
*
* @param formatVersion a target format version
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link EncryptedOutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @return a manifest writer
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion, PartitionSpec spec, EncryptedOutputFile outputFile, Long snapshotId) {
switch (formatVersion) {
case 1:
throw new IllegalArgumentException("Cannot write delete files in a v1 table");
Expand Down Expand Up @@ -254,7 +289,7 @@ static ManifestFile copyAppendManifest(
int specId,
InputFile toCopy,
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
EncryptedOutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
// use metadata that will add the current snapshot's ID for the rewrite
Expand All @@ -278,7 +313,7 @@ static ManifestFile copyRewriteManifest(
int specId,
InputFile toCopy,
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
EncryptedOutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
// for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an
Expand All @@ -302,7 +337,7 @@ static ManifestFile copyRewriteManifest(
private static ManifestFile copyManifestInternal(
int formatVersion,
ManifestReader<DataFile> reader,
OutputFile outputFile,
EncryptedOutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder,
ManifestEntry.Status allowedEntryStatus) {
Expand Down
18 changes: 11 additions & 7 deletions core/src/main/java/org/apache/iceberg/ManifestWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.iceberg;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
Expand All @@ -37,6 +39,7 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
static final long UNASSIGNED_SEQ = -1L;

private final OutputFile file;
private final ByteBuffer keyMetadataBuffer;
private final int specId;
private final FileAppender<ManifestEntry<F>> writer;
private final Long snapshotId;
Expand All @@ -52,13 +55,14 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
private long deletedRows = 0L;
private Long minDataSequenceNumber = null;

private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
this.file = file;
private ManifestWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
this.file = file.encryptingOutputFile();
this.specId = spec.specId();
this.writer = newAppender(spec, file);
this.writer = newAppender(spec, this.file);
this.snapshotId = snapshotId;
this.reused = new GenericManifestEntry<>(spec.partitionType());
this.stats = new PartitionSummary(spec);
this.keyMetadataBuffer = (file.keyMetadata() == null) ? null : file.keyMetadata().buffer();
}

protected abstract ManifestEntry<F> prepare(ManifestEntry<F> entry);
Expand Down Expand Up @@ -204,7 +208,7 @@ public ManifestFile toManifestFile() {
deletedFiles,
deletedRows,
stats.summaries(),
null);
keyMetadataBuffer);
}

@Override
Expand All @@ -216,7 +220,7 @@ public void close() throws IOException {
static class V2Writer extends ManifestWriter<DataFile> {
private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper;

V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
V2Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
}
Expand Down Expand Up @@ -250,7 +254,7 @@ protected FileAppender<ManifestEntry<DataFile>> newAppender(
static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
private final V2Metadata.IndexedManifestEntry<DeleteFile> entryWrapper;

V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
V2DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
}
Expand Down Expand Up @@ -289,7 +293,7 @@ protected ManifestContent content() {
static class V1Writer extends ManifestWriter<DataFile> {
private final V1Metadata.IndexedManifestEntry entryWrapper;

V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
V1Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType());
}
Expand Down
Loading