From 1bda2f0cb463fea8088f381f934999f41ae028cb Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 17:49:10 +0000 Subject: [PATCH 01/16] Initial plan From a434e5a8eda43105810b539cf4793e41363a6f64 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 18:04:33 +0000 Subject: [PATCH 02/16] Add .tar.gz/.tgz archive support for extension, preset, and workflow installation Agent-Logs-Url: https://github.com/github/spec-kit/sessions/9fb9a8ea-0967-4baf-b95c-7101e423ff58 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 236 ++++++++++++++++++++++++++++------ src/specify_cli/extensions.py | 187 ++++++++++++++++++++++----- src/specify_cli/presets.py | 79 ++++++++---- tests/test_extensions.py | 131 +++++++++++++++++++ tests/test_presets.py | 68 ++++++++++ 5 files changed, 606 insertions(+), 95 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index d5f5aba2d5..0b18a1d25f 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -2576,7 +2576,7 @@ def preset_list(): @preset_app.command("add") def preset_add( preset_id: str = typer.Argument(None, help="Preset ID to install from catalog"), - from_url: str = typer.Option(None, "--from", help="Install from a URL (ZIP file)"), + from_url: str = typer.Option(None, "--from", help="Install from a URL (ZIP or .tar.gz/.tgz archive)"), dev: str = typer.Option(None, "--dev", help="Install from local directory (development mode)"), priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"), ): @@ -2629,17 +2629,24 @@ def preset_add( import urllib.request import urllib.error import tempfile + from .extensions import _detect_archive_format as _det_fmt with tempfile.TemporaryDirectory() as tmpdir: - zip_path = Path(tmpdir) / "preset.zip" + archive_fmt = _det_fmt(from_url) try: with urllib.request.urlopen(from_url, timeout=60) as response: - zip_path.write_bytes(response.read()) + if not archive_fmt: + content_type = response.headers.get("Content-Type", "") + archive_fmt = _det_fmt(from_url, content_type) + archive_data = response.read() except urllib.error.URLError as e: console.print(f"[red]Error:[/red] Failed to download: {e}") raise typer.Exit(1) - manifest = manager.install_from_zip(zip_path, speckit_version, priority) + suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip" + archive_path = Path(tmpdir) / f"preset{suffix}" + archive_path.write_bytes(archive_data) + manifest = manager.install_from_zip(archive_path, speckit_version, priority) console.print(f"[green]✓[/green] Preset '{manifest.name}' v{manifest.version} installed (priority {priority})") @@ -3573,7 +3580,7 @@ def catalog_remove( def extension_add( extension: str = typer.Argument(help="Extension name or path"), dev: bool = typer.Option(False, "--dev", help="Install from local directory"), - from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL"), + from_url: Optional[str] = typer.Option(None, "--from", help="Install from custom URL (ZIP or .tar.gz/.tgz archive)"), priority: int = typer.Option(10, "--priority", help="Resolution priority (lower = higher precedence, default 10)"), ): """Install an extension.""" @@ -3612,10 +3619,11 @@ def extension_add( manifest = manager.install_from_directory(source_path, speckit_version, priority=priority) elif from_url: - # Install from URL (ZIP file) + # Install from URL (ZIP or tar.gz archive) import urllib.request import urllib.error from urllib.parse import urlparse + from .extensions import _detect_archive_format # Validate URL parsed = urlparse(from_url) @@ -3631,25 +3639,33 @@ def extension_add( console.print("Only install extensions from sources you trust.\n") console.print(f"Downloading from {from_url}...") - # Download ZIP to temp location + # Download archive to temp location; detect format from URL or Content-Type. download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads" download_dir.mkdir(parents=True, exist_ok=True) - zip_path = download_dir / f"{extension}-url-download.zip" + archive_fmt = _detect_archive_format(from_url) try: with urllib.request.urlopen(from_url, timeout=60) as response: - zip_data = response.read() - zip_path.write_bytes(zip_data) + if not archive_fmt: + content_type = response.headers.get("Content-Type", "") + archive_fmt = _detect_archive_format(from_url, content_type) + archive_data = response.read() - # Install from downloaded ZIP - manifest = manager.install_from_zip(zip_path, speckit_version, priority=priority) + suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip" + archive_path = download_dir / f"{extension}-url-download{suffix}" + archive_path.write_bytes(archive_data) + + # Install from downloaded archive + manifest = manager.install_from_zip(archive_path, speckit_version, priority=priority) except urllib.error.URLError as e: console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}") raise typer.Exit(1) finally: - # Clean up downloaded ZIP - if zip_path.exists(): - zip_path.unlink() + # Clean up downloaded archive + for _suffix in (".zip", ".tar.gz"): + _p = download_dir / f"{extension}-url-download{_suffix}" + if _p.exists(): + _p.unlink() else: # Try bundled extensions first (shipped with spec-kit) @@ -4303,27 +4319,43 @@ def extension_update( # 5. Download new version zip_path = catalog.download_extension(extension_id) try: - # 6. Validate extension ID from ZIP BEFORE modifying installation - # Handle both root-level and nested extension.yml (GitHub auto-generated ZIPs) - with zipfile.ZipFile(zip_path, "r") as zf: - import yaml - manifest_data = None - namelist = zf.namelist() - - # First try root-level extension.yml - if "extension.yml" in namelist: - with zf.open("extension.yml") as f: - manifest_data = yaml.safe_load(f) or {} - else: - # Look for extension.yml in a single top-level subdirectory - # (e.g., "repo-name-branch/extension.yml") - manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1] - if len(manifest_paths) == 1: - with zf.open(manifest_paths[0]) as f: + # 6. Validate extension ID from archive BEFORE modifying installation + # Handle both root-level and nested extension.yml (GitHub auto-generated archives) + from .extensions import _detect_archive_format as _det_fmt_upd + import tarfile as _tarfile_upd + archive_fmt_upd = _det_fmt_upd(str(zip_path)) + import yaml + manifest_data = None + + if archive_fmt_upd == "tar.gz": + with _tarfile_upd.open(zip_path, "r:gz") as tf: + # First try root-level extension.yml + try: + m = tf.getmember("extension.yml") + manifest_data = yaml.safe_load(tf.extractfile(m).read()) or {} + except KeyError: + # Look for extension.yml in a single top-level subdirectory + members = [m for m in tf.getmembers() if m.name.endswith("/extension.yml") and m.name.count("/") == 1] + if len(members) == 1: + manifest_data = yaml.safe_load(tf.extractfile(members[0]).read()) or {} + else: + with zipfile.ZipFile(zip_path, "r") as zf: + namelist = zf.namelist() + + # First try root-level extension.yml + if "extension.yml" in namelist: + with zf.open("extension.yml") as f: manifest_data = yaml.safe_load(f) or {} + else: + # Look for extension.yml in a single top-level subdirectory + # (e.g., "repo-name-branch/extension.yml") + manifest_paths = [n for n in namelist if n.endswith("/extension.yml") and n.count("/") == 1] + if len(manifest_paths) == 1: + with zf.open(manifest_paths[0]) as f: + manifest_data = yaml.safe_load(f) or {} - if manifest_data is None: - raise ValueError("Downloaded extension archive is missing 'extension.yml'") + if manifest_data is None: + raise ValueError("Downloaded extension archive is missing 'extension.yml'") zip_extension_id = manifest_data.get("extension", {}).get("id") if zip_extension_id != extension_id: @@ -4875,6 +4907,54 @@ def workflow_list(): console.print() +def _wf_extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes: + """Extract ``workflow.yml`` from a ZIP or ``.tar.gz`` archive. + + Searches the archive root and a single nested top-level subdirectory + (e.g., ``repo-name-1.0/workflow.yml``). + + Args: + archive_path: Path to the downloaded archive. + archive_fmt: ``"zip"`` or ``"tar.gz"``. + + Returns: + Raw bytes of the ``workflow.yml`` file. + + Raises: + ValueError: If no ``workflow.yml`` is found in the archive. + """ + import tarfile as _tf + import zipfile as _zf + + if archive_fmt == "tar.gz": + with _tf.open(archive_path, "r:gz") as tf: + # Try root-level first. + try: + return tf.extractfile(tf.getmember("workflow.yml")).read() + except KeyError: + pass + # Look in a single top-level subdirectory. + candidates = [ + m for m in tf.getmembers() + if m.name.endswith("/workflow.yml") and m.name.count("/") == 1 + ] + if len(candidates) == 1: + return tf.extractfile(candidates[0]).read() + else: + with _zf.ZipFile(archive_path, "r") as zf: + namelist = zf.namelist() + if "workflow.yml" in namelist: + return zf.read("workflow.yml") + candidates = [ + n for n in namelist + if n.endswith("/workflow.yml") and n.count("/") == 1 + ] + if len(candidates) == 1: + return zf.read(candidates[0]) + + raise ValueError("No workflow.yml found in the downloaded archive") + + @workflow_app.command("add") def workflow_add( source: str = typer.Argument(..., help="Workflow ID, URL, or local path"), @@ -4928,6 +5008,8 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: from ipaddress import ip_address from urllib.parse import urlparse from urllib.request import urlopen # noqa: S310 + from .extensions import _detect_archive_format as _wf_det_fmt + import tarfile as _wf_tarfile parsed_src = urlparse(source) src_host = parsed_src.hostname or "" @@ -4958,18 +5040,51 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb): console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}") raise typer.Exit(1) + + # Detect archive format from the final URL or Content-Type header. + archive_fmt = _wf_det_fmt(final_url) + if not archive_fmt: + content_type = resp.headers.get("Content-Type", "") + archive_fmt = _wf_det_fmt(final_url, content_type) + + raw_data = resp.read() + except typer.Exit: + raise + except Exception as exc: + console.print(f"[red]Error:[/red] Failed to download workflow: {exc}") + raise typer.Exit(1) + + tmp_path = None + try: + if archive_fmt in ("tar.gz", "zip"): + # Extract workflow.yml from the archive. + suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip" + with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_tmp: + arc_tmp.write(raw_data) + arc_tmp_path = Path(arc_tmp.name) + try: + wf_yaml = _wf_extract_workflow_yml(arc_tmp_path, archive_fmt) + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + tmp.write(wf_yaml) + tmp_path = Path(tmp.name) + finally: + arc_tmp_path.unlink(missing_ok=True) + else: + # Treat as a plain YAML file (existing behaviour). with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: - tmp.write(resp.read()) + tmp.write(raw_data) tmp_path = Path(tmp.name) except typer.Exit: raise except Exception as exc: - console.print(f"[red]Error:[/red] Failed to download workflow: {exc}") + console.print(f"[red]Error:[/red] Failed to process downloaded workflow: {exc}") raise typer.Exit(1) + try: _validate_and_install_local(tmp_path, source) finally: - tmp_path.unlink(missing_ok=True) + if tmp_path is not None: + tmp_path.unlink(missing_ok=True) return # Try as a local file/directory @@ -4978,6 +5093,26 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: if source_path.is_file() and source_path.suffix in (".yml", ".yaml"): _validate_and_install_local(source_path, str(source_path)) return + elif source_path.is_file() and ( + source.endswith(".tar.gz") or source.endswith(".tgz") or source.endswith(".zip") + ): + # Local archive file containing workflow.yml + from .extensions import _detect_archive_format as _wf_local_fmt + local_fmt = _wf_local_fmt(source) + try: + wf_yaml = _wf_extract_workflow_yml(source_path, local_fmt) + except (ValueError, Exception) as exc: + console.print(f"[red]Error:[/red] Failed to extract workflow from archive: {exc}") + raise typer.Exit(1) + import tempfile + with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: + tmp.write(wf_yaml) + tmp_local = Path(tmp.name) + try: + _validate_and_install_local(tmp_local, str(source_path)) + finally: + tmp_local.unlink(missing_ok=True) + return elif source_path.is_dir(): wf_file = source_path / "workflow.yml" if not wf_file.exists(): @@ -5041,6 +5176,8 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: try: from urllib.request import urlopen # noqa: S310 — URL comes from catalog + from .extensions import _detect_archive_format as _wf_cat_fmt + import tempfile as _wf_tmpmod workflow_dir.mkdir(parents=True, exist_ok=True) with urlopen(workflow_url, timeout=30) as response: # noqa: S310 @@ -5063,7 +5200,30 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: f"[red]Error:[/red] Workflow '{source}' redirected to non-HTTPS URL: {final_url}" ) raise typer.Exit(1) - workflow_file.write_bytes(response.read()) + + # Detect archive format from the final URL or Content-Type header. + cat_archive_fmt = _wf_cat_fmt(final_url) + if not cat_archive_fmt: + cat_ct = response.headers.get("Content-Type", "") + cat_archive_fmt = _wf_cat_fmt(final_url, cat_ct) + + raw_response = response.read() + + if cat_archive_fmt in ("tar.gz", "zip"): + # Download URL points to an archive — extract workflow.yml from it. + suffix = ".tar.gz" if cat_archive_fmt == "tar.gz" else ".zip" + with _wf_tmpmod.NamedTemporaryFile(suffix=suffix, delete=False) as arc_f: + arc_f.write(raw_response) + arc_tmp = Path(arc_f.name) + try: + wf_yaml_bytes = _wf_extract_workflow_yml(arc_tmp, cat_archive_fmt) + finally: + arc_tmp.unlink(missing_ok=True) + workflow_file.write_bytes(wf_yaml_bytes) + else: + workflow_file.write_bytes(raw_response) + except typer.Exit: + raise except Exception as exc: if workflow_dir.exists(): import shutil diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index a419ebf1d2..14e78a6701 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -9,6 +9,8 @@ import json import hashlib import os +import sys +import tarfile import tempfile import zipfile import shutil @@ -106,6 +108,106 @@ def normalize_priority(value: Any, default: int = 10) -> int: return priority if priority >= 1 else default +def _detect_archive_format(url: str, content_type: str = "") -> str: + """Detect archive format from URL path extension or Content-Type header. + + Args: + url: URL or file path to inspect. + content_type: Optional ``Content-Type`` header value from the HTTP response. + + Returns: + ``"zip"`` for ZIP archives, ``"tar.gz"`` for gzipped tarballs, or ``""`` + when the format cannot be determined. + """ + # Strip query-string / fragment before examining the path extension. + url_path = url.split("?")[0].split("#")[0].lower() + if url_path.endswith(".zip"): + return "zip" + if url_path.endswith(".tar.gz") or url_path.endswith(".tgz"): + return "tar.gz" + + # Fall back to Content-Type header inspection. + ct = content_type.lower() + if "application/zip" in ct or "application/x-zip" in ct: + return "zip" + if any( + t in ct + for t in ( + "application/gzip", + "application/x-gzip", + "application/x-tar+gzip", + ) + ): + return "tar.gz" + + return "" + + +def _safe_extract_tarball( + archive_path: Path, + dest_dir: Path, + error_class: type = Exception, +) -> None: + """Safely extract a ``.tar.gz`` or ``.tgz`` archive into *dest_dir*. + + All members are validated before extraction to prevent *tar slip* + (path traversal) attacks. Symlinks, hard links, and special files + (devices, FIFOs, etc.) are rejected. + + On Python 3.12 and later the ``"data"`` extraction filter is applied + for an additional layer of OS-level protection. + + Args: + archive_path: Path to the ``.tar.gz``/``.tgz`` archive. + dest_dir: Destination directory (must already exist). + error_class: Exception class to raise on unsafe entries. + + Raises: + error_class: If any member is unsafe or the archive cannot be read. + """ + dest_resolved = dest_dir.resolve() + + with tarfile.open(archive_path, "r:gz") as tf: + members = tf.getmembers() + + # Validate every member before extracting anything. + for member in members: + # Reject absolute paths and any path component that is "..". + if os.path.isabs(member.name) or any( + part == ".." for part in member.name.replace("\\", "/").split("/") + ): + raise error_class( + f"Unsafe path in tar archive: {member.name} (potential path traversal)" + ) + + # Confirm the resolved path stays inside dest_dir. + member_path = (dest_dir / member.name).resolve() + try: + member_path.relative_to(dest_resolved) + except ValueError: + raise error_class( + f"Unsafe path in tar archive: {member.name} (potential path traversal)" + ) + + # Reject symlinks and hard links. + if member.issym() or member.islnk(): + raise error_class( + f"Symlinks are not allowed in archive: {member.name}" + ) + + # Only allow regular files and directories. + if not (member.isreg() or member.isdir()): + raise error_class( + f"Non-regular file in archive: {member.name}" + ) + + # Extract — use the "data" filter on Python 3.12+ for extra hardening. + if sys.version_info >= (3, 12): + tf.extractall(dest_dir, filter="data") # type: ignore[call-arg] + else: + tf.extractall(dest_dir) # noqa: S202 — validated manually above + + @dataclass class CatalogEntry: """Represents a single catalog entry in the catalog stack.""" @@ -1202,10 +1304,10 @@ def install_from_zip( speckit_version: str, priority: int = 10, ) -> ExtensionManifest: - """Install extension from ZIP file. + """Install extension from a ZIP or ``.tar.gz``/``.tgz`` archive. Args: - zip_path: Path to extension ZIP file + zip_path: Path to the extension archive (ZIP or gzipped tarball). speckit_version: Current spec-kit version priority: Resolution priority (lower = higher precedence, default 10) @@ -1213,7 +1315,8 @@ def install_from_zip( Installed extension manifest Raises: - ValidationError: If manifest is invalid or priority is invalid + ValidationError: If manifest is invalid, the archive is unsafe, or + priority is invalid CompatibilityError: If extension is incompatible """ # Validate priority early @@ -1223,21 +1326,27 @@ def install_from_zip( with tempfile.TemporaryDirectory() as tmpdir: temp_path = Path(tmpdir) - # Extract ZIP safely (prevent Zip Slip attack) - with zipfile.ZipFile(zip_path, 'r') as zf: - # Validate all paths first before extracting anything - temp_path_resolved = temp_path.resolve() - for member in zf.namelist(): - member_path = (temp_path / member).resolve() - # Use is_relative_to for safe path containment check - try: - member_path.relative_to(temp_path_resolved) - except ValueError: - raise ValidationError( - f"Unsafe path in ZIP archive: {member} (potential path traversal)" - ) - # Only extract after all paths are validated - zf.extractall(temp_path) + archive_fmt = _detect_archive_format(str(zip_path)) + + if archive_fmt == "tar.gz": + # Extract tarball safely (prevent tar slip attack) + _safe_extract_tarball(zip_path, temp_path, ValidationError) + else: + # Extract ZIP safely (prevent Zip Slip attack) + with zipfile.ZipFile(zip_path, 'r') as zf: + # Validate all paths first before extracting anything + temp_path_resolved = temp_path.resolve() + for member in zf.namelist(): + member_path = (temp_path / member).resolve() + # Use is_relative_to for safe path containment check + try: + member_path.relative_to(temp_path_resolved) + except ValueError: + raise ValidationError( + f"Unsafe path in ZIP archive: {member} (potential path traversal)" + ) + # Only extract after all paths are validated + zf.extractall(temp_path) # Find extension directory (may be nested) extension_dir = temp_path @@ -1251,7 +1360,7 @@ def install_from_zip( manifest_path = extension_dir / "extension.yml" if not manifest_path.exists(): - raise ValidationError("No extension.yml found in ZIP file") + raise ValidationError("No extension.yml found in archive") # Install from extracted directory return self.install_from_directory(extension_dir, speckit_version, priority=priority) @@ -1965,14 +2074,18 @@ def get_extension_info(self, extension_id: str) -> Optional[Dict[str, Any]]: return None def download_extension(self, extension_id: str, target_dir: Optional[Path] = None) -> Path: - """Download extension ZIP from catalog. + """Download extension archive from catalog. + + Supports both ZIP (``.zip``) and gzipped tarball (``.tar.gz``/``.tgz``) + archives. The format is detected from the download URL's path extension; + when ambiguous the ``Content-Type`` header is used as a fallback. Args: extension_id: ID of the extension to download - target_dir: Directory to save ZIP file (defaults to temp directory) + target_dir: Directory to save the archive (defaults to cache directory) Returns: - Path to downloaded ZIP file + Path to downloaded archive file Raises: ExtensionError: If extension not found or download fails @@ -2011,21 +2124,35 @@ def download_extension(self, extension_id: str, target_dir: Optional[Path] = Non target_dir.mkdir(parents=True, exist_ok=True) version = ext_info.get("version", "unknown") - zip_filename = f"{extension_id}-{version}.zip" - zip_path = target_dir / zip_filename - # Download the ZIP file + # Detect archive format from URL; resolve via Content-Type when needed. + archive_fmt = _detect_archive_format(download_url) + + # Download the archive try: with self._open_url(download_url, timeout=60) as response: - zip_data = response.read() - - zip_path.write_bytes(zip_data) - return zip_path + if not archive_fmt: + content_type = response.headers.get("Content-Type", "") + archive_fmt = _detect_archive_format(download_url, content_type) + archive_data = response.read() except urllib.error.URLError as e: raise ExtensionError(f"Failed to download extension from {download_url}: {e}") except IOError as e: - raise ExtensionError(f"Failed to save extension ZIP: {e}") + raise ExtensionError(f"Failed to save extension archive: {e}") + + # Choose file extension based on detected format. + if archive_fmt == "tar.gz": + archive_filename = f"{extension_id}-{version}.tar.gz" + else: + archive_filename = f"{extension_id}-{version}.zip" + + archive_path = target_dir / archive_filename + try: + archive_path.write_bytes(archive_data) + except IOError as e: + raise ExtensionError(f"Failed to save extension archive: {e}") + return archive_path def clear_cache(self): """Clear the catalog cache (both legacy and URL-hash-based files).""" diff --git a/src/specify_cli/presets.py b/src/specify_cli/presets.py index 27054a77fc..0d597423e1 100644 --- a/src/specify_cli/presets.py +++ b/src/specify_cli/presets.py @@ -27,7 +27,7 @@ from packaging import version as pkg_version from packaging.specifiers import SpecifierSet, InvalidSpecifier -from .extensions import ExtensionRegistry, normalize_priority +from .extensions import ExtensionRegistry, normalize_priority, _detect_archive_format, _safe_extract_tarball def _substitute_core_template( @@ -1604,10 +1604,10 @@ def install_from_zip( speckit_version: str, priority: int = 10, ) -> PresetManifest: - """Install preset from ZIP file. + """Install preset from a ZIP or ``.tar.gz``/``.tgz`` archive. Args: - zip_path: Path to preset ZIP file + zip_path: Path to the preset archive (ZIP or gzipped tarball). speckit_version: Current spec-kit version priority: Resolution priority (lower = higher precedence, default 10) @@ -1615,7 +1615,8 @@ def install_from_zip( Installed preset manifest Raises: - PresetValidationError: If manifest is invalid or priority is invalid + PresetValidationError: If manifest is invalid, the archive is unsafe, + or priority is invalid PresetCompatibilityError: If pack is incompatible """ # Validate priority early @@ -1625,18 +1626,24 @@ def install_from_zip( with tempfile.TemporaryDirectory() as tmpdir: temp_path = Path(tmpdir) - with zipfile.ZipFile(zip_path, 'r') as zf: - temp_path_resolved = temp_path.resolve() - for member in zf.namelist(): - member_path = (temp_path / member).resolve() - try: - member_path.relative_to(temp_path_resolved) - except ValueError: - raise PresetValidationError( - f"Unsafe path in ZIP archive: {member} " - "(potential path traversal)" - ) - zf.extractall(temp_path) + archive_fmt = _detect_archive_format(str(zip_path)) + + if archive_fmt == "tar.gz": + # Extract tarball safely (prevent tar slip attack) + _safe_extract_tarball(zip_path, temp_path, PresetValidationError) + else: + with zipfile.ZipFile(zip_path, 'r') as zf: + temp_path_resolved = temp_path.resolve() + for member in zf.namelist(): + member_path = (temp_path / member).resolve() + try: + member_path.relative_to(temp_path_resolved) + except ValueError: + raise PresetValidationError( + f"Unsafe path in ZIP archive: {member} " + "(potential path traversal)" + ) + zf.extractall(temp_path) pack_dir = temp_path manifest_path = pack_dir / "preset.yml" @@ -1649,7 +1656,7 @@ def install_from_zip( if not manifest_path.exists(): raise PresetValidationError( - "No preset.yml found in ZIP file" + "No preset.yml found in archive" ) return self.install_from_directory(pack_dir, speckit_version, priority) @@ -2242,14 +2249,18 @@ def get_pack_info( def download_pack( self, pack_id: str, target_dir: Optional[Path] = None ) -> Path: - """Download preset ZIP from catalog. + """Download preset archive from catalog. + + Supports both ZIP (``.zip``) and gzipped tarball (``.tar.gz``/``.tgz``) + archives. The format is detected from the download URL's path extension; + when ambiguous the ``Content-Type`` header is used as a fallback. Args: pack_id: ID of the preset to download - target_dir: Directory to save ZIP file (defaults to cache directory) + target_dir: Directory to save the archive (defaults to cache directory) Returns: - Path to downloaded ZIP file + Path to downloaded archive file Raises: PresetError: If pack not found or download fails @@ -2301,22 +2312,36 @@ def download_pack( target_dir.mkdir(parents=True, exist_ok=True) version = pack_info.get("version", "unknown") - zip_filename = f"{pack_id}-{version}.zip" - zip_path = target_dir / zip_filename + + # Detect archive format from URL; resolve via Content-Type when needed. + archive_fmt = _detect_archive_format(download_url) try: with self._open_url(download_url, timeout=60) as response: - zip_data = response.read() - - zip_path.write_bytes(zip_data) - return zip_path + if not archive_fmt: + content_type = response.headers.get("Content-Type", "") + archive_fmt = _detect_archive_format(download_url, content_type) + archive_data = response.read() except urllib.error.URLError as e: raise PresetError( f"Failed to download preset from {download_url}: {e}" ) except IOError as e: - raise PresetError(f"Failed to save preset ZIP: {e}") + raise PresetError(f"Failed to save preset archive: {e}") + + # Choose file extension based on detected format. + if archive_fmt == "tar.gz": + archive_filename = f"{pack_id}-{version}.tar.gz" + else: + archive_filename = f"{pack_id}-{version}.zip" + + archive_path = target_dir / archive_filename + try: + archive_path.write_bytes(archive_data) + except IOError as e: + raise PresetError(f"Failed to save preset archive: {e}") + return archive_path def clear_cache(self): """Clear all catalog cache files, including per-URL hashed caches.""" diff --git a/tests/test_extensions.py b/tests/test_extensions.py index c5be0ab4f3..4cf391605b 100644 --- a/tests/test_extensions.py +++ b/tests/test_extensions.py @@ -178,6 +178,47 @@ def test_custom_default(self): assert normalize_priority("invalid", default=1) == 1 +# ===== _detect_archive_format Tests ===== + +class TestDetectArchiveFormat: + """Test the _detect_archive_format helper.""" + + def _fmt(self, url, ct=""): + from specify_cli.extensions import _detect_archive_format + return _detect_archive_format(url, ct) + + def test_zip_url_extension(self): + assert self._fmt("https://example.com/ext-1.0.0.zip") == "zip" + + def test_tar_gz_url_extension(self): + assert self._fmt("https://example.com/ext-1.0.0.tar.gz") == "tar.gz" + + def test_tgz_url_extension(self): + assert self._fmt("https://example.com/ext-1.0.0.tgz") == "tar.gz" + + def test_zip_uppercase_url_extension(self): + assert self._fmt("https://example.com/ext.ZIP") == "zip" + + def test_tar_gz_with_query_string(self): + assert self._fmt("https://example.com/ext.tar.gz?token=abc") == "tar.gz" + + def test_zip_content_type_fallback(self): + assert self._fmt("https://example.com/download", "application/zip") == "zip" + + def test_gzip_content_type_fallback(self): + assert self._fmt("https://example.com/download", "application/gzip") == "tar.gz" + + def test_x_gzip_content_type_fallback(self): + assert self._fmt("https://example.com/download", "application/x-gzip") == "tar.gz" + + def test_unknown_returns_empty_string(self): + assert self._fmt("https://example.com/workflow.yml") == "" + + def test_url_extension_takes_precedence_over_content_type(self): + # URL says .zip — content-type claiming gzip should not override. + assert self._fmt("https://example.com/ext.zip", "application/gzip") == "zip" + + # ===== ExtensionManifest Tests ===== class TestExtensionManifest: @@ -1013,6 +1054,96 @@ def test_config_backup_on_remove(self, extension_dir, project_dir): assert backup_file.read_text() == "test: config" +# ===== install_from_zip Tarball Tests ===== + +class TestInstallFromTarball: + """Tests for install_from_zip accepting .tar.gz/.tgz archives.""" + + def _make_tarball(self, dest: Path, extension_dir: Path, nested: bool = False) -> None: + """Create a minimal .tar.gz archive from *extension_dir*.""" + import tarfile + with tarfile.open(dest, "w:gz") as tf: + for file_path in extension_dir.rglob("*"): + if file_path.is_file(): + arcname = file_path.relative_to(extension_dir) + if nested: + arcname = Path("test-ext-v1.0.0") / arcname + tf.add(file_path, arcname=str(arcname)) + + def test_install_from_tar_gz(self, extension_dir, project_dir, temp_dir): + """install_from_zip should accept a .tar.gz archive.""" + archive = temp_dir / "test-ext-1.0.0.tar.gz" + self._make_tarball(archive, extension_dir) + + manager = ExtensionManager(project_dir) + manifest = manager.install_from_zip(archive, "0.1.0") + assert manifest.id == "test-ext" + assert manager.registry.is_installed("test-ext") + + def test_install_from_tgz(self, extension_dir, project_dir, temp_dir): + """install_from_zip should accept a .tgz archive.""" + archive = temp_dir / "test-ext-1.0.0.tgz" + self._make_tarball(archive, extension_dir) + + manager = ExtensionManager(project_dir) + manifest = manager.install_from_zip(archive, "0.1.0") + assert manifest.id == "test-ext" + assert manager.registry.is_installed("test-ext") + + def test_install_from_tar_gz_nested(self, extension_dir, project_dir, temp_dir): + """install_from_zip should handle a single nested directory inside the tarball.""" + archive = temp_dir / "test-ext-nested.tar.gz" + self._make_tarball(archive, extension_dir, nested=True) + + manager = ExtensionManager(project_dir) + manifest = manager.install_from_zip(archive, "0.1.0") + assert manifest.id == "test-ext" + assert manager.registry.is_installed("test-ext") + + def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir): + """install_from_zip raises ValidationError when tarball has no extension.yml.""" + import tarfile + archive = temp_dir / "bad.tar.gz" + with tarfile.open(archive, "w:gz") as tf: + import io + data = b"no manifest here" + info = tarfile.TarInfo(name="readme.txt") + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + + manager = ExtensionManager(project_dir) + with pytest.raises(ValidationError, match="No extension.yml found"): + manager.install_from_zip(archive, "0.1.0") + + def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir): + """install_from_zip must reject tarballs with path traversal entries.""" + import tarfile, io + archive = temp_dir / "evil.tar.gz" + with tarfile.open(archive, "w:gz") as tf: + info = tarfile.TarInfo(name="../../evil.txt") + data = b"evil" + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + + manager = ExtensionManager(project_dir) + with pytest.raises(ValidationError, match="Unsafe path"): + manager.install_from_zip(archive, "0.1.0") + + def test_install_from_tar_gz_rejects_symlinks(self, project_dir, temp_dir): + """install_from_zip must reject tarballs containing symlinks.""" + import tarfile + archive = temp_dir / "symlink.tar.gz" + with tarfile.open(archive, "w:gz") as tf: + info = tarfile.TarInfo(name="link") + info.type = tarfile.SYMTYPE + info.linkname = "/etc/passwd" + tf.addfile(info) + + manager = ExtensionManager(project_dir) + with pytest.raises(ValidationError, match="Symlinks"): + manager.install_from_zip(archive, "0.1.0") + + # ===== CommandRegistrar Tests ===== class TestCommandRegistrar: diff --git a/tests/test_presets.py b/tests/test_presets.py index 4b167ed9be..6a1d7f9a43 100644 --- a/tests/test_presets.py +++ b/tests/test_presets.py @@ -649,6 +649,74 @@ def test_install_from_zip_no_manifest(self, project_dir, temp_dir): with pytest.raises(PresetValidationError, match="No preset.yml found"): manager.install_from_zip(zip_path, "0.1.5") + def _make_tarball(self, dest, pack_dir, nested=False): + import tarfile + with tarfile.open(dest, "w:gz") as tf: + for file_path in pack_dir.rglob("*"): + if file_path.is_file(): + arcname = file_path.relative_to(pack_dir) + if nested: + arcname = Path("test-pack-v1.0.0") / arcname + tf.add(file_path, arcname=str(arcname)) + + def test_install_from_tar_gz(self, project_dir, pack_dir, temp_dir): + """Test installing a preset from a .tar.gz archive.""" + archive = temp_dir / "test-pack-1.0.tar.gz" + self._make_tarball(archive, pack_dir) + + manager = PresetManager(project_dir) + manifest = manager.install_from_zip(archive, "0.1.5") + assert manifest.id == "test-pack" + assert manager.registry.is_installed("test-pack") + + def test_install_from_tgz(self, project_dir, pack_dir, temp_dir): + """Test installing a preset from a .tgz archive.""" + archive = temp_dir / "test-pack-1.0.tgz" + self._make_tarball(archive, pack_dir) + + manager = PresetManager(project_dir) + manifest = manager.install_from_zip(archive, "0.1.5") + assert manifest.id == "test-pack" + assert manager.registry.is_installed("test-pack") + + def test_install_from_tar_gz_nested(self, project_dir, pack_dir, temp_dir): + """Test installing a preset from a .tar.gz archive with a single nested directory.""" + archive = temp_dir / "test-pack-nested.tar.gz" + self._make_tarball(archive, pack_dir, nested=True) + + manager = PresetManager(project_dir) + manifest = manager.install_from_zip(archive, "0.1.5") + assert manifest.id == "test-pack" + assert manager.registry.is_installed("test-pack") + + def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir): + """Test installing a preset from a .tar.gz without preset.yml raises error.""" + import tarfile, io + archive = temp_dir / "bad.tar.gz" + with tarfile.open(archive, "w:gz") as tf: + data = b"no manifest here" + info = tarfile.TarInfo(name="readme.txt") + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + + manager = PresetManager(project_dir) + with pytest.raises(PresetValidationError, match="No preset.yml found"): + manager.install_from_zip(archive, "0.1.5") + + def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir): + """install_from_zip must reject tarballs with path traversal entries.""" + import tarfile, io + archive = temp_dir / "evil.tar.gz" + with tarfile.open(archive, "w:gz") as tf: + info = tarfile.TarInfo(name="../../evil.txt") + data = b"evil" + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + + manager = PresetManager(project_dir) + with pytest.raises(PresetValidationError, match="Unsafe path"): + manager.install_from_zip(archive, "0.1.5") + def test_remove(self, project_dir, pack_dir): """Test removing a preset.""" manager = PresetManager(project_dir) From b37f117cf9b07e9509148ed4f591e11dd58a1f6e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 18:06:49 +0000 Subject: [PATCH 03/16] Address code review: fix import style and rename local aliases Agent-Logs-Url: https://github.com/github/spec-kit/sessions/9fb9a8ea-0967-4baf-b95c-7101e423ff58 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 11 +++++------ tests/test_extensions.py | 5 +++-- tests/test_presets.py | 6 ++++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 0b18a1d25f..cbb189ac6f 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4321,14 +4321,14 @@ def extension_update( try: # 6. Validate extension ID from archive BEFORE modifying installation # Handle both root-level and nested extension.yml (GitHub auto-generated archives) - from .extensions import _detect_archive_format as _det_fmt_upd - import tarfile as _tarfile_upd - archive_fmt_upd = _det_fmt_upd(str(zip_path)) + from .extensions import _detect_archive_format as _ext_det_fmt + import tarfile as _tarfile + archive_fmt = _ext_det_fmt(str(zip_path)) import yaml manifest_data = None - if archive_fmt_upd == "tar.gz": - with _tarfile_upd.open(zip_path, "r:gz") as tf: + if archive_fmt == "tar.gz": + with _tarfile.open(zip_path, "r:gz") as tf: # First try root-level extension.yml try: m = tf.getmember("extension.yml") @@ -5009,7 +5009,6 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: from urllib.parse import urlparse from urllib.request import urlopen # noqa: S310 from .extensions import _detect_archive_format as _wf_det_fmt - import tarfile as _wf_tarfile parsed_src = urlparse(source) src_host = parsed_src.hostname or "" diff --git a/tests/test_extensions.py b/tests/test_extensions.py index 4cf391605b..6310140070 100644 --- a/tests/test_extensions.py +++ b/tests/test_extensions.py @@ -1103,9 +1103,9 @@ def test_install_from_tar_gz_nested(self, extension_dir, project_dir, temp_dir): def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir): """install_from_zip raises ValidationError when tarball has no extension.yml.""" import tarfile + import io archive = temp_dir / "bad.tar.gz" with tarfile.open(archive, "w:gz") as tf: - import io data = b"no manifest here" info = tarfile.TarInfo(name="readme.txt") info.size = len(data) @@ -1117,7 +1117,8 @@ def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir): def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir): """install_from_zip must reject tarballs with path traversal entries.""" - import tarfile, io + import tarfile + import io archive = temp_dir / "evil.tar.gz" with tarfile.open(archive, "w:gz") as tf: info = tarfile.TarInfo(name="../../evil.txt") diff --git a/tests/test_presets.py b/tests/test_presets.py index 6a1d7f9a43..f6f0a5a086 100644 --- a/tests/test_presets.py +++ b/tests/test_presets.py @@ -691,7 +691,8 @@ def test_install_from_tar_gz_nested(self, project_dir, pack_dir, temp_dir): def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir): """Test installing a preset from a .tar.gz without preset.yml raises error.""" - import tarfile, io + import tarfile + import io archive = temp_dir / "bad.tar.gz" with tarfile.open(archive, "w:gz") as tf: data = b"no manifest here" @@ -705,7 +706,8 @@ def test_install_from_tar_gz_no_manifest(self, project_dir, temp_dir): def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir): """install_from_zip must reject tarballs with path traversal entries.""" - import tarfile, io + import tarfile + import io archive = temp_dir / "evil.tar.gz" with tarfile.open(archive, "w:gz") as tf: info = tarfile.TarInfo(name="../../evil.txt") From b3a60f5fbad498a8418245bb78ef133278af86d4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 18:09:06 +0000 Subject: [PATCH 04/16] Improve tarball extraction security and cleanup logic Agent-Logs-Url: https://github.com/github/spec-kit/sessions/9fb9a8ea-0967-4baf-b95c-7101e423ff58 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 9 ++++----- src/specify_cli/extensions.py | 13 +++++++++++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index cbb189ac6f..d2bf7994fb 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -3643,6 +3643,7 @@ def extension_add( download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads" download_dir.mkdir(parents=True, exist_ok=True) archive_fmt = _detect_archive_format(from_url) + archive_path = None try: with urllib.request.urlopen(from_url, timeout=60) as response: @@ -3661,11 +3662,9 @@ def extension_add( console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}") raise typer.Exit(1) finally: - # Clean up downloaded archive - for _suffix in (".zip", ".tar.gz"): - _p = download_dir / f"{extension}-url-download{_suffix}" - if _p.exists(): - _p.unlink() + # Clean up the downloaded archive + if archive_path is not None and archive_path.exists(): + archive_path.unlink() else: # Try bundled extensions first (shipped with spec-kit) diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index 14e78a6701..f1bb891406 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -155,7 +155,11 @@ def _safe_extract_tarball( (devices, FIFOs, etc.) are rejected. On Python 3.12 and later the ``"data"`` extraction filter is applied - for an additional layer of OS-level protection. + for an additional layer of OS-level protection. On earlier versions + the explicit member list (containing only pre-validated regular files + and directories) is passed to ``extractall()`` — since all symlinks are + already rejected in the validation phase, no archive-introduced symlink + can be followed during extraction. Args: archive_path: Path to the ``.tar.gz``/``.tgz`` archive. @@ -169,6 +173,7 @@ def _safe_extract_tarball( with tarfile.open(archive_path, "r:gz") as tf: members = tf.getmembers() + safe_members = [] # Validate every member before extracting anything. for member in members: @@ -201,11 +206,15 @@ def _safe_extract_tarball( f"Non-regular file in archive: {member.name}" ) + safe_members.append(member) + # Extract — use the "data" filter on Python 3.12+ for extra hardening. + # On older versions pass only the pre-validated members so that no + # unvetted entry (added concurrently or via a race) slips through. if sys.version_info >= (3, 12): tf.extractall(dest_dir, filter="data") # type: ignore[call-arg] else: - tf.extractall(dest_dir) # noqa: S202 — validated manually above + tf.extractall(dest_dir, members=safe_members) # noqa: S202 — validated above @dataclass From d78ead18020343763555db723e4968eb6ad6d2d3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 18:12:43 +0000 Subject: [PATCH 05/16] Remove unnecessary import aliases, use consistent names Agent-Logs-Url: https://github.com/github/spec-kit/sessions/9fb9a8ea-0967-4baf-b95c-7101e423ff58 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 42 ++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index d2bf7994fb..34ceedf84f 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4320,14 +4320,14 @@ def extension_update( try: # 6. Validate extension ID from archive BEFORE modifying installation # Handle both root-level and nested extension.yml (GitHub auto-generated archives) - from .extensions import _detect_archive_format as _ext_det_fmt - import tarfile as _tarfile - archive_fmt = _ext_det_fmt(str(zip_path)) + from .extensions import _detect_archive_format + import tarfile + archive_fmt = _detect_archive_format(str(zip_path)) import yaml manifest_data = None if archive_fmt == "tar.gz": - with _tarfile.open(zip_path, "r:gz") as tf: + with tarfile.open(zip_path, "r:gz") as tf: # First try root-level extension.yml try: m = tf.getmember("extension.yml") @@ -4906,7 +4906,7 @@ def workflow_list(): console.print() -def _wf_extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes: +def _extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes: """Extract ``workflow.yml`` from a ZIP or ``.tar.gz`` archive. Searches the archive root and a single nested top-level subdirectory @@ -4922,11 +4922,10 @@ def _wf_extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes: Raises: ValueError: If no ``workflow.yml`` is found in the archive. """ - import tarfile as _tf - import zipfile as _zf + import tarfile if archive_fmt == "tar.gz": - with _tf.open(archive_path, "r:gz") as tf: + with tarfile.open(archive_path, "r:gz") as tf: # Try root-level first. try: return tf.extractfile(tf.getmember("workflow.yml")).read() @@ -4940,7 +4939,7 @@ def _wf_extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes: if len(candidates) == 1: return tf.extractfile(candidates[0]).read() else: - with _zf.ZipFile(archive_path, "r") as zf: + with zipfile.ZipFile(archive_path, "r") as zf: namelist = zf.namelist() if "workflow.yml" in namelist: return zf.read("workflow.yml") @@ -5007,7 +5006,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: from ipaddress import ip_address from urllib.parse import urlparse from urllib.request import urlopen # noqa: S310 - from .extensions import _detect_archive_format as _wf_det_fmt + from .extensions import _detect_archive_format parsed_src = urlparse(source) src_host = parsed_src.hostname or "" @@ -5040,10 +5039,10 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: raise typer.Exit(1) # Detect archive format from the final URL or Content-Type header. - archive_fmt = _wf_det_fmt(final_url) + archive_fmt = _detect_archive_format(final_url) if not archive_fmt: content_type = resp.headers.get("Content-Type", "") - archive_fmt = _wf_det_fmt(final_url, content_type) + archive_fmt = _detect_archive_format(final_url, content_type) raw_data = resp.read() except typer.Exit: @@ -5061,7 +5060,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: arc_tmp.write(raw_data) arc_tmp_path = Path(arc_tmp.name) try: - wf_yaml = _wf_extract_workflow_yml(arc_tmp_path, archive_fmt) + wf_yaml = _extract_workflow_yml(arc_tmp_path, archive_fmt) with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: tmp.write(wf_yaml) tmp_path = Path(tmp.name) @@ -5095,10 +5094,10 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: source.endswith(".tar.gz") or source.endswith(".tgz") or source.endswith(".zip") ): # Local archive file containing workflow.yml - from .extensions import _detect_archive_format as _wf_local_fmt - local_fmt = _wf_local_fmt(source) + from .extensions import _detect_archive_format + local_fmt = _detect_archive_format(source) try: - wf_yaml = _wf_extract_workflow_yml(source_path, local_fmt) + wf_yaml = _extract_workflow_yml(source_path, local_fmt) except (ValueError, Exception) as exc: console.print(f"[red]Error:[/red] Failed to extract workflow from archive: {exc}") raise typer.Exit(1) @@ -5174,8 +5173,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: try: from urllib.request import urlopen # noqa: S310 — URL comes from catalog - from .extensions import _detect_archive_format as _wf_cat_fmt - import tempfile as _wf_tmpmod + from .extensions import _detect_archive_format workflow_dir.mkdir(parents=True, exist_ok=True) with urlopen(workflow_url, timeout=30) as response: # noqa: S310 @@ -5200,21 +5198,21 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: raise typer.Exit(1) # Detect archive format from the final URL or Content-Type header. - cat_archive_fmt = _wf_cat_fmt(final_url) + cat_archive_fmt = _detect_archive_format(final_url) if not cat_archive_fmt: cat_ct = response.headers.get("Content-Type", "") - cat_archive_fmt = _wf_cat_fmt(final_url, cat_ct) + cat_archive_fmt = _detect_archive_format(final_url, cat_ct) raw_response = response.read() if cat_archive_fmt in ("tar.gz", "zip"): # Download URL points to an archive — extract workflow.yml from it. suffix = ".tar.gz" if cat_archive_fmt == "tar.gz" else ".zip" - with _wf_tmpmod.NamedTemporaryFile(suffix=suffix, delete=False) as arc_f: + with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_f: arc_f.write(raw_response) arc_tmp = Path(arc_f.name) try: - wf_yaml_bytes = _wf_extract_workflow_yml(arc_tmp, cat_archive_fmt) + wf_yaml_bytes = _extract_workflow_yml(arc_tmp, cat_archive_fmt) finally: arc_tmp.unlink(missing_ok=True) workflow_file.write_bytes(wf_yaml_bytes) From 0c6cc4502c4c5b715ee1d33e2d3539ec7fcf1aec Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 18:14:52 +0000 Subject: [PATCH 06/16] Fix type hint, add null checks for tf.extractfile() return value Agent-Logs-Url: https://github.com/github/spec-kit/sessions/9fb9a8ea-0967-4baf-b95c-7101e423ff58 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 16 ++++++++++++---- src/specify_cli/extensions.py | 2 +- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 34ceedf84f..99d313e123 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4331,12 +4331,16 @@ def extension_update( # First try root-level extension.yml try: m = tf.getmember("extension.yml") - manifest_data = yaml.safe_load(tf.extractfile(m).read()) or {} + f = tf.extractfile(m) + if f is not None: + manifest_data = yaml.safe_load(f.read()) or {} except KeyError: # Look for extension.yml in a single top-level subdirectory members = [m for m in tf.getmembers() if m.name.endswith("/extension.yml") and m.name.count("/") == 1] if len(members) == 1: - manifest_data = yaml.safe_load(tf.extractfile(members[0]).read()) or {} + f = tf.extractfile(members[0]) + if f is not None: + manifest_data = yaml.safe_load(f.read()) or {} else: with zipfile.ZipFile(zip_path, "r") as zf: namelist = zf.namelist() @@ -4928,7 +4932,9 @@ def _extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes: with tarfile.open(archive_path, "r:gz") as tf: # Try root-level first. try: - return tf.extractfile(tf.getmember("workflow.yml")).read() + f = tf.extractfile(tf.getmember("workflow.yml")) + if f is not None: + return f.read() except KeyError: pass # Look in a single top-level subdirectory. @@ -4937,7 +4943,9 @@ def _extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes: if m.name.endswith("/workflow.yml") and m.name.count("/") == 1 ] if len(candidates) == 1: - return tf.extractfile(candidates[0]).read() + f = tf.extractfile(candidates[0]) + if f is not None: + return f.read() else: with zipfile.ZipFile(archive_path, "r") as zf: namelist = zf.namelist() diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index f1bb891406..f28c02e9b7 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -146,7 +146,7 @@ def _detect_archive_format(url: str, content_type: str = "") -> str: def _safe_extract_tarball( archive_path: Path, dest_dir: Path, - error_class: type = Exception, + error_class: "type[Exception]" = Exception, ) -> None: """Safely extract a ``.tar.gz`` or ``.tgz`` archive into *dest_dir*. From c44cc245ed47a21043ee1d804134c6c6b4cb47fd Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Wed, 6 May 2026 07:03:57 -0500 Subject: [PATCH 07/16] Address Copilot PR review: reject unknown archive formats, fix case-sensitive check - Add explanatory comment to empty except KeyError block in _extract_workflow_yml - Use case-insensitive extension matching for local archive detection in workflow add - Reject unknown archive formats with clear error messages instead of silently defaulting to ZIP in preset add --from, extension add --from, download_extension(), and download_pack() --- src/specify_cli/__init__.py | 14 ++++++++++++-- src/specify_cli/extensions.py | 5 +++++ src/specify_cli/presets.py | 5 +++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 99d313e123..843c60db34 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -2643,6 +2643,11 @@ def preset_add( console.print(f"[red]Error:[/red] Failed to download: {e}") raise typer.Exit(1) + if not archive_fmt: + console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.") + console.print("Ensure the URL points to a .zip or .tar.gz file.") + raise typer.Exit(1) + suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip" archive_path = Path(tmpdir) / f"preset{suffix}" archive_path.write_bytes(archive_data) @@ -3652,6 +3657,11 @@ def extension_add( archive_fmt = _detect_archive_format(from_url, content_type) archive_data = response.read() + if not archive_fmt: + console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.") + console.print("Ensure the URL points to a .zip or .tar.gz file.") + raise typer.Exit(1) + suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip" archive_path = download_dir / f"{extension}-url-download{suffix}" archive_path.write_bytes(archive_data) @@ -4936,7 +4946,7 @@ def _extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes: if f is not None: return f.read() except KeyError: - pass + pass # Root-level workflow.yml not found; fall through to subdirectory search below. # Look in a single top-level subdirectory. candidates = [ m for m in tf.getmembers() @@ -5099,7 +5109,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: _validate_and_install_local(source_path, str(source_path)) return elif source_path.is_file() and ( - source.endswith(".tar.gz") or source.endswith(".tgz") or source.endswith(".zip") + source.lower().endswith(".tar.gz") or source.lower().endswith(".tgz") or source.lower().endswith(".zip") ): # Local archive file containing workflow.yml from .extensions import _detect_archive_format diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index f28c02e9b7..27e78925db 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -2151,6 +2151,11 @@ def download_extension(self, extension_id: str, target_dir: Optional[Path] = Non raise ExtensionError(f"Failed to save extension archive: {e}") # Choose file extension based on detected format. + if not archive_fmt: + raise ExtensionError( + f"Could not determine archive format for {download_url}. " + "Ensure the URL points to a .zip or .tar.gz file." + ) if archive_fmt == "tar.gz": archive_filename = f"{extension_id}-{version}.tar.gz" else: diff --git a/src/specify_cli/presets.py b/src/specify_cli/presets.py index 0d597423e1..f8cd3bbba9 100644 --- a/src/specify_cli/presets.py +++ b/src/specify_cli/presets.py @@ -2331,6 +2331,11 @@ def download_pack( raise PresetError(f"Failed to save preset archive: {e}") # Choose file extension based on detected format. + if not archive_fmt: + raise PresetError( + f"Could not determine archive format for {download_url}. " + "Ensure the URL points to a .zip or .tar.gz file." + ) if archive_fmt == "tar.gz": archive_filename = f"{pack_id}-{version}.tar.gz" else: From d00509e77077f1adb7f6ba808e458d161674b133 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 6 May 2026 19:22:38 +0000 Subject: [PATCH 08/16] Fix IOError messages, close tf.extractfile() handles, mention .tgz in error messages Agent-Logs-Url: https://github.com/github/spec-kit/sessions/891dfd6f-0f75-4522-bcd2-8a6fffb2d5f7 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 16 ++++++++++------ src/specify_cli/extensions.py | 4 ++-- src/specify_cli/presets.py | 4 ++-- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 843c60db34..b540ebd1a8 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -2645,7 +2645,7 @@ def preset_add( if not archive_fmt: console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.") - console.print("Ensure the URL points to a .zip or .tar.gz file.") + console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.") raise typer.Exit(1) suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip" @@ -3659,7 +3659,7 @@ def extension_add( if not archive_fmt: console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.") - console.print("Ensure the URL points to a .zip or .tar.gz file.") + console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.") raise typer.Exit(1) suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip" @@ -4343,14 +4343,16 @@ def extension_update( m = tf.getmember("extension.yml") f = tf.extractfile(m) if f is not None: - manifest_data = yaml.safe_load(f.read()) or {} + with f: + manifest_data = yaml.safe_load(f.read()) or {} except KeyError: # Look for extension.yml in a single top-level subdirectory members = [m for m in tf.getmembers() if m.name.endswith("/extension.yml") and m.name.count("/") == 1] if len(members) == 1: f = tf.extractfile(members[0]) if f is not None: - manifest_data = yaml.safe_load(f.read()) or {} + with f: + manifest_data = yaml.safe_load(f.read()) or {} else: with zipfile.ZipFile(zip_path, "r") as zf: namelist = zf.namelist() @@ -4944,7 +4946,8 @@ def _extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes: try: f = tf.extractfile(tf.getmember("workflow.yml")) if f is not None: - return f.read() + with f: + return f.read() except KeyError: pass # Root-level workflow.yml not found; fall through to subdirectory search below. # Look in a single top-level subdirectory. @@ -4955,7 +4958,8 @@ def _extract_workflow_yml(archive_path: Path, archive_fmt: str) -> bytes: if len(candidates) == 1: f = tf.extractfile(candidates[0]) if f is not None: - return f.read() + with f: + return f.read() else: with zipfile.ZipFile(archive_path, "r") as zf: namelist = zf.namelist() diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index 27e78925db..6946a1a09b 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -2148,13 +2148,13 @@ def download_extension(self, extension_id: str, target_dir: Optional[Path] = Non except urllib.error.URLError as e: raise ExtensionError(f"Failed to download extension from {download_url}: {e}") except IOError as e: - raise ExtensionError(f"Failed to save extension archive: {e}") + raise ExtensionError(f"Failed to read extension archive from {download_url}: {e}") # Choose file extension based on detected format. if not archive_fmt: raise ExtensionError( f"Could not determine archive format for {download_url}. " - "Ensure the URL points to a .zip or .tar.gz file." + "Ensure the URL points to a .zip or .tar.gz/.tgz file." ) if archive_fmt == "tar.gz": archive_filename = f"{extension_id}-{version}.tar.gz" diff --git a/src/specify_cli/presets.py b/src/specify_cli/presets.py index f8cd3bbba9..e270bb504c 100644 --- a/src/specify_cli/presets.py +++ b/src/specify_cli/presets.py @@ -2328,13 +2328,13 @@ def download_pack( f"Failed to download preset from {download_url}: {e}" ) except IOError as e: - raise PresetError(f"Failed to save preset archive: {e}") + raise PresetError(f"Failed to read preset archive from {download_url}: {e}") # Choose file extension based on detected format. if not archive_fmt: raise PresetError( f"Could not determine archive format for {download_url}. " - "Ensure the URL points to a .zip or .tar.gz file." + "Ensure the URL points to a .zip or .tar.gz/.tgz file." ) if archive_fmt == "tar.gz": archive_filename = f"{pack_id}-{version}.tar.gz" From 0fd0bf6b9f5de68ec9739988b1ac24ea981e3af3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 6 May 2026 19:38:57 +0000 Subject: [PATCH 09/16] Catch TarError/OSError in _safe_extract_tarball; rename zip_path to archive_path in extension_update Agent-Logs-Url: https://github.com/github/spec-kit/sessions/953d7f62-a75a-4690-90a9-98345cae824d Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 16 +++---- src/specify_cli/extensions.py | 83 +++++++++++++++++++---------------- 2 files changed, 52 insertions(+), 47 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index b540ebd1a8..54f6661a6f 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4326,18 +4326,18 @@ def extension_update( backup_hooks[hook_name] = ext_hooks # 5. Download new version - zip_path = catalog.download_extension(extension_id) + archive_path = catalog.download_extension(extension_id) try: # 6. Validate extension ID from archive BEFORE modifying installation # Handle both root-level and nested extension.yml (GitHub auto-generated archives) from .extensions import _detect_archive_format import tarfile - archive_fmt = _detect_archive_format(str(zip_path)) + archive_fmt = _detect_archive_format(str(archive_path)) import yaml manifest_data = None if archive_fmt == "tar.gz": - with tarfile.open(zip_path, "r:gz") as tf: + with tarfile.open(archive_path, "r:gz") as tf: # First try root-level extension.yml try: m = tf.getmember("extension.yml") @@ -4354,7 +4354,7 @@ def extension_update( with f: manifest_data = yaml.safe_load(f.read()) or {} else: - with zipfile.ZipFile(zip_path, "r") as zf: + with zipfile.ZipFile(archive_path, "r") as zf: namelist = zf.namelist() # First try root-level extension.yml @@ -4382,7 +4382,7 @@ def extension_update( manager.remove(extension_id, keep_config=True) # 8. Install new version - _ = manager.install_from_zip(zip_path, speckit_version) + _ = manager.install_from_zip(archive_path, speckit_version) # Restore user config files from backup after successful install. new_extension_dir = manager.extensions_dir / extension_id @@ -4428,9 +4428,9 @@ def extension_update( hook["enabled"] = False hook_executor.save_project_config(config) finally: - # Clean up downloaded ZIP - if zip_path.exists(): - zip_path.unlink() + # Clean up downloaded archive + if archive_path.exists(): + archive_path.unlink() # 10. Clean up backup on success if backup_base.exists(): diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index 6946a1a09b..3b25bbfbe5 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -171,50 +171,55 @@ def _safe_extract_tarball( """ dest_resolved = dest_dir.resolve() - with tarfile.open(archive_path, "r:gz") as tf: - members = tf.getmembers() - safe_members = [] - - # Validate every member before extracting anything. - for member in members: - # Reject absolute paths and any path component that is "..". - if os.path.isabs(member.name) or any( - part == ".." for part in member.name.replace("\\", "/").split("/") - ): - raise error_class( - f"Unsafe path in tar archive: {member.name} (potential path traversal)" - ) + try: + with tarfile.open(archive_path, "r:gz") as tf: + members = tf.getmembers() + safe_members = [] + + # Validate every member before extracting anything. + for member in members: + # Reject absolute paths and any path component that is "..". + if os.path.isabs(member.name) or any( + part == ".." for part in member.name.replace("\\", "/").split("/") + ): + raise error_class( + f"Unsafe path in tar archive: {member.name} (potential path traversal)" + ) - # Confirm the resolved path stays inside dest_dir. - member_path = (dest_dir / member.name).resolve() - try: - member_path.relative_to(dest_resolved) - except ValueError: - raise error_class( - f"Unsafe path in tar archive: {member.name} (potential path traversal)" - ) + # Confirm the resolved path stays inside dest_dir. + member_path = (dest_dir / member.name).resolve() + try: + member_path.relative_to(dest_resolved) + except ValueError: + raise error_class( + f"Unsafe path in tar archive: {member.name} (potential path traversal)" + ) - # Reject symlinks and hard links. - if member.issym() or member.islnk(): - raise error_class( - f"Symlinks are not allowed in archive: {member.name}" - ) + # Reject symlinks and hard links. + if member.issym() or member.islnk(): + raise error_class( + f"Symlinks are not allowed in archive: {member.name}" + ) - # Only allow regular files and directories. - if not (member.isreg() or member.isdir()): - raise error_class( - f"Non-regular file in archive: {member.name}" - ) + # Only allow regular files and directories. + if not (member.isreg() or member.isdir()): + raise error_class( + f"Non-regular file in archive: {member.name}" + ) - safe_members.append(member) + safe_members.append(member) - # Extract — use the "data" filter on Python 3.12+ for extra hardening. - # On older versions pass only the pre-validated members so that no - # unvetted entry (added concurrently or via a race) slips through. - if sys.version_info >= (3, 12): - tf.extractall(dest_dir, filter="data") # type: ignore[call-arg] - else: - tf.extractall(dest_dir, members=safe_members) # noqa: S202 — validated above + # Extract — use the "data" filter on Python 3.12+ for extra hardening. + # On older versions pass only the pre-validated members so that no + # unvetted entry (added concurrently or via a race) slips through. + if sys.version_info >= (3, 12): + tf.extractall(dest_dir, filter="data") # type: ignore[call-arg] + else: + tf.extractall(dest_dir, members=safe_members) # noqa: S202 — validated above + except error_class: + raise + except (tarfile.TarError, OSError) as e: + raise error_class(f"Failed to read archive {archive_path}: {e}") from e @dataclass From cb87a410f800d2385b226ce4d72b426f43981801 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 6 May 2026 19:51:00 +0000 Subject: [PATCH 10/16] Fix path traversal risk in extension URL download filename; fix redundant except clause Agent-Logs-Url: https://github.com/github/spec-kit/sessions/0c7ae935-443c-4e90-ba92-7c3234a46673 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 54f6661a6f..8d1e972153 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -3663,7 +3663,8 @@ def extension_add( raise typer.Exit(1) suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip" - archive_path = download_dir / f"{extension}-url-download{suffix}" + safe_name = Path(extension).name or "extension" + archive_path = download_dir / f"{safe_name}-url-download{suffix}" archive_path.write_bytes(archive_data) # Install from downloaded archive @@ -5120,7 +5121,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: local_fmt = _detect_archive_format(source) try: wf_yaml = _extract_workflow_yml(source_path, local_fmt) - except (ValueError, Exception) as exc: + except Exception as exc: console.print(f"[red]Error:[/red] Failed to extract workflow from archive: {exc}") raise typer.Exit(1) import tempfile From e0495ebc38d1303ec906bab1bad34140ff56ddbc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 6 May 2026 21:27:40 +0000 Subject: [PATCH 11/16] Fix arc_tmp_path UnboundLocalError in workflow install; add preset symlink rejection test Agent-Logs-Url: https://github.com/github/spec-kit/sessions/0469bac5-296a-46b6-b84e-eb33b0dc0fce Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 17 +++++++++++------ tests/test_presets.py | 14 ++++++++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 8d1e972153..1da35136cc 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5079,16 +5079,18 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: if archive_fmt in ("tar.gz", "zip"): # Extract workflow.yml from the archive. suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip" + arc_tmp_path = None with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_tmp: - arc_tmp.write(raw_data) arc_tmp_path = Path(arc_tmp.name) + arc_tmp.write(raw_data) try: wf_yaml = _extract_workflow_yml(arc_tmp_path, archive_fmt) with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: - tmp.write(wf_yaml) tmp_path = Path(tmp.name) + tmp.write(wf_yaml) finally: - arc_tmp_path.unlink(missing_ok=True) + if arc_tmp_path is not None: + arc_tmp_path.unlink(missing_ok=True) else: # Treat as a plain YAML file (existing behaviour). with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: @@ -5125,9 +5127,10 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: console.print(f"[red]Error:[/red] Failed to extract workflow from archive: {exc}") raise typer.Exit(1) import tempfile + tmp_local = None with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp: - tmp.write(wf_yaml) tmp_local = Path(tmp.name) + tmp.write(wf_yaml) try: _validate_and_install_local(tmp_local, str(source_path)) finally: @@ -5231,13 +5234,15 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: if cat_archive_fmt in ("tar.gz", "zip"): # Download URL points to an archive — extract workflow.yml from it. suffix = ".tar.gz" if cat_archive_fmt == "tar.gz" else ".zip" + arc_tmp = None with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as arc_f: - arc_f.write(raw_response) arc_tmp = Path(arc_f.name) + arc_f.write(raw_response) try: wf_yaml_bytes = _extract_workflow_yml(arc_tmp, cat_archive_fmt) finally: - arc_tmp.unlink(missing_ok=True) + if arc_tmp is not None: + arc_tmp.unlink(missing_ok=True) workflow_file.write_bytes(wf_yaml_bytes) else: workflow_file.write_bytes(raw_response) diff --git a/tests/test_presets.py b/tests/test_presets.py index f6f0a5a086..133600d994 100644 --- a/tests/test_presets.py +++ b/tests/test_presets.py @@ -719,6 +719,20 @@ def test_install_from_tar_gz_rejects_path_traversal(self, project_dir, temp_dir) with pytest.raises(PresetValidationError, match="Unsafe path"): manager.install_from_zip(archive, "0.1.5") + def test_install_from_tar_gz_rejects_symlinks(self, project_dir, temp_dir): + """install_from_zip must reject tarballs containing symlinks.""" + import tarfile + archive = temp_dir / "symlink.tar.gz" + with tarfile.open(archive, "w:gz") as tf: + info = tarfile.TarInfo(name="link") + info.type = tarfile.SYMTYPE + info.linkname = "/etc/passwd" + tf.addfile(info) + + manager = PresetManager(project_dir) + with pytest.raises(PresetValidationError, match="Symlinks"): + manager.install_from_zip(archive, "0.1.5") + def test_remove(self, project_dir, pack_dir): """Test removing a preset.""" manager = PresetManager(project_dir) From 0a02369ebe2177c28a54ec616d7b5f9f43383aa9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 6 May 2026 21:50:25 +0000 Subject: [PATCH 12/16] Make detect_archive_format/safe_extract_tarball public; add workflow add archive CLI tests Agent-Logs-Url: https://github.com/github/spec-kit/sessions/845e41d1-75e3-49fb-a580-a7fb805dd716 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 28 ++-- src/specify_cli/extensions.py | 12 +- src/specify_cli/presets.py | 10 +- tests/test_extensions.py | 8 +- tests/test_workflows.py | 238 ++++++++++++++++++++++++++++++++++ 5 files changed, 267 insertions(+), 29 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 1da35136cc..ce29f48ae6 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -2629,7 +2629,7 @@ def preset_add( import urllib.request import urllib.error import tempfile - from .extensions import _detect_archive_format as _det_fmt + from .extensions import detect_archive_format as _det_fmt with tempfile.TemporaryDirectory() as tmpdir: archive_fmt = _det_fmt(from_url) @@ -3628,7 +3628,7 @@ def extension_add( import urllib.request import urllib.error from urllib.parse import urlparse - from .extensions import _detect_archive_format + from .extensions import detect_archive_format # Validate URL parsed = urlparse(from_url) @@ -3647,14 +3647,14 @@ def extension_add( # Download archive to temp location; detect format from URL or Content-Type. download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads" download_dir.mkdir(parents=True, exist_ok=True) - archive_fmt = _detect_archive_format(from_url) + archive_fmt = detect_archive_format(from_url) archive_path = None try: with urllib.request.urlopen(from_url, timeout=60) as response: if not archive_fmt: content_type = response.headers.get("Content-Type", "") - archive_fmt = _detect_archive_format(from_url, content_type) + archive_fmt = detect_archive_format(from_url, content_type) archive_data = response.read() if not archive_fmt: @@ -4331,9 +4331,9 @@ def extension_update( try: # 6. Validate extension ID from archive BEFORE modifying installation # Handle both root-level and nested extension.yml (GitHub auto-generated archives) - from .extensions import _detect_archive_format + from .extensions import detect_archive_format import tarfile - archive_fmt = _detect_archive_format(str(archive_path)) + archive_fmt = detect_archive_format(str(archive_path)) import yaml manifest_data = None @@ -5029,7 +5029,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: from ipaddress import ip_address from urllib.parse import urlparse from urllib.request import urlopen # noqa: S310 - from .extensions import _detect_archive_format + from .extensions import detect_archive_format parsed_src = urlparse(source) src_host = parsed_src.hostname or "" @@ -5062,10 +5062,10 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: raise typer.Exit(1) # Detect archive format from the final URL or Content-Type header. - archive_fmt = _detect_archive_format(final_url) + archive_fmt = detect_archive_format(final_url) if not archive_fmt: content_type = resp.headers.get("Content-Type", "") - archive_fmt = _detect_archive_format(final_url, content_type) + archive_fmt = detect_archive_format(final_url, content_type) raw_data = resp.read() except typer.Exit: @@ -5119,8 +5119,8 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: source.lower().endswith(".tar.gz") or source.lower().endswith(".tgz") or source.lower().endswith(".zip") ): # Local archive file containing workflow.yml - from .extensions import _detect_archive_format - local_fmt = _detect_archive_format(source) + from .extensions import detect_archive_format + local_fmt = detect_archive_format(source) try: wf_yaml = _extract_workflow_yml(source_path, local_fmt) except Exception as exc: @@ -5199,7 +5199,7 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: try: from urllib.request import urlopen # noqa: S310 — URL comes from catalog - from .extensions import _detect_archive_format + from .extensions import detect_archive_format workflow_dir.mkdir(parents=True, exist_ok=True) with urlopen(workflow_url, timeout=30) as response: # noqa: S310 @@ -5224,10 +5224,10 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None: raise typer.Exit(1) # Detect archive format from the final URL or Content-Type header. - cat_archive_fmt = _detect_archive_format(final_url) + cat_archive_fmt = detect_archive_format(final_url) if not cat_archive_fmt: cat_ct = response.headers.get("Content-Type", "") - cat_archive_fmt = _detect_archive_format(final_url, cat_ct) + cat_archive_fmt = detect_archive_format(final_url, cat_ct) raw_response = response.read() diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index 3b25bbfbe5..f8c4c3925a 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -108,7 +108,7 @@ def normalize_priority(value: Any, default: int = 10) -> int: return priority if priority >= 1 else default -def _detect_archive_format(url: str, content_type: str = "") -> str: +def detect_archive_format(url: str, content_type: str = "") -> str: """Detect archive format from URL path extension or Content-Type header. Args: @@ -143,7 +143,7 @@ def _detect_archive_format(url: str, content_type: str = "") -> str: return "" -def _safe_extract_tarball( +def safe_extract_tarball( archive_path: Path, dest_dir: Path, error_class: "type[Exception]" = Exception, @@ -1340,11 +1340,11 @@ def install_from_zip( with tempfile.TemporaryDirectory() as tmpdir: temp_path = Path(tmpdir) - archive_fmt = _detect_archive_format(str(zip_path)) + archive_fmt = detect_archive_format(str(zip_path)) if archive_fmt == "tar.gz": # Extract tarball safely (prevent tar slip attack) - _safe_extract_tarball(zip_path, temp_path, ValidationError) + safe_extract_tarball(zip_path, temp_path, ValidationError) else: # Extract ZIP safely (prevent Zip Slip attack) with zipfile.ZipFile(zip_path, 'r') as zf: @@ -2140,14 +2140,14 @@ def download_extension(self, extension_id: str, target_dir: Optional[Path] = Non version = ext_info.get("version", "unknown") # Detect archive format from URL; resolve via Content-Type when needed. - archive_fmt = _detect_archive_format(download_url) + archive_fmt = detect_archive_format(download_url) # Download the archive try: with self._open_url(download_url, timeout=60) as response: if not archive_fmt: content_type = response.headers.get("Content-Type", "") - archive_fmt = _detect_archive_format(download_url, content_type) + archive_fmt = detect_archive_format(download_url, content_type) archive_data = response.read() except urllib.error.URLError as e: diff --git a/src/specify_cli/presets.py b/src/specify_cli/presets.py index e270bb504c..16b63862cc 100644 --- a/src/specify_cli/presets.py +++ b/src/specify_cli/presets.py @@ -27,7 +27,7 @@ from packaging import version as pkg_version from packaging.specifiers import SpecifierSet, InvalidSpecifier -from .extensions import ExtensionRegistry, normalize_priority, _detect_archive_format, _safe_extract_tarball +from .extensions import ExtensionRegistry, normalize_priority, detect_archive_format, safe_extract_tarball def _substitute_core_template( @@ -1626,11 +1626,11 @@ def install_from_zip( with tempfile.TemporaryDirectory() as tmpdir: temp_path = Path(tmpdir) - archive_fmt = _detect_archive_format(str(zip_path)) + archive_fmt = detect_archive_format(str(zip_path)) if archive_fmt == "tar.gz": # Extract tarball safely (prevent tar slip attack) - _safe_extract_tarball(zip_path, temp_path, PresetValidationError) + safe_extract_tarball(zip_path, temp_path, PresetValidationError) else: with zipfile.ZipFile(zip_path, 'r') as zf: temp_path_resolved = temp_path.resolve() @@ -2314,13 +2314,13 @@ def download_pack( version = pack_info.get("version", "unknown") # Detect archive format from URL; resolve via Content-Type when needed. - archive_fmt = _detect_archive_format(download_url) + archive_fmt = detect_archive_format(download_url) try: with self._open_url(download_url, timeout=60) as response: if not archive_fmt: content_type = response.headers.get("Content-Type", "") - archive_fmt = _detect_archive_format(download_url, content_type) + archive_fmt = detect_archive_format(download_url, content_type) archive_data = response.read() except urllib.error.URLError as e: diff --git a/tests/test_extensions.py b/tests/test_extensions.py index 6310140070..5cb79472cc 100644 --- a/tests/test_extensions.py +++ b/tests/test_extensions.py @@ -178,14 +178,14 @@ def test_custom_default(self): assert normalize_priority("invalid", default=1) == 1 -# ===== _detect_archive_format Tests ===== +# ===== detect_archive_format Tests ===== class TestDetectArchiveFormat: - """Test the _detect_archive_format helper.""" + """Test the detect_archive_format helper.""" def _fmt(self, url, ct=""): - from specify_cli.extensions import _detect_archive_format - return _detect_archive_format(url, ct) + from specify_cli.extensions import detect_archive_format + return detect_archive_format(url, ct) def test_zip_url_extension(self): assert self._fmt("https://example.com/ext-1.0.0.zip") == "zip" diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 4c042fc7d5..2b6c87f04a 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1843,3 +1843,241 @@ def test_switch_workflow(self, project_dir): assert state.status == RunStatus.COMPLETED assert "do-plan" in state.step_results assert "do-specify" not in state.step_results + + +# ===== workflow add archive CLI tests ===== + +MINIMAL_WORKFLOW_YAML = """\ +schema_version: "1.0" +workflow: + id: "arc-workflow" + name: "Archive Workflow" + version: "1.0.0" + description: "Installed from archive" +steps: + - id: step-one + type: shell + run: "echo hello" +""" + + +class TestWorkflowAddArchive: + """CLI-level tests for `workflow add` with local archive files.""" + + @pytest.fixture + def project_dir(self, tmp_path): + """Create a minimal spec-kit project.""" + specify = tmp_path / ".specify" + specify.mkdir() + (specify / "workflows").mkdir() + return tmp_path + + def _runner_and_app(self): + from typer.testing import CliRunner + from specify_cli import app + return CliRunner(), app + + # -- Local ZIP archive -------------------------------------------------- + + def test_workflow_add_local_zip_flat(self, project_dir): + """workflow add installs from a local ZIP with workflow.yml at root.""" + import zipfile + runner, app = self._runner_and_app() + + archive = project_dir / "workflow.zip" + with zipfile.ZipFile(archive, "w") as zf: + zf.writestr("workflow.yml", MINIMAL_WORKFLOW_YAML) + + with __import__("unittest.mock", fromlist=["patch"]).patch.object( + __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir + ): + result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False) + + assert result.exit_code == 0, result.output + assert "arc-workflow" in result.output + installed = project_dir / ".specify" / "workflows" / "arc-workflow" / "workflow.yml" + assert installed.exists() + + def test_workflow_add_local_zip_nested(self, project_dir): + """workflow add installs from a local ZIP with workflow.yml in a subdirectory.""" + import zipfile + runner, app = self._runner_and_app() + + archive = project_dir / "workflow.zip" + with zipfile.ZipFile(archive, "w") as zf: + zf.writestr("repo-1.0/workflow.yml", MINIMAL_WORKFLOW_YAML) + + with __import__("unittest.mock", fromlist=["patch"]).patch.object( + __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir + ): + result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False) + + assert result.exit_code == 0, result.output + assert "arc-workflow" in result.output + + def test_workflow_add_local_zip_missing_workflow_yml(self, project_dir): + """workflow add exits with an error when the ZIP has no workflow.yml.""" + import zipfile + runner, app = self._runner_and_app() + + archive = project_dir / "empty.zip" + with zipfile.ZipFile(archive, "w") as zf: + zf.writestr("README.md", "nothing here") + + with __import__("unittest.mock", fromlist=["patch"]).patch.object( + __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir + ): + result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True) + + assert result.exit_code != 0 + assert "extract" in result.output.lower() or "workflow" in result.output.lower() + + # -- Local tar.gz archive ----------------------------------------------- + + def test_workflow_add_local_tar_gz_flat(self, project_dir): + """workflow add installs from a local .tar.gz with workflow.yml at root.""" + import tarfile, io + runner, app = self._runner_and_app() + + archive = project_dir / "workflow.tar.gz" + with tarfile.open(archive, "w:gz") as tf: + data = MINIMAL_WORKFLOW_YAML.encode() + info = tarfile.TarInfo(name="workflow.yml") + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + + with __import__("unittest.mock", fromlist=["patch"]).patch.object( + __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir + ): + result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False) + + assert result.exit_code == 0, result.output + assert "arc-workflow" in result.output + installed = project_dir / ".specify" / "workflows" / "arc-workflow" / "workflow.yml" + assert installed.exists() + + def test_workflow_add_local_tar_gz_nested(self, project_dir): + """workflow add installs from a local .tar.gz with workflow.yml in a subdirectory.""" + import tarfile, io + runner, app = self._runner_and_app() + + archive = project_dir / "workflow.tar.gz" + with tarfile.open(archive, "w:gz") as tf: + data = MINIMAL_WORKFLOW_YAML.encode() + info = tarfile.TarInfo(name="repo-1.0/workflow.yml") + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + + with __import__("unittest.mock", fromlist=["patch"]).patch.object( + __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir + ): + result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False) + + assert result.exit_code == 0, result.output + assert "arc-workflow" in result.output + + def test_workflow_add_local_tgz_flat(self, project_dir): + """workflow add recognises the .tgz extension as a gzipped tarball.""" + import tarfile, io + runner, app = self._runner_and_app() + + archive = project_dir / "workflow.tgz" + with tarfile.open(archive, "w:gz") as tf: + data = MINIMAL_WORKFLOW_YAML.encode() + info = tarfile.TarInfo(name="workflow.yml") + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + + with __import__("unittest.mock", fromlist=["patch"]).patch.object( + __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir + ): + result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False) + + assert result.exit_code == 0, result.output + assert "arc-workflow" in result.output + + def test_workflow_add_local_tar_gz_missing_workflow_yml(self, project_dir): + """workflow add exits with an error when the .tar.gz has no workflow.yml.""" + import tarfile, io + runner, app = self._runner_and_app() + + archive = project_dir / "empty.tar.gz" + with tarfile.open(archive, "w:gz") as tf: + data = b"nothing" + info = tarfile.TarInfo(name="README.md") + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + + with __import__("unittest.mock", fromlist=["patch"]).patch.object( + __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir + ): + result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True) + + assert result.exit_code != 0 + assert "extract" in result.output.lower() or "workflow" in result.output.lower() + + # -- URL archive download ----------------------------------------------- + + def test_workflow_add_url_tar_gz(self, project_dir): + """workflow add downloads a .tar.gz from a URL and installs the workflow.""" + import tarfile, io + from unittest.mock import patch, MagicMock + runner, app = self._runner_and_app() + + # Build an in-memory tar.gz archive containing workflow.yml. + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tf: + data = MINIMAL_WORKFLOW_YAML.encode() + info = tarfile.TarInfo(name="workflow.yml") + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + raw_bytes = buf.getvalue() + + mock_resp = MagicMock() + mock_resp.geturl.return_value = "https://example.com/workflow.tar.gz" + mock_resp.headers.get.return_value = "application/gzip" + mock_resp.read.return_value = raw_bytes + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + + with patch("urllib.request.urlopen", return_value=mock_resp), \ + patch.object( + __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir + ): + result = runner.invoke( + app, ["workflow", "add", "https://example.com/workflow.tar.gz"], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.output + assert "arc-workflow" in result.output + + def test_workflow_add_url_zip(self, project_dir): + """workflow add downloads a .zip from a URL and installs the workflow.""" + import zipfile, io + from unittest.mock import patch, MagicMock + runner, app = self._runner_and_app() + + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + zf.writestr("workflow.yml", MINIMAL_WORKFLOW_YAML) + raw_bytes = buf.getvalue() + + mock_resp = MagicMock() + mock_resp.geturl.return_value = "https://example.com/workflow.zip" + mock_resp.headers.get.return_value = "application/zip" + mock_resp.read.return_value = raw_bytes + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + + with patch("urllib.request.urlopen", return_value=mock_resp), \ + patch.object( + __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir + ): + result = runner.invoke( + app, ["workflow", "add", "https://example.com/workflow.zip"], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.output + assert "arc-workflow" in result.output From 1786d27c062dd60b9f8c54175e4c736b7a91eaf1 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Thu, 7 May 2026 08:17:45 -0500 Subject: [PATCH 13/16] Address PR review: fix extractfile fallback and add OSError handling - Fix tar.gz extractfile() None fallback in extension_update: nested-directory search now runs whenever manifest_data is still None, not only on KeyError - Add OSError handling around write_bytes in preset --from URL path - Add OSError handling around write_bytes in extension --from URL path --- src/specify_cli/__init__.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index ce29f48ae6..572e6c2548 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -2650,8 +2650,12 @@ def preset_add( suffix = ".tar.gz" if archive_fmt == "tar.gz" else ".zip" archive_path = Path(tmpdir) / f"preset{suffix}" - archive_path.write_bytes(archive_data) - manifest = manager.install_from_zip(archive_path, speckit_version, priority) + try: + archive_path.write_bytes(archive_data) + manifest = manager.install_from_zip(archive_path, speckit_version, priority) + except OSError as e: + console.print(f"[red]Error:[/red] Failed to save or install archive: {e}") + raise typer.Exit(1) console.print(f"[green]✓[/green] Preset '{manifest.name}' v{manifest.version} installed (priority {priority})") @@ -3672,6 +3676,9 @@ def extension_add( except urllib.error.URLError as e: console.print(f"[red]Error:[/red] Failed to download from {from_url}: {e}") raise typer.Exit(1) + except OSError as e: + console.print(f"[red]Error:[/red] Failed to save or install archive: {e}") + raise typer.Exit(1) finally: # Clean up the downloaded archive if archive_path is not None and archive_path.exists(): @@ -4347,7 +4354,10 @@ def extension_update( with f: manifest_data = yaml.safe_load(f.read()) or {} except KeyError: - # Look for extension.yml in a single top-level subdirectory + pass + # Fall back to nested-directory search if root-level + # was missing (KeyError) or not a regular file (None). + if manifest_data is None: members = [m for m in tf.getmembers() if m.name.endswith("/extension.yml") and m.name.count("/") == 1] if len(members) == 1: f = tf.extractfile(members[0]) From bd04937927bafa0f932ee67c1678df2e07430dda Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Thu, 7 May 2026 11:39:51 -0500 Subject: [PATCH 14/16] Potential fix for pull request finding 'Empty except' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> --- src/specify_cli/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 572e6c2548..cac17d72dc 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4354,7 +4354,8 @@ def extension_update( with f: manifest_data = yaml.safe_load(f.read()) or {} except KeyError: - pass + # extension.yml not present at archive root; use nested fallback below. + manifest_data = None # Fall back to nested-directory search if root-level # was missing (KeyError) or not a regular file (None). if manifest_data is None: From 05798a9e700ded7ff4b044a94ba2f586eb708853 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 17:57:01 +0000 Subject: [PATCH 15/16] Skip PAX/GNU metadata members in safe_extract_tarball; use standard mock imports in workflow tests Agent-Logs-Url: https://github.com/github/spec-kit/sessions/c1fcc1ff-8766-4d97-90a5-368447980acf Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/extensions.py | 15 +++++++++++- tests/test_workflows.py | 43 +++++++++++++---------------------- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index f8c4c3925a..7d288aabf6 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -170,6 +170,15 @@ def safe_extract_tarball( error_class: If any member is unsafe or the archive cannot be read. """ dest_resolved = dest_dir.resolve() + # Tar metadata member types to skip during validation — they carry no + # extractable payload and are generated automatically by many common + # archiving tools (e.g. PAX headers, GNU longname/longlink entries). + _TAR_METADATA_TYPES = ( + tarfile.XHDTYPE, # PAX extended header + tarfile.XGLTYPE, # PAX global extended header + tarfile.SOLARIS_XHDTYPE, # Solaris PAX extended header + *tarfile.GNU_TYPES, # GNU longname / longlink / sparse + ) try: with tarfile.open(archive_path, "r:gz") as tf: @@ -195,13 +204,17 @@ def safe_extract_tarball( f"Unsafe path in tar archive: {member.name} (potential path traversal)" ) + # Skip tar metadata members — they carry no extractable payload. + if member.type in _TAR_METADATA_TYPES: + continue + # Reject symlinks and hard links. if member.issym() or member.islnk(): raise error_class( f"Symlinks are not allowed in archive: {member.name}" ) - # Only allow regular files and directories. + # Reject devices, FIFOs and other special file types. if not (member.isreg() or member.isdir()): raise error_class( f"Non-regular file in archive: {member.name}" diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 2b6c87f04a..ce382d88ac 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1882,15 +1882,14 @@ def _runner_and_app(self): def test_workflow_add_local_zip_flat(self, project_dir): """workflow add installs from a local ZIP with workflow.yml at root.""" import zipfile + from unittest.mock import patch runner, app = self._runner_and_app() archive = project_dir / "workflow.zip" with zipfile.ZipFile(archive, "w") as zf: zf.writestr("workflow.yml", MINIMAL_WORKFLOW_YAML) - with __import__("unittest.mock", fromlist=["patch"]).patch.object( - __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir - ): + with patch.object(Path, "cwd", return_value=project_dir): result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False) assert result.exit_code == 0, result.output @@ -1901,15 +1900,14 @@ def test_workflow_add_local_zip_flat(self, project_dir): def test_workflow_add_local_zip_nested(self, project_dir): """workflow add installs from a local ZIP with workflow.yml in a subdirectory.""" import zipfile + from unittest.mock import patch runner, app = self._runner_and_app() archive = project_dir / "workflow.zip" with zipfile.ZipFile(archive, "w") as zf: zf.writestr("repo-1.0/workflow.yml", MINIMAL_WORKFLOW_YAML) - with __import__("unittest.mock", fromlist=["patch"]).patch.object( - __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir - ): + with patch.object(Path, "cwd", return_value=project_dir): result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False) assert result.exit_code == 0, result.output @@ -1918,15 +1916,14 @@ def test_workflow_add_local_zip_nested(self, project_dir): def test_workflow_add_local_zip_missing_workflow_yml(self, project_dir): """workflow add exits with an error when the ZIP has no workflow.yml.""" import zipfile + from unittest.mock import patch runner, app = self._runner_and_app() archive = project_dir / "empty.zip" with zipfile.ZipFile(archive, "w") as zf: zf.writestr("README.md", "nothing here") - with __import__("unittest.mock", fromlist=["patch"]).patch.object( - __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir - ): + with patch.object(Path, "cwd", return_value=project_dir): result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True) assert result.exit_code != 0 @@ -1937,6 +1934,7 @@ def test_workflow_add_local_zip_missing_workflow_yml(self, project_dir): def test_workflow_add_local_tar_gz_flat(self, project_dir): """workflow add installs from a local .tar.gz with workflow.yml at root.""" import tarfile, io + from unittest.mock import patch runner, app = self._runner_and_app() archive = project_dir / "workflow.tar.gz" @@ -1946,9 +1944,7 @@ def test_workflow_add_local_tar_gz_flat(self, project_dir): info.size = len(data) tf.addfile(info, io.BytesIO(data)) - with __import__("unittest.mock", fromlist=["patch"]).patch.object( - __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir - ): + with patch.object(Path, "cwd", return_value=project_dir): result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False) assert result.exit_code == 0, result.output @@ -1959,6 +1955,7 @@ def test_workflow_add_local_tar_gz_flat(self, project_dir): def test_workflow_add_local_tar_gz_nested(self, project_dir): """workflow add installs from a local .tar.gz with workflow.yml in a subdirectory.""" import tarfile, io + from unittest.mock import patch runner, app = self._runner_and_app() archive = project_dir / "workflow.tar.gz" @@ -1968,9 +1965,7 @@ def test_workflow_add_local_tar_gz_nested(self, project_dir): info.size = len(data) tf.addfile(info, io.BytesIO(data)) - with __import__("unittest.mock", fromlist=["patch"]).patch.object( - __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir - ): + with patch.object(Path, "cwd", return_value=project_dir): result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False) assert result.exit_code == 0, result.output @@ -1979,6 +1974,7 @@ def test_workflow_add_local_tar_gz_nested(self, project_dir): def test_workflow_add_local_tgz_flat(self, project_dir): """workflow add recognises the .tgz extension as a gzipped tarball.""" import tarfile, io + from unittest.mock import patch runner, app = self._runner_and_app() archive = project_dir / "workflow.tgz" @@ -1988,9 +1984,7 @@ def test_workflow_add_local_tgz_flat(self, project_dir): info.size = len(data) tf.addfile(info, io.BytesIO(data)) - with __import__("unittest.mock", fromlist=["patch"]).patch.object( - __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir - ): + with patch.object(Path, "cwd", return_value=project_dir): result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=False) assert result.exit_code == 0, result.output @@ -1999,6 +1993,7 @@ def test_workflow_add_local_tgz_flat(self, project_dir): def test_workflow_add_local_tar_gz_missing_workflow_yml(self, project_dir): """workflow add exits with an error when the .tar.gz has no workflow.yml.""" import tarfile, io + from unittest.mock import patch runner, app = self._runner_and_app() archive = project_dir / "empty.tar.gz" @@ -2008,9 +2003,7 @@ def test_workflow_add_local_tar_gz_missing_workflow_yml(self, project_dir): info.size = len(data) tf.addfile(info, io.BytesIO(data)) - with __import__("unittest.mock", fromlist=["patch"]).patch.object( - __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir - ): + with patch.object(Path, "cwd", return_value=project_dir): result = runner.invoke(app, ["workflow", "add", str(archive)], catch_exceptions=True) assert result.exit_code != 0 @@ -2041,9 +2034,7 @@ def test_workflow_add_url_tar_gz(self, project_dir): mock_resp.__exit__ = MagicMock(return_value=False) with patch("urllib.request.urlopen", return_value=mock_resp), \ - patch.object( - __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir - ): + patch.object(Path, "cwd", return_value=project_dir): result = runner.invoke( app, ["workflow", "add", "https://example.com/workflow.tar.gz"], catch_exceptions=False, @@ -2071,9 +2062,7 @@ def test_workflow_add_url_zip(self, project_dir): mock_resp.__exit__ = MagicMock(return_value=False) with patch("urllib.request.urlopen", return_value=mock_resp), \ - patch.object( - __import__("pathlib", fromlist=["Path"]).Path, "cwd", return_value=project_dir - ): + patch.object(Path, "cwd", return_value=project_dir): result = runner.invoke( app, ["workflow", "add", "https://example.com/workflow.zip"], catch_exceptions=False, From 1015ff24da703e0e03a7ee9ad0dcbaa800bee208 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 18:53:25 +0000 Subject: [PATCH 16/16] Fix GNU sparse skip in safe_extract_tarball; use response.geturl() for redirect-safe format detection and HTTPS re-check Agent-Logs-Url: https://github.com/github/spec-kit/sessions/739d3f73-200b-417a-8a86-134329200560 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 22 ++++++++++++++++++++-- src/specify_cli/extensions.py | 27 ++++++++++++++++++++++----- src/specify_cli/presets.py | 15 ++++++++++++++- tests/test_extensions.py | 2 ++ tests/test_presets.py | 1 + 5 files changed, 59 insertions(+), 8 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index cac17d72dc..e24ab7b80c 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -2633,16 +2633,25 @@ def preset_add( with tempfile.TemporaryDirectory() as tmpdir: archive_fmt = _det_fmt(from_url) + final_url = from_url try: with urllib.request.urlopen(from_url, timeout=60) as response: + final_url = response.geturl() if not archive_fmt: content_type = response.headers.get("Content-Type", "") - archive_fmt = _det_fmt(from_url, content_type) + archive_fmt = _det_fmt(final_url, content_type) archive_data = response.read() except urllib.error.URLError as e: console.print(f"[red]Error:[/red] Failed to download: {e}") raise typer.Exit(1) + # Re-validate scheme after any redirect (scheme-downgrade guard). + _fp = _urlparse(final_url) + _fl = _fp.hostname in ("localhost", "127.0.0.1", "::1") + if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl): + console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}") + raise typer.Exit(1) + if not archive_fmt: console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.") console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.") @@ -3652,15 +3661,24 @@ def extension_add( download_dir = project_root / ".specify" / "extensions" / ".cache" / "downloads" download_dir.mkdir(parents=True, exist_ok=True) archive_fmt = detect_archive_format(from_url) + final_url = from_url archive_path = None try: with urllib.request.urlopen(from_url, timeout=60) as response: + final_url = response.geturl() if not archive_fmt: content_type = response.headers.get("Content-Type", "") - archive_fmt = detect_archive_format(from_url, content_type) + archive_fmt = detect_archive_format(final_url, content_type) archive_data = response.read() + # Re-validate scheme after any redirect (scheme-downgrade guard). + _fp = urlparse(final_url) + _fl = _fp.hostname in ("localhost", "127.0.0.1", "::1") + if _fp.scheme != "https" and not (_fp.scheme == "http" and _fl): + console.print(f"[red]Error:[/red] URL was redirected to a non-HTTPS URL: {final_url}") + raise typer.Exit(1) + if not archive_fmt: console.print("[red]Error:[/red] Could not determine archive format from URL or Content-Type.") console.print("Ensure the URL points to a .zip or .tar.gz/.tgz file.") diff --git a/src/specify_cli/extensions.py b/src/specify_cli/extensions.py index 7d288aabf6..0ba5dfb733 100644 --- a/src/specify_cli/extensions.py +++ b/src/specify_cli/extensions.py @@ -173,11 +173,15 @@ def safe_extract_tarball( # Tar metadata member types to skip during validation — they carry no # extractable payload and are generated automatically by many common # archiving tools (e.g. PAX headers, GNU longname/longlink entries). + # GNUTYPE_SPARSE is intentionally excluded: it carries a real file payload + # and tarfile.TarInfo.isreg() returns True for it, so it passes the + # regular-file check below and is extracted correctly. _TAR_METADATA_TYPES = ( - tarfile.XHDTYPE, # PAX extended header - tarfile.XGLTYPE, # PAX global extended header - tarfile.SOLARIS_XHDTYPE, # Solaris PAX extended header - *tarfile.GNU_TYPES, # GNU longname / longlink / sparse + tarfile.XHDTYPE, # PAX extended header + tarfile.XGLTYPE, # PAX global extended header + tarfile.SOLARIS_XHDTYPE, # Solaris PAX extended header + tarfile.GNUTYPE_LONGNAME, # GNU long path name (metadata only) + tarfile.GNUTYPE_LONGLINK, # GNU long link name (metadata only) ) try: @@ -2153,14 +2157,17 @@ def download_extension(self, extension_id: str, target_dir: Optional[Path] = Non version = ext_info.get("version", "unknown") # Detect archive format from URL; resolve via Content-Type when needed. + # `final_url` may differ from `download_url` if the server redirects. archive_fmt = detect_archive_format(download_url) + final_url = download_url # Download the archive try: with self._open_url(download_url, timeout=60) as response: + final_url = response.geturl() if not archive_fmt: content_type = response.headers.get("Content-Type", "") - archive_fmt = detect_archive_format(download_url, content_type) + archive_fmt = detect_archive_format(final_url, content_type) archive_data = response.read() except urllib.error.URLError as e: @@ -2168,6 +2175,16 @@ def download_extension(self, extension_id: str, target_dir: Optional[Path] = Non except IOError as e: raise ExtensionError(f"Failed to read extension archive from {download_url}: {e}") + # Re-validate scheme after any redirect to guard against scheme-downgrade. + _final_parsed = urlparse(final_url) + _final_is_localhost = _final_parsed.hostname in ("localhost", "127.0.0.1", "::1") + if _final_parsed.scheme != "https" and not ( + _final_parsed.scheme == "http" and _final_is_localhost + ): + raise ExtensionError( + f"Extension download URL was redirected to a non-HTTPS URL: {final_url}" + ) + # Choose file extension based on detected format. if not archive_fmt: raise ExtensionError( diff --git a/src/specify_cli/presets.py b/src/specify_cli/presets.py index 16b63862cc..532205f50a 100644 --- a/src/specify_cli/presets.py +++ b/src/specify_cli/presets.py @@ -2314,13 +2314,16 @@ def download_pack( version = pack_info.get("version", "unknown") # Detect archive format from URL; resolve via Content-Type when needed. + # `final_url` may differ from `download_url` if the server redirects. archive_fmt = detect_archive_format(download_url) + final_url = download_url try: with self._open_url(download_url, timeout=60) as response: + final_url = response.geturl() if not archive_fmt: content_type = response.headers.get("Content-Type", "") - archive_fmt = detect_archive_format(download_url, content_type) + archive_fmt = detect_archive_format(final_url, content_type) archive_data = response.read() except urllib.error.URLError as e: @@ -2330,6 +2333,16 @@ def download_pack( except IOError as e: raise PresetError(f"Failed to read preset archive from {download_url}: {e}") + # Re-validate scheme after any redirect to guard against scheme-downgrade. + _final_parsed = urlparse(final_url) + _final_is_localhost = _final_parsed.hostname in ("localhost", "127.0.0.1", "::1") + if _final_parsed.scheme != "https" and not ( + _final_parsed.scheme == "http" and _final_is_localhost + ): + raise PresetError( + f"Preset download URL was redirected to a non-HTTPS URL: {final_url}" + ) + # Choose file extension based on detected format. if not archive_fmt: raise PresetError( diff --git a/tests/test_extensions.py b/tests/test_extensions.py index 5cb79472cc..7028f8e3f1 100644 --- a/tests/test_extensions.py +++ b/tests/test_extensions.py @@ -2759,6 +2759,7 @@ def test_download_extension_sends_auth_header(self, temp_dir, monkeypatch): mock_response = MagicMock() mock_response.read.return_value = zip_bytes + mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-ext.zip" mock_response.__enter__ = lambda s: s mock_response.__exit__ = MagicMock(return_value=False) @@ -3661,6 +3662,7 @@ def test_download_extension_allows_bundled_with_url(self, temp_dir): mock_response = MagicMock() mock_response.read.return_value = b"fake zip data" + mock_response.geturl.return_value = "https://example.com/git-2.0.0.zip" mock_response.__enter__ = lambda s: s mock_response.__exit__ = MagicMock(return_value=False) diff --git a/tests/test_presets.py b/tests/test_presets.py index 133600d994..a527e68d0b 100644 --- a/tests/test_presets.py +++ b/tests/test_presets.py @@ -1613,6 +1613,7 @@ def test_download_pack_sends_auth_header(self, project_dir, monkeypatch): mock_response = MagicMock() mock_response.read.return_value = zip_bytes + mock_response.geturl.return_value = "https://github.com/org/repo/releases/download/v1/test-pack.zip" mock_response.__enter__ = lambda s: s mock_response.__exit__ = MagicMock(return_value=False)