Source code for license_updater.utils.fileio

__author__ = "Vanessa Sochat"
__copyright__ = "Copyright 2022-2023, Vanessa Sochat"
__license__ = "MPL 2.0"

import errno
import json
import os
import re
import shutil
import stat
import tempfile

from license_updater.logger import logger


[docs]def mkdirp(dirnames): """ Create one or more directories """ for dirname in dirnames: mkdir_p(dirname)
[docs]def mkdir_p(path): """ Make a directory path if it does not exist, akin to mkdir -p """ try: os.makedirs(path) except OSError as e: if e.errno == errno.EEXIST and os.path.isdir(path): pass else: logger.exit("Error creating path %s, exiting." % path)
[docs]def get_tmpfile(tmpdir=None, prefix=""): """ Get a temporary file with an optional prefix. """ # First priority for the base goes to the user requested. tmpdir = get_tmpdir(tmpdir) # If tmpdir is set, add to prefix if tmpdir: prefix = os.path.join(tmpdir, os.path.basename(prefix)) fd, tmp_file = tempfile.mkstemp(prefix=prefix) os.close(fd) return tmp_file
[docs]def get_tmpdir(tmpdir=None, prefix="", create=True): """ Get a temporary directory for an operation. """ tmpdir = tmpdir or tempfile.gettempdir() prefix = prefix or "license-updater-tmp" prefix = "%s.%s" % (prefix, next(tempfile._get_candidate_names())) tmpdir = os.path.join(tmpdir, prefix) if not os.path.exists(tmpdir) and create is True: os.mkdir(tmpdir) return tmpdir
[docs]def recursive_find(base, pattern=None): """ Find filenames that match a particular pattern, and yield them. """ # We can identify modules by finding module.lua for root, _, files in os.walk(base): for file in files: fullpath = os.path.abspath(os.path.join(root, file)) if pattern and not re.search(pattern, fullpath): continue yield fullpath
[docs]def copyfile(source, destination, force=True): """ Copy a file from a source to its destination. """ # Case 1: It's already there, we aren't replacing it :) if source == destination and force is False: return destination # Case 2: It's already there, we ARE replacing it :) if os.path.exists(destination) and force is True: os.remove(destination) shutil.copyfile(source, destination) return destination
[docs]def write_file(content, filename, mode="w", exec=False): """ Write content to a filename """ with open(filename, mode) as filey: filey.writelines(content) if exec: st = os.stat(filename) # Execute / search permissions for the user and others os.chmod(filename, st.st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH) return filename
[docs]def write_json(json_obj, filename, mode="w"): """ Write json to a filename """ with open(filename, mode) as filey: filey.writelines(print_json(json_obj)) return filename
[docs]def read_file(filename, mode="r"): """ Read a file. """ with open(filename, mode) as filey: content = filey.read() return content
[docs]def read_json(filename): """ Read a json file to a dictionary. """ return json.loads(read_file(filename))