消息推送系统——(三)Python服务器端的实现
这是消息推送系统系列文章中的4/4篇在上一篇《消息推送系统——(二)Javascript客户端的构建 》中我们分析了消息推送系统中客户端的构建思路,这里会继续来梳理消息推送系统中服务器端的实现方法。
服务器端是用Python语言编写的,基于框架Tornado之上。Tornado是一个强大的Python Web框架,对异步处理支持得很好。而异步处理就是消息推送服务器的关键所在。
消息推送服务器有一个比较关键的部分,即需要接收消息的客户端(在线的客户端),我们叫listeners。所以服务器需要一个端口来接收listeners发送的“上线”请求。服务器的内部就需要把在线的listeners保存下来。同时我们要求listeners在“下线”时也要发送“下线”的请求,方便服务器将其从在线listeners中移除。但是考虑网络的不稳定因素,也需要有一个机制来及时清除“断线”的客户端。
首先来说一下将要用到的数据结构:
{
'listeners' :
{
#user_id : 回调函数列表
'1001' : {'callback' : []}, #回调函数用于推送消息
'1002' : {'callback' : []},
...
}
}
接着引人需要的python类、包以及tornado框架:
import sys
import json
import csv
import string
import urllib
import urllib2
import time
import hashlib
import re
import logging
import uuid
from urlparse import urlparse
import tornado.web
import tornado.ioloop
import tornado.database
然后来写一个基本的类来处理在线用户列表:
class HandlerMixin(object):
listeners = {}
#将user_id和callback存入listensers
def add_listener(self, user_id, callback):
handler = HandlerMixin
if user_id not in handler.listeners:
handler.listeners[user_id] = {'callback':set()}
user_callback = handler.listeners[user_id]['callback']
user_callback.add(callback)
#将user_id从listeners中移除,并删除callback
def del_listener(self, user_id, callback):
handler = HandlerMixin
if user_id in handler.listeners:
user_callback = handler.listeners[user_id]['callback']
if callback in user_callback:
user_callback.remove(callback)
del callback
if len(handler.listeners[user_id]['callback']) == 0:
del handler.listeners[user_id]
#调用某个listenser的callback, 即推送消息
def send_message(self, user_id, message):
handler = HandlerMixin
if user_id in handler.listeners:
user_callback = handler.listeners[user_id]['callback']
for one_callback in user_callback:
one_callback(message)
del handler.listeners[user_id]
然后就是服务器的关键部分,利用tornado的异步机制来使客户端保持在线。如《消息推送系统——(二)Javascript客户端的构建 》中就是利用了HTTP的长连接来一直等待服务器的响应。
class StartListeningHandler(tornado.web.RequestHandler, HandlerMixin):
#定义可发起请求的客户端domain
available_domains = ['http://www.fancycedar.info', 'http://www.icarsclub.com']
#这一句是申明该函数要使用异步机制
@tornado.web.asynchronous
def get(self):
user = {
"id" : self.get_argument("id", '').replace('"', ''),
"name" : self.get_argument("name", '').replace('"', ''),
}
self.add_listener(self.user_id, self._callback)
#处理用户断线、超时、关闭浏览器等情况
def on_connection_close(self):
self.del_listener(self.user_id, self._callback)
def on_finish(self):
pass
#监测客户端请求HTTP头中的origin信息,确保是自己的domain
def _check_origin(self, origin):
domain = string.replace(origin, 'https://', '')
domain = string.replace(origin, 'http://', '')
return domain in self.available_domains
#这就是回调函数的定义
def _callback(self, message):
#确保在客户端保持连接时才继续执行
if self.request.connection.stream.closed():
return
#检查origin来确保通信安全
headers = dict(self.request.headers)
origin = headers.get("Origin", None)
if origin is None or self._check_origin(origin) is False:
if origin is None:
origin = '-'
self.finish()
return
#准备推送消息的http头
self.set_header("Content-Type", "text/plain")
self.set_header("Access-Control-Allow-Origin", origin)
self.set_header("Access-Control-Allow-Methods", "GET,POST")
self.set_header("Access-Control-Allow-Credentials", "true")
data = {"msg":message, "sys_time":time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}
self.finish(data)
这些代码都部署在Pushserver.py里面。然后用一个main.py来调用tornado:
#!/usr/bin/python
import os
import sys
import tornado.options
import tornado.ioloop
import tornado.web
import Pushserver
settings = {
"debug": False
}
application = tornado.web.Application([
(r"/start_listening", Pushserver.StartListeningHandler),
], **settings)
if __name__ == "__main__":
tornado.options.parse_command_line()
application.listen(int(sys.argv[1]))
tornado.ioloop.IOLoop.instance().start()
最后将main.py一直运行起来,监听端口提供服务:
$ ./main.py 11200
然后JS客户端通过请求http://xxx.com/start_listening:11200?uid=xxxx&name=xxx来连接服务器了。
一个消息推送服务器就这样实现了。原理其实很简单对不对?
下面还是用一张图来说明一下服务器的结构和流程:
消息推送服务器端结构和流程
这里的代码是精简之后的,重点在于说明原理。作为一个服务器,对错误的处理、日志等都是需要做的很严格而周全的,所以在实际的项目中一定要注意这些方面的考虑。
原文链接:http://www.fancycedar.info/2013/05/push-server-python-server/
服务器端是用Python语言编写的,基于框架Tornado之上。Tornado是一个强大的Python Web框架,对异步处理支持得很好。而异步处理就是消息推送服务器的关键所在。
消息推送服务器有一个比较关键的部分,即需要接收消息的客户端(在线的客户端),我们叫listeners。所以服务器需要一个端口来接收listeners发送的“上线”请求。服务器的内部就需要把在线的listeners保存下来。同时我们要求listeners在“下线”时也要发送“下线”的请求,方便服务器将其从在线listeners中移除。但是考虑网络的不稳定因素,也需要有一个机制来及时清除“断线”的客户端。
首先来说一下将要用到的数据结构:
{
'listeners' :
{
#user_id : 回调函数列表
'1001' : {'callback' : []}, #回调函数用于推送消息
'1002' : {'callback' : []},
...
}
}
接着引人需要的python类、包以及tornado框架:
import sys
import json
import csv
import string
import urllib
import urllib2
import time
import hashlib
import re
import logging
import uuid
from urlparse import urlparse
import tornado.web
import tornado.ioloop
import tornado.database
然后来写一个基本的类来处理在线用户列表:
class HandlerMixin(object):
listeners = {}
#将user_id和callback存入listensers
def add_listener(self, user_id, callback):
handler = HandlerMixin
if user_id not in handler.listeners:
handler.listeners[user_id] = {'callback':set()}
user_callback = handler.listeners[user_id]['callback']
user_callback.add(callback)
#将user_id从listeners中移除,并删除callback
def del_listener(self, user_id, callback):
handler = HandlerMixin
if user_id in handler.listeners:
user_callback = handler.listeners[user_id]['callback']
if callback in user_callback:
user_callback.remove(callback)
del callback
if len(handler.listeners[user_id]['callback']) == 0:
del handler.listeners[user_id]
#调用某个listenser的callback, 即推送消息
def send_message(self, user_id, message):
handler = HandlerMixin
if user_id in handler.listeners:
user_callback = handler.listeners[user_id]['callback']
for one_callback in user_callback:
one_callback(message)
del handler.listeners[user_id]
然后就是服务器的关键部分,利用tornado的异步机制来使客户端保持在线。如《消息推送系统——(二)Javascript客户端的构建 》中就是利用了HTTP的长连接来一直等待服务器的响应。
class StartListeningHandler(tornado.web.RequestHandler, HandlerMixin):
#定义可发起请求的客户端domain
available_domains = ['http://www.fancycedar.info', 'http://www.icarsclub.com']
#这一句是申明该函数要使用异步机制
@tornado.web.asynchronous
def get(self):
user = {
"id" : self.get_argument("id", '').replace('"', ''),
"name" : self.get_argument("name", '').replace('"', ''),
}
self.add_listener(self.user_id, self._callback)
#处理用户断线、超时、关闭浏览器等情况
def on_connection_close(self):
self.del_listener(self.user_id, self._callback)
def on_finish(self):
pass
#监测客户端请求HTTP头中的origin信息,确保是自己的domain
def _check_origin(self, origin):
domain = string.replace(origin, 'https://', '')
domain = string.replace(origin, 'http://', '')
return domain in self.available_domains
#这就是回调函数的定义
def _callback(self, message):
#确保在客户端保持连接时才继续执行
if self.request.connection.stream.closed():
return
#检查origin来确保通信安全
headers = dict(self.request.headers)
origin = headers.get("Origin", None)
if origin is None or self._check_origin(origin) is False:
if origin is None:
origin = '-'
self.finish()
return
#准备推送消息的http头
self.set_header("Content-Type", "text/plain")
self.set_header("Access-Control-Allow-Origin", origin)
self.set_header("Access-Control-Allow-Methods", "GET,POST")
self.set_header("Access-Control-Allow-Credentials", "true")
data = {"msg":message, "sys_time":time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}
self.finish(data)
这些代码都部署在Pushserver.py里面。然后用一个main.py来调用tornado:
#!/usr/bin/python
import os
import sys
import tornado.options
import tornado.ioloop
import tornado.web
import Pushserver
settings = {
"debug": False
}
application = tornado.web.Application([
(r"/start_listening", Pushserver.StartListeningHandler),
], **settings)
if __name__ == "__main__":
tornado.options.parse_command_line()
application.listen(int(sys.argv[1]))
tornado.ioloop.IOLoop.instance().start()
最后将main.py一直运行起来,监听端口提供服务:
$ ./main.py 11200
然后JS客户端通过请求http://xxx.com/start_listening:11200?uid=xxxx&name=xxx来连接服务器了。
一个消息推送服务器就这样实现了。原理其实很简单对不对?
下面还是用一张图来说明一下服务器的结构和流程:
消息推送服务器端结构和流程
这里的代码是精简之后的,重点在于说明原理。作为一个服务器,对错误的处理、日志等都是需要做的很严格而周全的,所以在实际的项目中一定要注意这些方面的考虑。
原文链接:http://www.fancycedar.info/2013/05/push-server-python-server/
热门话题 · · · · · · ( 去话题广场 )
- 我们为什么需要书店? 3.0万次浏览
- 解锁我的夏日旅行足迹地图 活动 71.3万次浏览
- 你有哪些保持精力充沛的方法? 2.4万次浏览
- 如何阅读一片叶子 2024次浏览
- 当代打工人精神状态be like 新话题
- 我这偷感十足的一生 新话题 · 7148次浏览