Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 96 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,13 @@ jobs:
python -m pip install uv
if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi
if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi
if [ -f amber/dev-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi
- name: Create Databases
run: |
psql -h localhost -U postgres -f sql/texera_ddl.sql
psql -h localhost -U postgres -f sql/iceberg_postgres_catalog.sql
psql -h localhost -U postgres -f sql/texera_lakefs.sql
psql -h localhost -U postgres -f sql/texera_lakekeeper.sql
env:
PGPASSWORD: postgres
- name: Setup sbt launcher
Expand All @@ -331,6 +333,96 @@ jobs:
run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases -f sql/texera_ddl.sql
env:
PGPASSWORD: postgres
- name: Start MinIO
  # Boot a single-node MinIO as the S3-compatible object store that the
  # Lakekeeper warehouse points at. --network host so later steps can
  # reach it on localhost:9000.
  run: |
    docker run -d --name minio --network host \
      -e MINIO_ROOT_USER=texera_minio \
      -e MINIO_ROOT_PASSWORD=password \
      minio/minio:RELEASE.2025-02-28T09-55-16Z server /data
    for i in $(seq 1 3); do
      curl -sf http://localhost:9000/minio/health/live && break
      echo "Waiting for MinIO... (attempt $i)"
      sleep 1
    done
    # Fail fast if MinIO never became healthy (mirrors the Lakekeeper
    # step below). Without this, a dead MinIO only surfaces later as a
    # confusing warehouse-creation failure.
    curl -sf http://localhost:9000/minio/health/live || {
      echo "MinIO failed to start. Container logs:"
      docker logs minio
      exit 1
    }
- name: Start Lakekeeper
env:
LAKEKEEPER__PG_DATABASE_URL_READ: postgres://postgres:postgres@localhost:5432/texera_lakekeeper
LAKEKEEPER__PG_DATABASE_URL_WRITE: postgres://postgres:postgres@localhost:5432/texera_lakekeeper
LAKEKEEPER__PG_ENCRYPTION_KEY: texera_key
run: |
docker run --rm --network host \
-e LAKEKEEPER__PG_DATABASE_URL_READ \
-e LAKEKEEPER__PG_DATABASE_URL_WRITE \
-e LAKEKEEPER__PG_ENCRYPTION_KEY \
vakamo/lakekeeper:v0.11.0 migrate
docker run -d --name lakekeeper --network host \
-e LAKEKEEPER__PG_DATABASE_URL_READ \
-e LAKEKEEPER__PG_DATABASE_URL_WRITE \
-e LAKEKEEPER__PG_ENCRYPTION_KEY \
-e LAKEKEEPER__METRICS_PORT=9091 \
vakamo/lakekeeper:v0.11.0 serve
for i in $(seq 1 3); do
docker exec lakekeeper /home/nonroot/lakekeeper healthcheck && break
echo "Waiting for Lakekeeper... (attempt $i)"
sleep 1
done
docker exec lakekeeper /home/nonroot/lakekeeper healthcheck || {
echo "Lakekeeper failed to start. Container logs:"
docker logs lakekeeper
exit 1
}
- name: Initialize Lakekeeper warehouse
# Pull defaults out of storage.conf so this step doesn't duplicate
# values that already live in the runtime config. Each scalar in
# storage.conf is followed by a `${?VAR}` env-override line whose
# name is globally unique, so anchoring grep on that override line
# selects the value unambiguously across nested scopes.
run: |
CONF=common/config/src/main/resources/storage.conf
extract() {
grep -B1 -F "\${?$1}" "$CONF" | head -1 | sed -E 's/.*"([^"]+)".*/\1/'
}
WAREHOUSE_NAME=$(extract STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME)
S3_BUCKET=$(extract STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET)
S3_ENDPOINT=$(extract STORAGE_S3_ENDPOINT)
S3_REGION=$(extract STORAGE_S3_REGION)
S3_USERNAME=$(extract STORAGE_S3_AUTH_USERNAME)
S3_PASSWORD=$(extract STORAGE_S3_AUTH_PASSWORD)
# Lakekeeper's management API lives on the same host as the
# catalog; strip the /catalog suffix off the catalog URI to get
# the base URL.
REST_URI=$(extract STORAGE_ICEBERG_CATALOG_REST_URI)
LAKEKEEPER_BASE=${REST_URI%/catalog}
LAKEKEEPER_BASE=${LAKEKEEPER_BASE%/}

docker run --rm --network host --entrypoint sh minio/mc -c \
"mc alias set minio $S3_ENDPOINT $S3_USERNAME $S3_PASSWORD && \
mc mb --ignore-existing minio/$S3_BUCKET"
curl -sf -X POST -H 'Content-Type: application/json' \
-d '{"project-id":"00000000-0000-0000-0000-000000000000","project-name":"default"}' \
"$LAKEKEEPER_BASE/management/v1/project" || true
curl -sf -X POST -H 'Content-Type: application/json' -d @- \
"$LAKEKEEPER_BASE/management/v1/warehouse" <<EOF
{
"warehouse-name": "$WAREHOUSE_NAME",
"project-id": "00000000-0000-0000-0000-000000000000",
"storage-profile": {
"type": "s3",
"bucket": "$S3_BUCKET",
"region": "$S3_REGION",
"endpoint": "$S3_ENDPOINT",
"flavor": "s3-compat",
"path-style-access": true,
"sts-enabled": false
},
"storage-credential": {
"type": "s3",
"credential-type": "access-key",
"aws-access-key-id": "$S3_USERNAME",
"aws-secret-access-key": "$S3_PASSWORD"
}
}
EOF
- name: Lint and run amber integration tests
# AMBER_TEST_FILTER=integration-only tells amber/build.sbt to
# keep only @org.apache.texera.amber.tags.IntegrationTest
Expand All @@ -351,6 +443,9 @@ jobs:
sbt scalafmtCheckAll \
"scalafixAll --check" \
"WorkflowExecutionService/test"
- name: Run Python integration tests
run: |
cd amber && pytest -m integration -sv

platform:
# Per-service build, test, and license check for the non-amber Scala
Expand Down Expand Up @@ -524,7 +619,7 @@ jobs:
if [ -f amber/dev-requirements.txt ]; then uv pip install --system -r amber/dev-requirements.txt; fi
- name: Test with pytest
run: |
cd amber && pytest --cov=src/main/python --cov-report=xml -sv
cd amber && pytest -m "not integration" --cov=src/main/python --cov-report=xml -sv
Comment thread
Yicong-Huang marked this conversation as resolved.
- name: Upload python coverage to Codecov
if: matrix.python-version == '3.12' && always()
uses: codecov/codecov-action@75cd11691c0faa626561e295848008c8a7dddffe # v5.5.4
Expand Down
5 changes: 4 additions & 1 deletion amber/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ testpaths = ["src/test/python"]
# to mirror `src/main/python`'s __init__.py layout to avoid duplicate
# package names. Required for src-style test layouts per the pytest
# docs (https://docs.pytest.org/en/stable/explanation/goodpractices.html).
addopts = "--import-mode=importlib"
addopts = "--import-mode=importlib"
markers = [
"integration: end-to-end test routed to the amber-integration CI job",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.storage.iceberg

import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.exceptions.NoSuchTableException
import org.apache.iceberg.rest.RESTCatalog
import org.apache.texera.amber.config.StorageConfig
import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
import org.apache.texera.amber.tags.IntegrationTest
import org.apache.texera.amber.util.IcebergUtil
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec

import java.util.UUID

/** Round-trip table metadata (create / load / drop) via the REST catalog. */
@IntegrationTest
class IcebergRestCatalogIntegrationSpec extends AnyFlatSpec with BeforeAndAfterAll {

  // Catalog client backed by the Lakekeeper REST endpoint; created in
  // beforeAll and released in afterAll.
  private var restCatalog: RESTCatalog = _

  private val testNamespace = "rest_integration_test"

  override def beforeAll(): Unit = {
    super.beforeAll()
    restCatalog = IcebergUtil.createRestCatalog(
      "rest_integration_test",
      StorageConfig.icebergRESTCatalogWarehouseName
    )
  }

  override def afterAll(): Unit = {
    // RESTCatalog is Closeable; close it so its HTTP client does not
    // leak connections across suites in the same JVM.
    try {
      if (restCatalog != null) restCatalog.close()
    } finally {
      super.afterAll()
    }
  }

  behavior of "Iceberg REST catalog"

  it should "round-trip table metadata via the REST catalog" in {
    val amberSchema = Schema(
      List(
        new Attribute("id", AttributeType.INTEGER),
        new Attribute("name", AttributeType.STRING)
      )
    )
    val icebergSchema = IcebergUtil.toIcebergSchema(amberSchema)

    // Randomized table name keeps reruns against the shared CI
    // Lakekeeper instance from colliding with earlier leftovers.
    val tableName = s"rest_table_${UUID.randomUUID().toString.replace("-", "")}"
    val identifier = TableIdentifier.of(testNamespace, tableName)

    try {
      // create — exercises REST createTable.
      IcebergUtil.createTable(
        restCatalog,
        testNamespace,
        tableName,
        icebergSchema,
        overrideIfExists = true
      )
      assert(restCatalog.tableExists(identifier))

      // load — exercises REST loadTable (metadata fetch).
      val loaded = restCatalog.loadTable(identifier)
      assert(loaded.schema().sameSchema(icebergSchema))
    } finally {
      // drop — exercises REST dropTable, and doubles as cleanup when an
      // assertion above fails after the table was created. Second arg is
      // purge = false (metadata-only drop); dropTable returns false
      // rather than throwing when the table does not exist.
      restCatalog.dropTable(identifier, false)
    }
    assert(!restCatalog.tableExists(identifier))
    intercept[NoSuchTableException] {
      restCatalog.loadTable(identifier)
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import uuid

import pytest
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType

from core.storage.iceberg.iceberg_utils import create_rest_catalog

pytestmark = pytest.mark.integration


@pytest.fixture
def rest_catalog():
    """Catalog client for the CI Lakekeeper + MinIO stack.

    Connection values mirror the defaults provisioned by the
    amber-integration CI job.
    """
    connection = {
        "catalog_name": "rest_integration_test",
        "warehouse_name": "texera",
        "rest_uri": "http://localhost:8181/catalog/",
        "s3_endpoint": "http://localhost:9000",
        "s3_region": "us-west-2",
        "s3_username": "texera_minio",
        "s3_password": "password",
    }
    return create_rest_catalog(**connection)


def test_rest_catalog_round_trip(rest_catalog):
    """Round-trip table metadata via the REST catalog (Lakekeeper).

    Exercises createTable / loadTable / dropTable against a live catalog.
    The table is dropped even when an intermediate assertion fails, so
    reruns against the shared CI warehouse start from a clean slate.
    """
    namespace = "rest_integration_test_ns"
    # UUID suffix avoids collisions with leftovers of earlier runs.
    table_name = f"rest_test_{uuid.uuid4().hex}"
    identifier = f"{namespace}.{table_name}"

    schema = Schema(
        NestedField(field_id=1, name="id", field_type=IntegerType(), required=False),
        NestedField(field_id=2, name="name", field_type=StringType(), required=False),
    )

    rest_catalog.create_namespace_if_not_exists(namespace)
    if rest_catalog.table_exists(identifier):
        rest_catalog.drop_table(identifier)

    created = False
    try:
        # create — exercises REST createTable.
        rest_catalog.create_table(identifier=identifier, schema=schema)
        created = True
        assert rest_catalog.table_exists(identifier)

        # load — exercises REST loadTable (metadata fetch).
        loaded = rest_catalog.load_table(identifier)
        assert len(loaded.schema().fields) == 2

        # drop — exercises REST dropTable.
        rest_catalog.drop_table(identifier)
        created = False
        assert not rest_catalog.table_exists(identifier)
        with pytest.raises(NoSuchTableError):
            rest_catalog.load_table(identifier)
    finally:
        if created:
            # Best-effort cleanup after a mid-test failure.
            rest_catalog.drop_table(identifier)
Loading