Skip to content
CSY105 Week 08 Beginner

Week Content

Programming for Security

Track your progress through this week's content

Week Overview

This week shifts to defensive security automation. You will build Python tools that ingest logs, detect anomalies, and enrich suspicious events with threat intelligence.

  • Parse and normalize syslog, JSON, and Windows event logs
  • Build baseline metrics and detect deviations
  • Automate alerting for failed logins and suspicious activity
  • Enrich events with IP reputation and geolocation
  • Design a mini-SIEM workflow with summary dashboards
Real-World Context: SOC teams depend on log pipelines (ELK, Splunk, Azure Sentinel) to transform raw events into alerts. Your scripts emulate core SIEM features: parsing, correlation, enrichment, and analyst-ready outputs.
Privacy & Legal Note: Logs often contain PII (usernames, IPs, device IDs). Handle them according to policy, minimize retention, and redact before sharing or storing outside controlled environments.

Section 1: Log Formats & Normalization

Why Normalization Matters

Logs come from different sources and formats. Normalization converts them into a consistent schema so alerts, dashboards, and analytics can query reliably.

Common Log Formats

Syslog Severity Levels

Level Keyword Meaning
0 Emergency System unusable
1 Alert Immediate action required
2 Critical Critical condition
3 Error Error condition
4 Warning Warning condition
5 Notice Normal but significant
6 Informational Informational messages
7 Debug Debug-level messages

Windows Event ID Quick Reference

Example: Multi-format Normalizer

#!/usr/bin/env python3
"""
Multi-format log normalizer.
Supports syslog, JSON, and Windows Event Log exports (XML).
"""
from __future__ import annotations

import json
import re
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import Dict, Optional

# BSD-syslog header: "Mon DD HH:MM:SS host app[pid]: message" (pid optional).
SYSLOG_PATTERN = re.compile(
    r"^(?P<ts>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+"
    r"(?P<host>[\w\.-]+)\s+(?P<app>[\w\-/]+)(?:\[(?P<pid>\d+)\])?:\s+(?P<msg>.*)$"
)


def parse_syslog(line: str) -> Optional[Dict[str, str]]:
    """
    Parse a syslog line into a normalized dict.

    Args:
        line: Raw syslog line.

    Returns:
        Normalized dict or None if the line does not match SYSLOG_PATTERN.
    """
    match = SYSLOG_PATTERN.match(line.strip())
    if not match:
        return None

    data = match.groupdict()
    try:
        ts = datetime.strptime(data["ts"], "%b %d %H:%M:%S")
        # Syslog timestamps omit the year, so strptime defaults to 1900.
        # Substitute the current year for sane downstream time arithmetic.
        ts = ts.replace(year=datetime.utcnow().year)
    except ValueError:
        ts = datetime.utcnow()  # unparseable timestamp: fall back to "now"

    return {
        "timestamp": ts.isoformat(),
        "source": "syslog",
        "host": data["host"],
        "app": data["app"],
        # Fix: groupdict() maps an unmatched optional group to None, so the
        # original data.get("pid", "-") could still yield None for lines
        # without a "[pid]" suffix.
        "pid": data["pid"] or "-",
        "message": data["msg"],
    }


def parse_json(line: str) -> Optional[Dict[str, str]]:
    """
    Parse a JSON-formatted log line into the normalized schema.

    Returns:
        Normalized dict, or None when the line is not valid JSON.
    """
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return None

    # Prefer "timestamp" but accept Elastic-style "@timestamp".
    when = record.get("timestamp") or record.get("@timestamp", "")
    normalized = {
        "timestamp": when,
        "source": record.get("source", "json"),
        "host": record.get("host", "unknown"),
        "app": record.get("service", record.get("app", "unknown")),
        "pid": str(record.get("pid", "-")),
        "message": record.get("message", ""),
    }
    return normalized


def parse_windows_event(xml_line: str) -> Optional[Dict[str, str]]:
    """
    Parse a Windows Event Log XML export into the normalized schema.

    Returns:
        Normalized dict, or None when the XML is malformed or has no
        <System> section.
    """
    try:
        event = ET.fromstring(xml_line)
    except ET.ParseError:
        return None

    namespaces = {"ns": "http://schemas.microsoft.com/win/2004/08/events/event"}
    system = event.find("ns:System", namespaces)
    if system is None:
        return None

    def node_attr(tag: str, attribute: str, default):
        """Read one attribute from a child of <System>, tolerating absence."""
        node = system.find(f"ns:{tag}", namespaces)
        return node.get(attribute) if node is not None else default

    timestamp = node_attr("TimeCreated", "SystemTime", "")
    provider_name = node_attr("Provider", "Name", "Unknown")

    computer = system.find("ns:Computer", namespaces)
    host = computer.text if computer is not None else "unknown"

    # Flatten <EventData> name/value pairs into a single "k=v; k=v" string.
    event_data = event.find("ns:EventData", namespaces)
    details = []
    if event_data is not None:
        details = [
            f'{field.get("Name", "field")}={field.text or ""}'
            for field in event_data.findall("ns:Data", namespaces)
        ]

    return {
        "timestamp": timestamp,
        "source": "windows_event",
        "host": host,
        "app": provider_name,
        "pid": "-",
        "message": "; ".join(details),
    }


def normalize_line(line: str) -> Optional[Dict[str, str]]:
    """
    Dispatch a raw line to the right parser based on its leading token.

    JSON objects start with "{", Windows XML exports with "<Event";
    anything else is treated as syslog.
    """
    stripped = line.strip()
    if stripped.startswith("{"):
        return parse_json(line)
    if stripped.startswith("<Event"):
        return parse_windows_event(line)
    return parse_syslog(line)


def example() -> None:
    """Run the normalizer against one sample line of each text format."""
    samples = [
        "Oct 17 12:10:22 server sshd[1234]: Failed password for root from 10.0.0.5 port 51515",
        '{"timestamp": "2024-10-17T12:10:22Z", "service": "api", "message": "Login failed", "host": "web1"}',
    ]
    for sample in samples:
        print(normalize_line(sample))


if __name__ == "__main__":
    example()

Sample Logs for Testing

Oct 17 12:10:22 server sshd[1234]: Failed password for root from 10.0.0.5 port 51515
Oct 17 12:10:25 server sshd[1234]: Failed password for root from 10.0.0.5 port 51516
{"timestamp": "2024-10-17T12:12:10Z", "service": "api", "message": "Invalid token", "host": "edge-1"}
<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event">
  <System>
    <Provider Name="Microsoft-Windows-Security-Auditing"/>
    <TimeCreated SystemTime="2024-10-17T12:14:01.000Z"/>
    <Computer>WIN-01</Computer>
  </System>
  <EventData>
    <Data Name="TargetUserName">administrator</Data>
    <Data Name="IpAddress">203.0.113.12</Data>
    <Data Name="LogonType">3</Data>
  </EventData>
</Event>

Section 2: Baselines & Anomaly Detection

Baseline vs Deviation

Defensive analytics starts with a baseline. You quantify normal behavior (failed logins per user, ports per host) then flag deviations.

Failed Login Detector with Rolling Stats

#!/usr/bin/env python3
"""
Detect login anomalies using rolling statistics.
"""
from __future__ import annotations

import csv
from datetime import datetime, timedelta
from typing import Dict, List

import pandas as pd


def load_login_events(path: str) -> pd.DataFrame:
    """
    Load normalized login events from CSV.

    The CSV must include: timestamp, username, source_ip, outcome.
    Rows whose timestamp cannot be parsed are dropped.

    Args:
        path: CSV file path.

    Raises:
        FileNotFoundError: if *path* does not exist, with a clearer message
            chained to the original error (the original re-raise discarded
            the cause).
    """
    try:
        df = pd.read_csv(path)
    except FileNotFoundError as exc:
        # Keep the underlying error as __cause__ for debugging.
        raise FileNotFoundError(f"Missing log file: {path}") from exc

    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    return df.dropna(subset=["timestamp"])


def calculate_baseline(df: pd.DataFrame, window: str = "60min") -> pd.DataFrame:
    """
    Compute per-user failed-login counts in fixed time buckets.

    Args:
        df: Normalized events with timestamp, username, outcome columns.
        window: Resample bucket size (pandas offset alias).

    Returns:
        DataFrame with columns username, timestamp, failed_count.
    """
    failures = df[df["outcome"] == "failed"].copy()
    failures = failures.set_index("timestamp")
    failures["count"] = 1

    # Fix: the original called .sum() on the whole frame, which also "summed"
    # non-numeric columns (string concatenation, or a TypeError in recent
    # pandas). Aggregate only the count column.
    baseline = (
        failures.groupby("username")["count"]
        .resample(window)
        .sum()
        .reset_index()
        .rename(columns={"count": "failed_count"})
    )
    return baseline


def detect_anomalies(baseline: pd.DataFrame, threshold: int = 8) -> pd.DataFrame:
    """
    Return only the buckets whose failure count meets or exceeds *threshold*.
    """
    over_threshold = baseline["failed_count"] >= threshold
    return baseline[over_threshold]


def summarize(anomalies: pd.DataFrame) -> None:
    """
    Print a simple analyst-facing summary of anomalous buckets.

    Expects columns: username, timestamp, failed_count.
    """
    if anomalies.empty:
        # Fix: the original check-mark glyph was mojibake ("โœ“") from a
        # bad encoding round-trip.
        print("[✓] No anomalies detected")
        return

    print("[!] Anomalous failed login spikes")
    for _, row in anomalies.iterrows():
        print(
            f"User={row['username']}, Window={row['timestamp']}, "
            f"Failures={int(row['failed_count'])}"
        )


if __name__ == "__main__":
    events = load_login_events("./normalized_logins.csv")
    baseline = calculate_baseline(events)
    spikes = detect_anomalies(baseline, threshold=10)
    summarize(spikes)

Z-Score Anomaly Scoring

#!/usr/bin/env python3
"""
Score anomalies using a z-score for each user.
"""
from __future__ import annotations

import pandas as pd
from typing import Tuple


def zscore_anomalies(df: pd.DataFrame, window: str = "30min") -> pd.DataFrame:
    """
    Compute a per-user z-score for failed-login counts per time bucket.

    Args:
        df: Normalized events with timestamp, username, outcome columns.
        window: Resample bucket size (pandas offset alias).

    Returns:
        DataFrame with failed_count, mean, std, zscore per (username, bucket),
        sorted by zscore descending.
    """
    failures = df[df["outcome"] == "failed"].copy()
    failures["timestamp"] = pd.to_datetime(failures["timestamp"], errors="coerce")
    failures = failures.dropna(subset=["timestamp"])
    failures = failures.set_index("timestamp")

    counts = failures.groupby("username").resample(window).size().reset_index(name="failed_count")

    # Compute mean/std per user
    stats = counts.groupby("username")["failed_count"].agg(["mean", "std"]).reset_index()
    merged = counts.merge(stats, on="username", how="left")
    # Fix: a user whose bucket counts never vary has std == 0.0 (not NaN), so
    # the original fillna(1.0) left a divide-by-zero producing inf/NaN scores.
    # Treat both "no std" and "zero std" as std = 1.
    merged["std"] = merged["std"].fillna(0.0)
    merged.loc[merged["std"] == 0.0, "std"] = 1.0
    merged["zscore"] = (merged["failed_count"] - merged["mean"]) / merged["std"]
    return merged.sort_values("zscore", ascending=False)


if __name__ == "__main__":
    # Demo entry point: expects normalized_logins.csv in the current directory.
    df = pd.read_csv("./normalized_logins.csv")
    scored = zscore_anomalies(df)
    print(scored.head(10))

Sample Normalized CSV

timestamp,host,source,app,message,source_ip,username,outcome
2024-10-17T12:10:22Z,auth-01,syslog,sshd,Failed password for root from 203.0.113.12,203.0.113.12,root,failed
2024-10-17T12:10:24Z,auth-01,syslog,sshd,Failed password for root from 203.0.113.12,203.0.113.12,root,failed
2024-10-17T12:11:10Z,auth-01,syslog,sshd,Accepted password for alice from 198.51.100.40,198.51.100.40,alice,success
2024-10-17T12:12:02Z,auth-01,syslog,sshd,Failed password for admin from 203.0.113.12,203.0.113.12,admin,failed
2024-10-17T12:12:05Z,auth-01,syslog,sshd,Failed password for admin from 203.0.113.12,203.0.113.12,admin,failed
2024-10-17T12:13:10Z,auth-01,syslog,sshd,Accepted password for admin from 203.0.113.12,203.0.113.12,admin,success

Rule Definition (Sigma-like YAML)

title: SSH Brute Force Spike
id: 2e4a8e6b-9a10-4a50-94e6-8b3b4051f2df
status: experimental
description: Detects multiple failed SSH logins from same IP
logsource:
  product: linux
  service: sshd
detection:
  selection:
    message|contains: "Failed password"
  timeframe: 10m
  condition: selection | count(source_ip) > 8
falsepositives:
  - Maintenance scripts
  - Vulnerability scanners in authorized tests
level: high

Enrichment: IP Reputation Cache

#!/usr/bin/env python3
"""
Simple IP reputation cache to reduce API calls.
"""
from __future__ import annotations

import json
import time
from pathlib import Path
from typing import Dict, Optional

CACHE_FILE = Path("./ip_reputation_cache.json")  # on-disk cache location (CWD-relative)
CACHE_TTL = 3600  # 1 hour, in seconds; entries older than this are re-fetched


def load_cache() -> Dict[str, Dict[str, str]]:
    """Load the reputation cache from disk; return {} if absent or corrupt."""
    if not CACHE_FILE.exists():
        return {}
    try:
        return json.loads(CACHE_FILE.read_text())
    except json.JSONDecodeError:
        # Corrupt cache file: start fresh instead of crashing.
        return {}


def save_cache(cache: Dict[str, Dict[str, str]]) -> None:
    """Persist the reputation cache to disk as pretty-printed JSON."""
    serialized = json.dumps(cache, indent=2)
    CACHE_FILE.write_text(serialized)


def get_cached_ip(cache: Dict[str, Dict[str, str]], ip: str) -> Optional[Dict[str, str]]:
    """Return the cached entry for *ip* if present and younger than CACHE_TTL."""
    entry = cache.get(ip)
    if not entry:
        return None

    # NOTE(review): set_cached_ip stores "cached_at" as an int, so the declared
    # Dict[str, str] value type is loose — entries actually mix str and int.
    if time.time() - entry.get("cached_at", 0) > CACHE_TTL:
        return None  # stale entry: caller should do a fresh lookup
    return entry


def set_cached_ip(cache: Dict[str, Dict[str, str]], ip: str, data: Dict[str, str]) -> None:
    """Store *data* in the cache keyed by *ip*, stamped with the current time.

    NOTE(review): mutates the caller's *data* dict in place (adds an int
    "cached_at" key) rather than storing a copy.
    """
    data["cached_at"] = int(time.time())
    cache[ip] = data

Section 3: Real-Time Monitoring with watchdog

Why Real-Time Matters

Batch analysis is useful for reporting, but defenders often need immediate alerts. A lightweight file watcher can mimic SIEM live pipelines.

Tail-Following Log Stream

#!/usr/bin/env python3
"""
Follow a log file like `tail -f` and emit normalized events.
"""
from __future__ import annotations

import time
from pathlib import Path
from typing import Iterator


def follow(path: Path) -> Iterator[str]:
    """
    Yield lines appended to *path*, like `tail -f`.

    Starts at the current end of file and polls forever, sleeping briefly
    whenever no new data is available.
    """
    with path.open("r", encoding="utf-8") as stream:
        stream.seek(0, 2)  # jump to EOF so only newly appended lines are emitted
        while True:
            next_line = stream.readline()
            if not next_line:
                time.sleep(0.2)  # nothing new yet; poll again shortly
                continue
            yield next_line


if __name__ == "__main__":
    # Stream the local auth log; requires read permission on /var/log/auth.log
    # and runs until interrupted.
    log_path = Path("/var/log/auth.log")
    for line in follow(log_path):
        print(line.strip())

Watchdog Event Handler

#!/usr/bin/env python3
"""
Use watchdog to monitor log directory and parse new files.
"""
from __future__ import annotations

import time
from pathlib import Path
from typing import List

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from log_normalizer import normalize_line


class LogEventHandler(FileSystemEventHandler):
    """
    React to log-file modification events and print normalized entries.
    """

    def __init__(self, watch_paths: List[Path]) -> None:
        self.watch_paths = watch_paths

    def on_modified(self, event) -> None:
        """Filter watchdog modification events down to *.log / *.txt files."""
        if event.is_directory:
            return
        changed = Path(event.src_path)
        if changed.suffix in {".log", ".txt"}:
            self._handle_file(changed)

    def _handle_file(self, path: Path) -> None:
        """Normalize and print the last 20 lines of *path*."""
        try:
            with path.open("r", encoding="utf-8") as handle:
                recent = handle.readlines()[-20:]
        except OSError as exc:
            print(f"[!] Failed to read {path}: {exc}")
            return
        for raw_line in recent:
            parsed = normalize_line(raw_line)
            if parsed:
                print(parsed)


def main() -> None:
    """Watch /var/log (non-recursively) and print normalized events on change.

    Blocks until interrupted with Ctrl-C; requires read access to /var/log.
    """
    watch_dir = Path("/var/log")
    handler = LogEventHandler([watch_dir])
    observer = Observer()
    observer.schedule(handler, str(watch_dir), recursive=False)

    observer.start()
    print("[*] Watching /var/log for updates...")
    try:
        while True:
            time.sleep(1)  # keep main thread alive; observer runs in background
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    main()

Rule-Based Correlation

Simple correlation rules can reduce alert fatigue by combining multiple signals into a single alert.

#!/usr/bin/env python3
"""
Rule-based correlation engine for normalized events.
"""
from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List


class RuleEngine:
    """
    Correlate normalized events into security alerts.

    Keeps a per-source-IP sliding window of recent events and raises a
    brute-force alert once 5+ failed logins accumulate inside the window.
    """

    def __init__(self, window_minutes: int = 10) -> None:
        """
        Args:
            window_minutes: Size of the correlation window in minutes.
        """
        self.window = timedelta(minutes=window_minutes)
        self.events_by_ip: Dict[str, List[Dict[str, str]]] = defaultdict(list)

    @staticmethod
    def _parse_timestamp(raw: str, fallback: datetime) -> datetime:
        """
        Parse an ISO-8601 timestamp defensively.

        Fix: the original fed raw strings straight into fromisoformat(), which
        raises on a trailing "Z" (pre-3.11), on empty strings, and — once an
        offset-aware value parses — on naive/aware subtraction. A trailing
        "Z" is stripped and offsets dropped (values are assumed UTC, matching
        datetime.utcnow() used for windowing); any parse failure returns
        *fallback* instead of raising.
        """
        if not raw:
            return fallback
        try:
            ts = datetime.fromisoformat(raw.rstrip("Zz"))
        except (ValueError, TypeError):
            return fallback
        if ts.tzinfo is not None:
            # Drop the offset so comparison with naive utcnow() is legal.
            ts = ts.replace(tzinfo=None)
        return ts

    def add_event(self, event: Dict[str, str]) -> List[Dict[str, str]]:
        """
        Add one event and return any alerts it triggers (possibly empty).
        """
        alerts: List[Dict[str, str]] = []
        ip = event.get("source_ip", "unknown")
        now = datetime.utcnow()
        self.events_by_ip[ip].append(event)

        # Expire events older than the correlation window.
        self.events_by_ip[ip] = [
            e for e in self.events_by_ip[ip]
            if now - self._parse_timestamp(e.get("timestamp", ""), now) <= self.window
        ]

        failed = [e for e in self.events_by_ip[ip] if "Failed password" in e.get("message", "")]
        if len(failed) >= 5:
            alerts.append({
                "alert_type": "correlated_bruteforce",
                "severity": "high",
                "source_ip": ip,
                "count": str(len(failed)),
                "message": "Correlated brute-force pattern detected",
                "timestamp": now.isoformat(),
            })

        return alerts


if __name__ == "__main__":
    # Smoke test: one failed event is below the 5-event threshold, so this
    # prints an empty alert list.
    engine = RuleEngine()
    sample = {
        "timestamp": datetime.utcnow().isoformat(),
        "source_ip": "203.0.113.12",
        "message": "Failed password for root",
    }
    print(engine.add_event(sample))

Section 4: Threat Intelligence & Geo Enrichment

Why Enrichment?

Alert quality improves when analysts can quickly see context: IP reputation, country, ASN, and known malicious indicators.

GeoIP Enrichment (geoip2)

#!/usr/bin/env python3
"""
Enrich log events with GeoIP data.
Requires GeoLite2 database (MaxMind) and geoip2 library.
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Optional

import geoip2.database

@dataclass
class GeoContext:
    """Geolocation context for a single IP address."""

    ip: str  # the queried IP address
    country: str  # country name; "Unknown" when absent from the database
    city: str  # city name; "Unknown" when absent from the database
    latitude: float  # 0.0 when the database has no coordinates
    longitude: float  # 0.0 when the database has no coordinates


def lookup_geo(ip: str, db_path: str = "./GeoLite2-City.mmdb") -> Optional[GeoContext]:
    """
    Look up city-level geolocation for *ip* in a local GeoLite2 database.

    Args:
        ip: IP address to resolve.
        db_path: Path to the MaxMind GeoLite2-City database file.

    Returns:
        GeoContext on success; None (after printing a diagnostic) on any
        failure — missing DB file, address not in the database, bad input.
    """
    try:
        # Fix: the original never closed the Reader, leaking the memory-mapped
        # database handle on every call. Reader supports the context-manager
        # protocol, so close it deterministically.
        with geoip2.database.Reader(db_path) as reader:
            response = reader.city(ip)
        return GeoContext(
            ip=ip,
            country=response.country.name or "Unknown",
            city=response.city.name or "Unknown",
            latitude=response.location.latitude or 0.0,
            longitude=response.location.longitude or 0.0,
        )
    except Exception as exc:  # broad on purpose: geoip2 raises several types
        print(f"[!] Geo lookup failed for {ip}: {exc}")
        return None


if __name__ == "__main__":
    # Demo lookup; requires GeoLite2-City.mmdb in the current directory.
    ctx = lookup_geo("8.8.8.8")
    print(ctx)

Threat Intel Enrichment (AbuseIPDB/VirusTotal)

#!/usr/bin/env python3
"""
Threat intelligence enrichment for suspicious IPs.
Uses AbuseIPDB as an example (replace with your API provider).
"""
from __future__ import annotations

import os
import time
from typing import Dict, Optional

import requests

ABUSEIPDB_URL = "https://api.abuseipdb.com/api/v2/check"


def lookup_abuse_ip(ip: str, api_key: str) -> Optional[Dict[str, str]]:
    """
    Query AbuseIPDB for the reputation of *ip*.

    Returns:
        Flat dict of string fields, or None on any network/HTTP error
        (the error is printed, not raised).
    """
    request_headers = {"Key": api_key, "Accept": "application/json"}
    query = {"ipAddress": ip, "maxAgeInDays": 90}

    try:
        reply = requests.get(ABUSEIPDB_URL, headers=request_headers, params=query, timeout=15)
        reply.raise_for_status()
    except requests.RequestException as exc:
        print(f"[!] Threat intel lookup failed: {exc}")
        return None

    payload = reply.json().get("data", {})
    return {
        "ip": ip,
        "abuseConfidenceScore": str(payload.get("abuseConfidenceScore", "0")),
        "isPublic": str(payload.get("isPublic", "false")),
        "lastReportedAt": str(payload.get("lastReportedAt", "")),
    }


def rate_limited_lookup(ip: str, api_key: str, delay: float = 1.2) -> Optional[Dict[str, str]]:
    """
    Wrap lookup_abuse_ip with a fixed sleep so bursts stay under API limits.
    """
    intel = lookup_abuse_ip(ip, api_key)
    time.sleep(delay)  # sleep even on failure to keep the request cadence uniform
    return intel


if __name__ == "__main__":
    # Requires the ABUSEIPDB_KEY environment variable; performs a live API call.
    key = os.getenv("ABUSEIPDB_KEY", "")
    if not key:
        raise SystemExit("Missing ABUSEIPDB_KEY")

    print(rate_limited_lookup("203.0.113.12", key))

Section 5: Alerting Pipeline

From Event to Alert

A defensive pipeline converts raw logs into alert objects. These alerts can be sent to Slack, email, or a ticketing system.

Simple Alert Formatter

#!/usr/bin/env python3
"""
Create alert objects for suspicious events.
"""
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import datetime
from typing import Dict

@dataclass
class Alert:
    """A structured security alert ready for JSON export."""

    alert_type: str
    severity: str
    message: str
    source_ip: str
    host: str
    timestamp: str

    def to_dict(self) -> Dict[str, str]:
        """Convert to a plain dict for JSON export."""
        field_names = (
            "alert_type",
            "severity",
            "message",
            "source_ip",
            "host",
            "timestamp",
        )
        return {name: getattr(self, name) for name in field_names}


def build_alert(event: Dict[str, str], reason: str, severity: str = "high") -> Alert:
    """
    Wrap a normalized event in an Alert, filling gaps with safe defaults.
    """
    fallback_ts = datetime.utcnow().isoformat()
    return Alert(
        alert_type="suspicious_activity",
        severity=severity,
        message=reason,
        source_ip=event.get("source_ip", "unknown"),
        host=event.get("host", "unknown"),
        timestamp=event.get("timestamp", fallback_ts),
    )


def export_alert(alert: Alert, output_path: str) -> None:
    """
    Serialize *alert* to pretty-printed JSON at *output_path* for SIEM ingestion.
    """
    payload = alert.to_dict()
    with open(output_path, "w", encoding="utf-8") as sink:
        json.dump(payload, sink, indent=2)


if __name__ == "__main__":
    # Demo: build an alert from a synthetic event and write it to ./alert.json.
    test_event = {
        "timestamp": "2024-10-17T12:14:01Z",
        "host": "auth-01",
        "source_ip": "203.0.113.12",
        "message": "Failed password for admin",
    }
    alert = build_alert(test_event, reason="Brute force spike from IP")
    export_alert(alert, "./alert.json")

Slack Webhook Notifier

#!/usr/bin/env python3
"""
Send alerts to Slack via incoming webhook.
"""
from __future__ import annotations

import json
import os
from typing import Dict

import requests


def send_slack_alert(alert: Dict[str, str], webhook_url: str) -> bool:
    """
    Post an alert summary to a Slack incoming webhook.

    Returns:
        True on HTTP success, False otherwise (the error is printed).
    """
    text = (
        f"[{alert['severity'].upper()}] {alert['message']}\n"
        f"Host: {alert['host']} IP: {alert['source_ip']}\n"
        f"Time: {alert['timestamp']}"
    )
    payload = {"text": text}

    try:
        reply = requests.post(webhook_url, json=payload, timeout=10)
        reply.raise_for_status()
    except requests.RequestException as exc:
        print(f"[!] Slack alert failed: {exc}")
        return False
    return True


if __name__ == "__main__":
    # Requires the SLACK_WEBHOOK environment variable; posts a sample alert.
    webhook = os.getenv("SLACK_WEBHOOK", "")
    if not webhook:
        raise SystemExit("Missing SLACK_WEBHOOK")

    sample_alert = {
        "severity": "high",
        "message": "Multiple failed logins detected",
        "host": "auth-01",
        "source_ip": "203.0.113.12",
        "timestamp": "2024-10-17T12:15:00Z",
    }
    send_slack_alert(sample_alert, webhook)

Section 6: Mini-SIEM Dashboard

Analyst-Friendly Summary

A mini dashboard turns alerts into a quick situational report. Start with aggregated counts and high-severity highlights.

Console Dashboard Output

#!/usr/bin/env python3
"""
Generate a console dashboard from alert JSON files.
"""
from __future__ import annotations

import glob
import json
from collections import Counter
from typing import Dict, List


def load_alerts(path_pattern: str = "./alerts/*.json") -> List[Dict[str, str]]:
    """
    Read every alert JSON file matching *path_pattern*.

    Unreadable or malformed files are skipped with a printed warning.
    """
    loaded: List[Dict[str, str]] = []
    for file_path in glob.glob(path_pattern):
        try:
            with open(file_path, "r", encoding="utf-8") as handle:
                loaded.append(json.load(handle))
        except (OSError, json.JSONDecodeError) as exc:
            print(f"[!] Skipping {file_path}: {exc}")
    return loaded


def summarize_alerts(alerts: List[Dict[str, str]]) -> None:
    """
    Print a console summary: counts by severity, by alert type, and the
    five most frequent source IPs.
    """
    if not alerts:
        # Fix: the original check-mark glyph was mojibake ("โœ“") from a
        # bad encoding round-trip.
        print("[✓] No alerts to summarize")
        return

    severity_counts = Counter(a.get("severity", "unknown") for a in alerts)
    type_counts = Counter(a.get("alert_type", "unknown") for a in alerts)

    print("=== ALERT SUMMARY ===")
    print("Severity counts:")
    for sev, count in severity_counts.items():
        print(f"  - {sev}: {count}")

    print("\nAlert types:")
    for alert_type, count in type_counts.items():
        print(f"  - {alert_type}: {count}")

    print("\nTop sources:")
    ip_counts = Counter(a.get("source_ip", "unknown") for a in alerts)
    for ip, count in ip_counts.most_common(5):
        print(f"  - {ip}: {count} alerts")


def main() -> None:
    """Entrypoint: load alert files and print the console summary."""
    summarize_alerts(load_alerts())


if __name__ == "__main__":
    main()

Quick Visualization with Matplotlib

#!/usr/bin/env python3
"""
Plot alert trends for quick analyst visualization.
"""
from __future__ import annotations

import json
from pathlib import Path
from typing import Dict, List

import matplotlib.pyplot as plt
import pandas as pd


def load_alerts_from_dir(alert_dir: str = "./alerts") -> List[Dict[str, str]]:
    """
    Read every *.json alert file in *alert_dir*, skipping unreadable ones
    with a printed warning.
    """
    collected: List[Dict[str, str]] = []
    for json_file in Path(alert_dir).glob("*.json"):
        try:
            collected.append(json.loads(json_file.read_text()))
        except (OSError, json.JSONDecodeError) as exc:
            print(f"[!] Failed to read {json_file}: {exc}")
    return collected


def plot_alerts(alerts: List[Dict[str, str]]) -> None:
    """
    Plot hourly alert counts, one line per severity, to alert_trends.png.

    Alerts need "timestamp" and "severity" keys; rows with unparseable
    timestamps are dropped. The chart is saved to disk, not displayed.
    """
    if not alerts:
        print("[โœ“] No alert data to plot")  # NOTE(review): glyph appears to be mojibake of a check mark — confirm intended output
        return

    df = pd.DataFrame(alerts)
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    df = df.dropna(subset=["timestamp"])
    # NOTE(review): the "H" offset alias is deprecated in pandas >= 2.2
    # (lowercase "h" preferred) — confirm the target pandas version.
    df["hour"] = df["timestamp"].dt.floor("H")

    counts = df.groupby(["hour", "severity"]).size().reset_index(name="count")

    # One line per severity so spikes in high/critical stand out.
    for severity in counts["severity"].unique():
        subset = counts[counts["severity"] == severity]
        plt.plot(subset["hour"], subset["count"], marker="o", label=severity)

    plt.title("Alerts by Hour")
    plt.xlabel("Hour")
    plt.ylabel("Alert Count")
    plt.legend()
    plt.tight_layout()
    plt.savefig("alert_trends.png")
    print("[*] Saved chart to alert_trends.png")


if __name__ == "__main__":
    # Reads ./alerts/*.json and writes alert_trends.png to the CWD.
    alert_list = load_alerts_from_dir()
    plot_alerts(alert_list)

Section 7: SIEM Integration Concepts

Mapping to a Common Schema

SIEM platforms rely on consistent fields. Map your normalized events to a schema like Elastic Common Schema (ECS) or Splunk CIM.

ECS Mapping Example

#!/usr/bin/env python3
"""
Convert normalized events to ECS-like schema for SIEM ingestion.
"""
from __future__ import annotations

from typing import Dict


def to_ecs(event: Dict[str, str]) -> Dict[str, str]:
    """
    Project a normalized event onto ECS-style dotted field names.

    Missing input fields default to "" so the output schema is always complete.
    """
    mapped = {
        "@timestamp": event.get("timestamp", ""),
        "host.name": event.get("host", ""),
        "event.dataset": event.get("source", ""),
        "process.name": event.get("app", ""),
        "source.ip": event.get("source_ip", ""),
        "user.name": event.get("username", ""),
        "message": event.get("message", ""),
    }
    # Constant ECS classification fields for authentication log events.
    mapped.update({
        "event.kind": "event",
        "event.category": "authentication",
        "event.type": "info",
    })
    return mapped


if __name__ == "__main__":
    # Demo: print the ECS mapping of a synthetic SSH failure event.
    sample = {
        "timestamp": "2024-10-17T12:10:22Z",
        "host": "auth-01",
        "source": "syslog",
        "app": "sshd",
        "source_ip": "203.0.113.12",
        "username": "root",
        "message": "Failed password for root",
    }
    print(to_ecs(sample))

Elasticsearch Bulk Export (Offline)

#!/usr/bin/env python3
"""
Prepare bulk ingestion payload for Elasticsearch.
This outputs NDJSON for a local lab SIEM instance.
"""
from __future__ import annotations

import json
from typing import Dict, List


def bulk_format(events: List[Dict[str, str]], index_name: str) -> str:
    """
    Format events as NDJSON for the Elasticsearch bulk API.

    Each event is preceded by an action line naming the target index, and
    the payload must end with a trailing newline or ES rejects the request.

    Args:
        events: Event dicts to index.
        index_name: Target index name.

    Returns:
        Newline-delimited JSON string ready for the _bulk endpoint.
    """
    lines = []
    for event in events:
        action = {"index": {"_index": index_name}}
        lines.append(json.dumps(action))
        lines.append(json.dumps(event))
    # Fix: the original joined with a literal backslash-n ("\\n"), producing
    # one long line of text instead of newline-delimited JSON.
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Demo: write a two-event bulk payload to ./bulk.ndjson.
    events = [
        {"@timestamp": "2024-10-17T12:10:22Z", "message": "Failed password", "source.ip": "203.0.113.12"},
        {"@timestamp": "2024-10-17T12:11:22Z", "message": "Accepted password", "source.ip": "203.0.113.12"},
    ]
    payload = bulk_format(events, index_name="csy105-logs")
    with open("bulk.ndjson", "w", encoding="utf-8") as handle:
        handle.write(payload)
    print("[*] Wrote bulk payload to bulk.ndjson")

Response Playbook Stubs

Automated response should be conservative. Use playbooks to guide analysts rather than immediate destructive action.

#!/usr/bin/env python3
"""
Generate a response playbook entry for suspicious activity.
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict


@dataclass
class Playbook:
    """An analyst-facing incident-response playbook entry."""

    title: str  # short description of what to investigate
    severity: str  # inherited from the triggering alert
    steps: str  # newline-separated ordered response steps

    def to_dict(self) -> Dict[str, str]:
        """Return the playbook as a plain dict for serialization."""
        return {
            "title": self.title,
            "severity": self.severity,
            "steps": self.steps,
        }


def create_playbook(alert: Dict[str, str]) -> Playbook:
    """
    Build a playbook entry for SOC analysts from an alert dict.

    Args:
        alert: Alert with optional "message" and "severity" keys.

    Returns:
        Playbook whose title embeds the alert message.
    """
    # Fix: the original used literal backslash-n ("\\n") sequences, so the
    # rendered steps were one long line containing "\n" text instead of
    # actual line breaks.
    steps = (
        "1) Verify alert accuracy\n"
        "2) Check source IP reputation\n"
        "3) Search for related events\n"
        "4) Notify incident response lead\n"
        "5) Consider temporary block if malicious\n"
    )
    return Playbook(
        title=f"Investigate: {alert.get('message', 'Suspicious event')}",
        severity=alert.get("severity", "medium"),
        steps=steps,
    )


if __name__ == "__main__":
    # Demo: print the playbook generated for a sample brute-force alert.
    sample_alert = {"message": "Brute force spike from 203.0.113.12", "severity": "high"}
    print(create_playbook(sample_alert).to_dict())

Log Redaction Utility

#!/usr/bin/env python3
"""
Redact sensitive fields before sharing logs.
"""
from __future__ import annotations

import re
from typing import Dict


# Fix: the originals were double-escaped inside raw strings (r"\\.", r"\\b",
# r"\\d"), so each pattern required literal backslashes in the input and
# never matched a real email or IP.
EMAIL_PATTERN = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
IP_PATTERN = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")


def redact_message(message: str) -> str:
    """
    Replace email addresses and IPv4 addresses with placeholders.

    Args:
        message: Free-text log message.

    Returns:
        Message with PII substituted by [REDACTED_EMAIL] / [REDACTED_IP].
    """
    message = EMAIL_PATTERN.sub("[REDACTED_EMAIL]", message)
    message = IP_PATTERN.sub("[REDACTED_IP]", message)
    return message


def redact_event(event: Dict[str, str]) -> Dict[str, str]:
    """
    Return a redacted copy of *event*; the input dict is never mutated.
    """
    sanitized = dict(event)  # shallow copy so the caller's event is untouched
    sanitized["message"] = redact_message(event.get("message", ""))
    if "source_ip" in sanitized:
        sanitized["source_ip"] = "[REDACTED_IP]"
    if sanitized.get("username"):
        sanitized["username"] = "[REDACTED_USER]"
    return sanitized


if __name__ == "__main__":
    # Demo: redact a synthetic event containing an email, IP, and username.
    sample = {
        "message": "Failed login for alice@example.com from 203.0.113.12",
        "source_ip": "203.0.113.12",
        "username": "alice",
    }
    print(redact_event(sample))

Ticketing Stub (Local)

#!/usr/bin/env python3
"""
Create a simple incident ticket in JSON for tracking.
"""
from __future__ import annotations

import json
from datetime import datetime
from typing import Dict


def create_ticket(alert: Dict[str, str], output_dir: str = "./tickets") -> str:
    """
    Write an incident ticket file to disk and return its path.

    Args:
        alert: Alert dict with optional severity/message/source_ip keys.
        output_dir: Directory for ticket files; created if missing.

    Returns:
        Path of the written ticket JSON file.
    """
    from pathlib import Path  # local import keeps this lab script self-contained

    ticket_dir = Path(output_dir)
    # Fix: the original open() raised FileNotFoundError when ./tickets
    # did not exist yet.
    ticket_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    # NOTE(review): second-resolution IDs can collide if two tickets are
    # created within the same second.
    ticket_id = f"TICKET-{timestamp}"
    ticket = {
        "id": ticket_id,
        "created_at": datetime.utcnow().isoformat(),
        "severity": alert.get("severity", "medium"),
        "summary": alert.get("message", "Suspicious activity"),
        "source_ip": alert.get("source_ip", "unknown"),
        "status": "open",
        "owner": "soc-analyst",
    }

    path = ticket_dir / f"{ticket_id}.json"
    with path.open("w", encoding="utf-8") as handle:
        json.dump(ticket, handle, indent=2)
    return str(path)


if __name__ == "__main__":
    # Demo: write a ticket for a sample alert and print its path.
    sample_alert = {
        "severity": "high",
        "message": "Brute force spike from 203.0.113.12",
        "source_ip": "203.0.113.12",
    }
    print(create_ticket(sample_alert))

Lab 08: Defensive Log Analysis Toolkit (90-150 minutes)

Lab Safety: Use only synthetic logs or logs from your own lab environment. Do not export production logs with PII.

Lab Overview

You will build a four-part defensive toolkit: multi-format ingestion, brute-force detection, geolocation anomalies, and threat intel enrichment.

Lab Part 1: Multi-format Log Aggregator (25-35 min)

Objective: Normalize syslog, JSON, and Windows Event logs into a single CSV.

Requirements:

Success Criteria: Running your script produces a clean CSV with consistent columns.

Hint: Robust parsing approach
def extract_source_ip(message: str) -> str:
    """Extract IP from message; return empty if not found."""
    match = re.search(r"\b(?:\d{1,3}\.){3}\d{1,3}\b", message)
    return match.group(0) if match else ""

# Apply extraction after normalization
row["source_ip"] = extract_source_ip(row["message"])

Lab Part 2: Brute Force Attack Detector (20-30 min)

Objective: Detect brute-force logins using rolling windows.

Requirements:

Success Criteria: Trigger alerts with a provided test dataset.

Hint: Pandas resample trick
failures = df[df["outcome"] == "failed"].set_index("timestamp")
windowed = failures.groupby("username").resample("15min").size()
spikes = windowed[windowed >= 10].reset_index(name="failed_count")

Lab Part 3: Geolocation-based Anomaly Detector (20-30 min)

Objective: Flag logins from unusual countries for high-value accounts.

Requirements:

Success Criteria: Alerts include country, city, and user baseline.

Hint: Baseline aggregation
baseline = (
    df.groupby(["username", "country"]).size()
      .reset_index(name="count")
      .sort_values(["username", "count"], ascending=[True, False])
)
# Keep top 3 countries per user
baseline = baseline.groupby("username").head(3)

Lab Part 4: Threat Intel Enrichment (20-30 min)

Objective: Enrich suspicious IPs with threat intelligence.

Requirements:

Success Criteria: Alerts include score and last-seen metadata.

Hint: Cache + API workflow
cache = load_cache()
if cached := get_cached_ip(cache, ip):
    intel = cached
else:
    intel = lookup_abuse_ip(ip, api_key)
    if intel:
        set_cached_ip(cache, ip, intel)
        save_cache(cache)

Stretch Challenges (Optional)

Hint: Correlate multiple signals
def correlated_alert(event: dict, geo: dict, intel: dict) -> dict | None:
    """Return an alert only if multiple risk signals are present."""
    if event.get("outcome") != "failed":
        return None
    if geo.get("country") in {"Unknown", ""}:
        return None
    if int(intel.get("abuseConfidenceScore", 0)) < 50:
        return None

    return {
        "alert_type": "multi_signal_bruteforce",
        "severity": "critical",
        "message": "Failed login from high-risk IP + unusual country",
        "source_ip": event.get("source_ip", "unknown"),
        "country": geo.get("country", "unknown"),
        "abuseScore": intel.get("abuseConfidenceScore", "0"),
    }
🎯 Lab Complete! You now have a defender-focused log analytics toolkit ready for SIEM-style workflows.

📤 Deliverables:

Additional Resources

Log Analysis

Threat Intelligence

Blue Team Guides

Key Takeaways

Week 08 Quiz

Test your understanding of defensive Python and log analysis.

Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.

Take Quiz