#!/usr/bin/env pythonimport rospyfrom roslib import messagefrom sensor_msgs import point_cloud2from sensor_msgs.msg import PointCloud2from geometry_msgs.msg import Twistfrom math import copysignclassFollower():def__init__(self): rospy.init_node("follower")# Вызовем функцию выключения (остановим робота) rospy.on_shutdown(self.shutdown)# Размеры (в метрах) области, в которой мы будем искать человека. # Они задаются в координатах камеры, где x-влево/вправо,# y-вверх/вниз, а z-глубина (вперед/назад) self.min_x = rospy.get_param("~min_x", -0.2) self.max_x = rospy.get_param("~max_x", 0.2) self.min_y = rospy.get_param("~min_y", -0.3) self.max_y = rospy.get_param("~max_y", 0.5) self.max_z = rospy.get_param("~max_z", 1.2)# Целевое расстояние (в метрах), которое должно держаться между роботом и человеком self.goal_z = rospy.get_param("~goal_z", 0.6)# Максимальная разницы между целевым и реальным расстоянем до цели до того, как робот отреагирует self.z_threshold = rospy.get_param("~z_threshold", 0.05)# Насколько далеко от центра (смещение x) может находиться человек # до того, как робот отреагирует self.x_threshold = rospy.get_param("~x_threshold", 0.05)# Как мы взвешиваем целевое расстояние, когда совершаем движение self.z_scale = rospy.get_param("~z_scale", 1.0)# Как мы взвешивем отклонение человека влево/вправо, когда совершаем движение self.x_scale = rospy.get_param("~x_scale", 2.5)# Максимальная скорость вращения в рад/с self.max_angular_speed = rospy.get_param("~max_angular_speed", 2.0)# Минимальная скорость вращения в рад/с self.min_angular_speed = rospy.get_param("~min_angular_speed", 0.0)# Максимальная линейная скорость в м/c self.max_linear_speed = rospy.get_param("~max_linear_speed", 0.3)# Минимальная линейная скорость в м/c self.min_linear_speed = rospy.get_param("~min_linear_speed", 0.1)# Фактор замедления при остановке self.slow_down_factor = rospy.get_param("~slow_down_factor", 0.8)# Инициализируем команду перемещения self.move_cmd =Twist()# Publisher для управления движения робота self.cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=5)# Подписка на облако точек self.depth_subscriber = rospy.Subscriber('point_cloud', PointCloud2, self.set_cmd_vel, queue_size=1) rospy.loginfo("Subscribing to point cloud...")# Ждем топик pointcloud, чтобы он стал доступным rospy.wait_for_message('point_cloud', PointCloud2) rospy.loginfo("Ready to follow!")defset_cmd_vel(self,msg):# Инициализация счетчика точек координат центроида x = y = z = n =0# Считаем в координатах x, y, z все точки в облакеfor point in point_cloud2.read_points(msg, skip_nans=True): pt_x = point[0] pt_y = point[1] pt_z = point[2]# Учитываем только эти точки в пределах наших обозначенных границ и суммируем ихif-pt_y > self.min_y and-pt_y < self.max_y and pt_x < self.max_x and pt_x > self.min_x and pt_z < self.max_z: x += pt_x y += pt_y z += pt_z n +=1# Если у нас есть точки, вычислим координаты центроидаif n: x /= n y /= n z /= n# Проверим наши предельные отклоненияif (abs(z - self.goal_z)> self.z_threshold):# Вычислим угловую составляющую движения linear_speed = (z - self.goal_z) * self.z_scale# Убедимся, что мы соответствуем нашим минимальным/максимальным характеристикам self.move_cmd.linear.x =copysign(max(self.min_linear_speed, min(self.max_linear_speed, abs(linear_speed))), linear_speed)else: self.move_cmd.linear.x *= self.slow_down_factorif (abs(x)> self.x_threshold):# Вычислим линейную составляющую движения angular_speed =-x * self.x_scale# Убедимся, что мы соответствуем нашим минимальным/максимальным характеристикам self.move_cmd.angular.z =copysign(max(self.min_angular_speed, min(self.max_angular_speed, abs(angular_speed))), angular_speed)else:# Плавно остановим вращение self.move_cmd.angular.z *= self.slow_down_factorelse:# Плавно остановим робота self.move_cmd.linear.x *= self.slow_down_factor self.move_cmd.angular.z *= self.slow_down_factor# Опубликуем команду перемещения self.cmd_vel_pub.publish(self.move_cmd)defshutdown(self): rospy.loginfo("Stopping the robot...")# Отмените регистрацию подписчика, чтобы остановить публикацию cmd_vel self.depth_subscriber.unregister() rospy.sleep(1)# Отправим пустое Twist-сообщение, чтобы остановить робота self.cmd_vel_pub.publish(Twist()) rospy.sleep(1)if__name__=='__main__':try:Follower() rospy.spin()except rospy.ROSInterruptException: rospy.loginfo("Follower node terminated.")
The overall strategy behind the script is fairly simple. First, sample all the points in the
depth cloud that lie within a search box in front of the robot. From those points,
compute the centroid of the region; i.e. the average x, y and z value for all the points. If
there is a person in front of the robot, the z-coordinate of the centroid tells us how far
away they are and the x-coordinate reflects whether they are to the right or left. From
these two numbers we can compute an appropriate Twist message to keep the robot
near the person
Общая стратегия, лежащая в основе скрипта, довольно проста. Во-первых, проверим все точки в облаке глубины, которые лежат в поле поиска перед роботом. Из этих точек вычислим центроид области, т. е. среднее значение x, y и z для всех точек. Если перед роботом находится человек, то z-координата центроида говорит нам, как далеко они находятся, а X-координата отражает, находятся ли они справа или слева. Из этих двух чисел мы можем вычислить соответствующее Twist-сообщение, чтобы держать робота рядом с человеком.
Давайте теперь посмотрим на ключевые строки сценария.
from roslib import messagefrom sensor_msgs import point_cloud2from sensor_msgs.msg import PointCloud2
Чтобы получить доступ к точкам в облаке глубины, нам нужен класс message из roslib и библиотека point_cloud2 из пакета Ros sensor_msgs. Нам также нужен тип сообщения PointCloud2.
Длинный список параметров должен быть достаточно понятным из комментариев в коде. Сердцем скрипта является обратный вызов set_cmd_vel() в топике облака точек:
Обратите внимание, как мы используем общее имя топика ('point_cloud') в операторе подписчика. Это позволяет нам переназначить топик облака в launch-файле. Обычно мы будем использовать топик /camera/depth_registered / points, но если мы предварительно отфильтруем облако, мы можем использовать другую тему.
defset_cmd_vel(self,msg):# Инициализация счетчика точек координат центроида x = y = z = n =0
Каждый раз, когда мы получаем сообщение в топике point_cloud, мы начинаем обратный вызов set_cmd_vel, обнуляя координаты центроида и количество точек.
for point in point_cloud2.read_points(msg, skip_nans=True): pt_x = point[0] pt_y = point[1] pt_z = point[2]
Here we use the point_cloud2 library to cycle through all the points in the cloud. The skip_nans parameter is handy since a NaN (not a number) can occur when the point is inside or outside the camera's depth range.
Здесь мы используем библиотеку point_cloud2 для перебора всех точек в облаке. Параметр skip_nans нужен, так как NaN (не число) может возникать, когда точка находится внутри или вне диапазона глубины камеры.
if-pt_y > self.min_y and-pt_y < self.max_y and pt_x < self.max_x and pt_x > self.min_x and pt_z < self.max_z: x += pt_x y += pt_y z += pt_z n +=1
Для каждой точки в облачном сообщении мы проверяем, попадает ли она в область поиска. Если это так, добавим его координаты x, y и z к суммам центроидов и увеличьте количество точек.
self.move_cmd =Twist()
Инициализируем команду перемещения в пустое сообщение Twist, которое по умолчанию остановит робота.
if n: x /= n y /= n z /= n
Предполагая, что мы нашли по крайней мере одну точку, не являющуюся NaN, мы вычисляем координаты центроида путем деления на количество точек. Если перед роботом находится человек, то эти координаты должны дать нам приблизительное представление о том, как далеко они находятся и находятся ли они слева или справа.
if (abs(z - self.goal_z)> self.z_threshold):# Вычислим угловую составляющую движения linear_speed = (z - self.goal_z) * self.z_scale# Убедимся, что мы соответствуем нашим минимальным/максимальным характеристикам self.move_cmd.linear.x =copysign(max(self.min_linear_speed, min(self.max_linear_speed, abs(linear_speed))), linear_speed)else: self.move_cmd.linear.x *= self.slow_down_factorif (abs(x)> self.x_threshold):# Вычислим линейную составляющую движения angular_speed =-x * self.x_scale# Убедимся, что мы соответствуем нашим минимальным/максимальным характеристикам self.move_cmd.angular.z =copysign(max(self.min_angular_speed, min(self.max_angular_speed, abs(angular_speed))), angular_speed)
Если человек находится ближе или дальше, чем целевое расстояние, более чем на z_threshold, или смещение влево/вправо превышает x_threshold, установим линейную и угловую скорость робота соответствующим образом, взвешивая каждый из параметров z_scale и x_scale.
self.cmd_vel_pub.publish(self.move_cmd)
Наконец, опубликуем команду движения для перемещения (или остановки) робота.