Files
memabra/tests/test_execution_persistence.py
2026-04-15 11:06:05 +08:00

266 lines
9.8 KiB
Python

from pathlib import Path
from memabra.candidate_types import CandidateObject
from memabra.execution import ExecutionEngine, MemoryExecutor, ToolExecutor
from memabra.memory_store import InMemoryMemoryStore, MemoryRecord, MemorySource
from memabra.persistence import PersistenceStore
from memabra.retrieval import CandidateRetriever, InMemoryCandidateProvider
from memabra.router import RouteDecision, RuleBasedRouter, TaskContext
from memabra.runner import MemabraRunner
from memabra.schemas import SchemaRegistry
class FailingToolBackend:
    """Tool backend stub whose every invocation reports an error."""

    def run_tool(self, tool_id: str, context: TaskContext, params: dict | None = None) -> dict:
        # Fixed latency and a per-tool error string keep assertions deterministic.
        return dict(status="error", output=None, error=f"{tool_id} failed", latency_ms=123)
class MixedResultToolBackend:
    """Tool backend stub: succeeds only for "tool-ok", errors for any other id."""

    def run_tool(self, tool_id: str, context: TaskContext, params: dict | None = None) -> dict:
        # Guard clause: everything except the known-good tool fails.
        if tool_id != "tool-ok":
            return dict(status="error", output=None, error=f"{tool_id} failed", latency_ms=100)
        return dict(status="success", output="ok", error=None, latency_ms=50)
class StaticSkillBackend:
    """Skill backend stub returning a canned instruction payload for any skill id."""

    def load_skill(self, skill_id: str) -> dict:
        payload = {
            "skill_id": skill_id,
            "instructions": "Follow the documented deployment workflow.",
        }
        return payload
def test_execution_engine_marks_memory_used_and_runner_persists(tmp_path: Path):
    """End-to-end: injecting a memory marks it used and the trajectory round-trips through persistence."""
    store = InMemoryMemoryStore()
    store.upsert(
        MemoryRecord(
            id="mem-telegram-pref",
            memory_type="semantic",
            fact_status="verified",
            content="Prefer plain text on Telegram.",
            summary="Telegram preference",
            source=MemorySource(kind="user", ref="session-1"),
            confidence=0.95,
        )
    )
    # A single memory candidate whose triggers match the user input below.
    provider = InMemoryCandidateProvider(
        candidate_type="memory",
        candidates=[
            CandidateObject(
                id="mem-telegram-pref",
                type="memory",
                title="Telegram preference",
                summary="Prefer plain text on Telegram.",
                triggers=["telegram", "preference"],
                confidence=0.95,
                success_rate=0.9,
                freshness=0.9,
            )
        ],
    )
    persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
    runner = MemabraRunner(
        retriever=CandidateRetriever([provider]),
        router=RuleBasedRouter(),
        execution_engine=ExecutionEngine(memory_executor=MemoryExecutor(memory_store=store)),
        persistence_store=persistence,
        memory_store=store,
    )
    trajectory = runner.run(
        context=TaskContext(user_input="Use my telegram preference for this answer."),
        channel="telegram",
        user_id="oza",
        persist=True,
    )
    # The trajectory must be schema-valid before we inspect its contents.
    SchemaRegistry().validate_trajectory(trajectory)
    event_types = [event["event_type"] for event in trajectory["events"]]
    assert "memory_injected" in event_types
    assert store.get("mem-telegram-pref").last_used_at is not None
    reloaded = persistence.load_trajectory(trajectory["trajectory_id"])
    assert reloaded["trajectory_id"] == trajectory["trajectory_id"]
def test_persistence_store_round_trip_memory_record(tmp_path: Path):
    """A saved MemoryRecord can be reloaded by id and is listed exactly once."""
    store = PersistenceStore(base_dir=tmp_path / "artifacts")
    store.save_memory_record(
        MemoryRecord(
            id="mem-1",
            memory_type="semantic",
            fact_status="assumed",
            content="User likes concise replies.",
            summary="Concise reply preference",
            source=MemorySource(kind="user", ref="session-2"),
            confidence=0.7,
        )
    )
    restored = store.load_memory_record("mem-1")
    assert restored["id"] == "mem-1"
    assert len(store.list_memory_paths()) == 1
def test_runner_records_tool_failures_in_outcome_and_reward(tmp_path: Path):
    """A backend that always errors produces a failure outcome plus error/latency penalties."""
    tool_candidate = CandidateObject(
        id="tool-terminal",
        type="tool",
        title="terminal",
        summary="Run terminal commands.",
        triggers=["check", "current"],
        confidence=0.95,
        success_rate=0.9,
        freshness=1.0,
    )
    runner = MemabraRunner(
        retriever=CandidateRetriever(
            [InMemoryCandidateProvider(candidate_type="tool", candidates=[tool_candidate])]
        ),
        router=RuleBasedRouter(),
        execution_engine=ExecutionEngine(tool_backend=FailingToolBackend()),
        persistence_store=PersistenceStore(base_dir=tmp_path / "artifacts"),
    )
    trajectory = runner.run(
        context=TaskContext(user_input="Check the current status."),
        channel="telegram",
        persist=True,
    )
    outcome = trajectory["outcome"]
    assert outcome["status"] == "failure"
    assert outcome["tool_errors"] == 1
    components = trajectory["reward"]["components"]
    assert components["tool_error"] > 0
    assert components["latency"] > 0
    assert any(event["event_type"] == "tool_result" for event in trajectory["events"])
def test_runner_loads_skill_payload_from_backend():
    """skill_loaded events carry the payload produced by the skill backend."""
    skill_candidate = CandidateObject(
        id="skill-deploy",
        type="skill",
        title="deploy workflow",
        summary="Reusable deployment procedure.",
        triggers=["deploy", "workflow"],
        confidence=0.9,
        success_rate=0.95,
        freshness=0.8,
    )
    runner = MemabraRunner(
        retriever=CandidateRetriever(
            [InMemoryCandidateProvider(candidate_type="skill", candidates=[skill_candidate])]
        ),
        router=RuleBasedRouter(),
        execution_engine=ExecutionEngine(skill_backend=StaticSkillBackend()),
    )
    trajectory = runner.run(context=TaskContext(user_input="Deploy this service with the usual workflow."))
    loaded = [event for event in trajectory["events"] if event["event_type"] == "skill_loaded"]
    assert loaded
    assert loaded[0]["payload"]["instructions"] == "Follow the documented deployment workflow."
def test_runner_detects_partial_success_for_mixed_tool_results():
    """One succeeding and one failing tool must yield a partial_success outcome.

    Fix: the router stub previously re-imported RouteDecision inside
    ``choose``, shadowing the module-level import from memabra.router;
    the redundant local import is removed.
    """

    class BothToolsRouter:
        """Router stub that always selects both tools regardless of context."""

        def choose(self, context, memory, skill, tool):
            return RouteDecision(
                decision_type="call_tool",
                selected_ids=["tool-ok", "tool-fail"],
                selected_payloads=[{}, {}],
                rationale="Force both tools for testing.",
            )

    retriever = CandidateRetriever(
        [
            InMemoryCandidateProvider(
                candidate_type="tool",
                candidates=[
                    CandidateObject(
                        id="tool-ok",
                        type="tool",
                        title="ok tool",
                        summary="Always succeeds.",
                        triggers=["check", "current"],
                        confidence=0.95,
                        success_rate=0.9,
                        freshness=1.0,
                    ),
                    CandidateObject(
                        id="tool-fail",
                        type="tool",
                        title="failing tool",
                        summary="Always fails.",
                        triggers=["check", "current"],
                        confidence=0.9,
                        success_rate=0.5,
                        freshness=1.0,
                    ),
                ],
            )
        ]
    )
    runner = MemabraRunner(
        retriever=retriever,
        router=BothToolsRouter(),
        execution_engine=ExecutionEngine(tool_backend=MixedResultToolBackend()),
    )
    trajectory = runner.run(
        context=TaskContext(user_input="Check the current status."),
        channel="local",
    )
    assert trajectory["outcome"]["status"] == "partial_success"
    assert trajectory["outcome"]["tool_errors"] == 1
    assert trajectory["reward"]["components"]["tool_error"] > 0
    assert trajectory["reward"]["components"]["context_cost"] > 0
def test_execution_engine_executes_composite_action_sequentially():
    """A composite_action decision runs its steps in order and records each step's details."""
    store = InMemoryMemoryStore()
    store.upsert(
        MemoryRecord(
            id="mem-1",
            memory_type="semantic",
            fact_status="verified",
            content="Prefer concise replies.",
            summary="Concise preference",
            source=MemorySource(kind="user", ref="session-1"),
            confidence=0.9,
        )
    )
    engine = ExecutionEngine(
        memory_executor=MemoryExecutor(memory_store=store),
        tool_executor=ToolExecutor(backend=MixedResultToolBackend()),
    )
    # First inject a memory, then call the always-succeeding tool.
    steps = [
        RouteDecision(decision_type="inject_memory", selected_ids=["mem-1"]),
        RouteDecision(decision_type="call_tool", selected_ids=["tool-ok"], selected_payloads=[{}]),
    ]
    decision = RouteDecision(decision_type="composite_action", composite_steps=steps)
    result = engine.execute(decision, TaskContext(user_input="composite test"), trajectory_id="traj-comp")
    assert result.status == "executed"
    emitted = {event.event_type for event in result.events}
    assert "memory_injected" in emitted
    assert "tool_result" in emitted
    step_details = result.details["steps"]
    assert len(step_details) == 2
    assert step_details[0]["decision_type"] == "inject_memory"
    assert step_details[1]["decision_type"] == "call_tool"