Coverage for kube_notify/notifications/__init__.py: 88%
40 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-02-07 09:16 +0000
1import kube_notify
2from kube_notify.utils import logger, selectors
4from . import discord, gotify, mattermost
def get_status_icon(event_type: str, fields: dict) -> str:
    """Return the emoji that summarises an event.

    The event type is checked first; if it is not one of the recognised
    types, the ``"Status"`` entry of *fields* decides. Anything that matches
    neither gets the generic bell icon.

    Args:
        event_type: Type string of the event (e.g. ``"Warning"``, ``"Normal"``,
            ``"Error"``, ``"ADDED"``).
        fields: Mapping that may carry a ``"Status"`` key. ``None`` is treated
            as an empty mapping.

    Returns:
        A single emoji string.
    """
    # Event type has priority over whatever status the fields carry.
    if event_type == "Warning":
        return "⚠️"
    if event_type == "Normal":
        return "✅"
    if event_type == "Error":
        return "💥"

    # Look the status up once; tolerate a missing fields mapping.
    status = fields.get("Status") if fields else None

    # Equality comparisons (not a dict lookup) so an unhashable status value
    # simply fails to match instead of raising.
    if status == "Completed":
        return "✅"
    if status == "InProgress":
        return "⏳"
    if status == "PartiallyFailed":
        return "⚠️"
    if status == "Failed":
        return "❌"
    if status in ("New", "{}") or event_type == "ADDED":
        return "🆕"
    return "🔔"
async def handle_notify(
    resource_name: str,
    title: str,
    description: str,
    fields: dict,
    event_info: kube_notify.EventInfo,
    kube_notify_config: dict,
    resource_type: str,
    labels: dict,
    namespace: str,
    involved_object_kind: str = None,
    reason: str = None,
) -> None:
    """Send one event to every matching notification group and log the result.

    The description is prefixed with a status icon. If the event's timestamp
    (``event_info[0]``) is not later than ``kube_notify.STARTUP_TIME`` nothing
    is sent and the log entry is tagged ``Skipping``. Otherwise each group
    under ``kube_notify_config["notifications"]`` is tested with
    ``selectors.check_selector``; for every matching group the configured
    ``discord``, ``gotify`` and ``mattermost`` targets are called. The log
    entry lists every ``<group>/<channel>`` that was called, or ``Excluded``
    when no group matched.

    Args:
        resource_name: Name of the resource, passed to the selector check.
        title: Notification title.
        description: Notification body; the status icon is prepended.
        fields: Extra key/value pairs forwarded to each channel.
        event_info: Indexable value whose first item provides ``timestamp()``
            (presumably a datetime — confirm against ``kube_notify.EventInfo``).
        kube_notify_config: Configuration mapping; only its ``notifications``
            entry is read here.
        resource_type: Resource type, passed to the selector check and to
            ``get_status_icon``.
        labels: Labels passed to the selector check.
        namespace: Namespace passed to the selector check.
        involved_object_kind: Optional value passed to the selector check.
        reason: Optional value passed to the selector check.
    """
    notifs = ["Skipping"]
    # NOTE(review): get_status_icon names its first parameter ``event_type``
    # but receives ``resource_type`` here — confirm this is intended.
    status_icon = get_status_icon(resource_type, fields)
    description = f"[{status_icon}] {description}"

    if event_info[0].timestamp() > kube_notify.STARTUP_TIME.timestamp():
        # Accumulate destinations across ALL matching groups. Resetting the
        # list per group would make the log show only the last matching one.
        sent = []
        matched = False
        # ``or {}`` also covers a ``notifications:`` key that is present but null.
        groups = kube_notify_config.get("notifications") or {}
        for group_name, group in groups.items():
            if not selectors.check_selector(
                resource_name,
                group.get("resources"),
                resource_type,
                labels,
                namespace,
                involved_object_kind,
                reason,
            ):
                continue
            matched = True
            if group_values := group.get("discord"):
                sent.append(f"{group_name}/discord")
                discord.send_discord_webhook(
                    group_values["webhook"],
                    title,
                    description,
                    fields,
                    group_values.get("username"),
                    group_values.get("avatar_url"),
                )
            if group_values := group.get("gotify"):
                sent.append(f"{group_name}/gotify")
                gotify.send_gotify_message(
                    group_values["url"],
                    group_values["token"],
                    title,
                    description,
                    fields,
                )
            if group_values := group.get("mattermost"):
                sent.append(f"{group_name}/mattermost")
                mattermost.send_mattermost_message(
                    group_values["webhook"],
                    title,
                    description,
                    fields,
                    group_values.get("channel"),
                    group_values.get("username"),
                    group_values.get("icon_url"),
                )
        # A matching group with no channel configured logs an empty list,
        # which stays distinguishable from "no group matched".
        notifs = sent if matched else ["Excluded"]

    logger.logger.info(f"{event_info[0]} [{','.join(notifs)}] {description}")