Week Overview
This week dives into packets: how they are structured, captured, crafted, and analyzed. You will use Scapy to build professional-grade sniffers and analyzers in a lab-only environment.
- Understand Ethernet, IP, TCP, and UDP headers
- Sniff traffic with filters and extract fields
- Craft packets for testing and learning
- Analyze DNS and TCP sessions
- Build safe demos of common attack patterns
Section 1: Packet Structure Fundamentals
OSI Layers in Practice
When you sniff traffic, you see how layers stack: Ethernet → IP → TCP/UDP → Application data. Scapy exposes these layers as Python objects.
Packet Anatomy Example
#!/usr/bin/env python3
"""
Inspect basic packet fields with Scapy.
"""
from __future__ import annotations
from scapy.all import Ether, IP, TCP
packet = Ether() / IP(dst="8.8.8.8") / TCP(dport=443, sport=51515, flags="S")
packet.show()
# Accessing fields
print(packet[Ether].src)
print(packet[IP].dst)
print(packet[TCP].flags)
Common Header Fields
| Layer | Key Fields | Why It Matters |
|---|---|---|
| Ethernet | src, dst, type | MAC addressing and protocol identification |
| IP | src, dst, ttl, id | Routing, hop limits, fragmentation |
| TCP | sport, dport, flags, seq | Connection management, reliability |
| UDP | sport, dport, len | Connectionless, low overhead |
Packet Layers Visualized
Ethernet: 14 bytes
└── IP: 20 bytes
└── TCP: 20 bytes
└── Payload: HTTP, TLS, DNS, etc.
TCP Flag Reference
| Flag | Meaning |
|---|---|
| SYN | Start connection |
| ACK | Acknowledge data |
| FIN | Close connection |
| RST | Reset connection |
Layer Stacking Examples
#!/usr/bin/env python3
"""
Demonstrate common layer stacks in Scapy.
"""
from __future__ import annotations
from scapy.all import Ether, IP, UDP, TCP, ICMP, DNS, DNSQR, Raw
def demo_stacks() -> None:
"""
Print summaries of common packet stacks.
"""
dns_packet = Ether() / IP(dst="8.8.8.8") / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname="example.com"))
http_packet = Ether() / IP(dst="93.184.216.34") / TCP(dport=80, flags="S")
icmp_packet = Ether() / IP(dst="1.1.1.1") / ICMP()
udp_payload = Ether() / IP(dst="192.168.56.10") / UDP(dport=9999) / Raw(load=b"TEST")
print(dns_packet.summary())
print(http_packet.summary())
print(icmp_packet.summary())
print(udp_payload.summary())
if __name__ == "__main__":
demo_stacks()
Section 2: Sniffing & Filtering
Basic Sniffer with Callback
#!/usr/bin/env python3
"""
Capture packets and print key fields.
"""
from __future__ import annotations
from datetime import datetime
from scapy.all import sniff, IP, TCP
def handle_packet(packet) -> None:
"""Callback for each sniffed packet."""
if IP in packet:
ip_layer = packet[IP]
src = ip_layer.src
dst = ip_layer.dst
proto = ip_layer.proto
timestamp = datetime.utcnow().isoformat()
summary = f"{timestamp} IP {src} -> {dst} proto={proto}"
if TCP in packet:
summary += f" TCP dport={packet[TCP].dport} flags={packet[TCP].flags}"
print(summary)
if __name__ == "__main__":
print("[*] Sniffing 10 packets on default interface...")
sniff(count=10, prn=handle_packet)
BPF Filter Examples
# Only HTTP traffic
port 80
# Only DNS queries
udp and port 53
# Only traffic from a specific host
host 192.168.1.10
# TCP SYN packets
tcp[13] & 2 != 0
Filtered Sniffer with BPF
#!/usr/bin/env python3
"""
Sniff only DNS traffic and extract query names.
"""
from __future__ import annotations
from scapy.all import sniff, DNS, DNSQR
def handle_dns(packet) -> None:
"""Print DNS queries."""
if DNS in packet and packet[DNS].qd is not None:
query = packet[DNSQR].qname.decode(errors="ignore")
print(f"DNS Query: {query}")
if __name__ == "__main__":
sniff(filter="udp port 53", prn=handle_dns, store=False)
Timed Capture to PCAP
#!/usr/bin/env python3
"""
Capture packets for a fixed duration and save to PCAP.
"""
from __future__ import annotations
import time
from scapy.all import sniff, wrpcap
def capture_for(seconds: int, bpf: str = "tcp") -> None:
"""
Capture packets for a fixed time window.
"""
print(f"[*] Capturing {seconds}s with filter: {bpf}")
packets = sniff(timeout=seconds, filter=bpf)
ts = int(time.time())
filename = f"capture_{ts}.pcap"
wrpcap(filename, packets)
print(f"[*] Wrote {len(packets)} packets to {filename}")
if __name__ == "__main__":
capture_for(15, bpf="tcp port 80")
Section 3: Crafting Packets Safely
Custom Packet Builder
#!/usr/bin/env python3
"""
Craft and send a custom TCP SYN packet (lab only).
"""
from __future__ import annotations
from scapy.all import IP, TCP, send
def send_syn(target_ip: str, target_port: int, source_port: int = 51515) -> None:
"""
Send a single SYN packet for testing.
"""
packet = IP(dst=target_ip) / TCP(dport=target_port, sport=source_port, flags="S")
send(packet, verbose=False)
print(f"[*] Sent SYN to {target_ip}:{target_port}")
if __name__ == "__main__":
send_syn("192.168.56.10", 22)
Rate-Limited SYN Demo (Local Lab)
#!/usr/bin/env python3
"""
Educational SYN flood demonstration with strict rate limits.
"""
from __future__ import annotations
import time
from scapy.all import IP, TCP, send
def syn_demo(target_ip: str, target_port: int, count: int = 50, delay: float = 0.1) -> None:
"""
Send a low-rate stream of SYN packets for learning.
"""
for i in range(count):
packet = IP(dst=target_ip) / TCP(dport=target_port, sport=40000 + i, flags="S")
send(packet, verbose=False)
time.sleep(delay)
print(f"[*] Completed demo ({count} packets)")
if __name__ == "__main__":
syn_demo("192.168.56.10", 80)
Custom UDP Packet
#!/usr/bin/env python3
"""
Send a UDP packet with payload.
"""
from __future__ import annotations
from scapy.all import IP, UDP, Raw, send
def send_udp(target_ip: str, target_port: int, payload: bytes) -> None:
"""
Send a UDP packet with payload for testing.
"""
packet = IP(dst=target_ip) / UDP(dport=target_port) / Raw(load=payload)
send(packet, verbose=False)
print(f"[*] Sent UDP payload to {target_ip}:{target_port}")
if __name__ == "__main__":
send_udp("192.168.56.10", 9999, b"TEST-UDP-PAYLOAD")
Section 4: Scanning & Discovery with ARP
ARP Scan for Local Network Discovery
#!/usr/bin/env python3
"""
ARP scan to identify live hosts on a local subnet.
"""
from __future__ import annotations
from typing import List, Dict
from scapy.all import ARP, Ether, srp
def arp_scan(network: str) -> List[Dict[str, str]]:
"""
Return list of hosts from ARP scan.
"""
packet = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=network)
answered, _ = srp(packet, timeout=2, verbose=False)
results = []
for _, response in answered:
results.append({"ip": response.psrc, "mac": response.hwsrc})
return results
if __name__ == "__main__":
hosts = arp_scan("192.168.56.0/24")
for host in hosts:
print(f"{host['ip']} - {host['mac']}")
Ethical Focus
ARP scanning can be noisy. Limit scans to lab subnets and notify stakeholders if used in enterprise environments.
Section 5: DNS and TCP Session Analysis
DNS Query/Response Analyzer
#!/usr/bin/env python3
"""
Capture DNS queries and match with responses.
"""
from __future__ import annotations
from collections import defaultdict
from scapy.all import sniff, DNS, DNSQR, DNSRR
pending = defaultdict(list)
def handle_dns(packet) -> None:
"""Track DNS queries and responses."""
if DNS in packet:
dns = packet[DNS]
if dns.qr == 0 and dns.qd is not None:
query = dns[DNSQR].qname.decode(errors="ignore")
pending[dns.id].append(query)
print(f"[Q] {query}")
elif dns.qr == 1 and dns.an is not None:
answers = []
for i in range(dns.ancount):
rr = dns.an[i]
if isinstance(rr, DNSRR):
answers.append(rr.rdata)
queries = pending.pop(dns.id, [])
for query in queries:
print(f"[A] {query} -> {answers}")
if __name__ == "__main__":
sniff(filter="udp port 53", prn=handle_dns, store=False)
TCP Session Tracker
#!/usr/bin/env python3
"""
Track TCP connections by 4-tuple.
"""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime
from scapy.all import sniff, IP, TCP
connections = defaultdict(lambda: {"packets": 0, "start": None})
def handle_tcp(packet) -> None:
"""Count packets per TCP flow."""
if IP in packet and TCP in packet:
ip = packet[IP]
tcp = packet[TCP]
flow = (ip.src, tcp.sport, ip.dst, tcp.dport)
if connections[flow]["start"] is None:
connections[flow]["start"] = datetime.utcnow().isoformat()
connections[flow]["packets"] += 1
if tcp.flags & 1: # FIN
count = connections[flow]["packets"]
print(f"[FIN] Flow {flow} - packets={count}")
if __name__ == "__main__":
sniff(filter="tcp", prn=handle_tcp, store=False)
Section 6: PCAP Capture and Analysis
Saving Packets to PCAP
#!/usr/bin/env python3
"""
Save captured packets to a PCAP file for offline analysis.
"""
from __future__ import annotations
from scapy.all import sniff, wrpcap
if __name__ == "__main__":
packets = sniff(count=50)
wrpcap("capture.pcap", packets)
print("[*] Saved 50 packets to capture.pcap")
Reading PCAP Files
#!/usr/bin/env python3
"""
Read a PCAP and summarize protocols.
"""
from __future__ import annotations
from collections import Counter
from scapy.all import rdpcap, IP, TCP, UDP
def summarize_pcap(path: str) -> None:
"""Summarize packets by protocol."""
packets = rdpcap(path)
proto_counts = Counter()
for packet in packets:
if IP in packet:
proto = packet[IP].proto
proto_counts[proto] += 1
print("Protocol counts:")
for proto, count in proto_counts.items():
print(f" - {proto}: {count}")
if __name__ == "__main__":
summarize_pcap("capture.pcap")
Sample Summary Output
Protocol counts:
- 6: 240 # TCP
- 17: 55 # UDP
- 1: 12 # ICMP
PCAP Review Checklist
- Identify top talkers (src IP) and top destinations
- Confirm expected ports and protocols are present
- Look for repeated failed handshakes or resets
- Export suspicious flows for deeper inspection
Section 7: TCP Handshake Analysis
Detecting Three-Way Handshakes
Tracking SYN, SYN-ACK, and ACK packets helps verify connection establishment and detect half-open scans.
Handshake Tracker
#!/usr/bin/env python3
"""
Track TCP handshakes and identify incomplete connections.
"""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Tuple
from scapy.all import sniff, IP, TCP
FlowKey = Tuple[str, int, str, int]
handshakes: Dict[FlowKey, Dict[str, bool]] = defaultdict(lambda: {"syn": False, "synack": False, "ack": False})
timestamps: Dict[FlowKey, datetime] = {}
def handle_handshake(packet) -> None:
if IP not in packet or TCP not in packet:
return
ip = packet[IP]
tcp = packet[TCP]
flow = (ip.src, tcp.sport, ip.dst, tcp.dport)
flags = tcp.flags
if flags & 0x02: # SYN
handshakes[flow]["syn"] = True
timestamps[flow] = datetime.utcnow()
elif flags & 0x12: # SYN-ACK
reverse = (ip.dst, tcp.dport, ip.src, tcp.sport)
handshakes[reverse]["synack"] = True
elif flags & 0x10: # ACK
handshakes[flow]["ack"] = True
status = handshakes[flow]
if status["syn"] and status["synack"] and status["ack"]:
print(f"[✓] Complete handshake: {flow}")
def detect_incomplete(timeout_sec: int = 10) -> None:
"""
Find SYNs that did not complete.
"""
now = datetime.utcnow()
for flow, started in list(timestamps.items()):
if now - started > timedelta(seconds=timeout_sec):
status = handshakes[flow]
if status["syn"] and not status["ack"]:
print(f"[!] Incomplete handshake: {flow} status={status}")
timestamps.pop(flow, None)
def main() -> None:
"""
Run live handshake tracker.
"""
while True:
sniff(count=20, prn=handle_handshake, store=False)
detect_incomplete()
if __name__ == "__main__":
main()
Section 8: ARP Spoofing Detection
Detecting MAC Address Changes
ARP spoofing often involves rapid MAC changes for the same IP. Track ARP replies and alert when a mapping changes.
ARP Monitor
#!/usr/bin/env python3
"""
Monitor ARP traffic and detect potential spoofing.
"""
from __future__ import annotations
from typing import Dict
from scapy.all import sniff, ARP
arp_table: Dict[str, str] = {}
def handle_arp(packet) -> None:
if ARP in packet and packet[ARP].op == 2: # is-at (reply)
ip = packet[ARP].psrc
mac = packet[ARP].hwsrc
if ip in arp_table and arp_table[ip] != mac:
print(f"[!] ARP change detected: {ip} {arp_table[ip]} -> {mac}")
arp_table[ip] = mac
if __name__ == "__main__":
print("[*] Monitoring ARP traffic...")
sniff(filter="arp", prn=handle_arp, store=False)
Section 9: HTTP and TLS Metadata Extraction
HTTP Request Extractor (Plaintext Only)
#!/usr/bin/env python3
"""
Extract HTTP request lines from plaintext traffic.
"""
from __future__ import annotations
from scapy.all import sniff, TCP, Raw
def handle_http(packet) -> None:
if packet.haslayer(TCP) and packet.haslayer(Raw):
payload = packet[Raw].load
if payload.startswith(b"GET") or payload.startswith(b"POST") or payload.startswith(b"HEAD"):
try:
request_line = payload.split(b"\\r\\n", 1)[0].decode()
print(f\"HTTP: {request_line}\")
except UnicodeDecodeError:
return
if __name__ == "__main__":
sniff(filter="tcp port 80", prn=handle_http, store=False)
TLS SNI Extractor (Metadata Only)
#!/usr/bin/env python3
"""
Extract TLS Server Name Indication (SNI) without decrypting payloads.
"""
from __future__ import annotations
from scapy.all import sniff, TCP, Raw
def parse_sni(payload: bytes) -> str | None:
"""
Simple TLS ClientHello SNI parser.
"""
try:
if payload[0] != 0x16: # TLS Handshake
return None
# Skip record header (5 bytes) and handshake header (4 bytes)
pointer = 5 + 4
# Skip version (2) + random (32) + session ID length (1) + session ID
pointer += 2 + 32
session_len = payload[pointer]
pointer += 1 + session_len
# Cipher suites
cs_len = int.from_bytes(payload[pointer:pointer+2], "big")
pointer += 2 + cs_len
# Compression methods
comp_len = payload[pointer]
pointer += 1 + comp_len
# Extensions length
ext_len = int.from_bytes(payload[pointer:pointer+2], "big")
pointer += 2
end = pointer + ext_len
while pointer + 4 <= end:
ext_type = int.from_bytes(payload[pointer:pointer+2], "big")
ext_size = int.from_bytes(payload[pointer+2:pointer+4], "big")
pointer += 4
if ext_type == 0x00: # SNI
# Skip list length (2) and name type (1)
pointer += 2 + 1
name_len = int.from_bytes(payload[pointer:pointer+2], "big")
pointer += 2
return payload[pointer:pointer+name_len].decode()
pointer += ext_size
except (IndexError, UnicodeDecodeError):
return None
return None
def handle_tls(packet) -> None:
if packet.haslayer(TCP) and packet.haslayer(Raw):
sni = parse_sni(packet[Raw].load)
if sni:
print(f\"TLS SNI: {sni}\")
if __name__ == "__main__":
sniff(filter="tcp port 443", prn=handle_tls, store=False)
Section 10: Traffic Profiling with Pandas
Flow Summary Report
#!/usr/bin/env python3
"""
Convert PCAP to a flow summary and output CSV.
"""
from __future__ import annotations
from collections import Counter
from typing import Dict, Tuple
import pandas as pd
from scapy.all import rdpcap, IP, TCP, UDP
FlowKey = Tuple[str, int, str, int, str]
def pcap_to_flows(path: str) -> pd.DataFrame:
"""
Aggregate packets into 5-tuple flows.
"""
packets = rdpcap(path)
flow_counts: Counter[FlowKey] = Counter()
for packet in packets:
if IP not in packet:
continue
ip = packet[IP]
if TCP in packet:
proto = "TCP"
sport, dport = packet[TCP].sport, packet[TCP].dport
elif UDP in packet:
proto = "UDP"
sport, dport = packet[UDP].sport, packet[UDP].dport
else:
continue
key = (ip.src, sport, ip.dst, dport, proto)
flow_counts[key] += 1
rows = []
for (src, sport, dst, dport, proto), count in flow_counts.items():
rows.append({
"src": src,
"sport": sport,
"dst": dst,
"dport": dport,
"proto": proto,
"packets": count,
})
df = pd.DataFrame(rows)
return df.sort_values("packets", ascending=False)
def main() -> None:
df = pcap_to_flows("capture.pcap")
df.to_csv("flow_summary.csv", index=False)
print(df.head(10))
if __name__ == "__main__":
main()
Charting Top Conversations
#!/usr/bin/env python3
"""
Create a bar chart of top TCP destinations.
"""
from __future__ import annotations
import pandas as pd
import matplotlib.pyplot as plt
def chart_top_destinations(path: str) -> None:
"""
Chart top destination ports from flow summary.
"""
df = pd.read_csv(path)
top = df.groupby("dport")["packets"].sum().sort_values(ascending=False).head(10)
top.plot(kind="bar")
plt.title("Top Destination Ports")
plt.xlabel("Destination Port")
plt.ylabel("Packet Count")
plt.tight_layout()
plt.savefig("top_ports.png")
print("[*] Saved chart to top_ports.png")
if __name__ == "__main__":
chart_top_destinations("flow_summary.csv")
Section 11: ICMP and Traceroute
ICMP Echo Sweep
#!/usr/bin/env python3
"""
ICMP ping sweep for lab discovery (authorized only).
"""
from __future__ import annotations
import ipaddress
from typing import List
from scapy.all import IP, ICMP, sr1
def ping_host(ip: str, timeout: float = 1.0) -> bool:
"""
Return True if host replies to ICMP.
"""
packet = IP(dst=ip) / ICMP()
response = sr1(packet, timeout=timeout, verbose=False)
return response is not None
def sweep(network: str) -> List[str]:
"""
Sweep a subnet for live hosts.
"""
live = []
for host in ipaddress.ip_network(network).hosts():
ip = str(host)
if ping_host(ip):
live.append(ip)
print(f"[+] Alive: {ip}")
return live
if __name__ == "__main__":
sweep("192.168.56.0/24")
Traceroute with Increasing TTL
#!/usr/bin/env python3
"""
Simple traceroute implementation using TTL.
"""
from __future__ import annotations
from scapy.all import IP, ICMP, sr1
def traceroute(target: str, max_hops: int = 20) -> None:
"""
Print hop-by-hop route.
"""
for ttl in range(1, max_hops + 1):
packet = IP(dst=target, ttl=ttl) / ICMP()
response = sr1(packet, timeout=1, verbose=False)
if response is None:
print(f\"{ttl}: *\")
continue
print(f\"{ttl}: {response.src}\")
if response.src == target:
break
if __name__ == "__main__":
traceroute(\"8.8.8.8\")
Section 12: Checksums and Integrity
Why Checksums Matter
Checksums detect accidental corruption and tampering. Packet analyzers can verify or recompute checksums when crafting packets.
Checksum Verification Example
#!/usr/bin/env python3
"""
Verify IP and TCP checksums in captured packets.
"""
from __future__ import annotations
from scapy.all import rdpcap, IP, TCP
def verify_checksums(path: str) -> None:
"""
Compare stored checksums with recomputed values.
"""
packets = rdpcap(path)
for packet in packets:
if IP in packet and TCP in packet:
ip = packet[IP]
tcp = packet[TCP]
original_ip = ip.chksum
original_tcp = tcp.chksum
del ip.chksum
del tcp.chksum
rebuilt = IP(bytes(ip))
rebuilt_tcp = TCP(bytes(tcp))
if rebuilt.chksum != original_ip or rebuilt_tcp.chksum != original_tcp:
print(f\"[!] Checksum mismatch: {ip.src} -> {ip.dst}\")
if __name__ == "__main__":
verify_checksums(\"capture.pcap\")
Section 13: IDS-Style Signatures
Simple Payload Signature
#!/usr/bin/env python3
"""
Detect payload patterns in network traffic.
"""
from __future__ import annotations
from scapy.all import sniff, Raw
SIGNATURES = [
b\"/etc/passwd\",
b\"union select\",
b\"powershell -enc\",
]
def handle_payload(packet) -> None:
if packet.haslayer(Raw):
payload = packet[Raw].load.lower()
for sig in SIGNATURES:
if sig in payload:
print(f\"[ALERT] Signature match: {sig.decode(errors='ignore')}\")
break
if __name__ == "__main__":
sniff(filter=\"tcp\", prn=handle_payload, store=False)
Basic Signature Config
# signatures.txt
union select
../../../../
cmd.exe /c
powershell -enc
curl http
Section 14: tcpdump Integration
Launching tcpdump from Python
#!/usr/bin/env python3
"""
Run tcpdump and parse output lines.
"""
from __future__ import annotations
import subprocess
def run_tcpdump() -> None:
"""
Run tcpdump for 10 packets and print output.
"""
command = ["tcpdump", "-n", "-c", "10", "tcp"]
result = subprocess.run(command, capture_output=True, text=True, check=False)
print(result.stdout)
if __name__ == "__main__":
run_tcpdump()
BPF Cheat Sheet
# Host or network
host 10.0.0.5
net 192.168.1.0/24
# Protocols
tcp
udp
icmp
# Ports
port 22
portrange 1-1024
# Combinations
tcp and port 443
udp and port 53
src host 10.0.0.5 and dst port 80
Section 15: Stream Reconstruction
Reassemble TCP Streams (Simple)
#!/usr/bin/env python3
"""
Reconstruct TCP payload streams from a PCAP.
"""
from __future__ import annotations
from collections import defaultdict
from typing import Dict, Tuple
from scapy.all import rdpcap, IP, TCP, Raw
FlowKey = Tuple[str, int, str, int]
def reconstruct_streams(path: str) -> Dict[FlowKey, bytes]:
"""
Reassemble payloads in order of capture (simplified).
"""
packets = rdpcap(path)
streams: Dict[FlowKey, bytes] = defaultdict(bytes)
for packet in packets:
if IP in packet and TCP in packet and Raw in packet:
ip = packet[IP]
tcp = packet[TCP]
flow = (ip.src, tcp.sport, ip.dst, tcp.dport)
streams[flow] += packet[Raw].load
return streams
def save_streams(streams: Dict[FlowKey, bytes]) -> None:
"""
Save reconstructed streams to files.
"""
for idx, (flow, data) in enumerate(streams.items(), start=1):
src, sport, dst, dport = flow
filename = f\"stream_{idx}_{src}_{sport}_to_{dst}_{dport}.bin\"
with open(filename, \"wb\") as handle:
handle.write(data)
print(f\"[*] Wrote {filename} ({len(data)} bytes)\")
if __name__ == \"__main__\":
streams = reconstruct_streams(\"capture.pcap\")
save_streams(streams)
Extract HTTP Payloads from Streams
#!/usr/bin/env python3
"""
Search reconstructed streams for HTTP headers.
"""
from __future__ import annotations
from typing import Dict, Tuple
def find_http(streams: Dict[Tuple[str, int, str, int], bytes]) -> None:
"""
Print HTTP headers found in reconstructed streams.
"""
for flow, data in streams.items():
if b"HTTP/" in data or data.startswith(b"GET") or data.startswith(b"POST"):
try:
header = data.split(b"\r\n\r\n", 1)[0]
print(f"[HTTP] {flow}\n{header.decode(errors='ignore')}\n")
except Exception:
continue
if __name__ == "__main__":
example_streams = {
("10.0.0.5", 51515, "93.184.216.34", 80): b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
}
find_http(example_streams)
Wireshark Filter Equivalents
# TCP Reset packets
tcp.flags.reset == 1
# DNS queries
dns.flags.response == 0
# HTTP requests
http.request
# TLS ClientHello with SNI
ssl.handshake.extensions_server_name
Section 16: Port Scan Detection
Detecting Fast Scans
Rapid sequential port attempts from a single IP are a common scanning indicator. Track unique destination ports per source.
Scan Detector (Rolling Window)
#!/usr/bin/env python3
"""
Detect potential port scans based on unique ports per source IP.
"""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Set
from scapy.all import sniff, IP, TCP
WINDOW = timedelta(seconds=30)
THRESHOLD = 15
seen_ports: Dict[str, Dict[str, Set[int]]] = defaultdict(lambda: {"ports": set(), "first_seen": None})
def handle_packet(packet) -> None:
if IP not in packet or TCP not in packet:
return
ip = packet[IP].src
dport = packet[TCP].dport
entry = seen_ports[ip]
if entry["first_seen"] is None:
entry["first_seen"] = datetime.utcnow()
entry["ports"].add(dport)
if datetime.utcnow() - entry["first_seen"] > WINDOW:
entry["ports"].clear()
entry["first_seen"] = datetime.utcnow()
if len(entry["ports"]) >= THRESHOLD:
print(f"[ALERT] Port scan detected from {ip} ({len(entry['ports'])} ports)")
if __name__ == "__main__":
sniff(filter="tcp", prn=handle_packet, store=False)
Section 17: RTT and Latency Measurement
Measuring RTT with ICMP
#!/usr/bin/env python3
"""
Measure ICMP round-trip times and compute statistics.
"""
from __future__ import annotations
import statistics
import time
from typing import List
from scapy.all import IP, ICMP, sr1
def ping_rtt(host: str, count: int = 5) -> List[float]:
"""
Return RTT measurements in milliseconds.
"""
rtts = []
for _ in range(count):
start = time.time()
reply = sr1(IP(dst=host) / ICMP(), timeout=1, verbose=False)
if reply:
rtts.append((time.time() - start) * 1000)
return rtts
def summarize_rtt(rtts: List[float]) -> None:
"""
Print RTT statistics.
"""
if not rtts:
print("[!] No replies")
return
print(f"Min: {min(rtts):.2f} ms")
print(f"Max: {max(rtts):.2f} ms")
print(f"Avg: {statistics.mean(rtts):.2f} ms")
if __name__ == "__main__":
results = ping_rtt("8.8.8.8")
summarize_rtt(results)
Sample RTT Output
Min: 11.24 ms
Max: 18.93 ms
Avg: 14.65 ms
Lab 09: Packet Analysis Toolkit (90-150 minutes)
Lab Part 1: Custom Packet Sniffer (25-35 min)
Objective: Build a sniffer that filters and logs traffic to CSV.
Requirements:
- Support BPF filters via CLI args
- Extract src/dst IP, ports, protocol, flags
- Write to
sniff_log.csv
Success Criteria: CSV updates in real time while traffic flows.
Hint: CSV logging
with open("sniff_log.csv", "a", encoding="utf-8") as handle:
handle.write("timestamp,src,dst,proto,info\n")
handle.flush()
Lab Part 2: ARP Scanner (20-30 min)
Objective: Discover live hosts in your lab subnet.
Requirements:
- Accept subnet input (e.g., 192.168.56.0/24)
- Return IP and MAC addresses
- Save results to
arp_results.json
Success Criteria: All known lab hosts appear in results.
Hint: JSON output
import json
with open("arp_results.json", "w", encoding="utf-8") as handle:
json.dump(results, handle, indent=2)
Lab Part 3: DNS Analyzer (20-30 min)
Objective: Track DNS queries and responses to detect suspicious domains.
Requirements:
- Capture DNS traffic on UDP 53
- Log query domain and response IPs
- Highlight newly seen domains
Success Criteria: Output includes query/response pairs with timestamps.
Hint: Seen domains set
seen = set()
if query not in seen:
print(f"[NEW] {query}")
seen.add(query)
Lab Part 4: SYN Flood Demo (20-30 min)
Objective: Demonstrate a low-rate SYN flood in a controlled lab.
Requirements:
- Rate-limit to <= 10 packets/second
- Log timestamps and packet counts
- Stop automatically after 60 seconds
Success Criteria: Target host logs show SYN spike without service disruption.
Hint: Rate limiting
start = time.time()
count = 0
while time.time() - start < 60:
send(packet, verbose=False)
count += 1
time.sleep(0.1)
print(f"Sent {count} packets")
Lab Part 5: TCP Session Report (20-25 min)
Objective: Generate a report of top TCP flows.
Requirements:
- Capture traffic for 5 minutes
- Count packets per flow (4-tuple)
- Output top 10 flows by volume
Success Criteria: Report shows the heaviest TCP conversations.
Hint: Sorting flows
top = sorted(flows.items(), key=lambda x: x[1]["packets"], reverse=True)[:10]
for flow, stats in top:
print(flow, stats)
Stretch Challenges (Optional)
- Detect DNS tunneling by flagging unusually long query names
- Create a PCAP-to-JSON converter for alert pipelines
- Compare live traffic to baseline flow summaries from yesterday
Hint: DNS tunneling heuristic
if len(query) > 60 and query.count('.') >= 3:
print(f"[SUSPECT] Long DNS query: {query}")
📤 Deliverables:
packet_sniffer.py- BPF-filtered packet capturearp_scanner.py- ARP-based discovery tooldns_analyzer.py- DNS query/response loggersyn_demo.py- Rate-limited SYN demonstrationtcp_flow_report.py- TCP flow analyzercapture.pcap- Sample packet capture
Additional Resources
Scapy & Packet Analysis
Network Protocol References
- RFC 791 (IP), RFC 793 (TCP), RFC 768 (UDP)
- RFC 826 (ARP), RFC 1034/1035 (DNS)
- Practical Packet Analysis - Chris Sanders
Blue Team Use Cases
- Zeek (formerly Bro) network security monitoring
- Suricata IDS rule development
- Threat hunting with PCAP archives
Key Takeaways
- ✅ Packets reveal precise network behavior and anomalies
- ✅ BPF filters make sniffing efficient and targeted
- ✅ Scapy provides a flexible API for packet crafting
- ✅ DNS analysis helps detect phishing and C2 domains
- ✅ PCAP files enable offline forensic analysis
- ⚠️ Packet manipulation must remain within authorized labs
Week 09 Quiz
Test your understanding of packet analysis with Scapy.
Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.
Take Quiz