Coverage for kube_notify/stream/pod_terminations.py: 100%
35 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
1from kubernetes_asyncio import client
4def find_container_restart_policy(
5 containers: list[client.V1Container], container_name: str, pod_name: str
6) -> str:
7 for container in containers:
8 if container.name == container_name:
9 return container.restart_policy
10 # We should never be in the following situation :
11 raise KeyError(
12 f"{container_name} not found in containers for pod {pod_name}"
13 ) # pragma: no cover
16def get_container_state(restart_policy: str | None, exit_code: int) -> tuple[bool, str]:
17 if restart_policy in ["Always", None] or (
18 restart_policy == "OnFailure" and exit_code != 0
19 ):
20 return True, "restarted"
21 if restart_policy == "Never" and exit_code != 0:
22 return True, "crashed"
23 return False, "" # Completed or Running
26async def generate_pod_termination_events(
27 api: client.CoreV1Api,
28) -> list[client.CoreV1Event]:
29 events = []
30 pods: client.V1PodList = await api.list_pod_for_all_namespaces()
31 for pod in pods.items:
32 containers = (
33 (pod.spec.containers or [])
34 + (pod.spec.init_containers or [])
35 + (pod.spec.ephemeral_containers or [])
36 )
37 for container_status in (
38 (pod.status.container_statuses or [])
39 + (pod.status.ephemeral_container_statuses or [])
40 + (pod.status.init_container_statuses or [])
41 ):
42 namespace = pod.metadata.namespace
43 pod_name = pod.metadata.name
44 restart_count = container_status.restart_count
45 container_name = container_status.name
46 status = container_status.last_state
47 restart_policy = find_container_restart_policy(
48 containers, container_name, pod_name
49 )
50 if status.terminated is None:
51 continue
52 exit_code = int(status.terminated.exit_code)
53 is_error, state = get_container_state(restart_policy, exit_code)
54 # see doc here
55 # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1ContainerStateTerminated.md
56 if is_error:
57 reason = status.terminated.reason
58 timestamp = status.terminated.finished_at
59 message = (
60 f"{reason}({exit_code}): Container {container_name} {state} "
61 f"(restartCount: {restart_count})"
62 )
63 if status.terminated.message:
64 message += f"{', message :\n' + status.terminated.message}"
65 events.append(
66 client.CoreV1Event(
67 kind="Event",
68 type="Error",
69 metadata=client.V1ObjectMeta(
70 name=f"{pod_name}.{container_name}.{restart_count}",
71 namespace=namespace,
72 ),
73 reason=reason,
74 last_timestamp=timestamp,
75 message=message,
76 involved_object=client.V1ObjectReference(
77 kind="Pod", name=pod_name, namespace=namespace
78 ),
79 )
80 )
81 return events