Event Reference¶
This document provides a complete reference of all events in aiohomematic and how to subscribe to them.
Overview¶
aiohomematic uses a type-safe, async-first EventBus for decoupled event handling. Events are immutable dataclasses that carry information about state changes in the system.
Key concepts:
- Event types: Strongly-typed dataclasses inheriting from
Event - Event key: Each event has a
keyproperty for filtering subscriptions - Handlers: Can be sync or async functions
- Unsubscribe: All subscriptions return an unsubscribe callable
Quick Reference¶
| Event Type | Category | Key | Description |
|---|---|---|---|
DataPointValueReceivedEvent | Data Point | DataPointKey | Value updated from backend |
DataPointStatusReceivedEvent | Data Point | DataPointKey | Status parameter updated |
DataPointStateChangedEvent | Data Point | unique_id | External callback notification |
RpcParameterReceivedEvent | Backend | DataPointKey | Re-published raw parameter from RPC |
SysvarStateChangedEvent | System Variable | state_path | Sysvar value changed |
DeviceStateChangedEvent | Device | device_address | Device state updated |
DeviceRemovedEvent | Device | unique_id | Device/data point removed |
FirmwareStateChangedEvent | Device | device_address | Firmware info updated |
LinkPeerChangedEvent | Device | channel_address | Link peers changed |
ConnectionStageChangedEvent | Connection | interface_id | Reconnection stage |
ConnectionHealthChangedEvent | Connection | interface_id | Health status change |
CacheInvalidatedEvent | Cache | scope | Cache entries cleared |
CircuitBreakerStateChangedEvent | Circuit Breaker | interface_id | State transition |
CircuitBreakerTrippedEvent | Circuit Breaker | interface_id | Breaker tripped |
HealthRecordedEvent | Health | interface_id | Health status recorded |
ClientStateChangedEvent | State Machine | interface_id | Client state change |
CentralStateChangedEvent | State Machine | central_name | Central state change |
DataRefreshTriggeredEvent | Data Refresh | interface_id | Refresh started |
DataRefreshCompletedEvent | Data Refresh | interface_id | Refresh completed |
ProgramExecutedEvent | Hub | program_id | Program executed |
RequestCoalescedEvent | Optimization | interface_id | Requests merged |
ConnectionLostEvent | Recovery | interface_id | Connection lost detected |
RecoveryStageChangedEvent | Recovery | interface_id | Recovery stage transition |
RecoveryAttemptedEvent | Recovery | interface_id | Recovery attempt started |
RecoveryCompletedEvent | Recovery | interface_id | Recovery completed successfully |
RecoveryFailedEvent | Recovery | interface_id | Recovery failed |
HeartbeatTimerFiredEvent | Recovery | interface_id | Heartbeat timer triggered |
DataFetchCompletedEvent | Data | interface_id | Data fetch operation completed |
SystemStatusChangedEvent | Integration | None | System status (HA) |
DeviceLifecycleEvent | Integration | None | Device lifecycle (HA) |
DataPointsCreatedEvent | Integration | None | Data points created (HA) |
DeviceTriggerEvent | Integration | None | Device trigger (HA) |
Data Point Events¶
DataPointValueReceivedEvent¶
Fired when a data point value is updated from the backend.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
dpk | DataPointKey | Unique identifier for the data point |
value | Any | The new value |
received_at | datetime | When the value was received from backend |
Key: DataPointKey
async def handler(*, event: DataPointValueReceivedEvent) -> None:
print(f"DataPoint {event.dpk.parameter} = {event.value}")
unsubscribe = central.event_bus.subscribe(
event_type=DataPointValueReceivedEvent,
event_key=None, # All data points, or specific DataPointKey
handler=handler,
)
DataPointStatusReceivedEvent¶
Fired when a STATUS parameter value is updated (e.g., LEVEL_STATUS).
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
dpk | DataPointKey | Key of the MAIN parameter (not STATUS) |
status_value | int \| str | The status value |
received_at | datetime | When the value was received |
Key: DataPointKey (of the main parameter)
DataPointStateChangedEvent¶
Callback event for external integrations (e.g., Home Assistant entities).
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
unique_id | str | Unique identifier of the data point |
custom_id | str | Custom identifier for the subscriber |
Key: unique_id
Backend Events¶
RpcParameterReceivedEvent¶
Raw parameter update event from the backend (re-published from RPC callbacks).
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier (e.g., "BidCos-RF") |
channel_address | str | Channel address (e.g., "VCU0000001:1") |
parameter | str | Parameter name (e.g., "STATE") |
value | Any | The parameter value |
Key: DataPointKey (constructed from fields)
System Variable Events¶
SysvarStateChangedEvent¶
System variable value was updated.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
state_path | str | Path/identifier of the system variable |
value | Any | The new value |
received_at | datetime | When the value was received |
Key: state_path
Device Events¶
DeviceStateChangedEvent¶
Device state has been updated.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
device_address | str | Address of the device |
Key: device_address
DeviceRemovedEvent¶
Device or data point has been removed.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
unique_id | str | Unique identifier of the removed entity |
Key: unique_id
FirmwareStateChangedEvent¶
Device firmware information has been updated.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
device_address | str | Address of the device |
Key: device_address
LinkPeerChangedEvent¶
Channel link peer addresses have changed.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
channel_address | str | Address of the channel |
Key: channel_address
Connection Events¶
ConnectionStageChangedEvent¶
Connection reconnection stage progression during recovery.
from aiohomematic.central.events import ConnectionStageChangedEvent
from aiohomematic.const import ConnectionStage
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier |
stage | ConnectionStage | Current stage |
previous_stage | ConnectionStage | Previous stage |
duration_in_previous_stage_ms | float | Time in previous stage (ms) |
Key: interface_id
ConnectionStage values:
LOST- Connection lostTCP_AVAILABLE- TCP connection establishedRPC_AVAILABLE- RPC calls workingWARMUP- Waiting for stable connectionESTABLISHED- Fully connected
async def handler(*, event: ConnectionStageChangedEvent) -> None:
print(f"{event.interface_id}: {event.previous_stage} -> {event.stage}")
unsubscribe = central.event_bus.subscribe(
event_type=ConnectionStageChangedEvent,
event_key=None,
handler=handler,
)
ConnectionHealthChangedEvent¶
Connection health status update.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier |
is_healthy | bool | Whether connection is healthy |
failure_reason | FailureReason \| None | Reason for unhealthy state |
consecutive_failures | int | Number of consecutive failures |
last_successful_contact | datetime \| None | Last successful communication |
Key: interface_id
Cache Events¶
CacheInvalidatedEvent¶
Cache invalidation notification.
from aiohomematic.central.events import CacheInvalidatedEvent
from aiohomematic.const import CacheType, CacheInvalidationReason
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
cache_type | CacheType | Type of cache affected |
reason | CacheInvalidationReason | Why cache was invalidated |
scope | str \| None | Scope (device_address, interface_id, or None) |
entries_affected | int | Number of entries invalidated |
Key: scope
CacheType values:
DEVICE_DESCRIPTIONS- Device description cachePARAMSET_DESCRIPTIONS- Paramset description cacheDATA_CACHE- Runtime data cache
Circuit Breaker Events¶
CircuitBreakerStateChangedEvent¶
Circuit breaker state transition.
from aiohomematic.central.events import CircuitBreakerStateChangedEvent
from aiohomematic.client.circuit_breaker import CircuitState
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier |
old_state | CircuitState | Previous state |
new_state | CircuitState | New state |
failure_count | int | Current failure count |
success_count | int | Current success count |
last_failure_time | datetime \| None | Last failure timestamp |
Key: interface_id
CircuitState values:
CLOSED- Normal operation (requests allowed)OPEN- Circuit tripped (requests blocked)HALF_OPEN- Testing recovery (limited requests)
async def handler(*, event: CircuitBreakerStateChangedEvent) -> None:
if event.new_state == CircuitState.OPEN:
print(f"Circuit breaker OPEN for {event.interface_id}")
unsubscribe = central.event_bus.subscribe(
event_type=CircuitBreakerStateChangedEvent,
event_key=None,
handler=handler,
)
CircuitBreakerTrippedEvent¶
Circuit breaker tripped (opened due to repeated failures).
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier |
failure_count | int | Number of failures that caused trip |
last_failure_reason | str \| None | Reason for last failure |
cooldown_seconds | float | Seconds until recovery attempt |
Key: interface_id
Health Events¶
HealthRecordedEvent¶
Emitted by CircuitBreaker when a request succeeds or fails, enabling decoupled health tracking.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier |
success | bool | Whether the request was successful |
Key: interface_id
This event is used by ClientCoordinator to track health metrics for each interface. It replaces the previous callback-based health recording pattern.
async def handler(*, event: HealthRecordedEvent) -> None:
status = "success" if event.success else "failure"
print(f"{event.interface_id}: {status}")
unsubscribe = central.event_bus.subscribe(
event_type=HealthRecordedEvent,
event_key=None,
handler=handler,
)
State Machine Events¶
ClientStateChangedEvent¶
Client state machine transition.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier |
old_state | str | Previous state (ClientState value) |
new_state | str | New state (ClientState value) |
trigger | str \| None | What triggered the transition |
Key: interface_id
ClientState values:
INIT- Initial stateCONNECTING- Establishing connectionCONNECTED- Fully connectedRECONNECTING- Attempting reconnectionDISCONNECTED- DisconnectedFAILED- Permanent failure
CentralStateChangedEvent¶
Central unit state machine transition.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
central_name | str | Name of the central unit |
old_state | str | Previous state (CentralState value) |
new_state | str | New state (CentralState value) |
trigger | str \| None | What triggered the transition |
Key: central_name
CentralState values:
STARTING- Starting upINITIALIZING- Initializing clientsRUNNING- Normal operationDEGRADED- Some clients unavailableRECOVERING- Attempting recoveryFAILED- All clients failedSTOPPED- Shut down
async def handler(*, event: CentralStateChangedEvent) -> None:
if event.new_state == "DEGRADED":
print(f"Central {event.central_name} is degraded")
unsubscribe = central.event_bus.subscribe(
event_type=CentralStateChangedEvent,
event_key=None,
handler=handler,
)
Data Refresh Events¶
DataRefreshTriggeredEvent¶
Data refresh operation triggered.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
refresh_type | str | Type: "client_data", "program", "sysvar", "inbox", "firmware" |
interface_id | str \| None | Interface (None for hub-level) |
scheduled | bool | Whether this was a scheduled refresh |
Key: interface_id
DataRefreshCompletedEvent¶
Data refresh operation completed.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
refresh_type | str | Type of refresh |
interface_id | str \| None | Interface (None for hub-level) |
success | bool | Whether refresh succeeded |
duration_ms | float | Duration in milliseconds |
items_refreshed | int | Number of items refreshed |
error_message | str \| None | Error message if failed |
Key: interface_id
async def handler(*, event: DataRefreshCompletedEvent) -> None:
if event.success:
print(f"Refreshed {event.items_refreshed} items in {event.duration_ms}ms")
else:
print(f"Refresh failed: {event.error_message}")
unsubscribe = central.event_bus.subscribe(
event_type=DataRefreshCompletedEvent,
event_key=None,
handler=handler,
)
Hub Events¶
ProgramExecutedEvent¶
Backend program was executed.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
program_id | str | Program identifier |
program_name | str | Program name |
triggered_by | str | Trigger source: "user", "scheduler", "automation" |
success | bool | Whether execution succeeded |
Key: program_id
Optimization Events¶
RequestCoalescedEvent¶
Multiple requests were coalesced into one.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
request_key | str | Key identifying the request type |
coalesced_count | int | Number of requests merged |
interface_id | str | Interface identifier |
Key: interface_id
Recovery Events¶
Events related to connection recovery operations.
ConnectionLostEvent¶
Fired when connection to an interface is lost.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier |
Key: interface_id
RecoveryStageChangedEvent¶
Fired when recovery transitions between stages.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier |
old_stage | str | Previous recovery stage |
new_stage | str | New recovery stage |
Key: interface_id
RecoveryCompletedEvent¶
Fired when recovery completes successfully.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier |
duration_ms | float | Total recovery duration |
Key: interface_id
RecoveryFailedEvent¶
Fired when recovery fails after all retry attempts.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier |
reason | str | Failure reason |
Key: interface_id
Data Events¶
DataFetchCompletedEvent¶
Fired when a data fetch operation completes.
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
interface_id | str | Interface identifier |
Key: interface_id
Integration Events¶
These events are defined in aiohomematic/central/events/integration.py and are designed for Home Assistant and other consumers.
SystemStatusChangedEvent¶
Aggregated system status changes for integration consumers.
from aiohomematic.central.events import SystemStatusChangedEvent
from aiohomematic.const import CentralState, FailureReason
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
central_state | CentralState \| None | Central state change |
failure_reason | FailureReason \| None | Failure reason (when FAILED) |
failure_interface_id | str \| None | Interface that caused failure |
degraded_interfaces | Mapping[str, FailureReason] \| None | Degraded interfaces |
connection_state | tuple[str, bool] \| None | Connection change |
client_state | tuple[str, ClientState, ClientState] \| None | Client state change |
callback_state | tuple[str, bool] \| None | Callback server state |
issues | tuple[IntegrationIssue, ...] | Issues for user display |
Key: None (global event)
async def handler(*, event: SystemStatusChangedEvent) -> None:
if event.central_state == CentralState.FAILED:
if event.failure_reason == FailureReason.AUTH:
print("Authentication failed!")
elif event.central_state == CentralState.DEGRADED:
for iface_id, reason in (event.degraded_interfaces or {}).items():
print(f"Interface {iface_id} degraded: {reason}")
unsubscribe = central.event_bus.subscribe(
event_type=SystemStatusChangedEvent,
event_key=None,
handler=handler,
)
DeviceLifecycleEvent¶
Device lifecycle events (created, removed, availability).
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
event_type | DeviceLifecycleEventType | Type of lifecycle event |
device_addresses | tuple[str, ...] | Affected device addresses |
availability_changes | tuple[tuple[str, bool], ...] | Availability changes |
includes_virtual_remotes | bool | Whether virtual remotes included |
Key: None (global event)
DeviceLifecycleEventType values:
CREATED- Device createdUPDATED- Device updatedREMOVED- Device removedAVAILABILITY_CHANGED- Device availability changed
async def handler(*, event: DeviceLifecycleEvent) -> None:
if event.event_type == DeviceLifecycleEventType.CREATED:
for address in event.device_addresses:
print(f"New device: {address}")
elif event.event_type == DeviceLifecycleEventType.AVAILABILITY_CHANGED:
for address, available in event.availability_changes:
print(f"{address}: {'available' if available else 'unavailable'}")
unsubscribe = central.event_bus.subscribe(
event_type=DeviceLifecycleEvent,
event_key=None,
handler=handler,
)
DataPointsCreatedEvent¶
New data points created event.
from aiohomematic.central.events import DataPointsCreatedEvent
from aiohomematic.const import DataPointCategory
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
new_data_points | Mapping[DataPointCategory, tuple] | Data points by category |
Key: None (global event)
DeviceTriggerEvent¶
Device trigger events (button press, motion, etc.).
from aiohomematic.central.events import DeviceTriggerEvent
from aiohomematic.const import DeviceTriggerEventType
| Field | Type | Description |
|---|---|---|
timestamp | datetime | When the event was created |
trigger_type | DeviceTriggerEventType | Type of trigger |
model | str | Device model |
interface_id | str | Interface identifier |
device_address | str | Device address |
channel_no | int \| None | Channel number |
parameter | str | Parameter name |
value | Any | Trigger value |
Key: None (global event)
DeviceTriggerEventType values:
KEYPRESS- Button pressKEYPRESS_LONG_START- Long press startedIMPULSE- Impulse trigger
async def handler(*, event: DeviceTriggerEvent) -> None:
if event.trigger_type == DeviceTriggerEventType.KEYPRESS:
print(f"Button {event.parameter} pressed on {event.device_address}")
unsubscribe = central.event_bus.subscribe(
event_type=DeviceTriggerEvent,
event_key=None,
handler=handler,
)
Subscription Methods on Model Classes¶
Model classes provide convenience subscription methods:
DataPoint¶
# Subscribe to value updates
unsubscribe = data_point.subscribe_to_data_point_updated(
handler=my_handler,
custom_id="my-integration",
)
# Subscribe to device removal
unsubscribe = data_point.subscribe_to_device_removed(
handler=my_removal_handler,
)
Device¶
# Subscribe to device state updates
unsubscribe = device.subscribe_to_device_updated(
handler=my_handler,
)
# Subscribe to firmware updates
unsubscribe = device.subscribe_to_firmware_updated(
handler=my_firmware_handler,
)
Channel¶
# Subscribe to link peer changes
unsubscribe = channel.subscribe_to_link_peer_changed(
handler=my_handler,
)
EventBus API¶
Direct Subscription¶
from aiohomematic.central.events import EventBus, DataPointValueReceivedEvent
# Subscribe to all events of a type
unsubscribe = event_bus.subscribe(
event_type=DataPointValueReceivedEvent,
event_key=None,
handler=my_handler,
)
# Subscribe to events with a specific key
unsubscribe = event_bus.subscribe(
event_type=DataPointValueReceivedEvent,
event_key=my_data_point_key,
handler=my_handler,
)
# Unsubscribe when done
unsubscribe()
Handler Priorities¶
from aiohomematic.central.events import EventPriority
# High priority handler (runs before normal handlers)
unsubscribe = event_bus.subscribe(
event_type=DataPointValueReceivedEvent,
event_key=None,
handler=my_critical_handler,
priority=EventPriority.HIGH,
)
| Priority | Value | Use Case |
|---|---|---|
CRITICAL | 200 | Logging, metrics (runs first) |
HIGH | 100 | Important handlers |
NORMAL | 50 | Default for most handlers |
LOW | 0 | Cleanup, notifications (runs last) |
Batch Publishing¶
from aiohomematic.central.events import EventBatch
async with EventBatch(bus=event_bus) as batch:
batch.add(event=event1)
batch.add(event=event2)
batch.add(event=event3)
# All events published when context exits
Testing with Events¶
Use EventCapture for behavior-focused testing:
from aiohomematic_test_support.event_capture import EventCapture
capture = EventCapture()
capture.subscribe_to(event_bus, CircuitBreakerTrippedEvent)
# ... perform test actions ...
capture.assert_event_emitted(
event_type=CircuitBreakerTrippedEvent,
failure_count=5,
)
capture.cleanup()
See Testing with Events for complete testing guide.
Best Practices¶
1. Use Specific Event Types¶
# Good - type-safe, IDE autocomplete works
async def handler(*, event: DataPointValueReceivedEvent) -> None:
print(event.dpk, event.value)
# Bad - loses type information
async def handler(*, event: Event) -> None:
print(event.timestamp) # Only base fields available
2. Keep Handlers Lightweight¶
# Good - quick handler, offloads heavy work
async def handler(*, event: DataPointValueReceivedEvent) -> None:
asyncio.create_task(process_update(event))
# Avoid - blocks other handlers
async def handler(*, event: DataPointValueReceivedEvent) -> None:
await slow_database_operation(event) # Blocks for seconds
3. Always Unsubscribe¶
class MyIntegration:
def __init__(self, event_bus: EventBus) -> None:
self._unsubscribe = event_bus.subscribe(
event_type=DataPointValueReceivedEvent,
event_key=None,
handler=self._handler,
)
def cleanup(self) -> None:
self._unsubscribe()
4. Handle Errors Gracefully¶
The EventBus isolates handler errors - one failing handler won't affect others:
async def handler(*, event: DataPointValueReceivedEvent) -> None:
try:
await process(event)
except ExpectedError:
_LOGGER.warning("Could not process event: %s", event.dpk)
Related Documentation¶
- EventBus Architecture - Internal architecture and design decisions
- Event-Driven Metrics - Metrics collected via events
- Testing with Events - Event-based testing guide
- Architecture Overview - Overall system architecture
- Data Flow - How data flows through the system