之前有期 Dwarkesh Patel 对话 Andrej Karpathy 的播客放在书签里好久没看,每天车上通勤的时间又有点浪费,遂有了把播客转译成中文人声的想法。如果用 NotebookLM,AI 提炼出的文字摘要对播客这种信息密度比较高的内容来说损失难以衡量;而市面上虽然已有一些流行的视频字幕和人声转换工具,却好像没有成熟的开源播客翻译项目,于是准备先手动跑一遍流程,再看看有没有适合封装的实现。
跟 gemini 交流了一番,总结出主要流程需包含以下步骤:
Demucs:人声分离
WhisperX:声音识别与分割
LLM API:翻译
GPT-SoVITS:生成中文语音
pydub/ffmpeg-python:时序对齐和混音
0x00 环境准备
本来用的 NVIDIA PyTorch 镜像依赖冲突太多,遂决定基于 nvcr.io/nvidia/cuda:12.1.1-cudnn8-devel-ubuntu22.04 重新构建一个,requirements.txt 参考了 VideoLingo 的,并让 gemini 精简了一下。
```text
# --- 核心基础库 ---
numpy==1.26.4
pandas==2.2.3
librosa==0.10.2.post1
scipy

# --- 音频处理与计算 ---
pydub==0.25.1
moviepy==1.0.3
ctranslate2==4.4.0
transformers==4.39.3
pytorch-lightning==2.3.3

# --- 核心 AI 模型 (直接从源码安装以获取最新修复) ---
# Demucs: 人声分离
demucs[dev] @ git+https://github.com/adefossez/demucs
# WhisperX: 识别与对齐 (指定了特定 Commit 以保证稳定性)
whisperx @ git+https://github.com/m-bain/whisperx.git@7307306a9d8dd0d261e588cc933322454f853853

# --- LLM 与 文本处理 ---
openai==1.55.3
json-repair        # 极力推荐:防止 LLM 返回的 JSON 格式错误
spacy==3.7.4       # 用于更好的分句处理
autocorrect-py     # 简单的拼写纠错

# --- 辅助工具 ---
PyYAML==6.0.2
requests==2.32.3
```
由于使用的是 enroot 环境,没有通过 Dockerfile 构建,直接在命令行一把梭:
```bash
apt-get update && apt-get install -y \
    python3.10 \
    python3-pip \
    python3-venv \
    git \
    ffmpeg \
    libavcodec-dev \
    libavformat-dev \
    libavdevice-dev \
    libavutil-dev \
    libswscale-dev \
    libswresample-dev \
    libavfilter-dev \
    pkg-config \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

pip install --upgrade pip
pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu121
pip install -r requirements.txt

ln -s /usr/bin/python3.10 /usr/bin/python
```
0x01 人声分离
Demucs 是一款先进的音乐源分离模型,目前能够将鼓、贝斯和人声从其他伴奏中分离出来。
之前从 YouTube 下载的音频是 m4a 格式,虽然 demucs 支持直接处理,但还是按照 gemini 的建议先将其转换为 wav,以提高后续处理效率。(转换后的 wav 体积大概是源文件的 10 倍,我是通过 ssh 操作的,下载很慢,所以后续需要截取切片来试听效果)
```bash
ffmpeg -i input.m4a -ar 44100 -ac 2 input.wav
```
可以先截取 30min 验证流程:
```bash
ffmpeg -i input_source.wav -t 00:30:00 -c copy input.wav
```
使用 demucs 分离人声:
```bash
demucs --two-stems=vocals -n htdemucs input.wav
```
--two-stems=vocals: 告诉模型我只要 vocals(人声)和 no_vocals(伴奏/背景音)。如果不加这个,它会分成鼓、贝斯、其他等 4 个轨道,对播客没用。
-n htdemucs: 指定使用 v4 的 htdemucs 模型(也是当前的默认模型)。还有一个微调版本 htdemucs_ft,针对各个分离源分别做了微调,推理时间更长,质量更高,用法见下面的示例。
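如果想换用微调版模型,把模型名替换掉即可(一个最小示例,推理耗时会明显增加):

```bash
demucs --two-stems=vocals -n htdemucs_ft input.wav
```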
输出的音频位于 separated/htdemucs/input 下,分为 vocals.wav 和 no_vocals.wav 两个音频文件(都是 GB 级别),可以通过以下命令截取 5 分钟片段下载到本地试听下效果:
```bash
ffmpeg -i separated/htdemucs/input/vocals.wav -t 300 -c copy test_vocals.wav
ffmpeg -i separated/htdemucs/input/no_vocals.wav -t 300 -c copy test_no_vocals.wav
```
也可以在音频目录下通过 python 启动一个 http 服务,在浏览器中直接预览音频:
```bash
python3 -m http.server 8000
```
0x02 语音识别
Whisper 是 OpenAI 开发的一款自动语音识别 (ASR) 模型,基于一个包含各种音频的大型数据集训练。虽然它能生成高度准确的转录文本,但对应的时间戳是按语句而非单词计算的,因此可能存在几秒的误差,而且 OpenAI 的 Whisper 本身并不支持批量处理。
WhisperX 是基于 Whisper 的工程化封装库,提供了词级时间戳对齐和批量推理,并支持说话人区分 (Diarization)。
在使用 WhisperX 前需要准备一些配置,比如说话人分割用到的 pyannote/speaker-diarization-3.1 模型,要求在参数中传入 huggingface 令牌。
首先在网页端新建一个 read 权限的 token,同时需要在 speaker-diarization-3.1 和 segmentation-3.0 的模型页面同意开源模型协议。
准备完成后就可以在命令行启动 whisperx 进行语音转文字。
```bash
whisperx separated/htdemucs/input/vocals.wav \
    --model large-v2 \
    --language en \
    --diarize \
    --min_speakers 2 \
    --max_speakers 5 \
    --output_dir output_subs \
    --output_format all \
    --compute_type float16 \
    --batch_size 32 \
    --hf_token <huggingface_token>
```
下面是 gemini 给出的一些参数解释:
--model large-v2:
目前 Whisper 生态中英文识别综合能力最强的模型(v3 在某些场景下对音乐的抗噪反而不如 v2,v2 是公认最稳的)。
3090 显存足够大,不用担心跑不动。
--language en:
显式指定英语。虽然它能自动检测,但指定后可以避免开头几秒误识别,并略微提升速度。
--diarize:
启用说话人区分 (Diarization),为转录结果标注说话人标签,需要配合 --hf_token 使用。
--compute_type float16:
使用半精度加速,在 3090 上这是标准操作,速度快且几乎不损失精度。
--batch_size 16/32:
这会极大提升处理速度。3090 的 24G 显存完全吃得消 batch 16 甚至 32。
--min_speakers 2 / --max_speakers 5:
显式告诉 WhisperX 说话人数量的范围,能极大提高说话人区分 (Diarization) 的准确度,防止它把偶尔的背景杂音识别成多余的说话人。
注:如果你不确定有几个人,可以不加这两个参数,但在播客场景下指定人数通常效果最好。
补充:好吧,后续在翻译完成后才发现中间还有另一个女声(还好翻译 API 是白嫖的),当时的说话人设置导致两位男主的声音被合并到了一起。保险起见可以像上面一样设置为 2~5 或更多。
--initial_prompt “Discussion about AI, LLMs, Transformer, PyTorch, CUDA, reinforcement learning, Andrej Karpathy, Dwarkesh Patel.”
注入提示词可以优化技术类播客的生成效果,不过先不考虑添加。
--max_line_width 100
分片字符长度限制。在后续尝试中被去除,保证长句完整性。
运行完成后输出在当前路径下的 output_subs 目录:
```
$ ls output_subs/
vocals.json  vocals.srt  vocals.tsv  vocals.txt  vocals.vtt
```
将 json 文件(后续准备用来翻译的格式)下载下来预览一下:
```json
{
  "segments": [
    {
      "start": 0.109,
      "end": 1.009,
      "text": " Reinforcement learning is terrible.",
      "words": [
        { "word": "Reinforcement", "start": 0.109, "end": 0.449, "score": 0.303, "speaker": "SPEAKER_01" },
        { "word": "learning", "start": 0.469, "end": 0.649, "score": 0.849, "speaker": "SPEAKER_01" },
        ...
        { "word": "one.", "start": 8766.077, "end": 8766.157, "score": 0.741, "speaker": "SPEAKER_01" }
      ]
    }
  ],
  "language": "en"
}
```
可以看出其中包含了文本时间轴和说话人等关键信息,但是也不难发现有些句子被切分得很短,在丢给大模型翻译时可能会丢失上下文信息。不过这个问题可以先放一边,后续再考虑优化。
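一个可行的缓解办法是在翻译前按说话人和句末标点把碎片化的分段重新合并成句子级分段(也可以换成 spacy 做更精细的分句)。下面是一个最小示意脚本(并非原流程实际使用的代码,合并规则和输出文件名 vocals_semantic.json 都只是示例,对应后文翻译脚本读取的文件名):

```python
import json

# 示意:把 WhisperX 过碎的分段合并为句子级分段(规则与文件名均为示例)
INPUT_FILE = "output_subs/vocals.json"
OUTPUT_FILE = "output_subs/vocals_semantic.json"
MAX_CHARS = 300                 # 单段最大字符数,防止合并出超长句
SENT_END = (".", "?", "!")      # 句末标点

with open(INPUT_FILE, "r", encoding="utf-8") as f:
    segments = json.load(f)["segments"]

merged, buf = [], None
for seg in segments:
    if buf is None:
        buf = dict(seg)
        continue
    same_speaker = seg.get("speaker") == buf.get("speaker")
    ended = buf["text"].strip().endswith(SENT_END)
    too_long = len(buf["text"]) + len(seg["text"]) > MAX_CHARS
    if same_speaker and not ended and not too_long:
        # 同一说话人且上一段没说完:拼接文本并延长时间轴
        buf["text"] += seg["text"]
        buf["end"] = seg["end"]
        buf["words"] = buf.get("words", []) + seg.get("words", [])
    else:
        merged.append(buf)
        buf = dict(seg)
if buf:
    merged.append(buf)

with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
    json.dump(merged, f, ensure_ascii=False, indent=2)
print(f"{len(segments)} -> {len(merged)} segments")
```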
最好再确认下人声数量分布,提前发现问题。
```
$ grep -o '"speaker": "[^"]*"' output_subs/vocals.json | sort | uniq -c
    319 "speaker": "SPEAKER_00"
  46484 "speaker": "SPEAKER_01"
  16872 "speaker": "SPEAKER_02"
```
0x03 文本翻译
这边直接使用 gemini 提供的翻译脚本,支持断点续传以增加安全感,本地通过 new api 代理白嫖一个 gemini-2.5-flash 测试下(年底 google 收缩了白嫖额度,后续又尝试了很多模型):
```python
import json
import os
import time
import shutil
from openai import OpenAI
from concurrent.futures import ThreadPoolExecutor

INPUT_FILE = "output_subs/vocals_semantic.json"
OUTPUT_FILE = "output_subs/vocals_translated.json"
MODEL_NAME = "qwen3-max-2025-09-23"
BATCH_SIZE = 15
SLEEP_INTERVAL = 0.5
MAX_WORKERS = 3
MAX_RETRIES = 3
TIMEOUT_SECONDS = 60

GLOSSARY = """
- LLM: LLM (大语言模型)
- Transformer: Transformer
- Token: Token
- Weights: 权重
- Context Window: 上下文窗口
- Hallucination: 幻觉
- Inference: 推理
- RLHF: RLHF
- Gradient Descent: 梯度下降
"""

client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    timeout=TIMEOUT_SECONDS
)

def safe_save_data(segments):
    """原子写入:防止中断导致 JSON 文件损坏"""
    temp_file = OUTPUT_FILE + ".tmp"
    try:
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(segments, f, ensure_ascii=False, indent=2)
        shutil.move(temp_file, OUTPUT_FILE)
    except Exception as e:
        print(f"⚠️ Save failed: {e}")

def load_data():
    """智能加载逻辑:检测旧缓存冲突"""
    if not os.path.exists(INPUT_FILE):
        print(f"❌ Input file missing: {INPUT_FILE}")
        exit(1)
    with open(INPUT_FILE, 'r', encoding='utf-8') as f:
        input_data = json.load(f)
    for seg in input_data:
        if 'text_zh' not in seg:
            seg['text_zh'] = ""
    if os.path.exists(OUTPUT_FILE):
        print(f"♻️ Checking existing progress in {OUTPUT_FILE}...")
        try:
            with open(OUTPUT_FILE, 'r', encoding='utf-8') as f:
                output_data = json.load(f)
            if len(input_data) != len(output_data):
                print(f"⚠️ Mismatch detected (Input: {len(input_data)} vs Cache: {len(output_data)}).")
                print(f"🗑️ Discarding old cache. Starting fresh.")
                return input_data
            else:
                translated_count = sum(1 for seg in output_data if seg.get('text_zh', '').strip() != "")
                print(f"✅ Resume successful! ({translated_count}/{len(output_data)} segments translated)")
                return output_data
        except json.JSONDecodeError:
            print(f"⚠️ Cache corrupted. Starting fresh.")
            return input_data
    return input_data

def translate_batch(args):
    """处理单个批次,包含重试逻辑"""
    batch_idx, segments = args
    input_count = len(segments)
    if all(seg.get('text_zh', '').strip() != "" for seg in segments):
        print(f"⏩ Batch {batch_idx} already done.")
        return segments, False
    task_text = f"Task (Translate these {input_count} lines one by one):\n"
    for i, seg in enumerate(segments):
        task_text += f"[{i}] {seg['text'].strip()}\n"
    system_prompt = f"""
You are a professional translator for an AI technical podcast.
Target: Natural, fluent spoken Chinese.
Glossary: {GLOSSARY}
IMPORTANT:
1. You have received {input_count} lines.
2. You MUST return exactly {input_count} translated lines.
3. Return ONLY a JSON string array. Format: ["text1", "text2"]
"""
    for attempt in range(MAX_RETRIES):
        try:
            time.sleep(SLEEP_INTERVAL)
            print(f"🚀 [Batch {batch_idx}] Requesting {input_count} lines (Attempt {attempt+1})...")
            start_ts = time.time()
            response = client.chat.completions.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": task_text}
                ]
            )
            duration = time.time() - start_ts
            result = response.choices[0].message.content.strip()
            if result.startswith("```"):
                result = result.split("\n", 1)[1]
            if result.endswith("```"):
                result = result.rsplit("\n", 1)[0]
            translations = json.loads(result)
            if isinstance(translations, dict):
                translations = list(translations.values())[0]
            if not isinstance(translations, list) or len(translations) != input_count:
                print(f"⚠️ [Batch {batch_idx}] Count mismatch! Sent {input_count}, got {len(translations) if isinstance(translations, list) else 'Error'}")
                raise ValueError("Line count mismatch")
            for i, seg in enumerate(segments):
                seg['text_zh'] = str(translations[i]).strip()
            print(f"✅ [Batch {batch_idx}] Success in {duration:.2f}s")
            return segments, True
        except Exception as e:
            print(f"❌ [Batch {batch_idx}] Error: {str(e)}")
            if attempt < MAX_RETRIES - 1:
                wait_time = (attempt + 1) * 2
                print(f"🔄 Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"💀 Batch {batch_idx} Failed after {MAX_RETRIES} attempts.")
                break
    return segments, False

def main():
    all_segments = load_data()
    total_len = len(all_segments)
    batches = []
    for i in range(0, total_len, BATCH_SIZE):
        batch_seg = all_segments[i : i + BATCH_SIZE]
        batches.append((i // BATCH_SIZE, batch_seg))
    print(f"🚀 Processing {len(batches)} batches.")
    print(f"ℹ️ Config: Batch Size={BATCH_SIZE}, Workers={MAX_WORKERS}, Interval={SLEEP_INTERVAL}s")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        results = executor.map(translate_batch, batches)
        for i, (updated_batch, changed) in enumerate(results):
            start_idx = i * BATCH_SIZE
            all_segments[start_idx : start_idx + len(updated_batch)] = updated_batch
            if changed or i % 10 == 0:
                safe_save_data(all_segments)
                if changed:
                    print(f"💾 Progress saved at batch {i}")
    safe_save_data(all_segments)
    print(f"🎉 All Done! Output saved to {OUTPUT_FILE}")

if __name__ == "__main__":
    main()
```
看起来 AI 很喜欢在日志中加入 emoji:
```
$ python translate_subs.py
🆕 No progress found, starting from scratch with output_subs/vocals.json...
🚀 Processing 200 batches...
🔄 Translating Batch 0 (10 lines)...
🔄 Translating Batch 1 (10 lines)...
💾 Saved progress at batch 0
🔄 Translating Batch 2 (10 lines)...
💾 Saved progress at batch 1
🔄 Translating Batch 3 (10 lines)...
```
完成后生成的 vocals_translated.json 如下:
```json
[
  {
    "start": 0.109,
    "end": 1.009,
    "text": " Reinforcement learning is terrible.",
    "words": [
      { "word": "Reinforcement", "start": 0.109, "end": 0.449, "score": 0.303, "speaker": "SPEAKER_01" },
      { "word": "learning", "start": 0.469, "end": 0.649, "score": 0.849, "speaker": "SPEAKER_01" },
      { "word": "is", "start": 0.669, "end": 0.709, "score": 0.96, "speaker": "SPEAKER_01" },
      { "word": "terrible.", "start": 0.729, "end": 1.009, "score": 0.899, "speaker": "SPEAKER_01" }
    ],
    "speaker": "SPEAKER_01",
    "text_zh": "强化学习糟透了。"
  },
  ...
]
```
0x04 语音生成
首先准备一个新的 cuda 12.8 环境,安装 miniconda,然后使用 GPT-SoVITS 官方的一键安装脚本来初始化环境。
```bash
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
conda create -n GPTSoVits python=3.10
conda activate GPTSoVits
git clone https://github.com/RVC-Boss/GPT-SoVITS.git
cd GPT-SoVITS
bash install.sh --device CU128 --source ModelScope
```
按照 gemini 的建议采用零样本(Zero-shot)声音克隆,即不微调模型、直接用参考音频驱动生成,先通过脚本为每个说话人挑选一段参考音频切片:
```python
import json
import os
import re
from pydub import AudioSegment

WAV_FILE = "separated/htdemucs/input/vocals.wav"
JSON_FILE = "output_subs/vocals_translated.json"
OUTPUT_DIR = "output_refs"

IDEAL_DURATION = 6.0
MIN_DURATION = 3.5
MAX_DURATION = 10.0

def calculate_score(seg, total_duration_idx):
    """全自动评分函数"""
    text = seg['text'].strip()
    duration = seg['end'] - seg['start']
    if duration < MIN_DURATION or duration > MAX_DURATION:
        return -9999
    score = 100 - abs(duration - IDEAL_DURATION) * 10
    alpha_count = len(re.findall(r'[a-zA-Z]', text))
    if alpha_count < 10:
        return -9999
    cps = alpha_count / duration
    if cps > 22:
        score -= 30
    if cps < 5:
        score -= 30
    if "?" in text:
        score -= 15
    if "!" in text:
        score -= 15
    if total_duration_idx < 0.3:
        score += 10
    elif total_duration_idx > 0.8:
        score -= 10
    return score

def main():
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
    print(f"📂 Loading audio {WAV_FILE} (Please wait, loading large file)...")
    try:
        audio = AudioSegment.from_wav(WAV_FILE)
    except Exception as e:
        print(f"❌ Failed to load WAV: {e}")
        return
    with open(JSON_FILE, 'r', encoding='utf-8') as f:
        segments = json.load(f)
    total_segments = len(segments)
    speakers = {}
    for i, seg in enumerate(segments):
        spk = seg.get('speaker', 'UNKNOWN')
        if spk not in speakers:
            speakers[spk] = []
        seg['rel_pos'] = i / total_segments
        speakers[spk].append(seg)
    print(f"🔍 Found {len(speakers)} speakers.")
    for spk, items in speakers.items():
        best_score = -10000
        best_seg = None
        for item in items:
            score = calculate_score(item, item['rel_pos'])
            if score > best_score:
                best_score = score
                best_seg = item
        if best_seg:
            print(f"🏆 Best Ref for {spk} (Score: {best_score:.1f}):")
            print(f"   Text: \"{best_seg['text']}\"")
            print(f"   Time: {best_seg['start']:.1f}s - {best_seg['end']:.1f}s ({best_seg['end']-best_seg['start']:.1f}s)")
            start_ms = int(best_seg['start'] * 1000)
            end_ms = int(best_seg['end'] * 1000)
            start_ms = max(0, start_ms - 50)
            end_ms = min(len(audio), end_ms + 50)
            chunk = audio[start_ms:end_ms]
            wav_path = os.path.join(OUTPUT_DIR, f"{spk}_ref.wav")
            txt_path = os.path.join(OUTPUT_DIR, f"{spk}_ref.txt")
            chunk.export(wav_path, format="wav")
            with open(txt_path, 'w', encoding='utf-8') as f:
                f.write(best_seg['text'])
            print(f"   ✅ Saved to {wav_path}")
        else:
            print(f"⚠️ No valid reference found for {spk} (All filtered out).")

if __name__ == "__main__":
    main()
```
执行后输出的目录结构如下,分别对应多个说话人的音频切片和文字样本:
```
$ tree output_refs/
output_refs/
├── SPEAKER_00_ref.txt
├── SPEAKER_00_ref.wav
├── SPEAKER_01_ref.txt
├── SPEAKER_01_ref.wav
├── SPEAKER_02_ref.txt
└── SPEAKER_02_ref.wav
```
启动 api_v2.py(使用较新的 v2 接口),并让 gemini 写了一个调用 GPT-SoVITS API 的脚本,利用之前挑选的参考音频切片和翻译好的中文文本,逐段生成中文人声音频。
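api_v2.py 的启动方式大致如下(仅作示例,监听地址、端口和配置文件路径请以仓库内 api_v2.py 的说明为准):

```bash
python api_v2.py -a 127.0.0.1 -p 9880 -c GPT_SoVITS/configs/tts_infer.yaml
```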
```python
import json
import os
import requests
import time
from pydub import AudioSegment

INPUT_JSON = "output_subs/vocals_translated.json"
OUTPUT_DIR = "output_audio"
REF_DIR = "output_refs"
GPT_SOVITS_URL = "http://127.0.0.1:9880"
MIN_REF_DURATION = 3.0

def fix_ref_audio(wav_path, txt_path):
    """如果参考音频太短,自动循环拼接以满足时长要求"""
    try:
        audio = AudioSegment.from_wav(wav_path)
        original_dur = len(audio) / 1000.0
        if original_dur >= MIN_REF_DURATION:
            with open(txt_path, 'r', encoding='utf-8') as f:
                return wav_path, f.read().strip()
        print(f"⚠️ Ref {os.path.basename(wav_path)} too short ({original_dur:.1f}s). Extending...")
        with open(txt_path, 'r', encoding='utf-8') as f:
            text = f.read().strip()
        extended_audio = audio
        extended_text = text
        while len(extended_audio) / 1000.0 < MIN_REF_DURATION:
            extended_audio += audio
            extended_text += " " + text
        fixed_wav = wav_path.replace(".wav", "_fixed.wav")
        extended_audio.export(fixed_wav, format="wav")
        return fixed_wav, extended_text
    except Exception as e:
        print(f"❌ Error fixing ref {wav_path}: {e}")
        return wav_path, ""

def load_speaker_refs():
    refs = {}
    if not os.path.exists(REF_DIR):
        return {}
    for f in os.listdir(REF_DIR):
        if f.endswith("_ref.wav") and not f.endswith("_fixed.wav"):
            speaker = f.replace("_ref.wav", "")
            wav_path = os.path.abspath(os.path.join(REF_DIR, f))
            txt_path = wav_path.replace(".wav", ".txt")
            if os.path.exists(txt_path):
                final_wav, final_text = fix_ref_audio(wav_path, txt_path)
                refs[speaker] = {
                    "audio": final_wav,
                    "text": final_text,
                    "lang": "en"
                }
    return refs

def generate_segment(index, seg, refs):
    speaker = seg.get('speaker', 'UNKNOWN')
    text_zh = seg.get('text_zh', '').strip()
    if not text_zh:
        return
    filename = f"{index:04d}_{speaker}.wav"
    output_path = os.path.join(OUTPUT_DIR, filename)
    if os.path.exists(output_path) and os.path.getsize(output_path) > 1024:
        print(f"⏩ [{index}] Exists.")
        return
    ref = refs.get(speaker)
    if not ref and refs:
        ref = list(refs.values())[0]
    if not ref:
        print(f"❌ No refs available for {speaker}")
        return
    payload = {
        "text": text_zh,
        "text_lang": "zh",
        "ref_audio_path": ref['audio'],
        "prompt_text": ref['text'],
        "prompt_lang": ref['lang'],
        "text_split_method": "cut5",
        "batch_size": 1,
        "media_type": "wav",
        "streaming_mode": False
    }
    try:
        response = requests.post(f"{GPT_SOVITS_URL}/tts", json=payload, stream=True)
        if response.status_code == 200:
            with open(output_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=4096):
                    f.write(chunk)
            print(f"✅ [{index}] {speaker}: Generated.")
        else:
            print(f"❌ [{index}] API Error: {response.text}")
    except Exception as e:
        print(f"❌ [{index}] Conn Error: {e}")

def main():
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
    print("🔍 Loading & Validating Refs...")
    refs = load_speaker_refs()
    print(f"Speakers ready: {list(refs.keys())}")
    with open(INPUT_JSON, 'r', encoding='utf-8') as f:
        segments = json.load(f)
    print(f"🚀 Processing {len(segments)} segments...")
    for i, seg in enumerate(segments):
        generate_segment(i, seg, refs)
    print("🎉 All done!")

if __name__ == "__main__":
    main()
```
首次执行时发现之前为 SPEAKER_00(女声)挑出的采样片段只有 1s,不符合 GPT-SoVITS 对参考音频时长的要求,于是强行将几段女声片段拼成了一个约 4s 的音频后重新生成(拼接思路见下面的示意)。
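拼接参考音频用 pydub 就能简单实现,下面是一个最小示意(并非原流程脚本,片段选取逻辑仅作演示):

```python
import json
from pydub import AudioSegment

# 示意:把 SPEAKER_00 的若干分段拼接成一段约 4s 的参考音频(选取逻辑仅为示例)
audio = AudioSegment.from_wav("separated/htdemucs/input/vocals.wav")
with open("output_subs/vocals_translated.json", "r", encoding="utf-8") as f:
    segments = json.load(f)

ref = AudioSegment.empty()
texts = []
for seg in segments:
    if seg.get("speaker") != "SPEAKER_00":
        continue
    ref += audio[int(seg["start"] * 1000):int(seg["end"] * 1000)]
    texts.append(seg["text"].strip())
    if len(ref) >= 4000:      # 拼够约 4 秒即可
        break

ref.export("output_refs/SPEAKER_00_ref.wav", format="wav")
with open("output_refs/SPEAKER_00_ref.txt", "w", encoding="utf-8") as f:
    f.write(" ".join(texts))
```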
单张 3090 上大概跑了一小时左右,生成了如下所有中文音频片段。
```
output_audio/
├── 0000_SPEAKER_01.wav
├── 0001_SPEAKER_01.wav
├── 0002_SPEAKER_01.wav
...
├── 1990_SPEAKER_02.wav
├── 1991_SPEAKER_02.wav
└── 1992_SPEAKER_02.wav
```
试听了一下,估计是由于只用了很短的参考样本而没有微调模型,声音和原来的 Karpathy 完全不一样,不过可以先用着,后面再考虑微调。
0x05 合并输出
接下来让 gemini 给了一段合并的代码,包含了语速调整和时间轴对齐等处理逻辑:
```python
import json
import os
import subprocess
from pydub import AudioSegment, silence

JSON_FILE = "output_subs/vocals_translated.json"
AUDIO_DIR = "output_audio"
BACKGROUND_FILE = "separated/htdemucs/input/no_vocals.wav"
FINAL_OUTPUT = "final_podcast_smart_cn.mp3"
TEMP_WAV = "temp_vocals_smart.wav"
MAX_SPEED = 1.35
MIN_SPEED = 1.0

def remove_silence(audio_segment, silence_thresh=-50, min_silence_len=100):
    """去除音频首尾的静音,节省时长。"""
    def detect_leading_silence(sound, silence_threshold=-50.0, chunk_size=10):
        trim_ms = 0
        assert chunk_size > 0
        while sound[trim_ms:trim_ms+chunk_size].dBFS < silence_threshold and trim_ms < len(sound):
            trim_ms += chunk_size
        return trim_ms
    start_trim = detect_leading_silence(audio_segment, silence_thresh)
    end_trim = detect_leading_silence(audio_segment.reverse(), silence_thresh)
    duration = len(audio_segment)
    stripped = audio_segment[start_trim:duration-end_trim]
    if len(stripped) == 0:
        return AudioSegment.silent(duration=100)
    return stripped

def change_speed(audio_segment, speed=1.0):
    """使用 FFmpeg 进行变速 (Time Stretching)"""
    if 0.99 < speed < 1.01:
        return audio_segment
    src_tmp = f"tmp_src_{os.getpid()}.wav"
    dst_tmp = f"tmp_dst_{os.getpid()}.wav"
    audio_segment.export(src_tmp, format="wav")
    safe_speed = max(0.5, min(speed, 2.0))
    cmd = [
        "ffmpeg", "-y", "-v", "error",
        "-i", src_tmp,
        "-filter:a", f"atempo={safe_speed}",
        "-vn", dst_tmp
    ]
    subprocess.run(cmd)
    if os.path.exists(dst_tmp):
        try:
            new_seg = AudioSegment.from_wav(dst_tmp)
        except:
            print("⚠️ FFmpeg output invalid, using original.")
            new_seg = audio_segment
        if os.path.exists(src_tmp):
            os.remove(src_tmp)
        if os.path.exists(dst_tmp):
            os.remove(dst_tmp)
        return new_seg
    else:
        print("❌ FFmpeg failed, using original.")
        if os.path.exists(src_tmp):
            os.remove(src_tmp)
        return audio_segment

def main():
    print(f"📂 Loading {JSON_FILE}...")
    with open(JSON_FILE, 'r', encoding='utf-8') as f:
        segments = json.load(f)
    final_vocals = AudioSegment.empty()
    cursor_ms = 0
    print(f"🚀 Smart Merging {len(segments)} segments...")
    print(f"   Policy: No Slow-down (Min {MIN_SPEED}x), Cap Speed-up (Max {MAX_SPEED}x)")
    for i, seg in enumerate(segments):
        speaker = seg.get('speaker', 'UNKNOWN')
        filename = f"{i:04d}_{speaker}.wav"
        file_path = os.path.join(AUDIO_DIR, filename)
        orig_start_ms = int(seg['start'] * 1000)
        orig_end_ms = int(seg['end'] * 1000)
        orig_duration = orig_end_ms - orig_start_ms
        if cursor_ms < orig_start_ms:
            silence_dur = orig_start_ms - cursor_ms
            final_vocals += AudioSegment.silent(duration=silence_dur)
            cursor_ms = orig_start_ms
        if not os.path.exists(file_path):
            processed_audio = AudioSegment.silent(duration=orig_duration)
        else:
            raw_audio = AudioSegment.from_wav(file_path)
            trimmed_audio = remove_silence(raw_audio)
            gen_duration = len(trimmed_audio)
            if gen_duration == 0:
                speed = 1.0
            else:
                ratio = gen_duration / orig_duration
                if ratio <= 1.0:
                    speed = 1.0
                else:
                    if ratio > MAX_SPEED:
                        speed = MAX_SPEED
                    else:
                        speed = ratio
            processed_audio = change_speed(trimmed_audio, speed)
        final_vocals += processed_audio
        cursor_ms += len(processed_audio)
        if i % 100 == 0:
            print(f"Processing... {i}/{len(segments)} | Timeline Shift: {cursor_ms - orig_end_ms}ms")
    print("💾 Exporting vocals track...")
    final_vocals.export(TEMP_WAV, format="wav")
    print("🎛️ Mixing with background...")
    cmd_mix = [
        "ffmpeg", "-y",
        "-i", TEMP_WAV,
        "-i", BACKGROUND_FILE,
        "-filter_complex", "[1:a]volume=0.3[bg];[0:a][bg]amix=inputs=2:duration=first:dropout_transition=2",
        "-ac", "2",
        FINAL_OUTPUT
    ]
    subprocess.run(cmd_mix)
    if os.path.exists(TEMP_WAV):
        os.remove(TEMP_WAV)
    print(f"✅ All Done! Output: {FINAL_OUTPUT}")

if __name__ == "__main__":
    main()
```
I re-watched the pod just now too. First of all, yes I know, and I’m sorry that I speak so fast :).
—— Andrej Karpathy
好吧确实很快。并且使用初版合并代码时,很多地方的变速导致听感很奇怪:快的时候听不清,慢的时候又拖得很长。
上面的代码已经是优化过的版本,主要改动在于:如果中文比原句短就保留留白而不是放慢语速,如果中文比原句长则在最大加速比内压缩,并尽量占用后面句子之前的留白。
实际测试下来确实比之前好一些,但作为信息密度很高的播客来听,还是会丢失不少关键信息。想了想不妨试试把每一段中文音频按原速放在对应英文之后,作为补充翻译:这样既可以锻炼听力,也能尽量保证信息不丢失。不过这样就需要注意连续播放的长度,尽可能播放完整的一段话再给出翻译,中文翻译的音轨也可以不混背景音。
首先试了一下按时长切片穿插翻译,但这样听起来有点突兀。于是又想到在播客场景下,可以在另一个发言人接话之前对整段发言给出翻译,保证单人发言的连贯性。修改后的测试脚本如下:
```python
import json
import os
import shutil
import subprocess
from pydub import AudioSegment

ORIGINAL_WAV = "input.wav"
JSON_FILE = "output_subs/vocals_translated.json"
TTS_DIR = "output_audio"
FINAL_OUTPUT = "final_podcast_interleaved.mp3"
TEMP_DIR = "temp_mix_chunks"
BLOCK_THRESHOLD = 0.8
MAX_BLOCK_DURATION = 15.0
GAP_EN_TO_CN = 200
GAP_CN_TO_EN = 400
CN_VOLUME_BOOST = 2.5

def get_smart_blocks(segments):
    """智能切分:兼顾语义连贯性和听觉反馈频率"""
    current_block = []
    current_speaker = None
    block_start_time = 0.0
    for i, seg in enumerate(segments):
        seg['original_index'] = i
        speaker = seg.get('speaker')
        start = seg['start']
        end = seg['end']
        is_new_block = False
        if not current_block:
            is_new_block = False
            block_start_time = start
        elif speaker != current_speaker:
            is_new_block = True
        else:
            last_end = current_block[-1]['end']
            gap = start - last_end
            current_duration = end - block_start_time
            if gap > BLOCK_THRESHOLD or current_duration > MAX_BLOCK_DURATION:
                is_new_block = True
        if is_new_block:
            yield {
                "speaker": current_speaker,
                "segments": current_block,
                "start": current_block[0]['start'],
                "end": current_block[-1]['end']
            }
            current_block = []
            block_start_time = start
        current_block.append(seg)
        current_speaker = speaker
    if current_block:
        yield {
            "speaker": current_speaker,
            "segments": current_block,
            "start": current_block[0]['start'],
            "end": current_block[-1]['end']
        }

def main():
    if not os.path.exists(ORIGINAL_WAV):
        print(f"❌ Original audio not found: {ORIGINAL_WAV}")
        return
    if os.path.exists(TEMP_DIR):
        shutil.rmtree(TEMP_DIR)
    os.makedirs(TEMP_DIR)
    print(f"📂 Loading original audio (This may take time)...")
    try:
        original_audio = AudioSegment.from_wav(ORIGINAL_WAV)
    except Exception as e:
        print(f"❌ Failed to load WAV: {e}")
        return
    with open(JSON_FILE, 'r', encoding='utf-8') as f:
        segments = json.load(f)
    print("🧠 Analyzing dialogue structure (Max 15s / Gap 0.8s)...")
    blocks = list(get_smart_blocks(segments))
    print(f"   Segments merged into {len(blocks)} playback blocks.")
    last_audio_ptr = 0
    chunk_files = []
    current_chunk = AudioSegment.empty()
    print(f"🚀 Mixing started...")
    for idx, block in enumerate(blocks):
        target_end_ms = int(block['end'] * 1000)
        start_ms = max(last_audio_ptr, 0)
        if target_end_ms > start_ms:
            eng_segment = original_audio[start_ms:target_end_ms]
            current_chunk += eng_segment
            last_audio_ptr = target_end_ms
        cn_block_audio = AudioSegment.empty()
        has_cn = False
        for seg in block['segments']:
            tts_filename = f"{seg['original_index']:04d}_{seg.get('speaker', 'UNKNOWN')}.wav"
            tts_path = os.path.join(TTS_DIR, tts_filename)
            if os.path.exists(tts_path):
                try:
                    seg_audio = AudioSegment.from_wav(tts_path)
                    cn_block_audio += seg_audio
                    cn_block_audio += AudioSegment.silent(duration=100)
                    has_cn = True
                except:
                    pass
        if has_cn:
            cn_block_audio = cn_block_audio + CN_VOLUME_BOOST
            current_chunk += AudioSegment.silent(duration=GAP_EN_TO_CN)
            current_chunk += cn_block_audio
            current_chunk += AudioSegment.silent(duration=GAP_CN_TO_EN)
        if (idx + 1) % 50 == 0 or idx == len(blocks) - 1:
            chunk_name = os.path.join(TEMP_DIR, f"chunk_{idx}.wav")
            current_chunk.export(chunk_name, format="wav")
            chunk_files.append(chunk_name)
            current_chunk = AudioSegment.empty()
        if idx % 100 == 0:
            print(f"   Processed {idx+1}/{len(blocks)} blocks...")
    if last_audio_ptr < len(original_audio):
        outro_chunk = original_audio[last_audio_ptr:]
        outro_name = os.path.join(TEMP_DIR, "chunk_final.wav")
        outro_chunk.export(outro_name, format="wav")
        chunk_files.append(outro_name)
    print("✨ Merging chunks with FFmpeg...")
    concat_list_path = os.path.join(TEMP_DIR, "files.txt")
    with open(concat_list_path, 'w') as f:
        for p in chunk_files:
            abs_path = os.path.abspath(p)
            f.write(f"file '{abs_path}'\n")
    subprocess.run([
        "ffmpeg", "-y", "-f", "concat", "-safe", "0",
        "-i", concat_list_path,
        "-c:a", "libmp3lame", "-q:a", "2",
        FINAL_OUTPUT
    ])
    shutil.rmtree(TEMP_DIR)
    print(f"🎉 Success! Output: {FINAL_OUTPUT}")

if __name__ == "__main__":
    main()
```
执行完成后就有了一个五个半小时的超长音频,截取 5 分钟预览如下:
(音频预览)
下面是优化了语义分片等逻辑,重新抽卡的结果(sounds gay but ok):
(音频预览)
0x06 What’s Next?
上面流程的输出效果仅仅是“能听”,距离成熟的播客翻译还有一段距离。将完整记录重新丢给 gemini 分析得到以下几个优化方向:
同声传译模式还有一定的优化空间
WhisperX 切分的上下文过于碎片化
考虑引入 NLP 模块重新合并分句(如 spacy)
批处理中考虑引入压缩的上下文或者通过滑动窗口翻译 (很费 token,已放弃)
实现更好的翻译模式(词汇表等)
实现微调解决音色克隆问题(暂时考虑完全交给 GPT-SoVITS 实现)
加入文本去噪和中文背景音等优化策略
支持 RSS Feed 生成,或许会是产品化的亮点