Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apis 1131 fix name connect issue in mac #323

Merged
merged 6 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 32 additions & 61 deletions src/explorepy/serial_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
"""A module for bluetooth connection"""
import logging
import struct
import threading
import time
from collections import deque
Expand All @@ -13,10 +12,7 @@
exploresdk,
settings_manager
)
from explorepy._exceptions import (
DeviceNotFoundError,
UnsupportedBtHardwareError
)
from explorepy._exceptions import DeviceNotFoundError


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -122,26 +118,6 @@ def _check_mac_address(device_name, mac_address):
return (device_name[-4:-2] == mac_address[-5:-3]) and (device_name[-2:] == mac_address[-2:])


def get_device_name(p):
""" Gets name of the Explore device
Args:
p (port instance): number of bytes to be read
"""
serial_port = serial.Serial(port=p.device, baudrate=115200, timeout=2)
get_name_cmd = b'\xC6' * 14
serial_port.write(get_name_cmd)
data = serial_port.read(4) # get packet header
if len(data) == 0:
# device does not support name query command, raise Exception
raise UnsupportedBtHardwareError
length = struct.unpack('<H', data[2:])[0] # read payload length
data = serial_port.read(length)
name = data[4:-4].decode('utf-8', errors='ignore') # read device name(12 bytes)
serial_port.close()
time.sleep(1)
return name


class SerialStream:
""" Responsible for Connecting and reconnecting explore devices via bluetooth"""

Expand Down Expand Up @@ -178,10 +154,9 @@ def connect(self):
Returns:
socket (bluetooth.socket)
"""
port = self.scan_usb_ports()

for _ in range(5):
try:
port = get_correct_com_port(self.device_name)
self.comm_manager = serial.Serial(port=port, baudrate=115200, timeout=2)

# stop stream
Expand Down Expand Up @@ -225,40 +200,6 @@ def connect(self):
"serial port before starting ExplorePy"
)

def scan_usb_ports(self):
ports = list(list_ports.comports())
for p in ports:
try:
if p.vid == 0x0483 and p.pid == 0x5740:
# Check device name
name = get_device_name(p)
if name == self.device_name:
logger.info('Device connected to USB port.')
return p.device
except UnsupportedBtHardwareError:
# device does not support naming query, continue connection process
return p.device
except PermissionError:
# do nothing here as this comes from posix
pass
except serial.serialutil.SerialException:
logger.info(
"Permission denied on serial port access, please run this command via terminal:"
"sudo chmod 777 {}".format(p)
)
except Exception as error:
self.is_connected = False
logger.info(
"Got an exception while connecting to the device: {} of type: {}".format(error, type(error))
)
logger.debug('trying to connect again as tty port is not visible yet')
logger.warning("Could not connect; Retrying in 2s...")
time.sleep(2)
raise DeviceNotFoundError(
"Could not find the device! Please turn on the device,"
"wait a few seconds and connect to serial port before starting ExplorePy"
)

def reconnect(self):
"""Reconnect to the last connected device

Expand All @@ -274,6 +215,7 @@ def disconnect(self):
self.is_connected = False
self.comm_manager.cancel_read()
self.comm_manager.close()

time.sleep(1)

def read(self, n_bytes):
Expand Down Expand Up @@ -306,3 +248,32 @@ def send(self, data):
"""
with threading.Lock():
self.comm_manager.write(data)


def get_correct_com_port(device_name):
""" Returns correct COM/tty port for usb connection
Args: device name: the name of the device to connect to
"""
ports = list(list_ports.comports())
for p in ports:
if p.vid == 0x0483 and p.pid == 0x5740:
serial_port = serial.Serial(port=p.device, baudrate=115200, timeout=2)

# stop stream
cmd = b'\xE5' * 14
serial_port.write(cmd)
time.sleep(.1)
# read all the stream data
serial_port.readall()

get_name_cmd = b'\xC6' * 14
serial_port.write(get_name_cmd)
data = serial_port.read(24)
if len(data) == 0:
# incompatible explore device, continue connection
print('got data as zero')
serial_port.close()
return p.device
name = data[8:-4].decode('utf-8', errors='ignore')
if name == device_name:
return p.device
25 changes: 15 additions & 10 deletions src/explorepy/settings_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import shutil
from pathlib import Path
from threading import Lock

import yaml
from appdirs import user_config_dir
Expand All @@ -11,6 +12,8 @@
log_path = user_config_dir(appname="Mentalab", appauthor="explorepy")
data_path = user_config_dir(appname="Mentalab", appauthor="explorepy", version='archive')

read_write_lock = Lock()


class SettingsManager:
def __init__(self, name):
Expand All @@ -33,22 +36,24 @@ def __init__(self, name):
self.sr_key = "sampling_rate"

def load_current_settings(self):
self.settings_dict = {}
stream = open(self.full_file_path, 'r')
try:
self.settings_dict = yaml.load(stream, Loader=yaml.SafeLoader)
except (yaml.scanner.ScannerError, yaml.parser.ParserError):
logger.info('Corrupt yaml file, reloading')
if self.settings_dict is None:
with read_write_lock:
self.settings_dict = {}
stream = open(self.full_file_path, 'r')
try:
self.settings_dict = yaml.load(stream, Loader=yaml.SafeLoader)
except (yaml.scanner.ScannerError, yaml.parser.ParserError):
logger.info('Corrupt yaml file, reloading')
if self.settings_dict is None:
self.settings_dict = {}

def get_file_path(self):
return self.log_path + self.file_name

def write_settings(self):
with open(self.full_file_path, 'w+') as fp:
yaml.safe_dump(self.settings_dict, fp, default_flow_style=False)
fp.close()
with read_write_lock:
with open(self.full_file_path, 'w+') as fp:
yaml.safe_dump(self.settings_dict, fp, default_flow_style=False)
fp.close()

def set_hardware_channel_mask(self, value):
""" Setter method for hardware channel mask for Explore Desktop"""
Expand Down
Loading