Skip to content
CSY105 Week 03 Beginner

Note: For CPU-bound tasks, use multiprocessing instead of threading. Python's Global Interpreter Lock (GIL) limits threading to I/O-bound operations like network scanning.

Programming for Security

Track your progress through this week's content

Learning Objectives

By the end of this week, you will be able to:

  • Design and implement functions for security operations
  • Create reusable Python modules for scanning and analysis
  • Handle errors gracefully in network operations
  • Use threading for concurrent port scanning
  • Build command-line tools with argparse
  • Document code professionally with docstrings

1) Functions: Building Blocks of Security Tools

Why Functions Matter in Security

Professional security tools aren't monolithic scripts—they're collections of well-designed, reusable functions. Frameworks like Metasploit, Impacket, and Scapy are built on thousands of small, focused functions.

Function Basics

def check_port(host, port, timeout=2):
    """
    Check if a TCP port is open on a target host.

    Args:
        host (str): Target hostname or IP address
        port (int): Port number to check (1-65535)
        timeout (float): Connection timeout in seconds

    Returns:
        bool: True if port is open, False otherwise

    Example:
        >>> check_port("scanme.nmap.org", 80)
        True
    """
    import socket

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0
    except socket.error:
        return False

Function Design Principles

Type Hints for Security Functions

from typing import List, Dict, Optional, Tuple

def scan_ports(host: str, ports: List[int], timeout: float = 2) -> Dict[int, bool]:
    """
    Scan multiple ports on a single host.

    Args:
        host: Target IP or hostname
        ports: List of port numbers to scan
        timeout: Seconds to wait per port

    Returns:
        Dictionary mapping port numbers to open/closed status

    Raises:
        ValueError: If port numbers are out of range
    """
    if not all(1 <= p <= 65535 for p in ports):
        raise ValueError("Port numbers must be between 1 and 65535")

    results = {}
    for port in ports:
        results[port] = check_port(host, port, timeout)

    return results

def parse_banner(banner_bytes: bytes) -> Optional[str]:
    """
    Parse service banner from raw bytes.

    Returns:
        Decoded banner string, or None if parsing fails
    """
    try:
        return banner_bytes.decode('utf-8', errors='ignore').strip()
    except:
        return None

Return Values: Tuples for Multiple Results

def analyze_vulnerability(cve: str, cvss: float) -> Tuple[str, str, str]:
    """
    Classify vulnerability severity and recommend action.

    Returns:
        tuple: (severity_level, risk_color, action_required)
    """
    if cvss >= 9.0:
        return ("CRITICAL", "red", "Patch immediately")
    elif cvss >= 7.0:
        return ("HIGH", "orange", "Patch within 7 days")
    elif cvss >= 4.0:
        return ("MEDIUM", "yellow", "Patch within 30 days")
    else:
        return ("LOW", "green", "Scheduled maintenance")

# Using the function
severity, color, action = analyze_vulnerability("CVE-2023-1234", 8.5)
print(f"{severity}: {action}")

Default Arguments and Keyword Arguments

def banner_grab(host: str, port: int, timeout: float = 2, send_data: bytes = b''):
    """Grab service banner with optional data to send."""
    import socket

    try:
        sock = socket.socket()
        sock.settimeout(timeout)
        sock.connect((host, port))

        if send_data:
            sock.send(send_data)

        banner = sock.recv(1024)
        sock.close()
        return banner
    except:
        return None

# Different ways to call
banner1 = banner_grab("example.com", 80)  # Use defaults
banner2 = banner_grab("example.com", 80, timeout=5)  # Override timeout
banner3 = banner_grab("example.com", 80, send_data=b"HEAD / HTTP/1.0\r\n\r\n")  # HTTP request
banner4 = banner_grab(host="example.com", port=80, timeout=3)  # Keyword args

*args and **kwargs for Flexible Functions

def log_event(level: str, *messages, **metadata):
    """
    Flexible logging function.

    Args:
        level: Log level (INFO, WARNING, ERROR)
        *messages: Variable number of message strings
        **metadata: Additional key-value pairs to log
    """
    import datetime

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    message = " ".join(str(m) for m in messages)

    log_entry = f"[{timestamp}] {level}: {message}"

    if metadata:
        meta_str = " | ".join(f"{k}={v}" for k, v in metadata.items())
        log_entry += f" | {meta_str}"

    print(log_entry)

# Usage
log_event("INFO", "Scan started")
log_event("WARNING", "Port", 22, "timeout", host="192.168.1.1")
log_event("ERROR", "Connection failed", host="10.0.0.5", port=443, error="Timeout")

2) Error Handling: Robust Security Tools

Network operations fail constantly: hosts are down, ports are filtered, DNS doesn't resolve. Professional security tools handle errors gracefully—they don't crash mid-scan.

Try-Except-Finally Pattern

def safe_connect(host: str, port: int) -> Tuple[bool, str]:
    """
    Attempt connection with comprehensive error handling.

    Returns:
        tuple: (success: bool, message: str)
    """
    import socket

    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(2)
        sock.connect((host, port))
        return (True, f"Connected to {host}:{port}")

    except socket.timeout:
        return (False, "Connection timed out")

    except socket.gaierror:
        return (False, f"Could not resolve hostname: {host}")

    except ConnectionRefusedError:
        return (False, "Connection refused (port closed)")

    except PermissionError:
        return (False, "Permission denied (requires root for ports < 1024)")

    except Exception as e:
        return (False, f"Unexpected error: {type(e).__name__}: {e}")

    finally:
        # This ALWAYS runs, even if return happens in try
        if sock:
            sock.close()

Raising Exceptions for Invalid Input

def validate_ip_address(ip: str) -> bool:
    """
    Validate IPv4 address format.

    Raises:
        ValueError: If IP format is invalid
    """
    import re

    pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
    if not re.match(pattern, ip):
        raise ValueError(f"Invalid IP address format: {ip}")

    # Check each octet is 0-255
    octets = [int(x) for x in ip.split('.')]
    if not all(0 <= octet <= 255 for octet in octets):
        raise ValueError(f"IP octets must be 0-255: {ip}")

    return True

# Using the function
try:
    validate_ip_address("192.168.1.1")  # OK
    validate_ip_address("999.999.999.999")  # Raises ValueError
except ValueError as e:
    print(f"Validation error: {e}")

Custom Exceptions for Security Tools

class ScanError(Exception):
    """Base exception for scanning errors."""
    pass

class TargetUnreachable(ScanError):
    """Raised when target host is unreachable."""
    pass

class InvalidPortRange(ScanError):
    """Raised when port range is invalid."""
    pass

def scan_target(host: str, start_port: int, end_port: int):
    """
    Scan port range on target.

    Raises:
        InvalidPortRange: If port range is invalid
        TargetUnreachable: If host doesn't respond to ping
    """
    if start_port > end_port:
        raise InvalidPortRange(f"Start port {start_port} > end port {end_port}")

    if not ping_host(host):
        raise TargetUnreachable(f"Host {host} is unreachable")

    # Scan logic here
    pass

# Usage
try:
    scan_target("192.168.1.1", 80, 22)  # Invalid range
except InvalidPortRange as e:
    print(f"Configuration error: {e}")
except TargetUnreachable as e:
    print(f"Network error: {e}")
except ScanError as e:
    print(f"Scan error: {e}")

Context Managers for Resource Cleanup

import socket
from contextlib import contextmanager

@contextmanager
def secure_socket(host: str, port: int, timeout: float = 2):
    """
    Context manager for socket connections.
    Guarantees socket is closed even if exception occurs.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)

    try:
        sock.connect((host, port))
        yield sock  # Provide socket to 'with' block
    finally:
        sock.close()  # Always close, even on exception

# Usage
try:
    with secure_socket("example.com", 80) as sock:
        sock.send(b"GET / HTTP/1.0\r\n\r\n")
        response = sock.recv(1024)
        print(response.decode())
    # Socket is automatically closed here
except Exception as e:
    print(f"Error: {e}")

3) Modules: Creating Reusable Security Libraries

Why Modules?

Instead of copy-pasting scanner code into every script, create a module you can import. This is how frameworks like Impacket and Scapy work.

Creating a Module: sectools.py

Create a file sectools.py in your project directory:

#!/usr/bin/env python3
"""
sectools - Security utilities module
Contains common functions for scanning, analysis, and reporting
"""

import socket
from typing import List, Dict, Optional

__version__ = "1.0.0"
__author__ = "Your Name"

# Module-level constants
DEFAULT_TIMEOUT = 2
COMMON_PORTS = [21, 22, 23, 25, 80, 443, 3389, 8080]

def check_port(host: str, port: int, timeout: float = DEFAULT_TIMEOUT) -> bool:
    """Check if TCP port is open."""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0
    except:
        return False

def banner_grab(host: str, port: int) -> Optional[str]:
    """Grab service banner from port."""
    try:
        sock = socket.socket()
        sock.settimeout(2)
        sock.connect((host, port))
        banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
        sock.close()
        return banner
    except:
        return None

def scan_common_ports(host: str) -> Dict[int, bool]:
    """Quick scan of commonly used ports."""
    return {port: check_port(host, port) for port in COMMON_PORTS}

class VulnerabilityScanner:
    """Class-based scanner with state management."""

    def __init__(self, target: str, timeout: float = 2):
        self.target = target
        self.timeout = timeout
        self.results = {}

    def scan(self, ports: List[int]):
        """Scan specified ports and store results."""
        for port in ports:
            self.results[port] = check_port(self.target, port, self.timeout)

    def get_open_ports(self) -> List[int]:
        """Return list of open ports from last scan."""
        return [p for p, is_open in self.results.items() if is_open]

if __name__ == "__main__":
    # Test code (only runs when module is executed directly)
    print(f"sectools v{__version__}")
    print("Testing scanner...")
    results = scan_common_ports("scanme.nmap.org")
    print(f"Open ports: {[p for p, o in results.items() if o]}")

Using Your Module

In another file (e.g., my_scanner.py):

#!/usr/bin/env python3
import sectools

# Use module functions
if sectools.check_port("192.168.1.1", 80):
    print("Web server found")

# Use module constants
ports_to_scan = sectools.COMMON_PORTS

# Use module class
scanner = sectools.VulnerabilityScanner("example.com")
scanner.scan([80, 443, 8080])
print(f"Open ports: {scanner.get_open_ports()}")

# Access module metadata
print(f"Using sectools v{sectools.__version__}")

Import Variations

# Import entire module
import sectools
sectools.check_port("host", 80)

# Import specific functions
from sectools import check_port, banner_grab
check_port("host", 80)

# Import with alias
import sectools as st
st.check_port("host", 80)

# Import all (NOT RECOMMENDED - pollutes namespace)
from sectools import *
check_port("host", 80)  # Where did this come from?

Module Organization: Package Structure

# Directory structure for larger projects
security_toolkit/
    __init__.py          # Makes this a package
    scanning/
        __init__.py
        port_scanner.py
        service_detection.py
    exploitation/
        __init__.py
        payload_generator.py
    reporting/
        __init__.py
        html_report.py
        json_export.py

# Using the package
from security_toolkit.scanning import port_scanner
from security_toolkit.reporting import html_report

scanner = port_scanner.PortScanner()
report = html_report.generate(scanner.results)

4) Threading: Concurrent Scanning

Sequential scanning is slow: checking 1000 ports at 2 seconds each = 33 minutes. Threading allows concurrent connections, reducing scan time to seconds.

Basic Threading Example

import threading
import socket
from queue import Queue

# Shared data structure (thread-safe)
results = {}
results_lock = threading.Lock()  # Prevents race conditions

def scan_port_threaded(host: str, port: int):
    """Thread worker function to scan a single port."""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex((host, port))
        sock.close()

        # Thread-safe result storage
        with results_lock:
            results[port] = (result == 0)

    except:
        with results_lock:
            results[port] = False

def threaded_scan(host: str, ports: list, num_threads: int = 50):
    """
    Scan ports using multiple threads.

    Args:
        host: Target hostname/IP
        ports: List of ports to scan
        num_threads: Number of concurrent threads
    """
    threads = []

    for port in ports:
        # Create thread for each port
        thread = threading.Thread(target=scan_port_threaded, args=(host, port))
        threads.append(thread)
        thread.start()

        # Limit concurrent threads
        if len(threads) >= num_threads:
            for t in threads:
                t.join()  # Wait for threads to complete
            threads = []

    # Wait for remaining threads
    for thread in threads:
        thread.join()

    return results

# Usage
print("Starting threaded scan...")
import time
start = time.time()

scan_results = threaded_scan("scanme.nmap.org", range(1, 1001), num_threads=100)
open_ports = [p for p, is_open in scan_results.items() if is_open]

elapsed = time.time() - start
print(f"Scanned 1000 ports in {elapsed:.2f} seconds")
print(f"Open ports: {open_ports}")

Producer-Consumer Pattern with Queue

import threading
import queue
import socket

def worker_thread(work_queue: queue.Queue, results_dict: dict, lock: threading.Lock):
    """
    Worker thread pulls tasks from queue and processes them.
    Terminates when None is received.
    """
    while True:
        task = work_queue.get()

        if task is None:  # Poison pill to stop thread
            work_queue.task_done()
            break

        host, port = task

        # Scan the port
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            is_open = (sock.connect_ex((host, port)) == 0)
            sock.close()

            with lock:
                if host not in results_dict:
                    results_dict[host] = []
                if is_open:
                    results_dict[host].append(port)

        except:
            pass

        work_queue.task_done()  # Signal task completion

def multi_target_scan(targets: list, ports: list, num_workers: int = 50):
    """
    Scan multiple targets concurrently using worker pool.
    """
    work_queue = queue.Queue()
    results = {}
    lock = threading.Lock()

    # Start worker threads
    workers = []
    for _ in range(num_workers):
        worker = threading.Thread(target=worker_thread, args=(work_queue, results, lock))
        worker.start()
        workers.append(worker)

    # Add tasks to queue
    for target in targets:
        for port in ports:
            work_queue.put((target, port))

    # Wait for all tasks to complete
    work_queue.join()

    # Stop workers
    for _ in range(num_workers):
        work_queue.put(None)  # Poison pill

    for worker in workers:
        worker.join()

    return results

# Usage
targets = ["scanme.nmap.org", "example.com"]
ports = [21, 22, 80, 443]

results = multi_target_scan(targets, ports, num_workers=20)
for host, open_ports in results.items():
    print(f"{host}: {open_ports}")

Threading Best Practices

Note: For CPU-bound tasks, use multiprocessing instead of threading. Python's Global Interpreter Lock (GIL) limits threading to I/O-bound operations like network scanning.

5) Command-Line Tools with argparse

Professional security tools accept command-line arguments: targets, port ranges, output files. The argparse module makes this easy.

Basic argparse Example

#!/usr/bin/env python3
"""
Professional port scanner with command-line interface
"""
import argparse
import socket
from typing import List

def scan_ports(host: str, ports: List[int]) -> List[int]:
    """Scan ports and return open ones."""
    open_ports = []
    for port in ports:
        try:
            sock = socket.socket()
            sock.settimeout(1)
            if sock.connect_ex((host, port)) == 0:
                open_ports.append(port)
            sock.close()
        except:
            pass
    return open_ports

def main():
    # Create parser
    parser = argparse.ArgumentParser(
        description="Professional port scanner",
        epilog="Example: python3 scanner.py -t scanme.nmap.org -p 80,443"
    )

    # Add arguments
    parser.add_argument("-t", "--target", required=True,
                        help="Target IP address or hostname")

    parser.add_argument("-p", "--ports", default="80,443",
                        help="Ports to scan (comma-separated or range)")

    parser.add_argument("-o", "--output",
                        help="Output file for results")

    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable verbose output")

    parser.add_argument("--timeout", type=float, default=2.0,
                        help="Connection timeout in seconds")

    # Parse arguments
    args = parser.parse_args()

    # Process port specification
    if "-" in args.ports:
        # Range: "1-100"
        start, end = map(int, args.ports.split("-"))
        ports = list(range(start, end + 1))
    else:
        # List: "80,443,8080"
        ports = [int(p) for p in args.ports.split(",")]

    if args.verbose:
        print(f"Scanning {args.target} on {len(ports)} ports...")

    # Perform scan
    open_ports = scan_ports(args.target, ports)

    # Output results
    result_text = f"{args.target}: {open_ports}"
    print(result_text)

    if args.output:
        with open(args.output, "w") as f:
            f.write(result_text + "\n")
        print(f"Results saved to {args.output}")

if __name__ == "__main__":
    main()

Usage examples:

# Basic usage
python3 scanner.py -t scanme.nmap.org -p 80,443

# Port range
python3 scanner.py -t example.com -p 1-1024

# With output file
python3 scanner.py -t 192.168.1.1 -p 21-25 -o results.txt

# Verbose mode
python3 scanner.py -t example.com -p 80,443 -v --timeout 5

# Help message
python3 scanner.py --help

Advanced argparse Features

import argparse

parser = argparse.ArgumentParser(description="Advanced scanner")

# Positional arguments (required, no flag)
parser.add_argument("target", help="Target host")

# Optional arguments with defaults
parser.add_argument("-p", "--ports", default="80,443")

# Boolean flags
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("-q", "--quiet", action="store_true")

# Choices (limit to specific values)
parser.add_argument("--protocol", choices=["tcp", "udp"], default="tcp")

# Type conversion
parser.add_argument("--threads", type=int, default=50)
parser.add_argument("--timeout", type=float, default=2.0)

# File arguments
parser.add_argument("-i", "--input-file", type=argparse.FileType('r'))
parser.add_argument("-o", "--output-file", type=argparse.FileType('w'))

# Mutually exclusive groups
group = parser.add_mutually_exclusive_group()
group.add_argument("--fast", action="store_true")
group.add_argument("--thorough", action="store_true")

args = parser.parse_args()

# Access parsed arguments
print(f"Target: {args.target}")
print(f"Threads: {args.threads}")
print(f"Protocol: {args.protocol}")

6) Documentation: Professional Standards

Docstring Formats

Use Google-style or NumPy-style docstrings for consistency:

def scan_network(targets: List[str], ports: List[int], timeout: float = 2) -> Dict:
    """
    Scan multiple targets for open ports.

    This function performs concurrent TCP scans across multiple hosts and ports.
    It uses threading for performance and includes comprehensive error handling.

    Args:
        targets: List of IP addresses or hostnames to scan
        ports: List of port numbers (1-65535) to check
        timeout: Connection timeout in seconds (default: 2)

    Returns:
        Dictionary mapping each target to its list of open ports:
        {
            "192.168.1.1": [80, 443],
            "192.168.1.5": [22, 3389]
        }

    Raises:
        ValueError: If port numbers are out of valid range
        TypeError: If targets is not iterable

    Examples:
        >>> scan_network(["192.168.1.1"], [80, 443])
        {'192.168.1.1': [80, 443]}

        >>> scan_network(["example.com"], [80, 443], timeout=5)
        {'example.com': [80]}

    Note:
        This function requires raw socket access and may need elevated privileges
        for scanning ports below 1024 on Unix systems.

    See Also:
        check_port: Scan a single port
        banner_grab: Retrieve service information
    """
    # Implementation here
    pass

Module-Level Documentation

#!/usr/bin/env python3
"""
Security Toolkit - Network Scanning Module
============================================

This module provides comprehensive network scanning capabilities including:
- Port scanning (TCP/UDP)
- Service detection
- Banner grabbing
- OS fingerprinting

Author: Your Name
Version: 1.0.0
License: MIT

Example Usage:
    from security_toolkit import scanner

    # Quick scan of common ports
    results = scanner.quick_scan("192.168.1.1")

    # Full port scan with service detection
    results = scanner.full_scan("example.com", ports=range(1, 1025))

Dependencies:
    - Python 3.7+
    - socket (standard library)
    - threading (standard library)
"""

__version__ = "1.0.0"
__author__ = "Your Name"
__license__ = "MIT"

import socket
import threading
# ... rest of module

📝 Lab 3: Building a Professional Scanner Module

Part 1: Create sectools Module (50 minutes)

Objective: Build a reusable security tools library.

Create sectools.py with these functions:

  1. check_port(host, port, timeout) - Single port check
  2. banner_grab(host, port) - Get service banner
  3. scan_ports(host, ports, threaded=False) - Multi-port scan
  4. scan_common_ports(host) - Quick common port scan
  5. resolve_hostname(hostname) - DNS lookup with error handling

Requirements:

Part 2: Command-Line Scanner (40 minutes)

Objective: Build a professional CLI tool using your module.

Create portscan.py that:

Example usage:

python3 portscan.py -t scanme.nmap.org -p 1-1000 -T 100 -v
python3 portscan.py -t example.com -p 80,443,8080 -b -o results.txt

Part 3: Exception Handling (30 minutes)

Objective: Add robust error handling.

Enhance your portscan.py to handle:

Add custom exceptions:

class ScanError(Exception):
    """Base exception"""
    pass

class InvalidTarget(ScanError):
    """Invalid hostname/IP"""
    pass

class InvalidPortRange(ScanError):
    """Invalid port specification"""
    pass

Part 4: Multi-Target Scanner (30 minutes)

Objective: Scan multiple hosts efficiently.

Enhance portscan.py to accept:

Output format:

Host: 192.168.1.1
  Open ports: [80, 443]
  80/tcp: Apache/2.4.41
  443/tcp: nginx/1.18.0

Host: 192.168.1.5
  Open ports: [22, 3306]
  22/tcp: OpenSSH 7.6p1
  3306/tcp: MySQL 5.7

Summary:
  Hosts scanned: 2
  Total open ports: 4
  Time elapsed: 15.3 seconds

Success Criteria

📚 Resources & Further Reading

Essential Reading

Code Examples

Advanced Topics

Week 03 Quiz

Test your understanding of functions, modules, and code organization in Python.

Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.

Take Quiz