"""Tests for acp_adapter.session — SessionManager and SessionState."""

import contextlib
import io
import json
import time
from types import SimpleNamespace
import pytest
from unittest.mock import MagicMock, patch

from acp_adapter import session as acp_session
from acp_adapter.session import SessionManager, SessionState
from hermes_state import SessionDB


def _mock_agent():
    return MagicMock(name="MockAIAgent")


@pytest.fixture()
def manager():
    """SessionManager with a mock agent factory (avoids needing API keys)."""
    return SessionManager(agent_factory=_mock_agent)


# ---------------------------------------------------------------------------
# create / get
# ---------------------------------------------------------------------------


class TestCreateSession:
    def test_create_session_returns_state(self, manager):
        state = manager.create_session(cwd="/tmp/work")
        assert isinstance(state, SessionState)
        assert state.cwd == "/tmp/work"
        assert state.session_id
        assert state.history == []
        assert state.agent is not None

    def test_create_session_registers_task_cwd(self, manager, monkeypatch):
        calls = []
        monkeypatch.setattr("acp_adapter.session._register_task_cwd", lambda task_id, cwd: calls.append((task_id, cwd)))
        state = manager.create_session(cwd="/tmp/work")
        assert calls == [(state.session_id, "/tmp/work")]


    def test_register_task_cwd_translates_windows_drive_for_wsl_tools(self, monkeypatch):
        captured = {}

        def fake_register_task_env_overrides(task_id, overrides):
            captured["task_id"] = task_id
            captured["overrides"] = overrides

        monkeypatch.setattr("hermes_constants._wsl_detected", True)
        monkeypatch.setattr(
            "tools.terminal_tool.register_task_env_overrides",
            fake_register_task_env_overrides,
        )

        acp_session._register_task_cwd("session-1", r"E:\Projects\AI\paperclip")

        assert captured == {
            "task_id": "session-1",
            "overrides": {"cwd": "/mnt/e/Projects/AI/paperclip"},
        }

    def test_session_ids_are_unique(self, manager):
        s1 = manager.create_session()
        s2 = manager.create_session()
        assert s1.session_id != s2.session_id

    def test_get_session(self, manager):
        state = manager.create_session()
        fetched = manager.get_session(state.session_id)
        assert fetched is state

    def test_get_nonexistent_session_returns_none(self, manager):
        assert manager.get_session("does-not-exist") is None




# ---------------------------------------------------------------------------
# WSL cwd translation
# ---------------------------------------------------------------------------


class TestWslCwdTranslation:
    def test_translate_acp_cwd_converts_windows_drive_path_when_wsl(self, monkeypatch):
        monkeypatch.setattr("hermes_constants._wsl_detected", True)

        assert acp_session._translate_acp_cwd(r"E:\Projects\AI\paperclip") == "/mnt/e/Projects/AI/paperclip"

    def test_translate_acp_cwd_handles_forward_slashes_when_wsl(self, monkeypatch):
        monkeypatch.setattr("hermes_constants._wsl_detected", True)

        assert acp_session._translate_acp_cwd("D:/work/project") == "/mnt/d/work/project"

    def test_translate_acp_cwd_leaves_windows_drive_path_unchanged_off_wsl(self, monkeypatch):
        monkeypatch.setattr("hermes_constants._wsl_detected", False)

        assert acp_session._translate_acp_cwd(r"E:\Projects\AI\paperclip") == r"E:\Projects\AI\paperclip"

    def test_translate_acp_cwd_leaves_posix_path_unchanged_on_wsl(self, monkeypatch):
        monkeypatch.setattr("hermes_constants._wsl_detected", True)

        assert acp_session._translate_acp_cwd("/mnt/e/Projects/AI/paperclip") == "/mnt/e/Projects/AI/paperclip"

    def test_create_session_stores_translated_cwd_on_wsl(self, manager, monkeypatch):
        monkeypatch.setattr("hermes_constants._wsl_detected", True)

        state = manager.create_session(cwd=r"E:\Projects\AI\paperclip")

        assert state.cwd == "/mnt/e/Projects/AI/paperclip"

    def test_fork_session_stores_translated_cwd_on_wsl(self, manager, monkeypatch):
        monkeypatch.setattr("hermes_constants._wsl_detected", True)
        original = manager.create_session(cwd="/tmp/base")

        forked = manager.fork_session(original.session_id, cwd=r"D:\work\project")

        assert forked is not None
        assert forked.cwd == "/mnt/d/work/project"

    def test_update_cwd_stores_translated_cwd_on_wsl(self, manager, monkeypatch):
        monkeypatch.setattr("hermes_constants._wsl_detected", True)
        state = manager.create_session(cwd="/tmp/old")

        updated = manager.update_cwd(state.session_id, cwd=r"C:\Users\foo\project")

        assert updated is not None
        assert updated.cwd == "/mnt/c/Users/foo/project"

# ---------------------------------------------------------------------------
# fork
# ---------------------------------------------------------------------------


class TestForkSession:
    def test_fork_session_deep_copies_history(self, manager):
        original = manager.create_session()
        original.history.append({"role": "user", "content": "hello"})
        original.history.append({"role": "assistant", "content": "hi"})

        forked = manager.fork_session(original.session_id, cwd="/new")
        assert forked is not None

        # History should be equal in content
        assert len(forked.history) == 2
        assert forked.history[0]["content"] == "hello"

        # But a deep copy — mutating one doesn't affect the other
        forked.history.append({"role": "user", "content": "extra"})
        assert len(original.history) == 2
        assert len(forked.history) == 3

    def test_fork_session_has_new_id(self, manager):
        original = manager.create_session()
        forked = manager.fork_session(original.session_id)
        assert forked is not None
        assert forked.session_id != original.session_id

    def test_fork_nonexistent_returns_none(self, manager):
        assert manager.fork_session("bogus-id") is None


# ---------------------------------------------------------------------------
# list / cleanup / remove
# ---------------------------------------------------------------------------


class TestListAndCleanup:
    def test_list_sessions_empty(self, manager):
        assert manager.list_sessions() == []

    def test_list_sessions_returns_created(self, manager):
        s1 = manager.create_session(cwd="/a")
        s2 = manager.create_session(cwd="/b")
        s1.history.append({"role": "user", "content": "hello from a"})
        s2.history.append({"role": "user", "content": "hello from b"})
        listing = manager.list_sessions()
        ids = {s["session_id"] for s in listing}
        assert s1.session_id in ids
        assert s2.session_id in ids
        assert len(listing) == 2

    def test_list_sessions_hides_empty_threads(self, manager):
        manager.create_session(cwd="/empty")
        assert manager.list_sessions() == []

    def test_cleanup_clears_all(self, manager):
        s1 = manager.create_session()
        s2 = manager.create_session()
        s1.history.append({"role": "user", "content": "one"})
        s2.history.append({"role": "user", "content": "two"})
        assert len(manager.list_sessions()) == 2
        manager.cleanup()
        assert manager.list_sessions() == []

    def test_remove_session(self, manager):
        state = manager.create_session()
        assert manager.remove_session(state.session_id) is True
        assert manager.get_session(state.session_id) is None
        # Removing again returns False
        assert manager.remove_session(state.session_id) is False


# ---------------------------------------------------------------------------
# persistence — sessions survive process restarts (via SessionDB)
# ---------------------------------------------------------------------------


class TestPersistence:
    """Verify that sessions are persisted to SessionDB and can be restored."""

    def test_create_session_includes_registered_mcp_toolsets(self, tmp_path, monkeypatch):
        captured = {}

        def fake_resolve_runtime_provider(requested=None, **kwargs):
            return {
                "provider": "openrouter",
                "api_mode": "chat_completions",
                "base_url": "https://openrouter.example/v1",
                "api_key": "***",
                "command": None,
                "args": [],
            }

        def fake_agent(**kwargs):
            captured.update(kwargs)
            return SimpleNamespace(model=kwargs.get("model"), enabled_toolsets=kwargs.get("enabled_toolsets"))

        monkeypatch.setattr("hermes_cli.config.load_config", lambda: {
            "model": {"provider": "openrouter", "default": "test-model"},
            "mcp_servers": {
                "olympus": {"command": "python", "enabled": True},
                "exa": {"url": "https://exa.ai/mcp"},
                "disabled": {"command": "python", "enabled": False},
            },
        })
        monkeypatch.setattr(
            "hermes_cli.runtime_provider.resolve_runtime_provider",
            fake_resolve_runtime_provider,
        )
        db = SessionDB(tmp_path / "state.db")

        with patch("run_agent.AIAgent", side_effect=fake_agent):
            manager = SessionManager(db=db)
            manager.create_session(cwd="/work")

        assert captured["enabled_toolsets"] == ["hermes-acp", "mcp-olympus", "mcp-exa"]

    def test_create_session_writes_to_db(self, manager):
        state = manager.create_session(cwd="/project")
        db = manager._get_db()
        assert db is not None
        row = db.get_session(state.session_id)
        assert row is not None
        assert row["source"] == "acp"
        # cwd stored in model_config JSON
        mc = json.loads(row["model_config"])
        assert mc["cwd"] == "/project"

    def test_get_session_restores_from_db(self, manager):
        """Simulate process restart: create session, drop from memory, get again."""
        state = manager.create_session(cwd="/work")
        state.history.append({"role": "user", "content": "hello"})
        state.history.append({"role": "assistant", "content": "hi there"})
        manager.save_session(state.session_id)

        sid = state.session_id

        # Drop from in-memory store (simulates process restart).
        with manager._lock:
            del manager._sessions[sid]

        # get_session should transparently restore from DB.
        restored = manager.get_session(sid)
        assert restored is not None
        assert restored.session_id == sid
        assert restored.cwd == "/work"
        assert len(restored.history) == 2
        assert restored.history[0]["content"] == "hello"
        assert restored.history[1]["content"] == "hi there"
        # Agent should have been recreated.
        assert restored.agent is not None

    def test_save_session_updates_db(self, manager):
        state = manager.create_session()
        state.history.append({"role": "user", "content": "test"})
        manager.save_session(state.session_id)

        db = manager._get_db()
        messages = db.get_messages_as_conversation(state.session_id)
        assert len(messages) == 1
        assert messages[0]["content"] == "test"

    def test_remove_session_deletes_from_db(self, manager):
        state = manager.create_session()
        db = manager._get_db()
        assert db.get_session(state.session_id) is not None
        manager.remove_session(state.session_id)
        assert db.get_session(state.session_id) is None

    def test_cleanup_removes_all_from_db(self, manager):
        s1 = manager.create_session()
        s2 = manager.create_session()
        db = manager._get_db()
        assert db.get_session(s1.session_id) is not None
        assert db.get_session(s2.session_id) is not None
        manager.cleanup()
        assert db.get_session(s1.session_id) is None
        assert db.get_session(s2.session_id) is None

    def test_list_sessions_includes_db_only(self, manager):
        """Sessions only in DB (not in memory) appear in list_sessions."""
        state = manager.create_session(cwd="/db-only")
        state.history.append({"role": "user", "content": "database only thread"})
        manager.save_session(state.session_id)
        sid = state.session_id

        # Drop from memory.
        with manager._lock:
            del manager._sessions[sid]

        listing = manager.list_sessions()
        ids = {s["session_id"] for s in listing}
        assert sid in ids

    def test_list_sessions_filters_by_cwd(self, manager):
        keep = manager.create_session(cwd="/keep")
        drop = manager.create_session(cwd="/drop")
        keep.history.append({"role": "user", "content": "keep me"})
        drop.history.append({"role": "user", "content": "drop me"})

        listing = manager.list_sessions(cwd="/keep")
        ids = {s["session_id"] for s in listing}
        assert keep.session_id in ids
        assert drop.session_id not in ids

    def test_list_sessions_matches_windows_and_wsl_paths(self, manager):
        state = manager.create_session(cwd="/mnt/e/Projects/AI/browser-link-3")
        state.history.append({"role": "user", "content": "same project from WSL"})

        listing = manager.list_sessions(cwd=r"E:\Projects\AI\browser-link-3")
        ids = {s["session_id"] for s in listing}
        assert state.session_id in ids

    def test_list_sessions_prefers_title_then_preview(self, manager):
        state = manager.create_session(cwd="/named")
        state.history.append({"role": "user", "content": "Investigate broken ACP history in Zed"})
        manager.save_session(state.session_id)
        db = manager._get_db()
        db.set_session_title(state.session_id, "Fix Zed ACP history")

        listing = manager.list_sessions(cwd="/named")
        assert listing[0]["title"] == "Fix Zed ACP history"

        db.set_session_title(state.session_id, "")
        listing = manager.list_sessions(cwd="/named")
        assert listing[0]["title"].startswith("Investigate broken ACP history")

    def test_list_sessions_sorted_by_most_recent_activity(self, manager):
        older = manager.create_session(cwd="/ordered")
        older.history.append({"role": "user", "content": "older"})
        manager.save_session(older.session_id)
        time.sleep(0.02)
        newer = manager.create_session(cwd="/ordered")
        newer.history.append({"role": "user", "content": "newer"})
        manager.save_session(newer.session_id)

        listing = manager.list_sessions(cwd="/ordered")
        assert [item["session_id"] for item in listing[:2]] == [newer.session_id, older.session_id]
        assert listing[0]["updated_at"]
        assert listing[1]["updated_at"]

    def test_fork_restores_source_from_db(self, manager):
        """Forking a session that is only in DB should work."""
        original = manager.create_session()
        original.history.append({"role": "user", "content": "context"})
        manager.save_session(original.session_id)

        # Drop original from memory.
        with manager._lock:
            del manager._sessions[original.session_id]

        forked = manager.fork_session(original.session_id, cwd="/fork")
        assert forked is not None
        assert len(forked.history) == 1
        assert forked.history[0]["content"] == "context"
        assert forked.session_id != original.session_id

    def test_update_cwd_restores_from_db(self, manager):
        state = manager.create_session(cwd="/old")
        sid = state.session_id

        with manager._lock:
            del manager._sessions[sid]

        updated = manager.update_cwd(sid, "/new")
        assert updated is not None
        assert updated.cwd == "/new"

        # Should also be persisted in DB.
        db = manager._get_db()
        row = db.get_session(sid)
        mc = json.loads(row["model_config"])
        assert mc["cwd"] == "/new"

    def test_only_restores_acp_sessions(self, manager):
        """get_session should not restore non-ACP sessions from DB."""
        db = manager._get_db()
        # Manually create a CLI session in the DB.
        db.create_session(session_id="cli-session-123", source="cli", model="test")
        # Should not be found via ACP SessionManager.
        assert manager.get_session("cli-session-123") is None

    def test_sessions_searchable_via_fts(self, manager):
        """ACP sessions stored in SessionDB are searchable via FTS5."""
        state = manager.create_session()
        state.history.append({"role": "user", "content": "how do I configure nginx"})
        state.history.append({"role": "assistant", "content": "Here is the nginx config..."})
        manager.save_session(state.session_id)

        db = manager._get_db()
        results = db.search_messages("nginx")
        assert len(results) > 0
        session_ids = {r["session_id"] for r in results}
        assert state.session_id in session_ids

    def test_tool_calls_persisted(self, manager):
        """Messages with tool_calls should round-trip through the DB."""
        state = manager.create_session()
        state.history.append({
            "role": "assistant",
            "content": None,
            "tool_calls": [{"id": "tc_1", "type": "function",
                            "function": {"name": "terminal", "arguments": "{}"}}],
        })
        state.history.append({
            "role": "tool",
            "content": "output here",
            "tool_call_id": "tc_1",
            "name": "terminal",
        })
        manager.save_session(state.session_id)

        # Drop from memory, restore from DB.
        with manager._lock:
            del manager._sessions[state.session_id]

        restored = manager.get_session(state.session_id)
        assert restored is not None
        assert len(restored.history) == 2
        assert restored.history[0].get("tool_calls") is not None
        assert restored.history[1].get("tool_call_id") == "tc_1"

    def test_restore_preserves_persisted_provider_snapshot(self, tmp_path, monkeypatch):
        """Restored ACP sessions should keep their original runtime provider."""
        runtime_choice = {"provider": "anthropic"}

        def fake_resolve_runtime_provider(requested=None, **kwargs):
            provider = requested or runtime_choice["provider"]
            return {
                "provider": provider,
                "api_mode": "anthropic_messages" if provider == "anthropic" else "chat_completions",
                "base_url": f"https://{provider}.example/v1",
                "api_key": f"{provider}-key",
                "command": None,
                "args": [],
            }

        def fake_agent(**kwargs):
            return SimpleNamespace(
                model=kwargs.get("model"),
                provider=kwargs.get("provider"),
                base_url=kwargs.get("base_url"),
                api_mode=kwargs.get("api_mode"),
            )

        monkeypatch.setattr("hermes_cli.config.load_config", lambda: {
            "model": {"provider": runtime_choice["provider"], "default": "test-model"}
        })
        monkeypatch.setattr(
            "hermes_cli.runtime_provider.resolve_runtime_provider",
            fake_resolve_runtime_provider,
        )
        db = SessionDB(tmp_path / "state.db")

        with patch("run_agent.AIAgent", side_effect=fake_agent):
            manager = SessionManager(db=db)
            state = manager.create_session(cwd="/work")
            manager.save_session(state.session_id)

            with manager._lock:
                del manager._sessions[state.session_id]

            runtime_choice["provider"] = "openrouter"
            restored = manager.get_session(state.session_id)

        assert restored is not None
        assert restored.agent.provider == "anthropic"
        assert restored.agent.base_url == "https://anthropic.example/v1"

    def test_acp_agents_route_human_output_to_stderr(self, tmp_path, monkeypatch):
        """ACP agents must keep stdout clean for JSON-RPC stdio transport."""

        def fake_resolve_runtime_provider(requested=None, **kwargs):
            return {
                "provider": "openrouter",
                "api_mode": "chat_completions",
                "base_url": "https://openrouter.example/v1",
                "api_key": "test-key",
                "command": None,
                "args": [],
            }

        def fake_agent(**kwargs):
            return SimpleNamespace(model=kwargs.get("model"), _print_fn=None)

        monkeypatch.setattr("hermes_cli.config.load_config", lambda: {
            "model": {"provider": "openrouter", "default": "test-model"}
        })
        monkeypatch.setattr(
            "hermes_cli.runtime_provider.resolve_runtime_provider",
            fake_resolve_runtime_provider,
        )
        db = SessionDB(tmp_path / "state.db")

        with patch("run_agent.AIAgent", side_effect=fake_agent):
            manager = SessionManager(db=db)
            state = manager.create_session(cwd="/work")

        stdout_buf = io.StringIO()
        stderr_buf = io.StringIO()
        with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
            state.agent._print_fn("ACP noise")

        assert stdout_buf.getvalue() == ""
        assert stderr_buf.getvalue() == "ACP noise\n"
