File: //opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/wordpress/incident_parser.py
"""Parser for WordPress plugin incident files."""
import base64
import json
import logging
from pathlib import Path
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class IncidentFileParser:
"""
Parse incident files written by the WordPress plugin.
These files have format:
<?php __halt_compiler();
#{base64-encoded JSON data for incident}
#{base64-encoded JSON data for incident}
...
File pattern: wp-content/imunify-security/incidents/yyyy-mm-dd-hh.php
"""
@classmethod
def parse_file(cls, file_path: Path) -> list[dict]:
"""
Parse an incident file and return list of incident dictionaries.
The file format is:
- First line: <?php __halt_compiler();
- Following lines: #{base64-encoded JSON}
Args:
file_path: Path to the incident file
Returns:
List of parsed incident dictionaries
"""
incidents = []
try:
with open(file_path, "r", encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
incident = cls._process_line(line, line_num, file_path)
if incident is not None:
incidents.append(incident)
except Exception as e:
logger.error(
"Error reading incident file %s: %s",
file_path,
e,
)
return []
return incidents
@classmethod
def _process_line(
cls, line: str, line_num: int, file_path: Path
) -> dict | None:
"""
Process a single line from an incident file.
Args:
line: The line content (already stripped)
line_num: Line number for logging
file_path: Path to the file being processed
Returns:
Parsed incident dictionary or None if line should be skipped
"""
# Skip empty lines
if not line:
return None
if line.startswith("<?php"):
logger.debug(
"Skipping PHP header line %d in %s",
line_num,
file_path.name,
)
return None
# Lines should start with # followed by base64-encoded JSON
if not line.startswith("#"):
logger.debug(
"Line %d in %s doesn't start with #: %s",
line_num,
file_path.name,
line[:50],
)
return None
# Remove the # prefix
encoded_data = line[1:]
return cls._process_encoded_line(encoded_data, line_num, file_path)
@classmethod
def _process_encoded_line(
cls, encoded_data: str, line_num: int, file_path: Path
) -> dict | None:
"""
Decode base64-encoded JSON data from an incident line.
Args:
encoded_data: Base64-encoded JSON string
line_num: Line number for logging
file_path: Path to the file being processed
Returns:
Parsed incident dictionary or None if decoding/parsing fails
"""
try:
decoded_bytes = base64.b64decode(encoded_data)
decoded_str = decoded_bytes.decode("utf-8")
incident = json.loads(decoded_str)
if isinstance(incident, dict):
return incident
logger.warning(
"Line %d in %s is not a JSON object: %s",
line_num,
file_path.name,
decoded_str[:100],
)
return None
except (Exception, json.JSONDecodeError) as e:
logger.error(
"Failed to decode base64 on line %d in %s: %s",
line_num,
file_path.name,
e,
)
return None