diff --git a/.claude/ci/check-ci b/.claude/ci/check-ci index 13b8392182..6f8229eb53 100755 --- a/.claude/ci/check-ci +++ b/.claude/ci/check-ci @@ -6,7 +6,7 @@ # ] # /// -"""Monitor a GitLab CI pipeline until all jobs complete or a failure threshold is reached.""" +"""Monitor a GitLab CI pipeline (and GitHub Actions) until all jobs complete or a failure threshold is reached.""" import argparse import asyncio @@ -14,8 +14,9 @@ import os import subprocess import sys import time -from datetime import datetime, timezone +from datetime import datetime from pathlib import Path +from typing import Any import aiohttp @@ -23,15 +24,23 @@ GITLAB_HOST = "https://gitlab.ddbuild.io" PROJECT_ID = 355 API_BASE = f"{GITLAB_HOST}/api/v4" +GITHUB_API_BASE = "https://api.github.com" +GITHUB_REPO = "DataDog/dd-trace-php" +GH_FAILURE_CONCLUSIONS = {"failure", "timed_out", "action_required"} + DONE_STATUSES = {"success", "failed", "canceled", "skipped"} ACTIVE_STATUSES = {"running", "pending", "created"} +class GitLabError(Exception): + """Raised on fatal GitLab API errors (auth failure, network error, timeout).""" + + def parse_args(): - parser = argparse.ArgumentParser(description="Monitor a GitLab CI pipeline.") + parser = argparse.ArgumentParser(description="Monitor a GitLab CI pipeline and GitHub Actions workflows.") group = parser.add_mutually_exclusive_group() group.add_argument("--commit", type=str, default=None, help="Git ref to resolve to a SHA (default: HEAD)") - group.add_argument("--pipeline", type=int, default=None, help="Pipeline ID to monitor directly") + group.add_argument("--pipeline", type=int, default=None, help="Pipeline ID to monitor directly (GitLab only)") parser.add_argument("--discovery-timeout", type=int, default=60, help="Seconds to wait for pipeline discovery (default: 60)") parser.add_argument("--poll-interval", type=int, default=60, help="Seconds between poll cycles (default: 60)") parser.add_argument("--max-failures", type=int, default=50, help="Stop after this many failures (default: 50)") @@ -48,25 +57,46 @@ def resolve_sha(ref: str) -> str: return result.stdout.strip() -async def api_get(session: aiohttp.ClientSession, path: str, params: dict | None = None) -> tuple[int, any]: - """Make a GET request. Returns (status_code, json_or_text).""" +def get_github_token() -> str | None: + """Get GitHub token via ddtool auth github token.""" + try: + result = subprocess.run( + ["ddtool", "auth", "github", "token"], + capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + token = result.stdout.strip() + return token if token else None + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + return None + + +# --------------------------------------------------------------------------- +# GitLab API helpers +# --------------------------------------------------------------------------- + +async def api_get(session: aiohttp.ClientSession, path: str, params: dict | None = None) -> tuple[int, Any, dict]: + """Make a GET request. 
Returns (status_code, json_or_text, headers).""" url = f"{API_BASE}{path}" try: async with session.get(url, params=params) as resp: if resp.status in (401, 403): text = await resp.text() - print(f"Error: authentication failed ({resp.status}): {text}", file=sys.stderr) - sys.exit(1) + msg = f"Error: authentication failed ({resp.status}): {text}" + print(msg, file=sys.stderr) + raise GitLabError(msg) + headers = dict(resp.headers) if resp.content_type and "json" in resp.content_type: - return resp.status, await resp.json() - return resp.status, await resp.text() + return resp.status, await resp.json(), headers + return resp.status, await resp.text(), headers except aiohttp.ClientError as e: - return 0, str(e) + return 0, str(e), {} -async def api_get_json(session: aiohttp.ClientSession, path: str, params: dict | None = None) -> any: +async def api_get_json(session: aiohttp.ClientSession, path: str, params: dict | None = None) -> Any: """GET request expecting JSON. Returns None on network error.""" - status, data = await api_get(session, path, params) + status, data, _ = await api_get(session, path, params) if status == 0: print(f"Warning: network error fetching {path}: {data}", file=sys.stderr) return None @@ -78,28 +108,43 @@ async def api_get_json(session: aiohttp.ClientSession, path: str, params: dict | async def api_get_text(session: aiohttp.ClientSession, path: str) -> str | None: """GET request expecting text. Returns None on error.""" - status, data = await api_get(session, path) + status, data, _ = await api_get(session, path) if status == 0 or status >= 400: return None return data async def paginated_get(session: aiohttp.ClientSession, path: str, params: dict | None = None) -> list: - """Fetch all pages of a paginated endpoint.""" - all_items = [] + """Fetch all pages of a paginated endpoint. + + Reads X-Total-Pages from the first response, then fetches all remaining pages in parallel. 
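+
+    Note: GitLab omits X-Total-Pages for keyset pagination and for collections
+    larger than 10,000 rows; in that case only the first page is returned here.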
+ """ p = dict(params or {}) p["per_page"] = 100 - page = 1 - while True: - p["page"] = page - data = await api_get_json(session, path, p) - if data is None or not isinstance(data, list) or len(data) == 0: - break - all_items.extend(data) - if len(data) < 100: - break - page += 1 - return all_items + p["page"] = 1 + + status, first_page, headers = await api_get(session, path, p) + if status == 0: + print(f"Warning: network error fetching {path}: {first_page}", file=sys.stderr) + return [] + if status >= 400: + print(f"Warning: HTTP {status} fetching {path}", file=sys.stderr) + return [] + + total_pages = int(headers.get("X-Total-Pages", 1)) + + if not isinstance(first_page, list) or total_pages <= 1: + if not isinstance(first_page, list): + print(f"Warning: expected list from {path}, got {type(first_page).__name__}", file=sys.stderr) + return first_page if isinstance(first_page, list) else [] + + async def fetch_page(n: int) -> list: + pp = {**p, "page": n} + data = await api_get_json(session, path, pp) + return data if isinstance(data, list) else [] + + rest = await asyncio.gather(*[fetch_page(n) for n in range(2, total_pages + 1)]) + return first_page + [item for page in rest for item in page] async def discover_pipeline(session: aiohttp.ClientSession, sha: str, timeout: int) -> int: @@ -107,24 +152,24 @@ async def discover_pipeline(session: aiohttp.ClientSession, sha: str, timeout: i deadline = time.monotonic() + timeout interval = 5 while True: - status, data = await api_get(session, f"/projects/{PROJECT_ID}/pipelines", {"sha": sha, "per_page": 20}) + status, data, _ = await api_get(session, f"/projects/{PROJECT_ID}/pipelines", {"sha": sha, "per_page": 20}) if status == 0: - print(f"Error: network error during pipeline discovery: {data}", file=sys.stderr) - sys.exit(1) - if status in (401, 403): - # already handled in api_get - sys.exit(1) + msg = f"Error: network error during pipeline discovery: {data}" + print(msg, file=sys.stderr) + raise GitLabError(msg) if status >= 400: - print(f"Error: HTTP {status} during pipeline discovery", file=sys.stderr) - sys.exit(1) + msg = f"Error: HTTP {status} during pipeline discovery" + print(msg, file=sys.stderr) + raise GitLabError(msg) if isinstance(data, list) and len(data) > 0: # Pick the most recently updated pipeline best = max(data, key=lambda p: p.get("updated_at", "")) print(f"Found pipeline {best['id']} (status: {best.get('status', 'unknown')})") return best["id"] if time.monotonic() >= deadline: - print(f"Error: no pipeline found for SHA {sha} within {timeout}s", file=sys.stderr) - sys.exit(1) + msg = f"Error: no pipeline found for SHA {sha} within {timeout}s" + print(msg, file=sys.stderr) + raise GitLabError(msg) remaining = deadline - time.monotonic() wait = min(interval, remaining) if wait > 0: @@ -132,64 +177,70 @@ async def discover_pipeline(session: aiohttp.ClientSession, sha: str, timeout: i await asyncio.sleep(wait) -async def get_child_pipelines(session: aiohttp.ClientSession, pipeline_id: int, depth: int = 0) -> list[int]: - """Recursively find all descendant pipeline IDs within this project (up to 3 levels).""" - if depth >= 3: - return [] - bridges = await paginated_get(session, f"/projects/{PROJECT_ID}/pipelines/{pipeline_id}/bridges") - child_ids = [] - for bridge in bridges: - downstream = bridge.get("downstream_pipeline") - if not downstream or not downstream.get("id"): - continue - if downstream.get("project_id") != PROJECT_ID: - continue # cross-project trigger — different project, skip - cid = downstream["id"] - 
child_ids.append(cid) - grandchildren = await get_child_pipelines(session, cid, depth + 1) - child_ids.extend(grandchildren) - return child_ids - - -async def get_all_pipeline_ids(session: aiohttp.ClientSession, root_id: int) -> list[int]: - """Get root + all descendant pipeline IDs.""" - children = await get_child_pipelines(session, root_id) - return [root_id] + children - - -async def fetch_all_jobs(session: aiohttp.ClientSession, pipeline_ids: list[int]) -> list[dict]: - """Fetch jobs for all pipelines in parallel.""" - tasks = [paginated_get(session, f"/projects/{PROJECT_ID}/pipelines/{pid}/jobs") for pid in pipeline_ids] - results = await asyncio.gather(*tasks) - all_jobs = [] - for job_list in results: - if job_list: - all_jobs.extend(job_list) - return all_jobs - - -async def fetch_pipeline_statuses(session: aiohttp.ClientSession, pipeline_ids: list[int]) -> dict[int, str]: - """Fetch status of each pipeline in parallel.""" - async def fetch_one(pid): - data = await api_get_json(session, f"/projects/{PROJECT_ID}/pipelines/{pid}") - if data and isinstance(data, dict): - return pid, data.get("status", "unknown") - return pid, "unknown" - - results = await asyncio.gather(*[fetch_one(pid) for pid in pipeline_ids]) - return dict(results) +async def discover_pipelines_and_jobs( + session: aiohttp.ClientSession, root_id: int +) -> tuple[list[int], dict[int, str], dict[int, list[dict]], dict[int, dict]]: + """Discover all pipelines and fetch all jobs in a single pass (no double bridge fetches). + + For each pipeline, regular jobs, bridges, and pipeline details are fetched in parallel. + Child pipelines are discovered from bridges and recursed into (up to 3 levels deep). + + Returns: + pipeline_ids: root + descendants in discovery order + pipeline_names: {child_pipeline_id: bridge_job_name} for display + jobs_by_pipeline: {pipeline_id: [jobs + bridges]} + pipeline_info: {pipeline_id: {"status": ..., "name": ...}} + """ + pipeline_ids: list[int] = [] + pipeline_names: dict[int, str] = {} + jobs_by_pipeline: dict[int, list[dict]] = {} + pipeline_info: dict[int, dict] = {} + + async def visit(pid: int, depth: int): + pipeline_ids.append(pid) + jobs, bridges, details = await asyncio.gather( + paginated_get(session, f"/projects/{PROJECT_ID}/pipelines/{pid}/jobs"), + paginated_get(session, f"/projects/{PROJECT_ID}/pipelines/{pid}/bridges"), + api_get_json(session, f"/projects/{PROJECT_ID}/pipelines/{pid}"), + ) + jobs_by_pipeline[pid] = list(jobs or []) + list(bridges or []) + if details and isinstance(details, dict): + name = details.get("name") or details.get("ref") or str(pid) + pipeline_info[pid] = {"status": details.get("status", "unknown"), "name": name} + else: + pipeline_info[pid] = {"status": "unknown", "name": str(pid)} + if depth >= 3: + return + child_tasks = [] + for bridge in (bridges or []): + downstream = bridge.get("downstream_pipeline") + if not downstream or not downstream.get("id"): + continue + if downstream.get("project_id") != PROJECT_ID: + continue # cross-project trigger — skip + cid = downstream["id"] + pipeline_names[cid] = bridge.get("name", str(cid)) + child_tasks.append(visit(cid, depth + 1)) + if child_tasks: + await asyncio.gather(*child_tasks) + + await visit(root_id, 0) + return pipeline_ids, pipeline_names, jobs_by_pipeline, pipeline_info def compute_duration(job: dict) -> int | None: - """Compute duration in seconds from created_at to finished_at.""" - created = job.get("created_at") + """Compute actual run duration in seconds for a job.""" + duration = 
job.get("duration") + if duration is not None and isinstance(duration, (int, float)): + return round(duration) + started = job.get("started_at") finished = job.get("finished_at") - if not created or not finished: + if not started or not finished: return None try: - t_created = datetime.fromisoformat(created.replace("Z", "+00:00")) - t_finished = datetime.fromisoformat(finished.replace("Z", "+00:00")) - return round((t_finished - t_created).total_seconds()) + t_started = datetime.fromisoformat(started) + t_finished = datetime.fromisoformat(finished) + return round((t_finished - t_started).total_seconds()) except (ValueError, TypeError): return None @@ -206,98 +257,401 @@ async def download_job_log(session: aiohttp.ClientSession, job_id: int, log_dir: log_path.write_text(text) +# --------------------------------------------------------------------------- +# GitHub Actions API helpers +# --------------------------------------------------------------------------- + +class GitHubAuthError(Exception): + """Raised when the GitHub API returns 401/403 — token missing or expired.""" + + +async def gh_api_get(session: aiohttp.ClientSession, path: str, params: dict | None = None) -> tuple[int, Any, dict]: + """Make a GET request to the GitHub API. Returns (status_code, data, headers). + + Raises GitHubAuthError on 401/403 so callers can abort retry loops immediately. + """ + url = f"{GITHUB_API_BASE}{path}" + try: + async with session.get(url, params=params, allow_redirects=True) as resp: + if resp.status in (401, 403): + text = await resp.text() + msg = f"GitHub auth error ({resp.status}): {text[:200]}" + print(f"Warning: {msg}", file=sys.stderr) + raise GitHubAuthError(msg) + headers = dict(resp.headers) + content_type = resp.headers.get("Content-Type", "") + if "json" in content_type: + return resp.status, await resp.json(), headers + return resp.status, await resp.text(), headers + except aiohttp.ClientError as e: + return 0, str(e), {} + + +async def gh_get_json(session: aiohttp.ClientSession, path: str, params: dict | None = None) -> Any: + """GitHub GET expecting JSON. Returns None on network/HTTP error; raises GitHubAuthError on 401/403.""" + status, data, _ = await gh_api_get(session, path, params) + if status == 0: + print(f"Warning: network error fetching GitHub {path}: {data}", file=sys.stderr) + return None + if status >= 400: + print(f"Warning: HTTP {status} fetching GitHub {path}", file=sys.stderr) + return None + return data + + +async def gh_paginated_get_field(session: aiohttp.ClientSession, path: str, field: str, params: dict | None = None) -> list: + """Fetch all pages from a GitHub endpoint that wraps items in a field (e.g. 'jobs').""" + p = dict(params or {}) + p["per_page"] = 100 + results = [] + page = 1 + while True: + p["page"] = page + data = await gh_get_json(session, path, p) + if not data or not isinstance(data, dict): + break + items = data.get(field, []) + if not isinstance(items, list): + break + results.extend(items) + total_count = data.get("total_count") + if total_count is not None and len(results) >= total_count: + break + if len(items) < p["per_page"]: + break + page += 1 + return results + + +async def gh_discover_runs(session: aiohttp.ClientSession, sha: str, timeout: int) -> list[dict]: + """Poll GitHub until at least one workflow run is found for the given SHA. + + Returns an empty list (with a warning) if none are found within the timeout. + Raises GitHubAuthError immediately on 401/403. 
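+    Polls roughly every 5 seconds, capped by the time remaining before the deadline.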
+ """ + deadline = time.monotonic() + timeout + interval = 5 + while True: + data = await gh_get_json( + session, + f"/repos/{GITHUB_REPO}/actions/runs", + {"head_sha": sha, "per_page": 100} + ) + if isinstance(data, dict) and data.get("workflow_runs"): + runs = data["workflow_runs"] + print(f"Found {len(runs)} GitHub Actions workflow run(s) for {sha[:12]}") + return runs + if time.monotonic() >= deadline: + print(f"Warning: no GitHub Actions runs found for {sha[:12]} within {timeout}s", file=sys.stderr) + return [] + remaining = deadline - time.monotonic() + wait = min(interval, remaining) + if wait > 0: + await asyncio.sleep(wait) + + +async def gh_get_all_runs(session: aiohttp.ClientSession, sha: str) -> list[dict]: + """Fetch the current list of workflow runs for a SHA (single call, no polling).""" + data = await gh_get_json( + session, + f"/repos/{GITHUB_REPO}/actions/runs", + {"head_sha": sha, "per_page": 100} + ) + if data and isinstance(data, dict): + return data.get("workflow_runs", []) + return [] + + +async def gh_get_run_jobs(session: aiohttp.ClientSession, run_id: int) -> list[dict]: + """Fetch all jobs for a GitHub Actions run.""" + return await gh_paginated_get_field(session, f"/repos/{GITHUB_REPO}/actions/runs/{run_id}/jobs", "jobs") + + +async def gh_download_job_log(session: aiohttp.ClientSession, job_id: int, log_dir: Path): + """Download the log for a GitHub Actions job.""" + log_path = log_dir / f"gh_{job_id}.log" + status, data, _ = await gh_api_get(session, f"/repos/{GITHUB_REPO}/actions/jobs/{job_id}/logs") + if status == 0 or status >= 400 or not isinstance(data, str): + msg = f"Failed to download GitHub log for job {job_id}" + print(f"Warning: {msg}", file=sys.stderr) + log_path.write_text(msg) + else: + log_path.write_text(data) + + +# --------------------------------------------------------------------------- +# list-jobs mode +# --------------------------------------------------------------------------- + async def list_jobs(session: aiohttp.ClientSession, root_id: int): - """List all jobs grouped by pipeline, then exit.""" - pipeline_ids = await get_all_pipeline_ids(session, root_id) - pipeline_statuses = await fetch_pipeline_statuses(session, pipeline_ids) + """List all GitLab jobs grouped by pipeline, then exit.""" + pipeline_ids, pipeline_names, jobs_by_pipeline, pipeline_statuses = await discover_pipelines_and_jobs(session, root_id) - # Fetch jobs for all pipelines individually so we can group them - jobs_by_pipeline: dict[int, list[dict]] = {} - tasks = [] for pid in pipeline_ids: - tasks.append(paginated_get(session, f"/projects/{PROJECT_ID}/pipelines/{pid}/jobs")) - results = await asyncio.gather(*tasks) - for pid, job_list in zip(pipeline_ids, results): - jobs_by_pipeline[pid] = sorted(job_list or [], key=lambda j: j.get("name", "")) + jobs_by_pipeline[pid] = sorted(jobs_by_pipeline.get(pid, []), key=lambda j: j.get("name", "")) for pid in pipeline_ids: - status = pipeline_statuses.get(pid, "unknown") + info = pipeline_statuses.get(pid, {"status": "unknown", "name": str(pid)}) + display_name = pipeline_names.get(pid, info["name"]) jobs = jobs_by_pipeline.get(pid, []) - print(f"\nPipeline {pid} (status: {status}):") + print(f"\nPipeline {pid} '{display_name}' (status: {info['status']}):") for job in jobs: job_status = job.get("status", "unknown") job_name = job.get("name", "") - print(f" {job_status:<10}{job_name}") + job_id = job.get("id", "") + print(f" {job_status:<10}({job_id}) {job_name}") -async def run(args): - token = 
os.environ.get("GITLAB_PERSONAL_ACCESS_TOKEN", "").strip() - if not token: - print("Error: GITLAB_PERSONAL_ACCESS_TOKEN environment variable is not set or empty", file=sys.stderr) - sys.exit(1) +async def list_github_jobs(session: aiohttp.ClientSession, sha: str): + """List GitHub Actions workflow runs and their jobs for a SHA.""" + try: + runs = await gh_discover_runs(session, sha, timeout=15) + except GitHubAuthError: + print("\n(Skipping GitHub Actions — authentication failed)") + return + if not runs: + print("\n(No GitHub Actions workflow runs found)") + return + + for run in runs: + run_id = run["id"] + run_name = run.get("name", str(run_id)) + run_status = run.get("status", "unknown") + run_conclusion = run.get("conclusion") or "" + status_str = run_status if not run_conclusion else f"{run_status}, conclusion: {run_conclusion}" + jobs = await gh_get_run_jobs(session, run_id) + print(f"\nGitHub Actions run {run_id} '{run_name}' (status: {status_str}):") + for job in sorted(jobs, key=lambda j: j.get("name", "")): + job_status = job.get("status", "unknown") + job_conclusion = job.get("conclusion") or "" + display = job_conclusion if job_status == "completed" else job_status + job_name = job.get("name", "") + job_id = job.get("id", "") + print(f" {display:<10}({job_id}) {job_name}") - connector = aiohttp.TCPConnector(limit=20) - headers = {"PRIVATE-TOKEN": token} - async with aiohttp.ClientSession(headers=headers, connector=connector) as session: - # Determine root pipeline ID - if args.pipeline is not None: - root_id = args.pipeline - print(f"Using pipeline {root_id}") - else: - ref = args.commit if args.commit else "HEAD" - sha = resolve_sha(ref) - print(f"Resolved {ref} to {sha}") - root_id = await discover_pipeline(session, sha, args.discovery_timeout) - if args.list_jobs: - await list_jobs(session, root_id) - return +# --------------------------------------------------------------------------- +# Main monitoring loop +# --------------------------------------------------------------------------- - # Set up working directory - wdir = Path(f"/tmp/gitlab_{root_id}") - wdir.mkdir(parents=True, exist_ok=True) - fail_log_dir = wdir / "fail_logs" - fail_log_dir.mkdir(exist_ok=True) - print(f"Working directory: {wdir}") +async def run(args): + try: + await _run(args) + except GitLabError: + sys.exit(1) - seen_success: set[int] = set() - seen_failure: set[int] = set() - failure_count = 0 - deadline = time.monotonic() + args.timeout - while True: - # Discover all pipeline IDs - pipeline_ids = await get_all_pipeline_ids(session, root_id) +async def _run(args): + token = os.environ.get("GITLAB_PERSONAL_ACCESS_TOKEN", "").strip() + if not token: + print("Error: GITLAB_PERSONAL_ACCESS_TOKEN environment variable is not set or empty", file=sys.stderr) + sys.exit(1) - # Fetch all jobs in parallel - all_jobs = await fetch_all_jobs(session, pipeline_ids) + # Resolve SHA for GitHub Actions (not available when --pipeline is given) + sha: str | None = None + if args.pipeline is None: + ref = args.commit if args.commit else "HEAD" + sha = resolve_sha(ref) + print(f"Resolved {ref} to {sha}") + + # Attempt to get GitHub token + github_token = get_github_token() + if sha and not github_token: + print("Warning: could not get GitHub token via 'ddtool auth github token' — skipping GitHub Actions monitoring", file=sys.stderr) + + gl_connector = aiohttp.TCPConnector(limit=20) + gl_headers = {"PRIVATE-TOKEN": token} + + gh_session: aiohttp.ClientSession | None = None + gh_connector: aiohttp.TCPConnector | None = 
None + + async with aiohttp.ClientSession(headers=gl_headers, connector=gl_connector) as gl_session: + # Open GitHub session if we have a token and SHA + if sha and github_token: + gh_connector = aiohttp.TCPConnector(limit=10) + gh_headers = { + "Authorization": f"Bearer {github_token}", + "Accept": "application/vnd.github+json", + "X-GitHub-Api-Version": "2022-11-28", + } + gh_session = aiohttp.ClientSession(headers=gh_headers, connector=gh_connector) + + try: + await _monitor(args, sha, gl_session, gh_session) + finally: + if gh_session: + await gh_session.close() + if gh_connector: + await gh_connector.close() + + +async def _monitor(args, sha: str | None, gl_session: aiohttp.ClientSession, gh_session: aiohttp.ClientSession | None): + # Determine root GitLab pipeline ID + if args.pipeline is not None: + root_id = args.pipeline + print(f"Using pipeline {root_id}") + else: + root_id = await discover_pipeline(gl_session, sha, args.discovery_timeout) + + if args.list_jobs: + await list_jobs(gl_session, root_id) + if gh_session and sha: + await list_github_jobs(gh_session, sha) + return + + # Set up working directory + wdir = Path(f"/tmp/gitlab_{root_id}") + wdir.mkdir(parents=True, exist_ok=True) + fail_log_dir = wdir / "fail_logs" + fail_log_dir.mkdir(exist_ok=True) + print(f"Working directory: {wdir}") + + seen_success: set[int] = set() + seen_failure: set[int] = set() # GitLab + failure_count = 0 + deadline = time.monotonic() + args.timeout + + # GitHub state + gh_seen_failure: set[int] = set() # GitHub job IDs that failed + gh_seen_job: set[int] = set() # all GitHub job IDs we've reported + gh_monitoring = gh_session is not None and sha is not None + # Discover initial GitHub runs (with discovery_timeout) + gh_known_run_ids: set[int] = set() + if gh_monitoring: + try: + initial_runs = await gh_discover_runs(gh_session, sha, args.discovery_timeout) + except GitHubAuthError: + print("Skipping GitHub Actions monitoring — authentication failed.", file=sys.stderr) + initial_runs = [] + gh_monitoring = False + if gh_monitoring: + for r in initial_runs: + gh_known_run_ids.add(r["id"]) + if not initial_runs: + print("No GitHub Actions runs found; monitoring GitLab only.") + gh_monitoring = False + + async def _gh_get_runs_safe() -> list[dict] | None: + """Fetch current GH runs; returns None on auth failure (already printed).""" + try: + return await gh_get_all_runs(gh_session, sha) + except GitHubAuthError: + return None - # Process new successes - success_path = wdir / "success.txt" + while True: + # Phase 1: fetch GitLab pipeline tree and GitHub run list in parallel. + if gh_monitoring: + gl_result, gh_runs_raw = await asyncio.gather( + discover_pipelines_and_jobs(gl_session, root_id), + _gh_get_runs_safe(), + ) + if gh_runs_raw is None: + print("GitHub authentication failed mid-run; stopping GitHub Actions monitoring.", file=sys.stderr) + gh_monitoring = False + all_runs: list[dict] = [] + else: + all_runs = gh_runs_raw + else: + gl_result = await discover_pipelines_and_jobs(gl_session, root_id) + all_runs = [] + + pipeline_ids, _, jobs_by_pipeline, pipeline_statuses = gl_result + all_jobs = [job for jobs in jobs_by_pipeline.values() for job in jobs] + + # Phase 2: fetch GH run-jobs while we can (no GL dependency); process GL jobs locally. 
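+            # Note: asyncio.gather() schedules the GitHub requests right away and returns
+            # an awaitable future; it is awaited only in Phase 3, so those HTTP calls
+            # overlap with the purely local GitLab bookkeeping below.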
+ if gh_monitoring: + for r in all_runs: + gh_known_run_ids.add(r["id"]) + run_ids_ordered = list(gh_known_run_ids) + gh_jobs_future = asyncio.gather(*[gh_get_run_jobs(gh_session, rid) for rid in run_ids_ordered]) + else: + run_ids_ordered = [] + gh_jobs_future = None + + # --- Process GitLab jobs --- + success_path = wdir / "success.txt" + failure_path = wdir / "failure.txt" + new_failure_jobs = [] + new_allow_fail_jobs = [] + running = 0 + passed = 0 + failed = 0 + allow_fail = 0 + with open(success_path, "a") as success_f: for job in all_jobs: jid = job["id"] - if job.get("status") == "success" and jid not in seen_success: - with open(success_path, "a") as f: - f.write(f"{jid}\t{job.get('name', '')}\n") - seen_success.add(jid) - - # Process new failures - failure_path = wdir / "failure.txt" - new_failure_jobs = [] - for job in all_jobs: + status = job.get("status") + if status == "success": + passed += 1 + if jid not in seen_success: + success_f.write(f"{jid}\t{job.get('name', '')}\n") + seen_success.add(jid) + elif status == "failed": + if job.get("allow_failure", False): + allow_fail += 1 + if jid not in seen_failure: + new_allow_fail_jobs.append(job) + else: + failed += 1 + if jid not in seen_failure: + new_failure_jobs.append(job) + elif status in ACTIVE_STATUSES: + running += 1 + + # Phase 3: wait for GH run-jobs, then download any new failure logs in parallel. + if gh_jobs_future is not None: + run_jobs_list = await gh_jobs_future + else: + run_jobs_list = [] + + # --- Process GitHub jobs --- + gh_running = 0 + gh_passed = 0 + gh_failed_count = 0 + gh_all_done = True + gh_new_failures: list[tuple[dict, dict]] = [] + gh_fail_log_dir = wdir / "gh_fail_logs" + + runs_by_id = {r["id"]: r for r in all_runs} + for rid, run_jobs in zip(run_ids_ordered, run_jobs_list): + run = runs_by_id.get(rid, {"id": rid, "status": "unknown", "name": str(rid)}) + if run.get("status", "unknown") != "completed": + gh_all_done = False + for job in run_jobs: jid = job["id"] - if job.get("status") == "failed" and jid not in seen_failure: - new_failure_jobs.append(job) - - # Download logs in parallel for new failures - if new_failure_jobs: - await asyncio.gather(*[download_job_log(session, job["id"], fail_log_dir) for job in new_failure_jobs]) - + job_status = job.get("status", "unknown") + conclusion = job.get("conclusion") or "" + if job_status == "completed": + if conclusion in ("success", "skipped"): + gh_passed += 1 + elif conclusion in GH_FAILURE_CONCLUSIONS: + gh_failed_count += 1 + if jid not in gh_seen_failure: + gh_new_failures.append((run, job)) + elif job_status in ("queued", "in_progress", "waiting"): + gh_running += 1 + gh_all_done = False + + # Phase 4: download GL and GH failure logs in parallel. 
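+            # A single gather covers both providers, so one slow GitLab trace does not
+            # serialize the GitHub log downloads (or vice versa); bridge/trigger jobs are
+            # filtered out below because they carry no trace of their own.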
+ download_coros = [ + download_job_log(gl_session, job["id"], fail_log_dir) + for job in new_failure_jobs + if "downstream_pipeline" not in job # bridge/trigger jobs have no trace log + ] + if gh_new_failures: + gh_fail_log_dir.mkdir(exist_ok=True) + download_coros += [ + gh_download_job_log(gh_session, job["id"], gh_fail_log_dir) + for _, job in gh_new_failures + ] + if download_coros: + await asyncio.gather(*download_coros) + + with open(failure_path, "a") as failure_f: for job in new_failure_jobs: jid = job["id"] name = job.get("name", "") - with open(failure_path, "a") as f: - f.write(f"{jid}\t{name}\n") + failure_f.write(f"{jid}\t{name}\n") duration = compute_duration(job) if duration is not None: print(f'FAILED: ({jid}) "{name}" in {duration} secs') @@ -306,37 +660,57 @@ async def run(args): seen_failure.add(jid) failure_count += 1 - # Check failure threshold - if failure_count >= args.max_failures: - print(f"Stopping script after maximum number of failures ({failure_count}) was encountered") - sys.exit(1) - - # Status line - now = datetime.now().strftime("%H:%M:%S") - total_jobs = len(all_jobs) - running = sum(1 for j in all_jobs if j.get("status") in ACTIVE_STATUSES) - passed = sum(1 for j in all_jobs if j.get("status") == "success") - failed = sum(1 for j in all_jobs if j.get("status") == "failed") - print(f"[{now}] pipelines={len(pipeline_ids)} jobs={total_jobs} running={running} passed={passed} failed={failed}") - - # Check if all pipelines are done - pipeline_statuses = await fetch_pipeline_statuses(session, pipeline_ids) - all_pipelines_done = all(s in DONE_STATUSES for s in pipeline_statuses.values()) - no_active_jobs = all(j.get("status") not in ACTIVE_STATUSES for j in all_jobs) - - if all_pipelines_done and no_active_jobs: - if failure_count == 0: - print("All pipelines completed successfully.") - sys.exit(0) + for run, job in gh_new_failures: + jid = job["id"] + run_name = run.get("name", "") + job_name = job.get("name", "") + full_name = f"[GH] {run_name} / {job_name}" + failure_f.write(f"{jid}\t{full_name}\n") + duration = compute_duration(job) + if duration is not None: + print(f'FAILED: ({jid}) "{full_name}" in {duration} secs') else: - print(f"All pipelines completed with {failure_count} failure(s).") - sys.exit(1) + print(f'FAILED: ({jid}) "{full_name}"') + gh_seen_failure.add(jid) + failure_count += 1 + + for job in new_allow_fail_jobs: + seen_failure.add(job["id"]) - if time.monotonic() >= deadline: - print(f"Timeout after {args.timeout}s — pipelines still running.", file=sys.stderr) - sys.exit(2) + if failure_count >= args.max_failures: + print(f"Stopping script after maximum number of failures ({failure_count}) was encountered") + sys.exit(1) + + all_gl_done = all(info["status"] in DONE_STATUSES for info in pipeline_statuses.values()) + gl_running_count = running + + # Status line + now = datetime.now().strftime("%H:%M:%S") + if gh_monitoring: + print( + f"[{now}] gl_pipelines={len(pipeline_ids)} gl_jobs={len(all_jobs)} gl_running={gl_running_count} gl_passed={passed} gl_failed={failed} gl_allow_fail={allow_fail}" + f" | gh_running={gh_running} gh_passed={gh_passed} gh_failed={gh_failed_count}" + ) + else: + print(f"[{now}] pipelines={len(pipeline_ids)} jobs={len(all_jobs)} running={gl_running_count} passed={passed} failed={failed} allow_fail={allow_fail}") + + # Check completion + all_done = all_gl_done and (gl_running_count == 0) and (not gh_monitoring or gh_all_done) + + if all_done: + total_failures = failure_count + if total_failures == 0: + print("All 
pipelines completed successfully.")
+                    sys.exit(0)
+                else:
+                    print(f"All pipelines completed with {total_failures} failure(s).")
+                    sys.exit(1)
+
+            if time.monotonic() >= deadline:
+                print(f"Timeout after {args.timeout}s — pipelines still running.", file=sys.stderr)
+                sys.exit(2)
 
-        await asyncio.sleep(args.poll_interval)
+            await asyncio.sleep(args.poll_interval)
 
 
 def main():
diff --git a/.claude/ci/index.md b/.claude/ci/index.md
index 9e7e4de21b..2f90d8ce94 100644
--- a/.claude/ci/index.md
+++ b/.claude/ci/index.md
@@ -182,17 +182,22 @@ curl -s -H "PRIVATE-TOKEN: $GITLAB_PERSONAL_ACCESS_TOKEN" \
 ### Checking CI (Gitlab)
 
 Use `.claude/ci/check-ci` to follow a pipeline until all jobs complete.
+When invoked with `--commit` (or defaulting to HEAD), it monitors both the GitLab
+pipeline **and** any GitHub Actions workflows for the same commit. GitHub monitoring
+requires `ddtool auth github login --org DataDog` to be configured; if the token is
+unavailable, a warning is printed and only GitLab is monitored.
 
 Results are written to `/tmp/gitlab_<pipeline_id>/`:
 
 - `success.txt` — `<job_id>\t<job_name>` per line
-- `failure.txt` — same format for failed jobs
-- `fail_logs/<job_id>.log` — full job trace for each failure
+- `failure.txt` — same format for failed jobs (GitLab and GitHub; GitHub entries are prefixed `[GH]`)
+- `fail_logs/<job_id>.log` — full job trace for each GitLab failure
+- `gh_fail_logs/gh_<job_id>.log` — log for each GitHub Actions job failure
 
 Exit codes: 0 = all passed, 1 = failures or threshold reached.
 
 #### Invocation pattern
 
-Available options: `--commit <ref>` OR `--pipeline <id>`,
+Available options: `--commit <ref>` OR `--pipeline <id>` (GitLab only, skips GitHub),
 `--discovery-timeout <secs>` (default 60), `--poll-interval <secs>` (default 60),
 `--max-failures <n>` (default 50), `--timeout <secs>` (default 7200 = 2 h),
 `--list-jobs` (see below).
@@ -200,8 +205,9 @@ Available options: `--commit <ref>` OR `--pipeline <id>`,
 ##### `--list-jobs`
 
 Prints all jobs grouped by pipeline with their status, then exits
-immediately — does not monitor or download logs. Useful for a quick
-snapshot of what ran and what failed:
+immediately — does not monitor or download logs. Shows both GitLab pipelines
+and GitHub Actions workflow runs. Useful for a quick snapshot of what ran and
+what failed:
 
 ```bash
 .claude/ci/check-ci --commit HEAD --list-jobs
@@ -214,6 +220,11 @@ Pipeline 105413994 (status: failed):
   failed    test_extension_ci: [7.2]
   success   compile extension: debug [8.3]
   ...
+
+GitHub Actions run 12345678 'Profiling ASAN Tests' (status: completed, conclusion: failure):
+  failure   prof-asan (8.5, ubuntu-8-core-latest)
+  success   prof-asan (8.3, ubuntu-8-core-latest)
+  ...
 ```
 
 #### Monitor CI
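+
+A minimal monitoring invocation (illustrative; the status-line fields match the
+script above, the numbers are made up):
+
+```bash
+.claude/ci/check-ci --commit HEAD
+# [14:02:11] gl_pipelines=3 gl_jobs=412 gl_running=58 gl_passed=340 gl_failed=2 gl_allow_fail=1 | gh_running=4 gh_passed=12 gh_failed=0
+```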