Coverage for kube_notify/stream/crds_stream.py: 48%
46 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
1import asyncio
2import datetime
4from kubernetes_asyncio import client
6import kube_notify.notifications as notifs
7from kube_notify.utils import logger, misc
async def crds_stream(
    crd: dict, namespace: str, kube_notify_config: dict, iterate: bool = True
) -> None:
    """Poll one custom resource type and notify about each unseen object state.

    Every pass lists the custom objects identified by ``crd["type"]`` (either
    in ``namespace`` or cluster-wide), builds a message for each object and
    hands it to ``notifs.handle_notify``. An object state is identified by the
    tuple (last timestamp, apiVersion, kind, name, namespace); states already
    seen during this call are skipped.

    Args:
        crd: CRD configuration. ``crd["type"]`` must have the form
            ``"<group>/<version>/<plural>"``. ``crd["pollingIntervalSeconds"]``
            is the pause between passes and defaults to 60. The dict is also
            passed to the ``misc`` helpers, which may read further keys.
        namespace: Namespace to list in; a falsy value lists cluster-wide.
        kube_notify_config: Passed through unchanged to
            ``notifs.handle_notify``.
        iterate: When true, poll forever and only log errors. When false, run
            exactly one pass and re-raise any error from it.

    Raises:
        Exception: Whatever a pass raised, but only when ``iterate`` is false.
        AttributeError, ValueError: If ``crd["type"]`` is missing or does not
            split into exactly three ``/``-separated parts (raised before the
            polling loop starts, regardless of ``iterate``).
    """
    async with client.ApiClient() as api:
        # Identify the custom resource to poll from its configured type.
        crd_group, crd_version, crd_plural = crd.get("type").split("/")
        crds_api = client.CustomObjectsApi(api)
        # States already notified during this call. It is never pruned, so it
        # grows for as long as the loop runs.
        event_infos = set()
        first_loop = True
        # `first_loop` guarantees a single pass even when `iterate` is false.
        while iterate or first_loop:
            first_loop = False
            try:
                logger.logger.debug(f"Strating New loop for crds {crd.get('type')}")
                if namespace:
                    stream = await crds_api.list_namespaced_custom_object(
                        crd_group, crd_version, namespace, crd_plural, watch=False
                    )
                else:
                    stream = await crds_api.list_cluster_custom_object(
                        crd_group, crd_version, crd_plural, watch=False
                    )
                for obj in stream["items"]:
                    metadata = obj["metadata"]
                    resource_name = str(metadata["name"])
                    # Cluster-scoped objects carry no namespace; fall back to
                    # an empty string instead of failing the whole pass.
                    resource_namespace = str(metadata.get("namespace", ""))
                    resource_kind = str(obj["kind"])
                    resource_apiversion = str(obj["apiVersion"])
                    # Parsed without tzinfo, i.e. a naive datetime.
                    creation_timestamp = datetime.datetime.strptime(
                        metadata["creationTimestamp"], "%Y-%m-%dT%H:%M:%SZ"
                    )

                    # add fields to Message
                    fields = misc.add_fields_to_the_message(obj, crd)
                    last_timestamp = misc.process_last_timestamp(obj, crd)
                    fields["Timestamp"] = last_timestamp.isoformat()
                    fields["Namespace"] = resource_namespace
                    # NOTE(review): this equality presumes the helper returns a
                    # naive datetime comparable with `creation_timestamp` -
                    # confirm against misc.process_last_timestamp.
                    event_type = (
                        "ADDED" if creation_timestamp == last_timestamp else "UPDATED"
                    )
                    title = f"{resource_apiversion} {resource_kind} {event_type}"
                    description = f"{resource_apiversion} {resource_kind} {resource_name} {event_type}."
                    event_info = (
                        last_timestamp,
                        resource_apiversion,
                        resource_kind,
                        resource_name,
                        resource_namespace,
                    )
                    if event_info in event_infos:
                        # This exact state was already notified.
                        continue
                    event_infos.add(event_info)
                    await notifs.handle_notify(
                        "customResources",
                        title,
                        description,
                        fields,
                        event_info,
                        kube_notify_config,
                        crd.get("type"),
                        dict(metadata.get("labels", {})),
                        resource_namespace,
                    )
                    # Yield to the event loop between notifications.
                    await asyncio.sleep(0)
                # Drop the listing before sleeping so it is not kept alive
                # for the whole polling interval.
                del stream
            except Exception as e:
                logger.logger.error(f"{type(e).__name__}: {e}")
                if not iterate:
                    # One-shot mode: surface the failure to the caller.
                    raise
            if iterate:
                await asyncio.sleep(crd.get("pollingIntervalSeconds", 60))