整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

从底层理解Python的执行

编者按】下面博文将带你创建一个字节码级别的追踪API以追踪Python的一些内部机制,比如类似YIELDVALUE、YIELDFROM操作码的实现,推式构造列表(List Comprehensions)、生成器表达式(generator expressions)以及其他一些有趣Python的编译。

关于译者:赵斌,OneAPM工程师,常年使用 Python/Perl 脚本,从事 DevOP、测试开发相关的开发工作。业余热爱看书,喜欢 MOOC。

以下为译文

最近我在学习 Python 的运行模型。我对 Python 的一些内部机制很是好奇,比如 Python 是怎么实现类似 YIELDVALUE、YIELDFROM 这样的操作码的;对于 递推式构造列表(List Comprehensions)、生成器表达式(generator expressions)以及其他一些有趣的 Python 特性是怎么编译的;从字节码的层面来看,当异常抛出的时候都发生了什么事情。翻阅 CPython 的代码对于解答这些问题当然是很有帮助的,但我仍然觉得以这样的方式来做的话对于理解字节码的执行和堆栈的变化还是缺少点什么。GDB 是个好选择,但是我懒,而且只想使用一些比较高阶的接口写点 Python 代码来完成这件事。

所以呢,我的目标就是创建一个字节码级别的追踪 API,类似 sys.setrace 所提供的那样,但相对而言会有更好的粒度。这充分锻炼了我编写 Python 实现的 C 代码的编码能力。我们所需要的有如下几项,在这篇文章中所用的 Python 版本为 3.5。

  • 一个新的 Cpython 解释器操作码
  • 一种将操作码注入到 Python 字节码的方法
  • 一些用于处理操作码的 Python 代码

一个新的 Cpython 操作码

新操作码:DEBUG_OP

这个新的操作码 DEBUG_OP 是我第一次尝试写 CPython 实现的 C 代码,我将尽可能的让它保持简单。 我们想要达成的目的是,当我们的操作码被执行的时候我能有一种方式来调用一些 Python 代码。同时,我们也想能够追踪一些与执行上下文有关的数据。我们的操作码会把这些信息当作参数传递给我们的回调函数。通过操作码能辨识出的有用信息如下:

所以呢,我们的操作码需要做的事情是:

  • 找到回调函数
  • 创建一个包含堆栈内容的列表
  • 调用回调函数,并将包含堆栈内容的列表和当前帧作为参数传递给它

听起来挺简单的,现在开始动手吧!声明:下面所有的解释说明和代码是经过了大量段错误调试之后总结得到的结论。首先要做的是给操作码定义一个名字和相应的值,因此我们需要在Include/opcode.h中添加代码。

/** My own comments begin by '**' **/
/** From: Includes/opcode.h **/

/* Instruction opcodes for compiled code */

/** We just have to define our opcode with a free value
    0 was the first one I found **/
#define DEBUG_OP                0

#define POP_TOP                 1
#define ROT_TWO                 2
#define ROT_THREE               3

这部分工作就完成了,现在我们去编写操作码真正干活的代码。

实现 DEBUG_OP

在考虑如何实现DEBUG_OP之前我们需要了解的是DEBUG_OP提供的接口将长什么样。 拥有一个可以调用其他代码的新操作码是相当酷眩的,但是究竟它将调用哪些代码捏?这个操作码如何找到回调函数的捏?我选择了一种最简单的方法:在帧的全局区域写死函数名。那么问题就变成了,我该怎么从字典中找到一个固定的 C 字符串?为了回答这个问题我们来看看在 Python 的 main loop 中使用到的和上下文管理相关的标识符__enter____exit__。

我们可以看到这两标识符被使用在操作码SETUP_WITH中:

/** From: Python/ceval.c **/
TARGET(SETUP_WITH) {
_Py_IDENTIFIER(__exit__);
_Py_IDENTIFIER(__enter__);
PyObject *mgr = TOP;
PyObject *exit = special_lookup(mgr, &PyId___exit__), *enter;
PyObject *res;

现在,看一眼宏_Py_IDENTIFIER的定义

/** From: Include/object.h **/

/********************* String Literals ****************************************/
/* This structure helps managing static strings. The basic usage goes like this:
   Instead of doing

       r = PyObject_CallMethod(o, "foo", "args", ...);

   do

       _Py_IDENTIFIER(foo);
       ...
       r = _PyObject_CallMethodId(o, &PyId_foo, "args", ...);

   PyId_foo is a static variable, either on block level or file level. On first
   usage, the string "foo" is interned, and the structures are linked. On interpreter
   shutdown, all strings are released (through _PyUnicode_ClearStaticStrings).

   Alternatively, _Py_static_string allows to choose the variable name.
   _PyUnicode_FromId returns a borrowed reference to the interned string.
   _PyObject_{Get,Set,Has}AttrId are __getattr__ versions using _Py_Identifier*.
*/
typedef struct _Py_Identifier {
    struct _Py_Identifier *next;
    const char* string;
    PyObject *object;
} _Py_Identifier;

#define _Py_static_string_init(value) { 0, value, 0 }
#define _Py_static_string(varname, value)  static _Py_Identifier varname = _Py_static_string_init(value)
#define _Py_IDENTIFIER(varname) _Py_static_string(PyId_##varname, #varname)

嗯,注释部分已经说明得很清楚了。通过一番查找,我们发现了可以用来从字典找固定字符串的函数_PyDict_GetItemId,所以我们操作码的查找部分的代码就是长这样滴。

 /** Our callback function will be named op_target **/
PyObject *target = NULL;
_Py_IDENTIFIER(op_target);
target = _PyDict_GetItemId(f->f_globals, &PyId_op_target);
if (target == NULL && _PyErr_OCCURRED) {
    if (!PyErr_ExceptionMatches(PyExc_KeyError))
        goto error;
    PyErr_Clear;
    DISPATCH;
}

为了方便理解,对这一段代码做一些说明:

  • f是当前的帧,f->f_globals是它的全局区域
  • 如果我们没有找到op_target,我们将会检查这个异常是不是KeyError
  • goto error;是一种在 main loop 中抛出异常的方法
  • PyErr_Clear抑制了当前异常的抛出,而DISPATCH触发了下一个操作码的执行

下一步就是收集我们想要的堆栈信息。

/** This code create a list with all the values on the current stack **/
PyObject *value = PyList_New(0);
for (i = 1 ; i <= STACK_LEVEL; i++) {
    tmp = PEEK(i);
    if (tmp == NULL) {
        tmp = Py_None;
    }
    PyList_Append(value, tmp);
}

最后一步就是调用我们的回调函数!我们用call_function来搞定这件事,我们通过研究操作码CALL_FUNCTION的实现来学习怎么使用call_function

/** From: Python/ceval.c **/
TARGET(CALL_FUNCTION) {
    PyObject **sp, *res;
    /** stack_pointer is a local of the main loop.
        It's the pointer to the stacktop of our frame **/
    sp = stack_pointer;
    res = call_function(&sp, oparg);
    /** call_function handles the args it consummed on the stack for us **/
    stack_pointer = sp;
    PUSH(res);
    /** Standard exception handling **/
    if (res == NULL)
        goto error;
    DISPATCH;
}

有了上面这些信息,我们终于可以捣鼓出一个操作码DEBUG_OP的草稿了:

TARGET(DEBUG_OP) {
    PyObject *value = NULL;
    PyObject *target = NULL;
    PyObject *res = NULL;
    PyObject **sp = NULL;
    PyObject *tmp;
    int i;
    _Py_IDENTIFIER(op_target);

    target = _PyDict_GetItemId(f->f_globals, &PyId_op_target);
    if (target == NULL && _PyErr_OCCURRED) {
        if (!PyErr_ExceptionMatches(PyExc_KeyError))
            goto error;
        PyErr_Clear;
        DISPATCH;
    }
    value = PyList_New(0);
    Py_INCREF(target);
    for (i = 1 ; i <= STACK_LEVEL; i++) {
        tmp = PEEK(i);
        if (tmp == NULL)
            tmp = Py_None;
        PyList_Append(value, tmp);
    }

    PUSH(target);
    PUSH(value);
    Py_INCREF(f);
    PUSH(f);
    sp = stack_pointer;
    res = call_function(&sp, 2);
    stack_pointer = sp;
    if (res == NULL)
        goto error;
    Py_DECREF(res);
    DISPATCH;
}

在编写 CPython 实现的 C 代码方面我确实没有什么经验,有可能我漏掉了些细节。如果您有什么建议还请您纠正,我期待您的反馈。

编译它,成了!

一切看起来很顺利,但是当我们尝试去使用我们定义的操作码DEBUG_OP的时候却失败了。自从 2008 年之后,Python 使用预先写好的goto(你也可以从这里获取更多的讯息)。故,我们需要更新下 goto jump table,我们在 Python/opcode_targets.h 中做如下修改。

/** From: Python/opcode_targets.h **/
/** Easy change since DEBUG_OP is the opcode number 1 **/
static void *opcode_targets[256] = {
    //&&_unknown_opcode,
    &&TARGET_DEBUG_OP,
    &&TARGET_POP_TOP,
    /** ... **/

这就完事了,我们现在就有了一个可以工作的新操作码。唯一的问题就是这货虽然存在,但是没有被人调用过。接下来,我们将DEBUG_OP注入到函数的字节码中。

在 Python 字节码中注入操作码 DEBUG_OP

有很多方式可以在 Python 字节码中注入新的操作码:

  • 使用 peephole optimizer, Quarkslab就是这么干的
  • 在生成字节码的代码中动些手脚
  • 在运行时直接修改函数的字节码(这就是我们将要干的事儿)

为了创造出一个新操作码,有了上面的那一堆 C 代码就够了。现在让我们回到原点,开始理解奇怪甚至神奇的 Python!

我们将要做的事儿有:

  • 得到我们想要追踪函数的 code object
  • 重写字节码来注入DEBUG_OP
  • 将新生成的 code object 替换回去

和 code object 有关的小贴士

如果你从没听说过 code object,这里有一个简单的 介绍网路上也有一些相关的文档可供查阅,可以直接Ctrl+F查找 code object

还有一件事情需要注意的是在这篇文章所指的环境中 code object 是不可变的:

Python 3.4.2 (default, Oct  8 2014, 10:45:20)
[GCC 4.9.1] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> x = lambda y : 2
>>> x.__code__
<code object <lambda> at 0x7f481fd88390, file "<stdin>", line 1>
>>> x.__code__.co_name
'<lambda>'
>>> x.__code__.co_name = 'truc'
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: readonly attribute
>>> x.__code__.co_consts = ('truc',)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: readonly attribute

但是不用担心,我们将会找到方法绕过这个问题的。

使用的工具

为了修改字节码我们需要一些工具:

  • dis模块用来反编译和分析字节码
  • dis.BytecodePython 3.4 新增的一个特性,对于反编译和分析字节码特别有用
  • 一个能够简单修改 code object 的方法

dis.Bytecode反编译 code bject 能告诉我们一些有关操作码、参数和上下文的信息。

# Python3.4
>>> import dis
>>> f = lambda x: x + 3
>>> for i in dis.Bytecode(f.__code__): print (i)
...
Instruction(opname='LOAD_FAST', opcode=124, arg=0, argval='x', argrepr='x', offset=0, starts_line=1, is_jump_target=False)
Instruction(opname='LOAD_CONST', opcode=100, arg=1, argval=3, argrepr='3', offset=3, starts_line=None, is_jump_target=False)
Instruction(opname='BINARY_ADD', opcode=23, arg=None, argval=None, argrepr='', offset=6, starts_line=None, is_jump_target=False)
Instruction(opname='RETURN_VALUE', opcode=83, arg=None, argval=None, argrepr='', offset=7, starts_line=None, is_jump_target=False)

为了能够修改 code object,我定义了一个很小的类用来复制 code object,同时能够按我们的需求修改相应的值,然后重新生成一个新的 code object。

class MutableCodeObject(object):
    args_name = ("co_argcount", "co_kwonlyargcount", "co_nlocals", "co_stacksize", "co_flags", "co_code",
                  "co_consts", "co_names", "co_varnames", "co_filename", "co_name", "co_firstlineno",
                   "co_lnotab", "co_freevars", "co_cellvars")

    def __init__(self, initial_code):
        self.initial_code = initial_code
        for attr_name in self.args_name:
            attr = getattr(self.initial_code, attr_name)
            if isinstance(attr, tuple):
                attr = list(attr)
            setattr(self, attr_name, attr)

    def get_code(self):
        args = 
        for attr_name in self.args_name:
            attr = getattr(self, attr_name)
            if isinstance(attr, list):
                attr = tuple(attr)
            args.append(attr)
        return self.initial_code.__class__(*args)

这个类用起来很方便,解决了上面提到的 code object 不可变的问题。

>>> x = lambda y : 2
>>> m = MutableCodeObject(x.__code__)
>>> m
<new_code.MutableCodeObject object at 0x7f3f0ea546a0>
>>> m.co_consts
[None, 2]
>>> m.co_consts[1] = '3'
>>> m.co_name = 'truc'
>>> m.get_code
<code object truc at 0x7f3f0ea2bc90, file "<stdin>", line 1>

测试我们的新操作码

我们现在拥有了注入DEBUG_OP的所有工具,让我们来验证下我们的实现是否可用。我们将我们的操作码注入到一个最简单的函数中:

from new_code import MutableCodeObject

def op_target(*args):
    print("WOOT")
    print("op_target called with args <{0}>".format(args))

def nop:
    pass

new_nop_code = MutableCodeObject(nop.__code__)
new_nop_code.co_code = b"\x00" + new_nop_code.co_code[0:3] + b"\x00" + new_nop_code.co_code[-1:]
new_nop_code.co_stacksize += 3

nop.__code__ = new_nop_code.get_code

import dis
dis.dis(nop)
nop


# Don't forget that ./python is our custom Python implementing DEBUG_OP
hakril@computer ~/python/CPython3.5 % ./python proof.py
  8           0 <0>
              1 LOAD_CONST               0 (None)
              4 <0>
              5 RETURN_VALUE
WOOT
op_target called with args <([], <frame object at 0x7fde9eaebdb0>)>
WOOT
op_target called with args <([None], <frame object at 0x7fde9eaebdb0>)>

看起来它成功了!有一行代码需要说明一下new_nop_code.co_stacksize += 3

  • co_stacksize 表示 code object 所需要的堆栈的大小
  • 操作码DEBUG_OP往堆栈中增加了三项,所以我们需要为这些增加的项预留些空间

现在我们可以将我们的操作码注入到每一个 Python 函数中了!

重写字节码

正如我们在上面的例子中所看到的那样,重写 Pyhton 的字节码似乎 so easy。为了在每一个操作码之间注入我们的操作码,我们需要获取每一个操作码的偏移量,然后将我们的操作码注入到这些位置上(把我们操作码注入到参数上是有坏处大大滴)。这些偏移量也很容易获取,使用dis.Bytecode,就像这样 。
def add_debug_op_everywhere(code_obj):
    # We get every instruction offset in the code object
    offsets = [instr.offset for instr in dis.Bytecode(code_obj)]
    # And insert a DEBUG_OP at every offset
    return insert_op_debug_list(code_obj, offsets)

def insert_op_debug_list(code, offsets):
    # We insert the DEBUG_OP one by one
    for nb, off in enumerate(sorted(offsets)):
        # Need to ajust the offsets by the number of opcodes already inserted before
        # That's why we sort our offsets!
        code = insert_op_debug(code, off + nb)
    return code

# Last problem: what does insert_op_debug looks like?

基于上面的例子,有人可能会想我们的insert_op_debug会在指定的偏移量增加一个"\x00",这尼玛是个坑啊!我们第一个DEBUG_OP注入的例子中被注入的函数是没有任何的分支的,为了能够实现完美一个函数注入函数insert_op_debug我们需要考虑到存在分支操作码的情况。

Python 的分支一共有两种:

  • 绝对分支:看起来是类似这样子的Instruction_Pointer = argument(instruction)
  • 相对分支:看起来是类似这样子的Instruction_Pointer += argument(instruction)

我们希望这些分支在我们插入操作码之后仍然能够正常工作,为此我们需要修改一些指令参数。以下是其逻辑流程:

  • 对于每一个在插入偏移量之前的相对分支而言
    • 如果目标地址是严格大于我们的插入偏移量的话,将指令参数增加 1
    • 如果相等,则不需要增加 1 就能够在跳转操作和目标地址之间执行我们的操作码DEBUG_OP
    • 如果小于,插入我们的操作码的话并不会影响到跳转操作和目标地址之间的距离
  • 对于 code object 中的每一个绝对分支而言
    • 如果目标地址是严格大于我们的插入偏移量的话,将指令参数增加 1
    • 如果相等,那么不需要任何修改,理由和相对分支部分是一样的
    • 如果小于,插入我们的操作码的话并不会影响到跳转操作和目标地址之间的距离

下面是实现:

# Helper
def bytecode_to_string(bytecode):
    if bytecode.arg is not None:
        return struct.pack("<Bh", bytecode.opcode, bytecode.arg)
    return struct.pack("<B", bytecode.opcode)

# Dummy class for bytecode_to_string
class DummyInstr:
    def __init__(self, opcode, arg):
        self.opcode = opcode
        self.arg = arg

def insert_op_debug(code, offset):
    opcode_jump_rel = ['FOR_ITER', 'JUMP_FORWARD', 'SETUP_LOOP', 'SETUP_WITH', 'SETUP_EXCEPT', 'SETUP_FINALLY']
    opcode_jump_abs = ['POP_JUMP_IF_TRUE', 'POP_JUMP_IF_FALSE', 'JUMP_ABSOLUTE']
    res_codestring = b""
    inserted = False
    for instr in dis.Bytecode(code):
        if instr.offset == offset:
            res_codestring += b"\x00"
            inserted = True
        if instr.opname in opcode_jump_rel and not inserted: #relative jump are always forward
            if offset < instr.offset + 3 + instr.arg: # inserted beetwen jump and dest: add 1 to dest (3 for size)
                #If equal: jump on DEBUG_OP to get info before exec instr
                res_codestring += bytecode_to_string(DummyInstr(instr.opcode, instr.arg + 1))
                continue
        if instr.opname in opcode_jump_abs:
            if instr.arg > offset:
                res_codestring += bytecode_to_string(DummyInstr(instr.opcode, instr.arg + 1))
                continue
        res_codestring += bytecode_to_string(instr)
    # replace_bytecode just replaces the original code co_code
    return replace_bytecode(code, res_codestring)

让我们看一下效果如何:

>>> def lol(x):
...     for i in range(10):
...         if x == i:
...             break

>>> dis.dis(lol)
101           0 SETUP_LOOP              36 (to 39)
              3 LOAD_GLOBAL              0 (range)
              6 LOAD_CONST               1 (10)
              9 CALL_FUNCTION            1 (1 positional, 0 keyword pair)
             12 GET_ITER
        >>   13 FOR_ITER                22 (to 38)
             16 STORE_FAST               1 (i)

102          19 LOAD_FAST                0 (x)
             22 LOAD_FAST                1 (i)
             25 COMPARE_OP               2 (==)
             28 POP_JUMP_IF_FALSE       13

103          31 BREAK_LOOP
             32 JUMP_ABSOLUTE           13
             35 JUMP_ABSOLUTE           13
        >>   38 POP_BLOCK
        >>   39 LOAD_CONST               0 (None)
             42 RETURN_VALUE
>>> lol.__code__ = transform_code(lol.__code__, add_debug_op_everywhere, add_stacksize=3)


>>> dis.dis(lol)
101           0 <0>
              1 SETUP_LOOP              50 (to 54)
              4 <0>
              5 LOAD_GLOBAL              0 (range)
              8 <0>
              9 LOAD_CONST               1 (10)
             12 <0>
             13 CALL_FUNCTION            1 (1 positional, 0 keyword pair)
             16 <0>
             17 GET_ITER
        >>   18 <0>

102          19 FOR_ITER                30 (to 52)
             22 <0>
             23 STORE_FAST               1 (i)
             26 <0>
             27 LOAD_FAST                0 (x)
             30 <0>

103          31 LOAD_FAST                1 (i)
             34 <0>
             35 COMPARE_OP               2 (==)
             38 <0>
             39 POP_JUMP_IF_FALSE       18
             42 <0>
             43 BREAK_LOOP
             44 <0>
             45 JUMP_ABSOLUTE           18
             48 <0>
             49 JUMP_ABSOLUTE           18
        >>   52 <0>
             53 POP_BLOCK
        >>   54 <0>
             55 LOAD_CONST               0 (None)
             58 <0>
             59 RETURN_VALUE

# Setup the simplest handler EVER
>>> def op_target(stack, frame):
...     print (stack)

# GO
>>> lol(2)


[<class 'range'>]
[10, <class 'range'>]
[range(0, 10)]
[<range_iterator object at 0x7f1349afab80>]
[0, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[0, 2, <range_iterator object at 0x7f1349afab80>]
[False, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[1, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[1, 2, <range_iterator object at 0x7f1349afab80>]
[False, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[2, 2, <range_iterator object at 0x7f1349afab80>]
[True, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]

[None]

甚好!现在我们知道了如何获取堆栈信息和 Python 中每一个操作对应的帧信息。上面结果所展示的结果目前而言并不是很实用。在最后一部分中让我们对注入做进一步的封装。

增加 Python 封装

正如您所见到的,所有的底层接口都是好用的。我们最后要做的一件事是让 op_target 更加方便使用(这部分相对而言比较空泛一些,毕竟在我看来这不是整个项目中最有趣的部分)。

首先我们来看一下帧的参数所能提供的信息,如下所示:

  • f_code当前帧将执行的 code object
  • f_lasti当前的操作(code object 中的字节码字符串的索引)

经过我们的处理我们可以得知DEBUG_OP之后要被执行的操作码,这对我们聚合数据并展示是相当有用的。

新建一个用于追踪函数内部机制的类:

  • 改变函数自身的co_code
  • 设置回调函数作为op_debug的目标函数

一旦我们知道下一个操作,我们就可以分析它并修改它的参数。举例来说我们可以增加一个auto-follow-called-functions的特性。

def op_target(l, f, exc=None):
    if op_target.callback is not None:
        op_target.callback(l, f, exc)

class Trace:
    def __init__(self, func):
        self.func = func

    def call(self, *args, **kwargs):
        self.add_func_to_trace(self.func)
        # Activate Trace callback for the func call
        op_target.callback = self.callback
        try:
            res = self.func(*args, **kwargs)
        except Exception as e:
            res = e
        op_target.callback = None
        return res

    def add_func_to_trace(self, f):
        # Is it code? is it already transformed?
        if not hasattr(f ,"op_debug") and hasattr(f, "__code__"):
            f.__code__ = transform_code(f.__code__, transform=add_everywhere, add_stacksize=ADD_STACK)
            f.__globals__['op_target'] = op_target
            f.op_debug = True

    def do_auto_follow(self, stack, frame):
        # Nothing fancy: FrameAnalyser is just the wrapper that gives the next executed instruction
        next_instr = FrameAnalyser(frame).next_instr
        if "CALL" in next_instr.opname:
            arg = next_instr.arg
            f_index = (arg & 0xff) + (2 * (arg >> 8))
            called_func = stack[f_index]

            # If call target is not traced yet: do it
            if not hasattr(called_func, "op_debug"):
                self.add_func_to_trace(called_func)

现在我们实现一个 Trace 的子类,在这个子类中增加 callback 和 doreport 这两个方法。callback 方法将在每一个操作之后被调用。doreport 方法将我们收集到的信息打印出来。

这是一个伪函数追踪器实现:

class DummyTrace(Trace):
    def __init__(self, func):
        self.func = func
        self.data = collections.OrderedDict
        self.last_frame = None
        self.known_frame = 
        self.report = 

    def callback(self, stack, frame, exc):
        if frame not in self.known_frame:
            self.known_frame.append(frame)
            self.report.append(" === Entering New Frame {0} ({1}) ===".format(frame.f_code.co_name, id(frame)))
            self.last_frame = frame
        if frame != self.last_frame:
            self.report.append(" === Returning to Frame {0} {1}===".format(frame.f_code.co_name, id(frame)))
            self.last_frame = frame

        self.report.append(str(stack))
        instr = FrameAnalyser(frame).next_instr
        offset = str(instr.offset).rjust(8)
        opname = str(instr.opname).ljust(20)
        arg = str(instr.arg).ljust(10)
        self.report.append("{0}  {1} {2} {3}".format(offset, opname, arg, instr.argval))
        self.do_auto_follow(stack, frame)

    def do_report(self):
        print("\n".join(self.report))

这里有一些实现的例子和使用方法。格式有些不方便观看,毕竟我并不擅长于搞这种对用户友好的报告的事儿。

递推式构造列表(List Comprehensions)的追踪示例 。

总结

这个小项目是一个了解 Python 底层的良好途径,包括解释器的 main loop,Python 实现的 C 代码编程、Python 字节码。通过这个小工具我们可以看到 Python 一些有趣构造函数的字节码行为,例如生成器、上下文管理和递推式构造列表。

这里是这个小项目的完整代码。更进一步的,我们还可以做的是修改我们所追踪的函数的堆栈。我虽然不确定这个是否有用,但是可以肯定是这一过程是相当有趣的。

6月3-5日,北京国家会议中心,第七届中国云计算大会,3天主会,17场分论坛,3场实战培训,160+位讲师,议题全公开!

者 | Einstellung

责编 | 郭芮

首先介绍一下背景。笔者参加的一个关于风机开裂故障分析的预警比赛。训练数据有将近5万个样本,测试数据近9万。数据来自SCADA采集系统。采集了10分钟之内的75个特征值的数据信息,label是一周以内风机是否会发生故障的label。

数据介绍

每个样本10分钟之内大概采集到了450条数据,一共75个特征,也就是差不多75*450个信息。最后三个特征完全没有数据,所以一开始做的时候,我们就把最后三个特征进行删除,所以实际上是对72个特征进行的数据分析。

最开始,用的是seaborn画的正常风机和不正常风机的频率分布图,比如说对于轮毂转速这个特征:

 1import seaborn as snsimport pandas as pd
 2data_file = r"D:\fan_fault\feature1.csv"
 3pre_process = pd.read_csv(data_file, encoding = "gbk")
 4
 5pre_process = pre_process.fillna(0)
 6feature1_plot = pre_process["normal(0)"]
 7
 8feature2_plot2 = pre_process["fault(1)"]
 9sns.kdeplot(feature1_plot, shade = True)
10sns.kdeplot(feature2_plot2, shade = True)

大部分特征都是这样,没有很好的区分度。正是因为如此,也一直没有尝试出来非常好的模型。后来我们尝试用MATLAB画图,每个特征出两个图:

看起来要比seaborn好一些(后两个图和第一个不是一个特征)。我们在做数据分析这一块很大的问题是在于只去查了各个特征的物理含义,做了频率和频数分布图,看看是否有没有好的特征,然后就直接进入了下一步。忘了考虑是否可能会出现因为采集问题而导致的异常值和空缺值问题。这一点导致后面我们的很多工作都需要推倒重来。

数据分析

我们从统计上来看,并没有找到很好的区分度特征,然后就考虑从物理上来找。在老师的建议下,我们尝试了有轮毂转速,风速为6.5m/s时,y方向振动值的特征:

依旧没有很好的区分度,对于其他风速尝试也是如此。

随后我们讨论到了阈值、记0等方式构造新特征。在考虑记0这个新特征构造办法时,突然发现大气压力这个特征居然有0的情况。根据物理学的知识来讲,风机的大气压力是不可能为0的。然后我们才想起来,没有对数据的异常值进行处理。删除了有8万多条整行全为0的数据,导致某些文件为空,也就是这个风机没有数据信息。当然,也有某些风机是某几行为0。

除了删除空缺值,我们还对其他明显是异常的数据进行了一些数据清洗工作。因为之前我们对于数据特征数统计分析是根据未清洗的数据做的分析,所以分析的可靠性也有点问题,后面就产生了一些不必要的麻烦。我们也做了一些相关性分析的工作,大部分特征相关性十分的高。几十个特征两两组合然后进行相关性分析,会有数千个结果,相关性分析没有办法进行下去。后来,我们就没有考虑相关性的事情。

特征工程

我们最开始尝试对前72个特征构造均值,作为基准尝试看看效果如何。

 1import os
 2import pandas as pd
 3import numpy as np
 4import csv
 5
 6label_file = r"C:\fan_fault\train\trainX"
 7train_mean = r"D:\fan_fault\train_mean_new.csv"
 8
 9with open(train_mean, "a", newline = '', encoding = "utf-8") as f:
10 train_mean = csv.writer(f) 
11
12 for x in range(1, 48340):
13 fan_file = os.path.join(label_file, str(x) + ".csv")
14 print("程序运行进度为", x/48340) #用该语句查看工作进度状态
15
16 with open(fan_file, encoding='utf-8') as f:
17 feature_read = pd.read_csv(f) 
18 #遍历打开文件的每一个特征(72),求取均值
19 # a用来临时存放计算好的特征均值,外加一个label
20
21 a = [] 
22 for i in range(72):
23 mean_num = feature_read.iloc[:, i]
24 mean_num = np.array(mean_num).mean() 
25 #生成每个特征所有数据对应的均值
26 a.append(mean_num)
27
28 train_mean.writerow(a)

也包括绝对值差分累计、差分均值、差分方差,用随机森林进行调参。

 1# -*- coding: utf-8 -*-"""
 2
 3import numpy as np
 4import pandas as pd
 5from sklearn.preprocessing import MinMaxScaler
 6from sklearn.ensemble import RandomForestClassifier
 7from sklearn.model_selection import cross_val_scorefrom sklearn 
 8import metrics
 9from sklearn.model_selection import GridSearchCV
10
11#数据导入、检查空缺值
12data = pd.read_csv(r'D:\next\8_19\train_data.csv',encoding = "gbk")
13label = pd.read_csv(r"D:\next\8_19\train_label.csv")
14data.info()
15data.notnull().sum(axis=0)/data.shape[0]
16train = data.iloc[:,:-1]
17label = label.iloc[:,-1]
18
19#数据标准化
20scaler = MinMaxScaler()
21train = scaler.fit(train).transform(train)
22
23#单个分类器
24clf = RandomForestClassifier(random_state=14)
25f1 = cross_val_score(clf, train, label, scoring='f1')
26print("f1:{0:.1f}%".format(np.mean(f1)*100))
27
28#调参
29parameter_space = { 
30 'n_estimators':range(10,200,10), 
31 'max_depth':range(1,10), 
32 'min_samples_split':range(2,10),
33 }
34clf = RandomForestClassifier(random_state=14)
35grid = GridSearchCV(clf,parameter_space,scoring='f1', n_jobs = 6)
36grid.fit(train,label)
37print("f1:(0:.1f)%".format(grid.best_score_*100))
38print(grid.best_estimator_)
39
40#调参后的分类器
41new_clf = RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
42 max_depth=7, max_features='auto', max_leaf_nodes=None,
43 min_impurity_decrease=0.0, min_impurity_split=None,
44 min_samples_leaf=1, min_samples_split=7,
45 min_weight_fraction_leaf=0.0, n_estimators=10, n_jobs=1,
46 oob_score=False, random_state=14, verbose=0,warm_start=False)
47print("f1:{0:.1f}%".format(np.mean(f1)*100))

测试集输出预测结果如下:

 1#数据标准化
 2scaler = MinMaxScaler()
 3train = scaler.fit(train).transform(train)
 4test = scaler.fit(test).transform(test)
 5
 6#训练分类器
 7clf = RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
 8 max_depth=8, max_features='auto', max_leaf_nodes=None,
 9 min_impurity_decrease=0.0, min_impurity_split=None,
10 min_samples_leaf=1, min_samples_split=5,
11 min_weight_fraction_leaf=0.0, n_estimators=20, n_jobs=5,
12 oob_score=False, random_state=14, verbose=0, warm_start=False)
13clf = clf.fit(train, label)
14#预测结果
15pre = clf.predict(test)
16
17#测试结果文件写入
18import csv
19
20label = r"D:/fan_fault/label.csv"
21
22with open(label, "w", newline = '', encoding = "utf-8") as f:
23 label = csv.writer(f)
24
25 for x in range(len(pre)):
26 label.writerow(pre[x:x+1])

测试效果来看,并没有取得十分理想的效果。

之后想起来没有考虑数据清洗,之前的工作全部推倒从来。我们在此期间,也调查整理关于这72个特征和风机叶片开裂故障的原因,发现从文献上没有找到和叶片开裂故障有很好相关性的特征,对于特征进行的改造也没有很好的区分效果。后期我们咨询过相关行业的工程师,他们说这些特征中也没有十分强相关性的特征。我们就考虑到特征之间两两交叉相乘看看效果。

 1# 交叉特征有2844列,分别是自身的平方和相互交叉,最后求均值方差,最后三个特征不单独再生成交叉特征
 2
 3import os
 4import pandas as pd
 5import numpy as np
 6import csv
 7from sklearn.preprocessing import PolynomialFeatures
 8
 9label_file = r"F:\User\Xinyuan Huang\train_labels.csv"
10fan_folder = r"F:\User\Xinyuan Huang"
11read_label = pd.read_csv(label_file)
12
13cross_var = r"F:\User\Xinyuan Huang\CaiJi\Feature_crosses\cross_var.csv"
14
15with open(cross_var, "a", newline = '', encoding = "utf-8") as f:
16 cross_var = csv.writer(f)
17
18 # 该for循环用于定位要打开的文件
19 for x in range(len(read_label)-1):
20 column1 = str(read_label["f_id"][x:x+1]) 
21 #遍历DataFrame第一列的f_id标签下面的每一个数
22 column2 = str(read_label["file_name"][x:x+1]) 
23 #遍历DataFrame第二列的file_name标签下面的每一个数
24 column3 = str(read_label["ret"][x:x+1]) 
25 #遍历DataFrame第三列的ret标签下面的每一个数
26
27 f_id = column1.split()[1] 
28 #第一行的文件所对应的f_id进行切片操作,获取对应的数字
29 # 对f_id进行补0操作
30 f_id = f_id.zfill(3) 
31 # 比如2补成002,所以这里写3
32 file_name = column2.split()[1] 
33 #第一行的文件所对应的file_name
34 label = column3.split()[1] 
35 #第一行文件所对应的ret
36
37 fan_file = os.path.join(fan_folder, "train", f_id, file_name)
38 print("程序运行进度为", x/(len(read_label)-1)) 
39 #用该语句查看工作进度状态
40
41 # 打开相应的fan_file文件进行读取操作
42 with open(fan_file, encoding='utf-8') as f:
43 dataset = pd.read_csv(f)
44 #数据集名称为dataset
45 poly = PolynomialFeatures(degree=2, include_bias=False,interaction_only=False)
46 X_ploly = poly.fit_transform(dataset)
47 data_ploly = pd.DataFrame(X_ploly, columns=poly.get_feature_names())
48
49 new_data = data_ploly.ix[:,75:-6]
50
51 #ploly_mean,ploly_var为交叉特征均值方差
52 ploly_mean = np.mean(new_data)
53 ploly_var = np.var(ploly_mean)
54
55 ploly_var = list(ploly_var)
56 ploly_var.append(label)
57
58 cross_var.writerow(ploly_var)

交叉相乘之后的文件有数千个特征,生成的文件有将近2G大小。考虑到服务器性能不高,计算旷日持久。不对特征进行筛选,直接进行交叉之后跑算法这条路被我们放弃了。

后来阜特科技的杨工帮我们筛选了一些比较重要的特征,我们在此基础之上进行了一些特征交叉和重要性排序的操作,特征缩小到了几百个(包含交叉、均值、方差等,经过重要性排序),然后用它来跑得模型。

特征里面有一些特征是离散特征,对于这些特征我们进行单独处理,进行离散化。比如说偏航要求值总共有3个值分别是1,2,3。我们对其进行离散化处理,一个特征就变成了三个特征。每个样本统计出现这三个特征的频率。

 1import os
 2import pandas as pd
 3import numpy as np
 4import csv
 5
 6label_file = r"E:\8_19\testX_csv"
 7
 8train_mean = r"E:\8_19\disperse\discrete56.csv"
 9
10with open(train_mean, "a", newline = '', encoding = "utf-8") as f:
11 train_mean = csv.writer(f)
12
13 for x in range(1, 451):
14 fan_file = os.path.join(label_file, str(x) + ".csv")
15# print("程序运行进度为", x/451) #用该语句查看工作进度状态
16
17 with open(fan_file, encoding='utf-8') as f:
18 feature_read = pd.read_csv(f, header = None)
19
20 num1 = 0
21 num2 = 0
22 num3 = 0
23
24 a = []
25
26 for x in range(len(feature_read)):
27 if feature_read[55][x] == 0:
28 num1 = num1+1
29 if feature_read[55][x] == 1:
30 num2 = num2+1
31 if feature_read[55][x] == 2:
32 num3 = num3+1
33
34 num1 = num1/len(feature_read)
35 num2 = num2/len(feature_read)
36 num3 = num3/len(feature_read)
37
38 a.append(num1)
39 a.append(num2)
40 a.append(num3)
41
42 train_mean.writerow(a)

算法

我们最后主要用的算法是Xgboost,期间也尝试过LightGBM,因为算力不够的原因,没有办法尝试一些算法(包括杨工说的SVM以及深度学习的想法),最后主要用Xgboost进行调参,直接一起调参的话算力不够,我们是单个调参以及两两调参组合的形式进行参数调整。

 1from xgboost import XGBClassifier
 2import xgboost as xgb
 3
 4import pandas as pd 
 5import numpy as np
 6
 7from sklearn.model_selection import GridSearchCV
 8from sklearn.model_selection import StratifiedKFold
 9
10from sklearn.metrics import log_loss
11from sklearn.preprocessing import MinMaxScaler
12
13
14#数据导入、检查空缺值
15data = pd.read_csv(r'D:\next\8_19\train_data.csv',encoding = "gbk")
16label = pd.read_csv(r"D:\next\8_19\train_label.csv")
17test = pd.read_csv(r"D:\next\8_19\test_data.csv", encoding = "gbk")
18train = data.iloc[:,:-1]
19label = label.iloc[:,-1]
20
21X_train = train
22y_train = label
23
24#数据标准化
25scaler = MinMaxScaler()
26train = scaler.fit(train).transform(train)
27test = scaler.fit(test).transform(test)
28
29#交叉验证
30kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=3)
31
32param_test1 = {
33 'max_depth':list(range(3,10,1)),
34 'min_child_weight':list(range(1,6,1))
35}
36gsearch1 = GridSearchCV(estimator = XGBClassifier( learning_rate =0.1, n_estimators=400, max_depth=5,
37 min_child_weight=1, gamma=0, subsample=0.8, colsample_bytree=0.8,
38 objective= 'binary:logistic', nthread=4, scale_pos_weight=1, seed=27), 
39 param_grid = param_test1, scoring='roc_auc',n_jobs=4,iid=False, cv=5)
40gsearch1.fit(X_train,y_train)
41gsearch1.grid_scores_, gsearch1.best_params_, gsearch1.best_score_

调完这个参数之后,把最好的输出结果拿出来放在下一个参数调优里进行调整:

 1aram_test1 = {
 2 'learning_rate':[i/100.0 for i in range(6,14,2)]
 3}
 4gsearch1 = GridSearchCV(estimator = XGBClassifier( learning_rate =0.1, n_estimators=400, max_depth=6,
 5 min_child_weight=1, gamma=0, subsample=0.8, colsample_bytree=0.8,
 6 objective= 'binary:logistic', nthread=6, scale_pos_weight=1, seed=27), 
 7 param_grid = param_test1, scoring='roc_auc',n_jobs=-1,iid=False, cv=5)
 8gsearch1.fit(X_train,y_train)
 9gsearch1.grid_scores_, gsearch1.best_params_, gsearch1.best_score_
10
11param_test1 = {
12 'subsample':[0.8, 0.9]
13}
14gsearch1 = GridSearchCV(estimator = XGBClassifier( learning_rate =0.1, n_estimators=310, max_depth=6,
15 min_child_weight=1, gamma=0, subsample=0.9, colsample_bytree=0.8,
16 objective= 'binary:logistic', nthread=6, scale_pos_weight=1, seed=27), 
17 param_grid = param_test1, scoring='roc_auc',n_jobs=-1,iid=False, cv=5)
18gsearch1.fit(X_train,y_train)
19gsearch1.grid_scores_, gsearch1.best_params_, gsearch1.best_score_

还可以调整好几个参数,最后结果输出:

 1import xgboost as xgb
 2dtrain=xgb.DMatrix(X_train ,label=y_train)
 3dtest=xgb.DMatrix(test)
 4
 5params={
 6 'objective': 'binary:logistic',
 7 'max_depth':6,
 8 'subsample':0.8,
 9 'colsample_bytree':0.8,
10 'min_child_weight':1,
11 'seed':27,
12 'nthread':6,
13 'learning_rate':0.1,
14 'n_estimators':292,
15 'gamma':0,
16 'scale_pos_weight':1}
17
18watchlist = [(dtrain,'train')]
19
20bst=xgb.train(params,dtrain,num_boost_round=100,evals=watchlist)
21
22ypred=bst.predict(dtest)
23
24import csv
25
26test_label = r"D:\next\8_20\test_label_new.csv"
27with open(test_label, "a", newline = '', encoding = "utf-8") as f:
28 test_label = csv.writer(f)
29
30 for x in range(len(ypred)):
31 a = []
32 if ypred[x] < 0.5:
33 a.append(0)
34 test_label.writerow(a)
35 else:
36 a.append(1)
37 test_label.writerow(a)

即使是单个调参和两两调参,对于我们而言,计算速度还是太慢,我们为此也尝试了Hyperopt方法。通俗的说,我们用的是掷骰子方法,也就是在一个划定参数区域内随机地掷骰子,哪个参数被掷成几,我们就用这个数来训练模型。最后返回一个掷的最好的参数结果。这个方法有很大的局限性,一是结果的随机性,二是很容易局部收敛。但是,如果用来粗糙地验证一个特征构造的好坏,也不失为一个好方法。

 1# -*- coding: utf-8 -*-
 2"""
 3Created on Fri May 18 14:09:06 2018
 4
 6"""
 7
 8import numpy as np
 9import pandas as pd
10from sklearn.preprocessing import MinMaxScaler
11import xgboost as xgb
12from random import shuffle
13from xgboost.sklearn import XGBClassifier
14from sklearn.cross_validation import cross_val_score
15import pickle
16import time
17from hyperopt import fmin, tpe, hp,space_eval,rand,Trials,partial,STATUS_OK
18import random
19
20data = pd.read_csv(r'D:\next\select_data\new_feature.csv', encoding = "gbk").values
21label = pd.read_csv(r'D:\next\select_data\new_label.csv').values
22labels = label.reshape((1,-1))
23label = labels.tolist()[0]
24
25minmaxscaler = MinMaxScaler()
26attrs = minmaxscaler.fit_transform(data)
27
28index = range(0,len(label))
29random.shuffle(label)
30trainIndex = index[:int(len(label)*0.7)]
31print (len(trainIndex))
32testIndex = index[int(len(label)*0.7):]
33print (len(testIndex))
34attr_train = attrs[trainIndex,:]
35print (attr_train.shape)
36attr_test = attrs[testIndex,:]
37print (attr_test.shape)
38label_train = labels[:,trainIndex].tolist()[0]
39print (len(label_train))
40label_test = labels[:,testIndex].tolist()[0]
41print (len(label_test))
42print (np.mat(label_train).reshape((-1,1)).shape)
43
44
45def GBM(argsDict):
46 max_depth = argsDict["max_depth"] + 5
47# n_estimators = argsDict['n_estimators'] * 5 + 50
48 n_estimators = 627
49 learning_rate = argsDict["learning_rate"] * 0.02 + 0.05
50 subsample = argsDict["subsample"] * 0.1 + 0.7
51 min_child_weight = argsDict["min_child_weight"]+1
52
53 print ("max_depth:" + str(max_depth))
54 print ("n_estimator:" + str(n_estimators))
55 print ("learning_rate:" + str(learning_rate))
56 print ("subsample:" + str(subsample))
57 print ("min_child_weight:" + str(min_child_weight))
58
59 global attr_train,label_train
60
61 gbm = xgb.XGBClassifier(nthread=6, #进程数
62 max_depth=max_depth, #最大深度
63 n_estimators=n_estimators, #树的数量
64 learning_rate=learning_rate, #学习率
65 subsample=subsample, #采样数
66 min_child_weight=min_child_weight, #孩子数
67
68 max_delta_step = 50, #50步不降则停止
69 objective="binary:logistic")
70
71 metric = cross_val_score(gbm,attr_train,label_train,cv=3, scoring="f1", n_jobs = -1).mean()
72 print (metric)
73 return -metric
74
75space = {"max_depth":hp.randint("max_depth",15),
76 "n_estimators":hp.quniform("n_estimators",100,1000,1), #[0,1,2,3,4,5] -> [50,]
77 #"learning_rate":hp.quniform("learning_rate",0.01,0.2,0.01), #[0,1,2,3,4,5] -> 0.05,0.06
78 #"subsample":hp.quniform("subsample",0.5,1,0.1),#[0,1,2,3] -> [0.7,0.8,0.9,1.0]
79 #"min_child_weight":hp.quniform("min_child_weight",1,6,1), #
80
81 #"max_depth":hp.randint("max_depth",15),
82 # "n_estimators":hp.randint("n_estimators",10), #[0,1,2,3,4,5] -> [50,]
83 "learning_rate":hp.randint("learning_rate",6), #[0,1,2,3,4,5] -> 0.05,0.06
84 "subsample":hp.randint("subsample",3),#[0,1,2,3] -> [0.7,0.8,0.9,1.0]
85 "min_child_weight":hp.randint("min_child_weight",2)
86
87 }
88algo = partial(tpe.suggest,n_startup_jobs=1)
89best = fmin(GBM,space,algo=algo,max_evals=50) #max_evals表示想要训练的最大模型数量,越大越容易找到最优解
90
91print (best)
92print (GBM(best))

最终结果

我们首先把数据进行分类处理。对于那些空缺值的数据,我们直接给label为1(表示异常),对于空缺值的处理只能摸奖。在分析训练样本的分布时,我们还发现有一些阈值的特征,就是那些特征大于某些值或者小于某些值之后故障风机要明显比正常风机多出很多,这样,我们可以用阈值判断直接给label,剩下不能判断的再用算法进行判断。然后最后时间比较紧,阈值部分的没有做,除去了空缺值之后,其他的全部用算法进行判断。

杨工告诉我们,应该做分析的时候分析“轮毂转速”大于3的数据,因为风机工作才可以检测出来异常,如果风机不工作是很难判断的。但是因为时间比较紧,对于训练集我们就没有进行这样的操作,于是测试集也没有进行这样的划分。全部都一起塞进算法里了。

总结一下。我们对于特征进行交叉和重要性排序,综合考虑杨工说的重要特征和算法反馈的重要特征排序,最后生成一百多个特征的特征文件用来训练(训练样本是经过数据清洗过后的样本)。

测试集分为两部分,一部分是空缺值,直接标1,另一部分放算法里出结果。

总结

首先最大的一个坑是开始没有做数据清洗的工作,后来发现了之后从新来了一遍。再后来是杨工和我们说应该分析工作风机,拿工作风机来进行训练。如果这样做的话又要推倒从来,当时时间已经十分紧张了,心有余而力不足。对于比赛或者说数据分析工作来说,数据的理解是第一位的。否则很可能会做不少无用功。有的时候,受限于专业背景,我们很难充分地理解数据和业务场景,这时候应该向专业人士进行请教,把这些工作都做好之后再进行数据分析要好很多。

其次,提高自己地代码和算法的能力。既要懂算法又要能撸出一手好代码,这样才能提高效率。我写代码写得太慢,十分制约我的想法实现速度。算法不太懂,也不能很好地参与到算法讨论环节。

另外,版本控制十分重要。我们每天都在实现新的想法,文件很多也很乱。经常出现刚发一个文件,过一段时间就找不到那个文件或者忘了有没有发那个文件地情况。

声明:本文为公众号 经管人学数据分析 投稿,版权归对方所有。

征稿啦

CSDN 公众号秉持着「与千万技术人共成长」理念,不仅以「极客头条」、「畅言」栏目在第一时间以技术人的独特视角描述技术人关心的行业焦点事件,更有「技术头条」专栏,深度解读行业内的热门技术与场景应用,让所有的开发者紧跟技术潮流,保持警醒的技术嗅觉,对行业趋势、技术有更为全面的认知。

如果你有优质的文章,或是行业热点事件、技术趋势的真知灼见,或是深度的应用实践、场景方案等的新见解,欢迎联系 CSDN 投稿,联系方式:微信(guorui_1118,请备注投稿+姓名+公司职位),邮箱(guorui@csdn.net)。

者 | 徐麟

责编 | 胡巍巍

前言

很多人提到B站,首先想到的就会是二次元或者鬼畜。

上个月,笔者也发表了一篇关于B站鬼畜视频的文章:

大数据解读B站火过蔡徐坤的“鬼畜“区巨头们

然而,实际上B站其实是个非常神奇的网站,里面的内容可谓是包罗万象,有趣的弹幕文化也能极大地提高大家的体验,B站也逐渐地成为了一个用来学习的“神器”。

近期B站获得了央视网的力挺,报道称B站已经成为了越来越多的年轻人的学习阵地,正所谓“我在B站看番,你却在B站学习” ,今天我们就来爬取B站上那些播放量、弹幕量排名靠前的编程类视频,一起去了解B站的另一面。

数据来源

我们此次的数据主要来源于B站搜索框中输入“编程”后的视频列表及相关信息:

B站一共提供了物种视频排序的方式,每种能够返回前1000个视频,我们分别爬取五种排序所得到的1000个视频之后对5000个视频进行排序,最终得到了2000多个编程类视频的信息。

同时我们也增加了一些筛选条件,使得最终获取到的编程教学视频更具代表性:a.所属分类为科技类 b.视频时长大于60分钟,部分代码如下:

## 获得列表
def get_list(i,j):
 attempts = 0
 success = False
 while attempts < 5 and not success:
 try:
 url = 'https://search.bilibili.com/all?keyword=%E7%BC%96%E7%A8%8B&from_source=banner_search&order={}&duration=4&tids_1=36&page={}'.format(i,j+1) 
 header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win32; x32; rv:54.0) Gecko/20100101 Firefox/54.0',
 'Connection': 'keep-alive'}
 cookies ='v=3; iuuid=1A6E888B4A4B29B16FBA1299108DBE9CDCB327A9713C232B36E4DB4FF222CF03; webp=true; ci=1%2C%E5%8C%97%E4%BA%AC; __guid=26581345.3954606544145667000.1530879049181.8303; _lxsdk_cuid=1646f808301c8-0a4e19f5421593-5d4e211f-100200-1646f808302c8; _lxsdk=1A6E888B4A4B29B16FBA1299108DBE9CDCB327A9713C232B36E4DB4FF222CF03; monitor_count=1; _lxsdk_s=16472ee89ec-de2-f91-ed0%7C%7C5; __mta=189118996.1530879050545.1530936763555.1530937843742.18'
 cookie = {}
 for line in cookies.split(';'):
 name, value = cookies.strip().split('=', 1)
 cookie[name] = value 
 html = requests.get(url,cookies=cookie, headers=header).content
 bsObj = BeautifulSoup(html.decode('utf-8'),"html.parser")
 script = bsObj.find_all('script')[3].text
 info = json.loads(script.replace('window.__INITIAL_STATE__=','').split(';(function()')[0])['allData']['video']
 return info
 except:
 attempts = attempts+1
 return []
coding_all = []
type = ['click','stow','dm']
for i in type:
 for j in range(50):
 this_coding = get_list(i,j)
 coding_all = coding_all+this_coding

最终,我们获取到了如下的视频信息列表:

数据分析

获取到数据之后,我们首先关注的是这些视频的主要内容,通过视频给出的标签,绘制整体内容总结的词云图:

可以看到,上面的词云除了编程语言,技术之外包含了许多类似于学习,教程这样的通用描述性词汇,我们需要进一步从中筛选出与编程语言、技术相关的词云,提高词云图的效果:

可以看到,经过筛选后的词云图效果要好很多,其中基本上囊括了现在比较火的编程语言,如Java、Python 以及数据结构、机器学习这些技术类的内容,下面我们来看一下各编程语言的播放量及弹幕量对比:

我们此次将Linux也划分到语言类中,可以看到目前基本上就是处于Python、C语言、Java三组鼎力的态势,Python略微领先于其他两种语言,这也一定程度反映了当今的整体发展趋势。由此可见,B站的内容也是与时俱进,适合年轻人去学习了解编程整体发展趋势。

看完了语言类,我们再来看一下具体的技术类排行榜:

可以看到,前端、人工智能、数据框、爬虫这些大家比较关心以及公司有较大需求量的技术都出现在了榜单中,在B站如果能将自己所要从事领域的视频认真学习,也会有很大的提高,部分代码如下:

## 分组统计
coding_tag = dataframe_explode(coding,'tag')
coding_tag['tag'] = coding_tag['tag'].apply(str.lower)
coding_tag['type'] = coding_tag['tag'].map({tag_dict['tag'][k]:tag_dict['type'][k] for k in range(tag_dict.shape[0])})
coding_tag = coding_tag.groupby(['title','pic','author','arcurl','tag','type'],as_index=False).agg({'play':'max','danmu':'max','favorites':'max','review':'max'})
tag_count = coding_tag.groupby(['tag','type'],as_index=False).agg({'title':['count'],'play':['sum'],'danmu':['sum'],'favorites':['sum']}) 
tag_count.columns = ['tag','type','num','play','danmu','favorites']
## 绘制图片
coding_stat = tag_count[tag_count['type']=='语言']
coding_stat.sort_values('play',ascending=False,inplace=True)
attr = coding_stat['tag'][0:10]
v1 = coding_stat['play'][0:10]
bar = Bar("语言类播放量TOP10")
bar.add("播放数量", attr, v1, is_stack=True, xaxis_rotate=30,xaxis_label_textsize=18,
 xaxis_interval =0,is_splitline_show=False,label_text_size=12,is_label_show=True)
bar.render('语言类播放量TOP10.html')

精品视频

分析完整体视频内容的分布情况,我们再来看下那些最为精品的视频,由于B站以弹幕文化为特色,我们就依据弹幕量来为大家精选出一些非常不错的视频,首先是所有编程类视频的TOP20:

我们下面分别看一下三足鼎立中的Python、Java、C语言分别弹幕量排名前十的视频信息:

写在最后

B站的阿婆主为为大家提供了特别多的编程学习资源,大家在学习知识的同时,也需要注意的就是相应的版权信息。

上传视频一定要确认版权不存在问题之后再去上传,另外如果发现有存在侵权的问题,也要及时跟视频作者进行反馈,及时将侵权视频下架。

另外,希望大家能够多多支持技术类的视频和阿婆主,如果觉得不错就不要吝惜手中的硬币,让更多的技术类阿婆主有动力为大家提供更多更好的视频内容。

作者简介:徐麟,某互联网公司数据分析狮,哥大统计狗,喜欢用R&Python玩一些不一样的数据。个人公众号:数据森麟(ID:shujusenlin)。

声明:本文为作者投稿,版权归对方所有。