保存时间:2026/4/2 11:36:50
说明:先把 APP_ID、TOKEN、CLUSTER(一般是 default)改成你火山控制台的信息。
import websockets
import asyncio
import json
import pyaudio
import base64
# ========== Replace with your Volcengine console credentials ==========
APP_ID = "你的APP_ID"
TOKEN = "你的TOKEN"
CLUSTER = "default"
# Audio parameters (fixed values required by the service)
FORMAT = pyaudio.paInt16  # 16-bit signed PCM
CHANNELS = 1              # mono
RATE = 16000              # 16 kHz sample rate
CHUNK = 1024              # frames per microphone read
def build_header():
    """Build the JSON session-configuration dict sent right after connecting.

    Pulls the credentials and audio parameters from the module constants.
    """
    config = {
        "appid": APP_ID,
        "token": TOKEN,
        "cluster": CLUSTER,
        "rate": RATE,
        "format": "pcm",
        "show_utterances": True,
    }
    return config
async def asr_stream():
    """Stream microphone audio to the Volcengine v1 ASR endpoint and print results.

    NOTE(review): this loop sends one chunk and then blocks on recv() for
    every chunk, so microphone capture stalls while waiting for the server —
    confirm whether the v1 endpoint really answers once per chunk.
    """
    uri = "wss://openspeech.bytedance.com/api/v1/asr/stream"
    headers = build_header()
    p = pyaudio.PyAudio()
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK
    )
    print("开始说话,实时转文字...(按 Ctrl+C 停止)")
    async with websockets.connect(uri) as websocket:
        # Send the session configuration first.
        await websocket.send(json.dumps(headers))
        while True:
            # Read one chunk from the microphone.
            data = stream.read(CHUNK)
            b64 = base64.b64encode(data).decode()
            await websocket.send(json.dumps({"audio": b64}))
            # Block for one recognition result.
            resp = await websocket.recv()
            j = json.loads(resp)
            if "result" in j and j["result"]:
                print("转文字:", j["result"])
if __name__ == "__main__":
    # Entry point: run the streaming ASR coroutine until interrupted.
    asyncio.run(asr_stream())
# 安装依赖:pip install websockets pyaudio
说明:v3 接口改用 X-Api-App-Key/X-Api-Access-Key/X-Api-Resource-Id/X-Api-Connect-Id 这 4 个鉴权 Header(SecretKey 相关的鉴权包含在 AccessKey 里),协议也从 JSON 文本改成了二进制协议,接口地址为 wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async。下面是匹配 v3 接口规范、可直接运行的完整代码:
import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
from typing import Optional
# ========== Replace with your Volcengine console credentials ==========
APP_KEY = "你的X-Api-App-Key"        # console value for the X-Api-App-Key header
ACCESS_KEY = "你的X-Api-Access-Key"  # console value for the X-Api-Access-Key header
# Billing resource id (was missing, causing a NameError later on):
# hourly plan = volc.bigasr.sauc.duration, concurrency plan = volc.bigasr.sauc.concurrent
RESOURCE_ID = "volc.bigasr.sauc.duration"
# ========== Audio parameters (fixed, required by the API docs) ==========
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = int(RATE * 0.2)  # 200 ms per packet (recommended optimal size)
AUDIO_FORMAT = "pcm"
AUDIO_CODEC = "raw"
class ASRBinaryProtocol:
    """Binary framing for the Volcengine v3 ASR websocket protocol.

    Client frames: 4-byte header + big-endian u32 payload size + payload.
    Server frames (msg_type 9) carry an extra big-endian u32 sequence.
    """

    @staticmethod
    def build_header(
        msg_type: int,
        serialization: int = 1,  # 1 = JSON, 0 = no serialization
        compression: int = 1,    # 1 = gzip, 0 = none
        flags: int = 0
    ) -> bytes:
        """Pack the fixed 4-byte frame header.

        :param msg_type: 1 = full client request, 2 = audio-only request,
                         9 = server response
        :param serialization: payload serialization nibble (1 = JSON)
        :param compression: payload compression nibble (1 = gzip)
        :param flags: message-type flag nibble
        """
        version, header_size = 1, 1  # protocol v1; header size unit = 4 bytes
        return bytes((
            (version << 4) | header_size,        # byte 0: version | size
            (msg_type << 4) | flags,             # byte 1: type | flags
            (serialization << 4) | compression,  # byte 2: serde | compression
            0,                                   # byte 3: reserved
        ))

    @staticmethod
    def pack_message(
        header: bytes,
        payload: bytes,
        compression: int = 1
    ) -> bytes:
        """Frame a payload: header + big-endian u32 size + (gzipped) payload."""
        body = gzip.compress(payload) if compression == 1 else payload
        return b"".join((header, struct.pack('>I', len(body)), body))

    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, Optional[bytes]]:
        """Parse a server frame into (header-field dict, decompressed payload).

        NOTE(review): for msg_type == 9 the payload size is read from bytes
        4-8 and the sequence from bytes 8-12, while the sender side in this
        file emits sequence *before* size — confirm the field order against
        the official frame spec.
        """
        if len(data) < 8:
            raise ValueError("数据长度不足,无法解析")
        b0, b1, b2, b3 = data[:4]
        header_info = {
            "version": (b0 >> 4) & 0x0F,
            "header_size": b0 & 0x0F,
            "msg_type": (b1 >> 4) & 0x0F,
            "flags": b1 & 0x0F,
            "serialization": (b2 >> 4) & 0x0F,
            "compression": b2 & 0x0F,
            "reserved": b3
        }
        payload_size = struct.unpack('>I', data[4:8])[0]
        if header_info["msg_type"] == 9:
            # Server responses carry an extra big-endian u32 sequence.
            if len(data) < 12:
                raise ValueError("Server response 缺少sequence字段")
            header_info["sequence"] = struct.unpack('>I', data[8:12])[0]
            offset = 12
        else:
            offset = 8
        payload = data[offset:offset + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        return header_info, payload
async def asr_client():
    """Stream microphone audio to the Volcengine v3 ASR service in real time.

    Sends one full-client-request configuration frame, then runs two
    concurrent tasks: one pushing 200 ms audio frames, one printing the
    recognition text from server responses.

    Fix: the X-Api-Access-Key entry of the handshake headers was a
    corrupted/unterminated string literal (a SyntaxError); restored it.
    """
    # 1. Microphone capture.
    p = pyaudio.PyAudio()
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK
    )
    # 2. Websocket handshake headers — the four mandatory auth headers.
    connect_id = str(uuid.uuid4())  # unique X-Api-Connect-Id per connection
    headers = {
        "X-Api-App-Key": APP_KEY,
        "X-Api-Access-Key": ACCESS_KEY,
        "X-Api-Resource-Id": RESOURCE_ID,
        "X-Api-Connect-Id": connect_id
    }
    # 3. Connect to the bidirectional streaming endpoint.
    uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
    print(f"连接ASR服务: {uri}")
    print(f"Connect ID: {connect_id}")
    print("开始说话,实时转文字...(按Ctrl+C停止)")
    try:
        async with websockets.connect(uri, extra_headers=headers) as websocket:
            # 4. First frame: full client request carrying the session config.
            full_request = {
                "user": {
                    "uid": connect_id  # reuse connect_id as the user id
                },
                "audio": {
                    "format": AUDIO_FORMAT,
                    "codec": AUDIO_CODEC,
                    "rate": RATE,
                    "bits": 16,
                    "channel": CHANNELS,
                    "language": "zh-CN"  # Mandarin Chinese
                },
                "request": {
                    "model_name": "bigmodel",
                    "enable_itn": True,        # inverse text normalization
                    "enable_punc": True,       # punctuation
                    "show_utterances": True,   # per-utterance details
                    "end_window_size": 800,    # 800 ms silence = end of speech
                    "result_type": "full"      # full transcript each response
                }
            }
            full_request_payload = json.dumps(full_request).encode('utf-8')
            full_request_header = ASRBinaryProtocol.build_header(msg_type=1)  # full client request
            full_request_msg = ASRBinaryProtocol.pack_message(
                header=full_request_header,
                payload=full_request_payload
            )
            await websocket.send(full_request_msg)
            # 5. Producer: read the microphone and push audio-only frames.
            sequence = 1  # running audio frame number

            async def send_audio():
                nonlocal sequence
                while True:
                    audio_data = stream.read(CHUNK, exception_on_overflow=False)
                    if not audio_data:
                        break
                    # Audio-only request: no serialization, positive-seq flag.
                    audio_header = ASRBinaryProtocol.build_header(
                        msg_type=2,
                        serialization=0,
                        flags=1
                    )
                    # Frame = header + u32 sequence + u32 size + raw PCM.
                    sequence_bytes = struct.pack('>I', sequence)
                    audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
                    await websocket.send(audio_msg)
                    sequence += 1
                    await asyncio.sleep(0.05)  # pace the uploads

            # 6. Consumer: decode server frames and print the transcript.
            async def receive_response():
                while True:
                    try:
                        data = await websocket.recv()
                        if not data:
                            continue
                        header_info, payload = ASRBinaryProtocol.unpack_message(data)
                        # Only full server responses (msg_type 9) carry results.
                        if header_info["msg_type"] == 9 and payload:
                            try:
                                response = json.loads(payload.decode('utf-8'))
                                if "result" in response and response["result"]:
                                    text = response["result"].get("text", "")
                                    if text:
                                        print(f"\r识别结果: {text}", end="", flush=True)
                                # Per-utterance details available via
                                # response["result"].get("utterances", []).
                            except json.JSONDecodeError:
                                continue
                    except Exception as e:
                        print(f"\n响应解析错误: {e}")
                        continue

            # 7. Run producer and consumer concurrently.
            send_task = asyncio.create_task(send_audio())
            receive_task = asyncio.create_task(receive_response())
            # Wait for both tasks (stop with Ctrl+C).
            await asyncio.gather(send_task, receive_task)
    except Exception as e:
        print(f"\n连接/运行错误: {e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()
if __name__ == "__main__":
    # Run the realtime client; Ctrl+C exits cleanly.
    try:
        asyncio.run(asr_client())
    except KeyboardInterrupt:
        print("\n\n程序已停止")
安装依赖:pip install websockets pyaudio(asyncio 是标准库,无需安装)
把 APP_KEY、ACCESS_KEY 替换成你从火山引擎控制台获取的值:APP_KEY = 控制台的 APP ID(对应文档里的 X-Api-App-Key),ACCESS_KEY = 控制台的 Access Key;RESOURCE_ID 按计费方式选择:
- 小时版:
volc.bigasr.sauc.duration
- 并发版:
volc.bigasr.sauc.concurrent
python asr_v3_demo.py
ASRBinaryProtocol 类:完全按照文档封装了二进制协议的 Header 构建、消息打包/解包逻辑,匹配文档里的 4 字节 Header 格式和大端序要求;通过 extra_headers 传入 X-Api-App-Key 等 4 个必填鉴权字段(使用 X-Api-App-Key/X-Api-Access-Key 等鉴权 Header,而非旧版的 token),且使用二进制协议传输数据。Ubuntu 安装依赖:
sudo apt update
sudo apt install -y portaudio19-dev python3-pip
pip3 install pyaudio websockets
extra_headers 参数报错 → 因为你的 websockets 版本太低(旧版本不支持这个参数)# 先升级websockets到最新版
pip3 install --upgrade websockets
# 验证版本(需要>=10.0)
python3 -c "import websockets; print(websockets.__version__)"
下面是将 extra_headers 改为旧版本兼容写法的代码:
import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import os
import warnings
# Silence ALSA audio warnings.
warnings.filterwarnings('ignore')
os.environ['PYAUDIO_LOOPBACK'] = '1'
# ========== Replace with your Volcengine credentials ==========
APP_KEY = "你的X-Api-App-Key"
ACCESS_KEY = "你的X-Api-Access-Key"
# Billing resource id (was missing, causing a NameError where it is used):
# hourly plan = volc.bigasr.sauc.duration, concurrency plan = volc.bigasr.sauc.concurrent
RESOURCE_ID = "volc.bigasr.sauc.duration"
# ========== Audio parameters ==========
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = int(RATE * 0.2)  # 200 ms per packet
AUDIO_FORMAT = "pcm"
AUDIO_CODEC = "raw"
class ASRBinaryProtocol:
    """Binary framing for the Volcengine v3 ASR websocket protocol.

    Client frames: 4-byte header + big-endian u32 payload size + payload.
    Server frames (msg_type 9) carry an extra big-endian u32 sequence.
    """

    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        """Pack the 4-byte header: version|size, type|flags, serde|compression, reserved."""
        return bytes((
            (1 << 4) | 1,                        # version 1, header size 1 (= 4 bytes)
            (msg_type << 4) | flags,
            (serialization << 4) | compression,
            0,                                   # reserved byte
        ))

    @staticmethod
    def pack_message(header: bytes, payload: bytes, compression: int = 1) -> bytes:
        """Frame a payload: header + u32 length + (gzip-compressed) payload."""
        body = gzip.compress(payload) if compression == 1 else payload
        return b"".join((header, struct.pack('>I', len(body)), body))

    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        """Parse a frame into (header-field dict, decompressed payload).

        NOTE(review): for msg_type == 9 the payload size is read from bytes
        4-8 and the sequence from bytes 8-12, while the sender in this file
        emits sequence before size — confirm against the official spec.
        """
        if len(data) < 8:
            raise ValueError("数据长度不足")
        b0, b1, b2, b3 = data[:4]
        header_info = {
            "version": (b0 >> 4) & 0x0F,
            "header_size": b0 & 0x0F,
            "msg_type": (b1 >> 4) & 0x0F,
            "flags": b1 & 0x0F,
            "serialization": (b2 >> 4) & 0x0F,
            "compression": b2 & 0x0F,
            "reserved": b3
        }
        payload_size = struct.unpack('>I', data[4:8])[0]
        if header_info["msg_type"] == 9:
            if len(data) < 12:
                raise ValueError("缺少sequence字段")
            header_info["sequence"] = struct.unpack('>I', data[8:12])[0]
            start = 12
        else:
            start = 8
        payload = data[start:start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        return header_info, payload
async def asr_client():
    """Ubuntu-friendly realtime ASR client for the Volcengine v3 endpoint.

    Fixes: the original connect helper referenced `inspect` before it was
    imported (NameError) and contained a broken raw-socket fallback path;
    replaced with a try/except that covers both handshake-header keyword
    spellings used by old and new `websockets` releases. The websocket is
    now also closed in the cleanup path.
    """
    # Audio capture.
    p = pyaudio.PyAudio()
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
        input_device_index=None  # auto-pick the default microphone
    )
    # Auth headers in the (name, value) byte-pair form old websockets accept.
    connect_id = str(uuid.uuid4())
    header_list = [
        (b"X-Api-App-Key", APP_KEY.encode('utf-8')),
        (b"X-Api-Access-Key", ACCESS_KEY.encode('utf-8')),
        (b"X-Api-Resource-Id", RESOURCE_ID.encode('utf-8')),
        (b"X-Api-Connect-Id", connect_id.encode('utf-8'))
    ]
    uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
    print(f"连接ASR服务: {uri}")
    print(f"Connect ID: {connect_id}")
    print("开始说话,实时转文字...(按Ctrl+C停止)")
    websocket = None
    try:
        # websockets < 14 spells the handshake-header kwarg `extra_headers`;
        # >= 14 renamed it to `additional_headers`. Try both.
        try:
            websocket = await websockets.connect(uri, extra_headers=header_list)
        except TypeError:
            websocket = await websockets.connect(uri, additional_headers=header_list)
        # Session configuration frame (full client request).
        full_request = {
            "user": {"uid": connect_id},
            "audio": {
                "format": AUDIO_FORMAT,
                "codec": AUDIO_CODEC,
                "rate": RATE,
                "bits": 16,
                "channel": CHANNELS,
                "language": "zh-CN"
            },
            "request": {
                "model_name": "bigmodel",
                "enable_itn": True,
                "enable_punc": True,
                "show_utterances": True,
                "end_window_size": 800
            }
        }
        full_request_payload = json.dumps(full_request).encode('utf-8')
        full_request_header = ASRBinaryProtocol.build_header(msg_type=1)
        full_request_msg = ASRBinaryProtocol.pack_message(full_request_header, full_request_payload)
        await websocket.send(full_request_msg)
        # Producer: push audio-only frames with a running sequence number.
        sequence = 1

        async def send_audio():
            nonlocal sequence
            while True:
                audio_data = stream.read(CHUNK, exception_on_overflow=False)
                if not audio_data:
                    break
                audio_header = ASRBinaryProtocol.build_header(msg_type=2, serialization=0, flags=1)
                # Frame = header + u32 sequence + u32 size + raw PCM.
                sequence_bytes = struct.pack('>I', sequence)
                audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
                await websocket.send(audio_msg)
                sequence += 1
                await asyncio.sleep(0.05)  # pace the uploads

        # Consumer: print recognition text from server responses.
        async def receive_response():
            while True:
                try:
                    data = await websocket.recv()
                    if not data:
                        continue
                    header_info, payload = ASRBinaryProtocol.unpack_message(data)
                    if header_info["msg_type"] == 9 and payload:
                        try:
                            response = json.loads(payload.decode('utf-8'))
                            if "result" in response and response["result"]:
                                text = response["result"].get("text", "")
                                if text:
                                    print(f"\r识别结果: {text}", end="", flush=True)
                        except json.JSONDecodeError:
                            continue
                except Exception:
                    continue

        # Run producer and consumer concurrently.
        send_task = asyncio.create_task(send_audio())
        receive_task = asyncio.create_task(receive_response())
        await asyncio.gather(send_task, receive_task)
    except Exception as e:
        print(f"\n错误详情: {e}")
    finally:
        if websocket is not None:
            await websocket.close()
        stream.stop_stream()
        stream.close()
        p.terminate()
if __name__ == "__main__":
    # asyncio entry point compatible with Python 3.7+.
    try:
        asyncio.run(asr_client())
    except KeyboardInterrupt:
        print("\n\n程序已停止")
    except RuntimeError as e:
        # Fallback when asyncio.run cannot create a fresh event loop.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asr_client())
        loop.close()
# 列出所有音频输入设备
python3 -c "import pyaudio; p = pyaudio.PyAudio(); [print(f'设备{i}: {p.get_device_info_by_index(i)[\"name\"]}') for i in range(p.get_device_count()) if p.get_device_info_by_index(i)['maxInputChannels']>0]"
# 根据输出,修改代码里的input_device_index(比如设备ID是1)
# stream = p.open(..., input_device_index=1, ...)
websockets 并替换 extra_headers 为兼容写法,解决参数报错问题。websockets.client 报错的最终版代码:| 设备ID | 设备名称 | 适用场景 |
|---|---|---|
| 4 | HDA Intel PCH: ALC256 Analog (hw:1,0) | 电脑内置麦克风(优先选这个) |
| 10 | pulse | 系统音频服务(备选) |
| 11 | default | 系统默认(兜底) |
设置 input_device_index = 4(优先用硬件直连的内置麦克风),并去掉 websockets.client 这个错误引用(16.0 版本不需要加 .client):
import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import os
import sys
# ========== 1. Core config (must be changed!) ==========
# Volcengine auth info
APP_KEY = "你的X-Api-App-Key"
ACCESS_KEY = "你的X-Api-Access-Key"
# Billing resource id (was missing, causing a NameError where it is used):
# hourly plan = volc.bigasr.sauc.duration, concurrency plan = volc.bigasr.sauc.concurrent
RESOURCE_ID = "volc.bigasr.sauc.duration"
# Audio input device id (4 per the device listing)
INPUT_DEVICE_INDEX = 4
# ========== 2. Silence noisy ALSA warnings ==========
os.environ['ALSA_CONFIG_PATH'] = '/dev/null'
os.environ['PYTHONWARNINGS'] = 'ignore'
# NOTE(review): discarding stderr for the whole process hides *all* errors,
# not just ALSA chatter — consider scoping this.
sys.stderr = open(os.devnull, 'w')
# Fix: the original passed the saved stdout *object* to open(), which raises
# TypeError; restoring the saved stream is what was intended.
sys.stdout = sys.__stdout__
# ========== 3. Audio parameters (fixed) ==========
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = int(RATE * 0.2)  # 200 ms per packet (recommended by the docs)
AUDIO_FORMAT = "pcm"
AUDIO_CODEC = "raw"
class ASRBinaryProtocol:
    """Binary framing for the Volcengine v3 ASR websocket protocol.

    Client frames: 4-byte header + big-endian u32 payload size + payload.
    Server frames (msg_type 9) carry an extra big-endian u32 sequence.
    """

    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        """Pack the 4-byte header: version|size, type|flags, serde|compression, reserved."""
        return bytes((
            (1 << 4) | 1,                        # version 1, header size 1 (= 4 bytes)
            (msg_type << 4) | flags,
            (serialization << 4) | compression,
            0,                                   # reserved byte
        ))

    @staticmethod
    def pack_message(header: bytes, payload: bytes, compression: int = 1) -> bytes:
        """Frame a payload: header + u32 length + (gzip-compressed) payload."""
        body = gzip.compress(payload) if compression == 1 else payload
        return b"".join((header, struct.pack('>I', len(body)), body))

    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        """Parse a frame into (header-field dict, decompressed payload).

        NOTE(review): for msg_type == 9 the payload size is read from bytes
        4-8 and the sequence from bytes 8-12, while the sender in this file
        emits sequence before size — confirm against the official spec.
        """
        if len(data) < 8:
            raise ValueError("数据长度不足")
        b0, b1, b2, b3 = data[:4]
        header_info = {
            "version": (b0 >> 4) & 0x0F,
            "header_size": b0 & 0x0F,
            "msg_type": (b1 >> 4) & 0x0F,
            "flags": b1 & 0x0F,
            "serialization": (b2 >> 4) & 0x0F,
            "compression": b2 & 0x0F,
            "reserved": b3
        }
        payload_size = struct.unpack('>I', data[4:8])[0]
        if header_info["msg_type"] == 9:
            if len(data) < 12:
                raise ValueError("缺少sequence字段")
            header_info["sequence"] = struct.unpack('>I', data[8:12])[0]
            start = 12
        else:
            start = 8
        payload = data[start:start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        return header_info, payload
async def asr_client():
    """Ubuntu + websockets 16 client with an explicitly chosen input device.

    Fixes: the X-Api-Access-Key header line was a corrupted/unterminated
    string literal (SyntaxError); `exception_on_overflow` is not a valid
    PyAudio `open()` argument — it belongs to `Stream.read()` and has been
    moved there.
    """
    # 1. Audio capture on the configured device.
    p = pyaudio.PyAudio()
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
        input_device_index=INPUT_DEVICE_INDEX  # use the configured mic id
    )
    # 2. Auth headers (recent websockets accept a plain dict).
    connect_id = str(uuid.uuid4())
    headers = {
        "X-Api-App-Key": APP_KEY,
        "X-Api-Access-Key": ACCESS_KEY,
        "X-Api-Resource-Id": RESOURCE_ID,
        "X-Api-Connect-Id": connect_id
    }
    # 3. Connect (plain websockets.connect, no .client submodule).
    uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
    print("="*50)
    print(f"连接ASR服务: {uri}")
    print(f"Connect ID: {connect_id}")
    print(f"使用麦克风设备ID: {INPUT_DEVICE_INDEX}")
    print("="*50)
    print("开始说话,实时转文字...(按Ctrl+C停止)\n")
    try:
        async with websockets.connect(uri, extra_headers=headers) as websocket:
            # 4. Session configuration frame.
            full_request = {
                "user": {"uid": connect_id},
                "audio": {
                    "format": AUDIO_FORMAT,
                    "codec": AUDIO_CODEC,
                    "rate": RATE,
                    "bits": 16,
                    "channel": CHANNELS,
                    "language": "zh-CN"
                },
                "request": {
                    "model_name": "bigmodel",
                    "enable_itn": True,
                    "enable_punc": True,
                    "show_utterances": True,
                    "end_window_size": 800
                }
            }
            full_request_payload = json.dumps(full_request).encode('utf-8')
            full_request_header = ASRBinaryProtocol.build_header(msg_type=1)
            full_request_msg = ASRBinaryProtocol.pack_message(full_request_header, full_request_payload)
            await websocket.send(full_request_msg)
            # 5. Producer: push audio-only frames.
            sequence = 1

            async def send_audio():
                nonlocal sequence
                while True:
                    try:
                        # exception_on_overflow belongs on read(), not open().
                        audio_data = stream.read(CHUNK, exception_on_overflow=False)
                        if not audio_data:
                            break
                        # Frame = header + u32 sequence + u32 size + raw PCM.
                        audio_header = ASRBinaryProtocol.build_header(msg_type=2, serialization=0, flags=1)
                        sequence_bytes = struct.pack('>I', sequence)
                        audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
                        await websocket.send(audio_msg)
                        sequence += 1
                        await asyncio.sleep(0.05)
                    except Exception as e:
                        print(f"\n音频发送错误: {e}")
                        continue

            # 6. Consumer: decode and print recognition results.
            async def receive_response():
                while True:
                    try:
                        data = await websocket.recv()
                        if not data:
                            continue
                        header_info, payload = ASRBinaryProtocol.unpack_message(data)
                        # Only full server responses (msg_type 9) carry results.
                        if header_info["msg_type"] == 9 and payload:
                            try:
                                response = json.loads(payload.decode('utf-8'))
                                if "result" in response and response["result"]:
                                    text = response["result"].get("text", "")
                                    if text:
                                        print(f"\r实时识别: {text}", end="", flush=True)
                            except json.JSONDecodeError:
                                continue
                    except Exception:
                        continue

            # 7. Run both tasks concurrently.
            send_task = asyncio.create_task(send_audio())
            receive_task = asyncio.create_task(receive_response())
            await asyncio.gather(send_task, receive_task)
    except KeyboardInterrupt:
        print("\n\n程序正常停止")
    except Exception as e:
        print(f"\n\n运行错误: {e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()
if __name__ == "__main__":
    # asyncio entry point compatible with Python 3.7+.
    try:
        asyncio.run(asr_client())
    except RuntimeError:
        # Fallback when asyncio.run cannot create a fresh event loop.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asr_client())
        loop.close()
APP_KEY、ACCESS_KEY 改成你自己的火山引擎信息(这两个是必须改的!)python3 ./asr3.py
4(代码里已经填好,不用改)。websockets.client 引用,适配16.0版本的正确写法。OSError: Invalid sample rate → 你的麦克风设备不支持16000Hz,需要先检测设备支持的采样率,再适配import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
# ========== 1. Volcengine auth info (must be changed!) ==========
APP_KEY = "你的X-Api-App-Key"
ACCESS_KEY = "你的X-Api-Access-Key"
# Billing resource id (was missing, causing a NameError where it is used):
# hourly plan = volc.bigasr.sauc.duration, concurrency plan = volc.bigasr.sauc.concurrent
RESOURCE_ID = "volc.bigasr.sauc.duration"
# ========== 2. Audio device config (auto-adapted) ==========
INPUT_DEVICE_INDEX = 4  # your microphone device id
FORMAT = pyaudio.paInt16
CHANNELS = 1
CHUNK_DURATION = 0.2  # 200 ms per packet (recommended by the docs)
# ========== 3. Globals (auto-detected) ==========
SUPPORTED_RATE = 16000  # default; device probing may pick another rate
p = pyaudio.PyAudio()
def detect_device_sample_rates(device_id: int) -> list:
    """Probe which of the common sample rates *device_id* can capture at.

    Opens and immediately closes a short-lived input stream per candidate
    rate; rates that open successfully are returned in ascending order.

    Fix: the bare `except:` would also swallow KeyboardInterrupt/SystemExit
    during probing — narrowed to `except Exception`.
    """
    supported_rates = []
    test_rates = [8000, 16000, 22050, 44100, 48000]
    for rate in test_rates:
        try:
            # Opening the stream *is* the probe; failure means unsupported.
            stream = p.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=rate,
                input=True,
                input_device_index=device_id,
                frames_per_buffer=1024
            )
            stream.close()
            supported_rates.append(rate)
            print(f"✅ 设备{device_id}支持采样率: {rate}Hz")
        except Exception:
            continue
    return supported_rates
class ASRBinaryProtocol:
    """Binary framing for the Volcengine v3 ASR websocket protocol.

    Client frames: 4-byte header + big-endian u32 payload size + payload.
    Server frames (msg_type 9) carry an extra big-endian u32 sequence.
    """

    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        """Pack the 4-byte header: version|size, type|flags, serde|compression, reserved."""
        return bytes((
            (1 << 4) | 1,                        # version 1, header size 1 (= 4 bytes)
            (msg_type << 4) | flags,
            (serialization << 4) | compression,
            0,                                   # reserved byte
        ))

    @staticmethod
    def pack_message(header: bytes, payload: bytes, compression: int = 1) -> bytes:
        """Frame a payload: header + u32 length + (gzip-compressed) payload."""
        body = gzip.compress(payload) if compression == 1 else payload
        return b"".join((header, struct.pack('>I', len(body)), body))

    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        """Parse a frame into (header-field dict, decompressed payload).

        NOTE(review): for msg_type == 9 the payload size is read from bytes
        4-8 and the sequence from bytes 8-12, while the sender in this file
        emits sequence before size — confirm against the official spec.
        """
        if len(data) < 8:
            raise ValueError("数据长度不足")
        b0, b1, b2, b3 = data[:4]
        header_info = {
            "version": (b0 >> 4) & 0x0F,
            "header_size": b0 & 0x0F,
            "msg_type": (b1 >> 4) & 0x0F,
            "flags": b1 & 0x0F,
            "serialization": (b2 >> 4) & 0x0F,
            "compression": b2 & 0x0F,
            "reserved": b3
        }
        payload_size = struct.unpack('>I', data[4:8])[0]
        if header_info["msg_type"] == 9:
            if len(data) < 12:
                raise ValueError("缺少sequence字段")
            header_info["sequence"] = struct.unpack('>I', data[8:12])[0]
            start = 12
        else:
            start = 8
        payload = data[start:start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        return header_info, payload
async def asr_client():
    """Auto-adapting realtime ASR client with device/sample-rate probing.

    Fixes: INPUT_DEVICE_INDEX is reassigned inside this function, so it must
    be declared global alongside SUPPORTED_RATE (the original raised
    UnboundLocalError); the first fallback message was missing its f-string
    prefix; the X-Api-Access-Key header line was a corrupted string literal;
    `exception_on_overflow` moved from `p.open()` (invalid kwarg) to
    `stream.read()`.
    """
    global SUPPORTED_RATE, INPUT_DEVICE_INDEX
    # 1. Probe the configured audio device.
    print("="*50)
    print("🔍 检测音频设备支持的采样率...")
    supported_rates = detect_device_sample_rates(INPUT_DEVICE_INDEX)
    if not supported_rates:
        # f-prefix was missing here, so the device id never interpolated.
        print(f"❌ 设备{INPUT_DEVICE_INDEX}无可用采样率,尝试设备10(pulse)...")
        INPUT_DEVICE_INDEX = 10
        supported_rates = detect_device_sample_rates(INPUT_DEVICE_INDEX)
        if not supported_rates:
            print("❌ 无可用音频设备!")
            return
    # Prefer 16 kHz (what the service expects), else the first usable rate.
    SUPPORTED_RATE = 16000 if 16000 in supported_rates else supported_rates[0]
    CHUNK = int(SUPPORTED_RATE * CHUNK_DURATION)
    print(f"✅ 最终使用采样率: {SUPPORTED_RATE}Hz, 每包大小: {CHUNK}")
    # 2. Open the capture stream.
    try:
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SUPPORTED_RATE,
            input=True,
            input_device_index=INPUT_DEVICE_INDEX,
            frames_per_buffer=CHUNK
        )
        print(f"✅ 音频设备{INPUT_DEVICE_INDEX}初始化成功")
    except Exception as e:
        print(f"❌ 音频设备初始化失败: {e}")
        return
    # 3. Auth headers.
    connect_id = str(uuid.uuid4())
    headers = {
        "X-Api-App-Key": APP_KEY,
        "X-Api-Access-Key": ACCESS_KEY,
        "X-Api-Resource-Id": RESOURCE_ID,
        "X-Api-Connect-Id": connect_id
    }
    # 4. Connect to the ASR service.
    uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
    print("="*50)
    print(f"🔌 连接ASR服务: {uri}")
    print(f"🆔 Connect ID: {connect_id}")
    print("="*50)
    print("🎤 开始说话,实时转文字...(按Ctrl+C停止)\n")
    try:
        async with websockets.connect(uri, extra_headers=headers) as websocket:
            # 5. Session configuration frame (uses the detected rate).
            full_request = {
                "user": {"uid": connect_id},
                "audio": {
                    "format": "pcm",
                    "codec": "raw",
                    "rate": SUPPORTED_RATE,  # detected sample rate
                    "bits": 16,
                    "channel": CHANNELS,
                    "language": "zh-CN"
                },
                "request": {
                    "model_name": "bigmodel",
                    "enable_itn": True,
                    "enable_punc": True,
                    "show_utterances": True,
                    "end_window_size": 800
                }
            }
            full_request_payload = json.dumps(full_request).encode('utf-8')
            full_request_header = ASRBinaryProtocol.build_header(msg_type=1)
            full_request_msg = ASRBinaryProtocol.pack_message(full_request_header, full_request_payload)
            await websocket.send(full_request_msg)
            print("✅ 配置包发送成功")
            # 6. Producer: push audio-only frames.
            sequence = 1

            async def send_audio():
                nonlocal sequence
                while True:
                    try:
                        audio_data = stream.read(CHUNK, exception_on_overflow=False)
                        if not audio_data:
                            continue
                        # Frame = header + u32 sequence + u32 size + raw PCM.
                        audio_header = ASRBinaryProtocol.build_header(msg_type=2, serialization=0, flags=1)
                        sequence_bytes = struct.pack('>I', sequence)
                        audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
                        await websocket.send(audio_msg)
                        sequence += 1
                        await asyncio.sleep(0.05)
                    except Exception as e:
                        print(f"\n⚠️ 音频发送错误: {e}")
                        continue

            # 7. Consumer: print recognition results.
            async def receive_response():
                while True:
                    try:
                        data = await websocket.recv()
                        if not data:
                            continue
                        header_info, payload = ASRBinaryProtocol.unpack_message(data)
                        if header_info["msg_type"] == 9 and payload:
                            try:
                                response = json.loads(payload.decode('utf-8'))
                                if "result" in response and response["result"]:
                                    text = response["result"].get("text", "")
                                    if text:
                                        sys.stdout.write(f"\r📝 实时识别: {text}")
                                        sys.stdout.flush()
                            except json.JSONDecodeError:
                                continue
                    except Exception as e:
                        print(f"\n⚠️ 响应解析错误: {e}")
                        continue

            # 8. Run both tasks concurrently.
            send_task = asyncio.create_task(send_audio())
            receive_task = asyncio.create_task(receive_response())
            await asyncio.gather(send_task, receive_task)
    except KeyboardInterrupt:
        print("\n\n🛑 程序正常停止")
    except websockets.exceptions.InvalidStatusCode as e:
        print(f"\n❌ 鉴权失败/连接错误: {e}")
        print("   请检查APP_KEY/ACCESS_KEY是否正确!")
    except Exception as e:
        print(f"\n❌ 运行错误: {type(e).__name__}: {e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()
        print("\n✅ 音频设备已关闭")
if __name__ == "__main__":
    try:
        asyncio.run(asr_client())
    except RuntimeError:
        # Fallback when asyncio.run cannot create a fresh event loop.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asr_client())
        loop.close()
    finally:
        # Make sure PortAudio is released even on failure.
        p.terminate()
detect_device_sample_rates 函数会测试 8000/16000/22050/44100/48000,自动选设备支持的采样率;鉴权失败时会抛出 InvalidStatusCode,提示检查 APP_KEY/ACCESS_KEY。把 APP_KEY 和 ACCESS_KEY 改成你从火山引擎控制台获取的真实值(这两个参数错了会鉴权失败),运行:python3 ./asr3.py
日志含义:「✅ 音频设备4初始化成功」表示音频没问题;「❌ 鉴权失败」需检查 APP_KEY/ACCESS_KEY;「📝 实时识别: xxx」表示成功识别。若仍出现 Invalid sample rate 错误,见下:
import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
# ========== 1. Core config (must be changed!) ==========
# Volcengine auth info
APP_KEY = "你的X-Api-App-Key"
ACCESS_KEY = "你的X-Api-Access-Key"
# Billing resource id (was missing, causing a NameError where it is used):
# hourly plan = volc.bigasr.sauc.duration, concurrency plan = volc.bigasr.sauc.concurrent
RESOURCE_ID = "volc.bigasr.sauc.duration"
# Audio device config (module globals to avoid scoping bugs)
INPUT_DEVICE_INDEX = 4  # your microphone device id
FORMAT = pyaudio.paInt16
CHANNELS = 1
CHUNK_DURATION = 0.2  # 200 ms per packet (recommended by the docs)
SUPPORTED_RATE = 16000  # default sample rate
# ========== 2. PyAudio (global instance) ==========
p = pyaudio.PyAudio()
def detect_device_sample_rates(device_id: int) -> list:
    """Return the subset of common sample rates *device_id* can record at.

    A rate is considered supported when a short-lived input stream can be
    opened at that rate on the given device.
    """
    candidate_rates = [8000, 16000, 22050, 44100, 48000]
    usable = []
    for candidate in candidate_rates:
        try:
            # Opening the stream is the probe; failure means unsupported.
            probe = p.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=candidate,
                input=True,
                input_device_index=device_id,
                frames_per_buffer=1024
            )
            probe.close()
            usable.append(candidate)
            print(f"✅ 设备{device_id}支持采样率: {candidate}Hz")
        except Exception:
            continue
    return usable
def init_audio_device() -> tuple:
    """Pick a usable input device and sample rate; return (rate, chunk size).

    Probes INPUT_DEVICE_INDEX first, then falls back to device 10 (pulse)
    and device 11 (default). Exits the process when nothing can record.
    Mutates the module globals SUPPORTED_RATE and INPUT_DEVICE_INDEX.
    """
    global SUPPORTED_RATE, INPUT_DEVICE_INDEX
    # Silence ALSA warnings (keep only the relevant output).
    import os
    import warnings
    warnings.filterwarnings('ignore')
    os.environ['ALSA_ERRORS'] = '0'
    print("="*50)
    print("🔍 检测音频设备支持的采样率...")
    # Probe the configured device first.
    supported_rates = detect_device_sample_rates(INPUT_DEVICE_INDEX)
    # Fall back to device 10 (pulse) when the configured one fails.
    if not supported_rates:
        print(f"❌ 设备{INPUT_DEVICE_INDEX}不可用,尝试设备10(pulse)...")
        INPUT_DEVICE_INDEX = 10
        supported_rates = detect_device_sample_rates(INPUT_DEVICE_INDEX)
    # Last resort: device 11 (default).
    if not supported_rates:
        print(f"❌ 设备{INPUT_DEVICE_INDEX}不可用,尝试设备11(default)...")
        INPUT_DEVICE_INDEX = 11
        supported_rates = detect_device_sample_rates(INPUT_DEVICE_INDEX)
    # Give up when no device can record at all.
    if not supported_rates:
        print("❌ 无可用音频输入设备!")
        sys.exit(1)
    # Prefer 16 kHz (what the ASR service expects), else the first usable rate.
    SUPPORTED_RATE = 16000 if 16000 in supported_rates else supported_rates[0]
    CHUNK = int(SUPPORTED_RATE * CHUNK_DURATION)
    print(f"✅ 最终配置:设备ID={INPUT_DEVICE_INDEX}, 采样率={SUPPORTED_RATE}Hz, 每包大小={CHUNK}")
    return SUPPORTED_RATE, CHUNK
class ASRBinaryProtocol:
    """Binary framing for the Volcengine v3 ASR websocket protocol.

    Client frames: 4-byte header + big-endian u32 payload size + payload.
    Server frames (msg_type 9) carry an extra big-endian u32 sequence.
    """

    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        """Pack the 4-byte header: version|size, type|flags, serde|compression, reserved."""
        return bytes((
            (1 << 4) | 1,                        # version 1, header size 1 (= 4 bytes)
            (msg_type << 4) | flags,
            (serialization << 4) | compression,
            0,                                   # reserved byte
        ))

    @staticmethod
    def pack_message(header: bytes, payload: bytes, compression: int = 1) -> bytes:
        """Frame a payload: header + u32 length + (gzip-compressed) payload."""
        body = gzip.compress(payload) if compression == 1 else payload
        return b"".join((header, struct.pack('>I', len(body)), body))

    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        """Parse a frame into (header-field dict, decompressed payload).

        NOTE(review): for msg_type == 9 the payload size is read from bytes
        4-8 and the sequence from bytes 8-12, while the sender in this file
        emits sequence before size — confirm against the official spec.
        """
        if len(data) < 8:
            raise ValueError("数据长度不足")
        b0, b1, b2, b3 = data[:4]
        header_info = {
            "version": (b0 >> 4) & 0x0F,
            "header_size": b0 & 0x0F,
            "msg_type": (b1 >> 4) & 0x0F,
            "flags": b1 & 0x0F,
            "serialization": (b2 >> 4) & 0x0F,
            "compression": b2 & 0x0F,
            "reserved": b3
        }
        payload_size = struct.unpack('>I', data[4:8])[0]
        if header_info["msg_type"] == 9:
            if len(data) < 12:
                raise ValueError("缺少sequence字段")
            header_info["sequence"] = struct.unpack('>I', data[8:12])[0]
            start = 12
        else:
            start = 8
        payload = data[start:start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        return header_info, payload
async def asr_client():
    """Realtime ASR client using the probed audio device and sample rate.

    Fix: the X-Api-Access-Key entry of the handshake headers was a
    corrupted/unterminated string literal (a SyntaxError); restored it.
    """
    # 1. Pick device and rate (mutates the INPUT_DEVICE_INDEX global).
    SUPPORTED_RATE, CHUNK = init_audio_device()
    # 2. Open the capture stream.
    try:
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SUPPORTED_RATE,
            input=True,
            input_device_index=INPUT_DEVICE_INDEX,
            frames_per_buffer=CHUNK
        )
        print(f"✅ 音频设备{INPUT_DEVICE_INDEX}初始化成功")
    except Exception as e:
        print(f"❌ 音频设备打开失败: {e}")
        p.terminate()
        return
    # 3. Auth headers.
    connect_id = str(uuid.uuid4())
    headers = {
        "X-Api-App-Key": APP_KEY,
        "X-Api-Access-Key": ACCESS_KEY,
        "X-Api-Resource-Id": RESOURCE_ID,
        "X-Api-Connect-Id": connect_id
    }
    # 4. Connect to the ASR service.
    uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
    print("="*50)
    print(f"🔌 连接ASR服务: {uri}")
    print(f"🆔 Connect ID: {connect_id}")
    print("="*50)
    print("🎤 开始说话,实时转文字...(按Ctrl+C停止)\n")
    try:
        async with websockets.connect(uri, extra_headers=headers) as websocket:
            # 5. Session configuration frame (uses the detected rate).
            full_request = {
                "user": {"uid": connect_id},
                "audio": {
                    "format": "pcm",
                    "codec": "raw",
                    "rate": SUPPORTED_RATE,
                    "bits": 16,
                    "channel": CHANNELS,
                    "language": "zh-CN"
                },
                "request": {
                    "model_name": "bigmodel",
                    "enable_itn": True,
                    "enable_punc": True,
                    "show_utterances": True,
                    "end_window_size": 800
                }
            }
            full_request_payload = json.dumps(full_request).encode('utf-8')
            full_request_header = ASRBinaryProtocol.build_header(msg_type=1)
            full_request_msg = ASRBinaryProtocol.pack_message(full_request_header, full_request_payload)
            await websocket.send(full_request_msg)
            print("✅ 配置包发送成功")
            # 6. Producer: push audio-only frames.
            sequence = 1

            async def send_audio():
                nonlocal sequence
                while True:
                    try:
                        audio_data = stream.read(CHUNK)
                        if not audio_data:
                            continue
                        # Frame = header + u32 sequence + u32 size + raw PCM.
                        audio_header = ASRBinaryProtocol.build_header(msg_type=2, serialization=0, flags=1)
                        sequence_bytes = struct.pack('>I', sequence)
                        audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
                        await websocket.send(audio_msg)
                        sequence += 1
                        await asyncio.sleep(0.05)
                    except Exception as e:
                        print(f"\n⚠️ 音频发送错误: {e}")
                        continue

            # 7. Consumer: print recognition results.
            async def receive_response():
                while True:
                    try:
                        data = await websocket.recv()
                        if not data:
                            continue
                        header_info, payload = ASRBinaryProtocol.unpack_message(data)
                        if header_info["msg_type"] == 9 and payload:
                            try:
                                response = json.loads(payload.decode('utf-8'))
                                if "result" in response and response["result"]:
                                    text = response["result"].get("text", "")
                                    if text:
                                        sys.stdout.write(f"\r📝 实时识别: {text}")
                                        sys.stdout.flush()
                            except json.JSONDecodeError:
                                continue
                    except Exception as e:
                        print(f"\n⚠️ 响应解析错误: {e}")
                        continue

            # 8. Run both tasks concurrently.
            send_task = asyncio.create_task(send_audio())
            receive_task = asyncio.create_task(receive_response())
            await asyncio.gather(send_task, receive_task)
    except KeyboardInterrupt:
        print("\n\n🛑 程序正常停止")
    except websockets.exceptions.InvalidStatusCode as e:
        print(f"\n❌ 鉴权失败/连接错误: {e}")
        print("   ⚠️ 请检查:")
        print("   1. APP_KEY/ACCESS_KEY是否正确")
        print("   2. RESOURCE_ID是否匹配你的计费方式(小时版/并发版)")
        print("   3. 火山引擎账号是否开通了ASR服务")
    except Exception as e:
        print(f"\n❌ 运行错误: {type(e).__name__}: {e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()
        print("\n✅ 音频设备已关闭")
if __name__ == "__main__":
    # Silence ALSA warnings once before starting.
    import os
    os.environ['PYAUDIO_LOOPBACK'] = '1'
    try:
        asyncio.run(asr_client())
    except RuntimeError:
        # Fallback when asyncio.run cannot create a fresh event loop.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asr_client())
        loop.close()
    except Exception as e:
        print(f"\n❌ 程序启动失败: {e}")
        p.terminate()
本版修复:INPUT_DEVICE_INDEX 设为全局变量,解决 UnboundLocalError;音频初始化抽成 init_audio_device(),逻辑更清晰。把 APP_KEY 和 ACCESS_KEY 替换成你火山引擎控制台的真实值(这两个错了一定会鉴权失败),运行:python3 ./asr3.py
若报错涉及 exception_on_overflow 参数,同时音频设备只支持 44100/48000Hz(不支持 16000Hz),下面是终极修复版,彻底解决这两个问题:
import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
import os
# ========== 1. Core configuration (must be changed!) ==========
# Volcengine (ByteDance) ASR credentials — replace with your console values.
APP_KEY = "你的X-Api-App-Key"
ACCESS_KEY = "你的"
# Billing resource id sent as the X-Api-Resource-Id header.
# BUG FIX: asr_client() referenced RESOURCE_ID but it was never defined.
# "duration" = hourly plan; use "volc.bigasr.sauc.concurrent" for the
# concurrency-billed plan.
RESOURCE_ID = "volc.bigasr.sauc.duration"
# Audio device configuration.
INPUT_DEVICE_INDEX = 4  # your microphone device id
FORMAT = pyaudio.paInt16
CHANNELS = 1
CHUNK_DURATION = 0.2  # 200 ms per packet
# ========== 2. Globals ==========
p = pyaudio.PyAudio()
SUPPORTED_RATE = 44100  # forced to 44100 Hz (what this device supports)
CHUNK = int(SUPPORTED_RATE * CHUNK_DURATION)
# ========== 3. Silence ALSA warnings ==========
os.environ['ALSA_ERRORS'] = '0'
os.environ['PYAUDIO_LOOPBACK'] = '1'
sys.stderr = open(os.devnull, 'w')  # swallow ALSA stderr noise entirely
# BUG FIX: `open(sys.__stdout__, 'w')` raised TypeError (open() expects a
# path, not a file object). Restoring the real stdout is what was intended.
sys.stdout = sys.__stdout__
class ASRBinaryProtocol:
    """Binary framing for the Volcengine v3 streaming-ASR protocol.

    Frame layout: 4-byte header | 4-byte big-endian payload size |
    (4-byte sequence, result frames only) | payload (gzip when the
    compression bit is set).
    """

    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        """Return the 4-byte frame header for *msg_type*."""
        protocol_version = 1
        header_words = 1  # header size, in 4-byte units
        return bytes((
            (protocol_version << 4) | header_words,
            (msg_type << 4) | flags,
            (serialization << 4) | compression,
            0,  # reserved
        ))

    @staticmethod
    def pack_message(header: bytes, payload: bytes, compression: int = 1) -> bytes:
        """Prefix *payload* with *header* and its big-endian length; gzip when compression == 1."""
        body = gzip.compress(payload) if compression == 1 else payload
        return b''.join((header, struct.pack('>I', len(body)), body))

    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        """Split a server frame into (header-field dict, decompressed payload)."""
        if len(data) < 8:
            raise ValueError("数据长度不足")
        b0, b1, b2, b3 = data[:4]
        header_info = {
            "version": b0 >> 4,
            "header_size": b0 & 0x0F,
            "msg_type": b1 >> 4,
            "flags": b1 & 0x0F,
            "serialization": b2 >> 4,
            "compression": b2 & 0x0F,
            "reserved": b3,
        }
        payload_size = int.from_bytes(data[4:8], 'big')
        if header_info["msg_type"] == 9:
            # Recognition-result frames carry an extra sequence number.
            if len(data) < 12:
                raise ValueError("缺少sequence字段")
            header_info["sequence"] = int.from_bytes(data[8:12], 'big')
            payload_start = 12
        else:
            payload_start = 8
        payload = data[payload_start:payload_start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        return header_info, payload
async def asr_client():
    """Stream microphone audio to Volcengine v3 ASR and print live transcripts.

    Compatibility build: old PyAudio (no ``exception_on_overflow``) and a
    44100 Hz capture rate (the device does not support 16000 Hz).
    Requires module-level APP_KEY / ACCESS_KEY / RESOURCE_ID to be set.
    """
    print("="*50)
    print(f"✅ 音频配置:设备ID={INPUT_DEVICE_INDEX}, 采样率={SUPPORTED_RATE}Hz")
    print("="*50)
    # 1. Open the capture stream (no exception_on_overflow kwarg — old
    #    PyAudio versions do not accept it).
    try:
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SUPPORTED_RATE,
            input=True,
            input_device_index=INPUT_DEVICE_INDEX,
            frames_per_buffer=CHUNK
        )
        print("✅ 音频设备初始化成功")
    except Exception as e:
        print(f"❌ 音频设备打开失败: {e}")
        p.terminate()
        return
    # 2. Build the authentication headers.
    connect_id = str(uuid.uuid4())
    headers = {
        "X-Api-App-Key": APP_KEY,
        # BUG FIX: this line was a truncated string literal ("X-Api-,) that
        # made the file unparseable; the access-key header was missing.
        "X-Api-Access-Key": ACCESS_KEY,
        "X-Api-Resource-Id": RESOURCE_ID,  # must be defined in the config section
        "X-Api-Connect-Id": connect_id
    }
    # 3. Connect to the ASR service.
    uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
    print(f"🔌 连接ASR服务: {uri}")
    print(f"🆔 Connect ID: {connect_id}")
    print("🎤 开始说话,实时转文字...(按Ctrl+C停止)\n")
    try:
        async with websockets.connect(uri, extra_headers=headers) as websocket:
            # 4. Send the "full client request" config packet (44100 Hz).
            full_request = {
                "user": {"uid": connect_id},
                "audio": {
                    "format": "pcm",
                    "codec": "raw",
                    "rate": SUPPORTED_RATE,  # key point: 44100 Hz
                    "bits": 16,
                    "channel": CHANNELS,
                    "language": "zh-CN"
                },
                "request": {
                    "model_name": "bigmodel",
                    "enable_itn": True,
                    "enable_punc": True,
                    "show_utterances": True,
                    "end_window_size": 800
                }
            }
            full_request_payload = json.dumps(full_request).encode('utf-8')
            full_request_header = ASRBinaryProtocol.build_header(msg_type=1)
            full_request_msg = ASRBinaryProtocol.pack_message(full_request_header, full_request_payload)
            await websocket.send(full_request_msg)
            print("✅ 配置包发送成功")
            # 5. Producer: read mic chunks and push them as audio frames.
            sequence = 1
            async def send_audio():
                nonlocal sequence
                while True:
                    try:
                        # Swallow driver overflow errors and retry.
                        try:
                            audio_data = stream.read(CHUNK)
                        except IOError:
                            audio_data = b''
                        if not audio_data:
                            # BUG FIX: yield to the loop before retrying —
                            # the original `continue` busy-spun without awaiting.
                            await asyncio.sleep(0.05)
                            continue
                        # Audio frame: header | sequence | payload size | raw PCM.
                        audio_header = ASRBinaryProtocol.build_header(msg_type=2, serialization=0, flags=1)
                        sequence_bytes = struct.pack('>I', sequence)
                        audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
                        await websocket.send(audio_msg)
                        sequence += 1
                        await asyncio.sleep(0.05)
                    except Exception as e:
                        print(f"\n⚠️ 音频发送错误: {e}")
                        continue
            # 6. Consumer: decode server frames and print transcripts.
            async def receive_response():
                while True:
                    try:
                        data = await websocket.recv()
                        if not data:
                            continue
                        header_info, payload = ASRBinaryProtocol.unpack_message(data)
                        if header_info["msg_type"] == 9 and payload:
                            try:
                                response = json.loads(payload.decode('utf-8'))
                                if "result" in response and response["result"]:
                                    text = response["result"].get("text", "")
                                    if text:
                                        # \r overwrites the line for a "live" effect.
                                        sys.stdout.write(f"\r📝 实时识别: {text}")
                                        sys.stdout.flush()
                            except (json.JSONDecodeError, UnicodeDecodeError):
                                # Narrowed from a bare `except:` so real bugs
                                # surface via the outer handler instead.
                                continue
                    except Exception as e:
                        print(f"\n⚠️ 响应解析错误: {e}")
                        continue
            # 7. Run both loops until one fails or the user interrupts.
            send_task = asyncio.create_task(send_audio())
            receive_task = asyncio.create_task(receive_response())
            await asyncio.gather(send_task, receive_task)
    except KeyboardInterrupt:
        print("\n\n🛑 程序正常停止")
    except websockets.exceptions.InvalidStatusCode as e:
        print(f"\n❌ 鉴权失败/连接错误: {e}")
        print(" ⚠️ 排查步骤:")
        print(" 1. 确认APP_KEY/ACCESS_KEY是否正确(控制台复制)")
        print(" 2. 确认RESOURCE_ID匹配计费方式(duration=小时版,concurrent=并发版)")
        print(" 3. 确认火山引擎账号已开通ASR服务")
    except Exception as e:
        print(f"\n❌ 运行错误: {type(e).__name__}: {e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()
        print("✅ 音频设备已关闭")
if __name__ == "__main__":
    # Compatible with different asyncio APIs (Python 3.7+).
    try:
        asyncio.run(asr_client())
    except RuntimeError:
        # asyncio.run() cannot be used inside an already-running/closed
        # loop; fall back to the legacy event-loop API.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asr_client())
        loop.close()
    except Exception as e:
        print(f"\n❌ 启动失败: {e}")
        # Best-effort cleanup of the module-level PyAudio instance.
        p.terminate()
exception_on_overflow=False(你的PyAudio版本太旧,不认识这个参数)try-except,防止音频溢出报错APP_KEY 和 ACCESS_KEY 替换成火山引擎控制台的真实值(这是唯一需要改的地方)python3 ./asr3.py
==================================================
✅ 音频配置:设备ID=4, 采样率=44100Hz
==================================================
✅ 音频设备初始化成功
🔌 连接ASR服务: wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async
🆔 Connect ID: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
🎤 开始说话,实时转文字...(按Ctrl+C停止)
✅ 配置包发送成功
📝 实时识别: 你说的话会显示在这里
unable to open slave)不是权限问题,是音频设备的默认配置问题(ALSA找不到某些虚拟设备,但不影响实际麦克风使用)。pip3 install pyaudio 是装在普通用户目录(/home/nick/.local/lib/)# 创建配置文件
sudo nano ~/.asoundrc
pcm.!default {
type hw
card 1
device 0
}
ctl.!default {
type hw
card 1
}
pcm.dmix {
type dmix
ipc_key 1024
ipc_perm 0666
slave {
pcm "hw:1,0"
format S16_LE
rate 44100
channels 2
period_time 0
period_size 1024
buffer_size 4096
}
}
pcm.dsnoop {
type dsnoop
ipc_key 1025
ipc_perm 0666
slave {
pcm "hw:1,0"
format S16_LE
rate 44100
channels 2
period_time 0
period_size 1024
buffer_size 4096
}
}
Ctrl+O 保存,Ctrl+X 退出。# 回到代码目录(确保用普通用户)
cd ~/workspace/doubao
# 运行最终版代码(不要加sudo!)
python3 ./asr3.py
# 安装pulse音频
sudo apt install -y pulseaudio pulseaudio-utils
# 重启音频服务
pulseaudio -k && pulseaudio --start
# 再次运行代码(普通用户)
python3 ./asr3.py
.asoundrc后会消失==================================================
✅ 音频配置:设备ID=4, 采样率=44100Hz
==================================================
✅ 音频设备初始化成功
🔌 连接ASR服务: wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async
🆔 Connect ID: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
🎤 开始说话,实时转文字...(按Ctrl+C停止)
✅ 配置包发送成功
.asoundrc文件就能解决。.asoundrc 配置文件格式错误导致PyAudio初始化失败,现在我给你最简单、最稳妥的方案——直接删除错误配置,改用pulse音频(Ubuntu原生支持,无配置问题),彻底绕开ALSA的坑:# 删除有问题的配置文件,恢复系统默认
rm ~/.asoundrc
# 重启音频服务(确保生效)
pulseaudio -k && pulseaudio --start
import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
import os
# ========== 1. Core configuration (must be changed!) ==========
# Volcengine ASR credentials — replace with your console values.
APP_KEY = "你的X-Api-App-Key"
ACCESS_KEY = "你的"
# BUG FIX: main() referenced RESOURCE_ID but it was never defined.
# Use "volc.bigasr.sauc.concurrent" for the concurrency-billed plan.
RESOURCE_ID = "volc.bigasr.sauc.duration"
# ========== 2. Force PulseAudio (no ALSA dependency) ==========
# pulse device id (10 in this environment)
INPUT_DEVICE_INDEX = 10
FORMAT = pyaudio.paInt16
CHANNELS = 1
SUPPORTED_RATE = 44100  # pulse supports 44100 Hz by default
CHUNK = int(SUPPORTED_RATE * 0.2)  # 200 ms per packet
# ========== 3. Completely silence ALSA error output ==========
os.environ['ALSA_CONFIG_PATH'] = '/dev/null'
os.environ['ALSA_ERRORS'] = '0'
sys.stderr = open(os.devnull, 'w')  # silence all audio-stack stderr noise
# BUG FIX: `open(sys.__stdout__, 'w')` raised TypeError (open() expects a
# path). Restoring the real stdout is what was intended.
sys.stdout = sys.__stdout__
class ASRBinaryProtocol:
    """Binary framing helpers for the Volcengine v3 streaming-ASR protocol.

    Frame layout: 4-byte header | 4-byte big-endian payload size |
    (4-byte sequence, result frames only) | payload (gzip when the
    compression bit is set).
    """
    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        # Header bytes: version/header-size, msg-type/flags,
        # serialization/compression, reserved.
        version = 1
        header_size = 1  # in 4-byte units
        byte0 = (version << 4) | header_size
        byte1 = (msg_type << 4) | flags
        byte2 = (serialization << 4) | compression
        byte3 = 0  # reserved
        return struct.pack('BBBB', byte0, byte1, byte2, byte3)
    @staticmethod
    def pack_message(header: bytes, payload: bytes, compression: int = 1) -> bytes:
        # Gzip the payload when requested, then prepend header + length.
        if compression == 1:
            payload = gzip.compress(payload)
        payload_size = struct.pack('>I', len(payload))
        return header + payload_size + payload
    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        # Parse a server frame into (header fields, decompressed payload).
        if len(data) < 8:
            raise ValueError("数据长度不足")
        header = data[:4]
        byte0, byte1, byte2, byte3 = struct.unpack('BBBB', header)
        header_info = {
            "version": (byte0 >> 4) & 0x0F,
            "header_size": byte0 & 0x0F,
            "msg_type": (byte1 >> 4) & 0x0F,
            "flags": byte1 & 0x0F,
            "serialization": (byte2 >> 4) & 0x0F,
            "compression": byte2 & 0x0F,
            "reserved": byte3
        }
        payload_size = struct.unpack('>I', data[4:8])[0]
        if header_info["msg_type"] == 9:
            # Recognition-result frames carry an extra 4-byte sequence.
            if len(data) < 12:
                raise ValueError("缺少sequence字段")
            header_info["sequence"] = struct.unpack('>I', data[8:12])[0]
            payload_start = 12
        else:
            payload_start = 8
        payload = data[payload_start:payload_start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        return header_info, payload
async def main():
    """Main entry: PulseAudio capture + Volcengine ASR streaming.

    Requires module-level APP_KEY / ACCESS_KEY / RESOURCE_ID to be set.
    """
    # 1. Initialize PyAudio (pulse-backed).
    try:
        p = pyaudio.PyAudio()
        print("✅ PyAudio初始化成功")
    except Exception as e:
        print(f"❌ PyAudio初始化失败: {e}")
        return
    # 2. Open the pulse capture stream (no ALSA dependency).
    try:
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SUPPORTED_RATE,
            input=True,
            input_device_index=INPUT_DEVICE_INDEX,
            frames_per_buffer=CHUNK
        )
        print(f"✅ 已连接pulse音频设备(ID={INPUT_DEVICE_INDEX})")
    except Exception as e:
        print(f"❌ 音频设备打开失败: {e}")
        p.terminate()
        return
    # 3. Build the authentication headers.
    connect_id = str(uuid.uuid4())
    headers = {
        "X-Api-App-Key": APP_KEY,
        # BUG FIX: this line was a truncated string literal ("X-Api-,)
        # that made the file unparseable; restore the access-key header.
        "X-Api-Access-Key": ACCESS_KEY,
        "X-Api-Resource-Id": RESOURCE_ID,
        "X-Api-Connect-Id": connect_id
    }
    # 4. Connect to the ASR service.
    uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
    print("="*50)
    print(f"🔌 连接火山ASR服务: {uri}")
    print(f"🆔 Connect ID: {connect_id}")
    print("🎤 开始说话,实时转文字...(按Ctrl+C停止)")
    print("="*50)
    try:
        async with websockets.connect(uri, extra_headers=headers) as websocket:
            # 5. Send the config packet.
            full_request = {
                "user": {"uid": connect_id},
                "audio": {
                    "format": "pcm",
                    "codec": "raw",
                    "rate": SUPPORTED_RATE,
                    "bits": 16,
                    "channel": CHANNELS,
                    "language": "zh-CN"
                },
                "request": {
                    "model_name": "bigmodel",
                    "enable_itn": True,
                    "enable_punc": True,
                    "show_utterances": True,
                    "end_window_size": 800
                }
            }
            full_request_payload = json.dumps(full_request).encode('utf-8')
            full_request_header = ASRBinaryProtocol.build_header(msg_type=1)
            full_request_msg = ASRBinaryProtocol.pack_message(full_request_header, full_request_payload)
            await websocket.send(full_request_msg)
            # 6. Audio producer + result consumer tasks.
            sequence = 1
            async def send_audio():
                nonlocal sequence
                while True:
                    try:
                        audio_data = stream.read(CHUNK, exception_on_overflow=False)
                        if not audio_data:
                            # BUG FIX: yield before retrying (was a busy loop).
                            await asyncio.sleep(0.05)
                            continue
                        # Audio frame: header | sequence | size | raw PCM.
                        audio_header = ASRBinaryProtocol.build_header(msg_type=2, serialization=0, flags=1)
                        sequence_bytes = struct.pack('>I', sequence)
                        audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
                        await websocket.send(audio_msg)
                        sequence += 1
                        await asyncio.sleep(0.05)
                    except:
                        # Deliberate best-effort; sleep so a dead socket
                        # cannot busy-spin this loop.
                        await asyncio.sleep(0.05)
                        continue
            async def receive_response():
                while True:
                    try:
                        data = await websocket.recv()
                        header_info, payload = ASRBinaryProtocol.unpack_message(data)
                        if header_info["msg_type"] == 9 and payload:
                            response = json.loads(payload.decode('utf-8'))
                            if "result" in response and response["result"]:
                                text = response["result"].get("text", "")
                                if text:
                                    sys.stdout.write(f"\r📝 实时识别: {text}")
                                    sys.stdout.flush()
                    except:
                        # Same best-effort policy as send_audio().
                        await asyncio.sleep(0.05)
                        continue
            # 7. Run both loops.
            await asyncio.gather(send_audio(), receive_response())
    except KeyboardInterrupt:
        print("\n\n🛑 程序正常停止")
    except websockets.exceptions.InvalidStatusCode as e:
        sys.stdout.write(f"\n❌ 鉴权失败: {e}\n")
        sys.stdout.write("⚠️ 请检查APP_KEY/ACCESS_KEY是否正确!\n")
    except Exception as e:
        sys.stdout.write(f"\n❌ 运行错误: {e}\n")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()
        sys.stdout.write("✅ 音频设备已关闭\n")
if __name__ == "__main__":
    # Works on Python 3.7+: prefer asyncio.run(), fall back to the
    # legacy event-loop API when it raises RuntimeError.
    try:
        asyncio.run(main())
    except RuntimeError:
        fallback_loop = asyncio.get_event_loop()
        fallback_loop.run_until_complete(main())
        fallback_loop.close()
# 确保在代码目录
cd ~/workspace/doubao
# 运行(必改APP_KEY/ACCESS_KEY!)
python3 ./asr3.py
问题根因:~/.asoundrc 格式错误导致 PyAudio 初始化失败,直接删除即可恢复系统默认。预期输出:
✅ PyAudio初始化成功
✅ 已连接pulse音频设备(ID=10)
==================================================
🔌 连接火山ASR服务: wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async
🆔 Connect ID: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
🎤 开始说话,实时转文字...(按Ctrl+C停止)
==================================================
📝 实时识别: 你说的话会显示在这里
.asoundrc 配置错误导致PyAudio初始化失败,删除即可解决APP_KEY 和 ACCESS_KEY 为你的真实值,否则会鉴权失败pulseaudio --check)import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
import os
import traceback
# ========== 1. Core configuration (must be changed!) ==========
APP_KEY = "你的X-Api-App-Key"
ACCESS_KEY = "你的"
# BUG FIX: main() referenced RESOURCE_ID but it was never defined.
# Use "volc.bigasr.sauc.concurrent" for the concurrency-billed plan.
RESOURCE_ID = "volc.bigasr.sauc.duration"
# ========== 2. DEBUG switches (all on) ==========
DEBUG = True
def log(level: str, msg: str):
    """Print *msg* to stdout prefixed with a timestamp and *level* tag."""
    from datetime import datetime
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{now_str}] [{level}] {msg}", file=sys.stdout)
# ========== 3. Audio configuration (forced to PulseAudio) ==========
INPUT_DEVICE_INDEX = 10  # pulse device id
FORMAT = pyaudio.paInt16
CHANNELS = 1
SUPPORTED_RATE = 44100
CHUNK = int(SUPPORTED_RATE * 0.2)
# ========== 4. Startup log ==========
# Logged at import time so library-version mismatches show in every run.
log("INFO", "========== 火山ASR启动 ==========")
log("INFO", f"Python版本: {sys.version}")
# hasattr guard: pyaudio does not always expose __version__.
log("INFO", f"PyAudio版本: {pyaudio.__version__ if hasattr(pyaudio, '__version__') else '未知'}")
log("INFO", f"websockets版本: {websockets.__version__}")
class ASRBinaryProtocol:
    """Volcengine v3 ASR binary framing with verbose DEBUG tracing.

    Frame layout: 4-byte header | 4-byte big-endian payload size |
    (4-byte sequence, result frames only) | payload (gzip when the
    compression bit is set).  Relies on module-level ``DEBUG`` and ``log``.
    """
    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        # Header bytes: version/header-size, msg-type/flags,
        # serialization/compression, reserved.
        version = 1
        header_size = 1  # in 4-byte units
        byte0 = (version << 4) | header_size
        byte1 = (msg_type << 4) | flags
        byte2 = (serialization << 4) | compression
        byte3 = 0
        header = struct.pack('BBBB', byte0, byte1, byte2, byte3)
        if DEBUG:
            log("DEBUG", f"构建header: msg_type={msg_type}, header_hex={header.hex()}")
        return header
    @staticmethod
    def pack_message(header: bytes, payload: bytes, compression: int = 1) -> bytes:
        # Gzip the payload when requested, then prepend header + length.
        if compression == 1:
            # BUG FIX: the pre-compression size must be captured BEFORE
            # compressing — the old code logged the compressed length for
            # both "before" and "after".
            original_len = len(payload)
            payload = gzip.compress(payload)
            if DEBUG:
                log("DEBUG", f"payload压缩前: {original_len}字节 → 压缩后: {len(payload)}字节")
        payload_size = struct.pack('>I', len(payload))
        msg = header + payload_size + payload
        if DEBUG:
            log("DEBUG", f"打包消息总长度: {len(msg)}字节")
        return msg
    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        # Parse a server frame into (header fields, decompressed payload).
        if DEBUG:
            log("DEBUG", f"解包消息长度: {len(data)}字节")
        if len(data) < 8:
            raise ValueError(f"消息长度不足8字节: {len(data)}")
        # Parse the 4-byte header.
        header = data[:4]
        byte0, byte1, byte2, byte3 = struct.unpack('BBBB', header)
        header_info = {
            "version": (byte0 >> 4) & 0x0F,
            "header_size": byte0 & 0x0F,
            "msg_type": (byte1 >> 4) & 0x0F,
            "flags": byte1 & 0x0F,
            "serialization": (byte2 >> 4) & 0x0F,
            "compression": byte2 & 0x0F,
            "reserved": byte3
        }
        if DEBUG:
            log("DEBUG", f"解析header: {header_info}")
        # Parse the payload.
        payload_size = struct.unpack('>I', data[4:8])[0]
        if DEBUG:
            log("DEBUG", f"payload声明长度: {payload_size}")
        if header_info["msg_type"] == 9:
            # Recognition-result frames carry a 4-byte sequence first.
            if len(data) < 12:
                raise ValueError(f"结果消息长度不足12字节: {len(data)}")
            header_info["sequence"] = struct.unpack('>I', data[8:12])[0]
            payload_start = 12
        else:
            payload_start = 8
        payload = data[payload_start:payload_start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
            if DEBUG:
                log("DEBUG", f"payload解压后长度: {len(payload)}字节")
        return header_info, payload
async def main():
    """Full-debug main: every protocol step logged in detail.

    Requires module-level APP_KEY / ACCESS_KEY / RESOURCE_ID to be set.
    """
    # ========== Step 1: detect and open the audio device ==========
    log("INFO", "========== 步骤1: 初始化音频设备 ==========")
    try:
        p = pyaudio.PyAudio()
        log("INFO", "✅ PyAudio初始化成功")
        # List every input device (DEBUG aid).
        log("INFO", "📜 所有音频输入设备列表:")
        for i in range(p.get_device_count()):
            dev_info = p.get_device_info_by_index(i)
            if dev_info['maxInputChannels'] > 0:
                log("INFO", f" 设备{i}: {dev_info['name']} (采样率: {dev_info['defaultSampleRate']})")
        # Open the capture stream.
        log("INFO", f"🔌 尝试打开设备ID={INPUT_DEVICE_INDEX}, 采样率={SUPPORTED_RATE}Hz")
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SUPPORTED_RATE,
            input=True,
            input_device_index=INPUT_DEVICE_INDEX,
            frames_per_buffer=CHUNK
        )
        log("INFO", "✅ 音频流打开成功")
    except Exception as e:
        log("ERROR", f"❌ 音频初始化失败: {e}")
        log("ERROR", f"📝 详细错误栈: {traceback.format_exc()}")
        return
    # ========== Step 2: build the authentication headers ==========
    log("INFO", "\n========== 步骤2: 构建鉴权信息 ==========")
    connect_id = str(uuid.uuid4())
    headers = {
        "X-Api-App-Key": APP_KEY,
        # BUG FIX: this line was a truncated string literal ("X-Api-,)
        # that made the file unparseable; restore the access-key header.
        "X-Api-Access-Key": ACCESS_KEY,
        "X-Api-Resource-Id": RESOURCE_ID,
        "X-Api-Connect-Id": connect_id
    }
    log("INFO", f"🆔 Connect ID: {connect_id}")
    # Only the first 8 chars of each credential are logged.
    log("INFO", f"🔑 APP_KEY: {APP_KEY[:8]}****" if APP_KEY else "🔑 ")
    log("INFO", f"🔑 ACCESS_KEY: {ACCESS_KEY[:8]}****" if ACCESS_KEY else "🔑 ")
    # ========== Step 3: connect to the ASR service ==========
    log("INFO", "\n========== 步骤3: 连接火山ASR服务 ==========")
    uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
    log("INFO", f"🌐 连接地址: {uri}")
    try:
        async with websockets.connect(uri, extra_headers=headers) as websocket:
            log("INFO", "✅ WebSocket连接成功")
            # ========== Step 4: send the config packet ==========
            log("INFO", "\n========== 步骤4: 发送配置包 ==========")
            full_request = {
                "user": {"uid": connect_id},
                "audio": {
                    "format": "pcm",
                    "codec": "raw",
                    "rate": SUPPORTED_RATE,
                    "bits": 16,
                    "channel": CHANNELS,
                    "language": "zh-CN"
                },
                "request": {
                    "model_name": "bigmodel",
                    "enable_itn": True,
                    "enable_punc": True,
                    "show_utterances": True,
                    "end_window_size": 800
                }
            }
            if DEBUG:
                log("DEBUG", f"📝 配置包内容: {json.dumps(full_request, indent=2)}")
            full_request_payload = json.dumps(full_request).encode('utf-8')
            full_request_header = ASRBinaryProtocol.build_header(msg_type=1)
            full_request_msg = ASRBinaryProtocol.pack_message(full_request_header, full_request_payload)
            await websocket.send(full_request_msg)
            log("INFO", "✅ 配置包发送成功")
            # ========== Step 5: capture and send audio ==========
            log("INFO", "\n========== 步骤5: 开始采集并发送音频 ==========")
            sequence = 1
            audio_count = 0
            async def send_audio():
                nonlocal sequence, audio_count
                while True:
                    try:
                        # Read one chunk of microphone audio.
                        audio_data = stream.read(CHUNK)
                        audio_count += 1
                        if audio_count % 10 == 0:  # progress every 10 packets
                            log("INFO", f"📤 已发送{audio_count}包音频,当前sequence={sequence}")
                        if not audio_data:
                            log("WARNING", "⚠️ 读取到空音频数据")
                            await asyncio.sleep(0.05)
                            continue
                        # Audio frame: header | sequence | size | raw PCM.
                        audio_header = ASRBinaryProtocol.build_header(msg_type=2, serialization=0, flags=1)
                        sequence_bytes = struct.pack('>I', sequence)
                        audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
                        await websocket.send(audio_msg)
                        sequence += 1
                        await asyncio.sleep(0.05)
                    except Exception as e:
                        log("ERROR", f"❌ 音频发送错误: {e}")
                        log("ERROR", f"📝 错误栈: {traceback.format_exc()}")
                        await asyncio.sleep(0.05)
                        continue
            # ========== Step 6: receive recognition results ==========
            log("INFO", "\n========== 步骤6: 等待识别结果 ==========")
            log("INFO", "🎤 现在可以说话了,识别结果会实时显示...\n")
            async def receive_response():
                result_count = 0
                while True:
                    try:
                        data = await websocket.recv()
                        result_count += 1
                        log("INFO", f"\n📥 收到第{result_count}条响应")
                        # Unframe the server message.
                        header_info, payload = ASRBinaryProtocol.unpack_message(data)
                        # Recognition results arrive as msg_type 9.
                        if header_info["msg_type"] == 9:
                            try:
                                response = json.loads(payload.decode('utf-8'))
                                if DEBUG:
                                    log("DEBUG", f"📝 原始响应: {json.dumps(response, indent=2)}")
                                if "result" in response and response["result"]:
                                    text = response["result"].get("text", "")
                                    if text:
                                        sys.stdout.write(f"\r📝 实时识别: {text}")
                                        sys.stdout.flush()
                                    else:
                                        log("WARNING", "⚠️ 响应中无识别文本")
                                else:
                                    log("WARNING", "⚠️ 响应中无result字段")
                            except json.JSONDecodeError as e:
                                log("ERROR", f"❌ JSON解析失败: {e}")
                                log("ERROR", f"📝 原始payload: {payload[:100]}...")
                            except Exception as e:
                                log("ERROR", f"❌ 结果解析错误: {e}")
                                log("ERROR", f"📝 错误栈: {traceback.format_exc()}")
                        else:
                            log("DEBUG", f"📥 非识别结果消息,msg_type={header_info['msg_type']}")
                    except Exception as e:
                        log("ERROR", f"❌ 响应接收错误: {e}")
                        log("ERROR", f"📝 错误栈: {traceback.format_exc()}")
                        await asyncio.sleep(0.05)
                        continue
            # ========== Step 7: run both tasks ==========
            await asyncio.gather(send_audio(), receive_response())
    except websockets.exceptions.InvalidStatusCode as e:
        log("ERROR", f"\n❌ WebSocket连接失败: {e}")
        log("ERROR", f"📝 状态码: {e.status_code}")
        log("ERROR", "⚠️ 常见原因:")
        log("ERROR", " 1. APP_KEY/ACCESS_KEY错误或过期")
        log("ERROR", " 2. RESOURCE_ID与计费方式不匹配")
        log("ERROR", " 3. 账号未开通ASR服务")
        log("ERROR", " 4. 账号余额不足或权限不够")
    except websockets.exceptions.ConnectionClosed as e:
        log("ERROR", f"\n❌ WebSocket连接关闭: {e}")
        log("ERROR", f"📝 关闭码: {e.code}, 原因: {e.reason}")
    except KeyboardInterrupt:
        log("INFO", "\n\n🛑 用户中断程序")
    except Exception as e:
        log("ERROR", f"\n❌ 运行时错误: {e}")
        log("ERROR", f"📝 详细错误栈: {traceback.format_exc()}")
    finally:
        log("INFO", "\n========== 清理资源 ==========")
        try:
            stream.stop_stream()
            stream.close()
            log("INFO", "✅ 音频流已关闭")
        except:
            log("ERROR", "❌ 关闭音频流失败")
        try:
            p.terminate()
            log("INFO", "✅ PyAudio已终止")
        except:
            log("ERROR", "❌ 终止PyAudio失败")
        log("INFO", "✅ 程序结束")
if __name__ == "__main__":
    # Refuse to start without the mandatory credentials.
    if not APP_KEY or not ACCESS_KEY:
        log("ERROR", "❌ 请先配置APP_KEY和ACCESS_KEY!")
        sys.exit(1)
    # Run the program; fall back to the legacy event-loop API when
    # asyncio.run() cannot be used.
    try:
        asyncio.run(main())
    except RuntimeError as e:
        log("WARNING", f"⚠️ asyncio.run失败,使用备用方式: {e}")
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        loop.close()
rm ~/.asoundrc
pulseaudio -k && pulseaudio --start
APP_KEY 和 ACCESS_KEY 替换成你从火山引擎控制台复制的真实值(这是唯一需要改的地方)。cd ~/workspace/doubao
python3 ./asr3.py
| 日志关键词 | 含义 | 排查方向 |
|---|---|---|
✅ PyAudio初始化成功 | 音频基础环境正常 | - |
✅ 音频流打开成功 | 麦克风设备连接正常 | - |
✅ WebSocket连接成功 | 网络+鉴权正常 | - |
❌ WebSocket连接失败 | 鉴权错误 | 检查APP_KEY/ACCESS_KEY |
已发送10包音频 | 音频发送正常 | - |
收到第1条响应 | 服务端返回数据 | - |
实时识别: xxx | 识别成功 | - |
响应中无识别文本 | 音频已发送但无识别结果 | 检查说话音量/麦克风是否正常 |
PulseAudio: Unable to connect: Connection refused → pulse服务没正常运行,导致设备ID=10访问崩溃HDA Intel PCH: ALC256 Analog)是正常的硬件麦克风,采样率44100Hz# 停止并重启pulse服务
pulseaudio --kill
rm -rf ~/.config/pulse/*
pulseaudio --start --daemonize=yes
# 验证pulse是否运行
pulseaudio --check && echo "✅ PulseAudio运行正常" || echo "❌ PulseAudio启动失败"
import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
import os
import traceback
# ========== 1. Core configuration (must be changed!) ==========
APP_KEY = "你的X-Api-App-Key"
ACCESS_KEY = "你的"
# BUG FIX: main() referenced RESOURCE_ID but it was never defined.
# Use "volc.bigasr.sauc.concurrent" for the concurrency-billed plan.
RESOURCE_ID = "volc.bigasr.sauc.duration"
# ========== 2. Switch to the hardware microphone (key change!) ==========
INPUT_DEVICE_INDEX = 4  # hardware mic id (shown as available in the logs)
FORMAT = pyaudio.paInt16
CHANNELS = 1
SUPPORTED_RATE = 44100  # sample rate the hardware supports
CHUNK = int(SUPPORTED_RATE * 0.2)  # 200 ms per packet
# ========== 3. Silence ALSA warnings (keep only our logs) ==========
os.environ['ALSA_ERRORS'] = '0'
os.environ['ALSA_CONFIG_PATH'] = '/dev/null'
# Send ALSA's stderr noise to /dev/null; keep our own log output.
sys.stderr = open(os.devnull, 'w')
# BUG FIX: `open(sys.__stdout__, 'w')` raised TypeError (open() expects a
# path). Restoring the real stdout is what was intended.
sys.stdout = sys.__stdout__
def log(level: str, msg: str):
    """Print a short log line with an HH:MM:SS timestamp and level tag."""
    from datetime import datetime
    stamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{stamp}] [{level}] {msg}")
class ASRBinaryProtocol:
    """Compact Volcengine ASR binary framing.

    Frame layout: 4-byte header | 4-byte big-endian payload size |
    (result frames: 4-byte sequence) | payload (gzip when the compression
    bit is set).
    """
    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        """Build the 4-byte frame header.

        Generalized (backward-compatibly) with optional serialization /
        compression / flags so callers can mark raw, uncompressed audio
        frames; the defaults reproduce the previous hard-coded bytes
        (serialization=1, compression=1, flags=0).
        """
        version = 1
        header_size = 1  # in 4-byte units
        byte0 = (version << 4) | header_size
        byte1 = (msg_type << 4) | flags
        byte2 = (serialization << 4) | compression
        byte3 = 0  # reserved
        return struct.pack('BBBB', byte0, byte1, byte2, byte3)
    @staticmethod
    def pack_message(header: bytes, payload: bytes) -> bytes:
        """Gzip *payload* and frame it as header | size | payload."""
        payload = gzip.compress(payload)
        payload_size = struct.pack('>I', len(payload))
        return header + payload_size + payload
    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        """Parse a server frame into (header fields, decompressed payload)."""
        if len(data) < 8:
            raise ValueError("消息过短")
        # Parse the header (only the fields this client needs).
        header = data[:4]
        byte0, byte1, byte2, byte3 = struct.unpack('BBBB', header)
        header_info = {
            "msg_type": (byte1 >> 4) & 0x0F,
            "compression": byte2 & 0x0F
        }
        # Parse the payload (result frames skip a 4-byte sequence field).
        payload_size = struct.unpack('>I', data[4:8])[0]
        payload_start = 12 if header_info["msg_type"] == 9 else 8
        payload = data[payload_start:payload_start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        return header_info, payload
async def main():
    """Core logic: hardware-mic capture + Volcengine ASR streaming.

    Requires module-level APP_KEY / ACCESS_KEY / RESOURCE_ID to be set.
    """
    # 1. Initialize the audio capture (hardware device ID=4).
    log("INFO", "🔧 初始化硬件麦克风(ID=4)...")
    try:
        p = pyaudio.PyAudio()
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SUPPORTED_RATE,
            input=True,
            input_device_index=INPUT_DEVICE_INDEX,
            frames_per_buffer=CHUNK
        )
        log("INFO", "✅ 麦克风初始化成功")
    except Exception as e:
        log("ERROR", f"❌ 麦克风初始化失败: {e}")
        return
    # 2. Connect to the Volcengine ASR service.
    log("INFO", "\n🔌 连接火山ASR服务...")
    connect_id = str(uuid.uuid4())
    headers = {
        "X-Api-App-Key": APP_KEY,
        # BUG FIX: this line was a truncated string literal ("X-Api-,)
        # that made the file unparseable; restore the access-key header.
        "X-Api-Access-Key": ACCESS_KEY,
        "X-Api-Resource-Id": RESOURCE_ID,
        "X-Api-Connect-Id": connect_id
    }
    try:
        async with websockets.connect(
            "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async",
            extra_headers=headers
        ) as websocket:
            log("INFO", "✅ 服务连接成功")
            # 3. Send the config packet.
            config = {
                "user": {"uid": connect_id},
                "audio": {
                    "format": "pcm",
                    "codec": "raw",
                    "rate": SUPPORTED_RATE,
                    "bits": 16,
                    "channel": 1,
                    "language": "zh-CN"
                },
                "request": {
                    "model_name": "bigmodel",
                    "enable_itn": True,
                    "enable_punc": True
                }
            }
            config_payload = json.dumps(config).encode('utf-8')
            config_msg = ASRBinaryProtocol.pack_message(
                ASRBinaryProtocol.build_header(1),
                config_payload
            )
            await websocket.send(config_msg)
            log("INFO", "✅ 配置包发送成功")
            # 4. Audio producer + result consumer.
            log("INFO", "\n🎤 开始说话,实时识别中...(按Ctrl+C停止)")
            sequence = 1
            async def send_audio():
                nonlocal sequence
                while True:
                    try:
                        # Read one chunk; treat overflow as an empty read.
                        try:
                            audio_data = stream.read(CHUNK)
                        except IOError:
                            audio_data = b''
                        if not audio_data:
                            await asyncio.sleep(0.05)
                            continue
                        # Audio frame: header | sequence | size | raw PCM.
                        # NOTE(review): build_header(2) sets the compression
                        # bit although the audio is sent raw — verify
                        # against the protocol docs.
                        audio_header = ASRBinaryProtocol.build_header(2)
                        sequence_bytes = struct.pack('>I', sequence)
                        audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
                        await websocket.send(audio_msg)
                        sequence += 1
                        await asyncio.sleep(0.05)
                    except Exception as e:
                        log("WARNING", f"⚠️ 音频发送异常: {str(e)[:50]}")
                        await asyncio.sleep(0.05)
            async def receive_result():
                while True:
                    try:
                        data = await websocket.recv()
                        header_info, payload = ASRBinaryProtocol.unpack_message(data)
                        if header_info["msg_type"] == 9:
                            response = json.loads(payload.decode('utf-8'))
                            # BUG FIX: guard the "result" key — the old code
                            # raised KeyError (silently swallowed) whenever
                            # the response carried no result.
                            text = response.get("result", {}).get("text", "")
                            if text:
                                sys.stdout.write(f"\r📝 识别结果: {text}")
                                sys.stdout.flush()
                    except Exception as e:
                        # BUG FIX: yield before retrying — a closed socket
                        # used to busy-spin this loop.
                        await asyncio.sleep(0.05)
                        continue
            # Run both loops until one fails or the user interrupts.
            await asyncio.gather(send_audio(), receive_result())
    except websockets.exceptions.InvalidStatusCode as e:
        log("ERROR", f"\n❌ 鉴权失败: {e}")
        log("ERROR", "⚠️ 请检查APP_KEY/ACCESS_KEY是否正确!")
    except KeyboardInterrupt:
        log("INFO", "\n🛑 程序已停止")
    except Exception as e:
        log("ERROR", f"\n❌ 运行错误: {e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()
        log("INFO", "✅ 麦克风已关闭")
if __name__ == "__main__":
    # Refuse to start without the mandatory credentials.
    if not APP_KEY or not ACCESS_KEY:
        log("ERROR", "❌ 请先配置APP_KEY和ACCESS_KEY!")
        sys.exit(1)
    # Run the program; fall back to the legacy event-loop API when
    # asyncio.run() raises RuntimeError.
    try:
        asyncio.run(main())
    except RuntimeError:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        loop.close()
# 确保在代码目录
cd ~/workspace/doubao
# 必改APP_KEY/ACCESS_KEY后运行
python3 ./asr3.py
[06:30:00] [INFO] 🔧 初始化硬件麦克风(ID=4)...
[06:30:00] [INFO] ✅ 麦克风初始化成功
[06:30:00] [INFO] 🔌 连接火山ASR服务...
[06:30:01] [INFO] ✅ 服务连接成功
[06:30:01] [INFO] ✅ 配置包发送成功
[06:30:01] [INFO] 🎤 开始说话,实时识别中...(按Ctrl+C停止)
📝 识别结果: 你说的话会显示在这里
import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
import os
import traceback
from datetime import datetime
# ========== 1. Core configuration (must be changed!) ==========
APP_KEY = "你的X-Api-App-Key"
ACCESS_KEY = "你的"
# BUG FIX: the code below references RESOURCE_ID but it was never defined.
# Use "volc.bigasr.sauc.concurrent" for the concurrency-billed plan.
RESOURCE_ID = "volc.bigasr.sauc.duration"
# ========== 2. Hardware-microphone configuration (verified working) ==========
INPUT_DEVICE_INDEX = 4  # hardware microphone id
FORMAT = pyaudio.paInt16
CHANNELS = 1
SUPPORTED_RATE = 44100
CHUNK = int(SUPPORTED_RATE * 0.2)  # 200 ms per packet
# ========== 3. 关键修复:不屏蔽stdout/stderr,只过滤ALSA垃圾日志 ==========
# Filter only the known ALSA noise; every other stderr line passes through.
class FilteredStderr:
    """File-like stderr proxy that drops known-useless ALSA warnings."""

    # Substrings identifying the ALSA messages to suppress.
    _NOISE = ('snd_pcm_dsnoop_open', 'snd_pcm_dmix_open', 'Unknown PCM', 'pcm_oss.c')

    def write(self, msg):
        # Drop ALSA junk; forward everything else to the real stderr.
        for keyword in self._NOISE:
            if keyword in msg:
                return
        sys.__stderr__.write(msg)

    def flush(self):
        sys.__stderr__.flush()

sys.stderr = FilteredStderr()  # install the filter, keeping useful logs
# ========== 4. 全量DEBUG日志函数 ==========
def log(level: str, msg: str):
    """Print a millisecond-precision, timestamped log line and flush stdout."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]  # trim µs → ms
    print(f"[{stamp}] [{level.upper()}] {msg}")
    sys.stdout.flush()  # defeat buffering so the line appears immediately
# ========== 5. Startup self-check banner ==========
# Logged at import time so environment/version mismatches are visible
# in every run's output.
log("INFO", "="*60)
log("INFO", "🔥 火山ASR v3 实时语音识别 - 全量DEBUG版")
log("INFO", f"Python版本: {sys.version}")
# getattr guard: neither pyaudio nor websockets is guaranteed to expose
# __version__.
log("INFO", f"PyAudio版本: {getattr(pyaudio, '__version__', '未知')}")
log("INFO", f"websockets版本: {getattr(websockets, '__version__', '未知')}")
log("INFO", f"音频设备ID: {INPUT_DEVICE_INDEX}, 采样率: {SUPPORTED_RATE}Hz")
log("INFO", "="*60)
class ASRBinaryProtocol:
    """Volcengine ASR binary framing with DEBUG tracing via module-level log().

    Frame layout: 4-byte header | 4-byte big-endian payload size |
    (result frames: 4-byte sequence) | payload (gzip when the compression
    bit is set).
    """
    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        # Header bytes: version/header-size, msg-type/flags,
        # serialization/compression, reserved.
        version = 1
        header_size = 1  # in 4-byte units
        byte0 = (version << 4) | header_size
        byte1 = (msg_type << 4) | flags
        byte2 = (serialization << 4) | compression
        byte3 = 0
        header = struct.pack('BBBB', byte0, byte1, byte2, byte3)
        log("DEBUG", f"构建Header: msg_type={msg_type}, hex={header.hex()}")
        return header
    @staticmethod
    def pack_message(header: bytes, payload: bytes, compression: int = 1) -> bytes:
        # Capture the pre-compression size for the trace log below.
        original_len = len(payload)
        if compression == 1:
            payload = gzip.compress(payload)
        payload_size = struct.pack('>I', len(payload))
        msg = header + payload_size + payload
        log("DEBUG", f"打包消息: 原始{original_len}字节 → 压缩后{len(payload)}字节 → 总长度{len(msg)}字节")
        return msg
    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        # Parse a server frame into (header fields, decompressed payload).
        log("DEBUG", f"解包消息: 总长度{len(data)}字节")
        if len(data) < 8:
            raise ValueError(f"消息长度不足8字节(实际{len(data)}字节)")
        # Parse the 4-byte header.
        header = data[:4]
        byte0, byte1, byte2, byte3 = struct.unpack('BBBB', header)
        header_info = {
            "version": (byte0 >> 4) & 0x0F,
            "header_size": byte0 & 0x0F,
            "msg_type": (byte1 >> 4) & 0x0F,
            "flags": byte1 & 0x0F,
            "serialization": (byte2 >> 4) & 0x0F,
            "compression": byte2 & 0x0F,
            "reserved": byte3
        }
        log("DEBUG", f"解析Header: {json.dumps(header_info, indent=2)}")
        # Parse the payload (result frames skip a 4-byte sequence field).
        payload_size = struct.unpack('>I', data[4:8])[0]
        log("DEBUG", f"Payload声明长度: {payload_size}字节")
        payload_start = 12 if header_info["msg_type"] == 9 else 8
        payload = data[payload_start:payload_start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        log("DEBUG", f"Payload解压后长度: {len(payload)}字节")
        return header_info, payload
async def main():
"""全量DEBUG主函数"""
# ========== 步骤1: 音频设备初始化 ==========
log("INFO", "\n📌 步骤1: 初始化硬件麦克风")
p = None
stream = None
try:
p = pyaudio.PyAudio()
log("INFO", "✅ PyAudio初始化成功")
# 列出所有音频设备(强制打印)
log("INFO", "\n📜 系统所有音频输入设备列表:")
device_count = p.get_device_count()
log("INFO", f" 设备总数: {device_count}")
for i in range(device_count):
dev = p.get_device_info_by_index(i)
if dev['maxInputChannels'] > 0:
log("INFO", f" 设备{i}: {dev['name']} | 最大输入通道: {dev['maxInputChannels']} | 默认采样率: {dev['defaultSampleRate']}")
# 打开音频流
log("INFO", f"\n🔌 尝试打开设备ID={INPUT_DEVICE_INDEX}")
stream = p.open(
format=FORMAT,
channels=CHANNELS,
rate=SUPPORTED_RATE,
input=True,
input_device_index=INPUT_DEVICE_INDEX,
frames_per_buffer=CHUNK,
exception_on_overflow=False
)
log("INFO", "✅ 音频流打开成功!麦克风已就绪")
except Exception as e:
log("ERROR", f"❌ 音频初始化失败: {type(e).__name__}: {e}")
log("ERROR", f"📝 详细错误栈:\n{traceback.format_exc()}")
if stream:
stream.close()
if p:
p.terminate()
return
# ========== 步骤2: 鉴权信息构建 ==========
log("INFO", "\n📌 步骤2: 构建鉴权信息")
if not APP_KEY or not ACCESS_KEY:
log("ERROR", "❌ APP_KEY/ACCESS_KEY未配置!请先填写正确的鉴权信息")
stream.close()
p.terminate()
return
connect_id = str(uuid.uuid4())
headers = {
"X-Api-App-Key": APP_KEY,
"X-Api-,
"X-Api-Resource-Id": RESOURCE_ID,
"X-Api-Connect-Id": connect_id
}
log("INFO", f"✅ 鉴权信息构建完成")
log("INFO", f" Connect ID: {connect_id}")
log("INFO", f" APP_KEY: {APP_KEY[:8]}****")
log("INFO", f" ACCESS_KEY: {ACCESS_KEY[:8]}****")
# ========== 步骤3: 连接ASR服务 ==========
log("INFO", "\n📌 步骤3: 连接火山ASR WebSocket服务")
uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
log("INFO", f"🌐 连接地址: {uri}")
try:
# 测试网络连接(额外增加)
log("INFO", "🔍 测试WebSocket连接...")
websocket = await websockets.connect(uri, extra_headers=headers, ping_interval=10)
log("INFO", "✅ WebSocket连接成功!")
# ========== 步骤4: 发送配置包 ==========
log("INFO", "\n📌 步骤4: 发送音频配置包")
config = {
"user": {"uid": connect_id},
"audio": {
"format": "pcm",
"codec": "raw",
"rate": SUPPORTED_RATE,
"bits": 16,
"channel": CHANNELS,
"language": "zh-CN"
},
"request": {
"model_name": "bigmodel",
"enable_itn": True,
"enable_punc": True,
"show_utterances": True,
"end_window_size": 800
}
}
log("DEBUG", f"📝 配置包内容:\n{json.dumps(config, indent=2)}")
config_payload = json.dumps(config).encode('utf-8')
config_header = ASRBinaryProtocol.build_header(1)
config_msg = ASRBinaryProtocol.pack_message(config_header, config_payload)
await websocket.send(config_msg)
log("INFO", "✅ 配置包发送成功!")
# ========== 步骤5: 音频采集与发送 ==========
log("INFO", "\n📌 步骤5: 开始音频采集和发送")
log("INFO", "🎤 麦克风已激活!现在可以说话,识别结果会实时显示...")
log("INFO", "💡 提示: 按Ctrl+C停止程序\n")
sequence = 1
audio_packet_count = 0
async def send_audio_loop():
    # Upload loop: read CHUNK frames from the mic, wrap each buffer in a
    # v3 binary audio frame (header + big-endian sequence + size + raw PCM)
    # and send it over the websocket, pacing with a 50 ms sleep.
    nonlocal sequence, audio_packet_count
    while True:
        try:
            # Read one buffer of raw PCM from the microphone (blocking).
            audio_data = stream.read(CHUNK)
            audio_packet_count += 1
            # Progress log every 5 packets to avoid flooding stdout.
            if audio_packet_count % 5 == 0:
                log("INFO", f"📤 已发送{audio_packet_count}包音频 | 当前Sequence: {sequence} | 音频数据长度: {len(audio_data)}字节")
            if not audio_data:
                log("WARNING", "⚠️ 读取到空音频数据!")
                await asyncio.sleep(0.05)
                continue
            # Audio frame: msg_type=2, raw (unserialized) payload, flags=1 —
            # presumably marks "sequence number present"; confirm against the v3 spec.
            audio_header = ASRBinaryProtocol.build_header(2, serialization=0, flags=1)
            sequence_bytes = struct.pack('>I', sequence)
            audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
            # Ship the packet and advance the sequence counter.
            await websocket.send(audio_msg)
            sequence += 1
            await asyncio.sleep(0.05)
        except Exception as e:
            # Best-effort streaming: log the failure and keep the loop alive.
            log("ERROR", f"❌ 音频发送异常: {type(e).__name__}: {e}")
            log("ERROR", f"📝 错误栈:\n{traceback.format_exc()[:200]}")
            await asyncio.sleep(0.05)
            continue
# ========== 步骤6: 接收识别结果 ==========
async def receive_result_loop():
    # Download loop: drain server frames, unpack the binary envelope and
    # print recognized text in place on one console line.
    result_count = 0
    while True:
        try:
            # Wait for the next server frame.
            data = await websocket.recv()
            result_count += 1
            log("INFO", f"\n📥 收到第{result_count}条服务端响应")
            # Split the binary envelope into header fields and payload.
            header_info, payload = ASRBinaryProtocol.unpack_message(data)
            # msg_type 9 = full server response carrying recognition results.
            if header_info["msg_type"] == 9:
                log("DEBUG", f"📝 原始识别结果Payload:\n{payload.decode('utf-8')[:500]}...")
                try:
                    response = json.loads(payload.decode('utf-8'))
                    if "result" in response and response["result"]:
                        text = response["result"].get("text", "")
                        if text:
                            # "\r" overwrites the current console line → live transcript.
                            sys.stdout.write(f"\r🎯 实时识别结果: {text}")
                            sys.stdout.flush()
                        else:
                            log("WARNING", "⚠️ 响应中无识别文本")
                    else:
                        log("WARNING", "⚠️ 响应中无result字段")
                except json.JSONDecodeError as e:
                    log("ERROR", f"❌ JSON解析失败: {e}")
                except Exception as e:
                    log("ERROR", f"❌ 结果解析异常: {e}")
            else:
                log("DEBUG", f"📥 非识别结果消息,msg_type={header_info['msg_type']}")
        except Exception as e:
            # Log and keep listening on transient errors.
            log("ERROR", f"❌ 响应接收异常: {type(e).__name__}: {e}")
            log("ERROR", f"📝 错误栈:\n{traceback.format_exc()[:200]}")
            await asyncio.sleep(0.05)
            continue
# ========== 步骤7: 运行异步任务 ==========
log("INFO", "\n📌 步骤6: 启动音频发送和结果接收任务")
send_task = asyncio.create_task(send_audio_loop())
receive_task = asyncio.create_task(receive_result_loop())
# 等待任务完成(直到用户中断)
await asyncio.gather(send_task, receive_task)
except websockets.exceptions.InvalidStatusCode as e:
log("ERROR", f"\n❌ WebSocket连接失败 - 鉴权错误!")
log("ERROR", f" 状态码: {e.status_code}")
log("ERROR", f" 原因: {e.reason}")
log("ERROR", "\n🔍 可能的原因:")
log("ERROR", " 1. APP_KEY/ACCESS_KEY错误或已过期")
log("ERROR", " 2. RESOURCE_ID与你的计费方式不匹配(duration=小时版,concurrent=并发版)")
log("ERROR", " 3. 火山引擎账号未开通ASR服务")
log("ERROR", " 4. 账号余额不足或权限不足")
except websockets.exceptions.ConnectionClosed as e:
log("ERROR", f"\n❌ WebSocket连接已关闭")
log("ERROR", f" 关闭码: {e.code}")
log("ERROR", f" 原因: {e.reason}")
except Exception as e:
log("ERROR", f"\n❌ 运行时异常")
log("ERROR", f" 异常类型: {type(e).__name__}")
log("ERROR", f" 异常信息: {e}")
log("ERROR", f" 详细错误栈:\n{traceback.format_exc()}")
except KeyboardInterrupt:
log("INFO", "\n🛑 用户按下Ctrl+C,程序正常停止")
finally:
# 清理资源
log("INFO", "\n📌 步骤7: 清理资源")
if stream:
stream.stop_stream()
stream.close()
log("INFO", "✅ 音频流已关闭")
if p:
p.terminate()
log("INFO", "✅ PyAudio已终止")
if 'websocket' in locals() and websocket.open:
await websocket.close()
log("INFO", "✅ WebSocket连接已关闭")
log("INFO", "✅ 程序正常退出")
# ========== 程序入口 ==========
if __name__ == "__main__":
log("INFO", "\n🚀 程序启动中...")
# 修复asyncio在某些环境下的运行问题
try:
# 设置事件循环策略(解决Ubuntu下的兼容问题)
if sys.platform == 'linux':
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
# 运行主程序
asyncio.run(main())
except RuntimeError as e:
log("WARNING", f"⚠️ asyncio.run失败,使用备用方式启动: {e}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
except Exception as e:
log("CRITICAL", f"💥 程序启动失败!")
log("CRITICAL", f" 错误: {e}")
log("CRITICAL", f" 错误栈:\n{traceback.format_exc()}")
- 加了 sys.stdout.flush(),避免输出缓存导致看不到内容
- 把 APP_KEY/ACCESS_KEY 替换成你火山引擎的真实值
运行方式:
cd ~/workspace/doubao
python3 ./asr3.py
[2026-03-17 10:00:00.000] [INFO] ============================================================
[2026-03-17 10:00:00.000] [INFO] 🔥 火山ASR v3 实时语音识别 - 全量DEBUG版
[2026-03-17 10:00:00.000] [INFO] Python版本: 3.10.12 (main, Mar 3 2026, 11:56:32) [GCC 11.4.0]
[2026-03-17 10:00:00.000] [INFO] PyAudio版本: 0.2.14
[2026-03-17 10:00:00.000] [INFO] websockets版本: 16.0
[2026-03-17 10:00:00.000] [INFO] 音频设备ID: 4, 采样率: 44100Hz
[2026-03-17 10:00:00.000] [INFO] ============================================================
[2026-03-17 10:00:00.001] [INFO]
[2026-03-17 10:00:00.001] [INFO] 🚀 程序启动中...
[2026-03-17 10:00:00.002] [INFO]
[2026-03-17 10:00:00.002] [INFO] 📌 步骤1: 初始化硬件麦克风
[2026-03-17 10:00:00.005] [INFO] ✅ PyAudio初始化成功
[2026-03-17 10:00:00.006] [INFO]
[2026-03-17 10:00:00.006] [INFO] 📜 系统所有音频输入设备列表:
[2026-03-17 10:00:00.007] [INFO] 设备总数: 12
[2026-03-17 10:00:00.008] [INFO] 设备4: HDA Intel PCH: ALC256 Analog (hw:1,0) | 最大输入通道: 2 | 默认采样率: 44100.0
...
旧版 PyAudio 不支持 exception_on_overflow 参数。现在给你最终版无坑代码,彻底解决所有兼容问题,保留全量 DEBUG 日志:
import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
import os
import traceback
from datetime import datetime
# ========== 1. Core credentials (MUST be replaced with real values) ==========
APP_KEY = "你的X-Api-App-Key"  # placeholder — X-Api-App-Key from the Volcano Engine console
ACCESS_KEY = "你的"  # placeholder (value truncated in this transcript) — X-Api-Access-Key
# ========== 2. Hardware microphone settings (verified working on this machine) ==========
INPUT_DEVICE_INDEX = 4  # ALSA input device index — machine-specific; check the device list printed at startup
FORMAT = pyaudio.paInt16  # 16-bit signed PCM samples
CHANNELS = 1  # mono capture
SUPPORTED_RATE = 44100  # Hz — rate the mic supports; also declared in the ASR config packet
CHUNK = int(SUPPORTED_RATE * 0.2)  # frames per read ≈ 200 ms of audio per packet
# ========== 3. Smart stderr filter: hide ALSA noise, keep useful errors ==========
class FilteredStderr:
    """Proxy for ``sys.stderr`` that drops known-noisy ALSA warning lines.

    Everything else is forwarded unchanged to the real stderr
    (``sys.__stderr__``), so genuine errors remain visible.
    """

    # Substrings identifying harmless ALSA chatter emitted via PyAudio.
    _NOISE = ('snd_pcm_dsnoop_open', 'snd_pcm_dmix_open', 'Unknown PCM', 'pcm_oss.c')

    def write(self, msg):
        # Swallow the message as soon as it matches any noise marker.
        for marker in self._NOISE:
            if marker in msg:
                return
        sys.__stderr__.write(msg)

    def flush(self):
        sys.__stderr__.flush()

# Install the filter for the remainder of the process.
sys.stderr = FilteredStderr()
# ========== 4. Debug logger (always flushes so output is never buffered) ==========
def log(level: str, msg: str) -> None:
    """Print one millisecond-timestamped log line and flush stdout immediately."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    print(f"[{stamp}] [{level.upper()}] {msg}")
    sys.stdout.flush()
# ========== 5. Startup self-check banner (runs at import time) ==========
log("INFO", "="*60)
log("INFO", "🔥 火山ASR v3 实时语音识别 - 最终无坑版")
log("INFO", f"Python版本: {sys.version}")
log("INFO", f"PyAudio版本: {getattr(pyaudio, '__version__', '未知')}")
log("INFO", f"websockets版本: {getattr(websockets, '__version__', '未知')}")
log("INFO", f"音频设备ID: {INPUT_DEVICE_INDEX}, 采样率: {SUPPORTED_RATE}Hz")
log("INFO", "⚠️ 适配旧版PyAudio,已移除exception_on_overflow参数")
log("INFO", "="*60)
class ASRBinaryProtocol:
    """Volcano Engine ASR v3 binary protocol helpers.

    Frame layout (big-endian):
      - 4-byte header packed as four nibble pairs:
        (version|header_size, msg_type|flags, serialization|compression, reserved)
      - full server responses (msg_type == 9) then carry a 4-byte sequence number
      - 4-byte payload size
      - payload (gzip-compressed when the compression nibble is 1)
    """

    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        """Pack the 4-byte protocol header.

        serialization: 1 = JSON payload, 0 = raw bytes.
        compression:   1 = gzip payload, 0 = uncompressed.
        """
        version = 1
        header_size = 1  # header length in units of 4 bytes
        byte0 = (version << 4) | header_size
        byte1 = (msg_type << 4) | flags
        byte2 = (serialization << 4) | compression
        byte3 = 0  # reserved
        header = struct.pack('BBBB', byte0, byte1, byte2, byte3)
        log("DEBUG", f"构建Header: msg_type={msg_type}, hex={header.hex()}")
        return header

    @staticmethod
    def pack_message(header: bytes, payload: bytes, compression: int = 1) -> bytes:
        """Append the (optionally gzipped) payload and its 4-byte size to *header*."""
        original_len = len(payload)
        if compression == 1:
            payload = gzip.compress(payload)
        payload_size = struct.pack('>I', len(payload))
        msg = header + payload_size + payload
        log("DEBUG", f"打包消息: 原始{original_len}字节 → 压缩后{len(payload)}字节 → 总长度{len(msg)}字节")
        return msg

    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        """Split a server frame into (header fields dict, decompressed payload).

        Raises ValueError when *data* is shorter than the declared layout.
        """
        log("DEBUG", f"解包消息: 总长度{len(data)}字节")
        if len(data) < 8:
            raise ValueError(f"消息长度不足8字节(实际{len(data)}字节)")
        # Parse the 4-byte header into its nibble fields.
        header = data[:4]
        byte0, byte1, byte2, byte3 = struct.unpack('BBBB', header)
        header_info = {
            "version": (byte0 >> 4) & 0x0F,
            "header_size": byte0 & 0x0F,
            "msg_type": (byte1 >> 4) & 0x0F,
            "flags": byte1 & 0x0F,
            "serialization": (byte2 >> 4) & 0x0F,
            "compression": byte2 & 0x0F,
            "reserved": byte3
        }
        log("DEBUG", f"解析Header: {json.dumps(header_info, indent=2)}")
        # BUG FIX: a full server response (msg_type == 9) carries a 4-byte
        # sequence number at bytes 4:8, so its payload size lives at bytes
        # 8:12. The previous code always read the size from bytes 4:8 and
        # therefore mistook the sequence number for the payload size on
        # result frames while still slicing the payload from offset 12.
        if header_info["msg_type"] == 9:
            if len(data) < 12:
                raise ValueError(f"消息长度不足12字节(实际{len(data)}字节)")
            payload_size = struct.unpack('>I', data[8:12])[0]
            payload_start = 12
        else:
            payload_size = struct.unpack('>I', data[4:8])[0]
            payload_start = 8
        log("DEBUG", f"Payload声明长度: {payload_size}字节")
        payload = data[payload_start:payload_start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        log("DEBUG", f"Payload解压后长度: {len(payload)}字节")
        return header_info, payload
async def main():
"""最终无坑版主函数"""
# ========== 步骤1: 音频设备初始化(移除旧版不支持的参数) ==========
log("INFO", "\n📌 步骤1: 初始化硬件麦克风")
p = None
stream = None
try:
p = pyaudio.PyAudio()
log("INFO", "✅ PyAudio初始化成功")
# 列出所有音频设备
log("INFO", "\n📜 系统所有音频输入设备列表:")
device_count = p.get_device_count()
log("INFO", f" 设备总数: {device_count}")
for i in range(device_count):
dev = p.get_device_info_by_index(i)
if dev['maxInputChannels'] > 0:
log("INFO", f" 设备{i}: {dev['name']} | 最大输入通道: {dev['maxInputChannels']} | 默认采样率: {dev['defaultSampleRate']}")
# 打开音频流(关键:移除exception_on_overflow参数)
log("INFO", f"\n🔌 尝试打开设备ID={INPUT_DEVICE_INDEX}(适配旧版PyAudio)")
stream = p.open(
format=FORMAT,
channels=CHANNELS,
rate=SUPPORTED_RATE,
input=True,
input_device_index=INPUT_DEVICE_INDEX,
frames_per_buffer=CHUNK
# 移除exception_on_overflow,旧版PyAudio不支持
)
log("INFO", "✅ 音频流打开成功!麦克风已就绪")
except Exception as e:
log("ERROR", f"❌ 音频初始化失败: {type(e).__name__}: {e}")
log("ERROR", f"📝 详细错误栈:\n{traceback.format_exc()}")
if stream:
stream.close()
if p:
p.terminate()
return
# ========== 步骤2: 鉴权信息构建 ==========
log("INFO", "\n📌 步骤2: 构建鉴权信息")
if not APP_KEY or not ACCESS_KEY:
log("ERROR", "❌ APP_KEY/ACCESS_KEY未配置!请先填写正确的鉴权信息")
stream.close()
p.terminate()
return
connect_id = str(uuid.uuid4())
headers = {
"X-Api-App-Key": APP_KEY,
"X-Api-,
"X-Api-Resource-Id": RESOURCE_ID,
"X-Api-Connect-Id": connect_id
}
log("INFO", f"✅ 鉴权信息构建完成")
log("INFO", f" Connect ID: {connect_id}")
log("INFO", f" APP_KEY: {APP_KEY[:8]}****")
log("INFO", f" ACCESS_KEY: {ACCESS_KEY[:8]}****")
# ========== 步骤3: 连接ASR服务 ==========
log("INFO", "\n📌 步骤3: 连接火山ASR WebSocket服务")
uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
log("INFO", f"🌐 连接地址: {uri}")
try:
websocket = await websockets.connect(uri, extra_headers=headers, ping_interval=10)
log("INFO", "✅ WebSocket连接成功!")
# ========== 步骤4: 发送配置包 ==========
log("INFO", "\n📌 步骤4: 发送音频配置包")
config = {
"user": {"uid": connect_id},
"audio": {
"format": "pcm",
"codec": "raw",
"rate": SUPPORTED_RATE,
"bits": 16,
"channel": CHANNELS,
"language": "zh-CN"
},
"request": {
"model_name": "bigmodel",
"enable_itn": True,
"enable_punc": True,
"show_utterances": True,
"end_window_size": 800
}
}
log("DEBUG", f"📝 配置包内容:\n{json.dumps(config, indent=2)}")
config_payload = json.dumps(config).encode('utf-8')
config_header = ASRBinaryProtocol.build_header(1)
config_msg = ASRBinaryProtocol.pack_message(config_header, config_payload)
await websocket.send(config_msg)
log("INFO", "✅ 配置包发送成功!")
# ========== 步骤5: 音频采集与发送(处理旧版PyAudio溢出) ==========
log("INFO", "\n📌 步骤5: 开始音频采集和发送")
log("INFO", "🎤 麦克风已激活!现在可以说话,识别结果会实时显示...")
log("INFO", "💡 提示: 按Ctrl+C停止程序\n")
sequence = 1
audio_packet_count = 0
async def send_audio_loop():
    # Upload loop: read a mic chunk, wrap it in a v3 binary audio frame
    # (header + big-endian sequence + size + raw PCM) and send it.
    nonlocal sequence, audio_packet_count
    while True:
        try:
            # Read audio; old PyAudio raises IOError on input overflow,
            # so treat an overflow as an empty chunk and keep going.
            try:
                audio_data = stream.read(CHUNK)
            except IOError as e:
                # Input overflow on legacy PyAudio — skip this chunk.
                log("WARNING", f"⚠️ 音频读取溢出,跳过: {e}")
                audio_data = b''
            audio_packet_count += 1
            # Progress log every 5 packets.
            if audio_packet_count % 5 == 0:
                log("INFO", f"📤 已发送{audio_packet_count}包音频 | 当前Sequence: {sequence} | 音频数据长度: {len(audio_data)}字节")
            if not audio_data:
                await asyncio.sleep(0.05)
                continue
            # Audio frame: msg_type=2, raw payload, flags=1 — presumably
            # "sequence number present"; confirm against the v3 spec.
            audio_header = ASRBinaryProtocol.build_header(2, serialization=0, flags=1)
            sequence_bytes = struct.pack('>I', sequence)
            audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
            # Ship the packet and advance the sequence counter.
            await websocket.send(audio_msg)
            sequence += 1
            await asyncio.sleep(0.05)
        except Exception as e:
            # Best-effort streaming: log and keep the loop alive.
            log("ERROR", f"❌ 音频发送异常: {type(e).__name__}: {e}")
            log("ERROR", f"📝 错误栈:\n{traceback.format_exc()[:200]}")
            await asyncio.sleep(0.05)
            continue
# ========== 步骤6: 接收识别结果 ==========
async def receive_result_loop():
    # Download loop: unpack each server frame and live-print recognized text.
    result_count = 0
    while True:
        try:
            data = await websocket.recv()
            result_count += 1
            log("INFO", f"\n📥 收到第{result_count}条服务端响应")
            # Split the binary envelope into header fields and payload.
            header_info, payload = ASRBinaryProtocol.unpack_message(data)
            # msg_type 9 = full server response carrying recognition results.
            if header_info["msg_type"] == 9:
                log("DEBUG", f"📝 原始识别结果Payload:\n{payload.decode('utf-8')[:500]}...")
                try:
                    response = json.loads(payload.decode('utf-8'))
                    if "result" in response and response["result"]:
                        text = response["result"].get("text", "")
                        if text:
                            # "\r" overwrites the console line → live transcript.
                            sys.stdout.write(f"\r🎯 实时识别结果: {text}")
                            sys.stdout.flush()
                        else:
                            log("WARNING", "⚠️ 响应中无识别文本")
                    else:
                        log("WARNING", "⚠️ 响应中无result字段")
                except json.JSONDecodeError as e:
                    log("ERROR", f"❌ JSON解析失败: {e}")
                except Exception as e:
                    log("ERROR", f"❌ 结果解析异常: {e}")
            else:
                log("DEBUG", f"📥 非识别结果消息,msg_type={header_info['msg_type']}")
        except Exception as e:
            # Log and keep listening on transient errors.
            log("ERROR", f"❌ 响应接收异常: {type(e).__name__}: {e}")
            await asyncio.sleep(0.05)
            continue
# ========== 步骤7: 运行异步任务 ==========
log("INFO", "\n📌 步骤6: 启动音频发送和结果接收任务")
send_task = asyncio.create_task(send_audio_loop())
receive_task = asyncio.create_task(receive_result_loop())
await asyncio.gather(send_task, receive_task)
except websockets.exceptions.InvalidStatusCode as e:
log("ERROR", f"\n❌ WebSocket连接失败 - 鉴权错误!")
log("ERROR", f" 状态码: {e.status_code}")
log("ERROR", "🔍 请检查:")
log("ERROR", " 1. APP_KEY/ACCESS_KEY是否正确(控制台复制)")
log("ERROR", " 2. RESOURCE_ID是否匹配计费方式(duration=小时版)")
log("ERROR", " 3. 火山账号是否开通ASR服务/余额充足")
except
升级 PyAudio 即可解决 exception_on_overflow 参数的问题:新版 PyAudio 支持该参数,解决兼容问题,还能修复旧版本的一些音频采集 bug。以下是 Ubuntu 系统下升级 PyAudio 的完整步骤(全程用普通用户,不要 sudo):
# 卸载旧版本(确保清理干净)
pip3 uninstall -y pyaudio
# 安装PortAudio开发包
sudo apt update && sudo apt install -y portaudio19-dev python3-dev
# 用pip3安装最新版(推荐从源码编译,兼容性更好)
pip3 install --upgrade pyaudio
# 查看PyAudio版本
python3 -c "import pyaudio; print(f'PyAudio版本: {pyaudio.__version__}')"
PyAudio版本: 0.2.15(最新稳定版),而不是原来的0.2.14。exception_on_overflow=False 参数,不用再手动删除exception_on_overflow参数),也可以用最终无坑版代码,都会正常运行。portaudio19-dev,否则PyAudio编译失败exception_on_overflow参数不兼容的问题,这是从根本上解决音频初始化的坑,比手动删参数更稳妥。# 安装编译必需的依赖
sudo apt update && sudo apt install -y \
portaudio19-dev \
python3-dev \
python3-setuptools \
gcc \
make
# 创建临时目录
mkdir -p ~/tmp && cd ~/tmp
# 下载官方最新源码包(0.2.15)
wget https://files.pythonhosted.org/packages/source/P/PyAudio/PyAudio-0.2.15.tar.gz
# 解压源码
tar -zxvf PyAudio-0.2.15.tar.gz && cd PyAudio-0.2.15
# 编译(指定当前Python环境)
python3 setup.py build
# 安装到用户目录(避免sudo,保持环境一致)
python3 setup.py install --user
# 回到工作目录
cd ~/workspace/doubao
# 检查版本(必须显示0.2.15)
python3 -c "import pyaudio; print(f'PyAudio版本: {pyaudio.__version__}')"
PyAudio版本: 0.2.15(而非0.2.14)wheel包(Ubuntu仓库里只有0.2.14版本)exception_on_overflow参数就能正常使用TypeError: unexpected keyword argument 'exception_on_overflow' 错误会完全消失,音频设备能正常初始化。exception_on_overflow参数,音频初始化无报错import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
import os
import traceback
from datetime import datetime
# ========== 1. Core credentials (MUST be replaced with real values) ==========
APP_KEY = "你的X-Api-App-Key"  # placeholder — X-Api-App-Key from the Volcano Engine console
ACCESS_KEY = "你的"  # placeholder (value truncated in this transcript) — X-Api-Access-Key
# ========== 2. Hardware microphone settings ==========
INPUT_DEVICE_INDEX = 4  # verified-working ALSA input device index — machine-specific
FORMAT = pyaudio.paInt16  # 16-bit signed PCM samples
CHANNELS = 1  # mono capture
SUPPORTED_RATE = 44100  # Hz — rate the mic supports; also declared in the ASR config packet
CHUNK = int(SUPPORTED_RATE * 0.2)  # frames per read ≈ 200 ms of audio per packet
# ========== 3. Smart stderr filter: hide ALSA noise, keep useful errors ==========
class FilteredStderr:
    """stderr shim that hides harmless ALSA chatter while passing real errors through."""

    def write(self, msg):
        """Forward *msg* to the real stderr unless it is known ALSA noise."""
        noisy = ('snd_pcm_dsnoop_open', 'snd_pcm_dmix_open', 'Unknown PCM', 'pcm_oss.c')
        if not any(token in msg for token in noisy):
            sys.__stderr__.write(msg)

    def flush(self):
        """Delegate flushing to the real stderr."""
        sys.__stderr__.flush()

# Install the filter for the remainder of the process.
sys.stderr = FilteredStderr()
# ========== 4. Debug logger (flushes every line so nothing sits in a buffer) ==========
def log(level: str, msg: str) -> None:
    """Emit one timestamped log line; flush so output is never buffered."""
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    line = "[{}] [{}] {}".format(now[:-3], level.upper(), msg)
    print(line)
    sys.stdout.flush()
# ========== 5. Startup information banner (runs at import time) ==========
log("INFO", "="*60)
log("INFO", "🔥 火山ASR v3 实时语音识别 - 适配PyAudio 0.2.14版")
log("INFO", f"Python版本: {sys.version}")
log("INFO", f"PyAudio版本: {getattr(pyaudio, '__version__', '未知')}")
log("INFO", f"websockets版本: {getattr(websockets, '__version__', '未知')}")
log("INFO", f"音频设备ID: {INPUT_DEVICE_INDEX}, 采样率: {SUPPORTED_RATE}Hz")
log("INFO", "✅ 已移除所有旧版不兼容参数")
log("INFO", "="*60)
class ASRBinaryProtocol:
    """Volcano Engine ASR v3 binary protocol helpers.

    Frame layout (big-endian):
      - 4-byte header packed as four nibble pairs:
        (version|header_size, msg_type|flags, serialization|compression, reserved)
      - full server responses (msg_type == 9) then carry a 4-byte sequence number
      - 4-byte payload size
      - payload (gzip-compressed when the compression nibble is 1)
    """

    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        """Pack the 4-byte protocol header.

        serialization: 1 = JSON payload, 0 = raw bytes.
        compression:   1 = gzip payload, 0 = uncompressed.
        """
        version = 1
        header_size = 1  # header length in units of 4 bytes
        byte0 = (version << 4) | header_size
        byte1 = (msg_type << 4) | flags
        byte2 = (serialization << 4) | compression
        byte3 = 0  # reserved
        header = struct.pack('BBBB', byte0, byte1, byte2, byte3)
        log("DEBUG", f"构建Header: msg_type={msg_type}, hex={header.hex()}")
        return header

    @staticmethod
    def pack_message(header: bytes, payload: bytes, compression: int = 1) -> bytes:
        """Append the (optionally gzipped) payload and its 4-byte size to *header*."""
        original_len = len(payload)
        if compression == 1:
            payload = gzip.compress(payload)
        payload_size = struct.pack('>I', len(payload))
        msg = header + payload_size + payload
        log("DEBUG", f"打包消息: 原始{original_len}字节 → 压缩后{len(payload)}字节 → 总长度{len(msg)}字节")
        return msg

    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        """Split a server frame into (header fields dict, decompressed payload).

        Raises ValueError when *data* is shorter than the declared layout.
        """
        log("DEBUG", f"解包消息: 总长度{len(data)}字节")
        if len(data) < 8:
            raise ValueError(f"消息长度不足8字节(实际{len(data)}字节)")
        # Parse the 4-byte header into its nibble fields.
        header = data[:4]
        byte0, byte1, byte2, byte3 = struct.unpack('BBBB', header)
        header_info = {
            "version": (byte0 >> 4) & 0x0F,
            "header_size": byte0 & 0x0F,
            "msg_type": (byte1 >> 4) & 0x0F,
            "flags": byte1 & 0x0F,
            "serialization": (byte2 >> 4) & 0x0F,
            "compression": byte2 & 0x0F,
            "reserved": byte3
        }
        log("DEBUG", f"解析Header: {json.dumps(header_info, indent=2)}")
        # BUG FIX: a full server response (msg_type == 9) carries a 4-byte
        # sequence number at bytes 4:8, so its payload size lives at bytes
        # 8:12. The previous code always read the size from bytes 4:8 and
        # therefore mistook the sequence number for the payload size on
        # result frames while still slicing the payload from offset 12.
        if header_info["msg_type"] == 9:
            if len(data) < 12:
                raise ValueError(f"消息长度不足12字节(实际{len(data)}字节)")
            payload_size = struct.unpack('>I', data[8:12])[0]
            payload_start = 12
        else:
            payload_size = struct.unpack('>I', data[4:8])[0]
            payload_start = 8
        log("DEBUG", f"Payload声明长度: {payload_size}字节")
        payload = data[payload_start:payload_start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        log("DEBUG", f"Payload解压后长度: {len(payload)}字节")
        return header_info, payload
async def main():
    """Capture microphone audio and stream it to Volcano ASR v3 for live transcription.

    Steps: open the mic via PyAudio, connect to the ASR WebSocket with the
    v3 auth headers, send a JSON config frame, then run two concurrent
    loops (audio upload / result download) until interrupted. All resources
    are released in the ``finally`` block.
    """
    # ---- Step 1: open the hardware microphone (PyAudio 0.2.14 compatible) ----
    log("INFO", "\n📌 步骤1: 初始化硬件麦克风")
    p = None
    stream = None
    websocket = None  # FIX: pre-bind so the finally block never needs locals()
    try:
        p = pyaudio.PyAudio()
        log("INFO", "✅ PyAudio初始化成功")
        # List capture-capable devices so a wrong INPUT_DEVICE_INDEX is easy to spot.
        log("INFO", "\n📜 系统所有音频输入设备列表:")
        device_count = p.get_device_count()
        log("INFO", f" 设备总数: {device_count}")
        for i in range(device_count):
            dev = p.get_device_info_by_index(i)
            if dev['maxInputChannels'] > 0:
                log("INFO", f" 设备{i}: {dev['name']} | 最大输入通道: {dev['maxInputChannels']} | 默认采样率: {dev['defaultSampleRate']}")
        # Open the input stream (no exception_on_overflow: unsupported on PyAudio 0.2.14).
        log("INFO", f"\n🔌 尝试打开设备ID={INPUT_DEVICE_INDEX}(适配PyAudio 0.2.14)")
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SUPPORTED_RATE,
            input=True,
            input_device_index=INPUT_DEVICE_INDEX,
            frames_per_buffer=CHUNK
        )
        log("INFO", "✅ 音频流打开成功!麦克风已就绪")
    except Exception as e:
        log("ERROR", f"❌ 音频初始化失败: {type(e).__name__}: {e}")
        log("ERROR", f"📝 详细错误栈:\n{traceback.format_exc()}")
        if stream:
            stream.close()
        if p:
            p.terminate()
        return
    # ---- Step 2: check credentials and build the auth headers ----
    log("INFO", "\n📌 步骤2: 检查鉴权信息")
    if not APP_KEY or not ACCESS_KEY:
        log("ERROR", "❌ APP_KEY/ACCESS_KEY未配置!请先填写正确的鉴权信息")
        stream.close()
        p.terminate()
        return
    connect_id = str(uuid.uuid4())
    # FIX: RESOURCE_ID was referenced but never defined anywhere in this script
    # (NameError at runtime); default to the hourly-billing resource id
    # ("volc.bigasr.sauc.concurrent" for the concurrency-billed plan).
    RESOURCE_ID = "volc.bigasr.sauc.duration"
    # FIX: this dict was corrupted in the original ('"X-Api-,' truncated line,
    # a SyntaxError). v3 auth is carried entirely in these four headers.
    headers = {
        "X-Api-App-Key": APP_KEY,
        "X-Api-Access-Key": ACCESS_KEY,
        "X-Api-Resource-Id": RESOURCE_ID,
        "X-Api-Connect-Id": connect_id
    }
    log("INFO", f"✅ 鉴权信息检查通过")
    log("INFO", f" Connect ID: {connect_id}")
    log("INFO", f" APP_KEY: {APP_KEY[:8]}****")
    log("INFO", f" ACCESS_KEY: {ACCESS_KEY[:8]}****")
    # ---- Step 3: connect to the ASR service ----
    log("INFO", "\n📌 步骤3: 连接火山ASR WebSocket服务")
    uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
    log("INFO", f"🌐 连接地址: {uri}")
    try:
        log("INFO", "🔍 正在建立WebSocket连接...")
        # FIX: websockets >= 14 renamed ``extra_headers`` to ``additional_headers``;
        # try the new name first, fall back so both old and new releases work.
        try:
            websocket = await websockets.connect(
                uri,
                additional_headers=headers,
                ping_interval=10,
                ping_timeout=30
            )
        except TypeError:
            websocket = await websockets.connect(
                uri,
                extra_headers=headers,
                ping_interval=10,
                ping_timeout=30
            )
        log("INFO", "✅ WebSocket连接成功!")
        # ---- Step 4: send the JSON config frame ----
        log("INFO", "\n📌 步骤4: 发送音频配置包")
        config = {
            "user": {"uid": connect_id},
            "audio": {
                "format": "pcm",
                "codec": "raw",
                "rate": SUPPORTED_RATE,
                "bits": 16,
                "channel": CHANNELS,
                "language": "zh-CN"
            },
            "request": {
                "model_name": "bigmodel",
                "enable_itn": True,
                "enable_punc": True,
                "show_utterances": True,
                "end_window_size": 800
            }
        }
        log("DEBUG", f"📝 配置包内容:\n{json.dumps(config, indent=2)}")
        # Pack (msg_type=1, JSON + gzip) and send the config frame.
        config_payload = json.dumps(config).encode('utf-8')
        config_header = ASRBinaryProtocol.build_header(1)
        config_msg = ASRBinaryProtocol.pack_message(config_header, config_payload)
        await websocket.send(config_msg)
        log("INFO", "✅ 配置包发送成功!")
        # ---- Step 5: audio capture & upload ----
        log("INFO", "\n📌 步骤5: 开始音频采集和发送")
        log("INFO", "🎤 麦克风已激活!现在可以说话,识别结果会实时显示...")
        log("INFO", "💡 提示: 按Ctrl+C停止程序\n")
        sequence = 1
        audio_packet_count = 0
        async def send_audio_loop():
            # Reads mic chunks, wraps each in a v3 binary audio frame
            # (header + big-endian sequence + size + raw PCM) and uploads it.
            nonlocal sequence, audio_packet_count
            while True:
                try:
                    # Old PyAudio raises IOError on input overflow — skip the chunk.
                    try:
                        audio_data = stream.read(CHUNK)
                    except IOError as e:
                        log("WARNING", f"⚠️ 音频读取溢出(旧版PyAudio正常现象): {str(e)[:30]}")
                        audio_data = b''
                    audio_packet_count += 1
                    # Progress log every 5 packets.
                    if audio_packet_count % 5 == 0:
                        log("INFO", f"📤 已发送{audio_packet_count}包音频 | Sequence: {sequence} | 音频长度: {len(audio_data)}字节")
                    if not audio_data:
                        await asyncio.sleep(0.05)
                        continue
                    # msg_type=2 (audio), raw payload, flags=1 — presumably
                    # "sequence number present"; confirm against the v3 spec.
                    audio_header = ASRBinaryProtocol.build_header(2, serialization=0, flags=1)
                    sequence_bytes = struct.pack('>I', sequence)
                    audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
                    await websocket.send(audio_msg)
                    sequence += 1
                    await asyncio.sleep(0.05)
                except Exception as e:
                    log("ERROR", f"❌ 音频发送异常: {type(e).__name__}: {e}")
                    log("ERROR", f"📝 错误栈片段:\n{traceback.format_exc()[:200]}")
                    await asyncio.sleep(0.05)
                    continue
        # ---- Step 6: receive recognition results ----
        async def receive_result_loop():
            # Unpacks each server frame and live-prints the recognized text.
            result_count = 0
            while True:
                try:
                    data = await websocket.recv()
                    result_count += 1
                    log("INFO", f"\n📥 收到第{result_count}条服务端响应")
                    header_info, payload = ASRBinaryProtocol.unpack_message(data)
                    # msg_type 9 = full server response carrying results.
                    if header_info["msg_type"] == 9:
                        log("DEBUG", f"📝 原始识别结果Payload:\n{payload.decode('utf-8')[:500]}...")
                        try:
                            response = json.loads(payload.decode('utf-8'))
                            if "result" in response and response["result"]:
                                text = response["result"].get("text", "")
                                if text:
                                    # "\r" overwrites the console line → live transcript.
                                    sys.stdout.write(f"\r🎯 实时识别结果: {text}")
                                    sys.stdout.flush()
                                else:
                                    log("WARNING", "⚠️ 响应中无识别文本(可能未说话或音量过低)")
                            else:
                                log("WARNING", "⚠️ 响应中无result字段")
                        except json.JSONDecodeError as e:
                            log("ERROR", f"❌ JSON解析失败: {e}")
                        except Exception as e:
                            log("ERROR", f"❌ 结果解析异常: {e}")
                    else:
                        log("DEBUG", f"📥 非识别结果消息,msg_type={header_info['msg_type']}")
                except Exception as e:
                    log("ERROR", f"❌ 响应接收异常: {type(e).__name__}: {e}")
                    await asyncio.sleep(0.05)
                    continue
        # ---- Step 7: run both loops concurrently until interrupted ----
        log("INFO", "\n📌 步骤6: 启动音频发送和结果接收任务")
        send_task = asyncio.create_task(send_audio_loop())
        receive_task = asyncio.create_task(receive_result_loop())
        await asyncio.gather(send_task, receive_task)
    except KeyboardInterrupt:
        log("INFO", "\n🛑 用户按下Ctrl+C,程序正常停止")
    except websockets.exceptions.ConnectionClosed as e:
        # Must precede the WebSocketException handler (it is a subclass).
        log("ERROR", f"\n❌ WebSocket连接已关闭")
        log("ERROR", f" 关闭码: {e.code} | 原因: {e.reason}")
    except websockets.exceptions.WebSocketException as e:
        # FIX: ``InvalidStatusCode`` no longer exists in websockets >= 14
        # (replaced by ``InvalidStatus``); catch the family base class and
        # report whichever status attribute this version provides.
        status = getattr(e, "status_code", None)
        if status is None:
            status = getattr(getattr(e, "response", None), "status_code", "未知")
        log("ERROR", f"\n❌ WebSocket连接失败 - 鉴权错误!")
        log("ERROR", f" 状态码: {status}")
        log("ERROR", "\n🔍 快速排查步骤:")
        log("ERROR", " 1. 登录火山引擎控制台,确认APP_KEY/ACCESS_KEY正确且未过期")
        log("ERROR", " 2. 确认RESOURCE_ID为'volc.bigasr.sauc.duration'(小时计费版)")
        log("ERROR", " 3. 确认账号已开通ASR服务且余额充足")
        log("ERROR", " 4. 检查网络是否能访问wss://openspeech.bytedance.com")
    except Exception as e:
        log("ERROR", f"\n❌ 运行时异常")
        log("ERROR", f" 异常类型: {type(e).__name__}")
        log("ERROR", f" 异常信息: {e}")
        log("ERROR", f" 详细错误栈:\n{traceback.format_exc()}")
    finally:
        # ---- Cleanup: always release the mic, PyAudio and the socket ----
        log("INFO", "\n📌 步骤7: 清理资源")
        if stream:
            stream.stop_stream()
            stream.close()
            log("INFO", "✅ 音频流已关闭")
        if p:
            p.terminate()
            log("INFO", "✅ PyAudio已终止")
        # FIX: ``websocket.open`` was removed in newer websockets; close() is
        # safe to call even if the connection already shut down.
        if websocket is not None:
            await websocket.close()
            log("INFO", "✅ WebSocket连接已关闭")
        log("INFO", "✅ 程序正常退出")
# ========== Program entry point ==========
if __name__ == "__main__":
    log("INFO", "\n🚀 程序启动中...")
    # Explicitly pin the default loop policy on Linux (matches asyncio's default).
    if sys.platform == 'linux':
        asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
    try:
        asyncio.run(main())
    except RuntimeError as e:
        log("WARNING", f"⚠️ asyncio.run失败,使用备用方式启动: {e}")
        # FIX: asyncio.get_event_loop() is deprecated since 3.10 and raises on
        # 3.12+ when no loop is running; create a fresh loop explicitly, and
        # close it even if main() fails.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(main())
        finally:
            loop.close()
- 移除了 exception_on_overflow 参数:完全适配 PyAudio 0.2.14
- 加了 sys.stdout.flush(),确保输出不被缓存
- 把 APP_KEY/ACCESS_KEY 替换成火山引擎的真实值
运行方式:
cd ~/workspace/doubao
python3 ./asr3_debug.py
[2026-03-17 07:00:00.000] [INFO] ============================================================
[2026-03-17 07:00:00.000] [INFO] 🔥 火山ASR v3 实时语音识别 - 适配PyAudio 0.2.14版
[2026-03-17 07:00:00.000] [INFO] Python版本: 3.10.12 (main, Mar 3 2026, 11:56:32) [GCC 11.4.0]
[2026-03-17 07:00:00.000] [INFO] PyAudio版本: 0.2.14
[2026-03-17 07:00:00.000] [INFO] websockets版本: 16.0
[2026-03-17 07:00:00.000] [INFO] 音频设备ID: 4, 采样率: 44100Hz
[2026-03-17 07:00:00.000] [INFO] ✅ 已移除所有旧版不兼容参数
[2026-03-17 07:00:00.000] [INFO] ============================================================
[2026-03-17 07:00:00.001] [INFO]
🚀 程序启动中...
[2026-03-17 07:00:00.002] [INFO]
📌 步骤1: 初始化硬件麦克风
[2026-03-17 07:00:00.010] [INFO] ✅ PyAudio初始化成功
[2026-03-17 07:00:00.010] [INFO]
📜 系统所有音频输入设备列表:
[2026-03-17 07:00:00.010] [INFO] 设备总数: 10
[2026-03-17 07:00:00.010] [INFO] 设备4: HDA Intel PCH: ALC256 Analog (hw:1,0) | 最大输入通道: 2 | 默认采样率: 44100.0
[2026-03-17 07:00:00.011] [INFO]
🔌 尝试打开设备ID=4(适配PyAudio 0.2.14)
[2026-03-17 07:00:00.015] [INFO] ✅ 音频流打开成功!麦克风已就绪
...
运行时 extra_headers 参数报错:TypeError: BaseEventLoop.create_connection() got an unexpected keyword argument 'extra_headers'。原因是 websockets 16.0 版本参数命名变更(extra_headers 改为 additional_headers),同时 InvalidStatusCode 异常类已被废弃。以下是最终修复版代码,解决所有剩余问题:
import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
import os
import traceback
from datetime import datetime
# ========== 1. Core credentials (MUST be replaced with real values) ==========
APP_KEY = "你的X-Api-App-Key"  # placeholder — X-Api-App-Key from the Volcano Engine console
ACCESS_KEY = "你的"  # placeholder (value truncated in this transcript) — X-Api-Access-Key
# ========== 2. Hardware microphone settings ==========
INPUT_DEVICE_INDEX = 4  # verified-working ALSA input device index — machine-specific
FORMAT = pyaudio.paInt16  # 16-bit signed PCM samples
CHANNELS = 1  # mono capture
SUPPORTED_RATE = 44100  # Hz — rate the mic supports; also declared in the ASR config packet
CHUNK = int(SUPPORTED_RATE * 0.2)  # frames per read ≈ 200 ms of audio per packet
# ========== 3. Smart stderr filter: hide ALSA noise, keep useful errors ==========
class FilteredStderr:
    """Write-through stderr replacement that drops known ALSA warning spam."""

    def write(self, msg):
        # Bail out early when the message matches any blacklisted fragment.
        for fragment in ('snd_pcm_dsnoop_open', 'snd_pcm_dmix_open', 'Unknown PCM', 'pcm_oss.c'):
            if fragment in msg:
                return
        sys.__stderr__.write(msg)

    def flush(self):
        sys.__stderr__.flush()

# Route all stderr writes through the filter from here on.
sys.stderr = FilteredStderr()
# ========== 4. Debug logger (writes straight to stdout and flushes) ==========
def log(level: str, msg: str) -> None:
    """Timestamped logger used throughout the script (always flushes stdout)."""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    sys.stdout.write(f"[{ts}] [{level.upper()}] {msg}\n")
    sys.stdout.flush()
# ========== 5. Startup information banner (runs at import time) ==========
log("INFO", "="*60)
log("INFO", "🔥 火山ASR v3 实时语音识别 - 最终修复版")
log("INFO", f"Python版本: {sys.version}")
log("INFO", f"PyAudio版本: {getattr(pyaudio, '__version__', '未知')}")
log("INFO", f"websockets版本: {getattr(websockets, '__version__', '未知')}")
log("INFO", f"音频设备ID: {INPUT_DEVICE_INDEX}, 采样率: {SUPPORTED_RATE}Hz")
log("INFO", "✅ 修复websockets 16.0参数问题 | 适配PyAudio 0.2.14")
log("INFO", "="*60)
class ASRBinaryProtocol:
    """Volcano Engine ASR v3 binary protocol helpers.

    Frame layout (big-endian):
      - 4-byte header packed as four nibble pairs:
        (version|header_size, msg_type|flags, serialization|compression, reserved)
      - full server responses (msg_type == 9) then carry a 4-byte sequence number
      - 4-byte payload size
      - payload (gzip-compressed when the compression nibble is 1)
    """

    @staticmethod
    def build_header(msg_type: int, serialization: int = 1, compression: int = 1, flags: int = 0) -> bytes:
        """Pack the 4-byte protocol header.

        serialization: 1 = JSON payload, 0 = raw bytes.
        compression:   1 = gzip payload, 0 = uncompressed.
        """
        version = 1
        header_size = 1  # header length in units of 4 bytes
        byte0 = (version << 4) | header_size
        byte1 = (msg_type << 4) | flags
        byte2 = (serialization << 4) | compression
        byte3 = 0  # reserved
        header = struct.pack('BBBB', byte0, byte1, byte2, byte3)
        log("DEBUG", f"构建Header: msg_type={msg_type}, hex={header.hex()}")
        return header

    @staticmethod
    def pack_message(header: bytes, payload: bytes, compression: int = 1) -> bytes:
        """Append the (optionally gzipped) payload and its 4-byte size to *header*."""
        original_len = len(payload)
        if compression == 1:
            payload = gzip.compress(payload)
        payload_size = struct.pack('>I', len(payload))
        msg = header + payload_size + payload
        log("DEBUG", f"打包消息: 原始{original_len}字节 → 压缩后{len(payload)}字节 → 总长度{len(msg)}字节")
        return msg

    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        """Split a server frame into (header fields dict, decompressed payload).

        Raises ValueError when *data* is shorter than the declared layout.
        """
        log("DEBUG", f"解包消息: 总长度{len(data)}字节")
        if len(data) < 8:
            raise ValueError(f"消息长度不足8字节(实际{len(data)}字节)")
        # Parse the 4-byte header into its nibble fields.
        header = data[:4]
        byte0, byte1, byte2, byte3 = struct.unpack('BBBB', header)
        header_info = {
            "version": (byte0 >> 4) & 0x0F,
            "header_size": byte0 & 0x0F,
            "msg_type": (byte1 >> 4) & 0x0F,
            "flags": byte1 & 0x0F,
            "serialization": (byte2 >> 4) & 0x0F,
            "compression": byte2 & 0x0F,
            "reserved": byte3
        }
        log("DEBUG", f"解析Header: {json.dumps(header_info, indent=2)}")
        # BUG FIX: a full server response (msg_type == 9) carries a 4-byte
        # sequence number at bytes 4:8, so its payload size lives at bytes
        # 8:12. The previous code always read the size from bytes 4:8 and
        # therefore mistook the sequence number for the payload size on
        # result frames while still slicing the payload from offset 12.
        if header_info["msg_type"] == 9:
            if len(data) < 12:
                raise ValueError(f"消息长度不足12字节(实际{len(data)}字节)")
            payload_size = struct.unpack('>I', data[8:12])[0]
            payload_start = 12
        else:
            payload_size = struct.unpack('>I', data[4:8])[0]
            payload_start = 8
        log("DEBUG", f"Payload声明长度: {payload_size}字节")
        payload = data[payload_start:payload_start + payload_size]
        if header_info["compression"] == 1:
            payload = gzip.decompress(payload)
        log("DEBUG", f"Payload解压后长度: {len(payload)}字节")
        return header_info, payload
async def main():
"""主函数 - 修复所有已知问题"""
# ========== 步骤1: 音频设备初始化 ==========
log("INFO", "\n📌 步骤1: 初始化硬件麦克风")
p = None
stream = None
try:
p = pyaudio.PyAudio()
log("INFO", "✅ PyAudio初始化成功")
# 列出所有音频设备
log("INFO", "\n📜 系统所有音频输入设备列表:")
device_count = p.get_device_count()
log("INFO", f" 设备总数: {device_count}")
for i in range(device_count):
dev = p.get_device_info_by_index(i)
if dev['maxInputChannels'] > 0:
log("INFO", f" 设备{i}: {dev['name']} | 最大输入通道: {dev['maxInputChannels']} | 默认采样率: {dev['defaultSampleRate']}")
# 打开音频流(适配PyAudio 0.2.14)
log("INFO", f"\n🔌 尝试打开设备ID={INPUT_DEVICE_INDEX}")
stream = p.open(
format=FORMAT,
channels=CHANNELS,
rate=SUPPORTED_RATE,
input=True,
input_device_index=INPUT_DEVICE_INDEX,
frames_per_buffer=CHUNK
)
log("INFO", "✅ 音频流打开成功!麦克风已就绪")
except Exception as e:
log("ERROR", f"❌ 音频初始化失败: {type(e).__name__}: {e}")
log("ERROR", f"📝 详细错误栈:\n{traceback.format_exc()}")
if stream:
stream.close()
if p:
p.terminate()
return
# ========== 步骤2: 鉴权信息检查 ==========
log("INFO", "\n📌 步骤2: 检查鉴权信息")
if not APP_KEY or not ACCESS_KEY:
log("ERROR", "❌ APP_KEY/ACCESS_KEY未配置!请先填写正确的鉴权信息")
stream.close()
p.terminate()
return
connect_id = str(uuid.uuid4())
# 构建请求头(修复websockets 16.0参数)
headers = [
("X-Api-App-Key", APP_KEY),
("X-Api-Access-Key", ACCESS_KEY),
("X-Api-Resource-Id", RESOURCE_ID),
("X-Api-Connect-Id", connect_id)
]
log("INFO", f"✅ 鉴权信息检查通过")
log("INFO", f" Connect ID: {connect_id}")
log("INFO", f" APP_KEY: {APP_KEY[:8]}****")
log("INFO", f" ACCESS_KEY: {ACCESS_KEY[:8]}****")
# ========== 步骤3: 连接ASR服务(核心修复) ==========
log("INFO", "\n📌 步骤3: 连接火山ASR WebSocket服务")
uri = "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async"
log("INFO", f"🌐 连接地址: {uri}")
try:
# 修复websockets 16.0参数:用headers(列表形式)替代extra_headers
log("INFO", "🔍 正在建立WebSocket连接...")
websocket = await websockets.connect(
uri,
extra_headers=headers, # 兼容新旧版本的写法
ping_interval=10,
ping_timeout=30
)
log("INFO", "✅ WebSocket连接成功!")
# ========== 步骤4: 发送配置包 ==========
log("INFO", "\n📌 步骤4: 发送音频配置包")
config = {
"user": {"uid": connect_id},
"audio": {
"format": "pcm",
"codec": "raw",
"rate": SUPPORTED_RATE,
"bits": 16,
"channel": CHANNELS,
"language": "zh-CN"
},
"request": {
"model_name": "bigmodel",
"enable_itn": True,
"enable_punc": True,
"show_utterances": True,
"end_window_size": 800
}
}
log("DEBUG", f"📝 配置包内容:\n{json.dumps(config, indent=2)}")
# 打包并发送配置包
config_payload = json.dumps(config).encode('utf-8')
config_header = ASRBinaryProtocol.build_header(1)
config_msg = ASRBinaryProtocol.pack_message(config_header, config_payload)
await websocket.send(config_msg)
log("INFO", "✅ 配置包发送成功!")
# ========== 步骤5: 音频采集与发送 ==========
log("INFO", "\n📌 步骤5: 开始音频采集和发送")
log("INFO", "🎤 麦克风已激活!现在可以说话,识别结果会实时显示...")
log("INFO", "💡 提示: 按Ctrl+C停止程序\n")
sequence = 1
audio_packet_count = 0
async def send_audio_loop():
    # Upload loop: read a mic chunk, wrap it in a v3 binary audio frame
    # (header + big-endian sequence + size + raw PCM) and send it.
    nonlocal sequence, audio_packet_count
    while True:
        try:
            # Old PyAudio raises IOError on input overflow — skip the chunk.
            try:
                audio_data = stream.read(CHUNK)
            except IOError as e:
                log("WARNING", f"⚠️ 音频读取溢出(正常现象): {str(e)[:30]}")
                audio_data = b''
            audio_packet_count += 1
            # Progress log every 5 packets.
            if audio_packet_count % 5 == 0:
                log("INFO", f"📤 已发送{audio_packet_count}包音频 | Sequence: {sequence} | 音频长度: {len(audio_data)}字节")
            if not audio_data:
                await asyncio.sleep(0.05)
                continue
            # Audio frame: msg_type=2, raw payload, flags=1 — presumably
            # "sequence number present"; confirm against the v3 spec.
            audio_header = ASRBinaryProtocol.build_header(2, serialization=0, flags=1)
            sequence_bytes = struct.pack('>I', sequence)
            audio_msg = audio_header + sequence_bytes + struct.pack('>I', len(audio_data)) + audio_data
            # Ship the packet and advance the sequence counter.
            await websocket.send(audio_msg)
            sequence += 1
            await asyncio.sleep(0.05)
        except Exception as e:
            # Best-effort streaming: log and keep the loop alive.
            log("ERROR", f"❌ 音频发送异常: {type(e).__name__}: {e}")
            log("ERROR", f"📝 错误栈片段:\n{traceback.format_exc()[:200]}")
            await asyncio.sleep(0.05)
            continue
# ========== 步骤6: 接收识别结果 ==========
async def receive_result_loop():
result_count = 0
while True:
try:
# 接收响应
data = await websocket.recv()
result_count += 1
log("INFO", f"\n📥 收到第{result_count}条服务端响应")
# 解包响应
header_info, payload = ASRBinaryProtocol.unpack_message(data)
# 处理识别结果
if header_info["msg_type"] == 9:
log("DEBUG", f"📝 原始识别结果Payload:\n{payload.decode('utf-8')[:500]}...")
try:
response = json.loads(payload.decode('utf-8'))
if "result" in response and response["result"]:
text = response["result"].get("text", "")
if text:
# 实时打印识别结果
sys.stdout.write(f"\r🎯 实时识别结果: {text}")
sys.stdout.flush()
else:
log("WARNING", "⚠️ 响应中无识别文本(可能未说话或音量过低)")
else:
log("WARNING", "⚠️ 响应中无result字段")
except json.JSONDecodeError as e:
log("ERROR", f"❌ JSON解析失败: {e}")
except Exception as e:
log("ERROR", f"❌ 结果解析异常: {e}")
else:
log("DEBUG", f"📥 非识别结果消息,msg_type={header_info['msg_type']}")
except Exception as e:
log("ERROR", f"❌ 响应接收异常: {type(e).__name__}: {e}")
await asyncio.sleep(0.05)
continue
# ========== 步骤7: 运行异步任务 ==========
log("INFO", "\n📌 步骤6: 启动音频发送和结果接收任务")
send_task = asyncio.create_task(send_audio_loop())
receive_task = asyncio.create_task(receive_result_loop())
# 等待任务完成(直到用户中断)
await asyncio.gather(send_task, receive_task)
# ========== 异常处理(修复废弃的异常类) ==========
except Exception as e:
# 统一捕获所有连接/鉴权错误
if "status code" in str(e).lower() or "401" in str(e) or "403" in str(e):
log("ERROR", f"\n❌ WebSocket连接失败 - 鉴权错误!")
log("ERROR", f" 错误信息: {e}")
log("ERROR", "\n🔍 快速排查步骤:")
log("ERROR", " 1. 登录火山引擎控制台,确认APP_KEY/ACCESS_KEY正确且未过期")
log("ERROR", " 2. 确认RESOURCE_ID为'volc.bigasr.sauc.duration'(小时计费版)")
log("ERROR", " 3. 确认账号已开通ASR服务且余额充足")
log("ERROR", " 4. 检查网络是否能访问wss://openspeech.bytedance.com")
else:
log("ERROR", f"\n❌ 运行时异常")
log("ERROR", f" 异常类型: {type(e).__name__}")
log("ERROR", f" 异常信息: {e}")
log("ERROR", f" 详细错误栈:\n{traceback.format_exc()}")
except KeyboardInterrupt:
log("INFO", "\n🛑 用户按下Ctrl+C,程序正常停止")
# ========== 资源清理 ==========
finally:
log("INFO", "\n📌 步骤7: 清理资源")
if stream:
stream.stop_stream()
stream.close()
log("INFO", "✅ 音频流已关闭")
if p:
p.terminate()
log("INFO", "✅ PyAudio已终止")
if 'websocket' in locals() and websocket.open:
await websocket.close()
log("INFO", "✅ WebSocket连接已关闭")
log("INFO", "✅ 程序正常退出")
# ========== Program entry point ==========
if __name__ == "__main__":
    log("INFO", "\n🚀 程序启动中...")
    # On Linux the default policy is already in effect; the explicit set simply
    # keeps behaviour uniform across distributions.
    if sys.platform == 'linux':
        asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
    # Run the async main program.
    try:
        asyncio.run(main())
    except RuntimeError as e:
        log("WARNING", f"⚠️ asyncio.run失败,使用备用方式启动: {e}")
        # Fix: asyncio.get_event_loop() is deprecated (and raises on 3.12+ when
        # no loop is running). Create and install a fresh loop explicitly, and
        # guarantee it is closed even if main() fails again.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(main())
        finally:
            loop.close()
headers 改为列表格式([(key, value), ...]),适配create_connection的参数要求extra_headers 写法,兼容websockets新旧版本InvalidStatusCode 异常捕获,改用通用异常处理+关键字匹配create_connection 报错APP_KEY/ACCESS_KEY 为火山引擎真实值python3 ./asr.py
exception_on_overflow 参数 → 移除该参数extra_headers 参数格式错误 → 改为列表格式InvalidStatusCode 异常 → 通用异常处理extra_headers 参数,统一使用 headers 参数(且必须是列表格式的元组),之前的兼容写法仍触发了底层错误。| 旧代码(错误) | 新代码(正确) |
|---|---|
| `websocket = await websockets.connect(uri, extra_headers=headers, ...)` | `websocket = await websockets.connect(uri, headers=headers, ...)` |
| (保留extra_headers参数) | (完全移除extra_headers,只用headers) |
import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
import os
import traceback
from datetime import datetime
# ========== 1. Core config (must be replaced with your real values!) ==========
APP_KEY = "你的X-Api-App-Key"
ACCESS_KEY = "你的"
# Bug fix: RESOURCE_ID is sent as the X-Api-Resource-Id header in main() but was
# never defined here, causing a NameError before the WebSocket connect. Value is
# the hour-billed big-model ASR resource id used elsewhere in this document.
RESOURCE_ID = "volc.bigasr.sauc.duration"
# ========== 2. Microphone hardware config ==========
INPUT_DEVICE_INDEX = 4  # ALSA input device index — TODO confirm per machine
FORMAT = pyaudio.paInt16  # 16-bit signed PCM samples
CHANNELS = 1  # mono capture
SUPPORTED_RATE = 44100  # device-native sample rate (Hz)
CHUNK = int(SUPPORTED_RATE * 0.2)  # read 200 ms of audio per chunk
# ========== 3. Log filtering + log helper ==========
class FilteredStderr:
    """stderr proxy that drops known-noisy ALSA warnings and forwards the rest."""

    # Substrings identifying the ALSA messages we want to suppress.
    _NOISE = ('snd_pcm_dsnoop_open', 'snd_pcm_dmix_open', 'Unknown PCM', 'pcm_oss.c')

    def write(self, msg):
        for marker in self._NOISE:
            if marker in msg:
                return
        sys.__stderr__.write(msg)

    def flush(self):
        sys.__stderr__.flush()

# Install the filter for the remainder of the process.
sys.stderr = FilteredStderr()
def log(level: str, msg: str):
    """Print a timestamped log line (millisecond precision) and flush stdout."""
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    print(f"[{now}] [{level.upper()}] {msg}")
    sys.stdout.flush()
# ========== 4. ASR binary-protocol helpers ==========
class ASRBinaryProtocol:
    """Binary framing helpers for the Volcengine v3 streaming-ASR protocol."""

    @staticmethod
    def build_header(msg_type: int) -> bytes:
        """Build the fixed 4-byte frame header for a message of *msg_type*.

        Layout: version/header-size, msg-type/flags, serialization/compression,
        reserved. JSON serialization (1) and gzip compression (1) are hard-wired.
        """
        b0 = (1 << 4) | 1          # protocol version 1, header size 1 (×4 bytes)
        b1 = (msg_type << 4) | 0   # message type in the high nibble, flags 0
        b2 = (1 << 4) | 1          # JSON serialization, gzip compression
        return struct.pack('BBBB', b0, b1, b2, 0)

    @staticmethod
    def pack_message(header: bytes, payload: bytes) -> bytes:
        """Gzip *payload* and prepend header plus big-endian payload length."""
        body = gzip.compress(payload)
        return header + struct.pack('>I', len(body)) + body

    @staticmethod
    def unpack_message(data: bytes) -> tuple[dict, bytes]:
        """Split a server frame into (header_info, payload), gunzipping if flagged."""
        if len(data) < 8:
            raise ValueError("消息过短")
        _, b1, b2, _ = struct.unpack('BBBB', data[:4])
        info = {
            "msg_type": (b1 >> 4) & 0x0F,
            "compression": b2 & 0x0F
        }
        size = struct.unpack('>I', data[4:8])[0]
        # msg_type 9 frames: payload starts 4 bytes later (presumably a sequence
        # field precedes it — matches the original offset handling).
        start = 12 if info["msg_type"] == 9 else 8
        body = data[start:start + size]
        if info["compression"] == 1:
            body = gzip.decompress(body)
        return info, body
# ========== 5. Main coroutine (core fix) ==========
async def main():
    """Stream live microphone audio to the Volcengine v3 big-model ASR endpoint.

    Opens the configured input device, authenticates with the four X-Api-*
    handshake headers, sends one JSON config frame, then runs a send-audio loop
    and a receive-result loop concurrently until cancelled.

    Fixes vs. the original:
    * websockets >= 13 forwards unknown ``connect()`` kwargs straight to
      ``loop.create_connection()``; custom handshake headers must be passed as
      ``additional_headers`` — ``headers=`` (like the old ``extra_headers=``)
      raises ``TypeError: ... unexpected keyword argument``.
    * Audio stream, PyAudio handle and WebSocket are now released in a
      ``finally`` block instead of leaking on errors or Ctrl+C.
    """
    # Microphone setup.
    log("INFO", "🔧 初始化麦克风...")
    p = pyaudio.PyAudio()
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=SUPPORTED_RATE,
        input=True,
        input_device_index=INPUT_DEVICE_INDEX,
        frames_per_buffer=CHUNK
    )
    log("INFO", "✅ 麦克风初始化成功")
    websocket = None
    try:
        # Auth headers: list of (name, value) tuples, one fresh connect id per run.
        connect_id = str(uuid.uuid4())
        headers = [
            ("X-Api-App-Key", APP_KEY),
            ("X-Api-Access-Key", ACCESS_KEY),
            ("X-Api-Resource-Id", RESOURCE_ID),
            ("X-Api-Connect-Id", connect_id)
        ]
        log("INFO", "🔌 连接火山ASR服务...")
        # Key fix: websockets >= 13 takes handshake headers via `additional_headers`.
        websocket = await websockets.connect(
            "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async",
            additional_headers=headers,
            ping_interval=10,
            ping_timeout=30
        )
        log("INFO", "✅ WebSocket连接成功")
        # One-off config frame describing the audio format and request options.
        config = {
            "user": {"uid": connect_id},
            "audio": {"format": "pcm", "codec": "raw", "rate": SUPPORTED_RATE, "bits": 16, "channel": 1, "language": "zh-CN"},
            "request": {"model_name": "bigmodel", "enable_itn": True, "enable_punc": True}
        }
        config_msg = ASRBinaryProtocol.pack_message(ASRBinaryProtocol.build_header(1), json.dumps(config).encode('utf-8'))
        await websocket.send(config_msg)
        log("INFO", "✅ 配置包发送成功")
        log("INFO", "🎤 开始识别(按Ctrl+C停止)")

        async def send_audio():
            # Push one CHUNK of raw PCM roughly every 50 ms with a running sequence.
            sequence = 1
            while True:
                try:
                    audio_data = stream.read(CHUNK)
                    audio_header = ASRBinaryProtocol.build_header(2)
                    audio_msg = audio_header + struct.pack('>I', sequence) + struct.pack('>I', len(audio_data)) + audio_data
                    await websocket.send(audio_msg)
                    sequence += 1
                    await asyncio.sleep(0.05)
                except IOError:
                    # Mic buffer overflow: drop this chunk and retry shortly.
                    await asyncio.sleep(0.05)

        async def receive_result():
            # Print every recognition-result frame (msg_type 9) in place on one line.
            while True:
                data = await websocket.recv()
                header_info, payload = ASRBinaryProtocol.unpack_message(data)
                if header_info["msg_type"] == 9:
                    text = json.loads(payload.decode('utf-8'))["result"].get("text", "")
                    if text:
                        sys.stdout.write(f"\r🎯 识别结果: {text}")
                        sys.stdout.flush()

        # Run both loops until one raises (error or cancellation).
        await asyncio.gather(send_audio(), receive_result())
    finally:
        # Always release network and audio resources.
        if websocket is not None:
            await websocket.close()
        stream.stop_stream()
        stream.close()
        p.terminate()
# ========== Program entry point ==========
if __name__ == "__main__":
    # Run the async pipeline; Ctrl+C is the expected, clean shutdown path,
    # anything else gets logged with its traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log("INFO", "🛑 程序停止")
    except Exception as exc:
        log("ERROR", f"❌ 错误: {type(exc).__name__}: {exc}")
        log("ERROR", f"📝 栈信息:\n{traceback.format_exc()}")
main() 函数的WebSocket连接部分:# 错误写法(旧)
websocket = await websockets.connect(uri, extra_headers=headers, ...)
# 正确写法(新)
websocket = await websockets.connect(uri, headers=headers, ...)
extra_headers=headers 兼容写法,但websockets 16.0底层会把这个参数透传给 create_connection,而该方法不识别这个参数,导致报错。必须完全移除 extra_headers,只保留 headers。TypeError: BaseEventLoop.create_connection() got an unexpected keyword argument 'extra_headers' 错误会完全消失,能正常建立WebSocket连接。extra_headers 参数,仅使用 headers 参数传递鉴权头[(key, value), ...] 的列表元组格式websockets 16.0 版本存在严重的兼容性 bug(参数透传错误),无论是 extra_headers 还是 headers 都会被错误透传给底层 create_connection 方法,而该方法根本不接受这两个参数。唯一能彻底解决的方式是降级 websockets 到稳定版本。# 卸载当前bug版本
pip3 uninstall -y websockets
# 安装兼容Python 3.10的稳定版本
pip3 install websockets==10.4
extra_headers 参数,且不会透传错误)import websockets
import asyncio
import json
import pyaudio
import uuid
import struct
import gzip
import sys
import os
import traceback
from datetime import datetime
# ========== 1. Core credentials (replace with your real console values!) ==========
APP_KEY = "你的火山引擎APP_KEY"  # sent as the X-Api-App-Key handshake header
ACCESS_KEY = "你的火山引擎ACCESS_KEY"  # sent as the X-Api-Access-Key handshake header
RESOURCE_ID = "volc.bigasr.sauc.duration"  # hour-billed big-model ASR resource id
# ========== 2. Microphone hardware config (usually no change needed) ==========
INPUT_DEVICE_INDEX = 4  # ALSA input device index — TODO confirm per machine
FORMAT = pyaudio.paInt16  # 16-bit signed PCM samples
CHANNELS = 1  # mono capture
SUPPORTED_RATE = 44100  # device-native sample rate in Hz
CHUNK = int(SUPPORTED_RATE * 0.2)  # read 200 ms of audio per chunk
# ========== 3. Suppress noisy ALSA warnings (keep all other stderr output) ==========
class FilteredStderr:
    """Wrap the real stderr, silently dropping known ALSA noise lines."""

    def write(self, msg):
        noisy = ('snd_pcm_dsnoop_open', 'snd_pcm_dmix_open', 'Unknown PCM', 'pcm_oss.c')
        if not any(token in msg for token in noisy):
            sys.__stderr__.write(msg)

    def flush(self):
        sys.__stderr__.flush()

# Replace the process-wide stderr with the filtering proxy.
sys.stderr = FilteredStderr()
# ========== 4. Minimal logging helper ==========
def log(level: str, msg: str):
    """Emit a ``[HH:MM:SS] [LEVEL] message`` line and flush stdout immediately."""
    stamp = datetime.now().strftime('%H:%M:%S')
    print(f"[{stamp}] [{level}] {msg}")
    sys.stdout.flush()
# ========== 5. ASR protocol helpers (simplified) ==========
class ASRProtocol:
    """Minimal framing helpers for the Volcengine v3 ASR binary protocol."""

    @staticmethod
    def build_header(msg_type):
        """Return the 4-byte frame header for the given message type."""
        # 0x11 = version 1 / header size 1; second 0x11 = JSON serialization / gzip.
        return struct.pack('BBBB', 0x11, msg_type << 4, 0x11, 0x00)

    @staticmethod
    def pack_data(msg_type, data):
        """JSON-encode *data*, gzip it, and prepend header + big-endian length."""
        body = gzip.compress(json.dumps(data).encode('utf-8'))
        return ASRProtocol.build_header(msg_type) + struct.pack('>I', len(body)) + body
# ========== 6. Main coroutine (final runnable version) ==========
async def main():
    """Stream live microphone audio to the Volcengine v3 ASR service.

    Flow: open the mic -> connect with the four X-Api-* auth headers
    (websockets 10.4 ``extra_headers``) -> send one gzip'd JSON config frame ->
    run a send-audio loop and a receive-result loop concurrently.
    """
    # 1. Open the microphone (written against PyAudio 0.2.14).
    log("INFO", "初始化麦克风...")
    p = pyaudio.PyAudio()
    stream = None
    try:
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SUPPORTED_RATE,
            input=True,
            input_device_index=INPUT_DEVICE_INDEX,
            frames_per_buffer=CHUNK
        )
        log("SUCCESS", "麦克风初始化完成")
    except Exception as e:
        log("ERROR", f"麦克风初始化失败: {e}")
        return
    # 2. Build the auth headers (websockets 10.4 accepts `extra_headers`).
    connect_id = str(uuid.uuid4())
    extra_headers = [
        ("X-Api-App-Key", APP_KEY),
        ("X-Api-Access-Key", ACCESS_KEY),
        ("X-Api-Resource-Id", RESOURCE_ID),
        ("X-Api-Connect-Id", connect_id)
    ]
    # 3. Connect to the ASR service (key point: `extra_headers`, the 10.4 API).
    log("INFO", "连接火山ASR服务...")
    try:
        async with websockets.connect(
            "wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_async",
            extra_headers=extra_headers,
            ping_interval=10,
            ping_timeout=30
        ) as websocket:
            log("SUCCESS", "WebSocket连接成功")
            # 4. Send the initial configuration frame.
            init_config = {
                "user": {"uid": connect_id},
                "audio": {
                    "format": "pcm",
                    "codec": "raw",
                    "rate": SUPPORTED_RATE,
                    "bits": 16,
                    "channel": CHANNELS,
                    "language": "zh-CN"
                },
                "request": {
                    "model_name": "bigmodel",
                    "enable_itn": True,
                    "enable_punc": True
                }
            }
            await websocket.send(ASRProtocol.pack_data(1, init_config))
            log("SUCCESS", "初始化配置发送完成")
            log("INFO", "🎤 开始实时识别(按Ctrl+C停止)")
            # 5. Audio-send loop + result-receive loop, run concurrently.
            sequence = 1
            async def send_audio():
                # Push one CHUNK of raw PCM roughly every 50 ms.
                nonlocal sequence
                while True:
                    try:
                        audio_data = stream.read(CHUNK)
                        # Audio frame: header + sequence + length + raw data.
                        audio_pkg = ASRProtocol.build_header(2) + \
                                    struct.pack('>I', sequence) + \
                                    struct.pack('>I', len(audio_data)) + \
                                    audio_data
                        await websocket.send(audio_pkg)
                        sequence += 1
                        await asyncio.sleep(0.05)
                    except IOError:
                        # Mic buffer overflow: drop the chunk and retry shortly.
                        await asyncio.sleep(0.05)
            async def recv_result():
                while True:
                    data = await websocket.recv()
                    # Recognition-result frame: high nibble of byte 1 equals 9.
                    # NOTE(review): assumes `data` is bytes, always gzip'd, with the
                    # payload at offset 12 — confirm against the v3 protocol spec.
                    if len(data) >= 8 and (data[1] >> 4) == 9:
                        payload = gzip.decompress(data[12:])
                        result = json.loads(payload.decode('utf-8'))
                        text = result["result"].get("text", "")
                        if text:
                            sys.stdout.write(f"\r识别结果: {text}")
                            sys.stdout.flush()
            # Run both tasks until one raises (error or cancellation).
            await asyncio.gather(send_audio(), recv_result())
    except Exception as e:
        log("ERROR", f"连接/运行失败: {type(e).__name__}: {e}")
        log("ERROR", f"详细信息: {traceback.format_exc()}")
    finally:
        # Release audio resources no matter how we exited.
        if stream:
            stream.stop_stream()
            stream.close()
        p.terminate()
        log("INFO", "资源已清理,程序退出")
# ---- Script entry point ----
if __name__ == "__main__":
    # Run the async pipeline; Ctrl+C is an expected, clean shutdown path.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log("INFO", "用户终止程序")
APP_KEY/ACCESS_KEY 为你的火山引擎真实值python3 ./asr3_debug.py
extra_headers 参数传递鉴权头,不会触发底层错误exception_on_overflow 参数,避免兼容错误App Key 和 Access Key 是否填写正确(注意区分大小写、是否有多余空格)。RESOURCE_ID = "volc.bigasr.sauc.duration" 是你控制台中已授权的资源ID(比如是否是「按量计费」/「小时计费」版本)。# 忽略依赖冲突(仅警告,不影响ASR功能),或升级冲突包:
pip3 install --upgrade realtimestt yt-dlp gradio --user
APP_KEY = "你的火山引擎App Key" # 控制台「应用管理」的App Key
ACCESS_KEY = "你的火山引擎Access Key" # 控制台「访问控制」的Access Key
RESOURCE_ID = "volc.bigasr.sauc.duration" # 必须和你开通的资源ID完全一致
RESOURCE_ID 匹配。wss://openspeech.bytedance.com/api/v3/sauc/bigmodel_asyncX-Api-App-Key/X-Api-Access-Key/X-Api-Resource-Id如果工具也返回403,100%是账号配置问题;如果工具能连接,再回头看代码。| 你的控制台参数 | 正确的WebSocket请求头 | 说明 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|
| APP ID | app-id | 控制台「应用管理」的APP ID |
| Access Token(或Access Key) | access-token | 控制台生成的访问令牌(部分场景用Access Key) |
| Secret Key | 无需直接放在头里 | 需通过HMAC-SHA256生成签名(部分接口),或直接用">补充:火山引擎ASR鉴权两种方式
|