From 38f93c80f1becf339ea8c6eb04d0b0e7c0fc12a4 Mon Sep 17 00:00:00 2001 From: wtcline-intc Date: Fri, 20 Feb 2026 10:11:08 -0800 Subject: [PATCH] New Callback plugin: `loganalytics_ingestion` adding Azure Log Analytics Ingestion (#10306) * Add Azure Log Analytics Ingestion API plugin The Ingestion API allows sending data to a Log Analytics workspace in Azure Monitor. * Fix LogAnalytics Ingestion shebang * Fix Log Analytics Ingestion pep8 tests * Fix Log Analytics Ingestion pylint tests * Fix Log Analytics Ingestion import tests * Fix Log Analytics Ingestion pylint test * Add Log Analytics Ingestion auth timeout Previous behavior was to use the 'request' module's default timeout; this makes auth timeout value consistent with the task submission timeout value. * Display Log Analytics Ingestion event data as JSON Previous behavior was to display the data as a Python dictionary. The new behavior makes it easier to generate a sample JSON file in order to import into Azure when creating the table. * Add Azure Log Analytics Ingestion timeout param This parameter controls how long the plugin will wait for an HTTP response from the Azure Log Analytics API before considering the request a failure. Previous behavior was hardcoded to 2 seconds. * Fix Azure Log Ingestion unit test The class instantiation was missing an additional argument that was added in a previous patch; add it. Converting to JSON also caused the Mock TaskResult object to throw a serialization error; override the function for JSON conversion to just return bogus data instead. * Fix loganalytics_ingestion linter errors * Fix LogAnalytics Ingestion env vars Prefix the LogAnalytics Ingestion plugin's environment variable names with 'ANSIBLE_' in order to align with plugin best practices. * Remove LogAnalytics 'requests' dep from docs The LogAnalytics callback plugin does not actually require 'requests', so remove it from the documented dependencies. 
* Refactor LogAnalytics Ingestion to use URL utils This replaces the previous behavior of depending on the external 'requests' library. * Simplify LogAnalytics Ingestion token valid check Co-authored-by: Felix Fontein * Remove LogAnalytics Ingestion extra arg validation Argument validation should be handled by ansible-core, so remove the extra argument validation in the plugin itself. * Update LogAnalytics Ingestion version added * Remove LogAnalytics Ingestion coding marker The marker is no longer needed as Python2 is no longer supported. * Fix some LogAnalytics Ingestion grammar errors * Refactor LogAnalytics Ingestion plugin messages Consistently use "plugin" instead of module, and refer to the module by its FQCN instead of its prose name. * Remove LogAnalytics Ingestion extra logic A few unused vars were being set; stop setting them. * Fix LogAnalytics Ingestion nox sanity tests * Fix LogAnalytics Ingestion unit tests The refactor to move away from the 'requests' dependency to use module_utils broke the plugin's unit tests; re-write the plugin's unit tests for module_utils. * Add nox formatting to LogAnalytics Ingestion * Fix Log Analytics Ingestion urllib import Remove the compatibility import via 'six' for 'urllib' since Python 2 support is no longer supported. * Bump LogAnalytics Ingestion plugin version added * Remove LogAnalytics Ingestion required: false docs Required being false is the default, so no need to explicitly add it. * Simplify LogAnalytics Ingestion role name logic * Clean LogAnalytics Ingestion redundant comments * Clean LogAnalytics Ingestion unit test code Rename all Mock objects to use snake_case and consistently use '_mock' as a suffix instead of sometimes using it as a prefix and sometimes using it as a suffix. * Refactor LogAnalytics Ingestion unit tests Move all of the tests outside of the 'setUp' method. * Refactor LogAnalytics Ingestion test Add a test to validate that part of the contents sent match what was supposed to be sent. 
* Refactor LogAnalytics Ingestion test Make the names consistent again. * Add LogAnalytics Ingestion sample data docs * Apply suggestions from code review Co-authored-by: Felix Fontein --------- Co-authored-by: Felix Fontein --- .github/BOTMETA.yml | 3 + plugins/callback/loganalytics_ingestion.py | 340 ++++++++++++++++++ .../callback/test_loganalytics_ingestion.py | 82 +++++ 3 files changed, 425 insertions(+) create mode 100644 plugins/callback/loganalytics_ingestion.py create mode 100644 tests/unit/plugins/callback/test_loganalytics_ingestion.py diff --git a/.github/BOTMETA.yml b/.github/BOTMETA.yml index 16f61156d2..b0db0f7f18 100644 --- a/.github/BOTMETA.yml +++ b/.github/BOTMETA.yml @@ -65,6 +65,9 @@ files: $callbacks/log_plays.py: {} $callbacks/loganalytics.py: maintainers: zhcli + $callbacks/loganalytics_ingestion.py: + ignore: zhcli + maintainers: pboushy vsh47 wtcline-intc $callbacks/logdna.py: {} $callbacks/logentries.py: {} $callbacks/logstash.py: diff --git a/plugins/callback/loganalytics_ingestion.py b/plugins/callback/loganalytics_ingestion.py new file mode 100644 index 0000000000..69a72fd2e7 --- /dev/null +++ b/plugins/callback/loganalytics_ingestion.py @@ -0,0 +1,340 @@ +#!/usr/bin/env python +# Copyright (c) Ansible project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +DOCUMENTATION = """ +name: loganalytics_ingestion +type: notification +short_description: Posts task results to an Azure Log Analytics workspace using the new Logs Ingestion API +author: + - Wade Cline (@wtcline-intc) + - Sriramoju Vishal Bharath (@vsh47) + - Cyrus Li (@zhcli) +description: + - This callback plugin will post task results in JSON format to an Azure Log Analytics workspace using the new Logs Ingestion API. +version_added: "12.4.0" +requirements: + - The callback plugin has been enabled. + - An Azure Log Analytics workspace has been established. 
+ - A Data Collection Rule (DCR) and custom table are created. +options: + dce_url: + description: URL of the Data Collection Endpoint (DCE) for Azure Logs Ingestion API. + type: str + required: true + env: + - name: ANSIBLE_LOGANALYTICS_DCE_URL + ini: + - section: callback_loganalytics + key: dce_url + dcr_id: + description: Data Collection Rule (DCR) ID for the Azure Log Ingestion API. + type: str + required: true + env: + - name: ANSIBLE_LOGANALYTICS_DCR_ID + ini: + - section: callback_loganalytics + key: dcr_id + disable_attempts: + description: + - When O(disable_on_failure=true), number of plugin failures that must occur before the plugin is disabled. + - This helps prevent outright plugin failure from a single, transient network issue. + type: int + default: 3 + env: + - name: ANSIBLE_LOGANALYTICS_DISABLE_ATTEMPTS + ini: + - section: callback_loganalytics + key: disable_attempts + disable_on_failure: + description: Stop trying to send data on plugin failure. + type: bool + default: true + env: + - name: ANSIBLE_LOGANALYTICS_DISABLE_ON_FAILURE + ini: + - section: callback_loganalytics + key: disable_on_failure + client_id: + description: Client ID of the Azure App registration for OAuth2 authentication ("Modern Authentication"). + type: str + required: true + env: + - name: ANSIBLE_LOGANALYTICS_CLIENT_ID + ini: + - section: callback_loganalytics + key: client_id + client_secret: + description: Client Secret of the Azure App registration. + type: str + required: true + env: + - name: ANSIBLE_LOGANALYTICS_CLIENT_SECRET + ini: + - section: callback_loganalytics + key: client_secret + include_content: + description: Send the content to the Azure Log Analytics workspace. + type: bool + default: false + env: + - name: ANSIBLE_LOGANALYTICS_INCLUDE_CONTENT + ini: + - section: callback_loganalytics + key: include_content + include_task_args: + description: Send the task args to the Azure Log Analytics workspace. 
+ type: bool + default: false + env: + - name: ANSIBLE_LOGANALYTICS_INCLUDE_TASK_ARGS + ini: + - section: callback_loganalytics + key: include_task_args + stream_name: + description: The name of the stream used to send the logs to the Azure Log Analytics workspace. + type: str + required: true + env: + - name: ANSIBLE_LOGANALYTICS_STREAM_NAME + ini: + - section: callback_loganalytics + key: stream_name + tenant_id: + description: Tenant ID for the Azure Active Directory. + type: str + required: true + env: + - name: ANSIBLE_LOGANALYTICS_TENANT_ID + ini: + - section: callback_loganalytics + key: tenant_id + timeout: + description: Timeout for the HTTP requests to the Azure Log Analytics API. + type: int + default: 2 + env: + - name: ANSIBLE_LOGANALYTICS_TIMEOUT + ini: + - section: callback_loganalytics + key: timeout +seealso: + - name: Logs Ingestion API + description: Overview of Logs Ingestion API in Azure Monitor + link: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview +notes: + - Triple verbosity logging (C(-vvv)) can be used to generate JSON sample data for creating the table schema in Azure Log Analytics. + Search for the string C(Event Data:) in the output in order to locate the data sample. 
class AzureLogAnalyticsIngestionSource:
    """Sends Ansible task results to an Azure Log Analytics workspace.

    Uses the Azure Monitor Logs Ingestion API with OAuth2 client-credentials
    authentication ("Modern Authentication").  A bearer token is obtained at
    construction time and transparently refreshed when it is about to expire.
    On repeated send failures the source can disable itself so that a broken
    network path does not abort the whole play.
    """

    def __init__(
        self,
        dce_url,
        dcr_id,
        disable_attempts,
        disable_on_failure,
        client_id,
        client_secret,
        tenant_id,
        stream_name,
        include_task_args,
        include_content,
        timeout,
        fqcn,
    ):
        # Logs Ingestion API routing: Data Collection Endpoint URL, Data
        # Collection Rule ID, and the target stream (custom table) name.
        self.dce_url = dce_url
        self.dcr_id = dcr_id
        self.stream_name = stream_name
        # Failure handling: once 'failures' reaches 'disable_attempts' (and
        # 'disable_on_failure' is true) the plugin stops sending entirely.
        self.disabled = False
        self.disable_attempts = disable_attempts
        self.disable_on_failure = disable_on_failure
        self.failures = 0
        # OAuth2 client-credentials identity.
        self.client_id = client_id
        self.client_secret = client_secret
        self.tenant_id = tenant_id
        # Payload content toggles.
        self.include_task_args = include_task_args
        self.include_content = include_content
        # Set by get_bearer_token(); None until the first token is fetched.
        self.token_expiration_time = None
        # Correlation metadata attached to every event.
        self.session = str(uuid.uuid4())
        self.host = socket.gethostname()
        self.user = getpass.getuser()
        # HTTP timeout (seconds) for both auth and ingestion requests.
        self.timeout = timeout
        # Plugin FQCN, used to prefix warning/log messages.
        self.fqcn = fqcn

        self.bearer_token = self.get_bearer_token()

    # OAuth2 authentication method to get a Bearer token.
    # This replaces the shared_key authentication mechanism.
    def get_bearer_token(self):
        """Request a bearer token via the OAuth2 client-credentials flow.

        Side effect: records the token expiry in 'self.token_expiration_time'.
        Raises on HTTP/auth errors; callers go through send_to_loganalytics(),
        which handles failures.
        """
        url = f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/v2.0/token"
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = urlencode(
            {
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                # The scope value comes from https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview#headers
                # and https://learn.microsoft.com/en-us/entra/identity-platform/scopes-oidc#the-default-scope
                "scope": "https://monitor.azure.com/.default",
            }
        )
        response = open_url(url, data=data, force=True, headers=headers, method="POST", timeout=self.timeout)
        token = json.loads(response.read().decode("utf-8"))
        # 'expires_in' is the token lifetime in seconds.
        self.token_expiration_time = datetime.now() + timedelta(seconds=token.get("expires_in"))
        return token.get("access_token")

    def is_token_valid(self):
        """Return True while the cached token has more than 10 seconds left.

        The 10-second margin avoids submitting a request with a token that
        expires while the request is in flight.  Naive local datetimes are
        used consistently here and in get_bearer_token(), so the comparison
        is sound.
        """
        return datetime.now() + timedelta(seconds=10) < self.token_expiration_time

    # Method to send event data to the Azure Logs Ingestion API.
    # This replaces the legacy API call and now uses the Logs Ingestion API endpoint.
    def send_event(self, event_data):
        """POST 'event_data' (a list of event dicts) to the ingestion endpoint,
        re-authenticating first if the cached token is about to expire."""
        if not self.is_token_valid():
            self.bearer_token = self.get_bearer_token()
        ingestion_url = (
            f"{self.dce_url}/dataCollectionRules/{self.dcr_id}/streams/{self.stream_name}?api-version=2023-01-01"
        )
        headers = {"Authorization": f"Bearer {self.bearer_token}", "Content-Type": "application/json"}
        open_url(ingestion_url, data=json.dumps(event_data), headers=headers, method="POST", timeout=self.timeout)

    def _rfc1123date(self):
        """Return the current UTC time formatted per RFC 1123."""
        return datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")

    # This method wraps the private method with the appropriate error handling.
    def send_to_loganalytics(self, playbook_name, result, state):
        """Submit one task result; never raises.

        Failures are logged and counted; when 'disable_on_failure' is set and
        the count reaches 'disable_attempts', the plugin disables itself.
        """
        if self.disabled:
            return
        try:
            self._send_to_loganalytics(playbook_name, result, state)
        except Exception as e:
            display.warning(f"{self.fqcn} callback plugin failure: {e}.")
            if self.disable_on_failure:
                self.failures += 1
                if self.failures >= self.disable_attempts:
                    display.warning(
                        f"{self.fqcn} callback plugin failures exceed maximum of '{self.disable_attempts}'! Disabling plugin!"
                    )
                    self.disabled = True
                else:
                    display.v(f"{self.fqcn} callback plugin failure {self.failures}/{self.disable_attempts}")

    def _send_to_loganalytics(self, playbook_name, result, state):
        """Build the event payload from a TaskResult and submit it."""
        ansible_role = str(result._task._role) if result._task._role else None

        # Strip optional fields on shallow copies: popping directly from
        # result._task_fields / result._result would mutate the shared
        # TaskResult object that every other enabled callback plugin also
        # receives.
        task_fields = dict(result._task_fields)
        if not self.include_task_args:
            task_fields.pop("args", None)

        task_result = dict(result._result)
        if not self.include_content:
            task_result.pop("content", None)

        # The Logs Ingestion API expects a JSON array of events.
        event_data = [
            {
                "TimeGenerated": self._rfc1123date(),
                "Host": result._host.name,
                "User": self.user,
                "Playbook": playbook_name,
                "Role": ansible_role,
                "TaskName": result._task.get_name(),
                "Task": task_fields,
                "Action": task_fields["action"],
                "State": state,
                "Result": task_result,
                "Session": self.session,
            }
        ]

        # The data displayed here can be used as a sample file in order to create the table's schema.
        display.vvv(f"Event Data: {json.dumps(event_data)}")

        self.send_event(event_data)
class TestAzureLogAnalyticsIngestion(unittest.TestCase):
    """Unit tests for AzureLogAnalyticsIngestionSource with 'open_url' mocked."""

    dce_url = "https://fake.dce_url.ansible.com"
    dcr_id = "fake-dcr-id"
    client_id = "fake-client_id"
    client_secret = "fake-client-secret"
    tenant_id = "fake-tenant-id"
    stream_name = "fake-stream-name"
    fake_access_token = None

    def setUp(self):
        # 'expires_in' is a token lifetime in *seconds* (not an absolute
        # epoch timestamp); one hour keeps the fake token valid for the
        # entire test run without re-authentication.
        self.fake_access_token = json.dumps(
            {"expires_in": 3600, "access_token": "fake_access_token"}
        ).encode("utf-8")

    @unittest.mock.patch(
        "ansible_collections.community.general.plugins.callback.loganalytics_ingestion.open_url", autospec=True
    )
    @unittest.mock.patch("ansible.executor.task_result.TaskResult")
    def test_sending_data(self, task_result_mock, open_url_mock):
        """
        Tests sending data by verifying that the expected POST requests are submitted to the expected hosts.
        """
        # The data returned from 'open_url' is only ever read during authentication.
        open_url_mock.return_value.read.return_value = self.fake_access_token

        # TODO: How to set plugin default arguments?
        # I tried instantiating the 'CallbackModule' but all it ever did was complain that 'client_id' wasn't defined.
        loganalytics = AzureLogAnalyticsIngestionSource(
            self.dce_url,
            self.dcr_id,
            3,
            True,
            self.client_id,
            self.client_secret,
            self.tenant_id,
            self.stream_name,
            False,
            False,
            2,
            "community.general.loganalytics_ingestion",
        )

        # Construction triggers exactly one request: the OAuth2 token call.
        assert open_url_mock.call_count == 1
        url = urllib.parse.urlparse(open_url_mock.call_args_list[0][0][0])
        assert url.netloc == "login.microsoftonline.com"

        results = ["foo", "bar", "biz"]
        for i, result in enumerate(results, start=1):
            host_mock = unittest.mock.Mock("host_mock")
            host_mock.name = "fake-name"
            task_mock = unittest.mock.Mock("task_mock")
            task_mock._role = "fake-role"
            # Bind the loop value as a default argument so each lambda keeps
            # its own result (avoids the late-binding closure pitfall).
            task_mock.get_name = lambda r=result: r

            task_result = TaskResult(
                host=host_mock, task=task_mock, return_data={}, task_fields={"action": "fake-action", "args": {}}
            )
            loganalytics.send_to_loganalytics("fake-playbook", task_result, "OK")

            # One additional ingestion POST per submitted result.
            assert open_url_mock.call_count == 1 + i

            url = urllib.parse.urlparse(open_url_mock.call_args_list[i][0][0])
            assert url.scheme + "://" + url.netloc == self.dce_url

            # Validate that part of the submitted body matches what was sent.
            assert json.loads(open_url_mock.call_args_list[i].kwargs.get("data"))[0].get("TaskName") == result