之前我们掌握了Ray Core的基本编程,我们已经学会了如何使用Ray API。现在,让我们将这些知识应用到一个更实际的场景中——构建一个强化学习项目,并且利用Ray来加速它。
我们的目标是,通过Ray的任务和Actor,将一个简单的强化学习问题并行化,整个过程代码量不超过250行。这个项目不仅适合没有RL经验的初学者,即使是经验丰富的专家也能从中获得新的视角。
在深入代码之前,我们先快速回顾一下强化学习的核心思想。它本质上是一种让机器像人一样学习的方法。想象一下训练一只小狗,你不会直接告诉它坐下,而是给它做对了就奖励,做错了就忽略。强化学习就是这个道理。它让智能体通过与环境的交互,不断尝试、观察结果、接收奖励或惩罚,最终学会如何采取行动以达到目标,比如最大化累积奖励。这里面的关键角色包括:执行动作的智能体、它所处的环境、以及环境对智能体行为的反馈,也就是奖励。理解了这些,我们就能更好地理解接下来的实现。
强化学习的应用远不止于识别图像。它真正擅长处理那些需要动态决策、长期规划和复杂交互的场景。比如,我们前面提到的植物养护机器人,它需要根据季节变化、天气状况、甚至虫害情况,自动调整浇水、施肥策略。再比如,管理一个复杂的供应链,系统需要根据实时的需求预测和库存水平,智能地决定何时补货、补多少。还有像著名的咖啡机测试,要求机器人能在陌生环境中找到咖啡机并完成冲泡。这些都是强化学习大显身手的舞台,它们都需要智能体在复杂环境中做出最优决策。
为了让大家更直观地理解,我们来看一个具体的例子:一个简单的5x5迷宫游戏。想象一个寻觅者,它的目标是找到迷宫里的一个特定目标。我们不预先设定路径,而是让寻觅者通过反复尝试,学习如何找到目标。这个过程会涉及到智能体的移动、状态的观察、奖励的获取,以及最终的决策。
我们将使用Ray来并行化这个模拟过程,让多个寻觅者同时探索,从而加速学习过程。
在实现这个迷宫游戏之前,我们先定义一个基础的动作空间。我们称之为Discrete类,它代表一组离散的、可选的动作。比如,我们的寻觅者可以向上、向下、向左或向右移动。这个类的设计非常灵活,它接受一个参数num_actions,表示动作的数量。这样,我们就可以轻松地扩展到更多不同的动作,而不仅仅是这四个方向。通过sample方法,我们可以随机选择一个动作,这对于智能体的探索和学习至关重要。
import random
class Discrete:
def __init__(self, num_actions: int):
self.n = num_actions
def sample(self):
return random.randint(0, self.n - 1)
space = Discrete(4)
print(space.sample())
现在我们有了动作空间,接下来就是构建迷宫本身,也就是Environment类。这个类包含了游戏的关键信息:寻觅者的当前位置seeker,目标的位置goal,以及寻觅者可以执行的动作空间action_space和它能观察到的状态空间observation_space。
import os
class Environment:
def __init__(self, *args, **kwargs):
self.seeker, self.goal = (0, 0), (4, 4)
self.info = {'seeker': self.seeker, 'goal': self.goal}
self.action_space = Discrete(4)
self.observation_space = Discrete(5*5)
def reset(self):
"""Reset seeker position and return observations."""
self.seeker = (0, 0)
return self.get_observation()
def get_observation(self):
"""Encode the seeker position as integer"""
return 5 * self.seeker[0] + self.seeker[1]
def get_reward(self):
"""Reward finding the goal"""
return 1 if self.seeker == self.goal else 0
def is_done(self):
"""We're done if we found the goal"""
return self.seeker == self.goal
def step(self, action):
"""Take a step in a direction and return all available information."""
if action == 0: # move down
self.seeker = (min(self.seeker[0] + 1, 4), self.seeker[1])
elif action == 1: # move left
self.seeker = (self.seeker[0], max(self.seeker[1] - 1, 0))
elif action == 2: # move up
self.seeker = (max(self.seeker[0] - 1, 0), self.seeker[1])
elif action == 3: # move right
self.seeker = (self.seeker[0], min(self.seeker[1] + 1, 4))
else:
raise ValueError("Invalid action")
obs = self.get_observation()
rew = self.get_reward()
done = self.is_done()
return obs, rew, done, self.info
def render(self, *args, **kwargs):
"""We override this method here so clear the output in Jupyter notebooks.
The previous implementation works well in the terminal, but does not clear
the screen in interactive environments.
"""
os.system('cls' if os.name == 'nt' else 'clear')
try:
from IPython.display import clear_output
clear_output(wait=True)
except Exception:
pass
grid = [['| ' for _ in range(5)] + ["|\n"] for _ in range(5)]
grid[self.goal[0]][self.goal[1]] = '|G'
grid[self.seeker[0]][self.seeker[1]] = '|S'
print(''.join([''.join(grid_row) for grid_row in grid]))
我们还定义了几个关键方法:reset()用于重置游戏状态;get_observation()将当前状态编码为一个整数;get_reward()判断是否到达目标并返回奖励;is_done()检查游戏是否结束;以及核心的step(action)方法,它根据动作更新状态,并返回新的状态、奖励、是否结束等信息。
这构成了我们游戏的基本交互逻辑。有了环境,我们还需要一个机制来模拟整个游戏过程,这就是Simulation类。它的主要任务是根据一个给定的策略Policy,模拟一个完整的寻宝游戏回合,直到找到目标或达到最大步数。
这个过程会收集一系列的经验数据,也就是在每个状态采取的动作、得到的奖励、以及下一个状态。这些经验数据是智能体学习的关键。Simulation类的rollout方法就是实现这个过程的,它会跟踪游戏状态,执行策略动作,收集经验,并在游戏结束时返回所有经验。
class Simulation(object):
def __init__(self, env):
"""Simulates rollouts of an environment, given a policy to follow."""
self.env = env
def rollout(self, policy, render=False, explore=True, epsilon=0.1):
"""Returns experiences for a policy rollout."""
experiences = []
state = self.env.reset()
done = False
while not done:
action = policy.get_action(state, explore, epsilon)
next_state, reward, done, info = self.env.step(action)
experiences.append([state, action, reward, next_state])
state = next_state
if render:
time.sleep(0.05)
self.env.render()
return experiences
策略Policy是智能体学习的核心。它决定了在给定状态下应该采取什么动作。最简单的策略就是维护一个状态-动作价值表,记录每个状态和动作组合的预期价值。我们的Policy类初始化时创建这样一个表,所有值都设为0。然后,get_action方法会根据当前状态查询这个表,选择价值最高的动作。这里引入了探索参数explore和epsilon,这是为了防止过早地陷入局部最优,即所谓的探索-利用权衡。有时我们需要随机尝试,即使当前策略看起来不错,以发现更好的可能性。
import numpy as np
class Policy:
def __init__(self, env):
"""A Policy suggests actions based on the current state.
We do this by tracking the value of each state-action pair.
"""
self.state_action_table = [
[0 for _ in range(env.action_space.n)]
for _ in range(env.observation_space.n)
]
self.action_space = env.action_space
def get_action(self, state, explore=True, epsilon=0.1):
"""Explore randomly or exploit the best value currently available."""
if explore and random.uniform(0, 1) < epsilon:
return self.action_space.sample()
return np.argmax(self.state_action_table[state])
有了经验数据,我们如何让智能体学习呢?这里我们采用经典的Q-Learning算法。Q-Learning的核心在于更新那个状态-动作价值表。当智能体经历了一个状态、动作、奖励、下一个状态的序列后,它会更新当前状态和动作的价值。这个更新公式是关键:新价值等于旧价值乘以一个权重,加上新奖励加未来预期价值的折扣,再乘以另一个权重。这个公式平衡了当前的经验和对未来可能的期望。通过不断重复这个过程,智能体就能逐渐学会哪个动作在哪个状态下是最佳的。
现在我们来谈谈如何利用Ray来加速这个过程。我们注意到,模拟一个游戏回合是独立的,可以并行执行。因此,我们可以将Simulation类封装成一个Ray Actor。通过简单的@ray.remote装饰器,我们就能让这个类在Ray集群中运行,成为一个独立的进程。这样,我们就可以创建多个SimulationActor,它们可以同时运行,模拟不同的游戏回合,大大缩短训练时间。这对于需要大量模拟的强化学习任务来说,效率提升是巨大的。具体怎么用Ray Actor来加速呢?我们修改了之前的训练函数,创建了多个SimulationActor。我们把策略对象先用ray.put存到Ray对象存储里,这样所有Actor都能共享。然后在每个训练回合,我们让多个Actor同时调用rollout远程方法,传入策略对象的引用。由于是并行执行的,有些Actor可能会先完成,所以我们用ray.wait来等待一批完成的,然后用ray.get获取结果并更新策略。这样,我们就实现了并行训练,大大提高了效率。整个代码结构和之前的串行版本非常相似,只是把单个模拟换成了多个Actor。
通过这个迷宫寻宝的例子,我们看到了如何利用Ray Core来构建一个分布式强化学习应用。核心在于,Ray API让我们能够非常方便地将串行的代码转换为并行的分布式任务,而无需过多关注底层的分布式细节。这让我们能够专注于业务逻辑本身,比如如何定义环境、策略和奖励。当然,Ray还提供了更高级的库,比如RLLib,它提供了更丰富的强化学习算法和模型,可以处理更复杂的场景。
RLib强化学习
上节课我们深入探讨了构建分布式应用和强化学习的基本原理,现在,我们来聚焦一个强大的工具,它能帮助我们把这些想法落地,并且在实际应用中发挥威力。这就是Ray RLib。它不是从零开始,而是站在巨人肩膀上,专注于构建大规模、高性能的强化学习算法,提供了一个非常成熟和易用的框架。
RLib之所以能成为工业级解决方案,很大程度上得益于它与Ray生态系统的紧密集成。这意味着什么?
它天生就具备了Ray的分布式处理能力,你可以轻松地将你的强化学习训练任务扩展到多台机器上,充分利用集群资源。
它不是孤立存在的,而是与Ray家族的其他成员无缝协作。比如,你可以用Ray Tune来自动调优RLib算法的超参数,让模型性能达到最佳;训练好的模型可以直接通过Ray Serve部署到生产环境,真正实现从研发到应用的闭环。对于很多企业来说,选择一个深度学习框架是一个关键决策,一旦选定,就很难轻易改变。RLib在这方面做得非常出色。它同时支持PyTorch和TensorFlow这两个主流框架,而且切换起来非常方便,通常只需要修改一行代码就能完成。这极大地降低了企业在技术选型和迁移上的风险和成本,让开发者能够更专注于业务逻辑和算法本身,而不是被底层框架的兼容性问题所困扰。
RLib不仅仅是一个理论上的概念,它已经在解决实际问题中证明了其价值。很多公司已经在生产环境中使用RLib来部署和运行他们的强化学习工作负载。这背后的原因是它提供了恰到好处的抽象层次,既足够高,让开发者能够快速上手,又足够灵活,能够满足各种复杂的需求。当然,除了这些通用优势,RLib还内置了丰富的强化学习算法库,以及对各种环境类型的强大支持,这些都是我们后续要深入探讨的。
好,理论讲了这么多,我们来看怎么实际操作。
首先,确保你的环境安装了RLib,命令很简单
pip install ray[rllib]
你需要准备一个环境。在RL中,有一个事实标准叫做Gym,它定义了一个通用的环境接口。这个接口很简单,包括动作空间、观察空间、step方法执行动作、reset方法重置环境、render方法渲染环境。如果你之前自己实现了一个环境,比如我们第三章的迷宫游戏,你需要把它适配成符合Gym接口的格式,这样RLib就能识别和使用了。
RLib提供了命令行界面,简称CLI,让你快速启动和运行实验。最常用的就是rllib train命令。你可以把训练配置写在一个Python文件里,比如我们这里用的maze.py,它定义了使用DQN算法,并指定了我们要训练的环境是maze_gym_env.GymEnvironment。
from ray.rllib.algorithms.dqn import DQNConfig
config = DQNConfig().environment("maze_gym_env.GymEnvironment")\
.rollouts(num_rollout_workers=0)
然后,通过rllib train maze.py --stop ‘{“timesteps_total”: 10000}’,就能启动训练,指定训练10000步后停止。
rllib train file maze.py --stop '{"timesteps_total": 10000}'
训练完成后,你可以用rllib evaluate命令来评估模型的性能,输出结果非常直观,比如每条episode的奖励值。
rllib evaluate ~/ray_results/maze_env/<checkpoint> \
--algo DQN \
--env maze_gym_env.Environment \
--steps 100
虽然CLI很方便,但如果你想更精细地控制训练过程,或者进行更复杂的实验,Python API才是更强大的工具。Python API的核心是Algorithm类。你通过一个对应的AlgorithmConfig类来配置你的算法,比如DQNConfig。你可以像之前那样指定环境、设置rollout worker的数量,然后通过config.build方法创建出Algorithm实例。
from ray.tune.logger import pretty_print
from maze_gym_env import GymEnvironment
from ray.rllib.algorithms.dqn import DQNConfig
config = (DQNConfig().environment(GymEnvironment)
.rollouts(num_rollout_workers=2, create_env_on_local_worker=True))
pretty_print(config.to_dict())
algo = config.build()
for i in range(10):
result = algo.train()
print(pretty_print(result))
之后,你可以直接调用algo.train来启动训练,或者通过algo.get_policy获取策略,algo.get_weights获取模型参数,进行更深入的分析和调试。
使用Python API,训练过程非常直观。你调用algo.train,它会迭代地优化你的模型。训练过程中,你可以随时调用algo.evaluate来检查模型的当前表现,比如平均奖励、最大奖励、最小奖励等。为了防止训练中断,或者想保存中间状态,你可以随时调用algo.save来保存模型检查点。如果需要恢复训练,或者加载一个已有的模型进行评估,可以使用Algorithm.from_checkpoint方法。这个流程非常清晰,让你对训练过程的每一个环节都了如指掌。RLlib不仅让你能跑起来,还能让你深入到模型内部。你可以直接调用algo.compute_single_action或compute_actions方法,根据当前的观察状态,让模型给出下一步的动作。如果你想看看模型的内部状态,比如它的权重,可以调用algo.get_policy()获取策略,再调用policy.get_weights()查看。
from ray.rllib.algorithms.algorithm import Algorithm
checkpoint = algo.save()
print(checkpoint)
evaluation = algo.evaluate()
print(pretty_print(evaluation))
algo.stop()
restored_algo = Algorithm.from_checkpoint(checkpoint)
algo = restored_algo
更进一步,你可以查看模型的底层结构,比如神经网络的层数、参数量,通过调用model.base_model.summary()就能看到详细的模型概览。在强化学习中,价值函数是核心概念之一。Q-Value,也就是状态-动作价值,它衡量了在某个状态下采取某个动作的期望回报。而Value Function,状态价值,则衡量了某个状态本身的期望回报。RLib的模型通常会同时预测这两种价值。你可以通过model.q_value_head.summary()和model.state_value_head.summary()来查看模型中负责预测这两种价值的网络结构,这有助于理解模型是如何学习和决策的。
RLlib之所以强大,很大程度上在于它提供了极其灵活的配置能力。所有的配置都通过AlgorithmConfig类来完成。你可以看到,它把不同的配置选项分门别类地组织起来,比如training方法控制训练参数,environment方法控制环境,rollouts方法控制采样worker,exploration方法控制探索策略,resources方法控制资源分配,甚至还有offline_data和multi_agent方法来处理离线数据和多智能体问题。这种模块化的设计让你能够精确地控制实验的每一个细节。
资源分配是训练效率的关键。你可以通过resources方法来指定总的GPU数量,以及每个rollout worker可以使用的CPU和GPU。这在需要大量计算资源的场景下非常重要。同时,rollouts方法让你控制rollout worker的数量,以及每个worker上并行运行的环境数量。比如,num_envs_per_worker等于10意味着每个worker可以同时运行10个环境实例,这对于加速采样非常有效。
create_env_on_local_worker选项则允许你在本地worker上也创建一个环境,方便调试。环境配置是连接RL算法与真实世界或模拟环境的桥梁。你可以用env参数指定要使用的环境,可以是Gym注册的名称,也可以是你自定义的环境类。env_config参数可以传递给环境的初始化方法,用于设置环境的特定参数。你还可以显式地定义observation_space和action_space,或者让RLlib自动推断。render_env参数则控制是否在训练过程中可视化环境,这对于调试和理解模型行为非常有用。
RLlib支持的环境类型远不止我们熟悉的Gym环境。它有一个BaseEnv基类,所有环境都继承自它。
- VectorEnv是将多个Gym环境打包起来,实现并行执行,提高采样效率。
- MultiAgentEnv则专门用于处理多智能体问题,这是RLlib的一个重要特色。
- ExternalEnv和ExternalMultiAgentEnv则允许你将RLlib与外部的模拟器或控制系统连接起来,实现更复杂的系统集成。
多智能体强化学习(MARL)是一个非常复杂但又极具应用价值的领域。RLlib通过MultiAgentEnv类提供了很好的支持。你需要为每个智能体分配一个ID,然后在step、reset等方法中,所有返回值如观察、奖励、done状态都变成了一个字典,键就是智能体的ID。更关键的是,你可以通过multi_agent方法来精细地控制哪些智能体使用哪个策略。比如,你可以让所有智能体共享一个策略,也可以让每个智能体学习一个独特的策略,甚至可以混合使用,这为解决复杂的协作或竞争问题提供了强大的工具。
在某些场景下,比如你的环境模拟器运行在特定的硬件上,或者需要与外部系统交互,你可能希望将环境和训练逻辑分开。RLlib提供了Policy Server和Policy Client的模式来实现这一点。你可以把训练算法和策略推理放在一个Ray集群上运行,作为Policy Server,而让环境和客户端交互放在另一个地方,比如一个资源有限的机器上。它们通过一个简单的REST API进行通信。客户端把环境的观察信息发送给服务器,服务器返回动作,客户端再把环境的反馈信息如奖励、done状态返回给服务器。这种架构使得系统更加灵活和可扩展。
RLlib还支持一些非常前沿的强化学习概念。比如课程学习,它模拟了人类学习的过程,先从简单的任务开始,逐步过渡到更复杂的任务。这对于训练那些一开始难以解决的复杂问题非常有效。另一个重要概念是离线数据,你可以预先收集一些数据,比如专家演示数据,或者通过其他方式生成的数据,然后用RLlib的算法来训练模型,而无需实时与环境交互。这非常适合用于模仿学习,即让模型学习模仿人类或其他智能体的行为。
今天我们深入探讨了Ray RLib的核心功能和特性。从它的分布式架构、与Ray生态的紧密集成,到灵活的API、强大的多智能体支持、以及课程学习和离线数据处理等高级特性,RLib展现了一个成熟且功能强大的强化学习框架。它不仅降低了开发门槛,也为解决复杂问题提供了强大的工具。希望今天的介绍能帮助大家更好地理解并利用RLib来构建自己的强化学习应用。
Tune 超参调优
现在我们来聊聊如何更高效地调优这些模型,特别是当模型变得复杂,或者我们希望在有限的资源和时间下获得最佳性能时,超参数优化就显得尤为重要。今天,我们就来介绍一个强大的工具——Ray Tune。
为什么我们要花大力气去调优超参数?因为很多时候,仅仅选择一个算法,比如DQN,是远远不够的。就像我们之前在RLib里看到的,即使使用了DQN,也需要配置像rollout workers数量这样的参数。这些参数,我们称之为超参数,它们直接决定了模型的性能上限。尤其是在面对复杂任务时,模型的参数空间往往非常庞大,可能包含成百上千个参数,而且它们之间可能还相互影响。如果只是简单地随机尝试,或者凭感觉调整,那效率太低了,而且很可能找不到最优解。
我们的目标是找到那个最佳的超参数组合,让模型性能达到顶峰,同时尽可能地节省宝贵的计算资源和时间。最简单粗暴的方法是什么?就是随机搜索。每次随机抽取一组参数,跑一遍,然后记录结果。听起来不错,但实际操作起来问题多多。首先,效率极低。参数空间一旦变大,组合数量就呈指数级增长,你可能要跑成千上万次才能找到一个不错的组合。其次,它很容易陷入局部最优,找不到全局最优解。更重要的是,对于很多机器学习任务,尤其是深度学习,训练一次模型可能需要很长时间,比如几个小时甚至几天。在这种情况下,随机搜索简直就是一场灾难,你可能还没找到好参数,时间就耗尽了。
面对随机搜索的种种困境,我们该怎么办呢?这就需要 Ray Tune 登场了。**Ray Tune 是一个专门为分布式超参数优化设计的框架。它的核心优势在于分布式并行计算。**它利用 Ray 集群,可以将大量的实验任务并行分配到不同的节点上同时运行,极大地缩短了整体的优化时间。这就像把一个大任务拆分成多个小任务,然后交给多台机器同时处理,效率自然就上去了。同时,Tune 不仅支持随机搜索,还集成了非常丰富的算法库,比如 Hyperopt、Optuna 等,提供了贝叶斯优化、网格搜索等多种策略,你可以根据具体问题选择合适的算法。而且,Tune 的 API 设计得非常友好,上手简单,还提供了回调、检查点等实用功能,让整个超参数优化实验的管理变得更加高效和可控。
要理解 Tune 的工作原理,我们需要掌握几个核心概念。
- Search spaces,你可以把它想象成一个地图,标明了我们想要探索的参数有哪些,以及它们的取值范围和采样方式。比如,我们可以用 tune.uniform 来指定一个参数在0到1之间均匀取值,用 tune.randint 指定一个整数范围。这个空间定义了我们探索的范围。
- Trainables,也就是我们常说的目标函数。这个函数就是用来评估我们当前参数组合下模型性能的。在 Tune 的世界里,我们只需要定义一个函数,接收一个参数配置字典,然后返回一个评估结果,比如损失值或者准确率。Tune 会自动处理并行执行,我们不需要手动加 ray.remote 装饰器。
- Trials,它代表了我们针对每个参数组合进行的一次完整实验。Tune 会自动调度这些试运行,利用集群资源并行执行。每个试运行都会记录下它的参数配置和最终的评估结果。当我们调用 tune.run 完成所有试运行后,会得到一个分析结果,英文是 Analyses,它汇总了所有试运行的数据,包括每个试运行的参数、结果、日志等等。我们可以利用这个分析结果来找到最佳的超参数组合,甚至可以进行可视化分析。
- Schedulers,这个组件负责控制试运行的执行策略。比如,它可以决定何时启动新的试运行,何时停止那些看起来不太有希望的试运行。通过调度器,我们可以实现更智能的优化,比如早期停止,从而显著提升效率。
把这些组件串起来看,Tune 的工作流程就非常清晰了。我们
- 首先定义好搜索空间,告诉 Tune 我们要探索哪些参数。
- 然后,我们定义一个可训练对象,也就是评估函数,用来衡量参数好坏。
- Tune 会根据搜索空间和评估函数,生成一系列的试运行,也就是不同的参数组合。
- 调度器会负责管理这些试运行的执行,比如决定哪些先跑,哪些后跑,或者哪些跑得不好就赶紧停止。
- 最后,所有试运行结束后,Tune 会提供一个分析结果,让我们能够找到那个表现最好的参数组合。
整个过程,Tune 的核心就是自动化、并行化和智能化。刚才我们提到了搜索算法,Tune 默认支持随机搜索,但显然不是最聪明的。除了随机搜索,还有网格搜索,它会穷举所有可能的参数组合,虽然系统性好,但在高维空间里,计算量会爆炸,基本不可行。真正强大的是贝叶斯优化。它不是盲目地随机采样,而是基于之前试运行的结果,不断学习和调整探索策略,倾向于探索那些看起来更有希望的区域,从而更快地收敛到最优解。
Tune 内置了多种贝叶斯优化算法,比如 TPE。更厉害的是,Tune 还集成了大量的第三方优化库,像 Hyperopt、Optuna、Ax、SigOpt 等等。这意味着你可以轻松地利用这些强大的工具,而不需要自己再去学习它们的 API,直接在 Tune 的框架下就能使用它们。
调度器是 Tune 中另一个非常重要的组成部分,它能显著提升实验效率。一个核心思想是早期停止。想象一下,我们正在训练一个模型,如果发现某个参数组合下的性能提升非常缓慢,甚至停滞不前,那我们还有必要继续跑下去吗?当然没必要。调度器就可以监测到这种情况,及时停止这个试运行,把宝贵的资源比如GPU、CPU时间分配给更有潜力的参数组合。
HyperBand 是一个非常流行的调度器,它能动态地调整每个试运行的训练时间,让资源分配更加智能。除了 HyperBand,Tune 还提供了 Median Stopping Rule、ASHA、PBT、PB2 等多种调度器,你可以根据具体任务和数据集的特点,选择最合适的调度器来加速你的超参数优化过程。除了基本的搜索、调度和分析,Tune 还提供了一些非常实用的高级功能。
- 首先是资源管理。你可以精确地控制每个试运行需要多少 CPU、GPU 以及内存,甚至可以使用小数 GPU,比如 0.5 GPU,实现资源的共享。你还可以通过 max_concurrent_trials 参数来限制同时运行的试运行数量,避免资源过度占用。
- 其次是回调机制。你可以自定义一些回调函数,比如在每次试运行结果更新时,或者试运行出错时,执行一些操作,比如记录日志、打印信息、或者进行更复杂的监控。这大大增强了 Tune 的灵活性。
- 最后是检查点。Tune 会自动保存实验的中间状态,这样即使实验中途中断,比如断电或者网络问题,你也可以通过 resume 参数轻松地从上次保存的位置继续运行,保证了实验的可靠性和可重复性。
理论讲完了,我们来看几个实际应用。首先是 Ray RLib。由于 RLLib 是 Ray 生态的一部分,它和 Tune 的集成非常自然。我们只需要在 tune.run 的第一个参数里,直接传入 RLLib 的算法名称,比如 DQN,或者对应的 Trainer 类,Tune 就能自动识别并进行优化。
配置文件 config 就是我们熟悉的 RLLib 配置,但我们可以用 Tune 的搜索空间语法,比如 tune.uniform 或者 tune.choice,来定义需要优化的超参数,比如学习率 lr 或者训练批次大小 train_batch_size。
Tune 会自动帮你尝试不同的组合,找到最佳的 DQN 参数。你看这个例子,我们只需要几行代码,就能让 Tune 去优化 CartPole 环境下的 DQN 算法。
from ray import tune
analysis = tune.run(
"DQN",
metric="episode_reward_mean",
mode="max",
config={
"env": "CartPole-v1",
"lr": tune.uniform(1e-5, 1e-4),
"train_batch_size": tune.choice([10000, 20000, 40000]),
},
)
除了强化学习,Tune 在监督学习领域也大有用武之地。比如,我们常用的深度学习框架 Keras。Tune 对 Keras 的支持也非常完善。你可以直接使用 TuneReportCallback,这是一个 Keras 的回调函数,它可以自动地把 Keras 模型训练过程中产生的指标,比如准确率,报告给 Tune。这样,你就可以在 Tune 的框架下,轻松地优化 Keras 模型的超参数了。
在这个例子中,我们用 Tune 来优化一个经典的 MNIST 手写数字识别模型。
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Dropout
from ray.tune.integration.keras import TuneReportCallback
def objective(config):
(x_train, y_train), (x_test, y_test) = load_data()
model = Sequential()
model.add(Flatten(input_shape=(28, 28)))
model.add(Dense(config["hidden"], activation=config["activation"]))
model.add(Dropout(config["rate"]))
model.add(Dense(10, activation="softmax"))
model.compile(loss="categorical_crossentropy", metrics=["accuracy"])
model.fit(x_train, y_train, batch_size=128, epochs=10,
validation_data=(x_test, y_test),
callbacks=[TuneReportCallback({"mean_accuracy": "accuracy"})])
我们想调整的是隐藏层的激活函数、Dropout 率以及隐藏层的神经元数量。通过 Tune 和 Hyperopt 的结合,我们可以高效地找到最佳的超参数组合,提升模型的分类精度。
from ray import tune
from ray.tune.suggest.hyperopt import HyperOptSearch
initial_params = [{"rate": 0.2, "hidden": 128, "activation": "relu"}]
algo = HyperOptSearch(points_to_evaluate=initial_params)
search_space = {
"rate": tune.uniform(0.1, 0.5),
"hidden": tune.randint(32, 512),
"activation": tune.choice(["relu", "tanh"])
}
analysis = tune.run(
objective,
name="keras_hyperopt_exp",
search_alg=algo,
metric="mean_accuracy",
mode="max",
stop={"mean_accuracy": 0.99},
num_samples=10,
config=search_space,
)
print("Best hyperparameters found were: ", analysis.best_config)
数据处理
光有好的模型还不够,机器学习的基石是什么?毫无疑问,是数据!数据的质量和处理效率直接决定了模型的上限。因此,今天我们将聚焦于Ray生态系统中的数据处理核心组件——Ray Data。
Ray Data的核心是Ray Datasets。你可以把它想象成Ray集群内部数据流动的通用语言和标准接口。它不是要取代Spark或Hadoop,而是专注于提供一个轻量级、高性能的框架,用于数据的加载、转换和传递。它底层巧妙地利用了Apache Arrow,这是一种高效的列式存储格式,使得与NumPy、Pandas等库的交互变得顺畅。
更重要的是,数据集本质上是由一系列数据块组成的,这些数据块存储在Ray的共享内存对象存储中,这意味着数据在不同任务和Actor之间传递时,几乎不需要复制,极大地提升了效率和可扩展性。这就像搭积木一样,Ray Datasets就是那个最基础、最通用的积木块,让各种Ray库都能方便地拼接起来。
Ray Datasets之所以强大,主要归功于它的两大核心优势:灵活性和性能。
首先是灵活性。它能处理各种格式的数据,无论是CSV、JSON、Parquet还是自定义格式,它都能应对。更重要的是,它能与像Dask on Ray这样的外部库无缝集成,让你在Ray的框架下也能享受到Dask的高级数据处理能力。而且,由于数据是通过对象引用传递的,而不是实际复制,这大大减少了内存开销,使得数据在不同计算节点间高效流转。
其次是性能,尤其是在机器学习场景下。它原生支持GPU加速,能够利用流水线处理技术,让数据处理阶段重叠执行,而不是傻等上一个阶段完成,这大大提高了吞吐量。还有全局随机shuflle,这对训练模型至关重要。
总而言之,Ray Datasets是为那些需要高效处理大规模数据的机器学习和数据密集型应用量身打造的。好,理论讲完了,我们来看点实际操作。
import ray
# Create a dataset containing integers in the range [0, 10000).
ds = ray.data.range(10000)
# Basic operations: show the size of the dataset, get a few samples, print the schema.
print(ds.count()) # -> 10000
print(ds.take(5)) # -> [0, 1, 2, 3, 4]
print(ds.schema()) # -> <class 'int'>
创建一个最简单的Dataset,就像这样:ds等于ray.data.range(10000)。它创建了一个包含0到9999整数的Dataset。然后,你可以用几个简单的命令来了解它:ds.count()告诉你总共有多少条记录,take(5)给你看几个样本,ds.schema()则显示了数据的类型。
# Save the dataset to a local file and load it back.
ray.data.range(10000).write_csv("local_dir")
ds = ray.data.read_csv("local_dir")
print(ds.count())
当然,实际工作中数据通常来自外部存储。Ray Datasets提供了非常方便的读写接口,比如write_csv和read_csv。你可以把数据写入本地文件,也可以直接读取S3上的数据。它会自动处理并行读写,充分利用集群资源。这就像给数据装上了一个高速通道,让它们可以快速进出你的计算流程。
有了数据集,我们自然要进行各种变换。Ray Datasets提供了丰富的内置操作,让你可以像搭积木一样组合数据。
- unions 将两个数据集合并成一个更大的数据集。
- filter 操作可以根据你指定的条件过滤掉不需要的数据。
- sort 操作可以按某个字段对数据进行排序。
这些操作都非常直观,如果你熟悉Spark RDD或者类似的概念,会觉得非常熟悉。它们都是基于函数式编程思想,链式调用,非常简洁。当然,这只是冰山一角,还有 groupby、sum、min 等等聚合操作,甚至可以自定义聚合函数。
在深入数据转换之前,我们得理解一个关键概念:数据块。数据块是构成数据集的最小单元,所有的计算操作,比如map、filter,都是在这些块上并行执行的。块的数量直接影响到并行计算的效率。如果块数太少,比如只有10个,那可能就只有10个任务在跑,资源利用不充分;如果块数太多,比如每个块只有10条数据,那每个操作的开销都很大,反而会降低效率。所以,找到一个合适的块数很重要。
Ray Datasets提供了 repartition 方法,让你可以精确控制数据块的数量。比如,你可以把一个合并后块数增加的数据集,重新调整回原来的块数,以保持最佳的并行度。这就像调整生产线的节奏,确保每个环节都能高效运转。前面我们处理的数据比较简单,都是整数。
但在实际应用中,数据通常是结构化的,比如包含用户ID、商品名称、价格等等。这时候,我们需要定义数据模式,也就是Schema。Schema告诉你数据有哪些列,每列是什么类型的。Ray Datasets支持从多种来源创建带Schema的Dataset。最简单的方式是用from_items,传入一个Python字典的列表,它会自动推断出Schema。你也可以直接从Pandas DataFrame创建,或者从Parquet文件读取。
ds = ray.data.from_items([{"id": "abc", "value": 1}, {"id": "def", "value": 2}])
print(ds.schema()) # -> id: string, value: int64
得益于Arrow格式的支持,Dataset可以在这些不同类型的结构化数据之间轻松转换,比如ds.to_pandas()或者ds.to_arrow()。这保证了数据在不同处理阶段的兼容性,就像一个通用的翻译器,让不同格式的数据都能顺畅交流。
pandas_df = ds.to_pandas() # pandas_df will inherit the schema from our Dataset.
Ray Datasets的强大之处不仅在于它能处理各种数据,更在于它能让你在上面高效地进行各种计算。最基础的计算方式是 map,它会将你提供的函数应用到每个数据项上。但如果你的函数是向量化的,比如在NumPy上运行,那用 map_batches 就更高效了。map_batches 一次处理一批数据块,利用了向量化的计算优势,速度更快。
对于需要GPU加速的场景,比如深度学习推理,map_batches 结合 Actor 模式就派上用场了。你可以创建一个 Actor 类,它负责加载模型到GPU上,然后在处理每个数据块时调用这个 Actor。由于 Actor 是长驻内存的,模型加载只需要一次,后续的推理计算就能充分利用GPU的性能。这就像给每个GPU分配一个专属的工人,负责处理一批批的数据。
import numpy as np
ds = ray.data.range(10000).map_batches(lambda batch: np.square(batch).tolist())
ds.take(5) # -> [0, 1, 4, 9, 16]
现在我们来聊聊一个非常重要的优化技巧:Dataset Pipeline。想象一下,你有一个数据处理流程,比如读取数据、预处理、GPU推理、再写入。如果用传统的串行方式,每个步骤完成后才开始下一个,那中间的资源,比如CPU、GPU、网络带宽,就会有大量闲置时间。这就像工厂流水线,如果每个工序都等上一个工序完全做完,那效率肯定不高。
而 Dataset Pipeline 就是解决这个问题的利器。它通过 window 方法,让数据处理变成流水线模式:读取一批数据,同时开始预处理,预处理完成后,GPU推理开始,同时下一批数据的预处理也在进行,以此类推。这样,各个阶段的资源就能重叠使用,最大限度地减少等待时间,提高整体吞吐量。这对于需要处理大量数据,特别是计算密集型任务,比如深度学习推理,意义重大。它能显著提升效率,降低计算成本。
理论讲了这么多,我们来看一个实际应用:并行训练多个分类器。这在超参数调优中非常常见。比如,我们要尝试不同的正则化参数 alpha,训练多个模型。怎么做?
X_train, X_test, Y_train, Y_test = train_test_split(
*datasets.make_classification()
)
train_ds = ray.data.from_items(list(zip(X_train, Y_train)))
shards = (train_ds.repeat(10)
.random_shuffle_each_window()
.split(len(workers), locality_hints=workers))
ray.get([worker.train.remote(shard) for worker, shard in zip(workers, shards)])
首先,用 Ray Datasets 加载并预处理好数据。然后,关键一步:数据分片。对于一个大型数据集,我们不可能把所有数据都加载到一个机器上训练,所以需要把数据分成几个小块,每个块分配给一个训练Worker。每个Worker负责训练一个模型,比如使用不同的alpha值。Ray Datasets 的优势在于,它可以让多个 Worker 共享同一个数据集,只需要加载一次数据,然后分发数据块,这样就避免了重复加载数据的开销。
同时,它还能轻松实现数据分片和全局随机洗牌,保证每个Worker都能拿到训练所需的随机数据子集。这就像一个高效的调度中心,把数据和任务合理分配给各个工人。虽然 Ray Datasets 提供了强大的数据处理能力,但它的定位是专注于最后一公里的处理,比如在模型训练前的数据清洗和特征工程。
对于更复杂的数据分析任务,比如复杂的SQL查询、复杂的DataFrame操作,Ray Datasets 可能就显得力不从心了。幸运的是,Ray 生态系统提供了丰富的外部库集成,弥补了这一不足。比如 Dask on Ray,它让你在 Ray 集群上运行 Dask 的任务图,享受 Dask 的高级数据处理能力,同时又利用 Ray 的调度和共享内存。还有 RayDP,也就是 Spark on Ray,让你在 Ray 上运行 Spark 的作业。Modin 则提供了 Pandas 的分布式版本。这些集成就像给 Ray Datasets 装上了各种高级工具,让它在处理复杂数据时也能游刃有余,同时又能保持 Ray 的高效和可扩展性。
构建一个完整的机器学习管道,从数据收集到模型训练再到部署,往往是一个复杂的过程。传统方法通常需要将不同的系统,比如 ETL 系统、数据仓库、Spark、TensorFlow 等,通过 Airflow 等工作流引擎串联起来。这不仅增加了系统的复杂性,而且在每个阶段之间,都需要将数据写入外部存储,然后再读取,这带来了巨大的开销和延迟。
而 Ray 的优势在于,它允许你用一个 Python 脚本,构建一个端到端的 ML 管道。Ray Datasets 就是这个管道中的核心粘合剂,它负责高效地加载、预处理、转换数据,并在不同的计算阶段,比如特征工程、模型训练、推理,之间传递数据,而且大部分数据都保持在内存中。这大大简化了流程,降低了延迟,提高了效率。你可以把整个流程想象成一个高度集成的工厂,数据在其中顺畅流转,无需频繁地进出仓库。