use contextmanager to limit session scope
This commit is contained in:
parent
1023ccacd8
commit
70829ebd2a
1 changed files with 36 additions and 19 deletions
|
@ -1,5 +1,6 @@
|
|||
from woodwind.models import Feed, Entry
|
||||
from config import Config
|
||||
from contextlib import contextmanager
|
||||
import celery
|
||||
import celery.utils.log
|
||||
import feedparser
|
||||
|
@ -18,36 +19,52 @@ app.config_from_object('celeryconfig')
|
|||
|
||||
logger = celery.utils.log.get_task_logger(__name__)
|
||||
engine = sqlalchemy.create_engine(Config.SQLALCHEMY_DATABASE_URI)
|
||||
session = sqlalchemy.orm.Session(bind=engine)
|
||||
Session = sqlalchemy.orm.sessionmaker(bind=engine)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def session_scope():
|
||||
"""Provide a transactional scope around a series of operations."""
|
||||
session = Session()
|
||||
try:
|
||||
yield session
|
||||
session.commit()
|
||||
except:
|
||||
session.rollback()
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
@app.task
|
||||
def tick():
|
||||
now = datetime.datetime.utcnow()
|
||||
logger.debug('Tick {}'.format(now))
|
||||
for feed in session.query(Feed).all():
|
||||
logger.debug('Feed {} last checked {}'.format(
|
||||
feed, feed.last_checked))
|
||||
if (not feed.last_checked
|
||||
or now - feed.last_checked > UPDATE_INTERVAL):
|
||||
update_feed.delay(feed.id)
|
||||
with session_scope() as session:
|
||||
now = datetime.datetime.utcnow()
|
||||
logger.debug('Tick {}'.format(now))
|
||||
for feed in session.query(Feed).all():
|
||||
logger.debug('Feed {} last checked {}'.format(
|
||||
feed, feed.last_checked))
|
||||
if (not feed.last_checked
|
||||
or now - feed.last_checked > UPDATE_INTERVAL):
|
||||
update_feed.delay(feed.id)
|
||||
|
||||
|
||||
@app.task
|
||||
def update_feed(feed_id):
|
||||
feed = session.query(Feed).get(feed_id)
|
||||
logger.info('Updating {}'.format(feed))
|
||||
new_entries = process_feed_for_new_entries(feed)
|
||||
for entry in new_entries:
|
||||
logger.debug('Got new entry: {}'.format(entry))
|
||||
with session_scope() as session:
|
||||
feed = session.query(Feed).get(feed_id)
|
||||
logger.info('Updating {}'.format(feed))
|
||||
new_entries = process_feed_for_new_entries(session, feed)
|
||||
for entry in new_entries:
|
||||
logger.debug('Got new entry: {}'.format(entry))
|
||||
|
||||
|
||||
def process_feed_for_new_entries(feed):
|
||||
def process_feed_for_new_entries(session, feed):
|
||||
try:
|
||||
if feed.type == 'xml':
|
||||
result = process_xml_feed_for_new_entries(feed)
|
||||
result = process_xml_feed_for_new_entries(session, feed)
|
||||
elif feed.type == 'html':
|
||||
result = process_html_feed_for_new_entries(feed)
|
||||
result = process_html_feed_for_new_entries(session, feed)
|
||||
else:
|
||||
result = None
|
||||
return result
|
||||
|
@ -59,7 +76,7 @@ def process_feed_for_new_entries(feed):
|
|||
session.commit()
|
||||
|
||||
|
||||
def process_xml_feed_for_new_entries(feed):
|
||||
def process_xml_feed_for_new_entries(session, feed):
|
||||
logger.debug('fetching xml feed: %s', feed)
|
||||
|
||||
now = datetime.datetime.utcnow()
|
||||
|
@ -129,7 +146,7 @@ def process_xml_feed_for_new_entries(feed):
|
|||
yield entry
|
||||
|
||||
|
||||
def process_html_feed_for_new_entries(feed):
|
||||
def process_html_feed_for_new_entries(session, feed):
|
||||
logger.debug('fetching html feed: %s', feed)
|
||||
|
||||
now = datetime.datetime.utcnow()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue