from pyiceberg.manifest import ManifestContent, ManifestEntryStatus
from pyiceberg.table.puffin import PuffinFile


@pytest.mark.integration
def test_read_spark_written_puffin_dv(spark: SparkSession, session_catalog: RestCatalog) -> None:
    """Check that pyiceberg can decode a deletion vector that Spark wrote to a Puffin file.

    Spark creates a format-version 3, merge-on-read table, appends ids 1..50 as a
    single data file and deletes four of them. The table is then loaded through
    pyiceberg, the delete manifest is followed to the Puffin file, and both the
    footer metadata and the decoded positions are checked.
    """
    identifier = "default.spark_puffin_format_test"

    # Start from a clean slate so reruns do not see data from a previous run.
    spark.sql(f"DROP TABLE IF EXISTS {identifier}")
    spark.sql(
        f"""
        CREATE TABLE {identifier} (id BIGINT)
        USING iceberg
        TBLPROPERTIES (
            'format-version' = '3',
            'write.delete.mode' = 'merge-on-read'
        )
    """
    )

    # coalesce(1) keeps all 50 rows in one data file, which the single-DV
    # assertions further down depend on.
    spark.range(1, 51).coalesce(1).writeTo(identifier).append()

    files_before = spark.sql(f"SELECT * FROM {identifier}.files").collect()
    assert len(files_before) == 1, f"Expected 1 file, got {len(files_before)}"

    spark.sql(f"DELETE FROM {identifier} WHERE id IN (10, 20, 30, 40)")

    # From here on everything is read through pyiceberg only.
    table = session_catalog.load_table(identifier)
    snapshot = table.current_snapshot()
    assert snapshot is not None

    dv_manifests = [
        manifest for manifest in snapshot.manifests(table.io) if manifest.content == ManifestContent.DELETES
    ]
    assert dv_manifests, "Expected delete manifest with DVs"

    manifest_entries = list(dv_manifests[0].fetch_manifest_entry(table.io))
    assert len(manifest_entries) == 1, "Expected exactly one delete file entry"

    puffin_path = manifest_entries[0].data_file.file_path
    assert puffin_path.endswith(".puffin"), f"Expected Puffin file, got: {puffin_path}"

    with table.io.new_input(puffin_path).open() as stream:
        raw_puffin = stream.read()

    puffin = PuffinFile(raw_puffin)

    # Footer metadata: one deletion-vector blob describing the four deleted rows.
    footer_blobs = puffin.footer.blobs
    assert len(footer_blobs) == 1, "Expected exactly one blob"

    dv_blob = footer_blobs[0]
    assert dv_blob.type == "deletion-vector-v1"
    assert "referenced-data-file" in dv_blob.properties
    assert dv_blob.properties["cardinality"] == "4"

    vectors = puffin.to_vector()
    assert len(vectors) == 1, "Expected one data file's deletions"

    # Positions are 0-based row ordinals; presumably the rows land in id order,
    # so ids 10/20/30/40 map to positions 9/19/29/39.
    for deleted_rows in vectors.values():
        positions = deleted_rows.to_pylist()
        assert len(positions) == 4, f"Expected 4 deleted positions, got {len(positions)}"
        assert sorted(positions) == [9, 19, 29, 39], f"Unexpected positions: {positions}"