Подглава 12.11.2 Скрипт отслеживания головы

Скрипт head_tracker.py довольно длинный, но простой. Общий процесс заключается в следующем:

  • Инициализации сервоприводов.

  • Подписка на топик /roi.

  • Если /roi удаляется от центра обзора, подайте команду сервоприводам на перемещение камеры в направлении ее повторного центрирования.

  • Если /roi теряется в течение заданного периода времени, повторно центрируйте сервоприводы, чтобы защитить их от перегрева.

Для отслеживания цели скрипт использует своего рода"отслеживание скорости". Если вы переместите камеру в то место, где сейчас находится отслеживаемый объект, он может переместиться к тому времени, когда камера доберется туда. Вы можете подумать, что можете просто обновить целевую позицию камеры с высокой скоростью и, следовательно, не отставать от объекта. Однако этот вид отслеживания положения приведет к отрывистому стаккато-подобному движению камеры. Оптимальная стратегия заключается в том, чтобы всегда направлять камеру вперед цели, но регулировать скорость сервопривода так, чтобы она была пропорциональна смещению цели от центра обзора. Это приводит к гораздо более плавному движению камеры и гарантирует, что она движется быстро, если цель находится далеко от центра, и медленнее, если смещение невелико. Когда цель центрирована, скорость сервопривода будет равна нулю, и поэтому камера не будет двигаться.

Скрипт head_tracker.py немного длинноват для отображения в полном объеме, поэтому давайте рассмотрим только ключевые разделы кода. Вы можете просмотреть весь исходный файл по следующей ссылке:

Ссылка на источник: head_tracker.py

Вот теперь ключевые строки.

 rate = rospy.get_param("~rate", 10)
 r = rospy.Rate(rate)
 tick = 1.0 / rate

# Держите скорость обновления ниже - примерно 10 Гц; 
# в противном случае сервоприводы могут вести себя хаотично.
 speed_update_rate = rospy.get_param("~speed_update_rate", 10)
 speed_update_interval = 1.0 / speed_update_rate

# Насколько большие изменения нам нужны в скорости, прежде 
# чем мы отправим обновление к сервомоторам?
 self.speed_update_threshold = rospy.get_param("~speed_update_threshold", 0.01)

Мы определяем два параметра скорости в верхней части скрипта. Общий параметр скорости управляет тем, как быстро мы обновляем контур слежения, который включает в себя изменение как скорости, так и угла соединения сервоприводов в зависимости от местоположения цели. Параметр speed_update_interval обычно устанавливается ниже и определяет, как часто мы обновляем скорости сервоприводов. Единственная причина для этого заключается в том, что оказывается, что сервоприводы Dynamixel могут действовать немного хаотично, если мы слишком часто пытаемся регулировать их скорость. Частота обновления менее 10 Гц или около того, по-видимому, приводит к лучшему поведению. Мы также устанавливаем speed_update_threshold так, чтобы мы обновляли скорость сервопривода только в том случае, если вновь вычисленная скорость отличается на столько же от предыдущей скорости.

self.head_pan_joint = rospy.get_param('~head_pan_joint', 'head_pan_joint')
self.head_tilt_joint = rospy.get_param('~head_tilt_joint', 'head_tilt_joint')
self.joints = [self.head_pan_joint, self.head_tilt_joint]

Нам нужно знать название шарниров поворота и наклона в модели URDF робота. Если ваши имена шарниров отличаются от значений по умолчанию, используйте эти два параметра в файле head_tracker.launch, чтобы установить их соответствующим образом.

# Скорости шарниров задаются в радианах в секунду
 self.default_joint_speed = rospy.get_param('~default_joint_speed', 0.3)
 self.max_joint_speed = rospy.get_param('~max_joint_speed', 0.5)
# На какое расстояние впереди или позади цели (в радианах) мы должны стремиться?
 self.lead_target_angle = rospy.get_param('~lead_target_angle', 1.0)

# Пороги панорамирования/наклона указывают, какой процент окна изображения 
# ROI должен быть смещен от центра, прежде чем мы сделаем движение
self.pan_threshold = rospy.get_param('~pan_threshold', 0.025)
self.tilt_threshold = rospy.get_param('~tilt_threshold', 0.025)
# Параметр gain_pan и gain_tilt определяют, насколько чувствительны 
# движения сервопривода. Если они установлены слишком высоко, 
# это может привести к колебаниям.
self.gain_pan = rospy.get_param('~gain_pan', 1.0)
self.gain_tilt = rospy.get_param('~gain_tilt', 1.0)
# Установите ограничения на углы панорамирования и наклона
self.max_pan = rospy.get_param('~max_pan', radians(145))
self.min_pan = rospy.get_param('~min_pan', radians(-145))
self.max_tilt = rospy.get_param('~max_tilt', radians(90))
self.min_tilt = rospy.get_param('~min_tilt', radians(-90))

Далее следует список параметров для управления поведением отслеживания. Большинство параметров легко понять из добавленных комментариев. Параметры gain_pan и gain_tilt определяют, как быстро сервоприводы будут реагировать на смещение цели из поля зрения камеры. Если они установлены слишком высоко, то возникнут колебания. Если они установлены слишком низко, то движение камеры будет отставать от движущейся цели.

 self.recenter_timeout = rospy.get_param('~recenter_timeout', 5)

Параметр recenter_timeout определяет, как долго (в секундах) цель может быть потеряна, прежде чем мы отцентрируем сервоприводы. Когда цель исчезает из поля зрения, то скрипт head_tracker.py останавливает сервоприводы так, чтобы они удерживали камеру в последнем положении, которое у них было до того, как цель была потеряна. Однако это может привести к перегреву сервоприводов, если камера будет удерживаться таким образом слишком долго. Повторное центрирование сервоприводов позволяет им вернуться в нейтральное положение и остыть.

# Получим блокировку для обновления self.move_cmd значений
 self.lock = thread.allocate_lock()

Здесь мы создаем объект блокировки потока и назначаем его переменной self.lock. Нам понадобится эта блокировка, чтобы сделать нашу общую программу потокобезопасной, так как мы будем обновлять позиции шарниров и скорости в двух местах: основной текст скрипта и функция обратного вызова(callback) (определенная ниже), назначенная топику /roi. Поскольку ROS запускает отдельный поток для каждого обратного вызова подписчика, нам нужно защитить наши обновления шарниров с помощью блокировки, как мы покажем далее.

 self.init_servos()

Инициализация сервоприводов находится в функции init_servos (), которая выглядит следующим образом.

def init_servos(self):
    # Создаем словари для хранения контроллеров скорости, положения и крутящего момента
    self.servo_speed = dict()
    self.servo_position = dict()
    self.torque_enable = dict()

Сначала мы определяем три словаря Python для хранения сервоконтроллеров скорости, положения и крутящего момента.

for joint in sorted(self.joints):

Затем мы проходим через все шарниры, перечисленные в параметра self.joints. В нашем случае есть только два сервопривода с именами head_pan_joint и head_tilt_joint.

 set_speed_service = '/' + joint + '/set_speed'
 rospy.wait_for_service(set_speed_service)
 self.servo_speed[joint] = rospy.ServiceProxy(set_speed_service,
SetSpeed, persistent=True)

Напомним, что пакет dynamixel_controller использует службу set_speed для каждого сервопривода, чтобы установить их скорость. Поэтому мы подключаемся к службе set_speed для каждого сервоконтроллера. Использование аргумента persistent=True в операторе Service Proxy очень важно. В противном случае rospy придется заново подключаться к службе set_speed каждый раз, когда мы хотим отрегулировать скорость сервомотора. Поскольку мы будем постоянно обновлять скорость сервопривода во время отслеживания, мы хотим избежать этой задержки соединения.

self.servo_speed[name](self.default_joint_speed)

Как только мы подключены к службам set_speed, мы можем инициализировать каждую скорость сервопривода до скорости по умолчанию.

 torque_service = '/' + joint + '/torque_enable'
 rospy.wait_for_service(torque_service)
 self.torque_enable[name] = rospy.ServiceProxy(torque_service,
TorqueEnable)
 # Запустите каждый сервопривод в отключенном состоянии, 
 # чтобы мы могли перемещать их вручную
 self.torque_enable[name](False)

Аналогичным образом мы подключаемся к службе torque_enable для каждого сервомотора и инициализируем их в расслабленное состояние, чтобы при необходимости можно было перемещать их вручную.

 self.servo_position[name] = rospy.Publisher('/' + joint + '/command',
Float64, queue_size=5)

Контроллер положения использует издатель(publisher) ROS, а не службу, поэтому мы определяем его для каждого сервопривода. Это завершает инициализацию сервопривода.

rospy.Subscriber('roi', RegionOfInterest, self.set_joint_cmd) 

Мы предполагаем, что положение цели публикуется в топике /roi, как это будет, если мы используем любой из наших более ранних узлов зрения, таких как face tracker или camshift. Функция обратного вызова set_joint_cmd() установит скорость сервопривода и положение цели для ее отслеживания.

self.joint_state = JointState()
rospy.Subscriber('joint_states', JointState, self.update_joint_state)

Мы также следим за текущими позициями сервоприводов, подписавшись на /joint_states.

while not rospy.is_shutdown():
    # Acquire the lock
    self.lock.acquire()
    try:
        # If we have lost the target, stop the servos
        if not self.target_visible:
            self.pan_speed = 0.0
            self.tilt_speed = 0.0

            # Keep track of how long the target is lost
            target_lost_timer += tick
        else:
            self.target_visible = False
            target_lost_timer = 0.0

Это начало основного цикла отслеживания. Сначала мы получаем блокировку в начале каждого цикла обновления. Это делается для того, чтобы защитить переменную self.pan_speed, self.tilt_speed и self.target_visible, которые также модифицируются нашим обратным вызовом set_joint_cmd (). Мы используем переменную self.target_visible, чтобы указать, потеряли ли мы ROI. Эта переменная имеет значение True в обратном вызове set_joint_cmd, как мы увидим ниже. В противном случае он по умолчанию принимает значение False. Если цель потеряна, мы останавливаем сервоприводы, устанавливая их скорость на 0 и увеличиваем таймер, чтобы отслеживать, как долго цель остается потерянной. В противном случае мы используем скорости панорамирования и наклона, установленные в обратном вызове set_joint_cmd, и сбрасываем таймер на 0. Мы также устанавливаем self.target_visible флаг обратно в False, так что мы должны явно установить его в True, когда будет получено следующее сообщение ROI.

 if target_lost_timer > self.recenter_timeout:
     rospy.loginfo("Cannot find target.")
     self.center_head_servos()
     target_lost_timer = 0.0

Если цель потеряна достаточно долго, отцентрируем сервоприводы, вызвав функцию center_head_servos (), определенную позже в скрипте. Это не только предотвращает перегрев сервоприводов, но и помещает камеру в более центральное положение для повторного захвата цели.

    else:               
        # Обновляем скорость сервопривода с соответствующим интервалом
        if speed_update_timer > speed_update_interval: 
            if abs(self.last_pan_speed - self.pan_speed) > self.speed_update_threshold:
                self.set_servo_speed(self.head_pan_joint, self.pan_speed)
                self.last_pan_speed = self.pan_speed
            
            if abs(self.last_tilt_speed - self.tilt_speed) > self.speed_update_threshold:
                self.set_servo_speed(self.head_tilt_joint, self.tilt_speed)
                self.last_tilt_speed = self.tilt_speed
                                    
            speed_update_timer = 0.0
        
        # Обновим положение панорамирования   
        if self.last_pan_position != self.pan_position:
            self.set_servo_position(self.head_pan_joint, self.pan_position)
            self.last_pan_position = self.pan_position
    
        # Обновим положение наклона
        if self.last_tilt_position != self.tilt_position:
            self.set_servo_position(self.head_tilt_joint, self.tilt_position)
            self.last_tilt_position = self.tilt_position
    
    speed_update_timer += tick

finally:
    # Снимем блокировку
    self.lock.release()

Здесь мы, наконец, обновим скорость и положение сервоприводов. Сначала мы проверяем, достигли ли мы speed_update_interval. В противном случае мы оставим скорость в покое. Напомним, что мы делаем это потому, что Dynamixels могут вести себя хаотично, если мы попытаемся слишком часто обновлять их скорости. Мы также проверяем, не отличаются ли новые скорости существенно от предыдущих скоростей; в противном случае мы пропускаем обновление скорости.

Переменные self.pan_speed и self.pan_position задаются в обратном вызове set_joint_cmd(), который мы рассмотрим ниже. Обратный вызов также устанавливает углы панорамирования и наклона, self.pan_position и self.tilt_position основаны на расположении цели относительно центра обзора камеры.

В конце цикла обновления мы снимаем блокировку и спим 1/self.rate секунды.

Наконец, давайте рассмотрим функцию обратного вызова set_joint_cmd(), которая срабатывает всякий раз, когда мы получаем сообщение в топике /roi.

def set_joint_cmd(self, msg):
    # Получим блокировку
    self.lock.acquire()
    
    try:
        # Цель потеряна, если она имеет 0 высоту или ширинуt visible
        if msg.width == 0 or msg.height == 0:
            self.target_visible = False
            return
        
        # Если ROI перестанет обновлять, следующее не выполнится
        self.target_visible = True

Сначала мы получаем блокировку для защиты переменных шарнира и флага target_visible. Затем мы проверяем ширину или высоту на нуль для входящего сообщения ROI, так как это будет указывать на область с нулевой площадью. В таких случаях мы устанавливаем видимость цели на False и немедленно возвращаемся.

Если мы сделаем это после первой проверки, мы установим флаг target_visible в True. Как мы видели ранее в основном цикле программы, флаг сбрасывается в False на каждом цикле.

# Compute the displacement of the ROI from the center of the image
target_offset_x = msg.x_offset + msg.width / 2 - self.image_width / 2
target_offset_y = msg.y_offset + msg.height / 2 - self.image_height / 2

Затем мы вычисляем, насколько далеко от центра находится середина ROI. Напомним, что поля x_offset и y_offset в сообщении ROI указывают координаты верхнего левого угла ROI. (Ширина и высота изображения определяются обратным вызовом get_camera_info(), который, в свою очередь, присваивается подписчику в топике camera_info.)

try:
    percent_offset_x = float(target_offset_x) / (float(self.image_width) / 2.0)
    percent_offset_y = float(target_offset_y) / (float(self.image_height) / 2.0)
except:
    percent_offset_x = 0
    percent_offset_y = 0

Для того чтобы приспособить различные разрешения изображения, лучше работать в относительных смещениях. Блок try-except используется, так как топик camera_info иногда может прерываться и посылать нам значение 0 для ширины и высоты изображения.

# Получим текущее положение панорамного сервопривода
current_pan = self.joint_state.position[self.joint_state.name.index(self.head_pan_joint)]

Нам понадобится текущее положение панорамного сервопривода, которое мы можем получить из массива self.joint_state.position. Напомним, что self.joint_state устанавливается в обратном вызове, назначенном подписчику в топике joint_state.

# Панорамируем камеру только в том случае, 
# если смещение цели x превышает пороговое значение
if abs(percent_offset_x) > self.pan_threshold:            
    # Установим скорость панорамирования пропорционально целевому смещению
    self.pan_speed = min(self.max_joint_speed, max(0, self.gain_pan * abs(percent_offset_x)))

    if target_offset_x > 0:
        self.pan_position = max(self.min_pan, current_pan - self.lead_target_angle)
    else:
        self.pan_position = min(self.max_pan, current_pan + self.lead_target_angle)
else:
    self.pan_speed = 0
    self.pan_position = current_pan

Если горизонтальное смещение цели превышает наш порог, то мы устанавливаем скорость панорамного сервопривода пропорционально смещению. Затем мы устанавливаем положение панорамирования в текущее положение плюс или минус угол опережения в зависимости от направления смещения. Если смещение цели падает ниже порогового значения, то мы устанавливаем скорость сервопривода на 0, а положение цели - на текущее положение.

Затем этот процесс повторяется для сервопривода наклона, устанавливая его скорость и положение цели в зависимости от вертикального смещения цели.

finally:
    # Снимем блокировку
    self.lock.release()

Наконец мы снимаем блокировку. И это завершает самые важные части скрипта. Все остальное должно быть достаточно понятным из комментариев в коде.

Last updated