Coverage for kube_notify/utils/misc.py: 35%

23 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-02-07 09:16 +0000

1import datetime 

2 

3import yaml 

4 

5 

6def process_last_timestamp(obj: dict, crd: dict) -> datetime.datetime: 

7 # Process last timestamp for event 

8 # can be calculated using different fields if available (by order of priority) 

9 for yaml_path in crd.get("lastTimestamp").split("|"): 

10 value = obj.copy() 

11 for key in yaml_path.strip(" ").split("."): 

12 value = value.get(key, {}) 

13 if value: 

14 last_timestamp = datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ") 

15 break 

16 return last_timestamp 

17 

18 

19def add_fields_to_the_message(obj: dict, crd: dict) -> dict[str, str]: 

20 # add fields to Message 

21 fields = {} 

22 for key, yaml_path in crd.get("includeFields", {}).items(): 

23 field = obj.copy() 

24 for yaml_key in yaml_path.split("."): 

25 field = field.get(yaml_key, {}) 

26 fields[key] = str(field) 

27 return fields 

28 

29 

30def load_kube_notify_config(config_path: str) -> dict: 

31 with open(config_path) as f: 

32 kube_notify_config = yaml.safe_load(f) 

33 return kube_notify_config