Coverage for kube_notify/app.py: 93%

27 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-02-07 09:16 +0000

1import asyncio 

2import os 

3 

4from kubernetes_asyncio import config 

5 

6import kube_notify 

7from kube_notify.stream import core_stream, crds_stream 

8from kube_notify.utils import logger, misc 

9 

10 

def start_kube_notify_loop(
    kube_notify_config: dict,
    in_cluster: bool = True,
    context: str | None = None,
    iterate: bool = True,
) -> None:
    """Load the Kubernetes client configuration and run the event watchers.

    A new asyncio event loop is created, installed as the current loop, used
    to run one watcher task per enabled event source, and closed before the
    function returns.

    Args:
        kube_notify_config: Parsed kube-notify configuration. It must contain
            an "events" mapping; the optional "coreApiEvents" and
            "customResources" entries decide which watchers are created.
        in_cluster: When true, load the in-cluster configuration; otherwise
            load the kubeconfig.
        context: Passed to ``config.load_kube_config`` when ``in_cluster`` is
            false.
        iterate: Forwarded unchanged to every stream function.

    Raises:
        KeyError: If ``kube_notify_config`` has no "events" key.
        ValueError: If a "customResources" entry has no "type".
    """
    ioloop = asyncio.new_event_loop()
    asyncio.set_event_loop(ioloop)
    # Initialize Kubernetes client
    if in_cluster:  # pragma: no cover
        config.load_incluster_config()
    else:
        ioloop.run_until_complete(config.load_kube_config(context=context))
    logger.logger.info(
        f"Starting kube-notify {kube_notify.__version__} at {kube_notify.STARTUP_TIME}"
    )
    # The variable is optional: .get() avoids a KeyError when it is unset.
    logger.logger.info(f"PYTHONUNBUFFERED={os.environ.get('PYTHONUNBUFFERED')}")

    events = kube_notify_config["events"]
    tasks = []

    # "coreApiEvents" may be missing or null; treat both as "not enabled".
    core_api_events = events.get("coreApiEvents") or {}
    if core_api_events.get("enabled"):
        logger.logger.info("Creating watcher for coreApiEvents")
        tasks.append(
            asyncio.ensure_future(
                core_stream.core_stream(kube_notify_config, iterate), loop=ioloop
            )
        )

    # One watcher per (custom resource, namespace) pair.
    for index, crd in enumerate(events.get("customResources") or []):
        if not crd.get("type"):  # pragma: no cover
            # crd configuration is missing "type"
            error = f"Couldn't get CRD type from 'customResources' at index {index}"
            logger.logger.error(error)
            raise ValueError(error)
        # Without an explicit namespace list a single watcher is created
        # with namespace=None.
        for namespace in crd.get("namespaces", [None]):
            logger.logger.info(
                f"Creating watcher for crd {crd.get('type')} {namespace}"
            )
            tasks.append(
                asyncio.ensure_future(
                    crds_stream.crds_stream(
                        crd, namespace, kube_notify_config, iterate
                    ),
                    loop=ioloop,
                )
            )

    try:
        if tasks:
            done, _pending = ioloop.run_until_complete(asyncio.wait(tasks))
            # asyncio.wait() never re-raises task failures, so surface them
            # here instead of leaving them unretrieved.
            for task in done:
                if task.cancelled():
                    continue
                failure = task.exception()
                if failure is not None:
                    logger.logger.error(f"[Error] Ignoring :{failure}")
        else:
            # asyncio.wait() raises ValueError on an empty collection.
            logger.logger.info("No watchers enabled, nothing to run")
    except Exception as e:
        # Format the exception; "str" + Exception would raise TypeError.
        logger.logger.error(f"[Error] Ignoring :{e}")
    finally:
        ioloop.run_until_complete(ioloop.shutdown_asyncgens())
        ioloop.close()

63 

64 

def main() -> None:  # pragma: no cover
    """Parse the command line, load the configuration and start the loop."""
    cli_args = kube_notify.parser.parse_args()
    loaded_config = misc.load_kube_notify_config(cli_args.config)
    start_kube_notify_loop(
        loaded_config,
        cli_args.inCluster,
        cli_args.context,
    )

69 

70 

# Run the command-line entry point only when this module is executed as a
# script, not when it is imported.
if __name__ == "__main__":  # pragma: no cover
    main()