我想改进一下这个运行,就是每次运行之后把运行的时间存在一个文件里,然后下次运行读取这个时间,或者直接在命令行里得到这个时间参数决定在所有的文件中,只有文件的创立时间在这个之后才运行整个的操作。
保存时间:2026/4/3 15:46:37
# 添加命令行参数处理
import argparse
# 添加配置参数
LAST_RUN_TIME_FILE = "/path/to/last_run_time.txt" # 保存上次运行时间的文件路径
def get_last_run_time(time_arg):
"""获取上次运行时间,优先使用命令行参数,其次使用文件中的时间,否则使用纪元时间"""
if time_arg:
# 解析命令行时间参数,假设格式为"%Y-%m-%d %H:%M:%S"
return datetime.strptime(time_arg, "%Y-%m-%d %H:%M:%S")
# 尝试从文件读取
if os.path.exists(LAST_RUN_TIME_FILE):
with open(LAST_RUN_TIME_FILE, 'r') as f:
time_str = f.read().strip()
return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
# 默认返回纪元时间
return datetime.fromtimestamp(0)
def save_current_run_time():
"""保存当前运行时间到文件"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(LAST_RUN_TIME_FILE, 'w') as f:
f.write(current_time)
# 在main函数中添加命令行参数解析
def main():
# 解析命令行参数
parser = argparse.ArgumentParser(description='上传MP4文件到S3并生成MP3')
parser.add_argument('--since', help='只处理在此时间之后创建的文件,格式: "YYYY-MM-DD HH:MM:SS"')
args = parser.parse_args()
# 获取参考时间
reference_time = get_last_run_time(args.since)
print(f"只处理 {reference_time} 之后创建的文件")
# ... 其他代码 ...
# 获取并筛选MP4文件
mp4_files = []
for file in os.listdir(LOCAL_DIR):
if file.lower().endswith(".mp4"):
file_path = os.path.join(LOCAL_DIR, file)
# 获取文件创建时间
create_time = datetime.fromtimestamp(os.path.getctime(file_path))
# 只添加创建时间在参考时间之后的文件
if create_time > reference_time:
mp4_files.append(file_path)
# ... 处理文件 ...
# 保存当前运行时间
if not DRY_RUN: # 只有在非测试模式下才保存时间
save_current_run_time()
#!/usr/bin/env python3
import os
import subprocess
import re
import argparse
from datetime import datetime
# -------------------------- 1. 配置参数(必改) --------------------------
LOCAL_DIR = "/home/nick/.wine/drive_c/users/nick/Documents/wechat" # 本地视频文件夹
S3_BUCKET = "www.staroceans.org" # S3桶名
S3_DIR = "video/wechat/" # S3目标目录(末尾加/)
MP3_LOCAL_SUBDIR = "mp3" # 本地MP3子目录名
MP3_S3_SUBDIR = "mp3/" # S3上MP3子目录名(末尾加/)
LAST_RUN_TIME_FILE = "/home/nick/diabloforum/tools/data/last_run_time.txt" # 上次运行时间记录文件
# -------------------------- 2. 测试模式开关(选填) --------------------------
DRY_RUN = False # 测试时改True(只打印不执行),正式上传改False
# -------------------------- 3. 链接文件路径配置 --------------------------
LINK_FILE = "/home/nick/diabloforum/tools/data/s3_video_links.txt"
MP3_LINK_FILE = "/home/nick/diabloforum/tools/data/s3_audio_links.txt"
TOP_FILE = "/home/nick/diabloforum/tools/data/top.txt"
BOTTOM_FILE = "/home/nick/diabloforum/tools/data/bottom.txt"
VIDEO_FILE = "/home/nick/diabloforum/tools/data/video_list.html"
MP3_VIDEO_FILE = "/home/nick/diabloforum/tools/data/audio_list.html"
def safe_exec(cmd, shell=True, check=True):
"""安全执行命令,根据DRY_RUN决定是否实际执行"""
print(f"[{'DRY RUN' if DRY_RUN else '实际执行'}] {cmd}")
if not DRY_RUN:
result = subprocess.run(
cmd,
shell=shell,
check=check,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result
return None
def get_file_md5(file_path):
"""获取文件MD5值"""
result = safe_exec(f"md5sum {file_path}", check=True)
if result:
return result.stdout.split()[0]
return "模拟MD5" # 测试模式返回占位符
def s3_file_exists(s3_path):
"""检查S3上文件是否存在"""
result = safe_exec(f"s3cmd info {s3_path}", check=False)
if DRY_RUN:
return False # 测试模式默认不存在
return result.returncode == 0
def get_s3_md5(s3_path):
"""获取S3文件的MD5值"""
result = safe_exec(f"s3cmd info {s3_path}", check=True)
if result:
for line in result.stdout.splitlines():
if "MD5 sum" in line:
return line.split(": ")[1].strip()
return "模拟S3 MD5" # 测试模式返回占位符
def get_file_timestamp(file_path):
"""获取文件最后修改时间戳(DRY_RUN模式返回模拟时间)"""
if DRY_RUN:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
return datetime.fromtimestamp(os.path.getmtime(file_path)).strftime("%Y-%m-%d %H:%M:%S")
def natural_sort_key(filename):
"""自然排序键生成函数"""
name_without_ext = os.path.splitext(filename)[0]
match = re.match(r'(.*?)(\d+)$', name_without_ext)
if match:
main_part = match.group(1)
number_part = int(match.group(2))
else:
main_part = name_without_ext
number_part = 0
return (main_part, number_part)
def get_reference_time(time_arg):
"""获取参考时间:优先使用命令行参数,其次使用上次运行时间,否则使用纪元时间"""
# 命令行参数优先
if time_arg:
try:
return datetime.strptime(time_arg, "%Y-%m-%d %H:%M:%S")
except ValueError:
print(f"⚠️ 时间格式错误,使用默认格式: YYYY-MM-DD HH:MM:SS")
# 尝试读取上次运行时间
if os.path.exists(LAST_RUN_TIME_FILE):
try:
with open(LAST_RUN_TIME_FILE, 'r') as f:
time_str = f.read().strip()
return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
except (ValueError, IOError):
print(f"⚠️ 读取上次运行时间失败,使用纪元时间")
# 默认返回纪元时间
return datetime.fromtimestamp(0)
def save_current_run_time():
"""保存当前运行时间到文件"""
if DRY_RUN:
print(f"[DRY RUN] 模拟保存当前时间到 {LAST_RUN_TIME_FILE}")
return
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
with open(LAST_RUN_TIME_FILE, 'w') as f:
f.write(current_time)
print(f"✅ 已保存当前运行时间: {current_time}")
except IOError as e:
print(f"⚠️ 保存运行时间失败: {e}")
def process_mp4(local_mp4):
"""处理MP4文件:检查S3存在性、上传、生成链接"""
filename = os.path.basename(local_mp4)
s3_mp4_path = f"s3://{S3_BUCKET}/{S3_DIR}{filename}"
s3_mp4_url = f"http://{S3_BUCKET}/{S3_DIR}{filename}"
print(f"\n=== 处理MP4文件:{filename} ===")
print(f"[步骤1] 检查S3是否存在: {s3_mp4_path}")
# 检查S3存在性
s3_exists = s3_file_exists(s3_mp4_path)
s3_md5_check = False # 可改为True启用MD5校验
# MD5校验逻辑
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp4)
s3_md5 = get_s3_md5(s3_mp4_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
# 上传文件
if not s3_exists:
print("[步骤2] 开始上传到S3")
safe_exec(
f"s3cmd put --mime-type=\"video/mp4\" --acl-public --continue --check-md5 "
f"'{local_mp4}' '{s3_mp4_path}'"
)
# 生成链接
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp4_url}\">{filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {LINK_FILE}")
print("[步骤3] 已添加链接到列表")
def process_mp3(local_mp4):
"""处理MP3文件:生成、检查S3存在性、上传、生成链接"""
filename = os.path.basename(local_mp4)
filename_noext = os.path.splitext(filename)[0]
mp3_filename = f"{filename_noext}.mp3"
# 路径计算
local_mp3_dir = os.path.join(LOCAL_DIR, MP3_LOCAL_SUBDIR)
local_mp3 = os.path.join(local_mp3_dir, mp3_filename)
s3_mp3_path = f"s3://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
s3_mp3_url = f"http://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
print(f"\n=== 处理MP3文件:{mp3_filename} ===")
# 确保本地MP3目录存在
safe_exec(f"mkdir -p {local_mp3_dir}")
# 检查并生成本地MP3
print(f"[步骤1] 检查本地MP3: {local_mp3}")
if not os.path.exists(local_mp3) or DRY_RUN:
if not DRY_RUN and not os.path.exists(local_mp3):
print("[生成] 本地不存在,使用ffmpeg提取音频")
safe_exec(
f"ffmpeg -y -i '{local_mp4}' -vn -acodec libmp3lame '{local_mp3}'"
)
else:
print("[DRY RUN] 模拟生成MP3文件")
# 检查S3存在性
print(f"[步骤2] 检查S3是否存在: {s3_mp3_path}")
s3_exists = s3_file_exists(s3_mp3_path)
s3_md5_check = False # 可改为True启用MD5校验
# MD5校验逻辑
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp3)
s3_md5 = get_s3_md5(s3_mp3_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
# 上传文件
if not s3_exists:
print("[步骤3] 开始上传到S3")
safe_exec(
f"s3cmd put --mime-type=\"audio/mpeg\" --acl-public --continue --check-md5 "
f"'{local_mp3}' '{s3_mp3_path}'"
)
# 生成链接
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp3_url}\">{mp3_filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {MP3_LINK_FILE}")
print("[步骤4] 已添加链接到列表")
def main():
# 解析命令行参数
parser = argparse.ArgumentParser(description='上传MP4文件到S3并生成MP3')
parser.add_argument('--since', help='只处理在此时间之后创建的文件,格式: "YYYY-MM-DD HH:MM:SS"')
args = parser.parse_args()
# 获取参考时间
reference_time = get_reference_time(args.since)
print(f"⏱️ 参考时间: {reference_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"📌 只处理此时间之后创建的文件")
# 初始化链接文件
print("========================================")
if DRY_RUN:
print("🔴 当前为DRY RUN测试模式,不会实际执行操作")
else:
print("🟢 当前为正式模式,将执行实际操作")
# 清空并初始化链接文件
safe_exec(f"> {LINK_FILE}")
safe_exec(f"echo '<ol>' > {LINK_FILE}")
safe_exec(f"> {MP3_LINK_FILE}")
safe_exec(f"echo '<ol>' > {MP3_LINK_FILE}")
print(f"本地目录: {LOCAL_DIR}")
print(f"本地MP3目录: {os.path.join(LOCAL_DIR, MP3_LOCAL_SUBDIR)}")
print(f"S3 MP4目标: s3://{S3_BUCKET}/{S3_DIR}")
print(f"S3 MP3目标: s3://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}")
print("========================================")
# 获取并筛选MP4文件
mp4_files = []
for file in os.listdir(LOCAL_DIR):
if file.lower().endswith(".mp4"):
file_path = os.path.join(LOCAL_DIR, file)
# 获取文件创建时间
create_time = datetime.fromtimestamp(os.path.getctime(file_path))
# 只添加创建时间在参考时间之后的文件
if create_time > reference_time:
mp4_files.append(file_path)
# 使用自然排序
mp4_files.sort(key=lambda x: natural_sort_key(os.path.basename(x)))
# DRY_RUN模式下打印排序后的所有文件名(调试用)
if DRY_RUN:
print("\n📋 按处理顺序排列的MP4文件列表:")
for idx, file in enumerate(mp4_files, 1):
print(f" {idx}. {os.path.basename(file)}")
print(f"\n📊 共发现 {len(mp4_files)} 个符合条件的MP4文件")
# 处理每个MP4文件
processed_count = 0
for mp4_file in mp4_files:
processed_count += 1
print(f"\n===== 开始处理第 {processed_count}/{len(mp4_files)} 个文件 =====")
process_mp4(mp4_file)
process_mp3(mp4_file)
print("----------------------------------------")
# 生成最终HTML文件
print("\n[生成HTML列表]")
# 处理MP4 HTML
safe_exec(f"echo '</ol>' >> {LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {VIDEO_FILE}")
safe_exec(f"cat {LINK_FILE} >> {VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {VIDEO_FILE}")
safe_exec(
f"s3cmd put --mime-type=\"text/html\" --acl-public --continue --check-md5 "
f"'{VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/video_list.html'"
)
# 处理MP3 HTML
safe_exec(f"echo '</ol>' >> {MP3_LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {MP3_VIDEO_FILE}")
safe_exec(f"cat {MP3_LINK_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(
f"s3cmd put --mime-type=\"text/html\" --acl-public --continue --check-md5 "
f"'{MP3_VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/audio_list.html'"
)
# 保存当前运行时间
save_current_run_time()
print("\n🎉 所有文件处理完成!")
print(f"📊 处理统计:共发现 {len(mp4_files)} 个符合条件的MP4文件,实际处理 {processed_count} 个")
if DRY_RUN:
print("🔴 注意:测试模式未执行实际操作,请核对排序是否符合预期")
else:
print(f"🟢 MP4链接列表: {LINK_FILE}")
print(f"🟢 MP3链接列表: {MP3_LINK_FILE}")
if __name__ == "__main__":
main()
LAST_RUN_TIME_FILE指定的文件中--since,可以手动指定参考时间,格式为 "YYYY-MM-DD HH:MM:SS"python3 uploader.py --since "2023-10-01 00:00:00"get_reference_time函数处理--since参数,但当时间格式错误时,只是打印警告,继续使用默认时间。用户希望这里更严格:如果参数不合法,直接退出并显示帮助信息。argparse设置了--since参数,但没有验证其格式。应该在解析后检查args.since的格式,如果无效,调用parser.error()或直接打印帮助并退出。main函数中,解析参数后,先检查args.since是否存在且格式正确。如果存在但格式错误,使用parser.print_help()显示帮助信息,然后退出程序,而不是继续执行。get_reference_time函数中,当time_arg格式错误时,只是打印警告,然后继续。现在需要将格式检查移到main函数中,一旦发现错误就退出,而不是返回默认时间。get_reference_time函数,假设传入的time_arg已经是合法的,或者在main中已经验证过。这样,在main中先验证--since参数的格式,如果无效则退出。sys.exit(1)来退出程序,表明有错误发生。--since "2023/10/01 00:00:00"(错误格式),脚本应打印帮助,说明正确格式,并退出,而不是继续处理。main函数中,解析--since参数后,检查其格式是否正确。get_reference_time函数,假设传入的time_arg是合法的,简化错误处理。#!/usr/bin/env python3
import os
import sys
import subprocess
import re
import argparse
from datetime import datetime
# -------------------------- 1. 配置参数(必改) --------------------------
LOCAL_DIR = "/home/nick/.wine/drive_c/users/nick/Documents/wechat" # 本地视频文件夹
S3_BUCKET = "www.staroceans.org" # S3桶名
S3_DIR = "video/wechat/" # S3目标目录(末尾加/)
MP3_LOCAL_SUBDIR = "mp3" # 本地MP3子目录名
MP3_S3_SUBDIR = "mp3/" # S3上MP3子目录名(末尾加/)
LAST_RUN_TIME_FILE = "/home/nick/diabloforum/tools/data/last_run_time.txt" # 上次运行时间记录文件
# -------------------------- 2. 测试模式开关(选填) --------------------------
DRY_RUN = False # 测试时改True(只打印不执行),正式上传改False
# -------------------------- 3. 链接文件路径配置 --------------------------
LINK_FILE = "/home/nick/diabloforum/tools/data/s3_video_links.txt"
MP3_LINK_FILE = "/home/nick/diabloforum/tools/data/s3_audio_links.txt"
TOP_FILE = "/home/nick/diabloforum/tools/data/top.txt"
BOTTOM_FILE = "/home/nick/diabloforum/tools/data/bottom.txt"
VIDEO_FILE = "/home/nick/diabloforum/tools/data/video_list.html"
MP3_VIDEO_FILE = "/home/nick/diabloforum/tools/data/audio_list.html"
def safe_exec(cmd, shell=True, check=True):
"""安全执行命令,根据DRY_RUN决定是否实际执行"""
print(f"[{'DRY RUN' if DRY_RUN else '实际执行'}] {cmd}")
if not DRY_RUN:
result = subprocess.run(
cmd,
shell=shell,
check=check,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result
return None
def get_file_md5(file_path):
"""获取文件MD5值"""
result = safe_exec(f"md5sum {file_path}", check=True)
if result:
return result.stdout.split()[0]
return "模拟MD5" # 测试模式返回占位符
def s3_file_exists(s3_path):
"""检查S3上文件是否存在"""
result = safe_exec(f"s3cmd info {s3_path}", check=False)
if DRY_RUN:
return False # 测试模式默认不存在
return result.returncode == 0
def get_s3_md5(s3_path):
"""获取S3文件的MD5值"""
result = safe_exec(f"s3cmd info {s3_path}", check=True)
if result:
for line in result.stdout.splitlines():
if "MD5 sum" in line:
return line.split(": ")[1].strip()
return "模拟S3 MD5" # 测试模式返回占位符
def get_file_timestamp(file_path):
"""获取文件最后修改时间戳(DRY_RUN模式返回模拟时间)"""
if DRY_RUN:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
return datetime.fromtimestamp(os.path.getmtime(file_path)).strftime("%Y-%m-%d %H:%M:%S")
def natural_sort_key(filename):
"""自然排序键生成函数"""
name_without_ext = os.path.splitext(filename)[0]
match = re.match(r'(.*?)(\d+)$', name_without_ext)
if match:
main_part = match.group(1)
number_part = int(match.group(2))
else:
main_part = name_without_ext
number_part = 0
return (main_part, number_part)
def get_reference_time(time_arg):
"""获取参考时间:优先使用命令行参数(已校验),其次使用上次运行时间,否则使用纪元时间"""
# 命令行参数已在main中校验过格式,直接使用
if time_arg:
return datetime.strptime(time_arg, "%Y-%m-%d %H:%M:%S")
# 尝试读取上次运行时间
if os.path.exists(LAST_RUN_TIME_FILE):
try:
with open(LAST_RUN_TIME_FILE, 'r') as f:
time_str = f.read().strip()
return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
except (ValueError, IOError):
print(f"⚠️ 读取上次运行时间失败,使用纪元时间")
# 默认返回纪元时间
return datetime.fromtimestamp(0)
def save_current_run_time():
"""保存当前运行时间到文件"""
if DRY_RUN:
print(f"[DRY RUN] 模拟保存当前时间到 {LAST_RUN_TIME_FILE}")
return
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
with open(LAST_RUN_TIME_FILE, 'w') as f:
f.write(current_time)
print(f"✅ 已保存当前运行时间: {current_time}")
except IOError as e:
print(f"⚠️ 保存运行时间失败: {e}")
def process_mp4(local_mp4):
"""处理MP4文件:检查S3存在性、上传、生成链接"""
filename = os.path.basename(local_mp4)
s3_mp4_path = f"s3://{S3_BUCKET}/{S3_DIR}{filename}"
s3_mp4_url = f"http://{S3_BUCKET}/{S3_DIR}{filename}"
print(f"\n=== 处理MP4文件:{filename} ===")
print(f"[步骤1] 检查S3是否存在: {s3_mp4_path}")
# 检查S3存在性
s3_exists = s3_file_exists(s3_mp4_path)
s3_md5_check = False # 可改为True启用MD5校验
# MD5校验逻辑
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp4)
s3_md5 = get_s3_md5(s3_mp4_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
# 上传文件
if not s3_exists:
print
get_reference_time函数处理--since参数,但当时间格式错误时,只是打印警告,继续使用默认时间。用户希望这里更严格,不合法时打印帮助并退出。argparse可以添加自定义验证,但可能需要在解析后检查。在main函数中,解析args.since后,调用get_reference_time时,如果args.since存在但格式错误,应该触发错误处理。get_reference_time函数中,当time_arg存在且解析失败时,不只是打印警告,而是应该抛出异常或者返回一个错误状态,让main函数捕获并处理,打印帮助信息并退出。get_reference_time,当time_arg格式错误时,不再继续,而是提示错误并退出。可以使用argparse的error方法来打印帮助并退出,这样更符合命令行工具的习惯。main函数中,解析args后,调用get_reference_time,但如果args.since存在且格式错误,让get_reference_time抛出ValueError。get_reference_time中,当time_arg解析失败时,打印错误信息,然后调用parser.error()来显示帮助并退出,或者在main中捕获异常后处理。--since参数为空(合法)、格式正确(如 "2023-10-01 12:34:56")、格式错误(如 "2023/10/01"、"2023-13-01" 等),确保错误时正确退出并显示帮助。get_reference_time应该在time_arg格式错误时,明确提示并退出。可能需要将parser传递给get_reference_time,以便调用parser.error(),或者在main中处理错误。#!/usr/bin/env python3
import os
import subprocess
import re
import argparse
from datetime import datetime
# -------------------------- 1. 配置参数(必改) --------------------------
LOCAL_DIR = "/home/nick/.wine/drive_c/users/nick/Documents/wechat" # 本地视频文件夹
S3_BUCKET = "www.staroceans.org" # S3桶名
S3_DIR = "video/wechat/" # S3目标目录(末尾加/)
MP3_LOCAL_SUBDIR = "mp3" # 本地MP3子目录名
MP3_S3_SUBDIR = "mp3/" # S3上MP3子目录名(末尾加/)
LAST_RUN_TIME_FILE = "/home/nick/diabloforum/tools/data/last_run_time.txt" # 上次运行时间记录文件
# -------------------------- 2. 测试模式开关(选填) --------------------------
DRY_RUN = False # 测试时改True(只打印不执行),正式上传改False
# -------------------------- 3. 链接文件路径配置 --------------------------
LINK_FILE = "/home/nick/diabloforum/tools/data/s3_video_links.txt"
MP3_LINK_FILE = "/home/nick/diabloforum/tools/data/s3_audio_links.txt"
TOP_FILE = "/home/nick/diabloforum/tools/data/top.txt"
BOTTOM_FILE = "/home/nick/diabloforum/tools/data/bottom.txt"
VIDEO_FILE = "/home/nick/diabloforum/tools/data/video_list.html"
MP3_VIDEO_FILE = "/home/nick/diabloforum/tools/data/audio_list.html"
def safe_exec(cmd, shell=True, check=True):
"""安全执行命令,根据DRY_RUN决定是否实际执行"""
print(f"[{'DRY RUN' if DRY_RUN else '实际执行'}] {cmd}")
if not DRY_RUN:
result = subprocess.run(
cmd,
shell=shell,
check=check,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result
return None
def get_file_md5(file_path):
"""获取文件MD5值"""
result = safe_exec(f"md5sum {file_path}", check=True)
if result:
return result.stdout.split()[0]
return "模拟MD5" # 测试模式返回占位符
def s3_file_exists(s3_path):
"""检查S3上文件是否存在"""
result = safe_exec(f"s3cmd info {s3_path}", check=False)
if DRY_RUN:
return False # 测试模式默认不存在
return result.returncode == 0
def get_s3_md5(s3_path):
"""获取S3文件的MD5值"""
result = safe_exec(f"s3cmd info {s3_path}", check=True)
if result:
for line in result.stdout.splitlines():
if "MD5 sum" in line:
return line.split(": ")[1].strip()
return "模拟S3 MD5" # 测试模式返回占位符
def get_file_timestamp(file_path):
"""获取文件最后修改时间戳(DRY_RUN模式返回模拟时间)"""
if DRY_RUN:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
return datetime.fromtimestamp(os.path.getmtime(file_path)).strftime("%Y-%m-%d %H:%M:%S")
def natural_sort_key(filename):
"""自然排序键生成函数"""
name_without_ext = os.path.splitext(filename)[0]
match = re.match(r'(.*?)(\d+)$', name_without_ext)
if match:
main_part = match.group(1)
number_part = int(match.group(2))
else:
main_part = name_without_ext
number_part = 0
return (main_part, number_part)
def get_reference_time(time_arg, parser):
"""获取参考时间:优先使用命令行参数,其次使用上次运行时间,否则使用纪元时间"""
# 命令行参数优先
if time_arg:
try:
# 严格校验时间格式
return datetime.strptime(time_arg, "%Y-%m-%d %H:%M:%S")
except ValueError:
# 时间格式错误,打印帮助并退出
parser.error(f"无效的时间格式: '{time_arg}',正确格式应为: YYYY-MM-DD HH:MM:SS")
# 尝试读取上次运行时间
if os.path.exists(LAST_RUN_TIME_FILE):
try:
with open(LAST_RUN_TIME_FILE, 'r') as f:
time_str = f.read().strip()
return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
except (ValueError, IOError) as e:
print(f"⚠️ 读取上次运行时间失败: {e},将使用纪元时间作为参考")
# 默认返回纪元时间(处理所有文件)
return datetime.fromtimestamp(0)
def save_current_run_time():
"""保存当前运行时间到文件"""
if DRY_RUN:
print(f"[DRY RUN] 模拟保存当前时间到 {LAST_RUN_TIME_FILE}")
return
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
with open(LAST_RUN_TIME_FILE, 'w') as f:
f.write(current_time)
print(f"✅ 已保存当前运行时间: {current_time}")
except IOError as e:
print(f"⚠️ 保存运行时间失败: {e}")
def process_mp4(local_mp4):
"""处理MP4文件:检查S3存在性、上传、生成链接"""
filename = os.path.basename(local_mp4)
s3_mp4_path = f"s3://{S3_BUCKET}/{S3_DIR}{filename}"
s3_mp4_url = f"http://{S3_BUCKET}/{S3_DIR}{filename}"
print(f"\n=== 处理MP4文件:{filename} ===")
print(f"[步骤1] 检查S3是否存在: {s3_mp4_path}")
# 检查S3存在性
s3_exists = s3_file_exists(s3_mp4_path)
s3_md5_check = False # 可改为True启用MD5校验
# MD5校验逻辑
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp4)
s3_md5 = get_s3_md5(s3_mp4_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
# 上传文件
if not s3_exists:
print("[步骤2] 开始上传到S3")
safe_exec(
f"s3cmd put --mime-type=\"video/mp4\" --acl-public --continue --check-md5 "
f"'{local_mp4}' '{s3_mp4_path}'"
)
# 生成链接
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp4_url}\">{filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {LINK_FILE}")
print("[步骤3] 已添加链接到列表")
def process_mp3(local_mp4):
"""处理MP3文件:生成、检查S3存在性、上传、生成链接"""
filename = os.path.basename(local_mp4)
filename_noext = os.path.splitext(filename)[0]
mp3_filename = f"{filename_noext}.mp3"
# 路径计算
local_mp3_dir = os.path.join(LOCAL_DIR, MP3_LOCAL_SUBDIR)
local_mp3 = os.path.join(local_mp3_dir, mp3_filename)
s3_mp3_path = f"s3://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
s3_mp3_url = f"http://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
print(f"\n=== 处理MP3文件:{mp3_filename} ===")
# 确保本地MP3目录存在
safe_exec(f"mkdir -p {local_mp3_dir}")
# 检查并生成本地MP3
print(f"[步骤1] 检查本地MP3: {local_mp3}")
if not os.path.exists(local_mp3) or DRY_RUN:
if not DRY_RUN and not os.path.exists(local_mp3):
print("[生成] 本地不存在,使用ffmpeg提取音频")
safe_exec(
f"ffmpeg -y -i '{local_mp4}' -vn -acodec libmp3lame '{local_mp3}'"
)
else:
print("[DRY RUN] 模拟生成MP3文件")
# 检查S3存在性
print(f"[步骤2] 检查S3是否存在: {s3_mp3_path}")
s3_exists = s3_file_exists(s3_mp3_path)
s3_md5_check = False # 可改为True启用MD5校验
# MD5校验逻辑
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp3)
s3_md5 = get_s3_md5(s3_mp3_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
# 上传文件
if not s3_exists:
print("[步骤3] 开始上传到S3")
safe_exec(
f"s3cmd put --mime-type=\"audio/mpeg\" --acl-public --continue --check-md5 "
f"'{local_mp3}' '{s3_mp3_path}'"
)
# 生成链接
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp3_url}\">{mp3_filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {MP3_LINK_FILE}")
print("[步骤4] 已添加链接到列表")
def main():
# 解析命令行参数
parser = argparse.ArgumentParser(
description='上传MP4文件到S3并生成MP3',
formatter_class=argparse.RawTextHelpFormatter # 保留帮助信息的换行格式
)
parser.add_argument(
'--since',
help='只处理在此时间之后创建的文件,格式必须为:\n YYYY-MM-DD HH:MM:SS\n例如:\n --since "2023-10-01 08:30:00"'
)
args = parser.parse_args()
# 获取参考时间(传入parser用于错误处理)
reference_time = get_reference_time(args.since, parser)
print(f"⏱️ 参考时间: {reference_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"📌 只处理此时间之后创建的文件")
# 初始化链接文件
print("========================================")
if DRY_RUN:
print("🔴 当前为DRY RUN测试模式,不会实际执行操作")
else:
print("🟢 当前为正式模式,将执行实际操作")
# 清空并初始化链接文件
safe_exec(f"> {LINK_FILE}")
safe_exec(f"echo '<ol>' > {LINK_FILE}")
safe_exec(f"> {MP3_LINK_FILE}")
safe_exec(f"echo '<ol>' > {MP3_LINK_FILE}")
print(f"本地目录: {LOCAL_DIR}")
print(f"本地MP3目录: {os.path.join(LOCAL_DIR, MP3_LOCAL_SUBDIR)}")
print(f"S3 MP4目标: s3://{S3_BUCKET}/{S3_DIR}")
print(f"S3 MP3目标: s3://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}")
print("========================================")
# 获取并筛选MP4文件
mp4_files = []
for file in os.listdir(LOCAL_DIR):
if file.lower().endswith(".mp4"):
file_path = os.path.join(LOCAL_DIR, file)
# 获取文件创建时间
create_time = datetime.fromtimestamp(os.path.getctime(file_path))
# 只添加创建时间在参考时间之后的文件
if create_time > reference_time:
mp4_files.append(file_path)
# 使用自然排序
mp4_files.sort(key=lambda x: natural_sort_key(os.path.basename(x)))
# DRY_RUN模式下打印排序后的所有文件名(调试用)
if DRY_RUN:
print("\n📋 按处理顺序排列的MP4文件列表:")
for idx, file in enumerate(mp4_files, 1):
print(f" {idx}. {os.path.basename(file)}")
print(f"\n📊 共发现 {len(mp4_files)} 个符合条件的MP4文件")
# 处理每个MP4文件
processed_count = 0
for mp4_file in mp4_files:
processed_count += 1
print(f"\n===== 开始处理第 {processed_count}/{len(mp4_files)} 个文件 =====")
process_mp4(mp4_file)
process_mp3(mp4_file)
print("----------------------------------------")
# 生成最终HTML文件
print("\n[生成HTML列表]")
# 处理MP4 HTML
safe_exec(f"echo '</ol>' >> {LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {VIDEO_FILE}")
safe_exec(f"cat {LINK_FILE} >> {VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {VIDEO_FILE}")
safe_exec(
f"s3cmd put --mime-type=\"text/html\" --acl-public --continue --check-md5 "
f"'{VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/video_list.html'"
)
# 处理MP3 HTML
safe_exec(f"echo '</ol>' >> {MP3_LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {MP3_VIDEO_FILE}")
safe_exec(f"cat {MP3_LINK_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(
f"s3cmd put --mime-type=\"text/html\" --acl-public --continue --check-md5 "
f"'{MP3_VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/audio_list.html'"
)
# 保存当前运行时间
save_current_run_time()
print("\n🎉 所有文件处理完成!")
print(f"📊 处理统计:共发现 {len(mp4_files)} 个符合条件的MP4文件,实际处理 {processed_count} 个")
if DRY_RUN:
print("🔴 注意:测试模式未执行实际操作,请核对排序是否符合预期")
else:
print(f"🟢 MP4链接列表: {LINK_FILE}")
print(f"🟢 MP3链接列表: {MP3_LINK_FILE}")
if __name__ == "__main__":
main()
get_reference_time函数中,当--since参数格式错误时,通过parser.error()直接打印错误信息并展示完整帮助文档,然后退出程序--since参数添加了更详细的帮助说明,包括格式要求和示例RawTextHelpFormatter保留帮助信息中的换行格式,提高可读性python3 script.py --since "2023/10/01 12:34"
usage: script.py [-h] [--since SINCE]
script.py: error: 无效的时间格式: '2023/10/01 12:34',正确格式应为: YYYY-MM-DD HH:MM:SS
mp4_files列表(最终出现在 HTML 列表中),但可以在process_mp4/process_mp3阶段跳过实际处理(如上传、生成 MP3 等操作)。#!/usr/bin/env python3
import os
import subprocess
import re
import argparse
from datetime import datetime
# -------------------------- 1. 配置参数(必改) --------------------------
LOCAL_DIR = "/home/nick/.wine/drive_c/users/nick/Documents/wechat" # 本地视频文件夹
S3_BUCKET = "www.staroceans.org" # S3桶名
S3_DIR = "video/wechat/" # S3目标目录(末尾加/)
MP3_LOCAL_SUBDIR = "mp3" # 本地MP3子目录名
MP3_S3_SUBDIR = "mp3/" # S3上MP3子目录名(末尾加/)
LAST_RUN_TIME_FILE = "/home/nick/diabloforum/tools/data/last_run_time.txt" # 上次运行时间记录文件
# -------------------------- 2. 处理开关(新增:自定义跳过规则) --------------------------
# 定义需要跳过处理(但仍加入列表)的文件名规则(例如:包含"skip"的文件)
def should_skip_process(file_name):
"""判断是否需要跳过处理(仅生成列表,不上传/转码)"""
# 可根据需求修改规则,例如:文件名包含"skip"则跳过
return "skip" in file_name.lower()
# -------------------------- 3. 测试模式开关(选填) --------------------------
DRY_RUN = False # 测试时改True(只打印不执行),正式上传改False
# -------------------------- 4. 链接文件路径配置 --------------------------
LINK_FILE = "/home/nick/diabloforum/tools/data/s3_video_links.txt"
MP3_LINK_FILE = "/home/nick/diabloforum/tools/data/s3_audio_links.txt"
TOP_FILE = "/home/nick/diabloforum/tools/data/top.txt"
BOTTOM_FILE = "/home/nick/diabloforum/tools/data/bottom.txt"
VIDEO_FILE = "/home/nick/diabloforum/tools/data/video_list.html"
MP3_VIDEO_FILE = "/home/nick/diabloforum/tools/data/audio_list.html"
def safe_exec(cmd, shell=True, check=True):
"""安全执行命令,根据DRY_RUN决定是否实际执行"""
print(f"[{'DRY RUN' if DRY_RUN else '实际执行'}] {cmd}")
if not DRY_RUN:
result = subprocess.run(
cmd,
shell=shell,
check=check,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result
return None
def get_file_md5(file_path):
"""获取文件MD5值"""
result = safe_exec(f"md5sum {file_path}", check=True)
if result:
return result.stdout.split()[0]
return "模拟MD5" # 测试模式返回占位符
def s3_file_exists(s3_path):
"""检查S3上文件是否存在"""
result = safe_exec(f"s3cmd info {s3_path}", check=False)
if DRY_RUN:
return False # 测试模式默认不存在
return result.returncode == 0
def get_s3_md5(s3_path):
"""获取S3文件的MD5值"""
result = safe_exec(f"s3cmd info {s3_path}", check=True)
if result:
for line in result.stdout.splitlines():
if "MD5 sum" in line:
return line.split(": ")[1].strip()
return "模拟S3 MD5" # 测试模式返回占位符
def get_file_timestamp(file_path):
"""获取文件最后修改时间戳(DRY_RUN模式返回模拟时间)"""
if DRY_RUN:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
return datetime.fromtimestamp(os.path.getmtime(file_path)).strftime("%Y-%m-%d %H:%M:%S")
def natural_sort_key(filename):
"""自然排序键生成函数"""
name_without_ext = os.path.splitext(filename)[0]
match = re.match(r'(.*?)(\d+)$', name_without_ext)
if match:
main_part = match.group(1)
number_part = int(match.group(2))
else:
main_part = name_without_ext
number_part = 0
return (main_part, number_part)
def get_reference_time(time_arg, parser):
"""获取参考时间:优先使用命令行参数,其次使用上次运行时间,否则使用纪元时间"""
if time_arg:
try:
return datetime.strptime(time_arg, "%Y-%m-%d %H:%M:%S")
except ValueError:
parser.error(f"无效的时间格式: '{time_arg}',正确格式应为: YYYY-MM-DD HH:MM:SS")
if os.path.exists(LAST_RUN_TIME_FILE):
try:
with open(LAST_RUN_TIME_FILE, 'r') as f:
time_str = f.read().strip()
return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
except (ValueError, IOError) as e:
print(f"⚠️ 读取上次运行时间失败: {e},将使用纪元时间作为参考")
return datetime.fromtimestamp(0)
def save_current_run_time():
"""保存当前运行时间到文件"""
if DRY_RUN:
print(f"[DRY RUN] 模拟保存当前时间到 {LAST_RUN_TIME_FILE}")
return
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
with open(LAST_RUN_TIME_FILE, 'w') as f:
f.write(current_time)
print(f"✅ 已保存当前运行时间: {current_time}")
except IOError as e:
print(f"⚠️ 保存运行时间失败: {e}")
def process_mp4(local_mp4):
"""处理MP4文件:判断是否跳过处理,跳过则仅生成列表链接"""
filename = os.path.basename(local_mp4)
s3_mp4_path = f"s3://{S3_BUCKET}/{S3_DIR}{filename}"
s3_mp4_url = f"http://{S3_BUCKET}/{S3_DIR}{filename}"
print(f"\n=== 处理MP4文件:{filename} ===")
# 检查是否需要跳过处理
if should_skip_process(filename):
print(f"[跳过处理] 符合跳过规则,仅生成列表链接")
else:
# 正常处理流程(仅非跳过文件执行)
print(f"[步骤1] 检查S3是否存在: {s3_mp4_path}")
s3_exists = s3_file_exists(s3_mp4_path)
s3_md5_check = False # 可改为True启用MD5校验
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp4)
s3_md5 = get_s3_md5(s3_mp4_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
if not s3_exists:
print("[步骤2] 开始上传到S3")
safe_exec(
f"s3cmd put --mime-type=\"video/mp4\" --acl-public --continue --check-md5 "
f"'{local_mp4}' '{s3_mp4_path}'"
)
# 无论是否跳过处理,都生成列表链接(核心需求)
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp4_url}\">{filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {LINK_FILE}")
print("[步骤3] 已添加链接到列表")
def process_mp3(local_mp4):
"""处理MP3文件:判断是否跳过处理,跳过则仅生成列表链接(若需要)"""
filename = os.path.basename(local_mp4)
filename_noext = os.path.splitext(filename)[0]
mp3_filename = f"{filename_noext}.mp3"
local_mp3_dir = os.path.join(LOCAL_DIR, MP3_LOCAL_SUBDIR)
local_mp3 = os.path.join(local_mp3_dir, mp3_filename)
s3_mp3_path = f"s3://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
s3_mp3_url = f"http://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
print(f"\n=== 处理MP3文件:{mp3_filename} ===")
# 检查是否需要跳过处理
if should_skip_process(filename):
print(f"[跳过处理] 符合跳过规则,仅生成列表链接(若需要)")
else:
# 正常处理流程(仅非跳过文件执行)
safe_exec(f"mkdir -p {local_mp3_dir}")
print(f"[步骤1] 检查本地MP3: {local_mp3}")
if not os.path.exists(local_mp3) or DRY_RUN:
if not DRY_RUN and not os.path.exists(local_mp3):
print("[生成] 本地不存在,使用ffmpeg提取音频")
safe_exec(
f"ffmpeg -y -i '{local_mp4}' -vn -acodec libmp3lame '{local_mp3}'"
)
else:
print("[DRY RUN] 模拟生成MP3文件")
print(f"[步骤2] 检查S3是否存在: {s3_mp3_path}")
s3_exists = s3_file_exists(s3_mp3_path)
s3_md5_check = False # 可改为True启用MD5校验
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp3)
s3_md5 = get_s3_md5(s3_mp3_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
if not s3_exists:
print("[步骤3] 开始上传到S3")
safe_exec(
f"s3cmd put --mime-type=\"audio/mpeg\" --acl-public --continue --check-md5 "
f"'{local_mp3}' '{s3_mp3_path}'"
)
# 无论是否跳过处理,都生成MP3列表链接(若需要)
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp3_url}\">{mp3_filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {MP3_LINK_FILE}")
print("[步骤4] 已添加链接到列表")
def main():
parser = argparse.ArgumentParser(
description='上传MP4文件到S3并生成MP3',
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
'--since',
help='只处理在此时间之后创建的文件,格式必须为:\n YYYY-MM-DD HH:MM:SS\n例如:\n --since "2023-10-01 08:30:00"'
)
args = parser.parse_args()
reference_time = get_reference_time(args.since, parser)
print(f"⏱️ 参考时间: {reference_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"📌 只处理此时间之后创建的文件")
print("========================================")
if DRY_RUN:
print("🔴 当前为DRY RUN测试模式,不会实际执行操作")
else:
print("🟢 当前为正式模式,将执行实际操作")
safe_exec(f"> {LINK_FILE}")
safe_exec(f"echo '<ol>' > {LINK_FILE}")
safe_exec(f"> {MP3_LINK_FILE}")
safe_exec(f"echo '<ol>' > {MP3_LINK_FILE}")
print(f"本地目录: {LOCAL_DIR}")
print(f"本地MP3目录: {os.path.join(LOCAL_DIR, MP3_LOCAL_SUBDIR)}")
print(f"S3 MP4目标: s3://{S3_BUCKET}/{S3_DIR}")
print(f"S3 MP3目标: s3://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}")
print("========================================")
# 获取并筛选MP4文件(所有符合时间条件的都加入列表,不在这里过滤)
mp4_files = []
for file in os.listdir(LOCAL_DIR):
if file.lower().endswith(".mp4"):
file_path = os.path.join(LOCAL_DIR, file)
create_time = datetime.fromtimestamp(os.path.getctime(file_path))
if create_time > reference_time:
mp4_files.append(file_path) # 全部加入列表,处理阶段再判断是否跳过
mp4_files.sort(key=lambda x: natural_sort_key(os.path.basename(x)))
if DRY_RUN:
print("\n📋 按处理顺序排列的MP4文件列表:")
for idx, file in enumerate(mp4_files, 1):
print(f" {idx}. {os.path.basename(file)}")
print(f"\n📊 共发现 {len(mp4_files)} 个符合条件的MP4文件")
# 处理每个MP4文件(包含跳过逻辑)
processed_count = 0
for mp4_file in mp4_files:
processed_count += 1
print(f"\n===== 开始处理第 {processed_count}/{len(mp4_files)} 个文件 =====")
process_mp4(mp4_file)
process_mp3(mp4_file)
print("----------------------------------------")
# 生成最终HTML文件
print("\n[生成HTML列表]")
safe_exec(f"echo '</ol>' >> {LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {VIDEO_FILE}")
safe_exec(f"cat {LINK_FILE} >> {VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {VIDEO_FILE}")
safe_exec(
f"s3cmd put --mime-type=\"text/html\" --acl-public --continue --check-md5 "
f"'{VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/video_list.html'"
)
safe_exec(f"echo '</ol>' >> {MP3_LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {MP3_VIDEO_FILE}")
safe_exec(f"cat {MP3_LINK_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(
f"s3cmd put --mime-type=\"text/html\" --acl-public --continue --check-md5 "
f"'{MP3_VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/audio_list.html'"
)
save_current_run_time()
print("\n🎉 所有文件处理完成!")
print(f"📊 处理统计:共发现 {len(mp4_files)} 个符合条件的MP4文件,实际处理 {processed_count} 个")
if DRY_RUN:
print("🔴 注意:测试模式未执行实际操作,请核对排序是否符合预期")
else:
print(f"🟢 MP4链接列表: {LINK_FILE}")
print(f"🟢 MP3链接列表: {MP3_LINK_FILE}")
if __name__ == "__main__":
main()
should_skip_process(file_name)函数,用于判断文件是否需要跳过处理(例如文件名包含 "skip" 时跳过)。您可以根据实际需求修改该函数的判断逻辑(如特定前缀、后缀等)。process_mp4和process_mp3函数中,先通过should_skip_process判断是否跳过:mp4_files时,仅根据时间筛选,不过滤任何符合时间条件的 MP4 文件,确保所有文件都会被加入列表。should_skip_process函数的规则(例如命名为skip_meeting.mp4)。process_mp4和process_mp3处理。具体修改如下:#!/usr/bin/env python3
import os
import subprocess
import re
import argparse
from datetime import datetime
# -------------------------- 1. 配置参数(必改) --------------------------
LOCAL_DIR = "/home/nick/.wine/drive_c/users/nick/Documents/wechat" # 本地视频文件夹
S3_BUCKET = "www.staroceans.org" # S3桶名
S3_DIR = "video/wechat/" # S3目标目录(末尾加/)
MP3_LOCAL_SUBDIR = "mp3" # 本地MP3子目录名
MP3_S3_SUBDIR = "mp3/" # S3上MP3子目录名(末尾加/)
LAST_RUN_TIME_FILE = "/home/nick/diabloforum/tools/data/last_run_time.txt" # 上次运行时间记录文件
# -------------------------- 2. 测试模式开关(选填) --------------------------
DRY_RUN = False # 测试时改True(只打印不执行),正式上传改False
# -------------------------- 3. 链接文件路径配置 --------------------------
LINK_FILE = "/home/nick/diabloforum/tools/data/s3_video_links.txt"
MP3_LINK_FILE = "/home/nick/diabloforum/tools/data/s3_audio_links.txt"
TOP_FILE = "/home/nick/diabloforum/tools/data/top.txt"
BOTTOM_FILE = "/home/nick/diabloforum/tools/data/bottom.txt"
VIDEO_FILE = "/home/nick/diabloforum/tools/data/video_list.html"
MP3_VIDEO_FILE = "/home/nick/diabloforum/tools/data/audio_list.html"
def safe_exec(cmd, shell=True, check=True):
"""安全执行命令,根据DRY_RUN决定是否实际执行"""
print(f"[{'DRY RUN' if DRY_RUN else '实际执行'}] {cmd}")
if not DRY_RUN:
result = subprocess.run(
cmd,
shell=shell,
check=check,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return result
return None
def get_file_md5(file_path):
"""获取文件MD5值"""
result = safe_exec(f"md5sum {file_path}", check=True)
if result:
return result.stdout.split()[0]
return "模拟MD5" # 测试模式返回占位符
def s3_file_exists(s3_path):
"""检查S3上文件是否存在"""
result = safe_exec(f"s3cmd info {s3_path}", check=False)
if DRY_RUN:
return False # 测试模式默认不存在
return result.returncode == 0
def get_s3_md5(s3_path):
"""获取S3文件的MD5值"""
result = safe_exec(f"s3cmd info {s3_path}", check=True)
if result:
for line in result.stdout.splitlines():
if "MD5 sum" in line:
return line.split(": ")[1].strip()
return "模拟S3 MD5" # 测试模式返回占位符
def get_file_timestamp(file_path):
"""获取文件最后修改时间戳(DRY_RUN模式返回模拟时间)"""
if DRY_RUN:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
return datetime.fromtimestamp(os.path.getmtime(file_path)).strftime("%Y-%m-%d %H:%M:%S")
def natural_sort_key(filename):
"""自然排序键生成函数"""
name_without_ext = os.path.splitext(filename)[0]
match = re.match(r'(.*?)(\d+)$', name_without_ext)
if match:
main_part = match.group(1)
number_part = int(match.group(2))
else:
main_part = name_without_ext
number_part = 0
return (main_part, number_part)
def get_reference_time(time_arg, parser):
"""获取参考时间:优先使用命令行参数,其次使用上次运行时间,否则使用纪元时间"""
if time_arg:
try:
return datetime.strptime(time_arg, "%Y-%m-%d %H:%M:%S")
except ValueError:
parser.error(f"无效的时间格式: '{time_arg}',正确格式应为: YYYY-MM-DD HH:MM:SS")
if os.path.exists(LAST_RUN_TIME_FILE):
try:
with open(LAST_RUN_TIME_FILE, 'r') as f:
time_str = f.read().strip()
return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
except (ValueError, IOError) as e:
print(f"⚠️ 读取上次运行时间失败: {e},将使用纪元时间作为参考")
return datetime.fromtimestamp(0)
def save_current_run_time():
"""保存当前运行时间到文件"""
if DRY_RUN:
print(f"[DRY RUN] 模拟保存当前时间到 {LAST_RUN_TIME_FILE}")
return
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
with open(LAST_RUN_TIME_FILE, 'w') as f:
f.write(current_time)
print(f"✅ 已保存当前运行时间: {current_time}")
except IOError as e:
print(f"⚠️ 保存运行时间失败: {e}")
def process_mp4(local_mp4):
"""处理MP4文件:检查S3存在性、上传、生成链接"""
filename = os.path.basename(local_mp4)
s3_mp4_path = f"s3://{S3_BUCKET}/{S3_DIR}{filename}"
s3_mp4_url = f"http://{S3_BUCKET}/{S3_DIR}{filename}"
print(f"\n=== 处理MP4文件:{filename} ===")
print(f"[步骤1] 检查S3是否存在: {s3_mp4_path}")
# 检查S3存在性
s3_exists = s3_file_exists(s3_mp4_path)
s3_md5_check = False # 可改为True启用MD5校验
# MD5校验逻辑
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp4)
s3_md5 = get_s3_md5(s3_mp4_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
# 上传文件
if not s3_exists:
print("[步骤2] 开始上传到S3")
safe_exec(
f"s3cmd put --mime-type=\"video/mp4\" --acl-public --continue --check-md5 "
f"'{local_mp4}' '{s3_mp4_path}'"
)
# 生成链接
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp4_url}\">{filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {LINK_FILE}")
print("[步骤3] 已添加链接到列表")
def process_mp3(local_mp4):
"""处理MP3文件:生成、检查S3存在性、上传、生成链接"""
filename = os.path.basename(local_mp4)
filename_noext = os.path.splitext(filename)[0]
mp3_filename = f"{filename_noext}.mp3"
# 路径计算
local_mp3_dir = os.path.join(LOCAL_DIR, MP3_LOCAL_SUBDIR)
local_mp3 = os.path.join(local_mp3_dir, mp3_filename)
s3_mp3_path = f"s3://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
s3_mp3_url = f"http://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
print(f"\n=== 处理MP3文件:{mp3_filename} ===")
# 确保本地MP3目录存在
safe_exec(f"mkdir -p {local_mp3_dir}")
# 检查并生成本地MP3
print(f"[步骤1] 检查本地MP3: {local_mp3}")
if not os.path.exists(local_mp3) or DRY_RUN:
if not DRY_RUN and not os.path.exists(local_mp3):
print("[生成] 本地不存在,使用ffmpeg提取音频")
safe_exec(
f"ffmpeg -y -i '{local_mp4}' -vn -acodec libmp3lame '{local_mp3}'"
)
else:
print("[DRY RUN] 模拟生成MP3文件")
# 检查S3存在性
print(f"[步骤2] 检查S3是否存在: {s3_mp3_path}")
s3_exists = s3_file_exists(s3_mp3_path)
s3_md5_check = False # 可改为True启用MD5校验
# MD5校验逻辑
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp3)
s3_md5 = get_s3_md5(s3_mp3_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
# 上传文件
if not s3_exists:
print("[步骤3] 开始上传到S3")
safe_exec(
f"s3cmd put --mime-type=\"audio/mpeg\" --acl-public --continue --check-md5 "
f"'{local_mp3}' '{s3_mp3_path}'"
)
# 生成链接
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp3_url}\">{mp3_filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {MP3_LINK_FILE}")
print("[步骤4] 已添加链接到列表")
def main():
# 解析命令行参数
parser = argparse.ArgumentParser(
description='上传MP4文件到S3并生成MP3',
formatter_class=argparse.RawTextHelpFormatter # 保留帮助信息的换行格式
)
parser.add_argument(
'--since',
help='只处理在此时间之后创建的文件,格式必须为:\n YYYY-MM-DD HH:MM:SS\n例如:\n --since "2023-10-01 08:30:00"'
)
args = parser.parse_args()
# 获取参考时间(传入parser用于错误处理)
reference_time = get_reference_time(args.since, parser)
print(f"⏱️ 参考时间: {reference_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"📌 只处理此时间之后创建的文件(但所有文件都会加入列表)")
# 初始化链接文件
print("========================================")
if DRY_RUN:
print("🔴 当前为DRY RUN测试模式,不会实际执行操作")
else:
print("🟢 当前为正式模式,将执行实际操作")
# 清空并初始化链接文件
safe_exec(f"> {LINK_FILE}")
safe_exec(f"echo '<ol>' > {LINK_FILE}")
safe_exec(f"> {MP3_LINK_FILE}")
safe_exec(f"echo '<ol>' > {MP3_LINK_FILE}")
print(f"本地目录: {LOCAL_DIR}")
print(f"本地MP3目录: {os.path.join(LOCAL_DIR, MP3_LOCAL_SUBDIR)}")
print(f"S3 MP4目标: s3://{S3_BUCKET}/{S3_DIR}")
print(f"S3 MP3目标: s3://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}")
print("========================================")
# 获取所有MP4文件(不按时间筛选,全部加入列表)
mp4_files = []
for file in os.listdir(LOCAL_DIR):
if file.lower().endswith(".mp4"):
file_path = os.path.join(LOCAL_DIR, file) # 保存绝对路径,用于后续获取创建时间
mp4_files.append(file_path)
# 使用自然排序
mp4_files.sort(key=lambda x: natural_sort_key(os.path.basename(x)))
# DRY_RUN模式下打印所有文件(调试用)
if DRY_RUN:
print("\n📋 按处理顺序排列的所有MP4文件列表:")
for idx, file in enumerate(mp4_files, 1):
print(f" {idx}. {os.path.basename(file)}")
print(f"\n📊 共发现 {len(mp4_files)} 个MP4文件")
# 处理每个MP4文件(仅对创建时间晚于参考时间的文件执行process_*)
processed_count = 0
skipped_count = 0
for mp4_file in mp4_files:
# 获取当前文件的创建时间(基于绝对路径)
create_time = datetime.fromtimestamp(os.path.getctime(mp4_file))
filename = os.path.basename(mp4_file)
# 检查是否需要处理
if create_time > reference_time:
processed_count += 1
print(f"\n===== 开始处理第 {processed_count}/{len(mp4_files)} 个文件({filename}) =====")
process_mp4(mp4_file)
process_mp3(mp4_file)
print("----------------------------------------")
else:
skipped_count += 1
print(f"\n===== 跳过文件({filename}):创建时间早于参考时间 =====")
# 即使跳过处理,也生成链接(如果需要所有文件都在列表中)
# 若不需要跳过的文件出现在列表,可删除以下3行
timestamp = get_file_timestamp(mp4_file)
s3_mp4_url = f"http://{S3_BUCKET}/{S3_DIR}{filename}"
safe_exec(f"echo '<li><a href=\"{s3_mp4_url}\">{filename}(已跳过处理)</a></li>' >> {LINK_FILE}")
# 生成最终HTML文件
print("\n[生成HTML列表]")
# 处理MP4 HTML
safe_exec(f"echo '</ol>' >> {LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {VIDEO_FILE}")
safe_exec(f"cat {LINK_FILE} >> {VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {VIDEO_FILE}")
safe_exec(
f"s3cmd put --mime-type=\"text/html\" --acl-public --continue --check-md5 "
f"'{VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/video_list.html'"
)
# 处理MP3 HTML(仅包含已处理的文件的MP3链接)
safe_exec(f"echo '</ol>' >> {MP3_LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {MP3_VIDEO_FILE}")
safe_exec(f"cat {MP3_LINK_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(
f"s3cmd put --mime-type=\"text/html\" --acl-public --continue --check-md5 "
f"'{MP3_VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/audio_list.html'"
)
# 保存当前运行时间
save_current_run_time()
print("\n🎉 所有文件处理完成!")
print(f"📊 处理统计:共 {len(mp4_files)} 个MP4文件,其中 {processed_count} 个符合时间条件并处理,{skipped_count} 个跳过处理")
if DRY_RUN:
print("🔴 注意:测试模式未执行实际操作,请核对排序是否符合预期")
else:
print(f"🟢 MP4链接列表: {LINK_FILE}")
print(f"🟢 MP3链接列表: {MP3_LINK_FILE}")
if __name__ == "__main__":
main()
if create_time > reference_time判断,所有.mp4文件都会被加入mp4_files列表(通过绝对路径存储,确保后续能获取创建时间)。mp4_file重新获取create_time(datetime.fromtimestamp(os.path.getctime(mp4_file))),仅对create_time > reference_time的文件执行process_mp4和process_mp3。skipped_count统计跳过的文件数量,输出更清晰的处理结果。#!/usr/bin/env python3
import os
import subprocess
import re
import argparse
from datetime import datetime
# -------------------------- 配置参数(保持不变) --------------------------
LOCAL_DIR = "/home/nick/.wine/drive_c/users/nick/Documents/wechat"
S3_BUCKET = "www.staroceans.org"
S3_DIR = "video/wechat/"
MP3_LOCAL_SUBDIR = "mp3"
MP3_S3_SUBDIR = "mp3/"
LAST_RUN_TIME_FILE = "/home/nick/diabloforum/tools/data/last_run_time.txt"
DRY_RUN = False
LINK_FILE = "/home/nick/diabloforum/tools/data/s3_video_links.txt"
MP3_LINK_FILE = "/home/nick/diabloforum/tools/data/s3_audio_links.txt"
TOP_FILE = "/home/nick/diabloforum/tools/data/top.txt"
BOTTOM_FILE = "/home/nick/diabloforum/tools/data/bottom.txt"
VIDEO_FILE = "/home/nick/diabloforum/tools/data/video_list.html"
MP3_VIDEO_FILE = "/home/nick/diabloforum/tools/data/audio_list.html"
# -------------------------- 工具函数(保持不变) --------------------------
def safe_exec(cmd, shell=True, check=True):
print(f"[{'DRY RUN' if DRY_RUN else '实际执行'}] {cmd}")
if not DRY_RUN:
return subprocess.run(cmd, shell=shell, check=check, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
return None
def get_file_md5(file_path):
result = safe_exec(f"md5sum {file_path}", check=True)
return result.stdout.split()[0] if result else "模拟MD5"
def s3_file_exists(s3_path):
result = safe_exec(f"s3cmd info {s3_path}", check=False)
return False if DRY_RUN else result.returncode == 0
def get_s3_md5(s3_path):
result = safe_exec(f"s3cmd info {s3_path}", check=True)
if result:
for line in result.stdout.splitlines():
if "MD5 sum" in line:
return line.split(": ")[1].strip()
return "模拟S3 MD5"
def get_file_timestamp(file_path):
if DRY_RUN:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
return datetime.fromtimestamp(os.path.getmtime(file_path)).strftime("%Y-%m-%d %H:%M:%S")
def natural_sort_key(filename):
name_without_ext = os.path.splitext(filename)[0]
match = re.match(r'(.*?)(\d+)$', name_without_ext)
if match:
return (match.group(1), int(match.group(2)))
return (name_without_ext, 0)
def get_reference_time(time_arg, parser):
if time_arg:
try:
return datetime.strptime(time_arg, "%Y-%m-%d %H:%M:%S")
except ValueError:
parser.error(f"无效的时间格式: '{time_arg}',正确格式应为: YYYY-MM-DD HH:MM:SS")
if os.path.exists(LAST_RUN_TIME_FILE):
try:
with open(LAST_RUN_TIME_FILE, 'r') as f:
return datetime.strptime(f.read().strip(), "%Y-%m-%d %H:%M:%S")
except (ValueError, IOError) as e:
print(f"⚠️ 读取上次运行时间失败: {e},将使用纪元时间作为参考")
return datetime.fromtimestamp(0)
def save_current_run_time():
if DRY_RUN:
print(f"[DRY RUN] 模拟保存当前时间到 {LAST_RUN_TIME_FILE}")
return
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
with open(LAST_RUN_TIME_FILE, 'w') as f:
f.write(current_time)
print(f"✅ 已保存当前运行时间: {current_time}")
except IOError as e:
print(f"⚠️ 保存运行时间失败: {e}")
# -------------------------- 处理函数(保持不变) --------------------------
def process_mp4(local_mp4):
filename = os.path.basename(local_mp4)
s3_mp4_path = f"s3://{S3_BUCKET}/{S3_DIR}{filename}"
s3_mp4_url = f"http://{S3_BUCKET}/{S3_DIR}{filename}"
print(f"\n=== 处理MP4文件:{filename} ===")
print(f"[步骤1] 检查S3是否存在: {s3_mp4_path}")
s3_exists = s3_file_exists(s3_mp4_path)
s3_md5_check = False
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp4)
s3_md5 = get_s3_md5(s3_mp4_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
if not s3_exists:
print("[步骤2] 开始上传到S3")
safe_exec(f"s3cmd put --mime-type=\"video/mp4\" --acl-public --continue --check-md5 '{local_mp4}' '{s3_mp4_path}'")
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp4_url}\">{filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {LINK_FILE}")
print("[步骤3] 已添加链接到列表")
def process_mp3(local_mp4):
filename = os.path.basename(local_mp4)
filename_noext = os.path.splitext(filename)[0]
mp3_filename = f"{filename_noext}.mp3"
local_mp3_dir = os.path.join(LOCAL_DIR, MP3_LOCAL_SUBDIR)
local_mp3 = os.path.join(local_mp3_dir, mp3_filename)
s3_mp3_path = f"s3://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
s3_mp3_url = f"http://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
print(f"\n=== 处理MP3文件:{mp3_filename} ===")
safe_exec(f"mkdir -p {local_mp3_dir}")
print(f"[步骤1] 检查本地MP3: {local_mp3}")
if not os.path.exists(local_mp3) or DRY_RUN:
if not DRY_RUN and not os.path.exists(local_mp3):
print("[生成] 本地不存在,使用ffmpeg提取音频")
safe_exec(f"ffmpeg -y -i '{local_mp4}' -vn -acodec libmp3lame '{local_mp3}'")
else:
print("[DRY RUN] 模拟生成MP3文件")
print(f"[步骤2] 检查S3是否存在: {s3_mp3_path}")
s3_exists = s3_file_exists(s3_mp3_path)
s3_md5_check = False
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp3)
s3_md5 = get_s3_md5(s3_mp3_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
if not s3_exists:
print("[步骤3] 开始上传到S3")
safe_exec(f"s3cmd put --mime-type=\"audio/mpeg\" --acl-public --continue --check-md5 '{local_mp3}' '{s3_mp3_path}'")
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp3_url}\">{mp3_filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {MP3_LINK_FILE}")
print("[步骤4] 已添加链接到列表")
# -------------------------- 主函数(仅修改文件筛选和处理判断) --------------------------
def main():
parser = argparse.ArgumentParser(
description='上传MP4文件到S3并生成MP3',
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
'--since',
help='只处理在此时间之后创建的文件,格式必须为:\n YYYY-MM-DD HH:MM:SS\n例如:\n --since "2023-10-01 08:30:00"'
)
args = parser.parse_args()
reference_time = get_reference_time(args.since, parser)
print(f"⏱️ 参考时间: {reference_time.strftime('%Y-%m-%d %H:%M:%S')}")
print("========================================")
if DRY_RUN:
print("🔴 当前为DRY RUN测试模式")
else:
print("🟢 当前为正式模式")
safe_exec(f"> {LINK_FILE}")
safe_exec(f"echo '<ol>' > {LINK_FILE}")
safe_exec(f"> {MP3_LINK_FILE}")
safe_exec(f"echo '<ol>' > {MP3_LINK_FILE}")
# 1. 收集所有MP4文件(不筛选时间)
mp4_files = []
for file in os.listdir(LOCAL_DIR):
if file.lower().endswith(".mp4"):
mp4_files.append(os.path.join(LOCAL_DIR, file)) # 存绝对路径
mp4_files.sort(key=lambda x: natural_sort_key(os.path.basename(x)))
# 2. 处理文件(仅对新文件执行process_mp4/process_mp3)
processed_count = 0
for mp4_file in mp4_files:
filename = os.path.basename(mp4_file)
create_time = datetime.fromtimestamp(os.path.getctime(mp4_file)) # 用绝对路径取创建时间
# 核心修改:仅此处判断时间
if create_time > reference_time:
processed_count += 1
print(f"\n===== 处理第 {processed_count}/{len(mp4_files)} 个文件:{filename} =====")
process_mp4(mp4_file) # 完整保留MP3处理
process_mp3(mp4_file)
else:
print(f"\n===== 跳过文件:{filename}(创建时间早于参考时间) =====")
# 如需将跳过的文件加入列表,取消下面注释
# timestamp = get_file_timestamp(mp4_file)
# safe_exec(f"echo '<li>{filename}(已跳过)</li>' >> {LINK_FILE}")
# 生成HTML(保持不变)
print("\n[生成HTML列表]")
safe_exec(f"echo '</ol>' >> {LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {VIDEO_FILE}")
safe_exec(f"cat {LINK_FILE} >> {VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {VIDEO_FILE}")
safe_exec(f"s3cmd put --mime-type=\"text/html\" --acl-public '{VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/video_list.html'")
safe_exec(f"echo '</ol>' >> {MP3_LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {MP3_VIDEO_FILE}")
safe_exec(f"cat {MP3_LINK_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(f"s3cmd put --mime-type=\"text/html\" --acl-public '{MP3_VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/audio_list.html'")
save_current_run_time()
print(f"\n🎉 完成:共{len(mp4_files)}个文件,处理了{processed_count}个")
if __name__ == "__main__":
main()
mp4_files包含所有 MP4(通过绝对路径存储)if create_time > reference_time才执行process_mp4和process_mp3process_mp4和process_mp3中添加skip_uploading参数,仅跳过上传步骤但保留列表生成,核心修改如下:#!/usr/bin/env python3
import os
import subprocess
import re
import argparse
from datetime import datetime
# -------------------------- 配置参数(不变) --------------------------
LOCAL_DIR = "/home/nick/.wine/drive_c/users/nick/Documents/wechat"
S3_BUCKET = "www.staroceans.org"
S3_DIR = "video/wechat/"
MP3_LOCAL_SUBDIR = "mp3"
MP3_S3_SUBDIR = "mp3/"
LAST_RUN_TIME_FILE = "/home/nick/diabloforum/tools/data/last_run_time.txt"
DRY_RUN = False
LINK_FILE = "/home/nick/diabloforum/tools/data/s3_video_links.txt"
MP3_LINK_FILE = "/home/nick/diabloforum/tools/data/s3_audio_links.txt"
TOP_FILE = "/home/nick/diabloforum/tools/data/top.txt"
BOTTOM_FILE = "/home/nick/diabloforum/tools/data/bottom.txt"
VIDEO_FILE = "/home/nick/diabloforum/tools/data/video_list.html"
MP3_VIDEO_FILE = "/home/nick/diabloforum/tools/data/audio_list.html"
# -------------------------- 工具函数(不变) --------------------------
def safe_exec(cmd, shell=True, check=True):
print(f"[{'DRY RUN' if DRY_RUN else '实际执行'}] {cmd}")
if not DRY_RUN:
return subprocess.run(cmd, shell=shell, check=check, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
return None
def get_file_md5(file_path):
result = safe_exec(f"md5sum {file_path}", check=True)
return result.stdout.split()[0] if result else "模拟MD5"
def s3_file_exists(s3_path):
result = safe_exec(f"s3cmd info {s3_path}", check=False)
return False if DRY_RUN else result.returncode == 0
def get_s3_md5(s3_path):
result = safe_exec(f"s3cmd info {s3_path}", check=True)
if result:
for line in result.stdout.splitlines():
if "MD5 sum" in line:
return line.split(": ")[1].strip()
return "模拟S3 MD5"
def get_file_timestamp(file_path):
if DRY_RUN:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
return datetime.fromtimestamp(os.path.getmtime(file_path)).strftime("%Y-%m-%d %H:%M:%S")
def natural_sort_key(filename):
name_without_ext = os.path.splitext(filename)[0]
match = re.match(r'(.*?)(\d+)$', name_without_ext)
return (match.group(1), int(match.group(2))) if match else (name_without_ext, 0)
def get_reference_time(time_arg, parser):
if time_arg:
try:
return datetime.strptime(time_arg, "%Y-%m-%d %H:%M:%S")
except ValueError:
parser.error(f"无效的时间格式: '{time_arg}',正确格式应为: YYYY-MM-DD HH:MM:SS")
if os.path.exists(LAST_RUN_TIME_FILE):
try:
with open(LAST_RUN_TIME_FILE, 'r') as f:
return datetime.strptime(f.read().strip(), "%Y-%m-%d %H:%M:%S")
except (ValueError, IOError) as e:
print(f"⚠️ 读取上次运行时间失败: {e},将使用纪元时间作为参考")
return datetime.fromtimestamp(0)
def save_current_run_time():
if DRY_RUN:
print(f"[DRY RUN] 模拟保存当前时间到 {LAST_RUN_TIME_FILE}")
return
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
with open(LAST_RUN_TIME_FILE, 'w') as f:
f.write(current_time)
print(f"✅ 已保存当前运行时间: {current_time}")
except IOError as e:
print(f"⚠️ 保存运行时间失败: {e}")
# -------------------------- 处理函数(添加skip_uploading参数) --------------------------
def process_mp4(local_mp4, skip_uploading):
"""处理MP4文件:skip_uploading=True时仅生成列表,跳过上传"""
filename = os.path.basename(local_mp4)
s3_mp4_path = f"s3://{S3_BUCKET}/{S3_DIR}{filename}"
s3_mp4_url = f"http://{S3_BUCKET}/{S3_DIR}{filename}"
print(f"\n=== 处理MP4文件:{filename} ===")
# 仅当不跳过上传时,执行检查和上传逻辑
if not skip_uploading:
print(f"[步骤1] 检查S3是否存在: {s3_mp4_path}")
s3_exists = s3_file_exists(s3_mp4_path)
s3_md5_check = False
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp4)
s3_md5 = get_s3_md5(s3_mp4_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
if not s3_exists:
print("[步骤2] 开始上传到S3")
safe_exec(f"s3cmd put --mime-type=\"video/mp4\" --acl-public --continue --check-md5 '{local_mp4}' '{s3_mp4_path}'")
else:
print("[跳过上传] 仅生成列表链接")
# 无论是否跳过上传,均生成列表链接(核心需求)
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp4_url}\">{filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {LINK_FILE}")
print("[步骤3] 已添加链接到列表")
def process_mp3(local_mp4, skip_uploading):
"""处理MP3文件:skip_uploading=True时仅生成列表,跳过转码和上传"""
filename = os.path.basename(local_mp4)
filename_noext = os.path.splitext(filename)[0]
mp3_filename = f"{filename_noext}.mp3"
local_mp3_dir = os.path.join(LOCAL_DIR, MP3_LOCAL_SUBDIR)
local_mp3 = os.path.join(local_mp3_dir, mp3_filename)
s3_mp3_path = f"s3://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
s3_mp3_url = f"http://{S3_BUCKET}/{S3_DIR}{MP3_S3_SUBDIR}{mp3_filename}"
print(f"\n=== 处理MP3文件:{mp3_filename} ===")
# 仅当不跳过上传时,执行转码和上传逻辑
if not skip_uploading:
safe_exec(f"mkdir -p {local_mp3_dir}")
print(f"[步骤1] 检查本地MP3: {local_mp3}")
if not os.path.exists(local_mp3) or DRY_RUN:
if not DRY_RUN and not os.path.exists(local_mp3):
print("[生成] 本地不存在,使用ffmpeg提取音频")
safe_exec(f"ffmpeg -y -i '{local_mp4}' -vn -acodec libmp3lame '{local_mp3}'")
else:
print("[DRY RUN] 模拟生成MP3文件")
print(f"[步骤2] 检查S3是否存在: {s3_mp3_path}")
s3_exists = s3_file_exists(s3_mp3_path)
s3_md5_check = False
if s3_exists and s3_md5_check:
local_md5 = get_file_md5(local_mp3)
s3_md5 = get_s3_md5(s3_mp3_path)
print(f"[对比] 本地MD5: {local_md5} | S3 MD5: {s3_md5}")
if local_md5 != s3_md5:
print("[结论] MD5不一致,需要重新上传")
s3_exists = False
else:
print("[结论] MD5一致,跳过上传")
if not s3_exists:
print("[步骤3] 开始上传到S3")
safe_exec(f"s3cmd put --mime-type=\"audio/mpeg\" --acl-public --continue --check-md5 '{local_mp3}' '{s3_mp3_path}'")
else:
print("[跳过上传] 仅生成列表链接")
# 无论是否跳过上传,均生成列表链接(核心需求)
timestamp = get_file_timestamp(local_mp4)
link_line = f"<li><a href=\"{s3_mp3_url}\">{mp3_filename} ({timestamp})</a></li>"
safe_exec(f"echo '{link_line}' >> {MP3_LINK_FILE}")
print("[步骤4] 已添加链接到列表")
# -------------------------- 主函数(控制skip_uploading参数) --------------------------
def main():
parser = argparse.ArgumentParser(
description='上传MP4文件到S3并生成MP3',
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
'--since',
help='只处理在此时间之后创建的文件,格式必须为:\n YYYY-MM-DD HH:MM:SS\n例如:\n --since "2023-10-01 08:30:00"'
)
args = parser.parse_args()
reference_time = get_reference_time(args.since, parser)
print(f"⏱️ 参考时间: {reference_time.strftime('%Y-%m-%d %H:%M:%S')}")
print("========================================")
if DRY_RUN:
print("🔴 当前为DRY RUN测试模式")
else:
print("🟢 当前为正式模式")
# 清空并初始化链接文件(确保所有文件重新生成列表)
safe_exec(f"> {LINK_FILE}")
safe_exec(f"echo '<ol>' > {LINK_FILE}")
safe_exec(f"> {MP3_LINK_FILE}")
safe_exec(f"echo '<ol>' > {MP3_LINK_FILE}")
# 1. 收集所有MP4文件(不筛选时间,全部加入列表)
mp4_files = []
for file in os.listdir(LOCAL_DIR):
if file.lower().endswith(".mp4"):
mp4_files.append(os.path.join(LOCAL_DIR, file)) # 存储绝对路径
mp4_files.sort(key=lambda x: natural_sort_key(os.path.basename(x)))
# 2. 处理所有文件:根据时间决定是否跳过上传
for mp4_file in mp4_files:
filename = os.path.basename(mp4_file)
create_time = datetime.fromtimestamp(os.path.getctime(mp4_file)) # 用绝对路径取创建时间
# 核心逻辑:创建时间早于参考时间则跳过上传(但仍生成列表)
skip_uploading = create_time <= reference_time
print(f"\n===== 处理文件:{filename}({'跳过上传' if skip_uploading else '正常处理'}) =====")
process_mp4(mp4_file, skip_uploading) # 传入跳过参数
process_mp3(mp4_file, skip_uploading) # 传入跳过参数
# 生成最终HTML(所有文件均在列表中)
print("\n[生成HTML列表]")
safe_exec(f"echo '</ol>' >> {LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {VIDEO_FILE}")
safe_exec(f"cat {LINK_FILE} >> {VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {VIDEO_FILE}")
safe_exec(f"s3cmd put --mime-type=\"text/html\" --acl-public '{VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/video_list.html'")
safe_exec(f"echo '</ol>' >> {MP3_LINK_FILE}")
safe_exec(f"cat {TOP_FILE} > {MP3_VIDEO_FILE}")
safe_exec(f"cat {MP3_LINK_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(f"cat {BOTTOM_FILE} >> {MP3_VIDEO_FILE}")
safe_exec(f"s3cmd put --mime-type=\"text/html\" --acl-public '{MP3_VIDEO_FILE}' 's3://{S3_BUCKET}/tools/data/audio_list.html'")
save_current_run_time()
print(f"\n🎉 所有文件处理完成,均已加入列表")
if __name__ == "__main__":
main()
process_mp4和process_mp3均添加skip_uploading布尔参数skip_uploading=True时,跳过 S3 上传、MP3 转码等操作,但保留列表链接生成create_time,若<= reference_time则skip_uploading=Trueskip_uploading参数,实现 “全部入列表,仅新文件执行上传”