整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

网页端IM通信技术快速入门:短轮询、长轮询、SSE、WebSocket

文来自“糊糊糊糊糊了”的分享,原题《实时消息推送整理》,有优化和改动。

1、写在前面

对Web端即时通讯技术熟悉的开发者来说,我们回顾网页端IM的底层通信技术,从短轮询、长轮询,到后来的SSE以及WebSocket,使用门槛越来越低(早期的长轮询Comet这类技术实际属于hack手段,使用门槛并不低),技术手段越来越先进,网页端即时通讯技术的体验也因此越来越好。

但上周在编辑《IM扫码登录技术专题》系列文章第3篇的时候忽然想到,之前的这些所谓的网页端即时通讯“老技术”相对于当红的WebSocket,并非毫无用武之地。就拿IM里的扫码登录功能来说,用短轮询技术就非常合适,完全没必要大炮打蚊子上WebSocket。

所以,很多时候没必要盲目追求新技术,相对应用场景来说适合的才是最好的。对于即时通讯网的im和消息推送这类即时通讯技术开发者来说,掌握WebSocket固然很重要,但了解短轮询、长轮询等这些所谓的Web端即时通讯“老技术”仍然大有裨益,这也正是整理分享本文的重要原因。

2、推荐阅读

[1] 新手入门贴:史上最全Web端即时通讯技术原理详解

[2] 详解Web端通信方式的演进:从Ajax、JSONP 到 SSE、Websocket

[3] Web端即时通讯技术盘点:短轮询、Comet、Websocket、SSE

3、正文引言

对于IM/消息推送这类即时通讯系统而言,系统的关键就是“实时通信”能力。

从表面意思上来看,“实时通信”指的是:

  • 1)客户端能随时主动发送数据给服务端;
  • 2)当客户端关注的内容在发生改变时,服务器能够实时地通知客户端。

类比于传统的C/S请求模型,“实时通信”时客户端不需要主观地发送请求去获取自己关心的内容,而是由服务器端进行“推送”。

注意:上面的“推送”二字打了引号,实际上现有的几种技术实现方式中,并不是服务器端真正主动地推送,而是通过一定的手段营造了一种“实时通信”的假象。

就目前现有的几种技术而言,主要有以下几类:

  • 1)客户端轮询:传统意义上的短轮询(Short Polling);
  • 2)服务器端轮询:长轮询(Long Polling);
  • 3)单向服务器推送:Server-Sent Events(SSE);
  • 4)全双工通信:WebSocket。

以下正文将针对这几种技术方案,为你一一解惑。

4、本文配套Demo和代码

为了帮助读者更好的理解本文内容,笔者专门写了一个较完整的Demo,Demo会以一个简易聊天室的例子来分别通过上述的四种技术方式实现(代码存在些许bug,主要是为了做演示用,别介意)。

完整Demo源码打包下载:

请从同步链接附件中下载:http://www.52im.net/thread-3555-1-1.html)

Demo的运行效果(动图):

有兴趣可以自行下载研究学习。

5、理解短轮询(Short Polling)

短轮询的实现原理:

  • 1)客户端向服务器端发送一个请求,服务器返回数据,然后客户端根据服务器端返回的数据进行处理;
  • 2)客户端继续向服务器端发送请求,继续重复以上的步骤,如果不想给服务器端太大的压力,一般情况下会设置一个请求的时间间隔。

逻辑如下图所示:

使用短轮询的优点:基础不需要额外的开发成本,请求数据,解析数据,作出响应,仅此而已,然后不断重复。

缺点也显而易见:

  • 1)不断的发送和关闭请求,对服务器的压力会比较大,因为本身开启Http连接就是一件比较耗资源的事情;
  • 2)轮询的时间间隔不好控制。如果要求的实时性比较高,显然使用短轮询会有明显的短板,如果设置interval的间隔过长,会导致消息延迟,而如果太短,会对服务器产生压力。

短轮询客户的代码实现(片段节选):

var ShortPollingNotification = {

datasInterval: null,

subscribe: function() {

this.datasInterval = setInterval(function() {

Request.getDatas().then(function(res) {

window.ChatroomDOM.renderData(res);

});

}, TIMEOUT);

return this.unsubscribe;

},

unsubscribe: function() {

this.datasInterval && clearInterval(this.datasInterval);

}

}

PS:完整代码,请见本文“4、本文配套Demo和代码”一节。

对应本文配套Demo的运行效果如下(动图):

下面是对应的请求,注意左下角的请求数量一直在变化:

在上图中,每隔1s就会发送一个请求,看起来效果还不错,但是如果将timeout的值设置成5s,效果将大打折扣。如下图所示。

将timeout值设置成5s时的Demo运行效果(动图):

6、理解长轮询(Long Polling)

6.1 基本原理

长轮询的基本原理:

  • 1)客户端发送一个请求,服务器会hold住这个请求;
  • 2)直到监听的内容有改变,才会返回数据,断开连接(或者在一定的时间内,请求还得不到返回,就会因为超时自动断开连接);
  • 3)客户端继续发送请求,重复以上步骤。

逻辑如下图所示:

长轮询是基于短轮询上的改进版本:主要是减少了客户端发起Http连接的开销,改成了在服务器端主动地去判断所关心的内容是否变化。

所以其实轮询的本质并没有多大变化,变化的点在于:

  • 1)对于内容变化的轮询由客户端改成了服务器端(客户端会在连接中断之后,会再次发送请求,对比短轮询来说,大大减少了发起连接的次数);
  • 2)客户端只会在数据改变时去作相应的改变,对比短轮询来说,并不是全盘接收。

6.2 代码实现

长轮询客户的代码实现(片段节选):

// 客户端

var LongPollingNotification = {

// ....

subscribe: function() {

var that = this;


// 设置超时时间

Request.getV2Datas(this.getKey(),{ timeout: 10000 }).then(function(res) {

var data = res.data;

window.ChatroomDOM.renderData(res);

// 成功获取数据后会再次发送请求

that.subscribe();

}).catch(function(error) {

// timeout 之后也会再次发送请求

that.subscribe();

});

return this.unsubscribe;

}

// ....

}

笔者采用的是express,默认不支持hold住请求,因此用了一个express-longpoll的库来实现。

下面是一个原生不用库的实现(这里只是介绍原理),整体的思路是:如果服务器端支持hold住请求的话,那么在一定的时间内会自轮询,然后期间通过比较key值,判断是否返回新数据。

以下是具体思路:

  • 1)客户端第一次会带一个空的key值,这次会立即返回,获取新内容,服务器端将计算出的contentKey返回给客户端;
  • 2)然后客户端发送第二次请求,带上第一次返回的contentKey作为key值,然后进行下一轮的比较;
  • 3)如果两次的key值相同,就会hold请求,进行内部轮询,如果期间有新内容或者客户端timeout,就会断开连接;
  • 4)重复以上步骤。

代码如下:

// 服务器端

router.get('/v2/datas', function(req, res) {

const key = _.get(req.query, 'key', '');

let contentKey = chatRoom.getContentKey();


while(key === contentKey) {

sleep.sleep(5);

contentKey = chatRoom.getContentKey();

}


const connectors = chatRoom.getConnectors();

const messages = chatRoom.getMessages();

res.json({

code: 200,

data: { connectors: connectors, messages: messages, key: contentKey },

});

});

以下是用 express-longpoll的实现片段:

// mini-chatroom/public/javascripts/server/longPolling.js

function pushDataToClient(key, longpoll) {

var contentKey = chatRoom.getContentKey();


if(key !== contentKey) {

var connectors = chatRoom.getConnectors();

var messages = chatRoom.getMessages();


long poll.publish(

'/v2/datas',

{

code: 200,

data: {connectors: connectors, messages: messages, key: contentKey},

}

);

}

}


long poll.create("/v2/datas", function(req, res, next) {

key = _.get(req.query, 'key', '');

pushDataToClient(key, longpoll);

next();

});


intervalId = setInterval(function() {

pushDataToClient(key, longpoll);

}, LONG_POLLING_TIMEOUT);

PS:完整代码,请见本文“4、本文配套Demo和代码”一节。

为了方便演示,我将客户端发起请求的timeout改成了4s,注意观察下面的截图:

可以看到,断开连接的两种方式,要么是超时,要么是请求有数据返回。

6.3 基于iframe的长轮询模式

这是长轮询技术的另一个种实现方案。

该方案的具体的原理为:

  • 1)在页面中嵌入一个iframe,地址指向轮询的服务器地址,然后在父页面中放置一个执行函数,比如execute(data);
  • 2)当服务器有内容改变时,会向iframe发送一个脚本<script>parent.execute(JSON.stringify(data))</script>;
  • 3)通过发送的脚本,主动执行父页面中的方法,达到推送的效果。

因不篇幅原因,在此不作深入介绍,有兴趣的同学可以详读《新手入门贴:史上最全Web端即时通讯技术原理详解》一文中的“3.3.2 基于iframe的数据流”一节。

7、什么是Server-Sent Events(SSE)

7.1 基本介绍

从纯技术的角度讲:上两节介绍的短轮询和长轮询技术,服务器端是无法主动给客户端推送消息的,都是客户端主动去请求服务器端获取最新的数据。

本节要介绍的SSE是一种可以主动从服务端推送消息的技术。

SSE的本质其实就是一个HTTP的长连接,只不过它给客户端发送的不是一次性的数据包,而是一个stream流,格式为text/event-stream。所以客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。

简单来说,SSE就是:

  • 1)SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
  • 2)SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。
  • 3)SSE 默认支持断线重连,WebSocket 需要自己实现。
  • 4)SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。
  • 5)SSE 支持自定义发送的消息类型。

SSE的技术原理如下图所示:

SSE基本的使用方法,可以参看 SSE 的API文档,地址是:https://developer.mozilla.org/en ... _server-sent_events。

目前除了IE以及低版本的浏览器不支持,绝大多数的现代浏览器都支持SSE:

上图来自:https://caniuse.com/?search=Server-Sent-Events)

7.2 代码实现

// 客户端

var SSENotification = {

source: null,

subscribe: function() {

if('EventSource'inwindow) {

this.source = newEventSource('/sse');


this.source.addEventListener('message', function(res) {

const d = res.data;

window.ChatroomDOM.renderData(JSON.parse(d));

});

}

return this.unsubscribe;

},

unsubscribe: function() {

this.source && this.source.close();

}

}


// 服务器端

router.get('/sse', function(req, res) {

const connectors = chatRoom.getConnectors();

const messages = chatRoom.getMessages();

const response = { code: 200, data: { connectors: connectors, messages: messages } };


res.writeHead(200, {

"Content-Type":"text/event-stream",

"Cache-Control":"no-cache",

"Connection":"keep-alive",

"Access-Control-Allow-Origin": '*',

});


res.write("retry: 10000\n");

res.write("data: "+ JSON.stringify(response) + "\n\n");


var unsubscribe = Event.subscribe(function() {

const connectors = chatRoom.getConnectors();

const messages = chatRoom.getMessages();

const response = { code: 200, data: { connectors: connectors, messages: messages } };

res.write("data: "+ JSON.stringify(response) + "\n\n");

});


req.connection.addListener("close", function() {

unsubscribe();

}, false);

});

下面是控制台的情况,注意观察响应类型:

详情中注意查看请求类型,以及EventStream消息类型:

PS:有关SSE更详尽的资料就不在这里展开了,有兴趣的同学可以详读《SSE技术详解:一种全新的HTML5服务器推送事件技术》、《使用WebSocket和SSE技术实现Web端消息推送》。

8、什么是WebSocket

8.1 基本介绍

PS:本小节内容引用自《Web端即时通讯实践干货:如何让WebSocket断网重连更快速?》一文的“3、快速了解WebSocket”。

WebSocket诞生于2008年,在2011年成为国际标准,现在所有的浏览器都已支持(详见《新手快速入门:WebSocket简明教程》)。它是一种全新的应用层协议,是专门为web客户端和服务端设计的真正的全双工通信协议,可以类比HTTP协议来了解websocket协议。

图片引用自《WebSocket详解(四):刨根问底HTTP与WebSocket的关系(上篇)

它们的不同点:

  • 1)HTTP的协议标识符是http,WebSocket的是ws;
  • 2)HTTP请求只能由客户端发起,服务器无法主动向客户端推送消息,而WebSocket可以;
  • 3)HTTP请求有同源限制,不同源之间通信需要跨域,而WebSocket没有同源限制。

它们的相同点:

  • 1)都是应用层的通信协议;
  • 2)默认端口一样,都是80或443;
  • 3)都可以用于浏览器和服务器间的通信;
  • 4)都基于TCP协议。

两者和TCP的关系图:

图片引用自《新手快速入门:WebSocket简明教程

有关Http和WebSocket的关系,可以详读:

《WebSocket详解(四):刨根问底HTTP与WebSocket的关系(上篇)》

《WebSocket详解(五):刨根问底HTTP与WebSocket的关系(下篇)》

有关WebSocket和Socket的关系,可以详读:《WebSocket详解(六):刨根问底WebSocket与Socket的关系》.

8.2 技术特征

WebSocket技术特征总结下就是:

  • 1)可双向通信,设计的目的主要是为了减少传统轮询时http连接数量的开销;
  • 2)建立在TCP协议之上,握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器;
  • 3)与HTTP兼容性良好,同样可以使用80和443端口;
  • 4)没有同源限制,客户端可以与任意服务器通信;
  • 5)可以发送文本,也可以发送二进制数据;
  • 6)协议标识符是ws(如果加密,则为wss),服务器网址就是 URL.

WebSocket的技术原理如下图所示:

关于WebSocket API方面的知识,这里不再作讲解,可以自己查阅:https://developer.mozilla.org/en-US/docs/Web/API/WebSocket

8.3 浏览器兼容性

WebSocket兼容性良好,基本支持所有现代浏览器。

上图来自:https://caniuse.com/mdn-api_websocket)

8.4 代码实现

笔者这里采用的是socket.io,是基于WebSocket的封装,提供了客户端以及服务器端的支持。

// 客户端

var WebsocketNotification = {

// ...

subscribe: function(args) {

var connector = args[1];

this.socket = io();

this.socket.emit('register', connector);

this.socket.on('register done', function() {

window.ChatroomDOM.renderAfterRegister();

});

this.socket.on('data', function(res) {

window.ChatroomDOM.renderData(res);

});

this.socket.on('disconnect', function() {

window.ChatroomDOM.renderAfterLogout();

});

}

// ...

}


// 服务器端

var io = socketIo(httpServer);

io.on('connection', (socket) => {

socket.on('register', function(connector) {

chatRoom.onConnect(connector);

io.emit('register done');

var data = chatRoom.getDatas();

io.emit('data', { data });

});

socket.on('chat', function(message) {

chatRoom.receive(message);

var data = chatRoom.getDatas();

io.emit('data', { data });

});

});

PS:完整代码,请见本文“4、本文配套Demo和代码”一节。

响应格式如下:

8.5 深入学习

随着HTML5的普及率越来越高,WebSocket的应用也越来越普及,关于WebSocket的学习资料网上很容易找到,限于篇幅本文就不深入展开这个话题。

如果想进一步深入学习WebSocket的方方面面,以下文章值得一读:

《新手快速入门:WebSocket简明教程》

《WebSocket详解(一):初步认识WebSocket技术》

《WebSocket详解(二):技术原理、代码演示和应用案例》

《WebSocket详解(三):深入WebSocket通信协议细节》

《WebSocket详解(四):刨根问底HTTP与WebSocket的关系(上篇)》

《WebSocket详解(五):刨根问底HTTP与WebSocket的关系(下篇)》

《WebSocket详解(六):刨根问底WebSocket与Socket的关系》

《理论联系实际:从零理解WebSocket的通信原理、协议格式、安全性》

《微信小程序中如何使用WebSocket实现长连接(含完整源码)》

《八问WebSocket协议:为你快速解答WebSocket热门疑问》

《Web端即时通讯实践干货:如何让你的WebSocket断网重连更快速?》

《WebSocket从入门到精通,半小时就够!》

《WebSocket硬核入门:200行代码,教你徒手撸一个WebSocket服务器》

《长连接网关技术专题(四):爱奇艺WebSocket实时推送网关技术实践》

9、本文小结

短轮询、长轮询实现成本相对比较简单,适用于一些实时性要求不高的消息推送,在实时性要求高的场景下,会存在延迟以及会给服务器带来更大的压力。

SSE只能是服务器端推送消息,因此对于不需要双向通信的项目比较适用。

WebSocket目前而言实现成本相对较低,适合于双工通信,对于多人在线,要求实时性较高的项目比较实用。

学习交流:

- 移动端IM开发入门文章:《新手入门一篇就够:从零开发移动端IM》

- 开源IM框架源码:https://github.com/JackJiang2011/MobileIMSDK

本文已同步发布于“即时通讯技术圈”公众号。

同步发布链接是:http://www.52im.net/thread-3555-1-1.html

之前在搭建jumpserver的时候有用到了Guacamole,但是没有对Guacamole做一个详细的介绍,刚好在网上看到一篇Guacamole官方手册的翻译,所以整理了一些相关内容,下面介绍一下Guacamole的具体实现和架构。


Guacamole

Guacamole 是一个基于 HTML 5 和 JavaScript 的 VNC 查看器,服务端基于 Java 的 VNC-to-XML 代理开发。要求浏览器支持 HTML 5。目前该项目是 Apache 基金会的孵化项目。

该项目包含三个组件,分别是:

  • apache/incubator-guacamole-server
  • apache/incubator-guacamole-client
  • apache/incubator-guacamole-website

Guacamole 是无客户端的远程桌面网关,Guacamole 支持标准协议,比如 VNC 和 RDP。

Guacamole 不要求任何插件和客户端软件。

Guacamole 是提供远程桌面的解决方案的开源项目,通过浏览器就能操作虚拟机,适用于Chrome,Firefox,IE10等浏览器(浏览器需要支持HTML5)。 由于使用 HTML5,Guancamole 只要在一个服务器安装成功,你访问你的桌面就是访问一个 web 浏览器。


Guacamole的介绍以及架构

Guacamole不是一个独立的Web应用程序,而是由许多部件组成的。Web应用程序实际上是整个项目里最小最轻量的,大部分的功能依靠Guacamole的底层组件来完成。

用户通过浏览器连接到Guacamole的服务端。Guacamole的客户端是用javascript编写的,Guacamole server通过web容器(比如tomcat)把服务提供给用户。一旦加载,客户端通过http承载着Guacamole自己的定义的协议与服务端通信。

部署在Guacamole server这边的Web应用程序,解析到的Guacamole protocal,就传给Guacamole的代理guacd,这个代理(guacd)实际上就是解析Guacamole protocal,替用户连接到远程机器。

Guacamole protocal协议本身以及guacd的存在,实现了协议的透明:Guacamole客户端(浏览器运行的js)和Web应用程序,都不需要知道远程桌面具体用哪个协议(VNC,RDP etc)


Guacamole protocal协议

Web应用程序是不知道任何远程桌面协议。应用程序不包含支持VNC,RDP等其他协议的Guacamole模块。应用程序只需要理解 Guacamole protocal协议,这个协议支持显示渲染和消息传输。尽管Guacamole protocal的这些功能与一个远程桌面协议类似,不过远程桌面协议和Guacamole protocal背后的设计原则是不一样的:Guacamole protocal并不是为了实现一个特定的桌面环境的远程功能。

Guacamole protocal作为一个远程显示和交互协议,实现了现有的远程桌面协议的超集(superset)。为了让Guacamole支持一个新的远程桌面协议 (比如RDP),需要写一个中间层(middle layer)来实现Guacamole protocal和这个新协议的转换。实现这样的转换机制和本地客户端远程访问桌面的实现没什么区别,唯一的不同是这个转换是要渲染远程的显示器(浏览 器),而不是本地客户端(比如real vnc)。

而实现这个协议互相转换的中间层就是guacd。


guacd

guacd是Guacamole的“心脏”,插件式的动态支持远程桌面协议,根据Web应用程序发来的指令连接到远程桌面。

guacd是和Guzcamole一起被安装到机器的,以驻留后台进程形式提供代理服务,接收来自Web应用程序的Tcp连接。guacd同样也不 知道具体的远程桌面协议,它只是需要决定加载那个插件并且传送特定的参数给插件。(这个插件就是用来解析具体远程桌面协议的)一旦这个插件被加载,插件就 独立于guacd运行,对Web应用程序和自己之间的会话(conmunication)具有绝对的控制权,直到插件被关闭。


Web应用程序

在Guacamole中与用户打交道的就是Web应用程序。

之前说过,Web应用程序自己不实现任何的远程桌面协议。Web应用程序依赖guacd,只关心优美的界面以及权限认证。

Web应用程序用Java语言编写,对,只要你愿意,完全可以用其他语言实现。因为,事实上,Guacamole被设计成API,我们鼓励开发者使用API开发自己的应用。


RealMint

说到Guacamole,一般是指一个远程桌面的网关,但是也不完全是这样。起初,Guacamole用javascript写了一个纯文本的Telnet客户端叫做:RealMint(RealMint是一个终端的名字)。这个项目主要是写了个示范程序,目的希望它能有用,它一度声名鹊起是因为RealMint是只用javascript写的。

RealMint的隧道是用PHP写的。跟Guacamole的HTTP的隧道相比,RealMint的隧道用的是简单的长轮询技术,比较低效。RealMint有一个比较像样的键盘实现是保存至今,被应用到Guacamole的键盘部分的代码,也许这就是RealMint唯一保留下来的功能和特性。

鉴于RealMint只是实现了一个古老的协议(telnet),业界内还有几个其他的比较成熟稳定的javascript客户端模拟器的实现,RealMint这个项目就被遗弃了。


VNC Client

开发者一旦接触HTML5的canvas标签,就会发现这个标签Firefox和Chrome已经支持,而且开始替代哪些所谓的Javascript实现的VNC的客户端了。

完全用javascript实现的客户端加上java服务器的组件,工作原理是把VNC协议转化成一样的基于XML的版本。这样的实现受限于 VNC的特性,无法把一个连接传送给多个用户。概念上的项目需要很好的线上环境,虽然发展有点滞后,一个HTML5的VNC客户端已经在SourceForge上以“Guacamole”的名字注册了。

当Guacamole慢慢发展,变得不仅仅是一个概念上的产品时,需求快速增长,过去的像RealMint一样采取长轮询的XML的解决方案就慢慢被废弃了。

因为Websocket此时还无法被完全信赖,Java有没有Websocket的servlets标准,一个替代品,基于HTTP的隧道解决方案应运而生。


远程桌面网关(Remote Desktop Gateway)

一个更快的基于文本的协议被提出,它可以支持现有的大多数的远程桌面协议,不仅仅是VNC。整个系统被重新设计成一个标准的后台驻留进程(guacd)和一些公用的库文件(libguac),可扩展地支持远程桌面协议。

这个项目从一个完整的VNC客户端扩展成一个高性能的HTML5远程桌面网关以及通用API。目前,Guacamole被用做一个中心网关,可以支持连接任意数量的,运行着不同远程桌面协议的机器。提供可扩展地认证体系,这样你可以做一些特定的适配,Guacamole也提供一个用户Html5远程 连接的通用API。


篇幅有限,关于Guacamole的内容就介绍到这了,后面会分享更多devops和DBA方面的内容,感兴趣的朋友可以关注一下~

天这篇文章来介绍一下Nacos配置中心的原理之一:长轮询机制的应用

为方便理解与表达,这里把 Nacos 控制台和 Nacos 注册中心称为 Nacos 服务器(就是 web 界面那个),我们编写的业务服务称为 Nacso 客户端;

Nacos 动态监听的长轮询机制原理图,本篇将围绕这张图剖析长轮询定时机制的原理:

ConfigService 是 Nacos 客户端提供的用于访问实现配置中心基本操作的类型,我们将从 ConfigService 的实例化开始长轮询定时机制的源码之旅;

1. 客户端的长轮询定时机制

我们从NacosPropertySourceLocator.locate()开始【断点步入】:


1.1 利用反射机制实例化 NacosConfigService 对象

客户端的长轮询定时任务是在 NacosFactory.createConfigService() 方法中,构建 ConfigService 对象是实时启动的,我们接着 1.1 处的源码;

进入 NacosFactory.createConfigService():

public static ConfigService createConfigService(Properties properties) throws NacosException {
    //【断点步入】创建 ConfigService
    return ConfigFactory.createConfigService(properties);
}

进入 ConfigFactory.createConfigService(),发现其使用反射机制实例化 NacosConfigService 对象;

1.2 NacosConfigService 的构造方法里启动长轮询定时任务

进入 NacosConfigService.NacosConfigService() 构造方法,里面设置了一些跟远程任务相关的属性;

1.2.1 初始化 HttpAgent

MetricsHttpAgent 类的设计如下:

ServerHttpAgent 类的设计如下:

1.2.2 初始化 ClientWorker

进入 ClientWorker.ClientWorker() 构造方法,主要是创建了两个定时调度的线程池,并启动一个定时任务;

进入 ClientWorker.checkConfigInfo(),每隔 10s 检查一次配置是否发生变化;

  • cacheMap:是一个 AtomicReference<Map<String, CacheData>> 对象,用来存储监听变更的缓存集合,key 是根据 datalD/group/tenant(租户)拼接的值。Value 是对应的存储在 Nacos 服务器上的配置文件的内容;
  • 长轮询任务拆分:默认情况下,每个长轮询 LongPollingRunnable 任务处理3000个监听配置集。如果超过3000个,则需要启动多个 LongPollingRunnable 去执行;

1.3 检查配置变更,读取变更配置 LongPollingRunnable.run()

因为我们没有这么多配置项,debug 不进去,所以直接找到 LongPollingRunnable.run() 方法,该方法的主要逻辑是:

根据 taskld 对 cacheMap 进行数据分割;

再通过checkLocalConfig() 方法比较本地配置文件(在${user}\nacos\config\ 里)的数据是否存在变更,如果有变更则直接触发通知;

public void run() {
    List<CacheData> cacheDatas = new ArrayList();
    ArrayList inInitializingCacheList = new ArrayList();
    try {
        //遍历 CacheData,检查本地配置
        Iterator var3 = ((Map)ClientWorker.this.cacheMap.get()).values().iterator();
        while(var3.hasNext()) {
            CacheData cacheData = (CacheData)var3.next();
            if (cacheData.getTaskId() == this.taskId) {
                cacheDatas.add(cacheData);
                try {
                    //检查本地配置
                    ClientWorker.this.checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception var13) {
                    ClientWorker.LOGGER.error("get local config info error", var13);
                }
            }
        }
        //【断点步入 1.3.1】通过长轮询请求检查服务端对应的配置是否发生变更
        List<String> changedGroupKeys = ClientWorker.this.checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        //遍历存在变更的 groupKey,重新加载最新数据
        Iterator var16 = changedGroupKeys.iterator();
        while(var16.hasNext()) {
            String groupKey = (String)var16.next();
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                //【断点步入 1.3.2】读取变更配置,这里的 dataId、group 和 tenant 是【1.3.1】里获取的
                String content = ClientWorker.this.getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = (CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId, group, tenant));
                cache.setContent(content);
                ClientWorker.LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", new Object[]{ClientWorker.this.agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)});
            } catch (NacosException var12) {
                String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", ClientWorker.this.agent.getName(), dataId, group, tenant);
                ClientWorker.LOGGER.error(message, var12);
            }
        }
        //触发事件通知
        var16 = cacheDatas.iterator();
        while(true) {
            CacheData cacheDatax;
            do {
                if (!var16.hasNext()) {
                    inInitializingCacheList.clear();
                    //继续定时执行当前线程
                    ClientWorker.this.executorService.execute(this);
                    return;
                }
                cacheDatax = (CacheData)var16.next();
            } while(cacheDatax.isInitializing() && !inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheDatax.dataId, cacheDatax.group, cacheDatax.tenant)));
            cacheDatax.checkListenerMd5();
            cacheDatax.setInitializing(false);
        }
    } catch (Throwable var14) {
        ClientWorker.LOGGER.error("longPolling error : ", var14);
        ClientWorker.this.executorService.schedule(this, (long)ClientWorker.this.taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}

注意:这里的断点需要注意 Nacos 服务器上修改配置(间隔大于 30s),进入后才好理解;

1.3.1 检查配置变更 ClientWorker.checkUpdateDataIds()

我们点进 ClientWorker.checkUpdateDataIds()​ 方法,发现其最终调用的是 ClientWorker.checkUpdateConfigStr() 方法,其实现逻辑与源码如下:

  • 通过MetricsHttpAgent.httpPost()​ 方法(上面 1.2.1 有提到)调用/v1/cs/configs/listener 接口实现长轮询请求;
  • 轮询请求在实现层面只是设置了一个比较长的超时时间,默认是 30s;
  • 如果服务端的数据发生了变更,客户端会收到一个 HttpResult ,服务端返回的是存在数据变更的 Data ID、Group、Tenant;
  • 获得这些信息之后,在LongPollingRunnable.run() 方法中调用 getServerConfig() 去 Nacos 服务器上读取具体的配置内容;
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
    List<String> params = Arrays.asList("Listening-Configs", probeUpdateString);
    List<String> headers = new ArrayList(2);
    headers.add("Long-Pulling-Timeout");
    headers.add("" + this.timeout);
    if (isInitializingCacheList) {
        headers.add("Long-Pulling-Timeout-No-Hangup");
        headers.add("true");
    }
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    } else {
        try {
            //调用 /v1/cs/configs/listener 接口实现长轮询请求,返回的 HttpResult 里包含存在数据变更的 Data ID、Group、Tenant
            HttpResult result = this.agent.httpPost("/v1/cs/configs/listener", headers, params, this.agent.getEncode(), this.timeout);
            if (200 == result.code) {
                this.setHealthServer(true);
                //
                return this.parseUpdateDataIdResponse(result.content);
            }
            this.setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", this.agent.getName(), result.code);
        } catch (IOException var6) {
            this.setHealthServer(false);
            LOGGER.error("[" + this.agent.getName() + "] [check-update] get changed dataId exception", var6);
            throw var6;
        }
        return Collections.emptyList();
    }
}

1.3.2 读取变更配置 ClientWorker.getServerConfig()

进入 ClientWorker.getServerConfig()​ 方法;读取服务器上的变更配置;最终调用的是 MetricsHttpAgent.httpGet()​ 方法(上面 1.2.1 有提到),调用 /v1/cs/configs​ 接口获取配置;然后通过调用 LocalConfigInfoProcessor.saveSnapshot() 将变更的配置保存到本地;

2. 服务端的长轮询定时机制

2.1 服务器接收请求 ConfigController.listener()

Nacos客户端 通过 HTTP 协议与服务器通信,那么在服务器源码里必然有对应接口的实现;

在 nacos-config 模块下的 controller 包,提供了个 ConfigController 类来处理请求,其中有个 /listener 接口,是客户端发起数据监听的接口,其主要逻辑和源码如下:

  • 获取客户端需要监听的可能发生变化的配置,并计算 MD5 值;
  • ConfigServletInner.doPollingConfig() 开始执行长轮询请求;

2.2 执行长轮询请求 ConfigSer

vletInner.doPollingConfig()

进入 ConfigServletInner.doPollingConfig() 方法,该方法封装了长轮询的实现逻辑,同时兼容短轮询逻辑;

进入 LongPollingService.addLongPollingClient() 方法,里面是长轮询的核心处理逻辑,主要作用是把客户端的长轮询请求封装成 ClientPolling 交给 scheduler 执行;

2.3 创建线程执行定时任务 ClientLongPolling.run()

我们找到 ClientLongPolling.run() 方法,这里可以体现长轮询定时机制的核心原理,通俗来说,就是:

服务端收到请求之后,不立即返回,没有变更则在延后 (30-0.5)s 把请求结果返回给客户端;

这就使得客户端和服务端之间在 30s 之内数据没有发生变化的情况下一直处于连接状态;

2.4 监听配置变更事件

2.4.1 监听 LocalDataChangeEvent 事件的实现

当我们在 Nacos 服务器或通过 API 方式变更配置后,会发布一个 LocalDataChangeEvent 事件,该事件会被 LongPollingService 监听;

这里 LongPollingService 为什么具有监听功能在 1.3.1 版本后有些变化:

  • 1.3.1 前:LongPollingService.onEvent();
  • 1.3.1 后:Subscriber.onEvent();

在 Nacos 1.3.1 版本之前,通过 LongPollingService 继承 AbstractEventListener 实现监听,覆盖 onEvent() 方法;

而在 1.3.2 版本之后,通过构造订阅者实现

效果是一样的,实现了对 LocalDataChangeEvent 事件的监听,并通过通过线程池执行 DataChangeTask 任务;

2.4.2 监听事件后的处理逻辑 DataChangeTask.run()

我们找到 DataChangeTask.run() 方法,这个线程任务实现了

3. 源码结构图小结

3.1 客户端的长轮询定时机制

  • NacosPropertySourceLocator.locate() :初始化 ConfigService 对象,定位配置;
  • NacosConfigService.NacosConfigService():NacosConfigService 的构造方法;
  • Executors.newScheduledThreadPool():创建 executor 线程池;
  • Executors.newScheduledThreadPool():创建 executorService 线程池;
  • ClientWorker.checkConfigInfo():使用 executor 线程池检查配置是否发生变化;
  • ClientWorker.checkLocalConfig():检查本地配置;
  • ClientWorker.checkUpdateDataIds():检查服务端对应的配置是否发生变更;
  • ClientWorker.getServerConfig():读取变更配置
  • MetricsHttpAgent.httpPost():调用 /v1/cs/configs/listener 接口实现长轮询请求;
  • ClientWorker.checkUpdateConfigStr():检查服务端对应的配置是否发生变更;
  • MetricsHttpAgent.httpGet():调用 /v1/cs/configs 接口获取配置;
  • LongPollingRunnable.run():运行长轮询定时线程;
  • MetricsHttpAgent.MetricsHttpAgent():初始化 HttpAgent;
  • ClientWorker.ClientWorker():初始化 ClientWorker;
  • NacosFactory.createConfigService():创建配置服务器;
  • ConfigFactory.createConfigService():利用反射机制创建配置服务器;

3.2 服务端的长轮询定时机制

  • ConfigController.listener() :服务器接收请求;
  • LongPollingService.addLongPollingClient():长轮询的核心处理逻辑,提前 500ms 返回响应;
  • ClientLongPolling.run():长轮询定时机制的实现逻辑;
  • Map.put():将 ClientLongPolling 实例本身添加到 allSubs 队列中;
  • Queue.remove():把 ClientLongPolling 实例本身从 allSubs 队列中移除;
  • MD5Util.compareMd5():比较数据的 MD5 值;
  • LongPollingService.sendResponse():将变更的结果通过 response 返回给客户端;
  • ConfigExecutor.scheduleLongPolling():启动定时任务,延时时间为 29.5s;
  • HttpServletRequest.getHeader():获取客户端设置的请求超时时间;
  • MD5Util.compareMd5():和服务端的数据进行 MD5 对比;
  • ConfigExecutor.executeLongPolling():创建 ClientLongPolling 线程执行定时任务;
  • MD5Util.getClientMd5Map():计算 MD5 值;
  • ConfigServletInner.doPollingConfig():执行长轮询请求;

3.3 Nacos 服务器配置变更的事件监听

Nacos 服务器上的配置发生变更后,发布一个 LocalDataChangeEvent 事件;

Subscriber.onEvent() :监听 LocalDataChangeEvent 事件(1.3.2 版本后);

  • DataChangeTask.run():根据 groupKey 返回配置;
  • ConfigExecutor.executeLongPolling():通过线程池执行 DataChangeTask 任务;

原文链接:https://developer.51cto.com/article/713995.html