"""Tests for loading skills from disk and running them through memabra."""

from pathlib import Path

from memabra.candidate_types import CandidateObject
from memabra.execution import ExecutionEngine, FileSystemSkillBackend, SkillExecutor
from memabra.retrieval import CandidateRetriever, InMemoryCandidateProvider
from memabra.router import RouteDecision, RuleBasedRouter, TaskContext
from memabra.runner import MemabraRunner


def _write_skill_file(root: Path, category: str, skill_id: str, text: str) -> Path:
    """Create ``root/category/skill_id/SKILL.md`` containing *text*.

    The directory is created with ``parents=True`` (and without
    ``exist_ok``), so each skill directory may only be created once per
    ``root``. Returns the path of the written file.
    """
    target_dir = root / category / skill_id
    target_dir.mkdir(parents=True)
    target_file = target_dir / "SKILL.md"
    target_file.write_text(text)
    return target_file


def test_filesystem_skill_backend_loads_skill_from_directory(tmp_path: Path):
    """A SKILL.md nested under a search path is loaded with its metadata and body."""
    _write_skill_file(
        tmp_path,
        "category-a",
        "skill-demo",
        "---\n"
        "name: skill-demo\n"
        "description: A demo skill for testing.\n"
        "version: 1.0.0\n"
        "---\n\n"
        "# Demo Skill\n\n"
        "This is the demo skill body.\n",
    )

    loaded = FileSystemSkillBackend(search_paths=[tmp_path]).load_skill("skill-demo")

    assert loaded["skill_id"] == "skill-demo"
    assert loaded["name"] == "skill-demo"
    assert loaded["description"] == "A demo skill for testing."
    assert "This is the demo skill body." in loaded["content"]


def test_filesystem_skill_backend_returns_error_for_missing_skill(tmp_path: Path):
    """Asking for an unknown skill id yields an error payload mentioning 'not found'."""
    missing = FileSystemSkillBackend(search_paths=[tmp_path]).load_skill("nonexistent")

    assert missing["skill_id"] == "nonexistent"
    assert missing["status"] == "error"
    assert "not found" in missing["error"].lower()


def test_skill_executor_uses_filesystem_backend_to_load_payload(tmp_path: Path):
    """SkillExecutor resolves a ``load_skill`` decision through the filesystem backend."""
    _write_skill_file(
        tmp_path,
        "ops",
        "skill-deploy",
        "---\n"
        "name: skill-deploy\n"
        "description: Deploy workflow skill.\n"
        "---\n\n"
        "# Deploy Workflow\n\n"
        "1. Build\n2. Test\n3. Deploy\n",
    )
    skill_executor = SkillExecutor(backend=FileSystemSkillBackend(search_paths=[tmp_path]))
    route = RouteDecision(decision_type="load_skill", selected_ids=["skill-deploy"])
    task = TaskContext(user_input="deploy")

    outcome = skill_executor.execute(route, task, trajectory_id="traj-1")

    assert outcome.status == "executed"
    first_payload = outcome.details["payloads"][0]
    assert first_payload["name"] == "skill-deploy"
    assert "1. Build" in first_payload["content"]
    assert any(evt.event_type == "skill_loaded" for evt in outcome.events)


def test_execution_engine_runs_skill_path_end_to_end(tmp_path: Path):
    """Retriever -> router -> execution engine emits a ``skill_loaded`` event with file content."""
    _write_skill_file(
        tmp_path,
        "ops",
        "skill-deploy",
        "---\n"
        "name: skill-deploy\n"
        "description: Deploy workflow skill.\n"
        "---\n\n"
        "Deploy steps here.\n",
    )

    deploy_candidate = CandidateObject(
        id="skill-deploy",
        type="skill",
        title="deploy workflow",
        summary="Reusable deployment procedure.",
        triggers=["deploy", "workflow"],
        confidence=0.9,
        success_rate=0.95,
        freshness=0.8,
    )
    skill_provider = InMemoryCandidateProvider(
        candidate_type="skill",
        candidates=[deploy_candidate],
    )
    engine = ExecutionEngine(skill_backend=FileSystemSkillBackend(search_paths=[tmp_path]))
    runner = MemabraRunner(
        retriever=CandidateRetriever([skill_provider]),
        router=RuleBasedRouter(),
        execution_engine=engine,
    )

    trajectory = runner.run(
        context=TaskContext(user_input="Deploy this service with the usual workflow.")
    )

    loaded_events = [
        evt for evt in trajectory["events"] if evt["event_type"] == "skill_loaded"
    ]
    assert loaded_events
    first_payload = loaded_events[0]["payload"]
    assert first_payload["name"] == "skill-deploy"
    assert "Deploy steps here." in first_payload["content"]