#!/usr/bin/env python
import rospy
from sensor_msgs.msg import RegionOfInterest, CameraInfo
from geometry_msgs.msg import Twist
import thread
class ObjectTracker():
def __init__(self):
rospy.init_node("object_tracker")
# Вызовем функцию выключения (остановите робота)
rospy.on_shutdown(self.shutdown)
# Как часто мы должны обновлять движение робота?
self.rate = rospy.get_param("~rate", 10)
r = rospy.Rate(self.rate)
# Максимальная скорость вращения в рад/с
self.max_rotation_speed = rospy.get_param("~max_rotation_speed", 2.0)
# Минимальная скорость вращения в рад/с
self.min_rotation_speed = rospy.get_param("~min_rotation_speed", 0.5)
# Чувствительность к целевым смещениям. Установка слишком высокого
# уровня может привести к колебаниям робота.
self.gain = rospy.get_param("~gain", 2.0)
# x threshold (%от ширины изображения) указывает, как
# далеко от центра ROI должен быть в направлении x,
# прежде чем мы отреагируем
self.x_threshold = rospy.get_param("~x_threshold", 0.1)
# Publisher для управления движения робота
self.cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=5)
# Инициализируем команду перемещения
self.move_cmd = Twist()
# Получаем блокировку для обновлений значений self.move_cmd
self.lock = thread.allocate_lock()
# Получим ширину и высоту изображения из топика camera_info
self.image_width = 0
self.image_height = 0
# Установим флаг, указывающий, когда ROI прекращает обновления
self.target_visible = False
# Ждем пока топик camera_info станет доступным
rospy.loginfo("Waiting for camera_info topic...")
rospy.wait_for_message('camera_info', CameraInfo)
# Подпишемся на топик camera_info, чтобы получить ширину и высоту изображения
rospy.Subscriber('camera_info', CameraInfo, self.get_camera_info, queue_size=1)
# Подождем, пока действительно не получим данные с камеры
while self.image_width == 0 or self.image_height == 0:
rospy.sleep(1)
# Подпишемся на топик ROI и установим обратный вызов(callback) для обновления движения робота
rospy.Subscriber('roi', RegionOfInterest, self.set_cmd_vel, queue_size=1)
# Ждем пока не получим сообщений от ROI
rospy.loginfo("Waiting for messages on /roi...")
rospy.wait_for_message('roi', RegionOfInterest)
rospy.loginfo("ROI messages detected. Starting tracker...")
# Начнем цикл отслеживания
while not rospy.is_shutdown():
# Захватим замок, пока устанавливаем скорость робота
self.lock.acquire()
try:
# Если цель не видна, остановим робота
if not self.target_visible:
self.move_cmd = Twist()
else:
# Сбросим флаг на False по умолчанию
self.target_visible = False
# Отправим команду Twist(поворота) роботу
self.cmd_vel_pub.publish(self.move_cmd)
finally:
# Снимем блокировку
self.lock.release()
# Сон на 1/self.rate секунд
r.sleep()
def set_cmd_vel(self, msg):
# Захватим замок, пока устанавливаем скорость робота
self.lock.acquire()
try:
# Если ROI имеет ширину или высоту 0, мы потеряли цель
if msg.width == 0 or msg.height == 0:
self.target_visible = False
return
# Если ROI перестанет обновлять, следующее не произойдет
self.target_visible = True
# Вычислим смещение ROI от центра изображения
target_offset_x = msg.x_offset + msg.width / 2 - self.image_width / 2
try:
percent_offset_x = float(target_offset_x) / (float(self.image_width) / 2.0)
except:
percent_offset_x = 0
# Вращаем робота только в том случае, если смещение цели
# превышает пороговое значение
if abs(percent_offset_x) > self.x_threshold:
# Установим скорость вращения,
# пропорциональную перемещению мишени
try:
speed = self.gain * percent_offset_x
if speed < 0:
direction = -1
else:
direction = 1
self.move_cmd.angular.z = -direction * max(self.min_rotation_speed,
min(self.max_rotation_speed, abs(speed)))
except:
self.move_cmd = Twist()
else:
# Иначе остановим робота
self.move_cmd = Twist()
finally:
# Снимем блокировку
self.lock.release()
def get_camera_info(self, msg):
self.image_width = msg.width
self.image_height = msg.height
def shutdown(self):
rospy.loginfo("Stopping the robot...")
self.cmd_vel_pub.publish(Twist())
rospy.sleep(1)
if __name__ == '__main__':
try:
ObjectTracker()
rospy.spin()
except rospy.ROSInterruptException:
rospy.loginfo("Object tracking node terminated.")
К этому моменту в книге скрипт, вероятно, будет довольно понятным. Однако давайте пробежимся по ключевым линиям. Имейте в виду общую цель приложения tracker: мы хотим отслеживать топик /roi для любого изменения положения цели слева или справа от центра обзора камеры. Затем мы будем поворачивать робота в нужном направлении, чтобы компенсировать это.
# Максимальная скорость вращения в рад/c
self.max_rotation_speed = rospy.get_param("~max_rotation_speed", 2.0)
# Минимальная скорость вращения в рад/с
self.min_rotation_speed = rospy.get_param("~min_rotation_speed", 0.5)
При управлении мобильным роботом всегда полезно установить максимальную скорость. Установка минимальной скорости также может гарантировать, что собственный вес и трение не будут мешать роботу при попытке двигаться слишком медленно.
self.gain = rospy.get_param("~gain", 2.0)
Большинство циклов обратной связи(feedback loops) требуют параметра усиления для управления тем, как быстро мы хотим, чтобы система реагировала на смещения цели из нейтральной точки. В нашем случае параметр усиления будет определять, насколько быстро робот реагирует на перемещение цели в сторону от центра обзора.
Мы не хотим, чтобы робот разряжал свою батарею, преследуя каждое крошечное движение цели. Поэтому мы устанавливаем пороговое значение горизонтального смещения цели, на которое робот будет реагировать. В этом случае порог задается в процентах (0,05 = 5%) от ширины изображения.
# Получаем блокировку для обновлений значений self.move_cmd
self.lock = thread.allocate_lock()
Функции обратного вызова(callback), назначенные подписчикам ROS, выполняются в отдельном потоке от основной программы. Поскольку мы будем изменять скорость вращения робота как в функции обратного вызова ROI, так и в основном цикле программы, нам нужно использовать блокировку, чтобы сделать общий поток скрипта безопасным. Выше мы видим, как реализовать блокировку.
self.target_visible = False
Если цель потеряна (например, выходит из поля зрения), мы хотим, чтобы робот остановился. Поэтому мы будем использовать флаг, чтобы указать, когда цель видна или нет.
Вместо того, чтобы жестко закодировать видео в программу, мы можем получить его динамически из соответствующего топика camera_info. Фактическое имя этого топика отображается в файле запуска object_tracker.launch. В случае камеры Kinect или Xtion, управляемой узлом OpenNI, обычно используется имя темы /camera/rgb/camera_info. Функция обратного вызова(callback) self.get_camera_info (определенный позже в скрипте) просто устанавливает глобальные переменные self.image_width и self.image_height из сообщений camera_info.
Здесь мы подписываемся на топик /roi и устанавливаем функцию обратного вызова set_cmd_vel(), которая установит команду Twist для отправки роботу при изменении положения цели.
# Захватим замок, пока устанавливаем скорость робота
self.lock.acquire()
try:
# Если цель не видна, остановим робота
if not self.target_visible:
self.move_cmd = Twist()
else:
# Сбросим флаг на False по умолчанию
self.target_visible = False
# Отправим команду Twist(поворота) роботу
self.cmd_vel_pub.publish(self.move_cmd)
finally:
# Снимем блокировку
self.lock.release()
# Сон на 1/self.rate секунд
r.sleep()
Это наш основной цикл управления. Сначала мы получаем блокировку для защиты двух глобальных переменных self.move_cmd и self.target_visible, так как обе переменные также могут быть изменены в функции обратного вызова set_cmd_vel (). Затем мы проверяем, видна ли еще цель. Если нет, мы остановим робота, установив команду движения на пустое сообщение Twist. В противном случае мы устанавливаем флаг target_visible обратно в False (безопасное значение по умолчанию), а затем публикуем текущую команду перемещения, заданную в обратном вызове set_cmd_vel (), описанном далее. Наконец, мы освобождаем замок и спим в течение одного цикла.
def set_cmd_vel(self, msg):
# Захватим замок, пока устанавливаем скорость робота
self.lock.acquire()
try:
# Если ROI имеет ширину или высоту 0, мы потеряли цель
if msg.width == 0 or msg.height == 0:
self.target_visible = False
return
# Если ROI перестанет обновлять, следующее не произойдет
self.target_visible = True
Обратный вызов set_cmd_vel () срабатывает всякий раз, когда появляется новое сообщение в топике /roi. Первое, что нужно проверить, - это то, что ни ширина, ни высота ROI не равны нулю. Если это так, то цель, вероятно, потеряна, поэтому мы немедленно возвращаемся к главному циклу, который остановит вращение робота. В противном случае мы устанавливаем флаг target_visible в True, чтобы робот реагировал на положение цели.
Прежде, чем мы сможем определить, как робот должен двигаться, мы вычисляем смещение цели от центра изображения камеры. Напомним, что поле x_offset в сообщении ROI указывает координату x верхнего левого угла области, поэтому для нахождения центра области мы добавляем половину ширины. Затем, чтобы найти смещение от центра изображения, мы вычитаем половину ширины изображения. С помощью смещения, вычисленного в пикселях, мы получаем смещение в виде доли ширины изображения. Блок try-catch гарантирует, что мы поймаем попытку разделить на ноль, которая может произойти, если тема camera_info ошибется и отправит нам значение image_width равное 0.
if abs(percent_offset_x) > self.x_threshold:
# Установим скорость вращения,
# пропорциональную перемещению мишени
try:
speed = self.gain * percent_offset_x
if speed < 0:
direction = -1
else:
direction = 1
self.move_cmd.angular.z = -direction * max(self.min_rotation_speed,
min(self.max_rotation_speed, abs(speed)))
except:
self.move_cmd = Twist()
else:
# Иначе остановим робота
self.move_cmd = Twist()
Наконец, мы вычисляем скорость вращения робота, пропорциональную смещению цели, где множитель является параметром self.gain. Если целевое смещение не превышает параметра x_threshold, мы устанавливаем команду движения на пустое сообщение Twist, чтобы остановить робота (или сохранить его остановленным).