use asyncio instead of tornado to handle websocket stuff
This commit is contained in:
parent
e93b9e8891
commit
dae2a0db02
2 changed files with 20 additions and 53 deletions
|
@ -210,7 +210,8 @@ def notify_feed_updated(session, feed, entries):
|
||||||
for e in entries
|
for e in entries
|
||||||
],
|
],
|
||||||
})
|
})
|
||||||
redis.publish('woodwind_notify', message)
|
for topic in 'user:{}'.format(user.id), 'feed:{}'.format(feed.id):
|
||||||
|
redis.publish('woodwind_notify:{}'.format(topic), message)
|
||||||
|
|
||||||
|
|
||||||
def is_content_equal(e1, e2):
|
def is_content_equal(e1, e2):
|
||||||
|
|
|
@ -1,56 +1,22 @@
|
||||||
import itertools
|
import websockets
|
||||||
import json
|
import asyncio
|
||||||
import redis
|
import asyncio_redis
|
||||||
import threading
|
|
||||||
import tornado.ioloop
|
|
||||||
import tornado.websocket
|
|
||||||
|
|
||||||
|
|
||||||
SUBSCRIBERS = {}
|
@asyncio.coroutine
|
||||||
|
def handle_subscription(websocket, path):
|
||||||
|
topic = yield from websocket.recv()
|
||||||
|
redis = yield from asyncio_redis.Connection.create()
|
||||||
|
ps = yield from redis.start_subscribe()
|
||||||
|
yield from ps.subscribe(['woodwind_notify:' + topic])
|
||||||
|
while True:
|
||||||
|
message = yield from ps.next_published()
|
||||||
|
if not websocket.open:
|
||||||
|
break
|
||||||
|
yield from websocket.send(message.value)
|
||||||
|
redis.close()
|
||||||
|
|
||||||
|
|
||||||
def redis_loop():
|
asyncio.get_event_loop().run_until_complete(
|
||||||
r = redis.StrictRedis()
|
websockets.serve(handle_subscription, 'localhost', 8077))
|
||||||
ps = r.pubsub()
|
asyncio.get_event_loop().run_forever()
|
||||||
ps.subscribe('woodwind_notify')
|
|
||||||
for message in ps.listen():
|
|
||||||
if message['type'] == 'message':
|
|
||||||
msg_data = str(message.get('data'), 'utf-8')
|
|
||||||
msg_blob = json.loads(msg_data)
|
|
||||||
user_topic = 'user:{}'.format(msg_blob.get('user'))
|
|
||||||
feed_topic = 'feed:{}'.format(msg_blob.get('feed'))
|
|
||||||
for subscriber in itertools.chain(
|
|
||||||
SUBSCRIBERS.get(user_topic, []),
|
|
||||||
SUBSCRIBERS.get(feed_topic, [])):
|
|
||||||
tornado.ioloop.IOLoop.instance().add_callback(
|
|
||||||
lambda: subscriber.forward_message(msg_data))
|
|
||||||
|
|
||||||
|
|
||||||
class WSHandler(tornado.websocket.WebSocketHandler):
|
|
||||||
def check_origin(self, origin):
|
|
||||||
return True
|
|
||||||
|
|
||||||
def forward_message(self, message):
|
|
||||||
try:
|
|
||||||
self.write_message(message)
|
|
||||||
except tornado.websocket.WebSocketClosedError:
|
|
||||||
self.on_close()
|
|
||||||
|
|
||||||
def on_message(self, topic):
|
|
||||||
self.topic = topic
|
|
||||||
SUBSCRIBERS.setdefault(topic, []).append(self)
|
|
||||||
|
|
||||||
def on_close(self):
|
|
||||||
if hasattr(self, 'topic'):
|
|
||||||
SUBSCRIBERS.get(self.topic, []).remove(self)
|
|
||||||
|
|
||||||
|
|
||||||
application = tornado.web.Application([
|
|
||||||
(r'/', WSHandler),
|
|
||||||
])
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
threading.Thread(target=redis_loop).start()
|
|
||||||
application.listen(8077)
|
|
||||||
tornado.ioloop.IOLoop.instance().start()
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue