diff --git a/core/src/main/java/org/apache/iceberg/puffin/Blob.java b/core/src/main/java/org/apache/iceberg/puffin/Blob.java index 350748a5969e..57c2cd1c002b 100644 --- a/core/src/main/java/org/apache/iceberg/puffin/Blob.java +++ b/core/src/main/java/org/apache/iceberg/puffin/Blob.java @@ -30,23 +30,29 @@ public final class Blob { private final String type; private final List inputFields; + private final long snapshotId; + private final long sequenceNumber; private final ByteBuffer blobData; private final PuffinCompressionCodec requestedCompression; private final Map properties; - public Blob(String type, List inputFields, ByteBuffer blobData) { - this(type, inputFields, blobData, null, ImmutableMap.of()); + public Blob( + String type, List inputFields, long snapshotId, long sequenceNumber, + ByteBuffer blobData) { + this(type, inputFields, snapshotId, sequenceNumber, blobData, null, ImmutableMap.of()); } public Blob( - String type, List inputFields, ByteBuffer blobData, - @Nullable PuffinCompressionCodec requestedCompression, Map properties) { + String type, List inputFields, long snapshotId, long sequenceNumber, + ByteBuffer blobData, @Nullable PuffinCompressionCodec requestedCompression, Map properties) { Preconditions.checkNotNull(type, "type is null"); Preconditions.checkNotNull(inputFields, "inputFields is null"); Preconditions.checkNotNull(blobData, "blobData is null"); Preconditions.checkNotNull(properties, "properties is null"); this.type = type; this.inputFields = ImmutableList.copyOf(inputFields); + this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; this.blobData = blobData; this.requestedCompression = requestedCompression; this.properties = ImmutableMap.copyOf(properties); @@ -60,6 +66,14 @@ public List inputFields() { return inputFields; } + public long snapshotId() { + return snapshotId; + } + + public long sequenceNumber() { + return sequenceNumber; + } + public ByteBuffer blobData() { return blobData; } diff --git a/core/src/main/java/org/apache/iceberg/puffin/BlobMetadata.java b/core/src/main/java/org/apache/iceberg/puffin/BlobMetadata.java index 517ce765a077..cc29dd5df92f 100644 --- a/core/src/main/java/org/apache/iceberg/puffin/BlobMetadata.java +++ b/core/src/main/java/org/apache/iceberg/puffin/BlobMetadata.java @@ -29,19 +29,23 @@ public class BlobMetadata { private final String type; private final List inputFields; + private final long snapshotId; + private final long sequenceNumber; private final long offset; private final long length; private final String compressionCodec; private final Map properties; public BlobMetadata( - String type, List inputFields, long offset, long length, - @Nullable String compressionCodec, Map properties) { + String type, List inputFields, long snapshotId, long sequenceNumber, + long offset, long length, @Nullable String compressionCodec, Map properties) { Preconditions.checkNotNull(type, "type is null"); Preconditions.checkNotNull(inputFields, "inputFields is null"); Preconditions.checkNotNull(properties, "properties is null"); this.type = type; this.inputFields = ImmutableList.copyOf(inputFields); + this.snapshotId = snapshotId; + this.sequenceNumber = sequenceNumber; this.offset = offset; this.length = length; this.compressionCodec = compressionCodec; @@ -56,6 +60,20 @@ public List inputFields() { return inputFields; } + /** + * ID of the Iceberg table's snapshot the blob was computed from + */ + public long snapshotId() { + return snapshotId; + } + + /** + * Sequence number of the Iceberg table's snapshot the blob was computed from + */ + public long sequenceNumber() { + return sequenceNumber; + } + /** * Offset in the file */ diff --git a/core/src/main/java/org/apache/iceberg/puffin/FileMetadataParser.java b/core/src/main/java/org/apache/iceberg/puffin/FileMetadataParser.java index 59642b7ba42d..40b84f682fe1 100644 --- a/core/src/main/java/org/apache/iceberg/puffin/FileMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/puffin/FileMetadataParser.java @@ -41,6 +41,8 @@ private FileMetadataParser() { private static final String TYPE = "type"; private static final String FIELDS = "fields"; + private static final String SNAPSHOT_ID = "snapshot-id"; + private static final String SEQUENCE_NUMBER = "sequence-number"; private static final String OFFSET = "offset"; private static final String LENGTH = "length"; private static final String COMPRESSION_CODEC = "compression-codec"; @@ -123,6 +125,8 @@ static void toJson(BlobMetadata blobMetadata, JsonGenerator generator) throws IO generator.writeNumber(field); } generator.writeEndArray(); + generator.writeNumberField(SNAPSHOT_ID, blobMetadata.snapshotId()); + generator.writeNumberField(SEQUENCE_NUMBER, blobMetadata.sequenceNumber()); generator.writeNumberField(OFFSET, blobMetadata.offset()); generator.writeNumberField(LENGTH, blobMetadata.length()); @@ -145,6 +149,8 @@ static void toJson(BlobMetadata blobMetadata, JsonGenerator generator) throws IO static BlobMetadata blobMetadataFromJson(JsonNode json) { String type = JsonUtil.getString(TYPE, json); List fields = JsonUtil.getIntegerList(FIELDS, json); + long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, json); + long sequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, json); long offset = JsonUtil.getLong(OFFSET, json); long length = JsonUtil.getLong(LENGTH, json); String compressionCodec = JsonUtil.getStringOrNull(COMPRESSION_CODEC, json); @@ -158,6 +164,8 @@ static BlobMetadata blobMetadataFromJson(JsonNode json) { return new BlobMetadata( type, fields, + snapshotId, + sequenceNumber, offset, length, compressionCodec, diff --git a/core/src/main/java/org/apache/iceberg/puffin/PuffinWriter.java b/core/src/main/java/org/apache/iceberg/puffin/PuffinWriter.java index 6b4a1e9dd3c5..cbaaf6828ede 100644 --- a/core/src/main/java/org/apache/iceberg/puffin/PuffinWriter.java +++ b/core/src/main/java/org/apache/iceberg/puffin/PuffinWriter.java @@ -79,8 +79,8 @@ public void add(Blob blob) { ByteBuffer rawData = PuffinFormat.compress(codec, blob.blobData()); int length = rawData.remaining(); IOUtil.writeFully(outputStream, rawData); - writtenBlobsMetadata.add(new BlobMetadata(blob.type(), blob.inputFields(), fileOffset, length, - codec.codecName(), blob.properties())); + writtenBlobsMetadata.add(new BlobMetadata(blob.type(), blob.inputFields(), blob.snapshotId(), + blob.sequenceNumber(), fileOffset, length, codec.codecName(), blob.properties())); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/core/src/test/java/org/apache/iceberg/puffin/PuffinFormatTestUtil.java b/core/src/test/java/org/apache/iceberg/puffin/PuffinFormatTestUtil.java index 5d9193ee5a0e..62a67c27c308 100644 --- a/core/src/test/java/org/apache/iceberg/puffin/PuffinFormatTestUtil.java +++ b/core/src/test/java/org/apache/iceberg/puffin/PuffinFormatTestUtil.java @@ -29,7 +29,7 @@ private PuffinFormatTestUtil() { public static final long EMPTY_PUFFIN_UNCOMPRESSED_FOOTER_SIZE = 28; // footer size for v1/sample-metric-data-compressed-zstd.bin - public static final long SAMPLE_METRIC_DATA_COMPRESSED_ZSTD_FOOTER_SIZE = 242; + public static final long SAMPLE_METRIC_DATA_COMPRESSED_ZSTD_FOOTER_SIZE = 314; static byte[] readTestResource(String resourceName) throws Exception { return Resources.toByteArray(Resources.getResource(PuffinFormatTestUtil.class, resourceName)); diff --git a/core/src/test/java/org/apache/iceberg/puffin/TestFileMetadataParser.java b/core/src/test/java/org/apache/iceberg/puffin/TestFileMetadataParser.java index 5efac95bdb24..3922d227389b 100644 --- a/core/src/test/java/org/apache/iceberg/puffin/TestFileMetadataParser.java +++ b/core/src/test/java/org/apache/iceberg/puffin/TestFileMetadataParser.java @@ -99,19 +99,23 @@ public void testBlobMetadata() { testJsonSerialization( new FileMetadata( ImmutableList.of( - new BlobMetadata("type-a", ImmutableList.of(1), 4, 16, null, ImmutableMap.of()), - new BlobMetadata("type-bbb", ImmutableList.of(2, 3, 4), Integer.MAX_VALUE * 10000L, 79834, null, + new BlobMetadata("type-a", ImmutableList.of(1), 14, 3, 4, 16, null, ImmutableMap.of()), + new BlobMetadata("type-bbb", ImmutableList.of(2, 3, 4), 77, 4, Integer.MAX_VALUE * 10000L, 79834, null, ImmutableMap.of())), ImmutableMap.of()), "{\n" + " \"blobs\" : [ {\n" + " \"type\" : \"type-a\",\n" + " \"fields\" : [ 1 ],\n" + + " \"snapshot-id\" : 14,\n" + + " \"sequence-number\" : 3,\n" + " \"offset\" : 4,\n" + " \"length\" : 16\n" + " }, {\n" + " \"type\" : \"type-bbb\",\n" + " \"fields\" : [ 2, 3, 4 ],\n" + + " \"snapshot-id\" : 77,\n" + + " \"sequence-number\" : 4,\n" + " \"offset\" : 21474836470000,\n" + " \"length\" : 79834\n" + " } ]\n" + @@ -124,13 +128,15 @@ public void testBlobProperties() { new FileMetadata( ImmutableList.of( new BlobMetadata( - "type-a", ImmutableList.of(1), 4, 16, null, + "type-a", ImmutableList.of(1), 14, 3, 4, 16, null, ImmutableMap.of("some key", "some value"))), ImmutableMap.of()), "{\n" + " \"blobs\" : [ {\n" + " \"type\" : \"type-a\",\n" + " \"fields\" : [ 1 ],\n" + + " \"snapshot-id\" : 14,\n" + + " \"sequence-number\" : 3,\n" + " \"offset\" : 4,\n" + " \"length\" : 16,\n" + " \"properties\" : {\n" + diff --git a/core/src/test/java/org/apache/iceberg/puffin/TestPuffinWriter.java b/core/src/test/java/org/apache/iceberg/puffin/TestPuffinWriter.java index a9b04af1e969..f45229c03049 100644 --- a/core/src/test/java/org/apache/iceberg/puffin/TestPuffinWriter.java +++ b/core/src/test/java/org/apache/iceberg/puffin/TestPuffinWriter.java @@ -96,14 +96,14 @@ private void testWriteMetric(PuffinCompressionCodec compression, String expected try (PuffinWriter writer = Puffin.write(outputFile) .createdBy("Test 1234") .build()) { - writer.add(new Blob("some-blob", ImmutableList.of(1), ByteBuffer.wrap("abcdefghi".getBytes(UTF_8)), + writer.add(new Blob("some-blob", ImmutableList.of(1), 2, 1, ByteBuffer.wrap("abcdefghi".getBytes(UTF_8)), compression, ImmutableMap.of())); // "xxx"s are stripped away by data offsets byte[] bytes = "xxx some blob \u0000 binary data 🤯 that is not very very very very very very long, is it? xxx".getBytes( UTF_8); - writer.add(new Blob("some-other-blob", ImmutableList.of(2), ByteBuffer.wrap(bytes, 4, bytes.length - 8), + writer.add(new Blob("some-other-blob", ImmutableList.of(2), 2, 1, ByteBuffer.wrap(bytes, 4, bytes.length - 8), compression, ImmutableMap.of())); assertThat(writer.writtenBlobsMetadata()).hasSize(2); diff --git a/core/src/test/resources/org/apache/iceberg/puffin/v1/sample-metric-data-compressed-zstd.bin b/core/src/test/resources/org/apache/iceberg/puffin/v1/sample-metric-data-compressed-zstd.bin index c63361501910..ac8b69c76e57 100644 Binary files a/core/src/test/resources/org/apache/iceberg/puffin/v1/sample-metric-data-compressed-zstd.bin and b/core/src/test/resources/org/apache/iceberg/puffin/v1/sample-metric-data-compressed-zstd.bin differ diff --git a/core/src/test/resources/org/apache/iceberg/puffin/v1/sample-metric-data-uncompressed.bin b/core/src/test/resources/org/apache/iceberg/puffin/v1/sample-metric-data-uncompressed.bin index 15d806805df9..ab8da13822c5 100644 Binary files a/core/src/test/resources/org/apache/iceberg/puffin/v1/sample-metric-data-uncompressed.bin and b/core/src/test/resources/org/apache/iceberg/puffin/v1/sample-metric-data-uncompressed.bin differ diff --git a/format/puffin-spec.md b/format/puffin-spec.md index 334cc1ebfc2b..f94fb0a4b852 100644 --- a/format/puffin-spec.md +++ b/format/puffin-spec.md @@ -102,6 +102,8 @@ with content size present), UTF-8 encoded JSON payload representing a single |-------------------|-----------------------------------------|----------| ----------- | | type | JSON string | yes | See [Blob types](#blob-types) | fields | JSON list of ints | yes | List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. +| snapshot-id | JSON long | yes | ID of the Iceberg table's snapshot the blob was computed from. +| sequence-number | JSON long | yes | Sequence number of the Iceberg table's snapshot the blob was computed from. | offset | JSON long | yes | The offset in the file where the blob contents start | length | JSON long | yes | The length of the blob stored in the file (after compression, if compressed) | compression-codec | JSON string | no | See [Compression codecs](#compression-codecs). If omitted, the data is assumed to be uncompressed. @@ -140,5 +142,3 @@ When writing a Puffin file it is recommended to set the following fields in the - `created-by` - human-readable identification of the application writing the file, along with its version. Example "Trino version 381". -- `source-snapshot-id` - a table snapshot which was used to calculate blob contents -- `source-sequence-number` - sequence number of the table snapshot used to calculate blob contents