"""OpenAI-compatible facade that talks to Google's Cloud Code Assist backend.

This adapter lets Hermes use the ``google-gemini-cli`` provider as if it were
a standard OpenAI-shaped chat completion endpoint, while the underlying HTTP
traffic goes to ``cloudcode-pa.googleapis.com/v1internal:{generateContent,
streamGenerateContent}`` with a Bearer access token obtained via OAuth PKCE.

Architecture
------------
- ``GeminiCloudCodeClient`` exposes ``.chat.completions.create(**kwargs)``
  mirroring the subset of the OpenAI SDK that ``run_agent.py`` uses.
- Incoming OpenAI ``messages[]`` / ``tools[]`` / ``tool_choice`` are translated
  to Gemini's native ``contents[]`` / ``tools[].functionDeclarations`` /
  ``toolConfig`` / ``systemInstruction`` shape.
- The request body is wrapped ``{project, model, user_prompt_id, request}``
  per Code Assist API expectations.
- Responses (``candidates[].content.parts[]``) are converted back to
  OpenAI ``choices[0].message`` shape with ``content`` + ``tool_calls``.
- Streaming uses SSE (``?alt=sse``) and yields OpenAI-shaped delta chunks.

Attribution
-----------
Translation semantics follow jenslys/opencode-gemini-auth (MIT) and the public
Gemini API docs. Request envelope shape
(``{project, model, user_prompt_id, request}``) is documented nowhere; it is
reverse-engineered from the opencode-gemini-auth and clawdbot implementations.
"""

from __future__ import annotations

import json
import logging
import time
import uuid
from types import SimpleNamespace
from typing import Any, Dict, Iterator, List, Optional

import httpx

from agent import google_oauth
from agent.gemini_schema import sanitize_gemini_tool_parameters
from agent.google_code_assist import (
    CODE_ASSIST_ENDPOINT,
    CodeAssistError,
    ProjectContext,
    resolve_project_context,
)

logger = logging.getLogger(__name__)


# =============================================================================
# Request translation: OpenAI → Gemini
# =============================================================================

_ROLE_MAP_OPENAI_TO_GEMINI = {
    "user": "user",
    "assistant": "model",
    "system": "user",   # handled separately via systemInstruction
    "tool": "user",     # functionResponse is wrapped in a user-role turn
    "function": "user",
}


def _coerce_content_to_text(content: Any) -> str:
    """OpenAI content may be str or a list of parts; reduce to plain text."""
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces: List[str] = []
        for p in content:
            if isinstance(p, str):
                pieces.append(p)
            elif isinstance(p, dict):
                if p.get("type") == "text" and isinstance(p.get("text"), str):
                    pieces.append(p["text"])
                # Multimodal (image_url, etc.) — stub for now; log and skip
                elif p.get("type") in ("image_url", "input_audio"):
                    logger.debug("Dropping multimodal part (not yet supported): %s", p.get("type"))
        return "\n".join(pieces)
    return str(content)


def _translate_tool_call_to_gemini(tool_call: Dict[str, Any]) -> Dict[str, Any]:
    """OpenAI tool_call -> Gemini functionCall part."""
    fn = tool_call.get("function") or {}
    args_raw = fn.get("arguments", "")
    try:
        args = json.loads(args_raw) if isinstance(args_raw, str) and args_raw else {}
    except json.JSONDecodeError:
        args = {"_raw": args_raw}
    if not isinstance(args, dict):
        args = {"_value": args}
    return {
        "functionCall": {
            "name": fn.get("name") or "",
            "args": args,
        },
        # Sentinel signature — matches opencode-gemini-auth's approach.
        # Without this, Code Assist rejects function calls that originated
        # outside its own chain.
        "thoughtSignature": "skip_thought_signature_validator",
    }


def _translate_tool_result_to_gemini(message: Dict[str, Any]) -> Dict[str, Any]:
    """OpenAI tool-role message -> Gemini functionResponse part.

    The function name isn't in the OpenAI tool message directly; it must be
    passed via the assistant message that issued the call. For simplicity we
    look up ``name`` on the message (OpenAI SDK copies it there) or on the
    ``tool_call_id`` cross-reference.
    """
    name = str(message.get("name") or message.get("tool_call_id") or "tool")
    content = _coerce_content_to_text(message.get("content"))
    # Gemini expects the response as a dict under `response`. We wrap plain
    # text in {"output": "..."}.
    try:
        parsed = json.loads(content) if content.strip().startswith(("{", "[")) else None
    except json.JSONDecodeError:
        parsed = None
    response = parsed if isinstance(parsed, dict) else {"output": content}
    return {
        "functionResponse": {
            "name": name,
            "response": response,
        },
    }


def _build_gemini_contents(
    messages: List[Dict[str, Any]],
) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
    """Convert OpenAI messages[] to Gemini contents[] + systemInstruction."""
    system_text_parts: List[str] = []
    contents: List[Dict[str, Any]] = []

    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = str(msg.get("role") or "user")

        if role == "system":
            system_text_parts.append(_coerce_content_to_text(msg.get("content")))
            continue

        # Tool result message — emit a user-role turn with functionResponse
        if role == "tool" or role == "function":
            contents.append({
                "role": "user",
                "parts": [_translate_tool_result_to_gemini(msg)],
            })
            continue

        gemini_role = _ROLE_MAP_OPENAI_TO_GEMINI.get(role, "user")
        parts: List[Dict[str, Any]] = []

        text = _coerce_content_to_text(msg.get("content"))
        if text:
            parts.append({"text": text})

        # Assistant messages can carry tool_calls
        tool_calls = msg.get("tool_calls") or []
        if isinstance(tool_calls, list):
            for tc in tool_calls:
                if isinstance(tc, dict):
                    parts.append(_translate_tool_call_to_gemini(tc))

        if not parts:
            # Gemini rejects empty parts; skip the turn entirely
            continue

        contents.append({"role": gemini_role, "parts": parts})

    system_instruction: Optional[Dict[str, Any]] = None
    joined_system = "\n".join(p for p in system_text_parts if p).strip()
    if joined_system:
        system_instruction = {
            "role": "system",
            "parts": [{"text": joined_system}],
        }

    return contents, system_instruction


def _translate_tools_to_gemini(tools: Any) -> List[Dict[str, Any]]:
    """OpenAI tools[] -> Gemini tools[].functionDeclarations[]."""
    if not isinstance(tools, list) or not tools:
        return []
    declarations: List[Dict[str, Any]] = []
    for t in tools:
        if not isinstance(t, dict):
            continue
        fn = t.get("function") or {}
        if not isinstance(fn, dict):
            continue
        name = fn.get("name")
        if not name:
            continue
        decl = {"name": str(name)}
        if fn.get("description"):
            decl["description"] = str(fn["description"])
        params = fn.get("parameters")
        if isinstance(params, dict):
            decl["parameters"] = sanitize_gemini_tool_parameters(params)
        declarations.append(decl)
    if not declarations:
        return []
    return [{"functionDeclarations": declarations}]


def _translate_tool_choice_to_gemini(tool_choice: Any) -> Optional[Dict[str, Any]]:
    """OpenAI tool_choice -> Gemini toolConfig.functionCallingConfig."""
    if tool_choice is None:
        return None
    if isinstance(tool_choice, str):
        if tool_choice == "auto":
            return {"functionCallingConfig": {"mode": "AUTO"}}
        if tool_choice == "required":
            return {"functionCallingConfig": {"mode": "ANY"}}
        if tool_choice == "none":
            return {"functionCallingConfig": {"mode": "NONE"}}
    if isinstance(tool_choice, dict):
        fn = tool_choice.get("function") or {}
        name = fn.get("name")
        if name:
            return {
                "functionCallingConfig": {
                    "mode": "ANY",
                    "allowedFunctionNames": [str(name)],
                },
            }
    return None


def _normalize_thinking_config(config: Any) -> Optional[Dict[str, Any]]:
    """Accept thinkingBudget / thinkingLevel / includeThoughts (+ snake_case)."""
    if not isinstance(config, dict) or not config:
        return None
    budget = config.get("thinkingBudget", config.get("thinking_budget"))
    level = config.get("thinkingLevel", config.get("thinking_level"))
    include = config.get("includeThoughts", config.get("include_thoughts"))
    normalized: Dict[str, Any] = {}
    if isinstance(budget, (int, float)):
        normalized["thinkingBudget"] = int(budget)
    if isinstance(level, str) and level.strip():
        normalized["thinkingLevel"] = level.strip().lower()
    if isinstance(include, bool):
        normalized["includeThoughts"] = include
    return normalized or None


def build_gemini_request(
    *,
    messages: List[Dict[str, Any]],
    tools: Any = None,
    tool_choice: Any = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    top_p: Optional[float] = None,
    stop: Any = None,
    thinking_config: Any = None,
) -> Dict[str, Any]:
    """Build the inner Gemini request body (goes inside ``request`` wrapper)."""
    contents, system_instruction = _build_gemini_contents(messages)

    body: Dict[str, Any] = {"contents": contents}
    if system_instruction is not None:
        body["systemInstruction"] = system_instruction

    gemini_tools = _translate_tools_to_gemini(tools)
    if gemini_tools:
        body["tools"] = gemini_tools
    tool_cfg = _translate_tool_choice_to_gemini(tool_choice)
    if tool_cfg is not None:
        body["toolConfig"] = tool_cfg

    generation_config: Dict[str, Any] = {}
    if isinstance(temperature, (int, float)):
        generation_config["temperature"] = float(temperature)
    if isinstance(max_tokens, int) and max_tokens > 0:
        generation_config["maxOutputTokens"] = max_tokens
    if isinstance(top_p, (int, float)):
        generation_config["topP"] = float(top_p)
    if isinstance(stop, str) and stop:
        generation_config["stopSequences"] = [stop]
    elif isinstance(stop, list) and stop:
        generation_config["stopSequences"] = [str(s) for s in stop if s]
    normalized_thinking = _normalize_thinking_config(thinking_config)
    if normalized_thinking:
        generation_config["thinkingConfig"] = normalized_thinking
    if generation_config:
        body["generationConfig"] = generation_config

    return body


def wrap_code_assist_request(
    *,
    project_id: str,
    model: str,
    inner_request: Dict[str, Any],
    user_prompt_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Wrap the inner Gemini request in the Code Assist envelope."""
    return {
        "project": project_id,
        "model": model,
        "user_prompt_id": user_prompt_id or str(uuid.uuid4()),
        "request": inner_request,
    }


# =============================================================================
# Response translation: Gemini → OpenAI
# =============================================================================

def _translate_gemini_response(
    resp: Dict[str, Any],
    model: str,
) -> SimpleNamespace:
    """Non-streaming Gemini response -> OpenAI-shaped SimpleNamespace.

    Code Assist wraps the actual Gemini response inside ``response``, so we
    unwrap it first if present.
    """
    inner = resp.get("response") if isinstance(resp.get("response"), dict) else resp

    candidates = inner.get("candidates") or []
    if not isinstance(candidates, list) or not candidates:
        return _empty_response(model)

    cand = candidates[0]
    content_obj = cand.get("content") if isinstance(cand, dict) else {}
    parts = content_obj.get("parts") if isinstance(content_obj, dict) else []

    text_pieces: List[str] = []
    reasoning_pieces: List[str] = []
    tool_calls: List[SimpleNamespace] = []

    for i, part in enumerate(parts or []):
        if not isinstance(part, dict):
            continue
        # Thought parts are model's internal reasoning — surface as reasoning,
        # don't mix into content.
        if part.get("thought") is True:
            if isinstance(part.get("text"), str):
                reasoning_pieces.append(part["text"])
            continue
        if isinstance(part.get("text"), str):
            text_pieces.append(part["text"])
            continue
        fc = part.get("functionCall")
        if isinstance(fc, dict) and fc.get("name"):
            try:
                args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
            except (TypeError, ValueError):
                args_str = "{}"
            tool_calls.append(SimpleNamespace(
                id=f"call_{uuid.uuid4().hex[:12]}",
                type="function",
                index=i,
                function=SimpleNamespace(name=str(fc["name"]), arguments=args_str),
            ))

    finish_reason = "tool_calls" if tool_calls else _map_gemini_finish_reason(
        str(cand.get("finishReason") or "")
    )

    usage_meta = inner.get("usageMetadata") or {}
    usage = SimpleNamespace(
        prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
        completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
        total_tokens=int(usage_meta.get("totalTokenCount") or 0),
        prompt_tokens_details=SimpleNamespace(
            cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0),
        ),
    )

    message = SimpleNamespace(
        role="assistant",
        content="".join(text_pieces) if text_pieces else None,
        tool_calls=tool_calls or None,
        reasoning="".join(reasoning_pieces) or None,
        reasoning_content="".join(reasoning_pieces) or None,
        reasoning_details=None,
    )
    choice = SimpleNamespace(
        index=0,
        message=message,
        finish_reason=finish_reason,
    )
    return SimpleNamespace(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[choice],
        usage=usage,
    )


def _empty_response(model: str) -> SimpleNamespace:
    message = SimpleNamespace(
        role="assistant", content="", tool_calls=None,
        reasoning=None, reasoning_content=None, reasoning_details=None,
    )
    choice = SimpleNamespace(index=0, message=message, finish_reason="stop")
    usage = SimpleNamespace(
        prompt_tokens=0, completion_tokens=0, total_tokens=0,
        prompt_tokens_details=SimpleNamespace(cached_tokens=0),
    )
    return SimpleNamespace(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[choice],
        usage=usage,
    )


def _map_gemini_finish_reason(reason: str) -> str:
    mapping = {
        "STOP": "stop",
        "MAX_TOKENS": "length",
        "SAFETY": "content_filter",
        "RECITATION": "content_filter",
        "OTHER": "stop",
    }
    return mapping.get(reason.upper(), "stop")


# =============================================================================
# Streaming SSE iterator
# =============================================================================

class _GeminiStreamChunk(SimpleNamespace):
    """Mimics an OpenAI ChatCompletionChunk with .choices[0].delta."""
    pass


def _make_stream_chunk(
    *,
    model: str,
    content: str = "",
    tool_call_delta: Optional[Dict[str, Any]] = None,
    finish_reason: Optional[str] = None,
    reasoning: str = "",
) -> _GeminiStreamChunk:
    delta_kwargs: Dict[str, Any] = {"role": "assistant"}
    if content:
        delta_kwargs["content"] = content
    if tool_call_delta is not None:
        delta_kwargs["tool_calls"] = [SimpleNamespace(
            index=tool_call_delta.get("index", 0),
            id=tool_call_delta.get("id") or f"call_{uuid.uuid4().hex[:12]}",
            type="function",
            function=SimpleNamespace(
                name=tool_call_delta.get("name") or "",
                arguments=tool_call_delta.get("arguments") or "",
            ),
        )]
    if reasoning:
        delta_kwargs["reasoning"] = reasoning
        delta_kwargs["reasoning_content"] = reasoning
    delta = SimpleNamespace(**delta_kwargs)
    choice = SimpleNamespace(index=0, delta=delta, finish_reason=finish_reason)
    return _GeminiStreamChunk(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        object="chat.completion.chunk",
        created=int(time.time()),
        model=model,
        choices=[choice],
        usage=None,
    )


def _iter_sse_events(response: httpx.Response) -> Iterator[Dict[str, Any]]:
    """Parse Server-Sent Events from an httpx streaming response."""
    buffer = ""
    for chunk in response.iter_text():
        if not chunk:
            continue
        buffer += chunk
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            line = line.rstrip("\r")
            if not line:
                continue
            if line.startswith("data: "):
                data = line[6:]
                if data == "[DONE]":
                    return
                try:
                    yield json.loads(data)
                except json.JSONDecodeError:
                    logger.debug("Non-JSON SSE line: %s", data[:200])


def _translate_stream_event(
    event: Dict[str, Any],
    model: str,
    tool_call_counter: List[int],
) -> List[_GeminiStreamChunk]:
    """Unwrap Code Assist envelope and emit OpenAI-shaped chunk(s).

    ``tool_call_counter`` is a single-element list used as a mutable counter
    across events in the same stream. Each ``functionCall`` part gets a
    fresh, unique OpenAI ``index`` — keying by function name would collide
    whenever the model issues parallel calls to the same tool (e.g. reading
    three files in one turn).
    """
    inner = event.get("response") if isinstance(event.get("response"), dict) else event
    candidates = inner.get("candidates") or []
    if not candidates:
        return []
    cand = candidates[0]
    if not isinstance(cand, dict):
        return []

    chunks: List[_GeminiStreamChunk] = []

    content = cand.get("content") or {}
    parts = content.get("parts") if isinstance(content, dict) else []
    for part in parts or []:
        if not isinstance(part, dict):
            continue
        if part.get("thought") is True and isinstance(part.get("text"), str):
            chunks.append(_make_stream_chunk(
                model=model, reasoning=part["text"],
            ))
            continue
        if isinstance(part.get("text"), str) and part["text"]:
            chunks.append(_make_stream_chunk(model=model, content=part["text"]))
        fc = part.get("functionCall")
        if isinstance(fc, dict) and fc.get("name"):
            name = str(fc["name"])
            idx = tool_call_counter[0]
            tool_call_counter[0] += 1
            try:
                args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
            except (TypeError, ValueError):
                args_str = "{}"
            chunks.append(_make_stream_chunk(
                model=model,
                tool_call_delta={
                    "index": idx,
                    "name": name,
                    "arguments": args_str,
                },
            ))

    finish_reason_raw = str(cand.get("finishReason") or "")
    if finish_reason_raw:
        mapped = _map_gemini_finish_reason(finish_reason_raw)
        if tool_call_counter[0] > 0:
            mapped = "tool_calls"
        chunks.append(_make_stream_chunk(model=model, finish_reason=mapped))
    return chunks


# =============================================================================
# GeminiCloudCodeClient — OpenAI-compatible facade
# =============================================================================

MARKER_BASE_URL = "cloudcode-pa://google"


class _GeminiChatCompletions:
    def __init__(self, client: "GeminiCloudCodeClient"):
        self._client = client

    def create(self, **kwargs: Any) -> Any:
        return self._client._create_chat_completion(**kwargs)


class _GeminiChatNamespace:
    def __init__(self, client: "GeminiCloudCodeClient"):
        self.completions = _GeminiChatCompletions(client)


class GeminiCloudCodeClient:
    """Minimal OpenAI-SDK-compatible facade over Code Assist v1internal."""

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        default_headers: Optional[Dict[str, str]] = None,
        project_id: str = "",
        **_: Any,
    ):
        # `api_key` here is a dummy — real auth is the OAuth access token
        # fetched on every call via agent.google_oauth.get_valid_access_token().
        # We accept the kwarg for openai.OpenAI interface parity.
        self.api_key = api_key or "google-oauth"
        self.base_url = base_url or MARKER_BASE_URL
        self._default_headers = dict(default_headers or {})
        self._configured_project_id = project_id
        self._project_context: Optional[ProjectContext] = None
        self._project_context_lock = False  # simple single-thread guard
        self.chat = _GeminiChatNamespace(self)
        self.is_closed = False
        self._http = httpx.Client(timeout=httpx.Timeout(connect=15.0, read=600.0, write=30.0, pool=30.0))

    def close(self) -> None:
        self.is_closed = True
        try:
            self._http.close()
        except Exception:
            pass

    # Implement the OpenAI SDK's context-manager-ish closure check
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _ensure_project_context(self, access_token: str, model: str) -> ProjectContext:
        """Lazily resolve and cache the project context for this client."""
        if self._project_context is not None:
            return self._project_context

        env_project = google_oauth.resolve_project_id_from_env()
        creds = google_oauth.load_credentials()
        stored_project = creds.project_id if creds else ""

        # Prefer what's already baked into the creds
        if stored_project:
            self._project_context = ProjectContext(
                project_id=stored_project,
                managed_project_id=creds.managed_project_id if creds else "",
                tier_id="",
                source="stored",
            )
            return self._project_context

        ctx = resolve_project_context(
            access_token,
            configured_project_id=self._configured_project_id,
            env_project_id=env_project,
            user_agent_model=model,
        )
        # Persist discovered project back to the creds file so the next
        # session doesn't re-run the discovery.
        if ctx.project_id or ctx.managed_project_id:
            google_oauth.update_project_ids(
                project_id=ctx.project_id,
                managed_project_id=ctx.managed_project_id,
            )
        self._project_context = ctx
        return ctx

    def _create_chat_completion(
        self,
        *,
        model: str = "gemini-2.5-flash",
        messages: Optional[List[Dict[str, Any]]] = None,
        stream: bool = False,
        tools: Any = None,
        tool_choice: Any = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        top_p: Optional[float] = None,
        stop: Any = None,
        extra_body: Optional[Dict[str, Any]] = None,
        timeout: Any = None,
        **_: Any,
    ) -> Any:
        access_token = google_oauth.get_valid_access_token()
        ctx = self._ensure_project_context(access_token, model)

        thinking_config = None
        if isinstance(extra_body, dict):
            thinking_config = extra_body.get("thinking_config") or extra_body.get("thinkingConfig")

        inner = build_gemini_request(
            messages=messages or [],
            tools=tools,
            tool_choice=tool_choice,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            stop=stop,
            thinking_config=thinking_config,
        )
        wrapped = wrap_code_assist_request(
            project_id=ctx.project_id,
            model=model,
            inner_request=inner,
        )

        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Bearer {access_token}",
            "User-Agent": "hermes-agent (gemini-cli-compat)",
            "X-Goog-Api-Client": "gl-python/hermes",
            "x-activity-request-id": str(uuid.uuid4()),
        }
        headers.update(self._default_headers)

        if stream:
            return self._stream_completion(model=model, wrapped=wrapped, headers=headers)

        url = f"{CODE_ASSIST_ENDPOINT}/v1internal:generateContent"
        response = self._http.post(url, json=wrapped, headers=headers)
        if response.status_code != 200:
            raise _gemini_http_error(response)
        try:
            payload = response.json()
        except ValueError as exc:
            raise CodeAssistError(
                f"Invalid JSON from Code Assist: {exc}",
                code="code_assist_invalid_json",
            ) from exc
        return _translate_gemini_response(payload, model=model)

    def _stream_completion(
        self,
        *,
        model: str,
        wrapped: Dict[str, Any],
        headers: Dict[str, str],
    ) -> Iterator[_GeminiStreamChunk]:
        """Generator that yields OpenAI-shaped streaming chunks."""
        url = f"{CODE_ASSIST_ENDPOINT}/v1internal:streamGenerateContent?alt=sse"
        stream_headers = dict(headers)
        stream_headers["Accept"] = "text/event-stream"

        def _generator() -> Iterator[_GeminiStreamChunk]:
            try:
                with self._http.stream("POST", url, json=wrapped, headers=stream_headers) as response:
                    if response.status_code != 200:
                        # Materialize error body for better diagnostics
                        response.read()
                        raise _gemini_http_error(response)
                    tool_call_counter: List[int] = [0]
                    for event in _iter_sse_events(response):
                        for chunk in _translate_stream_event(event, model, tool_call_counter):
                            yield chunk
            except httpx.HTTPError as exc:
                raise CodeAssistError(
                    f"Streaming request failed: {exc}",
                    code="code_assist_stream_error",
                ) from exc

        return _generator()


def _gemini_http_error(response: httpx.Response) -> CodeAssistError:
    """Translate an httpx response into a CodeAssistError with rich metadata.

    Parses Google's error envelope (``{"error": {"code", "message", "status",
    "details": [...]}}``) so the agent's error classifier can reason about
    the failure — ``status_code`` enables the rate_limit / auth classification
    paths, and ``response`` lets the main loop honor ``Retry-After`` just
    like it does for OpenAI SDK exceptions.

    Also lifts a few recognizable Google conditions into human-readable
    messages so the user sees something better than a 500-char JSON dump:

        MODEL_CAPACITY_EXHAUSTED → "Gemini model capacity exhausted for
            <model>. This is a Google-side throttle..."
        RESOURCE_EXHAUSTED w/o reason → quota-style message
        404 → "Model <name> not found at cloudcode-pa..."
    """
    status = response.status_code

    # Parse the body once, surviving any weird encodings.
    body_text = ""
    body_json: Dict[str, Any] = {}
    try:
        body_text = response.text
    except Exception:
        body_text = ""
    if body_text:
        try:
            parsed = json.loads(body_text)
            if isinstance(parsed, dict):
                body_json = parsed
        except (ValueError, TypeError):
            body_json = {}

    # Dig into Google's error envelope.  Shape is:
    #   {"error": {"code": 429, "message": "...", "status": "RESOURCE_EXHAUSTED",
    #              "details": [{"@type": ".../ErrorInfo", "reason": "MODEL_CAPACITY_EXHAUSTED",
    #                           "metadata": {...}},
    #                          {"@type": ".../RetryInfo", "retryDelay": "30s"}]}}
    err_obj = body_json.get("error") if isinstance(body_json, dict) else None
    if not isinstance(err_obj, dict):
        err_obj = {}
    err_status = str(err_obj.get("status") or "").strip()
    err_message = str(err_obj.get("message") or "").strip()
    _raw_details = err_obj.get("details")
    err_details_list = _raw_details if isinstance(_raw_details, list) else []

    # Extract google.rpc.ErrorInfo reason + metadata.  There may be more
    # than one ErrorInfo (rare), so we pick the first one with a reason.
    error_reason = ""
    error_metadata: Dict[str, Any] = {}
    retry_delay_seconds: Optional[float] = None
    for detail in err_details_list:
        if not isinstance(detail, dict):
            continue
        type_url = str(detail.get("@type") or "")
        if not error_reason and type_url.endswith("/google.rpc.ErrorInfo"):
            reason = detail.get("reason")
            if isinstance(reason, str) and reason:
                error_reason = reason
            md = detail.get("metadata")
            if isinstance(md, dict):
                error_metadata = md
        elif retry_delay_seconds is None and type_url.endswith("/google.rpc.RetryInfo"):
            # retryDelay is a google.protobuf.Duration string like "30s" or "1.5s".
            delay_raw = detail.get("retryDelay")
            if isinstance(delay_raw, str) and delay_raw.endswith("s"):
                try:
                    retry_delay_seconds = float(delay_raw[:-1])
                except ValueError:
                    pass
            elif isinstance(delay_raw, (int, float)):
                retry_delay_seconds = float(delay_raw)

    # Fall back to the Retry-After header if the body didn't include RetryInfo.
    if retry_delay_seconds is None:
        try:
            header_val = response.headers.get("Retry-After") or response.headers.get("retry-after")
        except Exception:
            header_val = None
        if header_val:
            try:
                retry_delay_seconds = float(header_val)
            except (TypeError, ValueError):
                retry_delay_seconds = None

    # Classify the error code.  ``code_assist_rate_limited`` stays the default
    # for 429s; a more specific reason tag helps downstream callers (e.g. tests,
    # logs) without changing the rate_limit classification path.
    code = f"code_assist_http_{status}"
    if status == 401:
        code = "code_assist_unauthorized"
    elif status == 429:
        code = "code_assist_rate_limited"
        if error_reason == "MODEL_CAPACITY_EXHAUSTED":
            code = "code_assist_capacity_exhausted"

    # Build a human-readable message.  Keep the status + a raw-body tail for
    # debugging, but lead with a friendlier summary when we recognize the
    # Google signal.
    model_hint = ""
    if isinstance(error_metadata, dict):
        model_hint = str(error_metadata.get("model") or error_metadata.get("modelId") or "").strip()

    if status == 429 and error_reason == "MODEL_CAPACITY_EXHAUSTED":
        target = model_hint or "this Gemini model"
        message = (
            f"Gemini capacity exhausted for {target} (Google-side throttle, "
            f"not a Hermes issue). Try a different Gemini model or set a "
            f"fallback_providers entry to a non-Gemini provider."
        )
        if retry_delay_seconds is not None:
            message += f" Google suggests retrying in {retry_delay_seconds:g}s."
    elif status == 429 and err_status == "RESOURCE_EXHAUSTED":
        message = (
            f"Gemini quota exhausted ({err_message or 'RESOURCE_EXHAUSTED'}). "
            f"Check /gquota for remaining daily requests."
        )
        if retry_delay_seconds is not None:
            message += f" Retry suggested in {retry_delay_seconds:g}s."
    elif status == 404:
        # Google returns 404 when a model has been retired or renamed.
        target = model_hint or (err_message or "model")
        message = (
            f"Code Assist 404: {target} is not available at "
            f"cloudcode-pa.googleapis.com. It may have been renamed or "
            f"retired. Check hermes_cli/models.py for the current list."
        )
    elif err_message:
        # Generic fallback with the parsed message.
        message = f"Code Assist HTTP {status} ({err_status or 'error'}): {err_message}"
    else:
        # Last-ditch fallback — raw body snippet.
        message = f"Code Assist returned HTTP {status}: {body_text[:500]}"

    return CodeAssistError(
        message,
        code=code,
        status_code=status,
        response=response,
        retry_after=retry_delay_seconds,
        details={
            "status": err_status,
            "reason": error_reason,
            "metadata": error_metadata,
            "message": err_message,
        },
    )
