使用 django 和 Vue 构建一个基于 Web 的聊天应用程序。但我们在扩展应用程序方面面临着一个重大问题。
WebSockets
从上一个教程开始,我们对 WebSockets 进行了一些初步了解。我们知道它们是双向连接,保持打开状态并允许服务器与客户端即时通信,反之亦然。
Django 本身是在互联网历史的不同时期开发的。那时的网站/Web 应用程序并不像今天那么复杂。基本上是“嘿,我想要 2005 年 1 月 1 日的文章”,然后服务器会说“别担心,兄弟,我找到你了”,并做了一些获取文章的工作,然后回复“这是您要求的文章”然后它返回睡眠状态或处理其他用户(关闭套接字)。如果不提出要求,您就无法收到信息。
客户请求,服务器回复,就这么简单。
但是聊天却不一样。
对于 Django 和许多 Python Web 框架来说,这确实是一个问题,因为控制 Python 应用程序如何运行的底层协议 ( ) 与这种通信模式WSGI
相关联。request-response
很多人从不同的角度来解决这个问题
- 通过新的框架/网络服务器(例如 Twisted、Tornado)。
- 带有 websocket 服务器的异步引擎(例如 gevent)
- 通过添加对现有 WSGI 服务器的支持 (
uWSGI
)
WSGI
直接修改 django 会很困难,因为由于 django 的同步特性和协议本身,它需要对 django 的核心进行一些巨大的改变。
https://googleads.g.doubleclick.net/pagead/ads?client=ca-pub-8618431079416074&output=html&h=200&slotname=8239403128&adk=2243974991&adf=557184702&pi=t.ma~as.8239403128&w=867&fwrn=4&lmt=1616559018&rafmt=11&format=867×200&url=https%3A%2F%2Fdanidee10.github.io%2F2018%2F01%2F13%2Frealtime-django-5.html&wgl=1&uach=WyJXaW5kb3dzIiwiMTUuMC4wIiwieDg2IiwiIiwiMTE4LjAuNTk5My4xMjAiLG51bGwsMCxudWxsLCI2NCIsW1siQ2hyb21pdW0iLCIxMTguMC41OTkzLjEyMCJdLFsiR29vZ2xlIENocm9tZSIsIjExOC4wLjU5OTMuMTIwIl0sWyJOb3Q9QT9CcmFuZCIsIjk5LjAuMC4wIl1dLDBd&dt=1699283052987&bpp=1&bdt=65&idt=68&shv=r20231101&mjsv=m202311010101&ptt=9&saldr=aa&abxe=1&prev_fmts=0x0%2C922x280&nras=1&correlator=7869224299760&frm=20&pv=1&ga_vid=1624410613.1699282327&ga_sid=1699283053&ga_hid=883785894&ga_fc=1&rplot=4&u_tz=480&u_his=6&u_h=720&u_w=1280&u_ah=672&u_aw=1280&u_cd=24&u_sd=1.5&dmc=8&adx=198&ady=1414&biw=1263&bih=595&scr_x=0&scr_y=0&eid=44759875%2C44759926%2C44759837%2C44795921%2C44807048%2C44807335%2C44807454%2C44807460%2C31078297%2C31079356%2C44807406%2C44807753%2C31078663%2C31078665%2C31078668%2C31078670&oid=2&pvsid=830248550329060&tmod=2054120635&uas=0&nvt=1&ref=https%3A%2F%2Fdanidee10.github.io%2F2018%2F01%2F10%2Frealtime-django-4.html&fc=1920&brdim=0%2C0%2C0%2C0%2C1280%2C0%2C1280%2C672%2C1280%2C595&vis=1&rsz=%7C%7CpEebr%7C&abl=CS&pfx=0&fu=128&bc=31&td=1&nt=1&ifi=3&uci=a!3&btvi=1&fsb=1&xpc=laXsvk1mJ5&p=https%3A//danidee10.github.io&dtd=71
Django 频道
Andrew Godwin 将 websockets 引入 django 原生django-channels
。在编写本教程时,它目前是 django 软件基金会的一个官方项目。这意味着它不会很快消失。
django-channels
引入了一个名为的新协议,ASGI
该协议与WSGI
. django-channels 带有它自己的 Web 服务器,称为Daphne
. Daphne
可以处理常规的http连接和WebSocket连接。
如果您决定使用,django-channels
您必须学习它的 API 和方法,您还需要更改您的部署过程。
要水平扩展到多台机器,您仍然需要使用所谓的django-channels
通道层。推荐层是Redis
层。还有一个RabbitMQ
通道层和一个IPC(进程间通信)层。这些 Channel 层是 django 和服务器之间的粘合剂Daphne
。和特别用于水平缩放通道Redis
。RabbitMQ
IPC通道层速度更快,但它只适合单个服务器,因为所有进程都使用共享内存进行通信。
使用 Redis 通道也有一些缺点。Redis本身不支持 TLS ,并且与 RabbitMQ 相比,它对持久队列的支持也不是那么好。
此外,由于 ASGI 规范,django-channels
模拟Pub/Sub
(它实际上并不使用 Redis 或 RabbitMQ 的 Pub/Sub 功能),如果您需要直接在通道上侦听,这并不是很好。
最终,我们将构建一个与django-channels
. 更像是“穷人”,django-channels
但水平较低。我们将直接从队列中读取RabbitMQ
。(django-channels
将其抽象为Groups
)。uWSGI
将扮演与服务器类似的角色Daphne
。
不同之处在于我们的方法并不将我们限制为单个 WebSocket 服务器,例如django-channels
. 您可以轻松地uWSGI
用另一个 WebSocket 服务器替换,只需最少的工作。
目前正在计划 django-channels 支持除 Daphne 之外的其他“接口”。
uWSGI WebSocket
unbit(uWSGI 的开发者)采取了不同的方法,他们决定将 WebSockets 集成到 uWSGI Core 本身中。uWSGI 是一个非常高性能的 Python WSGI Web 服务器。它可以说是最流行的 python WSGI 服务器。它还支持多种编程语言,例如Perl
,Ruby
甚至Go
.
如果您当前uWSGI
在堆栈中使用并且需要 WebSocket,则无需进行任何更改。即使您使用不同的WSGI
服务器,就像gunicorn
您只需要pip install uwsgi
它一样简单。
如果您还记得我们在第 3 部分前面关于 RabbitMQ 的讨论。uWSGI
记住我告诉过你 RabbitMQ 是和之间的粘合剂django
。
我们需要创建通知并将其放入 RabbitMQ 队列中,然后通过 websockets 可以将这些消息直接广播给多个用户。
为了简化创建通知并将其发送到 RabbitMQ 的过程,我创建了一个名为django-notifs的第三方 django 库。
它可以在 Pypi 上使用。
pip install django-notifs
确保添加到您的INSTALLED_APPS
:
INSTALLED_APPS = (
'django.contrib.auth',
...,
'rest_framework',
'rest_framework.authtoken',
'djoser',
# Our apps
'chat',
'notifications'
)
安装时django-notifs
还会安装pika
一个用于连接 RabbitMQ 的 python 库。
运行迁移python manage.py migrate
最后安装RabbitMQ
不同操作系统的说明有所不同,因此请参阅安装指南以获取适合您的操作系统的安装说明。
在继续之前,请确保RabbitMQ
服务器正在运行,如果没有运行,当您尝试从pika
.
打开views.py
并更新ChatSessionMessageView
视图:
from notifications.signals import notify
class ChatSessionMessageView(APIView):
...
def post(self, request, *args, **kwargs):
"""create a new message in a chat session."""
uri = kwargs['uri']
message = request.data['message']
user = request.user
chat_session = ChatSession.objects.get(uri=uri)
chat_session_message = ChatSessionMessage.objects.create(
user=user, chat_session=chat_session, message=message
)
notif_args = {
'source': user,
'source_display_name': user.get_full_name(),
'category': 'chat', 'action': 'Sent',
'obj': chat_session_message.id,
'short_description': 'You a new message', 'silent': True,
'extra_data': {'uri': chat_session.uri}
}
notify.send(
sender=self.__class__, **notif_args, channels=['websocket']
)
return Response ({
'status': 'SUCCESS', 'uri': chat_session.uri, 'message': message,
'user': deserialize_user(user)
})
在我们向用户返回响应之前,我们发送带有参数的通知信号,大多数参数都是不言自明的。
该silent
参数表示通知不会持久保存到数据库中。换句话说,我们使用django-notifs
类似的事件发射器。您还可以将任意通知数据extra_data
作为字典传递到参数中。
通知渠道
django-notifs 用于channels
传递消息。这意味着您可以编写自己的自定义渠道来通过电子邮件、短信、Slack 以及任何您能想到的方式传递消息。它甚至配备了一个内置的 websocket 通道,但这不足以满足我们的情况,因为它是一个用户到用户的通道。
我们希望同时向多个客户端广播消息。这种通信模式称为 Pub/Sub(发布订阅),RabbitMQ 对此提供支持exchanges
。
Anexchange
是一个从生产者(我们的应用程序)接收消息然后将其广播到多个队列的通道。有四种不同的类型,exchanges
即直接、主题、标题和扇出。我们将使用fanout
最容易理解并且完美适合我们用例的交换。
这是 RabbitMQ 文档中有关扇出交换如何工作的说明:
在队列可以接收消息之前,它必须绑定到交换器。
为了实现 Pub/Sub 模式,我们需要编写自己的交付通道。
这很简单。创建一个名为的新文件channels.py
"""Notification channels for django-notifs."""
from json import dumps
import pika
from notifications.channels import BaseNotificationChannel
class BroadCastWebSocketChannel(BaseNotificationChannel):
"""Fanout notification channel with RabbitMQ."""
def _connect(self):
"""Connect to the RabbitMQ server."""
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
return connection, channel
def construct_message(self):
"""Construct the message to be sent."""
extra_data = self.notification_kwargs['extra_data']
return dumps(extra_data['message'])
def notify(self, message):
"""put the message of the RabbitMQ queue."""
connection, channel = self._connect()
uri = self.notification_kwargs['extra_data']['uri']
channel.exchange_declare(exchange=uri, exchange_type='fanout')
channel.basic_publish(exchange=uri, routing_key='', body=message)
connection.close()
我们将交换名称设置为uri
聊天会话的名称。
我们还将聊天消息转储为字典。我们需要有关客户端消息的所有详细信息,而不仅仅是实际消息。
您需要介绍django-notifs
您刚刚创建的新频道。在您的应用程序设置中包括以下内容:
# Celery settings CELERY_TASK_ALWAYS_EAGER = True
# notifications settings NOTIFICATIONS_CHANNELS = {
'websocket': 'chat.channels.BroadCastWebSocketChannel'
}
这告诉它使用转发通知到我们的 websocket 通道,该通道处理将消息发送到的逻辑RabbitMQ
。
django-notifs 使用 Celery 异步处理通知,因此长时间运行的通知任务(如发送电子邮件)不会阻止用户的请求。
在该chatire
文件夹内,创建一个名为的新文件celery.py
并包含以下内容:
"""Celery init."""
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'chatire.settings')
app = Celery('chatire')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
线路app.autodiscover_tasks()
非常重要。它会自动查找并导入在没有它的情况下定义的 celery 任务django-notifs
,您必须手动导入该任务。
包括__init__.py
这个:
"""Initialize celery."""
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
这会在应用程序加载后导入app
我们创建的对象。celery.py
celery_app
因为我们设置CELERY_TASK_ALWAYS_EAGER
为True
in,settings.py
所以我们应该能够在没有 Celery 工作线程的情况下同步发送消息。如果您希望将异步行为(我强烈推荐)设置CELERY_TASK_ALWAYS_EAGER
为False
或完全忽略它并使用以下命令启动 celery 工作程序:
celery -A chatire worker -l info
django-notifs
确保您看到下面列出的任务[tasks]
尝试通过聊天 UI 发送消息。应创建一个基于聊天会话 uri 的新 RabbitMQ 交换。
要查看我们拥有的交换列表(对于 *nix 系统),请在终端中运行以下命令:
rabbitmqctl list_exchanges
Listing exchanges
amq.match headers
amq.direct direct
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
direct
amq.fanout fanout
amq.headers headers
fe662fd9de834fc fanout # our Exchange
您还可以看到一些内置的交换。
现在我们将动态创建队列并将它们绑定到我们之前创建的交换器,以便它们可以接收消息。
创建一个名为 的新文件websocket.py
。
"""Receive messages over from RabbitMQ and send them over the websocket."""
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
channel.exchange_declare(
exchange='fe662fd9de834fc', exchange_type='fanout'
)
# exclusive means the queue should be deleted once the connection is closed result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue # random queue name generated by RabbitMQ
channel.queue_bind(exchange='fe662fd9de834fc', queue=queue_name)
print('listening for messages...')
while True:
for method_frame, _, body in channel.consume(queue_name):
try:
print(body)
except OSError as error:
print(error)
else:
# acknowledge the message channel.basic_ack(method_frame.delivery_tag)
再次,在连接到RabbitMQ
使用后pika
,我们声明了交换。
多次声明交换器(或队列)不会产生不利影响,如果交换器事先不存在,则RabbitMQ
创建它,否则它不执行任何操作。
让我们仔细看看这一行:
channel.queue_bind(exchange='fe662fd9de834fc', queue=queue_name)
这将队列绑定到交换器。它更像是“嘿交换,我对您收到的消息感兴趣。请把它们寄给我。”
是queue_name
随机生成的,RabbitMQ
因为我们调用时queue_declare
没有传递名称。
从通道中消费消息的方式有多种。您可以使用回调或使用for loop
. 我们选择了第二个选项,这样我们就可以优雅地处理当我们尝试将消息发送到客户端时可能发生的异常。当我们最终实现 WebSocket 时,这将更有意义。
channel.basic_ack(method_frame.delivery_tag)
确认客户端已成功接收消息并且可以将其从队列中删除。如果消息未被确认,它将保留在队列中,直到队列本身被删除。
有关方法框架和不同类型的框架的更多信息,请查看 RabbitMQ 文档。
继续并开始运行该websocket
文件:
$ python websocket.py
listening for messages...
现在返回聊天 UI 并发送一些消息。我发送了"hello world"
和"how are you doing"
这就是输出。
listening for messages...
b'{"user": {"id": 1, "username": "danidee", "email": "", "first_name": "", "last_name": ""}, "message": "Hello world"}'
b'{"user": {"id": 12, "username": "daniel", "email": "", "first_name": "", "last_name": ""}, "message": "How are you doing"}'
打开一个新终端,运行 websocket 文件并从聊天 UI 发送更多消息。您应该仍然能够看到新消息。
干得好!现在只剩下一件事了,那就是将消息直接发送给用户。
网络套接字在哪里?
Websocket 包含在核心uwsgi
Python 对象中。首先,uWSGI
如果您还没有安装的话。
$ pip install uwsgi
我们将对文件进行一些websocket.py
修改
"""Receive messages over from RabbitMQ and send them over the websocket."""
import sys
import pika
import uwsgi
def application(env, start_response):
"""Setup the Websocket Server and read messages off the queue."""
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
exchange = env['PATH_INFO'].replace('/', '')
channel.exchange_declare(
exchange=exchange, exchange_type='fanout'
)
# exclusive means the queue should be deleted once the connection is closed result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue # random queue name generated by RabbitMQ
channel.queue_bind(exchange=exchange, queue=queue_name)
uwsgi.websocket_handshake(
env['HTTP_SEC_WEBSOCKET_KEY'],
env.get('HTTP_ORIGIN', '')
)
def keepalive():
"""Keep the websocket connection alive (called every 30 seconds)."""
print('PING/PONG...')
try:
uwsgi.websocket_recv_nb()
connection.add_timeout(30, keepalive)
except OSError as error:
connection.close()
print(error)
sys.exit(1) # Kill process and force uWSGI to Respawn
keepalive()
while True:
for method_frame, _, body in channel.consume(queue_name):
try:
uwsgi.websocket_send(body)
except OSError as error:
print(error)
sys.exit(1) # Force uWSGI to Respawn else:
# acknowledge the message channel.basic_ack(method_frame.delivery_tag)
websocket uwsgi
api 非常简单。我们只使用三种方法:
- uwsgi.websocket_handshake:握手是HTTP到WS协议的桥梁。此方法尝试将客户端和服务器连接在一起,如果由于任何原因失败,则会引发异常。
- uwsgi.websocket_recv_nb:这个方法实际上是具有欺骗性和误导性的(我真的很认真),因为尽管全名是
websocket receive non blocking
它不仅以非阻塞方式接收消息,它还通过发送来帮助维持与客户端的连接到pong
浏览器。(心跳机制检查客户端是否还活着) 保持活动功能每 30 秒调用此方法,否则如果客户端没有收到服务器的消息,客户端可能会断开连接(通常在一分钟不活动后)。 - uwsgi.websocket_send:你不需要占卜者来告诉你这个:-)虽然我们需要错误处理程序的原因是万一连接关闭并且我们尝试发送消息会
uwsgi.websocket_send
引发OSError
. 我们将关闭与 RabbitMQ 的连接并终止该进程。uWSGI
将为我们重新启动它。此外,该else
块永远不会运行,这意味着我们不会确认该消息并且它将保留在队列中。 下次我们进入 for 循环并调用时channel.consume
,将发送未确认的消息以及队列中的任何新消息。这意味着我们永远不会因为网络连接而错过任何消息。
您是否注意到交换uri
不再是硬编码的?相反,我们从连接 URL 中获取交换名称,这需要我们的客户端连接到如下 URL:
如果这对您没有任何意义,请不要担心,当我们最终将 Vue 前端连接到 WebSocket 服务器时,很多事情都会弄清楚。
使用 JavaScript 连接到 WebSocket
如果没有 JavaScript,就不可能在 Web 应用程序的上下文中谈论 WebSocket。大多数现代浏览器已经支持 WebSocket,因此我们不需要安装任何 polyfill。
让我们更新一下Chat
组件
<script>
const $ = window.jQuery
export default {
...
created () {
this.username = sessionStorage.getItem('username')
// Setup headers for all requests
$.ajaxSetup({
beforeSend: function(xhr) {
xhr.setRequestHeader('Authorization', `JWT ${sessionStorage.getItem('authToken')}`)
}
})
if (this.$route.params.uri) {
this.joinChatSession()
}
this.connectToWebSocket()
},
methods: {
...
postMessage (event) {
const data = {message: this.message}
$.post(`http://localhost:8000/api/chats/${this.$route.params.uri}/messages/`, data, (data) => {
this.message = '' // clear the message after sending
})
.fail((response) => {
alert(response.responseText)
})
},
joinChatSession () {
...
},
fetchChatSessionHistory () {
...
},
connectToWebSocket () {
const websocket = new WebSocket(`ws://localhost:8081/${this.$route.params.uri}`)
websocket.onopen = this.onOpen
websocket.onclose = this.onClose
websocket.onmessage = this.onMessage
websocket.onerror = this.onError
},
onOpen (event) {
console.log('Connection opened.', event.data)
},
onClose (event) {
console.log('Connection closed.', event.data)
// Try and Reconnect after five seconds
setTimeout(this.connectToWebSocket, 5000)
},
onMessage (event) {
const message = JSON.parse(event.data)
this.messages.push(message)
},
onError (event) {
alert('An error occured:', event.data)
}
}
}
</script>
现在在端口 8081 上启动uWSGI
WebSocket 服务器并重新加载浏览器。
$ uwsgi --http :8081 --module websocket --master --processes 4
万岁!
您应该能够发送消息并看到它们实时显示。
万岁!
还有一个问题。再打开 3 个选项卡(5 个活跃客户端)。最后一个客户端将无法连接,因为我们指定的 4 个进程被其他客户端占用。
注意:这就是为什么我们需要立即终止卡住的进程的原因,sys.exit(1)
因为该进程可能并没有真正卡住,用户可能故意离开聊天室,并且uWSGI需要一段时间才能在关闭连接之前找出客户端已断开连接在服务器上。
–master 选项调用主进程,该进程监视死亡进程并重新启动它们,否则该进程将死亡并且永远不会重新启动,并且它将继续直到最后一个进程死亡并且 uWSGI 退出。
那么为什么会发生这种情况呢?
我以为你说的是 WebSockets Scale
是的,我说过。我们当前的设置无法扩展的原因仅仅是因为我们的运行方式uWSGI
;像普通的 python 服务器一样WSGI
运行进程和线程。
一个简单的解决方案是增加进程,但这只能使我们的用户数量可能达到几百或更少,具体取决于服务器的资源。
异步IO和并发
异步 IO 值得单独写一整篇文章。如果你对它是什么一无所知,我建议你疯狂地搜索谷歌并阅读你能找到的每一篇文章。
基本上,背后的想法AsyncIO
或简单地说async
是当我们需要运行多个 IO 绑定任务时。在上述 IO 操作期间(在我们的例子中,发送和接收消息),进程不会闲置等待新消息,而是可以快速切换到另一个 IO 绑定任务并运行它。
这个简单的概念(高级解释)使得NodeJS
excel 对于 IO 密集型应用程序如此重要。
对于 python 来说,uWSGI
事情有点不同,因为它们在设计上不是异步的。python 有几个异步库。官方 asyncio、gevent、curio 等uWSGI
本身支持其中一些库,asyncio
但我们将使用gevent
.
根据我的经验,我发现与和gevent
相比效果更好。还有很多有用的方法。例如,它用 gevent 的库替换了大部分标准库,并允许您编写异步执行的同步代码。asyncio
uGreen
Gevent
monkey.patch_all
使用 pip 安装 gevent。
$ pip install gevent
现在你只需要uWSGI
像这样启动 WebSocket 服务器:
$ uwsgi --http :8081 --gevent 2 --module websocket --gevent-monkey-patch --master
首先,我们从 2 个 gevent 线程和一个进程开始,这意味着当更多客户端尝试加入时,我们只能合理地处理两个客户端(它在 3 到 4 个客户端之间随机波动,但大多数时候只有 2 个客户端可靠地接收消息)你会收到这样一句话的警告uWSGI
:
async queue is full !!!
有两种方法可以扩展,最简单的方法是增加运行的 gevent 线程(实际上是 greenlet)的数量。如果我们将uWSGI
启动代码更改为这样
$ uwsgi --http :8081 --gevent 100 --module websocket --gevent-monkey-patch --master
繁荣!就像这样我们可以处理 100 个并发用户。
如果有多个进程呢?
这是第二种扩展方式,你可以有多个进程让我们uWSGI
用 4 个进程启动服务器
$ uwsgi --http :8081 --gevent 100 --module websocket --gevent-monkey-patch --master --processes 4
4 个进程 * 100 –gevent 线程,即 400 个并发用户!
根据服务器的规格和配置,您可以增加进程和 gevent 线程的数量,但在执行此操作之前,请确保分析和监视应用程序的性能,因为在某个阶段增加数量会导致性能下降。
uWSGI
具有一个名为的 python 包(可从 pip 安装),uwsgitop
可用于监视它。但这超出了本教程的范围。也许我将来会写它。
扩展到多个服务器
在某个时刻,我们将耗尽服务器的资源,并且需要扩展到多个服务器。因为我们的 websocket 服务器没有耦合到我们的主 django 应用程序。这相当容易,因为我们可以uWSGI
在 Nginx 后面对多个服务器(每个服务器运行多个进程和 gevent 线程)进行负载平衡。
在您不知不觉中,您将轻松处理数千个连接。
当您需要横向扩展时,您还可以将相同的集群和负载平衡技术应用于 RabbitMQ(尽管不能使用 Nginx,因为 RabbitMQ 不使用 HTTP)。查看文档 https://www.rabbitmq.com/ha.html
好吧好吧……这就是本教程的全部内容,我希望您在跟随的过程中获得了一些新技能。
源代码地址: