Source code for nitpick.core

"""The Nitpick application and project-related utilities."""
from __future__ import annotations

import itertools
import os
from dataclasses import dataclass
from functools import lru_cache
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, Iterator

import click
import tomlkit
from autorepr import autorepr
from identify import identify
from loguru import logger
from marshmallow_polyfield import PolyField
from more_itertools import always_iterable
from packaging.version import parse as parse_version
from pluggy import PluginManager
from tomlkit import items

from nitpick import fields, plugins, tomlkit_ext
from nitpick.blender import search_json
from nitpick.constants import (
from nitpick.exceptions import QuitComplainingError
from nitpick.generic import filter_names, glob_files, glob_non_ignored_files, relative_to_current_dir
from import FileInfo
from nitpick.schemas import BaseNitpickSchema, flatten_marshmallow_errors, help_message
from import (
from nitpick.violations import Fuss, ProjectViolations, Reporter, StyleViolations

    from nitpick.typedefs import JsonDict, PathOrStr

[docs]class Nitpick: """The Nitpick API.""" _allow_init = False project: Project def __init__(self) -> None: if not self._allow_init: msg = "This class cannot be instantiated directly. Use Nitpick.singleton().init(...) instead" raise TypeError(msg) self.offline: bool = False
[docs] @classmethod @lru_cache def singleton(cls) -> Nitpick: """Return a single instance of the class.""" Nitpick._allow_init = True instance = cls() Nitpick._allow_init = False return instance
[docs] def init(self, project_root: PathOrStr | None = None, offline: bool | None = None) -> Nitpick: """Initialize attributes of the singleton.""" self.project = Project(project_root) if offline is not None: self.offline = offline return self
[docs] def run(self, *partial_names: str, autofix=False) -> Iterator[Fuss]: """Run Nitpick. :param partial_names: Names of the files to enforce configs for. :param autofix: Flag to modify files, if the plugin supports it (default: True). :return: Fuss generator. """ Reporter.reset() try: yield from chain( self.project.merge_styles(self.offline), self.enforce_present_absent(*partial_names), self.enforce_style(*partial_names, autofix=autofix), ) except QuitComplainingError as err: yield from err.violations
[docs] def enforce_present_absent(self, *partial_names: str) -> Iterator[Fuss]: """Enforce files that should be present or absent. :param partial_names: Names of the files to enforce configs for. :return: Fuss generator. """ if not self.project: return for present in (True, False): key = "present" if present else "absent" logger.debug(f"Enforce {key} files") absent = not present file_mapping = self.project.nitpick_files_section.get(key, {}) for filename in filter_names(file_mapping, *partial_names): custom_message = file_mapping[filename] file_path: Path = self.project.root / filename exists = file_path.exists() if (present and exists) or (absent and not exists): continue reporter = Reporter(FileInfo.create(self.project, filename)) extra = f": {custom_message}" if custom_message else "" violation = ProjectViolations.MISSING_FILE if present else ProjectViolations.FILE_SHOULD_BE_DELETED yield reporter.make_fuss(violation, extra=extra)
[docs] def enforce_style(self, *partial_names: str, autofix=True) -> Iterator[Fuss]: """Read the merged style and enforce the rules in it. 1. Get all root keys from the merged style (every key is a filename, except "nitpick"). 2. For each file name, find the plugin(s) that can handle the file. :param partial_names: Names of the files to enforce configs for. :param autofix: Flag to modify files, if the plugin supports it (default: True). :return: Fuss generator. """ # 1. for config_key in filter_names(self.project.style_dict, *partial_names): config_dict = self.project.style_dict[config_key] logger.debug(f"{config_key}: Finding plugins to enforce style") # 2. info = FileInfo.create(self.project, config_key) # pylint: disable=no-member for plugin_class in self.project.plugin_manager.hook.can_handle(info=info): yield from plugin_class(info, config_dict, autofix).entry_point()
[docs] def configured_files(self, *partial_names: str) -> list[Path]: """List of files configured in the Nitpick style. Filter only the selected partial names. """ return [Path(self.project.root) / key for key in filter_names(self.project.style_dict, *partial_names)]
[docs] def echo(self, message: str): """Echo a message on the terminal, with the relative path at the beginning.""" relative = relative_to_current_dir(self.project.root) if relative: relative += os.path.sep click.echo(f"{relative}{message}")
[docs]def confirm_project_root(dir_: PathOrStr | None = None) -> Path: """Confirm this is the root dir of the project (the one that has one of the ``ROOT_FILES``).""" possible_root_dir = Path(dir_ or Path.cwd()).resolve() root_files = glob_files(possible_root_dir, ROOT_FILES) logger.debug(f"Root files found: {root_files}") if root_files: return next(iter(root_files)).parent logger.error(f"No root files found on directory {possible_root_dir}") raise QuitComplainingError(Reporter().make_fuss(ProjectViolations.NO_ROOT_DIR))
[docs]def find_main_python_file(root_dir: Path) -> Path: """Find the main Python file in the root dir, the one that will be used to report Flake8 warnings. The search order is: 1. Python files that belong to the root dir of the project (e.g.: ````, ````). 2. ````: they can be on the root or on a subdir (Django projects). 3. Any other ``*.py`` Python file on the root dir and subdir. This avoid long recursions when there is a ``node_modules`` subdir for instance. """ for the_file in itertools.chain( # 1. [root_dir / root_file for root_file in ROOT_PYTHON_FILES], # 2. root_dir.glob(f"*/{PYTHON_MANAGE_PY}"), # 3. root_dir.glob("*.py"), root_dir.glob("*/*.py"), ): if the_file.exists():"Found the file {the_file}") return Path(the_file) raise QuitComplainingError(Reporter().make_fuss(ProjectViolations.NO_PYTHON_FILE, root=str(root_dir)))
[docs]class ToolNitpickSectionSchema(BaseNitpickSchema): """Validation schema for the ``[tool.nitpick]`` table on ``pyproject.toml``.""" error_messages = {"unknown": help_message("Unknown configuration", "configuration.html")} # noqa: RUF012 style = PolyField(deserialization_schema_selector=fields.string_or_list_field) cache = fields.NonEmptyString() ignore_styles = fields.List(fields.NonEmptyString())
[docs]@dataclass class Configuration: """Configuration read from the ``[tool.nitpick]`` table from one of the ``CONFIG_FILES``.""" file: Path doc: tomlkit.TOMLDocument table: items.Table styles: items.Array dont_suggest: items.Array cache: str
[docs]class Project: """A project to be nitpicked.""" __repr__ = autorepr(["_chosen_root", "root"]) _plugin_manager: PluginManager _confirmed_root: Path def __init__(self, root: PathOrStr | None = None) -> None: self._chosen_root = root self.style_dict: JsonDict = {} self.nitpick_section: JsonDict = {} self.nitpick_files_section: JsonDict = {} @property def root(self) -> Path: """Root dir of the project.""" try: root = self._confirmed_root except AttributeError: root = self._confirmed_root = confirm_project_root(self._chosen_root) return root @property def plugin_manager(self) -> PluginManager: """Load all defined plugins.""" try: manager = self._plugin_manager except AttributeError: manager = self._plugin_manager = PluginManager(PROJECT_NAME) manager.add_hookspecs(plugins) manager.load_setuptools_entrypoints(PROJECT_NAME) return manager
[docs] def config_file_or_default(self) -> Path: """Return a config file if found, or the default one.""" config_file = self.config_file() if config_file: return config_file return self.root / DOT_NITPICK_TOML
[docs] def config_file(self) -> Path | None: """Determine which config file to use.""" found: Path | None = None for possible in CONFIG_FILES: existing: Path = self.root / possible if not existing.exists(): continue if not found:"Config file: reading from {existing}") found = existing else: logger.warning(f"Config file: ignoring existing {existing}") if not found: logger.warning("Config file: none found") return found
[docs] def read_configuration(self) -> Configuration: """Return the ``[tool.nitpick]`` table from the configuration file. Optionally, validate it against a Marshmallow schema. """ config_file = self.config_file_or_default() doc = tomlkit_ext.load(config_file) table: items.Table | None = doc.get(CONFIG_TOOL_NITPICK_KEY) if not table: super_table = tomlkit.table(True) nitpick_table = tomlkit.table() nitpick_table.update({CONFIG_KEY_STYLE: []}) super_table.append(PROJECT_NAME, nitpick_table) doc.append(CONFIG_KEY_TOOL, super_table) table = doc.get(CONFIG_TOOL_NITPICK_KEY) validation_errors = ToolNitpickSectionSchema().validate(table) if validation_errors: raise QuitComplainingError( Reporter(FileInfo(self, PYTHON_PYPROJECT_TOML)).make_fuss( StyleViolations.INVALID_DATA_TOOL_NITPICK, flatten_marshmallow_errors(validation_errors), section=CONFIG_TOOL_NITPICK_KEY, ) ) existing_styles: items.Array = table.get(CONFIG_KEY_STYLE) if existing_styles is None: existing_styles = tomlkit.array() table.add(CONFIG_KEY_STYLE, existing_styles) ignored_styles: items.Array = table.get(CONFIG_KEY_IGNORE_STYLES) if ignored_styles is None: ignored_styles = tomlkit.array() return Configuration(config_file, doc, table, existing_styles, ignored_styles, table.get("cache", ""))
[docs] def merge_styles(self, offline: bool) -> Iterator[Fuss]: """Merge one or multiple style files.""" config = self.read_configuration() style = StyleManager(self, offline, config.cache) base = config.file.expanduser().resolve().as_uri() style_errors = list(style.find_initial_styles(list(always_iterable(config.styles)), base)) if style_errors: raise QuitComplainingError(style_errors) self.style_dict = style.merge_toml_dict() from nitpick.flake8 import NitpickFlake8Extension # pylint: disable=import-outside-toplevel minimum_version = search_json(self.style_dict, JMEX_NITPICK_MINIMUM_VERSION, None) logger.debug(f"Minimum version: {minimum_version}") if minimum_version and parse_version(NitpickFlake8Extension.version) < parse_version(minimum_version): yield Reporter().make_fuss( ProjectViolations.MINIMUM_VERSION, project=PROJECT_NAME, expected=minimum_version, actual=NitpickFlake8Extension.version, ) self.nitpick_section = self.style_dict.get("nitpick", {}) self.nitpick_files_section = self.nitpick_section.get("files", {})
[docs] def suggest_styles(self, library_path_str: PathOrStr | None) -> list[str]: """Suggest styles based on the files in the project root (skipping Git ignored files).""" all_tags: set[str] = {ANY_BUILTIN_STYLE} for project_file_path in glob_non_ignored_files(self.root): all_tags.update(identify.tags_from_path(str(project_file_path))) if library_path_str: library_dir = Path(library_path_str) all_styles: Iterable[Path] = library_dir.glob("**/*.toml") else: library_dir = None all_styles = builtin_styles() suggested_styles: set[str] = set() for style_path in all_styles: style = BuiltinStyle.from_path(style_path, library_dir) if style.identify_tag in all_tags: suggested_styles.add(style.formatted) return sorted(suggested_styles)