教程 8:监督者控制器 (75分钟)


目标与简介

本教程将深入探讨Webots中的监督者控制器概念。监督者控制器是一个特殊的控制器,具有对整个仿真环境的完全访问权限。你将学习如何创建监督者控制器、监控仿真状态、动态修改场景,以及实现各种高级功能。本教程基于官方教程8

目录

1. 控制器编程基础

控制器编程是Webots中最重要的概念之一。控制器是运行在机器人上的程序,负责控制机器人的行为。Webots支持多种编程语言,包括C、C++、Python、Java和MATLAB。本教程主要使用Python进行演示。

控制器特点

  • 每个机器人都有自己的控制器进程
  • 控制器通过API与Webots仿真器通信
  • 支持多种编程语言,API接口基本一致
  • 控制器可以读取传感器数据并控制执行器
  • 标准输出和错误流会重定向到Webots控制台

2. Hello World示例

按照计算机科学的传统,我们从"Hello World!"示例开始。这是一个最简单的Webots控制器:

📝 动手实践 #1: 创建你的第一个控制器:
from controller import Robot

robot = Robot()

while robot.step(32) != -1:
    print("Hello World!")

这段代码会重复打印"Hello World!"到标准输出流,输出会重定向到Webots控制台。

代码解析

  • from controller import Robot - 导入Robot类
  • robot = Robot() - 创建机器人实例,初始化通信
  • robot.step(32) - 执行32毫秒的仿真步骤
  • != -1 - 检查控制器是否应该继续运行
  • 数值32指定控制步骤的持续时间(仿真时间,非实际时间)

3. 读取传感器

现在让我们学习如何读取机器人的传感器。下面的例子持续更新并打印距离传感器的值:

📝 动手实践 #2: 读取距离传感器:
from controller import Robot, DistanceSensor

TIME_STEP = 32

robot = Robot()

# 获取设备并启用
sensor = robot.getDevice("my_distance_sensor")
sensor.enable(TIME_STEP)

while robot.step(TIME_STEP) != -1:
    value = sensor.getValue()
    print("传感器值:", value)

传感器使用要点

  • 获取设备:使用robot.getDevice("设备名")获取设备标识
  • 启用传感器:每个传感器使用前必须调用enable()方法
  • 更新延迟:enable()的参数指定传感器数据更新间隔
  • 数据同步:传感器值在robot.step()调用时更新
  • 向量传感器:GPS、加速度计、陀螺仪返回[x,y,z]数组

4. 使用执行器

下面的例子展示如何让旋转电机以2Hz的正弦信号振荡:

📝 动手实践 #3: 控制旋转电机:
from controller import Robot, Motor
from math import pi, sin

TIME_STEP = 32

robot = Robot()
motor = robot.getDevice("my_motor")

F = 2.0   # 频率 2 Hz
t = 0.0   # 已过仿真时间

while robot.step(TIME_STEP) != -1:
    position = sin(t * 2.0 * pi * F)
    motor.setPosition(position)
    t += TIME_STEP / 1000.0

执行器使用要点

  • 无需启用:执行器不需要像传感器那样调用enable()方法
  • 设置目标:setPosition()只是设置目标位置,不会立即执行
  • 实际执行:执行器的动作在robot.step()调用时开始
  • 物理限制:电机可能因障碍物或扭矩不足而无法到达目标位置
  • 同时控制:可以为多个电机设置位置,然后一次性执行

5. step函数详解

Webots使用两种不同的时间步:

两种时间步

  • 仿真步(Simulation Step):在场景树WorldInfo.basicTimeStep中指定
  • 控制步(Control Step):作为robot.step()函数的参数指定
📝 重要概念:
  • 仿真步:指定仿真一步的持续时间,即计算位置、速度、碰撞等的时间间隔
  • 控制步:控制循环一次迭代的持续时间,必须是仿真步的整数倍
  • 同步机制:控制器通过step()函数与仿真器同步传感器和执行器数据
  • 阻塞特性:如果控制器不调用step(),仿真器会阻塞(同步模式下)

6. 传感器和执行器协同

下面是一个完整的例子,展示如何同时使用传感器和执行器。这个机器人使用差动转向,通过两个接近传感器检测障碍物:

📝 动手实践 #4: 避障机器人:
from controller import Robot, Motor, DistanceSensor

TIME_STEP = 32

robot = Robot()

# 获取并启用传感器
left_sensor = robot.getDevice("left_sensor")
right_sensor = robot.getDevice("right_sensor")
left_sensor.enable(TIME_STEP)
right_sensor.enable(TIME_STEP)

# 获取电机
left_motor = robot.getDevice("left_motor")
right_motor = robot.getDevice("right_motor")
left_motor.setPosition(float('inf'))  # 设置为速度控制模式
right_motor.setPosition(float('inf'))
left_motor.setVelocity(0.0)
right_motor.setVelocity(0.0)

def compute_left_speed(left_dist, right_dist):
    # 避障逻辑:右侧有障碍物时左转
    if right_dist < 500:
        return 2.0
    elif left_dist < 500:
        return 6.0
    else:
        return 4.0

def compute_right_speed(left_dist, right_dist):
    # 避障逻辑:左侧有障碍物时右转
    if left_dist < 500:
        return 2.0
    elif right_dist < 500:
        return 6.0
    else:
        return 4.0

while robot.step(TIME_STEP) != -1:
    # 读取传感器
    left_dist = left_sensor.getValue()
    right_dist = right_sensor.getValue()
    
    # 计算行为
    left = compute_left_speed(left_dist, right_dist)
    right = compute_right_speed(left_dist, right_dist)
    
    # 驱动轮子电机
    left_motor.setVelocity(left)
    right_motor.setVelocity(right)

7. 控制器参数

在".wbt"文件中,可以在Robot节点的controllerArgs字段中指定传递给控制器的参数:

📝 动手实践 #5: 使用控制器参数:
from controller import Robot
import sys

robot = Robot()

# 打印所有命令行参数
for i in range(1, len(sys.argv)):
    print("argv[%i]=%s" % (i, sys.argv[i]))

# 如果在Robot节点中设置:controllerArgs "one two three"
# 将输出:
# argv[1]=one
# argv[2]=two  
# argv[3]=three

8. 控制器终止

通常控制器在无限循环中运行,直到被Webots终止。以下情况会触发控制器终止:

终止条件

  • Webots退出
  • 仿真重置
  • 世界重新加载
  • 加载新仿真
  • 控制器名称更改
📝 动手实践 #6: 处理控制器终止:
from controller import Robot, DistanceSensor
import sys

TIME_STEP = 32

robot = Robot()
sensor = robot.getDevice("my_distance_sensor")
sensor.enable(TIME_STEP)

def save_experiment_data():
    """保存实验数据"""
    print("正在保存实验数据...")
    # 这里添加保存数据的代码
    pass

while robot.step(TIME_STEP) != -1:
    value = sensor.getValue()
    print("传感器值:", value)
    
    # 检查某个完成条件
    if experiment_finished():
        save_experiment_data()
        sys.exit(0)  # 主动终止仿真

# Webots触发的终止被检测到!
# 此时新的print语句不再显示在Webots控制台中
save_experiment_data()  # 保存数据,不应超过1秒

9. 环境变量和语言设置

Webots支持通过"runtime.ini"文件配置环境变量和语言特定设置:

📝 runtime.ini示例:
; runtime.ini 示例

[environment variables with paths]
WEBOTS_LIBRARY_PATH = lib:$(WEBOTS_LIBRARY_PATH):../../library

[environment variables]
MY_CUSTOM_VAR = some_value

[environment variables for Windows]
PYTHON_PATH = "C:\Python39;C:\Python39\Scripts"

[environment variables for Linux]
LD_LIBRARY_PATH = /usr/local/lib:$(LD_LIBRARY_PATH)

[python]
COMMAND = /usr/bin/python3.9
OPTIONS = -O -W ignore

支持的配置节

  • [environment variables with paths] - 包含路径的环境变量
  • [environment variables] - 通用环境变量
  • [environment variables for Windows/macOS/Linux] - 平台特定变量
  • [python/java/matlab] - 语言特定设置(COMMAND和OPTIONS)

10. 环境准备

现在我们已经了解了控制器编程的基础知识,让我们开始学习监督者控制器。首先创建一个新的环境:

📝 动手实践 #7: 创建一个新的Webots项目,命名为 supervisor_tutorial。在场景中添加一个 RectangleArena 作为基础环境,并添加一个 E-puck 机器人。确保仿真处于暂停状态。

10. 监督者概念

监督者控制器是Webots中的一个特殊控制器类型,它具有以下特点:

  • 可以访问整个仿真环境
  • 可以修改场景中的任何对象
  • 可以监控所有机器人的状态
  • 可以控制仿真的运行
  • 可以导入和导出对象
📝 动手实践 #2: 设置监督者控制器:
  1. 在场景树中选中 WorldInfo 节点
  2. supervisor 字段中设置为 TRUE
  3. E-puck 机器人设置一个简单的控制器
  4. 创建一个新的Python文件作为监督者控制器

3. 基础监督者控制器

让我们创建一个基础的监督者控制器,学习如何访问仿真环境和基本操作。

📝 动手实践 #3: 创建基础监督者控制器:
from controller import Supervisor
import math

TIME_STEP = 64

class BasicSupervisor:
    def __init__(self):
        self.supervisor = Supervisor()
        self.time_step = TIME_STEP
        
        # 获取仿真模式
        self.simulation_mode = self.supervisor.getSimulationMode()
        print("仿真模式:", self.simulation_mode)
        
        # 获取世界信息
        self.world_info = self.supervisor.getFromDef("WORLD_INFO")
        if self.world_info:
            print("成功获取世界信息")
        
        # 获取E-puck机器人
        self.epuck = self.supervisor.getFromDef("EPUCK")
        if self.epuck:
            print("成功获取E-puck机器人")
    
    def get_robot_position(self):
        """获取机器人位置"""
        if self.epuck:
            position = self.epuck.getPosition()
            print("机器人位置:", position)
            return position
        return None
    
    def get_robot_orientation(self):
        """获取机器人朝向"""
        if self.epuck:
            orientation = self.epuck.getOrientation()
            print("机器人朝向:", orientation)
            return orientation
    
    def run(self):
        """主运行循环"""
        step_count = 0
        while self.supervisor.step(self.time_step) != -1:
            step_count += 1
            
            # 每100步获取一次机器人状态
            if step_count % 100 == 0:
                self.get_robot_position()
                self.get_robot_orientation()
                
                # 获取仿真时间
                sim_time = self.supervisor.getTime()
                print("仿真时间: {:.2f} 秒".format(sim_time))

# 创建并运行监督者
supervisor = BasicSupervisor()
supervisor.run()

4. 场景修改

监督者控制器的一个强大功能是可以动态修改场景。我们可以添加、删除或修改场景中的对象。

📝 动手实践 #4: 动态添加对象:
def add_box_to_scene(self, position, size, color):
    """向场景中添加一个盒子"""
    # 创建盒子的VRML字符串
    box_vrml = """
    Solid {
        translation """ + str(position[0]) + " " + str(position[1]) + " " + str(position[2]) + """
        children [
            Shape {
                appearance Appearance {
                    material Material {
                        diffuseColor """ + str(color[0]) + " " + str(color[1]) + " " + str(color[2]) + """
                    }
                }
                geometry Box {
                    size """ + str(size[0]) + " " + str(size[1]) + " " + str(size[2]) + """
                }
            }
        ]
        boundingObject Box {
            size """ + str(size[0]) + " " + str(size[1]) + " " + str(size[2]) + """
        }
        physics Physics {
            mass 1
        }
    }
    """
    
    # 获取根节点
    root = self.supervisor.getRoot()
    root_children = root.getField("children")
    
    # 导入盒子
    self.supervisor.importMFNodeFromString(0, box_vrml)
    print("在位置", position, "添加了一个盒子")

def remove_object_by_name(self, name):
    """根据名称删除对象"""
    node = self.supervisor.getFromDef(name)
    if node:
        node.remove()
        print("删除了对象:", name)
    else:
        print("未找到对象:", name)

def modify_object_property(self, name, field_name, value):
    """修改对象属性"""
    node = self.supervisor.getFromDef(name)
    if node:
        field = node.getField(field_name)
        if field:
            if isinstance(value, list):
                field.setSFVec3f(value)
            elif isinstance(value, float):
                field.setSFFloat(value)
            elif isinstance(value, int):
                field.setSFInt32(value)
            print("修改了", name, "的", field_name, "为", value)
        else:
            print("字段", field_name, "不存在")
    else:
        print("未找到对象:", name)
📝 动手实践 #5: 创建动态环境:
def create_dynamic_environment(self):
    """创建动态环境"""
    step_count = 0
    obstacle_count = 0
    
    while self.supervisor.step(self.time_step) != -1:
        step_count += 1
        
        # 每500步添加一个障碍物
        if step_count % 500 == 0 and obstacle_count < 5:
            import random
            x = random.uniform(-2, 2)
            y = random.uniform(-2, 2)
            size = random.uniform(0.1, 0.3)
            color = [random.random(), random.random(), random.random()]
            
            self.add_box_to_scene([x, y, size/2], [size, size, size], color)
            obstacle_count += 1
            
            print("添加了第", obstacle_count, "个障碍物")
        
        # 每1000步移除最旧的障碍物
        if step_count % 1000 == 0 and obstacle_count > 0:
            # 这里简化处理,实际应用中需要跟踪对象名称
            print("移除一个障碍物")
            obstacle_count -= 1

5. 仿真监控

监督者控制器可以监控整个仿真的状态,包括机器人的性能、环境变化等。

📝 动手实践 #6: 创建性能监控器:
class PerformanceMonitor:
    def __init__(self, supervisor):
        self.supervisor = supervisor
        self.start_time = supervisor.getTime()
        self.start_position = None
        self.max_distance = 0
        self.total_distance = 0
        self.last_position = None
        
        # 获取机器人初始位置
        if self.epuck:
            self.start_position = self.epuck.getPosition()
            self.last_position = self.start_position
    
    def update_metrics(self):
        """更新性能指标"""
        if not self.epuck:
            return
        
        current_position = self.epuck.getPosition()
        current_time = self.supervisor.getTime()
        
        # 计算距离
        if self.last_position:
            distance = math.sqrt(
                (current_position[0] - self.last_position[0])**2 +
                (current_position[1] - self.last_position[1])**2
            )
            self.total_distance += distance
        
        # 计算最大距离
        if self.start_position:
            current_distance = math.sqrt(
                (current_position[0] - self.start_position[0])**2 +
                (current_position[1] - self.start_position[1])**2
            )
            self.max_distance = max(self.max_distance, current_distance)
        
        self.last_position = current_position
        
        # 每100步输出一次统计信息
        if int(current_time * 1000) % 6400 == 0:  # 每100步
            print("仿真时间: {:.2f}s".format(current_time))
            print("总移动距离: {:.3f}m".format(self.total_distance))
            print("最大距离: {:.3f}m".format(self.max_distance))
            print("平均速度: {:.3f}m/s".format(self.total_distance/current_time))
            print("-" * 40)
    
    def get_final_report(self):
        """获取最终报告"""
        final_time = self.supervisor.getTime()
        return {
            "total_time": final_time,
            "total_distance": self.total_distance,
            "max_distance": self.max_distance,
            "average_speed": self.total_distance / final_time if final_time > 0 else 0
        }

6. 高级功能

监督者控制器还支持许多高级功能,如数据记录、场景保存、多机器人协调等。

📝 动手实践 #7: 数据记录功能:
import csv
import os

class DataLogger:
    def __init__(self, filename="simulation_data.csv"):
        self.filename = filename
        self.data = []
        
        # 创建CSV文件并写入表头
        with open(filename, 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['Time', 'X', 'Y', 'Z', 'Distance', 'Speed'])
    
    def log_data(self, time, position, distance, speed):
        """记录数据"""
        self.data.append([time, position[0], position[1], position[2], distance, speed])
        
        # 写入CSV文件
        with open(self.filename, 'a', newline='') as file:
            writer = csv.writer(file)
            writer.writerow([time, position[0], position[1], position[2], distance, speed])
    
    def export_summary(self, summary_data):
        """导出总结报告"""
        summary_filename = "simulation_summary.txt"
        with open(summary_filename, 'w') as file:
            file.write("仿真总结报告\n")
            file.write("=" * 30 + "\n")
            file.write("总仿真时间: {:.2f} 秒\n".format(summary_data['total_time']))
            file.write("总移动距离: {:.3f} 米\n".format(summary_data['total_distance']))
            file.write("最大距离: {:.3f} 米\n".format(summary_data['max_distance']))
            file.write("平均速度: {:.3f} 米/秒\n".format(summary_data['average_speed']))
        
        print("总结报告已保存到:", summary_filename)

def save_scene_state(self, filename="scene_backup.wbt"):
    """保存场景状态"""
    self.supervisor.saveWorld(filename)
    print("场景已保存到:", filename)

def load_scene_state(self, filename="scene_backup.wbt"):
    """加载场景状态"""
    self.supervisor.loadWorld(filename)
    print("场景已从", filename, "加载")

def restart_simulation(self):
    """重启仿真"""
    self.supervisor.simulationReset()
    print("仿真已重启")

def pause_simulation(self):
    """暂停仿真"""
    self.supervisor.simulationSetMode(Supervisor.SIMULATION_MODE_PAUSE)
    print("仿真已暂停")

def resume_simulation(self):
    """恢复仿真"""
    self.supervisor.simulationSetMode(Supervisor.SIMULATION_MODE_REAL_TIME)
    print("仿真已恢复")
📝 动手实践 #8: 完整的监督者控制器:
class CompleteSupervisor:
    def __init__(self):
        self.supervisor = Supervisor()
        self.time_step = TIME_STEP
        self.epuck = self.supervisor.getFromDef("EPUCK")
        
        # 初始化组件
        self.monitor = PerformanceMonitor(self.supervisor)
        self.logger = DataLogger()
        
        # 仿真参数
        self.simulation_duration = 60  # 60秒
        self.start_time = self.supervisor.getTime()
    
    def run_complete_simulation(self):
        """运行完整的监督仿真"""
        print("开始监督仿真...")
        
        while self.supervisor.step(self.time_step) != -1:
            current_time = self.supervisor.getTime()
            
            # 更新监控数据
            self.monitor.update_metrics()
            
            # 记录数据
            if self.epuck:
                position = self.epuck.getPosition()
                self.logger.log_data(
                    current_time,
                    position,
                    self.monitor.total_distance,
                    self.monitor.total_distance / current_time if current_time > 0 else 0
                )
            
            # 检查仿真时间
            if current_time >= self.simulation_duration:
                print("仿真时间达到设定值,正在结束...")
                break
        
        # 生成最终报告
        final_report = self.monitor.get_final_report()
        self.logger.export_summary(final_report)
        
        print("监督仿真完成!")
        print("最终报告:", final_report)

# 运行完整的监督者控制器
if __name__ == "__main__":
    supervisor = CompleteSupervisor()
    supervisor.run_complete_simulation()

7. 总结

恭喜你完成了Webots监督者控制器教程!你已经学会了:

  • 如何创建和配置监督者控制器
  • 如何访问和监控仿真环境
  • 如何动态修改场景(添加、删除、修改对象)
  • 如何监控机器人性能和仿真状态
  • 如何记录和分析仿真数据
  • 如何实现高级监督功能(场景保存、仿真控制等)

监督者控制器是Webots中非常强大的功能,它让你能够完全控制仿真环境,实现复杂的实验和分析。这些技能将帮助你创建更加智能和灵活的仿真系统。

下一步

你已经完成了Webots高级教程!现在你可以开始创建复杂的仿真项目,结合所有学到的技能来构建专业的机器人仿真系统。