动手实现一个播客翻译工作流

之前有期 Dwarkesh Patel 对话 Andrej Karpathy 的播客放在书签里好久没看,每天车上通勤时间有点浪费,遂想将播客转译为中文人声的想法。使用 Notebooklm 的话,通过 AI 提炼的文字对于播客这种信息密度比较高的内容来说损失难以衡量,而市面上虽然已有一些流行的视频字幕和人声转换工具,但好像又没有开源成熟的播客翻译的项目实现,于是准备先手动跑一遍流程,再看看有没有什么适合封装的实现。

跟 gemini 交流了一番,总结出主要流程需包含以下步骤:

  • Demucs:人声分离
  • WhisperX:声音识别与分割
  • LLM API:翻译
  • GPT-SoVITS:生成中文语音
  • pydub/ffmpeg-python:时序对齐和混音

0x00 环境准备

本来用的 nvidia pytorch 包依赖冲突太多,遂决定基于 nvcr.io/nvidia/cuda:12.1.1-cudnn8-devel-ubuntu22.04 重新构建一个,requirement.txt 参考了 VideoLingo 的,让 gemini 精简了下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# --- 核心基础库 ---
numpy==1.26.4
pandas==2.2.3
librosa==0.10.2.post1
scipy

# --- 音频处理与计算 ---
pydub==0.25.1
moviepy==1.0.3
ctranslate2==4.4.0
transformers==4.39.3
pytorch-lightning==2.3.3

# --- 核心 AI 模型 (直接从源码安装以获取最新修复) ---
# Demucs: 人声分离
demucs[dev] @ git+https://github.com/adefossez/demucs
# WhisperX: 识别与对齐 (指定了特定 Commit 以保证稳定性)
whisperx @ git+https://github.com/m-bain/whisperx.git@7307306a9d8dd0d261e588cc933322454f853853

# --- LLM 与 文本处理 ---
openai==1.55.3
json-repair # 极力推荐:防止 LLM 返回的 JSON 格式错误
spacy==3.7.4 # 用于更好的分句处理
autocorrect-py # 简单的拼写纠错

# --- 辅助工具 ---
PyYAML==6.0.2
requests==2.32.3

由于使用的 enroot 环境,没通过 dockerfile 构建,直接命令行一把梭:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 安装系统级依赖
apt-get update && apt-get install -y \
python3.10 \
python3-pip \
python3-venv \
git \
ffmpeg \
libavcodec-dev \
libavformat-dev \
libavdevice-dev \
libavutil-dev \
libswscale-dev \
libswresample-dev \
libavfilter-dev \
pkg-config \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# 安装 PyTorch
pip install --upgrade pip
pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu121
# 安装剩余依赖
pip install -r requirements.txt
# 创建软链接
ln -s /usr/bin/python3.10 /usr/bin/python

0x01 人声分离

Demucs 是一款先进的音乐源分离模型,目前能够将鼓、贝斯和人声从其他伴奏中分离出来。

之前从 YouTube 下载的音频是 m4v 格式,虽然 demucs 支持,但还是按照 gemini 建议先将其转换为 wav 以提高后续处理效率。(转换后的 wav 体积大概是源文件的 10 倍,我是通过 ssh 进行的操作下载很慢,所以后续需要截取切片来试听效果)

1
ffmpeg -i input.m4a -ar 44100 -ac 2 input.wav

可以先截取 30min 验证流程:

1
ffmpeg -i input_source.wav -t 00:30:00 -c copy input.wav

使用 demucs 分离人声:

1
demucs --two-stems=vocals -n htdemucs input.wav
  • –two-stems=vocals: 告诉模型我只要 vocals(人声)和 no_vocals(伴奏/背景音)。如果不加这个,它会分成鼓、贝斯、其他等4个轨道,对播客没用。
  • -n htdemucs: 指定使用最新模型。还有一个微调版本 htdemucs_ft,核心改进在于分离头的精细化调整,推理时间更长,质量更高。

输出的音频位于 separated/htdemucs/input 下,分为 vocals.wavno_vocals.wav 两个音频文件(都是 GB 级别),可以通过以下命令截取 5 分钟片段下载到本地试听下效果:

1
2
ffmpeg -i separated/htdemucs/input/vocals.wav -t 300 -c copy testvocals.wav
ffmpeg -i separated/htdemucs/input/no_vocals.wav -t 300 -c copy test_no_vocals.wav

在音频目录下也可以通过 python 启动一个 http 服务转发音频在浏览器中直接预览:

1
python3 -m http.server 8000

0x02 语音识别

Whisper 是 OpenAI 开发的一款自动语音识别 (ASR) 模型,它基于一个包含各种音频的大型数据集进行训练。虽然它能生成高度准确的转录文本,但对应的时间戳是按语句而非单词计算的,因此可能存在几秒的误差。OpenAI 的 Whisper 本身并不支持批量处理。

WhisperX 是基于 Whisper 的工程封装库,支持说话人区分。

在使用 WhisperX 前,需要准备一些配置,比如说话人声分割用到的模型 speaker-diarization-3.1 要在参数中添加 huggingface 令牌。

首先在网页端 新建一个 read 权限的 token,同时需要在 speaker-diarization-3.1segmentation-3.0 页面同意开源模型协议。

准备完成后就可以在命令行启动 whisperx 进行语音转文字。

1
2
3
4
5
6
7
8
9
10
11
whisperx separated/htdemucs/input/vocals.wav \
--model large-v2 \
--language en \
--diarize \
--min_speakers 2 \
--max_speakers 5 \
--output_dir output_subs \
--output_format all \
--compute_type float16 \
--batch_size 32 \
--hf_token <huggingface_token>

下面是 gemini 给出的一些参数解释:

  • –model large-v2:
    • 目前 Whisper 生态中英文识别综合能力最强的模型(v3 在某些场景下对音乐的抗噪反而不如 v2,v2 是公认最稳的)。
    • 3090 显存足够大,不用担心跑不动。
  • –language en:
    • 显式指定英语。虽然它能自动检测,但指定后可以避免开头几秒误识别,并略微提升速度。
  • –diarize:
    • 添加说话人标签,多人对话场景属于必要参数。
  • –compute_type float16:
    • 使用半精度加速,在 3090 上这是标准操作,速度快且不损失精度。
  • –batch_size 16/32:
    • 这会极大提升处理速度。3090 的 24G 显存完全吃得消 batch 16 甚至 32。
  • –min_speakers 2 / --max_speakers 5:
    • 显式告诉 WhisperX “这里面有 2 个人”,能极大提高说话人区分 (Diarization) 的准确度,防止它把偶尔的背景杂音识别成第 3 个说话人。
    • 注:如果你不确定有几个人,可以不加这两个参数,但在播客场景下指定人数通常效果最好。
    • 补充:好吧后续在翻译结束后才发现中间有另一个女声(还好白嫖的翻译 api),导致两位男主声音合并到了一起。保险起见可以设置为 2~5 或更多。
  • –initial_prompt “Discussion about AI, LLMs, Transformer, PyTorch, CUDA, reinforcement learning, Andrej Karpathy, Dwarkesh Patel.”
    • 注入提示词可以优化技术类播客的生成效果,不过先不考虑添加。
      –max_line_width 100
    • 分片字符长度限制。在后续尝试中被去除,保证长句完整性。

运行完成后输出在当前路径下的 output_subs 目录:

1
2
$ ls output_subs/
vocals.json vocals.srt vocals.tsv vocals.txt vocals.vtt

将 json 文件(后续准备用来翻译的格式)下载下来预览一下:

1
2
3
{"segments": [{"start": 0.109, "end": 1.009, "text": " Reinforcement learning is terrible.", "words": [{"word": "Reinforcement", "start": 0.109, "end": 0.449, "score": 0.303, "speaker": "SPEAKER_01"}, {"word": "learning", "start": 0.469, "end": 0.649, "score": 0.849, "speaker": "SPEAKER_01"}
...
, {"word": "one.", "start": 8766.077, "end": 8766.157, "score": 0.741, "speaker": "SPEAKER_01"}], "language": "en"}

可以看出其中包含了文本时间轴和说话人等关键信息,但是也不难发现有些句子被切分得很短,在丢给大模型翻译时可能会丢失上下文信息。不过这个问题可以先放一边,后续再考虑优化。

最好再确认下人声数量分布,提前发现问题。

1
2
3
4
$ grep -o '"speaker": "[^"]*"' output_subs/vocals.json | sort | uniq -c
319 "speaker": "SPEAKER_00"
46484 "speaker": "SPEAKER_01"
16872 "speaker": "SPEAKER_02"

0x03 文本翻译

这边直接使用 gemini 提供的翻译脚本,支持断点续传以增加安全感,本地通过 new api 代理白嫖一个 gemini-2.5-flash 测试下(年底 google 收缩了白嫖额度,后续又尝试了很多模型):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import json
import os
import time
import shutil
from openai import OpenAI
from concurrent.futures import ThreadPoolExecutor

# ==========================================
# 🔧 通用配置区 (Standard Configuration)
# ==========================================
INPUT_FILE = "output_subs/vocals_semantic.json"
OUTPUT_FILE = "output_subs/vocals_translated.json"
MODEL_NAME = "qwen3-max-2025-09-23"

# 通用优化参数
BATCH_SIZE = 15 # 单次请求翻译的行数 (10-20 是安全范围)
SLEEP_INTERVAL = 0.5 # 请求间隔 (秒),防止瞬间并发过高
MAX_WORKERS = 3 # 并发线程数 (根据你的 API 配额调整,通常 3-5 是安全的)
MAX_RETRIES = 3 # 失败重试次数
TIMEOUT_SECONDS = 60 # API 超时时间
# ==========================================

# --- 术语表 (可根据内容修改) ---
GLOSSARY = """
- LLM: LLM (大语言模型)
- Transformer: Transformer
- Token: Token
- Weights: 权重
- Context Window: 上下文窗口
- Hallucination: 幻觉
- Inference: 推理
- RLHF: RLHF
- Gradient Descent: 梯度下降
"""

# 初始化客户端
# 脚本会自动读取环境变量 OPENAI_API_KEY 和 OPENAI_BASE_URL
client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
timeout=TIMEOUT_SECONDS
)

def safe_save_data(segments):
"""原子写入:防止中断导致 JSON 文件损坏"""
temp_file = OUTPUT_FILE + ".tmp"
try:
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(segments, f, ensure_ascii=False, indent=2)
shutil.move(temp_file, OUTPUT_FILE)
except Exception as e:
print(f"⚠️ Save failed: {e}")

def load_data():
"""智能加载逻辑:检测旧缓存冲突"""
if not os.path.exists(INPUT_FILE):
print(f"❌ Input file missing: {INPUT_FILE}")
exit(1)

with open(INPUT_FILE, 'r', encoding='utf-8') as f:
input_data = json.load(f)

# 初始化字段
for seg in input_data:
if 'text_zh' not in seg:
seg['text_zh'] = ""

if os.path.exists(OUTPUT_FILE):
print(f"♻️ Checking existing progress in {OUTPUT_FILE}...")
try:
with open(OUTPUT_FILE, 'r', encoding='utf-8') as f:
output_data = json.load(f)

# 校验长度,防止不同分词版本混用
if len(input_data) != len(output_data):
print(f"⚠️ Mismatch detected (Input: {len(input_data)} vs Cache: {len(output_data)}).")
print(f"🗑️ Discarding old cache. Starting fresh.")
return input_data
else:
translated_count = sum(1 for seg in output_data if seg.get('text_zh', '').strip() != "")
print(f"✅ Resume successful! ({translated_count}/{len(output_data)} segments translated)")
return output_data
except json.JSONDecodeError:
print(f"⚠️ Cache corrupted. Starting fresh.")
return input_data
return input_data

def translate_batch(args):
"""处理单个批次,包含重试逻辑"""
batch_idx, segments = args
input_count = len(segments)

# 1. 检查是否已完成
if all(seg.get('text_zh', '').strip() != "" for seg in segments):
print(f"⏩ Batch {batch_idx} already done.")
return segments, False

# 2. 构造 Prompt
task_text = f"Task (Translate these {input_count} lines one by one):\n"
for i, seg in enumerate(segments):
task_text += f"[{i}] {seg['text'].strip()}\n"

system_prompt = f"""
You are a professional translator for an AI technical podcast.
Target: Natural, fluent spoken Chinese.
Glossary:
{GLOSSARY}

IMPORTANT:
1. You have received {input_count} lines.
2. You MUST return exactly {input_count} translated lines.
3. Return ONLY a JSON string array. Format: ["text1", "text2"]
"""

# 3. 重试循环
for attempt in range(MAX_RETRIES):
try:
# 简单的限流休眠
time.sleep(SLEEP_INTERVAL)

print(f"🚀 [Batch {batch_idx}] Requesting {input_count} lines (Attempt {attempt+1})...")
start_ts = time.time()

response = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": task_text}
]
)

duration = time.time() - start_ts
result = response.choices[0].message.content.strip()

# 清洗 Markdown
if result.startswith("```"):
result = result.split("\n", 1)[1]
if result.endswith("```"):
result = result.rsplit("\n", 1)[0]

translations = json.loads(result)
if isinstance(translations, dict):
translations = list(translations.values())[0]

# 严格校验行数
if not isinstance(translations, list) or len(translations) != input_count:
print(f"⚠️ [Batch {batch_idx}] Count mismatch! Sent {input_count}, got {len(translations) if isinstance(translations, list) else 'Error'}")
raise ValueError("Line count mismatch")

# 回填结果
for i, seg in enumerate(segments):
seg['text_zh'] = str(translations[i]).strip()

print(f"✅ [Batch {batch_idx}] Success in {duration:.2f}s")
return segments, True

except Exception as e:
print(f"❌ [Batch {batch_idx}] Error: {str(e)}")

# 简单的退避策略
if attempt < MAX_RETRIES - 1:
wait_time = (attempt + 1) * 2 # 2s, 4s, 6s...
print(f"🔄 Retrying in {wait_time}s...")
time.sleep(wait_time)
else:
print(f"💀 Batch {batch_idx} Failed after {MAX_RETRIES} attempts.")
break

return segments, False

def main():
all_segments = load_data()
total_len = len(all_segments)

# 构造批次
batches = []
for i in range(0, total_len, BATCH_SIZE):
batch_seg = all_segments[i : i + BATCH_SIZE]
batches.append((i // BATCH_SIZE, batch_seg))

print(f"🚀 Processing {len(batches)} batches.")
print(f"ℹ️ Config: Batch Size={BATCH_SIZE}, Workers={MAX_WORKERS}, Interval={SLEEP_INTERVAL}s")

# 使用线程池并发处理
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# map 保证结果顺序返回,方便按顺序合并数据
results = executor.map(translate_batch, batches)

for i, (updated_batch, changed) in enumerate(results):
start_idx = i * BATCH_SIZE
all_segments[start_idx : start_idx + len(updated_batch)] = updated_batch

# 定期保存进度
if changed or i % 10 == 0:
safe_save_data(all_segments)
if changed:
print(f"💾 Progress saved at batch {i}")

safe_save_data(all_segments)
print(f"🎉 All Done! Output saved to {OUTPUT_FILE}")

if __name__ == "__main__":
main()

看起来 AI 很喜欢在日志中加入符号:

1
2
3
4
5
6
7
8
9
$ python translate_subs.py
🆕 No progress found, starting from scratch with output_subs/vocals.json...
🚀 Processing 200 batches...
🔄 Translating Batch 0 (10 lines)...
🔄 Translating Batch 1 (10 lines)...
💾 Saved progress at batch 0
🔄 Translating Batch 2 (10 lines)...
💾 Saved progress at batch 1
🔄 Translating Batch 3 (10 lines)...

完成后生成的 vocals_translated.json 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[
{
"start": 0.109,
"end": 1.009,
"text": " Reinforcement learning is terrible.",
"words": [
{
"word": "Reinforcement",
"start": 0.109,
"end": 0.449,
"score": 0.303,
"speaker": "SPEAKER_01"
},
{
"word": "learning",
"start": 0.469,
"end": 0.649,
"score": 0.849,
"speaker": "SPEAKER_01"
},
{
"word": "is",
"start": 0.669,
"end": 0.709,
"score": 0.96,
"speaker": "SPEAKER_01"
},
{
"word": "terrible.",
"start": 0.729,
"end": 1.009,
"score": 0.899,
"speaker": "SPEAKER_01"
}
],
"speaker": "SPEAKER_01",
"text_zh": "强化学习糟透了。"
},
...
]

0x04 语音生成

首先准备一个新的 cuda 12.8 环境,安装 miniconda,然后使用 GPT-SoVITS 官方的一键安装脚本来初始化环境。

1
2
3
4
5
6
7
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
conda create -n GPTSoVits python=3.10
conda activate GPTSoVits
git clone https://github.com/RVC-Boss/GPT-SoVITS.git
cd GPT-SoVITS
bash install.sh --device CU128 --source ModelScope

按照 gemini 建议采用零样本(Zero-shot)微调,先通过脚本获取几个说话人的音频切片:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import json
import os
import re
from pydub import AudioSegment

# --- 配置 ---
WAV_FILE = "separated/htdemucs/input/vocals.wav"
JSON_FILE = "output_subs/vocals_translated.json"
OUTPUT_DIR = "output_refs"

# 理想参数
IDEAL_DURATION = 6.0 # 理想时长 6秒
MIN_DURATION = 3.5 # 最短 3.5秒
MAX_DURATION = 10.0 # 最长 10秒

def calculate_score(seg, total_duration_idx):
"""
全自动评分函数
"""
text = seg['text'].strip()
duration = seg['end'] - seg['start']

# 1. 硬性门槛 (直接淘汰)
if duration < MIN_DURATION or duration > MAX_DURATION:
return -9999

# 2. 基础分:时长评分 (钟形曲线,越接近 6s 分越高)
# 距离 6s 每差 1s 扣 10 分
score = 100 - abs(duration - IDEAL_DURATION) * 10

# 3. 文本纯净度
# 只要字母和常用标点,排除特殊符号
alpha_count = len(re.findall(r'[a-zA-Z]', text))
if alpha_count < 10: return -9999 # 单词太少

# 字符/时长比 (语速检测)
# 正常语速大概每秒 10-15 个字母。如果太快(>20)或太慢(<5),扣分
cps = alpha_count / duration
if cps > 22: score -= 30 # 太快,容易吞字
if cps < 5: score -= 30 # 太慢,可能是拖长音

# 惩罚疑问句/感叹句 (语调不稳定)
if "?" in text: score -= 15
if "!" in text: score -= 15

# 4. 位置偏好 (Recency Bias)
# 我们倾向于取播客前面 1/3 的内容,通常音质较好
# total_duration_idx 是该 segment 在整个列表中的索引位置 (0.0 - 1.0)
if total_duration_idx < 0.3:
score += 10 # 早期加分
elif total_duration_idx > 0.8:
score -= 10 # 尾部扣分 (可能那是结束语或疲惫音)

return score

def main():
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)

print(f"📂 Loading audio {WAV_FILE} (Please wait, loading large file)...")
try:
audio = AudioSegment.from_wav(WAV_FILE)
except Exception as e:
print(f"❌ Failed to load WAV: {e}")
return

with open(JSON_FILE, 'r', encoding='utf-8') as f:
segments = json.load(f)

total_segments = len(segments)

# 按说话人分组
speakers = {}
for i, seg in enumerate(segments):
spk = seg.get('speaker', 'UNKNOWN')
if spk not in speakers: speakers[spk] = []

# 记录相对位置 (0.0 - 1.0)
seg['rel_pos'] = i / total_segments
speakers[spk].append(seg)

print(f"🔍 Found {len(speakers)} speakers.")

for spk, items in speakers.items():
best_score = -10000
best_seg = None

# 遍历该说话人的所有句子,找分最高的
for item in items:
score = calculate_score(item, item['rel_pos'])
if score > best_score:
best_score = score
best_seg = item

if best_seg:
print(f"🏆 Best Ref for {spk} (Score: {best_score:.1f}):")
print(f" Text: \"{best_seg['text']}\"")
print(f" Time: {best_seg['start']:.1f}s - {best_seg['end']:.1f}s ({best_seg['end']-best_seg['start']:.1f}s)")

# 导出最佳音频
start_ms = int(best_seg['start'] * 1000)
end_ms = int(best_seg['end'] * 1000)

# 稍微向外扩一点点 (防止切断首尾辅音),比如前后各加 50ms
# 但要确保不越界
start_ms = max(0, start_ms - 50)
end_ms = min(len(audio), end_ms + 50)

chunk = audio[start_ms:end_ms]

wav_path = os.path.join(OUTPUT_DIR, f"{spk}_ref.wav")
txt_path = os.path.join(OUTPUT_DIR, f"{spk}_ref.txt")

chunk.export(wav_path, format="wav")
with open(txt_path, 'w', encoding='utf-8') as f:
f.write(best_seg['text'])

print(f" ✅ Saved to {wav_path}")
else:
print(f"⚠️ No valid reference found for {spk} (All filtered out).")

if __name__ == "__main__":
main()

执行后输出的目录结构如下,分别对应多个说话人的音频切片和文字样本:

1
2
3
4
5
6
7
8
$ tree output_refs/
output_refs/
├── SPEAKER_00_ref.txt
├── SPEAKER_00_ref.wav
├── SPEAKER_01_ref.txt
├── SPEAKER_01_ref.wav
├── SPEAKER_02_ref.txt
└── SPEAKER_02_ref.wav

启动 api_v2.py(使用最新的 v2 版本),并让 gemini 给了一个调用 GPT-SoVITS api 的脚本,利用之前的采样片段和英文分片音频生成新的中文人声音频。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import json
import os
import requests
import time
from pydub import AudioSegment

# ================= 配置区 =================
INPUT_JSON = "output_subs/vocals_translated.json"
OUTPUT_DIR = "output_audio"
REF_DIR = "output_refs"
GPT_SOVITS_URL = "http://127.0.0.1:9880"
MIN_REF_DURATION = 3.0 # 参考音频最小适长 (秒)
# ==========================================

def fix_ref_audio(wav_path, txt_path):
"""如果参考音频太短,自动循环拼接以满足时长要求"""
try:
audio = AudioSegment.from_wav(wav_path)
original_dur = len(audio) / 1000.0

if original_dur >= MIN_REF_DURATION:
with open(txt_path, 'r', encoding='utf-8') as f:
return wav_path, f.read().strip()

print(f"⚠️ Ref {os.path.basename(wav_path)} too short ({original_dur:.1f}s). Extending...")

# 读取原始文本
with open(txt_path, 'r', encoding='utf-8') as f:
text = f.read().strip()

# 循环拼接直到达标
extended_audio = audio
extended_text = text
while len(extended_audio) / 1000.0 < MIN_REF_DURATION:
extended_audio += audio
extended_text += " " + text # 文本也同步重复

# 保存临时修复文件
fixed_wav = wav_path.replace(".wav", "_fixed.wav")
extended_audio.export(fixed_wav, format="wav")

return fixed_wav, extended_text

except Exception as e:
print(f"❌ Error fixing ref {wav_path}: {e}")
return wav_path, ""

def load_speaker_refs():
refs = {}
if not os.path.exists(REF_DIR):
return {}

for f in os.listdir(REF_DIR):
if f.endswith("_ref.wav") and not f.endswith("_fixed.wav"):
speaker = f.replace("_ref.wav", "")
wav_path = os.path.abspath(os.path.join(REF_DIR, f))
txt_path = wav_path.replace(".wav", ".txt")

if os.path.exists(txt_path):
# 调用修复逻辑获取最终路径和文本
final_wav, final_text = fix_ref_audio(wav_path, txt_path)

refs[speaker] = {
"audio": final_wav,
"text": final_text,
"lang": "en"
}
return refs

def generate_segment(index, seg, refs):
speaker = seg.get('speaker', 'UNKNOWN')
text_zh = seg.get('text_zh', '').strip()

if not text_zh: return

filename = f"{index:04d}_{speaker}.wav"
output_path = os.path.join(OUTPUT_DIR, filename)

if os.path.exists(output_path) and os.path.getsize(output_path) > 1024:
print(f"⏩ [{index}] Exists.")
return

# 获取参考信息,如果找不到则使用第一个可用的作为兜底
ref = refs.get(speaker)
if not ref and refs:
ref = list(refs.values())[0]

if not ref:
print(f"❌ No refs available for {speaker}")
return

payload = {
"text": text_zh,
"text_lang": "zh",
"ref_audio_path": ref['audio'],
"prompt_text": ref['text'],
"prompt_lang": ref['lang'],
"text_split_method": "cut5",
"batch_size": 1,
"media_type": "wav",
"streaming_mode": False
}

try:
response = requests.post(f"{GPT_SOVITS_URL}/tts", json=payload, stream=True)
if response.status_code == 200:
with open(output_path, "wb") as f:
for chunk in response.iter_content(chunk_size=4096):
f.write(chunk)
print(f"✅ [{index}] {speaker}: Generated.")
else:
print(f"❌ [{index}] API Error: {response.text}")
except Exception as e:
print(f"❌ [{index}] Conn Error: {e}")

def main():
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)

print("🔍 Loading & Validating Refs...")
refs = load_speaker_refs()
print(f"Speakers ready: {list(refs.keys())}")

with open(INPUT_JSON, 'r', encoding='utf-8') as f:
segments = json.load(f)

print(f"🚀 Processing {len(segments)} segments...")

# 串行执行以保证稳定性
for i, seg in enumerate(segments):
generate_segment(i, seg, refs)

print("🎉 All done!")

if __name__ == "__main__":
main()

首次执行时发现之前的 SPEAKER_00 的女声采样片段只有 1s,不符合 GPT-SoVITS 要求,强行将所有女声片段拼成了一个 4s 音频重新生成。

单张 3090 上大概跑了一小时左右,生成了如下所有中文音频片段。

1
2
3
4
5
6
7
8
output_audio/
├── 0000_SPEAKER_01.wav
├── 0001_SPEAKER_01.wav
├── 0002_SPEAKER_01.wav
...
├── 1990_SPEAKER_02.wav
├── 1991_SPEAKER_02.wav
└── 1992_SPEAKER_02.wav

试听了一下估计由于仅仅使用了很短的样本而没有微调模型,声音和原来的 Karpathy 完全不一样,不过可以先用着,后面再考虑微调。

0x05 合并输出

接下来让 gemini 给了一段合并的代码,包含了语速调整和时间轴对齐等处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import json
import os
import subprocess
from pydub import AudioSegment, silence

# --- 配置 ---
JSON_FILE = "output_subs/vocals_translated.json"
AUDIO_DIR = "output_audio"
BACKGROUND_FILE = "separated/htdemucs/input/no_vocals.wav"
FINAL_OUTPUT = "final_podcast_smart_cn.mp3"
TEMP_WAV = "temp_vocals_smart.wav"

# --- 核心参数 ---
MAX_SPEED = 1.35 # 最大加速倍率 (超过这个倍率,宁愿让时间轴顺延)
MIN_SPEED = 1.0 # 最小速度 (绝不慢放,防止拖长音)

def remove_silence(audio_segment, silence_thresh=-50, min_silence_len=100):
"""
去除音频首尾的静音,节省时长。
"""
# pydub 的 detect_nonsilent 比较慢,这里用简单的 strip 逻辑
# 实际上 GPT-SoVITS 的静音通常比较纯净,直接用 split_on_silence 可能太激进
# 这里我们只做一个简单的首尾裁剪

def detect_leading_silence(sound, silence_threshold=-50.0, chunk_size=10):
trim_ms = 0 # ms
assert chunk_size > 0 # to avoid infinite loop
while sound[trim_ms:trim_ms+chunk_size].dBFS < silence_threshold and trim_ms < len(sound):
trim_ms += chunk_size
return trim_ms

start_trim = detect_leading_silence(audio_segment, silence_thresh)
end_trim = detect_leading_silence(audio_segment.reverse(), silence_thresh)

duration = len(audio_segment)
stripped = audio_segment[start_trim:duration-end_trim]

# 如果全剪没了,说明全是静音,返回一段很短的静音防止报错
if len(stripped) == 0:
return AudioSegment.silent(duration=100)

return stripped

def change_speed(audio_segment, speed=1.0):
"""使用 FFmpeg 进行变速 (Time Stretching)"""
if 0.99 < speed < 1.01:
return audio_segment

# 导出临时源文件
src_tmp = f"tmp_src_{os.getpid()}.wav"
dst_tmp = f"tmp_dst_{os.getpid()}.wav"
audio_segment.export(src_tmp, format="wav")

# 限制 ffmpeg atempo 滤镜的范围 (0.5 - 2.0)
# 虽然我们在外面限制了 MAX_SPEED,但为了安全再次钳制
safe_speed = max(0.5, min(speed, 2.0))

cmd = [
"ffmpeg", "-y", "-v", "error", "-i", src_tmp,
"-filter:a", f"atempo={safe_speed}",
"-vn", dst_tmp
]

subprocess.run(cmd)

if os.path.exists(dst_tmp):
try:
new_seg = AudioSegment.from_wav(dst_tmp)
except:
print("⚠️ FFmpeg output invalid, using original.")
new_seg = audio_segment

# 清理
if os.path.exists(src_tmp): os.remove(src_tmp)
if os.path.exists(dst_tmp): os.remove(dst_tmp)
return new_seg
else:
print("❌ FFmpeg failed, using original.")
if os.path.exists(src_tmp): os.remove(src_tmp)
return audio_segment

def main():
print(f"📂 Loading {JSON_FILE}...")
with open(JSON_FILE, 'r', encoding='utf-8') as f:
segments = json.load(f)

final_vocals = AudioSegment.empty()

# 游标:记录当前生成的音频已经在时间轴上铺到了哪里
# 单位:毫秒
cursor_ms = 0

print(f"🚀 Smart Merging {len(segments)} segments...")
print(f" Policy: No Slow-down (Min {MIN_SPEED}x), Cap Speed-up (Max {MAX_SPEED}x)")

for i, seg in enumerate(segments):
speaker = seg.get('speaker', 'UNKNOWN')
filename = f"{i:04d}_{speaker}.wav"
file_path = os.path.join(AUDIO_DIR, filename)

# 原始字幕的开始和结束时间
orig_start_ms = int(seg['start'] * 1000)
orig_end_ms = int(seg['end'] * 1000)
orig_duration = orig_end_ms - orig_start_ms

# 1. 间隙处理 (Gap Handling)
# 如果当前游标还在后面 (cursor < start),说明中间有空隙(没人说话)
# 我们要填补这段静音,让时间轴对齐
if cursor_ms < orig_start_ms:
silence_dur = orig_start_ms - cursor_ms
final_vocals += AudioSegment.silent(duration=silence_dur)
cursor_ms = orig_start_ms # 游标移动到当前句子的原定开始处

# 注意:如果 cursor_ms > orig_start_ms,说明上一句说太长了,溢出了。
# 此时我们不填静音,直接紧接着上一句开始(Smart Overflow),
# 这样会稍微推迟这一句的开始时间,但保证了上一句的完整性。

# 2. 准备音频
if not os.path.exists(file_path):
# 如果文件缺失,用静音代替,长度设为原长
# (或者直接跳过,看你喜好,填静音能保持节奏)
# print(f"⚠️ Missing: {filename}")
processed_audio = AudioSegment.silent(duration=orig_duration)
else:
raw_audio = AudioSegment.from_wav(file_path)
# 剪除生成的空白,争取时间
trimmed_audio = remove_silence(raw_audio)

gen_duration = len(trimmed_audio)

# 3. 计算变速策略
if gen_duration == 0:
speed = 1.0
else:
# 理论需要的倍速 = 生成时长 / 原时长
# 比如生成了 10s,原长 5s,需要 2.0x
ratio = gen_duration / orig_duration

if ratio <= 1.0:
# 如果生成得比原片短 (0.8x),不需要慢放
# 保持 1.0x,后面自然会留白
speed = 1.0
else:
# 如果生成得太长
if ratio > MAX_SPEED:
# 如果需要 2.0x,但我们限制最大 1.35x
# 那么就只加速到 1.35x,剩下的让它溢出
speed = MAX_SPEED
else:
# 在 1.0x - 1.35x 之间,按需加速
speed = ratio

# 执行变速
processed_audio = change_speed(trimmed_audio, speed)

# 4. 拼接到总轨道
final_vocals += processed_audio

# 5. 更新游标
cursor_ms += len(processed_audio)

if i % 100 == 0:
print(f"Processing... {i}/{len(segments)} | Timeline Shift: {cursor_ms - orig_end_ms}ms")

# 导出人声
print("💾 Exporting vocals track...")
final_vocals.export(TEMP_WAV, format="wav")

# 混音
print("🎛️ Mixing with background...")

# 检查背景音时长,如果背景音不够长(因为溢出导致人声变长),FFmpeg 默认会以最短的结束
# 我们可以加上 -shortest 或者 loop,但通常 Podcast 背景音都很长
# 这里用 amix,duration=first (以人声长度为准,背景音长了截断,短了就没了)
# 更好的方式是 duration=longest 但背景音得够长

cmd_mix = [
"ffmpeg", "-y",
"-i", TEMP_WAV,
"-i", BACKGROUND_FILE,
"-filter_complex",
# volume=0.3: 背景音稍微调小一点,保证清晰
# apad: 给背景音补静音防止比人声短 (保险起见)
"[1:a]volume=0.3[bg];[0:a][bg]amix=inputs=2:duration=first:dropout_transition=2",
"-ac", "2",
FINAL_OUTPUT
]

subprocess.run(cmd_mix)

# 清理
if os.path.exists(TEMP_WAV):
os.remove(TEMP_WAV)

print(f"✅ All Done! Output: {FINAL_OUTPUT}")

if __name__ == "__main__":
main()

I re-watched the pod just now too. First of all, yes I know, and I’m sorry that I speak so fast :).
—— Andrej Karpathy

好吧确实很快,并且使用初版代码时很多地方的变速导致听感很奇怪,快的时候会听不清,而慢的时候会拖很长。

上面的代码是已经优化过的版本,主要尝试的优化改动在于如果中文快的话会留白而不是拖长,中文慢的话则会尽可能占用后面句子的时间来利用留白的部分。

实际测试下来有比之前好一些,但是作为信息密度很高的播客来听的话还是会丢失不少关键信息。想了想不妨试试把每一段中文音频原速放在英文之后,作为补充翻译。这样既可以锻炼听力,也可以尽可能保证信息不丢失。不过这样的话可能还需要注意连续播放的长度,尽可能播放完整的一段话再给出翻译,而中文翻译的声音可以忽略背景音。

首先试了一下按时长切片穿插翻译,但是这样听起来有点突兀。于是又想到播客场景也可以在另一个发言人接话之前给整段发言翻译,保证单人发言的连贯性。测试修改后的脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import json
import os
import shutil
import subprocess
from pydub import AudioSegment

# ================= 配置区 =================
# 输入文件
ORIGINAL_WAV = "input.wav"
JSON_FILE = "output_subs/vocals_translated.json"
TTS_DIR = "output_audio"

# 输出文件
FINAL_OUTPUT = "final_podcast_interleaved.mp3"
TEMP_DIR = "temp_mix_chunks"

# --- 核心优化参数 ---
BLOCK_THRESHOLD = 0.8 # 停顿阈值 (秒):只要停顿超过 0.8s,就插入翻译
MAX_BLOCK_DURATION = 15.0 # 强制切分阈值 (秒):单段英文最长不超过 15s

# 间隙微调
GAP_EN_TO_CN = 200 # 英文 -> 中文 (ms)
GAP_CN_TO_EN = 400 # 中文 -> 下一段英文 (ms)
CN_VOLUME_BOOST = 2.5 # 中文音量增益 (dB)
# ==========================================

def get_smart_blocks(segments):
"""
智能切分:兼顾语义连贯性和听觉反馈频率
"""
current_block = []
current_speaker = None
block_start_time = 0.0

for i, seg in enumerate(segments):
# 记录原始索引用于查找 TTS 文件
seg['original_index'] = i

speaker = seg.get('speaker')
start = seg['start']
end = seg['end']

is_new_block = False

if not current_block:
is_new_block = False
block_start_time = start
elif speaker != current_speaker:
# 场景1: 换人说话 -> 必须切
is_new_block = True
else:
last_end = current_block[-1]['end']
gap = start - last_end

# 计算如果加上这一句,当前块的总时长
current_duration = end - block_start_time

# 场景2: 停顿够久 ( > 0.8s )
# 场景3: 话说太多了 ( > 15s ),强制切分以提供翻译
if gap > BLOCK_THRESHOLD or current_duration > MAX_BLOCK_DURATION:
is_new_block = True

if is_new_block:
yield {
"speaker": current_speaker,
"segments": current_block,
"start": current_block[0]['start'],
"end": current_block[-1]['end']
}
current_block = []
block_start_time = start # 重置计时器

current_block.append(seg)
current_speaker = speaker

# Yield 最后一个块
if current_block:
yield {
"speaker": current_speaker,
"segments": current_block,
"start": current_block[0]['start'],
"end": current_block[-1]['end']
}

def main():
if not os.path.exists(ORIGINAL_WAV):
print(f"❌ Original audio not found: {ORIGINAL_WAV}")
return

if os.path.exists(TEMP_DIR):
shutil.rmtree(TEMP_DIR)
os.makedirs(TEMP_DIR)

print(f"📂 Loading original audio (This may take time)...")
try:
original_audio = AudioSegment.from_wav(ORIGINAL_WAV)
except Exception as e:
print(f"❌ Failed to load WAV: {e}")
return

with open(JSON_FILE, 'r', encoding='utf-8') as f:
segments = json.load(f)

print("🧠 Analyzing dialogue structure (Max 15s / Gap 0.8s)...")
blocks = list(get_smart_blocks(segments))
print(f" Segments merged into {len(blocks)} playback blocks.")

# 开始拼接
last_audio_ptr = 0 # 英文音频指针 (毫秒)
chunk_files = []
current_chunk = AudioSegment.empty()

print(f"🚀 Mixing started...")

for idx, block in enumerate(blocks):
# 1. 截取英文原声
# 即使被强制切分了,这里也会无缝衔接上一块的结束点
target_end_ms = int(block['end'] * 1000)
start_ms = max(last_audio_ptr, 0)

if target_end_ms > start_ms:
eng_segment = original_audio[start_ms:target_end_ms]
current_chunk += eng_segment
last_audio_ptr = target_end_ms

# 2. 拼接中文翻译
cn_block_audio = AudioSegment.empty()
has_cn = False

for seg in block['segments']:
# 查找对应的 TTS 文件
# 注意:如果之前语义分词改变了,这里的文件名索引可能需要重新对应
# 现在的逻辑假设 output_audio 里的索引是基于 output_subs/vocals_translated.json 的行号
# 这在整个流程中是一致的
tts_filename = f"{seg['original_index']:04d}_{seg.get('speaker', 'UNKNOWN')}.wav"
tts_path = os.path.join(TTS_DIR, tts_filename)

if os.path.exists(tts_path):
try:
seg_audio = AudioSegment.from_wav(tts_path)
cn_block_audio += seg_audio
# 句间微停顿
cn_block_audio += AudioSegment.silent(duration=100)
has_cn = True
except:
pass

# 3. 组合
if has_cn:
# 增加音量
cn_block_audio = cn_block_audio + CN_VOLUME_BOOST

# 插入间隙
current_chunk += AudioSegment.silent(duration=GAP_EN_TO_CN)
current_chunk += cn_block_audio
current_chunk += AudioSegment.silent(duration=GAP_CN_TO_EN)

# 4. 分块写盘 (每 50 个 Block)
if (idx + 1) % 50 == 0 or idx == len(blocks) - 1:
chunk_name = os.path.join(TEMP_DIR, f"chunk_{idx}.wav")
current_chunk.export(chunk_name, format="wav")
chunk_files.append(chunk_name)
current_chunk = AudioSegment.empty()
if idx % 100 == 0:
print(f" Processed {idx+1}/{len(blocks)} blocks...")

# 处理尾巴
if last_audio_ptr < len(original_audio):
outro_chunk = original_audio[last_audio_ptr:]
outro_name = os.path.join(TEMP_DIR, "chunk_final.wav")
outro_chunk.export(outro_name, format="wav")
chunk_files.append(outro_name)

# 5. FFmpeg 合并
print("✨ Merging chunks with FFmpeg...")
concat_list_path = os.path.join(TEMP_DIR, "files.txt")
with open(concat_list_path, 'w') as f:
for p in chunk_files:
abs_path = os.path.abspath(p)
f.write(f"file '{abs_path}'\n")

subprocess.run([
"ffmpeg", "-y", "-f", "concat", "-safe", "0",
"-i", concat_list_path,
"-c:a", "libmp3lame", "-q:a", "2",
FINAL_OUTPUT
])

shutil.rmtree(TEMP_DIR)
print(f"🎉 Success! Output: {FINAL_OUTPUT}")

if __name__ == "__main__":
main()

执行完成后就有了一个五个半小时的超长音频,截取 5 分钟预览如下:

下面是优化了语义分片等逻辑,重新抽卡的结果(sounds gay but ok):

0x06 What’s Next?

上面流程的输出效果仅仅是“能听”,距离成熟的播客翻译还有一段距离。将完整记录重新丢给 gemini 分析得到以下几个优化方向:

  • 同声传译模式还有一定的优化空间
  • WhisperX 切分的上下文过于碎片化
    • 考虑引入 NLP 模块重新合并分句(如 spacy
    • 批处理中考虑引入压缩的上下文或者通过滑动窗口翻译(很费 token,已放弃)
  • 实现更好的翻译模式(词汇表等)
  • 实现微调解决音色克隆问题(暂时考虑完全交给 GPT-SoVITS 实现)
  • 加入文本去噪和中文背景音等优化策略
  • 支持 RSS Feed 生成,或许会是产品化的亮点