Source code for mcpcap.modules.icmp

"""ICMP analysis module."""

from datetime import datetime
from typing import Any

from fastmcp import FastMCP
from scapy.all import ICMP, IP, IPv6, rdpcap

from .base import BaseModule


[docs] class ICMPModule(BaseModule): """Module for analyzing ICMP packets in PCAP files.""" @property def protocol_name(self) -> str: """Return the name of the protocol this module analyzes.""" return "ICMP"
[docs] def analyze_icmp_packets(self, pcap_file: str) -> dict[str, Any]: """ Analyze ICMP packets from a PCAP file and return comprehensive analysis results. FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through Claude's web interface. Files must be accessible via URL or local file path. SUPPORTED INPUT FORMATS: - Remote files: "https://example.com/capture.pcap" - Local files: "/absolute/path/to/capture.pcap" UNSUPPORTED: - Files uploaded through Claude's file upload feature - Base64 file content - Relative file paths Args: pcap_file: HTTP URL or absolute local file path to PCAP file Returns: A structured dictionary containing ICMP packet analysis results """ return self.analyze_packets(pcap_file)
def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]: """Perform the actual ICMP packet analysis on a local PCAP file.""" try: packets = rdpcap(pcap_file) icmp_packets = [pkt for pkt in packets if pkt.haslayer(ICMP)] if not icmp_packets: return { "file": pcap_file, "total_packets": len(packets), "icmp_packets_found": 0, "message": "No ICMP packets found in this capture", } # Apply max_packets limit if specified packets_to_analyze = icmp_packets limited = False if self.config.max_packets and len(icmp_packets) > self.config.max_packets: packets_to_analyze = icmp_packets[: self.config.max_packets] limited = True packet_details = [] for i, pkt in enumerate(packets_to_analyze, 1): packet_info = self._analyze_icmp_packet(pkt, i) packet_details.append(packet_info) # Generate statistics stats = self._generate_statistics(packet_details) result = { "file": pcap_file, "analysis_timestamp": datetime.now().isoformat(), "total_packets": len(packets), "icmp_packets_found": len(icmp_packets), "icmp_packets_analyzed": len(packet_details), "statistics": stats, "packets": packet_details, } if limited: result["note"] = ( f"Analysis limited to first {self.config.max_packets} ICMP packets due to --max-packets setting" ) return result except Exception as e: return { "error": f"Error reading PCAP file '{pcap_file}': {str(e)}", "file": pcap_file, } def _analyze_icmp_packet(self, packet, packet_num: int) -> dict[str, Any]: """Analyze a single ICMP packet and extract relevant information.""" info = { "packet_number": packet_num, "timestamp": datetime.fromtimestamp(float(packet.time)).isoformat(), } # Basic IP information if packet.haslayer(IP): ip_layer = packet[IP] info.update( { "src_ip": ip_layer.src, "dst_ip": ip_layer.dst, "ip_version": 4, "ttl": ip_layer.ttl, "packet_size": len(packet), } ) elif packet.haslayer(IPv6): ipv6_layer = packet[IPv6] info.update( { "src_ip": ipv6_layer.src, "dst_ip": ipv6_layer.dst, "ip_version": 6, "hop_limit": ipv6_layer.hlim, "packet_size": len(packet), } ) # ICMP information if packet.haslayer(ICMP): icmp_layer = packet[ICMP] # Map ICMP types to human-readable names icmp_types = { 0: "Echo Reply", 3: "Destination Unreachable", 4: "Source Quench", 5: "Redirect", 8: "Echo Request", 11: "Time Exceeded", 12: "Parameter Problem", 13: "Timestamp Request", 14: "Timestamp Reply", 15: "Information Request", 16: "Information Reply", } # Map destination unreachable codes dest_unreach_codes = { 0: "Network Unreachable", 1: "Host Unreachable", 2: "Protocol Unreachable", 3: "Port Unreachable", 4: "Fragmentation Required", 5: "Source Route Failed", } # Map time exceeded codes time_exceeded_codes = { 0: "TTL Exceeded in Transit", 1: "Fragment Reassembly Time Exceeded", } icmp_type = icmp_layer.type icmp_code = icmp_layer.code info.update( { "icmp_type": icmp_type, "icmp_code": icmp_code, "icmp_type_name": icmp_types.get( icmp_type, f"Unknown Type ({icmp_type})" ), "icmp_id": getattr(icmp_layer, "id", None), "icmp_seq": getattr(icmp_layer, "seq", None), "checksum": icmp_layer.chksum, } ) # Add code descriptions for specific types if icmp_type == 3: # Destination Unreachable info["icmp_code_name"] = dest_unreach_codes.get( icmp_code, f"Unknown Code ({icmp_code})" ) elif icmp_type == 11: # Time Exceeded info["icmp_code_name"] = time_exceeded_codes.get( icmp_code, f"Unknown Code ({icmp_code})" ) else: info["icmp_code_name"] = f"Code {icmp_code}" return info def _generate_statistics(self, packets: list[dict[str, Any]]) -> dict[str, Any]: """Generate statistics from analyzed ICMP packets.""" stats = { "icmp_types": {}, "unique_sources": set(), "unique_destinations": set(), "echo_pairs": {}, "unreachable_destinations": set(), } for pkt in packets: # Count ICMP types if "icmp_type_name" in pkt: type_name = pkt["icmp_type_name"] stats["icmp_types"][type_name] = ( stats["icmp_types"].get(type_name, 0) + 1 ) # Track unique IPs if "src_ip" in pkt: stats["unique_sources"].add(pkt["src_ip"]) if "dst_ip" in pkt: stats["unique_destinations"].add(pkt["dst_ip"]) # Track echo request/reply pairs if pkt.get("icmp_type") in [0, 8] and pkt.get("icmp_id") is not None: echo_id = pkt["icmp_id"] if echo_id not in stats["echo_pairs"]: stats["echo_pairs"][echo_id] = {"requests": 0, "replies": 0} if pkt["icmp_type"] == 8: # Echo Request stats["echo_pairs"][echo_id]["requests"] += 1 elif pkt["icmp_type"] == 0: # Echo Reply stats["echo_pairs"][echo_id]["replies"] += 1 # Track unreachable destinations if pkt.get("icmp_type") == 3: # Destination Unreachable if "dst_ip" in pkt: stats["unreachable_destinations"].add(pkt["dst_ip"]) # Convert sets to lists for JSON serialization return { "icmp_type_counts": stats["icmp_types"], "unique_sources_count": len(stats["unique_sources"]), "unique_destinations_count": len(stats["unique_destinations"]), "unique_sources": list(stats["unique_sources"]), "unique_destinations": list(stats["unique_destinations"]), "echo_sessions": len(stats["echo_pairs"]), "echo_pairs": stats["echo_pairs"], "unreachable_destinations_count": len(stats["unreachable_destinations"]), "unreachable_destinations": list(stats["unreachable_destinations"]), }
[docs] def setup_prompts(self, mcp: FastMCP) -> None: """Set up ICMP-specific analysis prompts for the MCP server.""" @mcp.prompt def icmp_network_diagnostics(): """Prompt for analyzing ICMP traffic from a network diagnostics perspective""" return """You are a network engineer analyzing ICMP traffic for network diagnostics. Focus on: 1. **Connectivity Testing:** - Analyze ping (echo request/reply) patterns and success rates - Identify network reachability issues from failed pings - Check ping response times and latency patterns - Look for asymmetric routing issues 2. **Network Path Analysis:** - Examine TTL values and time exceeded messages for traceroute data - Identify network hops and routing paths - Look for routing loops or suboptimal paths - Check for fragmentation issues 3. **Error Diagnostics:** - Analyze destination unreachable messages and their causes - Identify network, host, protocol, or port unreachability - Look for fragmentation required messages - Check source quench messages for congestion 4. **Network Health Assessment:** - Monitor ICMP message frequencies and patterns - Identify potential network congestion indicators - Look for unusual ICMP traffic that might indicate problems - Assess overall network responsiveness Provide specific recommendations for network troubleshooting and optimization.""" @mcp.prompt def icmp_security_analysis(): """Prompt for analyzing ICMP traffic from a security perspective""" return """You are a security analyst examining ICMP traffic for threats and anomalies. Focus on: 1. **Reconnaissance Detection:** - Identify ping sweeps and network scanning activities - Look for systematic probing of network ranges - Check for unusual ping patterns that might indicate reconnaissance - Monitor for traceroute-based network mapping 2. **Covert Channel Analysis:** - Examine ICMP payloads for potential data exfiltration - Look for unusual ICMP packet sizes or timing patterns - Check for non-standard ICMP types or codes - Identify potential ICMP tunneling activities 3. **DoS Attack Detection:** - Monitor for ICMP flood attacks (ping floods) - Look for smurf attack patterns (broadcast ping amplification) - Check for fragmentation attacks using ICMP - Identify potential death of death or similar attacks 4. **Policy Compliance:** - Verify ICMP traffic matches network security policies - Check for unauthorized ICMP types (if policy restricts certain types) - Monitor for ICMP traffic from unexpected sources - Identify potential firewall bypass attempts Provide threat assessment and recommended security controls.""" @mcp.prompt def icmp_forensic_investigation(): """Prompt for forensic analysis of ICMP traffic""" return """You are conducting a digital forensics investigation involving ICMP traffic. Approach systematically: 1. **Timeline Reconstruction:** - Create chronological sequence of ICMP events - Map ping activities to potential user or system actions - Correlate ICMP traffic with incident timeframes - Track network connectivity patterns over time 2. **Attribution and Tracking:** - Trace ICMP traffic to source systems and networks - Identify systems involved in ping exchanges - Map network topology from traceroute data - Document unique identifiers in ICMP packets 3. **Evidence Collection:** - Preserve all ICMP packet details with precise timestamps - Document network paths and hop information - Record error messages and their contexts - Note any unusual or suspicious ICMP characteristics 4. **Impact Assessment:** - Determine scope of network reconnaissance or scanning - Assess potential information leakage through ICMP - Identify systems that may have been probed - Evaluate ongoing security implications Present findings with precise timestamps, evidence preservation notes, and clear documentation suitable for legal proceedings."""