from pathlib import Path

from memabra.candidate_types import CandidateObject
from memabra.execution import ExecutionEngine, MemoryExecutor, ToolExecutor
from memabra.memory_store import InMemoryMemoryStore, MemoryRecord, MemorySource
from memabra.persistence import PersistenceStore
from memabra.retrieval import CandidateRetriever, InMemoryCandidateProvider
from memabra.router import RouteDecision, RuleBasedRouter, TaskContext
from memabra.runner import MemabraRunner
from memabra.schemas import SchemaRegistry


class FailingToolBackend:
    """Tool backend stub that reports an error result for every call."""

    def run_tool(self, tool_id: str, context: TaskContext, params: dict | None = None) -> dict:
        """Return an error result naming ``tool_id``; ``context`` and ``params`` are unused."""
        failure_message = f"{tool_id} failed"
        return {"status": "error", "output": None, "error": failure_message, "latency_ms": 123}


class MixedResultToolBackend:
    """Tool backend stub that succeeds only for the tool id ``"tool-ok"``."""

    def run_tool(self, tool_id: str, context: TaskContext, params: dict | None = None) -> dict:
        """Return a success result for ``"tool-ok"`` and an error result for any other id."""
        if tool_id != "tool-ok":
            return {"status": "error", "output": None, "error": f"{tool_id} failed", "latency_ms": 100}
        return {"status": "success", "output": "ok", "error": None, "latency_ms": 50}


class StaticSkillBackend:
    """Skill backend stub that returns the same instructions for every skill id."""

    def load_skill(self, skill_id: str) -> dict:
        """Return a payload echoing ``skill_id`` together with fixed instructions."""
        return dict(
            skill_id=skill_id,
            instructions="Follow the documented deployment workflow.",
        )


def _single_candidate_retriever(candidate_type: str, candidate: CandidateObject) -> CandidateRetriever:
    """Build a retriever backed by one in-memory provider holding one candidate."""
    provider = InMemoryCandidateProvider(candidate_type=candidate_type, candidates=[candidate])
    return CandidateRetriever([provider])


def test_execution_engine_marks_memory_used_and_runner_persists(tmp_path: Path):
    """Run a memory scenario and check injection, usage stamping, and persistence.

    The run must emit a ``memory_injected`` event, leave ``last_used_at`` set on
    the stored record, and produce a trajectory that can be loaded back by id.
    """
    memory_store = InMemoryMemoryStore()
    telegram_preference = MemoryRecord(
        id="mem-telegram-pref",
        memory_type="semantic",
        fact_status="verified",
        content="Prefer plain text on Telegram.",
        summary="Telegram preference",
        source=MemorySource(kind="user", ref="session-1"),
        confidence=0.95,
    )
    memory_store.upsert(telegram_preference)

    retriever = _single_candidate_retriever(
        "memory",
        CandidateObject(
            id="mem-telegram-pref",
            type="memory",
            title="Telegram preference",
            summary="Prefer plain text on Telegram.",
            triggers=["telegram", "preference"],
            confidence=0.95,
            success_rate=0.9,
            freshness=0.9,
        ),
    )
    persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
    router = RuleBasedRouter()
    engine = ExecutionEngine(memory_executor=MemoryExecutor(memory_store=memory_store))
    runner = MemabraRunner(
        retriever=retriever,
        router=router,
        execution_engine=engine,
        persistence_store=persistence,
        memory_store=memory_store,
    )

    task = TaskContext(user_input="Use my telegram preference for this answer.")
    trajectory = runner.run(context=task, channel="telegram", user_id="oza", persist=True)

    SchemaRegistry().validate_trajectory(trajectory)
    assert any(evt["event_type"] == "memory_injected" for evt in trajectory["events"])
    assert memory_store.get("mem-telegram-pref").last_used_at is not None
    trajectory_id = trajectory["trajectory_id"]
    reloaded = persistence.load_trajectory(trajectory_id)
    assert reloaded["trajectory_id"] == trajectory_id


def test_persistence_store_round_trip_memory_record(tmp_path: Path):
    """Save one memory record, load it back by id, and expect exactly one stored path."""
    artifacts_dir = tmp_path / "artifacts"
    persistence = PersistenceStore(base_dir=artifacts_dir)
    concise_preference = MemoryRecord(
        id="mem-1",
        memory_type="semantic",
        fact_status="assumed",
        content="User likes concise replies.",
        summary="Concise reply preference",
        source=MemorySource(kind="user", ref="session-2"),
        confidence=0.7,
    )

    persistence.save_memory_record(concise_preference)
    loaded = persistence.load_memory_record("mem-1")

    assert loaded["id"] == "mem-1"
    stored_paths = persistence.list_memory_paths()
    assert len(stored_paths) == 1


def test_runner_records_tool_failures_in_outcome_and_reward(tmp_path: Path):
    """Run with an always-failing tool backend and check outcome and reward fields.

    Expects a ``failure`` outcome with one tool error, positive ``tool_error``
    and ``latency`` reward components, and a ``tool_result`` event.
    """
    retriever = _single_candidate_retriever(
        "tool",
        CandidateObject(
            id="tool-terminal",
            type="tool",
            title="terminal",
            summary="Run terminal commands.",
            triggers=["check", "current"],
            confidence=0.95,
            success_rate=0.9,
            freshness=1.0,
        ),
    )
    persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
    router = RuleBasedRouter()
    engine = ExecutionEngine(tool_backend=FailingToolBackend())
    runner = MemabraRunner(
        retriever=retriever,
        router=router,
        execution_engine=engine,
        persistence_store=persistence,
    )

    task = TaskContext(user_input="Check the current status.")
    trajectory = runner.run(context=task, channel="telegram", persist=True)

    outcome = trajectory["outcome"]
    assert outcome["status"] == "failure"
    assert outcome["tool_errors"] == 1
    reward_components = trajectory["reward"]["components"]
    assert reward_components["tool_error"] > 0
    assert reward_components["latency"] > 0
    assert any(evt["event_type"] == "tool_result" for evt in trajectory["events"])


def test_runner_loads_skill_payload_from_backend():
    """Run a skill scenario and check the first ``skill_loaded`` event's instructions."""
    retriever = _single_candidate_retriever(
        "skill",
        CandidateObject(
            id="skill-deploy",
            type="skill",
            title="deploy workflow",
            summary="Reusable deployment procedure.",
            triggers=["deploy", "workflow"],
            confidence=0.9,
            success_rate=0.95,
            freshness=0.8,
        ),
    )
    router = RuleBasedRouter()
    engine = ExecutionEngine(skill_backend=StaticSkillBackend())
    runner = MemabraRunner(retriever=retriever, router=router, execution_engine=engine)

    task = TaskContext(user_input="Deploy this service with the usual workflow.")
    trajectory = runner.run(context=task)

    skill_events = [evt for evt in trajectory["events"] if evt["event_type"] == "skill_loaded"]
    assert skill_events
    first_payload = skill_events[0]["payload"]
    assert first_payload["instructions"] == "Follow the documented deployment workflow."
def test_runner_detects_partial_success_for_mixed_tool_results():
    """Force one succeeding and one failing tool call and expect ``partial_success``.

    A stub router bypasses rule-based routing so both tools are always selected.
    The outcome must report one tool error, and the ``tool_error`` and
    ``context_cost`` reward components must both be positive.
    """

    class BothToolsRouter:
        """Router stub that always selects ``tool-ok`` and ``tool-fail``."""

        def choose(self, context, memory, skill, tool):
            """Return a fixed ``call_tool`` decision; all arguments are ignored."""
            # RouteDecision comes from the module-level import; no local
            # re-import is needed here.
            return RouteDecision(
                decision_type="call_tool",
                selected_ids=["tool-ok", "tool-fail"],
                selected_payloads=[{}, {}],
                rationale="Force both tools for testing.",
            )

    retriever = CandidateRetriever(
        [
            InMemoryCandidateProvider(
                candidate_type="tool",
                candidates=[
                    CandidateObject(
                        id="tool-ok",
                        type="tool",
                        title="ok tool",
                        summary="Always succeeds.",
                        triggers=["check", "current"],
                        confidence=0.95,
                        success_rate=0.9,
                        freshness=1.0,
                    ),
                    CandidateObject(
                        id="tool-fail",
                        type="tool",
                        title="failing tool",
                        summary="Always fails.",
                        triggers=["check", "current"],
                        confidence=0.9,
                        success_rate=0.5,
                        freshness=1.0,
                    ),
                ],
            )
        ]
    )
    runner = MemabraRunner(
        retriever=retriever,
        router=BothToolsRouter(),
        execution_engine=ExecutionEngine(tool_backend=MixedResultToolBackend()),
    )

    trajectory = runner.run(
        context=TaskContext(user_input="Check the current status."),
        channel="local",
    )

    assert trajectory["outcome"]["status"] == "partial_success"
    assert trajectory["outcome"]["tool_errors"] == 1
    assert trajectory["reward"]["components"]["tool_error"] > 0
    assert trajectory["reward"]["components"]["context_cost"] > 0


def test_execution_engine_executes_composite_action_sequentially():
    """Execute a two-step composite decision and check events and step order.

    The composite injects memory ``mem-1`` and then calls ``tool-ok``. The
    result must be ``executed``, contain both a ``memory_injected`` and a
    ``tool_result`` event, and list the steps in the order they were given.
    """
    memory_store = InMemoryMemoryStore()
    memory_store.upsert(
        MemoryRecord(
            id="mem-1",
            memory_type="semantic",
            fact_status="verified",
            content="Prefer concise replies.",
            summary="Concise preference",
            source=MemorySource(kind="user", ref="session-1"),
            confidence=0.9,
        )
    )
    engine = ExecutionEngine(
        memory_executor=MemoryExecutor(memory_store=memory_store),
        tool_executor=ToolExecutor(backend=MixedResultToolBackend()),
    )
    decision = RouteDecision(
        decision_type="composite_action",
        composite_steps=[
            RouteDecision(decision_type="inject_memory", selected_ids=["mem-1"]),
            RouteDecision(decision_type="call_tool", selected_ids=["tool-ok"], selected_payloads=[{}]),
        ],
    )

    result = engine.execute(decision, TaskContext(user_input="composite test"), trajectory_id="traj-comp")

    assert result.status == "executed"
    assert any(event.event_type == "memory_injected" for event in result.events)
    assert any(event.event_type == "tool_result" for event in result.events)
    # One ordered comparison checks both the step count and the sequence, and
    # shows the full mismatch on failure.
    executed_step_types = [step["decision_type"] for step in result.details["steps"]]
    assert executed_step_types == ["inject_memory", "call_tool"]