Initial standalone memabra release

Carlos Ouyang
2026-04-15 11:06:05 +08:00
commit 58f9f221b1
464 changed files with 30256 additions and 0 deletions

197
tests/test_app.py Normal file

@@ -0,0 +1,197 @@
from pathlib import Path
from memabra.app import MemabraApp, build_app_with_skills, build_demo_app
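# Tests for the MemabraApp facade: demo-app task runs, skill loading from the
# filesystem, artifact-index queries, online-learning cycles, and case-index
# build/save/load behaviour.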
def test_build_demo_app_runs_task_and_produces_summary(tmp_path: Path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
trajectory = app.run_task("Use my telegram preference for this answer.", channel="telegram", user_id="oza")
summary = app.replay_summary()
assert trajectory["trajectory_id"].startswith("traj-")
assert summary.trajectories == 1
assert any(event["event_type"] == "memory_injected" for event in trajectory["events"])
assert len(list((tmp_path / "demo-artifacts" / "trajectories").glob("*.json"))) == 1
def test_app_can_run_tool_task_with_demo_backend(tmp_path: Path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
trajectory = app.run_task("Check the current system status.")
assert trajectory["decisions"][0]["decision_type"] == "call_tool"
assert any(event["event_type"] == "tool_result" for event in trajectory["events"])
assert trajectory["outcome"]["status"] == "success"
def test_build_app_with_skills_loads_real_skill_from_filesystem(tmp_path: Path):
skill_dir = tmp_path / "skills" / "github-auth"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"---\n"
"name: github-auth\n"
"description: Authenticate with GitHub.\n"
"---\n\n"
"# GitHub Auth\n\n"
"Use git or gh.\n"
)
app = build_app_with_skills(base_dir=tmp_path / "artifacts", skill_search_paths=[tmp_path / "skills"])
# github-auth is not in the default candidate set, so the router will not trigger it.
# We only check that the app builds and a memory task still works.
trajectory = app.run_task("Use my telegram preference for this answer.", channel="telegram", user_id="oza")
assert trajectory["decisions"][0]["decision_type"] == "inject_memory"
# Now verify the skill backend is actually wired by loading directly
backend = app.runner.execution_engine.skill_executor.backend
payload = backend.load_skill("github-auth")
assert payload["name"] == "github-auth"
assert "Use git or gh." in payload["content"]
def test_app_artifact_index_queries_persisted_trajectories(tmp_path: Path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
app.run_task("Use my telegram preference for this answer.", channel="telegram", user_id="u1")
app.run_task("Check the current system status.", channel="local", user_id="u2")
index = app.artifact_index()
telegram_trajs = index.query(channel="telegram")
tool_trajs = index.query(decision_type="call_tool")
assert len(telegram_trajs) == 1
assert telegram_trajs[0]["task"]["input"] == "Use my telegram preference for this answer."
assert len(tool_trajs) == 1
assert tool_trajs[0]["task"]["input"] == "Check the current system status."
slice_ids = index.slice_dataset(channel="local")
assert len(slice_ids) == 1
def test_app_run_online_learning_cycle_returns_report(tmp_path: Path):
from memabra.benchmarks import BenchmarkTask
from memabra.promotion import PromotionPolicy
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
# Seed trajectories
for i in range(10):
app.run_task(f"Task {i}")
result = app.run_online_learning_cycle(
policy=PromotionPolicy(
min_reward_delta=-1.0,
max_error_rate_increase=1.0,
max_latency_increase_ms=10000.0,
required_task_count=1,
),
benchmark_tasks=[BenchmarkTask(user_input="Task 0")],
min_new_trajectories=1,
)
assert "skipped" in result
assert "promoted" in result or result["skipped"] is True
assert "report_id" in result
def test_app_run_online_learning_cycle_uses_baseline_version(tmp_path: Path):
from memabra.benchmarks import BenchmarkTask
from memabra.promotion import PromotionPolicy
from memabra.router import SimpleLearningRouter
from memabra.router_versioning import RouterVersionStore
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
for i in range(10):
app.run_task(f"Task {i}")
# Save a baseline version
baseline_router = SimpleLearningRouter()
baseline_router._weights = {"call_tool": {"input_length": 0.99}}
baseline_router._feature_keys = ["input_length"]
version_dir = tmp_path / "versions"
store = RouterVersionStore(base_dir=version_dir)
store.save(baseline_router, version_id="v-baseline")
# Change current router
app.set_router(SimpleLearningRouter())
result = app.run_online_learning_cycle(
policy=PromotionPolicy(
min_reward_delta=-1.0,
max_error_rate_increase=1.0,
max_latency_increase_ms=10000.0,
required_task_count=1,
),
benchmark_tasks=[BenchmarkTask(user_input="Task 0")],
min_new_trajectories=1,
version_store_base_dir=version_dir,
baseline_version_id="v-baseline",
)
assert result["skipped"] is False
assert "baseline_metrics" in result
assert "challenger_metrics" in result
def test_app_run_online_learning_cycle_rebuilds_case_index(tmp_path: Path):
from memabra.benchmarks import BenchmarkTask
from memabra.promotion import PromotionPolicy
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
for i in range(10):
app.run_task(f"Task {i}")
case_index_path = tmp_path / "case-index.json"
result = app.run_online_learning_cycle(
policy=PromotionPolicy(
min_reward_delta=-1.0,
max_error_rate_increase=1.0,
max_latency_increase_ms=10000.0,
required_task_count=1,
),
benchmark_tasks=[BenchmarkTask(user_input="Task 0")],
min_new_trajectories=1,
case_index_path=case_index_path,
)
assert result["skipped"] is False
assert case_index_path.exists()
from memabra.case_index import CaseIndex
index = CaseIndex.load(case_index_path)
assert index.best("Task 0") is not None
def test_app_build_case_index_from_trajectories(tmp_path: Path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
app.run_task("Hello world", channel="local", user_id="u1")
app.run_task("Hello world", channel="local", user_id="u2")
case_index = app.build_case_index()
assert case_index.best("Hello world") is not None
def test_app_save_and_load_case_index(tmp_path: Path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
app.run_task("Persist this case", channel="local", user_id="u1")
case_index_path = tmp_path / "case-index.json"
app.build_case_index()
app.save_case_index(case_index_path)
loaded_app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
loaded_app.load_case_index(case_index_path)
assert loaded_app.case_index is not None
assert loaded_app.case_index.best("Persist this case") is not None
def test_app_best_trajectory_for_input(tmp_path: Path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
trajectory = app.run_task("Find the best trajectory", channel="local", user_id="u1")
app.build_case_index()
best_id = app.best_trajectory_for("Find the best trajectory")
assert best_id == trajectory["trajectory_id"]


@@ -0,0 +1,169 @@
from pathlib import Path
from memabra.persistence import PersistenceStore
from memabra.artifact_index import ArtifactIndex
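# Helper that builds a minimal trajectory dict in the persisted schema; keyword
# overrides let each test vary just the fields it filters on (status, reward,
# channel, errors, input text, ...).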
def _make_trajectory(
trajectory_id: str,
*,
status: str = "success",
decision_type: str = "direct_answer",
channel: str = "local",
reward_total: float = 1.0,
latency_ms: int = 100,
tool_errors: int = 0,
user_corrections: int = 0,
input_text: str = "Hello",
created_at: str = "2026-01-15T10:00:00Z",
):
return {
"trajectory_id": trajectory_id,
"task": {
"task_id": f"task-{trajectory_id}",
"input": input_text,
"channel": channel,
"created_at": created_at,
"user_id": None,
},
"context_snapshot": {"conversation_summary": "", "environment_summary": "", "recent_failures": []},
"candidate_sets": {"memory": [], "skill": [], "tool": []},
"decisions": [
{
"step": 1,
"decision_type": decision_type,
"selected_ids": [],
"selected_payloads": [],
"rejected_ids": [],
"rationale": "",
"estimated_cost": 0.0,
}
],
"events": [],
"outcome": {
"status": status,
"steps": 1,
"latency_ms": latency_ms,
"user_corrections": user_corrections,
"tool_errors": tool_errors,
"notes": None,
},
"reward": {
"total": reward_total,
"components": {
"task_success": 1.0 if status == "success" else 0.0,
"retrieval_hit": 0.0,
"tool_error": 0.1 * tool_errors,
"user_correction": 0.1 * user_corrections,
"latency": 0.0,
"context_cost": 0.0,
"useful_reuse": 0.0,
},
},
}
def test_artifact_index_lists_all_trajectories(tmp_path: Path):
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
persistence.save_trajectory(_make_trajectory("traj-1", status="success"))
persistence.save_trajectory(_make_trajectory("traj-2", status="failure"))
index = ArtifactIndex(persistence_store=persistence)
results = index.query()
assert len(results) == 2
assert {r["trajectory_id"] for r in results} == {"traj-1", "traj-2"}
def test_artifact_index_filters_by_status(tmp_path: Path):
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
persistence.save_trajectory(_make_trajectory("traj-1", status="success"))
persistence.save_trajectory(_make_trajectory("traj-2", status="failure"))
persistence.save_trajectory(_make_trajectory("traj-3", status="partial_success"))
index = ArtifactIndex(persistence_store=persistence)
successes = index.query(status="success")
failures = index.query(status="failure")
assert len(successes) == 1
assert successes[0]["trajectory_id"] == "traj-1"
assert len(failures) == 1
assert failures[0]["trajectory_id"] == "traj-2"
def test_artifact_index_filters_by_reward_range(tmp_path: Path):
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
persistence.save_trajectory(_make_trajectory("traj-1", reward_total=0.9))
persistence.save_trajectory(_make_trajectory("traj-2", reward_total=0.5))
persistence.save_trajectory(_make_trajectory("traj-3", reward_total=-0.2))
index = ArtifactIndex(persistence_store=persistence)
high = index.query(min_reward=0.6)
low = index.query(max_reward=0.0)
assert len(high) == 1 and high[0]["trajectory_id"] == "traj-1"
assert len(low) == 1 and low[0]["trajectory_id"] == "traj-3"
def test_artifact_index_filters_by_decision_type_and_channel(tmp_path: Path):
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
persistence.save_trajectory(_make_trajectory("traj-1", decision_type="direct_answer", channel="local"))
persistence.save_trajectory(_make_trajectory("traj-2", decision_type="call_tool", channel="telegram"))
index = ArtifactIndex(persistence_store=persistence)
tools = index.query(decision_type="call_tool")
telegram = index.query(channel="telegram")
assert len(tools) == 1 and tools[0]["trajectory_id"] == "traj-2"
assert len(telegram) == 1 and telegram[0]["trajectory_id"] == "traj-2"
def test_artifact_index_filters_by_tool_errors_and_user_corrections(tmp_path: Path):
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
persistence.save_trajectory(_make_trajectory("traj-1", tool_errors=0, user_corrections=0))
persistence.save_trajectory(_make_trajectory("traj-2", tool_errors=2, user_corrections=1))
index = ArtifactIndex(persistence_store=persistence)
with_errors = index.query(min_tool_errors=1)
with_corrections = index.query(min_user_corrections=1)
assert len(with_errors) == 1 and with_errors[0]["trajectory_id"] == "traj-2"
assert len(with_corrections) == 1 and with_corrections[0]["trajectory_id"] == "traj-2"
def test_artifact_index_filters_by_input_text(tmp_path: Path):
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
persistence.save_trajectory(_make_trajectory("traj-1", input_text="Deploy the service"))
persistence.save_trajectory(_make_trajectory("traj-2", input_text="Check status"))
index = ArtifactIndex(persistence_store=persistence)
deploy = index.query(input_contains="deploy")
status = index.query(input_contains="STATUS")
assert len(deploy) == 1 and deploy[0]["trajectory_id"] == "traj-1"
assert len(status) == 1 and status[0]["trajectory_id"] == "traj-2"
def test_artifact_index_slice_dataset_returns_ids(tmp_path: Path):
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
persistence.save_trajectory(_make_trajectory("traj-1", status="success", reward_total=0.9))
persistence.save_trajectory(_make_trajectory("traj-2", status="failure", reward_total=-0.1))
persistence.save_trajectory(_make_trajectory("traj-3", status="success", reward_total=0.95))
index = ArtifactIndex(persistence_store=persistence)
slice_ids = index.slice_dataset(status="success", min_reward=0.8)
assert slice_ids == ["traj-1", "traj-3"]
def test_artifact_index_refresh_picks_up_new_files(tmp_path: Path):
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
persistence.save_trajectory(_make_trajectory("traj-1"))
index = ArtifactIndex(persistence_store=persistence)
assert len(index.query()) == 1
persistence.save_trajectory(_make_trajectory("traj-2"))
index.refresh()
assert len(index.query()) == 2

38
tests/test_benchmarks.py Normal file

@@ -0,0 +1,38 @@
from __future__ import annotations
from memabra.benchmarks import BenchmarkSuite, BenchmarkTask, save_benchmark_suite, load_benchmark_suite, default_benchmark_suite
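# Round-trips a BenchmarkSuite through JSON and checks that the default suite
# covers the memory, skill, tool, and composite task categories.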
def test_benchmark_suite_roundtrip(tmp_path):
path = tmp_path / "suite.json"
suite = BenchmarkSuite(
name="test-suite",
tasks=[
BenchmarkTask(user_input="Hello", channel="local", user_id="u1"),
BenchmarkTask(user_input="World", channel="telegram"),
],
)
save_benchmark_suite(suite, path)
loaded = load_benchmark_suite(path)
assert loaded.name == "test-suite"
assert len(loaded.tasks) == 2
assert loaded.tasks[0].user_input == "Hello"
assert loaded.tasks[0].channel == "local"
assert loaded.tasks[0].user_id == "u1"
assert loaded.tasks[1].user_input == "World"
assert loaded.tasks[1].channel == "telegram"
assert loaded.tasks[1].user_id is None
def test_default_benchmark_suite_covers_expected_categories():
suite = default_benchmark_suite()
assert suite.name == "default"
assert len(suite.tasks) >= 4
inputs = [t.user_input.lower() for t in suite.tasks]
assert any("memory" in i or "preference" in i for i in inputs)
assert any("skill" in i or "deploy" in i for i in inputs)
assert any("tool" in i or "status" in i for i in inputs)
assert any("composite" in i or "multiple" in i for i in inputs)

50
tests/test_case_index.py Normal file

@@ -0,0 +1,50 @@
from memabra.case_index import CaseIndex
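# CaseIndex maps a task input to the best known trajectory id, keeping the
# entry with the highest reward when the same input is seen more than once.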
def test_case_index_adds_and_retrieves_best_trajectory():
index = CaseIndex()
trajectory = {
"trajectory_id": "traj-1",
"task": {"input": "Hello world"},
"outcome": {"status": "success"},
"reward": {"total": 1.0},
}
index.add(trajectory)
assert index.best("Hello world") == "traj-1"
def test_case_index_returns_none_for_unknown_input():
index = CaseIndex()
assert index.best("Unknown input") is None
def test_case_index_keeps_higher_reward_for_same_input():
index = CaseIndex()
index.add({
"trajectory_id": "traj-low",
"task": {"input": "Same input"},
"outcome": {"status": "success"},
"reward": {"total": 0.5},
})
index.add({
"trajectory_id": "traj-high",
"task": {"input": "Same input"},
"outcome": {"status": "success"},
"reward": {"total": 1.5},
})
assert index.best("Same input") == "traj-high"
def test_case_index_save_and_round_trip(tmp_path):
index = CaseIndex()
index.add({
"trajectory_id": "traj-save",
"task": {"input": "Persist me"},
"outcome": {"status": "success"},
"reward": {"total": 2.0},
})
path = tmp_path / "case_index.json"
index.save(path)
loaded = CaseIndex.load(path)
assert loaded.best("Persist me") == "traj-save"

574
tests/test_cli_workflow.py Normal file

@@ -0,0 +1,574 @@
from pathlib import Path
from memabra.cli import format_output, run_online_learning_workflow, run_wrapup_workflow
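# Exercises the CLI end to end: the wrap-up and online-learning workflows,
# text rendering via format_output (workflow / status / list_versions modes),
# and argument parsing in main() for the status and version subcommands plus
# the dry-run, baseline-version, and case-index flags.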
def test_run_wrapup_workflow_trains_evaluates_and_versions_router(tmp_path: Path):
result = run_wrapup_workflow(base_dir=tmp_path / "demo-artifacts")
assert result["seed_summary"]["trajectories"] >= 3
assert "baseline" in result["comparison"]
assert "challenger" in result["comparison"]
assert result["saved_version"]["version_id"]
assert (tmp_path / "demo-artifacts" / "router-versions" / "current.json").exists()
def test_run_online_learning_workflow_runs_cycle_and_returns_report(tmp_path: Path):
result = run_online_learning_workflow(base_dir=tmp_path / "demo-artifacts")
assert "skipped" in result
assert "report_id" in result
# Since the workflow seeds its own tasks, it should not skip
assert result["skipped"] is False
assert result["promoted"] is True
assert (tmp_path / "demo-artifacts" / "training-reports").exists()
def test_format_output_workflow_text_includes_decision_reason_and_dry_run():
payload = {
"report_id": "report-123",
"skipped": False,
"promoted": False,
"dry_run": True,
"decision": {
"accepted": False,
"reasons": ["Reward delta too small", "Latency increased"],
"metrics": {
"reward_delta": -0.12,
"error_rate_delta": 0.02,
"latency_delta_ms": 12.5,
},
},
"baseline_metrics": {
"avg_reward": 1.0,
"error_rate": 0.1,
"avg_latency_ms": 120.0,
},
"challenger_metrics": {
"avg_reward": 0.88,
"error_rate": 0.12,
"avg_latency_ms": 132.5,
},
}
rendered = format_output(payload, output_format="text", mode="workflow")
assert "Memabra online learning result" in rendered
assert "Summary" in rendered
assert "Report ID: report-123" in rendered
assert "Skipped: no" in rendered
assert "Promoted: no" in rendered
assert "Dry run: yes" in rendered
assert "Baseline" in rendered
assert "Reward: 1.0000" in rendered
assert "Error rate: 0.1000" in rendered
assert "Latency (ms): 120.0000" in rendered
assert "Challenger" in rendered
assert "Reward: 0.8800" in rendered
assert "Deltas" in rendered
assert "Reward delta: -0.1200" in rendered
assert "Error rate delta: 0.0200" in rendered
assert "Latency delta (ms): 12.5000" in rendered
assert "Decision" in rendered
assert "Reason: Reward delta too small; Latency increased" in rendered
def test_format_output_workflow_text_includes_error_details():
payload = {
"report_id": "report-err",
"skipped": False,
"promoted": False,
"error": "benchmark crashed",
}
rendered = format_output(payload, output_format="text", mode="workflow")
assert "Error: benchmark crashed" in rendered
def test_format_output_status_text_includes_latest_report_details():
payload = {
"base_dir": "/tmp/demo-artifacts",
"current_version_id": "v2",
"version_count": 2,
"trajectory_count": 8,
"report_count": 3,
"latest_report": {
"report_id": "report-9",
"timestamp": "2026-04-15T06:00:00+00:00",
"promoted": True,
},
}
rendered = format_output(payload, output_format="text", mode="status")
assert "Memabra status" in rendered
assert "Current version: v2" in rendered
assert "Latest report: report-9" in rendered
assert "Latest report time: 2026-04-15T06:00:00+00:00" in rendered
assert "Latest promotion accepted: yes" in rendered
def test_format_output_list_versions_text_marks_current_version():
payload = {
"current_version_id": "v2",
"versions": [
{"version_id": "v1", "metadata": {"source": "seed", "avg_reward": 1.2}},
{"version_id": "v2", "metadata": {"source": "online_learning", "avg_reward": 1.4}},
],
}
rendered = format_output(payload, output_format="text", mode="list_versions")
assert "Saved router versions (2 total)" in rendered
assert "Current version: v2" in rendered
assert "1. v1 (source=seed, avg_reward=1.2)" in rendered
assert "2. v2 (current, source=online_learning, avg_reward=1.4)" in rendered
def test_main_entrypoint_uses_online_learning_workflow(monkeypatch):
from memabra import cli
calls = []
def mock_online_learning_workflow(*, base_dir=None, min_new_trajectories=3, seen_trajectory_store=None, **kwargs):
calls.append({"base_dir": str(base_dir), "min_new_trajectories": min_new_trajectories, "seen_trajectory_store": seen_trajectory_store})
return {"skipped": False, "promoted": True, "report_id": "report-test"}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
rc = cli.main()
assert rc == 0
assert len(calls) == 1
assert calls[0]["min_new_trajectories"] == 3
def test_main_entrypoint_parses_base_dir_argument(monkeypatch):
from memabra import cli
calls = []
def mock_online_learning_workflow(*, base_dir=None, min_new_trajectories=3, seen_trajectory_store=None, **kwargs):
calls.append({"base_dir": str(base_dir) if base_dir else None, "min_new_trajectories": min_new_trajectories, "seen_trajectory_store": seen_trajectory_store})
return {"skipped": False, "promoted": True, "report_id": "report-test"}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
rc = cli.main(["--base-dir", "/custom/path"])
assert rc == 0
assert len(calls) == 1
assert calls[0]["base_dir"] == "/custom/path"
def test_main_entrypoint_parses_min_new_trajectories_argument(monkeypatch):
from memabra import cli
calls = []
def mock_online_learning_workflow(*, base_dir=None, min_new_trajectories=3, seen_trajectory_store=None, **kwargs):
calls.append({"base_dir": str(base_dir) if base_dir else None, "min_new_trajectories": min_new_trajectories, "seen_trajectory_store": seen_trajectory_store})
return {"skipped": False, "promoted": True, "report_id": "report-test"}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
rc = cli.main(["--min-new-trajectories", "10"])
assert rc == 0
assert len(calls) == 1
assert calls[0]["min_new_trajectories"] == 10
def test_run_online_learning_workflow_skips_on_second_run_when_seen_store_provided(tmp_path: Path):
base_dir = tmp_path / "demo-artifacts"
seen_store = tmp_path / "seen.json"
result1 = run_online_learning_workflow(
base_dir=base_dir,
min_new_trajectories=1,
seen_trajectory_store=seen_store,
)
assert result1["skipped"] is False
result2 = run_online_learning_workflow(
base_dir=base_dir,
min_new_trajectories=1,
seen_trajectory_store=seen_store,
)
assert result2["skipped"] is True
assert "too few new trajectories" in result2["reason"].lower()
def test_main_entrypoint_passes_default_seen_trajectory_store(monkeypatch):
from memabra import cli
calls = []
def mock_online_learning_workflow(*, base_dir=None, min_new_trajectories=3, seen_trajectory_store=None, dry_run=False, **kwargs):
calls.append({
"base_dir": str(base_dir) if base_dir else None,
"min_new_trajectories": min_new_trajectories,
"seen_trajectory_store": str(seen_trajectory_store) if seen_trajectory_store else None,
"dry_run": dry_run,
})
return {"skipped": False, "promoted": True, "report_id": "report-test"}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
rc = cli.main()
assert rc == 0
assert len(calls) == 1
assert calls[0]["seen_trajectory_store"] is not None
assert "seen-trajectories.json" in calls[0]["seen_trajectory_store"]
assert calls[0]["dry_run"] is False
def test_main_entrypoint_passes_dry_run_flag(monkeypatch):
from memabra import cli
calls = []
def mock_online_learning_workflow(*, base_dir=None, min_new_trajectories=3, seen_trajectory_store=None, dry_run=False, **kwargs):
calls.append({
"base_dir": str(base_dir) if base_dir else None,
"min_new_trajectories": min_new_trajectories,
"seen_trajectory_store": str(seen_trajectory_store) if seen_trajectory_store else None,
"dry_run": dry_run,
"baseline_version": kwargs.get("baseline_version"),
})
return {"skipped": False, "promoted": True, "report_id": "report-test"}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
rc = cli.main(["--dry-run"])
assert rc == 0
assert len(calls) == 1
assert calls[0]["dry_run"] is True
def test_main_entrypoint_passes_baseline_version_flag(monkeypatch):
from memabra import cli
calls = []
def mock_online_learning_workflow(*, base_dir=None, min_new_trajectories=3, seen_trajectory_store=None, dry_run=False, baseline_version=None, **kwargs):
calls.append({
"base_dir": str(base_dir) if base_dir else None,
"min_new_trajectories": min_new_trajectories,
"seen_trajectory_store": str(seen_trajectory_store) if seen_trajectory_store else None,
"dry_run": dry_run,
"baseline_version": baseline_version,
})
return {"skipped": False, "promoted": True, "report_id": "report-test"}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
rc = cli.main(["--baseline-version", "v1"])
assert rc == 0
assert len(calls) == 1
assert calls[0]["baseline_version"] == "v1"
def test_main_entrypoint_supports_text_format_for_workflow(monkeypatch, capsys):
from memabra import cli
def mock_online_learning_workflow(**kwargs):
return {
"skipped": False,
"promoted": False,
"report_id": "report-text",
"dry_run": True,
"decision": {
"accepted": False,
"reasons": ["Dry run requested"],
"metrics": {
"reward_delta": 0.05,
"error_rate_delta": 0.0,
"latency_delta_ms": 4.0,
},
},
"baseline_metrics": {
"avg_reward": 0.8,
"error_rate": 0.1,
"avg_latency_ms": 90.0,
},
"challenger_metrics": {
"avg_reward": 0.85,
"error_rate": 0.1,
"avg_latency_ms": 94.0,
},
}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
rc = cli.main(["--format", "text", "--dry-run"])
captured = capsys.readouterr()
assert rc == 0
assert "Memabra online learning result" in captured.out
assert "Summary" in captured.out
assert "Dry run: yes" in captured.out
assert "Baseline" in captured.out
assert "Reward: 0.8000" in captured.out
assert "Challenger" in captured.out
assert "Reward: 0.8500" in captured.out
assert "Deltas" in captured.out
assert "Reward delta: 0.0500" in captured.out
assert "Reason: Dry run requested" in captured.out
def test_main_entrypoint_passes_case_index_flags(monkeypatch):
from memabra import cli
calls = []
def mock_online_learning_workflow(*, base_dir=None, min_new_trajectories=3, seen_trajectory_store=None, dry_run=False, baseline_version=None, case_index_path=None, rebuild_case_index=False, **kwargs):
calls.append({
"base_dir": str(base_dir) if base_dir else None,
"case_index_path": str(case_index_path) if case_index_path else None,
"rebuild_case_index": rebuild_case_index,
})
return {"skipped": False, "promoted": True, "report_id": "report-test"}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
rc = cli.main(["--case-index", "/tmp/cases.json", "--rebuild-case-index"])
assert rc == 0
assert len(calls) == 1
assert calls[0]["case_index_path"] == "/tmp/cases.json"
assert calls[0]["rebuild_case_index"] is True
def test_run_online_learning_workflow_loads_existing_case_index(tmp_path: Path):
base_dir = tmp_path / "demo-artifacts"
case_index_path = tmp_path / "case-index.json"
# Run once to create trajectories and rebuild case index
result1 = run_online_learning_workflow(base_dir=base_dir, min_new_trajectories=1, rebuild_case_index=True, case_index_path=case_index_path)
assert result1["skipped"] is False
assert case_index_path.exists()
# Second run should load the existing case index
result2 = run_online_learning_workflow(base_dir=base_dir, min_new_trajectories=1, rebuild_case_index=False, case_index_path=case_index_path)
assert result2["skipped"] is False
def test_run_online_learning_workflow_rebuilds_case_index_after_cycle(tmp_path: Path):
base_dir = tmp_path / "demo-artifacts"
case_index_path = tmp_path / "case-index.json"
result = run_online_learning_workflow(
base_dir=base_dir,
min_new_trajectories=1,
case_index_path=case_index_path,
)
assert result["skipped"] is False
assert case_index_path.exists()
from memabra.case_index import CaseIndex
index = CaseIndex.load(case_index_path)
# The benchmark task during the cycle should produce a trajectory that gets indexed
assert index.best("Use my telegram preference for this answer.") is not None
def test_main_entrypoint_defaults_case_index_path_when_rebuild_flag_set(monkeypatch):
from memabra import cli
calls = []
def mock_online_learning_workflow(*, base_dir=None, min_new_trajectories=3, seen_trajectory_store=None, dry_run=False, baseline_version=None, case_index_path=None, rebuild_case_index=False, **kwargs):
calls.append({
"base_dir": str(base_dir) if base_dir else None,
"case_index_path": str(case_index_path) if case_index_path else None,
"rebuild_case_index": rebuild_case_index,
})
return {"skipped": False, "promoted": True, "report_id": "report-test"}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
rc = cli.main(["--rebuild-case-index"])
assert rc == 0
assert len(calls) == 1
assert calls[0]["rebuild_case_index"] is True
assert calls[0]["case_index_path"] is not None
assert "case-index.json" in calls[0]["case_index_path"]
def test_main_status_flag_prints_status_and_skips_workflow(tmp_path: Path, monkeypatch, capsys):
from memabra import cli
workflow_calls = []
def mock_online_learning_workflow(**kwargs):
workflow_calls.append(kwargs)
return {"skipped": False, "promoted": True, "report_id": "report-test"}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
base_dir = tmp_path / "demo-artifacts"
base_dir.mkdir(parents=True, exist_ok=True)
rc = cli.main(["status", "--base-dir", str(base_dir)])
captured = capsys.readouterr()
assert rc == 0
assert len(workflow_calls) == 0
assert "current_version_id" in captured.out
def test_main_status_flag_supports_text_format(tmp_path: Path, monkeypatch, capsys):
from memabra import cli
workflow_calls = []
def mock_online_learning_workflow(**kwargs):
workflow_calls.append(kwargs)
return {"skipped": False, "promoted": True, "report_id": "report-test"}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
base_dir = tmp_path / "demo-artifacts"
base_dir.mkdir(parents=True, exist_ok=True)
rc = cli.main(["status", "--format", "text", "--base-dir", str(base_dir)])
captured = capsys.readouterr()
assert rc == 0
assert len(workflow_calls) == 0
assert "Memabra status" in captured.out
assert "Current version:" in captured.out
assert "Trajectory count:" in captured.out
def test_main_rollback_flag_rolls_back_and_skips_workflow(tmp_path: Path, monkeypatch, capsys):
from memabra import cli
from memabra.router_versioning import RouterVersionStore
workflow_calls = []
rollback_calls = []
def mock_online_learning_workflow(**kwargs):
workflow_calls.append(kwargs)
return {"skipped": False, "promoted": True, "report_id": "report-test"}
def mock_rollback(self, version_id: str):
rollback_calls.append(version_id)
return {"current_version_id": version_id}
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
monkeypatch.setattr(RouterVersionStore, "rollback", mock_rollback)
base_dir = tmp_path / "demo-artifacts"
base_dir.mkdir(parents=True, exist_ok=True)
rc = cli.main(["version", "rollback", "v1", "--base-dir", str(base_dir)])
captured = capsys.readouterr()
assert rc == 0
assert len(workflow_calls) == 0
assert len(rollback_calls) == 1
assert rollback_calls[0] == "v1"
assert "current_version_id" in captured.out
def test_main_rollback_flag_supports_text_format(tmp_path: Path, monkeypatch, capsys):
from memabra import cli
from memabra.router_versioning import RouterVersionStore
def mock_rollback(self, version_id: str):
return {"current_version_id": version_id}
monkeypatch.setattr(RouterVersionStore, "rollback", mock_rollback)
base_dir = tmp_path / "demo-artifacts"
base_dir.mkdir(parents=True, exist_ok=True)
rc = cli.main(["version", "rollback", "v1", "--format", "text", "--base-dir", str(base_dir)])
captured = capsys.readouterr()
assert rc == 0
assert "Rolled back current version to: v1" in captured.out
def test_main_rollback_missing_version_prints_error_and_exits_nonzero(tmp_path: Path, monkeypatch, capsys):
from memabra import cli
from memabra.router_versioning import RouterVersionStore
def mock_rollback(self, version_id: str):
raise ValueError(f"Version '{version_id}' not found.")
monkeypatch.setattr(RouterVersionStore, "rollback", mock_rollback)
base_dir = tmp_path / "demo-artifacts"
base_dir.mkdir(parents=True, exist_ok=True)
rc = cli.main(["version", "rollback", "v99", "--base-dir", str(base_dir)])
captured = capsys.readouterr()
assert rc == 1
assert "not found" in captured.err.lower()
def test_main_list_versions_flag_prints_versions_and_skips_workflow(tmp_path: Path, monkeypatch, capsys):
from memabra import cli
from memabra.router_versioning import RouterVersionStore
workflow_calls = []
def mock_online_learning_workflow(**kwargs):
workflow_calls.append(kwargs)
return {"skipped": False, "promoted": True, "report_id": "report-test"}
def mock_list_versions(self):
return [
{"version_id": "v1", "metadata": {"source": "test"}},
{"version_id": "v2", "metadata": {"source": "test"}},
]
monkeypatch.setattr(cli, "run_online_learning_workflow", mock_online_learning_workflow)
monkeypatch.setattr(RouterVersionStore, "list_versions", mock_list_versions)
base_dir = tmp_path / "demo-artifacts"
base_dir.mkdir(parents=True, exist_ok=True)
rc = cli.main(["version", "list", "--base-dir", str(base_dir)])
captured = capsys.readouterr()
assert rc == 0
assert len(workflow_calls) == 0
assert "v1" in captured.out
assert "v2" in captured.out
def test_main_list_versions_flag_supports_text_format(tmp_path: Path, monkeypatch, capsys):
from memabra import cli
from memabra.router_versioning import RouterVersionStore
def mock_list_versions(self):
return [
{"version_id": "v1", "metadata": {"source": "seed", "avg_reward": 1.2}},
{"version_id": "v2", "metadata": {"source": "online_learning", "avg_reward": 1.4}},
]
def mock_get_current(self):
return {"current_version_id": "v2"}
monkeypatch.setattr(RouterVersionStore, "list_versions", mock_list_versions)
monkeypatch.setattr(RouterVersionStore, "get_current", mock_get_current)
base_dir = tmp_path / "demo-artifacts"
base_dir.mkdir(parents=True, exist_ok=True)
rc = cli.main(["version", "list", "--format", "text", "--base-dir", str(base_dir)])
captured = capsys.readouterr()
assert rc == 0
assert "Saved router versions (2 total)" in captured.out
assert "Current version: v2" in captured.out
assert "2. v2 (current, source=online_learning, avg_reward=1.4)" in captured.out

49
tests/test_dataset.py Normal file

@@ -0,0 +1,49 @@
from memabra.dataset import DatasetBuilder, TrainingSample
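# DatasetBuilder flattens a trajectory into one TrainingSample: the first
# decision type becomes the label, reward.total the reward, and candidate-set
# statistics (counts, top confidences, success rate, risk) the feature vector.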
def test_dataset_builder_extracts_features_and_label():
trajectories = [
{
"task": {"input": "hello world"},
"candidate_sets": {
"memory": [{"confidence": 0.8}],
"skill": [{"success_rate": 0.9}],
"tool": [{"confidence": 0.7, "risk": 0.2}],
},
"decisions": [{"decision_type": "direct_answer"}],
"reward": {"total": 0.95},
}
]
builder = DatasetBuilder()
samples = builder.build(trajectories)
assert len(samples) == 1
sample = samples[0]
assert sample.input_text == "hello world"
assert sample.label == "direct_answer"
assert sample.reward == 0.95
assert sample.features["input_length"] == 11
assert sample.features["memory_count"] == 1
assert sample.features["skill_count"] == 1
assert sample.features["tool_count"] == 1
assert sample.features["top_memory_confidence"] == 0.8
assert sample.features["top_skill_success_rate"] == 0.9
assert sample.features["top_tool_confidence"] == 0.7
assert sample.features["top_tool_risk"] == 0.2
def test_dataset_builder_handles_empty_candidates():
trajectories = [
{
"task": {"input": "hi"},
"candidate_sets": {"memory": [], "skill": [], "tool": []},
"decisions": [{"decision_type": "clarify"}],
"reward": {"total": 0.0},
}
]
builder = DatasetBuilder()
samples = builder.build(trajectories)
assert len(samples) == 1
assert samples[0].features["top_memory_confidence"] == 0.0
assert samples[0].features["top_skill_success_rate"] == 0.0
assert samples[0].features["top_tool_confidence"] == 0.0
assert samples[0].features["top_tool_risk"] == 0.0

54
tests/test_evaluator.py Normal file

@@ -0,0 +1,54 @@
from memabra.app import build_demo_app
from memabra.evaluator import BenchmarkTask, Evaluator
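# Evaluator replays benchmark tasks through an app and reports aggregate
# metrics (reward, error rate, decision distribution); compare() scores a
# baseline run against a challenger.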
def test_evaluator_runs_benchmark_and_reports_metrics(tmp_path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
evaluator = Evaluator(app)
tasks = [
BenchmarkTask(user_input="Use my telegram preference."),
BenchmarkTask(user_input="Check the current system status."),
]
result = evaluator.run(tasks)
assert result.task_count == 2
assert result.avg_reward >= 0.0
assert "inject_memory" in result.decision_distribution
assert "call_tool" in result.decision_distribution
assert result.error_rate == 0.0
def test_evaluator_ab_compares_two_routers(tmp_path):
from memabra.router import RuleBasedRouter, TaskContext
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
evaluator = Evaluator(app)
tasks = [
BenchmarkTask(user_input="Use my telegram preference."),
BenchmarkTask(user_input="Check the current system status."),
]
baseline = evaluator.run(tasks, router=RuleBasedRouter())
# The same router is used for both arms here; a real comparison would use two different routers
challenger = evaluator.run(tasks, router=RuleBasedRouter())
comparison = evaluator.compare(baseline, challenger)
assert comparison["winner"] in ("baseline", "challenger", "tie")
assert "avg_reward_delta" in comparison
assert "error_rate_delta" in comparison
def test_app_trains_learning_router_from_artifact_index(tmp_path):
from memabra.router import SimpleLearningRouter
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
# Generate some training data
app.run_task("Use my telegram preference.", channel="local")
app.run_task("Check the current system status.", channel="local")
router = app.train_learning_router()
assert isinstance(router, SimpleLearningRouter)
# After training, the router should be able to make predictions (not fall back to clarify for known patterns)
trajectory = app.run_task("Use my telegram preference.", channel="local")
assert trajectory["reward"]["total"] >= 0.0


@@ -0,0 +1,265 @@
from pathlib import Path
from memabra.candidate_types import CandidateObject
from memabra.execution import ExecutionEngine, MemoryExecutor, ToolExecutor
from memabra.memory_store import InMemoryMemoryStore, MemoryRecord, MemorySource
from memabra.persistence import PersistenceStore
from memabra.retrieval import CandidateRetriever, InMemoryCandidateProvider
from memabra.router import RouteDecision, RuleBasedRouter, TaskContext
from memabra.runner import MemabraRunner
from memabra.schemas import SchemaRegistry
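# Test doubles for the execution engine: a tool backend that always fails,
# one that mixes success and failure by tool id, and a static skill loader.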
class FailingToolBackend:
def run_tool(self, tool_id: str, context: TaskContext, params: dict | None = None) -> dict:
return {"status": "error", "output": None, "error": f"{tool_id} failed", "latency_ms": 123}
class MixedResultToolBackend:
def run_tool(self, tool_id: str, context: TaskContext, params: dict | None = None) -> dict:
if tool_id == "tool-ok":
return {"status": "success", "output": "ok", "error": None, "latency_ms": 50}
return {"status": "error", "output": None, "error": f"{tool_id} failed", "latency_ms": 100}
class StaticSkillBackend:
def load_skill(self, skill_id: str) -> dict:
return {"skill_id": skill_id, "instructions": "Follow the documented deployment workflow."}
def test_execution_engine_marks_memory_used_and_runner_persists(tmp_path: Path):
memory_store = InMemoryMemoryStore()
memory_store.upsert(
MemoryRecord(
id="mem-telegram-pref",
memory_type="semantic",
fact_status="verified",
content="Prefer plain text on Telegram.",
summary="Telegram preference",
source=MemorySource(kind="user", ref="session-1"),
confidence=0.95,
)
)
retriever = CandidateRetriever(
[
InMemoryCandidateProvider(
candidate_type="memory",
candidates=[
CandidateObject(
id="mem-telegram-pref",
type="memory",
title="Telegram preference",
summary="Prefer plain text on Telegram.",
triggers=["telegram", "preference"],
confidence=0.95,
success_rate=0.9,
freshness=0.9,
)
],
)
]
)
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
runner = MemabraRunner(
retriever=retriever,
router=RuleBasedRouter(),
execution_engine=ExecutionEngine(memory_executor=MemoryExecutor(memory_store=memory_store)),
persistence_store=persistence,
memory_store=memory_store,
)
trajectory = runner.run(
context=TaskContext(user_input="Use my telegram preference for this answer."),
channel="telegram",
user_id="oza",
persist=True,
)
SchemaRegistry().validate_trajectory(trajectory)
assert any(event["event_type"] == "memory_injected" for event in trajectory["events"])
assert memory_store.get("mem-telegram-pref").last_used_at is not None
assert persistence.load_trajectory(trajectory["trajectory_id"])["trajectory_id"] == trajectory["trajectory_id"]
def test_persistence_store_round_trip_memory_record(tmp_path: Path):
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
record = MemoryRecord(
id="mem-1",
memory_type="semantic",
fact_status="assumed",
content="User likes concise replies.",
summary="Concise reply preference",
source=MemorySource(kind="user", ref="session-2"),
confidence=0.7,
)
persistence.save_memory_record(record)
loaded = persistence.load_memory_record("mem-1")
assert loaded["id"] == "mem-1"
assert len(persistence.list_memory_paths()) == 1
def test_runner_records_tool_failures_in_outcome_and_reward(tmp_path: Path):
retriever = CandidateRetriever(
[
InMemoryCandidateProvider(
candidate_type="tool",
candidates=[
CandidateObject(
id="tool-terminal",
type="tool",
title="terminal",
summary="Run terminal commands.",
triggers=["check", "current"],
confidence=0.95,
success_rate=0.9,
freshness=1.0,
)
],
)
]
)
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
runner = MemabraRunner(
retriever=retriever,
router=RuleBasedRouter(),
execution_engine=ExecutionEngine(tool_backend=FailingToolBackend()),
persistence_store=persistence,
)
trajectory = runner.run(
context=TaskContext(user_input="Check the current status."),
channel="telegram",
persist=True,
)
assert trajectory["outcome"]["status"] == "failure"
assert trajectory["outcome"]["tool_errors"] == 1
assert trajectory["reward"]["components"]["tool_error"] > 0
assert trajectory["reward"]["components"]["latency"] > 0
assert any(event["event_type"] == "tool_result" for event in trajectory["events"])
def test_runner_loads_skill_payload_from_backend():
retriever = CandidateRetriever(
[
InMemoryCandidateProvider(
candidate_type="skill",
candidates=[
CandidateObject(
id="skill-deploy",
type="skill",
title="deploy workflow",
summary="Reusable deployment procedure.",
triggers=["deploy", "workflow"],
confidence=0.9,
success_rate=0.95,
freshness=0.8,
)
],
)
]
)
runner = MemabraRunner(
retriever=retriever,
router=RuleBasedRouter(),
execution_engine=ExecutionEngine(skill_backend=StaticSkillBackend()),
)
trajectory = runner.run(context=TaskContext(user_input="Deploy this service with the usual workflow."))
skill_events = [event for event in trajectory["events"] if event["event_type"] == "skill_loaded"]
assert skill_events
assert skill_events[0]["payload"]["instructions"] == "Follow the documented deployment workflow."
def test_runner_detects_partial_success_for_mixed_tool_results():
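# Stub router that ignores the candidates and always selects both tools, so
# the engine observes one success and one failure within a single decision.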
class BothToolsRouter:
def choose(self, context, memory, skill, tool):
from memabra.router import RouteDecision
return RouteDecision(
decision_type="call_tool",
selected_ids=["tool-ok", "tool-fail"],
selected_payloads=[{}, {}],
rationale="Force both tools for testing.",
)
retriever = CandidateRetriever(
[
InMemoryCandidateProvider(
candidate_type="tool",
candidates=[
CandidateObject(
id="tool-ok",
type="tool",
title="ok tool",
summary="Always succeeds.",
triggers=["check", "current"],
confidence=0.95,
success_rate=0.9,
freshness=1.0,
),
CandidateObject(
id="tool-fail",
type="tool",
title="failing tool",
summary="Always fails.",
triggers=["check", "current"],
confidence=0.9,
success_rate=0.5,
freshness=1.0,
),
],
)
]
)
runner = MemabraRunner(
retriever=retriever,
router=BothToolsRouter(),
execution_engine=ExecutionEngine(tool_backend=MixedResultToolBackend()),
)
trajectory = runner.run(
context=TaskContext(user_input="Check the current status."),
channel="local",
)
assert trajectory["outcome"]["status"] == "partial_success"
assert trajectory["outcome"]["tool_errors"] == 1
assert trajectory["reward"]["components"]["tool_error"] > 0
assert trajectory["reward"]["components"]["context_cost"] > 0
def test_execution_engine_executes_composite_action_sequentially():
memory_store = InMemoryMemoryStore()
memory_store.upsert(
MemoryRecord(
id="mem-1",
memory_type="semantic",
fact_status="verified",
content="Prefer concise replies.",
summary="Concise preference",
source=MemorySource(kind="user", ref="session-1"),
confidence=0.9,
)
)
engine = ExecutionEngine(
memory_executor=MemoryExecutor(memory_store=memory_store),
tool_executor=ToolExecutor(backend=MixedResultToolBackend()),
)
decision = RouteDecision(
decision_type="composite_action",
composite_steps=[
RouteDecision(decision_type="inject_memory", selected_ids=["mem-1"]),
RouteDecision(decision_type="call_tool", selected_ids=["tool-ok"], selected_payloads=[{}]),
],
)
result = engine.execute(decision, TaskContext(user_input="composite test"), trajectory_id="traj-comp")
assert result.status == "executed"
assert any(event.event_type == "memory_injected" for event in result.events)
assert any(event.event_type == "tool_result" for event in result.events)
assert len(result.details["steps"]) == 2
assert result.details["steps"][0]["decision_type"] == "inject_memory"
assert result.details["steps"][1]["decision_type"] == "call_tool"


@@ -0,0 +1,91 @@
from memabra.candidate_types import CandidateObject
from memabra.dataset import TrainingSample
from memabra.router import SimpleLearningRouter, TaskContext
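# SimpleLearningRouter is fit on TrainingSample feature/label pairs; once
# trained it should pick call_tool vs inject_memory from the candidates, and
# it falls back to clarify when it has never been fit.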
def test_learning_router_fits_and_predicts():
router = SimpleLearningRouter()
samples = [
TrainingSample(
input_text="run tool",
features={
"input_length": 8,
"memory_count": 0,
"skill_count": 0,
"tool_count": 1,
"top_memory_confidence": 0.0,
"top_skill_success_rate": 0.0,
"top_tool_confidence": 0.9,
"top_tool_risk": 0.1,
},
label="call_tool",
reward=1.0,
),
TrainingSample(
input_text="remember",
features={
"input_length": 8,
"memory_count": 1,
"skill_count": 0,
"tool_count": 0,
"top_memory_confidence": 0.9,
"top_skill_success_rate": 0.0,
"top_tool_confidence": 0.0,
"top_tool_risk": 0.0,
},
label="inject_memory",
reward=1.0,
),
]
router.fit(samples)
tool = CandidateObject(
id="t1",
type="tool",
title="t",
summary="s",
triggers=[],
confidence=0.9,
success_rate=0.9,
freshness=0.9,
cost=0.0,
risk=0.1,
)
decision = router.choose(
TaskContext(user_input="run tool"),
memory_candidates=[],
skill_candidates=[],
tool_candidates=[tool],
)
assert decision.decision_type == "call_tool"
mem = CandidateObject(
id="m1",
type="memory",
title="m",
summary="s",
triggers=[],
confidence=0.9,
success_rate=0.9,
freshness=0.9,
cost=0.0,
risk=0.0,
)
decision = router.choose(
TaskContext(user_input="remember"),
memory_candidates=[mem],
skill_candidates=[],
tool_candidates=[],
)
assert decision.decision_type == "inject_memory"
def test_learning_router_falls_back_to_clarify_when_untrained():
router = SimpleLearningRouter()
decision = router.choose(
TaskContext(user_input="hi"),
memory_candidates=[],
skill_candidates=[],
tool_candidates=[],
)
assert decision.decision_type == "clarify"


@@ -0,0 +1,27 @@
from memabra.memory_store import InMemoryMemoryStore, MemoryRecord, MemorySource
from memabra.schemas import SchemaRegistry
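# Round-trips a record through upsert / verify / mark_used / revoke and checks
# that the result still validates against the memory-record schema.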
def test_memory_store_verify_and_revoke_round_trip():
store = InMemoryMemoryStore()
record = MemoryRecord(
id="mem-pref-1",
memory_type="semantic",
fact_status="assumed",
content="User prefers plain text on Telegram.",
summary="Telegram plain-text preference",
source=MemorySource(kind="user", ref="session-1"),
confidence=0.9,
)
store.upsert(record)
store.verify("mem-pref-1", status="confirmed", check_method="user-confirmed")
store.mark_used("mem-pref-1")
store.revoke("mem-pref-1", reason="User changed preference")
updated = store.get("mem-pref-1")
assert updated is not None
assert updated.verification.status == "confirmed"
assert updated.last_used_at is not None
assert updated.fact_status == "revoked"
SchemaRegistry().validate_memory_record(updated.to_dict())


@@ -0,0 +1,348 @@
from __future__ import annotations
from memabra.app import build_demo_app
from memabra.benchmarks import BenchmarkTask
from memabra.dataset import DatasetBuilder
from memabra.evaluator import Evaluator
from memabra.online_learning import OnlineLearningCoordinator
from memabra.promotion import PromotionPolicy
from memabra.router_versioning import RouterVersionStore
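# OnlineLearningCoordinator cycle under test: gate on the number of new
# trajectories, train a challenger router, benchmark it against a baseline,
# apply the promotion policy, and persist router versions, training reports,
# seen-trajectory ids, and the optional case index.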
def _seed_trajectories(app, count: int):
for i in range(count):
app.run_task(f"Test task {i}", channel="local")
def test_coordinator_skips_when_too_few_new_trajectories(tmp_path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
_seed_trajectories(app, 2)
coordinator = OnlineLearningCoordinator(
app=app,
policy=PromotionPolicy(
min_reward_delta=0.01,
max_error_rate_increase=0.05,
max_latency_increase_ms=100.0,
required_task_count=1,
),
benchmark_tasks=[BenchmarkTask(user_input="test")],
min_new_trajectories=5,
)
result = coordinator.run_cycle()
assert result["skipped"] is True
assert "too few new trajectories" in result["reason"].lower()
def test_coordinator_rejects_when_policy_fails(tmp_path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
# Seed enough trajectories for training and benchmarking
_seed_trajectories(app, 10)
# Use a very strict policy that will reject any challenger
policy = PromotionPolicy(
min_reward_delta=1.0, # impossible to meet
max_error_rate_increase=0.0,
max_latency_increase_ms=0.0,
required_task_count=1,
)
coordinator = OnlineLearningCoordinator(
app=app,
policy=policy,
benchmark_tasks=[BenchmarkTask(user_input="Test task 0")],
min_new_trajectories=1,
version_store_base_dir=tmp_path / "versions",
)
result = coordinator.run_cycle()
assert result["skipped"] is False
assert result["promoted"] is False
assert "decision" in result
assert result["decision"].accepted is False
def test_coordinator_accepts_and_saves_version_when_policy_passes(tmp_path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
_seed_trajectories(app, 10)
# Lenient policy that should pass
policy = PromotionPolicy(
min_reward_delta=-1.0, # always passes
max_error_rate_increase=1.0,
max_latency_increase_ms=10000.0,
required_task_count=1,
)
version_dir = tmp_path / "versions"
report_dir = tmp_path / "reports"
coordinator = OnlineLearningCoordinator(
app=app,
policy=policy,
benchmark_tasks=[BenchmarkTask(user_input="Test task 0")],
min_new_trajectories=1,
version_store_base_dir=version_dir,
report_store_base_dir=report_dir,
)
result = coordinator.run_cycle()
assert result["skipped"] is False
assert result["promoted"] is True
assert "version_id" in result
assert result["decision"].accepted is True
# Verify version was saved
store = RouterVersionStore(base_dir=version_dir)
versions = store.list_versions()
assert len(versions) == 1
assert versions[0]["version_id"] == result["version_id"]
# Verify report was saved
from memabra.training_reports import TrainingReportStore
report_store = TrainingReportStore(base_dir=report_dir)
reports = report_store.list_reports()
assert len(reports) == 1
assert reports[0]["promoted_version_id"] == result["version_id"]
def test_coordinator_saves_report_on_rejection(tmp_path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
_seed_trajectories(app, 10)
policy = PromotionPolicy(
min_reward_delta=1.0,
max_error_rate_increase=0.0,
max_latency_increase_ms=0.0,
required_task_count=1,
)
report_dir = tmp_path / "reports"
coordinator = OnlineLearningCoordinator(
app=app,
policy=policy,
benchmark_tasks=[BenchmarkTask(user_input="Test task 0")],
min_new_trajectories=1,
report_store_base_dir=report_dir,
)
result = coordinator.run_cycle()
assert result["promoted"] is False
from memabra.training_reports import TrainingReportStore
report_store = TrainingReportStore(base_dir=report_dir)
reports = report_store.list_reports()
assert len(reports) == 1
assert reports[0]["promotion_decision"]["accepted"] is False
def test_coordinator_catches_training_exception_and_returns_error_report(tmp_path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
_seed_trajectories(app, 10)
policy = PromotionPolicy(
min_reward_delta=-1.0,
max_error_rate_increase=1.0,
max_latency_increase_ms=10000.0,
required_task_count=1,
)
report_dir = tmp_path / "reports"
coordinator = OnlineLearningCoordinator(
app=app,
policy=policy,
benchmark_tasks=[BenchmarkTask(user_input="Test task 0")],
min_new_trajectories=1,
report_store_base_dir=report_dir,
)
# Force a training failure by monkeypatching DatasetBuilder.build to raise
original_build = DatasetBuilder.build
DatasetBuilder.build = lambda self, trajectories: (_ for _ in ()).throw(RuntimeError("simulated training failure"))
try:
result = coordinator.run_cycle()
finally:
DatasetBuilder.build = original_build
assert result["skipped"] is False
assert result["promoted"] is False
assert "error" in result
assert "simulated training failure" in result["error"]
# Verify error report was saved
from memabra.training_reports import TrainingReportStore
report_store = TrainingReportStore(base_dir=report_dir)
reports = report_store.list_reports()
assert len(reports) == 1
assert reports[0]["promotion_decision"]["accepted"] is False
assert "simulated training failure" in reports[0]["promotion_decision"]["reasons"][0]
def test_coordinator_persists_seen_trajectory_ids_across_restarts(tmp_path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
_seed_trajectories(app, 5)
policy = PromotionPolicy(
min_reward_delta=-1.0,
max_error_rate_increase=1.0,
max_latency_increase_ms=10000.0,
required_task_count=1,
)
benchmark_tasks = [BenchmarkTask(user_input="Test task 0")]
seen_store = tmp_path / "seen_trajectories.json"
version_dir = tmp_path / "versions"
report_dir = tmp_path / "reports"
coordinator1 = OnlineLearningCoordinator(
app=app,
policy=policy,
benchmark_tasks=benchmark_tasks,
min_new_trajectories=1,
version_store_base_dir=version_dir,
report_store_base_dir=report_dir,
seen_trajectory_store=seen_store,
)
result1 = coordinator1.run_cycle()
assert result1["skipped"] is False
# New coordinator instance pointing to same store
coordinator2 = OnlineLearningCoordinator(
app=app,
policy=policy,
benchmark_tasks=benchmark_tasks,
min_new_trajectories=1,
version_store_base_dir=version_dir,
report_store_base_dir=report_dir,
seen_trajectory_store=seen_store,
)
result2 = coordinator2.run_cycle()
assert result2["skipped"] is True
assert "too few new trajectories" in result2["reason"].lower()
def test_coordinator_dry_run_does_not_promote_or_save_version(tmp_path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
_seed_trajectories(app, 10)
policy = PromotionPolicy(
min_reward_delta=-1.0,
max_error_rate_increase=1.0,
max_latency_increase_ms=10000.0,
required_task_count=1,
)
version_dir = tmp_path / "versions"
report_dir = tmp_path / "reports"
coordinator = OnlineLearningCoordinator(
app=app,
policy=policy,
benchmark_tasks=[BenchmarkTask(user_input="Test task 0")],
min_new_trajectories=1,
version_store_base_dir=version_dir,
report_store_base_dir=report_dir,
)
result = coordinator.run_cycle(dry_run=True)
assert result["skipped"] is False
assert result["promoted"] is False
assert "decision" in result
assert result["decision"].accepted is True # policy would accept, but dry_run blocks promotion
# No version should be saved
store = RouterVersionStore(base_dir=version_dir)
assert len(store.list_versions()) == 0
# Report should still be saved for audit
from memabra.training_reports import TrainingReportStore
report_store = TrainingReportStore(base_dir=report_dir)
reports = report_store.list_reports()
assert len(reports) == 1
assert reports[0].get("dry_run") is True
def test_coordinator_rebuilds_case_index_when_path_provided(tmp_path):
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
_seed_trajectories(app, 10)
policy = PromotionPolicy(
min_reward_delta=-1.0,
max_error_rate_increase=1.0,
max_latency_increase_ms=10000.0,
required_task_count=1,
)
case_index_path = tmp_path / "case-index.json"
coordinator = OnlineLearningCoordinator(
app=app,
policy=policy,
benchmark_tasks=[BenchmarkTask(user_input="Test task 0")],
min_new_trajectories=1,
case_index_path=case_index_path,
)
result = coordinator.run_cycle()
assert result["skipped"] is False
assert case_index_path.exists()
from memabra.case_index import CaseIndex
index = CaseIndex.load(case_index_path)
assert index.best("Test task 0") is not None
def test_coordinator_uses_specified_baseline_version(tmp_path):
from memabra.router import SimpleLearningRouter
app = build_demo_app(base_dir=tmp_path / "demo-artifacts")
_seed_trajectories(app, 10)
# Save a baseline version with known weights
baseline_router = SimpleLearningRouter()
baseline_router._weights = {"call_tool": {"input_length": 0.99}}
baseline_router._feature_keys = ["input_length"]
version_dir = tmp_path / "versions"
store = RouterVersionStore(base_dir=version_dir)
store.save(baseline_router, version_id="v-baseline", metadata={"note": "baseline"})
# Change app's current router to something different
different_router = SimpleLearningRouter()
different_router._weights = {"clarify": {"input_length": 0.01}}
different_router._feature_keys = ["input_length"]
app.set_router(different_router)
policy = PromotionPolicy(
min_reward_delta=-1.0,
max_error_rate_increase=1.0,
max_latency_increase_ms=10000.0,
required_task_count=1,
)
report_dir = tmp_path / "reports"
coordinator = OnlineLearningCoordinator(
app=app,
policy=policy,
benchmark_tasks=[BenchmarkTask(user_input="Test task 0")],
min_new_trajectories=1,
version_store_base_dir=version_dir,
report_store_base_dir=report_dir,
)
result = coordinator.run_cycle(baseline_version_id="v-baseline")
assert result["skipped"] is False
assert "baseline_metrics" in result
assert "challenger_metrics" in result
# Verify report records the baseline version
from memabra.training_reports import TrainingReportStore
report_store = TrainingReportStore(base_dir=report_dir)
reports = report_store.list_reports()
assert len(reports) == 1
assert reports[0].get("baseline_version_id") == "v-baseline"
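
Taken together, the coordinator tests above pin down a single control flow: skip when too few new trajectories have arrived, otherwise evaluate a challenger against a baseline, and promote only when the policy accepts and dry_run is off. A minimal sketch of that flow, with every name assumed from the assertions rather than taken from memabra.online_learning:

# Illustrative sketch only; argument and key names are assumptions based on the tests above.
def run_cycle_sketch(new_trajectories, policy, evaluate, min_new_trajectories, dry_run=False):
    """Skeleton of the promote-or-skip decision the coordinator tests exercise."""
    if len(new_trajectories) < min_new_trajectories:
        return {"skipped": True, "reason": "too few new trajectories since last cycle"}
    baseline_metrics, challenger_metrics = evaluate()  # benchmark both routers
    decision = policy.evaluate(baseline_metrics, challenger_metrics)
    promoted = decision.accepted and not dry_run  # dry_run records the decision but blocks promotion
    return {
        "skipped": False,
        "promoted": promoted,
        "decision": decision,
        "baseline_metrics": baseline_metrics,
        "challenger_metrics": challenger_metrics,
    }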

View File

@@ -0,0 +1,126 @@
from memabra.execution import ActionResult
from memabra.outcome import OutcomeEngine, RewardEngine
from memabra.retrieval import RetrievalResult
from memabra.router import RouteDecision, TaskContext
from memabra.telemetry import RewardBreakdown
def test_outcome_engine_success_for_memory_injection():
engine = OutcomeEngine()
decision = RouteDecision(decision_type="inject_memory", selected_ids=["mem-1"])
result = ActionResult(decision_type="inject_memory", status="executed", details={"latency_ms": 50})
outcome = engine.build_outcome(decision, result)
assert outcome.status == "success"
assert outcome.steps == 1
assert outcome.latency_ms == 50
assert outcome.tool_errors == 0
def test_outcome_engine_failure_for_tool_error():
engine = OutcomeEngine()
decision = RouteDecision(decision_type="call_tool", selected_ids=["tool-1"])
result = ActionResult(decision_type="call_tool", status="error", details={"latency_ms": 120})
outcome = engine.build_outcome(decision, result)
assert outcome.status == "failure"
assert outcome.latency_ms == 120
assert outcome.tool_errors == 1
def test_outcome_engine_counts_multiple_tool_errors():
engine = OutcomeEngine()
decision = RouteDecision(decision_type="call_tool", selected_ids=["tool-1", "tool-2"])
result = ActionResult(
decision_type="call_tool",
status="error",
details={
"latency_ms": 200,
"results": [
{"tool_id": "tool-1", "status": "error"},
{"tool_id": "tool-2", "status": "error"},
],
},
)
outcome = engine.build_outcome(decision, result)
assert outcome.status == "failure"
assert outcome.tool_errors == 2
def test_outcome_engine_partial_success_for_mixed_tool_results():
engine = OutcomeEngine()
decision = RouteDecision(decision_type="call_tool", selected_ids=["tool-1", "tool-2"])
result = ActionResult(
decision_type="call_tool",
status="error",
details={
"latency_ms": 200,
"results": [
{"tool_id": "tool-1", "status": "success"},
{"tool_id": "tool-2", "status": "error"},
],
},
)
outcome = engine.build_outcome(decision, result)
assert outcome.status == "partial_success"
assert outcome.tool_errors == 1
def test_reward_engine_penalizes_latency_by_tier():
outcome_engine = OutcomeEngine()
reward_engine = RewardEngine()
decision = RouteDecision(decision_type="call_tool")
outcome_fast = outcome_engine.build_outcome(decision, ActionResult(decision_type="call_tool", status="success", details={"latency_ms": 200}))
outcome_slow = outcome_engine.build_outcome(decision, ActionResult(decision_type="call_tool", status="success", details={"latency_ms": 2500}))
reward_fast = reward_engine.compute(decision, outcome_fast)
reward_slow = reward_engine.compute(decision, outcome_slow)
assert reward_fast.latency < reward_slow.latency
assert reward_slow.latency > 0.5
def test_reward_engine_context_cost_based_on_candidate_count():
from memabra.candidate_types import CandidateObject
outcome_engine = OutcomeEngine()
reward_engine = RewardEngine()
decision = RouteDecision(decision_type="direct_answer")
outcome = outcome_engine.build_outcome(decision, ActionResult(decision_type="direct_answer", status="skipped", details={"latency_ms": 0}))
dummy_candidate = CandidateObject(id="c1", type="memory", title="t", summary="s", triggers=[])
retrieval = RetrievalResult(memory=[dummy_candidate, dummy_candidate, dummy_candidate], skill=[dummy_candidate, dummy_candidate], tool=[dummy_candidate])
reward = reward_engine.compute(decision, outcome, retrieval_result=retrieval)
assert reward.context_cost > 0
def test_reward_engine_reduces_task_success_for_multiple_errors():
outcome_engine = OutcomeEngine()
reward_engine = RewardEngine()
decision = RouteDecision(decision_type="call_tool")
outcome = outcome_engine.build_outcome(
decision,
ActionResult(
decision_type="call_tool",
status="error",
details={
"latency_ms": 100,
"results": [
{"tool_id": "tool-1", "status": "error"},
{"tool_id": "tool-2", "status": "error"},
],
},
),
)
reward = reward_engine.compute(decision, outcome)
assert reward.task_success < 0.5
assert reward.tool_error >= 0.5
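
The four outcome tests fix how per-tool results map to an overall status: no errors is success, all errors is failure, a mixed set is partial_success, and tool_errors counts the failing entries. A small illustrative helper (not the OutcomeEngine API) that reproduces exactly that mapping:

# Illustrative only; "results" mirrors the list shape used in ActionResult.details above.
def classify_tool_results(results):
    errors = sum(1 for entry in results if entry.get("status") == "error")
    if errors == 0:
        status = "success"
    elif errors == len(results):
        status = "failure"
    else:
        status = "partial_success"
    return status, errors

# e.g. one success and one error yields a partial success with a single counted error
assert classify_tool_results([{"status": "success"}, {"status": "error"}]) == ("partial_success", 1)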

View File

@@ -0,0 +1,22 @@
def test_memabra_package_exports_alpha_modules():
from src import memabra
assert hasattr(memabra, "promotion")
assert hasattr(memabra, "benchmarks")
assert hasattr(memabra, "online_learning")
assert hasattr(memabra, "training_reports")
def test_memabra_top_level_imports():
from memabra import PromotionPolicy, BenchmarkSuite, OnlineLearningCoordinator, TrainingReportStore, CaseIndex
assert PromotionPolicy is not None
assert BenchmarkSuite is not None
assert OnlineLearningCoordinator is not None
assert TrainingReportStore is not None
assert CaseIndex is not None
def test_benchmark_task_exported_from_package():
from memabra import BenchmarkTask
assert BenchmarkTask is not None

112
tests/test_promotion.py Normal file
View File

@@ -0,0 +1,112 @@
from __future__ import annotations
import pytest
from memabra.promotion import PromotionDecision, PromotionPolicy
from memabra.evaluator import EvaluationResult
class TestPromotionPolicy:
def test_accepted_when_challenger_improves_on_all_metrics(self):
policy = PromotionPolicy(
min_reward_delta=0.01,
max_error_rate_increase=0.05,
max_latency_increase_ms=100.0,
required_task_count=2,
)
baseline = EvaluationResult(
task_count=2,
avg_reward=0.5,
error_rate=0.1,
avg_latency_ms=50.0,
)
challenger = EvaluationResult(
task_count=2,
avg_reward=0.6,
error_rate=0.05,
avg_latency_ms=45.0,
)
decision = policy.evaluate(baseline, challenger)
assert isinstance(decision, PromotionDecision)
assert decision.accepted is True
assert decision.reasons == []
assert decision.metrics["reward_delta"] == pytest.approx(0.1, abs=0.001)
assert decision.metrics["error_rate_delta"] == pytest.approx(-0.05, abs=0.001)
assert decision.metrics["latency_delta_ms"] == pytest.approx(-5.0, abs=0.001)
def test_rejected_when_reward_delta_below_minimum(self):
policy = PromotionPolicy(
min_reward_delta=0.1,
max_error_rate_increase=0.05,
max_latency_increase_ms=100.0,
required_task_count=2,
)
baseline = EvaluationResult(task_count=2, avg_reward=0.5, error_rate=0.1, avg_latency_ms=50.0)
challenger = EvaluationResult(task_count=2, avg_reward=0.55, error_rate=0.1, avg_latency_ms=50.0)
decision = policy.evaluate(baseline, challenger)
assert decision.accepted is False
assert any("reward" in r.lower() for r in decision.reasons)
def test_rejected_when_error_rate_increase_exceeds_max(self):
policy = PromotionPolicy(
min_reward_delta=0.01,
max_error_rate_increase=0.05,
max_latency_increase_ms=100.0,
required_task_count=2,
)
baseline = EvaluationResult(task_count=2, avg_reward=0.5, error_rate=0.1, avg_latency_ms=50.0)
challenger = EvaluationResult(task_count=2, avg_reward=0.6, error_rate=0.2, avg_latency_ms=50.0)
decision = policy.evaluate(baseline, challenger)
assert decision.accepted is False
assert any("error" in r.lower() for r in decision.reasons)
def test_rejected_when_latency_increase_exceeds_max(self):
policy = PromotionPolicy(
min_reward_delta=0.01,
max_error_rate_increase=0.05,
max_latency_increase_ms=10.0,
required_task_count=2,
)
baseline = EvaluationResult(task_count=2, avg_reward=0.5, error_rate=0.1, avg_latency_ms=50.0)
challenger = EvaluationResult(task_count=2, avg_reward=0.6, error_rate=0.1, avg_latency_ms=65.0)
decision = policy.evaluate(baseline, challenger)
assert decision.accepted is False
assert any("latency" in r.lower() for r in decision.reasons)
def test_rejected_when_task_count_below_required(self):
policy = PromotionPolicy(
min_reward_delta=0.01,
max_error_rate_increase=0.05,
max_latency_increase_ms=100.0,
required_task_count=5,
)
baseline = EvaluationResult(task_count=2, avg_reward=0.5, error_rate=0.1, avg_latency_ms=50.0)
challenger = EvaluationResult(task_count=2, avg_reward=0.6, error_rate=0.1, avg_latency_ms=50.0)
decision = policy.evaluate(baseline, challenger)
assert decision.accepted is False
assert any("task count" in r.lower() for r in decision.reasons)
def test_multiple_rejection_reasons_accumulate(self):
policy = PromotionPolicy(
min_reward_delta=0.2,
max_error_rate_increase=0.01,
max_latency_increase_ms=10.0,
required_task_count=10,
)
baseline = EvaluationResult(task_count=2, avg_reward=0.5, error_rate=0.1, avg_latency_ms=50.0)
challenger = EvaluationResult(task_count=2, avg_reward=0.55, error_rate=0.15, avg_latency_ms=70.0)
decision = policy.evaluate(baseline, challenger)
assert decision.accepted is False
assert len(decision.reasons) >= 3
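
These policy tests describe a gate that computes reward, error-rate and latency deltas, collects one reason per violated threshold, and accepts only when no reasons accumulate. A sketch of that logic, assuming the attribute and key names used in the assertions above rather than the shipped PromotionPolicy:

from dataclasses import dataclass, field

@dataclass
class SketchPromotionDecision:
    accepted: bool
    reasons: list = field(default_factory=list)
    metrics: dict = field(default_factory=dict)

def evaluate_sketch(policy, baseline, challenger):
    metrics = {
        "reward_delta": challenger.avg_reward - baseline.avg_reward,
        "error_rate_delta": challenger.error_rate - baseline.error_rate,
        "latency_delta_ms": challenger.avg_latency_ms - baseline.avg_latency_ms,
    }
    reasons = []
    if metrics["reward_delta"] < policy.min_reward_delta:
        reasons.append("reward delta below minimum")
    if metrics["error_rate_delta"] > policy.max_error_rate_increase:
        reasons.append("error rate increase above maximum")
    if metrics["latency_delta_ms"] > policy.max_latency_increase_ms:
        reasons.append("latency increase above maximum")
    if challenger.task_count < policy.required_task_count:
        reasons.append("task count below required minimum")
    return SketchPromotionDecision(accepted=not reasons, reasons=reasons, metrics=metrics)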

57
tests/test_replay.py Normal file
View File

@@ -0,0 +1,57 @@
from pathlib import Path
from memabra.persistence import PersistenceStore
from memabra.replay import TrajectoryReplay
EXAMPLE_DIR = "docs/examples"
def test_replay_summary_counts_outcomes_and_actions():
replay = TrajectoryReplay()
summary = replay.summarize_directory(EXAMPLE_DIR)
assert summary.trajectories == 4
assert summary.success_count == 2
assert summary.partial_success_count == 1
assert summary.failure_count == 1
assert summary.direct_answer_count == 1
assert summary.memory_action_count == 1
assert summary.tool_action_count == 2
assert summary.skill_action_count == 0
def test_replay_can_summarize_persisted_artifacts(tmp_path: Path):
persistence = PersistenceStore(base_dir=tmp_path / "artifacts")
persistence.save_trajectory(
{
"trajectory_id": "traj-1",
"task": {"task_id": "task-1", "input": "A", "channel": "local", "created_at": "2026-01-01T00:00:00Z", "user_id": None},
"context_snapshot": {"conversation_summary": "", "environment_summary": "", "recent_failures": []},
"candidate_sets": {"memory": [], "skill": [], "tool": []},
"decisions": [{"step": 1, "decision_type": "direct_answer", "selected_ids": [], "rejected_ids": [], "rationale": "", "estimated_cost": 0}],
"events": [],
"outcome": {"status": "success", "steps": 1, "latency_ms": 10, "user_corrections": 0, "tool_errors": 0, "notes": None},
"reward": {"total": 1.0, "components": {"task_success": 1.0, "retrieval_hit": 0.0, "tool_error": 0.0, "user_correction": 0.0, "latency": 0.0, "context_cost": 0.0, "useful_reuse": 0.0}},
}
)
persistence.save_trajectory(
{
"trajectory_id": "traj-2",
"task": {"task_id": "task-2", "input": "B", "channel": "local", "created_at": "2026-01-01T00:00:00Z", "user_id": None},
"context_snapshot": {"conversation_summary": "", "environment_summary": "", "recent_failures": []},
"candidate_sets": {"memory": [], "skill": [], "tool": []},
"decisions": [{"step": 1, "decision_type": "call_tool", "selected_ids": ["tool-1"], "rejected_ids": [], "rationale": "", "estimated_cost": 0.1}],
"events": [],
"outcome": {"status": "failure", "steps": 1, "latency_ms": 50, "user_corrections": 0, "tool_errors": 1, "notes": None},
"reward": {"total": -0.2, "components": {"task_success": 0.2, "retrieval_hit": 0.0, "tool_error": 0.3, "user_correction": 0.0, "latency": 0.05, "context_cost": 0.0, "useful_reuse": 0.0}},
}
)
replay = TrajectoryReplay()
summary = replay.summarize_persistence_store(persistence)
assert summary.trajectories == 2
assert summary.success_count == 1
assert summary.failure_count == 1
assert summary.tool_action_count == 1

45
tests/test_retrieval.py Normal file
View File

@@ -0,0 +1,45 @@
from memabra.candidate_types import CandidateObject
from memabra.retrieval import CandidateRetriever, InMemoryCandidateProvider
from memabra.router import TaskContext
def test_retriever_ranks_trigger_matches_first():
retriever = CandidateRetriever(
[
InMemoryCandidateProvider(
candidate_type="memory",
candidates=[
CandidateObject(
id="mem-weak",
type="memory",
title="Generic preference",
summary="A weak preference record",
confidence=0.4,
success_rate=0.4,
freshness=0.4,
triggers=["generic"],
),
CandidateObject(
id="mem-strong",
type="memory",
title="Formatting preference",
summary="Telegram prefers plain text",
confidence=0.8,
success_rate=0.9,
freshness=0.9,
triggers=["telegram", "formatting"],
tags=["output"],
),
],
)
]
)
result = retriever.retrieve(
TaskContext(user_input="Use my telegram formatting preference for the output."),
top_k=2,
)
assert [candidate.id for candidate in result.memory] == ["mem-strong", "mem-weak"]
assert result.skill == []
assert result.tool == []
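
The ranking asserted here follows from trigger overlap with the user input, with the quality fields (confidence, success_rate, freshness) as a tie-breaker. An illustrative ranking in that spirit; the weighting is an assumption, not the CandidateRetriever scoring:

# Illustrative only; candidates are any objects with triggers/confidence/success_rate/freshness.
def rank_by_triggers(candidates, user_input, top_k=2):
    words = set(user_input.lower().split())

    def score(candidate):
        trigger_hits = sum(1 for trigger in candidate.triggers if trigger.lower() in words)
        quality = candidate.confidence + candidate.success_rate + candidate.freshness
        return (trigger_hits, quality)

    return sorted(candidates, key=score, reverse=True)[:top_k]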

View File

@@ -0,0 +1,137 @@
from memabra.candidate_types import CandidateObject
from memabra.router import FeatureScoringRouter, TaskContext
def test_feature_scoring_router_computes_score_breakdown_and_selects_best():
router = FeatureScoringRouter()
memory = CandidateObject(
id="mem-1",
type="memory",
title="m1",
summary="s1",
confidence=0.9,
success_rate=0.9,
freshness=0.9,
cost=0.1,
risk=0.1,
)
tool = CandidateObject(
id="tool-1",
type="tool",
title="t1",
summary="s1",
confidence=0.8,
success_rate=0.8,
freshness=0.8,
cost=0.1,
risk=0.1,
)
decision = router.choose(
TaskContext(user_input="do something"),
memory_candidates=[memory],
skill_candidates=[],
tool_candidates=[tool],
)
assert decision.decision_type == "inject_memory"
assert "mem-1" in decision.score_breakdown
assert "tool-1" in decision.score_breakdown
assert decision.score_breakdown["mem-1"] > decision.score_breakdown["tool-1"]
def test_feature_scoring_router_applies_failure_penalty():
router = FeatureScoringRouter()
tool_a = CandidateObject(
id="tool-a",
type="tool",
title="ta",
summary="sa",
confidence=0.9,
success_rate=0.9,
freshness=0.9,
cost=0.0,
risk=0.0,
)
tool_b = CandidateObject(
id="tool-b",
type="tool",
title="tb",
summary="sb",
confidence=0.9,
success_rate=0.9,
freshness=0.9,
cost=0.0,
risk=0.0,
)
context = TaskContext(user_input="run tool", recent_failures=["tool-b"])
decision = router.choose(
context,
memory_candidates=[],
skill_candidates=[],
tool_candidates=[tool_a, tool_b],
)
assert decision.decision_type == "call_tool"
assert decision.selected_ids == ["tool-a"]
assert decision.score_breakdown["tool-b"] < decision.score_breakdown["tool-a"]
def test_feature_scoring_router_emits_composite_action_for_preconditions():
router = FeatureScoringRouter()
memory = CandidateObject(
id="mem-1",
type="memory",
title="m1",
summary="s1",
confidence=0.7,
success_rate=0.5,
freshness=0.3,
cost=0.0,
risk=0.0,
)
tool = CandidateObject(
id="tool-1",
type="tool",
title="t1",
summary="s1",
confidence=0.9,
success_rate=0.9,
freshness=0.9,
cost=0.0,
risk=0.0,
preconditions=["memory"],
)
decision = router.choose(
TaskContext(user_input="run tool"),
memory_candidates=[memory],
skill_candidates=[],
tool_candidates=[tool],
)
assert decision.decision_type == "composite_action"
assert len(decision.composite_steps) == 2
assert decision.composite_steps[0].decision_type == "inject_memory"
assert decision.composite_steps[0].selected_ids == ["mem-1"]
assert decision.composite_steps[1].decision_type == "call_tool"
assert decision.composite_steps[1].selected_ids == ["tool-1"]
def test_feature_scoring_router_fallback_when_precondition_missing():
router = FeatureScoringRouter()
tool = CandidateObject(
id="tool-1",
type="tool",
title="t1",
summary="s1",
confidence=0.9,
success_rate=0.9,
freshness=0.9,
cost=0.0,
risk=0.0,
preconditions=["memory"],
)
decision = router.choose(
TaskContext(user_input="run tool"),
memory_candidates=[],
skill_candidates=[],
tool_candidates=[tool],
)
assert decision.decision_type == "call_tool"
assert decision.selected_ids == ["tool-1"]
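
The feature-scoring tests constrain the score only directionally: the quality fields push a candidate up, cost and risk push it down, and a recent failure applies a penalty large enough to flip the ordering of otherwise identical tools. A sketch of such a score; the coefficients are assumptions:

# Illustrative only; the weights below are not the FeatureScoringRouter's.
def score_candidate(candidate, recent_failures=()):
    score = 0.4 * candidate.confidence + 0.4 * candidate.success_rate + 0.2 * candidate.freshness
    score -= candidate.cost + candidate.risk
    if candidate.id in recent_failures:
        score -= 0.5  # demotes tool-b below tool-a in the failure-penalty test above
    return score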

View File

@@ -0,0 +1,12 @@
from memabra.router import (
FeatureScoringRouter,
RouterProtocol,
RuleBasedRouter,
SimpleLearningRouter,
)
def test_all_router_implementations_conform_to_router_protocol():
assert isinstance(RuleBasedRouter(), RouterProtocol)
assert isinstance(FeatureScoringRouter(), RouterProtocol)
assert isinstance(SimpleLearningRouter(), RouterProtocol)

View File

@@ -0,0 +1,25 @@
from memabra.candidate_types import CandidateObject
from memabra.router import RuleBasedRouter, TaskContext
def test_router_prefers_memory_for_preference_queries():
router = RuleBasedRouter()
decision = router.choose(
TaskContext(user_input="Remember my preferred deployment region"),
memory_candidates=[
CandidateObject(
id="mem-1",
type="memory",
title="Preferred region",
summary="User prefers us-west-2",
confidence=0.9,
freshness=0.8,
success_rate=0.9,
)
],
skill_candidates=[],
tool_candidates=[],
)
assert decision.decision_type == "inject_memory"
assert decision.selected_ids == ["mem-1"]

View File

@@ -0,0 +1,115 @@
import json
from pathlib import Path
from memabra.router import SimpleLearningRouter
from memabra.router_versioning import RouterVersionStore
def test_save_and_load_router_version(tmp_path):
store = RouterVersionStore(base_dir=tmp_path)
router = SimpleLearningRouter()
router._weights = {"call_tool": {"input_length": 0.5, "tool_count": 1.2}}
router._feature_keys = ["input_length", "tool_count"]
store.save(router, version_id="v1", metadata={"avg_reward": 0.75})
loaded = store.load("v1")
assert loaded._weights == router._weights
assert loaded._feature_keys == router._feature_keys
def test_list_versions_returns_metadata(tmp_path):
store = RouterVersionStore(base_dir=tmp_path)
router = SimpleLearningRouter()
router._weights = {"inject_memory": {"memory_count": 0.8}}
router._feature_keys = ["memory_count"]
store.save(router, version_id="v1", metadata={"avg_reward": 0.75})
store.save(router, version_id="v2", metadata={"avg_reward": 0.82})
versions = store.list_versions()
assert len(versions) == 2
assert versions[0]["version_id"] == "v1"
assert versions[0]["metadata"]["avg_reward"] == 0.75
assert versions[1]["version_id"] == "v2"
assert versions[1]["metadata"]["avg_reward"] == 0.82
def test_rollback_changes_current_version(tmp_path):
store = RouterVersionStore(base_dir=tmp_path)
router = SimpleLearningRouter()
router._weights = {"a": {"x": 1.0}}
router._feature_keys = ["x"]
store.save(router, version_id="v1")
store.save(router, version_id="v2")
assert store.get_current()["current_version_id"] == "v2"
store.rollback("v1")
current = store.get_current()
assert current["current_version_id"] == "v1"
assert current.get("rollback_from") == "v2"
assert "rolled_back_at" in current
def test_save_tracks_active_router_metadata(tmp_path):
store = RouterVersionStore(base_dir=tmp_path)
router = SimpleLearningRouter()
router._weights = {"a": {"x": 1.0}}
router._feature_keys = ["x"]
store.save(
router,
version_id="v1",
metadata={"promotion_source": "online_learning", "benchmark_summary": {"reward_delta": 0.1}},
)
current = store.get_current()
assert current["current_version_id"] == "v1"
assert current["promotion_source"] == "online_learning"
assert current["benchmark_summary"]["reward_delta"] == 0.1
assert current.get("prior_version_id") is None
def test_save_records_prior_version_id(tmp_path):
store = RouterVersionStore(base_dir=tmp_path)
router = SimpleLearningRouter()
router._weights = {"a": {"x": 1.0}}
router._feature_keys = ["x"]
store.save(router, version_id="v1")
store.save(router, version_id="v2")
current = store.get_current()
assert current["current_version_id"] == "v2"
assert current["prior_version_id"] == "v1"
def test_load_without_version_uses_current(tmp_path):
store = RouterVersionStore(base_dir=tmp_path)
router = SimpleLearningRouter()
router._weights = {"call_tool": {"input_length": 0.5}}
router._feature_keys = ["input_length"]
store.save(router, version_id="v1")
loaded = store.load()
assert loaded._weights == router._weights
def test_app_save_and_load_learning_router(tmp_path):
from memabra.app import MemabraApp, build_demo_app
app = build_demo_app(base_dir=tmp_path / "artifacts")
router = SimpleLearningRouter()
router._weights = {"clarify": {"input_length": 0.1}}
router._feature_keys = ["input_length"]
app.runner.router = router
version_dir = tmp_path / "router-versions"
app.save_learning_router(version_id="v-test", base_dir=version_dir, metadata={"note": "test"})
loaded_app = build_demo_app(base_dir=tmp_path / "artifacts")
loaded_app.load_learning_router(version_id="v-test", base_dir=version_dir)
assert loaded_app.runner.router._weights == router._weights
assert loaded_app.runner.router._feature_keys == router._feature_keys
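
The version-store tests are satisfied by a plain JSON layout: one file per saved version plus a pointer file that remembers the prior version and is rewritten on rollback. A sketch of the save path under those assumptions; the file names are illustrative, not the RouterVersionStore internals:

import json
from pathlib import Path

def save_version_sketch(base_dir: Path, version_id: str, weights, feature_keys, metadata=None):
    """Write the version payload and update a current-version pointer with the prior id."""
    base_dir.mkdir(parents=True, exist_ok=True)
    (base_dir / f"{version_id}.json").write_text(
        json.dumps({"weights": weights, "feature_keys": feature_keys, "metadata": metadata or {}})
    )
    pointer = base_dir / "current.json"
    prior = json.loads(pointer.read_text())["current_version_id"] if pointer.exists() else None
    pointer.write_text(json.dumps({"current_version_id": version_id, "prior_version_id": prior}))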

96
tests/test_runner.py Normal file
View File

@@ -0,0 +1,96 @@
from memabra.candidate_types import CandidateObject
from memabra.retrieval import CandidateRetriever, InMemoryCandidateProvider
from memabra.router import RuleBasedRouter, TaskContext
from memabra.runner import MemabraRunner
from memabra.schemas import SchemaRegistry
def test_runner_produces_valid_draft_trajectory():
retriever = CandidateRetriever(
[
InMemoryCandidateProvider(
candidate_type="memory",
candidates=[
CandidateObject(
id="mem-1",
type="memory",
title="Output preference",
summary="Prefer plain text on Telegram.",
triggers=["telegram", "preference"],
confidence=0.9,
success_rate=0.8,
freshness=0.9,
tags=["output"],
)
],
)
]
)
runner = MemabraRunner(retriever=retriever, router=RuleBasedRouter())
trajectory = runner.run(
context=TaskContext(
user_input="Use my telegram preference for this answer.",
conversation_summary="User often cares about output formatting.",
),
channel="telegram",
user_id="oza",
)
SchemaRegistry().validate_trajectory(trajectory)
assert trajectory["decisions"][0]["decision_type"] == "inject_memory"
assert trajectory["candidate_sets"]["memory"][0]["id"] == "mem-1"
assert len(trajectory["events"]) == 3
def test_runner_injects_episodic_candidate_when_case_index_matches(tmp_path):
from memabra.case_index import CaseIndex
from memabra.persistence import PersistenceStore
store = PersistenceStore(base_dir=tmp_path / "artifacts")
retriever = CandidateRetriever(
[
InMemoryCandidateProvider(
candidate_type="memory",
candidates=[],
),
InMemoryCandidateProvider(
candidate_type="skill",
candidates=[],
),
InMemoryCandidateProvider(
candidate_type="tool",
candidates=[],
),
]
)
runner = MemabraRunner(retriever=retriever, router=RuleBasedRouter(), persistence_store=store)
# First run creates a trajectory
trajectory1 = runner.run(
context=TaskContext(user_input="Hello world"),
channel="local",
persist=True,
)
# Build case index from the trajectory
case_index = CaseIndex()
case_index.add(trajectory1)
# Second run with case index should inject an episodic candidate
runner_with_case = MemabraRunner(
retriever=retriever,
router=RuleBasedRouter(),
persistence_store=store,
case_index=case_index,
)
trajectory2 = runner_with_case.run(
context=TaskContext(user_input="Hello world"),
channel="local",
persist=True,
)
memory_candidates = trajectory2["candidate_sets"]["memory"]
assert any(c["id"].startswith("episodic-") for c in memory_candidates)
# With a persistence store, the runner should generate a rich episodic summary
assert any("Task:" in c["summary"] for c in memory_candidates)

30
tests/test_schemas.py Normal file
View File

@@ -0,0 +1,30 @@
import json

import pytest
from memabra.schemas import SchemaRegistry, SchemaValidationError
EXAMPLE_TRAJECTORY = "docs/examples/trajectory_success_memory.json"
def test_schema_registry_validates_example_trajectory():
registry = SchemaRegistry()
with open(EXAMPLE_TRAJECTORY, "r", encoding="utf-8") as f:
        example = json.load(f)
registry.validate_trajectory(example)
def test_schema_registry_rejects_missing_required_keys():
registry = SchemaRegistry()
with pytest.raises(SchemaValidationError):
registry.validate_trajectory({"trajectory_id": "oops"})
def test_no_resource_warning_from_schema_validation():
import warnings
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always", ResourceWarning)
test_schema_registry_validates_example_trajectory()
resource_warnings = [x for x in w if issubclass(x.category, ResourceWarning)]
assert len(resource_warnings) == 0

View File

@@ -0,0 +1,107 @@
from pathlib import Path
from memabra.candidate_types import CandidateObject
from memabra.execution import ExecutionEngine, FileSystemSkillBackend, SkillExecutor
from memabra.retrieval import CandidateRetriever, InMemoryCandidateProvider
from memabra.router import RouteDecision, RuleBasedRouter, TaskContext
from memabra.runner import MemabraRunner
def test_filesystem_skill_backend_loads_skill_from_directory(tmp_path: Path):
skill_dir = tmp_path / "category-a" / "skill-demo"
skill_dir.mkdir(parents=True)
skill_file = skill_dir / "SKILL.md"
skill_file.write_text(
"---\n"
"name: skill-demo\n"
"description: A demo skill for testing.\n"
"version: 1.0.0\n"
"---\n\n"
"# Demo Skill\n\n"
"This is the demo skill body.\n"
)
backend = FileSystemSkillBackend(search_paths=[tmp_path])
payload = backend.load_skill("skill-demo")
assert payload["skill_id"] == "skill-demo"
assert payload["name"] == "skill-demo"
assert payload["description"] == "A demo skill for testing."
assert "This is the demo skill body." in payload["content"]
def test_filesystem_skill_backend_returns_error_for_missing_skill(tmp_path: Path):
backend = FileSystemSkillBackend(search_paths=[tmp_path])
payload = backend.load_skill("nonexistent")
assert payload["skill_id"] == "nonexistent"
assert payload["status"] == "error"
assert "not found" in payload["error"].lower()
def test_skill_executor_uses_filesystem_backend_to_load_payload(tmp_path: Path):
skill_dir = tmp_path / "ops" / "skill-deploy"
skill_dir.mkdir(parents=True)
skill_file = skill_dir / "SKILL.md"
skill_file.write_text(
"---\n"
"name: skill-deploy\n"
"description: Deploy workflow skill.\n"
"---\n\n"
"# Deploy Workflow\n\n"
"1. Build\n2. Test\n3. Deploy\n"
)
backend = FileSystemSkillBackend(search_paths=[tmp_path])
executor = SkillExecutor(backend=backend)
decision = RouteDecision(decision_type="load_skill", selected_ids=["skill-deploy"])
result = executor.execute(decision, TaskContext(user_input="deploy"), trajectory_id="traj-1")
assert result.status == "executed"
assert result.details["payloads"][0]["name"] == "skill-deploy"
assert "1. Build" in result.details["payloads"][0]["content"]
assert any(event.event_type == "skill_loaded" for event in result.events)
def test_execution_engine_runs_skill_path_end_to_end(tmp_path: Path):
skill_dir = tmp_path / "ops" / "skill-deploy"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"---\n"
"name: skill-deploy\n"
"description: Deploy workflow skill.\n"
"---\n\n"
"Deploy steps here.\n"
)
retriever = CandidateRetriever(
[
InMemoryCandidateProvider(
candidate_type="skill",
candidates=[
CandidateObject(
id="skill-deploy",
type="skill",
title="deploy workflow",
summary="Reusable deployment procedure.",
triggers=["deploy", "workflow"],
confidence=0.9,
success_rate=0.95,
freshness=0.8,
)
],
)
]
)
runner = MemabraRunner(
retriever=retriever,
router=RuleBasedRouter(),
execution_engine=ExecutionEngine(skill_backend=FileSystemSkillBackend(search_paths=[tmp_path])),
)
trajectory = runner.run(context=TaskContext(user_input="Deploy this service with the usual workflow."))
skill_events = [event for event in trajectory["events"] if event["event_type"] == "skill_loaded"]
assert skill_events
assert skill_events[0]["payload"]["name"] == "skill-deploy"
assert "Deploy steps here." in skill_events[0]["payload"]["content"]

View File

@@ -0,0 +1,66 @@
from memabra.router import TaskContext
def test_local_function_tool_adapter_executes_callable():
from memabra.execution import LocalFunctionToolAdapter
def add(a: int, b: int) -> int:
return a + b
adapter = LocalFunctionToolAdapter(func=add)
result = adapter.run_tool("add", TaskContext(user_input="add 1 and 2"), {"a": 1, "b": 2})
assert result["status"] == "success"
assert result["output"] == 3
assert result["error"] is None
def test_subprocess_tool_adapter_executes_command():
from memabra.execution import SubprocessToolAdapter
adapter = SubprocessToolAdapter(command="echo hello")
result = adapter.run_tool("echo", TaskContext(user_input="say hello"))
assert result["status"] == "success"
assert "hello" in result["output"]
assert result["error"] is None
assert result["latency_ms"] >= 0
def test_tool_registry_resolves_and_runs_tools():
from memabra.execution import LocalFunctionToolAdapter, ToolRegistry
registry = ToolRegistry()
registry.register("double", LocalFunctionToolAdapter(func=lambda x: x * 2))
result = registry.run_tool("double", TaskContext(user_input="double 5"), {"x": 5})
assert result["status"] == "success"
assert result["output"] == 10
def test_tool_registry_returns_error_for_unknown_tool():
from memabra.execution import ToolRegistry
registry = ToolRegistry()
result = registry.run_tool("missing", TaskContext(user_input="missing"))
assert result["status"] == "error"
assert "not found" in result["error"].lower()
def test_tool_executor_uses_registry_and_produces_result_events():
from memabra.execution import ToolExecutor, ToolRegistry, LocalFunctionToolAdapter
from memabra.router import RouteDecision
registry = ToolRegistry()
registry.register("add", LocalFunctionToolAdapter(func=lambda a, b: a + b))
executor = ToolExecutor(backend=registry)
decision = RouteDecision(decision_type="call_tool", selected_ids=["add"], selected_payloads=[{"a": 2, "b": 3}])
result = executor.execute(decision, TaskContext(user_input="add 2 and 3"), trajectory_id="traj-1")
assert result.status == "executed"
assert result.details["results"][0]["output"] == 5
assert any(event.event_type == "tool_called" for event in result.events)
assert any(event.event_type == "tool_result" for event in result.events)

View File

@@ -0,0 +1,74 @@
from __future__ import annotations
from datetime import datetime, timezone
from memabra.evaluator import EvaluationResult
from memabra.promotion import PromotionDecision, PromotionPolicy
from memabra.training_reports import TrainingReportStore, build_report
def test_build_report_includes_all_required_fields():
baseline = EvaluationResult(task_count=2, avg_reward=0.5, error_rate=0.1, avg_latency_ms=50.0)
challenger = EvaluationResult(task_count=2, avg_reward=0.6, error_rate=0.05, avg_latency_ms=45.0)
decision = PromotionDecision(accepted=True, reasons=[], metrics={"reward_delta": 0.1})
report = build_report(
source_trajectory_ids=["t1", "t2"],
baseline=baseline,
challenger=challenger,
decision=decision,
promoted_version_id="v-2026",
)
assert report["source_trajectory_ids"] == ["t1", "t2"]
assert report["sample_count"] == 2
assert "timestamp" in report
assert report["promoted_version_id"] == "v-2026"
assert report["baseline_metrics"]["avg_reward"] == 0.5
assert report["challenger_metrics"]["avg_reward"] == 0.6
assert report["promotion_decision"]["accepted"] is True
def test_training_report_store_save_and_list(tmp_path):
store = TrainingReportStore(base_dir=tmp_path / "reports")
report = build_report(
source_trajectory_ids=["t1"],
baseline=EvaluationResult(task_count=1, avg_reward=0.5, error_rate=0.0, avg_latency_ms=10.0),
challenger=EvaluationResult(task_count=1, avg_reward=0.6, error_rate=0.0, avg_latency_ms=10.0),
decision=PromotionDecision(accepted=False, reasons=["reward too low"], metrics={}),
)
saved = store.save(report)
reports = store.list_reports()
assert len(reports) == 1
assert reports[0]["report_id"] == saved["report_id"]
assert reports[0]["promotion_decision"]["accepted"] is False
def test_training_report_store_get_report_returns_specific_report(tmp_path):
store = TrainingReportStore(base_dir=tmp_path)
report = build_report(
source_trajectory_ids=["t1", "t2"],
baseline=EvaluationResult(task_count=1, trajectories=[], avg_reward=0.5, error_rate=0.0, avg_latency_ms=10.0, decision_distribution={}),
challenger=EvaluationResult(task_count=1, trajectories=[], avg_reward=0.6, error_rate=0.0, avg_latency_ms=10.0, decision_distribution={}),
decision=PromotionDecision(accepted=True, reasons=[], metrics={}),
promoted_version_id="v1",
)
store.save(report)
fetched = store.get_report(report["report_id"])
assert fetched is not None
assert fetched["report_id"] == report["report_id"]
assert fetched["promoted_version_id"] == "v1"
def test_training_report_store_get_report_missing_returns_none(tmp_path):
store = TrainingReportStore(base_dir=tmp_path)
assert store.get_report("nonexistent") is None
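
build_report is only observed here through the keys read back by the store tests. A sketch of a dict carrying those keys; the report-id and timestamp choices are assumptions:

from datetime import datetime, timezone
from uuid import uuid4

def build_report_sketch(source_trajectory_ids, baseline, challenger, decision, promoted_version_id=None):
    return {
        "report_id": f"report-{uuid4().hex[:8]}",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source_trajectory_ids": list(source_trajectory_ids),
        "sample_count": len(source_trajectory_ids),
        "baseline_metrics": {"avg_reward": baseline.avg_reward, "error_rate": baseline.error_rate,
                             "avg_latency_ms": baseline.avg_latency_ms},
        "challenger_metrics": {"avg_reward": challenger.avg_reward, "error_rate": challenger.error_rate,
                               "avg_latency_ms": challenger.avg_latency_ms},
        "promotion_decision": {"accepted": decision.accepted, "reasons": decision.reasons,
                               "metrics": decision.metrics},
        "promoted_version_id": promoted_version_id,
    }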

View File

@@ -0,0 +1,58 @@
from memabra.trajectory_summary import TrajectorySummarizer
def test_summarize_direct_answer_success():
summarizer = TrajectorySummarizer()
trajectory = {
"task": {"input": "What is 2+2?"},
"decisions": [{"decision_type": "direct_answer"}],
"outcome": {"status": "success", "steps": 1, "tool_errors": 0, "user_corrections": 0},
"reward": {"total": 1.0},
}
summary = summarizer.summarize(trajectory)
assert "Task: 'What is 2+2?'" in summary
assert "Actions: direct_answer" in summary
assert "Outcome: success (reward=1.0, steps=1)" in summary
def test_summarize_multi_step_with_tool_errors():
summarizer = TrajectorySummarizer()
trajectory = {
"task": {"input": "Run analysis"},
"decisions": [
{"decision_type": "clarify"},
{"decision_type": "call_tool"},
{"decision_type": "direct_answer"},
],
"outcome": {"status": "partial_success", "steps": 3, "tool_errors": 1, "user_corrections": 1},
"reward": {"total": 0.5},
}
summary = summarizer.summarize(trajectory)
assert "Actions: clarify -> call_tool -> direct_answer" in summary
assert "Outcome: partial_success (reward=0.5, steps=3)" in summary
assert "Tool errors: 1" in summary
assert "User corrections: 1" in summary
def test_summarize_truncates_long_input():
summarizer = TrajectorySummarizer()
long_input = "a" * 100
trajectory = {
"task": {"input": long_input},
"decisions": [{"decision_type": "direct_answer"}],
"outcome": {"status": "success", "steps": 1, "tool_errors": 0, "user_corrections": 0},
"reward": {"total": 0.9},
}
summary = summarizer.summarize(trajectory)
assert "Task: '" in summary
assert "..." in summary
assert len(summary) < 300
def test_summarize_handles_missing_fields_gracefully():
summarizer = TrajectorySummarizer()
trajectory = {}
summary = summarizer.summarize(trajectory)
assert "Task: ''" in summary
assert "Actions: none" in summary
assert "Outcome: unknown (reward=0.0, steps=0)" in summary