Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public static ManifestReader read(ManifestFile manifest, FileIO io, Map<Integer,
* @return a manifest writer
*/
public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
// always use a v1 writer for appended manifests because sequence number must be inherited
return write(1, spec, outputFile, null);
// always use a v2 writer to preserve sequence numbers, but use null for sequence number so appends inherit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean manifests will be written with the v2 schema (i.e. with sequence numbers) even though TableMetadata is v1 and the manifest list is written with v1? And this should work because we do a projection on read and sequence number is optional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the constructor used to create an append or rewritten manifest, so this makes the manifests that are passed into FastAppend, MergeAppend, and RewriteManifests support sequence numbers. It is needed for the last case: when rewriting a manifest for a v2 table, we need to preserve sequence numbers.

This works for v1 because v1 tables will ignore the new fields.

return write(2, spec, outputFile, null);
}

/**
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ public static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
String location,
Map<String, String> properties) {
return newTableMetadata(schema, spec, location, properties, DEFAULT_TABLE_FORMAT_VERSION);
}

static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
String location,
Map<String, String> properties,
int formatVersion) {
// reassign all column ids to ensure consistency
AtomicInteger lastColumnId = new AtomicInteger(0);
Schema freshSchema = TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
Expand All @@ -70,7 +78,7 @@ public static TableMetadata newTableMetadata(Schema schema,
}
PartitionSpec freshSpec = specBuilder.build();

return new TableMetadata(null, DEFAULT_TABLE_FORMAT_VERSION, UUID.randomUUID().toString(), location,
return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
INITIAL_SEQUENCE_NUMBER, System.currentTimeMillis(),
lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/org/apache/iceberg/V2Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ static Schema wrapFileSchema(Types.StructType fileSchema) {

static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
private final org.apache.avro.Schema avroSchema;
private final long commitSnapshotId;
private final Long commitSnapshotId;
private final V1Metadata.IndexedDataFile fileWrapper;
private ManifestEntry wrapped = null;

IndexedManifestEntry(long commitSnapshotId, Types.StructType partitionType) {
IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copied manifests will use a null snapshot ID.

this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
this.commitSnapshotId = commitSnapshotId;
// TODO: when v2 data files differ from v1, this should use a v2 wrapper
Expand Down Expand Up @@ -281,7 +281,8 @@ public Object get(int i) {
// if the entry's sequence number is null, then it will inherit the sequence number of the current commit.
// to validate that this is correct, check that the snapshot id is either null (will also be inherited) or
// that it matches the id of the current commit.
Preconditions.checkState(wrapped.snapshotId() == null || commitSnapshotId == wrapped.snapshotId(),
Preconditions.checkState(
wrapped.snapshotId() == null || wrapped.snapshotId().equals(commitSnapshotId),
"Found unassigned sequence number for an entry from snapshot: %s", wrapped.snapshotId());
return null;
}
Expand Down
10 changes: 8 additions & 2 deletions core/src/test/java/org/apache/iceberg/TableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ public class TableTestBase {
File metadataDir = null;
public TestTables.TestTable table = null;

protected final int formatVersion;

public TableTestBase(int formatVersion) {
this.formatVersion = formatVersion;
}

@Before
public void setupTable() throws Exception {
this.tableDir = temp.newFolder();
Expand All @@ -109,8 +115,8 @@ List<File> listManifestFiles(File tableDirToList) {
!name.startsWith("snap") && Files.getFileExtension(name).equalsIgnoreCase("avro")));
}

private TestTables.TestTable create(Schema schema, PartitionSpec spec) {
return TestTables.create(tableDir, "test", schema, spec);
TestTables.TestTable create(Schema schema, PartitionSpec spec) {
return TestTables.create(tableDir, "test", schema, spec, formatVersion);
}

TestTables.TestTable load() {
Expand Down
17 changes: 16 additions & 1 deletion core/src/test/java/org/apache/iceberg/TestCreateTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,25 @@
import org.apache.iceberg.types.TypeUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.PartitionSpec.unpartitioned;

@RunWith(Parameterized.class)
public class TestCreateTransaction extends TableTestBase {
@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { 1 },
new Object[] { 2 },
};
}

public TestCreateTransaction(int formatVersion) {
super(formatVersion);
}

@Test
public void testCreateTransaction() throws IOException {
File tableDir = temp.newFolder();
Expand Down Expand Up @@ -273,7 +288,7 @@ public void testCreateTransactionConflict() throws IOException {
Assert.assertNull("Should have no metadata version",
TestTables.metadataVersion("test_conflict"));

Table conflict = TestTables.create(tableDir, "test_conflict", SCHEMA, unpartitioned());
Table conflict = TestTables.create(tableDir, "test_conflict", SCHEMA, unpartitioned(), formatVersion);

Assert.assertEquals("Table schema should match with reassigned IDs",
TypeUtil.assignIncreasingFreshIds(SCHEMA).asStruct(), conflict.schema().asStruct());
Expand Down
39 changes: 12 additions & 27 deletions core/src/test/java/org/apache/iceberg/TestDataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,30 @@

package org.apache.iceberg;

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.types.Types.NestedField.required;
import static org.junit.Assert.assertEquals;

public class TestDataTableScan {

@Rule
public TemporaryFolder temp = new TemporaryFolder();
private final Schema schema = new Schema(
required(1, "id", Types.IntegerType.get()),
required(2, "data", Types.StringType.get()));
private File tableDir = null;

@Before
public void setupTableDir() throws IOException {
this.tableDir = temp.newFolder();
@RunWith(Parameterized.class)
public class TestDataTableScan extends TableTestBase {
@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { 1 },
new Object[] { 2 },
};
}

@After
public void cleanupTables() {
TestTables.clearTables();
public TestDataTableScan(int formatVersion) {
super(formatVersion);
}

@Test
public void testTableScanHonorsSelect() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = TestTables.create(tableDir, "test", schema, spec);

TableScan scan = table.newScan().select("id");

Schema expectedSchema = new Schema(required(1, "id", Types.IntegerType.get()));
Expand All @@ -66,9 +54,6 @@ public void testTableScanHonorsSelect() {

@Test
public void testTableScanHonorsSelectWithoutCaseSensitivity() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = TestTables.create(tableDir, "test", schema, spec);

TableScan scan1 = table.newScan().caseSensitive(false).select("ID");
// order of refinements shouldn't matter
TableScan scan2 = table.newScan().select("ID").caseSensitive(false);
Expand Down
15 changes: 15 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,23 @@
import org.apache.iceberg.ManifestEntry.Status;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestDeleteFiles extends TableTestBase {
@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { 1 },
new Object[] { 2 },
};
}

public TestDeleteFiles(int formatVersion) {
super(formatVersion);
}

@Test
public void testMultipleDeletes() {
table.newAppend()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,24 @@
import com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.junit.Assert.assertEquals;

@RunWith(Parameterized.class)
public class TestEntriesMetadataTable extends TableTestBase {
@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { 1 },
new Object[] { 2 },
};
}

public TestEntriesMetadataTable(int formatVersion) {
super(formatVersion);
}

@Test
public void testEntriesTable() {
Expand Down
14 changes: 14 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestFastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,22 @@
import org.apache.iceberg.exceptions.CommitFailedException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestFastAppend extends TableTestBase {
@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { 1 },
new Object[] { 2 },
};
}

public TestFastAppend(int formatVersion) {
super(formatVersion);
}

@Test
public void testEmptyTableAppend() {
Expand Down
24 changes: 20 additions & 4 deletions core/src/test/java/org/apache/iceberg/TestFilterFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,27 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.types.Types.NestedField.required;
import static org.junit.Assert.assertEquals;

@RunWith(Parameterized.class)
public class TestFilterFiles {
@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { 1 },
new Object[] { 2 },
};
}

public final int formatVersion;

public TestFilterFiles(int formatVersion) {
this.formatVersion = formatVersion;
}

@Rule
public TemporaryFolder temp = new TemporaryFolder();
Expand All @@ -60,28 +76,28 @@ public void cleanupTables() {
@Test
public void testFilterFilesUnpartitionedTable() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = TestTables.create(tableDir, "test", schema, spec);
Table table = TestTables.create(tableDir, "test", schema, spec, formatVersion);
testFilterFiles(table);
}

@Test
public void testCaseInsensitiveFilterFilesUnpartitionedTable() {
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = TestTables.create(tableDir, "test", schema, spec);
Table table = TestTables.create(tableDir, "test", schema, spec, formatVersion);
testCaseInsensitiveFilterFiles(table);
}

@Test
public void testFilterFilesPartitionedTable() {
PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
Table table = TestTables.create(tableDir, "test", schema, spec);
Table table = TestTables.create(tableDir, "test", schema, spec, formatVersion);
testFilterFiles(table);
}

@Test
public void testCaseInsensitiveFilterFilesPartitionedTable() {
PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
Table table = TestTables.create(tableDir, "test", schema, spec);
Table table = TestTables.create(tableDir, "test", schema, spec, formatVersion);
testCaseInsensitiveFilterFiles(table);
}

Expand Down
15 changes: 15 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestFindFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,23 @@
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestFindFiles extends TableTestBase {
@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { 1 },
new Object[] { 2 },
};
}

public TestFindFiles(int formatVersion) {
super(formatVersion);
}

@Test
public void testBasicBehavior() {
table.newAppend()
Expand Down
4 changes: 4 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestFormatVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import org.junit.Test;

public class TestFormatVersions extends TableTestBase {
public TestFormatVersions() {
super(1);
}

@Test
public void testDefaultFormatVersion() {
Assert.assertEquals("Should default to v1", 1, table.ops().current().formatVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,22 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestIncrementalDataTableScan extends TableTestBase {
@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { 1 },
new Object[] { 2 },
};
}

public TestIncrementalDataTableScan(int formatVersion) {
super(formatVersion);
}

@Before
public void setupTableProperties() {
Expand Down
15 changes: 15 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestManifestCleanup.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,23 @@
import org.apache.iceberg.expressions.Expressions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestManifestCleanup extends TableTestBase {
@Parameterized.Parameters
public static Object[][] parameters() {
return new Object[][] {
new Object[] { 1 },
new Object[] { 2 },
};
}

public TestManifestCleanup(int formatVersion) {
super(formatVersion);
}

@Test
public void testDelete() {
Assert.assertEquals("Table should start with no manifests",
Expand Down
Loading