Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion tests/integration/test_deletes.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import AlwaysTrue, EqualTo, LessThanOrEqual
from pyiceberg.manifest import ManifestEntryStatus
from pyiceberg.manifest import ManifestContent, ManifestEntryStatus
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.puffin import PuffinFile
from pyiceberg.table.snapshots import Operation, Summary
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, StringType, TimestampType
Expand Down Expand Up @@ -1024,3 +1025,66 @@ def test_manifest_entry_snapshot_id_after_partial_deletes(session_catalog: RestC
f"DELETED entry snapshot_id should be {after_delete_snapshot.snapshot_id} "
f"(the deleting snapshot), but was {entry.snapshot_id}"
)


@pytest.mark.integration
def test_read_spark_written_puffin_dv(spark: SparkSession, session_catalog: RestCatalog) -> None:
"""Verify pyiceberg can read Puffin DVs written by Spark."""
identifier = "default.spark_puffin_format_test"

spark.sql(f"DROP TABLE IF EXISTS {identifier}")
spark.sql(
f"""
CREATE TABLE {identifier} (id BIGINT)
USING iceberg
TBLPROPERTIES (
'format-version' = '3',
'write.delete.mode' = 'merge-on-read'
)
"""
)

df = spark.range(1, 51)
df.coalesce(1).writeTo(identifier).append()

files_before = spark.sql(f"SELECT * FROM {identifier}.files").collect()
assert len(files_before) == 1, f"Expected 1 file, got {len(files_before)}"

spark.sql(f"DELETE FROM {identifier} WHERE id IN (10, 20, 30, 40)")

table = session_catalog.load_table(identifier)
current_snapshot = table.current_snapshot()
assert current_snapshot is not None

manifests = current_snapshot.manifests(table.io)
delete_manifests = [m for m in manifests if m.content == ManifestContent.DELETES]
assert len(delete_manifests) > 0, "Expected delete manifest with DVs"

delete_manifest = delete_manifests[0]
entries = list(delete_manifest.fetch_manifest_entry(table.io))
assert len(entries) == 1, "Expected exactly one delete file entry"

delete_entry = entries[0]
puffin_path = delete_entry.data_file.file_path
assert puffin_path.endswith(".puffin"), f"Expected Puffin file, got: {puffin_path}"

input_file = table.io.new_input(puffin_path)
with input_file.open() as f:
puffin_bytes = f.read()

puffin = PuffinFile(puffin_bytes)

assert len(puffin.footer.blobs) == 1, "Expected exactly one blob"

blob = puffin.footer.blobs[0]
assert blob.type == "deletion-vector-v1"
assert "referenced-data-file" in blob.properties
assert blob.properties["cardinality"] == "4"

dv_dict = puffin.to_vector()
assert len(dv_dict) == 1, "Expected one data file's deletions"

for _data_file_path, chunked_array in dv_dict.items():
positions = chunked_array.to_pylist()
assert len(positions) == 4, f"Expected 4 deleted positions, got {len(positions)}"
assert sorted(positions) == [9, 19, 29, 39], f"Unexpected positions: {positions}"