教程 4:更多关于控制器 (90分钟)


目标与简介

本教程将深入探讨Webots控制器的更多高级功能。你将学习如何创建更复杂的机器人行为、处理传感器数据、实现状态机、使用多线程,以及如何调试和优化控制器代码。本教程基于官方教程4

目录

1. 环境准备

在开始本教程之前,请确保你已经完成了前面的教程,并且熟悉基本的控制器编程。我们将基于e-puck机器人来学习更高级的控制器功能。

📝 动手实践 #1: 打开之前创建的项目,确保场景中有e-puck机器人。创建一个新的控制器文件 advanced_controller.py,我们将在这个文件中实现各种高级功能。

2. 传感器数据处理

e-puck机器人配备了多种传感器,包括距离传感器、摄像头等。让我们学习如何有效地处理这些传感器数据。

📝 动手实践 #2: 创建距离传感器控制器:
from controller import Robot, DistanceSensor, Motor
import math

TIME_STEP = 64
MAX_SPEED = 6.28

class EPuckController:
    def __init__(self):
        self.robot = Robot()
        self.time_step = TIME_STEP
        
        # 获取距离传感器
        self.distance_sensors = []
        for i in range(8):
            sensor = self.robot.getDevice(f'ps{i}')
            sensor.enable(TIME_STEP)
            self.distance_sensors.append(sensor)
        
        # 获取电机
        self.left_motor = self.robot.getDevice('left wheel motor')
        self.right_motor = self.robot.getDevice('right wheel motor')
        self.left_motor.setPosition(float('inf'))
        self.right_motor.setPosition(float('inf'))
        self.left_motor.setVelocity(0.0)
        self.right_motor.setVelocity(0.0)
    
    def get_sensor_values(self):
        """获取所有距离传感器的值"""
        values = []
        for sensor in self.distance_sensors:
            values.append(sensor.getValue())
        return values
    
    def avoid_obstacles(self):
        """避障算法"""
        sensor_values = self.get_sensor_values()
        
        # 检测前方障碍物
        front_left = sensor_values[0]
        front_right = sensor_values[7]
        
        # 如果前方有障碍物,转向
        if front_left > 80 or front_right > 80:
            # 根据障碍物位置决定转向方向
            if front_left > front_right:
                self.left_motor.setVelocity(-0.5 * MAX_SPEED)
                self.right_motor.setVelocity(0.5 * MAX_SPEED)
            else:
                self.left_motor.setVelocity(0.5 * MAX_SPEED)
                self.right_motor.setVelocity(-0.5 * MAX_SPEED)
        else:
            # 无障碍物时前进
            self.left_motor.setVelocity(0.5 * MAX_SPEED)
            self.right_motor.setVelocity(0.5 * MAX_SPEED)
    
    def run(self):
        """主运行循环"""
        while self.robot.step(self.time_step) != -1:
            self.avoid_obstacles()

# 创建并运行控制器
controller = EPuckController()
controller.run()
📝 动手实践 #3: 添加摄像头处理:
from controller import Robot, Camera, DistanceSensor, Motor
import numpy as np

TIME_STEP = 64
MAX_SPEED = 6.28

class EPuckWithCamera:
    def __init__(self):
        self.robot = Robot()
        self.time_step = TIME_STEP
        
        # 获取摄像头
        self.camera = self.robot.getDevice('camera')
        self.camera.enable(TIME_STEP)
        self.camera_width = self.camera.getWidth()
        self.camera_height = self.camera.getHeight()
        
        # 获取距离传感器
        self.distance_sensors = []
        for i in range(8):
            sensor = self.robot.getDevice(f'ps{i}')
            sensor.enable(TIME_STEP)
            self.distance_sensors.append(sensor)
        
        # 获取电机
        self.left_motor = self.robot.getDevice('left wheel motor')
        self.right_motor = self.robot.getDevice('right wheel motor')
        self.left_motor.setPosition(float('inf'))
        self.right_motor.setPosition(float('inf'))
    
    def get_camera_image(self):
        """获取摄像头图像"""
        image = self.camera.getImage()
        # 转换为numpy数组进行处理
        image_array = np.frombuffer(image, dtype=np.uint8)
        return image_array.reshape((self.camera_height, self.camera_width, 4))
    
    def detect_objects(self, image):
        """简单的物体检测(基于颜色)"""
        # 检测红色物体
        red_pixels = np.sum(image[:, :, 0] > 200)
        total_pixels = image.shape[0] * image.shape[1]
        red_ratio = red_pixels / total_pixels
        
        return red_ratio > 0.1  # 如果红色像素超过10%,认为检测到红色物体
    
    def run(self):
        """主运行循环"""
        while self.robot.step(self.time_step) != -1:
            # 获取摄像头图像
            image = self.get_camera_image()
            
            # 检测物体
            if self.detect_objects(image):
                # 检测到红色物体,停止
                self.left_motor.setVelocity(0.0)
                self.right_motor.setVelocity(0.0)
            else:
                # 继续前进
                self.left_motor.setVelocity(0.3 * MAX_SPEED)
                self.right_motor.setVelocity(0.3 * MAX_SPEED)

3. 状态机实现

状态机是机器人控制中的重要概念,它允许机器人根据当前状态和输入做出不同的决策。

📝 动手实践 #4: 实现状态机控制器:
from controller import Robot, DistanceSensor, Motor
from enum import Enum
import math

TIME_STEP = 64
MAX_SPEED = 6.28

class RobotState(Enum):
    EXPLORE = "explore"
    AVOID = "avoid"
    SEARCH = "search"
    RETURN = "return"

class StateMachineController:
    def __init__(self):
        self.robot = Robot()
        self.time_step = TIME_STEP
        self.current_state = RobotState.EXPLORE
        self.state_timer = 0
        
        # 获取传感器和电机
        self.setup_devices()
        
        # 状态机参数
        self.avoid_duration = 50  # 避障持续时间
        self.search_duration = 100  # 搜索持续时间
    
    def setup_devices(self):
        """设置设备"""
        # 距离传感器
        self.distance_sensors = []
        for i in range(8):
            sensor = self.robot.getDevice(f'ps{i}')
            sensor.enable(TIME_STEP)
            self.distance_sensors.append(sensor)
        
        # 电机
        self.left_motor = self.robot.getDevice('left wheel motor')
        self.right_motor = self.robot.getDevice('right wheel motor')
        self.left_motor.setPosition(float('inf'))
        self.right_motor.setPosition(float('inf'))
    
    def get_sensor_values(self):
        """获取传感器值"""
        return [sensor.getValue() for sensor in self.distance_sensors]
    
    def state_explore(self):
        """探索状态"""
        sensor_values = self.get_sensor_values()
        
        # 检测障碍物
        if any(value > 80 for value in sensor_values):
            self.current_state = RobotState.AVOID
            self.state_timer = 0
            return
        
        # 正常前进
        self.left_motor.setVelocity(0.5 * MAX_SPEED)
        self.right_motor.setVelocity(0.5 * MAX_SPEED)
    
    def state_avoid(self):
        """避障状态"""
        self.state_timer += 1
        
        # 避障行为
        sensor_values = self.get_sensor_values()
        left_obstacle = sensor_values[0] + sensor_values[1]
        right_obstacle = sensor_values[6] + sensor_values[7]
        
        if left_obstacle > right_obstacle:
            self.left_motor.setVelocity(-0.3 * MAX_SPEED)
            self.right_motor.setVelocity(0.3 * MAX_SPEED)
        else:
            self.left_motor.setVelocity(0.3 * MAX_SPEED)
            self.right_motor.setVelocity(-0.3 * MAX_SPEED)
        
        # 检查是否应该退出避障状态
        if self.state_timer > self.avoid_duration:
            self.current_state = RobotState.SEARCH
            self.state_timer = 0
    
    def state_search(self):
        """搜索状态"""
        self.state_timer += 1
        
        # 原地旋转搜索
        self.left_motor.setVelocity(0.2 * MAX_SPEED)
        self.right_motor.setVelocity(-0.2 * MAX_SPEED)
        
        # 搜索一段时间后返回探索状态
        if self.state_timer > self.search_duration:
            self.current_state = RobotState.EXPLORE
            self.state_timer = 0
    
    def run(self):
        """主运行循环"""
        while self.robot.step(self.time_step) != -1:
            # 根据当前状态执行相应的行为
            if self.current_state == RobotState.EXPLORE:
                self.state_explore()
            elif self.current_state == RobotState.AVOID:
                self.state_avoid()
            elif self.current_state == RobotState.SEARCH:
                self.state_search()

# 创建并运行控制器
controller = StateMachineController()
controller.run()

4. 多线程控制器

对于复杂的机器人控制任务,多线程可以提供更好的性能和响应性。

📝 动手实践 #5: 创建多线程控制器:
from controller import Robot, DistanceSensor, Motor
import threading
import time
import queue

TIME_STEP = 64
MAX_SPEED = 6.28

class MultiThreadedController:
    def __init__(self):
        self.robot = Robot()
        self.time_step = TIME_STEP
        
        # 设置设备
        self.setup_devices()
        
        # 线程间通信队列
        self.sensor_queue = queue.Queue()
        self.command_queue = queue.Queue()
        
        # 控制标志
        self.running = True
        
        # 创建线程
        self.sensor_thread = threading.Thread(target=self.sensor_processing)
        self.decision_thread = threading.Thread(target=self.decision_making)
        self.motor_thread = threading.Thread(target=self.motor_control)
    
    def setup_devices(self):
        """设置设备"""
        # 距离传感器
        self.distance_sensors = []
        for i in range(8):
            sensor = self.robot.getDevice(f'ps{i}')
            sensor.enable(TIME_STEP)
            self.distance_sensors.append(sensor)
        
        # 电机
        self.left_motor = self.robot.getDevice('left wheel motor')
        self.right_motor = self.robot.getDevice('right wheel motor')
        self.left_motor.setPosition(float('inf'))
        self.right_motor.setPosition(float('inf'))
    
    def sensor_processing(self):
        """传感器处理线程"""
        while self.running:
            # 读取传感器数据
            sensor_values = [sensor.getValue() for sensor in self.distance_sensors]
            
            # 将数据放入队列
            self.sensor_queue.put(sensor_values)
            
            # 控制读取频率
            time.sleep(0.01)
    
    def decision_making(self):
        """决策线程"""
        while self.running:
            try:
                # 从队列获取传感器数据
                sensor_values = self.sensor_queue.get(timeout=0.1)
                
                # 简单的决策逻辑
                if any(value > 80 for value in sensor_values):
                    # 检测到障碍物,停止
                    command = {'left': 0.0, 'right': 0.0}
                else:
                    # 无障碍物,前进
                    command = {'left': 0.5 * MAX_SPEED, 'right': 0.5 * MAX_SPEED}
                
                # 将命令放入队列
                self.command_queue.put(command)
                
            except queue.Empty:
                continue
    
    def motor_control(self):
        """电机控制线程"""
        while self.running:
            try:
                # 从队列获取命令
                command = self.command_queue.get(timeout=0.1)
                
                # 执行电机命令
                self.left_motor.setVelocity(command['left'])
                self.right_motor.setVelocity(command['right'])
                
            except queue.Empty:
                continue
    
    def run(self):
        """启动所有线程并运行"""
        # 启动线程
        self.sensor_thread.start()
        self.decision_thread.start()
        self.motor_thread.start()
        
        # 主循环
        while self.robot.step(self.time_step) != -1:
            pass
        
        # 停止所有线程
        self.running = False
        self.sensor_thread.join()
        self.decision_thread.join()
        self.motor_thread.join()

# 创建并运行控制器
controller = MultiThreadedController()
controller.run()

5. 调试和优化

调试是控制器开发中的重要环节。Webots提供了多种调试工具和方法。

📝 动手实践 #6: 添加调试功能:
from controller import Robot, DistanceSensor, Motor
import logging
import time

TIME_STEP = 64
MAX_SPEED = 6.28

class DebugController:
    def __init__(self):
        self.robot = Robot()
        self.time_step = TIME_STEP
        
        # 设置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('robot_debug.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
        # 设置设备
        self.setup_devices()
        
        # 性能监控
        self.step_count = 0
        self.start_time = time.time()
    
    def setup_devices(self):
        """设置设备"""
        try:
            # 距离传感器
            self.distance_sensors = []
            for i in range(8):
                sensor = self.robot.getDevice(f'ps{i}')
                sensor.enable(TIME_STEP)
                self.distance_sensors.append(sensor)
                self.logger.info(f"距离传感器 {i} 初始化成功")
            
            # 电机
            self.left_motor = self.robot.getDevice('left wheel motor')
            self.right_motor = self.robot.getDevice('right wheel motor')
            self.left_motor.setPosition(float('inf'))
            self.right_motor.setPosition(float('inf'))
            self.logger.info("电机初始化成功")
            
        except Exception as e:
            self.logger.error(f"设备初始化失败: {e}")
            raise
    
    def get_sensor_values(self):
        """获取传感器值并记录"""
        try:
            values = [sensor.getValue() for sensor in self.distance_sensors]
            
            # 记录异常值
            for i, value in enumerate(values):
                if value > 100:
                    self.logger.warning(f"传感器 {i} 值异常: {value}")
            
            return values
            
        except Exception as e:
            self.logger.error(f"读取传感器失败: {e}")
            return [0] * 8
    
    def set_motor_velocity(self, left_vel, right_vel):
        """设置电机速度并记录"""
        try:
            self.left_motor.setVelocity(left_vel)
            self.right_motor.setVelocity(right_vel)
            
            # 记录速度变化
            if abs(left_vel - right_vel) > MAX_SPEED * 0.5:
                self.logger.info(f"转向: 左轮={left_vel:.2f}, 右轮={right_vel:.2f}")
                
        except Exception as e:
            self.logger.error(f"设置电机速度失败: {e}")
    
    def log_performance(self):
        """记录性能信息"""
        self.step_count += 1
        
        if self.step_count % 1000 == 0:
            elapsed_time = time.time() - self.start_time
            fps = self.step_count / elapsed_time
            self.logger.info(f"性能统计: 步数={self.step_count}, FPS={fps:.2f}")
    
    def run(self):
        """主运行循环"""
        self.logger.info("控制器启动")
        
        while self.robot.step(self.time_step) != -1:
            try:
                # 获取传感器数据
                sensor_values = self.get_sensor_values()
                
                # 简单的避障逻辑
                if any(value > 80 for value in sensor_values):
                    self.set_motor_velocity(-0.3 * MAX_SPEED, 0.3 * MAX_SPEED)
                else:
                    self.set_motor_velocity(0.5 * MAX_SPEED, 0.5 * MAX_SPEED)
                
                # 记录性能
                self.log_performance()
                
            except Exception as e:
                self.logger.error(f"主循环错误: {e}")
                break
        
        self.logger.info("控制器停止")

# 创建并运行控制器
controller = DebugController()
controller.run()

6. 高级编程模式

让我们学习一些高级的编程模式,如观察者模式、策略模式等。

📝 动手实践 #7: 实现策略模式控制器:
from controller import Robot, DistanceSensor, Motor
from abc import ABC, abstractmethod

TIME_STEP = 64
MAX_SPEED = 6.28

class BehaviorStrategy(ABC):
    """行为策略基类"""
    @abstractmethod
    def execute(self, robot, sensors):
        pass

class ExploreBehavior(BehaviorStrategy):
    """探索行为"""
    def execute(self, robot, sensors):
        left_motor, right_motor = robot.left_motor, robot.right_motor
        left_motor.setVelocity(0.5 * MAX_SPEED)
        right_motor.setVelocity(0.5 * MAX_SPEED)

class AvoidBehavior(BehaviorStrategy):
    """避障行为"""
    def execute(self, robot, sensors):
        left_motor, right_motor = robot.left_motor, robot.right_motor
        
        # 根据障碍物位置决定转向
        left_obstacle = sensors[0] + sensors[1]
        right_obstacle = sensors[6] + sensors[7]
        
        if left_obstacle > right_obstacle:
            left_motor.setVelocity(-0.3 * MAX_SPEED)
            right_motor.setVelocity(0.3 * MAX_SPEED)
        else:
            left_motor.setVelocity(0.3 * MAX_SPEED)
            right_motor.setVelocity(-0.3 * MAX_SPEED)

class SearchBehavior(BehaviorStrategy):
    """搜索行为"""
    def execute(self, robot, sensors):
        left_motor, right_motor = robot.left_motor, robot.right_motor
        left_motor.setVelocity(0.2 * MAX_SPEED)
        right_motor.setVelocity(-0.2 * MAX_SPEED)

class AdvancedController:
    def __init__(self):
        self.robot = Robot()
        self.time_step = TIME_STEP
        
        # 设置设备
        self.setup_devices()
        
        # 行为策略
        self.behaviors = {
            'explore': ExploreBehavior(),
            'avoid': AvoidBehavior(),
            'search': SearchBehavior()
        }
        self.current_behavior = 'explore'
    
    def setup_devices(self):
        """设置设备"""
        # 距离传感器
        self.distance_sensors = []
        for i in range(8):
            sensor = self.robot.getDevice(f'ps{i}')
            sensor.enable(TIME_STEP)
            self.distance_sensors.append(sensor)
        
        # 电机
        self.left_motor = self.robot.getDevice('left wheel motor')
        self.right_motor = self.robot.getDevice('right wheel motor')
        self.left_motor.setPosition(float('inf'))
        self.right_motor.setPosition(float('inf'))
    
    def get_sensor_values(self):
        """获取传感器值"""
        return [sensor.getValue() for sensor in self.distance_sensors]
    
    def select_behavior(self, sensor_values):
        """选择行为策略"""
        # 检测障碍物
        if any(value > 80 for value in sensor_values):
            return 'avoid'
        else:
            return 'explore'
    
    def run(self):
        """主运行循环"""
        while self.robot.step(self.time_step) != -1:
            # 获取传感器数据
            sensor_values = self.get_sensor_values()
            
            # 选择行为
            self.current_behavior = self.select_behavior(sensor_values)
            
            # 执行当前行为
            self.behaviors[self.current_behavior].execute(self, sensor_values)

# 创建并运行控制器
controller = AdvancedController()
controller.run()

7. 总结

恭喜你完成了Webots高级控制器教程!你已经学会了:

  • 如何有效地处理传感器数据
  • 如何实现状态机来控制机器人行为
  • 如何使用多线程提高控制器性能
  • 如何添加调试和日志功能
  • 如何应用高级编程模式
  • 如何优化和调试控制器代码

这些技能将帮助你创建更复杂、更可靠的机器人控制器。在下一个教程中,我们将学习环境建模的高级技巧。

下一步

继续学习 教程5:环境建模,了解如何创建复杂的仿真环境。