Sending all of a user's feeds through the archive fetcher to fill out their backfill, then emailing them with the count of new stories.

Samuel Clay 2022-04-18 13:29:13 -04:00
parent 43695b8cd4
commit ac593494db
6 changed files with 144 additions and 19 deletions
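
Editor's note: the backfill described in the commit message fans out over a Celery chord, as the diffs below show (FetchArchiveFeedsChunk tasks as the header, FinishFetchArchiveFeeds as the callback). Here is a minimal, self-contained sketch of that fan-out/fan-in pattern; the app name, broker/backend URLs, task names, and per-chunk return values are illustrative assumptions, not code from this commit.

from celery import Celery, chord

# Hypothetical broker/backend; a chord needs a result backend so it can
# collect every chunk's return value before running the callback.
app = Celery("archive_backfill",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")

def chunks(items, size):
    # Yield successive fixed-size slices of a list.
    for i in range(0, len(items), size):
        yield items[i:i + size]

@app.task
def fetch_feed_chunk(feed_ids, user_id):
    # Stand-in for fetching archive pages for a handful of feeds;
    # returns how many stories this chunk added.
    return len(feed_ids)

@app.task
def finish_backfill(results, user_id, starting_story_count):
    # Chord callback: Celery prepends the list of chunk results as the first argument.
    new_story_count = sum(results)
    return {"user_id": user_id,
            "new": new_story_count,
            "total": starting_story_count + new_story_count}

def schedule_backfill(user_id, feed_ids, starting_story_count=0):
    # Fan out one task per chunk of feeds, then run a single callback when all finish.
    header = [fetch_feed_chunk.s(feed_ids=chunk, user_id=user_id)
              for chunk in chunks(feed_ids, 6)]
    callback = finish_backfill.s(user_id=user_id,
                                 starting_story_count=starting_story_count)
    return chord(header)(callback)

The same shape appears in the real code: the chunk tasks run in parallel on a worker queue, and only the callback knows the final story counts, which is what lets the email report "N new stories" once everything is done.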

View file

@@ -265,9 +265,7 @@ class Profile(models.Model):
         return True

     def activate_archive(self, never_expire=False):
-        from apps.profile.tasks import EmailNewPremiumArchive
-        EmailNewPremiumArchive.delay(user_id=self.user.pk)
+        UserSubscription.schedule_fetch_archive_feeds_for_user(self.user.pk)

         was_premium = self.is_premium
         was_archive = self.is_archive
@@ -289,6 +287,8 @@ class Profile(models.Model):
             except (IntegrityError, Feed.DoesNotExist):
                 pass

+        # Count subscribers to turn on archive_subscribers counts, then show that count to users
+        # on the paypal_archive_return page.
         try:
             scheduled_feeds = [sub.feed.pk for sub in subs]
         except Feed.DoesNotExist:
@@ -296,7 +296,7 @@ class Profile(models.Model):
         logging.user(self.user, "~SN~FMTasking the scheduling immediate premium setup of ~SB%s~SN feeds..." %
                      len(scheduled_feeds))
         SchedulePremiumSetup.apply_async(kwargs=dict(feed_ids=scheduled_feeds))

         UserSubscription.queue_new_feeds(self.user)
         self.setup_premium_history()
@@ -1249,8 +1249,8 @@ class Profile(models.Model):
         logging.user(self.user, "~BB~FM~SBSending email for new premium: %s" % self.user.email)

-    def send_new_premium_archive_email(self, force=False):
-        if not self.user.email or not self.send_emails:
+    def send_new_premium_archive_email(self, new_story_count, total_story_count, force=False):
+        if not self.user.email:
             return

         params = dict(receiver_user_id=self.user.pk, email_type='new_premium_archive')
@@ -1265,7 +1265,7 @@ class Profile(models.Model):
         user = self.user
         text = render_to_string('mail/email_new_premium_archive.txt', locals())
         html = render_to_string('mail/email_new_premium_archive.xhtml', locals())
-        subject = "Thank you for subscribing to NewsBlur Premium Archive!"
+        subject = f"Your NewsBlur Premium Archive subscription now holds {total_story_count:,} stories"
         msg = EmailMultiAlternatives(subject, text,
                                      from_email='NewsBlur <%s>' % settings.HELLO_EMAIL,
                                      to=['%s <%s>' % (user, user.email)])

View file

@@ -15,10 +15,30 @@ def EmailNewPremium(user_id):
     user_profile = Profile.objects.get(user__pk=user_id)
     user_profile.send_new_premium_email()

-@app.task(name="email-new-premium-archive")
-def EmailNewPremiumArchive(user_id):
+@app.task()
+def FetchArchiveFeedsForUser(user_id):
+    subs = UserSubscription.objects.filter(user=user_id)
     user_profile = Profile.objects.get(user__pk=user_id)
-    user_profile.send_new_premium_archive_email()
+    logging.user(user_profile.user, f"~FCBeginning archive feed fetches for ~SB~FG{subs.count()} feeds~SN...")
+    UserSubscription.fetch_archive_feeds_for_user(user_id)
+
+@app.task()
+def FetchArchiveFeedsChunk(user_id, feed_ids):
+    logging.debug(" ---> Fetching archive stories: %s for %s" % (feed_ids, user_id))
+    UserSubscription.fetch_archive_feeds_chunk(user_id, feed_ids)
+
+@app.task()
+def FinishFetchArchiveFeeds(results, user_id, start_time, starting_story_count):
+    logging.debug(" ---> Fetching archive stories finished for %s" % (user_id))
+    ending_story_count = UserSubscription.finish_fetch_archive_feeds(user_id, start_time)
+    new_story_count = ending_story_count - starting_story_count
+    subs = UserSubscription.objects.filter(user=user_id)
+    user_profile = Profile.objects.get(user__pk=user_id)
+    logging.user(user_profile.user, f"~FCFinished archive feed fetches for ~SB~FG{subs.count()} feeds~FC~SN: ~FG~SB{new_story_count} new~SB~FC, ~FG{ending_story_count} total")
+    user_profile.send_new_premium_archive_email(new_story_count, ending_story_count)
+
 @app.task(name="email-new-premium-pro")
 def EmailNewPremiumPro(user_id):

View file

@@ -380,15 +380,16 @@ def profile_is_premium_archive(request):
     subs = UserSubscription.objects.filter(user=request.user)
     total_subs = subs.count()
-    activated_subs = subs.filter(active=True).count()
+    activated_subs = subs.filter(feed__archive_subscribers__gte=1).count()

     if retries >= 30:
         code = -1
         if not request.user.profile.is_premium:
-            subject = "Premium activation failed: %s (%s/%s)" % (request.user, activated_subs, total_subs)
+            subject = "Premium archive activation failed: %s (%s/%s)" % (request.user, activated_subs, total_subs)
             message = """User: %s (%s) -- Email: %s""" % (request.user.username, request.user.pk, request.user.email)
             mail_admins(subject, message)

         request.user.profile.is_premium = True
+        request.user.profile.is_premium_archive = True
         request.user.profile.save()

     return {

View file

@@ -3,6 +3,7 @@ import time
 import re
 import redis
 import pymongo
+import celery
 import mongoengine as mongo
 from operator import itemgetter
 from pprint import pprint
@@ -573,7 +574,88 @@ class UserSubscription(models.Model):
         if stale_feeds:
             stale_feeds = list(set([f.feed_id for f in stale_feeds]))
             cls.queue_new_feeds(user, new_feeds=stale_feeds)

+    @classmethod
+    def schedule_fetch_archive_feeds_for_user(cls, user_id):
+        from apps.profile.tasks import FetchArchiveFeedsForUser
+        FetchArchiveFeedsForUser.apply_async(kwargs=dict(user_id=user_id),
+                                             queue='search_indexer')
+
+    # Should be run as a background task
+    @classmethod
+    def fetch_archive_feeds_for_user(cls, user_id):
+        from apps.profile.tasks import FetchArchiveFeedsChunk, FinishFetchArchiveFeeds
+
+        start_time = time.time()
+        user = User.objects.get(pk=user_id)
+        r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
+        r.publish(user.username, 'fetch_archive:start')
+
+        subscriptions = UserSubscription.objects.filter(user=user).only('feed')
+        total = subscriptions.count()
+        feed_ids = []
+        starting_story_count = 0
+        for sub in subscriptions:
+            try:
+                feed_ids.append(sub.feed.pk)
+            except Feed.DoesNotExist:
+                continue
+            starting_story_count += MStory.objects(story_feed_id=sub.feed.pk).count()
+
+        feed_id_chunks = [c for c in chunks(feed_ids, 6)]
+        logging.user(user, "~FCFetching archive stories from ~SB%s feeds~SN in %s chunks..." %
+                     (total, len(feed_id_chunks)))
+
+        search_chunks = [FetchArchiveFeedsChunk.s(feed_ids=feed_id_chunk,
+                                                  user_id=user_id
+                                                  ).set(queue='search_indexer')
+                         for feed_id_chunk in feed_id_chunks]
+        callback = FinishFetchArchiveFeeds.s(user_id=user_id,
+                                             start_time=start_time,
+                                             starting_story_count=starting_story_count).set(queue='search_indexer')
+        celery.chord(search_chunks)(callback)
+
+    @classmethod
+    def fetch_archive_feeds_chunk(cls, user_id, feed_ids):
+        from apps.rss_feeds.models import Feed
+        r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
+        user = User.objects.get(pk=user_id)
+
+        logging.user(user, "~FCFetching archive stories from %s feeds..." % len(feed_ids))
+
+        for feed_id in feed_ids:
+            feed = Feed.get_by_id(feed_id)
+            if not feed: continue
+            feed.fill_out_archive_stories()
+
+        r.publish(user.username, 'fetch_archive:feeds:%s' %
+                  ','.join([str(f) for f in feed_ids]))
+
+    @classmethod
+    def finish_fetch_archive_feeds(cls, user_id, start_time):
+        r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
+        user = User.objects.get(pk=user_id)
+        subscriptions = UserSubscription.objects.filter(user=user).only('feed')
+        total = subscriptions.count()
+        duration = time.time() - start_time
+
+        ending_story_count = 0
+        for sub in subscriptions:
+            try:
+                ending_story_count += MStory.objects(story_feed_id=sub.feed.pk).count()
+            except Feed.DoesNotExist:
+                continue
+
+        logging.user(user, "~FCFetched archive stories from ~SB%s feeds~SN in ~FM~SB%s~FC~SN sec." %
+                     (total, round(duration, 2)))
+        r.publish(user.username, 'fetch_archive:done')
+
+        return ending_story_count
+
     @classmethod
     def identify_deleted_feed_users(cls, old_feed_id):
         users = UserSubscriptionFolders.objects.filter(folders__contains=old_feed_id).only('user')
@@ -1126,7 +1208,8 @@ class UserSubscription(models.Model):
         return table
         # return cofeeds

+
 class RUserStory:

     @classmethod
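
Editor's note: the new UserSubscription methods above publish progress on Redis pub/sub, on a channel named after the user's username ('fetch_archive:start', 'fetch_archive:feeds:<ids>', 'fetch_archive:done'). A rough sketch of how a client could listen for those messages with redis-py follows; the connection parameters and print statements are placeholders, and this is not how NewsBlur's own real-time frontend consumes the channel.

import redis

# Hypothetical connection; the commit uses settings.REDIS_PUBSUB_POOL instead.
r = redis.Redis(host="localhost", port=6379, db=0)

def watch_archive_progress(username):
    # Subscribe to the per-user channel and react to each backfill progress message.
    pubsub = r.pubsub()
    pubsub.subscribe(username)
    for message in pubsub.listen():
        if message["type"] != "message":
            continue  # skip subscribe confirmations
        payload = message["data"].decode("utf-8")
        if payload == "fetch_archive:start":
            print("Archive backfill started")
        elif payload.startswith("fetch_archive:feeds:"):
            feed_ids = payload.split(":", 2)[2].split(",")
            print(f"Finished a chunk of {len(feed_ids)} feeds")
        elif payload == "fetch_archive:done":
            print("Archive backfill complete")
            break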

View file

@@ -608,6 +608,7 @@ class Feed(models.Model):
             r.zremrangebyrank('error_feeds', 0, -1)
         else:
             logging.debug(" ---> No errored feeds to drain")
+
     def update_all_statistics(self, has_new_stories=False, force=False):
         recount = not self.counts_converted_to_redis
         count_extra = False
@@ -1032,12 +1033,25 @@ class Feed(models.Model):
         else:
             return 'black'

-    def fill_out_archive_stories(self):
+    def fill_out_archive_stories(self, force=False):
         """
         Starting from page 1 and iterating through N pages, determine whether
         page(i) matches page(i-1) and if there are any new stories.
         """
+        before_story_count = MStory.objects(story_feed_id=self.pk).count()
+        if not force and not self.archive_subscribers:
+            logging.debug(" ---> [%-30s] ~FBNot filling out archive stories, no archive subscribers" % (
+                self.log_title[:30]))
+            return before_story_count, before_story_count
+
         self.update(archive_page=1)
+
+        after_story_count = MStory.objects(story_feed_id=self.pk).count()
+        logging.debug(" ---> [%-30s] ~FCFilled out archive, ~FM~SB%s~SN new stories~FC, total of ~SB%s~SN stories" % (
+            self.log_title[:30],
+            after_story_count - before_story_count,
+            after_story_count))

     def save_feed_stories_last_month(self, verbose=False):
         month_ago = datetime.datetime.utcnow() - datetime.timedelta(days=30)

View file

@@ -503,11 +503,11 @@ class ProcessFeed:
             if not story['title'] and not story['story_content']:
                 continue
             if self.options.get('archive_page', None) and story.get('published') > day_ago:
-                # Arbitrary but necessary to prevent broken feeds from creating an unlimited number of stories
+                # Archive only: Arbitrary but necessary to prevent feeds from creating an unlimited number of stories
                 # because they don't have a guid so it gets auto-generated based on the date, and if the story
                 # is missing a date, then the latest date gets used. So reject anything newer than 24 hours old
                 # when filling out the archive.
-                logging.debug(f" ---> [%-30s] ~FBTossing story because it's too new for the archive: ~SB{story}")
+                # logging.debug(f" ---> [%-30s] ~FBTossing story because it's too new for the archive: ~SB{story}")
                 continue
             if story.get('published') < start_date:
                 start_date = story.get('published')
@@ -1142,10 +1142,10 @@ class FeedFetcherWorker:
         # time_taken = datetime.datetime.utcnow() - self.time_start

     def fetch_and_process_archive_pages(self, feed_id):
-        seen_story_hashes = set()
         feed = Feed.get_by_id(feed_id)

         for archive_page_key in ["page", "paged"]:
+            seen_story_hashes = set()
             failed_pages = 0

             self.options['archive_page_key'] = archive_page_key
@@ -1155,7 +1155,13 @@ class FeedFetcherWorker:
                 self.options['archive_page'] = page+1
                 ffeed = FetchFeed(feed_id, self.options)
-                ret_feed, fetched_feed = ffeed.fetch()
+                try:
+                    ret_feed, fetched_feed = ffeed.fetch()
+                except TimeoutError as e:
+                    logging.debug(' ---> [%-30s] ~FRFeed fetch timed out...' % (feed.log_title[:30]))
+                    failed_pages += 1
+                    continue
                 raw_feed = ffeed.raw_feed

                 if fetched_feed and ret_feed == FEED_OK:
@@ -1169,12 +1175,13 @@ class FeedFetcherWorker:
                     seen_story_hashes.update(pfeed.archive_seen_story_hashes)
                     after_story_hashes = len(seen_story_hashes)
-                    logging.debug(f" ---> [{feed.log_title[:30]:<30}] ~FBStory hashes found: ~SB{len(seen_story_hashes)} stories, ~SN~FR{failed_pages}~FB failures")
                     if before_story_hashes == after_story_hashes:
                         failed_pages += 1
                 else:
                     failed_pages += 1
+                logging.debug(f" ---> [{feed.log_title[:30]:<30}] ~FBStory hashes found: ~SB{len(seen_story_hashes)} stories, ~SN~FR{failed_pages}~FB failures")

     def publish_to_subscribers(self, feed, new_count):
         try:
             r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
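
Editor's note: the worker changes above amount to a simple termination heuristic for archive pagination: keep a running set of story hashes, count a page as a failure when it times out or adds nothing new, and stop once failures accumulate. A stripped-down sketch of that loop, independent of the Feed/FetchFeed classes; the function name, the max_pages/max_failures defaults, and the fetch_page callable are made up for illustration and are not NewsBlur's actual values.

def backfill_pages(fetch_page, max_pages=500, max_failures=1):
    """Fetch archive pages until a page stops producing unseen stories.

    fetch_page(page) should return an iterable of story hashes, or raise
    TimeoutError when the upstream feed is too slow.
    """
    seen_story_hashes = set()
    failed_pages = 0
    for page in range(1, max_pages + 1):
        if failed_pages >= max_failures:
            break
        try:
            hashes = set(fetch_page(page))
        except TimeoutError:
            # A timed-out page counts as a failure, same as an empty one.
            failed_pages += 1
            continue
        before = len(seen_story_hashes)
        seen_story_hashes.update(hashes)
        if len(seen_story_hashes) == before:
            # Nothing new on this page: assume the end of the archive is near.
            failed_pages += 1
    return seen_story_hashes

The design choice here is the same one the diff makes for broken or looping feeds: rather than trusting a feed to report how many pages it has, the fetcher infers the end of the archive from the first page that contributes no unseen story hashes.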