1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-06-10 18:15:39 +00:00
This commit is contained in:
Alexei Znamensky 2026-06-07 07:26:25 +02:00 committed by GitHub
commit 66a10c6ec8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 385 additions and 144 deletions

View file

@ -0,0 +1,2 @@
bugfixes:
- onepassword_info - rewrite module to support op CLI v2 (https://github.com/ansible-collections/community.general/issues/5303, https://github.com/ansible-collections/community.general/issues/7321, https://github.com/ansible-collections/community.general/pull/12089).

View file

@ -9,6 +9,29 @@ from __future__ import annotations
import os
from ansible_collections.community.general.plugins.module_utils import _cmd_runner_fmt as fmt
from ansible_collections.community.general.plugins.module_utils._cmd_runner import CmdRunner
def onepassword_runner(module, command):
return CmdRunner(
module,
command=command,
arg_formats=dict(
_account_list=fmt.as_fixed(["account", "list"]),
_account_get=fmt.as_fixed(["account", "get"]),
_signin=fmt.as_fixed(["signin", "--raw"]),
_account_add=fmt.as_fixed(["account", "add", "--raw", "--signin"]),
_item_get=fmt.as_fixed(["item", "get", "--format", "json"]),
account=fmt.as_opt_val("--account"),
address=fmt.as_opt_val("--address"),
email=fmt.as_opt_val("--email"),
item_id=fmt.as_list(),
vault=fmt.as_opt_eq_val("--vault"),
session=fmt.as_optval("--session="),
),
)
class OnePasswordConfig:
_config_file_paths = (

View file

@ -14,9 +14,8 @@ module: onepassword_info
author:
- Ryan Conway (@Rylon)
requirements:
- C(op) 1Password command line utility. See U(https://support.1password.com/command-line/)
- C(op) 1Password command line utility version 2 or later. See U(https://support.1password.com/command-line/)
notes:
- Tested with C(op) version 0.5.5.
- Based on the P(community.general.onepassword#lookup) lookup plugin by Scott Buchanan <sbuchanan@ri.pn>.
short_description: Gather items from 1Password
description:
@ -43,8 +42,7 @@ options:
field:
type: str
description:
- The name of the field to search for within this item (optional, defaults to V(password), or V(document) if the
item has an attachment).
- The name of the field to search for within this item (optional, defaults to V(password)).
section:
type: str
description:
@ -124,7 +122,6 @@ EXAMPLES = r"""
field: Custom field name # optional, defaults to 'password'
section: Custom section name # optional, defaults to 'None'
vault: Name of the vault # optional, only necessary if there is more than 1 Vault available
- name: A 1Password item with document attachment
delegate_to: localhost
register: my_1password_item
no_log: true # Don't want to log the secrets to the console!
@ -148,89 +145,58 @@ onepassword:
Custom field name: the value of this field
"My Other 1Password item":
password: the value of this field
"A 1Password item with document attachment":
document: the contents of the document attached to this item
"""
import errno
import json
import os
import re
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_bytes, to_native
from ansible.module_utils.common.text.converters import to_bytes
from ansible_collections.community.general.plugins.module_utils._onepassword import OnePasswordConfig
class AnsibleModuleError(Exception):
def __init__(self, results):
self.results = results
def __repr__(self):
return self.results
from ansible_collections.community.general.plugins.module_utils._onepassword import (
OnePasswordConfig,
onepassword_runner,
)
class OnePasswordInfo:
def __init__(self):
self.cli_path = module.params.get("cli_path")
self.auto_login = module.params.get("auto_login")
def __init__(self, module):
self.module = module
self.cli_path = self.module.params.get("cli_path")
self.auto_login = self.module.params.get("auto_login")
self.logged_in = False
self.token = None
terms = module.params.get("search_terms")
terms = self.module.params.get("search_terms")
self.terms = self.parse_search_terms(terms)
self._config = OnePasswordConfig()
def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False):
if self.token:
# Adds the session token to all commands if we're logged in.
args += [to_bytes("--session=") + self.token]
command = [self.cli_path] + args
rc, out, err = module.run_command(command, data=command_input, check_rc=False, binary_data=True, encoding=None)
if not ignore_errors and rc != expected_rc:
raise AnsibleModuleError(to_native(err))
return rc, out, err
self._runner = onepassword_runner(self.module, self.cli_path)
def _parse_field(self, data_json, item_id, field_name, section_title=None):
data = json.loads(data_json)
field_name_lower = field_name.lower()
if "documentAttributes" in data["details"]:
# This is actually a document, let's fetch the document data instead!
document = self._run(["get", "document", data["overview"]["title"]])
return {"document": document[1].strip()}
else:
# This is not a document, let's try to find the requested field
# Some types of 1Password items have a 'password' field directly alongside the 'fields' attribute,
# not inside it, so we need to check there first.
if field_name in data["details"]:
return {field_name: data["details"][field_name]}
# Otherwise we continue looking inside the 'fields' attribute for the specified field.
for field in data.get("fields", []):
if section_title is None:
if field.get(field_name_lower):
return {field_name: field.get(field_name_lower)}
if field.get("label", "").lower() == field_name_lower:
return {field_name: field.get("value", "")}
if field.get("id", "").lower() == field_name_lower:
return {field_name: field.get("value", "")}
else:
if section_title is None:
for field_data in data["details"].get("fields", []):
if field_data.get("name", "").lower() == field_name.lower():
return {field_name: field_data.get("value", "")}
section = field.get("section", {})
current_section = section.get("label", section.get("id", "")).lower()
if section_title.lower() == current_section:
if field.get("label", "").lower() == field_name_lower:
return {field_name: field.get("value", "")}
if field.get("id", "").lower() == field_name_lower:
return {field_name: field.get("value", "")}
# Not found it yet, so now lets see if there are any sections defined
# and search through those for the field. If a section was given, we skip
# any non-matching sections, otherwise we search them all until we find the field.
for section_data in data["details"].get("sections", []):
if section_title is not None and section_title.lower() != section_data["title"].lower():
continue
for field_data in section_data.get("fields", []):
if field_data.get("t", "").lower() == field_name.lower():
return {field_name: field_data.get("v", "")}
# We will get here if the field could not be found in any section and the item wasn't a document to be downloaded.
optional_section_title = "" if section_title is None else f" in the section '{section_title}'"
module.fail_json(
self.module.fail_json(
msg=f"Unable to find an item in 1Password named '{item_id}' with the field '{field_name}'{optional_section_title}."
)
@ -242,7 +208,7 @@ class OnePasswordInfo:
term = {"name": term}
if "name" not in term:
module.fail_json(msg=f"Missing required 'name' field from search term, got: '{term}'")
self.module.fail_json(msg=f"Missing required 'name' field from search term, got: '{term}'")
term["field"] = term.get("field", "password")
term["section"] = term.get("section")
@ -253,94 +219,80 @@ class OnePasswordInfo:
return processed_terms
def get_raw(self, item_id, vault=None):
try:
args = ["get", "item", item_id]
if vault is not None:
args += [f"--vault={vault}"]
rc, output, dummy = self._run(args)
return output
except Exception as e:
if re.search(".*not found.*", f"{e}"):
module.fail_json(msg=f"Unable to find an item in 1Password named '{item_id}'.")
else:
module.fail_json(msg=f"Unexpected error attempting to find an item in 1Password named '{item_id}': {e}")
with self._runner("_item_get item_id vault session") as ctx:
rc, out, err = ctx.run(item_id=item_id, vault=vault, session=self.token)
if rc != 0:
if "not found" in err.lower():
self.module.fail_json(msg=f"Unable to find an item in 1Password named '{item_id}'.")
self.module.fail_json(
msg=f"Unexpected error attempting to find an item in 1Password named '{item_id}': {err}"
)
return out
def get_field(self, item_id, field, section=None, vault=None):
output = self.get_raw(item_id, vault)
return self._parse_field(output, item_id, field, section) if output != "" else ""
return self._parse_field(output, item_id, field, section) if output else ""
def full_login(self):
if self.auto_login is not None:
if None in [
self.auto_login.get("subdomain"),
self.auto_login.get("username"),
self.auto_login.get("secret_key"),
self.auto_login.get("master_password"),
]:
module.fail_json(
msg="Unable to perform initial sign in to 1Password. "
"subdomain, username, secret_key, and master_password are required to perform initial sign in."
)
args = [
"signin",
f"{self.auto_login['subdomain']}.1password.com",
to_bytes(self.auto_login["username"]),
to_bytes(self.auto_login["secret_key"]),
"--output=raw",
]
try:
rc, out, err = self._run(args, command_input=to_bytes(self.auto_login["master_password"]))
self.token = out.strip()
except AnsibleModuleError as e:
module.fail_json(msg=f"Failed to perform initial sign in to 1Password: {e}")
else:
module.fail_json(
if self.auto_login is None:
self.module.fail_json(
msg=f"Unable to perform an initial sign in to 1Password. Please run '{self.cli_path} signin' "
"or define credentials in 'auto_login'. See the module documentation for details."
)
if None in [
self.auto_login.get("subdomain"),
self.auto_login.get("username"),
self.auto_login.get("secret_key"),
self.auto_login.get("master_password"),
]:
self.module.fail_json(
msg="Unable to perform initial sign in to 1Password. "
"subdomain, username, secret_key, and master_password are required to perform initial sign in."
)
subdomain = self.auto_login["subdomain"]
with self._runner(
"_account_add address email",
data=to_bytes(self.auto_login["master_password"]),
environ_update={"OP_SECRET_KEY": self.auto_login["secret_key"]},
) as ctx:
rc, out, err = ctx.run(
address=f"{subdomain}.1password.com",
email=self.auto_login["username"],
)
if rc != 0:
self.module.fail_json(msg=f"Failed to perform initial sign in to 1Password: {err}")
self.token = out.strip()
def get_token(self):
# If the config file exists, assume an initial signin has taken place and try basic sign in
if os.path.isfile(self._config.config_file_path):
if self.auto_login is not None:
# Since we are not currently signed in, master_password is required at a minimum
if not self.auto_login.get("master_password"):
module.fail_json(msg="Unable to sign in to 1Password. 'auto_login.master_password' is required.")
config_path = self._config.config_file_path
if config_path and os.path.isfile(config_path) and self.auto_login is not None:
if not self.auto_login.get("master_password"):
self.module.fail_json(msg="Unable to sign in to 1Password. 'auto_login.master_password' is required.")
# Try signing in using the master_password and a subdomain if one is provided
try:
args = ["signin", "--output=raw"]
with self._runner("_signin account", data=to_bytes(self.auto_login["master_password"])) as ctx:
rc, out, err = ctx.run(account=self.auto_login.get("subdomain"))
if rc == 0:
self.token = out.strip()
return
if self.auto_login.get("subdomain"):
args = ["signin", self.auto_login["subdomain"], "--output=raw"]
rc, out, err = self._run(args, command_input=to_bytes(self.auto_login["master_password"]))
self.token = out.strip()
except AnsibleModuleError:
self.full_login()
else:
self.full_login()
else:
# Attempt a full sign in since there appears to be no existing sign in
self.full_login()
self.full_login()
def assert_logged_in(self):
try:
rc, out, err = self._run(["get", "account"], ignore_errors=True)
subdomain = self.auto_login.get("subdomain") if self.auto_login else None
account = f"{subdomain}.1password.com" if subdomain else None
with self._runner("_account_list account") as ctx:
rc, out, err = ctx.run(account=account)
if rc == 0 and out.strip():
with self._runner("_account_get account") as ctx:
rc, out, err = ctx.run(account=account)
if rc == 0:
self.logged_in = True
if not self.logged_in:
self.get_token()
except OSError as e:
if e.errno == errno.ENOENT:
module.fail_json(msg=f"1Password CLI tool '{self.cli_path}' not installed in path on control machine")
raise e
if not self.logged_in:
self.get_token()
def run(self):
result = {}
@ -351,19 +303,15 @@ class OnePasswordInfo:
value = self.get_field(term["name"], term["field"], term["section"], term["vault"])
if term["name"] in result:
# If we already have a result for this key, we have to append this result dictionary
# to the existing one. This is only applicable when there is a single item
# in 1Password which has two different fields, and we want to retrieve both of them.
# Two search terms for the same item — merge the field dicts.
result[term["name"]].update(value)
else:
# If this is the first result for this key, simply set it.
result[term["name"]] = value
return result
def main():
global module
module = AnsibleModule(
argument_spec=dict(
cli_path=dict(type="path", default="op"),
@ -380,9 +328,8 @@ def main():
),
supports_check_mode=True,
)
module.run_command_environ_update = {"LANGUAGE": "C", "LC_ALL": "C"}
results = {"onepassword": OnePasswordInfo().run()}
results = {"onepassword": OnePasswordInfo(module).run()}
module.exit_json(changed=False, **results)

View file

@ -0,0 +1,135 @@
# Copyright (c) 2026, Alexei Znamensky <russoz@gmail.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import annotations
import json
from unittest.mock import PropertyMock
import pytest
from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import set_module_args
from ansible_collections.community.general.plugins.module_utils._onepassword import OnePasswordConfig
from ansible_collections.community.general.plugins.modules import onepassword_info
from .uthelper import RunCommandMock, TestCaseMock, UTHelper
ITEM_JSON = json.dumps(
{
"fields": [
{"id": "password", "label": "password", "value": "secret123"},
]
}
)
class OnePasswordConfigMock(TestCaseMock):
name = "onepassword_config"
def setup(self, mocker):
mocker.patch.object(
OnePasswordConfig,
"config_file_path",
new_callable=PropertyMock,
return_value=self.mock_specs.get("config_file_path"),
)
def check(self, test_case, results):
pass
UTHelper.from_module(onepassword_info, __name__, mocks=[RunCommandMock, OnePasswordConfigMock])
def _patch_bin_path(mocker):
mocker.patch(
"ansible.module_utils.basic.AnsibleModule.get_bin_path",
lambda self_, path, *args, **kwargs: f"/testbin/{path}",
)
def _patch_run_command(mocker, responses):
mocker.patch(
"ansible.module_utils.basic.AnsibleModule.run_command",
side_effect=lambda cmd, **kwargs: next(responses),
)
def test_get_token_signin(mocker, capfd):
master_password = "masterpass"
with set_module_args(
{
"search_terms": [{"name": "My Item"}],
"auto_login": {"subdomain": "mycompany", "master_password": master_password},
}
):
_patch_bin_path(mocker)
mocker.patch.object(
OnePasswordConfig,
"config_file_path",
new_callable=PropertyMock,
return_value="/home/user/.op/config",
)
mocker.patch(
"ansible_collections.community.general.plugins.modules.onepassword_info.os.path.isfile",
return_value=True,
)
_patch_run_command(
mocker,
iter(
[
(0, "", ""), # account list → out empty, not logged in
(0, "mytoken\n", ""), # signin --raw --account mycompany
(0, ITEM_JSON, ""), # item get --format json My Item --session=mytoken
]
),
)
with pytest.raises(SystemExit):
onepassword_info.main()
out, dummy = capfd.readouterr()
result = json.loads(out)
assert not result.get("failed"), result.get("msg")
assert result["onepassword"]["My Item"]["password"] == "secret123"
def test_full_login(mocker, capfd):
master_password = "masterpass"
with set_module_args(
{
"search_terms": [{"name": "My Item"}],
"auto_login": {
"subdomain": "mycompany",
"username": "user@example.com",
"secret_key": "mysecretkey",
"master_password": master_password,
},
}
):
_patch_bin_path(mocker)
mocker.patch.object(
OnePasswordConfig,
"config_file_path",
new_callable=PropertyMock,
return_value=None,
)
_patch_run_command(
mocker,
iter(
[
(0, "", ""), # account list → not logged in
(0, "mytoken\n", ""), # account add --raw --signin → token
(0, ITEM_JSON, ""), # item get --format json My Item --session=mytoken
]
),
)
with pytest.raises(SystemExit):
onepassword_info.main()
out, dummy = capfd.readouterr()
result = json.loads(out)
assert not result.get("failed"), result.get("msg")
assert result["onepassword"]["My Item"]["password"] == "secret123"

View file

@ -0,0 +1,134 @@
# Copyright (c) 2026, Alexei Znamensky <russoz@gmail.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
---
anchors:
env: &env {environ_update: {LANGUAGE: C, LC_ALL: C}, check_rc: false}
item_json: &item_json '{"fields": [{"id": "password", "label": "password", "value": "secret123"}]}'
account_list: &account_list
command: [/testbin/op, account, list]
environ: *env
rc: 0
out: "account info\n"
err: ''
account_get: &account_get
command: [/testbin/op, account, get]
environ: *env
rc: 0
out: ''
err: ''
test_cases:
- id: already_logged_in
input:
search_terms:
- name: My Item
output:
changed: false
onepassword:
"My Item":
password: secret123
mocks:
run_command:
- *account_list
- *account_get
- command: [/testbin/op, item, get, --format, json, My Item]
environ: *env
rc: 0
out: *item_json
err: ''
- id: already_logged_in_with_vault
input:
search_terms:
- name: My Item
vault: Personal
output:
changed: false
onepassword:
"My Item":
password: secret123
mocks:
run_command:
- *account_list
- *account_get
- command: [/testbin/op, item, get, --format, json, My Item, --vault=Personal]
environ: *env
rc: 0
out: *item_json
err: ''
- id: already_logged_in_section_field
input:
search_terms:
- name: My Item
field: Custom Field
section: My Section
output:
changed: false
onepassword:
"My Item":
Custom Field: custom_val
mocks:
run_command:
- *account_list
- *account_get
- command: [/testbin/op, item, get, --format, json, My Item]
environ: *env
rc: 0
out: >-
{"fields": [{"id": "cf_id", "label": "Custom Field", "value": "custom_val",
"section": {"id": "sec_id", "label": "My Section"}}]}
err: ''
- id: item_not_found
input:
search_terms:
- name: Missing Item
output:
failed: true
msg: "Unable to find an item in 1Password named 'Missing Item'."
mocks:
run_command:
- *account_list
- *account_get
- command: [/testbin/op, item, get, --format, json, Missing Item]
environ: *env
rc: 1
out: ''
err: '"Missing Item" not found in vault'
- id: field_not_found
input:
search_terms:
- name: My Item
field: nonexistent
output:
failed: true
msg: "Unable to find an item in 1Password named 'My Item' with the field 'nonexistent'."
mocks:
run_command:
- *account_list
- *account_get
- command: [/testbin/op, item, get, --format, json, My Item]
environ: *env
rc: 0
out: *item_json
err: ''
- id: not_logged_in_no_auto_login
input:
search_terms:
- name: My Item
output:
failed: true
mocks:
run_command:
- command: [/testbin/op, account, list]
environ: *env
rc: 0
out: ''
err: ''
onepassword_config:
config_file_path: ~