port websocket_server from Node to Tornado

This commit is contained in:
Kyle Mahan 2015-02-24 21:53:35 -08:00
parent 5ae94c2273
commit 6efa7f8d1d
7 changed files with 96 additions and 34 deletions

View file

@ -12,4 +12,4 @@ setup(name='Woodwind',
install_requires=[
'Flask', 'Flask-Login', 'Flask-Micropub', 'Flask-SQLAlchemy',
'beautifulsoup4', 'bleach', 'celery', 'feedparser', 'html5lib',
'mf2py', 'mf2util', 'redis', 'requests'])
'mf2py', 'mf2util', 'redis', 'requests', 'tornado'])

View file

@ -1,4 +1,5 @@
$(function(){
function clickOlderLink(evt) {
evt.preventDefault();
$.get(this.href, function(result) {
@ -66,5 +67,30 @@ $(function(){
});
}
// topic will be user:id or feed:id
function webSocketSubscribe(topic) {
if ('WebSocket' in window) {
var ws = new WebSocket(window.location.origin
.replace(/https?:\/\//, 'ws://')
.replace(/(:\d+)?$/, ':8077'));
ws.onopen = function(event) {
// send the topic
console.log('subscribing to topic: ' + topic);
ws.send(topic);
};
ws.onmessage = function(event) {
var data = JSON.parse(event.data);
data.entries.forEach(function(entryHtml) {
$('body main').prepend(entryHtml);
});
attachListeners();
};
}
}
attachListeners();
if (WS_TOPIC) {
webSocketSubscribe(WS_TOPIC);
}
});

View file

@ -1,23 +0,0 @@
// topic will be woodwind::user:id or woodwind::feed:id
function webSocketSubscribe(topic) {
if ('WebSocket' in window) {
var ws = new WebSocket(window.location.origin
.replace(/https?:\/\//, 'ws://')
.replace(/(:\d+)?$/, ':8077'));
ws.onopen = function(event) {
// send the topic
console.log('subscribing to topic: ' + topic);
ws.send(topic);
};
ws.onmessage = function(event) {
var data = JSON.parse(event.data);
data.entries.forEach(function(entryHtml) {
$('body main').prepend(entryHtml);
});
};
}
}

View file

@ -194,7 +194,7 @@ def notify_feed_updated(session, feed, entries):
for e in entries
],
})
redis.publish('woodwind:user:{}'.format(user.id), message)
redis.publish('woodwind_notify', message)
def is_content_equal(e1, e2):

View file

@ -1,7 +1,10 @@
{% extends "base.jinja2" %}
{% block head %}
<script src="{{url_for('static', filename='feed.js', version='2015-02-19')}}"></script>
<script src="{{ url_for('static', filename='websocket_client.js', version='2015-02-22') }}"></script>
{% if ws_topic %}
<script>var WS_TOPIC = "{{ ws_topic }}";</script>
{% endif %}
<script src="{{url_for('static', filename='feed.js', version='2015-02-24')}}"></script>
{% if current_user and current_user.settings
and current_user.settings.get('reply-method') == 'indie-config' %}
@ -31,10 +34,4 @@
</div>
{% endif %}
{% if current_user.is_authenticated() %}
<script>
webSocketSubscribe('woodwind:user:' + {{ current_user.id }});
</script>
{% endif %}
{% endblock body %}

View file

@ -21,6 +21,8 @@ views = flask.Blueprint('views', __name__)
def index():
page = int(flask.request.args.get('page', 1))
entries = []
ws_topic = None
if flask_login.current_user.is_authenticated():
per_page = flask.current_app.config.get('PER_PAGE', 30)
offset = (page - 1) * per_page
@ -32,12 +34,19 @@ def index():
if 'feed' in flask.request.args:
feed_hex = flask.request.args.get('feed').encode()
feed_url = binascii.unhexlify(feed_hex).decode('utf-8')
feed = Feed.query.filter_by(feed=feed_url).first()
if not feed:
flask.abort(404)
entry_query = entry_query.filter(Feed.feed == feed_url)
ws_topic = 'feed:{}'.format(feed.id)
else:
ws_topic = 'user:{}'.format(flask_login.current_user.id)
entries = entry_query.order_by(Entry.published.desc())\
.offset(offset).limit(per_page).all()
return flask.render_template('feed.jinja2', entries=entries, page=page)
return flask.render_template('feed.jinja2', entries=entries, page=page,
ws_topic=ws_topic)
@views.route('/install')

View file

@ -0,0 +1,53 @@
import itertools
import json
import redis
import threading
import tornado.ioloop
import tornado.websocket
SUBSCRIBERS = {}
def redis_loop():
r = redis.StrictRedis()
ps = r.pubsub()
ps.subscribe('woodwind_notify')
for message in ps.listen():
if message['type'] == 'message':
msg_data = str(message.get('data'), 'utf-8')
msg_blob = json.loads(msg_data)
user_topic = 'user:{}'.format(msg_blob.get('user'))
feed_topic = 'feed:{}'.format(msg_blob.get('feed'))
for subscriber in itertools.chain(
SUBSCRIBERS.get(user_topic, []),
SUBSCRIBERS.get(feed_topic, [])):
tornado.ioloop.IOLoop.instance().add_callback(
lambda: subscriber.forward_message(msg_data))
class WSHandler(tornado.websocket.WebSocketHandler):
def check_origin(self, origin):
return True
def forward_message(self, message):
self.write_message(message)
def on_message(self, topic):
self.topic = topic
SUBSCRIBERS.setdefault(topic, []).append(self)
def on_close(self):
if hasattr(self, 'topic'):
SUBSCRIBERS.setdefault(self.topic, []).append(self)
application = tornado.web.Application([
(r'/', WSHandler),
])
if __name__ == '__main__':
threading.Thread(target=redis_loop).start()
application.listen(8077)
tornado.ioloop.IOLoop.instance().start()