from __future__ import annotations

from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine, Iterable, Iterator
from os import PathLike
from typing import Any, TypeVar

from vercel._internal.blob.core import (
    AsyncBlobOpsClient,
    SyncBlobOpsClient,
    normalize_delete_urls,
)
from vercel._internal.iter_coroutine import iter_coroutine
from vercel.blob.types import (
    Access,
    CreateFolderResult as CreateFolderResultType,
    GetBlobResult as GetBlobResultType,
    HeadBlobResult as HeadBlobResultType,
    ListBlobItem,
    ListBlobResult as ListBlobResultType,
    PutBlobResult as PutBlobResultType,
    UploadProgressEvent,
)

_T = TypeVar("_T")


def _run_sync_blob_operation(
    operation: Callable[[SyncBlobOpsClient], Coroutine[None, None, _T]],
    *,
    token: str | None = None,
) -> _T:
    with SyncBlobOpsClient(token=token) as client:
        # Keep exactly one sync bridge at the wrapper boundary.
        return iter_coroutine(operation(client))


def put(
    path: str,
    body: Any,
    *,
    access: Access = "public",
    content_type: str | None = None,
    add_random_suffix: bool = False,
    overwrite: bool = False,
    cache_control_max_age: int | None = None,
    token: str | None = None,
    multipart: bool = False,
    on_upload_progress: Callable[[UploadProgressEvent], None] | None = None,
) -> PutBlobResultType:
    result, _ = _run_sync_blob_operation(
        lambda client: client.put_blob(
            path,
            body,
            access=access,
            content_type=content_type,
            add_random_suffix=add_random_suffix,
            overwrite=overwrite,
            cache_control_max_age=cache_control_max_age,
            token=token,
            multipart=multipart,
            on_upload_progress=on_upload_progress,
        ),
        token=token,
    )
    return result


async def put_async(
    path: str,
    body: Any,
    *,
    access: Access = "public",
    content_type: str | None = None,
    add_random_suffix: bool = False,
    overwrite: bool = False,
    cache_control_max_age: int | None = None,
    token: str | None = None,
    multipart: bool = False,
    on_upload_progress: (
        Callable[[UploadProgressEvent], None]
        | Callable[[UploadProgressEvent], Awaitable[None]]
        | None
    ) = None,
) -> PutBlobResultType:
    async with AsyncBlobOpsClient(token=token) as client:
        result, _ = await client.put_blob(
            path,
            body,
            access=access,
            content_type=content_type,
            add_random_suffix=add_random_suffix,
            overwrite=overwrite,
            cache_control_max_age=cache_control_max_age,
            token=token,
            multipart=multipart,
            on_upload_progress=on_upload_progress,
        )
    return result


def delete(
    url_or_path: str | Iterable[str],
    *,
    token: str | None = None,
) -> None:
    normalized_urls = normalize_delete_urls(url_or_path)
    _run_sync_blob_operation(
        lambda client: client.delete_blob(
            normalized_urls,
        ),
        token=token,
    )


async def delete_async(
    url_or_path: str | Iterable[str],
    *,
    token: str | None = None,
) -> None:
    normalized_urls = normalize_delete_urls(url_or_path)
    async with AsyncBlobOpsClient(token=token) as client:
        await client.delete_blob(
            normalized_urls,
        )


def head(url_or_path: str, *, token: str | None = None) -> HeadBlobResultType:
    return _run_sync_blob_operation(
        lambda client: client.head_blob(
            url_or_path,
        ),
        token=token,
    )


async def head_async(url_or_path: str, *, token: str | None = None) -> HeadBlobResultType:
    async with AsyncBlobOpsClient(token=token) as client:
        return await client.head_blob(
            url_or_path,
        )


def get(
    url_or_path: str,
    *,
    access: Access = "public",
    token: str | None = None,
    timeout: float | None = None,
    use_cache: bool = True,
    if_none_match: str | None = None,
) -> GetBlobResultType:
    return _run_sync_blob_operation(
        lambda client: client.get_blob(
            url_or_path,
            access=access,
            timeout=timeout,
            use_cache=use_cache,
            if_none_match=if_none_match,
            default_timeout=30.0,
        ),
        token=token,
    )


async def get_async(
    url_or_path: str,
    *,
    access: Access = "public",
    token: str | None = None,
    timeout: float | None = None,
    use_cache: bool = True,
    if_none_match: str | None = None,
) -> GetBlobResultType:
    async with AsyncBlobOpsClient(token=token) as client:
        return await client.get_blob(
            url_or_path,
            access=access,
            timeout=timeout,
            use_cache=use_cache,
            if_none_match=if_none_match,
            default_timeout=30.0,
        )


def list_objects(
    *,
    limit: int | None = None,
    prefix: str | None = None,
    cursor: str | None = None,
    mode: str | None = None,
    token: str | None = None,
) -> ListBlobResultType:
    with SyncBlobOpsClient(token=token) as client:
        return client.list_objects(
            limit=limit,
            prefix=prefix,
            cursor=cursor,
            mode=mode,
        )


async def list_objects_async(
    *,
    limit: int | None = None,
    prefix: str | None = None,
    cursor: str | None = None,
    mode: str | None = None,
    token: str | None = None,
) -> ListBlobResultType:
    async with AsyncBlobOpsClient(token=token) as client:
        return await client.list_objects(
            limit=limit,
            prefix=prefix,
            cursor=cursor,
            mode=mode,
        )


def iter_objects(
    *,
    prefix: str | None = None,
    mode: str | None = None,
    token: str | None = None,
    batch_size: int | None = None,
    limit: int | None = None,
    cursor: str | None = None,
) -> Iterator[ListBlobItem]:
    with SyncBlobOpsClient(token=token) as client:
        yield from client.iter_objects(
            prefix=prefix,
            mode=mode,
            batch_size=batch_size,
            limit=limit,
            cursor=cursor,
        )


async def iter_objects_async(
    *,
    prefix: str | None = None,
    mode: str | None = None,
    token: str | None = None,
    batch_size: int | None = None,
    limit: int | None = None,
    cursor: str | None = None,
) -> AsyncIterator[ListBlobItem]:
    async with AsyncBlobOpsClient(token=token) as client:
        async for item in client.iter_objects(
            prefix=prefix,
            mode=mode,
            batch_size=batch_size,
            limit=limit,
            cursor=cursor,
        ):
            yield item


def copy(
    src_path: str,
    dst_path: str,
    *,
    access: Access = "public",
    content_type: str | None = None,
    add_random_suffix: bool = False,
    overwrite: bool = False,
    cache_control_max_age: int | None = None,
    token: str | None = None,
) -> PutBlobResultType:
    return _run_sync_blob_operation(
        lambda client: client.copy_blob(
            src_path,
            dst_path,
            access=access,
            content_type=content_type,
            add_random_suffix=add_random_suffix,
            overwrite=overwrite,
            cache_control_max_age=cache_control_max_age,
        ),
        token=token,
    )


async def copy_async(
    src_path: str,
    dst_path: str,
    *,
    access: Access = "public",
    content_type: str | None = None,
    add_random_suffix: bool = False,
    overwrite: bool = False,
    cache_control_max_age: int | None = None,
    token: str | None = None,
) -> PutBlobResultType:
    async with AsyncBlobOpsClient(token=token) as client:
        return await client.copy_blob(
            src_path,
            dst_path,
            access=access,
            content_type=content_type,
            add_random_suffix=add_random_suffix,
            overwrite=overwrite,
            cache_control_max_age=cache_control_max_age,
        )


def create_folder(
    path: str,
    *,
    token: str | None = None,
    overwrite: bool = False,
) -> CreateFolderResultType:
    return _run_sync_blob_operation(
        lambda client: client.create_folder(
            path,
            overwrite=overwrite,
        ),
        token=token,
    )


async def create_folder_async(
    path: str,
    *,
    token: str | None = None,
    overwrite: bool = False,
) -> CreateFolderResultType:
    async with AsyncBlobOpsClient(token=token) as client:
        return await client.create_folder(
            path,
            overwrite=overwrite,
        )


def upload_file(
    local_path: str | PathLike,
    path: str,
    *,
    access: Access = "public",
    content_type: str | None = None,
    add_random_suffix: bool = False,
    overwrite: bool = False,
    cache_control_max_age: int | None = None,
    token: str | None = None,
    multipart: bool = False,
    on_upload_progress: Callable[[UploadProgressEvent], None] | None = None,
) -> PutBlobResultType:
    return _run_sync_blob_operation(
        lambda client: client.upload_file(
            local_path,
            path,
            access=access,
            content_type=content_type,
            add_random_suffix=add_random_suffix,
            overwrite=overwrite,
            cache_control_max_age=cache_control_max_age,
            token=token,
            multipart=multipart,
            on_upload_progress=on_upload_progress,
            missing_local_path_error="src_path is required",
        ),
        token=token,
    )


async def upload_file_async(
    local_path: str | PathLike,
    path: str,
    *,
    access: Access = "public",
    content_type: str | None = None,
    add_random_suffix: bool = False,
    overwrite: bool = False,
    cache_control_max_age: int | None = None,
    token: str | None = None,
    multipart: bool = False,
    on_upload_progress: (
        Callable[[UploadProgressEvent], None]
        | Callable[[UploadProgressEvent], Awaitable[None]]
        | None
    ) = None,
) -> PutBlobResultType:
    async with AsyncBlobOpsClient(token=token) as client:
        return await client.upload_file(
            local_path,
            path,
            access=access,
            content_type=content_type,
            add_random_suffix=add_random_suffix,
            overwrite=overwrite,
            cache_control_max_age=cache_control_max_age,
            token=token,
            multipart=multipart,
            on_upload_progress=on_upload_progress,
            missing_local_path_error="local_path is required",
        )


def download_file(
    url_or_path: str,
    local_path: str | PathLike,
    *,
    access: Access = "public",
    token: str | None = None,
    timeout: float | None = None,
    overwrite: bool = True,
    create_parents: bool = True,
    progress: Callable[[int, int | None], None] | None = None,
) -> str:
    return _run_sync_blob_operation(
        lambda client: client.download_file(
            url_or_path,
            local_path,
            access=access,
            timeout=timeout,
            overwrite=overwrite,
            create_parents=create_parents,
            progress=progress,
        ),
        token=token,
    )


async def download_file_async(
    url_or_path: str,
    local_path: str | PathLike,
    *,
    access: Access = "public",
    token: str | None = None,
    timeout: float | None = None,
    overwrite: bool = True,
    create_parents: bool = True,
    progress: (
        Callable[[int, int | None], None] | Callable[[int, int | None], Awaitable[None]] | None
    ) = None,
) -> str:
    async with AsyncBlobOpsClient(token=token) as client:
        return await client.download_file(
            url_or_path,
            local_path,
            access=access,
            timeout=timeout,
            overwrite=overwrite,
            create_parents=create_parents,
            progress=progress,
        )
