mirror of
https://github.com/viq/NewsBlur.git
synced 2025-08-05 16:49:45 +00:00

* master:
  Fixing hub url change subscription.
  Adding failed push feeds to munin.
  Showing correctly real-time status if feed is push.
  Showing correct real-time status if feed is push.
  Emergency fix to not have to reverse url in task fetchers.
  Setting up push feeds for removal.
  Adding munin graph for pushed feed queue and nginx conf.
  Logging real-time.
  Resubscribing to real-time feeds on add/remove feed.
  Adding push feeds queue.
  Ignoring fat ping, since it might not match URLs.
  Cleanup
  Forgive push subscription errors.
  Adding # of push feeds to munin.
  Correcting a double-encoding bug for story permalinks that was from way back when.
  Fixing the shit out of the feed fetcher's deleted feed handling. Also fixing PuSH support to correctly give parameters to PuSH server.
  Checking for push hub links safetly.
  Adding subscription callbacks for PuSH.
  Stubbing in PuSH support. Need to work it in with Feeds and upgrade rel='hub'.

Conflicts:
  apps/reader/models.py
  media/js/newsblur/reader/reader.js
  settings.py
  utils/feed_fetcher.py
507 lines
No EOL
23 KiB
Python
import time
import datetime
import traceback
import multiprocessing
import urllib2
import xml.sax
import redis
import random
from django.core.cache import cache
from django.conf import settings
from django.db import IntegrityError
from apps.reader.models import UserSubscription, MUserStory
# DuplicateFeed import added for the fallback in ProcessFeed.refresh_feed
# below (assumption: it lives alongside Feed and MStory).
from apps.rss_feeds.models import Feed, MStory, DuplicateFeed
from apps.rss_feeds.page_importer import PageImporter
from apps.rss_feeds.icon_importer import IconImporter
from apps.push.models import PushSubscription
from utils import feedparser
from utils.story_functions import pre_process_story
from utils import log as logging
from utils.feed_functions import timelimit, TimeoutError, mail_feed_error_to_admin, utf8encode

# Refresh feed code adapted from Feedjack.
# http://feedjack.googlecode.com

ENTRY_NEW, ENTRY_UPDATED, ENTRY_SAME, ENTRY_ERR = range(4)
FEED_OK, FEED_SAME, FEED_ERRPARSE, FEED_ERRHTTP, FEED_ERREXC = range(5)
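
# ENTRY_* classifies each story seen during a parse; FEED_* classifies the
# fetch as a whole. The *_trans dicts below map them to human-readable labels.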

def mtime(ttime):
    """ datetime auxiliary function.
    """
    return datetime.datetime.fromtimestamp(time.mktime(ttime))
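
# FetchFeed downloads a single feed with feedparser, or accepts a PuSH
# fat-pinged payload passed in pre-parsed via options['fpf'].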
class FetchFeed:
    def __init__(self, feed_id, options):
        self.feed = Feed.objects.get(pk=feed_id)
        self.options = options
        self.fpf = None

    @timelimit(20)
    def fetch(self):
        """
        Uses feedparser to download the feed. Will be parsed later.
        """
        start = time.time()
        identity = self.get_identity()
        log_msg = u'%2s ---> [%-30s] ~FYFetching feed (~FB%d~FY), last update: %s' % (identity,
                                                                                      self.feed.title[:30],
                                                                                      self.feed.id,
                                                                                      datetime.datetime.now() - self.feed.last_update)
        logging.debug(log_msg)
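
        # Conditional GET: pass the stored ETag and Last-Modified so the
        # server can reply 304 Not Modified when nothing has changed.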
        etag = self.feed.etag
        modified = self.feed.last_modified.utctimetuple()[:7] if self.feed.last_modified else None

        if self.options.get('force') or not self.feed.fetched_once or not self.feed.known_good:
            modified = None
            etag = None

        USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_1) AppleWebKit/534.48.3 (KHTML, like Gecko) Version/5.1 Safari/534.48.3 (NewsBlur Feed Fetcher - %s subscriber%s - %s)' % (
            self.feed.num_subscribers,
            's' if self.feed.num_subscribers != 1 else '',
            settings.NEWSBLUR_URL
        )

        if self.options.get('feed_xml'):
            logging.debug(u' ---> [%-30s] ~FM~BKFeed has been fat pinged. Ignoring fat: %s' % (
                self.feed.title[:30], len(self.options.get('feed_xml'))))
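            # Per the changelog above ("Ignoring fat ping, since it might not
            # match URLs"), the pushed XML itself is discarded; only its size
            # is logged, and the feed is fetched normally instead.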
        if self.options.get('fpf'):
            self.fpf = self.options.get('fpf')
            logging.debug(u' ---> [%-30s] ~FM~BKFeed fetched in real-time with fat ping.' % (
                self.feed.title[:30]))
        else:
            self.fpf = feedparser.parse(self.feed.feed_address,
                                        agent=USER_AGENT,
                                        etag=etag,
                                        modified=modified)

        if self.options['verbose'] and getattr(self.fpf, 'status', None) == 200:
            logging.debug(u' ---> [%-30s] ~FBTIME: feed fetch in ~FM%.4ss' % (
                self.feed.title[:30], time.time() - start))

        return FEED_OK, self.fpf

    def get_identity(self):
        identity = "X"

        current_process = multiprocessing.current_process()
        if current_process._identity:
            identity = current_process._identity[0]

        return identity


class ProcessFeed:
    def __init__(self, feed_id, fpf, options):
        self.feed_id = feed_id
        self.options = options
        self.fpf = fpf
        self.entry_trans = {
            ENTRY_NEW: 'new',
            ENTRY_UPDATED: 'updated',
            ENTRY_SAME: 'same',
            ENTRY_ERR: 'error'}
        self.entry_keys = sorted(self.entry_trans.keys())

    def refresh_feed(self):
        try:
            self.feed = Feed.objects.using('default').get(pk=self.feed_id)
        except Feed.DoesNotExist:
            # Bug fix: the original filtered Feed on self.feed.feed_address,
            # but self.feed is never bound when the lookup above fails.
            # Assumption: fall back to the DuplicateFeed table, which maps a
            # dead feed id to its surviving Feed (matching the original's
            # duplicate_feed[0].feed attribute access).
            duplicate_feed = DuplicateFeed.objects.filter(duplicate_feed_id=self.feed_id)
            if duplicate_feed:
                self.feed = duplicate_feed[0].feed

    def process(self):
        """ Downloads and parses a feed.
        """
        start = time.time()
        self.refresh_feed()

        ret_values = {
            ENTRY_NEW: 0,
            ENTRY_UPDATED: 0,
            ENTRY_SAME: 0,
            ENTRY_ERR: 0}

        # logging.debug(u' ---> [%d] Processing %s' % (self.feed.id, self.feed.feed_title))

        if hasattr(self.fpf, 'status'):
            if self.options['verbose']:
                if self.fpf.bozo and self.fpf.status != 304:
                    logging.debug(u' ---> [%-30s] ~FRBOZO exception: %s ~SB(%s entries)' % (
                        self.feed.title[:30],
                        self.fpf.bozo_exception,
                        len(self.fpf.entries)))

            if self.fpf.status == 304:
                self.feed = self.feed.save()
                self.feed.save_feed_history(304, "Not modified")
                return FEED_SAME, ret_values
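
            # 301/302: adopt the new address (unless it points at FeedBurner's
            # placeholder) and schedule an immediate refetch.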
            if self.fpf.status in (302, 301):
                if not self.fpf.href.endswith('feedburner.com/atom.xml'):
                    self.feed.feed_address = self.fpf.href
                if not self.feed.known_good:
                    self.feed.fetched_once = True
                    logging.debug(" ---> [%-30s] ~SB~SK~FRFeed is %s'ing. Refetching..." % (self.feed.title[:30], self.fpf.status))
                    self.feed = self.feed.schedule_feed_fetch_immediately()
                if not self.fpf.entries:
                    self.feed = self.feed.save()
                    self.feed.save_feed_history(self.fpf.status, "HTTP Redirect")
                    return FEED_ERRHTTP, ret_values
            if self.fpf.status >= 400:
                # Bug fix: the original format string carried a stray third %s
                # with only two arguments, which would raise a TypeError.
                logging.debug(" ---> [%-30s] ~SB~FRHTTP Status code: %s. Checking address..." % (self.feed.title[:30], self.fpf.status))
                fixed_feed = None
                if not self.feed.known_good:
                    fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(self.fpf.status, "HTTP Error")
                self.feed = self.feed.save()
                return FEED_ERRHTTP, ret_values

        if not self.fpf.entries:
            if self.fpf.bozo and isinstance(self.fpf.bozo_exception, feedparser.NonXMLContentType):
                logging.debug(" ---> [%-30s] ~SB~FRFeed is Non-XML. %s entries. Checking address..." % (self.feed.title[:30], len(self.fpf.entries)))
                fixed_feed = None
                if not self.feed.known_good:
                    fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(552, 'Non-xml feed', self.fpf.bozo_exception)
                self.feed = self.feed.save()
                return FEED_ERRPARSE, ret_values
            elif self.fpf.bozo and isinstance(self.fpf.bozo_exception, xml.sax._exceptions.SAXException):
                logging.debug(" ---> [%-30s] ~SB~FRFeed has SAX/XML parsing issues. %s entries. Checking address..." % (self.feed.title[:30], len(self.fpf.entries)))
                fixed_feed = None
                if not self.feed.known_good:
                    fixed_feed = self.feed.check_feed_link_for_feed_address()
                if not fixed_feed:
                    self.feed.save_feed_history(553, 'SAX Exception', self.fpf.bozo_exception)
                self.feed = self.feed.save()
                return FEED_ERRPARSE, ret_values

        # The feed has changed (or it is the first time we parse it).
        # Save the etag and last_modified fields.
        self.feed.etag = self.fpf.get('etag')
        if self.feed.etag:
            self.feed.etag = self.feed.etag[:255]
        # Sometimes this is None (it never should be). *sigh*
        if self.feed.etag is None:
            self.feed.etag = ''

        try:
            self.feed.last_modified = mtime(self.fpf.modified)
        except:
            pass

        # Only process the 50 most recent entries.
        self.fpf.entries = self.fpf.entries[:50]

        if self.fpf.feed.get('title'):
            self.feed.feed_title = self.fpf.feed.get('title')
        tagline = self.fpf.feed.get('tagline', self.feed.data.feed_tagline)
        if tagline:
            self.feed.data.feed_tagline = utf8encode(tagline)
            self.feed.data.save()
        if not self.feed.feed_link_locked:
            self.feed.feed_link = self.fpf.feed.get('link') or self.fpf.feed.get('id') or self.feed.feed_link

        guids = []
        for entry in self.fpf.entries:
            if entry.get('id', ''):
                guids.append(entry.get('id', ''))
            elif entry.get('link'):
                guids.append(entry.link)
            elif entry.get('title'):
                guids.append(entry.title)
        self.feed = self.feed.save()

        # Compare new stories to existing stories, adding and updating.
        # Find the oldest publish date among the incoming entries, then pull
        # every existing story at least that recent to diff against.
        start_date = datetime.datetime.utcnow()
        # end_date = datetime.datetime.utcnow()
        story_guids = []
        for entry in self.fpf.entries:
            story = pre_process_story(entry)
            if story.get('published') < start_date:
                start_date = story.get('published')
            # if story.get('published') > end_date:
            #     end_date = story.get('published')
            story_guids.append(story.get('guid') or story.get('link'))

        existing_stories = list(MStory.objects(
            # story_guid__in=story_guids,
            story_date__gte=start_date,
            story_feed_id=self.feed_id
        ).limit(len(story_guids)))

        # MStory.objects(
        #     (Q(story_date__gte=start_date) & Q(story_date__lte=end_date))
        #     | (Q(story_guid__in=story_guids)),
        #     story_feed=self.feed
        # ).order_by('-story_date')

        ret_values = self.feed.add_update_stories(self.fpf.entries, existing_stories, verbose=self.options['verbose'])
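
        # PuSH (PubSubHubbub) discovery: if the feed advertises rel="hub" and
        # rel="self" links and is not already a push feed, subscribe to the hub.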
        if (not self.feed.is_push and hasattr(self.fpf, 'feed') and
            hasattr(self.fpf.feed, 'links') and self.fpf.feed.links):
            hub_url = None
            self_url = self.feed.feed_link
            for link in self.fpf.feed.links:
                if link['rel'] == 'hub':
                    hub_url = link['href']
                elif link['rel'] == 'self':
                    self_url = link['href']
            if hub_url and self_url and not settings.DEBUG:
                logging.debug(u' ---> [%-30s] ~BB~SK~FWSubscribing to PuSH hub: %s' % (
                    self.feed.title[:30], hub_url))
                PushSubscription.objects.subscribe(self_url, feed=self.feed, hub=hub_url)

        logging.debug(u' ---> [%-30s] ~FYParsed Feed: new=~FG~SB%s~SN~FY up=~FY~SB%s~SN same=~FY%s err=~FR~SB%s' % (
            self.feed.title[:30],
            ret_values[ENTRY_NEW], ret_values[ENTRY_UPDATED], ret_values[ENTRY_SAME], ret_values[ENTRY_ERR]))
        self.feed.update_all_statistics(full=bool(ret_values[ENTRY_NEW]), force=self.options['force'])
        self.feed.trim_feed()
        self.feed.save_feed_history(200, "OK")

        if self.options['verbose']:
            logging.debug(u' ---> [%-30s] ~FBTIME: feed parse in ~FM%.4ss' % (
                self.feed.title[:30], time.time() - start))

        return FEED_OK, ret_values

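
# Dispatcher drives the whole refresh: it runs FetchFeed/ProcessFeed over a
# queue of feed ids (optionally across worker processes), then fetches each
# feed's page and favicon and recomputes subscriber unread counts.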
class Dispatcher:
    def __init__(self, options, num_threads):
        self.options = options
        self.entry_stats = {
            ENTRY_NEW: 0,
            ENTRY_UPDATED: 0,
            ENTRY_SAME: 0,
            ENTRY_ERR: 0}
        self.feed_stats = {
            FEED_OK: 0,
            FEED_SAME: 0,
            FEED_ERRPARSE: 0,
            FEED_ERRHTTP: 0,
            FEED_ERREXC: 0}
        self.feed_trans = {
            FEED_OK: 'ok',
            FEED_SAME: 'unchanged',
            FEED_ERRPARSE: 'cant_parse',
            FEED_ERRHTTP: 'http_error',
            FEED_ERREXC: 'exception'}
        self.feed_keys = sorted(self.feed_trans.keys())
        self.num_threads = num_threads
        self.time_start = datetime.datetime.utcnow()
        self.workers = []

    def refresh_feed(self, feed_id):
        """Update feed, since it may have changed"""
        return Feed.objects.using('default').get(pk=feed_id)

    def process_feed_wrapper(self, feed_queue):
        delta = None
        current_process = multiprocessing.current_process()
        identity = "X"
        feed = None
        if current_process._identity:
            identity = current_process._identity[0]

        for feed_id in feed_queue:
            ret_entries = {
                ENTRY_NEW: 0,
                ENTRY_UPDATED: 0,
                ENTRY_SAME: 0,
                ENTRY_ERR: 0
            }
            start_time = time.time()
            ret_feed = FEED_ERREXC
            try:
                feed = self.refresh_feed(feed_id)
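
                # In quick mode, probabilistically skip established, low-volume
                # feeds: the busier a feed (stories/month x subscribers), the
                # more likely it is fetched anyway.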
                skip = False
                if self.options.get('fake'):
                    skip = True
                    weight = "-"
                    quick = "-"
                    rand = "-"
                elif (self.options.get('quick') and not self.options['force'] and
                      feed.known_good and feed.fetched_once and not feed.is_push):
                    weight = feed.stories_last_month * feed.num_subscribers
                    random_weight = random.randint(1, max(weight, 1))
                    quick = float(self.options.get('quick', 0))
                    rand = random.random()
                    if random_weight < 100 and rand < quick:
                        skip = True
                if skip:
                    logging.debug(' ---> [%-30s] ~BGFaking fetch, skipping (%s/month, %s subs, %s < %s)...' % (
                        feed.title[:30],
                        weight,
                        feed.num_subscribers,
                        rand, quick))
                    continue

                ffeed = FetchFeed(feed_id, self.options)
                ret_feed, fetched_feed = ffeed.fetch()

                if ((fetched_feed and ret_feed == FEED_OK) or self.options['force']):
                    pfeed = ProcessFeed(feed_id, fetched_feed, self.options)
                    ret_feed, ret_entries = pfeed.process()
                    feed = pfeed.feed

                    if ret_entries.get(ENTRY_NEW) or self.options['force']:
                        start = time.time()
                        if not feed.known_good:
                            feed.known_good = True
                            feed = feed.save()
                        MUserStory.delete_old_stories(feed_id=feed.pk)
                        try:
                            self.count_unreads_for_subscribers(feed)
                        except TimeoutError:
                            logging.debug(' ---> [%-30s] Unread count took too long...' % (feed.title[:30],))
                        if self.options['verbose']:
                            logging.debug(u' ---> [%-30s] ~FBTIME: unread count in ~FM%.4ss' % (
                                feed.title[:30], time.time() - start))
                    cache.delete('feed_stories:%s-%s-%s' % (feed.id, 0, 25))
                # if ret_entries.get(ENTRY_NEW) or ret_entries.get(ENTRY_UPDATED) or self.options['force']:
                #     feed.get_stories(force=True)
            except KeyboardInterrupt:
                break
            except urllib2.HTTPError, e:
                # Read the error body once; a second .read() on the same
                # response would return an empty string.
                error_body = e.fp.read()
                logging.debug(' ---> [%-30s] ~FRFeed throws HTTP error: ~SB%s' % (unicode(feed_id)[:30], error_body))
                feed.save_feed_history(e.code, e.msg, error_body)
                fetched_feed = None
            except Feed.DoesNotExist, e:
                logging.debug(' ---> [%-30s] ~FRFeed is now gone...' % (unicode(feed_id)[:30]))
                continue
            except TimeoutError, e:
                logging.debug(' ---> [%-30s] ~FRFeed fetch timed out...' % (feed.title[:30]))
                feed.save_feed_history(505, 'Timeout', '')
                fetched_feed = None
            except Exception, e:
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                tb = traceback.format_exc()
                logging.error(tb)
                logging.debug('[%d] ! -------------------------' % (feed_id,))
                ret_feed = FEED_ERREXC
                feed = self.refresh_feed(feed.pk)
                feed.save_feed_history(500, "Error", tb)
                fetched_feed = None
                mail_feed_error_to_admin(feed, e)

            feed = self.refresh_feed(feed.pk)
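
            # Fetch the feed's page and favicon when forced, on a ~10% random
            # sample, or when this fetch went well for a feed that has a page.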
            if ((self.options['force']) or
                (random.random() > .9) or
                (fetched_feed and
                 feed.feed_link and
                 feed.has_page and
                 (ret_feed == FEED_OK or
                  (ret_feed == FEED_SAME and feed.stories_last_month > 10)))):

                logging.debug(u' ---> [%-30s] ~FYFetching page: %s' % (feed.title[:30], feed.feed_link))
                page_importer = PageImporter(feed)
                try:
                    page_importer.fetch_page()
                except TimeoutError, e:
                    logging.debug(' ---> [%-30s] ~FRPage fetch timed out...' % (feed.title[:30]))
                    feed.save_page_history(555, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    feed.save_page_history(550, "Page Error", tb)
                    fetched_feed = None
                    mail_feed_error_to_admin(feed, e)

                logging.debug(u' ---> [%-30s] ~FYFetching icon: %s' % (feed.title[:30], feed.feed_link))
                icon_importer = IconImporter(feed, force=self.options['force'])
                try:
                    icon_importer.save()
                except TimeoutError, e:
                    logging.debug(' ---> [%-30s] ~FRIcon fetch timed out...' % (feed.title[:30]))
                    feed.save_page_history(556, 'Timeout', '')
                except Exception, e:
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    tb = traceback.format_exc()
                    logging.error(tb)
                    logging.debug('[%d] ! -------------------------' % (feed_id,))
                    # feed.save_feed_history(560, "Icon Error", tb)
                    mail_feed_error_to_admin(feed, e)
            else:
                logging.debug(u' ---> [%-30s] ~FBSkipping page fetch: (%s on %s stories) %s' % (feed.title[:30], self.feed_trans[ret_feed], feed.stories_last_month, '' if feed.has_page else ' [HAS NO PAGE]'))

            feed = self.refresh_feed(feed.pk)
            delta = time.time() - start_time

            feed.last_load_time = round(delta)
            feed.fetched_once = True
            try:
                feed = feed.save()
            except IntegrityError:
                logging.debug(" ---> [%-30s] ~FRIntegrityError on feed: %s" % (feed.title[:30], feed.feed_address,))

            if ret_entries[ENTRY_NEW]:
                self.publish_to_subscribers(feed)

            done_msg = (u'%2s ---> [%-30s] ~FYProcessed in ~FG~SB%.4ss~FY~SN (~FB%s~FY) [%s]' % (
                identity, feed.feed_title[:30], delta,
                feed.pk, self.feed_trans[ret_feed],))
            logging.debug(done_msg)

            self.feed_stats[ret_feed] += 1
            for key, val in ret_entries.items():
                self.entry_stats[key] += val

        if len(feed_queue) == 1:
            return feed

        # time_taken = datetime.datetime.utcnow() - self.time_start

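    # Real-time fanout: publish 'story:new' on a Redis pub/sub channel named
    # after the feed id; listening clients learn there are new stories.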
    def publish_to_subscribers(self, feed):
        try:
            r = redis.Redis(connection_pool=settings.REDIS_POOL)
            listeners_count = r.publish(str(feed.pk), 'story:new')
            if listeners_count:
                logging.debug(" ---> [%-30s] ~FMPublished to %s subscribers" % (feed.title[:30], listeners_count))
        except redis.ConnectionError:
            logging.debug(" ***> [%-30s] ~BMRedis is unavailable for real-time." % (feed.title[:30],))

    @timelimit(20)
    def count_unreads_for_subscribers(self, feed):
        UNREAD_CUTOFF = datetime.datetime.utcnow() - datetime.timedelta(days=settings.DAYS_OF_UNREAD)
        user_subs = UserSubscription.objects.filter(feed=feed,
                                                    active=True,
                                                    user__profile__last_seen_on__gte=UNREAD_CUTOFF)\
                                            .order_by('-last_read_date')
        logging.debug(u' ---> [%-30s] ~FYComputing scores for ~SB%s subscribers ~SN(%s/%s/%s)' % (
            feed.title[:30], user_subs.count(),
            feed.num_subscribers, feed.active_subscribers, feed.premium_subscribers))

        stories_db = MStory.objects(story_feed_id=feed.pk,
                                    story_date__gte=UNREAD_CUTOFF)
        for sub in user_subs:
            cache.delete('usersub:%s' % sub.user_id)
            sub.needs_unread_recalc = True
            sub.save()

        if self.options['compute_scores']:
            for sub in user_subs:
                silent = False if self.options['verbose'] >= 2 else True
                sub.calculate_feed_scores(silent=silent, stories_db=stories_db)

    def add_jobs(self, feeds_queue, feeds_count=1):
        """ Stores the pre-chunked feed queues (one list of feed ids per
        worker) to be processed by run_jobs().
        """
        self.feeds_queue = feeds_queue
        self.feeds_count = feeds_count

    def run_jobs(self):
        if self.options['single_threaded']:
            return self.process_feed_wrapper(self.feeds_queue[0])
        else:
            for i in range(self.num_threads):
                feed_queue = self.feeds_queue[i]
                self.workers.append(multiprocessing.Process(target=self.process_feed_wrapper,
                                                            args=(feed_queue,)))
            for i in range(self.num_threads):
                self.workers[i].start()
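

# A minimal driver sketch (hypothetical; the real entry point is NewsBlur's
# feed-refresh task, and the exact option values below are assumptions based
# on how self.options is read above):
#
#   options = {'force': False, 'verbose': 1, 'single_threaded': True,
#              'compute_scores': True}
#   dispatcher = Dispatcher(options, num_threads=1)
#   dispatcher.add_jobs([[feed_id for feed_id in (1, 2, 3)]])
#   dispatcher.run_jobs()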