From bfd8b014fe5dadcb24a9f0f3ddf700af3f1b368c Mon Sep 17 00:00:00 2001 From: Jean Khawand <22157081+jeankhawand@users.noreply.github.com> Date: Sun, 7 Jun 2026 11:13:39 +0200 Subject: [PATCH] Add proton pass-cli lookup plugin --- .github/BOTMETA.yml | 2 + plugins/lookup/proton_pass.py | 634 +++++++++++++++ tests/unit/plugins/lookup/test_proton_pass.py | 741 ++++++++++++++++++ 3 files changed, 1377 insertions(+) create mode 100644 plugins/lookup/proton_pass.py create mode 100644 tests/unit/plugins/lookup/test_proton_pass.py diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index 251bb71bff..3b608bbe0b 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -281,6 +281,8 @@ files: maintainers: jantari $lookups/bitwarden.py: maintainers: lungj + $lookups/proton_pass.py: + maintainers: jeankhawand $lookups/cartesian.py: {} $lookups/chef_databag.py: {} $lookups/collection_version.py: diff --git a/plugins/lookup/proton_pass.py b/plugins/lookup/proton_pass.py new file mode 100644 index 0000000000..d4524a4043 --- /dev/null +++ b/plugins/lookup/proton_pass.py @@ -0,0 +1,634 @@ +# Copyright (c) 2026, Jean Khawand (@jeankhawand) +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +DOCUMENTATION = r""" +name: proton_pass +author: + - Jean Khawand (@jeankhawand) +version_added: "13.1.0" +short_description: Fetch secrets from Proton Pass via the C(pass-cli) command-line tool. +description: + - Retrieves secrets stored in Proton Pass by calling the C(pass-cli) binary + on the Ansible control machine. + - When only an item title is given, returns a dictionary of all extra fields for + that item so the caller can access individual secrets by key. + - When a field name is also given, returns the field value as a string directly, + without parsing JSON. + - Every lookup invokes C(pass-cli) directly — no in-process caching — so each + call always reflects the current vault state. +requirements: + - C(pass-cli) installed on the Ansible control machine + (see U(https://github.com/protonpass/pass-cli)) + - An active Proton Pass session established with C(pass-cli login) or a PAT + token supplied via O(pat) / E(ANSIBLE_PROTON_PASS_PAT). +notes: + - Two non-interactive authentication paths are available. + - B(Agent tokens) (recommended for CI/CD) are scoped to specific vaults, + carry an expiration date, and produce an encrypted audit log — create one + with C(pass-cli agent create --expiration 1m --vault ), then + supply the token via O(pat) / E(ANSIBLE_PROTON_PASS_PAT) and set + O(agent_reason) / E(ANSIBLE_PROTON_PASS_AGENT_REASON) to a short description + of why the automation is accessing the vault. + - B(Personal Access Tokens (PAT)) grant full account access and should be + reserved for interactive or development use — generate one in the Proton + Pass web app under B(Account > Security > Access and authentication > + Personal Access Tokens). PAT format is C(pst_::). + - Authenticate once interactively with C(pass-cli login) before running + playbooks when neither an agent token nor a PAT is configured. + - Set E(ANSIBLE_PROTON_PASS_DEBUG=1) to print raw C(pass-cli) JSON to stderr + when field extraction fails — useful for diagnosing JSON structure changes + across C(pass-cli) versions. + - >- + All Proton Pass item types are supported: B(Login), B(Note), B(Credit Card), + B(Wi-Fi), B(Identity), B(SSH Key), B(Custom), and B(Alias). Standard fields + for each type (for example C(username)/C(password) for Login, C(ssid)/C(password) + for Wi-Fi, C(private_key)/C(public_key) for SSH Key) are included in the returned + dictionary alongside any custom extra fields. + - For B(Login) items the C(urls) field is returned as a newline-separated string + when multiple URLs are present. Use C(| split("\n")) to obtain a list. + - In Proton Pass, create one item per host. Add every secret as an extra B(hidden) + field whose name exactly matches the key your playbooks reference (for example + C(api_token), C(gpg_keyphrase)). + - Sections (C(extra_sections)) are supported — fields nested inside a section + are merged into the same flat dict as top-level extra fields. + - >- + Two C(pass-cli) JSON schemas are supported. The B(current schema) (C(pass-cli) 1.x) + nests item data under C(item.content) with C(extra_fields) entries shaped as + C({"name": "key", "content": {"Hidden": "value"}}). The B(legacy schema) wraps data + under C(data.content) with entries shaped as + C({"field_name": "key", "data": {"field_type": "hidden", "content": "value"}}). + Both are handled transparently. If field extraction fails on a newer C(pass-cli) + release, set E(ANSIBLE_PROTON_PASS_DEBUG=1) to inspect the raw JSON and report + a bug. +seealso: + - module: community.general.bitwarden + description: Similar plugin for Bitwarden. +options: + _terms: + description: + - One or more Proton Pass item titles to look up. + - If exactly two terms are given and O(field) is not set, the second term + is interpreted as a field name and a plain string is returned instead of + a dictionary. + required: true + type: list + elements: str + field: + description: + - Field name to retrieve. When set, the lookup returns a plain string + instead of a dictionary, equivalent to passing the field as the second + positional term. + - When both a second positional term and O(field) are supplied, O(field) + takes precedence. + type: str + default: "" + vault: + description: + - Name of the Proton Pass vault to query. + - When omitted, C(pass-cli) searches across all vaults accessible to + the authenticated account; this may raise an error when the item title + is not unique across vaults. + type: str + default: "" + ini: + - section: proton_pass_lookup + key: vault + env: + - name: ANSIBLE_PROTON_PASS_VAULT + pat: + description: + - Personal Access Token used for non-interactive authentication. + - Format is C(pst_::). + - When provided, the plugin runs C(pass-cli login --pat ) before + the first item lookup if no active session is detected. + type: str + default: "" + ini: + - section: proton_pass_lookup + key: pat + env: + - name: ANSIBLE_PROTON_PASS_PAT + agent_reason: + description: + - Human-readable reason string required by Proton Pass when authenticating + with an B(agent token) (as opposed to a personal account PAT). + - Maps to the C(PROTON_PASS_AGENT_REASON) environment variable consumed by + C(pass-cli). Must be between 1 and 300 characters. + - Agent tokens are the recommended authentication mechanism for CI/CD + pipelines and automated processes — they are scoped to specific vaults, + carry an expiration date, and produce an encrypted audit log. + Create one with C(pass-cli agent create --expiration 1m + --vault ). + type: str + default: "" + ini: + - section: proton_pass_lookup + key: agent_reason + env: + - name: ANSIBLE_PROTON_PASS_AGENT_REASON + cli_path: + description: + - Path to the C(pass-cli) binary. + - When not set, C(pass-cli) is resolved from C($PATH). + type: str + default: pass-cli + ini: + - section: proton_pass_lookup + key: cli_path + env: + - name: ANSIBLE_PROTON_PASS_CLI_PATH + timeout: + description: + - Timeout in seconds for each C(pass-cli) subprocess invocation. + type: int + default: 30 + ini: + - section: proton_pass_lookup + key: timeout + env: + - name: ANSIBLE_PROTON_PASS_TIMEOUT +""" + +EXAMPLES = r""" +# Prerequisites — authenticate once on the control machine: +# pass-cli login --pat pst_:: +# Or export for non-interactive / CI use: +# export ANSIBLE_PROTON_PASS_PAT='pst_::' +# Or configure ansible.cfg (applies to all lookups in the project): +# [proton_pass_lookup] +# vault = myproject + +# --- All-fields mode: returns a dict of all extra fields --- + +- name: Load all secrets for the current host + ansible.builtin.set_fact: + host_secrets: "{{ lookup('community.general.proton_pass', inventory_hostname, vault='myproject') }}" + +- name: Use a field from the loaded dict + ansible.builtin.debug: + msg: "Token: {{ host_secrets.api_token }}" + +# --- Single-field mode: returns a plain string --- + +- name: Fetch one field via second positional term + ansible.builtin.debug: + msg: "{{ lookup('community.general.proton_pass', 'my_host', 'api_token', vault='myproject') }}" + +- name: Fetch one field via keyword argument + ansible.builtin.debug: + msg: "{{ lookup('community.general.proton_pass', 'my_host', field='api_token', vault='myproject') }}" + +# --- Multiple items, same field --- + +- name: Fetch gpg_keyphrase from two items + ansible.builtin.debug: + msg: "{{ lookup('community.general.proton_pass', 'host_a', 'host_b', field='gpg_keyphrase', vault='myproject') }}" + +# --- Drop-in replacement for ansible-vault secrets --- +# Place in group_vars/all/secrets.yaml (plain YAML, safe to commit): +# +# secrets: +# host_a: "{{ lookup('community.general.proton_pass', 'host_a', vault='myproject') }}" +# host_b: "{{ lookup('community.general.proton_pass', 'host_b', vault='myproject') }}" +# +# Existing role tasks continue to work without modification: +# "{{ secrets[inventory_hostname].some_secret }}" + +# --- Vault configured globally in ansible.cfg — no vault= kwarg needed --- +- name: Lookup when ansible.cfg sets [proton_pass_lookup] vault + ansible.builtin.debug: + msg: "{{ lookup('community.general.proton_pass', 'my_host', 'api_token') }}" + +# --- Agent token authentication (recommended for CI/CD) --- +# Create the agent first (one-time setup): +# pass-cli agent create ci-runner --expiration 1m --vault myproject +# Then configure ansible.cfg: +# [proton_pass_lookup] +# vault = myproject +# agent_reason = ansible playbook run +# And set the token in the environment (or ansible.cfg): +# export ANSIBLE_PROTON_PASS_PAT='' +- name: Lookup using an agent token + ansible.builtin.debug: + msg: "{{ lookup('community.general.proton_pass', 'my_host', 'api_token', + pat=lookup('env', 'ANSIBLE_PROTON_PASS_PAT'), + agent_reason='nightly deployment pipeline') }}" +""" + +RETURN = r""" +_raw: + description: + - A list with one element per input term. + - In single-field mode (O(field) set or two-term shorthand), each element is + the plain-text value of the requested field. + - In all-fields mode, each element is a flat dictionary mapping standard + item-type fields (C(username)/C(password) for login, C(ssid)/C(password) for + wi-fi, C(private_key)/C(public_key) for ssh-key, etc.) and every custom extra + field to its string value. + type: list + elements: raw +""" + +import json +import os +from subprocess import PIPE, Popen, TimeoutExpired + +from ansible.errors import AnsibleLookupError, AnsibleOptionsError +from ansible.module_utils.common.text.converters import to_text +from ansible.plugins.lookup import LookupBase +from ansible.utils.display import Display + +display = Display() + +# --------------------------------------------------------------------------- +# Item-type field registries — mirrors pass-cli item create --get-template +# --------------------------------------------------------------------------- + +_LOGIN_FIELDS = ("username", "email", "password", "totp_uri") +_CREDIT_CARD_FIELDS = ("cardholder_name", "card_type", "number", "cvv", "expiration_date", "pin") +_WIFI_FIELDS = ("ssid", "password", "security") +_SSH_KEY_FIELDS = ("private_key", "public_key", "fingerprint", "key_type") +_IDENTITY_FIELDS = ( + "full_name", "email", "phone_number", "first_name", "middle_name", "last_name", + "birthdate", "gender", "organization", "street_address", "zip_or_postal_code", + "city", "state_or_province", "country_or_region", "floor", "county", + "social_security_number", "passport_number", "license_number", "website", + "x_handle", "second_phone_number", "linkedin", "reddit", "facebook", "yahoo", + "instagram", "company", "job_title", "personal_website", "work_phone_number", + "work_email", +) + + +class ProtonPassCLIError(AnsibleLookupError): + """Raised when the pass-cli subprocess fails in an unexpected way.""" + + +# --------------------------------------------------------------------------- +# CLI wrapper +# --------------------------------------------------------------------------- + +class ProtonPassClient: + """Thin wrapper around the pass-cli binary.""" + + def __init__( + self, + cli_path: str = "pass-cli", + timeout: int = 30, + agent_reason: str = "", + ) -> None: + self.cli_path = cli_path + self.timeout = timeout + self.agent_reason = agent_reason + self._session_ok = False + + def _run(self, args: list[str]) -> tuple[int, str, str]: + """ + Run pass-cli with *args* and return (returncode, stdout, stderr). + + When O(agent_reason) is set, the C(PROTON_PASS_AGENT_REASON) environment + variable is injected into the subprocess environment so that pass-cli + records the reason in its encrypted audit log. + + Raises: + ProtonPassCLIError: if the binary is not found or the call times out. + """ + command = [self.cli_path] + args + display.vvv(f"proton_pass: running {command}") + + env = os.environ.copy() + if self.agent_reason: + env["PROTON_PASS_AGENT_REASON"] = self.agent_reason + + try: + proc = Popen(command, stdout=PIPE, stderr=PIPE, env=env) + out_bytes, err_bytes = proc.communicate(timeout=self.timeout) + rc = proc.returncode + except FileNotFoundError as e: + raise ProtonPassCLIError( + f"'{self.cli_path}' not found — install pass-cli or configure " + "E(ANSIBLE_PROTON_PASS_CLI_PATH) / [proton_pass_lookup] cli_path " + "in ansible.cfg" + ) from e + except TimeoutExpired as e: + proc.kill() + raise ProtonPassCLIError( + f"pass-cli timed out after {self.timeout}s" + ) from e + + return ( + rc, + to_text(out_bytes, errors="surrogate_or_strict"), + to_text(err_bytes, errors="surrogate_or_strict"), + ) + + # --- authentication ---------------------------------------------------------- + + def test_session(self) -> bool: + """Return True when an active pass-cli session exists.""" + rc, *_ignored = self._run(["test"]) + return rc == 0 + + def login_with_pat(self, pat: str) -> None: + """Authenticate using a Personal Access Token (C(pst_::)).""" + rc, _out, err = self._run(["login", "--pat", pat]) + if rc != 0: + raise AnsibleLookupError( + f"pass-cli login failed: {err.strip()} — " + "verify the PAT format is pst_::" + ) + + def ensure_authenticated(self, pat: str) -> None: + """Ensure there is an active session, logging in with PAT if necessary.""" + if self._session_ok: + return + if self.test_session(): + self._session_ok = True + return + if not pat: + raise AnsibleLookupError( + "No active pass-cli session and no PAT supplied. " + "Run 'pass-cli login' on the control machine, or provide " + "O(pat) / E(ANSIBLE_PROTON_PASS_PAT)." + ) + self.login_with_pat(pat) + self._session_ok = True + + # --- single-field retrieval -------------------------------------------------- + + def fetch_all_fields(self, vault: str, title: str) -> dict[str, str]: + """ + Return all fields for *title* as a flat ``{field_name: value}`` dict. + + Always calls C(pass-cli) — no caching, so callers always receive the + current vault state. + """ + args = ["item", "view", "--item-title", title, "--output", "json"] + if vault: + args += ["--vault-name", vault] + + rc, out, err = self._run(args) + if rc != 0: + _raise_item_error(title, vault, None, err) + + if os.environ.get("ANSIBLE_PROTON_PASS_DEBUG"): + display.warning(f"proton_pass DEBUG — raw JSON for '{title}':\n{out}") + + try: + raw = json.loads(out) + except json.JSONDecodeError as e: + raise AnsibleLookupError( + f"pass-cli returned invalid JSON for item '{title}': {e}. " + "Set E(ANSIBLE_PROTON_PASS_DEBUG=1) to inspect the raw output." + ) from e + + return _parse_fields(raw, title) + + def fetch_field(self, vault: str, title: str, field: str) -> str: + """ + Return a single field value for *title*. + + Delegates to C(fetch_all_fields) so that trashed-item detection, + JSON parsing, and error messages are consistent regardless of whether + the caller requests one field or all fields. + """ + all_fields = self.fetch_all_fields(vault=vault, title=title) + if field not in all_fields: + vault_hint = f" in vault '{vault}'" if vault else "" + raise AnsibleLookupError( + f"proton_pass: field '{field}' not found in item '{title}'{vault_hint}. " + f"Available fields: {', '.join(sorted(all_fields)) or '(none)'}" + ) + return all_fields[field] + + +# --------------------------------------------------------------------------- +# Error helper +# --------------------------------------------------------------------------- + +def _raise_item_error(title: str, vault: str, field: str | None, stderr: str) -> None: + """Translate pass-cli stderr into a descriptive AnsibleLookupError.""" + msg = stderr.strip() + vault_hint = f" in vault '{vault}'" if vault else "" + field_hint = f", field '{field}'" if field else "" + + lower = msg.lower() + if "not found" in lower or "no items found" in lower: + raise AnsibleLookupError( + f"Item '{title}'{vault_hint} not found in Proton Pass{field_hint}. " + "Verify the item title and vault name." + ) + if "unauthorized" in lower or "unauthenticated" in lower: + raise AnsibleLookupError( + f"pass-cli authentication error: {msg}. " + "Re-run 'pass-cli login' or refresh your PAT." + ) + raise AnsibleLookupError( + f"pass-cli failed for item '{title}'{vault_hint}{field_hint}: {msg}" + ) + + +# --------------------------------------------------------------------------- +# JSON parser +# --------------------------------------------------------------------------- + +def _extract_scalar_fields(source: dict, keys: tuple, target: dict[str, str]) -> None: + """Copy *keys* from *source* into *target*, skipping None and empty-string values.""" + for key in keys: + val = source.get(key) + if val is not None and val != "": + target[key] = str(val) + + +def _parse_fields(raw: dict, title: str) -> dict[str, str]: + """ + Extract a flat ``{field_name: value}`` dict from C(pass-cli --output json). + + Handles both the current schema (C(pass-cli) 1.x, ``item.content``) and the + legacy schema (``data.content``). See the module notes for schema details. + """ + fields: dict[str, str] = {} + + if "item" in raw: + # ── New schema ──────────────────────────────────────────────────────── + item_obj = raw["item"] + state = item_obj.get("state", "") + if state.lower() == "trashed": + raise AnsibleLookupError( + f"proton_pass: item '{title}' is in the Proton Pass trash and cannot be " + "used. Restore it or permanently delete it." + ) + + content_wrapper = item_obj.get("content", {}) + note = content_wrapper.get("note") + if note: + fields["note"] = str(note) + + # Type-specific standard fields: content.content is {"Login": {...}} etc. + type_dict = content_wrapper.get("content", {}) + if isinstance(type_dict, dict): + for type_key, type_data in type_dict.items(): + if not isinstance(type_data, dict): + continue + tk = type_key.lower() + if tk == "login": + _extract_scalar_fields(type_data, _LOGIN_FIELDS, fields) + urls = type_data.get("urls") + if urls: + fields["urls"] = "\n".join(urls) if isinstance(urls, list) else str(urls) + elif tk in ("creditcard", "credit_card", "credit-card"): + _extract_scalar_fields(type_data, _CREDIT_CARD_FIELDS, fields) + elif tk == "wifi": + _extract_scalar_fields(type_data, _WIFI_FIELDS, fields) + elif tk in ("sshkey", "ssh_key", "ssh-key"): + _extract_scalar_fields(type_data, _SSH_KEY_FIELDS, fields) + elif tk == "identity": + _extract_scalar_fields(type_data, _IDENTITY_FIELDS, fields) + elif tk == "alias": + for alias_key in ("aliased_email", "aliased_address"): + val = type_data.get(alias_key) + if val: + fields["aliased_email"] = val + break + + _extract_extra_fields(content_wrapper.get("extra_fields", []), fields) + for section in content_wrapper.get("extra_sections", []): + _extract_extra_fields(section.get("extra_fields", []), fields) + + else: + # ── Legacy schema ───────────────────────────────────────────────────── + data = raw.get("data", raw) + content = data.get("content", data) + metadata = data.get("metadata", {}) + + item_type = content.get("item_type", "") + note = metadata.get("note") or content.get("note") + if note: + fields["note"] = str(note) + + if item_type in ("login", ""): + _extract_scalar_fields(content, _LOGIN_FIELDS, fields) + urls = content.get("urls") + if urls: + fields["urls"] = "\n".join(urls) if isinstance(urls, list) else str(urls) + elif item_type == "credit-card": + _extract_scalar_fields(content, _CREDIT_CARD_FIELDS, fields) + elif item_type == "wifi": + _extract_scalar_fields(content, _WIFI_FIELDS, fields) + elif item_type == "identity": + _extract_scalar_fields(content, _IDENTITY_FIELDS, fields) + elif item_type == "ssh-key": + _extract_scalar_fields(content, _SSH_KEY_FIELDS, fields) + elif item_type == "alias": + for alias_key in ("aliased_email", "aliased_address"): + val = content.get(alias_key) + if val: + fields["aliased_email"] = val + break + + _extract_extra_fields(content.get("extra_fields", []), fields) + for section in content.get("extra_sections", []): + _extract_extra_fields(section.get("extra_fields", []), fields) + + if not fields: + raise AnsibleLookupError( + f"proton_pass: no fields extracted from item '{title}'. " + "Ensure the item has a note, standard fields, or at least one extra " + "hidden field. Set E(ANSIBLE_PROTON_PASS_DEBUG=1) to inspect the raw " + "JSON." + ) + + return fields + + +def _extract_extra_fields(extra_fields: list[dict], target: dict[str, str]) -> None: + """ + Merge a C(pass-cli) extra_fields list into *target*. + + Handles two element shapes: + + New schema:: + + {"name": "key", "content": {"Hidden": "value"}} + + The type wrapper key (C(Hidden), C(Text), C(Totp), …) varies; the actual + value is always the first (and only) dict value. + + Legacy schema:: + + {"field_name": "key", "data": {"field_type": "hidden", "content": "value"}} + """ + for extra in extra_fields: + name = extra.get("field_name") or extra.get("name", "") + if not name: + continue + + content = extra.get("content") + if isinstance(content, dict): + # New schema: {"Hidden": "value"} — take the first non-None value + value = next((v for v in content.values() if v is not None), "") + elif isinstance(content, str): + value = content + else: + # Legacy schema: {"data": {"field_type": "...", "content": "value"}} + field_data = extra.get("data", {}) + if isinstance(field_data, dict): + value = field_data.get("content", "") + elif isinstance(field_data, str): + value = field_data + else: + value = str(field_data) if field_data else "" + + target[name] = str(value) if value is not None else "" + + +# --------------------------------------------------------------------------- +# Lookup module +# --------------------------------------------------------------------------- + +class LookupModule(LookupBase): + + def run(self, terms: list[str], variables: dict | None = None, **kwargs) -> list: + self.set_options(var_options=variables, direct=kwargs) + + vault = self.get_option("vault") or "" + pat = self.get_option("pat") or "" + agent_reason = self.get_option("agent_reason") or "" + cli_path = self.get_option("cli_path") or "pass-cli" + timeout = int(self.get_option("timeout")) + field_opt = self.get_option("field") or "" + + if not terms: + raise AnsibleLookupError( + "community.general.proton_pass requires at least one positional " + "argument (item title)" + ) + + # Two-term shorthand: lookup('proton_pass', 'item_title', 'field_name') + if len(terms) == 2: + item_titles = [terms[0]] + field = field_opt or terms[1] + else: + item_titles = list(terms) + field = field_opt + + if len(item_titles) > 1 and not field: + raise AnsibleOptionsError( + "community.general.proton_pass: fetching all fields for multiple " + "items in one call is not supported. Specify O(field) or look up " + "one item at a time." + ) + + client = ProtonPassClient(cli_path=cli_path, timeout=timeout, agent_reason=agent_reason) + client.ensure_authenticated(pat) + + results = [] + for title in item_titles: + if field: + results.append(client.fetch_field(vault=vault, title=title, field=field)) + else: + results.append(client.fetch_all_fields(vault=vault, title=title)) + + return results diff --git a/tests/unit/plugins/lookup/test_proton_pass.py b/tests/unit/plugins/lookup/test_proton_pass.py new file mode 100644 index 0000000000..6fbce2b8bb --- /dev/null +++ b/tests/unit/plugins/lookup/test_proton_pass.py @@ -0,0 +1,741 @@ +# Copyright (c) 2026, Jean Khawand (@jeankhawand) +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +import json +import unittest +from unittest.mock import MagicMock, patch + +from ansible.errors import AnsibleLookupError, AnsibleOptionsError + +from ansible_collections.community.general.plugins.lookup.proton_pass import ( + LookupModule, + ProtonPassClient, + ProtonPassCLIError, + _extract_extra_fields, + _parse_fields, + _raise_item_error, +) + +# --------------------------------------------------------------------------- +# Fixtures — representative pass-cli --output json payloads +# --------------------------------------------------------------------------- + +MOCK_LOGIN_ITEM = { + "item_id": "abc123", + "share_id": "share456", + "data": { + "metadata": {"name": "example_item", "note": ""}, + "content": { + "item_type": "login", + "username": "", + "password": "correct-horse-battery", + "extra_fields": [ + { + "field_name": "api_token", + "data": {"field_type": "hidden", "content": "token_value_abc"}, + }, + { + "field_name": "backup_secret", + "data": {"field_type": "hidden", "content": "secure_value_1"}, + }, + ], + "extra_sections": [], + }, + }, +} + +MOCK_SECTIONED_ITEM = { + "item_id": "def456", + "share_id": "share789", + "data": { + "metadata": {"name": "grouped_item", "note": "test item with sections"}, + "content": { + "item_type": "custom", + "extra_fields": [], + "extra_sections": [ + { + "section_name": "Credentials", + "extra_fields": [ + { + "field_name": "backup_secret", + "data": {"field_type": "hidden", "content": "secure_value_2"}, + }, + { + "field_name": "storage_key", + "data": {"field_type": "hidden", "content": "secure_value_3"}, + }, + ], + } + ], + }, + }, +} + +MOCK_NOTE_ITEM = { + "data": { + "metadata": {"name": "notes_item", "note": "some free-text note"}, + "content": {"item_type": "note", "extra_fields": [], "extra_sections": []}, + } +} + +MOCK_LOGIN_URLS_ITEM = { + "data": { + "metadata": {"name": "web_login", "note": ""}, + "content": { + "item_type": "login", + "username": "user@example.com", + "email": "user@example.com", + "password": "secret", + "totp_uri": "otpauth://totp/example?secret=JBSWY3DPEHPK3PXP", + "urls": ["https://example.com", "https://app.example.com"], + "extra_fields": [], + "extra_sections": [], + }, + } +} + +MOCK_CREDIT_CARD_ITEM = { + "data": { + "metadata": {"name": "my_visa", "note": "backup card"}, + "content": { + "item_type": "credit-card", + "cardholder_name": "John Doe", + "card_type": "Visa", + "number": "4111111111111111", + "cvv": "123", + "expiration_date": "2027-12", + "pin": "1234", + "note": "backup card", + "extra_fields": [], + "extra_sections": [], + }, + } +} + +MOCK_WIFI_ITEM = { + "data": { + "metadata": {"name": "home_wifi", "note": ""}, + "content": { + "item_type": "wifi", + "ssid": "HomeNetwork", + "password": "wifipass123", + "security": "wpa2", + "extra_fields": [], + "extra_sections": [], + }, + } +} + +MOCK_IDENTITY_ITEM = { + "data": { + "metadata": {"name": "my_identity", "note": ""}, + "content": { + "item_type": "identity", + "first_name": "John", + "last_name": "Doe", + "email": "john@example.com", + "phone_number": "+1234567890", + "organization": "ACME", + "city": "Springfield", + "country_or_region": "US", + "extra_fields": [], + "extra_sections": [], + }, + } +} + +MOCK_SSH_KEY_ITEM = { + "data": { + "metadata": {"name": "deploy_key", "note": ""}, + "content": { + "item_type": "ssh-key", + "private_key": "-----BEGIN OPENSSH PRIVATE KEY-----\nfake\n-----END OPENSSH PRIVATE KEY-----", + "public_key": "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFake deploy@host", + "fingerprint": "SHA256:abc123fake", + "key_type": "ed25519", + "extra_fields": [], + "extra_sections": [], + }, + } +} + +MOCK_ALIAS_ITEM = { + "data": { + "metadata": {"name": "shopping_alias", "note": ""}, + "content": { + "item_type": "alias", + "aliased_email": "shopping.abc123@pm.me", + "extra_fields": [], + "extra_sections": [], + }, + } +} + +MOCK_UNKNOWN_TYPE_ITEM = { + "data": { + "metadata": {"name": "legacy", "note": ""}, + "content": { + "username": "admin", + "password": "legacypass", + "extra_fields": [], + "extra_sections": [], + }, + } +} + +# New pass-cli schema: {"item": {"content": {...}, "state": "Active"}} +MOCK_NEW_SCHEMA_ITEM = { + "item": { + "id": "yaA6QBGqS0JTdzB1X8", + "content": { + "title": "jix_test", + "note": "", + "content": { + "Login": { + "email": "", + "username": "admin", + "password": "secret123", + "urls": [], + "totp_uri": "", + "passkeys": [], + } + }, + "extra_fields": [ + {"name": "public_key", "content": {"Hidden": "age1abc123"}}, + {"name": "test_field", "content": {"Text": "hunter2"}}, + ], + "extra_sections": [], + }, + "state": "Active", + }, + "attachments": [], +} + +MOCK_TRASHED_ITEM = { + "item": { + "id": "trashed123", + "content": { + "title": "old_item", + "note": "", + "content": { + "Login": { + "username": "x", + "password": "y", + "urls": [], + "totp_uri": "", + "passkeys": [], + } + }, + "extra_fields": [{"name": "secret", "content": {"Hidden": "value"}}], + "extra_sections": [], + }, + "state": "Trashed", + }, + "attachments": [], +} + + +# --------------------------------------------------------------------------- +# _extract_extra_fields +# --------------------------------------------------------------------------- + +class TestExtractExtraFields(unittest.TestCase): + + def test_hidden_field(self): + target: dict = {} + _extract_extra_fields( + [{"field_name": "my_key", "data": {"field_type": "hidden", "content": "secret"}}], + target, + ) + self.assertEqual(target, {"my_key": "secret"}) + + def test_flat_string_data(self): + """Tolerate data as a plain string (schema variant).""" + target: dict = {} + _extract_extra_fields( + [{"field_name": "key", "data": "plain_value"}], + target, + ) + self.assertEqual(target, {"key": "plain_value"}) + + def test_alternate_name_key(self): + """Tolerate 'name' key instead of 'field_name'.""" + target: dict = {} + _extract_extra_fields( + [{"name": "alt_key", "data": {"content": "alt_val"}}], + target, + ) + self.assertEqual(target, {"alt_key": "alt_val"}) + + def test_skips_entries_without_name(self): + target: dict = {} + _extract_extra_fields([{"data": {"content": "orphan"}}], target) + self.assertEqual(target, {}) + + def test_multiple_fields(self): + target: dict = {} + fields = [ + {"field_name": "a", "data": {"content": "1"}}, + {"field_name": "b", "data": {"content": "2"}}, + ] + _extract_extra_fields(fields, target) + self.assertEqual(target, {"a": "1", "b": "2"}) + + +# --------------------------------------------------------------------------- +# _parse_fields +# --------------------------------------------------------------------------- + +class TestParseFields(unittest.TestCase): + + def test_login_item_with_extra_fields(self): + fields = _parse_fields(MOCK_LOGIN_ITEM, "example_item") + self.assertEqual(fields["api_token"], "token_value_abc") + self.assertEqual(fields["backup_secret"], "secure_value_1") + self.assertEqual(fields["password"], "correct-horse-battery") + + def test_sectioned_item(self): + fields = _parse_fields(MOCK_SECTIONED_ITEM, "grouped_item") + self.assertEqual(fields["backup_secret"], "secure_value_2") + self.assertEqual(fields["storage_key"], "secure_value_3") + + def test_note_item(self): + fields = _parse_fields(MOCK_NOTE_ITEM, "notes_item") + self.assertIn("note", fields) + self.assertEqual(fields["note"], "some free-text note") + + def test_empty_item_raises(self): + empty = {"data": {"metadata": {}, "content": {"extra_fields": [], "extra_sections": []}}} + with self.assertRaises(AnsibleLookupError) as ctx: + _parse_fields(empty, "empty_item") + self.assertIn("empty_item", str(ctx.exception)) + + def test_flat_envelope_fallback(self): + """When there is no 'data' wrapper the parser should still work.""" + flat = { + "metadata": {"name": "flat", "note": ""}, + "content": { + "extra_fields": [ + {"field_name": "key", "data": {"content": "val"}} + ], + "extra_sections": [], + }, + } + fields = _parse_fields(flat, "flat") + self.assertEqual(fields["key"], "val") + + def test_login_urls_joined_with_newline(self): + fields = _parse_fields(MOCK_LOGIN_URLS_ITEM, "web_login") + self.assertEqual(fields["username"], "user@example.com") + self.assertEqual(fields["password"], "secret") + self.assertEqual(fields["totp_uri"], "otpauth://totp/example?secret=JBSWY3DPEHPK3PXP") + self.assertEqual(fields["urls"], "https://example.com\nhttps://app.example.com") + + def test_credit_card_item(self): + fields = _parse_fields(MOCK_CREDIT_CARD_ITEM, "my_visa") + self.assertEqual(fields["cardholder_name"], "John Doe") + self.assertEqual(fields["card_type"], "Visa") + self.assertEqual(fields["number"], "4111111111111111") + self.assertEqual(fields["cvv"], "123") + self.assertEqual(fields["expiration_date"], "2027-12") + self.assertEqual(fields["pin"], "1234") + + def test_wifi_item(self): + fields = _parse_fields(MOCK_WIFI_ITEM, "home_wifi") + self.assertEqual(fields["ssid"], "HomeNetwork") + self.assertEqual(fields["password"], "wifipass123") + self.assertEqual(fields["security"], "wpa2") + self.assertNotIn("username", fields) + + def test_identity_item(self): + fields = _parse_fields(MOCK_IDENTITY_ITEM, "my_identity") + self.assertEqual(fields["first_name"], "John") + self.assertEqual(fields["last_name"], "Doe") + self.assertEqual(fields["email"], "john@example.com") + self.assertEqual(fields["organization"], "ACME") + self.assertEqual(fields["city"], "Springfield") + self.assertEqual(fields["country_or_region"], "US") + + def test_ssh_key_item(self): + fields = _parse_fields(MOCK_SSH_KEY_ITEM, "deploy_key") + self.assertIn("-----BEGIN OPENSSH PRIVATE KEY-----", fields["private_key"]) + self.assertIn("ssh-ed25519", fields["public_key"]) + self.assertEqual(fields["fingerprint"], "SHA256:abc123fake") + self.assertEqual(fields["key_type"], "ed25519") + + def test_alias_item(self): + fields = _parse_fields(MOCK_ALIAS_ITEM, "shopping_alias") + self.assertEqual(fields["aliased_email"], "shopping.abc123@pm.me") + + def test_alias_item_aliased_address_fallback(self): + """Support aliased_address as an alternative key name.""" + item = { + "data": { + "metadata": {"name": "alias2", "note": ""}, + "content": { + "item_type": "alias", + "aliased_address": "alt.abc@pm.me", + "extra_fields": [], + "extra_sections": [], + }, + } + } + fields = _parse_fields(item, "alias2") + self.assertEqual(fields["aliased_email"], "alt.abc@pm.me") + + def test_unknown_type_falls_back_to_login_fields(self): + """Items with no item_type key should still extract username/password.""" + fields = _parse_fields(MOCK_UNKNOWN_TYPE_ITEM, "legacy") + self.assertEqual(fields["username"], "admin") + self.assertEqual(fields["password"], "legacypass") + + def test_note_in_content_field(self): + """note field present in content (not just metadata) should be captured.""" + item = { + "data": { + "metadata": {"name": "card", "note": ""}, + "content": { + "item_type": "credit-card", + "number": "4111111111111111", + "note": "for online purchases", + "extra_fields": [], + "extra_sections": [], + }, + } + } + fields = _parse_fields(item, "card") + self.assertEqual(fields["note"], "for online purchases") + self.assertEqual(fields["number"], "4111111111111111") + + def test_new_schema_login_item(self): + """New pass-cli schema (item wrapper) parses login fields and custom extra fields.""" + fields = _parse_fields(MOCK_NEW_SCHEMA_ITEM, "jix_test") + self.assertEqual(fields["username"], "admin") + self.assertEqual(fields["password"], "secret123") + self.assertEqual(fields["public_key"], "age1abc123") + self.assertEqual(fields["test_field"], "hunter2") + + def test_trashed_item_raises(self): + """Items in the Proton Pass trash must raise AnsibleLookupError.""" + with self.assertRaises(AnsibleLookupError) as ctx: + _parse_fields(MOCK_TRASHED_ITEM, "old_item") + self.assertIn("trash", str(ctx.exception).lower()) + self.assertIn("old_item", str(ctx.exception)) + + +# --------------------------------------------------------------------------- +# _extract_extra_fields — new vs legacy schema +# --------------------------------------------------------------------------- + +class TestExtractExtraFields(unittest.TestCase): + + def test_new_schema_hidden_field(self): + """New schema: {"name": "key", "content": {"Hidden": "value"}}""" + from ansible_collections.community.general.plugins.lookup.proton_pass import _extract_extra_fields + target = {} + _extract_extra_fields( + [{"name": "my_key", "content": {"Hidden": "secret_val"}}], + target, + ) + self.assertEqual(target, {"my_key": "secret_val"}) + + def test_new_schema_text_field(self): + """New schema: {"name": "key", "content": {"Text": "value"}}""" + from ansible_collections.community.general.plugins.lookup.proton_pass import _extract_extra_fields + target = {} + _extract_extra_fields( + [{"name": "my_key", "content": {"Text": "text_val"}}], + target, + ) + self.assertEqual(target, {"my_key": "text_val"}) + + def test_legacy_schema_field(self): + """Legacy schema: {"field_name": "key", "data": {"content": "value"}}""" + from ansible_collections.community.general.plugins.lookup.proton_pass import _extract_extra_fields + target = {} + _extract_extra_fields( + [{"field_name": "old_key", "data": {"field_type": "hidden", "content": "old_val"}}], + target, + ) + self.assertEqual(target, {"old_key": "old_val"}) + + def test_skips_field_with_no_name(self): + from ansible_collections.community.general.plugins.lookup.proton_pass import _extract_extra_fields + target = {} + _extract_extra_fields([{"content": {"Hidden": "val"}}], target) + self.assertEqual(target, {}) + + +# --------------------------------------------------------------------------- +# _raise_item_error +# --------------------------------------------------------------------------- + +class TestRaiseItemError(unittest.TestCase): + + def test_not_found_stderr(self): + with self.assertRaises(AnsibleLookupError) as ctx: + _raise_item_error("vm", "vault", None, "Error: item not found") + self.assertIn("not found", str(ctx.exception).lower()) + self.assertIn("vm", str(ctx.exception)) + + def test_unauthorized_stderr(self): + with self.assertRaises(AnsibleLookupError) as ctx: + _raise_item_error("vm", "vault", None, "Error: unauthorized access") + self.assertIn("authentication", str(ctx.exception).lower()) + + def test_generic_error(self): + with self.assertRaises(AnsibleLookupError) as ctx: + _raise_item_error("vm", "vault", "api_key", "unexpected failure") + self.assertIn("unexpected failure", str(ctx.exception)) + self.assertIn("api_key", str(ctx.exception)) + + +# --------------------------------------------------------------------------- +# ProtonPassClient +# --------------------------------------------------------------------------- + +def _make_popen_mock(returncode: int, stdout: bytes, stderr: bytes): + mock_proc = MagicMock() + mock_proc.returncode = returncode + mock_proc.communicate.return_value = (stdout, stderr) + return mock_proc + + +class TestProtonPassClientRun(unittest.TestCase): + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_successful_run(self, mock_popen): + mock_popen.return_value = _make_popen_mock(0, b"output\n", b"") + client = ProtonPassClient() + rc, out, err = client._run(["test"]) + self.assertEqual(rc, 0) + self.assertEqual(out, "output\n") + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_missing_binary_raises(self, mock_popen): + mock_popen.side_effect = FileNotFoundError() + client = ProtonPassClient(cli_path="/no/such/binary") + with self.assertRaises(ProtonPassCLIError) as ctx: + client._run(["test"]) + self.assertIn("not found", str(ctx.exception)) + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_agent_reason_injected_into_env(self, mock_popen): + """PROTON_PASS_AGENT_REASON must be present in the subprocess env when set.""" + mock_popen.return_value = _make_popen_mock(0, b"ok\n", b"") + client = ProtonPassClient(agent_reason="nightly deployment") + client._run(["test"]) + _call_args, kwargs = mock_popen.call_args + self.assertEqual(kwargs["env"].get("PROTON_PASS_AGENT_REASON"), "nightly deployment") + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_no_agent_reason_does_not_override_env(self, mock_popen): + """When agent_reason is empty, PROTON_PASS_AGENT_REASON should not be forced.""" + mock_popen.return_value = _make_popen_mock(0, b"ok\n", b"") + client = ProtonPassClient(agent_reason="") + client._run(["test"]) + _call_args, kwargs = mock_popen.call_args + # The key may be inherited from the real environment — we only assert it + # was not forcibly added by the plugin logic (value will be None if absent). + env = kwargs["env"] + self.assertNotEqual(env.get("PROTON_PASS_AGENT_REASON"), "") # not set to empty + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_timeout_raises(self, mock_popen): + from subprocess import TimeoutExpired + mock_proc = MagicMock() + mock_proc.communicate.side_effect = TimeoutExpired(cmd="pass-cli", timeout=30) + mock_popen.return_value = mock_proc + client = ProtonPassClient(timeout=30) + with self.assertRaises(ProtonPassCLIError) as ctx: + client._run(["test"]) + self.assertIn("timed out", str(ctx.exception)) + + +class TestProtonPassClientAuth(unittest.TestCase): + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_test_session_returns_true_on_rc0(self, mock_popen): + mock_popen.return_value = _make_popen_mock(0, b"", b"") + client = ProtonPassClient() + self.assertTrue(client.test_session()) + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_test_session_returns_false_on_nonzero(self, mock_popen): + mock_popen.return_value = _make_popen_mock(1, b"", b"not logged in\n") + client = ProtonPassClient() + self.assertFalse(client.test_session()) + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_ensure_authenticated_already_ok(self, mock_popen): + client = ProtonPassClient() + client._session_ok = True + # ensure_authenticated must not invoke pass-cli at all + client.ensure_authenticated(pat="") + mock_popen.assert_not_called() + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_ensure_authenticated_no_session_no_pat_raises(self, mock_popen): + mock_popen.return_value = _make_popen_mock(1, b"", b"unauthenticated\n") + client = ProtonPassClient() + with self.assertRaises(AnsibleLookupError) as ctx: + client.ensure_authenticated(pat="") + self.assertIn("PAT", str(ctx.exception)) + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_ensure_authenticated_logs_in_with_pat(self, mock_popen): + # First call (test): unauthenticated; second call (login): success + mock_popen.side_effect = [ + _make_popen_mock(1, b"", b"unauthenticated\n"), + _make_popen_mock(0, b"", b""), + ] + client = ProtonPassClient() + client.ensure_authenticated(pat="pst_token::key") + self.assertEqual(mock_popen.call_count, 2) + + +class TestProtonPassClientFetch(unittest.TestCase): + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_fetch_field_success(self, mock_popen): + """fetch_field delegates to fetch_all_fields and returns the requested field.""" + payload = json.dumps(MOCK_LOGIN_ITEM).encode() + mock_popen.return_value = _make_popen_mock(0, payload, b"") + client = ProtonPassClient() + result = client.fetch_field(vault="vault", title="example_item", field="api_token") + self.assertEqual(result, "token_value_abc") + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_fetch_field_not_found_raises(self, mock_popen): + mock_popen.return_value = _make_popen_mock(1, b"", b"Error: item not found\n") + client = ProtonPassClient() + with self.assertRaises(AnsibleLookupError): + client.fetch_field(vault="vault", title="missing", field="password") + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_fetch_field_missing_field_raises(self, mock_popen): + """fetch_field raises with available fields listed when field is absent from item.""" + payload = json.dumps(MOCK_LOGIN_ITEM).encode() + mock_popen.return_value = _make_popen_mock(0, payload, b"") + client = ProtonPassClient() + with self.assertRaises(AnsibleLookupError) as ctx: + client.fetch_field(vault="vault", title="example_item", field="nonexistent_key") + err = str(ctx.exception) + self.assertIn("nonexistent_key", err) + self.assertIn("Available fields", err) + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_fetch_all_fields_success(self, mock_popen): + payload = json.dumps(MOCK_LOGIN_ITEM).encode() + mock_popen.return_value = _make_popen_mock(0, payload, b"") + client = ProtonPassClient() + result = client.fetch_all_fields(vault="vault", title="example_item") + self.assertEqual(result["api_token"], "token_value_abc") + self.assertEqual(result["backup_secret"], "secure_value_1") + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.Popen") + def test_fetch_all_fields_invalid_json_raises(self, mock_popen): + mock_popen.return_value = _make_popen_mock(0, b"not json {{", b"") + client = ProtonPassClient() + with self.assertRaises(AnsibleLookupError) as ctx: + client.fetch_all_fields(vault="vault", title="bad_item") + self.assertIn("invalid JSON", str(ctx.exception)) + + +# --------------------------------------------------------------------------- +# LookupModule integration +# --------------------------------------------------------------------------- + +class TestLookupModule(unittest.TestCase): + """ + Integration tests for LookupModule.run(). + + Ansible's config system requires a fully initialised plugin (via lookup_loader + or ansible-test) to resolve options declared in DOCUMENTATION. In a plain + unittest environment lookup_loader returns None, so these tests patch + set_options (no-op) and get_option (returns values from a local dict) to + exercise the routing logic without depending on the Ansible config manager. + Use ``ansible-test unit`` for a full integration test in CI. + """ + + def setUp(self): + self.lookup = LookupModule() + + def _run(self, terms, vault="", field="", pat="", agent_reason="", + cli_path="pass-cli", timeout=30): + """Run with Ansible option plumbing patched out.""" + options = {"vault": vault, "field": field, "pat": pat, + "agent_reason": agent_reason, "cli_path": cli_path, "timeout": timeout} + with patch.object(self.lookup, "set_options"): + with patch.object(self.lookup, "get_option", side_effect=lambda k: options.get(k, "")): + return self.lookup.run(terms) + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.ProtonPassClient") + def test_all_fields_mode(self, mock_client_cls): + mock_client = MagicMock() + mock_client.fetch_all_fields.return_value = {"api_key": "secret123"} + mock_client_cls.return_value = mock_client + + result = self._run(["vm"], vault="myvault") + self.assertEqual(result, [{"api_key": "secret123"}]) + mock_client.fetch_all_fields.assert_called_once_with(vault="myvault", title="vm") + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.ProtonPassClient") + def test_single_field_via_second_term(self, mock_client_cls): + mock_client = MagicMock() + mock_client.fetch_field.return_value = "secret123" + mock_client_cls.return_value = mock_client + + result = self._run(["vm", "api_key"], vault="myvault") + self.assertEqual(result, ["secret123"]) + mock_client.fetch_field.assert_called_once_with( + vault="myvault", title="vm", field="api_key" + ) + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.ProtonPassClient") + def test_single_field_via_kwarg(self, mock_client_cls): + mock_client = MagicMock() + mock_client.fetch_field.return_value = "secret123" + mock_client_cls.return_value = mock_client + + result = self._run(["vm"], vault="myvault", field="api_key") + self.assertEqual(result, ["secret123"]) + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.ProtonPassClient") + def test_field_kwarg_overrides_second_term(self, mock_client_cls): + """O(field) takes precedence over the second positional term.""" + mock_client = MagicMock() + mock_client.fetch_field.return_value = "secret123" + mock_client_cls.return_value = mock_client + + self._run(["vm", "ignored_term"], vault="myvault", field="real_field") + mock_client.fetch_field.assert_called_once_with( + vault="myvault", title="vm", field="real_field" + ) + + @patch("ansible_collections.community.general.plugins.lookup.proton_pass.ProtonPassClient") + def test_multiple_items_with_field(self, mock_client_cls): + # Three terms required: two terms trigger the (item, field) shorthand, + # so three or more terms are needed to look up multiple items. + mock_client = MagicMock() + mock_client.fetch_field.side_effect = ["pass_a", "pass_b", "pass_c"] + mock_client_cls.return_value = mock_client + + result = self._run(["host_a", "host_b", "host_c"], vault="", field="gpg_keyphrase") + self.assertEqual(result, ["pass_a", "pass_b", "pass_c"]) + + def test_multiple_items_no_field_raises(self): + # Three terms without field= should raise AnsibleOptionsError. + with self.assertRaises(AnsibleOptionsError): + self._run(["host_a", "host_b", "host_c"], vault="") + + def test_no_terms_raises(self): + with self.assertRaises(AnsibleLookupError): + self._run([], vault="myvault") + + +if __name__ == "__main__": + unittest.main()