Files
memabra/tests/test_skill_adapters.py
2026-04-15 11:06:05 +08:00

108 lines
3.9 KiB
Python

from pathlib import Path
from memabra.candidate_types import CandidateObject
from memabra.execution import ExecutionEngine, FileSystemSkillBackend, SkillExecutor
from memabra.retrieval import CandidateRetriever, InMemoryCandidateProvider
from memabra.router import RouteDecision, RuleBasedRouter, TaskContext
from memabra.runner import MemabraRunner
def test_filesystem_skill_backend_loads_skill_from_directory(tmp_path: Path):
    """Load a skill whose SKILL.md sits two directory levels below the search path.

    The fixture file carries a ``---`` delimited header (name, description,
    version) followed by a markdown body. The returned payload is expected to
    expose the requested id, the header's name and description, and the body
    text under ``content``.
    """
    skill_markdown = (
        "---\n"
        "name: skill-demo\n"
        "description: A demo skill for testing.\n"
        "version: 1.0.0\n"
        "---\n\n"
        "# Demo Skill\n\n"
        "This is the demo skill body.\n"
    )
    target_dir = tmp_path / "category-a" / "skill-demo"
    target_dir.mkdir(parents=True)
    (target_dir / "SKILL.md").write_text(skill_markdown)

    # Only the root is given as a search path; the skill lives in a nested folder.
    loaded = FileSystemSkillBackend(search_paths=[tmp_path]).load_skill("skill-demo")

    assert loaded["skill_id"] == "skill-demo"
    assert loaded["name"] == "skill-demo"
    assert loaded["description"] == "A demo skill for testing."
    assert "This is the demo skill body." in loaded["content"]
def test_filesystem_skill_backend_returns_error_for_missing_skill(tmp_path: Path):
    """Request a skill id that has no files under the (empty) search path.

    The backend is expected to hand back a payload that echoes the requested
    id, has ``status`` set to ``"error"``, and mentions "not found"
    (case-insensitively) in its ``error`` text.
    """
    missing_id = "nonexistent"

    # tmp_path is left empty on purpose: nothing can match the requested id.
    outcome = FileSystemSkillBackend(search_paths=[tmp_path]).load_skill(missing_id)

    assert outcome["skill_id"] == missing_id
    assert outcome["status"] == "error"
    assert "not found" in outcome["error"].lower()
def test_skill_executor_uses_filesystem_backend_to_load_payload(tmp_path: Path):
    """Execute a ``load_skill`` decision through a filesystem-backed executor.

    A ``skill-deploy`` SKILL.md is written under ``ops/``, then a
    ``SkillExecutor`` wired to a ``FileSystemSkillBackend`` runs a decision
    selecting that id. The result is expected to be marked ``"executed"``,
    carry the skill's name and body in its first payload, and include at least
    one ``skill_loaded`` event.
    """
    deploy_markdown = (
        "---\n"
        "name: skill-deploy\n"
        "description: Deploy workflow skill.\n"
        "---\n\n"
        "# Deploy Workflow\n\n"
        "1. Build\n2. Test\n3. Deploy\n"
    )
    deploy_dir = tmp_path / "ops" / "skill-deploy"
    deploy_dir.mkdir(parents=True)
    (deploy_dir / "SKILL.md").write_text(deploy_markdown)

    executor = SkillExecutor(backend=FileSystemSkillBackend(search_paths=[tmp_path]))
    load_decision = RouteDecision(decision_type="load_skill", selected_ids=["skill-deploy"])
    task = TaskContext(user_input="deploy")

    result = executor.execute(load_decision, task, trajectory_id="traj-1")

    assert result.status == "executed"
    # Only one id was selected, so the first payload is the one under test.
    first_payload = result.details["payloads"][0]
    assert first_payload["name"] == "skill-deploy"
    assert "1. Build" in first_payload["content"]
    assert any(evt.event_type == "skill_loaded" for evt in result.events)
def test_execution_engine_runs_skill_path_end_to_end(tmp_path: Path):
    """Drive retriever, router and execution engine together for a skill request.

    A ``skill-deploy`` SKILL.md is placed on disk and a matching skill
    candidate is registered with an in-memory provider. Running the
    ``MemabraRunner`` on a deploy-flavoured user input is expected to produce
    a trajectory containing a ``skill_loaded`` event whose payload carries the
    skill's name and body text.
    """
    deploy_dir = tmp_path / "ops" / "skill-deploy"
    deploy_dir.mkdir(parents=True)
    skill_path = deploy_dir / "SKILL.md"
    skill_path.write_text(
        "---\n"
        "name: skill-deploy\n"
        "description: Deploy workflow skill.\n"
        "---\n\n"
        "Deploy steps here.\n"
    )

    # The candidate id matches the on-disk skill directory name.
    deploy_candidate = CandidateObject(
        id="skill-deploy",
        type="skill",
        title="deploy workflow",
        summary="Reusable deployment procedure.",
        triggers=["deploy", "workflow"],
        confidence=0.9,
        success_rate=0.95,
        freshness=0.8,
    )
    skill_provider = InMemoryCandidateProvider(
        candidate_type="skill",
        candidates=[deploy_candidate],
    )
    retriever = CandidateRetriever([skill_provider])

    router = RuleBasedRouter()
    engine = ExecutionEngine(skill_backend=FileSystemSkillBackend(search_paths=[tmp_path]))
    runner = MemabraRunner(retriever=retriever, router=router, execution_engine=engine)

    task = TaskContext(user_input="Deploy this service with the usual workflow.")
    trajectory = runner.run(context=task)

    # Trajectory events are indexed like dicts here, unlike the executor's
    # result events, which the sibling test reads via attributes.
    skill_events = [evt for evt in trajectory["events"] if evt["event_type"] == "skill_loaded"]
    assert skill_events
    loaded_payload = skill_events[0]["payload"]
    assert loaded_payload["name"] == "skill-deploy"
    assert "Deploy steps here." in loaded_payload["content"]