Binary Parser
import struct
import socket
import datetime
from collections import defaultdict, Counter
def parse_binary_logs(file_path):
    """Parse binary log file according to the specified format."""
    login_attempts = []
    with open(file_path, 'rb') as f:
        data = f.read()
    offset = 0
    while offset < len(data):
        # Read username length (4-byte integer, big-endian)
        username_length = struct.unpack('>I', data[offset:offset+4])[0]
        offset += 4
        # Read username (variable length string)
        username = data[offset:offset+username_length].decode('utf-8')
        offset += username_length
        # Read IPv4 address (4 bytes)
        ip_bytes = data[offset:offset+4]
        ip_address = socket.inet_ntoa(ip_bytes)
        offset += 4
        # Read timestamp (4-byte Unix timestamp)
        timestamp = struct.unpack('>I', data[offset:offset+4])[0]
        datetime_obj = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
        offset += 4
        # Read success flag (1-byte boolean)
        success = bool(data[offset])
        offset += 1
        # Store the parsed login attempt
        login_attempts.append({
            'username': username,
            'ip_address': ip_address,
            'timestamp': timestamp,
            'datetime': datetime_obj,
            'success': success
        })
    return login_attempts
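
# A minimal sketch of the inverse operation, handy for producing a test file for the
# parser above. This helper is not part of the original script; the record layout is
# assumed from the comments in parse_binary_logs (big-endian length-prefixed UTF-8
# username, 4-byte IPv4 address, 4-byte Unix timestamp, 1-byte success flag).
def write_sample_log(file_path, attempts):
    """Write (username, ip, timestamp, success) tuples in the expected binary format."""
    with open(file_path, 'wb') as f:
        for username, ip, timestamp, success in attempts:
            name_bytes = username.encode('utf-8')
            f.write(struct.pack('>I', len(name_bytes)))     # 4-byte big-endian username length
            f.write(name_bytes)                             # variable-length username
            f.write(socket.inet_aton(ip))                   # 4-byte IPv4 address
            f.write(struct.pack('>I', timestamp))           # 4-byte big-endian Unix timestamp
            f.write(struct.pack('B', 1 if success else 0))  # 1-byte success flag

# Example (hypothetical values):
#   write_sample_log('sample.log', [('alice', '10.0.0.1', 1700000000, True)])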
def analyze_logs(login_attempts):
    """Basic analysis of the log data to extract key metrics."""
    # Get the earliest timestamp (start date of the log)
    earliest_timestamp = min(login_attempts, key=lambda x: x['timestamp'])['timestamp']
    start_date_utc = datetime.datetime.fromtimestamp(earliest_timestamp, tz=datetime.timezone.utc)
    # Count unique usernames
    unique_usernames = set(attempt['username'] for attempt in login_attempts)
    # Count unique IP addresses
    unique_ips = set(attempt['ip_address'] for attempt in login_attempts)
    # Count total login attempts
    total_attempts = len(login_attempts)
    return {
        'start_date_utc': start_date_utc,
        'total_attempts': total_attempts,
        'unique_usernames': len(unique_usernames),
        'unique_ips': len(unique_ips),
        'usernames': unique_usernames,
        'ip_addresses': unique_ips
    }
def analyze_login_patterns(login_attempts):
    """Analyze login patterns to identify potentially compromised users."""
    # Track login data per user
    user_data = defaultdict(lambda: {
        'ips': set(),
        'successful_logins': 0,
        'failed_logins': 0,
        'login_times': [],
        'success_after_failure': 0,
        'unique_ips_count': 0
    })
    # Group login attempts by user
    user_attempts = defaultdict(list)
    for attempt in login_attempts:
        user = attempt['username']
        user_attempts[user].append(attempt)
        # Track IPs used
        user_data[user]['ips'].add(attempt['ip_address'])
        # Track login success/failure
        if attempt['success']:
            user_data[user]['successful_logins'] += 1
        else:
            user_data[user]['failed_logins'] += 1
        # Track login times
        user_data[user]['login_times'].append(attempt['timestamp'])
    # Calculate additional metrics
    for user, attempts in user_attempts.items():
        # Sort attempts by timestamp
        sorted_attempts = sorted(attempts, key=lambda x: x['timestamp'])
        # Check for successful login after failures
        for i in range(1, len(sorted_attempts)):
            if (not sorted_attempts[i-1]['success'] and
                    sorted_attempts[i]['success'] and
                    sorted_attempts[i]['ip_address'] != sorted_attempts[i-1]['ip_address']):
                user_data[user]['success_after_failure'] += 1
        # Count unique IPs
        user_data[user]['unique_ips_count'] = len(user_data[user]['ips'])
    # Identify suspicious users based on multiple criteria
    suspicious_users = []
    for user, data in user_data.items():
        suspicion_score = 0
        # Multiple IPs used (especially if significantly more than other users)
        if data['unique_ips_count'] > 3:
            suspicion_score += data['unique_ips_count']
        # High number of failed logins followed by successful ones from different IPs
        if data['success_after_failure'] > 0:
            suspicion_score += data['success_after_failure'] * 10
        # High ratio of failed to successful logins
        if data['successful_logins'] > 0:
            failure_ratio = data['failed_logins'] / data['successful_logins']
            if failure_ratio > 3:
                suspicion_score += failure_ratio
        if suspicion_score > 10:
            suspicious_users.append((user, suspicion_score, data))
    # Sort by suspicion score
    suspicious_users.sort(key=lambda x: x[1], reverse=True)
    return suspicious_users, user_data
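
# Worked example of the scoring above (hypothetical numbers, not taken from any real log):
# a user seen from 5 distinct IPs (> 3, so +5), with 2 successful logins that each followed
# a failure from a different IP (+2 * 10 = +20), and 8 failed vs. 2 successful logins
# (ratio 4 > 3, so +4) accumulates 5 + 20 + 4 = 29, which clears the threshold of 10 and
# gets the user flagged as suspicious.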
def main():
    file_path = input("Enter the path to the binary log file: ")
    try:
        login_attempts = parse_binary_logs(file_path)
        print(f"Successfully parsed {len(login_attempts)} login attempts.")
        # Basic log analysis for the requested metrics
        log_metrics = analyze_logs(login_attempts)
        print("\n===== LOG METRICS =====")
        print(f"Start date of the log (UTC): {log_metrics['start_date_utc'].strftime('%Y-%m-%d %H:%M:%S UTC')}")
        print(f"Total login attempts recorded: {log_metrics['total_attempts']}")
        print(f"Number of unique usernames: {log_metrics['unique_usernames']}")
        print(f"Number of unique IP addresses: {log_metrics['unique_ips']}")
        # Advanced pattern analysis for suspicious activity
        suspicious_users, user_data = analyze_login_patterns(login_attempts)
        print("\n===== SUSPICIOUS ACTIVITY ANALYSIS =====")
        print(f"Total users analyzed: {len(user_data)}")
        if suspicious_users:
            print("\nPotentially compromised users (sorted by suspicion score):")
            for user, score, data in suspicious_users:
                print(f"\nUsername: {user} (Suspicion Score: {score:.2f})")
                print(f"  Unique IPs: {data['unique_ips_count']}")
                print(f"  Successful logins: {data['successful_logins']}")
                print(f"  Failed logins: {data['failed_logins']}")
                print(f"  Successful logins after failures from different IPs: {data['success_after_failure']}")
                print(f"  IP addresses used: {', '.join(data['ips'])}")
        else:
            print("\nNo suspicious users identified.")
    except Exception as e:
        print(f"Error processing the file: {e}")

if __name__ == "__main__":
    main()