Week Overview
This week focuses on building offensive tools in an ethical, controlled environment. You will design a lightweight command-and-control (C2) workflow, enumerate processes, and build safe simulations of common offensive capabilities.
- Understand C2 architecture and secure tasking workflows
- Build an allowlisted agent with telemetry and audit logs
- Enumerate processes for situational awareness
- Create consent-based key capture and clipboard monitors
- Implement a simple payload encoder for lab-only testing
Section 1: Rules of Engagement and Threat Modeling
Rules of Engagement Checklist
- Written authorization and explicit test scope
- Systems, IP ranges, and time windows defined
- Data handling rules and PII constraints
- Kill switch and emergency contact defined
Threat Model Snapshot
Target: Lab domain (csy105.local)
Assumptions: Isolated VM network, local-only testing
Assets: Auth servers, developer workstations
Risks: Credential misuse, data leakage, service disruption
Controls: Logging, rate limits, allowlisted commands
Scope Definition (YAML)
engagement:
  name: "CSY105 Week 11 Lab"
  dates: "2026-01-20"
  scope:
    ip_ranges:
      - "192.168.56.0/24"
    systems:
      - "lab-auth-01"
      - "lab-win-02"
  constraints:
    - "No persistence"
    - "No destructive actions"
    - "No data exfiltration"
  contacts:
    ir_lead: "instructor@example.edu"
    emergency_stop: "stop-flag"
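A scope definition is only useful if tooling checks it before acting. The sketch below shows one possible pre-flight check using the standard `ipaddress` module. The function name `in_scope` is hypothetical and is not part of the lab files; it assumes the `ip_ranges` list has already been loaded from the scope file.

```python
import ipaddress
from typing import Iterable


def in_scope(target_ip: str, allowed_ranges: Iterable[str]) -> bool:
    """Return True only if target_ip falls inside one of the allowed CIDR ranges."""
    try:
        address = ipaddress.ip_address(target_ip)
    except ValueError:
        # Malformed input is never treated as in scope.
        return False
    return any(address in ipaddress.ip_network(cidr) for cidr in allowed_ranges)


if __name__ == "__main__":
    ranges = ["192.168.56.0/24"]  # taken from the scope definition above
    for candidate in ("192.168.56.20", "10.0.0.5"):
        print(candidate, in_scope(candidate, ranges))
```

Failing closed on bad input keeps a typo in a target address from turning into an out-of-scope action.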
Section 2: Command and Control Foundations
Core C2 Concepts
- Beaconing: Agent checks in at intervals
- Tasking: Operator queues commands
- Allowlist: Only approved tasks execute
- Telemetry: Results and errors are logged
C2 Message Schema
#!/usr/bin/env python3
"""
Define signed C2 messages to prevent tampering.
"""
from __future__ import annotations

import hmac
import json
import time
from dataclasses import dataclass
from hashlib import sha256
from typing import Dict, Union


@dataclass
class C2Message:
    """
    Signed C2 message for lab-only tooling.
    """
    agent_id: str
    task: str
    payload: Dict[str, str]
    timestamp: int
    signature: str = ""

    def to_dict(self) -> Dict[str, str]:
        """Return message as dict."""
        return {
            "agent_id": self.agent_id,
            "task": self.task,
            "payload": json.dumps(self.payload),
            "timestamp": str(self.timestamp),
            "signature": self.signature,
        }


def canonical_payload(payload: Union[Dict[str, str], str]) -> str:
    """
    Serialize the payload deterministically so both sides sign the same bytes.
    """
    if isinstance(payload, str):
        # Tolerate a payload that is still JSON-encoded from the wire format.
        try:
            payload = json.loads(payload)
        except json.JSONDecodeError:
            return payload
    return json.dumps(payload, sort_keys=True, separators=(",", ":"))


def sign_message(message: C2Message, key: str) -> C2Message:
    """
    Add HMAC signature to message.
    """
    # Sign canonical JSON rather than the dict repr, which differs from the wire form.
    payload = canonical_payload(message.payload)
    body = f"{message.agent_id}|{message.task}|{payload}|{message.timestamp}".encode()
    message.signature = hmac.new(key.encode(), body, sha256).hexdigest()
    return message


def verify_message(message: C2Message, key: str) -> bool:
    """
    Verify HMAC signature before executing.
    """
    expected = sign_message(C2Message(
        agent_id=message.agent_id,
        task=message.task,
        payload=message.payload,
        timestamp=message.timestamp,
    ), key).signature
    return hmac.compare_digest(expected, message.signature)


if __name__ == "__main__":
    msg = C2Message(agent_id="agent-01", task="sysinfo", payload={}, timestamp=int(time.time()))
    signed = sign_message(msg, "lab_secret")
    print(signed.to_dict())
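A valid signature shows that a message was not altered, but it does not show that the message is recent: a captured message could be replayed later with its signature intact. One common mitigation is to reject messages whose timestamp is too far from the receiver's clock. The sketch below is an illustration only; `is_fresh` and the 30-second window are assumptions, not part of the lab protocol.

```python
import time
from typing import Optional

MAX_SKEW_SECONDS = 30  # hypothetical tolerance; tune for the lab's clock drift


def is_fresh(timestamp: int, now: Optional[float] = None,
             max_skew: int = MAX_SKEW_SECONDS) -> bool:
    """Return True if timestamp is within max_skew seconds of the current time."""
    current = time.time() if now is None else now
    # abs() rejects timestamps from the future as well as stale ones.
    return abs(current - timestamp) <= max_skew


if __name__ == "__main__":
    print(is_fresh(int(time.time())))         # just created
    print(is_fresh(int(time.time()) - 3600))  # one hour old
```

A receiver would call a check like this alongside signature verification and drop the message if either fails.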
Section 3: Lightweight C2 Server (Lab-Only)
Safe Task Queue Server
#!/usr/bin/env python3
"""
Lab-only C2 server with allowlisted tasks.
"""
from __future__ import annotations

import json
import socket
import time
from typing import Dict, List

from c2_protocol import C2Message, sign_message, verify_message

HOST = "127.0.0.1"
PORT = 9101
SECRET = "lab_secret"
TASK_FILE = "tasks.json"


def load_tasks() -> List[Dict[str, str]]:
    """
    Load tasks from disk.
    """
    try:
        with open(TASK_FILE, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except (OSError, json.JSONDecodeError):
        return []


def save_tasks(tasks: List[Dict[str, str]]) -> None:
    """
    Persist tasks to disk.
    """
    with open(TASK_FILE, "w", encoding="utf-8") as handle:
        json.dump(tasks, handle, indent=2)


def get_next_task(agent_id: str) -> Dict[str, str]:
    """
    Pop the next task for the agent.
    """
    tasks = load_tasks()
    for idx, task in enumerate(tasks):
        if task.get("agent_id") == agent_id:
            next_task = tasks.pop(idx)
            save_tasks(tasks)
            return next_task
    return {"task": "idle", "payload": {}}


def handle_client(conn: socket.socket) -> None:
    """
    Handle a single agent request.
    """
    data = conn.recv(4096).decode(errors="ignore")
    if not data:
        return
    try:
        request = json.loads(data)
        raw_payload = request.get("payload", {})
        # to_dict() sends the payload as a JSON string; decode it back to a dict.
        payload = json.loads(raw_payload) if isinstance(raw_payload, str) else raw_payload
        msg = C2Message(
            agent_id=request["agent_id"],
            task=request["task"],
            payload=payload,
            timestamp=int(request["timestamp"]),
            signature=request.get("signature", ""),
        )
    except (AttributeError, KeyError, TypeError, ValueError) as exc:
        # Malformed requests get an error reply instead of crashing the server.
        conn.sendall(json.dumps({"error": str(exc)}).encode())
        return
    if not verify_message(msg, SECRET):
        conn.sendall(json.dumps({"error": "invalid_signature"}).encode())
        return
    task = get_next_task(msg.agent_id)
    response = C2Message(
        agent_id=msg.agent_id,
        task=task["task"],
        payload=task.get("payload", {}),
        timestamp=int(time.time()),
    )
    response = sign_message(response, SECRET)
    conn.sendall(json.dumps(response.to_dict()).encode())


def run_server() -> None:
    """Start TCP server for lab-only agent traffic."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind((HOST, PORT))
        server.listen(5)
        print(f"[*] C2 server listening on {HOST}:{PORT}")
        while True:
            conn, _ = server.accept()
            with conn:
                handle_client(conn)


if __name__ == "__main__":
    run_server()
Operator Task Queue
#!/usr/bin/env python3
"""
Add tasks to the C2 task queue.
"""
from __future__ import annotations

import json
import time
from typing import Dict, List

TASK_FILE = "tasks.json"


def load_tasks() -> List[Dict[str, str]]:
    """
    Load tasks list.
    """
    try:
        with open(TASK_FILE, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except (OSError, json.JSONDecodeError):
        return []


def save_tasks(tasks: List[Dict[str, str]]) -> None:
    """
    Save tasks list.
    """
    with open(TASK_FILE, "w", encoding="utf-8") as handle:
        json.dump(tasks, handle, indent=2)


def enqueue_task(agent_id: str, task: str, payload: Dict[str, str]) -> None:
    """
    Add a new task for the agent.
    """
    tasks = load_tasks()
    tasks.append({
        "agent_id": agent_id,
        "task": task,
        "payload": payload,
        "created_at": int(time.time()),
    })
    save_tasks(tasks)


if __name__ == "__main__":
    enqueue_task("agent-01", "sysinfo", {})
    enqueue_task("agent-01", "process_list", {"limit": "10"})
    print("[*] Tasks added")
Section 4: Allowlisted C2 Agent
Safe Agent Design
- Allowlist tasks (no raw shell execution)
- Rate-limit requests with jitter
- Log all actions for audit
- Support a kill switch
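The allowlist and audit requirements can be combined in a single dispatch step so that no task runs, or is refused, without leaving a record. The following is a minimal sketch with no networking. The names `dispatch`, `Handler`, and the in-memory `audit` list are hypothetical and stand in for whatever logging the lab agent uses.

```python
import json
from datetime import datetime, timezone
from typing import Callable, Dict, List

Handler = Callable[[Dict[str, str]], Dict[str, str]]


def dispatch(task: str, payload: Dict[str, str],
             handlers: Dict[str, Handler], audit: List[str]) -> Dict[str, str]:
    """Run a task only if it is allowlisted, and record the outcome either way."""
    handler = handlers.get(task)
    if handler is None:
        result = {"error": f"task_not_allowed: {task}"}
        status = "rejected"
    else:
        try:
            result = handler(payload)
            status = "ok"
        except Exception as exc:  # a failing handler must not stop the agent
            result = {"error": str(exc)}
            status = "failed"
    # One JSON line per decision, including refusals.
    audit.append(json.dumps({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "task": task,
        "status": status,
    }))
    return result


if __name__ == "__main__":
    handlers = {"echo": lambda p: {"echo": p.get("msg", "")}}
    audit_lines: List[str] = []
    print(dispatch("echo", {"msg": "hi"}, handlers, audit_lines))
    print(dispatch("shell", {}, handlers, audit_lines))
    print(audit_lines)
```

Logging rejected tasks matters as much as logging executed ones, because a refused request is often the more interesting event during review.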
Agent Implementation (Lab-Only)
#!/usr/bin/env python3
"""
Allowlisted C2 agent with telemetry and kill switch.
"""
from __future__ import annotations

import json
import platform
import random
import socket
import time
from typing import Callable, Dict

import psutil

from c2_protocol import C2Message, sign_message, verify_message

HOST = "127.0.0.1"
PORT = 9101
SECRET = "lab_secret"
AGENT_ID = "agent-01"
KILL_SWITCH = "stop-flag"


def task_sysinfo(payload: Dict[str, str]) -> Dict[str, str]:
    """Collect basic system info."""
    return {
        "hostname": platform.node(),
        "os": platform.platform(),
        "cpu_count": str(psutil.cpu_count(logical=True)),
    }


def task_process_list(payload: Dict[str, str]) -> Dict[str, str]:
    """Return a limited process list."""
    limit = int(payload.get("limit", "10"))
    results = []
    for proc in psutil.process_iter(["pid", "name", "username"]):
        if len(results) >= limit:
            break
        try:
            # psutil reports None for fields it could not read, so fall back to "".
            results.append({
                "pid": str(proc.info.get("pid")),
                "name": proc.info.get("name") or "",
                "user": proc.info.get("username") or "",
            })
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return {"processes": json.dumps(results)}


TASKS: Dict[str, Callable[[Dict[str, str]], Dict[str, str]]] = {
    "sysinfo": task_sysinfo,
    "process_list": task_process_list,
}


def send_message(message: C2Message) -> C2Message:
    """Send a message to the server and return response."""
    message = sign_message(message, SECRET)
    with socket.create_connection((HOST, PORT), timeout=5) as sock:
        sock.sendall(json.dumps(message.to_dict()).encode())
        response = sock.recv(4096).decode(errors="ignore")
    data = json.loads(response)
    if "error" in data:
        # The server replies with {"error": ...} when it rejects a request.
        raise RuntimeError(f"server_error: {data['error']}")
    return C2Message(
        agent_id=data["agent_id"],
        task=data["task"],
        payload=json.loads(data.get("payload", "{}")),
        timestamp=int(data["timestamp"]),
        signature=data.get("signature", ""),
    )


def execute_task(task: str, payload: Dict[str, str]) -> Dict[str, str]:
    """Execute allowlisted tasks only."""
    if task == KILL_SWITCH:
        raise SystemExit("Kill switch triggered")
    handler = TASKS.get(task)
    if not handler:
        return {"error": f"task_not_allowed: {task}"}
    try:
        return handler(payload)
    except Exception as exc:
        return {"error": str(exc)}


def run_agent() -> None:
    """Main beacon loop."""
    while True:
        message = C2Message(agent_id=AGENT_ID, task="beacon", payload={}, timestamp=int(time.time()))
        try:
            response = send_message(message)
        except (OSError, ValueError, KeyError, RuntimeError) as exc:
            # Server down, malformed reply, or rejected request: report and retry.
            print(f"[!] Beacon failed: {exc}")
            time.sleep(2)
            continue
        if not verify_message(response, SECRET):
            print("[!] Invalid server signature")
            time.sleep(2)
            continue
        if response.task != "idle":
            result = execute_task(response.task, response.payload)
            print(f"[*] Task result: {result}")
        # Jittered sleep to reduce predictability (lab only)
        time.sleep(random.uniform(1.5, 3.5))


if __name__ == "__main__":
    run_agent()
Section 5: Process Enumeration and Interrogation
Process Inventory
#!/usr/bin/env python3
"""
Enumerate running processes with metadata.
"""
from __future__ import annotations

from typing import Dict, List

import psutil


def list_processes(limit: int = 50) -> List[Dict[str, str]]:
    """
    Return a list of processes with pid, ppid, name, and user.
    """
    results = []
    for proc in psutil.process_iter(["pid", "name", "username", "ppid"]):
        if len(results) >= limit:
            break
        try:
            info = proc.info
            # psutil sets a field to None when access is denied; use "" so
            # the fixed-width formatting in print_summary does not fail.
            results.append({
                "pid": str(info.get("pid")),
                "ppid": str(info.get("ppid")),
                "name": info.get("name") or "",
                "user": info.get("username") or "",
            })
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return results


def print_summary(processes: List[Dict[str, str]]) -> None:
    """Print a formatted summary."""
    for proc in processes:
        print(f"{proc['pid']:>6} {proc['ppid']:>6} {proc['user']:<20} {proc['name']}")


if __name__ == "__main__":
    print_summary(list_processes())
Windows Process Interrogator
#!/usr/bin/env python3
"""
Filter Windows processes by keyword (lab only).
"""
from __future__ import annotations

from typing import List

import psutil


def find_processes(keyword: str) -> List[str]:
    """
    Return process names matching keyword.
    """
    matches = []
    for proc in psutil.process_iter(["name"]):
        try:
            # The name is None when psutil cannot read it; treat that as empty.
            name = proc.info.get("name") or ""
            if keyword.lower() in name.lower():
                matches.append(name)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return sorted(set(matches))


if __name__ == "__main__":
    for item in find_processes("chrome"):
        print(item)
Section 6: Privilege Escalation Basics (Defensive Auditing)
Common Misconfigurations
- World-writable scripts or binaries
- Unquoted service paths on Windows
- Over-permissive scheduled tasks
- SUID binaries with unsafe paths
Privilege Escalation Vectors
| Vector | Example | Defense |
|---|---|---|
| Weak permissions | World-writable service script | Harden file ACLs |
| Unquoted paths | Service path with spaces | Quote service binary paths |
| SUID misuse | Legacy admin utilities | Remove unneeded SUID |
World-Writable File Scanner (Linux)
#!/usr/bin/env python3
"""
Find world-writable files in a directory tree.
"""
from __future__ import annotations

import stat
from pathlib import Path
from typing import List


def find_world_writable(root: str) -> List[str]:
    """
    Return list of files with global write permission.
    """
    hits = []
    for path in Path(root).rglob("*"):
        try:
            if not path.is_file():
                continue
            mode = path.stat().st_mode
            # S_IWOTH (0o002) is the "others may write" permission bit.
            if mode & stat.S_IWOTH:
                hits.append(str(path))
        except OSError:
            continue
    return hits


if __name__ == "__main__":
    for item in find_world_writable("/tmp"):
        print(item)
SUID Finder (Linux)
#!/usr/bin/env python3
"""
List SUID binaries for review.
"""
from __future__ import annotations

import stat
from pathlib import Path
from typing import List


def find_suid(root: str) -> List[str]:
    """
    Return list of SUID binaries.
    """
    results = []
    for path in Path(root).rglob("*"):
        try:
            if not path.is_file():
                continue
            # S_ISUID (0o4000) is the set-user-ID bit.
            if path.stat().st_mode & stat.S_ISUID:
                results.append(str(path))
        except OSError:
            continue
    return results


if __name__ == "__main__":
    for item in find_suid("/usr/bin"):
        print(item)
Unquoted Service Path Audit (Windows Export)
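#!/usr/bin/env python3
"""
Parse service listing and flag unquoted paths.
Assumes services.txt holds one service binary path per line.
"""
from __future__ import annotations

from pathlib import Path
from typing import List


def find_unquoted(paths: List[str]) -> List[str]:
    """
    Return paths with spaces and missing quotes.
    """
    results = []
    for line in paths:
        candidate = line.strip()
        if not candidate or candidate.startswith("\""):
            continue
        # Only the executable path matters; arguments after ".exe" may
        # legitimately contain spaces and should not trigger a finding.
        exe_end = candidate.lower().find(".exe")
        exe_path = candidate if exe_end == -1 else candidate[:exe_end + 4]
        if ":" in exe_path and " " in exe_path:
            results.append(candidate)
    return results


if __name__ == "__main__":
    lines = Path("services.txt").read_text(encoding="utf-8").splitlines()
    for item in find_unquoted(lines):
        print(item)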
Section 7: Consent-Based Input Capture
Educational Key Capture (Local Only)
#!/usr/bin/env python3
"""
Consent-based key capture for training (local only).
Requires pynput and explicit user consent.
"""
from __future__ import annotations

import time
from typing import List

from pynput import keyboard


def capture_keys(duration_sec: int = 15) -> List[str]:
    """
    Capture key presses for a limited duration.
    """
    captured: List[str] = []

    def on_press(key) -> None:
        try:
            captured.append(key.char)
        except AttributeError:
            captured.append(str(key))

    with keyboard.Listener(on_press=on_press) as listener:
        time.sleep(duration_sec)
        listener.stop()
    return captured


if __name__ == "__main__":
    consent = input("Type YES to consent to key capture in this lab: ")
    if consent.strip().upper() != "YES":
        raise SystemExit("Consent required")
    keys = capture_keys(10)
    print("Captured:", keys)
Clipboard Monitor (Local Only)
#!/usr/bin/env python3
"""
Monitor clipboard changes for lab awareness.
Requires pyperclip.
"""
from __future__ import annotations

import re
import time
from typing import Optional

import pyperclip

SECRET_PATTERNS = [
    re.compile(r"AKIA[0-9A-Z]{16}"),  # AWS access key pattern
    re.compile(r"ghp_[A-Za-z0-9]{36}"),  # GitHub token pattern
]


def check_clipboard(last_value: str) -> Optional[str]:
    """
    Return new clipboard content if changed.
    """
    try:
        current = pyperclip.paste()
    except pyperclip.PyperclipException:
        return None
    if current != last_value:
        return current
    return None


def looks_sensitive(text: str) -> bool:
    """
    Detect patterns that resemble secrets.
    """
    return any(pattern.search(text) for pattern in SECRET_PATTERNS)


if __name__ == "__main__":
    last = ""
    print("[*] Monitoring clipboard (Ctrl+C to stop)")
    try:
        while True:
            value = check_clipboard(last)
            if value is not None:
                last = value
                if looks_sensitive(value):
                    print("[ALERT] Possible secret detected")
                else:
                    print("[INFO] Clipboard updated")
            time.sleep(1)
    except KeyboardInterrupt:
        print("[*] Stopped")
Section 8: Credential Harvester Simulation
Local Training Login (Loopback Only)
#!/usr/bin/env python3
"""
Local credential capture simulator for awareness training.
Binds to 127.0.0.1 only.
"""
from __future__ import annotations

import hashlib
import json
import os

from flask import Flask, request, render_template_string

app = Flask(__name__)

PAGE = """
<!doctype html>
<html>
<body>
<h1>Training Login (Local Only)</h1>
<p>This is a lab simulator. Do not use real passwords.</p>
<form method="post">
<label>Username: <input name="username" /></label><br />
<label>Password: <input name="password" type="password" /></label><br />
<button type="submit">Submit</button>
</form>
</body>
</html>
"""


def hash_value(value: str, salt: str) -> str:
    """
    Hash a value with a salt (prevents storing raw secrets).
    """
    return hashlib.sha256((salt + value).encode()).hexdigest()


@app.route("/", methods=["GET", "POST"])
def login() -> str:
    """
    Accept credentials and store only hashed values.
    """
    if request.method == "POST":
        username = request.form.get("username", "")
        password = request.form.get("password", "")
        salt = os.getenv("LAB_SALT", "lab_salt")
        entry = {
            "user_hash": hash_value(username, salt),
            "pass_hash": hash_value(password, salt),
        }
        with open("harvest_log.jsonl", "a", encoding="utf-8") as handle:
            # Write valid JSON so each line of the .jsonl file can be parsed.
            handle.write(json.dumps(entry) + "\n")
        return "Captured (hashed) - training only"
    return render_template_string(PAGE)


if __name__ == "__main__":
    app.run(host="127.0.0.1", port=5005)
Safety Notes
- Bind to localhost only
- Do not store raw credentials
- Delete logs after the lab
Section 9: Screenshot Capture Utility
Screenshot Capture (Lab-Only)
#!/usr/bin/env python3
"""
Capture a screenshot in a lab environment.
Requires mss library.
"""
from __future__ import annotations

from datetime import datetime, timezone

import mss
import mss.tools


def capture_screen() -> str:
    """
    Capture primary monitor and return filename.
    """
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    filename = f"screenshot_{timestamp}.png"
    with mss.mss() as sct:
        monitor = sct.monitors[1]
        screenshot = sct.grab(monitor)
        mss.tools.to_png(screenshot.rgb, screenshot.size, output=filename)
    return filename


if __name__ == "__main__":
    print(f"Saved {capture_screen()}")
Section 10: Obfuscation and Payload Encoding
Simple Base64 Encoder
#!/usr/bin/env python3
"""
Encode and decode strings for lab demonstrations.
"""
from __future__ import annotations

import base64


def encode_text(value: str) -> str:
    """
    Return base64 encoded string.
    """
    return base64.b64encode(value.encode()).decode()


def decode_text(encoded: str) -> str:
    """
    Decode base64 string safely.
    """
    try:
        # validate=True rejects characters outside the base64 alphabet
        # instead of silently discarding them.
        return base64.b64decode(encoded.encode(), validate=True).decode()
    except (ValueError, UnicodeDecodeError) as exc:
        raise ValueError(f"Invalid base64: {exc}") from exc


if __name__ == "__main__":
    data = "lab-only-payload"
    encoded = encode_text(data)
    print(encoded)
    print(decode_text(encoded))
ROT13 Obfuscation (Educational)
#!/usr/bin/env python3
"""
Use ROT13 to obfuscate test strings.
"""
from __future__ import annotations

import codecs


def rot13(value: str) -> str:
    """
    Return ROT13 encoded string.
    """
    return codecs.encode(value, "rot_13")


if __name__ == "__main__":
    sample = "training-string"
    print(rot13(sample))
Section 11: Telemetry and Audit Logging
Agent Audit Log
#!/usr/bin/env python3
"""
Record agent actions to a JSONL audit log.
"""
from __future__ import annotations

import json
from datetime import datetime, timezone
from typing import Dict


def log_action(event_type: str, detail: Dict[str, str], path: str = "agent_audit.jsonl") -> None:
    """
    Write a single audit event.
    """
    entry = {
        # Timezone-aware UTC timestamp, so the log states its own offset.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event_type": event_type,
        "detail": detail,
    }
    with open(path, "a", encoding="utf-8") as handle:
        handle.write(json.dumps(entry) + "\n")


if __name__ == "__main__":
    log_action("task_executed", {"task": "sysinfo", "status": "ok"})
Server-Side Event Summary
#!/usr/bin/env python3
"""
Summarize audit events for accountability.
"""
from __future__ import annotations

import json
from collections import Counter
from typing import Dict


def summarize_events(path: str) -> Dict[str, int]:
    """
    Count events by type.
    """
    counts = Counter()
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            try:
                entry = json.loads(line)
                counts[entry.get("event_type", "unknown")] += 1
            except json.JSONDecodeError:
                continue
    return dict(counts)


if __name__ == "__main__":
    print(summarize_events("agent_audit.jsonl"))
Section 12: Secure Configuration and Secrets Handling
Configuration Hygiene
- Use explicit allowlists for hosts and tasks
- Reject default secrets in production
- Validate ports and timeouts
- Log config changes for audit
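One way to enforce the host rule for lab-only tooling is to accept only loopback addresses and fail fast on anything else. The sketch below uses the standard `ipaddress` module. The helper name `require_loopback` is hypothetical, and restricting to loopback is an assumption based on this week's local-only setup.

```python
import ipaddress


def require_loopback(host: str) -> str:
    """Return host unchanged if it is a loopback IP address, else raise ValueError."""
    try:
        address = ipaddress.ip_address(host)
    except ValueError as exc:
        # Hostnames are rejected too, since they could resolve anywhere.
        raise ValueError(f"Host must be an IP literal: {host!r}") from exc
    if not address.is_loopback:
        raise ValueError(f"Host is not a loopback address: {host}")
    return host


if __name__ == "__main__":
    print(require_loopback("127.0.0.1"))
    try:
        require_loopback("192.168.56.10")
    except ValueError as exc:
        print(f"Rejected: {exc}")
```

A config loader could call a check like this on the `host` field before returning the configuration.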
Config Loader with Validation
#!/usr/bin/env python3
"""
Load and validate configuration for lab tools.
"""
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import List


@dataclass
class ToolConfig:
    host: str
    port: int
    allowed_tasks: List[str]
    beacon_min: float
    beacon_max: float


def load_config(path: str) -> ToolConfig:
    """
    Load config JSON and validate fields.
    """
    with open(path, "r", encoding="utf-8") as handle:
        data = json.load(handle)
    host = data.get("host", "127.0.0.1")
    port = int(data.get("port", 9101))
    # Ports below 1024 are privileged, so only unprivileged ports are accepted.
    if port < 1024 or port > 65535:
        raise ValueError("Port must be between 1024 and 65535")
    allowed_tasks = data.get("allowed_tasks", [])
    if not isinstance(allowed_tasks, list) or not allowed_tasks:
        raise ValueError("allowed_tasks must be a non-empty list")
    beacon_min = float(data.get("beacon_min", 1.5))
    beacon_max = float(data.get("beacon_max", 3.5))
    if beacon_min >= beacon_max:
        raise ValueError("beacon_min must be less than beacon_max")
    return ToolConfig(
        host=host,
        port=port,
        allowed_tasks=allowed_tasks,
        beacon_min=beacon_min,
        beacon_max=beacon_max,
    )


if __name__ == "__main__":
    print(load_config("config.json"))
Secret Loader (Environment First)
#!/usr/bin/env python3
"""
Load secrets from environment with unsafe defaults blocked.
"""
from __future__ import annotations

import os


def get_secret(name: str) -> str:
    """
    Return a secret value or fail fast.
    """
    value = os.getenv(name, "")
    if not value or value in {"changeme", "lab_secret"}:
        raise ValueError(f"Missing or unsafe secret: {name}")
    return value


if __name__ == "__main__":
    secret = get_secret("C2_SHARED_KEY")
    # Never echo the secret itself; confirm only that it loaded.
    print(f"Loaded C2_SHARED_KEY ({len(secret)} characters)")
Allowlist Loader
#!/usr/bin/env python3
"""
Load a task allowlist from a file.
"""
from __future__ import annotations

from pathlib import Path
from typing import List


def load_allowlist(path: str) -> List[str]:
    """
    Return allowlisted task names from a text file.
    """
    tasks = []
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        name = line.strip()
        # Skip blank lines and "#" comments.
        if name and not name.startswith("#"):
            tasks.append(name)
    return tasks


if __name__ == "__main__":
    print(load_allowlist("allowlist.txt"))
Section 13: Defensive Detection Mapping
ATT&CK Technique Mapping
| Technique | What You Built | Detection Ideas |
|---|---|---|
| T1071 - Application Layer Protocol | C2 beaconing | Monitor loopback traffic, unusual beacons |
| T1057 - Process Discovery | Process enumeration | EDR alerts for process inventory scans |
| T1056 - Input Capture | Consent-based key capture | Monitor for keyboard hooks |
| T1115 - Clipboard Data | Clipboard monitoring | Alert on clipboard API use |
| T1113 - Screen Capture | Screenshot utility | Monitor screen capture libraries |
| T1027 - Obfuscated Files or Information | Payload encoding | Scan for encoded payload patterns |
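As an illustration of the "unusual beacons" detection idea, a defender can measure how regular the gaps between connections are. Very regular gaps suggest automated check-ins. The sketch below computes the coefficient of variation (standard deviation divided by mean) of inter-arrival times. The function name and any alert threshold are assumptions, and real detections would need baselining against normal traffic.

```python
from statistics import mean, pstdev
from typing import Sequence


def interval_regularity(timestamps: Sequence[float]) -> float:
    """Return the coefficient of variation of gaps between consecutive timestamps.

    Values near 0 mean evenly spaced events (beacon-like); larger values
    mean irregular, more human-looking timing.
    """
    if len(timestamps) < 3:
        raise ValueError("Need at least three timestamps")
    ordered = sorted(timestamps)
    gaps = [later - earlier for earlier, later in zip(ordered, ordered[1:])]
    average = mean(gaps)
    if average == 0:
        return 0.0  # all events at the same instant
    return pstdev(gaps) / average


if __name__ == "__main__":
    print(interval_regularity([0, 10, 20, 30]))  # fixed 10-second interval
    print(interval_regularity([0, 1, 20, 22]))   # irregular gaps
```

Jitter raises this value but does not hide the pattern: a sleep drawn uniformly from 1.5 to 3.5 seconds still has a coefficient of variation of only about 0.23.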
Section 14: Engagement Reporting and Evidence Handling
Evidence Log Template
timestamp: 2026-01-20T14:33:21Z
tool: c2_agent
action: process_list
target: lab-win-02
result: success
evidence: process_report.json
notes: allowlisted task, lab-only
Report Sections
- Scope and authorization
- Tooling summary and versions
- Actions performed and results
- Detected risks and mitigations
- Artifacts and hashes
Report Generator (Markdown)
#!/usr/bin/env python3
"""
Generate a short engagement report from artifacts.
"""
from __future__ import annotations

from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List


def generate_report(findings: List[Dict[str, str]], output_path: str) -> None:
    """
    Write a markdown report summarizing lab actions.
    """
    lines = [
        "# Week 11 Engagement Report",
        "",
        f"Generated: {datetime.now(timezone.utc).isoformat()}",
        "",
        "## Findings",
    ]
    for item in findings:
        lines.append(f"- {item.get('title', 'finding')}: {item.get('detail', '')}")
    # Join with a real newline; an escaped "\\n" would write a literal backslash-n.
    Path(output_path).write_text("\n".join(lines), encoding="utf-8")


if __name__ == "__main__":
    sample = [
        {"title": "C2 Tasking", "detail": "Agent executed allowlisted tasks only"},
        {"title": "Clipboard Monitor", "detail": "Token pattern detected in test string"},
    ]
    generate_report(sample, "engagement_report.md")
Evidence Handling Checklist
- Timestamp every action and artifact
- Store evidence in a read-only folder
- Hash artifacts to preserve integrity
- Delete or sanitize sensitive data after lab
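Hashing an artifact at collection time gives a fixed reference value: if the file's hash later differs, the evidence has changed. The sketch below computes a SHA-256 digest with the standard `hashlib` module, reading in chunks so large files do not need to fit in memory. The function name `sha256_file` and the example filename are illustrative only.

```python
import hashlib


def sha256_file(path: str, chunk_size: int = 65536) -> str:
    """Return the hex SHA-256 digest of the file at path."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        # Read fixed-size chunks until read() returns an empty bytes object.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


if __name__ == "__main__":
    artifact = "process_report.json"  # example artifact name from the evidence log
    try:
        print(f"{sha256_file(artifact)}  {artifact}")
    except OSError as exc:
        print(f"Could not hash {artifact}: {exc}")
```

Recording the digest next to each entry in the evidence log lets a reviewer recheck integrity later.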
Lab 11: Offensive Tool Development (90-150 minutes)
Lab Part 1: Lightweight C2 Agent (25-35 min)
Objective: Build a beaconing agent and allowlisted server task queue.
Requirements:
- Agent checks in every 2-4 seconds with jitter
- Server returns tasks from a JSON queue
- Tasks are allowlisted (no raw shell)
Success Criteria: Agent receives tasks and prints results with signatures verified.
Hint: Allowlist pattern
TASKS = {"sysinfo": task_sysinfo, "process_list": task_process_list}
if task not in TASKS:
    return {"error": "task_not_allowed"}
Lab Part 2: Windows Process Interrogator (20-25 min)
Objective: Enumerate and filter processes on a Windows VM.
Requirements:
- List pid, name, user, parent pid
- Filter by keyword or username
- Export results to process_report.json
Success Criteria: Report lists 20+ processes with metadata.
Hint: JSON export
import json
with open("process_report.json", "w", encoding="utf-8") as handle:
    json.dump(processes, handle, indent=2)
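The filtering requirement can be met without further psutil calls by filtering the dictionaries already collected. This is a minimal sketch that assumes each record has `name` and `user` keys, as in the process inventory earlier. The function name `filter_processes` is hypothetical.

```python
from typing import Dict, List, Optional


def filter_processes(processes: List[Dict[str, str]],
                     keyword: Optional[str] = None,
                     user: Optional[str] = None) -> List[Dict[str, str]]:
    """Return records whose name contains keyword and whose user equals user.

    Both filters are case-insensitive, and a filter left as None is skipped.
    """
    matches = []
    for proc in processes:
        name = (proc.get("name") or "").lower()
        owner = (proc.get("user") or "").lower()
        if keyword and keyword.lower() not in name:
            continue
        if user and user.lower() != owner:
            continue
        matches.append(proc)
    return matches


if __name__ == "__main__":
    sample = [
        {"pid": "100", "ppid": "1", "name": "chrome.exe", "user": "LAB\\student"},
        {"pid": "200", "ppid": "1", "name": "svchost.exe", "user": "SYSTEM"},
    ]
    print(filter_processes(sample, keyword="chrome"))
    print(filter_processes(sample, user="system"))
```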
Lab Part 3: Screenshot Capture Utility (15-20 min)
Objective: Capture a local screenshot and log metadata.
Requirements:
- Save a PNG file with timestamp
- Log file path and size to screenshot_log.jsonl
- Run only on local lab VM
Success Criteria: Screenshot file saved and logged.
Hint: Metadata log
entry = {"file": filename, "size": os.path.getsize(filename)}
with open("screenshot_log.jsonl", "a", encoding="utf-8") as handle:
    handle.write(json.dumps(entry) + "\n")
Lab Part 4: Clipboard Monitor (15-20 min)
Objective: Monitor clipboard changes and flag secret-like patterns.
Requirements:
- Detect clipboard changes every 1 second
- Flag patterns like API keys or tokens
- Log alerts to clipboard_alerts.jsonl
Success Criteria: Alerts generated when test tokens are copied.
Hint: Pattern detection
if looks_sensitive(value):
    log_action("clipboard_alert", {"value": "[REDACTED]"})
Lab Part 5: Simple Payload Encoder (15-20 min)
Objective: Encode and decode strings using base64 or ROT13.
Requirements:
- Provide encode/decode functions
- Validate input and handle errors
- Log encoded output to encoded_payload.txt
Success Criteria: Round-trip decode matches original input.
Hint: Error handling
try:
    decoded = decode_text(encoded)
except ValueError as exc:
    print(f"Error: {exc}")
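The success criterion can be checked mechanically with a round-trip test. The sketch below is self-contained and uses the standard `base64` and `codecs` modules directly rather than the lab's own functions. `roundtrip_ok` is a hypothetical helper name.

```python
import base64
import codecs


def roundtrip_ok(value: str) -> bool:
    """Return True if value survives base64 and ROT13 encode/decode unchanged."""
    encoded_b64 = base64.b64encode(value.encode("utf-8"))
    decoded_b64 = base64.b64decode(encoded_b64, validate=True).decode("utf-8")
    # ROT13 is its own inverse, so decoding applies the same rotation again.
    decoded_rot = codecs.decode(codecs.encode(value, "rot_13"), "rot_13")
    return decoded_b64 == value and decoded_rot == value


if __name__ == "__main__":
    for sample in ("lab-only-payload", "training-string", ""):
        print(repr(sample), roundtrip_ok(sample))
```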
Deliverables:
- c2_protocol.py - Signed message schema
- c2_server.py - Task queue server
- c2_agent.py - Allowlisted agent
- process_report.json - Process inventory
- screenshot_*.png - Screenshot artifact
- clipboard_alerts.jsonl - Clipboard alerts
- encoded_payload.txt - Encoded payload output
Additional Resources
Safe Lab Environments
- FLARE VM (Windows analysis lab)
- Kali Linux (offensive testing tools)
- Security Onion (blue team monitoring)
Key Takeaways
- Authorization and scope define ethical offensive work
- C2 agents must be allowlisted and auditable
- Process inventory is foundational reconnaissance
- Input capture and clipboard tools require strict consent
- Obfuscation and encoding are detectable signals
- Defenders map offensive tooling to ATT&CK techniques
Week 11 Quiz
Test your understanding of offensive tool development.
Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.