前言
因为我司「七月在线」关于dexcap的复现/优化接近尾声了(每月逐步提高复现的效果),故准备把dexcap的源码也分析下,11月下旬则分析下iDP3的源码——为队伍「iDP3人形的复现/优化」助力
最开始,dexcap的源码分析属于此文《DexCap——斯坦福李飞飞团队泡茶机器人:带灵巧手和动作捕捉的数据收集系统(含硬件清单)》的第三部分
然原理讲解、硬件配置、源码解析都放在一篇文章里的话,篇幅会显得特别长,故把源码的部分抽取出来,独立成此文
以下为本文的全文目录
1.1 calculate_offset_vis_calib.py
1.2 data_recording.py——配置多个Realsense T265相机和一个L515相机
1.2.2 RealsenseProcessor 的类:获取多个相机的RGB-D帧和位姿数据
1.3 demo_clipping_3d.py——可视化点云数据且选择每个演示的起始帧/结束帧
1.4 redis_glove_server.py——接收UDP数据包并将手部关节数据存储到Redis数据库
1.5 replay_human_traj_vis.py——可视化保存点云数据和位姿(可用于查看所有帧和校准)
1.6 transform_to_robot_table.py——将可视化点云数据放置在机器人桌面上
1.7.2 DataVisualizer类:用于可视化点云数据和手部关节数据
1.7.3 _load_frame_data:加载给定帧的点云和位姿数据,并进行可视化处理
2.1.1 read_pose_data:读取和处理手部姿态数据(过程中使用 LeapPybulLetIK 计算逆运动学,得到手部关节的目标位置)
2.1.2 process_hdf5:处理多个数据集(手部姿态/图像/点云),作为模型的训练数据集
2.2 pybullet_ik_bimanual.py——含点云生成、计算逆运动学compute_IK的实现
2.2.1 导入库、类的定义和初始化、获取网格点云的方法等等
2.2.2 create_target_vis、update_target_vis、rest_target_vis的实现
2.2.3 compute_IK的实现:用于计算双手的逆运动学IK——使用PyBullet来模拟手部关节的位置和方向
第一部分 STEP1_collect_data
1.1 calculate_offset_vis_calib.py
这段代码是一个用于校准和保存偏移量、方向偏移量的脚本,用于从指定目录中读取数据,计算偏移量和方向偏移量,并将结果保存到指定目录中,以下是代码的部分解释
- 导入库
- import argparse # 用于解析命令行参数
- import numpy as np # 用于数值计算
- import os # 用于操作系统相关功能
- import sys # 用于系统相关功能
- from scipy.spatial.transform import Rotation as R # 用于旋转矩阵和欧拉角转换
- from transforms3d.euler import euler2mat # 用于将欧拉角转换为旋转矩阵
- 主函数
- if __name__ == "__main__":
- parser = argparse.ArgumentParser(description="calibrate and save in dataset folder") # 创建命令行参数解析器
- parser.add_argument("--directory", type=str, default="", help="Directory with saved data") # 添加目录参数
- parser.add_argument("--default", type=str, default="default_offset", help="Directory with saved data") # 添加默认目录参数
- args = parser.parse_args() # 解析命令行参数
- if os.path.exists("{}/calib_offset.txt".format(args.directory)): # 检查目标目录中是否已经存在calib_offset.txt文件
- response = (
- input(
- f"calib_offset.txt already exists. Do you want to override? (y/n): " # 提示用户是否覆盖文件
- )
- .strip()
- .lower()
- )
- if response != "y": # 如果用户选择不覆盖,退出程序
- print("Exiting program without overriding the existing directory.")
- sys.exit()
- 读取默认偏移量和方向偏移量
- default_offset = np.loadtxt(os.path.join(args.default, "calib_offset.txt")) # 读取默认偏移量
- default_ori = np.loadtxt(os.path.join(args.default, "calib_ori_offset.txt")) # 读取默认方向偏移量
- default_offset_left = np.loadtxt(os.path.join(args.default, "calib_offset_left.txt")) # 读取左手默认偏移量
- default_ori_left = np.loadtxt(os.path.join(args.default, "calib_ori_offset_left.txt")) # 读取左手默认方向偏移量
- default_ori_matrix = euler2mat(*default_ori) # 将默认方向偏移量转换为旋转矩阵
- default_ori_matrix_left = euler2mat(*default_ori_left) # 将左手默认方向偏移量转换为旋转矩阵
- 提取偏移量和方向偏移量
- frame_dirs = os.listdir("./test_data") # 列出test_data目录中的所有文件
- list = [] # 存储偏移量的列表
- list_ori = [] # 存储方向偏移量的列表
- list_left = [] # 存储左手偏移量的列表
- list_ori_left = [] # 存储左手方向偏移量的列表
- for frame_dir in frame_dirs: #
1.2 data_recording.py——配置多个Realsense T265相机和一个L515相机
这段代码是一个用于记录和处理Realsense相机数据的Python脚本
顺带再回顾一下
DexCap团队设计了一个装载摄像机的背包「如图(a)、(b)所示,为方便大家对照,特把上图再贴一下,如下」
- 在正前面,它通过胸部摄像机支架的4个插槽集成了4个相机,最顶部是一台Intel Realsense L515 RGB-D LiDAR摄像机,顶部下面是3个Realsense T265鱼眼SLAM跟踪相机(分别为红、绿、蓝),用于在人类数据收集过程中捕捉观察结果
1.2.1 一系列定义:比如保存帧数据的函数
它可以捕捉颜色图像、深度图像、姿态数据以及手部关节数据,并将这些数据保存到指定的目录中
- 首先,导入库
- """
- 示例用法
- python data_recording.py -s --store_hand -o ./save_data_scenario_1
- """
- import argparse # 用于解析命令行参数
- import copy # 用于复制对象
- import numpy as np # 用于数值计算
- import open3d as o3d # 用于3D数据处理
- import os # 用于操作系统相关功能
- import shutil # 用于文件操作
- import sys # 用于系统相关功能
- import pyrealsense2 as rs # 用于Realsense相机操作
- import cv2 # 用于图像处理
- from enum import IntEnum # 用于定义枚举类型
- from realsense_helper import get_profiles # 导入自定义的Realsense帮助函数
- from transforms3d.quaternions import axangle2quat, qmult, quat2mat, mat2quat # 用于四元数操作
- import redis # 用于Redis数据库操作
- import concurrent.futures # 用于并发执行
- from hyperparameters import* # 导入超参数
- 其次,定义一个枚举类型 Preset,用于表示不同的预设配置
- class Preset(IntEnum):
- Custom = 0
- Default = 1
- Hand = 2
- HighAccuracy = 3
- HighDensity = 4
- MediumDensity = 5
- 然后,保存帧数据的函数
- def save_frame(
- frame_id,
- out_directory,
- color_buffer,
- depth_buffer,
- pose_buffer,
- pose2_buffer,
- pose3_buffer,
- rightHandJoint_buffer,
- leftHandJoint_buffer,
- rightHandJointOri_buffer,
- leftHandJointOri_buffer,
- save_hand,
- ):
- frame_directory = os.path.join(out_directory, f"frame_{frame_id}") # 创建帧目录
- os.makedirs(frame_directory, exist_ok=True) # 如果目录不存在则创建
- cv2.imwrite(
- os.path.join(frame_directory, "color_image.jpg"),
- color_buffer[frame_id][:, :, ::-1], # 保存颜色图像
- )
- cv2.imwrite(
- os.path.join(frame_directory, "depth_image.png"), depth_buffer[frame_id] # 保存深度图像
- )
- np.savetxt(os.path.join(frame_directory, "pose.txt"), pose_buffer[frame_id]) # 保存姿态数据
- np.savetxt(os.path.join(frame_directory, "pose_2.txt"), pose2_buffer[frame_id]) # 保存第二个姿态数据
- np.savetxt(os.path.join(frame_directory, "pose_3.txt"), pose3_buffer[frame_id]) # 保存第三个姿态数据
- if save_hand: # 如果需要保存手部数据
- np.savetxt(
- os.path.join(frame_directory, "right_hand_joint.txt"),
- rightHandJoint_buffer[frame_id], # 保存右手关节数据
- )
- np.savetxt(
- os.path.join(frame_directory, "left_hand_joint.txt"),
- leftHandJoint_buffer[frame_id], # 保存左手关节数据
- )
- np.savetxt(
- os.path.join(frame_directory, "right_hand_joint_ori.txt"),
- rightHandJointOri_buffer[frame_id], # 保存右手关节方向数据
- )
- np.savetxt(
- os.path.join(frame_directory, "left_hand_joint_ori.txt"),
- leftHandJointOri_buffer[frame_id], # 保存左手关节方向数据
- )
- return f"frame {frame_id + 1} saved" # 返回保存帧的消息
1.2.2 RealsenseProcessor 的类:获取多个相机的RGB-D帧和位姿数据
接下来,定义了一个名为 RealsenseProcessor 的类,用于处理Realsense相机的数据。它可以配置多个Realsense T265相机和一个L515相机,获取RGB-D帧和位姿数据,并可视化和存储这些数据
以下是对RealsenseProcessor类的详细解读:
- 类定义和初始化
初始化方法接受多个参数,用于配置T265相机的序列号、总帧数、是否存储帧、输出目录、是否保存手部数据和是否启用可视化
初始化各种缓冲区,用于存储彩色图像、 深度图像、位姿数据,和手部关节数据(包含左右两手的关节位置、关节方向)- class RealsesneProcessor: # 定义Realsense处理器类
- def __init__( # 初始化方法
- self,
- first_t265_serial, # 第一个T265相机的序列号
- second_t265_serial, # 第二个T265相机的序列号
- thrid_t265_serial, # 第三个T265相机的序列号
- total_frame, # 总帧数
- store_frame=False, # 是否存储帧,默认为False
- out_directory=None, # 输出目录,默认为None
- save_hand=False, # 是否保存手部数据,默认为False
- enable_visualization=True, # 是否启用可视化,默认为True
- ):
- self.first_t265_serial = first_t265_serial # 初始化第一个T265相机的序列号
- self.second_t265_serial = second_t265_serial # 初始化第二个T265相机的序列号
- self.thrid_t265_serial = thrid_t265_serial # 初始化第三个T265相机的序列号
- self.store_frame = store_frame # 初始化是否存储帧
- self.out_directory = out_directory # 初始化输出目录
- self.total_frame = total_frame # 初始化总帧数
- self.save_hand = save_hand # 初始化是否保存手部数据
- self.enable_visualization = enable_visualization # 初始化是否启用可视化
- self.rds = None # 初始化Redis连接
- self.color_buffer = [] # 初始化彩色图像缓冲区
- self.depth_buffer = [] # 初始化深度图像缓冲区
- self.pose_buffer = [] # 初始化第一个T265相机的位姿缓冲区
- self.pose2_buffer = [] # 初始化第二个T265相机的位姿缓冲区
- self.pose3_buffer = [] # 初始化第三个T265相机的位姿缓冲区
- self.pose2_image_buffer = [] # 初始化第二个T265相机的图像缓冲区
- self.pose3_image_buffer = [] # 初始化第三个T265相机的图像缓冲区
- self.rightHandJoint_buffer = [] # 初始化右手关节位置缓冲区
- self.leftHandJoint_buffer = [] # 初始化左手关节位置缓冲区
- self.rightHandJointOri_buffer = [] # 初始化右手关节方向缓冲区
- self.leftHandJointOri_buffer = [] # 初始化左手关节方向缓冲区
- 获取T265相机配置
具体方法是根据T265相机的序列号和管道配置,返回一个配置对象- def get_rs_t265_config(self, t265_serial, t265_pipeline):
- t265_config = rs.config()
- t265_config.enable_device(t265_serial)
- t265_config.enable_stream(rs.stream.pose)
- return t265_config
- 配置流
该方法配置并启动多个Realsense相机的流,包括一个L515相机和三个T265相机
如果启用了手部数据保存功能,则连接到Redis服务器
配置并启动L515相机的深度和彩色流- def configure_stream(self): # 配置流的方法
- # 连接到Redis服务器
- if self.save_hand: # 如果启用了手部数据保存功能
- self.rds = redis.Redis(host="localhost", port=6669, db=0) # 连接到本地Redis服务器
配置并启动三个T265相机的位姿流,具体而言- # 创建一个管道
- self.pipeline = rs.pipeline() # 创建一个Realsense管道
- config = rs.config() # 创建一个配置对象
- color_profiles, depth_profiles = get_profiles() # 获取彩色和深度流的配置文件
- w, h, fps, fmt = depth_profiles[1] # 获取深度流的宽度、高度、帧率和格式
- config.enable_stream(rs.stream.depth, w, h, fmt, fps) # 启用深度流
- w, h, fps, fmt = color_profiles[18] # 获取彩色流的宽度、高度、帧率和格式
- config.enable_stream(rs.stream.color, w, h, fmt, fps) # 启用彩色流
先配置
- # 配置第一个T265相机的流
- ctx = rs.context() # 创建一个Realsense上下文
- self.t265_pipeline = rs.pipeline(ctx) # 创建一个T265管道
- t265_config = rs.config() # 创建一个T265配置对象
- t265_config.enable_device(self.first_t265_serial) # 启用第一个T265相机
- # 配置第二个T265相机的流
- ctx_2 = rs.context() # 创建另一个Realsense上下文
- self.t265_pipeline_2 = rs.pipeline(ctx_2) # 创建第二个T265管道
- t265_config_2 = self.get_rs_t265_config(
- self.second_t265_serial, self.t265_pipeline_2
- ) # 获取第二个T265相机的配置
- # 配置第三个T265相机的流
- ctx_3 = rs.context() # 创建第三个Realsense上下文
- self.t265_pipeline_3 = rs.pipeline(ctx_3) # 创建第三个T265管道
- t265_config_3 = self.get_rs_t265_config(
- self.thrid_t265_serial, self.t265_pipeline_3
- ) # 获取第三个T265相机的配置
再启动
如果启用了可视化功能,则初始化Open3D可视化器- self.t265_pipeline.start(t265_config) # 启动第一个T265管道
- self.t265_pipeline_2.start(t265_config_2) # 启动第二个T265管道
- self.t265_pipeline_3.start(t265_config_3) # 启动第三个T265管道
- pipeline_profile = self.pipeline.start(config) # 启动L515管道并获取配置文件
- depth_sensor = pipeline_profile.get_device().first_depth_sensor() # 获取深度传感器
- depth_sensor.set_option(rs.option.visual_preset, Preset.HighAccuracy) # 设置深度传感器的选项为高精度
- self.depth_scale = depth_sensor.get_depth_scale() # 获取深度比例
- align_to = rs.stream.color # 对齐到彩色流
- self.align = rs.align(align_to) # 创建对齐对象
- self.vis = None # 初始化可视化器
- if self.enable_visualization: # 如果启用了可视化功能
- self.vis = o3d.visualization.Visualizer() # 创建Open3D可视化器
- self.vis.create_window() # 创建可视化窗口
- self.vis.get_view_control().change_field_of_view(step=1.0) # 改变视野
- 获取RGB-D帧——get_rgbd_frame_from_realsense
该方法从Realsense相机获取RGB-D顾,并将深度帧与彩色帧对齐
如果启用了可视化功能,则将深度图像和彩色图像转换为Open3D的RGBD图像对象- def get_rgbd_frame_from_realsense(self, enable_visualization=False): # 从Realsense获取RGB-D帧的方法
- frames = self.pipeline.wait_for_frames() # 等待获取帧数据
- # 将深度帧对齐到彩色帧
- aligned_frames = self.align.process(frames) # 对齐帧数据
- # 获取对齐后的帧
- aligned_depth_frame = aligned_frames.get_depth_frame() # 获取对齐后的深度帧
- color_frame = aligned_frames.get_color_frame() # 获取对齐后的彩色帧
- depth_image = (
- np.asanyarray(aligned_depth_frame.get_data()) // 4
- ) # 将深度帧数据转换为numpy数组,并除以4以获得以米为单位的深度值(适用于L515相机)
- color_image = np.asanyarray(color_frame.get_data()) # 将彩色帧数据转换为numpy数组
- rgbd = None # 初始化RGBD图像对象
- if enable_visualization: # 如果启用了可视化功能
- depth_image_o3d = o3d.geometry.Image(depth_image) # 将深度图像转换为Open3D图像对象
- color_image_o3d = o3d.geometry.Image(color_image) # 将彩色图像转换为Open3D图像对象
- rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(
- color_image_o3d, # 彩色图像
- depth_image_o3d, # 深度图像
- depth_trunc=4.0, # 深度截断值,超过此值的深度将被忽略
- convert_rgb_to_intensity=False, # 是否将RGB图像转换为强度图像
- ) # 创建RGBD图像对象
- return rgbd, depth_image, color_image # 返回RGBD图像、深度图像和彩色图像
- 通过frame_to_pose_conversion方法,实现帧到位姿的转换
- @staticmethod # 静态方法装饰器
- def frame_to_pose_conversion(input_t265_frames): # 定义帧到位姿转换的方法
- pose_frame = input_t265_frames.get_pose_frame() # 获取位姿帧
- pose_data = pose_frame.get_pose_data() # 获取位姿数据
- pose_3x3 = quat2mat( # 将四元数转换为3x3旋转矩阵
- np.array(
- [
- pose_data.rotation.w, # 四元数的w分量
- pose_data.rotation.x, # 四元数的x分量
- pose_data.rotation.y, # 四元数的y分量
- pose_data.rotation.z, # 四元数的z分量
- ]
- )
- )
- pose_4x4 = np.eye(4) # 创建4x4单位矩阵
- pose_4x4[:3, :3] = pose_3x3 # 将3x3旋转矩阵赋值给4x4矩阵的左上角
- pose_4x4[:3, 3] = [ # 将平移向量赋值给4x4矩阵的右上角
- pose_data.translation.x, # 位姿的x平移分量
- pose_data.translation.y, # 位姿的y平移分量
- pose_data.translation.z, # 位姿的z平移分量
- ]
- return pose_4x4 # 返回4x4位姿矩阵
- 处理帧
该方法处理每一帧数据,首先通过get_rgbd_frame_from_realsense获取三个T265相机的RGB-D帧数据,然后通过frame_to_pose_conversion,把帧数据换成位姿数据
如果启用了手部数据保存功能,则从Redis服务器获取手部关节数据- def process_frame(self): # 定义处理帧的方法
- frame_count = 0 # 初始化帧计数器
- first_frame = True # 标记是否为第一帧
- try:
- while frame_count < self.total_frame: # 循环处理每一帧,直到达到总帧数
- t265_frames = self.t265_pipeline.wait_for_frames() # 获取第一个T265相机的帧数据
- t265_frames_2 = self.t265_pipeline_2.wait_for_frames() # 获取第二个T265相机的帧数据
- t265_frames_3 = self.t265_pipeline_3.wait_for_frames() # 获取第三个T265相机的帧数据
- rgbd, depth_frame, color_frame = self.get_rgbd_frame_from_realsense() # 获取RGB-D帧数据
- # 获取第一个T265相机的位姿数据
- pose_4x4 = RealsesneProcessor.frame_to_pose_conversion(
- input_t265_frames=t265_frames
- )
- # 获取第二个T265相机的位姿数据
- pose_4x4_2 = RealsesneProcessor.frame_to_pose_conversion(
- input_t265_frames=t265_frames_2
- )
- # 获取第三个T265相机的位姿数据
- pose_4x4_3 = RealsesneProcessor.frame_to_pose_conversion(
- input_t265_frames=t265_frames_3
- )
将位姿数据转换为4x4矩阵,并应用校正矩阵- if self.save_hand: # 如果启用了手部数据保存功能
- # 获取左手关节位置数据
- leftHandJointXyz = np.frombuffer(
- self.rds.get("rawLeftHandJointXyz"), dtype=np.float64
- ).reshape(21, 3)
- # 获取右手关节位置数据
- rightHandJointXyz = np.frombuffer(
- self.rds.get("rawRightHandJointXyz"), dtype=np.float64
- ).reshape(21, 3)
- # 获取左手关节方向数据
- leftHandJointOrientation = np.frombuffer(
- self.rds.get("rawLeftHandJointOrientation"), dtype=np.float64
- ).reshape(21, 4)
- # 获取右手关节方向数据
- rightHandJointOrientation = np.frombuffer(
- self.rds.get("rawRightHandJointOrientation"), dtype=np.float64
- ).reshape(21, 4)
且转换为Open3D格式的L515相机内参corrected_pose = pose_4x4 @ between_cam # 应用校正矩阵
如果是第一帧,则初始化Open3D的点云和坐标系,并添加到可视化器中- # 转换为Open3D格式的L515相机内参
- o3d_depth_intrinsic = o3d.camera.PinholeCameraIntrinsic(
- 1280,
- 720,
- 898.2010498046875,
- 897.86669921875,
- 657.4981079101562,
- 364.30950927734375,
- )
如果不是第一帧,则更新点云和坐标系的位姿- if first_frame: # 如果是第一帧
- if self.enable_visualization: # 如果启用了可视化功能
- pcd = o3d.geometry.PointCloud.create_from_rgbd_image(
- rgbd, o3d_depth_intrinsic
- ) # 创建点云
- pcd.transform(corrected_pose) # 应用校正矩阵
- rgbd_mesh = o3d.geometry.TriangleMesh.create_coordinate_frame(
- size=0.3
- ) # 创建RGBD坐标系
- rgbd_mesh.transform(corrected_pose) # 应用校正矩阵
- rgbd_previous_pose = copy.deepcopy(corrected_pose) # 复制校正矩阵
- chest_mesh = o3d.geometry.TriangleMesh.create_coordinate_frame(
- size=0.3
- ) # 创建胸部坐标系
- chest_mesh.transform(pose_4x4) # 应用位姿矩阵
- chest_previous_pose = copy.deepcopy(pose_4x4) # 复制位姿矩阵
- left_hand_mesh = (
- o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.3)
- ) # 创建左手坐标系
- left_hand_mesh.transform(pose_4x4_2) # 应用位姿矩阵
- left_hand_previous_pose = copy.deepcopy(pose_4x4_2) # 复制位姿矩阵
- right_hand_mesh = (
- o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.3)
- ) # 创建右手坐标系
- right_hand_mesh.transform(pose_4x4_3) # 应用位姿矩阵
- right_hand_previous_pose = copy.deepcopy(pose_4x4_3) # 复制位姿矩阵
- self.vis.add_geometry(pcd) # 添加点云到可视化器
- self.vis.add_geometry(rgbd_mesh) # 添加RGBD坐标系到可视化器
- self.vis.add_geometry(chest_mesh) # 添加胸部坐标系到可视化器
- self.vis.add_geometry(left_hand_mesh) # 添加左手坐标系到可视化器
- self.vis.add_geometry(right_hand_mesh) # 添加右手坐标系到可视化器
- view_params = (
- self.vis.get_view_control().convert_to_pinhole_camera_parameters()
- ) # 获取视图参数
- first_frame = False # 标记为非第一帧
再更新可视化器- else:
- if self.enable_visualization: # 如果启用了可视化功能
- new_pcd = o3d.geometry.PointCloud.create_from_rgbd_image(
- rgbd, o3d_depth_intrinsic
- ) # 创建新的点云
- new_pcd.transform(corrected_pose) # 应用校正矩阵
- rgbd_mesh.transform(np.linalg.inv(rgbd_previous_pose)) # 逆变换上一个RGBD坐标系
- rgbd_mesh.transform(corrected_pose) # 应用校正矩阵
- rgbd_previous_pose = copy.deepcopy(corrected_pose) # 复制校正矩阵
- chest_mesh.transform(np.linalg.inv(chest_previous_pose)) # 逆变换上一个胸部坐标系
- chest_mesh.transform(pose_4x4) # 应用位姿矩阵
- chest_previous_pose = copy.deepcopy(pose_4x4) # 复制位姿矩阵
- left_hand_mesh.transform(np.linalg.inv(left_hand_previous_pose)) # 逆变换上一个左手坐标系
- left_hand_mesh.transform(pose_4x4_2) # 应用位姿矩阵
- left_hand_previous_pose = copy.deepcopy(pose_4x4_2) # 复制位姿矩阵
- right_hand_mesh.transform(
- np.linalg.inv(right_hand_previous_pose)
- ) # 逆变换上一个右手坐标系
- right_hand_mesh.transform(pose_4x4_3) # 应用位姿矩阵
- right_hand_previous_pose = copy.deepcopy(pose_4x4_3) # 复制位姿矩阵
- pcd.points = new_pcd.points # 更新点云的点
- pcd.colors = new_pcd.colors # 更新点云的颜色
- self.vis.update_geometry(pcd) # 更新点云几何
- self.vis.update_geometry(rgbd_mesh) # 更新RGBD坐标系几何
- self.vis.update_geometry(chest_mesh) # 更新胸部坐标系几何
- self.vis.update_geometry(left_hand_mesh) # 更新左手坐标系几何
- self.vis.update_geometry(right_hand_mesh) # 更新右手坐标系几何
- self.vis.get_view_control().convert_from_pinhole_camera_parameters(
- view_params
- ) # 恢复视图参数
如果启用了帧存储功能,则将深度图像、彩色图像和位姿数据存储到缓冲区中- if self.enable_visualization: # 如果启用了可视化功能
- self.vis.poll_events() # 处理可视化事件
- self.vis.update_renderer() # 更新渲染器
处理完所有帧后,停止所有相机的流,并保存所有帧数据 - 主函数
主函数解析命令行参数,创建 RealsenseProcessor 对象,并配置和处理帧数据
如果输出目录已存在,提示用户是否覆盖目录- import concurrent.futures # 导入并发库,用于多线程处理
- def main(args): # 定义主函数,接受命令行参数
- realsense_processor = RealsesneProcessor( # 创建Realsense处理器对象
- first_t265_serial="11622110012", # 第一个T265相机的序列号
- second_t265_serial="909212110944", # 第二个T265相机的序列号
- thrid_t265_serial="929122111181", # 第三个T265相机的序列号
- total_frame=10000, # 总帧数
- store_frame=args.store_frame, # 是否存储帧
- out_directory=args.out_directory, # 输出目录
- save_hand=args.store_hand, # 是否保存手部数据
- enable_visualization=args.enable_vis, # 是否启用可视化
- )
- realsense_processor.configure_stream() # 配置Realsense流
- realsense_processor.process_frame() # 处理帧数据
- if __name__ == "__main__": # 主程序入口
- # 设置命令行参数解析器
- parser = argparse.ArgumentParser(description="Process frames and save data.")
- parser.add_argument(
- "-s",
- "--store_frame",
- action="store_true",
- help="Flag to indicate whether to store frames", # 是否存储帧的标志
- )
- parser.add_argument(
- "--store_hand",
- action="store_true",
- help="Flag to indicate whether to store hand joint position and orientation", # 是否保存手部关节位置和方向的标志
- )
- parser.add_argument(
- "-v",
- "--enable_vis",
- action="store_true",
- help="Flag to indicate whether to enable open3d visualization", # 是否启用Open3D可视化的标志
- )
- parser.add_argument(
- "-o",
- "--out_directory",
- type=str,
- help="Output directory for saved data", # 保存数据的输出目录
- default="./saved_data", # 默认输出目录为./saved_data
- )
- args = parser.parse_args() # 解析命令行参数
如果启用了帧存储功能,则创建输出目录- # 检查输出目录是否存在
- if os.path.exists(args.out_directory):
- response = (
- input(
- f"{args.out_directory} already exists. Do you want to override? (y/n): "
- )
- .strip()
- .lower()
- )
- if response != "y": # 如果用户选择不覆盖,退出程序
- print("Exiting program without overriding the existing directory.")
- sys.exit()
- else:
- shutil.rmtree(args.out_directory) # 如果用户选择覆盖,删除现有目录
调用 main 函数开始处理帧数据- if args.store_frame:
- os.makedirs(args.out_directory, exist_ok=True) # 创建输出目录
- # 如果用户选择覆盖,删除现有目录
- main(args) # 调用主函数
1.3 demo_clipping_3d.py——可视化点云数据且选择每个演示的起始帧/结束帧
这段代码是一个用于可视化点云数据(PCD文件),并选择每个演示的起始帧、结束帧的脚本,以下是详细解读
- 导入库
导入各种库和模块,用于处理命令行参数、文件操作、点云数据处理、键盘事件监听等- import argparse # 用于解析命令行参数
- import os # 用于操作系统相关功能
- import copy # 用于复制对象
- import zmq # 用于消息传递
- import cv2 # 用于图像处理
- import sys # 用于系统相关功能
- import json # 用于处理JSON数据
- import shutil # 用于文件操作
- import open3d as o3d # 用于3D数据处理
- import numpy as np # 用于数值计算
- import platform # 用于获取操作系统信息
- from pynput import keyboard # 用于监听键盘事件
- from transforms3d.quaternions import qmult, quat2mat # 用于四元数操作
- from transforms3d.axangles import axangle2mat # 用于轴角转换
- from scipy.spatial.transform import Rotation # 用于旋转矩阵和欧拉角转换
- from transforms3d.euler import quat2euler, mat2euler, quat2mat, euler2mat # 用于欧拉角和矩阵转换
- from visualizer import * # 导入自定义的可视化模块
- 定义全局变量,用于存储剪辑标记、当前剪辑、是否切换到下一帧或上一帧的标志
- clip_marks = [] # 存储剪辑标记
- current_clip = {} # 存储当前剪辑
- next_frame = False # 标记是否切换到下一帧
- previous_frame = False # 标记是否切换到上一帧
- 定义键盘事件处理函数,用于处理不同的键盘输入:
Key.up:保存剪辑标记到 clip_marks. json 文件
Key.down:标记切换到上一帧
Key.page_down :标记切换到下一帧
Key.end:标记当前剪辑的结束帧
Key.insert:标记当前剪辑的起始帧- def on_press(key): # 定义键盘按下事件处理函数
- global next_frame, previous_frame, delta_movement_accu, delta_ori_accu, delta_movement_accu_left, delta_ori_accu_left, adjust_movement, adjust_right, frame, step, dataset_folder, clip_marks, current_clip
- # 确定操作系统类型
- os_type = platform.system()
- assert os_type == "Windows" # 仅支持Windows系统
- frame_folder = 'frame_{}'.format(frame) # 当前帧文件夹名称
- # Windows特定的键绑定在AttributeError部分处理
- if key == keyboard.Key.up: # y正方向
- with open(os.path.join(dataset_folder, 'clip_marks.json'), 'w') as f:
- json.dump(clip_marks, f, indent=4) # 保存剪辑标记到JSON文件
- elif key == keyboard.Key.down: # y负方向
- previous_frame = True # 切换到上一帧
- elif key == keyboard.Key.page_down:
- next_frame = True # 切换到下一帧
- elif key == keyboard.Key.end:
- if 'start' in current_clip.keys():
- print("end", frame_folder)
- current_clip['end'] = frame_folder # 标记当前剪辑的结束帧
- clip_marks.append(current_clip) # 添加当前剪辑到剪辑标记列表
- current_clip = {} # 重置当前剪辑
- elif key == keyboard.Key.insert:
- print("start", frame_folder)
- current_clip['start'] = frame_folder # 标记当前剪辑的起始帧
- else:
- print("Key error", key) # 处理其他键的错误
- 数据可视化类
定义数据可祝化类 ReplayDatavisualizer,继承自DataVisualizer.
replay_frames 方法用于可视化单帧数据,并处理键盘事件以切换帧- class ReplayDataVisualizer(DataVisualizer): # 定义数据重放可视化类,继承自DataVisualizer
- def __init__(self, directory):
- super().__init__(directory) # 调用父类的初始化方法
- def replay_frames(self): # 定义重放帧的方法
- """
- 可视化单帧数据
- """
- global delta_movement_accu, delta_ori_accu, next_frame, previous_frame, frame
- if self.R_delta_init is None:
- self.initialize_canonical_frame() # 初始化标准帧
- self._load_frame_data(frame) # 加载当前帧数据
- self.vis.add_geometry(self.pcd) # 添加点云数据到可视化器
- self.vis.add_geometry(self.coord_frame_1) # 添加坐标系1到可视化器
- self.vis.add_geometry(self.coord_frame_2) # 添加坐标系2到可视化器
- self.vis.add_geometry(self.coord_frame_3) # 添加坐标系3到可视化器
- for joint in self.left_joints + self.right_joints:
- self.vis.add_geometry(joint) # 添加关节数据到可视化器
- for cylinder in self.left_line_set + self.right_line_set:
- self.vis.add_geometry(cylinder) # 添加连线数据到可视化器
- next_frame = True # 初始化为下一帧
- try:
- with keyboard.Listener(on_press=on_press) as listener: # 监听键盘事件
- while True:
- if next_frame == True:
- next_frame = False
- frame += 10 # 切换到下一帧
- if previous_frame == True:
- previous_frame = False
- frame -= 10 # 切换到上一帧
- self._load_frame_data(frame) # 加载当前帧数据
- self.step += 1 # 增加步数
- self.vis.update_geometry(self.pcd) # 更新点云数据
- self.vis.update_geometry(self.coord_frame_1) # 更新坐标系1
- self.vis.update_geometry(self.coord_frame_2) # 更新坐标系2
- self.vis.update_geometry(self.coord_frame_3) # 更新坐标系3
- for joint in self.left_joints + self.right_joints:
- self.vis.update_geometry(joint) # 更新关节数据
- for cylinder in self.left_line_set + self.right_line_set:
- self.vis.update_geometry(cylinder) # 更新连线数据
- self.vis.poll_events() # 处理可视化事件
- self.vis.update_renderer() # 更新渲染器
- listener.join() # 等待监听器结束
- finally:
- print("cumulative_correction ", self.cumulative_correction) # 打印累计修正值
- 主函数
主函数解析命令行参数,获取数据目录
检查数据目录是否存在,如果存在 clip_marks. json 文件,询问用户是否覆盖- if __name__ == "__main__": # 主程序入口
- parser = argparse.ArgumentParser(description="Visualize saved frame data.") # 创建命令行参数解析器
- parser.add_argument("--directory", type=str, default="./saved_data", help="Directory with saved data") # 添加目录参数
- args = parser.parse_args() # 解析命令行参数
初始化 ReplayDatavisualizer 对象,并加载校准偏移量和方向偏移量- assert os.path.exists(args.directory), f"given directory: {args.directory} not exists" # 确认目录存在
- if os.path.exists(os.path.join(args.directory, 'clip_marks.json')): # 检查剪辑标记文件是否存在
- response = (
- input(
- f"clip_marks.json already exists. Do you want to override? (y/n): "
- )
- .strip()
- .lower()
- )
- if response != "y":
- print("Exiting program without overriding the existing directory.") # 如果用户选择不覆盖,退出程序
- sys.exit()
调用replay_frames 方法开始可视化和处理键盛事件- dataset_folder = args.directory # 设置数据集文件夹
- visualizer = ReplayDataVisualizer(args.directory) # 创建数据重放可视化器对象
- visualizer.right_hand_offset = np.loadtxt("{}/calib_offset.txt".format(args.directory)) # 加载右手偏移量
- visualizer.right_hand_ori_offset = np.loadtxt("{}/calib_ori_offset.txt".format(args.directory)) # 加载右手方向偏移量
- visualizer.left_hand_offset = np.loadtxt("{}/calib_offset_left.txt".format(args.directory)) # 加载左手偏移量
- visualizer.left_hand_ori_offset = np.loadtxt("{}/calib_ori_offset_left.txt".format(args.directory)) # 加载左手方向偏移量
visualizer.replay_frames() # 开始重放帧
1.4 redis_glove_server.py——接收UDP数据包并将手部关节数据存储到Redis数据库
这段代码是一个用于接收UDP数据包并将手部关节数据存储到Redis数据库的Python脚本
所谓UDP (User Datagram Protocol,用户数据报协议),其是一种简单的、面向无连接的传输层协议
与TCP (Transmission Control Protocol, 传输控制协议)不同,UDP不提供可靠性、数据包顺序和流量控制等功能。UDP主要用于 需要快速传输且对数据丢失不敏感的应用场景,例如视频流、在线游戏和实时通信等
以下是代码的详细解读:
- 导入库
- import socket # 用于网络通信
- import json # 用于处理JSON数据
- import redis # 用于连接Redis数据库
- import numpy as np # 用于数值计算
- 初始化Redis连接
- # 初始化Redis连接
- redis_host = "localhost" # Redis服务器主机名
- redis_port = 6669 # Redis服务器端口
- redis_password = "" # Redis服务器密码,如果没有密码则保持为空字符串
- r = redis.StrictRedis(
- host=redis_host, port=redis_port, password=redis_password, decode_responses=True
- ) # 创建Redis连接对象
- 定义手部关节名称
- # 定义左手和右手的关节名称
- left_hand_joint_names = ["leftHand",
- 'leftThumbProximal', 'leftThumbMedial', 'leftThumbDistal', 'leftThumbTip',
- 'leftIndexProximal', 'leftIndexMedial', 'leftIndexDistal', 'leftIndexTip',
- 'leftMiddleProximal', 'leftMiddleMedial', 'leftMiddleDistal', 'leftMiddleTip',
- 'leftRingProximal', 'leftRingMedial', 'leftRingDistal', 'leftRingTip',
- 'leftLittleProximal', 'leftLittleMedial', 'leftLittleDistal', 'leftLittleTip']
- right_hand_joint_names = ["rightHand",
- 'rightThumbProximal', 'rightThumbMedial', 'rightThumbDistal', 'rightThumbTip',
- 'rightIndexProximal', 'rightIndexMedial', 'rightIndexDistal', 'rightIndexTip',
- 'rightMiddleProximal', 'rightMiddleMedial', 'rightMiddleDistal', 'rightMiddleTip',
- 'rightRingProximal', 'rightRingMedial', 'rightRingDistal', 'rightRingTip',
- 'rightLittleProximal', 'rightLittleMedial', 'rightLittleDistal', 'rightLittleTip']
- 归一化函数
- def normalize_wrt_middle_proximal(hand_positions, is_left=True): # 定义相对于中指近端关节的归一化函数
- middle_proximal_idx = left_hand_joint_names.index('leftMiddleProximal') # 获取左手中指近端关节的索引
- if not is_left:
- middle_proximal_idx = right_hand_joint_names.index('rightMiddleProximal') # 获取右手中指近端关节的索引
- wrist_position = hand_positions[0] # 获取手腕位置
- middle_proximal_position = hand_positions[middle_proximal_idx] # 获取中指近端关节位置
- bone_length = np.linalg.norm(wrist_position - middle_proximal_position) # 计算手腕到中指近端关节的骨骼长度
- normalized_hand_positions = (middle_proximal_position - hand_positions) / bone_length # 归一化手部关节位置
- return normalized_hand_positions # 返回归一化后的手部关节位置
- 启动服务器函数
创建并绑定UDP套接字
无限循环接收UDP数据包- def start_server(port): # 定义启动服务器的函数,接受端口号作为参数
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 使用SOCK_DGRAM创建UDP套接字
- s.bind(("192.168.0.200", port)) # 绑定套接字到指定的IP地址和端口
- print(f"Server started, listening on port {port} for UDP packets...") # 打印服务器启动信息
解析接收到的数据包,提取手部关节位置和方向数据- while True: # 无限循环,持续接收数据
- data, address = s.recvfrom(64800) # 接收UDP数据包,最大数据包大小为64800字节
- decoded_data = data.decode() # 解码数据
- # 尝试解析JSON数据
- try:
- received_json = json.loads(decoded_data) # 解析JSON数据
- # 初始化数组以存储手部关节位置和方向
- left_hand_positions = np.zeros((21, 3)) # 初始化左手关节位置数组,21个关节,每个关节3个坐标
- right_hand_positions = np.zeros((21, 3)) # 初始化右手关节位置数组,21个关节,每个关节3个坐标
- left_hand_orientations = np.zeros((21, 4)) # 初始化左手关节方向数组,21个关节,每个关节4个方向分量
- right_hand_orientations = np.zeros((21, 4)) # 初始化右手关节方向数组,21个关节,每个关节4个方向分量
计算相对于中指近端关节的相对距离,并进行归一化- # 遍历JSON数据以提取手部关节位置和方向
- for joint_name in left_hand_joint_names: # 遍历左手关节名称列表
- joint_data = received_json["scene"]["actors"][0]["body"][joint_name] # 获取关节数据
- joint_position = np.array(list(joint_data["position"].values())) # 获取关节位置
- joint_rotation = np.array(list(joint_data["rotation"].values())) # 获取关节方向
- left_hand_positions[left_hand_joint_names.index(joint_name)] = joint_position # 存储关节位置
- left_hand_orientations[left_hand_joint_names.index(joint_name)] = joint_rotation # 存储关节方向
- for joint_name in right_hand_joint_names: # 遍历右手关节名称列表
- joint_data = received_json["scene"]["actors"][0]["body"][joint_name] # 获取关节数据
- joint_position = np.array(list(joint_data["position"].values())) # 获取关节位置
- joint_rotation = np.array(list(joint_data["rotation"].values())) # 获取关节方向
- right_hand_positions[right_hand_joint_names.index(joint_name)] = joint_position # 存储关节位置
- right_hand_orientations[right_hand_joint_names.index(joint_name)] = joint_rotation # 存储关节方向
将原始和归一化后的手部关节数据存储到Redis数据库中- # 计算相对于中指近端关节的相对距离,并归一化
- left_middle_proximal_idx = left_hand_joint_names.index('leftMiddleProximal') # 获取左手中指近端关节的索引
- right_middle_proximal_idx = right_hand_joint_names.index('rightMiddleProximal') # 获取右手中指近端关节的索引
- left_wrist_position = left_hand_positions[0] # 获取左手手腕位置
- right_wrist_position = right_hand_positions[0] # 获取右手手腕位置
- left_middle_proximal_position = left_hand_positions[left_middle_proximal_idx] # 获取左手中指近端关节位置
- right_middle_proximal_position = right_hand_positions[right_middle_proximal_idx] # 获取右手中指近端关节位置
- left_bone_length = np.linalg.norm(left_wrist_position - left_middle_proximal_position) # 计算左手骨骼长度
- right_bone_length = np.linalg.norm(right_wrist_position - right_middle_proximal_position) # 计算右手骨骼长度
- normalized_left_hand_positions = (left_middle_proximal_position - left_hand_positions) / left_bone_length # 归一化左手关节位置
- normalized_right_hand_positions = (right_middle_proximal_position - right_hand_positions) / right_bone_length # 归一化右手关节位置
打印接收到的手部关节位置数据- r.set("leftHandJointXyz", np.array(normalized_left_hand_positions).astype(np.float64).tobytes()) # 将归一化后的左手关节位置存储到Redis
- r.set("rightHandJointXyz", np.array(normalized_right_hand_positions).astype(np.float64).tobytes()) # 将归一化后的右手关节位置存储到Redis
- r.set("rawLeftHandJointXyz", np.array(left_hand_positions).astype(np.float64).tobytes()) # 将原始左手关节位置存储到Redis
- r.set("rawRightHandJointXyz", np.array(right_hand_positions).astype(np.float64).tobytes()) # 将原始右手关节位置存储到Redis
- r.set("rawLeftHandJointOrientation", np.array(left_hand_orientations).astype(np.float64).tobytes()) # 将原始左手关节方向存储到Redis
- r.set("rawRightHandJointOrientation", np.array(right_hand_orientations).astype(np.float64).tobytes()) # 将原始右手关节方向存储到Redis
- print("\n\n")
- print("=" * 50)
- print(np.round(left_hand_positions, 3)) # 打印左手关节位置
- print("-"*50)
- print(np.round(right_hand_positions, 3)) # 打印右手关节位置
- except json.JSONDecodeError: # 捕获JSON解析错误
- print("Invalid JSON received:") # 打印错误信息
- 主程序入口
- if __name__ == "__main__": # 主程序入口
- start_server(14551) # 启动服务器,监听端口14551
1.5 replay_human_traj_vis.py——可视化保存点云数据和位姿(可用于查看所有帧和校准)
1.6 transform_to_robot_table.py——将可视化点云数据放置在机器人桌面上
1.7 visualizer.py——可视化手部关节数据
1.7.1 一系列定义:比如五个手指等
- 导入各种库和模块,用于处理命令行参数、文件操作、点云数据处理、键盘事件监听等
- 定义手部关节之间的连接线,用于可视化手部骨架
- lines = np.array([
- # 拇指
- [1, 2], [2, 3], [3, 4],
- # 食指
- [5, 6], [6, 7], [7, 8],
- # 中指
- [9, 10], [10, 11], [11, 12],
- # 无名指
- [13, 14], [14, 15], [15, 16],
- # 小指
- [17, 18], [18, 19], [19, 20],
- # 连接近端关节
- [1, 5], [5, 9], [9, 13], [13, 17],
- # 连接手掌
- [0, 1], [17, 0]
- ])
- 定义一系列全局变量,用于存储累积的校正、当前帧、步长等信息
- delta_movement_accu = np.array([0.0, 0.0, 0.0]) # 累积的位移校正
- delta_ori_accu = np.array([0.0, 0.0, 0.0]) # 累积的方向校正
- delta_movement_accu_left = np.array([0.0, 0.0, 0.0]) # 左手累积的位移校正
- delta_ori_accu_left = np.array([0.0, 0.0, 0.0]) # 左手累积的方向校正
- adjust_movement = True # 是否调整位移
- adjust_right = True # 是否调整右手
- next_frame = False # 是否切换到下一帧
- frame = 0 # 当前帧
- step = 0.01 # 步长
- fixed_transform = np.array([0.0, 0.0, 0.0]) # 固定变换
- 一些辅助函数
将手腕位置平移到原点
应用位姿矩阵- def translate_wrist_to_origin(joint_positions): # 将手腕位置平移到原点
- wrist_position = joint_positions[0] # 获取手腕位置
- updated_positions = joint_positions - wrist_position # 平移所有关节位置
- return updated_positions # 返回平移后的关节位置
创建或更新圆柱体- def apply_pose_matrix(joint_positions, pose_matrix): # 应用位姿矩阵
- homogeneous_joint_positions = np.hstack([joint_positions, np.ones((joint_positions.shape[0], 1))]) # 将关节位置转换为齐次坐标
- transformed_positions = np.dot(homogeneous_joint_positions, pose_matrix.T) # 应用位姿矩阵
- transformed_positions_3d = transformed_positions[:, :3] # 提取3D坐标
- return transformed_positions_3d # 返回变换后的关节位置
- def create_or_update_cylinder(start, end, radius=0.003, cylinder_list=None, cylinder_idx=-1): # 创建或更新圆柱体
- cyl_length = np.linalg.norm(end - start) # 计算圆柱体的长度
- new_cylinder = o3d.geometry.TriangleMesh.create_cylinder(radius=radius, height=cyl_length, resolution=20, split=4) # 创建新的圆柱体
- new_cylinder.paint_uniform_color([1, 0, 0]) # 将圆柱体涂成红色
- new_cylinder.translate(np.array([0, 0, cyl_length / 2])) # 平移圆柱体
- direction = end - start # 计算方向向量
- direction /= np.linalg.norm(direction) # 归一化方向向量
- up = np.array([0, 0, 1]) # 圆柱体的默认向上向量
- rotation_axis = np.cross(up, direction) # 计算旋转轴
- rotation_angle = np.arccos(np.dot(up, direction)) # 计算旋转角度
- if np.linalg.norm(rotation_axis) != 0: # 如果旋转轴不为零
- rotation_axis /= np.linalg.norm(rotation_axis) # 归一化旋转轴
- rotation_matrix = o3d.geometry.get_rotation_matrix_from_axis_angle(rotation_axis * rotation_angle) # 计算旋转矩阵
- new_cylinder.rotate(rotation_matrix, center=np.array([0, 0, 0])) # 旋转圆柱体
- new_cylinder.translate(start) # 平移圆柱体到起始位置
- if cylinder_list[cylinder_idx] is not None: # 如果圆柱体列表中已有圆柱体
- cylinder_list[cylinder_idx].vertices = new_cylinder.vertices # 更新顶点
- cylinder_list[cylinder_idx].triangles = new_cylinder.triangles # 更新三角形
- cylinder_list[cylinder_idx].vertex_normals = new_cylinder.vertex_normals # 更新顶点法线
- cylinder_list[cylinder_idx].vertex_colors = new_cylinder.vertex_colors # 更新顶点颜色
- else:
- cylinder_list[cylinder_idx] = new_cylinder # 添加新的圆柱体到列表中
1.7.2 DataVisualizer类:用于可视化点云数据和手部关节数据
接下来,定义了一个名为DataVisualizer的类,用于可视化点云数据和手部关节数据
- 类定义和初始化
- class DataVisualizer: # 定义DataVisualizer类
- def __init__(self, directory): # 初始化方法,接受数据目录作为参数
- self.directory = directory # 初始化数据目录
- self.base_pcd = None # 初始化基础点云对象
- self.pcd = None # 初始化点云对象
- self.img_backproj = None # 初始化图像反投影对象
- self.coord_frame_1 = None # 初始化坐标系1
- self.coord_frame_2 = None # 初始化坐标系2
- self.coord_frame_3 = None # 初始化坐标系3
- self.right_hand_offset = None # 初始化右手偏移量
- self.right_hand_ori_offset = None # 初始化右手方向偏移量
- self.left_hand_offset = None # 初始化左手偏移量
- self.left_hand_ori_offset = None # 初始化左手方向偏移量
- self.pose1_prev = np.eye(4) # 初始化第一个位姿矩阵
- self.pose2_prev = np.eye(4) # 初始化第二个位姿矩阵
- self.pose3_prev = np.eye(4) # 初始化第三个位姿矩阵
- self.vis = o3d.visualization.Visualizer() # 创建Open3D可视化器
- self.vis.create_window() # 创建可视化窗口
- self.vis.get_view_control().change_field_of_view(step=1.0) # 改变视野
- self.between_cam = np.eye(4) # 初始化相机之间的变换矩阵
- self.between_cam[:3, :3] = np.array([[1.0, 0.0, 0.0],
- [0.0, -1.0, 0.0],
- [0.0, 0.0, -1.0]])
- self.between_cam[:3, 3] = np.array([0.0, 0.076, 0.0]) # 设置平移部分
- self.between_cam_2 = np.eye(4) # 初始化第二个相机之间的变换矩阵
- self.between_cam_2[:3, :3] = np.array([[1.0, 0.0, 0.0],
- [0.0, 1.0, 0.0],
- [0.0, 0.0, 1.0]])
- self.between_cam_2[:3, 3] = np.array([0.0, -0.032, 0.0]) # 设置平移部分
- self.between_cam_3 = np.eye(4) # 初始化第三个相机之间的变换矩阵
- self.between_cam_3[:3, :3] = np.array([[1.0, 0.0, 0.0],
- [0.0, 1.0, 0.0],
- [0.0, 0.0, 1.0]])
- self.between_cam_3[:3, 3] = np.array([0.0, -0.064, 0.0]) # 设置平移部分
- self.canonical_t265_ori = None # 初始化标准T265方向
- # 可视化左手21个关节
- self.left_joints = [] # 初始化左手关节列表
- self.right_joints = [] # 初始化右手关节列表
- self.left_line_set = [None for _ in lines] # 初始化左手连线列表
- self.right_line_set = [None for _ in lines] # 初始化右手连线列表
- for i in range(21): # 遍历21个关节
- for joint in [self.left_joints, self.right_joints]: # 遍历左手和右手关节列表
- # 为指尖和手腕创建较大的球体,为其他关节创建较小的球体
- radius = 0.011 if i in [0, 4, 8, 12, 16, 20] else 0.007
- sphere = o3d.geometry.TriangleMesh.create_sphere(radius=radius) # 创建球体
- # 将手腕和近端关节涂成绿色
- if i in [0, 1, 5, 9, 13, 17]:
- sphere.paint_uniform_color([0, 1, 0])
- # 将指尖涂成红色
- elif i in [4, 8, 12, 16, 20]:
- sphere.paint_uniform_color([1, 0, 0])
- # 将其他关节涂成蓝色
- else:
- sphere.paint_uniform_color([0, 0, 1])
- # 将拇指涂成粉色
- if i in [1, 2, 3, 4]:
- sphere.paint_uniform_color([1, 0, 1])
- joint.append(sphere) # 将球体添加到关节列表中
- self.vis.add_geometry(sphere) # 将球体添加到可视化器中
- self.step = 0 # 初始化步数
- self.distance_buffer = [] # 初始化距离缓冲区
- self.R_delta_init = None # 初始化旋转矩阵
- self.cumulative_correction = np.array([0.0, 0.0, 0.0]) # 初始化累计修正值
- 初始化标准帧的方法
- def initialize_canonical_frame(self): # 初始化标准帧的方法
- if self.R_delta_init is None: # 如果旋转矩阵未初始化
- self._load_frame_data(0) # 加载第0帧数据
- pose_ori_matirx = self.pose3_prev[:3, :3] # 获取第三个位姿的旋转矩阵
- pose_ori_correction_matrix = np.dot(np.array([[0, -1, 0],
- [0, 0, 1],
- [1, 0, 0]]), euler2mat(0, 0, 0)) # 计算修正矩阵
- pose_ori_matirx = np.dot(pose_ori_matirx, pose_ori_correction_matrix) # 应用修正矩阵
- self.canonical_t265_ori = np.array([[1, 0, 0],
- [0, -1, 0],
- [0, 0, -1]]) # 初始化标准T265方向
- x_angle, y_angle, z_angle = mat2euler(self.pose3_prev[:3, :3]) # 将旋转矩阵转换为欧拉角
- self.canonical_t265_ori = np.dot(self.canonical_t265_ori, euler2mat(-z_angle, x_angle + 0.3, y_angle)) # 应用欧拉角修正
- self.R_delta_init = np.dot(self.canonical_t265_ori, pose_ori_matirx.T) # 计算初始旋转矩阵
- 重放关键帧校准的方法
- def replay_keyframes_calibration(self):
- """
- 可视化单帧
- """
- global delta_movement_accu, delta_ori_accu, next_frame, frame
- if self.R_delta_init is None:
- self.initialize_canonical_frame() # 初始化标准帧
- self._load_frame_data(frame) # 加载当前帧数据
- self.vis.add_geometry(self.pcd) # 添加点云数据到可视化器
- self.vis.add_geometry(self.coord_frame_1) # 添加坐标系1到可视化器
- self.vis.add_geometry(self.coord_frame_2) # 添加坐标系2到可视化器
- self.vis.add_geometry(self.coord_frame_3) # 添加坐标系3到可视化器
- for joint in self.left_joints + self.right_joints:
- self.vis.add_geometry(joint) # 添加关节数据到可视化器
- for cylinder in self.left_line_set + self.right_line_set:
- self.vis.add_geometry(cylinder) # 添加连线数据到可视化器
- next_frame = True # 初始化为下一帧
- try:
- with keyboard.Listener(on_press=on_press) as listener: # 监听键盘事件
- while True:
- if next_frame == True:
- next_frame = False
- frame += 10 # 切换到下一帧
- self._load_frame_data(frame) # 加载当前帧数据
- self.step += 1 # 增加步数
- self.vis.update_geometry(self.pcd) # 更新点云数据
- self.vis.update_geometry(self.coord_frame_1) # 更新坐标系1
- self.vis.update_geometry(self.coord_frame_2) # 更新坐标系2
- self.vis.update_geometry(self.coord_frame_3) # 更新坐标系3
- for joint in self.left_joints + self.right_joints:
- self.vis.update_geometry(joint) # 更新关节数据
- for cylinder in self.left_line_set + self.right_line_set:
- self.vis.update_geometry(cylinder) # 更新连线数据
- self.vis.poll_events() # 处理可视化事件
- self.vis.update_renderer() # 更新渲染器
- listener.join() # 等待监听器结束
- finally:
- print("cumulative_correction ", self.cumulative_correction) # 打印累计修正值
- 重放所有帧的方法
- def replay_all_frames(self):
- """
- 连续可视化所有帧
- """
- try:
- if self.R_delta_init is None:
- self.initialize_canonical_frame() # 初始化标准帧
- frame = 0 # 初始化帧计数器
- first_frame = True # 标记是否为第一帧
- while True:
- if not self._load_frame_data(frame): # 加载当前帧数据
- break # 如果无法加载数据,退出循环
- if first_frame:
- self.vis.add_geometry(self.pcd) # 添加点云数据到可视化器
- self.vis.add_geometry(self.coord_frame_1) # 添加坐标系1到可视化器
- self.vis.add_geometry(self.coord_frame_2) # 添加坐标系2到可视化器
- self.vis.add_geometry(self.coord_frame_3) # 添加坐标系3到可视化器
- for joint in self.left_joints + self.right_joints:
- self.vis.add_geometry(joint) # 添加关节数据到可视化器
- for cylinder in self.left_line_set + self.right_line_set:
- self.vis.add_geometry(cylinder) # 添加连线数据到可视化器
- else:
- self.vis.update_geometry(self.pcd) # 更新点云数据
- self.vis.update_geometry(self.coord_frame_1) # 更新坐标系1
- self.vis.update_geometry(self.coord_frame_2) # 更新坐标系2
- self.vis.update_geometry(self.coord_frame_3) # 更新坐标系3
- for joint in self.left_joints + self.right_joints:
- self.vis.update_geometry(joint) # 更新关节数据
- for cylinder in self.left_line_set + self.right_line_set:
- self.vis.update_geometry(cylinder) # 更新连线数据
- self.vis.poll_events() # 处理可视化事件
- self.vis.update_renderer() # 更新渲染器
- if first_frame:
- view_params = self.vis.get_view_control().convert_to_pinhole_camera_parameters() # 获取视图参数
- else:
- self.vis.get_view_control().convert_from_pinhole_camera_parameters(view_params) # 恢复视图参数
- self.step += 1 # 增加步数
- frame += 5 # 增加帧计数器
- if first_frame:
- first_frame = False # 标记为非第一帧
- finally:
- self.vis.destroy_window() # 销毁可视化窗口
- 反投影点的方法
- def _back_project_point(self, point, intrinsics):
- """ 将单个点从3D反投影到2D图像空间 """
- x, y, z = point
- fx, fy = intrinsics[0, 0], intrinsics[1, 1]
- cx, cy = intrinsics[0, 2], intrinsics[1, 2]
- u = (x * fx / z) + cx
- v = (y * fy / z) + cy
- return int(u), int(v)
1.7.3 _load_frame_data:加载给定帧的点云和位姿数据,并进行可视化处理
最后是加载帧数据的方法,即_load_frame_data这个方法,用于加载给定帧的点云和位姿数据,并进行可视化处理
- 方法定义和参数说明
- def _load_frame_data(self, frame, vis_2d=False, load_table_points=False):
- """
- Load point cloud and poses for a given frame
- @param frame: frame count in integer
- @return whether we can successfully load all data from frame subdirectory
- """
- global delta_movement_accu, delta_ori_accu, delta_movement_accu_left, delta_ori_accu_left # 全局变量,用于存储累积的平移和旋转校正
- print(f"frame {frame}") # 打印当前帧编号
- if adjust_movement: # 如果启用了平移校正
- print("adjusting translation") # 打印平移校正信息
- else:
- print("adjusting rotation") # 打印旋转校正信息
- 初始化最顶部的L515相机内参
- # L515:
- o3d_depth_intrinsic = o3d.camera.PinholeCameraIntrinsic(
- 1280, 720,
- 898.2010498046875,
- 897.86669921875,
- 657.4981079101562,
- 364.30950927734375) # 初始化L515相机的内参
- 处理桌面点云数据
- if load_table_points: # 如果需要加载桌面点云数据
- table_color_image_o3d = o3d.io.read_image(os.path.join(self.table_frame, "frame_0", "color_image.jpg")) # 读取桌面彩色图像
- table_depth_image_o3d = o3d.io.read_image(os.path.join(self.table_frame, "frame_0", "depth_image.png")) # 读取桌面深度图像
- table_rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(table_color_image_o3d, table_depth_image_o3d, depth_trunc=4.0,
- convert_rgb_to_intensity=False) # 创建RGBD图像
- table_pose_4x4 = np.loadtxt(os.path.join(self.table_frame, "frame_0", "pose.txt")) # 读取桌面位姿
- table_corrected_pose = table_pose_4x4 @ self.between_cam # 应用校正矩阵
- self.table_pcd = o3d.geometry.PointCloud.create_from_rgbd_image(table_rgbd, o3d_depth_intrinsic) # 创建点云
- self.table_pcd.transform(table_corrected_pose) # 应用校正矩阵
- 加载当前帧数据
- frame_dir = os.path.join(self.directory, f"frame_{frame}") # 当前帧目录
- color_image_o3d = o3d.io.read_image(os.path.join(frame_dir, "color_image.jpg")) # 读取彩色图像
- depth_image_o3d = o3d.io.read_image(os.path.join(frame_dir, "depth_image.png")) # 读取深度图像
- rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(color_image_o3d, depth_image_o3d, depth_trunc=4.0,
- convert_rgb_to_intensity=False) # 创建RGBD图像
- pose_4x4 = np.loadtxt(os.path.join(frame_dir, "pose.txt")) # 读取位姿
- if load_table_points:
- pose_4x4[:3, 3] += fixed_transform.T # 应用固定变换
- corrected_pose = pose_4x4 @ self.between_cam # 应用校正矩阵
- 检查位姿文件是否存在
- pose_path = os.path.join(frame_dir, "pose.txt") # 位姿文件路径
- pose_2_path = os.path.join(frame_dir, "pose_2.txt") # 第二个位姿文件路径
- pose_3_path = os.path.join(frame_dir, "pose_3.txt") # 第三个位姿文件路径
- if not all(os.path.exists(path) for path in [pose_path, pose_2_path, pose_3_path]): # 检查所有位姿文件是否存在
- return False # 如果有文件不存在,返回False
- 创建或更新点云
- if self.pcd is None: # 如果点云对象为空
- self.pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, o3d_depth_intrinsic) # 创建点云
- self.pcd.transform(corrected_pose) # 应用校正矩阵
- else:
- new_pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, o3d_depth_intrinsic) # 创建新的点云
- new_pcd.transform(corrected_pose) # 应用校正矩阵
- self.pcd.points = new_pcd.points # 更新点云的点
- self.pcd.colors = new_pcd.colors # 更新点云的颜色
- 加载并校正位姿
- pose_1 = np.loadtxt(pose_path) # 读取第一个位姿
- if load_table_points:
- pose_1[:3, 3] += fixed_transform.T # 应用固定变换
- pose_1 = pose_1 @ self.between_cam # 应用校正矩阵
- pose_2 = np.loadtxt(pose_2_path) # 读取第二个位姿
- if load_table_points:
- pose_2[:3, 3] += fixed_transform.T # 应用固定变换
- pose_2 = pose_2 @ self.between_cam_2 # 应用校正矩阵
- pose_3 = np.loadtxt(pose_3_path) # 读取第三个位姿
- if load_table_points:
- pose_3[:3, 3] += fixed_transform.T # 应用固定变换
- pose_3 = pose_3 @ self.between_cam_3 # 应用校正矩阵
- 创建或更新坐标系
- if self.coord_frame_1 is None: # 如果坐标系1为空
- self.coord_frame_1 = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1) # 创建坐标系1
- self.coord_frame_2 = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1) # 创建坐标系2
- self.coord_frame_3 = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1) # 创建坐标系3
- self.coord_frame_1 = self.coord_frame_1.transform(np.linalg.inv(self.pose1_prev)) # 逆变换上一个位姿
- self.coord_frame_1 = self.coord_frame_1.transform(pose_1) # 应用当前位姿
- self.pose1_prev = copy.deepcopy(pose_1) # 复制当前位姿
- self.coord_frame_2 = self.coord_frame_2.transform(np.linalg.inv(self.pose2_prev)) # 逆变换上一个位姿
- self.coord_frame_2 = self.coord_frame_2.transform(pose_2) # 应用当前位姿
- self.pose2_prev = copy.deepcopy(pose_2) # 复制当前位姿
- self.coord_frame_3 = self.coord_frame_3.transform(np.linalg.inv(self.pose3_prev)) # 逆变换上一个位姿
- self.coord_frame_3 = self.coord_frame_3.transform(pose_3) # 应用当前位姿
- self.pose3_prev = copy.deepcopy(pose_3) # 复制当前位姿
- 加载并处理左手关节数据
- # left hand, read from joint
- left_hand_joint_xyz = np.loadtxt(os.path.join(frame_dir, "left_hand_joint.txt")) # 读取左手关节位置
- self.left_hand_joint_xyz = left_hand_joint_xyz # 存储左手关节位置
- left_hand_joint_xyz = translate_wrist_to_origin(left_hand_joint_xyz) # 平移手腕到原点
- left_hand_joint_ori = np.loadtxt(os.path.join(frame_dir, "left_hand_joint_ori.txt"))[0] # 读取左手关节方向
- self.left_hand_wrist_ori = left_hand_joint_ori # 存储左手手腕方向
- left_rotation_matrix = Rotation.from_quat(left_hand_joint_ori).as_matrix().T # 计算旋转矩阵
- left_hand_joint_xyz_reshaped = left_hand_joint_xyz[:, :, np.newaxis] # 重塑关节位置
- left_transformed_joint_xyz = np.matmul(left_rotation_matrix, left_hand_joint_xyz_reshaped) # 应用旋转矩阵
- left_hand_joint_xyz = left_transformed_joint_xyz[:, :, 0] # 更新关节位置
- left_hand_joint_xyz[:, -1] = -left_hand_joint_xyz[:, -1] # z轴反转
- rotation_matrix = axangle2mat(np.array([0, 1, 0]), -np.pi * 1 / 2) # 绕y轴旋转
- left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- rotation_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 2) # 绕x轴旋转
- left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- rotation_matrix = axangle2mat(np.array([0, 0, 1]), -np.pi * 1 / 2) # 绕z轴旋转
- left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- left_hand_joint_xyz = np.dot(left_hand_joint_xyz, euler2mat(*self.left_hand_ori_offset).T) # 应用欧拉角校正
- left_hand_joint_xyz = np.dot(left_hand_joint_xyz, euler2mat(*delta_ori_accu_left).T) # 应用累积旋转校正
- left_hand_joint_xyz += self.left_hand_offset # 应用平移校正
- left_hand_joint_xyz += delta_movement_accu_left # 应用累积平移校正
- left_hand_joint_xyz = apply_pose_matrix(left_hand_joint_xyz, pose_2) # 应用位姿矩阵
- 设置左手关节球体和连线
- # set joint sphere and lines
- for i, sphere in enumerate(self.left_joints): # 遍历左手关节球体
- transformation = np.eye(4) # 创建4x4单位矩阵
- transformation[:3, 3] = left_hand_joint_xyz[i] - sphere.get_center() # 计算平移向量
- sphere.transform(transformation) # 应用平移变换
- for i, (x, y) in enumerate(lines): # 遍历连线
- start = self.left_joints[x].get_center() # 获取起点
- end = self.left_joints[y].get_center() # 获取终点
- create_or_update_cylinder(start, end, cylinder_list=self.left_line_set, cylinder_idx=i) # 创建或更新圆柱体
- 加载并处理右手关节数据
- # right hand, read from joint
- right_hand_joint_xyz = np.loadtxt(os.path.join(frame_dir, "right_hand_joint.txt")) # 读取右手关节位置
- self.right_hand_joint_xyz = right_hand_joint_xyz # 存储右手关节位置
- right_hand_joint_xyz = translate_wrist_to_origin(right_hand_joint_xyz) # 平移手腕到原点
- right_hand_joint_ori = np.loadtxt(os.path.join(frame_dir, "right_hand_joint_ori.txt"))[0] # 读取右手关节方向
- self.right_hand_wrist_ori = right_hand_joint_ori # 存储右手手腕方向
- right_rotation_matrix = Rotation.from_quat(right_hand_joint_ori).as_matrix().T # 计算旋转矩阵
- right_joint_xyz_reshaped = right_hand_joint_xyz[:, :, np.newaxis] # 重塑关节位置
- right_transformed_joint_xyz = np.matmul(right_rotation_matrix, right_joint_xyz_reshaped) # 应用旋转矩阵
- right_hand_joint_xyz = right_transformed_joint_xyz[:, :, 0] # 更新关节位置
- right_hand_joint_xyz[:, -1] = -right_hand_joint_xyz[:, -1] # z轴反转
- rotation_matrix = axangle2mat(np.array([0, 1, 0]), -np.pi * 1 / 2) # 绕y轴旋转
- right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- rotation_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 2) # 绕x轴旋转
- right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- rotation_matrix = axangle2mat(np.array([0, 0, 1]), -np.pi * 1 / 2) # 绕z轴旋转
- right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- right_hand_joint_xyz = np.dot(right_hand_joint_xyz, euler2mat(*self.right_hand_ori_offset).T) # 应用欧拉角校正
- right_hand_joint_xyz = np.dot(right_hand_joint_xyz, euler2mat(*delta_ori_accu).T) # 应用累积旋转校正
- right_hand_joint_xyz += self.right_hand_offset # 应用平移校正
- right_hand_joint_xyz += delta_movement_accu # 应用累积平移校正
- right_hand_joint_xyz = apply_pose_matrix(right_hand_joint_xyz, pose_3) # 应用位姿矩阵
- 可视化2D图像
- if vis_2d: # 如果启用了2D可视化
- color_image = np.asarray(rgbd.color) # 获取彩色图像
- color_image = cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR) # 转换颜色空间
- left_hand_joint_points_homogeneous = np.hstack((left_hand_joint_xyz, np.ones((left_hand_joint_xyz.shape[0], 1)))) # 转换为齐次坐标
- left_hand_transformed_points_homogeneous = np.dot(left_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用逆变换
- left_hand_points_to_project = left_hand_transformed_points_homogeneous[:, :3] / left_hand_transformed_points_homogeneous[:, [3]] # 归一化
- left_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in left_hand_points_to_project] # 反投影
- for i in range(len(left_hand_back_projected_points)): # 遍历左手关节点
- u, v = left_hand_back_projected_points[i] # 获取投影点坐标
- if i in [0, 1, 5, 9, 13, 17]:
- cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 绘制绿色圆点
- elif i in [4, 8, 12, 16, 20]:
- cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 绘制蓝色圆点
- else:
- cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 绘制红色圆点
- if i in [1, 2, 3, 4]:
- cv2.circle(color_image, (u, v), 10, (255, 0, 255), -1) # 绘制紫色圆点
- right_hand_joint_points_homogeneous = np.hstack((right_hand_joint_xyz, np.ones((right_hand_joint_xyz.shape[0], 1)))) # 转换为齐次坐标
- right_hand_transformed_points_homogeneous = np.dot(right_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用逆变换
- right_hand_points_to_project = right_hand_transformed_points_homogeneous[:, :3] / right_hand_transformed_points_homogeneous[:, [3]] # 归一化
- right_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in right_hand_points_to_project] # 反投影
- for i in range(len(right_hand_back_projected_points)): # 遍历右手关节点
- u, v = right_hand_back_projected_points[i] # 获取投影点坐标
- if i in [0, 1, 5, 9, 13, 17]:
- cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 绘制绿色圆点
- elif i in [4, 8, 12, 16, 20]:
- cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 绘制蓝色圆点
- else:
- cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 绘制红色圆点
- if i in [1, 2, 3, 4]:
- cv2.circle(color_image, (u, v), 10, (255, 0, if vis_2d: # 如果启用了2D可视化
- color_image = np.asarray(rgbd.color) # 获取彩色图像
- color_image = cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR) # 转换颜色空间
- left_hand_joint_points_homogeneous = np.hstack((left_hand_joint_xyz, np.ones((left_hand_joint_xyz.shape[0], 1)))) # 转换为齐次坐标
- left_hand_transformed_points_homogeneous = np.dot(left_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用逆变换
- left_hand_points_to_project = left_hand_transformed_points_homogeneous[:, :3] / left_hand_transformed_points_homogeneous[:, [3]] # 归一化
- left_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in left_hand_points_to_project] # 反投影
- for i in range(len(left_hand_back_projected_points)): # 遍历左手关节点
- u, v = left_hand_back_projected_points[i] # 获取投影点坐标
- if i in [0, 1, 5, 9, 13, 17]:
- cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 绘制绿色圆点
- elif i in [4, 8, 12, 16, 20]:
- cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 绘制蓝色圆点
- else:
- cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 绘制红色圆点
- if i in [1, 2, 3, 4]:
- cv2.circle(color_image, (u, v), 10, (255, 0, 255), -1) # 绘制紫色圆点
- right_hand_joint_points_homogeneous = np.hstack((right_hand_joint_xyz, np.ones((right_hand_joint_xyz.shape[0], 1)))) # 转换为齐次坐标
- right_hand_transformed_points_homogeneous = np.dot(right_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用逆变换
- right_hand_points_to_project = right_hand_transformed_points_homogeneous[:, :3] / right_hand_transformed_points_homogeneous[:, [3]] # 归一化
- right_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in right_hand_points_to_project] # 反投影
- for i in range(len(right_hand_back_projected_points)): # 遍历右手关节点
- u, v = right_hand_back_projected_points[i] # 获取投影点坐标
- if i in [0, 1, 5, 9, 13, 17]:
- cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 绘制绿色圆点
- elif i in [4, 8, 12, 16, 20]:
- cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 绘制蓝色圆点
- else:
- cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 绘制红色圆点
- if i in [1, 2, 3, 4]:
- cv2.circle(color_image, (u, v), 10, (255, 0,
- 最后使用Open3D和OpenCV库来处理和现实3D和2D数据
以下是2D可视化部分
将RGBD图像的彩色部分转换为NumPy数组,并从RGB格式转换为BGR格式
处理左手和右手关节数据,将其转换为齐次坐标,应用校正矩阵,转换为 3D坐标,并反向投影到图像平面- if vis_2d: # 如果启用了2D可视化
- color_image = np.asarray(rgbd.color) # 将RGBD图像的彩色部分转换为NumPy数组
- color_image = cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR) # 将图像从RGB格式转换为BGR格式
在图像上绘制不同颜色的圆点表示不同的关节点- # 处理左手关节数据
- left_hand_joint_points_homogeneous = np.hstack((left_hand_joint_xyz, np.ones((left_hand_joint_xyz.shape[0], 1)))) # 将左手关节位置转换为齐次坐标
- left_hand_transformed_points_homogeneous = np.dot(left_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用校正矩阵
- left_hand_points_to_project = left_hand_transformed_points_homogeneous[:, :3] / left_hand_transformed_points_homogeneous[:, [3]] # 将齐次坐标转换为3D坐标
- left_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in left_hand_points_to_project] # 反向投影到图像平面
- for i in range(len(left_hand_back_projected_points)): # 遍历所有左手关节点
- u, v = left_hand_back_projected_points[i] # 获取关节点的图像坐标
- if i in [0, 1, 5, 9, 13, 17]: # 如果关节点是手腕或指根
- cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 画绿色圆点
- elif i in [4, 8, 12, 16, 20]: # 如果关节点是指尖
- cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 画蓝色圆点
- else: # 其他关节点
- cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 画红色圆点
- if i in [1, 2, 3, 4]: # 如果关节点是拇指
- cv2.circle(color_image, (u, v), 10, (255, 0, 255), -1) # 画紫色圆点
- # 处理右手关节数据
- right_hand_joint_points_homogeneous = np.hstack((right_hand_joint_xyz, np.ones((right_hand_joint_xyz.shape[0], 1)))) # 将右手关节位置转换为齐次坐标
- right_hand_transformed_points_homogeneous = np.dot(right_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用校正矩阵
- right_hand_points_to_project = right_hand_transformed_points_homogeneous[:, :3] / right_hand_transformed_points_homogeneous[:, [3]] # 将齐次坐标转换为3D坐标
- right_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in right_hand_points_to_project] # 反向投影到图像平面
- for i in range(len(right_hand_back_projected_points)): # 遍历所有右手关节点
- u, v = right_hand_back_projected_points[i] # 获取关节点的图像坐标
- if i in [0, 1, 5, 9, 13, 17]: # 如果关节点是手腕或指根
- cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 画绿色圆点
- elif i in [4, 8, 12, 16, 20]: # 如果关节点是指尖
- cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 画蓝色圆点
- else: # 其他关节点
- cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 画红色圆点
- if i in [1, 2, 3, 4]: # 如果关节点是拇指
- cv2.circle(color_image, (u, v), 10, (255, 0, 255), -1) # 画紫色圆点
显示带有反向投影点的图像,并在按下'q'键时退出铺环- cv2.imshow("Back-projected Points on Image", color_image) # 显示带有反向投影点的图像
- # 如果按下'q'键,退出循环
- if cv2.waitKey(1) & 0xFF == ord('q'):
- return # 退出函数
以下是3D可视化部分
设置右手关节球体的位置
创建或更新右手关节之问的连线- # 设置关节球体和连线
- for i, sphere in enumerate(self.right_joints): # 遍历右手关节球体
- transformation = np.eye(4) # 创建4x4单位矩阵
- transformation[:3, 3] = right_hand_joint_xyz[i] - sphere.get_center() # 计算平移向量
- sphere.transform(transformation) # 应用平移变换
- for i, (x, y) in enumerate(lines): # 遍历连线
- start = self.right_joints[x].get_center() # 获取连线起点
- end = self.right_joints[y].get_center() # 获取连线终点
- create_or_update_cylinder(start, end, cylinder_list=self.right_line_set, cylinder_idx=i) # 创建或更新连线的圆柱体
- return True # 返回True表示成功
第二部分 STEP2_build_dataset
先导入库
- import h5py # 用于处理HDF5文件
- import json # 用于处理JSON数据
- from scipy.linalg import svd # 用于计算奇异值分解
- from utils import * # 导入自定义的工具函数
- from hyperparameters import * # 导入超参数
- from pybullet_ik_bimanual import LeapPybulletIK # 导入用于逆运动学计算的类
然后初始化全局变量
- leapPybulletIK = LeapPybulletIK() # 创建LeapPybulletIK对象
- R_delta_init = None # 初始化旋转矩阵
2.1 dataset_utils.py
2.1.1 read_pose_data:读取和处理手部姿态数据(过程中使用 LeapPybulLetIK 计算逆运动学,得到手部关节的目标位置)
read_pose_data 用于读取和处理手部姿态数据。该函数从指定的文件路径中加载手部关节位置和
方向数据,并进行一系列转换和校正,最终返回处理后的数据
- 先定义函数本身和一些全局变量
- def read_pose_data(frame_path, demo_path, fixed_trans_to_robot_table, first_frame=False): # 定义读取姿态数据的函数
- global leapPybulletIK # 声明全局变量 leapPybulletIK
- cam_pose_path = os.path.join(frame_path, "pose.txt") # 相机姿态文件路径
- 加载左手姿态数据
- # 加载左手姿态数据
- left_pose_path = os.path.join(frame_path, "pose_2.txt") # 左手姿态文件路径
- left_hand_pos_path = os.path.join(frame_path, "left_hand_joint.txt") # 左手关节位置文件路径
- left_hand_ori_path = os.path.join(frame_path, "left_hand_joint_ori.txt") # 左手关节方向文件路径
- left_hand_off_path = os.path.join(demo_path, "calib_offset_left.txt") # 左手校准偏移量文件路径
- left_hand_off_ori_path = os.path.join(demo_path, "calib_ori_offset_left.txt") # 左手校准方向偏移量文件路径
- pose_2 = np.loadtxt(left_pose_path) # 加载左手姿态数据
- pose_2[:3, 3] += fixed_trans_to_robot_table.T # 应用固定的平移量
- pose_2 = pose_2 @ between_cam_2 # 应用相机之间的转换矩阵
- left_hand_joint_xyz = np.loadtxt(left_hand_pos_path) # 加载左手关节位置数据
- left_hand_joint_xyz = translate_wrist_to_origin(left_hand_joint_xyz) # 将手腕位置平移到原点
- left_hand_wrist_ori = np.loadtxt(left_hand_ori_path)[0] # 加载左手关节方向数据
- 加载右手姿态数据
- # 加载右手姿态数据
- pose_path = os.path.join(frame_path, "pose_3.txt") # 右手姿态文件路径
- hand_pos_path = os.path.join(frame_path, "right_hand_joint.txt") # 右手关节位置文件路径
- hand_ori_path = os.path.join(frame_path, "right_hand_joint_ori.txt") # 右手关节方向文件路径
- hand_off_path = os.path.join(demo_path, "calib_offset.txt") # 右手校准偏移量文件路径
- hand_off_ori_path = os.path.join(demo_path, "calib_ori_offset.txt") # 右手校准方向偏移量文件路径
- pose_3 = np.loadtxt(pose_path) # 加载右手姿态数据
- pose_3[:3, 3] += fixed_trans_to_robot_table.T # 应用固定的平移量
- pose_3 = pose_3 @ between_cam_3 # 应用相机之间的转换矩阵
- right_hand_joint_xyz = np.loadtxt(hand_pos_path) # 加载右手关节位置数据
- right_hand_joint_xyz = translate_wrist_to_origin(right_hand_joint_xyz) # 将手腕位置平移到原点
- right_hand_wrist_ori = np.loadtxt(hand_ori_path)[0] # 加载右手关节方向数据
- 计算逆运动学,其中计算逆运动学compute_IK的实现将在下文分析、讲解
- right_hand_target, left_hand_target, right_hand_points, left_hand_points = leapPybulletIK.compute_IK(right_hand_joint_xyz, right_hand_wrist_ori, left_hand_joint_xyz, left_hand_wrist_ori) # 计算逆运动学
- np.savetxt(os.path.join(frame_path, "right_joints.txt"), right_hand_target) # 保存右手关节目标位置
- np.savetxt(os.path.join(frame_path, "left_joints.txt"), left_hand_target) # 保存左手关节目标位置
- 转换左手姿态
- # 转换左手姿态
- left_rotation_matrix = Rotation.from_quat(left_hand_wrist_ori).as_matrix().T # 将四元数转换为旋转矩阵
- left_joint_xyz_reshaped = left_hand_joint_xyz[:, :, np.newaxis] # 重塑左手关节位置数组
- left_transformed_joint_xyz = np.matmul(left_rotation_matrix, left_joint_xyz_reshaped) # 应用旋转矩阵
- left_hand_joint_xyz = left_transformed_joint_xyz[:, :, 0] # 获取转换后的左手关节位置
- left_hand_joint_xyz[:, -1] = -left_hand_joint_xyz[:, -1] # 反转z轴
- rotation_matrix = axangle2mat(np.array([0, 1, 0]), -np.pi * 1 / 2) # 绕y轴旋转
- left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- rotation_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 2) # 绕x轴旋转
- left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- rotation_matrix = axangle2mat(np.array([0, 0, 1]), -np.pi * 1 / 2) # 绕z轴旋转
- left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- left_hand_ori_offset = np.loadtxt(left_hand_off_ori_path) # 加载左手校准方向偏移量
- left_hand_joint_xyz = np.dot(left_hand_joint_xyz, euler2mat(*left_hand_ori_offset).T) # 应用旋转校准
- left_hand_offset = np.loadtxt(left_hand_off_path) # 加载左手校准偏移量
- left_hand_joint_xyz += left_hand_offset # 应用偏移量
- left_hand_joint_xyz = apply_pose_matrix(left_hand_joint_xyz, pose_2) # 应用姿态矩阵
- update_pose_2 = copy.deepcopy(pose_2) # 复制左手姿态矩阵
- update_pose_2[:3, 3] = left_hand_joint_xyz[0] # 更新左手姿态矩阵的平移部分
- left_hand_joint_xyz = apply_pose_matrix(left_hand_joint_xyz, inverse_transformation(update_pose_2)) # 应用逆变换
- # 重要!由于手套上的相机安装角度为45度,需要在此处转换以获得正确的手部方向
- rotation_45lookup_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 4) # 绕z轴旋转45度
- update_pose_2[:3, :3] = np.dot(update_pose_2[:3, :3], rotation_45lookup_matrix.T) # 应用旋转矩阵
- if not first_frame:
- update_pose_2 = hand_to_robot_left(update_pose_2) # 转换为机器人左手坐标系
- left_hand_translation = update_pose_2[:3, 3] # 获取左手平移部分
- left_hand_rotation_matrix = update_pose_2[:3, :3] # 获取左手旋转矩阵
- left_hand_quaternion = mat2quat(left_hand_rotation_matrix) # 将旋转矩阵转换为四元数
- 转换右手姿态
- # 转换右手姿态
- right_rotation_matrix = Rotation.from_quat(right_hand_wrist_ori).as_matrix().T # 将四元数转换为旋转矩阵
- right_joint_xyz_reshaped = right_hand_joint_xyz[:, :, np.newaxis] # 重塑右手关节位置数组
- right_transformed_joint_xyz = np.matmul(right_rotation_matrix, right_joint_xyz_reshaped) # 应用旋转矩阵
- right_hand_joint_xyz = right_transformed_joint_xyz[:, :, 0] # 获取转换后的右手关节位置
- right_hand_joint_xyz[:, -1] = -right_hand_joint_xyz[:, -1] # 反转z轴
- rotation_matrix = axangle2mat(np.array([0, 1, 0]), -np.pi * 1 / 2) # 绕y轴旋转
- right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- rotation_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 2) # 绕x轴旋转
- right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- rotation_matrix = axangle2mat(np.array([0, 0, 1]), -np.pi * 1 / 2) # 绕z轴旋转
- right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
- right_hand_ori_offset = np.loadtxt(hand_off_ori_path) # 加载右手校准方向偏移量
- right_hand_joint_xyz = np.dot(right_hand_joint_xyz, euler2mat(*right_hand_ori_offset).T) # 应用旋转校准
- right_hand_offset = np.loadtxt(hand_off_path) # 加载右手校准偏移量
- right_hand_joint_xyz += right_hand_offset # 应用偏移量
- right_hand_joint_xyz = apply_pose_matrix(right_hand_joint_xyz, pose_3) # 应用姿态矩阵
- update_pose_3 = copy.deepcopy(pose_3) # 复制右手姿态矩阵
- update_pose_3[:3, 3] = right_hand_joint_xyz[0] # 更新右手姿态矩阵的平移部分
- right_hand_joint_xyz = apply_pose_matrix(right_hand_joint_xyz, inverse_transformation(update_pose_3)) # 应用逆变换
- # 重要!由于手套上的相机安装角度为45度,需要在此处转换以获得正确的手部方向
- rotation_45lookup_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 4) # 绕z轴旋转45度
- update_pose_3[:3, :3] = np.dot(update_pose_3[:3, :3], rotation_45lookup_matrix.T) # 应用旋转矩阵
- if not first_frame:
- update_pose_3 = hand_to_robot(update_pose_3) # 转换为机器人右手坐标系
- right_hand_translation = update_pose_3[:3, 3] # 获取右手平移部分
- right_hand_rotation_matrix = update_pose_3[:3, :3] # 获取右手旋转矩阵
- right_hand_quaternion = mat2quat(right_hand_rotation_matrix) # 将旋转矩阵转换为四元数
- 加载相机姿态数据
- cam_pose_4x4 = np.loadtxt(cam_pose_path) # 加载相机姿态数据
- cam_pose_4x4[:3, 3] += fixed_trans_to_robot_table.T # 应用固定的平移量
- cam_corrected_pose = cam_pose_4x4 @ between_cam # 应用相机之间的转换矩阵
- cam_corrected_pose = cam_corrected_pose.flatten() # 将矩阵展平
- return (np.concatenate([right_hand_translation, right_hand_quaternion, right_hand_target]),
- np.concatenate([left_hand_translation, left_hand_quaternion, left_hand_target]),
- cam_corrected_pose,
- right_hand_joint_xyz.flatten(),
- left_hand_joint_xyz.flatten(),
- right_hand_points,
- left_hand_points) # 返回处理后的数据
总共,这段代码定义了一个名为read_pose_data 的函数,用于读取和处理手部姿态数据。具体步骤如下:
- 加载左手和右手的姿态数据:从指定的文件路径中加载手部关节位置和方向数据
- 计算逆运动学:使用 LeapPybulLetIK 计算逆运动学,得到手部关节的目标位置
- 转换左手和右手的姿态:将手部关节位置和方向数据转换为标准坐标系,并应用校准偏移量和旋转矩阵
- 加载相机姿态数据:从指定的文件路径中加载相机姿态数据,并应用固定的平移量和相机之间的转换矩阵
- 返回处理后的数据:返回处理后的手部和相机姿态数据,包括关节位置、方向和目标位置
通过这些步骤,函数能够读取和处理手部姿态数据,并将其转换为标准坐标系,供后续处理和分析使用
2.1.2 process_hdf5:处理多个数据集(手部姿态/图像/点云),作为模型的训练数据集
这段代码的主要功能是处理多个数据集文件夹中的数据——比如手部姿态数据、图像数据、点云数据,并将处理后的数据保存到一个HDF5文件中,从而作为训练机器学习模型的数据集
具体步骤如下:
- 初始化:创建Open3D可视化器和空的点云对象,打开HDF5文件用于写入
- def process_hdf5(output_hdf5_file, dataset_folders, action_gap, num_points_to_sample, in_wild_data=False): # 定义处理HDF5文件的函数
- global R_delta_init # 声明全局变量 R_delta_init
- vis = o3d.visualization.Visualizer() # 创建Open3D可视化器
- vis.create_window() # 创建可视化窗口
- pcd_vis = o3d.geometry.PointCloud() # 初始化空的点云对象
- firstfirst = True # 标记是否为第一次更新可视化器
- with h5py.File(output_hdf5_file, 'w') as output_hdf5: # 打开HDF5文件用于写入
- output_data_group = output_hdf5.create_group('data') # 创建数据组
- demo_index = 0 # 初始化演示索引
- total_frames = 0 # 初始化总帧数
- mean_init_pos = [] # 初始化初始位置均值列表
- mean_init_quat = [] # 初始化初始四元数均值列表
- 遍历数据集文件夹:加载剪辑标记文件,读取手部姿态数据、图像数据和点云数据
- for dataset_folder in dataset_folders: # 遍历数据集文件夹
- clip_marks_json = os.path.join(dataset_folder, 'clip_marks.json') # 剪辑标记文件路径
- if in_wild_data: # 如果数据是在野外收集的,读取固定的平移量到机器人桌面
- fixed_trans_to_robot_table = np.loadtxt(os.path.join(dataset_folder, 'map_to_robot_table_trans.txt'))
- else:
- fixed_trans_to_robot_table = np.array([0.0, 0.0, 0.0]) # 否则,平移量为零
- # 加载剪辑标记
- with open(clip_marks_json, 'r') as file:
- clip_marks = json.load(file) # 读取剪辑标记
- for clip in clip_marks: # 遍历每个剪辑
- # 保存第0帧并更新 R_delta_init
- frame0_pose_data, frame0_left_pose_data, _, _, _, _, _ = read_pose_data(os.path.join(dataset_folder, f'frame_0'), dataset_folder, fixed_trans_to_robot_table=fixed_trans_to_robot_table, first_frame=True)
- update_R_delta_init(frame0_pose_data[:3], frame0_pose_data[3:7]) # 更新 R_delta_init
- # 获取起始和结束帧编号
- start_frame = int(clip['start'].split('_')[-1])
- end_frame = int(clip['end'].split('_')[-1])
- clip_length = end_frame - start_frame + 1 # 包括第0帧
- agentview_images = [] # 初始化代理视角图像列表
- pointcloud = [] # 初始化点云列表
- poses = [] # 初始化姿态列表
- poses_left = [] # 初始化左手姿态列表
- states = [] # 初始化状态列表
- glove_states = [] # 初始化手套状态列表
- left_glove_states = [] # 初始化左手手套状态列表
- labels = [] # 初始化标签列表
- 处理每一帧数据:调整图像大小,遮罩手部图像
处理点云数据- for frame_number in list(range(start_frame, end_frame + 1)): # 遍历每一帧
- frame_folder = f'frame_{frame_number}' # 帧文件夹名称
- image_path = os.path.join(dataset_folder, frame_folder, "color_image.jpg") # 彩色图像路径
- frame_path = os.path.join(dataset_folder, frame_folder) # 帧文件夹路径
- # 加载手部姿态数据
- pose_data, left_pose_data, cam_data, glove_data, left_glove_data, right_hand_points, left_hand_points = read_pose_data(frame_path, dataset_folder, fixed_trans_to_robot_table=fixed_trans_to_robot_table)
- poses.append(pose_data) # 添加右手姿态数据
- poses_left.append(left_pose_data) # 添加左手姿态数据
- states.append(cam_data) # 添加相机数据
- glove_states.append(glove_data) # 添加右手手套数据
- left_glove_states.append(left_glove_data) # 添加左手手套数据
- # 处理图像
- resized_image = resize_image(image_path) # 调整图像大小
- resized_image, right_hand_show = mask_image(resized_image, pose_data, cam_data) # 遮罩右手图像
- resized_image, left_hand_show = mask_image(resized_image, left_pose_data, cam_data, left=True) # 遮罩左手图像
- agentview_images.append(resized_image) # 添加调整大小后的图像
移除冗余点- # 处理点云
- color_image_o3d = o3d.io.read_image(os.path.join(dataset_folder, frame_folder, "color_image.jpg")) # 读取彩色图像
- depth_image_o3d = o3d.io.read_image(os.path.join(dataset_folder, frame_folder, "depth_image.png")) # 读取深度图像
- max_depth = 1000 # 最大深度
- depth_array = np.asarray(depth_image_o3d) # 将深度图像转换为数组
- mask = depth_array > max_depth # 创建深度掩码
- depth_array[mask] = 0 # 将超过最大深度的值设为0
- filtered_depth_image = o3d.geometry.Image(depth_array) # 创建过滤后的深度图像
- rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(color_image_o3d, filtered_depth_image, depth_trunc=4.0, convert_rgb_to_intensity=False) # 创建RGBD图像
- pose_4x4 = np.loadtxt(os.path.join(dataset_folder, frame_folder, "pose.txt")) # 加载姿态数据
- pose_4x4[:3, 3] += fixed_trans_to_robot_table.T # 应用固定的平移量
- corrected_pose = pose_4x4 @ between_cam # 应用相机之间的转换矩阵
- pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, o3d_depth_intrinsic) # 从RGBD图像创建点云
- pcd.transform(corrected_pose) # 应用校正矩阵
- color_pcd = np.concatenate((np.array(pcd.points), np.array(pcd.colors)), axis=-1) # 合并点云和颜色数据
- if right_hand_show: # 如果检测到右手,合并右手点云
- transformed_point_cloud = transform_right_leap_pointcloud_to_camera_frame(right_hand_points, pose_data)
- colored_hand_point_cloud = np.concatenate((transformed_point_cloud, np.zeros((transformed_point_cloud.shape[0], 3))), axis=1)
- color_pcd = np.concatenate((color_pcd, colored_hand_point_cloud), axis=0)
- if left_hand_show: # 如果检测到左手,合并左手点云
- transformed_point_cloud_left = transform_left_leap_pointcloud_to_camera_frame(left_hand_points, left_pose_data)
- colored_hand_point_cloud_left = np.concatenate((transformed_point_cloud_left, np.zeros((transformed_point_cloud_left.shape[0], 3))), axis=1)
- color_pcd = np.concatenate((color_pcd, colored_hand_point_cloud_left), axis=0)
下采样点云- # 移除桌面表面和背景下方的冗余点
- centroid = np.mean(robot_table_corner_points, axis=0) # 计算质心
- A = robot_table_corner_points - centroid # 计算偏移
- U, S, Vt = svd(A) # 进行奇异值分解
- normal = Vt[-1] # 获取法向量
- d = -np.dot(normal, centroid) # 计算平面方程中的d
- xyz = color_pcd[:, :3] # 获取点云的xyz坐标
- for plane_gap in table_sweep_list: # 遍历平面高度
- below_plane = np.dot(xyz, normal[:3]) + d + plane_gap < 0 # 判断点是否在平面下方
- if len(color_pcd[~below_plane]) > num_points_to_sample: # 如果点数大于采样点数
- color_pcd = color_pcd[~below_plane] # 移除平面下方的点
- break
- # 下采样点云
- if len(color_pcd) > num_points_to_sample:
- indices = np.random.choice(len(color_pcd), num_points_to_sample, replace=False) # 随机选择点
- color_pcd = color_pcd[indices] # 获取采样点
- pointcloud.append(copy.deepcopy(color_pcd)) # 添加点云数据
- labels.append(0) # 添加标签
- # 更新点云可视化
- pcd_vis.points = o3d.utility.Vector3dVector(color_pcd[:, :3]) # 设置点云的点
- pcd_vis.colors = o3d.utility.Vector3dVector(color_pcd[:, 3:]) # 设置点云的颜色
- if firstfirst:
- vis.add_geometry(pcd_vis) # 添加几何体到可视化器
- firstfirst = False # 标记为非第一次
- else:
- vis.update_geometry(pcd_vis) # 更新几何体
- vis.poll_events() # 处理可视化事件
- vis.update_renderer() # 更新渲染器
- 更新可视化:更新点云和图像的可视化显示
- # 更新图像可视化
- cv2.imshow("masked_resized_image", resized_image) # 显示遮罩后的图像
- cv2.waitKey(1) # 等待键盘输入
- 生成动作轨迹:根据 action_gap 生成动作轨迹,并保存到HDF5文件中
先
后- poses = np.array(poses) # 转换姿态列表为数组
- robot0_eef_pos = poses[:, :3] # 获取末端执行器位置
- robot0_eef_quat = poses[:, 3:7] # 获取末端执行器四元数
- robot0_eef_hand = (poses[:, 7:] - np.pi) * 0.5 # 缩放手部关节位置
- poses_left = np.array(poses_left) # 转换左手姿态列表为数组
- robot0_eef_pos_left = poses_left[:, :3] # 获取左手末端执行器位置
- robot0_eef_quat_left = poses_left[:, 3:7] # 获取左手末端执行器四元数
- robot0_eef_hand_left = (poses_left[:, 7:] - np.pi) * 0.5 # 缩放左手关节位置
- robot0_eef_pos = np.concatenate((robot0_eef_pos, robot0_eef_pos_left), axis=-1) # 合并左右手末端执行器位置
- robot0_eef_quat = np.concatenate((robot0_eef_quat, robot0_eef_quat_left), axis=-1) # 合并左右手末端执行器四元数
- robot0_eef_hand = np.concatenate((robot0_eef_hand, robot0_eef_hand_left), axis=-1) # 合并左右手关节位置
- actions_pos = np.concatenate((robot0_eef_pos[action_gap:], robot0_eef_pos[-1:].repeat(action_gap, axis=0)), axis=0) # 生成动作位置
- actions_rot = np.concatenate((robot0_eef_quat[action_gap:], robot0_eef_quat[-1:].repeat(action_gap, axis=0)), axis=0) # 生成动作旋转
- actions_hand = np.concatenate((robot0_eef_hand[action_gap:], robot0_eef_hand[-1:].repeat(action_gap, axis=0)), axis=0) # 生成动作手部姿态
- actions = np.concatenate((actions_pos, actions_rot, actions_hand), axis=-1) # 合并手臂和手部动作
- for j in range(action_gap): # 根据 action_gap 生成轨迹
- demo_name = f'demo_{demo_index}' # 演示名称
- output_demo_group = output_data_group.create_group(demo_name) # 创建演示组
- print("{} saved".format(demo_name)) # 打印保存信息
- demo_index += 1 # 增加演示索引
- output_demo_group.attrs['frame_0_eef_pos'] = frame0_pose_data[:3] # 设置第0帧末端执行器位置
- output_demo_group.attrs['frame_0_eef_quat'] = frame0_pose_data[3:7] # 设置第0帧末端执行器四元数
- output_obs_group = output_demo_group.create_group('obs') # 创建观察组
- output_obs_group.create_dataset('agentview_image', data=np.array(agentview_images)[j::action_gap]) # 保存代理视角图像
- output_obs_group.create_dataset('pointcloud', data=np.array(pointcloud)[j::action_gap]) # 保存点云数据
- output_obs_group.create_dataset('robot0_eef_pos', data=copy.deepcopy(robot0_eef_pos)[j::action_gap]) # 保存末端执行器位置
- output_obs_group.create_dataset('robot0_eef_quat', data=copy.deepcopy(robot0_eef_quat)[j::action_gap]) # 保存末端执行器四元数
- output_obs_group.create_dataset('robot0_eef_hand', data=copy.deepcopy(robot0_eef_hand)[j::action_gap]) # 保存手部姿态
- output_obs_group.create_dataset('label', data=np.array(labels)[j::action_gap]) # 保存标签
- output_demo_group.create_dataset('actions', data=copy.deepcopy(actions)[j::action_gap]) # 保存动作
- # 创建 'dones', 'rewards', 和 'states'
- dones = np.zeros(clip_length, dtype=np.int64) # 初始化 'dones'
- dones[-1] = 1 # 设置最后一帧的 'done' 为 1
- output_demo_group.create_dataset('dones', data=dones[j::action_gap]) # 保存 'dones'
- rewards = np.zeros(clip_length, dtype=np.float64) # 初始化 'rewards'
- output_demo_group.create_dataset('rewards', data=rewards[j::action_gap]) # 保存 'rewards'
- output_demo_group.create_dataset('states', data=states[j::action_gap]) # 保存状态
- output_demo_group.create_dataset('glove_states', data=glove_states[j::action_gap]) # 保存手套状态
- output_demo_group.attrs['num_samples'] = len(actions[j::action_gap]) # 设置样本数量
- total_frames += len(actions[j::action_gap]) # 增加总帧数
- mean_init_pos.append(copy.deepcopy(robot0_eef_pos[j])) # 添加初始位置
- mean_init_quat.append(copy.deepcopy(robot0_eef_quat[j])) # 添加初始四元数
- mean_init_hand.append(copy.deepcopy(robot0_eef_hand[j])) # 添加初始手部姿态
- output_data_group.attrs['total'] = total_frames # 设置总帧数
- 计算初始位置的均值:计算初始位置、四元数和手部姿态的均值,并保存到HDF5文件中
- # 计算初始位置的均值
- mean_init_pos = np.array(mean_init_pos).mean(axis=0) # 计算初始位置均值
- mean_init_quat = mean_init_quat[0] # 获取初始四元数
- mean_init_hand = np.array(mean_init_hand).mean(axis=0) # 计算初始手部姿态均值
- output_data_group.attrs['mean_init_pos'] = mean_init_pos # 设置初始位置均值
- output_data_group.attrs['mean_init_quat'] = mean_init_quat # 设置初始四元数
- output_data_group.attrs['mean_init_hand'] = mean_init_hand # 设置初始手部姿态均值
2.2 pybullet_ik_bimanual.py——含点云生成、计算逆运动学compute_IK的实现
2.2.1 导入库、类的定义和初始化、获取网格点云的方法等等
- 导入库
- import pybullet_data # 导入 PyBullet 数据库
- from yourdfpy import URDF # 导入 URDF 解析库
- from transforms3d.euler import quat2euler, euler2quat # 导入四元数和欧拉角转换函数
- from utils import * # 导入自定义工具函数
- from hyperparameters import * # 导入超参数
- 类定义和初始化
- class LeapPybulletIK(): # 定义 LeapPybulletIK 类
- def __init__(self): # 初始化方法
- # 启动 PyBullet
- clid = p.connect(p.SHARED_MEMORY) # 尝试连接到共享内存
- if clid < 0:
- p.connect(p.GUI) # 如果连接失败,则启动 GUI
- p.setAdditionalSearchPath(pybullet_data.getDataPath()) # 设置 PyBullet 数据路径
- p.loadURDF("plane.urdf", [0, 0, -0.3]) # 加载平面 URDF
- # 加载右手 LEAP 模型
- self.LeapId = p.loadURDF(
- "leap_hand_mesh/robot_pybullet.urdf",
- [0.0, 0.0, 0.0],
- rotate_quaternion(0.0, 0.0, 0.0),
- )
- # 加载左手 LEAP 模型
- self.left_offset = 1.0 # 为了可视化,将左手和右手分开
- self.LeapId_2 = p.loadURDF(
- "leap_hand_mesh/robot_pybullet.urdf",
- [0.0, self.left_offset, 0.0],
- rotate_quaternion(0.0, 0.0, 0.0),
- )
- self.leap_center_offset = [0.18, 0.03, 0.0] # 由于 LEAP 手部 URDF 的根节点不在手掌根部(在食指根部),我们设置一个偏移量来校正根位置
- self.leapEndEffectorIndex = [4, 9, 14, 19] # 指尖关节索引
- self.fingertip_offset = np.array([0.1, 0.0, -0.08]) # 由于 URDF 中指尖网格的根节点不在指尖(在指尖网格的右下部分),我们设置一个偏移量来校正指尖位置
- self.thumb_offset = np.array([0.1, 0.0, -0.06]) # 同样的原因,校正拇指尖位置
- self.numJoints = p.getNumJoints(self.LeapId) # 获取 LEAP 手部的关节数量
- self.hand_lower_limits, self.hand_upper_limits, self.hand_joint_ranges = self.get_joint_limits(self.LeapId) # 获取 LEAP 手部的关节限制
- self.HAND_Q = np.array([np.pi / 6, -np.pi / 6, np.pi / 3, np.pi / 3,
- np.pi / 6, 0.0, np.pi / 3, np.pi / 3,
- np.pi / 6, np.pi / 6, np.pi / 3, np.pi / 3,
- np.pi / 6, np.pi / 6, np.pi / 3, np.pi / 3]) # 为了避免 LEAP 手部的自碰撞,我们定义一个参考姿态用于零空间 IK
- # 加载左手和右手的 URDF,用于在正向运动学期间生成点云
- self.urdf_dict = {}
- self.Leap_urdf = URDF.load("leap_hand_mesh/robot_pybullet.urdf")
- self.Leap_urdf_2 = URDF.load("leap_hand_mesh/robot_pybullet.urdf")
- self.urdf_dict["right_leap"] = {
- "scene": self.Leap_urdf.scene,
- "mesh_list": self._load_meshes(self.Leap_urdf.scene),
- }
- self.urdf_dict["left_leap"] = {
- "scene": self.Leap_urdf_2.scene,
- "mesh_list": self._load_meshes(self.Leap_urdf_2.scene),
- }
- self.create_target_vis() # 创建目标可视化
- p.setGravity(0, 0, 0) # 设置重力
- useRealTimeSimulation = 0 # 禁用实时模拟
- p.setRealTimeSimulation(useRealTimeSimulation) # 设置实时模拟
- 获取关节限制的方法
- def get_joint_limits(self, robot): # 获取关节限制的方法
- joint_lower_limits = [] # 初始化关节下限列表
- joint_upper_limits = [] # 初始化关节上限列表
- joint_ranges = [] # 初始化关节范围列表
- for i in range(p.getNumJoints(robot)): # 遍历所有关节
- joint_info = p.getJointInfo(robot, i) # 获取关节信息
- if joint_info[2] == p.JOINT_FIXED: # 如果关节是固定的,跳过
- continue
- joint_lower_limits.append(joint_info[8]) # 添加关节下限
- joint_upper_limits.append(joint_info[9]) # 添加关节上限
- joint_ranges.append(joint_info[9] - joint_info[8]) # 计算并添加关节范围
- return joint_lower_limits, joint_upper_limits, joint_ranges # 返回关节限制
- 加载网格的方法
- def _load_meshes(self, scene): # 加载网格的方法
- mesh_list = [] # 初始化网格列表
- for name, g in scene.geometry.items(): # 遍历场景中的几何体
- mesh = g.as_open3d # 将几何体转换为 Open3D 网格
- mesh_list.append(mesh) # 添加到网格列表
- return mesh_list # 返回网格列表
- 更新网格的方法
- def _update_meshes(self, type): # 更新网格的方法
- mesh_new = o3d.geometry.TriangleMesh() # 创建新的三角网格
- for idx, name in enumerate(self.urdf_dict[type]["scene"].geometry.keys()): # 遍历几何体
- mesh_new += copy.deepcopy(self.urdf_dict[type]["mesh_list"][idx]).transform(
- self.urdf_dict[type]["scene"].graph.get(name)[0]
- ) # 更新网格
- return mesh_new # 返回更新后的网格
- 获取网格点云的方法
顺带提下,通过生成点云,可以更好的理解和处理三维空间中的信息,从而实现更复杂和精确的任务- def get_mesh_pointcloud(self, joint_pos, joint_pos_left): # 获取网格点云的方法
- self.Leap_urdf.update_cfg(joint_pos) # 更新右手关节配置
- right_mesh = self._update_meshes("right_leap") # 获取更新后的右手网格
- robot_pc = right_mesh.sample_points_uniformly(number_of_points=80000) # 从网格中均匀采样点云
- self.Leap_urdf_2.update_cfg(joint_pos_left) # 更新左手关节配置
- left_mesh = self._update_meshes("left_leap") # 获取更新后的左手网格
- robot_pc_left = left_mesh.sample_points_uniformly(number_of_points=80000) # 从网格中均匀采样点云
- # 将采样的网格点云转换为 Open3D 期望的格式
- new_points = np.asarray(robot_pc.points) # 将点云转换为 NumPy 数组
- new_points_left = np.asarray(robot_pc_left.points) # 将点云转换为 NumPy 数组
- new_points_left[:, 1] = -1.0 * new_points_left[:, 1] # 翻转右手网格为左手网格
- return new_points, new_points_left # 返回左右手的点云
- 转换向量的方法
- def switch_vector_from_rokoko(self, vector): # 转换向量的方法
- return [vector[0], -vector[2], vector[1]] # 返回转换后的向量
- 后处理Rokoko位置的方法
- def post_process_rokoko_pos(self, rightHandThumb_pos, rightHandIndex_pos, rightHandMiddle_pos, rightHandRing_pos): # 后处理 Rokoko 位置的方法
- rightHandThumb_pos[-1] *= -1.0 # 翻转 z 轴
- rightHandThumb_pos = self.switch_vector_from_rokoko(rightHandThumb_pos) # 转换向量
- rightHandIndex_pos[-1] *= -1.0 # 翻转 z 轴
- rightHandIndex_pos = self.switch_vector_from_rokoko(rightHandIndex_pos) # 转换向量
- rightHandMiddle_pos[-1] *= -1.0 # 翻转 z 轴
- rightHandMiddle_pos = self.switch_vector_from_rokoko(rightHandMiddle_pos) # 转换向量
- rightHandRing_pos[-1] *= -1.0 # 翻转 z 轴
- rightHandRing_pos = self.switch_vector_from_rokoko(rightHandRing_pos) # 转换向量
- return rightHandThumb_pos, rightHandIndex_pos, rightHandMiddle_pos, rightHandRing_pos # 返回处理后的位置
- 后处理Rokoko方向的方法
- def post_process_rokoko_ori(self, input_quat): # 后处理 Rokoko 方向的方法
- wxyz_input_quat = np.array([input_quat[3], input_quat[0], input_quat[1], input_quat[2]]) # 转换为 wxyz 四元数
- wxyz_input_mat = quat2mat(wxyz_input_quat) # 转换为旋转矩阵
- rot_mat = np.array([[0, 1, 0], [-1, 0, 0], [0, 0, 1]]) # 定义旋转矩阵
- wxyz_input_mat = np.dot(wxyz_input_mat, rot_mat) # 应用旋转矩阵
- wxyz_transform_quat = mat2quat(wxyz_input_mat) # 转换为四元数
- xyzw_transform_quat = np.array([wxyz_transform_quat[1], wxyz_transform_quat[2], wxyz_transform_quat[3], wxyz_transform_quat[0]]) # 转换为 xyzw 四元数
- return xyzw_transform_quat # 返回处理后的四元数
2.2.2 create_target_vis、update_target_vis、rest_target_vis的实现
接下来是create_target_vis的实现
以及update_target_vis的实现
- 方法定义
该方法接受五个参数:右手的旋转四元数 (rightHand_rot)、右手拇指、食指、中指和
无名指的关节位置def update_target_vis(self, rightHand_rot, rightHandThumb_pos, rightHandIndex_pos, rightHandMiddle_pos, rightHandRing_pos): # 定义更新目标可视化的方法
- 重置球体位置和方向
- p.resetBasePositionAndOrientation(
- self.ball6Mbt,
- rotate_vector_by_quaternion_using_matrix(self.leap_center_offset, rightHand_rot),
- rightHand_rot,
- ) # 重置球体 ball6Mbt 的位置和方向
- p.resetBasePositionAndOrientation(
- self.ball5Mbt,
- [0.0, 0.0, 0.0],
- rightHand_rot,
- ) # 重置球体 ball5Mbt 的位置和方向
- p.resetBasePositionAndOrientation(
- self.ball9Mbt,
- p.getLinkState(self.LeapId, 4)[0],
- rightHand_rot,
- ) # 重置球体 ball9Mbt 的位置和方向
- p.resetBasePositionAndOrientation(
- self.ball7Mbt,
- p.getLinkState(self.LeapId, 9)[0],
- rightHand_rot,
- ) # 重置球体 ball7Mbt 的位置和方向
- p.resetBasePositionAndOrientation(
- self.ball8Mbt,
- p.getLinkState(self.LeapId, 14)[0],
- rightHand_rot,
- ) # 重置球体 ball8Mbt 的位置和方向
- p.resetBasePositionAndOrientation(
- self.ball10Mbt,
- p.getLinkState(self.LeapId, 19)[0],
- rightHand_rot,
- ) # 重置球体 ball10Mbt 的位置和方向
- 计算偏移量
- 更新拇指位置和方向
- 更新食指位置和方向
- 更新中指位置和方向
- 更新无名指位置和方向
- 返回更新后的关键位置
再之后,是update_target_vis_left,用于更新左手目标位置的可视化方法,该方法使用PyBullet来重置多个球体的位置和方向,以反应手部关节的位置和方向
以及rest_target_vis
2.2.3 compute_IK的实现:用于计算双手的逆运动学IK——使用PyBullet来模拟手部关节的位置和方向
最后则是比较重要的compute_IK的实现,其用于计算双手的逆运动学IK,该方法使用PyBullet来模拟手部关节的位置和方向,并生成用于控制真实机器人的关节位置
以下是详细解读
- 方法定义
该方法接收4个参数:右手、左手的关节位置和手腕方向- def compute_IK(self, right_hand_pos, right_hand_wrist_ori, left_hand_pos, left_hand_wrist_ori): # 定义计算逆运动学的方法
- p.stepSimulation() # 进行一步模拟
- 处理左手数据,包括计算关节位置、应用旋转矩阵、翻转x轴
- wxyz_input_quat = np.array([left_hand_wrist_ori[3], left_hand_wrist_ori[0], left_hand_wrist_ori[1], left_hand_wrist_ori[2]]) # 转换左手四元数
- wxyz_input_mat = quat2mat(wxyz_input_quat) # 转换为旋转矩阵
- leftHand_pos = left_hand_pos[0] # 获取左手位置
- leftHandThumb_pos = (left_hand_pos[4] - leftHand_pos) # 计算左手拇指位置
- leftHandIndex_pos = (left_hand_pos[8] - leftHand_pos) # 计算左手食指位置
- leftHandMiddle_pos = (left_hand_pos[12] - leftHand_pos) # 计算左手中指位置
- leftHandRing_pos = (left_hand_pos[16] - leftHand_pos) # 计算左手无名指位置
- leftHandThumb_pos = leftHandThumb_pos @ wxyz_input_mat # 应用旋转矩阵
- leftHandIndex_pos = leftHandIndex_pos @ wxyz_input_mat # 应用旋转矩阵
- leftHandMiddle_pos = leftHandMiddle_pos @ wxyz_input_mat # 应用旋转矩阵
- leftHandRing_pos = leftHandRing_pos @ wxyz_input_mat # 应用旋转矩阵
- leftHandThumb_pos[0] *= -1.0 # 翻转 x 轴
- leftHandIndex_pos[0] *= -1.0 # 翻转 x 轴
- leftHandMiddle_pos[0] *= -1.0 # 翻转 x 轴
- leftHandRing_pos[0] *= -1.0 # 翻转 x 轴
- leftHandThumb_pos = leftHandThumb_pos @ wxyz_input_mat.T # 应用逆旋转矩阵
- leftHandIndex_pos = leftHandIndex_pos @ wxyz_input_mat.T # 应用逆旋转矩阵
- leftHandMiddle_pos = leftHandMiddle_pos @ wxyz_input_mat.T # 应用逆旋转矩阵
- leftHandRing_pos = leftHandRing_pos @ wxyz_input_mat.T # 应用逆旋转矩阵
- 转换左手方向,包括后处理、转换为欧拉角和四元数、重新排列和旋转四元数
- leftHand_rot = left_hand_wrist_ori # 获取左手方向
- leftHand_rot = self.post_process_rokoko_ori(leftHand_rot) # 后处理方向
- euler_angles = quat2euler(np.array([leftHand_rot[3], leftHand_rot[0], leftHand_rot[1], leftHand_rot[2]])) # 转换为欧拉角
- quat_angles = euler2quat(-euler_angles[0], -euler_angles[1], euler_angles[2]).tolist() # 转换为四元数
- leftHand_rot = np.array(quat_angles[1:] + quat_angles[:1]) # 重新排列四元数
- leftHand_rot = rotate_quaternion_xyzw(leftHand_rot, np.array([1.0, 0.0, 0.0]), np.pi / 2.0) # 旋转四元数
- 处理右手数据,包括计算关节位置和更新左手目标可视化
- rightHand_pos = right_hand_pos[0] # 获取右手位置
- rightHandThumb_pos = (right_hand_pos[4] - rightHand_pos) # 计算右手拇指位置
- rightHandIndex_pos = (right_hand_pos[8] - rightHand_pos) # 计算右手食指位置
- rightHandMiddle_pos = (right_hand_pos[12] - rightHand_pos) # 计算右手中指位置
- rightHandRing_pos = (right_hand_pos[16] - rightHand_pos) # 计算右手无名指位置
- leftHandThumb_pos, leftHandIndex_pos, leftHandMiddle_pos, leftHandRing_pos = self.post_process_rokoko_pos(leftHandThumb_pos, leftHandIndex_pos, leftHandMiddle_pos, leftHandRing_pos) # 后处理左手位置
- leftHandThumb_pos, leftHandIndex_pos, leftHandMiddle_pos, leftHandRing_pos = self.update_target_vis_left(leftHand_rot, leftHandThumb_pos, leftHandIndex_pos, leftHandMiddle_pos, leftHandRing_pos) # 更新左手目标可视化
- leapEndEffectorPos_2 = [
- leftHandIndex_pos,
- leftHandMiddle_pos,
- leftHandRing_pos,
- leftHandThumb_pos
- ] # 定义左手末端执行器位置
- 转换右手方向
- 计算逆运动学,用于计算左手和右手的逆运动学,得到关节位置
- jointPoses_2 = []
- for i in range(4):
- jointPoses_2 = jointPoses_2 + list(
- p.calculateInverseKinematics(self.LeapId_2, self.leapEndEffectorIndex[i], leapEndEffectorPos_2[i],
- lowerLimits=self.hand_lower_limits, upperLimits=self.hand_upper_limits, jointRanges=self.hand_joint_ranges,
- restPoses=self.HAND_Q.tolist(), maxNumIterations=1000, residualThreshold=0.001))[4 * i:4 * (i + 1)]
- jointPoses_2 = tuple(jointPoses_2)
- jointPoses = []
- for i in range(4):
- jointPoses = jointPoses + list(
- p.calculateInverseKinematics(self.LeapId, self.leapEndEffectorIndex[i], leapEndEffectorPos[i],
- lowerLimits=self.hand_lower_limits, upperLimits=self.hand_upper_limits, jointRanges=self.hand_joint_ranges,
- restPoses=self.HAND_Q.tolist(), maxNumIterations=1000, residualThreshold=0.001))[4 * i:4 * (i + 1)]
- jointPoses = tuple(jointPoses)
- 合并关节位置
- combined_jointPoses_2 = (jointPoses_2[0:4] + (0.0,) + jointPoses_2[4:8] + (0.0,) + jointPoses_2[8:12] + (0.0,) + jointPoses_2[12:16] + (0.0,))
- combined_jointPoses_2 = list(combined_jointPoses_2)
- combined_jointPoses = (jointPoses[0:4] + (0.0,) + jointPoses[4:8] + (0.0,) + jointPoses[8:12] + (0.0,) + jointPoses[12:16] + (0.0,))
- combined_jointPoses = list(combined_jointPoses)
- 更新手部关节
- # 更新手部关节
- for i in range(20):
- p.setJointMotorControl2(
- bodyIndex=self.LeapId,
- jointIndex=i,
- controlMode=p.POSITION_CONTROL,
- targetPosition=combined_jointPoses[i],
- targetVelocity=0,
- force=500,
- positionGain=0.3,
- velocityGain=1,
- )
- p.setJointMotorControl2(
- bodyIndex=self.LeapId_2,
- jointIndex=i,
- controlMode=p.POSITION_CONTROL,
- targetPosition=combined_jointPoses_2[i],
- targetVelocity=0,
- force=500,
- positionGain=0.3,
- velocityGain=1,
- )
- 重置手部位置和方向
- p.resetBasePositionAndOrientation(
- self.LeapId,
- rotate_vector_by_quaternion_using_matrix(self.leap_center_offset, rightHand_rot),
- rightHand_rot,
- )
- after_left_offset_base = rotate_vector_by_quaternion_using_matrix(self.leap_center_offset, leftHand_rot)
- after_left_offset_base[1] += self.left_offset
- p.resetBasePositionAndOrientation(
- self.LeapId_2,
- after_left_offset_base,
- leftHand_rot,
- )
- self.rest_target_vis()
- 映射结果到真实机器人
- # 映射结果到真实机器人
- real_right_robot_hand_q = np.array([0.0 for _ in range(16)])
- real_left_robot_hand_q = np.array([0.0 for _ in range(16)])
- real_right_robot_hand_q[0:4] = jointPoses[0:4]
- real_right_robot_hand_q[4:8] = jointPoses[4:8]
- real_right_robot_hand_q[8:12] = jointPoses[8:12]
- real_right_robot_hand_q[12:16] = jointPoses[12:16]
- real_right_robot_hand_q[0:2] = real_right_robot_hand_q[0:2][::-1]
- real_right_robot_hand_q[4:6] = real_right_robot_hand_q[4:6][::-1]
- real_right_robot_hand_q[8:10] = real_right_robot_hand_q[8:10][::-1]
- real_left_robot_hand_q[0:4] = jointPoses_2[0:4]
- real_left_robot_hand_q[4:8] = jointPoses_2[4:8]
- real_left_robot_hand_q[8:12] = jointPoses_2[8:12]
- real_left_robot_hand_q[12:16] = jointPoses_2[12:16]
- real_left_robot_hand_q[0:2] = real_left_robot_hand_q[0:2][::-1]
- real_left_robot_hand_q[4:6] = real_left_robot_hand_q[4:6][::-1]
- real_left_robot_hand_q[8:10] = real_left_robot_hand_q[8:10][::-1]
- 生成点云
- # 生成左手和右手的点云
- right_hand_pointcloud, left_hand_pointcloud = self.get_mesh_pointcloud(real_right_robot_hand_q, real_left_robot_hand_q)
- 进一步映射关节到真实机器人
- # 进一步映射关节到真实机器人
- real_right_robot_hand_q += np.pi
- real_left_robot_hand_q += np.pi
- real_left_robot_hand_q[0] = np.pi * 2 - real_left_robot_hand_q[0]
- real_left_robot_hand_q[4] = np.pi * 2 - real_left_robot_hand_q[4]
- real_left_robot_hand_q[8] = np.pi * 2 - real_left_robot_hand_q[8]
- real_left_robot_hand_q[12] = np.pi * 2 - real_left_robot_hand_q[12]
- real_left_robot_hand_q[13] = np.pi * 2 - real_left_robot_hand_q[13]
- 返回结果,该方法返回右手和左手的关节位置,以及生成的点云
return real_right_robot_hand_q, real_left_robot_hand_q, right_hand_pointcloud, left_hand_pointcloud # 返回结果
此外,像STEP2_build_dataset/utils.py这个代码文件里还有对于transform_left_leap_pointcloud_to_camera_frame的实现,其将右手LEAP点云转换到相机坐标系
第三部分 STEP3_train_policy
3.1 robomimic/algo
3.1.1 algo.py
该代码文件里 定义了一系列函数,比如register_algo_factory_func、algo_name_to_factory_func、algo_factory
之后定义了几个大类,比如
- 策略算法基类PolicyAlgo:定义了所有可以用作策略的算法的基类
- class PolicyAlgo(Algo): # 定义一个名为 PolicyAlgo 的类,继承自 Algo
- """
- Base class for all algorithms that can be used as policies.
- """
- def get_action(self, obs_dict, goal_dict=None): # 定义获取动作的方法
- """
- Get policy action outputs.
- Args:
- obs_dict (dict): current observation # 当前观测
- goal_dict (dict): (optional) goal # (可选)目标
- Returns:
- action (torch.Tensor): action tensor # 动作张量
- """
- raise NotImplementedError # 子类需要实现该方法
- 值函数算法基类ValueAlgo:定义了所有可以学习值函数的算法的基类,比如get_state_value(状态值)、get_state_action_value(状态-动作值)
- class ValueAlgo(Algo): # 定义一个名为 ValueAlgo 的类,继承自 Algo
- """
- Base class for all algorithms that can learn a value function.
- """
- def get_state_value(self, obs_dict, goal_dict=None): # 定义获取状态值的方法
- """
- Get state value outputs.
- Args:
- obs_dict (dict): current observation # 当前观测
- goal_dict (dict): (optional) goal # (可选)目标
- Returns:
- value (torch.Tensor): value tensor # 值张量
- """
- raise NotImplementedError # 子类需要实现该方法
- def get_state_action_value(self, obs_dict, actions, goal_dict=None): # 定义获取状态-动作值的方法
- """
- Get state-action value outputs.
- Args:
- obs_dict (dict): current observation # 当前观测
- actions (torch.Tensor): action # 动作
- goal_dict (dict): (optional) goal # (可选)目标
- Returns:
- value (torch.Tensor): value tensor # 值张量
- """
- raise NotImplementedError # 子类需要实现该方法
- 规划算法基类PlannerAlgo:定义了所有可以用于规划子目标的算法的基类
- class PlannerAlgo(Algo): # 定义一个名为 PlannerAlgo 的类,继承自 Algo
- """
- Base class for all algorithms that can be used for planning subgoals
- conditioned on current observations and potential goal observations.
- """
- def get_subgoal_predictions(self, obs_dict, goal_dict=None): # 定义获取子目标预测的方法
- """
- Get predicted subgoal outputs.
- Args:
- obs_dict (dict): current observation # 当前观测
- goal_dict (dict): (optional) goal # (可选)目标
- Returns:
- subgoal prediction (dict): name -> Tensor [batch_size, ...] # 子目标预测
- """
- raise NotImplementedError # 子类需要实现该方法
- def sample_subgoals(self, obs_dict, goal_dict, num_samples=1): # 定义采样子目标的方法
- """
- For planners that rely on sampling subgoals.
- Args:
- obs_dict (dict): current observation # 当前观测
- goal_dict (dict): (optional) goal # (可选)目标
- Returns:
- subgoals (dict): name -> Tensor [batch_size, num_samples, ...] # 子目标
- """
- raise NotImplementedError # 子类需要实现该方法
- 层次算法基类HierarchicalAlgo:定义了所有层次算法的基类,这些算法包括:1) 子目标规划,和2)子目标条件策略学习
- class HierarchicalAlgo(Algo): # 定义一个名为 HierarchicalAlgo 的类,继承自 Algo
- """
- Base class for all hierarchical algorithms that consist of (1) subgoal planning
- and (2) subgoal-conditioned policy learning.
- """
- def get_action(self, obs_dict, goal_dict=None): # 定义获取动作的方法
- """
- Get policy action outputs.
- Args:
- obs_dict (dict): current observation # 当前观测
- goal_dict (dict): (optional) goal # (可选)目标
- Returns:
- action (torch.Tensor): action tensor # 动作张量
- """
- raise NotImplementedError # 子类需要实现该方法
- def get_subgoal_predictions(self, obs_dict, goal_dict=None): # 定义获取子目标预测的方法
- """
- Get subgoal predictions from high-level subgoal planner.
- Args:
- obs_dict (dict): current observation # 当前观测
- goal_dict (dict): (optional) goal # (可选)目标
- Returns:
- subgoal (dict): predicted subgoal # 预测的子目标
- """
- raise NotImplementedError # 子类需要实现该方法
- @property
- def current_subgoal(self): # 定义获取当前子目标的方法
- """
- Get the current subgoal for conditioning the low-level policy
- Returns:
- current subgoal (dict): predicted subgoal # 预测的子目标
- """
- raise NotImplementedError # 子类需要实现该方法
- 策略封装RolloutPolicy:封装算法对象以便在回合循环中运行策略
- class RolloutPolicy(object): # 定义一个名为 RolloutPolicy 的类
- """
- Wraps @Algo object to make it easy to run policies in a rollout loop.
- """
- def __init__(self, policy, obs_normalization_stats=None, action_normalization_stats=None): # 初始化方法
- """
- Args:
- policy (Algo instance): @Algo object to wrap to prepare for rollouts # 要封装的算法对象
- obs_normalization_stats (dict): optionally pass a dictionary for observation # 可选的观测归一化字典
- normalization. This should map observation keys to dicts
- with a "mean" and "std" of shape (1, ...) where ... is the default
- shape for the observation.
- """
- self.policy = policy # 保存算法对象
- self.obs_normalization_stats = obs_normalization_stats # 保存观测归一化字典
- self.action_normalization_stats = action_normalization_stats # 保存动作归一化字典
- def start_episode(self): # 开始新回合的方法
- """
- Prepare the policy to start a new rollout.
- """
- self.policy.set_eval() # 设置算法为评估模式
- self.policy.reset() # 重置算法
- def _prepare_observation(self, ob): # 准备观测数据的方法
- """
- Prepare raw observation dict from environment for policy.
- Args:
- ob (dict): single observation dictionary from environment (no batch dimension,
- and np.array values for each key) # 来自环境的单个观测字典(无批次维度,每个键的值为 np.array)
- """
- ob = TensorUtils.to_tensor(ob) # 将观测数据转换为张量
- ob = TensorUtils.to_batch(ob) # 将观测数据转换为批次
- ob = TensorUtils.to_device(ob, self.policy.device) # 将观测数据移动到算法所在的设备
- ob = TensorUtils.to_float(ob) # 将观测数据转换为浮点数
- if self.obs_normalization_stats is not None: # 如果提供了观测归一化字典
- # ensure obs_normalization_stats are torch Tensors on proper device
- obs_normalization_stats = TensorUtils.to_float(TensorUtils.to_device(TensorUtils.to_tensor(self.obs_normalization_stats), self.policy.device)) # 确保观测归一化字典是正确设备上的 torch 张量
- # limit normalization to obs keys being used, in case environment includes extra keys
- ob = { k : ob[k] for k in self.policy.global_config.all_obs_keys } # 限制归一化到正在使用的观测键,以防环境中包含额外的键
- ob = ObsUtils.normalize_dict(ob, normalization_stats=obs_normalization_stats) # 归一化观测字典
- return ob # 返回准备好的观测数据
- def __repr__(self): # 打印网络描述的方法
- """Pretty print network description"""
- return self.policy.__repr__() # 返回算法对象的字符串表示
- def __call__(self, ob, goal=None): # 调用方法,生成动作
- """
- Produce action from raw observation dict (and maybe goal dict) from environment.
- Args:
- ob (dict): single observation dictionary from environment (no batch dimension,
- and np.array values for each key) # 来自环境的单个观测字典(无批次维度,每个键的值为 np.array)
- goal (dict): goal observation # 目标观测
- """
- ob = self._prepare_observation(ob) # 准备观测数据
- if goal is not None: # 如果提供了目标观测
- goal = self._prepare_observation(goal) # 准备目标观测数据
- ac = self.policy.get_action(obs_dict=ob, goal_dict=goal) # 获取动作
- ac = TensorUtils.to_numpy(ac[0]) # 将动作转换为 numpy 数组
- if self.action_normalization_stats is not None: # 如果提供了动作归一化字典
- action_keys = self.policy.global_config.train.action_keys # 获取动作键
- action_shapes = {k: self.action_normalization_stats[k]["offset"].shape[1:] for k in self.action_normalization_stats} # 获取动作形状
- ac_dict = AcUtils.vector_to_action_dict(ac, action_shapes=action_shapes, action_keys=action_keys) # 将动作向量转换为动作字典
- ac_dict = ObsUtils.unnormalize_dict(ac_dict, normalization_stats=self.action_normalization_stats) # 反归一化动作字典
- action_config = self.policy.global_config.train.action_config # 获取动作配置
- for key, value in ac_dict.items(): # 遍历动作字典
- this_format = action_config[key].get('format', None) # 获取动作格式
- if this_format == 'rot_6d': # 如果动作格式为 'rot_6d'
- rot_6d = torch.from_numpy(value).unsqueeze(0) # 将值转换为 torch 张量并添加一个维度
- rot = TorchUtils.rot_6d_to_axis_angle(rot_6d=rot_6d).squeeze().numpy() # 将 'rot_6d' 转换为轴角表示并转换为 numpy 数组
- ac_dict[key] = rot # 更新动作字典中的值
- ac = AcUtils.action_dict_to_vector(ac_dict, action_keys=action_keys) # 将动作字典转换为动作向量
- return ac # 返回动作
3.1.2 bc.py:实现行为克隆算法及其多种变体
3.1.3 bcq.py:实现批约束Q学习(Batch-Constrained Q-Learning)算法
3.1.4 cql.py:实现conservative Q-Leaning算法
3.1.5 diffusion_policy.py
关于diffusion_policy.py的实现,详见此文《Diffusion Policy——斯坦福机器人UMI所用的扩散策略:从原理到其编码实现(含Diff-Control、ControlNet详解)》的第二部分 Diffusion Policy的编码实现与源码解读
3.1.6 gl.py:实现子目标预测模型,用于HBC和IRIS算法
3.1.7 hbc.py:实现分层行为克隆(Hierarchical Behavioral Cloning, HBC)
这段代码实现了分层行为克隆(HBC),其中规划器模型输出子目标(未来观祭),而 actor模型则以子目标为条件尝试达到这些目标
具体功能包括:
- 初始化方法:接受多个参数,用于配置 HBC 的结构和参数
- 处理训练批次:处理人数据加载器中采样的输入批次,过滤出相关信息并准备批次进行训练
- 在单个批次上进行训练:在单个数据批次上进行训练,预测子目标并计算损失
- 记录信息:处理从 etrain_on_batch 返回的信息
3.1.8 iql.py:实现隐式Q学习(implicit Q-Learning,IQL)算法
- 导入必要的库和模块
- 注册所谓的算法工厂函数
- 定义IQL类
class IQL(PolicyAlgo, ValueAlgo): # 定义 IQL 类,继承自 PolicyAlgo 和 ValueAlgo
- 创建网络并将其放入self.nets中
包括actor网络- def _create_networks(self):
- """
- Creates networks and places them into @self.nets.
- Networks for this algo: critic (potentially ensemble), actor, value function
- """
- # Create nets
- self.nets = nn.ModuleDict() # 创建一个模块字典来存储网络
- # Assemble args to pass to actor
- actor_args = dict(self.algo_config.actor.net.common) # 获取 actor 网络的公共参数
- # Add network-specific args and define network class
- if self.algo_config.actor.net.type == "gaussian": # 如果 actor 网络类型是高斯网络
- actor_cls = PolicyNets.GaussianActorNetwork # 使用高斯 actor 网络类
- actor_args.update(dict(self.algo_config.actor.net.gaussian)) # 更新 actor 参数
- elif self.algo_config.actor.net.type == "gmm": # 如果 actor 网络类型是 GMM 网络
- actor_cls = PolicyNets.GMMActorNetwork # 使用 GMM actor 网络类
- actor_args.update(dict(self.algo_config.actor.net.gmm)) # 更新 actor 参数
- else:
- # Unsupported actor type!
- raise ValueError(f"Unsupported actor requested. " # 抛出不支持的 actor 类型错误
- f"Requested: {self.algo_config.actor.net.type}, "
- f"valid options are: {['gaussian', 'gmm']}")
critic网络- # Actor
- self.nets["actor"] = actor_cls( # 创建 actor 网络
- obs_shapes=self.obs_shapes, # 观察形状
- goal_shapes=self.goal_shapes, # 目标形状
- ac_dim=self.ac_dim, # 动作维度
- mlp_layer_dims=self.algo_config.actor.layer_dims, # MLP 层维度
- encoder_kwargs=ObsUtils.obs_encoder_kwargs_from_config(self.obs_config.encoder), # 编码器参数
- **actor_args, # 其他 actor 参数
- )
价值函数网络- # Critics
- self.nets["critic"] = nn.ModuleList() # 创建一个模块列表来存储 critic 网络
- self.nets["critic_target"] = nn.ModuleList() # 创建一个模块列表来存储目标 critic 网络
- for _ in range(self.algo_config.critic.ensemble.n): # 遍历 critic 集成的数量
- for net_list in (self.nets["critic"], self.nets["critic_target"]): # 遍历 critic 和目标 critic 列表
- critic = ValueNets.ActionValueNetwork( # 创建 critic 网络
- obs_shapes=self.obs_shapes, # 观察形状
- ac_dim=self.ac_dim, # 动作维度
- mlp_layer_dims=self.algo_config.critic.layer_dims, # MLP 层维度
- goal_shapes=self.goal_shapes, # 目标形状
- encoder_kwargs=ObsUtils.obs_encoder_kwargs_from_config(self.obs_config.encoder), # 编码器参数
- )
- net_list.append(critic) # 将 critic 网络添加到列表中
- # Value function network
- self.nets["vf"] = ValueNets.ValueNetwork( # 创建价值函数网络
- obs_shapes=self.obs_shapes, # 观察形状
- mlp_layer_dims=self.algo_config.critic.layer_dims, # MLP 层维度
- goal_shapes=self.goal_shapes, # 目标形状
- encoder_kwargs=ObsUtils.obs_encoder_kwargs_from_config(self.obs_config.encoder), # 编码器参数
- )
- # Send networks to appropriate device
- self.nets = self.nets.float().to(self.device) # 将网络转换为浮点数并移动到设备上
- # sync target networks at beginning of training
- with torch.no_grad(): # 在不计算梯度的上下文中
- for critic, critic_target in zip(self.nets["critic"], self.nets["critic_target"]): # 遍历 critic 和目标 critic
- TorchUtils.hard_update( # 硬更新目标网络
- source=critic, # 源网络
- target=critic_target, # 目标网络
- 处理训练批次
比如处理从数据加载器中采样的输入批次,过滤出相关信息并准备批次进行训练- def process_batch_for_training(self, batch):
- """
- Processes input batch from a data loader to filter out relevant info and prepare the batch for training.
- Args:
- batch (dict): dictionary with torch.Tensors sampled
- from a data loader
- Returns:
- input_batch (dict): processed and filtered batch that
- will be used for training
- """
- input_batch = dict() # 创建一个字典来存储处理后的批次
- # remove temporal batches for all
- input_batch["obs"] = {k: batch["obs"][k][:, 0, :] for k in batch["obs"]} # 获取第一个时间步的观察
- input_batch["next_obs"] = {k: batch["next_obs"][k][:, 0, :] for k in batch["next_obs"]} # 获取第一个时间步的下一个观察
- input_batch["goal_obs"] = batch.get("goal_obs", None) # 获取目标观察,可能不存在
- input_batch["actions"] = batch["actions"][:, 0, :] # 获取第一个时间步的动作
- input_batch["dones"] = batch["dones"][:, 0] # 获取第一个时间步的完成标志
- input_batch["rewards"] = batch["rewards"][:, 0] # 获取第一个时间步的奖励
- return TensorUtils.to_device(TensorUtils.to_float(input_batch), self.device) # 将批次转换为浮点数并移动到设备上
- 在单个批次上进行训练,分别计算critic和actor损失,并更新网络
- def train_on_batch(self, batch, epoch, validate=False):
- """
- Training on a single batch of data.
- Args:
- batch (dict): dictionary with torch.Tensors sampled
- from a data loader and filtered by @process_batch_for_training
- epoch (int): epoch number - required by some Algos that need
- to perform staged training and early stopping
- validate (bool): if True, don't perform any learning updates.
- Returns:
- info (dict): dictionary of relevant inputs, outputs, and losses
- that might be relevant for logging
- """
- info = OrderedDict() # 创建一个有序字典来存储信息
- # Set the correct context for this training step
- with TorchUtils.maybe_no_grad(no_grad=validate): # 在不计算梯度的上下文中
- # Always run super call first
- info = super().train_on_batch(batch, epoch, validate=validate) # 调用父类的 train_on_batch 方法
- # Compute loss for critic(s)
- critic_losses, vf_loss, critic_info = self._compute_critic_loss(batch) # 计算 critic 的损失
- # Compute loss for actor
- actor_loss, actor_info = self._compute_actor_loss(batch, critic_info) # 计算 actor 的损失
- if not validate: # 如果不是验证模式
- # Critic update
- self._update_critic(critic_losses, vf_loss) # 更新 critic
- # Actor update
- self._update_actor(actor_loss) # 更新 actor
- # Update info
- info.update(actor_info) # 更新信息字典
- info.update(critic_info) # 更新信息字典
- # Return stats
- return info # 返回信息字典
- 计算critic损失,计算Q和V的损失,并返回损失和相关信息
为方便大家理解,我们逐步分析下- def _compute_critic_loss(self, batch):
- """
- Helper function for computing Q and V losses. Called by @train_on_batch
- Args:
- batch (dict): dictionary with torch.Tensors sampled
- from a data loader and filtered by @process_batch_for_training
- Returns:
- critic_losses (list): list of critic (Q function) losses
- vf_loss (torch.Tensor): value function loss
- info (dict): dictionary of Q / V predictions and losses
- """
- info = OrderedDict() # 创建一个有序字典用于存储信息
- # get batch values
- obs = batch["obs"] # 获取观察
- actions = batch["actions"] # 获取动作
- next_obs = batch["next_obs"] # 获取下一个观察
- goal_obs = batch["goal_obs"] # 获取目标观察
- rewards = torch.unsqueeze(batch["rewards"], 1) # 获取奖励并增加一个维度
- dones = torch.unsqueeze(batch["dones"], 1) # 获取完成标志并增加一个维度
首先,对于Q 函数损失
Q 函数损失是通过计算 Q 预测值与目标 Q 值之间的误差来实现的
其中Q 预测值为
而目标 Q 值是通过贝尔曼方程计算得到的,其计算公式为- # Q predictions
- pred_qs = [critic(obs_dict=obs, acts=actions, goal_dict=goal_obs)
- for critic in self.nets["critic"]] # 预测 Q 值
- info["critic/critic1_pred"] = pred_qs[0].mean() # 记录第一个 critic 的平均预测值
其中:- # Q target values
- target_vf_pred = self.nets["vf"](obs_dict=next_obs, goal_dict=goal_obs).detach() # 预测目标价值函数
- q_target = rewards + (1. - dones) * self.algo_config.discount * target_vf_pred # 计算 Q 目标值
- q_target = q_target.detach() # 分离计算图
-是奖励值
-是折扣因子
-是完成信号(done signal)
-是目标价值函数的预测值
Q 函数损失的计算公式如下:
其中:- # Q losses
- critic_losses = [] # 创建一个列表用于存储 critic 的损失
- td_loss_fcn = nn.SmoothL1Loss() if self.algo_config.critic.use_huber else nn.MSELoss() # 选择损失函数
- for (i, q_pred) in enumerate(pred_qs): # 遍历预测的 Q 值
- # Calculate td error loss
- td_loss = td_loss_fcn(q_pred, q_target) # 计算 TD 误差损失
- info[f"critic/critic{i+1}_loss"] = td_loss # 记录 critic 的损失
- critic_losses.append(td_loss) # 将损失添加到列表中
-是 Q 网络的预测值
-是目标 Q 值
-是损失函数,可以是均方误差(MSE)或 Huber 损失
其次,价值函数损失是通过期望回归(expectile regression)来实现的「参考的论文为:Offline Reinforcement Learning with Implicit Q-Learning」,其计算公式如下:
其中:- # V losses: expectile regression. see section 4.1 in https://arxiv.org/pdf/2110.06169.pdf
- vf_err = vf_pred - q_pred # 计算价值函数误差
- vf_sign = (vf_err > 0).float() # 计算误差符号
- vf_weight = (1 - vf_sign) * self.algo_config.vf_quantile + vf_sign * (1 - self.algo_config.vf_quantile) # 计算价值函数权重
- vf_loss = (vf_weight * (vf_err ** 2)).mean() # 计算价值函数损失
-是价值函数的预测值
-- # V predictions
- pred_qs = [critic(obs_dict=obs, acts=actions, goal_dict=goal_obs)
- for critic in self.nets["critic_target"]] # 预测 V 值
- q_pred, _ = torch.cat(pred_qs, dim=1).min(dim=1, keepdim=True) # 计算最小 Q 值
- q_pred = q_pred.detach() # 分离计算图
- vf_pred = self.nets["vf"](obs) # 预测价值函数
是上面所说的 Q 网络的预测值
-是权重,定义如下:
其中是期望分位数(expectile quantile)
最后- # update logs for V loss
- info["vf/q_pred"] = q_pred # 记录 Q 预测值
- info["vf/v_pred"] = vf_pred # 记录 V 预测值
- info["vf/v_loss"] = vf_loss # 记录 V 损失
- # Return stats
- return critic_losses, vf_loss, info # 返回统计信息
- 更新critic和价值函数网络
- def _update_critic(self, critic_losses, vf_loss):
- """
- Helper function for updating critic and vf networks. Called by @train_on_batch
- Args:
- critic_losses (list): list of critic (Q function) losses
- vf_loss (torch.Tensor): value function loss
- """
- # update ensemble of critics
- for (critic_loss, critic, critic_target, optimizer) in zip(
- critic_losses, self.nets["critic"], self.nets["critic_target"], self.optimizers["critic"]
- ): # 遍历 critic 损失、critic 网络、目标 critic 网络和优化器
- TorchUtils.backprop_for_loss(
- net=critic, # critic 网络
- optim=optimizer, # 优化器
- loss=critic_loss, # critic 损失
- max_grad_norm=self.algo_config.critic.max_gradient_norm, # 最大梯度范数
- retain_graph=False, # 不保留计算图
- )
- # update target network
- with torch.no_grad(): # 在不计算梯度的上下文中
- TorchUtils.soft_update(source=critic, target=critic_target, tau=self.algo_config.target_tau) # 软更新目标网络
- # update V function network
- TorchUtils.backprop_for_loss(
- net=self.nets["vf"], # 价值函数网络
- optim=self.optimizers["vf"], # 优化器
- loss=vf_loss, # 价值函数损失
- max_grad_norm=self.algo_config.critic.max_gradient_norm, # 最大梯度范数
- retain_graph=False, # 不保留计算图
- )
- 计算actor损失
先
然后,计算动作的对数概率- def _compute_actor_loss(self, batch, critic_info):
- """
- Helper function for computing actor loss. Called by @train_on_batch
- Args:
- batch (dict): dictionary with torch.Tensors sampled
- from a data loader and filtered by @process_batch_for_training
- critic_info (dict): dictionary containing Q and V function predictions,
- to be used for computing advantage estimates
- Returns:
- actor_loss (torch.Tensor): actor loss
- info (dict): dictionary of actor losses, log_probs, advantages, and weights
- """
- info = OrderedDict() # 创建一个有序字典来存储信息
接着,计算优势估计- # compute log probability of batch actions
- dist = self.nets["actor"].forward_train(obs_dict=batch["obs"], goal_dict=batch["goal_obs"]) # 计算动作的对数概率
- log_prob = dist.log_prob(batch["actions"]) # 计算动作的对数概率
- info["actor/log_prob"] = log_prob.mean() # 记录动作的平均对数概率
是上面所说的 Q 网络的预测值
是价值函数的预测值
再计算优势权重,这一步在 _get_adv_weights 函数中完成,具体计算方式如下:- # compute advantage estimate
- q_pred = critic_info["vf/q_pred"] # 获取 Q 预测值
- v_pred = critic_info["vf/v_pred"] # 获取价值函数预测值
- adv = q_pred - v_pred # 计算优势估计
最后,计算加权的actor损失- # compute weights
- weights = self._get_adv_weights(adv) # 计算优势权重
再通过actor_loss=actor_loss.mean(),得出这个损失的平均值
再返回- # compute advantage weighted actor loss. disable gradients through weights
- actor_loss = (-log_prob * weights.detach()).mean() # 计算加权的 actor 损失
- info["actor/loss"] = actor_loss # 记录 actor 损失
- # log adv-related values
- info["adv/adv"] = adv # 记录优势估计
- info["adv/adv_weight"] = weights # 记录优势权重
- # Return stats
- return actor_loss, info # 返回 actor 损失和信息字
- 更新Actor网络
- def _update_actor(self, actor_loss):
- """
- 更新 Actor 网络的辅助函数。由 @train_on_batch 调用。
- Args:
- actor_loss (torch.Tensor): Actor 损失
- """
- TorchUtils.backprop_for_loss( # 使用 TorchUtils 进行反向传播
- net=self.nets["actor"], # Actor 网络
- optim=self.optimizers["actor"], # Actor 优化器
- loss=actor_loss, # Actor 损失
- max_grad_norm=self.algo_config.actor.max_gradient_norm, # 最大梯度范数
- )
- 计算优势权重
- def _get_adv_weights(self, adv):
- """
- 计算优势权重的辅助函数。由 @_compute_actor_loss 调用。
- Args:
- adv (torch.Tensor): 原始优势估计
- Returns:
- weights (torch.Tensor): 基于优势估计计算的权重,形状为 (B,),其中 B 是批次大小
- """
- # 剪辑原始优势值
- if self.algo_config.adv.clip_adv_value is not None:
- adv = adv.clamp(max=self.algo_config.adv.clip_adv_value) # 剪辑优势值
- # 基于优势值计算权重
- beta = self.algo_config.adv.beta # 温度因子
- weights = torch.exp(adv / beta) # 计算权重
- # 剪辑最终权重
- if self.algo_config.adv.use_final_clip is True:
- weights = weights.clamp(-100.0, 100.0) # 剪辑权重
- # 从 (B, 1) 重新调整为 (B,)
- return weights[:, 0] # 返回权重
- 记录信息
- def log_info(self, info):
- """
- 处理从 @train_on_batch 返回的信息字典,以汇总信息传递给 tensorboard 进行日志记录。
- Args:
- info (dict): 信息字典
- Returns:
- loss_log (dict): 名称 -> 汇总统计
- """
- log = OrderedDict() # 创建有序字典
- log["actor/log_prob"] = info["actor/log_prob"].item() # 记录 Actor 的 log_prob
- log["actor/loss"] = info["actor/loss"].item() # 记录 Actor 的损失
- log["critic/critic1_pred"] = info["critic/critic1_pred"].item() # 记录 Critic1 的预测值
- log["critic/critic1_loss"] = info["critic/critic1_loss"].item() # 记录 Critic1 的损失
- log["vf/v_loss"] = info["vf/v_loss"].item() # 记录价值函数的损失
- self._log_data_attributes(log, info, "vf/q_pred") # 记录 Q 预测值的统计信息
- self._log_data_attributes(log, info, "vf/v_pred") # 记录 V 预测值的统计信息
- self._log_data_attributes(log, info, "adv/adv") # 记录优势的统计信息
- self._log_data_attributes(log, info, "adv/adv_weight") # 记录优势权重的统计信息
- return log # 返回日志
- 记录数据属性
- def _log_data_attributes(self, log, info, key):
- """
- 记录统计信息的辅助函数。修改 log 就地。
- Args:
- log (dict): 现有的日志字典
- info (dict): 包含原始统计信息的现有字典
- key (str): 要记录的键
- """
- log[key + "/max"] = info[key].max().item() # 记录最大值
- log[key + "/min"] = info[key].min().item() # 记录最小值
- log[key + "/mean"] = info[key].mean().item() # 记录均值
- log[key + "/std"] = info[key].std().item() # 记录标准差
- 每个epoch结束时调用
- def on_epoch_end(self, epoch):
- """
- 在每个 epoch 结束时调用。
- """
- # 学习率调度更新
- for lr_sc in self.lr_schedulers["critic"]:
- if lr_sc is not None:
- lr_sc.step() # 更新 Critic 的学习率调度
- if self.lr_schedulers["vf"] is not None:
- self.lr_schedulers["vf"].step() # 更新价值函数的学习率调度
- if self.lr_schedulers["actor"] is not None:
- self.lr_schedulers["actor"].step() # 更新 Actor 的学习率调度
- 获取策略动作输出
- def get_action(self, obs_dict, goal_dict=None):
- """
- 获取策略动作输出。
- Args:
- obs_dict (dict): 当前观察
- goal_dict (dict): (可选)目标
- Returns:
- action (torch.Tensor): 动作张量
- """
- assert not self.nets.training # 确保网络不在训练模式
- return self.nets["actor"](obs_dict=obs_dict, goal_dict=goal_dict) # 返回 Actor 的动作输出
3.1.9 iris.py:实现李飞飞团队的 IRIS 算法
IRIS 算法对应的论文为:IRIS:基于离线机器人操作数据的大规模无交互隐式强化学习控制
代码实现了 IRIS 算法,包括初始化方法、处理训练批次、获取状态值和获取状态-动作值的功能。通过这些功能,IRIS 算法能够在训练过程中更新网络参数,并根据观察和目标获取相应的值
// 待更
3.1.10 td3_bc.py:实现TD3-BC 算法
Based on https://github.com/sfujim/TD3_BC,其对应的paper为:Off-Policy Deep Reinforcement Learning without Exploration
// 待更
评论记录:
回复评论: