diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index 16f61156d2..b0db0f7f18 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -65,6 +65,9 @@ files: $callbacks/log_plays.py: {} $callbacks/loganalytics.py: maintainers: zhcli + $callbacks/loganalytics_ingestion.py: + ignore: zhcli + maintainers: pboushy vsh47 wtcline-intc $callbacks/logdna.py: {} $callbacks/logentries.py: {} $callbacks/logstash.py: diff --git a/plugins/callback/loganalytics_ingestion.py b/plugins/callback/loganalytics_ingestion.py new file mode 100644 index 0000000000..69a72fd2e7 --- /dev/null +++ b/plugins/callback/loganalytics_ingestion.py @@ -0,0 +1,340 @@ +#!/usr/bin/env python +# Copyright (c) Ansible project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +DOCUMENTATION = """ +name: loganalytics_ingestion +type: notification +short_description: Posts task results to an Azure Log Analytics workspace using the new Logs Ingestion API +author: + - Wade Cline (@wtcline-intc) + - Sriramoju Vishal Bharath (@vsh47) + - Cyrus Li (@zhcli) +description: + - This callback plugin will post task results in JSON format to an Azure Log Analytics workspace using the new Logs Ingestion API. +version_added: "12.4.0" +requirements: + - The callback plugin has been enabled. + - An Azure Log Analytics workspace has been established. + - A Data Collection Rule (DCR) and custom table are created. +options: + dce_url: + description: URL of the Data Collection Endpoint (DCE) for Azure Logs Ingestion API. + type: str + required: true + env: + - name: ANSIBLE_LOGANALYTICS_DCE_URL + ini: + - section: callback_loganalytics + key: dce_url + dcr_id: + description: Data Collection Rule (DCR) ID for the Azure Log Ingestion API. 
+ type: str + required: true + env: + - name: ANSIBLE_LOGANALYTICS_DCR_ID + ini: + - section: callback_loganalytics + key: dcr_id + disable_attempts: + description: + - When O(disable_on_failure=true), number of plugin failures that must occur before the plugin is disabled. + - This helps prevent outright plugin failure from a single, transient network issue. + type: int + default: 3 + env: + - name: ANSIBLE_LOGANALYTICS_DISABLE_ATTEMPTS + ini: + - section: callback_loganalytics + key: disable_attempts + disable_on_failure: + description: Stop trying to send data on plugin failure. + type: bool + default: true + env: + - name: ANSIBLE_LOGANALYTICS_DISABLE_ON_FAILURE + ini: + - section: callback_loganalytics + key: disable_on_failure + client_id: + description: Client ID of the Azure App registration for OAuth2 authentication ("Modern Authentication"). + type: str + required: true + env: + - name: ANSIBLE_LOGANALYTICS_CLIENT_ID + ini: + - section: callback_loganalytics + key: client_id + client_secret: + description: Client Secret of the Azure App registration. + type: str + required: true + env: + - name: ANSIBLE_LOGANALYTICS_CLIENT_SECRET + ini: + - section: callback_loganalytics + key: client_secret + include_content: + description: Send the content to the Azure Log Analytics workspace. + type: bool + default: false + env: + - name: ANSIBLE_LOGANALYTICS_INCLUDE_CONTENT + ini: + - section: callback_loganalytics + key: include_content + include_task_args: + description: Send the task args to the Azure Log Analytics workspace. + type: bool + default: false + env: + - name: ANSIBLE_LOGANALYTICS_INCLUDE_TASK_ARGS + ini: + - section: callback_loganalytics + key: include_task_args + stream_name: + description: The name of the stream used to send the logs to the Azure Log Analytics workspace. 
+    type: str
+    required: true
+    env:
+      - name: ANSIBLE_LOGANALYTICS_STREAM_NAME
+    ini:
+      - section: callback_loganalytics
+        key: stream_name
+  tenant_id:
+    description: Tenant ID for the Azure Active Directory.
+    type: str
+    required: true
+    env:
+      - name: ANSIBLE_LOGANALYTICS_TENANT_ID
+    ini:
+      - section: callback_loganalytics
+        key: tenant_id
+  timeout:
+    description: Timeout for the HTTP requests to the Azure Log Analytics API.
+    type: int
+    default: 2
+    env:
+      - name: ANSIBLE_LOGANALYTICS_TIMEOUT
+    ini:
+      - section: callback_loganalytics
+        key: timeout
+seealso:
+  - name: Logs Ingestion API
+    description: Overview of Logs Ingestion API in Azure Monitor
+    link: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
+notes:
+  - Triple verbosity logging (C(-vvv)) can be used to generate JSON sample data for creating the table schema in Azure Log Analytics.
+    Search for the string C(Event Data:) in the output in order to locate the data sample.
+"""
+
+EXAMPLES = """
+examples: |
+  Enable the plugin in ansible.cfg:
+    [defaults]
+    callbacks_enabled = community.general.loganalytics_ingestion
+  Set the environment variables:
+    export ANSIBLE_LOGANALYTICS_DCE_URL=https://my-dce.ingest.monitor.azure.com
+    export ANSIBLE_LOGANALYTICS_DCR_ID=dcr-xxxxxx
+    export ANSIBLE_LOGANALYTICS_CLIENT_ID=xxxxxxxx
+    export ANSIBLE_LOGANALYTICS_CLIENT_SECRET=xxxxxxxx
+    export ANSIBLE_LOGANALYTICS_TENANT_ID=xxxxxxxx
+    export ANSIBLE_LOGANALYTICS_STREAM_NAME=Custom-MyTable
+"""
+
+import getpass
+import json
+import socket
+import uuid
+from datetime import datetime, timedelta, timezone
+from os.path import basename
+from urllib.parse import urlencode
+
+from ansible.module_utils.urls import open_url
+from ansible.plugins.callback import CallbackBase
+from ansible.utils.display import Display
+
+display = Display()
+
+
+class AzureLogAnalyticsIngestionSource:
+    def __init__(
+        self,
+        dce_url,
+        dcr_id,
+        disable_attempts,
+        disable_on_failure,
+        client_id,
+        
client_secret, + tenant_id, + stream_name, + include_task_args, + include_content, + timeout, + fqcn, + ): + self.dce_url = dce_url + self.dcr_id = dcr_id + self.disabled = False + self.disable_attempts = disable_attempts + self.disable_on_failure = disable_on_failure + self.client_id = client_id + self.client_secret = client_secret + self.failures = 0 + self.tenant_id = tenant_id + self.stream_name = stream_name + self.include_task_args = include_task_args + self.include_content = include_content + self.token_expiration_time = None + self.session = str(uuid.uuid4()) + self.host = socket.gethostname() + self.user = getpass.getuser() + self.timeout = timeout + self.fqcn = fqcn + + self.bearer_token = self.get_bearer_token() + + # OAuth2 authentication method to get a Bearer token + # This replaces the shared_key authentication mechanism + def get_bearer_token(self): + url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token" + headers = {"Content-Type": "application/x-www-form-urlencoded"} + data = urlencode( + { + "grant_type": "client_credentials", + "client_id": self.client_id, + "client_secret": self.client_secret, + # The scope value comes from https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview#headers + # and https://learn.microsoft.com/en-us/entra/identity-platform/scopes-oidc#the-default-scope + "scope": "https://monitor.azure.com/.default", + } + ) + response = open_url(url, data=data, force=True, headers=headers, method="POST", timeout=self.timeout) + j = json.loads(response.read().decode("utf-8")) + self.token_expiration_time = datetime.now() + timedelta(seconds=j.get("expires_in")) + return j.get("access_token") + + def is_token_valid(self): + return datetime.now() + timedelta(seconds=10) < self.token_expiration_time + + # Method to send event data to the Azure Logs Ingestion API + # This replaces the legacy API call and now uses the Logs Ingestion API endpoint + def send_event(self, event_data): + 
if not self.is_token_valid(): + self.bearer_token = self.get_bearer_token() + ingestion_url = ( + f"{self.dce_url}/dataCollectionRules/{self.dcr_id}/streams/{self.stream_name}?api-version=2023-01-01" + ) + headers = {"Authorization": f"Bearer {self.bearer_token}", "Content-Type": "application/json"} + open_url(ingestion_url, data=json.dumps(event_data), headers=headers, method="POST", timeout=self.timeout) + + def _rfc1123date(self): + return datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT") + + # This method wraps the private method with the appropriate error handling. + def send_to_loganalytics(self, playbook_name, result, state): + if self.disabled: + return + try: + self._send_to_loganalytics(playbook_name, result, state) + except Exception as e: + display.warning(f"{self.fqcn} callback plugin failure: {e}.") + if self.disable_on_failure: + self.failures += 1 + if self.failures >= self.disable_attempts: + display.warning( + f"{self.fqcn} callback plugin failures exceed maximum of '{self.disable_attempts}'! Disabling plugin!" 
+ ) + self.disabled = True + else: + display.v(f"{self.fqcn} callback plugin failure {self.failures}/{self.disable_attempts}") + + def _send_to_loganalytics(self, playbook_name, result, state): + ansible_role = str(result._task._role) if result._task._role else None + + # Include/Exclude task args + if not self.include_task_args: + result._task_fields.pop("args", None) + + # Include/Exclude content + if not self.include_content: + result._result.pop("content", None) + + # Build the event data + event_data = [ + { + "TimeGenerated": self._rfc1123date(), + "Host": result._host.name, + "User": self.user, + "Playbook": playbook_name, + "Role": ansible_role, + "TaskName": result._task.get_name(), + "Task": result._task_fields, + "Action": result._task_fields["action"], + "State": state, + "Result": result._result, + "Session": self.session, + } + ] + + # The data displayed here can be used as a sample file in order to create the table's schema. + display.vvv(f"Event Data: {json.dumps(event_data)}") + + self.send_event(event_data) + + +class CallbackModule(CallbackBase): + CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = "notification" + CALLBACK_NAME = "loganalytics_ingestion" + CALLBACK_NEEDS_ENABLED = True + + def __init__(self, display=None): + super().__init__(display=display) + self.start_datetimes = {} + self.playbook_name = None + self.azure_loganalytics = None + self.fqcn = f"community.general.{self.CALLBACK_NAME}" + + def set_options(self, task_keys=None, var_options=None, direct=None): + super().set_options(task_keys=task_keys, var_options=var_options, direct=direct) + + # Set options for the new Azure Logs Ingestion API configuration + self.client_id = self.get_option("client_id") + self.client_secret = self.get_option("client_secret") + self.dce_url = self.get_option("dce_url") + self.dcr_id = self.get_option("dcr_id") + self.disable_attempts = self.get_option("disable_attempts") + self.disable_on_failure = self.get_option("disable_on_failure") + 
self.include_content = self.get_option("include_content") + self.include_task_args = self.get_option("include_task_args") + self.stream_name = self.get_option("stream_name") + self.tenant_id = self.get_option("tenant_id") + self.timeout = self.get_option("timeout") + + # Initialize the AzureLogAnalyticsIngestionSource with the new settings + self.azure_loganalytics = AzureLogAnalyticsIngestionSource( + self.dce_url, + self.dcr_id, + self.disable_attempts, + self.disable_on_failure, + self.client_id, + self.client_secret, + self.tenant_id, + self.stream_name, + self.include_task_args, + self.include_content, + self.timeout, + self.fqcn, + ) + + def v2_playbook_on_start(self, playbook): + self.playbook_name = basename(playbook._file_name) + + # Build event data and send it to the Logs Ingestion API + def v2_runner_on_failed(self, result, **kwargs): + self.azure_loganalytics.send_to_loganalytics(self.playbook_name, result, "FAILED") + + def v2_runner_on_ok(self, result, **kwargs): + self.azure_loganalytics.send_to_loganalytics(self.playbook_name, result, "OK") diff --git a/tests/unit/plugins/callback/test_loganalytics_ingestion.py b/tests/unit/plugins/callback/test_loganalytics_ingestion.py new file mode 100644 index 0000000000..9b7cbcf80a --- /dev/null +++ b/tests/unit/plugins/callback/test_loganalytics_ingestion.py @@ -0,0 +1,82 @@ +# Copyright (c) Ansible project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +import json +import time +import unittest +import unittest.mock +import urllib + +from ansible.executor.task_result import TaskResult + +from ansible_collections.community.general.plugins.callback.loganalytics_ingestion import ( + AzureLogAnalyticsIngestionSource, +) + + +class TestAzureLogAnalyticsIngestion(unittest.TestCase): + dce_url = "https://fake.dce_url.ansible.com" + dcr_id = "fake-dcr-id" + client_id = "fake-client_id" + client_secret 
= "fake-client-secret" + tenant_id = "fake-tenant-id" + stream_name = "fake-stream-name" + fake_access_token = None + + def setUp(self): + self.fake_access_token = json.dumps( + {"expires_in": time.time() + 3600, "access_token": "fake_access_token"} + ).encode("utf-8") + + @unittest.mock.patch( + "ansible_collections.community.general.plugins.callback.loganalytics_ingestion.open_url", autospec=True + ) + @unittest.mock.patch("ansible.executor.task_result.TaskResult") + def test_sending_data(self, task_result_mock, open_url_mock): + """ + Tests sending data by verifying that the expected POST requests are submitted to the expected hosts. + """ + # The data returned from 'open_url' is only ever read during authentication. + open_url_mock.return_value.read.return_value = self.fake_access_token + + # TODO: How to set plugin default arguments? + # I tried instantiating the 'CallbackModule' but all it ever did was complain that 'client_id' wasn't defined. + self.loganalytics = AzureLogAnalyticsIngestionSource( + self.dce_url, + self.dcr_id, + 3, + True, + self.client_id, + self.client_secret, + self.tenant_id, + self.stream_name, + False, + False, + 2, + "community.general.loganalytics_ingestion", + ) + + assert open_url_mock.call_count == 1 + url = urllib.parse.urlparse(open_url_mock.call_args_list[0][0][0]) + assert url.netloc == "login.microsoftonline.com" + + results = ["foo", "bar", "biz"] + for i, result in enumerate(results, start=1): + host_mock = unittest.mock.Mock("host_mock") + host_mock.name = "fake-name" + task_mock = unittest.mock.Mock("task_mock") + task_mock._role = "fake-role" + task_mock.get_name = lambda r=result: r + + task_result = TaskResult( + host=host_mock, task=task_mock, return_data={}, task_fields={"action": "fake-action", "args": {}} + ) + self.loganalytics.send_to_loganalytics("fake-playbook", task_result, "OK") + + assert open_url_mock.call_count == 1 + i + + url = urllib.parse.urlparse(open_url_mock.call_args_list[i][0][0]) + assert 
url.scheme + "://" + url.netloc == self.dce_url + + assert json.loads(open_url_mock.call_args_list[i].kwargs.get("data"))[0].get("TaskName") == result