Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the correct spec when rewiting existing manifests #1157

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
if any(entry.data_file not in found_deleted_data_files for entry in entries):
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
spec=self._transaction.table_metadata.spec(),
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
Expand Down
75 changes: 57 additions & 18 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
import math
import os
import time
from datetime import date, datetime
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any, Dict
from urllib.parse import urlparse

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import pytest
import pytz
Expand All @@ -39,12 +40,12 @@
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import GreaterThanOrEqual, In, Not
from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
from pyiceberg.io.pyarrow import _dataframe_to_data_files
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.transforms import DayTransform, IdentityTransform
from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
from pyiceberg.types import (
DateType,
DoubleType,
Expand Down Expand Up @@ -1344,18 +1345,7 @@ def test_overwrite_all_data_with_filter(session_catalog: Catalog) -> None:


@pytest.mark.integration
def test_delete_threshold() -> None:
catalog = load_catalog(
"local",
**{
"type": "rest",
"uri": "http://localhost:8181",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
},
)

def test_delete_threshold(session_catalog: Catalog) -> None:
schema = Schema(
NestedField(field_id=101, name="id", field_type=LongType(), required=True),
NestedField(field_id=103, name="created_at", field_type=DateType(), required=False),
Expand All @@ -1365,13 +1355,13 @@ def test_delete_threshold() -> None:
partition_spec = PartitionSpec(PartitionField(source_id=103, field_id=2000, transform=DayTransform(), name="created_at_day"))

try:
catalog.drop_table(
session_catalog.drop_table(
identifier="default.scores",
)
except NoSuchTableError:
pass

catalog.create_table(
session_catalog.create_table(
identifier="default.scores",
schema=schema,
partition_spec=partition_spec,
Expand All @@ -1395,7 +1385,7 @@ def test_delete_threshold() -> None:
# Create the dataframe
df = pd.DataFrame({"id": id_column, "created_at": created_at_column, "relevancy_score": relevancy_score_column})

iceberg_table = catalog.load_table("default.scores")
iceberg_table = session_catalog.load_table("default.scores")

# Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
arrow_schema = iceberg_table.schema().as_arrow()
Expand All @@ -1409,3 +1399,52 @@ def test_delete_threshold() -> None:
assert len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) == lower_before
iceberg_table.delete(delete_condition)
assert len(iceberg_table.scan().to_arrow()) == lower_before


@pytest.mark.integration
def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) -> None:
np.random.seed(876)
N = 1440
d = {
"timestamp": pa.array([datetime(2023, 1, 1, 0, 0, 0) + timedelta(minutes=i) for i in range(N)]),
"category": pa.array([np.random.choice(["A", "B", "C"]) for _ in range(N)]),
"value": pa.array(np.random.normal(size=N)),
}
data = pa.Table.from_pydict(d)

try:
session_catalog.drop_table(
identifier="default.test_error_table",
)
except NoSuchTableError:
pass

table = session_catalog.create_table(
"default.test_error_table",
schema=data.schema,
)

with table.update_spec() as update:
update.add_field("timestamp", transform=HourTransform())

table.append(data)

with table.update_spec() as update:
update.add_field("category", transform=IdentityTransform())

data_ = data.filter(
(pc.field("category") == "A")
& (pc.field("timestamp") >= datetime(2023, 1, 1, 0))
& (pc.field("timestamp") < datetime(2023, 1, 1, 1))
)

table.overwrite(
df=data_,
overwrite_filter=And(
And(
GreaterThanOrEqual("timestamp", datetime(2023, 1, 1, 0).isoformat()),
LessThan("timestamp", datetime(2023, 1, 1, 1).isoformat()),
),
EqualTo("category", "A"),
),
)