Coverage for kube_notify/stream/core_stream.py: 98%

50 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-02-07 09:16 +0000

1import asyncio 

2import datetime 

3 

4from kubernetes_asyncio import client 

5 

6import kube_notify.notifications as notifs 

7from kube_notify.utils import logger 

8 

9from . import pod_terminations 

10 

11 

async def core_stream(kube_notify_config: dict, iterate: bool = True) -> None:
    """Poll core API events and hand every not-yet-seen one to the notifier.

    Each pass lists the events of all namespaces (optionally extended with
    the events produced by ``pod_terminations.generate_pod_termination_events``
    when ``events.coreApiEvents.addPodTerminationErrors`` is truthy), builds a
    title/description/fields payload per event and calls
    ``notifs.handle_notify`` for events whose identifying tuple has not been
    processed before by this coroutine.

    Args:
        kube_notify_config: Configuration mapping; this function reads
            ``["events"]["coreApiEvents"]`` and passes the whole mapping on
            to ``notifs.handle_notify``.
        iterate: When true, keep polling forever, sleeping
            ``pollingIntervalSeconds`` (default 60) between passes and only
            logging errors. When false, run exactly one pass and re-raise
            any error after logging it.

    Raises:
        Exception: Whatever a pass raised, but only when ``iterate`` is false.
    """
    async with client.ApiClient() as api:
        core_api = client.CoreV1Api(api)
        # Identity tuples of every event already handed to the notifier.
        # NOTE(review): this set is never pruned, so it grows for as long as
        # the stream runs — consider bounding it once the lifetime of the
        # generated pod-termination events is confirmed.
        event_infos = set()
        first_loop = True
        while iterate or first_loop:
            first_loop = False
            try:
                logger.logger.debug("Strating New loop for core api")
                stream: client.CoreV1EventList = (
                    await core_api.list_event_for_all_namespaces(watch=False)
                )
                stream_items = stream.items
                if kube_notify_config["events"]["coreApiEvents"].get(
                    "addPodTerminationErrors"
                ):
                    stream_items += (
                        await pod_terminations.generate_pod_termination_events(
                            core_api
                        )
                    )
                for obj in stream_items:
                    event_type = str(obj.type)
                    resource_name = str(obj.metadata.name)
                    resource_kind = str(obj.kind or "Event")
                    # Events expose their creation time on the metadata, not
                    # on the event itself; the getattr keeps supporting any
                    # object that does carry a top-level creation_timestamp.
                    raw_timestamp = (
                        obj.last_timestamp
                        or obj.event_time
                        or getattr(obj, "creation_timestamp", None)
                        or obj.metadata.creation_timestamp
                    )
                    # Drop the timezone so timestamps compare as naive values.
                    last_timestamp = datetime.datetime.fromisoformat(
                        raw_timestamp.replace(tzinfo=None).isoformat()
                    )
                    message = str(obj.message)
                    reason = str(obj.reason)
                    involved_object_kind = str(obj.involved_object.kind)
                    involved_object_name = str(obj.involved_object.name)
                    involved_object_namespace = str(obj.involved_object.namespace)

                    title = f"{event_type} {resource_kind}"
                    description = (
                        f"{involved_object_kind} {involved_object_name} {reason}."
                    )
                    fields = {
                        "Message": message,
                        "Reason": reason,
                        "Type": event_type,
                        "Object kind": involved_object_kind,
                        "Object name": involved_object_name,
                        "Timestamp": last_timestamp.isoformat(),
                        "Namespace": involved_object_namespace,
                    }
                    # Tuple used both as the de-duplication key and as the
                    # event_info argument of handle_notify.
                    event_info = (
                        last_timestamp,
                        involved_object_namespace,
                        event_type,
                        involved_object_kind,
                        involved_object_name,
                        reason,
                        resource_name,
                        message,
                    )
                    labels = dict(obj.metadata.labels or {})
                    if event_info in event_infos:
                        # Already notified during this or an earlier pass.
                        continue
                    if involved_object_kind == "Pod":
                        # Best effort: enrich with the pod's labels and node.
                        # A failed lookup must not block the notification.
                        try:
                            pod = await core_api.read_namespaced_pod(
                                involved_object_name, involved_object_namespace
                            )
                            labels.update(dict(pod.metadata.labels or {}))
                            fields.update({"Node": pod.spec.node_name})
                        except Exception as pod_error:
                            logger.logger.debug(
                                f"Could not read pod {involved_object_namespace}/"
                                f"{involved_object_name}: "
                                f"{type(pod_error).__name__}: {pod_error}"
                            )
                    event_infos.add(event_info)
                    await notifs.handle_notify(
                        "coreApiEvents",
                        title,
                        description,
                        fields,
                        event_info,
                        kube_notify_config,
                        event_type,
                        labels,
                        involved_object_namespace,
                        involved_object_kind,
                        reason,
                    )
                    # Yield to the event loop between notifications.
                    await asyncio.sleep(0)
                del stream
            except Exception as e:  # pragma: no cover
                logger.logger.error(f"{type(e).__name__}: {e}")
                if not iterate:
                    # Single-pass mode: surface the failure to the caller
                    # with its original traceback.
                    raise
            if iterate:
                await asyncio.sleep(
                    kube_notify_config["events"]["coreApiEvents"].get(
                        "pollingIntervalSeconds", 60
                    )
                )