from __future__ import annotations

import asyncio
import http.cookies
import queue
import sys
import threading
import warnings
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager, contextmanager, suppress
from io import BytesIO
from typing import (
    TYPE_CHECKING,
    Callable,
    Generic,
    Literal,
    Optional,
    TypedDict,
    TypeVar,
    Union,
    cast,
)
from urllib.parse import urlparse

from ..aio import AsyncCurl
from ..const import CurlHttpVersion, CurlInfo, CurlOpt
from ..curl import Curl, CurlError, CurlMime
from ..utils import CurlCffiWarning
from .cookies import Cookies, CookieTypes, CurlMorsel
from .exceptions import RequestException, SessionClosed, code2error
from .headers import Headers, HeaderTypes
from .impersonate import BrowserTypeLiteral, ExtraFingerprints, ExtraFpDict
from .models import STREAM_END, Response
from .utils import not_set, set_curl_options, HttpVersionLiteral
from .websockets import AsyncWebSocket, WebSocket

with suppress(ImportError):
    import gevent

with suppress(ImportError):
    import eventlet.tpool

# Added in 3.13: https://docs.python.org/3/library/typing.html#typing.TypeVar.__default__
if sys.version_info >= (3, 13):
    R = TypeVar("R", bound=Response, default=Response)
else:
    R = TypeVar("R", bound=Response)

if TYPE_CHECKING:
    from typing_extensions import Unpack

    class ProxySpec(TypedDict, total=False):
        all: str
        http: str
        https: str
        ws: str
        wss: str

    class BaseSessionParams(Generic[R], TypedDict, total=False):
        headers: Optional[HeaderTypes]
        cookies: Optional[CookieTypes]
        auth: Optional[tuple[str, str]]
        proxies: Optional[ProxySpec]
        proxy: Optional[str]
        proxy_auth: Optional[tuple[str, str]]
        base_url: Optional[str]
        params: Optional[dict]
        verify: bool
        timeout: Union[float, tuple[float, float]]
        trust_env: bool
        allow_redirects: bool
        max_redirects: int
        impersonate: Optional[BrowserTypeLiteral]
        ja3: Optional[str]
        akamai: Optional[str]
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]]
        default_headers: bool
        default_encoding: Union[str, Callable[[bytes], str]]
        curl_options: Optional[dict]
        curl_infos: Optional[list]
        http_version: Optional[Union[CurlHttpVersion, HttpVersionLiteral]]
        debug: bool
        interface: Optional[str]
        cert: Optional[Union[str, tuple[str, str]]]
        response_class: Optional[type[R]]
        discard_cookies: bool

    class StreamRequestParams(TypedDict, total=False):
        params: Optional[Union[dict, list, tuple]]
        data: Optional[Union[dict[str, str], list[tuple], str, BytesIO, bytes]]
        json: Optional[dict | list]
        headers: Optional[HeaderTypes]
        cookies: Optional[CookieTypes]
        files: Optional[dict]
        auth: Optional[tuple[str, str]]
        timeout: Optional[Union[float, tuple[float, float], object]]
        allow_redirects: Optional[bool]
        max_redirects: Optional[int]
        proxies: Optional[ProxySpec]
        proxy: Optional[str]
        proxy_auth: Optional[tuple[str, str]]
        verify: Optional[bool]
        referer: Optional[str]
        accept_encoding: Optional[str]
        content_callback: Optional[Callable]
        impersonate: Optional[BrowserTypeLiteral]
        ja3: Optional[str]
        akamai: Optional[str]
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]]
        default_headers: Optional[bool]
        default_encoding: Union[str, Callable[[bytes], str]]
        quote: Union[str, Literal[False]]
        http_version: Optional[Union[CurlHttpVersion, HttpVersionLiteral]]
        interface: Optional[str]
        cert: Optional[Union[str, tuple[str, str]]]
        max_recv_speed: int
        multipart: Optional[CurlMime]
        discard_cookies: bool

    class RequestParams(StreamRequestParams, total=False):
        stream: Optional[bool]

else:

    class _Unpack:
        @staticmethod
        def __getitem__(*args, **kwargs):
            pass

    Unpack = _Unpack()

    ProxySpec = dict[str, str]
    BaseSessionParams = TypedDict
    StreamRequestParams, RequestParams = TypedDict, TypedDict

ThreadType = Literal["eventlet", "gevent"]
HttpMethod = Literal[
    "GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "TRACE", "PATCH", "QUERY"
]


def _is_absolute_url(url: str) -> bool:
    """Check if the provided url is an absolute url"""
    parsed_url = urlparse(url)
    return bool(parsed_url.scheme and parsed_url.hostname)


def _peek_queue(q: queue.Queue, default=None):
    try:
        return q.queue[0]
    except IndexError:
        return default


def _peek_aio_queue(q: asyncio.Queue, default=None):
    try:
        return q._queue[0]  # type: ignore
    except IndexError:
        return default


class BaseSession(Generic[R]):
    """Provide common methods for setting curl options and reading info in sessions."""

    def __init__(
        self,
        *,
        headers: Optional[HeaderTypes] = None,
        cookies: Optional[CookieTypes] = None,
        auth: Optional[tuple[str, str]] = None,
        proxies: Optional[ProxySpec] = None,
        proxy: Optional[str] = None,
        proxy_auth: Optional[tuple[str, str]] = None,
        base_url: Optional[str] = None,
        params: Optional[dict] = None,
        verify: bool = True,
        timeout: Union[float, tuple[float, float]] = 30,
        trust_env: bool = True,
        allow_redirects: bool = True,
        max_redirects: int = 30,
        impersonate: Optional[BrowserTypeLiteral] = None,
        ja3: Optional[str] = None,
        akamai: Optional[str] = None,
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]] = None,
        default_headers: bool = True,
        default_encoding: Union[str, Callable[[bytes], str]] = "utf-8",
        curl_options: Optional[dict] = None,
        curl_infos: Optional[list] = None,
        http_version: Optional[Union[CurlHttpVersion, HttpVersionLiteral]] = None,
        debug: bool = False,
        interface: Optional[str] = None,
        cert: Optional[Union[str, tuple[str, str]]] = None,
        response_class: Optional[type[R]] = None,
        discard_cookies: bool = False,
    ):
        self.headers = Headers(headers)
        self._cookies = Cookies(cookies)  # guarded by @property
        self.auth = auth
        self.base_url = base_url
        self.params = params
        self.verify = verify
        self.timeout = timeout
        self.trust_env = trust_env
        self.allow_redirects = allow_redirects
        self.max_redirects = max_redirects
        self.impersonate = impersonate
        self.ja3 = ja3
        self.akamai = akamai
        self.extra_fp = extra_fp
        self.default_headers = default_headers
        self.default_encoding = default_encoding
        self.curl_options = curl_options or {}
        self.curl_infos = curl_infos or []
        self.http_version = http_version
        self.debug = debug
        self.interface = interface
        self.cert = cert

        if response_class is not None and issubclass(response_class, Response) is False:
            raise TypeError(
                "`response_class` must be a subclass of "
                "`curl_cffi.requests.models.Response`, "
                f"not of type `{response_class}`"
            )
        self.response_class = response_class or Response
        self.discard_cookies = discard_cookies

        if proxy and proxies:
            raise TypeError("Cannot specify both 'proxy' and 'proxies'")
        if proxy:
            proxies = {"all": proxy}
        self.proxies: ProxySpec = proxies or {}
        self.proxy_auth = proxy_auth

        if self.base_url and not _is_absolute_url(self.base_url):
            raise ValueError("You need to provide an absolute url for 'base_url'")

        self._closed = False

    def _parse_response(
        self, curl, buffer, header_buffer, default_encoding, discard_cookies
    ) -> R:
        c = curl
        rsp = cast(R, self.response_class(c))
        rsp.url = cast(bytes, c.getinfo(CurlInfo.EFFECTIVE_URL)).decode()
        if buffer:
            rsp.content = buffer.getvalue()
        rsp.http_version = cast(int, c.getinfo(CurlInfo.HTTP_VERSION))
        rsp.status_code = cast(int, c.getinfo(CurlInfo.RESPONSE_CODE))
        rsp.ok = 200 <= rsp.status_code < 400
        header_lines = header_buffer.getvalue().splitlines()

        # TODO: history urls
        header_list: list[bytes] = []
        for header_line in header_lines:
            if not header_line.strip():
                continue
            if header_line.startswith(b"HTTP/"):
                # read header from last response
                rsp.reason = c.get_reason_phrase(header_line).decode()
                # empty header list for new redirected response
                header_list = []
                continue
            if header_line.startswith(b" ") or header_line.startswith(b"\t"):
                header_list[-1] += header_line
                continue
            header_list.append(header_line)
        rsp.headers = Headers(header_list)

        # Response cookies - only from Set-Cookie headers
        rsp.cookies = Cookies()
        set_cookie_headers = rsp.headers.get_list("set-cookie")
        for set_cookie in set_cookie_headers:
            try:
                cookie = http.cookies.SimpleCookie()
                cookie.load(set_cookie)  # type: ignore
                for name, morsel in cookie.items():
                    rsp.cookies.set(
                        name,
                        morsel.value,
                        domain=morsel.get("domain", ""),
                        path=morsel.get("path", "/"),
                        secure=bool(morsel.get("secure")),
                    )
            except Exception:
                continue

        # Session cookies - from full cookie store
        discard_cookies = discard_cookies or self.discard_cookies
        if not discard_cookies:
            morsels = [
                CurlMorsel.from_curl_format(c) for c in c.getinfo(CurlInfo.COOKIELIST)
            ]
            self._cookies.update_cookies_from_curl(morsels)

        rsp.primary_ip = cast(bytes, c.getinfo(CurlInfo.PRIMARY_IP)).decode()
        rsp.primary_port = cast(int, c.getinfo(CurlInfo.PRIMARY_PORT))
        rsp.local_ip = cast(bytes, c.getinfo(CurlInfo.LOCAL_IP)).decode()
        rsp.local_port = cast(int, c.getinfo(CurlInfo.LOCAL_PORT))
        rsp.default_encoding = default_encoding
        rsp.elapsed = cast(float, c.getinfo(CurlInfo.TOTAL_TIME))
        rsp.redirect_count = cast(int, c.getinfo(CurlInfo.REDIRECT_COUNT))
        redirect_url_bytes = cast(bytes, c.getinfo(CurlInfo.REDIRECT_URL))
        try:
            rsp.redirect_url = redirect_url_bytes.decode()
        except UnicodeDecodeError:
            rsp.redirect_url = redirect_url_bytes.decode("latin-1")

        # custom info options
        for info in self.curl_infos:
            rsp.infos[info] = c.getinfo(info)

        return rsp

    def _check_session_closed(self):
        if self._closed:
            raise SessionClosed("Session is closed, cannot send request.")

    @property
    def cookies(self) -> Cookies:
        return self._cookies

    @cookies.setter
    def cookies(self, cookies: CookieTypes) -> None:
        # This ensures that the cookies property is always converted to Cookies.
        self._cookies = Cookies(cookies)


class Session(BaseSession[R]):
    """A request session, cookies and connections will be reused. This object is
    thread-safe, but it's recommended to use a separate session for each thread."""

    def __init__(
        self,
        curl: Optional[Curl] = None,
        thread: Optional[ThreadType] = None,
        use_thread_local_curl: bool = True,
        **kwargs: Unpack[BaseSessionParams[R]],
    ):
        """
        Parameters set in the ``__init__`` method will be overriden by the same
        parameter in request method.

        Args:
            curl: curl object to use in the session. If not provided, a new one will be
                created. Also, a fresh curl object will always be created when accessed
                from another thread.
            thread: thread engine to use for working with other thread implementations.
                choices: eventlet, gevent.
            headers: headers to use in the session.
            cookies: cookies to add in the session.
            auth: HTTP basic auth, a tuple of (username, password), only basic auth is
                supported.
            proxies: dict of proxies to use, prefer to use proxy if they are the same.
                format: ``{"http": proxy_url, "https": proxy_url}``.
            proxy: proxy to use, format: "http://proxy_url".
                Cannot be used with the above parameter.
            proxy_auth: HTTP basic auth for proxy, a tuple of (username, password).
            base_url: absolute url to use as base for relative urls.
            params: query string for the session.
            verify: whether to verify https certs.
            timeout: how many seconds to wait before giving up.
            trust_env: use http_proxy/https_proxy and other environments, default True.
            allow_redirects: whether to allow redirection.
            max_redirects: max redirect counts, default 30, use -1 for unlimited.
            impersonate: which browser version to impersonate in the session.
            ja3: ja3 string to impersonate in the session.
            akamai: akamai string to impersonate in the session.
            extra_fp: extra fingerprints options, in complement to ja3 and akamai str.
            interface: which interface use.
            default_encoding: encoding for decoding response content if charset is not
                found in headers. Defaults to "utf-8". Can be set to a callable for
                automatic detection.
            cert: a tuple of (cert, key) filenames for client cert.
            response_class: A customized subtype of ``Response`` to use.

        Notes:
            This class can be used as a context manager.

        .. code-block:: python

            from curl_cffi.requests import Session

            with Session() as s:
                r = s.get("https://example.com")
        """
        super().__init__(**kwargs)
        self._thread = thread
        self._use_thread_local_curl = use_thread_local_curl
        self._queue = None
        self._executor = None
        if use_thread_local_curl:
            self._local = threading.local()
            if curl:
                self._is_customized_curl = True
                self._local.curl = curl
            else:
                self._is_customized_curl = False
                self._local.curl = Curl(debug=self.debug)
        else:
            self._curl = curl if curl else Curl(debug=self.debug)

    @property
    def curl(self):
        if self._use_thread_local_curl:
            if self._is_customized_curl:
                warnings.warn(
                    "Creating fresh curl handle in different thread.",
                    CurlCffiWarning,
                    stacklevel=2,
                )
            if not getattr(self._local, "curl", None):
                self._local.curl = Curl(debug=self.debug)
            return self._local.curl
        else:
            return self._curl

    @property
    def executor(self):
        if self._executor is None:
            self._executor = ThreadPoolExecutor()
        return self._executor

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def close(self) -> None:
        """Close the session."""
        self._closed = True
        self.curl.close()

    @contextmanager
    def stream(
        self,
        method: HttpMethod,
        url: str,
        **kwargs: Unpack[StreamRequestParams],
    ):
        """Equivalent to ``with request(..., stream=True) as r:``"""
        rsp = self.request(method=method, url=url, **kwargs, stream=True)
        try:
            yield rsp
        finally:
            rsp.close()

    def ws_connect(
        self, url, on_message=None, on_error=None, on_open=None, on_close=None, **kwargs
    ) -> WebSocket:
        """Connects to a websocket url.

        Note: This method is deprecated, use WebSocket instead.

        Args:
            url: the ws url to connect.
            on_message: message callback, ``def on_message(ws, str)``
            on_error: error callback, ``def on_error(ws, error)``
            on_open: open callback, ``def on_open(ws)``
            on_close: close callback, ``def on_close(ws)``

        Other parameters are the same as ``.request``

        Returns:
            a WebSocket instance to communicate with the server.
        """
        self._check_session_closed()

        curl = self.curl.duphandle()
        self.curl.reset()

        ws = WebSocket(
            curl=curl,
            on_message=on_message,
            on_error=on_error,
            on_open=on_open,
            on_close=on_close,
        )

        ws.connect(url, **kwargs)
        return ws

    def upkeep(self) -> int:
        return self.curl.upkeep()

    def request(
        self,
        method: HttpMethod,
        url: str,
        params: Optional[Union[dict, list, tuple]] = None,
        data: Optional[Union[dict[str, str], list[tuple], str, BytesIO, bytes]] = None,
        json: Optional[dict | list] = None,
        headers: Optional[HeaderTypes] = None,
        cookies: Optional[CookieTypes] = None,
        files: Optional[dict] = None,
        auth: Optional[tuple[str, str]] = None,
        timeout: Optional[Union[float, tuple[float, float], object]] = not_set,
        allow_redirects: Optional[bool] = None,
        max_redirects: Optional[int] = None,
        proxies: Optional[ProxySpec] = None,
        proxy: Optional[str] = None,
        proxy_auth: Optional[tuple[str, str]] = None,
        verify: Optional[bool] = None,
        referer: Optional[str] = None,
        accept_encoding: Optional[str] = "gzip, deflate, br",
        content_callback: Optional[Callable] = None,
        impersonate: Optional[BrowserTypeLiteral] = None,
        ja3: Optional[str] = None,
        akamai: Optional[str] = None,
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]] = None,
        default_headers: Optional[bool] = None,
        default_encoding: Union[str, Callable[[bytes], str]] = "utf-8",
        quote: Union[str, Literal[False]] = "",
        http_version: Optional[Union[CurlHttpVersion, HttpVersionLiteral]] = None,
        interface: Optional[str] = None,
        cert: Optional[Union[str, tuple[str, str]]] = None,
        stream: Optional[bool] = None,
        max_recv_speed: int = 0,
        multipart: Optional[CurlMime] = None,
        discard_cookies: bool = False,
    ):
        """Send the request, see ``requests.request`` for details on parameters."""

        self._check_session_closed()

        # clone a new curl instance for streaming response
        if stream:
            c = self.curl.duphandle()
            self.curl.reset()
        else:
            c = self.curl

        req, buffer, header_buffer, q, header_recved, quit_now = set_curl_options(
            c,
            method=method,
            url=url,
            params_list=[self.params, params],
            base_url=self.base_url,
            data=data,
            json=json,
            headers_list=[self.headers, headers],
            cookies_list=[self._cookies, cookies],
            files=files,
            auth=auth or self.auth,
            timeout=self.timeout if timeout is not_set else timeout,
            allow_redirects=self.allow_redirects
            if allow_redirects is None
            else allow_redirects,
            max_redirects=self.max_redirects
            if max_redirects is None
            else max_redirects,
            proxies_list=[self.proxies, proxies],
            proxy=proxy,
            proxy_auth=proxy_auth or self.proxy_auth,
            verify_list=[self.verify, verify],
            referer=referer,
            accept_encoding=accept_encoding,
            content_callback=content_callback,
            impersonate=impersonate or self.impersonate,
            ja3=ja3 or self.ja3,
            akamai=akamai or self.akamai,
            extra_fp=extra_fp or self.extra_fp,
            default_headers=self.default_headers
            if default_headers is None
            else default_headers,
            quote=quote,
            http_version=http_version or self.http_version,
            interface=interface or self.interface,
            stream=stream,
            max_recv_speed=max_recv_speed,
            multipart=multipart,
            cert=cert or self.cert,
            curl_options=self.curl_options,
            queue_class=queue.Queue,
            event_class=threading.Event,
        )

        if stream:
            header_parsed = threading.Event()

            def perform():
                try:
                    c.perform()
                except CurlError as e:
                    rsp = self._parse_response(
                        c, buffer, header_buffer, default_encoding, discard_cookies
                    )
                    rsp.request = req
                    q.put_nowait(RequestException(str(e), e.code, rsp))  # type: ignore
                finally:
                    if not cast(threading.Event, header_recved).is_set():
                        cast(threading.Event, header_recved).set()
                    q.put(STREAM_END)  # type: ignore

            def cleanup(fut):
                header_parsed.wait()
                c.reset()

            stream_task = self.executor.submit(perform)
            stream_task.add_done_callback(cleanup)

            # Wait for the first chunk
            header_recved.wait()  # type: ignore
            rsp = self._parse_response(
                c, buffer, header_buffer, default_encoding, discard_cookies
            )

            header_parsed.set()

            # Raise the exception if something wrong happens when receiving the header.
            first_element = _peek_queue(q)  # type: ignore
            if isinstance(first_element, RequestException):
                c.reset()
                raise first_element

            rsp.request = req
            rsp.stream_task = stream_task
            rsp.quit_now = quit_now
            rsp.queue = q
            return rsp
        else:
            try:
                if self._thread == "eventlet":
                    # see: https://eventlet.net/doc/threading.html
                    eventlet.tpool.execute(c.perform)  # type: ignore
                elif self._thread == "gevent":
                    # see: https://www.gevent.org/api/gevent.threadpool.html
                    gevent.get_hub().threadpool.spawn(c.perform).get()  # type: ignore
                else:
                    c.perform()
            except CurlError as e:
                rsp = self._parse_response(
                    c, buffer, header_buffer, default_encoding, discard_cookies
                )
                rsp.request = req
                error = code2error(e.code, str(e))
                raise error(str(e), e.code, rsp) from e
            else:
                rsp = self._parse_response(
                    c, buffer, header_buffer, default_encoding, discard_cookies
                )
                rsp.request = req
                return rsp
            finally:
                c.reset()

    def head(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="HEAD", url=url, **kwargs)

    def get(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="GET", url=url, **kwargs)

    def post(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="POST", url=url, **kwargs)

    def put(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="PUT", url=url, **kwargs)

    def patch(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="PATCH", url=url, **kwargs)

    def delete(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="DELETE", url=url, **kwargs)

    def options(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="OPTIONS", url=url, **kwargs)

    def trace(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="TRACE", url=url, **kwargs)

    def query(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="QUERY", url=url, **kwargs)


class AsyncSession(BaseSession[R]):
    """An async request session, cookies and connections will be reused."""

    def __init__(
        self,
        *,
        loop=None,
        async_curl: Optional[AsyncCurl] = None,
        max_clients: int = 10,
        **kwargs: Unpack[BaseSessionParams[R]],
    ):
        """
        Parameters set in the ``__init__`` method will be override by the same parameter
        in request method.

        Parameters:
            loop: loop to use, if not provided, the running loop will be used.
            async_curl: [AsyncCurl](/api/curl_cffi#curl_cffi.AsyncCurl) object to use.
            max_clients: maxmium curl handle to use in the session,
                this will affect the concurrency ratio.
            headers: headers to use in the session.
            cookies: cookies to add in the session.
            auth: HTTP basic auth, a tuple of (username, password), only basic auth is
                supported.
            proxies: dict of proxies to use, prefer to use ``proxy`` if they are the
                same. format: ``{"http": proxy_url, "https": proxy_url}``.
            proxy: proxy to use, format: "http://proxy_url".
                Cannot be used with the above parameter.
            proxy_auth: HTTP basic auth for proxy, a tuple of (username, password).
            base_url: absolute url to use for relative urls.
            params: query string for the session.
            verify: whether to verify https certs.
            timeout: how many seconds to wait before giving up.
            trust_env: use http_proxy/https_proxy and other environments, default True.
            allow_redirects: whether to allow redirection.
            max_redirects: max redirect counts, default 30, use -1 for unlimited.
            impersonate: which browser version to impersonate in the session.
            ja3: ja3 string to impersonate in the session.
            akamai: akamai string to impersonate in the session.
            extra_fp: extra fingerprints options, in complement to ja3 and akamai str.
            default_encoding: encoding for decoding response content if charset is not
                found in headers. Defaults to "utf-8". Can be set to a callable for
                automatic detection.
            cert: a tuple of (cert, key) filenames for client cert.
            response_class: A customized subtype of ``Response`` to use.

        Notes:
            This class can be used as a context manager, and it's recommended to use via
            ``async with``.
            However, unlike aiohttp, it is not required to use ``with``.

        .. code-block:: python

            from curl_cffi.requests import AsyncSession

            # recommended.
            async with AsyncSession() as s:
                r = await s.get("https://example.com")

            s = AsyncSession()  # it also works.
        """
        super().__init__(**kwargs)
        self._loop = loop
        self._acurl = async_curl
        self.max_clients = max_clients
        self.init_pool()

    @property
    def loop(self):
        if self._loop is None:
            self._loop = asyncio.get_running_loop()
        return self._loop

    @property
    def acurl(self):
        if self._acurl is None:
            self._acurl = AsyncCurl(loop=self.loop)
        return self._acurl

    def init_pool(self):
        self.pool = asyncio.LifoQueue(self.max_clients)
        while True:
            try:
                self.pool.put_nowait(None)
            except asyncio.QueueFull:
                break

    async def pop_curl(self):
        curl = await self.pool.get()
        if curl is None:
            curl = Curl(debug=self.debug)
        # XXX: This may be related to proxy rotation
        # curl.setopt(CurlOpt.FRESH_CONNECT, 1)
        # curl.setopt(CurlOpt.FORBID_REUSE, 1)
        return curl

    def push_curl(self, curl):
        with suppress(asyncio.QueueFull):
            self.pool.put_nowait(curl)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()
        return None

    async def close(self) -> None:
        """Close the session."""
        await self.acurl.close()
        self._closed = True
        while True:
            try:
                curl = self.pool.get_nowait()
                if curl:
                    curl.close()
            except asyncio.QueueEmpty:
                break

    def release_curl(self, curl):
        curl.clean_after_perform()
        if not self._closed:
            self.acurl.remove_handle(curl)
            curl.reset()
            # curl.setopt(CurlOpt.PIPEWAIT, 1)
            self.push_curl(curl)
        else:
            curl.close()

    @asynccontextmanager
    async def stream(
        self,
        method: HttpMethod,
        url: str,
        **kwargs: Unpack[StreamRequestParams],
    ):
        """Equivalent to ``async with request(..., stream=True) as r:``"""
        rsp = await self.request(method=method, url=url, **kwargs, stream=True)
        try:
            yield rsp
        finally:
            await rsp.aclose()

    async def ws_connect(
        self,
        url: str,
        autoclose: bool = True,
        params: Optional[Union[dict, list, tuple]] = None,
        headers: Optional[HeaderTypes] = None,
        cookies: Optional[CookieTypes] = None,
        auth: Optional[tuple[str, str]] = None,
        timeout: Optional[Union[float, tuple[float, float], object]] = not_set,
        allow_redirects: Optional[bool] = None,
        max_redirects: Optional[int] = None,
        proxies: Optional[ProxySpec] = None,
        proxy: Optional[str] = None,
        proxy_auth: Optional[tuple[str, str]] = None,
        verify: Optional[bool] = None,
        referer: Optional[str] = None,
        accept_encoding: Optional[str] = "gzip, deflate, br",
        impersonate: Optional[BrowserTypeLiteral] = None,
        ja3: Optional[str] = None,
        akamai: Optional[str] = None,
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]] = None,
        default_headers: Optional[bool] = None,
        quote: Union[str, Literal[False]] = "",
        http_version: Optional[Union[CurlHttpVersion, HttpVersionLiteral]] = None,
        interface: Optional[str] = None,
        cert: Optional[Union[str, tuple[str, str]]] = None,
        max_recv_speed: int = 0,
    ) -> AsyncWebSocket:
        """Connects to a WebSocket.

        Args:
            url: url for the requests.
            autoclose: whether to close the WebSocket after receiving a close frame.
            params: query string for the requests.
            headers: headers to send.
            cookies: cookies to use.
            auth: HTTP basic auth, a tuple of (username, password), only basic auth is
                supported.
            timeout: how many seconds to wait before giving up.
            allow_redirects: whether to allow redirection.
            max_redirects: max redirect counts, default 30, use -1 for unlimited.
            proxies: dict of proxies to use, prefer to use ``proxy`` if they are the
                same. format: ``{"http": proxy_url, "https": proxy_url}``.
            proxy: proxy to use, format: "http://user@pass:proxy_url".
                Can't be used with `proxies` parameter.
            proxy_auth: HTTP basic auth for proxy, a tuple of (username, password).
            verify: whether to verify https certs.
            referer: shortcut for setting referer header.
            accept_encoding: shortcut for setting accept-encoding header.
            impersonate: which browser version to impersonate.
            ja3: ja3 string to impersonate.
            akamai: akamai string to impersonate.
            extra_fp: extra fingerprints options, in complement to ja3 and akamai str.
            default_headers: whether to set default browser headers.
            quote: Set characters to be quoted, i.e. percent-encoded. Default safe
                string is ``!#$%&'()*+,/:;=?@[]~``. If set to a sting, the character
                will be removed from the safe string, thus quoted. If set to False, the
                url will be kept as is, without any automatic percent-encoding, you must
                encode the URL yourself.
            curl_options: extra curl options to use.
            http_version: limiting http version, defaults to http2.
            interface: which interface to use.
            cert: a tuple of (cert, key) filenames for client cert.
            max_recv_speed: maximum receive speed, bytes per second.
        """

        self._check_session_closed()

        curl = await self.pop_curl()
        set_curl_options(
            curl=curl,
            method="GET",
            url=url,
            base_url=self.base_url,
            params_list=[self.params, params],
            headers_list=[self.headers, headers],
            cookies_list=[self.cookies, cookies],
            auth=auth or self.auth,
            timeout=self.timeout if timeout is not_set else timeout,
            allow_redirects=self.allow_redirects
            if allow_redirects is None
            else allow_redirects,
            max_redirects=self.max_redirects
            if max_redirects is None
            else max_redirects,
            proxies_list=[self.proxies, proxies],
            proxy=proxy,
            proxy_auth=proxy_auth or self.proxy_auth,
            verify_list=[self.verify, verify],
            referer=referer,
            accept_encoding=accept_encoding,
            impersonate=impersonate or self.impersonate,
            ja3=ja3 or self.ja3,
            akamai=akamai or self.akamai,
            extra_fp=extra_fp or self.extra_fp,
            default_headers=self.default_headers
            if default_headers is None
            else default_headers,
            quote=quote,
            http_version=http_version or self.http_version,
            interface=interface or self.interface,
            max_recv_speed=max_recv_speed,
            cert=cert or self.cert,
            queue_class=asyncio.Queue,
            event_class=asyncio.Event,
        )
        curl.setopt(CurlOpt.CONNECT_ONLY, 2)  # https://curl.se/docs/websocket.html

        await self.loop.run_in_executor(None, curl.perform)
        return AsyncWebSocket(
            cast(AsyncSession[Response], self),
            curl,
            autoclose=autoclose,
        )

    async def request(
        self,
        method: HttpMethod,
        url: str,
        params: Optional[Union[dict, list, tuple]] = None,
        data: Optional[Union[dict[str, str], list[tuple], str, BytesIO, bytes]] = None,
        json: Optional[dict | list] = None,
        headers: Optional[HeaderTypes] = None,
        cookies: Optional[CookieTypes] = None,
        files: Optional[dict] = None,
        auth: Optional[tuple[str, str]] = None,
        timeout: Optional[Union[float, tuple[float, float], object]] = not_set,
        allow_redirects: Optional[bool] = None,
        max_redirects: Optional[int] = None,
        proxies: Optional[ProxySpec] = None,
        proxy: Optional[str] = None,
        proxy_auth: Optional[tuple[str, str]] = None,
        verify: Optional[bool] = None,
        referer: Optional[str] = None,
        accept_encoding: Optional[str] = "gzip, deflate, br",
        content_callback: Optional[Callable] = None,
        impersonate: Optional[BrowserTypeLiteral] = None,
        ja3: Optional[str] = None,
        akamai: Optional[str] = None,
        extra_fp: Optional[Union[ExtraFingerprints, ExtraFpDict]] = None,
        default_headers: Optional[bool] = None,
        default_encoding: Union[str, Callable[[bytes], str]] = "utf-8",
        quote: Union[str, Literal[False]] = "",
        http_version: Optional[Union[CurlHttpVersion, HttpVersionLiteral]] = None,
        interface: Optional[str] = None,
        cert: Optional[Union[str, tuple[str, str]]] = None,
        stream: Optional[bool] = None,
        max_recv_speed: int = 0,
        multipart: Optional[CurlMime] = None,
        discard_cookies: bool = False,
    ):
        """Send the request, see ``curl_cffi.requests.request`` for details on args."""

        self._check_session_closed()

        curl = await self.pop_curl()
        req, buffer, header_buffer, q, header_recved, quit_now = set_curl_options(
            curl=curl,
            method=method,
            url=url,
            params_list=[self.params, params],
            base_url=self.base_url,
            data=data,
            json=json,
            headers_list=[self.headers, headers],
            cookies_list=[self.cookies, cookies],
            files=files,
            auth=auth or self.auth,
            timeout=self.timeout if timeout is not_set else timeout,
            allow_redirects=self.allow_redirects
            if allow_redirects is None
            else allow_redirects,
            max_redirects=self.max_redirects
            if max_redirects is None
            else max_redirects,
            proxies_list=[self.proxies, proxies],
            proxy=proxy,
            proxy_auth=proxy_auth or self.proxy_auth,
            verify_list=[self.verify, verify],
            referer=referer,
            accept_encoding=accept_encoding,
            content_callback=content_callback,
            impersonate=impersonate or self.impersonate,
            ja3=ja3 or self.ja3,
            akamai=akamai or self.akamai,
            extra_fp=extra_fp or self.extra_fp,
            default_headers=self.default_headers
            if default_headers is None
            else default_headers,
            quote=quote,
            http_version=http_version or self.http_version,
            interface=interface or self.interface,
            stream=stream,
            max_recv_speed=max_recv_speed,
            multipart=multipart,
            cert=cert or self.cert,
            curl_options=self.curl_options,
            queue_class=asyncio.Queue,
            event_class=asyncio.Event,
        )
        if stream:
            task = self.acurl.add_handle(curl)

            async def perform():
                try:
                    await task
                except CurlError as e:
                    rsp = self._parse_response(
                        curl, buffer, header_buffer, default_encoding, discard_cookies
                    )
                    rsp.request = req
                    q.put_nowait(RequestException(str(e), e.code, rsp))  # type: ignore
                finally:
                    if not cast(asyncio.Event, header_recved).is_set():
                        cast(asyncio.Event, header_recved).set()
                    await q.put(STREAM_END)  # type: ignore

            def cleanup(fut):
                self.release_curl(curl)

            stream_task = asyncio.create_task(perform())
            stream_task.add_done_callback(cleanup)

            await cast(asyncio.Event, header_recved).wait()

            # Unlike threads, coroutines does not use preemptive scheduling.
            # For asyncio, there is no need for a header_parsed event, the
            # _parse_response will execute in the foreground, no background tasks
            # running.
            rsp = self._parse_response(
                curl, buffer, header_buffer, default_encoding, discard_cookies
            )

            first_element = _peek_aio_queue(q)  # type: ignore
            if isinstance(first_element, RequestException):
                self.release_curl(curl)
                raise first_element

            rsp.request = req
            rsp.astream_task = stream_task
            rsp.quit_now = quit_now
            rsp.queue = q
            return rsp
        else:
            try:
                task = self.acurl.add_handle(curl)
                await task
            except CurlError as e:
                rsp = self._parse_response(
                    curl, buffer, header_buffer, default_encoding, discard_cookies
                )
                rsp.request = req
                error = code2error(e.code, str(e))
                raise error(str(e), e.code, rsp) from e
            else:
                rsp = self._parse_response(
                    curl, buffer, header_buffer, default_encoding, discard_cookies
                )
                rsp.request = req
                return rsp
            finally:
                self.release_curl(curl)

    def head(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="HEAD", url=url, **kwargs)

    def get(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="GET", url=url, **kwargs)

    def post(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="POST", url=url, **kwargs)

    def put(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="PUT", url=url, **kwargs)

    def patch(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="PATCH", url=url, **kwargs)

    def delete(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="DELETE", url=url, **kwargs)

    def options(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="OPTIONS", url=url, **kwargs)

    def trace(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="TRACE", url=url, **kwargs)

    def query(self, url: str, **kwargs: Unpack[RequestParams]):
        return self.request(method="QUERY", url=url, **kwargs)
