Add: Exter Chapter LLM-generation-method

This commit is contained in:
KMnO4-zx
2025-10-17 17:11:05 +08:00
parent b9172031c8
commit 72b41341e1
5 changed files with 897 additions and 1 deletions

View File

@@ -0,0 +1,152 @@
import torch
from modelscope import AutoModelForCausalLM, AutoTokenizer
def test_decoding_strategies():
"""
测试三种解码策略:贪婪解码、随机采样、束搜索
"""
model_id = "../model/kmno4zx/happy-llm-215M-sft/"
print("正在加载模型和tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(model_id, trust_remote_code=True, device_map="cpu").eval()
# 测试prompt
test_prompt = "请介绍一下自己"
messages = [
{"role": "system", "content": "你是一个AI助手"},
{"role": "user", "content": test_prompt}
]
# 准备输入
input_ids = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
input_ids = tokenizer(input_ids).data['input_ids']
x = (torch.tensor(input_ids, dtype=torch.long)[None, ...]).to(model.device)
print(f"测试prompt: {test_prompt}")
print(f"输入token数量: {len(input_ids)}")
print("=" * 60)
# 测试1: 贪婪解码 (Greedy Search)
print("🔍 测试1: 贪婪解码 (Greedy Search)")
print("参数: do_sample=False, num_beams=1, temperature=0.0")
print("特点: 每步选择概率最大的token结果确定速度快")
with torch.no_grad():
greedy_output = model.generate_super(
x,
stop_id=tokenizer.eos_token_id,
max_new_tokens=50,
temperature=0.0,
do_sample=False,
num_beams=1
)
greedy_response = tokenizer.decode(greedy_output[0].tolist(), skip_special_tokens=True)
print(f"贪婪解码结果: {greedy_response}")
print()
# 测试2: 随机采样 (Random Sampling)
print("🎲 测试2: 随机采样 (Random Sampling)")
print("参数: do_sample=True, num_beams=1, temperature=0.8, top_k=50")
print("特点: 基于概率分布随机采样,结果多样,创造性高")
with torch.no_grad():
# 运行多次以展示随机性
for i in range(3):
sampling_output = model.generate_super(
x,
stop_id=tokenizer.eos_token_id,
max_new_tokens=50,
temperature=0.8,
top_k=50,
do_sample=True,
num_beams=1
)
sampling_response = tokenizer.decode(sampling_output[0].tolist(), skip_special_tokens=True)
print(f"随机采样结果 {i+1}: {sampling_response}")
print()
# 测试3: 束搜索 (Beam Search)
print("🔦 测试3: 束搜索 (Beam Search)")
print("参数: do_sample=False, num_beams=3, temperature=1.0")
print("特点: 维护多条候选路径,选择总概率最高的序列,质量更高")
with torch.no_grad():
beam_output = model.generate_super(
x,
stop_id=tokenizer.eos_token_id,
max_new_tokens=50,
temperature=1.0,
do_sample=False,
num_beams=3
)
beam_response = tokenizer.decode(beam_output[0].tolist(), skip_special_tokens=True)
print(f"束搜索结果: {beam_response}")
print()
# 测试4: 不同的温度参数对随机采样的影响
print("🌡️ 测试4: 不同温度参数对随机采样的影响")
print("参数: do_sample=True, num_beams=1, 测试不同temperature值")
temperatures = [0.2, 0.8, 1.5]
for temp in temperatures:
with torch.no_grad():
temp_output = model.generate_super(
x,
stop_id=tokenizer.eos_token_id,
max_new_tokens=30,
temperature=temp,
do_sample=True,
num_beams=1
)
temp_response = tokenizer.decode(temp_output[0].tolist(), skip_special_tokens=True)
print(f"温度 {temp}: {temp_response}")
print()
print("=" * 60)
print("✅ 三种解码策略测试完成!")
print()
print("📊 总结对比:")
print("• 贪婪解码: 速度快,结果确定,适合确定性任务")
print("• 随机采样: 创造性强,结果多样,适合创意生成")
print("• 束搜索: 质量较高,平衡速度和质量,适合一般对话")
def test_original_generation():
"""
原始的生成代码作为对比
"""
model_id = "../model/kmno4zx/happy-llm-215M-sft/"
print("运行原始生成代码...")
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(model_id, trust_remote_code=True, device_map="cpu").eval()
messages = [
{"role": "system", "content": "你是一个AI助手"},
{"role": "user", "content": "你好,请介绍一下自己。"}
]
input_ids = tokenizer.apply_chat_template(messages,tokenize=False,add_generation_prompt=True)
input_ids = tokenizer(input_ids).data['input_ids']
x = (torch.tensor(input_ids, dtype=torch.long)[None, ...]).to(model.device)
with torch.no_grad():
y = model.generate_super(x, stop_id=tokenizer.eos_token_id, max_new_tokens=512, temperature=0.6)
response = tokenizer.decode(y[0].tolist(), skip_special_tokens=True)
print(f"Assistant: {response}")
if __name__ == "__main__":
print("开始测试三种解码策略...")
print()
try:
test_decoding_strategies()
except Exception as e:
print(f"测试过程中出现错误: {e}")
print("运行原始生成代码...")
test_original_generation()

View File

@@ -0,0 +1,3 @@
from modelscope import snapshot_download
model_dir = snapshot_download('kmno4zx/happy-llm-215M-sft', cache_dir='your/cache/dir', revision='master')

View File

@@ -0,0 +1,511 @@
# 大模型生成Token的方式
> 代码已更新到 Happy-LLM 仓库第五章的代码中。
## 贪婪解码Greedy Decoding
### 原理说明
贪婪解码是最简单直接的文本生成策略。在每一步生成时它总是选择概率最大的那个token作为下一个token然后继续生成直到遇到停止条件或达到最大长度。
**核心思想**:局部最优选择 → 希望全局最优
**数学表达**
```
token_t = argmax P(token_t | token_1, token_2, ..., token_{t-1})
```
### 代码实现
基于我们实现的 `_greedy_decode` 方法:
```python
def _greedy_decode(self, logits: torch.Tensor) -> torch.Tensor:
"""
贪婪解码选择概率最大的token
Args:
logits: 模型输出的logits形状为 (batch_size, vocab_size)
Returns:
选择的token索引形状为 (batch_size, 1)
"""
_, idx_next = torch.topk(logits, k=1, dim=-1)
return idx_next
```
**关键步骤解析**
1. `torch.topk(logits, k=1, dim=-1)`找到logits中最大值的位置
2. 返回最大概率token的索引
3. 该token被添加到序列中继续下一轮生成
### 使用示例
```python
# 在 generate_super 函数中调用贪婪解码
output = model.generate_super(
input_ids,
do_sample=False, # 不使用采样
num_beams=1, # 不使用束搜索
temperature=0.0, # 温度为0确保确定性
max_new_tokens=100
)
```
### 优缺点分析
**优点**
-**速度快**每步只需要一次前向传播和简单的argmax操作
-**结果确定**:相同的输入总是产生相同的输出
-**内存效率高**:不需要维护多个候选序列
-**实现简单**:算法逻辑直观易懂
**缺点**
-**容易陷入局部最优**:每步的局部最优不一定等于全局最优
-**缺乏多样性**:总是产生相同的序列,缺乏创造性
-**可能产生重复内容**:容易陷入重复循环
-**忽略长程依赖**:不考虑序列的整体连贯性
### 典型例子
假设模型生成了以下概率分布:
```
输入: "今天天气"
下一token概率:
- "很" (0.4)
- "不错" (0.3)
- "真好" (0.2)
- "不太好" (0.1)
```
贪婪解码会选择"很",生成"今天天气很",然后继续这个过程。
### 使用场景
- **确定性任务**:如数学计算、代码生成
- **需要一致性的应用**如API服务、自动化脚本
- **计算资源受限的环境**:需要快速生成结果
- **基准测试**:作为其他算法的对比基准
## 采样解码Sampling Decoding
### 原理说明
采样解码不是选择概率最大的token而是基于模型的概率分布进行随机采样。这样可以在每次生成时产生不同的结果增加文本的多样性和创造性。
**核心思想**:基于概率分布随机选择 → 增加多样性
**数学表达**
```
token_t ~ P(token_t | token_1, token_2, ..., token_{t-1})
```
### 关键参数
#### 1. Temperature温度
- **作用**:控制概率分布的平滑程度
- **原理**将logits除以temperature然后进行softmax
- **效果**
- `temperature > 1`:分布更平滑,增加随机性
- `temperature < 1`:分布更尖锐,更接近贪婪解码
- `temperature → 0`:等价于贪婪解码
#### 2. Top-k Sampling
- **作用**限制候选token的范围
- **原理**只考虑概率最高的k个token其他token概率设为0
- **效果**:避免选择概率很低的"奇怪"token提高质量
### 代码实现
基于我们实现的 `_random_sample` 方法:
```python
def _random_sample(self, logits: torch.Tensor, temperature: float = 1.0, top_k: int = None) -> torch.Tensor:
"""
随机采样基于概率分布随机选择token
Args:
logits: 模型输出的logits形状为 (batch_size, vocab_size)
temperature: 温度参数,控制随机性
top_k: 只考虑概率最高的k个token
Returns:
选择的token索引形状为 (batch_size, 1)
"""
# 1. 温度缩放
logits = logits / temperature
# 2. Top-k过滤
if top_k is not None:
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
logits[logits < v[:, [-1]]] = -float('Inf')
# 3. 计算概率并采样
probs = F.softmax(logits, dim=-1)
idx_next = torch.multinomial(probs, num_samples=1)
return idx_next
```
**关键步骤解析**
1. **温度缩放**:调整概率分布的平滑程度
2. **Top-k过滤**:移除低概率候选,提高质量
3. **概率归一化**使用softmax得到概率分布
4. **随机采样**根据概率分布随机选择token
### 使用示例
```python
# 基本采样
output = model.generate_super(
input_ids,
do_sample=True, # 启用采样
num_beams=1, # 不使用束搜索
temperature=0.8, # 中等温度
max_new_tokens=100
)
# 带top-k的采样
output = model.generate_super(
input_ids,
do_sample=True,
num_beams=1,
temperature=1.0, # 较高温度增加随机性
top_k=50, # 只考虑前50个候选
max_new_tokens=100
)
```
### 温度参数详解
**不同温度的效果对比**
```python
# 示例概率分布
original_probs = [0.6, 0.2, 0.1, 0.05, 0.05]
# Temperature = 0.1 (低温度,接近贪婪)
scaled_probs = [0.85, 0.08, 0.04, 0.015, 0.015]
# 结果很可能选择第一个token
# Temperature = 1.0 (标准温度)
scaled_probs = [0.6, 0.2, 0.1, 0.05, 0.05]
# 结果:按原始概率采样
# Temperature = 2.0 (高温度,增加随机性)
scaled_probs = [0.35, 0.25, 0.18, 0.11, 0.11]
# 结果各个token都有机会被选中
```
### Top-k机制详解
**Top-k过滤过程**
```python
# 假设词汇表大小为1000top_k=50
logits = [0.1, 2.3, 0.5, 1.8, 0.3, 3.2, 0.9, 0.2, 1.5, 0.7, ...] # 1000个值
# 步骤1找到前50个最大值
v, _ = torch.topk(logits, 50)
threshold = v[-1] # 第50大的值
# 步骤2过滤
logits[logits < threshold] = -float('Inf')
# 结果只有50个token有非零概率其他950个token概率为0
```
### 优缺点分析
**优点**
-**多样性好**:每次生成可能产生不同的结果
-**创造性高**:能产生意想不到的内容
-**避免重复**:不容易陷入重复循环
-**可调性强**:通过参数控制随机程度
**缺点**
-**结果不确定**:相同输入可能产生不同输出
-**质量不稳定**:可能产生低质量或不连贯的内容
-**需要调参**temperature和top_k需要仔细调节
-**计算开销**:需要计算完整的概率分布
### 使用场景
- **创意写作**:故事生成、诗歌创作
- **对话系统**:让对话更加自然和有趣
- **数据增强**:生成多样化的训练数据
- **探索性任务**:需要探索多种可能性的场景
## 束搜索Beam Search
### 原理说明
束搜索是一种启发式搜索算法,它在每一步生成时保留多个候选序列(束),而不是只选择一个最佳序列。通过维护多条路径,它能够在计算效率和生成质量之间取得平衡。
**核心思想**:维护多条候选路径 → 选择累积概率最高的序列
**算法流程**
1. **初始化**:从输入序列开始
2. **扩展**:为每个候选序列生成多个扩展
3. **评分**:计算每个新序列的累积概率
4. **筛选**保留分数最高的N个候选
5. **重复**:继续扩展直到结束条件
### 关键概念
#### 束宽度Beam Width
- **定义**:每步保留的候选序列数量
- **权衡**
- 宽度=1等价于贪婪解码
- 宽度越大:搜索空间越大,质量越高,但计算成本也越大
#### 累积概率
- **计算方式**:序列概率 = 各个token概率的乘积
- **数值稳定性**:通常使用对数概率求和
- **公式**`log P(sequence) = Σ log P(token_i | context)`
### 代码实现
基于我们实现的 `_beam_search` 方法:
```python
def _beam_search(self, idx: torch.Tensor, max_new_tokens: int, num_beams: int,
temperature: float = 1.0, top_k: int = None, stop_id: int = None) -> torch.Tensor:
"""
束搜索:维护多个候选序列,选择最优路径
Args:
idx: 输入序列,形状为 (batch_size, seq_len)
max_new_tokens: 最大生成token数量
num_beams: 束宽度,表示保留的候选路径数量
temperature: 温度参数,控制分布的平滑程度
top_k: top-k过滤参数限制候选token范围
stop_id: 停止生成的token ID遇到则停止
Returns:
生成的token序列形状为 (batch_size, generated_length)
"""
# 1. 初始化束
beams = [idx.clone() for _ in range(num_beams)]
beam_scores = torch.zeros(num_beams, device=idx.device)
beam_scores[0] = 0.0 # 第一个候选是原始序列
beam_scores[1:] = float('-inf') # 其他候选初始分数为负无穷
# 2. 主循环逐步生成token
for step in range(max_new_tokens):
new_beams = []
new_scores = []
# 3. 扩展每个候选序列
for beam_idx, beam in enumerate(beams):
if beam_scores[beam_idx] == float('-inf'):
continue # 跳过无效候选
# 前向传播获取logits
output = self(beam)
logits = output.logits[:, -1, :]
# 应用温度和top-k
if temperature != 1.0:
logits = logits / temperature
if top_k is not None:
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
logits[logits < v[:, [-1]]] = -float('Inf')
# 计算对数概率
log_probs = F.log_softmax(logits, dim=-1)
# 获取前num_beams个候选token
top_log_probs, top_indices = torch.topk(log_probs, k=num_beams, dim=-1)
# 4. 为当前候选生成多个扩展
for k in range(num_beams):
token = top_indices[:, k:k+1]
log_prob = top_log_probs[:, k]
new_beam = torch.cat([beam, token], dim=1)
new_score = beam_scores[beam_idx] + log_prob.item()
new_beams.append(new_beam)
new_scores.append(new_score)
# 5. 筛选最佳候选
if not new_beams:
break
# 按分数排序选择前num_beams个
sorted_indices = sorted(range(len(new_scores)), key=lambda i: new_scores[i], reverse=True)
beams = [new_beams[i] for i in sorted_indices[:num_beams]]
beam_scores = [new_scores[i] for i in sorted_indices[:num_beams]]
# 检查停止条件
if stop_id is not None and beams[0][0, -1] == stop_id:
break
# 6. 返回最佳序列
return beams[0][:, idx.shape[1]:] # 只返回生成部分
```
### 束搜索过程示例
假设束宽度=3输入="今天天气"
**第1步扩展**
```
候选1: "今天天气很好" (分数: 0.4)
候选2: "今天天气不错" (分数: 0.3)
候选3: "今天天气真好" (分数: 0.2)
```
**第2步扩展**每个候选再扩展3个
```
候选1.1: "今天天气很好啊" (分数: 0.4 + 0.1 = 0.5)
候选1.2: "今天天气很好。" (分数: 0.4 + 0.2 = 0.6) ← 保留
候选1.3: "今天天气很好," (分数: 0.4 + 0.05 = 0.45)
候选2.1: "今天天气不错啊" (分数: 0.3 + 0.15 = 0.45)
候选2.2: "今天天气不错。" (分数: 0.3 + 0.1 = 0.4) ← 保留
候选2.3: "今天天气不错," (分数: 0.3 + 0.08 = 0.38)
候选3.1: "今天天气真好啊" (分数: 0.2 + 0.12 = 0.32)
候选3.2: "今天天气真好。" (分数: 0.2 + 0.25 = 0.45) ← 保留
候选3.3: "今天天气真好," (分数: 0.2 + 0.1 = 0.3)
```
**筛选结果**保留分数最高的3个
```
最佳候选: "今天天气很好。" (分数: 0.6)
次佳候选: "今天天气不错。" (分数: 0.4)
第三候选: "今天天气真好。" (分数: 0.45)
```
### 使用示例
```python
# 基本束搜索
output = model.generate_super(
input_ids,
do_sample=False, # 不使用采样
num_beams=3, # 束宽度为3
temperature=1.0, # 标准温度
max_new_tokens=100
)
# 带top-k的束搜索
output = model.generate_super(
input_ids,
do_sample=False,
num_beams=5, # 更大的束宽度
temperature=0.8, # 稍微降低温度
top_k=50, # 限制候选范围
max_new_tokens=100
)
```
### 优缺点分析
**优点**
-**质量较高**:比贪婪解码质量更好
-**确定性**:结果相对稳定(相同输入产生相同输出)
-**平衡性好**:在质量和效率之间取得平衡
-**避免明显错误**不容易选择明显不合适的token
**缺点**
-**计算开销大**:需要维护多个候选序列
-**内存占用高**:存储多个候选序列和分数
-**仍可能局部最优**:虽然比贪婪好,但仍可能错过全局最优
-**多样性有限**:仍然偏向高概率路径,创造性不如采样
### 束宽度选择建议
| 束宽度 | 适用场景 | 优点 | 缺点 |
|--------|----------|------|------|
| 1-2 | 实时应用、计算资源有限 | 速度快、资源占用少 | 质量相对较低 |
| 3-5 | 一般对话、文本生成 | 质量较好、速度适中 | 资源占用中等 |
| 6-10 | 高质量生成、翻译 | 质量很高 | 计算开销大 |
| 10+ | 专业应用、研究 | 最高质量 | 开销很大 |
### 使用场景
- **机器翻译**:需要准确性和流畅性的平衡
- **文本摘要**:生成连贯的摘要内容
- **对话系统**:生成有逻辑的回复
- **代码生成**:需要语法正确和逻辑合理
- **长文本生成**:如文章写作、报告生成
## 辅助模型投机解码Assisted Decoding
### 原理说明
投机解码是一种**用小模型加速大模型推理**的技术。它通过"草稿-验证"的方式让小先生成候选token然后大家模型快速验证减少大模型的前向传播次数。
**核心思想**:小模型投机生成 → 大模型批量验证 → 减少大模型计算负担
### 工作流程
#### 1. 草稿生成阶段
```
输入: "今天天气"
小模型快速生成草稿: "今天天气很好,适合出门散步"
```
#### 2. 验证阶段
大模型一次性验证整个草稿序列:
- ✅ 接受的token"今天天气很好,"
- ❌ 拒绝的token从"适合"开始拒绝
- 🔧 大模型重新生成:"适合在家休息"
#### 3. 最终结果
```
输出: "今天天气很好,适合在家休息"
```
### 关键优势
**速度提升**
- 小模型推理快 → 生成多个候选token
- 大模型批量验证 → 一次处理多个token
- 减少90%+的大模型前向传播
**质量保证**
- 大模型有最终否决权
- 只有大模型认可的token才会被保留
- 不会降低生成质量
### 具体例子对比
**传统方式**(大模型逐个生成):
```
第1步: 大模型 → "今天"
第2步: 大模型 → "今天天气"
第3步: 大模型 → "今天天气很"
第4步: 大模型 → "今天天气很好"
第5步: 大模型 → "今天天气很好,"
第6步: 大模型 → "今天天气很好,适合"
... (每步都需要大模型前向传播)
```
**投机解码**
```
第1步: 小模型快速草稿 → "今天天气很好,适合出门散步"
第2步: 大模型批量验证 → 接受"今天天气很好,",拒绝"适合出门散步"
第3步: 大模型重新生成 → "适合在家休息"
```
这样原本需要6次大模型推理的过程现在只需要2次
### 技术实现要点
#### 1. 草稿长度控制
- **草稿不宜过长**通常2-10个token
- **接受率平衡**:太长接受率低,太短加速效果不明显
- **动态调整**:根据接受率调整草稿长度
#### 2. 验证机制
```python
# 伪代码
def assisted_decoding(input_ids, assistant_model, main_model):
# 小模型生成草稿
draft_tokens = assistant_model.generate_draft(input_ids, max_draft_len=5)
# 大模型验证
accepted_count = main_model.verify_draft(input_ids, draft_tokens)
# 构建最终结果
if accepted_count == len(draft_tokens):
return draft_tokens # 全部接受
else:
# 部分接受,大模型重新生成剩余部分
accepted_part = draft_tokens[:accepted_count]
remaining_part = main_model.generate_remaining(input_ids + accepted_part)
return accepted_part + remaining_part
```
### 总结
投机解码本质上是用**计算资源换时间**,通过小模型的"投机"来减少大模型的计算负担。它是一种聪明的工程优化,在不牺牲质量的前提下显著提升推理速度。

View File

@@ -69,11 +69,13 @@
- [Qwen3-"VL"——超小中文多模态模型的“拼接微调”之路](./Extra-Chapter/vlm-concatenation-finetune/README.md) @[ShaohonChen](https://github.com/ShaohonChen) 2025-7-30
- [S1: Thinking Budget with vLLM](./Extra-Chapter/s1-vllm-thinking-budget/readme.md) @[kmno4-zx](https://github.com/kmno4-zx) 2025-8-03
- [S1: Thinking Budget with vLLM](./Extra-Chapter/s1-vllm-thinking-budget/readme.md) @[不要葱姜蒜](https://github.com/kmno4-zx) 2025-8-03
- [CDDRS: 使用细粒度语义信息指导增强的RAG检索方法](./Extra-Chapter/CDDRS/readme.md) @[Hongru0306](https://github.com/Hongru0306) 2025-8-21
- [大模型生成 Token 的方式有哪些?](./Extra-Chapter/generation-method/readme.md) @[不要葱姜蒜](https://github.com/kmno4-zx) 2025-10-17
> &emsp;&emsp;*如果大家在学习 Happy-LLM 项目或 LLM 相关知识中有自己独到的见解、认知、实践,欢迎大家 PR 在 [Extra Chapter LLM Blog](./Extra-Chapter/) 中。请遵守 Extra Chapter LLM Blog 的 [PR 规范](./Extra-Chapter/Readme.md),我们会视 PR 内容的质量和价值来决定是否合并或补充到 Happy-LLM 正文中来。*

View File

@@ -415,6 +415,234 @@ class Transformer(PreTrainedModel):
idx = torch.cat((idx, idx_next), dim=1)
return idx[:, index:] # 只返回生成的token
def _greedy_decode(self, logits: torch.Tensor) -> torch.Tensor:
"""
贪婪解码选择概率最大的token
Args:
logits: 模型输出的logits形状为 (batch_size, vocab_size)
Returns:
选择的token索引形状为 (batch_size, 1)
"""
_, idx_next = torch.topk(logits, k=1, dim=-1)
return idx_next
def _random_sample(self, logits: torch.Tensor, temperature: float = 1.0, top_k: int = None) -> torch.Tensor:
"""
随机采样基于概率分布随机选择token
Args:
logits: 模型输出的logits形状为 (batch_size, vocab_size)
temperature: 温度参数,控制随机性
top_k: 只考虑概率最高的k个token
Returns:
选择的token索引形状为 (batch_size, 1)
"""
# 缩放 logits
logits = logits / temperature
# 应用top-k过滤
if top_k is not None:
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
# 将不在 top-k 内的 logits 设为负无穷
logits[logits < v[:, [-1]]] = -float('Inf')
# 计算概率并采样
probs = F.softmax(logits, dim=-1)
idx_next = torch.multinomial(probs, num_samples=1)
return idx_next
def _beam_search(self, idx: torch.Tensor, max_new_tokens: int, num_beams: int,
temperature: float = 1.0, top_k: int = None, stop_id: int = None) -> torch.Tensor:
"""
束搜索:维护多个候选序列,选择最优路径
束搜索的核心思想在每一步生成时不是只选择一个最佳token
而是保留多个候选路径,最终选择累积概率最高的完整序列。
Args:
idx: 输入序列,形状为 (batch_size, seq_len)
max_new_tokens: 最大生成token数量
num_beams: 束宽度,表示保留的候选路径数量
temperature: 温度参数,控制分布的平滑程度
top_k: top-k过滤参数限制候选token范围
stop_id: 停止生成的token ID遇到则停止
Returns:
生成的token序列形状为 (batch_size, generated_length)
只返回新生成的部分,不包含原始输入序列
"""
# 获取输入序列的基本信息
batch_size = idx.shape[0] # 批次大小通常为1
seq_len = idx.shape[1] # 输入序列长度
# 初始化束:创建 num_beams 个候选序列
beams = [idx.clone() for _ in range(num_beams)]
# 初始化每个候选序列的累积对数概率分数
beam_scores = torch.zeros(num_beams, device=idx.device)
# 第一个候选是原始输入序列分数为0
beam_scores[0] = 0.0
# 其他候选初始分数设为负无穷,表示尚未生成
beam_scores[1:] = float('-inf')
# 主循环逐步生成新的token最多生成 max_new_tokens 个
for step in range(max_new_tokens):
# 每轮迭代收集新的候选序列和分数
new_beams = [] # 新的候选序列列表
new_scores = [] # 对应的分数列表
# 遍历当前的所有候选序列
for beam_idx, beam in enumerate(beams):
# 跳过无效候选(分数为负无穷的序列)
if beam_scores[beam_idx] == float('-inf'):
continue
# 序列长度检查:如果超过最大长度,截取最后的部分
beam_cond = beam if beam.size(1) <= self.args.max_seq_len else beam[:, -self.args.max_seq_len:]
# 前向传播:获取模型对当前序列的预测
output = self(beam_cond)
# 提取最后一个位置的logits用于预测下一个token
logits = output.logits[:, -1, :] # 形状: (1, vocab_size)
# 温度缩放调整logits的分布
if temperature != 1.0:
logits = logits / temperature
# 温度 > 1分布更平滑增加随机性
# 温度 < 1分布更尖锐更确定
# Top-k过滤限制候选token的范围提高质量
if top_k is not None:
# 找到logits中前top_k个最大的值
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
# 将不在前top_k内的logits设为负无穷
logits[logits < v[:, [-1]]] = -float('Inf')
# 这样采样时只会考虑前top_k个token
# 计算对数概率使用log_softmax避免数值不稳定
log_probs = F.log_softmax(logits, dim=-1)
# 获取前 num_beams 个最可能的候选token
# 注意这里的top-k与上面的top-k不同
# 上面的top-k是全局过滤这里是束搜索的分支选择
top_log_probs, top_indices = torch.topk(log_probs, k=num_beams, dim=-1)
# 为当前候选序列生成 num_beams 个扩展序列
for k in range(num_beams):
# 选择第k个候选token
token = top_indices[:, k:k+1] # token ID
log_prob = top_log_probs[:, k] # 对应的对数概率
# 扩展序列将新token添加到当前序列末尾
new_beam = torch.cat([beam, token], dim=1)
# 更新累积分数:原序列分数 + 新token的对数概率
new_score = beam_scores[beam_idx] + log_prob.item()
# 保存新的候选序列和分数
new_beams.append(new_beam)
new_scores.append(new_score)
# 安全检查:如果没有生成任何有效候选,提前结束
if not new_beams:
break
# 筛选最佳候选:从所有新生成的候选中选择分数最高的 num_beams 个
# 按分数降序排序,获取索引
sorted_indices = sorted(range(len(new_scores)), key=lambda i: new_scores[i], reverse=True)
# 选择前 num_beams 个最佳候选
beams = [new_beams[i] for i in sorted_indices[:num_beams]]
beam_scores = [new_scores[i] for i in sorted_indices[:num_beams]]
# 停止条件检查检查最佳序列是否以停止token结尾
if stop_id is not None and beams[0][0, -1] == stop_id:
break
# 返回得分最高的序列,只返回新生成的部分(去掉原始输入)
# beams[0] 是最终得分最高的完整序列
# [:, seq_len:] 切片只保留生成部分
return beams[0][:, seq_len:]
@torch.inference_mode()
def generate_super(self,
idx,
stop_id=None,
max_new_tokens=256,
temperature=1.0,
top_k=None,
do_sample=False,
num_beams=1
):
"""
高级文本生成函数,支持三种解码策略:
1. 贪婪解码Greedy Search
- 参数do_sample=False, num_beams=1
- 特点每步选择概率最大的token速度快、结果确定
2. 随机采样Random Sampling
- 参数do_sample=True, num_beams=1
- 特点基于概率分布随机采样可配合temperature和top-k控制多样性
3. 束搜索Beam Search
- 参数do_sample=False, num_beams>1
- 特点:维护多条候选路径,选择总概率最高的序列,质量更高但速度较慢
Args:
idx: 输入序列张量,形状为 (batch_size, seq_len)
stop_id: 停止生成的token ID
max_new_tokens: 最大生成token数量
temperature: 温度参数,控制随机性,越高越随机
top_k: 只考虑概率最高的k个tokenNone表示不考虑
do_sample: 是否使用随机采样False时使用确定性解码
num_beams: 束搜索的束宽度1表示不使用束搜索
Returns:
生成的token序列形状为 (batch_size, generated_length)
"""
# 参数验证
if temperature <= 0:
temperature = 0.001 # 避免除零错误
if num_beams < 1:
num_beams = 1
if top_k is not None and top_k < 1:
top_k = None
# 束搜索逻辑
if not do_sample and num_beams > 1:
return self._beam_search(idx, max_new_tokens, num_beams, temperature, top_k, stop_id)
# 贪婪解码和随机采样逻辑
index = idx.shape[1]
for _ in range(max_new_tokens):
# 如果序列上下文过长,截断它到最大长度
idx_cond = idx if idx.size(1) <= self.args.max_seq_len else idx[:, -self.args.max_seq_len:]
# 前向传播获取序列中最后一个位置的 logits
logits = self(idx_cond).logits
logits = logits[:, -1, :] # 只保留最后一个时间步的输出
# 根据参数选择解码策略
if do_sample:
idx_next = self._random_sample(logits, temperature, top_k)
else:
# 当temperature=0时使用贪婪解码
if temperature < 0.1:
idx_next = self._greedy_decode(logits)
else:
# 低温度下的随机采样(接近贪婪)
idx_next = self._random_sample(logits, temperature, top_k)
# 检查停止条件
if stop_id is not None and idx_next[0, 0] == stop_id:
break
# 将选择的token添加到序列中
idx = torch.cat((idx, idx_next), dim=1)
return idx[:, index:] # 只返回生成的token
if __name__ == '__main__':
tokenizer = AutoTokenizer.from_pretrained("tokenizer_k")