Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions core/src/main/java/org/apache/iceberg/puffin/Blob.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,29 @@
public final class Blob {
private final String type;
private final List<Integer> inputFields;
private final long snapshotId;
private final long sequenceNumber;
private final ByteBuffer blobData;
private final PuffinCompressionCodec requestedCompression;
private final Map<String, String> properties;

public Blob(String type, List<Integer> inputFields, ByteBuffer blobData) {
this(type, inputFields, blobData, null, ImmutableMap.of());
/**
 * Creates a blob with no requested compression codec and no properties.
 *
 * <p>Delegates to the full constructor, passing {@code null} as the requested compression and an
 * empty immutable map as the properties.
 *
 * @param type blob type; null-checked by the delegated constructor
 * @param inputFields list of field IDs for the blob; null-checked and copied by the delegated
 *     constructor
 * @param snapshotId snapshot ID recorded for the blob
 * @param sequenceNumber sequence number recorded for the blob
 * @param blobData blob contents; null-checked by the delegated constructor
 */
public Blob(
String type, List<Integer> inputFields, long snapshotId, long sequenceNumber,
ByteBuffer blobData) {
this(type, inputFields, snapshotId, sequenceNumber, blobData, null, ImmutableMap.of());
}

public Blob(
String type, List<Integer> inputFields, ByteBuffer blobData,
@Nullable PuffinCompressionCodec requestedCompression, Map<String, String> properties) {
String type, List<Integer> inputFields, long snapshotId, long sequenceNumber,
ByteBuffer blobData, @Nullable PuffinCompressionCodec requestedCompression, Map<String, String> properties) {
Preconditions.checkNotNull(type, "type is null");
Preconditions.checkNotNull(inputFields, "inputFields is null");
Preconditions.checkNotNull(blobData, "blobData is null");
Preconditions.checkNotNull(properties, "properties is null");
this.type = type;
this.inputFields = ImmutableList.copyOf(inputFields);
this.snapshotId = snapshotId;
this.sequenceNumber = sequenceNumber;
this.blobData = blobData;
this.requestedCompression = requestedCompression;
this.properties = ImmutableMap.copyOf(properties);
Expand All @@ -60,6 +66,14 @@ public List<Integer> inputFields() {
return inputFields;
}

/**
 * ID of the Iceberg table's snapshot the blob was computed from, as supplied to the constructor
 */
public long snapshotId() {
return snapshotId;
}

/**
 * Sequence number of the Iceberg table's snapshot the blob was computed from, as supplied to the
 * constructor
 */
public long sequenceNumber() {
return sequenceNumber;
}

/**
 * Blob contents. This is the same {@link ByteBuffer} instance that was passed to the constructor;
 * no copy is made, so changes to the buffer's state are shared with the caller.
 */
public ByteBuffer blobData() {
return blobData;
}
Expand Down
22 changes: 20 additions & 2 deletions core/src/main/java/org/apache/iceberg/puffin/BlobMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,23 @@
public class BlobMetadata {
private final String type;
private final List<Integer> inputFields;
private final long snapshotId;
private final long sequenceNumber;
private final long offset;
private final long length;
private final String compressionCodec;
private final Map<String, String> properties;

public BlobMetadata(
String type, List<Integer> inputFields, long offset, long length,
@Nullable String compressionCodec, Map<String, String> properties) {
String type, List<Integer> inputFields, long snapshotId, long sequenceNumber,
long offset, long length, @Nullable String compressionCodec, Map<String, String> properties) {
Preconditions.checkNotNull(type, "type is null");
Preconditions.checkNotNull(inputFields, "inputFields is null");
Preconditions.checkNotNull(properties, "properties is null");
this.type = type;
this.inputFields = ImmutableList.copyOf(inputFields);
this.snapshotId = snapshotId;
this.sequenceNumber = sequenceNumber;
this.offset = offset;
this.length = length;
this.compressionCodec = compressionCodec;
Expand All @@ -56,6 +60,20 @@ public List<Integer> inputFields() {
return inputFields;
}

/**
 * ID of the Iceberg table's snapshot the blob was computed from.
 *
 * @return the snapshot ID given to the constructor
 */
public long snapshotId() {
return snapshotId;
}

/**
 * Sequence number of the Iceberg table's snapshot the blob was computed from.
 *
 * @return the sequence number given to the constructor
 */
public long sequenceNumber() {
return sequenceNumber;
}

/**
* Offset in the file
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ private FileMetadataParser() {

private static final String TYPE = "type";
private static final String FIELDS = "fields";
private static final String SNAPSHOT_ID = "snapshot-id";
private static final String SEQUENCE_NUMBER = "sequence-number";
private static final String OFFSET = "offset";
private static final String LENGTH = "length";
private static final String COMPRESSION_CODEC = "compression-codec";
Expand Down Expand Up @@ -123,6 +125,8 @@ static void toJson(BlobMetadata blobMetadata, JsonGenerator generator) throws IO
generator.writeNumber(field);
}
generator.writeEndArray();
generator.writeNumberField(SNAPSHOT_ID, blobMetadata.snapshotId());
generator.writeNumberField(SEQUENCE_NUMBER, blobMetadata.sequenceNumber());

generator.writeNumberField(OFFSET, blobMetadata.offset());
generator.writeNumberField(LENGTH, blobMetadata.length());
Expand All @@ -145,6 +149,8 @@ static void toJson(BlobMetadata blobMetadata, JsonGenerator generator) throws IO
static BlobMetadata blobMetadataFromJson(JsonNode json) {
String type = JsonUtil.getString(TYPE, json);
List<Integer> fields = JsonUtil.getIntegerList(FIELDS, json);
long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, json);
long sequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, json);
long offset = JsonUtil.getLong(OFFSET, json);
long length = JsonUtil.getLong(LENGTH, json);
String compressionCodec = JsonUtil.getStringOrNull(COMPRESSION_CODEC, json);
Expand All @@ -158,6 +164,8 @@ static BlobMetadata blobMetadataFromJson(JsonNode json) {
return new BlobMetadata(
type,
fields,
snapshotId,
sequenceNumber,
offset,
length,
compressionCodec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public void add(Blob blob) {
ByteBuffer rawData = PuffinFormat.compress(codec, blob.blobData());
int length = rawData.remaining();
IOUtil.writeFully(outputStream, rawData);
writtenBlobsMetadata.add(new BlobMetadata(blob.type(), blob.inputFields(), fileOffset, length,
codec.codecName(), blob.properties()));
writtenBlobsMetadata.add(new BlobMetadata(blob.type(), blob.inputFields(), blob.snapshotId(),
blob.sequenceNumber(), fileOffset, length, codec.codecName(), blob.properties()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private PuffinFormatTestUtil() {
public static final long EMPTY_PUFFIN_UNCOMPRESSED_FOOTER_SIZE = 28;

// footer size for v1/sample-metric-data-compressed-zstd.bin
public static final long SAMPLE_METRIC_DATA_COMPRESSED_ZSTD_FOOTER_SIZE = 242;
public static final long SAMPLE_METRIC_DATA_COMPRESSED_ZSTD_FOOTER_SIZE = 314;

static byte[] readTestResource(String resourceName) throws Exception {
return Resources.toByteArray(Resources.getResource(PuffinFormatTestUtil.class, resourceName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,23 @@ public void testBlobMetadata() {
testJsonSerialization(
new FileMetadata(
ImmutableList.of(
new BlobMetadata("type-a", ImmutableList.of(1), 4, 16, null, ImmutableMap.of()),
new BlobMetadata("type-bbb", ImmutableList.of(2, 3, 4), Integer.MAX_VALUE * 10000L, 79834, null,
new BlobMetadata("type-a", ImmutableList.of(1), 14, 3, 4, 16, null, ImmutableMap.of()),
new BlobMetadata("type-bbb", ImmutableList.of(2, 3, 4), 77, 4, Integer.MAX_VALUE * 10000L, 79834, null,
ImmutableMap.of())),
ImmutableMap.of()),
"{\n" +
" \"blobs\" : [ {\n" +
" \"type\" : \"type-a\",\n" +
" \"fields\" : [ 1 ],\n" +
" \"snapshot-id\" : 14,\n" +
" \"sequence-number\" : 3,\n" +
" \"offset\" : 4,\n" +
" \"length\" : 16\n" +
" }, {\n" +
" \"type\" : \"type-bbb\",\n" +
" \"fields\" : [ 2, 3, 4 ],\n" +
" \"snapshot-id\" : 77,\n" +
" \"sequence-number\" : 4,\n" +
" \"offset\" : 21474836470000,\n" +
" \"length\" : 79834\n" +
" } ]\n" +
Expand All @@ -124,13 +128,15 @@ public void testBlobProperties() {
new FileMetadata(
ImmutableList.of(
new BlobMetadata(
"type-a", ImmutableList.of(1), 4, 16, null,
"type-a", ImmutableList.of(1), 14, 3, 4, 16, null,
ImmutableMap.of("some key", "some value"))),
ImmutableMap.of()),
"{\n" +
" \"blobs\" : [ {\n" +
" \"type\" : \"type-a\",\n" +
" \"fields\" : [ 1 ],\n" +
" \"snapshot-id\" : 14,\n" +
" \"sequence-number\" : 3,\n" +
" \"offset\" : 4,\n" +
" \"length\" : 16,\n" +
" \"properties\" : {\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ private void testWriteMetric(PuffinCompressionCodec compression, String expected
try (PuffinWriter writer = Puffin.write(outputFile)
.createdBy("Test 1234")
.build()) {
writer.add(new Blob("some-blob", ImmutableList.of(1), ByteBuffer.wrap("abcdefghi".getBytes(UTF_8)),
writer.add(new Blob("some-blob", ImmutableList.of(1), 2, 1, ByteBuffer.wrap("abcdefghi".getBytes(UTF_8)),
compression, ImmutableMap.of()));

// "xxx"s are stripped away by data offsets
byte[] bytes =
"xxx some blob \u0000 binary data 🤯 that is not very very very very very very long, is it? xxx".getBytes(
UTF_8);
writer.add(new Blob("some-other-blob", ImmutableList.of(2), ByteBuffer.wrap(bytes, 4, bytes.length - 8),
writer.add(new Blob("some-other-blob", ImmutableList.of(2), 2, 1, ByteBuffer.wrap(bytes, 4, bytes.length - 8),
compression, ImmutableMap.of()));

assertThat(writer.writtenBlobsMetadata()).hasSize(2);
Expand Down
Binary file not shown.
Binary file not shown.
4 changes: 2 additions & 2 deletions format/puffin-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ with content size present), UTF-8 encoded JSON payload representing a single
|-------------------|-----------------------------------------|----------| ----------- |
| type | JSON string | yes | See [Blob types](#blob-types)
| fields | JSON list of ints | yes | List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
| snapshot-id | JSON long | yes | ID of the Iceberg table's snapshot the blob was computed from.
| sequence-number | JSON long | yes | Sequence number of the Iceberg table's snapshot the blob was computed from.
| offset | JSON long | yes | The offset in the file where the blob contents start.
| length | JSON long | yes | The length of the blob stored in the file (after compression, if compressed).
| compression-codec | JSON string | no | See [Compression codecs](#compression-codecs). If omitted, the data is assumed to be uncompressed.
Expand Down Expand Up @@ -140,5 +142,3 @@ When writing a Puffin file it is recommended to set the following fields in the

- `created-by` - human-readable identification of the application writing the file,
along with its version. Example "Trino version 381".
- `source-snapshot-id` - a table snapshot which was used to calculate blob contents
- `source-sequence-number` - sequence number of the table snapshot used to calculate blob contents