肿瘤康复网,内容丰富有趣,生活中的好帮手!
肿瘤康复网 > python 中 websocket实现消息定时推送

python 中 websocket实现消息定时推送

时间:2020-11-30 14:22:56

相关推荐

一、socket服务端 websocket_util.py

import socketimport base64import hashlibfrom threading import Threadimport structimport copyimport jsonimport timefrom XX.redis_db import RedisDbfrom utils.summary import summaryfrom utils.redis_util import RedisUtilimport configredis_db = RedisDb(config.REDIS)redis_util = RedisUtil()class WebSocketUtil(object):global usersusers = set()def __init__(self, port=config.SOCKET_PORT, max_wait_user=5):self.sock = socket.socket()self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.sock.bind(("0.0.0.0", port))self.sock.listen(max_wait_user)# 请求头转换格式为字典def get_headers(self, data):"""将请求头转换为字典"""header_dict = {}data = str(data, encoding="utf-8")header, body = data.split("\r\n\r\n", 1)header_list = header.split("\r\n")for i in range(0, len(header_list)):if i == 0:if len(header_list[0].split(" ")) == 3:header_dict['method'], header_dict['url'], header_dict['protocol'] = header_list[0].split(" ")else:k, v = header_list[i].split(":", 1)header_dict[k] = v.strip()return header_dict# 等待用户连接def socket_connect(self):conn, addr = self.sock.accept()users.add(conn)# 获取握手消息,magic string ,sha1加密 发送给客户端 握手消息data = conn.recv(8096)headers = self.get_headers(data)# 对请求头中的sec-websocket-key进行加密response_tpl = "HTTP/1.1 101 Switching Protocols\r\n" \"Upgrade:websocket\r\n" \"Connection: Upgrade\r\n" \"Sec-WebSocket-Accept: %s\r\n" \"WebSocket-Location: ws://%s%s\r\n\r\n"magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'value = headers['Sec-WebSocket-Key'] + magic_stringac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest())response_str = response_tpl % (ac.decode('utf-8'), headers['Host'], headers['url'])# 响应握手信息conn.send(bytes(response_str, encoding='utf-8'), )# 新的连接成功立马发一次数据data = summary()self.send_msg(conn, bytes(json.dumps(data), encoding="utf-8"))# 添加redis,保存该连接最近推送消息的时间戳redis_key = redis_util.get_md5_key(str(conn))redis_db.str_set(redis_key, int(time.time()))def get_data(self, info):payload_len = info[1] & 127if payload_len == 126:extend_payload_len = info[2:4]mask = info[4:8]decoded = info[8:]elif payload_len == 127:extend_payload_len = info[2:10]mask = info[10:14]decoded = info[14:]else:extend_payload_len = Nonemask = info[2:6]decoded = info[6:]bytes_list = bytearray() # 这里我们使用字节将数据全部收集,再去字符串编码,这样不会导致中文乱码for i in range(len(decoded)):chunk = decoded[i] ^ mask[i % 4] # 解码方式bytes_list.append(chunk)body = str(bytes_list, encoding='utf-8')return body# 向客户端发送数据def send_msg(self, conn, msg_bytes):"""WebSocket服务端向客户端发送消息:param conn: 客户端连接到服务器端的socket对象,即: conn,address = socket.accept():param msg_bytes: 向客户端发送的字节:return:"""token = b"\x81" # 接收的第一字节,一般都是x81不变length = len(msg_bytes)if length < 126:token += struct.pack("B", length)elif length <= 0xFFFF:token += struct.pack("!BH", 126, length)else:token += struct.pack("!BQ", 127, length)msg = token + msg_bytes# 如果出错就是客户端断开连接try:conn.send(msg)except Exception as e:# 删除断开连接的记录users.remove(conn)# 循环等待客户端建立连接def wait_socket_connect(self):while True:self.socket_connect()# socket服务端监听客户端连接并批量推送数据def start_socket_server(self):# 启线程循环等待客户端建立连接Thread(target=self.wait_socket_connect).start()# 消息推送while True:# 判断是否有客户端连接,有才推送消息if len(users):send_users = copy.copy(users)# 自定义的消息内容data = summary()# 遍历for user in send_users:# 判断该连接是否最近5s推送消息(可配置时间),有就跳过if self.get_connect_last_send_time(user):self.send_msg(user, bytes(json.dumps(data), encoding="utf-8"))else:passtime.sleep(config.FLUSH_TIME)# 获取该连接最近推送消息的时间戳并判断是否在阀值内def get_connect_last_send_time(self, conn, time_threshold=config.TIME_THRESHOLD):try:redis_key = redis_util.get_md5_key(str(conn))last_send_time = int(redis_db.str_get(redis_key) or 0)return True if time.time() - last_send_time > time_threshold else Falseexcept Exception as e:return Trueif __name__ == '__main__':web = WebSocketUtil(port=8005)web.start_socket_server()

二、 socket 客户端 websocket_client.py, 可同时多个连接,可以通过服务端sock.listen(5)调整可等待连接数

import websocketimport jsonws = websocket.create_connection("ws://127.0.0.1:8080/")while True:data = ws.recv()print(json.loads(str(data, encoding="utf-8")))

三、 前端测试 test.html

<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8"><title>Title</title></head><body></body></html><script>ws =new WebSocket("ws://127.0.0.1:8005/");ws.onopen = function(ev){console.log("连接成功")};ws.onmessage = function (ev){console.log("接收消息为:");console.log(ev);}</script>

四、在flask项目结合使用websocket

1、只需要在flask项目启动时启一个线程来调用 websocket_util.py中的start_socket_server()

2、例子:

# 线程启动socket服务端from utils.websocket_util import WebSocketUtilimport configimport threadingtry:# uwsgi启动socket服务,只选其中一个进程启动,多个会报端口占用错误import uwsgiif uwsgi.worker_id() == 2:# uwsgi启动socket服务,默认端口为8003web_socket = WebSocketUtil(port=config.SOCKET_PORT)threading.Thread(target=web_socket.start_socket_server).start()print("socket server uwsgi启动成功")print("服务端推送间隔为:" + str(config.FLUSH_TIME) + "s")except Exception as e:print(e)print("socket server uwsgi启动失败")try:# 如果报错,证明是本地远程启动项目,直接启动socket服务 8004端口web_socket = WebSocketUtil(port=8004)threading.Thread(target=web_socket.start_socket_server).start()print("socket server 本地远程启动成功")print("服务端推送间隔为:" + str(config.FLUSH_TIME) + "s")except Exception as e:print("socket server 本地远程启动失败")

五、Nginx配置websocket反向代理

# 第一种:另起一个server端口代理map $http_upgrade $connection_upgrade { default upgrade; '' close; } upstream wsbackend{ server ip1:8003; keepalive 1000; } server { listen 8003; location /{ proxy_http_version 1.1; proxy_pass http://wsbackend; proxy_redirect off; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_read_timeout 3600s; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; } }# 第二种:在原来的server中添加一个location,将"ws://127.0.0.1:8080/websocket/"转成"ws://127.0.0.1:8003/"# web socket locationlocation ~ /websocket/ {proxy_http_version 1.1;proxy_pass http://127.0.0.1:8003;proxy_redirect off;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_read_timeout 3600s;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection $connection_upgrade;}

以上也是借鉴了网友的分享再加上自己的一点优化,仅供参考。

如果觉得《python 中 websocket实现消息定时推送》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。