Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CLI-COMMANDS.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ roboflow workflow version list my-workflow
roboflow workflow fork other-ws/their-workflow
```

### Fork a Universe project (async)

```bash
# Fork a public Universe project into the default (or --workspace) workspace.
# By default this blocks until the async task completes (up to --timeout seconds).
roboflow project fork https://universe.roboflow.com/leo-ueno-uduc7/license-plate-recognition
roboflow project fork leo-ueno-uduc7/license-plate-recognition --workspace my-ws

# Return immediately with a {taskId, url} payload instead of waiting.
roboflow project fork leo-ueno-uduc7/license-plate-recognition --no-wait

# Poll the resulting task later (works for any async task that returns a taskId).
roboflow asynctasks get <task-id>
roboflow asynctasks wait <task-id> --timeout 600
```

### Create a dataset version

```bash
Expand Down Expand Up @@ -259,6 +275,7 @@ Version numbers are always numeric — that's how `x/y` is disambiguated between
| `workflow` | Manage workflows |
| `folder` | Manage workspace folders |
| `annotation` | Annotation batches and jobs |
| `asynctasks` | Inspect async background tasks (e.g. project forks) |
| `trash` | List items in Trash |
| `universe` | Search Roboflow Universe |
| `video` | Video inference |
Expand Down
66 changes: 66 additions & 0 deletions roboflow/adapters/rfapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import urllib
from typing import Dict, List, Optional, Union
from urllib.parse import quote

import requests
from requests.exceptions import RequestException
Expand Down Expand Up @@ -813,6 +814,71 @@ def list_workflow_versions(api_key, workspace_url, workflow_url):
return response.json()


def fork_project(
api_key,
dest_workspace,
*,
url=None,
source_project_slug=None,
):
"""POST /{ws}/projects/fork — enqueue an async fork of a public Universe project.

Pass ``url`` (a Universe URL) or an explicit ``source_project_slug``. The
API owns parsing/validation. Returns the server's response, e.g.
``{"taskId": "...", "url": "<polling url>"}``.
"""
payload: Dict[str, str] = {}
if url:
payload["url"] = url
if source_project_slug:
payload["source_project"] = source_project_slug
response = requests.post(
f"{API_URL}/{dest_workspace}/projects/fork",
params={"api_key": api_key},
json=payload,
)
if not response.ok:
raise RoboflowError(response.text)
return response.json()


def get_async_task(api_key, workspace_url, task_id):
"""GET /{ws}/asynctasks/{id} — fetch the current status of an async task.

Returns the server's status payload, e.g.
``{"taskId": "...", "status": "running", "progress": {...}}`` or
``{"taskId": "...", "status": "completed", "result": {...}}`` once
terminal. Raises ``RoboflowError`` for any non-2xx response (including
404 for unknown ids or cross-workspace probes).
"""
# ``task_id`` comes from arbitrary external input; encode so a stray
# ``/``, ``?`` or ``#`` cannot mutate the request path (and still send
# the api_key with it).
encoded_task_id = quote(task_id, safe="")
response = requests.get(
f"{API_URL}/{workspace_url}/asynctasks/{encoded_task_id}",
params={"api_key": api_key},
)
if response.status_code != 200:
raise RoboflowError(response.text)
return response.json()


def get_async_task_at(api_key, polling_url):
"""GET an async-task polling URL returned verbatim by the server.

Enqueue endpoints (e.g. ``/{ws}/projects/fork``) return a fully-qualified
``url`` alongside ``taskId``. The host may differ from ``API_URL`` (e.g.
local dev against ``localapi.roboflow.one``), so hit it directly and
only attach the api_key. Falls back to ``get_async_task`` callers when
no server-supplied URL is available.
"""
response = requests.get(polling_url, params={"api_key": api_key})
if response.status_code != 200:
raise RoboflowError(response.text)
return response.json()


def fork_workflow(api_key, workspace_url, *, source_workspace, source_workflow, name=None, url=None):
"""POST /{ws}/forkWorkflow — fork a workflow into this workspace.

Expand Down
2 changes: 2 additions & 0 deletions roboflow/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def _walk(group: Any, prefix: str = "") -> None:
# ---------------------------------------------------------------------------

from roboflow.cli.handlers.annotation import annotation_app # noqa: E402
from roboflow.cli.handlers.asynctasks import asynctasks_app # noqa: E402
from roboflow.cli.handlers.auth import auth_app # noqa: E402
from roboflow.cli.handlers.batch import batch_app # noqa: E402
from roboflow.cli.handlers.completion import completion_app # noqa: E402
Expand All @@ -207,6 +208,7 @@ def _walk(group: Any, prefix: str = "") -> None:

# Register ALL commands in alphabetical order for clean --help output
app.add_typer(annotation_app, name="annotation")
app.add_typer(asynctasks_app, name="asynctasks")
app.add_typer(auth_app, name="auth")
app.add_typer(batch_app, name="batch", hidden=True) # All stubs — hidden until implemented
app.add_typer(completion_app, name="completion")
Expand Down
136 changes: 136 additions & 0 deletions roboflow/cli/handlers/asynctasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""Async task polling commands.

These mirror the generic ``GET /:workspace/asynctasks/:id`` endpoint so any
backend operation that returns ``{taskId, url}`` can be inspected with the
same CLI tools.
"""

from __future__ import annotations

from typing import Annotated

import typer

from roboflow.cli._compat import SortedGroup, ctx_to_args

asynctasks_app = typer.Typer(
cls=SortedGroup,
help="Inspect async background tasks (e.g. project forks)",
no_args_is_help=True,
)


@asynctasks_app.command("get")
def get_async_task(
ctx: typer.Context,
task_id: Annotated[str, typer.Argument(help="Async task id (returned by /projects/fork etc.)")],
) -> None:
"""Show the current status of an async task."""
args = ctx_to_args(ctx, task_id=task_id)
_get_async_task(args)


@asynctasks_app.command("wait")
def wait_async_task(
ctx: typer.Context,
task_id: Annotated[str, typer.Argument(help="Async task id")],
timeout: Annotated[
int,
typer.Option("--timeout", help="Seconds to wait for completion (0 = no timeout)."),
] = 1800,
) -> None:
"""Block until an async task is completed or failed."""
args = ctx_to_args(ctx, task_id=task_id, timeout=timeout)
_wait_async_task(args)


# ---------------------------------------------------------------------------
# Business logic
# ---------------------------------------------------------------------------


def _resolve_ws_and_key(args): # noqa: ANN001
from roboflow.cli._output import output_error
from roboflow.cli._resolver import resolve_default_workspace
from roboflow.config import load_roboflow_api_key

workspace_url = args.workspace or resolve_default_workspace(api_key=args.api_key)
if not workspace_url:
output_error(
args,
"No workspace specified.",
hint="Use --workspace or run 'roboflow auth login'.",
exit_code=2,
)
return None, None
api_key = args.api_key or load_roboflow_api_key(workspace_url)
if not api_key:
output_error(
args,
"No API key found.",
hint="Set ROBOFLOW_API_KEY or run 'roboflow auth login'.",
exit_code=2,
)
return None, None
return workspace_url, api_key


def _get_async_task(args): # noqa: ANN001
from roboflow.adapters import rfapi
from roboflow.cli._output import output, output_error

workspace_url, api_key = _resolve_ws_and_key(args)
if not api_key:
return

try:
status = rfapi.get_async_task(api_key, workspace_url, args.task_id)
except rfapi.RoboflowError as exc:
# Server returns 404 for unknown ids OR cross-workspace probes.
output_error(args, str(exc), exit_code=3)
return

output(args, status, text=f"taskId={status.get('taskId')} status={status.get('status')}")


def _wait_async_task(args): # noqa: ANN001
from roboflow.adapters import rfapi
from roboflow.cli._output import output, output_error
from roboflow.core.async_tasks import poll_until_terminal

workspace_url, api_key = _resolve_ws_and_key(args)
if not api_key:
return

def _print_progress(status): # noqa: ANN001
if args.json:
return
progress = status.get("progress")
if not isinstance(progress, dict):
return
# Don't use `or` here: `current == 0` is a legitimate value.
current = progress["current"] if "current" in progress else progress.get("completed")
total = progress.get("total")
if current is not None and total is not None:
print(f"Task progress: {current}/{total}", flush=True)

try:
final = poll_until_terminal(
api_key,
workspace_url,
args.task_id,
timeout=args.timeout,
on_update=_print_progress,
)
except rfapi.RoboflowError as exc:
output_error(args, str(exc), exit_code=3)
return
except TimeoutError as exc:
output_error(args, str(exc))
return

if final.get("status") == "failed":
output_error(args, final.get("error") or "Task failed.")
return

output(args, final, text=f"taskId={final.get('taskId')} status={final.get('status')}")
113 changes: 113 additions & 0 deletions roboflow/cli/handlers/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,27 @@ def restore_project(
_restore_project(args)


@project_app.command("fork")
def fork_project(
ctx: typer.Context,
source: Annotated[
str,
typer.Argument(help="Source project: Universe URL or '<workspace>/<project>' shorthand."),
],
no_wait: Annotated[
bool,
typer.Option("--no-wait", help="Return immediately with the taskId instead of waiting."),
] = False,
timeout: Annotated[
int,
typer.Option("--timeout", help="Seconds to wait for completion (0 = no timeout)."),
] = 1800,
) -> None:
"""Fork a public Universe project into a workspace."""
args = ctx_to_args(ctx, source=source, no_wait=no_wait, timeout=timeout)
_fork_project(args)


@project_app.command("health")
def health_project(
ctx: typer.Context,
Expand Down Expand Up @@ -352,6 +373,98 @@ def _restore_project(args): # noqa: ANN001
output(args, data, text=f"Restored {workspace_url}/{project_slug} from Trash.")


def _fork_project(args): # noqa: ANN001
from roboflow.adapters import rfapi
from roboflow.cli._output import output, output_error
from roboflow.cli._resolver import resolve_default_workspace
from roboflow.config import load_roboflow_api_key
from roboflow.core.async_tasks import poll_until_terminal

# The server accepts the full URL (or `<ws>/<proj>` shorthand) as `url`
# and parses it itself — forward verbatim so the CLI doesn't duplicate
# that logic.
source = (args.source or "").strip()
if not source:
output_error(
args,
"Source is required.",
hint="Use '<workspace>/<project>' or a Universe URL.",
)
return

dest_workspace = args.workspace or resolve_default_workspace(api_key=args.api_key)
if not dest_workspace:
output_error(
args,
"No workspace specified.",
hint="Use --workspace or run 'roboflow auth login'.",
exit_code=2,
)
return

api_key = args.api_key or load_roboflow_api_key(dest_workspace)
if not api_key:
output_error(
args,
"No API key found.",
hint="Set ROBOFLOW_API_KEY or run 'roboflow auth login'.",
exit_code=2,
)
return

try:
enqueued = rfapi.fork_project(api_key, dest_workspace, url=source)
except rfapi.RoboflowError as exc:
output_error(args, str(exc))
return

task_id = enqueued["taskId"]

if args.no_wait:
polling_url = enqueued.get("url")
text = f"Fork enqueued: taskId={task_id}"
if polling_url:
text += f"\nPoll: {polling_url}"
output(args, enqueued, text=text)
return

def _print_progress(status): # noqa: ANN001
if args.json:
return
progress = status.get("progress")
if not isinstance(progress, dict):
return
# Don't use `or` here: `current == 0` is a legitimate value.
current = progress["current"] if "current" in progress else progress.get("completed")
total = progress.get("total")
if current is not None and total is not None:
print(f"Task progress: {current}/{total}", flush=True)

try:
final = poll_until_terminal(
api_key,
dest_workspace,
task_id,
timeout=args.timeout,
on_update=_print_progress,
polling_url=enqueued.get("url"),
)
except rfapi.RoboflowError as exc:
output_error(args, str(exc))
return
except TimeoutError as exc:
output_error(args, str(exc))
return

if final.get("status") == "failed":
output_error(args, final.get("error") or "Fork task failed.")
return

project_url = (final.get("result") or {}).get("url", "")
text = f"Forked.\nDestination URL: {project_url}" if project_url else "Forked."
output(args, final, text=text)


def _health_project(args): # noqa: ANN001
import json

Expand Down
Loading
Loading