Version: 3.3.0


#!/usr/bin/env python3
# Version 1.3
# Copyright © Datapunctum AG 2024
# CONFIDENTIAL - Use or disclosure of this material in whole or in part
# without a valid written license from Datapunctum AG is PROHIBITED.

import argparse
import requests
import re
import getpass

from base64 import b64encode


COLLECTION_DATA_URL = "/servicesNS/nobody/alert_manager_enterprise/storage/collections/data/"
COLLECTION_URL = "/servicesNS/nobody/alert_manager_enterprise/storage/collections/config?output_mode=json&count=0"

LEVELS = ["low", "medium", "high"]

class bcolors:
HEADER = "\033[95m"
OKGREEN = "\033[92m"
OKCYAN = "\033[96m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"

ILLEGAL_CHARS = re.compile(r"^[^<>:;{}\[\],'\"\\\.]*$")

def validate_item_name(item, item_type):
if not ILLEGAL_CHARS.match(item["name"]):
print_error(f"{item_type} \"{item['name']}\" has illegal characters." + "Name must not contain < > : ; { } [ ] , ' \" \\ .")
return True
return False

def print_error(error):

print(bcolors.FAIL, error, bcolors.ENDC)

def prepare_headers(username, password, token):
headers = {"Content-Type": "application/json", "Accept": "application/json"}

if token:
headers["Authorization"] = f"Splunk {token}"
headers["Authorization"] = f'Basic {b64encode(f"{username}:{password}".encode()).decode()}'

return headers

def fetch_collection(url, username, password, token, collection, arguments="", skip = 0):
headers = prepare_headers(username, password, token)
arguments_copy = arguments
if arguments.strip() != "":
arguments_copy += f"&skip={skip}"
arguments_copy = f"?skip={skip}"
url_copy = url + COLLECTION_DATA_URL + collection + arguments_copy
response = requests.get(url_copy, headers=headers, verify=False)

active_elements = [element for element in response.json() if element.get("deleted", 0) == 0]
skip_next = len(active_elements)
further_active_elements = []
if skip_next > 0:
further_active_elements = fetch_collection(url, username, password, token, collection, arguments, skip + skip_next)
return active_elements + further_active_elements

def find_event_collections(url, username, password, token):
headers = prepare_headers(username, password, token)
response = requests.get(url + COLLECTION_URL, headers=headers, verify=False)

collection_names = [collection["name"] for collection in response.json()["entry"]]
return [collection_name for collection_name in collection_names if collection_name.startswith("ame_") and collection_name.endswith("_events")]

def validate_status_options(status_options):
has_error = False

for status_option in status_options:
for required_key in ["name"]:
if required_key not in status_option or not status_option[required_key]:
has_error = True
print_error(f"Status option \"{status_option['name']}\" is missing required key \"{required_key}\"")

has_error_in_name = validate_item_name(status_option, "Status option")
has_error = has_error or has_error_in_name

if not has_error:
print(bcolors.OKCYAN, "No known inconsistencies found in status options. Ready for upgrade.", bcolors.ENDC)

def validate_notification_schemes(notification_schemes):
has_error = False

for notification_scheme in notification_schemes:
for required_key in ["name", "tenantlist", "status_map"]:
if required_key not in notification_scheme or not notification_scheme[required_key]:
has_error = True
print_error(f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" is missing required key \"{required_key}\"")

if not isinstance(notification_scheme["tenantlist"], list):
has_error = True
f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"tenantlist\". Value must be a list"

has_error_in_name = validate_item_name(notification_scheme, "Notification scheme")
has_error = has_error or has_error_in_name

for status, status_map in notification_scheme["status_map"].items():
for status_map_entry in status_map:
for required_key in ["action", "channel", "info"]:
if required_key not in status_map_entry or not status_map_entry[required_key]:
has_error = True
f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" is missing required key \"{required_key}\" for status \"{status}\""

if not isinstance(status_map_entry["info"], dict):
has_error = True
f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"info\" for status \"{status}\". Value must be a dictionary"

if not isinstance(status_map_entry["channel"], str):
has_error = True
f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"channel\" for status \"{status}\". Value must be a string"

if not isinstance(status_map_entry["action"], str):
has_error = True
f"Notification scheme \"{notification_scheme['name']}\" for tenants \"{notification_scheme.get('tenantlist', 'unknown')}\" has an invalid value for key \"action\" for status \"{status}\". Value must be a string"

if not has_error:
print(bcolors.OKCYAN, "No known inconsistencies found in notification schemes. Ready for upgrade.", bcolors.ENDC)

def validate_templates(templates):
has_error = False

for template in templates:
for required_key in ["name", "tenant_uid", "impact", "urgency", "default_assignee", "notifications", "status", "time_to_auto_resolve"]:
if required_key not in template or not template[required_key]:
has_error = True
print_error(f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" is missing required key \"{required_key}\"")

if template["impact"] not in LEVELS:
has_error = True
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid impact level \"{template['impact']}\". Valid levels are low, medium, high"

if template["urgency"] not in LEVELS:
has_error = True
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid urgency level \"{template['urgency']}\". Valid levels are low, medium, high"

for bool_key in ["append_alert", "notify_on_append"]:
if bool_key not in template:
has_error = True
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has a missing value for key \"{bool_key}\". Try adding a value by updating the template in the UI"
elif not isinstance(template[bool_key], bool):
has_error = True
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"{bool_key}\". Value must be a boolean but was \"{template.get(bool_key, 'missing')}\" of type \"{type(template.get(bool_key, 'missing'))}\""

if not isinstance(template["time_to_auto_resolve"], int):
has_error = True
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"time_to_auto_resolve\". Value must be an integer but was \"{template.get('time_to_auto_resolve', 'missing')}\" of type \"{type(template.get('time_to_auto_resolve', 'missing'))}\""

for list_key in ["notable_fields", "tags"]:
if list_key not in template or not isinstance(template[list_key], list):
has_error = True
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"{list_key}\". Value must be a list but was \"{template.get(list_key, 'missing')}\" of type \"{type(template.get(list_key, 'missing'))}\""

for str_key in ["name", "tenant_uid", "default_assignee", "notifications", "status"]:
if not isinstance(template[str_key], str):
has_error = True
f"Template \"{template['name']}\" in tenant \"{template.get('tenant_uid', 'unknown')}\" has an invalid value for key \"{str_key}\". Value must be a string but was \"{template.get(str_key, 'missing')}\" of type \"{type(template.get(str_key, 'missing'))}\""

has_error_in_name = validate_item_name(template, "Template")
has_error = has_error or has_error_in_name

if not has_error:
print(bcolors.OKCYAN, "No known inconsistencies found in templates. Ready for upgrade.", bcolors.ENDC)

def validate_rules(rules):
has_error = False

for rule in rules:
for required_key in ["name", "type", "tenant_uid", "set_status", "value", "from", "to"]:
if required_key not in rule or not rule[required_key]:
has_error = True
print_error(f"Rule \"{rule['name']}\" in tenant \"{rule.get('tenant_uid', 'unknown')}\" is missing required key \"{required_key}\"")

if rule["type"] == "multi":
has_error = True
print(f"Multi rules are not supported in AME 3.0. Remove the rule \"{rule['name']}\" and create a new rule after the upgrade")

for required_single_key in ["field", "comparator", "value"]:
if required_single_key not in rule or not rule[required_single_key]:
has_error = True
print_error(f"Rule \"{rule['name']}\" is missing required key {required_single_key}")

if rule["comparator"] not in ["eq", "neq", "gt", "lt", "gte", "lte"]:
has_error = True
print_error(f"Rule \"{rule['name']}\" in tenant \"{rule.get('tenant_uid', 'unknown')}\" has an invalid comparator \"{rule['comparator']}\". Valid comparators are lt, lte, gt, gte, eq, ne")

has_error_in_name = validate_item_name(rule, "Rule")
has_error = has_error or has_error_in_name

if not has_error:
print(bcolors.OKCYAN, "No known inconsistencies found in rules. Ready for upgrade.", bcolors.ENDC)

def validate_tenants(tenants):
has_error = False

for tenant in tenants:
for required_key in ["name", "tenant_uid"]:
if required_key not in tenant or not tenant[required_key]:
raise Exception(f"Tenant \"{tenant['name']}\" is missing required key \"{required_key}\"")

has_error_in_name = validate_item_name(tenant, "Tenant")
has_error = has_error or has_error_in_name

if not has_error:
print(bcolors.OKCYAN, "No known inconsistencies found in tenants. Ready for upgrade.", bcolors.ENDC)

def validate_referential_integrity(status_options, notification_schemes, templates, rules, args, username, password, token):
has_error = False

status_option_keys = [status_option["_key"] for status_option in status_options]
templates_by_tenant = {}
for template in templates:
if template["tenant_uid"] not in templates_by_tenant:
templates_by_tenant[template["tenant_uid"]] = []

notification_schemes_by_tenant = {}
for notification_scheme in notification_schemes:
for tenant_uid in notification_scheme["tenantlist"]:
if tenant_uid not in notification_schemes_by_tenant:
notification_schemes_by_tenant[tenant_uid] = []

for rule in rules:
if rule["set_status"] not in status_option_keys:
has_error = True
f"Rule \"{rule['name']}\" in tenant \"{rule['tenant_uid']}\" has a status \"{rule['set_status']}\" that does not exist in the status options collection - configure an existing status"

for template in templates:
tempate_tenant_uid = template["tenant_uid"]
if template["notifications"] not in notification_schemes_by_tenant.get(tempate_tenant_uid, []):
has_error = True
f"Template \"{template['name']}\" in tenant \"{template['tenant_uid']}\" has a notification scheme \"{template['notifications']}\" that does not exist in the notification schemes collection for tenant \"{tempate_tenant_uid}\" - configure an existing notification scheme"

if template["status"] not in status_option_keys:
has_error = True
f"Template \"{template['name']}\" in tenant \"{template['tenant_uid']}\" has a status \"{template['status']}\" that does not exist in the status options collection - configure an existing status"

event_collections = find_event_collections(args.url, username, password, token)
print("\nFound event collections: ", event_collections)

for event_collection in event_collections:
tenant_uid = event_collection.split("_")[1]
if tenant_uid not in templates_by_tenant:
has_error = True
print_error(f'No templates found for tenant "{tenant_uid}". Please create at least one template for each tenant before upgrading to AME 3.0')

tenant_events = fetch_collection(args.url, username, password, token, event_collection, "?output_mode=json&count=0&fields=_key,status,notifications")
print(f'Found {len(tenant_events)} events for tenant "{tenant_uid}" in collection "{event_collection}"')

for event in tenant_events:
if event["status"] not in status_option_keys:
has_error = True
f"Event \"{event['_key']}\" in tenant \"{tenant_uid}\" has a status \"{event['status']}\" that does not exist in the status options collection - change the status of the event to an existing status"

if event["notifications"] not in notification_schemes_by_tenant.get(tenant_uid, []):
has_error = True
f"Event \"{event['_key']}\" in tenant \"{tenant_uid}\" has a notification scheme \"{event['notifications']}\" that does not exist in the notification schemes collection for tenant \"{tenant_uid}\" - change the notification scheme of the event to an existing notification scheme"

print(bcolors.OKCYAN, f'Referential integrity checks for tenant "{tenant_uid}" complete', bcolors.ENDC)

if not has_error:
print(bcolors.OKGREEN, "\nReferential integrity checks complete", bcolors.ENDC)

def main():
parser = argparse.ArgumentParser(description="Validate your AME instance for referential integrity")
parser.add_argument("--url", help="API URL of the AME instance - including port and protocol", required=True)
parser.add_argument("--auth-type", help="Wether you want to use username / pw or authentication token", required=True)
args = parser.parse_args()

username = None
password = None
token = None

if args.auth_type not in ["token", "password"]:
print("Invalid auth type. Please use 'token' or 'password'")

if args.auth_type == "token":
token = getpass.getpass(prompt="Token for the AME instance: ")
if not token:
print("Token is required")
username = input("Username for the AME instance: ")
if not username:
print("Username is required")

password = getpass.getpass(prompt="Password for the AME instance: ")
if not password:
print("Password is required")

status_options = fetch_collection(args.url, username, password, token, "ame_statusoptions")
notification_schemes = fetch_collection(args.url, username, password, token, "ame_notifications")
templates = fetch_collection(args.url, username, password, token, "ame_templates")
rules = fetch_collection(args.url, username, password, token, "ame_rules")
tenants = fetch_collection(args.url, username, password, token, "ame_tenants")
print("Fetched data from the AME instance\n")


print(bcolors.FAIL, "\nInternal consistency checks failed. Please fix them before upgrading to AME 3.0", bcolors.ENDC)
print(bcolors.OKGREEN, "\nInternal consistency checks passed", bcolors.ENDC, "\n")

print("Starting referential integrity checks...")
validate_referential_integrity(status_options, notification_schemes, templates, rules, args, username, password, token)

print(bcolors.FAIL, "\n\nInconsistencies found. Please fix them before upgrading to AME 3.0", bcolors.ENDC)
print("\n\nYou are ready to upgrade to AME 3.0! \nPlease make sure to backup your data before proceeding", bcolors.BOLD, "and consult the upgrade guide!\n\n")

if __name__ == "__main__":