Slightly refactoring feed fetcher to deal with weakref's when pickling functions.

This commit is contained in:
Samuel Clay 2020-12-15 17:24:01 -05:00
parent 238bed3aa6
commit 6f97446b3b

View file

@ -2,6 +2,10 @@ import time
import datetime
import traceback
import multiprocessing
import django
django.setup()
import urllib.request, urllib.error, urllib.parse
import xml.sax
import redis
@ -617,9 +621,9 @@ class ProcessFeed:
return FEED_OK, ret_values
class FeedFetcherWorker:
class Dispatcher:
def __init__(self, options, num_threads):
def __init__(self, options):
self.options = options
self.feed_stats = {
FEED_OK:0,
@ -634,9 +638,7 @@ class Dispatcher:
FEED_ERRHTTP:'http_error',
FEED_ERREXC:'exception'}
self.feed_keys = sorted(self.feed_trans.keys())
self.num_threads = num_threads
self.time_start = datetime.datetime.utcnow()
self.workers = []
def refresh_feed(self, feed_id):
"""Update feed, since it may have changed"""
@ -923,6 +925,12 @@ class Dispatcher:
silent = False if getattr(self.options, 'verbose', 0) >= 2 else True
sub.calculate_feed_scores(silent=silent, stories=stories)
class Dispatcher:
def __init__(self, options, num_threads):
self.options = options
self.num_threads = num_threads
self.workers = []
def add_jobs(self, feeds_queue, feeds_count=1):
""" adds a feed processing job to the pool
"""
@ -931,11 +939,15 @@ class Dispatcher:
def run_jobs(self):
if self.options['single_threaded']:
return self.process_feed_wrapper(self.feeds_queue[0])
return dispatch_workers(self.feeds_queue[0], self.options)
else:
for i in range(self.num_threads):
feed_queue = self.feeds_queue[i]
self.workers.append(multiprocessing.Process(target=self.process_feed_wrapper,
args=(feed_queue,)))
self.workers.append(multiprocessing.Process(target=dispatch_workers,
args=(feed_queue, self.options)))
for i in range(self.num_threads):
self.workers[i].start()
def dispatch_workers(feed_queue, options):
worker = FeedFetcherWorker(options)
return worker.process_feed_wrapper(feed_queue)