Source code for _delb.nodes

# Copyright (C) 2018-'25  Frank Sachsenheim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

import gc
import warnings
from abc import abstractmethod, ABC
from collections import deque
from collections.abc import Iterable, Iterator, Mapping, MutableMapping, Sequence
from contextlib import contextmanager
from copy import copy, deepcopy
from io import StringIO, TextIOWrapper
from itertools import chain
from sys import getrefcount
from typing import (
    TYPE_CHECKING,
    cast,
    overload,
    Any,
    AnyStr,
    ClassVar as ClassWar,
    Final,
    Literal,
    NamedTuple,
    Optional,
)

from lxml import etree

from _delb.exceptions import AmbiguousTreeError, InvalidCodePath, InvalidOperation
from _delb.names import (
    GLOBAL_PREFIXES,
    XML_NAMESPACE,
    deconstruct_clark_notation,
    Namespaces,
)
from _delb.parser import ParserOptions
from _delb.utils import (
    _StringMixin,
    _crunch_whitespace,
    last,
    _random_unused_prefix,
    traverse_bf_ltr_ttb,
)
from _delb.xpath import (
    LegacyXPathExpression,
    QueryResults,
    _css_to_xpath,
)
from _delb.xpath import evaluate as evaluate_xpath, parse as parse_xpath
from _delb.xpath.ast import _DocumentNode, NameMatchTest, XPathExpression

if TYPE_CHECKING:
    from typing import TextIO

    from delb import Document
    from _delb.typing import (
        AttributeAccessor,
        Filter,
        NamespaceDeclarations,
        NodeSource,
    )


# shortcuts


Comment = etree.Comment
_Element = etree._Element
PI = etree.PI
QName = etree.QName


# constants


CTRL_CHAR_ENTITY_NAME_MAPPING: Final = (
    ("&", "amp"),
    (">", "gt"),
    ("<", "lt"),
    ('"', "quot"),
)
CCE_TABLE_FOR_ATTRIBUTES: Final = str.maketrans(
    {ord(k): f"&{v};" for k, v in CTRL_CHAR_ENTITY_NAME_MAPPING}
)
CCE_TABLE_FOR_TEXT: Final = str.maketrans(
    {ord(k): f"&{v};" for k, v in CTRL_CHAR_ENTITY_NAME_MAPPING if k != '"'}
)

DETACHED, DATA, TAIL, APPENDED = 0, 1, 2, 3


ATTRIBUTE_ACCESSOR_MSG: Final = (
    "An attribute name must be provided as string (either a local name or a "
    "universal in Clark notation) or as namespace and local name packed in a tuple."
)


# wrapper cache


class _WrapperCache:
    __slots__ = ("locks", "wrappers")

    def __init__(self):
        self.wrappers = {}
        self.locks = 0
        gc.callbacks.append(self.__gc_callback__)

    def __call__(self, element: _Element) -> _ElementWrappingNode:
        result = self.wrappers.get(element)
        if result is None:
            tag = element.tag
            if tag is Comment:
                result = CommentNode(element)
            elif tag is PI:
                result = ProcessingInstructionNode(element)
            else:
                result = TagNode(element)
            self.wrappers[element] = result
            # turn on when running tests, turn off when using a debugger:
            # assert getrefcount(result) == 3, getrefcount(result)  # noqa: E800
        return result

    def __enter__(self):
        self.locks += 1
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.locks -= 1
        return False

    def __gc_callback__(self, phase: str, info: dict):  # noqa: C901
        """
        This is the verbose implementation, please verify changes with this one:

        .. code-block:

            def __gc_callback__(self, phase: str, info: dict):
                if phase != "stop" or self.locks:
                    return

                for element, node in tuple(self.wrappers.items()):
                    node_user_references = (
                        getrefcount(node)
                        - 1  # `getrefcount`
                        - 1  # `self.wrappers`
                        - 1  # the iterated tuple
                        - 1  # `node`
                        - isinstance(node, TagNode)  # node.attributes.__node
                    )

                    # if referenced as a document's root node that itself isn't
                    # referenced:
                    if getattr(node, "__document__", None) is not None:
                        document_user_references = (
                            getrefcount(node.__document__)
                            - 1  # `getrefcount`
                            - 1  # `document.root.__document__
                            - 1  # `document.head_nodes._document`
                            - 1  # `document.tail_nodes._document`
                        )
                        assert document_user_references >= 0
                        if document_user_references == 0:
                            node_user_references -= 1

                    assert node_user_references >= 0
                    if node_user_references > 0:
                        continue

                    if isinstance(node, TagNode):
                        data_node = node._data_node
                        tail_node = node._tail_node
                        if self._appendees_are_referenced(
                            data_node
                        ) or self._appendees_are_referenced(tail_node):
                            continue
                        data_node._merge_appended_text_nodes()
                    elif isinstance(node, _ElementWrappingNode):
                        tail_node = node._tail_node
                        if self._appendees_are_referenced(tail_node):
                            continue
                    else:
                        raise RuntimeError

                    tail_node._merge_appended_text_nodes()
                    self.wrappers.pop(element)

            def _appendees_are_referenced(self, node: TextNode) -> bool:
                appendees = []
                current = node._appended_text_node
                while current is not None:
                    appendees.append(current)
                    current = current._appended_text_node

                if not appendees:
                    return False

                # the last item in a sequence of text nodes isn't pointed to as a
                # `_bound_to`:
                user_references = (
                    getrefcount(appendees.pop())
                    - 1  # `getrefcount`
                    - 1  # the predecessor's `_appended_text_node`
                )

                assert user_references >= 0
                if user_references:
                    return True

                for refcount in (getrefcount(text_node) for text_node in appendees):
                    user_references = (
                        refcount
                        - 1  # `getrefcount`
                        - 1  # the predecessor's `_appended_text_node`
                        - 1  # the subsequent's `_bound_to`
                        - 1  # `appendees`
                        - 1  # `text_node`
                    )
                    assert user_references >= 0
                    if user_references:
                        return True

                return False
        """
        if phase != "stop" or self.locks:
            return

        for element, node in tuple(self.wrappers.items()):
            if getrefcount(node) > (
                4
                + isinstance(node, TagNode)
                + (
                    getattr(node, "__document__", None) is not None
                    and getrefcount(node.__document__) == 4
                )
            ):
                continue

            skip = False
            tail_node = node._tail_node

            if isinstance(node, TagNode):
                # data node is checked first, assuming that appended text nodes tend
                # to be used in the depths of a tree
                data_node = node._data_node
                current = data_node._appended_text_node
                while current is not None:
                    _next = current._appended_text_node
                    if getrefcount(current) > 3 + (_next is not None):
                        skip = True
                        break
                    current = _next
                if skip:
                    continue

                current = tail_node._appended_text_node
                while current is not None:
                    _next = current._appended_text_node
                    if getrefcount(current) > 3 + (_next is not None):
                        skip = True
                        break
                    current = _next
                if skip:
                    continue

                data_node._merge_appended_text_nodes()

            else:  # isinstance(node, _ElementWrappingNode)
                current = tail_node._appended_text_node
                while current is not None:
                    _next = current._appended_text_node
                    if getrefcount(current) > 3 + (_next is not None):
                        skip = True
                        break
                    current = _next
                if skip:
                    continue

            tail_node._merge_appended_text_nodes()
            self.wrappers.pop(element)


_wrapper_cache = _WrapperCache()


# functions


[docs] def new_comment_node(content: str) -> CommentNode: """ Creates a new :class:`CommentNode`. :param content: The comment's content a.k.a. text. :return: The newly created comment node. """ CommentNode._validate_content(content) result = _wrapper_cache(etree.Comment(content)) assert isinstance(result, CommentNode) return result
[docs] def new_processing_instruction_node( target: str, content: str ) -> ProcessingInstructionNode: """ Creates a new :class:`ProcessingInstructionNode`. :param target: The processing instruction's target name. :param content: The processing instruction's text. :return: The newly created processing instruction node. """ ProcessingInstructionNode._validate_target_value(target) result = _wrapper_cache(etree.PI(target, content)) assert isinstance(result, ProcessingInstructionNode) return result
[docs] def new_tag_node( local_name: str, attributes: Optional[dict[AttributeAccessor, str] | TagAttributes] = None, namespace: Optional[str] = None, children: Iterable[NodeSource] = (), ) -> TagNode: """ Creates a new :class:`TagNode` instance outside any context. It is preferable to use the method ``new_tag_node`` on instances of documents and nodes where the namespace is inherited. :param local_name: The tag name. :param attributes: Optional attributes that are assigned to the new node. :param namespace: An optional tag namespace. :param children: An optional iterable of objects that will be appended as child nodes. This can be existing nodes, strings that will be inserted as text nodes and in-place definitions of :class:`TagNode` instances from :func:`tag`. The latter will be assigned to the same namespace. :return: The newly created tag node. """ result = _wrapper_cache( etree.Element(QName(namespace or None, local_name).text), ) assert isinstance(result, TagNode) if attributes is not None: for name, value in attributes.items(): result.attributes[name] = value for child in children: if isinstance(child, (str, NodeBase, _TagDefinition)): result.append_children(child) else: raise TypeError( "Either node instances, strings or objects from :func:`delb.tag` must " "be provided as children argument." ) return result
# abstract tag definitions
[docs] class _TagDefinition(NamedTuple): """ Instances of this class describe tag nodes that are constructed from the context they are used in (commonly additions to a tree) and the properties that this description holds. For the sake of slick code they are not instantiated directly, but with the :func:`delb.tag` function. """ local_name: str attributes: Optional[dict[AttributeAccessor, str]] = None children: tuple[NodeSource, ...] = ()
@overload def tag(local_name: str): # pragma: no cover ... @overload def tag( local_name: str, attributes: Mapping[AttributeAccessor, str] ): # pragma: no cover ... @overload def tag(local_name: str, child: NodeSource): # pragma: no cover ... @overload def tag(local_name: str, children: Sequence[NodeSource]): # pragma: no cover ... @overload def tag( local_name: str, attributes: Mapping[AttributeAccessor, str], child: NodeSource ): # pragma: no cover ... @overload def tag( local_name: str, attributes: Mapping[AttributeAccessor, str], children: Sequence[NodeSource], ): # pragma: no cover ...
[docs] def tag(*args): # noqa: C901 """ This function can be used for in-place creation (or call it templating if you want to) of :class:`TagNode` instances as: - ``node`` argument to methods that add nodes to a tree - items in the ``children`` argument of :func:`new_tag_node` The first argument to the function is always the local name of the tag node. Optionally, the second argument can be a :term:`mapping` that specifies attributes for that node. The optional last argument is either a single object that will be appended as child node or a sequence of such, these objects can be node instances of any type, strings (for derived :class:`TextNode` instances) or other definitions from this function (for derived :class:`TagNode` instances). The actual nodes that are constructed always inherit the namespace of the context node they are created in. >>> root = new_tag_node('root', children=[ ... tag("head", {"lvl": "1"}, "Hello!"), ... tag("items", ( ... tag("item1"), ... tag("item2"), ... ) ... ) ... ]) >>> str(root) '<root><head lvl="1">Hello!</head><items><item1/><item2/></items></root>' >>> root.append_children(tag("addendum")) # doctest: +ELLIPSIS (<TagNode("addendum", {}, /*/*[3]) [0x...]>,) >>> str(root)[-26:] '</items><addendum/></root>' """ def prepare_attributes( attributes: Mapping[AttributeAccessor, str] ) -> dict[str | tuple[str, str], str]: result: dict[str | tuple[str, str], str] = {} for key, value in attributes.items(): if isinstance(value, Attribute): result[value._qualified_name] = value.value elif isinstance(key, slice): result[(key.start, key.stop)] = value elif isinstance(key, (str, tuple)): result[key] = value else: raise TypeError return result if len(args) == 1: return _TagDefinition(local_name=args[0]) if len(args) == 2: second_arg = args[1] if isinstance(second_arg, Mapping): return _TagDefinition( local_name=args[0], attributes=prepare_attributes(second_arg) ) if isinstance(second_arg, (str, NodeBase, _TagDefinition)): return _TagDefinition(local_name=args[0], children=(second_arg,)) if isinstance(second_arg, Sequence): if not all( isinstance(x, (str, NodeBase, _TagDefinition)) for x in second_arg ): raise TypeError( "Either node instances, strings or objects from :func:`delb.tag` " "must be provided as children argument." ) return _TagDefinition(local_name=args[0], children=tuple(second_arg)) if len(args) == 3: third_arg = args[2] if isinstance(third_arg, (str, NodeBase, _TagDefinition)): return _TagDefinition( local_name=args[0], attributes=prepare_attributes(args[1]), children=(third_arg,), ) if isinstance(third_arg, Sequence): if not all( isinstance(x, (str, NodeBase, _TagDefinition)) for x in third_arg ): raise TypeError( "Either node instances, strings or objects from :func:`delb.tag` " "must be provided as children argument." ) return _TagDefinition( local_name=args[0], attributes=prepare_attributes(args[1]), children=tuple(third_arg), ) raise ValueError
# default filters def _is_tag_or_text_node(node: NodeBase) -> bool: return isinstance(node, (TagNode, TextNode)) default_filters: deque[tuple[Filter, ...]] = deque() default_filters.append((_is_tag_or_text_node,))
[docs] @contextmanager def altered_default_filters(*filter: Filter, extend: bool = False): """ This function can be either used as as :term:`context manager` or :term:`decorator` to define a set of :obj:`default_filters` for the encapsuled code block or callable. :param filter: The filters to set or append. :param extend: Extends the currently active filters with the given ones instead of replacing them. These are then applied in all operations that allow node filtering, like :meth:`TagNode.next_node`. Mind that they also affect a node's index property and indexed access to child nodes. >>> root = Document( ... '<root xmlns="foo"><a/><!--x--><b/><!--y--><c/></root>' ... ).root >>> with altered_default_filters(is_comment_node): ... print([x.content for x in root.iterate_children()]) ['x', 'y'] As the default filters shadow comments and processing instructions by default, use no argument to unset this in order to access all type of nodes. """ global default_filters if extend: default_filters.append(default_filters[-1] + filter) else: default_filters.append(filter) try: yield finally: default_filters.pop()
# attributes
[docs] class Attribute(_StringMixin): """ Attribute objects represent :term:`tag node`'s attributes. See the :meth:`delb.TagNode.attributes` documentation for capabilities. """ __slots__ = ("_attributes", "_detached_value", "_qualified_name") def __init__(self, attributes: TagAttributes, qualified_name: tuple[str, str]): self._attributes: TagAttributes | None = attributes self._detached_value: str | None = None self._qualified_name = qualified_name def __repr__(self): return ( f'<{self.__class__.__name__}({self.universal_name}="{self.value}")' f" [{hex(id(self))}]>" ) def _set_new_key(self, namespace: str, name: str): if self._qualified_name == (namespace, name): return attributes = self._attributes current = self._namespace, self.local_name assert attributes is not None attributes[(namespace, name)] = self.value self._qualified_name = (namespace, name) del attributes[current] self._attributes = attributes @property def local_name(self) -> str: """The attribute's local name.""" return self._qualified_name[1] @local_name.setter def local_name(self, name: str): self._set_new_key(self._namespace, name) @property def namespace(self) -> Optional[str]: """The attribute's namespace""" if result := self._qualified_name[0] or None: return result else: # pragma: nocover warnings.warn( "With delb 0.6 this will return an empty string to represent an " "empty/null namespace. You can use the the intermediate " "`Attribute._namespace` with that future behaviour.", category=DeprecationWarning, ) return None @namespace.setter def namespace(self, namespace: Optional[str]): if namespace is None: warnings.warn( "With delb 0.6 empty/null namespaces will have to be expressed as " "empty strings" ) self._set_new_key(namespace or "", self.local_name) @property def _namespace(self) -> str: return self._qualified_name[0] @property def universal_name(self) -> str: """ The attribute's namespace and local name in `Clark notation`_. .. _Clark notation: http://www.jclark.com/xml/xmlns.htm """ if namespace := self._namespace: return f"{{{namespace}}}{self.local_name}" else: return self.local_name @property def value(self) -> str: """The attribute's value.""" if self._attributes is None: assert self._detached_value is not None return self._detached_value else: value = self._attributes._etree_attrib[ self._attributes._etree_key(self._qualified_name) ] return value.decode() if isinstance(value, bytes) else value @value.setter def value(self, value: str): if self._attributes is None: self._detached_value = value else: etree_key = self._attributes._etree_key(self._qualified_name) self._attributes._etree_attrib[etree_key] = value # for utils._StringMixin: _data = value
class TagAttributes(MutableMapping): """ A data type to access a :term:`tag node`'s attributes. Note that due to the current lxml backend there's no distinction between attributes with no namespace and such in the default namespace, both variants access the same data. If you you managed yourself into a position where that matters, the only technical solution is to re-encode the document to not contain any empty namespace. This will not be an issue with delb 0.7, according to the plan. """ __default = object() __slots__ = ("_attributes", "_etree_attrib", "namespaces", "__node") def __init__(self, node: TagNode): self._attributes: dict[tuple[str, str], Attribute] = {} self._etree_attrib: etree._Attrib = node._etree_obj.attrib with warnings.catch_warnings(): warnings.simplefilter("ignore", category=DeprecationWarning) self.namespaces: Namespaces = node.namespaces self.__node = node def __contains__(self, item: Any) -> bool: return self._etree_key(self.__resolve_accessor(item)) in self._etree_attrib def __delitem__(self, item: AttributeAccessor): if item not in self: raise KeyError qualified_name = self.__resolve_accessor(item) attribute = self[qualified_name] assert attribute is not None attribute._detached_value = attribute.value del self._etree_attrib[self._etree_key(qualified_name)] del self._attributes[qualified_name] attribute._attributes = None def __eq__(self, other: Any) -> bool: if not isinstance(other, Mapping): return False if len(self) != len(other): return False if isinstance(other, TagAttributes): for key, attribute in self.items(): assert isinstance(attribute, Attribute) other_value = other.get((attribute._namespace, attribute.local_name)) if (other_value is None) or (attribute != other_value): return False return True return self.as_dict_with_strings() == other def _etree_key(self, item: tuple[str, str]) -> str: namespace, name = item if namespace and self.namespaces.get(None) != namespace: return f"{{{namespace}}}{name}" else: return name def __getitem__(self, item: AttributeAccessor) -> Attribute: if item in self: qualified_name = self.__resolve_accessor(item) result = self._attributes.get(qualified_name) if result is None: result = Attribute(self, qualified_name) self._attributes[qualified_name] = result return result else: raise KeyError(item) def __iter__(self) -> Iterator[tuple[str, str]]: results = [] for key in self._etree_attrib: assert isinstance(key, str) namespace, name = deconstruct_clark_notation(key) if namespace is None: namespace = self.namespaces.get(None, "") assert isinstance(namespace, str) results.append((namespace, name)) return iter(results) def __len__(self) -> int: return len(self._etree_attrib) def __setitem__(self, item: AttributeAccessor, value: str | Attribute): qualified_name = self.__resolve_accessor(item) key = self._etree_key(qualified_name) if isinstance(value, Attribute): value = value.value self._etree_attrib[key] = value self._attributes[qualified_name] = Attribute(self, qualified_name) def __str__(self): return str(self.as_dict_with_strings()) __repr__ = __str__ def __resolve_accessor(self, item: AttributeAccessor) -> tuple[str, str]: if isinstance(item, str): namespace, name = deconstruct_clark_notation(item) elif isinstance(item, slice): warnings.warn( "Using slices to denote fully qualified attribute names is deprecated." "Use two-value tuples (namespace, local name) instead.", category=DeprecationWarning, ) namespace, name = item.start, item.stop elif isinstance(item, tuple): namespace, name = item else: namespace = name = None if name is None: raise TypeError(ATTRIBUTE_ACCESSOR_MSG) if namespace is None: namespace = self.__node._namespace return namespace, name def as_dict_with_strings(self) -> dict[str, str]: """Returns the attributes as :class:`str` instances in a :class:`dict`.""" return {a.universal_name: a.value for a in self.values()} # nodes class NodeBase(ABC): __slots__ = ("__weakref__",) def __str__(self) -> str: return self.serialize( format_options=DefaultStringOptions.format_options, namespaces=DefaultStringOptions.namespaces, newline=DefaultStringOptions.newline, ) def add_following_siblings( self, *node: NodeSource, clone: bool = False ) -> tuple[NodeBase, ...]: """ Adds one or more nodes to the right of the node this method is called on. :param node: The node(s) to be added. :param clone: Clones the concrete nodes before adding if :obj:`True`. :return: The concrete nodes that were added. :meta category: Methods to add nodes to a tree The nodes can be concrete instances of any node type or rather abstract descriptions in the form of strings or objects returned from the :func:`tag` function that are used to derive :class:`TextNode` respectively :class:`TagNode` instances from. """ if not node: return () this, queue = self._prepare_new_relative(node, clone) self._validate_sibling_operation(this) self._add_following_sibling(this) result: tuple[NodeBase, ...] = (this,) if queue: result += this.add_following_siblings(*queue, clone=clone) return result @abstractmethod def _add_following_sibling(self, node: NodeBase): pass def add_preceding_siblings( self, *node: NodeSource, clone: bool = False ) -> tuple[NodeBase, ...]: """ Adds one or more nodes to the left of the node this method is called on. :param node: The node(s) to be added. :param clone: Clones the concrete nodes before adding if :obj:`True`. :return: The concrete nodes that were added. :meta category: Methods to add nodes to a tree The nodes can be concrete instances of any node type or rather abstract descriptions in the form of strings or objects returned from the :func:`tag` function that are used to derive :class:`TextNode` respectively :class:`TagNode` instances from. """ if not node: return () this, queue = self._prepare_new_relative(node, clone) self._validate_sibling_operation(this) self._add_preceding_sibling(this) result: tuple[NodeBase, ...] = (this,) if queue: result += this.add_preceding_siblings(*queue, clone=clone) return result @abstractmethod def _add_preceding_sibling(self, node: NodeBase): pass @abstractmethod def clone( self, deep: bool = False, quick_and_unsafe: bool | None = None ) -> NodeBase: """ Creates a new node of the same type with duplicated contents. :param deep: Clones the whole subtree if :obj:`True`. :param quick_and_unsafe: Deprecated. :return: A copy of the node. """ pass @property @abstractmethod def depth(self) -> int: """ The depth (or level) of the node in its tree. :meta category: Node properties """ pass @abstractmethod def detach(self, retain_child_nodes: bool = False) -> NodeBase: """ Removes the node from its tree. :param retain_child_nodes: Keeps the node's descendants in the originating tree if :obj:`True`. :return: The removed node. :meta category: Methods to remove a node """ pass @property @abstractmethod def document(self) -> Optional[Document]: """ The :class:`Document` instance that the node is associated with or :obj:`None`. :meta category: Related document and nodes properties """ pass def fetch_following(self, *filter: Filter) -> Optional[NodeBase]: """ Retrieves the next filter matching node on the following axis. :param filter: Any number of :term:`filter` s. :return: The next node in document order that matches all filters or :obj:`None`. :meta category: Methods to fetch a relative node """ try: return next(self.iterate_following(*filter)) except StopIteration: return None def fetch_following_sibling(self, *filter: Filter) -> Optional[NodeBase]: """ Retrieves the next filter matching node on the following-sibling axis. :param filter: Any number of :term:`filter` s. :return: The next sibling to the right that matches all filters or :obj:`None`. :meta category: Methods to fetch a relative node """ all_filters = default_filters[-1] + filter candidate = self._fetch_following_sibling() while candidate is not None: if all(f(candidate) for f in all_filters): return candidate candidate = candidate._fetch_following_sibling() return None @abstractmethod def _fetch_following_sibling(self): pass def fetch_preceding(self, *filter: Filter) -> Optional[NodeBase]: """ Retrieves the next filter matching node on the preceding axis. :param filter: Any number of :term:`filter` s. :return: The previous node in document order that matches all filters or :obj:`None`. :meta category: Methods to fetch a relative node """ try: return next(self.iterate_preceding(*filter)) except StopIteration: return None @abstractmethod def fetch_preceding_sibling(self, *filter: Filter) -> Optional[NodeBase]: """ Retrieves the next filter matching node on the preceding-sibling axis. :param filter: Any number of :term:`filter` s. :return: The next sibling to the left that matches all filters or :obj:`None`. :meta category: Methods to fetch a relative node """ pass @property @abstractmethod def first_child(self) -> Optional[NodeBase]: """ The node's first child node. :meta category: Related document and nodes properties """ pass @property @abstractmethod def full_text(self) -> str: """ The concatenated contents of all text node descendants in document order. :meta category: Node content properties """ pass @property def index(self) -> Optional[int]: """ The node's index within the parent's collection of child nodes or :obj:`None` when the node has no parent. :meta category: Node properties """ parent = self.parent if parent is None: return None for index, node in enumerate(parent.iterate_children()): if node is self: return index raise InvalidCodePath def iterate_ancestors(self, *filter: Filter) -> Iterator[TagNode]: """ Iterator over the filter matching nodes on the ancestor axis. :param filter: Any number of :term:`filter` s that a node must match to be yielded. :return: A :term:`generator iterator` that yields the ancestor nodes from bottom to top. :meta category: Methods to iterate over related node """ parent = self.parent if parent: if all(f(parent) for f in filter): yield parent yield from parent.iterate_ancestors(*filter) @abstractmethod def iterate_children(self, *filter: Filter) -> Iterator[NodeBase]: """ Iterator over the filter matching nodes on the child axis. :param filter: Any number of :term:`filter` s that a node must match to be yielded. :return: A :term:`generator iterator` that yields the child nodes of the node. :meta category: Methods to iterate over related node """ pass @abstractmethod def iterate_descendants(self, *filter: Filter) -> Iterator[NodeBase]: """ Iterator over the filter matching nodes on the ancestor axis. :param filter: Any number of :term:`filter` s that a node must match to be yielded. :return: A :term:`generator iterator` that yields the descending nodes of the node. :meta category: Methods to iterate over related node """ pass def iterate_following(self, *filter: Filter) -> Iterator[NodeBase]: """ Iterator over the filter matching nodes on the following axis. :param filter: Any number of :term:`filter` s that a node must match to be yielded. :return: A :term:`generator iterator` that yields the following nodes in document order. :meta category: Methods to iterate over related node """ for node in self._iterate_following(): if all(f(node) for f in chain(default_filters[-1], filter)): yield node def _iterate_following(self) -> Iterator[NodeBase]: def next_sibling_of_an_ancestor( node: NodeBase, ) -> Optional[_ElementWrappingNode]: parent = node.parent if parent is None: return None parents_next = parent._fetch_following_sibling() if parents_next is None: return next_sibling_of_an_ancestor(parent) return parents_next pointer = self while True: next_node = pointer.first_child if next_node is None: next_node = pointer._fetch_following_sibling() if next_node is None: next_node = next_sibling_of_an_ancestor(pointer) if next_node is None: return yield next_node pointer = next_node def iterate_following_siblings(self, *filter: Filter) -> Iterator[NodeBase]: """ Iterator over the filter matching nodes on the following-sibling axis. :param filter: Any number of :term:`filter` s that a node must match to be yielded. :return: A :term:`generator iterator` that yields the siblings to the node's right. :meta category: Methods to iterate over related node """ next_node = self.fetch_following_sibling(*filter) while next_node is not None: yield next_node next_node = next_node.fetch_following_sibling(*filter) def iterate_preceding(self, *filter: Filter) -> Iterator[NodeBase]: """ Iterator over the filter matching nodes on the preceding axis. :param filter: Any number of :term:`filter` s that a node must match to be yielded. :return: A :term:`generator iterator` that yields the previous nodes in document order. :meta category: Methods to iterate over related node """ for node in self._iterate_preceding(): if all(f(node) for f in filter): yield node @altered_default_filters() def _iterate_preceding(self) -> Iterator[NodeBase]: def iter_children(node: NodeBase) -> Iterator[NodeBase]: for child_node in reversed(tuple(node.iterate_children())): yield from iter_children(child_node) yield child_node pointer: NodeBase | None = self assert pointer is not None last_yield: NodeBase = pointer while True: pointer = pointer.fetch_preceding_sibling() if pointer is not None: yield from iter_children(pointer) yield pointer last_yield = pointer else: parent = last_yield.parent if parent is None: return yield parent pointer = last_yield = parent def iterate_preceding_siblings(self, *filter: Filter) -> Iterator[NodeBase]: """ Iterator over the filter matching nodes on the preceding-sibling axis. :param filter: Any number of :term:`filter` s that a node must match to be yielded. :return: A :term:`generator iterator` that yields the siblings to the node's left. :meta category: Methods to iterate over related node """ previous_node = self.fetch_preceding_sibling(*filter) while previous_node is not None: yield previous_node previous_node = previous_node.fetch_preceding_sibling(*filter) @property @abstractmethod def last_child(self) -> Optional[NodeBase]: """ The node's last child node. :meta category: Related document and nodes properties """ pass @property @abstractmethod def last_descendant(self) -> Optional[NodeBase]: """ The node's last descendant. :meta category: Related document and nodes properties """ pass @property @abstractmethod def namespaces(self): """ The prefix to namespace :term:`mapping` of the node. :meta category: Node properties """ pass @abstractmethod def new_tag_node( self, local_name: str, attributes: Optional[dict[AttributeAccessor, str]] = None, namespace: Optional[str] = None, children: Sequence[NodeSource] = (), ) -> TagNode: """ Creates a new :class:`TagNode` instance in the node's context. :param local_name: The tag name. :param attributes: Optional attributes that are assigned to the new node. :param namespace: An optional tag namespace. If none is provided, the context node's namespace is inherited. :param children: An optional sequence of objects that will be appended as child nodes. This can be existing nodes, strings that will be inserted as text nodes and in-place definitions of :class:`TagNode` instances from :func:`tag`. The latter will be assigned to the same namespace. :return: The newly created tag node. """ pass def _new_tag_node_from( self, context: _Element, local_name: str, attributes: Optional[dict[AttributeAccessor, str]], namespace: Optional[str], children: Sequence[NodeSource], ) -> TagNode: tag: QName context_namespace = QName(context).namespace if namespace: tag = QName(namespace or None, local_name) elif context_namespace: tag = QName(context_namespace, local_name) else: tag = QName(local_name) result = _wrapper_cache( context.makeelement( tag.text, # TODO https://github.com/lxml/lxml-stubs/issues/62 nsmap=context.nsmap, # type: ignore ), ) assert isinstance(result, TagNode) if attributes is not None: for name, value in attributes.items(): result.attributes[name] = value assert isinstance(result, TagNode) if children: result.append_children(*children) return result def _new_tag_node_from_definition(self, definition: _TagDefinition) -> TagNode: return self.parent._new_tag_node_from_definition(definition) @property @abstractmethod def parent(self): """ The node's parent or :obj:`None`. :meta category: Related document and nodes properties """ pass @altered_default_filters() def _prepare_new_relative( self, nodes: tuple[NodeSource, ...], clone: bool ) -> tuple[NodeBase, list[NodeSource]]: this, *queue = nodes if isinstance(this, str): this = TextNode(this) elif isinstance(this, NodeBase): if clone: this = this.clone(deep=True) elif isinstance(this, _TagDefinition): this = self._new_tag_node_from_definition(this) else: raise TypeError( "Either node instances, strings or objects from :func:`delb.tag` must " "be provided as children argument." ) if not all( x is None for x in ( this.parent, this._fetch_following_sibling(), this.fetch_preceding_sibling(), ) ): raise InvalidOperation( "A node that shall be added to a tree must have neither a parent nor " "any sibling node. Use :meth:`NodeBase.detach` or a `clone` argument " "to move a node within or between trees." ) return this, queue def replace_with(self, node: NodeSource, clone: bool = False) -> NodeBase: """ Removes the node and places the given one in its tree location. The node can be a concrete instance of any node type or a rather abstract description in the form of a string or an object returned from the :func:`tag` function that is used to derive a :class:`TextNode` respectively :class:`TagNode` instance from. :param node: The replacing node. :param clone: A concrete, replacing node is cloned if :obj:`True`. :return: The removed node. :meta category: Methods to remove a node """ if self.parent is None: raise InvalidOperation( "Cannot replace a root node of a tree. Maybe you want to set the " "`root` property of a Document instance?" ) self.add_following_siblings(node, clone=clone) return self.detach() @altered_default_filters() def serialize( self, *, format_options: Optional[FormatOptions] = None, namespaces: Optional[NamespaceDeclarations] = None, newline: Optional[str] = None, ): """ Returns a string that contains the serialization of the node. See :doc:`/api/serialization` for details. :param format_options: An instance of :class:`FormatOptions` can be provided to configure formatting. :param namespaces: A mapping of prefixes to namespaces. These are overriding possible declarations from a parsed serialisat that the document instance stems from. Prefixes for undeclared namespaces are enumerated with the prefix ``ns``. :param newline: See :class:`io.TextIOWrapper` for a detailed explanation of the parameter with the same name. """ serializer = _get_serializer( _StringWriter(newline=newline), format_options=format_options, namespaces=namespaces, ) with _wrapper_cache: serializer.serialize_node(self) return serializer.writer.result def _validate_sibling_operation(self, node): if self.parent is None and not ( # is happening among valid root node siblings isinstance(node, (CommentNode, ProcessingInstructionNode)) and (isinstance(self, (CommentNode, ProcessingInstructionNode))) ): raise InvalidOperation( "Not all node types can be added as siblings to a root node." ) @altered_default_filters() def xpath( self, expression: str, namespaces: Optional[NamespaceDeclarations] = None, ) -> QueryResults: """ Queries the tree with an XPath expression with this node as initial context node. :param expression: A supported XPath 1.0 expression that contains one or more location paths. :param namespaces: A mapping of prefixes that are used in the expression to namespaces. The declarations that were used in a document's source serialisat serve as fallback. :return: All nodes that match the evaluation of the provided XPath expression. :meta category: Methods to query the tree See :doc:`/api/querying` for details on the extent of the XPath implementation. """ return evaluate_xpath(node=self, expression=expression, namespaces=namespaces) class _ChildLessNode(NodeBase): """Node types using this mixin also can't be root nodes of a document.""" __slots__ = () first_child = None """ The node's first child. """ last_child = None """ The node's last child node. """ last_descendant = None """ The node's last descendant. """ @property def depth(self) -> int: return cast("TagNode", self.parent).depth + 1 @property def document(self) -> Optional[Document]: parent = self.parent if parent is None: return None return parent.document def iterate_children(self, *filter: Filter) -> Iterator[NodeBase]: """ A :term:`generator iterator` that yields nothing. :meta category: Methods to iterate over related node """ yield from () def iterate_descendants(self, *filter: Filter) -> Iterator[NodeBase]: """ A :term:`generator iterator` that yields nothing. :meta category: Methods to iterate over related node """ yield from () def new_tag_node( self, local_name: str, attributes: Optional[dict[AttributeAccessor, str]] = None, namespace: Optional[str] = None, children: Sequence[str | NodeBase | _TagDefinition] = (), ) -> TagNode: warnings.warn( "The node methods `new_tag_node` will be removed. Use :func:`new_tag_node`" "instead.", category=DeprecationWarning, ) parent = self.parent if parent is None: return new_tag_node( local_name=local_name, attributes=attributes, namespace=namespace, children=children, ) else: return parent.new_tag_node( local_name=local_name, attributes=attributes, namespace=namespace, children=children, ) class _ElementWrappingNode(NodeBase): __slots__ = ("_etree_obj", "__namespaces", "_tail_node") def __init__(self, etree_element: _Element): self._etree_obj = etree_element self._tail_node = TextNode(etree_element, position=TAIL) def __copy__(self) -> _ElementWrappingNode: return self.clone(deep=False) def __deepcopy__(self, memodict=None) -> _ElementWrappingNode: return self.clone(deep=True) def _add_following_sibling(self, node: NodeBase): if isinstance(node, _ElementWrappingNode): my_old_tail = self._tail_node if self._tail_node._exists: my_old_tail._bind_to_tail(node) self._etree_obj.tail = None self._etree_obj.addnext(node._etree_obj) self._tail_node = TextNode(self._etree_obj, TAIL) assert self._tail_node is not my_old_tail assert node._tail_node is my_old_tail else: self._etree_obj.addnext(node._etree_obj) assert self._tail_node is my_old_tail assert node._tail_node is not my_old_tail elif isinstance(node, TextNode): assert node._position is DETACHED assert node._appended_text_node is None if self._tail_node._exists: my_old_tail = self._tail_node my_old_tail_content = my_old_tail.content node._bind_to_tail(self) node._appended_text_node = my_old_tail my_old_tail._bound_to = node my_old_tail._position = APPENDED my_old_tail.content = my_old_tail_content else: node._bind_to_tail(self) def _add_preceding_sibling(self, node: NodeBase): previous = self.fetch_preceding_sibling() if previous is None: if isinstance(node, _ElementWrappingNode): self._etree_obj.addprevious(node._etree_obj) else: assert isinstance(node, TextNode) parent = self.parent assert parent is not None assert not parent._data_node._exists node._bind_to_data(parent) else: previous._add_following_sibling(node) def clone( self, deep: bool = False, quick_and_unsafe: bool | None = None ) -> _ElementWrappingNode: if quick_and_unsafe is not None: warnings.warn( "The `quick_and_unsafe` argument will be removed.", category=DeprecationWarning, ) etree_clone = copy(self._etree_obj) etree_clone.tail = None return _wrapper_cache(etree_clone) @altered_default_filters() def detach(self, retain_child_nodes: bool = False) -> _ElementWrappingNode: parent = self.parent if parent is None: return self etree_obj = self._etree_obj if self._tail_node._exists: if self.index == 0: self._tail_node._bind_to_data(parent) else: previous_node = self.fetch_preceding_sibling() if isinstance(previous_node, _ElementWrappingNode): self._tail_node._bind_to_tail(previous_node) elif isinstance(previous_node, TextNode): previous_node._insert_text_node_as_next_appended(self._tail_node) else: raise InvalidCodePath etree_obj.tail = None self._tail_node = TextNode(etree_obj, position=TAIL) cast("_Element", etree_obj.getparent()).remove(etree_obj) return self def _fetch_following_sibling(self) -> Optional[NodeBase]: if self._tail_node._exists: return self._tail_node next_etree_obj = self._etree_obj.getnext() if next_etree_obj is None: return None return _wrapper_cache(next_etree_obj) def fetch_preceding_sibling(self, *filter: Filter) -> Optional[NodeBase]: candidate: NodeBase | None = None previous_etree_obj = self._etree_obj.getprevious() if previous_etree_obj is None: parent = self.parent if parent is not None and parent._data_node._exists: candidate = parent._data_node assert isinstance(candidate, TextNode) while candidate._appended_text_node: candidate = candidate._appended_text_node else: wrapper_of_previous = _wrapper_cache(previous_etree_obj) if wrapper_of_previous._tail_node._exists: candidate = wrapper_of_previous._tail_node assert isinstance(candidate, TextNode) while candidate._appended_text_node: candidate = candidate._appended_text_node else: candidate = wrapper_of_previous if candidate is None: return None if all(f(candidate) for f in chain(default_filters[-1], filter)): return candidate else: return candidate.fetch_preceding_sibling(*filter) @property def full_text(self) -> str: return "" @property def namespaces(self) -> Namespaces: warnings.warn( "The NodeBase.namespaces attribute is deprecated. " "Future versions will not sustain namespace declarations from parsed " "source streams.", category=DeprecationWarning, ) return Namespaces(self._etree_obj.nsmap) @property def parent(self) -> Optional[TagNode]: etree_parent = self._etree_obj.getparent() if etree_parent is None: return None result = _wrapper_cache(etree_parent) assert isinstance(result, TagNode) return result
[docs] class CommentNode(_ChildLessNode, _ElementWrappingNode, NodeBase): """ The instances of this class represent comment nodes of a tree. To instantiate new nodes use :func:`new_comment_node`. """ __slots__ = () def __eq__(self, other) -> bool: return isinstance(other, CommentNode) and self.content == other.content def __repr__(self) -> str: return f'<{self.__class__.__name__}("{self.content}") [{hex(id(self))}]>' def __str__(self) -> str: return f"<!--{self.content}-->" @property def content(self) -> str: """ The comment's text. :meta category: Node content properties """ return cast("str", self._etree_obj.text) @content.setter def content(self, value: str): self._validate_content(value) self._etree_obj.text = value @staticmethod def _validate_content(value: str): if "--" in value or value.endswith("-"): raise ValueError("Invalid Comment content.")
[docs] class ProcessingInstructionNode(_ChildLessNode, _ElementWrappingNode, NodeBase): """ The instances of this class represent processing instruction nodes of a tree. To instantiate new nodes use :func:`new_processing_instruction_node`. """ __slots__ = () def __eq__(self, other) -> bool: return ( isinstance(other, ProcessingInstructionNode) and self.target == other.target and self.content == other.content ) def __repr__(self) -> str: return ( f'<{self.__class__.__name__}("{self.target}", "{self.content}") ' f"[{hex(id(self))}]>" ) def __str__(self) -> str: return f"<?{self.target} {self.content}?>" @property def content(self) -> str: """ The processing instruction's text. :meta category: Node content properties """ return cast("str", self._etree_obj.text) @content.setter def content(self, value: str): self._etree_obj.text = value @property def target(self) -> str: """ The processing instruction's target. :meta category: Node content properties """ etree_obj = self._etree_obj assert isinstance(etree_obj, etree._ProcessingInstruction) return cast("str", etree_obj.target) @target.setter def target(self, value: str): self._validate_target_value(value) etree_obj = self._etree_obj assert isinstance(etree_obj, etree._ProcessingInstruction) etree_obj.target = value @staticmethod def _validate_target_value(value): if not value: # TODO this should rather validate that value is a valid XML name raise ValueError("Invalid target name.") if value.lower() == "xml": raise ValueError(f"{value} is a reserved target name.")
[docs] class TagNode(_ElementWrappingNode, NodeBase): """ The instances of this class represent :term:`tag node` s of a tree, the equivalent of DOM's elements. To instantiate new nodes use :func:`new_tag_node`. Some syntactic sugar is baked in: Attributes and nodes can be tested for membership in a node. >>> root = Document('<root ham="spam"><child/></root>').root >>> "ham" in root True >>> root.first_child in root True Nodes can be copied. Note that this relies on :meth:`TagNode.clone`. >>> from copy import copy, deepcopy >>> root = Document("<root>Content</root>").root >>> print(copy(root)) <root/> >>> print(deepcopy(root)) <root>Content</root> Attribute values and child nodes can be obtained, set and deleted with the subscript notation. >>> root = Document('<root x="y"><child_1/>child_2<child_3/></root>').root >>> print(root["x"]) y >>> print(root[0]) <child_1/> >>> print(root[-1]) <child_3/> >>> print([str(x) for x in root[1::-1]]) ['child_2', '<child_1/>'] How much child nodes has this node anyway? >>> root = Document("<root><child_1/><child_2/></root>").root >>> len(root) 2 >>> len(root[0]) 0 As seen in the examples above, a tag nodes string representation yields a serialized XML representation of a sub-/tree. See :doc:`/api/serialization` for details. """ __slots__ = ( "_attributes", "_data_node", "__document__", ) def __init__(self, etree_element: _Element): super().__init__(etree_element) self._data_node = TextNode(etree_element, position=DATA) self._attributes = TagAttributes(self) self.__document__: Document | None = None def __contains__(self, item: AttributeAccessor | NodeBase) -> bool: if isinstance(item, (slice, str, tuple)): return item in self.attributes elif isinstance(item, NodeBase): return any(n is item for n in self.iterate_children()) else: raise TypeError( "Argument must be a node instance or an attribute name. " + ATTRIBUTE_ACCESSOR_MSG ) def __delitem__(self, item: AttributeAccessor | int): if isinstance(item, (str, tuple)): del self.attributes[item] elif isinstance(item, int): self[item].detach(retain_child_nodes=False) elif isinstance(item, slice): if isinstance(item.start, int) or isinstance(item.stop, int): raise NotImplementedError( "This will be implemented in a later version." ) else: del self.attributes[(item.start, item.stop)] else: raise TypeError( "Argument must be an integer or an attribute name. " + ATTRIBUTE_ACCESSOR_MSG ) @overload def __getitem__(self, item: int) -> NodeBase: ... @overload def __getitem__(self, item: AttributeAccessor) -> Attribute | None: ... def __getitem__(self, item): if isinstance(item, (str, tuple)): return self.attributes[item] elif isinstance(item, int): if item < 0: item = len(self) + item for index, child_node in enumerate(self.iterate_children()): if index == item: return child_node raise IndexError("Node index out of range.") elif isinstance(item, slice): if all(isinstance(x, str) for x in (item.start, item.stop)): return self.attributes[item] elif all( (isinstance(x, int) or x is None) for x in (item.start, item.stop) ): return list(self.iterate_children())[item] raise TypeError( "Argument must be an integer as index for a child node, a " ":term:`slice` to grab an indexed range of nodes or an attribute " "name. " + ATTRIBUTE_ACCESSOR_MSG ) def __len__(self) -> int: i = 0 for i, _ in enumerate(self.iterate_children(), start=1): pass return i def __repr__(self) -> str: return ( f'<{self.__class__.__name__}("{self.universal_name}", ' f"{self.attributes}, {self.location_path}) [{hex(id(self))}]>" ) @overload def __setitem__(self, item: int, value: NodeSource): ... @overload def __setitem__(self, item: AttributeAccessor, value: str | Attribute): ... def __setitem__(self, item, value): if isinstance(item, (slice, str, tuple)): self.attributes[item] = value elif isinstance(item, int): children_size = len(self) if not children_size and item == 0: self.__add_first_child(value) else: if not 0 <= item < children_size: raise IndexError self[item].replace_with(value) else: raise TypeError( "Argument must be an integer or an attribute name. " + ATTRIBUTE_ACCESSOR_MSG ) def __add_first_child(self, node: NodeBase): assert not len(self) if isinstance(node, _ElementWrappingNode): self._etree_obj.append(node._etree_obj) elif isinstance(node, TextNode): node._bind_to_data(self)
[docs] def append_children( self, *node: NodeSource, clone: bool = False ) -> tuple[NodeBase, ...]: """ Adds one or more nodes as child nodes after any existing to the child nodes of the node this method is called on. :param node: The node(s) to be added. :param clone: Clones the concrete nodes before adding if :obj:`True`. :return: The concrete nodes that were appended. :meta category: Methods to add nodes to a tree The nodes can be concrete instances of any node type or rather abstract descriptions in the form of strings or objects returned from the :func:`tag` function that are used to derive :class:`TextNode` respectively :class:`TagNode` instances from. """ if not node: return () queue: Sequence[NodeSource] if (last_child := self.last_child) is None: last_child, queue = self._prepare_new_relative(node, clone=clone) self.__add_first_child(last_child) result: tuple[NodeBase, ...] = (last_child,) else: queue = node result = () if queue: result += last_child.add_following_siblings(*queue, clone=clone) return result
@property def attributes(self) -> TagAttributes: """ A :term:`mapping` that can be used to access the node's attributes. :meta category: Node content properties >>> node = new_tag_node("node", attributes={"foo": "0", "bar": "0"}) >>> node.attributes {'foo': '0', 'bar': '0'} >>> node.attributes.pop("bar") # doctest: +ELLIPSIS <Attribute(bar="0") [0x...]> >>> node.attributes["foo"] = "1" >>> node.attributes["peng"] = "1" >>> print(node) <node foo="1" peng="1"/> >>> node.attributes.update({"foo": "2", "zong": "2"}) >>> print(node) <node foo="2" peng="1" zong="2"/> Namespaced names are accessed with two-value tuples or a string. The two-value holds the namespace and the local name in that order. A string can either be a fully qualified name in `Clark notation`_ or a local name that belongs to the containing node's namespace. .. _Clark notation: http://www.jclark.com/xml/xmlns.htm >>> DefaultStringOptions.namespaces = {None: "http://namespace"} >>> node = new_tag_node( ... "node", ... namespace="http://namespace", ... ) >>> node.attributes.update({("http://namespace", "foo"): "0"}) >>> print(node) <node xmlns="http://namespace" foo="0"/> >>> attribute = node.attributes[("http://namespace", "foo")] >>> node.attributes["foo"] is attribute True >>> node.attributes["{http://namespace}foo"] is attribute True Attributes behave like strings, but also expose namespace, local name and value for manipulation. >>> node = new_tag_node("node") >>> node.attributes["foo"] = "0" >>> node.attributes["foo"].local_name = "bar" >>> node.attributes["bar"].namespace = "http://namespace" >>> node.attributes[("http://namespace", "bar")].value = "X" >>> print(node) <node xmlns:ns0="http://namespace" ns0:bar="X"/> >>> "ref-" + node.attributes[("http://namespace", "bar")].lower() 'ref-x' """ return self._attributes
[docs] @altered_default_filters() def clone( self, deep: bool = False, quick_and_unsafe: bool | None = None ) -> TagNode: if quick_and_unsafe is not None: warnings.warn( "The `quick_and_unsafe` argument will be removed.", category=DeprecationWarning, ) if deep and quick_and_unsafe: result = _wrapper_cache(deepcopy(self._etree_obj)) assert isinstance(result, TagNode) result._etree_obj.tail = None return result return new_tag_node( local_name=self.local_name, attributes=self.attributes, namespace=self._namespace, children=( [n.clone(deep=True) for n in self.iterate_children()] if deep else () ), )
[docs] def css_select( self, expression: str, namespaces: Optional[NamespaceDeclarations] = None ) -> QueryResults: """ Queries the tree with a CSS selector expression with this node as initial context node. :param expression: A CSS selector expression. :param namespaces: A mapping of prefixes that are used in the expression to namespaces. If omitted, the node's definition is used. :return: All nodes that match the evaluation of the provided CSS selector expression. :meta category: Methods to query the tree See :doc:`/api/querying` regarding the extent of the supported grammar. Namespace prefixes are delimited with a ``|`` before a name test, for example ``div svg|metadata`` selects all descendants of ``div`` named nodes that belong to the default namespace or have no namespace and whose name is ``metadata`` and have a namespace that is mapped to the ``svg`` prefix. """ return self.xpath(expression=_css_to_xpath(expression), namespaces=namespaces)
@property def depth(self) -> int: result = 0 node = self while node.parent: node = node.parent result += 1 return result
[docs] @altered_default_filters() def detach(self, retain_child_nodes: bool = False) -> _ElementWrappingNode: parent = self.parent index = self.index if self.__document__ is not None: raise InvalidOperation("The root node of a document cannot be detached.") if retain_child_nodes and parent is None: raise InvalidOperation( "Child nodes can't be retained when the node to detach has no parent " "node." ) super().detach() with warnings.catch_warnings(): warnings.simplefilter("ignore", category=DeprecationWarning) parent_has_default_namespace = False if parent is None: parent_namespaces = None else: parent_namespaces = parent.namespaces if None in parent_namespaces: parent_has_default_namespace = True if not (parent_has_default_namespace or retain_child_nodes): return self child_nodes = tuple(self.iterate_children()) for child_node in child_nodes: child_node.detach() # workaround to keep a default namespace: if parent_has_default_namespace: _wrapper_cache.wrappers.pop(self._etree_obj) assert isinstance(parent, TagNode) self._etree_obj = parent._etree_obj.makeelement( etree.QName(self._etree_obj), attrib=dict(self._etree_obj.attrib), # type: ignore # TODO https://github.com/lxml/lxml-stubs/issues/62 nsmap=parent_namespaces, ) self._attributes._etree_attrib = self._etree_obj.attrib with warnings.catch_warnings(): warnings.simplefilter("ignore", category=DeprecationWarning) self._attributes.namespaces = self.namespaces _wrapper_cache.wrappers[self._etree_obj] = self if retain_child_nodes: if child_nodes: assert isinstance(parent, TagNode) assert isinstance(index, int) parent.insert_children(index, *child_nodes) else: self.append_children(*child_nodes) return self
@property def document(self) -> Optional[Document]: if self.parent is None: root_node = self else: root_node = cast("TagNode", last(self.iterate_ancestors())) return root_node.__document__
[docs] def fetch_or_create_by_xpath( self, expression: str, namespaces: Optional[NamespaceDeclarations] = None, ) -> TagNode: """ Fetches a single node that is locatable by the provided XPath expression. If the node doesn't exist, the non-existing branch will be created. :param expression: An XPath expression that can unambiguously locate a descending node in a tree that has any state. :param namespaces: An optional mapping of prefixes to namespaces. The declarations that were used in a document's source serialisat serve as fallback. :return: The existing or freshly created node descibed with ``expression``. :meta category: Methods to query the tree These rules are imperative in your endeavour: - All location steps must use the child axis. - Each step needs to provide a name test. - Attribute comparisons against literals are the only allowed predicates. - Multiple attribute comparisons must be joined with the `and` operator and / or contained in more than one predicate expression. - The logical validity of multiple attribute comparisons isn't checked. E.g. one could provide ``foo[@p="her"][@p="him"]``, but expect an undefined behaviour. >>> root = Document("<root/>").root >>> grandchild = root.fetch_or_create_by_xpath( ... "child[@a='b']/grandchild" ... ) >>> grandchild is root.fetch_or_create_by_xpath( ... "child[@a='b']/grandchild" ... ) True >>> str(root) '<root><child a="b"><grandchild/></child></root>' """ ast = parse_xpath(expression) if not ast._is_unambiguously_locatable: raise ValueError( "The XPath expression doesn't determine a distinct branch." ) query_result = self.xpath(expression, namespaces=namespaces) if query_result.size == 1: result = query_result.first assert isinstance(result, TagNode) return result if query_result: raise AmbiguousTreeError( f"The tree already contains {query_result.size} matching branches." ) return self._create_by_xpath( ast, namespaces=Namespaces(namespaces or {}, fallback=self.namespaces) )
def _create_by_xpath( self, ast: XPathExpression, namespaces: Namespaces, ) -> TagNode: node: _DocumentNode | TagNode if ast.location_paths[0].absolute: node = self while node.parent is not None: node = node.parent node = _DocumentNode(node) else: node = self for i, step in enumerate(ast.location_paths[0].location_steps): candidates = tuple(step.evaluate(node_set=(node,), namespaces=namespaces)) if len(candidates) == 0: assert isinstance(node, TagNode) node_test = step.node_test assert isinstance(node_test, NameMatchTest) prefix = node_test.prefix if prefix is None: namespace = namespaces.get(None) else: namespace = namespaces[prefix] new_node = new_tag_node( local_name=node_test.local_name, attributes=None, namespace=namespace, ) for prefix, local_name, value in step._derived_attributes: if prefix is None and None not in namespaces: new_node.attributes[local_name] = value else: new_node.attributes[(namespaces[prefix], local_name)] = value node.append_children(new_node) node = new_node elif len(candidates) == 1: node = cast("TagNode", candidates[0]) else: raise AmbiguousTreeError( f"The tree has multiple possible branches at location step {i}." ) assert isinstance(node, TagNode) return node @property def first_child(self) -> Optional[NodeBase]: for result in self.iterate_children(): return result return None @property def full_text(self) -> str: return "".join( n.content for n in self.iterate_descendants() if isinstance(n, TextNode) ) def _get_normalize_space_directive( self, default: Literal["default", "preserve"] = "default" ) -> Literal["default", "preserve"]: attribute_value = self.attributes.get((XML_NAMESPACE, "space")) if attribute_value is None: return default if attribute_value in ("default", "preserve"): return attribute_value warnings.warn( "Encountered and ignoring an invalid `xml:space` attribute: " + attribute_value, category=UserWarning, ) return default @property def id(self) -> Optional[str]: """ This is a shortcut to retrieve and set the ``id`` attribute in the XML namespace. The client code is responsible to pass properly formed id names. :meta category: Node content properties """ return self.attributes.get((XML_NAMESPACE, "id")) @id.setter def id(self, value: Optional[str]): if value is None: del self.attributes[(XML_NAMESPACE, "id")] elif isinstance(value, str): root = cast("TagNode", last(self.iterate_ancestors())) or self if root._etree_obj.xpath(f"descendant-or-self::*[@xml:id='{value}']"): raise ValueError( "An xml:id-attribute with that value is already assigned in the " "tree." ) self.attributes[(XML_NAMESPACE, "id")] = value else: raise TypeError("Value must be None or a string.") @property def index(self) -> Optional[int]: if self.parent is None: return None return super().index
[docs] def insert_children( self, index: int, *node: NodeSource, clone: bool = False ) -> tuple[NodeBase, ...]: """ Inserts one or more child nodes. :param index: The index at which the first of the given nodes will be inserted, the remaining nodes are added afterwards in the given order. :param node: The node(s) to be added. :param clone: Clones the concrete nodes before adding if :obj:`True`. :return: The concrete nodes that were inserted. :meta category: Methods to add nodes to a tree The nodes can be concrete instances of any node type or rather abstract descriptions in the form of strings or objects returned from the :func:`tag` function that are used to derive :class:`TextNode` respectively :class:`TagNode` instances from. """ if index < 0: raise ValueError("Index must be zero or a positive integer.") children_count = len(self) if index > children_count: raise IndexError("The given index is beyond the target's size.") this, *queue = node result: tuple[NodeBase, ...] if index == 0: if children_count: result = self[0].add_preceding_siblings(this, clone=clone) else: result = (self._prepare_new_relative((this,), clone=clone)[0],) self.__add_first_child(result[0]) else: result = self[index - 1].add_following_siblings(this, clone=clone) if queue: result += self[index].add_following_siblings(*queue, clone=clone) return result
[docs] def iterate_children(self, *filter: Filter) -> Iterator[NodeBase]: all_filters = default_filters[-1] + filter candidate: NodeBase | None assert isinstance(self._data_node, TextNode) if self._data_node._exists: candidate = self._data_node elif len(self._etree_obj): candidate = _wrapper_cache(self._etree_obj[0]) else: candidate = None while candidate is not None: if all(f(candidate) for f in all_filters): yield candidate candidate = candidate._fetch_following_sibling()
[docs] def iterate_descendants(self, *filter: Filter) -> Iterator[NodeBase]: all_filters = default_filters[-1] + filter with altered_default_filters(): candidate = self.first_child if candidate is None: return next_candidates: list[NodeBase | None] = [] while candidate is not None: if all(f(candidate) for f in all_filters): yield candidate if isinstance(candidate, TagNode): next_candidates.append(candidate._fetch_following_sibling()) candidate = candidate.first_child else: candidate = candidate._fetch_following_sibling() while candidate is None and next_candidates: candidate = next_candidates.pop()
@property def last_child(self) -> Optional[NodeBase]: result = None for result in self.iterate_children(): pass return result @property def last_descendant(self) -> Optional[NodeBase]: node = self.last_child while node is not None: candidate = node.last_child if candidate is None: break node = candidate return node @property def local_name(self) -> str: """ The node's name. :meta category: Node properties """ return cast("str", QName(self._etree_obj).localname) @local_name.setter def local_name(self, value: str): self._etree_obj.tag = QName(self._namespace or None, value).text @property def location_path(self) -> str: """ An unambiguous XPath location path that points to this node from its tree root. :meta category: Node properties """ if self.parent is None: return "/*" with altered_default_filters(is_tag_node): steps = list(self.iterate_ancestors()) steps.pop() # root steps.reverse() steps.append(self) return "/*" + "".join(f"/*[{cast('int', n.index)+1}]" for n in steps)
[docs] @altered_default_filters() def merge_text_nodes(self): """ Merges all consecutive text nodes in the subtree into one. Text nodes without content are dropped. """ with _wrapper_cache: empty_nodes: list[TextNode] = [] for node in self.iterate_descendants(): if not isinstance(node, TextNode): continue node._merge_appended_text_nodes() if not node.content: empty_nodes.append(node) for node in empty_nodes: node.detach()
@property def namespace(self) -> Optional[str]: """ The node's namespace. :meta category: Node properties """ if result := QName(self._etree_obj.tag).namespace: return result else: # pragma: nocover warnings.warn( "With the delb 0.6 this will return an empty string to represent an " "empty/null namespace. You can use the the intermediate " "`TagNode._namespace` with that future behaviour.", category=DeprecationWarning, ) return result @namespace.setter def namespace(self, value: Optional[str]): self._etree_obj.tag = QName(value or None, self.local_name).text @property def _namespace(self) -> str: return QName(self._etree_obj.tag).namespace or ""
[docs] def new_tag_node( self, local_name: str, attributes: Optional[dict[AttributeAccessor, str]] = None, namespace: Optional[str] = None, children: Sequence[str | NodeBase | _TagDefinition] = (), ) -> TagNode: warnings.warn( "The node methods `new_tag_node` will be removed. Use :func:`new_tag_node`" "instead.", category=DeprecationWarning, ) return self._new_tag_node_from( self._etree_obj, local_name, attributes, namespace, children )
def _new_tag_node_from_definition(self, definition: _TagDefinition) -> TagNode: return self._new_tag_node_from( context=self._etree_obj, local_name=definition.local_name, attributes=definition.attributes, namespace=self._namespace, children=definition.children, )
[docs] @staticmethod def parse( text: AnyStr, parser_options: Optional[ParserOptions] = None, ) -> TagNode: """ Parses the given string or bytes sequence into a new tree. :param text: A serialized XML tree. :param parser_options: A :class:`delb.ParserOptions` class to configure the used parser. """ warnings.warn( "This method will be replaced by another interface in a future version.", category=DeprecationWarning, ) parser_options = parser_options or ParserOptions() parser = parser_options._make_parser() assert isinstance(parser, etree.XMLParser) result = _wrapper_cache(etree.fromstring(text, parser=parser)) assert isinstance(result, TagNode) if parser_options.reduce_whitespace: result._reduce_whitespace() return result
@property def prefix(self) -> Optional[str]: # pragma: nocover """ The prefix that the node's namespace is currently mapped to. :meta category: Node properties """ warnings.warn("This attribute will be removed.", category=DeprecationWarning) target = QName(self._etree_obj).namespace if target is None: return None for prefix, namespace in self._etree_obj.nsmap.items(): assert isinstance(prefix, str) or prefix is None if namespace == target: return prefix raise InvalidCodePath
[docs] def prepend_children( self, *node: NodeBase, clone: bool = False ) -> tuple[NodeBase, ...]: """ Adds one or more nodes as child nodes before any existing to the child nodes of the node this method is called on. :param node: The node(s) to be added. :param clone: Clones the concrete nodes before adding if :obj:`True`. :return: The concrete nodes that were prepended. :meta category: Methods to add nodes to a tree The nodes can be concrete instances of any node type or rather abstract descriptions in the form of strings or objects returned from the :func:`tag` function that are used to derive :class:`TextNode` respectively :class:`TagNode` instances from. """ return self.insert_children(0, *node, clone=clone)
@altered_default_filters() def _reduce_whitespace( self, normalize_space: Literal["default", "preserve"] = "default" ): with _wrapper_cache: self._reduce_whitespace_of_descendants(normalize_space) def _reduce_whitespace_of_descendants( self, normalize_space: Literal["default", "preserve"] ): child_nodes = list(self.iterate_children()) if not child_nodes: return child_node: NodeBase for child_node in [ n for n in child_nodes if isinstance(n, TextNode) and n.content == "" ]: child_nodes.remove(child_node) child_node.detach() if not child_nodes: return normalize_space = self._get_normalize_space_directive(normalize_space) for child_node in (n for n in child_nodes if isinstance(n, TagNode)): child_node._reduce_whitespace_of_descendants(normalize_space) if normalize_space == "preserve": return for child_node in (n for n in child_nodes if isinstance(n, TextNode)): if reduced_content := self._reduce_whitespace_content( child_node.content, child_node is child_nodes[0], child_node is child_nodes[-1], ): child_node.content = reduced_content else: child_node.detach() def _reduce_whitespace_content( self, content: str, is_first: bool, is_last: bool ) -> str: collapsed = _crunch_whitespace(content) collapsed_and_stripped = collapsed.strip() has_non_whitespace_content = bool(collapsed_and_stripped) has_trailing_whitespace = collapsed.endswith(" ") # 1 Retain one leading space # if the node isn't first, has non-space content, and has leading space. if not is_first and has_non_whitespace_content and collapsed.startswith(" "): result = f" {collapsed_and_stripped}" else: result = collapsed_and_stripped # Retain one trailing space if ( # 2 … if the node isn't last, isn't first, and has trailing space. (not (is_last or is_first) and has_trailing_whitespace) or # 3 … if the node isn't last, is first, has trailing space, and has # non-space content. ( not is_last and is_first and has_trailing_whitespace and has_non_whitespace_content ) or # 4 … if the node is an only child and only has space content. (is_first and is_last and not has_non_whitespace_content) ): result += " " return result
[docs] @altered_default_filters() def serialize( self, *, format_options: Optional[FormatOptions] = None, namespaces: Optional[NamespaceDeclarations] = None, newline: Optional[str] = None, ): serializer = _get_serializer( _StringWriter(newline=newline), format_options=format_options, namespaces=namespaces, ) with _wrapper_cache: serializer.serialize_root(self) return serializer.writer.result
@property def universal_name(self) -> str: """ The node's qualified name in `Clark notation`_. :meta category: Node properties .. _Clark notation: http://www.jclark.com/xml/xmlns.htm """ return cast("str", self._etree_obj.tag) def _validate_sibling_operation(self, node): if self.parent is None and not ( isinstance(node, (CommentNode, ProcessingInstructionNode)) and ( isinstance(self, (CommentNode, ProcessingInstructionNode)) or self.__document__ is not None ) ): raise TypeError( "Not all node types can be added as siblings to a root node." ) # REMOVE eventually
[docs] @altered_default_filters() def xpath( self, expression: str, namespaces: Optional[NamespaceDeclarations] = None, ) -> QueryResults: result = super().xpath(expression=expression, namespaces=namespaces) if __debug__ and all(isinstance(n, TagNode) for n in result): try: etree_result = self._etree_xpath(expression, namespaces=namespaces) except NotImplementedError: # might stem from flaws in LegacyXPathExpression pass else: assert result == etree_result, ( "Please report that the native XPath evaluator seems faulty with " f"the expression `{expression}` at " "https://github.com/delb-xml/delb-py/issues" ) return result
# REMOVE eventually def _etree_xpath( self, expression: str, namespaces: Optional[NamespaceDeclarations] = None ) -> QueryResults: etree_obj = self._etree_obj if namespaces is None: namespaces = etree_obj.nsmap compat_namespaces: etree._DictAnyStr xpath_expression = LegacyXPathExpression(expression) if None in namespaces: has_default_namespace = True prefix = _random_unused_prefix(namespaces) compat_namespaces = cast( "etree._DictAnyStr", { **{k: v for k, v in namespaces.items() if k is not None}, prefix: namespaces[None], }, ) else: has_default_namespace = False prefix = "" compat_namespaces = cast("etree._DictAnyStr", namespaces) for location_path in xpath_expression.location_paths: if has_default_namespace: for location_step in location_path.location_steps: node_test = location_step.node_test if node_test.type != "name_test": continue if ":" not in node_test.data: node_test.data = prefix + ":" + node_test.data if str(xpath_expression) != expression: raise NotImplementedError _results = etree_obj.xpath( str(xpath_expression), namespaces=compat_namespaces, # type: ignore ) assert isinstance(_results, Iterable) return QueryResults( (_wrapper_cache(cast("_Element", element)) for element in _results) )
[docs] class TextNode(_ChildLessNode, NodeBase, _StringMixin): # type: ignore """ TextNodes contain the textual data of a document. The class shall not be initialized by client code, just throw strings into the trees. Instances expose all methods of :class:`str` except :meth:`str.index`: >>> node = TextNode("Show us the way to the next whisky bar.") >>> node.split() ['Show', 'us', 'the', 'way', 'to', 'the', 'next', 'whisky', 'bar.'] Instances can be tested for inequality with other text nodes and strings: >>> TextNode("ham") == TextNode("spam") False >>> TextNode("Patsy") == "Patsy" True And they can be tested for substrings: >>> "Sir" in TextNode("Sir Bedevere the Wise") True Attributes that rely to child nodes yield nothing respectively :obj:`None`. """ __slots__ = ("_appended_text_node", "_bound_to", "__content", "_position") def __init__( self, reference_or_text: _Element | str | TextNode, position: int = DETACHED, ): self._bound_to: _Element | TextNode | None self.__content: str | None self._appended_text_node: TextNode | None = None self._position: int = position if position is DETACHED: assert isinstance(reference_or_text, str) self._bound_to = None self.__content = reference_or_text elif position in (DATA, TAIL): assert isinstance(reference_or_text, _Element) self._bound_to = reference_or_text self.__content = None else: raise ValueError def __eq__(self, other): if isinstance(other, TextNode): return self.content == other.content else: return super().__eq__(other) def __getitem__(self, item): return self.content[item] def __repr__(self): if self._exists: return ( f'<{self.__class__.__name__}(text="{self.content}", ' f"pos={self._position}) [{hex(id(self))}]>" ) else: return ( f"<{self.__class__.__name__}(pos={self._position}) [{hex(id(self))}]>" ) def _add_following_sibling(self, node: NodeBase): if isinstance(node, TextNode): self._insert_text_node_as_next_appended(node) elif isinstance(node, _ElementWrappingNode): self._add_next_element_wrapping_node(node) def _add_next_element_wrapping_node(self, node: _ElementWrappingNode): if self._position is DATA: assert isinstance(self._bound_to, _Element) appended_text_node = self._appended_text_node self._bound_to.insert(0, node._etree_obj) if appended_text_node is not None: self._appended_text_node = None appended_text_node._bind_to_tail(node) elif self._position is TAIL: assert isinstance(self._bound_to, _Element) data = self._bound_to.tail text_sibling = self._appended_text_node self._appended_text_node = None assert not node._tail_node._exists self._bound_to.addnext(node._etree_obj) self._bound_to.tail = data node._etree_obj.tail = None assert not node._tail_node._exists, repr(node._tail_node) if text_sibling is not None: text_sibling._bind_to_tail(node) elif self._position is APPENDED: appended_text_node = self._appended_text_node self._appended_text_node = None head = self._tail_sequence_head assert head._position in (DATA, TAIL) if head._position is DATA: assert isinstance(head._bound_to, _Element) head._bound_to.insert(0, node._etree_obj) elif head._position is TAIL: head_anchor = head._bound_to assert isinstance(head_anchor, _Element) head_content = head.content head_anchor.addnext(node._etree_obj) head_anchor.tail = head_content node._etree_obj.tail = None if appended_text_node is not None: appended_text_node._bind_to_tail(node) def _add_preceding_sibling(self, node: NodeBase): if isinstance(node, TextNode): self._prepend_text_node(node) elif isinstance(node, _ElementWrappingNode): if self._position is DATA: content = self.content current_bound = self._bound_to assert isinstance(current_bound, _Element) current_bound.insert(0, node._etree_obj) current_bound_wrapper = _wrapper_cache(current_bound) assert isinstance(current_bound_wrapper, TagNode) current_bound_wrapper._data_node = TextNode(current_bound, DATA) self._bind_to_tail(node) current_bound.text = None self.content = content elif self._position is TAIL: assert isinstance(self._bound_to, _Element) _wrapper_cache(self._bound_to)._add_following_sibling(node) elif self._position is APPENDED: assert isinstance(self._bound_to, TextNode) self._bound_to._add_following_sibling(node) def _bind_to_data(self, target: TagNode): target._etree_obj.text = self.content target._data_node = self self._bound_to = target._etree_obj self._position = DATA self.__content = None assert isinstance(self.content, str) def _bind_to_tail(self, target: _ElementWrappingNode): assert isinstance(target, _ElementWrappingNode) target._etree_obj.tail = self.content target._tail_node = self self._bound_to = target._etree_obj self._position = TAIL self.__content = None assert isinstance(self.content, str)
[docs] def clone( self, deep: bool = False, quick_and_unsafe: bool | None = None ) -> NodeBase: if quick_and_unsafe is not None: warnings.warn( "The `quick_and_unsafe` argument will be removed.", category=DeprecationWarning, ) assert self.content is not None return self.__class__(self.content)
@property def content(self) -> str: """ The node's text content. :meta category: Node content properties """ if self._position is DATA: assert isinstance(self._bound_to, _Element) return cast("str", self._bound_to.text) elif self._position is TAIL: assert isinstance(self._bound_to, _Element) return cast("str", self._bound_to.tail) elif self._position in (APPENDED, DETACHED): assert self._bound_to is None or isinstance(self._bound_to, TextNode) return cast("str", self.__content) else: raise ValueError( f"A TextNode._position must not be set to {self._position}" ) @content.setter def content(self, text: str): if not isinstance(text, str): raise TypeError if self._position is DATA: assert isinstance(self._bound_to, _Element) self._bound_to.text = text or None elif self._position is TAIL: assert isinstance(self._bound_to, _Element) self._bound_to.tail = text or None elif self._position in (APPENDED, DETACHED): assert self._bound_to is None or isinstance(self._bound_to, TextNode) self.__content = text # for utils._StringMixin: _data = content @property def depth(self) -> int: if self._position is DETACHED: return 0 return super().depth
[docs] def detach(self, retain_child_nodes: bool = False) -> TextNode: if self._position is DETACHED: return self content = self.content text_sibling = self._appended_text_node if self._position is DATA: current_parent = self.parent assert current_parent is not None assert isinstance(self._bound_to, _Element) assert current_parent._etree_obj is self._bound_to if text_sibling: text_sibling._bind_to_data(current_parent) else: current_parent._data_node = TextNode(current_parent._etree_obj, DATA) assert self not in current_parent self._bound_to.text = None assert not current_parent._data_node._exists elif self._position is TAIL: current_bound = self._bound_to assert isinstance(current_bound, _Element) current_previous = _wrapper_cache(current_bound) if text_sibling: text_sibling._bind_to_tail(current_previous) else: current_previous._tail_node = TextNode( current_previous._etree_obj, TAIL ) current_bound.tail = None assert not current_previous._tail_node._exists elif self._position is APPENDED: assert isinstance(self._bound_to, TextNode) self._bound_to._appended_text_node = text_sibling if text_sibling: text_sibling._bound_to = self._bound_to else: raise ValueError( f"A TextNode._position must not be set to {self._position}" ) self._appended_text_node = None self._bound_to = None self._position = DETACHED self.content = content or "" assert self.parent is None assert self._fetch_following_sibling() is None return self
@property def _exists(self) -> bool: if self._position is DATA: assert isinstance(self._bound_to, _Element) return self._bound_to.text is not None elif self._position is TAIL: assert isinstance(self._bound_to, _Element) return self._bound_to.tail is not None else: return True def _fetch_following_sibling(self) -> Optional[NodeBase]: if self._position is DETACHED: return None if self._appended_text_node: return self._appended_text_node elif self._position is DATA: assert isinstance(self._bound_to, _Element) if len(self._bound_to): return _wrapper_cache(self._bound_to[0]) else: return None elif self._position is TAIL: return self.__next_candidate_of_tail() elif self._position is APPENDED: # and last in tail sequence return self.__next_candidate_of_last_appended() raise InvalidCodePath
[docs] def fetch_preceding_sibling(self, *filter: Filter) -> Optional[NodeBase]: candidate: NodeBase | None if self._position in (DATA, DETACHED): return None elif self._position is TAIL: assert isinstance(self._bound_to, _Element) candidate = _wrapper_cache(self._bound_to) elif self._position is APPENDED: assert isinstance(self._bound_to, TextNode) candidate = self._bound_to else: raise ValueError( f"A TextNode._position must not be set to {self._position}" ) if candidate is None: return None if all(f(candidate) for f in chain(default_filters[-1], filter)): return candidate else: return candidate.fetch_preceding_sibling(*filter)
@property def full_text(self) -> str: return self.content or "" def _merge_appended_text_nodes(self): sibling = self._appended_text_node if sibling is None: return current_node, appendix = sibling, "" while current_node is not None: assert isinstance(current_node.content, str) appendix += current_node.content current_node = current_node._appended_text_node self.content += appendix self._appended_text_node = None sibling._bound_to = None def _insert_text_node_as_next_appended(self, node: TextNode): old = self._appended_text_node content = node.content node._bound_to = self node._position = APPENDED node.content = content self._appended_text_node = node if old: assert old._position is APPENDED, old node._insert_text_node_as_next_appended(old) assert isinstance(self.content, str) assert isinstance(node.content, str) @property def namespaces(self): if self.parent: return self.parent.namespaces else: raise InvalidOperation("A lonely text node has no namespace context.") def __next_candidate_of_last_appended(self) -> Optional[NodeBase]: head = self._tail_sequence_head assert isinstance(head._bound_to, _Element) if head._position is DATA: assert head.parent is not None if len(head.parent._etree_obj): return _wrapper_cache(head.parent._etree_obj[0]) else: return None elif head._position is TAIL: next_etree_element = head._bound_to.getnext() if next_etree_element is None: return None else: return _wrapper_cache(next_etree_element) raise InvalidCodePath def __next_candidate_of_tail(self) -> Optional[NodeBase]: assert isinstance(self._bound_to, _Element) next_etree_node = self._bound_to.getnext() if next_etree_node is None: return None return _wrapper_cache(next_etree_node) @property def parent(self) -> Optional[TagNode]: if self._position is DATA: assert isinstance(self._bound_to, _Element) result = _wrapper_cache(self._bound_to) assert isinstance(result, TagNode) return result elif self._position is TAIL: assert isinstance(self._bound_to, _Element) return _wrapper_cache(self._bound_to).parent elif self._position is APPENDED: assert isinstance(self._bound_to, TextNode) return self._tail_sequence_head.parent elif self._position is DETACHED: assert self._bound_to is None return None raise ValueError(f"A TextNode._position must not be set to {self._position}") def _prepend_text_node(self, node: TextNode): if self._position is DATA: assert isinstance(self._bound_to, _Element) parent = _wrapper_cache(self._bound_to) assert isinstance(parent, TagNode) content = self.content node._bind_to_data(parent) node._insert_text_node_as_next_appended(self) self.content = content elif self._position is TAIL: assert isinstance(self._bound_to, _Element) left_sibling = _wrapper_cache(self._bound_to) content = self.content node._bind_to_tail(left_sibling) node._insert_text_node_as_next_appended(self) self.content = content elif self._position is APPENDED: assert node._appended_text_node is None previous = self._bound_to assert isinstance(previous, TextNode) previous._appended_text_node = None previous._insert_text_node_as_next_appended(node) node._insert_text_node_as_next_appended(self) else: raise ValueError( f"A TextNode._position must not be set to {self._position}" ) @property def _tail_sequence_head(self) -> TextNode: if self._position in (DATA, TAIL): return self elif self._position is APPENDED: assert isinstance(self._bound_to, TextNode) return self._bound_to._tail_sequence_head else: raise InvalidCodePath
# contributed node filters and filter wrappers
[docs] def any_of(*filter: Filter) -> Filter: """ A node filter wrapper that matches when any of the given filters is matching, like a boolean ``or``. """ def any_of_wrapper(node: NodeBase) -> bool: return any(x(node) for x in filter) return any_of_wrapper
[docs] def is_comment_node(node: NodeBase) -> bool: """ A node filter that matches :class:`CommentNode` instances. """ return isinstance(node, CommentNode)
[docs] def is_processing_instruction_node(node: NodeBase) -> bool: """ A node filter that matches :class:`ProcessingInstructionNode` instances. """ return isinstance(node, ProcessingInstructionNode)
def is_root_node(node: NodeBase) -> bool: """ A node filter that matches root nodes. """ return node.parent is None
[docs] def is_tag_node(node: NodeBase) -> bool: """ A node filter that matches :class:`TagNode` instances. """ return isinstance(node, TagNode)
[docs] def is_text_node(node: NodeBase) -> bool: """ A node filter that matches :class:`TextNode` instances. """ return isinstance(node, TextNode)
[docs] def not_(*filter: Filter) -> Filter: """ A node filter wrapper that matches when the given filter is not matching, like a boolean ``not``. """ def not_wrapper(node: NodeBase) -> bool: return not all(f(node) for f in filter) return not_wrapper
# serializer def _get_serializer( writer: _SerializationWriter, format_options: Optional[FormatOptions], namespaces: Optional[NamespaceDeclarations], ) -> Serializer: if format_options is None: return Serializer( writer=writer, namespaces=namespaces, ) if format_options.indentation and not format_options.indentation.isspace(): raise ValueError("Invalid indentation characters.") if format_options.width: return TextWrappingSerializer( writer=writer, format_options=format_options, namespaces=namespaces, ) else: return PrettySerializer( writer=writer, format_options=format_options, namespaces=namespaces, ) class Serializer: __slots__ = ( "_namespaces", "_prefixes", "writer", ) def __init__( self, writer: _SerializationWriter, *, namespaces: Optional[NamespaceDeclarations] = None, ): self._namespaces = ( Namespaces({}) if namespaces is None else Namespaces(namespaces) ) self._prefixes: dict[str, str] = {} self.writer = writer def _collect_prefixes(self, root: TagNode): # noqa: C901 REMOVE eventually for node in traverse_bf_ltr_ttb(root, is_tag_node): assert isinstance(node, TagNode) for namespace in {node._namespace} | { a._namespace for a in node.attributes.values() }: if namespace in self._prefixes: continue prefix: str | None if not namespace: for other_namespace, prefix in self._prefixes.items(): if prefix == "": # hence there's a default namespace in use, it needs to use # a prefix though as an empty namespace can't be declared assert other_namespace self._new_namespace_declaration(other_namespace) break self._prefixes[""] = "" continue # TODO unlike lxml, delb will probably not keep the individual # namespace declarations per node in its representation. # that should render a lot of the following unnecessary # with the lxml backend, a node retains prefix declarations from the # parsed source with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) self._namespaces._fallback = node.namespaces try: prefix = self._namespaces.lookup_prefix(namespace) except KeyError: assert isinstance(namespace, str) self._new_namespace_declaration(namespace) continue if prefix is not None: # that found prefix still needs a colon for faster serialisat # composition later self._prefixes[namespace] = f"{prefix}:" else: # now we need to come up with a new prefix if "" not in self._prefixes.values(): # be bold and claim to be the default self._prefixes[namespace] = "" else: assert isinstance(namespace, str) self._new_namespace_declaration(namespace) def _generate_attributes_data(self, node: TagNode) -> dict[str, str]: data = {} for attribute in (node.attributes[a] for a in sorted(node.attributes)): assert isinstance(attribute, Attribute) data[self._prefixes[attribute._namespace] + attribute.local_name] = ( f'"{attribute.value.translate(CCE_TABLE_FOR_ATTRIBUTES)}"' ) return data def _handle_child_nodes(self, child_nodes: tuple[NodeBase, ...]): for child_node in child_nodes: self.serialize_node(child_node) def _new_namespace_declaration(self, namespace: str): for i in range(2**16): prefix = f"ns{i}:" if prefix not in self._prefixes.values(): self._prefixes[namespace] = prefix return else: raise NotImplementedError("Just don't.") def _serialize_attributes(self, attributes_data): for key, value in attributes_data.items(): self.writer(f" {key}={value}") def serialize_node(self, node: NodeBase): if isinstance(node, (CommentNode, ProcessingInstructionNode)): self.writer(str(node)) elif isinstance(node, TagNode): self._serialize_tag( node, attributes_data=self._generate_attributes_data(node), ) elif isinstance(node, TextNode) and node.content: self.writer(node.content.translate(CCE_TABLE_FOR_TEXT)) else: raise InvalidCodePath def serialize_root(self, root: TagNode): self._collect_prefixes(root) attributes_data = {} declarations = {p: "" if n is None else n for n, p in self._prefixes.items()} if "" in declarations: default_namespace = declarations.pop("") if default_namespace: attributes_data["xmlns"] = f'"{default_namespace}"' # else it would be an unprefixed, empty namespace for prefix in sorted(p for p in declarations if p[:-1] not in GLOBAL_PREFIXES): assert len(prefix) >= 2 # at least a colon and one letter attributes_data[f"xmlns:{prefix[:-1]}"] = f'"{declarations[prefix]}"' attributes_data.update(self._generate_attributes_data(root)) self._serialize_tag(root, attributes_data=attributes_data) def _serialize_tag( self, node: TagNode, attributes_data: dict[str, str], ): child_nodes = tuple(node.iterate_children()) prefixed_name = self._prefixes[node._namespace] + node.local_name self.writer(f"<{prefixed_name}") if attributes_data: self._serialize_attributes(attributes_data) if child_nodes: self.writer(">") self._handle_child_nodes(child_nodes) self.writer(f"</{prefixed_name}>") else: self.writer("/>") class _LineFittingSerializer(Serializer): __slots__ = ("space",) def __init__( self, writer: _SerializationWriter, *, namespaces: Optional[NamespaceDeclarations] = None, ): self.writer: _LengthTrackingWriter super().__init__(writer, namespaces=namespaces) self.space: Literal["default", "preserve"] = "default" def serialize_node(self, node: NodeBase): if isinstance(node, TextNode): if node.content: if self.space == "default": content = _crunch_whitespace(node.content).translate( CCE_TABLE_FOR_TEXT ) else: content = node.content.translate(CCE_TABLE_FOR_TEXT) self.writer(content) return if isinstance(node, TagNode): space_state = self.space space = node._get_normalize_space_directive(self.space) if space != space_state: self.space = space self.writer.preserve_space = space == "default" super().serialize_node(node) if isinstance(node, TagNode) and space != space_state: self.space = space_state self.writer.preserve_space = space_state == "default" class PrettySerializer(Serializer): __slots__ = ( "_align_attributes", "indentation", "_level", "_serialization_root", "_space_preserving_serializer", "_text_wrapper", "_unwritten_text_nodes", ) def __init__( self, writer: _SerializationWriter, format_options: FormatOptions, *, namespaces: Optional[NamespaceDeclarations] = None, ): super().__init__(writer, namespaces=namespaces) self._align_attributes = format_options.align_attributes self.indentation = format_options.indentation self._level = 0 self._serialization_root: None | TagNode = None self._space_preserving_serializer = Serializer( self.writer, namespaces=self._namespaces ) self._unwritten_text_nodes: list[TextNode] = [] def _collect_prefixes(self, root: TagNode): super()._collect_prefixes(root) self._space_preserving_serializer._prefixes = self._prefixes def _handle_child_nodes(self, child_nodes: tuple[NodeBase, ...]): # newline between an opening tag and its first child if self._whitespace_is_legit_before_node(child_nodes[0]): self.writer("\n") self._level += 1 self._serialize_child_nodes(child_nodes) self._level -= 1 if self.indentation and self._whitespace_is_legit_after_node(child_nodes[-1]): # indentation before closing tag # newline was possibly written in self.serialize_node() # or self._serialize_text self.writer(self._level * self.indentation) def _normalize_text(self, text: str) -> str: return _crunch_whitespace(text).translate(CCE_TABLE_FOR_TEXT) def _serialize_attributes(self, attributes_data: dict[str, str]): if self._align_attributes and len(attributes_data) > 1: key_width = max(len(k) for k in attributes_data) for key, value in attributes_data.items(): self.writer( f"\n{self._level * self.indentation} {self.indentation}" f"{' ' * (key_width - len(key))}{key}={value}" ) if self.indentation: self.writer(f"\n{self._level * self.indentation}") else: super()._serialize_attributes(attributes_data) def _serialize_child_nodes(self, child_nodes: tuple[NodeBase, ...]): for node in child_nodes: if isinstance(node, TextNode): if node.content: self._unwritten_text_nodes.append(node) else: self.serialize_node(node) if self._unwritten_text_nodes: self._serialize_text() def serialize_node(self, node: NodeBase): if self._unwritten_text_nodes: self._serialize_text() if self.indentation and self._whitespace_is_legit_before_node(node): self.writer(self._level * self.indentation) super().serialize_node(node) if self._whitespace_is_legit_after_node(node): self.writer("\n") def serialize_root(self, root: TagNode): self._serialization_root = root super().serialize_root(root) self._serialization_root = None def _serialize_tag( self, node: TagNode, attributes_data: dict[str, str], ): if node._get_normalize_space_directive() == "preserve": self._space_preserving_serializer._serialize_tag( node=node, attributes_data=attributes_data, ) else: super()._serialize_tag( node, attributes_data=attributes_data, ) def _serialize_text(self): nodes = self._unwritten_text_nodes content = self._normalize_text("".join(n.content for n in nodes)) # a whitespace-only node can be omitted as a newline has been inserted before if content == " ": nodes.clear() return if self.indentation and self._whitespace_is_legit_before_node(nodes[0]): content = self._level * self.indentation + content.lstrip() if self._whitespace_is_legit_after_node(nodes[-1]): content = content.rstrip() + "\n" assert content self.writer(content) nodes.clear() def _whitespace_is_legit_after_node(self, node: NodeBase) -> bool: if self._serialization_root is None or node is self._serialization_root: # end of stream return False if node.parent.last_child is node: return True if isinstance(node, TextNode): return node.content[-1].isspace() following_sibling = node.fetch_following_sibling() assert following_sibling is not None if isinstance(following_sibling, TextNode): return self._whitespace_is_legit_before_node(following_sibling) else: return False def _whitespace_is_legit_before_node(self, node: NodeBase) -> bool: if self._serialization_root is None or node is self._serialization_root: # begin of stream return False if node.index == 0: return True if isinstance(node, TextNode): return node.content[0].isspace() preceding_sibling = node.fetch_preceding_sibling() assert preceding_sibling is not None if isinstance(preceding_sibling, TextNode): return self._whitespace_is_legit_after_node(preceding_sibling) else: return False class TextWrappingSerializer(PrettySerializer): __slots__ = ( "_line_fitting_serializer", "_width", ) def __init__( self, writer: _SerializationWriter, format_options: FormatOptions, *, namespaces: Optional[NamespaceDeclarations] = None, ): if format_options.width < 1: raise ValueError self.writer: _LengthTrackingWriter super().__init__( writer=_LengthTrackingWriter(writer.buffer), format_options=format_options, namespaces=namespaces, ) self._line_fitting_serializer = _LineFittingSerializer( self.writer, namespaces=self._namespaces ) self._width = format_options.width @property def _available_space(self): return max(0, self._width - self._line_offset) def _collect_prefixes(self, root: TagNode): super()._collect_prefixes(root) self._line_fitting_serializer._prefixes = self._prefixes @property def _line_offset(self) -> int: if self.writer.offset: return self.writer.offset - self._level * len(self.indentation) else: return 0 def _node_fits_remaining_line(self, node: NodeBase) -> bool: return self._required_space(node, self._available_space) is not None def _required_space(self, node: NodeBase, up_to: int) -> None | int: # counts required space for the serialisation of a node # a returned None signals that the limit up_to was hit if isinstance(node, TextNode): return self._required_space_for_text(node, up_to) if isinstance(node, (CommentNode, ProcessingInstructionNode)): length = len(str(node)) return length if length <= up_to else None assert isinstance(node, TagNode) name_length = len(node.local_name) + len(self._prefixes[node._namespace]) if len(node) == 0: used_space = 3 + name_length # <N/> else: used_space = 5 + 2 * name_length # <N>…<N/> if used_space > up_to: return None if ( attributes_space := self._required_space_for_attributes( node, up_to - used_space ) ) is None: return None used_space += attributes_space for child_node in node.iterate_children(): if ( child_space := self._required_space(child_node, up_to - used_space) ) is None: return None used_space += child_space if ( following_space := self._required_space_for_following( node, up_to - used_space ) ) is None: return None used_space += following_space return used_space def _required_space_for_attributes(self, node: TagNode, up_to: int) -> None | int: result = 0 attribute_names = tuple(node.attributes) for attribute in (node.attributes[n] for n in attribute_names): assert attribute is not None result += ( 4 # preceding space and »="…"« + len(attribute.local_name) + len(self._prefixes[attribute._namespace]) + len(attribute.value.translate(CCE_TABLE_FOR_ATTRIBUTES)) ) if result > up_to: return None return result def _required_space_for_following(self, node: TagNode, up_to: int) -> None | int: if self._whitespace_is_legit_after_node(node): return 0 if (following := node.fetch_following()) is None: return 0 if not isinstance(following, TextNode): return self._required_space(following, up_to) # indeed this doesn't consider the case where a first # whitespace would appear in one of possible subsequent text # nodes content = following.content.translate(CCE_TABLE_FOR_TEXT) if (length := content.find(" ")) == -1: length = len(content) return length if length <= up_to else None def _required_space_for_text(self, node: TextNode, up_to: int) -> None | int: content = self._normalize_text(node.content) if content.startswith(" ") and self._whitespace_is_legit_before_node(node): content = content.lstrip() if content.endswith(" ") and self._whitespace_is_legit_after_node(node): content = content.rstrip() length = len(content) return length if length <= up_to else None def _serialize_appendable_node(self, node: NodeBase): assert not isinstance(node, TextNode) if self.writer.offset == 0 and self.indentation: self.writer(self._level * self.indentation) if isinstance(node, TagNode): if node._get_normalize_space_directive() == "preserve": serializer = self._space_preserving_serializer self.writer.preserve_space = True else: serializer = self._line_fitting_serializer serializer.serialize_node(node) self.writer.preserve_space = False elif isinstance(node, (CommentNode, ProcessingInstructionNode)): self.writer(str(node)) def serialize_node(self, node: NodeBase): if self._unwritten_text_nodes: self._serialize_text() if self._node_fits_remaining_line(node): self._serialize_appendable_node(node) if ( (not self._available_space) or (node is node.parent.last_child) ) and self._whitespace_is_legit_after_node(node): self.writer("\n") return else: if self._line_offset > 0 and self._whitespace_is_legit_before_node(node): self.writer("\n") self.serialize_node(node) return if ( self.indentation and self.writer.offset == 0 and self._whitespace_is_legit_before_node(node) ): self.writer(self._level * self.indentation) if ( isinstance(node, TagNode) and node._get_normalize_space_directive() == "preserve" ): self.writer.preserve_space = True super(PrettySerializer, self).serialize_node(node) self.writer.preserve_space = False if self._whitespace_is_legit_after_node(node) and not ( ( (following := node.fetch_following()) is not None and self._node_fits_remaining_line(following) ) ): self.writer("\n") def _serialize_tag( self, node: TagNode, attributes_data: dict[str, str], ): if node is not self._serialization_root and self._node_fits_remaining_line( node ): self._serialize_appendable_node(node) else: super()._serialize_tag(node, attributes_data) def _serialize_text(self): nodes = self._unwritten_text_nodes content = self._normalize_text("".join(n.content for n in nodes)) last_node = nodes[-1] if self._available_space == len( content.rstrip() ) and self._whitespace_is_legit_after_node(last_node): # text fits perfectly self.writer(content.rstrip() + "\n") elif self._available_space > len(content): # text fits current line if self._line_offset == 0: content = self._level * self.indentation + content.lstrip() if ( (last_node is last_node.parent.last_child) or (following := last_node.fetch_following()) is not None and ( self._whitespace_is_legit_before_node(following) and self._required_space( following, self._available_space - len(content) ) is None ) ): content = content.rstrip() + "\n" self.writer(content) elif content == " ": # " " doesn't fit line self.writer("\n") else: # text doesn't fit current line self._serialize_text_over_lines(content) nodes.clear() def _serialize_text_over_lines(self, content: str): lines: list[str] = [] nodes = self._unwritten_text_nodes if self._line_offset == 0: # just a side note: this branch would also be interesting if # len(content) > self._width # noqa: E800 # and self._whitespace_is_legit_before_node(nodes[0]) # and one would know that the line starts with text content # # so for now that's why text might start after a closing tag of an # element that spans multiple lines on their last one # (i.e. this prefers: "the <hi>A</hi> letter…" over the "A" on a # separate line which the next branch produces) if self._whitespace_is_legit_before_node(nodes[0]): lines.append("") lines.extend(self._wrap_text(content.lstrip(), self._width)) else: if content.startswith(" "): filling = " " + next( self._wrap_text(content[1:], self._available_space - 1) ) else: filling = next(self._wrap_text(content, self._available_space)) if not ( len(filling) > self._available_space and self._whitespace_is_legit_before_node(nodes[0]) ): self.writer(filling) content = content[len(filling) + 1 :] if not content: return lines.append("") lines.extend(self._wrap_text(content, self._width)) if ( lines[-1].endswith(" ") and self._whitespace_is_legit_after_node(nodes[-1]) and (following_sibling := nodes[-1].fetch_following_sibling()) is not None and not self._required_space( following_sibling, self._width - len(lines[-1]) ) ): lines.append("") self._consolidate_text_lines(lines) prefix = self._level * self.indentation for line in lines[:-1]: if line == "": self.writer("\n") else: self.writer(f"{prefix}{line}\n") if lines[-1] != "": self.writer(f"{prefix}{lines[-1]}") def _consolidate_text_lines(self, lines: list[str]): last_node = self._unwritten_text_nodes[-1] assert last_node.parent is not None if ( lines[0] == "" and last_node is last_node.parent.last_child and self._whitespace_is_legit_after_node(last_node) ): lines.append("") if self._line_offset == 0 and lines[0] == "": lines.pop(0) if len(lines) >= 2 and lines[-1] == "": lines[-2] = lines[-2].rstrip() @staticmethod def _wrap_text(text: str, width: int) -> Iterator[str]: while len(text) > width: if (index := text.rfind(" ", 0, width + 1)) > -1 or ( index := text.find(" ", width) ) > 0: yield text[:index] text = text[index + 1 :] else: yield text return if text: yield text
[docs] class FormatOptions(NamedTuple): """ Instances of this class can be used to define serialization formatting that is not so hard to interpret for instances of Homo sapiens s., but more costly to compute. When it's employed whitespace contents will be collapsed and trimmed, newlines will be inserted to improve readability, but only where further whitespace reduction would drop it again. The serialization respects when a tag node bears the ``xml:space`` attribute with the value ``preserve``. But if any descendent of such annotated node signals to allow whitespace alterations again that has no effect. Such attributes with invalid values are ignored. """ align_attributes: bool = False """ Determines whether attributes' names and values line up sharply around vertically aligned equal signs. """ indentation: str = "\t" """ This string prefixes descending nodes' contents one time per depth level. """ width: int = 0 """ A positive value indicates that text nodes shall get wrapped at this character position. Indentations are not considered as part of text. This parameter is purposed to define reasonable widths for text displays that could be scrolled horizontally. """
[docs] class DefaultStringOptions: """ This object's class variables are used to configure the serialization parameters that are applied when nodes are coerced to :class:`str` objects. Hence it also applies when node objects are fed to the :func:`print` function and in other cases where objects are implicitly cast to strings. .. attention:: Use this once to define behaviour on *application level*. For thread-safe serializations of nodes with diverging parameters use :meth:`NodeBase.serialize`! Think thrice whether you want to use this facility in a library. """ namespaces: ClassWar[None | NamespaceDeclarations] = None """ A mapping of prefixes to namespaces. These are overriding possible declarations from a parsed serialisat that the document instance stems from. Any other prefixes for undeclared namespaces are enumerated with the prefix ``ns``. """ newline: ClassWar[None | str] = None """ See :class:`io.TextIOWrapper` for a detailed explanation of the parameter with the same name. """ format_options: ClassWar[None | FormatOptions] = None """ An instance of :class:`FormatOptions` can be provided to configure formatting. """ @classmethod def _get_serializer(cls) -> Serializer: return _get_serializer( _StringWriter(newline=cls.newline), format_options=cls.format_options, namespaces=cls.namespaces, )
[docs] @classmethod def reset_defaults(cls): """Restores the factory settings.""" cls.format_options = None cls.namespaces = None cls.newline = None
class _SerializationWriter(ABC): __slots__ = ("buffer",) def __init__(self, buffer: TextIO): self.buffer = buffer def __call__(self, data: str): self.buffer.write(data) @property def result(self): if isinstance(self.buffer, StringIO): return self.buffer.getvalue() raise TypeError("Underlying buffer must be an instance of `io.StingIO`") class _LengthTrackingWriter(_SerializationWriter): __slots__ = ("offset", "preserve_space") def __init__(self, buffer: TextIO): super().__init__(buffer) self.offset = 0 self.preserve_space = False def __call__(self, data: str): if not self.preserve_space and self.offset == 0: data = data.lstrip("\n") if not data: return if data[-1] == "\n": self.offset = 0 else: if (index := data.rfind("\n")) == -1: self.offset += len(data) else: self.offset = len(data) - (index + 1) super().__call__(data) class _StringWriter(_SerializationWriter): def __init__(self, newline: Optional[str] = None): super().__init__(StringIO(newline=newline)) class _TextBufferWriter(_SerializationWriter): def __init__( self, buffer: TextIOWrapper, encoding: str = "utf-8", newline: Optional[str] = None, ): buffer.reconfigure(encoding=encoding, newline=newline) super().__init__(buffer) # __all__ = ( Attribute.__name__, CommentNode.__name__, DefaultStringOptions.__name__, FormatOptions.__name__, NodeBase.__name__, ProcessingInstructionNode.__name__, QueryResults.__name__, TagAttributes.__name__, TagNode.__name__, TextNode.__name__, altered_default_filters.__name__, any_of.__name__, is_comment_node.__name__, is_processing_instruction_node.__name__, is_root_node.__name__, is_tag_node.__name__, is_text_node.__name__, not_.__name__, new_comment_node.__name__, new_processing_instruction_node.__name__, new_tag_node.__name__, tag.__name__, )