Skip to content

Commit

Permalink
Merge pull request #1 from KazuyaOguma18/moveit
Browse files Browse the repository at this point in the history
feat: moveit機能の実装
  • Loading branch information
KazuyaOguma18 authored Feb 11, 2025
2 parents 36abcf8 + f7d2c98 commit b1ec3e8
Show file tree
Hide file tree
Showing 37 changed files with 1,666 additions and 219 deletions.
4 changes: 1 addition & 3 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,5 @@
]
}
},
"runArgs": [
"--device=/dev/input/js0"
]
"runArgs": ["--device=/dev/input/js0"]
}
15 changes: 0 additions & 15 deletions mofpy/launch/mofpy_demo.launch.yaml

This file was deleted.

7 changes: 3 additions & 4 deletions mofpy/mofpy/action/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# Import only the base class. Automatically import the rest
from .action import Action

import pkgutil

from .action import Action

__path__ = pkgutil.extend_path(__path__, __name__)
for importer, modname, ispkg in pkgutil.walk_packages(path=__path__,
prefix=__name__ + '.'):
for importer, modname, ispkg in pkgutil.walk_packages(path=__path__, prefix=__name__ + "."):
try:
__import__(modname)
except ImportError as e:
Expand Down
27 changes: 16 additions & 11 deletions mofpy/mofpy/action/action.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@
from abc import *
from abc import abstractmethod

from rclpy.node import Node


class Action(object):
# Note: Make sure to define the NAME in the derived class!
NAME = ''
NAME = ""
# Action names and their corresponding classes
actions = {}
# Disabled actions due to import failure. module name => reason
disabled = {}

def __init__(self, definition, node : Node):
def __init__(self, definition, node: Node):
"""
Define a action
Define a action.
:param definition: a dictionary containing the description of action
:param definition: a dictionary containing the description of action.
"""
self.definition = definition
self.node = node

@abstractmethod
def execute(self, named_joy=None):
"""
Method that is called when a preset is triggered.
Call this method when a preset is triggered.
The behavior of the method should be defined in the derived class, and
the required parameters should be retrieved in the __init__ method of
the derived class.
:param named_joy:
"""
raise NotImplementedError()
Expand All @@ -40,14 +44,15 @@ def get_required(self, key_name):

def get(self, key_name, default_val=None):
"""
Get the value in self.definition from a slash-separated key
Get the value in self.definition from a slash-separated key.
:param key_name: slash-separated key name
:param default_val: value to be used if key is not found
:return: The value. If not found, returns None
"""
definition = self.definition

for key in key_name.split('/'):
for key in key_name.split("/"):
if key not in definition:
return default_val
definition = definition[key]
Expand All @@ -56,15 +61,15 @@ def get(self, key_name, default_val=None):

def has(self, key_name):
definition = self.definition
for key in key_name.split('/'):
for key in key_name.split("/"):
if key not in definition:
return False
definition = definition[key]
return True

@staticmethod
def register_preset(cls):
if cls.NAME == '':
msg = 'NAME not defined for class {0}'.format(cls.__name__)
if cls.NAME == "":
msg = "NAME not defined for class {0}".format(cls.__name__)
raise ValueError(msg)
Action.actions[cls.NAME] = cls
61 changes: 61 additions & 0 deletions mofpy/mofpy/action/moveit_named_target.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import rclpy
import rclpy.logging

from .action import Action
from ..move_group_utils import MoveGroupUtils
from ..shared import Shared


class MoveitNamedTarget(Action):
"""
Control the movement towards a named target pose stored in MoveIt.
:type __moveit: MoveitPy
:type __planner: PlanningComponent
:type __executor: TrajectoryExecutionManager
"""

NAME = "moveit_named_target"

def __init__(self, definition, node):
super(MoveitNamedTarget, self).__init__(definition, node)
Action.actions[self.__class__.NAME] = self.__class__

self.__moveit = MoveGroupUtils.moveit
self.__planner = MoveGroupUtils.planner
self.__executor = MoveGroupUtils.executor
self.__target_name = self.get_required("target_name")

def execute(self, named_joy=None):
rclpy.logging.get_logger("mofpy.MoveitNamedTarget").error("Hello")
if Shared.get("move_group_disabled"):
msg = "move_group disabled; not executing: {0} {1}".format(
self.__action, self.__target_name
)
rclpy.logging.get_logger("mofpy.MoveitNamedTarget").error(msg)
return

self.__move__()

def __move__(self):
rclpy.logging.get_logger("mofpy.MoveitNamedTarget").info(
"Moving to {0}".format(self.__target_name)
)
self.__planner.set_start_state_to_current_state()

if self.__target_name not in self.__planner.named_target_states:
rclpy.logging.get_logger("mofpy.MoveitNamedTarget").error(
"could not find the named target '{}'. Please select from the list '{}'".format(
self.__target_name, self.__planner.named_target_states
)
)
return

self.__planner.set_goal_state(self.__target_name)
plan = self.__planner.plan()

if plan:
self.__moveit.execute(plan.trajectory, controllers=[])


Action.register_preset(MoveitNamedTarget)
145 changes: 145 additions & 0 deletions mofpy/mofpy/action/moveit_servo_joint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import threading

from control_msgs.msg import JointJog
from moveit_msgs.srv import ServoCommandType
import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rclpy.qos import QoSProfile

from .action import Action
from ..move_group_utils import MoveGroupUtils
from ..shared import Shared


class MoveitServoJoint(Action):

NAME = "moveit_servo_joint"

def __init__(self, definition, node: Node):
super(MoveitServoJoint, self).__init__(definition, node)
Action.actions[self.__class__.NAME] = self.__class__

self.__frame_id = self.get("frame_id", "base_link")
self.__scale = self.get("scale", 0.1)
self.__quiet_on_zero = self.get("quiet_on_zero", True)
self.__joint_mapping = self.__mapping__("joints")
self.__axis_mapping = self.__mapping__("axes")
self.__published_zero = False

self.__pub = node.create_publisher(
JointJog, MoveGroupUtils.servo_node_name + "/delta_joint_cmds", QoSProfile(depth=10)
)
client_node = Node(node.get_name() + "_moveit_servo_joint")
self.__client = client_node.create_client(
ServoCommandType, MoveGroupUtils.servo_node_name + "/switch_command_type"
)

self.executor = SingleThreadedExecutor()
self.executor.add_node(client_node)
self.executor_thread = threading.Thread(target=self.executor.spin, daemon=True)
self.executor_thread.start()

self.__moveit = MoveGroupUtils.moveit
robot_model = self.__moveit.get_robot_model()
group_name = MoveGroupUtils.planner.planning_group_name
self.__joint_model_names = robot_model.get_joint_model_group(group_name).joint_model_names

def execute(self, named_joy=None):
if Shared.get("move_group_disabled"):
return

if Shared.get("moveit_servo_command_type") != ServoCommandType.Request.JOINT_JOG:
if not self.__servo_init__(self.node):
rclpy.logging.get_logger("mofpy.MoveitServoJoint").error(
"Failed to initialize servo command type"
)

jog_joints, is_quiet = self.__get_jog_joints__(named_joy["buttons"], named_joy["axes"])

if self.__quiet_on_zero:
if is_quiet:
# Publish the all-zero message just once
if not self.__published_zero:
self.__pub.publish(jog_joints)
self.__published_zero = True
return

self.__pub.publish(jog_joints)
self.__published_zero = False

def __servo_init__(self, node: Node):
while not self.__client.wait_for_service(1):
continue

req = ServoCommandType.Request()
req.command_type = ServoCommandType.Request.JOINT_JOG
rclpy.logging.get_logger("moveit_servo_joint_jog").info(
"call service {}".format(self.__client.service_name)
)

self.future = self.__client.call_async(req)
self.executor.spin_until_future_complete(future=self.future, timeout_sec=1)

res: ServoCommandType.Response = self.future.result()

if not res:
rclpy.logging.get_logger("moveit_servo_joint_jog").error(
"failed to get response from {}".format(self.__client.service_name)
)
return False

rclpy.logging.get_logger("moveit_servo_joint_jog").info(
"get response from {}".format(self.__client.service_name)
)
if res.success:
Shared.update("moveit_servo_command_type", ServoCommandType.Request.JOINT_JOG)

return res.success

def __get_jog_joints__(self, named_buttons, named_axes):
msg = JointJog()
msg.header.stamp = self.node.get_clock().now().to_msg()
msg.header.frame_id = self.__frame_id
for joint in self.__joint_mapping:
if joint in self.__joint_model_names:
v = self.__get_value__("plus", named_axes) - self.__get_value__("minus", named_axes)
vel = self.__scale * self.__get_enable_joint__(joint, named_buttons) * v
msg.joint_names.append(joint)
msg.velocities.append(vel)

is_quiet = all(val == 0 for val in msg.velocities)
return msg, is_quiet

def __mapping__(self, key=None):
mapping_key = "mapping" + f"/{key}" if key else key
params = self.get(mapping_key, {})
mapping = {}
for key in params.keys():
val = params[key]
mapping[key] = val

return mapping

def __get_enable_joint__(self, joint_name, named_buttons):
if joint_name not in self.__joint_mapping:
return 0

button = self.__joint_mapping[joint_name]
return 1 if named_buttons[button].value else 0

def __get_value__(self, axis, named_axes):
"""
Extract the axis/buttons value from joy.
:param axis: one of +, - to get the value of
:param named_axes: the processed joy values to get the value from
:return: the value
"""
if axis not in self.__axis_mapping:
return 0

return named_axes[self.__axis_mapping[axis]].value


Action.register_preset(MoveitServoJoint)
Loading

0 comments on commit b1ec3e8

Please sign in to comment.