本教程将深入探讨Webots中的场景修改和监督者控制器概念。监督者控制器是一个特殊的控制器,具有对整个仿真环境的完全访问权限。你将学习如何创建监督者控制器、动态修改场景、创建动态环境、实现天气系统,以及监控仿真状态。本教程基于官方教程8。
监督者控制器是Webots中一个非常强大的功能,它允许你控制整个仿真过程,而不仅仅是单个机器人。监督者控制器可以访问场景树、修改对象属性、监控仿真状态,甚至控制仿真的开始、暂停和停止。
supervisor_tutorial
。在场景中添加一个 RectangleArena
作为基础环境,并添加一个 E-puck
机器人。将机器人的 supervisor
字段设置为 TRUE
,这样它就可以作为监督者控制器使用。
监督者控制器是Webots中一个特殊的控制器类型,它继承自Robot类,但具有额外的功能。监督者控制器可以访问场景树、修改对象属性、控制仿真流程等。
from controller import Supervisor
import math
# 创建监督者控制器
supervisor = Supervisor()
# 获取时间步长
TIME_STEP = int(supervisor.getBasicTimeStep())
# 获取机器人节点
epuck = supervisor.getFromDef("EPUCK")
def main():
print("监督者控制器已启动")
while supervisor.step(TIME_STEP) != -1:
# 获取当前仿真时间
current_time = supervisor.getTime()
# 获取机器人位置
if epuck:
position = epuck.getPosition()
print(f"时间: {current_time:.2f}s, 机器人位置: ({position[0]:.3f}, {position[1]:.3f}, {position[2]:.3f})")
# 每5秒输出一次状态
if int(current_time) % 5 == 0:
print(f"仿真运行中... 当前时间: {current_time:.1f}秒")
if __name__ == "__main__":
main()
监督者控制器最强大的功能之一就是能够动态修改场景。你可以添加、删除、修改场景中的对象,改变它们的属性,甚至创建全新的对象。
from controller import Supervisor
import math
class SceneModifier:
def __init__(self, supervisor):
self.supervisor = supervisor
self.root_children_field = supervisor.getRoot().getField("children")
def add_box_to_scene(self, position, size, color):
"""向场景中添加一个盒子"""
# 创建Box节点
box_string = f'''
DEF BOX_{len(self.root_children_field.getMF())} Transform {{
translation {position[0]} {position[1]} {position[2]}
children [
Shape {{
appearance Appearance {{
material Material {{
diffuseColor {color[0]} {color[1]} {color[2]}
}}
}}
geometry Box {{
size {size[0]} {size[1]} {size[2]}
}}
}}
]
}}
'''
# 导入节点到场景
self.supervisor.importMFString(box_string)
print(f"已添加盒子到位置: {position}")
def remove_object_by_def(self, def_name):
"""通过DEF名称删除对象"""
node = self.supervisor.getFromDef(def_name)
if node:
node.remove()
print(f"已删除对象: {def_name}")
else:
print(f"未找到对象: {def_name}")
def change_object_color(self, def_name, new_color):
"""改变对象的颜色"""
node = self.supervisor.getFromDef(def_name)
if node:
# 获取材质字段
material_field = node.getField("material")
if material_field:
# 创建新的材质
new_material_string = f'''
Material {{
diffuseColor {new_color[0]} {new_color[1]} {new_color[2]}
}}
'''
material_field.importMFString(new_material_string)
print(f"已改变对象 {def_name} 的颜色为: {new_color}")
# 使用示例
if __name__ == "__main__":
supervisor = Supervisor()
TIME_STEP = int(supervisor.getBasicTimeStep())
scene_modifier = SceneModifier(supervisor)
# 添加一个红色盒子
scene_modifier.add_box_to_scene([1, 0.5, 1], [0.5, 0.5, 0.5], [1, 0, 0])
# 运行仿真
while supervisor.step(TIME_STEP) != -1:
current_time = supervisor.getTime()
# 每10秒改变盒子颜色
if int(current_time) % 10 == 0 and current_time > 0:
import random
new_color = [random.random(), random.random(), random.random()]
scene_modifier.change_object_color("BOX_0", new_color)
除了基本的对象添加和删除,监督者控制器还支持更复杂的场景修改操作:
class DynamicObstacleSystem:
def __init__(self, supervisor):
self.supervisor = supervisor
self.root_children_field = supervisor.getRoot().getField("children")
self.obstacles = []
self.obstacle_count = 0
self.max_obstacles = 10
def create_moving_obstacle(self, initial_position, velocity, size, color):
"""创建移动的障碍物"""
obstacle_def = f"MOVING_OBSTACLE_{self.obstacle_count}"
# 创建移动障碍物的VRML字符串
obstacle_string = f'''
DEF {obstacle_def} Transform {{
translation {initial_position[0]} {initial_position[1]} {initial_position[2]}
children [
Shape {{
appearance Appearance {{
material Material {{
diffuseColor {color[0]} {color[1]} {color[2]}
}}
}}
geometry Sphere {{
radius {size}
}}
}}
]
}}
'''
# 导入到场景
self.supervisor.importMFString(obstacle_string)
# 记录障碍物信息
obstacle_info = {
'def_name': obstacle_def,
'position': list(initial_position),
'velocity': list(velocity),
'size': size,
'color': list(color)
}
self.obstacles.append(obstacle_info)
self.obstacle_count += 1
print(f"创建移动障碍物: {obstacle_def}")
return obstacle_def
def update_moving_obstacles(self, time_step):
"""更新所有移动障碍物的位置"""
for obstacle in self.obstacles:
# 更新位置
obstacle['position'][0] += obstacle['velocity'][0] * time_step
obstacle['position'][1] += obstacle['velocity'][1] * time_step
obstacle['position'][2] += obstacle['velocity'][2] * time_step
# 边界检查 - 如果超出边界则反弹
if abs(obstacle['position'][0]) > 5:
obstacle['velocity'][0] *= -1
if abs(obstacle['position'][2]) > 5:
obstacle['velocity'][2] *= -1
# 更新场景中的位置
node = self.supervisor.getFromDef(obstacle['def_name'])
if node:
translation_field = node.getField("translation")
if translation_field:
translation_field.setSFVec3f(obstacle['position'])
def create_obstacle_pattern(self, pattern_type="grid"):
"""创建障碍物模式"""
if pattern_type == "grid":
# 创建网格模式的障碍物
for i in range(-2, 3):
for j in range(-2, 3):
if (i + j) % 2 == 0: # 棋盘模式
x = i * 1.5
z = j * 1.5
self.create_moving_obstacle(
[x, 0.5, z],
[0, 0, 0],
0.3,
[0.8, 0.2, 0.2]
)
elif pattern_type == "circle":
# 创建圆形模式的障碍物
import math
for i in range(8):
angle = i * 2 * math.pi / 8
radius = 3
x = radius * math.cos(angle)
z = radius * math.sin(angle)
self.create_moving_obstacle(
[x, 0.5, z],
[0, 0, 0],
0.4,
[0.2, 0.8, 0.2]
)
def remove_oldest_obstacle(self):
"""移除最旧的障碍物"""
if self.obstacles:
oldest = self.obstacles.pop(0)
node = self.supervisor.getFromDef(oldest['def_name'])
if node:
node.remove()
print(f"移除障碍物: {oldest['def_name']}")
self.obstacle_count -= 1
# 使用示例
if __name__ == "__main__":
supervisor = Supervisor()
TIME_STEP = int(supervisor.getBasicTimeStep())
obstacle_system = DynamicObstacleSystem(supervisor)
# 创建一些移动障碍物
obstacle_system.create_moving_obstacle([2, 0.5, 2], [0.5, 0, 0.3], 0.3, [1, 0, 0])
obstacle_system.create_moving_obstacle([-2, 0.5, -2], [-0.3, 0, 0.5], 0.3, [0, 0, 1])
# 创建障碍物模式
obstacle_system.create_obstacle_pattern("grid")
step_count = 0
while supervisor.step(TIME_STEP) != -1:
step_count += 1
# 更新移动障碍物
obstacle_system.update_moving_obstacles(TIME_STEP / 1000.0)
# 每1000步添加新障碍物
if step_count % 1000 == 0 and obstacle_system.obstacle_count < obstacle_system.max_obstacles:
import random
x = random.uniform(-4, 4)
z = random.uniform(-4, 4)
vx = random.uniform(-0.5, 0.5)
vz = random.uniform(-0.5, 0.5)
color = [random.random(), random.random(), random.random()]
obstacle_system.create_moving_obstacle([x, 0.5, z], [vx, 0, vz], 0.25, color)
# 每2000步移除最旧的障碍物
if step_count % 2000 == 0 and obstacle_system.obstacle_count > 5:
obstacle_system.remove_oldest_obstacle()
class EnvironmentConditionSystem:
def __init__(self, supervisor):
self.supervisor = supervisor
self.weather_conditions = ["sunny", "rainy", "foggy", "windy"]
self.current_weather = "sunny"
self.light_intensity = 1.0
self.fog_density = 0.0
def change_weather(self, weather_type):
"""改变天气条件"""
if weather_type in self.weather_conditions:
self.current_weather = weather_type
print(f"天气已改变为: {weather_type}")
# 根据天气调整环境参数
if weather_type == "rainy":
self.light_intensity = 0.6
self.fog_density = 0.2
elif weather_type == "foggy":
self.light_intensity = 0.4
self.fog_density = 0.8
elif weather_type == "windy":
self.light_intensity = 0.8
self.fog_density = 0.1
else: # sunny
self.light_intensity = 1.0
self.fog_density = 0.0
# 应用天气效果到场景
self.apply_weather_effects()
def apply_weather_effects(self):
"""应用天气效果到场景"""
# 获取场景根节点
root = self.supervisor.getRoot()
# 查找或创建WorldInfo节点来设置环境参数
world_info = root.getField("info")
if world_info:
# 更新环境信息
info_string = f'''
WorldInfo {{
title "Weather: {self.current_weather}"
info [
"Light intensity: {self.light_intensity}"
"Fog density: {self.fog_density}"
]
}}
'''
world_info.importMFString(info_string)
def create_weather_effects(self):
"""创建视觉天气效果"""
if self.current_weather == "rainy":
# 创建雨滴效果
self.create_rain_effect()
elif self.current_weather == "foggy":
# 创建雾效果
self.create_fog_effect()
def create_rain_effect(self):
"""创建雨滴效果"""
# 在场景中添加多个小水滴
for i in range(20):
x = (i % 5 - 2) * 2
z = (i // 5 - 2) * 2
y = 5 + (i % 3) * 0.5
raindrop_string = f'''
DEF RAINDROP_{i} Transform {{
translation {x} {y} {z}
children [
Shape {{
appearance Appearance {{
material Material {{
diffuseColor 0.5 0.7 1.0
transparency 0.7
}}
}}
geometry Cylinder {{
height 0.3
radius 0.02
}}
}}
]
}}
'''
self.supervisor.importMFString(raindrop_string)
def create_fog_effect(self):
"""创建雾效果"""
# 添加半透明的雾层
fog_string = '''
DEF FOG_LAYER Transform {
translation 0 2 0
children [
Shape {
appearance Appearance {
material Material {
diffuseColor 0.9 0.9 0.9
transparency 0.6
}
}
geometry Box {
size 20 0.1 20
}
}
]
}
'''
self.supervisor.importMFString(fog_string)
def cycle_weather(self, step_count):
"""循环改变天气"""
if step_count % 5000 == 0: # 每5000步改变一次天气
import random
new_weather = random.choice(self.weather_conditions)
self.change_weather(new_weather)
self.create_weather_effects()
# 使用示例
if __name__ == "__main__":
supervisor = Supervisor()
TIME_STEP = int(supervisor.getBasicTimeStep())
weather_system = EnvironmentConditionSystem(supervisor)
step_count = 0
while supervisor.step(TIME_STEP) != -1:
step_count += 1
# 循环改变天气
weather_system.cycle_weather(step_count)
# 每1000步输出当前天气状态
if step_count % 1000 == 0:
print(f"当前天气: {weather_system.current_weather}")
print(f"光照强度: {weather_system.light_intensity}")
print(f"雾密度: {weather_system.fog_density}")
监督者控制器可以监控整个仿真的状态,包括机器人的性能、环境变化等。通过实时监控,你可以了解仿真的运行情况,并根据需要调整参数。
class PerformanceMonitor:
def __init__(self, supervisor):
self.supervisor = supervisor
self.start_time = supervisor.getTime()
self.start_position = None
self.max_distance = 0
self.total_distance = 0
self.last_position = None
# 获取机器人初始位置
if self.epuck:
self.start_position = self.epuck.getPosition()
self.last_position = self.start_position
def update_metrics(self):
"""更新性能指标"""
if not self.epuck:
return
current_position = self.epuck.getPosition()
current_time = self.supervisor.getTime()
# 计算距离
if self.last_position:
distance = math.sqrt(
(current_position[0] - self.last_position[0])**2 +
(current_position[1] - self.last_position[1])**2
)
self.total_distance += distance
# 计算最大距离
if self.start_position:
current_distance = math.sqrt(
(current_position[0] - self.start_position[0])**2 +
(current_position[1] - self.start_position[1])**2
)
self.max_distance = max(self.max_distance, current_distance)
self.last_position = current_position
# 每100步输出一次统计信息
if int(current_time * 1000) % 6400 == 0: # 每100步
print("仿真时间: {:.2f}s".format(current_time))
print("总移动距离: {:.3f}m".format(self.total_distance))
print("最大距离: {:.3f}m".format(self.max_distance))
print("平均速度: {:.3f}m/s".format(self.total_distance/current_time))
print("-" * 40)
def get_final_report(self):
"""获取最终报告"""
final_time = self.supervisor.getTime()
return {
"total_time": final_time,
"total_distance": self.total_distance,
"max_distance": self.max_distance,
"average_speed": self.total_distance / final_time if final_time > 0 else 0
}
监督者控制器还支持许多高级功能,如数据记录、场景保存、多机器人协调等。
import csv
import os
class DataLogger:
def __init__(self, filename="simulation_data.csv"):
self.filename = filename
self.data = []
# 创建CSV文件并写入表头
with open(filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(['Time', 'X', 'Y', 'Z', 'Distance', 'Speed'])
def log_data(self, time, position, distance, speed):
"""记录数据"""
self.data.append([time, position[0], position[1], position[2], distance, speed])
# 写入CSV文件
with open(self.filename, 'a', newline='') as file:
writer = csv.writer(file)
writer.writerow([time, position[0], position[1], position[2], distance, speed])
def export_summary(self, summary_data):
"""导出总结报告"""
summary_filename = "simulation_summary.txt"
with open(summary_filename, 'w') as file:
file.write("仿真总结报告\n")
file.write("=" * 30 + "\n")
file.write("总仿真时间: {:.2f} 秒\n".format(summary_data['total_time']))
file.write("总移动距离: {:.3f} 米\n".format(summary_data['total_distance']))
file.write("最大距离: {:.3f} 米\n".format(summary_data['max_distance']))
file.write("平均速度: {:.3f} 米/秒\n".format(summary_data['average_speed']))
print("总结报告已保存到:", summary_filename)
def save_scene_state(self, filename="scene_backup.wbt"):
"""保存场景状态"""
self.supervisor.saveWorld(filename)
print("场景已保存到:", filename)
def load_scene_state(self, filename="scene_backup.wbt"):
"""加载场景状态"""
self.supervisor.loadWorld(filename)
print("场景已从", filename, "加载")
def restart_simulation(self):
"""重启仿真"""
self.supervisor.simulationReset()
print("仿真已重启")
def pause_simulation(self):
"""暂停仿真"""
self.supervisor.simulationSetMode(Supervisor.SIMULATION_MODE_PAUSE)
print("仿真已暂停")
def resume_simulation(self):
"""恢复仿真"""
self.supervisor.simulationSetMode(Supervisor.SIMULATION_MODE_REAL_TIME)
print("仿真已恢复")
class CompleteSupervisor:
def __init__(self):
self.supervisor = Supervisor()
self.time_step = TIME_STEP
self.epuck = self.supervisor.getFromDef("EPUCK")
# 初始化组件
self.monitor = PerformanceMonitor(self.supervisor)
self.logger = DataLogger()
# 仿真参数
self.simulation_duration = 60 # 60秒
self.start_time = self.supervisor.getTime()
def run_complete_simulation(self):
"""运行完整的监督仿真"""
print("开始监督仿真...")
while self.supervisor.step(self.time_step) != -1:
current_time = self.supervisor.getTime()
# 更新监控数据
self.monitor.update_metrics()
# 记录数据
if self.epuck:
position = self.epuck.getPosition()
self.logger.log_data(
current_time,
position,
self.monitor.total_distance,
self.monitor.total_distance / current_time if current_time > 0 else 0
)
# 检查仿真时间
if current_time >= self.simulation_duration:
print("仿真时间达到设定值,正在结束...")
break
# 生成最终报告
final_report = self.monitor.get_final_report()
self.logger.export_summary(final_report)
print("监督仿真完成!")
print("最终报告:", final_report)
# 运行完整的监督者控制器
if __name__ == "__main__":
supervisor = CompleteSupervisor()
supervisor.run_complete_simulation()
恭喜你完成了Webots场景修改与监督者控制教程!你已经学会了:
场景修改和监督者控制器是Webots中非常强大的功能,它们让你能够完全控制仿真环境,创建动态变化的实验场景,实现复杂的实验和分析。这些技能将帮助你创建更加智能、灵活和真实的仿真系统。
你已经完成了Webots场景修改与监督者控制教程!现在你可以开始创建动态变化的仿真项目,结合场景修改和监督者控制技能来构建更加真实和复杂的机器人仿真系统。