Source code for _delb.plugins.core_loaders

# Copyright (C) 2018-'22  Frank Sachsenheim
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <>.

The ``core_loaders`` module provides a set loaders to retrieve documents from various
data sources.

from __future__ import annotations

from contextlib import suppress
from copy import deepcopy
from io import IOBase, UnsupportedOperation
from pathlib import Path
from typing import TYPE_CHECKING, cast, Any, IO
from warnings import warn

from lxml import etree

from _delb import utils
from _delb.nodes import TagNode
from _delb.plugins import plugin_manager

    from types import SimpleNamespace

    from _delb.typing import LoaderResult

[docs]def tag_node_loader(data: Any, config: SimpleNamespace) -> LoaderResult: """ This loader loads, or rather clones, a :class:`delb.TagNode` instance and its descendant nodes. """ if isinstance(data, TagNode): tree = etree.ElementTree() root = data.clone(deep=True) tree._setroot(root._etree_obj) utils._copy_root_siblings(data._etree_obj, root._etree_obj) return tree return "The input value is not a TagNode instance."
[docs]@plugin_manager.register_loader() def etree_loader(data: Any, config: SimpleNamespace) -> LoaderResult: """ This loader processes :class:`lxml.etree._Element` and :class:`lxml.etree._ElementTree` instances. """ if isinstance(data, etree._ElementTree): return deepcopy(data) if isinstance(data, etree._Element): return etree.ElementTree(element=deepcopy(data), parser=config.parser) return "The input value is neither an etree.Element or …Tree instance."
[docs]@plugin_manager.register_loader(after=etree_loader) def path_loader(data: Any, config: SimpleNamespace) -> LoaderResult: """ This loader loads from a file that is pointed at with a :class:`pathlib.Path` instance. That instance will be bound to ``source_path`` on the document's :attr:`Document.config` attribute. """ if isinstance(data, Path): config.source_url = f"file://{data}" with"r") as file: return buffer_loader(file, config) return "The input value is not a pathlib.Path instance."
[docs]@plugin_manager.register_loader(after=path_loader) def buffer_loader(data: Any, config: SimpleNamespace) -> LoaderResult: """ This loader loads a document from a :term:`file-like object`. """ if isinstance(data, IOBase): with suppress(UnsupportedOperation): return etree.parse(cast("IO", data), parser=config.parser) return "The input value is no buffer object."
[docs]@plugin_manager.register_loader(after=buffer_loader) def ftp_loader(data: Any, config: SimpleNamespace) -> LoaderResult: """ Loads a document from a URL with either the ``ftp`` schema. The URL will be bound to ``source_url`` on the document's :attr:`Document.config` attribute. """ if isinstance(data, str) and data.lower().startswith("ftp://"): warn("The FTP loader will be removed in a future release.") result = etree.parse(data, parser=config.parser) config.source_url = data return result return "The input value is not an URL with the ftp scheme."
[docs]@plugin_manager.register_loader(after=ftp_loader) def text_loader(data: Any, config: SimpleNamespace) -> LoaderResult: """ Parses a string containing a full document. """ if isinstance(data, str): data = data.encode() if isinstance(data, bytes): root = etree.fromstring(data, config.parser) return etree.ElementTree(element=root) return "The input value is not a byte sequence."
__all__ = ( buffer_loader.__name__, etree_loader.__name__, ftp_loader.__name__, path_loader.__name__, tag_node_loader.__name__, text_loader.__name__, )