flask websocket如何同时发送多个数据。

2021-02-05 11:18发布

情况是这样,我需要对页面更新多个数据,包括时间,地点,人数。其中时间和人数都需要用ws进行实时推送数据,请问应该怎么实现呢?

情况是这样,我需要对页面更新多个数据,包括时间,地点,人数。其中时间和人数都需要用ws进行实时推送数据,请问应该怎么实现呢?

1条回答
卡卡
2楼 · 2021-02-25 14:43



WebSocket协议简介

WebSocket是html5规范新引入的功能,用于解决浏览器与后台服务器双向通讯的问题,使用WebSocket技术,后台可以随时向前端推送消息,以保证前后台状态统一,在传统的无状态HTTP协议中,这是“无法做到”的。

在WebSocket出现之前,传统的服务端向浏览器推送消息的技术包括:ajax、flash、comet、javaapplet等。无一例外,这些技术使用的都是长轮循,即每隔一段时间去请求后台,以获取最新状态。长轮询方式容易实现,但效果也差,频繁盲目的调用后台,带来不必要的开销,且实时性无法保障,后台出现更新,前端需要在下一次轮询时才知道。

WebSocket协议支持服务端与浏览器建立长连接,双方可以随时发送数据给对方,不再是由客户端控制的一问一答的方式。在实现推送功能的时候,主要是由服务端给客户端发送数据。

基于长轮循(polling)和websocket推送的浏览器(browser)和服务端(Server)的交互对比图如下所示:


由于WebSocket协议建立在http协议的基础之上,因此二者有很多的类似之处。事实上,在使用websocket协议时,浏览器与服务端最开始建立的还是http连接,之后再将协议从http转换成websocket,协议转换的过程称之为握手(handshake),表示服务端与客户端都同意建立websocket协议。

需要注意的是,由于websocket是新的协议,需要浏览器和web服务端都支持的情况下,才能建立连接成功。

正常情况下,连接在建立的时候,浏览器向服务端发送一个HTTP请求,通过包含一些额外信息,表明其希望将协议从HTTP转换成WebSocket。这个额外信息实际上就是增加了一个请求头Update,如下所示:

GETws://echo.websocket.org/?encoding=textHTTP/1.1

Origin:http://websocket.org

Cookie:__utma=99as

Connection:Upgrade

Host:echo.websocket.org

Sec-WebSocket-Key:uRovscZjNol/umbTt5uKmw==

Upgrade:websocket

Sec-WebSocket-Version:13

可以看到Update请求头的值为websocket,表示希望将Http协议转换成websocket协议。

Origin表示的是,连接开始时发送http请求的地址。

注意GET的路径是以ws开头,这是因为WebSocket是一种全新的协议,不属于http无状态协议,协议名为”ws",与http协议使用相同的80端口,类似的,"wss"和https协议使用相同的443端口。

客户端握手请求中的Sec-WebSocket-Key头字段中的内容是随机的,采用base64编码。服务端会接收到这个值之后,会将其与一个魔幻数字258EAFA5-E914-47DA-95CA-C5AB0DC85B11进行连接,使用SHA-1加密后,采用base64编码,以Sec-WebSocket-Accept响应头返回。

如果服务端接受了客户端将http协议转换成websocket协议连接的请求,会返回类似如下响应,表示协议切换成功:HTTP/1.1101WebSocketProtocolHandshake

Date:Fri,10Feb201217:38:18GMT

Connection:Upgrade

Server:KaazingGateway

Upgrade:WebSocket

Access-Control-Allow-Origin:http://websocket.org

Access-Control-Allow-Credentials:true

Sec-WebSocket-Accept:rLHCkw/SKsO9GAH/ZSFhBATDKrU=

Access-Control-Allow-Headers:content-type


WebSocket客户端开发

支持html5的浏览器,一般都会提供一个内置的js对象Websocket,开发者利用这个对象就可以与服务端建立websocket连接。特别的在FireFox中,这个对象为在Firefox中为MozWebSocket。

可以通过以下js代码检测一个浏览器是否支持websocketwindow.WebSocket = window.WebSocket || window.MozWebSocket;

if (!window.WebSocket){

alert("WebSocket not supported by this browser");

return;

}

websocket协议连接的创建var myWebSocket = new WebSocket("ws://www.websockets.org");

关闭连接使用Websocket对象的close方法myWebSocket.close();

发送消息使用Websocket对象的send方法myWebSocket.send("Hello WebSockets!");

Websocket对象还提供了几个回调方法,在适当的实际会被回调://连接创建成功时被回调

myWebSocket.onopen=function(evt){alert("Connectionopen...");};

//收到服务端的消息时被回调

myWebSocket.onmessage=function(evt){alert("ReceivedMessage:"+evt.data);};

//连接关闭时被回调

myWebSocket.onclose=function(evt){alert("Connectionclosed.");};

完整的客户端案例:

在这里我们编写一个界面,如下所示,在"客户端请求消息"文本框中填入要发送的内容,点击"发送WebSocket请求消息按钮",服务端的响应消息会显示在"服务端返回的应答消息文本框中"


源码如下所示:html>

 Netty WebSocket 协议 

客户端请求消息

服务端返回的应答消息

window.WebSocket = window.WebSocket || window.MozWebSocket;

if (!window.WebSocket){

alert("你的浏览器不支持websocket协议");

}else{

var socket = new WebSocket("ws://localhost:8080/websocket");

socket.onmessage = function (event) {

var ta = document.getElementById('responseText');

ta.value = event.data

};

socket.onopen = function (event) {

alert("websocket连接建立成功...");

};

socket.onclose = function (event) {

alert("连接关闭");

};

function send(message) {

if (!window.WebSocket) {

return;

}

if (socket.readyState == WebSocket.OPEN) {

socket.send(message);

}

else {

alert("WebSocket not supported by this browser");

}

}

}

NettyWebsocket服务端开发public class WebSocketServer {

public void run(int port) throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch)

throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("http-codec",

new HttpServerCodec());

pipeline.addLast("aggregator",

new HttpObjectAggregator(65536));

ch.pipeline().addLast("http-chunked",

new ChunkedWriteHandler());

pipeline.addLast("handler",

new WebSocketServerHandler());

}

});

Channel ch = b.bind(port).sync().channel();

System.out.println("Web socket server started at port " + port

+ '.');

System.out

.println("Open your browser and navigate to http://localhost:"

+ port + '/');

ch.closeFuture().sync();

} finally {

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

public static void main(String[] args) throws Exception {

int port = 8080;

if (args.length > 0) {

try {

port = Integer.parseInt(args[0]);

} catch (NumberFormatException e) {

e.printStackTrace();

}

}

new WebSocketServer().run(port);

}

}

上述代码中的HttpServerCodec、HttpObjectAggregator、ChunkedWriteHandler都是Netty提供的对tttp或webscoket协议支持的ChannelHandler:

HttpServerCodec的作用是将请求或者应答消息按照HTTP协议的格式进行解码或者编码。

HttpObjectAggregator的目的是将HTTP消息的多个部分组合成一条完整的HTTP消息,也就是处理粘包与解包问题。

ChunkedWriteHandler用来向客户单发送HTML文件,它主要用于支持浏览器和服务端进行WebSocket通信。

WebSocketServerHandler是我们自己编写的处理webscoket请求的ChannelHandler。public class WebSocketServerHandler extends SimpleChannelInboundHandler {

private static final Logger logger = Logger

.getLogger(WebSocketServerHandler.class.getName());

private WebSocketServerHandshaker handshaker;

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

// 传统的HTTP接入

if (msg instanceof FullHttpRequest) {

handleHttpRequest(ctx, (FullHttpRequest) msg);

}

// WebSocket接入

else if (msg instanceof WebSocketFrame) {

handleWebSocketFrame(ctx, (WebSocketFrame) msg);

}

}

private void handleHttpRequest(ChannelHandlerContext ctx,

FullHttpRequest req) throws Exception {

// 如果HTTP解码失败,返回HHTP异常

if (!req.getDecoderResult().isSuccess()

|| (!"websocket".equals(req.headers().get("Upgrade")))) {

sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1,

BAD_REQUEST));

return;

}

// 构造握手响应返回,本机测试

WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(

"ws://localhost:8080/websocket", null, false);

handshaker = wsFactory.newHandshaker(req);

if (handshaker == null) {

WebSocketServerHandshakerFactory

.sendUnsupportedVersionResponse(ctx.channel());

} else {

handshaker.handshake(ctx.channel(), req);

}

}

private void handleWebSocketFrame(ChannelHandlerContext ctx,

WebSocketFrame frame) {

// 判断是否是关闭链路的指令

if (frame instanceof CloseWebSocketFrame) {

handshaker.close(ctx.channel(),

(CloseWebSocketFrame) frame.retain());

return;

}

// 判断是否是Ping消息

if (frame instanceof PingWebSocketFrame) {

ctx.channel().write(

new PongWebSocketFrame(frame.content().retain()));

return;

}

// 本例程仅支持文本消息,不支持二进制消息

if (!(frame instanceof TextWebSocketFrame)) {

throw new UnsupportedOperationException(String.format(

"%s frame types not supported", frame.getClass().getName()));

}

// 返回应答消息

String request = ((TextWebSocketFrame) frame).text();

if (logger.isLoggable(Level.FINE)) {

logger.fine(String.format("%s received %s", ctx.channel(), request));

}

ctx.channel().write(new TextWebSocketFrame(" 收到客户端请求:"+request));

}

private static void sendHttpResponse(ChannelHandlerContext ctx,

FullHttpRequest req, FullHttpResponse res) {

// 返回应答给客户端

if (res.getStatus().code() != 200) {

ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(),

CharsetUtil.UTF_8);

res.content().writeBytes(buf);

buf.release();

setContentLength(res, res.content().readableBytes());

}

// 如果是非Keep-Alive,关闭连接

ChannelFuture f = ctx.channel().writeAndFlush(res);

if (!isKeepAlive(req) || res.getStatus().code() != 200) {

f.addListener(ChannelFutureListener.CLOSE);

}

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

throws Exception {

cause.printStackTrace();

ctx.close();

}

}

在上述代码中,当接收到客户端发送的请求消息时,首先通过msg是FullHttpRequest还是WebSocketFrame来判断判断是http请求还是websocket请求。然后分别调用handleHttpRequest、handleWebSocketFrame方法来处理。

对于handleHttpRequest方法,仅仅在第一次请求的时候被调用一次

连接建立时,第一次发送的肯定是一个http请求,因为将http升级websocket协议需要服务端支持,但是并不是所有的服务端都支持websocket协议,因此服务端需要在handleHttpRequest方法中,通知客户端,服务端是否同意将HTTP协议升级成WebSocket协议。在我们的案例中,通过WebSocketServerHandshaker的handshake方法,告诉客户端,服务端同意协议升级,此时websocket连接建立完成。

一旦websocket协议建立完成,之后的所有的请求都会走handleWebSocketFrame方法,这个方法的处理顺序如下:

1、判断是否是CloseWebSocketFrame请求,如果是,关闭连接

2、判断是否是PingWebSocketFrame消息,如果是维持链路的Ping消息,则构造Pong消息返回。Ping和Pong是websocket里的心跳,用来保证客户端是在线的。

3、判断消息是否是TextWebSocketFrame文本消息。因为websocket消息是基于HTTP协议的,http除了支持文本消息,还是支持图片上传等复杂的二进制消息。按我们的案例中,只对文本消息进行处理。

4、返回应答消息

在handleWebSocketFrame方法中,我们使用到了webscocket中不同的帧类型,CloseWebSocketFrame、PingWebSocketFrame、TextWebSocketFrame。帧类型实际上就是WebSocket协议定义的消息类型。在一个完整的WebSocket帧中(如下图所示),包含了一个4位的二进制Opcode字段,其用来表示消息类型。


Opcode的取值和消息类型的对应关系如下所示:OPCODE:4位

0x0表示附加数据帧

0x1表示文本数据帧

0x2表示二进制数据帧

0x3-7暂时无定义,为以后的非控制帧保留

0x8表示连接关闭

0x9表示ping

0xA表示pong

0xB-F暂时无定义,为以后的控制帧保留

可以看到,虽然4位2进制最多支持16种消息类型,但是目前实际上只定义了六种消息类型。在Netty中,针对这每一种帧类型,都有一个java类与之对应,如下所示。


其中Close、Ping以及Pong称之为控制帧,Close关闭帧很容易理解,客户端如果接受到了就关闭连接,客户端也可以发送关闭帧给服务端。Ping和Pong是websocket里的心跳,用来保证客户端是在线的。

Text和Binary表示这个传输的帧是文本类型还是二进制类型,二进制类型传输的数据可以是图片或者语音之类的。

如何实现消息推送

之前的案例,还是通过客户端发送请求,服务端进行响应的方式进行交互。WebSocket协议还可以实现服务端主动向客户端推送消息的功能。

主动推送需要考虑的问题包括:

1、服务端要保存用户唯一标记(假设为userId)与其对应的SocketChannel的对应关系,以便之后根据这个唯一标记给用户推送消息。

对应关系的保存很简单,用一个Map来记录即可。保存这种对应关系的最佳时机是websocket连接刚刚建立的时候,因为这种对应关系只需要保存一次,之后就可以拿着这个userId找到对应SocketChannel给用户推送消息。以之前的代码为例,很明显这个操作,应该放在handleHttpRequest方法中处理,因为这个方法在一个websocket连接的生命周期只会调用一次。

我们知道,当连接建立时,我们在服务端就能获取到客户端对应的SocketChannel,我们如果获取到userId呢?WebSocket协议与HTTP协议一样,都支持在url中添加参数。如:ws://localhost:8080/websocket?userId=123456

我们可以在handleHttpRequest中添加到以下代码获取到这个参数//这个map用于保存userId和SocketChannel的对应关系

public static Map map=new ConcurrentHashMap();

private void handleHttpRequest(ChannelHandlerContext ctx,

FullHttpRequest req) throws Exception {

....

//获取userId

QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.getUri());

Map> params = queryStringDecoder.parameters();

String userId=params.get("userId").get(0);

//保存userId和Channel的对应关系

map.put(userId,ctx.channel());

}

此外,当用户断开与服务端的连接的时候,也就是当服务端接收到CloseWebSocketFrame帧的时候,需要从Map中移除对应的userId和SocketChannel,以免Map中存储过多的无效用户数据导致内存溢出。

2、通过userId给用户推送消息

通常消息推送是由运营人员根据业务需要,从一个后台管理界面,过滤出符合特定条件的用户来推送消息,因此需要推送的userId和推送内容都是运营指定的。推送服务端只需要根据userId找到对应的Channel,将推送消息推送给浏览器即可。代码片段如下://运营人员制定要推送的userId和消息

String userId="123456";

String msg="xxxxxxxxxxxxxxxxxxxxxx";

//根据userId找到对应的channel,并将消息写出

Channel channel = map.get(userId);

channel.writeAndFlush(new TextWebSocketFrame(msg));

一般公司的推送的消息量都比较大,所以需要推送的消息一般会放到一个外部消息队列中,如rocketmq,kafka。推送服务端通过消费这些队列,来给用户推送消息。

3、客户端ack接受到的消息

一般情况下,我们会每一个推送的消息都设置一个唯一的msgId。客户端在接受推送的消息后,需要对这条消息进行ack,也就是告诉服务端自己接收到了这条消息,ack时需要将msgId带回来。

如果我们只使用websocket做推送服务,那么之前代码中的handleWebSocketFrame方法,要处理的主要就是这类ack消息。

4、离线消息

如果当给某个用户推送消息的时候,其并不在线,可以将消息保存下来,一般都是保存到一个缓存服务器中(如redis),每次当一个用户与服务端建立连接的时候,首先检查缓存服务器中有没有其对应的离线消息,如果有直接取出来发送给用户。

5、消息推送记录的存储

服务端推送完成消息记录之后,还需要将这些消息存储下来,以支持一个用户查看自己曾经受到过的推送消息。

6、疲劳度控制

在实现推送功能时,我们通常还会做一个疲劳度控制的功能,也就是限制给一个用户每天推送消息的数量,以免频繁的推送消息给用户造成不好的体验。疲劳度控制可以从多个维度进行设计。例如用户一天可以接受到消息总数,根据业务划分的某种特定类型的消息的一天可以接受到的消息总数等。

参考文章:

http://websocket.org/aboutwebsocket.html

http://www.jianshu.com/p/867274a5e054

http://www.jianshu.com/p/1ee01218097b

http://www.jianshu.com/p/eb4c1c724d9e



相关问题推荐

  • 回答 3

    换行。比如,print hello\nworld效果就是helloworld\n就是一个换行符。\是转义的意思,'\n'是换行,'\t'是tab,'\\'是,\ 是在编写程序中句子太长百,人为换行后加上\但print出来是一整行。...

  • 回答 42

    十种常见排序算法一般分为以下几种:(1)非线性时间比较类排序:a. 交换类排序(快速排序、冒泡排序)b. 插入类排序(简单插入排序、希尔排序)c. 选择类排序(简单选择排序、堆排序)d. 归并排序(二路归并排序、多路归并排序)(2)线性时间非比较类排序:...

  • 回答 70
    已采纳

    前景很好,中国正在产业升级,工业机器人和人工智能方面都会是强烈的热点,而且正好是在3~5年以后的时间。难度,肯定高,要求你有创新的思维能力,高数中的微积分、数列等等必须得非常好,软件编程(基础的应用最广泛的语言:C/C++)必须得很好,微电子(数字电...

  • 回答 28

    迭代器与生成器的区别:(1)生成器:生成器本质上就是一个函数,它记住了上一次返回时在函数体中的位置。对生成器函数的第二次(或第n次)调用,跳转到函数上一次挂起的位置。而且记录了程序执行的上下文。生成器不仅记住了它的数据状态,生成器还记住了程序...

  • 回答 9

    python中title( )属于python中字符串函数,返回’标题化‘的字符串,就是单词的开头为大写,其余为小写

  • 回答 6

    第一种解释:代码中的cnt是count的简称,一种电脑计算机内部的数学函数的名字,在Excel办公软件中计算参数列表中的数字项的个数;在数据库( sq| server或者access )中可以用来统计符合条件的数据条数。函数COUNT在计数时,将把数值型的数字计算进去;但是...

  • 回答 1

    head是方法,所以需要取小括号,即dataset.head()显示的则是前5行。data[:, :-1]和data[:, -1]。另外,如果想通过位置取数据,请使用iloc,即dataset.iloc[:, :-1]和dataset.iloc[:, -1],前者表示的是取所有行,但不包括最后一列的数据,结果是个DataFrame。...

  • Python入门简单吗2021-09-23 13:21
    回答 45

    挺简单的,其实课程内容没有我们想象的那么难、像我之前同学,完全零基础,培训了半年,直接出来就工作了,人家还在北京大公司上班,一个月15k,实力老厉害了

  • 回答 4

    Python针对众多的类型,提供了众多的内建函数来处理(内建是相对于导入import来说的,后面学习到包package时,将会介绍),这些内建函数功用在于其往往可对多种类型对象进行类似的操作,即多种类型对象的共有的操作;如果某种操作只对特殊的某一类对象可行,Pyt...

  • 回答 8

     相当于 ... 这里不是注释

  • 回答 4

    还有FIXME

  • 回答 3

    python的两个库:xlrd和xlutils。 xlrd打开excel,但是打开的excel并不能直接写入数据,需要用xlutils主要是复制一份出来,实现后续的写入功能。

  • 回答 8

    单行注释:Python中的单行注释一般是以#开头的,#右边的文字都会被当做解释说明的内容,不会被当做执行的程序。为了保证代码的可读性,一般会在#后面加一两个空格然后在编写解释内容。示例:#  单行注释print(hello world)注释可以放在代码上面也可以放在代...

  • 回答 2

    主要是按行读取,然后就是写出判断逻辑来勘测行是否为注视行,空行,编码行其他的:import linecachefile=open('3_2.txt','r')linecount=len(file.readlines())linecache.getline('3_2.txt',linecount)这样做的过程中发现一个问题,...

  • 回答 4

    或许是里面有没被注释的代码

  • 回答 26

    自学的话要看个人情况,可以先在B站找一下视频看一下

没有解决我的问题,去提问