Coverage for kube_notify/app.py: 93%
27 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
1import asyncio
2import os
4from kubernetes_asyncio import config
6import kube_notify
7from kube_notify.stream import core_stream, crds_stream
8from kube_notify.utils import logger, misc
def start_kube_notify_loop(
    kube_notify_config: dict,
    in_cluster: bool = True,
    context: str | None = None,
    iterate: bool = True,
) -> None:
    """Create an event loop and run every configured kube-notify watcher on it.

    One watcher is scheduled for core API events when
    ``events.coreApiEvents.enabled`` is truthy, plus one watcher per
    (custom resource, namespace) pair listed under ``events.customResources``.
    The call blocks until all watchers have finished, then shuts the loop down.

    Args:
        kube_notify_config: Parsed kube-notify configuration mapping.
        in_cluster: Load the in-cluster Kubernetes configuration when true,
            otherwise load the kube config for ``context``.
        context: Kube config context name, only used when ``in_cluster`` is
            false.
        iterate: Forwarded unchanged to the stream coroutines.

    Raises:
        ValueError: If an entry of ``events.customResources`` has no ``type``.
    """
    events_config = kube_notify_config.get("events") or {}
    core_events = events_config.get("coreApiEvents") or {}
    custom_resources = events_config.get("customResources") or []

    # Validate the CRD entries before anything is scheduled, so a bad
    # configuration fails fast without leaving pending tasks or an open loop.
    for index, crd in enumerate(custom_resources):
        if not crd.get("type"):  # pragma: no cover
            error = f"Couldn't get CRD type from 'customResources' at index {index}"
            logger.logger.error(error)
            raise ValueError(error)

    ioloop = asyncio.new_event_loop()
    asyncio.set_event_loop(ioloop)
    try:
        # Initialize Kubernetes client
        if in_cluster:  # pragma: no cover
            config.load_incluster_config()
        else:
            ioloop.run_until_complete(config.load_kube_config(context=context))
        logger.logger.info(
            f"Starting kube-notify {kube_notify.__version__} at {kube_notify.STARTUP_TIME}"
        )
        # The variable is only logged for information; it may be unset.
        logger.logger.info(
            f"PYTHONUNBUFFERED={os.environ.get('PYTHONUNBUFFERED')}"
        )

        tasks = []
        if core_events.get("enabled"):
            logger.logger.info("Creating watcher for coreApiEvents")
            tasks.append(
                asyncio.ensure_future(
                    core_stream.core_stream(kube_notify_config, iterate),
                    loop=ioloop,
                )
            )

        for crd in custom_resources:
            # A missing "namespaces" key yields a single watcher with
            # namespace=None.
            for namespace in crd.get("namespaces", [None]):
                logger.logger.info(
                    f"Creating watcher for crd {crd.get('type')} {namespace}"
                )
                tasks.append(
                    asyncio.ensure_future(
                        crds_stream.crds_stream(
                            crd, namespace, kube_notify_config, iterate
                        ),
                        loop=ioloop,
                    )
                )

        if not tasks:
            # asyncio.wait() raises ValueError on an empty collection.
            logger.logger.info("No watchers configured, nothing to run")
            return

        try:
            done, _pending = ioloop.run_until_complete(asyncio.wait(tasks))
        except Exception as e:
            logger.logger.error(f"[Error] Ignoring :{e}")
        else:
            # asyncio.wait() never re-raises task exceptions; surface them
            # here so that a failed watcher is not lost silently.
            for task in done:
                if task.cancelled():
                    continue
                task_error = task.exception()
                if task_error is not None:
                    logger.logger.error(f"[Error] Ignoring :{task_error}")
    finally:
        ioloop.run_until_complete(ioloop.shutdown_asyncgens())
        ioloop.close()
def main() -> None:  # pragma: no cover
    """Parse the command-line arguments, load the configuration and start the loop."""
    cli_args = kube_notify.parser.parse_args()
    start_kube_notify_loop(
        misc.load_kube_notify_config(cli_args.config),
        cli_args.inCluster,
        cli_args.context,
    )
# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":  # pragma: no cover
    main()