From a7554f4a09219105b57a94b56136edaa32b83d7b Mon Sep 17 00:00:00 2001 From: zhaoyafan Date: Sat, 2 Mar 2024 11:26:59 +0800 Subject: [PATCH] First commit --- AudioDevice.ini | 1 + main.py | 332 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 333 insertions(+) create mode 100644 AudioDevice.ini create mode 100644 main.py diff --git a/AudioDevice.ini b/AudioDevice.ini new file mode 100644 index 0000000..d06ed44 --- /dev/null +++ b/AudioDevice.ini @@ -0,0 +1 @@ +扬声器 \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..acbbea8 --- /dev/null +++ b/main.py @@ -0,0 +1,332 @@ +import os +import sys +import time +import logging +import pyaudio +import sounddevice as sd +import soundfile as sf +import threading +import subprocess +from sounddevice import PortAudioError +from PyQt5.QtWidgets import QApplication, QWidget, QLabel, QMessageBox, QPushButton, QHBoxLayout +from PyQt5.QtCore import QTimer, Qt, QPoint, QLibraryInfo + + +class LoggerFileHandler: + def __init__(self, log_file: str, mode: str = 'a', level: str = None, fmt: str = None): + self.log, self.mod = log_file, mode + self.lev, self.fmt = level, fmt + self.sty = '%' + + +class LoggerConsHandler: + def __init__(self, level: str = None, fmt: str = None): + self.lev, self.fmt = level, fmt + self.sty = '%' + + +class Logger: + logger = None + levels = { + "CRITICAL": logging.CRITICAL, + "FATAL": logging.FATAL, + "ERROR": logging.ERROR, + "WARNING": logging.WARNING, + "WARN": logging.WARN, + "INFO": logging.INFO, + "DEBUG": logging.DEBUG, + "NOTSET": logging.NOTSET, + "D": logging.DEBUG, + "I": logging.INFO, + "W": logging.WARNING, + "E": logging.ERROR, + "F": logging.FATAL + } + default_format = '{asctime} - {name} - {levelname[0]}: {message}' + default_format_style = '{' + handler_list = [] + + def __init__( + self, + name: str = 'default', + default_level='DEBUG', + fh: LoggerFileHandler = None, + ch: LoggerConsHandler = None, + add_default_handler=False + ): + ch = LoggerConsHandler() if add_default_handler and not ch else ch + if fh and not isinstance(fh, LoggerFileHandler): + raise TypeError('The parameter fh must be type.') + if ch and not isinstance(ch, LoggerConsHandler): + raise TypeError('The parameter ch must be type.') + + self.logger = logging.getLogger(name) + self.logger.setLevel(self.levels[default_level]) + + if fh: + fhandler = logging.FileHandler(filename=fh.log, mode=fh.mod, encoding='utf-8') + self.handler_list.append(fhandler) + fhandler.setLevel(self.levels[fh.lev or default_level]) + fh.fmt = fh.fmt or self.default_format + fh.sty = '{' if '%' not in fh.fmt else '%' + fhandler.setFormatter(logging.Formatter(fmt=fh.fmt, style=fh.sty)) + self.logger.addHandler(fhandler) + if ch: + chandler = logging.StreamHandler() + self.handler_list.append(chandler) + chandler.setLevel(self.levels[ch.lev or default_level]) + ch.fmt = ch.fmt or self.default_format + ch.sty = '{' if '%' not in ch.fmt else '%' + chandler.setFormatter(logging.Formatter(fmt=ch.fmt, style=ch.sty)) + self.logger.addHandler(chandler) + + self.d = self.logger.debug + self.i = self.logger.info + self.w = self.logger.warning + self.e = self.logger.error + self.f = self.logger.fatal + self.c = self.logger.critical + + +logger = Logger(name='main', fh=LoggerFileHandler(log_file='%s.log' % (os.path.splitext(__file__)[0],), mode='w'), ch=LoggerConsHandler()) + + +class AudioPlayer: + def __init__(self): + self.playlist = [] + self.curr_track_index = 0 + self.play_thread = None + self.play_status = None + + def next_track(self): + self.curr_track_index = (self.curr_track_index + 1) % len(self.playlist) + + def prev_track(self): + self.curr_track_index = (self.curr_track_index - 1) % len(self.playlist) + + def curr_track(self): + return self.playlist[self.curr_track_index] + + def play_process(self): + self.play_status = 1 + try: + self.callback_play() + except Exception: + pass + while 1: + try: + file = self.curr_track() + try: + self.callback_name(os.path.basename(file)) + except Exception: + pass + data, fs = sf.read(file, dtype='float32') + if (self.play_status == 1) == 1: + logger.i('%s%s' % ('Play ', file)) + sd.play(data, fs) + sd.wait() + if (self.play_status != 1) == 1: + logger.i('%s%s' % ('Stopped', '')) + break + self.next_track() + except PortAudioError: + self.next_track() + logger.w('Audio device abnormal') + time.sleep(0.15) + break + except Exception as err: + self.next_track() + logger.w(str(err)) + break + try: + self.callback_stop() + except Exception: + pass + self.play_status = 0 + + def play(self): + if not self.play_status: + self.play_thread = threading.Thread(target=self.play_process) + self.play_thread.start() + + def stop(self): + self.play_status = 0 + sd.stop() + time.sleep(0.05) + + def next(self): + self.stop() + self.next_track() + self.play() + + def prev(self): + self.stop() + self.prev_track() + self.play() + + @staticmethod + def callback_play(): + pass + + @staticmethod + def callback_stop(): + pass + + @staticmethod + def callback_name(name): + pass + + +r = os.path.dirname(__file__) +player = AudioPlayer() + + +def is_audio_device_connected(device_name): + audio = pyaudio.PyAudio() + device_list = [] + for i in range(audio.get_device_count()): + device_info = audio.get_device_info_by_index(i) + if device_info.get('maxInputChannels') > 0 or device_info.get('maxOutputChannels') > 0: + device_list.append(device_info["name"]) + audio.terminate() + for item in device_list: + if device_name in item: + return True + return False + + +class MainWindow(QWidget): + def __init__(self): + super().__init__() + if (sd.default.device[1] == -1) == 1: + QMessageBox.warning(self, '错误', '无可用声音输出设备') + sys.exit() + self.play_controlled = 0 + try: + self.audio_device_name = open(os.path.join(os.path.dirname(__file__), 'AudioDevice.ini'), "r", encoding="utf-8").read().strip() + except Exception as err: + self.audio_device_name = '*' + QMessageBox.warning(self, '错误', '%s' % (str(err),)) + sys.exit() + player.playlist = [f for f in [os.path.join(r, f) for f in os.listdir(r)] if os.path.isfile(os.path.join(r, f)) and (f.endswith('.mp3') or f.endswith('.wav'))] + if (len(player.playlist) == 0) == 1: + QMessageBox.warning(self, '错误', '播放列表为空') + sys.exit() + self.title = 'main' + self.play_name = '' + self.setWindowTitle(self.title) + self.setGeometry(0, 0, 200, 100) + self.status_label = QLabel('暂无节目播放', self) + self.status_label.setStyleSheet("font-size: 20px; font-family: 'Microsoft YaHei'; color: #ffffff;") + self.status_label.setAlignment(Qt.AlignLeft | Qt.AlignVCenter) + self.status_label.setFixedSize(200, 40) + + self.bt_next_shortcut = "F5" + self.bt_next = QPushButton('%s(%s)' % ('切换', self.bt_next_shortcut), self) + self.bt_next.setShortcut(self.bt_next_shortcut) + self.bt_next.setStyleSheet("font-size: 14px; font-family: 'Microsoft YaHei'; color: #ffffff;") + self.bt_next.clicked.connect(self.button_next) + + self.bt_stop_shortcut = "F6" + self.bt_stop = QPushButton('%s(%s)' % ('停止', self.bt_stop_shortcut), self) + self.bt_stop.setShortcut(self.bt_stop_shortcut) + self.bt_stop.setStyleSheet("font-size: 14px; font-family: 'Microsoft YaHei'; color: #ffffff;") + self.bt_stop.clicked.connect(self.button_stop) + + layout = QHBoxLayout() + layout.addWidget(self.status_label) + layout.addWidget(self.bt_next) + layout.addWidget(self.bt_stop) + self.setLayout(layout) + self.setWindowFlag(Qt.WindowStaysOnTopHint) # 窗口保持在顶层 + self.setFixedSize(420, 125) + self.update_title = QTimer() + self.update_title.timeout.connect(self.update_window_title) + self.update_title.start(500) + + self.timer = QTimer() + self.timer.timeout.connect(self.check_device_status) + self.timer.start(500) + screen_geometry = QApplication.desktop().availableGeometry() + self.move(screen_geometry.bottomRight() - self.rect().bottomRight() - QPoint(0, 35)) + self.show() + + def closeEvent(self, event): + player.stop() + event.accept() + + def update_window_title(self): + self.setWindowTitle(self.title) + + def check_device_status(self): + if self.audio_device_name == '*' or is_audio_device_connected(self.audio_device_name): + self.setStyleSheet('background-color: #61b64a;') + if (self.play_controlled == 0) == 1: + self.play_controlled = 1 + # audio device connect + bat_thread = threading.Thread( + target=subprocess.run, + args=(os.path.join(os.path.dirname(__file__), "EventConnect.bat"),), + kwargs={'shell': True, 'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL} + ) + bat_thread.start() + player.play() + else: + self.setStyleSheet('background-color: #353535;') + if (self.play_controlled == 1) == 1: + self.play_controlled = 0 + # audio device disconn + bat_thread = threading.Thread( + target=subprocess.run, + args=(os.path.join(os.path.dirname(__file__), "EventDisconn.bat"),), + kwargs={'shell': True, 'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL} + ) + bat_thread.start() + player.stop() + + @staticmethod + def button_next(): + player.next() + + @staticmethod + def button_stop(): + player.stop() if player.play_status else player.play() + + def event_play(self): + self.status_label.setStyleSheet("font-size: 20px; font-family: 'Microsoft YaHei'; color: #ffffff;") + self.title = '正在播放' + self.bt_stop.setText('%s(%s)' % ('停止', self.bt_stop_shortcut)) + self.bt_stop.setShortcut(self.bt_stop_shortcut) + bat_thread = threading.Thread( + target=subprocess.run, + args=(os.path.join(os.path.dirname(__file__), "EventPlay.bat"),), + kwargs={'shell': True, 'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL} + ) + bat_thread.start() + + def event_stop(self): + self.status_label.setStyleSheet("font-size: 20px; font-family: 'Microsoft YaHei'; color: #dd0000;") + self.title = '停止播放' + self.bt_stop.setText('%s(%s)' % ('播放', self.bt_stop_shortcut)) + self.bt_stop.setShortcut(self.bt_stop_shortcut) + bat_thread = threading.Thread( + target=subprocess.run, + args=(os.path.join(os.path.dirname(__file__), "EventStop.bat"),), + kwargs={'shell': True, 'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL} + ) + bat_thread.start() + + def event_name(self, name): + self.play_name = name + self.status_label.setText(self.play_name) + + +if __name__ == '__main__': + os.environ['QT_QPA_PLATFORM_PLUGIN_PATH'] = QLibraryInfo.location(QLibraryInfo.PluginsPath) + app = QApplication(sys.argv) + window_main = MainWindow() + player.callback_play = window_main.event_play + player.callback_stop = window_main.event_stop + player.callback_name = window_main.event_name + player.play() + sys.exit(app.exec_())