"""Dictionary blender and configuration file formats.

.. testsetup::

    from nitpick.generic import *
"""
from __future__ import annotations
import abc
import json
import re
import shlex
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
import dictdiffer
import jmespath
import toml
import tomlkit
from attr import define # type: ignore[attr-defined]
from autorepr import autorepr
from flatten_dict import flatten, unflatten
from ruamel.yaml import YAML, RoundTripRepresenter, StringIO
from sortedcontainers import SortedDict
from tomlkit import items
from nitpick.typedefs import ElementData, JsonDict, ListOrCommentedSeq, PathOrStr, YamlObject, YamlValue
if TYPE_CHECKING:
from jmespath.parser import ParsedResult
from nitpick.config import SpecialConfig
# Generic type for classes that inherit from BaseDoc
TBaseDoc = TypeVar("TBaseDoc", bound="BaseDoc")
# Quote characters recognised when splitting and quoting keys.
SINGLE_QUOTE = "'"
DOUBLE_QUOTE = '"'
# Separators used in this module: the dot is the default separator for nested keys,
# comma and colon are passed to json.dumps() to render compact JSON,
# and the space is a temporary delimiter inside quotes_splitter().
SEPARATOR_DOT = "."
SEPARATOR_COMMA = ","
SEPARATOR_COLON = ":"
SEPARATOR_SPACE = " "
#: Special unique separator for :py:meth:`flatten()` and :py:meth:`unflatten()`,
#: to avoid collision with existing key values (e.g. the default SEPARATOR_DOT separator "." can be part of a TOML key).
SEPARATOR_FLATTEN = "$#@"
#: Special unique separator for :py:meth:`nitpick.blender.quoted_split()`.
SEPARATOR_QUOTED_SPLIT = "#$@"
def compare_lists_with_dictdiffer(
    actual: list | dict, expected: list | dict, *, return_list: bool = True
) -> list | dict:
    """Return what ``expected`` adds to or changes in ``actual``, computed with dictdiffer.

    Changes of type ``"remove"`` (reported by ``dictdiffer.diff()``) are ignored.

    :param actual: Current data.
    :param expected: Desired data.
    :param return_list: When True, return only the values of the patched dict as a list;
        otherwise return the patched dict itself.
    :return: An empty list when nothing was added or changed; ``expected`` itself when patching
        an empty dict raises ``KeyError``; otherwise the patched data.
    """
    relevant_changes = []
    for change in dictdiffer.diff(actual, expected):
        if change[0] != "remove":
            relevant_changes.append(change)

    if not relevant_changes:
        return []

    try:
        # Apply the additions/changes on top of an empty dict to obtain only the differences
        patched = dictdiffer.patch(relevant_changes, {})
    except KeyError:
        return expected

    return list(patched.values()) if return_list else patched
def search_json(json_data: ElementData, jmespath_expression: ParsedResult | str, default: Any | None = None) -> Any:
    """Search a dictionary or list using a JMESPath expression.

    Return a default value if not found (or if what was found is falsy).

    >>> data = {"root": {"app": [1, 2], "test": "something"}}
    >>> search_json(data, "root.app", None)
    [1, 2]
    >>> search_json(data, "root.test", None)
    'something'
    >>> search_json(data, "root.unknown", "")
    ''
    >>> search_json(data, "root.unknown", None)
    >>> search_json(data, "root.unknown")
    >>> search_json(data, jmespath.compile("root.app"), [])
    [1, 2]
    >>> search_json(data, jmespath.compile("root.whatever"), "xxx")
    'xxx'
    >>> search_json(data, "")
    >>> search_json(data, None)

    :param json_data: The dictionary to be searched.
    :param jmespath_expression: A compiled JMESPath expression or a string with an expression.
    :param default: Default value in case nothing is found.
    :return: The object that was found or the default value.
    """
    # An empty/None expression can't match anything
    if not jmespath_expression:
        return default

    found = (
        jmespath.search(jmespath_expression, json_data)
        if isinstance(jmespath_expression, str)
        else jmespath_expression.search(json_data)
    )
    return found or default
@define
class ElementDetail:  # pylint: disable=too-few-public-methods
    """Detailed information about an element of a list."""

    # The element itself, as found in the list
    data: ElementData
    # Value(s) used to match this element against elements of another list
    key: str | list[str]
    # Position of the element in its list
    index: int
    # True when the element is neither a list nor a dict
    scalar: bool
    # Compact string representation of the element
    compact: str

    @property
    def cast_to_dict(self) -> JsonDict:
        """Data cast to dict, for mypy."""
        return cast(JsonDict, self.data)

    @classmethod
    def from_data(cls, index: int, data: ElementData, jmes_key: str) -> ElementDetail:
        """Create an element detail from dict data.

        :param index: Position of the element in its list.
        :param data: The element (a scalar, a list or a dict).
        :param jmes_key: JMESPath expression used to extract the key from a list/dict element.
        :return: A new instance of ``cls`` (so subclasses get instances of their own type).
        """
        if isinstance(data, (list, dict)):
            scalar = False
            # Sorted keys and no whitespace give a stable, compact representation
            compact = json.dumps(data, sort_keys=True, separators=(SEPARATOR_COMMA, SEPARATOR_COLON))
            # Fall back to the compact representation when the expression finds nothing
            key = search_json(data, jmes_key) or compact
        else:
            scalar = True
            key = compact = str(data)
        return cls(data=data, key=key, index=index, scalar=scalar, compact=compact)
@define
class ListDetail:  # pylint: disable=too-few-public-methods
    """Detailed info about a list."""

    # The original list
    data: ListOrCommentedSeq
    # One detail object per element of the list, in the same order
    elements: list[ElementDetail]

    @classmethod
    def from_data(cls, data: ListOrCommentedSeq, jmes_key: str) -> ListDetail:
        """Create a list detail from list data.

        :param data: The list to be detailed.
        :param jmes_key: JMESPath expression used to extract the key of each element.
        :return: A new instance of ``cls`` (so subclasses get instances of their own type).
        """
        # A distinct loop variable avoids shadowing the ``data`` parameter
        details = [ElementDetail.from_data(index, element, jmes_key) for index, element in enumerate(data)]
        return cls(data=data, elements=details)

    def find_by_key(self, desired: ElementDetail) -> ElementDetail | None:
        """Find an element by key.

        When the desired key is a list, an element matches if all desired key values
        are contained in its own key; otherwise the keys must be equal.

        :param desired: Element whose key is searched for.
        :return: The first matching element, or None when there is no match.
        """
        for actual in self.elements:
            if isinstance(desired.key, list):
                if set(desired.key).issubset(set(actual.key)):
                    return actual
            elif desired.key == actual.key:
                return actual
        return None
def set_key_if_not_empty(dict_: JsonDict, key: str, value: Any) -> None:
    """Store ``value`` under ``key`` in the dict, unless the value is falsy.

    :param dict_: Dict to be updated in place.
    :param key: Key to be set.
    :param value: Value to be stored; ignored when falsy (None, 0, empty string/container...).
    """
    if value:
        dict_[key] = value
def quoted_split(string_: str, separator=SEPARATOR_DOT) -> list[str]:
    """Split a string by a separator, but considering quoted parts (single or double quotes).

    A quoted part must be closed by the same quote character that opened it,
    so the other quote character can appear inside it.

    >>> quoted_split("my.key.without.quotes")
    ['my', 'key', 'without', 'quotes']
    >>> quoted_split('"double.quoted.string"')
    ['double.quoted.string']
    >>> quoted_split('"double.quoted.string".and.after')
    ['double.quoted.string', 'and', 'after']
    >>> quoted_split('something.before."double.quoted.string"')
    ['something', 'before', 'double.quoted.string']
    >>> quoted_split("'single.quoted.string'")
    ['single.quoted.string']
    >>> quoted_split("'single.quoted.string'.and.after")
    ['single.quoted.string', 'and', 'after']
    >>> quoted_split("something.before.'single.quoted.string'")
    ['something', 'before', 'single.quoted.string']
    >>> quoted_split('''"it's.quoted".and.after''')
    ["it's.quoted", 'and', 'after']

    :param string_: String to be split.
    :param separator: Separator between the parts (a dot by default).
    :return: List of parts, with the surrounding quotes removed from quoted parts.
    """
    # Fast path: nothing is quoted
    if DOUBLE_QUOTE not in string_ and SINGLE_QUOTE not in string_:
        return string_.split(separator)

    # Group 1 is the opening quote; group 2 is the (non-empty) content up to the *same* quote.
    # DOTALL keeps newlines allowed inside quoted parts.
    quoted_regex = re.compile(rf"([{SINGLE_QUOTE}{DOUBLE_QUOTE}])((?:(?!\1).)+)\1", re.DOTALL)

    def protect_separators(match):
        # Drop the surrounding quotes and hide the separators of the quoted content behind a unique marker
        return match.group(2).replace(separator, SEPARATOR_QUOTED_SPLIT)

    return [
        # Restore the separators that were inside quotes
        part.replace(SEPARATOR_QUOTED_SPLIT, separator)
        for part in quoted_regex.sub(protect_separators, string_).split(separator)
    ]
def quote_if_dotted(key: str) -> str:
    """Wrap the key in double quotes when it contains a dot and no double quote yet.

    Keys that are not strings are returned untouched.
    """
    needs_quotes = isinstance(key, str) and SEPARATOR_DOT in key and DOUBLE_QUOTE not in key
    return f"{DOUBLE_QUOTE}{key}{DOUBLE_QUOTE}" if needs_quotes else key
def quote_reducer(separator: str) -> Callable:
    """Build a reducer (for flattening dicts) that quotes keys containing dots.

    :param separator: String placed between the parent key and the child key.
    :return: A function ``(key1, key2)`` that joins both keys, quoting ``key2`` when it has a dot.
    """

    def _inner_quote_reducer(key1: str | None, key2: str) -> str:
        quoted_child = quote_if_dotted(key2)
        # No parent means a top-level key
        return quoted_child if key1 is None else f"{key1}{separator}{quoted_child}"

    return _inner_quote_reducer
def quotes_splitter(flat_key: str) -> tuple[str, ...]:
    """Split a flat key on dots, keeping quoted strings together.

    Dots are turned into spaces so ``shlex.split()`` can do the quote-aware splitting;
    any space left inside a piece is then turned (back) into a dot.
    """
    spaced_key = flat_key.replace(SEPARATOR_DOT, SEPARATOR_SPACE)
    return tuple(piece.replace(SEPARATOR_SPACE, SEPARATOR_DOT) for piece in shlex.split(spaced_key))
def custom_reducer(separator: str) -> Callable:
    """Custom reducer for :py:meth:`flatten_dict.flatten_dict.flatten()` accepting a separator.

    :param separator: String placed between the parent key and the child key.
    :return: A function ``(key1, key2)`` returning ``key2`` alone when there is no parent key,
        or both keys joined by the separator.
    """

    def _inner_custom_reducer(key1, key2):
        return key2 if key1 is None else f"{key1}{separator}{key2}"

    return _inner_custom_reducer
def custom_splitter(separator: str) -> Callable:
    """Custom splitter for :py:meth:`flatten_dict.flatten_dict.unflatten()` accepting a separator.

    :param separator: String that separates the keys inside a flat key.
    :return: A function that splits a flat key into a tuple of keys.
    """

    def _inner_custom_splitter(flat_key) -> tuple[str, ...]:
        """Return a tuple of keys split by the separator."""
        pieces = flat_key.split(separator)
        return tuple(pieces)

    return _inner_custom_splitter
# TODO: refactor: use only tomlkit and remove uiri/toml
# - tomlkit preserves comments
# - uiri/toml looks abandoned https://github.com/uiri/toml/issues/361
# Code to be used with tomlkit when merging styles
# merged_dict = unflatten(self._merged_styles, toml_style_splitter)
# def toml_style_splitter(flat_key: str) -> Tuple[str, ...]:
#     """Splitter for TOML style files, in an attempt to remove empty TOML tables."""
#     original = flat_key.split(SEPARATOR_FLATTEN)
#     quoted = [quote_if_dotted(k) for k in original]
#
#     first = quoted.pop(0)
#     last = quoted.pop() if quoted else None
#
#     grouped = [first]
#     if quoted:
#         grouped.append(SEPARATOR_DOT.join(quoted))
#     if last:
#         grouped.append(last)
#     return tuple(grouped)
def flatten_quotes(dict_: JsonDict, separator=SEPARATOR_DOT) -> JsonDict:
    """Flatten a dict keeping quotes in keys.

    :param dict_: Nested dict to be flattened.
    :param separator: String used to join nested keys (a dot by default).
    :return: A flat dict; double quotes are kept only on keys that have quotes in the middle.
    """
    flat_with_quotes = flatten(dict_, reducer=quote_reducer(separator))

    result = {}
    for quoted_key, value in flat_with_quotes.items():  # type: str, Any
        bare_key = quoted_key.strip(DOUBLE_QUOTE)
        # Quotes in the middle: keep every quote.
        # Quotes only at the beginning and end: use the key without them.
        chosen_key = quoted_key if DOUBLE_QUOTE in bare_key else bare_key
        result[chosen_key] = value
    return result
# unflatten() pre-configured to split flat keys with quotes_splitter(), which keeps quoted strings together.
unflatten_quotes = partial(unflatten, splitter=quotes_splitter)
class Comparison:
    """A comparison between two dictionaries, computing missing items and differences.

    Call the instance to run the comparison. Results are collected as flat dicts in
    ``missing_dict``, ``diff_dict`` and ``replace_dict``, and exposed as documents through
    the ``missing``, ``diff`` and ``replace`` properties.
    """

    def __init__(self, actual: TBaseDoc, expected: JsonDict, special_config: SpecialConfig) -> None:
        # Both sides are flattened, so nested keys can be compared one by one
        self.flat_actual = flatten_quotes(actual.as_object)
        self.flat_expected = flatten_quotes(expected)

        # Results are rendered with the same document class as the actual document
        self.doc_class = actual.__class__

        self.missing_dict: JsonDict = {}
        self.diff_dict: JsonDict = {}
        self.replace_dict: JsonDict = {}

        self.special_config = special_config

    def _as_doc(self, flat_dict: JsonDict) -> TBaseDoc | None:
        """Unflatten a result dict into a document; return None when the dict is empty."""
        if flat_dict:
            return self.doc_class(obj=unflatten_quotes(flat_dict))
        return None

    @property
    def missing(self) -> TBaseDoc | None:
        """Missing data."""
        return self._as_doc(self.missing_dict)

    @property
    def diff(self) -> TBaseDoc | None:
        """Different data."""
        return self._as_doc(self.diff_dict)

    @property
    def replace(self) -> TBaseDoc | None:
        """Data to be replaced."""
        return self._as_doc(self.replace_dict)

    @property
    def has_changes(self) -> bool:
        """Return True if there is a difference or something missing."""
        return bool(self.missing or self.diff or self.replace)

    def __call__(self) -> Comparison:
        """Compare two flattened dictionaries and compute missing and different items."""
        # Nothing to do when every expected item is already present in the actual data
        if self.flat_expected.items() <= self.flat_actual.items():
            return self

        for key, expected_value in self.flat_expected.items():
            if key not in self.flat_actual:
                self.missing_dict[key] = expected_value
                self.replace_dict[key] = expected_value
                continue

            actual = self.flat_actual[key]
            if not isinstance(expected_value, list):
                if expected_value != actual:
                    set_key_if_not_empty(self.diff_dict, key, expected_value)
                continue

            # Lists: elements are matched by the key configured for this flat key (if any)
            list_keys = self.special_config.list_keys.value.get(key, "")
            if SEPARATOR_DOT in list_keys:
                # "parent.child": the key is read from every item of the nested "parent" list
                parent_key, _, child_key = list_keys.rpartition(SEPARATOR_DOT)
                jmes_key = f"{parent_key}[].{child_key}"
            else:
                parent_key, child_key = "", list_keys
                jmes_key = child_key

            actual_detail = ListDetail.from_data(actual, jmes_key)
            expected_detail = ListDetail.from_data(expected_value, jmes_key)
            self._compare_list_elements(key, parent_key, child_key, actual_detail, expected_detail)
        return self

    def _compare_list_elements(  # pylint: disable=too-many-arguments # noqa: PLR0913
        self, key: str, parent_key: str, child_key: str, actual_detail: ListDetail, expected_detail: ListDetail
    ) -> None:
        """Compare list elements by their keys or hashes.

        :param key: Flat key of the list being compared.
        :param parent_key: Key of the nested list inside each element (empty when there is none).
        :param child_key: Key that identifies an item.
        :param actual_detail: Details of the actual list.
        :param expected_detail: Details of the expected list.
        """
        to_display = []
        # The replacement starts as a copy of the actual list, then it's patched/extended
        replacement = actual_detail.data.copy()

        for expected_element in expected_detail.elements:
            actual_element = actual_detail.find_by_key(expected_element)
            if not actual_element:
                # Expected element doesn't exist at all: add it
                to_display.append(expected_element.data)
                replacement.append(expected_element.data)
                continue

            if parent_key:
                new_block: JsonDict = self._compare_children(parent_key, child_key, actual_element, expected_element)
                if new_block:
                    to_display.append(expected_element.data)
                    replacement[actual_element.index] = new_block
                    continue
                # No nested difference: fall through to the whole-element comparison

            element_diff = compare_lists_with_dictdiffer(
                actual_element.cast_to_dict, expected_element.cast_to_dict, return_list=False
            )
            if element_diff:
                new_block = cast(JsonDict, actual_element.data).copy()
                new_block.update(element_diff)
                to_display.append(new_block)
                replacement[actual_element.index] = new_block

        if to_display:
            set_key_if_not_empty(self.missing_dict, key, to_display)
            set_key_if_not_empty(self.replace_dict, key, replacement)

    @staticmethod
    def _compare_children(
        parent_key: str, child_key: str, actual_element: ElementDetail, expected_element: ElementDetail
    ) -> JsonDict:
        """Compare children of a JSON dict, return only the inner difference.

        E.g.: a pre-commit hook ID with different args will return a JSON only with the specific hook,
        not with all the hooks of the parent repo.

        :return: A copy of the actual data with the differing nested item replaced,
            or an empty dict when there is no nested difference.
        """
        # Select, on both sides, the nested items identified by the first expected key
        jmes_nested = f"{parent_key}[?{child_key}=='{expected_element.key[0]}']"
        actual_nested = search_json(actual_element.data, jmes_nested, [])
        expected_nested = search_json(expected_element.data, jmes_nested, [{}])

        diff_nested = compare_lists_with_dictdiffer(actual_nested, expected_nested, return_list=True)
        if not diff_nested:
            return {}

        actual_data = cast(JsonDict, actual_element.data)
        expected_data = cast(JsonDict, expected_element.data)
        # TODO: fix: set value deep down the tree (try dpath-python). parent_key = 'regions[].cities[].people'
        expected_data[parent_key] = diff_nested

        # NOTE(review): if copy() is shallow here (as for a plain dict), the nested list is shared with
        # actual_data and the assignment below also changes it -- confirm this is intended.
        new_nested_block = actual_data.copy()
        for nested_index, nested_item in enumerate(actual_data[parent_key]):
            if nested_item == actual_nested[0]:
                new_nested_block[parent_key][nested_index] = diff_nested[0]
                break
        return new_nested_block
class BaseDoc(metaclass=abc.ABCMeta):
    """Base class for configuration file formats.

    :param path: Path of the config file to be loaded.
    :param string: Config in string format.
    :param obj: Config object (Python dict, YamlDoc, TomlDoc instances).
    """

    __repr__ = autorepr(["path"])

    def __init__(
        self, *, path: PathOrStr | None = None, string: str | None = None, obj: JsonDict | None = None
    ) -> None:
        self.path = path
        self._string = string
        self._object = obj
        # Filled in by load() in the subclasses
        self._reformatted: str | None = None

    @abc.abstractmethod
    def load(self) -> bool:
        """Load the configuration from a file, a string or a dict."""

    @property
    def as_string(self) -> str:
        """Contents of the file or the original string provided when the instance was created."""
        text = self._string
        return text if text else ""

    @property
    def as_object(self) -> dict:
        """String content converted to a Python object (dict, YAML object instance, etc.)."""
        # Load on first access only
        if self._object is None:
            self.load()
        loaded = self._object
        return loaded if loaded else {}

    @property
    def reformatted(self) -> str:
        """Reformat the configuration dict as a new string (it might not match the original string/file contents)."""
        # Load on first access only
        if self._reformatted is None:
            self.load()
        rendered = self._reformatted
        return rendered if rendered else ""
class InlineTableTomlDecoder(toml.TomlDecoder):  # type: ignore[name-defined]
    """A decoder to work around a bug (or unfinished work) in the Python TOML package.

    https://github.com/uiri/toml/issues/362.
    """

    def get_empty_inline_table(self):
        """Return whatever ``get_empty_table()`` returns, so inline tables are created like regular tables.

        The package looks unmaintained; see https://github.com/uiri/toml/issues/361
        """
        empty_table = self.get_empty_table()
        return empty_table
class TomlDoc(BaseDoc):
    """TOML configuration format.

    :param use_tomlkit: Parse and dump with ``tomlkit`` instead of ``toml`` (uiri/toml).
    """

    # TODO: refactor: use only tomlkit and remove uiri/toml
    #  remove __init__() completely
    def __init__(
        self,
        *,
        path: PathOrStr | None = None,
        string: str | None = None,
        obj: JsonDict | None = None,
        use_tomlkit=False,
    ) -> None:
        super().__init__(path=path, string=string, obj=obj)
        self.use_tomlkit = use_tomlkit

    def load(self) -> bool:
        """Load a TOML file by its path, a string or a dict.

        The file (when a path was given) is read into the string, the string (when there is one)
        is parsed into the object, and the object (when there is one) is dumped as the reformatted string.

        :return: Always True.
        """
        if self.path is not None:
            self._string = Path(self.path).read_text(encoding="UTF-8")

        if self._string is not None:
            # TODO: refactor: use only tomlkit and remove uiri/toml
            #  I tried to replace toml by tomlkit, but lots of tests break.
            if not self.use_tomlkit:
                self._object = toml.loads(self._string, decoder=InlineTableTomlDecoder(dict))  # type: ignore[call-arg,assignment]
            else:
                # TODO: refactor: use only tomlkit and remove uiri/toml
                #  Removing empty tables on loads() didn't work.
                #  The empty tables are gone, but:
                #  1. the output has 2 blank lines at the top
                #  2. the underlying dict is different than expected, and tests fail:
                #     'NIP001  has an incorrect style. Invalid config:',
                #     '"pyproject.toml".tool.black: Unknown file. See '
                #     'https://nitpick.rtfd.io/en/latest/plugins.html.']
                # toml_obj = tomlkit.loads(self._string)
                # if "tool.black" in self._string:
                #     from tomlkit.items import KeyType, SingleKey
                #
                #     black_dict = toml_obj["pyproject.toml"]["tool"]["black"]
                #     toml_obj["pyproject.toml"].remove("tool")
                #     toml_obj.remove("pyproject.toml")
                #     toml_obj.add(SingleKey('"pyproject.toml".tool.black', KeyType.Bare), black_dict)
                # result = tomlkit.dumps(toml_obj)
                # print(result)
                self._object = tomlkit.loads(self._string)

        if self._object is not None:
            # TODO: fix: tomlkit.dumps() renders comments and I didn't find a way to turn this off,
            #  but comments are being lost when the TOML plugin does dict comparisons.
            if not self.use_tomlkit:
                self._reformatted = toml.dumps(self._object)
            else:
                # TODO: refactor: use only tomlkit and remove uiri/toml
                #  Removing empty tables on dumps() didn't work.
                #  Another attempt would be to remove tables when dumping to TOML when setting self._reformatted:
                #  1. load a dict normally with loads()
                #  2. clean up TomlDocument and its empty tables recursively, reusing the code with SingleKey above
                #  3. dump the cleaned TomlDocument
                #  It looks like some effort. I'll wait for https://github.com/sdispater/tomlkit/issues/166
                # remove_empty_tables = unflatten(
                #     flatten(self._object, custom_reducer(SEPARATOR_FLATTEN)), toml_style_splitter
                # )
                self._reformatted = tomlkit.dumps(self._object, sort_keys=True)
        return True
def traverse_toml_tree(document: tomlkit.TOMLDocument, dictionary):
    """Traverse a TOML document recursively and change values, keeping its formatting and comments.

    :param document: Document (or nested table) to be changed in place.
    :param dictionary: Values to be set; nested dicts are merged into existing keys.
    """
    for key, value in dictionary.items():
        if isinstance(value, dict) and key in document:
            # Existing table: descend instead of replacing it as a whole
            traverse_toml_tree(document[key], value)
            continue
        # New keys (nested or not) and plain values are simply assigned
        document[key] = value
class SensibleYAML(YAML):
    """YAML with sensible defaults but an inefficient dump to string.

    `Output of dump() as a string <https://yaml.readthedocs.io/en/latest/example.html#output-of-dump-as-a-string>`_.
    """

    def __init__(self) -> None:
        super().__init__()
        # Indentation of mappings and sequences
        self.map_indent = 2
        self.sequence_indent = 4
        self.sequence_dash_offset = 2
        self.preserve_quotes = True

    def loads(self, string: str):
        """Load YAML from a string, by wrapping it in a stream for ``load()``."""
        stream = StringIO(string)
        return self.load(stream)

    def dumps(self, data) -> str:
        """Dump to a string, by dumping to an in-memory stream and returning its contents."""
        stream = StringIO()
        self.dump(data, stream, transform=None)
        return stream.getvalue()
class YamlDoc(BaseDoc):
    """YAML configuration format."""

    # YAML loader/dumper; a new one is created on every load()
    updater: SensibleYAML

    def load(self) -> bool:
        """Load a YAML file by its path, a string or a dict.

        The file (when a path was given) is read into the string, the string (when there is one)
        is parsed into the object, and the object (when there is one) is dumped as the reformatted string.

        :return: Always True.
        """
        self.updater = SensibleYAML()

        if self.path is not None:
            yaml_path = Path(self.path)
            self._string = yaml_path.read_text(encoding="UTF-8")

        if self._string is not None:
            self._object = self.updater.loads(self._string)

        if self._object is not None:
            self._reformatted = self.updater.dumps(self._object)
        return True
# Classes and their representation on ruamel.yaml:
# dict-like and list-like classes are dumped as plain YAML mappings and sequences...
for dict_class in (SortedDict, items.Table, items.InlineTable):
    RoundTripRepresenter.add_representer(dict_class, RoundTripRepresenter.represent_dict)
for list_class in (items.Array, items.AoT):
    RoundTripRepresenter.add_representer(list_class, RoundTripRepresenter.represent_list)
# ... and tomlkit scalars as plain YAML strings and integers
RoundTripRepresenter.add_representer(items.String, RoundTripRepresenter.represent_str)
RoundTripRepresenter.add_representer(items.Integer, RoundTripRepresenter.represent_int)
def is_scalar(value: YamlValue) -> bool:
    """Return True if the value is NOT a dict or a list."""
    if isinstance(value, (list, dict)):
        return False
    return True
def replace_or_add_list_element(yaml_obj: YamlObject, element: Any, key: str, index: int) -> None:
    """Replace or add a new element in a YAML sequence of mappings.

    :param yaml_obj: Object containing the sequence under ``key``, or the sequence itself
        when ``key`` is not in it.
    :param element: New element.
    :param key: Key of the sequence inside ``yaml_obj``.
    :param index: Position of the element; when it's past the end, the element is appended.
    """
    current = yaml_obj[key] if key in yaml_obj else yaml_obj

    # Past the end of the sequence: add instead of replacing
    if index >= len(current):
        current.append(element)
        return

    # Only traverse when both sides are containers and the new element is a dict
    both_are_containers = not (is_scalar(current[index]) or is_scalar(element))
    if both_are_containers and isinstance(element, dict):
        traverse_yaml_tree(current[index], element)
        return

    # Either one side is scalar (replace with whatever element, without traversing, even if it's a dict),
    # or the element is probably a list (set the whole list in YAML)
    current[index] = element
def traverse_yaml_tree(yaml_obj: YamlObject, change: JsonDict):
    """Traverse a YAML document recursively and change values, keeping its formatting and comments.

    :param yaml_obj: YAML object to be changed in place.
    :param change: Values to be set; nested dicts and lists are merged into existing keys.
    """
    for key, value in change.items():
        if key in yaml_obj:
            if isinstance(value, dict):
                traverse_yaml_tree(yaml_obj[key], value)
            elif isinstance(value, list):
                # Existing list: replace or add its elements one by one
                for index, element in enumerate(value):
                    replace_or_add_list_element(yaml_obj, element, key, index)
            else:
                yaml_obj[key] = value
        elif isinstance(yaml_obj, dict):
            yaml_obj[key] = value
        else:
            # Key doesn't exist: we can insert the whole nested dict at once, no regrets
            last_pos = len(yaml_obj.keys()) + 1
            yaml_obj.insert(last_pos, key, value)
class JsonDoc(BaseDoc):
    """JSON configuration format."""

    def load(self) -> bool:
        """Load a JSON file by its path, a string or a dict.

        The file (when a path was given) is read into the string, the string (when there is one)
        is parsed and flattened into the object, and the object (when there is one) is dumped
        as the reformatted string.

        :return: Always True.
        """
        if self.path is not None:
            json_path = Path(self.path)
            self._string = json_path.read_text(encoding="UTF-8")

        if self._string is not None:
            parsed = json.loads(self._string)
            self._object = flatten_quotes(parsed)

        if self._object is not None:
            dumped = json.dumps(self._object, sort_keys=True, indent=2)
            # Every file should end with a blank line
            self._reformatted = dumped + "\n"
        return True