Source code for nutree.typed_tree

# (c) 2021-2023 Martin Wendt; see
# Licensed under the MIT license:
Declare the :class:`~nutree.tree.TypedTree` class.
from __future__ import annotations

from collections import Counter
from pathlib import Path
from typing import IO, Any, Iterator

from nutree.common import (

from .node import Node
from .tree import Tree

[docs] class ANY_KIND: """Special argument value for some methods that access child nodes."""
# ------------------------------------------------------------------------------ # - TypedNode # ------------------------------------------------------------------------------
[docs] class TypedNode(Node): """ A special node variant, derived from :class:`~nutree.node.Node` and used by :class:`~nutree.typed_tree.TypedTree`. In addition to :class:`~nutree.node.Node`, we have a `kind` member to define the node's type. """ __slots__ = ("_kind",) #: Default value for ``repr`` argument when formatting data for print/display. DEFAULT_RENDER_REPR = "{node.kind}{}" # #: Default value for ``repr`` argument when formatting data for export, # #: like DOT, RDF, ... # DEFAULT_NAME_REPR = "{!r}" def __init__( self, kind: str, data, *, parent: TypedNode, data_id=None, node_id=None, meta: dict = None, ): super().__init__( data, parent=parent, data_id=data_id, node_id=node_id, meta=meta ) assert isinstance(kind, str) and kind != ANY_KIND, f"Unsupported `kind`: {kind}" self._kind = kind # del self._children # self._child_map: Dict[Node] = None def __repr__(self) -> str: return ( f"{self.__class__.__name__}<kind={self.kind}, " f"{}, data_id={self.data_id!r}>" ) # @property # def name(self) -> str: # """String representation of the embedded `data` object with kind.""" # # return f"{self._kind} → {}" # # Inspired by clarc notation: # # return f"{{{self._kind}}}:{}" # return f"{}" @property def kind(self) -> str: return self._kind @property def parent(self) -> TypedNode: """Return parent node or None for toplevel nodes.""" p = self._parent return p if p._parent else None @property def children(self) -> list[TypedNode]: """Return list of direct child nodes (list may be empty). Note that this property returns all children, independent of the kind. See also :meth:`get_children`. """ c = self._children return [] if c is None else c
[docs] def get_children(self, kind: str | ANY_KIND) -> list[TypedNode]: """Return list of direct child nodes of a given type (list may be empty).""" all_children = self._children if not all_children: return [] elif kind is ANY_KIND: return all_children return list(filter(lambda n: n._kind == kind, all_children))
# def set_data( # self, kind: str, data, *, data_id=None, with_clones: bool = None # ) -> None: # """Change node's `data` and/or `data_id` and update bookkeeping.""" # super().set_data(data, data_id=data_id, with_clones=with_clones)
[docs] def first_child(self, kind: str | ANY_KIND) -> TypedNode | None: """First direct child node or None if no children exist.""" all_children = self._children if not all_children: return None elif kind is ANY_KIND: return all_children[0] for n in all_children: if n._kind == kind: return n return None
[docs] def last_child(self, kind: str | ANY_KIND) -> TypedNode | None: """Last direct child node or None if no children exist.""" all_children = self._children if not all_children: return None elif kind is ANY_KIND: return all_children[-1] for i in range(len(all_children) - 1, -1, -1): n = all_children[i] if n._kind == kind: return n return None
[docs] def has_children(self, kind: str | ANY_KIND) -> bool: """Return true if this node has one or more children.""" if kind is ANY_KIND: return bool(self._children) return len(self.get_children(kind)) > 1
[docs] def get_siblings(self, *, add_self=False, any_kind=False) -> list[TypedNode]: """Return a list of all sibling entries of self (excluding self) if any.""" if any_kind: return super().get_siblings(add_self=add_self) children = self._parent._children rel = self.kind return [n for n in children if (add_self or n is not self) and n.kind == rel]
[docs] def first_sibling(self, *, any_kind=False) -> TypedNode: """Return first sibling `of the same kind` (may be self).""" pc = self._parent._children if any_kind: return pc[0] for n in pc: if n._kind == self._kind: return n raise AssertionError("Internal error")
[docs] def last_sibling(self, *, any_kind=False) -> TypedNode: """Return last sibling `of the same kind` (may be self).""" pc = self._parent._children if any_kind: return pc[-1] for n in reversed(pc): if n._kind == self._kind: return n raise AssertionError("Internal error")
[docs] def prev_sibling(self, *, any_kind=False) -> TypedNode | None: """Return predecessor `of the same kind` or None if node is first sibling.""" pc = self._parent._children own_idx = pc.index(self) if own_idx > 0: for idx in range(own_idx - 1, -1, -1): n = pc[idx] if any_kind or n._kind == self._kind: return n return None
[docs] def next_sibling(self, *, any_kind=False) -> TypedNode | None: """Return successor `of the same kind` or None if node is last sibling.""" pc = self._parent._children pc_len = len(pc) own_idx = pc.index(self) if own_idx < pc_len - 2: for idx in range(own_idx + 1, pc_len): n = pc[idx] if any_kind or n._kind == self._kind: return n return None
# def get_clones(self, *, add_self=False) -> List[TypedNode]: # """Return a list of all nodes that reference the same data if any.""" # clones = self._tree._nodes_by_data_id[self._data_id] # if add_self: # return clones.copy() # return [n for n in clones if n is not self]
[docs] def get_index(self, *, any_kind=False) -> int: """Return index in sibling list.""" if any_kind: kc = self._parent._children else: kc = self.parent.get_children(self.kind) return kc.index(self)
[docs] def is_first_sibling(self, *, any_kind=False) -> bool: """Return true if this node is the first sibling, i.e. the first child of its parent.""" if any_kind: return self is self._parent._children[0] return self is self.first_sibling(any_kind=False)
[docs] def is_last_sibling(self, *, any_kind=False) -> bool: """Return true if this node is the last sibling, i.e. the last child **of this kind** of its parent.""" if any_kind: return self is self._parent._children[-1] return self is self.last_sibling(any_kind=False)
[docs] def add_child( self, child: TypedNode | TypedTree | Any, *, kind: str = None, before: TypedNode | bool | int | None = None, deep: bool = None, data_id=None, node_id=None, ) -> TypedNode: """See ...""" # assert not isinstance(child, TypedNode) or child.kind == self.kind # TODO: kind is optional if child is a TypedNode # TODO: Check if target and child types match # TODO: share more code from overloaded method if kind is None: kind = self._tree.DEFAULT_CHILD_TYPE if isinstance(child, (Node, Tree)) and not isinstance( child, (TypedNode, TypedTree) ): raise TypeError("If child is a node or tree it must be typed.") if isinstance(child, self._tree.__class__): if deep is None: deep = True topnodes = child._root.children if isinstance(before, (int, TypedNode)) or before is True: topnodes.reverse() for n in topnodes: self.add_child(n, before=before, deep=deep) return source_node = None factory = self._tree._node_factory if isinstance(child, Node): # TypedNode): if deep is None: deep = False if deep and data_id is not None or node_id is not None: raise ValueError("Cannot set ID for deep copies.") source_node = child if source_node._tree is self._tree: if source_node._parent is self: raise UniqueConstraintError( f"Same parent not allowed: {source_node}" ) else: pass if data_id and data_id != source_node._data_id: raise UniqueConstraintError(f"data_id conflict: {source_node}") # If creating an inherited node, use the parent class as constructor child_class = child.__class__ node = child_class( kind,, parent=self, data_id=data_id, node_id=node_id, ) else: node = factory(kind, child, parent=self, data_id=data_id, node_id=node_id) children = self._children if children is None: assert before in (None, True, int, False) self._children = [node] elif before is True: # prepend children.insert(0, node) elif type(before) is int: children.insert(before, node) elif before: if before._parent is not self: raise ValueError( f"`before=node` ({before._parent}) " f"must be a child of target node ({self})" ) idx = children.index(before) # raises ValueError children.insert(idx, node) else: children.append(node) if deep and source_node: node._add_from(source_node) return node
#: Alias for :meth:`add_child` add = add_child
[docs] def append_child( self, child: TypedNode | TypedTree | Any, *, kind: str = None, deep=None, data_id=None, node_id=None, ): """Append a new subnode. This is a shortcut for :meth:`add_child` with ``before=None``. """ return self.add_child( child, kind=kind, before=None, deep=deep, data_id=data_id, node_id=node_id, )
[docs] def prepend_child( self, child: TypedNode | TypedTree | Any, *, kind: str = None, deep=None, data_id=None, node_id=None, ): """Prepend a new subnode. This is a shortcut for :meth:`add_child` with ``before=True``. """ return self.add_child( child, kind=kind, before=self.first_child(), deep=deep, data_id=data_id, node_id=node_id, )
[docs] def prepend_sibling( self, child: TypedNode | TypedTree | Any, *, deep=None, data_id=None, node_id=None, ) -> TypedNode: """Add a new node **of same kind** before `self`. This method calls :meth:`add_child` on ``self.parent``. """ return self._parent.add_child( child, before=self, deep=deep, data_id=data_id, node_id=node_id )
[docs] def append_sibling( self, child: TypedNode | TypedTree | Any, *, deep=None, data_id=None, node_id=None, ) -> TypedNode: """Add a new node **of same kind** after `self`. This method calls :meth:`add_child` on ``self.parent``. """ next_node = self.next_sibling return self._parent.add_child( child, before=next_node, deep=deep, data_id=data_id, node_id=node_id )
[docs] def move_to( self, new_parent: TypedNode | TypedTree, *, before: TypedNode | bool | int | None = None, ): """Move this node before or after `new_parent`.""" raise NotImplementedError
# def remove(self, *, keep_children=False, with_clones=False) -> None: # """Remove this node. # If `keep_children` is true, all children will be moved one level up, # so they become siblings, before this node is removed. # If `with_clones` is true, all nodes that reference the same data # instance are removed as well. # """ # raise NotImplementedError # def remove_children(self, kind: Union[str, ANY_KIND]): # """Remove all children of this node, making it a leaf node.""" # raise NotImplementedError
[docs] def copy(self, *, add_self=True, predicate=None) -> TypedTree: """Return a new :class:`~nutree.typed_tree.TypedTree` instance from this branch. See also :meth:`_add_from` and :ref:`iteration-callbacks`. """ return super().copy(add_self=add_self, predicate=predicate)
[docs] def filtered(self, predicate: PredicateCallbackType) -> TypedTree: """Return a filtered copy of this node and descendants as tree. See also :ref:`iteration-callbacks`. """ return super().filtered(predicate=predicate)
[docs] def iterator( self, method=IterMethod.PRE_ORDER, *, add_self=False ) -> Iterator[Node]: """Generator that walks the hierarchy.""" return super().iterator(method=method, add_self=add_self)
#: Implement ``for subnode in node: ...`` syntax to iterate descendant nodes. __iter__ = iterator # def sort_children(self, *, key=None, reverse=False, deep=False): # """Sort child nodes. # `key` defaults to ``attrgetter("name")``, so children are sorted by # their string representation. # """ # cl = self._children # if not cl or len(cl) == 1 and not deep: # return # if key is None: # key = attrgetter("name") # cl.sort(key=key, reverse=reverse) # if deep: # for c in cl: # c.sort_children(key=key, reverse=reverse, deep=True) # return # def _get_prefix(self, style, lstrip): # s0, s1, s2, s3 = style # parts = [] # depth = 0 # for p in self.get_parent_list(): # depth += 1 # if depth <= lstrip: # continue # if p.is_last_sibling(): # parts.append(s0) # " " # else: # parts.append(s1) # " | " # if depth >= lstrip: # if self.is_last_sibling(): # parts.append(s2) # " ╰─ " # else: # parts.append(s3) # " ├─ " # return "".join(parts) # def _render_lines(self, *, repr=None, style=None, add_self=True): # if type(style) not in (list, tuple): # try: # style = CONNECTORS[style or self.tree.DEFAULT_CONNECTOR_STYLE] # except KeyError: # raise ValueError( # f"Invalid style '{style}'. " # f"Expected: {'|'.join(CONNECTORS.keys())}" # ) # if repr is None: # repr = DEFAULT_NAME_REPR # # Find out if we need to strip some of the leftmost prefixes. # # If this was called for a normal node, we strip all parent levels # # (and also the own prefix when `add_self` is false). # # If this was called for the system root node, we do the same, but we # # never render self, because the the title is rendered by the caller. # lstrip = self.depth() # if not add_self: # lstrip += 1 # if not self._parent: # add_self = False # for n in self.iterator(add_self=add_self): # prefix = n._get_prefix(style, lstrip) # if callable(repr): # s = repr(n) # else: # s = repr.format(node=n) # yield prefix + s # return # def to_dict(self, *, mapper: MapperCallbackType = None) -> Dict: # """Return a nested dict of this node and its children.""" # res = { # "data": str(, # } # # Add custom data_id if any # # data_id = hash(self._data) # data_id = self._tree.calc_data_id(self._data) # if data_id != self._data_id: # res["data_id"] = data_id # res = call_mapper(mapper, self, res) # # if mapper: # # res = mapper(self, res) # if self._children: # res["children"] = cl = [] # for n in self._children: # cl.append(n.to_dict(mapper=mapper)) # return res @classmethod def _make_list_entry(cls, node: TypedNode) -> dict: node_data = node._data # is_custom_id = node._data_id != hash(node_data) if type(node_data) is str: # Node._make_list_entry() would return a plain str, but we always # need a dict data = { "str": node_data, } else: data = Node._make_list_entry(node) if node.kind != ANY_KIND: data["kind"] = node.kind return data
[docs] def to_dot( self, *, add_self=False, unique_nodes=True, graph_attrs: dict = None, node_attrs: dict = None, edge_attrs: dict = None, node_mapper: MapperCallbackType = None, edge_mapper: MapperCallbackType = None, ) -> Iterator[str]: """Generate a DOT formatted graph representation. See :ref:`graphs` for details. """ # TypedNodes can provide labelled edges: def _edge_mapper(node, data): data["label"] = node.kind if edge_mapper: return edge_mapper(node, data) res = super().to_dot( add_self=add_self, unique_nodes=unique_nodes, graph_attrs=graph_attrs, node_attrs=node_attrs, edge_attrs=edge_attrs, node_mapper=node_mapper, edge_mapper=_edge_mapper, ) return res
# ------------------------------------------------------------------------------ # - TypedTree # ------------------------------------------------------------------------------
[docs] class TypedTree(Tree): """ A special tree variant, derived from :class:`~nutree.tree.Tree`, that uses :class:`~nutree.typed_tree.TypedNode` objects, which maintain an addition `kind` attribute. See :ref:`typed-tree` for details. """ #: Default value for ``key_map`` argument when saving DEFAULT_KEY_MAP = {"data_id": "i", "str": "s", "kind": "k"} #: Default value for ``value_map`` argument when saving DEFAULT_VALUE_MAP = {} # expands to { "kind": [<distinct `kind` values>] } #: Default value for ``add_child`` when loading. DEFAULT_CHILD_TYPE = "child" def __init__( self, name: str | None = None, *, factory: NodeFactoryType | None = None, calc_data_id: CalcIdCallbackType | None = None, shadow_attrs: bool = False, ): if factory is None: factory = TypedNode super().__init__( name, factory=factory, calc_data_id=calc_data_id, shadow_attrs=shadow_attrs ) self._root = _SystemRootTypedNode(self) def __getitem__(self, data: object) -> TypedNode: return super().__getitem__(data)
[docs] @staticmethod def deserialize_mapper(parent: Node, data: dict) -> str | object | None: """Used as default `mapper` argument for :meth:`load`.""" if "str" in data and len(data) <= 2: # This can happen if the source was generated without a # serialization mapper, for a TypedTree that has pure str nodes return data["str"] raise NotImplementedError( f"Override this method or pass a mapper callback to evaluate {data}." )
[docs] def add_child( self, child: TypedNode | TypedTree | Any, *, kind: str = None, before: TypedNode | bool | int | None = None, deep: bool = None, data_id=None, node_id=None, ) -> TypedNode: """Add a toplevel node. See Node's :meth:`~nutree.node.Node.add_child` method for details. """ return self._root.add_child( child, kind=kind, before=before, deep=deep, data_id=data_id, node_id=node_id, )
#: Alias for :meth:`add_child` add = add_child # Must re-bind here
[docs] def first_child(self, kind: str | ANY_KIND) -> TypedNode | None: """Return the first toplevel node.""" return self._root.first_child(kind=kind)
[docs] def last_child(self, kind: str | ANY_KIND) -> TypedNode | None: """Return the last toplevel node.""" return self._root.last_child(kind=kind)
[docs] def iter_by_type(self, kind: str | ANY_KIND) -> Iterator[TypedNode]: if kind == ANY_KIND: return self.iterator() for n in self.iterator(): if n._kind == kind: yield n return
[docs] def save( self, target: IO[str] | str | Path, *, mapper: SerializeMapperType | None = None, meta: dict | None = None, key_map: KeyMapType | bool = True, value_map: ValueMapType | bool = True, ) -> None: """Store tree in a compact JSON file stream. See also :ref:`serialize` and :meth:`to_list_iter` and :meth:`load` methods. """ # TypedTrees can assume reasaonable defaults for key_map and value_map # (key_map is evaluated in base class from TypedTree.DEFAULT_KEY_MAP) # print("value_map ", value_map) if value_map is True or isinstance(value_map, dict): if value_map is True: value_map = self.DEFAULT_VALUE_MAP.copy() if "kind" not in value_map: counter = Counter() for n in self: counter[n.kind] += 1 value_map.update({"kind": list(counter.keys())}) # print("value_map -> ", value_map) else: assert value_map is False, value_map return super().save( target, mapper=mapper, meta=meta, key_map=key_map, value_map=value_map, )
@classmethod def _from_list( cls, obj: list[dict], *, mapper: DeserializeMapperType | None = None ) -> TypedTree: tree = cls() if mapper is None: mapper = cls.deserialize_mapper # System root has index #0: node_idx_map = {0: tree._root} # Start reading data lines starting at index #1: for idx, (parent_idx, data) in enumerate(obj, 1): parent = node_idx_map[parent_idx] # print(idx, parent_idx, data, parent) if type(data) is str: # This can only happen if the source was generated by a plain Tree n = parent.add(data, kind=cls.DEFAULT_CHILD_TYPE) elif type(data) is int: first_clone = node_idx_map[data] n = parent.add( first_clone, kind=first_clone.kind, data_id=first_clone.data_id ) else: kind = data.get("kind", cls.DEFAULT_CHILD_TYPE) data_id = data.get("data_id") data_obj = call_mapper(mapper, parent, data) n = parent.add(data_obj, kind=kind, data_id=data_id) # elif isinstance(data, dict) and "str" in data: # # This can happen if the source was generated without a # # serialization mapper, for a TypedTree that has str nodes # n = parent.add(data["str"], kind=data.get("kind")) # else: # raise RuntimeError(f"Need mapper for {data}") # pragma: no cover node_idx_map[idx] = n return tree
[docs] @classmethod def load( cls, target: IO[str] | str | Path, *, mapper: DeserializeMapperType | None = None, file_meta: dict = None, ) -> TypedTree: """Create a new :class:`TypedTree` instance from a JSON file stream. See also Tree's :meth:`` and :meth:`~nutree.tree.Tree.load()` methods. """ return super().load(target, mapper=mapper, file_meta=file_meta)
# ------------------------------------------------------------------------------ # - _SystemRootTypedNode # ------------------------------------------------------------------------------ class _SystemRootTypedNode(TypedNode): """Invisible system root node.""" def __init__(self, tree: TypedTree) -> None: self._tree: TypedTree = tree self._parent = None self._node_id = self._data_id = ROOT_ID self._data = self._children = [] self._meta = None self._kind = None