mirror of
https://github.com/ansible-collections/community.general.git
synced 2026-06-10 18:15:39 +00:00
feat(onepassword_info): rewrite module to support op CLI v2 only
Drop v1 syntax (get item, --output=raw, documentAttributes, nested fields/sections). Add CmdRunner-based auth and item fetch using v2 commands (account list/get, signin --raw, account add --raw --signin, item get --format json). Fix config_file_path None guard in get_token. Remove global module antipattern. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
3378d0a202
commit
d01402671c
2 changed files with 114 additions and 144 deletions
|
|
@ -9,6 +9,29 @@ from __future__ import annotations
|
|||
|
||||
import os
|
||||
|
||||
from ansible_collections.community.general.plugins.module_utils import _cmd_runner_fmt as fmt
|
||||
from ansible_collections.community.general.plugins.module_utils._cmd_runner import CmdRunner
|
||||
|
||||
|
||||
def onepassword_runner(module, command):
|
||||
return CmdRunner(
|
||||
module,
|
||||
command=command,
|
||||
arg_formats=dict(
|
||||
_account_list=fmt.as_fixed(["account", "list"]),
|
||||
_account_get=fmt.as_fixed(["account", "get"]),
|
||||
_signin=fmt.as_fixed(["signin", "--raw"]),
|
||||
_account_add=fmt.as_fixed(["account", "add", "--raw", "--signin"]),
|
||||
_item_get=fmt.as_fixed(["item", "get", "--format", "json"]),
|
||||
account=fmt.as_opt_val("--account"),
|
||||
address=fmt.as_opt_val("--address"),
|
||||
email=fmt.as_opt_val("--email"),
|
||||
item_id=fmt.as_list(),
|
||||
vault=fmt.as_opt_eq_val("--vault"),
|
||||
session=fmt.as_optval("--session="),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class OnePasswordConfig:
|
||||
_config_file_paths = (
|
||||
|
|
|
|||
|
|
@ -14,9 +14,8 @@ module: onepassword_info
|
|||
author:
|
||||
- Ryan Conway (@Rylon)
|
||||
requirements:
|
||||
- C(op) 1Password command line utility. See U(https://support.1password.com/command-line/)
|
||||
- C(op) 1Password command line utility version 2 or later. See U(https://support.1password.com/command-line/)
|
||||
notes:
|
||||
- Tested with C(op) version 0.5.5.
|
||||
- Based on the P(community.general.onepassword#lookup) lookup plugin by Scott Buchanan <sbuchanan@ri.pn>.
|
||||
short_description: Gather items from 1Password
|
||||
description:
|
||||
|
|
@ -43,8 +42,7 @@ options:
|
|||
field:
|
||||
type: str
|
||||
description:
|
||||
- The name of the field to search for within this item (optional, defaults to V(password), or V(document) if the
|
||||
item has an attachment).
|
||||
- The name of the field to search for within this item (optional, defaults to V(password)).
|
||||
section:
|
||||
type: str
|
||||
description:
|
||||
|
|
@ -124,7 +122,6 @@ EXAMPLES = r"""
|
|||
field: Custom field name # optional, defaults to 'password'
|
||||
section: Custom section name # optional, defaults to 'None'
|
||||
vault: Name of the vault # optional, only necessary if there is more than 1 Vault available
|
||||
- name: A 1Password item with document attachment
|
||||
delegate_to: localhost
|
||||
register: my_1password_item
|
||||
no_log: true # Don't want to log the secrets to the console!
|
||||
|
|
@ -148,89 +145,58 @@ onepassword:
|
|||
Custom field name: the value of this field
|
||||
"My Other 1Password item":
|
||||
password: the value of this field
|
||||
"A 1Password item with document attachment":
|
||||
document: the contents of the document attached to this item
|
||||
"""
|
||||
|
||||
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
from ansible.module_utils.common.text.converters import to_bytes, to_native
|
||||
from ansible.module_utils.common.text.converters import to_bytes
|
||||
|
||||
from ansible_collections.community.general.plugins.module_utils._onepassword import OnePasswordConfig
|
||||
|
||||
|
||||
class AnsibleModuleError(Exception):
|
||||
def __init__(self, results):
|
||||
self.results = results
|
||||
|
||||
def __repr__(self):
|
||||
return self.results
|
||||
from ansible_collections.community.general.plugins.module_utils._onepassword import (
|
||||
OnePasswordConfig,
|
||||
onepassword_runner,
|
||||
)
|
||||
|
||||
|
||||
class OnePasswordInfo:
|
||||
def __init__(self):
|
||||
self.cli_path = module.params.get("cli_path")
|
||||
self.auto_login = module.params.get("auto_login")
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
self.cli_path = self.module.params.get("cli_path")
|
||||
self.auto_login = self.module.params.get("auto_login")
|
||||
self.logged_in = False
|
||||
self.token = None
|
||||
|
||||
terms = module.params.get("search_terms")
|
||||
terms = self.module.params.get("search_terms")
|
||||
self.terms = self.parse_search_terms(terms)
|
||||
|
||||
self._config = OnePasswordConfig()
|
||||
|
||||
def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False):
|
||||
if self.token:
|
||||
# Adds the session token to all commands if we're logged in.
|
||||
args += [to_bytes("--session=") + self.token]
|
||||
|
||||
command = [self.cli_path] + args
|
||||
rc, out, err = module.run_command(command, data=command_input, check_rc=False, binary_data=True, encoding=None)
|
||||
if not ignore_errors and rc != expected_rc:
|
||||
raise AnsibleModuleError(to_native(err))
|
||||
return rc, out, err
|
||||
self._runner = onepassword_runner(self.module, self.cli_path)
|
||||
|
||||
def _parse_field(self, data_json, item_id, field_name, section_title=None):
|
||||
data = json.loads(data_json)
|
||||
field_name_lower = field_name.lower()
|
||||
|
||||
if "documentAttributes" in data["details"]:
|
||||
# This is actually a document, let's fetch the document data instead!
|
||||
document = self._run(["get", "document", data["overview"]["title"]])
|
||||
return {"document": document[1].strip()}
|
||||
|
||||
else:
|
||||
# This is not a document, let's try to find the requested field
|
||||
|
||||
# Some types of 1Password items have a 'password' field directly alongside the 'fields' attribute,
|
||||
# not inside it, so we need to check there first.
|
||||
if field_name in data["details"]:
|
||||
return {field_name: data["details"][field_name]}
|
||||
|
||||
# Otherwise we continue looking inside the 'fields' attribute for the specified field.
|
||||
for field in data.get("fields", []):
|
||||
if section_title is None:
|
||||
if field.get(field_name_lower):
|
||||
return {field_name: field.get(field_name_lower)}
|
||||
if field.get("label", "").lower() == field_name_lower:
|
||||
return {field_name: field.get("value", "")}
|
||||
if field.get("id", "").lower() == field_name_lower:
|
||||
return {field_name: field.get("value", "")}
|
||||
else:
|
||||
if section_title is None:
|
||||
for field_data in data["details"].get("fields", []):
|
||||
if field_data.get("name", "").lower() == field_name.lower():
|
||||
return {field_name: field_data.get("value", "")}
|
||||
section = field.get("section", {})
|
||||
current_section = section.get("label", section.get("id", "")).lower()
|
||||
if section_title.lower() == current_section:
|
||||
if field.get("label", "").lower() == field_name_lower:
|
||||
return {field_name: field.get("value", "")}
|
||||
if field.get("id", "").lower() == field_name_lower:
|
||||
return {field_name: field.get("value", "")}
|
||||
|
||||
# Not found it yet, so now lets see if there are any sections defined
|
||||
# and search through those for the field. If a section was given, we skip
|
||||
# any non-matching sections, otherwise we search them all until we find the field.
|
||||
for section_data in data["details"].get("sections", []):
|
||||
if section_title is not None and section_title.lower() != section_data["title"].lower():
|
||||
continue
|
||||
for field_data in section_data.get("fields", []):
|
||||
if field_data.get("t", "").lower() == field_name.lower():
|
||||
return {field_name: field_data.get("v", "")}
|
||||
|
||||
# We will get here if the field could not be found in any section and the item wasn't a document to be downloaded.
|
||||
optional_section_title = "" if section_title is None else f" in the section '{section_title}'"
|
||||
module.fail_json(
|
||||
self.module.fail_json(
|
||||
msg=f"Unable to find an item in 1Password named '{item_id}' with the field '{field_name}'{optional_section_title}."
|
||||
)
|
||||
|
||||
|
|
@ -242,7 +208,7 @@ class OnePasswordInfo:
|
|||
term = {"name": term}
|
||||
|
||||
if "name" not in term:
|
||||
module.fail_json(msg=f"Missing required 'name' field from search term, got: '{term}'")
|
||||
self.module.fail_json(msg=f"Missing required 'name' field from search term, got: '{term}'")
|
||||
|
||||
term["field"] = term.get("field", "password")
|
||||
term["section"] = term.get("section")
|
||||
|
|
@ -253,94 +219,80 @@ class OnePasswordInfo:
|
|||
return processed_terms
|
||||
|
||||
def get_raw(self, item_id, vault=None):
|
||||
try:
|
||||
args = ["get", "item", item_id]
|
||||
if vault is not None:
|
||||
args += [f"--vault={vault}"]
|
||||
rc, output, dummy = self._run(args)
|
||||
return output
|
||||
|
||||
except Exception as e:
|
||||
if re.search(".*not found.*", f"{e}"):
|
||||
module.fail_json(msg=f"Unable to find an item in 1Password named '{item_id}'.")
|
||||
else:
|
||||
module.fail_json(msg=f"Unexpected error attempting to find an item in 1Password named '{item_id}': {e}")
|
||||
with self._runner("_item_get item_id vault session") as ctx:
|
||||
rc, out, err = ctx.run(item_id=item_id, vault=vault, session=self.token)
|
||||
if rc != 0:
|
||||
if "not found" in err.lower():
|
||||
self.module.fail_json(msg=f"Unable to find an item in 1Password named '{item_id}'.")
|
||||
self.module.fail_json(
|
||||
msg=f"Unexpected error attempting to find an item in 1Password named '{item_id}': {err}"
|
||||
)
|
||||
return out
|
||||
|
||||
def get_field(self, item_id, field, section=None, vault=None):
|
||||
output = self.get_raw(item_id, vault)
|
||||
return self._parse_field(output, item_id, field, section) if output != "" else ""
|
||||
return self._parse_field(output, item_id, field, section) if output else ""
|
||||
|
||||
def full_login(self):
|
||||
if self.auto_login is not None:
|
||||
if None in [
|
||||
self.auto_login.get("subdomain"),
|
||||
self.auto_login.get("username"),
|
||||
self.auto_login.get("secret_key"),
|
||||
self.auto_login.get("master_password"),
|
||||
]:
|
||||
module.fail_json(
|
||||
msg="Unable to perform initial sign in to 1Password. "
|
||||
"subdomain, username, secret_key, and master_password are required to perform initial sign in."
|
||||
)
|
||||
|
||||
args = [
|
||||
"signin",
|
||||
f"{self.auto_login['subdomain']}.1password.com",
|
||||
to_bytes(self.auto_login["username"]),
|
||||
to_bytes(self.auto_login["secret_key"]),
|
||||
"--output=raw",
|
||||
]
|
||||
|
||||
try:
|
||||
rc, out, err = self._run(args, command_input=to_bytes(self.auto_login["master_password"]))
|
||||
self.token = out.strip()
|
||||
except AnsibleModuleError as e:
|
||||
module.fail_json(msg=f"Failed to perform initial sign in to 1Password: {e}")
|
||||
else:
|
||||
module.fail_json(
|
||||
if self.auto_login is None:
|
||||
self.module.fail_json(
|
||||
msg=f"Unable to perform an initial sign in to 1Password. Please run '{self.cli_path} signin' "
|
||||
"or define credentials in 'auto_login'. See the module documentation for details."
|
||||
)
|
||||
|
||||
if None in [
|
||||
self.auto_login.get("subdomain"),
|
||||
self.auto_login.get("username"),
|
||||
self.auto_login.get("secret_key"),
|
||||
self.auto_login.get("master_password"),
|
||||
]:
|
||||
self.module.fail_json(
|
||||
msg="Unable to perform initial sign in to 1Password. "
|
||||
"subdomain, username, secret_key, and master_password are required to perform initial sign in."
|
||||
)
|
||||
|
||||
subdomain = self.auto_login["subdomain"]
|
||||
with self._runner(
|
||||
"_account_add address email",
|
||||
data=to_bytes(self.auto_login["master_password"]),
|
||||
environ_update={"OP_SECRET_KEY": self.auto_login["secret_key"]},
|
||||
) as ctx:
|
||||
rc, out, err = ctx.run(
|
||||
address=f"{subdomain}.1password.com",
|
||||
email=self.auto_login["username"],
|
||||
)
|
||||
if rc != 0:
|
||||
self.module.fail_json(msg=f"Failed to perform initial sign in to 1Password: {err}")
|
||||
self.token = out.strip()
|
||||
|
||||
def get_token(self):
|
||||
# If the config file exists, assume an initial signin has taken place and try basic sign in
|
||||
if os.path.isfile(self._config.config_file_path):
|
||||
if self.auto_login is not None:
|
||||
# Since we are not currently signed in, master_password is required at a minimum
|
||||
if not self.auto_login.get("master_password"):
|
||||
module.fail_json(msg="Unable to sign in to 1Password. 'auto_login.master_password' is required.")
|
||||
config_path = self._config.config_file_path
|
||||
if config_path and os.path.isfile(config_path) and self.auto_login is not None:
|
||||
if not self.auto_login.get("master_password"):
|
||||
self.module.fail_json(msg="Unable to sign in to 1Password. 'auto_login.master_password' is required.")
|
||||
|
||||
# Try signing in using the master_password and a subdomain if one is provided
|
||||
try:
|
||||
args = ["signin", "--output=raw"]
|
||||
with self._runner("_signin account", data=to_bytes(self.auto_login["master_password"])) as ctx:
|
||||
rc, out, err = ctx.run(account=self.auto_login.get("subdomain"))
|
||||
if rc == 0:
|
||||
self.token = out.strip()
|
||||
return
|
||||
|
||||
if self.auto_login.get("subdomain"):
|
||||
args = ["signin", self.auto_login["subdomain"], "--output=raw"]
|
||||
|
||||
rc, out, err = self._run(args, command_input=to_bytes(self.auto_login["master_password"]))
|
||||
self.token = out.strip()
|
||||
|
||||
except AnsibleModuleError:
|
||||
self.full_login()
|
||||
|
||||
else:
|
||||
self.full_login()
|
||||
|
||||
else:
|
||||
# Attempt a full sign in since there appears to be no existing sign in
|
||||
self.full_login()
|
||||
self.full_login()
|
||||
|
||||
def assert_logged_in(self):
|
||||
try:
|
||||
rc, out, err = self._run(["get", "account"], ignore_errors=True)
|
||||
subdomain = self.auto_login.get("subdomain") if self.auto_login else None
|
||||
account = f"{subdomain}.1password.com" if subdomain else None
|
||||
|
||||
with self._runner("_account_list account") as ctx:
|
||||
rc, out, err = ctx.run(account=account)
|
||||
if rc == 0 and out.strip():
|
||||
with self._runner("_account_get account") as ctx:
|
||||
rc, out, err = ctx.run(account=account)
|
||||
if rc == 0:
|
||||
self.logged_in = True
|
||||
if not self.logged_in:
|
||||
self.get_token()
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
module.fail_json(msg=f"1Password CLI tool '{self.cli_path}' not installed in path on control machine")
|
||||
raise e
|
||||
|
||||
if not self.logged_in:
|
||||
self.get_token()
|
||||
|
||||
def run(self):
|
||||
result = {}
|
||||
|
|
@ -351,19 +303,15 @@ class OnePasswordInfo:
|
|||
value = self.get_field(term["name"], term["field"], term["section"], term["vault"])
|
||||
|
||||
if term["name"] in result:
|
||||
# If we already have a result for this key, we have to append this result dictionary
|
||||
# to the existing one. This is only applicable when there is a single item
|
||||
# in 1Password which has two different fields, and we want to retrieve both of them.
|
||||
# Two search terms for the same item — merge the field dicts.
|
||||
result[term["name"]].update(value)
|
||||
else:
|
||||
# If this is the first result for this key, simply set it.
|
||||
result[term["name"]] = value
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
global module
|
||||
module = AnsibleModule(
|
||||
argument_spec=dict(
|
||||
cli_path=dict(type="path", default="op"),
|
||||
|
|
@ -380,9 +328,8 @@ def main():
|
|||
),
|
||||
supports_check_mode=True,
|
||||
)
|
||||
module.run_command_environ_update = {"LANGUAGE": "C", "LC_ALL": "C"}
|
||||
|
||||
results = {"onepassword": OnePasswordInfo().run()}
|
||||
results = {"onepassword": OnePasswordInfo(module).run()}
|
||||
|
||||
module.exit_json(changed=False, **results)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue