mirror of
https://github.com/viq/NewsBlur.git
synced 2025-09-18 21:43:31 +00:00
Adding color throughout the logs. It's crazy gorgeous and easy to follow what's happening on the site from across the room.
This commit is contained in:
parent
3bc4eaebf3
commit
397fd61c5f
16 changed files with 991 additions and 28 deletions
|
@ -17,7 +17,7 @@ def index(requst):
|
|||
@json.json_view
|
||||
def save_classifier(request):
|
||||
post = request.POST
|
||||
logging.info(" ---> [%s] Saving classifier: %s" % (request.user, post))
|
||||
logging.info(" ---> [%s] ~FYSaving classifier: ~FW%s" % (request.user, post))
|
||||
feed_id = int(post['feed_id'])
|
||||
feed = get_object_or_404(Feed, pk=feed_id)
|
||||
code = 0
|
||||
|
@ -71,7 +71,7 @@ def save_classifier(request):
|
|||
_save_classifier(MClassifierTitle, 'title')
|
||||
_save_classifier(MClassifierFeed, 'feed')
|
||||
|
||||
logging.info(" ---> [%s] Feed training: %s" % (request.user, feed))
|
||||
logging.info(" ---> [%s] ~FYFeed training: ~SB%s" % (request.user, feed))
|
||||
|
||||
response = dict(code=code, message=message, payload=payload)
|
||||
return response
|
||||
|
|
|
@ -83,10 +83,10 @@ def login(request):
|
|||
if form.is_valid():
|
||||
login_user(request, form.get_user())
|
||||
if request.POST.get('api'):
|
||||
logging.info(" ---> [%s] iPhone Login" % form.get_user())
|
||||
logging.info(" ---> [%s] ~FGiPhone Login~FW" % form.get_user())
|
||||
code = 1
|
||||
else:
|
||||
logging.info(" ---> [%s] Login" % form.get_user())
|
||||
logging.info(" ---> [%s] ~FGLogin~FW" % form.get_user())
|
||||
return HttpResponseRedirect(reverse('index'))
|
||||
|
||||
if request.POST.get('api'):
|
||||
|
@ -101,14 +101,14 @@ def signup(request):
|
|||
if form.is_valid():
|
||||
new_user = form.save()
|
||||
login_user(request, new_user)
|
||||
logging.info(" ---> [%s] NEW SIGNUP" % new_user)
|
||||
logging.info(" ---> [%s] ~FG~SB~BMNEW SIGNUP~FW" % new_user)
|
||||
return HttpResponseRedirect(reverse('index'))
|
||||
|
||||
return index(request)
|
||||
|
||||
@never_cache
|
||||
def logout(request):
|
||||
logging.info(" ---> [%s] Logout" % request.user)
|
||||
logging.info(" ---> [%s] ~FGLogout~FW" % request.user)
|
||||
logout_user(request)
|
||||
|
||||
if request.GET.get('api'):
|
||||
|
@ -337,7 +337,7 @@ def load_single_feed(request):
|
|||
diff = datetime.datetime.utcnow()-now
|
||||
timediff = float("%s.%s" % (diff.seconds, (diff.microseconds / 1000)))
|
||||
last_update = relative_timesince(feed.last_update)
|
||||
logging.info(" ---> [%s] Loading feed: %s (%s seconds)" % (request.user, feed, timediff))
|
||||
logging.info(" ---> [%s] ~FGLoading feed: ~SB%s ~SN(%s seconds)" % (request.user, feed, timediff))
|
||||
FeedLoadtime.objects.create(feed=feed, loadtime=timediff)
|
||||
|
||||
data = dict(stories=stories,
|
||||
|
@ -394,7 +394,7 @@ def load_starred_stories(request):
|
|||
'title': 0,
|
||||
}
|
||||
|
||||
logging.info(" ---> [%s] Loading starred stories: %s stories" % (request.user, len(stories)))
|
||||
logging.info(" ---> [%s] ~FCLoading starred stories: ~SB%s stories" % (request.user, len(stories)))
|
||||
|
||||
return dict(stories=stories)
|
||||
|
||||
|
@ -415,7 +415,7 @@ def mark_all_as_read(request):
|
|||
sub.mark_read_date = read_date
|
||||
sub.save()
|
||||
|
||||
logging.info(" ---> [%s] Marking all as read: %s days" % (request.user, days,))
|
||||
logging.info(" ---> [%s] ~FR~BMMarking all as read: ~SB%s days" % (request.user, days,))
|
||||
return dict(code=code)
|
||||
|
||||
@ajax_login_required
|
||||
|
@ -442,9 +442,9 @@ def mark_story_as_read(request):
|
|||
data = dict(code=0, payload=story_ids)
|
||||
|
||||
if len(story_ids) > 1:
|
||||
logging.debug(" ---> [%s] Read %s stories in feed: %s" % (request.user, len(story_ids), usersub.feed))
|
||||
logging.debug(" ---> [%s] ~FK~SBRead %s stories in feed: %s" % (request.user, len(story_ids), usersub.feed))
|
||||
else:
|
||||
logging.debug(" ---> [%s] Read story in feed: %s" % (request.user, usersub.feed))
|
||||
logging.debug(" ---> [%s] ~FK~SBRead story in feed: %s" % (request.user, usersub.feed))
|
||||
|
||||
for story_id in story_ids:
|
||||
story = MStory.objects(story_feed_id=feed_id, story_guid=story_id)[0]
|
||||
|
@ -453,7 +453,7 @@ def mark_story_as_read(request):
|
|||
try:
|
||||
m.save()
|
||||
except OperationError:
|
||||
logging.info(' ---> [%s] *** Marked story as read: Duplicate Story -> %s' % (request.user, story_id))
|
||||
logging.info(' ---> [%s] ~BW*** Marked story as read: Duplicate Story -> %s' % (request.user, story_id))
|
||||
|
||||
return data
|
||||
|
||||
|
@ -477,7 +477,7 @@ def mark_feed_as_read(request):
|
|||
else:
|
||||
code = 1
|
||||
|
||||
logging.info(" ---> [%s] Marking feed as read: %s" % (request.user, feed,))
|
||||
logging.info(" ---> [%s] ~FMMarking feed as read: ~SB%s" % (request.user, feed,))
|
||||
MUserStory.objects(user_id=request.user.pk, feed_id=feed_id).delete()
|
||||
return dict(code=code)
|
||||
|
||||
|
@ -498,7 +498,7 @@ def add_url(request):
|
|||
folder = request.POST['folder']
|
||||
feed = None
|
||||
|
||||
logging.info(" ---> [%s] Adding URL: %s (in %s)" % (request.user, url, folder))
|
||||
logging.info(" ---> [%s] ~FRAdding URL: ~SB%s (in %s)" % (request.user, url, folder))
|
||||
|
||||
if url:
|
||||
url = urlnorm.normalize(url)
|
||||
|
@ -556,7 +556,7 @@ def add_folder(request):
|
|||
folder = request.POST['folder']
|
||||
parent_folder = request.POST['parent_folder']
|
||||
|
||||
logging.info(" ---> [%s] Adding Folder: %s (in %s)" % (request.user, folder, parent_folder))
|
||||
logging.info(" ---> [%s] ~FRAdding Folder: ~SB%s (in %s)" % (request.user, folder, parent_folder))
|
||||
|
||||
if folder:
|
||||
code = 1
|
||||
|
@ -633,7 +633,7 @@ def add_feature(request):
|
|||
@json.json_view
|
||||
def load_features(request):
|
||||
page = int(request.POST.get('page', 0))
|
||||
logging.info(" ---> [%s] Browse features: Page #%s" % (request.user, page+1))
|
||||
logging.info(" ---> [%s] ~FBBrowse features: Page #%s" % (request.user, page+1))
|
||||
features = Feature.objects.all()[page*3:(page+1)*3+1].values()
|
||||
features = [{
|
||||
'description': f['description'],
|
||||
|
@ -649,7 +649,7 @@ def save_feed_order(request):
|
|||
# Test that folders can be JSON decoded
|
||||
folders_list = json.decode(folders)
|
||||
assert folders_list is not None
|
||||
logging.info(" ---> [%s] Feed re-ordering: %s folders/feeds" % (request.user,
|
||||
logging.info(" ---> [%s] ~FBFeed re-ordering: ~SB%s folders/feeds" % (request.user,
|
||||
len(folders_list)))
|
||||
user_sub_folders = UserSubscriptionFolders.objects.get(user=request.user)
|
||||
user_sub_folders.folders = folders
|
||||
|
@ -678,7 +678,7 @@ def get_feeds_trainer(request):
|
|||
classifier['feed_authors'] = json.decode(us.feed.popular_authors) if us.feed.popular_authors else []
|
||||
classifiers.append(classifier)
|
||||
|
||||
logging.info(" ---> [%s] Loading Trainer: %s feeds" % (user, len(classifiers)))
|
||||
logging.info(" ---> [%s] ~FYLoading Trainer: ~SB%s feeds" % (user, len(classifiers)))
|
||||
|
||||
return classifiers
|
||||
|
||||
|
@ -705,7 +705,7 @@ def save_feed_chooser(request):
|
|||
|
||||
queue_new_feeds(request.user)
|
||||
|
||||
logging.info(' ---> [%s] Activated standard account: %s/%s' % (request.user,
|
||||
logging.info(' ---> [%s] ~FRActivated standard account: ~SB%s~ST/~SB%s' % (request.user,
|
||||
activated,
|
||||
usersubs.count()))
|
||||
return {'activated': activated}
|
||||
|
@ -741,7 +741,7 @@ def activate_premium_account(request):
|
|||
@login_required
|
||||
def login_as(request):
|
||||
if not request.user.is_staff:
|
||||
logging.info(' ---> NON-STAFF LOGGING IN AS ANOTHER USER: %s' % request.user)
|
||||
logging.info(' ---> [%s] ~SKNON-STAFF LOGGING IN AS ANOTHER USER!' % request.user)
|
||||
assert False
|
||||
return HttpResponseForbidden()
|
||||
username = request.GET['user']
|
||||
|
@ -751,7 +751,7 @@ def login_as(request):
|
|||
return HttpResponseRedirect(reverse('index'))
|
||||
|
||||
def iframe_buster(request):
|
||||
logging.info(" ---> [%s] iFrame bust!" % (request.user,))
|
||||
logging.info(" ---> [%s] ~FB~SBiFrame bust!" % (request.user,))
|
||||
return HttpResponse(status=204)
|
||||
|
||||
@ajax_login_required
|
||||
|
@ -768,7 +768,7 @@ def mark_story_as_starred(request):
|
|||
now = datetime.datetime.now()
|
||||
story_values = dict(user_id=request.user.pk, starred_date=now, **story_db)
|
||||
MStarredStory.objects.create(**story_values)
|
||||
logging.info(' ---> [%s] Starring: %s' % (request.user, story[0].story_title[:50]))
|
||||
logging.info(' ---> [%s] ~FCStarring: ~SB%s' % (request.user, story[0].story_title[:50]))
|
||||
else:
|
||||
code = -1
|
||||
|
||||
|
@ -782,7 +782,7 @@ def mark_story_as_unstarred(request):
|
|||
|
||||
starred_story = MStarredStory.objects(user_id=request.user.pk, story_guid=story_id)
|
||||
if starred_story:
|
||||
logging.info(' ---> [%s] Unstarring: %s' % (request.user, starred_story[0].story_title[:50]))
|
||||
logging.info(' ---> [%s] ~FCUnstarring: ~SB%s' % (request.user, starred_story[0].story_title[:50]))
|
||||
starred_story.delete()
|
||||
else:
|
||||
code = -1
|
||||
|
|
|
@ -36,7 +36,7 @@ def load_feed_statistics(request):
|
|||
stats['premium_subscribers'] = feed.premium_subscribers
|
||||
stats['active_subscribers'] = feed.active_subscribers
|
||||
|
||||
logging.info(" ---> [%s] Statistics: %s (%s/%s/%s subs)" % (request.user, feed, feed.num_subscribers, feed.active_subscribers, feed.premium_subscribers,))
|
||||
logging.info(" ---> [%s] ~FBStatistics: ~SB%s ~FG(%s/%s/%s subs)" % (request.user, feed, feed.num_subscribers, feed.active_subscribers, feed.premium_subscribers,))
|
||||
|
||||
return stats
|
||||
|
||||
|
|
6
utils/colorama/__init__.py
Normal file
6
utils/colorama/__init__.py
Normal file
|
@ -0,0 +1,6 @@
|
|||
from .initialise import init
|
||||
from .ansi import Fore, Back, Style
|
||||
from .ansitowin32 import AnsiToWin32
|
||||
|
||||
__version__ = '0.1.16'
|
||||
|
51
utils/colorama/ansi.py
Normal file
51
utils/colorama/ansi.py
Normal file
|
@ -0,0 +1,51 @@
|
|||
'''
|
||||
This module generates ANSI character codes to printing colors to terminals.
|
||||
See: http://en.wikipedia.org/wiki/ANSI_escape_code
|
||||
'''
|
||||
|
||||
CSI = '\033['
|
||||
|
||||
def code_to_chars(code):
|
||||
return CSI + str(code) + 'm'
|
||||
|
||||
class AnsiCodes(object):
|
||||
def __init__(self, codes):
|
||||
for name in dir(codes):
|
||||
if not name.startswith('_'):
|
||||
value = getattr(codes, name)
|
||||
setattr(self, name, code_to_chars(value))
|
||||
|
||||
class AnsiFore:
|
||||
BLACK = 30
|
||||
RED = 31
|
||||
GREEN = 32
|
||||
YELLOW = 33
|
||||
BLUE = 34
|
||||
MAGENTA = 35
|
||||
CYAN = 36
|
||||
WHITE = 37
|
||||
RESET = 39
|
||||
|
||||
class AnsiBack:
|
||||
BLACK = 40
|
||||
RED = 41
|
||||
GREEN = 42
|
||||
YELLOW = 43
|
||||
BLUE = 44
|
||||
MAGENTA = 45
|
||||
CYAN = 46
|
||||
WHITE = 47
|
||||
RESET = 49
|
||||
|
||||
class AnsiStyle:
|
||||
BRIGHT = 1
|
||||
DIM = 2
|
||||
UNDERLINE = 4
|
||||
BLINK = 5
|
||||
NORMAL = 22
|
||||
RESET_ALL = 0
|
||||
|
||||
Fore = AnsiCodes( AnsiFore )
|
||||
Back = AnsiCodes( AnsiBack )
|
||||
Style = AnsiCodes( AnsiStyle )
|
||||
|
176
utils/colorama/ansitowin32.py
Normal file
176
utils/colorama/ansitowin32.py
Normal file
|
@ -0,0 +1,176 @@
|
|||
|
||||
import re
|
||||
import sys
|
||||
|
||||
from .ansi import AnsiFore, AnsiBack, AnsiStyle, Style
|
||||
from .winterm import WinTerm, WinColor, WinStyle
|
||||
from .win32 import windll
|
||||
|
||||
|
||||
if windll is not None:
|
||||
winterm = WinTerm()
|
||||
|
||||
|
||||
def is_a_tty(stream):
|
||||
return hasattr(stream, 'isatty') and stream.isatty()
|
||||
|
||||
|
||||
class StreamWrapper(object):
|
||||
'''
|
||||
Wraps a stream (such as stdout), acting as a transparent proxy for all
|
||||
attribute access apart from method 'write()', which is delegated to our
|
||||
Converter instance.
|
||||
'''
|
||||
def __init__(self, wrapped, converter):
|
||||
# double-underscore everything to prevent clashes with names of
|
||||
# attributes on the wrapped stream object.
|
||||
self.__wrapped = wrapped
|
||||
self.__convertor = converter
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.__wrapped, name)
|
||||
|
||||
def write(self, text):
|
||||
self.__convertor.write(text)
|
||||
|
||||
|
||||
class AnsiToWin32(object):
|
||||
'''
|
||||
Implements a 'write()' method which, on Windows, will strip ANSI character
|
||||
sequences from the text, and if outputting to a tty, will convert them into
|
||||
win32 function calls.
|
||||
'''
|
||||
ANSI_RE = re.compile('\033\[((?:\d|;)*)([a-zA-Z])')
|
||||
|
||||
def __init__(self, wrapped, convert=None, strip=None, autoreset=False):
|
||||
# The wrapped stream (normally sys.stdout or sys.stderr)
|
||||
self.wrapped = wrapped
|
||||
|
||||
# should we reset colors to defaults after every .write()
|
||||
self.autoreset = autoreset
|
||||
|
||||
# create the proxy wrapping our output stream
|
||||
self.stream = StreamWrapper(wrapped, self)
|
||||
|
||||
on_windows = sys.platform.startswith('win')
|
||||
|
||||
# should we strip ANSI sequences from our output?
|
||||
if strip is None:
|
||||
strip = on_windows
|
||||
self.strip = strip
|
||||
|
||||
# should we should convert ANSI sequences into win32 calls?
|
||||
if convert is None:
|
||||
convert = on_windows and is_a_tty(wrapped)
|
||||
self.convert = convert
|
||||
|
||||
# dict of ansi codes to win32 functions and parameters
|
||||
self.win32_calls = self.get_win32_calls()
|
||||
|
||||
# are we wrapping stderr?
|
||||
self.on_stderr = self.wrapped is sys.stderr
|
||||
|
||||
|
||||
def should_wrap(self):
|
||||
'''
|
||||
True if this class is actually needed. If false, then the output
|
||||
stream will not be affected, nor will win32 calls be issued, so
|
||||
wrapping stdout is not actually required. This will generally be
|
||||
False on non-Windows platforms, unless optional functionality like
|
||||
autoreset has been requested using kwargs to init()
|
||||
'''
|
||||
return self.convert or self.strip or self.autoreset
|
||||
|
||||
|
||||
def get_win32_calls(self):
|
||||
if self.convert and winterm:
|
||||
return {
|
||||
AnsiStyle.RESET_ALL: (winterm.reset_all, ),
|
||||
AnsiStyle.BRIGHT: (winterm.style, WinStyle.BRIGHT),
|
||||
AnsiStyle.DIM: (winterm.style, WinStyle.NORMAL),
|
||||
AnsiStyle.NORMAL: (winterm.style, WinStyle.NORMAL),
|
||||
AnsiFore.BLACK: (winterm.fore, WinColor.BLACK),
|
||||
AnsiFore.RED: (winterm.fore, WinColor.RED),
|
||||
AnsiFore.GREEN: (winterm.fore, WinColor.GREEN),
|
||||
AnsiFore.YELLOW: (winterm.fore, WinColor.YELLOW),
|
||||
AnsiFore.BLUE: (winterm.fore, WinColor.BLUE),
|
||||
AnsiFore.MAGENTA: (winterm.fore, WinColor.MAGENTA),
|
||||
AnsiFore.CYAN: (winterm.fore, WinColor.CYAN),
|
||||
AnsiFore.WHITE: (winterm.fore, WinColor.GREY),
|
||||
AnsiFore.RESET: (winterm.fore, ),
|
||||
AnsiBack.BLACK: (winterm.back, WinColor.BLACK),
|
||||
AnsiBack.RED: (winterm.back, WinColor.RED),
|
||||
AnsiBack.GREEN: (winterm.back, WinColor.GREEN),
|
||||
AnsiBack.YELLOW: (winterm.back, WinColor.YELLOW),
|
||||
AnsiBack.BLUE: (winterm.back, WinColor.BLUE),
|
||||
AnsiBack.MAGENTA: (winterm.back, WinColor.MAGENTA),
|
||||
AnsiBack.CYAN: (winterm.back, WinColor.CYAN),
|
||||
AnsiBack.WHITE: (winterm.back, WinColor.GREY),
|
||||
AnsiBack.RESET: (winterm.back, ),
|
||||
}
|
||||
|
||||
|
||||
def write(self, text):
|
||||
if self.strip or self.convert:
|
||||
self.write_and_convert(text)
|
||||
else:
|
||||
self.wrapped.write(text)
|
||||
self.wrapped.flush()
|
||||
if self.autoreset:
|
||||
self.reset_all()
|
||||
|
||||
|
||||
def reset_all(self):
|
||||
if self.convert:
|
||||
self.call_win32('m', (0,))
|
||||
else:
|
||||
self.wrapped.write(Style.RESET_ALL)
|
||||
|
||||
|
||||
def write_and_convert(self, text):
|
||||
'''
|
||||
Write the given text to our wrapped stream, stripping any ANSI
|
||||
sequences from the text, and optionally converting them into win32
|
||||
calls.
|
||||
'''
|
||||
cursor = 0
|
||||
for match in self.ANSI_RE.finditer(text):
|
||||
start, end = match.span()
|
||||
self.write_plain_text(text, cursor, start)
|
||||
self.convert_ansi(*match.groups())
|
||||
cursor = end
|
||||
self.write_plain_text(text, cursor, len(text))
|
||||
|
||||
|
||||
def write_plain_text(self, text, start, end):
|
||||
if start < end:
|
||||
self.wrapped.write(text[start:end])
|
||||
self.wrapped.flush()
|
||||
|
||||
|
||||
def convert_ansi(self, paramstring, command):
|
||||
if self.convert:
|
||||
params = self.extract_params(paramstring)
|
||||
self.call_win32(command, params)
|
||||
|
||||
|
||||
def extract_params(self, paramstring):
|
||||
def split(paramstring):
|
||||
for p in paramstring.split(';'):
|
||||
if p != '':
|
||||
yield int(p)
|
||||
return tuple(split(paramstring))
|
||||
|
||||
|
||||
def call_win32(self, command, params):
|
||||
if params == []:
|
||||
params = [0]
|
||||
if command == 'm':
|
||||
for param in params:
|
||||
if param in self.win32_calls:
|
||||
func_args = self.win32_calls[param]
|
||||
func = func_args[0]
|
||||
args = func_args[1:]
|
||||
kwargs = dict(on_stderr=self.on_stderr)
|
||||
func(*args, **kwargs)
|
||||
|
32
utils/colorama/initialise.py
Normal file
32
utils/colorama/initialise.py
Normal file
|
@ -0,0 +1,32 @@
|
|||
import atexit
|
||||
import sys
|
||||
|
||||
from .ansitowin32 import AnsiToWin32
|
||||
|
||||
|
||||
orig_stdout = sys.stdout
|
||||
orig_stderr = sys.stderr
|
||||
|
||||
|
||||
@atexit.register
|
||||
def reset_all():
|
||||
AnsiToWin32(orig_stdout).reset_all()
|
||||
|
||||
|
||||
def init(autoreset=False, convert=None, strip=None, wrap=True):
|
||||
|
||||
if wrap==False and (autoreset==True or convert==True or strip==True):
|
||||
raise ValueError('wrap=False conflicts with any other arg=True')
|
||||
|
||||
sys.stdout = wrap_stream(orig_stdout, convert, strip, autoreset, wrap)
|
||||
sys.stderr = wrap_stream(orig_stderr, convert, strip, autoreset, wrap)
|
||||
|
||||
|
||||
def wrap_stream(stream, convert, strip, autoreset, wrap):
|
||||
if wrap:
|
||||
wrapper = AnsiToWin32(stream,
|
||||
convert=convert, strip=strip, autoreset=autoreset)
|
||||
if wrapper.should_wrap():
|
||||
stream = wrapper.stream
|
||||
return stream
|
||||
|
0
utils/colorama/tests/__init__.py
Normal file
0
utils/colorama/tests/__init__.py
Normal file
58
utils/colorama/tests/ansi_test.py
Normal file
58
utils/colorama/tests/ansi_test.py
Normal file
|
@ -0,0 +1,58 @@
|
|||
|
||||
import sys
|
||||
from unittest2 import TestCase, main
|
||||
|
||||
from ..ansi import Fore, Back, Style
|
||||
from ..ansitowin32 import AnsiToWin32
|
||||
|
||||
|
||||
stdout_orig = sys.stdout
|
||||
stderr_orig = sys.stderr
|
||||
|
||||
|
||||
class AnsiTest(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# sanity check: stdout should be a file or StringIO object.
|
||||
# It will only be AnsiToWin32 if init() has previously wrapped it
|
||||
self.assertNotEqual(type(sys.stdout), AnsiToWin32)
|
||||
self.assertNotEqual(type(sys.stderr), AnsiToWin32)
|
||||
|
||||
def tearDown(self):
|
||||
sys.stdout = stdout_orig
|
||||
sys.stderr = stderr_orig
|
||||
|
||||
|
||||
def testForeAttributes(self):
|
||||
self.assertEquals(Fore.BLACK, '\033[30m')
|
||||
self.assertEquals(Fore.RED, '\033[31m')
|
||||
self.assertEquals(Fore.GREEN, '\033[32m')
|
||||
self.assertEquals(Fore.YELLOW, '\033[33m')
|
||||
self.assertEquals(Fore.BLUE, '\033[34m')
|
||||
self.assertEquals(Fore.MAGENTA, '\033[35m')
|
||||
self.assertEquals(Fore.CYAN, '\033[36m')
|
||||
self.assertEquals(Fore.WHITE, '\033[37m')
|
||||
self.assertEquals(Fore.RESET, '\033[39m')
|
||||
|
||||
|
||||
def testBackAttributes(self):
|
||||
self.assertEquals(Back.BLACK, '\033[40m')
|
||||
self.assertEquals(Back.RED, '\033[41m')
|
||||
self.assertEquals(Back.GREEN, '\033[42m')
|
||||
self.assertEquals(Back.YELLOW, '\033[43m')
|
||||
self.assertEquals(Back.BLUE, '\033[44m')
|
||||
self.assertEquals(Back.MAGENTA, '\033[45m')
|
||||
self.assertEquals(Back.CYAN, '\033[46m')
|
||||
self.assertEquals(Back.WHITE, '\033[47m')
|
||||
self.assertEquals(Back.RESET, '\033[49m')
|
||||
|
||||
|
||||
def testStyleAttributes(self):
|
||||
self.assertEquals(Style.DIM, '\033[2m')
|
||||
self.assertEquals(Style.NORMAL, '\033[22m')
|
||||
self.assertEquals(Style.BRIGHT, '\033[1m')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
185
utils/colorama/tests/ansitowin32_test.py
Normal file
185
utils/colorama/tests/ansitowin32_test.py
Normal file
|
@ -0,0 +1,185 @@
|
|||
|
||||
from unittest2 import TestCase, main
|
||||
from mock import Mock, patch
|
||||
|
||||
from .utils import platform
|
||||
|
||||
from ..ansi import Style
|
||||
from ..ansitowin32 import AnsiToWin32, StreamWrapper
|
||||
|
||||
|
||||
|
||||
class StreamWrapperTest(TestCase):
|
||||
|
||||
def testIsAProxy(self):
|
||||
mockStream = Mock()
|
||||
wrapper = StreamWrapper(mockStream, None)
|
||||
self.assertTrue( wrapper.random_attr is mockStream.random_attr )
|
||||
|
||||
def testDelegatesWrite(self):
|
||||
mockStream = Mock()
|
||||
mockConverter = Mock()
|
||||
wrapper = StreamWrapper(mockStream, mockConverter)
|
||||
wrapper.write('hello')
|
||||
self.assertTrue(mockConverter.write.call_args, (('hello',), {}))
|
||||
|
||||
|
||||
class AnsiToWin32Test(TestCase):
|
||||
|
||||
def testInit(self):
|
||||
mockStdout = object()
|
||||
auto = object()
|
||||
stream = AnsiToWin32(mockStdout, autoreset=auto)
|
||||
self.assertEquals(stream.wrapped, mockStdout)
|
||||
self.assertEquals(stream.autoreset, auto)
|
||||
|
||||
@patch('colorama.ansitowin32.winterm', None)
|
||||
def testStripIsTrueOnWindows(self):
|
||||
with platform('windows'):
|
||||
stream = AnsiToWin32(None)
|
||||
self.assertTrue(stream.strip)
|
||||
|
||||
def testStripIsFalseOffWindows(self):
|
||||
with platform('darwin'):
|
||||
stream = AnsiToWin32(None)
|
||||
self.assertFalse(stream.strip)
|
||||
|
||||
|
||||
def testWriteStripsAnsi(self):
|
||||
mockStdout = Mock()
|
||||
stream = AnsiToWin32(mockStdout)
|
||||
stream.wrapped = Mock()
|
||||
stream.write_and_convert = Mock()
|
||||
stream.strip = True
|
||||
|
||||
stream.write('abc')
|
||||
|
||||
self.assertFalse(stream.wrapped.write.called)
|
||||
self.assertEquals(stream.write_and_convert.call_args, (('abc',), {}))
|
||||
|
||||
|
||||
def testWriteDoesNotStripAnsi(self):
|
||||
mockStdout = Mock()
|
||||
stream = AnsiToWin32(mockStdout)
|
||||
stream.wrapped = Mock()
|
||||
stream.write_and_convert = Mock()
|
||||
stream.strip = False
|
||||
stream.convert = False
|
||||
|
||||
stream.write('abc')
|
||||
|
||||
self.assertFalse(stream.write_and_convert.called)
|
||||
self.assertEquals(stream.wrapped.write.call_args, (('abc',), {}))
|
||||
|
||||
|
||||
def assert_autoresets(self, convert, autoreset=True):
|
||||
stream = AnsiToWin32(Mock())
|
||||
stream.convert = convert
|
||||
stream.reset_all = Mock()
|
||||
stream.autoreset = autoreset
|
||||
stream.winterm = Mock()
|
||||
|
||||
stream.write('abc')
|
||||
|
||||
self.assertEquals(stream.reset_all.called, autoreset)
|
||||
|
||||
def testWriteAutoresets(self):
|
||||
self.assert_autoresets(convert=True)
|
||||
self.assert_autoresets(convert=False)
|
||||
self.assert_autoresets(convert=True, autoreset=False)
|
||||
self.assert_autoresets(convert=False, autoreset=False)
|
||||
|
||||
def testWriteAndConvertWritesPlainText(self):
|
||||
stream = AnsiToWin32(Mock())
|
||||
stream.write_and_convert( 'abc' )
|
||||
self.assertEquals( stream.wrapped.write.call_args, (('abc',), {}) )
|
||||
|
||||
def testWriteAndConvertStripsAllValidAnsi(self):
|
||||
stream = AnsiToWin32(Mock())
|
||||
stream.call_win32 = Mock()
|
||||
data = [
|
||||
'abc\033[mdef',
|
||||
'abc\033[0mdef',
|
||||
'abc\033[2mdef',
|
||||
'abc\033[02mdef',
|
||||
'abc\033[002mdef',
|
||||
'abc\033[40mdef',
|
||||
'abc\033[040mdef',
|
||||
'abc\033[0;1mdef',
|
||||
'abc\033[40;50mdef',
|
||||
'abc\033[50;30;40mdef',
|
||||
'abc\033[Adef',
|
||||
'abc\033[0Gdef',
|
||||
'abc\033[1;20;128Hdef',
|
||||
]
|
||||
for datum in data:
|
||||
stream.wrapped.write.reset_mock()
|
||||
stream.write_and_convert( datum )
|
||||
self.assertEquals(
|
||||
[args[0] for args in stream.wrapped.write.call_args_list],
|
||||
[ ('abc',), ('def',) ]
|
||||
)
|
||||
|
||||
def testWriteAndConvertSkipsEmptySnippets(self):
|
||||
stream = AnsiToWin32(Mock())
|
||||
stream.call_win32 = Mock()
|
||||
stream.write_and_convert( '\033[40m\033[41m' )
|
||||
self.assertFalse( stream.wrapped.write.called )
|
||||
|
||||
def testWriteAndConvertCallsWin32WithParamsAndCommand(self):
|
||||
stream = AnsiToWin32(Mock())
|
||||
stream.convert = True
|
||||
stream.call_win32 = Mock()
|
||||
stream.extract_params = Mock(return_value='params')
|
||||
data = {
|
||||
'abc\033[adef': ('a', 'params'),
|
||||
'abc\033[;;bdef': ('b', 'params'),
|
||||
'abc\033[0cdef': ('c', 'params'),
|
||||
'abc\033[;;0;;Gdef': ('G', 'params'),
|
||||
'abc\033[1;20;128Hdef': ('H', 'params'),
|
||||
}
|
||||
for datum, expected in data.items():
|
||||
stream.call_win32.reset_mock()
|
||||
stream.write_and_convert( datum )
|
||||
self.assertEquals( stream.call_win32.call_args[0], expected )
|
||||
|
||||
def testExtractParams(self):
|
||||
stream = AnsiToWin32(Mock())
|
||||
data = {
|
||||
'': (),
|
||||
';;': (),
|
||||
'2': (2,),
|
||||
';;002;;': (2,),
|
||||
'0;1': (0, 1),
|
||||
';;003;;456;;': (3, 456),
|
||||
'11;22;33;44;55': (11, 22, 33, 44, 55),
|
||||
}
|
||||
for datum, expected in data.items():
|
||||
self.assertEquals(stream.extract_params(datum), expected)
|
||||
|
||||
def testCallWin32UsesLookup(self):
|
||||
listener = Mock()
|
||||
stream = AnsiToWin32(listener)
|
||||
stream.win32_calls = {
|
||||
1: (lambda *_, **__: listener(11),),
|
||||
2: (lambda *_, **__: listener(22),),
|
||||
3: (lambda *_, **__: listener(33),),
|
||||
}
|
||||
stream.call_win32('m', (3, 1, 99, 2))
|
||||
self.assertEquals(
|
||||
[a[0][0] for a in listener.call_args_list],
|
||||
[33, 11, 22] )
|
||||
|
||||
def testCallWin32DefaultsToParams0(self):
|
||||
mockStdout = Mock()
|
||||
stream = AnsiToWin32(mockStdout)
|
||||
stream.win32_calls = {0: (mockStdout.reset,)}
|
||||
|
||||
stream.call_win32('m', [])
|
||||
|
||||
self.assertTrue(mockStdout.reset.called)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
105
utils/colorama/tests/initialise_test.py
Normal file
105
utils/colorama/tests/initialise_test.py
Normal file
|
@ -0,0 +1,105 @@
|
|||
|
||||
import sys
|
||||
from unittest2 import TestCase, main
|
||||
|
||||
from mock import patch
|
||||
|
||||
from .utils import platform, redirected_output
|
||||
|
||||
from ..initialise import init
|
||||
from ..ansitowin32 import StreamWrapper
|
||||
|
||||
|
||||
orig_stdout = sys.stdout
|
||||
orig_stderr = sys.stderr
|
||||
|
||||
|
||||
class InitTest(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# sanity check
|
||||
self.assertNotWrapped()
|
||||
|
||||
def tearDown(self):
|
||||
sys.stdout = orig_stdout
|
||||
sys.stderr = orig_stderr
|
||||
|
||||
def assertWrapped(self):
|
||||
self.assertIsNot(sys.stdout, orig_stdout, 'stdout should be wrapped')
|
||||
self.assertIsNot(sys.stderr, orig_stderr, 'stderr should be wrapped')
|
||||
self.assertTrue(isinstance(sys.stdout, StreamWrapper),
|
||||
'bad stdout wrapper')
|
||||
self.assertTrue(isinstance(sys.stderr, StreamWrapper),
|
||||
'bad stderr wrapper')
|
||||
|
||||
def assertNotWrapped(self):
|
||||
self.assertIs(sys.stdout, orig_stdout, 'stdout should not be wrapped')
|
||||
self.assertIs(sys.stderr, orig_stderr, 'stderr should not be wrapped')
|
||||
|
||||
@patch('colorama.initialise.reset_all')
|
||||
def testInitWrapsOnWindows(self, _):
|
||||
with platform('windows'):
|
||||
init()
|
||||
self.assertWrapped()
|
||||
|
||||
def testInitDoesntWrapOnNonWindows(self):
|
||||
with platform('darwin'):
|
||||
init()
|
||||
self.assertNotWrapped()
|
||||
|
||||
def testInitAutoresetOnWrapsOnAllPlatforms(self):
|
||||
with platform('darwin'):
|
||||
init(autoreset=True)
|
||||
self.assertWrapped()
|
||||
|
||||
def testInitWrapOffDoesntWrapOnWindows(self):
|
||||
with platform('windows'):
|
||||
init(wrap=False)
|
||||
self.assertNotWrapped()
|
||||
|
||||
def testInitWrapOffWillUnwrapIfRequired(self):
|
||||
with platform('windows'):
|
||||
init()
|
||||
init(wrap=False)
|
||||
self.assertNotWrapped()
|
||||
|
||||
def testInitWrapOffIncompatibleWithAutoresetOn(self):
|
||||
self.assertRaises(ValueError, lambda: init(autoreset=True, wrap=False))
|
||||
|
||||
@patch('colorama.ansitowin32.winterm', None)
|
||||
def testInitOnlyWrapsOnce(self):
|
||||
with platform('windows'):
|
||||
init()
|
||||
init()
|
||||
self.assertWrapped()
|
||||
|
||||
@patch('colorama.win32.SetConsoleTextAttribute')
|
||||
@patch('colorama.initialise.AnsiToWin32')
|
||||
def testAutoResetPassedOn(self, mockATW32, _):
|
||||
with platform('windows'):
|
||||
init(autoreset=True)
|
||||
self.assertEquals(len(mockATW32.call_args_list), 2)
|
||||
self.assertEquals(mockATW32.call_args_list[1][1]['autoreset'], True)
|
||||
self.assertEquals(mockATW32.call_args_list[0][1]['autoreset'], True)
|
||||
|
||||
@patch('colorama.initialise.AnsiToWin32')
|
||||
def testAutoResetChangeable(self, mockATW32):
|
||||
with platform('windows'):
|
||||
init()
|
||||
|
||||
init(autoreset=True)
|
||||
self.assertEquals(len(mockATW32.call_args_list), 4)
|
||||
self.assertEquals(mockATW32.call_args_list[2][1]['autoreset'], True)
|
||||
self.assertEquals(mockATW32.call_args_list[3][1]['autoreset'], True)
|
||||
|
||||
init()
|
||||
self.assertEquals(len(mockATW32.call_args_list), 6)
|
||||
self.assertEquals(
|
||||
mockATW32.call_args_list[4][1]['autoreset'], False)
|
||||
self.assertEquals(
|
||||
mockATW32.call_args_list[5][1]['autoreset'], False)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
21
utils/colorama/tests/utils.py
Normal file
21
utils/colorama/tests/utils.py
Normal file
|
@ -0,0 +1,21 @@
|
|||
|
||||
from contextlib import contextmanager
|
||||
import sys
|
||||
|
||||
from mock import Mock
|
||||
|
||||
@contextmanager
|
||||
def platform(name):
|
||||
orig = sys.platform
|
||||
sys.platform = name
|
||||
yield
|
||||
sys.platform = orig
|
||||
|
||||
@contextmanager
|
||||
def redirected_output():
|
||||
orig = sys.stdout
|
||||
sys.stdout = Mock()
|
||||
sys.stdout.isatty = lambda: False
|
||||
yield
|
||||
sys.stdout = orig
|
||||
|
124
utils/colorama/tests/winterm_test.py
Normal file
124
utils/colorama/tests/winterm_test.py
Normal file
|
@ -0,0 +1,124 @@
|
|||
|
||||
from unittest2 import TestCase, main
|
||||
|
||||
from mock import Mock, patch
|
||||
|
||||
from ..winterm import WinColor, WinStyle, WinTerm
|
||||
|
||||
|
||||
class WinTermTest(TestCase):
|
||||
|
||||
@patch('colorama.winterm.win32')
|
||||
def testInit(self, mockWin32):
|
||||
mockAttr = Mock()
|
||||
mockAttr.wAttributes = 7 + 6 * 16 + 8
|
||||
mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr
|
||||
term = WinTerm()
|
||||
self.assertEquals(term._fore, 7)
|
||||
self.assertEquals(term._back, 6)
|
||||
self.assertEquals(term._style, 8)
|
||||
|
||||
def testGetAttrs(self):
|
||||
term = WinTerm()
|
||||
|
||||
term._fore = 0
|
||||
term._back = 0
|
||||
term._style = 0
|
||||
self.assertEquals(term.get_attrs(), 0)
|
||||
|
||||
term._fore = WinColor.YELLOW
|
||||
self.assertEquals(term.get_attrs(), WinColor.YELLOW)
|
||||
|
||||
term._back = WinColor.MAGENTA
|
||||
self.assertEquals(
|
||||
term.get_attrs(),
|
||||
WinColor.YELLOW + WinColor.MAGENTA * 16)
|
||||
|
||||
term._style = WinStyle.BRIGHT
|
||||
self.assertEquals(
|
||||
term.get_attrs(),
|
||||
WinColor.YELLOW + WinColor.MAGENTA * 16 + WinStyle.BRIGHT)
|
||||
|
||||
@patch('colorama.winterm.win32')
|
||||
def testResetAll(self, mockWin32):
|
||||
mockAttr = Mock()
|
||||
mockAttr.wAttributes = 1 + 2 * 16 + 8
|
||||
mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr
|
||||
term = WinTerm()
|
||||
|
||||
term.set_console = Mock()
|
||||
term._fore = -1
|
||||
term._back = -1
|
||||
term._style = -1
|
||||
|
||||
term.reset_all()
|
||||
|
||||
self.assertEquals(term._fore, 1)
|
||||
self.assertEquals(term._back, 2)
|
||||
self.assertEquals(term._style, 8)
|
||||
self.assertEquals(term.set_console.called, True)
|
||||
|
||||
def testFore(self):
|
||||
term = WinTerm()
|
||||
term.set_console = Mock()
|
||||
term._fore = 0
|
||||
|
||||
term.fore(5)
|
||||
|
||||
self.assertEquals(term._fore, 5)
|
||||
self.assertEquals(term.set_console.called, True)
|
||||
|
||||
def testBack(self):
|
||||
term = WinTerm()
|
||||
term.set_console = Mock()
|
||||
term._back = 0
|
||||
|
||||
term.back(5)
|
||||
|
||||
self.assertEquals(term._back, 5)
|
||||
self.assertEquals(term.set_console.called, True)
|
||||
|
||||
def testStyle(self):
|
||||
term = WinTerm()
|
||||
term.set_console = Mock()
|
||||
term._style = 0
|
||||
|
||||
term.style(22)
|
||||
|
||||
self.assertEquals(term._style, 22)
|
||||
self.assertEquals(term.set_console.called, True)
|
||||
|
||||
@patch('colorama.winterm.win32')
|
||||
def testSetConsole(self, mockWin32):
|
||||
mockAttr = Mock()
|
||||
mockAttr.wAttributes = 0
|
||||
mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr
|
||||
term = WinTerm()
|
||||
term.windll = Mock()
|
||||
|
||||
term.set_console()
|
||||
|
||||
self.assertEquals(
|
||||
mockWin32.SetConsoleTextAttribute.call_args,
|
||||
((mockWin32.STDOUT, term.get_attrs()), {})
|
||||
)
|
||||
|
||||
@patch('colorama.winterm.win32')
|
||||
def testSetConsoleOnStderr(self, mockWin32):
|
||||
mockAttr = Mock()
|
||||
mockAttr.wAttributes = 0
|
||||
mockWin32.GetConsoleScreenBufferInfo.return_value = mockAttr
|
||||
term = WinTerm()
|
||||
term.windll = Mock()
|
||||
|
||||
term.set_console(on_stderr=True)
|
||||
|
||||
self.assertEquals(
|
||||
mockWin32.SetConsoleTextAttribute.call_args,
|
||||
((mockWin32.STDERR, term.get_attrs()), {})
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
95
utils/colorama/win32.py
Normal file
95
utils/colorama/win32.py
Normal file
|
@ -0,0 +1,95 @@
|
|||
|
||||
# from winbase.h
|
||||
STDOUT = -11
|
||||
STDERR = -12
|
||||
|
||||
try:
|
||||
from ctypes import windll
|
||||
except ImportError:
|
||||
windll = None
|
||||
SetConsoleTextAttribute = lambda *_: None
|
||||
else:
|
||||
from ctypes import (
|
||||
byref, Structure, c_char, c_short, c_uint32, c_ushort
|
||||
)
|
||||
|
||||
handles = {
|
||||
STDOUT: windll.kernel32.GetStdHandle(STDOUT),
|
||||
STDERR: windll.kernel32.GetStdHandle(STDERR),
|
||||
}
|
||||
|
||||
SHORT = c_short
|
||||
WORD = c_ushort
|
||||
DWORD = c_uint32
|
||||
TCHAR = c_char
|
||||
|
||||
class COORD(Structure):
|
||||
"""struct in wincon.h"""
|
||||
_fields_ = [
|
||||
('X', SHORT),
|
||||
('Y', SHORT),
|
||||
]
|
||||
|
||||
class SMALL_RECT(Structure):
|
||||
"""struct in wincon.h."""
|
||||
_fields_ = [
|
||||
("Left", SHORT),
|
||||
("Top", SHORT),
|
||||
("Right", SHORT),
|
||||
("Bottom", SHORT),
|
||||
]
|
||||
|
||||
class CONSOLE_SCREEN_BUFFER_INFO(Structure):
|
||||
"""struct in wincon.h."""
|
||||
_fields_ = [
|
||||
("dwSize", COORD),
|
||||
("dwCursorPosition", COORD),
|
||||
("wAttributes", WORD),
|
||||
("srWindow", SMALL_RECT),
|
||||
("dwMaximumWindowSize", COORD),
|
||||
]
|
||||
|
||||
def GetConsoleScreenBufferInfo(stream_id):
|
||||
handle = handles[stream_id]
|
||||
csbi = CONSOLE_SCREEN_BUFFER_INFO()
|
||||
success = windll.kernel32.GetConsoleScreenBufferInfo(
|
||||
handle, byref(csbi))
|
||||
# This fails when imported via setup.py when installing using 'pip'
|
||||
# presumably the fix is that running setup.py should not trigger all
|
||||
# this activity.
|
||||
# assert success
|
||||
return csbi
|
||||
|
||||
def SetConsoleTextAttribute(stream_id, attrs):
|
||||
handle = handles[stream_id]
|
||||
success = windll.kernel32.SetConsoleTextAttribute(handle, attrs)
|
||||
assert success
|
||||
|
||||
def SetConsoleCursorPosition(stream_id, position):
|
||||
handle = handles[stream_id]
|
||||
position = COORD(*position)
|
||||
success = windll.kernel32.SetConsoleCursorPosition(handle, position)
|
||||
assert success
|
||||
|
||||
def FillConsoleOutputCharacter(stream_id, char, length, start):
|
||||
handle = handles[stream_id]
|
||||
char = TCHAR(char)
|
||||
length = DWORD(length)
|
||||
start = COORD(*start)
|
||||
num_written = DWORD(0)
|
||||
# AttributeError: function 'FillConsoleOutputCharacter' not found
|
||||
# could it just be that my types are wrong?
|
||||
success = windll.kernel32.FillConsoleOutputCharacter(
|
||||
handle, char, length, start, byref(num_written))
|
||||
assert success
|
||||
return num_written.value
|
||||
|
||||
|
||||
if __name__=='__main__':
|
||||
x = GetConsoleScreenBufferInfo(STDOUT)
|
||||
print(x.dwSize)
|
||||
print(x.dwCursorPosition)
|
||||
print(x.wAttributes)
|
||||
print(x.srWindow)
|
||||
print(x.dwMaximumWindowSize)
|
||||
|
69
utils/colorama/winterm.py
Normal file
69
utils/colorama/winterm.py
Normal file
|
@ -0,0 +1,69 @@
|
|||
|
||||
from . import win32
|
||||
|
||||
|
||||
# from wincon.h
|
||||
class WinColor(object):
|
||||
BLACK = 0
|
||||
BLUE = 1
|
||||
GREEN = 2
|
||||
CYAN = 3
|
||||
RED = 4
|
||||
MAGENTA = 5
|
||||
YELLOW = 6
|
||||
GREY = 7
|
||||
|
||||
# from wincon.h
|
||||
class WinStyle(object):
|
||||
NORMAL = 0x00 # dim text, dim background
|
||||
BRIGHT = 0x08 # bright text, dim background
|
||||
|
||||
|
||||
class WinTerm(object):
|
||||
|
||||
def __init__(self):
|
||||
self._default = \
|
||||
win32.GetConsoleScreenBufferInfo(win32.STDOUT).wAttributes
|
||||
self.set_attrs(self._default)
|
||||
self._default_fore = self._fore
|
||||
self._default_back = self._back
|
||||
self._default_style = self._style
|
||||
|
||||
def get_attrs(self):
|
||||
return self._fore + self._back * 16 + self._style
|
||||
|
||||
def set_attrs(self, value):
|
||||
self._fore = value & 7
|
||||
self._back = (value >> 4) & 7
|
||||
self._style = value & WinStyle.BRIGHT
|
||||
|
||||
def reset_all(self, on_stderr=None):
|
||||
self.set_attrs(self._default)
|
||||
self.set_console(attrs=self._default)
|
||||
|
||||
def fore(self, fore=None, on_stderr=False):
|
||||
if fore is None:
|
||||
fore = self._default_fore
|
||||
self._fore = fore
|
||||
self.set_console(on_stderr=on_stderr)
|
||||
|
||||
def back(self, back=None, on_stderr=False):
|
||||
if back is None:
|
||||
back = self._default_back
|
||||
self._back = back
|
||||
self.set_console(on_stderr=on_stderr)
|
||||
|
||||
def style(self, style=None, on_stderr=False):
|
||||
if style is None:
|
||||
style = self._default_style
|
||||
self._style = style
|
||||
self.set_console(on_stderr=on_stderr)
|
||||
|
||||
def set_console(self, attrs=None, on_stderr=False):
|
||||
if attrs is None:
|
||||
attrs = self.get_attrs()
|
||||
handle = win32.STDOUT
|
||||
if on_stderr:
|
||||
handle = win32.STDERR
|
||||
win32.SetConsoleTextAttribute(handle, attrs)
|
||||
|
49
utils/log.py
49
utils/log.py
|
@ -1,5 +1,7 @@
|
|||
import logging
|
||||
from django.conf import settings
|
||||
from utils.colorama import Fore, Back, Style
|
||||
import re
|
||||
|
||||
def getlogger():
|
||||
root_logger = logging.getLogger('newsblur')
|
||||
|
@ -11,7 +13,7 @@ def getlogger():
|
|||
hdlr = logging.StreamHandler()
|
||||
else:
|
||||
hdlr = logging.FileHandler(settings.LOG_FILE)
|
||||
formatter = logging.Formatter('[%(asctime)-12s] %(message)s','%b %d %H:%M:%S')
|
||||
formatter = logging.Formatter('[%(asctime)-12s] %(message)s','%b %d %H:%M:%S')
|
||||
|
||||
hdlr.setFormatter(formatter)
|
||||
logger.addHandler(hdlr)
|
||||
|
@ -21,12 +23,51 @@ def getlogger():
|
|||
|
||||
def debug(msg):
|
||||
logger = getlogger()
|
||||
logger.debug(msg)
|
||||
logger.debug(colorize(msg))
|
||||
|
||||
def info(msg):
|
||||
logger = getlogger()
|
||||
logger.info(msg)
|
||||
logger.info(colorize(msg))
|
||||
|
||||
def error(msg):
|
||||
logger = getlogger()
|
||||
logger.error(msg)
|
||||
logger.error(colorize(msg))
|
||||
|
||||
def colorize(msg):
|
||||
params = {
|
||||
r'\-\-\->' : '~FB~SB--->~FW',
|
||||
r'\*\*\*>' : '~FB~SB~BB--->~BT~FW',
|
||||
r'\[' : '~SB~FB[~SN~FM',
|
||||
r'\]' : '~FB~SB]~FW~SN',
|
||||
}
|
||||
colors = {
|
||||
'~SB' : Style.BRIGHT,
|
||||
'~SN' : Style.NORMAL,
|
||||
'~SK' : Style.BLINK,
|
||||
'~SU' : Style.UNDERLINE,
|
||||
'~ST' : Style.RESET_ALL,
|
||||
'~FK': Fore.BLACK,
|
||||
'~FR': Fore.RED,
|
||||
'~FG': Fore.GREEN,
|
||||
'~FY': Fore.YELLOW,
|
||||
'~FB': Fore.BLUE,
|
||||
'~FM': Fore.MAGENTA,
|
||||
'~FC': Fore.CYAN,
|
||||
'~FW': Fore.WHITE,
|
||||
'~FT': Fore.RESET,
|
||||
'~BK': Back.BLACK,
|
||||
'~BR': Back.RED,
|
||||
'~BG': Back.GREEN,
|
||||
'~BY': Back.YELLOW,
|
||||
'~BB': Back.BLUE,
|
||||
'~BM': Back.MAGENTA,
|
||||
'~BC': Back.CYAN,
|
||||
'~BW': Back.WHITE,
|
||||
'~BT': Back.RESET,
|
||||
}
|
||||
for k, v in params.items():
|
||||
msg = re.sub(k, v, msg)
|
||||
msg = msg + '~ST~FW~BT'
|
||||
msg = re.sub(r'(~[A-Z]{2})', r'%(\1)s', msg)
|
||||
msg = msg % colors
|
||||
return msg
|
Loading…
Add table
Reference in a new issue