__author__ = "Vanessa Sochat"
__copyright__ = "Copyright 2022, Vanessa Sochat"
__license__ = "MPL 2.0"
import errno
import hashlib
import json
import os
import re
import shutil
import stat
import tempfile
from pipelib.logger import logger
def list_modules(path):
    """
    List python modules under a path.

    The tree below ``path`` is walked recursively and every file ending in
    ``.py`` whose name does not start with an underscore is collected.

    :param path: the directory to search.
    :returns: a list of file paths, each beginning with ``path`` as given.
    """
    module_files = []
    # os.walk yields each root already prefixed with path, so path must not
    # be joined a second time (doing so duplicated it for relative paths).
    for root, _, files in os.walk(path):
        module_files.extend(
            os.path.join(root, filename)
            for filename in files
            if filename.endswith(".py") and not filename.startswith("_")
        )
    return module_files
def creation_date(filename):
    """
    Return the creation time of a file, or its modification time.

    :param filename: the path to inspect with os.stat.
    :returns: ``st_birthtime`` when the stat result provides that attribute,
        otherwise ``st_mtime``.
    """
    info = os.stat(filename)
    # Not every stat result carries st_birthtime; use st_mtime in that case.
    return getattr(info, "st_birthtime", info.st_mtime)
def mkdirp(dirnames):
    """
    Create one or more directories

    :param dirnames: an iterable of paths, or a single path given as a
        str, bytes, or os.PathLike object.
    """
    # A lone path must be wrapped: iterating a string would otherwise try to
    # create one directory per character.
    if isinstance(dirnames, (str, bytes, os.PathLike)):
        dirnames = [dirnames]
    for dirname in dirnames:
        mkdir_p(dirname)
def mkdir_p(path):
    """Create a directory and any missing parents, like ``mkdir -p``.

    A directory that already exists is not an error. Any other failure is
    reported through ``logger.exit``.

    :param path: the path to create.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        # Only "already exists, and it is a directory" is tolerated.
        already_a_directory = err.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_directory:
            logger.exit("Error creating path %s, exiting." % path)
def get_tmpfile(tmpdir=None, prefix=""):
    """
    Get a temporary file with an optional prefix.

    The file is created inside the directory returned by ``get_tmpdir``,
    which is placed under ``tmpdir`` when one is given.

    :param tmpdir: optional base directory, forwarded to ``get_tmpdir``.
    :param prefix: optional file name prefix; only its basename is used.
    :returns: the path of the created (and closed) temporary file.
    """
    # First priority for the base goes to the user requested.
    tmpdir = get_tmpdir(tmpdir)
    # Hand the directory to mkstemp via dir= rather than embedding it in the
    # prefix: an embedded relative directory would be resolved against the
    # system temp dir instead of the directory that was just created.
    fd, tmp_file = tempfile.mkstemp(prefix=os.path.basename(prefix), dir=tmpdir)
    # Only the path is wanted; release the descriptor right away.
    os.close(fd)
    return tmp_file
def get_tmpdir(tmpdir=None, prefix="", create=True):
    """
    Build (and by default create) a uniquely named temporary directory.

    :param tmpdir: base directory; defaults to ``tempfile.gettempdir()``.
    :param prefix: leading part of the directory name; defaults to
        ``pipelib-tmp``.
    :param create: when True, create the directory if it does not exist.
    :returns: the path ``<base>/<prefix>.<random name>``.
    """
    base = tmpdir if tmpdir else tempfile.gettempdir()
    stem = prefix if prefix else "pipelib-tmp"
    # Reuse tempfile's own random name generator for the unique suffix.
    unique = next(tempfile._get_candidate_names())
    target = os.path.join(base, "%s.%s" % (stem, unique))
    if create is True and not os.path.exists(target):
        os.mkdir(target)
    return target
def recursive_find(base, pattern=None):
    """
    Walk a directory tree and yield the absolute path of each file found.

    :param base: the directory to walk.
    :param pattern: optional regular expression; when given, only paths for
        which ``re.search(pattern, path)`` matches are yielded.
    """
    for dirpath, _, filenames in os.walk(base):
        for name in filenames:
            candidate = os.path.abspath(os.path.join(dirpath, name))
            # The pattern is searched against the full path, not just the name.
            if not pattern or re.search(pattern, candidate):
                yield candidate
def get_file_hash(image_path, algorithm="sha256"):
    """
    Return the hex digest of a file's contents.

    :param image_path: path of the file to hash; it is read in binary mode.
    :param algorithm: a hash name accepted by ``hashlib.new`` (default
        ``sha256``).
    :returns: the hexadecimal digest string.

    An unknown algorithm is logged with ``logger.error`` and the guaranteed
    algorithm names are then passed to ``logger.exit``.
    """
    try:
        # hashlib.new is the by-name constructor; a plain attribute lookup
        # would also pick up non-hash members of the module.
        hasher = hashlib.new(algorithm)
    except (ValueError, TypeError):
        logger.error("%s is an invalid algorithm." % algorithm)
        logger.exit(" ".join(sorted(hashlib.algorithms_guaranteed)))
    with open(image_path, "rb") as f:
        # Read in fixed-size chunks so large files are never loaded whole.
        for chunk in iter(lambda: f.read(4096), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
def copyfile(source, destination, force=True):
    """
    Copy a file from a source to its destination.

    :param source: path of the file to copy.
    :param destination: path to copy it to.
    :param force: when True, an existing destination is removed before the
        copy is made.
    :returns: the destination path.
    """
    # Case 1: source and destination are the same path, nothing to copy.
    # Without this guard, force=True would delete the file and then fail to
    # copy it, losing the data.
    if os.path.abspath(source) == os.path.abspath(destination):
        return destination

    # Case 2: It's already there, we ARE replacing it :)
    if force is True and os.path.exists(destination):
        os.remove(destination)

    shutil.copyfile(source, destination)
    return destination
def write_file(filename, content, mode="w", exec=False):
    """
    Write content to a file and optionally mark it executable.

    :param filename: path of the file to write.
    :param content: passed to ``writelines``, so a string or an iterable of
        strings.
    :param mode: mode used to open the file (default ``w``).
    :param exec: when truthy, add execute permission for user, group and
        others on top of the file's current mode.
    :returns: the filename.
    """
    with open(filename, mode) as handle:
        handle.writelines(content)

    if not exec:
        return filename

    # Add the execute bits without disturbing the existing permissions.
    execute_bits = stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH
    current_mode = os.stat(filename).st_mode
    os.chmod(filename, current_mode | execute_bits)
    return filename
def write_json(json_obj, filename, mode="w", print_pretty=True):
    """Serialize an object as JSON and write it to a file.

    :param json_obj: the object to serialize.
    :param filename: path of the file to write.
    :param mode: mode used to open the file (default ``w``).
    :param print_pretty: when truthy, format the output with ``print_json``;
        otherwise use the default ``json.dumps`` output.
    :returns: the filename.
    """
    with open(filename, mode) as handle:
        # Serialize after opening, so the order of effects matches a plain
        # open-then-dump sequence.
        serialized = print_json(json_obj) if print_pretty else json.dumps(json_obj)
        handle.writelines(serialized)
    return filename
def print_json(json_obj):
    """Return an object as a pretty JSON string (nothing is printed).

    :param json_obj: the object to serialize.
    :returns: JSON text indented by four spaces.
    """
    formatting = {"indent": 4, "separators": (",", ": ")}
    return json.dumps(json_obj, **formatting)
def read_file(filename, mode="r"):
    """Return the entire contents of a file.

    :param filename: path of the file to read.
    :param mode: mode used to open the file (default ``r``).
    :returns: whatever ``read()`` yields for that mode.
    """
    with open(filename, mode) as handle:
        return handle.read()
def read_json(filename, mode="r"):
    """Read a json file and return the parsed content.

    :param filename: path of the JSON file.
    :param mode: mode handed to ``read_file`` when opening the file
        (default ``r``).
    :returns: the result of ``json.loads`` on the file content.
    """
    # Forward mode so the parameter actually takes effect.
    return json.loads(read_file(filename, mode))