ame-validate-3-0-upgrade
validate_3_0_upgrade.py
#!/usr/bin/env python3
#
# Version 1.3
# Copyright © Datapunctum AG 2024
#
# CONFIDENTIAL - Use or disclosure of this material in whole or in part
# without a valid written license from Datapunctum AG is PROHIBITED.
#
import argparse
import requests
import re
import getpass
from base64 import b64encode
requests.packages.urllib3.disable_warnings()
COLLECTION_DATA_URL = "/servicesNS/nobody/alert_manager_enterprise/storage/collections/data/"
COLLECTION_URL = "/servicesNS/nobody/alert_manager_enterprise/storage/collections/config?output_mode=json&count=0"
LEVELS = ["low", "medium", "high"]
HAS_ERROR_FOUND = False
class bcolors:
    HEADER = "\033[95m"
    OKGREEN = "\033[92m"
    OKCYAN = "\033[96m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"
    ENDC = "\033[0m"
    BOLD = "\033[1m"
ILLEGAL_CHARS = re.compile(r"^[^<>:;{}\[\],'\"\\\.]*$")
def validate_item_name(item, item_type):
    if not ILLEGAL_CHARS.match(item["name"]):
        print_error(f"{item_type} \"{item['name']}\" has illegal characters." + "Name must not contain < > : ; { } [ ] , ' \" \\ .")
        return True
    return False
def print_error(error):
    global HAS_ERROR_FOUND
    print(bcolors.FAIL, error, bcolors.ENDC)
    HAS_ERROR_FOUND = True
def prepare_headers(username, password, token):
    headers = {"Content-Type": "application/json", "Accept": "application/json"}
    if token:
        headers["Authorization"] = f"Splunk {token}"
    else:
        headers["Authorization"] = f'Basic {b64encode(f"{username}:{password}".encode()).decode()}'
    return headers
def fetch_collection(url, username, password, token, collection, arguments="", skip = 0):
    headers = prepare_headers(username, password, token)
    arguments_copy = arguments
    if arguments.strip() != "":
        arguments_copy += f"&skip={skip}"
    else:
        arguments_copy = f"?skip={skip}"
    url_copy = url + COLLECTION_DATA_URL + collection + arguments_copy
    response = requests.get(url_copy, headers=headers, verify=False)
    response.raise_for_status()
    active_elements = [element for element in response.json() if element.get("deleted", 0) == 0]
    skip_next = len(active_elements)
    further_active_elements = []
    if skip_next > 0:
        further_active_elements = fetch_collection(url, username, password, token, collection, arguments, skip + skip_next)
    return active_elements + further_active_elements
def find_event_collections(url, username, password, token):
    headers = prepare_headers(username, password, token)
    response = requests.get(url + COLLECTION_URL, headers=headers, verify=False)
    response.raise_for_status()
    collection_names = [collection["name"] for collection in response.json()["entry"]]
    return [collection_name for collection_name in collection_names if collection_name.startswith("ame_") and collection_name.endswith("_events")]
def validate_status_options(status_options):
    has_error = False
    for status_option in status_options:
        for required_key in ["name"]:
            if required_key not in status_option or not status_option[required_key]:
                has_error = True
                print_error(f"Status option \"{status_option['name']}\" is missing required key \"{required_key}\"")
        has_error_in_name = validate_item_name(status_option, "Status option")
        has_error = has_error or has_error_in_name
    if not has_error:
        print(bcolors.OKCYAN, "No known inconsistencies found in status options. Ready for upgrade.", bcolors.ENDC)
def validate_notification_schemes(notification_schemes):
    has_error = False
    for notification_scheme in notification_schemes:
        for required_key in ["name", "tenantlist", "status_map"]:
            if required_key not in notification_scheme or not notification_scheme[required_key]:
                has_error = True
                print_error(f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" is missing required key \"{required_key}\"")
        if not isinstance(notification_scheme["tenantlist"], list):
            has_error = True
            print_error(
                f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"tenantlist\". Value must be a list"
            )
        has_error_in_name = validate_item_name(notification_scheme, "Notification scheme")
        has_error = has_error or has_error_in_name
        for status, status_map in notification_scheme["status_map"].items():
            for status_map_entry in status_map:
                for required_key in ["action", "channel", "info"]:
                    if required_key not in status_map_entry or not status_map_entry[required_key]:
                        has_error = True
                        print_error(
                            f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" is missing required key \"{required_key}\" for status \"{status}\""
                        )
                if not isinstance(status_map_entry["info"], dict):
                    has_error = True
                    print_error(
                        f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"info\" for status \"{status}\". Value must be a dictionary"
                    )
                if not isinstance(status_map_entry["channel"], str):
                    has_error = True
                    print_error(
                        f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"channel\" for status \"{status}\". Value must be a string"
                    )
                if not isinstance(status_map_entry["action"], str):
                    has_error = True
                    print_error(
                        f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"action\" for status \"{status}\". Value must be a string"
                    )
    if not has_error:
        print(bcolors.OKCYAN, "No known inconsistencies found in notification schemes. Ready for upgrade.", bcolors.ENDC)
def validate_templates(templates):
    has_error = False
    for template in templates:
        for required_key in ["name", "tenant_uid", "impact", "urgency", "default_assignee", "notifications", "status", "time_to_auto_resolve"]:
            if required_key not in template or not template[required_key]:
                has_error = True
                print_error(f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" is missing required key \"{required_key}\"")
        if template["impact"] not in LEVELS:
            has_error = True
            print_error(
                f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid impact level \"{template['impact']}\". Valid levels are low, medium, high"
            )
        if template["urgency"] not in LEVELS:
            has_error = True
            print_error(
                f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid urgency level \"{template['urgency']}\". Valid levels are low, medium, high"
            )
        for bool_key in ["append_alert", "notify_on_append"]:
            if bool_key not in template:
                has_error = True
                print_error(
                    f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has a missing value for key \"{bool_key}\". Try adding a value by updating the template in the UI"
                )
            elif not isinstance(template[bool_key], bool):
                has_error = True
                print_error(
                    f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"{bool_key}\". Value must be a boolean but was \"{template.get(bool_key, 'missing')}\" of type \"{type(template.get(bool_key, 'missing'))}\""
                )
        if not isinstance(template["time_to_auto_resolve"], int):
            has_error = True
            print_error(
                f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"time_to_auto_resolve\". Value must be an integer but was \"{template.get('time_to_auto_resolve', 'missing')}\" of type \"{type(template.get('time_to_auto_resolve', 'missing'))}\""
            )
        for list_key in ["notable_fields", "tags"]:
            if list_key not in template or not isinstance(template[list_key], list):
                has_error = True
                print_error(
                    f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"{list_key}\". Value must be a list but was \"{template.get(list_key, 'missing')}\" of type \"{type(template.get(list_key, 'missing'))}\""
                )
        for str_key in ["name", "tenant_uid", "default_assignee", "notifications", "status"]:
            if not isinstance(template[str_key], str):
                has_error = True
                print_error(
                    f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"{str_key}\". Value must be a string but was \"{template.get(str_key, 'missing')}\" of type \"{type(template.get(str_key, 'missing'))}\""
                )
        has_error_in_name = validate_item_name(template, "Template")
        has_error = has_error or has_error_in_name
    if not has_error:
        print(bcolors.OKCYAN, "No known inconsistencies found in templates. Ready for upgrade.", bcolors.ENDC)
def validate_rules(rules):
    has_error = False
    for rule in rules:
        for required_key in ["name", "type", "tenant_uid", "set_status", "value", "from", "to"]:
            if required_key not in rule or not rule[required_key]:
                has_error = True
                print_error(f"Rule \"{rule['name']}\" in tenant \"{rule.get('tenant_uid', 'unknown')}\"  is missing required key \"{required_key}\"")
        if rule["type"] == "multi":
            has_error = True
            print(f"Multi rules are not supported in AME 3.0. Remove the rule \"{rule['name']}\" and create a new rule after the upgrade")
            continue
        for required_single_key in ["field", "comparator", "value"]:
            if required_single_key not in rule or not rule[required_single_key]:
                has_error = True
                print_error(f"Rule \"{rule['name']}\" is missing required key {required_single_key}")
        if rule["comparator"] not in ["eq", "neq", "gt", "lt", "gte", "lte"]:
            has_error = True
            print_error(f"Rule \"{rule['name']}\" in tenant \"{rule.get('tenant_uid', 'unknown')}\" has an invalid comparator \"{rule['comparator']}\". Valid comparators are lt, lte, gt, gte, eq, ne")
        has_error_in_name = validate_item_name(rule, "Rule")
        has_error = has_error or has_error_in_name
    if not has_error:
        print(bcolors.OKCYAN, "No known inconsistencies found in rules. Ready for upgrade.", bcolors.ENDC)
def validate_tenants(tenants):
    has_error = False
    for tenant in tenants:
        for required_key in ["name", "tenant_uid"]:
            if required_key not in tenant or not tenant[required_key]:
                raise Exception(f"Tenant \"{tenant['name']}\" is missing required key \"{required_key}\"")
        has_error_in_name = validate_item_name(tenant, "Tenant")
        has_error = has_error or has_error_in_name
    if not has_error:
        print(bcolors.OKCYAN, "No known inconsistencies found in tenants. Ready for upgrade.", bcolors.ENDC)
def validate_referential_integrity(status_options, notification_schemes, templates, rules, args, username, password, token):
    has_error = False
    status_option_keys = [status_option["_key"] for status_option in status_options]
    templates_by_tenant = {}
    for template in templates:
        if template["tenant_uid"] not in templates_by_tenant:
            templates_by_tenant[template["tenant_uid"]] = []
        templates_by_tenant[template["tenant_uid"]].append(template["_key"])
    notification_schemes_by_tenant = {}
    for notification_scheme in notification_schemes:
        for tenant_uid in notification_scheme["tenantlist"]:
            if tenant_uid not in notification_schemes_by_tenant:
                notification_schemes_by_tenant[tenant_uid] = []
            notification_schemes_by_tenant[tenant_uid].append(notification_scheme["_key"])
    for rule in rules:
        if rule["set_status"] not in status_option_keys:
            has_error = True
            print_error(
                f"Rule \"{rule['name']}\" in tenant \"{rule['tenant_uid']}\" has a status \"{rule['set_status']}\" that does not exist in the status options collection - configure an existing status"
            )
    for template in templates:
        tempate_tenant_uid = template["tenant_uid"]
        if template["notifications"] not in notification_schemes_by_tenant.get(tempate_tenant_uid, []):
            has_error = True
            print_error(
                f"Template \"{template['name']}\" in tenant \"{template['tenant_uid']}\" has a notification scheme \"{template['notifications']}\" that does not exist in the notification schemes collection for tenant \"{tempate_tenant_uid}\" - configure an existing notification scheme"
            )
        if template["status"] not in status_option_keys:
            has_error = True
            print_error(
                f"Template \"{template['name']}\" in tenant \"{template['tenant_uid']}\" has a status \"{template['status']}\" that does not exist in the status options collection - configure an existing status"
            )
    event_collections = find_event_collections(args.url, username, password, token)
    print("\nFound event collections: ", event_collections)
    for event_collection in event_collections:
        tenant_uid = event_collection.split("_")[1]
        if tenant_uid not in templates_by_tenant:
            has_error = True
            print_error(f'No templates found for tenant "{tenant_uid}". Please create at least one template for each tenant before upgrading to AME 3.0')
        tenant_events = fetch_collection(args.url, username, password, token, event_collection, "?output_mode=json&count=0&fields=_key,status,notifications")
        print(f'Found {len(tenant_events)} events for tenant "{tenant_uid}" in collection "{event_collection}"')
        for event in tenant_events:
            if event["status"] not in status_option_keys:
                has_error = True
                print_error(
                    f"Event \"{event['_key']}\" in tenant \"{tenant_uid}\" has a status \"{event['status']}\" that does not exist in the status options collection - change the status of the event to an existing status"
                )
            if event["notifications"] not in notification_schemes_by_tenant.get(tenant_uid, []):
                has_error = True
                print_error(
                    f"Event \"{event['_key']}\" in tenant \"{tenant_uid}\" has a notification scheme \"{event['notifications']}\" that does not exist in the notification schemes collection for tenant \"{tenant_uid}\" - change the notification scheme of the event to an existing notification scheme"
                )
        print(bcolors.OKCYAN, f'Referential integrity checks for tenant "{tenant_uid}" complete', bcolors.ENDC)
    if not has_error:
        print(bcolors.OKGREEN, "\nReferential integrity checks complete", bcolors.ENDC)
def main():
    parser = argparse.ArgumentParser(description="Validate your AME instance for referential integrity")
    parser.add_argument("--url", help="API URL of the AME instance - including port and protocol", required=True)
    parser.add_argument("--auth-type", help="Wether you want to use username / pw or authentication token", required=True)
    args = parser.parse_args()
    username = None
    password = None
    token = None
    if args.auth_type not in ["token", "password"]:
        print("Invalid auth type. Please use 'token' or 'password'")
        return
    if args.auth_type == "token":
        token = getpass.getpass(prompt="Token for the AME instance: ")
        if not token:
            print("Token is required")
            return
    else:
        username = input("Username for the AME instance: ")
        if not username:
            print("Username is required")
            return
        password = getpass.getpass(prompt="Password for the AME instance: ")
        if not password:
            print("Password is required")
            return
    global HAS_ERROR_FOUND
    status_options = fetch_collection(args.url, username, password, token, "ame_statusoptions")
    notification_schemes = fetch_collection(args.url, username, password, token, "ame_notifications")
    templates = fetch_collection(args.url, username, password, token, "ame_templates")
    rules = fetch_collection(args.url, username, password, token, "ame_rules")
    tenants = fetch_collection(args.url, username, password, token, "ame_tenants")
    print("Fetched data from the AME instance\n")
    validate_status_options(status_options)
    validate_notification_schemes(notification_schemes)
    validate_rules(rules)
    validate_templates(templates)
    validate_tenants(tenants)
    if HAS_ERROR_FOUND:
        print(bcolors.FAIL, "\nInternal consistency checks failed. Please fix them before upgrading to AME 3.0", bcolors.ENDC)
    else:
        print(bcolors.OKGREEN, "\nInternal consistency checks passed", bcolors.ENDC, "\n")
    print("Starting referential integrity checks...")
    validate_referential_integrity(status_options, notification_schemes, templates, rules, args, username, password, token)
    if HAS_ERROR_FOUND:
        print(bcolors.FAIL, "\n\nInconsistencies found. Please fix them before upgrading to AME 3.0", bcolors.ENDC)
    else:
        print("\n\nYou are ready to upgrade to AME 3.0! \nPlease make sure to backup your data before proceeding", bcolors.BOLD, "and consult the upgrade guide!\n\n")
if __name__ == "__main__":
    main()