Source code for nutree.tree_generator

"""
Implements a generator that creates a random tree structure from a specification.

See :ref:`randomize` for details.
"""

from __future__ import annotations

import random
import sys
from abc import ABC, abstractmethod
from collections.abc import Sequence
from datetime import date, datetime, timedelta, timezone
from typing import Any, Union

from nutree.common import DictWrapper
from nutree.node import Node
from nutree.tree import Tree
from nutree.typed_tree import TypedNode

try:
    from fabulist import Fabulist

    fab = Fabulist()
except ImportError:  # pragma: no cover
    # We run without fabulist (with reduced functionality in this case)
    Fabulist = None
    fab = None

# TTree = TypeVar("TTree", bound=Tree)


# ------------------------------------------------------------------------------
# Randomizers
# ------------------------------------------------------------------------------
class Randomizer(ABC):
    """
    Abstract base class for randomizers.

    A randomizer produces a value on demand via :meth:`generate`. Subclasses
    call :meth:`_skip_value` first to decide whether a value should be
    produced at all for this call.

    Args:
        probability (float, optional): The probability of using the randomizer.
            Must be a float in the range [0.0, 1.0]. Defaults to 1.0.

    Raises:
        AssertionError: If `probability` is not a float or is out of range.
    """

    def __init__(self, *, probability: float = 1.0) -> None:
        assert isinstance(probability, float) and 0.0 <= probability <= 1.0, (
            f"probability must be in the range [0.0 .. 1.0]: {probability}"
        )
        self.probability = probability

    def _skip_value(self) -> bool:
        """Return True if no value should be generated for this call."""
        # Certain hit: answer without consuming a random number.
        if self.probability == 1.0:
            return False
        # `random.random()` yields values in [0.0, 1.0). The strict comparison
        # makes the hit rate exactly `probability`, so 0.0 never produces a
        # value (a `<=` test would still fire when the draw is exactly 0.0).
        return random.random() >= self.probability

    @abstractmethod
    def generate(self) -> Any:
        """Return a generated value; implemented by subclasses."""
        ...
class RangeRandomizer(Randomizer):
    """
    Randomizer that draws a number between two bounds.

    Float bounds are drawn with `random.uniform(min_val, max_val)`, integer
    bounds with `random.randrange(min_val, max_val)` (so `max_val` itself is
    not produced for integers).

    Args:
        min_val (float | int): The lower bound of the range.
        max_val (float | int): The upper bound of the range. Must have the
            same type as `min_val` and be greater than it.
        probability (float, optional): The probability of generating a value.
            Defaults to 1.0.
        none_value (Any, optional): The value returned by `generate()` when
            generation is skipped. Defaults to None.

    Raises:
        AssertionError: If the bounds differ in type or `max_val <= min_val`.
    """

    def __init__(
        self,
        min_val: float | int,
        max_val: float | int,
        *,
        probability: float = 1.0,
        none_value: Any = None,
    ) -> None:
        super().__init__(probability=probability)
        # Checked before comparing the bounds, so mixed types are reported
        # as such instead of surfacing as a comparison problem.
        assert type(min_val) is type(max_val), (
            f"min_val and max_val must be of the same type: {min_val}, {max_val}"
        )
        self.none_value = none_value
        self.min = min_val
        self.max = max_val
        self.is_float = isinstance(min_val, float)
        assert self.max > self.min

    def generate(self) -> Union[float, int, Any, None]:
        """Return a random number from the range, or `none_value` if skipped."""
        if self._skip_value():
            return self.none_value
        draw = random.uniform if self.is_float else random.randrange
        return draw(self.min, self.max)  # type: ignore
class DateRangeRandomizer(Randomizer):
    """
    Randomizer that picks a random day from a date range.

    The result is `min_dt` plus a random whole number of days taken from
    `range(delta_days)`, i.e. the upper bound itself is never returned.

    Args:
        min_dt (date): The first day of the range.
        max_dt (date | int): The upper bound of the range. Pass an integer to
            specify the number of days counted from `min_dt`.
        as_js_stamp (bool, optional): If True, `generate()` returns a float
            time stamp in milliseconds instead of a date. The stamp is that of
            midnight UTC of the picked day plus one day. Defaults to True.
        probability (float, optional): The probability of generating a value.
            Defaults to 1.0.

    Raises:
        AssertionError: If an argument has the wrong type or the upper bound
            is not greater than `min_dt`.

    Examples::

        # Returns a date, e.g. datetime.date(2020, 3, 7)
        DateRangeRandomizer(
            date(2020, 1, 1), date(2020, 12, 31), as_js_stamp=False
        ).generate()

        # Returns a millisecond time stamp (float)
        DateRangeRandomizer(date(2020, 1, 1), 365).generate()
    """

    def __init__(
        self,
        min_dt: date,
        max_dt: date | int,
        *,
        as_js_stamp=True,
        probability: float = 1.0,
    ) -> None:
        super().__init__(probability=probability)
        assert isinstance(min_dt, date), f"min_dt must be a date: {min_dt}"
        assert isinstance(max_dt, (date, int)), (
            f"max_dt must be a date or int: {max_dt}"
        )

        # Normalize both accepted forms to (day count, upper bound date)
        if isinstance(max_dt, int):
            span_days = max_dt
            upper = min_dt + timedelta(days=span_days)
        else:
            span_days = (max_dt - min_dt).days
            upper = max_dt

        assert upper > min_dt, (
            f"max_dt must be greater than min_dt: {min_dt}, {upper}"
        )
        self.delta_days = span_days
        self.min = min_dt
        self.max = upper
        self.as_js_stamp = as_js_stamp

    def generate(self) -> Union[date, float, None]:
        """Return a random day (or its time stamp), or None if skipped."""
        if self._skip_value():
            return None

        offset = timedelta(days=random.randrange(self.delta_days))
        picked = self.min + offset
        if not self.as_js_stamp:
            return picked

        seconds_per_day = 24 * 60 * 60
        midnight_utc = datetime(
            picked.year, picked.month, picked.day, tzinfo=timezone.utc
        )
        # Seconds -> milliseconds, shifted forward by one day
        return (midnight_utc.timestamp() + seconds_per_day) * 1000.0
class ValueRandomizer(Randomizer):
    """
    Randomizer that yields one fixed value with a given probability.

    Args:
        value (Any): The value returned by `generate()` when not skipped.
        probability (float): The probability of generating the value
            [0.0 .. 1.0]. This argument is required.
    """

    def __init__(self, value: Any, *, probability: float) -> None:
        super().__init__(probability=probability)
        self.value = value

    def generate(self) -> Any:
        """Return the fixed value, or None if this call is skipped."""
        return None if self._skip_value() else self.value
class SparseBoolRandomizer(ValueRandomizer):
    """
    Randomizer that yields `True` with a given probability.

    The fixed value is `True`; when generation is skipped, the result is
    `None` rather than `False`.

    Args:
        probability (float): The probability of generating `True`
            [0.0 .. 1.0]. This argument is required.
    """

    def __init__(self, *, probability: float) -> None:
        super().__init__(value=True, probability=probability)
class SampleRandomizer(Randomizer):
    """
    Randomizer that picks one random element from a sequence.

    Args:
        sample_list (Sequence): The population to pick from.
        counts (optional): Passed as `counts` to `random.sample()`.
            Requires Python 3.9 or later. Defaults to None.
        probability (float, optional): The probability of generating a value.
            Defaults to 1.0.

    Raises:
        RuntimeError: If `counts` is passed on a Python version before 3.9.
    """

    def __init__(
        self, sample_list: Sequence, *, counts=None, probability: float = 1.0
    ) -> None:
        super().__init__(probability=probability)
        self.sample_list = sample_list
        # TODO: remove this when support for Python 3.8 is removed
        if counts and sys.version_info < (3, 9):  # pragma: no cover
            raise RuntimeError("counts argument requires Python 3.9 or later.")
        self.counts = counts

    def generate(self) -> Any:
        """Return one element of `sample_list`, or None if skipped."""
        if self._skip_value():
            return None

        # TODO: remove this when support for Python 3.8 is removed
        if sys.version_info < (3, 9) and not self.counts:  # pragma: no cover
            # `random.sample()` has no `counts` argument before Python 3.9
            picked = random.sample(self.sample_list, 1)
        else:
            picked = random.sample(self.sample_list, 1, counts=self.counts)
        return picked[0]
# class BoolRandomizer(SampleRandomizer): # def __init__(self, *, allow_none: bool = False) -> None: # if allow_none: # super().__init__((True, False, None)) # else: # super().__init__((True, False))
class TextRandomizer(Randomizer):
    """
    Randomizer that produces a random string from a Fabulist template.

    The text is created by the
    [`fabulist`](https://github.com/mar10/fabulist/) library
    (`Fabulist.get_quote()`).

    Args:
        template (str | list): A template string or list of strings.
        probability (float, optional): The probability of generating a value.
            Defaults to 1.0.

    Raises:
        RuntimeError: If `fabulist` is not installed.
    """

    def __init__(self, template: str | list[str], *, probability: float = 1.0) -> None:
        super().__init__(probability=probability)
        if not fab:  # pragma: no cover
            raise RuntimeError("Need fabulist installed to generate random text.")
        self.template = template

    def generate(self) -> Any:
        """Return a text generated from the template, or None if skipped."""
        if self._skip_value():
            return None
        quote = fab.get_quote(self.template)  # type: ignore[reportOptionalMemberAccess]
        return quote
class BlindTextRandomizer(Randomizer):
    """
    Randomizer that produces a random lorem ipsum paragraph.

    The text is created by the
    [`fabulist`](https://github.com/mar10/fabulist/) library
    (`Fabulist.get_lorem_paragraph()`); all options below except
    `probability` are forwarded to that call unchanged.

    Args:
        sentence_count (int | tuple, optional): The number of sentences to
            generate. Defaults to (2, 6).
        dialect (str, optional): The dialect of the text. Defaults to "ipsum".
        entropy (int, optional): The entropy of the text. Defaults to 2.
        keep_first (bool, optional): If True, keep the first sentence.
            Defaults to False.
        words_per_sentence (int | tuple, optional): The number of words per
            sentence. Defaults to (3, 15).
        probability (float, optional): The probability of generating a value.
            Defaults to 1.0.

    Raises:
        RuntimeError: If `fabulist` is not installed.
    """

    def __init__(
        self,
        *,
        sentence_count: int | tuple = (2, 6),
        dialect: str = "ipsum",
        entropy: int = 2,
        keep_first: bool = False,
        words_per_sentence: int | tuple = (3, 15),
        probability: float = 1.0,
    ) -> None:
        super().__init__(probability=probability)
        if not fab:  # pragma: no cover
            raise RuntimeError("Need fabulist installed to generate random text.")

        self.dialect = dialect
        self.entropy = entropy
        self.keep_first = keep_first
        self.sentence_count = sentence_count
        self.words_per_sentence = words_per_sentence

    def generate(self) -> Any:
        """Return a lorem ipsum paragraph, or None if skipped."""
        if self._skip_value():
            return None

        options = {
            "sentence_count": self.sentence_count,
            "dialect": self.dialect,
            "entropy": self.entropy,
            "keep_first": self.keep_first,
            "words_per_sentence": self.words_per_sentence,
        }
        return fab.get_lorem_paragraph(**options)  # type: ignore[reportOptionalMemberAccess]
def _resolve_random(val: Any) -> Any:
    """Return `val.generate()` if `val` is a `Randomizer`, else `val` itself."""
    return val.generate() if isinstance(val, Randomizer) else val


def _resolve_random_dict(d: dict, *, macros: dict | None = None) -> None:
    """
    Resolve all `Randomizer` values of `d` in place.

    Entries whose randomizer returns None are removed from `d`. If `macros`
    is passed (and not empty), every string value is expanded with
    `str.format(**macros)`; this includes strings that were just generated.
    """
    dropped = []
    for key, val in d.items():
        if isinstance(val, Randomizer):
            val = val.generate()
            if val is None:  # Skip due to probability
                dropped.append(key)
                continue
            d[key] = val

        if macros and isinstance(val, str):
            d[key] = val.format(**macros)

    # Removal is deferred, because the dict must not change size while
    # it is being iterated.
    for key in dropped:
        del d[key]


# ------------------------------------------------------------------------------
# Tree Builder
# ------------------------------------------------------------------------------


def _merge_specs(node_type: str, spec: dict, types: dict) -> dict:
    """
    Return a new dict that combines the defaults for a node type with `spec`.

    Later layers override earlier ones: `types["*"]`, then
    `types[node_type]`, then `spec`. Missing layers count as empty.
    """
    merged: dict = types.get("*", {}).copy()
    for layer in (types.get(node_type, {}), spec):
        merged.update(layer)
    return merged


def _make_tree(
    *,
    parent_node: Node,
    parent_type: str,
    types: dict,
    relations: dict,
    prefix: str,
):
    """
    Recursively add random child nodes to `parent_node`.

    For every child type listed in `relations[parent_type]`, the merged spec
    is evaluated and the resulting number of nodes is appended. These special
    keys are removed from the spec before node data is created:

    - ``":count"``: Number of nodes (may be a `Randomizer`). Defaults to 1;
      a falsy result is treated as 0.
    - ``":callback"``: Optional callable that receives the resolved data dict
      before the node data object is created.
    - ``":factory"``: Callable that is invoked with the data as keyword
      arguments to create the node data. Defaults to `DictWrapper`.

    `prefix` is the hierarchical index of `parent_node` (empty for the top
    level) and is used to build the `{hier_idx}` macro.
    """
    for node_type, raw_spec in relations[parent_type].items():
        merged_spec = _merge_specs(node_type, raw_spec, types)
        count = _resolve_random(merged_spec.pop(":count", 1)) or 0
        callback = merged_spec.pop(":callback", None)
        factory = merged_spec.pop(":factory", DictWrapper)

        for idx in range(1, count + 1):  # 1-based
            hier_idx = f"{prefix}.{idx}" if prefix else f"{idx}"

            # Resolve `Randomizer` values and expand `{idx}` and `{hier_idx}` macros
            data = merged_spec.copy()
            _resolve_random_dict(data, macros={"idx": idx, "hier_idx": hier_idx})

            if callback:
                callback(data)

            payload = factory(**data)

            # Only typed nodes take the node type as `kind`
            extra = {"kind": node_type} if isinstance(parent_node, TypedNode) else {}
            node: Node = parent_node.add_child(payload, **extra)

            # Generate child relations
            if node_type in relations:
                _make_tree(
                    parent_node=node,
                    parent_type=node_type,
                    types=types,
                    relations=relations,
                    prefix=hier_idx,
                )
def build_random_tree(*, tree_class: type[Tree[Any, Any]], structure_def: dict) -> Tree:
    """
    Return a new `tree_class` instance with random data from a specification.

    See :ref:`randomize` for details.

    Args:
        tree_class (type): The tree class to instantiate. It is called with
            `name` and `forward_attrs=True`.
        structure_def (dict): The specification. Supported keys are
            `"name"` (optional), `"types"` (optional) and `"relations"`
            (mandatory, must contain a `"__root__"` entry). The passed dict
            itself is not modified.

    Raises:
        KeyError: If `structure_def` has no `"relations"` entry.
        AssertionError: If `structure_def` contains unknown keys or the
            relations have no `"__root__"` entry.
    """
    # Work on a shallow copy, so popping keys does not affect the caller
    remaining = structure_def.copy()
    name = remaining.pop("name", None)
    types = remaining.pop("types", {})
    relations = remaining.pop("relations")  # mandatory

    assert not remaining, f"found extra data: {remaining}"
    assert "__root__" in relations, "missing '__root__' relation"

    tree: Tree = tree_class(name=name, forward_attrs=True)

    _make_tree(
        parent_node=tree.system_root,
        parent_type="__root__",
        types=types,
        relations=relations,
        prefix="",
    )
    return tree