SchedulerInterface定义了vLLM Scheduler的接口,源代码:vllm\v1\core\sched\interface.py。
schedule
schedule方法产生当前Step(一次模型前向传播)的调度结果:SchedulerOutput。
class SchedulerInterface(ABC):
@abstractmethod
def schedule(self) -> "SchedulerOutput":
raise NotImplementedError
SchedulerOutput的定义:
@dataclass
class SchedulerOutput:
scheduled_new_reqs: list[NewRequestData] # 新请求
scheduled_cached_reqs: CachedRequestData # 已调度过的请求
num_scheduled_tokens: dict[str, int] # 请求需要生成的token数字典
total_num_scheduled_tokens: int # 所有请求需要生成的token总数
scheduled_spec_decode_tokens: dict[str, list[int]] # 请求投机推理的前置token,len(scheduled_spec_decode_tokens[req_id] = num_scheduled_tokens[req_id] - 1)
scheduled_encoder_inputs: dict[str, list[int]] # 请求要处理的scheduled_new_reqs中的mm_positions中的多模态输入的id字典。
num_common_prefix_blocks: list[int] # 每个kv cache group中的common prefix的block数量。
finished_req_ids: set[str] # 上一次调度后已经完成的req_id,用于通知worker释放资源
free_encoder_input_ids: list[tuple[str, int]] # 可以释放的[req_id, encoder_inputs_id]列表,用于释放Encoder Cache
structured_output_request_ids: dict[str, int] # 需要结构化输出的req_id在batch中的index,用于计算grammar_bitmask
grammar_bitmask: Optional[npt.NDArray[np.int32]] # batch中需要使用XGrammar做结构化输出的mask
kv_connector_metadata: Optional[KVConnectorMetadata] = None # 抽象基类定义PD分离场景下PD之间KV Cache通信所需的元数据结构
scheduled_new_reqs
第一次被调度的新请求:
class NewRequestData:
req_id: str
prompt_token_ids: list[int]
mm_inputs: list[MultiModalKwargs]
mm_hashes: list[str]
mm_positions: list[PlaceholderRange]
sampling_params: Optional[SamplingParams]
pooling_params: Optional[PoolingParams]
block_ids: tuple[list[int], ...]
num_computed_tokens: int
lora_request: Optional[LoRARequest]
其中:
- req_id:请求ID。
- prompt_token_ids: Prompt的 token ID 列表。
- mm_inputs: 多模态输入的 keyword 参数列表。
- mm_hashes: 多模态输入的哈希值列表。
- mm_positions: 多模态输入在Prompt中的位置信息列表。示例:
Prompt: `AAAA BBBB What is in these images?`
mm_positions参数:
```
A: PlaceholderRange(offset=0, length=4)
B: PlaceholderRange(offset=5, length=4)
```
"""
- sampling_params:采样参数。存储生成文本时的采样参数,例如温度(temperature)、Top-K、Top-P 等。
- pooling_params:池化参数,如池化类型(max pooling、mean pooling 等)。池化操作可以减少输入的维度,提高模型的效率。
- block_ids:存储与请求相关的块ID。vLLM用块来管理内存。
- num_computed_tokens:当前请求已生成的token数量。
- lora_request:LORA参数。
scheduled_cached_reqs
之前已经被调度过的请求:
class CachedRequestData:
req_ids: list[str]
# If resumed_from_preemption is False, new_block_ids will be appended to
# the request's block IDs. If True, new_block_ids will be used as the
# request's block IDs instead of appending to the existing block IDs.
resumed_from_preemption: list[bool]
# NOTE(woosuk): new_token_ids is only used for pipeline parallelism.
# When PP is not used, new_token_ids will be empty.
new_token_ids: list[list[int]]
new_block_ids: list[tuple[list[int], ...]]
num_computed_tokens: list[int]
其中:
- req_ids:请求ID列表。
- resumed_from_preemption:请求是否从抢占中恢复。
- new_token_ids:只在PP并行时使用。TODO:WHY?
- new_block_ids:新分配的块ID列表,如果resumed_from_preemption为false,则追加到请求现有块 ID 列表,如果resumed_from_preemption为True,则替换请求现有块 ID 列表。
- num_computed_tokens:每个请求已计算的 token 数量。
update_from_output
update_from_output方法使用1个Step模型执行的结果,来更新Scheduler的状态。
class SchedulerInterface(ABC):
@abstractmethod
def update_from_output(
self,
scheduler_output: "SchedulerOutput",
model_runner_output: "ModelRunnerOutput",
) -> dict[int, "EngineCoreOutputs"]:
一个输入是Step的调度结果:SchedulerOutput,具体含义参见上述内容。
一个输入是根据Step调度结果,模型执行的输出ModelRunnerOutput。
ModelRunnerOutput的定义:
@dataclass
class ModelRunnerOutput:
req_ids: list[str] # 请求ID列表
req_id_to_index: dict[str, int] # 请求ID在batch中的index字典
sampled_token_ids: list[list[int]] # 请求生成的token列表
spec_token_ids: Optional[list[list[int]]] # 请求生成的投机推理token列表
logprobs: Optional[LogprobsLists] # 请求生成的token列表的对数概率
prompt_logprobs_dict: dict[str, Optional[LogprobsTensors]] # 请求的对数概率张量
pooler_output: list[Optional[torch.Tensor]] # 请求的模型最后一层的隐藏状态
finished_sending: Optional[set[str]] = None # 请求的结果已经从工作进程发送到调度器进程
finished_recving: Optional[set[str]] = None # 请求的结果已经被调度器进程接收
num_nans_in_logits: Optional[dict[str, int]] = None # 请求的logits中的NaN(Not a Number)数量
LogprobsLists
class LogprobsLists(NamedTuple):
# [num_reqs, max_num_logprobs + 1]
logprob_token_ids: list[list[int]] # 请求生成的tokens
# [num_reqs, max_num_logprobs + 1]
logprobs: list[list[float]] # 请求生成的tokens对应的对数概率
# [num_reqs]
sampled_token_ranks: list[int] # 请求生成的第一个token在概率里的排名
add_request
add_request用于添加一个新的请求。
class SchedulerInterface(ABC):
@abstractmethod
def add_request(self, request: "Request") -> None:
raise NotImplementedError
创建一个新的Request:
class Request:
def __init__(
self,
request_id: str,
prompt_token_ids: list[int],
multi_modal_inputs: Optional[list[MultiModalKwargs]],
multi_modal_hashes: Optional[list[str]],
multi_modal_placeholders: Optional[list[PlaceholderRange]],
sampling_params: Optional[SamplingParams],
pooling_params: Optional[PoolingParams],
eos_token_id: Optional[int],
client_index: int = 0,
arrival_time: Optional[float] = None,
lora_request: Optional["LoRARequest"] = None,
structured_output_request: Optional["StructuredOutputRequest"] = None,
cache_salt: Optional[str] = None,
priority: int = 0,
) -> None:
...
其中大部分参数的定义同:NewRequestData,其余部分:
- client_index:用户ID
- cache_salt:Cache的盐值。提供了一个额外的唯一性标识符,用于确保缓存的唯一性。
- priority:请求的优先级。
finish_requests
finish_requests用于完成一个请求:
@abstractmethod
def finish_requests(
self,
request_ids: Union[str, Iterable[str]],
finished_status: "RequestStatus",
) -> None:
raise NotImplementedError
其中:
- request_ids:要结束的请求ID列表
- finished_status:请求结束的状态
RequestStatus
class RequestStatus(enum.IntEnum):
"""Status of a request."""
WAITING = enum.auto() # 请求正在等待处理
WAITING_FOR_FSM = enum.auto() # 请求正在等待FSM(Finite State Machine)的处理
WAITING_FOR_REMOTE_KVS = enum.auto() # 请求正在等待KVS(Remote Key-Value Store)的响应,在vLLM中,KVS用于跨节点存储和检索数据。
RUNNING = enum.auto() # 请求正在处理中
PREEMPTED = enum.auto() # 请求被抢占
FINISHED_STOPPED = enum.auto() # 请求完成并停止
FINISHED_LENGTH_CAPPED = enum.auto() # 请求完成但长度超限(还没遇到EOS但是超最大长度)
FINISHED_ABORTED = enum.auto() # 请求被终止,比如client终止
FINISHED_IGNORED = enum.auto() # 请求完成但被忽略,通常发生在请求无效、系统策略、错误处理或资源管理等场景中
make_stats
make_stats生成Scheduler统计数据,用于log等:
class SchedulerInterface(ABC):
@abstractmethod
def make_stats(self) -> Optional["SchedulerStats"]:
raise NotImplementedError
SchedulerStats的定义:
@dataclass
class SchedulerStats:
num_running_reqs: int = 0 # 运行中的请求
num_waiting_reqs: int = 0 # 等待调度的请求
kv_cache_usage: float = 0.0 # KV Cache利用率
prefix_cache_stats: PrefixCacheStats = field(
default_factory=PrefixCacheStats) # Prefix Cache统计
spec_decoding_stats: Optional[SpecDecodingStats] = None # 投机推理统计
num_corrupted_reqs: int = 0 # 损坏的请求数(结果中有NAN)
@dataclass
class PrefixCacheStats:
reset: bool = False # 在Cache创建后,是否有reset_prefix_cache调用(如RLHF导致权重更新等)
requests: int = 0 # Cache请求次数
queries: int = 0 # Cache查询的token数
hits: int = 0 # Cache查询命中的token数
@dataclass
class SpecDecodingStats:
num_spec_tokens: int # Scheduler创建时的配置参数:投机推理的token数
num_drafts: int = 0 # 投机推理的总次数
num_draft_tokens: int = 0 # 投机推理产生的token数
num_accepted_tokens: int = 0 # 被接受的投机推理的token数
num_accepted_tokens_per_pos: list[int] = field(default_factory=list) # 投机推理的token每个位置的接受次数
request status
SchedulerInterface定义了一系列方法用于获取request status:
class SchedulerInterface(ABC):
@abstractmethod
def get_num_unfinished_requests(self) -> int:
raise NotImplementedError
def has_unfinished_requests(self) -> bool:
return self.get_num_unfinished_requests() > 0
@abstractmethod
def has_finished_requests(self) -> bool:
raise NotImplementedError
def has_requests(self) -> bool:
return self.has_unfinished_requests() or self.has_finished_requests()
@abstractmethod
def get_request_counts(self) -> tuple[int, int]:
raise NotImplementedError
reset_prefix_cache
reset_prefix_cache用于weights在线更新后,重新生成Prefix Cache
@abstractmethod
def reset_prefix_cache(self) -> bool:
raise NotImplementedError
shutdown
停止调度器:
class SchedulerInterface(ABC):
@abstractmethod
def shutdown(self) -> None:
"""Shutdown the scheduler."""
raise NotImplementedError
get_kv_connector
get_kv_connector用于获取KV Cache通信的Connector。
class SchedulerInterface(ABC):
def get_kv_connector(self) -> Optional["KVConnectorBase_V1"]:
return None