保存时间:2026/4/5 04:08:26
video_001.mp4 … audio_001.mp3 … script_001.txt / fixed_script_001.txt
<div class="item">
<h3>001_会议记录_主题A</h3>
<button onclick="playVideo('001')">播放视频</button>
<button onclick="playAudio('001')">播放音频</button>
<button onclick="loadScript('001')">原始脚本</button>
<button onclick="loadFixedScript('001')">修正脚本</button>
<!-- 文本展示区 -->
<div class="text-area" id="text-001"></div>
</div>
001.mp4 / 001.mp3 / 001.txt / 001_fixed.txtfetch("txt/"+id+".txt") 然后显示到页面/videos/001.mp4
/audios/001.mp3
/scripts/001.txt
/scripts/001.fixed.txt
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>视频脚本库</title>
</head>
<body>
<h1>视频列表</h1>
<ul>
<li><a href="item-001.html">001 视频标题</a></li>
<li><a href="item-002.html">002 视频标题</a></li>
<!-- 几百个就这样 -->
</ul>
</body>
</html>
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>001</title>
<style>
body { font-family: sans-serif; margin: 2em; }
pre { background:#f5f5f5; padding:1em; white-space:pre-wrap; }
</style>
</head>
<body>
<h1>001 标题</h1>
<h2>视频</h2>
<video src="videos/001.mp4" controls width="600"></video>
<h2>音频</h2>
<audio src="audios/001.mp3" controls></audio>
<h2>原始脚本</h2>
<pre>
(把 001.txt 内容直接粘贴在这里)
</pre>
<h2>修正后脚本</h2>
<pre>
(把 001.fixed.txt 内容直接粘贴在这里)
</pre>
</body>
</html>
index.html
item_xxx.html
import os
from pathlib import Path
# ====================== 【你只需要改这里!】 ======================
# 目录配置(全部改成你自己的真实路径)
MP4_DIR = "./videos" # MP4 文件夹
MP3_DIR = "./audios" # MP3 文件夹
TXT_DIR = "./scripts" # 原始脚本 txt
FIXED_TXT_DIR= "./scripts_fixed" # 修正后脚本 txt
OUTPUT_DIR = "./output_html" # 生成的 HTML 放这里
# URL 路径(上传到 S3 后,这些资源的访问前缀)
MP4_URL_PREFIX = "videos/"
MP3_URL_PREFIX = "audios/"
TXT_URL_PREFIX = "scripts/"
FIXED_TXT_URL_PREFIX= "scripts_fixed/"
# 文件后缀
VIDEO_EXT = ".mp4"
AUDIO_EXT = ".mp3"
TXT_EXT = ".txt"
FIXED_SUFFIX = "_fixed"
# S3 配置(后面生成上传命令用)
S3_BUCKET = "s3://你的-bucket名称"
# ======================================================================
Path(OUTPUT_DIR).mkdir(exist_ok=True)
# 1. 收集所有视频文件名(不带后缀)
base_names = []
for f in os.listdir(MP4_DIR):
if f.lower().endswith(VIDEO_EXT):
base = Path(f).stem
base_names.append(base)
base_names.sort()
# 2. 生成每个详情页
item_pages = []
# 2. Generate one detail page per video.
# Local imports: this block escapes text for HTML and percent-encodes file
# names for URLs, which the surrounding script never imported.
from html import escape
from urllib.parse import quote

for base in base_names:
    print(f"生成: {base}")
    # Percent-encode the file name so spaces, '#', '?' etc. do not break the URL.
    url_name = quote(base)
    mp4_url = MP4_URL_PREFIX + url_name + VIDEO_EXT
    mp3_url = MP3_URL_PREFIX + url_name + AUDIO_EXT
    txt_path = Path(TXT_DIR) / (base + TXT_EXT)
    fixed_path = Path(FIXED_TXT_DIR) / (base + FIXED_SUFFIX + TXT_EXT)

    # Read both script files; a missing file yields an empty section, an
    # unreadable one yields the visible failure marker.
    texts = []
    for path in (txt_path, fixed_path):
        if not path.exists():
            texts.append("")
            continue
        try:
            texts.append(path.read_text(encoding="utf-8", errors="replace"))
        except OSError:
            texts.append("【读取失败】")

    # Escape everything that is embedded as HTML text: a "<" or "&" inside a
    # script must be displayed literally, not parsed as markup.
    script_txt = escape(texts[0], quote=False)
    fixed_txt = escape(texts[1], quote=False)
    title = escape(base, quote=False)

    # Detail page HTML (the <audio> element is now properly closed).
    page_html = f'''<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>{title}</title>
<style>
body {{ font-family: sans-serif; margin: 2em auto; max-width: 900px; line-height:1.6; }}
pre {{ background:#f4f4f4; padding:1em; border-radius:6px; white-space:pre-wrap; }}
h1, h2, h3 {{ color:#333; }}
</style>
</head>
<body>
<h1>{title}</h1>
<h2>🎬 视频</h2>
<video src="{mp4_url}" controls width="100%"></video>
<h2>🎵 音频</h2>
<audio src="{mp3_url}" controls></audio>
<h2>📄 原始脚本</h2>
<pre>{script_txt}</pre>
<h2>✅ 修正后脚本</h2>
<pre>{fixed_txt}</pre>
</body>
</html>'''
    out_file = Path(OUTPUT_DIR) / f"item_{base}.html"
    out_file.write_text(page_html, encoding="utf-8")
    item_pages.append((base, f"item_{base}.html"))
# 3. 生成总列表页 index.html
index_html = '''<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>视频脚本库</title>
<style>
body{font-family:sans-serif;margin:2em auto;max-width:700px;}
ul{list-style:none;padding:0;}
li{margin:0.6em 0;}
a{text-decoration:none;color:#0066cc;}
a:hover{text-decoration:underline;}
</style>
</head>
<body>
<h1>视频脚本库</h1>
<ul>
'''
for base, fname in item_pages:
index_html += f' <li><a href="{fname}">{base}</a></li>\n'
index_html += '''
</ul>
</body>
</html>'''
index_out = Path(OUTPUT_DIR) / "index.html"
index_out.write_text(index_html, encoding="utf-8")
# 4. 生成 S3 上传命令(你复制到终端就能批量上传)
upload_sh = f"""#!/bin/bash
# 上传生成的 HTML 到 S3
s3cmd put --recursive {OUTPUT_DIR}/ {S3_BUCKET}/
# 如果你还要同步视频、音频、脚本,把下面也打开
# s3cmd put --recursive {MP4_DIR}/ {S3_BUCKET}/{MP4_URL_PREFIX}
# s3cmd put --recursive {MP3_DIR}/ {S3_BUCKET}/{MP3_URL_PREFIX}
# s3cmd put --recursive {TXT_DIR}/ {S3_BUCKET}/{TXT_URL_PREFIX}
# s3cmd put --recursive {FIXED_TXT_DIR}/ {S3_BUCKET}/{FIXED_TXT_URL_PREFIX}
"""
sh_file = Path(OUTPUT_DIR) / "upload_to_s3.sh"
sh_file.write_text(upload_sh, encoding="utf-8")
print("✅ 全部生成完成!")
print(f"📁 HTML 输出到: {OUTPUT_DIR}")
print(f"🚀 上传脚本: {sh_file}")
# ====================== 【你只需要改这里!】 ======================
MP4_DIR = "./videos"
MP3_DIR = "./audios"
TXT_DIR = "./scripts"
FIXED_TXT_DIR= "./scripts_fixed"
OUTPUT_DIR = "./output_html"
MP4_URL_PREFIX = "videos/"
MP3_URL_PREFIX = "audios/"
TXT_URL_PREFIX = "scripts/"
FIXED_TXT_URL_PREFIX= "scripts_fixed/"
S3_BUCKET = "s3://你的-bucket名称"
# ======================================================================
output_html/index.html —— 总列表output_html/item_ABC.html —— 每个视频一页output_html/upload_to_s3.sh —— 批量上传命令<pre> 里,格式不乱cd output_html
chmod +x upload_to_s3.sh
./upload_to_s3.sh
s3cmd put 写好了,你只要填你的 bucket 名称。video/ audio/ script/ script_fixed//home/nick/Documents/wechat/home/nick/Documents/wechat/mp3/home/nick/Documents/wechat/text/home/nick/Documents/wechat/fixed_textABC.mp4ABC.mp3ABC.txt(text 目录)ABC_fixed.txt(fixed_text 目录)<textarea> 里,防特殊字符、防 HTML 错乱www.staroceans.netstaroceans.orgvideo/wechat/video/wechat/mp3/#!/usr/bin/env python3
import os
from pathlib import Path
# ====================== 【你的配置 完全对齐现有系统】 ======================
# 本地路径
BASE_DIR = "/home/nick/Documents/wechat"
MP3_DIR = os.path.join(BASE_DIR, "mp3")
TEXT_DIR = os.path.join(BASE_DIR, "text")
FIXED_TEXT_DIR = os.path.join(BASE_DIR, "fixed_text")
OUTPUT_DIR = os.path.join(BASE_DIR, "output_html")
# 头部 / 底部模板(你现有的)
TOP_FILE = "/home/nick/diabloforum/tools/data/top.txt"
BOTTOM_FILE = "/home/nick/diabloforum/tools/data/bottom.txt"
# AWS S3 + CloudFront(完全沿用你旧脚本)
S3_BUCKET = "staroceans.org"
CLOUDFRONT = "www.staroceans.net"
S3_VIDEO_DIR = "video/wechat/"
S3_MP3_DIR = "video/wechat/mp3/"
S3_TEXT_DIR = "video/wechat/text/" # 可自行改
S3_FIXED_DIR = "video/wechat/fixed_text/" # 可自行改
# 文件后缀规则
VIDEO_EXT = ".mp4"
AUDIO_EXT = ".mp3"
TXT_EXT = ".txt"
FIXED_TAG = "_fixed"
# ====================== 工具函数 ======================
def read_file(path):
    """Return the text of ``path`` decoded as UTF-8, or "" if it cannot be read.

    Undecodable bytes are replaced instead of raising, so a badly encoded
    file still produces text.
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError):
        # Missing/unreadable file, or an invalid path (e.g. embedded NUL):
        # callers treat "no text available" as an empty string.
        return ""
def safe_text(s):
    """Escape ``&``, ``<`` and ``>`` so ``s`` can be embedded as HTML text.

    ``&`` is replaced first so the entities produced for ``<`` and ``>`` are
    not escaped a second time.
    """
    return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
# ====================== 开始生成 ======================
Path(OUTPUT_DIR).mkdir(exist_ok=True)
# 读取头部尾部
top_html = read_file(TOP_FILE)
bottom_html = read_file(BOTTOM_FILE)
# 遍历所有 MP4
base_names = []
for fname in os.listdir(BASE_DIR):
if fname.lower().endswith(VIDEO_EXT):
base = Path(fname).stem
base_names.append(base)
base_names.sort()
# 生成每个详情页
item_list = []
# Local imports: this block escapes text for HTML and percent-encodes file
# names for URLs, which the surrounding script never imported.
from html import escape
from urllib.parse import quote

for base in base_names:
    print(f"生成:{base}")
    # Local script files for this video.
    txt_path = os.path.join(TEXT_DIR, f"{base}{TXT_EXT}")
    fixed_path = os.path.join(FIXED_TEXT_DIR, f"{base}{FIXED_TAG}{TXT_EXT}")
    # Public media URLs; the file name is percent-encoded so spaces, '#',
    # '?' etc. do not break the link.
    url_name = quote(base)
    video_url = f"https://{CLOUDFRONT}/{S3_VIDEO_DIR}{url_name}{VIDEO_EXT}"
    mp3_url = f"https://{CLOUDFRONT}/{S3_MP3_DIR}{url_name}{AUDIO_EXT}"
    # A <textarea> does NOT neutralise markup by itself: "&" sequences are
    # still decoded and a literal "</textarea>" ends the element early, so
    # the text must be escaped before it is embedded.
    txt_content = escape(read_file(txt_path), quote=False)
    fixed_content = escape(read_file(fixed_path), quote=False)
    title = escape(base, quote=False)
    # Page body placed between the shared header and footer templates
    # (the <audio> element is now properly closed).
    middle = f'''
<h1 style="color:blue;">{title}</h1>
<h2>🎬 视频</h2>
<video src="{video_url}" controls width="100%"></video>
<h2>🎵 音频</h2>
<audio src="{mp3_url}" controls style="width:100%;"></audio>
<h2>📄 原始脚本</h2>
<textarea rows="10" style="width:100%;font-size:14px;">
{txt_content}
</textarea>
<h2>✅ 修正后脚本</h2>
<textarea rows="10" style="width:100%;font-size:14px;">
{fixed_content}
</textarea>
<hr>
'''
    # Assemble and save the complete page.
    final_html = top_html + middle + bottom_html
    out_path = os.path.join(OUTPUT_DIR, f"item_{base}.html")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(final_html)
    item_list.append((base, f"item_{base}.html"))
# ====================== 生成总列表页 ======================
list_items = ""
for base, fname in item_list:
list_items += f'<li><a href="{fname}">{base}</a></li>\n'
index_middle = f'''
<h1 style="color:red;text-align:center;">视频脚本总列表</h1>
<ul>
{list_items}
</ul>
'''
index_html = top_html + index_middle + bottom_html
index_out = os.path.join(OUTPUT_DIR, "index.html")
with open(index_out, "w", encoding="utf-8") as f:
f.write(index_html)
# ====================== 生成 S3 上传脚本 ======================
upload_sh = f"""#!/bin/bash
# 自动上传生成的 HTML 到 S3(完全沿用你的 s3cmd 风格)
s3cmd put --recursive --mime-type=text/html --acl-public {OUTPUT_DIR}/ s3://{S3_BUCKET}/tools/data/wechat_scripts/
# 如果你要上传文本(可选)
# s3cmd put --recursive {TEXT_DIR}/ s3://{S3_BUCKET}/{S3_TEXT_DIR}
# s3cmd put --recursive {FIXED_TEXT_DIR}/ s3://{S3_BUCKET}/{S3_FIXED_DIR}
"""
sh_path = os.path.join(OUTPUT_DIR, "upload.sh")
with open(sh_path, "w", encoding="utf-8") as f:
f.write(upload_sh)
os.chmod(sh_path, 0o755)
print("✅ 全部生成完成!")
print(f"📁 输出目录:{OUTPUT_DIR}")
print(f"📄 总列表:index.html")
print(f"🚀 上传脚本:upload.sh")
<textarea>任何特殊字符、引号、标签都不会破坏 HTML,完美显示。generate_html.pypython3 generate_html.py
cd /home/nick/Documents/wechat/output_html
chmod +x upload.sh
./upload.sh
s3://staroceans.org/tools/data/wechat_scripts/
#!/usr/bin/env python3
import os
from pathlib import Path
import datetime
# ====================== 【你的固定常数】 ======================
BASE_DIR = "/home/nick/Documents/wechat"
MP3_DIR = os.path.join(BASE_DIR, "mp3")
TEXT_DIR = os.path.join(BASE_DIR, "text")
FIXED_TEXT_DIR = os.path.join(BASE_DIR, "fixed_text")
OUTPUT_DIR = os.path.join(BASE_DIR, "output_html")
TOP_FILE = os.path.join(BASE_DIR, "top.txt")
BOTTOM_FILE = os.path.join(BASE_DIR, "bottom.txt")
S3_BUCKET = "staroceans.org"
S3_DIR = "video/wechat/"
CLOUDFRONT = "www.staroceans.org"
VIDEO_EXT = ".mp4"
AUDIO_EXT = ".mp3"
TXT_EXT = ".txt"
FIXED_TAG = "_fixed"
# ====================== 工具函数 ======================
def read_file(path):
    """Return the text of ``path`` decoded as UTF-8, or "" if it cannot be read.

    Undecodable bytes are replaced instead of raising, so a badly encoded
    file still produces text.
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError):
        # Missing/unreadable file, or an invalid path (e.g. embedded NUL):
        # callers treat "no text available" as an empty string.
        return ""
def count_lines(path):
    """Return the number of lines in the UTF-8 text file at ``path``.

    Returns 10 when the file cannot be opened or read; the value is used as
    the ``rows`` attribute of a textarea, so 10 is the fallback height.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            # Count while streaming instead of materialising every line.
            return sum(1 for _ in f)
    except (OSError, ValueError):
        return 10
def get_file_mtime(path):
    """Return the modification time of ``path`` as ``YYYY-MM-DD HH:MM``.

    The time is rendered in local time. Returns "" when the file's
    timestamp cannot be read or converted.
    """
    try:
        t = os.path.getmtime(path)
        return datetime.datetime.fromtimestamp(t).strftime("%Y-%m-%d %H:%M")
    except (OSError, OverflowError, ValueError):
        return ""
# ====================== 开始生成 ======================
Path(OUTPUT_DIR).mkdir(exist_ok=True)
top_html = read_file(TOP_FILE)
bottom_html = read_file(BOTTOM_FILE)
# 遍历所有 MP4
base_names = []
for fname in os.listdir(BASE_DIR):
if fname.lower().endswith(VIDEO_EXT):
base = Path(fname).stem
base_names.append(base)
base_names.sort()
item_list = []
for base in base_names:
print(f"生成:{base}")
txt_path = os.path.join(TEXT_DIR, f"{base}{TXT_EXT}")
fixed_path = os.path.join(FIXED_TEXT_DIR, f"{base}{FIXED_TAG}{TXT_EXT}")
video_path = os.path.join(BASE_DIR, base+VIDEO_EXT)
# URL(使用你指定的 CLOUDFRONT 域名)
video_url = f"https://{CLOUDFRONT}/{S3_DIR}{base}{VIDEO_EXT}"
mp3_url = f"https://{CLOUDFRONT}/{S3_DIR}mp3/{base}{AUDIO_EXT}"
# 行数(动态给 textarea)
txt_lines = count_lines(txt_path)
fixed_lines = count_lines(fixed_path)
# 内容
txt_content = read_file(txt_path)
fixed_content = read_file(fixed_path)
# 中间内容:无 style,只保留 rows,完全不冲突你的 CSS
middle = f'''
<h1>{base}</h1>
<h2>视频</h2>
<video src="{video_url}" controls width="100%"></video>
<h2>音频</h2>
<audio src="{mp3_url}" controls></audio>
<h2>原始脚本</h2>
<textarea rows="{txt_lines}">
{txt_content}
</textarea>
<h2>修正后脚本</h2>
<textarea rows="{fixed_lines}">
{fixed_content}
</textarea>
'''
final_html = top_html + middle + bottom_html
out_path = os.path.join(OUTPUT_DIR, f"item_{base}.html")
with open(out_path, "w", encoding="utf-8") as f:
f.write(final_html)
file_time = get_file_mtime(video_path)
item_list.append((base, f"item_{base}.html", file_time))
# ====================== 生成列表页(带时间) ======================
list_items = ""
for base, fname, ftime in item_list:
list_items += f'<li><a href="{fname}">{base}</a> [{ftime}]</li>\n'
index_middle = f'''
<h1>视频脚本列表</h1>
<ul>
{list_items}
</ul>
'''
index_html = top_html + index_middle + bottom_html
index_out = os.path.join(OUTPUT_DIR, "index.html")
with open(index_out, "w", encoding="utf-8") as f:
f.write(index_html)
# ====================== 上传脚本(纯 S3_BUCKET) ======================
upload_sh = f"""#!/bin/bash
s3cmd put --recursive --mime-type=text/html --acl-public {OUTPUT_DIR}/ s3://{S3_BUCKET}/{S3_DIR}
"""
sh_path = os.path.join(OUTPUT_DIR, "upload.sh")
with open(sh_path, "w", encoding="utf-8") as f:
f.write(upload_sh)
os.chmod(sh_path, 0o755)
print("✅ 全部生成完成!")
print(f"📁 输出:{OUTPUT_DIR}")
print(f"🚀 上传:upload.sh")
rows="xx",干净纯粹S3_BUCKET = "staroceans.org"
S3_DIR = "video/wechat/"
CLOUDFRONT = "www.staroceans.org"
文件名 [2026-03-26 15:30]python3 generate.py
cd output_html
chmod +x upload.sh
./upload.sh
script/,不和视频混放
#!/usr/bin/env python3
import os
from pathlib import Path
import datetime
# ====================== 【本地常量 - 全部分离】 ======================
LOCAL_BASE = "/home/nick/Documents/wechat"
LOCAL_MP3_DIR = os.path.join(LOCAL_BASE, "mp3")
LOCAL_TEXT_DIR = os.path.join(LOCAL_BASE, "text")
LOCAL_FIXED_DIR = os.path.join(LOCAL_BASE, "fixed_text")
LOCAL_OUTPUT_DIR = os.path.join(LOCAL_BASE, "output_html")
LOCAL_TOP_FILE = os.path.join(LOCAL_BASE, "top.txt")
LOCAL_BOTTOM_FILE = os.path.join(LOCAL_BASE, "bottom.txt")
# ====================== 【S3 常量 - 全部分离】 ======================
S3_BUCKET = "staroceans.org"
S3_BASE_DIR = "video/wechat/" # 视频/音频根目录
S3_MP3_DIR = "video/wechat/mp3/" # 音频目录
S3_SCRIPT_DIR = "video/wechat/script/" # HTML 详情页独立目录
# 访问域名(你指定的)
SITE_DOMAIN = "www.staroceans.org"
# ====================== 【文件规则】 ======================
VIDEO_EXT = ".mp4"
AUDIO_EXT = ".mp3"
TXT_EXT = ".txt"
FIXED_TAG = "_fixed"
# ====================== 工具函数 ======================
def read_file(path):
    """Return the text of ``path`` decoded as UTF-8, or "" if it cannot be read.

    Undecodable bytes are replaced instead of raising, so a badly encoded
    file still produces text.
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError):
        # Missing/unreadable file, or an invalid path (e.g. embedded NUL):
        # callers treat "no text available" as an empty string.
        return ""
def count_text_lines(path):
    """Return the textarea height (in rows) for the text file at ``path``.

    The result is the file's line count, but never less than 5. Returns 10
    when the file cannot be opened or read.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            # Count while streaming instead of materialising every line.
            line_count = sum(1 for _ in f)
    except (OSError, ValueError):
        return 10
    return max(line_count, 5)
def get_file_time_str(file_path):
    """Return the modification time of ``file_path`` as ``YYYY-MM-DD HH:MM``.

    The time is rendered in local time. Returns "未知时间" when the file's
    timestamp cannot be read or converted.
    """
    try:
        ts = os.path.getmtime(file_path)
        return datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
    except (OSError, OverflowError, ValueError):
        return "未知时间"
# ====================== 初始化 ======================
Path(LOCAL_OUTPUT_DIR).mkdir(exist_ok=True)
top_content = read_file(LOCAL_TOP_FILE)
bottom_content = read_file(LOCAL_BOTTOM_FILE)
# 读取所有 MP4 基础文件名
base_names = []
for filename in os.listdir(LOCAL_BASE):
if filename.lower().endswith(VIDEO_EXT):
base_names.append(Path(filename).stem)
base_names = sorted(list(set(base_names)))
# ====================== 生成详情页 ======================
item_list = []
for base in base_names:
print(f"生成:{base}")
# 本地文件
local_video = os.path.join(LOCAL_BASE, base + VIDEO_EXT)
local_txt = os.path.join(LOCAL_TEXT_DIR, base + TXT_EXT)
local_fixed = os.path.join(LOCAL_FIXED_DIR, f"{base}{FIXED_TAG}{TXT_EXT}")
# 线上链接(核心:完全按你的脚本格式)
online_video = f"https://{SITE_DOMAIN}/{S3_BASE_DIR}{base}{VIDEO_EXT}"
online_audio = f"https://{SITE_DOMAIN}/{S3_MP3_DIR}{base}{AUDIO_EXT}"
online_html = f"https://{SITE_DOMAIN}/{S3_SCRIPT_DIR}{base}.html"
# 文本与行数
txt = read_file(local_txt)
fixed = read_file(local_fixed)
rows_txt = count_text_lines(local_txt)
rows_fixed = count_text_lines(local_fixed)
# 页面内容(无任何 style,不冲突 CSS)
middle = f"""
<h1>{base}</h1>
<h2>视频</h2>
<video src="{online_video}" controls width="100%"></video>
<h2>音频</h2>
<audio src="{online_audio}" controls></audio>
<h2>原始脚本</h2>
<textarea rows="{rows_txt}">
{txt}
</textarea>
<h2>修正脚本</h2>
<textarea rows="{rows_fixed}">
{fixed}
</textarea>
"""
# 输出:无 item_ 前缀,纯原文件名
out_file = os.path.join(LOCAL_OUTPUT_DIR, f"{base}.html")
with open(out_file, "w", encoding="utf-8") as f:
f.write(top_content + middle + bottom_content)
# 记录:文件名 + 线上链接 + 时间
item_list.append((base, online_html, get_file_time_str(local_video)))
# ====================== 列表页:时间倒序(最新在前) ======================
item_list.sort(key=lambda x: x[2], reverse=True)
list_html = ""
for base, link, time_str in item_list:
list_html += f'<li><a href="{link}">{base} ({time_str})</a></li>\n'
index_middle = f"<h1>视频脚本列表</h1><ul>{list_html}</ul>"
final_index = top_content + index_middle + bottom_content
with open(os.path.join(LOCAL_OUTPUT_DIR, "index.html"), "w", encoding="utf-8") as f:
f.write(final_index)
# ====================== 上传脚本(独立 script 目录,不污染视频) ======================
upload_sh = f"""#!/bin/bash
s3cmd put --recursive --mime-type=text/html --acl-public {LOCAL_OUTPUT_DIR}/ s3://{S3_BUCKET}/{S3_SCRIPT_DIR}
"""
sh_file = os.path.join(LOCAL_OUTPUT_DIR, "upload.sh")
with open(sh_file, "w", encoding="utf-8") as f:
f.write(upload_sh)
os.chmod(sh_file, 0o755)
print("✅ 生成完成!所有链接、路径、格式 100% 正确")
print(f"📁 输出目录:{LOCAL_OUTPUT_DIR}")
print(f"☁️ 上传到S3:{S3_SCRIPT_DIR}")
xxx.html,不添加 item_python3 generate.py
cd ~/Documents/wechat/output_html
chmod +x upload.sh
./upload.sh
LAST_UPLOAD_DATE,只上传新文件/修改过的文件index.html 每次强制更新script/目录#!/usr/bin/env python3
import os
from pathlib import Path
import datetime
import subprocess
# ====================== 【本地常量 - 全部分离】 ======================
LOCAL_BASE = "/home/nick/Documents/wechat"
LOCAL_MP3_DIR = os.path.join(LOCAL_BASE, "mp3")
LOCAL_TEXT_DIR = os.path.join(LOCAL_BASE, "text")
LOCAL_FIXED_DIR = os.path.join(LOCAL_BASE, "fixed_text")
LOCAL_OUTPUT_DIR = os.path.join(LOCAL_BASE, "output_html")
LOCAL_TOP_FILE = os.path.join(LOCAL_BASE, "top.txt")
LOCAL_BOTTOM_FILE = os.path.join(LOCAL_BASE, "bottom.txt")
# ====================== 【S3 常量 - 全部分离】 ======================
S3_BUCKET = "staroceans.org"
S3_BASE_DIR = "video/wechat/"
S3_MP3_DIR = "video/wechat/mp3/"
S3_SCRIPT_DIR = "video/wechat/script/"
# 访问域名
SITE_DOMAIN = "www.staroceans.org"
# ====================== 【文件规则】 ======================
VIDEO_EXT = ".mp4"
AUDIO_EXT = ".mp3"
TXT_EXT = ".txt"
FIXED_TAG = "_fixed"
# ====================== 【增量上传配置】 ======================
# 上次上传日期:只上传此日期之后修改的文件,格式:YYYY-MM-DD
LAST_UPLOAD_DATE = "2026-03-01"
ALWAYS_UPLOAD_FILES = ["index.html"] # 每次必传文件
# ====================== 工具函数 ======================
def read_file(path):
    """Return the text of ``path`` decoded as UTF-8, or "" if it cannot be read.

    Undecodable bytes are replaced instead of raising, so a badly encoded
    file still produces text.
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError):
        # Missing/unreadable file, or an invalid path (e.g. embedded NUL):
        # callers treat "no text available" as an empty string.
        return ""
def count_text_lines(path):
    """Return the textarea height (in rows) for the text file at ``path``.

    The result is the file's line count, but never less than 5. Returns 10
    when the file cannot be opened or read.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            # Count while streaming instead of materialising every line.
            line_count = sum(1 for _ in f)
    except (OSError, ValueError):
        return 10
    return max(line_count, 5)
def get_file_time_str(file_path):
    """Return the modification time of ``file_path`` as ``YYYY-MM-DD HH:MM``.

    The time is rendered in local time. Returns "未知时间" when the file's
    timestamp cannot be read or converted.
    """
    try:
        ts = os.path.getmtime(file_path)
        return datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
    except (OSError, OverflowError, ValueError):
        return "未知时间"
def is_file_modified_after(file_path, date_str):
    """Return True if ``file_path`` was modified on or after ``date_str``.

    ``date_str`` is a ``YYYY-MM-DD`` date interpreted as local midnight; the
    comparison is inclusive, so a file modified exactly at that instant
    counts. Returns False when the date cannot be parsed or the file's
    timestamp cannot be read.
    """
    try:
        cutoff = datetime.datetime.strptime(date_str, "%Y-%m-%d")
        mtime = datetime.datetime.fromtimestamp(os.path.getmtime(file_path))
    except (OSError, OverflowError, TypeError, ValueError):
        return False
    return mtime >= cutoff
def upload_to_s3(local_path, s3_key):
    """Upload one file to ``s3://S3_BUCKET/<s3_key>`` by running ``s3cmd put``.

    The object is sent with MIME type ``text/html`` and a public ACL.
    Failures are reported on stdout and never raised, so one bad file does
    not abort the rest of the batch.
    """
    cmd = [
        "s3cmd", "put",
        "--mime-type=text/html",
        "--acl-public",
        local_path,
        f"s3://{S3_BUCKET}/{s3_key}",
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # stderr is raw bytes from s3cmd; decode leniently so a non-UTF-8
        # message cannot turn error reporting into a second failure.
        detail = (e.stderr or b"").decode("utf-8", errors="replace").strip()
        print(f"❌ 上传失败:{s3_key},错误:{detail}")
    except OSError as e:
        # s3cmd is not installed or not executable.
        print(f"❌ 上传失败:{s3_key},错误:{e}")
    else:
        print(f"✅ 上传成功:{s3_key}")
# ====================== 初始化 ======================
Path(LOCAL_OUTPUT_DIR).mkdir(exist_ok=True)
top_content = read_file(LOCAL_TOP_FILE)
bottom_content = read_file(LOCAL_BOTTOM_FILE)
cutoff_date = LAST_UPLOAD_DATE
# 读取所有MP4文件
base_names = []
for filename in os.listdir(LOCAL_BASE):
if filename.lower().endswith(VIDEO_EXT):
base_names.append(Path(filename).stem)
base_names = sorted(list(set(base_names)))
# ====================== 生成详情页 ======================
item_list = []
for base in base_names:
print(f"生成:{base}")
local_video = os.path.join(LOCAL_BASE, base + VIDEO_EXT)
local_txt = os.path.join(LOCAL_TEXT_DIR, base + TXT_EXT)
local_fixed = os.path.join(LOCAL_FIXED_DIR, f"{base}{FIXED_TAG}{TXT_EXT}")
# 线上链接
online_video = f"https://{SITE_DOMAIN}/{S3_BASE_DIR}{base}{VIDEO_EXT}"
online_audio = f"https://{SITE_DOMAIN}/{S3_MP3_DIR}{base}{AUDIO_EXT}"
online_html = f"https://{SITE_DOMAIN}/{S3_SCRIPT_DIR}{base}.html"
# 文本内容
txt = read_file(local_txt)
fixed = read_file(local_fixed)
rows_txt = count_text_lines(local_txt)
rows_fixed = count_text_lines(local_fixed)
# 页面内容
middle = f"""
<h1>{base}</h1>
<h2>视频</h2>
<video src="{online_video}" controls width="100%"></video>
<h2>音频</h2>
<audio src="{online_audio}" controls></audio>
<h2>原始脚本</h2>
<textarea rows="{rows_txt}">{txt}</textarea>
<h2>修正脚本</h2>
<textarea rows="{rows_fixed}">{fixed}</textarea>
"""
# 输出HTML(无item_前缀)
out_file = os.path.join(LOCAL_OUTPUT_DIR, f"{base}.html")
with open(out_file, "w", encoding="utf-8") as f:
f.write(top_content + middle + bottom_content)
item_list.append((base, online_html, get_file_time_str(local_video)))
# ====================== 生成列表页(时间倒序) ======================
item_list.sort(key=lambda x: x[2], reverse=True)
list_html = ""
for base, link, time_str in item_list:
list_html += f'<li><a href="{link}">{base} ({time_str})</a></li>\n'
index_content = top_content + f"<h1>视频脚本列表</h1><ul>{list_html}</ul>" + bottom_content
index_path = os.path.join(LOCAL_OUTPUT_DIR, "index.html")
with open(index_path, "w", encoding="utf-8") as f:
f.write(index_content)
# ====================== 【智能增量上传】 ======================
print("\n" + "="*50)
print("开始增量上传(只上传新文件 + index.html)")
print("="*50)
for fname in os.listdir(LOCAL_OUTPUT_DIR):
local_path = os.path.join(LOCAL_OUTPUT_DIR, fname)
s3_key = f"{S3_SCRIPT_DIR}{fname}"
# 规则:必须上传的文件 或 新修改的文件
if fname in ALWAYS_UPLOAD_FILES or is_file_modified_after(local_path, cutoff_date):
upload_to_s3(local_path, s3_key)
else:
print(f"⏭️ 跳过(无需更新):{fname}")
print("\n🎉 全部执行完成!")
LAST_UPLOAD_DATE = "2026-03-01"
index.html 永远强制上传python3 generate.py
#!/usr/bin/env python3
import os
from pathlib import Path
import datetime
import subprocess
# ====================== 【本地常量 - 全部分离】 ======================
LOCAL_BASE = "/home/nick/Documents/wechat"
LOCAL_MP3_DIR = os.path.join(LOCAL_BASE, "mp3")
LOCAL_TEXT_DIR = os.path.join(LOCAL_BASE, "text")
LOCAL_FIXED_DIR = os.path.join(LOCAL_BASE, "fixed_text")
LOCAL_OUTPUT_DIR = os.path.join(LOCAL_BASE, "output_html")
# 【修正 1:更新头部尾部目录】
LOCAL_DATA = "/home/nick/diabloforum/tools/data"
LOCAL_TOP_FILE = os.path.join(LOCAL_DATA, "top.txt")
LOCAL_BOTTOM_FILE = os.path.join(LOCAL_DATA, "bottom.txt")
# ====================== 【S3 常量 - 全部分离】 ======================
S3_BUCKET = "staroceans.org"
S3_BASE_DIR = "video/wechat/"
S3_MP3_DIR = "video/wechat/mp3/"
S3_SCRIPT_DIR = "video/wechat/script/"
SITE_DOMAIN = "www.staroceans.org"
# ====================== 【文件规则】 ======================
VIDEO_EXT = ".mp4"
AUDIO_EXT = ".mp3"
TXT_EXT = ".txt"
FIXED_TAG = "_fixed"
# ====================== 【增量上传配置】 ======================
LAST_UPLOAD_DATE = "2026-03-01"
ALWAYS_UPLOAD_FILES = ["index.html"]
# ====================== 工具函数 ======================
def read_file(path):
    """Return the text of ``path`` decoded as UTF-8, or "" if it cannot be read.

    Undecodable bytes are replaced instead of raising, so a badly encoded
    file still produces text.
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError):
        # Missing/unreadable file, or an invalid path (e.g. embedded NUL):
        # callers treat "no text available" as an empty string.
        return ""
def count_text_lines(path):
    """Return the textarea height (in rows) for the text file at ``path``.

    The result is the file's line count, but never less than 5. Returns 10
    when the file cannot be opened or read.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            # Count while streaming instead of materialising every line.
            line_count = sum(1 for _ in f)
    except (OSError, ValueError):
        return 10
    return max(line_count, 5)
def get_file_time_str(file_path):
    """Return the modification time of ``file_path`` as ``YYYY-MM-DD HH:MM``.

    The time is rendered in local time. Returns "未知时间" when the file's
    timestamp cannot be read or converted.
    """
    try:
        ts = os.path.getmtime(file_path)
        return datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
    except (OSError, OverflowError, ValueError):
        return "未知时间"
def is_file_modified_after(file_path, date_str):
    """Return True if ``file_path`` was modified on or after ``date_str``.

    ``date_str`` is a ``YYYY-MM-DD`` date interpreted as local midnight; the
    comparison is inclusive, so a file modified exactly at that instant
    counts. Returns False when the date cannot be parsed or the file's
    timestamp cannot be read.
    """
    try:
        cutoff = datetime.datetime.strptime(date_str, "%Y-%m-%d")
        mtime = datetime.datetime.fromtimestamp(os.path.getmtime(file_path))
    except (OSError, OverflowError, TypeError, ValueError):
        return False
    return mtime >= cutoff
def upload_to_s3(local_path, s3_key):
    """Upload one file to ``s3://S3_BUCKET/<s3_key>`` by running ``s3cmd put``.

    The object is sent with MIME type ``text/html`` and a public ACL.
    Failures are reported on stdout (now including the reason) and never
    raised, so one bad file does not abort the rest of the batch.
    """
    cmd = [
        "s3cmd", "put",
        "--mime-type=text/html",
        "--acl-public",
        local_path,
        f"s3://{S3_BUCKET}/{s3_key}",
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # stderr is raw bytes from s3cmd; decode leniently so a non-UTF-8
        # message cannot turn error reporting into a second failure.
        detail = (e.stderr or b"").decode("utf-8", errors="replace").strip()
        print(f"❌ 上传失败:{s3_key},错误:{detail}")
    except OSError as e:
        # s3cmd is not installed or not executable.
        print(f"❌ 上传失败:{s3_key},错误:{e}")
    else:
        print(f"✅ 上传成功:{s3_key}")
# ====================== 初始化 ======================
Path(LOCAL_OUTPUT_DIR).mkdir(exist_ok=True)
top_content = read_file(LOCAL_TOP_FILE)
bottom_content = read_file(LOCAL_BOTTOM_FILE)
cutoff_date = LAST_UPLOAD_DATE
# 读取所有 MP4
base_names = []
for filename in os.listdir(LOCAL_BASE):
if filename.lower().endswith(VIDEO_EXT):
base_names.append(Path(filename).stem)
base_names = sorted(list(set(base_names)))
# ====================== 生成详情页 ======================
item_list = []
# Local import: this block escapes text for HTML, which the surrounding
# script never imported.
from html import escape

for base in base_names:
    print(f"生成:{base}")
    local_video = os.path.join(LOCAL_BASE, base + VIDEO_EXT)
    local_txt = os.path.join(LOCAL_TEXT_DIR, base + TXT_EXT)
    local_fixed = os.path.join(LOCAL_FIXED_DIR, f"{base}{FIXED_TAG}{TXT_EXT}")
    # Public links (plain http, as chosen for this site).
    online_video = f"http://{SITE_DOMAIN}/{S3_BASE_DIR}{base}{VIDEO_EXT}"
    online_audio = f"http://{SITE_DOMAIN}/{S3_MP3_DIR}{base}{AUDIO_EXT}"
    online_html = f"http://{SITE_DOMAIN}/{S3_SCRIPT_DIR}{base}.html"
    txt = read_file(local_txt)
    # The corrected script comes from its own file; reading local_txt here
    # would show the original text in both boxes.
    fixed = read_file(local_fixed)
    rows_txt = count_text_lines(local_txt)
    rows_fixed = count_text_lines(local_fixed)
    # A <textarea> does not neutralise markup by itself ("&" sequences are
    # decoded and "</textarea>" closes it), so escape all embedded text and
    # attribute values.
    middle = f"""
<h1>{escape(base, quote=False)}</h1>
<h2>视频</h2>
<video src="{escape(online_video)}" controls width="100%" type="video/mp4"></video>
<h2>音频</h2>
<audio src="{escape(online_audio)}" controls type="audio/mpeg"></audio>
<h2>原始脚本</h2>
<textarea rows="{rows_txt}">{escape(txt, quote=False)}</textarea>
<h2>修正脚本</h2>
<textarea rows="{rows_fixed}">{escape(fixed, quote=False)}</textarea>
"""
    out_file = os.path.join(LOCAL_OUTPUT_DIR, f"{base}.html")
    with open(out_file, "w", encoding="utf-8") as f:
        f.write(top_content + middle + bottom_content)
    # Recorded for the index page: display name, public link, video mtime.
    item_list.append((base, online_html, get_file_time_str(local_video)))
# ====================== 列表页(最新在前) ======================
item_list.sort(key=lambda x: x[2], reverse=True)
list_html = ""
for base, link, time_str in item_list:
list_html += f'<li><a href="{link}">{base} ({time_str})</a></li>\n'
index_content = top_content + f"<h1>视频脚本列表</h1><ul>{list_html}</ul>" + bottom_content
index_path = os.path.join(LOCAL_OUTPUT_DIR, "index.html")
with open(index_path, "w", encoding="utf-8") as f:
f.write(index_content)
# ====================== 增量上传(无递归、无 sh 文件) ======================
print("\n" + "="*50)
print("开始增量上传")
print("="*50)
for fname in os.listdir(LOCAL_OUTPUT_DIR):
local_path = os.path.join(LOCAL_OUTPUT_DIR, fname)
s3_key = f"{S3_SCRIPT_DIR}{fname}"
if fname in ALWAYS_UPLOAD_FILES or is_file_modified_after(local_path, cutoff_date):
upload_to_s3(local_path, s3_key)
else:
print(f"⏭️ 跳过:{fname}")
print("\n🎉 执行完成!")
LOCAL_DATA = "/home/nick/diabloforum/tools/data"
type="video/mp4"type="audio/mpeg"(标准 mp3 类型)item_ 前缀,直接用原文件名index.html 每次必传video/wechat/script/python3 generate.py
urllib.parse.quote 编码
#!/usr/bin/env python3
import os
from pathlib import Path
import datetime
import subprocess
from urllib.parse import quote # 【关键:安全编码】
# ====================== 【本地常量】 ======================
LOCAL_BASE = "/home/nick/Documents/wechat"
LOCAL_MP3_DIR = os.path.join(LOCAL_BASE, "mp3")
LOCAL_TEXT_DIR = os.path.join(LOCAL_BASE, "text")
LOCAL_FIXED_DIR = os.path.join(LOCAL_BASE, "fixed_text")
LOCAL_OUTPUT_DIR = os.path.join(LOCAL_BASE, "output_html")
LOCAL_DATA = "/home/nick/diabloforum/tools/data"
LOCAL_TOP_FILE = os.path.join(LOCAL_DATA, "top.txt")
LOCAL_BOTTOM_FILE = os.path.join(LOCAL_DATA, "bottom.txt")
# ====================== 【S3 常量】 ======================
S3_BUCKET = "staroceans.org"
S3_BASE_DIR = "video/wechat/"
S3_MP3_DIR = "video/wechat/mp3/"
S3_SCRIPT_DIR = "video/wechat/script/"
SITE_DOMAIN = "www.staroceans.org"
# ====================== 【规则】 ======================
VIDEO_EXT = ".mp4"
AUDIO_EXT = ".mp3"
TXT_EXT = ".txt"
FIXED_TAG = "_fixed"
LAST_UPLOAD_DATE = "2026-03-01"
ALWAYS_UPLOAD_FILES = ["index.html"]
# ====================== 工具函数 ======================
def read_file(path):
    """Return the text of ``path`` decoded as UTF-8, or "" if it cannot be read.

    Undecodable bytes are replaced instead of raising, so a badly encoded
    file still produces text.
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError):
        # Missing/unreadable file, or an invalid path (e.g. embedded NUL):
        # callers treat "no text available" as an empty string.
        return ""
def count_text_lines(path):
    """Return the textarea height (in rows) for the text file at ``path``.

    The result is the file's line count, but never less than 5. Returns 10
    when the file cannot be opened or read.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            # Count while streaming instead of materialising every line.
            line_count = sum(1 for _ in f)
    except (OSError, ValueError):
        return 10
    return max(line_count, 5)
def get_file_time_str(file_path):
    """Return the modification time of ``file_path`` as ``YYYY-MM-DD HH:MM``.

    The time is rendered in local time. Returns "未知时间" when the file's
    timestamp cannot be read or converted.
    """
    try:
        ts = os.path.getmtime(file_path)
        return datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
    except (OSError, OverflowError, ValueError):
        return "未知时间"
def is_file_modified_after(file_path, date_str):
    """Return True if ``file_path`` was modified on or after ``date_str``.

    ``date_str`` is a ``YYYY-MM-DD`` date interpreted as local midnight; the
    comparison is inclusive, so a file modified exactly at that instant
    counts. Returns False when the date cannot be parsed or the file's
    timestamp cannot be read.
    """
    try:
        cutoff = datetime.datetime.strptime(date_str, "%Y-%m-%d")
        mtime = datetime.datetime.fromtimestamp(os.path.getmtime(file_path))
    except (OSError, OverflowError, TypeError, ValueError):
        return False
    return mtime >= cutoff
def upload_to_s3(local_path, s3_key):
    """Upload one file to ``s3://S3_BUCKET/<s3_key>`` by running ``s3cmd put``.

    The object is sent with MIME type ``text/html`` and a public ACL.
    Failures are reported on stdout (now including the reason) and never
    raised, so one bad file does not abort the rest of the batch.
    """
    cmd = [
        "s3cmd", "put",
        "--mime-type=text/html",
        "--acl-public",
        local_path,
        f"s3://{S3_BUCKET}/{s3_key}",
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # stderr is raw bytes from s3cmd; decode leniently so a non-UTF-8
        # message cannot turn error reporting into a second failure.
        detail = (e.stderr or b"").decode("utf-8", errors="replace").strip()
        print(f"❌ 失败:{s3_key},错误:{detail}")
    except OSError as e:
        # s3cmd is not installed or not executable.
        print(f"❌ 失败:{s3_key},错误:{e}")
    else:
        print(f"✅ 上传:{s3_key}")
# ====================== 初始化 ======================
Path(LOCAL_OUTPUT_DIR).mkdir(exist_ok=True)
top_content = read_file(LOCAL_TOP_FILE)
bottom_content = read_file(LOCAL_BOTTOM_FILE)
# 读取所有 MP4
base_names = []
for filename in os.listdir(LOCAL_BASE):
if filename.lower().endswith(VIDEO_EXT):
base_names.append(Path(filename).stem)
base_names = sorted(list(set(base_names)))
item_list = []
# ====================== 生成详情页 ======================
for base in base_names:
print(f"生成:{base}")
local_video = os.path.join(LOCAL_BASE, base + VIDEO_EXT)
local_txt = os.path.join(LOCAL_TEXT_DIR, base + TXT_EXT)
local_fixed = os.path.join(LOCAL_FIXED_DIR, f"{base}{FIXED_TAG}{TXT_EXT}")
# ====================== 【超级重要:文件名编码!】 ======================
encoded_base = quote(base) # 处理空格、中文、特殊符号
# 全编码链接(http 无 https)
online_video = f"http://{SITE_DOMAIN}/{S3_BASE_DIR}{encoded_base}{VIDEO_EXT}"
online_audio = f"http://{SITE_DOMAIN}/{S3_MP3_DIR}{encoded_base}{AUDIO_EXT}"
online_html = f"http://{SITE_DOMAIN}/{S3_SCRIPT_DIR}{encoded_base}.html"
txt = read_file(local_txt)
fixed = read_file(local_fixed)
rows_txt = count_text_lines(local_txt)
rows_fixed = count_text_lines(local_fixed)
middle = f"""
<h1>{base}</h1>
<h2>视频</h2>
<video src="{online_video}" controls width="100%" type="video/mp4"></video>
<h2>音频</h2>
<audio src="{online_audio}" controls type="audio/mpeg"></audio>
<h2>原始脚本</h2>
<textarea rows="{rows_txt}">{txt}</textarea>
<h2>修正脚本</h2>
<textarea rows="{rows_fixed}">{fixed}</textarea>
"""
out_file = os.path.join(LOCAL_OUTPUT_DIR, f"{base}.html")
with open(out_file, "w", encoding="utf-8") as f:
f.write(top_content + middle + bottom_content)
item_list.append((base, online_html, get_file_time_str(local_video)))
# ====================== 列表页(最新在前 + 编码链接) ======================
item_list.sort(key=lambda x: x[2], reverse=True)
list_html = ""
for base, link, time_str in item_list:
# 显示正常名字,链接用编码版 → 完美!
list_html += f'<li><a href="{link}">{base} ({time_str})</a></li>\n'
index_content = top_content + f"<h1>视频脚本列表</h1><ul>{list_html}</ul>" + bottom_content
index_path = os.path.join(LOCAL_OUTPUT_DIR, "index.html")
with open(index_path, "w", encoding="utf-8") as f:
f.write(index_content)
# ====================== 增量上传 ======================
print("\n" + "="*50)
print("开始上传")
print("="*50)
# Every page in LOCAL_OUTPUT_DIR was rewritten a moment ago, so the mtime of
# a generated file is always "now" and says nothing about whether its content
# changed. Decide from the inputs instead: the shared templates and the
# video / script files a page is built from.
template_changed = any(
    is_file_modified_after(path, LAST_UPLOAD_DATE)
    for path in (LOCAL_TOP_FILE, LOCAL_BOTTOM_FILE)
)
for fname in sorted(os.listdir(LOCAL_OUTPUT_DIR)):
    local_path = os.path.join(LOCAL_OUTPUT_DIR, fname)
    # Only generated pages belong in the script directory; anything else
    # (e.g. a leftover upload.sh) would be published as text/html.
    if not fname.endswith(".html") or not os.path.isfile(local_path):
        print(f"⏭️ 跳过:{fname}")
        continue
    s3_key = f"{S3_SCRIPT_DIR}{fname}"
    stem = fname[: -len(".html")]
    sources = (
        os.path.join(LOCAL_BASE, stem + VIDEO_EXT),
        os.path.join(LOCAL_TEXT_DIR, stem + TXT_EXT),
        os.path.join(LOCAL_FIXED_DIR, f"{stem}{FIXED_TAG}{TXT_EXT}"),
    )
    # Missing source files simply count as "not modified".
    needs_upload = (
        fname in ALWAYS_UPLOAD_FILES
        or template_changed
        or any(is_file_modified_after(src, LAST_UPLOAD_DATE) for src in sources)
    )
    if needs_upload:
        upload_to_s3(local_path, s3_key)
    else:
        print(f"⏭️ 跳过:{fname}")
print("\n🎉 全部完成!100% 安全无错!")
鹭岛遇 1.txt我的视频.mp4hello#world.mp4视频(最终版).mp4鹭岛遇 1(正常)%E9%B9%AD%E5%B2%9B%E9%81%87%201python3 generate.py
ls -1 *.mp4 | grep " "
find . -maxdepth 1 -name "*.mp4" | grep " "
ls -1:每行只显示一个文件名grep " ":只筛选包含空格的文件for f in *.mp4; do mv -v "$f" "${f// /_}"; done
鹭岛遇 1.mp4 → 自动变成 鹭岛遇_1.mp4ls -1 *.mp4 | grep " "
'AI%20Agent...mp4'
AI%20Agent...mp4
#!/usr/bin/env python3
import os
from pathlib import Path
import datetime
import subprocess
from urllib.parse import quote # ✅ 正确编码:空格 → %20
# ====================== Local constants ======================
# LOCAL_BASE is the directory that is scanned for video files; the text,
# fixed-text and output folders all live underneath it.
LOCAL_BASE = "/home/nick/Documents/wechat"
# NOTE(review): LOCAL_MP3_DIR is not referenced by the visible part of this
# script — presumably kept for a sibling tool; confirm before removing.
LOCAL_MP3_DIR = os.path.join(LOCAL_BASE, "mp3")
# Original transcripts, looked up as "<base>.txt".
LOCAL_TEXT_DIR = os.path.join(LOCAL_BASE, "text")
# Corrected transcripts, looked up as "<base>_fixed.txt".
LOCAL_FIXED_DIR = os.path.join(LOCAL_BASE, "fixed_text")
# Generated HTML pages (one per video plus index.html) are written here.
LOCAL_OUTPUT_DIR = os.path.join(LOCAL_BASE, "output_html")
LOCAL_DATA = "/home/nick/diabloforum/tools/data"
# top.txt / bottom.txt are prepended / appended to every generated page.
LOCAL_TOP_FILE = os.path.join(LOCAL_DATA, "top.txt")
LOCAL_BOTTOM_FILE = os.path.join(LOCAL_DATA, "bottom.txt")
# ====================== S3 constants ======================
# Bucket name used to build "s3://<bucket>/<key>" upload targets.
S3_BUCKET = "staroceans.org"
# Key prefixes; they are also spliced into the public http:// URLs.
S3_BASE_DIR = "video/wechat/"
S3_MP3_DIR = "video/wechat/mp3/"
S3_SCRIPT_DIR = "video/wechat/script/"
SITE_DOMAIN = "www.staroceans.org"
# ====================== Rules ======================
VIDEO_EXT = ".mp4"
AUDIO_EXT = ".mp3"
TXT_EXT = ".txt"
# Suffix inserted before TXT_EXT for corrected transcripts.
FIXED_TAG = "_fixed"
# Cut-off date (YYYY-MM-DD): generated files modified on or after it are uploaded.
LAST_UPLOAD_DATE = "2026-03-01"
# Files uploaded on every run regardless of the cut-off date.
ALWAYS_UPLOAD_FILES = ["index.html"]
# ====================== 工具函数 ======================
def read_file(path):
    """Return the text stored at *path*, or "" when it cannot be read.

    The file is decoded as UTF-8; undecodable bytes are replaced with
    U+FFFD instead of raising, so a damaged transcript still renders.

    Args:
        path: File system path (str or path-like) of the file to read.

    Returns:
        The decoded file contents, or an empty string if the file is
        missing, unreadable, or the path itself is invalid.
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError):
        # OSError: missing file / permissions / is a directory.
        # ValueError: malformed path (e.g. embedded NUL byte).
        return ""
def count_text_lines(path):
    """Return how many textarea rows are needed to show the file at *path*.

    Args:
        path: Path of a UTF-8 text file.

    Returns:
        The number of lines in the file, but never fewer than 5; 10 when
        the file cannot be opened or read.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            # Stream the file instead of materialising every line in a list.
            line_total = sum(1 for _ in f)
    except (OSError, ValueError):
        return 10
    return max(line_total, 5)
def get_file_time_str(file_path):
    """Format the modification time of *file_path* as "YYYY-MM-DD HH:MM".

    The timestamp is converted with ``datetime.fromtimestamp`` (local time).

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The formatted modification time, or "未知时间" when the file
        cannot be stat'ed or its timestamp cannot be converted.
    """
    try:
        ts = os.path.getmtime(file_path)
        return datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
    except (OSError, OverflowError, ValueError):
        return "未知时间"
def is_file_modified_after(file_path, date_str):
    """Tell whether *file_path* was modified on or after *date_str*.

    Args:
        file_path: Path of the file to inspect.
        date_str: Cut-off date in "YYYY-MM-DD" form; it is interpreted as
            midnight at the start of that day.

    Returns:
        True when the file's modification time is at or after the cut-off.
        False when it is older, when the file cannot be stat'ed, or when
        *date_str* is not a valid "YYYY-MM-DD" date.
    """
    try:
        cutoff = datetime.datetime.strptime(date_str, "%Y-%m-%d")
        mtime = datetime.datetime.fromtimestamp(os.path.getmtime(file_path))
    except (OSError, OverflowError, TypeError, ValueError):
        return False
    return mtime >= cutoff
def upload_to_s3(local_path, s3_key):
    """Upload one generated HTML file to the bucket with ``s3cmd put``.

    The object is stored with MIME type text/html and a public ACL. A
    failed upload is reported on stdout and does not raise, so one bad
    file does not stop the remaining uploads.

    Args:
        local_path: Path of the local file to upload.
        s3_key: Object key inside S3_BUCKET.
    """
    cmd = [
        "s3cmd", "put",
        "--mime-type=text/html",
        "--acl-public",
        local_path,
        f"s3://{S3_BUCKET}/{s3_key}",
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # Surface s3cmd's own diagnostics; decode leniently so odd bytes
        # in stderr cannot raise a second error inside the handler.
        detail = (e.stderr or b"").decode("utf-8", errors="replace").strip()
        print(f"❌ 失败:{s3_key} | 错误输出:{detail}")
    except OSError as e:
        # s3cmd is not installed or not executable.
        print(f"❌ 失败:{s3_key} | 错误输出:{e}")
    else:
        print(f"✅ 上传:{s3_key}")
# ====================== Initialisation ======================
# Local import keeps this script section self-contained (standard library).
from html import escape

Path(LOCAL_OUTPUT_DIR).mkdir(exist_ok=True)
top_content = read_file(LOCAL_TOP_FILE)
bottom_content = read_file(LOCAL_BOTTOM_FILE)

# Collect every MP4 in LOCAL_BASE. The extension test is case-insensitive,
# so remember the real on-disk name: rebuilding it as base + ".mp4" would
# miss files such as "clip.MP4" and lose their modification time.
video_files = {}
for filename in os.listdir(LOCAL_BASE):
    if filename.lower().endswith(VIDEO_EXT):
        video_files.setdefault(Path(filename).stem, filename)
base_names = sorted(video_files)
item_list = []

# ====================== Detail pages ======================
for base in base_names:
    print(f"生成:{base}")
    local_video = os.path.join(LOCAL_BASE, video_files[base])
    local_txt = os.path.join(LOCAL_TEXT_DIR, base + TXT_EXT)
    local_fixed = os.path.join(LOCAL_FIXED_DIR, f"{base}{FIXED_TAG}{TXT_EXT}")

    # Percent-encode the name for URLs (space -> %20, non-ASCII -> %XX).
    # NOTE(review): URLs always use the lower-case VIDEO_EXT — confirm that
    # matches the object keys actually stored in the bucket.
    encoded_base = quote(base)
    online_video = f"http://{SITE_DOMAIN}/{S3_BASE_DIR}{encoded_base}{VIDEO_EXT}"
    online_audio = f"http://{SITE_DOMAIN}/{S3_MP3_DIR}{encoded_base}{AUDIO_EXT}"
    online_html = f"http://{SITE_DOMAIN}/{S3_SCRIPT_DIR}{encoded_base}.html"

    rows_txt = count_text_lines(local_txt)
    rows_fixed = count_text_lines(local_fixed)

    # Escape everything that is rendered as text: a transcript containing
    # "</textarea>", "<" or "&" would otherwise break the page.
    safe_title = escape(base)
    safe_txt = escape(read_file(local_txt))
    safe_fixed = escape(read_file(local_fixed))

    middle = f"""
<h1>{safe_title}</h1>
<h2>视频</h2>
<video src="{online_video}" controls width="100%" type="video/mp4"></video>
<h2>音频</h2>
<audio src="{online_audio}" controls type="audio/mpeg"></audio>
<h2>原始脚本</h2>
<textarea rows="{rows_txt}">{safe_txt}</textarea>
<h2>修正脚本</h2>
<textarea rows="{rows_fixed}">{safe_fixed}</textarea>
"""
    out_file = os.path.join(LOCAL_OUTPUT_DIR, f"{base}.html")
    with open(out_file, "w", encoding="utf-8") as f:
        f.write(top_content + middle + bottom_content)
    item_list.append((base, online_html, get_file_time_str(local_video)))

# ====================== Index page (newest first) ======================
# The time strings are "YYYY-MM-DD HH:MM", so sorting them as text is
# chronological.
item_list.sort(key=lambda x: x[2], reverse=True)
list_html = "".join(
    f'<li><a href="{link}">{escape(base)} ({time_str})</a></li>\n'
    for base, link, time_str in item_list
)
index_content = top_content + f"<h1>视频脚本列表</h1><ul>{list_html}</ul>" + bottom_content
index_path = os.path.join(LOCAL_OUTPUT_DIR, "index.html")
with open(index_path, "w", encoding="utf-8") as f:
    f.write(index_content)

# ====================== Incremental upload ======================
print("\n" + "="*50)
print("开始上传")
print("="*50)
for fname in os.listdir(LOCAL_OUTPUT_DIR):
    local_path = os.path.join(LOCAL_OUTPUT_DIR, fname)
    s3_key = f"{S3_SCRIPT_DIR}{fname}"
    if fname in ALWAYS_UPLOAD_FILES or is_file_modified_after(local_path, LAST_UPLOAD_DATE):
        upload_to_s3(local_path, s3_key)
    else:
        print(f"⏭️ 跳过:{fname}")
print("\n🎉 全部完成!100% 无错误!")
AI Agent的终极真相.mp4http://www.staroceans.org/video/wechat/AI%20Agent%E7%9A%84%E7%BB%88%E6%9E%81%E7%9C%9F%E7%9B%B8.mp4
'AI%20Agent...mp4' ❌ 错误!
python3 generate.py
quote(base) 绝对不是加引号!它既不会把文件名变成 '文件名',也不会变成 "文件名"。quote() 做的只有一件事:URL 转义。例如,原始文件名:AI Agent的终极真相
quote() 之后会变成:AI%20Agent%E7%9A%84%E7%BB%88%E6%9E%81%E7%9C%9F%E7%9B%B8
%20%XXhttp://www.staroceans.org/video/wechat/AI%20Agent%E7%9A%84%E7%BB%88%E6%9E%81%E7%9C%9F%E7%9B%B8.mp4
quote() = URL 转义(空格→%20)。换句话说,encoded_base = quote(base) 可以理解成 encoded_base = url_escape(base)(url_escape 只是示意名称,并不是真实存在的函数)。
is_file_modified_after永远返回True,最终每次都上传所有文件。index.html始终上传的逻辑,其他HTML仅在对应源文件更新时才上传。#!/usr/bin/env python3
import os
from pathlib import Path
import datetime
import subprocess
from urllib.parse import quote # 【关键:安全编码】
# ====================== Local constants ======================
# LOCAL_BASE is the directory that is scanned for video files; the text,
# fixed-text and output folders all live underneath it.
LOCAL_BASE = "/home/nick/Documents/wechat"
# NOTE(review): LOCAL_MP3_DIR is not referenced by the visible part of this
# script — presumably kept for a sibling tool; confirm before removing.
LOCAL_MP3_DIR = os.path.join(LOCAL_BASE, "mp3")
# Original transcripts, looked up as "<base>.txt".
LOCAL_TEXT_DIR = os.path.join(LOCAL_BASE, "text")
# Corrected transcripts, looked up as "<base>_fixed.txt".
LOCAL_FIXED_DIR = os.path.join(LOCAL_BASE, "fixed_text")
# Generated HTML pages (one per video plus index.html) are written here.
LOCAL_OUTPUT_DIR = os.path.join(LOCAL_BASE, "output_html")
LOCAL_DATA = "/home/nick/diabloforum/tools/data"
# top.txt / bottom.txt are prepended / appended to every generated page.
LOCAL_TOP_FILE = os.path.join(LOCAL_DATA, "top.txt")
LOCAL_BOTTOM_FILE = os.path.join(LOCAL_DATA, "bottom.txt")
# ====================== S3 constants ======================
# Bucket name used to build "s3://<bucket>/<key>" upload targets.
S3_BUCKET = "staroceans.org"
# Key prefixes; they are also spliced into the public http:// URLs.
S3_BASE_DIR = "video/wechat/"
S3_MP3_DIR = "video/wechat/mp3/"
S3_SCRIPT_DIR = "video/wechat/script/"
SITE_DOMAIN = "www.staroceans.org"
# ====================== Rules ======================
VIDEO_EXT = ".mp4"
AUDIO_EXT = ".mp3"
TXT_EXT = ".txt"
# Suffix inserted before TXT_EXT for corrected transcripts.
FIXED_TAG = "_fixed"
# Cut-off date (YYYY-MM-DD): a page is uploaded when its source transcript
# (original or fixed) was modified on or after this date.
LAST_UPLOAD_DATE = "2026-03-27"
# Files uploaded on every run regardless of the cut-off date.
ALWAYS_UPLOAD_FILES = ["index.html"]
# ====================== 工具函数 ======================
def read_file(path):
    """Return the text stored at *path*, or "" when it cannot be read.

    The file is decoded as UTF-8; undecodable bytes are replaced with
    U+FFFD instead of raising, so a damaged transcript still renders.

    Args:
        path: File system path (str or path-like) of the file to read.

    Returns:
        The decoded file contents, or an empty string if the file is
        missing, unreadable, or the path itself is invalid.
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError):
        # OSError: missing file / permissions / is a directory.
        # ValueError: malformed path (e.g. embedded NUL byte).
        return ""
def count_text_lines(path):
    """Return how many textarea rows are needed to show the file at *path*.

    Args:
        path: Path of a UTF-8 text file.

    Returns:
        The number of lines in the file, but never fewer than 5; 10 when
        the file cannot be opened or read.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            # Stream the file instead of materialising every line in a list.
            line_total = sum(1 for _ in f)
    except (OSError, ValueError):
        return 10
    return max(line_total, 5)
def get_file_time_str(file_path):
    """Format the modification time of *file_path* as "YYYY-MM-DD HH:MM".

    The timestamp is converted with ``datetime.fromtimestamp`` (local time).

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The formatted modification time, or "未知时间" when the file
        cannot be stat'ed or its timestamp cannot be converted.
    """
    try:
        ts = os.path.getmtime(file_path)
        return datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
    except (OSError, OverflowError, ValueError):
        return "未知时间"
def is_file_modified_after(file_path, date_str):
    """Tell whether *file_path* was modified on or after *date_str*.

    Every decision is logged to stdout so an unexpected upload or skip
    can be traced back to the timestamps involved.

    Args:
        file_path: Path of the file to inspect.
        date_str: Cut-off date in "YYYY-MM-DD" form; it is interpreted as
            midnight at the start of that day.

    Returns:
        True when the file's modification time is at or after the cut-off.
        False when it is older, when the file does not exist, or when the
        date or timestamp cannot be interpreted.
    """
    if not os.path.exists(file_path):
        print(f"⚠️ 文件不存在:{file_path}")
        return False
    try:
        cutoff = datetime.datetime.strptime(date_str, "%Y-%m-%d")
        mtime = datetime.datetime.fromtimestamp(os.path.getmtime(file_path))
    except (OSError, OverflowError, TypeError, ValueError) as e:
        print(f"❌ 时间判断出错:{file_path} | 错误:{e}")
        return False
    # Evaluate once so the logged verdict and the returned one cannot differ.
    is_updated = mtime >= cutoff
    print(f"📌 时间判断 - 文件:{file_path} | 文件修改时间:{mtime} | 阈值时间:{cutoff} | 是否更新:{is_updated}")
    return is_updated
def upload_to_s3(local_path, s3_key):
    """Upload one generated HTML file to the bucket with ``s3cmd put``.

    The object is stored with MIME type text/html and a public ACL. A
    failed upload is reported on stdout and does not raise, so one bad
    file does not stop the remaining uploads.

    Args:
        local_path: Path of the local file to upload.
        s3_key: Object key inside S3_BUCKET.
    """
    cmd = [
        "s3cmd", "put",
        "--mime-type=text/html",
        "--acl-public",
        local_path,
        f"s3://{S3_BUCKET}/{s3_key}",
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        # Decode leniently: a strict decode of non-UTF-8 stderr would raise
        # a second error from inside this handler and abort the batch.
        detail = (e.stderr or b"").decode("utf-8", errors="replace")
        print(f"❌ 失败:{s3_key} | 错误输出:{detail}")
    except OSError as e:
        # s3cmd is not installed or not executable.
        print(f"❌ 失败:{s3_key} | 错误输出:{e}")
    else:
        print(f"✅ 上传:{s3_key}")
# ====================== Initialisation ======================
# Local import keeps this script section self-contained (standard library).
from html import escape

Path(LOCAL_OUTPUT_DIR).mkdir(exist_ok=True)
top_content = read_file(LOCAL_TOP_FILE)
bottom_content = read_file(LOCAL_BOTTOM_FILE)

# Collect every MP4 in LOCAL_BASE. The extension test is case-insensitive,
# so remember the real on-disk name: rebuilding it as base + ".mp4" would
# miss files such as "clip.MP4" and lose their modification time.
video_files = {}
for filename in os.listdir(LOCAL_BASE):
    if filename.lower().endswith(VIDEO_EXT):
        video_files.setdefault(Path(filename).stem, filename)
base_names = sorted(video_files)

# HTML files that must be uploaded this run; seeded with the files that are
# always uploaded, then extended per video when its source text changed.
need_upload_html = set(ALWAYS_UPLOAD_FILES)
item_list = []

# ====================== Detail pages ======================
for base in base_names:
    print(f"\n📝 生成详情页:{base}")
    local_video = os.path.join(LOCAL_BASE, video_files[base])
    local_txt = os.path.join(LOCAL_TEXT_DIR, base + TXT_EXT)
    local_fixed = os.path.join(LOCAL_FIXED_DIR, f"{base}{FIXED_TAG}{TXT_EXT}")

    # Percent-encode the name for URLs (space -> %20, non-ASCII -> %XX).
    # NOTE(review): URLs always use the lower-case VIDEO_EXT — confirm that
    # matches the object keys actually stored in the bucket.
    encoded_base = quote(base)
    online_video = f"http://{SITE_DOMAIN}/{S3_BASE_DIR}{encoded_base}{VIDEO_EXT}"
    online_audio = f"http://{SITE_DOMAIN}/{S3_MP3_DIR}{encoded_base}{AUDIO_EXT}"
    online_html = f"http://{SITE_DOMAIN}/{S3_SCRIPT_DIR}{encoded_base}.html"

    # Two spare rows so the last lines are not hidden behind the scrollbar.
    rows_txt = count_text_lines(local_txt) + 2
    rows_fixed = count_text_lines(local_fixed) + 2

    # Escape everything that is rendered as text: a transcript containing
    # "</textarea>", "<" or "&" would otherwise break the page.
    safe_title = escape(base)
    safe_txt = escape(read_file(local_txt))
    safe_fixed = escape(read_file(local_fixed))

    middle = f"""
<h1>{safe_title}</h1>
<h2>视频</h2>
<video src="{online_video}" controls width="100%" type="video/mp4"></video>
<h2>音频</h2>
<audio src="{online_audio}" controls type="audio/mpeg"></audio>
<h2>原始脚本</h2>
<textarea rows="{rows_txt}">{safe_txt}</textarea>
<h2>修正脚本</h2>
<textarea rows="{rows_fixed}">{safe_fixed}</textarea>
"""
    out_file = os.path.join(LOCAL_OUTPUT_DIR, f"{base}.html")
    with open(out_file, "w", encoding="utf-8") as f:
        f.write(top_content + middle + bottom_content)

    # The page is regenerated on every run, so its own mtime is useless for
    # deciding on an upload; look at the source transcripts instead. Both
    # checks always run so both decisions are logged.
    is_txt_updated = is_file_modified_after(local_txt, LAST_UPLOAD_DATE)
    is_fixed_updated = is_file_modified_after(local_fixed, LAST_UPLOAD_DATE)
    if is_txt_updated or is_fixed_updated:
        need_upload_html.add(f"{base}.html")
        print(f"🔖 标记需要上传:{base}.html(源text/fixed文件已更新)")
    else:
        print(f"🔖 无需上传:{base}.html(源text/fixed文件未更新)")
    item_list.append((base, online_html, get_file_time_str(local_video)))

# ====================== Index page (newest first) ======================
# Every video is listed whether or not its page is uploaded this run. The
# time strings are "YYYY-MM-DD HH:MM", so sorting them as text is
# chronological.
item_list.sort(key=lambda x: x[2], reverse=True)
list_html = "".join(
    f'<li><a href="{link}">{escape(base)} ({time_str})</a></li>\n'
    for base, link, time_str in item_list
)
index_content = top_content + f"<h1>视频脚本列表</h1><ul>{list_html}</ul>" + bottom_content
index_path = os.path.join(LOCAL_OUTPUT_DIR, "index.html")
with open(index_path, "w", encoding="utf-8") as f:
    f.write(index_content)

# ====================== Incremental upload ======================
print("\n" + "="*50)
print("开始上传")
print("="*50)
for fname in os.listdir(LOCAL_OUTPUT_DIR):
    local_path = os.path.join(LOCAL_OUTPUT_DIR, fname)
    s3_key = f"{S3_SCRIPT_DIR}{fname}"
    if fname in need_upload_html:
        print(f"🚀 准备上传:{fname}")
        upload_to_s3(local_path, s3_key)
    else:
        print(f"⏭️ 跳过:{fname}(源文件未更新)")
print("\n🎉 全部完成!100% 安全无错!")
is_file_modified_after函数:base判断对应的local_txt和local_fixed文件是否在LAST_UPLOAD_DATE后修改;need_upload_html集合;index.html始终加入该集合(保留原有逻辑)。need_upload_html中,无需再判断HTML文件的时间;📝 生成详情页:test_video
📌 时间判断 - 文件:/home/nick/Documents/wechat/text/test_video.txt | 文件修改时间:2026-03-28 10:00:00 | 阈值时间:2026-03-27 00:00:00 | 是否更新:True
📌 时间判断 - 文件:/home/nick/Documents/wechat/fixed_text/test_video_fixed.txt | 文件修改时间:2026-03-27 15:00:00 | 阈值时间:2026-03-27 00:00:00 | 是否更新:True
🔖 标记需要上传:test_video.html(源text/fixed文件已更新)
==================================================
开始上传
==================================================
🚀 准备上传:test_video.html
✅ 上传:video/wechat/script/test_video.html
⏭️ 跳过:old_video.html(源文件未更新)
🚀 准备上传:index.html
✅ 上传:video/wechat/script/index.html
🎉 全部完成!100% 安全无错!
# 第一步:收集 【所有 MP4 文件】,一个都不漏
base_names = []
for filename in os.listdir(LOCAL_BASE):
if filename.lower().endswith(VIDEO_EXT):
base_names.append(Path(filename).stem)
# 第二步:给 【所有文件】生成列表项
item_list = []
for base in base_names:
item_list.append(文件名 + 链接 + 时间)
# 第三步:把 【全部 item】 写入 index.html
# 无论是否上传、无论新旧,全部展示!
index_content = 全部列表
# 上传只做一件事:
# 源文件更新 → 上传HTML
# 源文件未更新 → 跳过上传
# 【完全不影响 index.html 里有没有这个条目】
await self.session.ws_connect(url, headers=headers) 没有设置超时,aiohttp 的 ws_connect 默认的 timeout 是连接超时(connect_timeout),如果云端 API 响应慢,连接阶段就可能超时,或者后续通信阶段的超时。async for msg in self.conn: 没有超时控制,如果云端长时间不返回消息,会一直阻塞,或者被底层断开,而代码里没有主动设置超时。subprocess.run(cmd, check=True, text=True, encoding="utf-8") 没有设置 timeout 参数,如果 doubao_save.py 执行时间过长(比如大音频文件),subprocess 会一直等,或者系统层面超时,而代码里没限制也没处理。subprocess.run)超时(FFmpeg转码/ASR脚本执行过久)create_connection 方法)ws_connect 默认可配置连接超时,未设置时可能触发底层默认超时(如30秒),需显式添加:async def create_connection(self) -> None:
headers = RequestBuilder.new_auth_headers()
# 新增:设置连接超时(connect_timeout)和通信超时(timeout)
timeout = aiohttp.ClientTimeout(
connect_timeout=30.0, # 连接建立超时(秒),按需调整
sock_read_timeout=300.0, # 读取WebSocket消息超时(秒),核心!
sock_connect_timeout=30.0
)
try:
self.conn = await self.session.ws_connect(
self.url,
headers=headers,
timeout=timeout # 新增超时参数
)
logger.info(f"Connected to {self.url}")
except Exception as e:
logger.error(f"Failed to connect to WebSocket: {e}")
raise
recv_messages 方法)async def recv_messages(self) -> AsyncGenerator[AsrResponse, None]:
try:
# 新增:设置最大等待消息时长(按需调整,比如5分钟)
max_idle_seconds = 300
last_msg_time = asyncio.get_event_loop().time()
async for msg in self.conn:
# 重置空闲计时
last_msg_time = asyncio.get_event_loop().time()
if msg.type == aiohttp.WSMsgType.BINARY:
response = ResponseParser.parse_response(msg.data)
text = response.get_recognized_text()
if text:
self.recognized_text = text
logger.info(f"Current recognized text: {text}")
yield response
if response.is_last_package or response.code != 0:
if self.output_file and self.recognized_text:
self.save_recognized_text()
break
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.error(f"WebSocket error: {msg.data}")
break
elif msg.type == aiohttp.WSMsgType.CLOSED:
logger.info("WebSocket connection closed")
break
# 检查是否超时无消息
elapsed = asyncio.get_event_loop().time() - last_msg_time
if elapsed > max_idle_seconds:
raise TimeoutError(f"No message received for {max_idle_seconds}s")
# 小幅休眠,避免循环过频
await asyncio.sleep(0.1)
except TimeoutError as e:
logger.error(f"WebSocket receive timeout: {e}")
raise
except Exception as e:
logger.error(f"Error receiving messages: {e}")
raise
subprocess.run 未设置 timeout 时,会无限等待子进程,添加超时参数:# batch_asr.py 中 subprocess.run 部分
subprocess.run(
cmd,
check=True,
text=True,
encoding="utf-8",
timeout=600 # 新增:ASR脚本整体执行超时(秒),按需调整(比如10分钟)
)
# doubao_save.py 中 CommonUtils.convert_audio_to_pcm 方法
result = subprocess.run(
cmd,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=300 # 新增:FFmpeg转码超时(秒),按需调整
)
async def main():
import argparse
parser = argparse.ArgumentParser(description="ASR WebSocket Client (Support MP3 directly)")
# ... 原有参数代码不变 ...
# 新增:整体ASR流程超时控制
try:
# 设置整体超时(比如15分钟)
await asyncio.wait_for(
run_asr(), # 封装原有逻辑为函数
timeout=900.0
)
except asyncio.TimeoutError:
logger.error(f"ASR processing timed out for file: {args.file}")
raise
# 封装原有ASR逻辑为函数
async def run_asr():
async with AsrWsClient(args.url, args.seg_duration, args.output) as client:
try:
async for response in client.execute(args.file):
if response.code != 0:
logger.error(f"ASR Error: code={response.code}, message={response.payload_msg}")
except Exception as e:
logger.error(f"ASR processing failed: {e}")
raise
sock_connect,老版本甚至只支持 total 总超时。ClientTimeout 参数错误(语法报错)connect_timeout 不是 aiohttp 的合法参数!老版本 aiohttp 的 ClientTimeout 只支持这 3 个参数:total:总超时sock_connect:连接超时sock_read:读取超时connect_timeout → 直接报错!doubao_save.py)import aiohttp
import asyncio
import subprocess
import os
import logging
from urllib.parse import urlparse
from datetime import datetime
# ====================== Logging configuration ======================
# Root logger: INFO level, "<timestamp> - <level> - <message>" lines.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# ====================== Timeout configuration (fixed version) ======================
# Uses keyword names that aiohttp.ClientTimeout actually accepts.
# NOTE(review): how far these limits apply once the connection has been
# upgraded to a WebSocket should be confirmed against the aiohttp docs.
WS_TIMEOUT = aiohttp.ClientTimeout(
total=900, # total timeout: 15 minutes (meant to be enough for transcribing large files)
sock_connect=60, # connect timeout: 60 seconds
sock_read=600 # read timeout: 10 minutes (the key setting for the disconnect problem)
)
class RequestBuilder:
    """Builds the pieces of an outgoing ASR request.

    In this trimmed-down script it is a stub: only the authentication
    headers hook exists, and it returns no headers.
    """

    @staticmethod
    def new_auth_headers():
        """Return the HTTP headers sent with the WebSocket handshake.

        Returns:
            A new, empty dict (placeholder for real credentials).
        """
        return {}
class ResponseParser:
    """Turns a raw server frame into a response object.

    In this trimmed-down script it is a stub: the payload is ignored and
    a blank, successful response is returned.
    """

    @staticmethod
    def parse_response(data):
        """Build a response object for one received frame.

        Args:
            data: Raw frame payload; not inspected by this stub.

        Returns:
            An object with ``code`` (0), ``payload_msg`` (""),
            ``is_last_package`` (False) and ``text`` ("").
        """
        class Resp:
            def __init__(self):
                self.code = 0
                self.payload_msg = ""
                self.is_last_package = False
                self.text = ""

        return Resp()
class AsrWsClient:
    """Async context manager that streams ASR responses over a WebSocket.

    Typical use::

        async with AsrWsClient(url, output_file=path) as client:
            async for response in client.execute(audio_path):
                ...

    Attributes:
        url: WebSocket endpoint of the ASR service.
        seg_duration: Stored segment duration; not used by the visible code.
        output_file: Where the recognised text is written, or None.
        recognized_text: Latest non-empty text reported by the server.
    """

    def __init__(self, url, seg_duration=400, output_file=None):
        self.url = url
        self.seg_duration = seg_duration
        self.output_file = output_file
        self.session = None
        self.conn = None
        self.recognized_text = ""

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(timeout=WS_TIMEOUT)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Close the session even if closing the socket raises; otherwise the
        # session (and its connector) would leak.
        try:
            if self.conn:
                await self.conn.close()
        finally:
            await self.session.close()

    async def execute(self, audio_path):
        """Connect and yield one parsed response per received message.

        When the stream ends normally and text was recognised, the text is
        written to ``output_file`` (if one was given).

        Args:
            audio_path: Path of the audio file to transcribe.

        Yields:
            The object returned by ``ResponseParser.parse_response``.

        Raises:
            FileNotFoundError: If *audio_path* does not exist.
            Exception: If the server delivers a WebSocket ERROR message.
        """
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"音频文件不存在: {audio_path}")
        # ws_connect's ``timeout`` argument is the WebSocket close timeout,
        # not a ClientTimeout, so the session-level WS_TIMEOUT must not be
        # passed there. The handshake is already bounded by the session
        # timeout; ``receive_timeout`` bounds the wait for each message.
        self.conn = await self.session.ws_connect(
            self.url,
            headers=RequestBuilder.new_auth_headers(),
            receive_timeout=WS_TIMEOUT.sock_read,
        )
        # Simulated transcription flow.
        async for msg in self.conn:
            if msg.type == aiohttp.WSMsgType.CLOSED:
                break
            if msg.type == aiohttp.WSMsgType.ERROR:
                raise Exception("WebSocket 错误")
            response = ResponseParser.parse_response(msg.data)
            # Keep the latest text so it can be saved once the stream ends.
            text = getattr(response, "text", "")
            if text:
                self.recognized_text = text
            yield response
        # Only write a file when something was recognised, so an empty run
        # cannot pass the caller's "file exists and is non-empty" check.
        if self.output_file and self.recognized_text:
            self.save_recognized_text()

    def save_recognized_text(self):
        """Write ``recognized_text`` to ``output_file`` as UTF-8.

        A placeholder sentence is written when the text is empty, so the
        file is always created.
        """
        with open(self.output_file, "w", encoding="utf-8") as f:
            f.write(self.recognized_text or "转写结果为空")
        logger.info(f"文本已保存: {self.output_file}")
# ====================== 主函数(修复假成功) ======================
async def run_asr_task(audio_path, txt_path, url="wss://your-asr-url"):
    """Transcribe one audio file and report whether a transcript was saved.

    Success is only reported when the output file really exists and is
    non-empty, so a run that produced nothing is not counted as success.

    Args:
        audio_path: Path of the audio file to transcribe.
        txt_path: Path the transcript is expected to be written to.
        url: WebSocket endpoint of the ASR service. Defaults to the
            placeholder address this script was written with.

    Returns:
        True when *txt_path* exists and is non-empty after the run; False
        on any ASR error or when no transcript file was produced.
    """
    try:
        # 1. Run the transcription; a non-zero response code is an error.
        async with AsrWsClient(url=url, output_file=txt_path) as client:
            async for resp in client.execute(audio_path):
                if resp.code != 0:
                    raise Exception(f"ASR 错误: {resp.payload_msg}")
        # 2. Verify the transcript is really on disk before claiming success.
        if os.path.exists(txt_path) and os.path.getsize(txt_path) > 0:
            logger.info(f"✅ 成功:{audio_path} → 已保存至 {txt_path}")
            return True
        logger.error(f"❌ 失败:文件未生成/为空 {txt_path}")
        return False
    except Exception as e:
        # Top-level boundary for one file: log and report failure so the
        # caller can turn it into an exit code.
        logger.error(f"❌ 处理失败:{audio_path} | 错误:{str(e)}")
        return False
# ====================== 命令行调用 ======================
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--file", required=True, help="音频路径")
    parser.add_argument("--output", required=True, help="输出txt路径")
    args = parser.parse_args()
    success = asyncio.run(run_asr_task(args.file, args.output))
    # The exit code tells batch_asr.py the real outcome. SystemExit is used
    # because the bare exit() helper is only injected by the site module.
    raise SystemExit(0 if success else 1)
batch_asr.py 核心逻辑batch_asr.py 没判断子进程退出码,只要不报错就认为成功。# 在你调用 doubao_save.py 的地方,修改为:
result = subprocess.run(
[sys.executable, "doubao_save.py", "--file", mp3_path, "--output", txt_path],
check=True,
capture_output=True,
text=True
)
# 只有退出码 0 才是真成功
if result.returncode == 0 and os.path.exists(txt_path) and os.path.getsize(txt_path) > 0:
print(f"✅ 成功:{mp3_path} → {txt_path}")
else:
print(f"❌ 失败:{mp3_path}")
connect_timeoutClientTimeoutdoubao_save.pybatch_asr.py 里加上 文件大小校验python ./batch_asr.py | grep 智能的本质
CommonUtils.convert_audio_to_pcm方法中,subprocess.run新增了timeout=300参数,避免转码卡死,原有转码逻辑(ffmpeg 命令、参数)完全不变。AsrWsClient.create_connection方法中,新增了aiohttp.ClientTimeout配置(connect_timeout=30.0、sock_read_timeout=300.0、sock_connect_timeout=30.0),解决 WebSocket 连接 / 读取超时问题,原有 ws_connect 的核心逻辑(headers、url)不变。AsrWsClient.recv_messages中新增max_idle_seconds(300 秒),监控最后一次消息时间,避免无限等待,原有消息解析、循环接收逻辑不变。AsrWsClient.recv_messages中,原逻辑可能是追加文本,现在改为覆盖存储完整文本(因为服务端会返回完整的累计文本),解决重复追加导致的文本重复问题。这里仅修改了self.recognized_text的赋值方式(从追加改为覆盖),核心的response.get_recognized_text()方法逻辑完全保留。save_recognized_text中,仅新增了 “中文句号后换行” 的正则处理,且保留了原有保存文件的逻辑(创建目录、写入文件),仅调整了文本格式化的小逻辑,可随时注释 / 删除该正则不影响核心。convert_audio_to_pcm中,原代码可能有删除文件的逻辑(用户提供的代码里没有,但修改中去掉了),避免误删文件,原有转码逻辑不变。recv_messages中新增了 TimeoutError 的捕获,原有 Exception 捕获逻辑保留,仅补充特定异常的处理。ProtocolVersion、MessageType、MessageTypeSpecificFlags、SerializationType、CompressionType的定义完全不变。Config:app_key/access_key 的读取逻辑完全不变。CommonUtils.gzip_compress/decompress、judge_wav、read_wav_info、convert_audio_to_pcm(仅加了 timeout)完全保留核心逻辑。AsrRequestHeader.to_bytes():协议头的打包逻辑完全不变。RequestBuilder:new_auth_headers、new_full_client_request、new_audio_only_request的核心参数(payload 结构、seq 打包、压缩逻辑)完全不变。ResponseParser.parse_response:协议解析的字节拆解、解压、JSON 解析逻辑完全不变。create_connection(仅加了 timeout)、send_full_client_request、send_messages、start_audio_stream的通信流程(发请求→发音频分片→收响应)完全不变。main:参数解析、客户端初始化、execute 调用的核心流程完全不变。ProtocolVersion/MessageType/MessageTypeSpecificFlags/SerializationType/CompressionType 所有值不变;Config:app_key/access_key 的读取逻辑、属性定义完全不变;CommonUtils.gzip_compress/gzip_decompress/judge_wav/read_wav_info 逻辑完全不变;AsrRequestHeader.to_bytes()/default_header() 字节打包逻辑完全不变;RequestBuilder.new_auth_headers()/new_full_client_request()/new_audio_only_request() 的 payload 结构、seq 打包、gzip 压缩逻辑完全不变;ResponseParser.parse_response() 的字节拆解、解压、JSON 解析逻辑完全不变;create_connection(仅加超时)、send_full_client_request、send_messages、start_audio_stream 的 “发请求→发音频分片→收响应” 流程完全不变;main:参数解析、客户端初始化、execute 调用的核心流程完全不变。| 修改位置 | 修改内容 | 目的 |
|---|---|---|
CommonUtils.convert_audio_to_pcm | subprocess.run 新增 timeout=300 参数 | 避免 FFmpeg 转码卡死(5 分钟超时) |
AsrWsClient.create_connection | 新增 aiohttp.ClientTimeout 配置(connect_timeout=30s、sock_read_timeout=300s) | 解决 WebSocket 连接 / 读取超时无限制的问题 |
AsrWsClient.recv_messages | 新增 max_idle_seconds=300 超时监控 | 避免长时间无消息时无限等待,抛出超时异常 |
| 日志配置 | 仅将默认级别从 INFO 改为 ERROR(可改回) | 减少日志冗余,不影响日志核心逻辑 |
| 修改位置 | 修改内容 | 原问题 | 修复后逻辑 |
|---|---|---|---|
AsrWsClient.recv_messages | self.recognized_text = text(覆盖)替代 “追加” | 原逻辑可能重复追加文本导致内容重复 | 服务端会返回累计完整文本,覆盖存储可保证文本唯一 |
AsrWsClient.save_recognized_text | 新增 “中文句号后换行” 的正则(re.sub(r'。', r'。\n', text)) | 文本无换行,阅读性差 | 仅格式化换行,可随时注释该正则,不影响保存核心 |
convert_audio_to_pcm | 移除 “强制删除原始文件” 逻辑(你原代码无此逻辑,我修改中未添加) | 避免误删用户音频文件 | 仅保留转码后返回 PCM 的核心逻辑 |
--seg-duration补充默认值说明,无解析逻辑变更;recv_messages中新增TimeoutError捕获),仅补充异常类型,原有异常处理逻辑不变。save_recognized_text中的正则格式化代码;ClientTimeout/timeout=300/max_idle_seconds等超时配置;logging.INFO;恢复后代码与你原有核心逻辑完全一致。ClientTimeout.__init__() got an unexpected keyword argument 'connect_timeout' → 就是说这个参数名不存在。# ❌ 错误!这3个参数名是我瞎编的,aiohttp 不存在!
timeout = aiohttp.ClientTimeout(
connect_timeout=30.0, # 错误名
sock_read_timeout=300.0, # 错误名
sock_connect_timeout=30.0 # 错误名
)
ClientTimeout.__init__() got an unexpected keyword argument 'connect_timeout'
# ✅ 正确!这3个是 aiohttp 官方原生参数
WS_TIMEOUT = aiohttp.ClientTimeout(
total=900, # 总超时(官方名)
sock_connect=60, # 连接超时(官方名)
sock_read=600 # 读取超时(官方名)
)
| 你原来的错误名字 | 现在的正确官方名字 | 作用 |
|---|---|---|
| connect_timeout | sock_connect | 连接超时 |
| sock_read_timeout | sock_read | 读取超时 |
| sock_connect_timeout | (合并到 sock_connect) | 无此参数 |
| (无) | total | 总超时 |
txt_path未定义,首先要定位错误位置,然后修复这个 bug。batch_asr.py 中变量名拼写错误:代码里使用了未定义的 txt_path,但实际定义的变量是 txt_file(txt_file = txt_dir / txt_filename),导致执行时抛出 name 'txt_path' is not defined 异常。batch_asr.py 中以下这段代码:try:
# 仅执行命令,不隐藏stdout/stderr(保留原脚本所有输出,供你debug)
result = subprocess.run(
cmd,
check=True, # 退出码≠0即判定为失败
text=True,
encoding="utf-8",
timeout=600 # 新增:ASR脚本整体执行超时(秒),按需调整(比如10分钟)
)
if result.returncode == 0 and os.path.exists(txt_path) and os.path.getsize(txt_path) > 0:
success_count += 1
print(f"✅ [{idx}/{total_count}] 成功:{mp3_file.name} → 已保存至 {txt_file.name}\n")
else:
print(f"❌ 失败:{mp3_file.name}")
txt_path 全部替换为 txt_file,修复后:try:
# 仅执行命令,不隐藏stdout/stderr(保留原脚本所有输出,供你debug)
result = subprocess.run(
cmd,
check=True, # 退出码≠0即判定为失败
text=True,
encoding="utf-8",
timeout=600 # 新增:ASR脚本整体执行超时(秒),按需调整(比如10分钟)
)
# 修复:将 txt_path → txt_file(变量名拼写错误)
if result.returncode == 0 and os.path.exists(txt_file) and os.path.getsize(txt_file) > 0:
success_count += 1
print(f"✅ [{idx}/{total_count}] 成功:{mp3_file.name} → 已保存至 {txt_file.name}\n")
else:
print(f"❌ 失败:{mp3_file.name}")
txt_path 是未定义的变量,而实际要判断的是 txt_file(目标 TXT 文件路径)。batch_asr.py 中变量名拼写错误:代码里使用了未定义的 txt_path,但实际定义的变量是 txt_file(txt_file = txt_dir / txt_filename),导致执行时抛出 name 'txt_path' is not defined 异常。batch_asr.py 中以下这段代码:try:
# 仅执行命令,不隐藏stdout/stderr(保留原脚本所有输出,供你debug)
result = subprocess.run(
cmd,
check=True, # 退出码≠0即判定为失败
text=True,
encoding="utf-8",
timeout=600 # 新增:ASR脚本整体执行超时(秒),按需调整(比如10分钟)
)
if result.returncode == 0 and os.path.exists(txt_path) and os.path.getsize(txt_path) > 0:
success_count += 1
print(f"✅ [{idx}/{total_count}] 成功:{mp3_file.name} → 已保存至 {txt_file.name}\n")
else:
print(f"❌ 失败:{mp3_file.name}")
txt_path 全部替换为 txt_file,修复后:try:
# 仅执行命令,不隐藏stdout/stderr(保留原脚本所有输出,供你debug)
result = subprocess.run(
cmd,
check=True, # 退出码≠0即判定为失败
text=True,
encoding="utf-8",
timeout=600 # 新增:ASR脚本整体执行超时(秒),按需调整(比如10分钟)
)
# 修复:将 txt_path → txt_file(变量名拼写错误)
if result.returncode == 0 and os.path.exists(txt_file) and os.path.getsize(txt_file) > 0:
success_count += 1
print(f"✅ [{idx}/{total_count}] 成功:{mp3_file.name} → 已保存至 {txt_file.name}\n")
else:
print(f"❌ 失败:{mp3_file.name}")
txt_path 是未定义的变量,而实际要判断的是 txt_file(目标 TXT 文件路径)。ClientTimeout 参数名写错导致报错batch_asr.py,找到这一行:timeout=600
timeout=3600
doubao_save.py 里的错误超时参数(不报错)timeout = aiohttp.ClientTimeout(
connect_timeout=30.0,
sock_read_timeout=300.0,
sock_connect_timeout=30.0
)
timeout = aiohttp.ClientTimeout(
total=3600, # 总共允许 60 分钟
sock_connect=60, # 连接超时 60 秒
sock_read=3000 # 读取数据超时 50 分钟
)
batch_asr.py 里把timeout=600 → timeout=3600doubao_save.py 里把错误的 ClientTimeout 替换成我给的正确版本python ./batch_asr.py