diff --git a/woodwind/tasks.py b/woodwind/tasks.py index 9d818fc..e5e3c79 100644 --- a/woodwind/tasks.py +++ b/woodwind/tasks.py @@ -210,7 +210,8 @@ def notify_feed_updated(session, feed, entries): for e in entries ], }) - redis.publish('woodwind_notify', message) + for topic in 'user:{}'.format(user.id), 'feed:{}'.format(feed.id): + redis.publish('woodwind_notify:{}'.format(topic), message) def is_content_equal(e1, e2): diff --git a/woodwind/websocket_server.py b/woodwind/websocket_server.py index f519334..e3ff603 100644 --- a/woodwind/websocket_server.py +++ b/woodwind/websocket_server.py @@ -1,56 +1,22 @@ -import itertools -import json -import redis -import threading -import tornado.ioloop -import tornado.websocket +import websockets +import asyncio +import asyncio_redis -SUBSCRIBERS = {} +@asyncio.coroutine +def handle_subscription(websocket, path): + topic = yield from websocket.recv() + redis = yield from asyncio_redis.Connection.create() + ps = yield from redis.start_subscribe() + yield from ps.subscribe(['woodwind_notify:' + topic]) + while True: + message = yield from ps.next_published() + if not websocket.open: + break + yield from websocket.send(message.value) + redis.close() -def redis_loop(): - r = redis.StrictRedis() - ps = r.pubsub() - ps.subscribe('woodwind_notify') - for message in ps.listen(): - if message['type'] == 'message': - msg_data = str(message.get('data'), 'utf-8') - msg_blob = json.loads(msg_data) - user_topic = 'user:{}'.format(msg_blob.get('user')) - feed_topic = 'feed:{}'.format(msg_blob.get('feed')) - for subscriber in itertools.chain( - SUBSCRIBERS.get(user_topic, []), - SUBSCRIBERS.get(feed_topic, [])): - tornado.ioloop.IOLoop.instance().add_callback( - lambda: subscriber.forward_message(msg_data)) - - -class WSHandler(tornado.websocket.WebSocketHandler): - def check_origin(self, origin): - return True - - def forward_message(self, message): - try: - self.write_message(message) - except tornado.websocket.WebSocketClosedError: - self.on_close() - - def on_message(self, topic): - self.topic = topic - SUBSCRIBERS.setdefault(topic, []).append(self) - - def on_close(self): - if hasattr(self, 'topic'): - SUBSCRIBERS.get(self.topic, []).remove(self) - - -application = tornado.web.Application([ - (r'/', WSHandler), -]) - - -if __name__ == '__main__': - threading.Thread(target=redis_loop).start() - application.listen(8077) - tornado.ioloop.IOLoop.instance().start() +asyncio.get_event_loop().run_until_complete( + websockets.serve(handle_subscription, 'localhost', 8077)) +asyncio.get_event_loop().run_forever()