Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,27 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;

public class MetadataLogsTable extends BaseMetadataTable {
public class MetadataLogEntriesTable extends BaseMetadataTable {

private static final Schema METADATA_LOGS_SCHEMA =
private static final Schema METADATA_LOG_ENTRIES_SCHEMA =
new Schema(
Types.NestedField.required(1, "timestamp_millis", Types.LongType.get()),
Types.NestedField.required(1, "timestamp", Types.TimestampType.withZone()),
Types.NestedField.required(2, "file", Types.StringType.get()),
Types.NestedField.optional(3, "latest_snapshot_id", Types.LongType.get()),
Types.NestedField.optional(4, "latest_schema_id", Types.IntegerType.get()),
Types.NestedField.optional(5, "latest_sequence_number", Types.LongType.get()));

MetadataLogsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".metadata_logs");
MetadataLogEntriesTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".metadata_log_entries");
}

MetadataLogsTable(TableOperations ops, Table table, String name) {
MetadataLogEntriesTable(TableOperations ops, Table table, String name) {
super(ops, table, name);
}

@Override
MetadataTableType metadataTableType() {
return MetadataTableType.METADATA_LOGS;
return MetadataTableType.METADATA_LOG_ENTRIES;
}

@Override
Expand All @@ -54,7 +54,7 @@ public TableScan newScan() {

@Override
public Schema schema() {
return METADATA_LOGS_SCHEMA;
return METADATA_LOG_ENTRIES_SCHEMA;
}

private DataTask task(TableScan scan) {
Expand All @@ -69,26 +69,27 @@ private DataTask task(TableScan scan) {
schema(),
scan.schema(),
metadataLogEntries,
metadataLogEntry -> MetadataLogsTable.metadataLogToRow(metadataLogEntry, table()));
metadataLogEntry ->
MetadataLogEntriesTable.metadataLogEntryToRow(metadataLogEntry, table()));
}

private class MetadataLogScan extends StaticTableScan {
MetadataLogScan(TableOperations ops, Table table) {
super(
ops,
table,
METADATA_LOGS_SCHEMA,
MetadataTableType.METADATA_LOGS,
MetadataLogsTable.this::task);
METADATA_LOG_ENTRIES_SCHEMA,
MetadataTableType.METADATA_LOG_ENTRIES,
MetadataLogEntriesTable.this::task);
}

MetadataLogScan(TableOperations ops, Table table, TableScanContext context) {
super(
ops,
table,
METADATA_LOGS_SCHEMA,
MetadataTableType.METADATA_LOGS,
MetadataLogsTable.this::task,
METADATA_LOG_ENTRIES_SCHEMA,
MetadataTableType.METADATA_LOG_ENTRIES,
MetadataLogEntriesTable.this::task,
context);
}

Expand All @@ -100,11 +101,11 @@ protected TableScan newRefinedScan(

@Override
public CloseableIterable<FileScanTask> planFiles() {
return CloseableIterable.withNoopClose(MetadataLogsTable.this.task(this));
return CloseableIterable.withNoopClose(MetadataLogEntriesTable.this.task(this));
}
}

private static StaticDataTask.Row metadataLogToRow(
private static StaticDataTask.Row metadataLogEntryToRow(
TableMetadata.MetadataLogEntry metadataLogEntry, Table table) {
Long latestSnapshotId = null;
Snapshot latestSnapshot = null;
Expand All @@ -116,7 +117,7 @@ private static StaticDataTask.Row metadataLogToRow(
}

return StaticDataTask.Row.of(
metadataLogEntry.timestampMillis(),
metadataLogEntry.timestampMillis() * 1000,
metadataLogEntry.file(),
// latest snapshot in this file corresponding to the log entry
latestSnapshotId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public enum MetadataTableType {
DATA_FILES,
DELETE_FILES,
HISTORY,
METADATA_LOGS,
METADATA_LOG_ENTRIES,
SNAPSHOTS,
MANIFESTS,
PARTITIONS,
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ private static Table createMetadataTableInstance(
return new HistoryTable(ops, baseTable, metadataTableName);
case SNAPSHOTS:
return new SnapshotsTable(ops, baseTable, metadataTableName);
case METADATA_LOGS:
return new MetadataLogsTable(ops, baseTable, metadataTableName);
case METADATA_LOG_ENTRIES:
return new MetadataLogEntriesTable(ops, baseTable, metadataTableName);
case MANIFESTS:
return new ManifestsTable(ops, baseTable, metadataTableName);
case PARTITIONS:
Expand Down
14 changes: 14 additions & 0 deletions docs/spark-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,20 @@ SELECT * FROM prod.db.table.history
**This shows a commit that was rolled back.** The example has two snapshots with the same parent, and one is *not* an ancestor of the current table state.
{{< /hint >}}

### Metadata Log Entries

To show table metadata log entries:

```sql
SELECT * from prod.db.table.metadata_log_entries
```

| timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number |
| -- | -- | -- | -- | -- |
| 2022-07-28 10:43:52.93 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null |
| 2022-07-28 10:43:57.487 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 |
| 2022-07-28 10:43:58.25 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 |

### Snapshots

To show the valid snapshots for a table:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -381,7 +382,7 @@ public void testAllFilesPartitioned() throws Exception {
}

@Test
public void testMetadataLogs() throws Exception {
public void testMetadataLogEntries() throws Exception {
// Create table and insert data
sql(
"CREATE TABLE %s (id bigint, data string) "
Expand All @@ -408,24 +409,24 @@ public void testMetadataLogs() throws Exception {
Lists.newArrayList(tableMetadata.previousFiles());

// Check metadataLog table
List<Object[]> metadataLogs = sql("SELECT * FROM %s.metadata_logs", tableName);
List<Object[]> metadataLogs = sql("SELECT * FROM %s.metadata_log_entries", tableName);
assertEquals(
"MetadataLogsTable result should match the metadataLog entries",
"MetadataLogEntriesTable result should match the metadataLog entries",
ImmutableList.of(
row(
metadataLogEntries.get(0).timestampMillis(),
DateTimeUtils.toJavaTimestamp(metadataLogEntries.get(0).timestampMillis() * 1000),
metadataLogEntries.get(0).file(),
null,
null,
null),
row(
metadataLogEntries.get(1).timestampMillis(),
DateTimeUtils.toJavaTimestamp(metadataLogEntries.get(1).timestampMillis() * 1000),
metadataLogEntries.get(1).file(),
parentSnapshot.snapshotId(),
parentSnapshot.schemaId(),
parentSnapshot.sequenceNumber()),
row(
currentSnapshot.timestampMillis(),
DateTimeUtils.toJavaTimestamp(currentSnapshot.timestampMillis() * 1000),
tableMetadata.metadataFileLocation(),
currentSnapshot.snapshotId(),
currentSnapshot.schemaId(),
Expand All @@ -435,14 +436,16 @@ public void testMetadataLogs() throws Exception {
// test filtering
List<Object[]> metadataLogWithFilters =
sql(
"SELECT * FROM %s.metadata_logs WHERE latest_snapshot_id = %s",
"SELECT * FROM %s.metadata_log_entries WHERE latest_snapshot_id = %s",
tableName, currentSnapshotId);
Assert.assertEquals("metadataLog table should return 1 row", 1, metadataLogWithFilters.size());
Assert.assertEquals(
"metadataLogEntries table should return 1 row", 1, metadataLogWithFilters.size());
assertEquals(
"Result should match the latest snapshot entry",
ImmutableList.of(
row(
tableMetadata.currentSnapshot().timestampMillis(),
DateTimeUtils.toJavaTimestamp(
tableMetadata.currentSnapshot().timestampMillis() * 1000),
tableMetadata.metadataFileLocation(),
tableMetadata.currentSnapshot().snapshotId(),
tableMetadata.currentSnapshot().schemaId(),
Expand All @@ -455,9 +458,10 @@ public void testMetadataLogs() throws Exception {
.map(TableMetadata.MetadataLogEntry::file)
.collect(Collectors.toList());
metadataFiles.add(tableMetadata.metadataFileLocation());
List<Object[]> metadataLogWithProjection = sql("SELECT file FROM %s.metadata_logs", tableName);
List<Object[]> metadataLogWithProjection =
sql("SELECT file FROM %s.metadata_log_entries", tableName);
Assert.assertEquals(
"metadataLog table should return 3 rows", 3, metadataLogWithProjection.size());
"metadataLogEntries table should return 3 rows", 3, metadataLogWithProjection.size());
assertEquals(
"metadataLog entry should be of same file",
metadataFiles.stream().map(this::row).collect(Collectors.toList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -381,7 +382,7 @@ public void testAllFilesPartitioned() throws Exception {
}

@Test
public void testMetadataLogs() throws Exception {
public void testMetadataLogEntries() throws Exception {
// Create table and insert data
sql(
"CREATE TABLE %s (id bigint, data string) "
Expand All @@ -408,24 +409,24 @@ public void testMetadataLogs() throws Exception {
Lists.newArrayList(tableMetadata.previousFiles());

// Check metadataLog table
List<Object[]> metadataLogs = sql("SELECT * FROM %s.metadata_logs", tableName);
List<Object[]> metadataLogs = sql("SELECT * FROM %s.metadata_log_entries", tableName);
assertEquals(
"MetadataLogsTable result should match the metadataLog entries",
"MetadataLogEntriesTable result should match the metadataLog entries",
ImmutableList.of(
row(
metadataLogEntries.get(0).timestampMillis(),
DateTimeUtils.toJavaTimestamp(metadataLogEntries.get(0).timestampMillis() * 1000),
metadataLogEntries.get(0).file(),
null,
null,
null),
row(
metadataLogEntries.get(1).timestampMillis(),
DateTimeUtils.toJavaTimestamp(metadataLogEntries.get(1).timestampMillis() * 1000),
metadataLogEntries.get(1).file(),
parentSnapshot.snapshotId(),
parentSnapshot.schemaId(),
parentSnapshot.sequenceNumber()),
row(
currentSnapshot.timestampMillis(),
DateTimeUtils.toJavaTimestamp(currentSnapshot.timestampMillis() * 1000),
tableMetadata.metadataFileLocation(),
currentSnapshot.snapshotId(),
currentSnapshot.schemaId(),
Expand All @@ -435,14 +436,16 @@ public void testMetadataLogs() throws Exception {
// test filtering
List<Object[]> metadataLogWithFilters =
sql(
"SELECT * FROM %s.metadata_logs WHERE latest_snapshot_id = %s",
"SELECT * FROM %s.metadata_log_entries WHERE latest_snapshot_id = %s",
tableName, currentSnapshotId);
Assert.assertEquals("metadataLog table should return 1 row", 1, metadataLogWithFilters.size());
Assert.assertEquals(
"metadataLogEntries table should return 1 row", 1, metadataLogWithFilters.size());
assertEquals(
"Result should match the latest snapshot entry",
ImmutableList.of(
row(
tableMetadata.currentSnapshot().timestampMillis(),
DateTimeUtils.toJavaTimestamp(
tableMetadata.currentSnapshot().timestampMillis() * 1000),
tableMetadata.metadataFileLocation(),
tableMetadata.currentSnapshot().snapshotId(),
tableMetadata.currentSnapshot().schemaId(),
Expand All @@ -455,9 +458,10 @@ public void testMetadataLogs() throws Exception {
.map(TableMetadata.MetadataLogEntry::file)
.collect(Collectors.toList());
metadataFiles.add(tableMetadata.metadataFileLocation());
List<Object[]> metadataLogWithProjection = sql("SELECT file FROM %s.metadata_logs", tableName);
List<Object[]> metadataLogWithProjection =
sql("SELECT file FROM %s.metadata_log_entries", tableName);
Assert.assertEquals(
"metadataLog table should return 3 rows", 3, metadataLogWithProjection.size());
"metadataLogEntries table should return 3 rows", 3, metadataLogWithProjection.size());
assertEquals(
"metadataLog entry should be of same file",
metadataFiles.stream().map(this::row).collect(Collectors.toList()),
Expand Down