import io
from abc import ABC, abstractmethod
from contextlib import suppress
from distutils.util import strtobool
from typing import BinaryIO, Optional
from xml.etree import ElementTree
from . import mapper, utils
from .stream import BufferedStream
from .tag import Tag
from .tree import BuilderBase, TlvError, TlvNode, Tree, TreeBuilder
class ParserError(TlvError):
    """Error raised when TLV input cannot be parsed.

    The message handed to ``TlvError`` is ``message`` prefixed with
    ``"error while parsing the "``. Each optional context value that is not
    ``None`` is converted to a string and forwarded to ``TlvError`` as a
    keyword argument, in the order ``tag``, ``offset``, ``element``.

    Args:
        message: Name of the item that was being parsed (e.g. ``"tag"``).
        tag: Tag being processed; forwarded as ``repr(tag)``.
        offset: Position supplied by the caller; forwarded as ``str(offset)``.
        element: XML element being processed; forwarded as its serialized
            text, stripped of trailing whitespace and wrapped in single
            quotes.
    """

    def __init__(
        self,
        message: str,
        *,
        tag: Tag = None,
        offset: int = None,
        element: ElementTree.Element = None,
    ):
        context = {}

        if tag is not None:
            context["tag"] = repr(tag)

        if offset is not None:
            context["offset"] = str(offset)

        if element is not None:
            # Serialize to text (not bytes) so it can be embedded in a message.
            xml_text = ElementTree.tostring(element, encoding="unicode")
            context["element"] = f"'{xml_text.rstrip()}'"

        full_message = f"error while parsing the {message}"
        super().__init__(full_message, **context)
class InsufficientDataError(ParserError):
    """Parser error signalling that more input is needed.

    Appends ``", insufficient data"`` to ``message`` and forwards every
    keyword argument to ``ParserError`` unchanged.
    """

    def __init__(self, message: str, **kwargs):
        detail = f"{message}, insufficient data"
        super().__init__(detail, **kwargs)
class ParserBase(ABC):
    """Abstract base class for incremental parsers that drive a builder.

    Args:
        target: Builder that receives the parse events. When omitted
            (``None``), a new ``TreeBuilder`` is created.
    """

    def __init__(self, *, target: Optional[BuilderBase] = None):
        # Compare against None explicitly: a caller-supplied builder that
        # happens to be falsy (for example one defining ``__len__`` or
        # ``__bool__``) must not be silently replaced by a default builder.
        self.target = TreeBuilder() if target is None else target

    @abstractmethod
    def close(self) -> Tree:
        """Close the parser and return the tree."""

    @abstractmethod
    def feed(self, data: bytes) -> None:
        """Feed data to the parser."""
class BinaryParser(ParserBase):
    """Incremental parser for binary TLV data.

    Bytes passed to ``feed`` are pushed onto a ``BufferedStream``. Every node
    that can be parsed is reported to ``target`` through its ``start``,
    ``data`` and ``end`` methods.
    """

    def __init__(self, *, target: BuilderBase = None):
        super().__init__(target=target)
        self.stream = BufferedStream()

    def close(self) -> Tree:
        """Close the parser and return the tree."""
        try:
            # Parse whatever is still buffered; any error propagates to the
            # caller, but the stream is closed either way.
            while not self.stream.is_eof():
                self._parse()
        finally:
            self.stream.close()
        return self.target.close()

    def feed(self, data: bytes) -> None:
        """Feed data to the parser."""
        self.stream.push(data)
        try:
            while not self.stream.is_eof():
                # NOTE(review): rollback() presumably rewinds the stream when
                # the named exception escapes, so the incomplete node is read
                # again later -- confirm against BufferedStream.
                with self.stream.rollback(InsufficientDataError):
                    self._parse()
        except InsufficientDataError:
            # Not an error while feeding: stop and wait for more data.
            pass

    def _parse(self) -> TlvNode:
        """Parse one node and return the result of ``target.end``.

        Raises:
            InsufficientDataError: If ``stream.size()`` is smaller than the
                decoded length.
        """
        tag = self._parse_tag()
        length = self._parse_length(tag)

        if self.stream.size() < length:
            what = "constructed value" if tag.is_constructed() else "value"
            raise InsufficientDataError(what, tag=tag, offset=self.stream.tell())

        self.target.start(tag)
        if tag.is_constructed():
            self._parse_children(length)
        else:
            payload = self._parse_value(tag, length)
            self.target.data(payload)
        return self.target.end(tag)

    def _parse_tag(self) -> Tag:
        """Read the tag bytes from the stream and build a ``Tag``.

        Raises:
            InsufficientDataError: If the stream is exhausted mid-tag.
            ParserError: If a ``ValueError`` is raised while building the tag.
        """
        try:
            first = next(self.stream)
            raw = bytearray([first])
            if first & 0x1F == 0x1F:
                # All five low bits set: further tag bytes follow, and each
                # byte with its high bit set announces yet another one.
                while True:
                    raw.append(next(self.stream))
                    if raw[-1] & 0x80 != 0x80:
                        break
            tag = Tag(raw)
        except StopIteration as exc:
            raise InsufficientDataError("tag", offset=self.stream.tell()) from exc
        except ValueError as exc:
            raise ParserError("tag", offset=self.stream.tell()) from exc
        return tag

    def _parse_length(self, tag: Tag) -> int:
        """Read the length field that follows ``tag``.

        Raises:
            NotImplementedError: If the first length byte is ``0x80``.
            InsufficientDataError: If the stream is exhausted mid-length.
            ParserError: If a ``ValueError`` is raised while decoding.
        """
        try:
            length = next(self.stream)
            if length == 0x80:
                raise NotImplementedError("indefinite length form is not supported")
            if length & 0x80 == 0x80:
                # Long form: the low seven bits give the number of length
                # bytes that follow, most significant byte first.
                count = length & 0x7F
                # Keep the list comprehension: inside a generator expression
                # StopIteration would surface as RuntimeError instead.
                length_bytes = bytes([next(self.stream) for _ in range(count)])
                length = int.from_bytes(length_bytes, "big")
        except StopIteration as exc:
            raise InsufficientDataError(
                "length", tag=tag, offset=self.stream.tell()
            ) from exc
        except ValueError as exc:
            raise ParserError("length", tag=tag, offset=self.stream.tell()) from exc
        return length

    def _parse_value(self, tag: Tag, length: int) -> bytes:
        """Read ``length`` bytes from the stream as the value of ``tag``.

        Raises:
            InsufficientDataError: If the stream is exhausted mid-value.
            ParserError: If a ``ValueError`` is raised while collecting bytes.
        """
        try:
            # List comprehension on purpose, so StopIteration reaches the
            # handler below rather than becoming RuntimeError.
            payload = bytes([next(self.stream) for _ in range(length)])
        except StopIteration as exc:
            raise InsufficientDataError(
                "value", tag=tag, offset=self.stream.tell()
            ) from exc
        except ValueError as exc:
            raise ParserError("value", tag=tag, offset=self.stream.tell()) from exc
        return payload

    def _parse_children(self, length: int) -> None:
        """Parse nodes until the stream position has advanced by ``length``."""
        stop = self.stream.tell() + length
        while self.stream.tell() < stop:
            self._parse()
class XmlParser(ParserBase):
    """Incremental parser for the XML representation of a TLV tree.

    Input is handed to an ``ElementTree.XMLPullParser`` configured for
    ``start`` and ``end`` events. Each event is translated into ``start``,
    ``data`` and ``end`` calls on ``target``. Elements whose capitalized name
    is ``Tlv`` are skipped.
    """

    def __init__(self, *, target: BuilderBase = None):
        super().__init__(target=target)
        self.parser = ElementTree.XMLPullParser(["start", "end"])

    def close(self) -> Tree:
        """Close the parser and return the tree."""
        self.parser.close()
        # Events may still be queued after close() (the underlying parser is
        # allowed to defer processing of fed data), so drain them before
        # finishing the tree; otherwise trailing nodes would be lost.
        self._drain_events()
        return self.target.close()

    def feed(self, data: bytes) -> None:
        """Feed data to the parser."""
        self.parser.feed(data)
        self._drain_events()

    def _drain_events(self) -> None:
        """Process every event the pull parser currently has queued."""
        for event in self.parser.read_events():
            self._parse(event)

    def _parse(self, event: (str, ElementTree.Element)) -> Optional[TlvNode]:
        """Translate one pull-parser event into builder calls.

        Args:
            event: ``(event_type, element)`` pair from ``read_events()``.

        Returns:
            The result of ``target.end`` for an ``end`` event, otherwise
            ``None``.

        Raises:
            ParserError: If the tag or the value cannot be decoded.
        """
        event_type, element = event
        if element.tag.capitalize() == "Tlv":
            return None
        # NOTE(review): mapper.decode() is defined elsewhere and runs for both
        # the start and the end event of an element -- confirm it is safe to
        # apply twice.
        mapper.decode(element)
        tag = self._parse_tag(element)
        node = None
        if event_type == "start":
            self.target.start(tag)
        else:
            # Internal invariant, not input validation: the pull parser was
            # created with only "start" and "end" events enabled.
            assert event_type == "end"
            if not tag.is_constructed():
                data = self._parse_value(tag, element)
                self.target.data(data)
            node = self.target.end(tag)
        return node

    @staticmethod
    def _parse_tag(element: ElementTree.Element) -> Tag:
        """Build the ``Tag`` described by the ``Tag`` attribute of ``element``.

        A constructed tag must come from an ``Element`` element and any other
        tag from a ``Primitive`` element. The optional ``Force`` attribute is
        only honoured on ``Element`` elements.

        Raises:
            ParserError: If the attribute is missing or invalid, or if the
                element name does not match the kind of tag.
        """
        try:
            tag_attr = element.get("Tag")
            if not tag_attr:
                raise ValueError("missing 'Tag' attribute")
            force_constructed = False
            if element.tag == "Element":
                # NOTE(review): strtobool comes from distutils, which was
                # removed from the standard library in Python 3.12; the
                # file-level import has to be replaced before this module can
                # be imported there.
                # noinspection PyTypeChecker
                force_constructed = bool(strtobool(element.get("Force", "False")))
            tag = Tag.from_hex(tag_attr, force_constructed=force_constructed)
            # Validate explicitly instead of with assert, which is stripped
            # when Python runs with -O and would let mismatches through.
            expected = "Element" if tag.is_constructed() else "Primitive"
            if element.tag != expected:
                raise ValueError(
                    f"expected '{expected}' element, got '{element.tag}'"
                )
        except Exception as e:
            raise ParserError("tag", element=element) from e
        return tag

    @staticmethod
    def _parse_value(tag: Tag, element: ElementTree.Element) -> bytes:
        """Decode the text of ``element`` into the value bytes for ``tag``.

        The ``Type`` attribute selects the decoding: ``Hex`` (also used when
        the attribute is absent or empty) or ``ASCII``. An element without
        text yields empty bytes.

        Raises:
            ParserError: If ``Type`` has another value or decoding fails.
        """
        try:
            value = bytes()
            if element.text:
                value_type = element.get("Type")
                if value_type == "Hex" or not value_type:
                    value = utils.xml_text2hex(element)
                elif value_type == "ASCII":
                    value = bytes(element.text, "iso8859_15")
                else:
                    raise ValueError(
                        f"invalid 'Type' attribute value: {value_type}"
                    )
        except Exception as e:
            raise ParserError("value", tag=tag, element=element) from e
        return value
def parse(fp: BinaryIO, parser: ParserBase) -> Tree:
    """Parse data read from the file-like object fp and return the tree.

    ``fp`` is read in chunks of 8192 bytes. Each chunk is fed to ``parser``
    until a read returns nothing, after which the result of
    ``parser.close()`` is returned.
    """
    chunk = fp.read(8192)
    while chunk:
        parser.feed(chunk)
        chunk = fp.read(8192)
    return parser.close()
def parse_bytes(data: bytes, parser: ParserBase) -> Tree:
    """Parse data and return the tree.

    Wraps ``data`` in an in-memory binary stream and delegates to ``parse``.
    """
    buffer = io.BytesIO(data)
    return parse(buffer, parser)