Major refactor of the backend, queue, activity, worker, and database layers. If you're upgrading from 0.1.x, read the Breaking Changes section closely.
Fully async database layer
- `configure_engine()` and `get_session()` now require an async SQLAlchemy engine (`AsyncEngine`) and return `AsyncSession`
- Database URLs must use async drivers (e.g. `sqlite+aiosqlite://`, `postgresql+asyncpg://`)
- `sqlalchemy[asyncio]` is now a core dependency
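Upgraders with existing sync connection strings only need to swap the driver segment of the URL. A minimal sketch of that rewrite (the `to_async_url` helper and its driver map are illustrative, not part of agentexec):

```python
# Hypothetical helper (not agentexec API): map a sync database URL to the
# async-driver form that configure_engine() now requires.
ASYNC_DRIVERS = {
    "sqlite": "sqlite+aiosqlite",
    "postgresql": "postgresql+asyncpg",
}

def to_async_url(url: str) -> str:
    """Rewrite the scheme of a sync SQLAlchemy URL to its async driver."""
    scheme, sep, rest = url.partition("://")
    if not sep:
        raise ValueError(f"not a database URL: {url!r}")
    base = scheme.split("+", 1)[0]  # drop any existing driver suffix
    if base not in ASYNC_DRIVERS:
        raise ValueError(f"no known async driver for {base!r}")
    return f"{ASYNC_DRIVERS[base]}://{rest}"

print(to_async_url("postgresql://app:secret@db/prod"))
# postgresql+asyncpg://app:secret@db/prod
```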
Async activity API
- All activity functions are async: `await ax.activity.create(...)`, `await ax.activity.update(...)`, `await ax.activity.complete(...)`, `await ax.activity.error(...)`
- `activity.list()`, `activity.detail()`, and `activity.count_active()` are async and accept `AsyncSession`
- Activity handlers are async (`async def __call__`)
- The `session` parameter was removed from activity mutations — the handler owns its own session lifecycle
Pool entry point
- `pool.run()` was removed. Use `await pool.start()` in an asyncio loop, or the new `agentexec run mymodule:pool` CLI
- `AGENTEXEC_QUEUE_NAME` renamed to `AGENTEXEC_QUEUE_PREFIX` (old name still accepted as an alias)
- `agentexec.state.redis_backend` renamed to `agentexec.state.redis` — update `AGENTEXEC_STATE_BACKEND` if set explicitly
Task context serialization
- `Task.context` is now `Mapping[str, Any]` (raw dict), not a typed BaseModel — hydration happens at execution time
- `Task.create()` is now async
Queue backend protocol
- `BaseQueueBackend.push()` signature changed from `high_priority: bool` to `priority: Priority | None` — affects Redis, Kafka, and any custom queue backend
Removed APIs
- `set_global_session` / `get_global_session` / `remove_global_session` — use `configure_engine` / `get_session`
- `state.backend.publish` / `subscribe` (pubsub), `index_add` / `index_range` / `index_remove`, `clear`, `configure`
- `worker/logging.py` and `core/logging.py` — all modules use stdlib `logging.getLogger(__name__)` directly
CLI entrypoint
- New `agentexec` CLI command: `agentexec run mymodule:pool --create-tables --workers 4`
Partitioned Redis queues
- Tasks with `lock_key` route to dedicated partition queues with per-partition locking and SCAN-based fair dequeue
Activity handler pattern
- Pluggable persistence via `PostgresHandler` (default) and `IPCHandler` (worker processes)
Task retry
- Failed tasks requeue as high priority, up to `AGENTEXEC_MAX_TASK_RETRIES` attempts (default 3)
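The retry behavior amounts to a head-of-queue requeue with a bounded budget. A rough sketch with a plain `deque` standing in for the real queue backend (`requeue_failed` and the task dict shape are hypothetical, not agentexec API):

```python
from collections import deque

MAX_TASK_RETRIES = 3  # mirrors the AGENTEXEC_MAX_TASK_RETRIES default

queue: deque[dict] = deque()

def requeue_failed(task: dict) -> bool:
    """Requeue a failed task at the head of the queue (high priority)
    until its retry budget is spent; return True if it was requeued."""
    if task["retries"] >= MAX_TASK_RETRIES:
        return False  # budget exhausted: give up on this task
    task["retries"] += 1
    queue.appendleft(task)  # high priority: jumps ahead of waiting tasks
    return True
```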
Kafka backend (experimental)
- `pip install agentexec[kafka]` enables queue and schedule backends over Kafka
Typed worker IPC
- `TaskFailed` and `ActivityEvent` messages flow over `multiprocessing.Queue` with pydantic validation
Schedule composite keys
- `{task_name}:{cron}:{context_hash}` for unique schedule identity
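Assuming the context hash is derived from a canonical serialization of the context (the sha256-over-sorted-JSON scheme below is a guess, not agentexec's actual hashing), the composite key can be sketched as:

```python
import hashlib
import json

def schedule_key(task_name: str, cron: str, context: dict) -> str:
    """Build a {task_name}:{cron}:{context_hash} identity. Sorting keys
    before hashing makes the hash stable regardless of dict insertion order."""
    canonical = json.dumps(context, sort_keys=True, separators=(",", ":"))
    context_hash = hashlib.sha256(canonical.encode()).hexdigest()[:12]
    return f"{task_name}:{cron}:{context_hash}"
```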
Activity model create() classmethod
- `Activity.create()` encapsulates record creation and the initial log entry in one async call
Async engine disposal
- `dispose_engine()` ensures the async engine's background threads exit cleanly on shutdown
Worker pool refactor
- Workers use the `spawn` multiprocessing start method with explicit context — no inherited state
- Event handling and scheduling extracted into `_EventHandler` and `_Scheduler` classes
- `StateEvent` replaced with stdlib `multiprocessing.Event` — removes dependency on the state backend for shutdown coordination
- Class-based backend architecture with ABCs (`BaseStateBackend`, `BaseQueueBackend`, `BaseScheduleBackend`)
- `Task` is pure data; `TaskDefinition` owns behavior
- Status enum extracted to `activity/status.py` (no SQLAlchemy dependency)
Logging
- All modules use stdlib `logging.getLogger(__name__)`
- Spawned workers bootstrap a `StreamHandler` on the root logger so logs reach stderr
- Pool messages use `logger.info` / `logger.error` instead of `print()`
- Orphaned worker processes on shutdown. SIGTERM (systemd / `docker stop`), SIGKILL, and SIGHUP were leaving worker processes running. Fixed via an asyncio SIGTERM handler in the CLI and `prctl(PR_SET_PDEATHSIG)` in each worker so the kernel terminates workers when the pool dies
- Worker and scheduler error loops throttled. Infra failures (e.g. Redis unreachable) were producing 100k+ log lines per second. Added a 1s sleep after outer-loop exceptions
- Unregistered task name crash. The worker now logs an error and skips the task instead of crashing when it receives a task for an unknown name
- Failed tasks now log full tracebacks via `logger.exception` instead of `logger.error`
- Kafka consumer handles `None` message values without crashing
- `ActivityUpdated.status` is a `Status` enum instead of a raw string
- Full documentation sweep for the async API — connection strings, CLI usage, and `await` on activity calls across all guides and API references
Scheduled tasks with cron expressions
- `@pool.schedule("task_name", "*/5 * * * *")` decorator registers and schedules a task in one step
- `pool.add_schedule()` for imperative scheduling of already-registered tasks
- Cron expressions evaluated in a configurable timezone (`AGENTEXEC_SCHEDULER_TIMEZONE`, default UTC)
- Repeat budget: `-1` for forever (default), `0` for one-shot, `N` for N more executions
- Scheduler runs automatically inside `pool.run()` — no extra setup needed
- Idempotent registration: keyed by task name, so restarts and multiple pool instances overwrite instead of duplicating
- Clock-drift resilient: next run computed from the intended anchor time, not the wall clock
- Skips missed intervals after downtime instead of enqueuing a burst of catch-up tasks
- New `croniter` dependency for cron expression parsing
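The drift-resilient, no-catch-up-burst behavior amounts to advancing from the intended anchor by whole intervals. A sketch with a fixed interval standing in for real cron evaluation (agentexec does that part with croniter; `next_run` is illustrative):

```python
from datetime import datetime, timedelta

def next_run(anchor: datetime, interval: timedelta, now: datetime) -> datetime:
    """Return the first scheduled time at or after `now`, computed from the
    intended anchor rather than the wall clock. Intervals missed during
    downtime are skipped instead of being enqueued as a burst."""
    if now <= anchor:
        return anchor
    missed = (now - anchor) // interval + 1  # whole intervals already elapsed
    return anchor + missed * interval
```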
State backend sorted set operations
- Added `zadd()`, `zrangebyscore()`, and `zrem()` to the `StateBackend` protocol and the Redis implementation
- Used internally by the scheduler for efficient due-task polling
Task-level distributed locking
- New `lock_key` parameter on `@pool.task()` and `pool.add_task()` for sequential execution of tasks sharing state
- String template evaluated against context fields (e.g., `lock_key="user:{user_id}"`)
- Workers acquire a Redis lock before execution; tasks requeue automatically on contention
- Lock released in a `finally` block on completion or error
- Configurable TTL via `AGENTEXEC_LOCK_TTL` (default 1800s) as a safety net against worker process death
- Note: strict FIFO ordering is not guaranteed between tasks sharing the same lock key
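The template evaluation against context fields can be sketched with plain `str.format` (the real lock-key evaluation inside agentexec may differ; `evaluate_lock_key` is illustrative):

```python
def evaluate_lock_key(template: str, context: dict) -> str:
    """Fill {field} placeholders in a lock_key template from the task's
    context fields. Extra context fields are simply ignored; a missing
    placeholder field raises KeyError."""
    return template.format(**context)

print(evaluate_lock_key("user:{user_id}", {"user_id": 42, "action": "sync"}))
# user:42
```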
Activity metadata for multi-tenancy
- Attach arbitrary metadata when creating activities (e.g., `metadata={"organization_id": "org-123"}`)
- Filter activities by metadata in `activity.list()` and `activity.detail()`
- Metadata is accessible as an attribute for programmatic use but excluded from API serialization by default, preventing accidental tenant info leakage
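The matching semantics can be illustrated over plain dicts (agentexec applies the filter in the database query; `matches` below is only a sketch of the behavior):

```python
def matches(activity_meta: dict, filters: dict) -> bool:
    """An activity matches when every filter key/value pair is present
    in its metadata; extra metadata keys do not affect the match."""
    return all(activity_meta.get(k) == v for k, v in filters.items())

activities = [
    {"id": 1, "metadata": {"organization_id": "org-123"}},
    {"id": 2, "metadata": {"organization_id": "org-456"}},
]
visible = [a for a in activities
           if matches(a["metadata"], {"organization_id": "org-123"})]
```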
Redis cleanup on shutdown
- `state.clear_keys()` removes all agentexec-prefixed keys and the task queue on shutdown
- Prevents stale tasks from being picked up on restart
State backend lock primitives
- Added `acquire_lock()` and `release_lock()` to the `StateBackend` protocol
- Redis implementation uses atomic `SET NX EX` / `DELETE`
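The `SET NX EX` / `DELETE` semantics can be emulated in memory to show what `acquire_lock()` / `release_lock()` guarantee (this stand-in is not the real backend, and unlike Redis it is not atomic across processes):

```python
import time

# key -> expiry timestamp; an in-memory stand-in for Redis keyspace
_locks: dict[str, float] = {}

def acquire_lock(key: str, ttl: float) -> bool:
    """SET key NX EX ttl: succeed only if no live lock exists; the TTL
    ensures the lock self-expires if the holding worker dies."""
    now = time.monotonic()
    if key in _locks and _locks[key] > now:
        return False  # NX: another holder still owns a live lock
    _locks[key] = now + ttl  # EX: expiry acts as the safety net
    return True

def release_lock(key: str) -> None:
    """DELETE key: releasing an unheld lock is a no-op."""
    _locks.pop(key, None)
```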
- Added `test_task_locking.py` with 16 tests covering lock acquisition, release, requeue, and template evaluation
- Fixed `ty` type checker errors in `test_activity_tracking.py` (added narrowing guards for `Activity | None`)
Public Pool.add_task() method
- `Pool.add_task()` is now public (was `_add_task()`)
- Alternative to the `@pool.task()` decorator for programmatic task registration
- Includes a comprehensive docstring with usage examples
Enhanced type safety for TaskHandler protocols
- Added generic type parameters (`ContextT`, `ResultT`) to `_SyncTaskHandler` and `_AsyncTaskHandler`
- Better IDE autocomplete and type checking support
- Added comprehensive type checking tests in `test_task_types.py`
Activity tracking improvements
- Made the `percentage` field optional (`int | None`) in activity schemas
- More flexible activity percentage tracking
Configuration robustness
- Added `extra="ignore"` to the config model for better forward compatibility
Database URL handling
- Fixed database URL rendering to properly handle password visibility
- Uses `engine.url.render_as_string(hide_password=False)` instead of `str(engine.url)`
Tracker commit
- Added a missing `db.commit()` call in the activity tracker
Type checking tests
- Added `test_task_types.py` for validating TaskHandler protocol compatibility
- Covers sync/async functions and class methods
Renamed WorkerPool to Pool
- `ax.WorkerPool` is now `ax.Pool` for a cleaner API
- Update imports: `from agentexec import Pool`
Activity percentage field renamed
- `completion_%` renamed to `percentage` for cleaner field naming
Pipelines run on workers
- Pipelines can now be executed on worker processes
- Register pipelines with the pool and enqueue them like tasks
Tracker for stateful counters
- New `Tracker` class for managing stateful counters across workers
- Useful for tracking progress, metrics, and distributed state
Strict Pipeline type flow validation
- All step parameters and return types must be `BaseModel` subclasses
- Type flow between consecutive steps is validated at runtime
- Tuple returns are unpacked and matched to the next step's parameters
- The final step must return a single `BaseModel` (not a tuple)
- Empty pipelines raise `RuntimeError` at class definition time
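The tuple-unpacking and count-checking rules can be sketched with plain classes standing in for the required `BaseModel` subclasses (`validate_flow` is illustrative, not agentexec's actual validator):

```python
import inspect

class A: ...
class B: ...

def step_one() -> tuple[A, B]:
    return A(), B()

def step_two(a: A, b: B) -> A:
    return a

def validate_flow(prev_result, next_step):
    """Unpack tuple returns and check each element against the next step's
    annotated parameter types, mirroring the runtime validation described."""
    args = prev_result if isinstance(prev_result, tuple) else (prev_result,)
    params = list(inspect.signature(next_step).parameters.values())
    if len(args) != len(params):
        raise TypeError("step output count does not match next step's parameters")
    for value, param in zip(args, params):
        if not isinstance(value, param.annotation):
            raise TypeError(f"{param.name} expects {param.annotation.__name__}")
    return args

result = step_two(*validate_flow(step_one(), step_two))
```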
Type checking with ty
- Added the `ty` type checker to the development workflow
- Better Protocol definitions for step handlers
- Improved type hints throughout the pipeline module
Better Pipeline flow tests
- Comprehensive test coverage for valid and invalid type flows
- Tests for tuple unpacking, subclass compatibility, count mismatches
- Tests for primitive type rejection and edge cases
Self-describing JSON serialization replaces pickle
- Task results now use JSON serialization with embedded type information (self-describing, as pickled results were)
- Automatically stores the fully qualified class name with the data for type reconstruction
- No longer requires the `TaskDefinition` registry for result deserialization
- `ax.gather()` now works with tasks created via `ax.enqueue()` without pool context
- Migration: clear Redis or wait for TTL expiry on old pickled results
TaskHandler Protocol enforces BaseModel returns
- Task handlers must return a Pydantic `BaseModel` instance (not `None` or arbitrary objects)
- The return type is automatically inferred and validated at registration time
- Enables type-safe result retrieval and automatic serialization
State backend abstraction
- Introduced the `StateBackend` Protocol for pluggable state storage implementations
- The current Redis implementation moved to `agentexec.state.redis_backend`
- Backend modules are verified against the protocol at import time via `cast()`
- Prepares the foundation for alternative backends (in-memory, DynamoDB, etc.)
Improved async patterns
- `brpop()` is now a proper async function (was sync returning a coroutine)
- Consistent async/await usage across state operations
- Better type hints and IDE support
Enhanced type safety
- `TaskHandler` Protocol with support for both sync and async handlers
- Proper type annotations for all state backend operations
- `serialize()` and `deserialize()` are type-enforced for `BaseModel` only
Comprehensive documentation added
- API reference for core modules (activity, pipeline, runner, task)
- Conceptual guides (architecture, task lifecycle, worker pool)
- Deployment guides (Docker, production best practices)
- Usage guides (basic usage, pipelines, FastAPI integration, OpenAI runner)
- Getting started (installation, quickstart, configuration)
- Contributing guide
React frontend and component library
- Added the `agentexec-ui` npm package with reusable React components
- Pre-built UI for agent monitoring and activity tracking
- TanStack Query integration for real-time updates
- React Router for navigation between agent list and detail views
Docker deployment
- Docker worker image for containerized deployments
- GitHub Actions for automated Docker image publishing to GitHub Container Registry
- GitHub Actions for automated npm publishing of UI components
Comprehensive test coverage
- Achieved 89% code coverage
- Added unit tests for all core modules:
  - State backend and serialization (`test_state.py`, `test_state_backend.py`)
  - Self-describing results (`test_self_describing_results.py`)
  - Activity tracking schemas (`test_activity_schemas.py`)
  - Pipeline orchestration (`test_pipeline.py`)
  - Task queue operations (`test_queue.py`)
  - Worker events and logging (`test_worker_event.py`, `test_worker_logging.py`)
  - Database operations (`test_db.py`)
  - Configuration (`test_config.py`)
Redis client refactoring
- Removed `core/redis_client.py` in favor of the state backend abstraction
- Lazy connection initialization for both async and sync Redis clients
- Proper connection cleanup in `backend.close()`
Key formatting consistency
- All state keys use a consistent `agentexec:` prefix via `backend.format_key()`
- Results: `agentexec:result:{agent_id}`
- Events: `agentexec:event:{name}:{id}`
- Logs channel: `agentexec:logs`
Standardized function signatures
- `get_result()` and `gather()` return `BaseModel` directly (not JSON strings)
- Consistent parameter ordering across state module functions
- Better docstrings with type information
Pipelines
- Multi-step workflow orchestration with `ax.Pipeline`
- Define steps with the `@pipeline.step(order)` decorator
- Parallel task execution with `ax.gather()`
- Result retrieval with `ax.get_result()`
Worker logging via Redis pubsub
- Workers publish logs to Redis, collected by main process
- Use `pool.run()` to see worker logs in real time
Reorganized worker module
- Worker code moved to the `agentexec.worker` subpackage
- `RedisEvent` for cross-process shutdown coordination
- `get_worker_logger()` configures logging and returns the logger in one call
Refactored Redis client usage
- Added `get_redis_sync()` for synchronous Redis operations
- Separate sync/async Redis clients for different contexts
Async enqueue() function
- `ax.enqueue()` is now async and must be awaited: `task = await ax.enqueue("task_name", MyContext(key="value"))`
Type-safe context with Pydantic BaseModel
- Task context must be a Pydantic `BaseModel` instead of a raw `dict`
- The context class is automatically inferred from handler type hints:

```python
class ResearchContext(BaseModel):
    company: str

@pool.task("research")
async def research(agent_id: UUID, context: ResearchContext):
    company = context.company  # Type-safe with IDE autocomplete
```
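The inference step can be sketched with `typing.get_type_hints`: read the annotation on the handler's `context` parameter (a guess at the mechanism; `infer_context_class` and the plain stand-in class are not agentexec API):

```python
import typing
from uuid import UUID

class ResearchContext:
    """Stands in for a pydantic BaseModel context class."""
    company: str

async def research(agent_id: UUID, context: ResearchContext):
    ...

def infer_context_class(handler) -> type:
    """Resolve the handler's annotations and return the class annotated
    on its 'context' parameter."""
    hints = typing.get_type_hints(handler)
    hints.pop("return", None)
    return hints["context"]
```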
Redis URL now required
- `redis_url` defaults to `None` and must be explicitly configured via `REDIS_URL`
- Prevents accidental connections to wrong Redis instances
Configurable activity messages
- Activity status messages are configurable via environment variables:

```shell
AGENTEXEC_ACTIVITY_MESSAGE_CREATE="Waiting to start."
AGENTEXEC_ACTIVITY_MESSAGE_STARTED="Task started."
AGENTEXEC_ACTIVITY_MESSAGE_COMPLETE="Task completed successfully."
AGENTEXEC_ACTIVITY_MESSAGE_ERROR="Task failed with error: {error}"
```
Improved Task architecture
- `Task` is now the primary execution object with an `execute()` method
- `TaskDefinition` handles registration metadata and context class inference
- Full lifecycle management (QUEUED → RUNNING → COMPLETE/ERROR) encapsulated in `Task.execute()`
Better SQLAlchemy session management
- New `scoped_session` pattern for worker processes
- Proper session cleanup on worker shutdown
- Switched to the async Redis client (`redis.asyncio`)
- Consolidated cleanup code in the worker `_run()` method
- Removed unused `debug` config option
Initial release.