Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
0cad231
checkpoint
sungwy May 4, 2024
96e5533
checkpoint2
sungwy May 5, 2024
ddfa9ac
todo: sort with pyarrow_transform vals
sungwy May 5, 2024
1a5327a
checkpoint
sungwy May 6, 2024
e067a28
checkpoint
sungwy May 6, 2024
069f3bd
fix
sungwy May 6, 2024
615d5e3
tests
sungwy May 6, 2024
c0a0f32
more tests
sungwy May 6, 2024
d872245
Remove trailing slash from table location when creating a table (#702)
felixscherz May 6, 2024
a1f4ba8
Build: Bump mkdocs-section-index from 0.3.8 to 0.3.9 (#696)
dependabot[bot] May 7, 2024
e2f547d
Build: Bump cython from 3.0.8 to 3.0.10 (#697)
dependabot[bot] May 7, 2024
29beaf8
Build: Bump tqdm from 4.66.2 to 4.66.3 (#699)
dependabot[bot] May 7, 2024
70a45f6
Build: Bump werkzeug from 3.0.1 to 3.0.3 (#706)
dependabot[bot] May 7, 2024
0eb0c1c
Build: Bump jinja2 from 3.1.3 to 3.1.4 in /mkdocs (#707)
dependabot[bot] May 7, 2024
6a39eda
adopt review feedback
sungwy May 7, 2024
990ce80
Make `add_files` to support `snapshot_properties` argument (#695)
enkidulan May 7, 2024
0508667
Add support for categorical type (#693)
sungwy May 7, 2024
1f39b59
Build: Bump tenacity from 8.2.3 to 8.3.0 (#714)
dependabot[bot] May 8, 2024
50a65e5
Build: Bump mkdocstrings from 0.25.0 to 0.25.1 (#715)
dependabot[bot] May 8, 2024
3461305
Build: Bump coverage from 7.5.0 to 7.5.1 (#713)
dependabot[bot] May 8, 2024
399a9be
Build: Bump sqlalchemy from 2.0.29 to 2.0.30 (#712)
dependabot[bot] May 8, 2024
6f72e30
Build: Bump flask-cors from 4.0.0 to 4.0.1 (#718)
dependabot[bot] May 8, 2024
d14e137
comment
sungwy May 8, 2024
4de207d
Build: Bump mkdocs-material from 9.5.20 to 9.5.21 (#719)
dependabot[bot] May 9, 2024
d02d7a1
Build: Bump getdaft from 0.2.23 to 0.2.24 (#721)
dependabot[bot] May 9, 2024
aa361d1
Test, write subset of schema (#704)
kevinjqliu May 9, 2024
b41c98c
Remove pylintrc file (#724)
ndrluis May 13, 2024
444dca7
Add kevinjqliu to collaborators (#729)
Fokko May 13, 2024
7904fe5
Build: Bump moto from 5.0.6 to 5.0.7 (#733)
dependabot[bot] May 14, 2024
0d98ec8
Build: Bump mkdocs-material from 9.5.21 to 9.5.22 (#732)
dependabot[bot] May 14, 2024
6c2ba34
Build: Bump griffe from 0.44.0 to 0.45.0 (#731)
dependabot[bot] May 14, 2024
20b7b53
Build: Bump pypa/cibuildwheel from 2.17.0 to 2.18.0 (#730)
dependabot[bot] May 14, 2024
6d52325
Hive catalog: Add retry logic for hive locking (#701)
frankliee May 15, 2024
a268e5b
Add create_namespace_if_not_exists method (#725)
ndrluis May 15, 2024
b40378b
Remove NoSuchNamespaceError on namespace creation (#726)
ndrluis May 15, 2024
ac84bd5
Build: Bump pyarrow from 16.0.0 to 16.1.0 (#743)
dependabot[bot] May 15, 2024
20c2731
Build: Bump mkdocstrings-python from 1.10.0 to 1.10.1 (#744)
dependabot[bot] May 15, 2024
4fddcbe
Build: Bump mkdocstrings-python from 1.10.1 to 1.10.2 (#746)
dependabot[bot] May 21, 2024
0a58636
Build: Bump boto3 from 1.34.69 to 1.34.106 (#749)
dependabot[bot] May 21, 2024
c764d6a
--- (#754)
dependabot[bot] May 21, 2024
245ab87
--- (#755)
dependabot[bot] May 21, 2024
82df57e
--- (#756)
dependabot[bot] May 21, 2024
aa5a136
[FEAT]register table using iceberg metadata file via pyiceberg (#711)
MehulBatra May 22, 2024
5537cb4
modify doc(backward compatibility) typo (#757)
SeungyeopShin May 23, 2024
e917660
Bump requests from 2.32.1 to 2.32.2 (#759)
dependabot[bot] May 23, 2024
7083b2e
Bump griffe from 0.45.0 to 0.45.1 (#760)
dependabot[bot] May 23, 2024
03a0d65
Bump mypy-boto3-glue from 1.34.88 to 1.34.110 (#761)
dependabot[bot] May 23, 2024
996afd0
Bump mkdocstrings-python from 1.10.2 to 1.10.3 (#762)
dependabot[bot] May 23, 2024
eba4bee
Initial implementation of the manifest table (#717)
geruh May 23, 2024
42afc43
Fix: Table-Exists if Server returns 204 (#739)
c-thiel May 23, 2024
959718a
Bump duckdb from 0.10.2 to 0.10.3 (#764)
dependabot[bot] May 25, 2024
ed83e84
Bump griffe from 0.45.1 to 0.45.2 (#765)
dependabot[bot] May 25, 2024
b8023d2
Bump typing-extensions from 4.11.0 to 4.12.0 (#767)
dependabot[bot] May 25, 2024
a132be1
Bump mkdocs-material from 9.5.24 to 9.5.25 (#770)
dependabot[bot] May 28, 2024
8968996
Add azure configuration variables (#745)
kevinzwang May 28, 2024
ee2a7c5
Bump moto from 5.0.7 to 5.0.8 (#771)
dependabot[bot] May 28, 2024
54aacb4
Bump coverage from 7.5.1 to 7.5.2 (#772)
dependabot[bot] May 28, 2024
756ae62
Introduce hierarchical namespaces into SqlCatalog (#591)
cccs-eric May 28, 2024
4fb8ba2
Bump coverage from 7.5.2 to 7.5.3 (#776)
dependabot[bot] May 29, 2024
ec8d7dc
Bump pydantic from 2.7.1 to 2.7.2 (#775)
dependabot[bot] May 29, 2024
7552e03
Bump requests from 2.32.2 to 2.32.3 (#778)
dependabot[bot] May 30, 2024
e08cc9d
Bump getdaft from 0.2.24 to 0.2.25 (#779)
dependabot[bot] May 30, 2024
d3ad61c
Remove `record_fields` from the `Record` class (#580)
Fokko May 30, 2024
cf3bf8a
Unify to double quotes using Ruff (#781)
HonahX May 30, 2024
91973f2
Bump moto from 5.0.8 to 5.0.9 (#783)
dependabot[bot] May 31, 2024
0339e7f
Support CreateTableTransaction for SqlCatalog (#684)
HonahX May 31, 2024
84a2c04
Support CreateTableTransaction for HiveCatalog (#683)
HonahX May 31, 2024
8d79664
Support viewfs scheme along side with hdfs (#777)
yothinix May 31, 2024
20f6afd
Update `fsspec.py`to respect `s3.signer.uri property` (#741)
c-thiel May 31, 2024
5dd846d
checkpoint
sungwy May 4, 2024
6357193
checkpoint2
sungwy May 5, 2024
c30a57c
todo: sort with pyarrow_transform vals
sungwy May 5, 2024
541655f
checkpoint
sungwy May 6, 2024
afe83b1
checkpoint
sungwy May 6, 2024
00ca5f0
fix
sungwy May 6, 2024
511e988
tests
sungwy May 6, 2024
3b784ab
more tests
sungwy May 6, 2024
3711b1b
adopt review feedback
sungwy May 7, 2024
f16d778
comment
sungwy May 8, 2024
80d5064
Merge branch 'transform-partition-writes' of https://github.com/syun6…
sungwy May 31, 2024
9f0a92b
rebase
sungwy May 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyiceberg/partitioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ def partition(self) -> Record: # partition key transformed with iceberg interna
for raw_partition_field_value in self.raw_partition_field_values:
partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id]
if len(partition_fields) != 1:
raise ValueError("partition_fields must contain exactly one field.")
raise ValueError(f"Cannot have redundant partitions: {partition_fields}")
partition_field = partition_fields[0]
iceberg_typed_key_values[partition_field.name] = partition_record_value(
partition_field=partition_field,
Expand Down
67 changes: 29 additions & 38 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,11 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
if not isinstance(df, pa.Table):
raise ValueError(f"Expected PyArrow table, got: {df}")

supported_transforms = {IdentityTransform}
if not all(type(field.transform) in supported_transforms for field in self.table_metadata.spec().fields):
if unsupported_partitions := [
field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
]:
raise ValueError(
f"All transforms are not supported, expected: {supported_transforms}, but get: {[str(field) for field in self.table_metadata.spec().fields if field.transform not in supported_transforms]}."
f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
)

_check_schema_compatible(self._table.schema(), other_schema=df.schema)
Expand Down Expand Up @@ -3544,33 +3545,6 @@ class TablePartition:
arrow_table_partition: pa.Table


def _get_partition_sort_order(partition_columns: list[str], reverse: bool = False) -> dict[str, Any]:
order = 'ascending' if not reverse else 'descending'
null_placement = 'at_start' if reverse else 'at_end'
return {'sort_keys': [(column_name, order) for column_name in partition_columns], 'null_placement': null_placement}


def group_by_partition_scheme(arrow_table: pa.Table, partition_columns: list[str]) -> pa.Table:
    """Given a table, sort it by current partition scheme."""
    # only works for identity for now
    opts = _get_partition_sort_order(partition_columns, reverse=False)
    return arrow_table.sort_by(sorting=opts['sort_keys'], null_placement=opts['null_placement'])


def get_partition_columns(
    spec: PartitionSpec,
    schema: Schema,
) -> list[str]:
    """Resolve the source column name of every partition field in *spec*.

    Raises:
        ValueError: if a partition field's source id is absent from *schema*.
    """
    resolved_names: list[str] = []
    for partition_field in spec.fields:
        column_name = schema.find_column_name(partition_field.source_id)
        if column_name:
            resolved_names.append(column_name)
        else:
            # Keep the local name `partition_field`: it is embedded in the
            # error text via the f-string `=` specifier.
            raise ValueError(f"{partition_field=} could not be found in {schema}.")
    return resolved_names


def _get_table_partitions(
arrow_table: pa.Table,
partition_spec: PartitionSpec,
Expand Down Expand Up @@ -3625,13 +3599,30 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
"""
import pyarrow as pa

partition_columns = get_partition_columns(spec=spec, schema=schema)
arrow_table = group_by_partition_scheme(arrow_table, partition_columns)

reversing_sort_order_options = _get_partition_sort_order(partition_columns, reverse=True)
reversed_indices = pa.compute.sort_indices(arrow_table, **reversing_sort_order_options).to_pylist()

slice_instructions: list[dict[str, Any]] = []
partition_columns: List[Tuple[PartitionField, NestedField]] = [
(partition_field, schema.find_field(partition_field.source_id)) for partition_field in spec.fields
]
partition_values_table = pa.table({
str(partition.field_id): partition.transform.pyarrow_transform(field.field_type)(arrow_table[field.name])
for partition, field in partition_columns
})

# Sort by partitions
sort_indices = pa.compute.sort_indices(
partition_values_table,
sort_keys=[(col, "ascending") for col in partition_values_table.column_names],
null_placement="at_end",
).to_pylist()
arrow_table = arrow_table.take(sort_indices)

# Get slice_instructions to group by partitions
partition_values_table = partition_values_table.take(sort_indices)
reversed_indices = pa.compute.sort_indices(
partition_values_table,
sort_keys=[(col, "descending") for col in partition_values_table.column_names],
null_placement="at_start",
).to_pylist()
slice_instructions: List[Dict[str, Any]] = []
last = len(reversed_indices)
reversed_indices_size = len(reversed_indices)
ptr = 0
Expand All @@ -3642,6 +3633,6 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
last = reversed_indices[ptr]
ptr = ptr + group_size

table_partitions: list[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)
table_partitions: List[TablePartition] = _get_table_partitions(arrow_table, spec, schema, slice_instructions)

return table_partitions
99 changes: 98 additions & 1 deletion pyiceberg/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from abc import ABC, abstractmethod
from enum import IntEnum
from functools import singledispatch
from typing import Any, Callable, Generic, Optional, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar
from typing import Literal as LiteralType
from uuid import UUID

Expand Down Expand Up @@ -82,6 +82,9 @@
from pyiceberg.utils.parsing import ParseNumberFromBrackets
from pyiceberg.utils.singleton import Singleton

if TYPE_CHECKING:
import pyarrow as pa

S = TypeVar("S")
T = TypeVar("T")

Expand Down Expand Up @@ -175,6 +178,13 @@ def __eq__(self, other: Any) -> bool:
return self.root == other.root
return False

@property
def supports_pyarrow_transform(self) -> bool:
    # Default for all transforms: no native pyarrow implementation.
    # Subclasses whose `pyarrow_transform` is backed by real pyarrow
    # compute kernels override this to return True; the append path uses
    # this flag to reject partition specs it cannot write.
    return False

@abstractmethod
def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a callable mapping a pa.Array of `source`-typed values to the transformed pa.Array.

    Implementations without a native pyarrow equivalent raise
    NotImplementedError.
    """
    ...


class BucketTransform(Transform[S, int]):
"""Base Transform class to transform a value into a bucket partition value.
Expand Down Expand Up @@ -290,6 +300,9 @@ def __repr__(self) -> str:
"""Return the string representation of the BucketTransform class."""
return f"BucketTransform(num_buckets={self._num_buckets})"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a pyarrow-native implementation of this transform.

    Bucket hashing is not implemented with pyarrow compute kernels yet;
    writes with bucket-partitioned specs are rejected up front via
    `supports_pyarrow_transform`.
    """
    # Include the transform in the message so the failure is self-describing.
    raise NotImplementedError(f"{self} is not supported as a pyarrow transform yet")


class TimeResolution(IntEnum):
YEAR = 6
Expand Down Expand Up @@ -349,6 +362,10 @@ def dedup_name(self) -> str:
def preserves_order(self) -> bool:
return True

@property
def supports_pyarrow_transform(self) -> bool:
Copy link
Copy Markdown
Contributor

@HonahX HonahX May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding a public method, how about we maintain a private list of transforms that support pyarrow transform for the check at the beginning of append? For transforms that do not yet support pyarrow transform, we could throw NotImplementedError in pyarrow_transform

WDYT?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following the existing convention for can_transform and preserves_order that are specified as class properties. I'm in favor of keeping it consistent with these

Copy link
Copy Markdown
Contributor

@HonahX HonahX May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for sharing the context. My initial concern was that unlike can_transform and preserves_order, which behave differently across transforms, in the end supports_pyarrow_transform would return True for all the transforms, making it a little bit redundant. But on second thought, since we will likely not support pyarrow transform for other transforms in the 0.7.0 release, this supports_pyarrow_transform can be a useful property in the upcoming release.

We could deprecate this property after we support pyarrow for all transforms later. WDYT?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'm on the same page! To add to that - I'm not yet sure if we will be able to support the other Transform classes in supports_pyarrow_transform. The key idea here is that we want to use native pyarrow compute functions to apply the equivalent transform without converting the values back and forth between Arrow and Python data types. I'm not yet sure if we'll be able to do the same for BucketTransform, for instance, with the existing range of pyarrow compute functions.

return True


class YearTransform(TimeTransform[S]):
"""Transforms a datetime value into a year value.
Expand Down Expand Up @@ -391,6 +408,21 @@ def __repr__(self) -> str:
"""Return the string representation of the YearTransform class."""
return "YearTransform()"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a pyarrow-native function computing the year partition value.

    Counts years between the Iceberg epoch and each value via
    `pc.years_between`.
    """
    import pyarrow as pa
    import pyarrow.compute as pc

    # Pick the epoch constant matching the source type; the ordering mirrors
    # the supported-type precedence.
    for source_type, candidate_epoch in (
        (DateType, datetime.EPOCH_DATE),
        (TimestampType, datetime.EPOCH_TIMESTAMP),
        (TimestamptzType, datetime.EPOCH_TIMESTAMPTZ),
    ):
        if isinstance(source, source_type):
            epoch = candidate_epoch
            break
    else:
        raise ValueError(f"Cannot apply year transform for type: {source}")

    return lambda v: pc.years_between(pa.scalar(epoch), v) if v is not None else None


class MonthTransform(TimeTransform[S]):
"""Transforms a datetime value into a month value.
Expand Down Expand Up @@ -433,6 +465,27 @@ def __repr__(self) -> str:
"""Return the string representation of the MonthTransform class."""
return "MonthTransform()"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a pyarrow-native function computing the month partition value.

    Computes months since the Iceberg epoch as
    ``years_between * 12 + (month - 1)``.
    """
    import pyarrow as pa
    import pyarrow.compute as pc

    # Pick the epoch constant matching the source type.
    for source_type, candidate_epoch in (
        (DateType, datetime.EPOCH_DATE),
        (TimestampType, datetime.EPOCH_TIMESTAMP),
        (TimestamptzType, datetime.EPOCH_TIMESTAMPTZ),
    ):
        if isinstance(source, source_type):
            epoch = candidate_epoch
            break
    else:
        raise ValueError(f"Cannot apply month transform for type: {source}")

    def to_months(v: pa.Array) -> pa.Array:
        years_as_months = pc.multiply(pc.years_between(pa.scalar(epoch), v), pa.scalar(12))
        zero_based_month = pc.add(pc.month(v), pa.scalar(-1))
        return pc.add(years_as_months, zero_based_month)

    return lambda v: to_months(v) if v is not None else None


class DayTransform(TimeTransform[S]):
"""Transforms a datetime value into a day value.
Expand Down Expand Up @@ -478,6 +531,21 @@ def __repr__(self) -> str:
"""Return the string representation of the DayTransform class."""
return "DayTransform()"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a pyarrow-native function computing the day partition value.

    Counts days between the Iceberg epoch and each value via
    `pc.days_between`.
    """
    import pyarrow as pa
    import pyarrow.compute as pc

    # Pick the epoch constant matching the source type.
    for source_type, candidate_epoch in (
        (DateType, datetime.EPOCH_DATE),
        (TimestampType, datetime.EPOCH_TIMESTAMP),
        (TimestamptzType, datetime.EPOCH_TIMESTAMPTZ),
    ):
        if isinstance(source, source_type):
            epoch = candidate_epoch
            break
    else:
        raise ValueError(f"Cannot apply day transform for type: {source}")

    return lambda v: pc.days_between(pa.scalar(epoch), v) if v is not None else None


class HourTransform(TimeTransform[S]):
"""Transforms a datetime value into a hour value.
Expand Down Expand Up @@ -515,6 +583,19 @@ def __repr__(self) -> str:
"""Return the string representation of the HourTransform class."""
return "HourTransform()"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a pyarrow-native function computing the hour partition value.

    Counts hours between the Iceberg epoch and each value via
    `pc.hours_between`. Dates are not supported (hour granularity is
    meaningless for DateType).
    """
    import pyarrow as pa
    import pyarrow.compute as pc

    # Only timestamp types carry sub-day precision.
    for source_type, candidate_epoch in (
        (TimestampType, datetime.EPOCH_TIMESTAMP),
        (TimestamptzType, datetime.EPOCH_TIMESTAMPTZ),
    ):
        if isinstance(source, source_type):
            epoch = candidate_epoch
            break
    else:
        raise ValueError(f"Cannot apply hour transform for type: {source}")

    return lambda v: pc.hours_between(pa.scalar(epoch), v) if v is not None else None


def _base64encode(buffer: bytes) -> str:
"""Convert bytes to base64 string."""
Expand Down Expand Up @@ -585,6 +666,13 @@ def __repr__(self) -> str:
"""Return the string representation of the IdentityTransform class."""
return "IdentityTransform()"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Identity: hand the input array back unchanged, whatever the source type."""

    def _passthrough(v: "pa.Array") -> "pa.Array":
        return v

    return _passthrough

@property
def supports_pyarrow_transform(self) -> bool:
    # Identity requires no computation at all, so it is trivially
    # expressible with pyarrow; this enables identity-partitioned writes.
    return True


class TruncateTransform(Transform[S, S]):
"""A transform for truncating a value to a specified width.
Expand Down Expand Up @@ -725,6 +813,9 @@ def __repr__(self) -> str:
"""Return the string representation of the TruncateTransform class."""
return f"TruncateTransform(width={self._width})"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a pyarrow-native implementation of this transform.

    Truncation is not implemented with pyarrow compute kernels yet; writes
    with truncate-partitioned specs are rejected up front via
    `supports_pyarrow_transform`.
    """
    # Include the transform in the message so the failure is self-describing.
    raise NotImplementedError(f"{self} is not supported as a pyarrow transform yet")


@singledispatch
def _human_string(value: Any, _type: IcebergType) -> str:
Expand Down Expand Up @@ -807,6 +898,9 @@ def __repr__(self) -> str:
"""Return the string representation of the UnknownTransform class."""
return f"UnknownTransform(transform={repr(self._transform)})"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a pyarrow-native implementation of this transform.

    An unknown transform cannot be evaluated at all, with or without
    pyarrow.
    """
    # Include the transform in the message so the failure is self-describing.
    raise NotImplementedError(f"{self} is not supported as a pyarrow transform")


class VoidTransform(Transform[S, None], Singleton):
"""A transform that always returns None."""
Expand Down Expand Up @@ -835,6 +929,9 @@ def __repr__(self) -> str:
"""Return the string representation of the VoidTransform class."""
return "VoidTransform()"

def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a pyarrow-native implementation of this transform.

    Not implemented yet for the void (always-null) transform; writes with
    void-partitioned specs are rejected up front via
    `supports_pyarrow_transform`.
    """
    # Include the transform in the message so the failure is self-describing.
    raise NotImplementedError(f"{self} is not supported as a pyarrow transform yet")


def _truncate_number(
name: str, pred: BoundLiteralPredicate[L], transform: Callable[[Optional[L]], Optional[L]]
Expand Down
43 changes: 43 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2145,3 +2145,46 @@ def arrow_table_with_only_nulls(pa_schema: "pa.Schema") -> "pa.Table":
import pyarrow as pa

return pa.Table.from_pylist([{}, {}], schema=pa_schema)


@pytest.fixture(scope="session")
def arrow_table_date_timestamps() -> "pa.Table":
    """Pyarrow table with only date, timestamp and timestamptz values."""
    import pyarrow as pa

    # Five concrete rows plus a trailing null row per column.
    dates = [date(2023, 12, 31), date(2024, 1, 1), date(2024, 1, 31), date(2024, 2, 1), date(2024, 2, 1), None]
    naive_timestamps = [
        datetime(2023, 12, 31, 0, 0, 0),
        datetime(2024, 1, 1, 0, 0, 0),
        datetime(2024, 1, 31, 0, 0, 0),
        datetime(2024, 2, 1, 0, 0, 0),
        datetime(2024, 2, 1, 6, 0, 0),
        None,
    ]
    # Same instants, tagged as UTC, for the timestamptz column.
    utc_timestamps = [ts.replace(tzinfo=timezone.utc) if ts is not None else None for ts in naive_timestamps]

    table_schema = pa.schema([
        ("date", pa.date32()),
        ("timestamp", pa.timestamp(unit="us")),
        ("timestamptz", pa.timestamp(unit="us", tz="UTC")),
    ])
    return pa.Table.from_pydict(
        {"date": dates, "timestamp": naive_timestamps, "timestamptz": utc_timestamps},
        schema=table_schema,
    )


@pytest.fixture(scope="session")
def arrow_table_date_timestamps_schema() -> Schema:
    """Pyarrow table Schema with only date, timestamp and timestamptz values."""
    # Field ids are assigned sequentially starting at 1.
    field_specs = [
        ("date", DateType()),
        ("timestamp", TimestampType()),
        ("timestamptz", TimestamptzType()),
    ]
    return Schema(*(
        NestedField(field_id=idx, name=name, field_type=field_type, required=False)
        for idx, (name, field_type) in enumerate(field_specs, start=1)
    ))
Loading