A podcast episode of Dwarkesh Patel talking with Andrej Karpathy had been sitting in my bookmarks for ages, and the daily commute felt like wasted time, so I got the idea of converting the podcast into a Chinese-voiced version. Going through NotebookLM, the AI-distilled text loses an unquantifiable amount of information for something as dense as a podcast; and while there are already popular tools for video subtitling and voice conversion, there doesn't seem to be a mature open-source project for full podcast translation. So the plan is to run through the pipeline by hand first, and then see what might be worth packaging up.
After a round of discussion with Gemini, the main pipeline boils down to the following steps:
Demucs: vocal separation
WhisperX: speech recognition and segmentation
LLM API: translation
GPT-SoVITS: Chinese speech synthesis
pydub/ffmpeg-python: timeline alignment and mixing
0x00 Environment Setup
The NVIDIA PyTorch image I had been using had too many dependency conflicts, so I decided to rebuild an environment on top of nvcr.io/nvidia/cuda:12.1.1-cudnn8-devel-ubuntu22.04. The requirements.txt is based on VideoLingo's, trimmed down by Gemini.
```
# --- Core base libraries ---
numpy==1.26.4
pandas==2.2.3
librosa==0.10.2.post1
scipy

# --- Audio processing ---
pydub==0.25.1
moviepy==1.0.3
ctranslate2==4.4.0
transformers==4.39.3
pytorch-lightning==2.3.3

# --- Core AI models (installed from source for the latest fixes) ---
# Demucs: vocal separation
demucs[dev] @ git+https://github.com/adefossez/demucs
# WhisperX: recognition & alignment (pinned to a specific commit for stability)
whisperx @ git+https://github.com/m-bain/whisperx.git@7307306a9d8dd0d261e588cc933322454f853853

# --- LLM & text processing ---
openai==1.55.3
json-repair      # strongly recommended: guards against malformed JSON from the LLM
spacy==3.7.4     # better sentence segmentation
autocorrect-py   # simple spell correction

# --- Utilities ---
PyYAML==6.0.2
requests==2.32.3
```
Since I'm working in an enroot environment rather than building from a Dockerfile, everything goes in straight from the command line:
```bash
apt-get update && apt-get install -y \
    python3.10 \
    python3-pip \
    python3-venv \
    git \
    ffmpeg \
    libavcodec-dev \
    libavformat-dev \
    libavdevice-dev \
    libavutil-dev \
    libswscale-dev \
    libswresample-dev \
    libavfilter-dev \
    pkg-config \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

pip install --upgrade pip
pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu121
pip install -r requirements.txt

ln -s /usr/bin/python3.10 /usr/bin/python
```
0x01 Vocal Separation
Demucs is a state-of-the-art music source separation model that can currently separate drums, bass, and vocals from the rest of the accompaniment.
The audio I had downloaded from YouTube earlier was in m4a format. Demucs supports it, but following Gemini's advice I converted it to wav first to make later processing more efficient. (The converted wav is roughly 10x the size of the source file, and since I'm working over ssh the download is slow, so later steps cut out short clips for auditioning.)
```bash
ffmpeg -i input.m4a -ar 44100 -ac 2 input.wav
```
Separate the vocals with Demucs:
```bash
demucs --two-stems=vocals -n htdemucs input.wav
```
--two-stems=vocals: tells the model I only want vocals and no_vocals (accompaniment/background). Without it, the output is split into four stems (drums, bass, other, vocals), which is useless for a podcast.
-n htdemucs: selects the latest model.
The output lands under separated/htdemucs/input as two files, vocals.wav and no_vocals.wav (both GB-sized). The commands below cut out 5-minute clips that can be downloaded locally to audition the result:
```bash
ffmpeg -i separated/htdemucs/input/vocals.wav -t 300 -c copy test_vocals.wav
ffmpeg -i separated/htdemucs/input/no_vocals.wav -t 300 -c copy test_no_vocals.wav
```
Alternatively, start an HTTP server with Python in the audio directory to preview the files directly in a browser:
```bash
python3 -m http.server 8000
```
0x02 Speech Recognition
Whisper is an automatic speech recognition (ASR) model developed by OpenAI, trained on a large dataset of varied audio. While it produces highly accurate transcripts, its timestamps are computed per utterance rather than per word, so they can be off by a few seconds. Whisper itself also does not support batched inference.
WhisperX is an engineering wrapper built on top of Whisper that adds speaker diarization.
Before running WhisperX there is some setup to do: the speaker-diarization-3.1 model used for diarization requires a Hugging Face token passed as a parameter.
First create a read-scoped token on the Hugging Face website, and accept the model license agreements on the speaker-diarization-3.1 and segmentation-3.0 pages.
With that done, whisperx can be launched from the command line to transcribe the audio.
```bash
whisperx separated/htdemucs/input/vocals.wav \
    --model large-v2 \
    --language en \
    --diarize \
    --min_speakers 3 \
    --max_speakers 3 \
    --max_line_width 100 \
    --output_dir output_subs \
    --output_format all \
    --compute_type float16 \
    --batch_size 16 \
    --hf_token <huggingface_token>
```
Here are Gemini's explanations of the parameters:
--model large-v2:
Currently the strongest model for English in the Whisper ecosystem overall (v3 is actually less robust than v2 against music in some scenarios; v2 is widely regarded as the most stable).
A 3090 has plenty of VRAM, so there's no worry about it not fitting.
--language en:
Explicitly specifies English. Auto-detection works, but specifying it avoids misrecognition in the opening seconds and is slightly faster.
--compute_type float16:
Half precision for speed; standard practice on a 3090, fast with no real accuracy loss.
--batch_size 16:
Greatly speeds up processing. The 3090's 24 GB of VRAM easily handles a batch size of 16 or even 32.
--min_speakers 2 / --max_speakers 2:
Explicitly telling WhisperX "there are 2 speakers here" greatly improves diarization accuracy and keeps occasional background noise from being labeled as a third speaker.
Note: if you're not sure how many speakers there are, you can omit these two parameters, but in a podcast setting specifying the count usually works best.
Update: as it turned out, I only noticed another female voice in the middle after the translation was finished (good thing the translation API was free), so I changed the count to 3 and re-ran.
When the run finishes, the output is written to the output_subs directory under the current path:
```bash
$ ls output_subs/
vocals.json  vocals.srt  vocals.tsv  vocals.txt  vocals.vtt
```
Download the json file (the format I plan to feed into translation later) and take a look:
```json
{
  "segments": [
    {
      "start": 0.109,
      "end": 1.009,
      "text": " Reinforcement learning is terrible.",
      "words": [
        {"word": "Reinforcement", "start": 0.109, "end": 0.449, "score": 0.303, "speaker": "SPEAKER_01"},
        {"word": "learning", "start": 0.469, "end": 0.649, "score": 0.849, "speaker": "SPEAKER_01"},
        ...
        {"word": "one.", "start": 8766.077, "end": 8766.157, "score": 0.741, "speaker": "SPEAKER_01"}
      ]
    },
    ...
  ],
  "language": "en"
}
```
You can see it contains the key information: text, timestamps, and speaker labels. It's also easy to see that some sentences are split very short, which may lose context when handed to the LLM for translation. That problem can be set aside for now and optimized later.
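If I do come back to it, one rough idea would be to merge consecutive segments from the same speaker into larger chunks before translation, so the LLM sees more context per request. A minimal sketch of that idea (my own addition, not part of the pipeline above; the merge_segments helper and the gap/length thresholds are assumptions):

```python
# Sketch: merge adjacent segments of the same speaker into longer translation
# units before sending them to the LLM. Thresholds below are assumptions.
import json

MAX_GAP = 2.0     # assumed: max silence (s) allowed inside one merged chunk
MAX_CHARS = 500   # assumed: cap so a chunk stays a reasonable prompt size

def merge_segments(segments, max_gap=MAX_GAP, max_chars=MAX_CHARS):
    merged = []
    for seg in segments:
        if (merged
                and seg.get("speaker") == merged[-1].get("speaker")
                and seg["start"] - merged[-1]["end"] < max_gap
                and len(merged[-1]["text"]) + len(seg["text"]) < max_chars):
            # Same speaker, small gap: extend the previous chunk.
            merged[-1]["end"] = seg["end"]
            merged[-1]["text"] += " " + seg["text"].strip()
        else:
            merged.append({
                "start": seg["start"],
                "end": seg["end"],
                "speaker": seg.get("speaker"),
                "text": seg["text"].strip(),
            })
    return merged

if __name__ == "__main__":
    with open("output_subs/vocals.json", encoding="utf-8") as f:
        segments = json.load(f)["segments"]
    print(len(segments), "->", len(merge_segments(segments)))
```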
It's also worth double-checking the speaker distribution to catch problems early.
```bash
$ grep -o '"speaker": "[^"]*"' output_subs/vocals.json | sort | uniq -c
    319 "speaker": "SPEAKER_00"
  46484 "speaker": "SPEAKER_01"
  16872 "speaker": "SPEAKER_02"
```
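Beyond raw word counts, it can also help to check total speaking time per label, since a speaker with only a few seconds overall is a hint that diarization invented one out of background noise. A small sketch along those lines (my own addition, reading the same vocals.json):

```python
# Sum per-speaker speaking time from the WhisperX output (sketch, not part of
# the original pipeline). A label with only a few seconds total is suspect.
import json
from collections import defaultdict

with open("output_subs/vocals.json", encoding="utf-8") as f:
    segments = json.load(f)["segments"]

totals = defaultdict(float)
for seg in segments:
    totals[seg.get("speaker", "UNKNOWN")] += seg["end"] - seg["start"]

for speaker, seconds in sorted(totals.items()):
    print(f"{speaker}: {seconds / 60:.1f} min")
```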
0x03 Text Translation
Here I just use the translation script Gemini provided. It supports resuming from a checkpoint for extra peace of mind, and I test it locally against a free gemini-2.5-flash endpoint proxied through new api:
```python
import json
import os
import traceback
from openai import OpenAI
from concurrent.futures import ThreadPoolExecutor

INPUT_FILE = "output_subs/vocals.json"
OUTPUT_FILE = "output_subs/vocals_translated.json"
MODEL_NAME = "gemini-2.5-flash"
BATCH_SIZE = 10
MAX_WORKERS = 1

client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url=os.getenv("OPENAI_BASE_URL")
)

def load_data():
    """
    Smart loading logic:
    1. If the output file (vocals_translated.json) exists, load it first
       (it may already contain partially translated segments).
    2. Otherwise, load the raw Whisper result.
    """
    if os.path.exists(OUTPUT_FILE):
        print(f"♻️ Found existing progress in {OUTPUT_FILE}, resuming...")
        with open(OUTPUT_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    else:
        print(f"🆕 No progress found, starting from scratch with {INPUT_FILE}...")
        with open(INPUT_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
        segments = data['segments'] if 'segments' in data else data
        for seg in segments:
            if 'text_zh' not in seg:
                seg['text_zh'] = ""
        return segments

def save_data(segments):
    """Save progress to disk as we go."""
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        json.dump(segments, f, ensure_ascii=False, indent=2)

def translate_batch(batch_data):
    """
    Process one batch.
    Input: (index, batch_segments)
    """
    batch_idx, segments = batch_data
    if all(seg.get('text_zh', '').strip() != "" for seg in segments):
        print(f"⏩ Batch {batch_idx} already translated, skipping.")
        return segments, False

    prompt_text = ""
    for i, seg in enumerate(segments):
        speaker = seg.get('speaker', 'UNKNOWN')
        text = seg['text'].strip()
        prompt_text += f"[{i}] Speaker {speaker}: {text}\n"

    system_prompt = (
        "你是一位专业的播客字幕翻译专家。请将下面的英文播客对话翻译成中文。\n"
        "要求:\n"
        "1. 保持口语化,自然流畅。\n"
        "2. 严格保持原有的行数和顺序。\n"
        "3. 返回 JSON 列表,例如:[\"第一句\", \"第二句\"]\n"
    )

    try:
        print(f"🔄 Translating Batch {batch_idx} ({len(segments)} lines)...")
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt_text}
            ],
        )
        result = response.choices[0].message.content.strip()
        # Strip a possible markdown code fence around the JSON answer.
        if result.startswith("```"):
            result = result.split("\n", 1)[1]
        if result.endswith("```"):
            result = result.rsplit("\n", 1)[0]
        try:
            translations = json.loads(result)
            if isinstance(translations, dict):
                translations = list(translations.values())[0]
        except:
            print(f"⚠️ JSON Parse Error in Batch {batch_idx}, raw: {result[:20]}...")
            translations = []
    except Exception as e:
        print(f"❌ Error in Batch {batch_idx}: {str(e)}")
        translations = []

    for i, seg in enumerate(segments):
        if i < len(translations) and isinstance(translations, list):
            seg['text_zh'] = str(translations[i])
    return segments, True

def main():
    all_segments = load_data()
    total_len = len(all_segments)

    batches = []
    for i in range(0, total_len, BATCH_SIZE):
        batch_seg = all_segments[i : i + BATCH_SIZE]
        batches.append((i // BATCH_SIZE, batch_seg))

    print(f"🚀 Processing {len(batches)} batches...")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        results = executor.map(translate_batch, batches)
        for i, (updated_batch, changed) in enumerate(results):
            start_idx = i * BATCH_SIZE
            all_segments[start_idx : start_idx + len(updated_batch)] = updated_batch
            if changed or i % 5 == 0:
                save_data(all_segments)
                if changed:
                    print(f"💾 Saved progress at batch {i}")

    save_data(all_segments)
    print(f"✅ All done! Saved to {OUTPUT_FILE}")

if __name__ == "__main__":
    main()
```
Apparently the AI really likes sprinkling emoji into its logs:
```
$ python translate_subs.py
🆕 No progress found, starting from scratch with output_subs/vocals.json...
🚀 Processing 200 batches...
🔄 Translating Batch 0 (10 lines)...
🔄 Translating Batch 1 (10 lines)...
💾 Saved progress at batch 0
🔄 Translating Batch 2 (10 lines)...
💾 Saved progress at batch 1
🔄 Translating Batch 3 (10 lines)...
```
The vocals_translated.json generated at the end looks like this:
```json
[
  {
    "start": 0.109,
    "end": 1.009,
    "text": " Reinforcement learning is terrible.",
    "words": [
      {"word": "Reinforcement", "start": 0.109, "end": 0.449, "score": 0.303, "speaker": "SPEAKER_01"},
      {"word": "learning", "start": 0.469, "end": 0.649, "score": 0.849, "speaker": "SPEAKER_01"},
      {"word": "is", "start": 0.669, "end": 0.709, "score": 0.96, "speaker": "SPEAKER_01"},
      {"word": "terrible.", "start": 0.729, "end": 1.009, "score": 0.899, "speaker": "SPEAKER_01"}
    ],
    "speaker": "SPEAKER_01",
    "text_zh": "强化学习糟透了。"
  },
  ...
]
```
0x04 Speech Synthesis
First prepare a fresh CUDA 12.8 environment, install Miniconda, and then use GPT-SoVITS's official one-click install script to initialize everything.
```bash
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
conda create -n GPTSoVits python=3.10
conda activate GPTSoVits
git clone https://github.com/RVC-Boss/GPT-SoVITS.git
cd GPT-SoVITS
bash install.sh --device CU128 --source ModelScope
```
Following Gemini's suggestion, I went with the zero-shot approach (no fine-tuning), first using a script to extract a reference audio clip for each speaker:
```python
import json
import os
import random
from pydub import AudioSegment

WAV_FILE = "separated/htdemucs/input/vocals.wav"
JSON_FILE = "output_subs/vocals_translated.json"
OUTPUT_DIR = "output_refs"

def main():
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)

    print(f"Loading {JSON_FILE}...")
    with open(JSON_FILE, 'r', encoding='utf-8') as f:
        segments = json.load(f)

    # Score each segment per speaker: prefer clips around 7 s with a
    # reasonable amount of text.
    speakers = {}
    for seg in segments:
        spk = seg.get('speaker', 'UNKNOWN')
        duration = seg['end'] - seg['start']
        text_len = len(seg['text'])
        if spk not in speakers:
            speakers[spk] = []
        score = -abs(duration - 7.0)
        if text_len < 10:
            score -= 100
        if text_len > 150:
            score -= 100
        speakers[spk].append({"seg": seg, "score": score})

    print(f"Loading wav file (this may take a while)...")
    audio = AudioSegment.from_wav(WAV_FILE)

    for spk, items in speakers.items():
        items.sort(key=lambda x: x["score"], reverse=True)
        best_item = items[0]
        ref_seg = best_item["seg"]
        print(f"Extracting ref for {spk} (Score: {best_item['score']:.2f}): {ref_seg['text']}")

        start_ms = int(ref_seg['start'] * 1000)
        end_ms = int(ref_seg['end'] * 1000)
        ref_audio = audio[start_ms:end_ms]

        ref_wav_path = f"{OUTPUT_DIR}/{spk}_ref.wav"
        ref_audio.export(ref_wav_path, format="wav")

        ref_text_path = f"{OUTPUT_DIR}/{spk}_ref.txt"
        with open(ref_text_path, 'w', encoding='utf-8') as f:
            f.write(ref_seg['text'])
        print(f"✅ Saved {ref_wav_path}")

if __name__ == "__main__":
    main()
```
After running it, the output directory looks like this, with an audio clip and matching text sample for each speaker:
```
$ tree output_refs/
output_refs/
├── SPEAKER_00_ref.txt
├── SPEAKER_00_ref.wav
├── SPEAKER_01_ref.txt
├── SPEAKER_01_ref.wav
├── SPEAKER_02_ref.txt
└── SPEAKER_02_ref.wav
```
Start api_v2.py (the newer v2 API) and have Gemini write a script that calls the GPT-SoVITS API, using the reference clips sampled earlier and the per-segment English audio to generate the new Chinese voice clips.
```python
import json
import os
import requests
import time

INPUT_JSON = "output_subs/vocals_translated.json"
OUTPUT_DIR = "output_audio"
REF_DIR = "output_refs"
GPT_SOVITS_URL = "http://127.0.0.1:9880"

def load_speaker_config(ref_dir):
    if not os.path.exists(ref_dir):
        raise FileNotFoundError(f"❌ Reference audio directory {ref_dir} does not exist!")
    config = {}
    print(f"🔍 Scanning {ref_dir} for reference audio...")
    for filename in os.listdir(ref_dir):
        if filename.endswith("_ref.wav"):
            speaker_id = filename.replace("_ref.wav", "")
            wav_path = os.path.abspath(os.path.join(ref_dir, filename))
            txt_path = os.path.join(ref_dir, f"{speaker_id}_ref.txt")
            prompt_text = ""
            if os.path.exists(txt_path):
                with open(txt_path, 'r', encoding='utf-8') as f:
                    prompt_text = f.read().strip()
            config[speaker_id] = {
                "ref_audio_path": wav_path,
                "prompt_text": prompt_text,
                "prompt_lang": "en"
            }
            print(f"  ✅ Loaded: {speaker_id}")
    return config

def generate_audio(text_zh, speaker_id, config, output_filename):
    """POST request adapted to api_v2.py"""
    if speaker_id in config:
        spk_cfg = config[speaker_id]
    else:
        default_key = list(config.keys())[0]
        spk_cfg = config[default_key]
        print(f"  ⚠️ Speaker {speaker_id} fallback to {default_key}")

    payload = {
        "text": text_zh,
        "text_lang": "zh",
        "ref_audio_path": spk_cfg["ref_audio_path"],
        "prompt_text": spk_cfg["prompt_text"],
        "prompt_lang": spk_cfg["prompt_lang"],
        "media_type": "wav",
        "streaming_mode": False,
        "text_split_method": "cut5",
        "speed_factor": 1.0
    }

    try:
        url = f"{GPT_SOVITS_URL}/tts"
        response = requests.post(url, json=payload, stream=True, timeout=120)
        if response.status_code == 200:
            # A JSON body on HTTP 200 means the API reported an error.
            if 'application/json' in response.headers.get('Content-Type', ''):
                print(f"❌ API Error: {response.text}")
                return False
            with open(output_filename, "wb") as f:
                for chunk in response.iter_content(chunk_size=4096):
                    if chunk:
                        f.write(chunk)
            return True
        else:
            print(f"❌ API Error {response.status_code}: {response.text}")
            return False
    except Exception as e:
        print(f"❌ Connection Error: {e}")
        return False

def clean_text(text):
    if not text:
        return ""
    return text.replace('\n', ' ').strip()

def main():
    if not os.path.exists(OUTPUT_DIR):
        os.makedirs(OUTPUT_DIR)
    speakers_config = load_speaker_config(REF_DIR)
    if not speakers_config:
        return

    print(f"📂 Loading {INPUT_JSON}...")
    with open(INPUT_JSON, 'r', encoding='utf-8') as f:
        segments = json.load(f)

    total = len(segments)
    print(f"🚀 Starting TTS (v2 API) for {total} segments...")
    success_count = 0

    for i, seg in enumerate(segments):
        speaker = seg.get('speaker', 'UNKNOWN')
        text_zh = clean_text(seg.get('text_zh', ''))
        filename = f"{i:04d}_{speaker}.wav"
        output_path = os.path.join(OUTPUT_DIR, filename)

        if not text_zh:
            continue
        # Skip segments that already have a non-trivial output file.
        if os.path.exists(output_path) and os.path.getsize(output_path) > 1024:
            success_count += 1
            continue

        print(f"🔊 [{i:04d}/{total}] {speaker}: {text_zh[:20]}...")
        for attempt in range(3):
            if generate_audio(text_zh, speaker, speakers_config, output_path):
                success_count += 1
                break
            time.sleep(1)

    print(f"\n✅ Generation Complete! ({success_count}/{total} files)")

if __name__ == "__main__":
    main()
```
On the first run I discovered that the SPEAKER_00 (female voice) reference clip extracted earlier was only 1 s long, which doesn't meet GPT-SoVITS's requirements, so I brute-force concatenated all of her clips into a 4 s reference and regenerated.
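The stitching itself is just a few lines of pydub. Roughly something like the following, as a sketch of the idea rather than the exact commands I ran (the 4-second target and the stopping rule are assumptions):

```python
# Sketch: stitch SPEAKER_00 clips from vocals.wav into one longer reference
# and overwrite the old reference files (illustrative, not the exact code used).
import json
from pydub import AudioSegment

TARGET_MS = 4000  # assumed target length for the stitched reference

with open("output_subs/vocals_translated.json", encoding="utf-8") as f:
    segments = json.load(f)

audio = AudioSegment.from_wav("separated/htdemucs/input/vocals.wav")
ref, texts = AudioSegment.empty(), []

for seg in segments:
    if seg.get("speaker") != "SPEAKER_00":
        continue
    ref += audio[int(seg["start"] * 1000):int(seg["end"] * 1000)]
    texts.append(seg["text"].strip())
    if len(ref) >= TARGET_MS:
        break

ref.export("output_refs/SPEAKER_00_ref.wav", format="wav")
with open("output_refs/SPEAKER_00_ref.txt", "w", encoding="utf-8") as f:
    f.write(" ".join(texts))
```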
On a single 3090 it ran for roughly an hour and produced all of the Chinese audio clips below.
```
output_audio/
├── 0000_SPEAKER_01.wav
├── 0001_SPEAKER_01.wav
├── 0002_SPEAKER_01.wav
...
├── 1990_SPEAKER_02.wav
├── 1991_SPEAKER_02.wav
└── 1992_SPEAKER_02.wav
```
Listening to a few clips: presumably because only a very short reference sample was used and the model wasn't fine-tuned, the voice sounds nothing like the real Karpathy. It's usable for now, though; fine-tuning can come later.
0x05 Merging the Output
Next, Gemini provided the merge code, which includes the speech-rate adjustment and timeline alignment logic:
```python
import json
import os
import subprocess
from pydub import AudioSegment, silence

JSON_FILE = "output_subs/vocals_translated.json"
AUDIO_DIR = "output_audio"
BACKGROUND_FILE = "separated/htdemucs/input/no_vocals.wav"
FINAL_OUTPUT = "final_podcast_smart_cn.mp3"
TEMP_WAV = "temp_vocals_smart.wav"
MAX_SPEED = 1.35
MIN_SPEED = 1.0

def remove_silence(audio_segment, silence_thresh=-50, min_silence_len=100):
    """Trim leading/trailing silence from a clip to save time on the timeline."""
    def detect_leading_silence(sound, silence_threshold=-50.0, chunk_size=10):
        trim_ms = 0
        assert chunk_size > 0
        while sound[trim_ms:trim_ms+chunk_size].dBFS < silence_threshold and trim_ms < len(sound):
            trim_ms += chunk_size
        return trim_ms

    start_trim = detect_leading_silence(audio_segment, silence_thresh)
    end_trim = detect_leading_silence(audio_segment.reverse(), silence_thresh)
    duration = len(audio_segment)
    stripped = audio_segment[start_trim:duration-end_trim]
    if len(stripped) == 0:
        return AudioSegment.silent(duration=100)
    return stripped

def change_speed(audio_segment, speed=1.0):
    """Time-stretch via FFmpeg's atempo filter."""
    if 0.99 < speed < 1.01:
        return audio_segment
    src_tmp = f"tmp_src_{os.getpid()}.wav"
    dst_tmp = f"tmp_dst_{os.getpid()}.wav"
    audio_segment.export(src_tmp, format="wav")
    safe_speed = max(0.5, min(speed, 2.0))
    cmd = [
        "ffmpeg", "-y", "-v", "error",
        "-i", src_tmp,
        "-filter:a", f"atempo={safe_speed}",
        "-vn", dst_tmp
    ]
    subprocess.run(cmd)
    if os.path.exists(dst_tmp):
        try:
            new_seg = AudioSegment.from_wav(dst_tmp)
        except:
            print("⚠️ FFmpeg output invalid, using original.")
            new_seg = audio_segment
        if os.path.exists(src_tmp):
            os.remove(src_tmp)
        if os.path.exists(dst_tmp):
            os.remove(dst_tmp)
        return new_seg
    else:
        print("❌ FFmpeg failed, using original.")
        if os.path.exists(src_tmp):
            os.remove(src_tmp)
        return audio_segment

def main():
    print(f"📂 Loading {JSON_FILE}...")
    with open(JSON_FILE, 'r', encoding='utf-8') as f:
        segments = json.load(f)

    final_vocals = AudioSegment.empty()
    cursor_ms = 0

    print(f"🚀 Smart Merging {len(segments)} segments...")
    print(f"   Policy: No Slow-down (Min {MIN_SPEED}x), Cap Speed-up (Max {MAX_SPEED}x)")

    for i, seg in enumerate(segments):
        speaker = seg.get('speaker', 'UNKNOWN')
        filename = f"{i:04d}_{speaker}.wav"
        file_path = os.path.join(AUDIO_DIR, filename)

        orig_start_ms = int(seg['start'] * 1000)
        orig_end_ms = int(seg['end'] * 1000)
        orig_duration = orig_end_ms - orig_start_ms

        # Pad with silence up to the segment's original start time.
        if cursor_ms < orig_start_ms:
            silence_dur = orig_start_ms - cursor_ms
            final_vocals += AudioSegment.silent(duration=silence_dur)
            cursor_ms = orig_start_ms

        if not os.path.exists(file_path):
            processed_audio = AudioSegment.silent(duration=orig_duration)
        else:
            raw_audio = AudioSegment.from_wav(file_path)
            trimmed_audio = remove_silence(raw_audio)
            gen_duration = len(trimmed_audio)
            if gen_duration == 0:
                speed = 1.0
            else:
                # Only speed up (capped at MAX_SPEED); never slow down.
                ratio = gen_duration / orig_duration
                if ratio <= 1.0:
                    speed = 1.0
                else:
                    if ratio > MAX_SPEED:
                        speed = MAX_SPEED
                    else:
                        speed = ratio
            processed_audio = change_speed(trimmed_audio, speed)

        final_vocals += processed_audio
        cursor_ms += len(processed_audio)

        if i % 100 == 0:
            print(f"Processing... {i}/{len(segments)} | Timeline Shift: {cursor_ms - orig_end_ms} ms")

    print("💾 Exporting vocals track...")
    final_vocals.export(TEMP_WAV, format="wav")

    print("🎛️ Mixing with background...")
    cmd_mix = [
        "ffmpeg", "-y",
        "-i", TEMP_WAV,
        "-i", BACKGROUND_FILE,
        "-filter_complex",
        "[1:a]volume=0.3[bg];[0:a][bg]amix=inputs=2:duration=first:dropout_transition=2",
        "-ac", "2",
        FINAL_OUTPUT
    ]
    subprocess.run(cmd_mix)

    if os.path.exists(TEMP_WAV):
        os.remove(TEMP_WAV)
    print(f"✅ All Done! Output: {FINAL_OUTPUT}")

if __name__ == "__main__":
    main()
```
I re-watched the pod just now too. First of all, yes I know, and I’m sorry that I speak so fast :).
—— Andrej Karpathy
He really does speak fast. With the first version of the code, the speed changes made many passages sound strange: too fast to make out in places, and dragging on far too long in others.
The code above is already the optimized version. The main change: if the Chinese finishes early, leave silence instead of stretching it out; if the Chinese runs long, let it borrow as much as possible from the silence that follows.
In practice this is somewhat better than before, but listening to it as a high-density podcast still loses quite a lot of key information. So I thought: why not place each Chinese clip, at its original speed, right after the English, as a supplementary translation? That way it doubles as listening practice while losing as little information as possible. It does mean paying attention to how long a stretch plays continuously, ideally playing a complete passage before giving its translation, and the Chinese translation track can drop the background audio.
I first tried interleaving translations on fixed-length slices, but that sounded jarring. Then it occurred to me that in a podcast you can also translate an entire turn right before the next speaker takes over, keeping each speaker's remarks coherent. The modified script under test:
```python
import json
import os
import shutil
import subprocess
from pydub import AudioSegment

JSON_FILE = "output_subs/vocals_translated.json"
AUDIO_DIR = "output_audio"
ORIGINAL_WAV = "input.wav"
FINAL_OUTPUT = "podcast_smart_interleaved.mp3"
MAX_BLOCK_DURATION = 20.0
TEMP_DIR = "temp_chunks"
PROGRESS_FILE = os.path.join(TEMP_DIR, "progress_smart.json")
CHUNK_SIZE = 10

def get_smart_blocks(segments):
    """
    Smart block generator:
    flush a block whenever the speaker changes or the accumulated
    duration exceeds MAX_BLOCK_DURATION.
    """
    current_block = []
    current_duration = 0.0
    current_speaker = None

    for i, seg in enumerate(segments):
        seg['original_index'] = i

    for seg in segments:
        speaker = seg.get('speaker')
        seg_len = seg['end'] - seg['start']
        should_flush = False
        reason = ""
        if current_block:
            if speaker != current_speaker:
                should_flush = True
                reason = "Speaker Change"
            elif (current_duration + seg_len) > MAX_BLOCK_DURATION:
                should_flush = True
                reason = "Max Duration Exceeded"
        if should_flush:
            yield {
                "speaker": current_speaker,
                "segments": current_block,
                "end_time": current_block[-1]['end']
            }
            current_block = []
            current_duration = 0.0
        current_block.append(seg)
        current_duration += seg_len
        current_speaker = speaker

    if current_block:
        yield {
            "speaker": current_speaker,
            "segments": current_block,
            "end_time": current_block[-1]['end']
        }

def load_progress():
    if os.path.exists(PROGRESS_FILE):
        with open(PROGRESS_FILE, 'r') as f:
            return json.load(f)
    return {"last_block_idx": -1, "last_audio_ptr": 0, "chunks": []}

def save_progress(idx, ptr, chunk_file):
    data = load_progress()
    data["last_block_idx"] = idx
    data["last_audio_ptr"] = ptr
    if chunk_file:
        data["chunks"].append(chunk_file)
    with open(PROGRESS_FILE, 'w') as f:
        json.dump(data, f)

def main():
    if not os.path.exists(TEMP_DIR):
        os.makedirs(TEMP_DIR)

    print(f"📂 Loading original audio: {ORIGINAL_WAV}...")
    try:
        original_audio = AudioSegment.from_wav(ORIGINAL_WAV)
    except:
        print(f"❌ Cannot find {ORIGINAL_WAV}")
        return

    print(f"📂 Loading {JSON_FILE}...")
    with open(JSON_FILE, 'r', encoding='utf-8') as f:
        segments = json.load(f)

    print("🧠 Analyzing dialogue structure...")
    all_blocks = list(get_smart_blocks(segments))
    print(f"   Total Segments: {len(segments)}")
    print(f"   Smart Blocks: {len(all_blocks)}")

    progress = load_progress()
    start_idx = progress["last_block_idx"] + 1
    last_audio_ptr = progress["last_audio_ptr"]
    current_chunk_audio = AudioSegment.empty()

    print(f"🚀 Processing from block {start_idx}...")
    for b_idx in range(start_idx, len(all_blocks)):
        block = all_blocks[b_idx]
        target_end_ms = int(block['end_time'] * 1000)
        start_ms = max(last_audio_ptr, 0)

        # First play the original English up to the end of this block.
        if target_end_ms > start_ms:
            eng_audio = original_audio[start_ms:target_end_ms]
            current_chunk_audio += eng_audio
            last_audio_ptr = target_end_ms

        # Then append the Chinese TTS clips for the whole block.
        cn_audio_combined = AudioSegment.empty()
        has_cn = False
        for seg in block['segments']:
            orig_idx = seg['original_index']
            speaker = seg.get('speaker', 'UNKNOWN')
            filename = f"{orig_idx:04d}_{speaker}.wav"
            tts_path = os.path.join(AUDIO_DIR, filename)
            if os.path.exists(tts_path):
                part_audio = AudioSegment.from_wav(tts_path)
                cn_audio_combined += part_audio
                cn_audio_combined += AudioSegment.silent(duration=80)
                has_cn = True

        if has_cn:
            current_chunk_audio += AudioSegment.silent(duration=200)
            current_chunk_audio += cn_audio_combined
            current_chunk_audio += AudioSegment.silent(duration=400)

        if b_idx % 20 == 0:
            print(f"Processing Block {b_idx}/{len(all_blocks)}...")

        # Flush a chunk to disk every CHUNK_SIZE blocks so the run can resume.
        if (b_idx + 1) % CHUNK_SIZE == 0 or (b_idx == len(all_blocks) - 1):
            chunk_filename = os.path.join(TEMP_DIR, f"smart_chunk_{b_idx}.wav")
            current_chunk_audio.export(chunk_filename, format="wav")
            save_progress(b_idx, last_audio_ptr, chunk_filename)
            current_chunk_audio = AudioSegment.empty()

    print("✨ Merging chunks...")
    progress = load_progress()
    chunk_files = progress["chunks"]
    concat_list_path = os.path.join(TEMP_DIR, "concat_list.txt")
    with open(concat_list_path, 'w') as f:
        for chunk in chunk_files:
            abs_path = os.path.abspath(chunk)
            f.write(f"file '{abs_path}'\n")

    # Append whatever original audio remains after the last block.
    if last_audio_ptr < len(original_audio):
        outro_path = os.path.join(TEMP_DIR, "outro.wav")
        original_audio[last_audio_ptr:].export(outro_path, format="wav")
        with open(concat_list_path, 'a') as f:
            f.write(f"file '{os.path.abspath(outro_path)}'\n")

    print("RUNNING FFmpeg concat...")
    subprocess.run([
        "ffmpeg", "-y", "-f", "concat", "-safe", "0",
        "-i", concat_list_path,
        "-c", "copy", "temp_merged_smart.wav"
    ])

    print("Converting to MP3...")
    subprocess.run([
        "ffmpeg", "-y", "-i", "temp_merged_smart.wav",
        "-b:a", "192k", FINAL_OUTPUT
    ])

    print("🧹 Cleaning up temporary files...")
    if os.path.exists("temp_merged_smart.wav"):
        os.remove("temp_merged_smart.wav")
    if os.path.exists(TEMP_DIR):
        try:
            shutil.rmtree(TEMP_DIR)
            print(f"✅ Removed {TEMP_DIR}")
        except Exception as e:
            print(f"⚠️ Could not remove {TEMP_DIR}: {e}")
    print(f"🎉 All Done! Output: {FINAL_OUTPUT}")

if __name__ == "__main__":
    main()
```
Once it finishes, the result is a five-and-a-half-hour audio file; a 5-minute excerpt is embedded below as a preview.
(Embedded audio preview.)
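Cutting such a preview from the final mp3 only takes a couple of lines; a minimal pydub sketch (my own addition, assuming the output file name from the script above):

```python
# Export the first 5 minutes of the final mix as a preview clip (re-encodes the mp3).
from pydub import AudioSegment

full = AudioSegment.from_mp3("podcast_smart_interleaved.mp3")
full[: 5 * 60 * 1000].export("preview_5min.mp3", format="mp3", bitrate="192k")
```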