第一句子网 - 唯美句子、句子迷、好句子大全
第一句子网 > python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket+PyQt5)

python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket+PyQt5)

时间:2019-07-05 14:51:56

相关推荐

python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket+PyQt5)

TCP聊天服务器套接字v1.1

所有版本记录:

v1.0: TCP聊天服务器套接字|PyQt5+socket(TCP端口映射+端口放行)+logging+Thread(含日志,html)+anaconda打包32位exe(3.4万字)|python高阶

文章目录

| 1. 服务器代码改进 / bug改进:(1) 发送函数改为@function;(2) 异常运行函数改为三叠函数;(3) 服务端在下线时列表在遍历时增加下线的服务端 -> 触发RuntimeError;(4) 获取真正本机的ip地址 | 2. 新增命令功能(在输入框添加"/") | 3. 新增cmd控制台颜色改变 | 4. 客户端PyQt5信息过快使Textedit刷新空白 | 5. PyQt5 消息超过一页下拉浪费时间: 指针至最后 | 6. 服务端socket多次连接不同地址错误 | 7. 完整代码

| 1. 服务器代码改进 / bug改进

(1).发送函数改为@function

class Server():
    ...

    def send(self, sock, user, mes):
        """(v1.0 listing) Send ``mes`` to ``sock`` via the two-argument ``QUIT``.

        ``QUIT(user, command)`` returns a wrapped callable, which is invoked
        immediately. This is the "before" form that the article replaces
        with a decorator; its logic is intentionally left as published.
        """
        self.QUIT(user, lambda: sock.sendall(mes.encode(self.encode)))()

def send(self, sock, user, mes):
    """Encode ``mes`` with ``self.encode`` and send it to ``sock``.

    The ``sendall`` call runs inside a function decorated with
    ``self.QUIT(user)``; whatever that wrapper returns is returned here.
    """
    @self.QUIT(user)
    def func():
        sock.sendall(mes.encode(self.encode))

    return func()

(2).异常运行函数改为三叠函数

def QUIT(self, user, command):
    """(v1.0 listing) Wrap ``command`` so a failure records the user's socket.

    The socket is looked up in ``self.connect`` by user name when the
    wrapper is built. The returned callable runs ``command`` and, on any
    exception, appends that socket to ``self.errs``. It always returns
    ``None``. This is the "before" form shown for comparison; its logic
    (including the bare ``except``) is intentionally left as published.
    """
    sock = list(self.connect.keys())[list(self.connect.values()).index(user)]

    def logs(*args, **kargs):
        try:
            command(*args, **kargs)
        except:
            self.errs.append(sock)

    return logs

def QUIT(self, user, sock=None):
    """Return a decorator that runs a callable and reports success as a bool.

    Args:
        user: User name the wrapped call acts on behalf of.
        sock: That user's socket. When omitted it is looked up in
            ``self.connect`` by user name, which picks the first match if
            two clients share a name.

    Returns:
        A decorator. The decorated callable returns ``True`` when the
        original call completes and ``False`` when it raises. On failure
        the traceback is logged and, unless ``self.errduring`` is set,
        ``(sock, user)`` is appended to ``self.errs``.
    """
    if sock is None:
        sock = list(self.connect.keys())[list(self.connect.values()).index(user)]

    def prop(command, *args, **kwargs):
        def logs(*args, **kwargs):
            try:
                command(*args, **kwargs)
            except Exception:  # do not swallow KeyboardInterrupt/SystemExit
                logging.exception(str())
                if self.errduring is False:
                    self.errs.append((sock, user))
                return False
            return True

        return logs

    return prop

(3).服务端在下线时列表在遍历时 增加下线的服务端 -> 触发RuntimeError

bug改进方法:

增加errduring;inc参数

# v1.0 listing, kept for comparison with the revised version that follows
# it in the article. Only the layout is restored; the logic is unchanged.

def ServerMessage(self, mes):
    """Send ``mes`` to every connected socket that is not listed in ``self.errs``."""
    for sock, user in self.connect.items():
        if not sock in self.errs:
            self.send(sock, user, mes)


def UserMessage(self, address, _user, mes):
    """Decode a user's raw message, relay it to all clients, then handle errors."""
    if not mes:
        return
    mes = mes.decode(encoding="utf8")
    for sock, user in self.connect.items():
        if not sock in self.errs:
            send_message = Server.user_message % (
                "brown" if _user == user else "red",
                _user,
                address,
                "(我自己)" if _user == user else "",
                mes,
            )
            self.send(sock, user, send_message)
    logger.info(f"{address}[{_user}] : {mes}")
    self.error_handle()


def error_handle(self):
    """Close every socket in ``self.errs`` and announce the departure."""
    for sock in self.errs:
        sock.close()
        logger.exception(msg = str())
        # NOTE: `user` is not defined in this function as published.
        Q = Server.quit_message % user
        logger.info(Q)
        self.connect.pop(sock)
        self.ServerMessage(Q)

def ServerMessage(self, mes, inc=True):
    """Broadcast ``mes`` to every connected client.

    Args:
        mes: Text to send.
        inc: When true, run ``self.error_handle()`` after the broadcast.
            ``error_handle`` passes ``False`` so it is not re-entered.
    """
    self.errduring = True
    # self.errs holds (sock, user) tuples, so compare against the sockets.
    failed = {sock for sock, _ in self.errs}
    # Iterate over a snapshot: self.connect may gain or lose entries meanwhile.
    for sock, user in list(self.connect.items()):
        if sock not in failed:
            self.send(sock, user, mes)
    if inc:
        self.error_handle()
    self.errduring = False


def UserMessage(self, address, _user, mes, inc=True):
    """Relay a chat message from ``_user`` to every connected client.

    Args:
        address: Sender address shown in the message.
        _user: Sender's user name; the sender's own copy is coloured
            differently and tagged.
        mes: Message text; an empty value is ignored.
        inc: When true, run ``self.error_handle()`` after relaying.
    """
    if not mes:
        return
    self.errduring = True
    failed = {sock for sock, _ in self.errs}
    for sock, user in list(self.connect.items()):
        if sock not in failed:
            is_sender = _user == user
            send_message = Server.user_message % (
                "brown" if is_sender else "red",
                _user,
                address,
                "(我自己)" if is_sender else "",
                mes,
            )
            self.send(sock, user, send_message)
    logger.info(f"{address}[{_user}] : {mes}")
    if inc:
        self.error_handle()
    self.errduring = False


def error_handle(self):
    """Drop every client recorded in ``self.errs`` and announce each departure."""
    # Take the pending entries and clear the list, so a stale entry cannot
    # later evict a client that reconnects under the same user name.
    pending = list(self.errs)
    self.errs.clear()
    for sock, user in pending:
        # Skipping sockets that are already gone also avoids the KeyError a
        # second pop of the same socket would raise.
        if sock not in self.connect:
            continue
        # Remove first so the announced head-count excludes the leaver.
        self.connect.pop(sock)
        Q = Server.quit_message % (user, self._str_sockets())
        logger.info(Q)
        self.ServerMessage(Q, False)  # inc=False: do not re-enter error_handle
        sock.close()

(4)获取真正本机的ip地址

如果直接用socket.gethostbyname(socket.gethostname())获取地址,很有可能是错误的(Vmware虚拟机的地址127.0.0.1等)

搜索后我得到了下面这段精巧的代码:

...


def get_host_ip() -> str:
    """Return the local IPv4 address used for outbound traffic.

    A UDP socket is connected to 8.8.8.8:80 and its local address is read
    back with ``getsockname()``.

    Returns:
        The local address as a dotted string, or ``"127.0.0.1"`` when the
        socket cannot be created or connected (``OSError``).
    """
    try:
        # The context manager closes the socket on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(('8.8.8.8', 80))
            return s.getsockname()[0]
    except OSError:
        return "127.0.0.1"

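通过UDP尝试连接8.8.8.8:80:UDP的connect并不会真正发送数据包,只是让操作系统按路由表选出对外通信所用的网卡,所以无论8.8.8.8是否可达,getsockname()返回的都是本机对外使用的IP(前提是本机存在可用的路由,否则connect会抛出OSError)。在Ubuntu(kivydev)和Windows上都可用,还省去了判断操作系统的大段代码.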

那么代码可以改为:

# Before the change: the machine's host name is passed as the server address.
server = Server(socket.gethostname(),429)

# After the change: the address returned by get_host_ip() is passed instead.
server = Server(get_host_ip(),429)

| 2.新增命令功能 (在输入框添加"/")

class Command_Handler(object):
    """Parse and answer ``/``-prefixed commands typed into the chat box."""

    def __init__(self, bind=None):
        """Bind Server class"""
        self.bind = bind

    def _function(self, _list, client):
        """Resolve the token list ``_list`` against the command table.

        Args:
            _list: Command split into tokens, e.g. ``["/info", "-id"]``.
            client: Object whose ``id()`` is reported by ``/info -id``.

        Returns:
            The command's reply text, the "unknown command" text when a
            token is not in the table, or a hint listing the sub-options
            when the command stops on a group.
        """
        # Leaves are callables, so only the selected reply is computed.
        data = {
            "/info": {
                "-v": self.get_version,
                "-id": lambda: self.get_id(client),
                "-i": self.info,
                "-h": self.help,
            },
        }
        node = data
        for token in _list:
            if not isinstance(node, dict):
                break  # a leaf was reached; extra tokens are ignored
            if token not in node:
                return self.unknown(" ".join(_list))
            node = node[token]
        if isinstance(node, dict):
            return "Error:\n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(node.keys())
        return node()

    @staticmethod
    def help():
        """Return the usage text for ``/info``."""
        return (
            "/info [-v] [-id] [-i]\n"
            "-v : get version of program.\n"
            "-id : get your id.\n"
            "-i : get information.\n"
            "-h : help.\n"
            "For example, <font color=red>/info -id</font>"
        )

    @staticmethod
    def get_version():
        """Return the program version line."""
        return "version : " + str(__version__)

    def get_id(self, client):
        """Return a line containing ``id(client)``."""
        return "Your id is {}.".format(id(client))

    def info(self):
        """Return the one-line program banner."""
        # Use the bare version number; get_version() already carries its own
        # "version : " prefix, which produced "version version : ...".
        return f"Socket Server[version {__version__}] By zmh."

    def unknown(self, s):
        """Return the error text for unrecognised command ``s``, followed by the help."""
        return 'Error:\nNo command named "%s". Please search [/info -h] to help.\n%s' % (s, self.help())

    def cut(self, string):
        """Split a raw command string into whitespace-separated tokens."""
        return string.strip().split()

    def handler(self, c, client=None):
        """Return the HTML reply for command ``c``: an echo line, then the result."""
        return "<font color='gray'>[command] </font><font color='brown'>%s</font>\n%s" % (c, str(self._function(self.cut(c), client)))

    def iscommand(self, i):
        """Return whether ``i`` starts with ``/`` after stripping whitespace."""
        return i.strip().startswith("/")

class Server():
    def __init__(self, addr, port, backlog = 10, encode = 'utf8'):
        """Excerpt of the constructor showing the attributes it sets.

        ``...`` stands for the part of the constructor omitted from this
        excerpt.
        """
        ...
        self.com = Command_Handler(self)  # handler for "/" commands
        self.encode = encode
        self.errs = []           # (sock, user) pairs whose call failed
        self.errduring = False   # True while a broadcast is in progress

| 3.新增cmd控制台颜色改变

我的文章: Python 命令行cmd指定颜色设置

#color.py
import ctypes
import time

# Standard device identifiers passed to GetStdHandle.
STD_INPUT_HANDLE = -10
STD_OUTPUT_HANDLE = -11
STD_ERROR_HANDLE = -12

# Handle of the console's standard output (ctypes.windll exists on Windows only).
std_out_handle = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)


class Text():
    """Foreground colour attribute values."""
    DARKBLUE = 0x01  # dark blue
    DARKGREEN = 0x02  # dark green
    DARKSKYBLUE = 0x03  # dark sky blue
    DARKRED = 0x04  # dark red
    DARKPINK = 0x05  # dark pink
    DARKYELLOW = 0x06  # dark yellow
    DARKWHITE = 0x07  # dark white
    DARKGRAY = 0x08  # dark gray / bright
    BLUE = 0x09  # blue
    GREEN = 0x0a  # green
    SKYBLUE = 0x0b  # sky blue
    RED = 0x0c  # red
    PINK = 0x0d  # pink
    YELLOW = 0x0e  # yellow
    WHITE = 0x0f  # white


class Background():
    """Background colour attribute values."""
    BLUE = 0x10  # blue
    GREEN = 0x20  # green
    RED = 0x40  # red
    INTENSITY = 0x80  # bright


def SetCmdColor(color, handle=std_out_handle):
    """Set the console text attribute to ``color`` and return the API result."""
    Bool = ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)
    return Bool


def Print(color: int, *args, **kwargs):
    """``print`` the arguments in ``color``, then switch back to ``Text.DARKWHITE``."""
    SetCmdColor(color)
    try:
        print(*args, **kwargs)
    finally:
        # Restore the colour even when print() raises.
        SetCmdColor(Text.DARKWHITE)

#server.py
# Colour constants and the coloured print helper defined in color.py.
from color import Text, Background, Print

| 4.客户端PyQt5信息过快使Textedit刷新空白

增加延迟呗

...


def Show_Message(self, data):
    """Append ``data`` to the message view one line at a time.

    A short pause precedes each line and totals 0.2 s per call. The
    article adds it so that rapid output does not leave the text edit
    blank. Empty ``data`` is ignored.
    """
    if data:
        lines = data.split('\n')
        pause = 0.2 / len(lines)
        for line in lines:
            sleep(pause)  # keep fast output from blanking the QTextEdit
            self.textEdit_2.append(line)
            print(line)

| 5.PyQt5 消息超过一页下拉浪费时间: 指针至最后

def Show_Message(self, data):
    """Excerpt: after the lines are appended, move the cursor to the end.

    ``...`` stands for the per-line append loop omitted from this excerpt.
    """
    if data:
        ...
        # Move the cursor to the end of the message view.
        self.textEdit_2.moveCursor(QtGui.QTextCursor.End)

| 6.服务端socket多次连接不同地址错误

OSError: [WinError 10022] 提供了一个无效的参数.

@to_logging
def socket_connect(self):
    """Connect ``self.socket`` to ``self.addr``, using a fresh socket on retries.

    The first call only sets the ``_socc`` marker. Every later call runs
    ``self.retry()`` before connecting.
    """
    if hasattr(self, "_socc"):
        # Connecting the same socket to a different address raises
        # `OSError: [WinError 10022] An invalid argument was supplied.`
        self.retry()
    else:
        self._socc = True
    self.socket.connect(self.addr)

| 7. 完整代码

客户端:

# -*- coding: utf-8 -*-

# Form implementation generated from reading ui files 'USER.ui', 'Connect.ui'
#
# Created by: PyQt5 UI code generator 5.15.4
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again. Do not edit this file unless you know what you are doing.

import logging
import socket
import sys
from datetime import datetime
from random import randint as rand
from threading import Thread
from time import sleep
from traceback import format_exc

from PyQt5 import QtCore, QtGui, QtWidgets

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')

Username = f'zmh {rand(0,1000)}'
TIMEOUT = 3  # seconds; applied to every socket created below
IP = "114.67.206.19"


def get_host_ip() -> str:
    """Return the local IPv4 address used for outbound traffic.

    Falls back to ``"127.0.0.1"`` when the probe socket cannot be created
    or connected (``OSError``).
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(('8.8.8.8', 80))
            return s.getsockname()[0]
    except OSError:
        return "127.0.0.1"


selfIP = get_host_ip()
bytecount = 1024

# Entries offered in the connection dialog: label -> connect() keyword arguments.
dicts = {
    f"{IP} (公网)": {"ip": IP, "port": 16223},
    f"{selfIP} (私网)": {"ip": selfIP, "port": 429},
    "EXAM-41 (微机室)": {"ip": "EXAM-41", "port": 429},
}


def get_time():
    """Return the current local time formatted for status lines."""
    return datetime.now().strftime('%Y %m %d %H:%M:%S')


def threading(Daemon, **kwargs):
    """Create a ``Thread`` from ``kwargs``, set its daemon flag, start and return it."""
    thread = Thread(**kwargs)
    thread.daemon = Daemon
    thread.start()
    return thread


def to_logging(command):
    """Decorate ``command`` so it returns ``True`` on success and ``False`` on error.

    On error the traceback is shown in the main window when the global
    ``main`` exists, otherwise it is logged. The wrapped function's own
    return value is discarded.
    """
    def logs(*args, **kwargs):
        try:
            command(*args, **kwargs)
        except Exception:
            # `main` is a module global; dir() here would list only locals.
            if "main" in globals():
                main.Show_Message(format_exc())
            else:
                logging.exception(str())
            return False
        else:
            return True
    return logs


class Socket:
    """Client connection plus the background thread that receives messages."""

    def __init__(self, Function=lambda i: None, code='utf-8'):
        self.socket = socket.socket()
        self.code = code
        self._logger = Function  # callback that displays received text
        self.socket.settimeout(TIMEOUT)
        self._connect = False

    def set_func(self, f):
        """Replace the callback used to display received text."""
        self._logger = f

    def retry(self):
        """Close the current socket and replace it with a fresh one."""
        self.socket.close()
        self.socket = socket.socket()
        self.socket.settimeout(TIMEOUT)

    @to_logging
    def socket_connect(self):
        """Connect to ``self.addr``; every call after the first uses a new socket."""
        if hasattr(self, "_socc"):
            # Connecting the same socket to a different address raises
            # `OSError: [WinError 10022] An invalid argument was supplied.`
            self.retry()
        else:
            self._socc = True
        self.socket.connect(self.addr)

    def connect(self, ip=None, port: int = 0, show=None):
        """Connect to ``(ip, port)`` and report the outcome through ``show``.

        Returns:
            ``True`` when the connection succeeded, ``False`` otherwise.
        """
        self.addr = (ip, port)
        if not self.socket_connect():
            show("[{}]: 连接服务器[{}]失败".format(get_time(), self.addr[0]))
            return False
        show("[{}]: 连接成功".format(get_time()))
        return True

    def _handler(self):
        """Send the user name, then pass every received message to the callback.

        The loop ends when the server closes the connection or on any
        error other than a receive timeout. On exit the socket is closed
        and ``_connect`` is cleared.
        """
        try:
            self.socket.send(Username.encode(self.code))
            while True:
                try:
                    byte = self.socket.recv(bytecount ** 2)
                    if not byte:
                        break  # the server closed the connection
                    kb = len(byte) / bytecount
                    stamp = datetime.now().strftime("%H:%M:%S")
                    text = byte.decode(encoding=self.code)
                    self._logger(f"[{stamp}]{text} <font size=1>{kb:0.2f} kb</font>")
                except socket.timeout:
                    continue  # no data arrived within TIMEOUT; keep waiting
                except Exception:
                    report = ["", "ERROR".center(20, "-")] + format_exc().split('\n') + ["".center(20, "-"), ""]
                    for n in report:
                        self._logger(f"<font color='red'>{n}</font>")
                    return
        finally:
            self._connect = False
            self.socket.close()

    def run(self):
        """Start the receiver thread."""
        self._connect = True
        # Daemon thread, so an open connection does not keep the process
        # alive after the window is closed.
        self.thread = threading(True, target=self._handler)


class Ui_Dialog(object):
    """Start-up dialog for choosing and connecting to a server address."""

    def __init__(self):
        Dialog = QtWidgets.QDialog()
        self.Dialog = Dialog
        Dialog.setObjectName("Dialog")
        Dialog.resize(495, 81)
        self.gridLayout = QtWidgets.QGridLayout(Dialog)
        self.gridLayout.setObjectName("gridLayout")
        self.comboBox = QtWidgets.QComboBox(Dialog)
        self.comboBox.setObjectName("comboBox")
        self.comboBox.addItems(list(dicts.keys()))
        self.gridLayout.addWidget(self.comboBox, 0, 1, 1, 1)
        self.pushButton = QtWidgets.QPushButton(Dialog)
        self.pushButton.setObjectName("pushButton")
        self.gridLayout.addWidget(self.pushButton, 0, 2, 1, 1)
        self.label = QtWidgets.QLabel(Dialog)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.label.setFont(font)
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 0, 0, 1, 1)
        self.lineEdit = QtWidgets.QLineEdit(Dialog)
        self.lineEdit.setReadOnly(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 1, 0, 2, 3)

        self.retranslateUi(Dialog)
        self.pushButton.clicked.connect(self._connect)
        QtCore.QMetaObject.connectSlotsByName(Dialog)
        self.num = 0

    def retranslateUi(self, Dialog):
        """Set the dialog's visible texts and show it."""
        _translate = QtCore.QCoreApplication.translate
        Dialog.setWindowTitle(_translate("Dialog", "Dialog"))
        self.comboBox.setItemText(0, _translate("Dialog", list(dicts.keys())[0]))
        self.pushButton.setText(_translate("Dialog", "Connecting"))
        self.label.setText(_translate("Dialog", "Select Socket Connect Address:"))
        self.set_text("No return value.")
        Dialog.show()

    def set_text(self, m):
        """Show ``m`` in the dialog's status line."""
        self.lineEdit.setText(QtCore.QCoreApplication.translate("Dialog", str(m)))
        self.lineEdit.update()

    @to_logging
    def _connect(self, i):
        """Connect to the selected address; on success open the main window."""
        global main
        addr = dicts[self.comboBox.currentText()]
        self.set_text("[{}]: 尝试连接服务器[{}],最大超时报错 {}s".format(get_time(), addr["ip"], TIMEOUT))
        if s.connect(**addr, show=self.set_text):
            main = Ui_MainWindow()
            main.show()
            # Close the dialog one second later, from the GUI thread.
            QtCore.QTimer.singleShot(1000, self.Dialog.close)


class Ui_MainWindow(object):
    """Main chat window: message view, input box and connection details."""

    def __init__(self):
        MainWindow = QtWidgets.QMainWindow()
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 619)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        MainWindow.setFont(font)
        self.centralwidget = QtWidgets.QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")
        self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
        self.gridLayout.setObjectName("gridLayout")
        self.lineEdit_3 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_3.setReadOnly(True)
        self.lineEdit_3.setObjectName("lineEdit_3")
        self.gridLayout.addWidget(self.lineEdit_3, 7, 3, 1, 1)
        self.pushButton = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton.setStyleSheet("background-color:rgb(44, 176, 13);\n"
                                      "color:rgb(255, 255, 255);\n"
                                      "font: 200 10pt \"Consolas\";")
        self.pushButton.setObjectName("pushButton")
        self.gridLayout.addWidget(self.pushButton, 8, 6, 1, 1)
        spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.gridLayout.addItem(spacerItem, 8, 5, 1, 1)
        self.label_2 = QtWidgets.QLabel(self.centralwidget)
        self.label_2.setObjectName("label_2")
        self.gridLayout.addWidget(self.label_2, 6, 1, 1, 1)
        spacerItem1 = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.gridLayout.addItem(spacerItem1, 8, 3, 1, 1)
        self.lineEdit_2 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_2.setReadOnly(True)
        self.lineEdit_2.setObjectName("lineEdit_2")
        self.gridLayout.addWidget(self.lineEdit_2, 6, 3, 1, 1)
        self.label = QtWidgets.QLabel(self.centralwidget)
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 5, 1, 1, 1)
        self.lineEdit = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit.setCursor(QtGui.QCursor(QtCore.Qt.IBeamCursor))
        self.lineEdit.setDragEnabled(False)
        self.lineEdit.setReadOnly(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 5, 3, 1, 1)
        self.label_3 = QtWidgets.QLabel(self.centralwidget)
        self.label_3.setObjectName("label_3")
        self.gridLayout.addWidget(self.label_3, 7, 1, 1, 1)
        self.line = QtWidgets.QFrame(self.centralwidget)
        self.line.setFrameShape(QtWidgets.QFrame.VLine)
        self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
        self.line.setObjectName("line")
        self.gridLayout.addWidget(self.line, 5, 4, 3, 1)
        self.textEdit = QtWidgets.QTextEdit(self.centralwidget)
        self.textEdit.setObjectName("textEdit")
        self.gridLayout.addWidget(self.textEdit, 5, 5, 3, 2)
        spacerItem2 = QtWidgets.QSpacerItem(20, 40, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding)
        self.gridLayout.addItem(spacerItem2, 1, 1, 1, 1)
        self.textEdit_2 = QtWidgets.QTextEdit(self.centralwidget)
        self.textEdit_2.setObjectName("textEdit_2")
        self.textEdit_2.setReadOnly(True)
        self.gridLayout.addWidget(self.textEdit_2, 0, 3, 2, 4)
        self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton_2.setObjectName("pushButton_2")
        self.gridLayout.addWidget(self.pushButton_2, 8, 1, 1, 1)
        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 24))
        self.menubar.setObjectName("menubar")
        self.menu = QtWidgets.QMenu(self.menubar)
        self.menu.setObjectName("menu")
        self.menulanguage = QtWidgets.QMenu(self.menu)
        self.menulanguage.setObjectName("menulanguage")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)
        self.actionsocket_connet = QtWidgets.QAction(MainWindow)
        self.actionsocket_connet.setObjectName("actionsocket_connet")
        self.actionChinese = QtWidgets.QAction(MainWindow)
        self.actionChinese.setObjectName("actionChinese")
        self.actionip_socket_gethostbyname_socket_gethostname = QtWidgets.QAction(MainWindow)
        self.actionip_socket_gethostbyname_socket_gethostname.setObjectName("actionip_socket_gethostbyname_socket_gethostname")
        self.menulanguage.addSeparator()
        self.menulanguage.addAction(self.actionChinese)
        self.menu.addSeparator()
        self.menu.addAction(self.menulanguage.menuAction())
        self.menu.addAction(self.actionip_socket_gethostbyname_socket_gethostname)
        self.menubar.addAction(self.menu.menuAction())
        self.socket_peername = s.addr[0]

        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)
        self.MainWindow = MainWindow
        self.pushButton.clicked.connect(self.send)

    @to_logging
    def sendmsg(self):
        """Send the stripped input text to the server and clear the input box."""
        data = self.textEdit.toPlainText().strip()
        if data:
            s.socket.sendall(data.encode(s.code))
            self.textEdit.clear()

    @to_logging
    def send(self, i):
        """Slot of the send button: send the input, or report a lost connection."""
        if hasattr(s, "_connect") and s._connect:
            if not self.sendmsg():
                QtWidgets.QMessageBox.information(self.MainWindow, 'TraceBack', f'Socket Server<{self.socket_peername}> 断开连接')
                s._connect = False
        else:
            self.Show_Message("<font color='red'>发送异常. 未连接至服务器.请点击[重新连接服务器]按钮尝试重新连接.</font>")

    def retranslateUi(self, MainWindow):
        """Set the window's visible texts, then start receiving messages."""
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "Socket"))
        self.lineEdit_2.setText(socket.gethostname())
        self.lineEdit.setText(socket.gethostbyname(socket.gethostname()))
        self.lineEdit_3.setText(self.socket_peername)
        self.pushButton.setText(_translate("MainWindow", "send"))
        self.label_2.setText(_translate("MainWindow", "主机名:"))
        self.label.setText(_translate("MainWindow", "本地端口:"))
        self.label_3.setText(_translate("MainWindow", "连接端口:"))
        self.pushButton_2.setText(_translate("MainWindow", "重新连接服务器"))
        self.menu.setTitle(_translate("MainWindow", "设置"))
        self.menulanguage.setTitle(_translate("MainWindow", "language"))
        self.actionsocket_connet.setText(_translate("MainWindow", "socket connect"))
        self.actionChinese.setText(_translate("MainWindow", "Chinese"))
        self.actionip_socket_gethostbyname_socket_gethostname.setText(_translate("MainWindow", "ip: " + socket.gethostbyname(socket.gethostname())))
        s.set_func(self.Show_Message)
        s.run()

    def show(self):
        """Show the main window."""
        self.MainWindow.show()

    def Show_Message(self, data):
        """Append ``data`` line by line to the message view and scroll to the end.

        A short pause precedes each line and totals 0.2 s per call.
        """
        # NOTE(review): this is also invoked from the receiver thread via
        # Socket._logger; confirm whether widget access should be marshalled
        # to the GUI thread.
        if data:
            lines = data.split('\n')
            pause = 0.2 / len(lines)
            for line in lines:
                sleep(pause)  # keep fast output from blanking the QTextEdit
                self.textEdit_2.append(line)
                print(line)
            self.textEdit_2.moveCursor(QtGui.QTextCursor.End)


if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    s = Socket()
    conn = Ui_Dialog()
    sys.exit(app.exec_())

服务端:

server.py

import logging
import socket  # socket module
from threading import Thread

from color import Text, Background, Print

__version__ = 1.1


def threading(Daemon, **kwargs):
    """Create a ``Thread`` from ``kwargs``, set its daemon flag, start and return it."""
    thread = Thread(**kwargs)
    thread.daemon = Daemon
    thread.start()
    return thread


# Log INFO and above to log.txt (timestamped) and to the console.
logger = logging.getLogger(__name__)
logger.setLevel(level = logging.INFO)
handler = logging.FileHandler("log.txt")
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logger.addHandler(handler)
logger.addHandler(console)

bytecount = 1024


class Command_Handler(object):
    """Parse and answer ``/``-prefixed commands sent by chat clients."""

    def __init__(self, bind=None):
        """Bind Server class"""
        self.bind = bind

    def _function(self, _list, client):
        """Resolve the token list ``_list`` against the command table.

        Returns the command's reply text, the "unknown command" text when
        a token is not in the table, or a hint listing the sub-options
        when the command stops on a group.
        """
        # Leaves are callables, so only the selected reply is computed.
        data = {
            "/info": {
                "-v": self.get_version,
                "-id": lambda: self.get_id(client),
                "-i": self.info,
                "-h": self.help,
            },
        }
        node = data
        for token in _list:
            if not isinstance(node, dict):
                break  # a leaf was reached; extra tokens are ignored
            if token not in node:
                return self.unknown(" ".join(_list))
            node = node[token]
        if isinstance(node, dict):
            return "Error:\n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(node.keys())
        return node()

    @staticmethod
    def help():
        """Return the usage text for ``/info``."""
        return (
            "/info [-v] [-id] [-i]\n"
            "-v : get version of program.\n"
            "-id : get your id.\n"
            "-i : get information.\n"
            "-h : help.\n"
            "For example, <font color=red>/info -id</font>"
        )

    @staticmethod
    def get_version():
        """Return the program version line."""
        return "version : " + str(__version__)

    def get_id(self, client):
        """Return a line containing ``id(client)``."""
        return "Your id is {}.".format(id(client))

    def info(self):
        """Return the one-line program banner."""
        # Use the bare version number; get_version() already carries its own
        # "version : " prefix, which produced "version version : ...".
        return f"Socket Server[version {__version__}] By zmh."

    def unknown(self, s):
        """Return the error text for unrecognised command ``s``, followed by the help."""
        return 'Error:\nNo command named "%s". Please search [/info -h] to help.\n%s' % (s, self.help())

    def cut(self, string):
        """Split a raw command string into whitespace-separated tokens."""
        return string.strip().split()

    def handler(self, c, client=None):
        """Return the HTML reply for command ``c``: an echo line, then the result."""
        return "<font color='gray'>[command] </font><font color='brown'>%s</font>\n%s" % (c, str(self._function(self.cut(c), client)))

    def iscommand(self, i):
        """Return whether ``i`` starts with ``/`` after stripping whitespace."""
        return i.strip().startswith("/")


class Server(object):
    """Threaded TCP chat server that relays each message to every client."""

    join_message = "<font color='red'>Server></font> <font color='blue'>%s</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
    user_message = "<font color='%s'>%s(%s端)%s></font> %s"
    quit_message = "%s 下线了, %s"

    def __init__(self, addr, port, backlog = 10, encode = 'utf8'):
        """Bind a listening socket on ``(addr, port)``.

        Args:
            addr: Address to bind.
            port: Port to bind.
            backlog: Value passed to ``listen``.
            encode: Encoding used for outgoing messages.
        """
        self.address = addr, port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind(self.address)
        self.socket.listen(backlog)
        self.connect = {}        # client socket -> user name
        self.com = Command_Handler(self)
        self.encode = encode
        self.errs = []           # (sock, user) pairs whose call failed
        self.errduring = False   # True while a broadcast is in progress

    def run(self):
        """Log the start-up line and accept clients forever."""
        # Report the address actually bound rather than re-resolving the host name.
        logger.info("服务器[%s]开启,端口[%s]" % (self.address[0], self.address[1]))
        self.accept_client()

    def _get_sockets(self):
        """Return the number of connected clients."""
        return len(self.connect)

    def _str_sockets(self):
        """Return the head-count phrase used in log and quit messages."""
        return f"当前人数 {self._get_sockets()}"

    def send(self, sock, user, mes):
        """Send ``mes`` to ``sock``; return ``True`` on success, ``False`` on error."""
        @self.QUIT(user, sock)
        def func():
            sock.sendall(mes.encode(self.encode))

        return func()

    def QUIT(self, user, sock=None):
        """Return a decorator that runs a callable and reports success as a bool.

        Args:
            user: User name the wrapped call acts on behalf of.
            sock: That user's socket. When omitted it is looked up in
                ``self.connect`` by user name, which picks the first
                match if two clients share a name.

        On failure the traceback is logged and, unless ``self.errduring``
        is set, ``(sock, user)`` is appended to ``self.errs``.
        """
        if sock is None:
            sock = list(self.connect.keys())[list(self.connect.values()).index(user)]

        def prop(command, *args, **kwargs):
            def logs(*args, **kwargs):
                try:
                    command(*args, **kwargs)
                except Exception:  # do not swallow KeyboardInterrupt/SystemExit
                    logging.exception(str())
                    if self.errduring is False:
                        self.errs.append((sock, user))
                    return False
                return True

            return logs

        return prop

    def ServerMessage(self, mes, inc=True):
        """Broadcast ``mes`` to every client; handle failed clients when ``inc``."""
        self.errduring = True
        # self.errs holds (sock, user) tuples, so compare against the sockets.
        failed = {sock for sock, _ in self.errs}
        # Iterate over a snapshot: client threads add and remove entries.
        for sock, user in list(self.connect.items()):
            if sock not in failed:
                self.send(sock, user, mes)
        if inc:
            self.error_handle()
        self.errduring = False

    def UserMessage(self, address, _user, mes, inc=True):
        """Relay a chat message from ``_user`` to every client.

        The sender's own copy is coloured differently and tagged. An empty
        ``mes`` is ignored.
        """
        if not mes:
            return
        self.errduring = True
        failed = {sock for sock, _ in self.errs}
        for sock, user in list(self.connect.items()):
            if sock not in failed:
                is_sender = _user == user
                send_message = Server.user_message % (
                    "brown" if is_sender else "red",
                    _user,
                    address,
                    "(我自己)" if is_sender else "",
                    mes,
                )
                self.send(sock, user, send_message)
        logger.info(f"{address}[{_user}] : {mes}")
        if inc:
            self.error_handle()
        self.errduring = False

    def error_handle(self):
        """Drop every client recorded in ``self.errs`` and announce each departure."""
        # Take the pending entries and clear the list, so a stale entry
        # cannot later evict a client that reconnects under the same name.
        pending = list(self.errs)
        self.errs.clear()
        for sock, user in pending:
            # Skipping sockets that are already gone also avoids the
            # KeyError a second pop of the same socket would raise.
            if sock not in self.connect:
                continue
            # Remove first so the announced head-count excludes the leaver.
            self.connect.pop(sock)
            Q = Server.quit_message % (user, self._str_sockets())
            logger.info(Q)
            self.ServerMessage(Q, False)  # inc=False: do not re-enter error_handle
            sock.close()

    def accept_client(self):
        """Accept connections forever, serving each client on a daemon thread."""
        while True:
            client, address = self.socket.accept()  # blocks until a client connects
            threading(Daemon=True, target=self.message_handle, args=(client, address[0]))

    def message_handle(self, client, address):
        """Register a client, then process its messages until it disconnects.

        The first packet received is taken as the user name. Each later
        packet is answered as a command or relayed as chat.
        """
        raw_name = client.recv(1024)
        if not raw_name:
            client.close()  # closed before sending a user name
            return
        username = raw_name.decode(encoding='utf8')
        self.connect[client] = username
        logger.info(f"{address}[{username}] 加入服务器 , " + self._str_sockets())
        self.ServerMessage(Server.join_message % (address, self._get_sockets()))

        @self.QUIT(username, client)
        def update():
            data = client.recv(bytecount ** 2)
            if not data:
                # recv() returns b"" once the peer has closed; without this
                # the loop below would spin on empty reads forever.
                raise ConnectionResetError("client closed the connection")
            string = data.decode(encoding="utf8")
            Print(Text.BLUE, f"{username}>{string}")
            if self.com.iscommand(string):
                self.send(client, username, self.com.handler(string))
            else:
                self.UserMessage(address, username, string)

        while update():
            pass

        # The loop only ends on failure. Record the client even if the
        # failure happened during someone else's broadcast, then announce.
        if (client, username) not in self.errs:
            self.errs.append((client, username))
        self.error_handle()


def get_host_ip() -> str:
    """Return the local IPv4 address used for outbound traffic.

    Falls back to ``"127.0.0.1"`` when the probe socket cannot be created
    or connected (``OSError``).
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(('8.8.8.8', 80))
            return s.getsockname()[0]
    except OSError:
        return "127.0.0.1"


if __name__ == "__main__":
    server = Server(get_host_ip(), 429)
    server.run()

color.py

import ctypes
import time

# Standard device identifiers passed to GetStdHandle.
STD_INPUT_HANDLE = -10
STD_OUTPUT_HANDLE = -11
STD_ERROR_HANDLE = -12

# Handle of the console's standard output (ctypes.windll exists on Windows only).
std_out_handle = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)


class Text():
    """Foreground colour attribute values."""
    DARKBLUE = 0x01  # dark blue
    DARKGREEN = 0x02  # dark green
    DARKSKYBLUE = 0x03  # dark sky blue
    DARKRED = 0x04  # dark red
    DARKPINK = 0x05  # dark pink
    DARKYELLOW = 0x06  # dark yellow
    DARKWHITE = 0x07  # dark white
    DARKGRAY = 0x08  # dark gray / bright
    BLUE = 0x09  # blue
    GREEN = 0x0a  # green
    SKYBLUE = 0x0b  # sky blue
    RED = 0x0c  # red
    PINK = 0x0d  # pink
    YELLOW = 0x0e  # yellow
    WHITE = 0x0f  # white


class Background():
    """Background colour attribute values."""
    BLUE = 0x10  # blue
    GREEN = 0x20  # green
    RED = 0x40  # red
    INTENSITY = 0x80  # bright


def SetCmdColor(color, handle=std_out_handle):
    """Set the console text attribute to ``color`` and return the API result."""
    Bool = ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)
    return Bool


def Print(color: int, *args, **kwargs):
    """``print`` the arguments in ``color``, then switch back to ``Text.DARKWHITE``."""
    SetCmdColor(color)
    try:
        print(*args, **kwargs)
    finally:
        # Restore the colour even when print() raises.
        SetCmdColor(Text.DARKWHITE)

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。