import os import re import sys import time import json import socket import base64 import shutil import signal import hashlib import inspect import tempfile import platform import subprocess from pathlib import Path from PyQt5.QtWidgets import QApplication, QMessageBox, QMenu, QAction, QWidget, QVBoxLayout, QTableWidget, QTableWidgetItem, QLabel, QFrame, QLineEdit, QMainWindow, QDesktopWidget, QHBoxLayout, QPushButton, QDialog from PyQt5.QtCore import Qt, QCoreApplication, QDateTime, QTimer from PyQt5.QtGui import QFont, QPalette, QColor, QBrush, QIcon __file__ = os.path.abspath(sys.argv[0]) def _fd(f): return f.fileno() if hasattr(f, 'fileno') else f if os.name == 'nt': import msvcrt from ctypes import (sizeof, c_ulong, c_void_p, c_int64, Structure, Union, POINTER, windll, byref) from ctypes.wintypes import BOOL, DWORD, HANDLE LOCK_SH = 0x0 LOCK_NB = 0x1 LOCK_EX = 0x2 LOCK_UN = 0x9 if sizeof(c_ulong) != sizeof(c_void_p): ULONG_PTR = c_int64 else: ULONG_PTR = c_ulong PVOID = c_void_p class _OFFSET(Structure): _fields_ = [ ('Offset', DWORD), ('OffsetHigh', DWORD) ] class _OFFSET_UNION(Union): _fields_ = [ ('_offset', _OFFSET), ('Pointer', PVOID) ] _anonymous_ = ['_offset'] class OVERLAPPED(Structure): _fields_ = [ ('Internal', ULONG_PTR), ('InternalHigh', ULONG_PTR), ('_offset_union', _OFFSET_UNION), ('hEvent', HANDLE) ] _anonymous_ = ['_offset_union'] LPOVERLAPPED = POINTER(OVERLAPPED) LockFileEx = windll.kernel32.LockFileEx LockFileEx.restype = BOOL LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED] UnlockFileEx = windll.kernel32.UnlockFileEx UnlockFileEx.restype = BOOL UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED] def flock(f, flags): hfile = msvcrt.get_osfhandle(_fd(f)) overlapped = OVERLAPPED() if flags == LOCK_UN: ret = UnlockFileEx( hfile, 0, 0, 0xFFFF0000, byref(overlapped) ) else: ret = LockFileEx( hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped) ) return bool(ret) else: try: import fcntl LOCK_SH = fcntl.LOCK_SH LOCK_NB = fcntl.LOCK_NB LOCK_EX = fcntl.LOCK_EX LOCK_UN = fcntl.LOCK_UN except (ImportError, AttributeError): LOCK_EX = LOCK_SH = LOCK_NB = 0 def flock(f, flags): return flags == LOCK_UN else: def flock(f, flags): return fcntl.flock(_fd(f), flags) == 0 class SafeJSONConfigReader: def __init__(self, file: str): try: self.data = json.loads(open(file=os.path.abspath(file), mode='r', encoding='utf-8').read()) except (json.JSONDecodeError, FileNotFoundError, PermissionError): self.data = None except (Exception,): self.data = None def is_loaded_success(self): return self.data is not None def get(self, key, default=None): if (self.data is None) == 1: return default keys = key.split('.') value = self.data try: for k in keys: if isinstance(value, dict): value = value.get(k) else: return default if (value is None) == 1: return default return value except (AttributeError, TypeError): return default class CustomLineEdit(QLineEdit): def __init__(self, parent=None): super().__init__(parent) self.setContextMenuPolicy(Qt.CustomContextMenu) self.customContextMenuRequested.connect(self.show_context_menu) def show_context_menu(self, pos): menu = QMenu(self) action_c = menu.addAction('复制') action_c.triggered.connect(self.copy_text_content) action_c.setEnabled(self.hasSelectedText()) action_p = menu.addAction('粘贴') action_p.triggered.connect(self.paste) action_p.setEnabled(self.isEnabled()) menu.exec_(self.mapToGlobal(pos)) def copy_text_content(self): self.hasSelectedText() and self.copy() class MessageBox(QDialog): def __init__(self, title, text, parent=None): super().__init__(parent) self.setWindowTitle(title) dark_palette = QPalette() dark_palette.setColor(QPalette.Window, QColor(53, 53, 53)) dark_palette.setColor(QPalette.WindowText, Qt.white) dark_palette.setColor(QPalette.Base, QColor(35, 35, 35)) dark_palette.setColor(QPalette.Text, Qt.white) dark_palette.setColor(QPalette.Button, QColor(53, 53, 53)) dark_palette.setColor(QPalette.ButtonText, Qt.white) self.setPalette(dark_palette) self.setStyleSheet(""" QPushButton { background: #444; padding: 8px 16px; border-radius: 4px; color: white; min-width: 80px; } QPushButton:hover { background: #555; } QLabel { color: white; } """) main_layout = QVBoxLayout(self) main_layout.setContentsMargins(20, 20, 20, 20) main_layout.setSpacing(20) self.labels_content = QLabel(text) self.labels_content.setFont(QFont('Consolas', 12)) self.labels_content.setWordWrap(True) self.labels_content.setAlignment(Qt.AlignLeft | Qt.AlignVCenter) main_layout.addWidget(self.labels_content, 1) self.button_confirm = QPushButton('确定') self.button_confirm.setFont(QFont('Microsoft YaHei', 12)) self.button_confirm.clicked.connect(self.accept) main_layout.addWidget(self.button_confirm, 0, Qt.AlignCenter) self.button_confirm.setFocus() self.adjustSize() self.setFixedSize(self.size()) @classmethod def show_message(cls, title, text, parent=None): return cls(title, text, parent).exec_() class TextInputDialog(QDialog): def __init__(self, title, default_text, regexp, placeholder, parent=None): super().__init__(parent) self.setWindowTitle('%s' % (title,)) self.setFixedSize(450, 155) self.regexp = regexp dark_palette = QPalette() dark_palette.setColor(QPalette.Window, QColor(53, 53, 53)) dark_palette.setColor(QPalette.WindowText, Qt.white) dark_palette.setColor(QPalette.Base, QColor(35, 35, 35)) dark_palette.setColor(QPalette.Text, Qt.white) self.setPalette(dark_palette) self.setStyleSheet(""" QLineEdit { background: #2a2a2a; border: 1px solid #444; border-radius: 3px; padding: 5px; color: white; } """) main_layout = QVBoxLayout(self) main_layout.setContentsMargins(15, 15, 15, 15) self.inputs_text = CustomLineEdit(default_text) self.inputs_text.setFont(QFont('Consolas', 14)) self.inputs_text.setPlaceholderText(placeholder) self.inputs_text.returnPressed.connect(self.validate_and_accept) self.inputs_text.selectAll() main_layout.addWidget(self.inputs_text) def validate_and_accept(self): text = self.inputs_text.text().strip() if (self.regexp == '' or bool(re.match(self.regexp, text))) == 1: self.accept() else: self.inputs_text.setStyleSheet('border: 1px solid red;') self.inputs_text.setFocus() def text(self): return self.inputs_text.text().strip() class MainWindow(QMainWindow): def __init__(self, app_name, app_version): super().__init__() self.app_name = app_name self.app_version = app_version self.operators = 'Unknown' self.equipment = 'Unknown' self.host = None self.port = None self.station = None self.scan_regexp = None self.scanning_record = [] self.scanning_latest = [] self.setWindowTitle('%s %s' % (self.app_name, self.app_version)) self.setWindowIcon(QIcon(os.path.join(os.path.dirname(__file__), 'favicon.ico'))) self.setMinimumSize(925, 728) frame = self.frameGeometry() center_point = QDesktopWidget().availableGeometry().center() frame.moveCenter(center_point) self.move(frame.topLeft()) dark_palette = QPalette() dark_palette.setColor(QPalette.Window, QColor(53, 53, 53)) dark_palette.setColor(QPalette.WindowText, Qt.white) dark_palette.setColor(QPalette.Base, QColor(35, 35, 35)) dark_palette.setColor(QPalette.AlternateBase, QColor(53, 53, 53)) dark_palette.setColor(QPalette.ToolTipBase, Qt.white) dark_palette.setColor(QPalette.ToolTipText, Qt.white) dark_palette.setColor(QPalette.Text, Qt.white) dark_palette.setColor(QPalette.Button, QColor(53, 53, 53)) dark_palette.setColor(QPalette.ButtonText, Qt.white) dark_palette.setColor(QPalette.BrightText, Qt.red) dark_palette.setColor(QPalette.Highlight, QColor(142, 45, 197).lighter()) dark_palette.setColor(QPalette.HighlightedText, Qt.black) self.setPalette(dark_palette) self.setStyleSheet(""" QLineEdit, QTableWidget { background: #2a2a2a; border: 2px solid #444; border-radius: 5px; padding: 5px; color: white; } QHeaderView::section { background-color: #444; color: white; padding: 4px; border: 1px solid #555; } QTableWidget::item { padding: 5px; } QPushButton { background: #444; padding: 5px 10px; border-radius: 4px; color: white; } QPushButton:hover { background: #555; } """) central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) main_layout.setContentsMargins(20, 20, 20, 20) main_layout.setSpacing(20) scan_layout = QVBoxLayout() scan_layout.setSpacing(15) self.inputs_scan = CustomLineEdit() self.inputs_scan.setFont(QFont('Consolas', 24)) self.inputs_scan.setAlignment(Qt.AlignCenter) self.inputs_scan.setPlaceholderText('') self.inputs_scan.setMaxLength(256) self.inputs_scan.returnPressed.connect(self.on_scan) self.labels_status = QLabel('') self.labels_status.setFont(QFont('Microsoft YaHei', 24)) self.labels_status.setAlignment(Qt.AlignCenter) self.labels_status.setAutoFillBackground(True) self.set_status('wait', '') scan_layout.addWidget(self.inputs_scan) scan_layout.addWidget(self.labels_status) self.labels_history = QLabel('扫描历史') self.labels_history.setFont(QFont('Microsoft YaHei', 14)) self.labels_history.setStyleSheet('color: white;') self.tables_history = QTableWidget() self.tables_history.setColumnCount(4) self.tables_history.setHorizontalHeaderLabels(['时间', '条码', '结果', '详情']) self.tables_history.horizontalHeader().setStretchLastSection(True) self.tables_history.verticalHeader().setVisible(False) self.tables_history.setEditTriggers(QTableWidget.NoEditTriggers) self.tables_history.setSelectionBehavior(QTableWidget.SelectRows) self.tables_history.setFont(QFont('Consolas', 12)) self.tables_history.setColumnWidth(0, 200) self.tables_history.setColumnWidth(1, 435) self.tables_history.setColumnWidth(2, 80) foot_layout = QHBoxLayout() self.labels_operator = QLabel('') self.labels_operator.setFont(QFont('Microsoft YaHei', 14)) self.labels_operator.setStyleSheet('color: white;') self.button_change_operator = QPushButton('变更作业人员') self.button_change_operator.setFont(QFont('Microsoft YaHei', 12)) self.button_change_operator.setStyleSheet('color: white;') self.button_change_operator.clicked.connect(self.on_show_dialog_change_operator) self.button_defect = QPushButton('不良发现扫描') self.button_defect.setFont(QFont('Microsoft YaHei', 12)) self.button_defect.setStyleSheet('color: white;') self.button_defect.clicked.connect(self.on_show_dialog_defect) foot_layout.addWidget(self.labels_operator) foot_layout.addStretch() foot_layout.addWidget(self.button_defect) foot_layout.addWidget(self.button_change_operator) main_layout.addLayout(scan_layout) main_layout.addWidget(self.labels_history) main_layout.addWidget(self.tables_history) main_layout.addLayout(foot_layout) self.init() self.show() QTimer.singleShot(35, self.init_delay) def init(self): self.set_operators('Operator') self.set_equipment('%s' % (platform.node(),)) def init_delay(self): config = os.path.join(os.path.dirname(__file__), '%s.json' % (os.path.splitext(os.path.basename(__file__))[0],)) reader = SafeJSONConfigReader(config) if (not reader.is_loaded_success()) == 1: MessageBox.show_message('配置文件错误', '%s' % (config,), self) self.exit() return None data = reader.get('window_title'), reader.get('server_host'), reader.get('server_port'), reader.get('station_id'), reader.get('scan_regexp') if (not data[1] or not data[2] or not data[3]) == 1: MessageBox.show_message('配置文件错误', '%s' % ('主机信息或工站配置错误',), self) self.exit() return None try: self.setWindowTitle('%s' % (data[0],)) self.host, self.port, self.station, self.scan_regexp = str(data[1]).strip(), int(data[2]), str(data[3]).strip(), data[4] except (Exception,) as e: MessageBox.show_message('错误', '%s' % (e,), self) self.exit() return None @staticmethod def exit(): QApplication.quit() @staticmethod def mes_upload(host: str, port: int, station_id: str, barcode: str, operators: str, equipment: str, result: int): try: client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client_socket.settimeout(3.0) client_socket.connect((host, port)) client_socket.sendall(('100,%s,%s,%s,%s,%s,%s' % (station_id, barcode, result, operators, equipment, equipment)).encode('utf-8')) received_data = str(client_socket.recv(1024).decode('GBK')) client_socket.close() if received_data.find(base64.b64decode(bytes('5p2h56CB5LiN5a2Y5Zyo', encoding='utf-8')).decode()) != -1: return 2, '不存在的条码' if received_data.find(base64.b64decode(bytes('5Lqn5ZOB5Z6L5Y+36ZSZ6K+v', encoding='utf-8')).decode()) != -1: return 2, '产品型号错误' if received_data.find(base64.b64decode(bytes('5bel5bqP5LqS5qOA5aSx6LSl', encoding='utf-8')).decode()) != -1: return 2, '工序互检失败' if received_data.find(base64.b64decode(bytes('6L+U5L+u5aSE55CG', encoding='utf-8')).decode()) != -1: return 2, '条码已被锁定' if received_data.find(base64.b64decode(bytes('5LiK5bel5bqPTkc=', encoding='utf-8')).decode()) != -1: return 2, '上道工序不良' if received_data.find(base64.b64decode(bytes('5rKh5pyJ5byC5bi4', encoding='utf-8')).decode()) != -1: return 0, '' print(received_data) return 9, '未知错误信息' except socket.timeout: return 1, '网络连接超时' def set_status(self, status=None, message=None): palette = self.labels_status.palette() if status == 'wait': palette.setColor(QPalette.WindowText, Qt.white) palette.setColor(QPalette.Window, QColor(80, 80, 80)) self.labels_status.setPalette(palette) if status == 'pass': palette.setColor(QPalette.WindowText, Qt.white) palette.setColor(QPalette.Window, QColor(101, 200, 68)) self.labels_status.setPalette(palette) QTimer.singleShot(500, lambda: [palette.setColor(QPalette.Window, QColor(80, 80, 80)), self.labels_status.setPalette(palette)][0]) self.inputs_scan.setStyleSheet(""" QLineEdit { background: #2a2a2a; border: 2px solid #444; border-radius: 5px; padding: 5px; color: white; } """) if status == 'fail': palette.setColor(QPalette.WindowText, Qt.white) palette.setColor(QPalette.Window, QColor(222, 61, 66)) self.labels_status.setPalette(palette) self.inputs_scan.setStyleSheet('border: 2px solid red;') self.labels_status.setText(message if message else '') def set_operators(self, data): self.operators = str(data).strip() self.labels_operator.setText('作业人员: %s' % (self.operators,)) print('人员变更 => %s' % (self.operators,)) def set_equipment(self, data): self.equipment = str(data).strip() print('设备变更 => %s' % (self.equipment,)) def add_scan_record(self, timestr, timestamp, barcode, results, details): max_records = 128 self.scanning_record.insert(0, [timestr, timestamp, barcode, results, details]) if (len(self.scanning_record) > max_records) == 1: self.scanning_record = self.scanning_record[:max_records] self.tables_history.setRowCount(len(self.scanning_record)) for row, (_timestr, _timestamp, _barcode, _results, _details) in enumerate(self.scanning_record): timestr_item = QTableWidgetItem(_timestr) timestr_item.setTextAlignment(Qt.AlignCenter | Qt.AlignVCenter) self.tables_history.setItem(row, 0, timestr_item) barcode_item = QTableWidgetItem(_barcode) barcode_item.setTextAlignment(Qt.AlignLeft | Qt.AlignVCenter) self.tables_history.setItem(row, 1, barcode_item) results_item = QTableWidgetItem(_results) results_item.setTextAlignment(Qt.AlignCenter | Qt.AlignVCenter) results_item.setForeground(QBrush(QColor(0, 255, 0))) if _results == 'PASS' else results_item.setForeground(QBrush(QColor(255, 0, 0))) self.tables_history.setItem(row, 2, results_item) details_item = QTableWidgetItem(_details) details_item.setTextAlignment(Qt.AlignLeft | Qt.AlignVCenter) self.tables_history.setItem(row, 3, details_item) def on_scan(self, barcode: str = '', result: int = 1): edit = self.inputs_scan text = str(barcode).strip() if barcode else edit.text().strip() edit.clear() if barcode else None if (result not in [1, 2]) == 1: return None if (text == '') == 1: return None if (len(text) > 128) == 1: self.set_status('fail', '扫描内容过长') edit.clear() return None if (self.scan_regexp and isinstance(self.scan_regexp, str) and not re.match(self.scan_regexp, text)) == 1: self.set_status('fail', '规则验证失败') edit.selectAll() return None if (self.scanning_latest and self.scanning_latest[1] == text and (time.time() < self.scanning_latest[0])) == 1: self.scanning_latest[2] and self.set_status('fail', self.scanning_latest[2]) edit.selectAll() return None mes_upload_response = self.mes_upload(self.host, self.port, self.station, text, self.operators, self.equipment, result) if (mes_upload_response[0] != 0) == 1: self.scanning_latest = [time.time() + 1, text, ''] self.set_status('fail', mes_upload_response[1]) edit.selectAll() return None else: self.scanning_latest = [time.time() + 5, text, '重复扫描'] self.set_status('pass', '上传成功') self.add_scan_record(QDateTime.currentDateTime().toString('yyyy-MM-dd hh:mm:ss'), int(time.time()), text, {1: 'PASS', 2: 'FAIL'}[result], '') edit.clear() return None def on_show_dialog_change_operator(self): dialog = TextInputDialog(title='变更作业人员', default_text=self.operators, regexp='^[0-9A-Za-z\\-]{6,12}$', placeholder='', parent=self) if (dialog.exec_() == QDialog.Accepted) == 1: operator = dialog.text() operator and self.set_operators(operator) self.inputs_scan.setFocus() def on_show_dialog_defect(self): dialog = TextInputDialog(title='不良发现扫描', default_text='', regexp='^.{1,128}$', placeholder='', parent=self) if (dialog.exec_() == QDialog.Accepted) == 1: products = dialog.text() products and self.on_scan(products, 2) self.inputs_scan.setFocus() class MainRunner: def __init__(self): signal.signal(signal.SIGINT, self._handle_interrupt) self.app_name = 'MesInterface' self.app_version = '1.0.0.0' self.app_publisher = 'zhaoyafan' self.app_publisher_url = 'https://www.fanscloud.net/' self.application = None self.window = None def _copy_files_and_directories(self, src, dst): function_name = inspect.currentframe().f_code.co_name if (os.path.exists(src)) != 1: return None if (os.path.isdir(src)) == 1: if not os.path.exists(dst): os.makedirs(dst) for item in os.listdir(src): s = os.path.join(src, item) d = os.path.join(dst, item) if os.path.isdir(s): self.__getattribute__(function_name)(s, d) else: shutil.copy(s, d) else: shutil.copy(src, dst) def run(self): QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True) QApplication.setHighDpiScaleFactorRoundingPolicy(Qt.HighDpiScaleFactorRoundingPolicy.PassThrough) self.application = QApplication(sys.argv) self.window = MainWindow(app_name=self.app_name, app_version=self.app_version) sys.exit(self.application.exec_()) def build(self): if (str(__file__).endswith('.py')) == 0: print('Build is not currently supported.', file=sys.stderr) exit(1) from_home = os.path.dirname(os.path.abspath(__file__)) dist_home = os.path.join(os.path.dirname(__file__), '%s.dist' % (os.path.splitext(os.path.basename(__file__))[0],)) ask = input('%s %s: ' % ('Build?', '[Y/n]')) if ask.lower().strip() == 'y': subprocess.run(['pip', 'install', 'nuitka', '-q'], shell=True, env=os.environ) subprocess.run([ 'python', '-m', 'nuitka', '--standalone', '--enable-plugin=pyqt5', '--include-module=PyQt5', '--windows-console-mode=disable', '--windows-icon-from-ico=favicon.ico', '--product-name=%s' % (self.app_name,), '--file-description=%s' % (self.app_name,), '--product-version=%s' % (self.app_version,), '--copyright=Copyright (C) 2025', '--output-dir=%s' % (os.path.join(os.path.dirname(__file__)),), '%s' % (__file__,) ], shell=True, env=os.environ) for i in ['PyQt5', 'favicon.ico', 'MesInterface.json']: self._copy_files_and_directories('%s/%s' % (from_home, i), '%s/%s' % (dist_home, i)) else: if (not os.path.exists(dist_home)) == 1: return None ask = input('%s %s: ' % ('Compile setup program?', '[Y/n]')) if ask.lower().strip() == 'y': compile_file = os.path.join(os.path.dirname(__file__), '%s.iss' % (os.path.splitext(os.path.basename(__file__))[0],)) compile_template = os.path.join(os.path.dirname(__file__), '%s.iss.template' % (os.path.splitext(os.path.basename(__file__))[0],)) compiler = 'C:\\Program Files (x86)\\Inno Setup 6\\ISCC.exe' if (os.path.exists(compile_template)) != 1: print('The template file \"%s\" does not exist.' % (compile_template,), file=sys.stderr) return None if (os.path.exists(compiler)) != 1: print('The compiler \"%s\" does not exist. Please check if Inno Setup is installed. You can download it at https://www.innosetup.com/' % (compiler,), file=sys.stderr) return None Path(compile_file).write_text( Path(compile_template).read_text().replace( '%APPNAME%', self.app_name ).replace( '%APPEXEC%', os.path.splitext(os.path.basename(__file__))[0] ).replace( '%APPVERSION%', self.app_version ).replace( '%APPBUILDDATE%', time.strftime('%Y%m%d', time.localtime()) ).replace( '%APPPUBLISHER%', self.app_publisher ).replace( '%APPPUBLISHERURL%', self.app_publisher_url ).replace( '%DISABLEX64%', '' if platform.architecture()[0] == '64bit' else '; ' ) ) subprocess.run([compiler, compile_file]) def _handle_interrupt(self, _signal, _frame): print('Exit.', file=sys.stderr) self.handle_interrupt() def handle_interrupt(self): try: self.window.exit() except Exception as e: print(e, file=sys.stderr) if __name__ == '__main__': if (os.path.basename(__file__).lower().endswith('.int')) == 1: QCoreApplication.addLibraryPath(os.path.join(os.path.dirname(__file__), 'site-packages/PyQt5/Qt5/plugins')) else: QCoreApplication.addLibraryPath(os.path.join(os.path.dirname(__file__), 'PyQt5/Qt5/plugins')) f_lock = open(file=os.path.join(tempfile.gettempdir(), '%s.lock' % hashlib.md5(bytes(__file__, encoding='utf-8')).hexdigest()[:16]), mode='w', encoding='utf-8') if (not flock(f_lock, LOCK_EX | LOCK_NB)) == 1: app = QApplication(sys.argv) msg = QMessageBox() msg.setIcon(QMessageBox.Warning) msg.setText('The application is already running.') msg.setWindowTitle('Warning') msg.setStandardButtons(QMessageBox.Cancel) msg.exec_() app.exit(1) sys.exit(1) else: MainRunner().run() if (len(sys.argv) > 1 and sys.argv[1] == '--build') == 0 else MainRunner().build()