Подглава 11.2.3 Понимание кода отслеживания объектов

Прежде чем запускать приложение отслеживания на реальном роботе, давайте взглянем на код

Ссылка на источник: object_tracker.py

#!/usr/bin/env python
 
 
import rospy
from sensor_msgs.msg import RegionOfInterest, CameraInfo
from geometry_msgs.msg import Twist
import thread
 
class ObjectTracker():
    def __init__(self):
        rospy.init_node("object_tracker")
               
        # Вызовем функцию выключения (остановите робота)
        rospy.on_shutdown(self.shutdown)
      
        # Как часто мы должны обновлять движение робота?
        self.rate = rospy.get_param("~rate", 10)
        r = rospy.Rate(self.rate)
       
        # Максимальная скорость вращения в рад/с
        self.max_rotation_speed = rospy.get_param("~max_rotation_speed", 2.0)
       
        # Минимальная скорость вращения в рад/с
        self.min_rotation_speed = rospy.get_param("~min_rotation_speed", 0.5)
       
        # Чувствительность к целевым смещениям. Установка слишком высокого 
        # уровня может привести к колебаниям робота. 
        self.gain = rospy.get_param("~gain", 2.0)
      
        # x threshold (%от ширины изображения) указывает, как
        # далеко от центра ROI должен быть в направлении x,
        # прежде чем мы отреагируем
        self.x_threshold = rospy.get_param("~x_threshold", 0.1)
 
        # Publisher для управления движения робота
        self.cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=5)
       
        # Инициализируем команду перемещения
        self.move_cmd = Twist()
       
        # Получаем блокировку для обновлений значений self.move_cmd
        self.lock = thread.allocate_lock()
       
        # Получим ширину и высоту изображения из топика camera_info
        self.image_width = 0
        self.image_height = 0
       
        # Установим флаг, указывающий, когда ROI прекращает обновления
        self.target_visible = False
       
        # Ждем пока топик camera_info станет доступным
        rospy.loginfo("Waiting for camera_info topic...")
        rospy.wait_for_message('camera_info', CameraInfo)
       
        # Подпишемся на топик camera_info, чтобы получить ширину и высоту изображения
        rospy.Subscriber('camera_info', CameraInfo, self.get_camera_info, queue_size=1)
 
        # Подождем, пока действительно не получим данные с камеры 
        while self.image_width == 0 or self.image_height == 0:
            rospy.sleep(1)
                   
        # Подпишемся на топик ROI и установим обратный вызов(callback) для обновления движения робота
        rospy.Subscriber('roi', RegionOfInterest, self.set_cmd_vel, queue_size=1)
       
        # Ждем пока не получим сообщений от ROI
        rospy.loginfo("Waiting for messages on /roi...")
        rospy.wait_for_message('roi', RegionOfInterest)
       
        rospy.loginfo("ROI messages detected. Starting tracker...")
       
        # Начнем цикл отслеживания
        while not rospy.is_shutdown():
            # Захватим замок, пока устанавливаем скорость робота
            self.lock.acquire()
           
            try:
                # Если цель не видна, остановим робота
                if not self.target_visible:
                    self.move_cmd = Twist()
                else:
                    # Сбросим флаг на False по умолчанию
                    self.target_visible = False
                   
                # Отправим команду Twist(поворота) роботу
                self.cmd_vel_pub.publish(self.move_cmd)
               
            finally:
                # Снимем блокировку
                self.lock.release()
               
            # Сон на 1/self.rate секунд
            r.sleep()
 
    def set_cmd_vel(self, msg):
        # Захватим замок, пока устанавливаем скорость робота
        self.lock.acquire()
       
        try:
            # Если ROI имеет ширину или высоту 0, мы потеряли цель
            if msg.width == 0 or msg.height == 0:
                self.target_visible = False
                return
           
            # Если ROI перестанет обновлять, следующее не произойдет
            self.target_visible = True
   
            # Вычислим смещение ROI от центра изображения
            target_offset_x = msg.x_offset + msg.width / 2 - self.image_width / 2
   
            try:
                percent_offset_x = float(target_offset_x) / (float(self.image_width) / 2.0)
            except:
                percent_offset_x = 0
   
            # Вращаем робота только в том случае, если смещение цели 
            # превышает пороговое значение
            if abs(percent_offset_x) > self.x_threshold:
                # Установим скорость вращения, 
                # пропорциональную перемещению мишени           
     		 try:
                    speed = self.gain * percent_offset_x
                    if speed < 0:
                        direction = -1
                    else:
                        direction = 1
                    self.move_cmd.angular.z = -direction * max(self.min_rotation_speed,
                                                min(self.max_rotation_speed, abs(speed)))
                except:
                    self.move_cmd = Twist()
            else:
                # Иначе остановим робота
                self.move_cmd = Twist()
 
        finally:
            # Снимем блокировку
            self.lock.release()
 
    def get_camera_info(self, msg):
        self.image_width = msg.width
        self.image_height = msg.height
 
    def shutdown(self):
        rospy.loginfo("Stopping the robot...")
        self.cmd_vel_pub.publish(Twist())
        rospy.sleep(1)    
 
if __name__ == '__main__':
    try:
        ObjectTracker()
        rospy.spin()
    except rospy.ROSInterruptException:
        rospy.loginfo("Object tracking node terminated.")

К этому моменту в книге скрипт, вероятно, будет довольно понятным. Однако давайте пробежимся по ключевым линиям. Имейте в виду общую цель приложения tracker: мы хотим отслеживать топик /roi для любого изменения положения цели слева или справа от центра обзора камеры. Затем мы будем поворачивать робота в нужном направлении, чтобы компенсировать это.

        # Максимальная скорость вращения в рад/c
        self.max_rotation_speed = rospy.get_param("~max_rotation_speed", 2.0)
       
        # Минимальная скорость вращения в рад/с
        self.min_rotation_speed = rospy.get_param("~min_rotation_speed", 0.5)

При управлении мобильным роботом всегда полезно установить максимальную скорость. Установка минимальной скорости также может гарантировать, что собственный вес и трение не будут мешать роботу при попытке двигаться слишком медленно.

        self.gain = rospy.get_param("~gain", 2.0)

Большинство циклов обратной связи(feedback loops) требуют параметра усиления для управления тем, как быстро мы хотим, чтобы система реагировала на смещения цели из нейтральной точки. В нашем случае параметр усиления будет определять, насколько быстро робот реагирует на перемещение цели в сторону от центра обзора.

        self.x_threshold = rospy.get_param("~x_threshold", 0.05)

Мы не хотим, чтобы робот разряжал свою батарею, преследуя каждое крошечное движение цели. Поэтому мы устанавливаем пороговое значение горизонтального смещения цели, на которое робот будет реагировать. В этом случае порог задается в процентах (0,05 = 5%) от ширины изображения.

        # Получаем блокировку для обновлений значений self.move_cmd
        self.lock = thread.allocate_lock()

Функции обратного вызова(callback), назначенные подписчикам ROS, выполняются в отдельном потоке от основной программы. Поскольку мы будем изменять скорость вращения робота как в функции обратного вызова ROI, так и в основном цикле программы, нам нужно использовать блокировку, чтобы сделать общий поток скрипта безопасным. Выше мы видим, как реализовать блокировку.

        self.target_visible = False

Если цель потеряна (например, выходит из поля зрения), мы хотим, чтобы робот остановился. Поэтому мы будем использовать флаг, чтобы указать, когда цель видна или нет.

        rospy.Subscriber('camera_info', CameraInfo, self.get_camera_info, queue_size=1)

Вместо того, чтобы жестко закодировать видео в программу, мы можем получить его динамически из соответствующего топика camera_info. Фактическое имя этого топика отображается в файле запуска object_tracker.launch. В случае камеры Kinect или Xtion, управляемой узлом OpenNI, обычно используется имя темы /camera/rgb/camera_info. Функция обратного вызова(callback) self.get_camera_info (определенный позже в скрипте) просто устанавливает глобальные переменные self.image_width и self.image_height из сообщений camera_info.

        rospy.Subscriber('roi', RegionOfInterest, self.set_cmd_vel, queue_size=1)

Здесь мы подписываемся на топик /roi и устанавливаем функцию обратного вызова set_cmd_vel(), которая установит команду Twist для отправки роботу при изменении положения цели.

            # Захватим замок, пока устанавливаем скорость робота
            self.lock.acquire()
           
            try:
                # Если цель не видна, остановим робота
                if not self.target_visible:
                    self.move_cmd = Twist()
                else:
                    # Сбросим флаг на False по умолчанию
                    self.target_visible = False
                   
                # Отправим команду Twist(поворота) роботу
                self.cmd_vel_pub.publish(self.move_cmd)
               
            finally:
                # Снимем блокировку
                self.lock.release()
               
            # Сон на 1/self.rate секунд
            r.sleep()

Это наш основной цикл управления. Сначала мы получаем блокировку для защиты двух глобальных переменных self.move_cmd и self.target_visible, так как обе переменные также могут быть изменены в функции обратного вызова set_cmd_vel (). Затем мы проверяем, видна ли еще цель. Если нет, мы остановим робота, установив команду движения на пустое сообщение Twist. В противном случае мы устанавливаем флаг target_visible обратно в False (безопасное значение по умолчанию), а затем публикуем текущую команду перемещения, заданную в обратном вызове set_cmd_vel (), описанном далее. Наконец, мы освобождаем замок и спим в течение одного цикла.

    def set_cmd_vel(self, msg):
        # Захватим замок, пока устанавливаем скорость робота
        self.lock.acquire()
       
        try:
            # Если ROI имеет ширину или высоту 0, мы потеряли цель
            if msg.width == 0 or msg.height == 0:
                self.target_visible = False
                return
           
            # Если ROI перестанет обновлять, следующее не произойдет
            self.target_visible = True

Обратный вызов set_cmd_vel () срабатывает всякий раз, когда появляется новое сообщение в топике /roi. Первое, что нужно проверить, - это то, что ни ширина, ни высота ROI не равны нулю. Если это так, то цель, вероятно, потеряна, поэтому мы немедленно возвращаемся к главному циклу, который остановит вращение робота. В противном случае мы устанавливаем флаг target_visible в True, чтобы робот реагировал на положение цели.

            target_offset_x = msg.x_offset + msg.width / 2 - self.image_width / 2
   
            try:
                percent_offset_x = float(target_offset_x) / (float(self.image_width) / 2.0)
            except:
                percent_offset_x = 0

Прежде, чем мы сможем определить, как робот должен двигаться, мы вычисляем смещение цели от центра изображения камеры. Напомним, что поле x_offset в сообщении ROI указывает координату x верхнего левого угла области, поэтому для нахождения центра области мы добавляем половину ширины. Затем, чтобы найти смещение от центра изображения, мы вычитаем половину ширины изображения. С помощью смещения, вычисленного в пикселях, мы получаем смещение в виде доли ширины изображения. Блок try-catch гарантирует, что мы поймаем попытку разделить на ноль, которая может произойти, если тема camera_info ошибется и отправит нам значение image_width равное 0.

            if abs(percent_offset_x) > self.x_threshold:
                # Установим скорость вращения, 
                # пропорциональную перемещению мишени           
     		 try:
                    speed = self.gain * percent_offset_x
                    if speed < 0:
                        direction = -1
                    else:
                        direction = 1
                    self.move_cmd.angular.z = -direction * max(self.min_rotation_speed,
                                                min(self.max_rotation_speed, abs(speed)))
                except:
                    self.move_cmd = Twist()
            else:
                # Иначе остановим робота
                self.move_cmd = Twist()

Наконец, мы вычисляем скорость вращения робота, пропорциональную смещению цели, где множитель является параметром self.gain. Если целевое смещение не превышает параметра x_threshold, мы устанавливаем команду движения на пустое сообщение Twist, чтобы остановить робота (или сохранить его остановленным).

Last updated