Source code for mcpcap.modules.sip

"""SIP analysis module."""

from collections import Counter
from datetime import datetime
from typing import Any

from fastmcp import FastMCP
from scapy.all import IP, TCP, UDP, IPv6, Raw, rdpcap

from .base import BaseModule

SIP_METHODS = {
    "ACK",
    "BYE",
    "CANCEL",
    "INFO",
    "INVITE",
    "MESSAGE",
    "NOTIFY",
    "OPTIONS",
    "PRACK",
    "PUBLISH",
    "REFER",
    "REGISTER",
    "SUBSCRIBE",
    "UPDATE",
}
SIP_PORTS = {5060, 5061}


[docs] class SIPModule(BaseModule): """Module for analyzing SIP packets in PCAP files.""" @property def protocol_name(self) -> str: """Return the name of the protocol this module analyzes.""" return "SIP"
[docs] def analyze_sip_packets(self, pcap_file: str) -> dict[str, Any]: """ Analyze SIP packets from a PCAP file and return structured signaling details. FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through Claude's web interface. Files must be accessible via URL or local file path. SUPPORTED INPUT FORMATS: - Remote files: "https://example.com/capture.pcap" - Local files: "/absolute/path/to/capture.pcap" UNSUPPORTED: - Files uploaded through Claude's file upload feature - Base64 file content - Relative file paths Args: pcap_file: HTTP URL or absolute local file path to PCAP file Returns: A structured dictionary containing SIP packet analysis results """ return self.analyze_packets(pcap_file)
def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]: """Perform the actual SIP packet analysis on a local PCAP file.""" try: packets = rdpcap(pcap_file) sip_packets = [pkt for pkt in packets if self._is_sip_packet(pkt)] if not sip_packets: return { "file": pcap_file, "total_packets_in_file": len(packets), "sip_packets_found": 0, "message": "No SIP packets found in this capture", } packets_to_analyze = sip_packets limited = False if self.config.max_packets and len(sip_packets) > self.config.max_packets: packets_to_analyze = sip_packets[: self.config.max_packets] limited = True packet_details = [ self._analyze_sip_packet(pkt, packet_number) for packet_number, pkt in enumerate(packets_to_analyze, 1) ] stats = self._generate_statistics(packet_details) result = { "file": pcap_file, "analysis_timestamp": datetime.now().isoformat(), "total_packets_in_file": len(packets), "sip_packets_found": len(sip_packets), "sip_packets_analyzed": len(packet_details), "statistics": stats, "packets": packet_details, } if limited: result["note"] = ( f"Analysis limited to first {self.config.max_packets} SIP packets due to --max-packets setting" ) return result except Exception as e: return { "error": f"Error reading PCAP file '{pcap_file}': {str(e)}", "file": pcap_file, } def _is_sip_packet(self, pkt: Any) -> bool: """Check whether a packet contains SIP payload data.""" if not pkt.haslayer(Raw): return False if not pkt.haslayer(UDP) and not pkt.haslayer(TCP): return False payload = bytes(pkt[Raw].load) if not payload: return False return self._is_sip_payload(payload) def _is_sip_payload(self, payload: bytes) -> bool: """Check whether payload bytes look like a SIP message.""" try: first_line = ( payload.decode("utf-8", errors="ignore").splitlines()[0].strip() ) except IndexError: return False if first_line.startswith("SIP/2.0 "): return True method = first_line.split(" ", 1)[0].upper() return method in SIP_METHODS def _analyze_sip_packet(self, pkt: Any, packet_number: int) -> dict[str, Any]: """Analyze a single SIP packet.""" payload = bytes(pkt[Raw].load).decode("utf-8", errors="replace") start_line, headers, body = self._parse_sip_message(payload) src_ip, dst_ip = self._extract_ips(pkt) transport, src_port, dst_port = self._extract_transport(pkt) message_type, parsed_message = self._parse_start_line(start_line) via_header = headers.get("via", "") content_length = self._safe_int(headers.get("content-length")) known_port_match = src_port in SIP_PORTS or dst_port in SIP_PORTS packet_info = { "packet_number": packet_number, "timestamp": datetime.fromtimestamp(float(pkt.time)).isoformat(), "source_ip": src_ip, "destination_ip": dst_ip, "source_port": src_port, "destination_port": dst_port, "transport": transport, "message_type": message_type, "start_line": start_line, "call_id": headers.get("call-id", ""), "cseq": headers.get("cseq", ""), "from": headers.get("from", ""), "to": headers.get("to", ""), "contact": headers.get("contact", ""), "user_agent": headers.get("user-agent", ""), "server": headers.get("server", ""), "via": via_header, "content_type": headers.get("content-type", ""), "content_length": content_length, "body_length": len(body.encode("utf-8")), "known_sip_port": known_port_match, "headers": headers, "summary": pkt.summary(), } packet_info.update(parsed_message) return packet_info def _parse_sip_message(self, payload: str) -> tuple[str, dict[str, str], str]: """Parse a SIP message into start line, headers, and body.""" normalized = payload.replace("\r\n", "\n").replace("\r", "\n") header_part, _, body = normalized.partition("\n\n") header_lines = [line for line in header_part.split("\n") if line.strip()] start_line = header_lines[0].strip() if header_lines else "" headers: dict[str, str] = {} current_header: str | None = None for line in header_lines[1:]: if line.startswith((" ", "\t")) and current_header: headers[current_header] = f"{headers[current_header]} {line.strip()}" continue if ":" not in line: continue key, value = line.split(":", 1) normalized_key = key.strip().lower() headers[normalized_key] = value.strip() current_header = normalized_key return start_line, headers, body def _parse_start_line(self, start_line: str) -> tuple[str, dict[str, Any]]: """Parse the SIP start line into either request or response data.""" if start_line.startswith("SIP/2.0 "): parts = start_line.split(" ", 2) status_code = self._safe_int(parts[1]) if len(parts) > 1 else None return "response", { "status_code": status_code, "reason_phrase": parts[2] if len(parts) > 2 else "", } parts = start_line.split(" ", 2) method = parts[0].upper() if parts else "" return "request", { "method": method, "request_uri": parts[1] if len(parts) > 1 else "", "sip_version": parts[2] if len(parts) > 2 else "", } def _extract_ips(self, pkt: Any) -> tuple[str, str]: """Extract source and destination IP addresses.""" if pkt.haslayer(IP): return pkt[IP].src, pkt[IP].dst if pkt.haslayer(IPv6): return pkt[IPv6].src, pkt[IPv6].dst return "unknown", "unknown" def _extract_transport(self, pkt: Any) -> tuple[str, int | None, int | None]: """Extract transport protocol and ports.""" if pkt.haslayer(UDP): return "UDP", pkt[UDP].sport, pkt[UDP].dport if pkt.haslayer(TCP): return "TCP", pkt[TCP].sport, pkt[TCP].dport return "unknown", None, None def _safe_int(self, value: str | None) -> int | None: """Convert a header value to int when possible.""" if value is None: return None try: return int(value.strip().split(" ", 1)[0]) except (TypeError, ValueError, AttributeError): return None def _generate_statistics(self, packets: list[dict[str, Any]]) -> dict[str, Any]: """Generate SIP-specific statistics from analyzed packets.""" requests = [packet for packet in packets if packet["message_type"] == "request"] responses = [ packet for packet in packets if packet["message_type"] == "response" ] method_counts = Counter(packet.get("method", "") for packet in requests) response_classes = Counter() for packet in responses: status_code = packet.get("status_code") if isinstance(status_code, int): response_classes[f"{status_code // 100}xx"] += 1 call_ids = {packet["call_id"] for packet in packets if packet.get("call_id")} transports = Counter(packet["transport"] for packet in packets) user_agents = sorted( {packet["user_agent"] for packet in packets if packet.get("user_agent")} ) return { "requests": len(requests), "responses": len(responses), "methods": dict(sorted(method_counts.items())), "response_classes": dict(sorted(response_classes.items())), "unique_call_ids": len(call_ids), "call_ids": sorted(call_ids), "transports": dict(sorted(transports.items())), "user_agents": user_agents, }
[docs] def setup_prompts(self, mcp: FastMCP) -> None: """Set up SIP-specific prompts for the MCP server.""" @mcp.prompt def sip_security_analysis(): """Prompt for reviewing SIP traffic from a security perspective.""" return """You are analyzing SIP signaling traffic for security issues. Focus on: 1. Authentication failures, brute-force registration attempts, or credential misuse. 2. Unusual call setup patterns, suspicious destinations, or unexpected SIP methods. 3. Indicators of toll fraud, rogue endpoints, or malformed signaling. 4. Exposure of internal addressing, software banners, or topology information. 5. Concrete packet-level evidence and any missing context needed for confidence.""" @mcp.prompt def sip_troubleshooting_analysis(): """Prompt for troubleshooting SIP signaling behavior.""" return """You are troubleshooting SIP signaling. Focus on: 1. Call setup progression across INVITE, provisional responses, final responses, ACK, BYE, and CANCEL. 2. Registration success or failure, including CSeq progression and response codes. 3. Transport or addressing mismatches visible in Via, Contact, From, and To headers. 4. Error response patterns such as 4xx, 5xx, or 6xx classes and the point where signaling fails. 5. Concise next-step hypotheses grounded only in the capture contents.""" @mcp.prompt def sip_forensic_investigation(): """Prompt for reconstructing SIP activity for forensic work.""" return """You are reconstructing SIP activity for a forensic investigation. Focus on: 1. Building a timeline of registrations, call attempts, responses, and terminations. 2. Correlating traffic by Call-ID, CSeq, source/destination IP, and transport. 3. Identifying the apparent user agents, servers, and contacted SIP URIs. 4. Highlighting failed calls, repeated attempts, and notable response codes. 5. Preserving uncertainty explicitly when fields are missing or ambiguous."""