# Install the required packages with `pip3 install pyannote.audio torch transformers pydub numpy` (macOS/Linux) or `pip install pyannote.audio torch transformers pydub numpy` (Windows).
import io
import os
import time
from pyannote.audio import Pipeline
# pprint is used to pretty-print the transcript and analysis results
from pprint import pprint
import torch
from transformers import pipeline
from transformers.utils import is_flash_attn_2_available
from pydub import AudioSegment
import numpy as np
from segment_wave_files import segment_wave_files
from transcribe_files import transcribe_segments
from transcript_analysis import transcript_analysis

#from huggingface_hub import login
#login()

hugging_face = os.environ.get("HUGGING_FACE")

pipelineDiary = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=hugging_face)
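# Note: pyannote/speaker-diarization-3.1 is a gated model on the Hugging Face Hub, so
# Pipeline.from_pretrained() needs an access token (read here from the HUGGING_FACE
# environment variable) from an account that has accepted the model's user conditions.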

if torch.cuda.is_available():
    print("diarize_wav_file Using CUDA")
    pipelineDiary.to(torch.device("cuda"))
else:
    print("diarize_wav_file Using CPU")

def diarize_wav_file(file_name):
    print("DIARIZING " + file_name)
    start = time.time()
    diarization = pipelineDiary(file_name, num_speakers=2)
    print("Elapsed " + str(time.time() - start))
    # The pipeline also accepts in-memory audio, e.g.
    # {"waveform": audio_tensor, "sample_rate": sample_rate_tensor}
    # Merge consecutive turns by the same speaker into single segments.
    speakers = []
    contSpeaker = ""
    segment = None
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        if contSpeaker != speaker:
            if segment is not None:
                speakers.append(segment)
            segment = {'speaker': speaker, 'start': round(turn.start, 1),
                       'end': round(turn.end, 1)}
            contSpeaker = speaker
        else:
            segment['end'] = round(turn.end, 1)
    # Append the final segment, which the loop above never reaches.
    if segment is not None:
        speakers.append(segment)
    return speakers
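
# Illustrative shape of the list returned by diarize_wav_file (values are made up;
# the speaker labels come from pyannote):
#   [{'speaker': 'SPEAKER_00', 'start': 0.0, 'end': 12.3},
#    {'speaker': 'SPEAKER_01', 'start': 12.3, 'end': 20.7}]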

def convert_mono_16khz(location, file):
    sound = AudioSegment.from_file(location + file, "wav")
    sound = sound.set_channels(1)
    sound = sound.set_frame_rate(16000)
    sound.export(location + "16khz" + file, "wav")
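# Hypothetical usage: convert_mono_16khz("./data/", "call1.wav") writes a mono,
# 16 kHz copy of ./data/call1.wav to ./data/16khzcall1.wav.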

location = os.path.join(".", "data") + os.sep

def get_included_files():
    files = os.listdir(location)
    return location, files
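
# main() ties the stages together: diarize a recording with pyannote, cut the audio
# into per-speaker segments (segment_wave_files), transcribe each segment
# (transcribe_segments), and then summarize the transcript (transcript_analysis).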

def main():
    dir_list = os.listdir(location)
    for file in dir_list:
        #input_file = location + file
        # Hard-coded test file; switch to the commented line above to process
        # every file found in `location` instead.
        input_file = 'C:\\Users\\jerry\\Downloads\\SampleCallsWave\\Tech Support Help from Call Center Experts1.wav'
        # Apply the pretrained diarization pipeline to the wave file.
        speakers = diarize_wav_file(input_file)
        speakers = segment_wave_files(speakers, input_file)
        transcript = transcribe_segments(speakers)
        print("---------------------------------------------------------------------")
        pprint(transcript)
        print("---------------------------------------------------------------------")
        summary = transcript_analysis(transcript)
        pprint(summary)  #.encode('utf-8').decode('utf-8'))
        print("\n\n\n\n\n\n\n")

def convertMp3ToWav(file):
    # convert mp3 file to a wav file
    sound = AudioSegment.from_mp3(file)
    # sound.export(output_file, format="wav")
    sample_rate = sound.frame_count() / sound.duration_seconds
    print(sample_rate)
    duration = sound.duration_seconds
    sound = sound.set_frame_rate(16000)
    sound = sound.set_channels(1)
    outFile = os.path.splitext(file)[0] + ".wav"
    sound.export(outFile, format="wav")
    return outFile
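# Hypothetical usage: convertMp3ToWav("call1.mp3") prints the source sample rate and
# returns "call1.wav", a mono 16 kHz copy written next to the original file.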

if __name__ == "__main__":
    main()