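Note: For CPU-bound tasks, use multiprocessing instead of threading. Python's Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time, so threads speed up I/O-bound work such as network scanning (the GIL is released while a thread waits on the network) but not CPU-bound work.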
Programming for Security
Track your progress through this week's content
Learning Objectives
By the end of this week, you will be able to:
Design and implement functions for security operations
Create reusable Python modules for scanning and analysis
Handle errors gracefully in network operations
Use threading for concurrent port scanning
Build command-line tools with argparse
Document code professionally with docstrings
1) Functions: Building Blocks of Security Tools
Why Functions Matter in Security
Professional security tools aren't monolithic scripts—they're collections of well-designed, reusable functions.
Frameworks like Metasploit, Impacket, and Scapy are built on thousands of small, focused functions.
Function Basics
def check_port(host, port, timeout=2):
    """
    Check if a TCP port is open on a target host.

    Args:
        host (str): Target hostname or IP address
        port (int): Port number to check (1-65535)
        timeout (float): Connection timeout in seconds

    Returns:
        bool: True if port is open, False otherwise (including when a
            socket-level error such as a failed name lookup occurs)

    Example:
        >>> check_port("scanme.nmap.org", 80)
        True
    """
    import socket

    try:
        # The context manager closes the socket on every path, including
        # when connect_ex() raises (e.g. socket.gaierror on a DNS failure).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex() reports ordinary connection failures as a
            # non-zero error code instead of raising; 0 means connected.
            return sock.connect_ex((host, port)) == 0
    except socket.error:
        return False
Function Design Principles
Single Responsibility: Each function does ONE thing well
from typing import List, Dict, Optional, Tuple
def scan_ports(host: str, ports: List[int], timeout: float = 2) -> Dict[int, bool]:
    """
    Probe every requested port on one host.

    All port numbers are validated before the first connection attempt, so
    an invalid list never results in a partial scan.

    Args:
        host: Target IP or hostname
        ports: List of port numbers to scan
        timeout: Seconds to wait per port

    Returns:
        Dictionary mapping each port number to True (open) or False (closed)

    Raises:
        ValueError: If any port number is outside 1-65535
    """
    if any(not 1 <= candidate <= 65535 for candidate in ports):
        raise ValueError("Port numbers must be between 1 and 65535")

    return {candidate: check_port(host, candidate, timeout) for candidate in ports}
def parse_banner(banner_bytes: bytes) -> Optional[str]:
    """
    Parse service banner from raw bytes.

    Args:
        banner_bytes: Raw bytes received from the service

    Returns:
        Decoded banner string with surrounding whitespace removed, or None
        if the input cannot be decoded (for example, None or a str was
        passed instead of bytes)
    """
    try:
        # errors='ignore' drops undecodable bytes rather than raising, so
        # arbitrary binary banners still yield their readable portion.
        return banner_bytes.decode('utf-8', errors='ignore').strip()
    except (AttributeError, TypeError, UnicodeError):
        # Named exceptions only: a bare except would also swallow
        # KeyboardInterrupt and SystemExit.
        return None
def banner_grab(host: str, port: int, timeout: float = 2, send_data: bytes = b'') -> Optional[bytes]:
    """
    Grab service banner with optional data to send.

    Args:
        host: Target hostname or IP address
        port: TCP port to connect to
        timeout: Seconds to wait for connect/send/receive
        send_data: Bytes to send before reading (empty = just listen)

    Returns:
        Up to 1024 bytes received from the service, or None if the
        connection, send, or receive failed
    """
    import socket

    try:
        # 'with' guarantees the socket is closed even when connect() or
        # recv() raises; the original leaked it on every failure.
        with socket.socket() as sock:
            sock.settimeout(timeout)
            sock.connect((host, port))
            if send_data:
                # sendall() retries until the whole payload is written;
                # send() may transmit only part of it.
                sock.sendall(send_data)
            return sock.recv(1024)
    except (OSError, OverflowError, ValueError):
        # OSError: network failures and timeouts; OverflowError: port
        # outside 0-65535; ValueError: host name that cannot be encoded.
        return None
# Different ways to call banner_grab(): positional, defaulted, and keyword arguments
banner1 = banner_grab("example.com", 80) # Use defaults (timeout=2, nothing sent)
banner2 = banner_grab("example.com", 80, timeout=5) # Override timeout, keep send_data default
banner3 = banner_grab("example.com", 80, send_data=b"HEAD / HTTP/1.0\r\n\r\n") # Send an HTTP request first, skipping timeout by keyword
banner4 = banner_grab(host="example.com", port=80, timeout=3) # Every argument passed by keyword
*args and **kwargs for Flexible Functions
def log_event(level: str, *messages, **metadata):
    """
    Print one timestamped log line.

    The line has the form ``[YYYY-MM-DD HH:MM:SS] LEVEL: message`` followed,
    when metadata is given, by `` | key=value`` pairs.

    Args:
        level: Log level (INFO, WARNING, ERROR)
        *messages: Any number of values; each is str()-converted and the
            results are joined with single spaces
        **metadata: Additional key-value pairs appended to the line
    """
    import datetime

    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    text = " ".join(map(str, messages))

    # Collect the segments first, then join once with the " | " separator.
    segments = [f"[{stamp}] {level}: {text}"]
    segments.extend(f"{key}={value}" for key, value in metadata.items())
    print(" | ".join(segments))
# Usage: positional values become the message, keyword arguments become key=value metadata
log_event("INFO", "Scan started")
log_event("WARNING", "Port", 22, "timeout", host="192.168.1.1")  # non-string values are str()-converted
log_event("ERROR", "Connection failed", host="10.0.0.5", port=443, error="Timeout")
2) Error Handling: Robust Security Tools
Network operations fail constantly: hosts are down, ports are filtered, DNS doesn't resolve.
Professional security tools handle errors gracefully—they don't crash mid-scan.
Try-Except-Finally Pattern
def safe_connect(host: str, port: int, timeout: float = 2) -> Tuple[bool, str]:
    """
    Attempt connection with comprehensive error handling.

    Args:
        host: Target hostname or IP address
        port: TCP port to connect to
        timeout: Seconds to wait for the connection (default: 2)

    Returns:
        tuple: (success: bool, message: str). This function never raises
        for a failed connection; the failure reason is in the message.
    """
    import socket

    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        sock.connect((host, port))
        return (True, f"Connected to {host}:{port}")
    # The specific handlers must come before the generic one: they are all
    # subclasses of Exception and Python picks the first match.
    except socket.timeout:
        return (False, "Connection timed out")
    except socket.gaierror:
        return (False, f"Could not resolve hostname: {host}")
    except ConnectionRefusedError:
        return (False, "Connection refused (port closed)")
    except PermissionError:
        # Root is needed to bind() ports below 1024, not to connect to
        # them; on connect() this means a local policy blocked the attempt.
        return (False, "Permission denied (connection blocked by local OS or firewall policy)")
    except Exception as e:
        return (False, f"Unexpected error: {type(e).__name__}: {e}")
    finally:
        # This ALWAYS runs, even if return happens in try
        if sock is not None:
            sock.close()
Raising Exceptions for Invalid Input
def validate_ip_address(ip: str) -> bool:
    """
    Validate IPv4 address format (dotted decimal, four octets).

    Args:
        ip: Candidate address, e.g. "192.168.1.1"

    Returns:
        True when the address is valid (invalid input raises instead)

    Raises:
        ValueError: If IP format is invalid or an octet is outside 0-255
    """
    import re

    # fullmatch() must consume the whole string; match() with '$' would
    # also accept a trailing newline ("1.2.3.4\n"). [0-9] is used rather
    # than \d because \d matches non-ASCII digits in str patterns.
    pattern = r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}'
    if not re.fullmatch(pattern, ip):
        raise ValueError(f"Invalid IP address format: {ip}")

    # The pattern only limits each octet to three digits; check the range.
    if any(int(octet) > 255 for octet in ip.split('.')):
        raise ValueError(f"IP octets must be 0-255: {ip}")
    return True
# Using the function: the first invalid address raises, so later calls in
# the same try block are skipped and control jumps to the handler.
try:
    validate_ip_address("192.168.1.1") # OK
    validate_ip_address("999.999.999.999") # Raises ValueError
except ValueError as e:
    print(f"Validation error: {e}")
Custom Exceptions for Security Tools
class ScanError(Exception):
    """Root of the scanner's exception hierarchy.

    Catch this type to handle any scanning failure in one place.
    """


class TargetUnreachable(ScanError):
    """Signals that the target host could not be reached."""


class InvalidPortRange(ScanError):
    """Signals that the requested port range is not valid."""
def scan_target(host: str, start_port: int, end_port: int):
    """
    Scan port range on target.

    Args:
        host: Target hostname or IP address
        start_port: First port of the range (inclusive)
        end_port: Last port of the range (inclusive)

    Raises:
        InvalidPortRange: If start_port > end_port, or either bound is
            outside 1-65535
        TargetUnreachable: If host doesn't respond to ping
    """
    # Validate the arguments before touching the network.
    if start_port > end_port:
        raise InvalidPortRange(f"Start port {start_port} > end port {end_port}")
    if start_port < 1 or end_port > 65535:
        raise InvalidPortRange(
            f"Ports must be between 1 and 65535: {start_port}-{end_port}"
        )
    # ping_host() is defined outside this snippet; presumably it returns a
    # truthy value when the host responds (confirm against its definition).
    if not ping_host(host):
        raise TargetUnreachable(f"Host {host} is unreachable")
    # Scan logic here
# Usage: list handlers from most specific to most general. ScanError is the
# base class of the other two, so it must come last or it would shadow them.
try:
    scan_target("192.168.1.1", 80, 22) # Invalid range
except InvalidPortRange as e:
    print(f"Configuration error: {e}")
except TargetUnreachable as e:
    print(f"Network error: {e}")
except ScanError as e:
    print(f"Scan error: {e}")
Context Managers for Resource Cleanup
import socket
from contextlib import contextmanager
@contextmanager
def secure_socket(host: str, port: int, timeout: float = 2):
    """
    Yield a connected TCP socket and close it when the block exits.

    The socket is closed whether the 'with' body finishes normally or
    raises, and also when the connection attempt itself fails.

    Args:
        host: Target hostname or IP address
        port: TCP port to connect to
        timeout: Socket timeout in seconds
    """
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn.settimeout(timeout)
    # A socket is its own context manager: leaving this block closes it.
    with conn:
        conn.connect((host, port))
        yield conn  # Hand the connected socket to the caller's 'with' body
# Usage: the outer try handles both connection errors raised on entering
# the 'with' block and errors raised by the send/recv/decode calls inside it.
try:
    with secure_socket("example.com", 80) as sock:
        sock.send(b"GET / HTTP/1.0\r\n\r\n")
        response = sock.recv(1024)  # reads at most 1024 bytes of the reply
        print(response.decode())
    # Socket is automatically closed here
except Exception as e:
    print(f"Error: {e}")
3) Modules: Creating Reusable Security Libraries
Why Modules?
Instead of copy-pasting scanner code into every script, create a module you can import.
This is how frameworks like Impacket and Scapy work.
Creating a Module: sectools.py
Create a file sectools.py in your project directory:
#!/usr/bin/env python3
"""
sectools - Security utilities module
Contains common functions for scanning, analysis, and reporting
"""
import socket
from typing import List, Dict, Optional
# Module metadata, readable by importers as sectools.__version__ / sectools.__author__
__version__ = "1.0.0"
__author__ = "Your Name"
# Module-level constants
# Default socket timeout, in seconds, used by check_port()
DEFAULT_TIMEOUT = 2
# Ports probed by scan_common_ports()
COMMON_PORTS = [21, 22, 23, 25, 80, 443, 3389, 8080]
def check_port(host: str, port: int, timeout: float = DEFAULT_TIMEOUT) -> bool:
    """Check if TCP port is open.

    Args:
        host: Target hostname or IP address
        port: TCP port number to probe
        timeout: Seconds to wait for the connection attempt

    Returns:
        True if the connection succeeded, False for any failure
    """
    try:
        # 'with' closes the socket even when connect_ex() raises (for
        # example on a failed name lookup); the original leaked it.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            return sock.connect_ex((host, port)) == 0
    except (OSError, OverflowError, ValueError):
        # OSError: network/DNS failures; OverflowError: port outside
        # 0-65535; ValueError: host name that cannot be encoded. A bare
        # except would also swallow KeyboardInterrupt.
        return False
def banner_grab(host: str, port: int, timeout: float = DEFAULT_TIMEOUT) -> Optional[str]:
    """Grab service banner from port.

    Args:
        host: Target hostname or IP address
        port: TCP port to connect to
        timeout: Seconds to wait for connect and receive

    Returns:
        The first chunk (up to 1024 bytes) sent by the service, decoded as
        UTF-8 with undecodable bytes dropped and whitespace stripped, or
        None if the connection or read failed
    """
    try:
        # 'with' closes the socket on every path, including failures.
        with socket.socket() as sock:
            sock.settimeout(timeout)
            sock.connect((host, port))
            raw = sock.recv(1024)
        return raw.decode('utf-8', errors='ignore').strip()
    except (OSError, OverflowError, ValueError):
        # OSError: network failures and timeouts; OverflowError: port
        # outside 0-65535; ValueError: host name that cannot be encoded.
        return None
def scan_common_ports(host: str) -> Dict[int, bool]:
    """Probe every port in COMMON_PORTS on *host*.

    Returns:
        Dictionary mapping each common port to True (open) or False.
    """
    status = {}
    for port in COMMON_PORTS:
        status[port] = check_port(host, port)
    return status
class VulnerabilityScanner:
    """Port scanner bound to a single target that remembers its findings.

    Results are kept in ``self.results`` (port -> bool). Each scan() call
    adds to or overwrites entries in that dict; it is never cleared.
    """

    def __init__(self, target: str, timeout: float = 2):
        self.results = {}
        self.timeout = timeout
        self.target = target

    def scan(self, ports: List[int]):
        """Check each port on the target and record whether it is open."""
        host, wait = self.target, self.timeout
        for port in ports:
            self.results[port] = check_port(host, port, wait)

    def get_open_ports(self) -> List[int]:
        """Return the recorded ports whose status is open."""
        open_ports = []
        for port, is_open in self.results.items():
            if is_open:
                open_ports.append(port)
        return open_ports
if __name__ == "__main__":
    # Test code (only runs when module is executed directly, not on import)
    print(f"sectools v{__version__}")
    print("Testing scanner...")
    results = scan_common_ports("scanme.nmap.org")
    # Keep only the ports whose status is True (open)
    print(f"Open ports: {[p for p, o in results.items() if o]}")
Using Your Module
In another file (e.g., my_scanner.py):
#!/usr/bin/env python3
import sectools
# Use module functions (qualified with the module name)
if sectools.check_port("192.168.1.1", 80):
    print("Web server found")
# Use module constants
ports_to_scan = sectools.COMMON_PORTS
# Use module class: the scanner stores results between calls
scanner = sectools.VulnerabilityScanner("example.com")
scanner.scan([80, 443, 8080])
print(f"Open ports: {scanner.get_open_ports()}")
# Access module metadata
print(f"Using sectools v{sectools.__version__}")
Import Variations
# Import entire module (names stay qualified: sectools.check_port)
import sectools
sectools.check_port("host", 80)
# Import specific functions (names are used unqualified)
from sectools import check_port, banner_grab
check_port("host", 80)
# Import with alias (shorter prefix, still qualified)
import sectools as st
st.check_port("host", 80)
# Import all (NOT RECOMMENDED - pollutes namespace and hides where names come from)
from sectools import *
check_port("host", 80) # Where did this come from?
Module Organization: Package Structure
# Directory structure for larger projects
security_toolkit/
__init__.py # Makes this a package
scanning/
__init__.py
port_scanner.py
service_detection.py
exploitation/
__init__.py
payload_generator.py
reporting/
__init__.py
html_report.py
json_export.py
# Using the package: import submodules by their dotted path
from security_toolkit.scanning import port_scanner
from security_toolkit.reporting import html_report
# NOTE(review): PortScanner and generate() are not defined in this material;
# they are assumed to exist in port_scanner.py and html_report.py.
scanner = port_scanner.PortScanner()
report = html_report.generate(scanner.results)
4) Threading: Concurrent Scanning
Sequential scanning is slow: in the worst case, when every connection attempt runs into the timeout, checking 1000 ports at 2 seconds each takes about 33 minutes.
Threading allows concurrent connections, reducing scan time to seconds.
Basic Threading Example
import threading
import socket
from queue import Queue
# Shared result store written by every worker thread. The dict is not
# protected by itself: all access must happen while holding results_lock.
results = {}
results_lock = threading.Lock() # Prevents race conditions
def scan_port_threaded(host: str, port: int):
    """Thread worker function to scan a single port.

    Stores True/False for *port* in the shared ``results`` dict; any
    failure is recorded as False.

    Args:
        host: Target hostname or IP address
        port: TCP port to probe
    """
    is_open = False
    try:
        # 'with' closes the socket even if connect_ex() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            is_open = sock.connect_ex((host, port)) == 0
    except Exception:
        # Thread boundary: an error must not kill the thread silently
        # without a result. Unlike a bare except, this lets
        # KeyboardInterrupt and SystemExit propagate.
        is_open = False
    # Thread-safe result storage
    with results_lock:
        results[port] = is_open
def threaded_scan(host: str, ports: list, num_threads: int = 50):
    """
    Scan ports using multiple threads.

    Threads are started in batches of ``num_threads``; each batch is
    joined before the next one starts.

    Args:
        host: Target hostname/IP
        ports: Ports to scan (any iterable of ints, e.g. a list or range)
        num_threads: Maximum number of concurrent threads

    Returns:
        Dictionary mapping each scanned port to True (open) or False.
        It is a snapshot of this scan only.
    """
    # The shared dict outlives a single call; without clearing it, results
    # of an earlier scan (possibly of another host) would be returned too.
    with results_lock:
        results.clear()

    batch = []
    for port in ports:
        # Create thread for each port
        thread = threading.Thread(target=scan_port_threaded, args=(host, port))
        batch.append(thread)
        thread.start()
        # Limit concurrent threads
        if len(batch) >= num_threads:
            for running in batch:
                running.join()  # Wait for the batch to complete
            batch = []
    # Wait for remaining threads
    for running in batch:
        running.join()

    # Return a copy so a later scan cannot mutate this caller's result.
    with results_lock:
        return dict(results)
# Usage: time a scan of ports 1-1000 with up to 100 concurrent threads
print("Starting threaded scan...")
import time
start = time.time()
scan_results = threaded_scan("scanme.nmap.org", range(1, 1001), num_threads=100)
open_ports = [p for p, is_open in scan_results.items() if is_open]
elapsed = time.time() - start
# The port count in the message is hard-coded to match range(1, 1001) above
print(f"Scanned 1000 ports in {elapsed:.2f} seconds")
print(f"Open ports: {open_ports}")
Producer-Consumer Pattern with Queue
import threading
import queue
import socket
def worker_thread(work_queue: queue.Queue, results_dict: dict, lock: threading.Lock):
    """
    Worker thread pulls tasks from queue and processes them.

    Each task is a ``(host, port)`` tuple. After a successful probe the
    host gets an entry in ``results_dict`` (a list of its open ports,
    possibly empty). Tasks that fail are skipped without an entry.
    Terminates when None is received.

    Args:
        work_queue: Queue of (host, port) tasks, ended by a None sentinel
        results_dict: Shared dict mapping host -> list of open ports
        lock: Lock guarding every access to results_dict
    """
    while True:
        task = work_queue.get()
        try:
            if task is None:  # Poison pill to stop thread
                break
            host, port = task
            # Scan the port; 'with' closes the socket on every path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                is_open = sock.connect_ex((host, port)) == 0
            with lock:
                open_ports = results_dict.setdefault(host, [])
                if is_open:
                    open_ports.append(port)
        except Exception:
            # Best effort: one bad task (network error, malformed tuple)
            # must not kill the worker. KeyboardInterrupt and SystemExit
            # still propagate, unlike with a bare except.
            pass
        finally:
            # Exactly one task_done() per get(), even for the poison pill
            # and for failed tasks; otherwise Queue.join() blocks forever.
            work_queue.task_done()
def multi_target_scan(targets: list, ports: list, num_workers: int = 50):
    """
    Scan multiple targets concurrently using worker pool.

    Args:
        targets: Hostnames or IP addresses to scan
        ports: Port numbers to probe on every target
        num_workers: Maximum number of worker threads (at least 1)

    Returns:
        Dictionary of results filled in by the worker threads, keyed by
        host. Empty when there is nothing to scan.

    Raises:
        ValueError: If there is work to do but num_workers < 1 (the queue
            would never be drained and the call would block forever)
    """
    tasks = [(target, port) for target in targets for port in ports]
    results = {}
    if not tasks:
        return results
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")

    work_queue = queue.Queue()
    lock = threading.Lock()

    # Start worker threads; never more than there are tasks.
    workers = []
    for _ in range(min(num_workers, len(tasks))):
        worker = threading.Thread(target=worker_thread, args=(work_queue, results, lock))
        worker.start()
        workers.append(worker)

    # Add tasks to queue
    for task in tasks:
        work_queue.put(task)

    # Wait for all tasks to complete
    work_queue.join()

    # Stop workers: one poison pill per started thread
    for _ in workers:
        work_queue.put(None)
    for worker in workers:
        worker.join()
    return results
# Usage: every port is probed on every target by a pool of 20 workers
targets = ["scanme.nmap.org", "example.com"]
ports = [21, 22, 80, 443]
results = multi_target_scan(targets, ports, num_workers=20)
# Each value is the list of open ports found for that host
for host, open_ports in results.items():
    print(f"{host}: {open_ports}")
Threading Best Practices
Use locks when multiple threads access shared data
Limit thread count (50-200 threads for network scanning)
Handle exceptions in worker functions (don't crash threads)
Use Queue for task distribution (thread-safe)
Join threads to wait for completion
Test with small datasets first (avoid overwhelming network)
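Note: For CPU-bound tasks, use multiprocessing instead of threading. Python's Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time, so threads speed up I/O-bound work such as network scanning (the GIL is released while a thread waits on the network) but not CPU-bound work.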
5) Command-Line Tools with argparse
Professional security tools accept command-line arguments: targets, port ranges, output files.
The argparse module makes this easy.
Basic argparse Example
#!/usr/bin/env python3
"""
Professional port scanner with command-line interface
"""
import argparse
import socket
from typing import List
def scan_ports(host: str, ports: List[int], timeout: float = 1.0) -> List[int]:
    """Scan ports and return open ones.

    Args:
        host: Target hostname or IP address
        ports: Port numbers to probe, in the order to try them
        timeout: Seconds to wait per connection attempt

    Returns:
        The subset of ports that accepted a TCP connection, in input order
    """
    open_ports = []
    for port in ports:
        try:
            # 'with' closes the socket even when connect_ex() raises.
            with socket.socket() as sock:
                sock.settimeout(timeout)
                if sock.connect_ex((host, port)) == 0:
                    open_ports.append(port)
        except (OSError, OverflowError, ValueError):
            # Best effort: a port that cannot be probed counts as not open.
            continue
    return open_ports


def _parse_ports(spec: str) -> List[int]:
    """Expand a spec such as "80,443", "1-100" or "22,8000-8010" to a port list.

    Raises:
        ValueError: If a token is not a number or range within 1-65535
    """
    ports = []
    for token in spec.split(","):
        token = token.strip()
        if "-" in token:
            first, _, last = token.partition("-")
            start, end = int(first), int(last)
        else:
            start = end = int(token)
        if not 1 <= start <= end <= 65535:
            raise ValueError(f"port or range out of bounds: {token!r}")
        ports.extend(range(start, end + 1))
    return ports


def main(argv=None):
    """Parse command-line arguments, run the scan, and report the results.

    Args:
        argv: Argument list to parse; None (the default) means sys.argv[1:]
    """
    # Create parser
    parser = argparse.ArgumentParser(
        description="Professional port scanner",
        epilog="Example: python3 scanner.py -t scanme.nmap.org -p 80,443"
    )
    # Add arguments
    parser.add_argument("-t", "--target", required=True,
                        help="Target IP address or hostname")
    parser.add_argument("-p", "--ports", default="80,443",
                        help="Ports to scan (comma-separated or range)")
    parser.add_argument("-o", "--output",
                        help="Output file for results")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable verbose output")
    parser.add_argument("--timeout", type=float, default=2.0,
                        help="Connection timeout in seconds")
    # Parse arguments
    args = parser.parse_args(argv)

    # Process port specification; a bad value becomes a usage error
    # (exit status 2) instead of an uncaught traceback.
    try:
        ports = _parse_ports(args.ports)
    except ValueError as exc:
        parser.error(f"invalid --ports value {args.ports!r}: {exc}")

    if args.verbose:
        print(f"Scanning {args.target} on {len(ports)} ports...")

    # Perform scan, honoring --timeout (it was parsed but never used before)
    open_ports = scan_ports(args.target, ports, timeout=args.timeout)

    # Output results
    result_text = f"{args.target}: {open_ports}"
    print(result_text)
    if args.output:
        with open(args.output, "w") as f:
            f.write(result_text + "\n")
        print(f"Results saved to {args.output}")


if __name__ == "__main__":
    main()
Use Google-style or NumPy-style docstrings for consistency:
def scan_network(targets: List[str], ports: List[int], timeout: float = 2) -> Dict:
    """
    Scan multiple targets for open ports.

    This function performs concurrent TCP scans across multiple hosts and ports.
    It uses threading for performance and includes comprehensive error handling.

    Args:
        targets: List of IP addresses or hostnames to scan
        ports: List of port numbers (1-65535) to check
        timeout: Connection timeout in seconds (default: 2)

    Returns:
        Dictionary mapping each target to its list of open ports:
        {
            "192.168.1.1": [80, 443],
            "192.168.1.5": [22, 3389]
        }

    Raises:
        ValueError: If port numbers are out of valid range
        TypeError: If targets is not iterable

    Examples:
        >>> scan_network(["192.168.1.1"], [80, 443])
        {'192.168.1.1': [80, 443]}
        >>> scan_network(["example.com"], [80, 443], timeout=5)
        {'example.com': [80]}

    Note:
        This function requires raw socket access and may need elevated privileges
        for scanning ports below 1024 on Unix systems.

    See Also:
        check_port: Scan a single port
        banner_grab: Retrieve service information
    """
    # NOTE(review): this is a documentation template. The body is a stub, so
    # the contract described above is not implemented here and the function
    # currently returns None.
    # Implementation here
    pass
Module-Level Documentation
#!/usr/bin/env python3
"""
Security Toolkit - Network Scanning Module
============================================
This module provides comprehensive network scanning capabilities including:
- Port scanning (TCP/UDP)
- Service detection
- Banner grabbing
- OS fingerprinting
Author: Your Name
Version: 1.0.0
License: MIT
Example Usage:
from security_toolkit import scanner
# Quick scan of common ports
results = scanner.quick_scan("192.168.1.1")
# Full port scan with service detection
results = scanner.full_scan("example.com", ports=range(1, 1025))
Dependencies:
- Python 3.7+
- socket (standard library)
- threading (standard library)
"""
__version__ = "1.0.0"
__author__ = "Your Name"
__license__ = "MIT"
import socket
import threading
# ... rest of module
📝 Lab 3: Building a Professional Scanner Module
Part 1: Create sectools Module (50 minutes)
Objective: Build a reusable security tools library.
Create sectools.py with these functions:
check_port(host, port, timeout) - Single port check
class ScanError(Exception):
    """Common base class for all sectools scanning errors."""


class InvalidTarget(ScanError):
    """The given hostname or IP address is not valid."""


class InvalidPortRange(ScanError):
    """The given port specification is not valid."""
Part 4: Multi-Target Scanner (30 minutes)
Objective: Scan multiple hosts efficiently.
Enhance portscan.py to accept:
Multiple targets: -t 192.168.1.1 192.168.1.5
Target file: --target-file hosts.txt
CIDR notation: -t 192.168.1.0/24 (bonus)
Output format:
Host: 192.168.1.1
Open ports: [80, 443]
80/tcp: Apache/2.4.41
443/tcp: nginx/1.18.0
Host: 192.168.1.5
Open ports: [22, 3306]
22/tcp: OpenSSH 7.6p1
3306/tcp: MySQL 5.7
Summary:
Hosts scanned: 2
Total open ports: 4
Time elapsed: 15.3 seconds
Success Criteria
✅ Created reusable module with proper documentation