Files
clawsec/utils/analyze_exploitability.py
davida-ps 073e771b73 Exploitability Context for CVE Advisories (#89)
* feat(advisories): add exploitability context for CVE advisories

* fix(ci): align exploitability workflow with signing model

* docs(skills): add patch release changelog entries

* chore(clawsec-feed): bump version to 0.0.5

* chore(clawsec-suite): bump version to 0.1.4

* fix(clawsec-nanoclaw): align exploitability handling and nanoclaw integration

* chore(clawsec-nanoclaw): bump version to 0.0.2

* refactor(scripts): share feed path and mirror sync helpers

* refactor(utils): unify cvss vector parsing flow

* refactor(clawsec-nanoclaw): centralize advisory risk evaluation

* docs(exploitability): refresh release metadata dates

* fix(review): align feed signing and advisory dedupe

* chore(clawsec-feed): bump version to 0.0.6

* chore(clawsec-nanoclaw): bump version to 0.0.3

* fix(backfill): limit signing to target feed only

* fix(review): keep skill runtime verify-only and dedupe matching

* chore(clawsec-nanoclaw): bump version to 0.0.4

* chore(skills): align versions with published tags

* feat(feed): enrich local population with exploitability analysis

* docs(exploitability): mark backfill as historical flow
2026-03-01 18:43:24 +02:00

532 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Exploitability Analyzer - Analyzes CVE exploitability in OpenClaw/NanoClaw deployments
Usage:
python utils/analyze_exploitability.py --help
echo '{"cve_id":"CVE-2026-27488","cvss_score":7.3}' | python utils/analyze_exploitability.py --json
python utils/analyze_exploitability.py --test-cases
Example:
cat cve-data.json | python utils/analyze_exploitability.py --json
"""
import argparse
import json
import sys
from typing import Any
def parse_cvss_vector(vector_string: str) -> dict[str, str]:
    """
    Parse a CVSS vector string into its metric components.

    Handles bare CVSS v2 vectors (optionally wrapped in parentheses) as well
    as vectors carrying a "CVSS:<version>/" prefix such as v3.0, v3.1 or v4.0.
    The version prefix is discarded; it is never reported as a metric.

    Args:
        vector_string: CVSS vector (e.g., "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H")

    Returns:
        Dictionary mapping metric abbreviations to their values. Empty when the
        input is empty, not a string, or contains no "key:value" segments.
    """
    # Feed data is untrusted JSON: tolerate None / numbers instead of raising.
    if not vector_string or not isinstance(vector_string, str):
        return {}

    # Some CVSS v2 sources wrap the whole vector in parentheses.
    normalized = vector_string.strip().strip("()").strip()

    # Drop any leading version tag ("CVSS:3.1/", "CVSS:4.0/", ...). Without
    # this, the tag would be parsed as a bogus metric named "CVSS".
    if normalized.startswith("CVSS:"):
        _, separator, remainder = normalized.partition("/")
        normalized = remainder if separator else ""
        normalized = normalized.strip().strip("()").strip()

    metrics: dict[str, str] = {}
    if not normalized:
        return metrics

    # All vector versions share the same "KEY:VALUE/KEY:VALUE" layout.
    for part in normalized.split("/"):
        key, separator, value = part.partition(":")
        key = key.strip()
        if separator and key:
            metrics[key] = value.strip()
    return metrics
def analyze_attack_vector(cvss_metrics: dict[str, str]) -> dict[str, Any]:
    """
    Derive reachability and precondition flags from parsed CVSS metrics.

    Missing or unrecognised metrics fall back to the defaults: not network
    accessible, authentication required, user interaction required, and
    "unknown" complexity.

    Args:
        cvss_metrics: Parsed CVSS metrics dictionary

    Returns:
        Dictionary with the keys is_network_accessible,
        requires_authentication, requires_user_interaction and complexity
    """
    attack_vector = cvss_metrics.get("AV", "")
    # v3 uses PR (Privileges Required); v2 uses Au (Authentication).
    if "PR" in cvss_metrics:
        privileges = cvss_metrics["PR"]
    else:
        privileges = cvss_metrics.get("Au", "")
    interaction = cvss_metrics.get("UI", "")
    attack_complexity = cvss_metrics.get("AC", "")

    # Network (N) and Adjacent (A) count as reachable; Local/Physical do not.
    reachable = attack_vector in ("N", "A")

    # Only an explicit "none" value clears the authentication requirement.
    needs_auth = privileges not in ("N", "NONE")

    # Only an explicit UI:N clears the interaction requirement.
    needs_interaction = interaction != "N"

    if attack_complexity == "L":
        complexity_label = "low"
    elif attack_complexity in ("M", "H"):
        complexity_label = "high"
    else:
        complexity_label = "unknown"

    return {
        "is_network_accessible": reachable,
        "requires_authentication": needs_auth,
        "requires_user_interaction": needs_interaction,
        "complexity": complexity_label,
    }
def detect_exploit_availability(references: list[str]) -> dict[str, Any]:
    """
    Detect if exploits are publicly available based on reference URLs.

    A reference counts as an exploit source when its lowercased text contains
    any of the known indicator substrings. Each matching reference is reported
    once, in input order.

    Args:
        references: List of reference URLs. None is treated as an empty list,
            a bare string as a single reference, and non-string entries
            (e.g. JSON nulls or nested objects) are skipped.

    Returns:
        Dictionary with exploit_available (bool) and exploit_sources (list)
    """
    exploit_indicators = (
        "exploit-db.com",
        "exploit-database",
        "exploitdb",
        "packetstormsecurity.com",
        "packetstorm",
        "github.com/exploit",
        "github.com/poc",
        "github.com/proof-of-concept",
        "metasploit",
        "exploit/",
        "/exploit",
        "/poc",
        "/proof-of-concept",
        "exploitability",
        "exploit-code",
    )

    if references is None:
        candidates = []
    elif isinstance(references, str):
        # Iterating a str would test single characters and never match.
        candidates = [references]
    else:
        candidates = references

    exploit_sources = []
    for ref in candidates:
        # Advisory JSON is untrusted: skip anything that is not a URL string
        # rather than crashing on .lower().
        if not isinstance(ref, str):
            continue
        ref_lower = ref.lower()
        if any(indicator in ref_lower for indicator in exploit_indicators):
            exploit_sources.append(ref)

    return {
        "exploit_available": bool(exploit_sources),
        "exploit_sources": exploit_sources,
    }
def _coerce_cvss_score(raw: Any) -> float:
    """Return a usable numeric CVSS score, or 0.0 when the value is missing or malformed."""
    if isinstance(raw, bool):
        # JSON true/false is not a score.
        return 0.0
    if isinstance(raw, (int, float)):
        # NaN compares unequal to itself; treat it as "no score".
        return 0.0 if raw != raw else raw
    if isinstance(raw, str):
        try:
            parsed = float(raw.strip())
        except ValueError:
            return 0.0
        return 0.0 if parsed != parsed else parsed
    return 0.0


def _as_text(value: Any) -> str:
    """Return value when it is a string, otherwise an empty string."""
    return value if isinstance(value, str) else ""


def _string_references(raw: Any) -> list[str]:
    """Normalise a references field to a list containing only string entries."""
    if isinstance(raw, str):
        return [raw]
    if isinstance(raw, (list, tuple)):
        return [entry for entry in raw if isinstance(entry, str)]
    return []


def _word_tokens(text: str) -> set[str]:
    """Split text into alphanumeric tokens for whole-word keyword checks."""
    return set("".join(ch if ch.isalnum() else " " for ch in text).split())


def analyze_exploitability(cve_data: dict[str, Any], check_exploits: bool = False) -> dict[str, Any]:
    """
    Analyze CVE exploitability for OpenClaw/NanoClaw deployments.

    Scoring proceeds in four steps: a baseline from the CVSS score, an
    adjustment from the CVSS vector, an adjustment from the vulnerability
    type (matched against both the type field and the description), and an
    optional bump when a reference looks like a public exploit.

    Malformed fields are tolerated: a null or non-numeric cvss_score counts as
    "no score", and null/non-string type, description, cvss_vector or
    reference entries are ignored.

    Args:
        cve_data: Dictionary containing CVE information with keys:
            - cve_id: CVE identifier
            - cvss_score: CVSS base score (float)
            - cvss_vector: CVSS vector string (optional)
            - type: Vulnerability type
            - description: CVE description text
            - references: List of reference URLs (optional)
        check_exploits: Whether to check references for exploit availability

    Returns:
        Dictionary with cve_id, exploitability_score (high/medium/low/unknown),
        exploitability_rationale and attack_vector_analysis, plus
        exploit_detection when check_exploits is enabled
    """
    cve_id = cve_data.get("cve_id", "unknown")
    cvss_score = _coerce_cvss_score(cve_data.get("cvss_score", 0.0))
    cvss_vector = _as_text(cve_data.get("cvss_vector", ""))
    vuln_type_lower = _as_text(cve_data.get("type", "")).lower()
    description_lower = _as_text(cve_data.get("description", "")).lower()
    references = _string_references(cve_data.get("references", []))

    # Parse CVSS vector if available
    cvss_metrics = parse_cvss_vector(cvss_vector)
    attack_analysis = analyze_attack_vector(cvss_metrics)

    rationale_parts = []

    # CVSS-based baseline
    if cvss_score >= 9.0:
        score = "high"
        rationale_parts.append(f"Critical CVSS score ({cvss_score})")
    elif cvss_score >= 7.0:
        score = "high"
        rationale_parts.append(f"High CVSS score ({cvss_score})")
    elif cvss_score >= 4.0:
        score = "medium"
        rationale_parts.append(f"Medium CVSS score ({cvss_score})")
    elif cvss_score > 0:
        score = "low"
        rationale_parts.append(f"Low CVSS score ({cvss_score})")
    else:
        score = "unknown"
        rationale_parts.append("No CVSS score available")

    # Adjust based on attack vector analysis
    if attack_analysis["is_network_accessible"]:
        if not attack_analysis["requires_authentication"] and not attack_analysis["requires_user_interaction"]:
            # Network accessible, no auth, no user interaction = highly exploitable
            if score == "medium":
                score = "high"
            rationale_parts.append("remotely exploitable without authentication")
        else:
            rationale_parts.append("network accessible")
    elif cvss_metrics.get("AV") in ("L", "P"):
        # Local-only vulnerabilities are less critical in agent deployments.
        # Only downgrade when the vector actually says Local/Physical: a
        # missing vector is not evidence of local-only access, and treating
        # it as such would silently under-rate critical CVEs.
        if score == "high":
            score = "medium"
        rationale_parts.append("requires local access")

    # Short acronyms are matched as whole tokens; a plain substring test for
    # "rce" would also fire on words like "source" or "resource".
    tokens = _word_tokens(vuln_type_lower) | _word_tokens(description_lower)

    def mentions(*keywords: str) -> bool:
        for keyword in keywords:
            if keyword.isalpha() and len(keyword) <= 4:
                if keyword in tokens:
                    return True
            elif keyword in vuln_type_lower or keyword in description_lower:
                return True
        return False

    # OpenClaw/NanoClaw deployment context - adjust based on vulnerability type.
    # The first matching category wins.
    # NOTE(review): the rationale for each category is appended whenever the
    # category matches, independent of whether the score changed - confirm
    # this matches the intended nesting.
    if mentions("ssrf", "server_side_request_forgery", "server-side request forgery"):
        # SSRF is critical for agents that make external API calls
        if score != "high" and cvss_score >= 6.0:
            score = "high"
        rationale_parts.append("SSRF affects agents making external requests")
    elif mentions("path_traversal", "path traversal", "directory traversal", "file_inclusion"):
        # Path traversal is critical for agents with file system access
        if score != "high" and cvss_score >= 6.0:
            score = "high"
        rationale_parts.append("path traversal affects agents with file access")
    elif mentions(
        "rce", "remote_code_execution", "remote code execution", "code_injection",
        "command_injection", "command injection", "arbitrary code",
    ):
        # RCE is always critical regardless of other factors
        score = "high"
        rationale_parts.append("RCE is critical in agent deployments")
    elif mentions("prototype_pollution", "prototype pollution"):
        # Prototype pollution in Node.js agents can lead to RCE
        if score == "low":
            score = "medium"
        rationale_parts.append("prototype pollution can escalate in Node.js agents")
    elif mentions("xss", "cross_site_scripting", "cross-site scripting", "reflected xss", "stored xss"):
        # XSS is lower risk in headless agent deployments (no browser rendering)
        if score == "high" and not attack_analysis["is_network_accessible"]:
            score = "medium"
        rationale_parts.append("XSS has limited impact in headless agents")
    elif mentions("sql_injection", "sql injection", "nosql injection"):
        # SQL injection depends on whether agent uses databases
        if attack_analysis["is_network_accessible"] and not attack_analysis["requires_authentication"]:
            if score == "medium":
                score = "high"
        rationale_parts.append("injection affects agents with database access")

    # Check for exploit availability if requested
    exploit_info = {"exploit_available": False, "exploit_sources": []}
    if check_exploits and references:
        exploit_info = detect_exploit_availability(references)
        if exploit_info["exploit_available"]:
            # Elevate score if public exploits exist
            if score == "low":
                score = "medium"
            elif score == "medium":
                score = "high"
            elif score == "unknown" and cvss_score > 0:
                # NOTE(review): "unknown" is only assigned when the score is
                # not above 0, so this branch looks unreachable - confirm
                # whether unknown + public exploit should become "medium".
                score = "medium"
            exploit_count = len(exploit_info["exploit_sources"])
            source_suffix = "s" if exploit_count > 1 else ""
            rationale_parts.append(
                f"public exploit available ({exploit_count} source{source_suffix})"
            )

    # Build rationale string, limited to the first 5 parts
    rationale = "; ".join(rationale_parts[:5])

    result = {
        "cve_id": cve_id,
        "exploitability_score": score,
        "exploitability_rationale": rationale,
        "attack_vector_analysis": attack_analysis,
    }
    # Include exploit info if check_exploits was enabled
    if check_exploits:
        result["exploit_detection"] = exploit_info
    return result
def run_test_cases():
    """
    Exercise CVSS vector parsing and attack vector analysis against fixed cases.

    Each case is parsed with parse_cvss_vector, analysed with
    analyze_attack_vector, and compared field by field against the expected
    flags. Progress is printed to stdout; the process exits with status 1 if
    any case fails and 0 otherwise.
    """
    field_order = (
        "is_network_accessible",
        "requires_authentication",
        "requires_user_interaction",
        "complexity",
    )

    def make_case(name, vector, *flags):
        # flags follow field_order: network, auth, user interaction, complexity
        return {
            "name": name,
            "cvss_vector": vector,
            "expected": dict(zip(field_order, flags)),
        }

    # CVSS v2 has no UI metric, so user interaction stays at its default (True).
    scenarios = [
        make_case("CVSS 3.1 - Network accessible, no auth, no UI (critical)",
                  "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", True, False, False, "low"),
        make_case("CVSS 3.1 - Network accessible, requires auth",
                  "CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H", True, True, False, "low"),
        make_case("CVSS 3.1 - Network accessible, requires UI",
                  "CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H", True, False, True, "low"),
        make_case("CVSS 3.1 - Local access required",
                  "CVSS:3.1/AV:L/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", False, False, False, "low"),
        make_case("CVSS 3.1 - Adjacent network, high auth",
                  "CVSS:3.1/AV:A/AC:H/PR:H/UI:R/S:U/C:L/I:L/A:L", True, True, True, "high"),
        make_case("CVSS 3.0 - Physical access required",
                  "CVSS:3.0/AV:P/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", False, False, False, "low"),
        make_case("CVSS v2 - Network, no auth required",
                  "(AV:N/AC:L/Au:N/C:C/I:C/A:C)", True, False, True, "low"),
        make_case("CVSS v2 - Network, single auth",
                  "AV:N/AC:M/Au:SINGLE/C:P/I:P/A:P", True, True, True, "high"),
        make_case("CVSS v2 - Local access, multiple auth",
                  "(AV:L/AC:L/Au:MULTIPLE/C:C/I:C/A:C)", False, True, True, "low"),
        make_case("Empty CVSS vector",
                  "", False, True, True, "unknown"),
    ]
    total = len(scenarios)

    print("Running attack vector analysis test cases...")
    print("=" * 70)

    passed = 0
    failed = 0
    for number, scenario in enumerate(scenarios, 1):
        print(f"\nTest {number}/{total}: {scenario['name']}")
        print(f" CVSS Vector: {scenario['cvss_vector']}")

        outcome = analyze_attack_vector(parse_cvss_vector(scenario["cvss_vector"]))

        # Report only the first mismatching field, in expected-key order.
        mismatch = next(
            (
                (field, wanted)
                for field, wanted in scenario["expected"].items()
                if outcome.get(field) != wanted
            ),
            None,
        )
        if mismatch is None:
            print(" ✓ PASSED")
            passed += 1
            continue

        field, wanted = mismatch
        print(f" ❌ FAILED: {field}")
        print(f" Expected: {wanted}")
        print(f" Got: {outcome.get(field)}")
        failed += 1
        # Show full result for debugging
        print(f" Full result: {json.dumps(outcome, indent=6)}")

    print("\n" + "=" * 70)
    print(f"Test Results: {passed} passed, {failed} failed out of {total} total")

    if failed > 0:
        print("\n❌ Some tests failed!")
        sys.exit(1)
    print("\n✅ All tests passed!")
    sys.exit(0)
def main():
    """
    Command-line entry point for the exploitability analyzer.

    Dispatches on the first matching mode, in this order: --parse-vector,
    --test-cases, --json. With no mode given, prints the help text. Every
    path ends in sys.exit: status 1 for unusable JSON input on stdin,
    0 otherwise (the test-case mode exits with its own status).
    """
    parser = argparse.ArgumentParser(
        description="Analyze CVE exploitability for OpenClaw/NanoClaw deployments",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Analyze from JSON stdin
echo '{"cve_id":"CVE-2026-27488","cvss_score":7.3,"type":"ssrf"}' | python utils/analyze_exploitability.py --json
# Analyze with CVSS vector
echo '{"cve_id":"CVE-2026-1234","cvss_score":9.8,"cvss_vector":"CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"}' \
| python utils/analyze_exploitability.py --json
# Run test cases
python utils/analyze_exploitability.py --test-cases
# Parse CVSS vector only
python utils/analyze_exploitability.py --parse-vector "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"
"""
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Read CVE data from stdin as JSON and output analysis"
    )
    parser.add_argument(
        "--parse-vector",
        type=str,
        metavar="VECTOR",
        help="Parse and display CVSS vector string"
    )
    parser.add_argument(
        "--test-cases",
        action="store_true",
        help="Run built-in test cases to verify analyzer logic"
    )
    parser.add_argument(
        "--check-exploits",
        action="store_true",
        help="Check references for publicly available exploits and adjust score accordingly"
    )
    args = parser.parse_args()

    # Handle --parse-vector. Compare against None so an explicitly supplied
    # empty vector still prints "{}" instead of falling through to the help.
    if args.parse_vector is not None:
        metrics = parse_cvss_vector(args.parse_vector)
        print(json.dumps(metrics, indent=2))
        sys.exit(0)

    # Handle --test-cases
    if args.test_cases:
        run_test_cases()
        sys.exit(0)

    # Handle --json (stdin)
    if args.json:
        try:
            cve_data = json.load(sys.stdin)
        except json.JSONDecodeError as e:
            print(f"Error: Invalid JSON input: {e}", file=sys.stderr)
            sys.exit(1)
        # Valid JSON that is not an object (list, string, number, null) has
        # no CVE fields to read; fail cleanly instead of with a traceback.
        if not isinstance(cve_data, dict):
            print("Error: Invalid JSON input: expected a JSON object", file=sys.stderr)
            sys.exit(1)
        result = analyze_exploitability(cve_data, check_exploits=args.check_exploits)
        print(json.dumps(result, indent=2))
        sys.exit(0)

    # No action specified - show help
    parser.print_help()
    sys.exit(0)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()