# (c) 2021-2023 Martin Wendt; see https://github.com/mar10/nutree
# Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""
Declare the :class:`~nutree.node.Node` class.
"""
from __future__ import annotations
import re
from operator import attrgetter
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, Iterable, Iterator
from nutree.mermaid import (
MermaidDirectionType,
MermaidEdgeMapperCallbackType,
MermaidFormatType,
MermaidNodeMapperCallbackType,
node_to_mermaid_flowchart,
)
if TYPE_CHECKING: # Imported by type checkers, but prevent circular includes
from .tree import Tree
from .common import (
CONNECTORS,
AmbiguousMatchError,
DataIdType,
DeserializeMapperType,
IterMethod,
KeyMapType,
MapperCallbackType,
PredicateCallbackType,
SelectBranch,
SerializeMapperType,
SkipBranch,
StopTraversal,
TraversalCallbackType,
UniqueConstraintError,
ValueMapType,
call_mapper,
call_predicate,
call_traversal_cb,
)
from .dot import node_to_dot
from .rdf import RDFMapperCallbackType, node_to_rdf
# ------------------------------------------------------------------------------
# - Node
# ------------------------------------------------------------------------------
[docs]
class Node:
"""
A Node represents a single element in the tree.
It is a shallow wrapper around a user data instance, that adds navigation,
modification, and other functionality.
Node objects are rarely created using this contructor, but indirectly by
invoking helper methods like :meth:`~nutree.node.Node.add_child`, etc.
`data`
is the arbitrary object that this node will hold.
`parent`
is the parent :meth:`~nutree.node.Node` instance. This node will
also inherit the tree reference from it.
`data_id`
is an optional integer or string that will be used as ID
(instead of calculating ``hash(data)``).
A tree may contain more than one node with the same data and data_id.
In this case we call the nodes 'clones'.
`node_id`
is an optional integer, that is used as unique ID for this node.
Even 'clones' must have unique node IDs. The default is calculated as
`id(self)`.
`meta`
is an optional dictionary. See also :meth:`~nutree.node.Node.get_meta`,
:meth:`~nutree.node.Node.set_meta`, :meth:`~nutree.node.Node.update_meta`,
and :meth:`~nutree.node.Node.clear_meta`.
"""
# Slots may reduce node size (about 20% smaller):
__slots__ = (
"__weakref__", # Allow weak references to Nodes
"_children",
"_data_id",
"_data",
"_meta",
"_node_id",
"_parent",
"_tree",
)
#: Default value for ``repr`` argument when formatting data for print/display.
DEFAULT_RENDER_REPR = "{node.data!r}"
# #: Default value for ``repr`` argument when formatting data for export,
# #: like DOT, RDF, ...
# DEFAULT_NAME_REPR = "{node.data!r}"
# # DEFAULT_NAME_REPR = "{node.data}"
def __init__(
self,
data,
*,
parent: Node,
data_id: DataIdType | None = None,
node_id: int | None = None,
meta: dict = None,
):
self._data = data
self._parent: Node = parent
tree = parent._tree
self._tree: Tree = tree
self._children: list[Node] = None
if data_id is None:
self._data_id: DataIdType = tree.calc_data_id(data)
else:
self._data_id: DataIdType = data_id
if node_id is None:
self._node_id: int = id(self)
else:
self._node_id = int(node_id)
self._meta = meta
tree._register(self)
def __repr__(self) -> str:
return f"{self.__class__.__name__}<{self.name!r}, data_id={self.data_id}>"
[docs]
def __eq__(self, other) -> bool:
"""Implement ``node == other`` syntax to compare embedded data.
If `other` is a :class:`Node` instance, ``self.data == other.data`` is
evaluated.
Otherwise ``self.data == other`` is compared.
Use ``node is other`` syntax instead to check if two nodes are truly
identical.
"""
if isinstance(other, Node):
return self._data == other._data
return self._data == other
[docs]
def __getattr__(self, name: str) -> Any:
"""Implement ``node.NAME`` aliasing to ``node.data.NAME``.
See :ref:`shadow-attributes`.
"""
if self._tree._shadow_attrs:
return getattr(self.data, name)
raise AttributeError
# def __iadd__(self, other) -> None:
# """Add child node(s)."""
# if isinstance(other, (Node, str)):
# self.add_child(other)
# elif isinstance(other, (list, tuple)):
# for o in other:
# self += o
# else:
# raise NotImplementedError
# Do not define __len__: we don't want leaf nodes to evaluate as falsy
# def __len__(self) -> int:
# raise NotImplementedError("Use `len(node.data)` or `len(node._children)`.")
@property
def name(self) -> str:
"""String representation of the embedded `data` object."""
return f"{self.data}"
@property
def path(self) -> str:
"""All ancestor names including self, starting with and separated by '/'."""
return self.get_path(repr="{node.name}")
@property
def tree(self) -> Tree:
"""Return container :class:`~nutree.tree.Tree` instance."""
return self._tree
@property
def parent(self) -> Node:
"""Return parent node or None for toplevel nodes."""
p = self._parent
return p if p._parent else None
@property
def children(self) -> list[Node]:
"""Return list of direct child nodes (list may be empty)."""
c = self._children
return [] if c is None else c
@property
def data(self) -> Any:
"""Return the wrapped data instance (use :meth:`~nutree.tree.Tree.set_data()`
to modify)."""
return self._data
@property
def data_id(self) -> DataIdType:
"""Return the wrapped data instance's id
(use :meth:`~nutree.tree.Tree.set_data()` to modify)."""
return self._data_id
@property
def node_id(self) -> int:
"""Return the node's unique key."""
return self._node_id
@property
def meta(self) -> dict | None:
"""Return the node's metadata dictionary or None if empty.
See also :meth:`get_meta`, :meth:`set_meta`, :meth:`update_meta`,
:meth:`clear_meta`.
"""
return self._meta
[docs]
def rename(self, new_name: str) -> None:
"""Set `self.data` to a new string (assuming plain string node)."""
if type(self._data) is str:
return self.set_data(new_name)
raise ValueError("Can only rename plain string nodes")
[docs]
def set_data(self, data, *, data_id=None, with_clones: bool = None) -> None:
"""Change node's `data` and/or `data_id` and update bookkeeping."""
if not data and not data_id:
raise ValueError("Missing data or data_id")
tree = self._tree
if data is None or data is self._data:
new_data = None
else:
new_data = data
if data_id is None:
data_id = tree.calc_data_id(data)
if data_id is None or data_id == self._data_id:
new_data_id = None
else:
new_data_id = data_id
node_map = tree._nodes_by_data_id
cur_nodes = node_map[self._data_id]
has_clones = len(cur_nodes) > 1
if has_clones and with_clones is None:
raise AmbiguousMatchError(
"set_data() for clones requires `with_clones` decision"
)
if new_data_id:
# data_id (and possibly data) changes: we have to update the map
if has_clones:
if with_clones:
# Move the whole slot (but check if new id already exist)
prev_clones = node_map[self._data_id]
del node_map[self._data_id]
try: # are we adding to existing clones now?
node_map[new_data_id].extend(prev_clones)
except KeyError: # still a singleton, just a new data_id
node_map[new_data_id] = prev_clones
for n in prev_clones:
n._data_id = new_data_id
if new_data:
n._data = new_data
else:
# Move this one node to another slot in the map
node_map[self._data_id].remove(self)
try: # are we adding to existing clones again?
node_map[new_data_id].append(self)
except KeyError: # now a singleton with a new data_id
node_map[new_data_id] = [self]
self._data_id = new_data_id
if new_data:
self._data = new_data
else:
# data_id (and possibly data) changed for a *single* node
del node_map[self._data_id]
try: # are we creating a clone now?
node_map[new_data_id].append(self)
except KeyError: # still a singleton, just a new data_id
node_map[new_data_id] = [self]
self._data_id = new_data_id
if new_data:
self._data = new_data
elif new_data:
# `data` changed, but `data_id` remains the same:
# simply replace the reference
if with_clones:
for n in cur_nodes:
n._data = data
else:
self._data = new_data
return
[docs]
def get_children(self) -> list[Node]:
"""Return list of direct child nodes (list may be empty)."""
return self.children
[docs]
def first_child(self) -> Node | None:
"""First direct child node or None if no children exist."""
return self._children[0] if self._children else None
[docs]
def last_child(self) -> Node | None:
"""Last direct child node or None if no children exist."""
return self._children[-1] if self._children else None
[docs]
def get_siblings(self, *, add_self=False) -> list[Node]:
"""Return a list of all sibling entries of self (excluding self) if any."""
if add_self:
return self._parent._children
return [n for n in self._parent._children if n is not self]
[docs]
def first_sibling(self) -> Node:
"""Return first sibling (may be self)."""
return self._parent._children[0]
[docs]
def prev_sibling(self) -> Node | None:
"""Predecessor or None, if node is first sibling."""
if self.is_first_sibling():
return None
idx = self._parent._children.index(self)
return self._parent._children[idx - 1]
[docs]
def next_sibling(self) -> Node | None:
"""Return successor or None, if node is last sibling."""
if self.is_last_sibling():
return None
idx = self._parent._children.index(self)
return self._parent._children[idx + 1]
[docs]
def last_sibling(self) -> Node:
"""Return last node, that share own parent (may be `self`)."""
return self._parent._children[-1]
[docs]
def get_clones(self, *, add_self=False) -> list[Node]:
"""Return a list of all nodes that reference the same data if any."""
clones = self._tree._nodes_by_data_id[self._data_id]
if add_self:
return clones.copy()
return [n for n in clones if n is not self]
[docs]
def depth(self) -> int:
"""Return the distance to the root node (1 for toplevel nodes)."""
return self.calc_depth()
[docs]
def count_descendants(self, *, leaves_only=False) -> int:
"""Return number of descendant nodes, not counting self."""
all = not leaves_only
i = 0
for node in self.iterator():
if all or not node._children:
i += 1
return i
[docs]
def calc_depth(self) -> int:
"""Return the distance to the root node (1 for toplevel nodes)."""
depth = 0
pe = self._parent
while pe is not None:
depth += 1
pe = pe._parent
return depth
[docs]
def calc_height(self) -> int:
"""Return the maximum depth of all descendants (0 for leaves)."""
height = 0
def _ch(n: Node, h: int) -> None:
nonlocal height
c = n._children
if c:
for n in c:
_ch(n, h + 1)
elif h > height:
height = h
_ch(self, 0)
return height
[docs]
def get_index(self) -> int:
"""Return index in sibling list."""
return self._parent._children.index(self)
# --------------------------------------------------------------------------
[docs]
def is_system_root(self) -> bool:
"""Return true if this node is the invisible system root
:class:`~nutree.tree._SystemRootNode`."""
return self._parent is None
[docs]
def is_top(self) -> bool:
"""Return true if this node has no parent."""
return self._parent._parent is None
[docs]
def is_leaf(self) -> bool:
"""Return true if this node is an end node, i.e. has no children."""
return not self._children
[docs]
def is_clone(self) -> bool:
"""Return true if this node's data is referenced at least one more time."""
return bool(len(self._tree._nodes_by_data_id.get(self._data_id)) > 1)
[docs]
def is_first_sibling(self) -> bool:
"""Return true if this node is the first sibling, i.e. the first child
of its parent."""
return self is self._parent._children[0]
[docs]
def is_last_sibling(self) -> bool:
"""Return true if this node is the last sibling, i.e. the last child
of its parent."""
return self is self._parent._children[-1]
[docs]
def has_children(self) -> bool:
"""Return true if this node has one or more children."""
return bool(self._children)
[docs]
def get_top(self) -> Node:
"""Return toplevel ancestor (may be self)."""
root = self
while root._parent._parent:
root = root._parent
return root
[docs]
def is_descendant_of(self, other: Node) -> bool:
"""Return true if this node is direct or indirect child of `other`."""
parent = self._parent
while parent is not None and parent._parent is not None:
if parent is other:
return True
parent = parent._parent
return False
[docs]
def is_ancestor_of(self, other: Node) -> bool:
"""Return true if this node is a parent, grandparent, ... of `other`."""
return other.is_descendant_of(self)
[docs]
def get_common_ancestor(self, other: Node) -> Node | None:
"""Return the nearest node that contains `self` and `other` (may be None)."""
if self._tree is other._tree:
other_parent_set = {
n._node_id for n in other.get_parent_list(add_self=True)
}
for parent in self.get_parent_list(add_self=True, bottom_up=True):
if parent._node_id in other_parent_set:
return parent
return None
[docs]
def get_parent_list(self, *, add_self=False, bottom_up=False) -> list[Node]:
"""Return ordered list of all parent nodes."""
res = []
parent = self if add_self else self._parent
while parent is not None and parent._parent is not None:
res.append(parent)
parent = parent._parent
if not bottom_up:
res.reverse()
return res
[docs]
def get_path(self, *, add_self=True, separator="/", repr="{node.name}") -> str:
"""Return a breadcrumb string, e.g. '/A/a1/a12'."""
res = (repr.format(node=p) for p in self.get_parent_list(add_self=add_self))
return separator + separator.join(res)
# --------------------------------------------------------------------------
[docs]
def add_child(
self,
child: Node | Tree | Any,
*,
before: Node | bool | int | None = None,
deep: bool = None,
data_id=None,
node_id=None,
) -> Node:
"""Append or insert a new subnode or branch as child.
If `child` is an existing :class:`~nutree.node.Node` instance, a copy
of this node will be created, that references the same `child.data`
object. |br|
If `deep` is true, the children are copied recursively.
If `child` is a :class:`~nutree.tree.Tree`, all of its topnodes are
added recursively (unless `deep` is false).
If `child` is neither a :class:`~nutree.node.Node` nor a
:class:`~nutree.tree.Tree`, `child` itself will become the
`data` object of a new node that is added.
The source node may come from the same or from a foreign tree. |br|
Note that adding the same data below one parent is not allowed.
If this node has no children yet, the new node is created as first child.
Otherwise, it will be appended to the existing children by default. |br|
The `before` option may be used to specifiy the position:
- False, None: append the new node as last child
- True, 0: prepend the new node as first child
- <int>: prepend the new node before the existing child with this index
- <Node>: prepend the new node before this child node
Args:
child (Node|Tree|Any):
Either an existing Node or a data object.
before (bool|int|Node|None):
Optional position.
deep (bool):
Copy descendants if any.
This argument defaults to false when `child` is a
:class:`~nutree.node.Node`.
It defaults to true when `child` is a :class:`~nutree.tree.Tree`.
data_id (str|int|None):
Allow to override the new node's `data_id`.
This argument is only allowed for single nodes, but not for
deep copies.
Default `None` will to calculate from ``hash(node.data)``.
node_id (str|int|None):
Optional custom unique node key (defaults to ``id(node)``)
This argument is only allowed for single nodes, but not for
deep copies.
Returns:
the new :class:`~nutree.node.Node` instance
"""
if isinstance(child, self._tree.__class__):
if deep is None:
deep = True
topnodes = child._root.children
if isinstance(before, (int, Node)) or before is True:
topnodes.reverse()
for n in topnodes:
self.add_child(n, before=before, deep=deep)
return
source_node = None
factory = self._tree._node_factory
if isinstance(child, Node):
if deep is None:
deep = False
if deep and data_id is not None or node_id is not None:
raise ValueError("Cannot set ID for deep copies.")
source_node = child
if source_node._tree is self._tree:
if source_node._parent is self._parent:
raise UniqueConstraintError(
f"Same parent not allowed: {source_node}"
)
else:
pass
# raise NotImplementedError("Cross-tree adding")
if data_id and data_id != source_node._data_id:
raise UniqueConstraintError(f"data_id conflict: {source_node}")
# If creating an inherited node, use the parent class as constructor
child_class = child.__class__
node = child_class(
source_node.data, parent=self, data_id=data_id, node_id=node_id
)
else:
node = factory(child, parent=self, data_id=data_id, node_id=node_id)
children = self._children
if children is None:
assert before in (None, True, int, False)
self._children = [node]
elif before is True: # prepend
children.insert(0, node)
elif type(before) is int:
children.insert(before, node)
elif before:
if before._parent is not self:
raise ValueError(
f"`before=node` ({before._parent}) "
f"must be a child of target node ({self})"
)
idx = children.index(before) # raises ValueError
children.insert(idx, node)
else:
children.append(node)
if deep and source_node:
node._add_from(source_node)
return node
#: Alias for :meth:`add_child`
add = add_child
[docs]
def append_child(
self,
child: Node | Tree | Any,
*,
deep=None,
data_id=None,
node_id=None,
):
"""Append a new subnode.
This is a shortcut for :meth:`add_child` with ``before=None``.
"""
return self.add_child(
child, before=None, deep=deep, data_id=data_id, node_id=node_id
)
[docs]
def prepend_child(
self,
child: Node | Tree | Any,
*,
deep=None,
data_id=None,
node_id=None,
):
"""Prepend a new subnode.
This is a shortcut for :meth:`add_child` with ``before=True``.
"""
return self.add_child(
child,
before=self.first_child(),
deep=deep,
data_id=data_id,
node_id=node_id,
)
[docs]
def prepend_sibling(
self,
child: Node | Tree | Any,
*,
deep=None,
data_id=None,
node_id=None,
) -> Node:
"""Add a new node before `self`.
This method calls :meth:`add_child` on ``self.parent``.
"""
return self._parent.add_child(
child, before=self, deep=deep, data_id=data_id, node_id=node_id
)
[docs]
def append_sibling(
self,
child: Node | Tree | Any,
*,
deep=None,
data_id=None,
node_id=None,
) -> Node:
"""Add a new node after `self`.
This method calls :meth:`add_child` on ``self.parent``.
"""
next_node = self.next_sibling()
return self._parent.add_child(
child, before=next_node, deep=deep, data_id=data_id, node_id=node_id
)
[docs]
def move_to(
self,
new_parent: Node | Tree,
*,
before: Node | bool | int | None = None,
):
"""Move this node to another parent.
By default, the node is appended to existing children.
See :meth:`add_child` for a description of `before`.
"""
if new_parent is None:
new_parent = self._tree._root
# elif isinstance(new_parent, Tree):
# new_parent = new_parent._root
elif hasattr(new_parent, "_root"): # it's a Tree
new_parent = new_parent._root
if new_parent._tree is not self._tree:
raise NotImplementedError("Can only move nodes inside same tree")
self._parent._children.remove(self)
self._parent = new_parent
target_siblings = new_parent._children
if target_siblings is None:
assert before in (None, True, False)
new_parent._children = [self]
elif before is True: # prepend
target_siblings.insert(0, self)
elif before:
assert before._parent is new_parent
idx = target_siblings.index(before) # raise ValueError if not found
target_siblings.insert(idx, self)
else:
target_siblings.append(self)
return
[docs]
def remove(self, *, keep_children=False, with_clones=False) -> None:
"""Remove this node.
If `keep_children` is true, all children will be moved one level up,
so they become siblings, before this node is removed.
If `with_clones` is true, all nodes that reference the same data
instance are removed as well.
"""
if with_clones:
for c in self.get_clones(): # Excluding self
c.remove(keep_children=keep_children, with_clones=False)
assert not self.is_clone()
if keep_children:
for c in self.children.copy():
c.move_to(self._parent, before=self)
else:
self.remove_children()
pc = self._parent._children
pc.remove(self)
if not pc: # store None instead of `[]`
pc = self._parent._children = None
self._tree._unregister(self)
[docs]
def remove_children(self) -> None:
"""Remove all children of this node, making it a leaf node."""
_unregister = self._tree._unregister
for n in self._iter_post():
_unregister(n)
self._children = None
return
[docs]
def copy(self, *, add_self=True, predicate: PredicateCallbackType = None) -> Tree:
"""Return a new :class:`~nutree.tree.Tree` instance from this branch.
See also :ref:`iteration-callbacks`.
"""
new_tree = self._tree.__class__()
if add_self:
root = new_tree.add(self)
else:
root = new_tree._root
root._add_from(self, predicate=predicate)
return new_tree
[docs]
def copy_to(
self,
target: Node | Tree,
*,
add_self=True,
before: Node | bool | int | None = None,
deep: bool = False,
) -> Node:
"""Copy this node to another parent and return the new node.
If `add_self` is set, a copy of this node becomes a child of `target`.
Otherwise copies of all children of this node are created below `target`.
By default new nodes are appended to existing children. The `before`
argument defines an alternative positioning.
It is only available when `add_self` is true.
See :meth:`add_child` for a description of `before`.
If `deep` is set, all descendants are copied recursively.
"""
if add_self:
return target.add_child(self, before=before, deep=deep)
assert before is None
res = None
for child in self.children:
n = target.add_child(child, before=None, deep=deep)
res = res or n # Return the first new node
return res
def _add_from(
self, other: Node, *, predicate: PredicateCallbackType = None
) -> None:
"""Append copies of all source descendants to self.
See also :ref:`iteration-callbacks`.
"""
if predicate:
return self._add_filtered(other, predicate)
assert not self._children
for child in other.children:
new_child = self.add_child(child.data, data_id=child._data_id)
if child.children:
# if child.has_children():
new_child._add_from(child, predicate=None)
return
def _add_filtered(self, other: Node, predicate: PredicateCallbackType) -> None:
"""Append a filtered copy of `other` and its descendants as children.
See also :ref:`iteration-callbacks`.
"""
# Stack of parent node objects 2-tuples (is_existing, node), used to
# create optional parents on demand.
# If existing, `node` references this tree. Otherwise, `node` references
# the `other` tree.
parent_stack = [(True, self)]
def _create_parents() -> Node:
"""Materialize all virtual parents and return the last one."""
# print("_create_parents", parent_stack)
p = parent_stack[0][1]
for idx, (existing, n) in enumerate(parent_stack):
if existing:
p = n
else:
p = p.add(n)
parent_stack[idx] = (True, p)
return p
def _visit(other: Node) -> None:
"""Return True if any descendant returned True."""
# print("_visit", parent_stack, other)
for n in other.children:
parent_stack.append((False, n))
res = call_predicate(predicate, n)
if isinstance(res, SkipBranch):
if res.and_self is False:
# Add the node itself if user explicitly returned
# `SkipBranch(and_self=False)`
p = _create_parents()
p.add_child(n)
elif isinstance(res, StopTraversal):
raise res
elif isinstance(res, SelectBranch):
# Unconditionally copy whole branch: no need to visit children
p = _create_parents()
p._add_from(n)
elif res in (None, False): # Add only if has a `true` descendant
_visit(n)
elif res is True: # Add this node (and also check children)
p = _create_parents()
p.add_child(n)
_visit(n)
parent_stack.pop()
return
try:
_visit(other)
except StopTraversal:
pass
return
[docs]
def filtered(self, predicate: PredicateCallbackType) -> Tree:
"""Return a filtered copy of this node and descendants as tree.
See also :ref:`iteration-callbacks`.
"""
return self.copy(add_self=True, predicate=predicate)
[docs]
def filter(self, predicate: PredicateCallbackType) -> None:
"""In-place removal of mismatching nodes.
See also :ref:`iteration-callbacks`.
"""
def _visit(parent: Node) -> bool:
"""Return True if any descendant returned True."""
remove_nodes = []
must_keep = None
for n in parent.children:
res = call_predicate(predicate, n)
if res in (None, False): # Keep only if has a `true` descendant
if _visit(n):
must_keep = True
else:
remove_nodes.append(n)
elif res is True: # Keep this node (and also check children)
_visit(n)
must_keep = True
elif isinstance(res, SelectBranch):
# Unconditionally keep whole branch: no need to visit children
must_keep = True
elif isinstance(res, SkipBranch):
if res.and_self is False:
remove_nodes = n.children
else:
remove_nodes.append(n)
elif isinstance(res, StopTraversal):
raise res
for n in remove_nodes:
n.remove()
return must_keep
try:
_visit(self)
except StopTraversal:
pass
return
[docs]
def from_dict(
self, obj: list[dict], *, mapper: DeserializeMapperType | None = None
) -> None:
"""Append copies of all source children to self."""
# TODO:
# if mapper is None:
# mapper = self._tree.DEFAULT_DESERIALZATION_MAPPER
assert not self._children
for item in obj:
if mapper:
# mapper may add item['data_id']
# data = mapper(parent=self, item=item)
data_obj = call_mapper(mapper, self, item)
else:
data_obj = item["data"]
child = self.append_child(
data_obj, data_id=item.get("data_id"), node_id=item.get("node_id")
)
child_items = item.get("children")
if child_items:
child.from_dict(child_items)
return
def _visit_pre(self, callback, memo) -> None:
"""Depth-first, pre-order traversal."""
# Call callback and skip children if SkipBranch was returned.
# Also a StopTraversal(value) exception may be raised.
if call_traversal_cb(callback, self, memo) is False:
return
children = self._children
if children:
for c in children:
c._visit_pre(callback, memo)
return
def _visit_post(self, callback, memo) -> None:
"""Depth-first, post-order traversal."""
# Callback may raise StopTraversal (also if callback returns false)
# but SkipBranch is not supported with post-order traversal
children = self._children
if children:
for c in self._children:
c._visit_post(callback, memo)
call_traversal_cb(callback, self, memo)
def _visit_level(self, callback, memo) -> None:
"""Breadth-first (aka level-order) traversal."""
# Note that this is non-recursive.
children = self._children
while children:
next_level = []
for c in children:
if call_traversal_cb(callback, c, memo) is False:
continue
if c._children:
next_level.extend(c._children)
children = next_level
return
[docs]
def visit(
self,
callback: TraversalCallbackType,
*,
add_self=False,
method=IterMethod.PRE_ORDER,
memo=None,
) -> None | Any:
"""Call `callback(node, memo)` for all subnodes.
The callback may return :class:`SkipBranch` (or an instance
thereof) to omit child nodes but continue traversal otherwise.
Raising `SkipBranch` has the same effect.
The callback may return ``False`` or :class:`StopIteration` to immediately
interrupt traversal.
Raising `StopTraversal(value)` has the same effect but also allows to
specify a return value for the visit method. |br|
See also :ref:`iteration-callbacks`.
Args:
callback (function(node, memo)):
Callback ``function(node, memo)``
add_self (bool):
If true, this node will also be visited (typically as first call).
method (IterMethod):
Traversal method, defaults to pre-order, depth-first search.
memo (Any):
This value will be passed to all calls and may be useful to
implement caching or collect and return traversal results.
If no `memo` argument is passed, an empty dict is created at
start, which has a life-span of the traversal only.
"""
try:
handler = getattr(self.__class__, f"_visit_{method.value}")
except AttributeError:
raise NotImplementedError(
f"Unsupported traversal method {method!r}."
) from None
if memo is None:
memo = {}
try:
# Level-order is non-recursive
if method == IterMethod.LEVEL_ORDER:
if add_self:
if call_traversal_cb(callback, self, memo) is False:
return
self._visit_level(callback, memo)
return
# Otherwise pre- or post-order
if add_self and method != IterMethod.POST_ORDER:
if call_traversal_cb(callback, self, memo) is False:
return
for c in self._children:
handler(c, callback, memo)
if add_self and method == IterMethod.POST_ORDER:
if call_traversal_cb(callback, self, memo) is False:
return
except StopTraversal as e:
return e.value
def _iter_pre(self) -> Iterator[Node]:
"""Depth-first, pre-order traversal."""
children = self._children
if children:
for c in children:
yield c
yield from c._iter_pre()
return
def _iter_post(self) -> Iterator[Node]:
"""Depth-first, post-order traversal."""
children = self._children
if children:
for c in self._children:
yield from c._iter_post()
yield c
return
def _iter_level(self, *, revert=False, toggle=False) -> Iterator[Node]:
"""Breadth-first (aka level-order) traversal."""
children = self._children
while children:
next_level = []
for c in children:
if c._children:
next_level.extend(c._children)
if revert:
yield from reversed(children)
else:
yield from children
if toggle:
revert = not revert
children = next_level
return
def _iter_level_rtl(self) -> Iterator[Node]:
"""Breadth-first (aka level-order) traversal, right-to-left."""
return self._iter_level(revert=True, toggle=False)
def _iter_zigzag(self) -> Iterator[Node]:
"""ZigZag traversal."""
return self._iter_level(revert=False, toggle=True)
def _iter_zigzag_rtl(self) -> Iterator[Node]:
"""ZigZag traversal, right-to-left."""
return self._iter_level(revert=True, toggle=True)
[docs]
def iterator(
self, method=IterMethod.PRE_ORDER, *, add_self=False
) -> Iterator[Node]:
"""Generator that walks the hierarchy."""
try:
handler = getattr(self, f"_iter_{method.value}")
except AttributeError:
raise NotImplementedError(
f"Unsupported traversal method {method!r}."
) from None
if add_self and method != IterMethod.POST_ORDER:
yield self
yield from handler()
if add_self and method == IterMethod.POST_ORDER:
yield self
#: Implement ``for subnode in node: ...`` syntax to iterate descendant nodes.
__iter__ = iterator
def _search(self, match, *, max_results=None, add_self=False) -> Iterator[Node]:
if callable(match):
cb_match = match
elif type(match) is str:
pattern = re.compile(pattern=match)
cb_match = lambda node: pattern.fullmatch(node.name) # noqa: E731
elif isinstance(match, (list, tuple)):
pattern = re.compile(pattern=match[0], flags=match[1])
cb_match = lambda node: pattern.fullmatch(node.name) # noqa: E731
else:
cb_match = lambda node: node._data is match # noqa: E731
count = 0
for node in self.iterator(add_self=add_self):
if not cb_match(node):
continue
count += 1
yield node
if max_results and count >= max_results:
break
return
[docs]
def find_all(
self, data=None, *, match=None, data_id=None, add_self=False, max_results=None
) -> list[Node]:
"""Return a list of matching nodes (list may be empty).
See also :ref:`iteration-callbacks`.
"""
if data:
assert data_id is None
data_id = self._tree.calc_data_id(data)
if data_id:
assert match is None
return [
n for n in self.iterator(add_self=add_self) if n._data_id == data_id
]
return [
n for n in self._search(match, add_self=add_self, max_results=max_results)
]
[docs]
def find_first(self, data=None, *, match=None, data_id=None) -> Node | None:
"""Return the first matching node or `None`.
See also :ref:`iteration-callbacks`.
"""
res = self.find_all(data, match=match, data_id=data_id, max_results=1)
return res[0] if res else None
#: Alias for :meth:`find_first`
find = find_first
[docs]
def sort_children(self, *, key=None, reverse=False, deep=False) -> None:
"""Sort child nodes.
`key` defaults to ``attrgetter("name")``, so children are sorted by
their string representation.
"""
cl = self._children
if not cl or len(cl) == 1 and not deep:
return
if key is None:
key = attrgetter("name")
cl.sort(key=key, reverse=reverse)
if deep:
for c in cl:
c.sort_children(key=key, reverse=reverse, deep=True)
return
def _get_prefix(self, style, lstrip) -> str:
if len(style) == 4:
s0, s1, s2, s3 = style
s4 = s2
s5 = s3
elif len(style) == 6:
s0, s1, s2, s3, s4, s5 = style
else:
raise ValueError(f"Invalid style {style!r}")
def _is_last(p) -> bool:
# Don't use `is_last_sibling()` which is overloaded by TypedNode
return p is p._parent._children[-1]
parts = []
depth = 0
for p in self.get_parent_list():
depth += 1
if depth <= lstrip:
continue
if _is_last(p):
parts.append(s0) # " "
else:
parts.append(s1) # " | "
if depth >= lstrip:
if self._children:
if _is_last(self):
parts.append(s4) # " ╰┬─ "
else:
parts.append(s5) # " ├┬─ "
else:
if _is_last(self):
parts.append(s2) # " ╰── "
else:
parts.append(s3) # " ├── "
return "".join(parts)
def _render_lines(self, *, repr=None, style=None, add_self=True) -> Iterator[str]:
if type(style) not in (list, tuple):
try:
style = CONNECTORS[style or self.tree.DEFAULT_CONNECTOR_STYLE]
except KeyError:
raise ValueError(
f"Invalid style {style!r}. Expected: {'|'.join(CONNECTORS.keys())}"
) from None
if repr is None:
repr = self.DEFAULT_RENDER_REPR
# Find out if we need to strip some of the leftmost prefixes.
# If this was called for a normal node, we strip all parent levels
# (and also the own prefix when `add_self` is false).
# If this was called for the system root node, we do the same, but we
# never render self, because the title is rendered by the caller.
lstrip = self.depth()
if not add_self:
lstrip += 1
if not self._parent:
add_self = False
for n in self.iterator(add_self=add_self):
prefix = n._get_prefix(style, lstrip)
if callable(repr):
s = repr(n)
else:
s = repr.format(node=n)
yield prefix + s
return
[docs]
def to_dict(self, *, mapper: SerializeMapperType | None = None) -> dict:
"""Return a nested dict of this node and its children."""
res = {
"data": str(self.data),
}
# Add custom data_id if not calculated to the hash by default.
if self._data_id != hash(self._data):
res["data_id"] = self._data_id
res = call_mapper(mapper, self, res)
# if mapper:
# res = mapper(self, res)
if self._children:
res["children"] = cl = []
for n in self._children:
cl.append(n.to_dict(mapper=mapper))
return res
@classmethod
def _compress_entry(
cls, data: dict | str, key_map: KeyMapType, value_map: ValueMapType
) -> None:
if isinstance(data, str):
return
for key, value in list(data.items()):
if key in key_map:
short_key = key_map[key]
data[short_key] = data.pop(key)
else:
short_key = key
if key in value_map:
data[short_key] = value_map[key][value]
return
@classmethod
def _make_list_entry(cls, node: Node) -> dict:
node_data = node._data
is_custom_id = node._data_id != hash(node_data)
# If data is more complex than a simple string, or if we use a custom
# data_id, we store data as a dict instead of a str:
if type(node_data) is str:
if not is_custom_id:
# Special case: node.data is a plain string without custom ID
return node_data
data = {
"str": node_data,
}
else:
# data is stored as-is, i.e. plain string instead of dict
data = {
# "id": data_id,
}
# Add custom data_id if not calculated as hash by default.
if node._data_id != hash(node_data):
data["data_id"] = node._data_id
return data
[docs]
def to_list_iter(
self,
*,
mapper: SerializeMapperType | None = None,
key_map: KeyMapType | None = None,
value_map: ValueMapType | None = None,
) -> Iterator[dict]:
"""Yield children as parent-referencing list.
```py
[(parent_key, data)]
```
"""
calc_id = self._tree.calc_data_id
#: For nodes with multiple occurrences: index of the first one
#: For typed nodes, we must also check if the `kind` matches, before
#: simply store a reference.
clone_idx_and_kind_map = {}
parent_id_map = {self._node_id: 0}
if mapper is None:
mapper = self._tree.serialize_mapper
key_map = {} if key_map is None else key_map
if value_map is None:
value_map = {}
else:
# Convert value_map entries from lists to dicts for faster lookup
# of the index.
# E.g. `{'t': ['person', 'dept']}` -> {'t': {'person': 0, 'dept': 1}}`
value_map = {
k: {v: i for i, v in enumerate(a)} for k, a in value_map.items()
}
# print("value_map", value_map)
for id_gen, node in enumerate(self, 1):
# Compact mode: use integer sequence as keys
# Store idx with original id for later parent-ref. We only have to
# do this for nodes that have children though:
node_id = node._node_id
if node._children:
parent_id_map[node_id] = id_gen
parent_id = node._parent._node_id
parent_idx = parent_id_map[parent_id]
node_data = node._data
data_id = calc_id(node_data)
# If node is a 2nd occurrence of a clone, only store the index of the
# first occurrence and do not call the mapper
node_kind = getattr(node, "kind", None)
clone_idx, clone_kind = clone_idx_and_kind_map.get(data_id, (None, None))
if clone_idx:
if node_kind == clone_kind:
yield (parent_idx, clone_idx)
continue
elif node.is_clone():
# First instance of a clone node: take a note
clone_idx_and_kind_map[data_id] = (id_gen, node_kind)
# If node.data is more complex than a simple string, or if we use a
# custom data_id, we store data as a dict instead of a str:
data = self._make_list_entry(node)
# Let caller serialize custom data objects
data = call_mapper(mapper, node, data)
# Compress data if requested
if key_map or value_map:
self._compress_entry(data, key_map, value_map)
yield (parent_idx, data)
return
[docs]
def to_dot(
self,
*,
add_self=False,
unique_nodes=True,
graph_attrs: dict = None,
node_attrs: dict = None,
edge_attrs: dict = None,
node_mapper: MapperCallbackType = None,
edge_mapper: MapperCallbackType = None,
) -> Iterator[str]:
"""Generate a DOT formatted graph representation.
See :ref:`graphs` for details.
"""
res = node_to_dot(
self,
add_self=add_self,
unique_nodes=unique_nodes,
graph_attrs=graph_attrs,
node_attrs=node_attrs,
edge_attrs=edge_attrs,
node_mapper=node_mapper,
edge_mapper=edge_mapper,
)
return res
[docs]
def to_rdf_graph(
self, *, add_self: bool = True, node_mapper: RDFMapperCallbackType = None
):
"""Return an instance of ``rdflib.Graph``.
See :ref:`graphs` for details.
"""
return node_to_rdf(self, add_self=add_self, node_mapper=node_mapper)
[docs]
def to_mermaid_flowchart(
self,
target: IO[str] | str | Path,
*,
as_markdown: bool = True,
direction: MermaidDirectionType = "TD",
title: str | bool | None = True,
format: MermaidFormatType | None = None,
mmdc_options: dict | None = None,
add_self: bool = True,
unique_nodes: bool = True,
headers: Iterable[str] | None = None,
node_mapper: MermaidNodeMapperCallbackType | None = None,
edge_mapper: MermaidEdgeMapperCallbackType | None = None,
) -> None:
"""Serialize a Mermaid flowchart representation.
Optionally convert to a Graphviz display formats.
See :ref:`graphs` for details.
"""
return node_to_mermaid_flowchart(
node=self,
target=target,
as_markdown=as_markdown,
direction=direction,
title=title,
format=format,
mmdc_options=mmdc_options,
add_root=add_self,
unique_nodes=unique_nodes,
headers=headers,
node_mapper=node_mapper,
edge_mapper=edge_mapper,
)