Подглава 11.2.3 Понимание кода отслеживания объектов
Прежде чем запускать приложение отслеживания на реальном роботе, давайте взглянем на код
Ссылка на источник: object_tracker.py
#!/usr/bin/env python
import rospy
from sensor_msgs.msg import RegionOfInterest, CameraInfo
from geometry_msgs.msg import Twist
import thread
class ObjectTracker():
def __init__(self):
rospy.init_node("object_tracker")
# Вызовем функцию выключения (остановите робота)
rospy.on_shutdown(self.shutdown)
# Как часто мы должны обновлять движение робота?
self.rate = rospy.get_param("~rate", 10)
r = rospy.Rate(self.rate)
# Максимальная скорость вращения в рад/с
self.max_rotation_speed = rospy.get_param("~max_rotation_speed", 2.0)
# Минимальная скорость вращения в рад/с
self.min_rotation_speed = rospy.get_param("~min_rotation_speed", 0.5)
# Чувствительность к целевым смещениям. Установка слишком высокого
# уровня может привести к колебаниям робота.
self.gain = rospy.get_param("~gain", 2.0)
# x threshold (%от ширины изображения) указывает, как
# далеко от центра ROI должен быть в направлении x,
# прежде чем мы отреагируем
self.x_threshold = rospy.get_param("~x_threshold", 0.1)
# Publisher для управления движения робота
self.cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=5)
# Инициализируем команду перемещения
self.move_cmd = Twist()
# Получаем блокировку для обновлений значений self.move_cmd
self.lock = thread.allocate_lock()
# Получим ширину и высоту изображения из топика camera_info
self.image_width = 0
self.image_height = 0
# Установим флаг, указывающий, когда ROI прекращает обновления
self.target_visible = False
# Ждем пока топик camera_info станет доступным
rospy.loginfo("Waiting for camera_info topic...")
rospy.wait_for_message('camera_info', CameraInfo)
# Подпишемся на топик camera_info, чтобы получить ширину и высоту изображения
rospy.Subscriber('camera_info', CameraInfo, self.get_camera_info, queue_size=1)
# Подождем, пока действительно не получим данные с камеры
while self.image_width == 0 or self.image_height == 0:
rospy.sleep(1)
# Подпишемся на топик ROI и установим обратный вызов(callback) для обновления движения робота
rospy.Subscriber('roi', RegionOfInterest, self.set_cmd_vel, queue_size=1)
# Ждем пока не получим сообщений от ROI
rospy.loginfo("Waiting for messages on /roi...")
rospy.wait_for_message('roi', RegionOfInterest)
rospy.loginfo("ROI messages detected. Starting tracker...")
# Начнем цикл отслеживания
while not rospy.is_shutdown():
# Захватим замок, пока устанавливаем скорость робота
self.lock.acquire()
try:
# Если цель не видна, остановим робота
if not self.target_visible:
self.move_cmd = Twist()
else:
# Сбросим флаг на False по умолчанию
self.target_visible = False
# Отправим команду Twist(поворота) роботу
self.cmd_vel_pub.publish(self.move_cmd)
finally:
# Снимем блокировку
self.lock.release()
# Сон на 1/self.rate секунд
r.sleep()
def set_cmd_vel(self, msg):
# Захватим замок, пока устанавливаем скорость робота
self.lock.acquire()
try:
# Если ROI имеет ширину или высоту 0, мы потеряли цель
if msg.width == 0 or msg.height == 0:
self.target_visible = False
return
# Если ROI перестанет обновлять, следующее не произойдет
self.target_visible = True
# Вычислим смещение ROI от центра изображения
target_offset_x = msg.x_offset + msg.width / 2 - self.image_width / 2
try:
percent_offset_x = float(target_offset_x) / (float(self.image_width) / 2.0)
except:
percent_offset_x = 0
# Вращаем робота только в том случае, если смещение цели
# превышает пороговое значение
if abs(percent_offset_x) > self.x_threshold:
# Установим скорость вращения,
# пропорциональную перемещению мишени
try:
speed = self.gain * percent_offset_x
if speed < 0:
direction = -1
else:
direction = 1
self.move_cmd.angular.z = -direction * max(self.min_rotation_speed,
min(self.max_rotation_speed, abs(speed)))
except:
self.move_cmd = Twist()
else:
# Иначе остановим робота
self.move_cmd = Twist()
finally:
# Снимем блокировку
self.lock.release()
def get_camera_info(self, msg):
self.image_width = msg.width
self.image_height = msg.height
def shutdown(self):
rospy.loginfo("Stopping the robot...")
self.cmd_vel_pub.publish(Twist())
rospy.sleep(1)
if __name__ == '__main__':
try:
ObjectTracker()
rospy.spin()
except rospy.ROSInterruptException:
rospy.loginfo("Object tracking node terminated.")
К этому моменту в книге скрипт, вероятно, будет довольно понятным. Однако давайте пробежимся по ключевым линиям. Имейте в виду общую цель приложения tracker: мы хотим отслеживать топик /roi для любого изменения положения цели слева или справа от центра обзора камеры. Затем мы будем поворачивать робота в нужном направлении, чтобы компенсировать это.
# Максимальная скорость вращения в рад/c
self.max_rotation_speed = rospy.get_param("~max_rotation_speed", 2.0)
# Минимальная скорость вращения в рад/с
self.min_rotation_speed = rospy.get_param("~min_rotation_speed", 0.5)
При управлении мобильным роботом всегда полезно установить максимальную скорость. Установка минимальной скорости также может гарантировать, что собственный вес и трение не будут мешать роботу при попытке двигаться слишком медленно.
self.gain = rospy.get_param("~gain", 2.0)
Большинство циклов обратной связи(feedback loops) требуют параметра усиления для управления тем, как быстро мы хотим, чтобы система реагировала на смещения цели из нейтральной точки. В нашем случае параметр усиления будет определять, насколько быстро робот реагирует на перемещение цели в сторону от центра обзора.
self.x_threshold = rospy.get_param("~x_threshold", 0.05)
Мы не хотим, чтобы робот разряжал свою батарею, преследуя каждое крошечное движение цели. Поэтому мы устанавливаем пороговое значение горизонтального смещения цели, на которое робот будет реагировать. В этом случае порог задается в процентах (0,05 = 5%) от ширины изображения.
# Получаем блокировку для обновлений значений self.move_cmd
self.lock = thread.allocate_lock()
Функции обратного вызова(callback), назначенные подписчикам ROS, выполняются в отдельном потоке от основной программы. Поскольку мы будем изменять скорость вращения робота как в функции обратного вызова ROI, так и в основном цикле программы, нам нужно использовать блокировку, чтобы сделать общий поток скрипта безопасным. Выше мы видим, как реализовать блокировку.
self.target_visible = False
Если цель потеряна (например, выходит из поля зрения), мы хотим, чтобы робот остановился. Поэтому мы будем использовать флаг, чтобы указать, когда цель видна или нет.
rospy.Subscriber('camera_info', CameraInfo, self.get_camera_info, queue_size=1)
Вместо того, чтобы жестко закодировать видео в программу, мы можем получить его динамически из соответствующего топика camera_info. Фактическое имя этого топика отображается в файле запуска object_tracker.launch. В случае камеры Kinect или Xtion, управляемой узлом OpenNI, обычно используется имя темы /camera/rgb/camera_info. Функция обратного вызова(callback) self.get_camera_info (определенный позже в скрипте) просто устанавливает глобальные переменные self.image_width и self.image_height из сообщений camera_info.
rospy.Subscriber('roi', RegionOfInterest, self.set_cmd_vel, queue_size=1)
Здесь мы подписываемся на топик /roi и устанавливаем функцию обратного вызова set_cmd_vel(), которая установит команду Twist для отправки роботу при изменении положения цели.
# Захватим замок, пока устанавливаем скорость робота
self.lock.acquire()
try:
# Если цель не видна, остановим робота
if not self.target_visible:
self.move_cmd = Twist()
else:
# Сбросим флаг на False по умолчанию
self.target_visible = False
# Отправим команду Twist(поворота) роботу
self.cmd_vel_pub.publish(self.move_cmd)
finally:
# Снимем блокировку
self.lock.release()
# Сон на 1/self.rate секунд
r.sleep()
Это наш основной цикл управления. Сначала мы получаем блокировку для защиты двух глобальных переменных self.move_cmd и self.target_visible, так как обе переменные также могут быть изменены в функции обратного вызова set_cmd_vel (). Затем мы проверяем, видна ли еще цель. Если нет, мы остановим робота, установив команду движения на пустое сообщение Twist. В противном случае мы устанавливаем флаг target_visible обратно в False (безопасное значение по умолчанию), а затем публикуем текущую команду перемещения, заданную в обратном вызове set_cmd_vel (), описанном далее. Наконец, мы освобождаем замок и спим в течение одного цикла.
def set_cmd_vel(self, msg):
# Захватим замок, пока устанавливаем скорость робота
self.lock.acquire()
try:
# Если ROI имеет ширину или высоту 0, мы потеряли цель
if msg.width == 0 or msg.height == 0:
self.target_visible = False
return
# Если ROI перестанет обновлять, следующее не произойдет
self.target_visible = True
Обратный вызов set_cmd_vel () срабатывает всякий раз, когда появляется новое сообщение в топике /roi. Первое, что нужно проверить, - это то, что ни ширина, ни высота ROI не равны нулю. Если это так, то цель, вероятно, потеряна, поэтому мы немедленно возвращаемся к главному циклу, который остановит вращение робота. В противном случае мы устанавливаем флаг target_visible в True, чтобы робот реагировал на положение цели.
target_offset_x = msg.x_offset + msg.width / 2 - self.image_width / 2
try:
percent_offset_x = float(target_offset_x) / (float(self.image_width) / 2.0)
except:
percent_offset_x = 0
Прежде, чем мы сможем определить, как робот должен двигаться, мы вычисляем смещение цели от центра изображения камеры. Напомним, что поле x_offset в сообщении ROI указывает координату x верхнего левого угла области, поэтому для нахождения центра области мы добавляем половину ширины. Затем, чтобы найти смещение от центра изображения, мы вычитаем половину ширины изображения. С помощью смещения, вычисленного в пикселях, мы получаем смещение в виде доли ширины изображения. Блок try-catch гарантирует, что мы поймаем попытку разделить на ноль, которая может произойти, если тема camera_info ошибется и отправит нам значение image_width равное 0.
if abs(percent_offset_x) > self.x_threshold:
# Установим скорость вращения,
# пропорциональную перемещению мишени
try:
speed = self.gain * percent_offset_x
if speed < 0:
direction = -1
else:
direction = 1
self.move_cmd.angular.z = -direction * max(self.min_rotation_speed,
min(self.max_rotation_speed, abs(speed)))
except:
self.move_cmd = Twist()
else:
# Иначе остановим робота
self.move_cmd = Twist()
Наконец, мы вычисляем скорость вращения робота, пропорциональную смещению цели, где множитель является параметром self.gain. Если целевое смещение не превышает параметра x_threshold, мы устанавливаем команду движения на пустое сообщение Twist, чтобы остановить робота (или сохранить его остановленным).
Last updated
Was this helpful?