Changes from 6 commits
4 changes: 2 additions & 2 deletions Makefile
@@ -40,10 +40,10 @@ test-integration:
docker compose -f dev/docker-compose-integration.yml kill
docker compose -f dev/docker-compose-integration.yml rm -f
docker compose -f dev/docker-compose-integration.yml up -d
-sleep 10
+sleep 5
docker compose -f dev/docker-compose-integration.yml cp ./dev/provision.py spark-iceberg:/opt/spark/provision.py
docker compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
-poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}
+poetry run pytest tests/ -v -m adrian ${PYTEST_ARGS} -s

test-integration-rebuild:
docker compose -f dev/docker-compose-integration.yml kill
4 changes: 4 additions & 0 deletions pyiceberg/expressions/literals.py
@@ -311,6 +311,10 @@ def _(self, _: TimeType) -> Literal[int]:
def _(self, _: TimestampType) -> Literal[int]:
return TimestampLiteral(self.value)

@to.register(TimestamptzType)
Owner: Nice!

def _(self, _: TimestamptzType) -> Literal[int]:
return TimestampLiteral(self.value)
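The new registration mirrors the TimestampType branch above, so an integer literal can now be promoted to a timestamptz literal as well. A minimal sketch of what this enables (hedged: it assumes pyiceberg's `literal()` factory, and the epoch value is illustrative):

```python
from pyiceberg.expressions.literals import literal
from pyiceberg.types import TimestamptzType

# Microseconds since epoch; with this change, .to() dispatches for TimestamptzType too
lit = literal(1704067200000000).to(TimestamptzType())
```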

@to.register(DecimalType)
def _(self, type_var: DecimalType) -> Literal[Decimal]:
unscaled = Decimal(self.value)
2 changes: 1 addition & 1 deletion pyiceberg/manifest.py
@@ -909,7 +909,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
self._sequence_number = sequence_number

def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
-wrapped_manifest_file = ManifestFile(*manifest_file.record_fields())
+wrapped_manifest_file = ManifestFile(*manifest_file.record_values())
Owner: The Record class should be extremely lightweight, and we should avoid adding additional methods to it. I've changed this into a copy in apache#580.


if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ:
# if the sequence number is being assigned here, then the manifest must be created by the current operation.
2 changes: 1 addition & 1 deletion pyiceberg/partitioning.py
@@ -229,7 +229,7 @@ def partition_to_path(self, data: Record, schema: Schema) -> str:

field_strs = []
value_strs = []
-for pos, value in enumerate(data.record_fields()):
+for pos, value in enumerate(data.record_values()):
partition_field = self.fields[pos]
value_str = partition_field.transform.to_human_string(field_types[pos].field_type, value=value)

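For a concrete sense of what this loop produces, here is a hedged sketch with an illustrative identity-partitioned spec (the schema, field ids, and values are made up):

```python
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import Record
from pyiceberg.types import NestedField, StringType

schema = Schema(NestedField(field_id=1, name="region", field_type=StringType(), required=False))
spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="region"))

# Each partition value is rendered to a human-readable string and joined as name=value
spec.partition_to_path(Record(region="eu"), schema)  # -> 'region=eu'
```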
96 changes: 93 additions & 3 deletions pyiceberg/table/__init__.py
@@ -55,6 +55,7 @@
And,
BooleanExpression,
EqualTo,
IsNull,
Not,
Or,
Reference,
@@ -428,6 +429,70 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
for data_file in data_files:
update_snapshot.append_data_file(data_file)

def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
"""
Shorthand for dynamically overwriting the table with a PyArrow table within the transaction.

Args:
df: The Arrow dataframe that will be used to overwrite the table
snapshot_properties: Custom properties to be added to the snapshot summary
"""

def _build_partition_predicate(spec_id: int, delete_partitions: List[Record]) -> BooleanExpression:
expr: BooleanExpression = AlwaysFalse()
for partition in delete_partitions:
match_partition_expression: BooleanExpression = AlwaysTrue()
partition_fields = partition.record_fields()
for pos in range(len(partition_fields)):
Owner: Instead of relying on .record_fields(), you should pass in the PartitionSpec and loop over the partition fields instead.
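A rough sketch of the suggested refactor (hedged: it assumes PartitionSpec.fields yields PartitionField objects whose name matches the partition column, and it keeps the positional Record indexing the current code already uses):

```python
from typing import List

from pyiceberg.expressions import (
    AlwaysFalse,
    AlwaysTrue,
    And,
    BooleanExpression,
    EqualTo,
    IsNull,
    Or,
    Reference,
)
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.typedef import Record


def _build_partition_predicate(spec: PartitionSpec, delete_partitions: List[Record]) -> BooleanExpression:
    expr: BooleanExpression = AlwaysFalse()
    for partition in delete_partitions:
        match_partition_expression: BooleanExpression = AlwaysTrue()
        # Walk the spec's partition fields instead of asking the Record for its field names
        for pos, partition_field in enumerate(spec.fields):
            predicate = (
                EqualTo(Reference(partition_field.name), partition[pos])
                if partition[pos] is not None
                else IsNull(Reference(partition_field.name))
            )
            match_partition_expression = And(match_partition_expression, predicate)
        expr = Or(expr, match_partition_expression)
    return expr
```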

predicate = (
EqualTo(Reference(partition_fields[pos]), partition[pos])
if partition[pos] is not None
else IsNull(Reference(partition_fields[pos]))
)
match_partition_expression = And(match_partition_expression, predicate)
expr = Or(expr, match_partition_expression)
return expr

try:
import pyarrow as pa
except ModuleNotFoundError as e:
raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

if not isinstance(df, pa.Table):
raise ValueError(f"Expected PyArrow table, got: {df}")

_check_schema_compatible(self._table.schema(), other_schema=df.schema)

# cast if the two schemas are compatible but not equal
table_arrow_schema = self._table.schema().as_arrow()
if table_arrow_schema != df.schema:
df = df.cast(table_arrow_schema)

# If the dataframe has no data, there is nothing to overwrite
if df.shape[0] == 0:
return
Comment on lines +479 to +488

Owner: At some point we might want to consolidate these checks.


append_snapshot_commit_uuid = uuid.uuid4()
data_files: List[DataFile] = list(
_dataframe_to_data_files(
table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
)
)
with self.update_snapshot(snapshot_properties=snapshot_properties).delete(
only_delete_within_latest_spec=True
Owner: I'm having an internal debate with myself about what the right thing to do here is, and what the user expects. The delete method will also rewrite older specs into newer ones if there is a partial delete.

I think that if I evolved the spec from monthly to daily partitioning and then ran a dynamic overwrite over certain days, the data should be overwritten.

Author: 👍 Yes, originally I thought it would be dangerous for a user to dynamically overwrite partitions that are not in the current spec (because, unlike a static overwrite, the user does not provide a filter explicitly and might be unaware of this). But after a discussion with Sung, we both think it is more natural to detect the filter in the current spec but apply it to all data files, rather than requiring the user to know the point in time where partition evolution happened in order to know which data files will be touched and which will not.

) as delete_snapshot:
delete_partitions = [data_file.partition for data_file in data_files]
delete_filter = _build_partition_predicate(
spec_id=self.table_metadata.spec().spec_id, delete_partitions=delete_partitions
)
delete_snapshot.delete_by_predicate(delete_filter)

with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append(
append_snapshot_commit_uuid
) as append_snapshot:
for data_file in data_files:
append_snapshot.append_data_file(data_file)

def overwrite(
self,
df: pa.Table,
@@ -1425,6 +1490,17 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
with self.transaction() as tx:
tx.append(df=df, snapshot_properties=snapshot_properties)

def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
"""Shorthand for dynamic overwriting the table with a PyArrow table.

Old partitions are auto detected and replaced with data files created for input arrow table.
Args:
df: The Arrow dataframe that will be used to overwrite the table
snapshot_properties: Custom properties to be added to the snapshot summary
"""
with self.transaction() as tx:
tx.dynamic_overwrite(df=df, snapshot_properties=snapshot_properties)
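A quick usage sketch of the new public API (hedged: `tbl` is assumed to be an already-loaded, partitioned table, and the column names are illustrative):

```python
import pyarrow as pa

# Rows covering the partitions to be replaced
df = pa.table({"region": ["eu", "eu"], "amount": [10, 20]})

# Deletes the partitions these rows fall into, then appends the new data files
tbl.dynamic_overwrite(df)
```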

def overwrite(
self,
df: pa.Table,
@@ -3003,6 +3079,7 @@ class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
"""

_predicate: BooleanExpression
_only_delete_within_latest_spec: bool

def __init__(
self,
@@ -3011,9 +3088,11 @@ def __init__(
io: FileIO,
commit_uuid: Optional[uuid.UUID] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
only_delete_within_latest_spec: bool = False,
Owner (@Fokko, May 30, 2024):

Suggested change:
-only_delete_within_latest_spec: bool = False,
+only_delete_current_spec: bool = False,

The latest spec can be different from the current spec. If you add a field to the partition spec and drop it immediately, it will re-use the existing spec.
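To illustrate the point, a hedged sketch using pyiceberg's spec-evolution API (the table and column names are illustrative):

```python
from pyiceberg.transforms import DayTransform

# `tbl` is assumed to be partitioned by month on a `ts` column
tbl.update_spec().add_field("ts", DayTransform(), "ts_day").commit()  # new spec becomes current
tbl.update_spec().remove_field("ts_day").commit()  # the earlier spec is re-used as current

# The spec added in between remains in the table metadata as the latest spec,
# even though it is no longer the current one.
```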

):
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
self._predicate = AlwaysFalse()
self._only_delete_within_latest_spec = only_delete_within_latest_spec

def _commit(self) -> UpdatesAndRequirements:
# Only produce a commit when there is something to delete
@@ -3070,6 +3149,12 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
self._deleted_data_files = set()
if snapshot := self._transaction.table_metadata.current_snapshot():
for manifest_file in snapshot.manifests(io=self._io):
if (
self._only_delete_within_latest_spec
and manifest_file.partition_spec_id != self._transaction.table_metadata.spec().spec_id
):
existing_manifests.append(manifest_file)
continue
if manifest_file.content == ManifestContent.DATA:
if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
# If the manifest isn't relevant, we can just keep it in the manifest-list
@@ -3252,9 +3337,13 @@ def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Di
self._io = io
self._snapshot_properties = snapshot_properties

-def fast_append(self) -> FastAppendFiles:
+def fast_append(self, commit_uuid: Optional[uuid.UUID] = None) -> FastAppendFiles:
return FastAppendFiles(
-operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
+operation=Operation.APPEND,
+transaction=self._transaction,
+io=self._io,
+snapshot_properties=self._snapshot_properties,
+commit_uuid=commit_uuid,
)

def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
@@ -3268,12 +3357,13 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> OverwriteFiles:
snapshot_properties=self._snapshot_properties,
)

-def delete(self) -> DeleteFiles:
+def delete(self, only_delete_within_latest_spec: bool = False) -> DeleteFiles:
return DeleteFiles(
operation=Operation.DELETE,
transaction=self._transaction,
io=self._io,
snapshot_properties=self._snapshot_properties,
only_delete_within_latest_spec=only_delete_within_latest_spec,
)
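A usage sketch of the new flag (hedged: `tx` is assumed to be an open transaction on a partitioned table, and the predicate is illustrative):

```python
from pyiceberg.expressions import EqualTo, Reference

# Manifests written with an older partition spec are carried over untouched
with tx.update_snapshot().delete(only_delete_within_latest_spec=True) as delete:
    delete.delete_by_predicate(EqualTo(Reference("region"), "eu"))
```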


4 changes: 2 additions & 2 deletions pyiceberg/table/snapshots.py
@@ -274,14 +274,14 @@ def set_partition_summary_limit(self, limit: int) -> None:

def add_file(self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC) -> None:
self.metrics.add_file(data_file)
-if len(data_file.partition.record_fields()) != 0:
+if len(data_file.partition.record_values()) != 0:
self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=True, schema=schema)

def remove_file(
self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC
) -> None:
self.metrics.remove_file(data_file)
-if len(data_file.partition.record_fields()) != 0:
+if len(data_file.partition.record_values()) != 0:
self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=False, schema=schema)

def update_partition_metrics(self, partition_spec: PartitionSpec, file: DataFile, is_add_file: bool, schema: Schema) -> None:
4 changes: 4 additions & 0 deletions pyiceberg/typedef.py
@@ -199,6 +199,10 @@ def __repr__(self) -> str:
return f"{self.__class__.__name__}[{', '.join(f'{key}={repr(value)}' for key, value in self.__dict__.items() if not key.startswith('_'))}]"

def record_fields(self) -> List[str]:
"""Return all the fields of the Record class except those specified in skip_fields."""
Owner: At some point, the Record should be just a list that represents a struct. The record itself should not carry a schema. See apache#579.
return list(self._position_to_field_name)

def record_values(self) -> List[Any]:
"""Return values of all the fields of the Record class except those specified in skip_fields."""
return [self.__getattribute__(v) if hasattr(self, v) else None for v in self._position_to_field_name]
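A small usage sketch of the two accessors (hedged: it assumes Record accepts keyword fields, as its constructor allows):

```python
from pyiceberg.typedef import Record

r = Record(bucket=4, region="eu")
r.record_fields()  # -> ['bucket', 'region'], the field names
r.record_values()  # -> [4, 'eu'], the values aligned positionally with the names
```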

2 changes: 1 addition & 1 deletion tests/integration/test_rest_manifest.py
@@ -99,7 +99,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None:
sort_order_id=entry.data_file.sort_order_id,
spec_id=entry.data_file.spec_id,
)
-wrapped_entry_v2 = ManifestEntry(*entry.record_fields())
+wrapped_entry_v2 = ManifestEntry(*entry.record_values())
wrapped_entry_v2.data_file = wrapped_data_file_v2_debug
wrapped_entry_v2_dict = todict(wrapped_entry_v2)
# This one should not be written