Source code for mcpcap.modules.tcp

"""TCP analysis module."""

from collections import defaultdict
from datetime import datetime
from typing import Any

from fastmcp import FastMCP
from scapy.all import IP, TCP, IPv6, rdpcap

from .base import BaseModule


[docs] class TCPModule(BaseModule): """Module for analyzing TCP packets in PCAP files.""" @property def protocol_name(self) -> str: """Return the name of the protocol this module analyzes.""" return "TCP"
[docs] def analyze_tcp_connections( self, pcap_file: str, server_ip: str | None = None, server_port: int | None = None, detailed: bool = False, ) -> dict[str, Any]: """ Analyze TCP connection states and lifecycle. This is the core tool for TCP connection analysis, solving 80% of TCP-related issues. FILE UPLOAD LIMITATION: This MCP tool cannot process files uploaded through Claude's web interface. Files must be accessible via URL or local file path. SUPPORTED INPUT FORMATS: - Remote files: "https://example.com/capture.pcap" - Local files: "/absolute/path/to/capture.pcap" UNSUPPORTED: - Files uploaded through Claude's file upload feature - Base64 file content - Relative file paths Args: pcap_file: HTTP URL or absolute local file path to PCAP file server_ip: Optional filter for server IP address server_port: Optional filter for server port detailed: Whether to return detailed connection information Returns: A structured dictionary containing TCP connection analysis results including: - summary: Overall connection statistics - connections: List of individual connections with states - issues: Detected problems """ return self.analyze_packets( pcap_file, analysis_type="connections", server_ip=server_ip, server_port=server_port, detailed=detailed, )
[docs] def analyze_tcp_anomalies( self, pcap_file: str, server_ip: str | None = None, server_port: int | None = None, ) -> dict[str, Any]: """ Detect TCP traffic patterns through statistical analysis. This tool analyzes TCP traffic to identify observable patterns without making assumptions about root causes. It provides factual metrics and pattern detection that can be used for further investigation. Args: pcap_file: HTTP URL or absolute local file path to PCAP file server_ip: Optional filter for server IP address server_port: Optional filter for server port Returns: A structured dictionary containing: - statistics: Comprehensive TCP metrics (handshakes, flags, RST distribution, etc.) - patterns: Observable patterns detected in the traffic - summary: High-level summary of findings Detected pattern categories: - connection_establishment: Handshake success/failure rates, SYN response ratios - connection_termination: RST distribution, normal vs abnormal closes - reliability: Retransmission rates, packet loss indicators - connection_lifecycle: Connection state transitions The analysis is purely observational - it reports what is seen in the traffic without attempting to diagnose specific issues like "firewall block" or "network congestion". This allows the data to be interpreted in context. """ return self.analyze_packets( pcap_file, analysis_type="anomalies", server_ip=server_ip, server_port=server_port, )
[docs] def analyze_tcp_retransmissions( self, pcap_file: str, server_ip: str | None = None, threshold: float = 0.02, ) -> dict[str, Any]: """ Analyze TCP retransmission patterns. Args: pcap_file: HTTP URL or absolute local file path to PCAP file server_ip: Optional filter for server IP address threshold: Retransmission rate threshold (default: 2%) Returns: A structured dictionary containing: - total_retransmissions: Total number of retransmissions - retransmission_rate: Overall retransmission rate - by_connection: Per-connection retransmission statistics - summary: Worst connections and threshold violations """ return self.analyze_packets( pcap_file, analysis_type="retransmissions", server_ip=server_ip, threshold=threshold, )
[docs] def analyze_traffic_flow( self, pcap_file: str, server_ip: str, server_port: int | None = None, ) -> dict[str, Any]: """ Analyze bidirectional traffic flow characteristics. Identifies traffic direction, asymmetry, RST sources, and data transfer patterns. Args: pcap_file: HTTP URL or absolute local file path to PCAP file server_ip: Server IP address (required) server_port: Optional filter for server port Returns: A structured dictionary containing: - client_to_server: Client-to-server traffic statistics - server_to_client: Server-to-client traffic statistics - analysis: Asymmetry analysis and interpretations """ return self.analyze_packets( pcap_file, analysis_type="traffic_flow", server_ip=server_ip, server_port=server_port, )
[docs] def analyze_packets( self, pcap_file: str, analysis_type: str = "connections", **kwargs ) -> dict[str, Any]: """Analyze packets with specified analysis type.""" self._analysis_type = analysis_type self._analysis_kwargs = kwargs return super().analyze_packets(pcap_file)
def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]: """Perform the actual TCP packet analysis on a local PCAP file.""" try: packets = rdpcap(pcap_file) tcp_packets = [pkt for pkt in packets if pkt.haslayer(TCP)] if not tcp_packets: return { "file": pcap_file, "total_packets": len(packets), "tcp_packets_found": 0, "message": "No TCP packets found in this capture", } # Apply filtering if specified filtered_packets = self._apply_filters( tcp_packets, self._analysis_kwargs.get("server_ip"), self._analysis_kwargs.get("server_port"), ) # Route to appropriate analysis method if self._analysis_type == "connections": return self._analyze_connections(pcap_file, filtered_packets, packets) elif self._analysis_type == "anomalies": return self._analyze_anomalies(pcap_file, filtered_packets, packets) elif self._analysis_type == "retransmissions": return self._analyze_retrans(pcap_file, filtered_packets, packets) elif self._analysis_type == "traffic_flow": return self._analyze_flow(pcap_file, filtered_packets, packets) else: return {"error": f"Unknown analysis type: {self._analysis_type}"} except Exception as e: return { "error": f"Error reading PCAP file '{pcap_file}': {str(e)}", "file": pcap_file, } def _apply_filters( self, packets: list, server_ip: str | None, server_port: int | None ) -> list: """Apply IP and port filters to packets.""" if not server_ip and not server_port: return packets filtered = [] for pkt in packets: src_ip, dst_ip = self._extract_ips(pkt) tcp = pkt[TCP] # Check if packet matches filter if server_ip: if src_ip != server_ip and dst_ip != server_ip: continue if server_port: if tcp.sport != server_port and tcp.dport != server_port: continue filtered.append(pkt) return filtered def _analyze_connections( self, pcap_file: str, tcp_packets: list, all_packets: list ) -> dict[str, Any]: """Analyze TCP connections.""" # Group packets by connection (4-tuple) connections = defaultdict(list) for pkt in tcp_packets: conn_key = self._get_connection_key(pkt) connections[conn_key].append(pkt) # Analyze each connection connection_details = [] successful_handshakes = 0 failed_handshakes = 0 reset_connections = 0 normal_close = 0 issues = [] for conn_key, pkts in connections.items(): conn_info = self._analyze_single_connection(conn_key, pkts) connection_details.append(conn_info) if conn_info["handshake_completed"]: successful_handshakes += 1 else: failed_handshakes += 1 if conn_info["close_reason"] == "reset": reset_connections += 1 elif conn_info["close_reason"] == "normal": normal_close += 1 # Detect issues total_retrans = sum(c["retransmissions"] for c in connection_details) if reset_connections > 0: issues.append(f"{reset_connections} connections terminated by RST") if total_retrans > 0: issues.append(f"{total_retrans} retransmissions detected") if failed_handshakes > 0: issues.append(f"{failed_handshakes} failed handshakes") return { "file": pcap_file, "analysis_timestamp": datetime.now().isoformat(), "total_packets": len(all_packets), "tcp_packets_found": len(tcp_packets), "filter": { "server_ip": self._analysis_kwargs.get("server_ip"), "server_port": self._analysis_kwargs.get("server_port"), }, "summary": { "total_connections": len(connections), "successful_handshakes": successful_handshakes, "failed_handshakes": failed_handshakes, "established_connections": successful_handshakes, "reset_connections": reset_connections, "normal_close": normal_close, "active_connections": len(connections) - reset_connections - normal_close, }, "connections": connection_details if self._analysis_kwargs.get("detailed", False) else connection_details[:10], "issues": issues, } def _analyze_single_connection( self, conn_key: tuple, packets: list ) -> dict[str, Any]: """Analyze a single TCP connection.""" client, server = self._identify_connection_roles(packets) syn_count = 0 syn_ack_count = 0 ack_count = 0 rst_count = 0 fin_count = 0 data_packets = 0 retransmissions = 0 seen_seqs_by_sender: dict[tuple[str, int], set[int]] = defaultdict(set) handshake_completed = False for pkt in packets: src_ip, _ = self._extract_ips(pkt) tcp = pkt[TCP] flags = tcp.flags # Count flags if flags & 0x02: # SYN syn_count += 1 if flags & 0x12 == 0x12: # SYN-ACK syn_ack_count += 1 if flags & 0x10: # ACK ack_count += 1 if flags & 0x04: # RST rst_count += 1 if flags & 0x01: # FIN fin_count += 1 # Check for data if len(tcp.payload) > 0: data_packets += 1 # Detect retransmissions (simplified) seq = tcp.seq sender = (src_ip, tcp.sport) if seq in seen_seqs_by_sender[sender] and len(tcp.payload) > 0: retransmissions += 1 seen_seqs_by_sender[sender].add(seq) # Determine handshake completion if syn_count > 0 and syn_ack_count > 0 and ack_count > 0: handshake_completed = True # Determine close reason close_reason = "unknown" if rst_count > 0: close_reason = "reset" elif fin_count >= 2: close_reason = "normal" elif len(packets) > 3: close_reason = "active" return { "client": f"{client[0]}:{client[1]}", "server": f"{server[0]}:{server[1]}", "state": "closed" if close_reason in ["reset", "normal"] else "active", "handshake_completed": handshake_completed, "syn_count": syn_count, "syn_ack_count": syn_ack_count, "ack_count": ack_count, "rst_count": rst_count, "fin_count": fin_count, "data_packets": data_packets, "retransmissions": retransmissions, "close_reason": close_reason, "packet_count": len(packets), } def _analyze_anomalies( self, pcap_file: str, tcp_packets: list, all_packets: list ) -> dict[str, Any]: """Detect TCP anomalies using pattern-based analysis. This method collects observable metrics and patterns from the traffic, without making assumptions about root causes. The analysis focuses on factual observations that can be derived from packet-level data. """ # Group packets by connection connections = defaultdict(list) for pkt in tcp_packets: conn_key = self._get_connection_key(pkt) connections[conn_key].append(pkt) # Collect comprehensive statistics stats = self._collect_tcp_statistics(connections, tcp_packets) # Detect observable patterns (not diagnoses) patterns = self._detect_tcp_patterns(stats, connections) return { "file": pcap_file, "analysis_timestamp": datetime.now().isoformat(), "filter": { "server_ip": self._analysis_kwargs.get("server_ip"), "server_port": self._analysis_kwargs.get("server_port"), }, "statistics": stats, "patterns": patterns, "summary": self._generate_pattern_summary(patterns), } def _analyze_retrans( self, pcap_file: str, tcp_packets: list, all_packets: list ) -> dict[str, Any]: """Analyze TCP retransmissions.""" threshold = self._analysis_kwargs.get("threshold", 0.02) # Group by connection connections = defaultdict(list) for pkt in tcp_packets: conn_key = self._get_connection_key(pkt) connections[conn_key].append(pkt) by_connection = [] total_retrans = 0 worst_rate = 0 worst_conn = "" for conn_key, pkts in connections.items(): src_ip, src_port, dst_ip, dst_port = conn_key conn_str = f"{src_ip}:{src_port} <-> {dst_ip}:{dst_port}" conn_info = self._analyze_single_connection(conn_key, pkts) retrans_count = conn_info["retransmissions"] total_retrans += retrans_count retrans_rate = retrans_count / len(pkts) if len(pkts) > 0 else 0 by_connection.append( { "connection": conn_str, "retrans_count": retrans_count, "total_packets": len(pkts), "retrans_rate": retrans_rate, } ) if retrans_rate > worst_rate: worst_rate = retrans_rate worst_conn = conn_str # Sort by retransmission rate by_connection.sort(key=lambda x: x["retrans_rate"], reverse=True) overall_rate = total_retrans / len(tcp_packets) if len(tcp_packets) > 0 else 0 connections_above_threshold = sum( 1 for c in by_connection if c["retrans_rate"] > threshold ) return { "file": pcap_file, "analysis_timestamp": datetime.now().isoformat(), "total_packets": len(tcp_packets), "total_retransmissions": total_retrans, "retransmission_rate": overall_rate, "threshold": threshold, "exceeds_threshold": overall_rate > threshold, "by_connection": by_connection[:10], # Top 10 "summary": { "worst_connection": worst_conn, "worst_retrans_rate": worst_rate, "connections_above_threshold": connections_above_threshold, }, } def _analyze_flow( self, pcap_file: str, tcp_packets: list, all_packets: list ) -> dict[str, Any]: """Analyze traffic flow.""" server_ip = self._analysis_kwargs.get("server_ip") server_port = self._analysis_kwargs.get("server_port") if not server_ip: return {"error": "server_ip is required for traffic flow analysis"} client_to_server = { "packet_count": 0, "byte_count": 0, "syn_count": 0, "rst_count": 0, "fin_count": 0, "data_packets": 0, "retransmissions": 0, } server_to_client = { "packet_count": 0, "byte_count": 0, "syn_count": 0, "rst_count": 0, "fin_count": 0, "data_packets": 0, "retransmissions": 0, } client_seqs = set() server_seqs = set() for pkt in tcp_packets: src_ip, dst_ip = self._extract_ips(pkt) tcp = pkt[TCP] flags = tcp.flags # Determine direction is_client_to_server = dst_ip == server_ip if server_port: is_client_to_server = tcp.dport == server_port stats = client_to_server if is_client_to_server else server_to_client seqs = client_seqs if is_client_to_server else server_seqs stats["packet_count"] += 1 stats["byte_count"] += len(pkt) if flags & 0x02: stats["syn_count"] += 1 if flags & 0x04: stats["rst_count"] += 1 if flags & 0x01: stats["fin_count"] += 1 if len(tcp.payload) > 0: stats["data_packets"] += 1 # Retransmissions seq = tcp.seq if seq in seqs and len(tcp.payload) > 0: stats["retransmissions"] += 1 seqs.add(seq) # Analysis total_client = client_to_server["packet_count"] total_server = server_to_client["packet_count"] asymmetry_ratio = total_client / total_server if total_server > 0 else 0 # Determine primary RST source client_rst = client_to_server["rst_count"] server_rst = server_to_client["rst_count"] if client_rst > server_rst: rst_source = "client" interpretation = f"Client sends all RST packets ({client_rst} vs {server_rst}). Server responds normally. Suggests client-side issue (possibly firewall)." elif server_rst > client_rst: rst_source = "server" interpretation = "Server sends more RST packets. Suggests server-side rejection or service issue." else: rst_source = "balanced" interpretation = "Balanced RST distribution." return { "file": pcap_file, "analysis_timestamp": datetime.now().isoformat(), "server": f"{server_ip}:{server_port or 'any'}", "client_to_server": client_to_server, "server_to_client": server_to_client, "analysis": { "asymmetry_ratio": asymmetry_ratio, "primary_rst_source": rst_source, "data_flow_direction": "client_heavy" if asymmetry_ratio > 1.2 else "server_heavy" if asymmetry_ratio < 0.8 else "balanced", "interpretation": interpretation, }, } def _get_connection_key(self, pkt) -> tuple: """Extract a direction-agnostic connection key.""" src_ip, dst_ip = self._extract_ips(pkt) tcp = pkt[TCP] endpoints = ((src_ip, tcp.sport), (dst_ip, tcp.dport)) return tuple(sorted(endpoints)) def _identify_connection_roles( self, packets: list ) -> tuple[tuple[str, int], tuple[str, int]]: """Infer client/server roles from packet direction. Prefer the first SYN without ACK as the connection initiator. If that is not present, fall back to the first packet in capture order. """ for pkt in packets: src_ip, dst_ip = self._extract_ips(pkt) tcp = pkt[TCP] if tcp.flags & 0x02 and not (tcp.flags & 0x10): return (src_ip, tcp.sport), (dst_ip, tcp.dport) first_pkt = packets[0] src_ip, dst_ip = self._extract_ips(first_pkt) tcp = first_pkt[TCP] return (src_ip, tcp.sport), (dst_ip, tcp.dport) def _extract_ips(self, pkt) -> tuple: """Extract source and destination IPs.""" if pkt.haslayer(IP): return pkt[IP].src, pkt[IP].dst elif pkt.haslayer(IPv6): return pkt[IPv6].src, pkt[IPv6].dst return "unknown", "unknown" def _collect_tcp_statistics( self, connections: dict, tcp_packets: list ) -> dict[str, Any]: """Collect comprehensive TCP statistics from connections. Returns factual metrics without interpretation. """ server_ip = self._analysis_kwargs.get("server_ip") stats = { "total_connections": len(connections), "total_packets": len(tcp_packets), "handshake": { "successful": 0, "failed": 0, "incomplete": 0, }, "flags": { "syn": 0, "syn_ack": 0, "rst": 0, "fin": 0, "ack": 0, }, "rst_distribution": { "by_source": {}, # IP -> count "by_direction": { "to_server": 0, "from_server": 0, "unknown": 0, }, "connections_with_rst": [], }, "retransmissions": { "total": 0, "by_connection": {}, }, "connection_states": { "established": 0, "reset": 0, "closed": 0, "unknown": 0, }, } for conn_key, pkts in connections.items(): src_ip, src_port, dst_ip, dst_port = conn_key conn_info = self._analyze_single_connection(conn_key, pkts) # Handshake analysis if conn_info["handshake_completed"]: stats["handshake"]["successful"] += 1 else: stats["handshake"]["failed"] += 1 # Flag counting for pkt in pkts: tcp = pkt[TCP] flags = tcp.flags if flags & 0x02: # SYN stats["flags"]["syn"] += 1 if flags & 0x12 == 0x12: # SYN-ACK stats["flags"]["syn_ack"] += 1 if flags & 0x04: # RST stats["flags"]["rst"] += 1 pkt_src_ip, _ = self._extract_ips(pkt) # Track RST by source IP stats["rst_distribution"]["by_source"][pkt_src_ip] = ( stats["rst_distribution"]["by_source"].get(pkt_src_ip, 0) + 1 ) # Track RST by direction (if server info provided) if server_ip: if pkt_src_ip == server_ip: stats["rst_distribution"]["by_direction"][ "from_server" ] += 1 else: stats["rst_distribution"]["by_direction"]["to_server"] += 1 else: stats["rst_distribution"]["by_direction"]["unknown"] += 1 if flags & 0x01: # FIN stats["flags"]["fin"] += 1 if flags & 0x10: # ACK stats["flags"]["ack"] += 1 # RST connections if conn_info["rst_count"] > 0: stats["rst_distribution"]["connections_with_rst"].append( f"{src_ip}:{src_port} <-> {dst_ip}:{dst_port}" ) # Retransmissions retrans = conn_info["retransmissions"] stats["retransmissions"]["total"] += retrans if retrans > 0: conn_str = f"{src_ip}:{src_port} <-> {dst_ip}:{dst_port}" stats["retransmissions"]["by_connection"][conn_str] = retrans # Connection states if conn_info["close_reason"] == "reset": stats["connection_states"]["reset"] += 1 elif conn_info["close_reason"] == "normal": stats["connection_states"]["closed"] += 1 elif conn_info["handshake_completed"]: stats["connection_states"]["established"] += 1 else: stats["connection_states"]["unknown"] += 1 # Calculate rates stats["retransmissions"]["rate"] = ( stats["retransmissions"]["total"] / stats["total_packets"] if stats["total_packets"] > 0 else 0 ) return stats def _detect_tcp_patterns( self, stats: dict, connections: dict ) -> list[dict[str, Any]]: """Detect observable patterns in TCP traffic. Returns patterns with factual descriptions, not interpretations. Each pattern includes the raw data that led to its detection. """ patterns = [] # Pattern 1: RST asymmetry rst_by_dir = stats["rst_distribution"]["by_direction"] if rst_by_dir["to_server"] > 0 or rst_by_dir["from_server"] > 0: total_rst = rst_by_dir["to_server"] + rst_by_dir["from_server"] if total_rst > 0: patterns.append( { "pattern": "rst_directional_asymmetry", "category": "connection_termination", "observations": { "rst_to_server": rst_by_dir["to_server"], "rst_from_server": rst_by_dir["from_server"], "ratio": rst_by_dir["to_server"] / total_rst if total_rst > 0 else 0, "affected_connections": stats["rst_distribution"][ "connections_with_rst" ], }, "description": f"RST packets show directional bias: {rst_by_dir['to_server']} toward server, {rst_by_dir['from_server']} from server", } ) # Pattern 2: Retransmission rate retrans_rate = stats["retransmissions"]["rate"] if retrans_rate > 0: patterns.append( { "pattern": "packet_retransmission", "category": "reliability", "observations": { "total_retransmissions": stats["retransmissions"]["total"], "total_packets": stats["total_packets"], "rate": retrans_rate, "threshold": 0.02, # Common baseline "exceeds_threshold": retrans_rate > 0.02, "by_connection": stats["retransmissions"]["by_connection"], }, "description": f"Retransmission rate: {retrans_rate:.2%} ({stats['retransmissions']['total']}/{stats['total_packets']} packets)", } ) # Pattern 3: Handshake failure rate total_attempts = stats["handshake"]["successful"] + stats["handshake"]["failed"] if total_attempts > 0: failure_rate = stats["handshake"]["failed"] / total_attempts if failure_rate > 0: patterns.append( { "pattern": "handshake_completion", "category": "connection_establishment", "observations": { "successful": stats["handshake"]["successful"], "failed": stats["handshake"]["failed"], "failure_rate": failure_rate, }, "description": f"Handshake success rate: {(1 - failure_rate):.1%} ({stats['handshake']['successful']}/{total_attempts})", } ) # Pattern 4: Connection termination method total_conns = stats["total_connections"] if total_conns > 0: rst_rate = stats["connection_states"]["reset"] / total_conns if rst_rate > 0.1: # More than 10% reset patterns.append( { "pattern": "abnormal_termination", "category": "connection_lifecycle", "observations": { "reset_count": stats["connection_states"]["reset"], "normal_close": stats["connection_states"]["closed"], "total_connections": total_conns, "reset_rate": rst_rate, }, "description": f"{rst_rate:.1%} of connections terminated by RST ({stats['connection_states']['reset']}/{total_conns})", } ) # Pattern 5: Flag anomalies if stats["flags"]["syn"] > 0: syn_ack_ratio = stats["flags"]["syn_ack"] / stats["flags"]["syn"] if syn_ack_ratio < 0.5: # Less than 50% of SYNs get SYN-ACK patterns.append( { "pattern": "syn_response_imbalance", "category": "connection_establishment", "observations": { "syn_count": stats["flags"]["syn"], "syn_ack_count": stats["flags"]["syn_ack"], "response_ratio": syn_ack_ratio, }, "description": f"Only {syn_ack_ratio:.1%} of SYN packets received SYN-ACK response", } ) return patterns def _generate_pattern_summary(self, patterns: list) -> dict[str, Any]: """Generate a summary of detected patterns by category.""" summary = { "total_patterns": len(patterns), "by_category": {}, "notable_observations": [], } for pattern in patterns: category = pattern["category"] summary["by_category"][category] = ( summary["by_category"].get(category, 0) + 1 ) # Extract notable observations (can be extended) for pattern in patterns: if pattern["pattern"] == "rst_directional_asymmetry": obs = pattern["observations"] if obs["ratio"] > 0.9 or obs["ratio"] < 0.1: summary["notable_observations"].append( { "type": "strong_rst_asymmetry", "detail": f"RST packets heavily biased: {obs['ratio']:.1%} in one direction", } ) if pattern["pattern"] == "packet_retransmission": obs = pattern["observations"] if obs["exceeds_threshold"]: summary["notable_observations"].append( { "type": "high_retransmission", "detail": f"Retransmission rate {obs['rate']:.2%} exceeds typical threshold of 2%", } ) return summary
[docs] def setup_prompts(self, mcp: FastMCP) -> None: """Set up TCP-specific analysis prompts for the MCP server. Args: mcp: FastMCP server instance """ @mcp.prompt def tcp_connection_troubleshooting(): """Prompt for troubleshooting TCP connection issues""" return """You are a network engineer troubleshooting TCP connection problems. Focus on: 1. **Connection Establishment:** - Analyze three-way handshake (SYN, SYN-ACK, ACK) - Identify failed handshakes and root causes - Check for connection timeouts - Look for firewall blocks or network ACLs 2. **Connection Termination:** - Identify RST (reset) packets and their sources - Analyze normal FIN-based termination - Detect abnormal connection closures - Determine if issues are client-side or server-side 3. **Network Quality:** - Measure retransmission rates - Identify packet loss patterns - Check for network congestion - Analyze latency issues 4. **Traffic Patterns:** - Examine bidirectional traffic flow - Identify traffic asymmetry - Detect unusual connection patterns - Look for scanning or attack behaviors Provide specific diagnostics with evidence and actionable remediation steps.""" @mcp.prompt def tcp_security_analysis(): """Prompt for analyzing TCP traffic from a security perspective""" return """You are a security analyst examining TCP traffic. Focus on: 1. **Attack Detection:** - Identify SYN flood attacks - Detect port scanning activities - Look for connection flooding - Find abnormal connection patterns 2. **Firewall Analysis:** - Identify blocked connections (RST patterns) - Analyze firewall effectiveness - Detect firewall misconfigurations - Check for bypass attempts 3. **Anomaly Detection:** - Find unusual RST patterns - Detect connection hijacking attempts - Identify data exfiltration patterns - Look for covert channels 4. **Forensic Evidence:** - Document connection timelines - Correlate TCP events with security incidents - Preserve evidence of malicious activity - Track attacker behavior patterns Present findings with severity levels, evidence, and recommended security actions."""