10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, 169.254.169.254) before making the request.Metadata-Flavor: Google header, Azure IMDS token requirement.| # | Vulnerability | Risk | Vulnerable Code | Secure Code |
|---|---|---|---|---|
| 1 | Internal network access | Critical -- access admin panels, databases, internal APIs | requests.get(user_url) | Allowlist hosts + validate resolved IP is not private |
| 2 | Cloud metadata theft | Critical -- IAM credential exfiltration (Capital One breach) | fetch(url) reaching 169.254.169.254 | Block link-local ranges + enforce IMDSv2 (PUT + token) |
| 3 | DNS rebinding | High -- IP changes between validation and request | Validate hostname, then fetch (separate DNS lookups) | Pin DNS: resolve once, validate IP, fetch by IP |
| 4 | Protocol smuggling | High -- file://, gopher://, dict:// read local files or interact with services | urllib.urlopen(user_url) with no scheme check | Allowlist only http:// and https:// schemes |
| 5 | Open redirect bypass | High -- redirect from allowed domain to internal target | Allow redirects to pre-validated domain | Disable redirects or re-validate after each hop |
| 6 | IP encoding bypass | Medium -- decimal, octal, hex IP representations bypass blocklists | Block 127.0.0.1 string only | Resolve to IP, then check against private ranges |
| 7 | URL parser confusion | Medium -- inconsistent parsing between validator and HTTP client | Validate URL with one parser, fetch with another | Use same URL parsing library for validation and request |
| 8 | IPv6 bypass | Medium -- ::1, [::], ::ffff:127.0.0.1 bypass IPv4 blocklists | Block only IPv4 loopback | Check both IPv4 and IPv6 private/reserved ranges |
| 9 | Blind SSRF | Medium -- no response but side-channel timing/errors leak info | Fetch user URL, discard response (side effects remain) | Same validation as full SSRF -- response visibility is irrelevant |
| 10 | TOCTOU race condition | Medium -- DNS changes between check and use | Validate, sleep/delay, then fetch | Resolve once, connect directly to resolved IP with Host header |
START: Does the application need to fetch user-supplied URLs?
├── Can you restrict to a fixed set of allowed hosts?
│ ├── YES → Use strict allowlist of hostnames + validate resolved IP
│ └── NO ↓
├── Does the app run in a cloud environment (AWS/GCP/Azure)?
│ ├── YES → Block metadata IPs at network layer + enforce IMDSv2/equivalent + app-layer validation
│ └── NO ↓
├── Does the URL come from user input or external data?
│ ├── YES → Parse URL, validate scheme (http/https only), resolve DNS,
│ │ check resolved IP against private ranges, disable redirects
│ └── NO ↓
├── Is the URL from a trusted internal configuration?
│ ├── YES → Still validate at network layer (defense in depth)
│ └── NO ↓
└── DEFAULT → Deny outbound requests by default, allowlist specific destinations
Reject any URL that does not use http or https. Use a well-tested URL parser -- never construct URLs via string concatenation. [src1]
from urllib.parse import urlparse
def validate_url_scheme(url: str) -> bool:
parsed = urlparse(url)
return parsed.scheme in ('http', 'https') and parsed.hostname is not None
Verify: Test with file:///etc/passwd, gopher://internal:25/, dict://localhost:6379/ -- all should be rejected.
After parsing the URL, resolve the hostname to an IP address and check that it is not in any private, loopback, or link-local range. [src1]
import ipaddress, socket
def is_safe_ip(ip_str: str) -> bool:
ip = ipaddress.ip_address(ip_str)
return ip.is_global and not ip.is_reserved and not ip.is_loopback \
and not ip.is_link_local and not ip.is_private
def resolve_and_validate(hostname: str) -> str:
results = socket.getaddrinfo(hostname, None)
for family, _, _, _, sockaddr in results:
if is_safe_ip(sockaddr[0]):
return sockaddr[0]
raise ValueError(f"Hostname {hostname} resolves to blocked IP range")
Verify: Test with localhost, 169.254.169.254, 10.0.0.1, [::1] -- all should raise ValueError.
Make the request directly to the resolved IP with the original Host header. Disable automatic redirect following. [src2]
import requests
def safe_fetch(url: str, timeout: int = 10) -> requests.Response:
parsed = urlparse(url)
if not validate_url_scheme(url):
raise ValueError(f"Blocked scheme: {parsed.scheme}")
safe_ip = resolve_and_validate(parsed.hostname)
return requests.get(url, timeout=timeout, allow_redirects=False)
Verify: Test with a URL that 302-redirects to http://169.254.169.254/ -- should return the redirect without following.
For AWS, enforce IMDSv2 which requires a PUT request to obtain a session token. Block the metadata IP at the network layer. [src5]
# Require IMDSv2 on all EC2 instances
aws ec2 modify-instance-metadata-options \
--instance-id i-1234567890abcdef0 \
--http-tokens required \
--http-put-response-hop-limit 1
# Block metadata IP at instance level
iptables -A OUTPUT -d 169.254.169.254 -j DROP
Verify: curl -s http://169.254.169.254/latest/meta-data/ should fail or time out.
Apply firewall rules and network policies as defense in depth. [src4]
# Kubernetes NetworkPolicy: deny egress to private ranges
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: deny-internal-egress
spec:
podSelector:
matchLabels:
app: web-service
policyTypes: [Egress]
egress:
- to:
- ipBlock:
cidr: 0.0.0.0/0
except: [10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, 169.254.0.0/16, 127.0.0.0/8]
Verify: From the pod, curl http://169.254.169.254/ and curl http://10.0.0.1/ should time out.
# Input: User-supplied URL string
# Output: HTTP response or ValueError if URL targets blocked IP
import ipaddress, socket, requests
from urllib.parse import urlparse
BLOCKED_RANGES = [
ipaddress.ip_network('10.0.0.0/8'),
ipaddress.ip_network('172.16.0.0/12'),
ipaddress.ip_network('192.168.0.0/16'),
ipaddress.ip_network('127.0.0.0/8'),
ipaddress.ip_network('169.254.0.0/16'),
ipaddress.ip_network('::1/128'),
ipaddress.ip_network('fc00::/7'),
ipaddress.ip_network('fe80::/10'),
]
def is_ip_blocked(ip_str: str) -> bool:
ip = ipaddress.ip_address(ip_str)
return any(ip in net for net in BLOCKED_RANGES)
def ssrf_safe_fetch(url: str, timeout: int = 10):
parsed = urlparse(url)
if parsed.scheme not in ('http', 'https'):
raise ValueError(f"Blocked scheme: {parsed.scheme}")
for _, _, _, _, sockaddr in socket.getaddrinfo(parsed.hostname, parsed.port):
if is_ip_blocked(sockaddr[0]):
raise ValueError(f"Blocked IP: {sockaddr[0]}")
return requests.get(url, timeout=timeout, allow_redirects=False)
// Input: User-supplied URL string
// Output: HTTP response or throws Error if URL targets blocked IP
const { URL } = require('url');
const dns = require('dns').promises;
const net = require('net');
function isPrivateIP(ip) {
const p = ip.split('.').map(Number);
if (p[0] === 10) return true;
if (p[0] === 172 && p[1] >= 16 && p[1] <= 31) return true;
if (p[0] === 192 && p[1] === 168) return true;
if (p[0] === 127) return true;
if (p[0] === 169 && p[1] === 254) return true;
if (p[0] === 0) return true;
if (net.isIPv6(ip)) return true;
return false;
}
async function ssrfSafeFetch(userUrl) {
const parsed = new URL(userUrl);
if (!['http:', 'https:'].includes(parsed.protocol))
throw new Error(`Blocked protocol: ${parsed.protocol}`);
const { address } = await dns.lookup(parsed.hostname);
if (isPrivateIP(address))
throw new Error(`Blocked IP: ${address}`);
return fetch(userUrl, { redirect: 'manual' });
}
// Input: User-supplied URL string
// Output: *http.Response or error if URL targets blocked IP
package ssrf
import (
"fmt"; "net"; "net/http"; "net/url"; "time"
)
var privateRanges = []string{
"10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16",
"127.0.0.0/8", "169.254.0.0/16", "::1/128",
"fc00::/7", "fe80::/10",
}
func isBlockedIP(ip net.IP) bool {
for _, cidr := range privateRanges {
_, network, _ := net.ParseCIDR(cidr)
if network.Contains(ip) { return true }
}
return false
}
func SafeFetch(rawURL string) (*http.Response, error) {
parsed, err := url.Parse(rawURL)
if err != nil { return nil, err }
if parsed.Scheme != "http" && parsed.Scheme != "https" {
return nil, fmt.Errorf("blocked scheme: %s", parsed.Scheme)
}
ips, _ := net.LookupIP(parsed.Hostname())
for _, ip := range ips {
if isBlockedIP(ip) {
return nil, fmt.Errorf("blocked IP %s", ip)
}
}
client := &http.Client{Timeout: 10 * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
}}
return client.Get(rawURL)
}
// Input: User-supplied URL string
// Output: ResponseEntity or SecurityException if blocked
import java.net.*;
public class SsrfSafeFetcher {
public static boolean isBlockedIP(InetAddress addr) {
return addr.isLoopbackAddress()
|| addr.isLinkLocalAddress()
|| addr.isSiteLocalAddress()
|| addr.isAnyLocalAddress()
|| addr.isMulticastAddress();
}
public static void validateUrl(String userUrl) throws Exception {
URL parsed = new URL(userUrl);
if (!"http".equals(parsed.getProtocol())
&& !"https".equals(parsed.getProtocol()))
throw new SecurityException("Blocked scheme");
for (InetAddress addr : InetAddress.getAllByName(parsed.getHost()))
if (isBlockedIP(addr))
throw new SecurityException("Blocked IP: " + addr);
}
}
# BAD -- blocklist filtering is trivially bypassed
BLOCKED = ['127.0.0.1', 'localhost', '169.254.169.254']
def fetch(url):
host = urlparse(url).hostname
if host in BLOCKED: raise ValueError("Blocked")
return requests.get(url)
# Bypass: 0x7f000001, 017700000001, 2130706433, [::1]
# GOOD -- resolve DNS first, then check IP against private ranges
def fetch(url):
parsed = urlparse(url)
if parsed.scheme not in ('http', 'https'): raise ValueError("Blocked scheme")
for _, _, _, _, sa in socket.getaddrinfo(parsed.hostname, parsed.port):
if is_ip_blocked(sa[0]): raise ValueError(f"Blocked IP: {sa[0]}")
return requests.get(url, allow_redirects=False, timeout=10)
# BAD -- allowed.com 302-redirects to http://169.254.169.254/
def fetch(url):
validate_url(url) # Passes -- allowed.com is on allowlist
return requests.get(url, allow_redirects=True) # Follows redirect to metadata!
# GOOD -- disable redirects entirely
def fetch(url):
validate_url(url)
resp = requests.get(url, allow_redirects=False, timeout=10)
if 300 <= resp.status_code < 400:
redirect_url = resp.headers.get('Location')
validate_url(redirect_url) # Re-validate redirect target
return requests.get(redirect_url, allow_redirects=False, timeout=10)
return resp
# BAD -- hostname resolves to public IP during validation,
# then attacker DNS rebinds to 169.254.169.254 on actual request
def fetch(url):
host = urlparse(url).hostname
ip = socket.gethostbyname(host) # 1st resolution: 1.2.3.4 (public)
if is_private(ip): raise ValueError("Blocked")
return requests.get(url) # 2nd resolution: 169.254.169.254 (rebind!)
# GOOD -- resolve once, validate, connect directly to resolved IP
def fetch(url):
parsed = urlparse(url)
ip = socket.gethostbyname(parsed.hostname)
if is_ip_blocked(ip): raise ValueError(f"Blocked IP: {ip}")
safe_url = url.replace(parsed.hostname, ip)
return requests.get(safe_url, headers={'Host': parsed.hostname},
allow_redirects=False, timeout=10)
::ffff:127.0.0.1 or ::ffff:169.254.169.254. Fix: Normalize IPv6-mapped addresses to IPv4 before checking, or use ipaddress.ip_address() which handles both. [src1]http://evil.com#@allowed.com differently. Fix: Use the same parsing library for validation and request execution. [src2]2130706433 (decimal for 127.0.0.1), 0177.0.0.1 (octal), 0x7f000001 (hex) all bypass string-based blocklists. Fix: Always resolve to IP, then check against numeric ranges. [src1]metadata.google.internal (169.254.169.254), Azure IMDS at 169.254.169.254 both need blocking. Fix: Block the entire 169.254.0.0/16 range. [src5]http://169.254.169.254/. Fix: Disable redirects (allow_redirects=False) or re-validate each destination. [src2]file:///etc/passwd reads local files; gopher:// interacts with Redis, Memcached, SMTP. Fix: Strict allowlist of http and https only. [src3]# Check if cloud metadata is accessible from instance
curl -s -o /dev/null -w "%{http_code}" http://169.254.169.254/latest/meta-data/
# Verify IMDSv2 enforcement on AWS EC2
aws ec2 describe-instances --instance-ids i-1234567890abcdef0 \
--query 'Reservations[].Instances[].MetadataOptions'
# Test SSRF protections with IP encoding bypasses
curl -v http://your-app.com/fetch?url=http://2130706433/ # decimal 127.0.0.1
curl -v http://your-app.com/fetch?url=http://0x7f000001/ # hex 127.0.0.1
curl -v http://your-app.com/fetch?url=http://0177.0.0.1/ # octal 127.0.0.1
# Find outbound HTTP requests in Python codebase
grep -rn 'requests\.get\|requests\.post\|urlopen' --include="*.py" .
# Find outbound HTTP requests in Node.js codebase
grep -rn 'http\.get\|https\.get\|fetch(\|axios\.' --include="*.js" --include="*.ts" .
# Check iptables rules blocking metadata IP
iptables -L OUTPUT -n | grep 169.254
| Standard/Milestone | Date | Key Change |
|---|---|---|
| OWASP Top 10 A10:2021 | Sep 2021 | SSRF added as own category |
| AWS IMDSv2 | Nov 2019 | Session-based metadata access (PUT + token) |
| AWS IMDSv2 default enforcement | 2024 | New EC2 instances default to IMDSv2-only |
| GCP metadata header requirement | 2020 | Metadata-Flavor: Google header required |
| Azure IMDS header requirement | 2020 | Metadata: true header required |
| CWE-918 | Ongoing | MITRE weakness entry for SSRF classification |
| Capital One breach | Jul 2019 | SSRF + IMDSv1 exploited for 100M+ customer records |
| Use When | Don't Use When | Use Instead |
|---|---|---|
| App fetches user-supplied URLs (webhooks, image imports, link previews) | App only makes requests to hardcoded internal endpoints | Static configuration with no user input in URLs |
| Accepting webhook callback URLs from third parties | All outbound URLs are compile-time constants | No SSRF risk if no user influence on request targets |
| Running in cloud environments (AWS, GCP, Azure) | On-premise with no cloud metadata endpoints | Still apply private IP blocking but metadata protection less critical |
| PDF/image generation from user-supplied content with embedded URLs | Processing only structured data (JSON/XML) with no URL fields | Input validation for the structured format |
http://127.0.0.1:[email protected]/ differently -- authority component parsing varies between implementations