CSY105 Week 09 Beginner

Programming for Security

Week Overview

This week dives into packets: how they are structured, captured, crafted, and analyzed. You will use Scapy to build professional-grade sniffers and analyzers in a lab-only environment.

  • Understand Ethernet, IP, TCP, and UDP headers
  • Sniff traffic with filters and extract fields
  • Craft packets for testing and learning
  • Analyze DNS and TCP sessions
  • Build safe demos of common attack patterns
⚠️ Ethical Warning: Packet manipulation and interception are illegal without explicit authorization. Perform all labs in isolated test networks you own. Do not run ARP spoofing, SYN floods, or sniffers on public or shared networks.
Real-World Context: SOC analysts and network engineers use packet analysis to validate alerts, troubleshoot outages, and investigate suspicious communications. Tools like tcpdump, Wireshark, and Zeek rely on the same packet-level understanding.

Section 1: Packet Structure Fundamentals

OSI Layers in Practice

When you sniff traffic, you see how layers stack: Ethernet → IP → TCP/UDP → Application data. Scapy exposes these layers as Python objects.

Packet Anatomy Example

#!/usr/bin/env python3
"""
Inspect basic packet fields with Scapy.
"""
from __future__ import annotations

from scapy.all import Ether, IP, TCP


packet = Ether() / IP(dst="8.8.8.8") / TCP(dport=443, sport=51515, flags="S")
packet.show()

# Accessing fields
print(packet[Ether].src)
print(packet[IP].dst)
print(packet[TCP].flags)

Common Header Fields

Layer    | Key Fields               | Why It Matters
Ethernet | src, dst, type           | MAC addressing and protocol identification
IP       | src, dst, ttl, id        | Routing, hop limits, fragmentation
TCP      | sport, dport, flags, seq | Connection management, reliability
UDP      | sport, dport, len        | Connectionless, low overhead

Packet Layers Visualized

Ethernet: 14 bytes (untagged Ethernet II header)
  └── IP: 20 bytes minimum (IPv4 without options)
       └── TCP: 20 bytes minimum (without options)
            └── Payload: HTTP, TLS, DNS, etc.
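
The offsets in this diagram can be checked by hand on raw bytes. The following is a minimal sketch using only the standard library; it assumes an untagged Ethernet II frame carrying IPv4 and TCP. The helper names `build_sample_frame` and `parse_headers` and the sample addresses are illustrative and not part of Scapy.

```python
from __future__ import annotations

import socket
import struct


def build_sample_frame() -> bytes:
    """Build a 54-byte Ethernet/IPv4/TCP SYN frame with zeroed MACs and checksums."""
    eth = bytes(6) + bytes(6) + struct.pack("!H", 0x0800)  # dst MAC, src MAC, EtherType
    ip = struct.pack(
        "!BBHHHBBH4s4s",
        0x45, 0, 40, 0, 0, 64, 6, 0,  # version/IHL, TOS, length, id, frag, TTL, proto, checksum
        socket.inet_aton("192.168.56.1"),
        socket.inet_aton("192.168.56.10"),
    )
    # sport, dport, seq, ack, data offset (5 words), flags (SYN), window, checksum, urgent
    tcp = struct.pack("!HHIIBBHHH", 51515, 443, 0, 0, 5 << 4, 0x02, 8192, 0, 0)
    return eth + ip + tcp


def parse_headers(frame: bytes) -> dict:
    """Walk the Ethernet, IPv4, and TCP headers using their length fields."""
    eth_type = struct.unpack("!H", frame[12:14])[0]

    ip_start = 14  # the Ethernet header is always 14 bytes when untagged
    ip_header_len = (frame[ip_start] & 0x0F) * 4  # IHL is counted in 32-bit words
    src = socket.inet_ntoa(frame[ip_start + 12:ip_start + 16])
    dst = socket.inet_ntoa(frame[ip_start + 16:ip_start + 20])

    tcp_start = ip_start + ip_header_len
    sport, dport = struct.unpack("!HH", frame[tcp_start:tcp_start + 4])
    tcp_header_len = (frame[tcp_start + 12] >> 4) * 4  # data offset, also in words
    flags = frame[tcp_start + 13]

    return {
        "eth_type": eth_type,
        "ip_header_len": ip_header_len,
        "tcp_header_len": tcp_header_len,
        "src": src,
        "dst": dst,
        "sport": sport,
        "dport": dport,
        "flags": flags,
        "payload_offset": tcp_start + tcp_header_len,
    }


info = parse_headers(build_sample_frame())
print(info)
```

Because both header lengths are read from the packet itself, the same walk still finds the payload when IP or TCP options make a header longer than 20 bytes.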

TCP Flag Reference

Flag | Meaning
SYN  | Start connection
ACK  | Acknowledge data
FIN  | Close connection
RST  | Reset connection
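
Each flag in this table is a single bit in the TCP flags byte, so a packet can carry several at once (a SYN-ACK has both SYN and ACK set). As a small sketch, the hypothetical helper `decode_flags` below turns a flags value into names. It also lists PSH and URG, which are standard TCP flags not shown in the table, and it leaves out the ECN-related bits for brevity.

```python
from __future__ import annotations

# Bit values of the classic TCP flags, lowest bit first
TCP_FLAGS = [
    (0x01, "FIN"),
    (0x02, "SYN"),
    (0x04, "RST"),
    (0x08, "PSH"),
    (0x10, "ACK"),
    (0x20, "URG"),
]


def decode_flags(value: int) -> list[str]:
    """Return the names of all flags whose bit is set in value."""
    return [name for bit, name in TCP_FLAGS if value & bit]


print(decode_flags(0x02))  # SYN only
print(decode_flags(0x12))  # SYN and ACK together
```

With Scapy, `int(packet[TCP].flags)` should give a value that can be passed to a helper like this one.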

Layer Stacking Examples

#!/usr/bin/env python3
"""
Demonstrate common layer stacks in Scapy.
"""
from __future__ import annotations

from scapy.all import Ether, IP, UDP, TCP, ICMP, DNS, DNSQR, Raw


def demo_stacks() -> None:
    """
    Print summaries of common packet stacks.
    """
    dns_packet = Ether() / IP(dst="8.8.8.8") / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname="example.com"))
    http_packet = Ether() / IP(dst="93.184.216.34") / TCP(dport=80, flags="S")
    icmp_packet = Ether() / IP(dst="1.1.1.1") / ICMP()
    udp_payload = Ether() / IP(dst="192.168.56.10") / UDP(dport=9999) / Raw(load=b"TEST")

    print(dns_packet.summary())
    print(http_packet.summary())
    print(icmp_packet.summary())
    print(udp_payload.summary())


if __name__ == "__main__":
    demo_stacks()

Section 2: Sniffing & Filtering

Basic Sniffer with Callback

#!/usr/bin/env python3
"""
Capture packets and print key fields.
"""
from __future__ import annotations

from datetime import datetime
from scapy.all import sniff, IP, TCP


def handle_packet(packet) -> None:
    """Callback for each sniffed packet."""
    if IP in packet:
        ip_layer = packet[IP]
        src = ip_layer.src
        dst = ip_layer.dst
        proto = ip_layer.proto
        timestamp = datetime.utcnow().isoformat()

        summary = f"{timestamp} IP {src} -> {dst} proto={proto}"
        if TCP in packet:
            summary += f" TCP dport={packet[TCP].dport} flags={packet[TCP].flags}"
        print(summary)


if __name__ == "__main__":
    print("[*] Sniffing 10 packets on default interface...")
    sniff(count=10, prn=handle_packet)

BPF Filter Examples

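# TCP traffic on port 80 (typically plaintext HTTP)
tcp port 80

# DNS over UDP (queries and responses)
udp and port 53

# Only traffic to or from a specific host
host 192.168.1.10

# TCP packets with the SYN flag set (this also matches SYN-ACK)
tcp[13] & 2 != 0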
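
The last filter reads byte 13 of the TCP header, which is the flags byte, and tests the SYN bit (value 2). Because it only tests that one bit, it matches SYN-ACK packets as well as plain SYNs. The sketch below mirrors the test in Python on raw header bytes and contrasts it with an exact match (`tcp[13] == 2` in BPF). The function names and the `make_header` helper are illustrative only.

```python
SYN = 0x02
ACK = 0x10


def make_header(flags: int) -> bytes:
    """Return a 20-byte all-zero TCP header with only the flags byte set."""
    header = bytearray(20)
    header[13] = flags
    return bytes(header)


def syn_bit_set(tcp_header: bytes) -> bool:
    """Python equivalent of the BPF test: tcp[13] & 2 != 0."""
    return (tcp_header[13] & SYN) != 0


def syn_only(tcp_header: bytes) -> bool:
    """Python equivalent of the stricter BPF test: tcp[13] == 2."""
    return tcp_header[13] == SYN


for label, flags in [("SYN", SYN), ("SYN-ACK", SYN | ACK), ("ACK", ACK)]:
    header = make_header(flags)
    print(label, syn_bit_set(header), syn_only(header))
```

Use the exact-match form when you want only connection attempts, and the bit test when you want every packet that takes part in the first two steps of the handshake.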

Filtered Sniffer with BPF

#!/usr/bin/env python3
"""
Sniff only DNS traffic and extract query names.
"""
from __future__ import annotations

from scapy.all import sniff, DNS, DNSQR


def handle_dns(packet) -> None:
    """Print DNS queries."""
    if DNS in packet and packet[DNS].qr == 0 and packet.haslayer(DNSQR):  # queries only
        query = packet[DNSQR].qname.decode(errors="ignore")
        print(f"DNS Query: {query}")


if __name__ == "__main__":
    sniff(filter="udp port 53", prn=handle_dns, store=False)

Timed Capture to PCAP

#!/usr/bin/env python3
"""
Capture packets for a fixed duration and save to PCAP.
"""
from __future__ import annotations

import time
from scapy.all import sniff, wrpcap


def capture_for(seconds: int, bpf: str = "tcp") -> None:
    """
    Capture packets for a fixed time window.
    """
    print(f"[*] Capturing {seconds}s with filter: {bpf}")
    packets = sniff(timeout=seconds, filter=bpf)
    ts = int(time.time())
    filename = f"capture_{ts}.pcap"
    wrpcap(filename, packets)
    print(f"[*] Wrote {len(packets)} packets to {filename}")


if __name__ == "__main__":
    capture_for(15, bpf="tcp port 80")

Section 3: Crafting Packets Safely

Custom Packet Builder

#!/usr/bin/env python3
"""
Craft and send a custom TCP SYN packet (lab only).
"""
from __future__ import annotations

from scapy.all import IP, TCP, send


def send_syn(target_ip: str, target_port: int, source_port: int = 51515) -> None:
    """
    Send a single SYN packet for testing.
    """
    packet = IP(dst=target_ip) / TCP(dport=target_port, sport=source_port, flags="S")
    send(packet, verbose=False)
    print(f"[*] Sent SYN to {target_ip}:{target_port}")


if __name__ == "__main__":
    send_syn("192.168.56.10", 22)

Rate-Limited SYN Demo (Local Lab)

#!/usr/bin/env python3
"""
Educational SYN flood demonstration with strict rate limits.
"""
from __future__ import annotations

import time
from scapy.all import IP, TCP, send


def syn_demo(target_ip: str, target_port: int, count: int = 50, delay: float = 0.1) -> None:
    """
    Send a low-rate stream of SYN packets for learning.
    """
    for i in range(count):
        packet = IP(dst=target_ip) / TCP(dport=target_port, sport=40000 + i, flags="S")
        send(packet, verbose=False)
        time.sleep(delay)
    print(f"[*] Completed demo ({count} packets)")


if __name__ == "__main__":
    syn_demo("192.168.56.10", 80)

Custom UDP Packet

#!/usr/bin/env python3
"""
Send a UDP packet with payload.
"""
from __future__ import annotations

from scapy.all import IP, UDP, Raw, send


def send_udp(target_ip: str, target_port: int, payload: bytes) -> None:
    """
    Send a UDP packet with payload for testing.
    """
    packet = IP(dst=target_ip) / UDP(dport=target_port) / Raw(load=payload)
    send(packet, verbose=False)
    print(f"[*] Sent UDP payload to {target_ip}:{target_port}")


if __name__ == "__main__":
    send_udp("192.168.56.10", 9999, b"TEST-UDP-PAYLOAD")

Section 4: Scanning & Discovery with ARP

ARP Scan for Local Network Discovery

#!/usr/bin/env python3
"""
ARP scan to identify live hosts on a local subnet.
"""
from __future__ import annotations

from typing import List, Dict
from scapy.all import ARP, Ether, srp


def arp_scan(network: str) -> List[Dict[str, str]]:
    """
    Return list of hosts from ARP scan.
    """
    packet = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=network)
    answered, _ = srp(packet, timeout=2, verbose=False)

    results = []
    for _, response in answered:
        results.append({"ip": response.psrc, "mac": response.hwsrc})
    return results


if __name__ == "__main__":
    hosts = arp_scan("192.168.56.0/24")
    for host in hosts:
        print(f"{host['ip']} - {host['mac']}")

Ethical Focus

ARP scanning can be noisy. Limit scans to lab subnets and notify stakeholders if used in enterprise environments.
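
One way to enforce this in code is to refuse any target outside an explicit allowlist before a scan starts. The sketch below is an assumption about how such a guard could look: `ALLOWED_LAB_NETWORKS` and `is_lab_target` are hypothetical names, and the allowlist holds the 192.168.56.0/24 subnet used in this week's examples.

```python
import ipaddress

# Assumption: the only network you are authorized to scan is the host-only lab subnet
ALLOWED_LAB_NETWORKS = [ipaddress.ip_network("192.168.56.0/24")]


def is_lab_target(network: str) -> bool:
    """Return True only if network lies entirely inside an allowed lab subnet."""
    candidate = ipaddress.ip_network(network, strict=False)
    return any(
        candidate.version == allowed.version and candidate.subnet_of(allowed)
        for allowed in ALLOWED_LAB_NETWORKS
    )


for target in ["192.168.56.0/24", "192.168.56.10", "192.168.0.0/16", "10.0.0.0/8"]:
    print(target, is_lab_target(target))
```

A scanner could call the guard first and exit with an error when it returns False, so a typo in the subnet cannot send ARP requests to a network you do not control.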

Section 5: DNS and TCP Session Analysis

DNS Query/Response Analyzer

#!/usr/bin/env python3
"""
Capture DNS queries and match with responses.
"""
from __future__ import annotations

from collections import defaultdict
from scapy.all import sniff, DNS, DNSQR, DNSRR


pending = defaultdict(list)


def handle_dns(packet) -> None:
    """Track DNS queries and responses."""
    if DNS in packet:
        dns = packet[DNS]
        if dns.qr == 0 and dns.qd is not None:
            query = dns[DNSQR].qname.decode(errors="ignore")
            pending[dns.id].append(query)
            print(f"[Q] {query}")
        elif dns.qr == 1 and dns.an is not None:
            answers = []
            for i in range(dns.ancount):
                rr = dns.an[i]
                if isinstance(rr, DNSRR):
                    answers.append(rr.rdata)
            queries = pending.pop(dns.id, [])
            for query in queries:
                print(f"[A] {query} -> {answers}")


if __name__ == "__main__":
    sniff(filter="udp port 53", prn=handle_dns, store=False)

TCP Session Tracker

#!/usr/bin/env python3
"""
Track TCP connections by 4-tuple.
"""
from __future__ import annotations

from collections import defaultdict
from datetime import datetime
from scapy.all import sniff, IP, TCP


connections = defaultdict(lambda: {"packets": 0, "start": None})


def handle_tcp(packet) -> None:
    """Count packets per TCP flow."""
    if IP in packet and TCP in packet:
        ip = packet[IP]
        tcp = packet[TCP]
        flow = (ip.src, tcp.sport, ip.dst, tcp.dport)

        if connections[flow]["start"] is None:
            connections[flow]["start"] = datetime.utcnow().isoformat()

        connections[flow]["packets"] += 1
        if tcp.flags & 1:  # FIN
            count = connections[flow]["packets"]
            print(f"[FIN] Flow {flow} - packets={count}")


if __name__ == "__main__":
    sniff(filter="tcp", prn=handle_tcp, store=False)
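
The tracker above keys on the directed 4-tuple, so the two directions of one connection are counted as separate flows. If you want one entry per conversation, a common approach is to sort the two endpoints before building the key. The sketch below shows the idea with a hypothetical `conversation_key` helper; it is a suggested variation, not part of the tracker.

```python
from __future__ import annotations

from collections import Counter
from typing import Tuple

Endpoint = Tuple[str, int]


def conversation_key(src: str, sport: int, dst: str, dport: int) -> Tuple[Endpoint, Endpoint]:
    """Return the same key for both directions of a connection."""
    a, b = (src, sport), (dst, dport)
    # Ordering the endpoints makes the key independent of packet direction
    return (a, b) if a <= b else (b, a)


counts: Counter = Counter()
counts[conversation_key("10.0.0.5", 51515, "93.184.216.34", 80)] += 1  # client to server
counts[conversation_key("93.184.216.34", 80, "10.0.0.5", 51515)] += 1  # server to client
print(counts)
```

The comparison is a plain tuple comparison on strings and integers. It does not sort addresses numerically, which is fine because the key only needs to be consistent.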

Section 6: PCAP Capture and Analysis

Saving Packets to PCAP

#!/usr/bin/env python3
"""
Save captured packets to a PCAP file for offline analysis.
"""
from __future__ import annotations

from scapy.all import sniff, wrpcap


if __name__ == "__main__":
    packets = sniff(count=50)
    wrpcap("capture.pcap", packets)
    print("[*] Saved 50 packets to capture.pcap")

Reading PCAP Files

#!/usr/bin/env python3
"""
Read a PCAP and summarize protocols.
"""
from __future__ import annotations

from collections import Counter
from scapy.all import rdpcap, IP, TCP, UDP


def summarize_pcap(path: str) -> None:
    """Summarize packets by protocol."""
    packets = rdpcap(path)
    proto_counts = Counter()

    for packet in packets:
        if IP in packet:
            proto = packet[IP].proto
            proto_counts[proto] += 1

    print("Protocol counts:")
    for proto, count in proto_counts.items():
        print(f"  - {proto}: {count}")


if __name__ == "__main__":
    summarize_pcap("capture.pcap")

Sample Summary Output

Protocol counts:
  - 6: 240   # TCP
  - 17: 55   # UDP
  - 1: 12    # ICMP
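
The numbers in this output are IP protocol numbers (1 is ICMP, 6 is TCP, 17 is UDP). A small lookup makes the report easier to read. The sketch below uses a hand-written table and a hypothetical `label_protocols` helper; numbers not in the table are kept as `proto-N` rather than guessed.

```python
from __future__ import annotations

from collections import Counter
from typing import Mapping

# Only the three protocols shown in the sample output are named here
PROTO_NAMES = {1: "ICMP", 6: "TCP", 17: "UDP"}


def label_protocols(counts: Mapping[int, int]) -> dict[str, int]:
    """Replace protocol numbers with names, keeping unknown numbers visible."""
    labeled: dict[str, int] = {}
    for number, count in counts.items():
        name = PROTO_NAMES.get(number, f"proto-{number}")
        labeled[name] = labeled.get(name, 0) + count
    return labeled


sample = Counter({6: 240, 17: 55, 1: 12})
for name, count in label_protocols(sample).items():
    print(f"  - {name}: {count}")
```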

PCAP Review Checklist

Section 7: TCP Handshake Analysis

Detecting Three-Way Handshakes

Tracking SYN, SYN-ACK, and ACK packets helps verify connection establishment and detect half-open scans.

Handshake Tracker

#!/usr/bin/env python3
"""
Track TCP handshakes and identify incomplete connections.
"""
from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Tuple

from scapy.all import sniff, IP, TCP


FlowKey = Tuple[str, int, str, int]
handshakes: Dict[FlowKey, Dict[str, bool]] = defaultdict(lambda: {"syn": False, "synack": False, "ack": False})
timestamps: Dict[FlowKey, datetime] = {}


def handle_handshake(packet) -> None:
    if IP not in packet or TCP not in packet:
        return

    ip = packet[IP]
    tcp = packet[TCP]
    flow = (ip.src, tcp.sport, ip.dst, tcp.dport)

    flags = int(tcp.flags)
    if flags & 0x12 == 0x12:  # SYN-ACK: test first, because it also has the SYN bit set
        reverse = (ip.dst, tcp.dport, ip.src, tcp.sport)
        handshakes[reverse]["synack"] = True
    elif flags & 0x02:  # SYN without ACK (client -> server)
        handshakes[flow]["syn"] = True
        timestamps[flow] = datetime.utcnow()
    elif flags & 0x10:  # ACK (client -> server, completes the handshake)
        handshakes[flow]["ack"] = True

    status = handshakes[flow]
    if status["syn"] and status["synack"] and status["ack"]:
        print(f"[✓] Complete handshake: {flow}")


def detect_incomplete(timeout_sec: int = 10) -> None:
    """
    Find SYNs that did not complete.
    """
    now = datetime.utcnow()
    for flow, started in list(timestamps.items()):
        if now - started > timedelta(seconds=timeout_sec):
            status = handshakes[flow]
            if status["syn"] and not status["ack"]:
                print(f"[!] Incomplete handshake: {flow} status={status}")
            timestamps.pop(flow, None)


def main() -> None:
    """
    Run live handshake tracker.
    """
    while True:
        sniff(count=20, prn=handle_handshake, store=False)
        detect_incomplete()


if __name__ == "__main__":
    main()

Section 8: ARP Spoofing Detection

Detecting MAC Address Changes

ARP spoofing often involves rapid MAC changes for the same IP. Track ARP replies and alert when a mapping changes.

ARP Monitor

#!/usr/bin/env python3
"""
Monitor ARP traffic and detect potential spoofing.
"""
from __future__ import annotations

from typing import Dict

from scapy.all import sniff, ARP


arp_table: Dict[str, str] = {}


def handle_arp(packet) -> None:
    if ARP in packet and packet[ARP].op == 2:  # is-at (reply)
        ip = packet[ARP].psrc
        mac = packet[ARP].hwsrc
        if ip in arp_table and arp_table[ip] != mac:
            print(f"[!] ARP change detected: {ip} {arp_table[ip]} -> {mac}")
        arp_table[ip] = mac


if __name__ == "__main__":
    print("[*] Monitoring ARP traffic...")
    sniff(filter="arp", prn=handle_arp, store=False)

Section 9: HTTP and TLS Metadata Extraction

HTTP Request Extractor (Plaintext Only)

#!/usr/bin/env python3
"""
Extract HTTP request lines from plaintext traffic.
"""
from __future__ import annotations

from scapy.all import sniff, TCP, Raw


def handle_http(packet) -> None:
    if packet.haslayer(TCP) and packet.haslayer(Raw):
        payload = packet[Raw].load
        if payload.startswith(b"GET") or payload.startswith(b"POST") or payload.startswith(b"HEAD"):
            try:
                request_line = payload.split(b"\r\n", 1)[0].decode()
                print(f"HTTP: {request_line}")
            except UnicodeDecodeError:
                return


if __name__ == "__main__":
    sniff(filter="tcp port 80", prn=handle_http, store=False)

TLS SNI Extractor (Metadata Only)

#!/usr/bin/env python3
"""
Extract TLS Server Name Indication (SNI) without decrypting payloads.
"""
from __future__ import annotations

from scapy.all import sniff, TCP, Raw


def parse_sni(payload: bytes) -> str | None:
    """
    Simple TLS ClientHello SNI parser.
    """
    try:
        # Require a TLS handshake record (0x16) that carries a ClientHello (type 0x01)
        if payload[0] != 0x16 or payload[5] != 0x01:
            return None
        # Skip record header (5 bytes) and handshake header (4 bytes)
        pointer = 5 + 4
        # Skip version (2) + random (32) + session ID length (1) + session ID
        pointer += 2 + 32
        session_len = payload[pointer]
        pointer += 1 + session_len
        # Cipher suites
        cs_len = int.from_bytes(payload[pointer:pointer+2], "big")
        pointer += 2 + cs_len
        # Compression methods
        comp_len = payload[pointer]
        pointer += 1 + comp_len
        # Extensions length
        ext_len = int.from_bytes(payload[pointer:pointer+2], "big")
        pointer += 2
        end = pointer + ext_len

        while pointer + 4 <= end:
            ext_type = int.from_bytes(payload[pointer:pointer+2], "big")
            ext_size = int.from_bytes(payload[pointer+2:pointer+4], "big")
            pointer += 4
            if ext_type == 0x00:  # SNI
                # Skip list length (2) and name type (1)
                pointer += 2 + 1
                name_len = int.from_bytes(payload[pointer:pointer+2], "big")
                pointer += 2
                return payload[pointer:pointer+name_len].decode()
            pointer += ext_size
    except (IndexError, UnicodeDecodeError):
        return None
    return None


def handle_tls(packet) -> None:
    if packet.haslayer(TCP) and packet.haslayer(Raw):
        sni = parse_sni(packet[Raw].load)
        if sni:
            print(f"TLS SNI: {sni}")


if __name__ == "__main__":
    sniff(filter="tcp port 443", prn=handle_tls, store=False)

Section 10: Traffic Profiling with Pandas

Flow Summary Report

#!/usr/bin/env python3
"""
Convert PCAP to a flow summary and output CSV.
"""
from __future__ import annotations

from collections import Counter
from typing import Dict, Tuple

import pandas as pd
from scapy.all import rdpcap, IP, TCP, UDP


FlowKey = Tuple[str, int, str, int, str]


def pcap_to_flows(path: str) -> pd.DataFrame:
    """
    Aggregate packets into 5-tuple flows.
    """
    packets = rdpcap(path)
    flow_counts: Counter[FlowKey] = Counter()

    for packet in packets:
        if IP not in packet:
            continue
        ip = packet[IP]
        if TCP in packet:
            proto = "TCP"
            sport, dport = packet[TCP].sport, packet[TCP].dport
        elif UDP in packet:
            proto = "UDP"
            sport, dport = packet[UDP].sport, packet[UDP].dport
        else:
            continue
        key = (ip.src, sport, ip.dst, dport, proto)
        flow_counts[key] += 1

    rows = []
    for (src, sport, dst, dport, proto), count in flow_counts.items():
        rows.append({
            "src": src,
            "sport": sport,
            "dst": dst,
            "dport": dport,
            "proto": proto,
            "packets": count,
        })

    df = pd.DataFrame(rows)
    return df.sort_values("packets", ascending=False)


def main() -> None:
    df = pcap_to_flows("capture.pcap")
    df.to_csv("flow_summary.csv", index=False)
    print(df.head(10))


if __name__ == "__main__":
    main()

Charting Top Conversations

#!/usr/bin/env python3
"""
Create a bar chart of top TCP destinations.
"""
from __future__ import annotations

import pandas as pd
import matplotlib.pyplot as plt


def chart_top_destinations(path: str) -> None:
    """
    Chart top destination ports from flow summary.
    """
    df = pd.read_csv(path)
    top = df.groupby("dport")["packets"].sum().sort_values(ascending=False).head(10)

    top.plot(kind="bar")
    plt.title("Top Destination Ports")
    plt.xlabel("Destination Port")
    plt.ylabel("Packet Count")
    plt.tight_layout()
    plt.savefig("top_ports.png")
    print("[*] Saved chart to top_ports.png")


if __name__ == "__main__":
    chart_top_destinations("flow_summary.csv")

Section 11: ICMP and Traceroute

ICMP Echo Sweep

#!/usr/bin/env python3
"""
ICMP ping sweep for lab discovery (authorized only).
"""
from __future__ import annotations

import ipaddress
from typing import List

from scapy.all import IP, ICMP, sr1


def ping_host(ip: str, timeout: float = 1.0) -> bool:
    """
    Return True if host replies to ICMP.
    """
    packet = IP(dst=ip) / ICMP()
    response = sr1(packet, timeout=timeout, verbose=False)
    return response is not None


def sweep(network: str) -> List[str]:
    """
    Sweep a subnet for live hosts.
    """
    live = []
    for host in ipaddress.ip_network(network).hosts():
        ip = str(host)
        if ping_host(ip):
            live.append(ip)
            print(f"[+] Alive: {ip}")
    return live


if __name__ == "__main__":
    sweep("192.168.56.0/24")

Traceroute with Increasing TTL

#!/usr/bin/env python3
"""
Simple traceroute implementation using TTL.
"""
from __future__ import annotations

from scapy.all import IP, ICMP, sr1


def traceroute(target: str, max_hops: int = 20) -> None:
    """
    Print hop-by-hop route.
    """
    for ttl in range(1, max_hops + 1):
        packet = IP(dst=target, ttl=ttl) / ICMP()
        response = sr1(packet, timeout=1, verbose=False)
        if response is None:
            print(f"{ttl}: *")
            continue
        print(f"{ttl}: {response.src}")
        if response.src == target:
            break


if __name__ == "__main__":
    traceroute("8.8.8.8")

Section 12: Checksums and Integrity

Why Checksums Matter

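Checksums detect accidental corruption in transit. They do not protect against deliberate tampering, because anyone who modifies a packet can recompute them. Scapy fills in checksums automatically when you craft a packet, and an analyzer can recompute them to verify captured traffic. Packets captured on the sending host may show invalid checksums when checksum offload is enabled on the network card.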
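
To make the mechanism concrete, here is a minimal sketch of the 16-bit one's-complement Internet checksum described in RFC 1071, which IPv4, TCP, and UDP all use. The function name `internet_checksum` is illustrative. The sample bytes are a 20-byte IPv4 header whose checksum field is 0xb861.

```python
def internet_checksum(data: bytes) -> int:
    """Compute the 16-bit one's-complement checksum over data."""
    if len(data) % 2:
        data += b"\x00"  # pad odd-length input with a zero byte

    total = 0
    for i in range(0, len(data), 2):
        total += (data[i] << 8) | data[i + 1]  # add each big-endian 16-bit word

    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)  # fold carries back into the low 16 bits

    return ~total & 0xFFFF


# IPv4 header with the checksum field (bytes 10-11) set to zero
header_zeroed = bytes.fromhex("450000730000400040110000c0a80001c0a800c7")
# The same header with its checksum filled in
header_full = bytes.fromhex("4500007300004000401 1b861c0a80001c0a800c7".replace(" ", ""))

print(hex(internet_checksum(header_zeroed)))  # value the sender stores in the header
print(internet_checksum(header_full))         # 0 means the header verifies
```

A receiver verifies a header by running the same sum over all of its bytes, checksum included; a result of zero means no corruption was detected. For TCP and UDP the sum also covers a pseudo-header built from the IP addresses, which this sketch does not construct.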

Checksum Verification Example

#!/usr/bin/env python3
"""
Verify IP and TCP checksums in captured packets.
"""
from __future__ import annotations

from scapy.all import rdpcap, IP, TCP


def verify_checksums(path: str) -> None:
    """
    Compare stored checksums with recomputed values.
    """
    packets = rdpcap(path)
    for packet in packets:
        if IP in packet and TCP in packet:
            ip = packet[IP]
            tcp = packet[TCP]

            original_ip = ip.chksum
            original_tcp = tcp.chksum

            del ip.chksum
            del tcp.chksum

            rebuilt = IP(bytes(ip))
            rebuilt_tcp = TCP(bytes(tcp))

            if rebuilt.chksum != original_ip or rebuilt_tcp.chksum != original_tcp:
                print(f"[!] Checksum mismatch: {ip.src} -> {ip.dst}")


if __name__ == "__main__":
    verify_checksums("capture.pcap")

Section 13: IDS-Style Signatures

Simple Payload Signature

#!/usr/bin/env python3
"""
Detect payload patterns in network traffic.
"""
from __future__ import annotations

from scapy.all import sniff, Raw


SIGNATURES = [
    b"/etc/passwd",
    b"union select",
    b"powershell -enc",
]


def handle_payload(packet) -> None:
    if packet.haslayer(Raw):
        # Signatures are lowercase, so lowercase the payload before matching
        payload = packet[Raw].load.lower()
        for sig in SIGNATURES:
            if sig in payload:
                print(f"[ALERT] Signature match: {sig.decode(errors='ignore')}")
                break


if __name__ == "__main__":
    sniff(filter="tcp", prn=handle_payload, store=False)

Basic Signature Config

# signatures.txt
union select
../../../../
cmd.exe /c
powershell -enc
curl http

Section 14: tcpdump Integration

Launching tcpdump from Python

#!/usr/bin/env python3
"""
Run tcpdump and parse output lines.
"""
from __future__ import annotations

import subprocess


def run_tcpdump() -> None:
    """
    Run tcpdump for 10 packets and print output.
    """
    command = ["tcpdump", "-n", "-c", "10", "tcp"]
    result = subprocess.run(command, capture_output=True, text=True, check=False)
    print(result.stdout)


if __name__ == "__main__":
    run_tcpdump()

BPF Cheat Sheet

# Host or network
host 10.0.0.5
net 192.168.1.0/24

# Protocols
tcp
udp
icmp

# Ports
port 22
portrange 1-1024

# Combinations
tcp and port 443
udp and port 53
src host 10.0.0.5 and dst port 80

Section 15: Stream Reconstruction

Reassemble TCP Streams (Simple)

#!/usr/bin/env python3
"""
Reconstruct TCP payload streams from a PCAP.
"""
from __future__ import annotations

from collections import defaultdict
from typing import Dict, Tuple

from scapy.all import rdpcap, IP, TCP, Raw


FlowKey = Tuple[str, int, str, int]


def reconstruct_streams(path: str) -> Dict[FlowKey, bytes]:
    """
    Reassemble payloads in order of capture (simplified).
    """
    packets = rdpcap(path)
    streams: Dict[FlowKey, bytes] = defaultdict(bytes)

    for packet in packets:
        if IP in packet and TCP in packet and Raw in packet:
            ip = packet[IP]
            tcp = packet[TCP]
            flow = (ip.src, tcp.sport, ip.dst, tcp.dport)
            streams[flow] += packet[Raw].load

    return streams


def save_streams(streams: Dict[FlowKey, bytes]) -> None:
    """
    Save reconstructed streams to files.
    """
    for idx, (flow, data) in enumerate(streams.items(), start=1):
        src, sport, dst, dport = flow
        filename = f"stream_{idx}_{src}_{sport}_to_{dst}_{dport}.bin"
        with open(filename, "wb") as handle:
            handle.write(data)
        print(f"[*] Wrote {filename} ({len(data)} bytes)")


if __name__ == "__main__":
    streams = reconstruct_streams("capture.pcap")
    save_streams(streams)
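
Appending payloads in capture order, as the script above does, gives wrong output when segments arrive out of order or are retransmitted. A more careful approach orders segments by TCP sequence number. The sketch below shows that idea on plain `(seq, payload)` pairs, which could be taken from `packet[TCP].seq` and `packet[Raw].load`. The `reassemble` function is hypothetical; it assumes you know the sequence number of the first data byte, and it ignores sequence-number wraparound.

```python
from __future__ import annotations

from typing import Iterable, Tuple


def reassemble(segments: Iterable[Tuple[int, bytes]], initial_seq: int) -> bytes:
    """Join segments in sequence order, skipping retransmitted bytes.

    Stops at the first gap, so the result is always a contiguous prefix.
    """
    stream = bytearray()
    next_seq = initial_seq  # sequence number of the next byte we expect

    for seq, payload in sorted(segments, key=lambda item: item[0]):
        if seq > next_seq:
            break  # a segment is missing; do not guess at its contents
        overlap = next_seq - seq  # bytes of this segment we already have
        if overlap < len(payload):
            stream += payload[overlap:]
            next_seq = seq + len(payload)

    return bytes(stream)


# Out-of-order arrival plus one retransmission of the first segment
captured = [(1005, b"World"), (1000, b"Hello"), (1000, b"Hello")]
print(reassemble(captured, initial_seq=1000))
```

Stopping at a gap is a deliberate choice for this sketch: a partial but correct stream is easier to reason about than one with silent holes.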

Extract HTTP Payloads from Streams

#!/usr/bin/env python3
"""
Search reconstructed streams for HTTP headers.
"""
from __future__ import annotations

from typing import Dict, Tuple


def find_http(streams: Dict[Tuple[str, int, str, int], bytes]) -> None:
    """
    Print HTTP headers found in reconstructed streams.
    """
    for flow, data in streams.items():
        if b"HTTP/" in data or data.startswith(b"GET") or data.startswith(b"POST"):
            try:
                header = data.split(b"\r\n\r\n", 1)[0]
                print(f"[HTTP] {flow}\n{header.decode(errors='ignore')}\n")
            except Exception:
                continue


if __name__ == "__main__":
    example_streams = {
        ("10.0.0.5", 51515, "93.184.216.34", 80): b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
    }
    find_http(example_streams)

Wireshark Filter Equivalents

# TCP Reset packets
tcp.flags.reset == 1

# DNS queries
dns.flags.response == 0

# HTTP requests
http.request

# TLS ClientHello with SNI
tls.handshake.extensions_server_name

Section 16: Port Scan Detection

Detecting Fast Scans

Rapid sequential port attempts from a single IP are a common scanning indicator. Track unique destination ports per source.

Scan Detector (Rolling Window)

#!/usr/bin/env python3
"""
Detect potential port scans based on unique ports per source IP.
"""
from __future__ import annotations

from collections import defaultdict
from datetime import datetime, timedelta
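from typing import Any, Dict

from scapy.all import sniff, IP, TCP


WINDOW = timedelta(seconds=30)
THRESHOLD = 15
# Per-source state: destination ports seen and when the current window started
seen_ports: Dict[str, Dict[str, Any]] = defaultdict(lambda: {"ports": set(), "first_seen": None})


def handle_packet(packet) -> None:
    if IP not in packet or TCP not in packet:
        return

    ip = packet[IP].src
    dport = packet[TCP].dport
    entry = seen_ports[ip]
    now = datetime.utcnow()

    # Start a fresh window on first sight or once the current one has expired.
    # This happens before recording the port, so the new port is not discarded.
    if entry["first_seen"] is None or now - entry["first_seen"] > WINDOW:
        entry["ports"].clear()
        entry["first_seen"] = now

    is_new_port = dport not in entry["ports"]
    entry["ports"].add(dport)

    # Alert only when a new port pushes the count to the threshold or beyond,
    # not on every later packet to a port already counted
    if is_new_port and len(entry["ports"]) >= THRESHOLD:
        print(f"[ALERT] Port scan detected from {ip} ({len(entry['ports'])} ports)")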


if __name__ == "__main__":
    sniff(filter="tcp", prn=handle_packet, store=False)
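
The detector above resets its port set when the window expires, so a scan that straddles a reset can be split across two windows and missed. A sliding window avoids that by discarding only the events that have aged out. The sketch below is one possible variant, with a hypothetical `ScanWindow` class; timestamps are passed in as plain seconds so the logic can be tested offline without sniffing.

```python
from __future__ import annotations

from collections import defaultdict, deque
from typing import Deque, Dict, Tuple


class ScanWindow:
    """Track distinct destination ports per source over a sliding time window."""

    def __init__(self, window_seconds: float = 30.0, threshold: int = 15) -> None:
        self.window = window_seconds
        self.threshold = threshold
        self.events: Dict[str, Deque[Tuple[float, int]]] = defaultdict(deque)

    def observe(self, src: str, dport: int, now: float) -> bool:
        """Record one attempt and return True if src is at or over the threshold."""
        events = self.events[src]
        events.append((now, dport))

        # Drop events older than the window, oldest first
        while events and now - events[0][0] > self.window:
            events.popleft()

        distinct_ports = {port for _, port in events}
        return len(distinct_ports) >= self.threshold


detector = ScanWindow(window_seconds=30, threshold=3)
for second, port in enumerate([22, 80, 443]):
    alert = detector.observe("192.168.56.20", port, now=float(second))
    print(port, alert)
```

In a live sniffer, `now` could come from `float(packet.time)`, the capture timestamp Scapy attaches to each packet.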

Section 17: RTT and Latency Measurement

Measuring RTT with ICMP

#!/usr/bin/env python3
"""
Measure ICMP round-trip times and compute statistics.
"""
from __future__ import annotations

import statistics
import time
from typing import List

from scapy.all import IP, ICMP, sr1


def ping_rtt(host: str, count: int = 5) -> List[float]:
    """
    Return RTT measurements in milliseconds.
    """
    rtts = []
    for _ in range(count):
        start = time.perf_counter()  # monotonic clock, better suited to measuring intervals
        reply = sr1(IP(dst=host) / ICMP(), timeout=1, verbose=False)
        if reply is not None:
            rtts.append((time.perf_counter() - start) * 1000)
    return rtts


def summarize_rtt(rtts: List[float]) -> None:
    """
    Print RTT statistics.
    """
    if not rtts:
        print("[!] No replies")
        return
    print(f"Min: {min(rtts):.2f} ms")
    print(f"Max: {max(rtts):.2f} ms")
    print(f"Avg: {statistics.mean(rtts):.2f} ms")


if __name__ == "__main__":
    results = ping_rtt("8.8.8.8")
    summarize_rtt(results)

Sample RTT Output

Min: 11.24 ms
Max: 18.93 ms
Avg: 14.65 ms

Lab 09: Packet Analysis Toolkit (90-150 minutes)

Lab Safety: All exercises must be done in a private VM lab or isolated Docker network. Obtain explicit authorization for any network you test.

Lab Part 1: Custom Packet Sniffer (25-35 min)

Objective: Build a sniffer that filters and logs traffic to CSV.

Requirements:

Success Criteria: CSV updates in real time while traffic flows.

Hint: CSV logging
with open("sniff_log.csv", "a", encoding="utf-8") as handle:
    handle.write("timestamp,src,dst,proto,info\n")
    handle.flush()
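
Note that the hint above writes the header line every time it runs, which would repeat the header in a file opened in append mode. Below is a fuller sketch using the standard `csv` module, which also quotes fields that contain commas. The names `FIELDS`, `open_writer`, and `log_row` are illustrative. The demo writes to an in-memory buffer; a real sniffer would pass a file opened with `newline=""`.

```python
import csv
import io
from typing import TextIO

FIELDS = ["timestamp", "src", "dst", "proto", "info"]


def open_writer(handle: TextIO, write_header: bool) -> csv.DictWriter:
    """Create a CSV writer; emit the header only when asked (for a new file)."""
    writer = csv.DictWriter(handle, fieldnames=FIELDS)
    if write_header:
        writer.writeheader()
    return writer


def log_row(writer: csv.DictWriter, handle: TextIO, row: dict) -> None:
    """Write one row and flush so the file updates while traffic flows."""
    writer.writerow(row)
    handle.flush()


buffer = io.StringIO()
writer = open_writer(buffer, write_header=True)
log_row(writer, buffer, {
    "timestamp": "2024-01-01T00:00:00",
    "src": "10.0.0.5",
    "dst": "10.0.0.9",
    "proto": 6,
    "info": "GET /a,b",  # the comma forces the csv module to quote this field
})
demo_output = buffer.getvalue()
print(demo_output)
```

When appending to an existing log, pass `write_header=True` only if the file is new or empty.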

Lab Part 2: ARP Scanner (20-30 min)

Objective: Discover live hosts in your lab subnet.

Requirements:

Success Criteria: All known lab hosts appear in results.

Hint: JSON output
import json
with open("arp_results.json", "w", encoding="utf-8") as handle:
    json.dump(results, handle, indent=2)

Lab Part 3: DNS Analyzer (20-30 min)

Objective: Track DNS queries and responses to detect suspicious domains.

Requirements:

Success Criteria: Output includes query/response pairs with timestamps.

Hint: Seen domains set
seen = set()
if query not in seen:
    print(f"[NEW] {query}")
    seen.add(query)

Lab Part 4: SYN Flood Demo (20-30 min)

Objective: Demonstrate a low-rate SYN flood in a controlled lab.

Requirements:

Success Criteria: Target host logs show SYN spike without service disruption.

Hint: Rate limiting
start = time.time()
count = 0
while time.time() - start < 60:
    send(packet, verbose=False)
    count += 1
    time.sleep(0.1)
print(f"Sent {count} packets")

Lab Part 5: TCP Session Report (20-25 min)

Objective: Generate a report of top TCP flows.

Requirements:

Success Criteria: Report shows the heaviest TCP conversations.

Hint: Sorting flows
top = sorted(flows.items(), key=lambda x: x[1]["packets"], reverse=True)[:10]
for flow, stats in top:
    print(flow, stats)

Stretch Challenges (Optional)

Hint: DNS tunneling heuristic
if len(query) > 60 and query.count('.') >= 3:
    print(f"[SUSPECT] Long DNS query: {query}")
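
The same heuristic can be wrapped in a function so the thresholds are easy to tune and test. The sketch below keeps the length and dot-count test from the hint and adds a check for a single very long label, since DNS labels can be up to 63 bytes and tunneling tools often pack data into one label. The name `is_suspect_query` and the 40-character label threshold are assumptions for illustration, not established cutoffs.

```python
def is_suspect_query(
    query: str,
    max_len: int = 60,
    min_dots: int = 3,
    max_label: int = 40,  # assumed threshold; tune it against your own lab traffic
) -> bool:
    """Flag DNS names that are unusually long overall or contain a very long label."""
    name = query.rstrip(".")  # Scapy query names usually end with a trailing dot
    labels = name.split(".")

    long_name = len(name) > max_len and name.count(".") >= min_dots
    long_label = any(len(label) > max_label for label in labels)
    return long_name or long_label


for candidate in ["www.example.com.", "a" * 50 + ".tunnel.example.com."]:
    print(candidate[:30], is_suspect_query(candidate))
```

Expect false positives from content delivery and telemetry domains, which often use long generated names; treat a match as a prompt to investigate rather than as proof of tunneling.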
🎯 Lab Complete! You now have a packet analysis toolkit for detection and troubleshooting.

📤 Deliverables:

Additional Resources

Scapy & Packet Analysis

Network Protocol References

Blue Team Use Cases

Key Takeaways

Week 09 Quiz

Test your understanding of packet analysis with Scapy.

Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.
