"""ACP tool-call helpers for mapping hermes tools to ACP ToolKind and building content."""

from __future__ import annotations

import json
import uuid
from typing import Any, Dict, List, Optional

import acp
from acp.schema import (
    ToolCallLocation,
    ToolCallStart,
    ToolCallProgress,
    ToolKind,
)

# ---------------------------------------------------------------------------
# Map hermes tool names -> ACP ToolKind
# ---------------------------------------------------------------------------

# Known hermes tool names and the ACP ToolKind each one reports as.
# Tools absent from this map are treated as "other" (see get_tool_kind).
TOOL_KIND_MAP: Dict[str, ToolKind] = {
    # File operations
    "read_file": "read",
    "write_file": "edit",
    "patch": "edit",
    "search_files": "search",
    # Terminal / execution
    "terminal": "execute",
    "process": "execute",
    "execute_code": "execute",
    # Session/meta tools
    "todo": "other",
    "skill_view": "read",
    "skills_list": "read",
    "skill_manage": "edit",
    # Web / fetch
    "web_search": "fetch",
    "web_extract": "fetch",
    # Browser
    "browser_navigate": "fetch",
    "browser_click": "execute",
    "browser_type": "execute",
    "browser_snapshot": "read",
    "browser_vision": "read",
    "browser_scroll": "execute",
    "browser_press": "execute",
    "browser_back": "execute",
    "browser_get_images": "read",
    # Agent internals
    "delegate_task": "execute",
    "vision_analyze": "read",
    "image_generate": "execute",
    "text_to_speech": "execute",
    # Thinking / meta
    "_thinking": "think",
}


# Tools whose JSON results may fall back to the generic structured formatter
# in _build_polished_completion_content when no dedicated formatter exists.
_POLISHED_TOOLS = {
    # Core operator loop
    "todo", "memory", "session_search", "delegate_task",
    # Files / execution
    "read_file", "write_file", "patch", "search_files", "terminal", "process", "execute_code",
    # Skills / web / browser / media
    "skill_view", "skills_list", "skill_manage", "web_search", "web_extract",
    "browser_navigate", "browser_click", "browser_type", "browser_press", "browser_scroll",
    "browser_back", "browser_snapshot", "browser_console", "browser_get_images", "browser_vision",
    "vision_analyze", "image_generate", "text_to_speech",
    # Schedulers / platform integrations
    "cronjob", "send_message", "clarify", "discord", "discord_admin",
    "ha_list_entities", "ha_get_state", "ha_list_services", "ha_call_service",
    "feishu_doc_read", "feishu_drive_list_comments", "feishu_drive_list_comment_replies",
    "feishu_drive_reply_comment", "feishu_drive_add_comment",
    "kanban_create", "kanban_show", "kanban_comment", "kanban_complete",
    "kanban_block", "kanban_link", "kanban_heartbeat",
    "yb_query_group_info", "yb_query_group_members", "yb_search_sticker",
    "yb_send_dm", "yb_send_sticker", "mixture_of_agents",
}


def get_tool_kind(tool_name: str) -> ToolKind:
    """Look up the ACP ToolKind for *tool_name*; unmapped tools are 'other'."""
    try:
        return TOOL_KIND_MAP[tool_name]
    except KeyError:
        return "other"


def make_tool_call_id() -> str:
    """Return a fresh 'tc-' prefixed identifier (12 random hex chars)."""
    suffix = uuid.uuid4().hex[:12]
    return "tc-" + suffix


def _ellipsize(text: str, limit: int) -> str:
    """Truncate *text* to at most *limit* characters, replacing the tail with '...'."""
    if len(text) <= limit:
        return text
    return text[: limit - 3] + "..."


def build_tool_title(tool_name: str, args: Dict[str, Any]) -> str:
    """Build a human-readable, one-line title for a tool call.

    Derives a short summary from the tool's arguments; unknown tools fall back
    to the bare tool name. Long free-form fields (commands, goals, code, skill
    targets) are shortened via _ellipsize so titles stay compact.
    """
    if tool_name == "terminal":
        return f"terminal: {_ellipsize(args.get('command', ''), 80)}"
    if tool_name == "read_file":
        return f"read: {args.get('path', '?')}"
    if tool_name == "write_file":
        return f"write: {args.get('path', '?')}"
    if tool_name == "patch":
        mode = args.get("mode", "replace")
        path = args.get("path", "?")
        return f"patch ({mode}): {path}"
    if tool_name == "search_files":
        return f"search: {args.get('pattern', '?')}"
    if tool_name == "web_search":
        return f"web search: {args.get('query', '?')}"
    if tool_name == "web_extract":
        urls = args.get("urls", [])
        if urls:
            # Show the first URL plus a "+N" marker for any extras.
            return f"extract: {urls[0]}" + (f" (+{len(urls)-1})" if len(urls) > 1 else "")
        return "web extract"
    if tool_name == "process":
        action = str(args.get("action") or "").strip() or "manage"
        sid = str(args.get("session_id") or "").strip()
        return f"process {action}: {sid}" if sid else f"process {action}"
    if tool_name == "delegate_task":
        tasks = args.get("tasks")
        if isinstance(tasks, list) and tasks:
            return f"delegate batch ({len(tasks)} tasks)"
        goal = args.get("goal", "")
        if goal:
            goal = _ellipsize(goal, 60)
        return f"delegate: {goal}" if goal else "delegate task"
    if tool_name == "session_search":
        query = str(args.get("query") or "").strip()
        return f"session search: {query}" if query else "recent sessions"
    if tool_name == "memory":
        action = str(args.get("action") or "manage").strip() or "manage"
        target = str(args.get("target") or "memory").strip() or "memory"
        return f"memory {action}: {target}"
    if tool_name == "execute_code":
        code = str(args.get("code") or "").strip()
        # Preview the first non-blank line of the snippet.
        first_line = next((line.strip() for line in code.splitlines() if line.strip()), "")
        if first_line:
            return f"python: {_ellipsize(first_line, 70)}"
        return "python code"
    if tool_name == "todo":
        items = args.get("todos")
        if isinstance(items, list):
            return f"todo ({len(items)} item{'s' if len(items) != 1 else ''})"
        return "todo"
    if tool_name == "skill_view":
        name = str(args.get("name") or "?").strip() or "?"
        file_path = str(args.get("file_path") or "").strip()
        suffix = f"/{file_path}" if file_path else ""
        return f"skill view ({name}{suffix})"
    if tool_name == "skills_list":
        category = str(args.get("category") or "").strip()
        return f"skills list ({category})" if category else "skills list"
    if tool_name == "skill_manage":
        action = str(args.get("action") or "manage").strip() or "manage"
        name = str(args.get("name") or "?").strip() or "?"
        file_path = str(args.get("file_path") or "").strip()
        target = f"{name}/{file_path}" if file_path else name
        return f"skill {action}: {_ellipsize(target, 64)}"
    if tool_name == "browser_navigate":
        return f"navigate: {args.get('url', '?')}"
    if tool_name == "browser_snapshot":
        return "browser snapshot"
    if tool_name == "browser_vision":
        return f"browser vision: {str(args.get('question', '?'))[:50]}"
    if tool_name == "browser_get_images":
        return "browser images"
    if tool_name == "vision_analyze":
        return f"analyze image: {str(args.get('question', '?'))[:50]}"
    if tool_name == "image_generate":
        prompt = str(args.get("prompt") or args.get("description") or "").strip()
        return f"generate image: {prompt[:50]}" if prompt else "generate image"
    if tool_name == "cronjob":
        action = str(args.get("action") or "manage").strip() or "manage"
        job_id = str(args.get("job_id") or args.get("id") or "").strip()
        return f"cron {action}: {job_id}" if job_id else f"cron {action}"
    return tool_name


def _text(content: str) -> Any:
    """Wrap plain text in an ACP tool-content block."""
    block = acp.text_block(content)
    return acp.tool_content(block)


def _json_loads_maybe(value: Optional[str]) -> Any:
    if not isinstance(value, str):
        return value
    try:
        return json.loads(value)
    except Exception:
        pass

    # Some Hermes tools append a human hint after a JSON payload, e.g.
    # ``{...}\n\n[Hint: Results truncated...]``. Keep the structured rendering path
    # by decoding the first JSON value instead of falling back to raw text.
    try:
        decoded, _ = json.JSONDecoder().raw_decode(value.lstrip())
        return decoded
    except Exception:
        return None


def _truncate_text(text: str, limit: int = 5000) -> str:
    if len(text) <= limit:
        return text
    return text[: max(0, limit - 100)] + f"\n... ({len(text)} chars total, truncated)"


def _fenced_text(text: str, language: str = "") -> str:
    """Return a Markdown fence that cannot be broken by backticks in text."""
    longest = max((len(run) for run in text.split("`")[1::2]), default=0)
    fence = "`" * max(3, longest + 1)
    return f"{fence}{language}\n{text}\n{fence}"


def _format_todo_result(result: Optional[str]) -> Optional[str]:
    """Render a todo-tool JSON payload as a Markdown checklist, or None."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict) or not isinstance(payload.get("todos"), list):
        return None
    summary_raw = payload.get("summary")
    summary = summary_raw if isinstance(summary_raw, dict) else {}
    status_icons = {
        "completed": "✅",
        "in_progress": "🔄",
        "pending": "⏳",
        "cancelled": "✗",
    }
    out = ["**Todo list**", ""]
    for entry in payload["todos"]:
        if not isinstance(entry, dict):
            continue
        state = str(entry.get("status") or "pending")
        label = str(entry.get("content") or entry.get("id") or "").strip()
        if label:
            out.append(f"- {status_icons.get(state, '•')} {label}")
    if summary:
        cancelled = summary.get("cancelled", 0)
        progress = (
            "**Progress:** "
            f"{summary.get('completed', 0)} completed, "
            f"{summary.get('in_progress', 0)} in progress, "
            f"{summary.get('pending', 0)} pending"
        )
        if cancelled:
            progress += f", {cancelled} cancelled"
        out.extend(["", progress])
    return "\n".join(out)


def _format_read_file_result(result: Optional[str], args: Optional[Dict[str, Any]]) -> Optional[str]:
    """Summarize read_file output with a fenced, literal file payload."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict):
        return None
    if payload.get("error") and not payload.get("content"):
        return f"Read failed: {payload.get('error')}"
    body = payload.get("content")
    if not isinstance(body, str):
        return None
    call_args = args or {}
    path = str(call_args.get("path") or payload.get("path") or "file").strip()
    range_bits = []
    if call_args.get("offset"):
        range_bits.append(f"from line {call_args.get('offset')}")
    if call_args.get("limit"):
        range_bits.append(f"limit {call_args.get('limit')}")
    header = f"Read {path}"
    if range_bits:
        header += f" ({', '.join(range_bits)})"
    if payload.get("total_lines") is not None:
        header += f" — {payload.get('total_lines')} total lines"
    # Hermes read_file output is line-numbered with `|`. If we send it as raw
    # Markdown, Zed can interpret pipes as tables and collapse the layout.
    # Fence the payload so file lines stay readable and literal.
    return _truncate_text(f"{header}\n\n{_fenced_text(body)}")


def _format_search_files_result(result: Optional[str]) -> Optional[str]:
    """Render search_files matches as a compact Markdown list, or None."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict):
        return None
    matches = payload.get("matches")
    if not isinstance(matches, list):
        return None

    total = payload.get("total_count", len(matches))
    shown = min(len(matches), 12)
    was_truncated = bool(payload.get("truncated")) or len(matches) > shown
    plural = "es" if total != 1 else ""
    out = ["Search results", f"Found {total} match{plural}; showing {shown}.", ""]

    for entry in matches[:shown]:
        if not isinstance(entry, dict):
            out.append(f"- {entry}")
            continue
        path = str(entry.get("path") or entry.get("file") or entry.get("filename") or "?")
        line_no = entry.get("line") or entry.get("line_number")
        body = str(entry.get("content") or entry.get("text") or "").strip()
        out.append(f"- {path}:{line_no}" if line_no else f"- {path}")
        if body:
            snippet = _truncate_text(" ".join(body.split()), 300)
            out.append(f"  {snippet}")

    if was_truncated:
        out.append("")
        out.append("Results truncated. Narrow the search, add file_glob, or use offset to page.")
    return _truncate_text("\n".join(out), limit=7000)


def _format_execute_code_result(result: Optional[str]) -> Optional[str]:
    """Summarize execute_code output and error streams plus exit code."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict):
        if isinstance(result, str) and result.strip():
            return result
        return None
    exit_code = payload.get("exit_code")
    header = "Execution complete" if exit_code is None else f"Exit code: {exit_code}"
    sections = [header]
    stdout = str(payload.get("output") or "")
    stderr = str(payload.get("error") or "")
    if stdout:
        sections += ["", "Output:", stdout]
    if stderr:
        sections += ["", "Error:", stderr]
    return _truncate_text("\n".join(sections))


def _extract_markdown_headings(content: str, limit: int = 8) -> list[str]:
    headings: list[str] = []
    for line in content.splitlines():
        stripped = line.strip()
        if stripped.startswith("#"):
            heading = stripped.lstrip("#").strip()
            if heading:
                headings.append(heading)
        if len(headings) >= limit:
            break
    return headings


def _format_skill_view_result(result: Optional[str]) -> Optional[str]:
    """Summarize a loaded skill without dumping its full content into ACP."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict):
        return None
    if payload.get("success") is False:
        return f"Skill view failed: {payload.get('error', 'unknown error')}"
    name = str(payload.get("name") or "skill")
    file_path = str(payload.get("file") or payload.get("path") or "SKILL.md")
    description = str(payload.get("description") or "").strip()
    content = str(payload.get("content") or "")
    linked_raw = payload.get("linked_files")
    linked = linked_raw if isinstance(linked_raw, dict) else None

    out = ["**Skill loaded**", "", f"- **Name:** `{name}`", f"- **File:** `{file_path}`"]
    if description:
        out.append(f"- **Description:** {description}")
    if content:
        out.append(f"- **Content:** {len(content):,} chars loaded into agent context")
    if linked:
        linked_count = sum(len(v) for v in linked.values() if isinstance(v, list))
        out.append(f"- **Linked files:** {linked_count}")

    section_titles = _extract_markdown_headings(content)
    if section_titles:
        out.append("")
        out.append("**Sections**")
        for title in section_titles:
            out.append(f"- {title}")

    out.append("")
    out.append("_Full skill content is available to the agent but hidden here to keep ACP readable._")
    return "\n".join(out)


def _format_skill_manage_result(result: Optional[str], args: Optional[Dict[str, Any]]) -> Optional[str]:
    """Render the outcome of a skill_manage call as a short Markdown report."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict):
        return None

    call_args = args or {}
    action = str(call_args.get("action") or "manage").strip() or "manage"
    name = str(call_args.get("name") or payload.get("name") or "skill").strip() or "skill"
    file_path = str(call_args.get("file_path") or payload.get("file_path") or "SKILL.md").strip() or "SKILL.md"
    failed = payload.get("success") is False
    status = "✗ Skill update failed" if failed else "✅ Skill updated"

    out = [f"**{status}**", "", f"- **Action:** `{action}`", f"- **Skill:** `{name}`"]
    if action != "delete":
        out.append(f"- **File:** `{file_path}`")

    message = str(payload.get("message") or payload.get("error") or "").strip()
    if message:
        out.append(f"- **Result:** {message}")

    replacements = payload.get("replacements") or payload.get("replacement_count")
    if replacements is not None:
        out.append(f"- **Replacements:** {replacements}")

    result_path = str(payload.get("path") or "").strip()
    if result_path:
        out.append(f"- **Path:** `{result_path}`")

    return "\n".join(out)


def _format_web_search_result(result: Optional[str]) -> Optional[str]:
    """List web-search hits (title, URL, description), or None."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict):
        return None
    inner = payload.get("data")
    web = inner.get("web") if isinstance(inner, dict) else payload.get("web")
    if not isinstance(web, list):
        return None
    out = [f"Web results: {len(web)}"]
    for hit in web[:10]:
        if not isinstance(hit, dict):
            continue
        title = str(hit.get("title") or hit.get("url") or "result").strip()
        url = str(hit.get("url") or "").strip()
        desc = str(hit.get("description") or "").strip()
        entry = f"• {title}"
        if url:
            entry += f" — {url}"
        out.append(entry)
        if desc:
            out.append(f"  {desc}")
    return _truncate_text("\n".join(out))


def _format_web_extract_result(result: Optional[str]) -> Optional[str]:
    """Return only web_extract errors for ACP; success stays compact via title."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict):
        return None
    if payload.get("success") is False and payload.get("error"):
        return f"Web extract failed: {payload.get('error')}"
    entries = payload.get("results")
    if not isinstance(entries, list):
        return None

    failures: list[str] = []
    for entry in entries[:10]:
        if not isinstance(entry, dict):
            continue
        error = str(entry.get("error") or "").strip()
        if not error or error in {"None", "null"}:
            continue
        url = str(entry.get("url") or "").strip()
        title = str(entry.get("title") or url or "Untitled").strip()
        line = f"- {title}"
        if url and url != title:
            line += f" — {url}"
        line += f"\n  Error: {_truncate_text(error, limit=500)}"
        failures.append(line)

    if not failures:
        return None
    plural = "s" if len(failures) != 1 else ""
    return "\n".join([f"Web extract failed for {len(failures)} URL{plural}", *failures])


def _format_process_result(result: Optional[str], args: Optional[Dict[str, Any]]) -> Optional[str]:
    """Render process-tool output for ACP.

    Handles three payload shapes: a non-dict result (returned as-is when it is
    non-empty text, else None), a ``processes`` listing (bullet list capped at
    20 entries), and a single-process status dict (headline plus selected
    fields with truncated output/error sections).
    """
    data = _json_loads_maybe(result)
    if not isinstance(data, dict):
        return result if isinstance(result, str) and result.strip() else None
    if data.get("success") is False and data.get("error"):
        return f"Process error: {data.get('error')}"
    action = str((args or {}).get("action") or "process").strip() or "process"
    if isinstance(data.get("processes"), list):
        processes = data["processes"]
        lines = [f"Processes: {len(processes)}"]
        for proc in processes[:20]:
            if not isinstance(proc, dict):
                lines.append(f"- {proc}")
                continue
            sid = str(proc.get("session_id") or proc.get("id") or "?")
            # No explicit status string -> infer exited/running from the `exited` flag.
            status = str(proc.get("status") or ("exited" if proc.get("exited") else "running"))
            cmd = str(proc.get("command") or "").strip()
            pid = proc.get("pid")
            code = proc.get("exit_code")
            bits = [status]
            if pid is not None:
                bits.append(f"pid {pid}")
            if code is not None:
                bits.append(f"exit {code}")
            # Command preview is capped at 120 chars per process line.
            lines.append(f"- `{sid}` — {', '.join(bits)}" + (f" — {cmd[:120]}" if cmd else ""))
        if len(processes) > 20:
            lines.append(f"... {len(processes) - 20} more process(es)")
        return "\n".join(lines)

    # Single-process payload: headline with status (or the action as fallback)
    # and the session id when known.
    status = str(data.get("status") or data.get("state") or action).strip()
    sid = str(data.get("session_id") or (args or {}).get("session_id") or "").strip()
    lines = [f"Process {action}: {status}" + (f" (`{sid}`)" if sid else "")]
    # Note: both exit_code and returncode render under the "Exit code" label;
    # if a payload carries both, both lines are emitted.
    for key, label in (("command", "Command"), ("pid", "PID"), ("exit_code", "Exit code"), ("returncode", "Exit code"), ("lines", "Lines")):
        if data.get(key) is not None:
            lines.append(f"- **{label}:** {data.get(key)}")
    output = data.get("output") or data.get("new_output") or data.get("log") or data.get("stdout")
    error = data.get("error") or data.get("stderr")
    if output:
        lines.extend(["", "Output:", _truncate_text(str(output), limit=5000)])
    if error:
        lines.extend(["", "Error:", _truncate_text(str(error), limit=2000)])
    msg = data.get("message")
    # Free-form message only shown when there is no output/error to display.
    if msg and not output and not error:
        lines.append(str(msg))
    return _truncate_text("\n".join(lines), limit=7000)


def _format_delegate_result(result: Optional[str]) -> Optional[str]:
    """Render delegate_task batch results as a per-task Markdown report.

    Returns None when the payload is not a dict carrying a ``results`` list,
    letting the caller fall back to raw-text rendering.
    """
    data = _json_loads_maybe(result)
    if not isinstance(data, dict):
        return None
    # A top-level error without per-task results means the whole delegation failed.
    if data.get("error") and not isinstance(data.get("results"), list):
        return f"Delegation failed: {data.get('error')}"
    results = data.get("results")
    if not isinstance(results, list):
        return None
    total = data.get("total_duration_seconds")
    lines = [f"Delegation results: {len(results)} task{'s' if len(results) != 1 else ''}" + (f" in {total}s" if total is not None else "")]
    # Status -> icon; unknown statuses get a plain bullet via the .get default.
    icon = {"completed": "✅", "failed": "✗", "error": "✗", "timeout": "⏱", "interrupted": "⚠"}
    for item in results:
        if not isinstance(item, dict):
            lines.append(f"- {item}")
            continue
        idx = item.get("task_index")
        status = str(item.get("status") or "unknown")
        model = item.get("model")
        dur = item.get("duration_seconds")
        role = item.get("_child_role")
        # task_index is zero-based in the payload; display it 1-based.
        header = f"{icon.get(status, '•')} Task {idx + 1 if isinstance(idx, int) else '?'}: {status}"
        bits = []
        if model:
            bits.append(str(model))
        if role:
            bits.append(f"role={role}")
        if dur is not None:
            bits.append(f"{dur}s")
        if bits:
            header += " (" + ", ".join(bits) + ")"
        lines.extend(["", header])
        summary = str(item.get("summary") or "").strip()
        error = str(item.get("error") or "").strip()
        if summary:
            lines.append(_truncate_text(summary, limit=1200))
        if error:
            lines.append("Error: " + _truncate_text(error, limit=800))
        trace = item.get("tool_trace")
        if isinstance(trace, list) and trace:
            # List at most 12 tool names, then a "+N" overflow marker.
            names = [str(t.get("tool") or "?") for t in trace if isinstance(t, dict)]
            if names:
                lines.append("Tools: " + ", ".join(names[:12]) + (f" (+{len(names)-12})" if len(names) > 12 else ""))
    return _truncate_text("\n".join(lines), limit=8000)


def _format_session_search_result(result: Optional[str]) -> Optional[str]:
    """Render session_search results (or recent sessions) as Markdown."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict):
        return None
    if payload.get("success") is False:
        return f"Session search failed: {payload.get('error', 'unknown error')}"
    entries = payload.get("results")
    if not isinstance(entries, list):
        return None
    mode = payload.get("mode") or "search"
    query = payload.get("query")
    if mode == "recent":
        header = "Recent sessions"
    else:
        header = "Session search results"
        if query:
            header += f" for `{query}`"
    out = [header]
    if not entries:
        out.append(str(payload.get("message") or "No matching sessions found."))
        return "\n".join(out)
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        sid = str(entry.get("session_id") or "?")
        title = str(entry.get("title") or entry.get("when") or "Untitled session").strip()
        when = str(entry.get("last_active") or entry.get("started_at") or entry.get("when") or "").strip()
        count = entry.get("message_count")
        source = str(entry.get("source") or "").strip()
        meta_bits = [when, source]
        if count is not None:
            meta_bits.append(f"{count} msgs")
        meta = ", ".join(str(bit) for bit in meta_bits if bit)
        line = f"- **{title}** (`{sid}`)"
        if meta:
            line += f" — {meta}"
        out.append(line)
        summary = str(entry.get("summary") or entry.get("preview") or "").strip()
        if summary:
            out.append("  " + _truncate_text(" ".join(summary.split()), limit=500))
    return _truncate_text("\n".join(out), limit=7000)


def _format_memory_result(result: Optional[str], args: Optional[Dict[str, Any]]) -> Optional[str]:
    """Summarize a memory-tool call; failures include up to five match previews."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict):
        return None
    call_args = args or {}
    action = str(call_args.get("action") or "memory").strip() or "memory"
    target = str(payload.get("target") or call_args.get("target") or "memory")
    if payload.get("success") is False:
        out = [f"✗ Memory {action} failed ({target})", str(payload.get("error") or "unknown error")]
        matches = payload.get("matches")
        if isinstance(matches, list) and matches:
            out.append("Matches:")
            for match in matches[:5]:
                out.append(f"- {_truncate_text(str(match), 160)}")
        return "\n".join(out)
    out = [f"✅ Memory {action} saved ({target})"]
    if payload.get("message"):
        out.append(str(payload.get("message")))
    if payload.get("entry_count") is not None:
        out.append(f"Entries: {payload.get('entry_count')}")
    if payload.get("usage"):
        out.append(f"Usage: {payload.get('usage')}")
    # Avoid dumping all memory entries into ACP UI; show only the explicit new value preview.
    preview = str(call_args.get("content") or call_args.get("old_text") or "").strip()
    if preview:
        out.append("Preview: " + _truncate_text(preview, limit=300))
    return "\n".join(out)


def _format_edit_result(tool_name: str, result: Optional[str], args: Optional[Dict[str, Any]]) -> Optional[str]:
    """Summarize write_file/patch outcomes, falling back to raw result text."""
    payload = _json_loads_maybe(result)
    path = str((args or {}).get("path") or "file").strip()
    if isinstance(payload, dict):
        if payload.get("success") is False or payload.get("error"):
            return f"{tool_name} failed for {path}: {payload.get('error', 'unknown error')}"
        header = f"✅ {tool_name} completed"
        if path:
            header += f" for `{path}`"
        out = [header]
        message = str(payload.get("message") or "").strip()
        if message:
            out.append(message)
        replacements = payload.get("replacements") or payload.get("replacement_count")
        if replacements is not None:
            out.append(f"Replacements: {replacements}")
        if payload.get("files_modified"):
            files = payload.get("files_modified")
            if isinstance(files, list):
                out.append("Files: " + ", ".join(f"`{f}`" for f in files[:8]))
        return "\n".join(out)
    if isinstance(result, str) and result.strip():
        return _truncate_text(result, limit=3000)
    fallback = f"✅ {tool_name} completed"
    if path:
        fallback += f" for `{path}`"
    return fallback


def _format_browser_result(tool_name: str, result: Optional[str], args: Optional[Dict[str, Any]]) -> Optional[str]:
    """Summarize browser tool output; image listings get dedicated handling."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict):
        if isinstance(result, str) and result.strip():
            return result
        return None
    if payload.get("success") is False or payload.get("error"):
        return f"{tool_name} failed: {payload.get('error', 'unknown error')}"
    if tool_name == "browser_get_images":
        images = payload.get("images") or payload.get("data")
        if isinstance(images, list):
            out = [f"Images found: {len(images)}"]
            for img in images[:12]:
                if not isinstance(img, dict):
                    continue
                alt = str(img.get("alt") or "").strip()
                url = str(img.get("url") or img.get("src") or "").strip()
                entry = f"- {alt or 'image'}"
                if url:
                    entry += f" — {url}"
                out.append(entry)
            return _truncate_text("\n".join(out), limit=5000)
    title = str(payload.get("title") or payload.get("url") or payload.get("status") or tool_name)
    text = str(
        payload.get("text")
        or payload.get("content")
        or payload.get("snapshot")
        or payload.get("analysis")
        or payload.get("message")
        or ""
    ).strip()
    out = [title]
    if payload.get("url") and payload.get("url") != title:
        out.append(str(payload.get("url")))
    if text:
        out.extend(["", _truncate_text(text, limit=5000)])
    return _truncate_text("\n".join(out), limit=7000)


def _format_media_or_cron_result(tool_name: str, result: Optional[str]) -> Optional[str]:
    """Summarize media-generation or cron results as key/value bullets."""
    payload = _json_loads_maybe(result)
    if not isinstance(payload, dict):
        if isinstance(result, str) and result.strip():
            return result
        return None
    if payload.get("success") is False or payload.get("error"):
        return f"{tool_name} failed: {payload.get('error', 'unknown error')}"
    out = [f"✅ {tool_name} completed"]
    interesting = ("file_path", "path", "url", "image_url", "job_id", "id", "status", "message", "next_run")
    for key in interesting:
        value = payload.get(key)
        if value:
            out.append(f"- **{key}:** {value}")
    return "\n".join(out)


def _format_generic_structured_result(tool_name: str, result: Optional[str]) -> Optional[str]:
    """Fallback renderer for structured results of tools without a dedicated formatter.

    Lists render as a bullet preview (first 12 items); dicts render as a
    status headline plus key/value bullets — priority keys first in a stable
    order, then the remaining keys — with any string ``content`` appended as a
    trailing excerpt. Non-JSON results are returned as-is (or None when empty).
    """
    data = _json_loads_maybe(result)
    if not isinstance(data, (dict, list)):
        return result if isinstance(result, str) and result.strip() else None
    if isinstance(data, list):
        lines = [f"{tool_name}: {len(data)} item{'s' if len(data) != 1 else ''}"]
        for item in data[:12]:
            lines.append(f"- {_truncate_text(str(item), limit=240)}")
        return _truncate_text("\n".join(lines), limit=5000)

    if data.get("success") is False or data.get("error"):
        return f"{tool_name} failed: {data.get('error', 'unknown error')}"

    lines = [f"✅ {tool_name} completed" if data.get("success") is True else f"{tool_name} result"]
    # Well-known keys rendered first, in this fixed order.
    priority_keys = (
        "message", "status", "id", "task_id", "issue_id", "title", "name", "entity_id",
        "state", "service", "url", "path", "file_path", "count", "total", "next_run",
    )
    seen = set()
    for key in priority_keys:
        value = data.get(key)
        # Skip empty-ish values (None, "", [], {}); 0 and False are kept.
        if value in (None, "", [], {}):
            continue
        seen.add(key)
        lines.append(f"- **{key}:** {_truncate_text(str(value), limit=500)}")

    # Remaining keys, minus bulky/bookkeeping ones; nested values are JSON-encoded.
    for key, value in data.items():
        if key in seen or key in {"success", "raw", "content", "entries"}:
            continue
        if value in (None, "", [], {}):
            continue
        if isinstance(value, (dict, list)):
            preview = json.dumps(value, ensure_ascii=False, default=str)
        else:
            preview = str(value)
        lines.append(f"- **{key}:** {_truncate_text(preview, limit=500)}")
        # Cap the report at roughly 14 lines to keep the ACP card compact.
        if len(lines) >= 14:
            break

    # ``content`` was excluded above so it can be shown as a trailing excerpt.
    content = data.get("content")
    if isinstance(content, str) and content.strip():
        lines.extend(["", _truncate_text(content.strip(), limit=1500)])
    return _truncate_text("\n".join(lines), limit=7000)


def _build_polished_completion_content(
    tool_name: str,
    result: Optional[str],
    function_args: Optional[Dict[str, Any]],
) -> Optional[List[Any]]:
    """Build polished ACP completion content for known tools.

    Dispatches to the tool-specific formatter, falling back to the generic
    structured formatter for tools in _POLISHED_TOOLS. Returns None when the
    tool is unknown or the formatter produced no text.
    """
    if tool_name == "todo":
        text = _format_todo_result(result)
    elif tool_name == "read_file":
        text = _format_read_file_result(result, function_args)
    elif tool_name in ("write_file", "patch"):
        text = _format_edit_result(tool_name, result, function_args)
    elif tool_name == "search_files":
        text = _format_search_files_result(result)
    elif tool_name == "execute_code":
        text = _format_execute_code_result(result)
    elif tool_name == "process":
        text = _format_process_result(result, function_args)
    elif tool_name == "delegate_task":
        text = _format_delegate_result(result)
    elif tool_name == "session_search":
        text = _format_session_search_result(result)
    elif tool_name == "memory":
        text = _format_memory_result(result, function_args)
    elif tool_name == "skill_view":
        text = _format_skill_view_result(result)
    elif tool_name == "skill_manage":
        text = _format_skill_manage_result(result, function_args)
    elif tool_name == "web_search":
        text = _format_web_search_result(result)
    elif tool_name == "web_extract":
        text = _format_web_extract_result(result)
    elif tool_name in ("browser_navigate", "browser_snapshot", "browser_vision", "browser_get_images"):
        text = _format_browser_result(tool_name, result, function_args)
    elif tool_name in ("vision_analyze", "image_generate", "cronjob"):
        text = _format_media_or_cron_result(tool_name, result)
    elif tool_name in _POLISHED_TOOLS:
        text = _format_generic_structured_result(tool_name, result)
    else:
        return None
    if not text:
        return None
    return [_text(text)]


def _build_patch_mode_content(patch_text: str) -> List[Any]:
    """Parse V4A patch mode input into ACP diff blocks when possible."""
    if not patch_text:
        return [acp.tool_content(acp.text_block(""))]

    # If parsing fails for any reason, fall back to showing the raw patch.
    fallback = [acp.tool_content(acp.text_block(patch_text))]
    try:
        from tools.patch_parser import OperationType, parse_v4a_patch

        operations, error = parse_v4a_patch(patch_text)
        if error or not operations:
            return fallback

        blocks: List[Any] = []
        for op in operations:
            if op.operation == OperationType.UPDATE:
                # Collect before/after text per hunk; context lines appear on
                # both sides so the rendered diff stays readable.
                befores: list[str] = []
                afters: list[str] = []
                for hunk in op.hunks:
                    before = [ln.content for ln in hunk.lines if ln.prefix in (" ", "-")]
                    after = [ln.content for ln in hunk.lines if ln.prefix in (" ", "+")]
                    if before or after:
                        befores.append("\n".join(before))
                        afters.append("\n".join(after))

                joined_before = "\n...\n".join(chunk for chunk in befores if chunk)
                joined_after = "\n...\n".join(chunk for chunk in afters if chunk)
                if joined_before or joined_after:
                    blocks.append(
                        acp.tool_diff_content(
                            path=op.file_path,
                            old_text=joined_before or None,
                            new_text=joined_after or "",
                        )
                    )
            elif op.operation == OperationType.ADD:
                added = [
                    ln.content
                    for hunk in op.hunks
                    for ln in hunk.lines
                    if ln.prefix == "+"
                ]
                blocks.append(
                    acp.tool_diff_content(
                        path=op.file_path,
                        new_text="\n".join(added),
                    )
                )
            elif op.operation == OperationType.DELETE:
                blocks.append(
                    acp.tool_diff_content(
                        path=op.file_path,
                        old_text=f"Delete file: {op.file_path}",
                        new_text="",
                    )
                )
            elif op.operation == OperationType.MOVE:
                blocks.append(
                    acp.tool_content(acp.text_block(f"Move file: {op.file_path} -> {op.new_path}"))
                )

        return blocks or fallback
    except Exception:
        return fallback


def _strip_diff_prefix(path: str) -> str:
    raw = str(path or "").strip()
    if raw.startswith(("a/", "b/")):
        return raw[2:]
    return raw


def _parse_unified_diff_content(diff_text: str) -> List[Any]:
    """Convert unified diff text into ACP diff content blocks."""
    if not diff_text:
        return []

    blocks: List[Any] = []
    # Per-file accumulator: header paths plus removed/added line buffers.
    state: Dict[str, Any] = {"old": None, "new": None, "removed": [], "added": []}

    def _emit() -> None:
        """Flush the current file section into ``blocks`` and reset state."""
        old_path, new_path = state["old"], state["new"]
        if old_path is None and new_path is None:
            return
        # Prefer the post-image path; fall back to the pre-image for deletions.
        chosen = new_path if new_path and new_path != "/dev/null" else old_path
        if chosen and chosen != "/dev/null":
            blocks.append(
                acp.tool_diff_content(
                    path=_strip_diff_prefix(chosen),
                    old_text="\n".join(state["removed"]) if state["removed"] else None,
                    new_text="\n".join(state["added"]),
                )
            )
        state.update(old=None, new=None, removed=[], added=[])

    for raw in diff_text.splitlines():
        # Header checks must precede the bare "+"/"-" checks so that
        # "+++ "/"--- " lines are not misread as added/removed content.
        if raw.startswith("--- "):
            _emit()
            state["old"] = raw[4:].strip()
        elif raw.startswith("+++ "):
            state["new"] = raw[4:].strip()
        elif raw.startswith("@@"):
            pass  # hunk range markers carry no content
        elif state["old"] is None and state["new"] is None:
            pass  # preamble before any file header: ignore
        elif raw.startswith("+"):
            state["added"].append(raw[1:])
        elif raw.startswith("-"):
            state["removed"].append(raw[1:])
        elif raw.startswith(" "):
            context = raw[1:]
            state["removed"].append(context)
            state["added"].append(context)

    _emit()
    return blocks


def _build_tool_complete_content(
    tool_name: str,
    result: Optional[str],
    *,
    function_args: Optional[Dict[str, Any]] = None,
    snapshot: Any = None,
) -> List[Any]:
    """Build structured ACP completion content, falling back to plain text."""
    text = result or ""
    if len(text) > 5000:
        # Keep the preview bounded; note the original size for the reader.
        text = text[:4900] + f"\n... ({len(result)} chars total, truncated)"

    # Edit-style tools render best as real diff blocks when one is recoverable.
    if tool_name in {"write_file", "patch", "skill_manage"}:
        try:
            from agent.display import extract_edit_diff

            diff_text = extract_edit_diff(
                tool_name,
                result,
                function_args=function_args,
                snapshot=snapshot,
            )
            if isinstance(diff_text, str) and diff_text.strip():
                parsed = _parse_unified_diff_content(diff_text)
                if parsed:
                    return parsed
        except Exception:
            pass  # best-effort: fall through to structured/plain rendering

    structured = _build_polished_completion_content(tool_name, result, function_args)
    if structured:
        return structured

    return [_text(text)]


# ---------------------------------------------------------------------------
# Build ACP content objects for tool-call events
# ---------------------------------------------------------------------------


def _dump_args(arguments: Dict[str, Any]) -> str:
    """Serialize tool arguments to pretty JSON, tolerating non-JSON values."""
    try:
        return json.dumps(arguments, indent=2, default=str)
    except (TypeError, ValueError):
        return str(arguments)


def build_tool_start(
    tool_call_id: str,
    tool_name: str,
    arguments: Dict[str, Any],
) -> ToolCallStart:
    """Create a ToolCallStart event for the given hermes tool invocation.

    Builds a per-tool content preview (diff blocks for edits, a command echo
    for terminals, argument summaries otherwise) so ACP clients can render a
    meaningful in-progress card. Tools not handled explicitly fall back to a
    JSON dump of their arguments.
    """
    kind = get_tool_kind(tool_name)
    title = build_tool_title(tool_name, arguments)
    locations = extract_locations(arguments)

    if tool_name == "patch":
        mode = arguments.get("mode", "replace")
        if mode == "replace":
            path = arguments.get("path", "")
            old = arguments.get("old_string", "")
            new = arguments.get("new_string", "")
            content = [acp.tool_diff_content(path=path, new_text=new, old_text=old)]
        else:
            patch_text = arguments.get("patch", "")
            content = _build_patch_mode_content(patch_text)
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name == "write_file":
        path = arguments.get("path", "")
        file_content = arguments.get("content", "")
        content = [acp.tool_diff_content(path=path, new_text=file_content)]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name == "terminal":
        command = arguments.get("command", "")
        content = [_text(f"$ {command}")]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name in {"read_file", "web_extract"}:
        # The title and location already identify the target. A synthetic
        # content block would make Zed render an unhelpful Output section
        # before the real result arrives on completion.
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=None, locations=locations,
        )

    if tool_name == "search_files":
        pattern = arguments.get("pattern", "")
        target = arguments.get("target", "content")
        search_path = arguments.get("path")
        where = f" in {search_path}" if search_path else ""
        content = [_text(f"Searching for '{pattern}' ({target}){where}")]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name == "todo":
        items = arguments.get("todos")
        if isinstance(items, list):
            # Preview at most 8 items to keep the card compact.
            preview_lines = ["Updating todo list", ""]
            for item in items[:8]:
                if isinstance(item, dict):
                    preview_lines.append(f"- {item.get('status', 'pending')}: {item.get('content', item.get('id', ''))}")
            if len(items) > 8:
                preview_lines.append(f"... {len(items) - 8} more")
            content = [_text("\n".join(preview_lines))]
        else:
            content = [_text("Reading todo list")]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name == "skill_view":
        name = str(arguments.get("name") or "?").strip() or "?"
        file_path = str(arguments.get("file_path") or "SKILL.md").strip() or "SKILL.md"
        content = [_text(f"Loading skill '{name}' ({file_path})")]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name == "skill_manage":
        action = str(arguments.get("action") or "manage").strip() or "manage"
        name = str(arguments.get("name") or "?").strip() or "?"
        file_path = str(arguments.get("file_path") or "SKILL.md").strip() or "SKILL.md"
        path = f"skills/{name}/{file_path}" if file_path else f"skills/{name}"

        if action == "patch":
            old = str(arguments.get("old_string") or "")
            new = str(arguments.get("new_string") or "")
            content = [acp.tool_diff_content(path=path, old_text=old or None, new_text=new)]
        elif action in {"edit", "create"}:
            content = [
                acp.tool_diff_content(
                    path=path,
                    new_text=str(arguments.get("content") or ""),
                )
            ]
        elif action == "write_file":
            target = str(arguments.get("file_path") or "file")
            content = [
                acp.tool_diff_content(
                    path=f"skills/{name}/{target}",
                    new_text=str(arguments.get("file_content") or ""),
                )
            ]
        elif action in {"delete", "remove_file"}:
            target = str(arguments.get("file_path") or file_path or name)
            content = [_text(f"Removing {target} from skill '{name}'")]
        else:
            content = [_text(f"Running skill_manage action '{action}' on skill '{name}' ({file_path})")]

        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name == "execute_code":
        code = str(arguments.get("code") or "").strip()
        preview = code[:1200] + (f"\n... ({len(code)} chars total, truncated)" if len(code) > 1200 else "")
        content = [_text(f"Running Python helper script:\n\n```python\n{preview}\n```" if preview else "Running Python helper script")]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name == "web_search":
        query = str(arguments.get("query") or "").strip()
        content = [_text(f"Searching the web for: {query}" if query else "Searching the web")]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name == "process":
        action = str(arguments.get("action") or "").strip() or "manage"
        sid = str(arguments.get("session_id") or "").strip()
        data_preview = str(arguments.get("data") or "").strip()
        text = f"Process action: {action}" + (f"\nSession: {sid}" if sid else "")
        if data_preview:
            text += "\nInput: " + _truncate_text(data_preview, limit=500)
        content = [_text(text)]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name == "delegate_task":
        tasks = arguments.get("tasks")
        if isinstance(tasks, list) and tasks:
            lines = [f"Delegating {len(tasks)} tasks", ""]
            for i, task in enumerate(tasks[:8], 1):
                if isinstance(task, dict):
                    goal = str(task.get("goal") or "").strip()
                    role = str(task.get("role") or "").strip()
                    lines.append(f"{i}. " + _truncate_text(goal, limit=160) + (f" ({role})" if role else ""))
            if len(tasks) > 8:
                lines.append(f"... {len(tasks) - 8} more")
            content = [_text("\n".join(lines))]
        else:
            goal = str(arguments.get("goal") or "").strip()
            content = [_text("Delegating task" + (f":\n{_truncate_text(goal, limit=800)}" if goal else ""))]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name == "session_search":
        query = str(arguments.get("query") or "").strip()
        content = [_text(f"Searching past sessions for: {query}" if query else "Loading recent sessions")]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name == "memory":
        action = str(arguments.get("action") or "manage").strip() or "manage"
        target = str(arguments.get("target") or "memory").strip() or "memory"
        preview = str(arguments.get("content") or arguments.get("old_text") or "").strip()
        text = f"Memory {action} ({target})"
        if preview:
            text += "\nPreview: " + _truncate_text(preview, limit=500)
        content = [_text(text)]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    if tool_name in _POLISHED_TOOLS:
        # Polished tools get a truncated argument summary and no raw payload.
        content = [_text(_truncate_text(_dump_args(arguments), limit=1200))]
        return acp.start_tool_call(
            tool_call_id, title, kind=kind, content=content, locations=locations,
        )

    # Generic fallback: echo the full arguments. This point is only reached
    # for tools outside _POLISHED_TOOLS, so raw_input is always attached
    # (the previous conditional here was dead code). NOTE: the module-level
    # `import json` already covers _dump_args; no local import is needed.
    content = [acp.tool_content(acp.text_block(_dump_args(arguments)))]
    return acp.start_tool_call(
        tool_call_id, title, kind=kind, content=content, locations=locations,
        raw_input=arguments,
    )


def build_tool_complete(
    tool_call_id: str,
    tool_name: str,
    result: Optional[str] = None,
    function_args: Optional[Dict[str, Any]] = None,
    snapshot: Any = None,
) -> ToolCallProgress:
    """Create a ToolCallUpdate (progress) event for a completed tool call."""
    if tool_name == "web_extract":
        # web_extract stays compact: only attach content when the formatter
        # produces text (the variable naming suggests error reporting).
        summary = _format_web_extract_result(result)
        content = [_text(summary)] if summary else None
    else:
        content = _build_tool_complete_content(
            tool_name,
            result,
            function_args=function_args,
            snapshot=snapshot,
        )
    return acp.update_tool_call(
        tool_call_id,
        kind=get_tool_kind(tool_name),
        status="completed",
        content=content,
        raw_output=result if tool_name not in _POLISHED_TOOLS else None,
    )


# ---------------------------------------------------------------------------
# Location extraction
# ---------------------------------------------------------------------------


def extract_locations(
    arguments: Dict[str, Any],
) -> List[ToolCallLocation]:
    """Extract file-system locations from tool arguments.

    Returns a single-element list when the arguments carry a truthy ``path``,
    annotated with a line number taken from ``offset`` (preferred) or
    ``line``; otherwise returns an empty list.
    """
    locations: List[ToolCallLocation] = []
    path = arguments.get("path")
    if path:
        # Explicit None checks so a legitimate value of 0 is preserved;
        # the previous `get("offset") or get("line")` discarded falsy values.
        line = arguments.get("offset")
        if line is None:
            line = arguments.get("line")
        locations.append(ToolCallLocation(path=path, line=line))
    return locations
