use simpler RQ library instead of celery

This commit is contained in:
Kyle Mahan 2015-03-14 12:17:34 -07:00
parent 9b1b7c1202
commit e93b9e8891
7 changed files with 47 additions and 42 deletions

View file

@ -1,15 +0,0 @@
import datetime
BROKER_URL = 'redis://'
CELERY_RESULT_BACKEND = 'redis'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERYBEAT_SCHEDULE = {
'tick-every-5-minutes': {
'task': 'woodwind.tasks.tick',
'schedule': datetime.timedelta(minutes=5),
}
}
# recommended to disable if not using -- introduces a lot of complexity
CELERY_DISABLE_RATE_LIMITS = False

View file

@ -10,6 +10,19 @@ setup(name='Woodwind',
url='https://indiewebcamp.com/Woodwind',
packages=['woodwind'],
install_requires=[
'Flask', 'Flask-Login', 'Flask-Micropub', 'Flask-SQLAlchemy',
'beautifulsoup4', 'bleach', 'celery', 'feedparser', 'html5lib',
'mf2py', 'mf2util', 'redis', 'requests', 'tornado'])
'Flask',
'Flask-Login',
'Flask-Micropub',
'Flask-SQLAlchemy',
'beautifulsoup4',
'bleach',
'feedparser',
'html5lib',
'mf2py',
'mf2util',
'redis',
'requests',
'rq',
'tornado',
'uwsgi',
])

8
woodwind-dev.ini Normal file
View file

@ -0,0 +1,8 @@
[uwsgi]
master=true
processes=1
threads=1
http=:4000
module=woodwind.wsgi
attach-daemon=rqworker
import=woodwind.tasks

View file

@ -1,8 +1,9 @@
[uwsgi]
master=true
processes=4
threads=2
processes=2
threads=4
socket=/tmp/woodwind.sock
chmod-socket=666
module=woodwind.wsgi
#pidfile=/tmp/woodwind.pid
attach-daemon=rqworker
import=woodwind.tasks

View file

@ -51,5 +51,5 @@ def notify(feed_id):
current_app.logger.debug('received PuSH ping for %r', feed)
feed.last_pinged = datetime.datetime.utcnow()
db.session.commit()
tasks.update_feed.delay(feed.id)
tasks.q.enqueue(tasks.update_feed, feed.id)
return make_response('', 204)

View file

@ -1,21 +1,22 @@
from config import Config
from contextlib import contextmanager
from woodwind.models import Feed, Entry
from redis import StrictRedis
from uwsgidecorators import timer
from woodwind.models import Feed, Entry
import bs4
import celery
import celery.utils.log
import datetime
import feedparser
import json
import logging
import mf2py
import mf2util
import re
import requests
import rq
import sqlalchemy
import sqlalchemy.orm
import time
import urllib.parse
import requests
UPDATE_INTERVAL = datetime.timedelta(hours=1)
@ -23,15 +24,11 @@ TWITTER_RE = re.compile(
r'https?://(?:www\.|mobile\.)?twitter\.com/(\w+)/status(?:es)?/(\w+)')
TAG_RE = re.compile(r'</?\w+[^>]*?>')
app = celery.Celery('woodwind')
app.config_from_object('celeryconfig')
logger = celery.utils.log.get_task_logger(__name__)
logger = logging.getLogger(__name__)
engine = sqlalchemy.create_engine(Config.SQLALCHEMY_DATABASE_URI)
Session = sqlalchemy.orm.sessionmaker(bind=engine)
redis = StrictRedis()
q = rq.Queue(connection=redis)
@contextmanager
@ -48,20 +45,23 @@ def session_scope():
session.close()
@app.task
def tick():
@timer(300)
def tick(signum=None):
"""Checks all feeds to see if any of them are ready for an update.
Makes use of uWSGI timers to run every 5 minutes, without needing
a separate process to fire ticks.
"""
with session_scope() as session:
now = datetime.datetime.utcnow()
logger.debug('Tick {}'.format(now))
logger.info('Tick {}'.format(now))
for feed in session.query(Feed).all():
logger.debug('Feed {} last checked {}'.format(
feed, feed.last_checked))
if (not feed.last_checked
or now - feed.last_checked > UPDATE_INTERVAL):
update_feed.delay(feed.id)
q.enqueue(update_feed, feed.id)
@app.task
def update_feed(feed_id):
with session_scope() as session:
feed = session.query(Feed).get(feed_id)
@ -359,7 +359,6 @@ def hentry_to_entry(hentry, feed, backfill):
return entry
@app.task
def fetch_reply_context(entry_id, in_reply_to):
with session_scope() as session:
entry = session.query(Entry).get(entry_id)
@ -396,7 +395,6 @@ def proxy_url(url):
return url
def fallback_photo(url):
"""Use favatar to find an appropriate photo for any URL"""
domain = urllib.parse.urlparse(url).netloc

View file

@ -95,7 +95,7 @@ def settings():
@flask_login.login_required
def update_feed():
feed_id = flask.request.form.get('id')
tasks.update_feed.delay(feed_id)
tasks.q.enqueue(tasks.update_feed, feed_id)
return flask.redirect(flask.url_for('.feeds'))
@ -103,7 +103,7 @@ def update_feed():
@flask_login.login_required
def update_all():
for feed in flask_login.current_user.feeds:
tasks.update_feed.delay(feed.id)
tasks.q.enqueue(tasks.update_feed, feed.id)
return flask.redirect(flask.url_for('.feeds'))
@ -293,7 +293,7 @@ def add_subscription(origin, feed_url, type):
flask_login.current_user.feeds.append(feed)
db.session.commit()
# go ahead and update the fed
tasks.update_feed.delay(feed.id)
tasks.q.enqueue(update_feed, feed.id)
return feed