# Source code for mcpcap.modules.dhcp

"""DHCP analysis module."""

from typing import Any

from fastmcp import FastMCP
from scapy.all import BOOTP, DHCP, IP, UDP, rdpcap

from .base import BaseModule


class DHCPModule(BaseModule):
    """Module for analyzing DHCP packets in PCAP files.

    The public entry point is :meth:`analyze_dhcp_packets`, which delegates to
    ``self.analyze_packets`` (provided by the base class). The per-file work is
    done in :meth:`_analyze_protocol_file`, which reads the capture with Scapy,
    extracts BOOTP/DHCP fields from every matching packet and aggregates
    statistics over them.
    """

    # DHCP message type codes (option 53) mapped to their names.
    _MESSAGE_TYPES = {
        1: "DISCOVER",
        2: "OFFER",
        3: "REQUEST",
        4: "DECLINE",
        5: "ACK",
        6: "NAK",
        7: "RELEASE",
        8: "INFORM",
    }

    @property
    def protocol_name(self) -> str:
        """Return the name of the protocol this module analyzes."""
        return "DHCP"

    def analyze_dhcp_packets(self, pcap_file: str) -> dict[str, Any]:
        """
        Analyze DHCP packets from a PCAP file and return comprehensive analysis results.

        FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through
        Claude's web interface. Files must be accessible via URL or local file path.

        SUPPORTED INPUT FORMATS:
        - Remote files: "https://example.com/capture.pcap"
        - Local files: "/absolute/path/to/capture.pcap"

        UNSUPPORTED:
        - Files uploaded through Claude's file upload feature
        - Base64 file content
        - Relative file paths

        Args:
            pcap_file: HTTP URL or absolute local file path to PCAP file

        Returns:
            A structured dictionary containing DHCP packet analysis results
        """
        return self.analyze_packets(pcap_file)

    def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]:
        """Perform the actual DHCP packet analysis on a local PCAP file.

        Args:
            pcap_file: Path of the capture file handed to ``rdpcap``.

        Returns:
            On success, a dict with the file name, packet counts, aggregated
            ``statistics`` and the per-packet ``packets`` list (plus a ``note``
            when the ``max_packets`` limit truncated the analysis). When the
            capture holds no BOOTP packets, a short dict with a ``message``.
            On any exception, a dict with ``error`` and ``file`` keys.
        """
        try:
            packets = rdpcap(pcap_file)
            dhcp_packets = [pkt for pkt in packets if pkt.haslayer(BOOTP)]

            if not dhcp_packets:
                return {
                    "file": pcap_file,
                    "total_packets": len(packets),
                    "dhcp_packets_found": 0,
                    "message": "No DHCP packets found in this capture",
                }

            # Apply max_packets limit if specified
            max_packets = self.config.max_packets
            limited = bool(max_packets) and len(dhcp_packets) > max_packets
            packets_to_analyze = dhcp_packets[:max_packets] if limited else dhcp_packets

            packet_details = [
                self._analyze_dhcp_packet(pkt, number)
                for number, pkt in enumerate(packets_to_analyze, 1)
            ]

            result = {
                "file": pcap_file,
                "total_packets": len(packets),
                "dhcp_packets_found": len(dhcp_packets),
                "dhcp_packets_analyzed": len(packet_details),
                "statistics": self._generate_statistics(packet_details),
                "packets": packet_details,
            }

            # Add information about packet limiting
            if limited:
                result["note"] = (
                    f"Analysis limited to first {max_packets} DHCP packets due to --max-packets setting"
                )

            return result

        except Exception as e:
            # Tool boundary: report the failure as data instead of raising.
            return {
                "error": f"Error reading PCAP file '{pcap_file}': {str(e)}",
                "file": pcap_file,
            }

    def _analyze_dhcp_packet(self, packet, packet_num: int) -> dict[str, Any]:
        """Analyze a single DHCP packet and extract relevant information.

        Args:
            packet: Scapy packet to inspect.
            packet_num: 1-based position of the packet among the analyzed ones.

        Returns:
            A dict with the packet number and timestamp, plus IP, UDP, BOOTP
            and DHCP-option fields for whichever of those layers are present.
        """
        info: dict[str, Any] = {
            "packet_number": packet_num,
            # Scapy may expose the capture time as a Decimal-based value, which
            # the standard JSON encoder rejects; a float keeps the result
            # serializable.
            "timestamp": float(packet.time),
        }

        # Basic IP information
        if packet.haslayer(IP):
            ip_layer = packet[IP]
            info["src_ip"] = ip_layer.src
            info["dst_ip"] = ip_layer.dst

        # UDP information
        if packet.haslayer(UDP):
            udp_layer = packet[UDP]
            info["src_port"] = udp_layer.sport
            info["dst_port"] = udp_layer.dport

        # BOOTP/DHCP information
        if packet.haslayer(BOOTP):
            bootp_layer = packet[BOOTP]
            info.update(
                {
                    "op": "Request" if bootp_layer.op == 1 else "Reply",
                    "transaction_id": f"0x{bootp_layer.xid:08x}",
                    "client_ip": str(bootp_layer.ciaddr),
                    "your_ip": str(bootp_layer.yiaddr),
                    "server_ip": str(bootp_layer.siaddr),
                    "relay_ip": str(bootp_layer.giaddr),
                    # Only the first 6 bytes of chaddr are used as the MAC.
                    "client_mac": bootp_layer.chaddr[:6].hex(":"),
                }
            )

        # DHCP options analysis
        if packet.haslayer(DHCP):
            info.update(self._parse_dhcp_options(packet[DHCP].options))

        return info

    def _parse_dhcp_options(self, options) -> dict[str, Any]:
        """Parse DHCP options and return structured information.

        Args:
            options: Iterable of decoded options. Entries that are tuples of
                the form ``(name, value, ...)`` are parsed; anything else
                (for example bare marker strings) is skipped.

        Returns:
            A dict that always contains an ``options`` sub-dict and, depending
            on the options present, top-level keys such as ``message_type``,
            ``lease_time``, ``server_id``, ``requested_ip`` and ``client_id``.
            Options carrying several values (e.g. multiple DNS servers) are
            rendered as a comma-separated string.
        """
        dhcp_info: dict[str, Any] = {"options": {}}
        extra = dhcp_info["options"]

        for opt in options:
            # Accept multi-valued options too: (name, v1, v2, ...). Requiring
            # exactly two elements would silently drop them.
            if not isinstance(opt, tuple) or len(opt) < 2:
                continue

            key, value = opt[0], opt[1]
            # Same as str(value) for the common single-valued case.
            text = ", ".join(str(item) for item in opt[1:])

            if key == "message-type":
                dhcp_info["message_type"] = self._MESSAGE_TYPES.get(
                    value, f"Unknown({value})"
                )
                dhcp_info["message_type_code"] = value
            elif key in ("lease_time", "renewal_time", "rebinding_time"):
                dhcp_info[key] = value
                extra[key] = f"{value} seconds"
            elif key == "server_id":
                dhcp_info["server_id"] = text
            elif key == "subnet_mask":
                extra["subnet_mask"] = text
            elif key == "router":
                extra["router"] = text
            elif key == "name_server":
                extra["dns_servers"] = text
            elif key == "requested_addr":
                dhcp_info["requested_ip"] = text
            elif key == "client_id":
                if isinstance(value, bytes):
                    dhcp_info["client_id"] = value.hex(":")
                else:
                    dhcp_info["client_id"] = str(value)
            elif key == "param_req_list":
                extra["parameter_request_list"] = list(value)
            else:
                # Store other options as strings; the key is normalised to str
                # so the mapping never mixes integer and string keys.
                extra[str(key)] = text

        return dhcp_info

    def _generate_statistics(self, packets: list[dict[str, Any]]) -> dict[str, Any]:
        """Generate statistics from analyzed DHCP packets.

        Args:
            packets: Per-packet dicts as produced by ``_analyze_dhcp_packet``.

        Returns:
            A dict with unique client/server lists and counts, per-message-type
            counts, and the packets grouped by transaction id.
        """
        clients: set[str] = set()
        servers: set[str] = set()
        message_type_counts: dict[str, int] = {}
        transactions: dict[str, list[dict[str, Any]]] = {}

        for pkt in packets:
            # Count unique clients and servers
            if "client_mac" in pkt:
                clients.add(pkt["client_mac"])
            if "server_id" in pkt:
                servers.add(pkt["server_id"])

            # Count message types
            if "message_type" in pkt:
                msg_type = pkt["message_type"]
                message_type_counts[msg_type] = message_type_counts.get(msg_type, 0) + 1

            # Track transactions
            if "transaction_id" in pkt:
                transactions.setdefault(pkt["transaction_id"], []).append(
                    {
                        "packet_number": pkt["packet_number"],
                        "message_type": pkt.get("message_type", "Unknown"),
                        "timestamp": pkt["timestamp"],
                    }
                )

        # Sets are converted to sorted lists: JSON cannot encode sets, and
        # sorting keeps the output stable between runs.
        return {
            "unique_clients_count": len(clients),
            "unique_servers_count": len(servers),
            "unique_clients": sorted(clients),
            "unique_servers": sorted(servers),
            "message_type_counts": message_type_counts,
            "transaction_count": len(transactions),
            "transactions": transactions,
        }

    def setup_prompts(self, mcp: FastMCP) -> None:
        """Set up DHCP-specific analysis prompts for the MCP server.

        Registers three prompts on ``mcp``: a network-operations view, a
        security view and a forensic-investigation view of DHCP traffic.

        Args:
            mcp: FastMCP server instance
        """

        @mcp.prompt
        def dhcp_network_analysis():
            """Prompt for analyzing DHCP traffic from a network perspective"""
            return """You are a network administrator analyzing DHCP traffic. Focus your analysis on:

1. **IP Address Management:**
   - Track DHCP lease assignments and renewals
   - Identify IP address pool usage and availability
   - Look for lease conflicts or duplicate IP assignments
   - Monitor lease duration patterns and optimization

2. **Client Behavior Analysis:**
   - Identify DHCP client types and operating systems
   - Track client renewal patterns and timing
   - Look for unusual client behavior (rapid requests, failures)
   - Monitor mobile devices vs. static clients

3. **Server Performance:**
   - Analyze DHCP server response times
   - Look for server failures or timeouts
   - Check for multiple DHCP servers (potential conflicts)
   - Monitor server load and capacity planning

4. **Network Troubleshooting:**
   - Identify DHCP DISCOVER floods or storms
   - Look for relay agent issues
   - Check for scope exhaustion problems
   - Find misconfigurations in options delivery

Provide specific recommendations for network optimization and problem resolution."""

        @mcp.prompt
        def dhcp_security_analysis():
            """Prompt for analyzing DHCP traffic from a security perspective"""
            return """You are a security analyst examining DHCP traffic for threats. Focus on:

1. **Rogue DHCP Detection:**
   - Identify unauthorized DHCP servers on the network
   - Look for conflicting server responses or options
   - Check for suspicious server_id values
   - Monitor for DHCP server impersonation attempts

2. **DHCP Attacks:**
   - Detect DHCP starvation attacks (rapid lease consumption)
   - Look for DHCP spoofing or man-in-the-middle attempts
   - Identify abnormal request patterns or frequencies
   - Check for MAC address spoofing in client requests

3. **Client Anomalies:**
   - Find clients with suspicious hostnames or identifiers
   - Look for rapid MAC address changes from single sources
   - Detect clients requesting unusual or dangerous options
   - Monitor for clients bypassing expected DHCP flow

4. **Data Exfiltration Risks:**
   - Check for unusual data in DHCP option fields
   - Look for DNS server redirection attempts
   - Monitor for suspicious router or gateway assignments
   - Identify potential DNS tunneling via DHCP options

Provide threat assessment and recommended security controls."""

        @mcp.prompt
        def dhcp_forensic_investigation():
            """Prompt for forensic analysis of DHCP traffic"""
            return """You are conducting a digital forensics investigation involving DHCP traffic. Approach systematically:

1. **Timeline Reconstruction:**
   - Create chronological sequence of DHCP transactions
   - Map client activity periods and network presence
   - Correlate DHCP assignments with incident timeframes
   - Track device mobility through IP address changes

2. **Device Attribution:**
   - Link MAC addresses to specific devices or users
   - Track device behavior patterns across time
   - Identify device types through DHCP fingerprinting
   - Document lease history for evidence correlation

3. **Network Mapping:**
   - Reconstruct network topology from DHCP data
   - Identify network segments and VLAN assignments
   - Map DHCP server infrastructure and relationships
   - Document network configuration at incident time

4. **Evidence Collection:**
   - Extract client identifiers and hostnames
   - Document IP address assignments for legal proceedings
   - Preserve transaction timing for event correlation
   - Collect option data that may contain forensic artifacts

Present findings with precise timestamps, evidence preservation notes, and clear documentation suitable for legal proceedings."""