整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:

Serverless 实战:3 分钟实现文本敏感词过

Serverless 实战:3 分钟实现文本敏感词过滤

感词过滤是随着互联网社区一起发展起来的一种阻止网络犯罪和网络暴力的技术手段,通过对可能存在犯罪或网络暴力的关键词进行有针对性的筛查和屏蔽,能够防患于未然,将后果严重的犯罪行为扼杀于萌芽之中。

随着各种社交论坛的日益火爆,敏感词过滤逐渐成为了非常重要的功能。那么在 Serverless 架构下,利用 Python 语言,敏感词过滤又有那些新的实现呢?我们能否用最简单的方法实现一个敏感词过滤的 API 呢?

了解敏感过滤的几种方法

Replace方法

如果说敏感词过滤,其实不如说是文本的替换,以Python为例,说到词汇替换,不得不想到replace,我们可以准备一个敏感词库,然后通过replace进行敏感词替换:

def worldFilter(keywords, text):
    for eve in keywords:
        text=text.replace(eve, "***")
    return text
keywords=("关键词1", "关键词2", "关键词3")
content="这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。"
print(worldFilter(keywords, content))

但是动动脑大家就会发现,这种做法在文本和敏感词库非常庞大的前提下,会有很严重的性能问题。例如我将代码进行修改,进行基本的性能测试:

import time

def worldFilter(keywords, text):
    for eve in keywords:
        text=text.replace(eve, "***")
    return text
keywords=[ "关键词" + str(i) for i in range(0,10000)]
content="这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。" * 1000
startTime=time.time()
worldFilter(keywords, content)
print(time.time()-startTime)

此时的输出结果是:0.12426114082336426,可以看到性能非常差。

正则表达方法

与其用replace,还不如通过正则表达re.sub来的更加快速。

import time
import re
def worldFilter(keywords, text):
     return re.sub("|".join(keywords), "***", text)
keywords=[ "关键词" + str(i) for i in range(0,10000)]
content="这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。" * 1000
startTime=time.time()
worldFilter(keywords, content)
print(time.time()-startTime)

我们同样增加性能测试,按照上面的方法进行改造测试,输出结果是0.24773502349853516。通过这样的例子,我们可以发现,其性能磣韩剧并不大,但是实际上随着文本量增加,正则表达这种做法在性能层面会变高很多。

DFA过滤敏感词

这种方法相对来说效率会更高一些。例如,我们认为坏人,坏孩子,坏蛋是敏感词,则他们的树关系可以表达:

用DFA字典来表示:

{
    '坏': {
        '蛋': {
            '\x00': 0
        }, 
        '人': {
            '\x00': 0
        }, 
        '孩': {
            '子': {
                '\x00': 0
            }
        }
    }
}

使用这种树表示问题最大的好处就是可以降低检索次数,提高检索效率,基本代码实现:

import time

class DFAFilter(object):
    def __init__(self):
        self.keyword_chains={}  # 关键词链表
        self.delimit='\x00'  # 限定

    def parse(self, path):
        with open(path, encoding='utf-8') as f:
            for keyword in f:
                chars=str(keyword).strip().lower()  # 关键词英文变为小写
                if not chars:  # 如果关键词为空直接返回
                    return
                level=self.keyword_chains
                for i in range(len(chars)):
                    if chars[i] in level:
                        level=level[chars[i]]
                    else:
                        if not isinstance(level, dict):
                            break
                        for j in range(i, len(chars)):
                            level[chars[j]]={}
                            last_level, last_char=level, chars[j]
                            level=level[chars[j]]
                        last_level[last_char]={self.delimit: 0}
                        break
                if i==len(chars) - 1:
                    level[self.delimit]=0

    def filter(self, message, repl="*"):
        message=message.lower()
        ret=[]
        start=0
        while start < len(message):
            level=self.keyword_chains
            step_ins=0
            for char in message[start:]:
                if char in level:
                    step_ins +=1
                    if self.delimit not in level[char]:
                        level=level[char]
                    else:
                        ret.append(repl * step_ins)
                        start +=step_ins - 1
                        break
                else:
                    ret.append(message[start])
                    break
            else:
                ret.append(message[start])
            start +=1

        return ''.join(ret)



gfw=DFAFilter()
gfw.parse( "./sensitive_words")
content="这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。" * 1000
startTime=time.time()
result=gfw.filter(content)
print(time.time()-startTime)

这里我们的字典库是:

with open("./sensitive_words", 'w') as f:
    f.write("\n".join( [ "关键词" + str(i) for i in range(0,10000)]))

执行结果:

0.06450581550598145

可以看到性能进一步提升。

AC自动机过滤敏感词算法

接下来,我们来看一下 AC自动机过滤敏感词算法:

AC自动机:一个常见的例子就是给出n个单词,再给出一段包含m个字符的文章,让你找出有多少个单词在文章里出现过。

简单地讲,AC自动机就是字典树+kmp算法+失配指针

代码实现:

import time
class Node(object):
    def __init__(self):
        self.next={}
        self.fail=None
        self.isWord=False
        self.word=""


class AcAutomation(object):

    def __init__(self):
        self.root=Node()

    # 查找敏感词函数
    def search(self, content):
        p=self.root
        result=[]
        currentposition=0

        while currentposition < len(content):
            word=content[currentposition]
            while word in p.next==False and p !=self.root:
                p=p.fail

            if word in p.next:
                p=p.next[word]
            else:
                p=self.root

            if p.isWord:
                result.append(p.word)
                p=self.root
            currentposition +=1
        return result

    # 加载敏感词库函数
    def parse(self, path):
        with open(path, encoding='utf-8') as f:
            for keyword in f:
                temp_root=self.root
                for char in str(keyword).strip():
                    if char not in temp_root.next:
                        temp_root.next[char]=Node()
                    temp_root=temp_root.next[char]
                temp_root.isWord=True
                temp_root.word=str(keyword).strip()

    # 敏感词替换函数
    def wordsFilter(self, text):
        """
        :param ah: AC自动机
        :param text: 文本
        :return: 过滤敏感词之后的文本
        """
        result=list(set(self.search(text)))
        for x in result:
            m=text.replace(x, '*' * len(x))
            text=m
        return text


acAutomation=AcAutomation()
acAutomation.parse('./sensitive_words')
startTime=time.time()
print(acAutomation.wordsFilter("这是一个关键词替换的例子,这里涉及到了关键词1还有关键词2,最后还会有关键词3。"*1000))
print(time.time()-startTime)

词库同样是:

with open("./sensitive_words", 'w') as f:
    f.write("\n".join( [ "关键词" + str(i) for i in range(0,10000)]))

使用上面的方法,测试结果为0.017391204833984375。

敏感词过滤方法小结

可以看到这个所有算法中,在上述的基本算法中DFA过滤敏感词性能最高,但是实际上,对于后两者算法,并没有谁一定更好,可能某些时候,AC自动机过滤敏感词算法会得到更高的性能,所以在生产生活中,推荐时候用两者,可以根据自己的具体业务需要来做。

实现敏感词过滤API

将代码部署到Serverless架构上,可以选择API网关与函数计算进行结合,以AC自动机过滤敏感词算法为例:我们只需要增加是几行代码就好,完整代码如下:

# -*- coding:utf-8 -*-

import json, uuid


class Node(object):
    def __init__(self):
        self.next={}
        self.fail=None
        self.isWord=False
        self.word=""


class AcAutomation(object):

    def __init__(self):
        self.root=Node()

    # 查找敏感词函数
    def search(self, content):
        p=self.root
        result=[]
        currentposition=0

        while currentposition < len(content):
            word=content[currentposition]
            while word in p.next==False and p !=self.root:
                p=p.fail

            if word in p.next:
                p=p.next[word]
            else:
                p=self.root

            if p.isWord:
                result.append(p.word)
                p=self.root
            currentposition +=1
        return result

    # 加载敏感词库函数
    def parse(self, path):
        with open(path, encoding='utf-8') as f:
            for keyword in f:
                temp_root=self.root
                for char in str(keyword).strip():
                    if char not in temp_root.next:
                        temp_root.next[char]=Node()
                    temp_root=temp_root.next[char]
                temp_root.isWord=True
                temp_root.word=str(keyword).strip()

    # 敏感词替换函数
    def wordsFilter(self, text):
        """
        :param ah: AC自动机
        :param text: 文本
        :return: 过滤敏感词之后的文本
        """
        result=list(set(self.search(text)))
        for x in result:
            m=text.replace(x, '*' * len(x))
            text=m
        return text


def response(msg, error=False):
    return_data={
        "uuid": str(uuid.uuid1()),
        "error": error,
        "message": msg
    }
    print(return_data)
    return return_data


acAutomation=AcAutomation()
path='./sensitive_words'
acAutomation.parse(path)


def main_handler(event, context):
    try:
        sourceContent=json.loads(event["body"])["content"]
        return response({
            "sourceContent": sourceContent,
            "filtedContent": acAutomation.wordsFilter(sourceContent)
        })
    except Exception as e:
        return response(str(e), True)

最后,为了方便本地测试,我们可以增加:

def test():
    event={
        "requestContext": {
            "serviceId": "service-f94sy04v",
            "path": "/test/{path}",
            "httpMethod": "POST",
            "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
            "identity": {
                "secretId": "abdcdxxxxxxxsdfs"
            },
            "sourceIp": "14.17.22.34",
            "stage": "release"
        },
        "headers": {
            "Accept-Language": "en-US,en,cn",
            "Accept": "text/html,application/xml,application/json",
            "Host": "service-3ei3tii4-251000691.ap-guangzhou.apigateway.myqloud.com",
            "User-Agent": "User Agent String"
        },
        "body": "{\"content\":\"这是一个测试的文本,我也就呵呵了\"}",
        "pathParameters": {
            "path": "value"
        },
        "queryStringParameters": {
            "foo": "bar"
        },
        "headerParameters": {
            "Refer": "10.0.2.14"
        },
        "stageVariables": {
            "stage": "release"
        },
        "path": "/test/value",
        "queryString": {
            "foo": "bar",
            "bob": "alice"
        },
        "httpMethod": "POST"
    }
    print(main_handler(event, None))


if __name__=="__main__":
    test()

完成之后,我们就可以测试运行一下,例如我的字典是:

呵呵
测试

执行之后结果:

{'uuid': '9961ae2a-5cfc-11ea-a7c2-acde48001122', 'error': False, 'message': {'sourceContent': '这是一个测试的文本,我也就呵呵了', 'filtedContent': '这是一个**的文本,我也就**了'}}

接下来,我们将代码部署到云端,新建serverless.yaml:

sensitive_word_filtering:
  component: "@serverless/tencent-scf"
  inputs:
    name: sensitive_word_filtering
    codeUri: ./
    exclude:
      - .gitignore
      - .git/**
      - .serverless
      - .env
    handler: index.main_handler
    runtime: Python3.6
    region: ap-beijing
    description: 敏感词过滤
    memorySize: 64
    timeout: 2
    events:
      - apigw:
          name: serverless
          parameters:
            environment: release
            endpoints:
              - path: /sensitive_word_filtering
                description: 敏感词过滤
                method: POST
                enableCORS: true
                param:
                  - name: content
                    position: BODY
                    required: 'FALSE'
                    type: string
                    desc: 待过滤的句子

然后通过sls --debug进行部署,部署结果:

最后,通过PostMan进行测试:

总结

敏感词过滤是目前非常常见的需求/技术,通过敏感词过滤,我们可以在一定程度上降低恶意言语或者违规言论的出现,在上述实践过程,有以下两点内容:

  • 对于敏感词库额获得问题:Github上有很多,可以自行搜索下载,因为敏感词词库里面有很多敏感词,所以我也不能直接放在这个上面供大家使用,所以还需要大家自行在Github上搜索使用;
  • 这个API使用场景的问题:完全可以放在我们的社区跟帖系统/留言评论系统/博客发布系统中,防止出现敏感词汇,可以降低不必要的麻烦出现。

Serverless Framework 试用计划

我们诚邀您来体验最便捷的 Serverless 开发和部署方式。在试用期内,相关联的产品及服务均提供免费资源和专业的技术支持,帮助您的业务快速、便捷地实现 Serverless!

Serverless 极速部署,只需三步

Serverless Framework 是构建和运维 Serverless 应用的框架。简单三步,即可通过 Serverless Framework 快速实现服务部署。

1. 安装 Serverless

macOS/Linux 系统:推荐使用二进制安装

$ curl -o- -L https://slss.io/install | bash

Windows 系统:可通过 npm 安装

$ npm install -g serverless

2. 创建云上应用

在空文件夹下输入 `serverless` 命令

$ serverless

访问命令行中输出的链接,即可访问成功部署后的应用。

3. 查看部署信息

进入到部署成功的文件夹,运行如下命令,查看部署状态和资源信息:

$ sls info

用户在 HTML 表单中填写并提交数据时,可以使用 PHP 来接收并处理这些数据。要实现这一点,需要创建一个 PHP 脚本来处理提交的数据,然后将 HTML 表单的 "action" 属性设置为该脚本的文件路径。表单提交的数据需要进行验证和过滤,以确保数据的完整性和安全性。可以使用条件语句、正则表达式、过滤器函数等方法来验证和过滤数据,并使用 htmlspecialchars() 函数转义 HTML 标记,以防止 XSS 攻击。

以下是一个简单的示例:

HTML 表单代码:

<form action="submit.php" method="post">

<label for="name">Name:</label>

<input type="text" id="name" name="name">

<label for="email">Email:</label>

<input type="email" id="email" name="email">

<button type="submit">Submit</button>

</form>

PHP 代码(submit.php):

<?php

// 获取表单提交的数据

$name=$_POST['name'];

$email=$_POST['email'];

// 在这里进行处理,例如将数据存储到数据库中

// ...

// 返回一个响应,告诉用户数据已经被成功提交

echo "Thank you for submitting the form, $name!";

?>

在上面的示例中,表单的 "action" 属性设置为 "submit.php",这意味着提交表单时,数据将被发送到 submit.php 文件中的 PHP 代码中进行处理。PHP 代码使用 $_POST 数组来获取表单提交的数据,然后进行处理,例如将数据存储到数据库中。最后,PHP 代码返回一个响应,告诉用户数据已经被成功提交。在处理表单数据时,一定要对用户输入进行验证和过滤,以防止安全漏洞。

需要对表单提交的数据进行验证和过滤,以确保数据的完整性和安全性。以下是一些常见的方法:

1、验证表单字段:在 PHP 代码中使用条件语句和正则表达式等方法来验证表单字段的有效性,例如验证电子邮件地址的格式是否正确。

$email=$_POST['email'];

if (!filter_var($email, FILTER_VALIDATE_EMAIL)) {

// 如果邮件地址格式不正确,则显示错误消息

echo "Invalid email address";

}

2、过滤输入数据:使用 PHP 中的过滤器函数来过滤表单输入数据,以防止 XSS 攻击和 SQL 注入等安全漏洞。

$name=$_POST['name'];

$name=filter_var($name, FILTER_SANITIZE_STRING); // 过滤特殊字符和标签

3、防止跨站脚本攻击(XSS):在 PHP 代码中使用 htmlspecialchars() 函数来转义 HTML 标记,防止恶意脚本注入到页面中。

$name=$_POST['name'];

$name=htmlspecialchars($name, ENT_QUOTES, 'UTF-8'); // 转义 HTML 标记

4、防止 SQL 注入攻击:在 PHP 代码中使用参数化查询或准备语句来执行数据库操作,以防止恶意 SQL 语句注入到数据库中。

$stmt=$pdo->prepare("INSERT INTO users (name, email) VALUES (:name, :email)");

$stmt->bindParam(':name', $name);

$stmt->bindParam(':email', $email);

$stmt->execute();

通过这些方法,可以确保表单提交的数据是安全和有效的,并且能够正常地处理和存储到数据库中。

符实体

一些字符在 HTML 中拥有特殊的含义,比如小于号 (<) 用于定义 HTML 标签的开始,所以有时候直接在页面中书写的话,会产生意想不到的结果。如果我们希望浏览器正确地显示这些字符,我们必须在 HTML 源码中插入字符实体。

字符实体有三部分:一个和号 (&),一个实体名称或者 # 和一个实体编号,以及一个分号 (;)。

要在 HTML 文档中显示小于号,我们需要这样写:< 或者 <

使用实体名称而不是实体编号的好处在于,名称相对来说更容易记忆。而这么做的坏处是,并不是所有的浏览器都支持最新的实体名称,然而几乎所有的浏览器对实体编号的支持都很好。

注意:实体对大小写敏感。


空格

空格是 HTML 中最普通的字符实体。

通常情况下,HTML 会裁掉文档中的空格。假如你在文档中连续输入 10 个空格,那么 HTML 会去掉其中的9个而只显示1个。如果使用 就可以在文档中增加空格。

以下就罗列下html页面能使用到的字符实体


最常用的字符实体

显示结果

描述

实体名称

实体编号


空格

 

<

小于号

<

<

>

大于号

>

>

&

和号

&

&

"

引号

"

"

'

撇号

' (IE不支持)

'


其他一些常用的字符实体

显示结果

描述

实体名称

实体编号

¢

¢

£

£

日圆

¥

¥

§

§

§

?

版权

©

©

?

注册商标

®

®

×

乘号

×

×

÷

除号

÷

÷