__author__ = "Vanessa Sochat"
__copyright__ = "Copyright 2022, Vanessa Sochat"
__license__ = "MPL 2.0"
import inspect
import logging as _logging
import os
import platform
import random
import sys
import threading
import rich.console
import rich.table
[docs]class Table:
"""
Format a list of dicts into a table
"""
def __init__(self, items):
self.data = items
self.max_widths = {}
self.ensure_complete()
[docs] def available_width(self, columns):
"""
Calculate available width based on fields we cannot truncate (urls)
"""
# We will determine column width based on terminal size
try:
width = os.get_terminal_size().columns
except OSError:
width = 120
# Calculate column width
column_width = int(width / len(columns))
updated = width
for _, needed in self.max_widths.items():
updated = updated - needed
# We don't have enough space
if updated < 0:
logger.warning("Terminal is too small to correctly render!")
return column_width
# Otherwise, recalculate column width taking into account truncation
# We use the updated smaller width, and break it up between columns
# that don't have a max width
return int(updated / (len(columns) - len(self.max_widths)))
[docs] def ensure_complete(self):
"""
If any data missing fields, ensure they are included
"""
fields = set()
for entry in self.data:
[fields.add(x) for x in entry.keys()]
# Ensure fields are present
for entry in self.data:
for field in fields:
if field not in entry:
entry[field] = ""
@property
def color(self):
"""
Return a random color
"""
return "color(" + str(random.choice(range(255))) + ")"
[docs] def table_columns(self):
"""
Shared function to return consistent table columns
"""
if not self.data:
return []
return list(self.data[0].keys())
[docs] def table_rows(self, columns, limit=25):
"""
Shared function to yield rows as a list
"""
# All keys are lowercase
column_width = self.available_width(columns)
for i, row in enumerate(self.data):
# have we gone over the limit?
if limit and i > limit:
return
parsed = []
for column in columns:
content = row[column]
if (
content
and len(content) > column_width
and column not in self.endpoint.truncate_list
):
content = content[:column_width] + "..."
parsed.append(content)
yield parsed
[docs] def show(self, limit=25, title=None):
"""
Pretty print a table of content
"""
table = rich.table.Table(title=title)
# Always skip these columns
seen_colors = []
# Get column titles
columns = self.table_columns()
for column in columns:
color = None
while not color or color in seen_colors:
color = self.color
table.add_column(column.capitalize(), style=color)
# Add rows
for row in self.table_rows(columns, limit=limit):
table.add_row(*row)
# And print!
console = rich.console.Console()
console.print(table, justify="left")
[docs]class LogColors:
PURPLE = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
RED = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
[docs]def underline(msg):
"""
Return an underlined message
"""
return f"{LogColors.UNDERLINE}{msg}{LogColors.ENDC}"
[docs]def add_prefix(msg, char=">>"):
"""
Add an "OKBLUE" prefix to a message
"""
return f"{LogColors.OKBLUE}{char}{LogColors.ENDC} {msg}"
[docs]class ColorizingStreamHandler(_logging.StreamHandler):
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
RESET_SEQ = LogColors.ENDC
COLOR_SEQ = "\033[%dm"
BOLD_SEQ = "\033[1m"
colors = {
"WARNING": YELLOW,
"INFO": GREEN,
"DEBUG": BLUE,
"CRITICAL": RED,
"ERROR": RED,
}
def __init__(self, nocolor=False, stream=sys.stderr, use_threads=False):
super().__init__(stream=stream)
self._output_lock = threading.Lock()
self.nocolor = nocolor or not self.can_color_tty()
[docs] def can_color_tty(self):
if "TERM" in os.environ and os.environ["TERM"] == "dumb":
return False
return self.is_tty and not platform.system() == "Windows"
@property
def is_tty(self):
isatty = getattr(self.stream, "isatty", None)
return isatty and isatty()
[docs] def emit(self, record):
with self._output_lock:
try:
self.format(record) # add the message to the record
self.stream.write(self.decorate(record))
self.stream.write(getattr(self, "terminator", "\n"))
self.flush()
except BrokenPipeError as e:
raise e
except (KeyboardInterrupt, SystemExit):
# ignore any exceptions in these cases as any relevant messages have been printed before
pass
except Exception:
self.handleError(record)
[docs] def decorate(self, record):
message = record.message
message = [message]
if not self.nocolor and record.levelname in self.colors:
message.insert(0, self.COLOR_SEQ % (30 + self.colors[record.levelname]))
message.append(self.RESET_SEQ)
return "".join(message)
[docs]class Logger:
def __init__(self):
self.logger = _logging.getLogger(__name__)
self.log_handler = [self.text_handler]
self.stream_handler = None
self.printshellcmds = False
self.quiet = False
self.logfile = None
self.last_msg_was_job_info = False
self.logfile_handler = None
[docs] def cleanup(self):
if self.logfile_handler is not None:
self.logger.removeHandler(self.logfile_handler)
self.logfile_handler.close()
self.log_handler = [self.text_handler]
[docs] def handler(self, msg):
for handler in self.log_handler:
handler(msg)
[docs] def set_stream_handler(self, stream_handler):
if self.stream_handler is not None:
self.logger.removeHandler(self.stream_handler)
self.stream_handler = stream_handler
self.logger.addHandler(stream_handler)
[docs] def set_level(self, level):
self.logger.setLevel(level)
[docs] def location(self, msg):
callerframerecord = inspect.stack()[1]
frame = callerframerecord[0]
info = inspect.getframeinfo(frame)
self.debug("{}: {info.filename}, {info.function}, {info.lineno}".format(msg, info=info))
[docs] def yellow(self, msg):
self.handler(dict(level="info", msg=msg))
[docs] def info(self, msg):
self.handler(dict(level="info", msg=msg))
[docs] def warning(self, msg):
self.handler(dict(level="warning", msg=msg))
[docs] def debug(self, msg):
self.handler(dict(level="debug", msg=msg))
[docs] def error(self, msg):
self.handler(dict(level="error", msg=msg))
[docs] def exit(self, msg, return_code=1):
self.handler(dict(level="error", msg=msg))
sys.exit(return_code)
[docs] def progress(self, done=None, total=None):
self.handler(dict(level="progress", done=done, total=total))
[docs] def shellcmd(self, msg):
if msg is not None:
msg = dict(level="shellcmd", msg=msg)
self.handler(msg)
[docs] def text_handler(self, msg):
"""The default log handler prints the output to the console.
Args:
msg (dict): the log message dictionary
"""
level = msg["level"]
if level == "info" and not self.quiet:
self.logger.info(msg["msg"])
if level == "warning":
self.logger.warning(msg["msg"])
elif level == "error":
self.logger.error(msg["msg"])
elif level == "debug":
self.logger.debug(msg["msg"])
elif level == "progress" and not self.quiet:
done = msg["done"]
total = msg["total"]
p = done / total
percent_fmt = ("{:.2%}" if p < 0.01 else "{:.0%}").format(p)
self.logger.info("{} of {} steps ({}) done".format(done, total, percent_fmt))
elif level == "shellcmd":
if self.printshellcmds:
self.logger.warning(msg["msg"])
logger = Logger()
[docs]def setup_logger(
quiet=False,
printshellcmds=False,
nocolor=False,
stdout=False,
debug=False,
use_threads=False,
wms_monitor=None,
):
# console output only if no custom logger was specified
stream_handler = ColorizingStreamHandler(
nocolor=nocolor,
stream=sys.stdout if stdout else sys.stderr,
use_threads=use_threads,
)
logger.set_stream_handler(stream_handler)
logger.set_level(_logging.DEBUG if debug else _logging.INFO)
logger.quiet = quiet
logger.printshellcmds = printshellcmds