Coverage for kube_notify/stream/pod_terminations.py: 100%

35 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-02-07 09:16 +0000

1from kubernetes_asyncio import client 

2 

3 

4def find_container_restart_policy( 

5 containers: list[client.V1Container], container_name: str, pod_name: str 

6) -> str: 

7 for container in containers: 

8 if container.name == container_name: 

9 return container.restart_policy 

10 # We should never be in the following situation : 

11 raise KeyError( 

12 f"{container_name} not found in containers for pod {pod_name}" 

13 ) # pragma: no cover 

14 

15 

16def get_container_state(restart_policy: str | None, exit_code: int) -> tuple[bool, str]: 

17 if restart_policy in ["Always", None] or ( 

18 restart_policy == "OnFailure" and exit_code != 0 

19 ): 

20 return True, "restarted" 

21 if restart_policy == "Never" and exit_code != 0: 

22 return True, "crashed" 

23 return False, "" # Completed or Running 

24 

25 

26async def generate_pod_termination_events( 

27 api: client.CoreV1Api, 

28) -> list[client.CoreV1Event]: 

29 events = [] 

30 pods: client.V1PodList = await api.list_pod_for_all_namespaces() 

31 for pod in pods.items: 

32 containers = ( 

33 (pod.spec.containers or []) 

34 + (pod.spec.init_containers or []) 

35 + (pod.spec.ephemeral_containers or []) 

36 ) 

37 for container_status in ( 

38 (pod.status.container_statuses or []) 

39 + (pod.status.ephemeral_container_statuses or []) 

40 + (pod.status.init_container_statuses or []) 

41 ): 

42 namespace = pod.metadata.namespace 

43 pod_name = pod.metadata.name 

44 restart_count = container_status.restart_count 

45 container_name = container_status.name 

46 status = container_status.last_state 

47 restart_policy = find_container_restart_policy( 

48 containers, container_name, pod_name 

49 ) 

50 if status.terminated is None: 

51 continue 

52 exit_code = int(status.terminated.exit_code) 

53 is_error, state = get_container_state(restart_policy, exit_code) 

54 # see doc here 

55 # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ContainerStateTerminated.md 

56 if is_error: 

57 reason = status.terminated.reason 

58 timestamp = status.terminated.finished_at 

59 message = ( 

60 f"{reason}({exit_code}): Container {container_name} {state} " 

61 f"(restartCount: {restart_count})" 

62 ) 

63 if status.terminated.message: 

64 message += f"{', message :\n' + status.terminated.message}" 

65 events.append( 

66 client.CoreV1Event( 

67 kind="Event", 

68 type="Error", 

69 metadata=client.V1ObjectMeta( 

70 name=f"{pod_name}.{container_name}.{restart_count}", 

71 namespace=namespace, 

72 ), 

73 reason=reason, 

74 last_timestamp=timestamp, 

75 message=message, 

76 involved_object=client.V1ObjectReference( 

77 kind="Pod", name=pod_name, namespace=namespace 

78 ), 

79 ) 

80 ) 

81 return events