Pb (bug?) with custom ROS2 service
Posted: 20 Nov 2024, 16:36
Hi,
My goal is to create a ROS2 service to get the image from a vision sensor. To do this, I follow these steps :
1/ Publish a ROS2 message Image from an PNG file
And I can see this image with :
This code validates that I can read a PNG image and send it as a sensor_msgs.msg.Image message.
Scene : https://drive.google.com/file/d/1bRa8gy ... sp=sharing
2/ Create a ROS2 service with a custom message based on Image
I defined the MyImage service message in a tutorial_interfaces ROS2 package :
MyImage.srv file
Now, I test that I can use this new message to define a ROS2 service with this image_server.py program :
and test it with this client.py file :
And I can see the image so it prove that I can create a service with this new MyImage message (all is OK with ROS2).
3/ Coppelia + ROS2 MyImage service
I have this Coppelia python script :
Scene : https://drive.google.com/file/d/1tSr289 ... sp=sharing
Now I test the service :
Here's the result :
So, why do I have this serialization error and not in the case of Topic.
Regrads,
Philippe
My goal is to create a ROS2 service to get the image from a vision sensor. To do this, I follow these steps :
1/ Publish a ROS2 message Image from an PNG file
Code: Select all
import rclpy
from rclpy.node import Node
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
class Publisher(Node):
def __init__(self):
super().__init__('publisher')
self.publisher = self.create_publisher(Image, 'image', 1)
def publish(self):
cv_image = cv2.imread('cylinder.png')
# Convert OpenCV image to ROS Image message
bridge = CvBridge()
ros_image = bridge.cv2_to_imgmsg(cv_image, encoding='bgr8')
self.publisher.publish(ros_image)
def sysCall_init():
sim = require('sim')
rclpy.init()
self.publisher_node = Publisher()
def sysCall_sensing():
self.publisher_node.publish()
rclpy.spin_once(self.publisher_node, timeout_sec=0)
def sysCall_cleanup():
self.publisher_node.destroy_node()
rclpy.shutdown()
Code: Select all
ros2 run image_view image_view --ros-args --remap image:=/image
Scene : https://drive.google.com/file/d/1bRa8gy ... sp=sharing
2/ Create a ROS2 service with a custom message based on Image
I defined the MyImage service message in a tutorial_interfaces ROS2 package :
MyImage.srv file
Code: Select all
---
sensor_msgs/Image image
Code: Select all
import rclpy
from rclpy.node import Node
from tutorial_interfaces.srv import MyImage
from sensor_msgs.msg import Image
from std_msgs.msg import Header
from builtin_interfaces.msg import Time
import cv2
from cv_bridge import CvBridge
import os
class ImageServer(Node):
def __init__(self):
super().__init__('image_server')
self.srv = self.create_service(MyImage, 'vision_sensor', self.service_callback)
self.bridge = CvBridge()
# Specify the path to your image file
self.image_path = 'cylinder.jpg' # Update this path
def service_callback(self, request, response):
self.get_logger().info('Incoming request for image')
# Read the image using OpenCV
if not os.path.exists(self.image_path):
self.get_logger().error(f'Image file not found: {self.image_path}')
return response
cv_image = cv2.imread(self.image_path)
if cv_image is None:
self.get_logger().error('Failed to read image')
return response
# Convert OpenCV image to ROS Image message
ros_image = self.bridge.cv2_to_imgmsg(cv_image, encoding='bgr8')
# Populate the response
response.image = ros_image
return response
def main(args=None):
rclpy.init(args=args)
image_server = ImageServer()
rclpy.spin(image_server)
image_server.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Code: Select all
# image_service/image_service/image_client.py
import rclpy
from rclpy.node import Node
from tutorial_interfaces.srv import MyImage
from cv_bridge import CvBridge
import cv2
class ImageClient(Node):
def __init__(self):
super().__init__('image_client')
self.cli = self.create_client(MyImage, 'vision_sensor')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for service...')
self.req = MyImage.Request()
self.bridge = CvBridge()
def send_request(self):
future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
self.get_logger().info('Received image response')
ros_image = future.result().image
cv_image = self.bridge.imgmsg_to_cv2(ros_image, desired_encoding='bgr8')
cv2.imshow('Received Image', cv_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
else:
self.get_logger().error('Service call failed')
def main(args=None):
rclpy.init(args=args)
image_client = ImageClient()
image_client.send_request()
image_client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
3/ Coppelia + ROS2 MyImage service
I have this Coppelia python script :
Code: Select all
# Ensure that this script is added as a threaded child script in CoppeliaSim
# Set the script's interpreter to Python
from sensor_msgs.msg import Image
import numpy as np
import cv2
from cv_bridge import CvBridge
def sysCall_init():
sim = require('sim')
simROS2 = require('simROS2')
self.visionSensor = sim.getObject('/visionSensor')
global srv
sim.addLog(sim.verbosity_msgs, 'Thread started')
# Create the ROS2 service
srv = simROS2.createService('/vision_sensor', 'tutorial_interfaces/srv/MyImage', srv_callback)
def sysCall_thread():
while not sim.getSimulationStopping():
# Keep the thread alive
sim.wait(0.01) # Wait for 10ms before the next iteration
def srv_callback(req):
sim.addLog(sim.verbosity_msgs, f'Service called: {req}')
# Create the response
resp = simROS2.createInterface('tutorial_interfaces/srv/MyImageResponse')
print('==================')
bridge = CvBridge()
cv_image = cv2.imread('cylinder.png')
# Convert OpenCV image to ROS Image message
bridge = CvBridge()
ros_image = bridge.cv2_to_imgmsg(cv_image, encoding='bgr8')
resp['image'] = ros_image
print('image generated')
return resp
def sysCall_cleanup():
# Shutdown the ROS2 service
simROS2.shutdownService(srv)
sim.addLog(sim.verbosity_msgs, 'Thread finished')
Now I test the service :
Code: Select all
ros2 service call /vision_sensor tutorial_interfaces/srv/MyImage
Code: Select all
[/scriptCamera:info] Service called: {}
==================
image generated
[/scriptCamera:warning] Received non-serializable data: {'image': sensor_msgs.msg.Image(header=std_msgs.msg.Header(stamp=builtin_interfaces.msg.Time(sec=0, nanosec=0), frame_id=''), height=100, width=100, encoding='bgr8', is_bigendian=0, step=300, data=[198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198, 206, 217, 198,
Regrads,
Philippe