CHANNELS = 2 RATE = 44100 def genDAF(delay): bufferSize = floor(delay / 1000 * RATE) device = PyAudio() try: streamIn = device.open( format=paFloat32, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=bufferSize ) streamOut = device.open( format=paFloat32, channels=CHANNELS, rate=RATE, output=True, frames_per_buffer=bufferSize ) except OSError: print('genDAF: error: No input/output device found! Connect and rerun') return print('CTRL-C to stop capture') while streamIn.is_active(): start = clock() audioData = streamIn.read(bufferSize) streamOut.write(audioData) actualDelay = floor((clock() - start) * 1000) print('Actual Delay: {} ms'.format(actualDelay))
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Usage: python3 dafgen_cli.py <delay_in_ms> from pyaudio import PyAudio, paFloat32 from math import floor from time import clock import sys CHANNELS = 2 RATE = 44100 def genDAF(delay): bufferSize = floor(delay / 1000 * RATE) device = PyAudio() try: streamIn = device.open( format=paFloat32, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=bufferSize ) streamOut = device.open( format=paFloat32, channels=CHANNELS, rate=RATE, output=True, frames_per_buffer=bufferSize ) except OSError: print('genDAF: error: No input/output device found! Connect and rerun') return print('CTRL-C to stop capture') while streamIn.is_active(): start = clock() audioData = streamIn.read(bufferSize) streamOut.write(audioData) actualDelay = floor((clock() - start) * 1000) print('Actual Delay: {} ms'.format(actualDelay)) def main(): if len(sys.argv) != 2: print('Usage: python3 {} <delay_in_ms>'.format(sys.argv[0])) sys.exit(1) try: delay = int(sys.argv[1]) except ValueError: print('main: error: Invalid input type') sys.exit(1) if not 50 <= delay <= 200: print('main: error: Delay must be in [50; 200] ms') sys.exit(1) print('Delay: {} ms\n'.format(delay)) try: genDAF(delay) except KeyboardInterrupt: print('Stopped') if __name__ == '__main__': main()
class MainApp(QMainWindow, Ui_DAFGen): _CHANNELS = 2 _RATE = 44100 def __init__(self): super().__init__() self.setupUi(self) # ... # ... def _startCapture(self): bufferSize = floor(self.delaySlider.value() / 1000 * self._RATE) device = PyAudio() try: streamIn = device.open( format=paFloat32, channels=self._CHANNELS, rate=self._RATE, input=True, frames_per_buffer=bufferSize ) streamOut = device.open( format=paFloat32, channels=self._CHANNELS, rate=self._RATE, output=True, frames_per_buffer=bufferSize ) except OSError: QMessageBox.critical(self, 'Error', 'No input/output device found! Connect and rerun.') return self._workerThread = Worker(bufferSize, streamIn, streamOut) self._workerThread._trigger.connect(self._updateActualDelay) # ... self._workerThread.start()
class Worker(QThread): _trigger = pyqtSignal(float) def __init__(self, bufferSize, streamIn, streamOut): QThread.__init__(self) self._bufferSize = bufferSize self._streamIn = streamIn self._streamOut = streamOut def __del__(self): self.wait() def _genDAF(self): while self._streamIn.is_active(): start = clock() audioData = self._streamIn.read(self._bufferSize) self._streamOut.write(audioData) actualDelay = clock() - start self._trigger.emit(actualDelay) def run(self): self._genDAF()
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Usage: python3 dafgen.py from PyQt5.QtWidgets import * from PyQt5.QtCore import * from ui_dafgen import Ui_DAFGen from pyaudio import PyAudio, paFloat32 from math import floor from time import clock import sys class Worker(QThread): _trigger = pyqtSignal(float) def __init__(self, bufferSize, streamIn, streamOut): QThread.__init__(self) self._bufferSize = bufferSize self._streamIn = streamIn self._streamOut = streamOut def __del__(self): self.wait() def _genDAF(self): while self._streamIn.is_active(): start = clock() audioData = self._streamIn.read(self._bufferSize) self._streamOut.write(audioData) actualDelay = clock() - start self._trigger.emit(actualDelay) def run(self): self._genDAF() class MainApp(QMainWindow, Ui_DAFGen): _CHANNELS = 2 _RATE = 44100 def __init__(self): super().__init__() self.setupUi(self) self.stopButton.setEnabled(False) self._updateDelay() self.delaySlider.valueChanged.connect(self._updateDelay) self.startButton.clicked.connect(self._startCapture) self.stopButton.clicked.connect(self._stopCapture) self.quitButton.clicked.connect(QApplication.quit) def _updateDelay(self): self.delayEdit.setPlainText(str(self.delaySlider.value()) + ' ms') def _startCapture(self): bufferSize = floor(self.delaySlider.value() / 1000 * self._RATE) device = PyAudio() try: streamIn = device.open( format=paFloat32, channels=self._CHANNELS, rate=self._RATE, input=True, frames_per_buffer=bufferSize ) streamOut = device.open( format=paFloat32, channels=self._CHANNELS, rate=self._RATE, output=True, frames_per_buffer=bufferSize ) except OSError: QMessageBox.critical(self, 'Error', 'No input/output device found! Connect and rerun.') return self._workerThread = Worker(bufferSize, streamIn, streamOut) self._workerThread._trigger.connect(self._updateActualDelay) self.startButton.setEnabled(False) self.delaySlider.setEnabled(False) self.stopButton.setEnabled(True) self._workerThread.start() def _stopCapture(self): self._workerThread.terminate() self.actualDelayEdit.clear() self.startButton.setEnabled(True) self.delaySlider.setEnabled(True) self.stopButton.setEnabled(False) def _updateActualDelay(self, t): newValue = floor(t * 1000) self.actualDelayEdit.setPlainText(str(newValue) + ' ms') def main(): app = QApplication(sys.argv) win = MainApp() win.show() sys.exit(app.exec_()) if __name__ == '__main__': main()
Source: https://habr.com/ru/post/347580/
All Articles