108 lines
3.9 KiB
Python
108 lines
3.9 KiB
Python
from pathlib import Path
|
|
|
|
from memabra.candidate_types import CandidateObject
|
|
from memabra.execution import ExecutionEngine, FileSystemSkillBackend, SkillExecutor
|
|
from memabra.retrieval import CandidateRetriever, InMemoryCandidateProvider
|
|
from memabra.router import RouteDecision, RuleBasedRouter, TaskContext
|
|
from memabra.runner import MemabraRunner
|
|
|
|
|
|
def test_filesystem_skill_backend_loads_skill_from_directory(tmp_path: Path):
|
|
skill_dir = tmp_path / "category-a" / "skill-demo"
|
|
skill_dir.mkdir(parents=True)
|
|
skill_file = skill_dir / "SKILL.md"
|
|
skill_file.write_text(
|
|
"---\n"
|
|
"name: skill-demo\n"
|
|
"description: A demo skill for testing.\n"
|
|
"version: 1.0.0\n"
|
|
"---\n\n"
|
|
"# Demo Skill\n\n"
|
|
"This is the demo skill body.\n"
|
|
)
|
|
|
|
backend = FileSystemSkillBackend(search_paths=[tmp_path])
|
|
payload = backend.load_skill("skill-demo")
|
|
|
|
assert payload["skill_id"] == "skill-demo"
|
|
assert payload["name"] == "skill-demo"
|
|
assert payload["description"] == "A demo skill for testing."
|
|
assert "This is the demo skill body." in payload["content"]
|
|
|
|
|
|
def test_filesystem_skill_backend_returns_error_for_missing_skill(tmp_path: Path):
|
|
backend = FileSystemSkillBackend(search_paths=[tmp_path])
|
|
payload = backend.load_skill("nonexistent")
|
|
|
|
assert payload["skill_id"] == "nonexistent"
|
|
assert payload["status"] == "error"
|
|
assert "not found" in payload["error"].lower()
|
|
|
|
|
|
def test_skill_executor_uses_filesystem_backend_to_load_payload(tmp_path: Path):
|
|
skill_dir = tmp_path / "ops" / "skill-deploy"
|
|
skill_dir.mkdir(parents=True)
|
|
skill_file = skill_dir / "SKILL.md"
|
|
skill_file.write_text(
|
|
"---\n"
|
|
"name: skill-deploy\n"
|
|
"description: Deploy workflow skill.\n"
|
|
"---\n\n"
|
|
"# Deploy Workflow\n\n"
|
|
"1. Build\n2. Test\n3. Deploy\n"
|
|
)
|
|
|
|
backend = FileSystemSkillBackend(search_paths=[tmp_path])
|
|
executor = SkillExecutor(backend=backend)
|
|
decision = RouteDecision(decision_type="load_skill", selected_ids=["skill-deploy"])
|
|
result = executor.execute(decision, TaskContext(user_input="deploy"), trajectory_id="traj-1")
|
|
|
|
assert result.status == "executed"
|
|
assert result.details["payloads"][0]["name"] == "skill-deploy"
|
|
assert "1. Build" in result.details["payloads"][0]["content"]
|
|
assert any(event.event_type == "skill_loaded" for event in result.events)
|
|
|
|
|
|
def test_execution_engine_runs_skill_path_end_to_end(tmp_path: Path):
|
|
skill_dir = tmp_path / "ops" / "skill-deploy"
|
|
skill_dir.mkdir(parents=True)
|
|
(skill_dir / "SKILL.md").write_text(
|
|
"---\n"
|
|
"name: skill-deploy\n"
|
|
"description: Deploy workflow skill.\n"
|
|
"---\n\n"
|
|
"Deploy steps here.\n"
|
|
)
|
|
|
|
retriever = CandidateRetriever(
|
|
[
|
|
InMemoryCandidateProvider(
|
|
candidate_type="skill",
|
|
candidates=[
|
|
CandidateObject(
|
|
id="skill-deploy",
|
|
type="skill",
|
|
title="deploy workflow",
|
|
summary="Reusable deployment procedure.",
|
|
triggers=["deploy", "workflow"],
|
|
confidence=0.9,
|
|
success_rate=0.95,
|
|
freshness=0.8,
|
|
)
|
|
],
|
|
)
|
|
]
|
|
)
|
|
runner = MemabraRunner(
|
|
retriever=retriever,
|
|
router=RuleBasedRouter(),
|
|
execution_engine=ExecutionEngine(skill_backend=FileSystemSkillBackend(search_paths=[tmp_path])),
|
|
)
|
|
|
|
trajectory = runner.run(context=TaskContext(user_input="Deploy this service with the usual workflow."))
|
|
|
|
skill_events = [event for event in trajectory["events"] if event["event_type"] == "skill_loaded"]
|
|
assert skill_events
|
|
assert skill_events[0]["payload"]["name"] == "skill-deploy"
|
|
assert "Deploy steps here." in skill_events[0]["payload"]["content"]
|