CSY105 Week 11 Beginner

Week Content

Programming for Security

Week Overview

This week focuses on building offensive tools in an ethical, controlled environment. You will design a lightweight command-and-control (C2) workflow, enumerate processes, and build safe simulations of common offensive capabilities.

  • Understand C2 architecture and secure tasking workflows
  • Build an allowlisted agent with telemetry and audit logs
  • Enumerate processes for situational awareness
  • Create consent-based key capture and clipboard monitors
  • Implement a simple payload encoder for lab-only testing

WARNING: Using offensive tooling without explicit authorization is illegal. Use only your own lab or instructor-approved environments. Document consent, scope, and timestamps for every test.

Real-World Context: Red teams build safe, repeatable tooling to validate controls. Blue teams use these same patterns to detect suspicious behavior. This week emphasizes logging and controlled execution for accountability.

Section 1: Rules of Engagement and Threat Modeling

Rules of Engagement Checklist

  • Written authorization and explicit test scope
  • Systems, IP ranges, and time windows defined
  • Data handling rules and PII constraints
  • Kill switch and emergency contact defined

Threat Model Snapshot

Target: Lab domain (csy105.local)
Assumptions: Isolated VM network, local-only testing
Assets: Auth servers, developer workstations
Risks: Credential misuse, data leakage, service disruption
Controls: Logging, rate limits, allowlisted commands

Scope Definition (YAML)

engagement:
  name: "CSY105 Week 11 Lab"
  dates: "2026-01-20"
  scope:
    ip_ranges:
      - "192.168.56.0/24"
    systems:
      - "lab-auth-01"
      - "lab-win-02"
  constraints:
    - "No persistence"
    - "No destructive actions"
    - "No data exfiltration"
  contacts:
    ir_lead: "instructor@example.edu"
    emergency_stop: "stop-flag"
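
A scope file is most useful when tools check it before acting. The following is a minimal sketch, assuming the approved ranges have already been loaded into a list; `SCOPE_RANGES` and `in_scope` are illustrative names, not part of the course files.

```python
import ipaddress
from typing import Iterable

# Hypothetical in-memory copy of the ip_ranges list from the scope definition.
SCOPE_RANGES = ["192.168.56.0/24"]


def in_scope(target_ip: str, ranges: Iterable[str] = SCOPE_RANGES) -> bool:
    """Return True only if target_ip falls inside an approved range."""
    try:
        address = ipaddress.ip_address(target_ip)
    except ValueError:
        return False  # Malformed input is never in scope.
    return any(address in ipaddress.ip_network(cidr) for cidr in ranges)
```

A tool could call `in_scope(target)` before every network action and refuse to continue when it returns False.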

Section 2: Command and Control Foundations

Core C2 Concepts

  • Beaconing: Agent checks in at intervals
  • Tasking: Operator queues commands
  • Allowlist: Only approved tasks execute
  • Telemetry: Results and errors are logged
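
The four concepts above can be sketched without any networking. This is an in-memory illustration only; the `ALLOWED` table, the `beacon_once` function, and the task names are assumptions made for the example.

```python
from collections import deque
from typing import Callable, Deque, Dict, List

# Hypothetical allowlist containing one harmless task.
ALLOWED: Dict[str, Callable[[], str]] = {"ping": lambda: "pong"}


def beacon_once(queue: Deque[str], telemetry: List[Dict[str, str]]) -> None:
    """One check-in: fetch a task, run it if allowlisted, log the outcome."""
    task = queue.popleft() if queue else "idle"
    if task == "idle":
        telemetry.append({"task": task, "status": "no_work"})
        return
    handler = ALLOWED.get(task)
    if handler is None:
        # Anything outside the allowlist is logged and refused.
        telemetry.append({"task": task, "status": "rejected"})
        return
    telemetry.append({"task": task, "status": "ok", "result": handler()})


# Tasking: the operator queues one allowed and one unapproved task.
task_queue: Deque[str] = deque(["ping", "shell"])
telemetry: List[Dict[str, str]] = []
for _ in range(3):  # Beaconing: three check-ins
    beacon_once(task_queue, telemetry)
```

After the three check-ins, `telemetry` holds one entry per beacon: the allowed task with its result, the rejected task, and an idle check-in.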

C2 Message Schema

#!/usr/bin/env python3
"""
Define signed C2 messages to prevent tampering.
"""
from __future__ import annotations

import hmac
import json
import time
from dataclasses import dataclass
from hashlib import sha256
from typing import Dict


@dataclass
class C2Message:
    """
    Signed C2 message for lab-only tooling.
    """
    agent_id: str
    task: str
    payload: Dict[str, str]
    timestamp: int
    signature: str = ""

    def to_dict(self) -> Dict[str, str]:
        """Return message as dict."""
        return {
            "agent_id": self.agent_id,
            "task": self.task,
            "payload": json.dumps(self.payload),
            "timestamp": str(self.timestamp),
            "signature": self.signature,
        }


def sign_message(message: C2Message, key: str) -> C2Message:
    """
    Add HMAC signature to message.
    """
    body = f"{message.agent_id}|{message.task}|{message.payload}|{message.timestamp}".encode()
    message.signature = hmac.new(key.encode(), body, sha256).hexdigest()
    return message


def verify_message(message: C2Message, key: str) -> bool:
    """
    Verify HMAC signature before executing.
    """
    expected = sign_message(C2Message(
        agent_id=message.agent_id,
        task=message.task,
        payload=message.payload,
        timestamp=message.timestamp
    ), key).signature
    return hmac.compare_digest(expected, message.signature)


if __name__ == "__main__":
    msg = C2Message(agent_id="agent-01", task="sysinfo", payload={}, timestamp=int(time.time()))
    signed = sign_message(msg, "lab_secret")
    print(signed.to_dict())
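
An HMAC signature shows that a message was not altered, but a captured message could still be replayed later. One common mitigation is to reject messages whose timestamp is too old. The sketch below assumes a 30-second window; `MAX_AGE_SECONDS` and `is_fresh` are hypothetical names and are not part of the schema above.

```python
import time
from typing import Optional

MAX_AGE_SECONDS = 30  # Assumed tolerance; adjust for clock skew in your lab.


def is_fresh(timestamp: int, now: Optional[int] = None,
             max_age: int = MAX_AGE_SECONDS) -> bool:
    """Return True if timestamp is within max_age seconds of the current time."""
    current = int(time.time()) if now is None else now
    # abs() also rejects timestamps that are too far in the future.
    return abs(current - timestamp) <= max_age
```

A receiver could call `is_fresh(message.timestamp)` alongside `verify_message` and drop the message if either check fails.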

Section 3: Lightweight C2 Server (Lab-Only)

Safe Task Queue Server

#!/usr/bin/env python3
"""
Lab-only C2 server with allowlisted tasks.
"""
from __future__ import annotations

import json
import socket
import time
from typing import Dict, List

from c2_protocol import C2Message, sign_message, verify_message

HOST = "127.0.0.1"
PORT = 9101
SECRET = "lab_secret"
TASK_FILE = "tasks.json"


def load_tasks() -> List[Dict[str, str]]:
    """
    Load tasks from disk.
    """
    try:
        with open(TASK_FILE, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except (OSError, json.JSONDecodeError):
        return []


def save_tasks(tasks: List[Dict[str, str]]) -> None:
    """
    Persist tasks to disk.
    """
    with open(TASK_FILE, "w", encoding="utf-8") as handle:
        json.dump(tasks, handle, indent=2)


def get_next_task(agent_id: str) -> Dict[str, str]:
    """
    Pop the next task for the agent.
    """
    tasks = load_tasks()
    for idx, task in enumerate(tasks):
        if task.get("agent_id") == agent_id:
            next_task = tasks.pop(idx)
            save_tasks(tasks)
            return next_task
    return {"task": "idle", "payload": {}}


def handle_client(conn: socket.socket) -> None:
    """
    Handle a single agent request.
    """
    data = conn.recv(4096).decode(errors="ignore")
    if not data:
        return

    try:
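        request = json.loads(data)
        # to_dict() sends the payload as a JSON string; decode it so the
        # signature is checked over the same dict the agent signed
        raw_payload = request.get("payload", "{}")
        msg = C2Message(
            agent_id=request["agent_id"],
            task=request["task"],
            payload=json.loads(raw_payload) if isinstance(raw_payload, str) else raw_payload,
            timestamp=int(request["timestamp"]),
            signature=request.get("signature", ""),
        )
    except (KeyError, TypeError, ValueError) as exc: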
        conn.sendall(json.dumps({"error": str(exc)}).encode())
        return

    if not verify_message(msg, SECRET):
        conn.sendall(json.dumps({"error": "invalid_signature"}).encode())
        return

    task = get_next_task(msg.agent_id)
    response = C2Message(
        agent_id=msg.agent_id,
        task=task["task"],
        payload=task.get("payload", {}),
        timestamp=int(time.time()),
    )
    response = sign_message(response, SECRET)
    conn.sendall(json.dumps(response.to_dict()).encode())


def run_server() -> None:
    """Start TCP server for lab-only agent traffic."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind((HOST, PORT))
        server.listen(5)
        print(f"[*] C2 server listening on {HOST}:{PORT}")
        while True:
            conn, _ = server.accept()
            with conn:
                handle_client(conn)


if __name__ == "__main__":
    run_server()
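
The server reads each request with a single `recv(4096)` call, which can return a partial message on a real network. A length prefix is one way to make message boundaries explicit. The sketch below is an assumption about how framing could be added, not part of the lab protocol; the 4-byte header and the `MAX_FRAME` limit are illustrative choices.

```python
import json
import socket
import struct
from typing import Dict

HEADER = struct.Struct("!I")  # Assumed format: 4-byte big-endian length prefix
MAX_FRAME = 64 * 1024  # Assumed upper bound; larger frames are rejected


def recv_exact(conn: socket.socket, size: int) -> bytes:
    """Read exactly size bytes or raise if the peer closes early."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            raise ConnectionError("peer closed before full frame arrived")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def send_frame(conn: socket.socket, message: Dict[str, str]) -> None:
    """Send a JSON message preceded by its byte length."""
    body = json.dumps(message).encode()
    conn.sendall(HEADER.pack(len(body)) + body)


def recv_frame(conn: socket.socket) -> Dict[str, str]:
    """Receive one length-prefixed JSON message."""
    (length,) = HEADER.unpack(recv_exact(conn, HEADER.size))
    if length > MAX_FRAME:
        raise ValueError("frame too large")
    return json.loads(recv_exact(conn, length).decode())
```

Both ends must use the same framing, so the agent would need matching `send_frame` and `recv_frame` calls.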

Operator Task Queue

#!/usr/bin/env python3
"""
Add tasks to the C2 task queue.
"""
from __future__ import annotations

import json
import time
from typing import Dict, List

TASK_FILE = "tasks.json"


def load_tasks() -> List[Dict[str, str]]:
    """
    Load tasks list.
    """
    try:
        with open(TASK_FILE, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except (OSError, json.JSONDecodeError):
        return []


def save_tasks(tasks: List[Dict[str, str]]) -> None:
    """
    Save tasks list.
    """
    with open(TASK_FILE, "w", encoding="utf-8") as handle:
        json.dump(tasks, handle, indent=2)


def enqueue_task(agent_id: str, task: str, payload: Dict[str, str]) -> None:
    """
    Add a new task for the agent.
    """
    tasks = load_tasks()
    tasks.append({
        "agent_id": agent_id,
        "task": task,
        "payload": payload,
        "created_at": int(time.time()),
    })
    save_tasks(tasks)


if __name__ == "__main__":
    enqueue_task("agent-01", "sysinfo", {})
    enqueue_task("agent-01", "process_list", {"limit": "10"})
    print("[*] Tasks added")
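
The operator script and the server both rewrite `tasks.json`, so a reader could see a half-written file. Writing to a temporary file and renaming it is one way to avoid that. This is a sketch under that assumption; `save_tasks_atomic` is a hypothetical helper, and it does not prevent two writers from overwriting each other's changes.

```python
import json
import os
import tempfile
from typing import Dict, List


def save_tasks_atomic(tasks: List[Dict[str, object]], path: str) -> None:
    """Write tasks to a temp file, then rename it over the target path."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, temp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(tasks, handle, indent=2)
        # os.replace swaps the file in one step on the same filesystem
        os.replace(temp_path, path)
    except BaseException:
        # Remove the temp file if anything failed before the rename
        if os.path.exists(temp_path):
            os.remove(temp_path)
        raise
```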

Section 4: Allowlisted C2 Agent

Safe Agent Design

  • Allowlist tasks (no raw shell execution)
  • Rate-limit requests with jitter
  • Log all actions for audit
  • Support a kill switch
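
Two of these design points can be sketched in a few lines. The example assumes a file-based kill switch (a file named `stop-flag` in the working directory), which is one possible design and not necessarily the one used in this course; `next_delay` and `kill_switch_present` are illustrative names.

```python
import os
import random
from typing import Optional


def next_delay(base: float = 2.5, jitter: float = 1.0,
               rng: Optional[random.Random] = None) -> float:
    """Return a sleep interval drawn uniformly from base +/- jitter seconds."""
    if rng is None:
        rng = random.Random()
    return rng.uniform(base - jitter, base + jitter)


def kill_switch_present(flag_path: str = "stop-flag") -> bool:
    """Return True if the (assumed) kill-switch file exists."""
    return os.path.exists(flag_path)
```

An agent loop could check `kill_switch_present()` before each beacon and exit cleanly when it returns True, then sleep for `next_delay()` seconds otherwise.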

Agent Implementation (Lab-Only)

#!/usr/bin/env python3
"""
Allowlisted C2 agent with telemetry and kill switch.
"""
from __future__ import annotations

import json
import random
import socket
import time
from typing import Callable, Dict

import platform
import psutil

from c2_protocol import C2Message, sign_message, verify_message

HOST = "127.0.0.1"
PORT = 9101
SECRET = "lab_secret"
AGENT_ID = "agent-01"
KILL_SWITCH = "stop-flag"


def task_sysinfo(payload: Dict[str, str]) -> Dict[str, str]:
    """Collect basic system info."""
    return {
        "hostname": platform.node(),
        "os": platform.platform(),
        "cpu_count": str(psutil.cpu_count(logical=True)),
    }


def task_process_list(payload: Dict[str, str]) -> Dict[str, str]:
    """Return a limited process list."""
    limit = int(payload.get("limit", "10"))
    results = []
    for proc in psutil.process_iter(["pid", "name", "username"]):
        if len(results) >= limit:
            break
        try:
            # psutil fills fields it cannot read with None, so fall back to ""
            results.append({
                "pid": str(proc.info.get("pid")),
                "name": proc.info.get("name") or "",
                "user": proc.info.get("username") or "",
            })
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return {"processes": json.dumps(results)}


TASKS: Dict[str, Callable[[Dict[str, str]], Dict[str, str]]] = {
    "sysinfo": task_sysinfo,
    "process_list": task_process_list,
}


def send_message(message: C2Message) -> C2Message:
    """Send a message to the server and return response."""
    message = sign_message(message, SECRET)
    with socket.create_connection((HOST, PORT), timeout=5) as sock:
        sock.sendall(json.dumps(message.to_dict()).encode())
        response = sock.recv(4096).decode(errors="ignore")

    data = json.loads(response)
    return C2Message(
        agent_id=data["agent_id"],
        task=data["task"],
        payload=json.loads(data.get("payload", "{}")),
        timestamp=int(data["timestamp"]),
        signature=data.get("signature", ""),
    )


def execute_task(task: str, payload: Dict[str, str]) -> Dict[str, str]:
    """Execute allowlisted tasks only."""
    if task == KILL_SWITCH:
        raise SystemExit("Kill switch triggered")

    handler = TASKS.get(task)
    if not handler:
        return {"error": f"task_not_allowed: {task}"}

    try:
        return handler(payload)
    except Exception as exc:
        return {"error": str(exc)}


def run_agent() -> None:
    """Main beacon loop."""
    while True:
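        message = C2Message(agent_id=AGENT_ID, task="beacon", payload={}, timestamp=int(time.time()))
        try:
            response = send_message(message)
        except (OSError, KeyError, TypeError, ValueError) as exc:
            # Server unreachable, or it returned an error or malformed reply
            print(f"[!] Beacon failed: {exc}")
            time.sleep(2)
            continue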

        if not verify_message(response, SECRET):
            print("[!] Invalid server signature")
            time.sleep(2)
            continue

        if response.task != "idle":
            result = execute_task(response.task, response.payload)
            print(f"[*] Task result: {result}")

        # Jittered sleep to reduce predictability (lab only)
        time.sleep(random.uniform(1.5, 3.5))


if __name__ == "__main__":
    run_agent()

Section 5: Process Enumeration and Interrogation

Process Inventory

#!/usr/bin/env python3
"""
Enumerate running processes with metadata.
"""
from __future__ import annotations

from typing import Dict, List

import psutil


def list_processes(limit: int = 50) -> List[Dict[str, str]]:
    """
    Return a list of processes with pid, name, and user.
    """
    results = []
    for proc in psutil.process_iter(["pid", "name", "username", "ppid"]):
        if len(results) >= limit:
            break
        try:
            info = proc.info
            # psutil fills fields it cannot read with None, so fall back to ""
            results.append({
                "pid": str(info.get("pid")),
                "ppid": str(info.get("ppid")),
                "name": info.get("name") or "",
                "user": info.get("username") or "",
            })
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return results


def print_summary(processes: List[Dict[str, str]]) -> None:
    """Print a formatted summary."""
    for proc in processes:
        print(f"{proc['pid']:>6} {proc['ppid']:>6} {proc['user']:<20} {proc['name']}")


if __name__ == "__main__":
    print_summary(list_processes())
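
The parent PID makes it possible to reconstruct process relationships from the inventory. The sketch below works on the list-of-dicts format returned by `list_processes`; the helper names and the sample data are made up for illustration.

```python
from collections import defaultdict
from typing import Dict, List


def build_children_map(processes: List[Dict[str, str]]) -> Dict[str, List[str]]:
    """Map each parent pid to the pids of its direct children."""
    children: Dict[str, List[str]] = defaultdict(list)
    for proc in processes:
        children[proc["ppid"]].append(proc["pid"])
    return dict(children)


def descendants(root_pid: str, children: Dict[str, List[str]]) -> List[str]:
    """Return every pid below root_pid, sorted numerically."""
    found: List[str] = []
    seen = {root_pid}  # Guards against self-parented entries such as pid 0
    stack = list(children.get(root_pid, []))
    while stack:
        pid = stack.pop()
        if pid in seen:
            continue
        seen.add(pid)
        found.append(pid)
        stack.extend(children.get(pid, []))
    return sorted(found, key=int)


# Made-up sample inventory
sample = [
    {"pid": "1", "ppid": "0"},
    {"pid": "10", "ppid": "1"},
    {"pid": "11", "ppid": "10"},
    {"pid": "20", "ppid": "1"},
]
tree = build_children_map(sample)
```

With this sample, `descendants("1", tree)` covers pids 10, 11, and 20, while `descendants("10", tree)` covers only pid 11.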

Windows Process Interrogator

#!/usr/bin/env python3
"""
Filter Windows processes by keyword (lab only).
"""
from __future__ import annotations

from typing import List

import psutil


def find_processes(keyword: str) -> List[str]:
    """
    Return process names matching keyword.
    """
    matches = []
    for proc in psutil.process_iter(["name"]):
        try:
            # The name can be None when access is denied
            name = proc.info.get("name") or ""
            if keyword.lower() in name.lower():
                matches.append(name)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return sorted(set(matches))


if __name__ == "__main__":
    for item in find_processes("chrome"):
        print(item)

Section 6: Privilege Escalation Basics (Defensive Auditing)

Common Misconfigurations

  • World-writable scripts or binaries
  • Unquoted service paths on Windows
  • Over-permissive scheduled tasks
  • SUID binaries with unsafe paths

Privilege Escalation Vectors

Vector | Example | Defense
Weak permissions | World-writable service script | Harden file ACLs
Unquoted paths | Service path with spaces | Quote service binary paths
SUID misuse | Legacy admin utilities | Remove unneeded SUID

World-Writable File Scanner (Linux)

#!/usr/bin/env python3
"""
Find world-writable files in a directory tree.
"""
from __future__ import annotations

from pathlib import Path
from typing import List


def find_world_writable(root: str) -> List[str]:
    """
    Return list of files with global write permission.
    """
    hits = []
    for path in Path(root).rglob("*"):
        try:
            if not path.is_file():
                continue
            mode = path.stat().st_mode
            if mode & 0o002:
                hits.append(str(path))
        except OSError:
            continue
    return hits


if __name__ == "__main__":
    for item in find_world_writable("/tmp"):
        print(item)

SUID Finder (Linux)

#!/usr/bin/env python3
"""
List SUID binaries for review.
"""
from __future__ import annotations

from pathlib import Path
from typing import List


def find_suid(root: str) -> List[str]:
    """
    Return list of SUID binaries.
    """
    results = []
    for path in Path(root).rglob("*"):
        try:
            if not path.is_file():
                continue
            if path.stat().st_mode & 0o4000:
                results.append(str(path))
        except OSError:
            continue
    return results


if __name__ == "__main__":
    for item in find_suid("/usr/bin"):
        print(item)

Unquoted Service Path Audit (Windows Export)

#!/usr/bin/env python3
"""
Parse service listing and flag unquoted paths.
"""
from __future__ import annotations

from pathlib import Path
from typing import List


def find_unquoted(paths: List[str]) -> List[str]:
    """
    Return paths with spaces and missing quotes.
    """
    results = []
    for line in paths:
        if ":" in line and " " in line and not line.strip().startswith("\""):
            results.append(line.strip())
    return results


if __name__ == "__main__":
    lines = Path("services.txt").read_text(encoding="utf-8").splitlines()
    for item in find_unquoted(lines):
        print(item)
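
The check above flags any unquoted line containing a space, including lines where the space only separates the executable from its arguments. A narrower heuristic looks for a space inside the executable path itself. This sketch assumes each input line is a bare service command line ending its binary in `.exe`; the regular expression and function name are illustrative.

```python
import re

# Matches a leading unquoted executable path such as C:\Program Files\App\svc.exe
EXE_PATH = re.compile(r"^([A-Za-z]:\\.*?\.exe)\b", re.IGNORECASE)


def is_unquoted_with_space(command_line: str) -> bool:
    """Return True if the unquoted executable path itself contains a space."""
    line = command_line.strip()
    if line.startswith('"'):
        return False  # Quoted paths are not affected
    match = EXE_PATH.match(line)
    return bool(match) and " " in match.group(1)
```

Under this heuristic, `C:\Windows\system32\svchost.exe -k netsvcs` is not flagged, because the only space comes after the executable path.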

Section 7: Consent-Based Input Capture

Educational Key Capture (Local Only)

#!/usr/bin/env python3
"""
Consent-based key capture for training (local only).
Requires pynput and explicit user consent.
"""
from __future__ import annotations

import time
from typing import List

from pynput import keyboard


def capture_keys(duration_sec: int = 15) -> List[str]:
    """
    Capture key presses for a limited duration.
    """
    captured: List[str] = []

    def on_press(key) -> None:
        try:
            captured.append(key.char)
        except AttributeError:
            captured.append(str(key))

    with keyboard.Listener(on_press=on_press) as listener:
        time.sleep(duration_sec)
        listener.stop()

    return captured


if __name__ == "__main__":
    consent = input("Type YES to consent to key capture in this lab: ")
    if consent.strip().upper() != "YES":
        raise SystemExit("Consent required")

    keys = capture_keys(10)
    print("Captured:", keys)

Clipboard Monitor (Local Only)

#!/usr/bin/env python3
"""
Monitor clipboard changes for lab awareness.
Requires pyperclip.
"""
from __future__ import annotations

import re
import time
from typing import Optional

import pyperclip


SECRET_PATTERNS = [
    re.compile(r"AKIA[0-9A-Z]{16}"),  # AWS access key pattern
    re.compile(r"ghp_[A-Za-z0-9]{36}"),  # GitHub token pattern
]


def check_clipboard(last_value: str) -> Optional[str]:
    """
    Return new clipboard content if changed.
    """
    try:
        current = pyperclip.paste()
    except pyperclip.PyperclipException:
        return None

    if current != last_value:
        return current
    return None


def looks_sensitive(text: str) -> bool:
    """
    Detect patterns that resemble secrets.
    """
    return any(pattern.search(text) for pattern in SECRET_PATTERNS)


if __name__ == "__main__":
    last = ""
    print("[*] Monitoring clipboard (Ctrl+C to stop)")
    try:
        while True:
            value = check_clipboard(last)
            if value is not None:
                last = value
                if looks_sensitive(value):
                    print("[ALERT] Possible secret detected")
                else:
                    print("[INFO] Clipboard updated")
            time.sleep(1)
    except KeyboardInterrupt:
        print("[*] Stopped")
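
When an alert is logged, the matched value should not be written out in full. The following sketch masks anything that matches the same two patterns; `redact` and the `keep` parameter are hypothetical additions, and the test key shown is a made-up string.

```python
import re

# Same patterns as the monitor above
SECRET_PATTERNS = [
    re.compile(r"AKIA[0-9A-Z]{16}"),  # AWS access key pattern
    re.compile(r"ghp_[A-Za-z0-9]{36}"),  # GitHub token pattern
]


def redact(text: str, keep: int = 4) -> str:
    """Mask secret-like matches, keeping only the first keep characters."""
    def mask(match: "re.Match[str]") -> str:
        value = match.group(0)
        return value[:keep] + "*" * (len(value) - keep)

    for pattern in SECRET_PATTERNS:
        text = pattern.sub(mask, text)
    return text
```

Keeping a short prefix lets a reviewer see which kind of token was detected without exposing the token itself.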

Section 8: Credential Harvester Simulation

Local Training Login (Loopback Only)

#!/usr/bin/env python3
"""
Local credential capture simulator for awareness training.
Binds to 127.0.0.1 only.
"""
from __future__ import annotations

import hashlib
import json
import os

from flask import Flask, request, render_template_string

app = Flask(__name__)

PAGE = """
<!doctype html>
<html>
<body>
  <h1>Training Login (Local Only)</h1>
  <p>This is a lab simulator. Do not use real passwords.</p>
  <form method="post">
    <label>Username: <input name="username" /></label><br />
    <label>Password: <input name="password" type="password" /></label><br />
    <button type="submit">Submit</button>
  </form>
</body>
</html>
"""


def hash_value(value: str, salt: str) -> str:
    """
    Hash a value with a salt (prevents storing raw secrets).
    """
    return hashlib.sha256((salt + value).encode()).hexdigest()


@app.route("/", methods=["GET", "POST"])
def login() -> str:
    """
    Accept credentials and store only hashed values.
    """
    if request.method == "POST":
        username = request.form.get("username", "")
        password = request.form.get("password", "")
        salt = os.getenv("LAB_SALT", "lab_salt")
        entry = {
            "user_hash": hash_value(username, salt),
            "pass_hash": hash_value(password, salt),
        }
        with open("harvest_log.jsonl", "a", encoding="utf-8") as handle:
            # Write real JSON so every line of the .jsonl file parses cleanly
            handle.write(json.dumps(entry) + "\n")
        return "Captured (hashed) - training only"
    return render_template_string(PAGE)


if __name__ == "__main__":
    app.run(host="127.0.0.1", port=5005)

Safety Notes

  • Bind to localhost only
  • Do not store raw credentials
  • Delete logs after the lab

Section 9: Screenshot Capture Utility

Screenshot Capture (Lab-Only)

#!/usr/bin/env python3
"""
Capture a screenshot in a lab environment.
Requires mss library.
"""
from __future__ import annotations

from datetime import datetime, timezone

import mss
import mss.tools


def capture_screen() -> str:
    """
    Capture primary monitor and return filename.
    """
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    filename = f"screenshot_{timestamp}.png"

    with mss.mss() as sct:
        monitor = sct.monitors[1]
        screenshot = sct.grab(monitor)
        mss.tools.to_png(screenshot.rgb, screenshot.size, output=filename)

    return filename


if __name__ == "__main__":
    print(f"Saved {capture_screen()}")

Section 10: Obfuscation and Payload Encoding

Simple Base64 Encoder

#!/usr/bin/env python3
"""
Encode and decode strings for lab demonstrations.
"""
from __future__ import annotations

import base64


def encode_text(value: str) -> str:
    """
    Return base64 encoded string.
    """
    return base64.b64encode(value.encode()).decode()


def decode_text(encoded: str) -> str:
    """
    Decode base64 string safely.
    """
    try:
        # validate=True rejects characters outside the base64 alphabet
        # instead of silently discarding them
        return base64.b64decode(encoded.encode(), validate=True).decode()
    except (ValueError, UnicodeDecodeError) as exc:
        raise ValueError(f"Invalid base64: {exc}") from exc


if __name__ == "__main__":
    data = "lab-only-payload"
    encoded = encode_text(data)
    print(encoded)
    print(decode_text(encoded))

ROT13 Obfuscation (Educational)

#!/usr/bin/env python3
"""
Use ROT13 to obfuscate test strings.
"""
from __future__ import annotations

import codecs


def rot13(value: str) -> str:
    """
    Return ROT13 encoded string.
    """
    return codecs.encode(value, "rot_13")


if __name__ == "__main__":
    sample = "training-string"
    print(rot13(sample))
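
From the defender's side, encoded strings can often be spotted by their shape. The sketch below is a rough heuristic for base64-looking tokens; the minimum length of 16 is an assumed threshold, and plain alphanumeric strings of a suitable length will also match, so treat hits as leads, not proof.

```python
import base64
import binascii
import re

B64_TOKEN = re.compile(r"^[A-Za-z0-9+/]+={0,2}$")


def looks_like_base64(token: str, min_length: int = 16) -> bool:
    """Heuristic: right alphabet, length a multiple of 4, and decodes cleanly."""
    if len(token) < min_length or len(token) % 4 != 0:
        return False
    if not B64_TOKEN.match(token):
        return False
    try:
        base64.b64decode(token, validate=True)
    except (binascii.Error, ValueError):
        return False
    return True
```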

Section 11: Telemetry and Audit Logging

Agent Audit Log

#!/usr/bin/env python3
"""
Record agent actions to a JSONL audit log.
"""
from __future__ import annotations

import json
from datetime import datetime, timezone
from typing import Dict


def log_action(event_type: str, detail: Dict[str, str], path: str = "agent_audit.jsonl") -> None:
    """
    Write a single audit event.
    """
    entry = {
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event_type": event_type,
        "detail": detail,
    }
    with open(path, "a", encoding="utf-8") as handle:
        handle.write(json.dumps(entry) + "\n")


if __name__ == "__main__":
    log_action("task_executed", {"task": "sysinfo", "status": "ok"})

Server-Side Event Summary

#!/usr/bin/env python3
"""
Summarize audit events for accountability.
"""
from __future__ import annotations

import json
from collections import Counter
from typing import Dict


def summarize_events(path: str) -> Dict[str, int]:
    """
    Count events by type.
    """
    counts = Counter()
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            try:
                entry = json.loads(line)
                counts[entry.get("event_type", "unknown")] += 1
            except json.JSONDecodeError:
                continue
    return dict(counts)


if __name__ == "__main__":
    print(summarize_events("agent_audit.jsonl"))

Section 12: Secure Configuration and Secrets Handling

Configuration Hygiene

  • Use explicit allowlists for hosts and tasks
  • Reject default secrets in production
  • Validate ports and timeouts
  • Log config changes for audit

Config Loader with Validation

#!/usr/bin/env python3
"""
Load and validate configuration for lab tools.
"""
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import List


@dataclass
class ToolConfig:
    host: str
    port: int
    allowed_tasks: List[str]
    beacon_min: float
    beacon_max: float


def load_config(path: str) -> ToolConfig:
    """
    Load config JSON and validate fields.
    """
    with open(path, "r", encoding="utf-8") as handle:
        data = json.load(handle)

    host = data.get("host", "127.0.0.1")
    port = int(data.get("port", 9101))
    if port < 1024 or port > 65535:
        raise ValueError("Port must be between 1024 and 65535")

    allowed_tasks = data.get("allowed_tasks", [])
    if not isinstance(allowed_tasks, list) or not allowed_tasks:
        raise ValueError("allowed_tasks must be a non-empty list")

    beacon_min = float(data.get("beacon_min", 1.5))
    beacon_max = float(data.get("beacon_max", 3.5))
    if beacon_min >= beacon_max:
        raise ValueError("beacon_min must be less than beacon_max")

    return ToolConfig(
        host=host,
        port=port,
        allowed_tasks=allowed_tasks,
        beacon_min=beacon_min,
        beacon_max=beacon_max,
    )


if __name__ == "__main__":
    print(load_config("config.json"))

Secret Loader (Environment First)

#!/usr/bin/env python3
"""
Load secrets from the environment and reject known default values.
"""
from __future__ import annotations

import os


def get_secret(name: str) -> str:
    """
    Return a secret value or fail fast.
    """
    value = os.getenv(name, "")
    if not value or value in {"changeme", "lab_secret"}:
        raise ValueError(f"Missing or unsafe secret: {name}")
    return value


if __name__ == "__main__":
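    key = get_secret("C2_SHARED_KEY")
    # Confirm the secret loaded without printing its value
    print(f"[*] Loaded C2_SHARED_KEY ({len(key)} characters)")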

Allowlist Loader

#!/usr/bin/env python3
"""
Load a task allowlist from a file.
"""
from __future__ import annotations

from pathlib import Path
from typing import List


def load_allowlist(path: str) -> List[str]:
    """
    Return allowlisted task names from a text file.
    """
    tasks = []
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        name = line.strip()
        if name and not name.startswith("#"):
            tasks.append(name)
    return tasks


if __name__ == "__main__":
    print(load_allowlist("allowlist.txt"))
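
A loaded allowlist only matters if the agent applies it. One way to do that is to filter the task registry at startup, as sketched below; `restrict_tasks` and the `Handler` alias are hypothetical names, and failing on unknown names is a design choice made for this example.

```python
from typing import Callable, Dict, Iterable

Handler = Callable[[Dict[str, str]], Dict[str, str]]


def restrict_tasks(registry: Dict[str, Handler],
                   allowlist: Iterable[str]) -> Dict[str, Handler]:
    """Keep only registered handlers whose names appear in the allowlist."""
    allowed = set(allowlist)
    unknown = allowed - set(registry)
    if unknown:
        # A typo in the allowlist should fail loudly, not be ignored
        raise ValueError(f"Allowlist names unknown tasks: {sorted(unknown)}")
    return {name: handler for name, handler in registry.items() if name in allowed}
```

An agent could build its dispatch table with `restrict_tasks(TASKS, load_allowlist("allowlist.txt"))`, so a task missing from the file can never run even if its handler exists in code.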

Section 13: Defensive Detection Mapping

ATT&CK Technique Mapping

Technique | What You Built | Detection Ideas
T1071 - Application Layer Protocol | C2 beaconing | Monitor loopback traffic, unusual beacons
T1057 - Process Discovery | Process enumeration | EDR alerts for process inventory scans
T1056 - Input Capture | Consent-based key capture | Monitor for keyboard hooks
T1115 - Clipboard Data | Clipboard monitoring | Alert on clipboard API use
T1113 - Screen Capture | Screenshot utility | Monitor screen capture libraries
T1027 - Obfuscated Files or Information | Payload encoding | Scan for encoded payload patterns
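
As one example of a detection idea from the table, beaconing tends to produce connections at fairly regular intervals. The sketch below scores regularity using the coefficient of variation of the gaps between connection times. The 0.5 threshold is an assumption chosen for illustration, and real traffic would need tuning and more context.

```python
from statistics import mean, pstdev
from typing import List


def beacon_score(timestamps: List[float]) -> float:
    """Coefficient of variation of inter-arrival gaps; lower means more regular."""
    if len(timestamps) < 3:
        raise ValueError("need at least three timestamps")
    ordered = sorted(timestamps)
    gaps = [later - earlier for earlier, later in zip(ordered, ordered[1:])]
    average = mean(gaps)
    if average == 0:
        return 0.0
    return pstdev(gaps) / average


def looks_like_beacon(timestamps: List[float], threshold: float = 0.5) -> bool:
    """Flag connection series whose gaps vary little relative to their mean."""
    return beacon_score(timestamps) < threshold
```

Jitter raises the score but does not hide the pattern: gaps drawn uniformly between 1.5 and 3.5 seconds still have a coefficient of variation of roughly 0.23.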

Section 14: Engagement Reporting and Evidence Handling

Evidence Log Template

timestamp: 2026-01-20T14:33:21Z
tool: c2_agent
action: process_list
target: lab-win-02
result: success
evidence: process_report.json
notes: allowlisted task, lab-only

Report Sections

  • Scope and authorization
  • Tooling summary and versions
  • Actions performed and results
  • Detected risks and mitigations
  • Artifacts and hashes

Report Generator (Markdown)

#!/usr/bin/env python3
"""
Generate a short engagement report from artifacts.
"""
from __future__ import annotations

from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List


def generate_report(findings: List[Dict[str, str]], output_path: str) -> None:
    """
    Write a markdown report summarizing lab actions.
    """
    lines = [
        "# Week 11 Engagement Report",
        "",
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
        f"Generated: {datetime.now(timezone.utc).isoformat()}",
        "",
        "## Findings",
    ]

    for item in findings:
        lines.append(f"- {item.get('title', 'finding')}: {item.get('detail', '')}")

    # Join with a real newline; "\\n" would write a literal backslash and n
    Path(output_path).write_text("\n".join(lines), encoding="utf-8")


if __name__ == "__main__":
    sample = [
        {"title": "C2 Tasking", "detail": "Agent executed allowlisted tasks only"},
        {"title": "Clipboard Monitor", "detail": "Token pattern detected in test string"},
    ]
    generate_report(sample, "engagement_report.md")

Evidence Handling Checklist

  • Timestamp every action and artifact
  • Store evidence in a read-only folder
  • Hash artifacts to preserve integrity
  • Delete or sanitize sensitive data after lab
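
Hashing an artifact gives a value that can be recorded in the evidence log and rechecked later. A minimal sketch using SHA-256 follows; the function name and chunk size are illustrative choices.

```python
import hashlib


def sha256_file(path: str, chunk_size: int = 65536) -> str:
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        # Reading in chunks keeps memory use flat for large artifacts
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Recording the digest next to each artifact name in the evidence log lets a reviewer confirm the file has not changed since collection.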

Lab 11: Offensive Tool Development (90-150 minutes)

Lab Safety: Run all exercises in isolated lab VMs. Do not target real systems or networks.

Lab Part 1: Lightweight C2 Agent (25-35 min)

Objective: Build a beaconing agent and allowlisted server task queue.

Requirements:

  • Agent checks in every 2-4 seconds with jitter
  • Server returns tasks from a JSON queue
  • Tasks are allowlisted (no raw shell)

Success Criteria: Agent receives tasks and prints results with signatures verified.

Hint: Allowlist pattern
TASKS = {"sysinfo": task_sysinfo, "process_list": task_process_list}
if task not in TASKS:
    return {"error": "task_not_allowed"}

Lab Part 2: Windows Process Interrogator (20-25 min)

Objective: Enumerate and filter processes on a Windows VM.

Requirements:

  • List pid, name, user, parent pid
  • Filter by keyword or username
  • Export results to process_report.json

Success Criteria: Report lists 20+ processes with metadata.

Hint: JSON export
import json
with open("process_report.json", "w", encoding="utf-8") as handle:
    json.dump(processes, handle, indent=2)

Lab Part 3: Screenshot Capture Utility (15-20 min)

Objective: Capture a local screenshot and log metadata.

Requirements:

  • Save a PNG file with timestamp
  • Log file path and size to screenshot_log.jsonl
  • Run only on local lab VM

Success Criteria: Screenshot file saved and logged.

Hint: Metadata log
entry = {"file": filename, "size": os.path.getsize(filename)}
with open("screenshot_log.jsonl", "a", encoding="utf-8") as handle:
    handle.write(json.dumps(entry) + "\n")

Lab Part 4: Clipboard Monitor (15-20 min)

Objective: Monitor clipboard changes and flag secret-like patterns.

Requirements:

  • Detect clipboard changes every 1 second
  • Flag patterns like API keys or tokens
  • Log alerts to clipboard_alerts.jsonl

Success Criteria: Alerts generated when test tokens are copied.

Hint: Pattern detection
if looks_sensitive(value):
    # Pass the path so alerts go to the file named in the requirements
    log_action("clipboard_alert", {"value": "[REDACTED]"}, path="clipboard_alerts.jsonl")

Lab Part 5: Simple Payload Encoder (15-20 min)

Objective: Encode and decode strings using base64 or ROT13.

Requirements:

  • Provide encode/decode functions
  • Validate input and handle errors
  • Log encoded output to encoded_payload.txt

Success Criteria: Round-trip decode matches original input.

Hint: Error handling
try:
    decoded = decode_text(encoded)
except ValueError as exc:
    print(f"Error: {exc}")

Lab Complete! You built a safe, auditable offensive toolchain for controlled labs.

Deliverables:

  • c2_protocol.py - Signed message schema
  • c2_server.py - Task queue server
  • c2_agent.py - Allowlisted agent
  • process_report.json - Process inventory
  • screenshot_*.png - Screenshot artifact
  • clipboard_alerts.jsonl - Clipboard alerts
  • encoded_payload.txt - Encoded payload output

Additional Resources

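Libraries Used

  • psutil (process and system information)
  • pynput (keyboard listener)
  • pyperclip (clipboard access)
  • Flask (local training login page)
  • mss (screen capture)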

Safe Lab Environments

  • FLARE VM (Windows analysis lab)
  • Kali Linux (offensive testing tools)
  • Security Onion (blue team monitoring)

Key Takeaways

  • Authorization and scope define ethical offensive work
  • C2 agents must be allowlisted and auditable
  • Process inventory is foundational reconnaissance
  • Input capture and clipboard tools require strict consent
  • Obfuscation and encoding are detectable signals
  • Defenders map offensive tooling to ATT&CK techniques

Week 11 Quiz

Test your understanding of offensive tool development.

Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.
