Week Overview
This week focuses on file operations and data processing—critical skills for security automation. You'll learn to:
- Read and write files efficiently (wordlists, configs, reports)
- Parse structured data formats (CSV, JSON, XML)
- Use regular expressions for pattern matching and extraction
- Generate professional security reports
- Process large datasets with best practices
Section 1: File Operations
Reading Files
Python provides multiple ways to read files. The modern approach uses context managers (with statements) to ensure files are properly closed:
#!/usr/bin/env python3
"""
File reading examples for security work
"""
import hashlib

# Method 1: Read entire file (small files only)
def read_wordlist(filepath: str) -> list[str]:
    """
    Read a wordlist file and return as list of strings.

    Args:
        filepath: Path to wordlist file

    Returns:
        List of words (stripped of whitespace)
    """
    with open(filepath, 'r') as f:
        # Read all lines, strip whitespace, filter empty lines
        words = [line.strip() for line in f if line.strip()]
    return words

# Usage
passwords = read_wordlist('/usr/share/wordlists/rockyou.txt')
print(f"Loaded {len(passwords)} passwords")

# Method 2: Read line by line (large files)
def count_failed_logins(log_file: str) -> int:
    """
    Count failed login attempts in log file.
    Memory-efficient for large files.
    """
    count = 0
    with open(log_file, 'r') as f:
        for line in f:  # Reads one line at a time
            if "Failed password" in line:
                count += 1
    return count

# Method 3: Read in chunks (binary files)
def calculate_file_hash(filepath: str) -> str:
    """
    Calculate SHA256 hash of file (works for any size).
    """
    sha256 = hashlib.sha256()
    with open(filepath, 'rb') as f:  # 'rb' = read binary
        # Read in 64KB chunks
        while chunk := f.read(65536):
            sha256.update(chunk)
    return sha256.hexdigest()

# Error handling for file operations
def safe_read_config(config_file: str) -> str | None:
    """
    Safely read config file with error handling.
    """
    try:
        with open(config_file, 'r') as f:
            return f.read()
    except FileNotFoundError:
        print(f"❌ Config file not found: {config_file}")
        return None
    except PermissionError:
        print(f"❌ Permission denied: {config_file}")
        return None
    except Exception as e:
        print(f"❌ Error reading {config_file}: {e}")
        return None
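As a quick self-contained check of the wordlist-reading pattern above, the snippet below writes a throwaway file with a temporary name (the sample words are illustrative) and confirms that blank lines are dropped and whitespace stripped:

```python
import os
import tempfile

def read_wordlist(filepath: str) -> list[str]:
    # Same pattern as above: strip whitespace, drop blank lines
    with open(filepath, 'r') as f:
        return [line.strip() for line in f if line.strip()]

# Write a throwaway wordlist and read it back
with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as tmp:
    tmp.write("admin\n\npassword \nletmein\n")
    path = tmp.name

words = read_wordlist(path)
os.unlink(path)  # Clean up the temporary file
print(words)  # ['admin', 'password', 'letmein']
```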
Writing Files
Writing files follows similar patterns. Always use context managers to ensure data is flushed to disk:
#!/usr/bin/env python3
"""
File writing examples for security reports and data export
"""

# Method 1: Write text file
def save_scan_results(results: dict, output_file: str) -> None:
    """
    Save scan results to text file.

    Args:
        results: Dictionary of scan results
        output_file: Path to output file
    """
    with open(output_file, 'w') as f:
        f.write("=== Network Scan Results ===\n\n")
        for host, ports in results.items():
            f.write(f"Host: {host}\n")
            f.write(f"Open Ports: {', '.join(map(str, ports))}\n")
            f.write("-" * 40 + "\n")
    print(f"✅ Results saved to {output_file}")

# Method 2: Append to log file
def log_scan_activity(message: str, log_file: str = "scan.log") -> None:
    """
    Append scan activity to log file.
    """
    from datetime import datetime
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}\n"
    with open(log_file, 'a') as f:  # 'a' = append mode
        f.write(log_entry)

# Method 3: Write binary data
def save_network_capture(packets: bytes, pcap_file: str) -> None:
    """
    Save raw packet data to file.
    """
    with open(pcap_file, 'wb') as f:  # 'wb' = write binary
        f.write(packets)
    print(f"✅ Captured {len(packets)} bytes to {pcap_file}")

# Example: Generate HTML report
def generate_html_report(scan_data: dict, output_file: str) -> None:
    """
    Generate professional HTML security report.
    """
    html = f"""<!DOCTYPE html>
<html>
<head><title>Security Scan Report</title></head>
<body>
<h1>Network Scan Report</h1>
<p>Generated: {scan_data.get('timestamp', 'N/A')}</p>
"""
    for host, info in scan_data.get('hosts', {}).items():
        html += f"""<div class="host">
<h2>{host}</h2>
<p>Open Ports: {', '.join(map(str, info['ports']))}</p>
</div>
"""
    html += """</body>
</html>
"""
    with open(output_file, 'w') as f:
        f.write(html)
    print(f"✅ HTML report saved to {output_file}")
Working with Paths (pathlib)
The pathlib module provides object-oriented file path handling, superior to string concatenation:
#!/usr/bin/env python3
"""
Modern path handling with pathlib
"""
from datetime import datetime
from pathlib import Path

# Create path objects
wordlists_dir = Path("/usr/share/wordlists")
rockyou = wordlists_dir / "rockyou.txt"  # Clean path joining

# Check existence
if rockyou.exists():
    print(f"✅ Found: {rockyou}")
    print(f"Size: {rockyou.stat().st_size / 1024 / 1024:.2f} MB")

# Iterate directory
for wordlist in wordlists_dir.glob("*.txt"):
    print(f"Wordlist: {wordlist.name}")

# Create directories
reports_dir = Path("./scan_reports")
reports_dir.mkdir(exist_ok=True)  # Create if it doesn't exist

# Generate unique output filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_file = reports_dir / f"scan_{timestamp}.html"

# Read/write with Path objects
config = Path("config.txt")
if config.exists():
    content = config.read_text()  # Shorthand for open/read/close

# Write with Path
report_file.write_text("<h1>Report</h1>")

# Check file properties
if report_file.is_file():
    print(f"File size: {report_file.stat().st_size} bytes")
    print(f"Modified: {datetime.fromtimestamp(report_file.stat().st_mtime)}")
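The mkdir/write/read cycle above can be exercised end-to-end in a scratch directory; the snippet below uses a temporary directory so it runs anywhere (the directory and file names are illustrative):

```python
import tempfile
from pathlib import Path

# Build a scratch report directory and file, then verify the round-trip
base = Path(tempfile.mkdtemp())
reports = base / "scan_reports"
reports.mkdir(exist_ok=True)

report = reports / "scan_demo.html"
report.write_text("<h1>Report</h1>")

assert report.is_file()
content = report.read_text()
print(content)  # <h1>Report</h1>
```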
Section 2: Parsing Structured Data
JSON - JavaScript Object Notation
JSON is the most common format for APIs, configuration files, and structured security data:
#!/usr/bin/env python3
"""
Working with JSON data in security contexts
"""
import json
from datetime import datetime

# Example: Parse CVE database
cve_data = '''
{
    "CVE-2021-44228": {
        "description": "Log4Shell - Remote Code Execution in Apache Log4j",
        "severity": "CRITICAL",
        "cvss_score": 10.0,
        "affected_versions": ["2.0-beta9", "2.15.0"],
        "published": "2021-12-10"
    },
    "CVE-2014-0160": {
        "description": "Heartbleed - OpenSSL TLS Heartbeat Extension Information Disclosure",
        "severity": "HIGH",
        "cvss_score": 7.5,
        "affected_versions": ["1.0.1", "1.0.1f"],
        "published": "2014-04-07"
    }
}
'''

# Parse JSON string to Python dictionary
cve_db = json.loads(cve_data)

# Access data
for cve_id, details in cve_db.items():
    if details['severity'] == 'CRITICAL':
        print(f"🚨 {cve_id}: {details['description']}")
        print(f"   CVSS: {details['cvss_score']}")

# Read JSON from file
def load_cve_database(filepath: str) -> dict:
    """
    Load CVE database from JSON file.
    """
    with open(filepath, 'r') as f:
        return json.load(f)

# Write JSON to file
def save_scan_results_json(results: dict, output_file: str) -> None:
    """
    Save scan results as JSON with pretty formatting.
    """
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)
    print(f"✅ Results saved to {output_file}")

# Example: Parse nmap results converted to JSON
def parse_nmap_json(nmap_file: str) -> dict:
    """
    Parse nmap results converted to JSON. Note that nmap itself emits
    XML (-oX flag); tools such as xmltodict can convert that XML to JSON.
    Returns dictionary of hosts and open ports.
    """
    with open(nmap_file, 'r') as f:
        nmap_data = json.load(f)
    results = {}
    for host in nmap_data.get('nmaprun', {}).get('host', []):
        ip = host.get('address', {}).get('addr', 'Unknown')
        ports = []
        for port in host.get('ports', {}).get('port', []):
            if port.get('state', {}).get('state') == 'open':
                ports.append({
                    'port': port.get('portid'),
                    'service': port.get('service', {}).get('name', 'unknown')
                })
        results[ip] = ports
    return results

# Practical example: Vulnerability scanner with JSON config
class VulnScanner:
    """
    Vulnerability scanner configured via JSON file.
    """

    def __init__(self, config_file: str):
        """Load scanner configuration from JSON."""
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.targets = self.config.get('targets', [])
        self.ports = self.config.get('ports', [80, 443])
        self.timeout = self.config.get('timeout', 2)

    def scan(self) -> dict:
        """Run scan based on config."""
        results = {
            'timestamp': str(datetime.now()),
            'config': self.config,
            'hosts': {}
        }
        # Scan logic here...
        return results

# Example config.json:
# {
#     "targets": ["192.168.1.1", "192.168.1.10"],
#     "ports": [21, 22, 80, 443, 3389],
#     "timeout": 3,
#     "output": "scan_results.json"
# }
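One practical wrinkle worth knowing: `json.dump` raises `TypeError` on `datetime` objects, which is why `VulnScanner.scan` stores `str(datetime.now())`. An alternative is passing `default=str`, which stringifies anything JSON cannot serialize. A small sketch with made-up data:

```python
import json
from datetime import datetime

record = {'scanned_at': datetime(2024, 1, 15, 14, 30), 'open_ports': [22, 80]}

# datetime is not JSON-serializable by default; default=str converts it
text = json.dumps(record, indent=2, default=str)
restored = json.loads(text)

print(restored['scanned_at'])  # 2024-01-15 14:30:00
```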
CSV - Comma-Separated Values
CSV is common for tabular security data (scan results, logs, vulnerability exports):
#!/usr/bin/env python3
"""
Working with CSV data in security contexts
"""
import csv

# Read CSV file
def parse_vulnerability_report(csv_file: str) -> list[dict]:
    """
    Parse vulnerability scan CSV export.

    Expected format:
    Host,Port,Service,Vulnerability,Severity,CVSS
    """
    vulnerabilities = []
    with open(csv_file, 'r') as f:
        reader = csv.DictReader(f)  # Returns dict per row
        for row in reader:
            # Filter critical/high only
            if row['Severity'] in ['Critical', 'High']:
                vulnerabilities.append({
                    'host': row['Host'],
                    'port': int(row['Port']),
                    'service': row['Service'],
                    'vuln': row['Vulnerability'],
                    'severity': row['Severity'],
                    'cvss': float(row['CVSS'])
                })
    return vulnerabilities

# Write CSV file
def export_scan_results_csv(results: list[dict], output_file: str) -> None:
    """
    Export scan results to CSV format.
    """
    fieldnames = ['Host', 'Port', 'State', 'Service', 'Banner']
    with open(output_file, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()  # Write column headers
        for result in results:
            writer.writerow({
                'Host': result['host'],
                'Port': result['port'],
                'State': result.get('state', 'open'),
                'Service': result.get('service', 'unknown'),
                'Banner': result.get('banner', '')
            })
    print(f"✅ Exported {len(results)} results to {output_file}")

# Example: Process firewall logs
def analyze_firewall_logs(log_csv: str) -> dict:
    """
    Analyze firewall logs from CSV export.
    Returns statistics on blocked connections.
    """
    stats = {
        'total_blocks': 0,
        'top_blocked_ips': {},
        'top_blocked_ports': {}
    }
    with open(log_csv, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            if row['Action'] == 'BLOCK':
                stats['total_blocks'] += 1
                # Count blocked IPs
                src_ip = row['Source_IP']
                stats['top_blocked_ips'][src_ip] = \
                    stats['top_blocked_ips'].get(src_ip, 0) + 1
                # Count blocked ports
                dst_port = row['Dest_Port']
                stats['top_blocked_ports'][dst_port] = \
                    stats['top_blocked_ports'].get(dst_port, 0) + 1
    return stats
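Because `csv.DictReader` accepts any file-like object, you can test the parsing logic above against an in-memory string with `io.StringIO` instead of a real file. The sample rows below are fictional:

```python
import csv
import io

sample = (
    "Host,Port,Service,Vulnerability,Severity,CVSS\n"
    "10.0.0.5,445,smb,MS17-010,Critical,9.3\n"
    "10.0.0.7,80,http,Outdated banner,Low,2.6\n"
)

# DictReader works on any file-like object, so StringIO is handy for tests
reader = csv.DictReader(io.StringIO(sample))
critical = [row for row in reader if row['Severity'] == 'Critical']

print(critical[0]['Host'])  # 10.0.0.5
```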
XML Parsing
Many security tools output XML (nmap, vulnerability scanners). Python's xml.etree.ElementTree handles XML parsing:
#!/usr/bin/env python3
"""
Parsing XML security data (nmap, Nessus, etc.)
"""
import xml.etree.ElementTree as ET

def parse_nmap_xml(xml_file: str) -> dict:
    """
    Parse nmap XML output (-oX flag).

    Returns:
        Dictionary of hosts with open ports and services
    """
    tree = ET.parse(xml_file)
    root = tree.getroot()
    results = {}
    # Iterate through each host
    for host in root.findall('host'):
        # Get IP address
        address = host.find('address').get('addr')
        # Get hostname if available
        hostnames = host.find('hostnames')
        hostname = None
        if hostnames is not None:
            hostname_elem = hostnames.find('hostname')
            if hostname_elem is not None:
                hostname = hostname_elem.get('name')
        # Get open ports
        ports_data = []
        ports = host.find('ports')
        if ports is not None:
            for port in ports.findall('port'):
                state = port.find('state').get('state')
                if state == 'open':
                    port_id = port.get('portid')
                    protocol = port.get('protocol')
                    service = port.find('service')
                    service_name = service.get('name', 'unknown') if service is not None else 'unknown'
                    ports_data.append({
                        'port': port_id,
                        'protocol': protocol,
                        'service': service_name
                    })
        results[address] = {
            'hostname': hostname,
            'ports': ports_data
        }
    return results

# Usage
nmap_results = parse_nmap_xml('scan_results.xml')
for ip, data in nmap_results.items():
    print(f"\n🎯 {ip} ({data['hostname'] or 'N/A'})")
    for port_info in data['ports']:
        print(f"  {port_info['port']}/{port_info['protocol']} - {port_info['service']}")
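If you do not have a scan file handy, `ET.fromstring` parses XML straight from a string, which makes the traversal logic above easy to try out. The fragment below is a hand-written, simplified imitation of nmap's structure, not real scanner output:

```python
import xml.etree.ElementTree as ET

# Minimal nmap-like XML, parsed from a string instead of a file
xml_text = """
<nmaprun>
  <host>
    <address addr="192.0.2.10"/>
    <ports>
      <port protocol="tcp" portid="22"><state state="open"/><service name="ssh"/></port>
      <port protocol="tcp" portid="23"><state state="closed"/></port>
    </ports>
  </host>
</nmaprun>
"""
root = ET.fromstring(xml_text)

# Collect only the ports whose <state> element says "open"
open_ports = [
    p.get('portid')
    for p in root.iter('port')
    if p.find('state').get('state') == 'open'
]
print(open_ports)  # ['22']
```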
Section 3: Regular Expressions (Regex)
Regular expressions are essential for extracting structured data from unstructured text (logs, banners, responses):
Regex Basics for Security
#!/usr/bin/env python3
"""
Regular expressions for security data extraction
"""
import re

# Example 1: Extract IP addresses
def extract_ips(text: str) -> list[str]:
    """
    Extract all IPv4 addresses from text.
    """
    # Regex pattern for IPv4 (simplified; also matches invalid octets like 999)
    ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
    return re.findall(ip_pattern, text)

log_line = "Failed login from 192.168.1.100 to 10.0.0.5"
print(extract_ips(log_line))  # ['192.168.1.100', '10.0.0.5']

# Example 2: Extract email addresses
def extract_emails(text: str) -> list[str]:
    """
    Extract email addresses (for OSINT, phishing analysis).
    """
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return re.findall(email_pattern, text)

# Example 3: Parse SSH failed login attempts
def parse_ssh_failures(log_file: str) -> dict:
    """
    Parse /var/log/auth.log for SSH failures.
    Returns dict of {ip: failure_count}.
    """
    # Sample log line:
    # Dec 10 15:30:45 server sshd[12345]: Failed password for invalid user admin from 203.0.113.1 port 54321 ssh2
    pattern = r'Failed password for .* from (\d+\.\d+\.\d+\.\d+) port (\d+)'
    failures = {}
    with open(log_file, 'r') as f:
        for line in f:
            match = re.search(pattern, line)
            if match:
                ip = match.group(1)    # First captured group (IP)
                port = match.group(2)  # Second captured group (port)
                failures[ip] = failures.get(ip, 0) + 1
    return failures

# Example 4: Extract URLs from HTML/text
def extract_urls(text: str) -> list[str]:
    """
    Extract all URLs (useful for web scraping, phishing analysis).
    """
    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
    return re.findall(url_pattern, text)

# Example 5: Extract hashes from text
def extract_hashes(text: str) -> dict:
    """
    Extract MD5, SHA1, SHA256 hashes from text.
    Returns categorized dict.
    """
    hashes = {}
    # MD5: 32 hex chars
    hashes['md5'] = re.findall(r'\b[a-fA-F0-9]{32}\b', text)
    # SHA1: 40 hex chars
    hashes['sha1'] = re.findall(r'\b[a-fA-F0-9]{40}\b', text)
    # SHA256: 64 hex chars
    hashes['sha256'] = re.findall(r'\b[a-fA-F0-9]{64}\b', text)
    return hashes

# Example 6: Validate and extract CVE IDs
def extract_cves(text: str) -> list[str]:
    """
    Extract CVE identifiers (CVE-YYYY-NNNNN).
    """
    cve_pattern = r'CVE-\d{4}-\d{4,7}'
    return re.findall(cve_pattern, text, re.IGNORECASE)

vulnerability_report = """
This system is vulnerable to CVE-2021-44228 (Log4Shell) and CVE-2014-0160 (Heartbleed).
Immediate patching required.
"""
print(extract_cves(vulnerability_report))
# Output: ['CVE-2021-44228', 'CVE-2014-0160']

# Example 7: Parse HTTP headers
def parse_http_response(response: str) -> dict:
    """
    Parse HTTP response headers using regex.
    """
    headers = {}
    # Split into lines
    lines = response.split('\n')
    # First line is the status line
    status_match = re.match(r'HTTP/[\d.]+ (\d+) (.+)', lines[0])
    if status_match:
        headers['status_code'] = int(status_match.group(1))
        headers['status_text'] = status_match.group(2)
    # Parse headers (Header: Value)
    for line in lines[1:]:
        match = re.match(r'([^:]+):\s*(.+)', line)
        if match:
            headers[match.group(1)] = match.group(2)
    return headers
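The hash patterns above rely on `\b` word boundaries to keep the shorter patterns from matching inside longer digests: position 32 inside a 64-character hex run sits between two word characters, so `\b` cannot match there. A quick check (the digests are the well-known MD5 and SHA-256 of the empty string):

```python
import re

text = (
    "Sample IOCs: md5 d41d8cd98f00b204e9800998ecf8427e, "
    "sha256 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
)

patterns = {
    'md5': re.compile(r'\b[a-fA-F0-9]{32}\b'),
    'sha256': re.compile(r'\b[a-fA-F0-9]{64}\b'),
}
# Word boundaries keep the 32-char pattern from matching inside longer digests
found = {name: pat.findall(text) for name, pat in patterns.items()}

print(found['md5'])  # ['d41d8cd98f00b204e9800998ecf8427e']
```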
Advanced Regex Patterns
#!/usr/bin/env python3
"""
Advanced regex patterns for security analysis
"""
import re

# Compiled patterns (faster for repeated use)
IP_PATTERN = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
EMAIL_PATTERN = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
HASH_MD5 = re.compile(r'\b[a-fA-F0-9]{32}\b')

def extract_indicators_of_compromise(text: str) -> dict:
    """
    Extract multiple IOCs from text using compiled patterns.
    """
    return {
        'ips': IP_PATTERN.findall(text),
        'emails': EMAIL_PATTERN.findall(text),
        'md5_hashes': HASH_MD5.findall(text)
    }

# Multi-line regex for complex log parsing
def parse_multiline_error(log_text: str) -> list[dict]:
    """
    Parse Java stack traces or multi-line errors.
    """
    # Match patterns across multiple lines
    pattern = re.compile(
        r'Exception: (.+?)\n.*?at (.+?)\((.+?):(\d+)\)',
        re.DOTALL  # . matches newlines
    )
    errors = []
    for match in pattern.finditer(log_text):
        errors.append({
            'exception': match.group(1),
            'method': match.group(2),
            'file': match.group(3),
            'line': int(match.group(4))
        })
    return errors

# Regex substitution for sanitization
def sanitize_log(log_text: str) -> str:
    """
    Remove sensitive data (IPs, emails) from logs before sharing.
    """
    # Replace IPs with [REDACTED_IP]
    sanitized = IP_PATTERN.sub('[REDACTED_IP]', log_text)
    # Replace emails with [REDACTED_EMAIL]
    sanitized = EMAIL_PATTERN.sub('[REDACTED_EMAIL]', sanitized)
    return sanitized
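A sanitization pass like `sanitize_log` above can be verified on a one-line sample (the IP and address below are documentation/example values, not real):

```python
import re

IP_PATTERN = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
EMAIL_PATTERN = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')

line = "Alert from 203.0.113.9: contact soc@example.com"
# Chain the substitutions: IPs first, then emails
sanitized = EMAIL_PATTERN.sub('[REDACTED_EMAIL]', IP_PATTERN.sub('[REDACTED_IP]', line))

print(sanitized)  # Alert from [REDACTED_IP]: contact [REDACTED_EMAIL]
```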
Section 4: Practical Applications
Wordlist Generation
Building custom wordlists for password cracking or fuzzing:
#!/usr/bin/env python3
"""
Custom wordlist generator for security testing
"""
from pathlib import Path

class WordlistGenerator:
    """
    Generate custom wordlists for password attacks or fuzzing.
    """

    def __init__(self, output_file: str):
        """Initialize generator with output file."""
        self.output_file = Path(output_file)
        self.words = set()  # Use set to avoid duplicates

    def add_base_words(self, base_words: list[str]) -> None:
        """Add base words to wordlist."""
        self.words.update(base_words)

    def add_variations(self, word: str) -> None:
        """
        Add common variations of a word.
        Example: "password" -> "Password", "PASSWORD", "p@ssword", etc.
        """
        variations = [
            word,
            word.lower(),
            word.upper(),
            word.capitalize(),
            word.replace('a', '@'),
            word.replace('e', '3'),
            word.replace('i', '1'),
            word.replace('o', '0'),
            word.replace('s', '$'),
        ]
        self.words.update(variations)

    def add_year_suffix(self, base_word: str, start_year: int = 2020, end_year: int = 2026) -> None:
        """
        Add year suffixes to word.
        Example: "password2024", "password2025"
        """
        for year in range(start_year, end_year + 1):
            self.words.add(f"{base_word}{year}")

    def add_number_suffix(self, base_word: str, max_num: int = 999) -> None:
        """
        Add number suffixes (0-max_num).
        """
        for num in range(max_num + 1):
            self.words.add(f"{base_word}{num}")

    def add_common_patterns(self, base_word: str) -> None:
        """
        Add common password patterns.
        """
        patterns = [
            f"{base_word}!",
            f"{base_word}123",
            f"{base_word}@123",
            f"1{base_word}",
            f"{base_word}{base_word}",
        ]
        self.words.update(patterns)

    def save(self) -> None:
        """Write wordlist to file."""
        with open(self.output_file, 'w') as f:
            for word in sorted(self.words):
                f.write(word + '\n')
        print(f"✅ Generated {len(self.words)} words in {self.output_file}")

# Usage example
generator = WordlistGenerator("custom_passwords.txt")

# Add base words (company name, common passwords)
generator.add_base_words(["company", "admin", "password"])

# Generate variations
for word in ["company", "admin", "password"]:
    generator.add_variations(word)
    generator.add_year_suffix(word)
    generator.add_common_patterns(word)

generator.save()
CVE Database Parser
Parse and query CVE databases for vulnerability research:
#!/usr/bin/env python3
"""
CVE database parser and search tool
"""
import json
from pathlib import Path

class CVEDatabase:
    """
    Parse and query CVE vulnerability database.
    """

    def __init__(self, json_file: str):
        """Load CVE database from JSON."""
        with open(json_file, 'r') as f:
            self.cves = json.load(f)
        print(f"✅ Loaded {len(self.cves)} CVE entries")

    def search_by_severity(self, severity: str) -> list[dict]:
        """
        Find all CVEs with given severity.

        Args:
            severity: CRITICAL, HIGH, MEDIUM, or LOW

        Returns:
            List of matching CVE entries
        """
        matches = []
        for cve_id, details in self.cves.items():
            if details.get('severity', '').upper() == severity.upper():
                matches.append({'id': cve_id, **details})
        return matches

    def search_by_keyword(self, keyword: str) -> list[dict]:
        """
        Search CVE descriptions for keyword.
        """
        matches = []
        for cve_id, details in self.cves.items():
            description = details.get('description', '').lower()
            if keyword.lower() in description:
                matches.append({'id': cve_id, **details})
        return matches

    def search_by_cvss_range(self, min_score: float, max_score: float = 10.0) -> list[dict]:
        """
        Find CVEs within CVSS score range.
        """
        matches = []
        for cve_id, details in self.cves.items():
            cvss = details.get('cvss_score', 0)
            if min_score <= cvss <= max_score:
                matches.append({'id': cve_id, 'cvss': cvss, **details})
        # Sort by CVSS (highest first)
        matches.sort(key=lambda x: x['cvss'], reverse=True)
        return matches

    def get_critical_vulnerabilities(self) -> list[dict]:
        """Get all CRITICAL severity CVEs."""
        return self.search_by_severity('CRITICAL')

    def export_report(self, cves: list[dict], output_file: str) -> None:
        """
        Export CVE list to HTML report.
        """
        html = f"""<!DOCTYPE html>
<html>
<head><title>CVE Report</title></head>
<body>
<h1>CVE Vulnerability Report</h1>
<p>Total CVEs: {len(cves)}</p>
"""
        for cve in cves:
            severity_class = cve.get('severity', 'medium').lower()
            html += f"""<div class="{severity_class}">
<h2>{cve['id']}</h2>
<p>Severity: {cve.get('severity', 'N/A')} (CVSS: {cve.get('cvss_score', 'N/A')})</p>
<p>Description: {cve.get('description', 'N/A')}</p>
<p>Published: {cve.get('published', 'N/A')}</p>
</div>
"""
        html += """</body>
</html>
"""
        Path(output_file).write_text(html)
        print(f"✅ Report exported to {output_file}")

# Usage
db = CVEDatabase('cve_database.json')

# Find all critical vulnerabilities
critical = db.get_critical_vulnerabilities()
print(f"Found {len(critical)} critical CVEs")

# Search for specific vulnerability type
rce_vulns = db.search_by_keyword('remote code execution')
print(f"Found {len(rce_vulns)} RCE vulnerabilities")

# Export report
db.export_report(critical, 'critical_cves.html')
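The highest-first sorting used by `search_by_cvss_range` is a one-liner with `list.sort` and a `key` function; here it is in isolation on fictional entries (the CVE IDs are placeholders, not real identifiers):

```python
# Sorting CVE entries by CVSS score, highest first (sample data is fictional)
cves = [
    {'id': 'CVE-0000-0001', 'cvss_score': 7.5},
    {'id': 'CVE-0000-0002', 'cvss_score': 9.8},
    {'id': 'CVE-0000-0003', 'cvss_score': 4.3},
]
cves.sort(key=lambda c: c['cvss_score'], reverse=True)

ordered_ids = [c['id'] for c in cves]
print(ordered_ids)  # ['CVE-0000-0002', 'CVE-0000-0001', 'CVE-0000-0003']
```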
Log File Aggregator
Combine and analyze logs from multiple sources:
#!/usr/bin/env python3
"""
Log file aggregator and pattern detector
"""
import re
from pathlib import Path
from datetime import datetime
from collections import Counter

class LogAggregator:
    """
    Aggregate and analyze logs from multiple files.
    """

    def __init__(self):
        """Initialize aggregator."""
        self.logs = []
        self.stats = {
            'total_lines': 0,
            'error_count': 0,
            'warning_count': 0,
            'ip_addresses': Counter(),
            'failed_logins': Counter()
        }

    def add_log_file(self, log_file: str) -> None:
        """
        Add log file to aggregator.
        """
        log_path = Path(log_file)
        if not log_path.exists():
            print(f"❌ File not found: {log_file}")
            return
        print(f"📄 Processing {log_file}...")
        with open(log_path, 'r') as f:
            for line in f:
                self.logs.append({
                    'source': log_file,
                    'content': line.strip(),
                    'timestamp': self._extract_timestamp(line)
                })
                self.stats['total_lines'] += 1
                # Count errors/warnings
                if 'ERROR' in line.upper():
                    self.stats['error_count'] += 1
                if 'WARNING' in line.upper():
                    self.stats['warning_count'] += 1
                # Extract IPs
                ips = re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', line)
                for ip in ips:
                    self.stats['ip_addresses'][ip] += 1
                # Detect failed logins
                if re.search(r'failed|failure|invalid', line, re.IGNORECASE):
                    for ip in ips:
                        self.stats['failed_logins'][ip] += 1

    def _extract_timestamp(self, line: str) -> str | None:
        """
        Extract timestamp from log line (if present).
        """
        # Common timestamp patterns
        patterns = [
            r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}',   # 2024-01-15 14:30:45
            r'\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}',   # Jan 15 14:30:45
        ]
        for pattern in patterns:
            match = re.search(pattern, line)
            if match:
                return match.group(0)
        return None

    def get_top_ips(self, n: int = 10) -> list[tuple]:
        """
        Get top N most frequent IP addresses.
        """
        return self.stats['ip_addresses'].most_common(n)

    def get_suspicious_ips(self, threshold: int = 5) -> list[tuple[str, int]]:
        """
        Get (ip, count) pairs with at least threshold failed login attempts.
        """
        suspicious = [
            (ip, count)
            for ip, count in self.stats['failed_logins'].items()
            if count >= threshold
        ]
        # Sort by count (descending)
        suspicious.sort(key=lambda x: x[1], reverse=True)
        return suspicious

    def export_summary(self, output_file: str) -> None:
        """
        Export summary report.
        """
        report = f"""
=== LOG ANALYSIS SUMMARY ===
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Total Log Lines: {self.stats['total_lines']}
Errors: {self.stats['error_count']}
Warnings: {self.stats['warning_count']}

=== TOP 10 IP ADDRESSES ===
"""
        for ip, count in self.get_top_ips(10):
            report += f"{ip}: {count} occurrences\n"
        report += "\n=== SUSPICIOUS IPs (Failed Logins) ===\n"
        suspicious = self.get_suspicious_ips(5)
        if suspicious:
            for ip, count in suspicious:
                report += f"⚠️ {ip}: {count} failed attempts\n"
        else:
            report += "No suspicious activity detected.\n"
        Path(output_file).write_text(report)
        print(f"\n✅ Summary exported to {output_file}")

# Usage
aggregator = LogAggregator()

# Add multiple log files
aggregator.add_log_file('/var/log/auth.log')
aggregator.add_log_file('/var/log/apache2/access.log')
aggregator.add_log_file('/var/log/syslog')

# Get suspicious IPs
suspicious = aggregator.get_suspicious_ips(threshold=10)
for ip, count in suspicious:
    print(f"🚨 {ip}: {count} failed login attempts")

# Export summary
aggregator.export_summary('log_analysis.txt')
Lab 4: File I/O & Data Processing
Part 1: Custom Wordlist Generator (35 minutes)
Objective: Build a wordlist generator for password testing with multiple variation strategies.
Requirements:
- Create `wordlist_gen.py` with the following features:
  - Accept base words from command line or file
  - Generate variations (case, l33t speak, years, numbers)
  - Support multiple output formats (txt, sorted by length, etc.)
  - Display statistics (total words, file size)
- Test with company names, common passwords, personal info
- Add custom patterns (special chars, prefixes/suffixes)
Success Criteria:
- Generate at least 1,000 unique password variations from 3 base words
- Output sorted wordlist file
- Print statistics (unique count, duplicates removed, file size)
Hint 1: Program Structure
#!/usr/bin/env python3
import argparse
from pathlib import Path

class WordlistGenerator:
    def __init__(self):
        self.words = set()  # Automatic deduplication

    def add_variations(self, word):
        # Add case variations
        # Add l33t speak (a->@, e->3, etc.)
        # Add year/number suffixes
        pass

    def save(self, output_file):
        # Write sorted wordlist
        pass

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--base-words', required=True)
    parser.add_argument('--output', default='wordlist.txt')
    args = parser.parse_args()
    # Load base words, generate variations, save

if __name__ == '__main__':
    main()
Hint 2: L33t Speak Conversion
def leet_speak(word):
    """Convert word to l33t speak variations."""
    substitutions = {
        'a': '@', 'e': '3', 'i': '1',
        'o': '0', 's': '$', 't': '7'
    }
    variations = [word]
    for char, replacement in substitutions.items():
        variations.append(word.replace(char, replacement))
    return variations
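The hint above applies one substitution at a time, so "p@ssw0rd" (two substitutions at once) never appears. If you want every combination, `itertools.product` can enumerate the choices per character; note the output grows exponentially with the number of substitutable letters. A sketch (the substitution table is illustrative):

```python
from itertools import product

def leet_combinations(word: str) -> set[str]:
    """All combinations of l33t substitutions (grows exponentially)."""
    subs = {'a': 'a@', 'e': 'e3', 'o': 'o0'}  # each entry: allowed renderings
    # For each character, its possible renderings; product yields every mix
    choices = [subs.get(ch, ch) for ch in word]
    return {''.join(combo) for combo in product(*choices)}

variants = leet_combinations("oak")
print(sorted(variants))  # ['0@k', '0ak', 'o@k', 'oak']
```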
Part 2: CVE Database Parser & Search Tool (40 minutes)
Objective: Build a tool to parse CVE JSON database and search for vulnerabilities.
Requirements:
- Download sample CVE data (create JSON with 10+ CVEs):
  - Include fields: CVE ID, description, severity, CVSS score, published date
- Create `cve_search.py` with:
  - Load CVE database from JSON
  - Search by severity (CRITICAL, HIGH, MEDIUM, LOW)
  - Search by keyword in description
  - Filter by CVSS score range
  - Export results to HTML report
- Add command-line interface with argparse
Success Criteria:
- Successfully parse JSON database with 10+ CVEs
- Implement at least 3 search methods
- Generate HTML report with color-coded severity
- Handle missing fields gracefully
Hint: Sample CVE JSON Structure
{
    "CVE-2021-44228": {
        "description": "Apache Log4j2 RCE (Log4Shell)",
        "severity": "CRITICAL",
        "cvss_score": 10.0,
        "published": "2021-12-10"
    },
    "CVE-2014-0160": {
        "description": "OpenSSL Heartbleed",
        "severity": "HIGH",
        "cvss_score": 7.5,
        "published": "2014-04-07"
    }
}
Part 3: Security Scan Report Generator (35 minutes)
Objective: Generate professional HTML/Markdown reports from scan results.
Requirements:
- Create `report_generator.py` that:
  - Reads scan results from JSON file
  - Generates professional HTML report with CSS styling
  - Includes summary statistics (total hosts, open ports, vulnerabilities)
  - Color-codes severity levels
  - Optionally generates Markdown version
- Use sample scan data (you can create fictional data)
- Add timestamp and metadata to report
Success Criteria:
- Generate HTML report with proper structure and styling
- Display at least 3 hosts with scan results
- Include summary section with statistics
- Report opens correctly in web browser
Hint: HTML Template Structure
from datetime import datetime

def generate_html_report(scan_data, output_file):
    """Generate HTML report from scan data."""
    html = f"""<!DOCTYPE html>
<html>
<head>
    <title>Security Scan Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 40px; }}
        .critical {{ background-color: #ffebee; border-left: 4px solid #f44336; }}
        .high {{ background-color: #fff3e0; border-left: 4px solid #ff9800; }}
        .host {{ padding: 20px; margin: 15px 0; border-radius: 8px; }}
    </style>
</head>
<body>
    <h1>Security Scan Report</h1>
    <p>Generated: {datetime.now()}</p>
    <h2>Summary</h2>
    <p>Total Hosts Scanned: {len(scan_data)}</p>
"""
    # Add host details...
    html += "</body></html>"
    with open(output_file, 'w') as f:
        f.write(html)
Part 4: Log File Aggregator & Pattern Detector (30 minutes)
Objective: Build a tool to aggregate multiple log files and detect security patterns.
Requirements:
- Create `log_aggregator.py` that:
  - Accepts multiple log file paths as arguments
  - Extracts IP addresses from all logs
  - Detects failed login attempts (use regex)
  - Identifies IPs with >5 failed attempts
  - Counts errors, warnings, critical events
  - Exports summary report
- Test with sample log files (create your own or use system logs)
- Display top 10 most active IPs
Success Criteria:
- Process at least 2 different log files
- Successfully extract IP addresses using regex
- Identify suspicious IPs (multiple failed logins)
- Generate summary report with statistics
Hint: Sample Log Lines
# Create test log file: test.log
Jan 15 10:30:45 server sshd[1234]: Failed password for admin from 192.168.1.100 port 22 ssh2
Jan 15 10:30:50 server sshd[1235]: Failed password for root from 192.168.1.100 port 22 ssh2
Jan 15 10:31:00 server sshd[1236]: Accepted password for user from 10.0.0.5 port 22 ssh2
Jan 15 10:32:15 server kernel: ERROR: Disk read failure
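To check your parsing logic before pointing it at real system logs, you can write the sample lines above to a temporary file and count failures per IP, as sketched here:

```python
import os
import re
import tempfile

sample_log = """Jan 15 10:30:45 server sshd[1234]: Failed password for admin from 192.168.1.100 port 22 ssh2
Jan 15 10:30:50 server sshd[1235]: Failed password for root from 192.168.1.100 port 22 ssh2
Jan 15 10:31:00 server sshd[1236]: Accepted password for user from 10.0.0.5 port 22 ssh2
"""

# Write the sample to a throwaway log file
with tempfile.NamedTemporaryFile('w', suffix='.log', delete=False) as tmp:
    tmp.write(sample_log)
    path = tmp.name

# Count failed-login attempts per source IP
failures = {}
pattern = re.compile(r'Failed password for .* from (\d+\.\d+\.\d+\.\d+)')
with open(path) as f:
    for line in f:
        m = pattern.search(line)
        if m:
            ip = m.group(1)
            failures[ip] = failures.get(ip, 0) + 1
os.unlink(path)

print(failures)  # {'192.168.1.100': 2}
```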
📤 Deliverables:
- `wordlist_gen.py` - Custom wordlist generator
- `cve_search.py` - CVE database search tool
- `report_generator.py` - HTML report generator
- `log_aggregator.py` - Log analysis tool
- Sample output files (wordlist, reports, summaries)
Additional Resources
Python Documentation
- pathlib - Object-oriented filesystem paths
- json - JSON encoder/decoder
- csv - CSV file reading/writing
- re - Regular expressions
- xml.etree.ElementTree - XML parsing
Security Data Sources
- NVD CVE Data Feeds (JSON)
- SecLists - Wordlists for security testing
- Exploit Database - Vulnerability data
Practice Challenges
- Parse nmap XML/JSON output and generate reports
- Build a password strength analyzer using regex
- Create a log correlation tool across multiple formats
- Extract IOCs (IPs, domains, hashes) from malware reports
Key Takeaways
- ✅ Use context managers (`with`) for all file operations
- ✅ `pathlib` provides modern, clean path handling
- ✅ JSON is the standard for structured security data
- ✅ CSV works well for tabular scan results and exports
- ✅ Regular expressions are essential for log parsing and data extraction
- ✅ Always validate and sanitize file inputs
- ✅ Generate professional reports to communicate findings
- ✅ Process large files line-by-line to avoid memory issues
Week 04 Quiz
Test your understanding of file I/O and data processing for security applications.
Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.