# Binary Log Parser and Anomaly Detector

#!/usr/bin/env python3
"""
Binary Log Parser and Anomaly Detector

This script parses a custom binary format for login attempt logs and identifies 
potentially compromised accounts based on anomalous behavior.

Format:
- username_length: 4-byte integer (big-endian)
- username: variable-length string
- ip: 4-byte IPv4 address
- timestamp: 4-byte Unix timestamp (big-endian)
- success: 1-byte boolean

Usage:
 python log_analyzer.py --input <log_file> [--output <output_file>] [--sql <sql_file>]
"""

import argparse
import struct
import socket
import sqlite3
import json
import os
import sys
from datetime import datetime
from collections import defaultdict

def parse_binary_log(file_path):
    """
    Parse the binary log file according to the specified format.

    Record layout (all integers big-endian):
      - username_length: 4-byte unsigned int
      - username:        `username_length` bytes, UTF-8
      - ip:              4-byte IPv4 address
      - timestamp:       4-byte unsigned Unix timestamp
      - success:         1 byte (1 = success)

    Args:
        file_path: Path to the binary log file

    Returns:
        List of login attempt records (dicts with keys: username,
        ip_address, timestamp, login_time, success). A truncated trailing
        record is skipped with a warning instead of aborting, so every
        fully-parsed record is preserved.

    Exits:
        With status 1 if the file cannot be read or a username is not
        valid UTF-8.
    """
    logs = []

    try:
        with open(file_path, 'rb') as f:
            data = f.read()
    except OSError as e:
        print(f"Error parsing log file: {str(e)}")
        sys.exit(1)

    offset = 0
    total = len(data)
    while offset < total:
        # Fixed-size prefix: 4-byte username length. Bail out (instead of
        # crashing) if the file ends mid-record.
        if offset + 4 > total:
            print("Warning: truncated record at end of file; ignoring trailing bytes")
            break
        username_length = struct.unpack_from('>I', data, offset)[0]
        offset += 4

        # Remainder of the record: username + ip (4) + timestamp (4) + flag (1).
        if offset + username_length + 9 > total:
            print("Warning: truncated record at end of file; ignoring trailing bytes")
            break

        try:
            username = data[offset:offset + username_length].decode('utf-8')
        except UnicodeDecodeError as e:
            print(f"Error parsing log file: {str(e)}")
            sys.exit(1)
        offset += username_length

        # 4-byte IPv4 address in network byte order.
        ip_address = socket.inet_ntoa(data[offset:offset + 4])
        offset += 4

        timestamp = struct.unpack_from('>I', data, offset)[0]
        # NOTE: fromtimestamp() converts to *local* time, which is what the
        # after-hours analysis elsewhere in this file expects.
        login_time = datetime.fromtimestamp(timestamp)
        offset += 4

        # 1-byte success flag: exactly 1 means success.
        success = data[offset] == 1
        offset += 1

        logs.append({
            'username': username,
            'ip_address': ip_address,
            'timestamp': timestamp,
            'login_time': login_time,
            'success': success
        })

    print(f"Successfully parsed {len(logs)} login attempts")
    return logs

def detect_anomalies(logs, business_start_hour=9, business_end_hour=17,
                     location_change_threshold=3600):
    """
    Analyze logs to identify potentially compromised accounts.

    Each user is scored on four weighted indicators:
      * distinct source IPs                           (weight 10)
      * failed login attempts                         (weight 5)
      * rapid location changes — consecutive logins
        from different IPs closer together than the
        threshold                                     (weight 20)
      * logins outside business hours                 (weight 3)

    Args:
        logs: List of parsed login attempt records
        business_start_hour: First hour (inclusive) counted as business
            hours (default 9, i.e. 9 AM)
        business_end_hour: Hour at which after-hours begins again
            (default 17, i.e. 5 PM)
        location_change_threshold: Maximum seconds between logins from
            different IPs to count as a rapid location change
            (default 3600, i.e. 1 hour)

    Returns:
        List of per-user anomaly summaries, sorted by anomaly score
        (highest first).
    """
    # Group logs by username
    user_logs = defaultdict(list)
    for log in logs:
        user_logs[log['username']].append(log)

    anomalies = []

    # Analyze each user's login patterns
    for username, user_log in user_logs.items():
        # Chronological order is required for the pairwise IP comparison.
        user_log.sort(key=lambda x: x['timestamp'])

        unique_ips = set(log['ip_address'] for log in user_log)
        failed_attempts = sum(1 for log in user_log if not log['success'])
        successful_attempts = sum(1 for log in user_log if log['success'])

        # Consecutive logins from different IPs within the threshold are
        # physically implausible and strongly suggest a shared credential.
        rapid_location_changes = 0
        for previous_log, current_log in zip(user_log, user_log[1:]):
            if current_log['ip_address'] != previous_log['ip_address']:
                time_diff = current_log['timestamp'] - previous_log['timestamp']
                if time_diff < location_change_threshold:
                    rapid_location_changes += 1

        # Logins outside the configured business-hours window (local time).
        after_hours_logins = sum(
            1 for log in user_log
            if log['login_time'].hour < business_start_hour
            or log['login_time'].hour >= business_end_hour
        )

        # Weighted sum; weights reflect the relative importance of each factor.
        anomaly_score = (
            (len(unique_ips) * 10) +
            (failed_attempts * 5) +
            (rapid_location_changes * 20) +
            (after_hours_logins * 3)
        )

        anomalies.append({
            'username': username,
            'anomaly_score': anomaly_score,
            'unique_ips': len(unique_ips),
            'ip_addresses': list(unique_ips),
            'failed_attempts': failed_attempts,
            'successful_attempts': successful_attempts,
            'rapid_location_changes': rapid_location_changes,
            'after_hours_logins': after_hours_logins,
            'total_attempts': len(user_log)
        })

    # Most suspicious accounts first.
    anomalies.sort(key=lambda x: x['anomaly_score'], reverse=True)

    return anomalies

def create_database(logs, db_path=':memory:'):
    """
    Build a SQLite database containing every parsed login attempt.

    Args:
        logs: List of parsed login attempt records
        db_path: Path to save the SQLite database (default: in-memory)

    Returns:
        An open sqlite3.Connection whose `login_attempts` table is
        populated and indexed for the analysis queries.
    """
    connection = sqlite3.connect(db_path)
    cur = connection.cursor()

    # Schema for the raw login attempts.
    cur.execute('''
 CREATE TABLE login_attempts (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 username TEXT NOT NULL,
 ip_address TEXT NOT NULL,
 timestamp INTEGER NOT NULL,
 login_time TEXT NOT NULL,
 success INTEGER NOT NULL
 )
 ''')

    # One index per column the analysis queries filter or group on.
    for column in ('username', 'ip_address', 'timestamp', 'success'):
        cur.execute(f'CREATE INDEX idx_{column} ON login_attempts({column})')

    # Bulk-load the records; login_time is stored as an ISO-8601 string and
    # the boolean flag as 0/1.
    rows = (
        (
            entry['username'],
            entry['ip_address'],
            entry['timestamp'],
            entry['login_time'].isoformat(),
            1 if entry['success'] else 0,
        )
        for entry in logs
    )
    cur.executemany(
        'INSERT INTO login_attempts (username, ip_address, timestamp, login_time, success) '
        'VALUES (?, ?, ?, ?, ?)',
        rows,
    )

    connection.commit()
    return connection

def generate_sql_script():
    """
    Generate a SQL script for creating the table and analyzing login data.

    The script mirrors the Python analysis in detect_anomalies(): per-user
    unique-IP counts, failed-then-successful login patterns, rapid location
    changes, after-hours activity, and a combined weighted anomaly score
    using the same weights (10 / 5 / 20 / 3). SQLite dialect (uses
    strftime and julianday).

    Returns:
        SQL script as a string
    """
    # NOTE(review): the rapid-location-change queries join on a.id < b.id,
    # which assumes rows were inserted in timestamp order — true for
    # create_database(), but confirm for any other loader.
    return '''-- Create a table to store login attempts
CREATE TABLE login_attempts (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 username TEXT NOT NULL,
 ip_address TEXT NOT NULL,
 timestamp INTEGER NOT NULL,
 login_time TEXT NOT NULL,
 success INTEGER NOT NULL
);

-- Create indexes for efficient searching
CREATE INDEX idx_username ON login_attempts(username);
CREATE INDEX idx_ip_address ON login_attempts(ip_address);
CREATE INDEX idx_timestamp ON login_attempts(timestamp);
CREATE INDEX idx_success ON login_attempts(success);

-- Query to find users with multiple IP addresses
SELECT 
 username,
 COUNT(DISTINCT ip_address) AS unique_ip_count
FROM 
 login_attempts
GROUP BY 
 username
HAVING 
 unique_ip_count > 1
ORDER BY 
 unique_ip_count DESC;

-- Query to find failed login attempts followed by successful ones
SELECT 
 a.username,
 COUNT(*) AS suspicious_patterns
FROM 
 login_attempts a
JOIN 
 login_attempts b 
ON 
 a.username = b.username
 AND a.timestamp < b.timestamp
 AND a.success = 0
 AND b.success = 1
 AND (b.timestamp - a.timestamp) < 300 -- Within 5 minutes
GROUP BY 
 a.username
ORDER BY 
 suspicious_patterns DESC;

-- Query to find rapid login attempts from different locations
SELECT 
 a.username,
 a.ip_address AS ip1,
 b.ip_address AS ip2,
 datetime(a.login_time) AS time1,
 datetime(b.login_time) AS time2,
 (julianday(b.login_time) - julianday(a.login_time)) * 24 * 60 AS minutes_between
FROM 
 login_attempts a
JOIN 
 login_attempts b 
ON 
 a.username = b.username
 AND a.ip_address != b.ip_address
 AND a.id < b.id
 AND (julianday(b.login_time) - julianday(a.login_time)) * 24 * 60 < 60 -- Less than 60 minutes apart
ORDER BY 
 minutes_between ASC;

-- Query to find users with after-hours login activity
SELECT 
 username,
 COUNT(*) AS after_hours_logins
FROM 
 login_attempts
WHERE 
 (strftime('%H', login_time) < '09' OR strftime('%H', login_time) >= '17')
GROUP BY 
 username
ORDER BY 
 after_hours_logins DESC;

-- Comprehensive anomaly detection query
WITH 
 unique_ips AS (
 SELECT 
 username, 
 COUNT(DISTINCT ip_address) AS ip_count
 FROM 
 login_attempts
 GROUP BY 
 username
 ),
 failed_logins AS (
 SELECT 
 username, 
 SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) AS failed_count
 FROM 
 login_attempts
 GROUP BY 
 username
 ),
 after_hours AS (
 SELECT 
 username, 
 COUNT(*) AS after_hours_count
 FROM 
 login_attempts
 WHERE 
 (strftime('%H', login_time) < '09' OR strftime('%H', login_time) >= '17')
 GROUP BY 
 username
 ),
 rapid_location_changes AS (
 SELECT 
 a.username,
 COUNT(*) AS rapid_changes
 FROM 
 login_attempts a
 JOIN 
 login_attempts b 
 ON 
 a.username = b.username
 AND a.ip_address != b.ip_address
 AND a.id < b.id
 AND (b.timestamp - a.timestamp) < 3600 -- Less than 1 hour apart
 GROUP BY 
 a.username
 )
SELECT 
 u.username,
 COALESCE(u.ip_count, 0) AS unique_ip_count,
 COALESCE(f.failed_count, 0) AS failed_logins,
 COALESCE(a.after_hours_count, 0) AS after_hours_logins,
 COALESCE(r.rapid_changes, 0) AS rapid_location_changes,
 (COALESCE(u.ip_count, 0) * 10) + 
 (COALESCE(f.failed_count, 0) * 5) + 
 (COALESCE(r.rapid_changes, 0) * 20) + 
 (COALESCE(a.after_hours_count, 0) * 3) AS anomaly_score
FROM 
 unique_ips u
LEFT JOIN 
 failed_logins f ON u.username = f.username
LEFT JOIN 
 after_hours a ON u.username = a.username
LEFT JOIN 
 rapid_location_changes r ON u.username = r.username
ORDER BY 
 anomaly_score DESC
LIMIT 10;
'''

def analyze_compromised_user(conn, username):
    """
    Perform detailed analysis on a potentially compromised user.

    Args:
        conn: SQLite connection holding a populated `login_attempts` table
        username: Username to analyze

    Returns:
        Dictionary with the login count, success rate (percent), a list of
        rapid-location-change events, and the full ordered login history.
    """
    cur = conn.cursor()

    # Full, time-ordered history for this account.
    cur.execute(
        'SELECT timestamp, login_time, ip_address, success '
        'FROM login_attempts WHERE username = ? ORDER BY timestamp ASC',
        (username,),
    )
    history = cur.fetchall()

    # Flag every consecutive pair where the IP changed within an hour.
    suspicious_events = []
    for prior, current in zip(history, history[1:]):
        prior_ts, _, prior_ip, _ = prior
        ts, _, ip, _ = current
        gap = ts - prior_ts
        if ip != prior_ip and gap < 3600:  # Less than 1 hour
            suspicious_events.append({
                'event_type': 'rapid_location_change',
                'previous_ip': prior_ip,
                'new_ip': ip,
                'minutes_between': gap / 60
            })

    # Overall success rate for this account.
    cur.execute(
        'SELECT COUNT(*) AS total, '
        'SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) AS successful '
        'FROM login_attempts WHERE username = ?',
        (username,),
    )
    total, successful = cur.fetchone()
    rate = (successful / total) * 100 if total > 0 else 0

    return {
        'username': username,
        'login_count': total,
        'success_rate': rate,
        'suspicious_events': suspicious_events,
        'login_history': [
            {
                'timestamp': ts,
                'login_time': when,
                'ip_address': ip,
                'success': bool(ok)
            }
            for ts, when, ip, ok in history
        ]
    }

def main():
    """
    Parse CLI arguments and drive the full analysis pipeline.

    Steps: parse the binary log, score every user, print the top five
    suspects, and optionally persist the JSON results, the SQL script,
    and the SQLite database.
    """
    arg_parser = argparse.ArgumentParser(description='Analyze binary login logs for compromised accounts')
    arg_parser.add_argument('--input', '-i', required=True, help='Path to binary log file')
    arg_parser.add_argument('--output', '-o', help='Path to save analysis results (JSON)')
    arg_parser.add_argument('--sql', '-s', help='Path to save SQL script')
    arg_parser.add_argument('--db', '-d', help='Path to save SQLite database')
    arg_parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output')
    opts = arg_parser.parse_args()

    # Stage 1: decode the binary log.
    print(f"Parsing binary log file: {opts.input}")
    logs = parse_binary_log(opts.input)

    # Stage 2: score every user.
    print("Analyzing for suspicious behavior...")
    anomalies = detect_anomalies(logs)

    # Stage 3: report the five highest-scoring accounts.
    print("\nTop potentially compromised accounts:")
    for rank, entry in enumerate(anomalies[:5], start=1):
        print(f"{rank}. Username: {entry['username']}")
        print(f" Anomaly Score: {entry['anomaly_score']}")
        print(f" Unique IPs: {entry['unique_ips']}")
        print(f" Failed/Successful Logins: {entry['failed_attempts']}/{entry['successful_attempts']}")
        print(f" Rapid Location Changes: {entry['rapid_location_changes']}")
        print(f" After-Hours Logins: {entry['after_hours_logins']}")
        print()

    # Stage 4: drill into the single most suspicious account, if any.
    compromised_user = None
    detailed_analysis = None
    if anomalies:
        compromised_user = anomalies[0]['username']
        print(f"RESULT: The most likely compromised account is: {compromised_user}")

        db_path = opts.db if opts.db else ':memory:'
        conn = create_database(logs, db_path)
        detailed_analysis = analyze_compromised_user(conn, compromised_user)

        if opts.verbose:
            print("\nDetailed analysis for the compromised account:")
            print(f"Login history for {compromised_user}:")
            for record in detailed_analysis['login_history']:
                status = "SUCCESS" if record['success'] else "FAILED"
                print(f"{record['login_time']} | {record['ip_address']} | {status}")

            if detailed_analysis['suspicious_events']:
                print("\nSuspicious events:")
                for event in detailed_analysis['suspicious_events']:
                    print(f"IP changed from {event['previous_ip']} to {event['new_ip']} "
                          f"in {event['minutes_between']:.1f} minutes")
    else:
        print("No anomalies detected in the log data")

    # Stage 5: optional artifacts.
    if opts.output:
        with open(opts.output, 'w') as out:
            json.dump({
                'summary': {
                    'total_logs': len(logs),
                    'total_users': len({log['username'] for log in logs}),
                    'compromised_user': compromised_user
                },
                'anomalies': anomalies,
                'detailed_analysis': detailed_analysis
            }, out, indent=4, default=str)
        print(f"Analysis results saved to {opts.output}")

    if opts.sql:
        with open(opts.sql, 'w') as out:
            out.write(generate_sql_script())
        print(f"SQL script saved to {opts.sql}")

    if opts.db:
        print(f"SQLite database saved to {opts.db}")

# Script entry point: run the CLI only when executed directly.
if __name__ == "__main__":
    main()