Подглава 11.4.2 Понимание скрипта Follower

Let's now take a look at the follower code.

Link to source: follower.py

#!/usr/bin/env python


import rospy
from roslib import message
from sensor_msgs import point_cloud2
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import Twist
from math import copysign

class Follower():
    def __init__(self):
        rospy.init_node("follower")
        
        # Вызовем функцию выключения (остановим робота)
        rospy.on_shutdown(self.shutdown)
        
        # Размеры (в метрах) области, в которой мы будем искать человека. 
        # Они задаются в координатах камеры, где x-влево/вправо,
        # y-вверх/вниз, а z-глубина (вперед/назад)
        self.min_x = rospy.get_param("~min_x", -0.2)
        self.max_x = rospy.get_param("~max_x", 0.2)
        self.min_y = rospy.get_param("~min_y", -0.3)
        self.max_y = rospy.get_param("~max_y", 0.5)
        self.max_z = rospy.get_param("~max_z", 1.2)
        
        # Целевое расстояние (в метрах), которое должно держаться между роботом и человеком
        self.goal_z = rospy.get_param("~goal_z", 0.6)
        
        # Максимальная разницы между целевым и реальным расстоянем до цели до того, как робот отреагирует
        self.z_threshold = rospy.get_param("~z_threshold", 0.05)
        
        # Насколько далеко от центра (смещение x) может находиться человек 
        # до того, как робот отреагирует
        self.x_threshold = rospy.get_param("~x_threshold", 0.05)
        
        # Как мы взвешиваем целевое расстояние, когда совершаем движение
        self.z_scale = rospy.get_param("~z_scale", 1.0)

        # Как мы взвешивем отклонение человека влево/вправо, когда совершаем движение  
        self.x_scale = rospy.get_param("~x_scale", 2.5)
        
        # Максимальная скорость вращения в рад/с
        self.max_angular_speed = rospy.get_param("~max_angular_speed", 2.0)
        
        # Минимальная скорость вращения в рад/с
        self.min_angular_speed = rospy.get_param("~min_angular_speed", 0.0)
        
        # Максимальная линейная скорость в м/c
        self.max_linear_speed = rospy.get_param("~max_linear_speed", 0.3)
        
        # Минимальная линейная скорость в м/c
        self.min_linear_speed = rospy.get_param("~min_linear_speed", 0.1)
        
        # Фактор замедления при остановке
        self.slow_down_factor = rospy.get_param("~slow_down_factor", 0.8)
        
        # Инициализируем команду перемещения
        self.move_cmd = Twist()

        # Publisher для управления движения робота
        self.cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=5)

        # Подписка на облако точек
        self.depth_subscriber = rospy.Subscriber('point_cloud', PointCloud2, self.set_cmd_vel, queue_size=1)

        rospy.loginfo("Subscribing to point cloud...")
        
        # Ждем топик pointcloud, чтобы он стал доступным
        rospy.wait_for_message('point_cloud', PointCloud2)

        rospy.loginfo("Ready to follow!")
        
    def set_cmd_vel(self, msg):
        # Инициализация счетчика точек координат центроида
        x = y = z = n = 0

        # Считаем в координатах x, y, z все точки в облаке
        for point in point_cloud2.read_points(msg, skip_nans=True):
            pt_x = point[0]
            pt_y = point[1]
            pt_z = point[2]
            
            # Учитываем только эти точки в пределах наших обозначенных границ и суммируем их
            if -pt_y > self.min_y and -pt_y < self.max_y and pt_x < self.max_x and pt_x > self.min_x and pt_z < self.max_z:
                x += pt_x
                y += pt_y
                z += pt_z
                n += 1
        
       # Если у нас есть точки, вычислим координаты центроида
        if n:
            x /= n 
            y /= n 
            z /= n
            
            # Проверим наши предельные отклонения
            if (abs(z - self.goal_z) > self.z_threshold):
                # Вычислим угловую составляющую движения
                linear_speed = (z - self.goal_z) * self.z_scale
                
                # Убедимся, что мы соответствуем нашим минимальным/максимальным характеристикам
                self.move_cmd.linear.x = copysign(max(self.min_linear_speed, 
                                        min(self.max_linear_speed, abs(linear_speed))), linear_speed)
            else:
                self.move_cmd.linear.x *= self.slow_down_factor
                
            if (abs(x) > self.x_threshold):     
                # Вычислим линейную составляющую движения
                angular_speed = -x * self.x_scale
                
                # Убедимся, что мы соответствуем нашим минимальным/максимальным характеристикам
                self.move_cmd.angular.z = copysign(max(self.min_angular_speed, 
                                        min(self.max_angular_speed, abs(angular_speed))), angular_speed)
            else:
                # Плавно остановим вращение
                self.move_cmd.angular.z *= self.slow_down_factor
                
        else:
            # Плавно остановим робота
            self.move_cmd.linear.x *= self.slow_down_factor
            self.move_cmd.angular.z *= self.slow_down_factor
            
        # Опубликуем команду перемещения
        self.cmd_vel_pub.publish(self.move_cmd)

        
    def shutdown(self):
        rospy.loginfo("Stopping the robot...")
        
        # Отмените регистрацию подписчика, чтобы остановить публикацию cmd_vel
        self.depth_subscriber.unregister()
        rospy.sleep(1)
        
        # Отправим пустое Twist-сообщение, чтобы остановить робота
        self.cmd_vel_pub.publish(Twist())
        rospy.sleep(1)        
                   
if __name__ == '__main__':
    try:
        Follower()
        rospy.spin()
    except rospy.ROSInterruptException:
        rospy.loginfo("Follower node terminated.")

The overall strategy behind the script is fairly simple. First, sample all the points in the depth cloud that lie within a search box in front of the robot. From those points, compute the centroid of the region; i.e. the average x, y and z value for all the points. If there is a person in front of the robot, the z-coordinate of the centroid tells us how far away they are and the x-coordinate reflects whether they are to the right or left. From these two numbers we can compute an appropriate Twist message to keep the robot near the person

Общая стратегия, лежащая в основе скрипта, довольно проста. Во-первых, проверим все точки в облаке глубины, которые лежат в поле поиска перед роботом. Из этих точек вычислим центроид области, т. е. среднее значение x, y и z для всех точек. Если перед роботом находится человек, то z-координата центроида говорит нам, как далеко они находятся, а X-координата отражает, находятся ли они справа или слева. Из этих двух чисел мы можем вычислить соответствующее Twist-сообщение, чтобы держать робота рядом с человеком.

Давайте теперь посмотрим на ключевые строки сценария.

from roslib import message
from sensor_msgs import point_cloud2
from sensor_msgs.msg import PointCloud2

Чтобы получить доступ к точкам в облаке глубины, нам нужен класс message из roslib и библиотека point_cloud2 из пакета Ros sensor_msgs. Нам также нужен тип сообщения PointCloud2.

Длинный список параметров должен быть достаточно понятным из комментариев в коде. Сердцем скрипта является обратный вызов set_cmd_vel() в топике облака точек:

self.depth_subscriber = rospy.Subscriber('point_cloud', PointCloud2, self.set_cmd_vel, queue_size=1)

Обратите внимание, как мы используем общее имя топика ('point_cloud') в операторе подписчика. Это позволяет нам переназначить топик облака в launch-файле. Обычно мы будем использовать топик /camera/depth_registered / points, но если мы предварительно отфильтруем облако, мы можем использовать другую тему.

    def set_cmd_vel(self, msg):
        # Инициализация счетчика точек координат центроида
        x = y = z = n = 0

Каждый раз, когда мы получаем сообщение в топике point_cloud, мы начинаем обратный вызов set_cmd_vel, обнуляя координаты центроида и количество точек.

        for point in point_cloud2.read_points(msg, skip_nans=True):
            pt_x = point[0]
            pt_y = point[1]
            pt_z = point[2]

Here we use the point_cloud2 library to cycle through all the points in the cloud. The skip_nans parameter is handy since a NaN (not a number) can occur when the point is inside or outside the camera's depth range.

Здесь мы используем библиотеку point_cloud2 для перебора всех точек в облаке. Параметр skip_nans нужен, так как NaN (не число) может возникать, когда точка находится внутри или вне диапазона глубины камеры.

            if -pt_y > self.min_y and -pt_y < self.max_y and pt_x < self.max_x and pt_x > self.min_x and pt_z < self.max_z:
                x += pt_x
                y += pt_y
                z += pt_z
                n += 1

Для каждой точки в облачном сообщении мы проверяем, попадает ли она в область поиска. Если это так, добавим его координаты x, y и z к суммам центроидов и увеличьте количество точек.

        self.move_cmd = Twist()

Инициализируем команду перемещения в пустое сообщение Twist, которое по умолчанию остановит робота.

        if n:
            x /= n 
            y /= n 
            z /= n

Предполагая, что мы нашли по крайней мере одну точку, не являющуюся NaN, мы вычисляем координаты центроида путем деления на количество точек. Если перед роботом находится человек, то эти координаты должны дать нам приблизительное представление о том, как далеко они находятся и находятся ли они слева или справа.

            if (abs(z - self.goal_z) > self.z_threshold):
                # Вычислим угловую составляющую движения
                linear_speed = (z - self.goal_z) * self.z_scale
                
                # Убедимся, что мы соответствуем нашим минимальным/максимальным характеристикам
                self.move_cmd.linear.x = copysign(max(self.min_linear_speed, 
                                        min(self.max_linear_speed, abs(linear_speed))), linear_speed)
            else:
                self.move_cmd.linear.x *= self.slow_down_factor
                
            if (abs(x) > self.x_threshold):     
                # Вычислим линейную составляющую движения
                angular_speed = -x * self.x_scale
                
                # Убедимся, что мы соответствуем нашим минимальным/максимальным характеристикам
                self.move_cmd.angular.z = copysign(max(self.min_angular_speed, 
                                        min(self.max_angular_speed, abs(angular_speed))), angular_speed)

Если человек находится ближе или дальше, чем целевое расстояние, более чем на z_threshold, или смещение влево/вправо превышает x_threshold, установим линейную и угловую скорость робота соответствующим образом, взвешивая каждый из параметров z_scale и x_scale.

        self.cmd_vel_pub.publish(self.move_cmd)

Наконец, опубликуем команду движения для перемещения (или остановки) робота.

Last updated