# Source code for mcpcap.modules.base

"""Base module interface for protocol analyzers."""

import os
import tempfile
from abc import ABC, abstractmethod
from typing import Any

from ..core.config import Config


class BaseModule(ABC):
    """Base class for protocol analysis modules.

    Subclasses provide ``protocol_name`` and ``_analyze_protocol_file``.
    This base class decides whether the capture is a local path or an
    HTTP(S) URL, validates or downloads it, and converts failures into
    error dictionaries (an ``"error"`` key plus the offending
    ``"pcap_file"`` / ``"pcap_url"``) instead of raising.
    """

    def __init__(self, config: Config):
        """Initialize the module.

        Args:
            config: Configuration instance
        """
        self.config = config

    @property
    @abstractmethod
    def protocol_name(self) -> str:
        """Return the name of the protocol this module analyzes."""

    @abstractmethod
    def _analyze_protocol_file(self, pcap_file: str) -> dict[str, Any]:
        """Analyze a local PCAP file for this protocol.

        This method should be implemented by each module to perform the
        actual protocol-specific analysis.

        Args:
            pcap_file: Path to local PCAP file

        Returns:
            Analysis results dictionary
        """

    def analyze_packets(self, pcap_file: str) -> dict[str, Any]:
        """Analyze packets from a PCAP file (local or remote).

        Args:
            pcap_file: Path to local PCAP file or HTTP URL to remote PCAP file

        Returns:
            A structured dictionary containing packet analysis results
        """
        # URL schemes are case-insensitive, so "HTTP://..." must be routed
        # to the remote handler as well.
        if pcap_file.lower().startswith(("http://", "https://")):
            return self._handle_remote_analysis(pcap_file)
        return self._handle_local_analysis(pcap_file)

    def _handle_remote_analysis(self, pcap_url: str) -> dict[str, Any]:
        """Download a remote PCAP file, analyze it, and remove the local copy.

        Args:
            pcap_url: HTTP(S) URL of the PCAP file

        Returns:
            The analysis results, or a dictionary with ``"error"`` and
            ``"pcap_url"`` keys if the download or the analysis failed.
        """
        temp_path = None
        try:
            # Only reserve a unique name here; the file is closed right away
            # so the downloader can reopen it for writing.
            with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as tmp_file:
                temp_path = tmp_file.name
            local_path = self._download_pcap_file(pcap_url, temp_path)
        except Exception as e:
            if temp_path is not None:
                self._remove_quietly(temp_path)
            prefix = f"Failed to download PCAP file '{pcap_url}': "
            detail = str(e)
            # _download_pcap_file already produces this prefix; do not repeat it.
            message = detail if detail.startswith(prefix) else prefix + detail
            return {
                "error": message,
                "pcap_url": pcap_url,
            }

        try:
            return self._analyze_protocol_file(local_path)
        except Exception as e:
            return {
                "error": f"Failed to analyze PCAP file '{pcap_url}': {str(e)}",
                "pcap_url": pcap_url,
            }
        finally:
            # Runs on success and on failure. The set avoids unlinking the
            # same path twice when the downloader returns the path it was given.
            for path in {temp_path, local_path}:
                self._remove_quietly(path)

    @staticmethod
    def _remove_quietly(path: str) -> None:
        """Delete ``path``, ignoring errors (cleanup is best-effort)."""
        try:
            os.unlink(path)
        except OSError:
            pass  # Ignore cleanup errors

    def _handle_local_analysis(self, pcap_file: str) -> dict[str, Any]:
        """Validate and analyze a local PCAP file.

        Args:
            pcap_file: Path to local PCAP file

        Returns:
            The analysis results, or a dictionary with ``"error"`` and
            ``"pcap_file"`` keys if the path does not exist, has an
            unsupported extension, or the analysis raised.
        """
        # Validate file exists
        if not os.path.exists(pcap_file):
            return {
                "error": f"PCAP file not found: {pcap_file}",
                "pcap_file": pcap_file,
            }

        # Validate file extension
        if not pcap_file.lower().endswith((".pcap", ".pcapng", ".cap")):
            return {
                "error": f"File '{pcap_file}' is not a supported PCAP file (.pcap/.pcapng/.cap)",
                "pcap_file": pcap_file,
            }

        try:
            return self._analyze_protocol_file(pcap_file)
        except Exception as e:
            return {
                "error": f"Failed to analyze PCAP file '{pcap_file}': {str(e)}",
                "pcap_file": pcap_file,
            }

    def _download_pcap_file(self, pcap_url: str, local_path: str) -> str:
        """Download a remote PCAP file to local storage.

        Args:
            pcap_url: URL of the PCAP file to download
            local_path: Local path to save the file

        Returns:
            Local path to the downloaded file

        Raises:
            ValueError: If the HTTP request fails or returns an error status.
        """
        # Imported here rather than at module level, so it is only needed
        # when a remote file is actually requested.
        import requests

        try:
            # The context manager releases the streamed connection even if
            # writing the file fails part-way through.
            with requests.get(pcap_url, timeout=60, stream=True) as response:
                response.raise_for_status()

                # dirname is "" for a bare file name, and makedirs("") raises.
                target_dir = os.path.dirname(local_path)
                if target_dir:
                    os.makedirs(target_dir, exist_ok=True)

                with open(local_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

            return local_path
        except requests.RequestException as e:
            raise ValueError(
                f"Failed to download PCAP file '{pcap_url}': {str(e)}"
            ) from e