Source code for _delb.builder

# Copyright (C) 2018-'25  Frank Sachsenheim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


# this module is intentionally not duplicated in the `delb` package

from __future__ import annotations

import warnings
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, overload, Final, Optional

from _delb.exceptions import ParsingValidityError
from _delb.names import XML_NAMESPACE
from _delb.nodes import (
    _reduce_whitespace_between_siblings,
    Attribute,
    CommentNode,
    ProcessingInstructionNode,
    _TagDefinition,
    TagNode,
    TextNode,
)
from _delb.parser import Event, EventType, ParserOptions, TagEventData, parse_events
from _delb.typing import XMLNodeType

if TYPE_CHECKING:
    from collections.abc import Iterator

    from _delb.typing import (
        AttributeAccessor,
        NodeSource,
        InputStream,
        TagNodeType,
    )


# defining tag node templates


@overload
def tag(local_name: str): ...


@overload
def tag(local_name: str, attributes: Mapping[AttributeAccessor, str]): ...


@overload
def tag(local_name: str, child: NodeSource): ...


@overload
def tag(local_name: str, children: Sequence[NodeSource]): ...


@overload
def tag(
    local_name: str, attributes: Mapping[AttributeAccessor, str], child: NodeSource
): ...


@overload
def tag(
    local_name: str,
    attributes: Mapping[AttributeAccessor, str],
    children: Sequence[NodeSource],
): ...


[docs] def tag(*args): # noqa: C901 """ This function can be used for in-place creation (or call it templating if you want to) of :class:`delb.nodes.TagNode` instances as: - ``node`` argument to methods that add nodes to a tree - items in the ``children`` argument of :class:`delb.nodes.TagNode` The first argument to the function is always the local name of the tag node. Optionally, the second argument can be a :term:`mapping` that specifies attributes for that node. The optional last argument is either a single object that will be appended as child node or a sequence of such, these objects can be node instances of any type, strings (for derived :class:`delb.nodes.TextNode` instances) or other definitions from this function (for derived :class:`delb.nodes.TagNode` instances). The actual nodes that are constructed always inherit the namespace of the context node they are created in. That also applies to attribute names that are given as single string. >>> root = TagNode('root', children=[ ... tag("head", {"lvl": "1"}, "Hello!"), ... tag("items", ( ... tag("item1"), ... tag("item2"), ... ) ... ) ... ]) >>> str(root) '<root><head lvl="1">Hello!</head><items><item1/><item2/></items></root>' >>> root.append_children(tag("addendum")) # doctest: +ELLIPSIS (<TagNode("{}addendum", {}, /*/*[3]) [0x...]>,) >>> str(root)[-26:] '</items><addendum/></root>' """ def prepare_attributes( attributes: Mapping[AttributeAccessor, str], ) -> dict[AttributeAccessor, str]: result: dict[AttributeAccessor, str] = {} for key, value in attributes.items(): match value: case Attribute(): result[(value.namespace, value.local_name)] = value.value case str() | tuple(): result[key] = value case _: raise TypeError return result if len(args) == 1: return _TagDefinition(local_name=args[0]) if len(args) == 2: second_arg = args[1] if isinstance(second_arg, Mapping): return _TagDefinition( local_name=args[0], attributes=prepare_attributes(second_arg) ) if isinstance(second_arg, (str, XMLNodeType, _TagDefinition)): return _TagDefinition(local_name=args[0], children=(second_arg,)) if isinstance(second_arg, Sequence): if not all( isinstance(x, (str, XMLNodeType, _TagDefinition)) for x in second_arg ): raise TypeError( "Either node instances, strings or objects from :func:`delb.tag` " "must be provided as children argument." ) return _TagDefinition(local_name=args[0], children=tuple(second_arg)) if len(args) == 3: third_arg = args[2] if isinstance(third_arg, (str, XMLNodeType, _TagDefinition)): return _TagDefinition( local_name=args[0], attributes=prepare_attributes(args[1]), children=(third_arg,), ) if isinstance(third_arg, Sequence): if not all( isinstance(x, (str, XMLNodeType, _TagDefinition)) for x in third_arg ): raise TypeError( "Either node instances, strings or objects from :func:`delb.tag` " "must be provided as children argument." ) return _TagDefinition( local_name=args[0], attributes=prepare_attributes(args[1]), children=tuple(third_arg), ) raise ValueError("Unrecognized arguments.")
# deserializing streams class TreeBuilder: __slots__ = ( "children", "event_feed", "options", "preserve_space", "started_tags", "xml_ids", ) def __init__( self, data: InputStream, parse_options: ParserOptions, base_url: str | None ): self.children: Final[list[list[XMLNodeType]]] = [] self.event_feed: Final = parse_events(data, parse_options, base_url) self.options: Final = parse_options self.preserve_space: Final[list[bool]] = [] self.started_tags: Final[list[TagNodeType]] = [] self.xml_ids: Final[set[str]] = set() def __iter__(self): return self def __next__(self) -> XMLNodeType: while True: event = next(self.event_feed) result = self.handle_event(event) if result is not None: return result def handle_event(self, event: Event) -> XMLNodeType | None: result: XMLNodeType | None type_, data = event match type_: case EventType.Comment: assert isinstance(data, str) result = CommentNode(data) case EventType.ProcessingInstruction: assert isinstance(data, tuple) result = ProcessingInstructionNode(data[0], data[1]) case EventType.TagStart: assert isinstance(data, TagEventData) self.handle_tag_start(data) result = None case EventType.TagEnd: assert data is None or isinstance(data, TagEventData) result = self.handle_tag_end(data) case EventType.Text: assert isinstance(data, str) result = TextNode(data) if result is not None: if self.started_tags: self.children[-1].append(result) else: assert not self.children assert not self.started_tags assert not self.preserve_space self.xml_ids.clear() return result return None def handle_tag_end(self, data: TagEventData | None) -> TagNodeType: result = self.started_tags.pop() if __debug__ and data: assert result.namespace == data.namespace assert result.local_name == data.local_name if data.attributes: assert result.attributes == data.attributes children = self.children.pop() if (not self.preserve_space.pop()) and children: _reduce_whitespace_between_siblings(children) for node in children: result._child_nodes.append(node) return result def handle_tag_start(self, data): assert isinstance(data.namespace, str) if (id_ := data.attributes.get((XML_NAMESPACE, "id"))) is not None: if id_ in self.xml_ids: raise ParsingValidityError(f"Redundantly used xml:id: {id_}") else: self.xml_ids.add(id_) self.children.append([]) self.started_tags.append( TagNode( local_name=data.local_name, attributes=data.attributes, namespace=data.namespace, ) ) if not self.options.reduce_whitespace: self.preserve_space.append(True) return xml_space = data.attributes.get((XML_NAMESPACE, "space")) if xml_space not in (None, "default", "preserve"): warnings.warn( "Encountered and ignoring an invalid `xml:space` attribute: " + xml_space, category=UserWarning, ) if not self.preserve_space: self.preserve_space.append(xml_space == "preserve") return match xml_space: case None: # most common self.preserve_space.append(self.preserve_space[-1]) case "default": self.preserve_space.append(False) case "preserve": self.preserve_space.append(True) case _: # invalid values self.preserve_space.append(self.preserve_space[-1])
[docs] def parse_nodes( data: InputStream, options: Optional[ParserOptions] = None, *, base_url: str | None = None, ) -> Iterator[XMLNodeType]: """Parses the provided input data to a sequence of nodes.""" if options is None: options = ParserOptions() yield from TreeBuilder(data, options, base_url)
[docs] def parse_tree( data: InputStream, options: Optional[ParserOptions] = None, *, base_url: str | None = None, ) -> XMLNodeType: """Parses the provided input to a single node.""" result = None for node in parse_nodes(data, options, base_url=base_url): if result is not None: raise ParsingValidityError("The stream contained extra contents.", node) result = node assert result is not None return result
__all__ = (parse_nodes.__name__, parse_tree.__name__, tag.__name__)