Skip to content

Commit

Permalink
feat: add omnigraph
Browse files Browse the repository at this point in the history
  • Loading branch information
mqjinwon committed Oct 23, 2024
1 parent b60c2df commit 641a9b8
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 0 deletions.
1 change: 1 addition & 0 deletions exts/StrideSim/StrideSim/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
from .anymal_d import AnymalD
from .anymal_d_extension import AnyamlDExtension
from .base_sample import *
from .omnigraph import ROS2OmniBackend
5 changes: 5 additions & 0 deletions exts/StrideSim/StrideSim/anymal_d.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from StrideSim.anymal_articulation import AnymalD_Atriculation
from StrideSim.base_sample import BaseSample
# from StrideSim.omnigraph import ROS2OmniBackend
from StrideSim.parameters import DEFAULT_WORLD_SETTINGS # , SIMULATION_ENVIRONMENTS


Expand Down Expand Up @@ -80,6 +81,10 @@ def setup_scene(self) -> None:
name="AnymalD",
position=np.array([0, 0, 0.8]),
)

# action graph backend
# self._backend = ROS2OmniBackend({})

timeline = omni.timeline.get_timeline_interface()
self._event_timer_callback = timeline.get_timeline_event_stream().create_subscription_to_pop_by_type(
int(omni.timeline.TimelineEventType.STOP),
Expand Down
120 changes: 120 additions & 0 deletions exts/StrideSim/StrideSim/omnigraph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import omni.graph.core as og


class ROS2OmniBackend():
def __init__(self, prim_paths: dict):
super().__init__()
self._prim_paths = self.initialize_prim_paths(prim_paths)
(self._omni_graph, _, _, _) = og.Controller.edit(
{"graph_path": "/action_graph", "evaluator_name": "execution"}, {}
)
self._omni_controller = og.Controller(self._omni_graph)

self._keys = og.Controller.Keys
self.initialize_omnigraph()

def initialize_prim_paths(self, prim_paths: dict):
default_paths = {
"Lidar_path": "/World/AnymalD/base/lidar_parent/lidar_PhysX",
"Imu_path": "/World/AnymalD/base/imu_link/Imu_Sensor",
"TF_path": "/World/AnymalD",
"namespace": "AnymalD",
}
for key, default in default_paths.items():
if key not in prim_paths:
prim_paths[key] = default
return prim_paths

def initialize_omnigraph(self):
self.create_general_nodes()
self.create_ros_nodes()
self.set_general_node_values()
self.set_ros_node_values()
self.connect_general_nodes()
self.connect_ros_nodes()

def create_general_nodes(self):
nodes = [
("PTick", "omni.graph.action.OnPlaybackTick"),
("Sim_Time", "omni.isaac.core_nodes.IsaacReadSimulationTime"),
("Namespace_string", "omni.graph.nodes.ConstantString"),
]
self.create_nodes(nodes)

def create_ros_nodes(self):
nodes = [
("Ros_Context", "omni.isaac.ros2_bridge.ROS2Context"),
("Imu_Read", "omni.isaac.sensor.IsaacReadIMU"),
("Ros2_Imu_Pub", "omni.isaac.ros2_bridge.ROS2PublishImu"),
("Lidar_Read", "omni.isaac.range_sensor.IsaacReadLidarPointCloud"),
("Ros2_Lidar_Pub", "omni.isaac.ros2_bridge.ROS2PublishPointCloud"),
("Ros2_tf_pub", "omni.isaac.ros2_bridge.ROS2PublishTransformTree"),
]
self.create_nodes(nodes)

def create_nodes(self, nodes):
self._omni_controller.edit(
self._omni_graph,
{self._keys.CREATE_NODES: nodes},
)

def set_general_node_values(self):
values = [
("Namespace_string.inputs:value", self._prim_paths["namespace"]),
]
self.set_node_values(values)

def set_ros_node_values(self):
values = [
("Ros_Context.inputs:useDomainIDEnvVar", False),
("Imu_Read.inputs:imuPrim", self._prim_paths["Imu_path"]),
("Lidar_Read.inputs:lidarPrim", self._prim_paths["Lidar_path"]),
("Imu_Read.inputs:readGravity", True),
("Ros2_Imu_Pub.inputs:frameId", "sim_imu"),
("Ros2_Imu_Pub.inputs:topicName", "imu"),
("Ros2_Lidar_Pub.inputs:frameId", "sim_lidar"),
("Ros2_Lidar_Pub.inputs:topicName", "point_cloud"),
("Ros2_tf_pub.inputs:targetPrims", self._prim_paths["TF_path"]),
("Namespace_string.inputs:value", self._prim_paths["namespace"]),
]
self.set_node_values(values)

def set_node_values(self, values):
self._omni_controller.edit(
self._omni_graph,
{self._keys.SET_VALUES: values},
)

def connect_general_nodes(self):
connections = [
("PTick.outputs:tick", "Imu_Read.inputs:execIn"),
("PTick.outputs:tick", "Lidar_Read.inputs:execIn"),
("PTick.outputs:tick", "Ros2_tf_pub.inputs:execIn"),
("Sim_Time.outputs:simulationTime", "Ros2_Imu_Pub.inputs:timeStamp"),
("Sim_Time.outputs:simulationTime", "Ros2_Lidar_Pub.inputs:timeStamp"),
("Sim_Time.outputs:simulationTime", "Ros2_tf_pub.inputs:timeStamp"),
]
self.connect_nodes(connections)

def connect_ros_nodes(self):
connections = [
("Ros_Context.outputs:context", "Ros2_Imu_Pub.inputs:context"),
("Imu_Read.outputs:execOut", "Ros2_Imu_Pub.inputs:execIn"),
("Imu_Read.outputs:angVel", "Ros2_Imu_Pub.inputs:angularVelocity"),
("Imu_Read.outputs:linAcc", "Ros2_Imu_Pub.inputs:linearAcceleration"),
("Imu_Read.outputs:orientation", "Ros2_Imu_Pub.inputs:orientation"),
("Ros_Context.outputs:context", "Ros2_Lidar_Pub.inputs:context"),
("Lidar_Read.outputs:execOut", "Ros2_Lidar_Pub.inputs:execIn"),
("Lidar_Read.outputs:data", "Ros2_Lidar_Pub.inputs:data"),
("Ros_Context.outputs:context", "Ros2_tf_pub.inputs:context"),
("Namespace_string.inputs:value", "Ros2_Imu_Pub.inputs:nodeNamespace"),
("Namespace_string.inputs:value", "Ros2_Lidar_Pub.inputs:nodeNamespace"),
("Namespace_string.inputs:value", "Ros2_tf_pub.inputs:nodeNamespace"),
]
self.connect_nodes(connections)

def connect_nodes(self, connections):
self._omni_controller.edit(
self._omni_graph,
{self._keys.CONNECT: connections},
)

0 comments on commit 641a9b8

Please sign in to comment.