Major refactoring of social blurblog. Also adding a new trimmer, but keeping it turned off.

Samuel Clay 2013-10-07 13:36:10 -07:00
parent cc64e3626c
commit 4acad6e59f
5 changed files with 232 additions and 84 deletions

View file

@@ -1,5 +1,6 @@
import datetime
import time
import re
import redis
from utils import log as logging
from utils import json_functions as json
@@ -150,9 +151,10 @@ class UserSubscription(models.Model):
pipeline.zinterstore(unread_ranked_stories_key, [sorted_stories_key, unread_stories_key])
byscorefunc(unread_ranked_stories_key, min_score, max_score, withscores=include_timestamps)
pipeline.expire(unread_ranked_stories_key, 60*60)
pipeline.delete(unread_ranked_stories_key)
if expire_unread_stories_key:
pipeline.delete(unread_stories_key)
results = pipeline.execute()
@@ -163,12 +165,13 @@ class UserSubscription(models.Model):
feed_counter += 1
else:
story_hashes.extend(hashes)
return story_hashes
def get_stories(self, offset=0, limit=6, order='newest', read_filter='all', withscores=False,
hashes_only=False, cutoff_date=None):
r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
rt = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_TEMP_POOL)
ignore_user_stories = False
stories_key = 'F:%s' % (self.feed_id)
@@ -177,34 +180,38 @@ class UserSubscription(models.Model):
unread_ranked_stories_key = 'z%sU:%s:%s' % ('h' if hashes_only else '',
self.user_id, self.feed_id)
if offset and not withscores and r.exists(unread_ranked_stories_key):
pass
else:
r.delete(unread_ranked_stories_key)
if withscores or not offset or not rt.exists(unread_ranked_stories_key):
rt.delete(unread_ranked_stories_key)
if not r.exists(stories_key):
# print " ---> No stories on feed: %s" % self
return []
elif read_filter != 'unread' or not r.exists(read_stories_key):
elif read_filter == 'all' or not r.exists(read_stories_key):
ignore_user_stories = True
unread_stories_key = stories_key
else:
r.sdiffstore(unread_stories_key, stories_key, read_stories_key)
sorted_stories_key = 'zF:%s' % (self.feed_id)
r.zinterstore(unread_ranked_stories_key, [sorted_stories_key, unread_stories_key])
if not ignore_user_stories:
r.delete(unread_stories_key)
dump = r.dump(unread_ranked_stories_key)
rt.restore(unread_ranked_stories_key, 1*60*60, dump)
r.delete(unread_ranked_stories_key)
current_time = int(time.time() + 60*60*24)
current_time = int(time.time() + 60*60*24)
if not cutoff_date:
cutoff_date = datetime.datetime.now() - datetime.timedelta(days=settings.DAYS_OF_UNREAD)
if order == 'oldest':
byscorefunc = r.zrangebyscore
byscorefunc = rt.zrangebyscore
if read_filter == 'unread':
min_score = int(time.mktime(self.mark_read_date.timetuple())) + 1
else:
min_score = int(time.mktime(cutoff_date.timetuple()))-1000
max_score = current_time
else:
byscorefunc = r.zrevrangebyscore
byscorefunc = rt.zrevrangebyscore
min_score = current_time
if read_filter == 'unread':
# +1 for the intersection b/w zF and F, which carries an implicit score of 1.
@@ -213,7 +220,7 @@ class UserSubscription(models.Model):
max_score = 0
if settings.DEBUG and False:
debug_stories = r.zrevrange(unread_ranked_stories_key, 0, -1, withscores=True)
debug_stories = rt.zrevrange(unread_ranked_stories_key, 0, -1, withscores=True)
print " ---> Unread all stories (%s - %s) %s stories: %s" % (
min_score,
max_score,
@@ -222,10 +229,7 @@ class UserSubscription(models.Model):
story_ids = byscorefunc(unread_ranked_stories_key, min_score,
max_score, start=offset, num=500,
withscores=withscores)[:limit]
r.expire(unread_ranked_stories_key, 1*60*60)
if not ignore_user_stories:
r.delete(unread_stories_key)
if withscores:
story_ids = [(s[0], int(s[1])) for s in story_ids]
@@ -243,12 +247,12 @@ class UserSubscription(models.Model):
def feed_stories(cls, user_id, feed_ids=None, offset=0, limit=6,
order='newest', read_filter='all', usersubs=None, cutoff_date=None,
all_feed_ids=None):
r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
rt = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_TEMP_POOL)
if order == 'oldest':
range_func = r.zrange
range_func = rt.zrange
else:
range_func = r.zrevrange
range_func = rt.zrevrange
if not feed_ids:
feed_ids = []
@@ -259,8 +263,8 @@ class UserSubscription(models.Model):
feeds_string = ','.join(str(f) for f in sorted(all_feed_ids))[:30]
ranked_stories_keys = 'zU:%s:feeds:%s' % (user_id, feeds_string)
unread_ranked_stories_keys = 'zhU:%s:feeds:%s' % (user_id, feeds_string)
stories_cached = r.exists(ranked_stories_keys)
unreads_cached = True if read_filter == "unread" else r.exists(unread_ranked_stories_keys)
stories_cached = rt.exists(ranked_stories_keys)
unreads_cached = True if read_filter == "unread" else rt.exists(unread_ranked_stories_keys)
if offset and stories_cached and unreads_cached:
story_hashes = range_func(ranked_stories_keys, offset, limit)
if read_filter == "unread":
@@ -269,8 +273,8 @@ class UserSubscription(models.Model):
unread_story_hashes = range_func(unread_ranked_stories_keys, 0, offset+limit)
return story_hashes, unread_story_hashes
else:
r.delete(ranked_stories_keys)
r.delete(unread_ranked_stories_keys)
rt.delete(ranked_stories_keys)
rt.delete(unread_ranked_stories_keys)
story_hashes = cls.story_hashes(user_id, feed_ids=feed_ids,
read_filter=read_filter, order=order,
@@ -281,8 +285,10 @@ class UserSubscription(models.Model):
if not story_hashes:
return [], []
pipeline = rt.pipeline()
for story_hash_group in chunks(story_hashes, 100):
r.zadd(ranked_stories_keys, **dict(story_hash_group))
pipeline.zadd(ranked_stories_keys, **dict(story_hash_group))
pipeline.execute()
story_hashes = range_func(ranked_stories_keys, offset, limit)
if read_filter == "unread":
@@ -295,11 +301,11 @@ class UserSubscription(models.Model):
cutoff_date=cutoff_date)
if unread_story_hashes:
for unread_story_hash_group in chunks(unread_story_hashes, 100):
r.zadd(unread_ranked_stories_keys, **dict(unread_story_hash_group))
rt.zadd(unread_ranked_stories_keys, **dict(unread_story_hash_group))
unread_feed_story_hashes = range_func(unread_ranked_stories_keys, offset, limit)
r.expire(ranked_stories_keys, 60*60)
r.expire(unread_ranked_stories_keys, 60*60)
rt.expire(ranked_stories_keys, 60*60)
rt.expire(unread_ranked_stories_keys, 60*60)
return story_hashes, unread_feed_story_hashes
@@ -411,6 +417,35 @@ class UserSubscription(models.Model):
r.srem(read_stories_key, *stale_story_hashes)
r.srem("RS:%s" % self.feed_id, *stale_story_hashes)
@classmethod
def trim_user_read_stories(self, user_id):
r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
subs = UserSubscription.objects.filter(user_id=user_id).only('feed')
feeds = [f.feed_id for f in subs]
old_rs = r.smembers("RS:%s" % user_id)
# new_rs = r.sunionstore("RS:%s" % user_id, *["RS:%s:%s" % (user_id, f) for f in feeds])
new_rs = r.sunion(*["RS:%s:%s" % (user_id, f) for f in feeds])
r.sunionstore("RS:%s:backup" % user_id, "RS:%s" % user_id)
r.expire("RS:%s:backup" % user_id, 60*60*24)
missing_rs = []
feed_re = re.compile(r'(\d+):.*?')
for rs in old_rs:
rs_feed_id = feed_re.search(rs).groups()[0]
if int(rs_feed_id) not in feeds:
missing_rs.append(rs)
# r.sadd("RS:%s" % user_id, *missing_rs)
old_count = len(old_rs)
new_count = len(new_rs)
missing_count = len(missing_rs)
new_total = new_count + missing_count
user = User.objects.get(pk=user_id)
logging.user(user, "~FBTrimming ~FR%s~FB/%s (~SB%s~SN+~SB%s~SN saved) user read stories..." %
(old_count - new_total, old_count, new_count, missing_count))
def mark_feed_read(self, cutoff_date=None):
if (self.unread_count_negative == 0
and self.unread_count_neutral == 0
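
The central move in get_stories() above is building the ranked zset on the main story-hash server and then shipping it to the new temp server with DUMP/RESTORE, so the short-lived key is read and expired there instead. A minimal sketch of that handoff, assuming the two connection pools from the settings change at the bottom of this commit; the helper itself is hypothetical and not part of the diff:

import redis
from django.conf import settings

def handoff_to_temp(key, ttl_seconds=60*60):
    # Hypothetical helper mirroring the dump/restore calls in get_stories().
    r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
    rt = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_TEMP_POOL)
    dump = r.dump(key)
    if not dump:
        return False
    rt.delete(key)                             # RESTORE refuses to overwrite an existing key
    rt.restore(key, ttl_seconds * 1000, dump)  # RESTORE's ttl argument is in milliseconds
    r.delete(key)
    return True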

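The new trim_user_read_stories() above rebuilds a user's read-story set from the per-feed RS:<user_id>:<feed_id> sets, keeps a one-day RS:<user_id>:backup copy, and for now only logs what it would trim (both the rebuilding sunionstore and the re-adding sadd are commented out, matching the commit message's "keeping it turned off"). A toy illustration of the feed-id regex it relies on, with made-up story hashes:

import re

feed_re = re.compile(r'(\d+):.*?')
read_hashes = ['12:abcd1234', '9981:ffee0011']
feed_ids = [int(feed_re.search(h).groups()[0]) for h in read_hashes]
# feed_ids -> [12, 9981]
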
View file

@@ -29,7 +29,7 @@ from vendor import appdotnet
from vendor import pynliner
from utils import log as logging
from utils import json_functions as json
from utils.feed_functions import relative_timesince
from utils.feed_functions import relative_timesince, chunks
from utils.story_functions import truncate_chars, strip_tags, linkify, image_size
from utils.scrubber import SelectiveScriptScrubber
from utils import s3_utils
@@ -862,7 +862,94 @@ class MSocialSubscription(mongo.Document):
'is_trained': self.is_trained,
'feed_opens': self.feed_opens,
}
@classmethod
def subs_for_users(cls, user_id, subscription_user_ids=None, read_filter="unread"):
socialsubs = cls.objects
if read_filter == "unread":
socialsubs = socialsubs.filter(Q(unread_count_neutral__gt=0) |
Q(unread_count_positive__gt=0))
if not subscription_user_ids:
socialsubs = socialsubs.filter(user=user_id).only('feed', 'mark_read_date', 'is_trained')
else:
socialsubs = socialsubs.filter(user=user_id,
subscription_user_id__in=subscription_user_ids)\
.only('feed', 'mark_read_date', 'is_trained')
return socialsubs
@classmethod
def story_hashes(cls, user_id, relative_user_id, subscription_user_ids=None, socialsubs=None,
read_filter="unread", order="newest",
include_timestamps=False, group_by_user=True, cutoff_date=None):
r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
pipeline = r.pipeline()
story_hashes = {} if group_by_user else []
if not socialsubs:
socialsubs = cls.subs_for_users(relative_user_id,
subscription_user_ids=subscription_user_ids,
read_filter=read_filter)
subscription_user_ids = [sub.subscription_user_id for sub in socialsubs]
if not subscription_user_ids:
return story_hashes
read_dates = dict((us.subscription_user_id,
int(us.mark_read_date.strftime('%s'))) for us in socialsubs)
current_time = int(time.time() + 60*60*24)
if not cutoff_date:
cutoff_date = datetime.datetime.now() - datetime.timedelta(days=settings.DAYS_OF_STORY_HASHES)
unread_timestamp = int(time.mktime(cutoff_date.timetuple()))-1000
feed_counter = 0
for sub_user_id_group in chunks(subscription_user_ids, 20):
pipeline = r.pipeline()
for sub_user_id in sub_user_id_group:
stories_key = 'B:%s' % (sub_user_id)
sorted_stories_key = 'zB:%s' % (sub_user_id)
read_stories_key = 'RS:%s' % (user_id)
read_social_stories_key = 'RS:%s:B:%s' % (user_id, sub_user_id)
unread_stories_key = 'UB:%s:%s' % (user_id, sub_user_id)
sorted_stories_key = 'zB:%s' % (sub_user_id)
unread_ranked_stories_key = 'zUB:%s:%s' % (user_id, sub_user_id)
expire_unread_stories_key = False
max_score = current_time
if read_filter == 'unread':
# +1 for the intersection b/w zF and F, which carries an implicit score of 1.
min_score = read_dates[sub_user_id] + 1
pipeline.sdiffstore(unread_stories_key, stories_key, read_stories_key)
pipeline.sdiffstore(unread_stories_key, unread_stories_key, read_social_stories_key)
expire_unread_stories_key = True
else:
min_score = unread_timestamp
unread_stories_key = stories_key
if order == 'oldest':
byscorefunc = pipeline.zrangebyscore
else:
byscorefunc = pipeline.zrevrangebyscore
min_score, max_score = max_score, min_score
pipeline.zinterstore(unread_ranked_stories_key, [sorted_stories_key, unread_stories_key])
byscorefunc(unread_ranked_stories_key, min_score, max_score, withscores=include_timestamps)
pipeline.delete(unread_ranked_stories_key)
if expire_unread_stories_key:
pipeline.delete(unread_stories_key)
results = pipeline.execute()
for hashes in results:
if not isinstance(hashes, list): continue
if group_by_user:
story_hashes[subscription_user_ids[feed_counter]] = hashes
feed_counter += 1
else:
story_hashes.extend(hashes)
return story_hashes
def get_stories(self, offset=0, limit=6, order='newest', read_filter='all',
withscores=False, hashes_only=False, cutoff_date=None,
mark_read_complement=False):
@@ -926,56 +1013,77 @@ class MSocialSubscription(mongo.Document):
@classmethod
def feed_stories(cls, user_id, social_user_ids, offset=0, limit=6,
order='newest', read_filter='all', relative_user_id=None, cache=True,
cutoff_date=None):
r = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_POOL)
socialsubs=None, cutoff_date=None):
rt = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_TEMP_POOL)
if not relative_user_id:
relative_user_id = user_id
if order == 'oldest':
range_func = r.zrange
range_func = rt.zrange
else:
range_func = r.zrevrange
range_func = rt.zrevrange
if not isinstance(social_user_ids, list):
social_user_ids = [social_user_ids]
ranked_stories_keys = 'zU:%s:social' % (user_id)
read_ranked_stories_keys = 'zhU:%s:social' % (user_id)
unread_ranked_stories_keys = 'zhU:%s:social' % (user_id)
if (offset and cache and
r.exists(ranked_stories_keys) and
r.exists(read_ranked_stories_keys)):
story_hashes = range_func(ranked_stories_keys, offset, offset+limit, withscores=True)
read_story_hashes = range_func(read_ranked_stories_keys, 0, -1)
if story_hashes:
story_hashes, story_dates = zip(*story_hashes)
return story_hashes, story_dates, read_story_hashes
rt.exists(ranked_stories_keys) and
rt.exists(unread_ranked_stories_keys)):
story_hashes_and_dates = range_func(ranked_stories_keys, offset, limit, withscores=True)
story_hashes, story_dates = zip(*story_hashes_and_dates)
if read_filter == "unread":
unread_story_hashes = story_hashes
else:
return [], [], []
unread_story_hashes = range_func(unread_ranked_stories_keys, 0, offset+limit)
return story_hashes, story_dates, unread_story_hashes
else:
r.delete(ranked_stories_keys)
r.delete(read_ranked_stories_keys)
rt.delete(ranked_stories_keys)
rt.delete(unread_ranked_stories_keys)
for social_user_id in social_user_ids:
us = cls.objects.get(user_id=relative_user_id, subscription_user_id=social_user_id)
story_hashes = us.get_stories(offset=0, limit=100,
order=order, read_filter=read_filter,
withscores=True, cutoff_date=cutoff_date)
if story_hashes:
r.zadd(ranked_stories_keys, **dict(story_hashes))
r.zinterstore(read_ranked_stories_keys, [ranked_stories_keys, "RS:%s" % user_id])
story_hashes = range_func(ranked_stories_keys, offset, limit, withscores=True)
read_story_hashes = range_func(read_ranked_stories_keys, offset, limit)
r.expire(ranked_stories_keys, 1*60*60)
r.expire(read_ranked_stories_keys, 1*60*60)
if story_hashes:
story_hashes, story_dates = zip(*story_hashes)
return story_hashes, story_dates, read_story_hashes
else:
story_hashes = cls.story_hashes(user_id, relative_user_id,
subscription_user_ids=social_user_ids,
read_filter=read_filter, order=order,
include_timestamps=True,
group_by_user=False,
socialsubs=socialsubs,
cutoff_date=cutoff_date)
if not story_hashes:
return [], [], []
pipeline = rt.pipeline()
for story_hash_group in chunks(story_hashes, 100):
pipeline.zadd(ranked_stories_keys, **dict(story_hash_group))
pipeline.execute()
story_hashes_and_dates = range_func(ranked_stories_keys, offset, limit, withscores=True)
if not story_hashes_and_dates:
return [], [], []
story_hashes, story_dates = zip(*story_hashes_and_dates)
if read_filter == "unread":
unread_feed_story_hashes = story_hashes
else:
unread_story_hashes = cls.story_hashes(user_id, relative_user_id,
subscription_user_ids=social_user_ids,
read_filter="unread", order=order,
include_timestamps=True,
group_by_user=False,
socialsubs=socialsubs,
cutoff_date=cutoff_date)
if unread_story_hashes:
pipeline = rt.pipeline()
for unread_story_hash_group in chunks(unread_story_hashes, 100):
pipeline.zadd(unread_ranked_stories_keys, **dict(unread_story_hash_group))
pipeline.execute()
unread_feed_story_hashes = range_func(unread_ranked_stories_keys, offset, limit)
rt.expire(ranked_stories_keys, 60*60)
rt.expire(unread_ranked_stories_keys, 60*60)
return story_hashes, story_dates, unread_feed_story_hashes
def mark_story_ids_as_read(self, story_hashes, feed_id=None, mark_all_read=False, request=None):
data = dict(code=0, payload=story_hashes)
r = redis.Redis(connection_pool=settings.REDIS_POOL)
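
Both feed_stories() implementations above now write their ranked keys in chunks of 100 hashes through a single pipeline. A rough sketch of that pattern; the chunks() helper below is a guess at utils.feed_functions.chunks and may differ from the real one, and the kwargs-style zadd matches the redis-py 2.x API used throughout this diff:

def chunks(items, size):
    # Yield successive fixed-size slices of a list.
    for i in range(0, len(items), size):
        yield items[i:i + size]

def zadd_in_chunks(rt, key, hashes_with_scores, chunk_size=100):
    # hashes_with_scores: (story_hash, unix_timestamp) pairs, as produced by
    # story_hashes(..., include_timestamps=True) above.
    pipeline = rt.pipeline()
    for group in chunks(hashes_with_scores, chunk_size):
        pipeline.zadd(key, **dict(group))  # one ZADD per chunk, queued on the pipeline
    pipeline.execute()                     # flushed to Redis in a single round trip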

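One subtle detail shared by both story_hashes() methods: for 'oldest' order the code uses zrangebyscore with (min_score, max_score), while for 'newest' it switches to zrevrangebyscore and swaps the two, because that command takes the max bound first. A toy demonstration against a throwaway key (assumes a disposable local test Redis):

import redis

r = redis.Redis()  # any disposable test instance
r.zadd('ztest:order', **{'a': 1, 'b': 2, 'c': 3})
min_score, max_score = 1, 3

print r.zrangebyscore('ztest:order', min_score, max_score)     # oldest first: a, b, c
min_score, max_score = max_score, min_score
print r.zrevrangebyscore('ztest:order', min_score, max_score)  # newest first: c, b, a
r.delete('ztest:order')
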
View file

@@ -192,19 +192,23 @@ def load_river_blurblog(request):
if not relative_user_id:
relative_user_id = user.pk
socialsubs = MSocialSubscription.objects.filter(user_id=relative_user_id)
if social_user_ids:
socialsubs = socialsubs.filter(subscription_user_id__in=social_user_ids)
if not social_user_ids:
socialsubs = MSocialSubscription.objects.filter(user_id=relative_user_id)
social_user_ids = [s.subscription_user_id for s in socialsubs]
offset = (page-1) * limit
limit = page * limit - 1
story_hashes, story_dates, read_feed_story_hashes = MSocialSubscription.feed_stories(
user.pk, social_user_ids,
offset=offset, limit=limit,
order=order, read_filter=read_filter,
relative_user_id=relative_user_id,
cutoff_date=user.profile.unread_cutoff)
story_hashes, story_dates, unread_feed_story_hashes = MSocialSubscription.feed_stories(
user.pk, social_user_ids,
offset=offset, limit=limit,
order=order, read_filter=read_filter,
relative_user_id=relative_user_id,
socialsubs=socialsubs,
cutoff_date=user.profile.unread_cutoff)
mstories = MStory.find_by_story_hashes(story_hashes)
story_hashes_to_dates = dict(zip(story_hashes, story_dates))
def sort_stories_by_hash(a, b):
@@ -268,7 +272,7 @@ def load_river_blurblog(request):
# Just need to format stories
for story in stories:
story['read_status'] = 0
if story['story_hash'] in read_feed_story_hashes:
if story['story_hash'] not in unread_feed_story_hashes:
story['read_status'] = 1
story_date = localtime_for_timezone(story['story_date'], user.profile.timezone)
story['short_parsed_date'] = format_story_link_date__short(story_date, now)
@@ -291,7 +295,7 @@ def load_river_blurblog(request):
story['shared_date'] = format_story_link_date__long(shared_date, now)
story['shared_comments'] = strip_tags(shared_stories[story['story_hash']]['comments'])
if (shared_stories[story['story_hash']]['shared_date'] < user.profile.unread_cutoff or
story['story_hash'] in read_feed_story_hashes):
story['story_hash'] not in unread_feed_story_hashes):
story['read_status'] = 1
classifiers = sort_classifiers_by_feed(user=user, feed_ids=story_feed_ids,
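
The view changes above follow from feed_stories() now returning unread hashes instead of read ones: a story is marked read when its hash is absent from the unread set. A tiny illustration of the inverted check with made-up data:

unread_feed_story_hashes = {'42:aaaa1111', '42:bbbb2222'}
stories = [{'story_hash': '42:aaaa1111'}, {'story_hash': '99:cccc3333'}]

for story in stories:
    story['read_status'] = 0
    if story['story_hash'] not in unread_feed_story_hashes:
        story['read_status'] = 1  # read: missing from the unread set
# 42:aaaa1111 stays unread (0); 99:cccc3333 is marked read (1)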

View file

@@ -102,16 +102,16 @@ class RStats:
prefixes_ttls[prefix]['X'] += 1
elif ttl < 60*60: # 1 hour
prefixes_ttls[prefix]['1h'] += 1
elif ttl < 60*60*12:
prefixes_ttls[prefix]['12h'] += 1
elif ttl < 60*60*24:
prefixes_ttls[prefix]['1d'] += 1
elif ttl < 60*60*168:
elif ttl < 60*60*24*7:
prefixes_ttls[prefix]['1w'] += 1
elif ttl < 60*60*336:
elif ttl < 60*60*24*14:
prefixes_ttls[prefix]['2w'] += 1
elif ttl < 60*60*24*30:
prefixes_ttls[prefix]['4w'] += 1
else:
prefixes_ttls[prefix]['2w+'] += 1
prefixes_ttls[prefix]['4w+'] += 1
keys_count = len(keys)
total_size = float(sum([k for k in sizes.values()]))
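
For reference, the TTL histogram branches after this change correspond to the thresholds below; this is just a restatement of the elif chain above (the initial 'X' branch, whose condition falls outside this hunk, presumably catches keys without a TTL):

TTL_BUCKETS = [
    (60*60,         '1h'),
    (60*60*12,      '12h'),
    (60*60*24,      '1d'),
    (60*60*24*7,    '1w'),
    (60*60*24*14,   '2w'),
    (60*60*24*30,   '4w'),
]

def bucket_for(ttl):
    # Anything at or above 30 days falls into the new '4w+' bucket.
    for limit, label in TTL_BUCKETS:
        if ttl < limit:
            return label
    return '4w+'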

View file

@@ -585,15 +585,16 @@ MONGOANALYTICSDB = connect(MONGO_ANALYTICS_DB.pop('name'), **MONGO_ANALYTICS_DB)
# = Redis =
# =========
REDIS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=0)
REDIS_ANALYTICS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=2)
REDIS_STATISTICS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=3)
REDIS_FEED_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=4)
REDIS_SESSION_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=5)
# REDIS_CACHE_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=6) # Duped in CACHES
REDIS_PUBSUB_POOL = redis.ConnectionPool(host=REDIS_PUBSUB['host'], port=6379, db=0)
REDIS_STORY_HASH_POOL = redis.ConnectionPool(host=REDIS_STORY['host'], port=6379, db=1)
# REDIS_STORY_HASH_POOL2 = redis.ConnectionPool(host=REDIS['host'], port=6379, db=8)
REDIS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=0)
REDIS_ANALYTICS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=2)
REDIS_STATISTICS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=3)
REDIS_FEED_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=4)
REDIS_SESSION_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=5)
# REDIS_CACHE_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=6) # Duped in CACHES
REDIS_PUBSUB_POOL = redis.ConnectionPool(host=REDIS_PUBSUB['host'], port=6379, db=0)
REDIS_STORY_HASH_POOL = redis.ConnectionPool(host=REDIS_STORY['host'], port=6379, db=1)
# REDIS_STORY_HASH_POOL2 = redis.ConnectionPool(host=REDIS['host'], port=6379, db=8)
REDIS_STORY_HASH_TEMP_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=10)
# ==========
# = Assets =
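
The settings change above introduces the new REDIS_STORY_HASH_TEMP_POOL on db=10 of the main Redis host. A hedged usage sketch of how the refactored code talks to it, with a made-up key and score:

import redis
from django.conf import settings

rt = redis.Redis(connection_pool=settings.REDIS_STORY_HASH_TEMP_POOL)

# Transient ranked-story keys are written, read, and expired entirely on this pool.
rt.zadd('zU:42:feeds:1,2,3', **{'1:abcd1234': 1381179370})
print rt.zrevrange('zU:42:feeds:1,2,3', 0, 5, withscores=True)
rt.expire('zU:42:feeds:1,2,3', 60*60)  # an hour, matching the expire() calls in feed_stories()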