Opening Framing: Beyond Local Files
Last week, you learned to read and write local files. But security data doesn't just live on your machine—it's distributed across the internet: threat intelligence APIs, vulnerability databases, WHOIS lookups, IP reputation services, and countless web resources.
This week, you'll learn to fetch data from the web, interact with APIs, and process external data sources. These skills let your scripts tap into the collective knowledge of the security community—checking IOCs against threat feeds, enriching alerts with context, and automating lookups that would take hours manually.
Working with external data introduces new challenges: network failures, rate limits, authentication, and unpredictable response formats. You'll apply the defensive coding skills from Week 8 to handle these gracefully.
Key insight: The most powerful security scripts don't work in isolation—they connect to external intelligence sources that provide context and enrichment.
1) HTTP Requests with the Requests Library
The requests library is the de facto standard for HTTP operations in Python. It is a third-party package (install: pip install requests) that handles the complexity of web communication behind a simple interface:
import requests
# Basic GET request
response = requests.get("https://httpbin.org/get")
print(response.status_code) # 200
print(response.text) # Response body as string
# Check if request succeeded
if response.status_code == 200:
    print("Success!")
elif response.status_code == 404:
    print("Not found")
elif response.status_code == 403:
    print("Forbidden - check authentication")
Common HTTP Status Codes:
200 - OK (success)
201 - Created (POST success)
400 - Bad Request (client error)
401 - Unauthorized (auth required)
403 - Forbidden (access denied)
404 - Not Found
429 - Too Many Requests (rate limited)
500 - Internal Server Error
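The codes above fall into families by their first digit, and a script can use that to decide what to do next. Here is a minimal sketch using only the standard library; the function name classify_status and its category labels are illustrative assumptions, not part of requests:

```python
from http import HTTPStatus


def classify_status(code):
    """Map an HTTP status code to a coarse category (labels are illustrative)."""
    if code == HTTPStatus.TOO_MANY_REQUESTS:  # 429: back off, then retry
        return "rate_limited"
    if 200 <= code < 300:
        return "success"
    if 400 <= code < 500:
        return "client_error"  # fix the request; retrying rarely helps
    if 500 <= code < 600:
        return "server_error"  # often transient; a retry may succeed
    return "other"


for code in (200, 201, 404, 429, 500):
    print(code, HTTPStatus(code).phrase, "->", classify_status(code))
```

Checking 429 before the general 4xx range matters here, because rate limiting is the one client-side code where waiting and retrying is usually the right response.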
# Request with parameters
params = {"ip": "8.8.8.8", "format": "json"}
response = requests.get("https://api.example.com/lookup", params=params)
# URL becomes: https://api.example.com/lookup?ip=8.8.8.8&format=json
# Request with headers (common for APIs)
headers = {
    "User-Agent": "SecurityScript/1.0",
    "Accept": "application/json"
}
response = requests.get("https://api.example.com/data", headers=headers)
Key insight: HTTP is the foundation of web APIs. Understanding requests, responses, and status codes is essential for working with any external data source.
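When you pass params to requests.get, the library encodes them into the query string for you. The standard-library sketch below shows the same idea so you can see what ends up in the URL; build_url is a hypothetical helper written for this illustration and assumes the base URL has no query string yet:

```python
from urllib.parse import urlencode


def build_url(base_url, params):
    """Append URL-encoded query parameters to a base URL (hypothetical helper)."""
    return f"{base_url}?{urlencode(params)}"


url = build_url("https://api.example.com/lookup", {"ip": "8.8.8.8", "format": "json"})
print(url)  # https://api.example.com/lookup?ip=8.8.8.8&format=json

# Special characters are escaped so a value cannot inject extra parameters
tricky = build_url("https://api.example.com/search", {"q": "evil domain&x=1"})
print(tricky)  # https://api.example.com/search?q=evil+domain%26x%3D1
```

This is why you should pass a dictionary instead of gluing values into the URL with an f-string: the encoding step keeps untrusted values from changing the structure of the request.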
2) Working with JSON APIs
Most modern APIs return JSON. The requests library makes parsing easy:
import requests
# Get JSON data
response = requests.get("https://httpbin.org/json")
# Parse JSON response
if response.status_code == 200:
    data = response.json()  # Automatically parses JSON
    print(data)
else:
    print(f"Error: {response.status_code}")
Security API Example: IP Geolocation
import requests
def get_ip_info(ip_address):
    """Look up IP geolocation information."""
    try:
        # Free IP geolocation API (no key required)
        url = f"http://ip-api.com/json/{ip_address}"
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            data = response.json()
            if data.get("status") == "success":
                return {
                    "ip": ip_address,
                    "country": data.get("country"),
                    "city": data.get("city"),
                    "isp": data.get("isp"),
                    "org": data.get("org")
                }
            else:
                return {"ip": ip_address, "error": data.get("message")}
        else:
            return {"ip": ip_address, "error": f"HTTP {response.status_code}"}
    except requests.exceptions.Timeout:
        return {"ip": ip_address, "error": "Request timed out"}
    except requests.exceptions.RequestException as e:
        return {"ip": ip_address, "error": str(e)}
# Test
result = get_ip_info("8.8.8.8")
print(result)
Handling API Authentication:
# API Key in header (most common)
headers = {"Authorization": "Bearer YOUR_API_KEY"}
response = requests.get("https://api.example.com/data", headers=headers)
# API Key as parameter
params = {"api_key": "YOUR_API_KEY", "query": "malware"}
response = requests.get("https://api.example.com/search", params=params)
# Basic authentication
response = requests.get(
    "https://api.example.com/data",
    auth=("username", "password")
)
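The "YOUR_API_KEY" placeholders above are fine for a lesson, but a real key should not be hardcoded in a script that might be shared or committed. One common approach, sketched here under assumptions, is to read the key from an environment variable. The variable name THREAT_API_KEY and the helper build_auth_headers are hypothetical; use whatever name and header scheme your provider documents:

```python
import os


def build_auth_headers(env_var="THREAT_API_KEY"):
    """Build request headers from an API key stored in the environment.

    The variable name and the Bearer scheme are assumptions for this sketch.
    """
    key = os.environ.get(env_var)
    if not key:
        # Fail early with a clear message instead of sending an empty key
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return {"Authorization": f"Bearer {key}", "Accept": "application/json"}


# Usage (after exporting THREAT_API_KEY in your shell):
#   headers = build_auth_headers()
#   response = requests.get("https://api.example.com/data", headers=headers)
```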
Key insight: Always use response.json() for JSON APIs. It handles parsing and raises a clear error if the response isn't valid JSON.
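To see what "isn't valid JSON" looks like in practice, the sketch below parses raw response text with the standard json module, which reports bad input through the same kind of ValueError. The helper parse_json_body is a hypothetical name used only for this illustration:

```python
import json


def parse_json_body(text):
    """Return (data, error); error is None when parsing succeeded."""
    try:
        return json.loads(text), None
    except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
        return None, f"Invalid JSON: {exc}"


# A well-formed API response
good, good_error = parse_json_body('{"status": "success", "country": "United States"}')
print(good.get("country"), good_error)

# An HTML error page returned where JSON was expected (common with proxies)
bad, bad_error = parse_json_body("<html>502 Bad Gateway</html>")
print(bad, bad_error)
```

An HTML error page arriving where JSON was expected is a frequent failure in the field, so treat a parse error as "the service is unhealthy" and not as a bug in your own script.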
3) Error Handling for Network Operations
Network operations fail in ways file operations don't. Robust scripts handle all failure modes:
import requests
from requests.exceptions import (
    RequestException,
    ConnectionError,
    Timeout,
    HTTPError
)

def safe_api_call(url, timeout=10):
    """Make API call with comprehensive error handling."""
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raises HTTPError for 4xx/5xx
        return {"success": True, "data": response.json()}
    except ConnectionError:
        return {"success": False, "error": "Connection failed - check network"}
    except Timeout:
        return {"success": False, "error": f"Request timed out after {timeout}s"}
    except HTTPError as e:
        return {"success": False, "error": f"HTTP error: {e.response.status_code}"}
    except ValueError:  # JSON decode error
        return {"success": False, "error": "Invalid JSON response"}
    except RequestException as e:
        return {"success": False, "error": f"Request failed: {e}"}
Implementing Retry Logic:
import time
import requests
def api_call_with_retry(url, max_retries=3, backoff=2):
    """Make API call with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            # Handle rate limiting
            if response.status_code == 429:
                wait_time = int(response.headers.get("Retry-After", backoff ** attempt))
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
                continue
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            if attempt < max_retries - 1:
                wait_time = backoff ** attempt
                print(f"Attempt {attempt + 1} failed. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
    return None
Key insight: Network operations need timeouts, retries, and graceful degradation. A script that hangs forever on a network call is worse than one that fails fast.
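One detail worth hardening: the retry loop above passes the Retry-After header straight to int(), but HTTP allows that header to be either a number of seconds or an HTTP date. The sketch below handles both forms using only the standard library; retry_after_seconds is a hypothetical helper, and falling back to a default on unparseable values is a design assumption:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_after_seconds(header_value, default=1.0, now=None):
    """Convert a Retry-After header value into seconds to wait."""
    if header_value is None:
        return default
    value = header_value.strip()
    try:
        return max(0.0, float(int(value)))  # Form 1: delay in seconds
    except ValueError:
        pass
    try:
        when = parsedate_to_datetime(value)  # Form 2: HTTP date
    except (TypeError, ValueError):
        return default  # Unparseable: fall back instead of crashing
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


print(retry_after_seconds("120"))                 # 120.0
print(retry_after_seconds("soon", default=3.0))   # 3.0
```

In the retry loop, you could then compute the wait with retry_after_seconds(response.headers.get("Retry-After"), default=backoff ** attempt), so a date-formatted header no longer raises an unexpected ValueError.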
4) Fetching Web Content
Sometimes you need to fetch and parse web pages, not just APIs:
import requests
# Fetch a web page
response = requests.get("https://example.com")
html_content = response.text
# Check content type
content_type = response.headers.get("Content-Type", "")
print(f"Content-Type: {content_type}")
# Download a file
def download_file(url, local_path):
    """Download file from URL to local path."""
    try:
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()
        with open(local_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return True
    except Exception as e:
        print(f"Download failed: {e}")
        return False

# Download threat intel feed
download_file(
    "https://example.com/ioc-feed.txt",
    "downloaded_iocs.txt"
)
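The same chunked pattern that keeps a large download out of memory also works for checking the file afterwards. As a sketch, here is how you might compute a SHA-256 digest of a downloaded feed to compare against a checksum the publisher provides (if they provide one). The helper name sha256_of_file and the sample file contents are assumptions for illustration:

```python
import hashlib
import os
import tempfile


def sha256_of_file(path, chunk_size=8192):
    """Hash a file in fixed-size chunks so large files never load fully into memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Demo: a temporary file stands in for a downloaded feed
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"198.51.100.7\n203.0.113.9\n")
    demo_path = tmp.name

demo_hash = sha256_of_file(demo_path)
os.remove(demo_path)
print(demo_hash)
```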
Parsing HTML (Basic):
# For simple extraction, string methods work
html = "<title>Security Alert</title>"
if "<title>" in html:
    start = html.find("<title>") + 7
    end = html.find("</title>")
    title = html[start:end]
    print(title)  # "Security Alert"
# For complex HTML, use BeautifulSoup (install: pip install beautifulsoup4)
# from bs4 import BeautifulSoup
# soup = BeautifulSoup(html, "html.parser")
# title = soup.title.string
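Between raw string methods and BeautifulSoup there is a middle option: the standard library's html.parser module. The sketch below extracts a page title with it and copes with things string searching misses, such as uppercase tags or attributes on the tag. TitleParser and extract_title are hypothetical names for this example:

```python
from html.parser import HTMLParser


class TitleParser(HTMLParser):
    """Collect the text of the first <title> element."""

    def __init__(self):
        super().__init__()
        self._in_title = False
        self.title = None

    def handle_starttag(self, tag, attrs):
        # Tag names arrive lowercased, so <TITLE> and <title> both match
        if tag == "title" and self.title is None:
            self._in_title = True
            self.title = ""

    def handle_endtag(self, tag):
        if tag == "title":
            self._in_title = False

    def handle_data(self, data):
        if self._in_title:
            self.title += data


def extract_title(html):
    """Return the stripped page title, or None if there is no <title>."""
    parser = TitleParser()
    parser.feed(html)
    return parser.title.strip() if parser.title is not None else None


page = '<html><head><TITLE class="x"> Security Alert </TITLE></head><body></body></html>'
print(extract_title(page))
```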
Security Consideration: URL Validation
from urllib.parse import urlparse
def is_safe_url(url):
    """Basic URL safety check."""
    try:
        parsed = urlparse(url)
        # Must have scheme and netloc
        if not parsed.scheme or not parsed.netloc:
            return False
        # Only allow http/https
        if parsed.scheme not in ["http", "https"]:
            return False
        # Block localhost and loopback addresses (basic check only;
        # private ranges such as 10.0.0.0/8 are NOT covered here)
        dangerous = ["localhost", "127.0.0.1", "0.0.0.0", "::1"]
        # parsed.hostname strips any "user@" prefix, port, and IPv6 brackets
        host = parsed.hostname
        if not host or host in dangerous:
            return False
        return True
    except Exception:
        return False
Key insight: Always validate URLs before fetching, especially if they come from user input. SSRF (Server-Side Request Forgery) attacks exploit scripts that fetch arbitrary URLs.
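A fixed list of hostnames only catches the most obvious cases. To also reject private and link-local ranges, you can lean on the standard ipaddress module. The sketch below is one way to do that; targets_internal_ip is a hypothetical helper, and note its stated limitation: it only inspects IP literals and does not resolve DNS names, so a hostname that points at an internal address would still pass.

```python
import ipaddress
from urllib.parse import urlparse


def targets_internal_ip(url):
    """Return True if the URL's host is an IP literal in a non-public range."""
    host = urlparse(url).hostname
    if host is None:
        return False
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        return False  # A hostname, not an IP literal; DNS is not checked here
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    )


for candidate in (
    "http://10.0.0.5/admin",
    "http://169.254.169.254/latest/meta-data/",
    "http://[::1]:8080/",
    "https://8.8.8.8/",
):
    print(candidate, "->", targets_internal_ip(candidate))
```

The 169.254.169.254 case is worth remembering: it is the link-local address many cloud platforms use for instance metadata, which makes it a classic SSRF target.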
5) Building a Threat Intelligence Client
Let's combine everything into a practical threat intel lookup tool:
import requests
import time
class ThreatIntelClient:
    """Client for querying threat intelligence APIs."""

    def __init__(self, api_key=None):
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "ThreatIntelClient/1.0"
        })
        if api_key:
            self.session.headers["Authorization"] = f"Bearer {api_key}"

    def _make_request(self, url, params=None):
        """Make request with error handling."""
        try:
            response = self.session.get(url, params=params, timeout=15)
            if response.status_code == 429:
                return {
                    "error": "Rate limited",
                    "retry_after": response.headers.get("Retry-After", 60)
                }
            response.raise_for_status()
            return {"success": True, "data": response.json()}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def lookup_ip(self, ip_address):
        """Look up IP reputation."""
        # Using free ip-api.com for demo
        url = f"http://ip-api.com/json/{ip_address}"
        result = self._make_request(url)
        if "error" in result:
            return result
        data = result["data"]
        return {
            "ip": ip_address,
            "country": data.get("country"),
            "isp": data.get("isp"),
            "org": data.get("org"),
            "query_success": data.get("status") == "success"
        }

    def lookup_domain(self, domain):
        """Look up domain information."""
        # Placeholder - would use real API
        return {
            "domain": domain,
            "note": "Implement with real threat intel API"
        }

    def bulk_lookup(self, iocs, delay=1):
        """Look up multiple IOCs with rate limiting."""
        results = []
        for i, ioc in enumerate(iocs):
            print(f"Looking up {i+1}/{len(iocs)}: {ioc}")
            # Detect IOC type and lookup
            if self._is_ip(ioc):
                result = self.lookup_ip(ioc)
            else:
                result = self.lookup_domain(ioc)
            results.append(result)
            # Rate limiting
            if i < len(iocs) - 1:
                time.sleep(delay)
        return results

    def _is_ip(self, value):
        """Check if value looks like an IP address."""
        parts = value.split(".")
        if len(parts) != 4:
            return False
        return all(p.isdigit() and 0 <= int(p) <= 255 for p in parts)
# Usage example
if __name__ == "__main__":
    client = ThreatIntelClient()
    # Single lookup
    result = client.lookup_ip("8.8.8.8")
    print(result)
    # Bulk lookup
    iocs = ["8.8.8.8", "1.1.1.1", "208.67.222.222"]
    results = client.bulk_lookup(iocs)
    for r in results:
        print(r)
Key insight: Wrapping API interactions in a class provides a clean interface, centralizes error handling, and makes it easy to swap between different threat intel providers.
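To make the "swap providers" point concrete, here is a small sketch in which the provider is just a function passed into the client. Everything in it (EnrichmentClient, fake_provider, failing_provider, and the returned fields) is hypothetical and separate from the ThreatIntelClient above; a real provider function would wrap an actual API call.

```python
from typing import Callable, Dict

# A provider is any callable that takes an IOC string and returns a dict
Provider = Callable[[str], Dict]


class EnrichmentClient:
    """Hypothetical wrapper that delegates lookups to a swappable provider."""

    def __init__(self, provider: Provider):
        self.provider = provider

    def lookup(self, ioc: str) -> Dict:
        try:
            data = self.provider(ioc)
        except Exception as exc:  # Centralized error handling for any provider
            return {"ioc": ioc, "error": str(exc)}
        return {"ioc": ioc, **data}


def fake_provider(ioc):
    """Stand-in provider returning canned data (no network involved)."""
    return {"country": "Testland", "isp": "Example ISP"}


def failing_provider(ioc):
    """Stand-in provider that simulates an outage."""
    raise RuntimeError("provider unavailable")


print(EnrichmentClient(fake_provider).lookup("8.8.8.8"))
print(EnrichmentClient(failing_provider).lookup("8.8.8.8"))
```

A side benefit of this design is testability: because the provider is injected, you can exercise your enrichment logic with a canned provider and never touch the network.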
Real-World Context: External Data in Security Operations
External data enrichment is core to modern security operations:
SIEM Enrichment: When a SIEM generates an alert, analysts need context. Is this IP known-malicious? What country? What organization? Scripts that automatically enrich alerts with threat intel save hours of manual lookups during investigations.
Threat Intelligence Platforms: Services like VirusTotal, AlienVault OTX, and Shodan provide APIs for IOC lookups. Integrating these into your workflows means instant access to community threat intelligence.
Vulnerability Management: The NVD (National Vulnerability Database) provides APIs for CVE lookups. Scripts can automatically check if software versions have known vulnerabilities.
MITRE ATT&CK Reference: Technique T1102 (Web Service) describes how attackers use legitimate web services for C2. Understanding how to interact with web services helps you both detect this technique and build defensive tools.
Key insight: The security community shares intelligence through APIs. Scripts that tap into this collective knowledge are far more powerful than those working in isolation.
Guided Lab: Multi-Source IOC Enrichment
Let's build a tool that enriches IOCs using multiple data sources.