Week Overview
This week shifts to defensive security automation. You will build Python tools that ingest logs, detect anomalies, and enrich suspicious events with threat intelligence.
- Parse and normalize syslog, JSON, and Windows event logs
- Build baseline metrics and detect deviations
- Automate alerting for failed logins and suspicious activity
- Enrich events with IP reputation and geolocation
- Design a mini-SIEM workflow with summary dashboards
Section 1: Log Formats & Normalization
Why Normalization Matters
Logs arrive in many shapes from many sources. Normalization converts them into one consistent schema so that alerting, dashboards, and analytics can query events reliably.
Common Log Formats
- Syslog: RFC 3164/5424 text format, common for Linux services
- JSON Logs: Structured logs from cloud systems and modern apps
- Windows Event Logs: XML or EVTX data exported from Windows
Syslog Severity Levels
| Level | Keyword | Meaning |
|---|---|---|
| 0 | Emergency | System unusable |
| 1 | Alert | Immediate action required |
| 2 | Critical | Critical condition |
| 3 | Error | Error condition |
| 4 | Warning | Warning condition |
| 5 | Notice | Normal but significant |
| 6 | Informational | Informational messages |
| 7 | Debug | Debug-level messages |
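The table above maps directly to a small lookup that downstream tooling can reuse. A minimal sketch (the helper names are illustrative; the PRI decoding follows the standard facility * 8 + severity encoding):

```python
# RFC 5424 severity keywords, keyed by numeric level.
SYSLOG_SEVERITIES = {
    0: "Emergency", 1: "Alert", 2: "Critical", 3: "Error",
    4: "Warning", 5: "Notice", 6: "Informational", 7: "Debug",
}

def severity_name(level: int) -> str:
    """Return the keyword for a numeric severity, or 'Unknown'."""
    return SYSLOG_SEVERITIES.get(level, "Unknown")

def decode_pri(pri: int) -> tuple[int, int]:
    """Split a syslog PRI value into (facility, severity); PRI = facility * 8 + severity."""
    return pri // 8, pri % 8

def is_actionable(level: int, threshold: int = 4) -> bool:
    """Treat Warning (4) and anything more severe (lower numbers) as actionable."""
    return 0 <= level <= threshold

print(decode_pri(34))  # → (4, 2): facility 4 (auth), severity 2 (Critical)
```

The `<34>` prefix on a raw RFC 3164 line decodes exactly this way, which is why severity filtering is usually the first triage step.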
Windows Event ID Quick Reference
- 4624: Successful logon
- 4625: Failed logon (brute force indicator)
- 4688: Process creation (useful for malware hunting)
- 4720: User account creation
- 4726: User account deletion
- 4769: Kerberos service ticket request
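These IDs can live in a small reference map so detection code stays readable. A minimal sketch (the "high interest" set is an illustrative triage choice, not a standard):

```python
# Windows Security event IDs from the quick reference above.
SECURITY_EVENT_IDS = {
    4624: "Successful logon",
    4625: "Failed logon",
    4688: "Process creation",
    4720: "User account creation",
    4726: "User account deletion",
    4769: "Kerberos service ticket request",
}

# IDs worth flagging for follow-up during triage (illustrative choice).
HIGH_INTEREST = {4625, 4688, 4720}

def describe_event(event_id: int) -> str:
    """Human-readable label for a Security event ID."""
    return SECURITY_EVENT_IDS.get(event_id, f"Unknown event ({event_id})")

def is_high_interest(event_id: int) -> bool:
    """True if the event ID warrants analyst follow-up."""
    return event_id in HIGH_INTEREST
```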
Example: Multi-format Normalizer
#!/usr/bin/env python3
"""
Multi-format log normalizer.
Supports syslog, JSON, and Windows Event Log exports (XML).
"""
from __future__ import annotations
import json
import re
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import Dict, Optional
SYSLOG_PATTERN = re.compile(
r"^(?P<ts>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+"
r"(?P<host>[\w\.-]+)\s+(?P<app>[\w\-/]+)(?:\[(?P<pid>\d+)\])?:\s+(?P<msg>.*)$"
)
def parse_syslog(line: str) -> Optional[Dict[str, str]]:
"""
Parse a syslog line into a normalized dict.
Args:
line: Raw syslog line
Returns:
Normalized dict or None if parsing fails
"""
match = SYSLOG_PATTERN.match(line.strip())
if not match:
return None
data = match.groupdict()
    try:
        # RFC 3164 timestamps omit the year, so strptime defaults to 1900;
        # substitute the current year.
        ts = datetime.strptime(data["ts"], "%b %d %H:%M:%S").replace(
            year=datetime.utcnow().year
        )
    except ValueError:
        ts = datetime.utcnow()
return {
"timestamp": ts.isoformat(),
"source": "syslog",
"host": data.get("host", "unknown"),
"app": data.get("app", "unknown"),
"pid": data.get("pid", "-"),
"message": data.get("msg", ""),
}
def parse_json(line: str) -> Optional[Dict[str, str]]:
"""
Parse JSON log line into a normalized dict.
"""
try:
raw = json.loads(line)
except json.JSONDecodeError:
return None
return {
"timestamp": raw.get("timestamp") or raw.get("@timestamp", ""),
"source": raw.get("source", "json"),
"host": raw.get("host", "unknown"),
"app": raw.get("service", raw.get("app", "unknown")),
"pid": str(raw.get("pid", "-")),
"message": raw.get("message", ""),
}
def parse_windows_event(xml_line: str) -> Optional[Dict[str, str]]:
"""
Parse Windows Event Log XML export.
"""
try:
root = ET.fromstring(xml_line)
except ET.ParseError:
return None
ns = {"ns": "http://schemas.microsoft.com/win/2004/08/events/event"}
system = root.find("ns:System", ns)
event_data = root.find("ns:EventData", ns)
if system is None:
return None
ts_node = system.find("ns:TimeCreated", ns)
provider = system.find("ns:Provider", ns)
computer = system.find("ns:Computer", ns)
timestamp = ts_node.get("SystemTime") if ts_node is not None else ""
provider_name = provider.get("Name") if provider is not None else "Unknown"
host = computer.text if computer is not None else "unknown"
message_fields = []
if event_data is not None:
for data in event_data.findall("ns:Data", ns):
field_name = data.get("Name", "field")
field_val = data.text or ""
message_fields.append(f"{field_name}={field_val}")
return {
"timestamp": timestamp,
"source": "windows_event",
"host": host,
"app": provider_name,
"pid": "-",
"message": "; ".join(message_fields),
}
def normalize_line(line: str) -> Optional[Dict[str, str]]:
"""
Determine log format and normalize.
"""
if line.strip().startswith("{"):
return parse_json(line)
if line.strip().startswith("<Event"):
return parse_windows_event(line)
return parse_syslog(line)
def example() -> None:
"""Example usage for quick validation."""
syslog_line = "Oct 17 12:10:22 server sshd[1234]: Failed password for root from 10.0.0.5 port 51515"
json_line = '{"timestamp": "2024-10-17T12:10:22Z", "service": "api", "message": "Login failed", "host": "web1"}'
print(normalize_line(syslog_line))
print(normalize_line(json_line))
if __name__ == "__main__":
example()
Sample Logs for Testing
Oct 17 12:10:22 server sshd[1234]: Failed password for root from 10.0.0.5 port 51515
Oct 17 12:10:25 server sshd[1234]: Failed password for root from 10.0.0.5 port 51516
{"timestamp": "2024-10-17T12:12:10Z", "service": "api", "message": "Invalid token", "host": "edge-1"}
<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event">
<System>
<Provider Name="Microsoft-Windows-Security-Auditing"/>
<TimeCreated SystemTime="2024-10-17T12:14:01.000Z"/>
<Computer>WIN-01</Computer>
</System>
<EventData>
<Data Name="TargetUserName">administrator</Data>
<Data Name="IpAddress">203.0.113.12</Data>
<Data Name="LogonType">3</Data>
</EventData>
</Event>
Section 2: Baselines & Anomaly Detection
Baseline vs Deviation
Defensive analytics starts with a baseline: quantify normal behavior (failed logins per user, ports per host), then flag deviations from it.
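The baseline idea fits in a few lines of plain Python before any pandas enters the picture. A toy sketch with made-up hourly counts (the two-sigma threshold is an illustrative choice):

```python
from statistics import mean, stdev

def flag_deviations(counts: list[int], num_stdevs: float = 2.0) -> list[int]:
    """Return counts more than `num_stdevs` standard deviations above the mean."""
    if len(counts) < 2:
        return []
    mu = mean(counts)
    sigma = stdev(counts) or 1.0  # Guard against zero variance
    return [c for c in counts if c > mu + num_stdevs * sigma]

# Hourly failed-login counts for one user: mostly quiet, one spike.
print(flag_deviations([1, 0, 2, 1, 0, 1, 2, 40]))  # → [40]
```

The pandas versions below do the same thing at scale, with time-bucketed windows per user.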
Failed Login Detector with Rolling Stats
#!/usr/bin/env python3
"""
Detect login anomalies using rolling statistics.
"""
from __future__ import annotations
import pandas as pd
def load_login_events(path: str) -> pd.DataFrame:
"""
Load normalized login events from CSV.
CSV must include: timestamp, username, source_ip, outcome
"""
try:
df = pd.read_csv(path)
except FileNotFoundError:
raise FileNotFoundError(f"Missing log file: {path}")
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
df = df.dropna(subset=["timestamp"])
return df
def calculate_baseline(df: pd.DataFrame, window: str = "60min") -> pd.DataFrame:
"""
Compute rolling baseline for failed logins.
"""
failures = df[df["outcome"] == "failed"].copy()
failures = failures.set_index("timestamp")
failures["count"] = 1
    baseline = failures.groupby("username").resample(window)["count"].sum()
    baseline = baseline.reset_index(name="failed_count")
return baseline
def detect_anomalies(baseline: pd.DataFrame, threshold: int = 8) -> pd.DataFrame:
"""
Flag buckets exceeding threshold.
"""
return baseline[baseline["failed_count"] >= threshold]
def summarize(anomalies: pd.DataFrame) -> None:
"""
Print simple summary of anomalies for analysts.
"""
if anomalies.empty:
        print("[✓] No anomalies detected")
return
print("[!] Anomalous failed login spikes")
for _, row in anomalies.iterrows():
print(
f"User={row['username']}, Window={row['timestamp']}, "
f"Failures={int(row['failed_count'])}"
)
if __name__ == "__main__":
events = load_login_events("./normalized_logins.csv")
baseline = calculate_baseline(events)
spikes = detect_anomalies(baseline, threshold=10)
summarize(spikes)
Z-Score Anomaly Scoring
#!/usr/bin/env python3
"""
Score anomalies using a z-score for each user.
"""
from __future__ import annotations
import pandas as pd
def zscore_anomalies(df: pd.DataFrame, window: str = "30min") -> pd.DataFrame:
"""
Compute per-user z-scores for failed logins.
"""
failures = df[df["outcome"] == "failed"].copy()
failures["timestamp"] = pd.to_datetime(failures["timestamp"], errors="coerce")
failures = failures.dropna(subset=["timestamp"])
failures = failures.set_index("timestamp")
counts = failures.groupby("username").resample(window).size().reset_index(name="failed_count")
# Compute mean/std per user
stats = counts.groupby("username")["failed_count"].agg(["mean", "std"]).reset_index()
merged = counts.merge(stats, on="username", how="left")
    merged["std"] = merged["std"].fillna(1.0).replace(0.0, 1.0)  # Avoid divide-by-zero for constant counts
merged["zscore"] = (merged["failed_count"] - merged["mean"]) / merged["std"]
return merged.sort_values("zscore", ascending=False)
if __name__ == "__main__":
df = pd.read_csv("./normalized_logins.csv")
scored = zscore_anomalies(df)
print(scored.head(10))
Sample Normalized CSV
timestamp,host,source,app,message,source_ip,username,outcome
2024-10-17T12:10:22Z,auth-01,syslog,sshd,Failed password for root from 203.0.113.12,203.0.113.12,root,failed
2024-10-17T12:10:24Z,auth-01,syslog,sshd,Failed password for root from 203.0.113.12,203.0.113.12,root,failed
2024-10-17T12:11:10Z,auth-01,syslog,sshd,Accepted password for alice from 198.51.100.40,198.51.100.40,alice,success
2024-10-17T12:12:02Z,auth-01,syslog,sshd,Failed password for admin from 203.0.113.12,203.0.113.12,admin,failed
2024-10-17T12:12:05Z,auth-01,syslog,sshd,Failed password for admin from 203.0.113.12,203.0.113.12,admin,failed
2024-10-17T12:13:10Z,auth-01,syslog,sshd,Accepted password for admin from 203.0.113.12,203.0.113.12,admin,success
Rule Definition (Sigma-like YAML)
title: SSH Brute Force Spike
id: 2e4a8e6b-9a10-4a50-94e6-8b3b4051f2df
status: experimental
description: Detects multiple failed SSH logins from same IP
logsource:
product: linux
service: sshd
detection:
selection:
message|contains: "Failed password"
timeframe: 10m
condition: selection | count(source_ip) > 8
falsepositives:
- Maintenance scripts
- Vulnerability scanners in authorized tests
level: high
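A rule like the one above can be evaluated outside a SIEM with a sliding-window counter. A minimal sketch (field names follow the normalized schema used in this section; the 10-minute window and count > 8 come straight from the rule):

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)   # detection: timeframe: 10m
THRESHOLD = 8                    # condition: count(source_ip) > 8

def evaluate_rule(events):
    """Yield (source_ip, count) whenever an IP exceeds THRESHOLD failures in WINDOW."""
    recent = defaultdict(deque)  # source_ip -> timestamps of matching events
    for event in events:
        # selection: message|contains "Failed password"
        if "Failed password" not in event.get("message", ""):
            continue
        ts = datetime.fromisoformat(event["timestamp"])
        ip = event.get("source_ip", "unknown")
        bucket = recent[ip]
        bucket.append(ts)
        # Expire events older than the rule's timeframe.
        while bucket and ts - bucket[0] > WINDOW:
            bucket.popleft()
        if len(bucket) > THRESHOLD:
            yield ip, len(bucket)
```

This fires on every event past the threshold; a production engine would also suppress duplicate alerts for the same IP within a cooldown period.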
Enrichment: IP Reputation Cache
#!/usr/bin/env python3
"""
Simple IP reputation cache to reduce API calls.
"""
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Dict, Optional
CACHE_FILE = Path("./ip_reputation_cache.json")
CACHE_TTL = 3600 # 1 hour
def load_cache() -> Dict[str, Dict[str, str]]:
"""Load cached reputation data from disk."""
if CACHE_FILE.exists():
try:
return json.loads(CACHE_FILE.read_text())
except json.JSONDecodeError:
return {}
return {}
def save_cache(cache: Dict[str, Dict[str, str]]) -> None:
"""Persist cache to disk."""
CACHE_FILE.write_text(json.dumps(cache, indent=2))
def get_cached_ip(cache: Dict[str, Dict[str, str]], ip: str) -> Optional[Dict[str, str]]:
"""Return cached entry if fresh."""
entry = cache.get(ip)
if not entry:
return None
if time.time() - entry.get("cached_at", 0) > CACHE_TTL:
return None
return entry
def set_cached_ip(cache: Dict[str, Dict[str, str]], ip: str, data: Dict[str, str]) -> None:
"""Store entry with cache timestamp."""
data["cached_at"] = int(time.time())
cache[ip] = data
Section 3: Real-Time Monitoring with watchdog
Why Real-Time Matters
Batch analysis is useful for reporting, but defenders often need immediate alerts. A lightweight file watcher can mimic SIEM live pipelines.
Tail-Following Log Stream
#!/usr/bin/env python3
"""
Follow a log file like `tail -f` and emit normalized events.
"""
from __future__ import annotations
import time
from pathlib import Path
from typing import Iterator
def follow(path: Path) -> Iterator[str]:
"""
Yield new lines appended to a file.
"""
with path.open("r", encoding="utf-8") as handle:
handle.seek(0, 2) # Seek to end
while True:
line = handle.readline()
if line:
yield line
else:
time.sleep(0.2)
if __name__ == "__main__":
log_path = Path("/var/log/auth.log")
for line in follow(log_path):
print(line.strip())
Watchdog Event Handler
#!/usr/bin/env python3
"""
Use watchdog to monitor log directory and parse new files.
"""
from __future__ import annotations
import time
from pathlib import Path
from typing import List
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from log_normalizer import normalize_line
class LogEventHandler(FileSystemEventHandler):
"""
Process log file changes and emit normalized events.
"""
def __init__(self, watch_paths: List[Path]) -> None:
self.watch_paths = watch_paths
def on_modified(self, event) -> None:
if event.is_directory:
return
path = Path(event.src_path)
if path.suffix not in {".log", ".txt"}:
return
self._handle_file(path)
def _handle_file(self, path: Path) -> None:
try:
with path.open("r", encoding="utf-8") as handle:
for line in handle.readlines()[-20:]:
normalized = normalize_line(line)
if normalized:
print(normalized)
except OSError as exc:
print(f"[!] Failed to read {path}: {exc}")
def main() -> None:
watch_dir = Path("/var/log")
handler = LogEventHandler([watch_dir])
observer = Observer()
observer.schedule(handler, str(watch_dir), recursive=False)
observer.start()
print("[*] Watching /var/log for updates...")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == "__main__":
main()
Rule-Based Correlation
Simple correlation rules can reduce alert fatigue by combining multiple signals into a single alert.
#!/usr/bin/env python3
"""
Rule-based correlation engine for normalized events.
"""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
class RuleEngine:
"""
Correlate events into security alerts.
"""
def __init__(self, window_minutes: int = 10) -> None:
self.window = timedelta(minutes=window_minutes)
self.events_by_ip: Dict[str, List[Dict[str, str]]] = defaultdict(list)
def add_event(self, event: Dict[str, str]) -> List[Dict[str, str]]:
"""
Add event and return any triggered alerts.
"""
alerts = []
ip = event.get("source_ip", "unknown")
now = datetime.utcnow()
self.events_by_ip[ip].append(event)
# Drop old events
self.events_by_ip[ip] = [
e for e in self.events_by_ip[ip]
if now - datetime.fromisoformat(e.get("timestamp", now.isoformat())) <= self.window
]
failed = [e for e in self.events_by_ip[ip] if "Failed password" in e.get("message", "")]
if len(failed) >= 5:
alerts.append({
"alert_type": "correlated_bruteforce",
"severity": "high",
"source_ip": ip,
"count": str(len(failed)),
"message": "Correlated brute-force pattern detected",
"timestamp": now.isoformat(),
})
return alerts
if __name__ == "__main__":
engine = RuleEngine()
sample = {
"timestamp": datetime.utcnow().isoformat(),
"source_ip": "203.0.113.12",
"message": "Failed password for root",
}
print(engine.add_event(sample))
Section 4: Threat Intelligence & Geo Enrichment
Why Enrichment?
Alert quality improves when analysts can quickly see context: IP reputation, country, ASN, and known malicious indicators.
GeoIP Enrichment (geoip2)
#!/usr/bin/env python3
"""
Enrich log events with GeoIP data.
Requires GeoLite2 database (MaxMind) and geoip2 library.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Optional
import geoip2.database
@dataclass
class GeoContext:
ip: str
country: str
city: str
latitude: float
longitude: float
def lookup_geo(ip: str, db_path: str = "./GeoLite2-City.mmdb") -> Optional[GeoContext]:
"""
Lookup IP geolocation safely.
"""
    try:
        # Context manager ensures the mmdb handle is closed after the lookup.
        with geoip2.database.Reader(db_path) as reader:
            response = reader.city(ip)
        return GeoContext(
            ip=ip,
            country=response.country.name or "Unknown",
            city=response.city.name or "Unknown",
            latitude=response.location.latitude or 0.0,
            longitude=response.location.longitude or 0.0,
        )
except Exception as exc:
print(f"[!] Geo lookup failed for {ip}: {exc}")
return None
if __name__ == "__main__":
ctx = lookup_geo("8.8.8.8")
print(ctx)
Threat Intel Enrichment (AbuseIPDB/VirusTotal)
#!/usr/bin/env python3
"""
Threat intelligence enrichment for suspicious IPs.
Uses AbuseIPDB as an example (replace with your API provider).
"""
from __future__ import annotations
import os
import time
from typing import Dict, Optional
import requests
ABUSEIPDB_URL = "https://api.abuseipdb.com/api/v2/check"
def lookup_abuse_ip(ip: str, api_key: str) -> Optional[Dict[str, str]]:
"""
Query AbuseIPDB for an IP reputation score.
"""
headers = {"Key": api_key, "Accept": "application/json"}
params = {"ipAddress": ip, "maxAgeInDays": 90}
try:
response = requests.get(ABUSEIPDB_URL, headers=headers, params=params, timeout=15)
response.raise_for_status()
except requests.RequestException as exc:
print(f"[!] Threat intel lookup failed: {exc}")
return None
data = response.json().get("data", {})
return {
"ip": ip,
"abuseConfidenceScore": str(data.get("abuseConfidenceScore", "0")),
"isPublic": str(data.get("isPublic", "false")),
"lastReportedAt": str(data.get("lastReportedAt", "")),
}
def rate_limited_lookup(ip: str, api_key: str, delay: float = 1.2) -> Optional[Dict[str, str]]:
"""
Respect API rate limits by sleeping between calls.
"""
result = lookup_abuse_ip(ip, api_key)
time.sleep(delay)
return result
if __name__ == "__main__":
key = os.getenv("ABUSEIPDB_KEY", "")
if not key:
raise SystemExit("Missing ABUSEIPDB_KEY")
print(rate_limited_lookup("203.0.113.12", key))
Section 5: Alerting Pipeline
From Event to Alert
A defensive pipeline converts raw logs into alert objects. These alerts can be sent to Slack, email, or a ticketing system.
Simple Alert Formatter
#!/usr/bin/env python3
"""
Create alert objects for suspicious events.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Dict
@dataclass
class Alert:
alert_type: str
severity: str
message: str
source_ip: str
host: str
timestamp: str
def to_dict(self) -> Dict[str, str]:
"""Convert to dict for JSON export."""
return {
"alert_type": self.alert_type,
"severity": self.severity,
"message": self.message,
"source_ip": self.source_ip,
"host": self.host,
"timestamp": self.timestamp,
}
def build_alert(event: Dict[str, str], reason: str, severity: str = "high") -> Alert:
"""
Build a structured alert from a normalized event.
"""
return Alert(
alert_type="suspicious_activity",
severity=severity,
message=reason,
source_ip=event.get("source_ip", "unknown"),
host=event.get("host", "unknown"),
timestamp=event.get("timestamp", datetime.utcnow().isoformat()),
)
def export_alert(alert: Alert, output_path: str) -> None:
"""
Save alert to JSON for SIEM ingestion.
"""
with open(output_path, "w", encoding="utf-8") as handle:
json.dump(alert.to_dict(), handle, indent=2)
if __name__ == "__main__":
test_event = {
"timestamp": "2024-10-17T12:14:01Z",
"host": "auth-01",
"source_ip": "203.0.113.12",
"message": "Failed password for admin",
}
alert = build_alert(test_event, reason="Brute force spike from IP")
export_alert(alert, "./alert.json")
Slack Webhook Notifier
#!/usr/bin/env python3
"""
Send alerts to Slack via incoming webhook.
"""
from __future__ import annotations
import os
from typing import Dict
import requests
def send_slack_alert(alert: Dict[str, str], webhook_url: str) -> bool:
"""
Send alert to Slack channel.
"""
payload = {
"text": f"[{alert['severity'].upper()}] {alert['message']}\n"
f"Host: {alert['host']} IP: {alert['source_ip']}\n"
f"Time: {alert['timestamp']}"
}
try:
response = requests.post(webhook_url, json=payload, timeout=10)
response.raise_for_status()
return True
except requests.RequestException as exc:
print(f"[!] Slack alert failed: {exc}")
return False
if __name__ == "__main__":
webhook = os.getenv("SLACK_WEBHOOK", "")
if not webhook:
raise SystemExit("Missing SLACK_WEBHOOK")
sample_alert = {
"severity": "high",
"message": "Multiple failed logins detected",
"host": "auth-01",
"source_ip": "203.0.113.12",
"timestamp": "2024-10-17T12:15:00Z",
}
send_slack_alert(sample_alert, webhook)
Section 6: Mini-SIEM Dashboard
Analyst-Friendly Summary
A mini dashboard turns alerts into a quick situational report. Start with aggregated counts and high-severity highlights.
Console Dashboard Output
#!/usr/bin/env python3
"""
Generate a console dashboard from alert JSON files.
"""
from __future__ import annotations
import glob
import json
from collections import Counter
from typing import Dict, List
def load_alerts(path_pattern: str = "./alerts/*.json") -> List[Dict[str, str]]:
"""
Load alert files into memory.
"""
alerts = []
for path in glob.glob(path_pattern):
try:
with open(path, "r", encoding="utf-8") as handle:
alerts.append(json.load(handle))
except (OSError, json.JSONDecodeError) as exc:
print(f"[!] Skipping {path}: {exc}")
return alerts
def summarize_alerts(alerts: List[Dict[str, str]]) -> None:
"""
Print summary table.
"""
if not alerts:
        print("[✓] No alerts to summarize")
return
severity_counts = Counter(a.get("severity", "unknown") for a in alerts)
type_counts = Counter(a.get("alert_type", "unknown") for a in alerts)
print("=== ALERT SUMMARY ===")
print("Severity counts:")
for sev, count in severity_counts.items():
print(f" - {sev}: {count}")
print("\nAlert types:")
for alert_type, count in type_counts.items():
print(f" - {alert_type}: {count}")
print("\nTop sources:")
ip_counts = Counter(a.get("source_ip", "unknown") for a in alerts)
for ip, count in ip_counts.most_common(5):
print(f" - {ip}: {count} alerts")
def main() -> None:
"""Entrypoint."""
alerts = load_alerts()
summarize_alerts(alerts)
if __name__ == "__main__":
main()
Quick Visualization with Matplotlib
#!/usr/bin/env python3
"""
Plot alert trends for quick analyst visualization.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Dict, List
import matplotlib.pyplot as plt
import pandas as pd
def load_alerts_from_dir(alert_dir: str = "./alerts") -> List[Dict[str, str]]:
"""
Load alerts from a directory.
"""
alerts = []
for path in Path(alert_dir).glob("*.json"):
try:
alerts.append(json.loads(path.read_text()))
except (OSError, json.JSONDecodeError) as exc:
print(f"[!] Failed to read {path}: {exc}")
return alerts
def plot_alerts(alerts: List[Dict[str, str]]) -> None:
"""
Plot alerts by hour and severity.
"""
if not alerts:
        print("[✓] No alert data to plot")
return
df = pd.DataFrame(alerts)
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
df = df.dropna(subset=["timestamp"])
    df["hour"] = df["timestamp"].dt.floor("h")  # lowercase "h": "H" is deprecated in newer pandas
counts = df.groupby(["hour", "severity"]).size().reset_index(name="count")
for severity in counts["severity"].unique():
subset = counts[counts["severity"] == severity]
plt.plot(subset["hour"], subset["count"], marker="o", label=severity)
plt.title("Alerts by Hour")
plt.xlabel("Hour")
plt.ylabel("Alert Count")
plt.legend()
plt.tight_layout()
plt.savefig("alert_trends.png")
print("[*] Saved chart to alert_trends.png")
if __name__ == "__main__":
alert_list = load_alerts_from_dir()
plot_alerts(alert_list)
Section 7: SIEM Integration Concepts
Mapping to a Common Schema
SIEM platforms rely on consistent fields. Map your normalized events to a schema like Elastic Common Schema (ECS) or Splunk CIM.
ECS Mapping Example
#!/usr/bin/env python3
"""
Convert normalized events to ECS-like schema for SIEM ingestion.
"""
from __future__ import annotations
from typing import Dict
def to_ecs(event: Dict[str, str]) -> Dict[str, str]:
"""
Map normalized event to ECS-style fields.
"""
return {
"@timestamp": event.get("timestamp", ""),
"host.name": event.get("host", ""),
"event.dataset": event.get("source", ""),
"process.name": event.get("app", ""),
"source.ip": event.get("source_ip", ""),
"user.name": event.get("username", ""),
"message": event.get("message", ""),
"event.kind": "event",
"event.category": "authentication",
"event.type": "info",
}
if __name__ == "__main__":
sample = {
"timestamp": "2024-10-17T12:10:22Z",
"host": "auth-01",
"source": "syslog",
"app": "sshd",
"source_ip": "203.0.113.12",
"username": "root",
"message": "Failed password for root",
}
print(to_ecs(sample))
Elasticsearch Bulk Export (Offline)
#!/usr/bin/env python3
"""
Prepare bulk ingestion payload for Elasticsearch.
This outputs NDJSON for a local lab SIEM instance.
"""
from __future__ import annotations
import json
from typing import Dict, List
def bulk_format(events: List[Dict[str, str]], index_name: str) -> str:
"""
Format events for ES bulk API.
"""
lines = []
for event in events:
action = {"index": {"_index": index_name}}
lines.append(json.dumps(action))
lines.append(json.dumps(event))
    return "\n".join(lines) + "\n"
if __name__ == "__main__":
events = [
{"@timestamp": "2024-10-17T12:10:22Z", "message": "Failed password", "source.ip": "203.0.113.12"},
{"@timestamp": "2024-10-17T12:11:22Z", "message": "Accepted password", "source.ip": "203.0.113.12"},
]
payload = bulk_format(events, index_name="csy105-logs")
with open("bulk.ndjson", "w", encoding="utf-8") as handle:
handle.write(payload)
print("[*] Wrote bulk payload to bulk.ndjson")
Response Playbook Stubs
Automated response should be conservative. Use playbooks to guide analysts rather than immediate destructive action.
#!/usr/bin/env python3
"""
Generate a response playbook entry for suspicious activity.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict
@dataclass
class Playbook:
title: str
severity: str
steps: str
def to_dict(self) -> Dict[str, str]:
return {
"title": self.title,
"severity": self.severity,
"steps": self.steps,
}
def create_playbook(alert: Dict[str, str]) -> Playbook:
"""
Build a playbook entry for SOC analysts.
"""
    steps = (
        "1) Verify alert accuracy\n"
        "2) Check source IP reputation\n"
        "3) Search for related events\n"
        "4) Notify incident response lead\n"
        "5) Consider temporary block if malicious\n"
    )
return Playbook(
title=f"Investigate: {alert.get('message', 'Suspicious event')}",
severity=alert.get("severity", "medium"),
steps=steps,
)
if __name__ == "__main__":
sample_alert = {"message": "Brute force spike from 203.0.113.12", "severity": "high"}
print(create_playbook(sample_alert).to_dict())
Log Redaction Utility
#!/usr/bin/env python3
"""
Redact sensitive fields before sharing logs.
"""
from __future__ import annotations
import re
from typing import Dict
EMAIL_PATTERN = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
IP_PATTERN = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
def redact_message(message: str) -> str:
"""
Replace emails and IP addresses with placeholders.
"""
message = EMAIL_PATTERN.sub("[REDACTED_EMAIL]", message)
message = IP_PATTERN.sub("[REDACTED_IP]", message)
return message
def redact_event(event: Dict[str, str]) -> Dict[str, str]:
"""
Return a redacted copy of an event.
"""
redacted = dict(event)
redacted["message"] = redact_message(event.get("message", ""))
if "source_ip" in redacted:
redacted["source_ip"] = "[REDACTED_IP]"
if "username" in redacted and redacted["username"]:
redacted["username"] = "[REDACTED_USER]"
return redacted
if __name__ == "__main__":
sample = {
"message": "Failed login for alice@example.com from 203.0.113.12",
"source_ip": "203.0.113.12",
"username": "alice",
}
print(redact_event(sample))
Ticketing Stub (Local)
#!/usr/bin/env python3
"""
Create a simple incident ticket in JSON for tracking.
"""
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Dict
def create_ticket(alert: Dict[str, str], output_dir: str = "./tickets") -> str:
    """
    Write a ticket file to disk, creating the output directory if needed.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    ticket_id = f"TICKET-{timestamp}"
ticket = {
"id": ticket_id,
"created_at": datetime.utcnow().isoformat(),
"severity": alert.get("severity", "medium"),
"summary": alert.get("message", "Suspicious activity"),
"source_ip": alert.get("source_ip", "unknown"),
"status": "open",
"owner": "soc-analyst",
}
path = f"{output_dir}/{ticket_id}.json"
with open(path, "w", encoding="utf-8") as handle:
json.dump(ticket, handle, indent=2)
return path
if __name__ == "__main__":
sample_alert = {
"severity": "high",
"message": "Brute force spike from 203.0.113.12",
"source_ip": "203.0.113.12",
}
print(create_ticket(sample_alert))
Lab 08: Defensive Log Analysis Toolkit (90-150 minutes)
Lab Overview
You will build a four-part defensive toolkit: multi-format ingestion, brute-force detection, geolocation anomalies, and threat intel enrichment.
Lab Part 1: Multi-format Log Aggregator (25-35 min)
Objective: Normalize syslog, JSON, and Windows Event logs into a single CSV.
Requirements:
- Accept a directory of log files and detect format
- Normalize to fields: timestamp, host, source, app, message, source_ip, username
- Write output to normalized_events.csv
Success Criteria: Running your script produces a clean CSV with consistent columns.
Hint: Robust parsing approach
def extract_source_ip(message: str) -> str:
"""Extract IP from message; return empty if not found."""
match = re.search(r"\b(?:\d{1,3}\.){3}\d{1,3}\b", message)
return match.group(0) if match else ""
# Apply extraction after normalization
row["source_ip"] = extract_source_ip(row["message"])
Lab Part 2: Brute Force Attack Detector (20-30 min)
Objective: Detect brute-force logins using rolling windows.
Requirements:
- Detect 10+ failed logins for the same user within 15 minutes
- Output alerts to alerts/failed_login_*.json
- Include severity and remediation hint in each alert
Success Criteria: Trigger alerts with a provided test dataset.
Hint: Pandas resample trick
failures = df[df["outcome"] == "failed"].set_index("timestamp")
windowed = failures.groupby("username").resample("15min").size()
spikes = windowed[windowed >= 10].reset_index(name="failed_count")
Lab Part 3: Geolocation-based Anomaly Detector (20-30 min)
Objective: Flag logins from unusual countries for high-value accounts.
Requirements:
- Use GeoIP data to tag each login event
- Build baseline country list per user (top 3 countries)
- Alert when a login appears from a new country
Success Criteria: Alerts include country, city, and user baseline.
Hint: Baseline aggregation
baseline = (
df.groupby(["username", "country"]).size()
.reset_index(name="count")
.sort_values(["username", "count"], ascending=[True, False])
)
# Keep top 3 countries per user
baseline = baseline.groupby("username").head(3)
Lab Part 4: Threat Intel Enrichment (20-30 min)
Objective: Enrich suspicious IPs with threat intelligence.
Requirements:
- Use AbuseIPDB or VirusTotal API
- Cache responses to avoid rate limits
- Attach score to alert JSON output
Success Criteria: Alerts include score and last-seen metadata.
Hint: Cache + API workflow
cache = load_cache()
if cached := get_cached_ip(cache, ip):
intel = cached
else:
intel = lookup_abuse_ip(ip, api_key)
if intel:
set_cached_ip(cache, ip, intel)
save_cache(cache)
Stretch Challenges (Optional)
- Build a correlation rule that links failed logins + new country + high abuse score
- Export alerts to CSV and include a one-page executive summary
- Create a daily summary job using cron and email the report to yourself
Hint: Correlate multiple signals
def correlated_alert(event: dict, geo: dict, intel: dict) -> dict | None:
    """Return an alert only if multiple risk signals are present."""
    if event.get("outcome") != "failed":
        return None
    if geo.get("country") in {"Unknown", ""}:
        return None
    if int(intel.get("abuseConfidenceScore", 0)) < 50:
        return None
    return {
        "alert_type": "multi_signal_bruteforce",
        "severity": "critical",
        "message": "Failed login from high-risk IP + unusual country",
        "source_ip": event.get("source_ip", "unknown"),
        "country": geo.get("country", "unknown"),
        "abuseScore": intel.get("abuseConfidenceScore", "0"),
    }
Deliverables:
- log_normalizer.py - Multi-format log parser
- bruteforce_detector.py - Failed login anomaly detector
- geo_anomaly.py - GeoIP anomaly detection
- threat_intel_enricher.py - Threat intel enrichment tool
- alerts/ folder with JSON alerts
- Summary report or screenshots of dashboard output
Additional Resources
Blue Team Guides
- Blue Team Field Manual (BTFM)
- MITRE ATT&CK - Detection Analytics
- Practical Packet Analysis - for log-to-network correlation
Key Takeaways
- ✓ Normalization is the foundation of log analytics
- ✓ Baselines enable anomaly detection with minimal false positives
- ✓ Enrichment adds actionable context for analysts
- ✓ Alerting pipelines should be structured and rate-limited
- ✓ Mini-SIEM dashboards provide fast situational awareness
- ✓ Defensive automation reduces triage time and burnout
Week 08 Quiz
Test your understanding of defensive Python and log analysis.
Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.