# Source code for ipfshttpclient.encoding
"""Classes for encoding and decoding datastreams into object values"""
import abc
import codecs
import typing as ty
import json
from . import exceptions
from . import utils
if ty.TYPE_CHECKING:
import typing_extensions as ty_ext
else:
from . import utils as ty_ext
T = ty.TypeVar("T")


def empty_gen() -> ty.Generator[T, None, None]:
	"""An always-exhausted generator: it terminates without yielding anything"""
	# A bare `return` before the (never-reached) `yield` makes this function a
	# generator in the interpreter's eyes while producing zero values.
	return
	yield ty.cast(T, None)  # type: ignore[unreachable]  # pragma: no cover
class Encoding(ty.Generic[T], metaclass=abc.ABCMeta):
	"""Abstract base defining the parser/encoder interface
	
	Concrete subclasses set a ``name`` identifying the wire format and
	implement :meth:`parse_partial` and :meth:`encode`.
	"""
	#name: str
	
	# Whether this encoding represents a raw byte stream rather than a
	# sequence of discrete object values
	is_stream = False  # type: bool
	
	@abc.abstractmethod
	def parse_partial(self, raw: bytes) -> ty.Generator[T, ty.Any, ty.Any]:
		"""Parses the given data chunk, yielding each complete data set that
		can be assembled from the input received so far
		
		Raises
		------
		~ipfshttpclient.exceptions.DecodingError
		
		Parameters
		----------
		raw
			Data to be parsed
		"""
	
	def parse_finalize(self) -> ty.Generator[T, ty.Any, ty.Any]:
		"""Concludes parsing, yielding any data sets still held in buffers
		
		The default implementation buffers nothing and hence yields nothing.
		
		Raises
		------
		~ipfshttpclient.exceptions.DecodingError
		"""
		return empty_gen()
	
	@abc.abstractmethod
	def encode(self, obj: T) -> bytes:
		"""Serializes the given Python object to a bytes string
		
		Raises
		------
		~ipfshttpclient.exceptions.EncodingError
		
		Parameters
		----------
		obj
			Object to be encoded
		"""
class Dummy(Encoding[bytes]):
	"""Pass-through parser/encoder that leaves all data unmodified"""
	name = "none"
	is_stream = True
	
	def parse_partial(self, raw: bytes) -> ty.Generator[bytes, ty.Any, ty.Any]:
		"""Yields the incoming chunk exactly as received
		
		Parameters
		----------
		raw
			Any kind of data
		"""
		yield raw
	
	def encode(self, obj: bytes) -> bytes:
		"""Returns the given value unchanged
		
		Parameters
		----------
		obj
			Any Python object
		"""
		return obj
class Json(Encoding[utils.json_value_t]):
	"""JSON parser/encoder that handles concatenated JSON
	
	Accepts a stream of bytes containing any number of JSON documents
	separated by arbitrary whitespace/newlines and yields each decoded
	document as it becomes complete.
	"""
	name = 'json'
	
	def __init__(self) -> None:
		# Pending input split into lines; an entry of `None` marks a line
		# that has been fully consumed by the parser
		self._buffer = []  # type: ty.List[ty.Optional[str]]
		# Incremental UTF-8 decoder: the network may split a multi-byte
		# character across chunks, so decoding must be resumable
		self._decoder1 = codecs.getincrementaldecoder('utf-8')()
		# Plain JSON decoder applied to the re-assembled text
		self._decoder2 = json.JSONDecoder()
		# Most recent JSON parse error; kept until `parse_finalize` can
		# decide whether more input would have fixed it
		self._lasterror = None  # type: ty.Optional[ValueError]
	
	@ty.no_type_check  # It works just fine and I don't want to rewrite it just
	                   # because mypy doesn't understand…  # noqa: E114, E116
	def parse_partial(self, data: bytes) -> ty.Generator[utils.json_value_t, ty.Any, ty.Any]:
		"""Incrementally decodes JSON data sets into Python objects.
		
		Raises
		------
		~ipfshttpclient.exceptions.DecodingError
		
		Parameters
		----------
		data
			Next chunk of raw bytes to feed into the decoder
		"""
		try:
			# Python requires all JSON data to text strings
			lines = self._decoder1.decode(data, False).split("\n")
			
			# Add first input line to last buffer line, if applicable, to
			# handle cases where the JSON string has been chopped in half
			# at the network level due to streaming
			if len(self._buffer) > 0 and self._buffer[-1] is not None:
				self._buffer[-1] += lines[0]
				self._buffer.extend(lines[1:])
			else:
				self._buffer.extend(lines)
		except UnicodeDecodeError as error:
			raise exceptions.DecodingError('json', error) from error
		
		# Process data buffer
		index = 0
		try:
			# Process each line as separate buffer
			#PERF: This way the `.lstrip()` call becomes almost always a NOP
			#      even if it does return a different string it will only
			#      have to allocate a new buffer for the currently processed
			#      line.
			while index < len(self._buffer):
				while self._buffer[index]:
					# Make sure buffer does not start with whitespace
					#PERF: `.lstrip()` does not reallocate if the string does
					#      not actually start with whitespace.
					self._buffer[index] = self._buffer[index].lstrip()
					
					# Handle case where the remainder of the line contained
					# only whitespace
					if not self._buffer[index]:
						self._buffer[index] = None
						continue
					
					# Try decoding the partial data buffer and return results
					# from this
					#
					# Use `pragma: no branch` as the final loop iteration will always
					# raise if parsing didn't work out, rather then falling through
					# to the `yield obj` line.
					#
					# NOTE: `data` is deliberately rebound here, shadowing the
					#       method parameter (already consumed above).
					data = self._buffer[index]
					for index2 in range(index, len(self._buffer)):  # pragma: no branch
						# If decoding doesn't succeed with the currently
						# selected buffer (very unlikely with our current
						# class of input data) then retry with appending
						# any other pending pieces of input data
						# This will happen with JSON data that contains
						# arbitrary new-lines: "{1:\n2,\n3:4}"
						if index2 > index:
							data += "\n" + self._buffer[index2]
						
						try:
							(obj, offset) = self._decoder2.raw_decode(data)
						except ValueError:
							# Treat error as fatal if we have already added
							# the final buffer to the input
							if (index2 + 1) == len(self._buffer):
								raise
						else:
							index = index2
							break
					
					# Decoding succeeded – yield result and shorten buffer
					yield obj
					if offset < len(self._buffer[index]):
						self._buffer[index] = self._buffer[index][offset:]
					else:
						self._buffer[index] = None
				index += 1
		except ValueError as error:
			# It is unfortunately not possible to reliably detect whether
			# parsing ended because of an error *within* the JSON string, or
			# an unexpected *end* of the JSON string.
			# We therefor have to assume that any error that occurs here
			# *might* be related to the JSON parser hitting EOF and therefor
			# have to postpone error reporting until `parse_finalize` is
			# called.
			self._lasterror = error
		finally:
			# Remove all processed buffers
			del self._buffer[0:index]
	
	def parse_finalize(self) -> ty.Generator[utils.json_value_t, ty.Any, ty.Any]:
		"""Raises errors for incomplete buffered data that could not be parsed
		because the end of the input data has been reached.
		
		Raises
		------
		~ipfshttpclient.exceptions.DecodingError
		"""
		try:
			try:
				# Raise exception for remaining bytes in bytes decoder
				# (i.e. a multi-byte UTF-8 sequence that was never completed)
				self._decoder1.decode(b'', True)
			except UnicodeDecodeError as error:
				raise exceptions.DecodingError('json', error) from error
			
			# Late raise errors that looked like they could have been fixed if
			# the caller had provided more data
			if self._buffer and self._lasterror:
				raise exceptions.DecodingError('json', self._lasterror) from self._lasterror
		finally:
			# Reset state
			self._buffer = []
			self._lasterror = None
			self._decoder1.reset()
		
		return empty_gen()
	
	def encode(self, obj: utils.json_value_t) -> bytes:
		"""Returns ``obj`` serialized as JSON formatted bytes
		
		Raises
		------
		~ipfshttpclient.exceptions.EncodingError
		
		Parameters
		----------
		obj
			JSON serializable Python object
		"""
		try:
			# Compact separators + non-ASCII passthrough, then encode as UTF-8
			result = json.dumps(obj, sort_keys=True, indent=None,
			                    separators=(',', ':'), ensure_ascii=False)
			return result.encode("utf-8")
		except (UnicodeEncodeError, TypeError) as error:
			raise exceptions.EncodingError('json', error) from error
# encodings supported by the IPFS api (default is JSON)
__encodings = {
	encoding.name: encoding
	for encoding in (Dummy, Json)
}  # type: ty.Dict[str, ty.Type[Encoding[ty.Any]]]
@ty.overload
def get_encoding(name: ty_ext.Literal["none"]) -> Dummy:
	...

@ty.overload  # noqa: E302
def get_encoding(name: ty_ext.Literal["json"]) -> Json:
	...

def get_encoding(name: str) -> Encoding[ty.Any]:  # noqa: E302
	"""Returns a fresh parser/encoder instance for the given encoding name
	
	Raises
	------
	~ipfshttpclient.exceptions.EncoderMissingError
	
	Parameters
	----------
	name
		Encoding name. Supported options:
		
		 * ``"none"``
		 * ``"json"``
	"""
	# Lookup is case-insensitive; unknown names map to a dedicated error
	encoding_class = __encodings.get(name.lower())
	if encoding_class is None:
		raise exceptions.EncoderMissingError(name)
	return encoding_class()