#!/usr/bin/env python3
# HMAC Integrity Checker
"""
DNS Subdomain Batch Integrity Checker

This script processes multiple message/HMAC file pairs in a directory, following the pattern:
message_#.txt and message_#.hmac

It automatically detects and verifies all matching pairs in the specified directory,
generating a comprehensive report of integrity issues across all files.

Usage:
 python dns_batch_integrity.py --directory <logs_directory> --output <output_dir>
"""

import hmac
import hashlib
import sys
import os
import re
import argparse
import json
import glob
from datetime import datetime
from typing import Dict, List, Any, Tuple, Set

# The valid signing key
# NOTE(review): a hard-coded secret in source is a security risk — prefer
# loading it from an environment variable or a secrets manager. It is kept
# here because the CLI documents it as the default for --key.
VALID_KEY = 'ciCloud-API-20240315-4f7b9c'

class DNSSubdomainBatchChecker:
    """Batch integrity checker for DNS log message/HMAC file pairs.

    Scans a directory for ``message_<n>.txt`` / ``message_<n>.hmac`` pairs,
    verifies each log line against its HMAC-SHA256 signature, and runs
    tampering heuristics (suspicious TLDs, look-alike subdomains, character
    substitutions, internal IP exposure, DNS record modifications, and
    amplification signatures) over every line that fails verification.
    """

    # Severity ranking so heuristic checks can only escalate — never
    # downgrade — an entry's risk level. (The original code could drop a
    # 'critical' finding back to 'high' when a later check fired.)
    _RISK_ORDER = {'low': 0, 'medium': 1, 'high': 2, 'critical': 3}

    def __init__(self, key: str = VALID_KEY):
        """
        Initialize the DNS Subdomain Integrity Checker.

        Args:
            key: The HMAC signing key, used both to verify provided
                signatures and to regenerate correct ones.
        """
        self.key = key

        # Well-known subdomains: anything within edit distance 2 of one of
        # these (but not identical to it) is flagged as a possible spoof.
        self.common_subdomains = {
            'www', 'mail', 'api', 'admin', 'portal', 'test', 'dev', 'staging',
            'secure', 'vpn', 'internal', 'mx', 'smtp', 'pop', 'imap', 'webmail',
            'remote', 'cdn', 'dns', 'ns1', 'ns2', 'ldap', 'db', 'mysql', 'ftp'
        }

        # Suspicious TLDs often used in attacks.
        self.suspicious_tlds = {
            'xyz', 'top', 'club', 'cyou', 'icu', 'rest', 'space', 'casa',
            'monster', 'bar', 'gq', 'tk', 'ml', 'cf', 'ga'
        }

        # Homoglyph-style character substitutions commonly used in spoofing.
        self.char_substitutions = {
            '0': 'o', 'o': '0',
            '1': 'l', 'l': '1', 'i': '1',
            '5': 's', 's': '5',
            '3': 'e', 'e': '3',
            '4': 'a', 'a': '4',
            '6': 'g', 'g': '6',
            '7': 't', 't': '7',
            '8': 'b', 'b': '8'
        }

    def calculate_hmac(self, message: str) -> str:
        """
        Calculate the HMAC-SHA256 signature for a message.

        Args:
            message: The message to sign.

        Returns:
            The HMAC signature (hex encoded).
        """
        return hmac.new(
            self.key.encode('utf-8'),
            message.encode('utf-8'),
            hashlib.sha256,
        ).hexdigest()

    def verify_hmac(self, message: str, signature: str) -> bool:
        """
        Verify whether a message's HMAC signature is valid.

        Args:
            message: The message to verify.
            signature: The provided HMAC signature (hex encoded).

        Returns:
            True if the signature is valid, False otherwise.
        """
        # Constant-time comparison to prevent timing attacks.
        return hmac.compare_digest(self.calculate_hmac(message), signature)

    def read_file(self, file_path: str) -> List[str]:
        """
        Read a text file and return its lines with trailing whitespace
        (including the newline) stripped.

        Args:
            file_path: Path to the file.

        Returns:
            List of lines from the file.
        """
        # Explicit encoding avoids platform-dependent defaults when logs
        # contain non-ASCII characters.
        with open(file_path, 'r', encoding='utf-8') as f:
            return [line.rstrip() for line in f]

    def find_file_pairs(self, directory: str) -> List[Tuple[str, str]]:
        """
        Find matching message/HMAC file pairs in the directory.

        Pairs follow the naming convention ``message_<n>.txt`` /
        ``message_<n>.hmac``; a .txt file without its .hmac twin is skipped.

        Args:
            directory: Directory to search for files.

        Returns:
            List of tuples (message_file_path, hmac_file_path).
        """
        file_pairs = []
        for message_file in glob.glob(os.path.join(directory, "message_*.txt")):
            match = re.search(r'message_(\d+)\.txt$', message_file)
            if not match:
                continue
            hmac_file = os.path.join(directory, f"message_{match.group(1)}.hmac")
            if os.path.exists(hmac_file):
                file_pairs.append((message_file, hmac_file))
        return file_pairs

    def extract_domain_info(self, log_entry: str) -> Dict[str, Any]:
        """
        Extract domain and subdomain information from a log entry.

        Only the first domain-like token is examined. NOTE(review): dotted
        IPv4 addresses also match the pattern and are reported as domains.

        Args:
            log_entry: A log entry string.

        Returns:
            Dictionary with keys ``has_domain``, ``domain``, ``subdomain``
            and ``tld``.
        """
        domain_info = {
            'has_domain': False,
            'domain': '',
            'subdomain': '',
            'tld': ''
        }

        # Matches domain.tld or subdomain.domain.tld style tokens.
        domain_matches = re.findall(
            r'([a-zA-Z0-9][-a-zA-Z0-9]*(\.[a-zA-Z0-9][-a-zA-Z0-9]*)+)', log_entry)

        if domain_matches:
            full_domain = domain_matches[0][0]
            domain_info['has_domain'] = True
            domain_info['domain'] = full_domain

            parts = full_domain.split('.')
            if len(parts) >= 2:
                domain_info['tld'] = parts[-1].lower()
            if len(parts) > 2:
                # Everything left of the registrable domain is the subdomain.
                domain_info['subdomain'] = '.'.join(parts[:-2])

        return domain_info

    def detect_tampering(self, log_entry: str) -> Dict[str, Any]:
        """
        Detect possible tampering in a DNS log entry.

        Args:
            log_entry: A log entry string.

        Returns:
            Dictionary with keys ``is_suspicious``, ``tampering_patterns``
            (a set of pattern names), ``possible_original``, ``risk_level``
            ('low'/'medium'/'high'/'critical') and ``reasons``.
        """
        analysis = {
            'is_suspicious': False,
            'tampering_patterns': set(),
            'possible_original': '',
            'risk_level': 'low',
            'reasons': []
        }

        def flag(pattern: str, level: str, reason: str) -> None:
            # Record one finding. Risk only ever escalates: the original
            # implementation assigned levels unconditionally, so a later
            # 'high' finding could downgrade an earlier 'critical' one.
            analysis['is_suspicious'] = True
            analysis['tampering_patterns'].add(pattern)
            if self._RISK_ORDER[level] > self._RISK_ORDER[analysis['risk_level']]:
                analysis['risk_level'] = level
            analysis['reasons'].append(reason)

        domain_info = self.extract_domain_info(log_entry)

        if domain_info['has_domain']:
            # Check for suspicious TLDs.
            if domain_info['tld'] in self.suspicious_tlds:
                flag('suspicious_tld', 'medium',
                     f"Suspicious TLD found: .{domain_info['tld']}")

            subdomain = domain_info['subdomain']
            if subdomain:
                # Homoglyph / character-substitution check.
                for char in subdomain:
                    if char in self.char_substitutions:
                        flag('character_substitution', 'high',
                             f"Possible character substitution: '{char}' might be '{self.char_substitutions[char]}'")
                        # Best-effort reconstruction of the original entry by
                        # undoing the substitution (last match wins, as before).
                        analysis['possible_original'] = log_entry.replace(
                            subdomain,
                            subdomain.replace(char, self.char_substitutions[char]))

                # Typosquatting check against well-known subdomains.
                for common_sub in self.common_subdomains:
                    if subdomain != common_sub and \
                            self.levenshtein_distance(subdomain, common_sub) <= 2:
                        flag('similar_subdomain', 'high',
                             f"Subdomain '{subdomain}' is suspiciously similar to common subdomain '{common_sub}'")
                        analysis['possible_original'] = log_entry.replace(subdomain, common_sub)

                # Unusually long labels are a classic DNS exfiltration signature.
                if len(subdomain) > 30:
                    flag('exfiltration_subdomain', 'high',
                         f"Unusually long subdomain (length: {len(subdomain)}) may indicate data exfiltration")

        # Loopback or private IPs appearing in logs suggest internal exposure.
        for ip in re.findall(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', log_entry):
            octets = [int(octet) for octet in ip.split('.')]
            if octets[0] in (10, 127) or \
                    (octets[0] == 172 and 16 <= octets[1] <= 31) or \
                    (octets[0] == 192 and octets[1] == 168):
                flag('internal_ip_exposure', 'critical',
                     f"Internal IP address exposed: {ip}")

        # DNS record types followed by modification indicators.
        record_types = ['A', 'AAAA', 'MX', 'CNAME', 'TXT', 'NS', 'SOA', 'SRV', 'PTR']
        for record_type in record_types:
            pattern = r'\b' + record_type + r'\s+(?:changed|modified|updated|deleted|removed|added)\b'
            if re.search(pattern, log_entry, re.IGNORECASE):
                flag('dns_record_modification', 'high',
                     f"DNS {record_type} record modification detected")

        # Amplification / reflection attack keywords next to a domain.
        if domain_info['has_domain'] and \
                re.search(r'\b(?:amplification|reflection|flood|ddos)\b', log_entry, re.IGNORECASE):
            flag('dns_amplification', 'critical',
                 "Possible DNS amplification attack signature")

        # Multiple independent findings compound the risk (escalation only).
        pattern_count = len(analysis['tampering_patterns'])
        if pattern_count >= 3:
            analysis['risk_level'] = 'critical'
        elif pattern_count == 2 and \
                self._RISK_ORDER[analysis['risk_level']] < self._RISK_ORDER['high']:
            analysis['risk_level'] = 'high'

        return analysis

    @staticmethod
    def levenshtein_distance(s1: str, s2: str) -> int:
        """
        Calculate the Levenshtein (edit) distance between two strings.

        Args:
            s1: First string.
            s2: Second string.

        Returns:
            The Levenshtein distance.
        """
        # Keep s1 as the longer string so the rolling row stays short.
        if len(s1) < len(s2):
            return DNSSubdomainBatchChecker.levenshtein_distance(s2, s1)

        if len(s2) == 0:
            return len(s1)

        # Classic dynamic-programming approach with a single rolling row.
        previous_row = range(len(s2) + 1)
        for i, c1 in enumerate(s1):
            current_row = [i + 1]
            for j, c2 in enumerate(s2):
                insertions = previous_row[j + 1] + 1
                deletions = current_row[j] + 1
                substitutions = previous_row[j] + (c1 != c2)
                current_row.append(min(insertions, deletions, substitutions))
            previous_row = current_row

        return previous_row[-1]

    def process_file_pair(self, message_file: str, hmac_file: str) -> Dict[str, Any]:
        """
        Process a single message/HMAC file pair.

        Args:
            message_file: Path to the message file.
            hmac_file: Path to the HMAC file.

        Returns:
            Dictionary with per-file statistics, the invalid entries (each
            with its tampering analysis) and a tampering summary. On failure
            a dictionary containing an ``'error'`` key is returned instead.
        """
        # Extract the file number for identification; fall back to the
        # basename when the file does not match the expected pattern.
        match = re.search(r'message_(\d+)\.txt$', message_file)
        file_id = match.group(1) if match else os.path.basename(message_file)

        try:
            message_content = self.read_file(message_file)
            hmac_content = self.read_file(hmac_file)

            # Lines are paired by index; trailing lines in the longer file
            # (if the files differ in length) are ignored.
            total_lines = min(len(message_content), len(hmac_content))

            results = {
                'file_id': file_id,
                'message_file': message_file,
                'hmac_file': hmac_file,
                'total_lines': total_lines,
                'valid_lines': 0,
                'invalid_lines': 0,
                'suspicious_lines': 0,
                'invalid_entries': [],
                'tampering_summary': {
                    'patterns': {},
                    'risk_levels': {
                        'low': 0,
                        'medium': 0,
                        'high': 0,
                        'critical': 0
                    }
                }
            }

            for i in range(total_lines):
                message = message_content[i]
                signature = hmac_content[i]

                # Skip lines where either side is empty.
                if not message or not signature:
                    continue

                if self.verify_hmac(message, signature):
                    results['valid_lines'] += 1
                    continue

                results['invalid_lines'] += 1
                tampering_analysis = self.detect_tampering(message)
                results['invalid_entries'].append({
                    'line_number': i + 1,  # 1-based for reporting
                    'message': message,
                    'provided_signature': signature,
                    'correct_signature': self.calculate_hmac(message),
                    'tampering_analysis': tampering_analysis
                })

                # Update tampering statistics.
                if tampering_analysis['is_suspicious']:
                    results['suspicious_lines'] += 1
                    summary = results['tampering_summary']
                    summary['risk_levels'][tampering_analysis['risk_level']] += 1
                    for pattern in tampering_analysis['tampering_patterns']:
                        summary['patterns'][pattern] = summary['patterns'].get(pattern, 0) + 1

            return results

        except Exception as e:
            # Report the failure but keep the batch going; the caller counts
            # results containing an 'error' key separately.
            print(f"Error processing file pair ({message_file}, {hmac_file}): {e}")
            return {
                'file_id': file_id,
                'message_file': message_file,
                'hmac_file': hmac_file,
                'error': str(e)
            }

    def process_directory(self, directory: str) -> Dict[str, Any]:
        """
        Process all matching file pairs in a directory.

        Args:
            directory: Directory containing message_*.txt and message_*.hmac
                files.

        Returns:
            Dictionary with aggregated results for all files, or a dictionary
            containing only an ``'error'`` key when no pairs were found.
        """
        file_pairs = self.find_file_pairs(directory)

        if not file_pairs:
            print(f"No matching message/HMAC file pairs found in {directory}")
            return {'error': 'No matching file pairs found'}

        results = {
            'directory': directory,
            'total_files': len(file_pairs),
            'processed_files': 0,
            'files_with_errors': 0,
            'total_lines_processed': 0,
            'total_invalid_lines': 0,
            'total_suspicious_lines': 0,
            'file_results': [],
            'overall_tampering_summary': {
                'patterns': {},
                'risk_levels': {
                    'low': 0,
                    'medium': 0,
                    'high': 0,
                    'critical': 0
                }
            }
        }

        overall = results['overall_tampering_summary']

        for message_file, hmac_file in file_pairs:
            print(f"Processing file pair: {os.path.basename(message_file)} and {os.path.basename(hmac_file)}")

            file_result = self.process_file_pair(message_file, hmac_file)
            results['file_results'].append(file_result)

            if 'error' in file_result:
                results['files_with_errors'] += 1
                continue

            # Roll per-file statistics into the overall totals.
            results['processed_files'] += 1
            results['total_lines_processed'] += file_result['total_lines']
            results['total_invalid_lines'] += file_result['invalid_lines']
            results['total_suspicious_lines'] += file_result['suspicious_lines']

            for pattern, count in file_result['tampering_summary']['patterns'].items():
                overall['patterns'][pattern] = overall['patterns'].get(pattern, 0) + count

            for level in ['low', 'medium', 'high', 'critical']:
                overall['risk_levels'][level] += \
                    file_result['tampering_summary']['risk_levels'][level]

        return results

    def save_corrected_hmac_files(self, results: Dict[str, Any], output_dir: str) -> None:
        """
        Save corrected HMAC files for each successfully processed file pair.

        Args:
            results: Overall processing results.
            output_dir: Output directory.
        """
        corrected_dir = os.path.join(output_dir, 'corrected_hmac_files')
        os.makedirs(corrected_dir, exist_ok=True)

        for file_result in results['file_results']:
            if 'error' in file_result:
                continue

            message_file = file_result['message_file']
            hmac_file = file_result['hmac_file']

            try:
                message_content = self.read_file(message_file)
                corrected_hmac_path = os.path.join(corrected_dir, os.path.basename(hmac_file))

                with open(corrected_hmac_path, 'w', encoding='utf-8') as f:
                    for message in message_content:
                        if message:
                            f.write(f"{self.calculate_hmac(message)}\n")
                        else:
                            # Write a blank placeholder so the corrected file
                            # stays line-aligned with the message file:
                            # verification pairs lines by index, and the
                            # original code's skipping shifted every signature
                            # after an empty message line.
                            f.write("\n")

                print(f"Created corrected HMAC file: {corrected_hmac_path}")

            except Exception as e:
                print(f"Error creating corrected HMAC file for {os.path.basename(hmac_file)}: {e}")

    def save_results(self, results: Dict[str, Any], output_dir: str) -> None:
        """
        Save processing results to output files.

        Writes ``batch_results.json``, ``integrity_report.txt``, optionally
        ``high_risk_entries.txt``, and the corrected HMAC files.

        Args:
            results: Overall processing results.
            output_dir: Output directory.
        """
        os.makedirs(output_dir, exist_ok=True)

        # Save overall JSON results. Sets (tampering patterns) are not JSON
        # serializable; sorted lists keep the output deterministic. Any other
        # unexpected object is stringified instead of raising.
        with open(os.path.join(output_dir, 'batch_results.json'), 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2,
                      default=lambda x: sorted(x) if isinstance(x, set) else str(x))

        # Save the human-readable detailed report.
        with open(os.path.join(output_dir, 'integrity_report.txt'), 'w', encoding='utf-8') as f:
            f.write("DNS Subdomain Batch Integrity Report\n")
            f.write("==================================\n\n")
            f.write(f"Generated: {datetime.now().isoformat()}\n\n")

            f.write("Overall Summary:\n")
            f.write("---------------\n")
            f.write(f"Directory processed: {results['directory']}\n")
            f.write(f"Total file pairs: {results['total_files']}\n")
            f.write(f"Successfully processed: {results['processed_files']}\n")
            f.write(f"Files with errors: {results['files_with_errors']}\n")
            f.write(f"Total log lines processed: {results['total_lines_processed']}\n")
            f.write(f"Total invalid lines: {results['total_invalid_lines']}\n")
            f.write(f"Total suspicious lines: {results['total_suspicious_lines']}\n\n")

            if results['total_suspicious_lines'] > 0:
                f.write("Risk Level Distribution:\n")
                # One '!' per severity step: low=!, medium=!!, high=!!!, critical=!!!!
                for level in ['low', 'medium', 'high', 'critical']:
                    count = results['overall_tampering_summary']['risk_levels'][level]
                    indicator = '!' * (self._RISK_ORDER[level] + 1)
                    f.write(f" {indicator} {level.upper()}: {count}\n")

                f.write("\nTampering Patterns Detected:\n")
                for pattern, count in sorted(results['overall_tampering_summary']['patterns'].items(),
                                             key=lambda x: x[1], reverse=True):
                    f.write(f" - {pattern}: {count}\n")

            f.write("\nPer-File Summary:\n")
            f.write("----------------\n")
            for file_result in results['file_results']:
                if 'error' in file_result:
                    f.write(f"File {file_result['file_id']}: ERROR - {file_result['error']}\n")
                    continue

                integrity_status = "COMPROMISED" if file_result['invalid_lines'] > 0 else "INTACT"
                risk_levels = file_result['tampering_summary']['risk_levels']
                if risk_levels['high'] > 0 or risk_levels['critical'] > 0:
                    risk_level = "HIGH RISK"
                elif risk_levels['medium'] > 0:
                    risk_level = "MEDIUM RISK"
                elif file_result['suspicious_lines'] > 0:
                    risk_level = "LOW RISK"
                else:
                    risk_level = "SAFE"

                f.write(f"File {file_result['file_id']}: {integrity_status} - {risk_level}\n")
                f.write(f" Message file: {os.path.basename(file_result['message_file'])}\n")
                f.write(f" Lines: {file_result['total_lines']} total, {file_result['invalid_lines']} invalid, {file_result['suspicious_lines']} suspicious\n")

                if file_result['suspicious_lines'] > 0:
                    # Show the first few suspicious entries.
                    suspicious_entries = [entry for entry in file_result['invalid_entries']
                                          if entry['tampering_analysis']['is_suspicious']]

                    f.write(f" Top suspicious entries ({min(3, len(suspicious_entries))} of {len(suspicious_entries)}):\n")
                    for entry in suspicious_entries[:3]:
                        f.write(f" Line {entry['line_number']}: {entry['message'][:50]}{'...' if len(entry['message']) > 50 else ''}\n")
                        f.write(f" Risk: {entry['tampering_analysis']['risk_level'].upper()}\n")
                        f.write(f" Patterns: {', '.join(entry['tampering_analysis']['tampering_patterns'])}\n")

                f.write("\n")

        # Collect high-risk entries for immediate attention.
        high_risk_entries = []
        for file_result in results['file_results']:
            if 'error' in file_result:
                continue

            file_id = file_result['file_id']
            for entry in file_result['invalid_entries']:
                if entry['tampering_analysis']['is_suspicious'] and \
                        entry['tampering_analysis']['risk_level'] in ['high', 'critical']:
                    entry_copy = entry.copy()
                    entry_copy['file_id'] = file_id
                    high_risk_entries.append(entry_copy)

        if high_risk_entries:
            with open(os.path.join(output_dir, 'high_risk_entries.txt'), 'w', encoding='utf-8') as f:
                f.write("HIGH RISK DNS LOG ENTRIES - IMMEDIATE ATTENTION REQUIRED\n")
                f.write("======================================================\n\n")
                f.write(f"Generated: {datetime.now().isoformat()}\n")
                f.write(f"Total high-risk entries: {len(high_risk_entries)}\n\n")

                # Critical entries first (stable sort preserves file order).
                high_risk_entries.sort(
                    key=lambda x: 0 if x['tampering_analysis']['risk_level'] == 'critical' else 1)

                for entry in high_risk_entries:
                    f.write(f"File {entry['file_id']}, Line {entry['line_number']} - [{entry['tampering_analysis']['risk_level'].upper()}]\n")
                    f.write(f" Message: {entry['message']}\n")
                    f.write(f" Provided signature: {entry['provided_signature']}\n")
                    f.write(f" Correct signature: {entry['correct_signature']}\n")
                    f.write(f" Tampering patterns: {', '.join(entry['tampering_analysis']['tampering_patterns'])}\n")
                    f.write(" Reasons:\n")
                    for reason in entry['tampering_analysis']['reasons']:
                        f.write(f" - {reason}\n")

                    if entry['tampering_analysis']['possible_original']:
                        f.write(f" Possible original: {entry['tampering_analysis']['possible_original']}\n")

                    f.write("\n")

        # Save corrected HMAC files alongside the reports.
        self.save_corrected_hmac_files(results, output_dir)

def main():
    """Command-line entry point: parse arguments, run the batch check,
    save the reports, and print a console summary."""
    parser = argparse.ArgumentParser(description='DNS Subdomain Batch Integrity Checker')
    parser.add_argument('--directory', '-d', required=True, help='Directory containing log files')
    parser.add_argument('--output', '-o', default='batch_output', help='Output directory (default: batch_output)')
    parser.add_argument('--key', '-k', default=VALID_KEY, help=f'HMAC signing key (default: {VALID_KEY})')
    args = parser.parse_args()

    batch_checker = DNSSubdomainBatchChecker(key=args.key)

    try:
        started_at = datetime.now()
        print(f"Starting batch processing of DNS log files in {args.directory}")
        print(f"Started at: {started_at.isoformat()}")

        results = batch_checker.process_directory(args.directory)
        if 'error' in results:
            print(f"Error: {results['error']}")
            sys.exit(1)

        batch_checker.save_results(results, args.output)

        elapsed = datetime.now() - started_at

        # Console summary of the run.
        print(f"\nBatch processing completed!")
        print(f"Duration: {elapsed.total_seconds():.2f} seconds")
        print(f"Files processed: {results['processed_files']} of {results['total_files']}")
        print(f"Total lines checked: {results['total_lines_processed']}")
        print(f"Invalid lines detected: {results['total_invalid_lines']}")
        print(f"Suspicious lines detected: {results['total_suspicious_lines']}")
        print(f"Results saved to: {args.output}")

        if results['total_suspicious_lines'] > 0:
            print(f"\n⚠️ WARNING: {results['total_suspicious_lines']} suspicious log entries detected!")
            risk_levels = results['overall_tampering_summary']['risk_levels']
            severe_count = risk_levels['high'] + risk_levels['critical']
            if severe_count > 0:
                print(f"❗ CRITICAL: {severe_count} high or critical risk entries found!")
                print(f"Check {os.path.join(args.output, 'high_risk_entries.txt')} for details")

    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()