Notes On Reward Model

Key Insights On Reward Model

1.RM 样本构造:pairwise sample or pointwise sample
2.RM 样本构造: 构造差距更小的样本对 v.s 构造差距更大的样本对
3.Reward Score Ensemble: 多个 RM score 如何融合?

RM 样本构造:pairwise sample or pointwise sample

问题设定:对于一个 prompt, sample 出来有 n 个 response, 可以构造 $c_{n}^2$ 这么多个样本, 每个样本生成 10/20 个 response,就会产生 45/190 个样本,假如我们有 20w prompt,样本数是 900w/3800w

构造 RM 是优先构造偏序关系还是优先构造分数, 是一个 “先有鸡还是先有蛋的问题”
当我们想办法先构造出来分数的时候,就自然出来了偏序关系, 进而可以利用 pairwise 损失函数训练 RM
当我们想办法先构造出来偏序关系的时候,就能通过简单的映射关系构建分数, 进而利用分数训练 pointwise RM/GenRM

RM 样本构造: pairwise sample

pairwise RM 的损失函数为:

构造思路1: pairwise direct ranking
直接用 LLM-as-a-Judge 的方法去对比两个回复谁更优质, 和 GenRM 的思想是一致的:
| symbol | probability |
| —- | —- |
| [A>B] | 0.8 |
| [B<A] | 0.2 |

另外在对比的过程中可以额外考虑打分的分布进行平滑使得打分更稳定和准确, 参考 Improving LLM-as-a-Judge Inference with the Judgment Distribution,用 LLM-as-a-judge 做一对输入偏序分类,然后用概率分布来打一个排序分

logits = model(prompt_tokens)
symbol probability
[[>>]] 0.4
[[>]] 0.2
[[=]] 0.1
[[<]] 0.2
[[<<]] 0.1

构造思路2: 先打 pointwise 分数, 然后转 pair
用一个强 LLM /人标数据对所有的 10 个回答进行排序或者评分, O(n) 复杂度计算量; 但这种 pointwise 有不太稳定/存在不可比的问题; 最简单的方法是转 pairwise scoring 在一定程度上提高可比性

大量的情况下,pairwise sample 如何解决样本数量爆炸问题 ?
1.少抽样
(i). Top-k 硬抽样 (通过采样直接砍掉大多数 pair)
例如: 10 个回答只选前 4 个或者后 4 个组 pair; 或者每个 prompt 限制一个上限:最多我就保留 3 个 pair, 不能再多
(ii). Hard Negative 动态采样: 首先需要有一个 pointwise 模型对所有样本打分数, 然后选择 |r(A) - r(B)| 最大的 pair, 认为这部分样本最重要

RM 样本构造: pointwise sample

构造思路1: pointwise scoring/pairwise scoring
采用 LLM-as-a-judge 方法下,pairwise scoring 每次输入两个样本,保证两个样本是可比的,相对 pointwise scoring 会更稳定一些

构造思路2: 借助偏序关系构造 pointwise score, Pairwise 转 Pointwise 的构造方法:
假设我们已经有偏序关系如下,已有排序关系则我们对任意 response i 有了个真实排序 rank(i), 我们构造一个以 rank(i) 为自变量的函数,利用这个函数可以带入 rank(i) 后直接计算出来一个分数

可以用两类映射公式,直接构造 pointwise 分数, 分别是线性映射和 softmax 平滑, 其中线性映射为如下公式:

{
  "A": 1.0,
  "B": 0.67,
  "C": 0.33,
  "D": 0.0
}

softmax 映射为如下公式

回答 Rank $e^{-λ·rank}$ Score (归一化后)
A 1 0.45 0.45 / 1.23 = 0.37
B 2 0.20 0.20 / 1.23 = 0.16
C 3 0.09 0.09 / 1.23 = 0.07
D 4 0.04 0.04 / 1.23 = 0.03

如下给出一个代码示例, 如何串联起来 topK 采样 + ranking 映射 pointwise score

import random
import itertools
import numpy as np
import json

# === 配置参数 ===
MAX_PAIRS_PER_PROMPT = 3         # 每个 prompt 抽 3 对 pair
TOP_K = 5                        # 每个 prompt 最多考虑前 5 个回答(减少组合)
RANK_TO_SCORE_METHOD = "linear"  # 可选: "linear" / "softmax"
LAMBDA = 0.8                     # softmax 陡峭度参数

# === 示例输入数据 ===
dataset = [
    {
        "prompt": "请解释量子纠缠。",
        "responses": [
            "量子纠缠是量子系统中粒子之间的一种特殊相关性。",
            "量子纠缠指两个粒子的状态相互依赖,即使分开也会同时改变。",
            "量子纠缠意味着两个粒子会瞬间通信。",
            "量子纠缠是一种宏观现象。",
            "量子纠缠可用于量子通信和量子计算。"
        ]
    },
    {
        "prompt": "什么是黑洞?",
        "responses": [
            "黑洞是质量极大的天体,其引力强到连光都无法逃逸。",
            "黑洞是一种能量体,会吸收周围的能量。",
            "黑洞是一种恒星的坍缩结果。",
            "黑洞是一种暗物质。",
        ]
    }
]

# === Step 1: 抽样部分 pair(避免组合爆炸) ===
def sample_pairs(responses, max_pairs=MAX_PAIRS_PER_PROMPT):
    pairs = list(itertools.combinations(range(len(responses)), 2))
    return random.sample(pairs, min(len(pairs), max_pairs))

# === Step 2: 模拟“打分”或 ranking ===
# 实际中可以用强模型 judge 或人工给出排名
def get_mock_ranking(responses):
    """随机生成一个 ranking (0=最好)"""
    ranks = list(range(len(responses)))
    random.shuffle(ranks)
    return ranks

# === Step 3: rank → score 转换 ===
def rank_to_score(ranks, method="linear", lam=0.8):
    n = len(ranks)
    scores = np.zeros(n)
    if method == "linear":
        for i, r in enumerate(ranks):
            scores[i] = 1 - (r / (n - 1))
    elif method == "softmax":
        exps = np.exp(-lam * np.array(ranks))
        scores = exps / exps.sum()
    return scores.tolist()

# === Step 4: 构建输出样本 ===
converted = []
for item in dataset:
    prompt = item["prompt"]
    responses = item["responses"][:TOP_K]

    # 1. pair 抽样
    sampled_pairs = sample_pairs(responses)

    # 2. ranking(这里用 mock,可替换为 judge 模型)
    ranks = get_mock_ranking(responses)
    scores = rank_to_score(ranks, method=RANK_TO_SCORE_METHOD)

    # 3. 构造输出结构
    record = {
        "prompt": prompt,
        "responses": [
            {"text": r, "rank": ranks[i], "score": scores[i]} 
            for i, r in enumerate(responses)
        ],
        "pairs": [
            {"winner": responses[i], "loser": responses[j]} 
            for (i, j) in sampled_pairs
        ]
    }
    converted.append(record)

# === Step 5: 保存 JSON ===
with open("rubric_rm_dataset.json", "w", encoding="utf-8") as f:
    json.dump(converted, f, ensure_ascii=False, indent=2)

print(json.dumps(converted[:1], ensure_ascii=False, indent=2))

如下是运行如上代码的示例结果:

[
  {
    "prompt": "请解释量子纠缠。",
    "responses": [
      {"text": "量子纠缠是量子系统中粒子之间的一种特殊相关性。", "rank": 3, "score": 0.333},
      {"text": "量子纠缠指两个粒子的状态相互依赖,即使分开也会同时改变。", "rank": 1, "score": 0.667},
      {"text": "量子纠缠意味着两个粒子会瞬间通信。", "rank": 4, "score": 0.0},
      {"text": "量子纠缠是一种宏观现象。", "rank": 2, "score": 0.5},
      {"text": "量子纠缠可用于量子通信和量子计算。", "rank": 0, "score": 1.0}
    ],
    "pairs": [
      {"winner": "量子纠缠是一种宏观现象。", "loser": "量子纠缠意味着两个粒子会瞬间通信。"},
      {"winner": "量子纠缠指两个粒子的状态相互依赖,即使分开也会同时改变。", "loser": "量子纠缠是量子系统中粒子之间的一种特殊相关性。"},
      {"winner": "量子纠缠可用于量子通信和量子计算。", "loser": "量子纠缠是一种宏观现象。"}
    ]
  }
]

采用这种方法,也可以用 Rubric RM 的方式对每个维度进行打分, 然后再融合分数 (平均或者加权)

import random
import itertools
import numpy as np
import json

# === 参数设置 ===
RUBRICS = ["helpfulness", "truthfulness", "conciseness", "politeness"]
TOP_K = 5                    # 每个 prompt 考虑前 K 个回答
MAX_PAIRS_PER_PROMPT = 3     # 每个 prompt 抽 3 对 pair
RANK_TO_SCORE_METHOD = "linear"
LAMBDA = 0.8

# === 示例输入数据 ===
dataset = [
    {
        "prompt": "请解释量子纠缠。",
        "responses": [
            "量子纠缠是量子系统中粒子之间的一种特殊相关性。",
            "量子纠缠指两个粒子的状态相互依赖,即使分开也会同时改变。",
            "量子纠缠意味着两个粒子会瞬间通信。",
            "量子纠缠是一种宏观现象。",
            "量子纠缠可用于量子通信和量子计算。"
        ]
    },
    {
        "prompt": "什么是黑洞?",
        "responses": [
            "黑洞是质量极大的天体,其引力强到连光都无法逃逸。",
            "黑洞是一种能量体,会吸收周围的能量。",
            "黑洞是一种恒星的坍缩结果。",
            "黑洞是一种暗物质。",
        ]
    }
]

# === 工具函数 ===
def sample_pairs(responses, max_pairs=MAX_PAIRS_PER_PROMPT):
    pairs = list(itertools.combinations(range(len(responses)), 2))
    return random.sample(pairs, min(len(pairs), max_pairs))

def rank_to_score(ranks, method="linear", lam=0.8):
    n = len(ranks)
    if method == "linear":
        scores = [1 - (r / (n - 1)) for r in ranks]
    elif method == "softmax":
        exps = np.exp(-lam * np.array(ranks))
        scores = (exps / exps.sum()).tolist()
    return scores

def get_mock_ranking(responses, num_rubrics):
    """模拟不同维度下的排名"""
    ranks_per_rubric = {}
    for r in RUBRICS:
        ranks = list(range(len(responses)))
        random.shuffle(ranks)
        ranks_per_rubric[r] = ranks
    return ranks_per_rubric

# === 构建 Rubric RM 数据集 ===
converted = []
for item in dataset:
    prompt = item["prompt"]
    responses = item["responses"][:TOP_K]
    n = len(responses)
    # 模拟不同 rubric 的 ranking(实际可用 GPT-judge 或人工替换)
    ranks_per_rubric = get_mock_ranking(responses, len(RUBRICS))
    # 将 ranking 转换为分数
    scores_per_rubric = {}
    for r_name, ranks in ranks_per_rubric.items():
        scores_per_rubric[r_name] = rank_to_score(ranks, method=RANK_TO_SCORE_METHOD, lam=LAMBDA)
    # 构建每个回答的多维分数
    answers_info = []
    for i, resp in enumerate(responses):
        rubric_scores = {r_name: scores_per_rubric[r_name][i] for r_name in RUBRICS}
        overall = float(np.mean(list(rubric_scores.values())))
        answers_info.append({
            "text": resp,
            "rubric_scores": rubric_scores,
            "overall_score": overall
        })
    # 抽样有限数量的 pair(节省计算)
    sampled_pairs = sample_pairs(responses)
    converted.append({
        "prompt": prompt,
        "responses": answers_info,
        "pairs": [
            {
                "winner": responses[i],
                "loser": responses[j]
            }
            for (i, j) in sampled_pairs
        ]
    })
# === 输出 JSON 文件 ===
with open("rubric_rm_multidim_dataset.json", "w", encoding="utf-8") as f:
    json.dump(converted, f, ensure_ascii=False, indent=2)

print(json.dumps(converted[:1], ensure_ascii=False, indent=2))

RM 样本构造: 构造差距更小的样本对 v.s 构造差距更大的样本对

Reward Score Ensemble: 多个 RM score 如何融合?

假设 PPO 阶段有两个 RM 模型,一个是来自 0-1 二分类样本训练 pointwise 模型,另一个是从 pairwise 偏好样本对训练的标准 RM 模型,这两个模型的分数在 RL 阶段应该如何正确融合?

1.先对两个 score 做 normalize

2.调优 score 超参数 $\alpha$ 和 $\beta$

下面伪代码模拟了两个 RM 内容质量 RM /安全性 RM 的常规合成

import numpy as np

# =========================================
# 1️⃣ 模拟两个 Reward Model 的输出
# =========================================
# r1: 内容质量 RM
# r2: 安全性 RM
r1 = np.array([2.1, 0.5, -1.2, 3.4])    # RM1 原始输出(任意实数)
r2 = np.array([0.03, -0.2, -0.8, 0.6])  # RM2 原始输出

# =========================================
# 2️⃣ 对每个 RM 单独做 Z-score 归一化
# =========================================
def zscore(x):
    return (x - x.mean()) / (x.std() + 1e-8)

r1_z = zscore(r1)
r2_z = zscore(r2)

# =========================================
# 3️⃣ 加权融合
# =========================================
# 示例 1:简单平均
r_final_avg = 0.5 * r1_z + 0.5 * r2_z

# 示例 2:固定加权(内容质量 0.7,安全性 0.3)
r_final_weighted = 0.7 * r1_z + 0.3 * r2_z

# 示例 3(可选):保守融合 - 取最小值(双 RM 都满意)
r_final_min = np.minimum(r1_z, r2_z)

# =========================================
# 4️⃣ (可选)再缩放,控制 reward 数值范围
# PPO 等 RLHF 阶段常用 tanh 限幅
# =========================================
λ = 2.0  # 控制最终 reward 强度
r_final_clipped = λ * np.tanh(r_final_weighted)

# =========================================
# 5️⃣ 输出最终可用 reward
# =========================================
print("RM1 原始输出:", r1)
print("RM2 原始输出:", r2)
print("融合后 reward:", r_final_clipped)

转载请注明来源 goldandrabbit.github.io