Skip to main content
Version: Next

ame-validate-3-0-upgrade

validate_3_0_upgrade.py

#!/usr/bin/env python3
#
# Version 1.3
# Copyright © Datapunctum AG 2024
#
# CONFIDENTIAL - Use or disclosure of this material in whole or in part
# without a valid written license from Datapunctum AG is PROHIBITED.
#


import argparse
import requests
import re
import getpass

from base64 import b64encode

requests.packages.urllib3.disable_warnings()

COLLECTION_DATA_URL = "/servicesNS/nobody/alert_manager_enterprise/storage/collections/data/"
COLLECTION_URL = "/servicesNS/nobody/alert_manager_enterprise/storage/collections/config?output_mode=json&count=0"

LEVELS = ["low", "medium", "high"]
HAS_ERROR_FOUND = False


class bcolors:
HEADER = "\033[95m"
OKGREEN = "\033[92m"
OKCYAN = "\033[96m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"


ILLEGAL_CHARS = re.compile(r"^[^<>:;{}\[\],'\"\\\.]*$")


def validate_item_name(item, item_type):
if not ILLEGAL_CHARS.match(item["name"]):
print_error(f"{item_type} \"{item['name']}\" has illegal characters." + "Name must not contain < > : ; { } [ ] , ' \" \\ .")
return True
return False


def print_error(error):
global HAS_ERROR_FOUND

print(bcolors.FAIL, error, bcolors.ENDC)
HAS_ERROR_FOUND = True


def prepare_headers(username, password, token):
headers = {"Content-Type": "application/json", "Accept": "application/json"}

if token:
headers["Authorization"] = f"Splunk {token}"
else:
headers["Authorization"] = f'Basic {b64encode(f"{username}:{password}".encode()).decode()}'

return headers


def fetch_collection(url, username, password, token, collection, arguments="", skip = 0):
headers = prepare_headers(username, password, token)
arguments_copy = arguments
if arguments.strip() != "":
arguments_copy += f"&skip={skip}"
else:
arguments_copy = f"?skip={skip}"
url_copy = url + COLLECTION_DATA_URL + collection + arguments_copy
response = requests.get(url_copy, headers=headers, verify=False)
response.raise_for_status()

active_elements = [element for element in response.json() if element.get("deleted", 0) == 0]
skip_next = len(active_elements)
further_active_elements = []
if skip_next > 0:
further_active_elements = fetch_collection(url, username, password, token, collection, arguments, skip + skip_next)
return active_elements + further_active_elements



def find_event_collections(url, username, password, token):
headers = prepare_headers(username, password, token)
response = requests.get(url + COLLECTION_URL, headers=headers, verify=False)
response.raise_for_status()

collection_names = [collection["name"] for collection in response.json()["entry"]]
return [collection_name for collection_name in collection_names if collection_name.startswith("ame_") and collection_name.endswith("_events")]


def validate_status_options(status_options):
has_error = False

for status_option in status_options:
for required_key in ["name"]:
if required_key not in status_option or not status_option[required_key]:
has_error = True
print_error(f"Status option \"{status_option['name']}\" is missing required key \"{required_key}\"")

has_error_in_name = validate_item_name(status_option, "Status option")
has_error = has_error or has_error_in_name

if not has_error:
print(bcolors.OKCYAN, "No known inconsistencies found in status options. Ready for upgrade.", bcolors.ENDC)


def validate_notification_schemes(notification_schemes):
has_error = False

for notification_scheme in notification_schemes:
for required_key in ["name", "tenantlist", "status_map"]:
if required_key not in notification_scheme or not notification_scheme[required_key]:
has_error = True
print_error(f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" is missing required key \"{required_key}\"")

if not isinstance(notification_scheme["tenantlist"], list):
has_error = True
print_error(
f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"tenantlist\". Value must be a list"
)

has_error_in_name = validate_item_name(notification_scheme, "Notification scheme")
has_error = has_error or has_error_in_name

for status, status_map in notification_scheme["status_map"].items():
for status_map_entry in status_map:
for required_key in ["action", "channel", "info"]:
if required_key not in status_map_entry or not status_map_entry[required_key]:
has_error = True
print_error(
f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" is missing required key \"{required_key}\" for status \"{status}\""
)

if not isinstance(status_map_entry["info"], dict):
has_error = True
print_error(
f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"info\" for status \"{status}\". Value must be a dictionary"
)

if not isinstance(status_map_entry["channel"], str):
has_error = True
print_error(
f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"channel\" for status \"{status}\". Value must be a string"
)

if not isinstance(status_map_entry["action"], str):
has_error = True
print_error(
f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"action\" for status \"{status}\". Value must be a string"
)

if not has_error:
print(bcolors.OKCYAN, "No known inconsistencies found in notification schemes. Ready for upgrade.", bcolors.ENDC)


def validate_templates(templates):
has_error = False

for template in templates:
for required_key in ["name", "tenant_uid", "impact", "urgency", "default_assignee", "notifications", "status", "time_to_auto_resolve"]:
if required_key not in template or not template[required_key]:
has_error = True
print_error(f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" is missing required key \"{required_key}\"")

if template["impact"] not in LEVELS:
has_error = True
print_error(
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid impact level \"{template['impact']}\". Valid levels are low, medium, high"
)

if template["urgency"] not in LEVELS:
has_error = True
print_error(
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid urgency level \"{template['urgency']}\". Valid levels are low, medium, high"
)

for bool_key in ["append_alert", "notify_on_append"]:
if bool_key not in template:
has_error = True
print_error(
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has a missing value for key \"{bool_key}\". Try adding a value by updating the template in the UI"
)
elif not isinstance(template[bool_key], bool):
has_error = True
print_error(
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"{bool_key}\". Value must be a boolean but was \"{template.get(bool_key, 'missing')}\" of type \"{type(template.get(bool_key, 'missing'))}\""
)

if not isinstance(template["time_to_auto_resolve"], int):
has_error = True
print_error(
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"time_to_auto_resolve\". Value must be an integer but was \"{template.get('time_to_auto_resolve', 'missing')}\" of type \"{type(template.get('time_to_auto_resolve', 'missing'))}\""
)

for list_key in ["notable_fields", "tags"]:
if list_key not in template or not isinstance(template[list_key], list):
has_error = True
print_error(
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"{list_key}\". Value must be a list but was \"{template.get(list_key, 'missing')}\" of type \"{type(template.get(list_key, 'missing'))}\""
)

for str_key in ["name", "tenant_uid", "default_assignee", "notifications", "status"]:
if not isinstance(template[str_key], str):
has_error = True
print_error(
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"{str_key}\". Value must be a string but was \"{template.get(str_key, 'missing')}\" of type \"{type(template.get(str_key, 'missing'))}\""
)

has_error_in_name = validate_item_name(template, "Template")
has_error = has_error or has_error_in_name

if not has_error:
print(bcolors.OKCYAN, "No known inconsistencies found in templates. Ready for upgrade.", bcolors.ENDC)


def validate_rules(rules):
has_error = False

for rule in rules:
for required_key in ["name", "type", "tenant_uid", "set_status", "value", "from", "to"]:
if required_key not in rule or not rule[required_key]:
has_error = True
print_error(f"Rule \"{rule['name']}\" in tenant \"{rule.get('tenant_uid', 'unknown')}\" is missing required key \"{required_key}\"")

if rule["type"] == "multi":
has_error = True
print(f"Multi rules are not supported in AME 3.0. Remove the rule \"{rule['name']}\" and create a new rule after the upgrade")
continue

for required_single_key in ["field", "comparator", "value"]:
if required_single_key not in rule or not rule[required_single_key]:
has_error = True
print_error(f"Rule \"{rule['name']}\" is missing required key {required_single_key}")

if rule["comparator"] not in ["eq", "neq", "gt", "lt", "gte", "lte"]:
has_error = True
print_error(f"Rule \"{rule['name']}\" in tenant \"{rule.get('tenant_uid', 'unknown')}\" has an invalid comparator \"{rule['comparator']}\". Valid comparators are lt, lte, gt, gte, eq, ne")

has_error_in_name = validate_item_name(rule, "Rule")
has_error = has_error or has_error_in_name

if not has_error:
print(bcolors.OKCYAN, "No known inconsistencies found in rules. Ready for upgrade.", bcolors.ENDC)


def validate_tenants(tenants):
has_error = False

for tenant in tenants:
for required_key in ["name", "tenant_uid"]:
if required_key not in tenant or not tenant[required_key]:
raise Exception(f"Tenant \"{tenant['name']}\" is missing required key \"{required_key}\"")

has_error_in_name = validate_item_name(tenant, "Tenant")
has_error = has_error or has_error_in_name

if not has_error:
print(bcolors.OKCYAN, "No known inconsistencies found in tenants. Ready for upgrade.", bcolors.ENDC)


def validate_referential_integrity(status_options, notification_schemes, templates, rules, args, username, password, token):
has_error = False

status_option_keys = [status_option["_key"] for status_option in status_options]
templates_by_tenant = {}
for template in templates:
if template["tenant_uid"] not in templates_by_tenant:
templates_by_tenant[template["tenant_uid"]] = []
templates_by_tenant[template["tenant_uid"]].append(template["_key"])

notification_schemes_by_tenant = {}
for notification_scheme in notification_schemes:
for tenant_uid in notification_scheme["tenantlist"]:
if tenant_uid not in notification_schemes_by_tenant:
notification_schemes_by_tenant[tenant_uid] = []
notification_schemes_by_tenant[tenant_uid].append(notification_scheme["_key"])

for rule in rules:
if rule["set_status"] not in status_option_keys:
has_error = True
print_error(
f"Rule \"{rule['name']}\" in tenant \"{rule['tenant_uid']}\" has a status \"{rule['set_status']}\" that does not exist in the status options collection - configure an existing status"
)

for template in templates:
tempate_tenant_uid = template["tenant_uid"]
if template["notifications"] not in notification_schemes_by_tenant.get(tempate_tenant_uid, []):
has_error = True
print_error(
f"Template \"{template['name']}\" in tenant \"{template['tenant_uid']}\" has a notification scheme \"{template['notifications']}\" that does not exist in the notification schemes collection for tenant \"{tempate_tenant_uid}\" - configure an existing notification scheme"
)

if template["status"] not in status_option_keys:
has_error = True
print_error(
f"Template \"{template['name']}\" in tenant \"{template['tenant_uid']}\" has a status \"{template['status']}\" that does not exist in the status options collection - configure an existing status"
)

event_collections = find_event_collections(args.url, username, password, token)
print("\nFound event collections: ", event_collections)

for event_collection in event_collections:
tenant_uid = event_collection.split("_")[1]
if tenant_uid not in templates_by_tenant:
has_error = True
print_error(f'No templates found for tenant "{tenant_uid}". Please create at least one template for each tenant before upgrading to AME 3.0')

tenant_events = fetch_collection(args.url, username, password, token, event_collection, "?output_mode=json&count=0&fields=_key,status,notifications")
print(f'Found {len(tenant_events)} events for tenant "{tenant_uid}" in collection "{event_collection}"')

for event in tenant_events:
if event["status"] not in status_option_keys:
has_error = True
print_error(
f"Event \"{event['_key']}\" in tenant \"{tenant_uid}\" has a status \"{event['status']}\" that does not exist in the status options collection - change the status of the event to an existing status"
)

if event["notifications"] not in notification_schemes_by_tenant.get(tenant_uid, []):
has_error = True
print_error(
f"Event \"{event['_key']}\" in tenant \"{tenant_uid}\" has a notification scheme \"{event['notifications']}\" that does not exist in the notification schemes collection for tenant \"{tenant_uid}\" - change the notification scheme of the event to an existing notification scheme"
)

print(bcolors.OKCYAN, f'Referential integrity checks for tenant "{tenant_uid}" complete', bcolors.ENDC)

if not has_error:
print(bcolors.OKGREEN, "\nReferential integrity checks complete", bcolors.ENDC)


def main():
parser = argparse.ArgumentParser(description="Validate your AME instance for referential integrity")
parser.add_argument("--url", help="API URL of the AME instance - including port and protocol", required=True)
parser.add_argument("--auth-type", help="Wether you want to use username / pw or authentication token", required=True)
args = parser.parse_args()

username = None
password = None
token = None

if args.auth_type not in ["token", "password"]:
print("Invalid auth type. Please use 'token' or 'password'")
return

if args.auth_type == "token":
token = getpass.getpass(prompt="Token for the AME instance: ")
if not token:
print("Token is required")
return
else:
username = input("Username for the AME instance: ")
if not username:
print("Username is required")
return

password = getpass.getpass(prompt="Password for the AME instance: ")
if not password:
print("Password is required")
return

global HAS_ERROR_FOUND
status_options = fetch_collection(args.url, username, password, token, "ame_statusoptions")
notification_schemes = fetch_collection(args.url, username, password, token, "ame_notifications")
templates = fetch_collection(args.url, username, password, token, "ame_templates")
rules = fetch_collection(args.url, username, password, token, "ame_rules")
tenants = fetch_collection(args.url, username, password, token, "ame_tenants")
print("Fetched data from the AME instance\n")

validate_status_options(status_options)
validate_notification_schemes(notification_schemes)
validate_rules(rules)
validate_templates(templates)
validate_tenants(tenants)

if HAS_ERROR_FOUND:
print(bcolors.FAIL, "\nInternal consistency checks failed. Please fix them before upgrading to AME 3.0", bcolors.ENDC)
else:
print(bcolors.OKGREEN, "\nInternal consistency checks passed", bcolors.ENDC, "\n")

print("Starting referential integrity checks...")
validate_referential_integrity(status_options, notification_schemes, templates, rules, args, username, password, token)

if HAS_ERROR_FOUND:
print(bcolors.FAIL, "\n\nInconsistencies found. Please fix them before upgrading to AME 3.0", bcolors.ENDC)
else:
print("\n\nYou are ready to upgrade to AME 3.0! \nPlease make sure to backup your data before proceeding", bcolors.BOLD, "and consult the upgrade guide!\n\n")


if __name__ == "__main__":
main()