Skip to main content

Binary Parser

import struct
import socket
import datetime
from collections import defaultdict, Counter

def parse_binary_logs(file_path):
    """Parse a binary login-attempt log into a list of record dicts.

    Records are stored back to back with this layout:

    * 4 bytes - username length, unsigned big-endian integer
    * N bytes - username, UTF-8 encoded
    * 4 bytes - IPv4 address
    * 4 bytes - Unix timestamp, unsigned big-endian integer
    * 1 byte  - success flag (any non-zero value means success)

    Args:
        file_path: Path of the binary log file to read.

    Returns:
        A list of dicts, one per record in file order, with the keys
        ``username``, ``ip_address`` (dotted-quad string), ``timestamp``
        (int), ``datetime`` (timezone-aware UTC datetime) and ``success``
        (bool). An empty file yields an empty list.

    Raises:
        OSError: If the file cannot be opened or read.
        ValueError: If a record is truncated, or a username is not valid
            UTF-8 (``UnicodeDecodeError`` is a ``ValueError`` subclass).
    """
    with open(file_path, 'rb') as f:
        data = f.read()

    uint32 = struct.Struct('>I')
    ip_size = 4
    # Fixed-size part that follows the username: IPv4 + timestamp + flag.
    trailer_size = ip_size + uint32.size + 1
    total = len(data)

    login_attempts = []
    offset = 0
    while offset < total:
        record_start = offset

        # Validate before unpacking so a damaged file produces one clear
        # error instead of struct.error / OSError / IndexError at random.
        if total - offset < uint32.size:
            raise ValueError(
                f"Truncated record at byte offset {record_start}: "
                f"incomplete username length field"
            )
        (username_length,) = uint32.unpack_from(data, offset)
        offset += uint32.size

        if total - offset < username_length + trailer_size:
            raise ValueError(
                f"Truncated record at byte offset {record_start}: "
                f"expected {username_length + trailer_size} more bytes, "
                f"found {total - offset}"
            )

        username = data[offset:offset + username_length].decode('utf-8')
        offset += username_length

        ip_address = socket.inet_ntoa(data[offset:offset + ip_size])
        offset += ip_size

        (timestamp,) = uint32.unpack_from(data, offset)
        datetime_obj = datetime.datetime.fromtimestamp(
            timestamp, tz=datetime.timezone.utc
        )
        offset += uint32.size

        success = bool(data[offset])
        offset += 1

        login_attempts.append({
            'username': username,
            'ip_address': ip_address,
            'timestamp': timestamp,
            'datetime': datetime_obj,
            'success': success,
        })

    return login_attempts

def analyze_logs(login_attempts):
    """Compute headline metrics for a collection of login attempts.

    Args:
        login_attempts: A non-empty sequence of dicts, each with at least
            the keys ``timestamp``, ``username`` and ``ip_address``.

    Returns:
        A dict with:

        * ``start_date_utc`` - timezone-aware UTC datetime of the earliest
          timestamp
        * ``total_attempts`` - number of attempts
        * ``unique_usernames`` / ``unique_ips`` - counts of distinct values
        * ``usernames`` / ``ip_addresses`` - the distinct values as sets

    Raises:
        ValueError: If ``login_attempts`` is empty, since there is no
            earliest timestamp to report.
    """
    # Fail with a descriptive message rather than min()'s generic
    # "arg is an empty sequence" error.
    if not login_attempts:
        raise ValueError("no login attempts to analyze")

    earliest_timestamp = min(attempt['timestamp'] for attempt in login_attempts)
    start_date_utc = datetime.datetime.fromtimestamp(
        earliest_timestamp, tz=datetime.timezone.utc
    )

    unique_usernames = {attempt['username'] for attempt in login_attempts}
    unique_ips = {attempt['ip_address'] for attempt in login_attempts}

    return {
        'start_date_utc': start_date_utc,
        'total_attempts': len(login_attempts),
        'unique_usernames': len(unique_usernames),
        'unique_ips': len(unique_ips),
        'usernames': unique_usernames,
        'ip_addresses': unique_ips,
    }

def analyze_login_patterns(
    login_attempts,
    *,
    ip_count_threshold=3,
    ip_switch_weight=10,
    failure_ratio_threshold=3,
    min_suspicion_score=10,
):
    """Score each user's login history and flag potentially compromised users.

    A user's suspicion score is the sum of:

    * the number of distinct IPs used, if it exceeds ``ip_count_threshold``;
    * ``ip_switch_weight`` for every successful login that immediately
      follows (in timestamp order) a failed login from a different IP;
    * the failed/successful login ratio, if the user has at least one
      successful login and the ratio exceeds ``failure_ratio_threshold``.

    Users whose score exceeds ``min_suspicion_score`` are reported.

    Args:
        login_attempts: Iterable of dicts with the keys ``username``,
            ``ip_address``, ``timestamp`` and ``success``.
        ip_count_threshold: Distinct-IP count above which the count is
            added to the score.
        ip_switch_weight: Points added per failure-then-success IP switch.
        failure_ratio_threshold: Failed/successful ratio above which the
            ratio is added to the score.
        min_suspicion_score: Score a user must exceed to be reported.

    Returns:
        A ``(suspicious_users, user_data)`` tuple. ``suspicious_users`` is
        a list of ``(username, score, stats)`` tuples sorted by descending
        score. ``user_data`` maps every username to its stats dict with the
        keys ``ips``, ``successful_logins``, ``failed_logins``,
        ``login_times``, ``success_after_failure`` and ``unique_ips_count``.
    """
    user_data = defaultdict(lambda: {
        'ips': set(),
        'successful_logins': 0,
        'failed_logins': 0,
        'login_times': [],
        'success_after_failure': 0,
        'unique_ips_count': 0,
    })

    # First pass: group attempts per user and collect simple counters.
    user_attempts = defaultdict(list)
    for attempt in login_attempts:
        user = attempt['username']
        user_attempts[user].append(attempt)

        stats = user_data[user]
        stats['ips'].add(attempt['ip_address'])
        if attempt['success']:
            stats['successful_logins'] += 1
        else:
            stats['failed_logins'] += 1
        stats['login_times'].append(attempt['timestamp'])

    # Second pass: metrics that need each user's attempts in time order.
    for user, attempts in user_attempts.items():
        stats = user_data[user]
        ordered = sorted(attempts, key=lambda x: x['timestamp'])

        # Count adjacent (failure, success) pairs that changed IP address.
        stats['success_after_failure'] = sum(
            1
            for previous, current in zip(ordered, ordered[1:])
            if not previous['success']
            and current['success']
            and current['ip_address'] != previous['ip_address']
        )
        stats['unique_ips_count'] = len(stats['ips'])

    suspicious_users = []
    for user, data in user_data.items():
        suspicion_score = 0

        if data['unique_ips_count'] > ip_count_threshold:
            suspicion_score += data['unique_ips_count']

        if data['success_after_failure'] > 0:
            suspicion_score += data['success_after_failure'] * ip_switch_weight

        # The ratio is undefined for users who never logged in successfully,
        # so they get no contribution from this criterion.
        if data['successful_logins'] > 0:
            failure_ratio = data['failed_logins'] / data['successful_logins']
            if failure_ratio > failure_ratio_threshold:
                suspicion_score += failure_ratio

        if suspicion_score > min_suspicion_score:
            suspicious_users.append((user, suspicion_score, data))

    suspicious_users.sort(key=lambda x: x[1], reverse=True)

    return suspicious_users, user_data

def main():
    """Prompt for a binary log file, analyze it, and print a report.

    Reads the file path from standard input, parses the file with
    ``parse_binary_logs``, then prints the metrics from ``analyze_logs``
    and the suspicious-user ranking from ``analyze_login_patterns``.
    Any exception raised while processing is reported on standard output
    instead of propagating, as this is the script's top-level boundary.
    """
    file_path = input("Enter the path to the binary log file: ")

    try:
        login_attempts = parse_binary_logs(file_path)
        print(f"Successfully parsed {len(login_attempts)} login attempts.")

        # An empty log has no earliest timestamp; say so plainly instead of
        # letting the analysis fail with an unrelated-looking error.
        if not login_attempts:
            print("\nThe log contains no login attempts; nothing to analyze.")
            return

        # Basic log analysis for the requested metrics
        log_metrics = analyze_logs(login_attempts)

        print("\n===== LOG METRICS =====")
        print(f"Start date of the log (UTC): {log_metrics['start_date_utc'].strftime('%Y-%m-%d %H:%M:%S UTC')}")
        print(f"Total login attempts recorded: {log_metrics['total_attempts']}")
        print(f"Number of unique usernames: {log_metrics['unique_usernames']}")
        print(f"Number of unique IP addresses: {log_metrics['unique_ips']}")

        # Advanced pattern analysis for suspicious activity
        suspicious_users, user_data = analyze_login_patterns(login_attempts)

        print("\n===== SUSPICIOUS ACTIVITY ANALYSIS =====")
        print(f"Total users analyzed: {len(user_data)}")

        if suspicious_users:
            print("\nPotentially compromised users (sorted by suspicion score):")
            for user, score, data in suspicious_users:
                print(f"\nUsername: {user} (Suspicion Score: {score:.2f})")
                print(f"  Unique IPs: {data['unique_ips_count']}")
                print(f"  Successful logins: {data['successful_logins']}")
                print(f"  Failed logins: {data['failed_logins']}")
                print(f"  Successful logins after failures from different IPs: {data['success_after_failure']}")
                # Sets iterate in arbitrary order; sort so reports are
                # reproducible from run to run.
                print(f"  IP addresses used: {', '.join(sorted(data['ips']))}")
        else:
            print("\nNo suspicious users identified.")

    except Exception as e:
        print(f"Error processing the file: {e}")

# Run the interactive report only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()