近在研究公众号的开发,前段时间已经上线了电子书关键词的回复功能,调研过程中发现了 Chatterbot 这个不错的 Python 机器人库,因此找了一篇还不错的实践教程,经作者授权后分享推荐给大家。
看完之后,你应该可以学会如何正确地接入一个 Chatterbot 应用。
另外,周日推的那篇推文我在筛选合作的时候不够谨慎,商家的主体和宣传信息存在很大的误导性,因此我已经删除文章了,在这里跟大家道个歉!也提醒昨天几位购买了产品的同学,建议拒收或者退货处理。抱歉~
EarlGrey
文 | goodspeed
编辑 | EarlGrey
推荐 | 编程派公众号(ID:codingpy)
使用Python实现聊天机器人的方案有多种:AIML、chatterBot以及图灵聊天机器人和微软小冰等。
考虑到以后可能会做一些定制化的需求,这里我选择了chatterBot
(github 项目地址:https://github.com/gunthercox/ChatterBot)。
chatterbot是一款python接口的,基于一系列规则和机器学习算法完成的聊天机器人。具有结构清晰,可扩展性好,简单实用的特点。
chatterBot 的工作流程如图:
输入模块(input adapter)从终端或者API等输入源获取数据
输入源会被指定的逻辑处理模块(logic Adapter)分别处理,逻辑处理模块会匹配训练集中已知的最接近输入数据句子A,然后根据句子A去找到相关度最高的结果B,如果有多个逻辑处理模块返回了不同的结果,会返回一个相关度最高的结果。
输出模块(output adapter)将匹配到的结果返回给终端或者API。
值得一说的是chatterBot 是一个模块化的项目,分为 input Adapter、logic Adapter、storage Adapter、output Adapter以及Trainer 模块。
logic Adapter是一个插件式设计,主进程在启动时会将用户定义的所有逻辑处理插件添加到logic context中,然后交MultiLogicAdapter 进行处理,MultiLogicAdapter 依次调用每个 logic Adapter,logic Adapter 被调用时先执行can_process 方式判断输入是否可以命中这个逻辑处理插件。比如”今天天气怎么样“这样的问题显然需要命中天气逻辑处理插件,这时时间逻辑处理插件的can_process 则会返回False。在命中后logic Adapter 负责计算出对应的回答(Statement对象)以及可信度(confidence),MultiLogicAdapter会取可信度最高的回答,并进入下一步。
下面我们来看下 chatterBot 如何使用
chatterBot 是使用Python编写的,可以使用 pip 安装:
pip install chatterbot
chatterBot 的中文对话要求Python3 以上版本,建议在Python3.x 环境下开发
打开iPython,输入测试一下
In[1]: from chatterbot import ChatBot # import ChatBot
In[2]: momo = ChatBot('Momo', trainer='chatterbot.trainers.ChatterBotCorpusTrainer')
/Users/gs/.virtualenvs/py3/lib/python3.6/site-packages/chatterbot/storage/jsonfile.py:26: UnsuitableForProductionWarning: The JsonFileStorageAdapter is not recommended for production environments.
self.UnsuitableForProductionWarning # 这里storage adapter 默认使用的是 json 格式存储数据的,如果想在服务端部署,应该避免使用这种格式,因为实在是太慢了
In[3]: momo.train("chatterbot.corpus.chinese") # 指定训练集,这里我们使用中文
# 下边是对话结果
In[4]: momo.get_response('你好')
Out[4]: <Statement text:你好>
In[5]: momo.get_response('怎么了')
Out[5]: <Statement text:没什么.>
In[6]: momo.get_response('你知道它的所有内容吗?')
Out[6]: <Statement text:优美胜于丑陋.>
In[7]: momo.get_response('你是一个程序员吗?')
Out[7]: <Statement text:我是个程序员>
In[8]: momo.get_response('你使用什么语言呢?')
Out[8]: <Statement text:我经常使用 Python, Java 和 C++ .>
这时你已经可以和机器人对话了,不过现在由于训练数据太少,机器人只能返回简单的对话。
这里是默认的中文对话训练数据 中文训练数据地址:https://github.com/gunthercox/chatterbot-corpus/tree/master/chatterbot_corpus/data/chinese。
那么我们怎么添加训练数据呢?
chatterBot 内置了training class,自带的方法有两种,一种是使用通过输入list 来训练,比如 ["你好", "我不好"],后者是前者的回答,另一种是通过导入Corpus 格式的文件来训练。也支持自定义的训练模块,不过最终都是转为上述两种类型。
chatterBot 通过调用 train 函数训练,不过在这之前要先用 set_trainer 来进行设置。例如:
In[12]: from chatterbot.trainers import ListTrainer # 导入训练模块的 ListTrainer 类
In[13]: momo.get_response('你叫什么?') # 现在是答非所问,因为在这之前我们并没有训练过
Out[13]: <Statement text:我在烤蛋糕.>
In[14]: momo.set_trainer(ListTrainer) # 指定训练方式
In[15]: momo.train(['你叫什么?', '我叫魔魔!']) # 训练
In[16]: momo.get_response('你叫什么?') # 现在机器人已经可以回答了
Out[16]: <Statement text:我叫魔魔!>
训练好的数据默认存在 ./database.db,这里使用的是 jsondb。
对 chatterBot 的介绍先到这里,具体用法可以参考文档:ChatterBot Tutorial:http://chatterbot.readthedocs.io/en/stable/tutorial.html
接下来,介绍如何在项目中使用 chatterBot。
Sanic 是一个和类Flask 的基于Python3.5+的web框架,它编写的代码速度特别快。
除了像Flask 以外,Sanic 还支持以异步请求的方式处理请求。这意味着你可以使用新的 async/await 语法,编写非阻塞的快速的代码。
这里之所以使用 Sanic 是因为他和Flask 非常像,之前我一直使用Flask,并且它也是专门为Python3.5 写的,使用到了协程。
首先建个项目,这里项目我已经建好了,项目结构如下:
.
├── LICENSE
├── README.md
├── manage.py # 运行文件 启动项目 使用 python manage.py 命令
├── momo
│ ├── __init__.py
│ ├── app.py # 创建app 模块
│ ├── helper.py
│ ├── settings.py # 应用配置
│ └── views
│ ├── __init__.py
│ ├── hello.py # 测试模块
│ └── mweixin.py # 微信消息处理模块
├── requirements.txt
└── supervisord.conf
源码我已经上传到github,有兴趣的可以看一下,也可以直接拉下来测试。项目代码地址
我们先重点看下 hello.py
文件 和helper.py
。
# hello.py
# -*- coding: utf-8 -*-
from sanic import Sanic, Blueprint
from sanic.views import HTTPMethodView
from sanic.response import text
from momo.helper import get_momo_answer # 导入获取机器人回答获取函数
blueprint = Blueprint('index', url_prefix='/')
class ChatBot(HTTPMethodView):
# 聊天机器人 http 请求处理逻辑
async def get(self, request):
ask = request.args.get('ask')
# 先获取url 参数值 如果没有值,返回 '你说啥'
if ask:
answer = get_momo_answer(ask)
return text(answer)
return text('你说啥?')
blueprint.add_route(ChatBot.as_view, '/momo')
# helper.py
from chatterbot import ChatBot
momo_chat = ChatBot(
'Momo',
# 指定存储方式 使用mongodb 存储数据
storage_adapter='chatterbot.storage.MongoDatabaseAdapter',
# 指定 logic adpater 这里我们指定三个
logic_adapters=[
"chatterbot.logic.BestMatch",
"chatterbot.logic.MathematicalEvaluation", # 数学模块
"chatterbot.logic.TimeLogicAdapter", # 时间模块
],
input_adapter='chatterbot.input.VariableInputTypeAdapter',
output_adapter='chatterbot.output.OutputAdapter',
database='chatterbot',
read_only=True
)
def get_momo_answer(content):
# 获取机器人返回结果函数
response = momo_chat.get_response(content)
if isinstance(response, str):
return response
return response.text
运行命令 python manage.py
启动项目。
在浏览器访问url:http://0.0.0.0:8000/momo?ask=你是程序员吗
到这里,我们已经启动了一个web 项目,可以通过访问url 的方式和机器人对话,是时候接入微信公号了!
拥有一个可以使用的微信公众号(订阅号服务号都可以,如果没有,可以使用微信提供的测试账号)
拥有一个外网可以访问的服务器(vps 或公有云都可以 aws 新用户免费使用一年,可以试试)
服务器配置了python3 环境,(建议使用 virtualenvwrapper 配置虚拟环境)
登录微信公众号:https://mp.weixin.qq.com
查看公号开发信息:
设置请求url,这里是你配置的url(需要外网可访问,只能是80或443端口)
填写token和EncodingAESKey,这里我选择的是兼容模式,既有明文方便调试,又有信息加密。
详细配置可以参考官方文档:接入指南
如果你的 服务器地址
已经配置完成,现在点击提交应该就成功了。如果没有成功我们接下来看怎么配置服务器地址。
先看下 微信请求的视图代码:
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from six import StringIO
import re
import xmltodict
from chatterbot.trainers import ListTrainer
from sanic import Blueprint
from sanic.views import HTTPMethodView
from sanic.response import text
from sanic.exceptions import ServerError
from weixin import WeixinMpAPI
from weixin.lib.WXBizMsgCrypt import WXBizMsgCrypt
from momo.settings import Config
blueprint = Blueprint('weixin', url_prefix='/weixin')
class WXRequestView(HTTPMethodView):
def _get_args(self, request):
# 获取微信请求参数,加上token 拼接为完整的请求参数
params = request.raw_args
if not params:
raise ServerError("invalid params", status_code=400)
args = {
'mp_token': Config.WEIXINMP_TOKEN,
'signature': params.get('signature'),
'timestamp': params.get('timestamp'),
'echostr': params.get('echostr'),
'nonce': params.get('nonce'),
}
return args
def get(self, request):
# 微信验证服务器这一步是get 请求,参数可以使用 request.raw_args 获取
args = self._get_args(request)
weixin = WeixinMpAPI(**args) # 这里我使用了 第三方包 python-weixin 可以直接实例化一个WeixinMpAPI对象
if weixin.validate_signature: # 验证参数合法性
# 如果参数争取,我们将微信发过来的echostr参数再返回给微信,否则返回 fail
return text(args.get('echostr') or 'fail')
return text('fail')
blueprint.add_route(WXRequestView.as_view, '/request')
这里处理微信请求我使用的是 我用python 写的 微信SDK python-weixin,可以使用 pip 安装:
pip install python-weixin
这个包最新版本对Python3 加密解密有点问题,可以直接从github 安装:
pip install git+https://github.com/zongxiao/python-weixin.git@py3
然后更新 app.py 文件:
# -*- coding: utf-8 -*-
from sanic import Sanic
from momo.settings import Config
def create_app(register_bp=True, test=False):
# 创建app
app = Sanic(__name__)
if test:
app.config['TESTING'] = True
# 从object 导入配置
app.config.from_object(Config)
register_blueprints(app)
return app
def register_blueprints(app):
from momo.views.hello import blueprint as hello_bp
from momo.views.mweixin import blueprint as wx_bp
app.register_blueprint(hello_bp)
# 注册 wx_bp
app.register_blueprint(wx_bp)
详细代码参考github: 微信聊天机器人 momo
现在我们公号已经接入了自己的服务,是时候接入微信聊天机器人。
微信聊天机器人的工作流程如下:
看我们消息逻辑处理代码:
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from six import StringIO
import re
import xmltodict
from chatterbot.trainers import ListTrainer
from sanic import Blueprint
from sanic.views import HTTPMethodView
from sanic.response import text
from sanic.exceptions import ServerError
from weixin import WeixinMpAPI
from weixin.reply import TextReply
from weixin.response import WXResponse as _WXResponse
from weixin.lib.WXBizMsgCrypt import WXBizMsgCrypt
from momo.settings import Config
from momo.helper import validate_xml, smart_str, get_momo_answer
from momo.media import media_fetch
blueprint = Blueprint('weixin', url_prefix='/weixin')
appid = smart_str(Config.WEIXINMP_APPID)
token = smart_str(Config.WEIXINMP_TOKEN)
encoding_aeskey = smart_str(Config.WEIXINMP_ENCODINGAESKEY)
# 关注后自动返回的文案
AUTO_REPLY_CONTENT = """
Hi,朋友!
这是我妈四月的公号,我是魔魔,我可以陪你聊天呦!
我还能"记账",输入"记账"会有惊喜呦!
<a href=""">历史记录</a>
"""
class ReplyContent(object):
_source = 'value'
def __init__(self, event, keyword, content=None, momo=True):
self.momo = momo
self.event = event
self.content = content
self.keyword = keyword
if self.event == 'scan':
pass
@property
def value(self):
if self.momo:
answer = get_momo_answer(self.content)
return answer
return ''
class WXResponse(_WXResponse):
auto_reply_content = AUTO_REPLY_CONTENT
def _subscribe_event_handler(self):
# 关注公号后的处理逻辑
self.reply_params['content'] = self.auto_reply_content
self.reply = TextReply(**self.reply_params).render
def _unsubscribe_event_handler(self):
# 取关后的处理逻辑,取关我估计会哭吧
pass
def _text_msg_handler(self):
# 文字消息处理逻辑 聊天机器人的主要逻辑
event_key = 'text'
content = self.data.get('Content')
reply_content = ReplyContent('text', event_key, content)
self.reply_params['content'] = reply_content.value
self.reply = TextReply(**self.reply_params).render
class WXRequestView(HTTPMethodView):
def _get_args(self, request):
params = request.raw_args
if not params:
raise ServerError("invalid params", status_code=400)
args = {
'mp_token': Config.WEIXINMP_TOKEN,
'signature': params.get('signature'),
'timestamp': params.get('timestamp'),
'echostr': params.get('echostr'),
'nonce': params.get('nonce'),
}
return args
def get(self, request):
args = self._get_args(request)
weixin = WeixinMpAPI(**args)
if weixin.validate_signature:
return text(args.get('echostr') or 'fail')
return text('fail')
def _get_xml(self, data):
post_str = smart_str(data)
# 验证xml 格式是否正确
validate_xml(StringIO(post_str))
return post_str
def _decrypt_xml(self, params, crypt, xml_str):
# 解密消息
nonce = params.get('nonce')
msg_sign = params.get('msg_signature')
timestamp = params.get('timestamp')
ret, decryp_xml = crypt.DecryptMsg(xml_str, msg_sign,
timestamp, nonce)
return decryp_xml, nonce
def _encryp_xml(self, crypt, to_xml, nonce):
# 加密消息
to_xml = smart_str(to_xml)
ret, encrypt_xml = crypt.EncryptMsg(to_xml, nonce)
return encrypt_xml
def post(self, request):
# 获取微信服务器发送的请求参数
args = self._get_args(request)
weixin = WeixinMpAPI(**args)
if not weixin.validate_signature: # 验证参数合法性
raise AttributeError("Invalid weixin signature")
xml_str = self._get_xml(request.body) # 获取form data
crypt = WXBizMsgCrypt(token, encoding_aeskey, appid)
decryp_xml, nonce = self._decrypt_xml(request.raw_args, crypt, xml_str) # 解密
xml_dict = xmltodict.parse(decryp_xml)
xml = WXResponse(xml_dict) or 'success' # 使用WXResponse 根据消息获取机器人返回值
encryp_xml = self._encryp_xml(crypt, xml, nonce) # 加密消息
return text(encryp_xml or xml) # 回应微信请求
blueprint.add_route(WXRequestView.as_view, '/request')
可以看到,我处理微信请求返回结果比较简单,也是使用的 python-weixin 包封装的接口,主要的处理逻辑是 WXResponse。
这里需要注意的是,如果服务器在5秒内没有响应微信服务器会重试。为了加快响应速度,不要在服务器 将 chatterBot 的 storage adapter 设置为使用 jsondb。
上边这些就是,微信聊天机器人的主要处理逻辑,我们运行服务,示例如下:
可以看到这里聊天机器人也可以做简单的数学运算和报时,是因为我在上边指定处理逻辑的时候添加了数学模块和时间模块:
momo_chat = ChatBot(
'Momo',
# 指定存储方式 使用mongodb 存储数据
storage_adapter='chatterbot.storage.MongoDatabaseAdapter',
# 指定 logic adpater 这里我们指定三个
logic_adapters=[
"chatterbot.logic.BestMatch",
"chatterbot.logic.MathematicalEvaluation", # 数学模块
"chatterbot.logic.TimeLogicAdapter", # 时间模块
],
input_adapter='chatterbot.input.VariableInputTypeAdapter',
output_adapter='chatterbot.output.OutputAdapter',
database='chatterbot',
read_only=True
)
到这里,微信机器人的搭建就完成了,详细代码已经长传到了 github: https://github.com/gusibi/momo/tree/chatterbot,感兴趣的可以参考一下。
ChatterBot 项目地址:https://github.com/gunthercox/ChatterBot
ChatterBot Tutorial:http://chatterbot.readthedocs.io/en/stable/tutorial.html
用Python快速实现一个聊天机器人:http://www.jianshu.com/p/d1333fde266f
基于Python-ChatterBot搭建不同adapter的聊天机器人:https://ask.hellobi.com/blog/guodongwei1991/7626
擁有自動學習的 Python 機器人 - ChatterBot:https://kantai235.github.io/2017/03/16/ChatterBotTeaching/
使用 ChatterBot构建聊天机器人:https://www.biaodianfu.com/chatterbot.html
python-weixin sdk: https://github.com/gusibi/python-weixin
回复下方「关键词」,获取优质资源
回复关键词「 pybook03」,立即获取主页君与小伙伴一起翻译的《Think Python 2e》电子版
回复关键词「书单02」,立即获取主页君整理的 10 本 Python 入门书的电子版
T之家 4 月 25 日消息,在 Windows 98 的老旧系统上运行一个现代的应用,近日一个开发者就做到了这一点,他把大火的 ChatGPT 聊天机器人移植到了 Java 平台上,让它能够在几乎任何 Windows 版本上运行。他把这个应用叫做 JavaGPT,并且在 GitHub 上公开了源代码。
ChatGPT 是一个基于 OpenAI 和微软的人工智能技术开发的聊天机器人,可以与用户进行自然而有趣的对话。JavaGPT 是一个基于 Java 8 版本开发的应用,它可以让用户通过一个友好的图形界面来访问 ChatGPT 和其所有功能。据开发者“FrankCYB”介绍,JavaGPT 有以下几个特点:
聊天流式化:让回复实时生成,就像在 ChatGPT 网站上一样
聊天历史:让用户可以像在网站上一样与之前的聊天互动
撤回聊天:让用户可以撤销之前的提问和回复
HTML 查看器:让用户可以以 HTML 格式查看聊天内容,支持 Markdown 语法
聊天标题:根据聊天内容自动生成一个标题,也可以手动修改
导入预设提问
保存聊天到文件
暗黑模式和右键复制编辑粘贴功能
支持 ChatGPT 4 和所有 ChatGPT 3.5 模型
跨平台
只有 6mb 的文件大小
由于 JavaGPT 是一个开源项目,任何人都可以下载并查看它的代码。不过需要注意的是,由于这个应用是基于最新的 Java 8 版本开发的,所以比 Windows 98 更旧的系统,包括 Windows 95,目前都无法使用 JavaGPT。另外IT之家提醒,这毕竟是一个第三方应用,没有得到 OpenAI 或微软的认可或支持,所以使用者需要自行承担风险。
来看一下最终的效果吧
开始聊天,输入消息并点击发送消息就可以开始聊天了
点击 “获取后端数据”开启实时推送
Channels是一个采用Django并将其功能扩展到HTTP以外的项目,以处理WebSocket,聊天协议,IoT协议等。它基于称为ASGI的Python规范构建。
它以Django的核心为基础,并在其下面分层了一个完全异步的层,以同步模式运行Django本身,但异步处理了连接和套接字,并提供了以两种方式编写的选择,从而实现了这一点。
ASGI 由 Django 团队提出,为了解决在一个网络框架里(如 Django)同时处理 HTTP、HTTP2、WebSocket 协议。为此,Django 团队开发了 Django Channels 插件,为 Django 带来了 ASGI 能力。
在 ASGI 中,将一个网络请求划分成三个处理层面,最前面的一层,interface server(协议处理服务器),负责对请求协议进行解析,并将不同的协议分发到不同的 Channel(频道);频道属于第二层,通常可以是一个队列系统。频道绑定了第三层的 Consumer(消费者)。
下边来说一下具体的实现步骤
pip3 install channels
pip3 install channels_redis
django-admin startproject mysite
python3 manage.py startapp chat
#注册应用
INSTALLED_APPS = [
....
'chat.apps.ChatConfig',
"channels",
]
# 在文件尾部新增如下配置
#将ASGI_APPLICATION设置设置为指向该路由对象作为您的根应用程序:
ASGI_APPLICATION = 'mysite.routing.application'
#配置Redis
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [('10.0.6.29', 6379)],
},
},
}
在chat目录中创建一个templates目录。在您刚刚创建的templates 目录中,创建另一个名为的目录 chat,并在其中创建一个名为的文件index.html以保存索引视图的模板
将以下代码放入chat/templates/chat/index.html
<!-- chat/templates/chat/index.html -->
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"/>
<title>Chat Rooms</title>
</head>
<body>
What chat room would you like to enter?<br>
<input id="room-name-input" type="text" size="100"><br>
<input id="room-name-submit" type="button" value="Enter">
<script>
document.querySelector('#room-name-input').focus();
document.querySelector('#room-name-input').onkeyup = function(e) {
if (e.keyCode === 13) { // enter, return
document.querySelector('#room-name-submit').click();
}
};
document.querySelector('#room-name-submit').onclick = function(e) {
var roomName = document.querySelector('#room-name-input').value;
window.location.pathname = '/chat/' + roomName + '/';
};
</script>
</body>
</html>
chat/templates/chat/room.html
<!DOCTYPE html>
<html>
<head>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.5.0/jquery.min.js" type="text/javascript"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css">
<script src="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/js/bootstrap.min.js"></script>
<meta charset="utf-8"/>
<title>Chat Room</title>
</head>
<body>
<textarea id="chat-log" cols="150" rows="30" class="text"></textarea><br>
<input id="chat-message-input" type="text" size="150"><br>
<input id="chat-message-submit" type="button" value="发送消息" class="input-sm">
<button id="get_data" class="btn btn-success">获取后端数据</button>
{{ room_name|json_script:"room-name" }}
<script>
$("#get_data").click(function () {
$.ajax({
url: "{% url 'push' %}",
type: "GET",
data: {
"room": "{{ room_name }}",
"csrfmiddlewaretoken": "{{ csrf_token }}"
},
})
});
const roomName = JSON.parse(document.getElementById('room-name').textContent);
const chatSocket = new WebSocket(
'ws://' + window.location.host
+ '/ws/chat/'
+ roomName + '/'
);
let chatSocketa = new WebSocket(
"ws://" + window.location.host + "/ws/push/" + roomName
);
chatSocket.onmessage = function (e) {
const data = JSON.parse(e.data);
// data 为收到后端发来的数据
//console.log(data);
document.querySelector('#chat-log').value += (data.message + '\n');
};
chatSocketa.onmessage = function (e) {
let data = JSON.parse(e.data);
//let message = data["message"];
document.querySelector("#chat-log").value += (data.message + "\n");
};
chatSocket.onclose = function (e) {
console.error('Chat socket closed unexpectedly');
};
chatSocketa.onclose = function (e) {
console.error("Chat socket closed unexpectedly");
};
document.querySelector('#chat-message-input').focus();
document.querySelector('#chat-message-input').onkeyup = function (e) {
if (e.keyCode === 13) { // enter, return
document.querySelector('#chat-message-submit').click();
}
};
document.querySelector('#chat-message-submit').onclick = function (e) {
const messageInputDom = document.querySelector('#chat-message-input');
const message = messageInputDom.value;
chatSocket.send(JSON.stringify({
'message': message
}));
messageInputDom.value = '';
};
</script>
</body>
</html>
将以下代码放入chat/views.py
# chat/views.py
from django.shortcuts import render
from django.http import JsonResponse
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
def index(request):
return render(request, "chat/index.html")
def room(request, room_name):
return render(request, "chat/room.html", {"room_name": room_name})
def pushRedis(request):
room = request.GET.get("room")
print(room)
def push(msg):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
room,
{"type": "push.message", "message": msg, "room_name": room}
)
push("推送测试", )
return JsonResponse({"1": 1})
在chat目录下创建一个名为的文件urls.py
# mysite/chat/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('', views.index, name='index'), path('<str:room_name>/', views.room, name='room'),
]
# mysite/urls.py
from django.contrib import admin
from django.urls import path, include
from chat.views import pushRedis
urlpatterns = [
path('admin/', admin.site.urls),
path("chat/", include("chat.urls")),
path("push", pushRedis, name="push"),
]
文件chat/consumers.py
当Django接受HTTP请求时,它会查询根URLconf来查找视图函数,然后调用该视图函数来处理该请求。同样,当Channels接受WebSocket连接时,它会查询根路由配置以查找使用者,然后在使用者上调用各种功能来处理来自连接的事件。
import time
import json
from channels.generic.websocket import WebsocketConsumer, AsyncWebsocketConsumer
from asgiref.sync import async_to_sync
import redis
pool = redis.ConnectionPool(
host="10.0.6.29",
port=6379,
max_connections=10,
decode_response=True,
)
conn = redis.Redis(connection_pool=pool, decode_responses=True)
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self, ):
self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
self.room_group_name = "chat_%s" % self.room_name
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name,
)
await self.accept()
async def disconnect(self, close_code):
print("close_code: ", close_code)
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def receive(self, text_data=None, bytes_data=None):
text_data_json = json.loads(text_data)
message = text_data_json["message"]
print("receive_message:", message)
await self.channel_layer.group_send(
self.room_group_name,
{
"type": "chat_message",
"message": message
}
)
async def chat_message(self, event):
receive_message = event["message"]
response_message = "You message is :" + receive_message
await self.send(text_data=json.dumps({
"message": response_message
}))
class PushMessage(WebsocketConsumer):
def connect(self):
self.room_group_name = self.scope["url_route"]["kwargs"]["room_name"]
async_to_sync(self.channel_layer.group_add)(
self.room_group_name,
self.channel_name
)
self.accept()
def disconnect(self, code):
async_to_sync(self.channel_layer.group_discard)(
self.room_group_name,
self.channel_name
)
def push_message(self, event):
"""
主动推送
:param event:
:return:
"""
print(event, type(event))
while True:
time.sleep(2)
msg = time.strftime("%Y-%m-%d %H:%M:%S") + "--- room_name: %s" % event["room_name"]
self.send(text_data=json.dumps(
{"message": msg}
))
在chat目录下创建一个名为的文件routing.py
# mysite/chat/routing.py
from django.urls import re_path, path
from . import consumers
websocket_urlpatterns = [
re_path(r"ws/chat/(?P<room_name>\w+)/$", consumers.ChatConsumer),
path("ws/push/<room_name>", consumers.PushMessage),
]
与setting同级目录新建ws根路由文件 routing.py
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import chat.routing
application = ProtocolTypeRouter({
"websocket": AuthMiddlewareStack(
URLRouter(
chat.routing.websocket_urlpatterns
)
),
})
python3 manage.py runserver 10.0.6.2:80
注意看,这和django是不一样的
还有另一种更稳健的启动方式
和setting同级新增文件 asgi.py
import os
import django
from channels.routing import get_default_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
django.setup()
application = get_default_application()
启动方式为:
daphne -b 10.0.6.2 -p 80 mysite.asgi:application
daphne 在安装channel时已经自动安装好了
文章到此也就结束了,希望大家能够喜欢,喜欢的关注一下小编,之后将继续与大家分享知识!别忘了点赞收藏奥,最后感谢大家的阅读!
*请认真填写需求信息,我们会在24小时内与您取得联系。