semi-big refactoring, use app context in async tasks

This commit is contained in:
Kyle Mahan 2015-04-21 23:19:59 -07:00
parent b1c9116995
commit 336af3f011
2 changed files with 105 additions and 113 deletions

View file

@ -6,7 +6,7 @@ import flask
import logging import logging
import sys import sys
MAIL_FORMAT = '''\ MAIL_FORMAT = '''\
Message type: %(levelname)s Message type: %(levelname)s
Location: %(pathname)s:%(lineno)d Location: %(pathname)s:%(lineno)d
@ -24,27 +24,25 @@ def create_app(config_path='../woodwind.cfg'):
app = flask.Flask('woodwind') app = flask.Flask('woodwind')
app.config.from_pyfile(config_path) app.config.from_pyfile(config_path)
configure_logging(app) configure_logging(app)
extensions.init_app(app) extensions.init_app(app)
app.register_blueprint(views) app.register_blueprint(views)
app.register_blueprint(api) app.register_blueprint(api)
app.register_blueprint(push) app.register_blueprint(push)
return app return app
def configure_logging(app): def configure_logging(app):
if app.debug: if app.debug:
return return
app.logger.setLevel(logging.DEBUG) app.logger.setLevel(logging.DEBUG)
app.logger.addHandler(logging.StreamHandler(sys.stdout)) app.logger.addHandler(logging.StreamHandler(sys.stdout))
recipients = app.config.get('ADMIN_EMAILS') recipients = app.config.get('ADMIN_EMAILS')
if recipients: if recipients:
error_handler = logging.handlers.SMTPHandler( error_handler = logging.handlers.SMTPHandler(
'localhost', 'Woodwind <woodwind@kylewm.com>', 'localhost', 'Woodwind <woodwind@kylewm.com>',
recipients, 'woodwind error') recipients, 'woodwind error')
error_handler.setLevel(logging.ERROR) error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(logging.Formatter(MAIL_FORMAT)) error_handler.setFormatter(logging.Formatter(MAIL_FORMAT))
app.logger.addHandler(error_handler) app.logger.addHandler(error_handler)

View file

@ -1,29 +1,22 @@
from contextlib import contextmanager from contextlib import contextmanager
from flask import Config as FlaskConfig
from logging import StreamHandler
from redis import StrictRedis from redis import StrictRedis
from woodwind import util from woodwind import util
from woodwind.models import Feed, Entry from woodwind.models import Feed, Entry
from woodwind.extensions import db
from flask import current_app
import bs4 import bs4
import datetime import datetime
import feedparser import feedparser
import itertools
import json import json
import logging
import mf2py import mf2py
import mf2util import mf2util
import os.path
import re import re
import requests import requests
import rq import rq
import sqlalchemy
import sqlalchemy.orm
import sys
import time import time
import urllib.parse import urllib.parse
config = FlaskConfig(os.path.dirname(os.path.realpath(__file__)))
config.from_pyfile('../woodwind.cfg')
# normal update interval for polling feeds # normal update interval for polling feeds
UPDATE_INTERVAL = datetime.timedelta(hours=1) UPDATE_INTERVAL = datetime.timedelta(hours=1)
# update interval when polling feeds that are push verified # update interval when polling feeds that are push verified
@ -39,35 +32,22 @@ AUDIO_ENCLOSURE_TMPL = '<p><audio class="u-audio" src="{href}" controls '\
VIDEO_ENCLOSURE_TMPL = '<p><video class="u-video" src="{href}" controls '\ VIDEO_ENCLOSURE_TMPL = '<p><video class="u-video" src="{href}" controls '\
'preload=none ><a href="{href}">video</a></video></p>' 'preload=none ><a href="{href}">video</a></video></p>'
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
stream_handler = StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
engine = sqlalchemy.create_engine(config['SQLALCHEMY_DATABASE_URI'])
Session = sqlalchemy.orm.sessionmaker(bind=engine)
redis = StrictRedis() redis = StrictRedis()
q_high = rq.Queue('high', connection=redis) q_high = rq.Queue('high', connection=redis)
q = rq.Queue('low', connection=redis) q = rq.Queue('low', connection=redis)
_app = None
@contextmanager @contextmanager
def session_scope(): def flask_app():
"""Provide a transactional scope around a series of operations.""" global _app
session = Session() if _app is None:
try: from woodwind import create_app
yield session _app = create_app()
session.commit() with _app.app_context():
except: yield _app
session.rollback()
raise
finally:
session.close()
def tick(): def tick():
@ -75,11 +55,11 @@ def tick():
Makes use of uWSGI timers to run every 5 minutes, without needing Makes use of uWSGI timers to run every 5 minutes, without needing
a separate process to fire ticks. a separate process to fire ticks.
""" """
with session_scope() as session: with flask_app():
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
logger.info('Tick {}'.format(now)) current_app.logger.info('Tick {}'.format(now))
for feed in session.query(Feed).all(): for feed in Feed.query.all():
logger.debug('Feed {} last checked {}'.format( current_app.logger.debug('Feed {} last checked {}'.format(
feed, feed.last_checked)) feed, feed.last_checked))
if (not feed.last_checked if (not feed.last_checked
or (not feed.push_verified or (not feed.push_verified
@ -90,88 +70,97 @@ def tick():
def update_feed(feed_id, content=None, is_polling=True): def update_feed(feed_id, content=None, is_polling=True):
with session_scope() as session: with flask_app() as app:
feed = session.query(Feed).get(feed_id) feed = Feed.query.get(feed_id)
logger.info('Updating {}'.format(feed)) current_app.logger.info('Updating {}'.format(feed))
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
new_entries = []
updated_entries = [] new_ids = []
updated_ids = []
reply_pairs = []
try: try:
if content: if content:
logger.info('using provided content. size=%d', len(content)) current_app.logger.info('using provided content. size=%d',
len(content))
else: else:
logger.info('fetching feed: %s', feed) current_app.logger.info('fetching feed: %s', feed)
response = requests.get(feed.feed) response = requests.get(feed.feed)
if response.status_code // 100 != 2: if response.status_code // 100 != 2:
logger.warn('bad response from %s. %r: %r', feed.feed, current_app.logger.warn('bad response from %s. %r: %r',
response, response.text) feed.feed, response, response.text)
return return
if is_polling: if is_polling:
check_push_subscription(session, feed, response) check_push_subscription(feed, response)
content = get_response_content(response) content = get_response_content(response)
# backfill if this is the first pull # backfill if this is the first pull
backfill = len(feed.entries) == 0 backfill = len(feed.entries) == 0
if feed.type == 'xml': if feed.type == 'xml':
result = process_xml_feed_for_new_entries( result = process_xml_feed_for_new_entries(
session, feed, content, backfill, now) feed, content, backfill, now)
elif feed.type == 'html': elif feed.type == 'html':
result = process_html_feed_for_new_entries( result = process_html_feed_for_new_entries(
session, feed, content, backfill, now) feed, content, backfill, now)
else: else:
result = [] result = []
for entry in result: for entry in result:
old = session.query(Entry)\ old = Entry.query\
.filter(Entry.feed == feed)\ .filter(Entry.feed == feed)\
.filter(Entry.uid == entry.uid)\ .filter(Entry.uid == entry.uid)\
.order_by(Entry.id.desc())\ .order_by(Entry.id.desc())\
.first() .first()
# have we seen this post before # have we seen this post before
if not old or not is_content_equal(old, entry): if not old:
# set a default value for published if none is provided # set a default value for published if none is provided
if not entry.published: entry.published = entry.published or now
entry.published = (old and old.published) or now new_ids.append(entry.id)
if old: for irt in entry.get_property('in-reply-to', []):
# if we're updating an old entry, use the original reply_pairs.append((entry.id, irt))
# retrieved time
entry.retrieved = old.retrieved
feed.entries.remove(old)
# punt on deleting for now, learn about cascade
# and stuff later
# session.delete(old)
feed.entries.append(entry) feed.entries.append(entry)
session.commit() db.session.commit()
(updated_entries if old else new_entries).append(entry) elif not is_content_equal(old, entry):
entry.published = entry.published or old.published
# we're updating an old entry, use the original
# retrieved time
entry.retrieved = old.retrieved
feed.entries.remove(old)
# punt on deleting for now, learn about cascade
# and stuff later
# session.delete(old)
updated_ids.append(entry.id)
for irt in entry.get_property('in-reply-to', []):
reply_pairs.append((entry.id, irt))
feed.entries.append(entry)
db.session.commit()
else: else:
logger.debug( current_app.logger.debug(
'skipping previously seen post %s', old.permalink) 'skipping previously seen post %s', old.permalink)
for entry in new_entries: for entry_id, in_reply_to in reply_pairs:
for in_reply_to in entry.get_property('in-reply-to', []): fetch_reply_context(entry_id, in_reply_to, now)
fetch_reply_context(entry.id, in_reply_to, now)
finally: finally:
if is_polling: if is_polling:
feed.last_checked = now feed.last_checked = now
if new_entries or updated_entries: if new_ids or updated_ids:
feed.last_updated = now feed.last_updated = now
session.commit() db.session.commit()
if new_entries: if new_ids:
notify_feed_updated(session, feed, new_entries) notify_feed_updated(app, feed, new_ids)
def check_push_subscription(session, feed, response): def check_push_subscription(feed, response):
def build_callback_url(): def build_callback_url():
return '{}://{}/_notify/{}'.format( return '{}://{}/_notify/{}'.format(
getattr(config, 'PREFERRED_URL_SCHEME', 'http'), getattr(current_app.config, 'PREFERRED_URL_SCHEME', 'http'),
config['SERVER_NAME'], current_app.config['SERVER_NAME'],
feed.id) feed.id)
def send_request(mode, hub, topic): def send_request(mode, hub, topic):
logger.debug( current_app.logger.debug(
'sending %s request for hub=%r, topic=%r', mode, hub, topic) 'sending %s request for hub=%r, topic=%r', mode, hub, topic)
r = requests.post(hub, data={ r = requests.post(hub, data={
'hub.mode': mode, 'hub.mode': mode,
@ -182,7 +171,7 @@ def check_push_subscription(session, feed, response):
# TODO secret should only be used over HTTPS # TODO secret should only be used over HTTPS
# 'hub.secret': secret, # 'hub.secret': secret,
}) })
logger.debug('%s response %r', mode, r) current_app.logger.debug('%s response %r', mode, r)
expiry = feed.push_expiry expiry = feed.push_expiry
old_hub = feed.push_hub old_hub = feed.push_hub
@ -190,8 +179,8 @@ def check_push_subscription(session, feed, response):
hub = response.links.get('hub', {}).get('url') hub = response.links.get('hub', {}).get('url')
topic = response.links.get('self', {}).get('url') topic = response.links.get('self', {}).get('url')
logger.debug('link headers. links=%s, hub=%s, topic=%s', current_app.logger.debug('link headers. links=%s, hub=%s, topic=%s',
response.links, hub, topic) response.links, hub, topic)
if not hub or not topic: if not hub or not topic:
# try to find link rel elements # try to find link rel elements
if feed.type == 'html': if feed.type == 'html':
@ -220,7 +209,7 @@ def check_push_subscription(session, feed, response):
feed.push_topic = topic feed.push_topic = topic
feed.push_verified = False feed.push_verified = False
feed.push_expiry = None feed.push_expiry = None
session.commit() db.session.commit()
if old_hub and old_topic and hub != old_hub and topic != old_topic: if old_hub and old_topic and hub != old_hub and topic != old_topic:
send_request('unsubscribe', old_hub, old_topic) send_request('unsubscribe', old_hub, old_topic)
@ -228,22 +217,23 @@ def check_push_subscription(session, feed, response):
if hub and topic: if hub and topic:
send_request('subscribe', hub, topic) send_request('subscribe', hub, topic)
session.commit() db.session.commit()
def notify_feed_updated(session, feed, entries): def notify_feed_updated(app, feed, entry_ids):
"""Render the new entries and publish them to redis """Render the new entries and publish them to redis
""" """
from . import create_app
from flask import render_template from flask import render_template
import flask.ext.login as flask_login import flask.ext.login as flask_login
flask_app = create_app()
entries = sorted(entries, key=lambda e: (e.retrieved, e.published), entries = Entry.query\
reverse=True) .filter(Entry.id.in_(entry_ids))\
.order_by(Entry.retrieved.desc(),
Entry.published.desc())\
.all()
for s in feed.subscriptions: for s in feed.subscriptions:
with flask_app.test_request_context(): with app.test_request_context():
flask_login.login_user(s.user, remember=True) flask_login.login_user(s.user, remember=True)
rendered = [] rendered = []
for e in entries: for e in entries:
@ -284,8 +274,8 @@ def is_content_equal(e1, e2):
and e1.properties == e2.properties) and e1.properties == e2.properties)
def process_xml_feed_for_new_entries(session, feed, content, backfill, now): def process_xml_feed_for_new_entries(feed, content, backfill, now):
logger.debug('fetching xml feed: %s', feed) current_app.logger.debug('fetching xml feed: %s', feed)
parsed = feedparser.parse(content) parsed = feedparser.parse(content)
feed_props = parsed.get('feed', {}) feed_props = parsed.get('feed', {})
@ -293,11 +283,12 @@ def process_xml_feed_for_new_entries(session, feed, content, backfill, now):
default_author_name = feed_props.get('author_detail', {}).get('name') default_author_name = feed_props.get('author_detail', {}).get('name')
default_author_photo = feed_props.get('logo') default_author_photo = feed_props.get('logo')
logger.debug('found {} entries'.format(len(parsed.entries))) current_app.logger.debug('found {} entries'.format(len(parsed.entries)))
# work from the bottom up (oldest first, usually) # work from the bottom up (oldest first, usually)
for p_entry in reversed(parsed.entries): for p_entry in reversed(parsed.entries):
logger.debug('processing entry {}'.format(str(p_entry)[:256])) current_app.logger.debug('processing entry {}'.format(
str(p_entry)[:256]))
permalink = p_entry.get('link') permalink = p_entry.get('link')
uid = p_entry.get('id') or permalink uid = p_entry.get('id') or permalink
@ -363,7 +354,7 @@ def process_xml_feed_for_new_entries(session, feed, content, backfill, now):
yield entry yield entry
def process_html_feed_for_new_entries(session, feed, content, backfill, now): def process_html_feed_for_new_entries(feed, content, backfill, now):
parsed = mf2util.interpret_feed( parsed = mf2util.interpret_feed(
mf2py.parse(url=feed.feed, doc=content), feed.feed) mf2py.parse(url=feed.feed, doc=content), feed.feed)
hfeed = parsed.get('entries', []) hfeed = parsed.get('entries', [])
@ -371,7 +362,7 @@ def process_html_feed_for_new_entries(session, feed, content, backfill, now):
for hentry in hfeed: for hentry in hfeed:
entry = hentry_to_entry(hentry, feed, backfill, now) entry = hentry_to_entry(hentry, feed, backfill, now)
if entry: if entry:
logger.debug('built entry: %s', entry.permalink) current_app.logger.debug('built entry: %s', entry.permalink)
yield entry yield entry
@ -422,13 +413,13 @@ def hentry_to_entry(hentry, feed, backfill, now):
def fetch_reply_context(entry_id, in_reply_to, now): def fetch_reply_context(entry_id, in_reply_to, now):
with session_scope() as session: with flask_app():
entry = session.query(Entry).get(entry_id) entry = Entry.query.get(entry_id)
context = session.query(Entry)\ context = Entry.query.filter_by(permalink=in_reply_to).first()
.filter_by(permalink=in_reply_to).first()
if not context: if not context:
logger.info('fetching in-reply-to url: %s', in_reply_to) current_app.logger.info('fetching in-reply-to url: %s',
in_reply_to)
parsed = mf2util.interpret( parsed = mf2util.interpret(
mf2py.parse(url=proxy_url(in_reply_to)), in_reply_to) mf2py.parse(url=proxy_url(in_reply_to)), in_reply_to)
if parsed: if parsed:
@ -436,11 +427,12 @@ def fetch_reply_context(entry_id, in_reply_to, now):
if context: if context:
entry.reply_context.append(context) entry.reply_context.append(context)
session.commit() db.session.commit()
def proxy_url(url): def proxy_url(url):
if config['TWITTER_AU_KEY'] and config['TWITTER_AU_SECRET']: if ('TWITTER_AU_KEY' in current_app.config
and 'TWITTER_AU_SECRET' in current_app.config):
# swap out the a-u url for twitter urls # swap out the a-u url for twitter urls
match = TWITTER_RE.match(url) match = TWITTER_RE.match(url)
if match: if match:
@ -448,10 +440,12 @@ def proxy_url(url):
'https://twitter-activitystreams.appspot.com/@me/@all/@app/{}?' 'https://twitter-activitystreams.appspot.com/@me/@all/@app/{}?'
.format(match.group(2)) + urllib.parse.urlencode({ .format(match.group(2)) + urllib.parse.urlencode({
'format': 'html', 'format': 'html',
'access_token_key': config['TWITTER_AU_KEY'], 'access_token_key':
'access_token_secret': config['TWITTER_AU_SECRET'], current_app.config['TWITTER_AU_KEY'],
'access_token_secret':
current_app.config['TWITTER_AU_SECRET'],
})) }))
logger.debug('proxied twitter url %s', proxy_url) current_app.logger.debug('proxied twitter url %s', proxy_url)
return proxy_url return proxy_url
return url return url