#!/usr/bin/python # Copyright (c) 2015-2023, Vlad Glagolev # # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import annotations DOCUMENTATION = r""" module: sorcery short_description: Package manager for Source Mage GNU/Linux description: - Manages "spells" on Source Mage GNU/Linux using I(sorcery) toolchain. author: "Vlad Glagolev (@vaygr)" notes: - When all three components are selected, the update goes by the sequence -- Sorcery -> Grimoire(s) -> Spell(s); you cannot override it. - Grimoire handling is supported since community.general 7.3.0. requirements: - bash extends_documentation_fragment: - community.general.attributes attributes: check_mode: support: full diff_mode: support: none options: name: description: - Name of the spell or grimoire. - Multiple names can be given, separated by commas. - Special value V(*) in conjunction with states V(latest) or V(rebuild) updates or rebuilds the whole system respectively. - The alias O(grimoire) was added in community.general 7.3.0. aliases: ["spell", "grimoire"] type: list elements: str repository: description: - Repository location. - If specified, O(name) represents grimoire(s) instead of spell(s). - Special value V(*) pulls grimoire from the official location. - Only single item in O(name) in conjunction with V(*) can be used. - O(state=absent) must be used with a special value V(*). type: str version_added: 7.3.0 state: description: - Whether to cast, dispel or rebuild a package. - State V(cast) is an equivalent of V(present), not V(latest). - State V(rebuild) implies cast of all specified spells, not only those existed before. choices: ["present", "latest", "absent", "cast", "dispelled", "rebuild"] default: "present" type: str depends: description: - Comma-separated list of _optional_ dependencies to build a spell (or make sure it is built) with; use V(+)/V(-) in front of dependency to turn it on/off (V(+) is optional though). - This option is ignored if O(name) parameter is equal to V(*) or contains more than one spell. - Providers must be supplied in the form recognized by Sorcery, for example 'V(openssl(SSL\))'. type: str update: description: - Whether or not to update sorcery scripts at the very first stage. type: bool default: false update_cache: description: - Whether or not to update grimoire collection before casting spells. type: bool default: false aliases: ["update_codex"] cache_valid_time: description: - Time in seconds to invalidate grimoire collection on update. - Especially useful for SCM and rsync grimoires. - Makes sense only in pair with O(update_cache). type: int default: 0 """ EXAMPLES = r""" - name: Make sure spell foo is installed community.general.sorcery: spell: foo state: present - name: Make sure spells foo, bar and baz are removed community.general.sorcery: spell: foo,bar,baz state: absent - name: Make sure spell foo with dependencies bar and baz is installed community.general.sorcery: spell: foo depends: bar,baz state: present - name: Make sure spell foo with bar and without baz dependencies is installed community.general.sorcery: spell: foo depends: +bar,-baz state: present - name: Make sure spell foo with libressl (providing SSL) dependency is installed community.general.sorcery: spell: foo depends: libressl(SSL) state: present - name: Make sure spells with/without required dependencies (if any) are installed community.general.sorcery: name: "{{ item.spell }}" depends: "{{ item.depends | default(None) }}" state: present loop: - {spell: 'vifm', depends: '+file,-gtk+2'} - {spell: 'fwknop', depends: 'gpgme'} - {spell: 'pv,tnftp,tor'} - name: Install the latest version of spell foo using regular glossary community.general.sorcery: name: foo state: latest - name: Rebuild spell foo community.general.sorcery: spell: foo state: rebuild - name: Rebuild the whole system, but update Sorcery and Codex first community.general.sorcery: spell: '*' state: rebuild update: true update_cache: true - name: Refresh the grimoire collection if it is 1 day old using native sorcerous alias community.general.sorcery: update_codex: true cache_valid_time: 86400 - name: Make sure stable grimoire is present community.general.sorcery: name: stable repository: '*' state: present - name: Make sure binary and stable-rc grimoires are removed community.general.sorcery: grimoire: binary,stable-rc repository: '*' state: absent - name: Make sure games grimoire is pulled from rsync community.general.sorcery: grimoire: games repository: "rsync://download.sourcemage.org::codex/games" state: present - name: Make sure a specific branch of stable grimoire is pulled from git community.general.sorcery: grimoire: stable.git repository: "git://download.sourcemage.org/smgl/grimoire.git:stable.git:stable-0.62" state: present - name: Update only Sorcery itself community.general.sorcery: update: true """ RETURN = r""" """ import datetime import fileinput import os import re import shutil import sys from ansible.module_utils.basic import AnsibleModule # auto-filled at module init SORCERY = {"sorcery": None, "scribe": None, "cast": None, "dispel": None, "gaze": None} SORCERY_LOG_DIR = "/var/log/sorcery" SORCERY_STATE_DIR = "/var/state/sorcery" NA = "N/A" def get_sorcery_ver(module): """Get Sorcery version.""" cmd_sorcery = f"{SORCERY['sorcery']} --version" rc, stdout, stderr = module.run_command(cmd_sorcery) if rc != 0 or not stdout: module.fail_json(msg="unable to get Sorcery version") return stdout.strip() def codex_fresh(codex, module): """Check if grimoire collection is fresh enough.""" if not module.params["cache_valid_time"]: return False timedelta = datetime.timedelta(seconds=module.params["cache_valid_time"]) for grimoire in codex: lastupdate_path = os.path.join(SORCERY_STATE_DIR, f"{grimoire}.lastupdate") try: mtime = os.stat(lastupdate_path).st_mtime except Exception: return False lastupdate_ts = datetime.datetime.fromtimestamp(mtime) # if any grimoire is not fresh, we invalidate the Codex if lastupdate_ts + timedelta < datetime.datetime.now(): return False return True def codex_list(module, skip_new=False): """List valid grimoire collection.""" params = module.params codex = {} cmd_scribe = f"{SORCERY['scribe']} index" rc, stdout, stderr = module.run_command(cmd_scribe) if rc != 0: module.fail_json(msg="unable to list grimoire collection, fix your Codex") rex = re.compile(r"^\s*\[\d+\] : (?P[\w\-+.]+) : [\w\-+./]+(?: : (?P[\w\-+.]+))?\s*$") # drop 4-line header and empty trailing line for line in stdout.splitlines()[4:-1]: match = rex.match(line) if match: codex[match.group("grim")] = match.group("ver") # return only specified grimoires unless requested to skip new if params["repository"] and not skip_new: codex = {x: codex.get(x, NA) for x in params["name"]} if not codex: module.fail_json(msg="no grimoires to operate on; add at least one") return codex def update_sorcery(module): """Update sorcery scripts. This runs 'sorcery update' ('sorcery -u'). Check mode always returns a positive change value. """ changed = False if module.check_mode: return (True, "would have updated Sorcery") else: sorcery_ver = get_sorcery_ver(module) cmd_sorcery = f"{SORCERY['sorcery']} update" rc, stdout, stderr = module.run_command(cmd_sorcery) if rc != 0: module.fail_json(msg=f"unable to update Sorcery: {stdout}") if sorcery_ver != get_sorcery_ver(module): changed = True return (changed, "successfully updated Sorcery") def update_codex(module): """Update grimoire collections. This runs 'scribe update'. Check mode always returns a positive change value when 'cache_valid_time' is used. """ params = module.params changed = False codex = codex_list(module) fresh = codex_fresh(codex, module) if module.check_mode: if not fresh: changed = True return (changed, "would have updated Codex") else: if not fresh: # SILENT is required as a workaround for query() in libgpg module.run_command_environ_update.update(dict(SILENT="1")) cmd_scribe = f"{SORCERY['scribe']} update" if params["repository"]: cmd_scribe += f" {' '.join(codex.keys())}" rc, stdout, stderr = module.run_command(cmd_scribe) if rc != 0: module.fail_json(msg=f"unable to update Codex: {stdout}") if codex != codex_list(module): changed = True return (changed, "successfully updated Codex") def match_depends(module): """Check for matching dependencies. This inspects spell's dependencies with the desired states and returns 'False' if a recast is needed to match them. It also adds required lines to the system-wide depends file for proper recast procedure. """ params = module.params spells = params["name"] depends = {} depends_ok = True if len(spells) > 1 or not params["depends"]: return depends_ok spell = spells[0] if module.check_mode: sorcery_depends_orig = os.path.join(SORCERY_STATE_DIR, "depends") sorcery_depends = os.path.join(SORCERY_STATE_DIR, "depends.check") try: shutil.copy2(sorcery_depends_orig, sorcery_depends) except IOError: module.fail_json(msg="failed to copy depends.check file") else: sorcery_depends = os.path.join(SORCERY_STATE_DIR, "depends") rex = re.compile(r"^(?P\+?|\-){1}(?P[a-z0-9]+[a-z0-9_\-\+\.]*(\([A-Z0-9_\-\+\.]+\))*)$") for d in params["depends"].split(","): match = rex.match(d) if not match: module.fail_json(msg=f"wrong depends line for spell '{spell}'") # normalize status if not match.group("status") or match.group("status") == "+": status = "on" else: status = "off" depends[match.group("depend")] = status # drop providers spec depends_list = [s.split("(")[0] for s in depends] cmd_gaze = f"{SORCERY['gaze']} -q version {' '.join(depends_list)}" rc, stdout, stderr = module.run_command(cmd_gaze) if rc != 0: module.fail_json(msg=f"wrong dependencies for spell '{spell}'") fi = fileinput.input(sorcery_depends, inplace=True) try: try: for line in fi: if line.startswith(f"{spell}:"): match = None for d in depends: # when local status is 'off' and dependency is provider, # use only provider value d_offset = d.find("(") if d_offset == -1: d_p = "" else: d_p = re.escape(d[d_offset:]) # .escape() is needed mostly for the spells like 'libsigc++' rex = re.compile(f"{re.escape(spell)}:(?:{re.escape(d)}|{d_p}):(?Pon|off):optional:") match = rex.match(line) # we matched the line "spell:dependency:on|off:optional:" if match: # if we also matched the local status, mark dependency # as empty and put it back into depends file if match.group("lstatus") == depends[d]: depends[d] = None sys.stdout.write(line) # status is not that we need, so keep this dependency # in the list for further reverse switching; # stop and process the next line in both cases break if not match: sys.stdout.write(line) else: sys.stdout.write(line) except IOError: module.fail_json(msg="I/O error on the depends file") finally: fi.close() depends_new = [v for v in depends if depends[v]] if depends_new: try: with open(sorcery_depends, "a") as fl: for k in depends_new: fl.write(f"{spell}:{k}:{depends[k]}:optional::\n") except IOError: module.fail_json(msg="I/O error on the depends file") depends_ok = False if module.check_mode: try: os.remove(sorcery_depends) except IOError: module.fail_json(msg="failed to clean up depends.backup file") return depends_ok def manage_grimoires(module): """Add or remove grimoires.""" params = module.params grimoires = params["name"] url = params["repository"] codex = codex_list(module, True) if url == "*": if params["state"] in ("present", "latest", "absent"): if params["state"] == "absent": action = "remove" todo = set(grimoires) & set(codex) else: action = "add" todo = set(grimoires) - set(codex) if not todo: return (False, f"all grimoire(s) are already {action[:5]}ed") if module.check_mode: return (True, f"would have {action[:5]}ed grimoire(s)") cmd_scribe = f"{SORCERY['scribe']} {action} {' '.join(todo)}" rc, stdout, stderr = module.run_command(cmd_scribe) if rc != 0: module.fail_json(msg=f"failed to {action} one or more grimoire(s): {stdout}") return (True, f"successfully {action[:5]}ed one or more grimoire(s)") else: module.fail_json(msg="unsupported operation on '*' repository value") else: if params["state"] in ("present", "latest"): if len(grimoires) > 1: module.fail_json(msg="using multiple items with repository is invalid") grimoire = grimoires[0] if grimoire in codex: return (False, f"grimoire {grimoire} already exists") if module.check_mode: return (True, f"would have added grimoire {grimoire} from {url}") cmd_scribe = f"{SORCERY['scribe']} add {grimoire} from {url}" rc, stdout, stderr = module.run_command(cmd_scribe) if rc != 0: module.fail_json(msg=f"failed to add grimoire {grimoire} from {url}: {stdout}") return (True, f"successfully added grimoire {grimoire} from {url}") else: module.fail_json(msg="unsupported operation on repository value") def manage_spells(module): """Cast or dispel spells. This manages the whole system ('*'), list or a single spell. Command 'cast' is used to install or rebuild spells, while 'dispel' takes care of theirs removal from the system. """ params = module.params spells = params["name"] sorcery_queue = os.path.join(SORCERY_LOG_DIR, "queue/install") if spells == "*": if params["state"] == "latest": # back up original queue try: os.rename(sorcery_queue, f"{sorcery_queue}.backup") except IOError: module.fail_json(msg="failed to backup the update queue") # see update_codex() module.run_command_environ_update.update(dict(SILENT="1")) cmd_sorcery = f"{SORCERY['sorcery']} queue" rc, stdout, stderr = module.run_command(cmd_sorcery) if rc != 0: module.fail_json(msg="failed to generate the update queue") try: queue_size = os.stat(sorcery_queue).st_size except Exception: module.fail_json(msg="failed to read the update queue") if queue_size != 0: if module.check_mode: try: os.rename(f"{sorcery_queue}.backup", sorcery_queue) except IOError: module.fail_json(msg="failed to restore the update queue") return (True, "would have updated the system") cmd_cast = f"{SORCERY['cast']} --queue" rc, stdout, stderr = module.run_command(cmd_cast) if rc != 0: module.fail_json(msg="failed to update the system") return (True, "successfully updated the system") else: return (False, "the system is already up to date") elif params["state"] == "rebuild": if module.check_mode: return (True, "would have rebuilt the system") cmd_sorcery = f"{SORCERY['sorcery']} rebuild" rc, stdout, stderr = module.run_command(cmd_sorcery) if rc != 0: module.fail_json(msg=f"failed to rebuild the system: {stdout}") return (True, "successfully rebuilt the system") else: module.fail_json(msg="unsupported operation on '*' name value") else: if params["state"] in ("present", "latest", "rebuild", "absent"): # extract versions from the 'gaze' command cmd_gaze = f"{SORCERY['gaze']} -q version {' '.join(spells)}" rc, stdout, stderr = module.run_command(cmd_gaze) # fail if any of spells cannot be found if rc != 0: module.fail_json(msg=f"failed to locate spell(s) in the list ({', '.join(spells)})") cast_queue = [] dispel_queue = [] rex = re.compile(r"[^|]+\|[^|]+\|(?P[^|]+)\|(?P[^|]+)\|(?P[^$]+)") # drop 2-line header and empty trailing line for line in stdout.splitlines()[2:-1]: match = rex.match(line) cast = False if params["state"] == "present": # spell is not installed.. if match.group("inst_ver") == "-": # ..so set up depends reqs for it match_depends(module) cast = True # spell is installed.. else: # ..but does not conform depends reqs if not match_depends(module): cast = True elif params["state"] == "latest": # grimoire and installed versions do not match.. if match.group("grim_ver") != match.group("inst_ver"): # ..so check for depends reqs first and set them up match_depends(module) cast = True # grimoire and installed versions match.. else: # ..but the spell does not conform depends reqs if not match_depends(module): cast = True elif params["state"] == "rebuild": cast = True # 'absent' else: if match.group("inst_ver") != "-": dispel_queue.append(match.group("spell")) if cast: cast_queue.append(match.group("spell")) if cast_queue: if module.check_mode: return (True, "would have cast spell(s)") cmd_cast = f"{SORCERY['cast']} -c {' '.join(cast_queue)}" rc, stdout, stderr = module.run_command(cmd_cast) if rc != 0: module.fail_json(msg=f"failed to cast spell(s): {stdout}") return (True, "successfully cast spell(s)") elif params["state"] != "absent": return (False, "spell(s) are already cast") if dispel_queue: if module.check_mode: return (True, "would have dispelled spell(s)") cmd_dispel = f"{SORCERY['dispel']} {' '.join(dispel_queue)}" rc, stdout, stderr = module.run_command(cmd_dispel) if rc != 0: module.fail_json(msg=f"failed to dispel spell(s): {stdout}") return (True, "successfully dispelled spell(s)") else: return (False, "spell(s) are already dispelled") def main(): module = AnsibleModule( argument_spec=dict( name=dict(aliases=["spell", "grimoire"], type="list", elements="str"), repository=dict(type="str"), state=dict(default="present", choices=["present", "latest", "absent", "cast", "dispelled", "rebuild"]), depends=dict(), update=dict(default=False, type="bool"), update_cache=dict(default=False, aliases=["update_codex"], type="bool"), cache_valid_time=dict(default=0, type="int"), ), required_one_of=[["name", "update", "update_cache"]], supports_check_mode=True, ) if os.geteuid() != 0: module.fail_json(msg="root privileges are required for this operation") for c in SORCERY: SORCERY[c] = module.get_bin_path(c, True) # prepare environment: run sorcery commands without asking questions module.run_command_environ_update = dict(PROMPT_DELAY="0", VOYEUR="0") params = module.params # normalize 'state' parameter if params["state"] in ("present", "cast"): params["state"] = "present" elif params["state"] in ("absent", "dispelled"): params["state"] = "absent" changed = {"sorcery": (False, NA), "grimoires": (False, NA), "codex": (False, NA), "spells": (False, NA)} if params["update"]: changed["sorcery"] = update_sorcery(module) if params["name"] and params["repository"]: changed["grimoires"] = manage_grimoires(module) if params["update_cache"]: changed["codex"] = update_codex(module) if params["name"] and not params["repository"]: changed["spells"] = manage_spells(module) if any(x[0] for x in changed.values()): state_msg = "state changed" state_changed = True else: state_msg = "no change in state" state_changed = False module.exit_json(changed=state_changed, msg=f"{state_msg}: {'; '.join(x[1] for x in changed.values())}") if __name__ == "__main__": main()