broken: trying to put all entries into one transaction and now getting all sorts of detached exceptions. rolling back

This commit is contained in:
Kyle Mahan 2016-03-04 01:21:31 +00:00
parent b3c3e47263
commit ddb75f5993
2 changed files with 52 additions and 32 deletions

View file

@ -36,7 +36,11 @@ def configure_logging(app):
return return
app.logger.setLevel(logging.DEBUG) app.logger.setLevel(logging.DEBUG)
app.logger.addHandler(logging.StreamHandler(sys.stdout))
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
app.logger.addHandler(handler)
recipients = app.config.get('ADMIN_EMAILS') recipients = app.config.get('ADMIN_EMAILS')
if recipients: if recipients:

View file

@ -4,6 +4,7 @@ from redis import StrictRedis
from woodwind import util from woodwind import util
from woodwind.extensions import db from woodwind.extensions import db
from woodwind.models import Feed, Entry from woodwind.models import Feed, Entry
import sqlalchemy
import bs4 import bs4
import datetime import datetime
import feedparser import feedparser
@ -112,12 +113,12 @@ def update_feed(feed_id, content=None,
with flask_app() as app: with flask_app() as app:
feed = Feed.query.get(feed_id) feed = Feed.query.get(feed_id)
current_app.logger.info('Updating {}'.format(feed)) current_app.logger.info('Updating {}'.format(str(feed)[:32]))
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
new_ids = [] new_entries = []
updated_ids = [] updated_entries = []
reply_pairs = [] reply_pairs = []
try: try:
@ -125,7 +126,7 @@ def update_feed(feed_id, content=None,
current_app.logger.info('using provided content. size=%d', current_app.logger.info('using provided content. size=%d',
len(content)) len(content))
else: else:
current_app.logger.info('fetching feed: %s', feed) current_app.logger.info('fetching feed: %s', str(feed)[:32])
try: try:
response = util.requests_get(feed.feed) response = util.requests_get(feed.feed)
@ -163,24 +164,30 @@ def update_feed(feed_id, content=None,
result = [] result = []
for entry in result: for entry in result:
current_app.logger.debug('searching for entry with uid=%s', entry.uid)
old = Entry.query\ old = Entry.query\
.filter(Entry.feed == feed)\ .filter(Entry.feed == feed)\
.filter(Entry.uid == entry.uid)\ .filter(Entry.uid == entry.uid)\
.order_by(Entry.id.desc())\ .order_by(Entry.id.desc())\
.first() .first()
current_app.logger.debug('done searching: %s', 'found' if old else 'not found')
# have we seen this post before # have we seen this post before
if not old: if not old:
current_app.logger.debug('this is a new post, saving a new entry')
# set a default value for published if none is provided # set a default value for published if none is provided
entry.published = entry.published or now entry.published = entry.published or now
in_reply_tos = entry.get_property('in-reply-to', []) in_reply_tos = entry.get_property('in-reply-to', [])
db.session.add(entry)
feed.entries.append(entry) feed.entries.append(entry)
db.session.commit()
new_ids.append(entry.id) new_entries.append(entry)
for irt in in_reply_tos: for irt in in_reply_tos:
reply_pairs.append((entry.id, irt)) reply_pairs.append((entry, irt))
elif not is_content_equal(old, entry): elif not is_content_equal(old, entry):
current_app.logger.debug('this post content has changed, updating entry')
entry.published = entry.published or old.published entry.published = entry.published or old.published
in_reply_tos = entry.get_property('in-reply-to', []) in_reply_tos = entry.get_property('in-reply-to', [])
# we're updating an old entry, use the original # we're updating an old entry, use the original
@ -190,28 +197,38 @@ def update_feed(feed_id, content=None,
# punt on deleting for now, learn about cascade # punt on deleting for now, learn about cascade
# and stuff later # and stuff later
# session.delete(old) # session.delete(old)
db.session.add(entry)
feed.entries.append(entry) feed.entries.append(entry)
db.session.commit()
updated_ids.append(entry.id) updated_entries.append(entry)
for irt in in_reply_tos: for irt in in_reply_tos:
reply_pairs.append((entry.id, irt)) reply_pairs.append((entry, irt))
else: else:
current_app.logger.debug( current_app.logger.debug(
'skipping previously seen post %s', old.permalink) 'skipping previously seen post %s', old.permalink)
for entry_id, in_reply_to in reply_pairs: for entry, in_reply_to in reply_pairs:
fetch_reply_context(entry_id, in_reply_to, now) fetch_reply_context(entry, in_reply_to, now)
db.session.commit()
except:
db.session.rollback()
raise
finally: finally:
if is_polling: if is_polling:
feed.last_checked = now feed.last_checked = now
if new_ids or updated_ids: if new_entries or updated_entries:
feed.last_updated = now feed.last_updated = now
db.session.commit() db.session.commit()
if new_ids:
notify_feed_updated(app, feed_id, new_ids) if new_entries:
for e in new_entries:
current_app.logger.debug('entry %s state: %s', e.uid, sqlalchemy.inspect(e))
notify_feed_updated(app, feed_id, new_entries)
def check_push_subscription(feed, response): def check_push_subscription(feed, response):
@ -267,6 +284,8 @@ def check_push_subscription(feed, response):
if ((expiry and expiry - datetime.datetime.utcnow() if ((expiry and expiry - datetime.datetime.utcnow()
<= UPDATE_INTERVAL_PUSH) <= UPDATE_INTERVAL_PUSH)
or hub != old_hub or topic != old_topic or not feed.push_verified): or hub != old_hub or topic != old_topic or not feed.push_verified):
current_app.logger.debug('push subscription expired or hub/topic changed')
feed.push_hub = hub feed.push_hub = hub
feed.push_topic = topic feed.push_topic = topic
feed.push_verified = False feed.push_verified = False
@ -274,29 +293,24 @@ def check_push_subscription(feed, response):
db.session.commit() db.session.commit()
if old_hub and old_topic and hub != old_hub and topic != old_topic: if old_hub and old_topic and hub != old_hub and topic != old_topic:
current_app.logger.debug('unsubscribing hub=%s, topic=%s', old_hub, old_topic)
send_request('unsubscribe', old_hub, old_topic) send_request('unsubscribe', old_hub, old_topic)
if hub and topic: if hub and topic:
current_app.logger.debug('subscribing hub=%s, topic=%s', hub, topic)
send_request('subscribe', hub, topic) send_request('subscribe', hub, topic)
db.session.commit() db.session.commit()
def notify_feed_updated(app, feed_id, entry_ids): def notify_feed_updated(app, feed_id, entries):
"""Render the new entries and publish them to redis """Render the new entries and publish them to redis
""" """
from flask import render_template from flask import render_template
import flask.ext.login as flask_login import flask.ext.login as flask_login
current_app.logger.debug( current_app.logger.debug('notifying feed updated: %s', feed_id)
'notifying feed updated for entries %r', entry_ids)
feed = Feed.query.get(feed_id) feed = Feed.query.get(feed_id)
entries = Entry.query\
.filter(Entry.id.in_(entry_ids))\
.order_by(Entry.retrieved.desc(),
Entry.published.desc())\
.all()
for s in feed.subscriptions: for s in feed.subscriptions:
with app.test_request_context(): with app.test_request_context():
flask_login.login_user(s.user, remember=True) flask_login.login_user(s.user, remember=True)
@ -345,7 +359,7 @@ def is_content_equal(e1, e2):
def process_xml_feed_for_new_entries(feed, content, backfill, now): def process_xml_feed_for_new_entries(feed, content, backfill, now):
current_app.logger.debug('fetching xml feed: %s', feed) current_app.logger.debug('fetching xml feed: %s', str(feed)[:32])
parsed = feedparser.parse(content, response_headers={ parsed = feedparser.parse(content, response_headers={
'content-location': feed.feed, 'content-location': feed.feed,
}) })
@ -354,12 +368,11 @@ def process_xml_feed_for_new_entries(feed, content, backfill, now):
default_author_name = feed_props.get('author_detail', {}).get('name') default_author_name = feed_props.get('author_detail', {}).get('name')
default_author_photo = feed_props.get('logo') default_author_photo = feed_props.get('logo')
current_app.logger.debug('found {} entries'.format(len(parsed.entries))) current_app.logger.debug('found %d entries', len(parsed.entries))
# work from the bottom up (oldest first, usually) # work from the bottom up (oldest first, usually)
for p_entry in reversed(parsed.entries): for p_entry in reversed(parsed.entries):
current_app.logger.debug('processing entry {}'.format( current_app.logger.debug('processing entry %s', str(p_entry)[:32])
str(p_entry)[:256]))
permalink = p_entry.get('link') permalink = p_entry.get('link')
uid = p_entry.get('id') or permalink uid = p_entry.get('id') or permalink
@ -406,6 +419,8 @@ def process_xml_feed_for_new_entries(feed, content, backfill, now):
video = VIDEO_ENCLOSURE_TMPL.format(href=link.get('href')) video = VIDEO_ENCLOSURE_TMPL.format(href=link.get('href'))
content = (content or '') + video content = (content or '') + video
current_app.logger.debug('building entry')
entry = Entry( entry = Entry(
published=published, published=published,
updated=updated, updated=updated,
@ -422,6 +437,8 @@ def process_xml_feed_for_new_entries(feed, content, backfill, now):
author_photo=default_author_photo author_photo=default_author_photo
or fallback_photo(feed.origin)) or fallback_photo(feed.origin))
current_app.logger.debug('yielding entry')
yield entry yield entry
@ -527,9 +544,8 @@ def hentry_to_entry(hentry, feed, backfill, now):
return entry return entry
def fetch_reply_context(entry_id, in_reply_to, now): def fetch_reply_context(entry, in_reply_to, now):
with flask_app(): with flask_app():
entry = Entry.query.get(entry_id)
context = Entry.query\ context = Entry.query\
.join(Entry.feed)\ .join(Entry.feed)\
.filter(Entry.permalink==in_reply_to, Feed.type == 'html')\ .filter(Entry.permalink==in_reply_to, Feed.type == 'html')\
@ -542,10 +558,10 @@ def fetch_reply_context(entry_id, in_reply_to, now):
mf2py.parse(url=proxy_url(in_reply_to)), in_reply_to) mf2py.parse(url=proxy_url(in_reply_to)), in_reply_to)
if parsed: if parsed:
context = hentry_to_entry(parsed, None, False, now) context = hentry_to_entry(parsed, None, False, now)
db.session.add(context)
if context: if context:
entry.reply_context.append(context) entry.reply_context.append(context)
db.session.commit()
def proxy_url(url): def proxy_url(url):