The Core module (osiris/core/) contains the fundamental business logic and orchestration components of Osiris.
osiris/core/
├── conversational_agent.py # LLM-driven conversation management
├── discovery.py # Database schema discovery
├── compiler_v0.py # OML to manifest compilation
├── config.py # Configuration management
├── driver.py # Driver protocol and registry
├── execution_adapter.py # Adapter abstract base class
├── adapter_factory.py # Adapter selection factory
├── session_logging.py # Structured logging system
├── state_store.py # SQLite session persistence
├── llm_adapter.py # Multi-provider LLM interface
├── secrets_masking.py # Automatic secret detection
├── cache_fingerprint.py # Discovery cache management
├── env_loader.py # Environment variable loading
└── error_taxonomy.py # Error classification system
Manages the LLM-driven conversation flow for pipeline creation.
Key Classes:
- ConversationalAgent - Main conversation orchestrator
- ChatState - FSM states (INIT, INTENT_CAPTURED, DISCOVERY, etc.)
- ConversationContext - Maintains conversation state
State Machine:
INIT → INTENT_CAPTURED → [DISCOVERY] → OML_SYNTHESIS → VALIDATE_OML → COMPILE
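The transitions above can be sketched as a small table-driven state machine. This is an illustration only, not the actual `ChatState` implementation: it includes just the states named in the diagram, the `TRANSITIONS` table and `advance` helper are hypothetical names, and the bracketed DISCOVERY step is assumed to be optional.

```python
from enum import Enum, auto


class ChatState(Enum):
    """States named in the diagram above (the real enum may define more)."""
    INIT = auto()
    INTENT_CAPTURED = auto()
    DISCOVERY = auto()
    OML_SYNTHESIS = auto()
    VALIDATE_OML = auto()
    COMPILE = auto()


# Hypothetical transition table. DISCOVERY is assumed optional, so
# INTENT_CAPTURED may also move straight to OML_SYNTHESIS.
TRANSITIONS = {
    ChatState.INIT: {ChatState.INTENT_CAPTURED},
    ChatState.INTENT_CAPTURED: {ChatState.DISCOVERY, ChatState.OML_SYNTHESIS},
    ChatState.DISCOVERY: {ChatState.OML_SYNTHESIS},
    ChatState.OML_SYNTHESIS: {ChatState.VALIDATE_OML},
    ChatState.VALIDATE_OML: {ChatState.COMPILE},
    ChatState.COMPILE: set(),
}


def advance(current: ChatState, target: ChatState) -> ChatState:
    """Return target if the move is allowed, otherwise raise ValueError."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.name} -> {target.name}")
    return target
```

Keeping the allowed moves in one table makes illegal jumps (for example INIT directly to COMPILE) fail loudly instead of silently corrupting the conversation state.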
Key Methods:
def process_message(self, message: str) -> AgentResponse:
    """Process user input and advance conversation"""

def generate_oml(self, context: dict) -> str:
    """Generate OML from conversation context"""

def validate_and_save(self, oml: str) -> bool:
    """Validate OML and save to output/"""

Intelligent database schema exploration with caching.
Key Classes:
- DiscoveryAgent - Manages discovery process
- SchemaCache - SQLite-based caching
- SchemaFingerprint - SHA-256 cache validation
Key Features:
- Progressive discovery (tables → columns → samples)
- Connection fingerprinting for cache validation
- Automatic cache invalidation on changes
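Fingerprint-based cache validation could look roughly like the sketch below. It is not the real `SchemaCache`: `connection_fingerprint` and `InMemorySchemaCache` are hypothetical names, the cache is a plain dictionary rather than SQLite, and hashing the whole connection config as canonical JSON is an assumption about what the fingerprint covers.

```python
import hashlib
import json


def connection_fingerprint(connection_config: dict) -> str:
    """Hash a connection config; key order does not affect the result."""
    canonical = json.dumps(connection_config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class InMemorySchemaCache:
    """Hypothetical stand-in for the SQLite-backed SchemaCache."""

    def __init__(self):
        self._entries = {}  # fingerprint -> cached schema

    def get(self, connection_config: dict):
        """Return the cached schema, or None if the config has changed."""
        return self._entries.get(connection_fingerprint(connection_config))

    def put(self, connection_config: dict, schema: dict) -> None:
        """Store a schema under the fingerprint of its connection config."""
        self._entries[connection_fingerprint(connection_config)] = schema
```

Because the lookup key is derived from the config itself, any change to host, port, or credentials produces a different key and the stale entry is simply never found.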
Example Usage:
discovery = DiscoveryAgent(connection_config)
schema = discovery.discover_schema() # Cached if unchanged
tables = discovery.get_tables()
columns = discovery.get_columns("users")

Compiles OML v0.1.0 to deterministic manifests.
Key Functions:
- compile_oml() - Main compilation entry
- validate_schema() - OML v0.1.0 validation
- resolve_connections() - Connection reference resolution
- generate_fingerprint() - SHA-256 manifest fingerprint
Compilation Process:
- Parse and validate OML
- Resolve connection references
- Generate step configurations
- Create deterministic manifest
- Calculate SHA-256 fingerprint
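The last two steps depend on a stable byte representation of the manifest. The sketch below shows one common way to get that, assuming the manifest is a JSON-serializable dict and that the fingerprint is stored under a `fingerprint` key (as the test later in this document suggests). `fingerprint_manifest` and `seal` are hypothetical helpers, not the compiler's actual functions.

```python
import hashlib
import json


def fingerprint_manifest(manifest: dict) -> str:
    """SHA-256 over canonical JSON, ignoring any fingerprint already present."""
    body = {k: v for k, v in manifest.items() if k != "fingerprint"}
    # sort_keys plus fixed separators gives the same bytes for equal content.
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def seal(manifest: dict) -> dict:
    """Return a copy of the manifest with its fingerprint attached."""
    return {**manifest, "fingerprint": fingerprint_manifest(manifest)}
```

Excluding the `fingerprint` key from the hashed body means a sealed manifest can be re-verified later by recomputing the hash and comparing.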
Defines the driver interface and registry system.
Protocol Definition:
class Driver(Protocol):
    def run(
        self,
        step_id: str,
        config: dict,
        inputs: dict | None,
        ctx: ExecutionContext
    ) -> dict:
        """Execute a pipeline step"""

Registry System:
class DriverRegistry:
    @classmethod
    def register(cls, name: str, driver: Driver):
        """Register a driver implementation"""

    @classmethod
    def get(cls, name: str) -> Driver:
        """Retrieve registered driver"""

Abstract base class for execution environments.
Interface:
class ExecutionAdapter(ABC):
    @abstractmethod
    def prepare(self, manifest: dict, context: ExecutionContext) -> PreparedRun:
        """Prepare execution package"""

    @abstractmethod
    def execute(self, prepared: PreparedRun, context: ExecutionContext):
        """Execute the pipeline"""

    @abstractmethod
    def collect(self, prepared: PreparedRun, context: ExecutionContext) -> dict:
        """Collect results and artifacts"""

Implementations:
- LocalAdapter - Local execution
- E2BTransparentProxy - E2B cloud execution
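To show how the three phases fit together, here is a toy adapter driven through prepare, execute, and collect. Everything beyond the three method names is an assumption made for the sketch: `PreparedRun` is reduced to two fields, the context is a plain dict, the manifest is assumed to hold a `steps` list of dicts with an `id`, and `EchoAdapter` and `run_pipeline` are hypothetical.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


@dataclass
class PreparedRun:
    """Hypothetical stand-in: the real PreparedRun likely carries more fields."""
    manifest: dict
    results: dict = field(default_factory=dict)


class ExecutionAdapter(ABC):
    """Same three-phase shape as the interface above, with a loosely typed context."""

    @abstractmethod
    def prepare(self, manifest: dict, context: dict) -> PreparedRun: ...

    @abstractmethod
    def execute(self, prepared: PreparedRun, context: dict) -> None: ...

    @abstractmethod
    def collect(self, prepared: PreparedRun, context: dict) -> dict: ...


class EchoAdapter(ExecutionAdapter):
    """Toy adapter that records each step id instead of running a driver."""

    def prepare(self, manifest, context):
        return PreparedRun(manifest=manifest)

    def execute(self, prepared, context):
        for step in prepared.manifest["steps"]:
            prepared.results[step["id"]] = "ok"

    def collect(self, prepared, context):
        return {"steps": dict(prepared.results)}


def run_pipeline(adapter: ExecutionAdapter, manifest: dict, context: dict) -> dict:
    """Drive the phases in order: prepare, execute, collect."""
    prepared = adapter.prepare(manifest, context)
    adapter.execute(prepared, context)
    return adapter.collect(prepared, context)
```

Since the caller only touches the abstract interface, swapping a local adapter for a remote one should not require changes in the calling code.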
Session-scoped logging with events and metrics.
Key Classes:
- SessionLogger - Main logging interface
- EventLogger - Structured event emission
- MetricsCollector - Performance metrics
Log Structure:
logs/run_XXXXXXXXXX/
├── events.jsonl # Structured events
├── metrics.jsonl # Performance metrics
├── artifacts/ # Generated files
└── manifest.json # Compiled manifest
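Writing to `events.jsonl` amounts to appending one JSON object per line. The helper below is a minimal sketch of that idea, not the `SessionLogger` API: `append_event` is a hypothetical function, and the `ts`, `session`, and `event` field names are taken from the event format documented in this section.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def append_event(session_dir: Path, session: str, event: str, **fields) -> dict:
    """Append one JSON object as a single line to <session_dir>/events.jsonl."""
    record = {
        "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "session": session,
        "event": event,
        **fields,  # extra keys such as step_id or driver
    }
    session_dir.mkdir(parents=True, exist_ok=True)
    with (session_dir / "events.jsonl").open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")
    return record
```

An append-only, line-delimited file can be tailed while a run is in progress and parsed line by line afterwards without loading the whole log.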
Event Format:
{
  "ts": "2025-01-01T12:00:00Z",
  "session": "run_1234567890",
  "event": "step_start",
  "step_id": "extract_data",
  "driver": "mysql.extractor"
}

Handles configuration loading with precedence.
Precedence Order:
- CLI arguments (highest)
- Environment variables
- Configuration files
- Defaults (lowest)
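The precedence order above maps naturally onto `collections.ChainMap`, which searches its mappings from first to last. This is a sketch of the layering idea only; `resolve_config` is a hypothetical helper, and treating `None` as "not set" is an assumption about how omitted CLI flags are represented.

```python
from collections import ChainMap


def resolve_config(cli: dict, env: dict, file: dict, defaults: dict) -> ChainMap:
    """Layer the sources so that earlier mappings shadow later ones."""
    # Drop unset (None) values so an omitted CLI flag falls through
    # to the next source instead of masking it.
    layers = [
        {k: v for k, v in layer.items() if v is not None}
        for layer in (cli, env, file, defaults)
    ]
    return ChainMap(*layers)
```

For example, with `cli={"target": None}`, `env={"target": "e2b"}`, and `file={"target": "local"}`, looking up `target` yields `"e2b"`, because the unset CLI value is skipped and the environment outranks the file.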
Key Classes:
- Config - Main configuration container
- ConnectionConfig - Connection settings
- RuntimeConfig - Execution settings
Unified interface for multiple LLM providers.
Supported Providers:
- OpenAI (GPT-4, GPT-4o)
- Anthropic (Claude 3.5)
- Google (Gemini)
Interface:
class LLMAdapter:
    def complete(self, prompt: str, **kwargs) -> str:
        """Generate completion"""

    def stream(self, prompt: str, **kwargs) -> Iterator[str]:
        """Stream completion chunks"""

SQLite-based session state management.
Key Features:
- Conversation history persistence
- Discovery cache storage
- Session recovery on interruption
- Automatic cleanup of old sessions
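Persistence and recovery of this kind can be sketched with the standard `sqlite3` module. The table layout, the `SessionStore` class, and its `set`/`get` methods are all hypothetical; the actual schema in state_store.py is not described here.

```python
import json
import sqlite3


class SessionStore:
    """Hypothetical key/value store with one row per (session, key)."""

    def __init__(self, path: str = ":memory:"):
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS state ("
            "session_id TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, "
            "PRIMARY KEY (session_id, key))"
        )

    def set(self, session_id: str, key: str, value) -> None:
        """Insert or overwrite a JSON-serializable value."""
        with self._db:  # commits on success, rolls back on error
            self._db.execute(
                "INSERT OR REPLACE INTO state VALUES (?, ?, ?)",
                (session_id, key, json.dumps(value)),
            )

    def get(self, session_id: str, key: str, default=None):
        """Return the stored value, or default if the key is absent."""
        row = self._db.execute(
            "SELECT value FROM state WHERE session_id = ? AND key = ?",
            (session_id, key),
        ).fetchone()
        return json.loads(row[0]) if row else default
```

Pointing `path` at a file instead of `:memory:` is what would let an interrupted session reload its conversation history on restart.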
Automatic detection and masking of sensitive data.
Detection Methods:
- Pattern-based (regex)
- Entropy analysis
- Known secret formats
- Connection password detection
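Two of these methods, pattern matching and entropy analysis, can be illustrated in a few lines. The regex, the list of sensitive key names, and the helper names `shannon_entropy` and `mask_key_values` are assumptions made for this sketch; they are not the rules used by secrets_masking.py.

```python
import math
import re
from collections import Counter

# Hypothetical pattern: key=value pairs whose key looks sensitive.
SENSITIVE_KV = re.compile(r"(?i)\b(password|passwd|secret|token|api_key)=([^\s&;]+)")


def shannon_entropy(text: str) -> float:
    """Bits per character; higher for random-looking strings."""
    if not text:
        return 0.0
    counts = Counter(text)
    return -sum((n / len(text)) * math.log2(n / len(text)) for n in counts.values())


def mask_key_values(text: str) -> str:
    """Replace the value of each sensitive key=value pair with ***."""
    return SENSITIVE_KV.sub(lambda m: f"{m.group(1)}=***", text)
```

Pattern matching catches secrets that are labelled as such, while an entropy threshold (not chosen here) can flag unlabelled tokens that look random, at the cost of some false positives.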
Masking:
mask_secrets("password=secret123")
# Returns: "password=***"

Standardized error types and handling.
Error Categories:
- ConfigurationError - Invalid configuration
- CompilationError - OML compilation failures
- ExecutionError - Runtime failures
- ConnectionError - Database connectivity
- ValidationError - Schema validation
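One way to organize such categories is a shared base exception that carries structured context. The sketch below is an assumption about shape, not the contents of error_taxonomy.py: the `OsirisError` base class, the `category` strings, and `to_event` are hypothetical, and only two of the categories are shown.

```python
class OsirisError(Exception):
    """Hypothetical common base carrying a category and extra context."""

    category = "unknown"

    def __init__(self, message: str, **context):
        super().__init__(message)
        self.context = context

    def to_event(self) -> dict:
        """Flatten the error into a dict suitable for a structured log event."""
        return {"category": self.category, "message": str(self), **self.context}


class ConfigurationError(OsirisError):
    category = "configuration"


class ExecutionError(OsirisError):
    category = "execution"
```

A caller can then catch the base class once and still report which category failed, for example `ExecutionError("step failed", step_id="extract_data").to_event()`. Note that a project-level `ConnectionError` would shadow Python's built-in exception of the same name wherever it is imported.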
Used in adapter_factory.py for adapter selection:
adapter = AdapterFactory.create(target="e2b")  # or "local"

Used in driver.py for component registration:
DriverRegistry.register("mysql.extractor", MySQLExtractor())
driver = DriverRegistry.get("mysql.extractor")

Used in llm_adapter.py for provider selection:
llm = LLMAdapter.create(provider="openai", model="gpt-4")

Passed to drivers and adapters:
@dataclass
class ExecutionContext:
    session_id: str
    working_dir: Path
    artifacts_dir: Path
    logger: SessionLogger

    def log_event(self, event: str, data: dict):
        """Log structured event"""

    def log_metric(self, name: str, value: float):
        """Log performance metric"""

- Use protocols for interfaces (typing.Protocol)
- Emit structured events for observability
- Handle errors gracefully with context
- Cache expensive operations (discovery)
- Validate early (configuration, OML)
- Mask secrets in all outputs
- Use type hints throughout
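The first practice, protocols for interfaces, pairs well with the registry pattern: a driver only needs a matching `run()` method, not a shared base class. The sketch below simplifies the signatures shown earlier (the context is typed as `Any`), and `UppercaseDriver`, the `demo.upper` name, the `rows` key, and the `ValueError` on a missing name are all hypothetical choices.

```python
from typing import Any, Optional, Protocol


class Driver(Protocol):
    """Structural interface: any object with a matching run() qualifies."""

    def run(self, step_id: str, config: dict, inputs: Optional[dict], ctx: Any) -> dict: ...


class DriverRegistry:
    """Simplified registry keyed by driver name."""

    _drivers: dict = {}

    @classmethod
    def register(cls, name: str, driver: Driver) -> None:
        cls._drivers[name] = driver

    @classmethod
    def get(cls, name: str) -> Driver:
        try:
            return cls._drivers[name]
        except KeyError:
            raise ValueError(f"no driver registered as {name!r}") from None


class UppercaseDriver:
    """Hypothetical driver; note that it does not inherit from Driver."""

    def run(self, step_id, config, inputs, ctx):
        rows = (inputs or {}).get("rows", [])
        return {"rows": [str(r).upper() for r in rows]}


DriverRegistry.register("demo.upper", UppercaseDriver())
result = DriverRegistry.get("demo.upper").run("s1", {}, {"rows": ["a", "b"]}, None)
```

A static type checker accepts `UppercaseDriver` wherever `Driver` is expected because the method signature is compatible, which keeps third-party drivers free of any import dependency on the core module.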
def test_driver_registration():
    driver = MockDriver()
    DriverRegistry.register("test.driver", driver)
    retrieved = DriverRegistry.get("test.driver")
    assert retrieved is driver

def test_compilation_flow():
    oml = load_fixture("pipeline.yaml")
    manifest = compile_oml(oml)
    assert manifest["fingerprint"]
    assert len(manifest["steps"]) > 0

- Parallel step execution
- Streaming data processing
- Advanced caching strategies
- Plugin system for drivers
- Distributed execution support