"""Compromised User Detector.

Parses a binary login log and flags accounts showing signs of compromise.
"""

import struct
import socket
import datetime
from collections import defaultdict, Counter

def parse_binary_logs(file_path):
    """Parse a binary login-log file into a list of attempt records.

    Record layout (repeated until end of file):
        4-byte big-endian username length
        UTF-8 username (variable length)
        4-byte IPv4 address (network byte order)
        4-byte big-endian Unix timestamp
        1-byte success flag (non-zero = success)

    Args:
        file_path: Path to the binary log file.

    Returns:
        list[dict]: One dict per attempt with keys 'username',
        'ip_address' (dotted quad), 'timestamp' (int), 'datetime'
        (UTC-aware datetime), and 'success' (bool).

    Raises:
        ValueError: If the file ends in the middle of a record
            (truncated or corrupt input).
        UnicodeDecodeError: If a username field is not valid UTF-8.
    """
    with open(file_path, 'rb') as f:
        data = f.read()

    login_attempts = []
    offset = 0
    total = len(data)

    def _need(n):
        # Fail loudly on truncated/corrupt files instead of letting
        # struct.error / OSError / IndexError surface mid-record.
        if offset + n > total:
            raise ValueError(
                f"Truncated record at offset {offset}: "
                f"need {n} bytes, only {total - offset} remain"
            )

    while offset < total:
        # Username length prefix (4-byte big-endian unsigned int).
        _need(4)
        username_length = struct.unpack_from('>I', data, offset)[0]
        offset += 4

        # Variable-length UTF-8 username.
        _need(username_length)
        username = data[offset:offset + username_length].decode('utf-8')
        offset += username_length

        # IPv4 address, rendered as a dotted-quad string.
        _need(4)
        ip_address = socket.inet_ntoa(data[offset:offset + 4])
        offset += 4

        # Unix timestamp; keep both the raw int and a UTC-aware datetime.
        _need(4)
        timestamp = struct.unpack_from('>I', data, offset)[0]
        datetime_obj = datetime.datetime.fromtimestamp(
            timestamp, tz=datetime.timezone.utc)
        offset += 4

        # Success flag: any non-zero byte counts as success.
        _need(1)
        success = bool(data[offset])
        offset += 1

        login_attempts.append({
            'username': username,
            'ip_address': ip_address,
            'timestamp': timestamp,
            'datetime': datetime_obj,
            'success': success,
        })

    return login_attempts

def analyze_logs(login_attempts):
    """Summarize parsed login attempts into headline metrics.

    Args:
        login_attempts: List of attempt dicts as produced by
            parse_binary_logs.

    Returns:
        dict: The log's UTC start date ('start_date_utc'), total attempt
        count, unique username/IP counts, and the username and IP sets
        themselves.
    """
    # Distinct actors and sources seen in the log.
    usernames = {a['username'] for a in login_attempts}
    ips = {a['ip_address'] for a in login_attempts}

    # Earliest event marks the start of the log window (UTC).
    first_ts = min(a['timestamp'] for a in login_attempts)
    start = datetime.datetime.fromtimestamp(first_ts, tz=datetime.timezone.utc)

    return {
        'start_date_utc': start,
        'total_attempts': len(login_attempts),
        'unique_usernames': len(usernames),
        'unique_ips': len(ips),
        'usernames': usernames,
        'ip_addresses': ips,
    }

def analyze_login_patterns(login_attempts):
    """Score each user for signs of account compromise.

    Combines several heuristics into a weighted suspicion score:
    number of distinct source IPs, a success immediately following a
    failure from a different IP, failed/successful ratio, logins during
    UTC hours 1-5, successes after 3+ consecutive failures, and a rough
    geographic-spread proxy based on IP first octets.

    Args:
        login_attempts: List of attempt dicts as produced by
            parse_binary_logs (keys: 'username', 'ip_address',
            'timestamp', 'datetime', 'success').

    Returns:
        Tuple of (suspicious_users, user_data, most_likely_compromised):
        - suspicious_users: list of (username, score, metrics, reasons)
          tuples sorted by score, highest first.
        - user_data: per-user metrics dict keyed by username.
        - most_likely_compromised: top-scoring username, or None when
          there are no attempts at all.
    """
    # Track login data per user; defaultdict so first touch of a user
    # lazily creates a zeroed metrics record.
    user_data = defaultdict(lambda: {
        'ips': set(),
        'successful_logins': 0,
        'failed_logins': 0,
        'login_times': [],
        'success_after_failure': 0,
        'unique_ips_count': 0,
        'unusual_hour_logins': 0,
        'successful_after_multiple_failures': 0,
        'geographic_anomalies': 0
    })
    
    # Group login attempts by user (raw attempts kept separately for the
    # order-sensitive pairwise pass below).
    user_attempts = defaultdict(list)
    for attempt in login_attempts:
        user = attempt['username']
        user_attempts[user].append(attempt)
        
        # Track IPs used
        user_data[user]['ips'].add(attempt['ip_address'])
        
        # Track login success/failure
        if attempt['success']:
            user_data[user]['successful_logins'] += 1
        else:
            user_data[user]['failed_logins'] += 1
            
        # Track login times
        user_data[user]['login_times'].append(attempt['timestamp'])
        
        # Track unusual hour logins (UTC hours 1-5 inclusive, i.e. 01:00-05:59)
        hour = attempt['datetime'].hour
        if 1 <= hour <= 5:
            user_data[user]['unusual_hour_logins'] += 1
    
    # Calculate additional metrics that depend on event ordering.
    for user, attempts in user_attempts.items():
        # Sort attempts by timestamp
        sorted_attempts = sorted(attempts, key=lambda x: x['timestamp'])
        
        # Pairwise scan over consecutive attempts. Note the loop looks at
        # (previous, current) pairs, so the very last attempt's outcome is
        # never counted as a "previous" failure.
        consecutive_failures = 0
        for i in range(1, len(sorted_attempts)):
            current = sorted_attempts[i]
            previous = sorted_attempts[i-1]
            
            # Check for IP hopping: a success from a DIFFERENT IP right
            # after a failure (classic credential-stuffing signature).
            if (not previous['success'] and 
                current['success'] and
                current['ip_address'] != previous['ip_address']):
                user_data[user]['success_after_failure'] += 1
                
            # Count consecutive failures (resets on any success).
            if not previous['success']:
                consecutive_failures += 1
            else:
                consecutive_failures = 0
                
            # Successful login after 3+ consecutive failures; the counter
            # is reset after scoring so one burst is only counted once.
            if current['success'] and consecutive_failures >= 3:
                user_data[user]['successful_after_multiple_failures'] += 1
                consecutive_failures = 0
                
        # Count unique IPs
        user_data[user]['unique_ips_count'] = len(user_data[user]['ips'])
        
        # Calculate rough geographic anomalies based on IP
        # This is simplified - in reality, you'd use IP geolocation.
        # Distinct first octets across 3+ IPs stand in for geographic spread.
        if user_data[user]['unique_ips_count'] >= 3:
            ips_first_octet = [int(ip.split('.')[0]) for ip in user_data[user]['ips']]
            unique_first_octets = len(set(ips_first_octet))
            if unique_first_octets >= 2:  # Different network classes suggest geographic spread
                user_data[user]['geographic_anomalies'] += unique_first_octets
    
    # Calculate comprehensive suspicion scores as a weighted sum of the
    # per-user metrics gathered above. Weights are heuristic.
    suspicious_users = []
    for user, data in user_data.items():
        # Initialize base suspicion score
        suspicion_score = 0
        
        # Factor 1: Multiple IPs used (5 pts per IP beyond the first)
        ip_anomaly_score = (data['unique_ips_count'] - 1) * 5  # Expect one normal IP
        suspicion_score += max(0, ip_anomaly_score)
        
        # Factor 2: Successful logins after failures from different IPs (15 pts each)
        suspicion_score += data['success_after_failure'] * 15
        
        # Factor 3: Failed login ratio (capped), or a flat penalty for
        # accounts with many failures and no success at all.
        if data['successful_logins'] > 0:
            failure_ratio = data['failed_logins'] / data['successful_logins']
            suspicion_score += min(failure_ratio * 3, 20)  # Cap at 20 points
        elif data['failed_logins'] > 5:  # Only failures, no successes
            suspicion_score += 20
            
        # Factor 4: Unusual hour logins (2 pts each)
        suspicion_score += data['unusual_hour_logins'] * 2
        
        # Factor 5: Successful after multiple failures (25 pts each - strongest signal)
        suspicion_score += data['successful_after_multiple_failures'] * 25
        
        # Factor 6: Geographic anomalies (8 pts per distinct first octet)
        suspicion_score += data['geographic_anomalies'] * 8
        
        # Build human-readable context for the score; thresholds here are
        # for display only and do not affect the score itself.
        reasons = []
        if data['unique_ips_count'] > 1:
            reasons.append(f"Used {data['unique_ips_count']} different IPs")
        if data['success_after_failure'] > 0:
            reasons.append(f"Successful login after failure from different IP: {data['success_after_failure']} times")
        if data['failed_logins'] > 3:
            reasons.append(f"High number of failed logins: {data['failed_logins']}")
        if data['unusual_hour_logins'] > 0:
            reasons.append(f"Logins during unusual hours: {data['unusual_hour_logins']}")
        if data['successful_after_multiple_failures'] > 0:
            reasons.append(f"Successful login after multiple failures: {data['successful_after_multiple_failures']}")
        if data['geographic_anomalies'] > 0:
            reasons.append(f"Potential geographic anomalies detected")
            
        suspicious_users.append((user, suspicion_score, data, reasons))
    
    # Sort by suspicion score
    suspicious_users.sort(key=lambda x: x[1], reverse=True)
    
    # Identify the most likely compromised user
    most_likely_compromised = suspicious_users[0][0] if suspicious_users else None
    
    return suspicious_users, user_data, most_likely_compromised

def main():
    """CLI entry point: parse a binary log file, print headline metrics,
    then report the most likely compromised user and any runners-up."""
    file_path = input("Enter the path to the binary log file: ")

    try:
        login_attempts = parse_binary_logs(file_path)
        print(f"Successfully parsed {len(login_attempts)} login attempts.")

        # Headline metrics.
        metrics = analyze_logs(login_attempts)

        print("\n===== LOG METRICS =====")
        print(f"Start date of the log (UTC): {metrics['start_date_utc'].strftime('%Y-%m-%d %H:%M:%S UTC')}")
        print(f"Total login attempts recorded: {metrics['total_attempts']}")
        print(f"Number of unique usernames: {metrics['unique_usernames']}")
        print(f"Number of unique IP addresses: {metrics['unique_ips']}")

        # Pattern analysis for suspicious activity.
        suspicious_users, _user_data, top_user = analyze_login_patterns(login_attempts)

        print("\n===== COMPROMISED USER IDENTIFICATION =====")
        if top_user:
            print(f"\n🚨 MOST LIKELY COMPROMISED USER: {top_user} 🚨")

            # Locate the top user's entry to show its full breakdown.
            entry = next((e for e in suspicious_users if e[0] == top_user), None)
            if entry is not None:
                _user, score, data, reasons = entry
                print(f"Suspicion Score: {score:.2f}")
                print("\nReasons for suspicion:")
                for idx, reason in enumerate(reasons, 1):
                    print(f"  {idx}. {reason}")
                print("\nDetailed metrics:")
                print(f"  Unique IPs: {data['unique_ips_count']}")
                print(f"  Successful logins: {data['successful_logins']}")
                print(f"  Failed logins: {data['failed_logins']}")
                print(f"  Logins during unusual hours: {data['unusual_hour_logins']}")
                print(f"  IP addresses used: {', '.join(data['ips'])}")
        else:
            print("No compromised user identified.")

        print("\n===== OTHER SUSPICIOUS USERS =====")
        runners_up = suspicious_users[1:6]  # Top 5 after the most suspicious
        if runners_up:
            for user, score, _data, reasons in runners_up:
                # Only show users with meaningful suspicion scores.
                if score > 10:
                    print(f"\nUsername: {user} (Suspicion Score: {score:.2f})")
                    print("Reasons for suspicion:")
                    for idx, reason in enumerate(reasons, 1):
                        print(f"  {idx}. {reason}")
        else:
            print("No other suspicious users identified.")

    except Exception as exc:
        print(f"Error processing the file: {exc}")
        import traceback
        traceback.print_exc()

# Allow the module to be imported without triggering the interactive CLI.
if __name__ == "__main__":
    main()