"""MCP server setup and configuration."""
from fastmcp import FastMCP
from ..modules.capinfos import CapInfosModule
from ..modules.dhcp import DHCPModule
from ..modules.dns import DNSModule
from ..modules.icmp import ICMPModule
from ..modules.sip import SIPModule
from ..modules.tcp import TCPModule
from .config import Config
[docs]
class MCPServer:
"""MCP server for PCAP analysis."""
[docs]
def __init__(self, config: Config):
"""Initialize MCP server.
Args:
config: Configuration instance
"""
self.config = config
self.mcp = FastMCP("mcpcap")
# Initialize modules based on configuration
self.modules = {}
if "dns" in self.config.modules:
self.modules["dns"] = DNSModule(config)
if "dhcp" in self.config.modules:
self.modules["dhcp"] = DHCPModule(config)
if "icmp" in self.config.modules:
self.modules["icmp"] = ICMPModule(config)
if "capinfos" in self.config.modules:
self.modules["capinfos"] = CapInfosModule(config)
if "tcp" in self.config.modules:
self.modules["tcp"] = TCPModule(config)
if "sip" in self.config.modules:
self.modules["sip"] = SIPModule(config)
# Register tools
self._register_tools()
# Setup prompts
for module in self.modules.values():
module.setup_prompts(self.mcp)
def _register_tools(self) -> None:
"""Register all available tools with the MCP server."""
# Register tools for each loaded module
for module_name, module in self.modules.items():
if module_name == "dns":
self.mcp.tool(module.analyze_dns_packets)
elif module_name == "dhcp":
self.mcp.tool(module.analyze_dhcp_packets)
elif module_name == "icmp":
self.mcp.tool(module.analyze_icmp_packets)
elif module_name == "capinfos":
self.mcp.tool(module.analyze_capinfos)
elif module_name == "tcp":
self.mcp.tool(module.analyze_tcp_connections)
self.mcp.tool(module.analyze_tcp_anomalies)
self.mcp.tool(module.analyze_tcp_retransmissions)
self.mcp.tool(module.analyze_traffic_flow)
elif module_name == "sip":
self.mcp.tool(module.analyze_sip_packets)
[docs]
def run(self) -> None:
"""Start the MCP server."""
if self.config.transport == "http":
self.mcp.run(
transport="http",
host=self.config.host,
port=self.config.port,
show_banner=False,
)
else:
self.mcp.run(show_banner=False)