diff --git a/config/home_assistant.json5 b/config/home_assistant.json5 new file mode 100644 index 000000000..0a84e080d --- /dev/null +++ b/config/home_assistant.json5 @@ -0,0 +1,40 @@ +{ + version: "v1.0.3", + hertz: 0.5, + name: "home_assistant_agent", + api_key: "${OM_API_KEY:-openmind_free}", + system_prompt_base: "You are a smart home assistant agent. You can control IoT devices connected to Home Assistant including lights, switches, and thermostats. When a user asks you to control a device, use the home_assistant action with the appropriate device_type, entity_id, action, and parameters. Always confirm what action you are taking.", + system_governance: "Here are the laws that govern your actions. Do not violate these laws.\nFirst Law: A robot cannot harm a human or allow a human to come to harm.\nSecond Law: A robot must obey orders from humans, unless those orders conflict with the First Law.\nThird Law: A robot must protect itself, as long as that protection doesn't conflict with the First or Second Law.", + system_prompt_examples: "Here are some examples of interactions you might encounter:\n\n1. Turn on the living room light:\n home_assistant: {{'device_type': 'light', 'entity_id': 'light.living_room', 'action': 'turn_on'}}\n\n2. Set brightness to 50%:\n home_assistant: {{'device_type': 'light', 'entity_id': 'light.living_room', 'action': 'set_brightness', 'brightness': 128}}\n\n3. Set bedroom temperature to 24 degrees:\n home_assistant: {{'device_type': 'climate', 'entity_id': 'climate.bedroom', 'action': 'set_temperature', 'temperature': 24.0}}\n\n4. Turn off the fan switch:\n home_assistant: {{'device_type': 'switch', 'entity_id': 'switch.fan', 'action': 'turn_off'}}\n", + agent_inputs: [ + { + type: "HomeAssistantStateInput", + config: { + base_url: "${HA_BASE_URL:-http://homeassistant.local:8123}", + token: "${HA_TOKEN:-}", + entity_ids: "${HA_ENTITY_IDS:-light.living_room,switch.fan,climate.bedroom}", + poll_interval: 30.0, + }, + }, + ], + cortex_llm: { + type: "OpenAILLM", + config: { + agent_name: "HomeAssistantAgent", + history_length: 10, + }, + }, + agent_actions: [ + { + name: "home_assistant", + llm_label: "home_assistant", + connector: "rest_api", + config: { + base_url: "${HA_BASE_URL:-http://homeassistant.local:8123}", + token: "${HA_TOKEN:-}", + timeout: 10.0, + }, + }, + ], + simulators: [], +} diff --git a/pyproject.toml b/pyproject.toml index 1b4d51e1a..bb708ca54 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ dependencies = [ "websockets==13.1", "om1-modules @ git+https://github.com/OpenMind/om1-modules.git@56fbb4f498e67b666306d9b11f7422d9447db92b", "aiohttp==3.13.3", + "aiomqtt==2.5.0", "pynput==1.8.1", "dimo-python-sdk @ git+https://github.com/openminddev/dimo-python-sdk.git@6b47fcd28654a4145cedee649a0999a8eb08a2f6", "nest-asyncio==1.6.0", diff --git a/src/actions/home_assistant/__init__.py b/src/actions/home_assistant/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/actions/home_assistant/connector/__init__.py b/src/actions/home_assistant/connector/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/actions/home_assistant/connector/mqtt.py b/src/actions/home_assistant/connector/mqtt.py new file mode 100644 index 000000000..d0eaaddf6 --- /dev/null +++ b/src/actions/home_assistant/connector/mqtt.py @@ -0,0 +1,199 @@ +"""Home Assistant MQTT action connector.""" + +import json +import logging + +import aiomqtt +from pydantic import Field + +from actions.base import ActionConfig, ActionConnector +from actions.home_assistant.interface import ( + COLOR_MAP, + HAAction, + HADeviceType, + HomeAssistantInput, +) + + +class HomeAssistantMQTTConfig(ActionConfig): + """ + Configuration for Home Assistant MQTT connector. + + Parameters + ---------- + broker : str + MQTT broker hostname or IP address. + port : int + MQTT broker port (default: 1883). + username : str + MQTT broker username (optional). + password : str + MQTT broker password (optional). + topic_prefix : str + Base topic prefix for Home Assistant (default: homeassistant). + timeout : float + Connection timeout in seconds. + """ + + broker: str = Field(default="", description="MQTT broker hostname or IP") + port: int = Field(default=1883, description="MQTT broker port") + username: str = Field(default="", description="MQTT broker username") + password: str = Field(default="", description="MQTT broker password") + topic_prefix: str = Field(default="homeassistant", description="Base topic prefix") + timeout: float = Field(default=10.0, description="Connection timeout in seconds") + + +class HomeAssistantMQTTConnector( + ActionConnector[HomeAssistantMQTTConfig, HomeAssistantInput] +): + """ + Home Assistant connector using MQTT protocol. + + Publishes device control commands to MQTT topics following + the Home Assistant MQTT convention: + ///set + """ + + def __init__(self, config: HomeAssistantMQTTConfig): + """ + Initialize the MQTT connector. + + Parameters + ---------- + config : HomeAssistantMQTTConfig + Configuration for the connector. + """ + super().__init__(config) + self._broker = config.broker + self._port = config.port + self._username = config.username or None + self._password = config.password or None + self._topic_prefix = config.topic_prefix.rstrip("/") + self._timeout = config.timeout + + if not config.broker: + logging.warning("HomeAssistantMQTTConnector: broker not provided") + + def _build_topic(self, domain: str, entity_id: str) -> str: + """ + Build MQTT command topic for an entity. + + Parameters + ---------- + domain : str + Device domain (e.g. 'light', 'switch', 'climate'). + entity_id : str + Full entity ID (e.g. 'light.living_room'). + + Returns + ------- + str + MQTT topic string. + """ + name = entity_id.split(".", 1)[-1] if "." in entity_id else entity_id + return f"{self._topic_prefix}/{domain}/{name}/set" + + async def _publish(self, topic: str, payload: dict) -> bool: + """ + Publish a JSON payload to an MQTT topic. + + Parameters + ---------- + topic : str + MQTT topic to publish to. + payload : dict + Payload to serialize as JSON. + + Returns + ------- + bool + True if published successfully, False otherwise. + """ + if not self._broker: + return False + + try: + async with aiomqtt.Client( + hostname=self._broker, + port=self._port, + username=self._username, + password=self._password, + timeout=self._timeout, + ) as client: + await client.publish(topic, json.dumps(payload)) + return True + except aiomqtt.MqttError as e: + logging.error(f"HomeAssistantMQTTConnector: MQTT error - {e}") + return False + except TimeoutError: + logging.error("HomeAssistantMQTTConnector: connection timed out") + return False + except Exception as e: + logging.error(f"HomeAssistantMQTTConnector: unexpected error - {e}") + return False + + async def connect(self, output_interface: HomeAssistantInput) -> None: + """ + Execute a Home Assistant device control action via MQTT. + + Parameters + ---------- + output_interface : HomeAssistantInput + The action request containing device_type, entity_id, action, + and optional parameters. + """ + device_type = output_interface.device_type + entity_id = output_interface.entity_id + action = output_interface.action + + if device_type == HADeviceType.LIGHT: + topic = self._build_topic("light", entity_id) + if action == HAAction.TURN_ON: + await self._publish(topic, {"state": "ON"}) + elif action == HAAction.TURN_OFF: + await self._publish(topic, {"state": "OFF"}) + elif action == HAAction.SET_BRIGHTNESS: + brightness = output_interface.brightness or 255 + await self._publish(topic, {"state": "ON", "brightness": brightness}) + elif action == HAAction.SET_COLOR: + color = (output_interface.color or "white").lower() + hs_color = COLOR_MAP.get(color, COLOR_MAP["white"]) + await self._publish(topic, {"state": "ON", "hs_color": hs_color}) + else: + logging.warning( + f"HomeAssistantMQTTConnector: action '{action.value}' " + f"not supported for light" + ) + + elif device_type == HADeviceType.SWITCH: + topic = self._build_topic("switch", entity_id) + if action == HAAction.TURN_ON: + await self._publish(topic, {"state": "ON"}) + elif action == HAAction.TURN_OFF: + await self._publish(topic, {"state": "OFF"}) + else: + logging.warning( + f"HomeAssistantMQTTConnector: action '{action.value}' " + f"not supported for switch" + ) + + elif device_type == HADeviceType.CLIMATE: + topic = self._build_topic("climate", entity_id) + if action == HAAction.SET_TEMPERATURE: + temperature = output_interface.temperature or 20.0 + await self._publish(topic, {"temperature": temperature}) + elif action == HAAction.TURN_ON: + await self._publish(topic, {"state": "ON"}) + elif action == HAAction.TURN_OFF: + await self._publish(topic, {"state": "OFF"}) + else: + logging.warning( + f"HomeAssistantMQTTConnector: action '{action.value}' " + f"not supported for climate" + ) + + else: + logging.warning( + f"HomeAssistantMQTTConnector: device_type '{device_type.value}' " + f"not supported" + ) diff --git a/src/actions/home_assistant/connector/rest_api.py b/src/actions/home_assistant/connector/rest_api.py new file mode 100644 index 000000000..69fd84dd9 --- /dev/null +++ b/src/actions/home_assistant/connector/rest_api.py @@ -0,0 +1,220 @@ +import asyncio +import logging +from typing import Optional + +import aiohttp +from pydantic import Field + +from actions.base import ActionConfig, ActionConnector +from actions.home_assistant.interface import ( + COLOR_MAP, + HAAction, + HADeviceType, + HomeAssistantInput, +) + + +class HomeAssistantConfig(ActionConfig): + """ + Configuration for Home Assistant REST API connector. + + Parameters + ---------- + base_url : str + Home Assistant base URL (e.g. http://homeassistant.local:8123). + token : str + Long-lived access token from Home Assistant profile. + timeout : float + Request timeout in seconds. + """ + + base_url: str = Field(default="", description="Home Assistant base URL") + token: str = Field(default="", description="Long-lived access token") + timeout: float = Field(default=10.0, description="Request timeout in seconds") + + +class HomeAssistantRESTConnector( + ActionConnector[HomeAssistantConfig, HomeAssistantInput] +): + """ + Connector for Home Assistant REST API. + + Controls smart home devices via the Home Assistant REST API. + Supports lights, switches, and climate devices. + """ + + def __init__(self, config: HomeAssistantConfig): + """ + Initialize the Home Assistant REST connector. + + Parameters + ---------- + config : HomeAssistantConfig + Configuration for the connector. + """ + super().__init__(config) + + if not self.config.base_url: + logging.warning( + "HomeAssistantRESTConnector: base_url not provided in configuration" + ) + if not self.config.token: + logging.warning( + "HomeAssistantRESTConnector: token not provided in configuration" + ) + + def _get_headers(self) -> dict: + """Build authorization headers.""" + return { + "Authorization": f"Bearer {self.config.token}", + "Content-Type": "application/json", + } + + def _build_service_url(self, domain: str, service: str) -> str: + """Build the HA service call URL.""" + base = self.config.base_url.rstrip("/") + return f"{base}/api/services/{domain}/{service}" + + async def _call_service( + self, domain: str, service: str, payload: dict + ) -> Optional[dict]: + """ + Call a Home Assistant service via REST API. + + Parameters + ---------- + domain : str + HA domain (e.g. light, switch, climate). + service : str + HA service (e.g. turn_on, turn_off). + payload : dict + Service call payload. + + Returns + ------- + Optional[dict] + Response data or None on failure. + """ + if not self.config.base_url or not self.config.token: + logging.error("HomeAssistantRESTConnector: base_url or token not set") + return None + + url = self._build_service_url(domain, service) + logging.info(f"HomeAssistantRESTConnector: POST {url} payload={payload}") + + try: + timeout = aiohttp.ClientTimeout(total=self.config.timeout) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post( + url, headers=self._get_headers(), json=payload + ) as response: + if response.status in (200, 201): + logging.info( + f"HomeAssistantRESTConnector: success {response.status}" + ) + return await response.json() + else: + error_text = await response.text() + logging.error( + f"HomeAssistantRESTConnector: error {response.status} - {error_text}" + ) + return None + except asyncio.TimeoutError: + logging.error("HomeAssistantRESTConnector: request timed out") + return None + except aiohttp.ClientError as e: + logging.error(f"HomeAssistantRESTConnector: network error - {e}") + return None + except Exception as e: + logging.error(f"HomeAssistantRESTConnector: unexpected error - {e}") + return None + + async def connect(self, output_interface: HomeAssistantInput) -> None: + """ + Execute a Home Assistant device control action. + + Parameters + ---------- + output_interface : HomeAssistantInput + Input containing device type, entity_id, action, and parameters. + """ + if not output_interface.entity_id: + logging.warning("HomeAssistantRESTConnector: entity_id is empty, skipping") + return + + entity_id = output_interface.entity_id + action = output_interface.action + device_type = output_interface.device_type + + logging.info( + f"HomeAssistantRESTConnector: {action.value} on {entity_id} " + f"(type={device_type.value})" + ) + + # LIGHT + if device_type == HADeviceType.LIGHT: + if action == HAAction.TURN_ON: + await self._call_service("light", "turn_on", {"entity_id": entity_id}) + elif action == HAAction.TURN_OFF: + await self._call_service("light", "turn_off", {"entity_id": entity_id}) + elif action == HAAction.SET_BRIGHTNESS: + brightness = max(0, min(255, output_interface.brightness)) + await self._call_service( + "light", + "turn_on", + {"entity_id": entity_id, "brightness": brightness}, + ) + elif action == HAAction.SET_COLOR: + color_name = output_interface.color.lower() + hs_color = COLOR_MAP.get(color_name, COLOR_MAP["white"]) + await self._call_service( + "light", + "turn_on", + {"entity_id": entity_id, "hs_color": hs_color}, + ) + else: + logging.warning( + f"HomeAssistantRESTConnector: action '{action.value}' " + f"not supported for light" + ) + + # SWITCH + elif device_type == HADeviceType.SWITCH: + if action == HAAction.TURN_ON: + await self._call_service("switch", "turn_on", {"entity_id": entity_id}) + elif action == HAAction.TURN_OFF: + await self._call_service("switch", "turn_off", {"entity_id": entity_id}) + else: + logging.warning( + f"HomeAssistantRESTConnector: action '{action.value}' " + f"not supported for switch" + ) + + # CLIMATE + elif device_type == HADeviceType.CLIMATE: + if action == HAAction.SET_TEMPERATURE: + await self._call_service( + "climate", + "set_temperature", + { + "entity_id": entity_id, + "temperature": output_interface.temperature, + }, + ) + elif action == HAAction.TURN_ON: + await self._call_service("climate", "turn_on", {"entity_id": entity_id}) + elif action == HAAction.TURN_OFF: + await self._call_service( + "climate", "turn_off", {"entity_id": entity_id} + ) + else: + logging.warning( + f"HomeAssistantRESTConnector: action '{action.value}' " + f"not supported for climate" + ) + + else: + logging.warning( + f"HomeAssistantRESTConnector: device_type '{device_type.value}' " + f"not supported" + ) diff --git a/src/actions/home_assistant/connector/websocket.py b/src/actions/home_assistant/connector/websocket.py new file mode 100644 index 000000000..2cea9eeaf --- /dev/null +++ b/src/actions/home_assistant/connector/websocket.py @@ -0,0 +1,229 @@ +"""Home Assistant WebSocket API action connector.""" + +import json +import logging +from typing import Optional + +import websockets +from pydantic import Field + +from actions.base import ActionConfig, ActionConnector +from actions.home_assistant.interface import ( + COLOR_MAP, + HAAction, + HADeviceType, + HomeAssistantInput, +) + + +class HomeAssistantWebSocketConfig(ActionConfig): + """ + Configuration for Home Assistant WebSocket connector. + + Parameters + ---------- + base_url : str + Home Assistant base URL (e.g. http://homeassistant.local:8123). + token : str + Long-lived access token from Home Assistant profile. + timeout : float + Connection and command timeout in seconds. + """ + + base_url: str = Field(default="", description="Home Assistant base URL") + token: str = Field(default="", description="Long-lived access token") + timeout: float = Field(default=10.0, description="Timeout in seconds") + + +class HomeAssistantWebSocketConnector( + ActionConnector[HomeAssistantWebSocketConfig, HomeAssistantInput] +): + """ + Home Assistant connector using WebSocket API. + + Provides persistent bidirectional connection to Home Assistant, + enabling real-time command execution with result confirmation. + """ + + def __init__(self, config: HomeAssistantWebSocketConfig): + """ + Initialize the WebSocket connector. + + Parameters + ---------- + config : HomeAssistantWebSocketConfig + Configuration for the connector. + """ + super().__init__(config) + base = config.base_url.rstrip("/") + if base.startswith("https://"): + self._ws_url = "wss://" + base[len("https://") :] + "/api/websocket" + elif base.startswith("http://"): + self._ws_url = "ws://" + base[len("http://") :] + "/api/websocket" + else: + self._ws_url = base + "/api/websocket" + self._token = config.token + self._timeout = config.timeout + self._msg_id = 1 + + if not config.base_url: + logging.warning("HomeAssistantWebSocketConnector: base_url not provided") + if not config.token: + logging.warning("HomeAssistantWebSocketConnector: token not provided") + + async def _send_command( + self, + domain: str, + service: str, + entity_id: str, + service_data: Optional[dict] = None, + ) -> bool: + """ + Open a WebSocket connection, authenticate, and send a service command. + + Parameters + ---------- + domain : str + Service domain (e.g. 'light', 'switch', 'climate'). + service : str + Service name (e.g. 'turn_on', 'turn_off'). + entity_id : str + Target entity ID. + service_data : Optional[dict] + Additional service data (brightness, temperature, etc.). + + Returns + ------- + bool + True if command succeeded, False otherwise. + """ + if not self._ws_url or not self._token: + return False + + try: + async with websockets.connect( + self._ws_url, open_timeout=self._timeout + ) as ws: + raw = await ws.recv() + msg = json.loads(raw) + if msg.get("type") != "auth_required": + logging.error( + f"HomeAssistantWebSocketConnector: expected auth_required, " + f"got {msg.get('type')}" + ) + return False + + await ws.send(json.dumps({"type": "auth", "access_token": self._token})) + raw = await ws.recv() + msg = json.loads(raw) + if msg.get("type") != "auth_ok": + logging.error( + "HomeAssistantWebSocketConnector: authentication failed" + ) + return False + + command = { + "id": self._msg_id, + "type": "call_service", + "domain": domain, + "service": service, + "target": {"entity_id": entity_id}, + "service_data": service_data or {}, + } + self._msg_id += 1 + + await ws.send(json.dumps(command)) + raw = await ws.recv() + result = json.loads(raw) + + if result.get("success"): + return True + else: + error = result.get("error", {}) + logging.error( + f"HomeAssistantWebSocketConnector: command failed - " + f"{error.get('code')}: {error.get('message')}" + ) + return False + + except TimeoutError: + logging.error("HomeAssistantWebSocketConnector: connection timed out") + return False + except websockets.exceptions.WebSocketException as e: + logging.error(f"HomeAssistantWebSocketConnector: WebSocket error - {e}") + return False + except Exception as e: + logging.error(f"HomeAssistantWebSocketConnector: unexpected error - {e}") + return False + + async def connect(self, output_interface: HomeAssistantInput) -> None: + """ + Execute a Home Assistant device control action via WebSocket. + + Parameters + ---------- + output_interface : HomeAssistantInput + The action request containing device_type, entity_id, action, + and optional parameters. + """ + device_type = output_interface.device_type + entity_id = output_interface.entity_id + action = output_interface.action + + if device_type == HADeviceType.LIGHT: + if action == HAAction.TURN_ON: + await self._send_command("light", "turn_on", entity_id) + elif action == HAAction.TURN_OFF: + await self._send_command("light", "turn_off", entity_id) + elif action == HAAction.SET_BRIGHTNESS: + brightness = output_interface.brightness or 255 + await self._send_command( + "light", "turn_on", entity_id, {"brightness": brightness} + ) + elif action == HAAction.SET_COLOR: + color = (output_interface.color or "white").lower() + hs_color = COLOR_MAP.get(color, COLOR_MAP["white"]) + await self._send_command( + "light", "turn_on", entity_id, {"hs_color": hs_color} + ) + else: + logging.warning( + f"HomeAssistantWebSocketConnector: action '{action.value}' " + f"not supported for light" + ) + + elif device_type == HADeviceType.SWITCH: + if action == HAAction.TURN_ON: + await self._send_command("switch", "turn_on", entity_id) + elif action == HAAction.TURN_OFF: + await self._send_command("switch", "turn_off", entity_id) + else: + logging.warning( + f"HomeAssistantWebSocketConnector: action '{action.value}' " + f"not supported for switch" + ) + + elif device_type == HADeviceType.CLIMATE: + if action == HAAction.SET_TEMPERATURE: + temperature = output_interface.temperature or 20.0 + await self._send_command( + "climate", + "set_temperature", + entity_id, + {"temperature": temperature}, + ) + elif action == HAAction.TURN_ON: + await self._send_command("climate", "turn_on", entity_id) + elif action == HAAction.TURN_OFF: + await self._send_command("climate", "turn_off", entity_id) + else: + logging.warning( + f"HomeAssistantWebSocketConnector: action '{action.value}' " + f"not supported for climate" + ) + + else: + logging.warning( + f"HomeAssistantWebSocketConnector: device_type '{device_type.value}' " + f"not supported" + ) diff --git a/src/actions/home_assistant/interface.py b/src/actions/home_assistant/interface.py new file mode 100644 index 000000000..9017417a7 --- /dev/null +++ b/src/actions/home_assistant/interface.py @@ -0,0 +1,80 @@ +from dataclasses import dataclass +from enum import Enum + +from actions.base import Interface + + +class HADeviceType(str, Enum): + """Supported Home Assistant device types.""" + + LIGHT = "light" + SWITCH = "switch" + CLIMATE = "climate" + + +class HAAction(str, Enum): + """Supported Home Assistant actions.""" + + TURN_ON = "turn_on" + TURN_OFF = "turn_off" + SET_BRIGHTNESS = "set_brightness" + SET_COLOR = "set_color" + SET_TEMPERATURE = "set_temperature" + + +@dataclass +class HomeAssistantInput: + """ + Input interface for the Home Assistant action. + + Parameters + ---------- + device_type : HADeviceType + The type of device to control (light, switch, climate). + entity_id : str + The Home Assistant entity ID (e.g. light.living_room). + action : HAAction + The action to perform on the device. + brightness : int + Brightness level for lights (0-255). + color : str + Color name for lights (e.g. red, blue, green). + temperature : float + Target temperature for climate devices in Celsius. + """ + + device_type: HADeviceType = HADeviceType.LIGHT + entity_id: str = "" + action: HAAction = HAAction.TURN_ON + brightness: int = 255 + color: str = "" + temperature: float = 22.0 + + +@dataclass +class HomeAssistant(Interface[HomeAssistantInput, HomeAssistantInput]): + """ + This action allows the robot to control smart home devices via Home Assistant. + + Effect: Controls IoT devices including lights (on/off, brightness, color), + switches (on/off), and thermostats (temperature setting) through the + Home Assistant REST API. + """ + + input: HomeAssistantInput + output: HomeAssistantInput + + +COLOR_MAP: dict[str, list[int]] = { + "red": [0, 100], + "green": [120, 100], + "blue": [240, 100], + "yellow": [60, 100], + "orange": [30, 100], + "purple": [270, 100], + "pink": [300, 100], + "white": [0, 0], + "warm white": [30, 20], + "cool white": [200, 10], + "cyan": [180, 100], +} diff --git a/src/inputs/plugins/home_assistant.py b/src/inputs/plugins/home_assistant.py new file mode 100644 index 000000000..6f6039477 --- /dev/null +++ b/src/inputs/plugins/home_assistant.py @@ -0,0 +1,270 @@ +import asyncio +import logging +import time +from typing import Optional + +import aiohttp +from pydantic import Field + +from inputs.base import Message, SensorConfig +from inputs.base.loop import FuserInput +from providers.io_provider import IOProvider + + +class HomeAssistantInputConfig(SensorConfig): + """ + Configuration for Home Assistant State Input. + + Parameters + ---------- + base_url : str + Home Assistant base URL (e.g. http://homeassistant.local:8123). + token : str + Long-lived access token from Home Assistant profile. + entity_ids : str + Comma-separated list of entity IDs to monitor + (e.g. light.living_room,switch.fan,climate.bedroom). + poll_interval : float + Seconds between state updates (default: 30). + """ + + base_url: str = Field(default="", description="Home Assistant base URL") + token: str = Field(default="", description="Long-lived access token") + entity_ids: str = Field( + default="", description="Comma-separated entity IDs to monitor" + ) + poll_interval: float = Field( + default=30.0, description="Seconds between state polls" + ) + + +class HomeAssistantStateInput(FuserInput[HomeAssistantInputConfig, Optional[list]]): + """ + Home Assistant state input that polls device states and reports changes. + + Monitors smart home device states and provides updates to the LLM + when states change, enabling context-aware robot responses. + """ + + def __init__(self, config: HomeAssistantInputConfig): + """ + Initialize the Home Assistant state input. + + Parameters + ---------- + config : HomeAssistantInputConfig + Configuration for the state input. + """ + super().__init__(config) + + self.io_provider = IOProvider() + self.messages: list[Message] = [] + self.descriptor_for_LLM = "Home Assistant Device States" + + self.base_url = config.base_url.rstrip("/") + self.token = config.token + self.entity_ids = [e.strip() for e in config.entity_ids.split(",") if e.strip()] + self.poll_interval = config.poll_interval + + self._last_poll_time: float = 0 + self._last_states: dict = {} + + if not self.base_url: + logging.warning("HomeAssistantStateInput: base_url not provided") + if not self.token: + logging.warning("HomeAssistantStateInput: token not provided") + if not self.entity_ids: + logging.warning("HomeAssistantStateInput: no entity_ids configured") + + def _get_headers(self) -> dict: + """Build authorization headers.""" + return { + "Authorization": f"Bearer {self.token}", + "Content-Type": "application/json", + } + + async def _fetch_state(self, entity_id: str) -> Optional[dict]: + """ + Fetch the current state of a single entity from Home Assistant. + + Parameters + ---------- + entity_id : str + The entity ID to fetch. + + Returns + ------- + Optional[dict] + State data or None on failure. + """ + if not self.base_url or not self.token: + return None + + url = f"{self.base_url}/api/states/{entity_id}" + + try: + timeout = aiohttp.ClientTimeout(total=10) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(url, headers=self._get_headers()) as response: + if response.status == 200: + return await response.json() + else: + logging.error( + f"HomeAssistantStateInput: error fetching {entity_id} " + f"status={response.status}" + ) + return None + except asyncio.TimeoutError: + logging.error(f"HomeAssistantStateInput: timeout fetching {entity_id}") + return None + except aiohttp.ClientError as e: + logging.error( + f"HomeAssistantStateInput: network error fetching {entity_id}: {e}" + ) + return None + except Exception as e: + logging.error( + f"HomeAssistantStateInput: unexpected error fetching {entity_id}: {e}" + ) + return None + + async def _poll(self) -> Optional[list]: + """ + Poll Home Assistant for device states at configured interval. + + Returns + ------- + Optional[list] + List of state dicts when poll interval elapsed, None otherwise. + """ + current_time = time.time() + + if current_time - self._last_poll_time < self.poll_interval: + await asyncio.sleep(1.0) + return None + + self._last_poll_time = current_time + await asyncio.sleep(1.0) + + if not self.entity_ids: + return None + + states = [] + for entity_id in self.entity_ids: + state = await self._fetch_state(entity_id) + if state is not None: + states.append(state) + + return states if states else None + + def _format_state(self, state: dict) -> str: + """ + Format a single entity state into human-readable text. + + Parameters + ---------- + state : dict + Raw state data from Home Assistant API. + + Returns + ------- + str + Human-readable state description. + """ + entity_id = state.get("entity_id", "unknown") + current_state = state.get("state", "unknown") + attributes = state.get("attributes", {}) + friendly_name = attributes.get("friendly_name", entity_id) + + parts = [f"{friendly_name} ({entity_id}) is {current_state}"] + + if "brightness" in attributes and attributes["brightness"] is not None: + brightness_pct = round(attributes["brightness"] / 255 * 100) + parts.append(f"brightness {brightness_pct}%") + + if "color_name" in attributes: + parts.append(f"color {attributes['color_name']}") + + if "temperature" in attributes: + parts.append(f"temperature {attributes['temperature']}°C") + + if "current_temperature" in attributes: + parts.append(f"current temperature {attributes['current_temperature']}°C") + + return ", ".join(parts) + + async def _raw_to_text(self, raw_input: Optional[list]) -> Optional[Message]: + """ + Convert raw state list to human-readable message, only when states change. + + Parameters + ---------- + raw_input : Optional[list] + List of state dicts from Home Assistant. + + Returns + ------- + Optional[Message] + Formatted message if states changed, None otherwise. + """ + if raw_input is None: + return None + + changed = [] + for state in raw_input: + entity_id = state.get("entity_id", "") + current_state = state.get("state", "") + last_known = self._last_states.get(entity_id) + + if last_known != current_state: + changed.append(state) + self._last_states[entity_id] = current_state + + if not changed: + return None + + lines = [self._format_state(s) for s in changed] + message = "Smart home device updates: " + "; ".join(lines) + return Message(timestamp=time.time(), message=message) + + async def raw_to_text(self, raw_input: Optional[list]): + """ + Update message buffer with processed state data. + + Parameters + ---------- + raw_input : Optional[list] + Raw state list to process. + """ + pending_message = await self._raw_to_text(raw_input) + + if pending_message is not None: + self.messages.append(pending_message) + + def formatted_latest_buffer(self) -> Optional[str]: + """ + Format and clear the latest buffer contents. + + Returns + ------- + Optional[str] + Formatted string for LLM or None if buffer is empty. + """ + if not self.messages: + return None + + latest_message = self.messages[-1] + + result = ( + f"\nINPUT: {self.descriptor_for_LLM}\n// START\n" + f"{latest_message.message}\n// END\n" + ) + + self.io_provider.add_input( + self.descriptor_for_LLM, + latest_message.message, + latest_message.timestamp, + ) + self.messages = [] + + return result diff --git a/tests/actions/home_assistant/__init__.py b/tests/actions/home_assistant/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/actions/home_assistant/connector/__init__.py b/tests/actions/home_assistant/connector/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/actions/home_assistant/connector/test_home_assistant_mqtt.py b/tests/actions/home_assistant/connector/test_home_assistant_mqtt.py new file mode 100644 index 000000000..f3031c550 --- /dev/null +++ b/tests/actions/home_assistant/connector/test_home_assistant_mqtt.py @@ -0,0 +1,438 @@ +"""Tests for Home Assistant MQTT action connector.""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from actions.home_assistant.connector.mqtt import ( + COLOR_MAP, + HomeAssistantMQTTConfig, + HomeAssistantMQTTConnector, +) +from actions.home_assistant.interface import ( + HAAction, + HADeviceType, + HomeAssistantInput, +) + + +def make_connector(broker="mqtt.local", port=1883): + """Helper to create a connector instance.""" + config = HomeAssistantMQTTConfig( + broker=broker, + port=port, + username="user", + password="pass", + topic_prefix="homeassistant", + timeout=5.0, + ) + return HomeAssistantMQTTConnector(config) + + +def mock_mqtt_session(): + """Helper to mock aiomqtt.Client context manager.""" + mock_client = AsyncMock() + mock_client.publish = AsyncMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + ctx = patch( + "actions.home_assistant.connector.mqtt.aiomqtt.Client", + return_value=mock_client, + ) + return ctx, mock_client + + +class TestHomeAssistantMQTTConfig: + """Tests for HomeAssistantMQTTConfig.""" + + def test_default_values(self): + """Test config default values.""" + config = HomeAssistantMQTTConfig() + assert config.broker == "" + assert config.port == 1883 + assert config.username == "" + assert config.password == "" + assert config.topic_prefix == "homeassistant" + assert config.timeout == 10.0 + + def test_custom_values(self): + """Test config with custom values.""" + config = HomeAssistantMQTTConfig( + broker="192.168.1.100", + port=8883, + username="admin", + password="secret", + topic_prefix="ha", + timeout=30.0, + ) + assert config.broker == "192.168.1.100" + assert config.port == 8883 + assert config.username == "admin" + assert config.password == "secret" + assert config.topic_prefix == "ha" + assert config.timeout == 30.0 + + +class TestHomeAssistantMQTTConnectorInit: + """Tests for connector initialization.""" + + def test_init_stores_config(self): + """Test that config values are stored correctly.""" + connector = make_connector() + assert connector._broker == "mqtt.local" + assert connector._port == 1883 + assert connector._topic_prefix == "homeassistant" + + def test_empty_username_becomes_none(self): + """Test that empty username is converted to None.""" + config = HomeAssistantMQTTConfig(broker="mqtt.local") + connector = HomeAssistantMQTTConnector(config) + assert connector._username is None + + def test_empty_password_becomes_none(self): + """Test that empty password is converted to None.""" + config = HomeAssistantMQTTConfig(broker="mqtt.local") + connector = HomeAssistantMQTTConnector(config) + assert connector._password is None + + def test_trailing_slash_stripped_from_prefix(self): + """Test that trailing slash is stripped from topic_prefix.""" + config = HomeAssistantMQTTConfig(broker="mqtt.local", topic_prefix="ha/") + connector = HomeAssistantMQTTConnector(config) + assert connector._topic_prefix == "ha" + + def test_warns_missing_broker(self): + """Test warning when broker is missing.""" + with patch( + "actions.home_assistant.connector.mqtt.logging.warning" + ) as mock_warn: + config = HomeAssistantMQTTConfig(broker="") + HomeAssistantMQTTConnector(config) + assert any("broker" in str(c) for c in mock_warn.call_args_list) + + +class TestBuildTopic: + """Tests for _build_topic().""" + + def test_build_topic_with_dot_entity_id(self): + """Test topic building with standard entity_id format.""" + connector = make_connector() + topic = connector._build_topic("light", "light.living_room") + assert topic == "homeassistant/light/living_room/set" + + def test_build_topic_without_dot(self): + """Test topic building when entity_id has no dot.""" + connector = make_connector() + topic = connector._build_topic("switch", "fan") + assert topic == "homeassistant/switch/fan/set" + + def test_build_topic_custom_prefix(self): + """Test topic building with custom prefix.""" + config = HomeAssistantMQTTConfig(broker="mqtt.local", topic_prefix="myhome") + connector = HomeAssistantMQTTConnector(config) + topic = connector._build_topic("light", "light.bedroom") + assert topic == "myhome/light/bedroom/set" + + +class TestPublish: + """Tests for _publish().""" + + @pytest.mark.asyncio + async def test_publish_success(self): + """Test successful publish.""" + connector = make_connector() + ctx, mock_client = mock_mqtt_session() + with ctx: + result = await connector._publish( + "homeassistant/light/x/set", {"state": "ON"} + ) + assert result is True + mock_client.publish.assert_called_once() + + @pytest.mark.asyncio + async def test_publish_no_broker(self): + """Test that missing broker returns False.""" + config = HomeAssistantMQTTConfig(broker="") + connector = HomeAssistantMQTTConnector(config) + result = await connector._publish("topic", {"state": "ON"}) + assert result is False + + @pytest.mark.asyncio + async def test_publish_mqtt_error(self): + """Test handling of MqttError.""" + import aiomqtt + + connector = make_connector() + with patch("actions.home_assistant.connector.mqtt.aiomqtt.Client") as mock_cls: + mock_cls.side_effect = aiomqtt.MqttError("connection refused") + with patch( + "actions.home_assistant.connector.mqtt.logging.error" + ) as mock_err: + result = await connector._publish("topic", {"state": "ON"}) + assert result is False + assert any("MQTT error" in str(c) for c in mock_err.call_args_list) + + @pytest.mark.asyncio + async def test_publish_timeout(self): + """Test handling of timeout error.""" + connector = make_connector() + with patch("actions.home_assistant.connector.mqtt.aiomqtt.Client") as mock_cls: + mock_cls.side_effect = TimeoutError() + with patch( + "actions.home_assistant.connector.mqtt.logging.error" + ) as mock_err: + result = await connector._publish("topic", {"state": "ON"}) + assert result is False + assert any("timed out" in str(c) for c in mock_err.call_args_list) + + @pytest.mark.asyncio + async def test_publish_unexpected_exception(self): + """Test handling of unexpected exception.""" + connector = make_connector() + with patch("actions.home_assistant.connector.mqtt.aiomqtt.Client") as mock_cls: + mock_cls.side_effect = RuntimeError("boom") + with patch( + "actions.home_assistant.connector.mqtt.logging.error" + ) as mock_err: + result = await connector._publish("topic", {"state": "ON"}) + assert result is False + assert any( + "unexpected error" in str(c) for c in mock_err.call_args_list + ) + + +class TestConnectLight: + """Tests for light device control.""" + + @pytest.mark.asyncio + async def test_light_turn_on(self): + """Test light turn on.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.living_room", + action=HAAction.TURN_ON, + ) + with patch.object(connector, "_publish", new_callable=AsyncMock) as mock_pub: + await connector.connect(inp) + mock_pub.assert_called_once_with( + "homeassistant/light/living_room/set", {"state": "ON"} + ) + + @pytest.mark.asyncio + async def test_light_turn_off(self): + """Test light turn off.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.living_room", + action=HAAction.TURN_OFF, + ) + with patch.object(connector, "_publish", new_callable=AsyncMock) as mock_pub: + await connector.connect(inp) + mock_pub.assert_called_once_with( + "homeassistant/light/living_room/set", {"state": "OFF"} + ) + + @pytest.mark.asyncio + async def test_light_set_brightness(self): + """Test light set brightness.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.living_room", + action=HAAction.SET_BRIGHTNESS, + brightness=200, + ) + with patch.object(connector, "_publish", new_callable=AsyncMock) as mock_pub: + await connector.connect(inp) + mock_pub.assert_called_once_with( + "homeassistant/light/living_room/set", + {"state": "ON", "brightness": 200}, + ) + + @pytest.mark.asyncio + async def test_light_set_color(self): + """Test light set color.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.living_room", + action=HAAction.SET_COLOR, + color="blue", + ) + with patch.object(connector, "_publish", new_callable=AsyncMock) as mock_pub: + await connector.connect(inp) + mock_pub.assert_called_once_with( + "homeassistant/light/living_room/set", + {"state": "ON", "hs_color": COLOR_MAP["blue"]}, + ) + + @pytest.mark.asyncio + async def test_light_unknown_color_defaults_to_white(self): + """Test that unknown color defaults to white.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.SET_COLOR, + color="magenta", + ) + with patch.object(connector, "_publish", new_callable=AsyncMock) as mock_pub: + await connector.connect(inp) + mock_pub.assert_called_once_with( + "homeassistant/light/x/set", + {"state": "ON", "hs_color": COLOR_MAP["white"]}, + ) + + @pytest.mark.asyncio + async def test_light_unsupported_action_warns(self): + """Test warning for unsupported light action.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.SET_TEMPERATURE, + ) + with patch( + "actions.home_assistant.connector.mqtt.logging.warning" + ) as mock_warn: + await connector.connect(inp) + assert any("not supported" in str(c) for c in mock_warn.call_args_list) + + +class TestConnectSwitch: + """Tests for switch device control.""" + + @pytest.mark.asyncio + async def test_switch_turn_on(self): + """Test switch turn on.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.SWITCH, + entity_id="switch.fan", + action=HAAction.TURN_ON, + ) + with patch.object(connector, "_publish", new_callable=AsyncMock) as mock_pub: + await connector.connect(inp) + mock_pub.assert_called_once_with( + "homeassistant/switch/fan/set", {"state": "ON"} + ) + + @pytest.mark.asyncio + async def test_switch_turn_off(self): + """Test switch turn off.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.SWITCH, + entity_id="switch.fan", + action=HAAction.TURN_OFF, + ) + with patch.object(connector, "_publish", new_callable=AsyncMock) as mock_pub: + await connector.connect(inp) + mock_pub.assert_called_once_with( + "homeassistant/switch/fan/set", {"state": "OFF"} + ) + + @pytest.mark.asyncio + async def test_switch_unsupported_action_warns(self): + """Test warning for unsupported switch action.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.SWITCH, + entity_id="switch.fan", + action=HAAction.SET_TEMPERATURE, + ) + with patch( + "actions.home_assistant.connector.mqtt.logging.warning" + ) as mock_warn: + await connector.connect(inp) + assert any("not supported" in str(c) for c in mock_warn.call_args_list) + + +class TestConnectClimate: + """Tests for climate device control.""" + + @pytest.mark.asyncio + async def test_climate_set_temperature(self): + """Test climate set temperature.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.SET_TEMPERATURE, + temperature=22.0, + ) + with patch.object(connector, "_publish", new_callable=AsyncMock) as mock_pub: + await connector.connect(inp) + mock_pub.assert_called_once_with( + "homeassistant/climate/bedroom/set", {"temperature": 22.0} + ) + + @pytest.mark.asyncio + async def test_climate_turn_on(self): + """Test climate turn on.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.TURN_ON, + ) + with patch.object(connector, "_publish", new_callable=AsyncMock) as mock_pub: + await connector.connect(inp) + mock_pub.assert_called_once_with( + "homeassistant/climate/bedroom/set", {"state": "ON"} + ) + + @pytest.mark.asyncio + async def test_climate_turn_off(self): + """Test climate turn off.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.TURN_OFF, + ) + with patch.object(connector, "_publish", new_callable=AsyncMock) as mock_pub: + await connector.connect(inp) + mock_pub.assert_called_once_with( + "homeassistant/climate/bedroom/set", {"state": "OFF"} + ) + + @pytest.mark.asyncio + async def test_climate_unsupported_action_warns(self): + """Test warning for unsupported climate action.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.SET_BRIGHTNESS, + ) + with patch( + "actions.home_assistant.connector.mqtt.logging.warning" + ) as mock_warn: + await connector.connect(inp) + assert any("not supported" in str(c) for c in mock_warn.call_args_list) + + +class TestConnectUnsupportedDeviceType: + """Tests for unsupported device type.""" + + @pytest.mark.asyncio + async def test_unsupported_device_type_warns(self): + """Test warning for unsupported device type.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, entity_id="light.x", action=HAAction.TURN_ON + ) + with patch( + "actions.home_assistant.connector.mqtt.logging.warning" + ) as mock_warn: + with patch( + "actions.home_assistant.connector.mqtt.HADeviceType" + ) as mock_enum: + mock_enum.LIGHT = "FAKE" + mock_enum.SWITCH = "FAKE" + mock_enum.CLIMATE = "FAKE" + await connector.connect(inp) + assert any("not supported" in str(c) for c in mock_warn.call_args_list) diff --git a/tests/actions/home_assistant/connector/test_home_assistant_rest_api.py b/tests/actions/home_assistant/connector/test_home_assistant_rest_api.py new file mode 100644 index 000000000..17b00748b --- /dev/null +++ b/tests/actions/home_assistant/connector/test_home_assistant_rest_api.py @@ -0,0 +1,481 @@ +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import aiohttp +import pytest + +from actions.home_assistant.connector.rest_api import ( + COLOR_MAP, + HomeAssistantConfig, + HomeAssistantRESTConnector, +) +from actions.home_assistant.interface import ( + HAAction, + HADeviceType, + HomeAssistant, + HomeAssistantInput, +) + + +class TestHomeAssistantInput: + def test_default_values(self): + inp = HomeAssistantInput() + assert inp.device_type == HADeviceType.LIGHT + assert inp.entity_id == "" + assert inp.action == HAAction.TURN_ON + assert inp.brightness == 255 + assert inp.color == "" + assert inp.temperature == 22.0 + + def test_custom_values(self): + inp = HomeAssistantInput( + device_type=HADeviceType.SWITCH, + entity_id="switch.fan", + action=HAAction.TURN_OFF, + ) + assert inp.device_type == HADeviceType.SWITCH + assert inp.entity_id == "switch.fan" + assert inp.action == HAAction.TURN_OFF + + def test_interface_creation(self): + inp = HomeAssistantInput(entity_id="light.living_room") + iface = HomeAssistant(input=inp, output=inp) + assert iface.input.entity_id == "light.living_room" + + +class TestHomeAssistantConfig: + def test_defaults(self): + config = HomeAssistantConfig() + assert config.base_url == "" + assert config.token == "" + assert config.timeout == 10.0 + + def test_custom_values(self): + config = HomeAssistantConfig( + base_url="http://ha.local:8123", + token="abc123", + timeout=5.0, + ) + assert config.base_url == "http://ha.local:8123" + assert config.token == "abc123" + assert config.timeout == 5.0 + + +class TestHomeAssistantRESTConnectorInit: + def test_init_warns_missing_base_url(self): + with patch( + "actions.home_assistant.connector.rest_api.logging.warning" + ) as mock_warn: + HomeAssistantRESTConnector(HomeAssistantConfig(token="tok")) + warned_msgs = [str(c) for c in mock_warn.call_args_list] + assert any("base_url" in m for m in warned_msgs) + + def test_init_warns_missing_token(self): + with patch( + "actions.home_assistant.connector.rest_api.logging.warning" + ) as mock_warn: + HomeAssistantRESTConnector( + HomeAssistantConfig(base_url="http://ha.local:8123") + ) + warned_msgs = [str(c) for c in mock_warn.call_args_list] + assert any("token" in m for m in warned_msgs) + + def test_init_no_warning_when_configured(self): + with patch( + "actions.home_assistant.connector.rest_api.logging.warning" + ) as mock_warn: + HomeAssistantRESTConnector( + HomeAssistantConfig(base_url="http://ha.local:8123", token="tok") + ) + mock_warn.assert_not_called() + + +def make_connector(base_url="http://ha.local:8123", token="test-token"): + return HomeAssistantRESTConnector( + HomeAssistantConfig(base_url=base_url, token=token) + ) + + +def mock_ha_session(status=200, json_data=None): + """Return a patch context for aiohttp.ClientSession.""" + mock_response = AsyncMock() + mock_response.status = status + mock_response.json = AsyncMock(return_value=json_data or []) + mock_response.text = AsyncMock(return_value="error text") + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + + mock_post = MagicMock() + mock_post.__aenter__ = AsyncMock(return_value=mock_response) + mock_post.__aexit__ = AsyncMock(return_value=None) + + mock_session = MagicMock() + mock_session.post = MagicMock(return_value=mock_post) + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=None) + + return ( + patch( + "actions.home_assistant.connector.rest_api.aiohttp.ClientSession", + return_value=mock_session, + ), + mock_session, + ) + + +class TestConnectMissingConfig: + @pytest.mark.asyncio + async def test_skips_empty_entity_id(self): + connector = make_connector() + inp = HomeAssistantInput(entity_id="") + with patch( + "actions.home_assistant.connector.rest_api.logging.warning" + ) as mock_warn: + await connector.connect(inp) + mock_warn.assert_called_once() + assert "entity_id" in str(mock_warn.call_args) + + @pytest.mark.asyncio + async def test_returns_none_when_no_base_url(self): + connector = HomeAssistantRESTConnector( + HomeAssistantConfig(base_url="", token="tok") + ) + inp = HomeAssistantInput(entity_id="light.x") + with patch( + "actions.home_assistant.connector.rest_api.logging.error" + ) as mock_err: + await connector.connect(inp) + assert any( + "base_url" in str(c) or "token" in str(c) + for c in mock_err.call_args_list + ) + + +class TestConnectLight: + @pytest.mark.asyncio + async def test_light_turn_on(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.living_room", + action=HAAction.TURN_ON, + ) + ctx, mock_session = mock_ha_session() + with ctx: + await connector.connect(inp) + mock_session.post.assert_called_once() + url = mock_session.post.call_args[0][0] + assert "light/turn_on" in url + payload = mock_session.post.call_args[1]["json"] + assert payload["entity_id"] == "light.living_room" + + @pytest.mark.asyncio + async def test_light_turn_off(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.living_room", + action=HAAction.TURN_OFF, + ) + ctx, mock_session = mock_ha_session() + with ctx: + await connector.connect(inp) + url = mock_session.post.call_args[0][0] + assert "light/turn_off" in url + + @pytest.mark.asyncio + async def test_light_set_brightness(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.bedroom", + action=HAAction.SET_BRIGHTNESS, + brightness=128, + ) + ctx, mock_session = mock_ha_session() + with ctx: + await connector.connect(inp) + url = mock_session.post.call_args[0][0] + assert "light/turn_on" in url + payload = mock_session.post.call_args[1]["json"] + assert payload["brightness"] == 128 + + @pytest.mark.asyncio + async def test_light_brightness_clamped(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.bedroom", + action=HAAction.SET_BRIGHTNESS, + brightness=999, + ) + ctx, mock_session = mock_ha_session() + with ctx: + await connector.connect(inp) + payload = mock_session.post.call_args[1]["json"] + assert payload["brightness"] == 255 + + @pytest.mark.asyncio + async def test_light_set_color_red(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.kitchen", + action=HAAction.SET_COLOR, + color="red", + ) + ctx, mock_session = mock_ha_session() + with ctx: + await connector.connect(inp) + payload = mock_session.post.call_args[1]["json"] + assert payload["hs_color"] == COLOR_MAP["red"] + + @pytest.mark.asyncio + async def test_light_set_unknown_color_defaults_white(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.kitchen", + action=HAAction.SET_COLOR, + color="chartreuse", + ) + ctx, mock_session = mock_ha_session() + with ctx: + await connector.connect(inp) + payload = mock_session.post.call_args[1]["json"] + assert payload["hs_color"] == COLOR_MAP["white"] + + +class TestConnectSwitch: + @pytest.mark.asyncio + async def test_switch_turn_on(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.SWITCH, + entity_id="switch.fan", + action=HAAction.TURN_ON, + ) + ctx, mock_session = mock_ha_session() + with ctx: + await connector.connect(inp) + url = mock_session.post.call_args[0][0] + assert "switch/turn_on" in url + + @pytest.mark.asyncio + async def test_switch_turn_off(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.SWITCH, + entity_id="switch.fan", + action=HAAction.TURN_OFF, + ) + ctx, mock_session = mock_ha_session() + with ctx: + await connector.connect(inp) + url = mock_session.post.call_args[0][0] + assert "switch/turn_off" in url + + @pytest.mark.asyncio + async def test_switch_unsupported_action_warns(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.SWITCH, + entity_id="switch.fan", + action=HAAction.SET_BRIGHTNESS, + ) + ctx, mock_session = mock_ha_session() + with ctx: + with patch( + "actions.home_assistant.connector.rest_api.logging.warning" + ) as mock_warn: + await connector.connect(inp) + assert any("not supported" in str(c) for c in mock_warn.call_args_list) + + +class TestConnectClimate: + @pytest.mark.asyncio + async def test_climate_set_temperature(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.SET_TEMPERATURE, + temperature=24.5, + ) + ctx, mock_session = mock_ha_session() + with ctx: + await connector.connect(inp) + url = mock_session.post.call_args[0][0] + assert "climate/set_temperature" in url + payload = mock_session.post.call_args[1]["json"] + assert payload["temperature"] == 24.5 + + @pytest.mark.asyncio + async def test_climate_turn_on(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.TURN_ON, + ) + ctx, mock_session = mock_ha_session() + with ctx: + await connector.connect(inp) + url = mock_session.post.call_args[0][0] + assert "climate/turn_on" in url + + +class TestNetworkErrors: + @pytest.mark.asyncio + async def test_handles_timeout(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.TURN_ON, + ) + with patch( + "actions.home_assistant.connector.rest_api.aiohttp.ClientSession" + ) as mock_cls: + mock_cls.side_effect = asyncio.TimeoutError() + with patch( + "actions.home_assistant.connector.rest_api.logging.error" + ) as mock_err: + await connector.connect(inp) + assert any( + "timed out" in str(c) or "timeout" in str(c).lower() + for c in mock_err.call_args_list + ) + + @pytest.mark.asyncio + async def test_handles_client_error(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.TURN_ON, + ) + with patch( + "actions.home_assistant.connector.rest_api.aiohttp.ClientSession" + ) as mock_cls: + mock_cls.side_effect = aiohttp.ClientError("connection refused") + with patch( + "actions.home_assistant.connector.rest_api.logging.error" + ) as mock_err: + await connector.connect(inp) + assert any( + "network error" in str(c).lower() for c in mock_err.call_args_list + ) + + @pytest.mark.asyncio + async def test_handles_error_status(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.TURN_ON, + ) + ctx, _ = mock_ha_session(status=401) + with ctx: + with patch( + "actions.home_assistant.connector.rest_api.logging.error" + ) as mock_err: + await connector.connect(inp) + assert any("401" in str(c) for c in mock_err.call_args_list) + + +class TestCoverageGaps: + @pytest.mark.asyncio + async def test_call_service_unexpected_exception(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.TURN_ON, + ) + with patch( + "actions.home_assistant.connector.rest_api.aiohttp.ClientSession" + ) as mock_cls: + mock_cls.side_effect = RuntimeError("unexpected boom") + with patch( + "actions.home_assistant.connector.rest_api.logging.error" + ) as mock_err: + await connector.connect(inp) + assert any( + "unexpected error" in str(c).lower() + for c in mock_err.call_args_list + ) + + @pytest.mark.asyncio + async def test_light_unsupported_action_warns(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.SET_TEMPERATURE, + ) + ctx, mock_session = mock_ha_session() + with ctx: + with patch( + "actions.home_assistant.connector.rest_api.logging.warning" + ) as mock_warn: + await connector.connect(inp) + assert any("not supported" in str(c) for c in mock_warn.call_args_list) + + @pytest.mark.asyncio + async def test_climate_unsupported_action_warns(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.SET_BRIGHTNESS, + ) + ctx, mock_session = mock_ha_session() + with ctx: + with patch( + "actions.home_assistant.connector.rest_api.logging.warning" + ) as mock_warn: + await connector.connect(inp) + assert any("not supported" in str(c) for c in mock_warn.call_args_list) + + @pytest.mark.asyncio + async def test_climate_turn_off(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.TURN_OFF, + ) + ctx, mock_session = mock_ha_session() + with ctx: + await connector.connect(inp) + url = mock_session.post.call_args[0][0] + assert "climate/turn_off" in url + + @pytest.mark.asyncio + async def test_unsupported_device_type_warns(self): + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.TURN_ON, + ) + ctx, mock_session = mock_ha_session() + with ctx: + with patch( + "actions.home_assistant.connector.rest_api.logging.warning" + ) as mock_warn: + # Patch _call_service agar tidak dipanggil, lalu paksa masuk else branch + with patch.object(connector, "_call_service") as _: + # Simulasi device_type yang tidak cocok dengan semua kondisi + # dengan mocking langsung di level connect() + with patch( + "actions.home_assistant.connector.rest_api.HADeviceType" + ) as mock_enum: + mock_enum.LIGHT = "FAKE_LIGHT" + mock_enum.SWITCH = "FAKE_SWITCH" + mock_enum.CLIMATE = "FAKE_CLIMATE" + await connector.connect(inp) + assert any( + "not supported" in str(c) for c in mock_warn.call_args_list + ) diff --git a/tests/actions/home_assistant/connector/test_home_assistant_websocket.py b/tests/actions/home_assistant/connector/test_home_assistant_websocket.py new file mode 100644 index 000000000..1e1ffded5 --- /dev/null +++ b/tests/actions/home_assistant/connector/test_home_assistant_websocket.py @@ -0,0 +1,491 @@ +"""Tests for Home Assistant WebSocket action connector.""" + +import json +from unittest.mock import AsyncMock, patch + +import pytest + +from actions.home_assistant.connector.websocket import ( + COLOR_MAP, + HomeAssistantWebSocketConfig, + HomeAssistantWebSocketConnector, +) +from actions.home_assistant.interface import ( + HAAction, + HADeviceType, + HomeAssistantInput, +) + + +def make_connector(base_url="http://ha.local:8123", token="test_token"): + """Helper to create a connector instance.""" + config = HomeAssistantWebSocketConfig(base_url=base_url, token=token, timeout=5.0) + return HomeAssistantWebSocketConnector(config) + + +def make_ws_mock(auth_ok=True, command_success=True, unexpected_type=False): + """Helper to create a mock WebSocket connection.""" + auth_required = json.dumps({"type": "auth_required"}) + if unexpected_type: + auth_required = json.dumps({"type": "hello"}) + + if auth_ok: + auth_response = json.dumps({"type": "auth_ok"}) + else: + auth_response = json.dumps({"type": "auth_invalid"}) + + if command_success: + command_response = json.dumps({"id": 1, "type": "result", "success": True}) + else: + command_response = json.dumps( + { + "id": 1, + "type": "result", + "success": False, + "error": {"code": "unknown_error", "message": "failed"}, + } + ) + + ws = AsyncMock() + ws.recv = AsyncMock(side_effect=[auth_required, auth_response, command_response]) + ws.send = AsyncMock() + ws.__aenter__ = AsyncMock(return_value=ws) + ws.__aexit__ = AsyncMock(return_value=None) + return ws + + +class TestHomeAssistantWebSocketConfig: + """Tests for HomeAssistantWebSocketConfig.""" + + def test_default_values(self): + """Test config default values.""" + config = HomeAssistantWebSocketConfig() + assert config.base_url == "" + assert config.token == "" + assert config.timeout == 10.0 + + def test_custom_values(self): + """Test config with custom values.""" + config = HomeAssistantWebSocketConfig( + base_url="http://ha.local:8123", + token="my_token", + timeout=30.0, + ) + assert config.base_url == "http://ha.local:8123" + assert config.token == "my_token" + assert config.timeout == 30.0 + + +class TestHomeAssistantWebSocketConnectorInit: + """Tests for connector initialization.""" + + def test_http_url_converted_to_ws(self): + """Test that http:// is converted to ws://.""" + connector = make_connector(base_url="http://ha.local:8123") + assert connector._ws_url == "ws://ha.local:8123/api/websocket" + + def test_https_url_converted_to_wss(self): + """Test that https:// is converted to wss://.""" + connector = make_connector(base_url="https://ha.local:8123") + assert connector._ws_url == "wss://ha.local:8123/api/websocket" + + def test_trailing_slash_stripped(self): + """Test that trailing slash is stripped.""" + connector = make_connector(base_url="http://ha.local:8123/") + assert connector._ws_url == "ws://ha.local:8123/api/websocket" + + def test_no_scheme_url(self): + """Test URL without scheme.""" + connector = make_connector(base_url="ha.local:8123") + assert connector._ws_url == "ha.local:8123/api/websocket" + + def test_warns_missing_base_url(self): + """Test warning when base_url is missing.""" + with patch( + "actions.home_assistant.connector.websocket.logging.warning" + ) as mock_warn: + make_connector(base_url="") + assert any("base_url" in str(c) for c in mock_warn.call_args_list) + + def test_warns_missing_token(self): + """Test warning when token is missing.""" + with patch( + "actions.home_assistant.connector.websocket.logging.warning" + ) as mock_warn: + make_connector(token="") + assert any("token" in str(c) for c in mock_warn.call_args_list) + + +class TestSendCommand: + """Tests for _send_command().""" + + @pytest.mark.asyncio + async def test_send_command_success(self): + """Test successful command send.""" + connector = make_connector() + ws = make_ws_mock(auth_ok=True, command_success=True) + with patch( + "actions.home_assistant.connector.websocket.websockets.connect", + return_value=ws, + ): + result = await connector._send_command("light", "turn_on", "light.x") + assert result is True + + @pytest.mark.asyncio + async def test_send_command_no_token(self): + """Test that missing token returns False.""" + connector = make_connector(token="") + result = await connector._send_command("light", "turn_on", "light.x") + assert result is False + + @pytest.mark.asyncio + async def test_send_command_auth_failed(self): + """Test handling of auth failure.""" + connector = make_connector() + ws = make_ws_mock(auth_ok=False) + with patch( + "actions.home_assistant.connector.websocket.websockets.connect", + return_value=ws, + ): + with patch( + "actions.home_assistant.connector.websocket.logging.error" + ) as mock_err: + result = await connector._send_command("light", "turn_on", "light.x") + assert result is False + assert any( + "authentication failed" in str(c) for c in mock_err.call_args_list + ) + + @pytest.mark.asyncio + async def test_send_command_unexpected_auth_type(self): + """Test handling of unexpected initial message type.""" + connector = make_connector() + ws = make_ws_mock(unexpected_type=True) + with patch( + "actions.home_assistant.connector.websocket.websockets.connect", + return_value=ws, + ): + with patch( + "actions.home_assistant.connector.websocket.logging.error" + ) as mock_err: + result = await connector._send_command("light", "turn_on", "light.x") + assert result is False + assert any("auth_required" in str(c) for c in mock_err.call_args_list) + + @pytest.mark.asyncio + async def test_send_command_failed_response(self): + """Test handling of failed command response.""" + connector = make_connector() + ws = make_ws_mock(command_success=False) + with patch( + "actions.home_assistant.connector.websocket.websockets.connect", + return_value=ws, + ): + with patch( + "actions.home_assistant.connector.websocket.logging.error" + ) as mock_err: + result = await connector._send_command("light", "turn_on", "light.x") + assert result is False + assert any("command failed" in str(c) for c in mock_err.call_args_list) + + @pytest.mark.asyncio + async def test_send_command_timeout(self): + """Test handling of timeout error.""" + connector = make_connector() + with patch( + "actions.home_assistant.connector.websocket.websockets.connect" + ) as mock_connect: + mock_connect.side_effect = TimeoutError() + with patch( + "actions.home_assistant.connector.websocket.logging.error" + ) as mock_err: + result = await connector._send_command("light", "turn_on", "light.x") + assert result is False + assert any("timed out" in str(c) for c in mock_err.call_args_list) + + @pytest.mark.asyncio + async def test_send_command_websocket_exception(self): + """Test handling of WebSocket exception.""" + import websockets.exceptions + + connector = make_connector() + with patch( + "actions.home_assistant.connector.websocket.websockets.connect" + ) as mock_connect: + mock_connect.side_effect = websockets.exceptions.WebSocketException("err") + with patch( + "actions.home_assistant.connector.websocket.logging.error" + ) as mock_err: + result = await connector._send_command("light", "turn_on", "light.x") + assert result is False + assert any("WebSocket error" in str(c) for c in mock_err.call_args_list) + + @pytest.mark.asyncio + async def test_send_command_unexpected_exception(self): + """Test handling of unexpected exception.""" + connector = make_connector() + with patch( + "actions.home_assistant.connector.websocket.websockets.connect" + ) as mock_connect: + mock_connect.side_effect = RuntimeError("boom") + with patch( + "actions.home_assistant.connector.websocket.logging.error" + ) as mock_err: + result = await connector._send_command("light", "turn_on", "light.x") + assert result is False + assert any( + "unexpected error" in str(c) for c in mock_err.call_args_list + ) + + @pytest.mark.asyncio + async def test_send_command_increments_msg_id(self): + """Test that message ID increments on each call.""" + connector = make_connector() + assert connector._msg_id == 1 + ws = make_ws_mock() + with patch( + "actions.home_assistant.connector.websocket.websockets.connect", + return_value=ws, + ): + await connector._send_command("light", "turn_on", "light.x") + assert connector._msg_id == 2 + + +class TestConnectLight: + """Tests for light device control.""" + + @pytest.mark.asyncio + async def test_light_turn_on(self): + """Test light turn on.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, entity_id="light.x", action=HAAction.TURN_ON + ) + with patch.object( + connector, "_send_command", new_callable=AsyncMock + ) as mock_cmd: + await connector.connect(inp) + mock_cmd.assert_called_once_with("light", "turn_on", "light.x") + + @pytest.mark.asyncio + async def test_light_turn_off(self): + """Test light turn off.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.TURN_OFF, + ) + with patch.object( + connector, "_send_command", new_callable=AsyncMock + ) as mock_cmd: + await connector.connect(inp) + mock_cmd.assert_called_once_with("light", "turn_off", "light.x") + + @pytest.mark.asyncio + async def test_light_set_brightness(self): + """Test light set brightness.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.SET_BRIGHTNESS, + brightness=128, + ) + with patch.object( + connector, "_send_command", new_callable=AsyncMock + ) as mock_cmd: + await connector.connect(inp) + mock_cmd.assert_called_once_with( + "light", "turn_on", "light.x", {"brightness": 128} + ) + + @pytest.mark.asyncio + async def test_light_set_color(self): + """Test light set color.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.SET_COLOR, + color="red", + ) + with patch.object( + connector, "_send_command", new_callable=AsyncMock + ) as mock_cmd: + await connector.connect(inp) + mock_cmd.assert_called_once_with( + "light", "turn_on", "light.x", {"hs_color": COLOR_MAP["red"]} + ) + + @pytest.mark.asyncio + async def test_light_unknown_color_defaults_to_white(self): + """Test that unknown color defaults to white.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.SET_COLOR, + color="magenta", + ) + with patch.object( + connector, "_send_command", new_callable=AsyncMock + ) as mock_cmd: + await connector.connect(inp) + mock_cmd.assert_called_once_with( + "light", "turn_on", "light.x", {"hs_color": COLOR_MAP["white"]} + ) + + @pytest.mark.asyncio + async def test_light_unsupported_action_warns(self): + """Test warning for unsupported light action.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, + entity_id="light.x", + action=HAAction.SET_TEMPERATURE, + ) + with patch( + "actions.home_assistant.connector.websocket.logging.warning" + ) as mock_warn: + await connector.connect(inp) + assert any("not supported" in str(c) for c in mock_warn.call_args_list) + + +class TestConnectSwitch: + """Tests for switch device control.""" + + @pytest.mark.asyncio + async def test_switch_turn_on(self): + """Test switch turn on.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.SWITCH, + entity_id="switch.fan", + action=HAAction.TURN_ON, + ) + with patch.object( + connector, "_send_command", new_callable=AsyncMock + ) as mock_cmd: + await connector.connect(inp) + mock_cmd.assert_called_once_with("switch", "turn_on", "switch.fan") + + @pytest.mark.asyncio + async def test_switch_turn_off(self): + """Test switch turn off.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.SWITCH, + entity_id="switch.fan", + action=HAAction.TURN_OFF, + ) + with patch.object( + connector, "_send_command", new_callable=AsyncMock + ) as mock_cmd: + await connector.connect(inp) + mock_cmd.assert_called_once_with("switch", "turn_off", "switch.fan") + + @pytest.mark.asyncio + async def test_switch_unsupported_action_warns(self): + """Test warning for unsupported switch action.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.SWITCH, + entity_id="switch.fan", + action=HAAction.SET_TEMPERATURE, + ) + with patch( + "actions.home_assistant.connector.websocket.logging.warning" + ) as mock_warn: + await connector.connect(inp) + assert any("not supported" in str(c) for c in mock_warn.call_args_list) + + +class TestConnectClimate: + """Tests for climate device control.""" + + @pytest.mark.asyncio + async def test_climate_set_temperature(self): + """Test climate set temperature.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.SET_TEMPERATURE, + temperature=24.0, + ) + with patch.object( + connector, "_send_command", new_callable=AsyncMock + ) as mock_cmd: + await connector.connect(inp) + mock_cmd.assert_called_once_with( + "climate", "set_temperature", "climate.bedroom", {"temperature": 24.0} + ) + + @pytest.mark.asyncio + async def test_climate_turn_on(self): + """Test climate turn on.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.TURN_ON, + ) + with patch.object( + connector, "_send_command", new_callable=AsyncMock + ) as mock_cmd: + await connector.connect(inp) + mock_cmd.assert_called_once_with("climate", "turn_on", "climate.bedroom") + + @pytest.mark.asyncio + async def test_climate_turn_off(self): + """Test climate turn off.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.TURN_OFF, + ) + with patch.object( + connector, "_send_command", new_callable=AsyncMock + ) as mock_cmd: + await connector.connect(inp) + mock_cmd.assert_called_once_with("climate", "turn_off", "climate.bedroom") + + @pytest.mark.asyncio + async def test_climate_unsupported_action_warns(self): + """Test warning for unsupported climate action.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.CLIMATE, + entity_id="climate.bedroom", + action=HAAction.SET_BRIGHTNESS, + ) + with patch( + "actions.home_assistant.connector.websocket.logging.warning" + ) as mock_warn: + await connector.connect(inp) + assert any("not supported" in str(c) for c in mock_warn.call_args_list) + + +class TestConnectUnsupportedDeviceType: + """Tests for unsupported device type.""" + + @pytest.mark.asyncio + async def test_unsupported_device_type_warns(self): + """Test warning for unsupported device type.""" + connector = make_connector() + inp = HomeAssistantInput( + device_type=HADeviceType.LIGHT, entity_id="light.x", action=HAAction.TURN_ON + ) + with patch( + "actions.home_assistant.connector.websocket.logging.warning" + ) as mock_warn: + with patch( + "actions.home_assistant.connector.websocket.HADeviceType" + ) as mock_enum: + mock_enum.LIGHT = "FAKE" + mock_enum.SWITCH = "FAKE" + mock_enum.CLIMATE = "FAKE" + await connector.connect(inp) + assert any("not supported" in str(c) for c in mock_warn.call_args_list) diff --git a/tests/inputs/plugins/test_home_assistant.py b/tests/inputs/plugins/test_home_assistant.py new file mode 100644 index 000000000..5eb7c82e2 --- /dev/null +++ b/tests/inputs/plugins/test_home_assistant.py @@ -0,0 +1,483 @@ +import asyncio +import time +from unittest.mock import AsyncMock, MagicMock, patch + +import aiohttp +import pytest + +from inputs.plugins.home_assistant import ( + HomeAssistantInputConfig, + HomeAssistantStateInput, +) + + +class TestHomeAssistantInputConfig: + """Tests for HomeAssistantInputConfig.""" + + def test_default_values(self): + """Test config with default values.""" + config = HomeAssistantInputConfig() + assert config.base_url == "" + assert config.token == "" + assert config.entity_ids == "" + assert config.poll_interval == 30.0 + + def test_custom_values(self): + """Test config with custom values.""" + config = HomeAssistantInputConfig( + base_url="http://homeassistant.local:8123", + token="my_token", + entity_ids="light.living_room,switch.fan", + poll_interval=60.0, + ) + assert config.base_url == "http://homeassistant.local:8123" + assert config.token == "my_token" + assert config.entity_ids == "light.living_room,switch.fan" + assert config.poll_interval == 60.0 + + +class TestHomeAssistantStateInputInit: + """Tests for HomeAssistantStateInput initialization.""" + + @pytest.fixture + def mock_io_provider(self): + """Mock IOProvider.""" + with patch("inputs.plugins.home_assistant.IOProvider") as mock: + yield mock + + def test_init_parses_entity_ids(self, mock_io_provider): + """Test that entity_ids string is parsed into list.""" + config = HomeAssistantInputConfig( + base_url="http://ha.local:8123", + token="tok", + entity_ids="light.living_room, switch.fan , climate.bedroom", + ) + ha = HomeAssistantStateInput(config) + assert ha.entity_ids == ["light.living_room", "switch.fan", "climate.bedroom"] + + def test_init_strips_trailing_slash(self, mock_io_provider): + """Test that trailing slash is stripped from base_url.""" + config = HomeAssistantInputConfig( + base_url="http://ha.local:8123/", + token="tok", + ) + ha = HomeAssistantStateInput(config) + assert ha.base_url == "http://ha.local:8123" + + def test_init_warns_missing_base_url(self, mock_io_provider): + """Test warning when base_url is missing.""" + with patch("inputs.plugins.home_assistant.logging.warning") as mock_warn: + config = HomeAssistantInputConfig(token="tok", entity_ids="light.x") + HomeAssistantStateInput(config) + assert any("base_url" in str(c) for c in mock_warn.call_args_list) + + def test_init_warns_missing_token(self, mock_io_provider): + """Test warning when token is missing.""" + with patch("inputs.plugins.home_assistant.logging.warning") as mock_warn: + config = HomeAssistantInputConfig( + base_url="http://ha.local:8123", entity_ids="light.x" + ) + HomeAssistantStateInput(config) + assert any("token" in str(c) for c in mock_warn.call_args_list) + + def test_init_warns_missing_entity_ids(self, mock_io_provider): + """Test warning when entity_ids is missing.""" + with patch("inputs.plugins.home_assistant.logging.warning") as mock_warn: + config = HomeAssistantInputConfig( + base_url="http://ha.local:8123", token="tok" + ) + HomeAssistantStateInput(config) + assert any("entity_ids" in str(c) for c in mock_warn.call_args_list) + + def test_descriptor_for_llm(self, mock_io_provider): + """Test descriptor_for_LLM is set correctly.""" + config = HomeAssistantInputConfig() + ha = HomeAssistantStateInput(config) + assert ha.descriptor_for_LLM == "Home Assistant Device States" + + +def make_ha_input(): + """Helper to create a HomeAssistantStateInput with mocked IOProvider.""" + with patch("inputs.plugins.home_assistant.IOProvider"): + config = HomeAssistantInputConfig( + base_url="http://ha.local:8123", + token="test_token", + entity_ids="light.living_room,switch.fan", + poll_interval=30.0, + ) + return HomeAssistantStateInput(config) + + +def mock_ha_get_session(status=200, json_data=None): + """Helper to mock aiohttp.ClientSession for GET requests.""" + mock_response = AsyncMock() + mock_response.status = status + mock_response.json = AsyncMock(return_value=json_data or {}) + + mock_get = MagicMock() + mock_get.__aenter__ = AsyncMock(return_value=mock_response) + mock_get.__aexit__ = AsyncMock(return_value=None) + + mock_session = MagicMock() + mock_session.get = MagicMock(return_value=mock_get) + mock_session.__aenter__ = AsyncMock(return_value=mock_session) + mock_session.__aexit__ = AsyncMock(return_value=None) + + ctx = patch( + "inputs.plugins.home_assistant.aiohttp.ClientSession", return_value=mock_session + ) + return ctx, mock_session + + +class TestFetchState: + """Tests for _fetch_state().""" + + @pytest.mark.asyncio + async def test_fetch_state_success(self): + """Test successful state fetch.""" + ha = make_ha_input() + state_data = { + "entity_id": "light.living_room", + "state": "on", + "attributes": {"brightness": 255, "friendly_name": "Living Room Light"}, + } + ctx, _ = mock_ha_get_session(status=200, json_data=state_data) + with ctx: + result = await ha._fetch_state("light.living_room") + assert result == state_data + + @pytest.mark.asyncio + async def test_fetch_state_error_status(self): + """Test handling of non-200 status.""" + ha = make_ha_input() + ctx, _ = mock_ha_get_session(status=404) + with ctx: + with patch("inputs.plugins.home_assistant.logging.error") as mock_err: + result = await ha._fetch_state("light.missing") + assert result is None + assert any("404" in str(c) for c in mock_err.call_args_list) + + @pytest.mark.asyncio + async def test_fetch_state_no_base_url(self): + """Test that fetch returns None without base_url.""" + with patch("inputs.plugins.home_assistant.IOProvider"): + config = HomeAssistantInputConfig(token="tok", entity_ids="light.x") + ha = HomeAssistantStateInput(config) + result = await ha._fetch_state("light.x") + assert result is None + + @pytest.mark.asyncio + async def test_fetch_state_no_token(self): + """Test that fetch returns None without token.""" + with patch("inputs.plugins.home_assistant.IOProvider"): + config = HomeAssistantInputConfig( + base_url="http://ha.local:8123", entity_ids="light.x" + ) + ha = HomeAssistantStateInput(config) + result = await ha._fetch_state("light.x") + assert result is None + + @pytest.mark.asyncio + async def test_fetch_state_timeout(self): + """Test handling of timeout error.""" + + ha = make_ha_input() + with patch("inputs.plugins.home_assistant.aiohttp.ClientSession") as mock_cls: + mock_cls.side_effect = asyncio.TimeoutError() + with patch("inputs.plugins.home_assistant.logging.error") as mock_err: + result = await ha._fetch_state("light.x") + assert result is None + assert any("timeout" in str(c).lower() for c in mock_err.call_args_list) + + @pytest.mark.asyncio + async def test_fetch_state_client_error(self): + """Test handling of aiohttp.ClientError.""" + + ha = make_ha_input() + with patch("inputs.plugins.home_assistant.aiohttp.ClientSession") as mock_cls: + mock_cls.side_effect = aiohttp.ClientError("conn refused") + with patch("inputs.plugins.home_assistant.logging.error") as mock_err: + result = await ha._fetch_state("light.x") + assert result is None + assert any( + "network error" in str(c).lower() for c in mock_err.call_args_list + ) + + @pytest.mark.asyncio + async def test_fetch_state_unexpected_exception(self): + """Test handling of unexpected exception.""" + ha = make_ha_input() + with patch("inputs.plugins.home_assistant.aiohttp.ClientSession") as mock_cls: + mock_cls.side_effect = RuntimeError("unexpected") + with patch("inputs.plugins.home_assistant.logging.error") as mock_err: + result = await ha._fetch_state("light.x") + assert result is None + assert any( + "unexpected error" in str(c).lower() + for c in mock_err.call_args_list + ) + + +class TestPoll: + """Tests for _poll() behavior.""" + + @pytest.mark.asyncio + async def test_poll_returns_none_before_interval(self): + """Test that poll returns None when interval has not elapsed.""" + ha = make_ha_input() + ha._last_poll_time = time.time() + + with patch( + "inputs.plugins.home_assistant.asyncio.sleep", new_callable=AsyncMock + ): + result = await ha._poll() + assert result is None + + @pytest.mark.asyncio + async def test_poll_fetches_after_interval(self): + """Test that poll fetches states after interval elapses.""" + ha = make_ha_input() + ha._last_poll_time = time.time() - 60.0 + + state_data = {"entity_id": "light.living_room", "state": "on", "attributes": {}} + + with patch( + "inputs.plugins.home_assistant.asyncio.sleep", new_callable=AsyncMock + ): + with patch.object(ha, "_fetch_state", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = state_data + result = await ha._poll() + assert result is not None + assert len(result) == 2 # two entity_ids + + @pytest.mark.asyncio + async def test_poll_returns_none_with_no_entity_ids(self): + """Test that poll returns None when no entity_ids configured.""" + with patch("inputs.plugins.home_assistant.IOProvider"): + config = HomeAssistantInputConfig( + base_url="http://ha.local:8123", + token="tok", + entity_ids="", + poll_interval=0.0, + ) + ha = HomeAssistantStateInput(config) + ha._last_poll_time = 0.0 + + with patch( + "inputs.plugins.home_assistant.asyncio.sleep", new_callable=AsyncMock + ): + result = await ha._poll() + assert result is None + + @pytest.mark.asyncio + async def test_poll_skips_failed_fetches(self): + """Test that failed fetches are excluded from results.""" + ha = make_ha_input() + ha._last_poll_time = 0.0 + + with patch( + "inputs.plugins.home_assistant.asyncio.sleep", new_callable=AsyncMock + ): + with patch.object(ha, "_fetch_state", new_callable=AsyncMock) as mock_fetch: + mock_fetch.return_value = None + result = await ha._poll() + assert result is None + + +class TestFormatState: + """Tests for _format_state().""" + + @pytest.fixture + def ha(self): + return make_ha_input() + + def test_format_basic_state(self, ha): + """Test formatting a basic state.""" + state = { + "entity_id": "switch.fan", + "state": "off", + "attributes": {"friendly_name": "Bedroom Fan"}, + } + result = ha._format_state(state) + assert "Bedroom Fan" in result + assert "switch.fan" in result + assert "off" in result + + def test_format_state_with_brightness(self, ha): + """Test formatting light state with brightness.""" + state = { + "entity_id": "light.living_room", + "state": "on", + "attributes": {"friendly_name": "Living Room", "brightness": 128}, + } + result = ha._format_state(state) + assert "50%" in result + + def test_format_state_with_color(self, ha): + """Test formatting light state with color.""" + state = { + "entity_id": "light.lamp", + "state": "on", + "attributes": {"friendly_name": "Lamp", "color_name": "red"}, + } + result = ha._format_state(state) + assert "red" in result + + def test_format_state_with_temperature(self, ha): + """Test formatting climate state with temperature.""" + state = { + "entity_id": "climate.bedroom", + "state": "heat", + "attributes": { + "friendly_name": "Bedroom AC", + "temperature": 22, + "current_temperature": 20, + }, + } + result = ha._format_state(state) + assert "22" in result + assert "20" in result + assert "°C" in result + + def test_format_state_no_friendly_name(self, ha): + """Test that entity_id is used when friendly_name is absent.""" + state = { + "entity_id": "switch.garage", + "state": "on", + "attributes": {}, + } + result = ha._format_state(state) + assert "switch.garage" in result + + def test_format_state_brightness_none(self, ha): + """Test that None brightness is handled gracefully.""" + state = { + "entity_id": "light.x", + "state": "on", + "attributes": {"brightness": None}, + } + result = ha._format_state(state) + assert "brightness" not in result + + +class TestRawToText: + """Tests for _raw_to_text() and raw_to_text().""" + + @pytest.fixture + def ha(self): + return make_ha_input() + + @pytest.mark.asyncio + async def test_raw_to_text_none_returns_none(self, ha): + """Test that None input returns None.""" + result = await ha._raw_to_text(None) + assert result is None + + @pytest.mark.asyncio + async def test_raw_to_text_new_state_returns_message(self, ha): + """Test that new state change returns a message.""" + states = [{"entity_id": "light.living_room", "state": "on", "attributes": {}}] + result = await ha._raw_to_text(states) + assert result is not None + assert "light.living_room" in result.message + + @pytest.mark.asyncio + async def test_raw_to_text_no_change_returns_none(self, ha): + """Test that unchanged state returns None.""" + states = [{"entity_id": "light.living_room", "state": "on", "attributes": {}}] + await ha._raw_to_text(states) + result = await ha._raw_to_text(states) + assert result is None + + @pytest.mark.asyncio + async def test_raw_to_text_detects_state_change(self, ha): + """Test that state change is detected on second poll.""" + states_on = [ + {"entity_id": "light.living_room", "state": "on", "attributes": {}} + ] + states_off = [ + {"entity_id": "light.living_room", "state": "off", "attributes": {}} + ] + + await ha._raw_to_text(states_on) + result = await ha._raw_to_text(states_off) + assert result is not None + assert "off" in result.message + + @pytest.mark.asyncio + async def test_raw_to_text_updates_last_states(self, ha): + """Test that _last_states is updated after processing.""" + states = [{"entity_id": "switch.fan", "state": "on", "attributes": {}}] + await ha._raw_to_text(states) + assert ha._last_states["switch.fan"] == "on" + + @pytest.mark.asyncio + async def test_raw_to_text_adds_to_messages(self, ha): + """Test that raw_to_text adds message to buffer.""" + states = [{"entity_id": "light.x", "state": "on", "attributes": {}}] + await ha.raw_to_text(states) + assert len(ha.messages) == 1 + + @pytest.mark.asyncio + async def test_raw_to_text_none_does_not_add_message(self, ha): + """Test that None input does not add to buffer.""" + await ha.raw_to_text(None) + assert len(ha.messages) == 0 + + +class TestFormattedLatestBuffer: + """Tests for formatted_latest_buffer().""" + + @pytest.fixture + def mock_io_provider(self): + with patch("inputs.plugins.home_assistant.IOProvider") as mock: + mock_instance = MagicMock() + mock.return_value = mock_instance + yield mock_instance + + @pytest.fixture + def ha(self, mock_io_provider): + config = HomeAssistantInputConfig( + base_url="http://ha.local:8123", + token="tok", + entity_ids="light.x", + ) + ha = HomeAssistantStateInput(config) + ha.io_provider = mock_io_provider + return ha + + def test_formatted_latest_buffer_empty(self, ha): + """Test that empty buffer returns None.""" + result = ha.formatted_latest_buffer() + assert result is None + + @pytest.mark.asyncio + async def test_formatted_latest_buffer_with_message(self, ha): + """Test formatting with message in buffer.""" + states = [{"entity_id": "light.x", "state": "on", "attributes": {}}] + await ha.raw_to_text(states) + + result = ha.formatted_latest_buffer() + assert result is not None + assert "Home Assistant Device States" in result + assert "// START" in result + assert "// END" in result + assert "light.x" in result + + @pytest.mark.asyncio + async def test_formatted_latest_buffer_clears_messages(self, ha): + """Test that buffer is cleared after formatting.""" + states = [{"entity_id": "light.x", "state": "on", "attributes": {}}] + await ha.raw_to_text(states) + assert len(ha.messages) == 1 + + ha.formatted_latest_buffer() + assert len(ha.messages) == 0 + + @pytest.mark.asyncio + async def test_formatted_latest_buffer_calls_io_provider(self, ha): + """Test that io_provider.add_input is called.""" + states = [{"entity_id": "light.x", "state": "on", "attributes": {}}] + await ha.raw_to_text(states) + ha.formatted_latest_buffer() + ha.io_provider.add_input.assert_called_once()