This week dives into packets: how they are structured, captured, crafted, and analyzed. You will use Scapy to build professional-grade sniffers and analyzers in a lab-only environment.
Understand Ethernet, IP, TCP, and UDP headers
Sniff traffic with filters and extract fields
Craft packets for testing and learning
Analyze DNS and TCP sessions
Build safe demos of common attack patterns
⚠️ Ethical Warning: Packet manipulation and interception are illegal without explicit authorization. Perform all labs in isolated test networks you own. Do not run ARP spoofing, SYN floods, or sniffers on public or shared networks.
Real-World Context: SOC analysts and network engineers use packet analysis to validate alerts, troubleshoot outages, and investigate suspicious communications. Tools like tcpdump, Wireshark, and Zeek rely on the same packet-level understanding.
Section 1: Packet Structure Fundamentals
OSI Layers in Practice
When you sniff traffic, you see how layers stack: Ethernet → IP → TCP/UDP → Application data. Scapy exposes these layers as Python objects.
#!/usr/bin/env python3
"""
Demonstrate common layer stacks in Scapy.
"""
from __future__ import annotations
from scapy.all import Ether, IP, UDP, TCP, ICMP, DNS, DNSQR, Raw
def demo_stacks() -> None:
    """
    Build four sample layer stacks and print the one-line summary of each.
    """
    # Each stack starts at Ethernet and is composed with Scapy's "/" operator.
    stacks = (
        # DNS question for example.com (recursion desired) over UDP port 53
        Ether() / IP(dst="8.8.8.8") / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname="example.com")),
        # TCP SYN to port 80
        Ether() / IP(dst="93.184.216.34") / TCP(dport=80, flags="S"),
        # ICMP with default fields
        Ether() / IP(dst="1.1.1.1") / ICMP(),
        # UDP datagram carrying a raw 4-byte payload
        Ether() / IP(dst="192.168.56.10") / UDP(dport=9999) / Raw(load=b"TEST"),
    )
    for stack in stacks:
        print(stack.summary())


if __name__ == "__main__":
    demo_stacks()
Section 2: Sniffing & Filtering
Basic Sniffer with Callback
#!/usr/bin/env python3
"""
Capture packets and print key fields.
"""
from __future__ import annotations
from datetime import datetime
from scapy.all import sniff, IP, TCP
def handle_packet(packet) -> None:
    """Print a one-line description of each sniffed packet that has an IP layer."""
    if IP not in packet:
        return

    header = packet[IP]
    stamp = datetime.utcnow().isoformat()
    pieces = [f"{stamp} IP {header.src} -> {header.dst} proto={header.proto}"]

    # TCP packets additionally report destination port and flags.
    if TCP in packet:
        segment = packet[TCP]
        pieces.append(f" TCP dport={segment.dport} flags={segment.flags}")

    print("".join(pieces))


if __name__ == "__main__":
    print("[*] Sniffing 10 packets on default interface...")
    sniff(count=10, prn=handle_packet)
BPF Filter Examples
# Traffic to or from port 80 (typically plaintext HTTP)
port 80
# DNS traffic over UDP (queries and responses)
udp and port 53
# Only traffic from a specific host
host 192.168.1.10
# TCP packets with the SYN bit set (matches both SYN and SYN-ACK)
tcp[13] & 2 != 0
Filtered Sniffer with BPF
#!/usr/bin/env python3
"""
Sniff only DNS traffic and extract query names.
"""
from __future__ import annotations
from scapy.all import sniff, DNS, DNSQR
def handle_dns(packet) -> None:
    """Print the question name of each sniffed DNS packet that has a question section."""
    if DNS not in packet or packet[DNS].qd is None:
        return
    name = packet[DNSQR].qname.decode(errors="ignore")
    print(f"DNS Query: {name}")


if __name__ == "__main__":
    sniff(filter="udp port 53", prn=handle_dns, store=False)
Timed Capture to PCAP
#!/usr/bin/env python3
"""
Capture packets for a fixed duration and save to PCAP.
"""
from __future__ import annotations
import time
from scapy.all import sniff, wrpcap
def capture_for(seconds: int, bpf: str = "tcp") -> None:
    """
    Sniff for ``seconds`` seconds using the BPF filter ``bpf`` and write the
    captured packets to ``capture_<unix-timestamp>.pcap``.
    """
    print(f"[*] Capturing {seconds}s with filter: {bpf}")
    captured = sniff(timeout=seconds, filter=bpf)

    # The file name is stamped after the capture ends, not when it starts.
    filename = f"capture_{int(time.time())}.pcap"
    wrpcap(filename, captured)
    print(f"[*] Wrote {len(captured)} packets to {filename}")


if __name__ == "__main__":
    capture_for(15, bpf="tcp port 80")
Section 3: Crafting Packets Safely
Custom Packet Builder
#!/usr/bin/env python3
"""
Craft and send a custom TCP SYN packet (lab only).
"""
from __future__ import annotations
from scapy.all import IP, TCP, send
def send_syn(target_ip: str, target_port: int, source_port: int = 51515) -> None:
    """
    Send one TCP SYN from ``source_port`` to ``target_ip:target_port`` (lab only).
    """
    tcp_layer = TCP(dport=target_port, sport=source_port, flags="S")
    send(IP(dst=target_ip) / tcp_layer, verbose=False)
    print(f"[*] Sent SYN to {target_ip}:{target_port}")


if __name__ == "__main__":
    send_syn("192.168.56.10", 22)
Rate-Limited SYN Demo (Local Lab)
#!/usr/bin/env python3
"""
Educational SYN flood demonstration with strict rate limits.
"""
from __future__ import annotations
import time
from scapy.all import IP, TCP, send
def syn_demo(target_ip: str, target_port: int, count: int = 50, delay: float = 0.1) -> None:
    """
    Send a low-rate stream of SYN packets for learning (lab only).

    Args:
        target_ip: Destination address.
        target_port: Destination TCP port.
        count: Number of SYN packets to send.
        delay: Seconds to sleep after each packet.
    """
    first_port = 40000
    # Source ports cycle through 40000-65535 so a large ``count`` can never
    # produce a port number above the 16-bit maximum.
    port_span = 65536 - first_port
    for i in range(count):
        sport = first_port + (i % port_span)
        packet = IP(dst=target_ip) / TCP(dport=target_port, sport=sport, flags="S")
        send(packet, verbose=False)
        time.sleep(delay)
    print(f"[*] Completed demo ({count} packets)")


if __name__ == "__main__":
    syn_demo("192.168.56.10", 80)
Custom UDP Packet
#!/usr/bin/env python3
"""
Send a UDP packet with payload.
"""
from __future__ import annotations
from scapy.all import IP, UDP, Raw, send
def send_udp(target_ip: str, target_port: int, payload: bytes) -> None:
    """
    Send one UDP datagram carrying ``payload`` to ``target_ip:target_port``.
    """
    datagram = UDP(dport=target_port) / Raw(load=payload)
    send(IP(dst=target_ip) / datagram, verbose=False)
    print(f"[*] Sent UDP payload to {target_ip}:{target_port}")


if __name__ == "__main__":
    send_udp("192.168.56.10", 9999, b"TEST-UDP-PAYLOAD")
Section 4: Scanning & Discovery with ARP
ARP Scan for Local Network Discovery
#!/usr/bin/env python3
"""
ARP scan to identify live hosts on a local subnet.
"""
from __future__ import annotations
from typing import List, Dict
from scapy.all import ARP, Ether, srp
def arp_scan(network: str) -> List[Dict[str, str]]:
    """
    Broadcast an ARP request for ``network`` and return one
    ``{"ip": ..., "mac": ...}`` dict per reply received within 2 seconds.
    """
    probe = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=network)
    replies, _unanswered = srp(probe, timeout=2, verbose=False)
    # Each answered entry is a (sent, received) pair; only the reply is used.
    return [{"ip": reply.psrc, "mac": reply.hwsrc} for _sent, reply in replies]


if __name__ == "__main__":
    hosts = arp_scan("192.168.56.0/24")
    for host in hosts:
        print(f"{host['ip']} - {host['mac']}")
Ethical Focus
ARP scanning can be noisy. Limit scans to lab subnets and notify stakeholders if used in enterprise environments.
Section 5: DNS and TCP Session Analysis
DNS Query/Response Analyzer
#!/usr/bin/env python3
"""
Capture DNS queries and match with responses.
"""
from __future__ import annotations
from collections import defaultdict
from scapy.all import sniff, DNS, DNSQR, DNSRR
# DNS transaction ID -> query names seen but not yet answered.
pending = defaultdict(list)


def handle_dns(packet) -> None:
    """Print each DNS question, then print it again with its answers when the response arrives."""
    if DNS not in packet:
        return
    message = packet[DNS]

    # Query: remember the name under its transaction ID.
    if message.qr == 0 and message.qd is not None:
        name = message[DNSQR].qname.decode(errors="ignore")
        pending[message.id].append(name)
        print(f"[Q] {name}")
        return

    # Response: collect rdata from the answer records and pair it with
    # every pending query that shares the transaction ID.
    if message.qr == 1 and message.an is not None:
        records = (message.an[idx] for idx in range(message.ancount))
        answers = [record.rdata for record in records if isinstance(record, DNSRR)]
        for name in pending.pop(message.id, []):
            print(f"[A] {name} -> {answers}")


if __name__ == "__main__":
    sniff(filter="udp port 53", prn=handle_dns, store=False)
TCP Session Tracker
#!/usr/bin/env python3
"""
Track TCP connections by 4-tuple.
"""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime
from scapy.all import sniff, IP, TCP
# (src, sport, dst, dport) -> packet counter and first-seen timestamp.
connections = defaultdict(lambda: {"packets": 0, "start": None})


def handle_tcp(packet) -> None:
    """Count packets per directional TCP flow and report the count when a FIN is seen."""
    if not (IP in packet and TCP in packet):
        return

    network, transport = packet[IP], packet[TCP]
    key = (network.src, transport.sport, network.dst, transport.dport)
    record = connections[key]

    if record["start"] is None:
        record["start"] = datetime.utcnow().isoformat()
    record["packets"] += 1

    # Bit 0 of the TCP flags is FIN.
    if transport.flags & 1:
        total = record["packets"]
        print(f"[FIN] Flow {key} - packets={total}")


if __name__ == "__main__":
    sniff(filter="tcp", prn=handle_tcp, store=False)
Section 6: PCAP Capture and Analysis
Saving Packets to PCAP
#!/usr/bin/env python3
"""
Save captured packets to a PCAP file for offline analysis.
"""
from __future__ import annotations
from scapy.all import sniff, wrpcap
if __name__ == "__main__":
    # Capture up to 50 packets on the default interface and save them.
    packets = sniff(count=50)
    wrpcap("capture.pcap", packets)
    # Report the real count: an interrupted capture returns fewer than 50.
    print(f"[*] Saved {len(packets)} packets to capture.pcap")
Reading PCAP Files
#!/usr/bin/env python3
"""
Read a PCAP and summarize protocols.
"""
from __future__ import annotations
from collections import Counter
from scapy.all import rdpcap, IP, TCP, UDP
def summarize_pcap(path: str) -> None:
    """Read the PCAP at ``path`` and print how many IP packets carry each protocol number."""
    capture = rdpcap(path)
    # Counter keeps first-seen order, so the report lists protocols in the
    # order they first appear in the capture.
    tally = Counter(pkt[IP].proto for pkt in capture if IP in pkt)

    print("Protocol counts:")
    for number, seen in tally.items():
        print(f" - {number}: {seen}")


if __name__ == "__main__":
    summarize_pcap("capture.pcap")
Identify top talkers (src IP) and top destinations
Confirm expected ports and protocols are present
Look for repeated failed handshakes or resets
Export suspicious flows for deeper inspection
Section 7: TCP Handshake Analysis
Detecting Three-Way Handshakes
Tracking SYN, SYN-ACK, and ACK packets helps verify connection establishment and detect half-open scans.
Handshake Tracker
#!/usr/bin/env python3
"""
Track TCP handshakes and identify incomplete connections.
"""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Tuple
from scapy.all import sniff, IP, TCP
FlowKey = Tuple[str, int, str, int]
# Client-side flow (client ip, client port, server ip, server port) -> which
# handshake steps have been observed so far.
handshakes: Dict[FlowKey, Dict[str, bool]] = defaultdict(lambda: {"syn": False, "synack": False, "ack": False})
# Client-side flow -> time its most recent SYN was seen.
timestamps: Dict[FlowKey, datetime] = {}


def handle_handshake(packet) -> None:
    """
    Update handshake state from one sniffed packet.

    A bare SYN opens (or refreshes) an entry for its flow, a SYN-ACK is
    credited to the reverse (client-side) flow, and a bare ACK on a tracked
    flow records the final step. A completed handshake is reported once and
    its state is then discarded.
    """
    if IP not in packet or TCP not in packet:
        return
    ip = packet[IP]
    tcp = packet[TCP]
    flow = (ip.src, tcp.sport, ip.dst, tcp.dport)
    flags = int(tcp.flags)
    has_syn = bool(flags & 0x02)
    has_ack = bool(flags & 0x10)

    if has_syn and has_ack:
        # SYN-ACK travels server -> client, so it belongs to the flow that
        # the client's SYN created. It must be tested before the bare-SYN
        # case because it also has the SYN bit set.
        reverse = (ip.dst, tcp.dport, ip.src, tcp.sport)
        if reverse in handshakes:
            handshakes[reverse]["synack"] = True
    elif has_syn:
        handshakes[flow]["syn"] = True
        timestamps[flow] = datetime.utcnow()
    elif has_ack and flow in handshakes:
        # Only flows that started with an observed SYN are tracked; ordinary
        # data ACKs on other connections are ignored.
        status = handshakes[flow]
        status["ack"] = True
        if status["syn"] and status["synack"]:
            print(f"[✓] Complete handshake: {flow}")
            # Drop finished flows so they are reported once and state
            # does not grow without bound.
            handshakes.pop(flow, None)
            timestamps.pop(flow, None)
def detect_incomplete(timeout_sec: int = 10) -> None:
    """
    Report flows whose SYN is older than ``timeout_sec`` seconds and that
    never saw the final ACK, then forget every expired flow.
    """
    now = datetime.utcnow()
    limit = timedelta(seconds=timeout_sec)
    # Iterate over a snapshot because entries are removed inside the loop.
    for flow, started in list(timestamps.items()):
        if now - started <= limit:
            continue
        # .get() avoids creating a fresh defaultdict entry for unknown flows.
        status = handshakes.get(flow)
        if status is not None and status["syn"] and not status["ack"]:
            print(f"[!] Incomplete handshake: {flow} status={status}")
        timestamps.pop(flow, None)
        # Release the per-flow state as well so long captures do not leak memory.
        handshakes.pop(flow, None)
def main() -> None:
    """
    Run the live handshake tracker.

    Sniffs in batches and checks for stale handshakes after each batch. The
    batch ends after 20 packets or 5 seconds, whichever comes first, so the
    check still runs on a quiet network.
    """
    while True:
        sniff(count=20, timeout=5, prn=handle_handshake, store=False)
        detect_incomplete()


if __name__ == "__main__":
    main()
Section 8: ARP Spoofing Detection
Detecting MAC Address Changes
ARP spoofing often involves rapid MAC changes for the same IP. Track ARP replies and alert when a mapping changes.
ARP Monitor
#!/usr/bin/env python3
"""
Monitor ARP traffic and detect potential spoofing.
"""
from __future__ import annotations
from typing import Dict
from scapy.all import sniff, ARP
# IP address -> MAC address most recently announced for it in an ARP reply.
arp_table: Dict[str, str] = {}


def handle_arp(packet) -> None:
    """Record the IP-to-MAC mapping from each ARP reply and warn when a known mapping changes."""
    # op == 2 is "is-at", i.e. an ARP reply.
    if ARP not in packet or packet[ARP].op != 2:
        return

    reply = packet[ARP]
    address, hardware = reply.psrc, reply.hwsrc
    if address in arp_table and arp_table[address] != hardware:
        print(f"[!] ARP change detected: {address} {arp_table[address]} -> {hardware}")
    arp_table[address] = hardware


if __name__ == "__main__":
    print("[*] Monitoring ARP traffic...")
    sniff(filter="arp", prn=handle_arp, store=False)
Section 9: HTTP and TLS Metadata Extraction
HTTP Request Extractor (Plaintext Only)
#!/usr/bin/env python3
"""
Extract HTTP request lines from plaintext traffic.
"""
from __future__ import annotations
from scapy.all import sniff, TCP, Raw
def handle_http(packet) -> None:
    """
    Print the request line of plaintext HTTP requests.

    Only TCP packets whose raw payload starts with GET, POST, or HEAD are
    considered; payloads whose first line is not valid text are skipped.
    """
    if not (packet.haslayer(TCP) and packet.haslayer(Raw)):
        return
    payload = packet[Raw].load
    if not payload.startswith((b"GET", b"POST", b"HEAD")):
        return
    try:
        # The request line is everything before the first CRLF.
        request_line = payload.split(b"\r\n", 1)[0].decode()
    except UnicodeDecodeError:
        return
    print(f"HTTP: {request_line}")


if __name__ == "__main__":
    sniff(filter="tcp port 80", prn=handle_http, store=False)
TLS SNI Extractor (Metadata Only)
#!/usr/bin/env python3
"""
Extract TLS Server Name Indication (SNI) without decrypting payloads.
"""
from __future__ import annotations
from scapy.all import sniff, TCP, Raw
def parse_sni(payload: bytes) -> str | None:
    """
    Extract the Server Name Indication from a TLS ClientHello.

    The parser expects the whole ClientHello in ``payload`` (a single TLS
    record starting at byte 0). It returns the first server name as text, or
    ``None`` when the payload is not a ClientHello, is truncated before the
    name ends, has no SNI extension, or the name is not valid UTF-8.
    """
    total = len(payload)
    # Record header (5) + handshake header (4) + version (2) + random (32)
    # + session-ID length byte (1) must all be present.
    fixed_prefix = 5 + 4 + 2 + 32
    if total < fixed_prefix + 1:
        return None
    # 0x16 = handshake record; handshake type 0x01 = ClientHello. Other
    # handshake messages have a different layout and must not be parsed.
    if payload[0] != 0x16 or payload[5] != 0x01:
        return None

    try:
        pointer = fixed_prefix
        session_len = payload[pointer]
        pointer += 1 + session_len
        # Cipher suites: 2-byte length, then the list.
        cs_len = int.from_bytes(payload[pointer:pointer + 2], "big")
        pointer += 2 + cs_len
        # Compression methods: 1-byte length, then the list.
        comp_len = payload[pointer]
        pointer += 1 + comp_len
        # Extensions: 2-byte total length, then (type, size, data) entries.
        ext_len = int.from_bytes(payload[pointer:pointer + 2], "big")
        pointer += 2
        # Never walk past the bytes that were actually captured.
        end = min(pointer + ext_len, total)
        while pointer + 4 <= end:
            ext_type = int.from_bytes(payload[pointer:pointer + 2], "big")
            ext_size = int.from_bytes(payload[pointer + 2:pointer + 4], "big")
            pointer += 4
            if ext_type == 0x00:  # SNI
                # Skip list length (2) and name type (1).
                pointer += 2 + 1
                name_len = int.from_bytes(payload[pointer:pointer + 2], "big")
                pointer += 2
                name = payload[pointer:pointer + name_len]
                if len(name) != name_len:
                    # Name runs past the end of the payload: truncated capture.
                    return None
                return name.decode()
            pointer += ext_size
    except (IndexError, UnicodeDecodeError):
        return None
    return None
def handle_tls(packet) -> None:
    """Print the SNI host name of each sniffed TCP payload that parses as a ClientHello with SNI."""
    if packet.haslayer(TCP) and packet.haslayer(Raw):
        sni = parse_sni(packet[Raw].load)
        if sni:
            print(f"TLS SNI: {sni}")


if __name__ == "__main__":
    sniff(filter="tcp port 443", prn=handle_tls, store=False)
Section 10: Traffic Profiling with Pandas
Flow Summary Report
#!/usr/bin/env python3
"""
Convert PCAP to a flow summary and output CSV.
"""
from __future__ import annotations
from collections import Counter
from typing import Dict, Tuple
import pandas as pd
from scapy.all import rdpcap, IP, TCP, UDP
FlowKey = Tuple[str, int, str, int, str]


def pcap_to_flows(path: str) -> pd.DataFrame:
    """
    Aggregate the packets in a PCAP into directional 5-tuple flows.

    Args:
        path: PCAP file to read.

    Returns:
        A DataFrame with columns ``src``, ``sport``, ``dst``, ``dport``,
        ``proto`` and ``packets``, sorted by ``packets`` descending. The
        columns are present even when the capture has no TCP/UDP-over-IP
        packets, in which case the frame is empty.
    """
    packets = rdpcap(path)
    flow_counts: Counter[FlowKey] = Counter()
    for packet in packets:
        if IP not in packet:
            continue
        ip = packet[IP]
        if TCP in packet:
            proto = "TCP"
            sport, dport = packet[TCP].sport, packet[TCP].dport
        elif UDP in packet:
            proto = "UDP"
            sport, dport = packet[UDP].sport, packet[UDP].dport
        else:
            # Only TCP and UDP have ports to key on.
            continue
        flow_counts[(ip.src, sport, ip.dst, dport, proto)] += 1

    columns = ["src", "sport", "dst", "dport", "proto", "packets"]
    rows = [(*key, count) for key, count in flow_counts.items()]
    # Naming the columns explicitly keeps "packets" available for sorting
    # even when there are no rows.
    df = pd.DataFrame(rows, columns=columns)
    return df.sort_values("packets", ascending=False)
def main() -> None:
    """Summarize capture.pcap into flows, save them as flow_summary.csv, and print the top ten."""
    flows = pcap_to_flows("capture.pcap")
    flows.to_csv("flow_summary.csv", index=False)
    print(flows.head(10))


if __name__ == "__main__":
    main()
Charting Top Conversations
#!/usr/bin/env python3
"""
Create a bar chart of top TCP destinations.
"""
from __future__ import annotations
import pandas as pd
import matplotlib.pyplot as plt
def chart_top_destinations(path: str) -> None:
    """
    Read a flow-summary CSV and save a bar chart of the ten destination
    ports with the highest total packet count to ``top_ports.png``.
    """
    flows = pd.read_csv(path)
    per_port = flows.groupby("dport")["packets"].sum()
    busiest = per_port.sort_values(ascending=False).head(10)

    busiest.plot(kind="bar")
    plt.title("Top Destination Ports")
    plt.xlabel("Destination Port")
    plt.ylabel("Packet Count")
    plt.tight_layout()
    plt.savefig("top_ports.png")
    print("[*] Saved chart to top_ports.png")


if __name__ == "__main__":
    chart_top_destinations("flow_summary.csv")
Section 11: ICMP and Traceroute
ICMP Echo Sweep
#!/usr/bin/env python3
"""
ICMP ping sweep for lab discovery (authorized only).
"""
from __future__ import annotations
import ipaddress
from typing import List
from scapy.all import IP, ICMP, sr1
def ping_host(ip: str, timeout: float = 1.0) -> bool:
    """
    Send one ICMP packet to ``ip`` and report whether any response arrived
    within ``timeout`` seconds.
    """
    reply = sr1(IP(dst=ip) / ICMP(), timeout=timeout, verbose=False)
    if reply is None:
        return False
    return True
def sweep(network: str) -> List[str]:
    """
    Ping every host address in ``network`` and return those that answered,
    printing each responder as it is found.
    """
    responders: List[str] = []
    for address in map(str, ipaddress.ip_network(network).hosts()):
        if not ping_host(address):
            continue
        responders.append(address)
        print(f"[+] Alive: {address}")
    return responders


if __name__ == "__main__":
    sweep("192.168.56.0/24")
Traceroute with Increasing TTL
#!/usr/bin/env python3
"""
Simple traceroute implementation using TTL.
"""
from __future__ import annotations
from scapy.all import IP, ICMP, sr1
def traceroute(target: str, max_hops: int = 20) -> None:
    """
    Print the hop-by-hop route to ``target`` using ICMP probes with
    increasing TTL.

    Args:
        target: Destination address or host name.
        max_hops: Highest TTL to try before giving up.
    """
    for ttl in range(1, max_hops + 1):
        packet = IP(dst=target, ttl=ttl) / ICMP()
        response = sr1(packet, timeout=1, verbose=False)
        if response is None:
            # No answer for this TTL within the timeout.
            print(f"{ttl}: *")
            continue
        print(f"{ttl}: {response.src}")
        # An echo-reply (ICMP type 0) means the destination itself answered.
        # The address comparison alone fails when ``target`` is a host name.
        reached = response.haslayer(ICMP) and response[ICMP].type == 0
        if reached or response.src == target:
            break


if __name__ == "__main__":
    traceroute("8.8.8.8")
Section 12: Checksums and Integrity
Why Checksums Matter
Checksums detect accidental corruption and tampering. Packet analyzers can verify or recompute checksums when crafting packets.
Checksum Verification Example
#!/usr/bin/env python3
"""
Verify IP and TCP checksums in captured packets.
"""
from __future__ import annotations
from scapy.all import rdpcap, IP, TCP
def verify_checksums(path: str) -> None:
    """
    Compare the stored IP and TCP checksums of each TCP/IP packet in a PCAP
    with freshly recomputed values and print the packets that differ.

    NOTE(review): captures taken on the sending host may show mismatches
    caused by checksum offload rather than corruption — confirm before alerting.
    """
    packets = rdpcap(path)
    for packet in packets:
        if IP in packet and TCP in packet:
            ip = packet[IP]
            tcp = packet[TCP]
            original_ip = ip.chksum
            original_tcp = tcp.chksum
            # Clearing the fields lets Scapy fill them in again when the
            # layers are rebuilt to bytes below.
            del ip.chksum
            del tcp.chksum
            rebuilt = IP(bytes(ip))
            rebuilt_tcp = TCP(bytes(tcp))
            if rebuilt.chksum != original_ip or rebuilt_tcp.chksum != original_tcp:
                print(f"[!] Checksum mismatch: {ip.src} -> {ip.dst}")


if __name__ == "__main__":
    verify_checksums("capture.pcap")
Section 13: IDS-Style Signatures
Simple Payload Signature
#!/usr/bin/env python3
"""
Detect payload patterns in network traffic.
"""
from __future__ import annotations
from scapy.all import sniff, Raw
# Byte patterns to look for; all lowercase because payloads are lowercased
# before matching.
SIGNATURES = [
    b"/etc/passwd",
    b"union select",
    b"powershell -enc",
]


def handle_payload(packet) -> None:
    """Print an alert for the first signature found in a packet's raw payload (case-insensitive)."""
    if packet.haslayer(Raw):
        payload = packet[Raw].load.lower()
        for sig in SIGNATURES:
            if sig in payload:
                print(f"[ALERT] Signature match: {sig.decode(errors='ignore')}")
                # One alert per packet is enough.
                break


if __name__ == "__main__":
    sniff(filter="tcp", prn=handle_payload, store=False)
#!/usr/bin/env python3
"""
Run tcpdump and parse output lines.
"""
from __future__ import annotations
import subprocess
def run_tcpdump() -> None:
    """
    Run tcpdump for 10 TCP packets and print its output.

    If tcpdump exits with a non-zero status (for example, missing capture
    privileges), the status and its stderr text are reported on stderr
    instead of being silently dropped.
    """
    import sys

    command = ["tcpdump", "-n", "-c", "10", "tcp"]
    result = subprocess.run(command, capture_output=True, text=True, check=False)
    print(result.stdout)
    if result.returncode != 0:
        print(
            f"[!] tcpdump exited with status {result.returncode}: {result.stderr.strip()}",
            file=sys.stderr,
        )


if __name__ == "__main__":
    run_tcpdump()
BPF Cheat Sheet
# Host or network
host 10.0.0.5
net 192.168.1.0/24
# Protocols
tcp
udp
icmp
# Ports
port 22
portrange 1-1024
# Combinations
tcp and port 443
udp and port 53
src host 10.0.0.5 and dst port 80
Section 15: Stream Reconstruction
Reassemble TCP Streams (Simple)
#!/usr/bin/env python3
"""
Reconstruct TCP payload streams from a PCAP.
"""
from __future__ import annotations
from collections import defaultdict
from typing import Dict, Tuple
from scapy.all import rdpcap, IP, TCP, Raw
FlowKey = Tuple[str, int, str, int]


def reconstruct_streams(path: str) -> Dict[FlowKey, bytes]:
    """
    Concatenate the raw TCP payloads in a PCAP per directional flow.

    Payloads are joined in capture order; sequence numbers are not
    consulted, so retransmissions and reordering are not corrected.
    """
    assembled: Dict[FlowKey, bytes] = defaultdict(bytes)
    for pkt in rdpcap(path):
        if not (IP in pkt and TCP in pkt and Raw in pkt):
            continue
        network, transport = pkt[IP], pkt[TCP]
        key = (network.src, transport.sport, network.dst, transport.dport)
        assembled[key] = assembled[key] + pkt[Raw].load
    return assembled
def save_streams(streams: Dict[FlowKey, bytes]) -> None:
    """
    Write each reconstructed stream to its own file in the current directory.

    Files are named ``stream_<n>_<src>_<sport>_to_<dst>_<dport>.bin`` where
    ``n`` counts from 1 in the mapping's iteration order.
    """
    for idx, (flow, data) in enumerate(streams.items(), start=1):
        src, sport, dst, dport = flow
        filename = f"stream_{idx}_{src}_{sport}_to_{dst}_{dport}.bin"
        with open(filename, "wb") as handle:
            handle.write(data)
        print(f"[*] Wrote {filename} ({len(data)} bytes)")


if __name__ == "__main__":
    streams = reconstruct_streams("capture.pcap")
    save_streams(streams)
Extract HTTP Payloads from Streams
#!/usr/bin/env python3
"""
Search reconstructed streams for HTTP headers.
"""
from __future__ import annotations
from typing import Dict, Tuple
def find_http(streams: Dict[Tuple[str, int, str, int], bytes]) -> None:
    """
    Print the HTTP header section of every stream that looks like HTTP.

    A stream qualifies if it contains ``HTTP/`` anywhere or starts with
    ``GET`` or ``POST``. The header section is everything before the first
    blank line; undecodable bytes are dropped when printing.
    """
    for flow, data in streams.items():
        if b"HTTP/" not in data and not data.startswith((b"GET", b"POST")):
            continue
        # Splitting and lenient decoding cannot fail, so no blanket
        # exception handler is needed here.
        header = data.split(b"\r\n\r\n", 1)[0]
        print(f"[HTTP] {flow}\n{header.decode(errors='ignore')}\n")


if __name__ == "__main__":
    example_streams = {
        ("10.0.0.5", 51515, "93.184.216.34", 80): b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
    }
    find_http(example_streams)
Rapid sequential port attempts from a single IP are a common scanning indicator. Track unique destination ports per source.
Scan Detector (Rolling Window)
#!/usr/bin/env python3
"""
Detect potential port scans based on unique ports per source IP.
"""
from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Set
from scapy.all import sniff, IP, TCP
WINDOW = timedelta(seconds=30)  # length of one counting window
THRESHOLD = 15  # distinct destination ports within a window that trigger an alert
# Source IP -> {"ports": set of destination ports, "first_seen": window start or None}.
seen_ports: Dict[str, dict] = defaultdict(lambda: {"ports": set(), "first_seen": None})


def handle_packet(packet) -> None:
    """
    Track distinct TCP destination ports per source IP and alert on scans.

    Each source gets a counting window of ``WINDOW``. An expired window is
    restarted before the current packet is counted, so the packet that
    triggers the restart is not lost. An alert is printed when a newly seen
    port brings the count to ``THRESHOLD`` or beyond.
    """
    if IP not in packet or TCP not in packet:
        return
    ip = packet[IP].src
    dport = packet[TCP].dport
    entry = seen_ports[ip]
    now = datetime.utcnow()

    if entry["first_seen"] is None or now - entry["first_seen"] > WINDOW:
        entry["ports"].clear()
        entry["first_seen"] = now

    known_before = len(entry["ports"])
    entry["ports"].add(dport)
    # Alert only when this packet added a new port, not on every packet
    # that follows once the threshold has been crossed.
    if len(entry["ports"]) > known_before and len(entry["ports"]) >= THRESHOLD:
        print(f"[ALERT] Port scan detected from {ip} ({len(entry['ports'])} ports)")


if __name__ == "__main__":
    sniff(filter="tcp", prn=handle_packet, store=False)
Section 17: RTT and Latency Measurement
Measuring RTT with ICMP
#!/usr/bin/env python3
"""
Measure ICMP round-trip times and compute statistics.
"""
from __future__ import annotations
import statistics
import time
from typing import List
from scapy.all import IP, ICMP, sr1
def ping_rtt(host: str, count: int = 5) -> List[float]:
    """
    Send ``count`` ICMP probes to ``host`` and return the round-trip time,
    in milliseconds, of each probe that was answered within 1 second.

    The measurement brackets the whole ``sr1`` call, so it includes Scapy's
    own send/receive overhead.
    """
    rtts = []
    for _ in range(count):
        # perf_counter is monotonic, so system clock adjustments cannot
        # distort (or make negative) the measured interval.
        start = time.perf_counter()
        reply = sr1(IP(dst=host) / ICMP(), timeout=1, verbose=False)
        if reply:
            rtts.append((time.perf_counter() - start) * 1000)
    return rtts
def summarize_rtt(rtts: List[float]) -> None:
    """
    Print the minimum, maximum, and mean of ``rtts`` (milliseconds), or a
    notice when the list is empty.
    """
    if len(rtts) == 0:
        print("[!] No replies")
        return

    fastest = min(rtts)
    slowest = max(rtts)
    average = statistics.mean(rtts)
    print(f"Min: {fastest:.2f} ms")
    print(f"Max: {slowest:.2f} ms")
    print(f"Avg: {average:.2f} ms")


if __name__ == "__main__":
    results = ping_rtt("8.8.8.8")
    summarize_rtt(results)
Sample RTT Output
Min: 11.24 ms
Max: 18.93 ms
Avg: 14.65 ms
Lab 09: Packet Analysis Toolkit (90-150 minutes)
Lab Safety: All exercises must be done in a private VM lab or isolated Docker network. Obtain explicit authorization for any network you test.
Lab Part 1: Custom Packet Sniffer (25-35 min)
Objective: Build a sniffer that filters and logs traffic to CSV.
Requirements:
Support BPF filters via CLI args
Extract src/dst IP, ports, protocol, flags
Write to sniff_log.csv
Success Criteria: CSV updates in real time while traffic flows.
Hint: CSV logging
with open("sniff_log.csv", "a", encoding="utf-8") as handle:
    # Append mode opens at the end of the file, so position 0 means the file
    # is new or empty. Writing the header only then stops it from being
    # repeated in the middle of the log on later runs.
    if handle.tell() == 0:
        handle.write("timestamp,src,dst,proto,info\n")
    # Flush so the file updates while the capture is still running.
    handle.flush()
Lab Part 2: ARP Scanner (20-30 min)
Objective: Discover live hosts in your lab subnet.
Requirements:
Accept subnet input (e.g., 192.168.56.0/24)
Return IP and MAC addresses
Save results to arp_results.json
Success Criteria: All known lab hosts appear in results.
Hint: JSON output
import json
# `results` is not defined in this snippet; presumably it is the list of
# {"ip", "mac"} dicts returned by your ARP scan — confirm against your code.
with open("arp_results.json", "w", encoding="utf-8") as handle:
    json.dump(results, handle, indent=2)
Lab Part 3: DNS Analyzer (20-30 min)
Objective: Track DNS queries and responses to detect suspicious domains.
Requirements:
Capture DNS traffic on UDP 53
Log query domain and response IPs
Highlight newly seen domains
Success Criteria: Output includes query/response pairs with timestamps.
Hint: Seen domains set
# Domains already reported; create this once, outside the packet callback.
seen = set()
# `query` is not defined in this snippet; presumably the decoded DNS query
# name from your handler — confirm against your code.
if query not in seen:
    print(f"[NEW] {query}")
    seen.add(query)
Lab Part 4: SYN Flood Demo (20-30 min)
Objective: Demonstrate a low-rate SYN flood in a controlled lab.
Requirements:
Rate-limit to <= 10 packets/second
Log timestamps and packet counts
Stop automatically after 60 seconds
Success Criteria: Target host logs show SYN spike without service disruption.