#!/usr/bin/env python3
# Binary Log Parser and Anomaly Detector
"""
Binary Log Parser and Anomaly Detector
This script parses a custom binary format for login attempt logs and identifies
potentially compromised accounts based on anomalous behavior.
Format:
- username_length: 4-byte integer (big-endian)
- username: variable-length string
- ip: 4-byte IPv4 address
- timestamp: 4-byte Unix timestamp (big-endian)
- success: 1-byte boolean
Usage:
python log_analyzer.py --input <log_file> [--output <output_file>] [--sql <sql_file>]
"""
import argparse
import struct
import socket
import sqlite3
import json
import os
import sys
from datetime import datetime
from collections import defaultdict
def parse_binary_log(file_path):
    """
    Parse the binary log file according to the specified format.

    Each record is laid out as:
        username_length: 4-byte unsigned int, big-endian
        username:        `username_length` bytes, UTF-8
        ip:              4 bytes, packed IPv4 address
        timestamp:       4-byte unsigned Unix timestamp, big-endian
        success:         1 byte (1 = successful login)

    Args:
        file_path: Path to the binary log file

    Returns:
        List of dicts with keys username, ip_address, timestamp,
        login_time (local datetime), success.

    Exits the process with status 1 if the file is unreadable or a record
    is irrecoverably corrupt. A truncated trailing record is skipped with
    a warning instead of aborting the whole parse.
    """
    logs = []
    # Minimum bytes any record needs besides the username itself:
    # 4 (length) + 4 (ip) + 4 (timestamp) + 1 (success flag).
    fixed_record_bytes = 13
    try:
        with open(file_path, 'rb') as f:
            data = f.read()
        offset = 0
        total = len(data)
        while offset < total:
            # Guard against a truncated trailing record: the original
            # unguarded struct.unpack would raise and abort, losing every
            # record already parsed.
            if total - offset < fixed_record_bytes:
                print(f"Warning: ignoring {total - offset} trailing bytes (truncated record)")
                break
            # Read username length (4-byte integer, big-endian)
            username_length = struct.unpack_from('>I', data, offset)[0]
            offset += 4
            # Remaining bytes must hold the username plus ip/timestamp/flag.
            if total - offset < username_length + 9:
                print("Warning: truncated record at end of file; stopping")
                break
            # Read username (variable length string)
            username = data[offset:offset + username_length].decode('utf-8')
            offset += username_length
            # Read IP address (4-byte IPv4 address)
            ip_address = socket.inet_ntoa(data[offset:offset + 4])
            offset += 4
            # Read timestamp (4-byte Unix timestamp, big-endian)
            timestamp = struct.unpack_from('>I', data, offset)[0]
            # NOTE: fromtimestamp uses the local timezone — consistent with
            # the 9-17 "business hours" check in detect_anomalies.
            login_time = datetime.fromtimestamp(timestamp)
            offset += 4
            # Read success flag (1-byte boolean)
            success = data[offset] == 1
            offset += 1
            logs.append({
                'username': username,
                'ip_address': ip_address,
                'timestamp': timestamp,
                'login_time': login_time,
                'success': success
            })
        print(f"Successfully parsed {len(logs)} login attempts")
        return logs
    except Exception as e:
        print(f"Error parsing log file: {str(e)}")
        sys.exit(1)
def detect_anomalies(logs):
    """
    Analyze logs to identify potentially compromised accounts.

    Each user is scored on four weighted indicators: number of distinct
    source IPs (x10), failed attempts (x5), rapid location changes — a
    different IP within one hour of the previous attempt — (x20), and
    logins outside 9am-5pm (x3).

    Args:
        logs: List of parsed login attempt records

    Returns:
        List of per-user summary dicts, sorted by anomaly_score descending
    """
    BUSINESS_START = 9   # 9 AM, inclusive
    BUSINESS_END = 17    # 5 PM, exclusive
    RAPID_CHANGE_WINDOW = 3600  # seconds (1 hour)

    # Bucket every attempt by username.
    by_user = defaultdict(list)
    for entry in logs:
        by_user[entry['username']].append(entry)

    report = []
    for username, raw_attempts in by_user.items():
        attempts = sorted(raw_attempts, key=lambda e: e['timestamp'])

        ips = {e['ip_address'] for e in attempts}
        failures = sum(1 for e in attempts if not e['success'])
        successes = sum(1 for e in attempts if e['success'])

        # Consecutive attempts from different IPs inside the window.
        rapid_changes = sum(
            1
            for prev, curr in zip(attempts, attempts[1:])
            if prev['ip_address'] != curr['ip_address']
            and curr['timestamp'] - prev['timestamp'] < RAPID_CHANGE_WINDOW
        )

        off_hours = sum(
            1
            for e in attempts
            if not (BUSINESS_START <= e['login_time'].hour < BUSINESS_END)
        )

        # Weighted composite score; weights reflect relative severity.
        score = (
            len(ips) * 10
            + failures * 5
            + rapid_changes * 20
            + off_hours * 3
        )

        report.append({
            'username': username,
            'anomaly_score': score,
            'unique_ips': len(ips),
            'ip_addresses': list(ips),
            'failed_attempts': failures,
            'successful_attempts': successes,
            'rapid_location_changes': rapid_changes,
            'after_hours_logins': off_hours,
            'total_attempts': len(attempts)
        })

    # Highest-risk users first.
    report.sort(key=lambda r: r['anomaly_score'], reverse=True)
    return report
def create_database(logs, db_path=':memory:'):
    """
    Create a SQLite database with the login data.

    Builds the login_attempts table, indexes the columns the analysis
    queries filter/group on, and bulk-inserts every parsed record.

    Args:
        logs: List of parsed login attempt records
        db_path: Path to save the SQLite database (default: in-memory)

    Returns:
        Open SQLite connection with the data committed

    Raises:
        sqlite3.OperationalError: if db_path already contains the table
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # Create table
    cursor.execute('''
        CREATE TABLE login_attempts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            ip_address TEXT NOT NULL,
            timestamp INTEGER NOT NULL,
            login_time TEXT NOT NULL,
            success INTEGER NOT NULL
        )
    ''')
    # Create indexes (column names are fixed literals, not user input).
    for column in ('username', 'ip_address', 'timestamp', 'success'):
        cursor.execute(f'CREATE INDEX idx_{column} ON login_attempts({column})')
    # executemany issues one prepared bulk insert instead of a Python-level
    # execute call per record.
    cursor.executemany(
        'INSERT INTO login_attempts (username, ip_address, timestamp, login_time, success) VALUES (?, ?, ?, ?, ?)',
        (
            (
                log['username'],
                log['ip_address'],
                log['timestamp'],
                log['login_time'].isoformat(),
                1 if log['success'] else 0
            )
            for log in logs
        )
    )
    conn.commit()
    return conn
def generate_sql_script():
    """
    Generate a standalone SQL script for creating the table and analyzing
    login data.

    The script contains the login_attempts DDL and indexes, followed by
    five analysis queries: users with multiple IPs, failed logins followed
    by a success within five minutes, rapid logins from different IPs,
    after-hours activity, and a final weighted anomaly-score ranking
    (weights 10/5/20/3, matching detect_anomalies). Uses SQLite-specific
    functions (julianday, strftime, AUTOINCREMENT).

    Returns:
        SQL script as a string
    """
    return '''-- Create a table to store login attempts
CREATE TABLE login_attempts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL,
ip_address TEXT NOT NULL,
timestamp INTEGER NOT NULL,
login_time TEXT NOT NULL,
success INTEGER NOT NULL
);
-- Create indexes for efficient searching
CREATE INDEX idx_username ON login_attempts(username);
CREATE INDEX idx_ip_address ON login_attempts(ip_address);
CREATE INDEX idx_timestamp ON login_attempts(timestamp);
CREATE INDEX idx_success ON login_attempts(success);
-- Query to find users with multiple IP addresses
SELECT
username,
COUNT(DISTINCT ip_address) AS unique_ip_count
FROM
login_attempts
GROUP BY
username
HAVING
unique_ip_count > 1
ORDER BY
unique_ip_count DESC;
-- Query to find failed login attempts followed by successful ones
SELECT
a.username,
COUNT(*) AS suspicious_patterns
FROM
login_attempts a
JOIN
login_attempts b
ON
a.username = b.username
AND a.timestamp < b.timestamp
AND a.success = 0
AND b.success = 1
AND (b.timestamp - a.timestamp) < 300 -- Within 5 minutes
GROUP BY
a.username
ORDER BY
suspicious_patterns DESC;
-- Query to find rapid login attempts from different locations
SELECT
a.username,
a.ip_address AS ip1,
b.ip_address AS ip2,
datetime(a.login_time) AS time1,
datetime(b.login_time) AS time2,
(julianday(b.login_time) - julianday(a.login_time)) * 24 * 60 AS minutes_between
FROM
login_attempts a
JOIN
login_attempts b
ON
a.username = b.username
AND a.ip_address != b.ip_address
AND a.id < b.id
AND (julianday(b.login_time) - julianday(a.login_time)) * 24 * 60 < 60 -- Less than 60 minutes apart
ORDER BY
minutes_between ASC;
-- Query to find users with after-hours login activity
SELECT
username,
COUNT(*) AS after_hours_logins
FROM
login_attempts
WHERE
(strftime('%H', login_time) < '09' OR strftime('%H', login_time) >= '17')
GROUP BY
username
ORDER BY
after_hours_logins DESC;
-- Comprehensive anomaly detection query
WITH
unique_ips AS (
SELECT
username,
COUNT(DISTINCT ip_address) AS ip_count
FROM
login_attempts
GROUP BY
username
),
failed_logins AS (
SELECT
username,
SUM(CASE WHEN success = 0 THEN 1 ELSE 0 END) AS failed_count
FROM
login_attempts
GROUP BY
username
),
after_hours AS (
SELECT
username,
COUNT(*) AS after_hours_count
FROM
login_attempts
WHERE
(strftime('%H', login_time) < '09' OR strftime('%H', login_time) >= '17')
GROUP BY
username
),
rapid_location_changes AS (
SELECT
a.username,
COUNT(*) AS rapid_changes
FROM
login_attempts a
JOIN
login_attempts b
ON
a.username = b.username
AND a.ip_address != b.ip_address
AND a.id < b.id
AND (b.timestamp - a.timestamp) < 3600 -- Less than 1 hour apart
GROUP BY
a.username
)
SELECT
u.username,
COALESCE(u.ip_count, 0) AS unique_ip_count,
COALESCE(f.failed_count, 0) AS failed_logins,
COALESCE(a.after_hours_count, 0) AS after_hours_logins,
COALESCE(r.rapid_changes, 0) AS rapid_location_changes,
(COALESCE(u.ip_count, 0) * 10) +
(COALESCE(f.failed_count, 0) * 5) +
(COALESCE(r.rapid_changes, 0) * 20) +
(COALESCE(a.after_hours_count, 0) * 3) AS anomaly_score
FROM
unique_ips u
LEFT JOIN
failed_logins f ON u.username = f.username
LEFT JOIN
after_hours a ON u.username = a.username
LEFT JOIN
rapid_location_changes r ON u.username = r.username
ORDER BY
anomaly_score DESC
LIMIT 10;
'''
def analyze_compromised_user(conn, username):
    """
    Perform detailed analysis on a potentially compromised user.

    Pulls the user's full login history in chronological order, flags
    rapid location changes (a different IP within an hour of the previous
    attempt), and computes the overall login success rate.

    Args:
        conn: SQLite connection with a populated login_attempts table
        username: Username to analyze

    Returns:
        Dict with username, login_count, success_rate (percentage),
        suspicious_events, and the full login_history.
    """
    cursor = conn.cursor()

    # Full chronological history for this user.
    cursor.execute(
        'SELECT timestamp, login_time, ip_address, success '
        'FROM login_attempts WHERE username = ? ORDER BY timestamp ASC',
        (username,)
    )
    history = cursor.fetchall()

    # Compare each attempt with its predecessor: a different IP less than
    # an hour later is flagged as a rapid location change.
    suspicious_events = []
    for (prev_ts, _, prev_ip, _), (curr_ts, _, curr_ip, _) in zip(history, history[1:]):
        if prev_ip and prev_ip != curr_ip:
            gap = curr_ts - prev_ts
            if gap < 3600:  # less than 1 hour
                suspicious_events.append({
                    'event_type': 'rapid_location_change',
                    'previous_ip': prev_ip,
                    'new_ip': curr_ip,
                    'minutes_between': gap / 60
                })

    # Overall attempt count and success rate.
    cursor.execute(
        'SELECT COUNT(*) AS total, '
        'SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) AS successful '
        'FROM login_attempts WHERE username = ?',
        (username,)
    )
    total, successful = cursor.fetchone()
    success_rate = (successful / total) * 100 if total > 0 else 0

    return {
        'username': username,
        'login_count': total,
        'success_rate': success_rate,
        'suspicious_events': suspicious_events,
        'login_history': [
            {
                'timestamp': ts,
                'login_time': lt,
                'ip_address': ip,
                'success': bool(ok)
            }
            for ts, lt, ip, ok in history
        ]
    }
def main():
    """
    Main function to process arguments and run the analysis.

    Pipeline: parse the binary log, score every user for anomalies, print
    the five highest-scoring accounts, then write optional artifacts
    (JSON results, SQL script, SQLite database) based on CLI flags.
    """
    parser = argparse.ArgumentParser(description='Analyze binary login logs for compromised accounts')
    parser.add_argument('--input', '-i', required=True, help='Path to binary log file')
    parser.add_argument('--output', '-o', help='Path to save analysis results (JSON)')
    parser.add_argument('--sql', '-s', help='Path to save SQL script')
    parser.add_argument('--db', '-d', help='Path to save SQLite database')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output')
    args = parser.parse_args()
    # Parse the binary log file (exits with status 1 on failure)
    print(f"Parsing binary log file: {args.input}")
    logs = parse_binary_log(args.input)
    # Analyze for anomalies
    print("Analyzing for suspicious behavior...")
    anomalies = detect_anomalies(logs)
    # Print top suspicious users (list is already sorted by score, descending)
    print("\nTop potentially compromised accounts:")
    for i, anomaly in enumerate(anomalies[:5]):
        print(f"{i+1}. Username: {anomaly['username']}")
        print(f" Anomaly Score: {anomaly['anomaly_score']}")
        print(f" Unique IPs: {anomaly['unique_ips']}")
        print(f" Failed/Successful Logins: {anomaly['failed_attempts']}/{anomaly['successful_attempts']}")
        print(f" Rapid Location Changes: {anomaly['rapid_location_changes']}")
        print(f" After-Hours Logins: {anomaly['after_hours_logins']}")
        print()
    # Identify the most likely compromised user (highest anomaly score)
    if anomalies:
        compromised_user = anomalies[0]['username']
        print(f"RESULT: The most likely compromised account is: {compromised_user}")
        # Create database for SQL analysis
        db_path = args.db if args.db else ':memory:'
        conn = create_database(logs, db_path)
        # Get detailed analysis for the compromised user
        detailed_analysis = analyze_compromised_user(conn, compromised_user)
        if args.verbose:
            print("\nDetailed analysis for the compromised account:")
            print(f"Login history for {compromised_user}:")
            for entry in detailed_analysis['login_history']:
                status = "SUCCESS" if entry['success'] else "FAILED"
                print(f"{entry['login_time']} | {entry['ip_address']} | {status}")
            if detailed_analysis['suspicious_events']:
                print("\nSuspicious events:")
                for event in detailed_analysis['suspicious_events']:
                    print(f"IP changed from {event['previous_ip']} to {event['new_ip']} "
                          f"in {event['minutes_between']:.1f} minutes")
    else:
        print("No anomalies detected in the log data")
    # Save results to output file
    if args.output:
        with open(args.output, 'w') as f:
            # default=str stringifies datetime objects so json.dump won't raise;
            # the conditional expressions never touch compromised_user /
            # detailed_analysis when anomalies is empty.
            json.dump({
                'summary': {
                    'total_logs': len(logs),
                    'total_users': len({log['username'] for log in logs}),
                    'compromised_user': compromised_user if anomalies else None
                },
                'anomalies': anomalies,
                'detailed_analysis': detailed_analysis if anomalies else None
            }, f, indent=4, default=str)
        print(f"Analysis results saved to {args.output}")
    # Save SQL script
    if args.sql:
        with open(args.sql, 'w') as f:
            f.write(generate_sql_script())
        print(f"SQL script saved to {args.sql}")
    # Report if database was saved
    # NOTE(review): the database is only created inside the `if anomalies`
    # branch above, so with --db and an empty log this message can print
    # even though no database file was written — confirm intended behavior.
    if args.db:
        print(f"SQLite database saved to {args.db}")
# Run the CLI entry point only when executed as a script (not on import).
if __name__ == "__main__":
    main()