CSY103 Week 09 Beginner

Programming Fundamentals

Opening Framing: Beyond Local Files

Last week, you learned to read and write local files. But security data doesn't just live on your machine—it's distributed across the internet: threat intelligence APIs, vulnerability databases, WHOIS lookups, IP reputation services, and countless web resources.

This week, you'll learn to fetch data from the web, interact with APIs, and process external data sources. These skills let your scripts tap into the collective knowledge of the security community—checking IOCs against threat feeds, enriching alerts with context, and automating lookups that would take hours manually.

Working with external data introduces new challenges: network failures, rate limits, authentication, and unpredictable response formats. You'll apply the defensive coding skills from Week 8 to handle these gracefully.

Key insight: The most powerful security scripts don't work in isolation—they connect to external intelligence sources that provide context and enrichment.

1) HTTP Requests with the Requests Library

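The requests library is the de facto standard for HTTP operations in Python. It is a third-party package rather than part of the standard library (install it with pip install requests), and it handles the complexity of web communication with a simple interface: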

import requests

# Basic GET request
response = requests.get("https://httpbin.org/get")
print(response.status_code)  # 200
print(response.text)         # Response body as string

# Check if request succeeded
if response.status_code == 200:
    print("Success!")
elif response.status_code == 404:
    print("Not found")
elif response.status_code == 403:
    print("Forbidden - check authentication")

Common HTTP Status Codes:

  • 200 - OK (success)
  • 201 - Created (POST success)
  • 400 - Bad Request (client error)
  • 401 - Unauthorized (auth required)
  • 403 - Forbidden (access denied)
  • 404 - Not Found
  • 429 - Too Many Requests (rate limited)
  • 500 - Internal Server Error
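
The codes above fall into families, and the family usually decides what a script should do next. The sketch below uses the standard library's http.HTTPStatus to get readable reason phrases. The helper name describe_status and the suggested actions are illustrative assumptions, not part of the requests library.

```python
from http import HTTPStatus

def describe_status(code):
    """Return (reason phrase, suggested action) for an HTTP status code.

    describe_status is a hypothetical helper written for this lesson.
    """
    try:
        phrase = HTTPStatus(code).phrase
    except ValueError:
        phrase = "Unknown"  # Code is not a registered HTTP status

    if 200 <= code < 300:
        action = "use the response"
    elif code in (401, 403):
        action = "check credentials or permissions"
    elif code == 429:
        action = "wait, then retry"
    elif 400 <= code < 500:
        action = "fix the request; retrying will not help"
    elif 500 <= code < 600:
        action = "retry later; the server failed"
    else:
        action = "inspect manually"
    return phrase, action

for code in (200, 404, 429, 500):
    print(code, *describe_status(code))
```

In a real script you would pass response.status_code to a helper like this and branch on the result instead of printing it.
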

Requests can also carry query parameters and custom headers:

# Request with parameters
params = {"ip": "8.8.8.8", "format": "json"}
response = requests.get("https://api.example.com/lookup", params=params)
# URL becomes: https://api.example.com/lookup?ip=8.8.8.8&format=json

# Request with headers (common for APIs)
headers = {
    "User-Agent": "SecurityScript/1.0",
    "Accept": "application/json"
}
response = requests.get("https://api.example.com/data", headers=headers)
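
To see why passing a params dictionary is safer than building the query string by hand, the sketch below uses the standard library's urllib.parse to show the kind of encoding involved. It runs without network access. The endpoint is the placeholder from the example above, and the "q" parameter is made up for illustration.

```python
from urllib.parse import urlencode, urlsplit, parse_qs

base_url = "https://api.example.com/lookup"  # Placeholder endpoint from the text
params = {"ip": "8.8.8.8", "q": "evil domain & more"}  # "q" is a made-up parameter

# urlencode percent-encodes values so spaces and '&' cannot break the query
full_url = f"{base_url}?{urlencode(params)}"
print(full_url)

# Round trip: parsing the query string recovers the original values
recovered = parse_qs(urlsplit(full_url).query)
print(recovered)
```

Concatenating the raw value into the URL would have turned the "&" into a parameter separator, silently changing the request.
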

Key insight: HTTP is the foundation of web APIs. Understanding requests, responses, and status codes is essential for working with any external data source.

2) Working with JSON APIs

Most modern APIs return JSON. The requests library makes parsing easy:

import requests

# Get JSON data
response = requests.get("https://httpbin.org/json")

# Parse JSON response
if response.status_code == 200:
    data = response.json()  # Automatically parses JSON
    print(data)
else:
    print(f"Error: {response.status_code}")
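
A 200 status does not guarantee a JSON body: proxies and error pages sometimes return HTML instead. The sketch below shows one way to parse defensively using the standard library's json module on plain strings, so it runs without network access. With requests you could pass response.text to it. The helper name parse_json_body is hypothetical.

```python
import json

def parse_json_body(body):
    """Parse a response body, returning (data, error) instead of raising.

    parse_json_body is a hypothetical helper written for this lesson.
    """
    try:
        data = json.loads(body)
    except json.JSONDecodeError as e:
        return None, f"Invalid JSON: {e.msg}"
    # Valid JSON can still be the wrong shape (a list, a number, ...)
    if not isinstance(data, dict):
        return None, f"Expected a JSON object, got {type(data).__name__}"
    return data, None

print(parse_json_body('{"status": "success", "country": "US"}'))
print(parse_json_body("<html>502 Bad Gateway</html>"))
print(parse_json_body("[1, 2, 3]"))
```

Returning an (data, error) pair keeps the calling code free of try/except blocks, at the cost of requiring the caller to check the error value.
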

Security API Example: IP Geolocation

import requests

def get_ip_info(ip_address):
    """Look up IP geolocation information."""
    try:
        # Free IP geolocation API (no key required)
        url = f"http://ip-api.com/json/{ip_address}"
        response = requests.get(url, timeout=10)
        
        if response.status_code == 200:
            data = response.json()
            if data.get("status") == "success":
                return {
                    "ip": ip_address,
                    "country": data.get("country"),
                    "city": data.get("city"),
                    "isp": data.get("isp"),
                    "org": data.get("org")
                }
            else:
                return {"ip": ip_address, "error": data.get("message")}
        else:
            return {"ip": ip_address, "error": f"HTTP {response.status_code}"}
    
    except requests.exceptions.Timeout:
        return {"ip": ip_address, "error": "Request timed out"}
    except requests.exceptions.RequestException as e:
        return {"ip": ip_address, "error": str(e)}

# Test
result = get_ip_info("8.8.8.8")
print(result)

Handling API Authentication:

# API Key in header (most common)
headers = {"Authorization": "Bearer YOUR_API_KEY"}
response = requests.get("https://api.example.com/data", headers=headers)

# API Key as parameter
params = {"api_key": "YOUR_API_KEY", "query": "malware"}
response = requests.get("https://api.example.com/search", params=params)

# Basic authentication
response = requests.get(
    "https://api.example.com/data",
    auth=("username", "password")
)
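
The "YOUR_API_KEY" strings above are placeholders. Pasting a real key into source code risks leaking it through version control or shared scripts, so a common approach is to read it from an environment variable. The sketch below assumes a variable named THREAT_INTEL_API_KEY; that name and both helper functions are hypothetical.

```python
import os

def load_api_key(var_name="THREAT_INTEL_API_KEY"):
    """Read an API key from the environment; return None if it is not set.

    The variable name is an assumption made for this example.
    """
    key = os.environ.get(var_name, "").strip()
    return key or None

def build_auth_headers(api_key):
    """Build request headers, adding Authorization only when a key exists."""
    headers = {"Accept": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    return headers

headers = build_auth_headers(load_api_key())
# Print header names only, so the key never lands in logs or terminal history
print(sorted(headers))
```

Set the variable in your shell before running the script, then pass the resulting headers to requests.get as shown earlier.
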

Key insight: Always use response.json() for JSON APIs. It handles parsing and raises an exception (a subclass of ValueError) if the response isn't valid JSON.

3) Error Handling for Network Operations

Network operations fail in ways file operations don't. Robust scripts handle all failure modes:

import requests
from requests.exceptions import (
    RequestException,
    ConnectionError,
    Timeout,
    HTTPError
)

def safe_api_call(url, timeout=10):
    """Make API call with comprehensive error handling."""
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raises HTTPError for 4xx/5xx
        return {"success": True, "data": response.json()}
    
    except Timeout:  # Checked first: a connect timeout is also a ConnectionError
        return {"success": False, "error": f"Request timed out after {timeout}s"}
    
    except ConnectionError:
        return {"success": False, "error": "Connection failed - check network"}
    
    except HTTPError as e:
        return {"success": False, "error": f"HTTP error: {e.response.status_code}"}
    
    except ValueError:  # JSON decode error
        return {"success": False, "error": "Invalid JSON response"}
    
    except RequestException as e:
        return {"success": False, "error": f"Request failed: {e}"}

Implementing Retry Logic:

import time
import requests

def api_call_with_retry(url, max_retries=3, backoff=2):
    """Make API call with exponential backoff retry."""
    for attempt in range(max_retries):
        wait_time = backoff ** attempt  # 1s, 2s, 4s, ... with backoff=2
        try:
            response = requests.get(url, timeout=10)
            
            # Handle rate limiting
            if response.status_code == 429:
                # Retry-After may be a number of seconds or an HTTP date;
                # only the numeric form is used here, otherwise fall back to backoff
                retry_after = response.headers.get("Retry-After", "")
                if retry_after.isdigit():
                    wait_time = int(retry_after)
                print(f"Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
                continue
            
            response.raise_for_status()
            return response.json()
        
        except requests.exceptions.RequestException:
            if attempt < max_retries - 1:
                print(f"Attempt {attempt + 1} failed. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise  # Out of attempts: let the caller see the last error
    
    # Reached only when every attempt was rate limited
    return None

Key insight: Network operations need timeouts, retries, and graceful degradation. A script that hangs forever on a network call is worse than one that fails fast.
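
One refinement worth knowing about is jitter: adding a small random amount to each delay so that many clients hitting the same failure do not all retry at the same instant. The sketch below is a generic wrapper under stated assumptions, not a replacement for the function above. The name retry_with_backoff, its parameters, and the simulated flaky function are all hypothetical, and the sleep function is injectable so the demo runs instantly without network access.

```python
import random
import time

def retry_with_backoff(func, max_retries=3, base=1.0, cap=30.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call func(); on failure wait base * 2**attempt (capped) plus jitter, then retry.

    Hypothetical helper for illustration; pass sleep=... to avoid real waiting in tests.
    """
    for attempt in range(max_retries):
        try:
            return func()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the last error
            delay = min(cap, base * 2 ** attempt)
            # Jitter spreads retries out so clients do not retry in lockstep
            sleep(delay + random.uniform(0, delay / 2))

# Demo with a function that fails twice, then succeeds (no network needed)
calls = {"count": 0}

def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated network failure")
    return "ok"

waits = []  # Records the delays instead of actually sleeping
result = retry_with_backoff(flaky, retry_on=(ConnectionError,), sleep=waits.append)
print(result, calls["count"], len(waits))
```

Limiting retry_on to transient errors matters: retrying a 404 or a 401 wastes time and API quota because the result will not change.
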

4) Fetching Web Content

Sometimes you need to fetch and parse web pages, not just APIs:

import requests

# Fetch a web page
response = requests.get("https://example.com", timeout=10)
html_content = response.text

# Check content type
content_type = response.headers.get("Content-Type", "")
print(f"Content-Type: {content_type}")

# Download a file
def download_file(url, local_path):
    """Download file from URL to local path."""
    try:
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()
        
        with open(local_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        
        return True
    except Exception as e:
        print(f"Download failed: {e}")
        return False

# Download threat intel feed
download_file(
    "https://example.com/ioc-feed.txt",
    "downloaded_iocs.txt"
)

Parsing HTML (Basic):

# For simple extraction, string methods work
html = "<title>Security Alert</title>"
if "<title>" in html:
    start = html.find("<title>") + 7
    end = html.find("</title>")
    title = html[start:end]
    print(title)  # "Security Alert"

# For complex HTML, use BeautifulSoup (install: pip install beautifulsoup4)
# from bs4 import BeautifulSoup
# soup = BeautifulSoup(html, "html.parser")
# title = soup.title.string

Security Consideration: URL Validation

import ipaddress
from urllib.parse import urlparse

def is_safe_url(url):
    """Basic URL safety check."""
    try:
        parsed = urlparse(url)
        
        # Only allow http/https
        if parsed.scheme not in ("http", "https"):
            return False
        
        # hostname drops the port and any credentials (user:pass@host:port)
        host = parsed.hostname
        if not host or host == "localhost":
            return False
        
        # Block loopback, private, and link-local IP literals (basic check)
        try:
            addr = ipaddress.ip_address(host)
        except ValueError:
            # A hostname, not an IP literal; what it resolves to is not checked here
            return True
        return not (addr.is_private or addr.is_loopback
                    or addr.is_link_local or addr.is_unspecified)
    except Exception:
        return False

Key insight: Always validate URLs before fetching, especially if they come from user input. SSRF (Server-Side Request Forgery) attacks exploit scripts that fetch arbitrary URLs.
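
A blocklist of dangerous hosts is easy to bypass, because there are many ways to spell an internal address. When a script only needs a few known services, an allowlist is stricter. The sketch below is one way to do it under stated assumptions: the function name is_allowed_url and the contents of ALLOWED_HOSTS are hypothetical and should be replaced with the hosts your script actually uses.

```python
from urllib.parse import urlparse

# Hypothetical allowlist: replace with the hosts your script actually needs
ALLOWED_HOSTS = {"api.abuseipdb.com", "ip-api.com"}

def is_allowed_url(url, allowed_hosts=ALLOWED_HOSTS):
    """Return True only for http(s) URLs whose host is exactly on the allowlist."""
    try:
        parsed = urlparse(url)
        host = parsed.hostname  # Lowercased; excludes port and user:pass@
    except ValueError:
        return False  # Malformed URL
    if parsed.scheme not in ("http", "https") or host is None:
        return False
    return host in allowed_hosts

print(is_allowed_url("https://api.abuseipdb.com/api/v2/check"))
print(is_allowed_url("https://ip-api.com.evil.example/json"))
print(is_allowed_url("https://ip-api.com@127.0.0.1/"))
```

The second and third URLs are lookalike tricks: one appends an attacker domain after the trusted name, the other hides the real host behind a username. Exact matching on the parsed hostname rejects both.
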

5) Building a Threat Intelligence Client

Let's combine everything into a practical threat intel lookup tool:

import requests
import time

class ThreatIntelClient:
    """Client for querying threat intelligence APIs."""
    
    def __init__(self, api_key=None):
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "ThreatIntelClient/1.0"
        })
        if api_key:
            self.session.headers["Authorization"] = f"Bearer {api_key}"
    
    def _make_request(self, url, params=None):
        """Make request with error handling."""
        try:
            response = self.session.get(url, params=params, timeout=15)
            
            if response.status_code == 429:
                return {"error": "Rate limited", "retry_after": 
                        response.headers.get("Retry-After", 60)}
            
            response.raise_for_status()
            return {"success": True, "data": response.json()}
        
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
    
    def lookup_ip(self, ip_address):
        """Look up IP geolocation and network owner (the demo API gives no reputation score)."""
        # Using free ip-api.com for demo
        url = f"http://ip-api.com/json/{ip_address}"
        result = self._make_request(url)
        
        if "error" in result:
            return result
        
        data = result["data"]
        return {
            "ip": ip_address,
            "country": data.get("country"),
            "isp": data.get("isp"),
            "org": data.get("org"),
            "query_success": data.get("status") == "success"
        }
    
    def lookup_domain(self, domain):
        """Look up domain information."""
        # Placeholder - would use real API
        return {
            "domain": domain,
            "note": "Implement with real threat intel API"
        }
    
    def bulk_lookup(self, iocs, delay=1):
        """Look up multiple IOCs with rate limiting."""
        results = []
        
        for i, ioc in enumerate(iocs):
            print(f"Looking up {i+1}/{len(iocs)}: {ioc}")
            
            # Detect IOC type and lookup
            if self._is_ip(ioc):
                result = self.lookup_ip(ioc)
            else:
                result = self.lookup_domain(ioc)
            
            results.append(result)
            
            # Rate limiting
            if i < len(iocs) - 1:
                time.sleep(delay)
        
        return results
    
    def _is_ip(self, value):
        """Check if value looks like an IP address."""
        parts = value.split(".")
        if len(parts) != 4:
            return False
        return all(p.isdigit() and 0 <= int(p) <= 255 for p in parts)


# Usage example
if __name__ == "__main__":
    client = ThreatIntelClient()
    
    # Single lookup
    result = client.lookup_ip("8.8.8.8")
    print(result)
    
    # Bulk lookup
    iocs = ["8.8.8.8", "1.1.1.1", "208.67.222.222"]
    results = client.bulk_lookup(iocs)
    for r in results:
        print(r)

Key insight: Wrapping API interactions in a class provides a clean interface, centralizes error handling, and makes it easy to swap between different threat intel providers.
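
The _is_ip method above only recognizes dotted IPv4 addresses and treats everything else as a domain. A slightly more careful classifier can be sketched with the standard library's ipaddress module, which also understands IPv6. The function names and the simplified domain rules below are assumptions for illustration, not a complete validator.

```python
import ipaddress

def looks_like_domain(value):
    """Rough hostname check: dot-separated labels and an alphabetic TLD (simplified)."""
    labels = value.split(".")
    if len(labels) < 2 or len(value) > 253:
        return False
    for label in labels:
        if not label or len(label) > 63:
            return False
        if label.startswith("-") or label.endswith("-"):
            return False
        if not all(c.isascii() and (c.isalnum() or c == "-") for c in label):
            return False
    return labels[-1].isalpha()

def classify_ioc(value):
    """Return 'ipv4', 'ipv6', 'domain', or 'unknown' for an indicator string."""
    value = value.strip().lower()
    try:
        return f"ipv{ipaddress.ip_address(value).version}"
    except ValueError:
        pass  # Not an IP address; try the domain rules next
    if looks_like_domain(value):
        return "domain"
    return "unknown"

for ioc in ["8.8.8.8", "2001:4860:4860::8888", "example.com", "999.1.1.1", "not an ioc"]:
    print(ioc, "->", classify_ioc(ioc))
```

Returning "unknown" instead of guessing keeps malformed input from being sent to a domain lookup API.
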

Real-World Context: External Data in Security Operations

External data enrichment is core to modern security operations:

SIEM Enrichment: When a SIEM generates an alert, analysts need context. Is this IP known-malicious? What country? What organization? Scripts that automatically enrich alerts with threat intel save hours of manual lookups during investigations.

Threat Intelligence Platforms: Services like VirusTotal, AlienVault OTX, and Shodan provide APIs for IOC lookups. Integrating these into your workflows means instant access to community threat intelligence.

Vulnerability Management: The NVD (National Vulnerability Database) provides APIs for CVE lookups. Scripts can automatically check if software versions have known vulnerabilities.

MITRE ATT&CK Reference: Technique T1102 (Web Service) describes how attackers use legitimate web services for C2. Understanding how to interact with web services helps you both detect this technique and build defensive tools.

Key insight: The security community shares intelligence through APIs. Scripts that tap into this collective knowledge are far more powerful than those working in isolation.

Guided Lab: Multi-Source IOC Enrichment

Let's build a tool that enriches IOCs using multiple data sources.

Step 1: Create the Enrichment Script

Create ioc_enrichment.py:

"""
IOC Enrichment Tool
Queries multiple sources to enrich indicators of compromise
"""

import requests
import json
import time
from datetime import datetime


def lookup_ip_geolocation(ip):
    """Get geolocation for IP address."""
    try:
        response = requests.get(
            f"http://ip-api.com/json/{ip}",
            timeout=10
        )
        if response.status_code == 200:
            data = response.json()
            if data.get("status") == "success":
                return {
                    "source": "ip-api.com",
                    "country": data.get("country"),
                    "country_code": data.get("countryCode"),
                    "region": data.get("regionName"),
                    "city": data.get("city"),
                    "isp": data.get("isp"),
                    "org": data.get("org"),
                    "as": data.get("as")
                }
        return {"source": "ip-api.com", "error": "Lookup failed"}
    except Exception as e:
        return {"source": "ip-api.com", "error": str(e)}


def lookup_dns(domain):
    """Get DNS information for domain (simulated)."""
    # In production, use dnspython or similar
    return {
        "source": "dns_lookup",
        "note": "Implement with dnspython for real lookups",
        "domain": domain
    }


def check_abuse_ipdb(ip, api_key=None):
    """Check IP against AbuseIPDB (requires API key)."""
    if not api_key:
        return {
            "source": "abuseipdb",
            "note": "API key required - get free key at abuseipdb.com"
        }
    
    try:
        headers = {
            "Key": api_key,
            "Accept": "application/json"
        }
        params = {
            "ipAddress": ip,
            "maxAgeInDays": 90
        }
        response = requests.get(
            "https://api.abuseipdb.com/api/v2/check",
            headers=headers,
            params=params,
            timeout=10
        )
        if response.status_code == 200:
            data = response.json().get("data", {})
            return {
                "source": "abuseipdb",
                "abuse_confidence": data.get("abuseConfidenceScore"),
                "total_reports": data.get("totalReports"),
                "is_whitelisted": data.get("isWhitelisted"),
                "usage_type": data.get("usageType")
            }
        return {"source": "abuseipdb", "error": f"HTTP {response.status_code}"}
    except Exception as e:
        return {"source": "abuseipdb", "error": str(e)}


def enrich_ip(ip, api_keys=None):
    """Enrich an IP address with multiple sources."""
    api_keys = api_keys or {}
    
    enrichment = {
        "indicator": ip,
        "type": "ip",
        "enriched_at": datetime.now().isoformat(),
        "sources": []
    }
    
    # Geolocation lookup
    print("  [*] Looking up geolocation...")
    geo_result = lookup_ip_geolocation(ip)
    enrichment["sources"].append(geo_result)
    time.sleep(0.5)  # Rate limiting
    
    # AbuseIPDB lookup
    print("  [*] Checking AbuseIPDB...")
    abuse_result = check_abuse_ipdb(ip, api_keys.get("abuseipdb"))
    enrichment["sources"].append(abuse_result)
    
    # Summarize findings
    enrichment["summary"] = generate_summary(enrichment)
    
    return enrichment


def generate_summary(enrichment):
    """Generate human-readable summary of enrichment."""
    summary = []
    
    for source in enrichment["sources"]:
        if source["source"] == "ip-api.com" and "country" in source:
            summary.append(f"Location: {source.get('city', 'Unknown')}, {source.get('country', 'Unknown')}")
            summary.append(f"ISP: {source.get('isp', 'Unknown')}")
        
        # The key can be present with a None value, which would break the comparisons below
        if source["source"] == "abuseipdb" and source.get("abuse_confidence") is not None:
            score = source["abuse_confidence"]
            if score > 50:
                summary.append(f"WARNING: High abuse score ({score}%)")
            elif score > 0:
                summary.append(f"Note: Some abuse reports ({score}%)")
            else:
                summary.append("No abuse reports found")
    
    return summary


def enrich_batch(iocs, api_keys=None):
    """Enrich a batch of IOCs."""
    results = []
    
    for i, ioc in enumerate(iocs):
        print(f"\n[{i+1}/{len(iocs)}] Enriching: {ioc}")
        
        # Detect type and enrich
        if is_ip(ioc):
            result = enrich_ip(ioc, api_keys)
        else:
            result = {
                "indicator": ioc,
                "type": "unknown",
                "note": "Only IP enrichment implemented in this demo"
            }
        
        results.append(result)
        
        # Rate limiting between IOCs
        if i < len(iocs) - 1:
            time.sleep(1)
    
    return results


def is_ip(value):
    """Check if value is an IP address."""
    parts = value.split(".")
    if len(parts) != 4:
        return False
    try:
        return all(0 <= int(p) <= 255 for p in parts)
    except ValueError:
        return False


def print_report(results):
    """Print enrichment report."""
    print("\n" + "=" * 60)
    print("IOC ENRICHMENT REPORT")
    print("=" * 60)
    
    for result in results:
        print(f"\nIndicator: {result['indicator']} ({result['type']})")
        print("-" * 40)
        
        if "summary" in result:
            for line in result["summary"]:
                print(f"  {line}")
        
        if "note" in result:
            print(f"  Note: {result['note']}")
    
    print("\n" + "=" * 60)


# Main execution
if __name__ == "__main__":
    # Test IOCs
    test_iocs = [
        "8.8.8.8",      # Google DNS
        "1.1.1.1",      # Cloudflare DNS
        "208.67.222.222"  # OpenDNS
    ]
    
    # API keys (set these if you have them)
    api_keys = {
        # "abuseipdb": "YOUR_API_KEY_HERE"
    }
    
    print("IOC Enrichment Tool")
    print("=" * 40)
    
    # Enrich IOCs
    results = enrich_batch(test_iocs, api_keys)
    
    # Print report
    print_report(results)
    
    # Save to JSON
    with open("enrichment_results.json", "w") as f:
        json.dump(results, f, indent=2)
    print("\nResults saved to enrichment_results.json")

Step 2: Run and Test

Run python3 ioc_enrichment.py and observe the enrichment process.

Step 3: Reflection (mandatory)

  1. Why do we add delays between API calls?
  2. How does the script handle missing API keys?
  3. What would you add to make this production-ready?
  4. How could you extend this to support more IOC types?

Week 9 Outcome Check

By the end of this week, you should be able to:

Next week: Regular Expressions—where we learn to find patterns in security data with surgical precision.

🎯 Hands-On Labs (Free & Essential)

Practice API requests and external data handling before moving to reading resources.

🎮 TryHackMe: Python Basics (HTTP Requests)

What you'll do: Use Python to fetch and parse data from web endpoints.
Why it matters: Threat intel and enrichment workflows rely on external APIs.
Time estimate: 1-1.5 hours

Start TryHackMe Python Basics →

📝 Lab Exercise: Simple IOC Enricher

Task: Query a public API (e.g., ip-api.com) and enrich a list of IPs.
Deliverable: JSON output with country/ISP metadata per IP.
Why it matters: Enrichment adds context that improves triage accuracy.
Time estimate: 60-90 minutes

🏁 PicoCTF Practice: General Skills (Web Requests)

What you'll do: Solve beginner challenges that involve pulling data from the web.
Why it matters: Many security scripts must handle unpredictable web responses.
Time estimate: 1-2 hours

Start PicoCTF General Skills →

🛡️ Lab: Password Hashing Basics

What you'll do: Use bcrypt or argon2 to hash and verify passwords.
Deliverable: Script that stores salted hashes and verifies login attempts.
Why it matters: Storing plaintext passwords leads to instant account compromise.
Time estimate: 60-90 minutes

💡 Lab Tip: Always set timeouts for HTTP requests to avoid hanging scripts.

🛡️ Secure Coding: Safe External Data

External data is untrusted by default. Defensive scripts validate inputs, verify responses, and limit what they accept.

External data safety checklist:
- Validate URLs and allowed domains
- Enforce timeouts and retries
- Verify TLS certificates (avoid verify=False)
- Parse responses defensively and check status codes

📚 Building on CSY101 Week-13: Threat model how attacker-controlled endpoints can abuse your client.

Resources

Complete the required resources to build your foundation.

Lab: Threat Feed Aggregator

Goal: Build a tool that fetches IOCs from multiple public threat feeds and consolidates them.

Linux/Windows Path (same for both)

  1. Create feed_aggregator.py
  2. Implement functions to fetch from at least 2 public feeds:
    • Example: Feodo Tracker C2 IPs
    • Example: URLhaus malicious URLs
    • Or other free, public threat feeds
  3. Parse each feed's format (may be CSV, JSON, or plain text)
  4. Consolidate IOCs into a unified format
  5. Deduplicate entries
  6. Export to JSON with source attribution
  7. Include proper error handling for network failures
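
Steps 3 to 6 can be prototyped before any network code exists. The sketch below is a starting point under stated assumptions, not a full solution: it handles only a plain-text, one-indicator-per-line format, the function names are hypothetical, and the sample data uses reserved documentation IP ranges rather than real feed content.

```python
import json

def parse_plaintext_feed(text, source):
    """Parse a one-indicator-per-line feed, skipping blanks and '#' comments."""
    records = []
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            records.append({"indicator": line.lower(), "source": source})
    return records

def consolidate(records):
    """Deduplicate by indicator, keeping every source that reported it."""
    merged = {}
    for rec in records:
        merged.setdefault(rec["indicator"], set()).add(rec["source"])
    return [
        {"indicator": ioc, "sources": sorted(sources)}
        for ioc, sources in sorted(merged.items())
    ]

# Made-up sample data using documentation IP ranges, not real feed content
feed_a = "# sample feed A\n192.0.2.10\n198.51.100.7\n"
feed_b = "198.51.100.7\n\n203.0.113.5\n"

records = parse_plaintext_feed(feed_a, "feed_a") + parse_plaintext_feed(feed_b, "feed_b")
unified = consolidate(records)
print(json.dumps(unified, indent=2))
```

Developing the parsing and merging logic against fixed strings first makes it much easier to tell a parsing bug from a network problem once real feeds are wired in.
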

Deliverable (submit):

Checkpoint Questions

  1. What HTTP method is used for retrieving data from an API?
  2. What does status code 429 indicate and how should you handle it?
  3. Why is a timeout important for network requests?
  4. What is the difference between response.text and response.json()?
  5. Why should you validate URLs before fetching them?
  6. What is rate limiting and why do APIs enforce it?

Weekly Reflection

Reflection Prompt (200-300 words):

This week you learned to connect your scripts to external data sources. APIs and web resources extend your capabilities beyond local data, tapping into the collective intelligence of the security community.

Reflect on these questions:

A strong reflection will consider both the power and the responsibilities of connecting to external security intelligence.

Verified Resources & Videos

External data transforms local scripts into connected intelligence tools. With API skills, you can tap into VirusTotal, Shodan, threat feeds, and countless other resources. Next week: pattern matching with regular expressions.

Week 09 Quiz

Test your understanding of the weekly concepts.

Format: 10 multiple-choice questions. Passing score: 70%. Time: Untimed.
