Source code for _delb.plugins.core_loaders

# Copyright (C) 2018-'25  Frank Sachsenheim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


"""
The ``core_loaders`` module provides a set of loaders to retrieve documents from
various data sources.
"""

from __future__ import annotations

from contextlib import suppress
from io import IOBase, UnsupportedOperation
from pathlib import Path
from typing import TYPE_CHECKING, Any

from _delb.builder import parse_nodes
from _delb.plugins import plugin_manager
from _delb.typing import _DocumentNodeType, TagNodeType

if TYPE_CHECKING:
    from types import SimpleNamespace

    from _delb.typing import LoaderResult


# TODO rename to node_loader
def tag_node_loader(data: Any, config: SimpleNamespace) -> LoaderResult:
    """
    Loads from node objects that already exist.

    A document node yields deep clones of all its child nodes. A tag node
    (:class:`delb.typing.TagNodeType`) without a parent is used as it is, while
    one whose parent is a document node is cloned deeply. A tag node with any
    other parent is rejected, as is any input that is neither of these types; in
    both cases a message with the reason is returned instead of nodes.
    """
    if isinstance(data, _DocumentNodeType):
        return tuple(child.clone(deep=True) for child in data._child_nodes)

    if not isinstance(data, TagNodeType):
        return "The input value is not a TagNode instance."

    parent = data._parent
    if parent is None:
        # a detached root can be adopted without copying
        return (data,)
    if isinstance(parent, _DocumentNodeType):
        # the node is bound to another document, hence a copy is handed out
        return (data.clone(deep=True),)
    return "Node has a parent node."
@plugin_manager.register_loader()
def path_loader(data: Any, config: SimpleNamespace) -> LoaderResult:
    """
    Loads a document from the file that a :class:`pathlib.Path` instance points
    to.

    Unless ``source_url`` is already present on the passed configuration (the
    document's :attr:`delb.Document.config`), it is set to the file URI of the
    path, where a relative path is anchored at the current working directory.
    The file is opened in binary mode and handed over to :func:`buffer_loader`.
    """
    if not isinstance(data, Path):
        return "The input value is not a pathlib.Path instance."

    if not hasattr(config, "source_url"):
        anchored_path = Path.cwd() / data
        config.source_url = anchored_path.as_uri()

    with data.open("rb") as stream:
        return buffer_loader(stream, config)
@plugin_manager.register_loader(after=path_loader)
def buffer_loader(data: Any, config: SimpleNamespace) -> LoaderResult:
    """
    This loader loads a document from a :term:`file-like object` that reads binary
    data.

    If the configuration has no ``source_url`` yet and the buffer's ``name``
    attribute is a string or byte sequence that refers to an existing file
    (relative names are anchored at the current working directory), that file's
    URI is stored as ``source_url``. Seekable buffers are rewound to their start
    before parsing. Inputs that are no :class:`io.IOBase` instances are rejected
    with a message.
    """
    if isinstance(data, IOBase):
        if (
            not hasattr(config, "source_url")
            and isinstance(name := getattr(data, "name", None), (bytes, str))
            and (
                path := Path.cwd()
                / Path(name if isinstance(name, str) else name.decode())
            ).is_file()
        ):
            config.source_url = path.as_uri()

        # streams that cannot seek are read from their current position
        with suppress(UnsupportedOperation):
            data.seek(0)

        # ``source_url`` can still be missing at this point, e.g. for in-memory
        # buffers without a ``name``; fall back to no base URL (as
        # :func:`text_loader` passes it) instead of raising an AttributeError
        return tuple(
            parse_nodes(
                data,
                config.parser_options,
                base_url=getattr(config, "source_url", None),
            )
        )
    return "The input value is no buffer object."
@plugin_manager.register_loader()
def text_loader(data: Any, config: SimpleNamespace) -> LoaderResult:
    """
    Parses a full document that is passed as a string or a byte sequence. The
    parser is invoked with the configuration's ``parser_options`` and without a
    base URL.
    """
    if not isinstance(data, (bytes, str)):
        return "The input value is not a byte sequence or a string."

    nodes = parse_nodes(data, config.parser_options, base_url=None)
    return tuple(nodes)
# The module's public names, derived from the loader objects' ``__name__``
# attributes rather than spelled out as string literals.
__all__ = (
    buffer_loader.__name__,
    path_loader.__name__,
    tag_node_loader.__name__,
    text_loader.__name__,
)