Source code for pibootctl.store

# Copyright (c) 2020 Canonical Ltd.
# Copyright (c) 2020 Dave Jones <dave@waveform.org.uk>
#
# This file is part of pibootctl.
#
# pibootctl is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# pibootctl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pibootctl.  If not, see <https://www.gnu.org/licenses/>.

"""
The :mod:`pibootctl.store` module defines classes which control a store of
Raspberry Pi boot configurations, or the active boot configuration.

The main class of interest is :class:`Store`. From an instance of this, one can
access derivatives of :class:`BootConfiguration` for the purposes of
manipulating the store of configurations, or the active boot configuration
itself. Each :class:`BootConfiguration` contains an instance of
:class:`Settings` which maps setting names to
:class:`~pibootctl.setting.Setting` instances.

See :class:`pibootctl.main` for information on obtaining an instance of
:class:`Store`.

.. data:: Current

    The key of the active boot configuration in instances of :class:`Store`.

.. data:: Default

    The key of the default (empty) boot configuration in instances of
    :class:`Store`.

.. autoclass:: Store
    :members:

.. autoclass:: BootConfiguration
    :members:

.. autoclass:: StoredConfiguration
    :members:

.. autoclass:: MutableConfiguration
    :members:

.. autoclass:: Settings
    :members:
"""

import os
import gettext
from weakref import ref
from pathlib import Path
from copy import deepcopy
from fnmatch import fnmatch
from datetime import datetime
from operator import itemgetter
from collections.abc import Mapping
from zipfile import ZipFile, BadZipFile, ZIP_DEFLATED

from .files import AtomicReplaceFile
from .parser import BootParser, BootFile, BootComment, BootConditions
from .setting import CommandIncludedFile, Influences
from .settings import SETTINGS
from .exc import InvalidConfiguration, IneffectiveConfiguration, DelegatedOutput

_ = gettext.gettext


[docs]class Current: "Singleton representing the active boot configuration in :class:`Store`." def __repr__(self): return 'Current'
Current = Current()
[docs]class Default: "Singleton representing the default boot configuration in :class:`Store`." def __repr__(self): return 'Default'
Default = Default()
[docs]class Store(Mapping): """ A mapping representing all boot configurations (current, default, and stored). Acts as a mapping keyed by the name of the stored configuration, or the special values :data:`Current` for the current boot configuration, or :data:`Default` for the default (empty) configuration. The values of the mapping are derivatives of :class:`BootConfiguration` which provide the parsed :class:`Settings`, along with some other attributes. The mapping is mutable and this can be used to manipulate stored boot configurations. For instance, to store the current boot configuration under the name "foo":: >>> store = Store('/boot', 'pibootctl') >>> store["foo"] = store[Current] Setting the item with the key :data:`Current` overwrites the current boot configuration:: >>> store[Current] = store["serial"] Note that items retrieved from the store are effectively immutable; modifying them (even internally) does *not* modify the content of the store. To modify the content of the store, you must request a :meth:`~BootConfiguration.mutable` copy of a configuration, modify it, and assign it back:: >>> foo = store["foo"].mutable() >>> foo.update({"serial.enabled": True}) >>> store["serial"] = foo The same applies to the current boot configuration item:: >>> current = store[Current].mutable() >>> current.update({"camera.enabled": True, "gpu.mem": 128}) >>> store[Current] = current Items can be deleted to remove them from the store, with the obvious exception of the items with the keys :data:`Current` and :data:`Default` which cannot be removed (attempting to do so will raise a :exc:`KeyError`). Furthermore, the item with the key :data:`Default` cannot be modified either. :param str boot_path: The path on which the boot partition is mounted. :param str store_path: The path (relative to *boot_path*) under which stored configurations will be saved. :param str config_root: The filename of the "root" of the configuration, i.e. the first file read by the parser, and the file in which certain commands (e.g. start_x) *must* be placed. Currently, this should always be "config.txt", the default. :param set mutable_files: The set of filenames which :class:`MutableConfiguration` instances are permitted to change. By default this is just "config.txt". :param bool comment_lines: If :data:`True`, then :class:`MutableConfiguration` will comment out lines no longer required with a # prefix. When :data:`False` (the default), such lines will be deleted instead. When adding lines, regardless of this setting, the utility will search for, and uncomment, commented out lines which match the required output. """ def __init__(self, boot_path, store_path, config_root='config.txt', mutable_files=frozenset({'config.txt'}), comment_lines=False): self._boot_path = Path(boot_path) self._store_path = self._boot_path / store_path self._config_root = config_root self._mutable_files = frozenset(mutable_files) self._comment_lines = comment_lines def _path_of(self, name): return (self._store_path / name).with_suffix('.zip') def _enumerate(self): for path in self._store_path.glob('*.zip'): with ZipFile(str(path), 'r') as arc: if arc.comment.startswith(b'pibootctl:0:'): yield path.stem def __len__(self): # +2 for the current and default configs return sum(1 for i in self._enumerate()) + 2 def __iter__(self): yield Default yield Current yield from self._enumerate() def __contains__(self, key): if key in (Current, Default): # The current and boot configurations are always present (even if # config.txt doesn't exist, there's still technically a boot # configuration - just a default one) return True else: try: with ZipFile(str(self._path_of(key)), 'r') as arc: return arc.comment.startswith(b'pibootctl:0:') except (FileNotFoundError, BadZipFile): return False def __getitem__(self, key): if key is Default: return DefaultConfiguration() elif key is Current: return BootConfiguration( self._boot_path, self._config_root, self._mutable_files, self._comment_lines) elif key in self: return StoredConfiguration( self._path_of(key), self._config_root, self._mutable_files, self._comment_lines) else: raise KeyError(_( "No stored configuration named {key}").format(key=key)) def __setitem__(self, key, item): if key is Default: raise KeyError(_( "Cannot change the default configuration")) elif key is Current: def replace_file(path, file): with AtomicReplaceFile(self._boot_path / path) as temp: temp.write(file.content) os.utime(str(self._boot_path / path), ( datetime.now().timestamp(), file.timestamp.timestamp())) old_files = set(self[Current].files.keys()) for path, file in item.files.items(): if path != self._config_root: replace_file(path, file) # config.txt is deliberately dealt with last. This ensures that, # in the case of systems using os_prefix to switch boot directories # the switch is effectively atomic try: path = self._config_root file = item.files[self._config_root] except KeyError: pass else: replace_file(path, file) # Remove files that existed in the old configuration but not the # new; this is necessary to deal with the case of switching from # a config with config.txt (or other includes) to one without # (which is a valid, default configuration). Again, for systems # using os_prefix to switch boot dirs, this must occur last for path in old_files: if not path in item.files: os.unlink(str(self._boot_path / path)) elif isinstance(key, str) and key: self._store_path.mkdir(parents=True, exist_ok=True) with ZipFile(str(self._path_of(key)), 'x', compression=ZIP_DEFLATED) as arc: arc.comment = 'pibootctl:0:{hash}\n\n{warning}'.format( hash=item.hash, warning=_( 'Do not edit the content of this archive; the line ' 'above is a hash of the content which will not match ' 'after manual editing. Please use the pibootctl tool ' 'to manipulate stored boot configurations'), ).encode('ascii') for file in item.files.values(): file.add_to_zip(arc) else: raise KeyError(_( '{key!r} is an invalid stored configuration').format(key=key)) def __delitem__(self, key): if key is Default: raise KeyError(_("Cannot remove the default configuration")) elif key is Current: raise KeyError(_("Cannot remove the current boot configuration")) else: try: self._path_of(key).unlink() except FileNotFoundError: raise KeyError(_( "No stored configuration named {key}").format(key=key)) @property def active(self): """ Returns the key of the active configuration, if any. If no configuration is currently active, returns :data:`None`. """ current = self[Current] for key in self: if key not in (Current, Default): stored = self[key] if stored.hash == current.hash: return key return None
class DefaultConfiguration: """ Represents the default boot configuration with an entirely empty file-set and a fresh :class:`Settings` instance. """ @property def files(self): """ A mapping of filenames to :class:`~pibootctl.parser.BootFile` instances representing all the files that make up the boot configuration. """ return {} @property def hash(self): """ The `SHA-1`_ hash that identifies the boot configuration. This is obtained by hashing the files of the boot configuration in parsing order. .. _SHA-1: https://en.wikipedia.org/wiki/SHA-1 """ return 'da39a3ee5e6b4b0d3255bfef95601890afd80709' # empty sha1 @property def timestamp(self): """ The last modified timestamp of the boot configuration, as a :class:`~datetime.datetime`. """ return datetime(1970, 1, 1) # UNIX epoch @property def settings(self): """ A :class:`Settings` instance containing all the settings extracted from the boot configuration. """ return Settings()
[docs]class BootConfiguration: """ Represents a boot configuration, as parsed from *config_root* (default "config.txt") on the boot partition (presumably mounted at *path*, a :class:`~pathlib.Path` instance). """ def __init__(self, path, config_root='config.txt', mutable_files=frozenset({'config.txt'}), comment_lines=False): self._path = path self._config_root = config_root self._mutable_files = mutable_files self._comment_lines = comment_lines self._settings = None self._files = None self._hash = None self._timestamp = None def _parse(self): parser = BootParser(self._path) parser.parse(self._config_root) self._settings = Settings() for setting in self._settings.values(): lines = [] for item, value in setting.extract(parser.config): if item.conditions.enabled and value is not Influences: setting._value = value lines.append(item) setting._lines = tuple(lines[::-1]) for setting in self._settings.values(): if isinstance(setting, CommandIncludedFile): parser.add(setting.filename) self._files = parser.files self._hash = parser.hash self._timestamp = parser.timestamp return parser @property def path(self): """ The path (or archive or entity) containing all the files that make up the boot configuration. """ return self._path @property def config_root(self): """ The root file of the boot configuration. This is currently always "config.txt". """ return self._config_root @property def timestamp(self): """ The last modified timestamp of the boot configuration, as a :class:`~datetime.datetime`. """ if self._timestamp is None: self._parse() return self._timestamp @property def hash(self): """ The SHA1 hash that identifies the boot configuration. This is obtained by hashing the files of the boot configuration in parsing order. """ if self._hash is None: self._parse() return self._hash @property def settings(self): """ A :class:`Settings` instance containing all the settings extracted from the boot configuration. """ if self._settings is None: self._parse() return self._settings @property def files(self): """ A mapping of filenames to :class:`~pibootctl.parser.BootFile` instances representing all the files that make up the boot configuration. """ if self._files is None: self._parse() return self._files
[docs] def mutable(self): """ Return a :class:`MutableConfiguration` based on the parsed content of this configuration. Note that mutable configurations are not backed by any files on disk, so nothing is actually re-written until the updated mutable configuration is assigned back to something in the :class:`Store`. """ return MutableConfiguration(self.files.copy(), self._config_root, self._mutable_files, self._comment_lines)
[docs]class StoredConfiguration(BootConfiguration): """ Represents a boot configuration stored in a :class:`~zipfile.ZipFile` specified by *path*. The starting file of the configuration is given by *config_root*. All other parameters are as in :class:`BootConfiguration`. """ def __init__(self, path, config_root='config.txt', mutable_files=frozenset({'config.txt'}), comment_lines=False): super().__init__( ZipFile(str(path), 'r'), config_root, mutable_files, comment_lines) # We can grab the hash and timestamp from the arc's meta-data without # any decompression work (it's all in the uncompressed footer) comment = self.path.comment if comment.startswith(b'pibootctl:0:'): i = len('pibootctl:0:') zip_hash = comment[i:40 + i].decode('ascii') if len(zip_hash) != 40: raise ValueError(_( 'Invalid stored configuration: invalid length')) if not set(zip_hash) <= set('0123456789abcdef'): raise ValueError(_( 'Invalid stored configuration: non-hex hash')) self._hash = zip_hash # A stored archive can be empty, hence default= is required self._timestamp = max( (datetime(*info.date_time) for info in self.path.infolist()), default=datetime(1970, 1, 1)) else: # TODO Should we allow "self-made" archives without a pibootctl # header comment? We can't currently reach here because the # enumerate and contains tests check for pibootctl:0: but that # could be relaxed... assert False, 'Invalid stored configuration: missing hash'
[docs]class MutableConfiguration(BootConfiguration): """ Represents a changeable boot configuration. Do not construct instances of this class directly; they are typically constructed from a *base* :class:`BootConfiguration`, by calling :meth:`~BootConfiguration.mutable`. Mutable configurations can be changed with the :meth:`update` method which will also validate the new configuration, and check that the settings were not overridden by later files. No link is maintained between the original :class:`BootConfiguration` and the mutable copy. This implies that nothing is re-written on disk when the mutable configuration is updated. The resulting configuration must be assigned back to something in the :class:`Store` in order to re-write disk files. """
[docs] def update(self, values, context): """ Given a mapping of setting names to new values, updates the values of the corresponding settings in this configuration. If a value is :data:`None`, the setting is reset to its default value. """ # Generate the "desired" settings. Note that this is a "pure" copy of # the settings without any actual configuration files backing it. We'll # use this firstly to validate the new settings are coherent, and later # to determine whether the configuration we generate matches the # desired settings. updated = self.settings.copy() for name, value in values.items(): item = updated[name] item._value = item.update(value) item._lines = () errors = {} for item in updated.values(): try: item.validate() except ValueError as exc: errors[item.name] = exc if errors: raise InvalidConfiguration(errors) # Generate a clean configuration devoid of all the lines that affected # "values", then build a final configuration from the desired settings # we generated above, and validate it results in the desired settings self._update_path(self._clean_config(values, context)) self._parse() self._update_path(self._final_config(updated, context)) self._parse() diff = updated.diff(self.settings) if diff: raise IneffectiveConfiguration(diff)
def _parse(self): # Save the parsed lines of the boot configuration; the final phase of # the update method (_final_config) requires this information parser = super()._parse() self._config = parser.config def _update_path(self, new_path): # Update self._path from *new_path*, a dict mapping filenames to # lists of lines. for filename, lines in new_path.items(): try: old_file = self._path[filename] except KeyError: old_file = BootFile.empty( filename, encoding='ascii', errors='replace') new_content = ''.join(lines).encode( old_file.encoding, old_file.errors) self._path[filename] = BootFile( filename, datetime.now(), new_content, old_file.encoding, old_file.errors) def _clean_config(self, values, context): # Generate a "clean" configuration in which all lines which affected # (or would potentially affect, under *context*) the settings mentioned # in *values* are disabled or deleted files = { line.filename for name in values for line in self.settings[name].lines } new_path = { filename: list(self._path[filename].lines()) for filename in files } for name in values: for line in self.settings[name].lines: if ( line.filename in self._mutable_files and line.conditions <= context): new_file = new_path[line.filename] if self._comment_lines: if not new_file[line.linenum - 1].startswith('#'): new_file[line.linenum - 1] = ( '#' + new_file[line.linenum - 1]) else: new_file[line.linenum - 1] = '' return new_path def _final_config(self, updated, context): # Diff the new settings to figure out which settings actually need # writing, and generate content from changed settings. Here we handle # the case of settings delegating their output to other settings and # track which ones have been done to avoid duplication done = set() new_lines = {} # XXX Can new ever be None? Would that be an error? for old, new in self.settings.diff(updated): if new.name in done: continue setting = new while True: try: done.add(setting.name) new_lines[setting.key] = list(setting.output()) except DelegatedOutput as exc: setting = updated[exc.master] else: break # Search for comments that can be "uncommented" instead of writing new # lines, and otherwise record which new lines are required new_path = {} new_config = [] for key, lines in sorted(new_lines.items(), key=itemgetter(0)): for new_line in lines: for old_line in self._config: # XXX This isn't *entirely* safe when dealing with # dt-params, because anything we uncomment is potentially # out of key order in the final output if ( isinstance(old_line, BootComment) and old_line.conditions == context and old_line.comment == new_line): try: new_file = new_path[old_line.filename] except KeyError: new_file = new_path[old_line.filename] = ( list(self._path[old_line.filename].lines())) new_file[old_line.linenum - 1] = old_line.comment + '\n' break else: new_config.append(new_line) # Find the insertion-point for new_config; ideally, this is the last # line of any section in the root configuration file which matches our # desired context. Failing that, it'll be the last line of the root # configuration file insert_at = None for line in reversed(self._config): if line.filename == self.config_root: if insert_at is None: # Set a tentative insertion-point at the last line in the # root configuration file insert_at = line if line.conditions == context: # If we find a line which has conditions matching our # required context, we're done insert_at = line break if insert_at is None: # This can only happen if there's no root configuration file so # we need to generate one with the appropriate context insert_at = BootComment(self.config_root, 0, BootConditions()) # Insert the new content, prefixed with any necessary # sections to adjust the context of the insertion point (ip) if insert_at.conditions != context: # Two cases are relevant here: the above case where no root # configuration file exists, and the case where no lines in the # existing configuration match the desired context new_config.insert(0, '') new_config[1:1] = list(context.generate(insert_at.conditions)) try: new_file = new_path[self.config_root] except KeyError: try: new_file = new_path[self.config_root] = ( list(self._path[self.config_root].lines())) except KeyError: new_file = new_path[self.config_root] = [] new_config = [line + '\n' for line in new_config] new_file[insert_at.linenum:insert_at.linenum] = new_config # TODO Add an (optional?) phase to prune (/comment?) empty sections? # TODO Add an (optional?) phase to ensure [all] is always last? return new_path
[docs]class Settings(Mapping): """ Represents all settings in a boot configuration; acts like an ordered mapping of names to :class:`~pibootctl.setting.Setting` objects. """ def __init__(self, items=None): if items is None: items = SETTINGS self._items = deepcopy(items) for setting in self._items.values(): setting._settings = ref(self) self._visible = set(self._items.keys()) def __len__(self): return len(self._visible) def __iter__(self): for key in self._items: if key in self._visible: yield key def __contains__(self, key): return key in self._visible def __getitem__(self, key): if key not in self._visible: raise KeyError(key) return self._items[key]
[docs] def copy(self): """ Returns a distinct copy of the configuration that can be updated without affecting the original. """ new = deepcopy(self) for setting in new._items.values(): setting._settings = ref(new) return new
[docs] def modified(self): """ Returns a copy of the configuration which only contains modified settings. """ # When filtering we mustn't actually remove any members of _items as # Setting instances may need to refer to a "hidden" value to, for # example, determine their default value new_visible = { name for name in self._visible if self[name].modified } copy = self.copy() copy._visible = new_visible return copy
[docs] def filter(self, pattern): """ Returns a copy of the configuration which only contains settings with names matching *pattern*, which may contain regular shell globbing patterns. """ new_visible = { name for name in self._visible if fnmatch(name, pattern) } copy = self.copy() copy._visible = new_visible return copy
[docs] def diff(self, other): """ Returns a set of (self, other) setting tuples for all settings that differ between *self* and *other* (another :class:`Settings` instance). If a particular setting is missing from either side, its entry will be given as :data:`None`. """ return { (setting, other[setting.name] if setting.name in other else None) for setting in self.values() if setting.name not in other or other[setting.name].value != setting.value } | { (None, other[name]) for name in other if name not in self }