Faster-Whisper: Real-Time Transcription of Computer Audio to Text

Author: 猴君

Preface

My earlier smart-dialogue app used the Baidu API for speech recognition; I wanted to switch to a local solution, so I decided to set up Faster-Whisper.
Below is a screenshot of a Bilibili video being transcribed in real time.
(Screenshot: real-time transcription results)

Project

Setting Up the Environment

The required cuDNN is already installed on my machine; if you installed CUDA 12.2, cuBLAS should already be included.
If you haven't installed them yet, you can download them from the link below; the reference videos at the end of this article also walk through the setup.
https://github.com/Purfview/whisper-standalone-win/releases/tag/libs

(Screenshot: cuDNN correctly configured)
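
Before loading any model, it's worth sanity-checking that CUDA is actually visible from Python. A minimal check using torch (which the real-time scripts below import anyway):

import torch

# Should print True, plus the GPU name, if CUDA and its libraries are set up correctly.
print(torch.cuda.is_available())
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))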

For the Anaconda environment, just clone an environment configured earlier; the one from the previous Bert-VITS setup works fine.
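
As a minimal sketch of that clone step, assuming the old environment is named bertvits and the new one faster-whisper (substitute your own environment names):

conda create --name faster-whisper --clone bertvits
conda activate faster-whisper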

Installing Faster-Whisper

Run the following to install it:

pip install faster-whisper 

Downloading the Model

https://huggingface.co/Systran/faster-whisper-large-v3
Once downloaded, place the model folder next to your code.
(Screenshot: the model in the same folder as the code)
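
If you'd rather script the download than fetch it through a browser, here is a minimal sketch using the huggingface_hub package (an extra dependency, installed with pip install huggingface_hub):

from huggingface_hub import snapshot_download

# Download the model repository into a local "large-v3" folder next to the code.
snapshot_download("Systran/faster-whisper-large-v3", local_dir="large-v3")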

Writing the Test Code

(Screenshot: where the model path goes in the code)

# local_files_only=True                                  load the model from local files only
# model_size_or_path=path                                path of the model to load
# device="cuda"                                          run on CUDA
# compute_type="int8_float16"                            quantize to 8-bit
# language="zh"                                          language of the audio
# vad_filter=True                                        enable VAD
# vad_parameters=dict(min_silence_duration_ms=1000)      VAD parameters
from faster_whisper import WhisperModel

model_size = "large-v3"
path = r"D:\Project\Python_Project\FasterWhisper\large-v3"

# Run on GPU with FP16
model = WhisperModel(model_size_or_path=path, device="cuda", local_files_only=True)

# or run on GPU with INT8
# model = WhisperModel(model_size, device="cuda", compute_type="int8_float16")
# or run on CPU with INT8
# model = WhisperModel(model_size, device="cpu", compute_type="int8")

segments, info = model.transcribe(
    "audio.wav",
    beam_size=5,
    language="zh",
    vad_filter=True,
    vad_parameters=dict(min_silence_duration_ms=1000),
)

print("Detected language '%s' with probability %f" % (info.language, info.language_probability))

for segment in segments:
    print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))

Running the Test Code

Drop an audio file into the folder, then run python main.py.
You can see it correctly (well, mostly correctly) recognized what was said in the audio.
(Screenshot: transcription output)

Real-Time Transcription Script

Create a new script named transper.py and run it.

Special thanks to the open-source project:
https://github.com/MyloBishop/transper

import os
import sys
import time
import wave
import tempfile
import threading

import torch
import pyaudiowpatch as pyaudio
from faster_whisper import WhisperModel as whisper

# A bigger audio buffer gives better accuracy
# but also increases latency in response.
# Constant for the audio buffer duration in seconds.
AUDIO_BUFFER = 5


# This function records audio with the PyAudio library and saves it to a temporary WAV file.
# A pyaudio.PyAudio instance opens an audio stream whose callback writes audio data to the
# WAV file in real time.
# time.sleep(AUDIO_BUFFER) blocks execution to make sure enough audio is recorded.
# Finally, the function returns the file name of the saved WAV file.
def record_audio(p, device):
    """Record audio from output device and save to temporary WAV file."""
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        filename = f.name
        wave_file = wave.open(filename, "wb")
        wave_file.setnchannels(device["maxInputChannels"])
        wave_file.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
        wave_file.setframerate(int(device["defaultSampleRate"]))

        def callback(in_data, frame_count, time_info, status):
            """Write frames and return PA flag"""
            wave_file.writeframes(in_data)
            return (in_data, pyaudio.paContinue)

        stream = p.open(
            format=pyaudio.paInt16,
            channels=device["maxInputChannels"],
            rate=int(device["defaultSampleRate"]),
            frames_per_buffer=pyaudio.get_sample_size(pyaudio.paInt16),
            input=True,
            input_device_index=device["index"],
            stream_callback=callback,
        )

        try:
            time.sleep(AUDIO_BUFFER)  # Blocking execution while playing
        finally:
            stream.stop_stream()
            stream.close()
            wave_file.close()
            # print(f"{filename} saved.")
    return filename


# This function transcribes the recorded audio with the Whisper model and prints the result.
def whisper_audio(filename, model):
    """Transcribe audio buffer and display."""
    # segments, info = model.transcribe(filename, beam_size=5, task="translate", language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))
    segments, info = model.transcribe(filename, beam_size=5, language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))
    os.remove(filename)
    # print(f"{filename} removed.")
    for segment in segments:
        # print(f"[{segment.start:.2f} -> {segment.end:.2f}] {segment.text.strip()}")
        print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))


# main() is the top-level control function of the script:
# - load the Whisper model and pick the compute device (GPU or CPU)
# - get the default WASAPI output device info and select the default speakers
# - record audio with PyAudio and run whisper_audio on a worker thread to transcribe it
def main():
    """Load model, record audio and transcribe from default output device."""
    print("Loading model...")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using {device} device.")
    # model = whisper("large-v3", device=device, compute_type="float16")
    model = whisper("large-v3", device=device, local_files_only=True)
    print("Model loaded.")

    with pyaudio.PyAudio() as pya:
        # Create PyAudio instance via context manager.
        try:
            # Get default WASAPI info
            wasapi_info = pya.get_host_api_info_by_type(pyaudio.paWASAPI)
        except OSError:
            print("Looks like WASAPI is not available on the system. Exiting...")
            sys.exit()

        # Get default WASAPI speakers
        default_speakers = pya.get_device_info_by_index(
            wasapi_info["defaultOutputDevice"]
        )

        if not default_speakers["isLoopbackDevice"]:
            for loopback in pya.get_loopback_device_info_generator():
                # Try to find loopback device with same name (and [Loopback suffix]).
                # Unfortunately, this is the most adequate way at the moment.
                if default_speakers["name"] in loopback["name"]:
                    default_speakers = loopback
                    break
            else:
                print(
                    """
                    Default loopback output device not found.
                    Run `python -m pyaudiowpatch` to check available devices.
                    Exiting...
                    """
                )
                sys.exit()

        print(
            f"Recording from: {default_speakers['name']} ({default_speakers['index']})\n"
        )

        while True:
            filename = record_audio(pya, default_speakers)
            thread = threading.Thread(target=whisper_audio, args=(filename, model))
            thread.start()


main()

Real-Time Transcription: WebSocket Server Mode

With help from the latest Google Bard, the synchronous, multithreaded standalone version became an asynchronous WebSocket server version, so Unity can connect and listen for the real-time transcription data. (It was winter when this was written, and in my testing ChatGPT had already gone into hibernation.)

import asyncio
import os
import wave
import tempfile
import json

import torch
import pyaudiowpatch as pyaudio
from faster_whisper import WhisperModel as whisper
import websockets

# Audio buffer time in seconds
AUDIO_BUFFER = 5

# Dictionary to store WebSocket connections
clients = {}


# Handle a single client connection.
async def handle_client(websocket):
    client_id = id(websocket)  # Use the WebSocket object's ID as a unique identifier
    print(f"Client connected from {websocket.remote_address} with ID {client_id}")
    clients[client_id] = websocket
    try:
        # Wait for messages from the client
        async for message in websocket:
            print(f"Received message from client {client_id}: {message}")
            # Process the message (replace this with your own logic)
            response = f"Server received: {message}"
            # Send a response back to the client
            await websocket.send(response)
            print(f"Sent response to client {client_id}: {response}")
    except websockets.exceptions.ConnectionClosedError:
        print(f"Connection with {websocket.remote_address} closed.")
    finally:
        # Remove the WebSocket connection when the client disconnects
        del clients[client_id]


# Send a message to all connected clients.
async def send_all_clients(message):
    if not clients:
        print("No clients connected.")
        return
    for client_id, websocket in clients.items():
        try:
            await websocket.send(message)
            print(f"Sent message to client {client_id}: {message}")
        except Exception as e:
            print(f"Error sending message to client {client_id}: {e}")


# Send a message to a specific client identified by client_id.
async def send_message(client_id, message):
    if client_id in clients:
        websocket = clients[client_id]
        await websocket.send(message)
        print(f"Sent message to client {client_id}: {message}")
    else:
        print(f"Client with ID {client_id} not found.")


# Start the WebSocket server.
async def main_server():
    server = await websockets.serve(handle_client, "localhost", 8765)
    print("WebSocket server started. Listening on ws://localhost:8765")
    await server.wait_closed()


# This function records audio with the PyAudio library and saves it to a temporary WAV file.
# A pyaudio.PyAudio instance opens an audio stream whose callback writes audio data to the
# WAV file in real time. Because asyncio is used, there is no need to block with time.sleep();
# asyncio.sleep() waits asynchronously instead.
# Finally, the function returns the file name of the saved WAV file.
async def record_audio(p, device):
    """Record audio from output device and save to temporary WAV file."""
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        filename = f.name
        wave_file = wave.open(filename, "wb")
        wave_file.setnchannels(device["maxInputChannels"])
        wave_file.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
        wave_file.setframerate(int(device["defaultSampleRate"]))

        def callback(in_data, frame_count, time_info, status):
            """Write frames and return PA flag"""
            wave_file.writeframes(in_data)
            return (in_data, pyaudio.paContinue)

        stream = p.open(
            format=pyaudio.paInt16,
            channels=device["maxInputChannels"],
            rate=int(device["defaultSampleRate"]),
            frames_per_buffer=pyaudio.get_sample_size(pyaudio.paInt16),
            input=True,
            input_device_index=device["index"],
            stream_callback=callback,
        )

        await asyncio.sleep(AUDIO_BUFFER)

        stream.stop_stream()
        stream.close()
        wave_file.close()
        # print(f"{filename} saved.")
    return filename


# Data class for one transcribed segment.
class SegmentData:
    def __init__(self, start, end, text):
        # Instance attributes
        self.start = start
        self.end = end
        self.text = text

    def __dict__(self):
        return {"start": self.start, "end": self.end, "text": self.text}


# data is a list of dicts; convert it to a list of SegmentData objects.
# (Not used by whisper_audio below, which builds the dicts directly.)
def convert_to_unity_data(data):
    unity_data = []
    for item in data:
        segment_data = SegmentData(item["start"], item["end"], item["text"])
        unity_data.append(segment_data)
    return unity_data


# This function transcribes the recorded audio using the Whisper model
# and broadcasts the result to all connected clients as JSON.
async def whisper_audio(filename, model):
    """Transcribe audio buffer and display."""
    segments, info = model.transcribe(filename, beam_size=5, language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))
    os.remove(filename)
    # print(f"{filename} removed.")
    if segments:
        segments_dict_list = [{"start": segment.start, "end": segment.end, "text": segment.text.strip()} for segment in segments]
        json_transcriptions = json.dumps(segments_dict_list)
        print(f"Transcription: {json_transcriptions}")
        try:
            await send_all_clients(json_transcriptions)
        except Exception as e:
            print(f"Error sending message: {e}")


# Start recording audio with PyAudio and run the whisper_audio function
# for audio transcription concurrently using asyncio.gather.
async def main():
    """Load model, record audio and transcribe from default output device."""
    print("Loading model...")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using {device} device.")
    model = whisper("large-v3", device=device, local_files_only=True, compute_type="int8_float16")
    print("Model loaded.")

    with pyaudio.PyAudio() as pya:
        # Get microphone device information (here simply the first device, index 0)
        microphone_index = 0
        microphone_info = pya.get_device_info_by_index(microphone_index)
        while True:
            filename = await record_audio(pya, microphone_info)
            await asyncio.gather(whisper_audio(filename, model))


async def appmain():
    await asyncio.gather(main(), main_server())  # Gather coroutines here


if __name__ == "__main__":
    asyncio.run(appmain())  # Pass the main coroutine to asyncio.run()
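
To verify the server without Unity, a minimal listening client is sketched below (the file name test_client.py and the printed format are my own assumptions; it reuses the same websockets package as the server):

# test_client.py
import asyncio
import json

import websockets


async def listen():
    # Connect to the transcription server started above.
    async with websockets.connect("ws://localhost:8765") as websocket:
        async for message in websocket:
            # Each message is a JSON array of {start, end, text} segments.
            for seg in json.loads(message):
                print(f"[{seg['start']:.2f}s -> {seg['end']:.2f}s] {seg['text']}")


asyncio.run(listen())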

References

faster-whisper
MyloBishop/transper
Google Bard
基于faster_whisper的实时语音识别
基于faster whisper实现实时语音识别项目语音转文本python编程实现
