Coverage for kube_notify/utils/misc.py: 35%
23 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
1import datetime
3import yaml
def process_last_timestamp(obj: dict, crd: dict) -> datetime.datetime:
    """Return the last timestamp of an event, parsed from ``obj``.

    ``crd["lastTimestamp"]`` lists one or more dot-separated key paths into
    ``obj``, separated by ``|`` and given by order of priority. The first path
    that resolves to a truthy value is parsed with the format
    ``%Y-%m-%dT%H:%M:%SZ`` and returned.

    Args:
        obj: The object (nested dicts) to read the timestamp from.
        crd: The resource configuration holding the ``lastTimestamp`` paths.

    Returns:
        The parsed (naive) ``datetime.datetime``.

    Raises:
        ValueError: If no configured path yields a value, or the value found
            does not match the expected timestamp format.
    """
    # A missing "lastTimestamp" setting is handled like an empty path list.
    paths = crd.get("lastTimestamp") or ""
    for yaml_path in paths.split("|"):
        value = obj
        for key in yaml_path.strip(" ").split("."):
            if not isinstance(value, dict):
                # An intermediate None/scalar means this path does not exist;
                # give up on it so the next fallback path can be tried.
                value = None
                break
            value = value.get(key)
        if value:
            return datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
    raise ValueError(f"no timestamp found in object for paths {paths!r}")
def add_fields_to_the_message(obj: dict, crd: dict) -> dict[str, str]:
    """Collect the extra message fields configured in ``crd["includeFields"]``.

    ``includeFields`` maps a field name to a dot-separated key path into
    ``obj``. Each path is resolved and the result is converted with ``str``.
    A path that cannot be resolved (absent key, or a non-dict value partway
    along the path) yields ``"{}"``.

    Args:
        obj: The object (nested dicts) to read the values from.
        crd: The resource configuration that may define ``includeFields``.

    Returns:
        A dict mapping every configured field name to its stringified value;
        empty when ``includeFields`` is absent or null.
    """
    fields = {}
    # "or {}" also covers an "includeFields" key explicitly set to null.
    for key, yaml_path in (crd.get("includeFields") or {}).items():
        field = obj
        for yaml_key in yaml_path.split("."):
            if not isinstance(field, dict):
                # Cannot descend into None/scalars: treat the path as missing,
                # the same way an absent key is handled.
                field = {}
                break
            field = field.get(yaml_key, {})
        fields[key] = str(field)
    return fields
def load_kube_notify_config(config_path: str) -> dict:
    """Load the kube-notify configuration from a YAML file.

    Args:
        config_path: Path of the YAML configuration file.

    Returns:
        The document parsed by ``yaml.safe_load``.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, so the same file parses identically on every host.
    with open(config_path, encoding="utf-8") as f:
        return yaml.safe_load(f)