CSY105 Week 04 Beginner

Programming for Security

Week Overview

This week focuses on file operations and data processing—critical skills for security automation. You'll learn to:

  • Read and write files efficiently (wordlists, configs, reports)
  • Parse structured data formats (CSV, JSON, XML)
  • Use regular expressions for pattern matching and extraction
  • Generate professional security reports
  • Process large datasets with best practices
Real-World Context: Security professionals spend significant time processing logs, parsing scan results, managing wordlists, and generating reports. Python's file I/O and data processing capabilities make these tasks efficient and repeatable.

Section 1: File Operations

Reading Files

Python provides multiple ways to read files. The modern approach uses context managers (with statements) to ensure files are properly closed:

#!/usr/bin/env python3
"""
File reading examples for security work
"""

# Method 1: Read entire file (small files only)
def read_wordlist(filepath: str) -> list[str]:
    """
    Read a wordlist file and return as list of strings.

    Args:
        filepath: Path to wordlist file

    Returns:
        List of words (stripped of whitespace)
    """
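    # Wordlists such as rockyou.txt contain bytes that are not valid UTF-8,
    # so decode leniently instead of failing with UnicodeDecodeError
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        # Strip whitespace and skip empty lines
        words = [line.strip() for line in f if line.strip()]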
    return words

# Usage
passwords = read_wordlist('/usr/share/wordlists/rockyou.txt')
print(f"Loaded {len(passwords)} passwords")


# Method 2: Read line by line (large files)
def count_failed_logins(log_file: str) -> int:
    """
    Count failed login attempts in log file.
    Memory-efficient for large files.
    """
    count = 0
    with open(log_file, 'r') as f:
        for line in f:  # Reads one line at a time
            if "Failed password" in line:
                count += 1
    return count


# Method 3: Read in chunks (binary files)
def calculate_file_hash(filepath: str) -> str:
    """
    Calculate SHA256 hash of file (works for any size).
    """
    import hashlib

    sha256 = hashlib.sha256()

    with open(filepath, 'rb') as f:  # 'rb' = read binary
        # Read in 64KB chunks
        while chunk := f.read(65536):
            sha256.update(chunk)

    return sha256.hexdigest()


# Error handling for file operations
def safe_read_config(config_file: str) -> str | None:
    """
    Safely read config file with error handling.
    """
    try:
        with open(config_file, 'r') as f:
            return f.read()
    except FileNotFoundError:
        print(f"❌ Config file not found: {config_file}")
        return None
    except PermissionError:
        print(f"❌ Permission denied: {config_file}")
        return None
    except Exception as e:
        print(f"❌ Error reading {config_file}: {e}")
        return None
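
Method 1 above builds the whole list in memory, which gets expensive for multi-gigabyte wordlists. As a minimal sketch (the names iter_wordlist and count_words are illustrative, not part of the course code), a generator can stream entries one at a time. The latin-1 default is an assumption: it maps every byte to a character, so decoding never fails on lists with mixed encodings.

```python
from typing import Iterator


def iter_wordlist(filepath: str, encoding: str = "latin-1") -> Iterator[str]:
    """Yield non-empty, stripped lines one at a time.

    latin-1 is an assumed default: every byte decodes to some character,
    so mixed-encoding wordlists never raise UnicodeDecodeError.
    """
    with open(filepath, "r", encoding=encoding) as f:
        for line in f:
            word = line.strip()
            if word:
                yield word


def count_words(filepath: str) -> int:
    """Count entries without holding the whole list in memory."""
    return sum(1 for _ in iter_wordlist(filepath))
```

Because the generator is lazy, a caller can stop early (for example, after the first matching candidate) without reading the rest of the file.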

Writing Files

Writing files follows similar patterns. Always use a context manager so that buffered data is flushed and the file is closed, even if an exception is raised partway through:

#!/usr/bin/env python3
"""
File writing examples for security reports and data export
"""

# Method 1: Write text file
def save_scan_results(results: dict, output_file: str) -> None:
    """
    Save scan results to text file.

    Args:
        results: Dictionary of scan results
        output_file: Path to output file
    """
    with open(output_file, 'w') as f:
        f.write("=== Network Scan Results ===\n\n")

        for host, ports in results.items():
            f.write(f"Host: {host}\n")
            f.write(f"Open Ports: {', '.join(map(str, ports))}\n")
            f.write("-" * 40 + "\n")

    print(f"✅ Results saved to {output_file}")


# Method 2: Append to log file
def log_scan_activity(message: str, log_file: str = "scan.log") -> None:
    """
    Append scan activity to log file.
    """
    from datetime import datetime

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}\n"

    with open(log_file, 'a') as f:  # 'a' = append mode
        f.write(log_entry)


# Method 3: Write binary data
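def save_network_capture(packets: bytes, pcap_file: str) -> None:
    """
    Save raw packet bytes to a file, exactly as given.

    Note: no pcap file or record headers are added, so the input must
    already be in capture-file format for analysis tools to open it.
    """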
    with open(pcap_file, 'wb') as f:  # 'wb' = write binary
        f.write(packets)

    print(f"✅ Captured {len(packets)} bytes to {pcap_file}")


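# Example: Generate HTML report
def generate_html_report(scan_data: dict, output_file: str) -> None:
    """
    Generate professional HTML security report.
    """
    from html import escape  # Escape scan values so they cannot inject markup

    html = f"""<!DOCTYPE html>
<html>
<head>
    <title>Security Scan Report</title>
</head>
<body>
    <h1>Network Scan Report</h1>
    <p>Generated: {escape(str(scan_data.get('timestamp', 'N/A')))}</p>
"""

    # One block per scanned host
    for host, info in scan_data.get('hosts', {}).items():
        html += f"""
    <div class="host">
        <h2>{escape(str(host))}</h2>
        <p>Open Ports: {', '.join(map(str, info['ports']))}</p>
    </div>
"""

    html += """
</body>
</html>
"""

    with open(output_file, 'w') as f:
        f.write(html)

    print(f"✅ HTML report saved to {output_file}")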

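Opening a report in 'w' mode truncates it immediately, so a crash mid-write leaves a partial file behind. One common way around this, sketched here under the assumption that the target directory is writable (atomic_write_text is a hypothetical helper name), is to write to a temporary file in the same directory and then rename it over the target.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(path: str, data: str) -> None:
    """Write data so readers see either the old file or the complete new one."""
    target = Path(path)
    # Same directory as the target, so the final rename stays on one filesystem
    fd, tmp_name = tempfile.mkstemp(dir=str(target.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(data)
            f.flush()             # Push Python's buffer to the OS
            os.fsync(f.fileno())  # Ask the OS to commit the data to disk
        os.replace(tmp_name, target)  # Atomic rename on POSIX systems
    except BaseException:
        # Remove the temporary file if anything failed before the rename
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
```

The extra flush and fsync calls cost some speed, so this pattern is mainly worth it for files that other tools read while a scan is still running.
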
Working with Paths (pathlib)

The pathlib module provides object-oriented file path handling, superior to string concatenation:

#!/usr/bin/env python3
"""
Modern path handling with pathlib
"""
from pathlib import Path

# Create path objects
wordlists_dir = Path("/usr/share/wordlists")
rockyou = wordlists_dir / "rockyou.txt"  # Clean path joining

# Check existence
if rockyou.exists():
    print(f"✅ Found: {rockyou}")
    print(f"Size: {rockyou.stat().st_size / 1024 / 1024:.2f} MB")

# Iterate directory
for wordlist in wordlists_dir.glob("*.txt"):
    print(f"Wordlist: {wordlist.name}")

# Create directories
reports_dir = Path("./scan_reports")
reports_dir.mkdir(exist_ok=True)  # Create if doesn't exist

# Generate unique output filename
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_file = reports_dir / f"scan_{timestamp}.html"

# Read/write with Path objects
config = Path("config.txt")
if config.exists():
    content = config.read_text()  # Shorthand for open/read/close

# Write with Path
report_file.write_text("<h1>Report</h1>")

# Check file properties
if report_file.is_file():
    print(f"File size: {report_file.stat().st_size} bytes")
    print(f"Modified: {datetime.fromtimestamp(report_file.stat().st_mtime)}")
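
Path objects combine naturally with the chunked hashing shown in Section 1. The following is a rough sketch of a file-integrity baseline (hash_tree is an illustrative name, and the default "*" pattern is an assumption): it walks a directory tree with rglob and records a SHA-256 digest for each regular file.

```python
import hashlib
from pathlib import Path


def hash_tree(root: str, pattern: str = "*") -> dict[str, str]:
    """Map each file under root (as a relative POSIX path) to its SHA-256 digest."""
    base = Path(root)
    digests: dict[str, str] = {}
    for path in sorted(base.rglob(pattern)):
        if not path.is_file():
            continue  # Skip directories and other non-regular entries
        sha256 = hashlib.sha256()
        with path.open("rb") as f:
            # Read in 64KB chunks so large files do not exhaust memory
            while chunk := f.read(65536):
                sha256.update(chunk)
        digests[path.relative_to(base).as_posix()] = sha256.hexdigest()
    return digests
```

Comparing two such dictionaries taken at different times shows which files were added, removed, or modified.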

Section 2: Parsing Structured Data

JSON - JavaScript Object Notation

JSON is the most common format for APIs, configuration files, and structured security data:

#!/usr/bin/env python3
"""
Working with JSON data in security contexts
"""
import json
from datetime import datetime  # Used by VulnScanner.scan() below
from pathlib import Path

# Example: Parse CVE database
cve_data = '''
{
  "CVE-2021-44228": {
    "description": "Log4Shell - Remote Code Execution in Apache Log4j",
    "severity": "CRITICAL",
    "cvss_score": 10.0,
    "affected_versions": ["2.0-beta9", "2.15.0"],
    "published": "2021-12-10"
  },
  "CVE-2014-0160": {
    "description": "Heartbleed - OpenSSL TLS Heartbeat Extension Information Disclosure",
    "severity": "HIGH",
    "cvss_score": 7.5,
    "affected_versions": ["1.0.1", "1.0.1f"],
    "published": "2014-04-07"
  }
}
'''

# Parse JSON string to Python dictionary
cve_db = json.loads(cve_data)

# Access data
for cve_id, details in cve_db.items():
    if details['severity'] == 'CRITICAL':
        print(f"🚨 {cve_id}: {details['description']}")
        print(f"   CVSS: {details['cvss_score']}")


# Read JSON from file
def load_cve_database(filepath: str) -> dict:
    """
    Load CVE database from JSON file.
    """
    with open(filepath, 'r') as f:
        return json.load(f)


# Write JSON to file
def save_scan_results_json(results: dict, output_file: str) -> None:
    """
    Save scan results as JSON with pretty formatting.
    """
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)

    print(f"✅ Results saved to {output_file}")


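# Example: Parse nmap results converted to JSON
def parse_nmap_json(nmap_file: str) -> dict:
    """
    Parse nmap results stored as JSON.

    nmap has no native JSON output option; this assumes the XML
    output (-oX) was converted to JSON with the key layout used below.
    Returns dictionary of hosts and open ports.
    """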
    with open(nmap_file, 'r') as f:
        nmap_data = json.load(f)

    results = {}

    for host in nmap_data.get('nmaprun', {}).get('host', []):
        ip = host.get('address', {}).get('addr', 'Unknown')
        ports = []

        for port in host.get('ports', {}).get('port', []):
            if port.get('state', {}).get('state') == 'open':
                ports.append({
                    'port': port.get('portid'),
                    'service': port.get('service', {}).get('name', 'unknown')
                })

        results[ip] = ports

    return results


# Practical example: Vulnerability scanner with JSON config
class VulnScanner:
    """
    Vulnerability scanner configured via JSON file.
    """
    def __init__(self, config_file: str):
        """Load scanner configuration from JSON."""
        with open(config_file, 'r') as f:
            self.config = json.load(f)

        self.targets = self.config.get('targets', [])
        self.ports = self.config.get('ports', [80, 443])
        self.timeout = self.config.get('timeout', 2)

    def scan(self) -> dict:
        """Run scan based on config."""
        results = {
            'timestamp': str(datetime.now()),
            'config': self.config,
            'hosts': {}
        }

        # Scan logic here...

        return results

# Example config.json:
# {
#   "targets": ["192.168.1.1", "192.168.1.10"],
#   "ports": [21, 22, 80, 443, 3389],
#   "timeout": 3,
#   "output": "scan_results.json"
# }
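
The loaders above assume the file is well-formed and complete. A hedged sketch of a stricter loader follows. The required keys and the port-range check are assumptions based on the example config.json, and load_scan_config is a hypothetical name.

```python
import json
from pathlib import Path

# Assumed schema, based on the example config.json above
REQUIRED_KEYS = {"targets", "ports"}


def load_scan_config(path: str) -> dict:
    """Load a JSON config and fail with a clear message if it is unusable."""
    try:
        config = json.loads(Path(path).read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        # JSONDecodeError carries the position of the first syntax error
        raise ValueError(
            f"{path}: invalid JSON at line {exc.lineno}, column {exc.colno}: {exc.msg}"
        ) from exc

    if not isinstance(config, dict):
        raise ValueError(f"{path}: expected a JSON object at the top level")

    missing = REQUIRED_KEYS - set(config)
    if missing:
        raise ValueError(f"{path}: missing keys: {', '.join(sorted(missing))}")

    ports = config["ports"]
    if not isinstance(ports, list) or not all(
        isinstance(p, int) and 0 < p < 65536 for p in ports
    ):
        raise ValueError(f"{path}: 'ports' must be a list of integers in 1-65535")

    return config
```

Failing early with a specific message is usually easier to debug than a KeyError raised halfway through a scan.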

CSV - Comma-Separated Values

CSV is common for tabular security data (scan results, logs, vulnerability exports):

#!/usr/bin/env python3
"""
Working with CSV data in security contexts
"""
import csv
from pathlib import Path

# Read CSV file
def parse_vulnerability_report(csv_file: str) -> list[dict]:
    """
    Parse vulnerability scan CSV export.

    Expected format:
    Host,Port,Service,Vulnerability,Severity,CVSS
    """
    vulnerabilities = []

    with open(csv_file, 'r', newline='') as f:  # newline='' lets csv handle line endings
        reader = csv.DictReader(f)  # Returns dict per row

        for row in reader:
            # Filter critical/high only
            if row['Severity'] in ['Critical', 'High']:
                vulnerabilities.append({
                    'host': row['Host'],
                    'port': int(row['Port']),
                    'service': row['Service'],
                    'vuln': row['Vulnerability'],
                    'severity': row['Severity'],
                    'cvss': float(row['CVSS'])
                })

    return vulnerabilities


# Write CSV file
def export_scan_results_csv(results: list[dict], output_file: str) -> None:
    """
    Export scan results to CSV format.
    """
    fieldnames = ['Host', 'Port', 'State', 'Service', 'Banner']

    with open(output_file, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)

        writer.writeheader()  # Write column headers

        for result in results:
            writer.writerow({
                'Host': result['host'],
                'Port': result['port'],
                'State': result.get('state', 'open'),
                'Service': result.get('service', 'unknown'),
                'Banner': result.get('banner', '')
            })

    print(f"✅ Exported {len(results)} results to {output_file}")


# Example: Process firewall logs
def analyze_firewall_logs(log_csv: str) -> dict:
    """
    Analyze firewall logs from CSV export.
    Returns statistics on blocked connections.
    """
    stats = {
        'total_blocks': 0,
        'top_blocked_ips': {},
        'top_blocked_ports': {}
    }

    with open(log_csv, 'r', newline='') as f:  # newline='' lets csv handle line endings
        reader = csv.DictReader(f)

        for row in reader:
            if row['Action'] == 'BLOCK':
                stats['total_blocks'] += 1

                # Count blocked IPs
                src_ip = row['Source_IP']
                stats['top_blocked_ips'][src_ip] = \
                    stats['top_blocked_ips'].get(src_ip, 0) + 1

                # Count blocked ports
                dst_port = row['Dest_Port']
                stats['top_blocked_ports'][dst_port] = \
                    stats['top_blocked_ports'].get(dst_port, 0) + 1

    return stats
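
The manual dictionary counting in analyze_firewall_logs can also be written with collections.Counter, which provides most_common() for top-N queries. This is a small sketch using inline sample data (documentation-range IPs, made up for illustration) and the same column names as above; top_blocked is a hypothetical helper.

```python
import csv
import io
from collections import Counter

# Made-up sample rows using the same columns as analyze_firewall_logs
SAMPLE_LOG = """Source_IP,Dest_Port,Action
203.0.113.5,22,BLOCK
203.0.113.5,23,BLOCK
198.51.100.7,22,BLOCK
192.0.2.10,443,ALLOW
"""


def top_blocked(csv_text: str, n: int = 3) -> tuple[list, list]:
    """Return the n most-blocked source IPs and destination ports."""
    ips: Counter = Counter()
    ports: Counter = Counter()
    for row in csv.DictReader(io.StringIO(csv_text)):
        if row["Action"] == "BLOCK":
            ips[row["Source_IP"]] += 1
            ports[row["Dest_Port"]] += 1
    return ips.most_common(n), ports.most_common(n)


top_ips, top_ports = top_blocked(SAMPLE_LOG)
print(top_ips)    # [('203.0.113.5', 2), ('198.51.100.7', 1)]
print(top_ports)  # [('22', 2), ('23', 1)]
```

Note that csv returns every field as a string, so the ports are counted as '22' rather than 22 unless converted.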

XML Parsing

Many security tools output XML (nmap, vulnerability scanners). Python's xml.etree.ElementTree handles XML parsing:

#!/usr/bin/env python3
"""
Parsing XML security data (nmap, Nessus, etc.)
"""
import xml.etree.ElementTree as ET

def parse_nmap_xml(xml_file: str) -> dict:
    """
    Parse nmap XML output (-oX flag).

    Returns:
        Dictionary of hosts with open ports and services
    """
    tree = ET.parse(xml_file)
    root = tree.getroot()

    results = {}

    # Iterate through each host
    for host in root.findall('host'):
        # Get IP address
        address = host.find('address').get('addr')

        # Get hostname if available
        hostnames = host.find('hostnames')
        hostname = None
        if hostnames is not None:
            hostname_elem = hostnames.find('hostname')
            if hostname_elem is not None:
                hostname = hostname_elem.get('name')

        # Get open ports
        ports_data = []
        ports = host.find('ports')
        if ports is not None:
            for port in ports.findall('port'):
                state = port.find('state').get('state')
                if state == 'open':
                    port_id = port.get('portid')
                    protocol = port.get('protocol')

                    service = port.find('service')
                    service_name = service.get('name', 'unknown') if service is not None else 'unknown'

                    ports_data.append({
                        'port': port_id,
                        'protocol': protocol,
                        'service': service_name
                    })

        results[address] = {
            'hostname': hostname,
            'ports': ports_data
        }

    return results


# Usage
nmap_results = parse_nmap_xml('scan_results.xml')

for ip, data in nmap_results.items():
    print(f"\n🎯 {ip} ({data['hostname'] or 'N/A'})")
    for port_info in data['ports']:
        print(f"   {port_info['port']}/{port_info['protocol']} - {port_info['service']}")
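
parse_nmap_xml calls .get() directly on the result of find(), which raises AttributeError when an element is missing. A more defensive variant might look like the sketch below. The sample XML is a hand-written fragment in the shape of nmap output, and open_ports_from_xml is an illustrative name. The Python documentation also warns that the standard xml modules are not hardened against maliciously constructed input, so files from untrusted sources deserve extra care.

```python
import xml.etree.ElementTree as ET

# Hand-written fragment in the shape of nmap -oX output (illustrative only)
SAMPLE_XML = """<nmaprun>
  <host>
    <address addr="192.0.2.10" addrtype="ipv4"/>
    <ports>
      <port protocol="tcp" portid="22"><state state="open"/><service name="ssh"/></port>
      <port protocol="tcp" portid="80"><state state="closed"/><service name="http"/></port>
    </ports>
  </host>
  <host>
    <ports/>
  </host>
</nmaprun>"""


def open_ports_from_xml(xml_text: str) -> list[tuple[str, int, str]]:
    """Return (ip, port, service) tuples for open ports, skipping incomplete hosts."""
    root = ET.fromstring(xml_text)
    found = []
    for host in root.iter("host"):
        address = host.find("address")
        if address is None:
            continue  # Host entry without an address element
        for port in host.findall("ports/port"):
            state = port.find("state")
            if state is None or state.get("state") != "open":
                continue
            service = port.find("service")
            name = service.get("name", "unknown") if service is not None else "unknown"
            found.append((address.get("addr"), int(port.get("portid")), name))
    return found


print(open_ports_from_xml(SAMPLE_XML))  # [('192.0.2.10', 22, 'ssh')]
```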

Section 3: Regular Expressions (Regex)

Regular expressions are essential for extracting structured data from unstructured text (logs, banners, responses):

Regex Basics for Security

#!/usr/bin/env python3
"""
Regular expressions for security data extraction
"""
import re

# Example 1: Extract IP addresses
def extract_ips(text: str) -> list[str]:
    """
    Extract all IPv4 addresses from text.
    """
    # Regex pattern for IPv4 (simplified)
    ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'

    ips = re.findall(ip_pattern, text)
    return ips


log_line = "Failed login from 192.168.1.100 to 10.0.0.5"
print(extract_ips(log_line))  # ['192.168.1.100', '10.0.0.5']


# Example 2: Extract email addresses
def extract_emails(text: str) -> list[str]:
    """
    Extract email addresses (for OSINT, phishing analysis).
    """
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'

    emails = re.findall(email_pattern, text)
    return emails


# Example 3: Parse SSH failed login attempts
def parse_ssh_failures(log_file: str) -> dict:
    """
    Parse /var/log/auth.log for SSH failures.
    Returns dict of {ip: failure_count}.
    """
    # Sample log line:
    # Dec 10 15:30:45 server sshd[12345]: Failed password for invalid user admin from 203.0.113.1 port 54321 ssh2

    pattern = r'Failed password for .* from (\d+\.\d+\.\d+\.\d+) port (\d+)'

    failures = {}

    with open(log_file, 'r') as f:
        for line in f:
            match = re.search(pattern, line)
            if match:
                ip = match.group(1)      # First captured group (IP)
                port = match.group(2)    # Second captured group (port)

                failures[ip] = failures.get(ip, 0) + 1

    return failures


# Example 4: Extract URLs from HTML/text
def extract_urls(text: str) -> list[str]:
    """
    Extract all URLs (useful for web scraping, phishing analysis).
    """
    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'

    urls = re.findall(url_pattern, text)
    return urls


# Example 5: Extract hashes from text
def extract_hashes(text: str) -> dict:
    """
    Extract MD5, SHA1, SHA256 hashes from text.
    Returns categorized dict.
    """
    hashes = {
        'md5': [],
        'sha1': [],
        'sha256': []
    }

    # MD5: 32 hex chars
    md5_pattern = r'\b[a-fA-F0-9]{32}\b'
    hashes['md5'] = re.findall(md5_pattern, text)

    # SHA1: 40 hex chars
    sha1_pattern = r'\b[a-fA-F0-9]{40}\b'
    hashes['sha1'] = re.findall(sha1_pattern, text)

    # SHA256: 64 hex chars
    sha256_pattern = r'\b[a-fA-F0-9]{64}\b'
    hashes['sha256'] = re.findall(sha256_pattern, text)

    return hashes


# Example 6: Validate and extract CVE IDs
def extract_cves(text: str) -> list[str]:
    """
    Extract CVE identifiers (CVE-YYYY-NNNNN).
    """
    cve_pattern = r'CVE-\d{4}-\d{4,7}'

    cves = re.findall(cve_pattern, text, re.IGNORECASE)
    return cves


vulnerability_report = """
This system is vulnerable to CVE-2021-44228 (Log4Shell) and CVE-2014-0160 (Heartbleed).
Immediate patching required.
"""

print(extract_cves(vulnerability_report))
# Output: ['CVE-2021-44228', 'CVE-2014-0160']


# Example 7: Parse HTTP headers
def parse_http_response(response: str) -> dict:
    """
    Parse HTTP response headers using regex.
    """
    headers = {}

    # Split into lines; splitlines() also handles the \r\n endings HTTP uses
    lines = response.splitlines()
    if not lines:
        return headers

    # First line is status
    status_match = re.match(r'HTTP/[\d.]+ (\d+) (.+)', lines[0])
    if status_match:
        headers['status_code'] = int(status_match.group(1))
        headers['status_text'] = status_match.group(2)

    # Parse headers (Header: Value)
    for line in lines[1:]:
        if not line.strip():
            break  # Blank line ends the headers; the body follows
        match = re.match(r'([^:]+):\s*(.+)', line)
        if match:
            header_name = match.group(1)
            header_value = match.group(2)
            headers[header_name] = header_value

    return headers
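
The simplified IPv4 pattern in Example 1 also matches strings such as 999.300.1.1, because it only checks digit counts. One way to tighten this, sketched here, is to keep the regex as a candidate finder and let the standard ipaddress module do the validation. extract_valid_ips is a hypothetical helper name.

```python
import ipaddress
import re

# Same simplified pattern as above, used only to find candidates
IP_CANDIDATE = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')


def extract_valid_ips(text: str) -> list[str]:
    """Return only the candidates that parse as real IPv4 addresses."""
    valid = []
    for candidate in IP_CANDIDATE.findall(text):
        try:
            ipaddress.IPv4Address(candidate)
        except ValueError:
            continue  # Octet out of range or otherwise malformed
        valid.append(candidate)
    return valid


line = "Failed login from 192.168.1.100 to 10.0.0.5 via 999.300.1.1"
print(extract_valid_ips(line))  # ['192.168.1.100', '10.0.0.5']
```

The IPv4Address objects also expose properties such as is_private, which can help separate internal hosts from external ones during triage.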

Advanced Regex Patterns

#!/usr/bin/env python3
"""
Advanced regex patterns for security analysis
"""
import re

# Compiled patterns (faster for repeated use)
IP_PATTERN = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
EMAIL_PATTERN = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
HASH_MD5 = re.compile(r'\b[a-fA-F0-9]{32}\b')


def extract_indicators_of_compromise(text: str) -> dict:
    """
    Extract multiple IOCs from text using compiled patterns.
    """
    iocs = {
        'ips': IP_PATTERN.findall(text),
        'emails': EMAIL_PATTERN.findall(text),
        'md5_hashes': HASH_MD5.findall(text)
    }

    return iocs


# Multi-line regex for complex log parsing
def parse_multiline_error(log_text: str) -> list[dict]:
    """
    Parse Java stack traces or multi-line errors.
    """
    # Match patterns across multiple lines
    pattern = re.compile(
        r'Exception: (.+?)\n.*?at (.+?)\((.+?):(\d+)\)',
        re.DOTALL  # . matches newlines
    )

    errors = []
    for match in pattern.finditer(log_text):
        errors.append({
            'exception': match.group(1),
            'method': match.group(2),
            'file': match.group(3),
            'line': int(match.group(4))
        })

    return errors


# Regex substitution for sanitization
def sanitize_log(log_text: str) -> str:
    """
    Remove sensitive data (IPs, emails) from logs before sharing.
    """
    # Replace IPs with [REDACTED_IP]
    sanitized = IP_PATTERN.sub('[REDACTED_IP]', log_text)

    # Replace emails with [REDACTED_EMAIL]
    sanitized = EMAIL_PATTERN.sub('[REDACTED_EMAIL]', sanitized)

    return sanitized

Section 4: Practical Applications

Wordlist Generation

Building custom wordlists for password cracking or fuzzing:

#!/usr/bin/env python3
"""
Custom wordlist generator for security testing
"""
from itertools import product, permutations
from pathlib import Path

class WordlistGenerator:
    """
    Generate custom wordlists for password attacks or fuzzing.
    """

    def __init__(self, output_file: str):
        """Initialize generator with output file."""
        self.output_file = Path(output_file)
        self.words = set()  # Use set to avoid duplicates

    def add_base_words(self, base_words: list[str]) -> None:
        """Add base words to wordlist."""
        self.words.update(base_words)

    def add_variations(self, word: str) -> None:
        """
        Add common variations of a word.
        Example: "password" -> "Password", "PASSWORD", "p@ssword", etc.
        """
        variations = [
            word,
            word.lower(),
            word.upper(),
            word.capitalize(),
            word.replace('a', '@'),
            word.replace('e', '3'),
            word.replace('i', '1'),
            word.replace('o', '0'),
            word.replace('s', '$'),
        ]

        self.words.update(variations)

    def add_year_suffix(self, base_word: str, start_year: int = 2020, end_year: int = 2026) -> None:
        """
        Add year suffixes to word.
        Example: "password2024", "password2025"
        """
        for year in range(start_year, end_year + 1):
            self.words.add(f"{base_word}{year}")

    def add_number_suffix(self, base_word: str, max_num: int = 999) -> None:
        """
        Add number suffixes (0-max_num).
        """
        for num in range(max_num + 1):
            self.words.add(f"{base_word}{num}")

    def add_common_patterns(self, base_word: str) -> None:
        """
        Add common password patterns.
        """
        patterns = [
            f"{base_word}!",
            f"{base_word}123",
            f"{base_word}@123",
            f"1{base_word}",
            f"{base_word}{base_word}",
        ]
        self.words.update(patterns)

    def save(self) -> None:
        """Write wordlist to file."""
        with open(self.output_file, 'w') as f:
            for word in sorted(self.words):
                f.write(word + '\n')

        print(f"✅ Generated {len(self.words)} words in {self.output_file}")


# Usage example
generator = WordlistGenerator("custom_passwords.txt")

# Add base words (company name, common passwords)
generator.add_base_words(["company", "admin", "password"])

# Generate variations
for word in ["company", "admin", "password"]:
    generator.add_variations(word)
    generator.add_year_suffix(word)
    generator.add_common_patterns(word)

generator.save()
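
add_variations applies one substitution at a time, so it produces "p@ssword" and "pa$$word" but never "p@$$w0rd". A possible extension, sketched below with itertools.product, builds every combination of substituted and unsubstituted characters. The substitution table and the limit parameter are assumptions for this example, and leet_combinations is a hypothetical name.

```python
from itertools import islice, product

# Assumed substitution table, matching the replacements used above
LEET = {"a": "@", "e": "3", "i": "1", "o": "0", "s": "$"}


def leet_combinations(word: str, limit: int = 1000) -> list[str]:
    """Return up to limit spellings of word with every mix of substitutions."""
    # For each character: either keep it, or keep-or-substitute if it is in LEET
    choices = [
        (char, LEET[char.lower()]) if char.lower() in LEET else (char,)
        for char in word
    ]
    # product() yields one tuple per combination; islice caps the output size
    return ["".join(combo) for combo in islice(product(*choices), limit)]


variants = leet_combinations("pass")
print(len(variants))  # 8
print(variants[0], variants[-1])  # pass p@$$
```

The number of combinations doubles with each substitutable character, which is why the sketch caps the output with limit.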

CVE Database Parser

Parse and query CVE databases for vulnerability research:

#!/usr/bin/env python3
"""
CVE database parser and search tool
"""
import json
from pathlib import Path
from typing import Optional

class CVEDatabase:
    """
    Parse and query CVE vulnerability database.
    """

    def __init__(self, json_file: str):
        """Load CVE database from JSON."""
        with open(json_file, 'r') as f:
            self.cves = json.load(f)

        print(f"✅ Loaded {len(self.cves)} CVE entries")

    def search_by_severity(self, severity: str) -> list[dict]:
        """
        Find all CVEs with given severity.

        Args:
            severity: CRITICAL, HIGH, MEDIUM, or LOW

        Returns:
            List of matching CVE entries
        """
        matches = []

        for cve_id, details in self.cves.items():
            if details.get('severity', '').upper() == severity.upper():
                matches.append({
                    'id': cve_id,
                    **details
                })

        return matches

    def search_by_keyword(self, keyword: str) -> list[dict]:
        """
        Search CVE descriptions for keyword.
        """
        matches = []

        for cve_id, details in self.cves.items():
            description = details.get('description', '').lower()
            if keyword.lower() in description:
                matches.append({
                    'id': cve_id,
                    **details
                })

        return matches

    def search_by_cvss_range(self, min_score: float, max_score: float = 10.0) -> list[dict]:
        """
        Find CVEs within CVSS score range.
        """
        matches = []

        for cve_id, details in self.cves.items():
            cvss = details.get('cvss_score', 0)
            if min_score <= cvss <= max_score:
                matches.append({
                    'id': cve_id,
                    'cvss': cvss,
                    **details
                })

        # Sort by CVSS (highest first)
        matches.sort(key=lambda x: x['cvss'], reverse=True)

        return matches

    def get_critical_vulnerabilities(self) -> list[dict]:
        """Get all CRITICAL severity CVEs."""
        return self.search_by_severity('CRITICAL')

    def export_report(self, cves: list[dict], output_file: str) -> None:
        """
        Export CVE list to HTML report.
        """
        from html import escape  # Escape CVE text so it cannot inject markup

        html = """<!DOCTYPE html>
<html>
<head>
    <title>CVE Report</title>
</head>
<body>
    <h1>CVE Vulnerability Report</h1>
    <p>Total CVEs: """ + str(len(cves)) + """</p>
"""

        for cve in cves:
            # Lowercased severity doubles as a CSS class name
            severity_class = escape(cve.get('severity', 'medium').lower())
            html += f"""
    <div class="cve {severity_class}">
        <h2>{escape(str(cve['id']))}</h2>
        <p>Severity: {escape(str(cve.get('severity', 'N/A')))} (CVSS: {escape(str(cve.get('cvss_score', 'N/A')))})</p>
        <p>Description: {escape(str(cve.get('description', 'N/A')))}</p>
        <p>Published: {escape(str(cve.get('published', 'N/A')))}</p>
    </div>
"""

        html += """
</body>
</html>
"""

        Path(output_file).write_text(html)
        print(f"✅ Report exported to {output_file}")


# Usage
db = CVEDatabase('cve_database.json')

# Find all critical vulnerabilities
critical = db.get_critical_vulnerabilities()
print(f"Found {len(critical)} critical CVEs")

# Search for specific vulnerability type
rce_vulns = db.search_by_keyword('remote code execution')
print(f"Found {len(rce_vulns)} RCE vulnerabilities")

# Export report
db.export_report(critical, 'critical_cves.html')

Log File Aggregator

Combine and analyze logs from multiple sources:

#!/usr/bin/env python3
"""
Log file aggregator and pattern detector
"""
import re
from pathlib import Path
from datetime import datetime
from collections import Counter

class LogAggregator:
    """
    Aggregate and analyze logs from multiple files.
    """

    def __init__(self):
        """Initialize aggregator."""
        self.logs = []
        self.stats = {
            'total_lines': 0,
            'error_count': 0,
            'warning_count': 0,
            'ip_addresses': Counter(),
            'failed_logins': Counter()
        }

    def add_log_file(self, log_file: str) -> None:
        """
        Add log file to aggregator.
        """
        log_path = Path(log_file)

        if not log_path.exists():
            print(f"❌ File not found: {log_file}")
            return

        print(f"📄 Processing {log_file}...")

        with open(log_path, 'r') as f:
            for line in f:
                self.logs.append({
                    'source': log_file,
                    'content': line.strip(),
                    'timestamp': self._extract_timestamp(line)
                })

                self.stats['total_lines'] += 1

                # Count errors/warnings
                if 'ERROR' in line.upper():
                    self.stats['error_count'] += 1
                if 'WARNING' in line.upper():
                    self.stats['warning_count'] += 1

                # Extract IPs
                ips = re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', line)
                for ip in ips:
                    self.stats['ip_addresses'][ip] += 1

                # Detect failed logins
                if re.search(r'failed|failure|invalid', line, re.IGNORECASE):
                    ips = re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', line)
                    for ip in ips:
                        self.stats['failed_logins'][ip] += 1

    def _extract_timestamp(self, line: str) -> str | None:
        """
        Extract timestamp from log line (if present).
        """
        # Common timestamp patterns
        patterns = [
            r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}',  # 2024-01-15 14:30:45
            r'\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}',  # Jan 15 14:30:45
        ]

        for pattern in patterns:
            match = re.search(pattern, line)
            if match:
                return match.group(0)

        return None

    def get_top_ips(self, n: int = 10) -> list[tuple]:
        """
        Get top N most frequent IP addresses.
        """
        return self.stats['ip_addresses'].most_common(n)

    def get_suspicious_ips(self, threshold: int = 5) -> list[tuple[str, int]]:
        """
        Get (ip, count) pairs for IPs with at least threshold failed login attempts.
        """
        suspicious = []

        for ip, count in self.stats['failed_logins'].items():
            if count >= threshold:
                suspicious.append((ip, count))

        # Sort by count (descending)
        suspicious.sort(key=lambda x: x[1], reverse=True)

        return suspicious

    def export_summary(self, output_file: str) -> None:
        """
        Export summary report.
        """
        report = f"""
=== LOG ANALYSIS SUMMARY ===
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Total Log Lines: {self.stats['total_lines']}
Errors: {self.stats['error_count']}
Warnings: {self.stats['warning_count']}

=== TOP 10 IP ADDRESSES ===
"""

        for ip, count in self.get_top_ips(10):
            report += f"{ip}: {count} occurrences\n"

        report += "\n=== SUSPICIOUS IPs (Failed Logins) ===\n"

        suspicious = self.get_suspicious_ips(5)
        if suspicious:
            for ip, count in suspicious:
                report += f"⚠️  {ip}: {count} failed attempts\n"
        else:
            report += "No suspicious activity detected.\n"

        Path(output_file).write_text(report)
        print(f"\n✅ Summary exported to {output_file}")


# Usage
aggregator = LogAggregator()

# Add multiple log files
aggregator.add_log_file('/var/log/auth.log')
aggregator.add_log_file('/var/log/apache2/access.log')
aggregator.add_log_file('/var/log/syslog')

# Get suspicious IPs
suspicious = aggregator.get_suspicious_ips(threshold=10)
for ip, count in suspicious:
    print(f"🚨 {ip}: {count} failed login attempts")

# Export summary
aggregator.export_summary('log_analysis.txt')

Lab 4: File I/O & Data Processing

⚠️ Lab Environment: Use authorized test systems only. Never run security tools against systems you don't own or have permission to test.
⏱️ 140 minutes total
Difficulty: Intermediate

Part 1: Custom Wordlist Generator (35 minutes)

Objective: Build a wordlist generator for password testing with multiple variation strategies.

Requirements:

  1. Create wordlist_gen.py with the following features:
    • Accept base words from command line or file
    • Generate variations (case, l33t speak, years, numbers)
    • Support multiple output formats (txt, sorted by length, etc.)
    • Display statistics (total words, file size)
  2. Test with company names, common passwords, personal info
  3. Add custom patterns (special chars, prefixes/suffixes)

Hint 1: Program Structure
#!/usr/bin/env python3
import argparse
from pathlib import Path

class WordlistGenerator:
    def __init__(self):
        self.words = set()  # Automatic deduplication

    def add_variations(self, word):
        # Add case variations
        # Add l33t speak (a->@, e->3, etc.)
        # Add year/number suffixes
        pass

    def save(self, output_file):
        # Write sorted wordlist
        pass

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--base-words', required=True)
    parser.add_argument('--output', default='wordlist.txt')
    args = parser.parse_args()

    # Load base words, generate variations, save

if __name__ == '__main__':
    main()
Hint 2: L33t Speak Conversion
def leet_speak(word):
    """Convert word to l33t speak variations."""
    substitutions = {
        'a': '@', 'e': '3', 'i': '1',
        'o': '0', 's': '$', 't': '7'
    }

    variations = [word]

    for char, replacement in substitutions.items():
        new_variation = word.replace(char, replacement)
        variations.append(new_variation)

    return variations

Part 2: CVE Database Parser & Search Tool (40 minutes)

Objective: Build a tool to parse CVE JSON database and search for vulnerabilities.

Requirements:

  1. Obtain sample CVE data (download it, or create your own JSON file with 10+ CVEs):
    • Include fields: CVE ID, description, severity, CVSS score, published date
  2. Create cve_search.py with:
    • Load CVE database from JSON
    • Search by severity (CRITICAL, HIGH, MEDIUM, LOW)
    • Search by keyword in description
    • Filter by CVSS score range
    • Export results to HTML report
  3. Add command-line interface with argparse

Hint: Sample CVE JSON Structure
{
  "CVE-2021-44228": {
    "description": "Apache Log4j2 RCE (Log4Shell)",
    "severity": "CRITICAL",
    "cvss_score": 10.0,
    "published": "2021-12-10"
  },
  "CVE-2014-0160": {
    "description": "OpenSSL Heartbleed",
    "severity": "HIGH",
    "cvss_score": 7.5,
    "published": "2014-04-07"
  }
}

Part 3: Security Scan Report Generator (35 minutes)

Objective: Generate professional HTML/Markdown reports from scan results.

Requirements:

  1. Create report_generator.py that:
    • Reads scan results from JSON file
    • Generates professional HTML report with CSS styling
    • Includes summary statistics (total hosts, open ports, vulnerabilities)
    • Color-codes severity levels
    • Optionally generates Markdown version
  2. Use sample scan data (you can create fictional data)
  3. Add timestamp and metadata to report

Hint: HTML Template Structure
from datetime import datetime

def generate_html_report(scan_data, output_file):
    """Generate HTML report from scan data."""

    html = f"""<!DOCTYPE html>
<html>
<head>
    <title>Security Scan Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 40px; }}
        .critical {{ background-color: #ffebee; border-left: 4px solid #f44336; }}
        .high {{ background-color: #fff3e0; border-left: 4px solid #ff9800; }}
        .host {{ padding: 20px; margin: 15px 0; border-radius: 8px; }}
    </style>
</head>
<body>
    <h1>Security Scan Report</h1>
    <p>Generated: {datetime.now()}</p>

    <h2>Summary</h2>
    <p>Total Hosts Scanned: {len(scan_data)}</p>
"""

    # Add host details...

    html += "</body></html>"

    with open(output_file, 'w') as f:
        f.write(html)
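
For the optional Markdown version, a starting point might look like the sketch below. It assumes scan data shaped as a mapping of host to a list of open ports, which is only one possible layout, and generate_markdown_report is a hypothetical name. The timestamp is passed in as a string so the output is reproducible.

```python
def generate_markdown_report(scan_data: dict[str, list[int]], generated: str) -> str:
    """Build a Markdown report with a summary and one table row per host."""
    lines = [
        "# Security Scan Report",
        "",
        f"Generated: {generated}",
        "",
        f"Total Hosts Scanned: {len(scan_data)}",
        "",
        "| Host | Open Ports |",
        "| --- | --- |",
    ]
    for host, ports in sorted(scan_data.items()):
        # Show "none" instead of an empty cell for hosts without open ports
        port_list = ", ".join(str(p) for p in sorted(ports)) or "none"
        lines.append(f"| {host} | {port_list} |")
    return "\n".join(lines) + "\n"
```

Returning the text instead of writing it lets the caller decide where it goes, for example Path("report.md").write_text(...) or standard output.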

Part 4: Log File Aggregator & Pattern Detector (30 minutes)

Objective: Build a tool to aggregate multiple log files and detect security patterns.

Requirements:

  1. Create log_aggregator.py that:
    • Accepts multiple log file paths as arguments
    • Extracts IP addresses from all logs
    • Detects failed login attempts (use regex)
    • Identifies IPs with >5 failed attempts
    • Counts errors, warnings, critical events
    • Exports summary report
  2. Test with sample log files (create your own or use system logs)
  3. Display top 10 most active IPs

Hint: Sample Log Lines
# Create test log file: test.log
Jan 15 10:30:45 server sshd[1234]: Failed password for admin from 192.168.1.100 port 22 ssh2
Jan 15 10:30:50 server sshd[1235]: Failed password for root from 192.168.1.100 port 22 ssh2
Jan 15 10:31:00 server sshd[1236]: Accepted password for user from 10.0.0.5 port 22 ssh2
Jan 15 10:32:15 server kernel: ERROR: Disk read failure
🎯 Lab Complete! You've now built production-quality tools for file I/O and data processing. These skills are essential for security automation, log analysis, and vulnerability management.

Week 04 Quiz

Test your understanding of file I/O and data processing for security applications.

Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.
