Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions custom_components/dreame_vacuum/vacuum.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,20 @@
CONSUMABLE_WATER_OUTLET_FILTER,
)

try:
from homeassistant.components.vacuum import Segment
except ImportError:
from dataclasses import dataclass

@dataclass
class Segment:
id: str
name: str
group: str | None = None


CLEAN_AREA_ENTITY_FEATURE = getattr(VacuumEntityFeature, "CLEAN_AREA", 0)

STATE_CODE_TO_STATE: Final = {
DreameVacuumState.UNKNOWN: STATE_IDLE,
DreameVacuumState.SWEEPING: STATE_CLEANING,
Expand Down Expand Up @@ -1023,6 +1037,7 @@ def __init__(self, coordinator: DreameVacuumDataUpdateCoordinator, activity_clas
| VacuumEntityFeature.PAUSE
| VacuumEntityFeature.STOP
| VacuumEntityFeature.RETURN_HOME
| CLEAN_AREA_ENTITY_FEATURE
)
self._activity_class = activity_class

Expand All @@ -1031,8 +1046,23 @@ def __init__(self, coordinator: DreameVacuumDataUpdateCoordinator, activity_clas
@callback
def _handle_coordinator_update(self) -> None:
self._set_attrs()
if CLEAN_AREA_ENTITY_FEATURE:
self._check_segments_changed()
self.async_write_ha_state()

@callback
def _check_segments_changed(self) -> None:
"""Check if segments have changed and create repair issue."""
last_seen = self.last_seen_segments
if last_seen is None:
return

current_ids = {seg.id for seg in self._get_segments()}
last_seen_ids = {seg.id for seg in last_seen}

if current_ids != last_seen_ids:
self.async_create_segments_issue()

def _set_attrs(self):
if self.device.status.has_error:
self._attr_icon = "mdi:alert-octagon"
Expand Down Expand Up @@ -1161,6 +1191,62 @@ async def async_clean_spot(self, points, repeats=1, suction_level="", water_volu
water_volume,
)

def _get_segments(self) -> list[Segment]:
"""Get the segments that can be cleaned."""
segments: list[Segment] = []
map_data_list = self.device.status.map_data_list
if map_data_list is None:
return segments

for map_data in map_data_list.values():
if map_data.segments is None or map_data.map_index is None:
continue

for segment_id, segment in map_data.segments.items():
if segment.visibility is False:
continue

segments.append(
Segment(
id=f"{map_data.map_index}_{segment_id}",
name=segment.name,
group=map_data.map_name,
)
)

return segments

async def async_get_segments(self) -> list[Segment]:
"""Get the segments that can be cleaned."""
return self._get_segments()

async def async_clean_segments(self, segment_ids: list[str], **kwargs) -> None:
"""Perform an area clean.

Only cleans segments from the currently selected map.
"""
selected_map = self.device.status.selected_map
if selected_map is None or selected_map.map_index is None:
return

selected_map_index = selected_map.map_index

# Parse composite IDs and filter to only segments from the selected map
int_segment_ids: list[int] = []
for composite_id in segment_ids:
map_index_str, segment_id_str = composite_id.split("_", 1)
if int(map_index_str) == selected_map_index:
int_segment_ids.append(int(segment_id_str))

if not int_segment_ids:
return

await self._try_command(
"Unable to call clean_segment: %s",
self.device.clean_segment,
int_segment_ids,
)

async def async_goto(self, x, y) -> None:
"""Go to a point and take pictures around."""
if x is not None and y is not None and x != "" and y != "":
Expand Down