Coverage for kube_notify/notifications/__init__.py: 88%

40 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-02-07 09:16 +0000

1import kube_notify 

2from kube_notify.utils import logger, selectors 

3 

4from . import discord, gotify, mattermost 

5 

6 

7def get_status_icon(event_type: str, fields: dict) -> str: 

8 if event_type == "Warning": 

9 return "⚠️" 

10 if event_type == "Normal": 

11 return "✅" 

12 if event_type == "Error": 

13 return "💥" 

14 if fields.get("Status") == "Completed": 

15 return "✅" 

16 if fields.get("Status") == "InProgress": 

17 return "⏳" 

18 if fields.get("Status") == "PartiallyFailed": 

19 return "⚠️" 

20 if fields.get("Status") == "Failed": 

21 return "❌" 

22 if fields.get("Status") in ["New", "{}"] or event_type == "ADDED": 

23 return "🆕" 

24 return "🔔" 

25 

26 

27async def handle_notify( 

28 resource_name: str, 

29 title: str, 

30 description: str, 

31 fields: dict, 

32 event_info: kube_notify.EventInfo, 

33 kube_notify_config: dict, 

34 resource_type: str, 

35 labels: dict, 

36 namespace: str, 

37 involved_object_kind: str = None, 

38 reason: str = None, 

39) -> None: 

40 notifs = ["Skipping"] 

41 status_icon = get_status_icon(resource_type, fields) 

42 description = f"[{status_icon}] {description}" 

43 if event_info[0].timestamp() > kube_notify.STARTUP_TIME.timestamp(): 

44 # Send notification 

45 notifs = ["Excluded"] 

46 for group_name, group in kube_notify_config.get("notifications", {}).items(): 

47 if selectors.check_selector( 

48 resource_name, 

49 group.get("resources"), 

50 resource_type, 

51 labels, 

52 namespace, 

53 involved_object_kind, 

54 reason, 

55 ): 

56 notifs = [] 

57 if group_values := group.get("discord"): 

58 notifs.append(f"{group_name}/discord") 

59 discord.send_discord_webhook( 

60 group_values["webhook"], 

61 title, 

62 description, 

63 fields, 

64 group_values.get("username"), 

65 group_values.get("avatar_url"), 

66 ) 

67 if group_values := group.get("gotify"): 

68 notifs.append(f"{group_name}/gotify") 

69 gotify.send_gotify_message( 

70 group_values["url"], 

71 group_values["token"], 

72 title, 

73 description, 

74 fields, 

75 ) 

76 if group_values := group.get("mattermost"): 

77 notifs.append(f"{group_name}/mattermost") 

78 mattermost.send_mattermost_message( 

79 group_values["webhook"], 

80 title, 

81 description, 

82 fields, 

83 group_values.get("channel"), 

84 group_values.get("username"), 

85 group_values.get("icon_url"), 

86 ) 

87 logger.logger.info(f"{event_info[0]} [{','.join(notifs)}] {description}")