Coverage for kube_notify/stream/core_stream.py: 98%
50 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
1import asyncio
2import datetime
4from kubernetes_asyncio import client
6import kube_notify.notifications as notifs
7from kube_notify.utils import logger
9from . import pod_terminations
12async def core_stream(kube_notify_config: dict, iterate: bool = True) -> None:
13 async with client.ApiClient() as api:
14 last_event_info = None
15 core_api = client.CoreV1Api(api)
16 event_infos = set()
17 first_loop = True
18 while iterate or first_loop:
19 first_loop = False
20 try:
21 logger.logger.debug("Strating New loop for core api")
22 stream: client.CoreV1EventList = (
23 await core_api.list_event_for_all_namespaces(watch=False)
24 )
25 stream_items = stream.items
26 if kube_notify_config["events"]["coreApiEvents"].get(
27 "addPodTerminationErrors"
28 ):
29 stream_items += (
30 await pod_terminations.generate_pod_termination_events(core_api)
31 )
32 for obj in stream_items:
33 event_type = str(obj.type)
34 resource_name = str(obj.metadata.name)
35 resource_kind = str(obj.kind or "Event")
36 last_timestamp = datetime.datetime.fromisoformat(
37 (obj.last_timestamp or obj.event_time or obj.creation_timestamp)
38 .replace(tzinfo=None)
39 .isoformat()
40 )
41 message = str(obj.message)
42 reason = str(obj.reason)
43 involved_object_kind = str(obj.involved_object.kind)
44 involved_object_name = str(obj.involved_object.name)
45 involved_object_namespace = str(obj.involved_object.namespace)
47 title = f"{event_type} {resource_kind}"
48 description = (
49 f"{involved_object_kind} {involved_object_name} {reason}."
50 )
51 fields = {
52 "Message": message,
53 "Reason": reason,
54 "Type": event_type,
55 "Object kind": involved_object_kind,
56 "Object name": involved_object_name,
57 "Timestamp": last_timestamp.isoformat(),
58 "Namespace": involved_object_namespace,
59 }
60 event_info = (
61 last_timestamp,
62 involved_object_namespace,
63 event_type,
64 involved_object_kind,
65 involved_object_name,
66 reason,
67 resource_name,
68 message,
69 )
70 labels = dict(obj.metadata.labels or {})
71 if event_info in event_infos or last_event_info == event_info:
72 continue
73 elif involved_object_kind == "Pod":
74 try:
75 pod = await core_api.read_namespaced_pod(
76 involved_object_name, involved_object_namespace
77 )
78 labels.update(dict(pod.metadata.labels or {}))
79 fields.update({"Node": pod.spec.node_name})
80 except Exception:
81 pass
82 event_infos.add(event_info)
83 last_event_info = event_info
84 await notifs.handle_notify(
85 "coreApiEvents",
86 title,
87 description,
88 fields,
89 event_info,
90 kube_notify_config,
91 event_type,
92 labels,
93 involved_object_namespace,
94 involved_object_kind,
95 reason,
96 )
97 await asyncio.sleep(0)
98 del stream
99 except Exception as e: # pragma: no cover
100 logger.logger.error(f"{type(e).__name__}: {e}")
101 if not iterate:
102 raise e
103 iterate and await asyncio.sleep(
104 kube_notify_config["events"]["coreApiEvents"].get(
105 "pollingIntervalSeconds", 60
106 )
107 )