Fixing search indexing. Also removing superfluous search_indexer_tasker queue.

This commit is contained in:
Samuel Clay 2021-03-03 17:04:05 -05:00
parent f99ae77789
commit 05afdce691
8 changed files with 30 additions and 34 deletions

View file

@@ -34,8 +34,6 @@
command: "celery worker -A newsblur_web --loglevel=INFO -Q beat_feeds_task -c 1"
- container_name: task-search
command: "celery worker -A newsblur_web --loglevel=INFO -Q search_indexer -c 4"
- container_name: task-search
command: "celery worker -A newsblur_web --loglevel=INFO -Q search_indexer_tasker -c 2"
- container_name: task-work
command: "celery worker -A newsblur_web --loglevel=INFO -Q work_queue"

View file

@@ -9,8 +9,8 @@ import json
__author__ = "Dananjaya Ramanayake <dananjaya86@gmail.com>, Samuel Clay <samuel@newsblur.com>"
__version__ = "1.0"
API_URL = "http://www.newsblur.com/"
# API_URL = "http://nb.local.host:8000/"
API_URL = "https://www.newsblur.com/"
# API_URL = "https://nb.local.host:8000/"
class request():

View file

@@ -9,6 +9,7 @@ import mongoengine as mongo
from django.conf import settings
from django.contrib.auth.models import User
from apps.search.tasks import IndexSubscriptionsForSearch
from apps.search.tasks import FinishIndexSubscriptionsForSearch
from apps.search.tasks import IndexSubscriptionsChunkForSearch
from apps.search.tasks import IndexFeedsForSearch
from utils import log as logging
@@ -50,7 +51,7 @@ class MUserSearch(mongo.Document):
def schedule_index_subscriptions_for_search(self):
IndexSubscriptionsForSearch.apply_async(kwargs=dict(user_id=self.user_id),
queue='search_indexer_tasker')
queue='search_indexer')
# Should be run as a background task
def index_subscriptions_for_search(self):
@@ -78,15 +79,23 @@ class MUserSearch(mongo.Document):
logging.user(user, "~FCIndexing ~SB%s feeds~SN in %s chunks..." %
(total, len(feed_id_chunks)))
tasks = [IndexSubscriptionsChunkForSearch.s(feed_ids=feed_id_chunk,
search_chunks = [IndexSubscriptionsChunkForSearch.s(feed_ids=feed_id_chunk,
user_id=self.user_id
).set(queue='search_indexer')
for feed_id_chunk in feed_id_chunks]
group = celery.group(*tasks)
res = group.apply_async(queue='search_indexer')
res.join_native(disable_sync_subtasks=False)
callback = FinishIndexSubscriptionsForSearch.s(user_id=self.user_id,
start=start).set(queue='search_indexer')
celery.chord(search_chunks)(callback)
def finish_index_subscriptions_for_search(self, start):
from apps.reader.models import UserSubscription
r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
user = User.objects.get(pk=self.user_id)
subscriptions = UserSubscription.objects.filter(user=user).only('feed')
total = subscriptions.count()
duration = time.time() - start
logging.user(user, "~FCIndexed ~SB%s feeds~SN in ~FM~SB%s~FC~SN sec." %
(total, round(duration, 2)))
r.publish(user.username, 'search_index_complete:done')

View file

@@ -21,3 +21,11 @@ def IndexFeedsForSearch(feed_ids, user_id):
from apps.search.models import MUserSearch
MUserSearch.index_feeds_for_search(feed_ids, user_id)
@app.task()
def FinishIndexSubscriptionsForSearch(results, user_id, start):
logging.debug(" ---> Indexing finished for %s" % (user_id))
from apps.search.models import MUserSearch
user_search = MUserSearch.get_user(user_id)
user_search.finish_index_subscriptions_for_search(start)

View file

@@ -187,7 +187,7 @@ class MSocialProfile(mongo.Document):
@property
def blurblog_url(self):
return "http://%s.%s/" % (
return "https://%s.%s/" % (
self.username_slug,
Site.objects.get_current().domain.replace('www.', ''))

View file

@@ -202,16 +202,6 @@ services:
user: $CURRENT_UID:$CURRENT_GID
celeryd_search_indexer_tasker:
container_name: celeryd_search_indexer_tasker
image: newsblur/newsblur_python3
command: "celery worker -A newsblur_web --loglevel=INFO -Q search_indexer_tasker -c 2"
environment:
- DOCKERBUILD=True
volumes:
- app-files:/srv/newsblur
user: $CURRENT_UID:$CURRENT_GID
celeryd_work_queue:
container_name: celeryd_work_queue
image: newsblur/newsblur_python3

View file

@@ -375,10 +375,6 @@ CELERY_TASK_ROUTES = {
"queue": "search_indexer",
"binding_key": "search_indexer"
},
"search-indexer-tasker": {
"queue": "search_indexer_tasker",
"binding_key": "search_indexer_tasker"
},
}
CELERY_TASK_QUEUES = {
"work_queue": {
@@ -416,11 +412,6 @@ CELERY_TASK_QUEUES = {
"exchange_type": "direct",
"binding_key": "search_indexer"
},
"search_indexer_tasker": {
"exchange": "search_indexer_tasker",
"exchange_type": "direct",
"binding_key": "search_indexer_tasker"
},
}
CELERY_TASK_DEFAULT_QUEUE = "work_queue"

View file

@@ -41,7 +41,7 @@ class NBMuninGraph(MuninGraph):
'celery_new_feeds': r.llen("new_feeds"),
'celery_push_feeds': r.llen("push_feeds"),
'celery_work_queue': r.llen("work_queue"),
'celery_search_queue': r.llen("search_indexer") + r.llen("search_indexer_tasker"),
'celery_search_queue': r.llen("search_indexer"),
}
if __name__ == '__main__':