266 lines
9.8 KiB
Python
266 lines
9.8 KiB
Python
from pathlib import Path
|
|
|
|
from memabra.candidate_types import CandidateObject
|
|
from memabra.execution import ExecutionEngine, MemoryExecutor, ToolExecutor
|
|
from memabra.memory_store import InMemoryMemoryStore, MemoryRecord, MemorySource
|
|
from memabra.persistence import PersistenceStore
|
|
from memabra.retrieval import CandidateRetriever, InMemoryCandidateProvider
|
|
from memabra.router import RouteDecision, RuleBasedRouter, TaskContext
|
|
from memabra.runner import MemabraRunner
|
|
from memabra.schemas import SchemaRegistry
|
|
|
|
|
|
class FailingToolBackend:
    """Tool backend stub that reports an error for every tool call."""

    def run_tool(self, tool_id: str, context: TaskContext, params: dict | None = None) -> dict:
        """Return a fixed error result naming ``tool_id``.

        ``context`` and ``params`` are accepted but not read; the reported
        latency is always 123 ms.
        """
        return {"status": "error", "output": None, "error": f"{tool_id} failed", "latency_ms": 123}
|
|
|
|
|
|
class MixedResultToolBackend:
    """Tool backend stub that succeeds only for the tool id ``"tool-ok"``."""

    def run_tool(self, tool_id: str, context: TaskContext, params: dict | None = None) -> dict:
        """Return a success result for ``"tool-ok"`` and an error result otherwise.

        ``context`` and ``params`` are accepted but not read. The success
        result reports 50 ms of latency, the error result 100 ms.
        """
        if tool_id == "tool-ok":
            return {"status": "success", "output": "ok", "error": None, "latency_ms": 50}
        return {"status": "error", "output": None, "error": f"{tool_id} failed", "latency_ms": 100}
|
|
|
|
|
|
class StaticSkillBackend:
    """Skill backend stub that serves the same instructions for every skill."""

    def load_skill(self, skill_id: str) -> dict:
        """Return a payload echoing ``skill_id`` alongside fixed instructions."""
        return {"skill_id": skill_id, "instructions": "Follow the documented deployment workflow."}
|
|
|
|
|
|
def test_execution_engine_marks_memory_used_and_runner_persists(tmp_path: Path):
    """Run a memory-backed request and check usage tracking and persistence.

    A single memory record is stored and offered as the only candidate. After
    the run, the trajectory must validate against the schema registry, contain
    a ``memory_injected`` event, leave ``last_used_at`` set on the stored
    record, and be loadable from the persistence store by its id.
    """
    # The store record and the retrieval candidate share this id.
    memory_id = "mem-telegram-pref"

    memory_store = InMemoryMemoryStore()
    memory_store.upsert(
        MemoryRecord(
            id=memory_id,
            memory_type="semantic",
            fact_status="verified",
            content="Prefer plain text on Telegram.",
            summary="Telegram preference",
            source=MemorySource(kind="user", ref="session-1"),
            confidence=0.95,
        )
    )
    retriever = CandidateRetriever(
        [
            InMemoryCandidateProvider(
                candidate_type="memory",
                candidates=[
                    CandidateObject(
                        id=memory_id,
                        type="memory",
                        title="Telegram preference",
                        summary="Prefer plain text on Telegram.",
                        triggers=["telegram", "preference"],
                        confidence=0.95,
                        success_rate=0.9,
                        freshness=0.9,
                    )
                ],
            )
        ]
    )
    persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
    runner = MemabraRunner(
        retriever=retriever,
        router=RuleBasedRouter(),
        execution_engine=ExecutionEngine(memory_executor=MemoryExecutor(memory_store=memory_store)),
        persistence_store=persistence,
        memory_store=memory_store,
    )

    trajectory = runner.run(
        context=TaskContext(user_input="Use my telegram preference for this answer."),
        channel="telegram",
        user_id="oza",
        persist=True,
    )

    SchemaRegistry().validate_trajectory(trajectory)
    assert any(event["event_type"] == "memory_injected" for event in trajectory["events"])
    assert memory_store.get(memory_id).last_used_at is not None

    trajectory_id = trajectory["trajectory_id"]
    assert persistence.load_trajectory(trajectory_id)["trajectory_id"] == trajectory_id
|
|
|
|
|
|
def test_persistence_store_round_trip_memory_record(tmp_path: Path):
    """Save one memory record and read it back from the persistence store.

    The loaded value is indexed like a mapping and must carry the saved id;
    the store must list exactly one memory path afterwards.
    """
    persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
    record = MemoryRecord(
        id="mem-1",
        memory_type="semantic",
        fact_status="assumed",
        content="User likes concise replies.",
        summary="Concise reply preference",
        source=MemorySource(kind="user", ref="session-2"),
        confidence=0.7,
    )

    persistence.save_memory_record(record)
    loaded = persistence.load_memory_record("mem-1")

    assert loaded["id"] == "mem-1"
    assert len(persistence.list_memory_paths()) == 1
|
|
|
|
|
|
def test_runner_records_tool_failures_in_outcome_and_reward(tmp_path: Path):
    """Run against an always-failing tool backend and check how it is recorded.

    The trajectory outcome must be ``failure`` with one tool error, the
    ``tool_error`` and ``latency`` reward components must be positive, and a
    ``tool_result`` event must be present.
    """
    retriever = CandidateRetriever(
        [
            InMemoryCandidateProvider(
                candidate_type="tool",
                candidates=[
                    CandidateObject(
                        id="tool-terminal",
                        type="tool",
                        title="terminal",
                        summary="Run terminal commands.",
                        triggers=["check", "current"],
                        confidence=0.95,
                        success_rate=0.9,
                        freshness=1.0,
                    )
                ],
            )
        ]
    )
    persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
    runner = MemabraRunner(
        retriever=retriever,
        router=RuleBasedRouter(),
        execution_engine=ExecutionEngine(tool_backend=FailingToolBackend()),
        persistence_store=persistence,
    )

    trajectory = runner.run(
        context=TaskContext(user_input="Check the current status."),
        channel="telegram",
        persist=True,
    )

    outcome = trajectory["outcome"]
    reward_components = trajectory["reward"]["components"]
    assert outcome["status"] == "failure"
    assert outcome["tool_errors"] == 1
    assert reward_components["tool_error"] > 0
    assert reward_components["latency"] > 0
    assert any(event["event_type"] == "tool_result" for event in trajectory["events"])
|
|
|
|
|
|
def test_runner_loads_skill_payload_from_backend():
    """Check that a ``skill_loaded`` event carries the backend's instructions.

    The only candidate is a deployment skill and the execution engine is
    given ``StaticSkillBackend``. The first ``skill_loaded`` event's payload
    must contain the backend's fixed instructions string.
    """
    retriever = CandidateRetriever(
        [
            InMemoryCandidateProvider(
                candidate_type="skill",
                candidates=[
                    CandidateObject(
                        id="skill-deploy",
                        type="skill",
                        title="deploy workflow",
                        summary="Reusable deployment procedure.",
                        triggers=["deploy", "workflow"],
                        confidence=0.9,
                        success_rate=0.95,
                        freshness=0.8,
                    )
                ],
            )
        ]
    )
    runner = MemabraRunner(
        retriever=retriever,
        router=RuleBasedRouter(),
        execution_engine=ExecutionEngine(skill_backend=StaticSkillBackend()),
    )

    trajectory = runner.run(context=TaskContext(user_input="Deploy this service with the usual workflow."))

    skill_events = [event for event in trajectory["events"] if event["event_type"] == "skill_loaded"]
    # Assert non-empty first so a missing event fails here, not on the index below.
    assert skill_events
    assert skill_events[0]["payload"]["instructions"] == "Follow the documented deployment workflow."
|
|
|
|
|
|
def test_runner_detects_partial_success_for_mixed_tool_results():
    """Force one succeeding and one failing tool call and check the outcome.

    A local router always selects both ``tool-ok`` and ``tool-fail``, and
    ``MixedResultToolBackend`` succeeds only for the former. The outcome must
    be ``partial_success`` with one tool error, and the ``tool_error`` and
    ``context_cost`` reward components must be positive.
    """

    class BothToolsRouter:
        """Router stub that ignores its inputs and always selects both tools."""

        def choose(self, context, memory, skill, tool):
            # RouteDecision comes from the module-level import; no local import needed.
            return RouteDecision(
                decision_type="call_tool",
                selected_ids=["tool-ok", "tool-fail"],
                selected_payloads=[{}, {}],
                rationale="Force both tools for testing.",
            )

    retriever = CandidateRetriever(
        [
            InMemoryCandidateProvider(
                candidate_type="tool",
                candidates=[
                    CandidateObject(
                        id="tool-ok",
                        type="tool",
                        title="ok tool",
                        summary="Always succeeds.",
                        triggers=["check", "current"],
                        confidence=0.95,
                        success_rate=0.9,
                        freshness=1.0,
                    ),
                    CandidateObject(
                        id="tool-fail",
                        type="tool",
                        title="failing tool",
                        summary="Always fails.",
                        triggers=["check", "current"],
                        confidence=0.9,
                        success_rate=0.5,
                        freshness=1.0,
                    ),
                ],
            )
        ]
    )
    runner = MemabraRunner(
        retriever=retriever,
        router=BothToolsRouter(),
        execution_engine=ExecutionEngine(tool_backend=MixedResultToolBackend()),
    )

    trajectory = runner.run(
        context=TaskContext(user_input="Check the current status."),
        channel="local",
    )

    outcome = trajectory["outcome"]
    reward_components = trajectory["reward"]["components"]
    assert outcome["status"] == "partial_success"
    assert outcome["tool_errors"] == 1
    assert reward_components["tool_error"] > 0
    assert reward_components["context_cost"] > 0
|
|
|
|
|
|
def test_execution_engine_executes_composite_action_sequentially():
    """Execute a two-step composite decision directly on the engine.

    The composite decision injects memory ``mem-1`` and then calls
    ``tool-ok``. The result must have status ``executed``, include both a
    ``memory_injected`` and a ``tool_result`` event, and list the two steps
    in ``details["steps"]`` in the order they were declared.
    """
    memory_store = InMemoryMemoryStore()
    memory_store.upsert(
        MemoryRecord(
            id="mem-1",
            memory_type="semantic",
            fact_status="verified",
            content="Prefer concise replies.",
            summary="Concise preference",
            source=MemorySource(kind="user", ref="session-1"),
            confidence=0.9,
        )
    )
    engine = ExecutionEngine(
        memory_executor=MemoryExecutor(memory_store=memory_store),
        tool_executor=ToolExecutor(backend=MixedResultToolBackend()),
    )
    decision = RouteDecision(
        decision_type="composite_action",
        composite_steps=[
            RouteDecision(decision_type="inject_memory", selected_ids=["mem-1"]),
            RouteDecision(decision_type="call_tool", selected_ids=["tool-ok"], selected_payloads=[{}]),
        ],
    )

    result = engine.execute(decision, TaskContext(user_input="composite test"), trajectory_id="traj-comp")

    assert result.status == "executed"
    assert any(event.event_type == "memory_injected" for event in result.events)
    assert any(event.event_type == "tool_result" for event in result.events)

    steps = result.details["steps"]
    assert len(steps) == 2
    assert steps[0]["decision_type"] == "inject_memory"
    assert steps[1]["decision_type"] == "call_tool"
|
|
|