diff --git a/changelogs/fragments/11434-opentelemetry-spans.yml b/changelogs/fragments/11434-opentelemetry-spans.yml new file mode 100644 index 0000000000..a6a3f54fb3 --- /dev/null +++ b/changelogs/fragments/11434-opentelemetry-spans.yml @@ -0,0 +1,2 @@ +bugfixes: + - opentelemetry callback plugin - set span start to the actual start time of the task for the given host instead of the task start time for the first host of that task (https://github.com/ansible-collections/community.general/pull/11434). diff --git a/plugins/callback/opentelemetry.py b/plugins/callback/opentelemetry.py index 990006f42d..12113f26c3 100644 --- a/plugins/callback/opentelemetry.py +++ b/plugins/callback/opentelemetry.py @@ -185,6 +185,7 @@ class TaskData: # concatenate task include output from multiple items host.result = f"{self.host_data[host.uuid].result}\n{host.result}" else: + self.host_data[host.uuid].update(host.status, host.result) return self.host_data[host.uuid] = host @@ -195,9 +196,15 @@ class HostData: Data about an individual host. """ - def __init__(self, uuid, name, status, result): + def __init__(self, uuid, name, status, result=None): self.uuid = uuid self.name = name + self.status = status + self.result = result + self.finish = None + self.start = time_ns() + + def update(self, status, result): self.status = status self.result = result self.finish = time_ns() @@ -221,12 +228,13 @@ class OpenTelemetrySource: carrier["traceparent"] = traceparent return TraceContextTextMapPropagator().extract(carrier=carrier) - def start_task(self, tasks_data, hide_task_arguments, play_name, task): + def start_task(self, tasks_data, hide_task_arguments, play_name, task, host): """record the start of a task for one or more hosts""" uuid = task._uuid if uuid in tasks_data: + tasks_data[uuid].add_host(HostData(host._uuid, host.name, "started")) return name = task.get_name().strip() @@ -238,6 +246,7 @@ class OpenTelemetrySource: args = task.args tasks_data[uuid] = TaskData(uuid, name, path, play_name, action, args) + tasks_data[uuid].add_host(HostData(host._uuid, host.name, "started")) def finish_task(self, tasks_data, status, result, dump): """record the results of a task for a single host""" @@ -310,7 +319,8 @@ class OpenTelemetrySource: parent.set_attribute("ansible.host.user", self.user) for task in tasks: for host_data in task.host_data.values(): - with tracer.start_as_current_span(task.name, start_time=task.start, end_on_exit=False) as span: + start = host_data.start or task.start + with tracer.start_as_current_span(task.name, start_time=start, end_on_exit=False) as span: self.update_span_data(task, host_data, span, disable_logs, disable_attributes_in_logs) return otel_exporter @@ -327,13 +337,13 @@ class OpenTelemetrySource: if host_data.status != "included": # Support loops enriched_error_message = None - if "results" in host_data.result._result: + if host_data.result and "results" in host_data.result._result: if host_data.status == "failed": message = self.get_error_message_from_results(host_data.result._result["results"], task_data.action) enriched_error_message = self.enrich_error_message_from_results( host_data.result._result["results"], task_data.action ) - else: + elif host_data.result: res = host_data.result._result rc = res.get("rc", 0) if host_data.status == "failed": @@ -559,17 +569,8 @@ class CallbackModule(CallbackBase): def v2_playbook_on_play_start(self, play): self.play_name = play.get_name() - def v2_runner_on_no_hosts(self, task): - self.opentelemetry.start_task(self.tasks_data, self.hide_task_arguments, self.play_name, task) - - def v2_playbook_on_task_start(self, task, is_conditional): - self.opentelemetry.start_task(self.tasks_data, self.hide_task_arguments, self.play_name, task) - - def v2_playbook_on_cleanup_task_start(self, task): - self.opentelemetry.start_task(self.tasks_data, self.hide_task_arguments, self.play_name, task) - - def v2_playbook_on_handler_task_start(self, task): - self.opentelemetry.start_task(self.tasks_data, self.hide_task_arguments, self.play_name, task) + def v2_runner_on_start(self, host, task): + self.opentelemetry.start_task(self.tasks_data, self.hide_task_arguments, self.play_name, task, host) def v2_runner_on_failed(self, result, ignore_errors=False): if ignore_errors: @@ -592,6 +593,12 @@ class CallbackModule(CallbackBase): self.tasks_data, "skipped", result, self.dump_results(self.tasks_data[result._task._uuid], result) ) + def v2_runner_on_unreachable(self, result): + self.errors += 1 + self.opentelemetry.finish_task( + self.tasks_data, "failed", result, self.dump_results(self.tasks_data[result._task._uuid], result) + ) + def v2_playbook_on_include(self, included_file): self.opentelemetry.finish_task(self.tasks_data, "included", included_file, "") diff --git a/tests/unit/plugins/callback/test_opentelemetry.py b/tests/unit/plugins/callback/test_opentelemetry.py index 9fb566ef88..5618e6d313 100644 --- a/tests/unit/plugins/callback/test_opentelemetry.py +++ b/tests/unit/plugins/callback/test_opentelemetry.py @@ -11,7 +11,7 @@ from unittest.mock import MagicMock, Mock, patch from ansible.executor.task_result import TaskResult from ansible.playbook.task import Task -from ansible_collections.community.general.plugins.callback.opentelemetry import OpenTelemetrySource, TaskData +from ansible_collections.community.general.plugins.callback.opentelemetry import HostData, OpenTelemetrySource, TaskData class TestOpentelemetry(unittest.TestCase): @@ -36,11 +36,15 @@ class TestOpentelemetry(unittest.TestCase): self.my_task_result = TaskResult( host=self.mock_host, task=self.mock_task, return_data={}, task_fields=self.task_fields ) + self.mock_span = Mock("MockSpan") + self.mock_span.set_status = MagicMock() + self.mock_span.set_attributes = MagicMock() + self.mock_span.end = MagicMock() - def test_start_task(self): + def test_run_task_with_host(self): tasks_data = OrderedDict() - self.opentelemetry.start_task(tasks_data, False, "myplay", self.mock_task) + self.opentelemetry.start_task(tasks_data, False, "myplay", self.mock_task, self.mock_host) task_data = tasks_data["myuuid"] self.assertEqual(task_data.uuid, "myuuid") @@ -50,6 +54,14 @@ class TestOpentelemetry(unittest.TestCase): self.assertEqual(task_data.action, "myaction") self.assertEqual(task_data.args, {}) + host_data = task_data.host_data["myhost_uuid"] + self.assertEqual(host_data.uuid, "myhost_uuid") + self.assertEqual(host_data.name, "myhost") + self.assertIsNotNone(host_data.start) + + self.opentelemetry.finish_task(tasks_data, "ok", self.my_task_result, "") + self.assertEqual(host_data.status, "ok") + def test_finish_task_with_a_host_match(self): tasks_data = OrderedDict() tasks_data["myuuid"] = self.my_task @@ -75,6 +87,13 @@ class TestOpentelemetry(unittest.TestCase): self.assertEqual(host_data.name, "include") self.assertEqual(host_data.status, "ok") + @patch("ansible_collections.community.general.plugins.callback.opentelemetry.Status", create=True) + @patch("ansible_collections.community.general.plugins.callback.opentelemetry.StatusCode", create=True) + def test_update_span_data(self, mock_status_code, mock_status): + unfinished_host = HostData("myhost_uuid", "myhost", "unreachable") + self.opentelemetry.update_span_data(self.mock_task, unfinished_host, self.mock_span, True, True) + self.mock_span.end.assert_called() + def test_get_error_message(self): test_cases = ( ("my-exception", "my-msg", None, "my-exception"),