Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 91 additions & 26 deletions provision/bluet.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
#! /usr/bin/python3
import argparse
import asyncio
import os
import sys
import termios
import time
import argparse
import asyncio
import tty

import modules.util as _util
from bleak import BleakScanner
from bleak import BleakClient
from bleak import BleakClient, BleakScanner


class Bluetooth:

def __init__(self, args):
self.args = args
self.stop_event = asyncio.Event()
self.search_addr = None
self.found_dev = None
self.devices = {}

async def scan(self):
print("Scanning....")
async with BleakScanner(self.callback) as scanner:
await self.stop_event.wait()
print("Scanning.... (Press any key to stop)")
async with KeypressMonitor() as keypress:
async with BleakScanner(self.callback) as scanner:
await keypress.wait()

def callback(self, dev, data):
# Avoid duplicates
if dev.address in self.devices: return
if dev.address in self.devices:
return
# New device
d = Device(dev, data)
# Ignore faraway devies
if (self.args.rssi is not None) and d.rssi < self.args.rssi: return
if (self.args.rssi is not None) and d.rssi < self.args.rssi:
return
self.devices[dev.address] = d
print("{}".format(d))
if self.search_addr == d.address:
Expand All @@ -42,7 +46,8 @@ async def find(addr):
print("{}* FIND {}".format(_util.MARGIN, addr))
scanner = BleakScanner()
dev = await scanner.find_device_by_address(addr)
if dev is None: return None
if dev is None:
return None
return Device(dev, None)

@staticmethod
Expand All @@ -51,14 +56,16 @@ async def read(addr, uuid):
# Search for the device
scanner = BleakScanner()
dev = await scanner.find_device_by_address(addr)
if dev is None: _util.fail("Device not found: {}".format(addr))
if dev is None:
_util.fail("Device not found: {}".format(addr))
# Connect
print("{}Connecting...".format(_util.MARGIN))
async with BleakClient(dev.address) as cli:
print("{} [{}]".format(_util.MARGIN, uuid))
ch = Bluetooth.findCharact(cli, uuid)
if ch is None: _util.fail("Unknown characteristic: {}".format(uuid))
res = await cli.read_gatt_char(uuid)
if ch is None:
_util.fail("Unknown characteristic: {}".format(uuid))
res = await cli.read_gatt_char(uuid)
print("{}READ({}): {}".format(2*_util.MARGIN, len(res), res.hex()))
return res

Expand All @@ -69,13 +76,15 @@ async def write(addr, uuid, value):
# Search for the device
scanner = BleakScanner()
dev = await scanner.find_device_by_address(addr)
if dev is None: _util.fail("Device not found: {}".format(addr))
if dev is None:
_util.fail("Device not found: {}".format(addr))
# Connect
print("{}Connecting...".format(_util.MARGIN))
async with BleakClient(dev.address) as cli:
print("{} [{}]".format(_util.MARGIN, uuid))
ch = Bluetooth.findCharact(cli, uuid)
if ch is None: _util.fail("Unknown characteristic: {}".format(uuid))
if ch is None:
_util.fail("Unknown characteristic: {}".format(uuid))
await cli.write_gatt_char(ch, data)
print("{}WRITE({}): {}".format(2*_util.MARGIN, len(data), data.hex()))

Expand All @@ -86,17 +95,19 @@ async def test(addr, uuid, value):
# Search for the device
scanner = BleakScanner()
dev = await scanner.find_device_by_address(addr)
if dev is None: _util.fail("Device not found: {}".format(addr))
if dev is None:
_util.fail("Device not found: {}".format(addr))
# Connect
print("{}Connecting...".format(_util.MARGIN))
async with BleakClient(dev.address) as cli:
print("{} [{}]".format(_util.MARGIN, uuid))
ch = Bluetooth.findCharact(cli, uuid)
if ch is None: _util.fail("Unknown characteristic: {}".format(uuid))
if ch is None:
_util.fail("Unknown characteristic: {}".format(uuid))
await cli.write_gatt_char(ch, data)
print("{}WRITE({}): {}".format(2*_util.MARGIN, len(data), data.hex()))
await asyncio.sleep(0.1)
res = await cli.read_gatt_char(ch)
res = await cli.read_gatt_char(ch)
print("{}READ({}): {}".format(2*_util.MARGIN, len(res), res.hex()))
return res

Expand All @@ -109,9 +120,53 @@ def findCharact(cli, uuid):
return None


# Handles keyboard input monitoring for interrupting async operations.
class KeypressMonitor:

def __init__(self):
self.stop_event = asyncio.Event()
self.task = None

# Start monitoring for keypress.
async def __aenter__(self):
self.task = asyncio.create_task(self._monitor_keypress())
return self

# Clean up the keypress task.
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.task and not self.task.done():
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
pass
return False

# Wait for any keypress to interrupt scanning.
async def _monitor_keypress(self):
# Run the blocking read in a thread
await asyncio.to_thread(self._read_key)
if not self.stop_event.is_set():
print("\n{}Scan stopped by user".format(_util.MARGIN))
self.stop_event.set()

# Blocking read in a thread
def _read_key(self):
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
sys.stdin.read(1) # Block until a key is pressed
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)

async def wait(self):
await self.stop_event.wait()


class Device:

def __init__(self, info, data = None):
def __init__(self, info, data=None):
self.address = info.address
self.name = info.name
self.rssi = None
Expand Down Expand Up @@ -143,6 +198,7 @@ def print(self):
for uuu, d in c.descripts.items():
print("{}. {}".format(3 * _util.MARGIN, d))


class Service:
def __init__(self, info):
self.uuid = info.uuid
Expand All @@ -157,6 +213,7 @@ def __str__(self):
handle = (self.handle is not None) and "{}".format(self.handle) or '?'
return "{} {:02} \"{}\"".format(self.uuid, handle, self.name)


class Characteristic:
def __init__(self, info):
self.uuid = info.uuid
Expand All @@ -171,6 +228,7 @@ def __str__(self):
handle = (self.handle is not None) and "{}".format(self.handle) or '?'
return "{} {:02} \"{}\"".format(self.uuid, handle, self.name)


class Descriptor:
def __init__(self, info):
self.uuid = info.uuid
Expand All @@ -191,26 +249,33 @@ async def run(args):
if 'desc' == args.action:
# Describe
d = await b.find(args.address)
if d is None: _util.fail("Not found: {}".format(args.address))
if d is None:
_util.fail("Not found: {}".format(args.address))
await d.collect()
d.print()
elif 'read' == args.action:
# Read
if args.specific is None: _util.fail("Missing UUID or handle")
if args.specific is None:
_util.fail("Missing UUID or handle")
res = await Bluetooth.read(args.address, args.specific)
elif 'write' == args.action:
# Write
if args.specific is None: _util.fail("Missing UUID or handle")
if args.value is None: _util.fail("Missing value")
if args.specific is None:
_util.fail("Missing UUID or handle")
if args.value is None:
_util.fail("Missing value")
data = bytearray.fromhex(args.value)
res = await Bluetooth.write(args.address, args.specific, data)
elif 'test' == args.action:
# Test
if args.specific is None: _util.fail("Missing UUID or handle")
if args.value is None: _util.fail("Missing value")
if args.specific is None:
_util.fail("Missing UUID or handle")
if args.value is None:
_util.fail("Missing value")
data = bytearray.fromhex(args.value)
res = await Bluetooth.test(args.address, args.specific, data)


async def main(argv):
base_dir = os.path.normpath(os.path.dirname(__file__))

Expand Down
Loading
Loading