Upgrading to the latest Celery 4 (holy moly), which required some big changes to project layout: settings now live in a newsblur package with a dedicated celeryapp module, and the accompanying redis-py and mongoengine upgrades touch every zadd/zincrby call site and exception import. Supervisor scripts still need updating.

Samuel Clay 2020-06-29 17:39:55 -04:00
parent d60c917a9c
commit 3fa55aff17
21 changed files with 834 additions and 48 deletions

View file

@@ -599,10 +599,10 @@ class Profile(models.Model):
muted_feed = not bool(user_ids[profile['user_id']])
if muted_feed:
last_seen_on = 0
pipeline.zadd(key, profile['user_id'], last_seen_on)
pipeline.zadd(key, {profile['user_id']: last_seen_on})
total += 1
if profile['is_premium']:
pipeline.zadd(premium_key, profile['user_id'], last_seen_on)
pipeline.zadd(premium_key, {profile['user_id']: last_seen_on})
premium += 1
else:
pipeline.zrem(premium_key, profile['user_id'])
@@ -615,9 +615,9 @@ class Profile(models.Model):
if entire_feed_counted:
now = int(datetime.datetime.now().strftime('%s'))
r.zadd(key, -1, now)
r.zadd(key, {-1: now})
r.expire(key, settings.SUBSCRIBER_EXPIRE*24*60*60)
r.zadd(premium_key, -1, now)
r.zadd(premium_key, {-1: now})
r.expire(premium_key, settings.SUBSCRIBER_EXPIRE*24*60*60)
logging.info(" ---> [%-30s] ~SN~FBCounting subscribers, storing in ~SBredis~SN: ~FMt:~SB~FM%s~SN a:~SB%s~SN p:~SB%s~SN ap:~SB%s" %
@@ -643,9 +643,9 @@ class Profile(models.Model):
last_seen_on = int(user.profile.last_seen_on.strftime('%s'))
if feed_ids is muted_feed_ids:
last_seen_on = 0
pipeline.zadd(key, user.pk, last_seen_on)
pipeline.zadd(key, {user.pk: last_seen_on})
if user.profile.is_premium:
pipeline.zadd(premium_key, user.pk, last_seen_on)
pipeline.zadd(premium_key, {user.pk: last_seen_on})
else:
pipeline.zrem(premium_key, user.pk)
pipeline.execute()
@@ -1530,7 +1530,7 @@ class RNewUserQueue:
r = redis.Redis(connection_pool=settings.REDIS_FEED_UPDATE_POOL)
now = time.time()
r.zadd(cls.KEY, user_id, now)
r.zadd(cls.KEY, {user_id: now})
@classmethod
def user_count(cls):

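A note on the zadd rewrites above: redis-py 3.x, which arrives with this upgrade (see requirements below), accepts only a {member: score} mapping where 2.x took the member and score as positional arguments. A minimal sketch with a hypothetical key and values:

    import redis

    r = redis.Redis(host="localhost", port=6379, db=0)

    # redis-py 2.x call shape, as in the old lines above:
    #   r.zadd("s:last_seen", member, score)
    # redis-py 3.x requires the mapping form:
    r.zadd("s:last_seen", {42: 1593456000})
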
View file

@@ -310,7 +310,7 @@ class UserSubscription(models.Model):
pipeline = rt.pipeline()
for story_hash_group in chunks(story_hashes, 100):
pipeline.zadd(ranked_stories_keys, **dict(story_hash_group))
pipeline.zadd(ranked_stories_keys, dict(story_hash_group))
pipeline.execute()
story_hashes = range_func(ranked_stories_keys, offset, limit)
@@ -325,7 +325,7 @@ class UserSubscription(models.Model):
cutoff_date=cutoff_date)
if unread_story_hashes:
for unread_story_hash_group in chunks(unread_story_hashes, 100):
rt.zadd(unread_ranked_stories_keys, **dict(unread_story_hash_group))
rt.zadd(unread_ranked_stories_keys, dict(unread_story_hash_group))
unread_feed_story_hashes = range_func(unread_ranked_stories_keys, offset, limit)
rt.expire(ranked_stories_keys, 60*60)

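The **dict(...) to dict(...) change here is the same redis-py 3.x migration: zadd no longer unpacks score keyword arguments, so the chunked mapping is passed positionally. A sketch of the pattern with a hypothetical key and story hashes:

    import redis

    r = redis.Redis(host="localhost", port=6379, db=0)

    def chunks(items, chunk_size):
        # Local stand-in for the project's chunks() helper.
        for i in range(0, len(items), chunk_size):
            yield items[i:i + chunk_size]

    story_hashes = [("1:abc123", 1593456000), ("1:def456", 1593456060)]
    pipeline = r.pipeline()
    for group in chunks(story_hashes, 100):
        pipeline.zadd("ranked_stories_example", dict(group))  # was: **dict(group)
    pipeline.execute()
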
View file

@@ -29,7 +29,7 @@ from django.contrib.sites.models import Site
from django.template.defaultfilters import slugify
from django.utils.encoding import smart_str, smart_unicode
from mongoengine.queryset import OperationError, Q, NotUniqueError
from mongoengine.base import ValidationError
from mongoengine.errors import ValidationError
from vendor.timezones.utilities import localtime_for_timezone
from apps.rss_feeds.tasks import UpdateFeeds, PushFeeds, ScheduleCountTagsForUser
from apps.rss_feeds.text_importer import TextImporter
@@ -556,7 +556,7 @@ class Feed(models.Model):
now = datetime.datetime.now().strftime("%s")
p = r.pipeline()
for feed_id in feeds:
p.zadd('tasked_feeds', feed_id, now)
p.zadd('tasked_feeds', {feed_id: now})
p.execute()
# for feed_ids in (feeds[pos:pos + queue_size] for pos in xrange(0, len(feeds), queue_size)):
@@ -1169,7 +1169,7 @@ class Feed(models.Model):
'requesting_user_id': kwargs.get('requesting_user_id', None)
}
if getattr(settings, 'TEST_DEBUG', False):
if getattr(settings, 'TEST_DEBUG', False) and "NEWSBLUR_DIR" in self.feed_address:
print " ---> Testing feed fetch: %s" % self.log_title
# options['force_fp'] = True # No, why would this be needed?
original_feed_address = self.feed_address
@@ -1192,7 +1192,7 @@ class Feed(models.Model):
if feed:
feed.last_update = datetime.datetime.utcnow()
feed.set_next_scheduled_update()
r.zadd('fetched_feeds_last_hour', feed.pk, int(datetime.datetime.now().strftime('%s')))
r.zadd('fetched_feeds_last_hour', {feed.pk: int(datetime.datetime.now().strftime('%s'))})
if not feed or original_feed_id != feed.pk:
logging.info(" ---> ~FRFeed changed id, removing %s from tasked_feeds queue..." % original_feed_id)
@@ -2195,7 +2195,7 @@ class Feed(models.Model):
if minutes_to_next_fetch > self.min_to_decay or not skip_scheduling:
self.next_scheduled_update = next_scheduled_update
if self.active_subscribers >= 1:
r.zadd('scheduled_updates', self.pk, self.next_scheduled_update.strftime('%s'))
r.zadd('scheduled_updates', {self.pk: self.next_scheduled_update.strftime('%s')})
r.zrem('tasked_feeds', self.pk)
r.srem('queued_feeds', self.pk)
@@ -2221,7 +2221,7 @@ class Feed(models.Model):
logging.debug(' ---> [%-30s] Scheduling feed fetch immediately...' % (self.log_title[:30]))
self.next_scheduled_update = datetime.datetime.utcnow()
r.zadd('scheduled_updates', self.pk, self.next_scheduled_update.strftime('%s'))
r.zadd('scheduled_updates', {self.pk: self.next_scheduled_update.strftime('%s')})
return self.save()
@@ -2354,11 +2354,6 @@ class MFeedPage(mongo.Document):
'allow_inheritance': False,
}
def save(self, *args, **kwargs):
if self.page_data:
self.page_data = zlib.compress(self.page_data)
return super(MFeedPage, self).save(*args, **kwargs)
def page(self):
return zlib.decompress(self.page_data)
@@ -2682,7 +2677,7 @@ class MStory(mongo.Document):
# r2.sadd(feed_key, self.story_hash)
# r2.expire(feed_key, settings.DAYS_OF_STORY_HASHES*24*60*60)
r.zadd('z' + feed_key, self.story_hash, time.mktime(self.story_date.timetuple()))
r.zadd('z' + feed_key, {self.story_hash: time.mktime(self.story_date.timetuple())})
r.expire('z' + feed_key, settings.DAYS_OF_STORY_HASHES*24*60*60)
# r2.zadd('z' + feed_key, self.story_hash, time.mktime(self.story_date.timetuple()))
# r2.expire('z' + feed_key, settings.DAYS_OF_STORY_HASHES*24*60*60)

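The import swap at the top of this file tracks mongoengine's reorganization: on the 0.17.0 release pinned below, ValidationError is importable from mongoengine.errors rather than mongoengine.base. Handling is unchanged; a self-contained illustration with a hypothetical document:

    import mongoengine
    from mongoengine.errors import ValidationError  # was: from mongoengine.base import ...

    class Item(mongoengine.Document):
        # Hypothetical document; only used to trigger validation locally.
        name = mongoengine.StringField(required=True)

    try:
        Item(name=None).validate()
    except ValidationError:
        print("caught via the new import path")
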
View file

@@ -289,7 +289,8 @@ class PageImporter(object):
if feed_page.page() == html:
logging.debug(' ---> [%-30s] ~FYNo change in page data: %s' % (self.feed.log_title[:30], self.feed.feed_link))
else:
feed_page.page_data = html
# logging.debug(' ---> [%-30s] ~FYChange in page data: %s (%s/%s %s/%s)' % (self.feed.log_title[:30], self.feed.feed_link, type(html), type(feed_page.page()), len(html), len(feed_page.page())))
feed_page.page_data = zlib.compress(html)
feed_page.save()
except MFeedPage.DoesNotExist:
feed_page = MFeedPage.objects.create(feed_id=self.feed.pk, page_data=html)

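With the compression step deleted from MFeedPage.save above, callers like this importer now store zlib-compressed bytes themselves, and page() keeps decompressing on read. The round trip in isolation:

    import zlib

    html = b"<html><body>feed page</body></html>"  # bytes/str under Python 2

    page_data = zlib.compress(html)            # what the importer now stores
    assert zlib.decompress(page_data) == html  # what MFeedPage.page() returns
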
View file

@@ -90,7 +90,7 @@ class TaskBrokenFeeds(Task):
r.zremrangebyscore('tasked_feeds', 0, hours_ago)
# r.sadd('queued_feeds', *old_tasked_feeds)
for feed_id in old_tasked_feeds:
r.zincrby('error_feeds', feed_id, 1)
r.zincrby('error_feeds', 1, feed_id)
feed = Feed.get_by_id(feed_id)
feed.set_next_scheduled_update()
logging.debug(" ---> ~SN~FBRe-queuing ~SB%s~SN dropped/broken feeds (~SB%s/%s~SN queued/tasked)" % (

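The zincrby flip above is another redis-py 3.x signature change: the increment amount now precedes the member. Sketched with a hypothetical feed id:

    import redis

    r = redis.Redis(host="localhost", port=6379, db=0)

    feed_id = 123
    # redis-py 2.x call shape, as in the old line above:
    #   r.zincrby("error_feeds", feed_id, 1)
    # redis-py 3.x puts the amount first:
    r.zincrby("error_feeds", 1, feed_id)
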
View file

@@ -1108,7 +1108,7 @@ class MSocialSubscription(mongo.Document):
pipeline = rt.pipeline()
for story_hash_group in chunks(story_hashes, 100):
pipeline.zadd(ranked_stories_keys, **dict(story_hash_group))
pipeline.zadd(ranked_stories_keys, dict(story_hash_group))
pipeline.execute()
story_hashes_and_dates = range_func(ranked_stories_keys, offset, limit, withscores=True)
if not story_hashes_and_dates:
@@ -1129,7 +1129,7 @@ class MSocialSubscription(mongo.Document):
if unread_story_hashes:
pipeline = rt.pipeline()
for unread_story_hash_group in chunks(unread_story_hashes, 100):
pipeline.zadd(unread_ranked_stories_keys, **dict(unread_story_hash_group))
pipeline.zadd(unread_ranked_stories_keys, dict(unread_story_hash_group))
pipeline.execute()
unread_feed_story_hashes = range_func(unread_ranked_stories_keys, offset, limit)
@@ -1865,10 +1865,10 @@ class MSharedStory(mongo.DynamicDocument):
r.sadd('B:%s' % self.user_id, self.feed_guid_hash)
# r2.sadd('B:%s' % self.user_id, self.feed_guid_hash)
r.zadd('zB:%s' % self.user_id, self.feed_guid_hash,
time.mktime(self.shared_date.timetuple()))
# r2.zadd('zB:%s' % self.user_id, self.feed_guid_hash,
# time.mktime(self.shared_date.timetuple()))
r.zadd('zB:%s' % self.user_id, {self.feed_guid_hash:
time.mktime(self.shared_date.timetuple())})
# r2.zadd('zB:%s' % self.user_id, {self.feed_guid_hash:
# time.mktime(self.shared_date.timetuple())})
r.expire('B:%s' % self.user_id, settings.DAYS_OF_STORY_HASHES*24*60*60)
# r2.expire('B:%s' % self.user_id, settings.DAYS_OF_STORY_HASHES*24*60*60)
r.expire('zB:%s' % self.user_id, settings.DAYS_OF_STORY_HASHES*24*60*60)

View file

@@ -2,11 +2,10 @@ bleach==3.1.4
BeautifulSoup==3.2.1
beautifulsoup4==4.8.0
boto==2.43.0
celery==3.1.25
celery>=4.0,<5
chardet==3.0.4
cssutils==1.0.1
django-celery-with-redis==3.0
django-celery==3.1.17
django-celery-beat==2.0.0
django-compress==1.0.1
django-cors-middleware==1.3.1
django-extensions==1.6.7
@@ -14,7 +13,7 @@ django-mailgun==0.9.1
django-oauth-toolkit==0.10.0
django-qurl==0.1.1
django-paypal==1.0
django-redis-cache==1.7.1
django-redis-cache==2.1.1
django-redis-sessions==0.6.1
django-ses==0.7.1
django-subdomains==2.1.0
@@ -29,7 +28,7 @@ image==1.5.27
isodate==0.5.4
lxml==3.6.4
mock==2.0.0
mongoengine==0.10.7
mongoengine==0.17.0
PyMySQL==0.9.3
ndg-httpsclient==0.4.2
nltk==3.4.5
@@ -48,7 +47,7 @@ python-digitalocean==1.13.2
python-gflags==3.1.0
pytz==2018.3
raven==6.5.0
redis==2.10.5
redis==3.5.3
requests==2.20.0
scipy==0.18.1
seacucumber==1.5.2

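These pins travel together: Celery 4 brings in redis-py 3.x (hence the zadd/zincrby call-site changes above), and mongoengine 0.17.0 goes with the mongoengine.errors import fixes. A quick post-install sanity check:

    import celery
    import mongoengine
    import redis

    print(celery.__version__)       # expect a 4.x release
    print(redis.__version__)        # expect 3.5.3
    print(mongoengine.__version__)  # expect 0.17.0
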
fabfile.py (vendored, 2 lines changed)
View file

@@ -25,7 +25,7 @@ except ImportError:
print "Digital Ocean's API not loaded. Install python-digitalocean."
django.settings_module('settings')
django.settings_module('newsblur.settings')
try:
from django.conf import settings as django_settings
except ImportError:

View file

@@ -2,14 +2,8 @@
import os
import sys
try:
import pymysql
pymysql.install_as_MySQLdb()
except:
pass
if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "newsblur.settings")
from django.core.management import execute_from_command_line

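With settings now living in the newsblur package, manage.py reduces to the stock Django entry point; the resulting file, sketched in full:

    #!/usr/bin/env python
    import os
    import sys

    if __name__ == "__main__":
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", "newsblur.settings")
        from django.core.management import execute_from_command_line
        execute_from_command_line(sys.argv)
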
newsblur/__init__.py (new file, 7 lines)
View file

@@ -0,0 +1,7 @@
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celeryapp import app as celery_app
__all__ = ['celery_app']

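This package-level import is the standard Celery 4 bootstrap: loading the app in newsblur/__init__.py before any task module means shared_task decorators bind to it without importing celeryapp everywhere. A hypothetical task for illustration:

    from celery import shared_task

    @shared_task(name="example-task")
    def example_task(feed_id):
        # Binds to the newsblur app because newsblur/__init__.py imported it first.
        return feed_id
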
newsblur/celeryapp.py (new file, 17 lines)
View file

@@ -0,0 +1,17 @@
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'newsblur.settings')
app = Celery('newsblur')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

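Because config_from_object is given namespace='CELERY', the CELERY_-prefixed Django settings added below map onto Celery's lowercase config keys (CELERY_TASK_DEFAULT_QUEUE becomes task_default_queue, and so on). A quick check of the wiring, assuming a fully configured Django environment:

    import os

    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "newsblur.settings")

    from newsblur.celeryapp import app

    print(app.conf.task_default_queue)  # "work_queue", from CELERY_TASK_DEFAULT_QUEUE
    print(app.conf.worker_concurrency)  # 4, from CELERY_WORKER_CONCURRENCY
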
newsblur/settings.py (new file, 764 lines)
View file

@@ -0,0 +1,764 @@
import sys
import os
# ===========================
# = Directory Declarations =
# ===========================
ROOT_DIR = os.path.dirname(__file__)
NEWSBLUR_DIR = os.path.join(ROOT_DIR, "../")
MEDIA_ROOT = os.path.join(NEWSBLUR_DIR, 'media')
STATIC_ROOT = os.path.join(NEWSBLUR_DIR, 'static')
UTILS_ROOT = os.path.join(NEWSBLUR_DIR, 'utils')
VENDOR_ROOT = os.path.join(NEWSBLUR_DIR, 'vendor')
LOG_FILE = os.path.join(NEWSBLUR_DIR, 'logs/newsblur.log')
IMAGE_MASK = os.path.join(NEWSBLUR_DIR, 'media/img/mask.png')
# ==============
# = PYTHONPATH =
# ==============
if '/utils' not in ' '.join(sys.path):
sys.path.append(UTILS_ROOT)
if '/vendor' not in ' '.join(sys.path):
sys.path.append(VENDOR_ROOT)
import logging
import datetime
import redis
import raven
import django.http
import re
from mongoengine import connect
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
from utils import jammit
# ===================
# = Server Settings =
# ===================
ADMINS = (
('Samuel Clay', 'samuel@newsblur.com'),
)
SERVER_NAME = 'newsblur'
SERVER_EMAIL = 'server@newsblur.com'
HELLO_EMAIL = 'hello@newsblur.com'
NEWSBLUR_URL = 'http://www.newsblur.com'
IMAGES_URL = 'https://imageproxy.newsblur.com'
SECRET_KEY = 'YOUR_SECRET_KEY'
IMAGES_SECRET_KEY = "YOUR_SECRET_IMAGE_KEY"
DNSIMPLE_TOKEN = "YOUR_DNSIMPLE_TOKEN"
RECAPTCHA_SECRET_KEY = "YOUR_RECAPTCHA_KEY"
YOUTUBE_API_KEY = "YOUR_YOUTUBE_API_KEY"
IMAGES_SECRET_KEY = "YOUR_IMAGES_SECRET_KEY"
# ===================
# = Global Settings =
# ===================
DEBUG = True
TEST_DEBUG = False
SEND_BROKEN_LINK_EMAILS = False
DEBUG_QUERIES = False
MANAGERS = ADMINS
PAYPAL_RECEIVER_EMAIL = 'samuel@ofbrooklyn.com'
TIME_ZONE = 'GMT'
LANGUAGE_CODE = 'en-us'
SITE_ID = 1
USE_I18N = False
LOGIN_REDIRECT_URL = '/'
LOGIN_URL = '/account/login'
MEDIA_URL = '/media/'
STATIC_URL = '/media/'
STATIC_ROOT = '/media/'
# URL prefix for admin media -- CSS, JavaScript and images. Make sure to use a
# trailing slash.
# Examples: "http://foo.com/media/", "/media/".
CIPHER_USERNAMES = False
DEBUG_ASSETS = DEBUG
HOMEPAGE_USERNAME = 'popular'
ALLOWED_HOSTS = ['*']
AUTO_PREMIUM_NEW_USERS = True
AUTO_ENABLE_NEW_USERS = True
ENFORCE_SIGNUP_CAPTCHA = False
PAYPAL_TEST = False
# Uncomment below to force all feeds to store this many stories. Default is to cut
# off at 25 stories for single subscriber non-premium feeds and 500 for popular feeds.
# OVERRIDE_STORY_COUNT_MAX = 1000
# ===========================
# = Django-specific Modules =
# ===========================
MIDDLEWARE_CLASSES = (
'django.middleware.gzip.GZipMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'apps.profile.middleware.TimingMiddleware',
'apps.profile.middleware.LastSeenMiddleware',
'apps.profile.middleware.UserAgentBanMiddleware',
'subdomains.middleware.SubdomainMiddleware',
'corsheaders.middleware.CorsMiddleware',
'apps.profile.middleware.SimpsonsMiddleware',
'apps.profile.middleware.ServerHostnameMiddleware',
'oauth2_provider.middleware.OAuth2TokenMiddleware',
# 'debug_toolbar.middleware.DebugToolbarMiddleware',
'apps.profile.middleware.DBProfilerMiddleware',
'apps.profile.middleware.SQLLogToConsoleMiddleware',
'utils.mongo_raw_log_middleware.MongoDumpMiddleware',
'utils.redis_raw_log_middleware.RedisDumpMiddleware',
)
AUTHENTICATION_BACKENDS = (
'oauth2_provider.backends.OAuth2Backend',
'django.contrib.auth.backends.ModelBackend',
)
CORS_ORIGIN_ALLOW_ALL = True
# CORS_ORIGIN_REGEX_WHITELIST = ('^(https?://)?(\w+\.)?newsblur\.com$', )
CORS_ALLOW_CREDENTIALS = True
OAUTH2_PROVIDER = {
'SCOPES': {
'read': 'View new unread stories, saved stories, and shared stories.',
'write': 'Create new saved stories, shared stories, and subscriptions.',
'ifttt': 'Pair your NewsBlur account with other services.',
},
'CLIENT_ID_GENERATOR_CLASS': 'oauth2_provider.generators.ClientIdGenerator',
'ACCESS_TOKEN_EXPIRE_SECONDS': 60*60*24*365*10, # 10 years
'AUTHORIZATION_CODE_EXPIRE_SECONDS': 60*60, # 1 hour
}
# ===========
# = Logging =
# ===========
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '[%(asctime)-12s] %(message)s',
'datefmt': '%b %d %H:%M:%S'
},
'simple': {
'format': '%(message)s'
},
},
'handlers': {
'null': {
'level':'DEBUG',
'class':'logging.NullHandler',
},
'console':{
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'verbose'
},
'pyes':{
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'verbose'
},
'vendor.apns':{
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'verbose'
},
'log_file':{
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler',
'filename': LOG_FILE,
'maxBytes': '16777216', # 16megabytes
'formatter': 'verbose'
},
'mail_admins': {
'level': 'CRITICAL',
'class': 'django.utils.log.AdminEmailHandler',
'filters': ['require_debug_false'],
'include_html': True,
},
# 'sentry': {
# 'level': 'ERROR',
# 'class': 'raven.contrib.django.raven_compat.handlers.SentryHandler'
# },
},
'loggers': {
'django': {
'handlers': ['console', 'mail_admins'],
'level': 'ERROR',
'propagate': True,
},
'django.db.backends': {
'handlers': ['null'],
'level': 'DEBUG',
'propagate': False,
},
'django.security.DisallowedHost': {
'handlers': ['null'],
'propagate': False,
},
'newsblur': {
'handlers': ['console', 'log_file'],
'level': 'DEBUG',
'propagate': False,
},
'readability': {
'handlers': ['console'],
'level': 'DEBUG',
'propagate': False,
},
'pyes': {
'handlers': ['console'],
'level': 'DEBUG',
'propagate': False,
},
'apps': {
'handlers': ['log_file'],
'level': 'INFO',
'propagate': True,
},
# 'raven': {
# 'level': 'DEBUG',
# 'handlers': ['console'],
# 'propagate': False,
# },
# 'sentry.errors': {
# 'level': 'DEBUG',
# 'handlers': ['console'],
# 'propagate': False,
# },
},
'filters': {
'require_debug_false': {
'()': 'django.utils.log.RequireDebugFalse'
}
},
}
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
# ==========================
# = Miscellaneous Settings =
# ==========================
DAYS_OF_UNREAD = 30
DAYS_OF_UNREAD_FREE = 14
# DoSH can be more, since you can up this value by N, and after N days,
# you can then up the DAYS_OF_UNREAD value with no impact.
DAYS_OF_STORY_HASHES = 30
SUBSCRIBER_EXPIRE = 7
ROOT_URLCONF = 'newsblur.urls'
INTERNAL_IPS = ('127.0.0.1',)
LOGGING_LOG_SQL = True
APPEND_SLASH = False
SESSION_ENGINE = 'redis_sessions.session'
TEST_RUNNER = "utils.testrunner.TestRunner"
SESSION_COOKIE_NAME = 'newsblur_sessionid'
SESSION_COOKIE_AGE = 60*60*24*365*10 # 10 years
SESSION_COOKIE_DOMAIN = '.newsblur.com'
SESSION_COOKIE_HTTPONLY = False
SENTRY_DSN = 'https://XXXNEWSBLURXXX@app.getsentry.com/99999999'
SESSION_SERIALIZER = 'django.contrib.sessions.serializers.PickleSerializer'
if DEBUG:
# EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'
EMAIL_BACKEND = 'vendor.mailgun.MailgunBackend'
else:
EMAIL_BACKEND = 'vendor.mailgun.MailgunBackend'
# ==============
# = Subdomains =
# ==============
SUBDOMAIN_URLCONFS = {
None: 'newsblur.urls',
'www': 'newsblur.urls',
}
REMOVE_WWW_FROM_DOMAIN = True
# ===========
# = Logging =
# ===========
LOG_LEVEL = logging.DEBUG
LOG_TO_STREAM = False
# ===============
# = Django Apps =
# ===============
OAUTH2_PROVIDER_APPLICATION_MODEL = 'oauth2_provider.Application'
INSTALLED_APPS = (
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
'django.contrib.admin',
'django_extensions',
'paypal.standard.ipn',
'apps.rss_feeds',
'apps.reader',
'apps.analyzer',
'apps.feed_import',
'apps.profile',
'apps.recommendations',
'apps.statistics',
'apps.notifications',
'apps.static',
'apps.mobile',
'apps.push',
'apps.social',
'apps.oauth',
'apps.search',
'apps.categories',
'django_celery_beat',
'utils', # missing models so no migrations
'vendor',
'vendor.typogrify',
'vendor.zebra',
'oauth2_provider',
'corsheaders',
)
# ==========
# = Stripe =
# ==========
STRIPE_SECRET = "YOUR-SECRET-API-KEY"
STRIPE_PUBLISHABLE = "YOUR-PUBLISHABLE-API-KEY"
ZEBRA_ENABLE_APP = True
# ==========
# = Celery =
# ==========
CELERY_TASK_ROUTES = {
"work-queue": {
"queue": "work_queue",
"binding_key": "work_queue"
},
"new-feeds": {
"queue": "new_feeds",
"binding_key": "new_feeds"
},
"push-feeds": {
"queue": "push_feeds",
"binding_key": "push_feeds"
},
"update-feeds": {
"queue": "update_feeds",
"binding_key": "update_feeds"
},
"beat-tasks": {
"queue": "beat_tasks",
"binding_key": "beat_tasks"
},
"search-indexer": {
"queue": "search_indexer",
"binding_key": "search_indexer"
},
"search-indexer-tasker": {
"queue": "search_indexer_tasker",
"binding_key": "search_indexer_tasker"
},
}
CELERY_TASK_QUEUES = {
"work_queue": {
"exchange": "work_queue",
"exchange_type": "direct",
"binding_key": "work_queue",
},
"new_feeds": {
"exchange": "new_feeds",
"exchange_type": "direct",
"binding_key": "new_feeds"
},
"push_feeds": {
"exchange": "push_feeds",
"exchange_type": "direct",
"binding_key": "push_feeds"
},
"update_feeds": {
"exchange": "update_feeds",
"exchange_type": "direct",
"binding_key": "update_feeds"
},
"beat_tasks": {
"exchange": "beat_tasks",
"exchange_type": "direct",
"binding_key": "beat_tasks"
},
"beat_feeds_task": {
"exchange": "beat_feeds_task",
"exchange_type": "direct",
"binding_key": "beat_feeds_task"
},
"search_indexer": {
"exchange": "search_indexer",
"exchange_type": "direct",
"binding_key": "search_indexer"
},
"search_indexer_tasker": {
"exchange": "search_indexer_tasker",
"exchange_type": "direct",
"binding_key": "search_indexer_tasker"
},
}
CELERY_TASK_DEFAULT_QUEUE = "work_queue"
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_IMPORTS = ("apps.rss_feeds.tasks",
"apps.social.tasks",
"apps.reader.tasks",
"apps.profile.tasks",
"apps.feed_import.tasks",
"apps.search.tasks",
"apps.statistics.tasks",)
CELERY_WORKER_CONCURRENCY = 4
CELERY_TASK_IGNORE_RESULT = True
CELERY_TASK_ACKS_LATE = True # Retry if task fails
CELERY_WORKER_MAX_TASKS_PER_CHILD = 10
CELERY_TASK_TIME_LIMIT = 12 * 30
CELERY_WORKER_DISABLE_RATE_LIMITS = True
SECONDS_TO_DELAY_CELERY_EMAILS = 60
CELERY_BEAT_SCHEDULE = {
'task-feeds': {
'task': 'task-feeds',
'schedule': datetime.timedelta(minutes=1),
'options': {'queue': 'beat_feeds_task'},
},
'task-broken-feeds': {
'task': 'task-broken-feeds',
'schedule': datetime.timedelta(hours=6),
'options': {'queue': 'beat_feeds_task'},
},
'freshen-homepage': {
'task': 'freshen-homepage',
'schedule': datetime.timedelta(hours=1),
'options': {'queue': 'beat_tasks'},
},
'collect-stats': {
'task': 'collect-stats',
'schedule': datetime.timedelta(minutes=1),
'options': {'queue': 'beat_tasks'},
},
'collect-feedback': {
'task': 'collect-feedback',
'schedule': datetime.timedelta(minutes=1),
'options': {'queue': 'beat_tasks'},
},
'share-popular-stories': {
'task': 'share-popular-stories',
'schedule': datetime.timedelta(minutes=10),
'options': {'queue': 'beat_tasks'},
},
'clean-analytics': {
'task': 'clean-analytics',
'schedule': datetime.timedelta(hours=12),
'options': {'queue': 'beat_tasks', 'timeout': 720*10},
},
'reimport-stripe-history': {
'task': 'reimport-stripe-history',
'schedule': datetime.timedelta(hours=6),
'options': {'queue': 'beat_tasks'},
},
'clean-spam': {
'task': 'clean-spam',
'schedule': datetime.timedelta(hours=1),
'options': {'queue': 'beat_tasks'},
},
'clean-social-spam': {
'task': 'clean-social-spam',
'schedule': datetime.timedelta(hours=6),
'options': {'queue': 'beat_tasks'},
},
'premium-expire': {
'task': 'premium-expire',
'schedule': datetime.timedelta(hours=24),
'options': {'queue': 'beat_tasks'},
},
'activate-next-new-user': {
'task': 'activate-next-new-user',
'schedule': datetime.timedelta(minutes=5),
'options': {'queue': 'beat_tasks'},
},
}
# =========
# = Mongo =
# =========
MONGO_DB = {
'host': 'db_mongo:27017',
'name': 'newsblur',
}
MONGO_ANALYTICS_DB = {
'host': 'db_mongo_analytics:27017',
'name': 'nbanalytics',
}
# ====================
# = Database Routers =
# ====================
class MasterSlaveRouter(object):
"""A router that sets up a simple master/slave configuration"""
def db_for_read(self, model, **hints):
"Point all read operations to a random slave"
return 'slave'
def db_for_write(self, model, **hints):
"Point all write operations to the master"
return 'default'
def allow_relation(self, obj1, obj2, **hints):
"Allow any relation between two objects in the db pool"
db_list = ('slave','default')
if obj1._state.db in db_list and obj2._state.db in db_list:
return True
return None
def allow_migrate(self, db, model):
"Explicitly put all models on all databases."
return True
# =========
# = Redis =
# =========
REDIS = {
'host': '127.0.0.1',
}
REDIS_PUBSUB = {
'host': '127.0.0.1',
}
REDIS_STORY = {
'host': '127.0.0.1',
}
REDIS_SESSIONS = {
'host': '127.0.0.1',
}
CELERY_REDIS_DB_NUM = 4
SESSION_REDIS_DB = 5
# =================
# = Elasticsearch =
# =================
ELASTICSEARCH_FEED_HOSTS = ['db_search_feed:9200']
ELASTICSEARCH_STORY_HOSTS = ['db_search_story:9200']
# ===============
# = Social APIs =
# ===============
FACEBOOK_APP_ID = '111111111111111'
FACEBOOK_SECRET = '99999999999999999999999999999999'
FACEBOOK_NAMESPACE = 'newsblur'
TWITTER_CONSUMER_KEY = 'ooooooooooooooooooooo'
TWITTER_CONSUMER_SECRET = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
YOUTUBE_API_KEY = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# ===============
# = AWS Backing =
# ===============
ORIGINAL_PAGE_SERVER = "db_pages:3060"
BACKED_BY_AWS = {
'pages_on_s3': False,
'icons_on_s3': False,
}
PROXY_S3_PAGES = True
S3_BACKUP_BUCKET = 'newsblur_backups'
S3_PAGES_BUCKET_NAME = 'pages.newsblur.com'
S3_ICONS_BUCKET_NAME = 'icons.newsblur.com'
S3_AVATARS_BUCKET_NAME = 'avatars.newsblur.com'
# ==================
# = Configurations =
# ==================
from local_settings import *
if not DEBUG:
INSTALLED_APPS += (
'raven.contrib.django',
'django_ses',
)
# RAVEN_CLIENT = raven.Client(dsn=SENTRY_DSN, release=raven.fetch_git_sha(os.path.dirname(__file__)))
RAVEN_CLIENT = raven.Client(SENTRY_DSN)
COMPRESS = not DEBUG
ACCOUNT_ACTIVATION_DAYS = 30
AWS_ACCESS_KEY_ID = S3_ACCESS_KEY
AWS_SECRET_ACCESS_KEY = S3_SECRET
os.environ["AWS_ACCESS_KEY_ID"] = AWS_ACCESS_KEY_ID
os.environ["AWS_SECRET_ACCESS_KEY"] = AWS_SECRET_ACCESS_KEY
def custom_show_toolbar(request):
return DEBUG
DEBUG_TOOLBAR_CONFIG = {
'INTERCEPT_REDIRECTS': True,
'SHOW_TOOLBAR_CALLBACK': custom_show_toolbar,
'HIDE_DJANGO_SQL': False,
}
if DEBUG:
template_loaders = [
'django.template.loaders.filesystem.Loader',
'django.template.loaders.app_directories.Loader',
]
else:
template_loaders = [
('django.template.loaders.cached.Loader', (
'django.template.loaders.filesystem.Loader',
'django.template.loaders.app_directories.Loader',
)),
]
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [os.path.join(NEWSBLUR_DIR, 'templates'),
os.path.join(NEWSBLUR_DIR, 'vendor/zebra/templates')],
# 'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
"django.contrib.auth.context_processors.auth",
"django.template.context_processors.debug",
"django.template.context_processors.media",
'django.template.context_processors.request',
],
'loaders': template_loaders,
},
}
]
# =========
# = Mongo =
# =========
MONGO_DB_DEFAULTS = {
'name': 'newsblur',
'host': 'db_mongo:27017',
'alias': 'default',
'connect': False,
}
MONGO_DB = dict(MONGO_DB_DEFAULTS, **MONGO_DB)
MONGO_DB_NAME = MONGO_DB.pop('name')
# MONGO_URI = 'mongodb://%s' % (MONGO_DB.pop('host'),)
# if MONGO_DB.get('read_preference', pymongo.ReadPreference.PRIMARY) != pymongo.ReadPreference.PRIMARY:
# MONGO_PRIMARY_DB = MONGO_DB.copy()
# MONGO_PRIMARY_DB.update(read_preference=pymongo.ReadPreference.PRIMARY)
# MONGOPRIMARYDB = connect(MONGO_PRIMARY_DB.pop('name'), **MONGO_PRIMARY_DB)
# else:
# MONGOPRIMARYDB = MONGODB
# MONGODB = connect(MONGO_DB.pop('name'), host=MONGO_URI, **MONGO_DB)
MONGODB = connect(MONGO_DB_NAME, **MONGO_DB)
# MONGODB = connect(host="mongodb://localhost:27017/newsblur", connect=False)
MONGO_ANALYTICS_DB_DEFAULTS = {
'name': 'nbanalytics',
'host': 'db_mongo_analytics:27017',
'alias': 'nbanalytics',
}
MONGO_ANALYTICS_DB = dict(MONGO_ANALYTICS_DB_DEFAULTS, **MONGO_ANALYTICS_DB)
MONGO_ANALYTICS_DB_NAME = MONGO_ANALYTICS_DB.pop('name')
# MONGO_ANALYTICS_URI = 'mongodb://%s' % (MONGO_ANALYTICS_DB.pop('host'),)
# MONGOANALYTICSDB = connect(MONGO_ANALYTICS_DB.pop('name'), host=MONGO_ANALYTICS_URI, **MONGO_ANALYTICS_DB)
MONGOANALYTICSDB = connect(MONGO_ANALYTICS_DB_NAME, **MONGO_ANALYTICS_DB)
# =========
# = Redis =
# =========
CELERY_BROKER_URL = "redis://%s:6379/%s" % (REDIS['host'], CELERY_REDIS_DB_NUM)
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
SESSION_REDIS = {
'host': REDIS_SESSIONS['host'],
'port': 6379,
'db': SESSION_REDIS_DB,
# 'password': 'password',
'prefix': '',
'socket_timeout': 10,
'retry_on_timeout': True
}
CACHES = {
'default': {
'BACKEND': 'redis_cache.RedisCache',
'LOCATION': '%s:6379' % REDIS['host'],
'OPTIONS': {
'DB': 6,
'PARSER_CLASS': 'redis.connection.HiredisParser',
'SERIALIZER_CLASS': 'redis_cache.serializers.PickleSerializer'
},
},
}
REDIS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=0)
REDIS_ANALYTICS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=2)
REDIS_STATISTICS_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=3)
REDIS_FEED_UPDATE_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=4)
# REDIS_STORY_HASH_POOL2 = redis.ConnectionPool(host=REDIS['host'], port=6379, db=8) # Only used when changing DAYS_OF_UNREAD
REDIS_STORY_HASH_TEMP_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=10)
# REDIS_CACHE_POOL = redis.ConnectionPool(host=REDIS['host'], port=6379, db=6) # Duped in CACHES
REDIS_STORY_HASH_POOL = redis.ConnectionPool(host=REDIS_STORY['host'], port=6379, db=1)
REDIS_FEED_READ_POOL = redis.ConnectionPool(host=REDIS_SESSIONS['host'], port=6379, db=1)
REDIS_FEED_SUB_POOL = redis.ConnectionPool(host=REDIS_SESSIONS['host'], port=6379, db=2)
REDIS_SESSION_POOL = redis.ConnectionPool(host=REDIS_SESSIONS['host'], port=6379, db=5)
REDIS_PUBSUB_POOL = redis.ConnectionPool(host=REDIS_PUBSUB['host'], port=6379, db=0)
# ==========
# = Celery =
# ==========
# celeryapp.autodiscover_tasks(INSTALLED_APPS)
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
# ==========
# = Assets =
# ==========
JAMMIT = jammit.JammitAssets(ROOT_DIR)
if DEBUG:
MIDDLEWARE_CLASSES += ('utils.request_introspection_middleware.DumpRequestMiddleware',)
MIDDLEWARE_CLASSES += ('utils.exception_middleware.ConsoleExceptionMiddleware',)
# =======
# = AWS =
# =======
S3_CONN = None
if BACKED_BY_AWS.get('pages_on_s3') or BACKED_BY_AWS.get('icons_on_s3'):
S3_CONN = S3Connection(S3_ACCESS_KEY, S3_SECRET, calling_format=OrdinaryCallingFormat())
# if BACKED_BY_AWS.get('pages_on_s3'):
# S3_PAGES_BUCKET = S3_CONN.get_bucket(S3_PAGES_BUCKET_NAME)
# if BACKED_BY_AWS.get('icons_on_s3'):
# S3_ICONS_BUCKET = S3_CONN.get_bucket(S3_ICONS_BUCKET_NAME)
django.http.request.host_validation_re = re.compile(r"^([a-z0-9.-_\-]+|\[[a-f0-9]*:[a-f0-9:]+\])(:\d+)?$")

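One pattern worth noting in this new settings file: local_settings.py may override MONGO_DB, and dict(DEFAULTS, **override) lets the defaults fill in whatever the override omits. In isolation, with a hypothetical override:

    MONGO_DB_DEFAULTS = {
        'name': 'newsblur',
        'host': 'db_mongo:27017',
        'alias': 'default',
        'connect': False,
    }
    MONGO_DB = {'host': 'db_mongo2:27017'}  # hypothetical local_settings override

    merged = dict(MONGO_DB_DEFAULTS, **MONGO_DB)
    assert merged['host'] == 'db_mongo2:27017'  # the override wins
    assert merged['name'] == 'newsblur'         # defaults fill in the rest
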
View file

@@ -6,7 +6,7 @@ https://docs.djangoproject.com/en/1.7/howto/deployment/wsgi/
"""
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "newsblur.settings")
from django.core.wsgi import get_wsgi_application
application = get_wsgi_application()

View file

@@ -1,6 +1,6 @@
import sys
from mongoengine.queryset import OperationError
from mongoengine.base import ValidationError
from mongoengine.errors import ValidationError
from apps.analyzer.models import MClassifierFeed
from apps.analyzer.models import MClassifierAuthor
from apps.analyzer.models import MClassifierTag

View file

@@ -28,6 +28,7 @@ from utils import log as logging
from utils.feed_functions import timelimit, TimeoutError
from qurl import qurl
from BeautifulSoup import BeautifulSoup
from mongoengine import connect, connection
from django.utils import feedgenerator
from django.utils.html import linebreaks
from django.utils.encoding import smart_unicode
@@ -237,7 +238,7 @@ class FetchFeed:
return
if channel_id:
video_ids_xml = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id=%s" % channel_id, verify=False)
video_ids_xml = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id=%s" % channel_id)
channel_json = requests.get("https://www.googleapis.com/youtube/v3/channels?part=snippet&id=%s&key=%s" %
(channel_id, settings.YOUTUBE_API_KEY))
channel = json.decode(channel_json.content)
@@ -257,7 +258,7 @@ class FetchFeed:
return
channel_url = "https://www.youtube.com/playlist?list=%s" % list_id
elif username:
video_ids_xml = requests.get("https://www.youtube.com/feeds/videos.xml?user=%s" % username, verify=False)
video_ids_xml = requests.get("https://www.youtube.com/feeds/videos.xml?user=%s" % username)
description = "YouTube videos uploaded by %s" % username
else:
return
@@ -641,6 +642,12 @@ class Dispatcher:
return Feed.get_by_id(feed_id)
def process_feed_wrapper(self, feed_queue):
connection._connections = {}
connection._connection_settings = {}
connection._dbs = {}
settings.MONGODB = connect(settings.MONGO_DB_NAME, **settings.MONGO_DB)
settings.MONGOANALYTICSDB = connect(settings.MONGO_ANALYTICS_DB_NAME, **settings.MONGO_ANALYTICS_DB)
delta = None
current_process = multiprocessing.current_process()
identity = "X"

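The connection reset added to process_feed_wrapper matters because the dispatcher forks worker processes, and pymongo sockets must not be shared across a fork; the child clears mongoengine's cached connections and reconnects. The same pattern in isolation, with hypothetical connection settings:

    from mongoengine import connect, connection

    def reconnect_after_fork(db_name, **mongo_settings):
        # Drop the parent's cached connections so the child opens fresh sockets.
        connection._connections = {}
        connection._connection_settings = {}
        connection._dbs = {}
        return connect(db_name, **mongo_settings)

    # In the forked child, mirroring the diff above:
    # db = reconnect_after_fork('newsblur', host='db_mongo:27017', connect=False)
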
View file

@@ -6,6 +6,7 @@ from pymongo.mongo_replica_set_client import MongoReplicaSetClient
from time import time
import struct
import bson
import pymongo
from bson.errors import InvalidBSON
class MongoDumpMiddleware(object):
@@ -53,7 +54,8 @@ class MongoDumpMiddleware(object):
def _instrument(self, original_method):
def instrumented_method(*args, **kwargs):
query = args[1].get_message(False, False)
with args[0]._socket_for_writes() as sock_info:
query = args[1].get_message(False, sock_info, False)
message = _mongodb_decode_wire_protocol(query[1])
# message = _mongodb_decode_wire_protocol(args[1][1])
if not message or message['msg_id'] in self._used_msg_ids: