#!/usr/bin/env python3 """ Exploitability Analyzer - Analyzes CVE exploitability in OpenClaw/NanoClaw deployments Usage: python utils/analyze_exploitability.py --help echo '{"cve_id":"CVE-2026-27488","cvss_score":7.3}' | python utils/analyze_exploitability.py --json python utils/analyze_exploitability.py --test-cases Example: cat cve-data.json | python utils/analyze_exploitability.py --json """ import argparse import json import sys from typing import Any def parse_cvss_vector(vector_string: str) -> dict[str, str]: """ Parse CVSS v2, v3.0, or v3.1 vector string into components. Args: vector_string: CVSS vector (e.g., "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H") Returns: Dictionary of CVSS metrics and values """ if not vector_string: return {} metrics = {} normalized = vector_string.strip() # Remove leading CVSS v3.x prefix if present (e.g., "CVSS:3.1/") if normalized.startswith("CVSS:3"): _, separator, remainder = normalized.partition("/") normalized = remainder if separator else "" # Remove surrounding parentheses/whitespace used by some CVSS v2 strings. normalized = normalized.strip().strip("()").strip() if not normalized: return metrics # Parse all vector formats with shared key/value extraction logic. for part in normalized.split("/"): if ":" in part: key, value = part.split(":", 1) metrics[key] = value return metrics def analyze_attack_vector(cvss_metrics: dict[str, str]) -> dict[str, Any]: """ Analyze attack vector from CVSS metrics. Args: cvss_metrics: Parsed CVSS metrics dictionary Returns: Dictionary with attack vector analysis """ analysis = { "is_network_accessible": False, "requires_authentication": True, "requires_user_interaction": True, "complexity": "unknown" } # Attack Vector (AV) av = cvss_metrics.get("AV", "") if av == "N": # Network analysis["is_network_accessible"] = True elif av == "A": # Adjacent Network analysis["is_network_accessible"] = True elif av in ["L", "P"]: # Local or Physical analysis["is_network_accessible"] = False # Privileges Required (PR) / Authentication (AU for v2) pr = cvss_metrics.get("PR", cvss_metrics.get("Au", "")) if pr in ["N", "NONE"]: analysis["requires_authentication"] = False elif pr in ["L", "H", "SINGLE", "MULTIPLE"]: analysis["requires_authentication"] = True # User Interaction (UI) ui = cvss_metrics.get("UI", "") if ui == "N": # None analysis["requires_user_interaction"] = False elif ui == "R": # Required analysis["requires_user_interaction"] = True # Attack Complexity (AC) ac = cvss_metrics.get("AC", "") if ac == "L": analysis["complexity"] = "low" elif ac in ["M", "H"]: analysis["complexity"] = "high" return analysis def detect_exploit_availability(references: list[str]) -> dict[str, Any]: """ Detect if exploits are publicly available based on reference URLs. Args: references: List of reference URLs Returns: Dictionary with exploit_available (bool) and exploit_sources (list) """ exploit_indicators = [ "exploit-db.com", "exploit-database", "exploitdb", "packetstormsecurity.com", "packetstorm", "github.com/exploit", "github.com/poc", "github.com/proof-of-concept", "metasploit", "exploit/", "/exploit", "/poc", "/proof-of-concept", "exploitability", "exploit-code", ] exploit_sources = [] for ref in references: ref_lower = ref.lower() for indicator in exploit_indicators: if indicator in ref_lower: exploit_sources.append(ref) break return { "exploit_available": len(exploit_sources) > 0, "exploit_sources": exploit_sources } def analyze_exploitability(cve_data: dict[str, Any], check_exploits: bool = False) -> dict[str, Any]: """ Analyze CVE exploitability for OpenClaw/NanoClaw deployments. Args: cve_data: Dictionary containing CVE information with keys: - cve_id: CVE identifier - cvss_score: CVSS base score (float) - cvss_vector: CVSS vector string (optional) - type: Vulnerability type - description: CVE description text - references: List of reference URLs (optional) check_exploits: Whether to check references for exploit availability Returns: Dictionary with exploitability_score (high/medium/low/unknown) and rationale """ cve_id = cve_data.get("cve_id", "unknown") cvss_score = cve_data.get("cvss_score", 0.0) cvss_vector = cve_data.get("cvss_vector", "") vuln_type = cve_data.get("type", "") description = cve_data.get("description", "") references = cve_data.get("references", []) # Parse CVSS vector if available cvss_metrics = parse_cvss_vector(cvss_vector) attack_analysis = analyze_attack_vector(cvss_metrics) # Initial scoring based on CVSS score = "unknown" rationale_parts = [] # CVSS-based baseline if cvss_score >= 9.0: score = "high" rationale_parts.append(f"Critical CVSS score ({cvss_score})") elif cvss_score >= 7.0: score = "high" rationale_parts.append(f"High CVSS score ({cvss_score})") elif cvss_score >= 4.0: score = "medium" rationale_parts.append(f"Medium CVSS score ({cvss_score})") elif cvss_score > 0: score = "low" rationale_parts.append(f"Low CVSS score ({cvss_score})") else: score = "unknown" rationale_parts.append("No CVSS score available") # Adjust based on attack vector analysis if attack_analysis["is_network_accessible"]: if not attack_analysis["requires_authentication"] and not attack_analysis["requires_user_interaction"]: # Network accessible, no auth, no user interaction = highly exploitable if score == "medium": score = "high" rationale_parts.append("remotely exploitable without authentication") else: rationale_parts.append("network accessible") else: # Local-only vulnerabilities are less critical in agent deployments if score == "high": score = "medium" rationale_parts.append("requires local access") # OpenClaw/NanoClaw deployment context - adjust based on vulnerability type vuln_type_lower = vuln_type.lower() description_lower = description.lower() # High-risk vulnerability types in AI agent deployments if any(keyword in vuln_type_lower or keyword in description_lower for keyword in [ "ssrf", "server_side_request_forgery", "server-side request forgery" ]): # SSRF is critical for agents that make external API calls if score != "high" and cvss_score >= 6.0: score = "high" rationale_parts.append("SSRF affects agents making external requests") elif any(keyword in vuln_type_lower or keyword in description_lower for keyword in [ "path_traversal", "path traversal", "directory traversal", "file_inclusion" ]): # Path traversal is critical for agents with file system access if score != "high" and cvss_score >= 6.0: score = "high" rationale_parts.append("path traversal affects agents with file access") elif any(keyword in vuln_type_lower or keyword in description_lower for keyword in [ "rce", "remote_code_execution", "remote code execution", "code_injection", "command_injection", "command injection", "arbitrary code" ]): # RCE is always critical regardless of other factors score = "high" rationale_parts.append("RCE is critical in agent deployments") elif any(keyword in vuln_type_lower or keyword in description_lower for keyword in [ "prototype_pollution", "prototype pollution" ]): # Prototype pollution in Node.js agents can lead to RCE if score == "low": score = "medium" rationale_parts.append("prototype pollution can escalate in Node.js agents") elif any(keyword in vuln_type_lower or keyword in description_lower for keyword in [ "xss", "cross_site_scripting", "cross-site scripting", "reflected xss", "stored xss" ]): # XSS is lower risk in headless agent deployments (no browser rendering) if score == "high" and not attack_analysis["is_network_accessible"]: score = "medium" rationale_parts.append("XSS has limited impact in headless agents") elif any(keyword in vuln_type_lower or keyword in description_lower for keyword in [ "sql_injection", "sql injection", "nosql injection" ]): # SQL injection depends on whether agent uses databases if attack_analysis["is_network_accessible"] and not attack_analysis["requires_authentication"]: if score == "medium": score = "high" rationale_parts.append("injection affects agents with database access") # Check for exploit availability if requested exploit_info = {"exploit_available": False, "exploit_sources": []} if check_exploits and references: exploit_info = detect_exploit_availability(references) if exploit_info["exploit_available"]: # Elevate score if public exploits exist if score == "low": score = "medium" elif score == "medium": score = "high" elif score == "unknown" and cvss_score > 0: # If we have some CVSS score but it was unknown, upgrade to at least medium score = "medium" exploit_count = len(exploit_info["exploit_sources"]) source_suffix = "s" if exploit_count > 1 else "" rationale_parts.append( f"public exploit available ({exploit_count} source{source_suffix})" ) # Build rationale string rationale = "; ".join(rationale_parts[:5]) # Limit to first 5 parts for context result = { "cve_id": cve_id, "exploitability_score": score, "exploitability_rationale": rationale, "attack_vector_analysis": attack_analysis } # Include exploit info if check_exploits was enabled if check_exploits: result["exploit_detection"] = exploit_info return result def run_test_cases(): """ Run comprehensive test cases for attack vector analysis. Tests CVSS vector parsing and attack vector analysis logic. """ test_cases = [ { "name": "CVSS 3.1 - Network accessible, no auth, no UI (critical)", "cvss_vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "expected": { "is_network_accessible": True, "requires_authentication": False, "requires_user_interaction": False, "complexity": "low" } }, { "name": "CVSS 3.1 - Network accessible, requires auth", "cvss_vector": "CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H", "expected": { "is_network_accessible": True, "requires_authentication": True, "requires_user_interaction": False, "complexity": "low" } }, { "name": "CVSS 3.1 - Network accessible, requires UI", "cvss_vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:H/I:H/A:H", "expected": { "is_network_accessible": True, "requires_authentication": False, "requires_user_interaction": True, "complexity": "low" } }, { "name": "CVSS 3.1 - Local access required", "cvss_vector": "CVSS:3.1/AV:L/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "expected": { "is_network_accessible": False, "requires_authentication": False, "requires_user_interaction": False, "complexity": "low" } }, { "name": "CVSS 3.1 - Adjacent network, high auth", "cvss_vector": "CVSS:3.1/AV:A/AC:H/PR:H/UI:R/S:U/C:L/I:L/A:L", "expected": { "is_network_accessible": True, "requires_authentication": True, "requires_user_interaction": True, "complexity": "high" } }, { "name": "CVSS 3.0 - Physical access required", "cvss_vector": "CVSS:3.0/AV:P/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "expected": { "is_network_accessible": False, "requires_authentication": False, "requires_user_interaction": False, "complexity": "low" } }, { "name": "CVSS v2 - Network, no auth required", "cvss_vector": "(AV:N/AC:L/Au:N/C:C/I:C/A:C)", "expected": { "is_network_accessible": True, "requires_authentication": False, "requires_user_interaction": True, # v2 doesn't have UI, defaults to True "complexity": "low" } }, { "name": "CVSS v2 - Network, single auth", "cvss_vector": "AV:N/AC:M/Au:SINGLE/C:P/I:P/A:P", "expected": { "is_network_accessible": True, "requires_authentication": True, "requires_user_interaction": True, # v2 doesn't have UI, defaults to True "complexity": "high" } }, { "name": "CVSS v2 - Local access, multiple auth", "cvss_vector": "(AV:L/AC:L/Au:MULTIPLE/C:C/I:C/A:C)", "expected": { "is_network_accessible": False, "requires_authentication": True, "requires_user_interaction": True, # v2 doesn't have UI, defaults to True "complexity": "low" } }, { "name": "Empty CVSS vector", "cvss_vector": "", "expected": { "is_network_accessible": False, "requires_authentication": True, "requires_user_interaction": True, "complexity": "unknown" } } ] print("Running attack vector analysis test cases...") print("=" * 70) passed = 0 failed = 0 for i, test in enumerate(test_cases, 1): print(f"\nTest {i}/{len(test_cases)}: {test['name']}") print(f" CVSS Vector: {test['cvss_vector']}") # Parse CVSS vector and analyze attack vector cvss_metrics = parse_cvss_vector(test['cvss_vector']) result = analyze_attack_vector(cvss_metrics) # Compare with expected results test_passed = True for key, expected_value in test['expected'].items(): actual_value = result.get(key) if actual_value != expected_value: print(f" āŒ FAILED: {key}") print(f" Expected: {expected_value}") print(f" Got: {actual_value}") test_passed = False failed += 1 break if test_passed: print(" āœ“ PASSED") passed += 1 else: # Show full result for debugging print(f" Full result: {json.dumps(result, indent=6)}") print("\n" + "=" * 70) print(f"Test Results: {passed} passed, {failed} failed out of {len(test_cases)} total") if failed > 0: print("\nāŒ Some tests failed!") sys.exit(1) else: print("\nāœ… All tests passed!") sys.exit(0) def main(): parser = argparse.ArgumentParser( description="Analyze CVE exploitability for OpenClaw/NanoClaw deployments", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Analyze from JSON stdin echo '{"cve_id":"CVE-2026-27488","cvss_score":7.3,"type":"ssrf"}' | python utils/analyze_exploitability.py --json # Analyze with CVSS vector echo '{"cve_id":"CVE-2026-1234","cvss_score":9.8,"cvss_vector":"CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"}' \ | python utils/analyze_exploitability.py --json # Run test cases python utils/analyze_exploitability.py --test-cases # Parse CVSS vector only python utils/analyze_exploitability.py --parse-vector "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H" """ ) parser.add_argument( "--json", action="store_true", help="Read CVE data from stdin as JSON and output analysis" ) parser.add_argument( "--parse-vector", type=str, metavar="VECTOR", help="Parse and display CVSS vector string" ) parser.add_argument( "--test-cases", action="store_true", help="Run built-in test cases to verify analyzer logic" ) parser.add_argument( "--check-exploits", action="store_true", help="Check references for publicly available exploits and adjust score accordingly" ) args = parser.parse_args() # Handle --parse-vector if args.parse_vector: metrics = parse_cvss_vector(args.parse_vector) print(json.dumps(metrics, indent=2)) sys.exit(0) # Handle --test-cases if args.test_cases: run_test_cases() sys.exit(0) # Handle --json (stdin) if args.json: try: cve_data = json.load(sys.stdin) except json.JSONDecodeError as e: print(f"Error: Invalid JSON input: {e}", file=sys.stderr) sys.exit(1) result = analyze_exploitability(cve_data, check_exploits=args.check_exploits) print(json.dumps(result, indent=2)) sys.exit(0) # No action specified - show help parser.print_help() sys.exit(0) if __name__ == "__main__": main()