保存时间:2026/4/8 08:25:40
我的火山语音旧版应用 Access Token 已泄露公开,请求:1)立即手动禁用 / 回收该 Token;2)重置该 APP ID 下所有鉴权信息;3)确认该 APP ID 下所有服务已永久停用,防止被滥用扣费。
根账号(只用来登录控制台、付费、管理IAM)
↓
IAM 用户(1个或多个,专门给程序用)
↓ 绑定权限策略(如:语音全权限、模型只读)
↓ 限制在某个 Project(如:AI_Service_Project)
↓
Project 里放:语音应用、大模型应用、各种服务
ai_service_user 总费用 = 总识别秒数 ÷ 3600 × 单价(元/小时)
所需凭证:Access Key ID / Secret Access Key(IAM 层)+ AppKey(语音应用里)。
安装依赖:pip install volcengine websockets pyaudio
import asyncio
import websockets
import pyaudio
import json
from volcengine.volc_auth import VolcAuth
# 你填自己的
AK = "你的AK"
SK = "你的SK"
APP_KEY = "你的AppKey"
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 1024
auth = VolcAuth(AK, SK)
token = auth.get_access_token()
async def asr_stream():
uri = f"wss://openspeech.bytedance.com/api/v3/asr/stream?appkey={APP_KEY}&token={token}"
async with websockets.connect(uri) as websocket:
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
# 发送开始帧
await websocket.send(json.dumps({
"type": "start",
"data": {
"format": "pcm",
"sample_rate": 16000,
"language": "zh-CN",
"show_utterances": True
}
}))
print("🎤 说话吧(Ctrl+C 退出)...")
try:
while True:
data = stream.read(CHUNK)
await websocket.send(data)
# 接收结果
try:
res = await asyncio.wait_for(websocket.recv(), timeout=0.1)
res_json = json.loads(res)
if res_json.get("type") == "result":
text = res_json["data"]["text"]
print(f"识别:{text}", end="\r")
# 一句话结束
if res_json["data"].get("end"):
print(f"\n✅ 最终:{text}\n")
except asyncio.TimeoutError:
continue
except KeyboardInterrupt:
print("\n结束")
stream.stop_stream()
stream.close()
p.terminate()
asyncio.run(asr_stream())
旧版(V3)端点:wss://openspeech.bytedance.com/api/v3/asr/stream,鉴权用 AppKey + Token(VolcAuth 用 AK/SK 算出来)。
新版(V4)端点:wss://openspeech.bytedance.com/api/v4/asr/stream,AccessToken 形如 20250406abcdef123456。
import asyncio
import websockets
import pyaudio
import json
import uuid
# 你只需要填这两个!
APP_ID = "你的AppID(X-Api-App-Key)"
ACCESS_TOKEN = "你的AccessToken(X-Api-Access-Key)"
# 固定配置
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 1024
async def asr_stream():
uri = "wss://openspeech.bytedance.com/api/v4/asr/stream"
headers = {
"X-Api-App-Key": APP_ID,
"X-Api-,
"X-Api-Resource-Id": "volc.seedasr.sauc.duration", # 小时版
"X-Api-Connect-Id": str(uuid.uuid4())
}
async with websockets.connect(uri, extra_headers=headers) as websocket:
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
# 发送开始参数
await websocket.send(json.dumps({
"type": "start",
"data": {
"format": "pcm",
"sample_rate": 16000,
"language": "zh-CN",
"show_utterances": True,
"model": "seed-asr-2.0"
}
}))
print("🎤 说话(Ctrl+C 退出)...")
try:
while True:
data = stream.read(CHUNK)
await websocket.send(data)
try:
res = await asyncio.wait_for(websocket.recv(), timeout=0.1)
res_json = json.loads(res)
if res_json.get("type") == "result":
text = res_json["data"]["text"]
print(f"识别:{text}", end="\r")
if res_json["data"].get("end"):
print(f"\n✅ 最终:{text}\n")
except asyncio.TimeoutError:
continue
except KeyboardInterrupt:
print("\n结束")
stream.stop_stream()
stream.close()
p.terminate()
asyncio.run(asr_stream())
X-Api-Resource-Id 取值:volc.seedasr.sauc.duration(小时版)或 volc.seedasr.sauc.concurrent(并发版)。

| 维度 | V2/V3(旧版) | V4(新版) |
|---|---|---|
| 控制台入口 | speech.volcengine.com(子控制台) | console.volcengine.com(主控制台) |
| 核心模型 | 传统流式 ASR | 豆包-Seed-ASR 大模型 |
| 密钥 | AppKey + Token(需 AK/SK 计算) | AppID(X-Api-App-Key) + AccessToken(直接复制) |
| 计费 | 按时长,秒级累计 | 同 V2/V3,按时长,秒级累计 |
| 体验 | 识别一般,延迟较高 | 更准、更快,首包 P95 < 350ms |
import json
import time
import uuid
import requests
import base64
import argparse
from pathlib import Path
from typing import Optional
# ===================== 配置项(可根据需要调整) =====================
API_APP_ID = "this is my api key" # 替换为你的实际API Key
API_RECOGNIZE_URL = "https://openspeech.bytedance.com/api/v3/auc/bigmodel/recognize/flash"
API_TIMEOUT = 1200 # 全局超时时间(秒),适配长音频处理
# ====================================================================
def download_file(file_url: str) -> bytes:
"""下载远程音频文件(二进制格式)"""
try:
response = requests.get(
file_url,
timeout=API_TIMEOUT,
headers={"User-Agent": "ASR-Client/1.0"}
)
response.raise_for_status() # 非200状态码抛出异常
return response.content
except requests.exceptions.RequestException as e:
raise Exception(f"文件下载失败:{str(e)}") from e
def file_to_base64(file_path: str) -> str:
"""将本地文件转换为Base64编码字符串"""
try:
file_path_obj = Path(file_path).resolve()
if not file_path_obj.exists():
raise FileNotFoundError(f"文件不存在:{file_path}")
if file_path_obj.stat().st_size == 0:
raise ValueError(f"文件为空:{file_path}")
with open(file_path_obj, 'rb') as f:
base64_data = base64.b64encode(f.read()).decode('utf-8')
return base64_data
except Exception as e:
raise Exception(f"文件转Base64失败:{str(e)}") from e
def format_asr_result(text: str) -> str:
"""按句号换行格式化ASR结果,提升可读性"""
if not text:
return ""
# 按句号分割后换行,保留句号
sentences = [s.strip() + "。" for s in text.split("。") if s.strip()]
return "\n".join(sentences)
def recognize_task(file_url: Optional[str] = None, file_path: Optional[str] = None) -> dict:
"""执行ASR识别任务,返回原始响应数据"""
# 构建请求头
headers = {
"X-,
"X-Api-Resource-Id": "volc.bigasr.auc_turbo",
"X-Api-Request-Id": str(uuid.uuid4()),
"X-Api-Sequence": "-1",
"Content-Type": "application/json; charset=utf-8"
}
# 准备音频数据(URL/本地文件二选一)
audio_data = {}
if file_url:
audio_data = {"url": file_url}
elif file_path:
audio_data = {"data": file_to_base64(file_path)}
else:
raise ValueError("必须提供 file_url 或 file_path 其中一个参数")
# 构建请求体
request_body = {
"user": {"uid": API_APP_ID},
"audio": audio_data,
"request": {
"model_name": "bigmodel",
"show_utterances": False,
# 可根据需要开启以下参数
# "enable_itn": True,
# "enable_punc": True,
# "enable_ddc": True,
# "enable_speaker_info": False,
}
}
# 发送请求(带全局超时)
try:
response = requests.post(
API_RECOGNIZE_URL,
json=request_body,
headers=headers,
timeout=API_TIMEOUT
)
response.raise_for_status() # HTTP状态码非200抛出异常
except requests.exceptions.RequestException as e:
raise Exception(f"ASR请求失败:{str(e)}") from e
# 解析响应头
status_code = response.headers.get("X-Api-Status-Code", "")
log_id = response.headers.get("X-Tt-Logid", "")
api_message = response.headers.get("X-Api-Message", "")
# 打印响应日志
print(f"[{time.asctime()}] 响应状态码:{status_code}")
print(f"[{time.asctime()}] 响应信息:{api_message}")
print(f"[{time.asctime()}] 请求LogID:{log_id}")
print(f"[{time.asctime()}] 原始响应内容:{json.dumps(response.json(), ensure_ascii=False, indent=2)}")
# 校验核心状态
if status_code not in ["20000000", "20000001", "20000002"]:
raise Exception(f"ASR任务失败:状态码={status_code},LogID={log_id},信息={api_message}")
return {
"status_code": status_code,
"log_id": log_id,
"response_data": response.json()
}
def recognize_mode(file_url: Optional[str] = None, file_path: Optional[str] = None, output_path: Optional[str] = None) -> None:
"""
主识别流程
:param file_url: 远程音频文件URL
:param file_path: 本地音频文件路径
:param output_path: 识别结果输出文件路径
"""
start_time = time.time()
print(f"[{time.asctime()}] 开始ASR识别任务...")
try:
# 执行识别任务
result = recognize_task(file_url=file_url, file_path=file_path)
# 处理成功结果
if result["status_code"] == "20000000":
# 提取识别文本(根据实际响应结构调整,需确认JSON字段)
asr_text = ""
response_data = result["response_data"]
# 适配常见的ASR响应结构,需根据实际返回调整字段
if "result" in response_data and "text" in response_data["result"]:
asr_text = response_data["result"]["text"]
elif "text" in response_data:
asr_text = response_data["text"]
# 格式化文本(按句号换行)
formatted_text = format_asr_result(asr_text)
# 保存结果到文件
if output_path:
output_path_obj = Path(output_path).resolve()
output_path_obj.parent.mkdir(parents=True, exist_ok=True) # 确保输出目录存在
with open(output_path_obj, "w", encoding="utf-8") as f:
f.write(formatted_text)
print(f"[{time.asctime()}] 识别结果已保存至:{output_path_obj}")
# 打印结果和耗时
print(f"\n[{time.asctime()}] 识别成功!")
print(f"[{time.asctime()}] 格式化结果:\n{formatted_text}")
print(f"[{time.asctime()}] 总耗时:{time.time() - start_time:.2f} 秒")
# 处理任务未完成状态(根据API文档补充逻辑)
elif result["status_code"] in ["20000001", "20000002"]:
print(f"[{time.asctime()}] 任务未完成:状态码={result['status_code']},LogID={result['log_id']}")
except Exception as e:
print(f"[{time.asctime()}] 识别失败:{str(e)}")
raise # 抛出异常让上层处理(适配batch_asr.py的失败终止逻辑)
def main():
"""命令行入口(适配batch_asr.py的调用规范)"""
parser = argparse.ArgumentParser(description="ASR WebSocket Client (Support MP3 directly)")
parser.add_argument("--file", type=str, required=True, help="Audio file path (MP3/WAV supported)")
parser.add_argument("--output", type=str, default=None, help="Output text file path (auto-generated if not set)")
args = parser.parse_args()
# 自动生成输出路径(如果未指定)
output_path = args.output
if not output_path:
file_path = Path(args.file)
output_path = file_path.parent / f"{file_path.stem}_asr.txt"
# 执行识别
recognize_mode(file_path=args.file, output_path=output_path)
if __name__ == '__main__':
main()
命令行支持 --file(必填)和 --output(可选)参数;未指定 --output 时自动生成输出路径(原文件同目录 + _asr.txt 后缀)。
API_TIMEOUT = 1200 秒(20 分钟),覆盖下载与识别请求。
format_asr_result 函数按句号分割文本并换行,例如:
原文本:今天天气很好。我要去公园散步。
格式化后:
今天天气很好。
我要去公园散步。
使用前请将 API_APP_ID 替换为你的实际 API Key。

# 指定输出路径
python3 doubao_save.py --file /home/nick/Documents/wechat/mp3/2084.mp3 --output /home/nick/Documents/wechat/text/2084.txt
# 自动生成输出路径
python3 doubao_save.py --file /home/nick/Documents/wechat/mp3/2084.mp3
后续待办:确认 recognize_mode 中提取文本的字段(代码中已标注);在 main 函数中新增 --url 参数(当前仅支持本地文件,适配批量脚本);按需调整 API_TIMEOUT 常量;用 batch_asr.py --dry-run 验证命令是否正确。