Incident System Architecture¶
Overview¶
The Incident System provides persistent storage for diagnostic events, enabling post-mortem analysis of connection and communication issues. It is designed to capture all relevant information needed to understand what led to a problem.
Purpose¶
- Diagnostic Analysis: Capture enough context so that AI or developers can analyze root causes
- Persistent History: Store incidents across restarts for trend analysis
- Correlation: Enable correlation between related events (e.g., PingPong mismatches followed by circuit breaker trips)
- Automatic Cleanup: Manage storage growth with configurable retention
- Per-Type Storage: Each incident type maintains its own history (max 20 per type, 7-day retention) to ensure important but infrequent events are not crowded out by high-frequency ones
Architecture¶
┌────────────────────────────────────────────────────────────────────────────────┐
│                                  CentralUnit                                   │
│                                                                                │
│  ┌─────────────────────┐   ┌─────────────────────┐   ┌──────────────────────┐  │
│  │ CacheCoordinator    │   │ ClientCoordinator   │   │ ConnectionRecovery   │  │
│  │                     │   │                     │   │ Coordinator          │  │
│  │ ┌─────────────────┐ │   │ ┌─────────────────┐ │   │                      │  │
│  │ │  IncidentStore  │◄┼───┼─│ CircuitBreaker  │ │   │ CONNECTION_LOST ─────┼──┤
│  │ └─────────────────┘ │   │ └─────────────────┘ │   │ incidents            │  │
│  │          ▲          │   │                     │   └──────────────────────┘  │
│  │          │          │   └─────────────────────┘                             │
│  │ ┌────────┴────────┐ │                                                       │
│  │ │ PingPongTracker │ │                                                       │
│  │ └─────────────────┘ │                                                       │
│  └─────────────────────┘                                                       │
└────────────────────────────────────────────────────────────────────────────────┘
Core Components¶
IncidentStore¶
Location: aiohomematic/store/persistent/incident.py
Responsibilities:
- Persist incidents to disk as JSON
- Load incidents on demand
- Auto-cleanup based on retention period
- Provide diagnostics API for Home Assistant
Persistence Strategy: Save-on-incident, load-on-demand
- Incidents are written immediately when recorded
- Full history is only loaded when diagnostics are requested
- Reduces memory footprint during normal operation
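The save-on-incident, load-on-demand strategy can be illustrated with a minimal sketch. Everything here is an assumption made for illustration: `TinyIncidentFile` and the file name `demo_incidents.json` are hypothetical, the code is synchronous, and it re-reads the file on every write, which the real IncidentStore may well avoid.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


class TinyIncidentFile:
    """Hypothetical stand-in for illustration; not the real IncidentStore."""

    def __init__(self, path: Path) -> None:
        self._path = path

    def record(self, incident: dict[str, Any]) -> None:
        # Save-on-incident: append and write back immediately, so nothing
        # is cached in memory between calls.
        incidents = self.load()
        incidents.append(incident)
        self._path.write_text(json.dumps({"incidents": incidents, "version": 1}))

    def load(self) -> list[dict[str, Any]]:
        # Load-on-demand: the file is read only when a caller asks for it.
        if not self._path.exists():
            return []
        return json.loads(self._path.read_text())["incidents"]


with tempfile.TemporaryDirectory() as tmp:
    store = TinyIncidentFile(Path(tmp) / "demo_incidents.json")
    store.record({"incident_type": "CONNECTION_LOST", "message": "demo"})
    store.record({"incident_type": "CONNECTION_RESTORED", "message": "demo"})
    loaded = store.load()
```

The object holds only a path, so its memory use does not grow with the number of stored incidents.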
IncidentRecorderProtocol¶
Location: aiohomematic/interfaces/operations.py
Purpose: Decoupled interface for recording incidents, allowing components to record incidents without direct dependency on IncidentStore.
from collections.abc import Coroutine
from typing import Any, Protocol

class IncidentRecorderProtocol(Protocol):
    def record_incident(
        self,
        *,
        incident_type: IncidentType,
        severity: IncidentSeverity,
        message: str,
        interface_id: str | None = None,
        context: dict[str, Any] | None = None,
        journal: PingPongJournal | None = None,
    ) -> Coroutine[Any, Any, IncidentSnapshot]: ...
Incident Types¶
Connection Health Incidents¶
| Type | Severity | Source | Description |
|---|---|---|---|
| PING_PONG_MISMATCH_HIGH | ERROR | PingPongTracker | Pending PONG count exceeded threshold |
| PING_PONG_UNKNOWN_HIGH | WARNING | PingPongTracker | Unknown PONG count exceeded threshold |
| CIRCUIT_BREAKER_TRIPPED | ERROR | CircuitBreaker | Circuit breaker opened due to failures |
| CIRCUIT_BREAKER_RECOVERED | INFO | CircuitBreaker | Circuit breaker recovered |
| CONNECTION_LOST | ERROR | ConnectionRecoveryCoordinator | Connection to backend lost |
| CONNECTION_RESTORED | INFO | ConnectionRecoveryCoordinator | Connection to backend restored |
| RPC_ERROR | ERROR | AioXmlRpcProxy / AioJsonRpcAioHttpClient | RPC call failed |
| CALLBACK_TIMEOUT | WARNING | ClientCCU | Callback from backend timed out |
Incident Data Structure¶
IncidentSnapshot¶
from dataclasses import dataclass
from datetime import datetime
from typing import Any

@dataclass(frozen=True, slots=True)
class IncidentSnapshot:
    id: str                             # UUID
    timestamp: datetime                 # When incident occurred
    incident_type: IncidentType         # Type classification
    severity: IncidentSeverity          # INFO, WARNING, ERROR
    message: str                        # Human-readable description
    interface_id: str | None            # Which interface (if applicable)
    context: dict[str, Any]             # Type-specific diagnostic data
    journal: list[JournalEntry] | None  # Event history (for PingPong)
Context Data Requirements¶
Critical: Each incident MUST include enough context for AI analysis.
PING_PONG_MISMATCH_HIGH Context¶
{
    "pending_count": int,  # Current pending PONG count
    "threshold": int,      # Configured threshold
    # Plus: journal with last N PING/PONG events and timestamps
}
PING_PONG_UNKNOWN_HIGH Context¶
{
    "unknown_count": int,  # Current unknown PONG count
    "threshold": int,      # Configured threshold
    # Plus: journal with last N PING/PONG events and timestamps
}
CIRCUIT_BREAKER_TRIPPED Context¶
{
    "old_state": str,           # State before trip (closed/half_open)
    "failure_count": int,       # Consecutive failures that caused trip
    "failure_threshold": int,   # Configured threshold
    "recovery_timeout": float,  # Seconds before half-open attempt
    "last_failure_time": str,   # ISO timestamp of last failure
    "total_requests": int,      # Total requests since start
}
CIRCUIT_BREAKER_RECOVERED Context¶
{
    "success_count": int,      # Successful requests in half-open
    "success_threshold": int,  # Configured threshold for recovery
}
CONNECTION_LOST Context¶
{
    "reason": str,                        # Reason for connection loss
    "detected_at": str,                   # ISO timestamp when loss was detected
    "client_state": str | None,           # Client state at time of loss
    "circuit_breaker_state": str | None,  # Circuit breaker state
    "recovery_attempt_count": int,        # Number of recovery attempts so far
    "active_recoveries": list[str],       # List of interfaces currently recovering
    "in_failed_state": bool,              # Whether coordinator is in FAILED state
}
CONNECTION_RESTORED Context¶
{
    "total_attempts": int,                # Total recovery attempts made
    "total_duration_ms": float,           # Total recovery duration in milliseconds
    "stages_completed": list[str],        # Recovery stages completed (e.g., ["TCP_CHECKING", "RPC_CHECKING", ...])
    "client_state": str | None,           # Client state after restoration
    "circuit_breaker_state": str | None,  # Circuit breaker state after restoration
    "was_in_failed_state": bool,          # Whether coordinator was in FAILED state before recovery
}
RPC_ERROR Context¶
{
    "protocol": str,       # Protocol type ("xml-rpc" or "json-rpc")
    "method": str,         # RPC method that failed (e.g., "setValue", "getParamset")
    "error_type": str,     # Error type (e.g., "SSLError", "OSError", "XMLRPCFault", "JSONRPCError")
    "error_message": str,  # Sanitized error message (sensitive info removed)
    "tls_enabled": bool,   # Whether TLS is enabled for this connection
}
CALLBACK_TIMEOUT Context¶
{
    "seconds_since_last_event": float,    # Time since last callback was received
    "callback_warn_interval": float,      # Configured threshold in seconds
    "last_event_time": str,               # ISO timestamp of last callback received
    "client_state": str,                  # Client state when timeout detected
    "circuit_breaker_state": str | None,  # Circuit breaker state (if available)
}
Storage Configuration¶
Per-Type Limits¶
Incidents are stored per IncidentType to ensure each type maintains its own history:
INCIDENT_STORE_MAX_PER_TYPE: Final = 20 # Max incidents per type
DEFAULT_MAX_AGE_DAYS: Final = 7 # 7-day retention
Benefits:
- High-frequency incidents (e.g., CONNECTION_LOST during network issues) don't crowd out rare but important incidents
- Each incident type has guaranteed storage space
- Easy to analyze patterns within a specific incident type
Example Storage Distribution:
PING_PONG_MISMATCH_HIGH: 5 incidents (last 7 days)
CIRCUIT_BREAKER_TRIPPED: 12 incidents (last 7 days)
CONNECTION_LOST: 20 incidents (oldest evicted to maintain limit)
CIRCUIT_BREAKER_RECOVERED: 8 incidents (last 7 days)
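The per-type limit and the age-based cleanup can be sketched together as follows. This is an illustrative model only: `PerTypeHistory` is a hypothetical class, incidents are reduced to `(timestamp, message)` tuples, and a bounded `deque` per type is one possible way to evict the oldest entry, not necessarily how IncidentStore does it.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

MAX_PER_TYPE = 20            # mirrors INCIDENT_STORE_MAX_PER_TYPE
MAX_AGE = timedelta(days=7)  # mirrors DEFAULT_MAX_AGE_DAYS


class PerTypeHistory:
    """Hypothetical in-memory model of per-type incident storage."""

    def __init__(self) -> None:
        # A deque with maxlen drops its oldest entry when a new one overflows it.
        self._by_type: dict[str, deque[tuple[datetime, str]]] = defaultdict(
            lambda: deque(maxlen=MAX_PER_TYPE)
        )

    def add(self, incident_type: str, timestamp: datetime, message: str) -> None:
        self._by_type[incident_type].append((timestamp, message))

    def prune(self, now: datetime) -> None:
        # Drop everything older than the retention window, type by type.
        cutoff = now - MAX_AGE
        for incident_type, entries in list(self._by_type.items()):
            kept = [entry for entry in entries if entry[0] >= cutoff]
            self._by_type[incident_type] = deque(kept, maxlen=MAX_PER_TYPE)

    def count(self, incident_type: str) -> int:
        return len(self._by_type[incident_type])


now = datetime(2026, 1, 10, 12, 0, 0)
history = PerTypeHistory()
# 30 connection losses in half an hour: only the newest 20 survive.
for n in range(30):
    history.add("CONNECTION_LOST", now - timedelta(minutes=30 - n), f"loss {n}")
# One trip outside the retention window, one inside it.
history.add("CIRCUIT_BREAKER_TRIPPED", now - timedelta(days=8), "old trip")
history.add("CIRCUIT_BREAKER_TRIPPED", now - timedelta(days=1), "recent trip")
history.prune(now)
```

The burst of CONNECTION_LOST entries fills only its own bucket, so the single recent CIRCUIT_BREAKER_TRIPPED entry is unaffected by it.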
Integration Pattern¶
Sync-to-Async Bridge¶
Components like CircuitBreaker operate synchronously, but IncidentStore is async. The following pattern bridges the two by scheduling the recording as a task on the running event loop:
def _record_incident_sync(self, ...) -> None:
    if (incident_recorder := self._incident_recorder) is None:
        return

    # Capture current state for async closure
    captured_data = {...}

    async def _record() -> None:
        try:
            await incident_recorder.record_incident(...)
        except Exception as err:
            _LOGGER.debug("Failed to record incident: %s", err)

    # Fire and forget - suppress if no event loop
    with contextlib.suppress(RuntimeError):
        asyncio.get_running_loop().create_task(_record())
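To see the bridge behave both inside and outside an event loop, the pattern can be reduced to a self-contained sketch. `DemoRecorder` and `record_from_sync_code` are hypothetical names, and the recorder takes only a `message`; the boolean return value exists purely so the example can show which branch ran.

```python
import asyncio
import contextlib


class DemoRecorder:
    """Hypothetical recorder standing in for an IncidentRecorderProtocol implementation."""

    def __init__(self) -> None:
        self.messages: list[str] = []

    async def record_incident(self, *, message: str) -> None:
        self.messages.append(message)


def record_from_sync_code(recorder: DemoRecorder, message: str) -> bool:
    """Schedule a recording from synchronous code; return True if a task was scheduled."""

    async def _record() -> None:
        try:
            await recorder.record_incident(message=message)
        except Exception as err:  # recording errors must never reach the caller
            print(f"Failed to record incident: {err}")

    # get_running_loop() raises RuntimeError when no loop is running. That happens
    # before _record() is called, so no un-awaited coroutine is left behind.
    with contextlib.suppress(RuntimeError):
        asyncio.get_running_loop().create_task(_record())
        return True
    return False


async def main() -> tuple[bool, list[str]]:
    recorder = DemoRecorder()
    scheduled = record_from_sync_code(recorder, "circuit breaker opened")
    await asyncio.sleep(0)  # yield once so the scheduled task can run
    return scheduled, recorder.messages


outside_loop = record_from_sync_code(DemoRecorder(), "ignored")
inside_loop, recorded = asyncio.run(main())
```

The asyncio documentation recommends keeping a reference to tasks created with `create_task` so they are not garbage-collected before they finish. For long-running recordings, storing the task in a set until it completes may be a sensible refinement of this pattern.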
Dependency Injection¶
The incident recorder is injected as an optional keyword-only dependency; when it is None, the component simply skips recording:
class CircuitBreaker:
    def __init__(
        self,
        *,
        incident_recorder: IncidentRecorderProtocol | None = None,
        ...
    ) -> None:
        self._incident_recorder = incident_recorder
Storage Format¶
Incidents are stored as JSON in {storage_path}/cache/{central_name}_hm_incidents.json:
{
  "incidents": [
    {
      "id": "uuid-v4",
      "timestamp": "2026-01-03T10:15:30.123456",
      "incident_type": "CIRCUIT_BREAKER_TRIPPED",
      "severity": "error",
      "message": "Circuit breaker opened for BidCos-RF after 5 failures",
      "interface_id": "BidCos-RF",
      "context": {
        "old_state": "closed",
        "failure_count": 5,
        "failure_threshold": 5,
        "recovery_timeout": 30.0,
        "last_failure_time": "2026-01-03T10:15:30.100000",
        "total_requests": 127
      },
      "journal": null
    }
  ],
  "version": 1
}
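Because the file is plain JSON, it can be analyzed offline. The sketch below looks for the correlation mentioned under Purpose: a PING_PONG_MISMATCH_HIGH incident followed by a CIRCUIT_BREAKER_TRIPPED incident on the same interface. The function name, the sample data, the short ids, the `HmIP-RF` interface name, and the five-minute window are all assumptions made for illustration.

```python
import json
from datetime import datetime, timedelta

# Hypothetical sample in the storage format shown above (ids shortened).
SAMPLE = """
{
  "incidents": [
    {"id": "a", "timestamp": "2026-01-03T10:14:50.000000",
     "incident_type": "PING_PONG_MISMATCH_HIGH", "severity": "error",
     "message": "demo", "interface_id": "BidCos-RF", "context": {}, "journal": null},
    {"id": "b", "timestamp": "2026-01-03T10:15:30.000000",
     "incident_type": "CIRCUIT_BREAKER_TRIPPED", "severity": "error",
     "message": "demo", "interface_id": "BidCos-RF", "context": {}, "journal": null},
    {"id": "c", "timestamp": "2026-01-03T11:30:00.000000",
     "incident_type": "CIRCUIT_BREAKER_TRIPPED", "severity": "error",
     "message": "demo", "interface_id": "BidCos-RF", "context": {}, "journal": null},
    {"id": "d", "timestamp": "2026-01-03T10:15:00.000000",
     "incident_type": "CIRCUIT_BREAKER_TRIPPED", "severity": "error",
     "message": "demo", "interface_id": "HmIP-RF", "context": {}, "journal": null}
  ],
  "version": 1
}
"""


def trips_after_mismatch(raw: str, window: timedelta) -> list[tuple[str, str]]:
    """Return (mismatch_id, trip_id) pairs on the same interface within the window."""
    # ISO 8601 strings in one consistent format sort chronologically.
    incidents = sorted(json.loads(raw)["incidents"], key=lambda i: i["timestamp"])
    pairs: list[tuple[str, str]] = []
    for index, first in enumerate(incidents):
        if first["incident_type"] != "PING_PONG_MISMATCH_HIGH":
            continue
        start = datetime.fromisoformat(first["timestamp"])
        for later in incidents[index + 1:]:
            same_interface = later["interface_id"] == first["interface_id"]
            is_trip = later["incident_type"] == "CIRCUIT_BREAKER_TRIPPED"
            in_window = datetime.fromisoformat(later["timestamp"]) - start <= window
            if is_trip and same_interface and in_window:
                pairs.append((first["id"], later["id"]))
    return pairs


correlated = trips_after_mismatch(SAMPLE, window=timedelta(minutes=5))
```

Only the trip 40 seconds after the mismatch on BidCos-RF is paired. The later trip falls outside the window, and the HmIP-RF trip belongs to a different interface.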
Home Assistant Integration¶
Diagnostics Export¶
IncidentStore exposes a diagnostics method for HA:
async def get_diagnostics(self) -> dict[str, Any]:
    """Return diagnostic data for Home Assistant."""
    return {
        "incidents": [i.to_dict() for i in await self._load_incidents()],
        "total_count": len(self._incidents),
        "retention_days": self._retention_days,
    }
This method is integrated in homematicip_local/diagnostics.py.
Design Principles¶
1. Comprehensive Context¶
Every incident should include enough information to answer:
- What happened?
- When did it happen?
- What was the system state?
- What thresholds or configurations were in effect?
- What events led up to this?
2. Fire-and-Forget Recording¶
Incident recording should never block the caller:
- Use async tasks scheduled on the running event loop
- Silently skip recording if no event loop is running
- Log and continue on recording failures
3. Minimal Runtime Impact¶
- Save-on-incident avoids keeping all data in memory
- Load-on-demand for diagnostics only
- Auto-cleanup prevents unbounded growth
4. Protocol-Based DI¶
Components depend on IncidentRecorderProtocol, not IncidentStore:
- Enables testing with mocks
- Allows alternative implementations
- Reduces coupling
Adding New Incident Types¶
1. Add Type to IncidentType Enum¶
# aiohomematic/store/types.py
class IncidentType(StrEnum):
    MY_NEW_INCIDENT = "MY_NEW_INCIDENT"
    """Description of when this incident occurs."""
2. Add Recording in Component¶
def _record_my_incident(self) -> None:
    if (incident_recorder := self._incident_recorder) is None:
        return

    from aiohomematic.store.types import IncidentSeverity, IncidentType

    # Capture all relevant state
    context = {
        "relevant_field_1": self._field_1,
        "relevant_field_2": self._field_2,
        "configuration": self._config.some_setting,
        # Include anything that helps diagnose the issue
    }

    async def _record() -> None:
        try:
            await incident_recorder.record_incident(
                incident_type=IncidentType.MY_NEW_INCIDENT,
                severity=IncidentSeverity.ERROR,
                message="Descriptive message about what happened",
                interface_id=self._interface_id,
                context=context,
            )
        except Exception as err:
            _LOGGER.debug("Failed to record incident: %s", err)

    # Fire and forget - suppress if no event loop
    with contextlib.suppress(RuntimeError):
        asyncio.get_running_loop().create_task(_record())
3. Add Tests¶
@pytest.mark.asyncio
async def test_my_incident_recorded(self) -> None:
    from unittest.mock import AsyncMock, MagicMock

    incident_recorder = MagicMock()
    incident_recorder.record_incident = AsyncMock()

    # Trigger the incident
    ...

    # Wait for async task
    await asyncio.sleep(0.01)

    # Verify
    incident_recorder.record_incident.assert_called_once()
    call_kwargs = incident_recorder.record_incident.call_args.kwargs
    assert call_kwargs["incident_type"] == IncidentType.MY_NEW_INCIDENT
4. Document Context Schema¶
Add the context schema to this document in the "Context Data Requirements" section.