1
0
Fork 0
mirror of https://github.com/containers/ansible-podman-collections.git synced 2026-02-04 07:11:49 +00:00
ansible-podman-collections/plugins/module_utils/podman/podman_api.py
2025-08-24 17:56:50 +03:00

303 lines
10 KiB
Python

# The follwing code is taken from
# https://github.com/msabramo/requests-unixsocket/blob/master/
# requests_unixsocket/adapters.py
from __future__ import absolute_import, division, print_function
import socket
try:
import requests
from requests.adapters import HTTPAdapter
from requests.compat import urlparse, unquote, quote
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
# Try to use requests-unixsocket if available, as it handles compatibility issues
try:
import requests_unixsocket
HAS_REQUESTS_UNIXSOCKET = True
except ImportError:
HAS_REQUESTS_UNIXSOCKET = False
try:
from requests.packages import urllib3
HAS_URLLIB3 = True
except ImportError:
try:
import urllib3
HAS_URLLIB3 = True
except ImportError:
HAS_URLLIB3 = False
try:
import http.client as httplib
except ImportError:
import httplib
import json
__metaclass__ = type
DEFAULT_SCHEME = "http+unix://"
class PodmanAPIError(Exception):
pass
# The following was adapted from some code from docker-py
# https://github.com/docker/docker-py/blob/master/docker/transport/unixconn.py
class UnixHTTPConnection(httplib.HTTPConnection, object):
def __init__(self, unix_socket_url, timeout=60):
"""Create an HTTP connection to a unix domain socket
:param unix_socket_url: A URL with a scheme of 'http+unix' and the
netloc is a percent-encoded path to a unix domain socket. E.g.:
'http+unix://%2Ftmp%2Fprofilesvc.sock/status/pid'
"""
super(UnixHTTPConnection, self).__init__("localhost", timeout=timeout)
self.unix_socket_url = unix_socket_url
self.timeout = timeout
self.sock = None
def __del__(self): # base class does not have d'tor
if self.sock:
self.sock.close()
def connect(self):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
socket_path = unquote(urlparse(self.unix_socket_url).netloc)
sock.connect(socket_path)
self.sock = sock
if HAS_URLLIB3:
class UnixHTTPConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
def __init__(self, socket_path, timeout=60):
super(UnixHTTPConnectionPool, self).__init__("localhost", timeout=timeout)
self.socket_path = socket_path
self.timeout = timeout
def _new_conn(self):
return UnixHTTPConnection(self.socket_path, self.timeout)
if HAS_REQUESTS:
class UnixAdapter(HTTPAdapter):
def __init__(self, *args, timeout=60, pool_connections=25, **kwargs):
super(UnixAdapter, self).__init__(*args, **kwargs)
self.timeout = timeout
self.pools = urllib3._collections.RecentlyUsedContainer(pool_connections, dispose_func=lambda p: p.close())
def get_connection_with_tls_context(self, request, verify, proxies=None, cert=None):
# Fix for requests 2.32.2+:
# https://github.com/psf/requests/commit/c98e4d133ef29c46a9b68cd783087218a8075e05
return self.get_connection(request.url, proxies)
def get_connection(self, url, proxies=None):
proxies = proxies or {}
proxy = proxies.get(urlparse(url.lower()).scheme)
if proxy:
raise ValueError("%s does not support specifying proxies" % self.__class__.__name__)
with self.pools.lock:
pool = self.pools.get(url)
if pool:
return pool
pool = UnixHTTPConnectionPool(url, self.timeout)
self.pools[url] = pool
return pool
def request_url(self, request, proxies):
return request.path_url
def close(self):
self.pools.clear()
if HAS_REQUESTS:
class APISession(requests.Session):
def __init__(self, *args, url_scheme=DEFAULT_SCHEME, **kwargs):
super(APISession, self).__init__(*args, **kwargs)
if HAS_REQUESTS_UNIXSOCKET and url_scheme == DEFAULT_SCHEME:
# Use requests-unixsocket which handles newer requests versions
self.mount("http+unix://", requests_unixsocket.adapters.UnixAdapter())
else:
# Fallback to custom implementation
self.mount(url_scheme, UnixAdapter())
# Also mount the scheme without the '+' for new requests versions
if url_scheme == DEFAULT_SCHEME:
self.mount("http+unix://", UnixAdapter())
class PodmanAPIHTTP:
def __init__(self, base_url, scheme=DEFAULT_SCHEME):
self.api_url = "".join((scheme, quote(base_url, safe=""), "/v5.0.0/libpod"))
if scheme == "http://":
self.api_url = "".join((scheme, base_url, "/v5.0.0/libpod"))
self.session = APISession()
def request(self, method, url, **kwargs):
return self.session.request(method=method, url=self.api_url + url, **kwargs)
def get(self, url, **kwargs):
kwargs.setdefault("allow_redirects", True)
return self.request("get", url, **kwargs)
def head(self, url, **kwargs):
kwargs.setdefault("allow_redirects", False)
return self.request("head", url, **kwargs)
def post(self, url, data=None, json=None, **kwargs):
return self.request("post", url, data=data, json=json, **kwargs)
def patch(self, url, data=None, **kwargs):
return self.request("patch", url, data=data, **kwargs)
def put(self, url, data=None, **kwargs):
return self.request("put", url, data=data, **kwargs)
def delete(self, url, **kwargs):
return self.request("delete", url, **kwargs)
def options(self, url, **kwargs):
kwargs.setdefault("allow_redirects", True)
return self.request("options", url, **kwargs)
class PodmanAPIClient:
def __init__(self, base_url):
if not HAS_REQUESTS:
raise PodmanAPIError("requests package is required for podman API")
socket_opt = urlparse(base_url)
if socket_opt.scheme not in ("unix", "http"):
raise PodmanAPIError("Scheme %s is not supported! Use %s" % (socket_opt.scheme, DEFAULT_SCHEME))
if socket_opt.scheme == "http":
self.api = PodmanAPIHTTP(socket_opt.netloc, "http://")
else:
self.api = PodmanAPIHTTP(socket_opt.path)
self.containers = PodmanAPIContainers(self.api)
self.images = PodmanAPIImages(api=self.api)
def version(self):
response = self.api.get("/version")
return response.json()
class PodmanAPIContainers:
def __init__(self, api):
self.api = api
self.quote = quote
def list(self, all_=None, filters=None, limit=None, size=None, sync=None):
"""List all images for a Podman service."""
query = {}
if all_ is not None:
query["all"] = True
if filters is not None:
query["filters"] = filters
if limit is not None:
query["limit"] = limit
if size is not None:
query["size"] = size
if sync is not None:
query["sync"] = sync
response = self.api.get("/containers/json", params=query)
# observed to return None when no containers
return response.json() or []
def create(self, **container_data):
response = self.api.post(
"/containers/create",
json=container_data,
)
if response.ok:
return response.json()
raise PodmanAPIError("Container %s failed to create! Error: %s" % (container_data.get("name"), response.text))
def get(self, name):
response = self.api.get("/containers/{0}/json".format(self.quote(name)))
data = response.json()
if data.get("response") == 404:
data = {}
# raise Exception("Container %s not found!" % name)
return data
def run(self, **container_data):
_ = self.create(**container_data) # pylint: disable=blacklisted-name
name = container_data.get("name")
_ = self.api.post( # pylint: disable=blacklisted-name
"/containers/{0}/start".format(self.quote(name)),
)
return self.get(name)
def start(self, name):
_ = self.api.post( # pylint: disable=blacklisted-name
"/containers/{0}/start".format(self.quote(name)),
)
return self.get(name)
def stop(self, name):
_ = self.api.post( # pylint: disable=blacklisted-name
"/containers/{0}/stop".format(self.quote(name)),
)
return self.get(name)
def restart(self, name):
_ = self.api.post( # pylint: disable=blacklisted-name
"/containers/{0}/restart".format(self.quote(name)),
)
return self.get(name)
def remove(self, name, force=False):
_ = self.api.delete( # pylint: disable=blacklisted-name
"/containers/{0}".format(self.quote(name)), params={"force": force}
)
return
class PodmanAPIImages:
def __init__(self, api):
self.api = api
self.quote = quote
self.inspect = self.get
def exists(self, name):
response = self.api.get("/images/{0}/exists".format(self.quote(name)))
return response.status_code == 204
def pull(self, reference):
response = self.api.post("/images/pull", params={"reference": reference})
if response.ok:
correct_response = {"stream": "", "text": response.text}
for i in response.text.splitlines():
if '"images"' in i:
correct_response["images"] = json.loads(i)["images"]
if '"id"' in i:
correct_response["id"] = json.loads(i)["id"]
elif '"stream"' in i:
correct_response["stream"] += json.loads(i)["stream"]
elif '"error"' in i:
correct_response["error"] = json.loads(i)["error"]
correct_response["code"] = response.status_code
return correct_response
return {"error": "HTTP %s Error: %s" % (response.json()["message"])}
def get(self, name):
response = self.api.get("/images/{0}/json".format(self.quote(name)))
return response.json()