本教程将深入探讨Webots控制器的更多高级功能。你将学习如何创建更复杂的机器人行为、处理传感器数据、实现状态机、使用多线程,以及如何调试和优化控制器代码。本教程基于官方教程4。
在开始本教程之前,请确保你已经完成了前面的教程,并且熟悉基本的控制器编程。我们将基于e-puck机器人来学习更高级的控制器功能。
advanced_controller.py
,我们将在这个文件中实现各种高级功能。
e-puck机器人配备了多种传感器,包括距离传感器、摄像头等。让我们学习如何有效地处理这些传感器数据。
from controller import Robot, DistanceSensor, Motor
import math
TIME_STEP = 64
MAX_SPEED = 6.28
class EPuckController:
def __init__(self):
self.robot = Robot()
self.time_step = TIME_STEP
# 获取距离传感器
self.distance_sensors = []
for i in range(8):
sensor = self.robot.getDevice(f'ps{i}')
sensor.enable(TIME_STEP)
self.distance_sensors.append(sensor)
# 获取电机
self.left_motor = self.robot.getDevice('left wheel motor')
self.right_motor = self.robot.getDevice('right wheel motor')
self.left_motor.setPosition(float('inf'))
self.right_motor.setPosition(float('inf'))
self.left_motor.setVelocity(0.0)
self.right_motor.setVelocity(0.0)
def get_sensor_values(self):
"""获取所有距离传感器的值"""
values = []
for sensor in self.distance_sensors:
values.append(sensor.getValue())
return values
def avoid_obstacles(self):
"""避障算法"""
sensor_values = self.get_sensor_values()
# 检测前方障碍物
front_left = sensor_values[0]
front_right = sensor_values[7]
# 如果前方有障碍物,转向
if front_left > 80 or front_right > 80:
# 根据障碍物位置决定转向方向
if front_left > front_right:
self.left_motor.setVelocity(-0.5 * MAX_SPEED)
self.right_motor.setVelocity(0.5 * MAX_SPEED)
else:
self.left_motor.setVelocity(0.5 * MAX_SPEED)
self.right_motor.setVelocity(-0.5 * MAX_SPEED)
else:
# 无障碍物时前进
self.left_motor.setVelocity(0.5 * MAX_SPEED)
self.right_motor.setVelocity(0.5 * MAX_SPEED)
def run(self):
"""主运行循环"""
while self.robot.step(self.time_step) != -1:
self.avoid_obstacles()
# 创建并运行控制器
controller = EPuckController()
controller.run()
from controller import Robot, Camera, DistanceSensor, Motor
import numpy as np
TIME_STEP = 64
MAX_SPEED = 6.28
class EPuckWithCamera:
def __init__(self):
self.robot = Robot()
self.time_step = TIME_STEP
# 获取摄像头
self.camera = self.robot.getDevice('camera')
self.camera.enable(TIME_STEP)
self.camera_width = self.camera.getWidth()
self.camera_height = self.camera.getHeight()
# 获取距离传感器
self.distance_sensors = []
for i in range(8):
sensor = self.robot.getDevice(f'ps{i}')
sensor.enable(TIME_STEP)
self.distance_sensors.append(sensor)
# 获取电机
self.left_motor = self.robot.getDevice('left wheel motor')
self.right_motor = self.robot.getDevice('right wheel motor')
self.left_motor.setPosition(float('inf'))
self.right_motor.setPosition(float('inf'))
def get_camera_image(self):
"""获取摄像头图像"""
image = self.camera.getImage()
# 转换为numpy数组进行处理
image_array = np.frombuffer(image, dtype=np.uint8)
return image_array.reshape((self.camera_height, self.camera_width, 4))
def detect_objects(self, image):
"""简单的物体检测(基于颜色)"""
# 检测红色物体
red_pixels = np.sum(image[:, :, 0] > 200)
total_pixels = image.shape[0] * image.shape[1]
red_ratio = red_pixels / total_pixels
return red_ratio > 0.1 # 如果红色像素超过10%,认为检测到红色物体
def run(self):
"""主运行循环"""
while self.robot.step(self.time_step) != -1:
# 获取摄像头图像
image = self.get_camera_image()
# 检测物体
if self.detect_objects(image):
# 检测到红色物体,停止
self.left_motor.setVelocity(0.0)
self.right_motor.setVelocity(0.0)
else:
# 继续前进
self.left_motor.setVelocity(0.3 * MAX_SPEED)
self.right_motor.setVelocity(0.3 * MAX_SPEED)
状态机是机器人控制中的重要概念,它允许机器人根据当前状态和输入做出不同的决策。
from controller import Robot, DistanceSensor, Motor
from enum import Enum
import math
TIME_STEP = 64
MAX_SPEED = 6.28
class RobotState(Enum):
EXPLORE = "explore"
AVOID = "avoid"
SEARCH = "search"
RETURN = "return"
class StateMachineController:
def __init__(self):
self.robot = Robot()
self.time_step = TIME_STEP
self.current_state = RobotState.EXPLORE
self.state_timer = 0
# 获取传感器和电机
self.setup_devices()
# 状态机参数
self.avoid_duration = 50 # 避障持续时间
self.search_duration = 100 # 搜索持续时间
def setup_devices(self):
"""设置设备"""
# 距离传感器
self.distance_sensors = []
for i in range(8):
sensor = self.robot.getDevice(f'ps{i}')
sensor.enable(TIME_STEP)
self.distance_sensors.append(sensor)
# 电机
self.left_motor = self.robot.getDevice('left wheel motor')
self.right_motor = self.robot.getDevice('right wheel motor')
self.left_motor.setPosition(float('inf'))
self.right_motor.setPosition(float('inf'))
def get_sensor_values(self):
"""获取传感器值"""
return [sensor.getValue() for sensor in self.distance_sensors]
def state_explore(self):
"""探索状态"""
sensor_values = self.get_sensor_values()
# 检测障碍物
if any(value > 80 for value in sensor_values):
self.current_state = RobotState.AVOID
self.state_timer = 0
return
# 正常前进
self.left_motor.setVelocity(0.5 * MAX_SPEED)
self.right_motor.setVelocity(0.5 * MAX_SPEED)
def state_avoid(self):
"""避障状态"""
self.state_timer += 1
# 避障行为
sensor_values = self.get_sensor_values()
left_obstacle = sensor_values[0] + sensor_values[1]
right_obstacle = sensor_values[6] + sensor_values[7]
if left_obstacle > right_obstacle:
self.left_motor.setVelocity(-0.3 * MAX_SPEED)
self.right_motor.setVelocity(0.3 * MAX_SPEED)
else:
self.left_motor.setVelocity(0.3 * MAX_SPEED)
self.right_motor.setVelocity(-0.3 * MAX_SPEED)
# 检查是否应该退出避障状态
if self.state_timer > self.avoid_duration:
self.current_state = RobotState.SEARCH
self.state_timer = 0
def state_search(self):
"""搜索状态"""
self.state_timer += 1
# 原地旋转搜索
self.left_motor.setVelocity(0.2 * MAX_SPEED)
self.right_motor.setVelocity(-0.2 * MAX_SPEED)
# 搜索一段时间后返回探索状态
if self.state_timer > self.search_duration:
self.current_state = RobotState.EXPLORE
self.state_timer = 0
def run(self):
"""主运行循环"""
while self.robot.step(self.time_step) != -1:
# 根据当前状态执行相应的行为
if self.current_state == RobotState.EXPLORE:
self.state_explore()
elif self.current_state == RobotState.AVOID:
self.state_avoid()
elif self.current_state == RobotState.SEARCH:
self.state_search()
# 创建并运行控制器
controller = StateMachineController()
controller.run()
对于复杂的机器人控制任务,多线程可以提供更好的性能和响应性。
from controller import Robot, DistanceSensor, Motor
import threading
import time
import queue
TIME_STEP = 64
MAX_SPEED = 6.28
class MultiThreadedController:
def __init__(self):
self.robot = Robot()
self.time_step = TIME_STEP
# 设置设备
self.setup_devices()
# 线程间通信队列
self.sensor_queue = queue.Queue()
self.command_queue = queue.Queue()
# 控制标志
self.running = True
# 创建线程
self.sensor_thread = threading.Thread(target=self.sensor_processing)
self.decision_thread = threading.Thread(target=self.decision_making)
self.motor_thread = threading.Thread(target=self.motor_control)
def setup_devices(self):
"""设置设备"""
# 距离传感器
self.distance_sensors = []
for i in range(8):
sensor = self.robot.getDevice(f'ps{i}')
sensor.enable(TIME_STEP)
self.distance_sensors.append(sensor)
# 电机
self.left_motor = self.robot.getDevice('left wheel motor')
self.right_motor = self.robot.getDevice('right wheel motor')
self.left_motor.setPosition(float('inf'))
self.right_motor.setPosition(float('inf'))
def sensor_processing(self):
"""传感器处理线程"""
while self.running:
# 读取传感器数据
sensor_values = [sensor.getValue() for sensor in self.distance_sensors]
# 将数据放入队列
self.sensor_queue.put(sensor_values)
# 控制读取频率
time.sleep(0.01)
def decision_making(self):
"""决策线程"""
while self.running:
try:
# 从队列获取传感器数据
sensor_values = self.sensor_queue.get(timeout=0.1)
# 简单的决策逻辑
if any(value > 80 for value in sensor_values):
# 检测到障碍物,停止
command = {'left': 0.0, 'right': 0.0}
else:
# 无障碍物,前进
command = {'left': 0.5 * MAX_SPEED, 'right': 0.5 * MAX_SPEED}
# 将命令放入队列
self.command_queue.put(command)
except queue.Empty:
continue
def motor_control(self):
"""电机控制线程"""
while self.running:
try:
# 从队列获取命令
command = self.command_queue.get(timeout=0.1)
# 执行电机命令
self.left_motor.setVelocity(command['left'])
self.right_motor.setVelocity(command['right'])
except queue.Empty:
continue
def run(self):
"""启动所有线程并运行"""
# 启动线程
self.sensor_thread.start()
self.decision_thread.start()
self.motor_thread.start()
# 主循环
while self.robot.step(self.time_step) != -1:
pass
# 停止所有线程
self.running = False
self.sensor_thread.join()
self.decision_thread.join()
self.motor_thread.join()
# 创建并运行控制器
controller = MultiThreadedController()
controller.run()
调试是控制器开发中的重要环节。Webots提供了多种调试工具和方法。
from controller import Robot, DistanceSensor, Motor
import logging
import time
TIME_STEP = 64
MAX_SPEED = 6.28
class DebugController:
def __init__(self):
self.robot = Robot()
self.time_step = TIME_STEP
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('robot_debug.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
# 设置设备
self.setup_devices()
# 性能监控
self.step_count = 0
self.start_time = time.time()
def setup_devices(self):
"""设置设备"""
try:
# 距离传感器
self.distance_sensors = []
for i in range(8):
sensor = self.robot.getDevice(f'ps{i}')
sensor.enable(TIME_STEP)
self.distance_sensors.append(sensor)
self.logger.info(f"距离传感器 {i} 初始化成功")
# 电机
self.left_motor = self.robot.getDevice('left wheel motor')
self.right_motor = self.robot.getDevice('right wheel motor')
self.left_motor.setPosition(float('inf'))
self.right_motor.setPosition(float('inf'))
self.logger.info("电机初始化成功")
except Exception as e:
self.logger.error(f"设备初始化失败: {e}")
raise
def get_sensor_values(self):
"""获取传感器值并记录"""
try:
values = [sensor.getValue() for sensor in self.distance_sensors]
# 记录异常值
for i, value in enumerate(values):
if value > 100:
self.logger.warning(f"传感器 {i} 值异常: {value}")
return values
except Exception as e:
self.logger.error(f"读取传感器失败: {e}")
return [0] * 8
def set_motor_velocity(self, left_vel, right_vel):
"""设置电机速度并记录"""
try:
self.left_motor.setVelocity(left_vel)
self.right_motor.setVelocity(right_vel)
# 记录速度变化
if abs(left_vel - right_vel) > MAX_SPEED * 0.5:
self.logger.info(f"转向: 左轮={left_vel:.2f}, 右轮={right_vel:.2f}")
except Exception as e:
self.logger.error(f"设置电机速度失败: {e}")
def log_performance(self):
"""记录性能信息"""
self.step_count += 1
if self.step_count % 1000 == 0:
elapsed_time = time.time() - self.start_time
fps = self.step_count / elapsed_time
self.logger.info(f"性能统计: 步数={self.step_count}, FPS={fps:.2f}")
def run(self):
"""主运行循环"""
self.logger.info("控制器启动")
while self.robot.step(self.time_step) != -1:
try:
# 获取传感器数据
sensor_values = self.get_sensor_values()
# 简单的避障逻辑
if any(value > 80 for value in sensor_values):
self.set_motor_velocity(-0.3 * MAX_SPEED, 0.3 * MAX_SPEED)
else:
self.set_motor_velocity(0.5 * MAX_SPEED, 0.5 * MAX_SPEED)
# 记录性能
self.log_performance()
except Exception as e:
self.logger.error(f"主循环错误: {e}")
break
self.logger.info("控制器停止")
# 创建并运行控制器
controller = DebugController()
controller.run()
让我们学习一些高级的编程模式,如观察者模式、策略模式等。
from controller import Robot, DistanceSensor, Motor
from abc import ABC, abstractmethod
TIME_STEP = 64
MAX_SPEED = 6.28
class BehaviorStrategy(ABC):
"""行为策略基类"""
@abstractmethod
def execute(self, robot, sensors):
pass
class ExploreBehavior(BehaviorStrategy):
"""探索行为"""
def execute(self, robot, sensors):
left_motor, right_motor = robot.left_motor, robot.right_motor
left_motor.setVelocity(0.5 * MAX_SPEED)
right_motor.setVelocity(0.5 * MAX_SPEED)
class AvoidBehavior(BehaviorStrategy):
"""避障行为"""
def execute(self, robot, sensors):
left_motor, right_motor = robot.left_motor, robot.right_motor
# 根据障碍物位置决定转向
left_obstacle = sensors[0] + sensors[1]
right_obstacle = sensors[6] + sensors[7]
if left_obstacle > right_obstacle:
left_motor.setVelocity(-0.3 * MAX_SPEED)
right_motor.setVelocity(0.3 * MAX_SPEED)
else:
left_motor.setVelocity(0.3 * MAX_SPEED)
right_motor.setVelocity(-0.3 * MAX_SPEED)
class SearchBehavior(BehaviorStrategy):
"""搜索行为"""
def execute(self, robot, sensors):
left_motor, right_motor = robot.left_motor, robot.right_motor
left_motor.setVelocity(0.2 * MAX_SPEED)
right_motor.setVelocity(-0.2 * MAX_SPEED)
class AdvancedController:
def __init__(self):
self.robot = Robot()
self.time_step = TIME_STEP
# 设置设备
self.setup_devices()
# 行为策略
self.behaviors = {
'explore': ExploreBehavior(),
'avoid': AvoidBehavior(),
'search': SearchBehavior()
}
self.current_behavior = 'explore'
def setup_devices(self):
"""设置设备"""
# 距离传感器
self.distance_sensors = []
for i in range(8):
sensor = self.robot.getDevice(f'ps{i}')
sensor.enable(TIME_STEP)
self.distance_sensors.append(sensor)
# 电机
self.left_motor = self.robot.getDevice('left wheel motor')
self.right_motor = self.robot.getDevice('right wheel motor')
self.left_motor.setPosition(float('inf'))
self.right_motor.setPosition(float('inf'))
def get_sensor_values(self):
"""获取传感器值"""
return [sensor.getValue() for sensor in self.distance_sensors]
def select_behavior(self, sensor_values):
"""选择行为策略"""
# 检测障碍物
if any(value > 80 for value in sensor_values):
return 'avoid'
else:
return 'explore'
def run(self):
"""主运行循环"""
while self.robot.step(self.time_step) != -1:
# 获取传感器数据
sensor_values = self.get_sensor_values()
# 选择行为
self.current_behavior = self.select_behavior(sensor_values)
# 执行当前行为
self.behaviors[self.current_behavior].execute(self, sensor_values)
# 创建并运行控制器
controller = AdvancedController()
controller.run()
恭喜你完成了Webots高级控制器教程!你已经学会了:
这些技能将帮助你创建更复杂、更可靠的机器人控制器。在下一个教程中,我们将学习环境建模的高级技巧。
继续学习 教程5:环境建模,了解如何创建复杂的仿真环境。