Skip to content

Commit

Permalink
✨ Add the possibility to set the base pose in the visualizer and add …
Browse files Browse the repository at this point in the history
…the 3D point visualization (#74)
  • Loading branch information
GiulioRomualdi authored Jan 8, 2024
1 parent ce77ce8 commit 7fdfcf2
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 23 deletions.
120 changes: 109 additions & 11 deletions robot_log_visualizer/file_reader/signal_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import h5py
import numpy as np
from PyQt5.QtCore import pyqtSignal, QThread, QMutex, QMutexLocker
from robot_log_visualizer.utils.utils import PeriodicThreadState
from robot_log_visualizer.utils.utils import PeriodicThreadState, RobotStatePath
import idyntree.swig as idyn


class TextLoggingMsg:
Expand Down Expand Up @@ -41,6 +42,12 @@ def __init__(self, period: float):
self._index = 0
self.index_lock = QMutex()

self._robot_state_path = RobotStatePath()
self.robot_state_path_lock = QMutex()

self._3d_points_path = {}
self._3d_points_path_lock = QMutex()

self.period = period

self.data = {}
Expand Down Expand Up @@ -190,6 +197,17 @@ def index(self, index):
locker = QMutexLocker(self.index_lock)
self._index = index

@property
def robot_state_path(self):
locker = QMutexLocker(self.robot_state_path_lock)
value = self._robot_state_path
return value

@robot_state_path.setter
def robot_state_path(self, robot_state_path):
locker = QMutexLocker(self.robot_state_path_lock)
self._robot_state_path = robot_state_path

def register_update_index(self, slot):
self.update_index_signal.connect(slot)

Expand All @@ -207,18 +225,98 @@ def current_time(self):
value = self._current_time
return value

def get_joints_position(self):
return self.data[self.root_name]["joints_state"]["positions"]["data"]
def get_item_from_path(self, path, default_path=None):
data = self.data[self.root_name]

if not path:
if default_path is None:
return None, None
else:
for key in default_path:
data = data[key]
return data["data"], data["timestamps"]

for key in path:
data = data[key]

return data["data"], data["timestamps"]

def get_item_from_path_at_index(self, path, index, default_path=None):
data, timestamps = self.get_item_from_path(path, default_path)
if data is None:
return None
closest_index = np.argmin(np.abs(timestamps - self.timestamps[index]))
return data[closest_index, :]

def get_robot_state_at_index(self, index):
robot_state = {}

self.robot_state_path_lock.lock()
robot_state["joints_position"] = self.get_item_from_path_at_index(
self._robot_state_path.joints_state_path,
index,
default_path=["joints_state", "positions"],
)

robot_state["base_position"] = self.get_item_from_path_at_index(
self._robot_state_path.base_position_path, index
)

def get_joints_position_at_index(self, index):
joints_position_timestamps = self.data[self.root_name]["joints_state"][
"positions"
]["timestamps"]
# given the index find the closest timestamp
closest_index = np.argmin(
np.abs(joints_position_timestamps - self.timestamps[index])
robot_state["base_orientation"] = self.get_item_from_path_at_index(
self._robot_state_path.base_orientation_path, index
)
return self.get_joints_position()[closest_index, :]
self.robot_state_path_lock.unlock()

if robot_state["base_position"] is None:
robot_state["base_position"] = np.zeros(3)

if robot_state["base_orientation"] is None:
robot_state["base_orientation"] = np.zeros(3)

# check the size of the base_orientation if 3 we assume is store ad rpy otherwise as a quaternion we need to convert it in a rotation matrix
if robot_state["base_orientation"].shape[0] == 3:
robot_state["base_orientation"] = idyn.Rotation.RPY(
robot_state["base_orientation"][0],
robot_state["base_orientation"][1],
robot_state["base_orientation"][2],
).toNumPy()
if robot_state["base_orientation"].shape[0] == 4:
# convert the x y z w quaternion into w x y z
tmp_quat = robot_state["base_orientation"]
tmp_quat = np.array([tmp_quat[3], tmp_quat[0], tmp_quat[1], tmp_quat[2]])

robot_state["base_orientation"] = idyn.Rotation.RotationFromQuaternion(
tmp_quat
).toNumPy()

return robot_state

def get_3d_point_at_index(self, index):
points = {}

self._3d_points_path_lock.lock()

for key, value in self._3d_points_path.items():
# force the size of the points to be 3 if less than 3 we assume that the point is a 2d point and we add a 0 as z coordinate
points[key] = self.get_item_from_path_at_index(value, index)
if points[key].shape[0] < 3:
points[key] = np.concatenate(
(points[key], np.zeros(3 - points[key].shape[0]))
)

self._3d_points_path_lock.unlock()

return points

def register_3d_point(self, key, points_path):
self._3d_points_path_lock.lock()
self._3d_points_path[key] = points_path
self._3d_points_path_lock.unlock()

def unregister_3d_point(self, key):
self._3d_points_path_lock.lock()
del self._3d_points_path[key]
self._3d_points_path_lock.unlock()

def run(self):
while True:
Expand Down
50 changes: 40 additions & 10 deletions robot_log_visualizer/robot_visualizer/meshcat_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ def __init__(self, signal_provider, period):
self.state_lock = QMutex()

self._period = period
self.meshcat_visualizer = MeshcatVisualizer()
self._meshcat_visualizer = MeshcatVisualizer()
self.meshcat_visualizer_mutex = QMutex()

self._is_model_loaded = False
self._signal_provider = signal_provider

self.custom_model_path = ""
self.custom_package_dir = ""
self.env_list = ["GAZEBO_MODEL_PATH", "ROS_PACKAGE_PATH", "AMENT_PREFIX_PATH"]
self._registered_3d_points = set()

@property
def state(self):
Expand All @@ -44,6 +47,19 @@ def state(self, new_state: PeriodicThreadState):
locker = QMutexLocker(self.state_lock)
self._state = new_state

def register_3d_point(self, point_path, color):
radius = 0.02
locker = QMutexLocker(self.meshcat_visualizer_mutex)
self._registered_3d_points.add(point_path)
self._meshcat_visualizer.load_sphere(
radius=radius, color=color, shape_name=point_path
)

def unregister_3d_point(self, point_path):
locker = QMutexLocker(self.meshcat_visualizer_mutex)
self._registered_3d_points.remove(point_path)
self._meshcat_visualizer.delete(shape_name=point_path)

def load_model(self, considered_joints, model_name):
def get_model_path_from_envs(env_list):
return [
Expand Down Expand Up @@ -132,7 +148,7 @@ def find_model_joints(model_name, considered_joints):
if not model_loader.isValid():
return False

self.meshcat_visualizer.load_model(
self._meshcat_visualizer.load_model(
model_loader.model(), model_name="robot", color=0.8
)

Expand All @@ -141,23 +157,37 @@ def find_model_joints(model_name, considered_joints):
return True

def run(self):
base_rotation = np.eye(3)
base_position = np.array([0.0, 0.0, 0.0])
identity = np.eye(3)

while True:
start = time.time()

if self.state == PeriodicThreadState.running and self._is_model_loaded:
robot_state = self._signal_provider.get_robot_state_at_index(
self._signal_provider.index
)

self.meshcat_visualizer_mutex.lock()
# These are the robot measured joint positions in radians
self.meshcat_visualizer.set_multibody_system_state(
base_position,
base_rotation,
joint_value=self._signal_provider.get_joints_position_at_index(
self._signal_provider.index
)[self.model_joints_index],
self._meshcat_visualizer.set_multibody_system_state(
base_position=robot_state["base_position"],
base_rotation=robot_state["base_orientation"],
joint_value=robot_state["joints_position"][self.model_joints_index],
model_name="robot",
)

for points_path, points in self._signal_provider.get_3d_point_at_index(
self._signal_provider.index
).items():
if points_path not in self._registered_3d_points:
continue

self._meshcat_visualizer.set_primitive_geometry_transform(
position=points, rotation=identity, shape_name=points_path
)

self.meshcat_visualizer_mutex.unlock()

sleep_time = self._period - (time.time() - start)
if sleep_time > 0:
time.sleep(sleep_time)
Expand Down
Loading

0 comments on commit 7fdfcf2

Please sign in to comment.