Opening Framing: Working with Real Data
Until now, your scripts have worked with data defined in the code itself. But real security work involves files: log files from servers, exported alerts from SIEMs, threat intelligence feeds, configuration files, and reports you generate for stakeholders.
File operations connect your scripts to the real world. Reading files lets you process actual log data. Writing files lets you save results, generate reports, and export data for other tools. This is where scripts become practical security tools.
This week, you'll learn to read log files, parse CSV data, work with JSON (the lingua franca of APIs), and write professional reports—skills you'll use in every security role.
Key insight: Security data lives in files. Scripts that can't read and write files can't do real work. Master file operations and you can automate any data processing task.
1) Reading Text Files
The most common operation: reading a file line by line. This is how you process log files:
# Basic file reading
with open("auth.log", "r") as file:
    content = file.read()
    print(content)

# Read line by line (memory efficient for large files)
with open("auth.log", "r") as file:
    for line in file:
        print(line.strip())  # strip() removes the trailing newline

# Read all lines into a list
with open("auth.log", "r") as file:
    lines = file.readlines()
    print(f"File has {len(lines)} lines")
The with Statement:
- Automatically closes the file when done (even if errors occur)
- Prevents resource leaks and file corruption
- Always use `with` for file operations
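These guarantees come from `with` calling the file's close method for you. A sketch of the manual try/finally pattern it replaces (the sample filename is illustrative):

```python
# Create a small sample file so the example is self-contained.
with open("sample.log", "w") as f:
    f.write("line one\n")

# Without "with", you must close the file yourself - and a
# try/finally is needed so close() still runs if read() raises.
file = open("sample.log", "r")
try:
    content = file.read()
finally:
    file.close()

print(file.closed)  # -> True
```

The `with` version does exactly this in one line, which is why it is the idiom to reach for.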
Security Log Processing:
# Process auth.log for failed logins
failed_logins = []

with open("auth.log", "r") as file:
    for line in file:
        if "Failed password" in line:
            failed_logins.append(line.strip())

print(f"Found {len(failed_logins)} failed login attempts")
for entry in failed_logins[:5]:  # Show first 5
    print(f"  {entry}")
Key insight: Process files line by line for large logs. Reading 10GB into memory crashes your script; iterating line by line processes any file size.
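To make the line-by-line pattern reusable, you can wrap it in a generator that streams matches without ever holding the whole file in memory. A minimal sketch (the filename and search string are illustrative):

```python
# A generator that yields matching lines one at a time, so only a
# single line is in memory regardless of file size.
def matching_lines(filepath, needle):
    with open(filepath, "r") as file:
        for line in file:
            if needle in line:
                yield line.strip()

# Build a tiny sample file so the sketch is runnable.
with open("demo_auth.log", "w") as f:
    f.write("ok login\nFailed password for root\nok login\n")

failures = list(matching_lines("demo_auth.log", "Failed password"))
print(len(failures))  # -> 1
```

Because the generator yields lazily, the same function works identically on a three-line sample and a 10GB production log.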
2) Writing Text Files
Writing files lets you save results, generate reports, and export data:
# Write mode ("w") - creates a new file or overwrites an existing one
with open("report.txt", "w") as file:
    file.write("Security Analysis Report\n")
    file.write("=" * 30 + "\n")
    file.write("Generated by automated scan\n")

# Append mode ("a") - adds to the end of an existing file
with open("alerts.log", "a") as file:
    file.write("2024-01-15 10:30:00 ALERT: Suspicious activity\n")

# Write multiple lines
findings = ["Finding 1: Open port 22", "Finding 2: Weak password", "Finding 3: Missing patches"]

with open("findings.txt", "w") as file:
    for finding in findings:
        file.write(finding + "\n")

# Or use writelines (it doesn't add newlines automatically)
with open("findings.txt", "w") as file:
    file.writelines([f + "\n" for f in findings])
Generating a Security Report:
# Generate a formatted report
def generate_report(scan_results, output_file):
    with open(output_file, "w") as file:
        file.write("=" * 50 + "\n")
        file.write("VULNERABILITY SCAN REPORT\n")
        file.write("=" * 50 + "\n\n")
        file.write(f"Total hosts scanned: {scan_results['host_count']}\n")
        file.write(f"Vulnerabilities found: {scan_results['vuln_count']}\n\n")
        file.write("FINDINGS:\n")
        file.write("-" * 30 + "\n")
        for finding in scan_results['findings']:
            file.write(f"  - {finding}\n")
        file.write("\n" + "=" * 50 + "\n")
        file.write("END OF REPORT\n")

# Use the function
results = {
    "host_count": 50,
    "vuln_count": 12,
    "findings": ["CVE-2024-1234 on 10.0.0.5", "Weak SSH config on 10.0.0.10"]
}
generate_report(results, "scan_report.txt")
Key insight: "w" overwrites, "a" appends. Use append mode for logs that accumulate over time; use write mode for reports you regenerate.
3) Working with CSV Files
CSV (Comma-Separated Values) is common for exporting SIEM data, threat intel feeds, and tabular security data:
import csv

# Reading CSV
with open("alerts.csv", "r") as file:
    reader = csv.reader(file)
    header = next(reader)  # Skip the header row
    for row in reader:
        timestamp, severity, source_ip, message = row
        print(f"{severity}: {message} from {source_ip}")

# Reading CSV as dictionaries (easier to work with)
with open("alerts.csv", "r") as file:
    reader = csv.DictReader(file)
    for row in reader:
        print(f"{row['severity']}: {row['message']}")
Writing CSV:
import csv

# Write CSV from a list of lists
alerts = [
    ["2024-01-15 10:00", "HIGH", "10.0.0.5", "Brute force detected"],
    ["2024-01-15 10:05", "MEDIUM", "10.0.0.10", "Port scan detected"],
    ["2024-01-15 10:10", "LOW", "10.0.0.15", "Failed login"]
]

with open("output.csv", "w", newline="") as file:
    writer = csv.writer(file)
    writer.writerow(["timestamp", "severity", "source_ip", "message"])  # Header
    writer.writerows(alerts)

# Write from dictionaries (cleaner)
alert_dicts = [
    {"timestamp": "2024-01-15 10:00", "severity": "HIGH", "source": "10.0.0.5"},
    {"timestamp": "2024-01-15 10:05", "severity": "MEDIUM", "source": "10.0.0.10"}
]

with open("output.csv", "w", newline="") as file:
    fieldnames = ["timestamp", "severity", "source"]
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(alert_dicts)
Key insight: Use DictReader and DictWriter for cleaner code—access columns by name instead of position.
4) JSON: The Universal Data Format
JSON (JavaScript Object Notation) is the standard for API responses, configuration files, and data exchange. Python's dictionaries map directly to JSON:
import json

# Reading a JSON file
with open("config.json", "r") as file:
    config = json.load(file)

print(config["api_key"])
print(config["settings"]["timeout"])

# Writing a JSON file
threat_data = {
    "iocs": [
        {"type": "ip", "value": "203.0.113.50", "severity": "high"},
        {"type": "hash", "value": "abc123...", "severity": "critical"}
    ],
    "generated": "2024-01-15",
    "source": "Internal scan"
}

with open("threats.json", "w") as file:
    json.dump(threat_data, file, indent=2)  # indent for readability
JSON and API Responses:
import json

# Simulated API response (a JSON string)
api_response = '''
{
    "status": "success",
    "data": {
        "ip": "203.0.113.50",
        "reputation": "malicious",
        "tags": ["c2", "botnet"],
        "confidence": 95
    }
}
'''

# Parse the JSON string
result = json.loads(api_response)  # loads = "load string"
print(f"IP: {result['data']['ip']}")
print(f"Reputation: {result['data']['reputation']}")
print(f"Tags: {', '.join(result['data']['tags'])}")

# Convert back to a string
json_string = json.dumps(result, indent=2)  # dumps = "dump string"
Handling JSON Errors:
import json

try:
    with open("data.json", "r") as file:
        data = json.load(file)
except FileNotFoundError:
    print("File not found")
    data = {}
except json.JSONDecodeError as e:
    print(f"Invalid JSON: {e}")
    data = {}
Key insight: json.load() reads from a file, json.loads() parses a string. The same goes for dump() vs dumps().
5) File Paths and Error Handling
Robust scripts handle missing files, permission errors, and work across operating systems:
import os
from pathlib import Path

# Check whether a file exists before reading
if os.path.exists("auth.log"):
    with open("auth.log", "r") as file:
        content = file.read()
else:
    print("File not found")

# Using pathlib (the modern approach)
log_path = Path("logs/auth.log")
if log_path.exists():
    content = log_path.read_text()

# Cross-platform path handling - works on Windows and Linux
log_dir = Path("logs")
auth_log = log_dir / "auth.log"  # Builds "logs/auth.log" or "logs\auth.log"

# Create the directory if needed
log_dir.mkdir(exist_ok=True)
Comprehensive Error Handling:
def safe_read_file(filepath):
    """Safely read a file with proper error handling."""
    try:
        with open(filepath, "r") as file:
            return file.read()
    except FileNotFoundError:
        print(f"ERROR: File not found: {filepath}")
        return None
    except PermissionError:
        print(f"ERROR: Permission denied: {filepath}")
        return None
    except Exception as e:
        print(f"ERROR: Unexpected error reading {filepath}: {e}")
        return None

# Use it safely
content = safe_read_file("/var/log/auth.log")
if content:
    # Process content
    pass
Working with Multiple Files:
from pathlib import Path

# Process all .log files in a directory
log_dir = Path("/var/log")
for log_file in log_dir.glob("*.log"):
    print(f"Processing: {log_file.name}")
    # Process each file...

# Recursive search: ** matches all subdirectories
for log_file in log_dir.glob("**/*.log"):
    print(log_file)
Key insight: Always handle file errors. Production scripts encounter missing files, permission issues, and corrupted data. Graceful error handling prevents crashes during incidents.
Real-World Context: Files in Security Operations
File operations are central to security workflows:
Log Analysis: Every security investigation starts with logs. Auth.log, syslog, Windows Event Logs (exported as EVTX or CSV), application logs—all are files your scripts can process. The first skill in DFIR is parsing log files efficiently.
Threat Intelligence: IOC feeds arrive as files—STIX/TAXII bundles (JSON), CSV exports from platforms, or plain text lists. Your scripts read these feeds, parse them, and integrate them into detection systems.
Report Generation: Security assessments produce reports. Automating report generation from scan results saves hours of manual work. Many tools output JSON that scripts transform into readable reports.
MITRE ATT&CK Reference: Technique T1005 (Data from Local System) describes how attackers collect files for exfiltration. Defenders use the same file operations to analyze what was accessed, monitor file integrity, and investigate breaches.
Key insight: The ability to read, process, and write files transforms you from a tool user into a tool builder. Every custom security workflow involves file operations.
Guided Lab: Log Parser and Reporter
Let's build a complete log analysis tool that reads a log file, analyzes it, and generates both CSV and JSON reports.
Step 1: Create Sample Log File
Create sample_auth.log:
2024-01-15 09:00:00 INFO Successful login user=jsmith src=192.168.1.10
2024-01-15 09:01:00 WARN Failed login user=admin src=203.0.113.50
2024-01-15 09:01:05 WARN Failed login user=admin src=203.0.113.50
2024-01-15 09:01:10 WARN Failed login user=admin src=203.0.113.50
2024-01-15 09:02:00 INFO Successful login user=mjones src=192.168.1.25
2024-01-15 09:03:00 WARN Failed login user=root src=203.0.113.50
2024-01-15 09:04:00 ERROR Connection timeout src=10.0.0.5
2024-01-15 09:05:00 WARN Failed login user=admin src=203.0.113.50
2024-01-15 09:06:00 INFO Successful login user=admin src=192.168.1.100
2024-01-15 09:07:00 WARN Failed login user=guest src=198.51.100.25
2024-01-15 09:08:00 INFO Successful login user=jsmith src=192.168.1.10
2024-01-15 09:09:00 ERROR Service unavailable component=database
Step 2: Create the Parser Script
Create log_parser.py:
import csv
import json
from collections import defaultdict

def parse_log_file(filepath):
    """Parse a log file and extract structured data."""
    events = []
    with open(filepath, "r") as file:
        for line_num, line in enumerate(file, 1):
            line = line.strip()
            if not line:
                continue
            # Parse: "2024-01-15 09:00:00 INFO message..."
            parts = line.split(" ", 3)
            if len(parts) >= 4:
                event = {
                    "line_number": line_num,
                    "date": parts[0],
                    "time": parts[1],
                    "level": parts[2],
                    "message": parts[3],
                    "raw": line
                }
                # Extract the source IP if present
                if "src=" in line:
                    src_start = line.find("src=") + 4
                    src_end = line.find(" ", src_start)
                    if src_end == -1:
                        src_end = len(line)
                    event["source_ip"] = line[src_start:src_end]
                # Extract the username if present
                if "user=" in line:
                    user_start = line.find("user=") + 5
                    user_end = line.find(" ", user_start)
                    if user_end == -1:
                        user_end = len(line)
                    event["username"] = line[user_start:user_end]
                events.append(event)
    return events
def analyze_events(events):
    """Analyze parsed events and generate statistics."""
    analysis = {
        "total_events": len(events),
        "by_level": defaultdict(int),
        "failed_logins": [],
        "by_source_ip": defaultdict(int),
        "by_username": defaultdict(int)
    }
    for event in events:
        analysis["by_level"][event["level"]] += 1
        if "Failed login" in event["message"]:
            analysis["failed_logins"].append(event)
            # Count sources and usernames for failed logins only
            if "source_ip" in event:
                analysis["by_source_ip"][event["source_ip"]] += 1
            if "username" in event:
                analysis["by_username"][event["username"]] += 1

    # Convert defaultdicts to regular dicts for JSON serialization
    analysis["by_level"] = dict(analysis["by_level"])
    analysis["by_source_ip"] = dict(analysis["by_source_ip"])
    analysis["by_username"] = dict(analysis["by_username"])
    return analysis
def export_csv(events, filepath):
    """Export events to a CSV file."""
    if not events:
        return
    fieldnames = ["line_number", "date", "time", "level", "message", "source_ip", "username"]
    with open(filepath, "w", newline="") as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(events)
    print(f"Exported {len(events)} events to {filepath}")

def export_json(data, filepath):
    """Export data to a JSON file."""
    with open(filepath, "w") as file:
        json.dump(data, file, indent=2)
    print(f"Exported analysis to {filepath}")

def generate_text_report(analysis, filepath):
    """Generate a human-readable text report."""
    with open(filepath, "w") as file:
        file.write("=" * 60 + "\n")
        file.write("LOG ANALYSIS REPORT\n")
        file.write("=" * 60 + "\n\n")
        file.write(f"Total Events: {analysis['total_events']}\n\n")
        file.write("Events by Level:\n")
        for level, count in analysis["by_level"].items():
            file.write(f"  {level}: {count}\n")
        file.write(f"\nFailed Login Attempts: {len(analysis['failed_logins'])}\n")
        if analysis["by_source_ip"]:
            file.write("\nFailed Logins by Source IP:\n")
            for ip, count in sorted(analysis["by_source_ip"].items(),
                                    key=lambda x: x[1], reverse=True):
                alert = "  [!] POTENTIAL BRUTE FORCE" if count >= 3 else ""
                file.write(f"  {ip}: {count}{alert}\n")
        if analysis["by_username"]:
            file.write("\nTargeted Usernames:\n")
            for user, count in sorted(analysis["by_username"].items(),
                                      key=lambda x: x[1], reverse=True):
                file.write(f"  {user}: {count} attempts\n")
        file.write("\n" + "=" * 60 + "\n")
        file.write("END OF REPORT\n")
    print(f"Generated report: {filepath}")

# Main execution
if __name__ == "__main__":
    print("Log Parser and Reporter")
    print("-" * 40)

    # Parse the log file
    events = parse_log_file("sample_auth.log")
    print(f"Parsed {len(events)} events")

    # Analyze
    analysis = analyze_events(events)

    # Export all formats
    export_csv(events, "parsed_events.csv")
    export_json(analysis, "analysis.json")
    generate_text_report(analysis, "security_report.txt")

    print("\nAnalysis Summary:")
    print(f"  Total events: {analysis['total_events']}")
    print(f"  Failed logins: {len(analysis['failed_logins'])}")
    print(f"  Unique source IPs: {len(analysis['by_source_ip'])}")
Step 3: Run and Examine Output
Run `python3 log_parser.py` and examine all three output files.
Step 4: Reflection (mandatory)
- Why do we use `with` for all file operations?
- What's the benefit of exporting to multiple formats?
- How does `defaultdict` simplify counting?
- How would you modify this to handle a different log format?
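As a hint for the `defaultdict` question, here is the core behavior in isolation (the level list is illustrative):

```python
from collections import defaultdict

# Missing keys start at int() == 0, so no "if key in counts" check
# is needed before incrementing.
levels = ["INFO", "WARN", "WARN", "ERROR"]

counts = defaultdict(int)
for level in levels:
    counts[level] += 1  # first occurrence starts from 0 automatically

print(dict(counts))  # -> {'INFO': 1, 'WARN': 2, 'ERROR': 1}
```

With a plain dict, every increment would need a membership check or a `.get(level, 0)` call first.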
Week 7 Outcome Check
By the end of this week, you should be able to:
- Read text files line by line and in full
- Write text files in write and append modes
- Parse and generate CSV files
- Work with JSON for data exchange
- Handle file errors gracefully
- Work with file paths cross-platform
Next week: Error Handling and Defensive Coding—where we make our scripts robust enough for production use.
🎯 Hands-On Labs (Free & Essential)
Practice reading and writing files before moving to reading resources.
🎮 TryHackMe: Python Basics (Files)
What you'll do: Read and write files with Python and handle basic parsing.
Why it matters: Real security data lives in files and logs.
Time estimate: 1-1.5 hours
📝 Lab Exercise: CSV Log Export
Task: Parse a text log and export a CSV summary (timestamp, user, result).
Deliverable: CSV file plus a short script to generate it.
Why it matters: CSV is a common interchange format for SIEMs and reports.
Time estimate: 45-60 minutes
🏁 PicoCTF Practice: General Skills (File Parsing)
What you'll do: Solve beginner challenges that require reading and parsing files.
Why it matters: File parsing is the foundation of log analysis.
Time estimate: 1-2 hours
🛡️ Lab: Secure Random Tokens
What you'll do: Use the secrets module to generate file-safe tokens.
Deliverable: Script that writes 10 unique tokens to a file and verifies uniqueness.
Why it matters: Predictable tokens enable guessing and session hijacking.
Time estimate: 45-60 minutes
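If the `secrets` module is new to you, here is a minimal sketch of the idea (the token length, count, and filename are illustrative, not the required deliverable):

```python
import secrets

# token_urlsafe() returns cryptographically strong, URL/file-safe
# text - unlike the random module, it is suitable for security tokens.
tokens = [secrets.token_urlsafe(16) for _ in range(10)]

with open("tokens.txt", "w") as file:
    for token in tokens:
        file.write(token + "\n")

# A set collapses duplicates, so equal lengths mean all are unique.
assert len(set(tokens)) == len(tokens)
```

Your lab script should build on this by re-reading the file and verifying uniqueness from the file's contents.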
💡 Lab Tip: Use `with open(...)` for every file operation to avoid data loss.
🛡️ Secure Coding: Safe File Handling
Files are a common attack surface. Defensive scripts validate paths, minimize permissions, and avoid writing sensitive data unsafely.
File safety checklist:
- Use fixed directories and validate filenames
- Avoid path traversal (../) in user input
- Write files with least-privilege permissions
- Prefer atomic writes for critical files
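Two of the checklist items can be sketched as follows; the helper names (`safe_path`, `atomic_write`) and the `reports` directory are illustrative, not a prescribed API:

```python
import os
import tempfile
from pathlib import Path

def safe_path(base_dir, filename):
    # Resolve both paths and confirm the target stays inside the
    # base directory - this rejects "../" traversal attempts.
    base = Path(base_dir).resolve()
    target = (base / filename).resolve()
    if base not in target.parents:
        raise ValueError(f"Path traversal attempt: {filename}")
    return target

def atomic_write(path, data):
    # Write to a temp file in the same directory, then rename it
    # into place - readers never observe a half-written file.
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    try:
        with os.fdopen(fd, "w") as file:
            file.write(data)
        os.replace(tmp, path)  # atomic on the same filesystem
    except BaseException:
        os.unlink(tmp)
        raise

reports = Path("reports")
reports.mkdir(exist_ok=True)
atomic_write(str(safe_path("reports", "scan.txt")), "ok\n")
```

Calling `safe_path("reports", "../evil.txt")` raises ValueError instead of writing outside the directory.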
📚 Building on CSY101 Week-13: Threat model how untrusted input reaches file operations.
Resources
Complete the required resources to build your foundation.
- Python Tutorial - Reading and Writing Files · 30-45 min · 50 XP · Resource ID: csy103_w7_r1 (Required)
- Real Python - Working With Files · 45-60 min · 50 XP · Resource ID: csy103_w7_r2 (Required)
- Automate the Boring Stuff - Chapter 9: Files · 30-45 min · 25 XP · Resource ID: csy103_w7_r3 (Optional)
Lab: IOC File Processor
Goal: Build a tool that reads IOCs from multiple file formats and consolidates them.
Linux/Windows Path (same for both)
- Create three input files:
  - `iocs_ips.txt` - plain text, one IP per line
  - `iocs_hashes.csv` - CSV with columns: hash, type, source
  - `iocs_domains.json` - JSON array of domain objects
- Create `ioc_processor.py` that:
  - Reads all three input files
  - Validates IOC formats (basic validation)
  - Consolidates them into a single data structure
  - Exports to `consolidated_iocs.json`
  - Generates an `ioc_summary.txt` report
- Include proper error handling for missing files
Deliverable (submit):
- All input files and `ioc_processor.py`
- Generated output files
- One paragraph: How would this tool fit into a threat intel workflow?
Checkpoint Questions
- What is the difference between the "w" and "a" file modes?
- Why should you always use `with` when opening files?
- What's the difference between json.load() and json.loads()?
- How do you read a CSV file as dictionaries instead of lists?
- Why process large log files line by line instead of reading all at once?
- What exceptions should you handle when working with files?
Weekly Reflection
Reflection Prompt (200-300 words):
This week you learned file operations—the bridge between your scripts and real-world data. Reading logs, parsing feeds, and generating reports are core security automation tasks.
Reflect on these questions:
- Think of a security task you've done manually that involved files (reading logs, creating reports). How would you automate it now?
- Why is JSON so prevalent in security tools and APIs?
- How does error handling change when your script processes files from untrusted sources?
- What's the relationship between file parsing and log analysis in incident response?
A strong reflection will connect file operations to practical security workflows you've encountered or learned about.
Verified Resources & Videos
- Python CSV Module: Python Docs - CSV Module
- Python JSON Module: Python Docs - JSON Module
- Security perspective (MITRE ATT&CK): MITRE ATT&CK — Data from Local System (T1005)
File operations connect your scripts to the real world. With this week's skills, you can process real logs, real threat feeds, and generate real reports. Next week: making your code robust with error handling.