Skip to content
CSY105 Week 03 Beginner

Note: For CPU-bound tasks, use multiprocessing instead of threading. Python's Global Interpreter Lock (GIL) limits threading to I/O-bound operations like network scanning.

Programming for Security

Track your progress through this week's content

Python Modularity and Security Shield

Learning Objectives

By the end of this week, you will be able to:

  • Design and implement functions for security operations
  • Create reusable Python modules for scanning and analysis
  • Handle errors gracefully in network operations
  • Use threading for concurrent port scanning
  • Build command-line tools with argparse
  • Document code professionally with docstrings

1) Functions: Building Blocks of Security Tools

Why Functions Matter in Security

Professional security tools aren't monolithic scripts—they're collections of well-designed, reusable functions. Frameworks like Metasploit, Impacket, and Scapy are built on thousands of small, focused functions.

Function Basics

def check_port(host, port, timeout=2):
    """
    Check if a TCP port is open on a target host.

    Args:
        host (str): Target hostname or IP address
        port (int): Port number to check (1-65535)
        timeout (float): Connection timeout in seconds

    Returns:
        bool: True if port is open, False otherwise

    Example:
        >>> check_port("scanme.nmap.org", 80)
        True
    """
    import socket

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0
    except socket.error:
        return False

Function Design Principles

  • Single Responsibility: Each function does ONE thing well
  • Descriptive Names: check_port() not cp()
  • Type Hints: Document expected types (Python 3.5+)
  • Docstrings: Explain purpose, parameters, return values
  • Error Handling: Don't crash on invalid input

Type Hints for Security Functions

from typing import List, Dict, Optional, Tuple

def scan_ports(host: str, ports: List[int], timeout: float = 2) -> Dict[int, bool]:
    """
    Scan multiple ports on a single host.

    Args:
        host: Target IP or hostname
        ports: List of port numbers to scan
        timeout: Seconds to wait per port

    Returns:
        Dictionary mapping port numbers to open/closed status

    Raises:
        ValueError: If port numbers are out of range
    """
    if not all(1 <= p <= 65535 for p in ports):
        raise ValueError("Port numbers must be between 1 and 65535")

    results = {}
    for port in ports:
        results[port] = check_port(host, port, timeout)

    return results

def parse_banner(banner_bytes: bytes) -> Optional[str]:
    """
    Parse service banner from raw bytes.

    Returns:
        Decoded banner string, or None if parsing fails
    """
    try:
        return banner_bytes.decode('utf-8', errors='ignore').strip()
    except:
        return None

Return Values: Tuples for Multiple Results

def analyze_vulnerability(cve: str, cvss: float) -> Tuple[str, str, str]:
    """
    Classify vulnerability severity and recommend action.

    Returns:
        tuple: (severity_level, risk_color, action_required)
    """
    if cvss >= 9.0:
        return ("CRITICAL", "red", "Patch immediately")
    elif cvss >= 7.0:
        return ("HIGH", "orange", "Patch within 7 days")
    elif cvss >= 4.0:
        return ("MEDIUM", "yellow", "Patch within 30 days")
    else:
        return ("LOW", "green", "Scheduled maintenance")

# Using the function
severity, color, action = analyze_vulnerability("CVE-2023-1234", 8.5)
print(f"{severity}: {action}")

Default Arguments and Keyword Arguments

def banner_grab(host: str, port: int, timeout: float = 2, send_data: bytes = b''):
    """Grab service banner with optional data to send."""
    import socket

    try:
        sock = socket.socket()
        sock.settimeout(timeout)
        sock.connect((host, port))

        if send_data:
            sock.send(send_data)

        banner = sock.recv(1024)
        sock.close()
        return banner
    except:
        return None

# Different ways to call
banner1 = banner_grab("example.com", 80)  # Use defaults
banner2 = banner_grab("example.com", 80, timeout=5)  # Override timeout
banner3 = banner_grab("example.com", 80, send_data=b"HEAD / HTTP/1.0\r\n\r\n")  # HTTP request
banner4 = banner_grab(host="example.com", port=80, timeout=3)  # Keyword args

*args and **kwargs for Flexible Functions

def log_event(level: str, *messages, **metadata):
    """
    Flexible logging function.

    Args:
        level: Log level (INFO, WARNING, ERROR)
        *messages: Variable number of message strings
        **metadata: Additional key-value pairs to log
    """
    import datetime

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    message = " ".join(str(m) for m in messages)

    log_entry = f"[{timestamp}] {level}: {message}"

    if metadata:
        meta_str = " | ".join(f"{k}={v}" for k, v in metadata.items())
        log_entry += f" | {meta_str}"

    print(log_entry)

# Usage
log_event("INFO", "Scan started")
log_event("WARNING", "Port", 22, "timeout", host="192.168.1.1")
log_event("ERROR", "Connection failed", host="10.0.0.5", port=443, error="Timeout")

2) Error Handling: Robust Security Tools

Network operations fail constantly: hosts are down, ports are filtered, DNS doesn't resolve. Professional security tools handle errors gracefully—they don't crash mid-scan.

Try-Except-Finally Pattern

def safe_connect(host: str, port: int) -> Tuple[bool, str]:
    """
    Attempt connection with comprehensive error handling.

    Returns:
        tuple: (success: bool, message: str)
    """
    import socket

    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(2)
        sock.connect((host, port))
        return (True, f"Connected to {host}:{port}")

    except socket.timeout:
        return (False, "Connection timed out")

    except socket.gaierror:
        return (False, f"Could not resolve hostname: {host}")

    except ConnectionRefusedError:
        return (False, "Connection refused (port closed)")

    except PermissionError:
        return (False, "Permission denied (requires root for ports < 1024)")

    except Exception as e:
        return (False, f"Unexpected error: {type(e).__name__}: {e}")

    finally:
        # This ALWAYS runs, even if return happens in try
        if sock:
            sock.close()

Raising Exceptions for Invalid Input

def validate_ip_address(ip: str) -> bool:
    """
    Validate IPv4 address format.

    Raises:
        ValueError: If IP format is invalid
    """
    import re

    pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
    if not re.match(pattern, ip):
        raise ValueError(f"Invalid IP address format: {ip}")

    # Check each octet is 0-255
    octets = [int(x) for x in ip.split('.')]
    if not all(0 <= octet <= 255 for octet in octets):
        raise ValueError(f"IP octets must be 0-255: {ip}")

    return True

# Using the function
try:
    validate_ip_address("192.168.1.1")  # OK
    validate_ip_address("999.999.999.999")  # Raises ValueError
except ValueError as e:
    print(f"Validation error: {e}")

Custom Exceptions for Security Tools

class ScanError(Exception):
    """Base exception for scanning errors."""
    pass

class TargetUnreachable(ScanError):
    """Raised when target host is unreachable."""
    pass

class InvalidPortRange(ScanError):
    """Raised when port range is invalid."""
    pass

def scan_target(host: str, start_port: int, end_port: int):
    """
    Scan port range on target.

    Raises:
        InvalidPortRange: If port range is invalid
        TargetUnreachable: If host doesn't respond to ping
    """
    if start_port > end_port:
        raise InvalidPortRange(f"Start port {start_port} > end port {end_port}")

    if not ping_host(host):
        raise TargetUnreachable(f"Host {host} is unreachable")

    # Scan logic here
    pass

# Usage
try:
    scan_target("192.168.1.1", 80, 22)  # Invalid range
except InvalidPortRange as e:
    print(f"Configuration error: {e}")
except TargetUnreachable as e:
    print(f"Network error: {e}")
except ScanError as e:
    print(f"Scan error: {e}")

Context Managers for Resource Cleanup

import socket
from contextlib import contextmanager

@contextmanager
def secure_socket(host: str, port: int, timeout: float = 2):
    """
    Context manager for socket connections.
    Guarantees socket is closed even if exception occurs.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)

    try:
        sock.connect((host, port))
        yield sock  # Provide socket to 'with' block
    finally:
        sock.close()  # Always close, even on exception

# Usage
try:
    with secure_socket("example.com", 80) as sock:
        sock.send(b"GET / HTTP/1.0\r\n\r\n")
        response = sock.recv(1024)
        print(response.decode())
    # Socket is automatically closed here
except Exception as e:
    print(f"Error: {e}")

3) Modules: Creating Reusable Security Libraries

Why Modules?

Instead of copy-pasting scanner code into every script, create a module you can import. This is how frameworks like Impacket and Scapy work.

Creating a Module: sectools.py

Create a file sectools.py in your project directory:

#!/usr/bin/env python3
"""
sectools - Security utilities module
Contains common functions for scanning, analysis, and reporting
"""

import socket
from typing import List, Dict, Optional

__version__ = "1.0.0"
__author__ = "Your Name"

# Module-level constants
DEFAULT_TIMEOUT = 2
COMMON_PORTS = [21, 22, 23, 25, 80, 443, 3389, 8080]

def check_port(host: str, port: int, timeout: float = DEFAULT_TIMEOUT) -> bool:
    """Check if TCP port is open."""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0
    except:
        return False

def banner_grab(host: str, port: int) -> Optional[str]:
    """Grab service banner from port."""
    try:
        sock = socket.socket()
        sock.settimeout(2)
        sock.connect((host, port))
        banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
        sock.close()
        return banner
    except:
        return None

def scan_common_ports(host: str) -> Dict[int, bool]:
    """Quick scan of commonly used ports."""
    return {port: check_port(host, port) for port in COMMON_PORTS}

class VulnerabilityScanner:
    """Class-based scanner with state management."""

    def __init__(self, target: str, timeout: float = 2):
        self.target = target
        self.timeout = timeout
        self.results = {}

    def scan(self, ports: List[int]):
        """Scan specified ports and store results."""
        for port in ports:
            self.results[port] = check_port(self.target, port, self.timeout)

    def get_open_ports(self) -> List[int]:
        """Return list of open ports from last scan."""
        return [p for p, is_open in self.results.items() if is_open]

if __name__ == "__main__":
    # Test code (only runs when module is executed directly)
    print(f"sectools v{__version__}")
    print("Testing scanner...")
    results = scan_common_ports("scanme.nmap.org")
    print(f"Open ports: {[p for p, o in results.items() if o]}")

Using Your Module

In another file (e.g., my_scanner.py):

#!/usr/bin/env python3
import sectools

# Use module functions
if sectools.check_port("192.168.1.1", 80):
    print("Web server found")

# Use module constants
ports_to_scan = sectools.COMMON_PORTS

# Use module class
scanner = sectools.VulnerabilityScanner("example.com")
scanner.scan([80, 443, 8080])
print(f"Open ports: {scanner.get_open_ports()}")

# Access module metadata
print(f"Using sectools v{sectools.__version__}")

Import Variations

# Import entire module
import sectools
sectools.check_port("host", 80)

# Import specific functions
from sectools import check_port, banner_grab
check_port("host", 80)

# Import with alias
import sectools as st
st.check_port("host", 80)

# Import all (NOT RECOMMENDED - pollutes namespace)
from sectools import *
check_port("host", 80)  # Where did this come from?

Module Organization: Package Structure

# Directory structure for larger projects
security_toolkit/
    __init__.py          # Makes this a package
    scanning/
        __init__.py
        port_scanner.py
        service_detection.py
    exploitation/
        __init__.py
        payload_generator.py
    reporting/
        __init__.py
        html_report.py
        json_export.py

# Using the package
from security_toolkit.scanning import port_scanner
from security_toolkit.reporting import html_report

scanner = port_scanner.PortScanner()
report = html_report.generate(scanner.results)

4) Threading: Concurrent Scanning

Sequential scanning is slow: checking 1000 ports at 2 seconds each = 33 minutes. Threading allows concurrent connections, reducing scan time to seconds.

Basic Threading Example

import threading
import socket
from queue import Queue

# Shared data structure (thread-safe)
results = {}
results_lock = threading.Lock()  # Prevents race conditions

def scan_port_threaded(host: str, port: int):
    """Thread worker function to scan a single port."""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((host, port))
        sock.close()

        # Thread-safe result storage
        with results_lock:
            results[port] = (result == 0)

    except:
        with results_lock:
            results[port] = False

def threaded_scan(host: str, ports: list, num_threads: int = 50):
    """
    Scan ports using multiple threads.

    Args:
        host: Target hostname/IP
        ports: List of ports to scan
        num_threads: Number of concurrent threads
    """
    threads = []

    for port in ports:
        # Create thread for each port
        thread = threading.Thread(target=scan_port_threaded, args=(host, port))
        threads.append(thread)
        thread.start()

        # Limit concurrent threads
        if len(threads) >= num_threads:
            for t in threads:
                t.join()  # Wait for threads to complete
            threads = []

    # Wait for remaining threads
    for thread in threads:
        thread.join()

    return results

# Usage
print("Starting threaded scan...")
import time
start = time.time()

scan_results = threaded_scan("scanme.nmap.org", range(1, 1001), num_threads=100)
open_ports = [p for p, is_open in scan_results.items() if is_open]

elapsed = time.time() - start
print(f"Scanned 1000 ports in {elapsed:.2f} seconds")
print(f"Open ports: {open_ports}")

Producer-Consumer Pattern with Queue

import threading
import queue
import socket

def worker_thread(work_queue: queue.Queue, results_dict: dict, lock: threading.Lock):
    """
    Worker thread pulls tasks from queue and processes them.
    Terminates when None is received.
    """
    while True:
        task = work_queue.get()

        if task is None:  # Poison pill to stop thread
            work_queue.task_done()
            break

        host, port = task

        # Scan the port
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            is_open = (sock.connect_ex((host, port)) == 0)
            sock.close()

            with lock:
                if host not in results_dict:
                    results_dict[host] = []
                if is_open:
                    results_dict[host].append(port)

        except:
            pass

        work_queue.task_done()  # Signal task completion

def multi_target_scan(targets: list, ports: list, num_workers: int = 50):
    """
    Scan multiple targets concurrently using worker pool.
    """
    work_queue = queue.Queue()
    results = {}
    lock = threading.Lock()

    # Start worker threads
    workers = []
    for _ in range(num_workers):
        worker = threading.Thread(target=worker_thread, args=(work_queue, results, lock))
        worker.start()
        workers.append(worker)

    # Add tasks to queue
    for target in targets:
        for port in ports:
            work_queue.put((target, port))

    # Wait for all tasks to complete
    work_queue.join()

    # Stop workers
    for _ in range(num_workers):
        work_queue.put(None)  # Poison pill

    for worker in workers:
        worker.join()

    return results

# Usage
targets = ["scanme.nmap.org", "example.com"]
ports = [21, 22, 80, 443]

results = multi_target_scan(targets, ports, num_workers=20)
for host, open_ports in results.items():
    print(f"{host}: {open_ports}")

Threading Best Practices

  • Use locks when multiple threads access shared data
  • Limit thread count (50-200 threads for network scanning)
  • Handle exceptions in worker functions (don't crash threads)
  • Use Queue for task distribution (thread-safe)
  • Join threads to wait for completion
  • Test with small datasets first (avoid overwhelming network)

Note: For CPU-bound tasks, use multiprocessing instead of threading. Python's Global Interpreter Lock (GIL) limits threading to I/O-bound operations like network scanning.

5) Command-Line Tools with argparse

Professional security tools accept command-line arguments: targets, port ranges, output files. The argparse module makes this easy.

Basic argparse Example

#!/usr/bin/env python3
"""
Professional port scanner with command-line interface
"""
import argparse
import socket
from typing import List

def scan_ports(host: str, ports: List[int]) -> List[int]:
    """Scan ports and return open ones."""
    open_ports = []
    for port in ports:
        try:
            sock = socket.socket()
            sock.settimeout(1)
            if sock.connect_ex((host, port)) == 0:
                open_ports.append(port)
            sock.close()
        except:
            pass
    return open_ports

def main():
    # Create parser
    parser = argparse.ArgumentParser(
        description="Professional port scanner",
        epilog="Example: python3 scanner.py -t scanme.nmap.org -p 80,443"
    )

    # Add arguments
    parser.add_argument("-t", "--target", required=True,
                        help="Target IP address or hostname")

    parser.add_argument("-p", "--ports", default="80,443",
                        help="Ports to scan (comma-separated or range)")

    parser.add_argument("-o", "--output",
                        help="Output file for results")

    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable verbose output")

    parser.add_argument("--timeout", type=float, default=2.0,
                        help="Connection timeout in seconds")

    # Parse arguments
    args = parser.parse_args()

    # Process port specification
    if "-" in args.ports:
        # Range: "1-100"
        start, end = map(int, args.ports.split("-"))
        ports = list(range(start, end + 1))
    else:
        # List: "80,443,8080"
        ports = [int(p) for p in args.ports.split(",")]

    if args.verbose:
        print(f"Scanning {args.target} on {len(ports)} ports...")

    # Perform scan
    open_ports = scan_ports(args.target, ports)

    # Output results
    result_text = f"{args.target}: {open_ports}"
    print(result_text)

    if args.output:
        with open(args.output, "w") as f:
            f.write(result_text + "\n")
        print(f"Results saved to {args.output}")

if __name__ == "__main__":
    main()

Usage examples:

# Basic usage
python3 scanner.py -t scanme.nmap.org -p 80,443

# Port range
python3 scanner.py -t example.com -p 1-1024

# With output file
python3 scanner.py -t 192.168.1.1 -p 21-25 -o results.txt

# Verbose mode
python3 scanner.py -t example.com -p 80,443 -v --timeout 5

# Help message
python3 scanner.py --help

Advanced argparse Features

import argparse

parser = argparse.ArgumentParser(description="Advanced scanner")

# Positional arguments (required, no flag)
parser.add_argument("target", help="Target host")

# Optional arguments with defaults
parser.add_argument("-p", "--ports", default="80,443")

# Boolean flags
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("-q", "--quiet", action="store_true")

# Choices (limit to specific values)
parser.add_argument("--protocol", choices=["tcp", "udp"], default="tcp")

# Type conversion
parser.add_argument("--threads", type=int, default=50)
parser.add_argument("--timeout", type=float, default=2.0)

# File arguments
parser.add_argument("-i", "--input-file", type=argparse.FileType('r'))
parser.add_argument("-o", "--output-file", type=argparse.FileType('w'))

# Mutually exclusive groups
group = parser.add_mutually_exclusive_group()
group.add_argument("--fast", action="store_true")
group.add_argument("--thorough", action="store_true")

args = parser.parse_args()

# Access parsed arguments
print(f"Target: {args.target}")
print(f"Threads: {args.threads}")
print(f"Protocol: {args.protocol}")

6) Documentation: Professional Standards

Docstring Formats

Use Google-style or NumPy-style docstrings for consistency:

def scan_network(targets: List[str], ports: List[int], timeout: float = 2) -> Dict:
    """
    Scan multiple targets for open ports.

    This function performs concurrent TCP scans across multiple hosts and ports.
    It uses threading for performance and includes comprehensive error handling.

    Args:
        targets: List of IP addresses or hostnames to scan
        ports: List of port numbers (1-65535) to check
        timeout: Connection timeout in seconds (default: 2)

    Returns:
        Dictionary mapping each target to its list of open ports:
        {
            "192.168.1.1": [80, 443],
            "192.168.1.5": [22, 3389]
        }

    Raises:
        ValueError: If port numbers are out of valid range
        TypeError: If targets is not iterable

    Examples:
        >>> scan_network(["192.168.1.1"], [80, 443])
        {'192.168.1.1': [80, 443]}

        >>> scan_network(["example.com"], [80, 443], timeout=5)
        {'example.com': [80]}

    Note:
        This function requires raw socket access and may need elevated privileges
        for scanning ports below 1024 on Unix systems.

    See Also:
        check_port: Scan a single port
        banner_grab: Retrieve service information
    """
    # Implementation here
    pass

Module-Level Documentation

#!/usr/bin/env python3
"""
Security Toolkit - Network Scanning Module
============================================

This module provides comprehensive network scanning capabilities including:
- Port scanning (TCP/UDP)
- Service detection
- Banner grabbing
- OS fingerprinting

Author: Your Name
Version: 1.0.0
License: MIT

Example Usage:
    from security_toolkit import scanner

    # Quick scan of common ports
    results = scanner.quick_scan("192.168.1.1")

    # Full port scan with service detection
    results = scanner.full_scan("example.com", ports=range(1, 1025))

Dependencies:
    - Python 3.7+
    - socket (standard library)
    - threading (standard library)
"""

__version__ = "1.0.0"
__author__ = "Your Name"
__license__ = "MIT"

import socket
import threading
# ... rest of module

📝 Lab 3: Building a Professional Scanner Module

Part 1: Create sectools Module (50 minutes)

Objective: Build a reusable security tools library.

Create sectools.py with these functions:

  1. check_port(host, port, timeout) - Single port check
  2. banner_grab(host, port) - Get service banner
  3. scan_ports(host, ports, threaded=False) - Multi-port scan
  4. scan_common_ports(host) - Quick common port scan
  5. resolve_hostname(hostname) - DNS lookup with error handling

Requirements:

  • All functions have docstrings (Google format)
  • Type hints on all parameters and returns
  • Comprehensive error handling
  • Threading support in scan_ports()
  • __main__ block with self-tests

Part 2: Command-Line Scanner (40 minutes)

Objective: Build a professional CLI tool using your module.

Create portscan.py that:

  • Imports and uses your sectools module
  • Accepts arguments:
    • -t/--target (required): Target host
    • -p/--ports: Port list or range (default: common)
    • -o/--output: Output file
    • -T/--threads: Thread count (default: 50)
    • -v/--verbose: Verbose output
    • -b/--banner: Grab banners from open ports
  • Displays progress during scan
  • Shows summary (total scanned, open ports, time elapsed)

Example usage:

python3 portscan.py -t scanme.nmap.org -p 1-1000 -T 100 -v
python3 portscan.py -t example.com -p 80,443,8080 -b -o results.txt

Part 3: Exception Handling (30 minutes)

Objective: Add robust error handling.

Enhance your portscan.py to handle:

  • Invalid IP addresses/hostnames
  • Invalid port numbers (< 1 or > 65535)
  • Network timeouts
  • Permission errors (ports < 1024)
  • File write errors

Add custom exceptions:

class ScanError(Exception):
    """Base exception"""
    pass

class InvalidTarget(ScanError):
    """Invalid hostname/IP"""
    pass

class InvalidPortRange(ScanError):
    """Invalid port specification"""
    pass

Part 4: Multi-Target Scanner (30 minutes)

Objective: Scan multiple hosts efficiently.

Enhance portscan.py to accept:

  • Multiple targets: -t 192.168.1.1 192.168.1.5
  • Target file: --target-file hosts.txt
  • CIDR notation: -t 192.168.1.0/24 (bonus)

Output format:

Host: 192.168.1.1
  Open ports: [80, 443]
  80/tcp: Apache/2.4.41
  443/tcp: nginx/1.18.0

Host: 192.168.1.5
  Open ports: [22, 3306]
  22/tcp: OpenSSH 7.6p1
  3306/tcp: MySQL 5.7

Summary:
  Hosts scanned: 2
  Total open ports: 4
  Time elapsed: 15.3 seconds

Success Criteria

  • ✅ Created reusable module with proper documentation
  • ✅ Built CLI tool with argparse
  • ✅ Implemented threading for performance
  • ✅ Added comprehensive error handling
  • ✅ Tool handles edge cases gracefully
  • ✅ Code follows professional standards

📚 Resources & Further Reading

Essential Reading

Code Examples

Advanced Topics

Week 03 Quiz

Test your understanding of functions, modules, and code organization in Python.

Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.

Take Quiz