Source code for ipfshttpclient.http_requests

"""HTTP client for API requests based on good old requests library

This exists mainly for Python 3.5 compatibility.
"""

import math
import http.client
import os
import typing as ty
import urllib.parse

import urllib3.exceptions  # type: ignore[import]

from . import encoding
from . import exceptions
from .http_common import (
	ClientSyncBase, multiaddr_to_url_data,
	
	addr_t, auth_t, cookies_t, headers_t, params_t, reqdata_sync_t, timeout_t,
	Closable,
)

PATCH_REQUESTS = (os.environ.get("PY_IPFS_HTTP_CLIENT_PATCH_REQUESTS", "yes").lower()
                  not in ("false", "no"))
if PATCH_REQUESTS:
	from . import requests_wrapper as requests
elif not ty.TYPE_CHECKING:  # pragma: no cover (always enabled in production)
	import requests


def map_args_to_requests(
		*,
		auth: auth_t = None,
		cookies: cookies_t = None,
		headers: headers_t = None,
		params: params_t = None,
		timeout: timeout_t = None
) -> ty.Dict[str, ty.Any]:
	kwargs = {}  # type: ty.Dict[str, ty.Any]
	
	if auth is not None:
		kwargs["auth"] = auth
	
	if cookies is not None:
		kwargs["cookies"] = cookies
	
	if headers is not None:
		kwargs["headers"] = headers
	
	if timeout is not None:
		if isinstance(timeout, tuple):
			timeout_ = (
				timeout[0] if timeout[0] < math.inf else None,
				timeout[1] if timeout[1] < math.inf else None,
			)  # type: ty.Union[ty.Optional[float], ty.Tuple[ty.Optional[float], ty.Optional[float]]]
		else:
			timeout_ = timeout if timeout < math.inf else None
		kwargs["timeout"] = timeout_
	
	if params is not None:
		kwargs["params"] = {}
		for name, value in params:
			if name not in kwargs["params"]:
				kwargs["params"][name] = value
			elif not isinstance(kwargs["params"][name], list):
				kwargs["params"][name] = [kwargs["params"][name], value]
			else:
				kwargs["params"][name].append(value)
	
	return kwargs


[docs]class ClientSync(ClientSyncBase[requests.Session]): # type: ignore[name-defined] __slots__ = ("_base_url", "_default_timeout", "_request_proxies", "_session_props") #_base_url: str #_default_timeout: timeout_t #_request_proxies: ty.Optional[ty.Dict[str, str]] #_session_props: ty.Dict[str, ty.Any] def _init(self, addr: addr_t, base: str, *, # type: ignore[no-any-unimported] auth: auth_t, cookies: cookies_t, headers: headers_t, params: params_t, timeout: timeout_t) -> None: self._base_url, uds_path, family, host_numeric = multiaddr_to_url_data(addr, base) self._session_props = map_args_to_requests( auth=auth, cookies=cookies, headers=headers, params=params, ) self._default_timeout = timeout if PATCH_REQUESTS: # pragma: no branch (always enabled in production) self._session_props["family"] = family # Ensure that no proxy lookups are done for the UDS pseudo-hostname # # I'm well aware of the `.proxies` attribute of the session object: As it turns out, # setting *that* attribute will *not* bypass system proxy resolution – only the # per-request keyword-argument can do *that*…! self._request_proxies = None # type: ty.Optional[ty.Dict[str, str]] if uds_path: self._request_proxies = { "no_proxy": urllib.parse.quote(uds_path, safe=""), } def _make_session(self) -> requests.Session: # type: ignore[name-defined] session = requests.Session() # type: ignore[attr-defined] try: for name, value in self._session_props.items(): setattr(session, name, value) return session # It is very unlikely that this would ever error, but if it does try our # best to prevent a leak except: # pragma: no cover session.close() raise def _do_raise_for_status(self, response: requests.Request) -> None: # type: ignore[name-defined] try: response.raise_for_status() except requests.exceptions.HTTPError as error: # type: ignore[attr-defined] content = [] try: decoder = encoding.get_encoding("json") for chunk in response.iter_content(chunk_size=None): content += list(decoder.parse_partial(chunk)) content += list(decoder.parse_finalize()) except exceptions.DecodingError: pass # If we have decoded an error response from the server, # use that as the exception message; otherwise, just pass # the exception on to the caller. if len(content) == 1 \ and isinstance(content[0], dict) \ and "Message" in content[0]: msg = content[0]["Message"] raise exceptions.ErrorResponse(msg, error) from error else: raise exceptions.StatusError(error) from error def _request( self, method: str, path: str, params: ty.Sequence[ty.Tuple[str, str]], *, auth: auth_t, data: reqdata_sync_t, headers: headers_t, timeout: timeout_t, chunk_size: ty.Optional[int] ) -> ty.Tuple[ty.List[Closable], ty.Generator[bytes, ty.Any, ty.Any]]: # Ensure path is relative so that it is resolved relative to the base while path.startswith("/"): path = path[1:] url = urllib.parse.urljoin(self._base_url, path) try: # Determine session object to use closables, session = self._access_session() # Do HTTP request (synchronously) and map exceptions try: res = session.request( method=method, url=url, **map_args_to_requests( params=params, auth=auth, headers=headers, timeout=(timeout if timeout is not None else self._default_timeout), ), proxies=self._request_proxies, data=data, stream=True, ) closables.insert(0, res) except (requests.ConnectTimeout, requests.Timeout) as error: # type: ignore[attr-defined] raise exceptions.TimeoutError(error) from error except requests.ConnectionError as error: # type: ignore[attr-defined] # Report protocol violations separately # # This used to happen because requests wouldn't catch # `http.client.HTTPException` at all, now we recreate # this behaviour manually if we detect it. if isinstance(error.args[0], urllib3.exceptions.ProtocolError): raise exceptions.ProtocolError(error.args[0]) from error.args[0] raise exceptions.ConnectionError(error) from error # Looks like the following error doesn't happen anymore with modern requests? except http.client.HTTPException as error: # pragma: no cover raise exceptions.ProtocolError(error) from error # Raise exception for response status # (optionally incorporating the response message, if available) self._do_raise_for_status(res) return closables, res.iter_content(chunk_size=chunk_size) except: for closable in closables: closable.close() raise