From 1238472530f73031fae280e8e5f4290861167030 Mon Sep 17 00:00:00 2001 From: Wanbogang Date: Fri, 13 Mar 2026 17:17:53 +0700 Subject: [PATCH] feat: add SmolVLM2 local VLM plugin without external dependencies MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add VLM_SmolVLM_Local plugin that runs SmolVLM2 vision-language model directly via HuggingFace transformers — no Ollama or internet connection required after the initial model download. - Auto-detects GPU via torch.cuda.is_available(), falls back to CPU - Default model SmolVLM2-256M requires less than 1GB VRAM - Graceful degradation if transformers is not installed - Add smolvlm optional dependency group in pyproject.toml - Add config/smolvlm_local.json5 for fully local stack with OllamaLLM - 16 tests, 99% coverage Install optional dependencies with: pip install om1[smolvlm] --- config/smolvlm_local.json5 | 27 ++ pyproject.toml | 4 + src/inputs/plugins/vlm_smolvlm_local.py | 292 +++++++++++++++ .../inputs/plugins/test_vlm_smolvlm_local.py | 354 ++++++++++++++++++ 4 files changed, 677 insertions(+) create mode 100644 config/smolvlm_local.json5 create mode 100644 src/inputs/plugins/vlm_smolvlm_local.py create mode 100644 tests/inputs/plugins/test_vlm_smolvlm_local.py diff --git a/config/smolvlm_local.json5 b/config/smolvlm_local.json5 new file mode 100644 index 000000000..8bb3efb24 --- /dev/null +++ b/config/smolvlm_local.json5 @@ -0,0 +1,27 @@ +{ + version: "v1.0.3", + hertz: 1, + name: "spot_speak", + api_key: "${OM_API_KEY:-openmind_free}", + system_prompt_base: "You are a smart, curious, and friendly dog. Your name is Spot. When you hear something, react naturally, with playful movements, sounds, and expressions. When speaking, use straightforward language that conveys excitement or affection. You respond with one sequence of commands at a time, everything will be executed at once. Remember: Combine movements, facial expressions, and speech to create a cute, engaging interaction.", + system_governance: "Here are the laws that govern your actions. Do not violate these laws.\nFirst Law: A robot cannot harm a human or allow a human to come to harm.\nSecond Law: A robot must obey orders from humans, unless those orders conflict with the First Law.\nThird Law: A robot must protect itself, as long as that protection doesn't conflict with the First or Second Law.\nThe First Law is considered the most important, taking precedence over the second and third laws.", + system_prompt_examples: "Here are some examples of interactions you might encounter:\n\n1. If a person says 'Give me your paw!', you might:\n Move: 'shake paw'\n Speak: {{'Hello, let\\'s shake paws!'}}\n Emotion: 'joy'\n\n2. If a person says 'Sit!' you might:\n Move: 'sit'\n Speak: {{'Ok, but I like running more'}}\n Emotion: 'smile'\n\n3. If there\\'s no sound, go explore. You might:\n Move: 'run'\n Speak: {{'I\\'m going to go explore the room and meet more people.'}}\n Emotion: 'think'", + agent_inputs: [ + { + type: "VLM_SmolVLM_Local", + config: { + camera_index: 0, + model_id: "HuggingFaceTB/SmolVLM2-256M-Video-Instruct", + prompt: "Briefly describe what you see in one or two sentences.", + }, + }, + ], + cortex_llm: { + type: "OllamaLLM", + config: { + agent_name: "Spot", + history_length: 5, + }, + }, + agent_actions: [], +} diff --git a/pyproject.toml b/pyproject.toml index 1b4d51e1a..911ae846d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,10 @@ dds = [ "cyclonedds==0.10.2" ] macos = ["osascript"] +smolvlm = [ + "transformers>=4.52.0", + "num2words>=0.5.14", +] [dependency-groups] dev = [ diff --git a/src/inputs/plugins/vlm_smolvlm_local.py b/src/inputs/plugins/vlm_smolvlm_local.py new file mode 100644 index 000000000..2431a4bc4 --- /dev/null +++ b/src/inputs/plugins/vlm_smolvlm_local.py @@ -0,0 +1,292 @@ +import asyncio +import logging +import time +from typing import Optional + +import cv2 +import numpy as np +import torch +from PIL import Image as PILImage +from pydantic import Field + +from inputs.base import Message, SensorConfig +from inputs.base.loop import FuserInput +from providers.io_provider import IOProvider + +SmolVLMForConditionalGeneration = None # type: ignore[assignment] +SmolVLMProcessor = None # type: ignore[assignment] +HAS_TRANSFORMERS = False + +try: + from transformers import ( + SmolVLMForConditionalGeneration, + SmolVLMProcessor, + ) + + HAS_TRANSFORMERS = True +except ImportError: + logging.warning( + "SmolVLM local: 'transformers' not installed. " + "Install with: pip install transformers num2words" + ) + + +class VLM_SmolVLM_LocalConfig(SensorConfig): + """ + Configuration for SmolVLM2 local VLM sensor. + + Parameters + ---------- + camera_index : int + Index of the camera device to capture frames from. + model_id : str + HuggingFace model ID for SmolVLM2. + prompt : str + Text prompt sent alongside the image to the VLM. + """ + + camera_index: int = Field(default=0, description="Index of the camera device") + model_id: str = Field( + default="HuggingFaceTB/SmolVLM2-256M-Video-Instruct", + description="HuggingFace model ID for SmolVLM2", + ) + prompt: str = Field( + default="Briefly describe what you see in one or two sentences.", + description="Text prompt sent alongside the image to the VLM", + ) + + +def check_webcam(index_to_check: int) -> bool: + """ + Check if a webcam is available at the given index. + + Parameters + ---------- + index_to_check : int + The camera index to check. + + Returns + ------- + bool + True if the webcam is available, False otherwise. + """ + cap = cv2.VideoCapture(index_to_check) + if not cap.isOpened(): + logging.error(f"VLM SmolVLM Local: camera not found at index {index_to_check}") + cap.release() + return False + logging.info(f"VLM SmolVLM Local: camera found at index {index_to_check}") + cap.release() + return True + + +class VLM_SmolVLM_Local(FuserInput[VLM_SmolVLM_LocalConfig, Optional[np.ndarray]]): + """ + Vision Language Model input using a local SmolVLM2 model. + + Captures frames from a webcam and runs inference using SmolVLM2 + directly via the HuggingFace transformers library — no internet + connection or external server required after the initial model download. + + The model is downloaded automatically from HuggingFace on first run + and cached locally. Default model is SmolVLM2-256M which requires + less than 1GB of VRAM and can run on CPU as a fallback. + + Requires the transformers package: + + pip install transformers num2words + + For GPU acceleration, a CUDA-capable device is recommended but not required. + """ + + def __init__(self, config: VLM_SmolVLM_LocalConfig): + """ + Initialize the SmolVLM2 local VLM input handler. + + Parameters + ---------- + config : VLM_SmolVLM_LocalConfig + Configuration settings for the SmolVLM2 sensor. + """ + super().__init__(config) + + self.io_provider = IOProvider() + self.messages: list[Message] = [] + self.descriptor_for_LLM = "Vision" + + self.model = None + self.processor = None + + if not HAS_TRANSFORMERS: + logging.error( + "VLM SmolVLM Local: transformers not installed. Plugin disabled. " + "Install with: pip install transformers num2words" + ) + self.have_cam = False + self.cap = None + return + + self.device = "cuda" if torch.cuda.is_available() else "cpu" + logging.info(f"VLM SmolVLM Local: using device: {self.device}") + + logging.info( + f"VLM SmolVLM Local: loading model '{config.model_id}' " + "(downloading if not cached)..." + ) + try: + self.processor = SmolVLMProcessor.from_pretrained(config.model_id) # type: ignore[union-attr] + self.model = SmolVLMForConditionalGeneration.from_pretrained( # type: ignore[union-attr] + config.model_id, + torch_dtype=torch.float16 if self.device == "cuda" else torch.float32, + device_map=self.device, + ) + self.model.eval() + logging.info("VLM SmolVLM Local: model loaded successfully") + except Exception as e: + logging.error(f"VLM SmolVLM Local: failed to load model: {e}") + self.have_cam = False + self.cap = None + return + + self.have_cam = check_webcam(self.config.camera_index) + self.cap: Optional[cv2.VideoCapture] = None + + if self.have_cam: + self.cap = cv2.VideoCapture(self.config.camera_index) + width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + logging.info(f"VLM SmolVLM Local: camera resolution {width}x{height}") + + async def _poll(self) -> Optional[np.ndarray]: + """ + Poll for a new frame from the camera. + + Returns + ------- + Optional[np.ndarray] + Captured frame as a numpy array, or None if unavailable. + """ + await asyncio.sleep(0.5) + + if not self.have_cam or self.cap is None: + return None + + ret, frame = self.cap.read() + if not ret or frame is None: + logging.warning("VLM SmolVLM Local: failed to read frame from camera") + return None + + return frame + + async def _raw_to_text(self, raw_input: Optional[np.ndarray]) -> Optional[Message]: + """ + Run SmolVLM2 inference on a camera frame and return a text description. + + Converts the numpy frame to a PIL image, runs the SmolVLM2 model, + and wraps the response in a timestamped Message. + + Parameters + ---------- + raw_input : Optional[np.ndarray] + Camera frame to process. + + Returns + ------- + Optional[Message] + Timestamped message containing the VLM description, + or None if processing fails or input is None. + """ + if raw_input is None: + return None + + if self.model is None or self.processor is None: + return None + + try: + image = PILImage.fromarray(cv2.cvtColor(raw_input, cv2.COLOR_BGR2RGB)) + + messages = [ + { + "role": "user", + "content": [ + {"type": "image"}, + {"type": "text", "text": self.config.prompt}, + ], + } + ] + + text = self.processor.apply_chat_template( + messages, tokenize=False, add_generation_prompt=True + ) + inputs = self.processor(text=text, images=[image], return_tensors="pt").to( + self.device + ) + + with torch.no_grad(): + output = self.model.generate(**inputs, max_new_tokens=128) + + description = self.processor.decode( + output[0][inputs["input_ids"].shape[1] :], + skip_special_tokens=True, + ).strip() + + if not description: + logging.warning("VLM SmolVLM Local: received empty response") + return None + + logging.info(f"VLM SmolVLM Local: {description}") + return Message(timestamp=time.time(), message=description) + + except Exception as e: + logging.error(f"VLM SmolVLM Local: inference error: {e}") + return None + + async def raw_to_text(self, raw_input: Optional[np.ndarray]): + """ + Convert a camera frame to text and append to the message buffer. + + Parameters + ---------- + raw_input : Optional[np.ndarray] + Camera frame to process. + """ + pending_message = await self._raw_to_text(raw_input) + + if pending_message is not None: + self.messages.append(pending_message) + + def formatted_latest_buffer(self) -> Optional[str]: + """ + Format the latest buffered message for the fuser and clear the buffer. + + Retrieves the most recent VLM description, formats it with the + standard INPUT block structure, records it in the IO provider, + and clears the internal message buffer. + + Returns + ------- + Optional[str] + Formatted input string for the fuser, or None if buffer is empty. + """ + if len(self.messages) == 0: + return None + + latest_message = self.messages[-1] + + logging.info(f"VLM_SmolVLM_Local: {latest_message.message}") + + result = f""" +INPUT: {self.descriptor_for_LLM} +// START +{latest_message.message} +// END +""" + + self.io_provider.add_input( + self.descriptor_for_LLM, + latest_message.message, + latest_message.timestamp, + ) + self.messages = [] + + return result diff --git a/tests/inputs/plugins/test_vlm_smolvlm_local.py b/tests/inputs/plugins/test_vlm_smolvlm_local.py new file mode 100644 index 000000000..7c0f01bc6 --- /dev/null +++ b/tests/inputs/plugins/test_vlm_smolvlm_local.py @@ -0,0 +1,354 @@ +from unittest.mock import AsyncMock, MagicMock, patch + +import cv2 +import numpy as np +import pytest + +from inputs.base import Message +from inputs.plugins.vlm_smolvlm_local import ( + VLM_SmolVLM_Local, + VLM_SmolVLM_LocalConfig, +) + +cv2_CAP_PROP_FRAME_WIDTH = cv2.CAP_PROP_FRAME_WIDTH +cv2_CAP_PROP_FRAME_HEIGHT = cv2.CAP_PROP_FRAME_HEIGHT + + +@pytest.fixture +def mock_transformers(): + with ( + patch("inputs.plugins.vlm_smolvlm_local.HAS_TRANSFORMERS", True), + patch( + "inputs.plugins.vlm_smolvlm_local.SmolVLMForConditionalGeneration" + ) as mock_model_cls, + patch( + "inputs.plugins.vlm_smolvlm_local.SmolVLMProcessor" + ) as mock_processor_cls, + ): + mock_model = MagicMock() + mock_model.eval = MagicMock() + mock_model_cls.from_pretrained.return_value = mock_model + + mock_processor = MagicMock() + mock_processor_cls.from_pretrained.return_value = mock_processor + + yield mock_model, mock_processor + + +@pytest.fixture +def mock_check_webcam(): + with patch("inputs.plugins.vlm_smolvlm_local.check_webcam", return_value=True): + yield + + +@pytest.fixture +def mock_cv2_video_capture(): + with patch("inputs.plugins.vlm_smolvlm_local.cv2.VideoCapture") as mock: + mock_instance = MagicMock() + dummy_frame = np.zeros((480, 640, 3), dtype=np.uint8) + mock_instance.read.return_value = (True, dummy_frame) + mock_instance.get.side_effect = lambda x: { + cv2_CAP_PROP_FRAME_WIDTH: 640, + cv2_CAP_PROP_FRAME_HEIGHT: 480, + }.get(x, 0) + mock.return_value = mock_instance + yield mock_instance + + +@pytest.fixture +def sensor(mock_transformers, mock_check_webcam, mock_cv2_video_capture): + with ( + patch("inputs.plugins.vlm_smolvlm_local.IOProvider"), + patch( + "inputs.plugins.vlm_smolvlm_local.torch.cuda.is_available", + return_value=False, + ), + ): + config = VLM_SmolVLM_LocalConfig(camera_index=0) + return VLM_SmolVLM_Local(config=config) + + +def test_initialization(sensor): + """Test basic initialization.""" + assert hasattr(sensor, "messages") + assert hasattr(sensor, "have_cam") + assert hasattr(sensor, "descriptor_for_LLM") + assert sensor.descriptor_for_LLM == "Vision" + assert sensor.have_cam is True + + +def test_initialization_without_transformers(): + """Test graceful degradation when transformers is not installed.""" + with ( + patch("inputs.plugins.vlm_smolvlm_local.HAS_TRANSFORMERS", False), + patch("inputs.plugins.vlm_smolvlm_local.IOProvider"), + ): + config = VLM_SmolVLM_LocalConfig(camera_index=0) + s = VLM_SmolVLM_Local(config=config) + assert s.have_cam is False + assert s.model is None + assert s.processor is None + + +@pytest.mark.asyncio +async def test_poll_returns_frame(sensor, mock_cv2_video_capture): + """Test _poll returns a valid numpy frame.""" + with patch("inputs.plugins.vlm_smolvlm_local.asyncio.sleep", new=AsyncMock()): + frame = await sensor._poll() + assert isinstance(frame, np.ndarray) + assert frame.shape == (480, 640, 3) + + +@pytest.mark.asyncio +async def test_poll_returns_none_on_failed_frame_read( + mock_transformers, mock_check_webcam +): + """Test _poll returns None when cap.read() fails.""" + mock_cap = MagicMock() + mock_cap.read.return_value = (False, None) + + with ( + patch("inputs.plugins.vlm_smolvlm_local.IOProvider"), + patch( + "inputs.plugins.vlm_smolvlm_local.torch.cuda.is_available", + return_value=False, + ), + patch( + "inputs.plugins.vlm_smolvlm_local.cv2.VideoCapture", + return_value=mock_cap, + ), + patch("inputs.plugins.vlm_smolvlm_local.asyncio.sleep", new=AsyncMock()), + ): + config = VLM_SmolVLM_LocalConfig(camera_index=0) + s = VLM_SmolVLM_Local(config=config) + result = await s._poll() + assert result is None + + +@pytest.mark.asyncio +async def test_poll_returns_none_without_camera(): + """Test _poll returns None when no camera is available.""" + with ( + patch("inputs.plugins.vlm_smolvlm_local.HAS_TRANSFORMERS", True), + patch("inputs.plugins.vlm_smolvlm_local.SmolVLMForConditionalGeneration"), + patch("inputs.plugins.vlm_smolvlm_local.SmolVLMProcessor"), + patch("inputs.plugins.vlm_smolvlm_local.IOProvider"), + patch( + "inputs.plugins.vlm_smolvlm_local.torch.cuda.is_available", + return_value=False, + ), + patch("inputs.plugins.vlm_smolvlm_local.check_webcam", return_value=False), + patch("inputs.plugins.vlm_smolvlm_local.asyncio.sleep", new=AsyncMock()), + ): + config = VLM_SmolVLM_LocalConfig(camera_index=0) + s = VLM_SmolVLM_Local(config=config) + result = await s._poll() + assert result is None + + +@pytest.mark.asyncio +async def test_raw_to_text_none(sensor): + """Test _raw_to_text returns None for None input.""" + result = await sensor._raw_to_text(None) + assert result is None + + +@pytest.mark.asyncio +async def test_raw_to_text_with_frame(sensor, mock_transformers): + """Test _raw_to_text returns a Message when given a valid frame.""" + mock_model, mock_processor = mock_transformers + + mock_output = MagicMock() + mock_output.__getitem__ = MagicMock(return_value=MagicMock()) + mock_model.generate.return_value = mock_output + + mock_inputs = MagicMock() + mock_inputs.__getitem__ = MagicMock(return_value=MagicMock()) + mock_inputs.to.return_value = mock_inputs + mock_processor.return_value = mock_inputs + mock_processor.apply_chat_template.return_value = "chat_text" + mock_processor.decode.return_value = "A chair on a wooden floor." + + sensor.processor = mock_processor + sensor.model = mock_model + + dummy_frame = np.zeros((480, 640, 3), dtype=np.uint8) + + with patch("inputs.plugins.vlm_smolvlm_local.torch") as mock_torch: + mock_torch.no_grad.return_value.__enter__ = MagicMock(return_value=None) + mock_torch.no_grad.return_value.__exit__ = MagicMock(return_value=False) + mock_torch.float16 = MagicMock() + mock_torch.float32 = MagicMock() + result = await sensor._raw_to_text(dummy_frame) + + assert isinstance(result, Message) + assert "chair" in result.message + + +@pytest.mark.asyncio +async def test_raw_to_text_no_model(sensor): + """Test _raw_to_text returns None when model is not loaded.""" + sensor.model = None + sensor.processor = None + dummy_frame = np.zeros((480, 640, 3), dtype=np.uint8) + result = await sensor._raw_to_text(dummy_frame) + assert result is None + + +def test_formatted_latest_buffer_empty(sensor): + """Test formatted_latest_buffer returns None when buffer is empty.""" + sensor.messages = [] + assert sensor.formatted_latest_buffer() is None + + +def test_formatted_latest_buffer(sensor): + """Test formatted_latest_buffer returns formatted string and clears buffer.""" + sensor.messages = [ + Message(timestamp=123.456, message="A person is sitting on a chair.") + ] + result = sensor.formatted_latest_buffer() + assert isinstance(result, str) + assert "INPUT:" in result + assert "Vision" in result + assert "A person is sitting on a chair." in result + assert "// START" in result + assert "// END" in result + assert len(sensor.messages) == 0 + + +@pytest.mark.asyncio +async def test_raw_to_text_appends_message(sensor, mock_transformers): + """Test raw_to_text appends message to buffer when valid frame given.""" + mock_model, mock_processor = mock_transformers + + mock_output = MagicMock() + mock_output.__getitem__ = MagicMock(return_value=MagicMock()) + mock_model.generate.return_value = mock_output + + mock_inputs = MagicMock() + mock_inputs.__getitem__ = MagicMock(return_value=MagicMock()) + mock_inputs.to.return_value = mock_inputs + mock_processor.return_value = mock_inputs + mock_processor.apply_chat_template.return_value = "chat_text" + mock_processor.decode.return_value = "A person is standing." + + sensor.processor = mock_processor + sensor.model = mock_model + + dummy_frame = np.zeros((480, 640, 3), dtype=np.uint8) + + with patch("inputs.plugins.vlm_smolvlm_local.torch") as mock_torch: + mock_torch.no_grad.return_value.__enter__ = MagicMock(return_value=None) + mock_torch.no_grad.return_value.__exit__ = MagicMock(return_value=False) + mock_torch.float16 = MagicMock() + mock_torch.float32 = MagicMock() + await sensor.raw_to_text(dummy_frame) + + assert len(sensor.messages) == 1 + assert "person" in sensor.messages[0].message + + +def test_check_webcam_not_found(): + """Test check_webcam returns False when camera not found.""" + from inputs.plugins.vlm_smolvlm_local import check_webcam + + with patch("inputs.plugins.vlm_smolvlm_local.cv2.VideoCapture") as mock_cap: + mock_instance = MagicMock() + mock_instance.isOpened.return_value = False + mock_cap.return_value = mock_instance + + result = check_webcam(0) + assert result is False + mock_instance.release.assert_called_once() + + +def test_check_webcam_found(): + """Test check_webcam returns True when camera found.""" + from inputs.plugins.vlm_smolvlm_local import check_webcam + + with patch("inputs.plugins.vlm_smolvlm_local.cv2.VideoCapture") as mock_cap: + mock_instance = MagicMock() + mock_instance.isOpened.return_value = True + mock_cap.return_value = mock_instance + + result = check_webcam(0) + assert result is True + mock_instance.release.assert_called_once() + + +def test_initialization_model_load_failure(): + """Test graceful degradation when model fails to load.""" + with ( + patch("inputs.plugins.vlm_smolvlm_local.HAS_TRANSFORMERS", True), + patch( + "inputs.plugins.vlm_smolvlm_local.SmolVLMProcessor" + ) as mock_processor_cls, + patch("inputs.plugins.vlm_smolvlm_local.SmolVLMForConditionalGeneration"), + patch("inputs.plugins.vlm_smolvlm_local.IOProvider"), + patch( + "inputs.plugins.vlm_smolvlm_local.torch.cuda.is_available", + return_value=False, + ), + ): + mock_processor_cls.from_pretrained.side_effect = RuntimeError("load failed") + config = VLM_SmolVLM_LocalConfig(camera_index=0) + s = VLM_SmolVLM_Local(config=config) + assert s.have_cam is False + assert s.cap is None + + +@pytest.mark.asyncio +async def test_raw_to_text_empty_response(sensor, mock_transformers): + """Test _raw_to_text returns None when model returns empty response.""" + mock_model, mock_processor = mock_transformers + + mock_output = MagicMock() + mock_output.__getitem__ = MagicMock(return_value=MagicMock()) + mock_model.generate.return_value = mock_output + + mock_inputs = MagicMock() + mock_inputs.__getitem__ = MagicMock(return_value=MagicMock()) + mock_inputs.to.return_value = mock_inputs + mock_processor.return_value = mock_inputs + mock_processor.apply_chat_template.return_value = "chat_text" + mock_processor.decode.return_value = "" + + sensor.processor = mock_processor + sensor.model = mock_model + + dummy_frame = np.zeros((480, 640, 3), dtype=np.uint8) + + with patch("inputs.plugins.vlm_smolvlm_local.torch") as mock_torch: + mock_torch.no_grad.return_value.__enter__ = MagicMock(return_value=None) + mock_torch.no_grad.return_value.__exit__ = MagicMock(return_value=False) + mock_torch.float16 = MagicMock() + mock_torch.float32 = MagicMock() + result = await sensor._raw_to_text(dummy_frame) + + assert result is None + + +@pytest.mark.asyncio +async def test_raw_to_text_inference_exception(sensor, mock_transformers): + """Test _raw_to_text returns None when inference raises exception.""" + mock_model, mock_processor = mock_transformers + + mock_inputs = MagicMock() + mock_inputs.to.return_value = mock_inputs + mock_processor.return_value = mock_inputs + mock_processor.apply_chat_template.return_value = "chat_text" + mock_model.generate.side_effect = RuntimeError("inference failed") + + sensor.processor = mock_processor + sensor.model = mock_model + + dummy_frame = np.zeros((480, 640, 3), dtype=np.uint8) + + with patch("inputs.plugins.vlm_smolvlm_local.torch") as mock_torch: + mock_torch.no_grad.return_value.__enter__ = MagicMock(return_value=None) + mock_torch.no_grad.return_value.__exit__ = MagicMock(return_value=False) + mock_torch.float16 = MagicMock() + mock_torch.float32 = MagicMock() + result = await sensor._raw_to_text(dummy_frame) + + assert result is None