This week shifts to defensive security automation. You will build Python tools that ingest logs, detect anomalies, and enrich suspicious events with threat intelligence. By the end of the week, you will be able to:
Parse and normalize syslog, JSON, and Windows event logs
Build baseline metrics and detect deviations
Automate alerting for failed logins and suspicious activity
Enrich events with IP reputation and geolocation
Design a mini-SIEM workflow with summary dashboards
Real-World Context: SOC teams depend on log pipelines (ELK, Splunk, Azure Sentinel) to transform raw events into alerts. Your scripts emulate core SIEM features: parsing, correlation, enrichment, and analyst-ready outputs.
Privacy & Legal Note: Logs often contain PII (usernames, IPs, device IDs). Handle them according to policy, minimize retention, and redact before sharing or storing outside controlled environments.
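As a concrete example of that redaction step, a small helper can mask source IPs and usernames before a log line leaves the lab (a minimal sketch; the regex patterns and masking format are illustrative, not a complete PII scrubber):

```python
import re

# Mask the last octet of IPv4 addresses and any username following "for "
IP_RE = re.compile(r"\b(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.\d{1,3}\b")
USER_RE = re.compile(r"(?<=for )\w+")  # matches "root" in "Failed password for root"

def redact(line: str) -> str:
    """Redact IPs and usernames in a single log line."""
    line = IP_RE.sub(r"\1.\2.\3.x", line)  # keep the network prefix for triage
    return USER_RE.sub("<redacted>", line)
```

For example, `redact("Failed password for root from 10.0.0.5")` keeps enough of the address for network triage while dropping the account identity.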
Section 1: Log Formats & Normalization
Why Normalization Matters
Logs come from different sources and formats. Normalization converts them into a consistent schema so alerts, dashboards, and analytics can query reliably.
Common Log Formats
Syslog: RFC 3164/5424 text format, common for Linux services
JSON Logs: Structured logs from cloud systems and modern apps
Windows Event Logs: XML or EVTX data exported from Windows
Syslog Severity Levels
Level 0 (Emergency): System unusable
Level 1 (Alert): Immediate action required
Level 2 (Critical): Critical condition
Level 3 (Error): Error condition
Level 4 (Warning): Warning condition
Level 5 (Notice): Normal but significant
Level 6 (Informational): Informational messages
Level 7 (Debug): Debug-level messages
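In RFC 5424 (and most RFC 3164) messages, these severities travel in the leading PRI value (e.g. `<165>`), which packs facility and severity into one integer: severity is PRI modulo 8, facility is PRI divided by 8. A quick decoder sketch:

```python
SEVERITIES = [
    "Emergency", "Alert", "Critical", "Error",
    "Warning", "Notice", "Informational", "Debug",
]

def decode_pri(pri: int) -> tuple[int, str]:
    """Split a syslog PRI value into (facility number, severity keyword)."""
    facility, severity = divmod(pri, 8)  # pri = facility * 8 + severity
    return facility, SEVERITIES[severity]
```

For instance, `decode_pri(165)` returns `(20, "Notice")`: facility 20 (local4) at Notice severity.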
Windows Event ID Quick Reference
4624: Successful logon
4625: Failed logon (brute force indicator)
4688: Process creation (useful for malware hunting)
4720: User account creation
4726: User account deletion
4769: Kerberos service ticket request
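The IDs above are easy to carry in code as a lookup table so normalized Windows events can be tagged with a readable label (the dict name and helper below are illustrative):

```python
WINDOWS_EVENT_IDS = {
    4624: "Successful logon",
    4625: "Failed logon",
    4688: "Process creation",
    4720: "User account creation",
    4726: "User account deletion",
    4769: "Kerberos service ticket request",
}

def label_event(event_id: int) -> str:
    """Return a friendly label, or a fallback for unmapped IDs."""
    return WINDOWS_EVENT_IDS.get(event_id, f"Unknown ({event_id})")
```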
Example: Multi-format Normalizer
#!/usr/bin/env python3
"""
Multi-format log normalizer.
Supports syslog, JSON, and Windows Event Log exports (XML).
"""
from __future__ import annotations
import json
import re
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import Dict, Optional
SYSLOG_PATTERN = re.compile(
r"^(?P<ts>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+"
r"(?P<host>[\w\.-]+)\s+(?P<app>[\w\-/]+)(?:\[(?P<pid>\d+)\])?:\s+(?P<msg>.*)$"
)
def parse_syslog(line: str) -> Optional[Dict[str, str]]:
"""
Parse a syslog line into a normalized dict.
Args:
line: Raw syslog line
Returns:
Normalized dict or None if parsing fails
"""
match = SYSLOG_PATTERN.match(line.strip())
if not match:
return None
data = match.groupdict()
    try:
        ts = datetime.strptime(data["ts"], "%b %d %H:%M:%S")
        # RFC 3164 timestamps omit the year; assume the current one
        ts = ts.replace(year=datetime.utcnow().year)
    except ValueError:
        ts = datetime.utcnow()
return {
"timestamp": ts.isoformat(),
"source": "syslog",
"host": data.get("host", "unknown"),
"app": data.get("app", "unknown"),
"pid": data.get("pid", "-"),
"message": data.get("msg", ""),
}
def parse_json(line: str) -> Optional[Dict[str, str]]:
"""
Parse JSON log line into a normalized dict.
"""
try:
raw = json.loads(line)
except json.JSONDecodeError:
return None
return {
"timestamp": raw.get("timestamp") or raw.get("@timestamp", ""),
"source": raw.get("source", "json"),
"host": raw.get("host", "unknown"),
"app": raw.get("service", raw.get("app", "unknown")),
"pid": str(raw.get("pid", "-")),
"message": raw.get("message", ""),
}
def parse_windows_event(xml_line: str) -> Optional[Dict[str, str]]:
"""
Parse Windows Event Log XML export.
"""
try:
root = ET.fromstring(xml_line)
except ET.ParseError:
return None
ns = {"ns": "http://schemas.microsoft.com/win/2004/08/events/event"}
system = root.find("ns:System", ns)
event_data = root.find("ns:EventData", ns)
if system is None:
return None
ts_node = system.find("ns:TimeCreated", ns)
provider = system.find("ns:Provider", ns)
computer = system.find("ns:Computer", ns)
timestamp = ts_node.get("SystemTime") if ts_node is not None else ""
provider_name = provider.get("Name") if provider is not None else "Unknown"
host = computer.text if computer is not None else "unknown"
message_fields = []
if event_data is not None:
for data in event_data.findall("ns:Data", ns):
field_name = data.get("Name", "field")
field_val = data.text or ""
message_fields.append(f"{field_name}={field_val}")
return {
"timestamp": timestamp,
"source": "windows_event",
"host": host,
"app": provider_name,
"pid": "-",
"message": "; ".join(message_fields),
}
def normalize_line(line: str) -> Optional[Dict[str, str]]:
"""
Determine log format and normalize.
"""
if line.strip().startswith("{"):
return parse_json(line)
if line.strip().startswith("<Event"):
return parse_windows_event(line)
return parse_syslog(line)
def example() -> None:
"""Example usage for quick validation."""
syslog_line = "Oct 17 12:10:22 server sshd[1234]: Failed password for root from 10.0.0.5 port 51515"
json_line = '{"timestamp": "2024-10-17T12:10:22Z", "service": "api", "message": "Login failed", "host": "web1"}'
print(normalize_line(syslog_line))
print(normalize_line(json_line))
if __name__ == "__main__":
example()
Sample Logs for Testing
Oct 17 12:10:22 server sshd[1234]: Failed password for root from 10.0.0.5 port 51515
Oct 17 12:10:25 server sshd[1234]: Failed password for root from 10.0.0.5 port 51516
{"timestamp": "2024-10-17T12:12:10Z", "service": "api", "message": "Invalid token", "host": "edge-1"}
<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event">
<System>
<Provider Name="Microsoft-Windows-Security-Auditing"/>
<TimeCreated SystemTime="2024-10-17T12:14:01.000Z"/>
<Computer>WIN-01</Computer>
</System>
<EventData>
<Data Name="TargetUserName">administrator</Data>
<Data Name="IpAddress">203.0.113.12</Data>
<Data Name="LogonType">3</Data>
</EventData>
</Event>
Section 2: Baselines & Anomaly Detection
Baseline vs Deviation
Defensive analytics starts with a baseline: quantify normal behavior (failed logins per user, ports per host), then flag deviations from it.
Failed Login Detector with Rolling Stats
#!/usr/bin/env python3
"""
Detect login anomalies using rolling statistics.
"""
from __future__ import annotations
import pandas as pd
def load_login_events(path: str) -> pd.DataFrame:
"""
Load normalized login events from CSV.
CSV must include: timestamp, username, source_ip, outcome
"""
try:
df = pd.read_csv(path)
except FileNotFoundError:
raise FileNotFoundError(f"Missing log file: {path}")
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
df = df.dropna(subset=["timestamp"])
return df
def calculate_baseline(df: pd.DataFrame, window: str = "60min") -> pd.DataFrame:
"""
Compute rolling baseline for failed logins.
"""
failures = df[df["outcome"] == "failed"].copy()
failures = failures.set_index("timestamp")
failures["count"] = 1
    # Select the count column before summing so string columns are not aggregated
    baseline = failures.groupby("username").resample(window)["count"].sum()
    baseline = baseline.reset_index().rename(columns={"count": "failed_count"})
return baseline
def detect_anomalies(baseline: pd.DataFrame, threshold: int = 8) -> pd.DataFrame:
"""
Flag buckets exceeding threshold.
"""
return baseline[baseline["failed_count"] >= threshold]
def summarize(anomalies: pd.DataFrame) -> None:
"""
Print simple summary of anomalies for analysts.
"""
if anomalies.empty:
        print("[+] No anomalies detected")
return
print("[!] Anomalous failed login spikes")
for _, row in anomalies.iterrows():
print(
f"User={row['username']}, Window={row['timestamp']}, "
f"Failures={int(row['failed_count'])}"
)
if __name__ == "__main__":
events = load_login_events("./normalized_logins.csv")
baseline = calculate_baseline(events)
spikes = detect_anomalies(baseline, threshold=10)
summarize(spikes)
Z-Score Anomaly Scoring
#!/usr/bin/env python3
"""
Score anomalies using a z-score for each user.
"""
from __future__ import annotations
import pandas as pd
def zscore_anomalies(df: pd.DataFrame, window: str = "30min") -> pd.DataFrame:
"""
Compute per-user z-scores for failed logins.
"""
failures = df[df["outcome"] == "failed"].copy()
failures["timestamp"] = pd.to_datetime(failures["timestamp"], errors="coerce")
failures = failures.dropna(subset=["timestamp"])
failures = failures.set_index("timestamp")
counts = failures.groupby("username").resample(window).size().reset_index(name="failed_count")
# Compute mean/std per user
stats = counts.groupby("username")["failed_count"].agg(["mean", "std"]).reset_index()
merged = counts.merge(stats, on="username", how="left")
    # Guard against NaN (single bucket) and zero std (constant counts)
    merged["std"] = merged["std"].fillna(0.0).replace(0.0, 1.0)
    merged["zscore"] = (merged["failed_count"] - merged["mean"]) / merged["std"]
return merged.sort_values("zscore", ascending=False)
if __name__ == "__main__":
df = pd.read_csv("./normalized_logins.csv")
scored = zscore_anomalies(df)
print(scored.head(10))
Sample Normalized CSV
timestamp,host,source,app,message,source_ip,username,outcome
2024-10-17T12:10:22Z,auth-01,syslog,sshd,Failed password for root from 203.0.113.12,203.0.113.12,root,failed
2024-10-17T12:10:24Z,auth-01,syslog,sshd,Failed password for root from 203.0.113.12,203.0.113.12,root,failed
2024-10-17T12:11:10Z,auth-01,syslog,sshd,Accepted password for alice from 198.51.100.40,198.51.100.40,alice,success
2024-10-17T12:12:02Z,auth-01,syslog,sshd,Failed password for admin from 203.0.113.12,203.0.113.12,admin,failed
2024-10-17T12:12:05Z,auth-01,syslog,sshd,Failed password for admin from 203.0.113.12,203.0.113.12,admin,failed
2024-10-17T12:13:10Z,auth-01,syslog,sshd,Accepted password for admin from 203.0.113.12,203.0.113.12,admin,success
Rule Definition (Sigma-like YAML)
title: SSH Brute Force Spike
id: 2e4a8e6b-9a10-4a50-94e6-8b3b4051f2df
status: experimental
description: Detects multiple failed SSH logins from same IP
logsource:
  product: linux
  service: sshd
detection:
  selection:
    message|contains: "Failed password"
  timeframe: 10m
  condition: selection | count() by source_ip > 8
falsepositives:
  - Maintenance scripts
  - Vulnerability scanners in authorized tests
level: high
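To make the rule's semantics concrete, here is a minimal hand-rolled evaluator for just this one detection, not a real Sigma engine. It assumes events shaped like the normalizer output (ISO timestamps plus a `source_ip` field) and hard-codes the 10-minute window and count threshold from the YAML:

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)
THRESHOLD = 8  # rule fires when count > 8

def check_rule(events):
    """Return the set of source IPs with more than THRESHOLD
    'Failed password' events inside a sliding WINDOW."""
    recent = defaultdict(deque)  # source_ip -> timestamps of matching events
    alerts = set()
    for event in events:
        if "Failed password" not in event.get("message", ""):
            continue
        ts = datetime.fromisoformat(event["timestamp"])
        window = recent[event["source_ip"]]
        window.append(ts)
        while window and ts - window[0] > WINDOW:
            window.popleft()  # expire events that fell out of the window
        if len(window) > THRESHOLD:
            alerts.add(event["source_ip"])
    return alerts
```

Feeding it nine failed logins from one IP inside ten minutes returns that IP; eight or fewer returns an empty set.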
Enrichment: IP Reputation Cache
#!/usr/bin/env python3
"""
Simple IP reputation cache to reduce API calls.
"""
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Dict, Optional
CACHE_FILE = Path("./ip_reputation_cache.json")
CACHE_TTL = 3600 # 1 hour
def load_cache() -> Dict[str, Dict[str, str]]:
"""Load cached reputation data from disk."""
if CACHE_FILE.exists():
try:
return json.loads(CACHE_FILE.read_text())
except json.JSONDecodeError:
return {}
return {}
def save_cache(cache: Dict[str, Dict[str, str]]) -> None:
"""Persist cache to disk."""
CACHE_FILE.write_text(json.dumps(cache, indent=2))
def get_cached_ip(cache: Dict[str, Dict[str, str]], ip: str) -> Optional[Dict[str, str]]:
"""Return cached entry if fresh."""
entry = cache.get(ip)
if not entry:
return None
if time.time() - entry.get("cached_at", 0) > CACHE_TTL:
return None
return entry
def set_cached_ip(cache: Dict[str, Dict[str, str]], ip: str, data: Dict[str, str]) -> None:
"""Store entry with cache timestamp."""
data["cached_at"] = int(time.time())
cache[ip] = data
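The same freshness logic can be exercised with a plain dict and an injected fetch function, which keeps it testable without touching disk or the network (`fetch` here is a stand-in for whatever API call you use; this mirrors the on-disk helpers above):

```python
import time

CACHE_TTL = 3600  # seconds

def lookup_with_cache(ip, cache, fetch):
    """Return reputation data for ip, consulting cache before calling fetch."""
    entry = cache.get(ip)
    if entry and time.time() - entry["cached_at"] <= CACHE_TTL:
        return entry  # fresh hit: no API call needed
    data = fetch(ip)  # e.g. an AbuseIPDB lookup
    if data:
        data["cached_at"] = int(time.time())
        cache[ip] = data
    return data
```

Two consecutive lookups for the same IP call `fetch` only once, which is the whole point of the cache.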
Section 3: Real-Time Monitoring with watchdog
Why Real-Time Matters
Batch analysis is useful for reporting, but defenders often need immediate alerts. A lightweight file watcher can mimic SIEM live pipelines.
Tail-Following Log Stream
#!/usr/bin/env python3
"""
Follow a log file like `tail -f` and emit normalized events.
"""
from __future__ import annotations
import time
from pathlib import Path
from typing import Iterator
def follow(path: Path) -> Iterator[str]:
"""
Yield new lines appended to a file.
"""
with path.open("r", encoding="utf-8") as handle:
handle.seek(0, 2) # Seek to end
while True:
line = handle.readline()
if line:
yield line
else:
time.sleep(0.2)
if __name__ == "__main__":
log_path = Path("/var/log/auth.log")
for line in follow(log_path):
print(line.strip())
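One caveat with `follow()`: logrotate renames the file out from under the open handle, so the generator silently goes quiet after rotation. A rotation-aware variant (a sketch, POSIX-only since it compares inode numbers) reopens the file when the path's inode changes:

```python
import os
import time
from pathlib import Path
from typing import Iterator

def follow_rotating(path: Path, poll: float = 0.2) -> Iterator[str]:
    """Yield lines appended to path, reopening after log rotation."""
    handle = path.open("r", encoding="utf-8")
    handle.seek(0, 2)  # start at end of file
    inode = os.fstat(handle.fileno()).st_ino
    while True:
        line = handle.readline()
        if line:
            yield line
            continue
        try:
            if os.stat(path).st_ino != inode:  # path now points at a new file
                handle.close()
                handle = path.open("r", encoding="utf-8")
                inode = os.fstat(handle.fileno()).st_ino
                continue
        except FileNotFoundError:
            pass  # rotation in progress; retry on next poll
        time.sleep(poll)
```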
Watchdog Event Handler
#!/usr/bin/env python3
"""
Use watchdog to monitor log directory and parse new files.
"""
from __future__ import annotations
import time
from pathlib import Path
from typing import List
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from log_normalizer import normalize_line
class LogEventHandler(FileSystemEventHandler):
"""
Process log file changes and emit normalized events.
"""
def __init__(self, watch_paths: List[Path]) -> None:
self.watch_paths = watch_paths
def on_modified(self, event) -> None:
if event.is_directory:
return
path = Path(event.src_path)
if path.suffix not in {".log", ".txt"}:
return
self._handle_file(path)
def _handle_file(self, path: Path) -> None:
try:
with path.open("r", encoding="utf-8") as handle:
for line in handle.readlines()[-20:]:
normalized = normalize_line(line)
if normalized:
print(normalized)
except OSError as exc:
print(f"[!] Failed to read {path}: {exc}")
def main() -> None:
watch_dir = Path("/var/log")
handler = LogEventHandler([watch_dir])
observer = Observer()
observer.schedule(handler, str(watch_dir), recursive=False)
observer.start()
print("[*] Watching /var/log for updates...")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == "__main__":
main()
Rule-Based Correlation
Simple correlation rules can reduce alert fatigue by combining multiple signals into a single alert.
#!/usr/bin/env python3
"""
Rule-based correlation engine for normalized events.
"""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
class RuleEngine:
"""
Correlate events into security alerts.
"""
def __init__(self, window_minutes: int = 10) -> None:
self.window = timedelta(minutes=window_minutes)
self.events_by_ip: Dict[str, List[Dict[str, str]]] = defaultdict(list)
def add_event(self, event: Dict[str, str]) -> List[Dict[str, str]]:
"""
Add event and return any triggered alerts.
"""
alerts = []
ip = event.get("source_ip", "unknown")
now = datetime.utcnow()
self.events_by_ip[ip].append(event)
# Drop old events
self.events_by_ip[ip] = [
e for e in self.events_by_ip[ip]
if now - datetime.fromisoformat(e.get("timestamp", now.isoformat())) <= self.window
]
failed = [e for e in self.events_by_ip[ip] if "Failed password" in e.get("message", "")]
if len(failed) >= 5:
alerts.append({
"alert_type": "correlated_bruteforce",
"severity": "high",
"source_ip": ip,
"count": str(len(failed)),
"message": "Correlated brute-force pattern detected",
"timestamp": now.isoformat(),
})
return alerts
if __name__ == "__main__":
engine = RuleEngine()
sample = {
"timestamp": datetime.utcnow().isoformat(),
"source_ip": "203.0.113.12",
"message": "Failed password for root",
}
print(engine.add_event(sample))
Section 4: Threat Intelligence & Geo Enrichment
Why Enrichment?
Alert quality improves when analysts can quickly see context: IP reputation, country, ASN, and known malicious indicators.
GeoIP Enrichment (geoip2)
#!/usr/bin/env python3
"""
Enrich log events with GeoIP data.
Requires GeoLite2 database (MaxMind) and geoip2 library.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Optional
import geoip2.database
@dataclass
class GeoContext:
ip: str
country: str
city: str
latitude: float
longitude: float
def lookup_geo(ip: str, db_path: str = "./GeoLite2-City.mmdb") -> Optional[GeoContext]:
"""
Lookup IP geolocation safely.
"""
    try:
        # Opening the reader per lookup is simple but slow; reuse it in production
        with geoip2.database.Reader(db_path) as reader:
            response = reader.city(ip)
        return GeoContext(
            ip=ip,
            country=response.country.name or "Unknown",
            city=response.city.name or "Unknown",
            latitude=response.location.latitude or 0.0,
            longitude=response.location.longitude or 0.0,
        )
except Exception as exc:
print(f"[!] Geo lookup failed for {ip}: {exc}")
return None
if __name__ == "__main__":
ctx = lookup_geo("8.8.8.8")
print(ctx)
Threat Intel Enrichment (AbuseIPDB/VirusTotal)
#!/usr/bin/env python3
"""
Threat intelligence enrichment for suspicious IPs.
Uses AbuseIPDB as an example (replace with your API provider).
"""
from __future__ import annotations
import os
import time
from typing import Dict, Optional
import requests
ABUSEIPDB_URL = "https://api.abuseipdb.com/api/v2/check"
def lookup_abuse_ip(ip: str, api_key: str) -> Optional[Dict[str, str]]:
"""
Query AbuseIPDB for an IP reputation score.
"""
headers = {"Key": api_key, "Accept": "application/json"}
params = {"ipAddress": ip, "maxAgeInDays": 90}
try:
response = requests.get(ABUSEIPDB_URL, headers=headers, params=params, timeout=15)
response.raise_for_status()
except requests.RequestException as exc:
print(f"[!] Threat intel lookup failed: {exc}")
return None
data = response.json().get("data", {})
return {
"ip": ip,
"abuseConfidenceScore": str(data.get("abuseConfidenceScore", "0")),
"isPublic": str(data.get("isPublic", "false")),
"lastReportedAt": str(data.get("lastReportedAt", "")),
}
def rate_limited_lookup(ip: str, api_key: str, delay: float = 1.2) -> Optional[Dict[str, str]]:
"""
Respect API rate limits by sleeping between calls.
"""
result = lookup_abuse_ip(ip, api_key)
time.sleep(delay)
return result
if __name__ == "__main__":
key = os.getenv("ABUSEIPDB_KEY", "")
if not key:
raise SystemExit("Missing ABUSEIPDB_KEY")
print(rate_limited_lookup("203.0.113.12", key))
Section 5: Alerting Pipeline
From Event to Alert
A defensive pipeline converts raw logs into alert objects. These alerts can be sent to Slack, email, or a ticketing system.
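For example, Slack delivery is just an HTTP POST of the rendered alert to an incoming-webhook URL (a sketch assuming the `requests` library; the webhook URL comes from your own Slack app configuration, and the message layout is illustrative):

```python
import requests

def format_alert(alert: dict) -> str:
    """Render a short Slack message text from an alert dict."""
    return (
        f":rotating_light: *{alert.get('alert_type', 'alert')}* "
        f"({alert.get('severity', '?')}) on {alert.get('host', '?')}: "
        f"{alert.get('message', '')} from {alert.get('source_ip', '?')}"
    )

def send_to_slack(alert: dict, webhook_url: str) -> bool:
    """POST the rendered alert to a Slack incoming webhook."""
    try:
        resp = requests.post(webhook_url, json={"text": format_alert(alert)}, timeout=10)
        return resp.status_code == 200
    except requests.RequestException as exc:
        print(f"[!] Slack delivery failed: {exc}")
        return False
```

Keeping `format_alert()` separate from the network call makes the message layout easy to unit-test.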
Simple Alert Formatter
#!/usr/bin/env python3
"""
Create alert objects for suspicious events.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Dict
@dataclass
class Alert:
alert_type: str
severity: str
message: str
source_ip: str
host: str
timestamp: str
def to_dict(self) -> Dict[str, str]:
"""Convert to dict for JSON export."""
return {
"alert_type": self.alert_type,
"severity": self.severity,
"message": self.message,
"source_ip": self.source_ip,
"host": self.host,
"timestamp": self.timestamp,
}
def build_alert(event: Dict[str, str], reason: str, severity: str = "high") -> Alert:
"""
Build a structured alert from a normalized event.
"""
return Alert(
alert_type="suspicious_activity",
severity=severity,
message=reason,
source_ip=event.get("source_ip", "unknown"),
host=event.get("host", "unknown"),
timestamp=event.get("timestamp", datetime.utcnow().isoformat()),
)
def export_alert(alert: Alert, output_path: str) -> None:
"""
Save alert to JSON for SIEM ingestion.
"""
with open(output_path, "w", encoding="utf-8") as handle:
json.dump(alert.to_dict(), handle, indent=2)
if __name__ == "__main__":
test_event = {
"timestamp": "2024-10-17T12:14:01Z",
"host": "auth-01",
"source_ip": "203.0.113.12",
"message": "Failed password for admin",
}
alert = build_alert(test_event, reason="Brute force spike from IP")
export_alert(alert, "./alert.json")
Lab Safety: Use only synthetic logs or logs from your own lab environment. Do not export production logs with PII.
Lab Overview
You will build a four-part defensive toolkit: multi-format ingestion, brute-force detection, geolocation anomalies, and threat intel enrichment.
Lab Part 1: Multi-format Log Aggregator (25-35 min)
Objective: Normalize syslog, JSON, and Windows Event logs into a single CSV.
Requirements:
Accept a directory of log files and detect format
Normalize to fields: timestamp, host, source, app, message, source_ip, username
Write output to normalized_events.csv
Success Criteria: Running your script produces a clean CSV with consistent columns.
Hint: Robust parsing approach
def extract_source_ip(message: str) -> str:
"""Extract IP from message; return empty if not found."""
match = re.search(r"\b(?:\d{1,3}\.){3}\d{1,3}\b", message)
return match.group(0) if match else ""
# Apply extraction after normalization
row["source_ip"] = extract_source_ip(row["message"])
Lab Part 2: Brute Force Attack Detector (20-30 min)
Objective: Detect brute-force logins using rolling windows.
Requirements:
Detect 10+ failed logins for the same user within 15 minutes
Output alerts to alerts/failed_login_*.json
Include severity and remediation hint in each alert
Success Criteria: Trigger alerts with a provided test dataset.
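Hint: One way to produce the alerts/failed_login_*.json output is one file per alert, named with a UTC timestamp (the naming scheme below is illustrative, any unique name matching the glob works):

```python
import json
from datetime import datetime, timezone
from pathlib import Path

def write_alert(alert: dict, out_dir: str = "alerts") -> Path:
    """Write one alert per file as <out_dir>/failed_login_<timestamp>.json."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    # Microsecond-resolution stamp keeps filenames unique within a run
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f")
    path = out / f"failed_login_{stamp}.json"
    path.write_text(json.dumps(alert, indent=2))
    return path
```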