Подглава 11.4.2 Понимание скрипта Follower
#!/usr/bin/env python
import rospy
from roslib import message
from sensor_msgs import point_cloud2
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import Twist
from math import copysign
class Follower():
def __init__(self):
rospy.init_node("follower")
# Вызовем функцию выключения (остановим робота)
rospy.on_shutdown(self.shutdown)
# Размеры (в метрах) области, в которой мы будем искать человека.
# Они задаются в координатах камеры, где x-влево/вправо,
# y-вверх/вниз, а z-глубина (вперед/назад)
self.min_x = rospy.get_param("~min_x", -0.2)
self.max_x = rospy.get_param("~max_x", 0.2)
self.min_y = rospy.get_param("~min_y", -0.3)
self.max_y = rospy.get_param("~max_y", 0.5)
self.max_z = rospy.get_param("~max_z", 1.2)
# Целевое расстояние (в метрах), которое должно держаться между роботом и человеком
self.goal_z = rospy.get_param("~goal_z", 0.6)
# Максимальная разницы между целевым и реальным расстоянем до цели до того, как робот отреагирует
self.z_threshold = rospy.get_param("~z_threshold", 0.05)
# Насколько далеко от центра (смещение x) может находиться человек
# до того, как робот отреагирует
self.x_threshold = rospy.get_param("~x_threshold", 0.05)
# Как мы взвешиваем целевое расстояние, когда совершаем движение
self.z_scale = rospy.get_param("~z_scale", 1.0)
# Как мы взвешивем отклонение человека влево/вправо, когда совершаем движение
self.x_scale = rospy.get_param("~x_scale", 2.5)
# Максимальная скорость вращения в рад/с
self.max_angular_speed = rospy.get_param("~max_angular_speed", 2.0)
# Минимальная скорость вращения в рад/с
self.min_angular_speed = rospy.get_param("~min_angular_speed", 0.0)
# Максимальная линейная скорость в м/c
self.max_linear_speed = rospy.get_param("~max_linear_speed", 0.3)
# Минимальная линейная скорость в м/c
self.min_linear_speed = rospy.get_param("~min_linear_speed", 0.1)
# Фактор замедления при остановке
self.slow_down_factor = rospy.get_param("~slow_down_factor", 0.8)
# Инициализируем команду перемещения
self.move_cmd = Twist()
# Publisher для управления движения робота
self.cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=5)
# Подписка на облако точек
self.depth_subscriber = rospy.Subscriber('point_cloud', PointCloud2, self.set_cmd_vel, queue_size=1)
rospy.loginfo("Subscribing to point cloud...")
# Ждем топик pointcloud, чтобы он стал доступным
rospy.wait_for_message('point_cloud', PointCloud2)
rospy.loginfo("Ready to follow!")
def set_cmd_vel(self, msg):
# Инициализация счетчика точек координат центроида
x = y = z = n = 0
# Считаем в координатах x, y, z все точки в облаке
for point in point_cloud2.read_points(msg, skip_nans=True):
pt_x = point[0]
pt_y = point[1]
pt_z = point[2]
# Учитываем только эти точки в пределах наших обозначенных границ и суммируем их
if -pt_y > self.min_y and -pt_y < self.max_y and pt_x < self.max_x and pt_x > self.min_x and pt_z < self.max_z:
x += pt_x
y += pt_y
z += pt_z
n += 1
# Если у нас есть точки, вычислим координаты центроида
if n:
x /= n
y /= n
z /= n
# Проверим наши предельные отклонения
if (abs(z - self.goal_z) > self.z_threshold):
# Вычислим угловую составляющую движения
linear_speed = (z - self.goal_z) * self.z_scale
# Убедимся, что мы соответствуем нашим минимальным/максимальным характеристикам
self.move_cmd.linear.x = copysign(max(self.min_linear_speed,
min(self.max_linear_speed, abs(linear_speed))), linear_speed)
else:
self.move_cmd.linear.x *= self.slow_down_factor
if (abs(x) > self.x_threshold):
# Вычислим линейную составляющую движения
angular_speed = -x * self.x_scale
# Убедимся, что мы соответствуем нашим минимальным/максимальным характеристикам
self.move_cmd.angular.z = copysign(max(self.min_angular_speed,
min(self.max_angular_speed, abs(angular_speed))), angular_speed)
else:
# Плавно остановим вращение
self.move_cmd.angular.z *= self.slow_down_factor
else:
# Плавно остановим робота
self.move_cmd.linear.x *= self.slow_down_factor
self.move_cmd.angular.z *= self.slow_down_factor
# Опубликуем команду перемещения
self.cmd_vel_pub.publish(self.move_cmd)
def shutdown(self):
rospy.loginfo("Stopping the robot...")
# Отмените регистрацию подписчика, чтобы остановить публикацию cmd_vel
self.depth_subscriber.unregister()
rospy.sleep(1)
# Отправим пустое Twist-сообщение, чтобы остановить робота
self.cmd_vel_pub.publish(Twist())
rospy.sleep(1)
if __name__ == '__main__':
try:
Follower()
rospy.spin()
except rospy.ROSInterruptException:
rospy.loginfo("Follower node terminated.")
PreviousПодглава 11.4.1 Тестирование приложения Follower в симулятореNextПодглава 11.4.3 Запуск приложения Follower на TurtleBot
Last updated