动手实现一个播客翻译工作流

之前有期 Dwarkesh Patel 对话 Andrej Karpathy 的播客放在书签里好久没看,每天车上通勤时间有点浪费,遂想将播客转译为中文人声的想法。使用 Notebooklm 的话,通过 AI 提炼的文字对于播客这种信息密度比较高的内容来说损失难以衡量,而市面上虽然已有一些流行的视频字幕和人声转换工具,但好像又没有开源成熟的播客翻译的项目实现,于是准备先手动跑一遍流程,再看看有没有什么适合封装的实现。

跟 gemini 交流了一番,总结出主要流程需包含以下步骤:

  • Demucs:人声分离
  • WhisperX:声音识别与分割
  • LLM API:翻译
  • GPT-SoVITS:生成中文语音
  • pydub/ffmpeg-python:时序对齐和混音

0x00 环境准备

本来用的 nvidia pytorch 包依赖冲突太多,遂决定基于 nvcr.io/nvidia/cuda:12.1.1-cudnn8-devel-ubuntu22.04 重新构建一个,requirement.txt 参考了 VideoLingo 的,让 gemini 精简了下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# --- 核心基础库 ---
numpy==1.26.4
pandas==2.2.3
librosa==0.10.2.post1
scipy

# --- 音频处理与计算 ---
pydub==0.25.1
moviepy==1.0.3
ctranslate2==4.4.0
transformers==4.39.3
pytorch-lightning==2.3.3

# --- 核心 AI 模型 (直接从源码安装以获取最新修复) ---
# Demucs: 人声分离
demucs[dev] @ git+https://github.com/adefossez/demucs
# WhisperX: 识别与对齐 (指定了特定 Commit 以保证稳定性)
whisperx @ git+https://github.com/m-bain/whisperx.git@7307306a9d8dd0d261e588cc933322454f853853

# --- LLM 与 文本处理 ---
openai==1.55.3
json-repair # 极力推荐:防止 LLM 返回的 JSON 格式错误
spacy==3.7.4 # 用于更好的分句处理
autocorrect-py # 简单的拼写纠错

# --- 辅助工具 ---
PyYAML==6.0.2
requests==2.32.3

由于使用的 enroot 环境,没通过 dockerfile 构建,直接命令行一把梭:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 安装系统级依赖
apt-get update && apt-get install -y \
python3.10 \
python3-pip \
python3-venv \
git \
ffmpeg \
libavcodec-dev \
libavformat-dev \
libavdevice-dev \
libavutil-dev \
libswscale-dev \
libswresample-dev \
libavfilter-dev \
pkg-config \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# 安装 PyTorch
pip install --upgrade pip
pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu121
# 安装剩余依赖
pip install -r requirements.txt
# 创建软链接
ln -s /usr/bin/python3.10 /usr/bin/python

0x01 人声分离

Demucs 是一款先进的音乐源分离模型,目前能够将鼓、贝斯和人声从其他伴奏中分离出来。

之前从 YouTube 下载的音频是 m4v 格式,虽然 demucs 支持,但还是按照 gemini 建议先将其转换为 wav 以提高后续处理效率。(转换后的 wav 体积大概是源文件的 10 倍,我是通过 ssh 进行的操作下载很慢,所以后续需要截取切片来试听效果)

1
ffmpeg -i input.m4a -ar 44100 -ac 2 input.wav

使用 demucs 分离人声

1
demucs --two-stems=vocals -n htdemucs input.wav
  • –two-stems=vocals: 告诉模型我只要 vocals(人声)和 no_vocals(伴奏/背景音)。如果不加这个,它会分成鼓、贝斯、其他等4个轨道,对播客没用。
  • -n htdemucs: 指定使用最新模型。

输出的音频位于 separated/htdemucs/input 下,分为 vocals.wavno_vocals.wav 两个音频文件(都是 GB 级别),可以通过以下命令截取 5 分钟片段下载到本地试听下效果:

1
2
ffmpeg -i separated/htdemucs/input/vocals.wav -t 300 -c copy testvocals.wav
ffmpeg -i separated/htdemucs/input/no_vocals.wav -t 300 -c copy test_no_vocals.wav

在音频目录下也可以通过 python 启动一个 http 服务转发音频在浏览器中直接预览:

1
python3 -m http.server 8000

0x02 语音识别

Whisper 是 OpenAI 开发的一款自动语音识别 (ASR) 模型,它基于一个包含各种音频的大型数据集进行训练。虽然它能生成高度准确的转录文本,但对应的时间戳是按语句而非单词计算的,因此可能存在几秒的误差。OpenAI 的 Whisper 本身并不支持批量处理。

WhisperX 是基于 Whisper 的工程封装库,支持说话人区分。

在使用 WhisperX 前,需要准备一些配置,比如说话人声分割用到的模型 speaker-diarization-3.1 要在参数中添加 huggingface 令牌。

首先在网页端 新建一个 read 权限的 token,同时需要在 speaker-diarization-3.1segmentation-3.0 页面同意开源模型协议。

准备完成后就可以在命令行启动 whisperx 进行语音转文字。

1
2
3
4
5
6
7
8
9
10
11
12
whisperx separated/htdemucs/input/vocals.wav \
--model large-v2 \
--language en \
--diarize \
--min_speakers 3 \
--max_speakers 3 \
--max_line_width 100 \
--output_dir output_subs \
--output_format all \
--compute_type float16 \
--batch_size 16 \
--hf_token <huggingface_token>

下面是 gemini 给出的一些参数解释:

  • –model large-v2:
    • 目前 Whisper 生态中英文识别综合能力最强的模型(v3 在某些场景下对音乐的抗噪反而不如 v2,v2 是公认最稳的)。
    • 3090 显存足够大,不用担心跑不动。
  • –language en:
    • 显式指定英语。虽然它能自动检测,但指定后可以避免开头几秒误识别,并略微提升速度。
  • –compute_type float16:
    • 使用半精度加速,在 3090 上这是标准操作,速度快且不损失精度。
  • –batch_size 16:
    • 这会极大提升处理速度。3090 的 24G 显存完全吃得消 batch 16 甚至 32。
  • –min_speakers 2 / --max_speakers 2?:
    • 显式告诉 WhisperX “这里面有 2 个人”,能极大提高说话人区分 (Diarization) 的准确度,防止它把偶尔的背景杂音识别成第 3 个说话人。
    • 注:如果你不确定有几个人,可以不加这两个参数,但在播客场景下指定人数通常效果最好。
    • 补充:好吧后续在翻译结束后才发现中间有另一个女声(还好白嫖的翻译 api),重新改为 3 再试下

运行完成后输出在当前路径下的 output_subs 目录:

1
2
$ ls output_subs/
vocals.json vocals.srt vocals.tsv vocals.txt vocals.vtt

将 json 文件(后续准备用来翻译的格式)下载下来预览一下:

1
2
3
{"segments": [{"start": 0.109, "end": 1.009, "text": " Reinforcement learning is terrible.", "words": [{"word": "Reinforcement", "start": 0.109, "end": 0.449, "score": 0.303, "speaker": "SPEAKER_01"}, {"word": "learning", "start": 0.469, "end": 0.649, "score": 0.849, "speaker": "SPEAKER_01"}
...
, {"word": "one.", "start": 8766.077, "end": 8766.157, "score": 0.741, "speaker": "SPEAKER_01"}], "language": "en"}

可以看出其中包含了文本时间轴和说话人等关键信息,但是也不难发现有些句子被切分得很短,在丢给大模型翻译时可能会丢失上下文信息。不过这个问题可以先放一边,后续再考虑优化。

最好再确认下人声数量分布,提前发现问题。

1
2
3
4
$ grep -o '"speaker": "[^"]*"' output_subs/vocals.json | sort | uniq -c
319 "speaker": "SPEAKER_00"
46484 "speaker": "SPEAKER_01"
16872 "speaker": "SPEAKER_02"

0x03 文本翻译

这边直接使用 gemini 提供的翻译脚本,支持断点续传以增加安全感,本地通过 new api 代理白嫖一个 gemini-2.5-flash 测试下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import json
import os
import traceback
from openai import OpenAI
from concurrent.futures import ThreadPoolExecutor

# --- 配置区 ---
INPUT_FILE = "output_subs/vocals.json"
OUTPUT_FILE = "output_subs/vocals_translated.json"
MODEL_NAME = "gemini-2.5-flash"
BATCH_SIZE = 10
MAX_WORKERS = 1 # 保持单线程以确保保存顺序和稳定性

client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL")
)

def load_data():
"""
智能加载逻辑:
1. 如果输出文件(vocals_translated.json)存在,优先加载它(里面可能包含了部分已翻译的)。
2. 如果不存在,加载原始 Whisper 结果。
"""
if os.path.exists(OUTPUT_FILE):
print(f"♻️ Found existing progress in {OUTPUT_FILE}, resuming...")
with open(OUTPUT_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
else:
print(f"🆕 No progress found, starting from scratch with {INPUT_FILE}...")
with open(INPUT_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
# 确保每个 segment 都有 text_zh 字段,方便后续判断
segments = data['segments'] if 'segments' in data else data
for seg in segments:
if 'text_zh' not in seg:
seg['text_zh'] = ""
return segments

def save_data(segments):
"""实时保存数据"""
with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
json.dump(segments, f, ensure_ascii=False, indent=2)

def translate_batch(batch_data):
"""
处理一个批次。
输入:(index, batch_segments)
"""
batch_idx, segments = batch_data

# --- [关键] 断点续传检查 ---
# 如果这一批里所有的句子都已经有翻译了,直接跳过
if all(seg.get('text_zh', '').strip() != "" for seg in segments):
print(f"⏩ Batch {batch_idx} already translated, skipping.")
return segments, False # False 表示没有进行 API 调用

# --- 开始翻译 ---
prompt_text = ""
for i, seg in enumerate(segments):
speaker = seg.get('speaker', 'UNKNOWN')
text = seg['text'].strip()
prompt_text += f"[{i}] Speaker {speaker}: {text}\n"

system_prompt = (
"你是一位专业的播客字幕翻译专家。请将下面的英文播客对话翻译成中文。\n"
"要求:\n"
"1. 保持口语化,自然流畅。\n"
"2. 严格保持原有的行数和顺序。\n"
"3. 返回 JSON 列表,例如:[\"第一句\", \"第二句\"]\n"
)

try:
print(f"🔄 Translating Batch {batch_idx} ({len(segments)} lines)...")
response = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt_text}
],
# response_format={"type": "json_object"} # 已注释,兼容性更好
)
result = response.choices[0].message.content.strip()

# 简单的 JSON 清洗(处理 Markdown 代码块)
if result.startswith("```"):
result = result.split("\n", 1)[1]
if result.endswith("```"):
result = result.rsplit("\n", 1)[0]

try:
translations = json.loads(result)
if isinstance(translations, dict): # 兼容 {"translations": [...]}
translations = list(translations.values())[0]
except:
print(f"⚠️ JSON Parse Error in Batch {batch_idx}, raw: {result[:20]}...")
translations = []

except Exception as e:
print(f"❌ Error in Batch {batch_idx}: {str(e)}")
translations = []

# 回填结果
for i, seg in enumerate(segments):
if i < len(translations) and isinstance(translations, list):
seg['text_zh'] = str(translations[i])

return segments, True # True 表示进行了更新

def main():
all_segments = load_data()
total_len = len(all_segments)

# 构造批次,带上索引以便追踪
batches = []
for i in range(0, total_len, BATCH_SIZE):
batch_seg = all_segments[i : i + BATCH_SIZE]
batches.append((i // BATCH_SIZE, batch_seg))

print(f"🚀 Processing {len(batches)} batches...")

# 使用单线程处理(因为我们需要频繁写文件,单线程最安全且最容易 debug)
# 如果你想快,可以改 max_workers,但保存逻辑会变得复杂(需要加锁)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# map 会按顺序返回结果,这对我们按顺序保存很重要
results = executor.map(translate_batch, batches)

for i, (updated_batch, changed) in enumerate(results):
# 将更新后的 batch 塞回总列表(其实引用没变,这步是多余的但逻辑更清晰)
start_idx = i * BATCH_SIZE
all_segments[start_idx : start_idx + len(updated_batch)] = updated_batch

# 只有当确实发生了 API 调用并更新了数据时,才写入硬盘
# 或者每处理 5 个 batch 强制保存一次,防止断电
if changed or i % 5 == 0:
save_data(all_segments)
if changed:
print(f"💾 Saved progress at batch {i}")

# 最后再保存一次
save_data(all_segments)
print(f"✅ All done! Saved to {OUTPUT_FILE}")

if __name__ == "__main__":
main()

看起来 AI 很喜欢在日志中加入符号:

1
2
3
4
5
6
7
8
9
$ python translate_subs.py
🆕 No progress found, starting from scratch with output_subs/vocals.json...
🚀 Processing 200 batches...
🔄 Translating Batch 0 (10 lines)...
🔄 Translating Batch 1 (10 lines)...
💾 Saved progress at batch 0
🔄 Translating Batch 2 (10 lines)...
💾 Saved progress at batch 1
🔄 Translating Batch 3 (10 lines)...

完成后生成的 vocals_translated.json 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[
{
"start": 0.109,
"end": 1.009,
"text": " Reinforcement learning is terrible.",
"words": [
{
"word": "Reinforcement",
"start": 0.109,
"end": 0.449,
"score": 0.303,
"speaker": "SPEAKER_01"
},
{
"word": "learning",
"start": 0.469,
"end": 0.649,
"score": 0.849,
"speaker": "SPEAKER_01"
},
{
"word": "is",
"start": 0.669,
"end": 0.709,
"score": 0.96,
"speaker": "SPEAKER_01"
},
{
"word": "terrible.",
"start": 0.729,
"end": 1.009,
"score": 0.899,
"speaker": "SPEAKER_01"
}
],
"speaker": "SPEAKER_01",
"text_zh": "强化学习糟透了。"
},
...
]

0x04 语音生成

首先准备一个新的 cuda 12.8 环境,安装 miniconda,然后使用 GPT-SoVITS 官方的一键安装脚本来初始化环境。

1
2
3
4
5
6
7
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
conda create -n GPTSoVits python=3.10
conda activate GPTSoVits
git clone https://github.com/RVC-Boss/GPT-SoVITS.git
cd GPT-SoVITS
bash install.sh --device CU128 --source ModelScope

按照 gemini 建议采用零样本(Zero-shot)微调,先通过脚本获取几个说话人的音频切片:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import json
import random
from pydub import AudioSegment

# --- 配置 ---
WAV_FILE = "separated/htdemucs/input/vocals.wav"
JSON_FILE = "output_subs/vocals_translated.json"
OUTPUT_DIR = "output_refs"

def main():
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)

print(f"Loading {JSON_FILE}...")
with open(JSON_FILE, 'r', encoding='utf-8') as f:
segments = json.load(f)

# 1. 按说话人分组
speakers = {}
for seg in segments:
spk = seg.get('speaker', 'UNKNOWN')
duration = seg['end'] - seg['start']
text_len = len(seg['text'])

# 将所有句子都存下来,顺便存下元数据方便打分
if spk not in speakers:
speakers[spk] = []

# 给每个句子打分:
# - 时长越接近 7秒 越好
# - 文本越长越好(但别超过150)
score = -abs(duration - 7.0)
if text_len < 10: score -= 100 # 太短的惩罚
if text_len > 150: score -= 100 # 太长的惩罚

speakers[spk].append({
"seg": seg,
"score": score
})

# 2. 加载音频(这可能会花几秒钟,因为文件大)
print(f"Loading wav file (this may take a while)...")
audio = AudioSegment.from_wav(WAV_FILE)

# 3. 为每个说话人提取 1 个最佳片段
for spk, items in speakers.items():
# 按分数排序,取分数最高的
items.sort(key=lambda x: x["score"], reverse=True)
best_item = items[0]
ref_seg = best_item["seg"]

print(f"Extracting ref for {spk} (Score: {best_item['score']:.2f}): {ref_seg['text']}")

# pydub 切片单位是毫秒
start_ms = int(ref_seg['start'] * 1000)
end_ms = int(ref_seg['end'] * 1000)

ref_audio = audio[start_ms:end_ms]

# 保存音频
ref_wav_path = f"{OUTPUT_DIR}/{spk}_ref.wav"
ref_audio.export(ref_wav_path, format="wav")

# 保存对应的文本(TTS 需要知道这段音频说了啥)
ref_text_path = f"{OUTPUT_DIR}/{spk}_ref.txt"
with open(ref_text_path, 'w', encoding='utf-8') as f:
f.write(ref_seg['text']) # 注意:参考文本必须是原文(英文),不是译文

print(f"✅ Saved {ref_wav_path}")

if __name__ == "__main__":
import os
main()

执行后输出的目录结构如下,分别对应多个说话人的音频切片和文字样本:

1
2
3
4
5
6
7
8
$ tree output_refs/
output_refs/
├── SPEAKER_00_ref.txt
├── SPEAKER_00_ref.wav
├── SPEAKER_01_ref.txt
├── SPEAKER_01_ref.wav
├── SPEAKER_02_ref.txt
└── SPEAKER_02_ref.wav

启动 api_v2.py(使用最新的 v2 版本),并让 gemini 给了一个调用 GPT-SoVITS api 的脚本,利用之前的采样片段和英文分片音频生成新的中文人声音频。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import json
import os
import requests
import time

# --- 配置区 ---
INPUT_JSON = "output_subs/vocals_translated.json"
OUTPUT_DIR = "output_audio"
REF_DIR = "output_refs"
# 注意:api_v2 的地址通常不需要加 /tts 后缀在配置里,我们在请求时拼接
GPT_SOVITS_URL = "http://127.0.0.1:9880"

# --- 动态加载参考音频配置 ---
def load_speaker_config(ref_dir):
if not os.path.exists(ref_dir):
raise FileNotFoundError(f"❌ 参考音频目录 {ref_dir} 不存在!")

config = {}
print(f"🔍 Scanning {ref_dir} for reference audio...")

for filename in os.listdir(ref_dir):
if filename.endswith("_ref.wav"):
speaker_id = filename.replace("_ref.wav", "")
wav_path = os.path.abspath(os.path.join(ref_dir, filename))
txt_path = os.path.join(ref_dir, f"{speaker_id}_ref.txt")

prompt_text = ""
if os.path.exists(txt_path):
with open(txt_path, 'r', encoding='utf-8') as f:
prompt_text = f.read().strip()

config[speaker_id] = {
"ref_audio_path": wav_path,
"prompt_text": prompt_text,
"prompt_lang": "en"
}
print(f" ✅ Loaded: {speaker_id}")

return config

def generate_audio(text_zh, speaker_id, config, output_filename):
"""适配 api_v2.py 的 POST 请求"""

if speaker_id in config:
spk_cfg = config[speaker_id]
else:
default_key = list(config.keys())[0]
spk_cfg = config[default_key]
print(f" ⚠️ Speaker {speaker_id} fallback to {default_key}")

# --- 关键修改点:参数名适配 api_v2 ---
payload = {
"text": text_zh,
"text_lang": "zh", # v1是 text_language, v2是 text_lang
"ref_audio_path": spk_cfg["ref_audio_path"], # v1是 refer_wav_path, v2是 ref_audio_path
"prompt_text": spk_cfg["prompt_text"],
"prompt_lang": spk_cfg["prompt_lang"], # v1是 prompt_language, v2是 prompt_lang
"media_type": "wav",
"streaming_mode": False, # 批量生成不需要流式
"text_split_method": "cut5",
"speed_factor": 1.0
}

try:
# --- 关键修改点:使用 POST 请求访问 /tts 端点 ---
# api_v2 的合成接口在 /tts
url = f"{GPT_SOVITS_URL}/tts"

response = requests.post(url, json=payload, stream=True, timeout=120)

if response.status_code == 200:
if 'application/json' in response.headers.get('Content-Type', ''):
print(f"❌ API Error: {response.text}")
return False

with open(output_filename, "wb") as f:
for chunk in response.iter_content(chunk_size=4096):
if chunk:
f.write(chunk)
return True
else:
print(f"❌ API Error {response.status_code}: {response.text}")
return False

except Exception as e:
print(f"❌ Connection Error: {e}")
return False

def clean_text(text):
if not text: return ""
return text.replace('\n', ' ').strip()

def main():
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)

speakers_config = load_speaker_config(REF_DIR)
if not speakers_config: return

print(f"📂 Loading {INPUT_JSON}...")
with open(INPUT_JSON, 'r', encoding='utf-8') as f:
segments = json.load(f)

total = len(segments)
print(f"🚀 Starting TTS (v2 API) for {total} segments...")

success_count = 0
for i, seg in enumerate(segments):
speaker = seg.get('speaker', 'UNKNOWN')
text_zh = clean_text(seg.get('text_zh', ''))
filename = f"{i:04d}_{speaker}.wav"
output_path = os.path.join(OUTPUT_DIR, filename)

if not text_zh: continue
if os.path.exists(output_path) and os.path.getsize(output_path) > 1024:
success_count += 1
continue

print(f"🔊 [{i:04d}/{total}] {speaker}: {text_zh[:20]}...")

for attempt in range(3):
if generate_audio(text_zh, speaker, speakers_config, output_path):
success_count += 1
break
time.sleep(1)

print(f"\n✅ Generation Complete! ({success_count}/{total} files)")

if __name__ == "__main__":
main()

首次执行时发现之前的 SPEAKER_00 的女声采样片段只有 1s,不符合 GPT-SoVITS 要求,强行将所有女声片段拼成了一个 4s 音频重新生成。

单张 3090 上大概跑了一小时左右,生成了如下所有中文音频片段。

1
2
3
4
5
6
7
8
output_audio/
├── 0000_SPEAKER_01.wav
├── 0001_SPEAKER_01.wav
├── 0002_SPEAKER_01.wav
...
├── 1990_SPEAKER_02.wav
├── 1991_SPEAKER_02.wav
└── 1992_SPEAKER_02.wav

试听了一下估计由于仅仅使用了很短的样本而没有微调模型,声音和原来的 Karpathy 完全不一样,不过可以先用着,后面再考虑微调。

0x05 合并输出

接下来让 gemini 给了一段合并的代码,包含了语速调整和时间轴对齐等处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import json
import os
import subprocess
from pydub import AudioSegment, silence

# --- 配置 ---
JSON_FILE = "output_subs/vocals_translated.json"
AUDIO_DIR = "output_audio"
BACKGROUND_FILE = "separated/htdemucs/input/no_vocals.wav"
FINAL_OUTPUT = "final_podcast_smart_cn.mp3"
TEMP_WAV = "temp_vocals_smart.wav"

# --- 核心参数 ---
MAX_SPEED = 1.35 # 最大加速倍率 (超过这个倍率,宁愿让时间轴顺延)
MIN_SPEED = 1.0 # 最小速度 (绝不慢放,防止拖长音)

def remove_silence(audio_segment, silence_thresh=-50, min_silence_len=100):
"""
去除音频首尾的静音,节省时长。
"""
# pydub 的 detect_nonsilent 比较慢,这里用简单的 strip 逻辑
# 实际上 GPT-SoVITS 的静音通常比较纯净,直接用 split_on_silence 可能太激进
# 这里我们只做一个简单的首尾裁剪

def detect_leading_silence(sound, silence_threshold=-50.0, chunk_size=10):
trim_ms = 0 # ms
assert chunk_size > 0 # to avoid infinite loop
while sound[trim_ms:trim_ms+chunk_size].dBFS < silence_threshold and trim_ms < len(sound):
trim_ms += chunk_size
return trim_ms

start_trim = detect_leading_silence(audio_segment, silence_thresh)
end_trim = detect_leading_silence(audio_segment.reverse(), silence_thresh)

duration = len(audio_segment)
stripped = audio_segment[start_trim:duration-end_trim]

# 如果全剪没了,说明全是静音,返回一段很短的静音防止报错
if len(stripped) == 0:
return AudioSegment.silent(duration=100)

return stripped

def change_speed(audio_segment, speed=1.0):
"""使用 FFmpeg 进行变速 (Time Stretching)"""
if 0.99 < speed < 1.01:
return audio_segment

# 导出临时源文件
src_tmp = f"tmp_src_{os.getpid()}.wav"
dst_tmp = f"tmp_dst_{os.getpid()}.wav"
audio_segment.export(src_tmp, format="wav")

# 限制 ffmpeg atempo 滤镜的范围 (0.5 - 2.0)
# 虽然我们在外面限制了 MAX_SPEED,但为了安全再次钳制
safe_speed = max(0.5, min(speed, 2.0))

cmd = [
"ffmpeg", "-y", "-v", "error", "-i", src_tmp,
"-filter:a", f"atempo={safe_speed}",
"-vn", dst_tmp
]

subprocess.run(cmd)

if os.path.exists(dst_tmp):
try:
new_seg = AudioSegment.from_wav(dst_tmp)
except:
print("⚠️ FFmpeg output invalid, using original.")
new_seg = audio_segment

# 清理
if os.path.exists(src_tmp): os.remove(src_tmp)
if os.path.exists(dst_tmp): os.remove(dst_tmp)
return new_seg
else:
print("❌ FFmpeg failed, using original.")
if os.path.exists(src_tmp): os.remove(src_tmp)
return audio_segment

def main():
print(f"📂 Loading {JSON_FILE}...")
with open(JSON_FILE, 'r', encoding='utf-8') as f:
segments = json.load(f)

final_vocals = AudioSegment.empty()

# 游标:记录当前生成的音频已经在时间轴上铺到了哪里
# 单位:毫秒
cursor_ms = 0

print(f"🚀 Smart Merging {len(segments)} segments...")
print(f" Policy: No Slow-down (Min {MIN_SPEED}x), Cap Speed-up (Max {MAX_SPEED}x)")

for i, seg in enumerate(segments):
speaker = seg.get('speaker', 'UNKNOWN')
filename = f"{i:04d}_{speaker}.wav"
file_path = os.path.join(AUDIO_DIR, filename)

# 原始字幕的开始和结束时间
orig_start_ms = int(seg['start'] * 1000)
orig_end_ms = int(seg['end'] * 1000)
orig_duration = orig_end_ms - orig_start_ms

# 1. 间隙处理 (Gap Handling)
# 如果当前游标还在后面 (cursor < start),说明中间有空隙(没人说话)
# 我们要填补这段静音,让时间轴对齐
if cursor_ms < orig_start_ms:
silence_dur = orig_start_ms - cursor_ms
final_vocals += AudioSegment.silent(duration=silence_dur)
cursor_ms = orig_start_ms # 游标移动到当前句子的原定开始处

# 注意:如果 cursor_ms > orig_start_ms,说明上一句说太长了,溢出了。
# 此时我们不填静音,直接紧接着上一句开始(Smart Overflow),
# 这样会稍微推迟这一句的开始时间,但保证了上一句的完整性。

# 2. 准备音频
if not os.path.exists(file_path):
# 如果文件缺失,用静音代替,长度设为原长
# (或者直接跳过,看你喜好,填静音能保持节奏)
# print(f"⚠️ Missing: {filename}")
processed_audio = AudioSegment.silent(duration=orig_duration)
else:
raw_audio = AudioSegment.from_wav(file_path)
# 剪除生成的空白,争取时间
trimmed_audio = remove_silence(raw_audio)

gen_duration = len(trimmed_audio)

# 3. 计算变速策略
if gen_duration == 0:
speed = 1.0
else:
# 理论需要的倍速 = 生成时长 / 原时长
# 比如生成了 10s,原长 5s,需要 2.0x
ratio = gen_duration / orig_duration

if ratio <= 1.0:
# 如果生成得比原片短 (0.8x),不需要慢放
# 保持 1.0x,后面自然会留白
speed = 1.0
else:
# 如果生成得太长
if ratio > MAX_SPEED:
# 如果需要 2.0x,但我们限制最大 1.35x
# 那么就只加速到 1.35x,剩下的让它溢出
speed = MAX_SPEED
else:
# 在 1.0x - 1.35x 之间,按需加速
speed = ratio

# 执行变速
processed_audio = change_speed(trimmed_audio, speed)

# 4. 拼接到总轨道
final_vocals += processed_audio

# 5. 更新游标
cursor_ms += len(processed_audio)

if i % 100 == 0:
print(f"Processing... {i}/{len(segments)} | Timeline Shift: {cursor_ms - orig_end_ms}ms")

# 导出人声
print("💾 Exporting vocals track...")
final_vocals.export(TEMP_WAV, format="wav")

# 混音
print("🎛️ Mixing with background...")

# 检查背景音时长,如果背景音不够长(因为溢出导致人声变长),FFmpeg 默认会以最短的结束
# 我们可以加上 -shortest 或者 loop,但通常 Podcast 背景音都很长
# 这里用 amix,duration=first (以人声长度为准,背景音长了截断,短了就没了)
# 更好的方式是 duration=longest 但背景音得够长

cmd_mix = [
"ffmpeg", "-y",
"-i", TEMP_WAV,
"-i", BACKGROUND_FILE,
"-filter_complex",
# volume=0.3: 背景音稍微调小一点,保证清晰
# apad: 给背景音补静音防止比人声短 (保险起见)
"[1:a]volume=0.3[bg];[0:a][bg]amix=inputs=2:duration=first:dropout_transition=2",
"-ac", "2",
FINAL_OUTPUT
]

subprocess.run(cmd_mix)

# 清理
if os.path.exists(TEMP_WAV):
os.remove(TEMP_WAV)

print(f"✅ All Done! Output: {FINAL_OUTPUT}")

if __name__ == "__main__":
main()

I re-watched the pod just now too. First of all, yes I know, and I’m sorry that I speak so fast :).
—— Andrej Karpathy

好吧确实很快,并且使用初版代码时很多地方的变速导致听感很奇怪,快的时候会听不清,而慢的时候会拖很长。

上面的代码是已经优化过的版本,主要尝试的优化改动在于如果中文快的话会留白而不是拖长,中文慢的话则会尽可能占用后面句子的时间来利用留白的部分。

实际测试下来有比之前好一些,但是作为信息密度很高的播客来听的话还是会丢失不少关键信息。想了想不妨试试把每一段中文音频原速放在英文之后,作为补充翻译。这样既可以锻炼听力,也可以尽可能保证信息不丢失。不过这样的话可能还需要注意连续播放的长度,尽可能播放完整的一段话再给出翻译,而中文翻译的声音可以忽略背景音。

首先试了一下按时长切片穿插翻译,但是这样听起来有点突兀。于是又想到播客场景也可以在另一个发言人接话之前给整段发言翻译,保证单人发言的连贯性。测试修改后的脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import json
import os
import shutil
import subprocess
from pydub import AudioSegment

# --- 配置区 ---
JSON_FILE = "output_subs/vocals_translated.json"
AUDIO_DIR = "output_audio"
ORIGINAL_WAV = "input.wav"
FINAL_OUTPUT = "podcast_smart_interleaved.mp3"

# 逻辑参数
MAX_BLOCK_DURATION = 20.0 # (秒) 单次翻译的最大英文时长。超过这个长度会强制切分翻译。
# 建议 15-25秒,太短太碎,太长记不住。

# 临时文件配置
TEMP_DIR = "temp_chunks"
PROGRESS_FILE = os.path.join(TEMP_DIR, "progress_smart.json")
CHUNK_SIZE = 10 # 每处理多少个 Block 写一次盘

def get_smart_blocks(segments):
"""
智能分块生成器
按照 '说话人改变' 或 '累计时长超过阈值' 来生成 Block
"""
current_block = []
current_duration = 0.0
current_speaker = None

# 为了保留原始索引用于查找文件
for i, seg in enumerate(segments):
seg['original_index'] = i

for seg in segments:
speaker = seg.get('speaker')
# 计算这句话的长度 (秒)
seg_len = seg['end'] - seg['start']

# 决策是否要“结算”当前块
should_flush = False
reason = ""

if current_block:
# 1. 换人了 -> 必须结算
if speaker != current_speaker:
should_flush = True
reason = "Speaker Change"
# 2. 当前块太长了 -> 必须结算 (切分长难句)
elif (current_duration + seg_len) > MAX_BLOCK_DURATION:
should_flush = True
reason = "Max Duration Exceeded"

if should_flush:
# yield 出去当前块
yield {
"speaker": current_speaker,
"segments": current_block,
"end_time": current_block[-1]['end'] # 这一块最后一句的结束时间
}
# 重置
current_block = []
current_duration = 0.0

# 加入新句子
current_block.append(seg)
current_duration += seg_len
current_speaker = speaker

# 处理剩下的
if current_block:
yield {
"speaker": current_speaker,
"segments": current_block,
"end_time": current_block[-1]['end']
}

def load_progress():
if os.path.exists(PROGRESS_FILE):
with open(PROGRESS_FILE, 'r') as f:
return json.load(f)
return {"last_block_idx": -1, "last_audio_ptr": 0, "chunks": []}

def save_progress(idx, ptr, chunk_file):
data = load_progress()
data["last_block_idx"] = idx
data["last_audio_ptr"] = ptr
if chunk_file:
data["chunks"].append(chunk_file)
with open(PROGRESS_FILE, 'w') as f:
json.dump(data, f)

def main():
# 清理旧环境 (建议每次手动删,这里为了安全起见不做 rm -rf,只创建)
if not os.path.exists(TEMP_DIR):
os.makedirs(TEMP_DIR)

print(f"📂 Loading original audio: {ORIGINAL_WAV}...")
try:
# 如果文件巨大,建议用 ffmpeg 切片加载,这里假设内存足够
original_audio = AudioSegment.from_wav(ORIGINAL_WAV)
except:
print(f"❌ 找不到 {ORIGINAL_WAV}")
return

print(f"📂 Loading {JSON_FILE}...")
with open(JSON_FILE, 'r', encoding='utf-8') as f:
segments = json.load(f)

# 生成所有 Blocks (纯元数据)
print("🧠 Analyzing dialogue structure...")
all_blocks = list(get_smart_blocks(segments))
print(f" Total Segments: {len(segments)}")
print(f" Smart Blocks: {len(all_blocks)}")

# 加载进度
progress = load_progress()
start_idx = progress["last_block_idx"] + 1
# 上一次英文截取到了哪里 (毫秒)
# 这个指针非常重要,确保英文是连续的,不会漏掉中间的思考声
last_audio_ptr = progress["last_audio_ptr"]

current_chunk_audio = AudioSegment.empty()

print(f"🚀 Processing from block {start_idx}...")

for b_idx in range(start_idx, len(all_blocks)):
block = all_blocks[b_idx]

# 1. 英文部分
# 截取范围:[上一次结束点] -> [当前块最后一句的结束点]
# 这样即使 Block 之间有 0.5秒 的静音,也会被包含在这一段里播放
target_end_ms = int(block['end_time'] * 1000)

# 容错:防止时间倒流
start_ms = max(last_audio_ptr, 0)
if target_end_ms > start_ms:
eng_audio = original_audio[start_ms:target_end_ms]
current_chunk_audio += eng_audio
last_audio_ptr = target_end_ms

# 2. 中文部分 (拼接该 Block 内所有句子)
cn_audio_combined = AudioSegment.empty()
has_cn = False

for seg in block['segments']:
orig_idx = seg['original_index']
speaker = seg.get('speaker', 'UNKNOWN')
filename = f"{orig_idx:04d}_{speaker}.wav"
tts_path = os.path.join(AUDIO_DIR, filename)

if os.path.exists(tts_path):
# 拼接
part_audio = AudioSegment.from_wav(tts_path)
cn_audio_combined += part_audio
# 句间微停顿 (更自然)
cn_audio_combined += AudioSegment.silent(duration=80)
has_cn = True

# 3. 组合
if has_cn:
# 英文说完后,空 0.2秒 再出中文
current_chunk_audio += AudioSegment.silent(duration=200)
current_chunk_audio += cn_audio_combined
# 中文说完后,空 0.4秒 再出下一段英文
current_chunk_audio += AudioSegment.silent(duration=400)

if b_idx % 20 == 0:
print(f"Processing Block {b_idx}/{len(all_blocks)}...")

# 4. 存盘 (流式)
if (b_idx + 1) % CHUNK_SIZE == 0 or (b_idx == len(all_blocks) - 1):
chunk_filename = os.path.join(TEMP_DIR, f"smart_chunk_{b_idx}.wav")
# print(f"💾 Saving {chunk_filename}...")
current_chunk_audio.export(chunk_filename, format="wav")
save_progress(b_idx, last_audio_ptr, chunk_filename)
current_chunk_audio = AudioSegment.empty()

# --- 合并阶段 ---
print("✨ Merging chunks...")
progress = load_progress()
chunk_files = progress["chunks"]

concat_list_path = os.path.join(TEMP_DIR, "concat_list.txt")
with open(concat_list_path, 'w') as f:
for chunk in chunk_files:
abs_path = os.path.abspath(chunk)
f.write(f"file '{abs_path}'\n")

# 尾巴
if last_audio_ptr < len(original_audio):
outro_path = os.path.join(TEMP_DIR, "outro.wav")
original_audio[last_audio_ptr:].export(outro_path, format="wav")
with open(concat_list_path, 'a') as f:
f.write(f"file '{os.path.abspath(outro_path)}'\n")

print("RUNNING FFmpeg concat...")
subprocess.run([
"ffmpeg", "-y", "-f", "concat", "-safe", "0",
"-i", concat_list_path,
"-c", "copy",
"temp_merged_smart.wav"
])

print("Converting to MP3...")
subprocess.run([
"ffmpeg", "-y", "-i", "temp_merged_smart.wav",
"-b:a", "192k",
FINAL_OUTPUT
])

# --- 清理工作 ---
print("🧹 Cleaning up temporary files...")
if os.path.exists("temp_merged_smart.wav"):
os.remove("temp_merged_smart.wav")

if os.path.exists(TEMP_DIR):
try:
shutil.rmtree(TEMP_DIR)
print(f"✅ Removed {TEMP_DIR}")
except Exception as e:
print(f"⚠️ Could not remove {TEMP_DIR}: {e}")

print(f"🎉 All Done! Output: {FINAL_OUTPUT}")

if __name__ == "__main__":
main()

执行完成后就有了一个五个半小时的超长音频,截取 5 分钟预览如下: