Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 75 additions & 39 deletions src/main/scripts/lib/micropython/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,33 @@
import json


class RPCBusError(Exception):
def __init__(self, data):
super().__init__(f"error on RPC bus: {data}")
self.data = data

class MessageTypeError(Exception):
def __init__(self, message):
super().__init__(f"unexpected message type: {message['type']}")
self.type = message['type']
self.message = message

class Device:
def __init__(self, device_bus, device_id):
self.bus = device_bus
self.device_id = device_id
self.methods = None
self._methods = None

@property
def methods(self):
if self._methods is None:
self._methods = self.bus.methods(self.device_id)
return self._methods

def __getattr__(self, item):
return lambda *args: self.bus.invoke(self.device_id, item, *args)

def __str__(self):
if self.methods is None:
self.methods = self.bus.methods(self.device_id)
doc = ""
for method in self.methods:
doc += method["name"] + "("
Expand Down Expand Up @@ -54,19 +69,18 @@ def __str__(self):


class DeviceBus:
MESSAGE_DELIMITER = "\0"
MESSAGE_DELIMITER = b'\0'

def __init__(self, path):
self.file = io.open(path, "+b")
os.system("stty -F %s raw -echo" % path)
self.poll = select.poll()
self.poll.register(self.file.fileno(), select.POLLIN)
self.buffer = None
self.buffer_pos = 0
self._clear_buffer()

def close(self):
self.file.close()
self.buffer = None
self._clear_buffer()

def flush(self):
self._clear_buffer()
Expand Down Expand Up @@ -114,62 +128,84 @@ def _write_message(self, data):
self.file.write(self.MESSAGE_DELIMITER + json.dumps(data) + self.MESSAGE_DELIMITER)

def _read_message(self, expected_type):
message = ""
'''Read a message from the bus, blocking if necessary

@param expected_type: The type of message we expect to see (eg. results)
'''

message = b""

# Skip leading delimiters
while True:
if self.buffer is None:
if self._buffer_remaining() == 0:
self._fill_buffer()

value = self.buffer.decode()[self.buffer_pos:]
while self._buffer_remaining() and self._buffer[self._buffer_pos] in self.MESSAGE_DELIMITER:
self._buffer_pos += 1

if len(message) == 0 and value[0] == self.MESSAGE_DELIMITER:
self.buffer_pos += 1
value = value[1:]
if self._buffer_remaining():
break

if value.find(self.MESSAGE_DELIMITER) != -1:
value = value[:value.find(self.MESSAGE_DELIMITER) + 1]
self.buffer_pos += len(value)
if self.buffer_pos >= len(self.buffer):
self._clear_buffer()
# Rest of the buffer should have at least one non-delim byte
# Read full message
while (next_delim_pos := self._buffer.find(self.MESSAGE_DELIMITER, self._buffer_pos)) == -1:
message += self._read_buffer()
self._fill_buffer()

message += self._read_buffer(next_delim_pos)

# parse message
data = json.loads(message)
if data["type"] == expected_type:
if "data" in data:
return data["data"]
else:
self._clear_buffer()

message += value

if message[-1] == self.MESSAGE_DELIMITER:
data = json.loads(message)
if data["type"] == expected_type:
if "data" in data:
return data["data"]
else:
return
elif data["type"] == "error":
raise Exception(data["data"])
else:
raise Exception("unexpected message type: %s" % data["type"])
return
elif data["type"] == "error":
raise RPCBusError(data["data"])
else:
raise MessageTypeError(data)


def _buffer_remaining(self):
return len(self._buffer) - self._buffer_pos

def _read_buffer(self, end=None):
if end is None:
end = len(self._buffer)

old_pos = self._buffer_pos
self._buffer_pos = end
return self._buffer[old_pos:end]

def _clear_buffer(self):
self.buffer = None
self.buffer_pos = 0
self._buffer = bytearray()
self._buffer_pos = 0

def _fill_buffer(self):
assert self._buffer_remaining() == 0
self.poll.poll() # Blocking wait until we have some data.
self.buffer = self._read(1024)
self.buffer_pos = 0
self._buffer = self._read(1024)
self._buffer_pos = 0

def _has_file_data(self):
'''Check if there is data available on the bus without blocking'''
return len(self.poll.poll(0)) > 0

def _read(self, limit):
# This is horrible, but don't know how to know how many bytes are available,
# so reading one by one is necessary to avoid blocking.
data = bytearray()
bytesRead = 0
while bytesRead < limit and len(self.poll.poll(0)) > 0:
while bytesRead < limit and self._has_file_data():
data.extend(self.file.read(1))
bytesRead += 1
return data

def _skip_input(self):
# This is horrible, but don't know how to know how many bytes are available,
# so reading one by one is necessary to avoid blocking.
while len(self.poll.poll(0)) > 0:
while self._has_file_data():
self.file.read(1)


Expand Down