Coverage for kube_notify/stream/crds_stream.py: 48%

46 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-02-07 09:16 +0000

1import asyncio 

2import datetime 

3 

4from kubernetes_asyncio import client 

5 

6import kube_notify.notifications as notifs 

7from kube_notify.utils import logger, misc 

8 

9 

10async def crds_stream( 

11 crd: dict, namespace: str, kube_notify_config: dict, iterate: bool = True 

12) -> None: 

13 async with client.ApiClient() as api: 

14 last_event_info = None 

15 # Watch Velero Backups 

16 crd_group, crd_version, crd_plural = crd.get("type").split("/") 

17 crds_api = client.CustomObjectsApi(api) 

18 event_infos = set() 

19 first_loop = True 

20 while iterate or first_loop: 

21 first_loop = False 

22 try: 

23 logger.logger.debug(f"Strating New loop for crds {crd.get('type')}") 

24 stream = None 

25 if namespace: 

26 stream = await crds_api.list_namespaced_custom_object( 

27 crd_group, crd_version, namespace, crd_plural, watch=False 

28 ) 

29 else: 

30 stream = await crds_api.list_cluster_custom_object( 

31 crd_group, crd_version, crd_plural, watch=False 

32 ) 

33 for obj in stream["items"]: 

34 resource_name = str(obj["metadata"]["name"]) 

35 resource_namespace = str(obj["metadata"]["namespace"]) 

36 resource_kind = str(obj["kind"]) 

37 resource_apiversion = str(obj["apiVersion"]) 

38 creation_timestamp = datetime.datetime.strptime( 

39 obj["metadata"]["creationTimestamp"], "%Y-%m-%dT%H:%M:%SZ" 

40 ) 

41 

42 # add fields to Message 

43 fields = misc.add_fields_to_the_message(obj, crd) 

44 last_timestamp = misc.process_last_timestamp(obj, crd) 

45 fields["Timestamp"] = last_timestamp.isoformat() 

46 fields["Namespace"] = resource_namespace 

47 event_type = ( 

48 "ADDED" if creation_timestamp == last_timestamp else "UPDATED" 

49 ) 

50 title = f"{resource_apiversion} {resource_kind} {event_type}" 

51 description = f"{resource_apiversion} {resource_kind} {resource_name} {event_type}." 

52 event_info = ( 

53 last_timestamp, 

54 resource_apiversion, 

55 resource_kind, 

56 resource_name, 

57 resource_namespace, 

58 ) 

59 if event_info in event_infos or last_event_info == event_info: 

60 continue 

61 event_infos.add(event_info) 

62 last_event_info = event_info 

63 # logger.logger.info(event) 

64 await notifs.handle_notify( 

65 "customResources", 

66 title, 

67 description, 

68 fields, 

69 event_info, 

70 kube_notify_config, 

71 crd.get("type"), 

72 dict(obj["metadata"].get("labels", {})), 

73 resource_namespace, 

74 ) 

75 await asyncio.sleep(0) 

76 del stream 

77 except Exception as e: 

78 logger.logger.error(f"{type(e).__name__}: {e}") 

79 if not iterate: 

80 raise e 

81 iterate and await asyncio.sleep(crd.get("pollingIntervalSeconds", 60))