首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐
2025年5月23日 星期五 7:49am

斯坦福泡茶机器人DexCap源码解析:涵盖收集数据、处理数据、模型训练三大阶段

  • 25-03-02 10:42
  • 2449
  • 12402
blog.csdn.net

前言

因为我司「七月在线」关于dexcap的复现/优化接近尾声了(每月逐步提高复现的效果),故准备把dexcap的源码也分析下,11月​下旬则分析下iDP3的源码——为队伍「iDP3人形的复现/优化」助力

最开始,dexcap的源码分析属于此文《DexCap——斯坦福李飞飞团队泡茶机器人:带灵巧手和动作捕捉的数据收集系统(含硬件清单)》的第三部分

然原理讲解、硬件配置、源码解析都放在一篇文章里的话,篇幅会显得特别长,故把源码的部分抽取出来,独立成此文

以下为本文的全文目录

前言

第一部分 STEP1_collect_data

1.1 calculate_offset_vis_calib.py

1.2 data_recording.py——配置多个Realsense T265相机和一个L515相机

1.2.1 一系列定义:比如保存帧数据的函数

1.2.2 RealsenseProcessor 的类:获取多个相机的RGB-D帧和位姿数据

1.3 demo_clipping_3d.py——可视化点云数据且选择每个演示的起始帧/结束帧

1.4 redis_glove_server.py——接收UDP数据包并将手部关节数据存储到Redis数据库

1.5 replay_human_traj_vis.py——可视化保存点云数据和位姿(可用于查看所有帧和校准)

1.6 transform_to_robot_table.py——将可视化点云数据放置在机器人桌面上

1.7 visualizer.py——可视化手部关节数据

1.7.1 一系列定义:比如五个手指等

1.7.2 DataVisualizer类:用于可视化点云数据和手部关节数据

1.7.3 _load_frame_data:加载给定帧的点云和位姿数据,并进行可视化处理

第二部分 STEP2_build_dataset

2.1 dataset_utils.py

2.1.1 read_pose_data:读取和处理手部姿态数据(过程中使用 LeapPybulLetIK 计算逆运动学,得到手部关节的目标位置)

2.1.2 process_hdf5:处理多个数据集(手部姿态/图像/点云),作为模型的训练数据集

2.2 pybullet_ik_bimanual.py——含点云生成、计算逆运动学compute_IK的实现

2.2.1 导入库、类的定义和初始化、获取网格点云的方法等等

2.2.2 create_target_vis、update_target_vis、rest_target_vis的实现

2.2.3 compute_IK的实现:用于计算双手的逆运动学IK——使用PyBullet来模拟手部关节的位置和方向

第三部分 STEP3_train_policy

3.1 robomimic/algo

3.1.1algo.py

3.1.2 bc.py

3.1.3 bcq.py

3.1.4 cql.py

3.1.5 diffusion_policy.py

3.1.6 gl.py

3.1.7 hbc.py

3.1.8 iql.py

3.1.9 iris.py

3.1.10 td3_bc.py

第一部分 STEP1_collect_data

1.1 calculate_offset_vis_calib.py

这段代码是一个用于校准和保存偏移量、方向偏移量的脚本,用于从指定目录中读取数据,计算偏移量和方向偏移量,并将结果保存到指定目录中,以下是代码的部分解释

  1. 导入库
    1. import argparse # 用于解析命令行参数
    2. import numpy as np # 用于数值计算
    3. import os # 用于操作系统相关功能
    4. import sys # 用于系统相关功能
    5. from scipy.spatial.transform import Rotation as R # 用于旋转矩阵和欧拉角转换
    6. from transforms3d.euler import euler2mat # 用于将欧拉角转换为旋转矩阵
  2. 主函数
    1. if __name__ == "__main__":
    2. parser = argparse.ArgumentParser(description="calibrate and save in dataset folder") # 创建命令行参数解析器
    3. parser.add_argument("--directory", type=str, default="", help="Directory with saved data") # 添加目录参数
    4. parser.add_argument("--default", type=str, default="default_offset", help="Directory with saved data") # 添加默认目录参数
    5. args = parser.parse_args() # 解析命令行参数
    6. if os.path.exists("{}/calib_offset.txt".format(args.directory)): # 检查目标目录中是否已经存在calib_offset.txt文件
    7. response = (
    8. input(
    9. f"calib_offset.txt already exists. Do you want to override? (y/n): " # 提示用户是否覆盖文件
    10. )
    11. .strip()
    12. .lower()
    13. )
    14. if response != "y": # 如果用户选择不覆盖,退出程序
    15. print("Exiting program without overriding the existing directory.")
    16. sys.exit()
  3. 读取默认偏移量和方向偏移量
    1. default_offset = np.loadtxt(os.path.join(args.default, "calib_offset.txt")) # 读取默认偏移量
    2. default_ori = np.loadtxt(os.path.join(args.default, "calib_ori_offset.txt")) # 读取默认方向偏移量
    3. default_offset_left = np.loadtxt(os.path.join(args.default, "calib_offset_left.txt")) # 读取左手默认偏移量
    4. default_ori_left = np.loadtxt(os.path.join(args.default, "calib_ori_offset_left.txt")) # 读取左手默认方向偏移量
    5. default_ori_matrix = euler2mat(*default_ori) # 将默认方向偏移量转换为旋转矩阵
    6. default_ori_matrix_left = euler2mat(*default_ori_left) # 将左手默认方向偏移量转换为旋转矩阵
  4. 提取偏移量和方向偏移量
    1. frame_dirs = os.listdir("./test_data") # 列出test_data目录中的所有文件
    2. list = [] # 存储偏移量的列表
    3. list_ori = [] # 存储方向偏移量的列表
    4. list_left = [] # 存储左手偏移量的列表
    5. list_ori_left = [] # 存储左手方向偏移量的列表
    6. for frame_dir in frame_dirs: #

1.2 data_recording.py——配置多个Realsense T265相机和一个L515相机

这段代码是一个用于记录和处理Realsense相机数据的Python脚本

顺带再回顾一下


DexCap团队设计了一个装载摄像机的背包「如图(a)、(b)所示,为方便大家对照,特把上图再贴一下,如下」

  • 在正前面,它通过胸部摄像机支架的4个插槽集成了4个相机,最顶部是一台Intel Realsense L515 RGB-D LiDAR摄像机,顶部下面是3个Realsense T265鱼眼SLAM跟踪相机(分别为红、绿、蓝),用于在人类数据收集过程中捕捉观察结果

1.2.1 一系列定义:比如保存帧数据的函数

它可以捕捉颜色图像、深度图像、姿态数据以及手部关节数据,并将这些数据保存到指定的目录中

  1. 首先,导入库
    1. """
    2. 示例用法
    3. python data_recording.py -s --store_hand -o ./save_data_scenario_1
    4. """
    5. import argparse # 用于解析命令行参数
    6. import copy # 用于复制对象
    7. import numpy as np # 用于数值计算
    8. import open3d as o3d # 用于3D数据处理
    9. import os # 用于操作系统相关功能
    10. import shutil # 用于文件操作
    11. import sys # 用于系统相关功能
    12. import pyrealsense2 as rs # 用于Realsense相机操作
    13. import cv2 # 用于图像处理
    14. from enum import IntEnum # 用于定义枚举类型
    15. from realsense_helper import get_profiles # 导入自定义的Realsense帮助函数
    16. from transforms3d.quaternions import axangle2quat, qmult, quat2mat, mat2quat # 用于四元数操作
    17. import redis # 用于Redis数据库操作
    18. import concurrent.futures # 用于并发执行
    19. from hyperparameters import* # 导入超参数
  2. 其次,定义一个枚举类型 Preset,用于表示不同的预设配置
    1. class Preset(IntEnum):
    2. Custom = 0
    3. Default = 1
    4. Hand = 2
    5. HighAccuracy = 3
    6. HighDensity = 4
    7. MediumDensity = 5
  3. 然后,保存帧数据的函数
    1. def save_frame(
    2. frame_id,
    3. out_directory,
    4. color_buffer,
    5. depth_buffer,
    6. pose_buffer,
    7. pose2_buffer,
    8. pose3_buffer,
    9. rightHandJoint_buffer,
    10. leftHandJoint_buffer,
    11. rightHandJointOri_buffer,
    12. leftHandJointOri_buffer,
    13. save_hand,
    14. ):
    15. frame_directory = os.path.join(out_directory, f"frame_{frame_id}") # 创建帧目录
    16. os.makedirs(frame_directory, exist_ok=True) # 如果目录不存在则创建
    17. cv2.imwrite(
    18. os.path.join(frame_directory, "color_image.jpg"),
    19. color_buffer[frame_id][:, :, ::-1], # 保存颜色图像
    20. )
    21. cv2.imwrite(
    22. os.path.join(frame_directory, "depth_image.png"), depth_buffer[frame_id] # 保存深度图像
    23. )
    24. np.savetxt(os.path.join(frame_directory, "pose.txt"), pose_buffer[frame_id]) # 保存姿态数据
    25. np.savetxt(os.path.join(frame_directory, "pose_2.txt"), pose2_buffer[frame_id]) # 保存第二个姿态数据
    26. np.savetxt(os.path.join(frame_directory, "pose_3.txt"), pose3_buffer[frame_id]) # 保存第三个姿态数据
    27. if save_hand: # 如果需要保存手部数据
    28. np.savetxt(
    29. os.path.join(frame_directory, "right_hand_joint.txt"),
    30. rightHandJoint_buffer[frame_id], # 保存右手关节数据
    31. )
    32. np.savetxt(
    33. os.path.join(frame_directory, "left_hand_joint.txt"),
    34. leftHandJoint_buffer[frame_id], # 保存左手关节数据
    35. )
    36. np.savetxt(
    37. os.path.join(frame_directory, "right_hand_joint_ori.txt"),
    38. rightHandJointOri_buffer[frame_id], # 保存右手关节方向数据
    39. )
    40. np.savetxt(
    41. os.path.join(frame_directory, "left_hand_joint_ori.txt"),
    42. leftHandJointOri_buffer[frame_id], # 保存左手关节方向数据
    43. )
    44. return f"frame {frame_id + 1} saved" # 返回保存帧的消息

1.2.2 RealsenseProcessor 的类:获取多个相机的RGB-D帧和位姿数据

接下来,定义了一个名为 RealsenseProcessor 的类,用于处理Realsense相机的数据。它可以配置多个Realsense T265相机和一个L515相机,获取RGB-D帧和位姿数据,并可视化和存储这些数据

以下是对RealsenseProcessor类的详细解读:

  1. 类定义和初始化
    初始化方法接受多个参数,用于配置T265相机的序列号、总帧数、是否存储帧、输出目录、是否保存手部数据和是否启用可视化
    1. class RealsesneProcessor: # 定义Realsense处理器类
    2. def __init__( # 初始化方法
    3. self,
    4. first_t265_serial, # 第一个T265相机的序列号
    5. second_t265_serial, # 第二个T265相机的序列号
    6. thrid_t265_serial, # 第三个T265相机的序列号
    7. total_frame, # 总帧数
    8. store_frame=False, # 是否存储帧,默认为False
    9. out_directory=None, # 输出目录,默认为None
    10. save_hand=False, # 是否保存手部数据,默认为False
    11. enable_visualization=True, # 是否启用可视化,默认为True
    12. ):
    13. self.first_t265_serial = first_t265_serial # 初始化第一个T265相机的序列号
    14. self.second_t265_serial = second_t265_serial # 初始化第二个T265相机的序列号
    15. self.thrid_t265_serial = thrid_t265_serial # 初始化第三个T265相机的序列号
    16. self.store_frame = store_frame # 初始化是否存储帧
    17. self.out_directory = out_directory # 初始化输出目录
    18. self.total_frame = total_frame # 初始化总帧数
    19. self.save_hand = save_hand # 初始化是否保存手部数据
    20. self.enable_visualization = enable_visualization # 初始化是否启用可视化
    21. self.rds = None # 初始化Redis连接
    初始化各种缓冲区,用于存储彩色图像、 深度图像、位姿数据,和手部关节数据(包含左右两手的关节位置、关节方向)
    1. self.color_buffer = [] # 初始化彩色图像缓冲区
    2. self.depth_buffer = [] # 初始化深度图像缓冲区
    3. self.pose_buffer = [] # 初始化第一个T265相机的位姿缓冲区
    4. self.pose2_buffer = [] # 初始化第二个T265相机的位姿缓冲区
    5. self.pose3_buffer = [] # 初始化第三个T265相机的位姿缓冲区
    6. self.pose2_image_buffer = [] # 初始化第二个T265相机的图像缓冲区
    7. self.pose3_image_buffer = [] # 初始化第三个T265相机的图像缓冲区
    8. self.rightHandJoint_buffer = [] # 初始化右手关节位置缓冲区
    9. self.leftHandJoint_buffer = [] # 初始化左手关节位置缓冲区
    10. self.rightHandJointOri_buffer = [] # 初始化右手关节方向缓冲区
    11. self.leftHandJointOri_buffer = [] # 初始化左手关节方向缓冲区
  2. 获取T265相机配置
    具体方法是根据T265相机的序列号和管道配置,返回一个配置对象
    1. def get_rs_t265_config(self, t265_serial, t265_pipeline):
    2. t265_config = rs.config()
    3. t265_config.enable_device(t265_serial)
    4. t265_config.enable_stream(rs.stream.pose)
    5. return t265_config
  3. 配置流
    该方法配置并启动多个Realsense相机的流,包括一个L515相机和三个T265相机
    如果启用了手部数据保存功能,则连接到Redis服务器
    1. def configure_stream(self): # 配置流的方法
    2. # 连接到Redis服务器
    3. if self.save_hand: # 如果启用了手部数据保存功能
    4. self.rds = redis.Redis(host="localhost", port=6669, db=0) # 连接到本地Redis服务器
    配置并启动L515相机的深度和彩色流
    1. # 创建一个管道
    2. self.pipeline = rs.pipeline() # 创建一个Realsense管道
    3. config = rs.config() # 创建一个配置对象
    4. color_profiles, depth_profiles = get_profiles() # 获取彩色和深度流的配置文件
    5. w, h, fps, fmt = depth_profiles[1] # 获取深度流的宽度、高度、帧率和格式
    6. config.enable_stream(rs.stream.depth, w, h, fmt, fps) # 启用深度流
    7. w, h, fps, fmt = color_profiles[18] # 获取彩色流的宽度、高度、帧率和格式
    8. config.enable_stream(rs.stream.color, w, h, fmt, fps) # 启用彩色流
    配置并启动三个T265相机的位姿流,具体而言
    \rightarrow  先配置
    1. # 配置第一个T265相机的流
    2. ctx = rs.context() # 创建一个Realsense上下文
    3. self.t265_pipeline = rs.pipeline(ctx) # 创建一个T265管道
    4. t265_config = rs.config() # 创建一个T265配置对象
    5. t265_config.enable_device(self.first_t265_serial) # 启用第一个T265相机
    6. # 配置第二个T265相机的流
    7. ctx_2 = rs.context() # 创建另一个Realsense上下文
    8. self.t265_pipeline_2 = rs.pipeline(ctx_2) # 创建第二个T265管道
    9. t265_config_2 = self.get_rs_t265_config(
    10. self.second_t265_serial, self.t265_pipeline_2
    11. ) # 获取第二个T265相机的配置
    12. # 配置第三个T265相机的流
    13. ctx_3 = rs.context() # 创建第三个Realsense上下文
    14. self.t265_pipeline_3 = rs.pipeline(ctx_3) # 创建第三个T265管道
    15. t265_config_3 = self.get_rs_t265_config(
    16. self.thrid_t265_serial, self.t265_pipeline_3
    17. ) # 获取第三个T265相机的配置
    \rightarrow  再启动
    1. self.t265_pipeline.start(t265_config) # 启动第一个T265管道
    2. self.t265_pipeline_2.start(t265_config_2) # 启动第二个T265管道
    3. self.t265_pipeline_3.start(t265_config_3) # 启动第三个T265管道
    4. pipeline_profile = self.pipeline.start(config) # 启动L515管道并获取配置文件
    5. depth_sensor = pipeline_profile.get_device().first_depth_sensor() # 获取深度传感器
    6. depth_sensor.set_option(rs.option.visual_preset, Preset.HighAccuracy) # 设置深度传感器的选项为高精度
    7. self.depth_scale = depth_sensor.get_depth_scale() # 获取深度比例
    8. align_to = rs.stream.color # 对齐到彩色流
    9. self.align = rs.align(align_to) # 创建对齐对象
    如果启用了可视化功能,则初始化Open3D可视化器
    1. self.vis = None # 初始化可视化器
    2. if self.enable_visualization: # 如果启用了可视化功能
    3. self.vis = o3d.visualization.Visualizer() # 创建Open3D可视化器
    4. self.vis.create_window() # 创建可视化窗口
    5. self.vis.get_view_control().change_field_of_view(step=1.0) # 改变视野
  4. 获取RGB-D帧——get_rgbd_frame_from_realsense
    该方法从Realsense相机获取RGB-D顾,并将深度帧与彩色帧对齐
    1. def get_rgbd_frame_from_realsense(self, enable_visualization=False): # 从Realsense获取RGB-D帧的方法
    2. frames = self.pipeline.wait_for_frames() # 等待获取帧数据
    3. # 将深度帧对齐到彩色帧
    4. aligned_frames = self.align.process(frames) # 对齐帧数据
    5. # 获取对齐后的帧
    6. aligned_depth_frame = aligned_frames.get_depth_frame() # 获取对齐后的深度帧
    7. color_frame = aligned_frames.get_color_frame() # 获取对齐后的彩色帧
    8. depth_image = (
    9. np.asanyarray(aligned_depth_frame.get_data()) // 4
    10. ) # 将深度帧数据转换为numpy数组,并除以4以获得以米为单位的深度值(适用于L515相机)
    11. color_image = np.asanyarray(color_frame.get_data()) # 将彩色帧数据转换为numpy数组
    如果启用了可视化功能,则将深度图像和彩色图像转换为Open3D的RGBD图像对象
    1. rgbd = None # 初始化RGBD图像对象
    2. if enable_visualization: # 如果启用了可视化功能
    3. depth_image_o3d = o3d.geometry.Image(depth_image) # 将深度图像转换为Open3D图像对象
    4. color_image_o3d = o3d.geometry.Image(color_image) # 将彩色图像转换为Open3D图像对象
    5. rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(
    6. color_image_o3d, # 彩色图像
    7. depth_image_o3d, # 深度图像
    8. depth_trunc=4.0, # 深度截断值,超过此值的深度将被忽略
    9. convert_rgb_to_intensity=False, # 是否将RGB图像转换为强度图像
    10. ) # 创建RGBD图像对象
    11. return rgbd, depth_image, color_image # 返回RGBD图像、深度图像和彩色图像
  5. 通过frame_to_pose_conversion方法,实现帧到位姿的转换
    1. @staticmethod # 静态方法装饰器
    2. def frame_to_pose_conversion(input_t265_frames): # 定义帧到位姿转换的方法
    3. pose_frame = input_t265_frames.get_pose_frame() # 获取位姿帧
    4. pose_data = pose_frame.get_pose_data() # 获取位姿数据
    5. pose_3x3 = quat2mat( # 将四元数转换为3x3旋转矩阵
    6. np.array(
    7. [
    8. pose_data.rotation.w, # 四元数的w分量
    9. pose_data.rotation.x, # 四元数的x分量
    10. pose_data.rotation.y, # 四元数的y分量
    11. pose_data.rotation.z, # 四元数的z分量
    12. ]
    13. )
    14. )
    15. pose_4x4 = np.eye(4) # 创建4x4单位矩阵
    16. pose_4x4[:3, :3] = pose_3x3 # 将3x3旋转矩阵赋值给4x4矩阵的左上角
    17. pose_4x4[:3, 3] = [ # 将平移向量赋值给4x4矩阵的右上角
    18. pose_data.translation.x, # 位姿的x平移分量
    19. pose_data.translation.y, # 位姿的y平移分量
    20. pose_data.translation.z, # 位姿的z平移分量
    21. ]
    22. return pose_4x4 # 返回4x4位姿矩阵
  6. 处理帧
    该方法处理每一帧数据,首先通过get_rgbd_frame_from_realsense获取三个T265相机的RGB-D帧数据,然后通过frame_to_pose_conversion,把帧数据换成位姿数据
    1. def process_frame(self): # 定义处理帧的方法
    2. frame_count = 0 # 初始化帧计数器
    3. first_frame = True # 标记是否为第一帧
    4. try:
    5. while frame_count < self.total_frame: # 循环处理每一帧,直到达到总帧数
    6. t265_frames = self.t265_pipeline.wait_for_frames() # 获取第一个T265相机的帧数据
    7. t265_frames_2 = self.t265_pipeline_2.wait_for_frames() # 获取第二个T265相机的帧数据
    8. t265_frames_3 = self.t265_pipeline_3.wait_for_frames() # 获取第三个T265相机的帧数据
    9. rgbd, depth_frame, color_frame = self.get_rgbd_frame_from_realsense() # 获取RGB-D帧数据
    10. # 获取第一个T265相机的位姿数据
    11. pose_4x4 = RealsesneProcessor.frame_to_pose_conversion(
    12. input_t265_frames=t265_frames
    13. )
    14. # 获取第二个T265相机的位姿数据
    15. pose_4x4_2 = RealsesneProcessor.frame_to_pose_conversion(
    16. input_t265_frames=t265_frames_2
    17. )
    18. # 获取第三个T265相机的位姿数据
    19. pose_4x4_3 = RealsesneProcessor.frame_to_pose_conversion(
    20. input_t265_frames=t265_frames_3
    21. )
    如果启用了手部数据保存功能,则从Redis服务器获取手部关节数据
    1. if self.save_hand: # 如果启用了手部数据保存功能
    2. # 获取左手关节位置数据
    3. leftHandJointXyz = np.frombuffer(
    4. self.rds.get("rawLeftHandJointXyz"), dtype=np.float64
    5. ).reshape(21, 3)
    6. # 获取右手关节位置数据
    7. rightHandJointXyz = np.frombuffer(
    8. self.rds.get("rawRightHandJointXyz"), dtype=np.float64
    9. ).reshape(21, 3)
    10. # 获取左手关节方向数据
    11. leftHandJointOrientation = np.frombuffer(
    12. self.rds.get("rawLeftHandJointOrientation"), dtype=np.float64
    13. ).reshape(21, 4)
    14. # 获取右手关节方向数据
    15. rightHandJointOrientation = np.frombuffer(
    16. self.rds.get("rawRightHandJointOrientation"), dtype=np.float64
    17. ).reshape(21, 4)
    将位姿数据转换为4x4矩阵,并应用校正矩阵
                corrected_pose = pose_4x4 @ between_cam  # 应用校正矩阵
    且转换为Open3D格式的L515相机内参
    1. # 转换为Open3D格式的L515相机内参
    2. o3d_depth_intrinsic = o3d.camera.PinholeCameraIntrinsic(
    3. 1280,
    4. 720,
    5. 898.2010498046875,
    6. 897.86669921875,
    7. 657.4981079101562,
    8. 364.30950927734375,
    9. )
    如果是第一帧,则初始化Open3D的点云和坐标系,并添加到可视化器中
    1. if first_frame: # 如果是第一帧
    2. if self.enable_visualization: # 如果启用了可视化功能
    3. pcd = o3d.geometry.PointCloud.create_from_rgbd_image(
    4. rgbd, o3d_depth_intrinsic
    5. ) # 创建点云
    6. pcd.transform(corrected_pose) # 应用校正矩阵
    7. rgbd_mesh = o3d.geometry.TriangleMesh.create_coordinate_frame(
    8. size=0.3
    9. ) # 创建RGBD坐标系
    10. rgbd_mesh.transform(corrected_pose) # 应用校正矩阵
    11. rgbd_previous_pose = copy.deepcopy(corrected_pose) # 复制校正矩阵
    12. chest_mesh = o3d.geometry.TriangleMesh.create_coordinate_frame(
    13. size=0.3
    14. ) # 创建胸部坐标系
    15. chest_mesh.transform(pose_4x4) # 应用位姿矩阵
    16. chest_previous_pose = copy.deepcopy(pose_4x4) # 复制位姿矩阵
    17. left_hand_mesh = (
    18. o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.3)
    19. ) # 创建左手坐标系
    20. left_hand_mesh.transform(pose_4x4_2) # 应用位姿矩阵
    21. left_hand_previous_pose = copy.deepcopy(pose_4x4_2) # 复制位姿矩阵
    22. right_hand_mesh = (
    23. o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.3)
    24. ) # 创建右手坐标系
    25. right_hand_mesh.transform(pose_4x4_3) # 应用位姿矩阵
    26. right_hand_previous_pose = copy.deepcopy(pose_4x4_3) # 复制位姿矩阵
    27. self.vis.add_geometry(pcd) # 添加点云到可视化器
    28. self.vis.add_geometry(rgbd_mesh) # 添加RGBD坐标系到可视化器
    29. self.vis.add_geometry(chest_mesh) # 添加胸部坐标系到可视化器
    30. self.vis.add_geometry(left_hand_mesh) # 添加左手坐标系到可视化器
    31. self.vis.add_geometry(right_hand_mesh) # 添加右手坐标系到可视化器
    32. view_params = (
    33. self.vis.get_view_control().convert_to_pinhole_camera_parameters()
    34. ) # 获取视图参数
    35. first_frame = False # 标记为非第一帧
    如果不是第一帧,则更新点云和坐标系的位姿
    1. else:
    2. if self.enable_visualization: # 如果启用了可视化功能
    3. new_pcd = o3d.geometry.PointCloud.create_from_rgbd_image(
    4. rgbd, o3d_depth_intrinsic
    5. ) # 创建新的点云
    6. new_pcd.transform(corrected_pose) # 应用校正矩阵
    7. rgbd_mesh.transform(np.linalg.inv(rgbd_previous_pose)) # 逆变换上一个RGBD坐标系
    8. rgbd_mesh.transform(corrected_pose) # 应用校正矩阵
    9. rgbd_previous_pose = copy.deepcopy(corrected_pose) # 复制校正矩阵
    10. chest_mesh.transform(np.linalg.inv(chest_previous_pose)) # 逆变换上一个胸部坐标系
    11. chest_mesh.transform(pose_4x4) # 应用位姿矩阵
    12. chest_previous_pose = copy.deepcopy(pose_4x4) # 复制位姿矩阵
    13. left_hand_mesh.transform(np.linalg.inv(left_hand_previous_pose)) # 逆变换上一个左手坐标系
    14. left_hand_mesh.transform(pose_4x4_2) # 应用位姿矩阵
    15. left_hand_previous_pose = copy.deepcopy(pose_4x4_2) # 复制位姿矩阵
    16. right_hand_mesh.transform(
    17. np.linalg.inv(right_hand_previous_pose)
    18. ) # 逆变换上一个右手坐标系
    19. right_hand_mesh.transform(pose_4x4_3) # 应用位姿矩阵
    20. right_hand_previous_pose = copy.deepcopy(pose_4x4_3) # 复制位姿矩阵
    21. pcd.points = new_pcd.points # 更新点云的点
    22. pcd.colors = new_pcd.colors # 更新点云的颜色
    23. self.vis.update_geometry(pcd) # 更新点云几何
    24. self.vis.update_geometry(rgbd_mesh) # 更新RGBD坐标系几何
    25. self.vis.update_geometry(chest_mesh) # 更新胸部坐标系几何
    26. self.vis.update_geometry(left_hand_mesh) # 更新左手坐标系几何
    27. self.vis.update_geometry(right_hand_mesh) # 更新右手坐标系几何
    28. self.vis.get_view_control().convert_from_pinhole_camera_parameters(
    29. view_params
    30. ) # 恢复视图参数
    再更新可视化器
    1. if self.enable_visualization: # 如果启用了可视化功能
    2. self.vis.poll_events() # 处理可视化事件
    3. self.vis.update_renderer() # 更新渲染器
    如果启用了帧存储功能,则将深度图像、彩色图像和位姿数据存储到缓冲区中
    处理完所有帧后,停止所有相机的流,并保存所有帧数据
  7. 主函数
    主函数解析命令行参数,创建 RealsenseProcessor 对象,并配置和处理帧数据
    1. import concurrent.futures # 导入并发库,用于多线程处理
    2. def main(args): # 定义主函数,接受命令行参数
    3. realsense_processor = RealsesneProcessor( # 创建Realsense处理器对象
    4. first_t265_serial="11622110012", # 第一个T265相机的序列号
    5. second_t265_serial="909212110944", # 第二个T265相机的序列号
    6. thrid_t265_serial="929122111181", # 第三个T265相机的序列号
    7. total_frame=10000, # 总帧数
    8. store_frame=args.store_frame, # 是否存储帧
    9. out_directory=args.out_directory, # 输出目录
    10. save_hand=args.store_hand, # 是否保存手部数据
    11. enable_visualization=args.enable_vis, # 是否启用可视化
    12. )
    13. realsense_processor.configure_stream() # 配置Realsense流
    14. realsense_processor.process_frame() # 处理帧数据
    15. if __name__ == "__main__": # 主程序入口
    16. # 设置命令行参数解析器
    17. parser = argparse.ArgumentParser(description="Process frames and save data.")
    18. parser.add_argument(
    19. "-s",
    20. "--store_frame",
    21. action="store_true",
    22. help="Flag to indicate whether to store frames", # 是否存储帧的标志
    23. )
    24. parser.add_argument(
    25. "--store_hand",
    26. action="store_true",
    27. help="Flag to indicate whether to store hand joint position and orientation", # 是否保存手部关节位置和方向的标志
    28. )
    29. parser.add_argument(
    30. "-v",
    31. "--enable_vis",
    32. action="store_true",
    33. help="Flag to indicate whether to enable open3d visualization", # 是否启用Open3D可视化的标志
    34. )
    35. parser.add_argument(
    36. "-o",
    37. "--out_directory",
    38. type=str,
    39. help="Output directory for saved data", # 保存数据的输出目录
    40. default="./saved_data", # 默认输出目录为./saved_data
    41. )
    42. args = parser.parse_args() # 解析命令行参数
    如果输出目录已存在,提示用户是否覆盖目录
    1. # 检查输出目录是否存在
    2. if os.path.exists(args.out_directory):
    3. response = (
    4. input(
    5. f"{args.out_directory} already exists. Do you want to override? (y/n): "
    6. )
    7. .strip()
    8. .lower()
    9. )
    10. if response != "y": # 如果用户选择不覆盖,退出程序
    11. print("Exiting program without overriding the existing directory.")
    12. sys.exit()
    13. else:
    14. shutil.rmtree(args.out_directory) # 如果用户选择覆盖,删除现有目录
    如果启用了帧存储功能,则创建输出目录
    1. if args.store_frame:
    2. os.makedirs(args.out_directory, exist_ok=True) # 创建输出目录
    调用 main 函数开始处理帧数据
    1. # 如果用户选择覆盖,删除现有目录
    2. main(args) # 调用主函数

1.3 demo_clipping_3d.py——可视化点云数据且选择每个演示的起始帧/结束帧

这段代码是一个用于可视化点云数据(PCD文件),并选择每个演示的起始帧、结束帧的脚本,以下是详细解读

  1. 导入库
    导入各种库和模块,用于处理命令行参数、文件操作、点云数据处理、键盘事件监听等
    1. import argparse # 用于解析命令行参数
    2. import os # 用于操作系统相关功能
    3. import copy # 用于复制对象
    4. import zmq # 用于消息传递
    5. import cv2 # 用于图像处理
    6. import sys # 用于系统相关功能
    7. import json # 用于处理JSON数据
    8. import shutil # 用于文件操作
    9. import open3d as o3d # 用于3D数据处理
    10. import numpy as np # 用于数值计算
    11. import platform # 用于获取操作系统信息
    12. from pynput import keyboard # 用于监听键盘事件
    13. from transforms3d.quaternions import qmult, quat2mat # 用于四元数操作
    14. from transforms3d.axangles import axangle2mat # 用于轴角转换
    15. from scipy.spatial.transform import Rotation # 用于旋转矩阵和欧拉角转换
    16. from transforms3d.euler import quat2euler, mat2euler, quat2mat, euler2mat # 用于欧拉角和矩阵转换
    17. from visualizer import * # 导入自定义的可视化模块
  2. 定义全局变量,用于存储剪辑标记、当前剪辑、是否切换到下一帧或上一帧的标志
    1. clip_marks = [] # 存储剪辑标记
    2. current_clip = {} # 存储当前剪辑
    3. next_frame = False # 标记是否切换到下一帧
    4. previous_frame = False # 标记是否切换到上一帧
  3. 定义键盘事件处理函数,用于处理不同的键盘输入:
    Key.up:保存剪辑标记到 clip_marks. json 文件
    Key.down:标记切换到上一帧
    Key.page_down :标记切换到下一帧
    Key.end:标记当前剪辑的结束帧
    Key.insert:标记当前剪辑的起始帧
    1. def on_press(key): # 定义键盘按下事件处理函数
    2. global next_frame, previous_frame, delta_movement_accu, delta_ori_accu, delta_movement_accu_left, delta_ori_accu_left, adjust_movement, adjust_right, frame, step, dataset_folder, clip_marks, current_clip
    3. # 确定操作系统类型
    4. os_type = platform.system()
    5. assert os_type == "Windows" # 仅支持Windows系统
    6. frame_folder = 'frame_{}'.format(frame) # 当前帧文件夹名称
    7. # Windows特定的键绑定在AttributeError部分处理
    8. if key == keyboard.Key.up: # y正方向
    9. with open(os.path.join(dataset_folder, 'clip_marks.json'), 'w') as f:
    10. json.dump(clip_marks, f, indent=4) # 保存剪辑标记到JSON文件
    11. elif key == keyboard.Key.down: # y负方向
    12. previous_frame = True # 切换到上一帧
    13. elif key == keyboard.Key.page_down:
    14. next_frame = True # 切换到下一帧
    15. elif key == keyboard.Key.end:
    16. if 'start' in current_clip.keys():
    17. print("end", frame_folder)
    18. current_clip['end'] = frame_folder # 标记当前剪辑的结束帧
    19. clip_marks.append(current_clip) # 添加当前剪辑到剪辑标记列表
    20. current_clip = {} # 重置当前剪辑
    21. elif key == keyboard.Key.insert:
    22. print("start", frame_folder)
    23. current_clip['start'] = frame_folder # 标记当前剪辑的起始帧
    24. else:
    25. print("Key error", key) # 处理其他键的错误
  4. 数据可视化类
    定义数据可祝化类 ReplayDatavisualizer,继承自DataVisualizer.
    replay_frames 方法用于可视化单帧数据,并处理键盘事件以切换帧
    1. class ReplayDataVisualizer(DataVisualizer): # 定义数据重放可视化类,继承自DataVisualizer
    2. def __init__(self, directory):
    3. super().__init__(directory) # 调用父类的初始化方法
    4. def replay_frames(self): # 定义重放帧的方法
    5. """
    6. 可视化单帧数据
    7. """
    8. global delta_movement_accu, delta_ori_accu, next_frame, previous_frame, frame
    9. if self.R_delta_init is None:
    10. self.initialize_canonical_frame() # 初始化标准帧
    11. self._load_frame_data(frame) # 加载当前帧数据
    12. self.vis.add_geometry(self.pcd) # 添加点云数据到可视化器
    13. self.vis.add_geometry(self.coord_frame_1) # 添加坐标系1到可视化器
    14. self.vis.add_geometry(self.coord_frame_2) # 添加坐标系2到可视化器
    15. self.vis.add_geometry(self.coord_frame_3) # 添加坐标系3到可视化器
    16. for joint in self.left_joints + self.right_joints:
    17. self.vis.add_geometry(joint) # 添加关节数据到可视化器
    18. for cylinder in self.left_line_set + self.right_line_set:
    19. self.vis.add_geometry(cylinder) # 添加连线数据到可视化器
    20. next_frame = True # 初始化为下一帧
    21. try:
    22. with keyboard.Listener(on_press=on_press) as listener: # 监听键盘事件
    23. while True:
    24. if next_frame == True:
    25. next_frame = False
    26. frame += 10 # 切换到下一帧
    27. if previous_frame == True:
    28. previous_frame = False
    29. frame -= 10 # 切换到上一帧
    30. self._load_frame_data(frame) # 加载当前帧数据
    31. self.step += 1 # 增加步数
    32. self.vis.update_geometry(self.pcd) # 更新点云数据
    33. self.vis.update_geometry(self.coord_frame_1) # 更新坐标系1
    34. self.vis.update_geometry(self.coord_frame_2) # 更新坐标系2
    35. self.vis.update_geometry(self.coord_frame_3) # 更新坐标系3
    36. for joint in self.left_joints + self.right_joints:
    37. self.vis.update_geometry(joint) # 更新关节数据
    38. for cylinder in self.left_line_set + self.right_line_set:
    39. self.vis.update_geometry(cylinder) # 更新连线数据
    40. self.vis.poll_events() # 处理可视化事件
    41. self.vis.update_renderer() # 更新渲染器
    42. listener.join() # 等待监听器结束
    43. finally:
    44. print("cumulative_correction ", self.cumulative_correction) # 打印累计修正值
  5. 主函数
    主函数解析命令行参数,获取数据目录
    1. if __name__ == "__main__": # 主程序入口
    2. parser = argparse.ArgumentParser(description="Visualize saved frame data.") # 创建命令行参数解析器
    3. parser.add_argument("--directory", type=str, default="./saved_data", help="Directory with saved data") # 添加目录参数
    4. args = parser.parse_args() # 解析命令行参数
    检查数据目录是否存在,如果存在 clip_marks. json 文件,询问用户是否覆盖
    1. assert os.path.exists(args.directory), f"given directory: {args.directory} not exists" # 确认目录存在
    2. if os.path.exists(os.path.join(args.directory, 'clip_marks.json')): # 检查剪辑标记文件是否存在
    3. response = (
    4. input(
    5. f"clip_marks.json already exists. Do you want to override? (y/n): "
    6. )
    7. .strip()
    8. .lower()
    9. )
    10. if response != "y":
    11. print("Exiting program without overriding the existing directory.") # 如果用户选择不覆盖,退出程序
    12. sys.exit()
    初始化 ReplayDatavisualizer 对象,并加载校准偏移量和方向偏移量
    1. dataset_folder = args.directory # 设置数据集文件夹
    2. visualizer = ReplayDataVisualizer(args.directory) # 创建数据重放可视化器对象
    3. visualizer.right_hand_offset = np.loadtxt("{}/calib_offset.txt".format(args.directory)) # 加载右手偏移量
    4. visualizer.right_hand_ori_offset = np.loadtxt("{}/calib_ori_offset.txt".format(args.directory)) # 加载右手方向偏移量
    5. visualizer.left_hand_offset = np.loadtxt("{}/calib_offset_left.txt".format(args.directory)) # 加载左手偏移量
    6. visualizer.left_hand_ori_offset = np.loadtxt("{}/calib_ori_offset_left.txt".format(args.directory)) # 加载左手方向偏移量
    调用replay_frames 方法开始可视化和处理键盛事件
        visualizer.replay_frames()  # 开始重放帧

1.4 redis_glove_server.py——接收UDP数据包并将手部关节数据存储到Redis数据库

这段代码是一个用于接收UDP数据包并将手部关节数据存储到Redis数据库的Python脚本

所谓UDP (User Datagram Protocol,用户数据报协议),其是一种简单的、面向无连接的传输层协议


与TCP (Transmission Control Protocol, 传输控制协议)不同,UDP不提供可靠性、数据包顺序和流量控制等功能。UDP主要用于 需要快速传输且对数据丢失不敏感的应用场景,例如视频流、在线游戏和实时通信等

以下是代码的详细解读:

  1. 导入库
    1. import socket # 用于网络通信
    2. import json # 用于处理JSON数据
    3. import redis # 用于连接Redis数据库
    4. import numpy as np # 用于数值计算
  2. 初始化Redis连接
    1. # 初始化Redis连接
    2. redis_host = "localhost" # Redis服务器主机名
    3. redis_port = 6669 # Redis服务器端口
    4. redis_password = "" # Redis服务器密码,如果没有密码则保持为空字符串
    5. r = redis.StrictRedis(
    6. host=redis_host, port=redis_port, password=redis_password, decode_responses=True
    7. ) # 创建Redis连接对象
  3. 定义手部关节名称
    1. # 定义左手和右手的关节名称
    2. left_hand_joint_names = ["leftHand",
    3. 'leftThumbProximal', 'leftThumbMedial', 'leftThumbDistal', 'leftThumbTip',
    4. 'leftIndexProximal', 'leftIndexMedial', 'leftIndexDistal', 'leftIndexTip',
    5. 'leftMiddleProximal', 'leftMiddleMedial', 'leftMiddleDistal', 'leftMiddleTip',
    6. 'leftRingProximal', 'leftRingMedial', 'leftRingDistal', 'leftRingTip',
    7. 'leftLittleProximal', 'leftLittleMedial', 'leftLittleDistal', 'leftLittleTip']
    8. right_hand_joint_names = ["rightHand",
    9. 'rightThumbProximal', 'rightThumbMedial', 'rightThumbDistal', 'rightThumbTip',
    10. 'rightIndexProximal', 'rightIndexMedial', 'rightIndexDistal', 'rightIndexTip',
    11. 'rightMiddleProximal', 'rightMiddleMedial', 'rightMiddleDistal', 'rightMiddleTip',
    12. 'rightRingProximal', 'rightRingMedial', 'rightRingDistal', 'rightRingTip',
    13. 'rightLittleProximal', 'rightLittleMedial', 'rightLittleDistal', 'rightLittleTip']
  4. 归一化函数
    1. def normalize_wrt_middle_proximal(hand_positions, is_left=True): # 定义相对于中指近端关节的归一化函数
    2. middle_proximal_idx = left_hand_joint_names.index('leftMiddleProximal') # 获取左手中指近端关节的索引
    3. if not is_left:
    4. middle_proximal_idx = right_hand_joint_names.index('rightMiddleProximal') # 获取右手中指近端关节的索引
    5. wrist_position = hand_positions[0] # 获取手腕位置
    6. middle_proximal_position = hand_positions[middle_proximal_idx] # 获取中指近端关节位置
    7. bone_length = np.linalg.norm(wrist_position - middle_proximal_position) # 计算手腕到中指近端关节的骨骼长度
    8. normalized_hand_positions = (middle_proximal_position - hand_positions) / bone_length # 归一化手部关节位置
    9. return normalized_hand_positions # 返回归一化后的手部关节位置
  5. 启动服务器函数
    创建并绑定UDP套接字
    1. def start_server(port): # 定义启动服务器的函数,接受端口号作为参数
    2. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 使用SOCK_DGRAM创建UDP套接字
    3. s.bind(("192.168.0.200", port)) # 绑定套接字到指定的IP地址和端口
    4. print(f"Server started, listening on port {port} for UDP packets...") # 打印服务器启动信息
    无限循环接收UDP数据包
    1. while True: # 无限循环,持续接收数据
    2. data, address = s.recvfrom(64800) # 接收UDP数据包,最大数据包大小为64800字节
    3. decoded_data = data.decode() # 解码数据
    4. # 尝试解析JSON数据
    5. try:
    6. received_json = json.loads(decoded_data) # 解析JSON数据
    7. # 初始化数组以存储手部关节位置和方向
    8. left_hand_positions = np.zeros((21, 3)) # 初始化左手关节位置数组,21个关节,每个关节3个坐标
    9. right_hand_positions = np.zeros((21, 3)) # 初始化右手关节位置数组,21个关节,每个关节3个坐标
    10. left_hand_orientations = np.zeros((21, 4)) # 初始化左手关节方向数组,21个关节,每个关节4个方向分量
    11. right_hand_orientations = np.zeros((21, 4)) # 初始化右手关节方向数组,21个关节,每个关节4个方向分量
    解析接收到的数据包,提取手部关节位置和方向数据
    1. # 遍历JSON数据以提取手部关节位置和方向
    2. for joint_name in left_hand_joint_names: # 遍历左手关节名称列表
    3. joint_data = received_json["scene"]["actors"][0]["body"][joint_name] # 获取关节数据
    4. joint_position = np.array(list(joint_data["position"].values())) # 获取关节位置
    5. joint_rotation = np.array(list(joint_data["rotation"].values())) # 获取关节方向
    6. left_hand_positions[left_hand_joint_names.index(joint_name)] = joint_position # 存储关节位置
    7. left_hand_orientations[left_hand_joint_names.index(joint_name)] = joint_rotation # 存储关节方向
    8. for joint_name in right_hand_joint_names: # 遍历右手关节名称列表
    9. joint_data = received_json["scene"]["actors"][0]["body"][joint_name] # 获取关节数据
    10. joint_position = np.array(list(joint_data["position"].values())) # 获取关节位置
    11. joint_rotation = np.array(list(joint_data["rotation"].values())) # 获取关节方向
    12. right_hand_positions[right_hand_joint_names.index(joint_name)] = joint_position # 存储关节位置
    13. right_hand_orientations[right_hand_joint_names.index(joint_name)] = joint_rotation # 存储关节方向
    计算相对于中指近端关节的相对距离,并进行归一化
    1. # 计算相对于中指近端关节的相对距离,并归一化
    2. left_middle_proximal_idx = left_hand_joint_names.index('leftMiddleProximal') # 获取左手中指近端关节的索引
    3. right_middle_proximal_idx = right_hand_joint_names.index('rightMiddleProximal') # 获取右手中指近端关节的索引
    4. left_wrist_position = left_hand_positions[0] # 获取左手手腕位置
    5. right_wrist_position = right_hand_positions[0] # 获取右手手腕位置
    6. left_middle_proximal_position = left_hand_positions[left_middle_proximal_idx] # 获取左手中指近端关节位置
    7. right_middle_proximal_position = right_hand_positions[right_middle_proximal_idx] # 获取右手中指近端关节位置
    8. left_bone_length = np.linalg.norm(left_wrist_position - left_middle_proximal_position) # 计算左手骨骼长度
    9. right_bone_length = np.linalg.norm(right_wrist_position - right_middle_proximal_position) # 计算右手骨骼长度
    10. normalized_left_hand_positions = (left_middle_proximal_position - left_hand_positions) / left_bone_length # 归一化左手关节位置
    11. normalized_right_hand_positions = (right_middle_proximal_position - right_hand_positions) / right_bone_length # 归一化右手关节位置
    将原始和归一化后的手部关节数据存储到Redis数据库中
    1. r.set("leftHandJointXyz", np.array(normalized_left_hand_positions).astype(np.float64).tobytes()) # 将归一化后的左手关节位置存储到Redis
    2. r.set("rightHandJointXyz", np.array(normalized_right_hand_positions).astype(np.float64).tobytes()) # 将归一化后的右手关节位置存储到Redis
    3. r.set("rawLeftHandJointXyz", np.array(left_hand_positions).astype(np.float64).tobytes()) # 将原始左手关节位置存储到Redis
    4. r.set("rawRightHandJointXyz", np.array(right_hand_positions).astype(np.float64).tobytes()) # 将原始右手关节位置存储到Redis
    5. r.set("rawLeftHandJointOrientation", np.array(left_hand_orientations).astype(np.float64).tobytes()) # 将原始左手关节方向存储到Redis
    6. r.set("rawRightHandJointOrientation", np.array(right_hand_orientations).astype(np.float64).tobytes()) # 将原始右手关节方向存储到Redis
    打印接收到的手部关节位置数据
    1. print("\n\n")
    2. print("=" * 50)
    3. print(np.round(left_hand_positions, 3)) # 打印左手关节位置
    4. print("-"*50)
    5. print(np.round(right_hand_positions, 3)) # 打印右手关节位置
    6. except json.JSONDecodeError: # 捕获JSON解析错误
    7. print("Invalid JSON received:") # 打印错误信息
  6. 主程序入口
    1. if __name__ == "__main__": # 主程序入口
    2. start_server(14551) # 启动服务器,监听端口14551

1.5 replay_human_traj_vis.py——可视化保存点云数据和位姿(可用于查看所有帧和校准)

1.6 transform_to_robot_table.py——将可视化点云数据放置在机器人桌面上

1.7 visualizer.py——可视化手部关节数据

1.7.1 一系列定义:比如五个手指等

  1. 导入各种库和模块,用于处理命令行参数、文件操作、点云数据处理、键盘事件监听等
  2. 定义手部关节之间的连接线,用于可视化手部骨架
    1. lines = np.array([
    2. # 拇指
    3. [1, 2], [2, 3], [3, 4],
    4. # 食指
    5. [5, 6], [6, 7], [7, 8],
    6. # 中指
    7. [9, 10], [10, 11], [11, 12],
    8. # 无名指
    9. [13, 14], [14, 15], [15, 16],
    10. # 小指
    11. [17, 18], [18, 19], [19, 20],
    12. # 连接近端关节
    13. [1, 5], [5, 9], [9, 13], [13, 17],
    14. # 连接手掌
    15. [0, 1], [17, 0]
    16. ])
  3. 定义一系列全局变量,用于存储累积的校正、当前帧、步长等信息
    1. delta_movement_accu = np.array([0.0, 0.0, 0.0]) # 累积的位移校正
    2. delta_ori_accu = np.array([0.0, 0.0, 0.0]) # 累积的方向校正
    3. delta_movement_accu_left = np.array([0.0, 0.0, 0.0]) # 左手累积的位移校正
    4. delta_ori_accu_left = np.array([0.0, 0.0, 0.0]) # 左手累积的方向校正
    5. adjust_movement = True # 是否调整位移
    6. adjust_right = True # 是否调整右手
    7. next_frame = False # 是否切换到下一帧
    8. frame = 0 # 当前帧
    9. step = 0.01 # 步长
    10. fixed_transform = np.array([0.0, 0.0, 0.0]) # 固定变换
  4. 一些辅助函数
    将手腕位置平移到原点
    1. def translate_wrist_to_origin(joint_positions): # 将手腕位置平移到原点
    2. wrist_position = joint_positions[0] # 获取手腕位置
    3. updated_positions = joint_positions - wrist_position # 平移所有关节位置
    4. return updated_positions # 返回平移后的关节位置
    应用位姿矩阵
    1. def apply_pose_matrix(joint_positions, pose_matrix): # 应用位姿矩阵
    2. homogeneous_joint_positions = np.hstack([joint_positions, np.ones((joint_positions.shape[0], 1))]) # 将关节位置转换为齐次坐标
    3. transformed_positions = np.dot(homogeneous_joint_positions, pose_matrix.T) # 应用位姿矩阵
    4. transformed_positions_3d = transformed_positions[:, :3] # 提取3D坐标
    5. return transformed_positions_3d # 返回变换后的关节位置
    创建或更新圆柱体
    1. def create_or_update_cylinder(start, end, radius=0.003, cylinder_list=None, cylinder_idx=-1): # 创建或更新圆柱体
    2. cyl_length = np.linalg.norm(end - start) # 计算圆柱体的长度
    3. new_cylinder = o3d.geometry.TriangleMesh.create_cylinder(radius=radius, height=cyl_length, resolution=20, split=4) # 创建新的圆柱体
    4. new_cylinder.paint_uniform_color([1, 0, 0]) # 将圆柱体涂成红色
    5. new_cylinder.translate(np.array([0, 0, cyl_length / 2])) # 平移圆柱体
    6. direction = end - start # 计算方向向量
    7. direction /= np.linalg.norm(direction) # 归一化方向向量
    8. up = np.array([0, 0, 1]) # 圆柱体的默认向上向量
    9. rotation_axis = np.cross(up, direction) # 计算旋转轴
    10. rotation_angle = np.arccos(np.dot(up, direction)) # 计算旋转角度
    11. if np.linalg.norm(rotation_axis) != 0: # 如果旋转轴不为零
    12. rotation_axis /= np.linalg.norm(rotation_axis) # 归一化旋转轴
    13. rotation_matrix = o3d.geometry.get_rotation_matrix_from_axis_angle(rotation_axis * rotation_angle) # 计算旋转矩阵
    14. new_cylinder.rotate(rotation_matrix, center=np.array([0, 0, 0])) # 旋转圆柱体
    15. new_cylinder.translate(start) # 平移圆柱体到起始位置
    16. if cylinder_list[cylinder_idx] is not None: # 如果圆柱体列表中已有圆柱体
    17. cylinder_list[cylinder_idx].vertices = new_cylinder.vertices # 更新顶点
    18. cylinder_list[cylinder_idx].triangles = new_cylinder.triangles # 更新三角形
    19. cylinder_list[cylinder_idx].vertex_normals = new_cylinder.vertex_normals # 更新顶点法线
    20. cylinder_list[cylinder_idx].vertex_colors = new_cylinder.vertex_colors # 更新顶点颜色
    21. else:
    22. cylinder_list[cylinder_idx] = new_cylinder # 添加新的圆柱体到列表中

1.7.2 DataVisualizer类:用于可视化点云数据和手部关节数据

接下来,定义了一个名为DataVisualizer的类,用于可视化点云数据和手部关节数据

  1. 类定义和初始化
    1. class DataVisualizer: # 定义DataVisualizer类
    2. def __init__(self, directory): # 初始化方法,接受数据目录作为参数
    3. self.directory = directory # 初始化数据目录
    4. self.base_pcd = None # 初始化基础点云对象
    5. self.pcd = None # 初始化点云对象
    6. self.img_backproj = None # 初始化图像反投影对象
    7. self.coord_frame_1 = None # 初始化坐标系1
    8. self.coord_frame_2 = None # 初始化坐标系2
    9. self.coord_frame_3 = None # 初始化坐标系3
    10. self.right_hand_offset = None # 初始化右手偏移量
    11. self.right_hand_ori_offset = None # 初始化右手方向偏移量
    12. self.left_hand_offset = None # 初始化左手偏移量
    13. self.left_hand_ori_offset = None # 初始化左手方向偏移量
    14. self.pose1_prev = np.eye(4) # 初始化第一个位姿矩阵
    15. self.pose2_prev = np.eye(4) # 初始化第二个位姿矩阵
    16. self.pose3_prev = np.eye(4) # 初始化第三个位姿矩阵
    17. self.vis = o3d.visualization.Visualizer() # 创建Open3D可视化器
    18. self.vis.create_window() # 创建可视化窗口
    19. self.vis.get_view_control().change_field_of_view(step=1.0) # 改变视野
    20. self.between_cam = np.eye(4) # 初始化相机之间的变换矩阵
    21. self.between_cam[:3, :3] = np.array([[1.0, 0.0, 0.0],
    22. [0.0, -1.0, 0.0],
    23. [0.0, 0.0, -1.0]])
    24. self.between_cam[:3, 3] = np.array([0.0, 0.076, 0.0]) # 设置平移部分
    25. self.between_cam_2 = np.eye(4) # 初始化第二个相机之间的变换矩阵
    26. self.between_cam_2[:3, :3] = np.array([[1.0, 0.0, 0.0],
    27. [0.0, 1.0, 0.0],
    28. [0.0, 0.0, 1.0]])
    29. self.between_cam_2[:3, 3] = np.array([0.0, -0.032, 0.0]) # 设置平移部分
    30. self.between_cam_3 = np.eye(4) # 初始化第三个相机之间的变换矩阵
    31. self.between_cam_3[:3, :3] = np.array([[1.0, 0.0, 0.0],
    32. [0.0, 1.0, 0.0],
    33. [0.0, 0.0, 1.0]])
    34. self.between_cam_3[:3, 3] = np.array([0.0, -0.064, 0.0]) # 设置平移部分
    35. self.canonical_t265_ori = None # 初始化标准T265方向
    36. # 可视化左手21个关节
    37. self.left_joints = [] # 初始化左手关节列表
    38. self.right_joints = [] # 初始化右手关节列表
    39. self.left_line_set = [None for _ in lines] # 初始化左手连线列表
    40. self.right_line_set = [None for _ in lines] # 初始化右手连线列表
    41. for i in range(21): # 遍历21个关节
    42. for joint in [self.left_joints, self.right_joints]: # 遍历左手和右手关节列表
    43. # 为指尖和手腕创建较大的球体,为其他关节创建较小的球体
    44. radius = 0.011 if i in [0, 4, 8, 12, 16, 20] else 0.007
    45. sphere = o3d.geometry.TriangleMesh.create_sphere(radius=radius) # 创建球体
    46. # 将手腕和近端关节涂成绿色
    47. if i in [0, 1, 5, 9, 13, 17]:
    48. sphere.paint_uniform_color([0, 1, 0])
    49. # 将指尖涂成红色
    50. elif i in [4, 8, 12, 16, 20]:
    51. sphere.paint_uniform_color([1, 0, 0])
    52. # 将其他关节涂成蓝色
    53. else:
    54. sphere.paint_uniform_color([0, 0, 1])
    55. # 将拇指涂成粉色
    56. if i in [1, 2, 3, 4]:
    57. sphere.paint_uniform_color([1, 0, 1])
    58. joint.append(sphere) # 将球体添加到关节列表中
    59. self.vis.add_geometry(sphere) # 将球体添加到可视化器中
    60. self.step = 0 # 初始化步数
    61. self.distance_buffer = [] # 初始化距离缓冲区
    62. self.R_delta_init = None # 初始化旋转矩阵
    63. self.cumulative_correction = np.array([0.0, 0.0, 0.0]) # 初始化累计修正值
  2. 初始化标准帧的方法
    1. def initialize_canonical_frame(self): # 初始化标准帧的方法
    2. if self.R_delta_init is None: # 如果旋转矩阵未初始化
    3. self._load_frame_data(0) # 加载第0帧数据
    4. pose_ori_matirx = self.pose3_prev[:3, :3] # 获取第三个位姿的旋转矩阵
    5. pose_ori_correction_matrix = np.dot(np.array([[0, -1, 0],
    6. [0, 0, 1],
    7. [1, 0, 0]]), euler2mat(0, 0, 0)) # 计算修正矩阵
    8. pose_ori_matirx = np.dot(pose_ori_matirx, pose_ori_correction_matrix) # 应用修正矩阵
    9. self.canonical_t265_ori = np.array([[1, 0, 0],
    10. [0, -1, 0],
    11. [0, 0, -1]]) # 初始化标准T265方向
    12. x_angle, y_angle, z_angle = mat2euler(self.pose3_prev[:3, :3]) # 将旋转矩阵转换为欧拉角
    13. self.canonical_t265_ori = np.dot(self.canonical_t265_ori, euler2mat(-z_angle, x_angle + 0.3, y_angle)) # 应用欧拉角修正
    14. self.R_delta_init = np.dot(self.canonical_t265_ori, pose_ori_matirx.T) # 计算初始旋转矩阵
  3. 重放关键帧校准的方法
    1. def replay_keyframes_calibration(self):
    2. """
    3. 可视化单帧
    4. """
    5. global delta_movement_accu, delta_ori_accu, next_frame, frame
    6. if self.R_delta_init is None:
    7. self.initialize_canonical_frame() # 初始化标准帧
    8. self._load_frame_data(frame) # 加载当前帧数据
    9. self.vis.add_geometry(self.pcd) # 添加点云数据到可视化器
    10. self.vis.add_geometry(self.coord_frame_1) # 添加坐标系1到可视化器
    11. self.vis.add_geometry(self.coord_frame_2) # 添加坐标系2到可视化器
    12. self.vis.add_geometry(self.coord_frame_3) # 添加坐标系3到可视化器
    13. for joint in self.left_joints + self.right_joints:
    14. self.vis.add_geometry(joint) # 添加关节数据到可视化器
    15. for cylinder in self.left_line_set + self.right_line_set:
    16. self.vis.add_geometry(cylinder) # 添加连线数据到可视化器
    17. next_frame = True # 初始化为下一帧
    18. try:
    19. with keyboard.Listener(on_press=on_press) as listener: # 监听键盘事件
    20. while True:
    21. if next_frame == True:
    22. next_frame = False
    23. frame += 10 # 切换到下一帧
    24. self._load_frame_data(frame) # 加载当前帧数据
    25. self.step += 1 # 增加步数
    26. self.vis.update_geometry(self.pcd) # 更新点云数据
    27. self.vis.update_geometry(self.coord_frame_1) # 更新坐标系1
    28. self.vis.update_geometry(self.coord_frame_2) # 更新坐标系2
    29. self.vis.update_geometry(self.coord_frame_3) # 更新坐标系3
    30. for joint in self.left_joints + self.right_joints:
    31. self.vis.update_geometry(joint) # 更新关节数据
    32. for cylinder in self.left_line_set + self.right_line_set:
    33. self.vis.update_geometry(cylinder) # 更新连线数据
    34. self.vis.poll_events() # 处理可视化事件
    35. self.vis.update_renderer() # 更新渲染器
    36. listener.join() # 等待监听器结束
    37. finally:
    38. print("cumulative_correction ", self.cumulative_correction) # 打印累计修正值
  4. 重放所有帧的方法
    1. def replay_all_frames(self):
    2. """
    3. 连续可视化所有帧
    4. """
    5. try:
    6. if self.R_delta_init is None:
    7. self.initialize_canonical_frame() # 初始化标准帧
    8. frame = 0 # 初始化帧计数器
    9. first_frame = True # 标记是否为第一帧
    10. while True:
    11. if not self._load_frame_data(frame): # 加载当前帧数据
    12. break # 如果无法加载数据,退出循环
    13. if first_frame:
    14. self.vis.add_geometry(self.pcd) # 添加点云数据到可视化器
    15. self.vis.add_geometry(self.coord_frame_1) # 添加坐标系1到可视化器
    16. self.vis.add_geometry(self.coord_frame_2) # 添加坐标系2到可视化器
    17. self.vis.add_geometry(self.coord_frame_3) # 添加坐标系3到可视化器
    18. for joint in self.left_joints + self.right_joints:
    19. self.vis.add_geometry(joint) # 添加关节数据到可视化器
    20. for cylinder in self.left_line_set + self.right_line_set:
    21. self.vis.add_geometry(cylinder) # 添加连线数据到可视化器
    22. else:
    23. self.vis.update_geometry(self.pcd) # 更新点云数据
    24. self.vis.update_geometry(self.coord_frame_1) # 更新坐标系1
    25. self.vis.update_geometry(self.coord_frame_2) # 更新坐标系2
    26. self.vis.update_geometry(self.coord_frame_3) # 更新坐标系3
    27. for joint in self.left_joints + self.right_joints:
    28. self.vis.update_geometry(joint) # 更新关节数据
    29. for cylinder in self.left_line_set + self.right_line_set:
    30. self.vis.update_geometry(cylinder) # 更新连线数据
    31. self.vis.poll_events() # 处理可视化事件
    32. self.vis.update_renderer() # 更新渲染器
    33. if first_frame:
    34. view_params = self.vis.get_view_control().convert_to_pinhole_camera_parameters() # 获取视图参数
    35. else:
    36. self.vis.get_view_control().convert_from_pinhole_camera_parameters(view_params) # 恢复视图参数
    37. self.step += 1 # 增加步数
    38. frame += 5 # 增加帧计数器
    39. if first_frame:
    40. first_frame = False # 标记为非第一帧
    41. finally:
    42. self.vis.destroy_window() # 销毁可视化窗口
  5. 反投影点的方法
    1. def _back_project_point(self, point, intrinsics):
    2. """ 将单个点从3D反投影到2D图像空间 """
    3. x, y, z = point
    4. fx, fy = intrinsics[0, 0], intrinsics[1, 1]
    5. cx, cy = intrinsics[0, 2], intrinsics[1, 2]
    6. u = (x * fx / z) + cx
    7. v = (y * fy / z) + cy
    8. return int(u), int(v)

1.7.3 _load_frame_data:加载给定帧的点云和位姿数据,并进行可视化处理

最后是加载帧数据的方法,即_load_frame_data这个方法,用于加载给定帧的点云和位姿数据,并进行可视化处理

  1. 方法定义和参数说明
    1. def _load_frame_data(self, frame, vis_2d=False, load_table_points=False):
    2. """
    3. Load point cloud and poses for a given frame
    4. @param frame: frame count in integer
    5. @return whether we can successfully load all data from frame subdirectory
    6. """
    7. global delta_movement_accu, delta_ori_accu, delta_movement_accu_left, delta_ori_accu_left # 全局变量,用于存储累积的平移和旋转校正
    8. print(f"frame {frame}") # 打印当前帧编号
    9. if adjust_movement: # 如果启用了平移校正
    10. print("adjusting translation") # 打印平移校正信息
    11. else:
    12. print("adjusting rotation") # 打印旋转校正信息
  2. 初始化最顶部的L515相机内参
    1. # L515:
    2. o3d_depth_intrinsic = o3d.camera.PinholeCameraIntrinsic(
    3. 1280, 720,
    4. 898.2010498046875,
    5. 897.86669921875,
    6. 657.4981079101562,
    7. 364.30950927734375) # 初始化L515相机的内参
  3. 处理桌面点云数据
    1. if load_table_points: # 如果需要加载桌面点云数据
    2. table_color_image_o3d = o3d.io.read_image(os.path.join(self.table_frame, "frame_0", "color_image.jpg")) # 读取桌面彩色图像
    3. table_depth_image_o3d = o3d.io.read_image(os.path.join(self.table_frame, "frame_0", "depth_image.png")) # 读取桌面深度图像
    4. table_rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(table_color_image_o3d, table_depth_image_o3d, depth_trunc=4.0,
    5. convert_rgb_to_intensity=False) # 创建RGBD图像
    6. table_pose_4x4 = np.loadtxt(os.path.join(self.table_frame, "frame_0", "pose.txt")) # 读取桌面位姿
    7. table_corrected_pose = table_pose_4x4 @ self.between_cam # 应用校正矩阵
    8. self.table_pcd = o3d.geometry.PointCloud.create_from_rgbd_image(table_rgbd, o3d_depth_intrinsic) # 创建点云
    9. self.table_pcd.transform(table_corrected_pose) # 应用校正矩阵
  4. 加载当前帧数据
    1. frame_dir = os.path.join(self.directory, f"frame_{frame}") # 当前帧目录
    2. color_image_o3d = o3d.io.read_image(os.path.join(frame_dir, "color_image.jpg")) # 读取彩色图像
    3. depth_image_o3d = o3d.io.read_image(os.path.join(frame_dir, "depth_image.png")) # 读取深度图像
    4. rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(color_image_o3d, depth_image_o3d, depth_trunc=4.0,
    5. convert_rgb_to_intensity=False) # 创建RGBD图像
    6. pose_4x4 = np.loadtxt(os.path.join(frame_dir, "pose.txt")) # 读取位姿
    7. if load_table_points:
    8. pose_4x4[:3, 3] += fixed_transform.T # 应用固定变换
    9. corrected_pose = pose_4x4 @ self.between_cam # 应用校正矩阵
  5. 检查位姿文件是否存在
    1. pose_path = os.path.join(frame_dir, "pose.txt") # 位姿文件路径
    2. pose_2_path = os.path.join(frame_dir, "pose_2.txt") # 第二个位姿文件路径
    3. pose_3_path = os.path.join(frame_dir, "pose_3.txt") # 第三个位姿文件路径
    4. if not all(os.path.exists(path) for path in [pose_path, pose_2_path, pose_3_path]): # 检查所有位姿文件是否存在
    5. return False # 如果有文件不存在,返回False
  6. 创建或更新点云
    1. if self.pcd is None: # 如果点云对象为空
    2. self.pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, o3d_depth_intrinsic) # 创建点云
    3. self.pcd.transform(corrected_pose) # 应用校正矩阵
    4. else:
    5. new_pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, o3d_depth_intrinsic) # 创建新的点云
    6. new_pcd.transform(corrected_pose) # 应用校正矩阵
    7. self.pcd.points = new_pcd.points # 更新点云的点
    8. self.pcd.colors = new_pcd.colors # 更新点云的颜色
  7. 加载并校正位姿
    1. pose_1 = np.loadtxt(pose_path) # 读取第一个位姿
    2. if load_table_points:
    3. pose_1[:3, 3] += fixed_transform.T # 应用固定变换
    4. pose_1 = pose_1 @ self.between_cam # 应用校正矩阵
    5. pose_2 = np.loadtxt(pose_2_path) # 读取第二个位姿
    6. if load_table_points:
    7. pose_2[:3, 3] += fixed_transform.T # 应用固定变换
    8. pose_2 = pose_2 @ self.between_cam_2 # 应用校正矩阵
    9. pose_3 = np.loadtxt(pose_3_path) # 读取第三个位姿
    10. if load_table_points:
    11. pose_3[:3, 3] += fixed_transform.T # 应用固定变换
    12. pose_3 = pose_3 @ self.between_cam_3 # 应用校正矩阵
  8. 创建或更新坐标系
    1. if self.coord_frame_1 is None: # 如果坐标系1为空
    2. self.coord_frame_1 = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1) # 创建坐标系1
    3. self.coord_frame_2 = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1) # 创建坐标系2
    4. self.coord_frame_3 = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1) # 创建坐标系3
    5. self.coord_frame_1 = self.coord_frame_1.transform(np.linalg.inv(self.pose1_prev)) # 逆变换上一个位姿
    6. self.coord_frame_1 = self.coord_frame_1.transform(pose_1) # 应用当前位姿
    7. self.pose1_prev = copy.deepcopy(pose_1) # 复制当前位姿
    8. self.coord_frame_2 = self.coord_frame_2.transform(np.linalg.inv(self.pose2_prev)) # 逆变换上一个位姿
    9. self.coord_frame_2 = self.coord_frame_2.transform(pose_2) # 应用当前位姿
    10. self.pose2_prev = copy.deepcopy(pose_2) # 复制当前位姿
    11. self.coord_frame_3 = self.coord_frame_3.transform(np.linalg.inv(self.pose3_prev)) # 逆变换上一个位姿
    12. self.coord_frame_3 = self.coord_frame_3.transform(pose_3) # 应用当前位姿
    13. self.pose3_prev = copy.deepcopy(pose_3) # 复制当前位姿
  9. 加载并处理左手关节数据
    1. # left hand, read from joint
    2. left_hand_joint_xyz = np.loadtxt(os.path.join(frame_dir, "left_hand_joint.txt")) # 读取左手关节位置
    3. self.left_hand_joint_xyz = left_hand_joint_xyz # 存储左手关节位置
    4. left_hand_joint_xyz = translate_wrist_to_origin(left_hand_joint_xyz) # 平移手腕到原点
    5. left_hand_joint_ori = np.loadtxt(os.path.join(frame_dir, "left_hand_joint_ori.txt"))[0] # 读取左手关节方向
    6. self.left_hand_wrist_ori = left_hand_joint_ori # 存储左手手腕方向
    7. left_rotation_matrix = Rotation.from_quat(left_hand_joint_ori).as_matrix().T # 计算旋转矩阵
    8. left_hand_joint_xyz_reshaped = left_hand_joint_xyz[:, :, np.newaxis] # 重塑关节位置
    9. left_transformed_joint_xyz = np.matmul(left_rotation_matrix, left_hand_joint_xyz_reshaped) # 应用旋转矩阵
    10. left_hand_joint_xyz = left_transformed_joint_xyz[:, :, 0] # 更新关节位置
    11. left_hand_joint_xyz[:, -1] = -left_hand_joint_xyz[:, -1] # z轴反转
    12. rotation_matrix = axangle2mat(np.array([0, 1, 0]), -np.pi * 1 / 2) # 绕y轴旋转
    13. left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    14. rotation_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 2) # 绕x轴旋转
    15. left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    16. rotation_matrix = axangle2mat(np.array([0, 0, 1]), -np.pi * 1 / 2) # 绕z轴旋转
    17. left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    18. left_hand_joint_xyz = np.dot(left_hand_joint_xyz, euler2mat(*self.left_hand_ori_offset).T) # 应用欧拉角校正
    19. left_hand_joint_xyz = np.dot(left_hand_joint_xyz, euler2mat(*delta_ori_accu_left).T) # 应用累积旋转校正
    20. left_hand_joint_xyz += self.left_hand_offset # 应用平移校正
    21. left_hand_joint_xyz += delta_movement_accu_left # 应用累积平移校正
    22. left_hand_joint_xyz = apply_pose_matrix(left_hand_joint_xyz, pose_2) # 应用位姿矩阵
  10. 设置左手关节球体和连线
    1. # set joint sphere and lines
    2. for i, sphere in enumerate(self.left_joints): # 遍历左手关节球体
    3. transformation = np.eye(4) # 创建4x4单位矩阵
    4. transformation[:3, 3] = left_hand_joint_xyz[i] - sphere.get_center() # 计算平移向量
    5. sphere.transform(transformation) # 应用平移变换
    6. for i, (x, y) in enumerate(lines): # 遍历连线
    7. start = self.left_joints[x].get_center() # 获取起点
    8. end = self.left_joints[y].get_center() # 获取终点
    9. create_or_update_cylinder(start, end, cylinder_list=self.left_line_set, cylinder_idx=i) # 创建或更新圆柱体
  11. 加载并处理右手关节数据
    1. # right hand, read from joint
    2. right_hand_joint_xyz = np.loadtxt(os.path.join(frame_dir, "right_hand_joint.txt")) # 读取右手关节位置
    3. self.right_hand_joint_xyz = right_hand_joint_xyz # 存储右手关节位置
    4. right_hand_joint_xyz = translate_wrist_to_origin(right_hand_joint_xyz) # 平移手腕到原点
    5. right_hand_joint_ori = np.loadtxt(os.path.join(frame_dir, "right_hand_joint_ori.txt"))[0] # 读取右手关节方向
    6. self.right_hand_wrist_ori = right_hand_joint_ori # 存储右手手腕方向
    7. right_rotation_matrix = Rotation.from_quat(right_hand_joint_ori).as_matrix().T # 计算旋转矩阵
    8. right_joint_xyz_reshaped = right_hand_joint_xyz[:, :, np.newaxis] # 重塑关节位置
    9. right_transformed_joint_xyz = np.matmul(right_rotation_matrix, right_joint_xyz_reshaped) # 应用旋转矩阵
    10. right_hand_joint_xyz = right_transformed_joint_xyz[:, :, 0] # 更新关节位置
    11. right_hand_joint_xyz[:, -1] = -right_hand_joint_xyz[:, -1] # z轴反转
    12. rotation_matrix = axangle2mat(np.array([0, 1, 0]), -np.pi * 1 / 2) # 绕y轴旋转
    13. right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    14. rotation_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 2) # 绕x轴旋转
    15. right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    16. rotation_matrix = axangle2mat(np.array([0, 0, 1]), -np.pi * 1 / 2) # 绕z轴旋转
    17. right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    18. right_hand_joint_xyz = np.dot(right_hand_joint_xyz, euler2mat(*self.right_hand_ori_offset).T) # 应用欧拉角校正
    19. right_hand_joint_xyz = np.dot(right_hand_joint_xyz, euler2mat(*delta_ori_accu).T) # 应用累积旋转校正
    20. right_hand_joint_xyz += self.right_hand_offset # 应用平移校正
    21. right_hand_joint_xyz += delta_movement_accu # 应用累积平移校正
    22. right_hand_joint_xyz = apply_pose_matrix(right_hand_joint_xyz, pose_3) # 应用位姿矩阵
  12. 可视化2D图像
    1. if vis_2d: # 如果启用了2D可视化
    2. color_image = np.asarray(rgbd.color) # 获取彩色图像
    3. color_image = cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR) # 转换颜色空间
    4. left_hand_joint_points_homogeneous = np.hstack((left_hand_joint_xyz, np.ones((left_hand_joint_xyz.shape[0], 1)))) # 转换为齐次坐标
    5. left_hand_transformed_points_homogeneous = np.dot(left_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用逆变换
    6. left_hand_points_to_project = left_hand_transformed_points_homogeneous[:, :3] / left_hand_transformed_points_homogeneous[:, [3]] # 归一化
    7. left_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in left_hand_points_to_project] # 反投影
    8. for i in range(len(left_hand_back_projected_points)): # 遍历左手关节点
    9. u, v = left_hand_back_projected_points[i] # 获取投影点坐标
    10. if i in [0, 1, 5, 9, 13, 17]:
    11. cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 绘制绿色圆点
    12. elif i in [4, 8, 12, 16, 20]:
    13. cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 绘制蓝色圆点
    14. else:
    15. cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 绘制红色圆点
    16. if i in [1, 2, 3, 4]:
    17. cv2.circle(color_image, (u, v), 10, (255, 0, 255), -1) # 绘制紫色圆点
    18. right_hand_joint_points_homogeneous = np.hstack((right_hand_joint_xyz, np.ones((right_hand_joint_xyz.shape[0], 1)))) # 转换为齐次坐标
    19. right_hand_transformed_points_homogeneous = np.dot(right_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用逆变换
    20. right_hand_points_to_project = right_hand_transformed_points_homogeneous[:, :3] / right_hand_transformed_points_homogeneous[:, [3]] # 归一化
    21. right_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in right_hand_points_to_project] # 反投影
    22. for i in range(len(right_hand_back_projected_points)): # 遍历右手关节点
    23. u, v = right_hand_back_projected_points[i] # 获取投影点坐标
    24. if i in [0, 1, 5, 9, 13, 17]:
    25. cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 绘制绿色圆点
    26. elif i in [4, 8, 12, 16, 20]:
    27. cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 绘制蓝色圆点
    28. else:
    29. cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 绘制红色圆点
    30. if i in [1, 2, 3, 4]:
    31. cv2.circle(color_image, (u, v), 10, (255, 0, if vis_2d: # 如果启用了2D可视化
    32. color_image = np.asarray(rgbd.color) # 获取彩色图像
    33. color_image = cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR) # 转换颜色空间
    34. left_hand_joint_points_homogeneous = np.hstack((left_hand_joint_xyz, np.ones((left_hand_joint_xyz.shape[0], 1)))) # 转换为齐次坐标
    35. left_hand_transformed_points_homogeneous = np.dot(left_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用逆变换
    36. left_hand_points_to_project = left_hand_transformed_points_homogeneous[:, :3] / left_hand_transformed_points_homogeneous[:, [3]] # 归一化
    37. left_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in left_hand_points_to_project] # 反投影
    38. for i in range(len(left_hand_back_projected_points)): # 遍历左手关节点
    39. u, v = left_hand_back_projected_points[i] # 获取投影点坐标
    40. if i in [0, 1, 5, 9, 13, 17]:
    41. cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 绘制绿色圆点
    42. elif i in [4, 8, 12, 16, 20]:
    43. cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 绘制蓝色圆点
    44. else:
    45. cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 绘制红色圆点
    46. if i in [1, 2, 3, 4]:
    47. cv2.circle(color_image, (u, v), 10, (255, 0, 255), -1) # 绘制紫色圆点
    48. right_hand_joint_points_homogeneous = np.hstack((right_hand_joint_xyz, np.ones((right_hand_joint_xyz.shape[0], 1)))) # 转换为齐次坐标
    49. right_hand_transformed_points_homogeneous = np.dot(right_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用逆变换
    50. right_hand_points_to_project = right_hand_transformed_points_homogeneous[:, :3] / right_hand_transformed_points_homogeneous[:, [3]] # 归一化
    51. right_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in right_hand_points_to_project] # 反投影
    52. for i in range(len(right_hand_back_projected_points)): # 遍历右手关节点
    53. u, v = right_hand_back_projected_points[i] # 获取投影点坐标
    54. if i in [0, 1, 5, 9, 13, 17]:
    55. cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 绘制绿色圆点
    56. elif i in [4, 8, 12, 16, 20]:
    57. cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 绘制蓝色圆点
    58. else:
    59. cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 绘制红色圆点
    60. if i in [1, 2, 3, 4]:
    61. cv2.circle(color_image, (u, v), 10, (255, 0,
  13. 最后使用Open3D和OpenCV库来处理和现实3D和2D数据
    以下是2D可视化部分
    将RGBD图像的彩色部分转换为NumPy数组,并从RGB格式转换为BGR格式
    1. if vis_2d: # 如果启用了2D可视化
    2. color_image = np.asarray(rgbd.color) # 将RGBD图像的彩色部分转换为NumPy数组
    3. color_image = cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR) # 将图像从RGB格式转换为BGR格式
    处理左手和右手关节数据,将其转换为齐次坐标,应用校正矩阵,转换为 3D坐标,并反向投影到图像平面
    在图像上绘制不同颜色的圆点表示不同的关节点
    1. # 处理左手关节数据
    2. left_hand_joint_points_homogeneous = np.hstack((left_hand_joint_xyz, np.ones((left_hand_joint_xyz.shape[0], 1)))) # 将左手关节位置转换为齐次坐标
    3. left_hand_transformed_points_homogeneous = np.dot(left_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用校正矩阵
    4. left_hand_points_to_project = left_hand_transformed_points_homogeneous[:, :3] / left_hand_transformed_points_homogeneous[:, [3]] # 将齐次坐标转换为3D坐标
    5. left_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in left_hand_points_to_project] # 反向投影到图像平面
    6. for i in range(len(left_hand_back_projected_points)): # 遍历所有左手关节点
    7. u, v = left_hand_back_projected_points[i] # 获取关节点的图像坐标
    8. if i in [0, 1, 5, 9, 13, 17]: # 如果关节点是手腕或指根
    9. cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 画绿色圆点
    10. elif i in [4, 8, 12, 16, 20]: # 如果关节点是指尖
    11. cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 画蓝色圆点
    12. else: # 其他关节点
    13. cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 画红色圆点
    14. if i in [1, 2, 3, 4]: # 如果关节点是拇指
    15. cv2.circle(color_image, (u, v), 10, (255, 0, 255), -1) # 画紫色圆点
    16. # 处理右手关节数据
    17. right_hand_joint_points_homogeneous = np.hstack((right_hand_joint_xyz, np.ones((right_hand_joint_xyz.shape[0], 1)))) # 将右手关节位置转换为齐次坐标
    18. right_hand_transformed_points_homogeneous = np.dot(right_hand_joint_points_homogeneous, np.linalg.inv(corrected_pose).T) # 应用校正矩阵
    19. right_hand_points_to_project = right_hand_transformed_points_homogeneous[:, :3] / right_hand_transformed_points_homogeneous[:, [3]] # 将齐次坐标转换为3D坐标
    20. right_hand_back_projected_points = [self._back_project_point(point, o3d_depth_intrinsic.intrinsic_matrix) for point in right_hand_points_to_project] # 反向投影到图像平面
    21. for i in range(len(right_hand_back_projected_points)): # 遍历所有右手关节点
    22. u, v = right_hand_back_projected_points[i] # 获取关节点的图像坐标
    23. if i in [0, 1, 5, 9, 13, 17]: # 如果关节点是手腕或指根
    24. cv2.circle(color_image, (u, v), 10, (0, 255, 0), -1) # 画绿色圆点
    25. elif i in [4, 8, 12, 16, 20]: # 如果关节点是指尖
    26. cv2.circle(color_image, (u, v), 10, (255, 0, 0), -1) # 画蓝色圆点
    27. else: # 其他关节点
    28. cv2.circle(color_image, (u, v), 10, (0, 0, 255), -1) # 画红色圆点
    29. if i in [1, 2, 3, 4]: # 如果关节点是拇指
    30. cv2.circle(color_image, (u, v), 10, (255, 0, 255), -1) # 画紫色圆点

    显示带有反向投影点的图像,并在按下'q'键时退出铺环
    1. cv2.imshow("Back-projected Points on Image", color_image) # 显示带有反向投影点的图像
    2. # 如果按下'q'键,退出循环
    3. if cv2.waitKey(1) & 0xFF == ord('q'):
    4. return # 退出函数

    以下是3D可视化部分
    设置右手关节球体的位置
    创建或更新右手关节之问的连线
    1. # 设置关节球体和连线
    2. for i, sphere in enumerate(self.right_joints): # 遍历右手关节球体
    3. transformation = np.eye(4) # 创建4x4单位矩阵
    4. transformation[:3, 3] = right_hand_joint_xyz[i] - sphere.get_center() # 计算平移向量
    5. sphere.transform(transformation) # 应用平移变换
    6. for i, (x, y) in enumerate(lines): # 遍历连线
    7. start = self.right_joints[x].get_center() # 获取连线起点
    8. end = self.right_joints[y].get_center() # 获取连线终点
    9. create_or_update_cylinder(start, end, cylinder_list=self.right_line_set, cylinder_idx=i) # 创建或更新连线的圆柱体
    10. return True # 返回True表示成功

第二部分 STEP2_build_dataset

先导入库

  1. import h5py # 用于处理HDF5文件
  2. import json # 用于处理JSON数据
  3. from scipy.linalg import svd # 用于计算奇异值分解
  4. from utils import * # 导入自定义的工具函数
  5. from hyperparameters import * # 导入超参数
  6. from pybullet_ik_bimanual import LeapPybulletIK # 导入用于逆运动学计算的类

然后初始化全局变量

  1. leapPybulletIK = LeapPybulletIK() # 创建LeapPybulletIK对象
  2. R_delta_init = None # 初始化旋转矩阵

2.1 dataset_utils.py

2.1.1 read_pose_data:读取和处理手部姿态数据(过程中使用 LeapPybulLetIK 计算逆运动学,得到手部关节的目标位置)

read_pose_data 用于读取和处理手部姿态数据。该函数从指定的文件路径中加载手部关节位置和
方向数据,并进行一系列转换和校正,最终返回处理后的数据

  1. 先定义函数本身和一些全局变量
    1. def read_pose_data(frame_path, demo_path, fixed_trans_to_robot_table, first_frame=False): # 定义读取姿态数据的函数
    2. global leapPybulletIK # 声明全局变量 leapPybulletIK
    3. cam_pose_path = os.path.join(frame_path, "pose.txt") # 相机姿态文件路径
  2. 加载左手姿态数据
    1. # 加载左手姿态数据
    2. left_pose_path = os.path.join(frame_path, "pose_2.txt") # 左手姿态文件路径
    3. left_hand_pos_path = os.path.join(frame_path, "left_hand_joint.txt") # 左手关节位置文件路径
    4. left_hand_ori_path = os.path.join(frame_path, "left_hand_joint_ori.txt") # 左手关节方向文件路径
    5. left_hand_off_path = os.path.join(demo_path, "calib_offset_left.txt") # 左手校准偏移量文件路径
    6. left_hand_off_ori_path = os.path.join(demo_path, "calib_ori_offset_left.txt") # 左手校准方向偏移量文件路径
    7. pose_2 = np.loadtxt(left_pose_path) # 加载左手姿态数据
    8. pose_2[:3, 3] += fixed_trans_to_robot_table.T # 应用固定的平移量
    9. pose_2 = pose_2 @ between_cam_2 # 应用相机之间的转换矩阵
    10. left_hand_joint_xyz = np.loadtxt(left_hand_pos_path) # 加载左手关节位置数据
    11. left_hand_joint_xyz = translate_wrist_to_origin(left_hand_joint_xyz) # 将手腕位置平移到原点
    12. left_hand_wrist_ori = np.loadtxt(left_hand_ori_path)[0] # 加载左手关节方向数据
  3. 加载右手姿态数据
    1. # 加载右手姿态数据
    2. pose_path = os.path.join(frame_path, "pose_3.txt") # 右手姿态文件路径
    3. hand_pos_path = os.path.join(frame_path, "right_hand_joint.txt") # 右手关节位置文件路径
    4. hand_ori_path = os.path.join(frame_path, "right_hand_joint_ori.txt") # 右手关节方向文件路径
    5. hand_off_path = os.path.join(demo_path, "calib_offset.txt") # 右手校准偏移量文件路径
    6. hand_off_ori_path = os.path.join(demo_path, "calib_ori_offset.txt") # 右手校准方向偏移量文件路径
    7. pose_3 = np.loadtxt(pose_path) # 加载右手姿态数据
    8. pose_3[:3, 3] += fixed_trans_to_robot_table.T # 应用固定的平移量
    9. pose_3 = pose_3 @ between_cam_3 # 应用相机之间的转换矩阵
    10. right_hand_joint_xyz = np.loadtxt(hand_pos_path) # 加载右手关节位置数据
    11. right_hand_joint_xyz = translate_wrist_to_origin(right_hand_joint_xyz) # 将手腕位置平移到原点
    12. right_hand_wrist_ori = np.loadtxt(hand_ori_path)[0] # 加载右手关节方向数据
  4. 计算逆运动学,其中计算逆运动学compute_IK的实现将在下文分析、讲解
    1. right_hand_target, left_hand_target, right_hand_points, left_hand_points = leapPybulletIK.compute_IK(right_hand_joint_xyz, right_hand_wrist_ori, left_hand_joint_xyz, left_hand_wrist_ori) # 计算逆运动学
    2. np.savetxt(os.path.join(frame_path, "right_joints.txt"), right_hand_target) # 保存右手关节目标位置
    3. np.savetxt(os.path.join(frame_path, "left_joints.txt"), left_hand_target) # 保存左手关节目标位置
  5. 转换左手姿态
    1. # 转换左手姿态
    2. left_rotation_matrix = Rotation.from_quat(left_hand_wrist_ori).as_matrix().T # 将四元数转换为旋转矩阵
    3. left_joint_xyz_reshaped = left_hand_joint_xyz[:, :, np.newaxis] # 重塑左手关节位置数组
    4. left_transformed_joint_xyz = np.matmul(left_rotation_matrix, left_joint_xyz_reshaped) # 应用旋转矩阵
    5. left_hand_joint_xyz = left_transformed_joint_xyz[:, :, 0] # 获取转换后的左手关节位置
    6. left_hand_joint_xyz[:, -1] = -left_hand_joint_xyz[:, -1] # 反转z轴
    7. rotation_matrix = axangle2mat(np.array([0, 1, 0]), -np.pi * 1 / 2) # 绕y轴旋转
    8. left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    9. rotation_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 2) # 绕x轴旋转
    10. left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    11. rotation_matrix = axangle2mat(np.array([0, 0, 1]), -np.pi * 1 / 2) # 绕z轴旋转
    12. left_hand_joint_xyz = np.dot(left_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    13. left_hand_ori_offset = np.loadtxt(left_hand_off_ori_path) # 加载左手校准方向偏移量
    14. left_hand_joint_xyz = np.dot(left_hand_joint_xyz, euler2mat(*left_hand_ori_offset).T) # 应用旋转校准
    15. left_hand_offset = np.loadtxt(left_hand_off_path) # 加载左手校准偏移量
    16. left_hand_joint_xyz += left_hand_offset # 应用偏移量
    17. left_hand_joint_xyz = apply_pose_matrix(left_hand_joint_xyz, pose_2) # 应用姿态矩阵
    18. update_pose_2 = copy.deepcopy(pose_2) # 复制左手姿态矩阵
    19. update_pose_2[:3, 3] = left_hand_joint_xyz[0] # 更新左手姿态矩阵的平移部分
    20. left_hand_joint_xyz = apply_pose_matrix(left_hand_joint_xyz, inverse_transformation(update_pose_2)) # 应用逆变换
    21. # 重要!由于手套上的相机安装角度为45度,需要在此处转换以获得正确的手部方向
    22. rotation_45lookup_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 4) # 绕z轴旋转45度
    23. update_pose_2[:3, :3] = np.dot(update_pose_2[:3, :3], rotation_45lookup_matrix.T) # 应用旋转矩阵
    24. if not first_frame:
    25. update_pose_2 = hand_to_robot_left(update_pose_2) # 转换为机器人左手坐标系
    26. left_hand_translation = update_pose_2[:3, 3] # 获取左手平移部分
    27. left_hand_rotation_matrix = update_pose_2[:3, :3] # 获取左手旋转矩阵
    28. left_hand_quaternion = mat2quat(left_hand_rotation_matrix) # 将旋转矩阵转换为四元数
  6. 转换右手姿态
    1. # 转换右手姿态
    2. right_rotation_matrix = Rotation.from_quat(right_hand_wrist_ori).as_matrix().T # 将四元数转换为旋转矩阵
    3. right_joint_xyz_reshaped = right_hand_joint_xyz[:, :, np.newaxis] # 重塑右手关节位置数组
    4. right_transformed_joint_xyz = np.matmul(right_rotation_matrix, right_joint_xyz_reshaped) # 应用旋转矩阵
    5. right_hand_joint_xyz = right_transformed_joint_xyz[:, :, 0] # 获取转换后的右手关节位置
    6. right_hand_joint_xyz[:, -1] = -right_hand_joint_xyz[:, -1] # 反转z轴
    7. rotation_matrix = axangle2mat(np.array([0, 1, 0]), -np.pi * 1 / 2) # 绕y轴旋转
    8. right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    9. rotation_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 2) # 绕x轴旋转
    10. right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    11. rotation_matrix = axangle2mat(np.array([0, 0, 1]), -np.pi * 1 / 2) # 绕z轴旋转
    12. right_hand_joint_xyz = np.dot(right_hand_joint_xyz, rotation_matrix.T) # 应用旋转矩阵
    13. right_hand_ori_offset = np.loadtxt(hand_off_ori_path) # 加载右手校准方向偏移量
    14. right_hand_joint_xyz = np.dot(right_hand_joint_xyz, euler2mat(*right_hand_ori_offset).T) # 应用旋转校准
    15. right_hand_offset = np.loadtxt(hand_off_path) # 加载右手校准偏移量
    16. right_hand_joint_xyz += right_hand_offset # 应用偏移量
    17. right_hand_joint_xyz = apply_pose_matrix(right_hand_joint_xyz, pose_3) # 应用姿态矩阵
    18. update_pose_3 = copy.deepcopy(pose_3) # 复制右手姿态矩阵
    19. update_pose_3[:3, 3] = right_hand_joint_xyz[0] # 更新右手姿态矩阵的平移部分
    20. right_hand_joint_xyz = apply_pose_matrix(right_hand_joint_xyz, inverse_transformation(update_pose_3)) # 应用逆变换
    21. # 重要!由于手套上的相机安装角度为45度,需要在此处转换以获得正确的手部方向
    22. rotation_45lookup_matrix = axangle2mat(np.array([1, 0, 0]), np.pi * 1 / 4) # 绕z轴旋转45度
    23. update_pose_3[:3, :3] = np.dot(update_pose_3[:3, :3], rotation_45lookup_matrix.T) # 应用旋转矩阵
    24. if not first_frame:
    25. update_pose_3 = hand_to_robot(update_pose_3) # 转换为机器人右手坐标系
    26. right_hand_translation = update_pose_3[:3, 3] # 获取右手平移部分
    27. right_hand_rotation_matrix = update_pose_3[:3, :3] # 获取右手旋转矩阵
    28. right_hand_quaternion = mat2quat(right_hand_rotation_matrix) # 将旋转矩阵转换为四元数
  7. 加载相机姿态数据
    1. cam_pose_4x4 = np.loadtxt(cam_pose_path) # 加载相机姿态数据
    2. cam_pose_4x4[:3, 3] += fixed_trans_to_robot_table.T # 应用固定的平移量
    3. cam_corrected_pose = cam_pose_4x4 @ between_cam # 应用相机之间的转换矩阵
    4. cam_corrected_pose = cam_corrected_pose.flatten() # 将矩阵展平
    5. return (np.concatenate([right_hand_translation, right_hand_quaternion, right_hand_target]),
    6. np.concatenate([left_hand_translation, left_hand_quaternion, left_hand_target]),
    7. cam_corrected_pose,
    8. right_hand_joint_xyz.flatten(),
    9. left_hand_joint_xyz.flatten(),
    10. right_hand_points,
    11. left_hand_points) # 返回处理后的数据

总共,这段代码定义了一个名为read_pose_data 的函数,用于读取和处理手部姿态数据。具体步骤如下:

  1. 加载左手和右手的姿态数据:从指定的文件路径中加载手部关节位置和方向数据
  2. 计算逆运动学:使用 LeapPybulLetIK 计算逆运动学,得到手部关节的目标位置
  3. 转换左手和右手的姿态:将手部关节位置和方向数据转换为标准坐标系,并应用校准偏移量和旋转矩阵
  4. 加载相机姿态数据:从指定的文件路径中加载相机姿态数据,并应用固定的平移量和相机之间的转换矩阵
  5. 返回处理后的数据:返回处理后的手部和相机姿态数据,包括关节位置、方向和目标位置

通过这些步骤,函数能够读取和处理手部姿态数据,并将其转换为标准坐标系,供后续处理和分析使用

2.1.2 process_hdf5:处理多个数据集(手部姿态/图像/点云),作为模型的训练数据集

这段代码的主要功能是处理多个数据集文件夹中的数据——比如手部姿态数据、图像数据、点云数据,并将处理后的数据保存到一个HDF5文件中,从而作为训练机器学习模型的数据集

具体步骤如下:

  1. 初始化:创建Open3D可视化器和空的点云对象,打开HDF5文件用于写入
    1. def process_hdf5(output_hdf5_file, dataset_folders, action_gap, num_points_to_sample, in_wild_data=False): # 定义处理HDF5文件的函数
    2. global R_delta_init # 声明全局变量 R_delta_init
    3. vis = o3d.visualization.Visualizer() # 创建Open3D可视化器
    4. vis.create_window() # 创建可视化窗口
    5. pcd_vis = o3d.geometry.PointCloud() # 初始化空的点云对象
    6. firstfirst = True # 标记是否为第一次更新可视化器
    7. with h5py.File(output_hdf5_file, 'w') as output_hdf5: # 打开HDF5文件用于写入
    8. output_data_group = output_hdf5.create_group('data') # 创建数据组
    9. demo_index = 0 # 初始化演示索引
    10. total_frames = 0 # 初始化总帧数
    11. mean_init_pos = [] # 初始化初始位置均值列表
    12. mean_init_quat = [] # 初始化初始四元数均值列表
  2. 遍历数据集文件夹:加载剪辑标记文件,读取手部姿态数据、图像数据和点云数据
    1. for dataset_folder in dataset_folders: # 遍历数据集文件夹
    2. clip_marks_json = os.path.join(dataset_folder, 'clip_marks.json') # 剪辑标记文件路径
    3. if in_wild_data: # 如果数据是在野外收集的,读取固定的平移量到机器人桌面
    4. fixed_trans_to_robot_table = np.loadtxt(os.path.join(dataset_folder, 'map_to_robot_table_trans.txt'))
    5. else:
    6. fixed_trans_to_robot_table = np.array([0.0, 0.0, 0.0]) # 否则,平移量为零
    7. # 加载剪辑标记
    8. with open(clip_marks_json, 'r') as file:
    9. clip_marks = json.load(file) # 读取剪辑标记
    10. for clip in clip_marks: # 遍历每个剪辑
    11. # 保存第0帧并更新 R_delta_init
    12. frame0_pose_data, frame0_left_pose_data, _, _, _, _, _ = read_pose_data(os.path.join(dataset_folder, f'frame_0'), dataset_folder, fixed_trans_to_robot_table=fixed_trans_to_robot_table, first_frame=True)
    13. update_R_delta_init(frame0_pose_data[:3], frame0_pose_data[3:7]) # 更新 R_delta_init
    14. # 获取起始和结束帧编号
    15. start_frame = int(clip['start'].split('_')[-1])
    16. end_frame = int(clip['end'].split('_')[-1])
    17. clip_length = end_frame - start_frame + 1 # 包括第0帧
    18. agentview_images = [] # 初始化代理视角图像列表
    19. pointcloud = [] # 初始化点云列表
    20. poses = [] # 初始化姿态列表
    21. poses_left = [] # 初始化左手姿态列表
    22. states = [] # 初始化状态列表
    23. glove_states = [] # 初始化手套状态列表
    24. left_glove_states = [] # 初始化左手手套状态列表
    25. labels = [] # 初始化标签列表
  3. 处理每一帧数据:调整图像大小,遮罩手部图像
    1. for frame_number in list(range(start_frame, end_frame + 1)): # 遍历每一帧
    2. frame_folder = f'frame_{frame_number}' # 帧文件夹名称
    3. image_path = os.path.join(dataset_folder, frame_folder, "color_image.jpg") # 彩色图像路径
    4. frame_path = os.path.join(dataset_folder, frame_folder) # 帧文件夹路径
    5. # 加载手部姿态数据
    6. pose_data, left_pose_data, cam_data, glove_data, left_glove_data, right_hand_points, left_hand_points = read_pose_data(frame_path, dataset_folder, fixed_trans_to_robot_table=fixed_trans_to_robot_table)
    7. poses.append(pose_data) # 添加右手姿态数据
    8. poses_left.append(left_pose_data) # 添加左手姿态数据
    9. states.append(cam_data) # 添加相机数据
    10. glove_states.append(glove_data) # 添加右手手套数据
    11. left_glove_states.append(left_glove_data) # 添加左手手套数据
    12. # 处理图像
    13. resized_image = resize_image(image_path) # 调整图像大小
    14. resized_image, right_hand_show = mask_image(resized_image, pose_data, cam_data) # 遮罩右手图像
    15. resized_image, left_hand_show = mask_image(resized_image, left_pose_data, cam_data, left=True) # 遮罩左手图像
    16. agentview_images.append(resized_image) # 添加调整大小后的图像
    处理点云数据
    1. # 处理点云
    2. color_image_o3d = o3d.io.read_image(os.path.join(dataset_folder, frame_folder, "color_image.jpg")) # 读取彩色图像
    3. depth_image_o3d = o3d.io.read_image(os.path.join(dataset_folder, frame_folder, "depth_image.png")) # 读取深度图像
    4. max_depth = 1000 # 最大深度
    5. depth_array = np.asarray(depth_image_o3d) # 将深度图像转换为数组
    6. mask = depth_array > max_depth # 创建深度掩码
    7. depth_array[mask] = 0 # 将超过最大深度的值设为0
    8. filtered_depth_image = o3d.geometry.Image(depth_array) # 创建过滤后的深度图像
    9. rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(color_image_o3d, filtered_depth_image, depth_trunc=4.0, convert_rgb_to_intensity=False) # 创建RGBD图像
    10. pose_4x4 = np.loadtxt(os.path.join(dataset_folder, frame_folder, "pose.txt")) # 加载姿态数据
    11. pose_4x4[:3, 3] += fixed_trans_to_robot_table.T # 应用固定的平移量
    12. corrected_pose = pose_4x4 @ between_cam # 应用相机之间的转换矩阵
    13. pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, o3d_depth_intrinsic) # 从RGBD图像创建点云
    14. pcd.transform(corrected_pose) # 应用校正矩阵
    15. color_pcd = np.concatenate((np.array(pcd.points), np.array(pcd.colors)), axis=-1) # 合并点云和颜色数据
    16. if right_hand_show: # 如果检测到右手,合并右手点云
    17. transformed_point_cloud = transform_right_leap_pointcloud_to_camera_frame(right_hand_points, pose_data)
    18. colored_hand_point_cloud = np.concatenate((transformed_point_cloud, np.zeros((transformed_point_cloud.shape[0], 3))), axis=1)
    19. color_pcd = np.concatenate((color_pcd, colored_hand_point_cloud), axis=0)
    20. if left_hand_show: # 如果检测到左手,合并左手点云
    21. transformed_point_cloud_left = transform_left_leap_pointcloud_to_camera_frame(left_hand_points, left_pose_data)
    22. colored_hand_point_cloud_left = np.concatenate((transformed_point_cloud_left, np.zeros((transformed_point_cloud_left.shape[0], 3))), axis=1)
    23. color_pcd = np.concatenate((color_pcd, colored_hand_point_cloud_left), axis=0)
    移除冗余点
    1. # 移除桌面表面和背景下方的冗余点
    2. centroid = np.mean(robot_table_corner_points, axis=0) # 计算质心
    3. A = robot_table_corner_points - centroid # 计算偏移
    4. U, S, Vt = svd(A) # 进行奇异值分解
    5. normal = Vt[-1] # 获取法向量
    6. d = -np.dot(normal, centroid) # 计算平面方程中的d
    7. xyz = color_pcd[:, :3] # 获取点云的xyz坐标
    8. for plane_gap in table_sweep_list: # 遍历平面高度
    9. below_plane = np.dot(xyz, normal[:3]) + d + plane_gap < 0 # 判断点是否在平面下方
    10. if len(color_pcd[~below_plane]) > num_points_to_sample: # 如果点数大于采样点数
    11. color_pcd = color_pcd[~below_plane] # 移除平面下方的点
    12. break
    下采样点云
    1. # 下采样点云
    2. if len(color_pcd) > num_points_to_sample:
    3. indices = np.random.choice(len(color_pcd), num_points_to_sample, replace=False) # 随机选择点
    4. color_pcd = color_pcd[indices] # 获取采样点
    5. pointcloud.append(copy.deepcopy(color_pcd)) # 添加点云数据
    6. labels.append(0) # 添加标签
    7. # 更新点云可视化
    8. pcd_vis.points = o3d.utility.Vector3dVector(color_pcd[:, :3]) # 设置点云的点
    9. pcd_vis.colors = o3d.utility.Vector3dVector(color_pcd[:, 3:]) # 设置点云的颜色
    10. if firstfirst:
    11. vis.add_geometry(pcd_vis) # 添加几何体到可视化器
    12. firstfirst = False # 标记为非第一次
    13. else:
    14. vis.update_geometry(pcd_vis) # 更新几何体
    15. vis.poll_events() # 处理可视化事件
    16. vis.update_renderer() # 更新渲染器
  4. 更新可视化:更新点云和图像的可视化显示
    1. # 更新图像可视化
    2. cv2.imshow("masked_resized_image", resized_image) # 显示遮罩后的图像
    3. cv2.waitKey(1) # 等待键盘输入
  5. 生成动作轨迹:根据 action_gap 生成动作轨迹,并保存到HDF5文件中
    先
    1. poses = np.array(poses) # 转换姿态列表为数组
    2. robot0_eef_pos = poses[:, :3] # 获取末端执行器位置
    3. robot0_eef_quat = poses[:, 3:7] # 获取末端执行器四元数
    4. robot0_eef_hand = (poses[:, 7:] - np.pi) * 0.5 # 缩放手部关节位置
    5. poses_left = np.array(poses_left) # 转换左手姿态列表为数组
    6. robot0_eef_pos_left = poses_left[:, :3] # 获取左手末端执行器位置
    7. robot0_eef_quat_left = poses_left[:, 3:7] # 获取左手末端执行器四元数
    8. robot0_eef_hand_left = (poses_left[:, 7:] - np.pi) * 0.5 # 缩放左手关节位置
    9. robot0_eef_pos = np.concatenate((robot0_eef_pos, robot0_eef_pos_left), axis=-1) # 合并左右手末端执行器位置
    10. robot0_eef_quat = np.concatenate((robot0_eef_quat, robot0_eef_quat_left), axis=-1) # 合并左右手末端执行器四元数
    11. robot0_eef_hand = np.concatenate((robot0_eef_hand, robot0_eef_hand_left), axis=-1) # 合并左右手关节位置
    12. actions_pos = np.concatenate((robot0_eef_pos[action_gap:], robot0_eef_pos[-1:].repeat(action_gap, axis=0)), axis=0) # 生成动作位置
    13. actions_rot = np.concatenate((robot0_eef_quat[action_gap:], robot0_eef_quat[-1:].repeat(action_gap, axis=0)), axis=0) # 生成动作旋转
    14. actions_hand = np.concatenate((robot0_eef_hand[action_gap:], robot0_eef_hand[-1:].repeat(action_gap, axis=0)), axis=0) # 生成动作手部姿态
    15. actions = np.concatenate((actions_pos, actions_rot, actions_hand), axis=-1) # 合并手臂和手部动作
    后
    1. for j in range(action_gap): # 根据 action_gap 生成轨迹
    2. demo_name = f'demo_{demo_index}' # 演示名称
    3. output_demo_group = output_data_group.create_group(demo_name) # 创建演示组
    4. print("{} saved".format(demo_name)) # 打印保存信息
    5. demo_index += 1 # 增加演示索引
    6. output_demo_group.attrs['frame_0_eef_pos'] = frame0_pose_data[:3] # 设置第0帧末端执行器位置
    7. output_demo_group.attrs['frame_0_eef_quat'] = frame0_pose_data[3:7] # 设置第0帧末端执行器四元数
    8. output_obs_group = output_demo_group.create_group('obs') # 创建观察组
    9. output_obs_group.create_dataset('agentview_image', data=np.array(agentview_images)[j::action_gap]) # 保存代理视角图像
    10. output_obs_group.create_dataset('pointcloud', data=np.array(pointcloud)[j::action_gap]) # 保存点云数据
    11. output_obs_group.create_dataset('robot0_eef_pos', data=copy.deepcopy(robot0_eef_pos)[j::action_gap]) # 保存末端执行器位置
    12. output_obs_group.create_dataset('robot0_eef_quat', data=copy.deepcopy(robot0_eef_quat)[j::action_gap]) # 保存末端执行器四元数
    13. output_obs_group.create_dataset('robot0_eef_hand', data=copy.deepcopy(robot0_eef_hand)[j::action_gap]) # 保存手部姿态
    14. output_obs_group.create_dataset('label', data=np.array(labels)[j::action_gap]) # 保存标签
    15. output_demo_group.create_dataset('actions', data=copy.deepcopy(actions)[j::action_gap]) # 保存动作
    16. # 创建 'dones', 'rewards', 和 'states'
    17. dones = np.zeros(clip_length, dtype=np.int64) # 初始化 'dones'
    18. dones[-1] = 1 # 设置最后一帧的 'done' 为 1
    19. output_demo_group.create_dataset('dones', data=dones[j::action_gap]) # 保存 'dones'
    20. rewards = np.zeros(clip_length, dtype=np.float64) # 初始化 'rewards'
    21. output_demo_group.create_dataset('rewards', data=rewards[j::action_gap]) # 保存 'rewards'
    22. output_demo_group.create_dataset('states', data=states[j::action_gap]) # 保存状态
    23. output_demo_group.create_dataset('glove_states', data=glove_states[j::action_gap]) # 保存手套状态
    24. output_demo_group.attrs['num_samples'] = len(actions[j::action_gap]) # 设置样本数量
    25. total_frames += len(actions[j::action_gap]) # 增加总帧数
    26. mean_init_pos.append(copy.deepcopy(robot0_eef_pos[j])) # 添加初始位置
    27. mean_init_quat.append(copy.deepcopy(robot0_eef_quat[j])) # 添加初始四元数
    28. mean_init_hand.append(copy.deepcopy(robot0_eef_hand[j])) # 添加初始手部姿态
    29. output_data_group.attrs['total'] = total_frames # 设置总帧数
  6. 计算初始位置的均值:计算初始位置、四元数和手部姿态的均值,并保存到HDF5文件中
    1. # 计算初始位置的均值
    2. mean_init_pos = np.array(mean_init_pos).mean(axis=0) # 计算初始位置均值
    3. mean_init_quat = mean_init_quat[0] # 获取初始四元数
    4. mean_init_hand = np.array(mean_init_hand).mean(axis=0) # 计算初始手部姿态均值
    5. output_data_group.attrs['mean_init_pos'] = mean_init_pos # 设置初始位置均值
    6. output_data_group.attrs['mean_init_quat'] = mean_init_quat # 设置初始四元数
    7. output_data_group.attrs['mean_init_hand'] = mean_init_hand # 设置初始手部姿态均值

2.2 pybullet_ik_bimanual.py——含点云生成、计算逆运动学compute_IK的实现

2.2.1 导入库、类的定义和初始化、获取网格点云的方法等等

  1. 导入库
    1. import pybullet_data # 导入 PyBullet 数据库
    2. from yourdfpy import URDF # 导入 URDF 解析库
    3. from transforms3d.euler import quat2euler, euler2quat # 导入四元数和欧拉角转换函数
    4. from utils import * # 导入自定义工具函数
    5. from hyperparameters import * # 导入超参数
  2. 类定义和初始化
    1. class LeapPybulletIK(): # 定义 LeapPybulletIK 类
    2. def __init__(self): # 初始化方法
    3. # 启动 PyBullet
    4. clid = p.connect(p.SHARED_MEMORY) # 尝试连接到共享内存
    5. if clid < 0:
    6. p.connect(p.GUI) # 如果连接失败,则启动 GUI
    7. p.setAdditionalSearchPath(pybullet_data.getDataPath()) # 设置 PyBullet 数据路径
    8. p.loadURDF("plane.urdf", [0, 0, -0.3]) # 加载平面 URDF
    9. # 加载右手 LEAP 模型
    10. self.LeapId = p.loadURDF(
    11. "leap_hand_mesh/robot_pybullet.urdf",
    12. [0.0, 0.0, 0.0],
    13. rotate_quaternion(0.0, 0.0, 0.0),
    14. )
    15. # 加载左手 LEAP 模型
    16. self.left_offset = 1.0 # 为了可视化,将左手和右手分开
    17. self.LeapId_2 = p.loadURDF(
    18. "leap_hand_mesh/robot_pybullet.urdf",
    19. [0.0, self.left_offset, 0.0],
    20. rotate_quaternion(0.0, 0.0, 0.0),
    21. )
    22. self.leap_center_offset = [0.18, 0.03, 0.0] # 由于 LEAP 手部 URDF 的根节点不在手掌根部(在食指根部),我们设置一个偏移量来校正根位置
    23. self.leapEndEffectorIndex = [4, 9, 14, 19] # 指尖关节索引
    24. self.fingertip_offset = np.array([0.1, 0.0, -0.08]) # 由于 URDF 中指尖网格的根节点不在指尖(在指尖网格的右下部分),我们设置一个偏移量来校正指尖位置
    25. self.thumb_offset = np.array([0.1, 0.0, -0.06]) # 同样的原因,校正拇指尖位置
    26. self.numJoints = p.getNumJoints(self.LeapId) # 获取 LEAP 手部的关节数量
    27. self.hand_lower_limits, self.hand_upper_limits, self.hand_joint_ranges = self.get_joint_limits(self.LeapId) # 获取 LEAP 手部的关节限制
    28. self.HAND_Q = np.array([np.pi / 6, -np.pi / 6, np.pi / 3, np.pi / 3,
    29. np.pi / 6, 0.0, np.pi / 3, np.pi / 3,
    30. np.pi / 6, np.pi / 6, np.pi / 3, np.pi / 3,
    31. np.pi / 6, np.pi / 6, np.pi / 3, np.pi / 3]) # 为了避免 LEAP 手部的自碰撞,我们定义一个参考姿态用于零空间 IK
    32. # 加载左手和右手的 URDF,用于在正向运动学期间生成点云
    33. self.urdf_dict = {}
    34. self.Leap_urdf = URDF.load("leap_hand_mesh/robot_pybullet.urdf")
    35. self.Leap_urdf_2 = URDF.load("leap_hand_mesh/robot_pybullet.urdf")
    36. self.urdf_dict["right_leap"] = {
    37. "scene": self.Leap_urdf.scene,
    38. "mesh_list": self._load_meshes(self.Leap_urdf.scene),
    39. }
    40. self.urdf_dict["left_leap"] = {
    41. "scene": self.Leap_urdf_2.scene,
    42. "mesh_list": self._load_meshes(self.Leap_urdf_2.scene),
    43. }
    44. self.create_target_vis() # 创建目标可视化
    45. p.setGravity(0, 0, 0) # 设置重力
    46. useRealTimeSimulation = 0 # 禁用实时模拟
    47. p.setRealTimeSimulation(useRealTimeSimulation) # 设置实时模拟
  3. 获取关节限制的方法
    1. def get_joint_limits(self, robot): # 获取关节限制的方法
    2. joint_lower_limits = [] # 初始化关节下限列表
    3. joint_upper_limits = [] # 初始化关节上限列表
    4. joint_ranges = [] # 初始化关节范围列表
    5. for i in range(p.getNumJoints(robot)): # 遍历所有关节
    6. joint_info = p.getJointInfo(robot, i) # 获取关节信息
    7. if joint_info[2] == p.JOINT_FIXED: # 如果关节是固定的,跳过
    8. continue
    9. joint_lower_limits.append(joint_info[8]) # 添加关节下限
    10. joint_upper_limits.append(joint_info[9]) # 添加关节上限
    11. joint_ranges.append(joint_info[9] - joint_info[8]) # 计算并添加关节范围
    12. return joint_lower_limits, joint_upper_limits, joint_ranges # 返回关节限制
  4. 加载网格的方法
    1. def _load_meshes(self, scene): # 加载网格的方法
    2. mesh_list = [] # 初始化网格列表
    3. for name, g in scene.geometry.items(): # 遍历场景中的几何体
    4. mesh = g.as_open3d # 将几何体转换为 Open3D 网格
    5. mesh_list.append(mesh) # 添加到网格列表
    6. return mesh_list # 返回网格列表
  5. 更新网格的方法
    1. def _update_meshes(self, type): # 更新网格的方法
    2. mesh_new = o3d.geometry.TriangleMesh() # 创建新的三角网格
    3. for idx, name in enumerate(self.urdf_dict[type]["scene"].geometry.keys()): # 遍历几何体
    4. mesh_new += copy.deepcopy(self.urdf_dict[type]["mesh_list"][idx]).transform(
    5. self.urdf_dict[type]["scene"].graph.get(name)[0]
    6. ) # 更新网格
    7. return mesh_new # 返回更新后的网格
  6. 获取网格点云的方法
    顺带提下,通过生成点云,可以更好的理解和处理三维空间中的信息,从而实现更复杂和精确的任务
    1. def get_mesh_pointcloud(self, joint_pos, joint_pos_left): # 获取网格点云的方法
    2. self.Leap_urdf.update_cfg(joint_pos) # 更新右手关节配置
    3. right_mesh = self._update_meshes("right_leap") # 获取更新后的右手网格
    4. robot_pc = right_mesh.sample_points_uniformly(number_of_points=80000) # 从网格中均匀采样点云
    5. self.Leap_urdf_2.update_cfg(joint_pos_left) # 更新左手关节配置
    6. left_mesh = self._update_meshes("left_leap") # 获取更新后的左手网格
    7. robot_pc_left = left_mesh.sample_points_uniformly(number_of_points=80000) # 从网格中均匀采样点云
    8. # 将采样的网格点云转换为 Open3D 期望的格式
    9. new_points = np.asarray(robot_pc.points) # 将点云转换为 NumPy 数组
    10. new_points_left = np.asarray(robot_pc_left.points) # 将点云转换为 NumPy 数组
    11. new_points_left[:, 1] = -1.0 * new_points_left[:, 1] # 翻转右手网格为左手网格
    12. return new_points, new_points_left # 返回左右手的点云
  7. 转换向量的方法
    1. def switch_vector_from_rokoko(self, vector): # 转换向量的方法
    2. return [vector[0], -vector[2], vector[1]] # 返回转换后的向量
  8. 后处理Rokoko位置的方法
    1. def post_process_rokoko_pos(self, rightHandThumb_pos, rightHandIndex_pos, rightHandMiddle_pos, rightHandRing_pos): # 后处理 Rokoko 位置的方法
    2. rightHandThumb_pos[-1] *= -1.0 # 翻转 z 轴
    3. rightHandThumb_pos = self.switch_vector_from_rokoko(rightHandThumb_pos) # 转换向量
    4. rightHandIndex_pos[-1] *= -1.0 # 翻转 z 轴
    5. rightHandIndex_pos = self.switch_vector_from_rokoko(rightHandIndex_pos) # 转换向量
    6. rightHandMiddle_pos[-1] *= -1.0 # 翻转 z 轴
    7. rightHandMiddle_pos = self.switch_vector_from_rokoko(rightHandMiddle_pos) # 转换向量
    8. rightHandRing_pos[-1] *= -1.0 # 翻转 z 轴
    9. rightHandRing_pos = self.switch_vector_from_rokoko(rightHandRing_pos) # 转换向量
    10. return rightHandThumb_pos, rightHandIndex_pos, rightHandMiddle_pos, rightHandRing_pos # 返回处理后的位置
  9. 后处理Rokoko方向的方法
    1. def post_process_rokoko_ori(self, input_quat): # 后处理 Rokoko 方向的方法
    2. wxyz_input_quat = np.array([input_quat[3], input_quat[0], input_quat[1], input_quat[2]]) # 转换为 wxyz 四元数
    3. wxyz_input_mat = quat2mat(wxyz_input_quat) # 转换为旋转矩阵
    4. rot_mat = np.array([[0, 1, 0], [-1, 0, 0], [0, 0, 1]]) # 定义旋转矩阵
    5. wxyz_input_mat = np.dot(wxyz_input_mat, rot_mat) # 应用旋转矩阵
    6. wxyz_transform_quat = mat2quat(wxyz_input_mat) # 转换为四元数
    7. xyzw_transform_quat = np.array([wxyz_transform_quat[1], wxyz_transform_quat[2], wxyz_transform_quat[3], wxyz_transform_quat[0]]) # 转换为 xyzw 四元数
    8. return xyzw_transform_quat # 返回处理后的四元数

2.2.2 create_target_vis、update_target_vis、rest_target_vis的实现

接下来是create_target_vis的实现

以及update_target_vis的实现

  1. 方法定义
    该方法接受五个参数:右手的旋转四元数 (rightHand_rot)、右手拇指、食指、中指和
    无名指的关节位置
    def update_target_vis(self, rightHand_rot, rightHandThumb_pos, rightHandIndex_pos, rightHandMiddle_pos, rightHandRing_pos):  # 定义更新目标可视化的方法
  2. 重置球体位置和方向
    1. p.resetBasePositionAndOrientation(
    2. self.ball6Mbt,
    3. rotate_vector_by_quaternion_using_matrix(self.leap_center_offset, rightHand_rot),
    4. rightHand_rot,
    5. ) # 重置球体 ball6Mbt 的位置和方向
    6. p.resetBasePositionAndOrientation(
    7. self.ball5Mbt,
    8. [0.0, 0.0, 0.0],
    9. rightHand_rot,
    10. ) # 重置球体 ball5Mbt 的位置和方向
    11. p.resetBasePositionAndOrientation(
    12. self.ball9Mbt,
    13. p.getLinkState(self.LeapId, 4)[0],
    14. rightHand_rot,
    15. ) # 重置球体 ball9Mbt 的位置和方向
    16. p.resetBasePositionAndOrientation(
    17. self.ball7Mbt,
    18. p.getLinkState(self.LeapId, 9)[0],
    19. rightHand_rot,
    20. ) # 重置球体 ball7Mbt 的位置和方向
    21. p.resetBasePositionAndOrientation(
    22. self.ball8Mbt,
    23. p.getLinkState(self.LeapId, 14)[0],
    24. rightHand_rot,
    25. ) # 重置球体 ball8Mbt 的位置和方向
    26. p.resetBasePositionAndOrientation(
    27. self.ball10Mbt,
    28. p.getLinkState(self.LeapId, 19)[0],
    29. rightHand_rot,
    30. ) # 重置球体 ball10Mbt 的位置和方向
  3. 计算偏移量
  4. 更新拇指位置和方向
  5. 更新食指位置和方向
  6. 更新中指位置和方向
  7. 更新无名指位置和方向
  8. 返回更新后的关键位置

再之后,是update_target_vis_left,用于更新左手目标位置的可视化方法,该方法使用PyBullet来重置多个球体的位置和方向,以反应手部关节的位置和方向

以及rest_target_vis

2.2.3 compute_IK的实现:用于计算双手的逆运动学IK——使用PyBullet来模拟手部关节的位置和方向

最后则是比较重要的compute_IK的实现,其用于计算双手的逆运动学IK,该方法使用PyBullet来模拟手部关节的位置和方向,并生成用于控制真实机器人的关节位置

以下是详细解读

  1. 方法定义
    该方法接收4个参数:右手、左手的关节位置和手腕方向
    1. def compute_IK(self, right_hand_pos, right_hand_wrist_ori, left_hand_pos, left_hand_wrist_ori): # 定义计算逆运动学的方法
    2. p.stepSimulation() # 进行一步模拟
  2. 处理左手数据,包括计算关节位置、应用旋转矩阵、翻转x轴
    1. wxyz_input_quat = np.array([left_hand_wrist_ori[3], left_hand_wrist_ori[0], left_hand_wrist_ori[1], left_hand_wrist_ori[2]]) # 转换左手四元数
    2. wxyz_input_mat = quat2mat(wxyz_input_quat) # 转换为旋转矩阵
    3. leftHand_pos = left_hand_pos[0] # 获取左手位置
    4. leftHandThumb_pos = (left_hand_pos[4] - leftHand_pos) # 计算左手拇指位置
    5. leftHandIndex_pos = (left_hand_pos[8] - leftHand_pos) # 计算左手食指位置
    6. leftHandMiddle_pos = (left_hand_pos[12] - leftHand_pos) # 计算左手中指位置
    7. leftHandRing_pos = (left_hand_pos[16] - leftHand_pos) # 计算左手无名指位置
    8. leftHandThumb_pos = leftHandThumb_pos @ wxyz_input_mat # 应用旋转矩阵
    9. leftHandIndex_pos = leftHandIndex_pos @ wxyz_input_mat # 应用旋转矩阵
    10. leftHandMiddle_pos = leftHandMiddle_pos @ wxyz_input_mat # 应用旋转矩阵
    11. leftHandRing_pos = leftHandRing_pos @ wxyz_input_mat # 应用旋转矩阵
    12. leftHandThumb_pos[0] *= -1.0 # 翻转 x 轴
    13. leftHandIndex_pos[0] *= -1.0 # 翻转 x 轴
    14. leftHandMiddle_pos[0] *= -1.0 # 翻转 x 轴
    15. leftHandRing_pos[0] *= -1.0 # 翻转 x 轴
    16. leftHandThumb_pos = leftHandThumb_pos @ wxyz_input_mat.T # 应用逆旋转矩阵
    17. leftHandIndex_pos = leftHandIndex_pos @ wxyz_input_mat.T # 应用逆旋转矩阵
    18. leftHandMiddle_pos = leftHandMiddle_pos @ wxyz_input_mat.T # 应用逆旋转矩阵
    19. leftHandRing_pos = leftHandRing_pos @ wxyz_input_mat.T # 应用逆旋转矩阵
  3. 转换左手方向,包括后处理、转换为欧拉角和四元数、重新排列和旋转四元数
    1. leftHand_rot = left_hand_wrist_ori # 获取左手方向
    2. leftHand_rot = self.post_process_rokoko_ori(leftHand_rot) # 后处理方向
    3. euler_angles = quat2euler(np.array([leftHand_rot[3], leftHand_rot[0], leftHand_rot[1], leftHand_rot[2]])) # 转换为欧拉角
    4. quat_angles = euler2quat(-euler_angles[0], -euler_angles[1], euler_angles[2]).tolist() # 转换为四元数
    5. leftHand_rot = np.array(quat_angles[1:] + quat_angles[:1]) # 重新排列四元数
    6. leftHand_rot = rotate_quaternion_xyzw(leftHand_rot, np.array([1.0, 0.0, 0.0]), np.pi / 2.0) # 旋转四元数
  4. 处理右手数据,包括计算关节位置和更新左手目标可视化
    1. rightHand_pos = right_hand_pos[0] # 获取右手位置
    2. rightHandThumb_pos = (right_hand_pos[4] - rightHand_pos) # 计算右手拇指位置
    3. rightHandIndex_pos = (right_hand_pos[8] - rightHand_pos) # 计算右手食指位置
    4. rightHandMiddle_pos = (right_hand_pos[12] - rightHand_pos) # 计算右手中指位置
    5. rightHandRing_pos = (right_hand_pos[16] - rightHand_pos) # 计算右手无名指位置
    6. leftHandThumb_pos, leftHandIndex_pos, leftHandMiddle_pos, leftHandRing_pos = self.post_process_rokoko_pos(leftHandThumb_pos, leftHandIndex_pos, leftHandMiddle_pos, leftHandRing_pos) # 后处理左手位置
    7. leftHandThumb_pos, leftHandIndex_pos, leftHandMiddle_pos, leftHandRing_pos = self.update_target_vis_left(leftHand_rot, leftHandThumb_pos, leftHandIndex_pos, leftHandMiddle_pos, leftHandRing_pos) # 更新左手目标可视化
    8. leapEndEffectorPos_2 = [
    9. leftHandIndex_pos,
    10. leftHandMiddle_pos,
    11. leftHandRing_pos,
    12. leftHandThumb_pos
    13. ] # 定义左手末端执行器位置
  5. 转换右手方向
  6. 计算逆运动学,用于计算左手和右手的逆运动学,得到关节位置
    1. jointPoses_2 = []
    2. for i in range(4):
    3. jointPoses_2 = jointPoses_2 + list(
    4. p.calculateInverseKinematics(self.LeapId_2, self.leapEndEffectorIndex[i], leapEndEffectorPos_2[i],
    5. lowerLimits=self.hand_lower_limits, upperLimits=self.hand_upper_limits, jointRanges=self.hand_joint_ranges,
    6. restPoses=self.HAND_Q.tolist(), maxNumIterations=1000, residualThreshold=0.001))[4 * i:4 * (i + 1)]
    7. jointPoses_2 = tuple(jointPoses_2)
    8. jointPoses = []
    9. for i in range(4):
    10. jointPoses = jointPoses + list(
    11. p.calculateInverseKinematics(self.LeapId, self.leapEndEffectorIndex[i], leapEndEffectorPos[i],
    12. lowerLimits=self.hand_lower_limits, upperLimits=self.hand_upper_limits, jointRanges=self.hand_joint_ranges,
    13. restPoses=self.HAND_Q.tolist(), maxNumIterations=1000, residualThreshold=0.001))[4 * i:4 * (i + 1)]
    14. jointPoses = tuple(jointPoses)
  7. 合并关节位置
    1. combined_jointPoses_2 = (jointPoses_2[0:4] + (0.0,) + jointPoses_2[4:8] + (0.0,) + jointPoses_2[8:12] + (0.0,) + jointPoses_2[12:16] + (0.0,))
    2. combined_jointPoses_2 = list(combined_jointPoses_2)
    3. combined_jointPoses = (jointPoses[0:4] + (0.0,) + jointPoses[4:8] + (0.0,) + jointPoses[8:12] + (0.0,) + jointPoses[12:16] + (0.0,))
    4. combined_jointPoses = list(combined_jointPoses)
  8. 更新手部关节
    1. # 更新手部关节
    2. for i in range(20):
    3. p.setJointMotorControl2(
    4. bodyIndex=self.LeapId,
    5. jointIndex=i,
    6. controlMode=p.POSITION_CONTROL,
    7. targetPosition=combined_jointPoses[i],
    8. targetVelocity=0,
    9. force=500,
    10. positionGain=0.3,
    11. velocityGain=1,
    12. )
    13. p.setJointMotorControl2(
    14. bodyIndex=self.LeapId_2,
    15. jointIndex=i,
    16. controlMode=p.POSITION_CONTROL,
    17. targetPosition=combined_jointPoses_2[i],
    18. targetVelocity=0,
    19. force=500,
    20. positionGain=0.3,
    21. velocityGain=1,
    22. )
  9. 重置手部位置和方向
    1. p.resetBasePositionAndOrientation(
    2. self.LeapId,
    3. rotate_vector_by_quaternion_using_matrix(self.leap_center_offset, rightHand_rot),
    4. rightHand_rot,
    5. )
    6. after_left_offset_base = rotate_vector_by_quaternion_using_matrix(self.leap_center_offset, leftHand_rot)
    7. after_left_offset_base[1] += self.left_offset
    8. p.resetBasePositionAndOrientation(
    9. self.LeapId_2,
    10. after_left_offset_base,
    11. leftHand_rot,
    12. )
    13. self.rest_target_vis()
  10. 映射结果到真实机器人
    1. # 映射结果到真实机器人
    2. real_right_robot_hand_q = np.array([0.0 for _ in range(16)])
    3. real_left_robot_hand_q = np.array([0.0 for _ in range(16)])
    4. real_right_robot_hand_q[0:4] = jointPoses[0:4]
    5. real_right_robot_hand_q[4:8] = jointPoses[4:8]
    6. real_right_robot_hand_q[8:12] = jointPoses[8:12]
    7. real_right_robot_hand_q[12:16] = jointPoses[12:16]
    8. real_right_robot_hand_q[0:2] = real_right_robot_hand_q[0:2][::-1]
    9. real_right_robot_hand_q[4:6] = real_right_robot_hand_q[4:6][::-1]
    10. real_right_robot_hand_q[8:10] = real_right_robot_hand_q[8:10][::-1]
    11. real_left_robot_hand_q[0:4] = jointPoses_2[0:4]
    12. real_left_robot_hand_q[4:8] = jointPoses_2[4:8]
    13. real_left_robot_hand_q[8:12] = jointPoses_2[8:12]
    14. real_left_robot_hand_q[12:16] = jointPoses_2[12:16]
    15. real_left_robot_hand_q[0:2] = real_left_robot_hand_q[0:2][::-1]
    16. real_left_robot_hand_q[4:6] = real_left_robot_hand_q[4:6][::-1]
    17. real_left_robot_hand_q[8:10] = real_left_robot_hand_q[8:10][::-1]
  11. 生成点云
    1. # 生成左手和右手的点云
    2. right_hand_pointcloud, left_hand_pointcloud = self.get_mesh_pointcloud(real_right_robot_hand_q, real_left_robot_hand_q)
  12. 进一步映射关节到真实机器人
    1. # 进一步映射关节到真实机器人
    2. real_right_robot_hand_q += np.pi
    3. real_left_robot_hand_q += np.pi
    4. real_left_robot_hand_q[0] = np.pi * 2 - real_left_robot_hand_q[0]
    5. real_left_robot_hand_q[4] = np.pi * 2 - real_left_robot_hand_q[4]
    6. real_left_robot_hand_q[8] = np.pi * 2 - real_left_robot_hand_q[8]
    7. real_left_robot_hand_q[12] = np.pi * 2 - real_left_robot_hand_q[12]
    8. real_left_robot_hand_q[13] = np.pi * 2 - real_left_robot_hand_q[13]
  13. 返回结果,该方法返回右手和左手的关节位置,以及生成的点云
            return real_right_robot_hand_q, real_left_robot_hand_q, right_hand_pointcloud, left_hand_pointcloud  # 返回结果

此外,像STEP2_build_dataset/utils.py这个代码文件里还有对于transform_left_leap_pointcloud_to_camera_frame的实现,其将右手LEAP点云转换到相机坐标系

第三部分 STEP3_train_policy

3.1 robomimic/algo

3.1.1 algo.py

该代码文件里 定义了一系列函数,比如register_algo_factory_func、algo_name_to_factory_func、algo_factory

之后定义了几个大类,比如

  1. 策略算法基类PolicyAlgo:定义了所有可以用作策略的算法的基类
    1. class PolicyAlgo(Algo): # 定义一个名为 PolicyAlgo 的类,继承自 Algo
    2. """
    3. Base class for all algorithms that can be used as policies.
    4. """
    5. def get_action(self, obs_dict, goal_dict=None): # 定义获取动作的方法
    6. """
    7. Get policy action outputs.
    8. Args:
    9. obs_dict (dict): current observation # 当前观测
    10. goal_dict (dict): (optional) goal # (可选)目标
    11. Returns:
    12. action (torch.Tensor): action tensor # 动作张量
    13. """
    14. raise NotImplementedError # 子类需要实现该方法
  2. 值函数算法基类ValueAlgo:定义了所有可以学习值函数的算法的基类,比如get_state_value(状态值)、get_state_action_value(状态-动作值)
    1. class ValueAlgo(Algo): # 定义一个名为 ValueAlgo 的类,继承自 Algo
    2. """
    3. Base class for all algorithms that can learn a value function.
    4. """
    5. def get_state_value(self, obs_dict, goal_dict=None): # 定义获取状态值的方法
    6. """
    7. Get state value outputs.
    8. Args:
    9. obs_dict (dict): current observation # 当前观测
    10. goal_dict (dict): (optional) goal # (可选)目标
    11. Returns:
    12. value (torch.Tensor): value tensor # 值张量
    13. """
    14. raise NotImplementedError # 子类需要实现该方法
    15. def get_state_action_value(self, obs_dict, actions, goal_dict=None): # 定义获取状态-动作值的方法
    16. """
    17. Get state-action value outputs.
    18. Args:
    19. obs_dict (dict): current observation # 当前观测
    20. actions (torch.Tensor): action # 动作
    21. goal_dict (dict): (optional) goal # (可选)目标
    22. Returns:
    23. value (torch.Tensor): value tensor # 值张量
    24. """
    25. raise NotImplementedError # 子类需要实现该方法
  3. 规划算法基类PlannerAlgo:定义了所有可以用于规划子目标的算法的基类
    1. class PlannerAlgo(Algo): # 定义一个名为 PlannerAlgo 的类,继承自 Algo
    2. """
    3. Base class for all algorithms that can be used for planning subgoals
    4. conditioned on current observations and potential goal observations.
    5. """
    6. def get_subgoal_predictions(self, obs_dict, goal_dict=None): # 定义获取子目标预测的方法
    7. """
    8. Get predicted subgoal outputs.
    9. Args:
    10. obs_dict (dict): current observation # 当前观测
    11. goal_dict (dict): (optional) goal # (可选)目标
    12. Returns:
    13. subgoal prediction (dict): name -> Tensor [batch_size, ...] # 子目标预测
    14. """
    15. raise NotImplementedError # 子类需要实现该方法
    16. def sample_subgoals(self, obs_dict, goal_dict, num_samples=1): # 定义采样子目标的方法
    17. """
    18. For planners that rely on sampling subgoals.
    19. Args:
    20. obs_dict (dict): current observation # 当前观测
    21. goal_dict (dict): (optional) goal # (可选)目标
    22. Returns:
    23. subgoals (dict): name -> Tensor [batch_size, num_samples, ...] # 子目标
    24. """
    25. raise NotImplementedError # 子类需要实现该方法
  4. 层次算法基类HierarchicalAlgo:定义了所有层次算法的基类,这些算法包括:1) 子目标规划,和2)子目标条件策略学习
    1. class HierarchicalAlgo(Algo): # 定义一个名为 HierarchicalAlgo 的类,继承自 Algo
    2. """
    3. Base class for all hierarchical algorithms that consist of (1) subgoal planning
    4. and (2) subgoal-conditioned policy learning.
    5. """
    6. def get_action(self, obs_dict, goal_dict=None): # 定义获取动作的方法
    7. """
    8. Get policy action outputs.
    9. Args:
    10. obs_dict (dict): current observation # 当前观测
    11. goal_dict (dict): (optional) goal # (可选)目标
    12. Returns:
    13. action (torch.Tensor): action tensor # 动作张量
    14. """
    15. raise NotImplementedError # 子类需要实现该方法
    16. def get_subgoal_predictions(self, obs_dict, goal_dict=None): # 定义获取子目标预测的方法
    17. """
    18. Get subgoal predictions from high-level subgoal planner.
    19. Args:
    20. obs_dict (dict): current observation # 当前观测
    21. goal_dict (dict): (optional) goal # (可选)目标
    22. Returns:
    23. subgoal (dict): predicted subgoal # 预测的子目标
    24. """
    25. raise NotImplementedError # 子类需要实现该方法
    26. @property
    27. def current_subgoal(self): # 定义获取当前子目标的方法
    28. """
    29. Get the current subgoal for conditioning the low-level policy
    30. Returns:
    31. current subgoal (dict): predicted subgoal # 预测的子目标
    32. """
    33. raise NotImplementedError # 子类需要实现该方法
  5. 策略封装RolloutPolicy:封装算法对象以便在回合循环中运行策略
    1. class RolloutPolicy(object): # 定义一个名为 RolloutPolicy 的类
    2. """
    3. Wraps @Algo object to make it easy to run policies in a rollout loop.
    4. """
    5. def __init__(self, policy, obs_normalization_stats=None, action_normalization_stats=None): # 初始化方法
    6. """
    7. Args:
    8. policy (Algo instance): @Algo object to wrap to prepare for rollouts # 要封装的算法对象
    9. obs_normalization_stats (dict): optionally pass a dictionary for observation # 可选的观测归一化字典
    10. normalization. This should map observation keys to dicts
    11. with a "mean" and "std" of shape (1, ...) where ... is the default
    12. shape for the observation.
    13. """
    14. self.policy = policy # 保存算法对象
    15. self.obs_normalization_stats = obs_normalization_stats # 保存观测归一化字典
    16. self.action_normalization_stats = action_normalization_stats # 保存动作归一化字典
    17. def start_episode(self): # 开始新回合的方法
    18. """
    19. Prepare the policy to start a new rollout.
    20. """
    21. self.policy.set_eval() # 设置算法为评估模式
    22. self.policy.reset() # 重置算法
    23. def _prepare_observation(self, ob): # 准备观测数据的方法
    24. """
    25. Prepare raw observation dict from environment for policy.
    26. Args:
    27. ob (dict): single observation dictionary from environment (no batch dimension,
    28. and np.array values for each key) # 来自环境的单个观测字典(无批次维度,每个键的值为 np.array)
    29. """
    30. ob = TensorUtils.to_tensor(ob) # 将观测数据转换为张量
    31. ob = TensorUtils.to_batch(ob) # 将观测数据转换为批次
    32. ob = TensorUtils.to_device(ob, self.policy.device) # 将观测数据移动到算法所在的设备
    33. ob = TensorUtils.to_float(ob) # 将观测数据转换为浮点数
    34. if self.obs_normalization_stats is not None: # 如果提供了观测归一化字典
    35. # ensure obs_normalization_stats are torch Tensors on proper device
    36. obs_normalization_stats = TensorUtils.to_float(TensorUtils.to_device(TensorUtils.to_tensor(self.obs_normalization_stats), self.policy.device)) # 确保观测归一化字典是正确设备上的 torch 张量
    37. # limit normalization to obs keys being used, in case environment includes extra keys
    38. ob = { k : ob[k] for k in self.policy.global_config.all_obs_keys } # 限制归一化到正在使用的观测键,以防环境中包含额外的键
    39. ob = ObsUtils.normalize_dict(ob, normalization_stats=obs_normalization_stats) # 归一化观测字典
    40. return ob # 返回准备好的观测数据
    41. def __repr__(self): # 打印网络描述的方法
    42. """Pretty print network description"""
    43. return self.policy.__repr__() # 返回算法对象的字符串表示
    44. def __call__(self, ob, goal=None): # 调用方法,生成动作
    45. """
    46. Produce action from raw observation dict (and maybe goal dict) from environment.
    47. Args:
    48. ob (dict): single observation dictionary from environment (no batch dimension,
    49. and np.array values for each key) # 来自环境的单个观测字典(无批次维度,每个键的值为 np.array)
    50. goal (dict): goal observation # 目标观测
    51. """
    52. ob = self._prepare_observation(ob) # 准备观测数据
    53. if goal is not None: # 如果提供了目标观测
    54. goal = self._prepare_observation(goal) # 准备目标观测数据
    55. ac = self.policy.get_action(obs_dict=ob, goal_dict=goal) # 获取动作
    56. ac = TensorUtils.to_numpy(ac[0]) # 将动作转换为 numpy 数组
    57. if self.action_normalization_stats is not None: # 如果提供了动作归一化字典
    58. action_keys = self.policy.global_config.train.action_keys # 获取动作键
    59. action_shapes = {k: self.action_normalization_stats[k]["offset"].shape[1:] for k in self.action_normalization_stats} # 获取动作形状
    60. ac_dict = AcUtils.vector_to_action_dict(ac, action_shapes=action_shapes, action_keys=action_keys) # 将动作向量转换为动作字典
    61. ac_dict = ObsUtils.unnormalize_dict(ac_dict, normalization_stats=self.action_normalization_stats) # 反归一化动作字典
    62. action_config = self.policy.global_config.train.action_config # 获取动作配置
    63. for key, value in ac_dict.items(): # 遍历动作字典
    64. this_format = action_config[key].get('format', None) # 获取动作格式
    65. if this_format == 'rot_6d': # 如果动作格式为 'rot_6d'
    66. rot_6d = torch.from_numpy(value).unsqueeze(0) # 将值转换为 torch 张量并添加一个维度
    67. rot = TorchUtils.rot_6d_to_axis_angle(rot_6d=rot_6d).squeeze().numpy() # 将 'rot_6d' 转换为轴角表示并转换为 numpy 数组
    68. ac_dict[key] = rot # 更新动作字典中的值
    69. ac = AcUtils.action_dict_to_vector(ac_dict, action_keys=action_keys) # 将动作字典转换为动作向量
    70. return ac # 返回动作

3.1.2 bc.py:实现行为克隆算法及其多种变体

3.1.3 bcq.py:实现批约束Q学习(Batch-Constrained Q-Learning)算法

3.1.4 cql.py:实现conservative Q-Leaning算法

3.1.5 diffusion_policy.py

关于diffusion_policy.py的实现,详见此文《Diffusion Policy——斯坦福机器人UMI所用的扩散策略:从原理到其编码实现(含Diff-Control、ControlNet详解)》的第二部分 Diffusion Policy的编码实现与源码解读

3.1.6 gl.py:实现子目标预测模型,用于HBC和IRIS算法

3.1.7 hbc.py:实现分层行为克隆(Hierarchical Behavioral Cloning, HBC)

这段代码实现了分层行为克隆(HBC),其中规划器模型输出子目标(未来观祭),而 actor模型则以子目标为条件尝试达到这些目标

具体功能包括:

  1. 初始化方法:接受多个参数,用于配置 HBC 的结构和参数
  2. 处理训练批次:处理人数据加载器中采样的输入批次,过滤出相关信息并准备批次进行训练
  3. 在单个批次上进行训练:在单个数据批次上进行训练,预测子目标并计算损失
  4. 记录信息:处理从 etrain_on_batch 返回的信息

3.1.8 iql.py:实现隐式Q学习(implicit Q-Learning,IQL)算法

  1. 导入必要的库和模块
  2. 注册所谓的算法工厂函数
  3. 定义IQL类
    class IQL(PolicyAlgo, ValueAlgo):  # 定义 IQL 类,继承自 PolicyAlgo 和 ValueAlgo
  4. 创建网络并将其放入self.nets中
    1. def _create_networks(self):
    2. """
    3. Creates networks and places them into @self.nets.
    4. Networks for this algo: critic (potentially ensemble), actor, value function
    5. """
    6. # Create nets
    7. self.nets = nn.ModuleDict() # 创建一个模块字典来存储网络
    8. # Assemble args to pass to actor
    9. actor_args = dict(self.algo_config.actor.net.common) # 获取 actor 网络的公共参数
    10. # Add network-specific args and define network class
    11. if self.algo_config.actor.net.type == "gaussian": # 如果 actor 网络类型是高斯网络
    12. actor_cls = PolicyNets.GaussianActorNetwork # 使用高斯 actor 网络类
    13. actor_args.update(dict(self.algo_config.actor.net.gaussian)) # 更新 actor 参数
    14. elif self.algo_config.actor.net.type == "gmm": # 如果 actor 网络类型是 GMM 网络
    15. actor_cls = PolicyNets.GMMActorNetwork # 使用 GMM actor 网络类
    16. actor_args.update(dict(self.algo_config.actor.net.gmm)) # 更新 actor 参数
    17. else:
    18. # Unsupported actor type!
    19. raise ValueError(f"Unsupported actor requested. " # 抛出不支持的 actor 类型错误
    20. f"Requested: {self.algo_config.actor.net.type}, "
    21. f"valid options are: {['gaussian', 'gmm']}")
    包括actor网络
    1. # Actor
    2. self.nets["actor"] = actor_cls( # 创建 actor 网络
    3. obs_shapes=self.obs_shapes, # 观察形状
    4. goal_shapes=self.goal_shapes, # 目标形状
    5. ac_dim=self.ac_dim, # 动作维度
    6. mlp_layer_dims=self.algo_config.actor.layer_dims, # MLP 层维度
    7. encoder_kwargs=ObsUtils.obs_encoder_kwargs_from_config(self.obs_config.encoder), # 编码器参数
    8. **actor_args, # 其他 actor 参数
    9. )
    critic网络
    1. # Critics
    2. self.nets["critic"] = nn.ModuleList() # 创建一个模块列表来存储 critic 网络
    3. self.nets["critic_target"] = nn.ModuleList() # 创建一个模块列表来存储目标 critic 网络
    4. for _ in range(self.algo_config.critic.ensemble.n): # 遍历 critic 集成的数量
    5. for net_list in (self.nets["critic"], self.nets["critic_target"]): # 遍历 critic 和目标 critic 列表
    6. critic = ValueNets.ActionValueNetwork( # 创建 critic 网络
    7. obs_shapes=self.obs_shapes, # 观察形状
    8. ac_dim=self.ac_dim, # 动作维度
    9. mlp_layer_dims=self.algo_config.critic.layer_dims, # MLP 层维度
    10. goal_shapes=self.goal_shapes, # 目标形状
    11. encoder_kwargs=ObsUtils.obs_encoder_kwargs_from_config(self.obs_config.encoder), # 编码器参数
    12. )
    13. net_list.append(critic) # 将 critic 网络添加到列表中
    价值函数网络
    1. # Value function network
    2. self.nets["vf"] = ValueNets.ValueNetwork( # 创建价值函数网络
    3. obs_shapes=self.obs_shapes, # 观察形状
    4. mlp_layer_dims=self.algo_config.critic.layer_dims, # MLP 层维度
    5. goal_shapes=self.goal_shapes, # 目标形状
    6. encoder_kwargs=ObsUtils.obs_encoder_kwargs_from_config(self.obs_config.encoder), # 编码器参数
    7. )
    8. # Send networks to appropriate device
    9. self.nets = self.nets.float().to(self.device) # 将网络转换为浮点数并移动到设备上
    10. # sync target networks at beginning of training
    11. with torch.no_grad(): # 在不计算梯度的上下文中
    12. for critic, critic_target in zip(self.nets["critic"], self.nets["critic_target"]): # 遍历 critic 和目标 critic
    13. TorchUtils.hard_update( # 硬更新目标网络
    14. source=critic, # 源网络
    15. target=critic_target, # 目标网络
  5. 处理训练批次
    比如处理从数据加载器中采样的输入批次,过滤出相关信息并准备批次进行训练
    1. def process_batch_for_training(self, batch):
    2. """
    3. Processes input batch from a data loader to filter out relevant info and prepare the batch for training.
    4. Args:
    5. batch (dict): dictionary with torch.Tensors sampled
    6. from a data loader
    7. Returns:
    8. input_batch (dict): processed and filtered batch that
    9. will be used for training
    10. """
    11. input_batch = dict() # 创建一个字典来存储处理后的批次
    12. # remove temporal batches for all
    13. input_batch["obs"] = {k: batch["obs"][k][:, 0, :] for k in batch["obs"]} # 获取第一个时间步的观察
    14. input_batch["next_obs"] = {k: batch["next_obs"][k][:, 0, :] for k in batch["next_obs"]} # 获取第一个时间步的下一个观察
    15. input_batch["goal_obs"] = batch.get("goal_obs", None) # 获取目标观察,可能不存在
    16. input_batch["actions"] = batch["actions"][:, 0, :] # 获取第一个时间步的动作
    17. input_batch["dones"] = batch["dones"][:, 0] # 获取第一个时间步的完成标志
    18. input_batch["rewards"] = batch["rewards"][:, 0] # 获取第一个时间步的奖励
    19. return TensorUtils.to_device(TensorUtils.to_float(input_batch), self.device) # 将批次转换为浮点数并移动到设备上
  6. 在单个批次上进行训练,分别计算critic和actor损失,并更新网络
    1. def train_on_batch(self, batch, epoch, validate=False):
    2. """
    3. Training on a single batch of data.
    4. Args:
    5. batch (dict): dictionary with torch.Tensors sampled
    6. from a data loader and filtered by @process_batch_for_training
    7. epoch (int): epoch number - required by some Algos that need
    8. to perform staged training and early stopping
    9. validate (bool): if True, don't perform any learning updates.
    10. Returns:
    11. info (dict): dictionary of relevant inputs, outputs, and losses
    12. that might be relevant for logging
    13. """
    14. info = OrderedDict() # 创建一个有序字典来存储信息
    15. # Set the correct context for this training step
    16. with TorchUtils.maybe_no_grad(no_grad=validate): # 在不计算梯度的上下文中
    17. # Always run super call first
    18. info = super().train_on_batch(batch, epoch, validate=validate) # 调用父类的 train_on_batch 方法
    19. # Compute loss for critic(s)
    20. critic_losses, vf_loss, critic_info = self._compute_critic_loss(batch) # 计算 critic 的损失
    21. # Compute loss for actor
    22. actor_loss, actor_info = self._compute_actor_loss(batch, critic_info) # 计算 actor 的损失
    23. if not validate: # 如果不是验证模式
    24. # Critic update
    25. self._update_critic(critic_losses, vf_loss) # 更新 critic
    26. # Actor update
    27. self._update_actor(actor_loss) # 更新 actor
    28. # Update info
    29. info.update(actor_info) # 更新信息字典
    30. info.update(critic_info) # 更新信息字典
    31. # Return stats
    32. return info # 返回信息字典
  7. 计算critic损失,计算Q和V的损失,并返回损失和相关信息
    1. def _compute_critic_loss(self, batch):
    2. """
    3. Helper function for computing Q and V losses. Called by @train_on_batch
    4. Args:
    5. batch (dict): dictionary with torch.Tensors sampled
    6. from a data loader and filtered by @process_batch_for_training
    7. Returns:
    8. critic_losses (list): list of critic (Q function) losses
    9. vf_loss (torch.Tensor): value function loss
    10. info (dict): dictionary of Q / V predictions and losses
    11. """
    12. info = OrderedDict() # 创建一个有序字典用于存储信息
    13. # get batch values
    14. obs = batch["obs"] # 获取观察
    15. actions = batch["actions"] # 获取动作
    16. next_obs = batch["next_obs"] # 获取下一个观察
    17. goal_obs = batch["goal_obs"] # 获取目标观察
    18. rewards = torch.unsqueeze(batch["rewards"], 1) # 获取奖励并增加一个维度
    19. dones = torch.unsqueeze(batch["dones"], 1) # 获取完成标志并增加一个维度
    为方便大家理解,我们逐步分析下
    首先,对于Q 函数损失
    Q 函数损失是通过计算 Q 预测值与目标 Q 值之间的误差来实现的

    其中Q 预测值为
    1. # Q predictions
    2. pred_qs = [critic(obs_dict=obs, acts=actions, goal_dict=goal_obs)
    3. for critic in self.nets["critic"]] # 预测 Q 值
    4. info["critic/critic1_pred"] = pred_qs[0].mean() # 记录第一个 critic 的平均预测值
    而目标 Q 值是通过贝尔曼方程计算得到的,其计算公式为
    Q_{\text{target}} = r + \gamma (1 - d) V_{\text{target}}
    1. # Q target values
    2. target_vf_pred = self.nets["vf"](obs_dict=next_obs, goal_dict=goal_obs).detach() # 预测目标价值函数
    3. q_target = rewards + (1. - dones) * self.algo_config.discount * target_vf_pred # 计算 Q 目标值
    4. q_target = q_target.detach() # 分离计算图
    其中:
    - r是奖励值
    - \gamma是折扣因子
    - d是完成信号(done signal)
    - V_{\text{target}}是目标价值函数的预测值

    Q 函数损失的计算公式如下:
    \text{TD\_loss} = \text{loss\_function}(Q_{\text{pred}}, Q_{\text{target}})
    1. # Q losses
    2. critic_losses = [] # 创建一个列表用于存储 critic 的损失
    3. td_loss_fcn = nn.SmoothL1Loss() if self.algo_config.critic.use_huber else nn.MSELoss() # 选择损失函数
    4. for (i, q_pred) in enumerate(pred_qs): # 遍历预测的 Q 值
    5. # Calculate td error loss
    6. td_loss = td_loss_fcn(q_pred, q_target) # 计算 TD 误差损失
    7. info[f"critic/critic{i+1}_loss"] = td_loss # 记录 critic 的损失
    8. critic_losses.append(td_loss) # 将损失添加到列表中
    其中:
    - Q_{\text{pred}}是 Q 网络的预测值
    - Q_{\text{target}}是目标 Q 值
    - \text{loss\_function}是损失函数,可以是均方误差(MSE)或 Huber 损失

    其次,价值函数损失是通过期望回归(expectile regression)来实现的「参考的论文为:Offline Reinforcement Learning with Implicit Q-Learning」,其计算公式如下:
    \text{VF\_loss} = \mathbb{E} \left[ \omega \cdot (\text{VF}_{\text{pred}} - Q_{\text{pred}})^2 \right]
    1. # V losses: expectile regression. see section 4.1 in https://arxiv.org/pdf/2110.06169.pdf
    2. vf_err = vf_pred - q_pred # 计算价值函数误差
    3. vf_sign = (vf_err > 0).float() # 计算误差符号
    4. vf_weight = (1 - vf_sign) * self.algo_config.vf_quantile + vf_sign * (1 - self.algo_config.vf_quantile) # 计算价值函数权重
    5. vf_loss = (vf_weight * (vf_err ** 2)).mean() # 计算价值函数损失
    其中:
    - \text{VF}_{\text{pred}}是价值函数的预测值
    1. # V predictions
    2. pred_qs = [critic(obs_dict=obs, acts=actions, goal_dict=goal_obs)
    3. for critic in self.nets["critic_target"]] # 预测 V 值
    4. q_pred, _ = torch.cat(pred_qs, dim=1).min(dim=1, keepdim=True) # 计算最小 Q 值
    5. q_pred = q_pred.detach() # 分离计算图
    6. vf_pred = self.nets["vf"](obs) # 预测价值函数
    - Q_{\text{pred}}是上面所说的 Q 网络的预测值
    - \omega是权重,定义如下:
    \omega=\left\{\begin{array}{ll} \tau & \text { if } \mathrm{VF}_{\text {pred }}-Q_{\text {pred }}>0 \\ 1-\tau & \text { if } \mathrm{VF}_{\text {pred }}-Q_{\text {pred }} \leq 0 \end{array}\right.
    其中\tau是期望分位数(expectile quantile)

    最后
    1. # update logs for V loss
    2. info["vf/q_pred"] = q_pred # 记录 Q 预测值
    3. info["vf/v_pred"] = vf_pred # 记录 V 预测值
    4. info["vf/v_loss"] = vf_loss # 记录 V 损失
    5. # Return stats
    6. return critic_losses, vf_loss, info # 返回统计信息
  8. 更新critic和价值函数网络
    1. def _update_critic(self, critic_losses, vf_loss):
    2. """
    3. Helper function for updating critic and vf networks. Called by @train_on_batch
    4. Args:
    5. critic_losses (list): list of critic (Q function) losses
    6. vf_loss (torch.Tensor): value function loss
    7. """
    8. # update ensemble of critics
    9. for (critic_loss, critic, critic_target, optimizer) in zip(
    10. critic_losses, self.nets["critic"], self.nets["critic_target"], self.optimizers["critic"]
    11. ): # 遍历 critic 损失、critic 网络、目标 critic 网络和优化器
    12. TorchUtils.backprop_for_loss(
    13. net=critic, # critic 网络
    14. optim=optimizer, # 优化器
    15. loss=critic_loss, # critic 损失
    16. max_grad_norm=self.algo_config.critic.max_gradient_norm, # 最大梯度范数
    17. retain_graph=False, # 不保留计算图
    18. )
    19. # update target network
    20. with torch.no_grad(): # 在不计算梯度的上下文中
    21. TorchUtils.soft_update(source=critic, target=critic_target, tau=self.algo_config.target_tau) # 软更新目标网络
    22. # update V function network
    23. TorchUtils.backprop_for_loss(
    24. net=self.nets["vf"], # 价值函数网络
    25. optim=self.optimizers["vf"], # 优化器
    26. loss=vf_loss, # 价值函数损失
    27. max_grad_norm=self.algo_config.critic.max_gradient_norm, # 最大梯度范数
    28. retain_graph=False, # 不保留计算图
    29. )
  9. 计算actor损失
    先
    1. def _compute_actor_loss(self, batch, critic_info):
    2. """
    3. Helper function for computing actor loss. Called by @train_on_batch
    4. Args:
    5. batch (dict): dictionary with torch.Tensors sampled
    6. from a data loader and filtered by @process_batch_for_training
    7. critic_info (dict): dictionary containing Q and V function predictions,
    8. to be used for computing advantage estimates
    9. Returns:
    10. actor_loss (torch.Tensor): actor loss
    11. info (dict): dictionary of actor losses, log_probs, advantages, and weights
    12. """
    13. info = OrderedDict() # 创建一个有序字典来存储信息
    然后,计算动作的对数概率

    1. # compute log probability of batch actions
    2. dist = self.nets["actor"].forward_train(obs_dict=batch["obs"], goal_dict=batch["goal_obs"]) # 计算动作的对数概率
    3. log_prob = dist.log_prob(batch["actions"]) # 计算动作的对数概率
    4. info["actor/log_prob"] = log_prob.mean() # 记录动作的平均对数概率
    接着,计算优势估计
    \mathrm{adv}=\mathrm{q} \_ \text {pred }-\mathrm{v} \_ \text {pred }
    Q_{\text{pred}}是上面所说的 Q 网络的预测值
    \text{VF}_{\text{pred}}是价值函数的预测值
    1. # compute advantage estimate
    2. q_pred = critic_info["vf/q_pred"] # 获取 Q 预测值
    3. v_pred = critic_info["vf/v_pred"] # 获取价值函数预测值
    4. adv = q_pred - v_pred # 计算优势估计
    再计算优势权重,这一步在 _get_adv_weights 函数中完成,具体计算方式如下:
    \mathrm{weights}=\exp \left(\frac{\mathrm{adv}}{\beta}\right)
    1. # compute weights
    2. weights = self._get_adv_weights(adv) # 计算优势权重
    最后,计算加权的actor损失

    再通过actor_loss=actor_loss.mean(),得出这个损失的平均值
    1. # compute advantage weighted actor loss. disable gradients through weights
    2. actor_loss = (-log_prob * weights.detach()).mean() # 计算加权的 actor 损失
    再返回
    1. info["actor/loss"] = actor_loss # 记录 actor 损失
    2. # log adv-related values
    3. info["adv/adv"] = adv # 记录优势估计
    4. info["adv/adv_weight"] = weights # 记录优势权重
    5. # Return stats
    6. return actor_loss, info # 返回 actor 损失和信息字
  10. 更新Actor网络
    1. def _update_actor(self, actor_loss):
    2. """
    3. 更新 Actor 网络的辅助函数。由 @train_on_batch 调用。
    4. Args:
    5. actor_loss (torch.Tensor): Actor 损失
    6. """
    7. TorchUtils.backprop_for_loss( # 使用 TorchUtils 进行反向传播
    8. net=self.nets["actor"], # Actor 网络
    9. optim=self.optimizers["actor"], # Actor 优化器
    10. loss=actor_loss, # Actor 损失
    11. max_grad_norm=self.algo_config.actor.max_gradient_norm, # 最大梯度范数
    12. )
  11. 计算优势权重
    1. def _get_adv_weights(self, adv):
    2. """
    3. 计算优势权重的辅助函数。由 @_compute_actor_loss 调用。
    4. Args:
    5. adv (torch.Tensor): 原始优势估计
    6. Returns:
    7. weights (torch.Tensor): 基于优势估计计算的权重,形状为 (B,),其中 B 是批次大小
    8. """
    9. # 剪辑原始优势值
    10. if self.algo_config.adv.clip_adv_value is not None:
    11. adv = adv.clamp(max=self.algo_config.adv.clip_adv_value) # 剪辑优势值
    12. # 基于优势值计算权重
    13. beta = self.algo_config.adv.beta # 温度因子
    14. weights = torch.exp(adv / beta) # 计算权重
    15. # 剪辑最终权重
    16. if self.algo_config.adv.use_final_clip is True:
    17. weights = weights.clamp(-100.0, 100.0) # 剪辑权重
    18. # 从 (B, 1) 重新调整为 (B,)
    19. return weights[:, 0] # 返回权重
  12. 记录信息
    1. def log_info(self, info):
    2. """
    3. 处理从 @train_on_batch 返回的信息字典,以汇总信息传递给 tensorboard 进行日志记录。
    4. Args:
    5. info (dict): 信息字典
    6. Returns:
    7. loss_log (dict): 名称 -> 汇总统计
    8. """
    9. log = OrderedDict() # 创建有序字典
    10. log["actor/log_prob"] = info["actor/log_prob"].item() # 记录 Actor 的 log_prob
    11. log["actor/loss"] = info["actor/loss"].item() # 记录 Actor 的损失
    12. log["critic/critic1_pred"] = info["critic/critic1_pred"].item() # 记录 Critic1 的预测值
    13. log["critic/critic1_loss"] = info["critic/critic1_loss"].item() # 记录 Critic1 的损失
    14. log["vf/v_loss"] = info["vf/v_loss"].item() # 记录价值函数的损失
    15. self._log_data_attributes(log, info, "vf/q_pred") # 记录 Q 预测值的统计信息
    16. self._log_data_attributes(log, info, "vf/v_pred") # 记录 V 预测值的统计信息
    17. self._log_data_attributes(log, info, "adv/adv") # 记录优势的统计信息
    18. self._log_data_attributes(log, info, "adv/adv_weight") # 记录优势权重的统计信息
    19. return log # 返回日志
  13. 记录数据属性
    1. def _log_data_attributes(self, log, info, key):
    2. """
    3. 记录统计信息的辅助函数。修改 log 就地。
    4. Args:
    5. log (dict): 现有的日志字典
    6. info (dict): 包含原始统计信息的现有字典
    7. key (str): 要记录的键
    8. """
    9. log[key + "/max"] = info[key].max().item() # 记录最大值
    10. log[key + "/min"] = info[key].min().item() # 记录最小值
    11. log[key + "/mean"] = info[key].mean().item() # 记录均值
    12. log[key + "/std"] = info[key].std().item() # 记录标准差
  14. 每个epoch结束时调用
    1. def on_epoch_end(self, epoch):
    2. """
    3. 在每个 epoch 结束时调用。
    4. """
    5. # 学习率调度更新
    6. for lr_sc in self.lr_schedulers["critic"]:
    7. if lr_sc is not None:
    8. lr_sc.step() # 更新 Critic 的学习率调度
    9. if self.lr_schedulers["vf"] is not None:
    10. self.lr_schedulers["vf"].step() # 更新价值函数的学习率调度
    11. if self.lr_schedulers["actor"] is not None:
    12. self.lr_schedulers["actor"].step() # 更新 Actor 的学习率调度
  15. 获取策略动作输出
    1. def get_action(self, obs_dict, goal_dict=None):
    2. """
    3. 获取策略动作输出。
    4. Args:
    5. obs_dict (dict): 当前观察
    6. goal_dict (dict): (可选)目标
    7. Returns:
    8. action (torch.Tensor): 动作张量
    9. """
    10. assert not self.nets.training # 确保网络不在训练模式
    11. return self.nets["actor"](obs_dict=obs_dict, goal_dict=goal_dict) # 返回 Actor 的动作输出

3.1.9 iris.py:实现李飞飞团队的 IRIS 算法

IRIS 算法对应的论文为:IRIS:基于离线机器人操作数据的大规模无交互隐式强化学习控制

代码实现了 IRIS 算法,包括初始化方法、处理训练批次、获取状态值和获取状态-动作值的功能。通过这些功能,IRIS 算法能够在训练过程中更新网络参数,并根据观察和目标获取相应的值

// 待更

3.1.10 td3_bc.py:实现TD3-BC 算法

Based on https://github.com/sfujim/TD3_BC,其对应的paper为:Off-Policy Deep Reinforcement Learning without Exploration

// 待更

注:本文转载自blog.csdn.net的v_JULY_v的文章"https://blog.csdn.net/v_JULY_v/article/details/143653468"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2024 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top