canonicalize URLs, split jobs into high and low priority
This commit is contained in:
parent
127035e328
commit
6aebe0287d
6 changed files with 101 additions and 15 deletions
60
migrations/2015-03-26-normalize-urls.py
Normal file
60
migrations/2015-03-26-normalize-urls.py
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
from config import Config
|
||||||
|
from woodwind import util
|
||||||
|
from woodwind.models import Feed
|
||||||
|
import requests
|
||||||
|
import sqlalchemy
|
||||||
|
import sqlalchemy.orm
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# Module-level database wiring: one engine per process, and a Session
# factory bound to it.  Each migration step below opens (and closes)
# its own session from this factory.
engine = sqlalchemy.create_engine(Config.SQLALCHEMY_DATABASE_URI)
Session = sqlalchemy.orm.sessionmaker(bind=engine)
|
||||||
|
|
||||||
|
def follow_redirects():
    """Rewrite each stored feed URL to its final post-redirect URL.

    Issues a HEAD request (following redirects) for every Feed row and,
    when the resolved URL differs, updates Feed.feed in place.  All
    updates are committed in a single transaction; any unexpected
    failure rolls the whole batch back.  Per-feed fetch errors are
    logged and skipped (best-effort).
    """
    # Create the session before the try block so that `finally` can
    # always close it; the original created it inside `try`, which
    # would raise NameError in `finally` if Session() itself failed.
    session = Session()
    try:
        for feed in session.query(Feed).all():
            print('fetching', feed.feed)
            try:
                r = requests.head(feed.feed, allow_redirects=True)
                if feed.feed != r.url:
                    print('urls differ', feed.feed, r.url)
                    feed.feed = r.url
            except Exception:
                # Best-effort: log and continue with the next feed.
                # Narrowed from a bare `except:` so KeyboardInterrupt /
                # SystemExit still abort the whole migration.
                print('error', sys.exc_info()[0])
        session.commit()
    except BaseException:
        # Equivalent to the original bare `except:`: roll back on any
        # failure (including interrupts) and re-raise.
        session.rollback()
        raise
    finally:
        session.close()
|
||||||
|
|
||||||
|
def dedupe():
    """Merge Feed rows that share the same feed URL.

    For each distinct URL, keeps the lowest-id feed, moves the
    subscribers of every duplicate onto it, and deletes the duplicates.
    Commits once at the end; rolls back and re-raises on any failure.
    """
    # Create the session before the try block so that `finally` can
    # always close it even if Session() itself fails.
    session = Session()
    try:
        # Single O(n) pass instead of the original O(n^2) double loop:
        # feeds are ordered by id, so the first feed seen for a URL is
        # the lowest-id one and becomes the keeper.
        keeper_by_url = {}
        for feed in session.query(Feed).order_by(Feed.id).all():
            keeper = keeper_by_url.get(feed.feed)
            if keeper is None:
                keeper_by_url[feed.feed] = feed
                continue
            print('dedupe', keeper.feed, keeper.id, feed.id)
            # Move subscribers onto the keeper, then detach them from
            # the duplicate before deleting it.
            keeper.users += feed.users
            feed.users.clear()
            session.delete(feed)
        session.commit()
    except BaseException:
        # Matches the original bare `except:`: roll back on any
        # failure (including interrupts) and re-raise.
        session.rollback()
        raise
    finally:
        session.close()
|
||||||
|
|
||||||
|
# NOTE(review): only dedupe() runs at import time; follow_redirects()
# is defined above but never invoked here — presumably it was run
# manually in an earlier pass.  Confirm the intended ordering.
dedupe()
|
|
@ -5,6 +5,6 @@ socket=/tmp/woodwind.sock
|
||||||
chmod-socket=666
|
chmod-socket=666
|
||||||
module=woodwind.wsgi
|
module=woodwind.wsgi
|
||||||
import=timers
|
import=timers
|
||||||
attach-daemon=rqworker
|
attach-daemon=rqworker high low
|
||||||
attach-daemon=python -m woodwind.websocket_server
|
attach-daemon=python -m woodwind.websocket_server
|
||||||
py-autoreload=3
|
py-autoreload=3
|
||||||
|
|
13
woodwind.ini
13
woodwind.ini
|
@ -1,10 +1,17 @@
|
||||||
[uwsgi]
|
[uwsgi]
|
||||||
master=true
|
master=true
|
||||||
processes=2
|
|
||||||
threads=4
|
threads=2
|
||||||
|
cheaper-algo=spare
|
||||||
|
cheaper=2
|
||||||
|
cheaper-initial=2
|
||||||
|
workers=10
|
||||||
|
|
||||||
socket=/tmp/woodwind.sock
|
socket=/tmp/woodwind.sock
|
||||||
chmod-socket=666
|
chmod-socket=666
|
||||||
module=woodwind.wsgi
|
module=woodwind.wsgi
|
||||||
import=timers
|
import=timers
|
||||||
attach-daemon=venv/bin/rqworker
|
|
||||||
|
attach-daemon=venv/bin/rqworker high
|
||||||
|
attach-daemon=venv/bin/rqworker high low
|
||||||
attach-daemon=venv/bin/python -m woodwind.websocket_server
|
attach-daemon=venv/bin/python -m woodwind.websocket_server
|
||||||
|
|
|
@ -13,10 +13,6 @@ def notify(feed_id):
|
||||||
current_app.logger.debug(
|
current_app.logger.debug(
|
||||||
'received PuSH notification for feed id %d', feed_id)
|
'received PuSH notification for feed id %d', feed_id)
|
||||||
feed = Feed.query.get(feed_id)
|
feed = Feed.query.get(feed_id)
|
||||||
if not feed:
|
|
||||||
current_app.logger.debug(
|
|
||||||
'could not find feed corresponding to %d', feed_id)
|
|
||||||
abort(404)
|
|
||||||
|
|
||||||
current_app.logger.debug('processing PuSH notification for feed %r', feed)
|
current_app.logger.debug('processing PuSH notification for feed %r', feed)
|
||||||
if request.method == 'GET':
|
if request.method == 'GET':
|
||||||
|
@ -30,7 +26,18 @@ def notify(feed_id):
|
||||||
'challenge=%s, lease_seconds=%s',
|
'challenge=%s, lease_seconds=%s',
|
||||||
feed, mode, topic, challenge, lease_seconds)
|
feed, mode, topic, challenge, lease_seconds)
|
||||||
|
|
||||||
if mode == 'subscribe' and topic == feed.push_topic:
|
if mode == 'subscribe':
|
||||||
|
if not feed:
|
||||||
|
current_app.logger.warn(
|
||||||
|
'could not find feed corresponding to %d', feed_id)
|
||||||
|
abort(404)
|
||||||
|
|
||||||
|
if topic != feed.push_topic:
|
||||||
|
current_app.logger.warn(
|
||||||
|
'feed topic (%s) does not match subscription request (%s)',
|
||||||
|
feed.push_topic, topic)
|
||||||
|
abort(404)
|
||||||
|
|
||||||
current_app.logger.debug(
|
current_app.logger.debug(
|
||||||
'PuSH verify subscribe for feed=%r, topic=%s', feed, topic)
|
'PuSH verify subscribe for feed=%r, topic=%s', feed, topic)
|
||||||
feed.push_verified = True
|
feed.push_verified = True
|
||||||
|
@ -39,7 +46,8 @@ def notify(feed_id):
|
||||||
+ datetime.timedelta(seconds=int(lease_seconds))
|
+ datetime.timedelta(seconds=int(lease_seconds))
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return challenge
|
return challenge
|
||||||
elif mode == 'unsubscribe' and topic != feed.push_topic:
|
|
||||||
|
elif mode == 'unsubscribe' and (not feed or topic != feed.push_topic):
|
||||||
current_app.logger.debug(
|
current_app.logger.debug(
|
||||||
'PuSH verify unsubscribe for feed=%r, topic=%s', feed, topic)
|
'PuSH verify unsubscribe for feed=%r, topic=%s', feed, topic)
|
||||||
return challenge
|
return challenge
|
||||||
|
@ -47,10 +55,15 @@ def notify(feed_id):
|
||||||
mode, feed, topic)
|
mode, feed, topic)
|
||||||
abort(404)
|
abort(404)
|
||||||
|
|
||||||
|
if not feed:
|
||||||
|
current_app.logger.warn(
|
||||||
|
'could not find feed corresponding to %d', feed_id)
|
||||||
|
abort(404)
|
||||||
|
|
||||||
# could it be? an actual push notification!?
|
# could it be? an actual push notification!?
|
||||||
current_app.logger.debug(
|
current_app.logger.debug(
|
||||||
'received PuSH ping for %r; content size: %d', feed, len(request.data))
|
'received PuSH ping for %r; content size: %d', feed, len(request.data))
|
||||||
feed.last_pinged = datetime.datetime.utcnow()
|
feed.last_pinged = datetime.datetime.utcnow()
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
tasks.q.enqueue(tasks.update_feed, feed.id)
|
tasks.q_high.enqueue(tasks.update_feed, feed.id)
|
||||||
return make_response('', 204)
|
return make_response('', 204)
|
||||||
|
|
|
@ -20,7 +20,11 @@ import time
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
|
|
||||||
|
|
||||||
|
# normal update interval for polling feeds
|
||||||
UPDATE_INTERVAL = datetime.timedelta(hours=1)
|
UPDATE_INTERVAL = datetime.timedelta(hours=1)
|
||||||
|
# update interval when polling feeds that are push verified
|
||||||
|
UPDATE_INTERVAL_PUSH = datetime.timedelta(days=1)
|
||||||
|
|
||||||
TWITTER_RE = re.compile(
|
TWITTER_RE = re.compile(
|
||||||
r'https?://(?:www\.|mobile\.)?twitter\.com/(\w+)/status(?:es)?/(\w+)')
|
r'https?://(?:www\.|mobile\.)?twitter\.com/(\w+)/status(?:es)?/(\w+)')
|
||||||
TAG_RE = re.compile(r'</?\w+[^>]*?>')
|
TAG_RE = re.compile(r'</?\w+[^>]*?>')
|
||||||
|
@ -29,11 +33,12 @@ COMMENT_RE = re.compile(r'<!--[^>]*?-->')
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
logger.setLevel(logging.DEBUG)
|
logger.setLevel(logging.DEBUG)
|
||||||
logger.addHandler(logging.StreamHandler(sys.stdout))
|
logger.addHandler(logging.StreamHandler(sys.stdout))
|
||||||
|
|
||||||
engine = sqlalchemy.create_engine(Config.SQLALCHEMY_DATABASE_URI)
|
engine = sqlalchemy.create_engine(Config.SQLALCHEMY_DATABASE_URI)
|
||||||
Session = sqlalchemy.orm.sessionmaker(bind=engine)
|
Session = sqlalchemy.orm.sessionmaker(bind=engine)
|
||||||
redis = StrictRedis()
|
redis = StrictRedis()
|
||||||
q = rq.Queue(connection=redis)
|
|
||||||
|
q_high = rq.Queue('high', connection=redis)
|
||||||
|
q = rq.Queue('low', connection=redis)
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
|
@ -62,7 +67,8 @@ def tick():
|
||||||
logger.debug('Feed {} last checked {}'.format(
|
logger.debug('Feed {} last checked {}'.format(
|
||||||
feed, feed.last_checked))
|
feed, feed.last_checked))
|
||||||
if (not feed.last_checked
|
if (not feed.last_checked
|
||||||
or now - feed.last_checked > UPDATE_INTERVAL):
|
or (not feed.push_verified and now - feed.last_checked > UPDATE_INTERVAL)
|
||||||
|
or (feed.push_verified and now - feed.last_checked > UPDATE_INTERVAL_PUSH)):
|
||||||
q.enqueue(update_feed, feed.id)
|
q.enqueue(update_feed, feed.id)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -337,7 +337,7 @@ def find_possible_feeds(origin):
|
||||||
if hfeed.get('entries'):
|
if hfeed.get('entries'):
|
||||||
feeds.append({
|
feeds.append({
|
||||||
'origin': origin,
|
'origin': origin,
|
||||||
'feed': origin,
|
'feed': resp.url,
|
||||||
'type': 'html',
|
'type': 'html',
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue