AudioPlayer/main.py

335 lines
12 KiB
Python

import os
import sys
import site
import time
import logging
import pyaudio
import sounddevice as sd
import soundfile as sf
import threading
import subprocess
from sounddevice import PortAudioError
from PyQt5.QtWidgets import QApplication, QWidget, QLabel, QMessageBox, QPushButton, QHBoxLayout
from PyQt5.QtCore import QTimer, Qt, QPoint, QCoreApplication
class LoggerFileHandler:
def __init__(self, log_file: str, mode: str = 'a', level: str = None, fmt: str = None):
self.log, self.mod = log_file, mode
self.lev, self.fmt = level, fmt
self.sty = '%'
class LoggerConsHandler:
def __init__(self, level: str = None, fmt: str = None):
self.lev, self.fmt = level, fmt
self.sty = '%'
class Logger:
logger = None
levels = {
'CRITICAL': logging.CRITICAL,
'FATAL': logging.FATAL,
'ERROR': logging.ERROR,
'WARNING': logging.WARNING,
'WARN': logging.WARN,
'INFO': logging.INFO,
'DEBUG': logging.DEBUG,
'NOTSET': logging.NOTSET,
'D': logging.DEBUG,
'I': logging.INFO,
'W': logging.WARNING,
'E': logging.ERROR,
'F': logging.FATAL
}
default_format = '{asctime} - {name} - {levelname[0]}: {message}'
default_format_style = '{'
handler_list = []
def __init__(
self,
name: str = 'default',
default_level='DEBUG',
fh: LoggerFileHandler = None,
ch: LoggerConsHandler = None,
add_default_handler=False
):
ch = LoggerConsHandler() if add_default_handler and not ch else ch
if fh and not isinstance(fh, LoggerFileHandler):
raise TypeError('The parameter fh must be <LoggerFileHandler> type.')
if ch and not isinstance(ch, LoggerConsHandler):
raise TypeError('The parameter ch must be <LoggerConsHandler> type.')
self.logger = logging.getLogger(name)
self.logger.setLevel(self.levels[default_level])
if fh:
fhandler = logging.FileHandler(filename=fh.log, mode=fh.mod, encoding='utf-8')
self.handler_list.append(fhandler)
fhandler.setLevel(self.levels[fh.lev or default_level])
fh.fmt = fh.fmt or self.default_format
fh.sty = '{' if '%' not in fh.fmt else '%'
fhandler.setFormatter(logging.Formatter(fmt=fh.fmt, style=fh.sty))
self.logger.addHandler(fhandler)
if ch:
chandler = logging.StreamHandler()
self.handler_list.append(chandler)
chandler.setLevel(self.levels[ch.lev or default_level])
ch.fmt = ch.fmt or self.default_format
ch.sty = '{' if '%' not in ch.fmt else '%'
chandler.setFormatter(logging.Formatter(fmt=ch.fmt, style=ch.sty))
self.logger.addHandler(chandler)
self.d = self.logger.debug
self.i = self.logger.info
self.w = self.logger.warning
self.e = self.logger.error
self.f = self.logger.fatal
self.c = self.logger.critical
logger = Logger(name='main', fh=LoggerFileHandler(log_file='%s.log' % (os.path.splitext(__file__)[0],), mode='w'), ch=LoggerConsHandler())
class AudioPlayer:
def __init__(self):
self.playlist = []
self.curr_track_index = 0
self.play_thread = None
self.play_status = None
def next_track(self):
self.curr_track_index = (self.curr_track_index + 1) % len(self.playlist)
def prev_track(self):
self.curr_track_index = (self.curr_track_index - 1) % len(self.playlist)
def curr_track(self):
return self.playlist[self.curr_track_index]
def play_process(self):
self.play_status = 1
try:
self.callback_play()
except Exception:
pass
while 1:
try:
file = self.curr_track()
try:
self.callback_name(os.path.basename(file))
except Exception:
pass
data, fs = sf.read(file, dtype='float32')
if (self.play_status == 1) == 1:
logger.i('%s%s' % ('Play ', file))
sd.play(data, fs)
sd.wait()
if (self.play_status != 1) == 1:
logger.i('%s%s' % ('Stopped', ''))
break
self.next_track()
except PortAudioError:
self.next_track()
logger.w('Audio device abnormal')
time.sleep(0.15)
break
except Exception as err:
self.next_track()
logger.w(str(err))
break
try:
self.callback_stop()
except Exception:
pass
self.play_status = 0
def play(self):
if not self.play_status:
self.play_thread = threading.Thread(target=self.play_process)
self.play_thread.start()
def stop(self):
self.play_status = 0
sd.stop()
time.sleep(0.05)
def next(self):
self.stop()
self.next_track()
self.play()
def prev(self):
self.stop()
self.prev_track()
self.play()
@staticmethod
def callback_play():
pass
@staticmethod
def callback_stop():
pass
@staticmethod
def callback_name(name):
pass
r = os.path.dirname(__file__)
player = AudioPlayer()
def is_audio_device_connected(device_name):
audio = pyaudio.PyAudio()
device_list = []
for i in range(audio.get_device_count()):
device_info = audio.get_device_info_by_index(i)
if device_info.get('maxInputChannels') > 0 or device_info.get('maxOutputChannels') > 0:
device_list.append(device_info['name'])
audio.terminate()
for item in device_list:
if device_name in item:
return True
return False
class MainWindow(QWidget):
def __init__(self):
super().__init__()
if (sd.default.device[1] == -1) == 1:
QMessageBox.warning(self, '错误', '无可用声音输出设备')
sys.exit()
self.play_controlled = 0
try:
self.audio_device_name = open(os.path.join(os.path.dirname(__file__), 'AudioDevice.ini'), 'r', encoding='utf-8').read().strip()
except Exception as err:
self.audio_device_name = '*'
QMessageBox.warning(self, '错误', '%s' % (str(err),))
sys.exit()
player.playlist = [f for f in [os.path.join(r, f) for f in os.listdir(r)] if os.path.isfile(os.path.join(r, f)) and (f.endswith('.mp3') or f.endswith('.wav'))]
if (len(player.playlist) == 0) == 1:
QMessageBox.warning(self, '错误', '播放列表为空')
sys.exit()
self.title = 'main'
self.play_name = ''
self.setWindowTitle(self.title)
self.setGeometry(0, 0, 200, 100)
self.status_label = QLabel('暂无节目播放', self)
self.status_label.setStyleSheet('font-size: 20px; font-family: \'Microsoft YaHei\'; color: #ffffff;')
self.status_label.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
self.status_label.setFixedSize(200, 40)
self.bt_next_shortcut = 'F5'
self.bt_next = QPushButton('%s(%s)' % ('切换', self.bt_next_shortcut), self)
self.bt_next.setShortcut(self.bt_next_shortcut)
self.bt_next.setStyleSheet('font-size: 14px; font-family: \'Microsoft YaHei\'; color: #ffffff;')
self.bt_next.clicked.connect(self.button_next)
self.bt_stop_shortcut = 'F6'
self.bt_stop = QPushButton('%s(%s)' % ('停止', self.bt_stop_shortcut), self)
self.bt_stop.setShortcut(self.bt_stop_shortcut)
self.bt_stop.setStyleSheet('font-size: 14px; font-family: \'Microsoft YaHei\'; color: #ffffff;')
self.bt_stop.clicked.connect(self.button_stop)
layout = QHBoxLayout()
layout.addWidget(self.status_label)
layout.addWidget(self.bt_next)
layout.addWidget(self.bt_stop)
self.setLayout(layout)
self.setWindowFlag(Qt.WindowStaysOnTopHint) # 窗口保持在顶层
self.setFixedSize(420, 125)
self.update_title = QTimer()
self.update_title.timeout.connect(self.update_window_title)
self.update_title.start(500)
self.timer = QTimer()
self.timer.timeout.connect(self.check_device_status)
self.timer.start(500)
screen_geometry = QApplication.desktop().availableGeometry()
self.move(screen_geometry.bottomRight() - self.rect().bottomRight() - QPoint(0, 35))
self.show()
def closeEvent(self, event):
player.stop()
event.accept()
def update_window_title(self):
self.setWindowTitle(self.title)
def check_device_status(self):
if self.audio_device_name == '*' or is_audio_device_connected(self.audio_device_name):
self.setStyleSheet('background-color: #62AC3E;')
if (self.play_controlled == 0) == 1:
self.play_controlled = 1
# audio device connect
bat_thread = threading.Thread(
target=subprocess.run,
args=(os.path.join(os.path.dirname(__file__), 'EventConnect.bat'),),
kwargs={'shell': True, 'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}
)
bat_thread.start()
player.play()
else:
self.setStyleSheet('background-color: #353535;')
if (self.play_controlled == 1) == 1:
self.play_controlled = 0
# audio device disconn
bat_thread = threading.Thread(
target=subprocess.run,
args=(os.path.join(os.path.dirname(__file__), 'EventDisconn.bat'),),
kwargs={'shell': True, 'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}
)
bat_thread.start()
player.stop()
@staticmethod
def button_next():
player.next()
@staticmethod
def button_stop():
player.stop() if player.play_status else player.play()
def event_play(self):
self.status_label.setStyleSheet('font-size: 20px; font-family: \'Microsoft YaHei\'; color: #ffffff;')
self.title = '正在播放'
self.bt_stop.setText('%s(%s)' % ('停止', self.bt_stop_shortcut))
self.bt_stop.setShortcut(self.bt_stop_shortcut)
bat_thread = threading.Thread(
target=subprocess.run,
args=(os.path.join(os.path.dirname(__file__), 'EventPlay.bat'),),
kwargs={'shell': True, 'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}
)
bat_thread.start()
def event_stop(self):
self.status_label.setStyleSheet('font-size: 20px; font-family: \'Microsoft YaHei\'; color: #dd0000;')
self.title = '停止播放'
self.bt_stop.setText('%s(%s)' % ('播放', self.bt_stop_shortcut))
self.bt_stop.setShortcut(self.bt_stop_shortcut)
bat_thread = threading.Thread(
target=subprocess.run,
args=(os.path.join(os.path.dirname(__file__), 'EventStop.bat'),),
kwargs={'shell': True, 'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}
)
bat_thread.start()
def event_name(self, name):
self.play_name = name
self.status_label.setText(self.play_name)
if __name__ == '__main__':
for i in site.getsitepackages():
i.endswith('site-packages') and QCoreApplication.addLibraryPath(os.path.abspath(os.path.join(i, 'PyQt5/Qt5/plugins')))
app = QApplication(sys.argv)
window_main = MainWindow()
player.callback_play = window_main.event_play
player.callback_stop = window_main.event_stop
player.callback_name = window_main.event_name
player.play()
sys.exit(app.exec_())