1
0
Fork 0
mirror of https://github.com/ansible-collections/community.general.git synced 2026-04-05 11:46:57 +00:00

Initial commit

This commit is contained in:
Ansible Core Team 2020-03-09 09:11:07 +00:00
commit aebc1b03fd
4861 changed files with 812621 additions and 0 deletions

View file

@ -0,0 +1,238 @@
# Copyright (c) 2018 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
import re
from ansible.module_utils._text import to_text
from ansible.module_utils.common.collections import is_string
from ansible.module_utils.six import iteritems
INVALID_IDENTIFIER_SYMBOLS = r'[^a-zA-Z0-9_]'
IDENTITY_PROPERTIES = ['id', 'version', 'ruleId']
NON_COMPARABLE_PROPERTIES = IDENTITY_PROPERTIES + ['isSystemDefined', 'links', 'token', 'rulePosition']
class HTTPMethod:
GET = 'get'
POST = 'post'
PUT = 'put'
DELETE = 'delete'
class ResponseParams:
SUCCESS = 'success'
STATUS_CODE = 'status_code'
RESPONSE = 'response'
class FtdConfigurationError(Exception):
def __init__(self, msg, obj=None):
super(FtdConfigurationError, self).__init__(msg)
self.msg = msg
self.obj = obj
class FtdServerError(Exception):
def __init__(self, response, code):
super(FtdServerError, self).__init__(response)
self.response = response
self.code = code
class FtdUnexpectedResponse(Exception):
"""The exception to be raised in case of unexpected responses from 3d parties."""
pass
def construct_ansible_facts(response, params):
facts = dict()
if response:
response_body = response['items'] if 'items' in response else response
if params.get('register_as'):
facts[params['register_as']] = response_body
elif type(response_body) is dict and response_body.get('name') and response_body.get('type'):
object_name = re.sub(INVALID_IDENTIFIER_SYMBOLS, '_', response_body['name'].lower())
fact_name = '%s_%s' % (response_body['type'], object_name)
facts[fact_name] = response_body
return facts
def copy_identity_properties(source_obj, dest_obj):
for property_name in IDENTITY_PROPERTIES:
if property_name in source_obj:
dest_obj[property_name] = source_obj[property_name]
return dest_obj
def is_object_ref(d):
"""
Checks if a dictionary is a reference object. The dictionary is considered to be a
reference object when it contains non-empty 'id' and 'type' fields.
:type d: dict
:return: True if passed dictionary is a reference object, otherwise False
"""
has_id = 'id' in d.keys() and d['id']
has_type = 'type' in d.keys() and d['type']
return has_id and has_type
def equal_object_refs(d1, d2):
"""
Checks whether two references point to the same object.
:type d1: dict
:type d2: dict
:return: True if passed references point to the same object, otherwise False
"""
have_equal_ids = d1['id'] == d2['id']
have_equal_types = d1['type'] == d2['type']
return have_equal_ids and have_equal_types
def equal_lists(l1, l2):
"""
Checks whether two lists are equal. The order of elements in the arrays is important.
:type l1: list
:type l2: list
:return: True if passed lists, their elements and order of elements are equal. Otherwise, returns False.
"""
if len(l1) != len(l2):
return False
for v1, v2 in zip(l1, l2):
if not equal_values(v1, v2):
return False
return True
def equal_dicts(d1, d2, compare_by_reference=True):
"""
Checks whether two dictionaries are equal. If `compare_by_reference` is set to True, dictionaries referencing
objects are compared using `equal_object_refs` method. Otherwise, every key and value is checked.
:type d1: dict
:type d2: dict
:param compare_by_reference: if True, dictionaries referencing objects are compared using `equal_object_refs` method
:return: True if passed dicts are equal. Otherwise, returns False.
"""
if compare_by_reference and is_object_ref(d1) and is_object_ref(d2):
return equal_object_refs(d1, d2)
if len(d1) != len(d2):
return False
for key, v1 in d1.items():
if key not in d2:
return False
v2 = d2[key]
if not equal_values(v1, v2):
return False
return True
def equal_values(v1, v2):
"""
Checks whether types and content of two values are the same. In case of complex objects, the method might be
called recursively.
:param v1: first value
:param v2: second value
:return: True if types and content of passed values are equal. Otherwise, returns False.
:rtype: bool
"""
# string-like values might have same text but different types, so checking them separately
if is_string(v1) and is_string(v2):
return to_text(v1) == to_text(v2)
if type(v1) != type(v2):
return False
value_type = type(v1)
if value_type == list:
return equal_lists(v1, v2)
elif value_type == dict:
return equal_dicts(v1, v2)
else:
return v1 == v2
def equal_objects(d1, d2):
"""
Checks whether two objects are equal. Ignores special object properties (e.g. 'id', 'version') and
properties with None and empty values. In case properties contains a reference to the other object,
only object identities (ids and types) are checked. Also, if an array field contains multiple references
to the same object, duplicates are ignored when comparing objects.
:type d1: dict
:type d2: dict
:return: True if passed objects and their properties are equal. Otherwise, returns False.
"""
def prepare_data_for_comparison(d):
d = dict((k, d[k]) for k in d.keys() if k not in NON_COMPARABLE_PROPERTIES and d[k])
d = delete_ref_duplicates(d)
return d
d1 = prepare_data_for_comparison(d1)
d2 = prepare_data_for_comparison(d2)
return equal_dicts(d1, d2, compare_by_reference=False)
def delete_ref_duplicates(d):
"""
Removes reference duplicates from array fields: if an array contains multiple items and some of
them refer to the same object, only unique references are preserved (duplicates are removed).
:param d: dict with data
:type d: dict
:return: dict without reference duplicates
"""
def delete_ref_duplicates_from_list(refs):
if all(type(i) == dict and is_object_ref(i) for i in refs):
unique_refs = set()
unique_list = list()
for i in refs:
key = (i['id'], i['type'])
if key not in unique_refs:
unique_refs.add(key)
unique_list.append(i)
return list(unique_list)
else:
return refs
if not d:
return d
modified_d = {}
for k, v in iteritems(d):
if type(v) == list:
modified_d[k] = delete_ref_duplicates_from_list(v)
elif type(v) == dict:
modified_d[k] = delete_ref_duplicates(v)
else:
modified_d[k] = v
return modified_d

View file

@ -0,0 +1,565 @@
# Copyright (c) 2018 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
import copy
from functools import partial
from ansible_collections.community.general.plugins.module_utils.network.ftd.common import HTTPMethod, equal_objects, FtdConfigurationError, \
FtdServerError, ResponseParams, copy_identity_properties, FtdUnexpectedResponse
from ansible_collections.community.general.plugins.module_utils.network.ftd.fdm_swagger_client import OperationField, ValidationError
from ansible.module_utils.six import iteritems
DEFAULT_PAGE_SIZE = 10
DEFAULT_OFFSET = 0
UNPROCESSABLE_ENTITY_STATUS = 422
INVALID_UUID_ERROR_MESSAGE = "Validation failed due to an invalid UUID"
DUPLICATE_NAME_ERROR_MESSAGE = "Validation failed due to a duplicate name"
MULTIPLE_DUPLICATES_FOUND_ERROR = (
"Multiple objects matching specified filters are found. "
"Please, define filters more precisely to match one object exactly."
)
DUPLICATE_ERROR = (
"Cannot add a new object. "
"An object with the same name but different parameters already exists."
)
ADD_OPERATION_NOT_SUPPORTED_ERROR = (
"Cannot add a new object while executing an upsert request. "
"Creation of objects with this type is not supported."
)
PATH_PARAMS_FOR_DEFAULT_OBJ = {'objId': 'default'}
class OperationNamePrefix:
ADD = 'add'
EDIT = 'edit'
GET = 'get'
DELETE = 'delete'
UPSERT = 'upsert'
class QueryParams:
FILTER = 'filter'
class ParamName:
QUERY_PARAMS = 'query_params'
PATH_PARAMS = 'path_params'
DATA = 'data'
FILTERS = 'filters'
class CheckModeException(Exception):
pass
class FtdInvalidOperationNameError(Exception):
def __init__(self, operation_name):
super(FtdInvalidOperationNameError, self).__init__(operation_name)
self.operation_name = operation_name
class OperationChecker(object):
@classmethod
def is_add_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is add object operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is add object operation, otherwise False
:rtype: bool
"""
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith(OperationNamePrefix.ADD) and is_post_request(operation_spec)
@classmethod
def is_edit_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is edit object operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is edit object operation, otherwise False
:rtype: bool
"""
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith(OperationNamePrefix.EDIT) and is_put_request(operation_spec)
@classmethod
def is_delete_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is delete object operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is delete object operation, otherwise False
:rtype: bool
"""
# Some endpoints have non-CRUD operations, so checking operation name is required in addition to the HTTP method
return operation_name.startswith(OperationNamePrefix.DELETE) \
and operation_spec[OperationField.METHOD] == HTTPMethod.DELETE
@classmethod
def is_get_list_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is get list of objects operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is get a list of objects operation, otherwise False
:rtype: bool
"""
return operation_spec[OperationField.METHOD] == HTTPMethod.GET \
and operation_spec[OperationField.RETURN_MULTIPLE_ITEMS]
@classmethod
def is_get_operation(cls, operation_name, operation_spec):
"""
Check if operation defined with 'operation_name' is get objects operation according to 'operation_spec'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:return: True if the called operation is get object operation, otherwise False
:rtype: bool
"""
return operation_spec[OperationField.METHOD] == HTTPMethod.GET \
and not operation_spec[OperationField.RETURN_MULTIPLE_ITEMS]
@classmethod
def is_upsert_operation(cls, operation_name):
"""
Check if operation defined with 'operation_name' is upsert objects operation according to 'operation_name'.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:return: True if the called operation is upsert object operation, otherwise False
:rtype: bool
"""
return operation_name.startswith(OperationNamePrefix.UPSERT)
@classmethod
def is_find_by_filter_operation(cls, operation_name, params, operation_spec):
"""
Checks whether the called operation is 'find by filter'. This operation fetches all objects and finds
the matching ones by the given filter. As filtering is done on the client side, this operation should be used
only when selected filters are not implemented on the server side.
:param operation_name: name of the operation being called by the user
:type operation_name: str
:param operation_spec: specification of the operation being called by the user
:type operation_spec: dict
:param params: params - params should contain 'filters'
:return: True if the called operation is find by filter, otherwise False
:rtype: bool
"""
is_get_list = cls.is_get_list_operation(operation_name, operation_spec)
return is_get_list and ParamName.FILTERS in params and params[ParamName.FILTERS]
@classmethod
def is_upsert_operation_supported(cls, operations):
"""
Checks if all operations required for upsert object operation are defined in 'operations'.
:param operations: specification of the operations supported by model
:type operations: dict
:return: True if all criteria required to provide requested called operation are satisfied, otherwise False
:rtype: bool
"""
has_edit_op = next((name for name, spec in iteritems(operations) if cls.is_edit_operation(name, spec)), None)
has_get_list_op = next((name for name, spec in iteritems(operations)
if cls.is_get_list_operation(name, spec)), None)
return has_edit_op and has_get_list_op
class BaseConfigurationResource(object):
def __init__(self, conn, check_mode=False):
self._conn = conn
self.config_changed = False
self._operation_spec_cache = {}
self._models_operations_specs_cache = {}
self._check_mode = check_mode
self._operation_checker = OperationChecker
self._system_info = None
def execute_operation(self, op_name, params):
"""
Allow user request execution of simple operations(natively supported by API provider) as well as complex
operations(operations that are implemented as a set of simple operations).
:param op_name: name of the operation being called by the user
:type op_name: str
:param params: definition of the params that operation should be executed with
:type params: dict
:return: Result of the operation being executed
:rtype: dict
"""
if self._operation_checker.is_upsert_operation(op_name):
return self.upsert_object(op_name, params)
else:
return self.crud_operation(op_name, params)
def crud_operation(self, op_name, params):
"""
Allow user request execution of simple operations(natively supported by API provider) only.
:param op_name: name of the operation being called by the user
:type op_name: str
:param params: definition of the params that operation should be executed with
:type params: dict
:return: Result of the operation being executed
:rtype: dict
"""
op_spec = self.get_operation_spec(op_name)
if op_spec is None:
raise FtdInvalidOperationNameError(op_name)
if self._operation_checker.is_add_operation(op_name, op_spec):
resp = self.add_object(op_name, params)
elif self._operation_checker.is_edit_operation(op_name, op_spec):
resp = self.edit_object(op_name, params)
elif self._operation_checker.is_delete_operation(op_name, op_spec):
resp = self.delete_object(op_name, params)
elif self._operation_checker.is_find_by_filter_operation(op_name, params, op_spec):
resp = list(self.get_objects_by_filter(op_name, params))
else:
resp = self.send_general_request(op_name, params)
return resp
def get_operation_spec(self, operation_name):
if operation_name not in self._operation_spec_cache:
self._operation_spec_cache[operation_name] = self._conn.get_operation_spec(operation_name)
return self._operation_spec_cache[operation_name]
def get_operation_specs_by_model_name(self, model_name):
if model_name not in self._models_operations_specs_cache:
model_op_specs = self._conn.get_operation_specs_by_model_name(model_name)
self._models_operations_specs_cache[model_name] = model_op_specs
for op_name, op_spec in iteritems(model_op_specs):
self._operation_spec_cache.setdefault(op_name, op_spec)
return self._models_operations_specs_cache[model_name]
def get_objects_by_filter(self, operation_name, params):
def match_filters(filter_params, obj):
for k, v in iteritems(filter_params):
if k not in obj or obj[k] != v:
return False
return True
dummy, query_params, path_params = _get_user_params(params)
# copy required params to avoid mutation of passed `params` dict
url_params = {ParamName.QUERY_PARAMS: dict(query_params), ParamName.PATH_PARAMS: dict(path_params)}
filters = params.get(ParamName.FILTERS) or {}
if QueryParams.FILTER not in url_params[ParamName.QUERY_PARAMS] and 'name' in filters:
# most endpoints only support filtering by name, so remaining `filters` are applied on returned objects
url_params[ParamName.QUERY_PARAMS][QueryParams.FILTER] = self._stringify_name_filter(filters)
item_generator = iterate_over_pageable_resource(
partial(self.send_general_request, operation_name=operation_name), url_params
)
return (i for i in item_generator if match_filters(filters, i))
def _stringify_name_filter(self, filters):
build_version = self.get_build_version()
if build_version >= '6.4.0':
return "fts~%s" % filters['name']
return "name:%s" % filters['name']
def _fetch_system_info(self):
if not self._system_info:
params = {ParamName.PATH_PARAMS: PATH_PARAMS_FOR_DEFAULT_OBJ}
self._system_info = self.send_general_request('getSystemInformation', params)
return self._system_info
def get_build_version(self):
system_info = self._fetch_system_info()
return system_info['databaseInfo']['buildVersion']
def add_object(self, operation_name, params):
def is_duplicate_name_error(err):
return err.code == UNPROCESSABLE_ENTITY_STATUS and DUPLICATE_NAME_ERROR_MESSAGE in str(err)
try:
return self.send_general_request(operation_name, params)
except FtdServerError as e:
if is_duplicate_name_error(e):
return self._check_equality_with_existing_object(operation_name, params, e)
else:
raise e
def _check_equality_with_existing_object(self, operation_name, params, e):
"""
Looks for an existing object that caused "object duplicate" error and
checks whether it corresponds to the one specified in `params`.
In case a single object is found and it is equal to one we are trying
to create, the existing object is returned.
When the existing object is not equal to the object being created or
several objects are returned, an exception is raised.
"""
model_name = self.get_operation_spec(operation_name)[OperationField.MODEL_NAME]
existing_obj = self._find_object_matching_params(model_name, params)
if existing_obj is not None:
if equal_objects(existing_obj, params[ParamName.DATA]):
return existing_obj
else:
raise FtdConfigurationError(DUPLICATE_ERROR, existing_obj)
raise e
def _find_object_matching_params(self, model_name, params):
get_list_operation = self._find_get_list_operation(model_name)
if not get_list_operation:
return None
data = params[ParamName.DATA]
if not params.get(ParamName.FILTERS):
params[ParamName.FILTERS] = {'name': data['name']}
obj = None
filtered_objs = self.get_objects_by_filter(get_list_operation, params)
for i, obj in enumerate(filtered_objs):
if i > 0:
raise FtdConfigurationError(MULTIPLE_DUPLICATES_FOUND_ERROR)
obj = obj
return obj
def _find_get_list_operation(self, model_name):
operations = self.get_operation_specs_by_model_name(model_name) or {}
return next((
op for op, op_spec in operations.items()
if self._operation_checker.is_get_list_operation(op, op_spec)), None)
def _find_get_operation(self, model_name):
operations = self.get_operation_specs_by_model_name(model_name) or {}
return next((
op for op, op_spec in operations.items()
if self._operation_checker.is_get_operation(op, op_spec)), None)
def delete_object(self, operation_name, params):
def is_invalid_uuid_error(err):
return err.code == UNPROCESSABLE_ENTITY_STATUS and INVALID_UUID_ERROR_MESSAGE in str(err)
try:
return self.send_general_request(operation_name, params)
except FtdServerError as e:
if is_invalid_uuid_error(e):
return {'status': 'Referenced object does not exist'}
else:
raise e
def edit_object(self, operation_name, params):
data, dummy, path_params = _get_user_params(params)
model_name = self.get_operation_spec(operation_name)[OperationField.MODEL_NAME]
get_operation = self._find_get_operation(model_name)
if get_operation:
existing_object = self.send_general_request(get_operation, {ParamName.PATH_PARAMS: path_params})
if not existing_object:
raise FtdConfigurationError('Referenced object does not exist')
elif equal_objects(existing_object, data):
return existing_object
return self.send_general_request(operation_name, params)
def send_general_request(self, operation_name, params):
def stop_if_check_mode():
if self._check_mode:
raise CheckModeException()
self.validate_params(operation_name, params)
stop_if_check_mode()
data, query_params, path_params = _get_user_params(params)
op_spec = self.get_operation_spec(operation_name)
url, method = op_spec[OperationField.URL], op_spec[OperationField.METHOD]
return self._send_request(url, method, data, path_params, query_params)
def _send_request(self, url_path, http_method, body_params=None, path_params=None, query_params=None):
def raise_for_failure(resp):
if not resp[ResponseParams.SUCCESS]:
raise FtdServerError(resp[ResponseParams.RESPONSE], resp[ResponseParams.STATUS_CODE])
response = self._conn.send_request(url_path=url_path, http_method=http_method, body_params=body_params,
path_params=path_params, query_params=query_params)
raise_for_failure(response)
if http_method != HTTPMethod.GET:
self.config_changed = True
return response[ResponseParams.RESPONSE]
def validate_params(self, operation_name, params):
report = {}
op_spec = self.get_operation_spec(operation_name)
data, query_params, path_params = _get_user_params(params)
def validate(validation_method, field_name, user_params):
key = 'Invalid %s provided' % field_name
try:
is_valid, validation_report = validation_method(operation_name, user_params)
if not is_valid:
report[key] = validation_report
except Exception as e:
report[key] = str(e)
return report
validate(self._conn.validate_query_params, ParamName.QUERY_PARAMS, query_params)
validate(self._conn.validate_path_params, ParamName.PATH_PARAMS, path_params)
if is_post_request(op_spec) or is_put_request(op_spec):
validate(self._conn.validate_data, ParamName.DATA, data)
if report:
raise ValidationError(report)
@staticmethod
def _get_operation_name(checker, operations):
return next((op_name for op_name, op_spec in iteritems(operations) if checker(op_name, op_spec)), None)
def _add_upserted_object(self, model_operations, params):
add_op_name = self._get_operation_name(self._operation_checker.is_add_operation, model_operations)
if not add_op_name:
raise FtdConfigurationError(ADD_OPERATION_NOT_SUPPORTED_ERROR)
return self.add_object(add_op_name, params)
def _edit_upserted_object(self, model_operations, existing_object, params):
edit_op_name = self._get_operation_name(self._operation_checker.is_edit_operation, model_operations)
_set_default(params, 'path_params', {})
_set_default(params, 'data', {})
params['path_params']['objId'] = existing_object['id']
copy_identity_properties(existing_object, params['data'])
return self.edit_object(edit_op_name, params)
def upsert_object(self, op_name, params):
"""
Updates an object if it already exists, or tries to create a new one if there is no
such object. If multiple objects match filter criteria, or add operation is not supported,
the exception is raised.
:param op_name: upsert operation name
:type op_name: str
:param params: params that upsert operation should be executed with
:type params: dict
:return: upserted object representation
:rtype: dict
"""
def extract_and_validate_model():
model = op_name[len(OperationNamePrefix.UPSERT):]
if not self._conn.get_model_spec(model):
raise FtdInvalidOperationNameError(op_name)
return model
model_name = extract_and_validate_model()
model_operations = self.get_operation_specs_by_model_name(model_name)
if not self._operation_checker.is_upsert_operation_supported(model_operations):
raise FtdInvalidOperationNameError(op_name)
existing_obj = self._find_object_matching_params(model_name, params)
if existing_obj:
equal_to_existing_obj = equal_objects(existing_obj, params[ParamName.DATA])
return existing_obj if equal_to_existing_obj \
else self._edit_upserted_object(model_operations, existing_obj, params)
else:
return self._add_upserted_object(model_operations, params)
def _set_default(params, field_name, value):
if field_name not in params or params[field_name] is None:
params[field_name] = value
def is_post_request(operation_spec):
return operation_spec[OperationField.METHOD] == HTTPMethod.POST
def is_put_request(operation_spec):
return operation_spec[OperationField.METHOD] == HTTPMethod.PUT
def _get_user_params(params):
return params.get(ParamName.DATA) or {}, params.get(ParamName.QUERY_PARAMS) or {}, params.get(
ParamName.PATH_PARAMS) or {}
def iterate_over_pageable_resource(resource_func, params):
"""
A generator function that iterates over a resource that supports pagination and lazily returns present items
one by one.
:param resource_func: function that receives `params` argument and returns a page of objects
:type resource_func: callable
:param params: initial dictionary of parameters that will be passed to the resource_func.
Should contain `query_params` inside.
:type params: dict
:return: an iterator containing returned items
:rtype: iterator of dict
"""
# creating a copy not to mutate passed dict
params = copy.deepcopy(params)
params[ParamName.QUERY_PARAMS].setdefault('limit', DEFAULT_PAGE_SIZE)
params[ParamName.QUERY_PARAMS].setdefault('offset', DEFAULT_OFFSET)
limit = int(params[ParamName.QUERY_PARAMS]['limit'])
def received_less_items_than_requested(items_in_response, items_expected):
if items_in_response == items_expected:
return False
elif items_in_response < items_expected:
return True
raise FtdUnexpectedResponse(
"Get List of Objects Response from the server contains more objects than requested. "
"There are {0} item(s) in the response while {1} was(ere) requested".format(
items_in_response, items_expected)
)
while True:
result = resource_func(params=params)
for item in result['items']:
yield item
if received_less_items_than_requested(len(result['items']), limit):
break
# creating a copy not to mutate existing dict
params = copy.deepcopy(params)
query_params = params[ParamName.QUERY_PARAMS]
query_params['offset'] = int(query_params['offset']) + limit

View file

@ -0,0 +1,138 @@
# Copyright (c) 2019 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from ansible.module_utils.six.moves.urllib.parse import urlparse
try:
from kick.device2.ftd5500x.actions.ftd5500x import Ftd5500x
from kick.device2.kp.actions import Kp
HAS_KICK = True
except ImportError:
HAS_KICK = False
def assert_kick_is_installed(module):
if not HAS_KICK:
module.fail_json(msg='Firepower-kickstart library is required to run this module. '
'Please, install the library with `pip install firepower-kickstart` '
'command and run the playbook again.')
class FtdModel:
FTD_ASA5506_X = 'Cisco ASA5506-X Threat Defense'
FTD_ASA5508_X = 'Cisco ASA5508-X Threat Defense'
FTD_ASA5516_X = 'Cisco ASA5516-X Threat Defense'
FTD_2110 = 'Cisco Firepower 2110 Threat Defense'
FTD_2120 = 'Cisco Firepower 2120 Threat Defense'
FTD_2130 = 'Cisco Firepower 2130 Threat Defense'
FTD_2140 = 'Cisco Firepower 2140 Threat Defense'
@classmethod
def supported_models(cls):
return [getattr(cls, item) for item in dir(cls) if item.startswith('FTD_')]
class FtdPlatformFactory(object):
@staticmethod
def create(model, module_params):
for cls in AbstractFtdPlatform.__subclasses__():
if cls.supports_ftd_model(model):
return cls(module_params)
raise ValueError("FTD model '%s' is not supported by this module." % model)
class AbstractFtdPlatform(object):
PLATFORM_MODELS = []
def install_ftd_image(self, params):
raise NotImplementedError('The method should be overridden in subclass')
@classmethod
def supports_ftd_model(cls, model):
return model in cls.PLATFORM_MODELS
@staticmethod
def parse_rommon_file_location(rommon_file_location):
rommon_url = urlparse(rommon_file_location)
if rommon_url.scheme != 'tftp':
raise ValueError('The ROMMON image must be downloaded from TFTP server, other protocols are not supported.')
return rommon_url.netloc, rommon_url.path
class Ftd2100Platform(AbstractFtdPlatform):
PLATFORM_MODELS = [FtdModel.FTD_2110, FtdModel.FTD_2120, FtdModel.FTD_2130, FtdModel.FTD_2140]
def __init__(self, params):
self._ftd = Kp(hostname=params["device_hostname"],
login_username=params["device_username"],
login_password=params["device_password"],
sudo_password=params.get("device_sudo_password") or params["device_password"])
def install_ftd_image(self, params):
line = self._ftd.ssh_console(ip=params["console_ip"],
port=params["console_port"],
username=params["console_username"],
password=params["console_password"])
try:
rommon_server, rommon_path = self.parse_rommon_file_location(params["rommon_file_location"])
line.baseline_fp2k_ftd(tftp_server=rommon_server,
rommon_file=rommon_path,
uut_hostname=params["device_hostname"],
uut_username=params["device_username"],
uut_password=params.get("device_new_password") or params["device_password"],
uut_ip=params["device_ip"],
uut_netmask=params["device_netmask"],
uut_gateway=params["device_gateway"],
dns_servers=params["dns_server"],
search_domains=params["search_domains"],
fxos_url=params["image_file_location"],
ftd_version=params["image_version"])
finally:
line.disconnect()
class FtdAsa5500xPlatform(AbstractFtdPlatform):
PLATFORM_MODELS = [FtdModel.FTD_ASA5506_X, FtdModel.FTD_ASA5508_X, FtdModel.FTD_ASA5516_X]
def __init__(self, params):
self._ftd = Ftd5500x(hostname=params["device_hostname"],
login_password=params["device_password"],
sudo_password=params.get("device_sudo_password") or params["device_password"])
def install_ftd_image(self, params):
line = self._ftd.ssh_console(ip=params["console_ip"],
port=params["console_port"],
username=params["console_username"],
password=params["console_password"])
try:
rommon_server, rommon_path = self.parse_rommon_file_location(params["rommon_file_location"])
line.rommon_to_new_image(rommon_tftp_server=rommon_server,
rommon_image=rommon_path,
pkg_image=params["image_file_location"],
uut_ip=params["device_ip"],
uut_netmask=params["device_netmask"],
uut_gateway=params["device_gateway"],
dns_server=params["dns_server"],
search_domains=params["search_domains"],
hostname=params["device_hostname"])
finally:
line.disconnect()

View file

@ -0,0 +1,638 @@
# Copyright (c) 2018 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from ansible_collections.community.general.plugins.module_utils.network.ftd.common import HTTPMethod
from ansible.module_utils.six import integer_types, string_types, iteritems
FILE_MODEL_NAME = '_File'
SUCCESS_RESPONSE_CODE = '200'
DELETE_PREFIX = 'delete'
class OperationField:
URL = 'url'
METHOD = 'method'
PARAMETERS = 'parameters'
MODEL_NAME = 'modelName'
DESCRIPTION = 'description'
RETURN_MULTIPLE_ITEMS = 'returnMultipleItems'
TAGS = "tags"
class SpecProp:
DEFINITIONS = 'definitions'
OPERATIONS = 'operations'
MODELS = 'models'
MODEL_OPERATIONS = 'model_operations'
class PropName:
ENUM = 'enum'
TYPE = 'type'
REQUIRED = 'required'
INVALID_TYPE = 'invalid_type'
REF = '$ref'
ALL_OF = 'allOf'
BASE_PATH = 'basePath'
PATHS = 'paths'
OPERATION_ID = 'operationId'
SCHEMA = 'schema'
ITEMS = 'items'
PROPERTIES = 'properties'
RESPONSES = 'responses'
NAME = 'name'
DESCRIPTION = 'description'
class PropType:
STRING = 'string'
BOOLEAN = 'boolean'
INTEGER = 'integer'
NUMBER = 'number'
OBJECT = 'object'
ARRAY = 'array'
FILE = 'file'
class OperationParams:
PATH = 'path'
QUERY = 'query'
class QueryParams:
FILTER = 'filter'
class PathParams:
OBJ_ID = 'objId'
def _get_model_name_from_url(schema_ref):
path = schema_ref.split('/')
return path[len(path) - 1]
class IllegalArgumentException(ValueError):
"""
Exception raised when the function parameters:
- not all passed
- empty string
- wrong type
"""
pass
class ValidationError(ValueError):
pass
class FdmSwaggerParser:
_definitions = None
_base_path = None
def parse_spec(self, spec, docs=None):
"""
This method simplifies a swagger format, resolves a model name for each operation, and adds documentation for
each operation and model if it is provided.
:param spec: An API specification in the swagger format, see
<https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md>
:type spec: dict
:param spec: A documentation map containing descriptions for models, operations and operation parameters.
:type docs: dict
:rtype: dict
:return:
Ex.
The models field contains model definition from swagger see
<#https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#definitions>
{
'models':{
'model_name':{...},
...
},
'operations':{
'operation_name':{
'method': 'get', #post, put, delete
'url': '/api/fdm/v2/object/networks', #url already contains a value from `basePath`
'modelName': 'NetworkObject', # it is a link to the model from 'models'
# None - for a delete operation or we don't have information
# '_File' - if an endpoint works with files
'returnMultipleItems': False, # shows if the operation returns a single item or an item list
'parameters': {
'path':{
'param_name':{
'type': 'string'#integer, boolean, number
'required' True #False
}
...
},
'query':{
'param_name':{
'type': 'string'#integer, boolean, number
'required' True #False
}
...
}
}
},
...
},
'model_operations':{
'model_name':{ # a list of operations available for the current model
'operation_name':{
... # the same as in the operations section
},
...
},
...
}
}
"""
self._definitions = spec[SpecProp.DEFINITIONS]
self._base_path = spec[PropName.BASE_PATH]
operations = self._get_operations(spec)
if docs:
operations = self._enrich_operations_with_docs(operations, docs)
self._definitions = self._enrich_definitions_with_docs(self._definitions, docs)
return {
SpecProp.MODELS: self._definitions,
SpecProp.OPERATIONS: operations,
SpecProp.MODEL_OPERATIONS: self._get_model_operations(operations)
}
@property
def base_path(self):
return self._base_path
def _get_model_operations(self, operations):
model_operations = {}
for operations_name, params in iteritems(operations):
model_name = params[OperationField.MODEL_NAME]
model_operations.setdefault(model_name, {})[operations_name] = params
return model_operations
def _get_operations(self, spec):
paths_dict = spec[PropName.PATHS]
operations_dict = {}
for url, operation_params in iteritems(paths_dict):
for method, params in iteritems(operation_params):
operation = {
OperationField.METHOD: method,
OperationField.URL: self._base_path + url,
OperationField.MODEL_NAME: self._get_model_name(method, params),
OperationField.RETURN_MULTIPLE_ITEMS: self._return_multiple_items(params),
OperationField.TAGS: params.get(OperationField.TAGS, [])
}
if OperationField.PARAMETERS in params:
operation[OperationField.PARAMETERS] = self._get_rest_params(params[OperationField.PARAMETERS])
operation_id = params[PropName.OPERATION_ID]
operations_dict[operation_id] = operation
return operations_dict
def _enrich_operations_with_docs(self, operations, docs):
def get_operation_docs(op):
op_url = op[OperationField.URL][len(self._base_path):]
return docs[PropName.PATHS].get(op_url, {}).get(op[OperationField.METHOD], {})
for operation in operations.values():
operation_docs = get_operation_docs(operation)
operation[OperationField.DESCRIPTION] = operation_docs.get(PropName.DESCRIPTION, '')
if OperationField.PARAMETERS in operation:
param_descriptions = dict((
(p[PropName.NAME], p[PropName.DESCRIPTION])
for p in operation_docs.get(OperationField.PARAMETERS, {})
))
for param_name, params_spec in operation[OperationField.PARAMETERS][OperationParams.PATH].items():
params_spec[OperationField.DESCRIPTION] = param_descriptions.get(param_name, '')
for param_name, params_spec in operation[OperationField.PARAMETERS][OperationParams.QUERY].items():
params_spec[OperationField.DESCRIPTION] = param_descriptions.get(param_name, '')
return operations
def _enrich_definitions_with_docs(self, definitions, docs):
for model_name, model_def in definitions.items():
model_docs = docs[SpecProp.DEFINITIONS].get(model_name, {})
model_def[PropName.DESCRIPTION] = model_docs.get(PropName.DESCRIPTION, '')
for prop_name, prop_spec in model_def.get(PropName.PROPERTIES, {}).items():
prop_spec[PropName.DESCRIPTION] = model_docs.get(PropName.PROPERTIES, {}).get(prop_name, '')
prop_spec[PropName.REQUIRED] = prop_name in model_def.get(PropName.REQUIRED, [])
return definitions
def _get_model_name(self, method, params):
if method == HTTPMethod.GET:
return self._get_model_name_from_responses(params)
elif method == HTTPMethod.POST or method == HTTPMethod.PUT:
return self._get_model_name_for_post_put_requests(params)
elif method == HTTPMethod.DELETE:
return self._get_model_name_from_delete_operation(params)
else:
return None
@staticmethod
def _return_multiple_items(op_params):
"""
Defines if the operation returns one item or a list of items.
:param op_params: operation specification
:return: True if the operation returns a list of items, otherwise False
"""
try:
schema = op_params[PropName.RESPONSES][SUCCESS_RESPONSE_CODE][PropName.SCHEMA]
return PropName.ITEMS in schema[PropName.PROPERTIES]
except KeyError:
return False
def _get_model_name_from_delete_operation(self, params):
operation_id = params[PropName.OPERATION_ID]
if operation_id.startswith(DELETE_PREFIX):
model_name = operation_id[len(DELETE_PREFIX):]
if model_name in self._definitions:
return model_name
return None
def _get_model_name_for_post_put_requests(self, params):
model_name = None
if OperationField.PARAMETERS in params:
body_param_dict = self._get_body_param_from_parameters(params[OperationField.PARAMETERS])
if body_param_dict:
schema_ref = body_param_dict[PropName.SCHEMA][PropName.REF]
model_name = self._get_model_name_byschema_ref(schema_ref)
if model_name is None:
model_name = self._get_model_name_from_responses(params)
return model_name
@staticmethod
def _get_body_param_from_parameters(params):
return next((param for param in params if param['in'] == 'body'), None)
def _get_model_name_from_responses(self, params):
responses = params[PropName.RESPONSES]
if SUCCESS_RESPONSE_CODE in responses:
response = responses[SUCCESS_RESPONSE_CODE][PropName.SCHEMA]
if PropName.REF in response:
return self._get_model_name_byschema_ref(response[PropName.REF])
elif PropName.PROPERTIES in response:
ref = response[PropName.PROPERTIES][PropName.ITEMS][PropName.ITEMS][PropName.REF]
return self._get_model_name_byschema_ref(ref)
elif (PropName.TYPE in response) and response[PropName.TYPE] == PropType.FILE:
return FILE_MODEL_NAME
else:
return None
def _get_rest_params(self, params):
path = {}
query = {}
operation_param = {
OperationParams.PATH: path,
OperationParams.QUERY: query
}
for param in params:
in_param = param['in']
if in_param == OperationParams.QUERY:
query[param[PropName.NAME]] = self._simplify_param_def(param)
elif in_param == OperationParams.PATH:
path[param[PropName.NAME]] = self._simplify_param_def(param)
return operation_param
@staticmethod
def _simplify_param_def(param):
return {
PropName.TYPE: param[PropName.TYPE],
PropName.REQUIRED: param[PropName.REQUIRED]
}
def _get_model_name_byschema_ref(self, schema_ref):
model_name = _get_model_name_from_url(schema_ref)
model_def = self._definitions[model_name]
if PropName.ALL_OF in model_def:
return self._get_model_name_byschema_ref(model_def[PropName.ALL_OF][0][PropName.REF])
else:
return model_name
class FdmSwaggerValidator:
def __init__(self, spec):
"""
:param spec: dict
data from FdmSwaggerParser().parse_spec()
"""
self._operations = spec[SpecProp.OPERATIONS]
self._models = spec[SpecProp.MODELS]
def validate_data(self, operation_name, data=None):
"""
Validate data for the post|put requests
:param operation_name: string
The value must be non empty string.
The operation name is used to get a model specification
:param data: dict
The value must be in the format that the model(from operation) expects
:rtype: (bool, string|dict)
:return:
(True, None) - if data valid
Invalid:
(False, {
'required': [ #list of the fields that are required but were not present in the data
'field_name',
'patent.field_name',# when the nested field is omitted
'patent.list[2].field_name' # if data is array and one of the field is omitted
],
'invalid_type':[ #list of the fields with invalid data
{
'path': 'objId', #field name or path to the field. Ex. objects[3].id, parent.name
'expected_type': 'string',# expected type. Ex. 'object', 'array', 'string', 'integer',
# 'boolean', 'number'
'actually_value': 1 # the value that user passed
}
]
})
:raises IllegalArgumentException
'The operation_name parameter must be a non-empty string' if operation_name is not valid
'The data parameter must be a dict' if data neither dict or None
'{operation_name} operation does not support' if the spec does not contain the operation
"""
if data is None:
data = {}
self._check_validate_data_params(data, operation_name)
operation = self._operations[operation_name]
model = self._models[operation[OperationField.MODEL_NAME]]
status = self._init_report()
self._validate_object(status, model, data, '')
if len(status[PropName.REQUIRED]) > 0 or len(status[PropName.INVALID_TYPE]) > 0:
return False, self._delete_empty_field_from_report(status)
return True, None
def _check_validate_data_params(self, data, operation_name):
if not operation_name or not isinstance(operation_name, string_types):
raise IllegalArgumentException("The operation_name parameter must be a non-empty string")
if not isinstance(data, dict):
raise IllegalArgumentException("The data parameter must be a dict")
if operation_name not in self._operations:
raise IllegalArgumentException("{0} operation does not support".format(operation_name))
def validate_query_params(self, operation_name, params):
"""
Validate params for the get requests. Use this method for validating the query part of the url.
:param operation_name: string
The value must be non empty string.
The operation name is used to get a params specification
:param params: dict
should be in the format that the specification(from operation) expects
Ex.
{
'objId': "string_value",
'p_integer': 1,
'p_boolean': True,
'p_number': 2.3
}
:rtype:(Boolean, msg)
:return:
(True, None) - if params valid
Invalid:
(False, {
'required': [ #list of the fields that are required but are not present in the params
'field_name'
],
'invalid_type':[ #list of the fields with invalid data and expected type of the params
{
'path': 'objId', #field name
'expected_type': 'string',#expected type. Ex. 'string', 'integer', 'boolean', 'number'
'actually_value': 1 # the value that user passed
}
]
})
:raises IllegalArgumentException
'The operation_name parameter must be a non-empty string' if operation_name is not valid
'The params parameter must be a dict' if params neither dict or None
'{operation_name} operation does not support' if the spec does not contain the operation
"""
return self._validate_url_params(operation_name, params, resource=OperationParams.QUERY)
def validate_path_params(self, operation_name, params):
"""
Validate params for the get requests. Use this method for validating the path part of the url.
:param operation_name: string
The value must be non empty string.
The operation name is used to get a params specification
:param params: dict
should be in the format that the specification(from operation) expects
Ex.
{
'objId': "string_value",
'p_integer': 1,
'p_boolean': True,
'p_number': 2.3
}
:rtype:(Boolean, msg)
:return:
(True, None) - if params valid
Invalid:
(False, {
'required': [ #list of the fields that are required but are not present in the params
'field_name'
],
'invalid_type':[ #list of the fields with invalid data and expected type of the params
{
'path': 'objId', #field name
'expected_type': 'string',#expected type. Ex. 'string', 'integer', 'boolean', 'number'
'actually_value': 1 # the value that user passed
}
]
})
:raises IllegalArgumentException
'The operation_name parameter must be a non-empty string' if operation_name is not valid
'The params parameter must be a dict' if params neither dict or None
'{operation_name} operation does not support' if the spec does not contain the operation
"""
return self._validate_url_params(operation_name, params, resource=OperationParams.PATH)
def _validate_url_params(self, operation, params, resource):
if params is None:
params = {}
self._check_validate_url_params(operation, params)
operation = self._operations[operation]
if OperationField.PARAMETERS in operation and resource in operation[OperationField.PARAMETERS]:
spec = operation[OperationField.PARAMETERS][resource]
status = self._init_report()
self._check_url_params(status, spec, params)
if len(status[PropName.REQUIRED]) > 0 or len(status[PropName.INVALID_TYPE]) > 0:
return False, self._delete_empty_field_from_report(status)
return True, None
else:
return True, None
def _check_validate_url_params(self, operation, params):
if not operation or not isinstance(operation, string_types):
raise IllegalArgumentException("The operation_name parameter must be a non-empty string")
if not isinstance(params, dict):
raise IllegalArgumentException("The params parameter must be a dict")
if operation not in self._operations:
raise IllegalArgumentException("{0} operation does not support".format(operation))
def _check_url_params(self, status, spec, params):
for prop_name in spec.keys():
prop = spec[prop_name]
if prop[PropName.REQUIRED] and prop_name not in params:
status[PropName.REQUIRED].append(prop_name)
continue
if prop_name in params:
expected_type = prop[PropName.TYPE]
value = params[prop_name]
if prop_name in params and not self._is_correct_simple_types(expected_type, value, allow_null=False):
self._add_invalid_type_report(status, '', prop_name, expected_type, value)
def _validate_object(self, status, model, data, path):
if self._is_enum(model):
self._check_enum(status, model, data, path)
elif self._is_object(model):
self._check_object(status, model, data, path)
def _is_enum(self, model):
return self._is_string_type(model) and PropName.ENUM in model
def _check_enum(self, status, model, value, path):
if value is not None and value not in model[PropName.ENUM]:
self._add_invalid_type_report(status, path, '', PropName.ENUM, value)
def _add_invalid_type_report(self, status, path, prop_name, expected_type, actually_value):
status[PropName.INVALID_TYPE].append({
'path': self._create_path_to_field(path, prop_name),
'expected_type': expected_type,
'actually_value': actually_value
})
def _check_object(self, status, model, data, path):
if data is None:
return
if not isinstance(data, dict):
self._add_invalid_type_report(status, path, '', PropType.OBJECT, data)
return None
if PropName.REQUIRED in model:
self._check_required_fields(status, model[PropName.REQUIRED], data, path)
model_properties = model[PropName.PROPERTIES]
for prop in model_properties.keys():
if prop in data:
model_prop_val = model_properties[prop]
expected_type = model_prop_val[PropName.TYPE]
actually_value = data[prop]
self._check_types(status, actually_value, expected_type, model_prop_val, path, prop)
def _check_types(self, status, actually_value, expected_type, model, path, prop_name):
if expected_type == PropType.OBJECT:
ref_model = self._get_model_by_ref(model)
self._validate_object(status, ref_model, actually_value,
path=self._create_path_to_field(path, prop_name))
elif expected_type == PropType.ARRAY:
self._check_array(status, model, actually_value,
path=self._create_path_to_field(path, prop_name))
elif not self._is_correct_simple_types(expected_type, actually_value):
self._add_invalid_type_report(status, path, prop_name, expected_type, actually_value)
def _get_model_by_ref(self, model_prop_val):
model = _get_model_name_from_url(model_prop_val[PropName.REF])
return self._models[model]
def _check_required_fields(self, status, required_fields, data, path):
missed_required_fields = [self._create_path_to_field(path, field) for field in
required_fields if field not in data.keys() or data[field] is None]
if len(missed_required_fields) > 0:
status[PropName.REQUIRED] += missed_required_fields
def _check_array(self, status, model, data, path):
if data is None:
return
elif not isinstance(data, list):
self._add_invalid_type_report(status, path, '', PropType.ARRAY, data)
else:
item_model = model[PropName.ITEMS]
for i, item_data in enumerate(data):
self._check_types(status, item_data, item_model[PropName.TYPE], item_model, "{0}[{1}]".format(path, i),
'')
@staticmethod
def _is_correct_simple_types(expected_type, value, allow_null=True):
def is_numeric_string(s):
try:
float(s)
return True
except ValueError:
return False
if value is None and allow_null:
return True
elif expected_type == PropType.STRING:
return isinstance(value, string_types)
elif expected_type == PropType.BOOLEAN:
return isinstance(value, bool)
elif expected_type == PropType.INTEGER:
is_integer = isinstance(value, integer_types) and not isinstance(value, bool)
is_digit_string = isinstance(value, string_types) and value.isdigit()
return is_integer or is_digit_string
elif expected_type == PropType.NUMBER:
is_number = isinstance(value, (integer_types, float)) and not isinstance(value, bool)
is_numeric_string = isinstance(value, string_types) and is_numeric_string(value)
return is_number or is_numeric_string
return False
@staticmethod
def _is_string_type(model):
return PropName.TYPE in model and model[PropName.TYPE] == PropType.STRING
@staticmethod
def _init_report():
return {
PropName.REQUIRED: [],
PropName.INVALID_TYPE: []
}
@staticmethod
def _delete_empty_field_from_report(status):
if not status[PropName.REQUIRED]:
del status[PropName.REQUIRED]
if not status[PropName.INVALID_TYPE]:
del status[PropName.INVALID_TYPE]
return status
@staticmethod
def _create_path_to_field(path='', field=''):
separator = ''
if path and field:
separator = '.'
return "{0}{1}{2}".format(path, separator, field)
@staticmethod
def _is_object(model):
return PropName.TYPE in model and model[PropName.TYPE] == PropType.OBJECT

View file

@ -0,0 +1,41 @@
# Copyright (c) 2018 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from ansible_collections.community.general.plugins.module_utils.network.ftd.configuration import ParamName, PATH_PARAMS_FOR_DEFAULT_OBJ
class FtdOperations:
"""
Utility class for common operation names
"""
GET_SYSTEM_INFO = 'getSystemInformation'
GET_MANAGEMENT_IP_LIST = 'getManagementIPList'
GET_DNS_SETTING_LIST = 'getDeviceDNSSettingsList'
GET_DNS_SERVER_GROUP = 'getDNSServerGroup'
def get_system_info(resource):
"""
Executes `getSystemInformation` operation and returns information about the system.
:param resource: a BaseConfigurationResource object to connect to the device
:return: a dictionary with system information about the device and its software
"""
path_params = {ParamName.PATH_PARAMS: PATH_PARAMS_FOR_DEFAULT_OBJ}
system_info = resource.execute_operation(FtdOperations.GET_SYSTEM_INFO, path_params)
return system_info