fat pings for PuSH

Kyle Mahan 2015-04-01 23:17:48 -07:00
parent 90f5a273a3
commit 85e53fa4f6
3 changed files with 99 additions and 68 deletions
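A "fat ping" is a PubSubHubbub (PuSH) notification that carries the updated feed content in the POST body itself, so the subscriber can process new entries without re-fetching the feed. To support it, this commit gives each feed a shared push_secret (sent to the hub as hub.secret at subscribe time), verifies the hub's X-Hub-Signature HMAC on incoming pings, and teaches update_feed to accept pushed content instead of polling.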

View file

@@ -3,6 +3,7 @@ import json
 import binascii
 from .extensions import db
 import re
+import uuid
 
 from sqlalchemy.ext.orderinglist import ordering_list
 from sqlalchemy.ext.associationproxy import association_proxy
@@ -104,11 +105,18 @@ class Feed(db.Model):
     push_topic = db.Column(db.String(512))
     push_verified = db.Column(db.Boolean)
     push_expiry = db.Column(db.DateTime)
+    push_secret = db.Column(db.String(200))
     last_pinged = db.Column(db.DateTime)
 
     def get_feed_code(self):
         return binascii.hexlify(self.feed.encode())
 
+    def get_or_create_push_secret(self):
+        if not self.push_secret:
+            self.push_secret = uuid.uuid4().hex
+            db.session.commit()
+        return self.push_secret
+
     def __repr__(self):
         return '<Feed:{},{}>'.format(self.name, self.feed)
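The new push_secret column gives each feed a shared secret to pass to the hub as hub.secret when subscribing. Under PubSubHubbub 0.4 the hub then signs every content delivery with HMAC-SHA1 over the raw request body. A minimal sketch of the hub-side computation (illustrative only; sign_payload is not part of this commit):

import hashlib
import hmac


def sign_payload(secret, body):
    # HMAC-SHA1 over the raw POST body, keyed with the shared secret.
    # PubSubHubbub 0.4 sends this as 'X-Hub-Signature: sha1=<hexdigest>'.
    digest = hmac.new(secret.encode('utf-8'), body, hashlib.sha1).hexdigest()
    return 'sha1=' + digest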

View file

@@ -3,6 +3,7 @@ from .extensions import db
 from .models import Feed
 from flask import Blueprint, request, abort, current_app, make_response
 import datetime
+import hmac
 
 push = Blueprint('push', __name__)
@@ -34,10 +35,10 @@ def notify(feed_id):
         if topic != feed.push_topic:
             current_app.logger.warn(
-                'feed topic (%s) does not match subscription request (%s)',
+                'feed topic (%s) does not match subscription request (%s)',
                 feed.push_topic, topic)
             abort(404)
         current_app.logger.debug(
             'PuSH verify subscribe for feed=%r, topic=%s', feed, topic)
         feed.push_verified = True
@@ -63,7 +64,21 @@ def notify(feed_id):
     # could it be? an actual push notification!?
     current_app.logger.debug(
         'received PuSH ping for %r; content size: %d', feed, len(request.data))
+    # try to process fat pings
+    content = None
+    signature = request.headers.get('X-Hub-Signature')
+    if signature and feed.push_secret and request.data:
+        h = hmac.new(feed.push_secret.encode('utf-8'),
+                     msg=request.data, digestmod='sha1').hexdigest()
+        if h != signature:
+            current_app.logger.warn(
+                'X-Hub-Signature did not match expected key')
+            return make_response('', 204)
+        content = request.data.decode('utf-8')
+
+    tasks.q_high.enqueue(tasks.update_feed, feed.id,
+                         content=content, is_polling=False)
     feed.last_pinged = datetime.datetime.utcnow()
     db.session.commit()
-    tasks.q_high.enqueue(tasks.update_feed, feed.id)
     return make_response('', 204)
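One caveat worth noting: notify() compares the header against a bare hex digest, while the PubSubHubbub 0.4 spec formats the header as 'sha1=' followed by the hex digest, and a constant-time comparison is preferable for signatures. A spec-shaped check might look like this sketch (verify_signature is illustrative, not part of the commit):

import hashlib
import hmac


def verify_signature(secret, body, header):
    # Expects a header of the form 'sha1=<hexdigest>' (PubSubHubbub 0.4).
    if not header or not header.startswith('sha1='):
        return False
    expected = hmac.new(secret.encode('utf-8'), body,
                        hashlib.sha1).hexdigest()
    # compare_digest avoids leaking timing information about the match.
    return hmac.compare_digest(expected, header[len('sha1='):])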

View file

@@ -67,75 +67,82 @@ def tick():
         logger.debug('Feed {} last checked {}'.format(
             feed, feed.last_checked))
         if (not feed.last_checked
-                or (not feed.push_verified and now - feed.last_checked > UPDATE_INTERVAL)
-                or (feed.push_verified and now - feed.last_checked > UPDATE_INTERVAL_PUSH)):
+                or (not feed.push_verified
+                    and now - feed.last_checked > UPDATE_INTERVAL)
+                or (feed.push_verified
+                    and now - feed.last_checked > UPDATE_INTERVAL_PUSH)):
             q.enqueue(update_feed, feed.id)
 
 
-def update_feed(feed_id):
+def update_feed(feed_id, content=None, is_polling=True):
     with session_scope() as session:
         feed = session.query(Feed).get(feed_id)
         logger.info('Updating {}'.format(feed))
-        process_feed(session, feed)
-
-
-def process_feed(session, feed):
-    now = datetime.datetime.utcnow()
-    new_entries = []
-    try:
-        logger.info('fetching feed: %s', feed)
-        response = requests.get(feed.feed)
-        if response.status_code // 100 != 2:
-            logger.warn('bad response from %s. %r: %r', feed.feed,
-                        response, response.text)
-            return
-        check_push_subscription(session, feed, response)
-        backfill = len(feed.entries) == 0  # backfill if this is the first pull
-        if feed.type == 'xml':
-            result = process_xml_feed_for_new_entries(
-                session, feed, response, backfill, now)
-        elif feed.type == 'html':
-            result = process_html_feed_for_new_entries(
-                session, feed, response, backfill, now)
-        else:
-            result = []
-
-        for entry in result:
-            old = session.query(Entry)\
-                .filter(Entry.feed == feed)\
-                .filter(Entry.uid == entry.uid).first()
-
-            # have we seen this post before
-            if not old or not is_content_equal(old, entry):
-                # set a default value for published if none is provided
-                if not entry.published:
-                    entry.published = (old and old.published) or now
-
-                if old:
-                    # if we're updating an old entry, use the original
-                    # retrieved time
-                    entry.retrieved = old.retrieved
-                    feed.entries.remove(old)
-                    # punt on deleting for now, learn about cascade
-                    # and stuff later
-                    # session.delete(old)
-
-                feed.entries.append(entry)
-                session.commit()
-                new_entries.append(entry)
-            else:
-                logger.info('skipping previously seen post %s', old.permalink)
-
-        for entry in new_entries:
-            for in_reply_to in entry.get_property('in-reply-to', []):
-                fetch_reply_context(entry.id, in_reply_to, now)
-
-    finally:
-        feed.last_checked = now
-        if new_entries:
-            feed.last_updated = now
-        session.commit()
-
-    if new_entries:
-        notify_feed_updated(session, feed, new_entries)
+        now = datetime.datetime.utcnow()
+        new_entries = []
+        try:
+            if content:
+                logger.info('using provided content. size=%d', len(content))
+            else:
+                logger.info('fetching feed: %s', feed)
+                response = requests.get(feed.feed)
+                if response.status_code // 100 != 2:
+                    logger.warn('bad response from %s. %r: %r', feed.feed,
+                                response, response.text)
+                    return
+                if is_polling:
+                    check_push_subscription(session, feed, response)
+                content = get_response_content(response)
+
+            # backfill if this is the first pull
+            backfill = len(feed.entries) == 0
+            if feed.type == 'xml':
+                result = process_xml_feed_for_new_entries(
+                    session, feed, content, backfill, now)
+            elif feed.type == 'html':
+                result = process_html_feed_for_new_entries(
+                    session, feed, content, backfill, now)
+            else:
+                result = []
+
+            for entry in result:
+                old = session.query(Entry)\
+                    .filter(Entry.feed == feed)\
+                    .filter(Entry.uid == entry.uid).first()
+
+                # have we seen this post before
+                if not old or not is_content_equal(old, entry):
+                    # set a default value for published if none is provided
+                    if not entry.published:
+                        entry.published = (old and old.published) or now
+
+                    if old:
+                        # if we're updating an old entry, use the original
+                        # retrieved time
+                        entry.retrieved = old.retrieved
+                        feed.entries.remove(old)
+                        # punt on deleting for now, learn about cascade
+                        # and stuff later
+                        # session.delete(old)
+
+                    feed.entries.append(entry)
+                    session.commit()
+                    new_entries.append(entry)
+                else:
+                    logger.debug(
+                        'skipping previously seen post %s', old.permalink)
+
+            for entry in new_entries:
+                for in_reply_to in entry.get_property('in-reply-to', []):
+                    fetch_reply_context(entry.id, in_reply_to, now)
+        finally:
+            if is_polling:
+                feed.last_checked = now
+            if new_entries:
+                feed.last_updated = now
+            session.commit()
+
+        if new_entries:
+            notify_feed_updated(session, feed, new_entries)
 
 
 def check_push_subscription(session, feed, response):
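The effect of the new parameters: on a fat ping, notify() hands the verified body straight to update_feed, which skips the HTTP fetch and the subscription check; plain polling behaves as before, and last_checked is only advanced when polling, so pings don't disturb the polling cadence. A usage sketch (feed_id and pushed_body are hypothetical placeholders, not code from the commit):

feed_id = 42                      # hypothetical feed primary key
pushed_body = '<feed>...</feed>'  # body of a verified fat ping

update_feed(feed_id)  # polling path, as scheduled by tick()
update_feed(feed_id, content=pushed_body, is_polling=False)  # fat-ping path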
@@ -152,6 +159,7 @@ def check_push_subscription(session, feed, response):
             'hub.mode': mode,
             'hub.topic': topic,
             'hub.callback': build_callback_url(),
+            'hub.secret': feed.get_or_create_push_secret(),
             'hub.verify': 'sync',  # backcompat with 0.3
             # TODO secret should only be used over HTTPS
             # 'hub.secret': secret,
@@ -187,7 +195,8 @@ def check_push_subscription(session, feed, response):
     topic = next((link['href'] for link in links
                   if 'self' in link['rel']), None)
-    if ((expiry and expiry - datetime.datetime.utcnow() <= UPDATE_INTERVAL_PUSH)
+    if ((expiry and expiry - datetime.datetime.utcnow()
+         <= UPDATE_INTERVAL_PUSH)
             or hub != old_hub or topic != old_topic or not feed.push_verified):
         feed.push_hub = hub
         feed.push_topic = topic
@@ -250,10 +259,10 @@ def is_content_equal(e1, e2):
             and e1.properties == e2.properties)
 
 
-def process_xml_feed_for_new_entries(session, feed, response, backfill, now):
+def process_xml_feed_for_new_entries(session, feed, content, backfill, now):
     logger.debug('fetching xml feed: %s', feed)
-    parsed = feedparser.parse(get_response_content(response))
+    parsed = feedparser.parse(content)
     feed_props = parsed.get('feed', {})
     default_author_url = feed_props.get('author_detail', {}).get('href')
     default_author_name = feed_props.get('author_detail', {}).get('name')
@@ -319,10 +328,9 @@ def process_xml_feed_for_new_entries(session, feed, response, backfill, now):
         yield entry
 
 
-def process_html_feed_for_new_entries(session, feed, response, backfill, now):
-    doc = get_response_content(response)
+def process_html_feed_for_new_entries(session, feed, content, backfill, now):
     parsed = mf2util.interpret_feed(
-        mf2py.parse(url=feed.feed, doc=doc), feed.feed)
+        mf2py.parse(url=feed.feed, doc=content), feed.feed)
     hfeed = parsed.get('entries', [])
     for hentry in hfeed:
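Putting the pieces together: a hub delivering a fat ping POSTs the new feed body to the subscriber's callback along with the signature header. A rough end-to-end sketch of what the subscriber receives (the URL, secret, and payload are hypothetical stand-ins; note the bare hex digest, matching the comparison in notify() above rather than the spec's 'sha1=' prefix):

import hashlib
import hmac

import requests

# All values below are hypothetical, for illustration only.
callback = 'https://reader.example/push/42'  # what build_callback_url() produces
secret = '9f2c1c1e26f34b1d8a0f6d3f1b2a4c5d'  # the feed's push_secret
body = b'<feed xmlns="http://www.w3.org/2005/Atom">...</feed>'

signature = hmac.new(secret.encode('utf-8'), body, hashlib.sha1).hexdigest()
response = requests.post(callback, data=body,
                         headers={'Content-Type': 'application/atom+xml',
                                  'X-Hub-Signature': signature})
# notify() verifies the signature, enqueues update_feed with the pushed
# content, and answers 204 No Content either way.
assert response.status_code == 204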