前言
本文最早是属于《斯坦福Mobile ALOHA背后的关键技术:动作分块ACT算法的原理解析》的第二、第三部分,涉及到动作分块ACT的代码剖析与部署训练
但因为想把ACT的代码逐行剖析的更细致些,加之为避免上一篇文章太过于长,故把动作分块ACT的代码剖析与部署实践这块独立出来成本文
第一部分 动作分块算法ACT的代码剖析
关于ACT的代码,我们可以重点研究下这个仓库:GitHub - tonyzhaozh/act,我司同事杜老师也于24年1.10日跑通了这份代码(如何跑通的教程见下文第二部分)
- imitate_episodes.py,训练和评估 ACT
- policy.py,An adaptor for ACT policy
- detr,ACT 的模型定义 修改自 DETR
- sim_env.py,具有 joint space control的 Mujoco + DM_Control 环境
- ee_sim_env.py,具有EE space control的 Mujoco + DM_Control 环境
- scripted_policy.py,模拟环境的脚本化策略
- constants.py,跨文件共享的常量
- utils.py,数据加载和辅助函数等实用程序
- visualize_episodes.py,保存 .hdf5 数据集中的视频
1.1 ACT的训练与评估imitate_episodes.py
1.1.1 主程序
- 从命令行参数中获取模型训练和评估的相关配置
- def main(args):
- set_seed(1) # 设置随机种子以保证结果可重现
- # 解析命令行参数
- is_eval = args["eval"] # 是否为评估模式的布尔标志
- ckpt_dir = args["ckpt_dir"] # 保存/加载检查点的目录
- policy_class = args["policy_class"] # 使用的策略类
- onscreen_render = args["onscreen_render"] # 是否进行屏幕渲染的标志
- task_name = args["task_name"] # 任务名称
- batch_size_train = args["batch_size"] # 训练批大小
- batch_size_val = args["batch_size"] # 验证批大小
- num_epochs = args["num_epochs"] # 训练的总周期数
- use_waypoint = args["use_waypoint"] # 是否使用航点
- constant_waypoint = args["constant_waypoint"] # 持续航点的设置
- # 根据是否使用航点打印相应信息
- if use_waypoint:
- print("Using waypoint") # 使用航点
- if constant_waypoint is not None:
- print(f"Constant waypoint: {constant_waypoint}") # 持续航点
- 根据任务名称和配置获取任务参数,例如数据集目录、任务类型等
- # 获取任务参数
- is_sim = True # 硬编码为True以避免从aloha中查找常量
- # 如果是模拟任务,从constants导入SIM_TASK_CONFIGS
- if is_sim:
- from constants import SIM_TASK_CONFIGS
- task_config = SIM_TASK_CONFIGS[task_name]
- else:
- from aloha_scripts.constants import TASK_CONFIGS
- task_config = TASK_CONFIGS[task_name]
- # 从任务配置中获取相关参数
- dataset_dir = task_config["dataset_dir"]
- num_episodes = task_config["num_episodes"]
- episode_len = task_config["episode_len"]
- camera_names = task_config["camera_names"]
- 定义模型的架构和超参数,包括学习率、网络结构、层数等
- # 固定参数
- state_dim = 14 # 状态维度
- lr_backbone = 1e-5 # 主干网络的学习率
- backbone = "resnet18" # 使用的主干网络类型
- 根据策略类别设置策略配置
- # 根据策略类别设置策略配置
- if policy_class == "ACT":
- # ACT策略的特定参数
- enc_layers = 4
- dec_layers = 7
- nheads = 8
- policy_config = {
- "lr": args["lr"],
- "num_queries": args["chunk_size"],
- "kl_weight": args["kl_weight"],
- "hidden_dim": args["hidden_dim"],
- "dim_feedforward": args["dim_feedforward"],
- "lr_backbone": lr_backbone,
- "backbone": backbone,
- "enc_layers": enc_layers,
- "dec_layers": dec_layers,
- "nheads": nheads,
- "camera_names": camera_names,
- }
- elif policy_class == "CNNMLP":
- # CNNMLP策略的特定参数
- policy_config = {
- "lr": args["lr"],
- "lr_backbone": lr_backbone,
- "backbone": backbone,
- "num_queries": 1,
- "camera_names": camera_names,
- }
- else:
- raise NotImplementedError
- 配置训练参数
- # 配置训练参数
- config = {
- "num_epochs": num_epochs,
- "ckpt_dir": ckpt_dir,
- "episode_len": episode_len,
- "state_dim": state_dim,
- "lr": args["lr"],
- "policy_class": policy_class,
- "onscreen_render": onscreen_render,
- "policy_config": policy_config,
- "task_name": task_name,
- "seed": args["seed"],
- "temporal_agg": args["temporal_agg"],
- "camera_names": camera_names,
- "real_robot": not is_sim,
- }
- 如果设置为评估模式,加载保存的模型权重并在验证集上评估模型性能,计算成功率和平均回报
- # 如果为评估模式,执行评估流程
- if is_eval:
- ckpt_names = [f"policy_best.ckpt"]
- results = []
- for ckpt_name in ckpt_names:
- success_rate, avg_return = eval_bc(config, ckpt_name, save_episode=True)
- results.append([ckpt_name, success_rate, avg_return])
- for ckpt_name, success_rate, avg_return in results:
- print(f"{ckpt_name}: {success_rate=} {avg_return=}")
- print()
- exit()
- # 加载数据
- train_dataloader, val_dataloader, stats, _ = load_data(
- dataset_dir,
- num_episodes,
- camera_names,
- batch_size_train,
- batch_size_val,
- use_waypoint,
- constant_waypoint,
- )
- # 保存数据集统计信息
- if not os.path.isdir(ckpt_dir):
- os.makedirs(ckpt_dir)
- stats_path = os.path.join(ckpt_dir, f"dataset_stats.pkl")
- with open(stats_path, "wb") as f:
- pickle.dump(stats, f)
- 最后,将结果打印出来
- # 训练并获取最佳检查点信息
- best_ckpt_info = train_bc(train_dataloader, val_dataloader, config)
- best_epoch, min_val_loss, best_state_dict = best_ckpt_info
- # 保存最佳检查点
- ckpt_path = os.path.join(ckpt_dir, f"policy_best.ckpt")
- torch.save(best_state_dict, ckpt_path)
- print(f"Best ckpt, val loss {min_val_loss:.6f} @ epoch{best_epoch}")
1.1.2 make_policy、make_optimizer、get_image
根据指定的policy_class(策略类别,目前支持两种类型:"ACT"和"CNNMLP"),和policy_config(策略配置)创建一个策略模型对象
- def make_policy(policy_class, policy_config):
- if policy_class == 'ACT':
- policy = ACTPolicy(policy_config) # 如果策略类是 ACT,创建 ACTPolicy
- elif policy_class == 'CNNMLP':
- policy = CNNMLPPolicy(policy_config) # 如果策略类是 CNNMLP,创建 CNNMLPPolicy
- else:
- raise NotImplementedError # 如果不是以上两种类型,则抛出未实现错误
- return policy # 返回创建的策略对象
make_optimizer用于创建策略模型的优化器(optimizer),并返回创建的优化器对象。优化器的作用是根据策略模型的损失函数来更新模型的参数,以使损失函数尽量减小
- def make_optimizer(policy_class, policy):
- if policy_class == 'ACT':
- optimizer = policy.configure_optimizers() # 如果策略类是 ACT,配置优化器
- elif policy_class == 'CNNMLP':
- optimizer = policy.configure_optimizers() # 如果策略类是 CNNMLP,配置优化器
- else:
- raise NotImplementedError # 如果不是以上两种类型,则抛出未实现错误
- return optimizer # 返回配置的优化器
get_image的作用是获取一个时间步(ts)的图像数据。函数接受两个参数:ts
和camera_names
- def get_image(ts, camera_names):
- curr_images = []
- for cam_name in camera_names:
- curr_image = rearrange(ts.observation['images'][cam_name], 'h w c -> c h w') # 重排图像数组
- curr_images.append(curr_image) # 将处理后的图像添加到列表中
- curr_image = np.stack(curr_images, axis=0) # 将图像列表堆叠成数组
- curr_image = torch.from_numpy(curr_image / 255.0).float().cuda().unsqueeze(0) # 将数组转换为 PyTorch 张量
- return curr_image # 返回处理后的图像张量
ts是一个时间步的数据,包含了多个相机(摄像头)拍摄的图像
ts.observation["images"]包含了各个相机拍摄的图像数据,而camera_names是一个列表,包含了要获取的相机的名称函数通过循环遍历camera_names中的相机名称,从ts.observation["images"]中获取对应相机的图像数据
这些图像数据首先通过rearrange函数重新排列维度,将"height-width-channels"的顺序变为"channels-height-width",以适应PyTorch的数据格式获取的图像数据被放入curr_images列表中
接下来,函数将curr_images列表中的所有图像数据堆叠成一个张量(tensor),np.stack(curr_images, axis=0)这一行代码实现了这个操作
接着,图像数据被归一化到[0, 1]的范围,然后转换为PyTorch的float类型,并移到GPU上(如果可用)。最后,图像数据被增加了一个额外的维度(unsqueeze(0)),以适应模型的输入要求
最终,函数返回包含时间步图像数据的PyTorch张量。这个图像数据可以被用于输入到神经网络模型中进行处理
1.1.3 eval_bc:评估一个行为克隆(behavior cloning)模型
- 的
- def eval_bc(config, ckpt_name, save_episode=True):
- set_seed(1000) # 设置随机种子为 1000
- # 从配置中获取参数
- ckpt_dir = config['ckpt_dir']
- state_dim = config['state_dim']
- real_robot = config['real_robot']
- policy_class = config['policy_class']
- onscreen_render = config['onscreen_render']
- policy_config = config['policy_config']
- camera_names = config['camera_names']
- max_timesteps = config['episode_len']
- task_name = config['task_name']
- temporal_agg = config['temporal_agg']
- onscreen_cam = 'angle'
- # 加载策略和统计信息
- ckpt_path = os.path.join(ckpt_dir, ckpt_name)
- policy = make_policy(policy_class, policy_config)
- loading_status = policy.load_state_dict(torch.load(ckpt_path))
- print(loading_status)
- policy.cuda()
- policy.eval()
- print(f'Loaded: {ckpt_path}')
- stats_path = os.path.join(ckpt_dir, f'dataset_stats.pkl')
- with open(stats_path, 'rb') as f:
- stats = pickle.load(f)
- # 定义预处理和后处理函数
- pre_process = lambda s_qpos: (s_qpos - stats['qpos_mean']) / stats['qpos_std']
- post_process = lambda a: a * stats['action_std'] + stats['action_mean']
- 的
- # 加载环境
- if real_robot:
- from aloha_scripts.robot_utils import move_grippers # 从 aloha_scripts.robot_utils 导入 move_grippers
- from aloha_scripts.real_env import make_real_env # 从 aloha_scripts.real_env 导入 make_real_env
- env = make_real_env(init_node=True) # 创建真实机器人环境
- env_max_reward = 0
- else:
- from sim_env import make_sim_env # 从 sim_env 导入 make_sim_env
- env = make_sim_env(task_name) # 创建模拟环境
- env_max_reward = env.task.max_reward
- # 设置查询频率和时间聚合参数
- query_frequency = policy_config['num_queries']
- if temporal_agg:
- query_frequency = 1
- num_queries = policy_config['num_queries']
- # 设置最大时间步数
- max_timesteps = int(max_timesteps * 1) # 可以根据实际任务调整最大时间步数
- 设置评估的循环次数(num_rollouts),每次循环都会进行一次评估
在每次循环中,初始化环境,执行模型生成的动作并观测环境的响应
将每个时间步的观测数据(包括图像、关节位置等)存储在相应的列表中
计算每次评估的总回报,以及每次评估的最高回报,并记录成功率- # 设置回放次数和初始化结果列表
- num_rollouts = 50
- episode_returns = []
- highest_rewards = []
- # 回放循环
- for rollout_id in range(num_rollouts):
- rollout_id += 0
- # 设置任务
- if 'sim_transfer_cube' in task_name:
- BOX_POSE[0] = sample_box_pose() # 在模拟重置中使用的 BOX_POSE
- elif 'sim_insertion' in task_name:
- BOX_POSE[0] = np.concatenate(sample_insertion_pose()) # 在模拟重置中使用的 BOX_POSE
- ts = env.reset() # 重置环境
- # 处理屏幕渲染
- if onscreen_render:
- ax = plt.subplot()
- plt_img = ax.imshow(env._physics.render(height=480, width=640, camera_id=onscreen_cam))
- plt.ion()
- # 评估循环
- if temporal_agg:
- all_time_actions = torch.zeros([max_timesteps, max_timesteps+num_queries, state_dim]).cuda()
- qpos_history = torch.zeros((1, max_timesteps, state_dim)).cuda()
- image_list = [] # 用于可视化的图像列表
- qpos_list = []
- target_qpos_list = []
- rewards = []
- # 在不计算梯度的模式下执行
- with torch.inference_mode():
- for t in range(max_timesteps):
- # 更新屏幕渲染和等待时间
- if onscreen_render:
- image = env._physics.render(height=480, width=640, camera_id=onscreen_cam)
- plt_img.set_data(image)
- plt.pause(DT)
- # 处理上一时间步的观测值以获取 qpos 和图像列表
- obs = ts.observation
- if 'images' in obs:
- image_list.append(obs['images'])
- else:
- image_list.append({'main': obs['image']})
- qpos_numpy = np.array(obs['qpos'])
- qpos = pre_process(qpos_numpy)
- qpos = torch.from_numpy(qpos).float().cuda().unsqueeze(0)
- qpos_history[:, t] = qpos
- curr_image = get_image(ts, camera_names)
- # 查询策略
- if config['policy_class'] == "ACT":
- if t % query_frequency == 0:
- all_actions = policy(qpos, curr_image)
- if temporal_agg:
- all_time_actions[[t], t:t+num_queries] = all_actions
- actions_for_curr_step = all_time_actions[:, t]
- actions_populated = torch.all(actions_for_curr_step != 0, axis=1)
- actions_for_curr_step = actions_for_curr_step[actions_populated]
- k = 0.01
- exp_weights = np.exp(-k * np.arange(len(actions_for_curr_step)))
- exp_weights = exp_weights / exp_weights.sum()
- exp_weights = torch.from_numpy(exp_weights).cuda().unsqueeze(dim=1)
- raw_action = (actions_for_curr_step * exp_weights).sum(dim=0, keepdim=True)
- else:
- raw_action = all_actions[:, t % query_frequency]
- elif config['policy_class'] == "CNNMLP":
- raw_action = policy(qpos, curr_image)
- else:
- raise NotImplementedError
- # 后处理动作
- raw_action = raw_action.squeeze(0).cpu().numpy()
- action = post_process(raw_action)
- target_qpos = action
- # 步进环境
- ts = env.step(target_qpos)
- # 用于可视化的列表
- qpos_list.append(qpos_numpy)
- target_qpos_list.append(target_qpos)
- rewards.append(ts.reward)
- plt.close() # 关闭绘图窗口
- if real_robot:
- move_grippers([env.puppet_bot_left, env.puppet_bot_right], [PUPPET_GRIPPER_JOINT_OPEN] * 2, move_time=0.5) # 打开夹持器
- pass
如果指定了保存评估过程中的图像数据,将每次评估的图像数据保存为视频- # 计算回报和奖励
- rewards = np.array(rewards)
- episode_return = np.sum(rewards[rewards != None])
- episode_returns.append(episode_return)
- episode_highest_reward = np.max(rewards)
- highest_rewards.append(episode_highest_reward)
- print(f'Rollout {rollout_id}\n{episode_return=}, {episode_highest_reward=}, {env_max_reward=}, Success: {episode_highest_reward == env_max_reward}')
- # 保存视频
- if save_episode:
- save_videos(image_list, DT, video_path=os.path.join(ckpt_dir, f'video{rollout_id}.mp4'))
- 输出评估结果,包括成功率、平均回报以及回报分布
将评估结果保存到文本文件中- # 计算成功率和平均回报
- # 计算成功率,即最高奖励的次数与环境最大奖励相等的比率
- success_rate = np.mean(np.array(highest_rewards) == env_max_reward)
- # 计算平均回报
- avg_return = np.mean(episode_returns)
- # 创建一个包含成功率和平均回报的摘要字符串
- summary_str = f'\n成功率: {success_rate}\n平均回报: {avg_return}\n\n'
- # 遍历奖励范围,计算每个奖励范围内的成功率
- for r in range(env_max_reward + 1):
- # 统计最高奖励大于等于 r 的次数
- more_or_equal_r = (np.array(highest_rewards) >= r).sum()
- # 计算成功率
- more_or_equal_r_rate = more_or_equal_r / num_rollouts
- # 将结果添加到摘要字符串中
- summary_str += f'奖励 >= {r}: {more_or_equal_r}/{num_rollouts} = {more_or_equal_r_rate*100}%\n'
- # 打印摘要字符串
- print(summary_str)
- # 将成功率保存到文本文件
- result_file_name = 'result_' + ckpt_name.split('.')[0] + '.txt'
- with open(os.path.join(ckpt_dir, result_file_name), 'w') as f:
- f.write(summary_str) # 写入摘要字符串
- f.write(repr(episode_returns)) # 写入回报数据
- f.write('\n\n')
- f.write(repr(highest_rewards)) # 写入最高奖励数据
- # 返回成功率和平均回报
- return success_rate, avg_return
最终,函数返回成功率和平均回报。这些结果可以用于评估模型的性能
1.1.4 forward_pass
- def forward_pass(data, policy):
- image_data, qpos_data, action_data, is_pad = data
- image_data, qpos_data, action_data, is_pad = (
- image_data.cuda(),
- qpos_data.cuda(),
- action_data.cuda(),
- is_pad.cuda(),
- )
- return policy(qpos_data, image_data, action_data, is_pad)
这个函数用于执行前向传播(forward pass)操作,以生成模型的输出。它接受以下参数:
- data:包含输入数据的元组,其中包括图像数据、关节位置数据、动作数据以及填充标志
- policy:行为克隆模型
函数的主要步骤如下:
- 将输入数据转移到GPU上,以便在GPU上进行计算。
- 调用行为克隆模型的前向传播方法(policy),将关节位置数据、图像数据、动作数据和填充标志传递给模型
- 返回模型的输出,这可能是模型对动作数据的预测结果
1.1.5 train_bc
这个函数用于训练行为克隆(Behavior Cloning)模型。它接受以下参数:
- train_dataloader:训练数据的数据加载器,用于从训练集中获取批次的数据。
- val_dataloader:验证数据的数据加载器,用于从验证集中获取批次的数据。
- config:包含训练配置信息的字典
函数的主要步骤如下
- 初始化训练过程所需的各种参数和配置
- def train_bc(train_dataloader, val_dataloader, config):
- num_epochs = config["num_epochs"]
- ckpt_dir = config["ckpt_dir"]
- seed = config["seed"]
- policy_class = config["policy_class"]
- policy_config = config["policy_config"]
- set_seed(seed)
- 创建行为克隆模型,并根据是否存在之前的训练检查点来加载模型权重
- policy = make_policy(policy_class, policy_config)
- # if ckpt_dir is not empty, prompt the user to load the checkpoint
- if os.path.isdir(ckpt_dir) and len(os.listdir(ckpt_dir)) > 1:
- print(f"Checkpoint directory {ckpt_dir} is not empty. Load checkpoint? (y/n)")
- load_ckpt = input()
- if load_ckpt == "y":
- # load the latest checkpoint
- latest_idx = max(
- [
- int(f.split("_")[2])
- for f in os.listdir(ckpt_dir)
- if f.startswith("policy_epoch_")
- ]
- )
- ckpt_path = os.path.join(
- ckpt_dir, f"policy_epoch_{latest_idx}_seed_{seed}.ckpt"
- )
- print(f"Loading checkpoint from {ckpt_path}")
- loading_status = policy.load_state_dict(torch.load(ckpt_path))
- print(loading_status)
- else:
- print("Not loading checkpoint")
- latest_idx = 0
- else:
- latest_idx = 0
- 定义优化器,用于更新模型的权重
- policy.cuda()
- optimizer = make_optimizer(policy_class, policy)
- 进行训练循环,每个循环迭代一个 epoch,包括以下步骤:
验证:在验证集上计算模型的性能,并记录验证结果。如果当前模型的验证性能优于历史最佳模型,则保存当前模型的权重。
训练:在训练集上进行模型的训练,计算损失并执行反向传播来更新模型的权重
每隔一定周期,保存当前模型的权重和绘制训练曲线图- train_history = []
- validation_history = []
- min_val_loss = np.inf
- best_ckpt_info = None
- for epoch in tqdm(range(latest_idx, num_epochs)):
- print(f"\nEpoch {epoch}")
- # validation
- with torch.inference_mode():
- policy.eval()
- epoch_dicts = []
- for batch_idx, data in enumerate(val_dataloader):
- forward_dict = forward_pass(data, policy)
- epoch_dicts.append(forward_dict)
- epoch_summary = compute_dict_mean(epoch_dicts)
- validation_history.append(epoch_summary)
- epoch_val_loss = epoch_summary["loss"]
- if epoch_val_loss < min_val_loss:
- min_val_loss = epoch_val_loss
- best_ckpt_info = (epoch, min_val_loss, deepcopy(policy.state_dict()))
- print(f"Val loss: {epoch_val_loss:.5f}")
- summary_string = ""
- for k, v in epoch_summary.items():
- summary_string += f"{k}: {v.item():.3f} "
- print(summary_string)
- # training
- policy.train()
- optimizer.zero_grad()
- for batch_idx, data in enumerate(train_dataloader):
- forward_dict = forward_pass(data, policy)
- # backward
- loss = forward_dict["loss"]
- loss.backward()
- optimizer.step()
- optimizer.zero_grad()
- train_history.append(detach_dict(forward_dict))
- e = epoch - latest_idx
- epoch_summary = compute_dict_mean(
- train_history[(batch_idx + 1) * e : (batch_idx + 1) * (epoch + 1)]
- )
- epoch_train_loss = epoch_summary["loss"]
- print(f"Train loss: {epoch_train_loss:.5f}")
- summary_string = ""
- for k, v in epoch_summary.items():
- summary_string += f"{k}: {v.item():.3f} "
- print(summary_string)
- if epoch % 100 == 0:
- ckpt_path = os.path.join(ckpt_dir, f"policy_epoch_{epoch}_seed_{seed}.ckpt")
- torch.save(policy.state_dict(), ckpt_path)
- plot_history(train_history, validation_history, epoch, ckpt_dir, seed)
- ckpt_path = os.path.join(ckpt_dir, f"policy_last.ckpt")
- torch.save(policy.state_dict(), ckpt_path)
- 训练完成后,保存最佳模型的权重和绘制训练曲线图
- best_epoch, min_val_loss, best_state_dict = best_ckpt_info
- ckpt_path = os.path.join(ckpt_dir, f"policy_epoch_{best_epoch}_seed_{seed}.ckpt")
- torch.save(best_state_dict, ckpt_path)
- print(
- f"Training finished:\nSeed {seed}, val loss {min_val_loss:.6f} at epoch {best_epoch}"
- )
- # save training curves
- plot_history(train_history, validation_history, num_epochs, ckpt_dir, seed)
- return best_ckpt_info
1.1.6 plot_history
// 待更
第二部分 Mobile Aloha或Aloha软件层面代码的跑通与部署
// 待更
评论记录:
回复评论: