mirror of
https://github.com/ansible-collections/community.general.git
synced 2026-03-25 22:57:21 +00:00
[PR #11655/6d3ab1a8 backport][stable-12] passwordstore lookup: update code meant for Python2 (#11669)
passwordstore lookup: update code meant for Python2 (#11655)
* passwordstore lookup: update code meant for Python2
* add changelog frag
* add check param to subprocess.run() to reinstate sanity
(cherry picked from commit 6d3ab1a80c)
Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
This commit is contained in:
parent
555d7b9038
commit
02b25fb096
2 changed files with 18 additions and 32 deletions
3
changelogs/fragments/11655-passwordstore-cleanup.yml
Normal file
3
changelogs/fragments/11655-passwordstore-cleanup.yml
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
minor_changes:
|
||||
- lookup plugin passwordstore - modernize internal ``check_output2()`` helper using ``subprocess.run()`` and rename it to ``run_backend_cmd()``
|
||||
(https://github.com/ansible-collections/community.general/pull/11655).
|
||||
|
|
@ -259,38 +259,21 @@ from ansible_collections.community.general.plugins.module_utils._filelock import
|
|||
display = Display()
|
||||
|
||||
|
||||
# backhacked check_output with input for python 2.7
|
||||
# http://stackoverflow.com/questions/10103551/passing-data-to-subprocess-check-output
|
||||
# note: contains special logic for calling 'pass', so not a drop-in replacement for check_output
|
||||
def check_output2(*popenargs, **kwargs):
|
||||
if "stdout" in kwargs:
|
||||
raise ValueError("stdout argument not allowed, it will be overridden.")
|
||||
if "stderr" in kwargs:
|
||||
raise ValueError("stderr argument not allowed, it will be overridden.")
|
||||
if "input" in kwargs:
|
||||
if "stdin" in kwargs:
|
||||
raise ValueError("stdin and input arguments may not both be used.")
|
||||
b_inputdata = to_bytes(kwargs["input"], errors="surrogate_or_strict")
|
||||
del kwargs["input"]
|
||||
kwargs["stdin"] = subprocess.PIPE
|
||||
else:
|
||||
b_inputdata = None
|
||||
process = subprocess.Popen(*popenargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
|
||||
try:
|
||||
b_out, b_err = process.communicate(b_inputdata)
|
||||
except Exception:
|
||||
process.kill()
|
||||
process.wait()
|
||||
raise
|
||||
retcode = process.poll()
|
||||
def run_backend_cmd(cmd, *, input=None, env=None):
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
check=False,
|
||||
capture_output=True,
|
||||
input=to_bytes(input, errors="surrogate_or_strict") if input else None,
|
||||
env=env,
|
||||
)
|
||||
b_out, b_err = result.stdout, result.stderr
|
||||
retcode = result.returncode
|
||||
if retcode == 0 and (
|
||||
b"encryption failed: Unusable public key" in b_out or b"encryption failed: Unusable public key" in b_err
|
||||
):
|
||||
retcode = 78 # os.EX_CONFIG
|
||||
if retcode != 0:
|
||||
cmd = kwargs.get("args")
|
||||
if cmd is None:
|
||||
cmd = popenargs[0]
|
||||
raise subprocess.CalledProcessError(retcode, cmd, to_native(b_out + b_err, errors="surrogate_or_strict"))
|
||||
return b_out
|
||||
|
||||
|
|
@ -304,7 +287,7 @@ class LookupModule(LookupBase):
|
|||
if self.realpass is None:
|
||||
try:
|
||||
passoutput = to_text(
|
||||
check_output2([self.pass_cmd, "--version"], env=self.env), errors="surrogate_or_strict"
|
||||
run_backend_cmd([self.pass_cmd, "--version"], env=self.env), errors="surrogate_or_strict"
|
||||
)
|
||||
self.realpass = "pass: the standard unix password manager" in passoutput
|
||||
except subprocess.CalledProcessError as e:
|
||||
|
|
@ -349,7 +332,7 @@ class LookupModule(LookupBase):
|
|||
|
||||
# Collect pass environment variables from the plugin's parameters.
|
||||
self.env = os.environ.copy()
|
||||
self.env["LANGUAGE"] = "C" # make sure to get errors in English as required by check_output2
|
||||
self.env["LANGUAGE"] = "C" # make sure to get errors in English as required by run_backend_cmd
|
||||
|
||||
if self.backend == "gopass":
|
||||
self.env["GOPASS_NO_REMINDER"] = "YES"
|
||||
|
|
@ -371,7 +354,7 @@ class LookupModule(LookupBase):
|
|||
def check_pass(self):
|
||||
try:
|
||||
self.passoutput = to_text(
|
||||
check_output2([self.pass_cmd, "show"] + [self.passname], env=self.env), errors="surrogate_or_strict"
|
||||
run_backend_cmd([self.pass_cmd, "show"] + [self.passname], env=self.env), errors="surrogate_or_strict"
|
||||
).splitlines()
|
||||
self.password = self.passoutput[0]
|
||||
self.passdict = {}
|
||||
|
|
@ -456,7 +439,7 @@ class LookupModule(LookupBase):
|
|||
msg += f"lookup_pass: old password was {self.password} (Updated on {datetime})\n"
|
||||
|
||||
try:
|
||||
check_output2([self.pass_cmd, "insert", "-f", "-m", self.passname], input=msg, env=self.env)
|
||||
run_backend_cmd([self.pass_cmd, "insert", "-f", "-m", self.passname], input=msg, env=self.env)
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise AnsibleError(f"exit code {e.returncode} while running {e.cmd}. Error output: {e.output}") from e
|
||||
return newpass
|
||||
|
|
@ -477,7 +460,7 @@ class LookupModule(LookupBase):
|
|||
msg += f"\nlookup_pass: First generated by ansible on {datetime}\n"
|
||||
|
||||
try:
|
||||
check_output2([self.pass_cmd, "insert", "-f", "-m", self.passname], input=msg, env=self.env)
|
||||
run_backend_cmd([self.pass_cmd, "insert", "-f", "-m", self.passname], input=msg, env=self.env)
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise AnsibleError(f"exit code {e.returncode} while running {e.cmd}. Error output: {e.output}") from e
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue