教程 8:场景修改与监督者控制 (75分钟)


目标与简介

本教程将深入探讨Webots中的场景修改和监督者控制器概念。监督者控制器是一个特殊的控制器,具有对整个仿真环境的完全访问权限。你将学习如何创建监督者控制器、动态修改场景、创建动态环境、实现天气系统,以及监控仿真状态。本教程基于官方教程8

目录

1. 监督者概念

监督者控制器是Webots中一个非常强大的功能,它允许你控制整个仿真过程,而不仅仅是单个机器人。监督者控制器可以访问场景树、修改对象属性、监控仿真状态,甚至控制仿真的开始、暂停和停止。

监督者控制器的基本概念

  • 场景树访问:可以读取和修改整个场景树
  • 仿真控制:控制仿真的开始、暂停、重启和停止
  • 对象操作:动态添加、删除和修改场景中的对象
  • 数据收集:收集仿真过程中的各种数据
  • 全局监控:监控整个仿真环境的状态
📝 动手实践 #1: 创建一个新的Webots项目,命名为 supervisor_tutorial。在场景中添加一个 RectangleArena 作为基础环境,并添加一个 E-puck 机器人。将机器人的 supervisor 字段设置为 TRUE,这样它就可以作为监督者控制器使用。

2. 基础监督者控制器

监督者控制器是Webots中一个特殊的控制器类型,它继承自Robot类,但具有额外的功能。监督者控制器可以访问场景树、修改对象属性、控制仿真流程等。

📝 动手实践 #2: 创建基础监督者控制器:
from controller import Supervisor
import math

# 创建监督者控制器
supervisor = Supervisor()

# 获取时间步长
TIME_STEP = int(supervisor.getBasicTimeStep())

# 获取机器人节点
epuck = supervisor.getFromDef("EPUCK")

def main():
    print("监督者控制器已启动")
    
    while supervisor.step(TIME_STEP) != -1:
        # 获取当前仿真时间
        current_time = supervisor.getTime()
        
        # 获取机器人位置
        if epuck:
            position = epuck.getPosition()
            print(f"时间: {current_time:.2f}s, 机器人位置: ({position[0]:.3f}, {position[1]:.3f}, {position[2]:.3f})")
        
        # 每5秒输出一次状态
        if int(current_time) % 5 == 0:
            print(f"仿真运行中... 当前时间: {current_time:.1f}秒")

if __name__ == "__main__":
    main()

监督者控制器的优势

  • 全局控制:可以控制整个仿真环境
  • 场景操作:动态修改场景树和对象属性
  • 数据收集:收集和分析仿真数据
  • 自动化测试:实现自动化实验和测试
  • 性能监控:监控仿真性能和机器人行为

3. 场景修改

监督者控制器最强大的功能之一就是能够动态修改场景。你可以添加、删除、修改场景中的对象,改变它们的属性,甚至创建全新的对象。

📝 动手实践 #3: 动态添加和修改场景对象:
from controller import Supervisor
import math

class SceneModifier:
    def __init__(self, supervisor):
        self.supervisor = supervisor
        self.root_children_field = supervisor.getRoot().getField("children")
        
    def add_box_to_scene(self, position, size, color):
        """向场景中添加一个盒子"""
        # 创建Box节点
        box_string = f'''
        DEF BOX_{len(self.root_children_field.getMF())} Transform {{
          translation {position[0]} {position[1]} {position[2]}
          children [
            Shape {{
              appearance Appearance {{
                material Material {{
                  diffuseColor {color[0]} {color[1]} {color[2]}
                }}
              }}
              geometry Box {{
                size {size[0]} {size[1]} {size[2]}
              }}
            }}
          ]
        }}
        '''
        
        # 导入节点到场景
        self.supervisor.importMFString(box_string)
        print(f"已添加盒子到位置: {position}")
    
    def remove_object_by_def(self, def_name):
        """通过DEF名称删除对象"""
        node = self.supervisor.getFromDef(def_name)
        if node:
            node.remove()
            print(f"已删除对象: {def_name}")
        else:
            print(f"未找到对象: {def_name}")
    
    def change_object_color(self, def_name, new_color):
        """改变对象的颜色"""
        node = self.supervisor.getFromDef(def_name)
        if node:
            # 获取材质字段
            material_field = node.getField("material")
            if material_field:
                # 创建新的材质
                new_material_string = f'''
                Material {{
                  diffuseColor {new_color[0]} {new_color[1]} {new_color[2]}
                }}
                '''
                material_field.importMFString(new_material_string)
                print(f"已改变对象 {def_name} 的颜色为: {new_color}")

# 使用示例
if __name__ == "__main__":
    supervisor = Supervisor()
    TIME_STEP = int(supervisor.getBasicTimeStep())
    
    scene_modifier = SceneModifier(supervisor)
    
    # 添加一个红色盒子
    scene_modifier.add_box_to_scene([1, 0.5, 1], [0.5, 0.5, 0.5], [1, 0, 0])
    
    # 运行仿真
    while supervisor.step(TIME_STEP) != -1:
        current_time = supervisor.getTime()
        
        # 每10秒改变盒子颜色
        if int(current_time) % 10 == 0 and current_time > 0:
            import random
            new_color = [random.random(), random.random(), random.random()]
            scene_modifier.change_object_color("BOX_0", new_color)

场景修改的优势

  • 动态环境:可以创建随时间变化的仿真环境
  • 交互式实验:在仿真过程中实时调整实验条件
  • 自适应测试:根据机器人行为动态调整障碍物和奖励
  • 场景生成:程序化生成复杂的测试场景
  • 实时反馈:根据仿真结果立即调整环境参数

高级场景修改技术

除了基本的对象添加和删除,监督者控制器还支持更复杂的场景修改操作:

📝 动手实践 #4: 实现动态障碍物系统:
class DynamicObstacleSystem:
    def __init__(self, supervisor):
        self.supervisor = supervisor
        self.root_children_field = supervisor.getRoot().getField("children")
        self.obstacles = []
        self.obstacle_count = 0
        self.max_obstacles = 10
        
    def create_moving_obstacle(self, initial_position, velocity, size, color):
        """创建移动的障碍物"""
        obstacle_def = f"MOVING_OBSTACLE_{self.obstacle_count}"
        
        # 创建移动障碍物的VRML字符串
        obstacle_string = f'''
        DEF {obstacle_def} Transform {{
          translation {initial_position[0]} {initial_position[1]} {initial_position[2]}
          children [
            Shape {{
              appearance Appearance {{
                material Material {{
                  diffuseColor {color[0]} {color[1]} {color[2]}
                }}
              }}
              geometry Sphere {{
                radius {size}
              }}
            }}
          ]
        }}
        '''
        
        # 导入到场景
        self.supervisor.importMFString(obstacle_string)
        
        # 记录障碍物信息
        obstacle_info = {
            'def_name': obstacle_def,
            'position': list(initial_position),
            'velocity': list(velocity),
            'size': size,
            'color': list(color)
        }
        self.obstacles.append(obstacle_info)
        self.obstacle_count += 1
        
        print(f"创建移动障碍物: {obstacle_def}")
        return obstacle_def
    
    def update_moving_obstacles(self, time_step):
        """更新所有移动障碍物的位置"""
        for obstacle in self.obstacles:
            # 更新位置
            obstacle['position'][0] += obstacle['velocity'][0] * time_step
            obstacle['position'][1] += obstacle['velocity'][1] * time_step
            obstacle['position'][2] += obstacle['velocity'][2] * time_step
            
            # 边界检查 - 如果超出边界则反弹
            if abs(obstacle['position'][0]) > 5:
                obstacle['velocity'][0] *= -1
            if abs(obstacle['position'][2]) > 5:
                obstacle['velocity'][2] *= -1
            
            # 更新场景中的位置
            node = self.supervisor.getFromDef(obstacle['def_name'])
            if node:
                translation_field = node.getField("translation")
                if translation_field:
                    translation_field.setSFVec3f(obstacle['position'])
    
    def create_obstacle_pattern(self, pattern_type="grid"):
        """创建障碍物模式"""
        if pattern_type == "grid":
            # 创建网格模式的障碍物
            for i in range(-2, 3):
                for j in range(-2, 3):
                    if (i + j) % 2 == 0:  # 棋盘模式
                        x = i * 1.5
                        z = j * 1.5
                        self.create_moving_obstacle(
                            [x, 0.5, z], 
                            [0, 0, 0], 
                            0.3, 
                            [0.8, 0.2, 0.2]
                        )
        
        elif pattern_type == "circle":
            # 创建圆形模式的障碍物
            import math
            for i in range(8):
                angle = i * 2 * math.pi / 8
                radius = 3
                x = radius * math.cos(angle)
                z = radius * math.sin(angle)
                self.create_moving_obstacle(
                    [x, 0.5, z], 
                    [0, 0, 0], 
                    0.4, 
                    [0.2, 0.8, 0.2]
                )
    
    def remove_oldest_obstacle(self):
        """移除最旧的障碍物"""
        if self.obstacles:
            oldest = self.obstacles.pop(0)
            node = self.supervisor.getFromDef(oldest['def_name'])
            if node:
                node.remove()
                print(f"移除障碍物: {oldest['def_name']}")
                self.obstacle_count -= 1

# 使用示例
if __name__ == "__main__":
    supervisor = Supervisor()
    TIME_STEP = int(supervisor.getBasicTimeStep())
    
    obstacle_system = DynamicObstacleSystem(supervisor)
    
    # 创建一些移动障碍物
    obstacle_system.create_moving_obstacle([2, 0.5, 2], [0.5, 0, 0.3], 0.3, [1, 0, 0])
    obstacle_system.create_moving_obstacle([-2, 0.5, -2], [-0.3, 0, 0.5], 0.3, [0, 0, 1])
    
    # 创建障碍物模式
    obstacle_system.create_obstacle_pattern("grid")
    
    step_count = 0
    while supervisor.step(TIME_STEP) != -1:
        step_count += 1
        
        # 更新移动障碍物
        obstacle_system.update_moving_obstacles(TIME_STEP / 1000.0)
        
        # 每1000步添加新障碍物
        if step_count % 1000 == 0 and obstacle_system.obstacle_count < obstacle_system.max_obstacles:
            import random
            x = random.uniform(-4, 4)
            z = random.uniform(-4, 4)
            vx = random.uniform(-0.5, 0.5)
            vz = random.uniform(-0.5, 0.5)
            color = [random.random(), random.random(), random.random()]
            
            obstacle_system.create_moving_obstacle([x, 0.5, z], [vx, 0, vz], 0.25, color)
        
        # 每2000步移除最旧的障碍物
        if step_count % 2000 == 0 and obstacle_system.obstacle_count > 5:
            obstacle_system.remove_oldest_obstacle()
📝 动手实践 #5: 实现环境条件变化系统:
class EnvironmentConditionSystem:
    def __init__(self, supervisor):
        self.supervisor = supervisor
        self.weather_conditions = ["sunny", "rainy", "foggy", "windy"]
        self.current_weather = "sunny"
        self.light_intensity = 1.0
        self.fog_density = 0.0
        
    def change_weather(self, weather_type):
        """改变天气条件"""
        if weather_type in self.weather_conditions:
            self.current_weather = weather_type
            print(f"天气已改变为: {weather_type}")
            
            # 根据天气调整环境参数
            if weather_type == "rainy":
                self.light_intensity = 0.6
                self.fog_density = 0.2
            elif weather_type == "foggy":
                self.light_intensity = 0.4
                self.fog_density = 0.8
            elif weather_type == "windy":
                self.light_intensity = 0.8
                self.fog_density = 0.1
            else:  # sunny
                self.light_intensity = 1.0
                self.fog_density = 0.0
            
            # 应用天气效果到场景
            self.apply_weather_effects()
    
    def apply_weather_effects(self):
        """应用天气效果到场景"""
        # 获取场景根节点
        root = self.supervisor.getRoot()
        
        # 查找或创建WorldInfo节点来设置环境参数
        world_info = root.getField("info")
        if world_info:
            # 更新环境信息
            info_string = f'''
            WorldInfo {{
              title "Weather: {self.current_weather}"
              info [
                "Light intensity: {self.light_intensity}"
                "Fog density: {self.fog_density}"
              ]
            }}
            '''
            world_info.importMFString(info_string)
    
    def create_weather_effects(self):
        """创建视觉天气效果"""
        if self.current_weather == "rainy":
            # 创建雨滴效果
            self.create_rain_effect()
        elif self.current_weather == "foggy":
            # 创建雾效果
            self.create_fog_effect()
    
    def create_rain_effect(self):
        """创建雨滴效果"""
        # 在场景中添加多个小水滴
        for i in range(20):
            x = (i % 5 - 2) * 2
            z = (i // 5 - 2) * 2
            y = 5 + (i % 3) * 0.5
            
            raindrop_string = f'''
            DEF RAINDROP_{i} Transform {{
              translation {x} {y} {z}
              children [
                Shape {{
                  appearance Appearance {{
                    material Material {{
                      diffuseColor 0.5 0.7 1.0
                      transparency 0.7
                    }}
                  }}
                  geometry Cylinder {{
                    height 0.3
                    radius 0.02
                  }}
                }}
              ]
            }}
            '''
            self.supervisor.importMFString(raindrop_string)
    
    def create_fog_effect(self):
        """创建雾效果"""
        # 添加半透明的雾层
        fog_string = '''
        DEF FOG_LAYER Transform {
          translation 0 2 0
          children [
            Shape {
              appearance Appearance {
                material Material {
                  diffuseColor 0.9 0.9 0.9
                  transparency 0.6
                }
              }
              geometry Box {
                size 20 0.1 20
              }
            }
          ]
        }
        '''
        self.supervisor.importMFString(fog_string)
    
    def cycle_weather(self, step_count):
        """循环改变天气"""
        if step_count % 5000 == 0:  # 每5000步改变一次天气
            import random
            new_weather = random.choice(self.weather_conditions)
            self.change_weather(new_weather)
            self.create_weather_effects()

# 使用示例
if __name__ == "__main__":
    supervisor = Supervisor()
    TIME_STEP = int(supervisor.getBasicTimeStep())
    
    weather_system = EnvironmentConditionSystem(supervisor)
    
    step_count = 0
    while supervisor.step(TIME_STEP) != -1:
        step_count += 1
        
        # 循环改变天气
        weather_system.cycle_weather(step_count)
        
        # 每1000步输出当前天气状态
        if step_count % 1000 == 0:
            print(f"当前天气: {weather_system.current_weather}")
            print(f"光照强度: {weather_system.light_intensity}")
            print(f"雾密度: {weather_system.fog_density}")

4. 仿真监控

监督者控制器可以监控整个仿真的状态,包括机器人的性能、环境变化等。通过实时监控,你可以了解仿真的运行情况,并根据需要调整参数。

📝 动手实践 #6: 创建性能监控器:
class PerformanceMonitor:
    def __init__(self, supervisor):
        self.supervisor = supervisor
        self.start_time = supervisor.getTime()
        self.start_position = None
        self.max_distance = 0
        self.total_distance = 0
        self.last_position = None
        
        # 获取机器人初始位置
        if self.epuck:
            self.start_position = self.epuck.getPosition()
            self.last_position = self.start_position
    
    def update_metrics(self):
        """更新性能指标"""
        if not self.epuck:
            return
        
        current_position = self.epuck.getPosition()
        current_time = self.supervisor.getTime()
        
        # 计算距离
        if self.last_position:
            distance = math.sqrt(
                (current_position[0] - self.last_position[0])**2 +
                (current_position[1] - self.last_position[1])**2
            )
            self.total_distance += distance
        
        # 计算最大距离
        if self.start_position:
            current_distance = math.sqrt(
                (current_position[0] - self.start_position[0])**2 +
                (current_position[1] - self.start_position[1])**2
            )
            self.max_distance = max(self.max_distance, current_distance)
        
        self.last_position = current_position
        
        # 每100步输出一次统计信息
        if int(current_time * 1000) % 6400 == 0:  # 每100步
            print("仿真时间: {:.2f}s".format(current_time))
            print("总移动距离: {:.3f}m".format(self.total_distance))
            print("最大距离: {:.3f}m".format(self.max_distance))
            print("平均速度: {:.3f}m/s".format(self.total_distance/current_time))
            print("-" * 40)
    
    def get_final_report(self):
        """获取最终报告"""
        final_time = self.supervisor.getTime()
        return {
            "total_time": final_time,
            "total_distance": self.total_distance,
            "max_distance": self.max_distance,
            "average_speed": self.total_distance / final_time if final_time > 0 else 0
        }

5. 高级功能

监督者控制器还支持许多高级功能,如数据记录、场景保存、多机器人协调等。

📝 动手实践 #7: 数据记录功能:
import csv
import os

class DataLogger:
    def __init__(self, filename="simulation_data.csv"):
        self.filename = filename
        self.data = []
        
        # 创建CSV文件并写入表头
        with open(filename, 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['Time', 'X', 'Y', 'Z', 'Distance', 'Speed'])
    
    def log_data(self, time, position, distance, speed):
        """记录数据"""
        self.data.append([time, position[0], position[1], position[2], distance, speed])
        
        # 写入CSV文件
        with open(self.filename, 'a', newline='') as file:
            writer = csv.writer(file)
            writer.writerow([time, position[0], position[1], position[2], distance, speed])
    
    def export_summary(self, summary_data):
        """导出总结报告"""
        summary_filename = "simulation_summary.txt"
        with open(summary_filename, 'w') as file:
            file.write("仿真总结报告\n")
            file.write("=" * 30 + "\n")
            file.write("总仿真时间: {:.2f} 秒\n".format(summary_data['total_time']))
            file.write("总移动距离: {:.3f} 米\n".format(summary_data['total_distance']))
            file.write("最大距离: {:.3f} 米\n".format(summary_data['max_distance']))
            file.write("平均速度: {:.3f} 米/秒\n".format(summary_data['average_speed']))
        
        print("总结报告已保存到:", summary_filename)

def save_scene_state(self, filename="scene_backup.wbt"):
    """保存场景状态"""
    self.supervisor.saveWorld(filename)
    print("场景已保存到:", filename)

def load_scene_state(self, filename="scene_backup.wbt"):
    """加载场景状态"""
    self.supervisor.loadWorld(filename)
    print("场景已从", filename, "加载")

def restart_simulation(self):
    """重启仿真"""
    self.supervisor.simulationReset()
    print("仿真已重启")

def pause_simulation(self):
    """暂停仿真"""
    self.supervisor.simulationSetMode(Supervisor.SIMULATION_MODE_PAUSE)
    print("仿真已暂停")

def resume_simulation(self):
    """恢复仿真"""
    self.supervisor.simulationSetMode(Supervisor.SIMULATION_MODE_REAL_TIME)
    print("仿真已恢复")
📝 动手实践 #8: 完整的监督者控制器:
class CompleteSupervisor:
    def __init__(self):
        self.supervisor = Supervisor()
        self.time_step = TIME_STEP
        self.epuck = self.supervisor.getFromDef("EPUCK")
        
        # 初始化组件
        self.monitor = PerformanceMonitor(self.supervisor)
        self.logger = DataLogger()
        
        # 仿真参数
        self.simulation_duration = 60  # 60秒
        self.start_time = self.supervisor.getTime()
    
    def run_complete_simulation(self):
        """运行完整的监督仿真"""
        print("开始监督仿真...")
        
        while self.supervisor.step(self.time_step) != -1:
            current_time = self.supervisor.getTime()
            
            # 更新监控数据
            self.monitor.update_metrics()
            
            # 记录数据
            if self.epuck:
                position = self.epuck.getPosition()
                self.logger.log_data(
                    current_time,
                    position,
                    self.monitor.total_distance,
                    self.monitor.total_distance / current_time if current_time > 0 else 0
                )
            
            # 检查仿真时间
            if current_time >= self.simulation_duration:
                print("仿真时间达到设定值,正在结束...")
                break
        
        # 生成最终报告
        final_report = self.monitor.get_final_report()
        self.logger.export_summary(final_report)
        
        print("监督仿真完成!")
        print("最终报告:", final_report)

# 运行完整的监督者控制器
if __name__ == "__main__":
    supervisor = CompleteSupervisor()
    supervisor.run_complete_simulation()

6. 总结

恭喜你完成了Webots场景修改与监督者控制教程!你已经学会了:

  • 如何创建和配置监督者控制器
  • 如何动态修改场景(添加、删除、修改对象)
  • 如何创建动态障碍物系统和环境变化
  • 如何实现天气系统和视觉特效
  • 如何监控机器人性能和仿真状态
  • 如何记录和分析仿真数据
  • 如何实现高级监督功能(场景保存、仿真控制等)

场景修改和监督者控制器是Webots中非常强大的功能,它们让你能够完全控制仿真环境,创建动态变化的实验场景,实现复杂的实验和分析。这些技能将帮助你创建更加智能、灵活和真实的仿真系统。

下一步

你已经完成了Webots场景修改与监督者控制教程!现在你可以开始创建动态变化的仿真项目,结合场景修改和监督者控制技能来构建更加真实和复杂的机器人仿真系统。