diff --git a/apps/reader/models.py b/apps/reader/models.py
index be4a019d2..477645f79 100644
--- a/apps/reader/models.py
+++ b/apps/reader/models.py
@@ -1,5 +1,6 @@
 import datetime
 import time
+import re
 import redis
 from utils import log as logging
 from utils import json_functions as json
@@ -150,9 +151,10 @@ class UserSubscription(models.Model):
                 pipeline.zinterstore(unread_ranked_stories_key, [sorted_stories_key, unread_stories_key])
                 byscorefunc(unread_ranked_stories_key, min_score, max_score, withscores=include_timestamps)
-                pipeline.expire(unread_ranked_stories_key, 60*60)
+                pipeline.delete(unread_ranked_stories_key)
                 if expire_unread_stories_key:
                     pipeline.delete(unread_stories_key)
+
             results = pipeline.execute()
@@ -163,12 +165,13 @@ class UserSubscription(models.Model):
                     feed_counter += 1
                 else:
                     story_hashes.extend(hashes)
-
+
         return story_hashes
 
     def get_stories(self, offset=0, limit=6, order='newest', read_filter='all', withscores=False,
                     hashes_only=False, cutoff_date=None):
         r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
+        rt = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_TEMP_POOL)
         ignore_user_stories = False
 
         stories_key = 'F:%s' % (self.feed_id)
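Note on the hunk above: the per-feed unread computation batches its Redis commands into a pipeline, and the scratch zset is now dropped in the same round trip (`delete`) rather than being left to expire an hour later. A minimal sketch of that pattern for a single feed — the key names mirror NewsBlur's conventions, but the connection, ids, and data are hypothetical:

```python
import redis

# A minimal sketch of the pipelined unread computation for one feed,
# assuming a local Redis. Key names mirror the diff; ids are hypothetical.
r = redis.Redis()
feed_id, user_id = 42, 1
pipeline = r.pipeline()

# Unread = all story hashes in the feed minus the user's read hashes.
pipeline.sdiffstore('U:%s:%s' % (user_id, feed_id),
                    'F:%s' % feed_id, 'RS:%s:%s' % (user_id, feed_id))
# Rank the unread hashes by timestamp via intersection with the sorted feed.
pipeline.zinterstore('zU:%s:%s' % (user_id, feed_id),
                     ['zF:%s' % feed_id, 'U:%s:%s' % (user_id, feed_id)])
pipeline.zrevrangebyscore('zU:%s:%s' % (user_id, feed_id), '+inf', '-inf')
# Scratch keys are deleted in the same round trip instead of left to expire.
pipeline.delete('zU:%s:%s' % (user_id, feed_id))
pipeline.delete('U:%s:%s' % (user_id, feed_id))
results = pipeline.execute()
unread_hashes = results[2]  # the zrevrangebyscore reply
```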
@@ -177,34 +180,38 @@ class UserSubscription(models.Model):
         unread_ranked_stories_key = 'z%sU:%s:%s' % ('h' if hashes_only else '',
                                                     self.user_id, self.feed_id)
 
-        if offset and not withscores and r.exists(unread_ranked_stories_key):
-            pass
-        else:
-            r.delete(unread_ranked_stories_key)
+        if withscores or not offset or not rt.exists(unread_ranked_stories_key):
+            rt.delete(unread_ranked_stories_key)
 
         if not r.exists(stories_key):
             # print " ---> No stories on feed: %s" % self
             return []
-        elif read_filter != 'unread' or not r.exists(read_stories_key):
+        elif read_filter == 'all' or not r.exists(read_stories_key):
             ignore_user_stories = True
             unread_stories_key = stories_key
         else:
             r.sdiffstore(unread_stories_key, stories_key, read_stories_key)
         sorted_stories_key = 'zF:%s' % (self.feed_id)
         r.zinterstore(unread_ranked_stories_key, [sorted_stories_key, unread_stories_key])
+        if not ignore_user_stories:
+            r.delete(unread_stories_key)
+
+        dump = r.dump(unread_ranked_stories_key)
+        rt.restore(unread_ranked_stories_key, 1*60*60*1000, dump)  # RESTORE's TTL is in milliseconds
+        r.delete(unread_ranked_stories_key)
 
-        current_time = int(time.time() + 60*60*24)
+        current_time = int(time.time() + 60*60*24)
         if not cutoff_date:
             cutoff_date = datetime.datetime.now() - datetime.timedelta(days=settings.DAYS_OF_UNREAD)
 
         if order == 'oldest':
-            byscorefunc = r.zrangebyscore
+            byscorefunc = rt.zrangebyscore
             if read_filter == 'unread':
                 min_score = int(time.mktime(self.mark_read_date.timetuple())) + 1
             else:
                 min_score = int(time.mktime(cutoff_date.timetuple()))-1000
             max_score = current_time
         else:
-            byscorefunc = r.zrevrangebyscore
+            byscorefunc = rt.zrevrangebyscore
             min_score = current_time
             if read_filter == 'unread':
                 # +1 for the intersection b/w zF and F, which carries an implicit score of 1.
@@ -213,7 +220,7 @@ class UserSubscription(models.Model):
                 max_score = 0
 
         if settings.DEBUG and False:
-            debug_stories = r.zrevrange(unread_ranked_stories_key, 0, -1, withscores=True)
+            debug_stories = rt.zrevrange(unread_ranked_stories_key, 0, -1, withscores=True)
             print " ---> Unread all stories (%s - %s) %s stories: %s" % (
                 min_score,
                 max_score,
@@ -222,10 +229,7 @@ class UserSubscription(models.Model):
         story_ids = byscorefunc(unread_ranked_stories_key, min_score,
                                 max_score, start=offset, num=500,
                                 withscores=withscores)[:limit]
-        r.expire(unread_ranked_stories_key, 1*60*60)
-        if not ignore_user_stories:
-            r.delete(unread_stories_key)
-
+
         if withscores:
             story_ids = [(s[0], int(s[1])) for s in story_ids]
@@ -243,12 +247,12 @@ class UserSubscription(models.Model):
     def feed_stories(cls, user_id, feed_ids=None, offset=0, limit=6,
                      order='newest', read_filter='all', usersubs=None, cutoff_date=None,
                      all_feed_ids=None):
-        r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
+        rt = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_TEMP_POOL)
 
         if order == 'oldest':
-            range_func = r.zrange
+            range_func = rt.zrange
         else:
-            range_func = r.zrevrange
+            range_func = rt.zrevrange
 
         if not feed_ids:
             feed_ids = []
@@ -259,8 +263,8 @@ class UserSubscription(models.Model):
         feeds_string = ','.join(str(f) for f in sorted(all_feed_ids))[:30]
         ranked_stories_keys = 'zU:%s:feeds:%s' % (user_id, feeds_string)
         unread_ranked_stories_keys = 'zhU:%s:feeds:%s' % (user_id, feeds_string)
-        stories_cached = r.exists(ranked_stories_keys)
-        unreads_cached = True if read_filter == "unread" else r.exists(unread_ranked_stories_keys)
+        stories_cached = rt.exists(ranked_stories_keys)
+        unreads_cached = True if read_filter == "unread" else rt.exists(unread_ranked_stories_keys)
         if offset and stories_cached and unreads_cached:
             story_hashes = range_func(ranked_stories_keys, offset, limit)
             if read_filter == "unread":
@@ -269,8 +273,8 @@ class UserSubscription(models.Model):
                 unread_story_hashes = range_func(unread_ranked_stories_keys, 0, offset+limit)
             return story_hashes, unread_story_hashes
         else:
-            r.delete(ranked_stories_keys)
-            r.delete(unread_ranked_stories_keys)
+            rt.delete(ranked_stories_keys)
+            rt.delete(unread_ranked_stories_keys)
 
         story_hashes = cls.story_hashes(user_id, feed_ids=feed_ids,
                                         read_filter=read_filter, order=order,
@@ -281,8 +285,10 @@ class UserSubscription(models.Model):
         if not story_hashes:
             return [], []
 
+        pipeline = rt.pipeline()
         for story_hash_group in chunks(story_hashes, 100):
-            r.zadd(ranked_stories_keys, **dict(story_hash_group))
+            pipeline.zadd(ranked_stories_keys, **dict(story_hash_group))
+        pipeline.execute()
         story_hashes = range_func(ranked_stories_keys, offset, limit)
 
         if read_filter == "unread":
@@ -295,11 +301,11 @@ class UserSubscription(models.Model):
                                                   cutoff_date=cutoff_date)
             if unread_story_hashes:
                 for unread_story_hash_group in chunks(unread_story_hashes, 100):
-                    r.zadd(unread_ranked_stories_keys, **dict(unread_story_hash_group))
+                    rt.zadd(unread_ranked_stories_keys, **dict(unread_story_hash_group))
             unread_feed_story_hashes = range_func(unread_ranked_stories_keys, offset, limit)
 
-        r.expire(ranked_stories_keys, 60*60)
-        r.expire(unread_ranked_stories_keys, 60*60)
+        rt.expire(ranked_stories_keys, 60*60)
+        rt.expire(unread_ranked_stories_keys, 60*60)
 
         return story_hashes, unread_feed_story_hashes
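The notable mechanism in `get_stories` above is the DUMP/RESTORE hop: the ranked zset is built on the story-hash server, serialized, and recreated with a one-hour TTL on the temp server, which then serves the paging reads. A sketch of that hop in isolation, with placeholder hosts, dbs, and data — note that RESTORE takes its TTL in milliseconds and refuses to overwrite an existing key, hence the delete first:

```python
import redis

# Sketch of the DUMP/RESTORE hop in get_stories() above; hosts, db numbers,
# and the key/data are placeholders, not the real deployment values.
r = redis.Redis(host='db_redis_story', db=1)   # source: REDIS_STORY_HASH_POOL
rt = redis.Redis(host='db_redis', db=10)       # target: REDIS_STORY_HASH_TEMP_POOL

key = 'zU:1:42'
r.zadd(key, **{'42:abc123': 1371000000})       # redis-py 2.x kwargs form

dump = r.dump(key)                             # serialized value (RDB format)
rt.delete(key)                                 # RESTORE errors on an existing key
rt.restore(key, 1*60*60*1000, dump)            # TTL argument is in milliseconds
r.delete(key)                                  # drop the source copy
```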
@@ -411,6 +417,35 @@ class UserSubscription(models.Model):
             r.srem(read_stories_key, *stale_story_hashes)
             r.srem("RS:%s" % self.feed_id, *stale_story_hashes)
 
+    @classmethod
+    def trim_user_read_stories(cls, user_id):
+        r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
+        subs = UserSubscription.objects.filter(user_id=user_id).only('feed')
+        feeds = [f.feed_id for f in subs]
+        old_rs = r.smembers("RS:%s" % user_id)
+        # new_rs = r.sunionstore("RS:%s" % user_id, *["RS:%s:%s" % (user_id, f) for f in feeds])
+        new_rs = r.sunion(*["RS:%s:%s" % (user_id, f) for f in feeds])
+
+        r.sunionstore("RS:%s:backup" % user_id, "RS:%s" % user_id)
+        r.expire("RS:%s:backup" % user_id, 60*60*24)
+
+        missing_rs = []
+        feed_re = re.compile(r'(\d+):.*?')
+        for rs in old_rs:
+            rs_feed_id = feed_re.search(rs).groups()[0]
+            if int(rs_feed_id) not in feeds:
+                missing_rs.append(rs)
+        # r.sadd("RS:%s" % user_id, *missing_rs)
+
+        old_count = len(old_rs)
+        new_count = len(new_rs)
+        missing_count = len(missing_rs)
+        new_total = new_count + missing_count
+        user = User.objects.get(pk=user_id)
+        logging.user(user, "~FBTrimming ~FR%s~FB/%s (~SB%s~SN+~SB%s~SN saved) user read stories..." %
+                     (old_count - new_total, old_count, new_count, missing_count))
+
     def mark_feed_read(self, cutoff_date=None):
         if (self.unread_count_negative == 0
             and self.unread_count_neutral == 0
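`trim_user_read_stories` is a maintenance helper: it recomputes a user's aggregate `RS:<user_id>` read-story set as the union of the per-feed `RS:<user_id>:<feed_id>` sets, stashes a 24-hour backup, and collects read hashes from feeds the user no longer subscribes to. As written it is a dry run — the destructive `sunionstore`/`sadd` calls are commented out and only the potential savings are logged. A hypothetical invocation (the id and counts are illustrative, not real output):

```python
# Hypothetical dry run from a Django shell; user id 42 is a placeholder.
from apps.reader.models import UserSubscription

UserSubscription.trim_user_read_stories(42)
# ---> ~FBTrimming ~FR1200~FB/5000 (~SB3500~SN+~SB300~SN saved) user read stories...
```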
diff --git a/apps/social/models.py b/apps/social/models.py
index 7395b202d..a725d8ae6 100644
--- a/apps/social/models.py
+++ b/apps/social/models.py
@@ -29,7 +29,7 @@ from vendor import appdotnet
 from vendor import pynliner
 from utils import log as logging
 from utils import json_functions as json
-from utils.feed_functions import relative_timesince
+from utils.feed_functions import relative_timesince, chunks
 from utils.story_functions import truncate_chars, strip_tags, linkify, image_size
 from utils.scrubber import SelectiveScriptScrubber
 from utils import s3_utils
@@ -862,7 +862,94 @@ class MSocialSubscription(mongo.Document):
             'is_trained': self.is_trained,
             'feed_opens': self.feed_opens,
         }
-
+
+    @classmethod
+    def subs_for_users(cls, user_id, subscription_user_ids=None, read_filter="unread"):
+        socialsubs = cls.objects
+        if read_filter == "unread":
+            socialsubs = socialsubs.filter(Q(unread_count_neutral__gt=0) |
+                                           Q(unread_count_positive__gt=0))
+        if not subscription_user_ids:
+            socialsubs = socialsubs.filter(user_id=user_id)\
+                                   .only('subscription_user_id', 'mark_read_date', 'is_trained')
+        else:
+            socialsubs = socialsubs.filter(user_id=user_id,
+                                           subscription_user_id__in=subscription_user_ids)\
+                                   .only('subscription_user_id', 'mark_read_date', 'is_trained')
+
+        return socialsubs
+
+    @classmethod
+    def story_hashes(cls, user_id, relative_user_id, subscription_user_ids=None, socialsubs=None,
+                     read_filter="unread", order="newest",
+                     include_timestamps=False, group_by_user=True, cutoff_date=None):
+        r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
+        story_hashes = {} if group_by_user else []
+
+        if not socialsubs:
+            socialsubs = cls.subs_for_users(relative_user_id,
+                                            subscription_user_ids=subscription_user_ids,
+                                            read_filter=read_filter)
+        subscription_user_ids = [sub.subscription_user_id for sub in socialsubs]
+        if not subscription_user_ids:
+            return story_hashes
+
+        read_dates = dict((us.subscription_user_id,
+                           int(us.mark_read_date.strftime('%s'))) for us in socialsubs)
+        current_time = int(time.time() + 60*60*24)
+        if not cutoff_date:
+            cutoff_date = datetime.datetime.now() - datetime.timedelta(days=settings.DAYS_OF_STORY_HASHES)
+        unread_timestamp = int(time.mktime(cutoff_date.timetuple()))-1000
+        feed_counter = 0
+
+        for sub_user_id_group in chunks(subscription_user_ids, 20):
+            pipeline = r.pipeline()
+            for sub_user_id in sub_user_id_group:
+                stories_key = 'B:%s' % (sub_user_id)
+                sorted_stories_key = 'zB:%s' % (sub_user_id)
+                read_stories_key = 'RS:%s' % (user_id)
+                read_social_stories_key = 'RS:%s:B:%s' % (user_id, sub_user_id)
+                unread_stories_key = 'UB:%s:%s' % (user_id, sub_user_id)
+                unread_ranked_stories_key = 'zUB:%s:%s' % (user_id, sub_user_id)
+                expire_unread_stories_key = False
+
+                max_score = current_time
+                if read_filter == 'unread':
+                    # +1 for the intersection b/w zF and F, which carries an implicit score of 1.
+                    min_score = read_dates[sub_user_id] + 1
+                    pipeline.sdiffstore(unread_stories_key, stories_key, read_stories_key)
+                    pipeline.sdiffstore(unread_stories_key, unread_stories_key, read_social_stories_key)
+                    expire_unread_stories_key = True
+                else:
+                    min_score = unread_timestamp
+                    unread_stories_key = stories_key
+
+                if order == 'oldest':
+                    byscorefunc = pipeline.zrangebyscore
+                else:
+                    byscorefunc = pipeline.zrevrangebyscore
+                    min_score, max_score = max_score, min_score
+
+                pipeline.zinterstore(unread_ranked_stories_key, [sorted_stories_key, unread_stories_key])
+                byscorefunc(unread_ranked_stories_key, min_score, max_score, withscores=include_timestamps)
+                pipeline.delete(unread_ranked_stories_key)
+                if expire_unread_stories_key:
+                    pipeline.delete(unread_stories_key)
+
+            results = pipeline.execute()
+
+            for hashes in results:
+                if not isinstance(hashes, list): continue
+                if group_by_user:
+                    story_hashes[subscription_user_ids[feed_counter]] = hashes
+                    feed_counter += 1
+                else:
+                    story_hashes.extend(hashes)
+
+        return story_hashes
+
     def get_stories(self, offset=0, limit=6, order='newest', read_filter='all',
                     withscores=False, hashes_only=False, cutoff_date=None,
                     mark_read_complement=False):
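A social feed's unreads have to subtract two read sets: the user's global read hashes (`RS:<user_id>`) and the hashes read on that particular blurblog (`RS:<user_id>:B:<social_user_id>`), which is why `story_hashes` above chains two `sdiffstore` calls. The same two-pass difference as a standalone sketch, with hypothetical ids:

```python
import redis

# Two-pass set difference for social unreads, as in story_hashes() above.
# Ids are hypothetical.
r = redis.Redis()
user_id, sub_user_id = 1, 99

unread_key = 'UB:%s:%s' % (user_id, sub_user_id)
# Pass 1: everything the social user shared, minus globally read hashes.
r.sdiffstore(unread_key, 'B:%s' % sub_user_id, 'RS:%s' % user_id)
# Pass 2: minus hashes read on this particular blurblog.
r.sdiffstore(unread_key, unread_key, 'RS:%s:B:%s' % (user_id, sub_user_id))
```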
@@ -926,56 +1013,77 @@ class MSocialSubscription(mongo.Document):
     @classmethod
     def feed_stories(cls, user_id, social_user_ids, offset=0, limit=6,
                      order='newest', read_filter='all', relative_user_id=None, cache=True,
-                     cutoff_date=None):
-        r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
+                     socialsubs=None, cutoff_date=None):
+        rt = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_TEMP_POOL)
 
         if not relative_user_id:
             relative_user_id = user_id
 
         if order == 'oldest':
-            range_func = r.zrange
+            range_func = rt.zrange
         else:
-            range_func = r.zrevrange
+            range_func = rt.zrevrange
 
         if not isinstance(social_user_ids, list):
             social_user_ids = [social_user_ids]
 
         ranked_stories_keys = 'zU:%s:social' % (user_id)
-        read_ranked_stories_keys = 'zhU:%s:social' % (user_id)
+        unread_ranked_stories_keys = 'zhU:%s:social' % (user_id)
         if (offset and cache and
-            r.exists(ranked_stories_keys) and
-            r.exists(read_ranked_stories_keys)):
-            story_hashes = range_func(ranked_stories_keys, offset, offset+limit, withscores=True)
-            read_story_hashes = range_func(read_ranked_stories_keys, 0, -1)
-            if story_hashes:
-                story_hashes, story_dates = zip(*story_hashes)
-                return story_hashes, story_dates, read_story_hashes
-            else:
-                return [], [], []
+            rt.exists(ranked_stories_keys) and
+            rt.exists(unread_ranked_stories_keys)):
+            story_hashes_and_dates = range_func(ranked_stories_keys, offset, limit, withscores=True)
+            story_hashes, story_dates = zip(*story_hashes_and_dates)
+            if read_filter == "unread":
+                unread_story_hashes = story_hashes
+            else:
+                unread_story_hashes = range_func(unread_ranked_stories_keys, 0, offset+limit)
+            return story_hashes, story_dates, unread_story_hashes
         else:
-            r.delete(ranked_stories_keys)
-            r.delete(read_ranked_stories_keys)
+            rt.delete(ranked_stories_keys)
+            rt.delete(unread_ranked_stories_keys)
 
-        for social_user_id in social_user_ids:
-            us = cls.objects.get(user_id=relative_user_id, subscription_user_id=social_user_id)
-            story_hashes = us.get_stories(offset=0, limit=100,
-                                          order=order, read_filter=read_filter,
-                                          withscores=True, cutoff_date=cutoff_date)
-            if story_hashes:
-                r.zadd(ranked_stories_keys, **dict(story_hashes))
-
-        r.zinterstore(read_ranked_stories_keys, [ranked_stories_keys, "RS:%s" % user_id])
-        story_hashes = range_func(ranked_stories_keys, offset, limit, withscores=True)
-        read_story_hashes = range_func(read_ranked_stories_keys, offset, limit)
-        r.expire(ranked_stories_keys, 1*60*60)
-        r.expire(read_ranked_stories_keys, 1*60*60)
-
-        if story_hashes:
-            story_hashes, story_dates = zip(*story_hashes)
-            return story_hashes, story_dates, read_story_hashes
-        else:
+        story_hashes = cls.story_hashes(user_id, relative_user_id,
+                                        subscription_user_ids=social_user_ids,
+                                        read_filter=read_filter, order=order,
+                                        include_timestamps=True,
+                                        group_by_user=False,
+                                        socialsubs=socialsubs,
+                                        cutoff_date=cutoff_date)
+        if not story_hashes:
             return [], [], []
 
+        pipeline = rt.pipeline()
+        for story_hash_group in chunks(story_hashes, 100):
+            pipeline.zadd(ranked_stories_keys, **dict(story_hash_group))
+        pipeline.execute()
+        story_hashes_and_dates = range_func(ranked_stories_keys, offset, limit, withscores=True)
+        if not story_hashes_and_dates:
+            return [], [], []
+        story_hashes, story_dates = zip(*story_hashes_and_dates)
+
+        if read_filter == "unread":
+            unread_feed_story_hashes = story_hashes
+        else:
+            unread_story_hashes = cls.story_hashes(user_id, relative_user_id,
+                                                   subscription_user_ids=social_user_ids,
+                                                   read_filter="unread", order=order,
+                                                   include_timestamps=True,
+                                                   group_by_user=False,
+                                                   socialsubs=socialsubs,
+                                                   cutoff_date=cutoff_date)
+            if unread_story_hashes:
+                pipeline = rt.pipeline()
+                for unread_story_hash_group in chunks(unread_story_hashes, 100):
+                    pipeline.zadd(unread_ranked_stories_keys, **dict(unread_story_hash_group))
+                pipeline.execute()
+            unread_feed_story_hashes = range_func(unread_ranked_stories_keys, offset, limit)
+
+        rt.expire(ranked_stories_keys, 60*60)
+        rt.expire(unread_ranked_stories_keys, 60*60)
+
+        return story_hashes, story_dates, unread_feed_story_hashes
+
     def mark_story_ids_as_read(self, story_hashes, feed_id=None, mark_all_read=False, request=None):
         data = dict(code=0, payload=story_hashes)
         r = redis.Redis(connection_pool=settings.REDIS_POOL)
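Loading the merged hashes into the temp server happens in 100-hash chunks through a pipeline, using the redis-py 2.x `zadd` calling convention (`**{member: score}` kwargs). A sketch under those assumptions, with a stand-in `chunks` matching the contract of `utils.feed_functions.chunks` and hypothetical data:

```python
import redis

def chunks(l, n):
    # Same contract as utils.feed_functions.chunks: fixed-size slices.
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

rt = redis.Redis(db=10)  # placeholder for REDIS_STORY_HASH_TEMP_POOL
story_hashes = [('42:abc', 1371000000), ('42:def', 1371000060)]  # hypothetical

pipeline = rt.pipeline()
for group in chunks(story_hashes, 100):
    # redis-py 2.x: zadd(name, **{member: score, ...})
    pipeline.zadd('zU:1:feeds:42', **dict(group))
pipeline.execute()
```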
diff --git a/apps/social/views.py b/apps/social/views.py
index c305e5ddc..587bb571e 100644
--- a/apps/social/views.py
+++ b/apps/social/views.py
@@ -192,19 +192,23 @@ def load_river_blurblog(request):
     if not relative_user_id:
         relative_user_id = user.pk
 
+    socialsubs = MSocialSubscription.objects.filter(user_id=relative_user_id)
+    if social_user_ids:
+        socialsubs = socialsubs.filter(subscription_user_id__in=social_user_ids)
+
     if not social_user_ids:
-        socialsubs = MSocialSubscription.objects.filter(user_id=relative_user_id)
         social_user_ids = [s.subscription_user_id for s in socialsubs]
 
     offset = (page-1) * limit
     limit = page * limit - 1
 
-    story_hashes, story_dates, read_feed_story_hashes = MSocialSubscription.feed_stories(
-                                                            user.pk, social_user_ids,
-                                                            offset=offset, limit=limit,
-                                                            order=order, read_filter=read_filter,
-                                                            relative_user_id=relative_user_id,
-                                                            cutoff_date=user.profile.unread_cutoff)
+    story_hashes, story_dates, unread_feed_story_hashes = MSocialSubscription.feed_stories(
+                                                              user.pk, social_user_ids,
+                                                              offset=offset, limit=limit,
+                                                              order=order, read_filter=read_filter,
+                                                              relative_user_id=relative_user_id,
+                                                              socialsubs=socialsubs,
+                                                              cutoff_date=user.profile.unread_cutoff)
     mstories = MStory.find_by_story_hashes(story_hashes)
     story_hashes_to_dates = dict(zip(story_hashes, story_dates))
     def sort_stories_by_hash(a, b):
@@ -268,7 +272,7 @@ def load_river_blurblog(request):
         # Just need to format stories
         for story in stories:
             story['read_status'] = 0
-            if story['story_hash'] in read_feed_story_hashes:
+            if story['story_hash'] not in unread_feed_story_hashes:
                 story['read_status'] = 1
             story_date = localtime_for_timezone(story['story_date'], user.profile.timezone)
             story['short_parsed_date'] = format_story_link_date__short(story_date, now)
@@ -291,7 +295,7 @@ def load_river_blurblog(request):
                 story['shared_date'] = format_story_link_date__long(shared_date, now)
                 story['shared_comments'] = strip_tags(shared_stories[story['story_hash']]['comments'])
                 if (shared_stories[story['story_hash']]['shared_date'] < user.profile.unread_cutoff or
-                    story['story_hash'] in read_feed_story_hashes):
+                    story['story_hash'] not in unread_feed_story_hashes):
                     story['read_status'] = 1
 
     classifiers = sort_classifiers_by_feed(user=user, feed_ids=story_feed_ids,
diff --git a/apps/statistics/rstats.py b/apps/statistics/rstats.py
index e84713520..ea157fbfd 100644
--- a/apps/statistics/rstats.py
+++ b/apps/statistics/rstats.py
@@ -102,16 +102,16 @@ class RStats:
                 prefixes_ttls[prefix]['X'] += 1
             elif ttl < 60*60: # 1 hour
                 prefixes_ttls[prefix]['1h'] += 1
-            elif ttl < 60*60*12:
-                prefixes_ttls[prefix]['12h'] += 1
             elif ttl < 60*60*24:
                 prefixes_ttls[prefix]['1d'] += 1
-            elif ttl < 60*60*168:
+            elif ttl < 60*60*24*7:
                 prefixes_ttls[prefix]['1w'] += 1
-            elif ttl < 60*60*336:
+            elif ttl < 60*60*24*14:
                 prefixes_ttls[prefix]['2w'] += 1
+            elif ttl < 60*60*24*30:
+                prefixes_ttls[prefix]['4w'] += 1
             else:
-                prefixes_ttls[prefix]['2w+'] += 1
+                prefixes_ttls[prefix]['4w+'] += 1
 
         keys_count = len(keys)
         total_size = float(sum([k for k in sizes.values()]))
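The rstats change swaps opaque hour multiples for day-based arithmetic (`60*60*168` and `60*60*24*7` are both 604,800 seconds) and replaces the 12h bucket with a 4w one. The resulting thresholds, restated as a standalone sketch of the elif chain (omitting the no-TTL 'X' case that precedes it):

```python
# TTL bucket thresholds after the change, in seconds. Pure arithmetic,
# mirroring the elif chain above.
def ttl_bucket(ttl):
    if ttl < 60*60:            # 3,600
        return '1h'
    elif ttl < 60*60*24:       # 86,400
        return '1d'
    elif ttl < 60*60*24*7:     # 604,800 (previously written 60*60*168)
        return '1w'
    elif ttl < 60*60*24*14:    # 1,209,600 (previously 60*60*336)
        return '2w'
    elif ttl < 60*60*24*30:    # 2,592,000, the new bucket
        return '4w'
    return '4w+'
```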
diff --git a/settings.py b/settings.py
index 28115cfcb..24166e21d 100644
--- a/settings.py
+++ b/settings.py
@@ -585,15 +585,16 @@ MONGOANALYTICSDB = connect(MONGO_ANALYTICS_DB.pop('name'), **MONGO_ANALYTICS_DB)
 # =========
 # = Redis =
 # =========
 
-REDIS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=0)
-REDIS_ANALYTICS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=2)
-REDIS_STATISTICS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=3)
-REDIS_FEED_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=4)
-REDIS_SESSION_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=5)
-# REDIS_CACHE_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=6) # Duped in CACHES
-REDIS_PUBSUB_POOL = redis.ConnectionPool(host=REDIS_PUBSUB['host'], port=6379, db=0)
-REDIS_STORY_HASH_POOL = redis.ConnectionPool(host=REDIS_STORY['host'], port=6379, db=1)
-# REDIS_STORY_HASH_POOL2 = redis.ConnectionPool(host=REDIS['host'], port=6379, db=8)
+REDIS_POOL                 = redis.ConnectionPool(host=REDIS['host'], port=6379, db=0)
+REDIS_ANALYTICS_POOL       = redis.ConnectionPool(host=REDIS['host'], port=6379, db=2)
+REDIS_STATISTICS_POOL      = redis.ConnectionPool(host=REDIS['host'], port=6379, db=3)
+REDIS_FEED_POOL            = redis.ConnectionPool(host=REDIS['host'], port=6379, db=4)
+REDIS_SESSION_POOL         = redis.ConnectionPool(host=REDIS['host'], port=6379, db=5)
+# REDIS_CACHE_POOL         = redis.ConnectionPool(host=REDIS['host'], port=6379, db=6) # Duped in CACHES
+REDIS_PUBSUB_POOL          = redis.ConnectionPool(host=REDIS_PUBSUB['host'], port=6379, db=0)
+REDIS_STORY_HASH_POOL      = redis.ConnectionPool(host=REDIS_STORY['host'], port=6379, db=1)
+# REDIS_STORY_HASH_POOL2   = redis.ConnectionPool(host=REDIS['host'], port=6379, db=8)
+REDIS_STORY_HASH_TEMP_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=10)
 
 # ==========
 # = Assets =
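Finally, note that the new temp pool lives on the main `REDIS` host in db 10, not on the dedicated `REDIS_STORY` host, so the DUMP/RESTORE in `get_stories` genuinely crosses servers. A minimal sketch of the two-pool setup, with placeholder hostnames standing in for the settings values:

```python
import redis

# Placeholder hosts standing in for the settings above. One ConnectionPool
# per process; every redis.Redis() client built on it shares connections.
REDIS_STORY_HASH_POOL = redis.ConnectionPool(host='db_redis_story', port=6379, db=1)
REDIS_STORY_HASH_TEMP_POOL = redis.ConnectionPool(host='db_redis', port=6379, db=10)

r = redis.Redis(connection_pool=REDIS_STORY_HASH_POOL)        # permanent story hashes
rt = redis.Redis(connection_pool=REDIS_STORY_HASH_TEMP_POOL)  # hour-long ranked caches
```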