Module cvtool.core.io

Functions for input-output operations within the cvtools library

Expand source code
'''
Functions for input-output operations within the cvtools library

'''

import os
import json
import shutil
import tempfile
import inspect
import glob
import tempfile
import atexit
import shutil
import time
from typing import Union, Dict, Any, OrderedDict

def exists(path: str, error: bool = True) -> Union[str, bool]:
    """
    Validate that a filesystem path exists.

    An empty (falsy) path is not checked at all and is handed back unchanged.

    Args:
        path (str): The path to check.
        error (bool): When True a missing path raises; when False a missing
            path yields False instead.

    Returns:
        Union[str, bool]: ``path`` itself when it exists (or is empty),
        False when it is missing and ``error`` is False.

    Raises:
        FileNotFoundError: If the path is missing and ``error`` is True.
    """
    is_missing = bool(path) and not os.path.exists(path)
    if not is_missing:
        return path
    if not error:
        return False
    raise FileNotFoundError(f"The path '{path}' does not exist.")

def combine(new: dict, old: dict) -> dict:
    """
    Recursively overlay ``new`` on top of ``old``.

    Where both sides hold a dict under the same key the two are combined
    recursively; in every other case the value from ``new`` wins.

    Args:
        new (dict): Dictionary whose values take precedence.
        old (dict): Base dictionary; it is copied, not modified.

    Returns:
        dict: The merged dictionary.
    """
    result = old.copy()
    for name in new:
        incoming = new[name]
        existing = result.get(name)
        both_dicts = isinstance(existing, dict) and isinstance(incoming, dict)
        result[name] = combine(incoming, existing) if both_dicts else incoming
    return result

def json_write(jsndata: dict, filename: str, mode: str = 'w', sort: bool = False) -> None:
    """
    Serialize data as indented JSON into a file.

    A ``.json`` extension is appended to ``filename`` when it is missing.

    Args:
        jsndata (dict): The data to serialize.
        filename (str): Target file name, with or without ``.json``.
        mode (str): Mode passed to ``open`` (default 'w').
        sort (bool): Whether keys are sorted in the output.

    Raises:
        IOError: If the file cannot be opened or written.
    """
    target = filename if filename.endswith('.json') else filename + '.json'
    with open(target, mode) as handle:
        json.dump(jsndata, handle, indent=4, sort_keys=sort)

def json_read(file_path: str, mode: str = 'r') -> dict:
    """
    Load and parse a JSON file.

    Args:
        file_path (str): Path to the JSON file.
        mode (str): Mode passed to ``open`` (default 'r').

    Returns:
        dict: The parsed content, or an empty dictionary when the file
        does not exist.
    """
    try:
        with open(file_path, mode) as handle:
            parsed = json.load(handle)
    except FileNotFoundError:
        # A missing file is treated as "no data" rather than an error.
        parsed = {}
    return parsed

def get_current_function_name() -> str:
    """
    Report the name of the function that called this helper.

    Returns:
        str: The name of the calling function.
    """
    # Entry 0 of the stack is this helper itself; entry 1 is its caller.
    return inspect.stack()[1].function

def ensure_suffix(var: str, end: str) -> str:
    """
    Return ``var`` guaranteed to end with ``end``.

    Args:
        var (str): The input string.
        end (str): The suffix that must be present.

    Returns:
        str: ``var`` unchanged if it already ends with ``end``, otherwise
        ``var`` with ``end`` appended.
    """
    return var if var.endswith(end) else var + end

def relpath(path: str, relto: str = None) -> str:
    """
    Express ``path`` relative to a base directory.

    Args:
        path (str): The path to convert.
        relto (str): Base directory; when empty or None the current working
            directory is used.

    Returns:
        str: ``path`` relative to the base directory.
    """
    base = os.getcwd() if not relto else relto
    return os.path.relpath(os.path.abspath(path), base)

def mkdir(directory_path: str) -> None:
    """
    Create a single directory and report the outcome on stdout.

    No exception is propagated: an already-existing directory and any other
    failure are both reported with a printed message.

    Args:
        directory_path (str): The path of the directory to create.
    """
    try:
        os.mkdir(directory_path)
    except FileExistsError:
        outcome = f"Directory '{directory_path}' already exists."
    except Exception as e:
        outcome = f"An error occurred: {e}"
    else:
        outcome = f"Directory '{directory_path}' created successfully."
    print(outcome)

def rmdir(directory_path: str) -> None:
    """
    Delete a directory tree on a best-effort basis.

    A confirmation is printed on success. Any failure (for example a
    missing directory) is swallowed silently; nothing is raised.

    Args:
        directory_path (str): The path of the directory to remove.
    """
    try:
        shutil.rmtree(directory_path)
        print(f"Removed directory: {directory_path}")
    except Exception:
        # Deliberately quiet: callers use this as "remove if possible".
        return None

def write_temp(prefix: str, content: dict) -> None:
    """
    Dump ``content`` as indented JSON into a new temporary file.

    The file is created in the system temp directory with the given prefix
    and a ``.json`` suffix, and is left on disk after closing.

    Args:
        prefix (str): The prefix for the temporary file name.
        content (dict): The content to serialize.
    """
    temp_file = tempfile.NamedTemporaryFile(
        mode='w', prefix=prefix, suffix='.json', delete=False
    )
    with temp_file:
        json.dump(content, temp_file, indent=4)

def read_temp(prefix: str, index: int = 0) -> dict:
    """
    Read content from a temporary JSON file.

    Files are looked up in the system temp directory by name prefix. The
    prefix is matched literally, so glob metacharacters in it are escaped.

    Args:
        prefix (str): The prefix of the temporary file name.
        index (int): Position of the file to read within the list of matches
            (default is 0). The order is whatever ``glob`` reports.

    Returns:
        dict: The parsed content of the selected file, or None if no file
        matches the prefix or ``index`` is outside the list of matches.
    """
    pattern = os.path.join(tempfile.gettempdir(), glob.escape(prefix) + '*')
    existing = glob.glob(pattern)

    try:
        chosen = existing[index]
    except IndexError:
        # No match at all, or fewer matches than the requested index.
        return None

    with open(chosen, 'r') as file:
        return json.load(file)

def rm_temp(prefix: str, index: int = 0) -> bool:
    """
    Remove a temporary file with the specified prefix and index.

    Files are looked up in the system temp directory by name prefix. The
    prefix is matched literally, so glob metacharacters in it are escaped.

    Args:
        prefix (str): The prefix of the temporary file name.
        index (int): Position of the file to remove within the list of
            matches (default is 0). The order is whatever ``glob`` reports.

    Returns:
        bool: True if the selected file was removed; False if there is no
        match at that index or the removal failed (the error is printed).
    """
    pattern = os.path.join(tempfile.gettempdir(), glob.escape(prefix) + '*')
    existing = glob.glob(pattern)

    try:
        target = existing[index]
    except IndexError:
        return False

    try:
        os.remove(target)
    except OSError as e:
        print(f"Error: {e}")
        return False
    return True




def mk_tempdir():
    """
    Create a temporary directory that is cleaned up at interpreter exit.

    Returns:
        str: The path of the new directory, with a trailing '/'.
    """
    temp_dir = tempfile.mkdtemp()

    # Clean up on exit. ignore_errors keeps shutdown quiet when the caller
    # has already removed the directory.
    atexit.register(shutil.rmtree, temp_dir, ignore_errors=True)

    return temp_dir + '/'


def relative_to(absolte, current):
    """
    Express one path relative to another.

    Both arguments are made absolute first, so relative inputs are
    interpreted against the current working directory.

    Args:
        absolte (str): The path to convert (parameter name kept as-is for
            backward compatibility).
        current (str): The base path the result is relative to.

    Returns:
        str: ``absolte`` expressed relative to ``current``.
    """
    target = os.path.abspath(absolte)
    base = os.path.abspath(current)
    return os.path.relpath(target, base)


def terminal():
    """
    Report the size of the terminal window.

    Uses ``shutil.get_terminal_size``, which falls back to a default size
    instead of raising when output is not attached to a terminal (pipes,
    CI runs, captured output).

    Returns:
        os.terminal_size: Named tuple with ``columns`` and ``lines``.
    """
    return shutil.get_terminal_size()

def filter_dict(input_dict, keys_to_keep):
    """
    Build a new dictionary restricted to the requested keys.

    Requested keys that are absent from ``input_dict`` are skipped. The
    result follows the order of ``keys_to_keep``.

    Args:
        input_dict (dict): Input dictionary.
        keys_to_keep (list): Keys to carry over into the result.

    Returns:
        dict: Filtered dictionary containing only the specified keys.
    """
    kept = {}
    for name in keys_to_keep:
        if name in input_dict:
            kept[name] = input_dict[name]
    return kept

def sort_dict(d,reverse = False):
    """
    Recursively order a dictionary by key, descending into nested dicts.

    Values that are not dictionaries are returned untouched.

    Args:
        d (dict): Dictionary to be sorted.
        reverse (bool): Sort keys in descending order when True.

    Returns:
        dict: An OrderedDict with sorted keys, or ``d`` itself when it is
        not a dictionary.
    """
    if not isinstance(d, dict):
        return d
    ordered_keys = sorted(d, reverse=reverse)
    return OrderedDict((key, sort_dict(d[key], reverse)) for key in ordered_keys)

def _merge_lists_unique(first, second):
    """Concatenate two lists, keeping only the first occurrence of each item."""
    merged = []
    seen = set()
    for item in (*first, *second):
        try:
            if item in seen:
                continue
            seen.add(item)
        except TypeError:
            # Unhashable item (e.g. a dict or list): fall back to a linear scan.
            if item in merged:
                continue
        merged.append(item)
    return merged


def merge_dict(dict1, dict2, overwrite_keys=None):
    """
    Merge two dictionaries together.

    Nested dictionaries are merged recursively (with the same
    ``overwrite_keys``), lists are concatenated with duplicates removed, and
    any other value from ``dict2`` replaces the one from ``dict1``. Neither
    input dictionary nor the lists they contain are modified.

    Args:
        dict1 (dict): First dictionary to merge.
        dict2 (dict): Second dictionary to merge; its values win.
        overwrite_keys (list): Keys that may exist in both dictionaries. Any
            iterable of keys is accepted, a single string is treated as one
            key, and the string 'all' disables the conflict check entirely.

    Returns:
        dict: Merged dictionary.

    Raises:
        ValueError: If a key exists in both dictionaries (at the level being
            merged) and is not included in ``overwrite_keys``.
    """
    allow_all = isinstance(overwrite_keys, str) and overwrite_keys == 'all'
    if not allow_all:
        if overwrite_keys is None:
            allowed = set()
        elif isinstance(overwrite_keys, str):
            allowed = {overwrite_keys}
        else:
            # The documented type is a list; normalise so set arithmetic works.
            allowed = set(overwrite_keys)

        conflicting_keys = (set(dict1) & set(dict2)) - allowed
        if conflicting_keys:
            # str() keeps the message usable for non-string keys.
            names = ', '.join(sorted(map(str, conflicting_keys)))
            raise ValueError(f"Duplicate keys found: {names}. Use overwrite_keys list to allow overwriting or correct the error. ")

    merged_dict = dict(dict1)
    for key, value in dict2.items():
        current = merged_dict.get(key)
        if isinstance(current, dict) and isinstance(value, dict):
            merged_dict[key] = merge_dict(current, value, overwrite_keys)
        elif isinstance(current, list) and isinstance(value, list):
            # Build a new list so dict1's list is left untouched.
            merged_dict[key] = _merge_lists_unique(current, value)
        else:
            merged_dict[key] = value

    return merged_dict

def merge_entries(dict1, dict2, append=True):
    '''
    Merge the entries that dict1 and dict2 have in common, in place.

    Only keys present in both dictionaries are touched; keys that exist
    solely in dict2 are not added. dict1 is modified and returned.

    append = True  :: values are combined via merge_dict(..., 'all')
    append = False :: dict2's entry is laid over dict1's entry, replacing
                      values for matching inner keys

    '''
    shared = [key for key in dict1 if key in dict2]
    for key in shared:
        if append:
            dict1[key] = merge_dict(dict1[key], dict2[key], 'all')
        else:
            dict1[key] = {**dict1[key], **dict2[key]}
    return dict1
    



# def copy_files(src_dir, dest_dir, prefix=''):
#     # Walk through the source directory
#     for foldername, _, filenames in os.walk(src_dir):
#         # Create corresponding folder in the destination directory
#         dest_folder = dest_dir
#         mkdir(dest_folder)
        
#         # Copy files from the current folder to the destination folder
#         for filename in filenames:
#             # Add the specified prefix to individual files
#             new_filename = prefix + filename
#             src_file = os.path.join(foldername, filename)
#             dest_file = os.path.join(dest_folder, new_filename)
#             shutil.copy2(src_file, dest_file)  # Use shutil.copy2 to preserve metadata like timestamps


def copy_files(src_dir, dest_dir, prefix=''):
    """
    Recursively copy a directory tree, prefixing the copied file names.

    Directory names are kept as they are. A file receives ``prefix`` unless
    its name already starts with it, so repeated runs do not stack prefixes.
    Files are copied with ``shutil.copy2``.

    Args:
        src_dir (str): Directory to copy from.
        dest_dir (str): Directory to copy into; created if missing.
        prefix (str): Text prepended to each copied file name (default '').
    """
    os.makedirs(dest_dir, exist_ok=True)

    def copy_recursive(src, dest):
        for item in os.listdir(src):
            src_item = os.path.join(src, item)

            if os.path.isdir(src_item):
                # Sub-directories keep their original (unprefixed) name.
                sub_dest = os.path.join(dest, item)
                os.makedirs(sub_dest, exist_ok=True)
                copy_recursive(src_item, sub_dest)
            else:
                # startswith, not a substring test: a name that merely
                # contains the prefix somewhere still needs it prepended.
                new_name = item if item.startswith(prefix) else prefix + item
                shutil.copy2(src_item, os.path.join(dest, new_name))

    copy_recursive(src_dir, dest_dir)


def rm_older(directory_path, minutes=5):
    """
    Delete files under a directory tree that have not been modified recently.

    Only files are removed; directories are left in place. A file that
    cannot be inspected or removed (for example a dangling symlink, or a
    file deleted by another process meanwhile) is reported on stdout and
    skipped, so one bad entry does not abort the sweep.

    Args:
        directory_path (str): Root of the tree to clean.
        minutes (int | float): Age threshold in minutes (default 5).
    """
    cutoff = time.time() - minutes * 60  # threshold as a Unix timestamp

    for foldername, _subfolders, filenames in os.walk(directory_path):
        for filename in filenames:
            file_path = os.path.join(foldername, filename)
            try:
                # getmtime is inside the try: it can fail just like remove.
                if os.path.getmtime(file_path) < cutoff:
                    os.remove(file_path)
            except OSError as e:
                print(f"Error occurred while removing file {file_path}: {e}")

Functions

def combine(new: dict, old: dict) ‑> dict

Combine two dictionaries.

Args

new : dict
The new dictionary to merge.
old : dict
The old dictionary to merge into.

Returns

dict
The merged dictionary.
Expand source code
def combine(new: dict, old: dict) -> dict:
    """
    Recursively overlay ``new`` on top of ``old``.

    Where both sides hold a dict under the same key the two are combined
    recursively; in every other case the value from ``new`` wins.

    Args:
        new (dict): Dictionary whose values take precedence.
        old (dict): Base dictionary; it is copied, not modified.

    Returns:
        dict: The merged dictionary.
    """
    result = old.copy()
    for name in new:
        incoming = new[name]
        existing = result.get(name)
        both_dicts = isinstance(existing, dict) and isinstance(incoming, dict)
        result[name] = combine(incoming, existing) if both_dicts else incoming
    return result
def copy_files(src_dir, dest_dir, prefix='')
Expand source code
def copy_files(src_dir, dest_dir, prefix=''):
    """
    Recursively copy a directory tree, prefixing the copied file names.

    Directory names are kept as they are. A file receives ``prefix`` unless
    its name already starts with it, so repeated runs do not stack prefixes.
    Files are copied with ``shutil.copy2``.

    Args:
        src_dir (str): Directory to copy from.
        dest_dir (str): Directory to copy into; created if missing.
        prefix (str): Text prepended to each copied file name (default '').
    """
    os.makedirs(dest_dir, exist_ok=True)

    def copy_recursive(src, dest):
        for item in os.listdir(src):
            src_item = os.path.join(src, item)

            if os.path.isdir(src_item):
                # Sub-directories keep their original (unprefixed) name.
                sub_dest = os.path.join(dest, item)
                os.makedirs(sub_dest, exist_ok=True)
                copy_recursive(src_item, sub_dest)
            else:
                # startswith, not a substring test: a name that merely
                # contains the prefix somewhere still needs it prepended.
                new_name = item if item.startswith(prefix) else prefix + item
                shutil.copy2(src_item, os.path.join(dest, new_name))

    copy_recursive(src_dir, dest_dir)
def ensure_suffix(var: str, end: str) ‑> str

Ensure that a string ends with a specified suffix.

Args

var : str
The input string.
end : str
The suffix to ensure.

Returns

str
The modified string with the ensured suffix.
Expand source code
def ensure_suffix(var: str, end: str) -> str:
    """
    Return ``var`` guaranteed to end with ``end``.

    Args:
        var (str): The input string.
        end (str): The suffix that must be present.

    Returns:
        str: ``var`` unchanged if it already ends with ``end``, otherwise
        ``var`` with ``end`` appended.
    """
    return var if var.endswith(end) else var + end
def exists(path: str, error: bool = True) ‑> Union[str, bool]

Check if a given path exists.

Args

path : str
The path to check.
error : bool
Flag indicating whether to raise an error if the path does not exist.

Returns

Union[str, bool]
The path if it exists, False otherwise.

Raises

FileNotFoundError
If the path does not exist and error is set to True.
Expand source code
def exists(path: str, error: bool = True) -> Union[str, bool]:
    """
    Validate that a filesystem path exists.

    An empty (falsy) path is not checked at all and is handed back unchanged.

    Args:
        path (str): The path to check.
        error (bool): When True a missing path raises; when False a missing
            path yields False instead.

    Returns:
        Union[str, bool]: ``path`` itself when it exists (or is empty),
        False when it is missing and ``error`` is False.

    Raises:
        FileNotFoundError: If the path is missing and ``error`` is True.
    """
    is_missing = bool(path) and not os.path.exists(path)
    if not is_missing:
        return path
    if not error:
        return False
    raise FileNotFoundError(f"The path '{path}' does not exist.")
def filter_dict(input_dict, keys_to_keep)

Filters a dictionary to contain only key-value pairs with keys present in keys_to_keep list.

Args

input_dict : dict
Input dictionary.
keys_to_keep : list
List of keys to keep in the filtered dictionary.

Returns

dict
Filtered dictionary containing only specified keys.
Expand source code
def filter_dict(input_dict, keys_to_keep):
    """
    Build a new dictionary restricted to the requested keys.

    Requested keys that are absent from ``input_dict`` are skipped. The
    result follows the order of ``keys_to_keep``.

    Args:
        input_dict (dict): Input dictionary.
        keys_to_keep (list): Keys to carry over into the result.

    Returns:
        dict: Filtered dictionary containing only the specified keys.
    """
    kept = {}
    for name in keys_to_keep:
        if name in input_dict:
            kept[name] = input_dict[name]
    return kept
def get_current_function_name() ‑> str

Get the name of the calling function.

Returns

str
The name of the calling function.
Expand source code
def get_current_function_name() -> str:
    """
    Report the name of the function that called this helper.

    Returns:
        str: The name of the calling function.
    """
    # Entry 0 of the stack is this helper itself; entry 1 is its caller.
    return inspect.stack()[1].function
def json_read(file_path: str, mode: str = 'r') ‑> dict

Reads the contents of a JSON file.

Args

file_path : str
Path to the JSON file.
mode : str
The file mode (default is 'r' for read).

Returns

dict
Contents of the file as a dictionary, or an empty dictionary if the file is not found.
Expand source code
def json_read(file_path: str, mode: str = 'r') -> dict:
    """
    Load and parse a JSON file.

    Args:
        file_path (str): Path to the JSON file.
        mode (str): Mode passed to ``open`` (default 'r').

    Returns:
        dict: The parsed content, or an empty dictionary when the file
        does not exist.
    """
    try:
        with open(file_path, mode) as handle:
            parsed = json.load(handle)
    except FileNotFoundError:
        # A missing file is treated as "no data" rather than an error.
        parsed = {}
    return parsed
def json_write(jsndata: dict, filename: str, mode: str = 'w', sort: bool = False) ‑> None

Write JSON data to a file.

Args

jsndata : dict
The JSON data to write.
filename : str
The name of the file to write to.
mode : str
The file mode (default is 'w' for write).
sort : bool
Whether to sort the keys in the JSON output.

Raises

IOError
If there is an error writing the JSON data to the file.
Expand source code
def json_write(jsndata: dict, filename: str, mode: str = 'w', sort: bool = False) -> None:
    """
    Serialize data as indented JSON into a file.

    A ``.json`` extension is appended to ``filename`` when it is missing.

    Args:
        jsndata (dict): The data to serialize.
        filename (str): Target file name, with or without ``.json``.
        mode (str): Mode passed to ``open`` (default 'w').
        sort (bool): Whether keys are sorted in the output.

    Raises:
        IOError: If the file cannot be opened or written.
    """
    target = filename if filename.endswith('.json') else filename + '.json'
    with open(target, mode) as handle:
        json.dump(jsndata, handle, indent=4, sort_keys=sort)
def merge_dict(dict1, dict2, overwrite_keys=None)

Merge two dictionaries together.

Args

dict1 : dict
First dictionary to merge.
dict2 : dict
Second dictionary to merge.
overwrite_keys : list
List of keys that are allowed to be overwritten if they exist in both dictionaries.

Note: where overwriting is allowed, values from dict2 replace the corresponding values from dict1.

Returns

dict
Merged dictionary.

Raises

ValueError
If any identical first-level keys are found and not included in overwrite_keys.
Expand source code
def _merge_lists_unique(first, second):
    """Concatenate two lists, keeping only the first occurrence of each item."""
    merged = []
    seen = set()
    for item in (*first, *second):
        try:
            if item in seen:
                continue
            seen.add(item)
        except TypeError:
            # Unhashable item (e.g. a dict or list): fall back to a linear scan.
            if item in merged:
                continue
        merged.append(item)
    return merged


def merge_dict(dict1, dict2, overwrite_keys=None):
    """
    Merge two dictionaries together.

    Nested dictionaries are merged recursively (with the same
    ``overwrite_keys``), lists are concatenated with duplicates removed, and
    any other value from ``dict2`` replaces the one from ``dict1``. Neither
    input dictionary nor the lists they contain are modified.

    Args:
        dict1 (dict): First dictionary to merge.
        dict2 (dict): Second dictionary to merge; its values win.
        overwrite_keys (list): Keys that may exist in both dictionaries. Any
            iterable of keys is accepted, a single string is treated as one
            key, and the string 'all' disables the conflict check entirely.

    Returns:
        dict: Merged dictionary.

    Raises:
        ValueError: If a key exists in both dictionaries (at the level being
            merged) and is not included in ``overwrite_keys``.
    """
    allow_all = isinstance(overwrite_keys, str) and overwrite_keys == 'all'
    if not allow_all:
        if overwrite_keys is None:
            allowed = set()
        elif isinstance(overwrite_keys, str):
            allowed = {overwrite_keys}
        else:
            # The documented type is a list; normalise so set arithmetic works.
            allowed = set(overwrite_keys)

        conflicting_keys = (set(dict1) & set(dict2)) - allowed
        if conflicting_keys:
            # str() keeps the message usable for non-string keys.
            names = ', '.join(sorted(map(str, conflicting_keys)))
            raise ValueError(f"Duplicate keys found: {names}. Use overwrite_keys list to allow overwriting or correct the error. ")

    merged_dict = dict(dict1)
    for key, value in dict2.items():
        current = merged_dict.get(key)
        if isinstance(current, dict) and isinstance(value, dict):
            merged_dict[key] = merge_dict(current, value, overwrite_keys)
        elif isinstance(current, list) and isinstance(value, list):
            # Build a new list so dict1's list is left untouched.
            merged_dict[key] = _merge_lists_unique(current, value)
        else:
            merged_dict[key] = value

    return merged_dict
def merge_entries(dict1, dict2, append=True)

When we are merging multiple items

append=True combines the values of matching entries; append=False replaces them.

Expand source code
def merge_entries(dict1, dict2, append=True):
    '''
    Merge the entries that dict1 and dict2 have in common, in place.

    Only keys present in both dictionaries are touched; keys that exist
    solely in dict2 are not added. dict1 is modified and returned.

    append = True  :: values are combined via merge_dict(..., 'all')
    append = False :: dict2's entry is laid over dict1's entry, replacing
                      values for matching inner keys

    '''
    shared = [key for key in dict1 if key in dict2]
    for key in shared:
        if append:
            dict1[key] = merge_dict(dict1[key], dict2[key], 'all')
        else:
            dict1[key] = {**dict1[key], **dict2[key]}
    return dict1
def mk_tempdir()
Expand source code
def mk_tempdir():
    """
    Create a temporary directory that is cleaned up at interpreter exit.

    Returns:
        str: The path of the new directory, with a trailing '/'.
    """
    temp_dir = tempfile.mkdtemp()

    # Clean up on exit. ignore_errors keeps shutdown quiet when the caller
    # has already removed the directory.
    atexit.register(shutil.rmtree, temp_dir, ignore_errors=True)

    return temp_dir + '/'
def mkdir(directory_path: str) ‑> None

Create a directory.

Args

directory_path : str
The path of the directory to create.

Raises

FileExistsError
Caught internally when the directory already exists; a message is printed instead of raising.
Exception
Caught internally for any other failure; the error is printed instead of raising.
Expand source code
def mkdir(directory_path: str) -> None:
    """
    Create a single directory and report the outcome on stdout.

    No exception is propagated: an already-existing directory and any other
    failure are both reported with a printed message.

    Args:
        directory_path (str): The path of the directory to create.
    """
    try:
        os.mkdir(directory_path)
    except FileExistsError:
        outcome = f"Directory '{directory_path}' already exists."
    except Exception as e:
        outcome = f"An error occurred: {e}"
    else:
        outcome = f"Directory '{directory_path}' created successfully."
    print(outcome)
def read_temp(prefix: str, index: int = 0) ‑> dict

Read content from a temporary JSON file.

Args

prefix : str
The prefix of the temporary file name.
index : int
The index of the temporary file to read from (default is 0).

Returns

dict
The content read from the temporary file as a dictionary, or None if the file does not exist.
Expand source code
def read_temp(prefix: str, index: int = 0) -> dict:
    """
    Read content from a temporary JSON file.

    Files are looked up in the system temp directory by name prefix. The
    prefix is matched literally, so glob metacharacters in it are escaped.

    Args:
        prefix (str): The prefix of the temporary file name.
        index (int): Position of the file to read within the list of matches
            (default is 0). The order is whatever ``glob`` reports.

    Returns:
        dict: The parsed content of the selected file, or None if no file
        matches the prefix or ``index`` is outside the list of matches.
    """
    pattern = os.path.join(tempfile.gettempdir(), glob.escape(prefix) + '*')
    existing = glob.glob(pattern)

    try:
        chosen = existing[index]
    except IndexError:
        # No match at all, or fewer matches than the requested index.
        return None

    with open(chosen, 'r') as file:
        return json.load(file)
def relative_to(absolte, current)
Expand source code
def relative_to(absolte, current):
    """
    Express one path relative to another.

    Both arguments are made absolute first, so relative inputs are
    interpreted against the current working directory.

    Args:
        absolte (str): The path to convert (parameter name kept as-is for
            backward compatibility).
        current (str): The base path the result is relative to.

    Returns:
        str: ``absolte`` expressed relative to ``current``.
    """
    target = os.path.abspath(absolte)
    base = os.path.abspath(current)
    return os.path.relpath(target, base)
def relpath(path: str, relto: str = None) ‑> str

Get the relative path of a given path.

Args

path : str
The path to get the relative path for.
relto : str
The base path to make the path relative to (default is current working directory).

Returns

str
The relative path of the input path.
Expand source code
def relpath(path: str, relto: str = None) -> str:
    """
    Express ``path`` relative to a base directory.

    Args:
        path (str): The path to convert.
        relto (str): Base directory; when empty or None the current working
            directory is used.

    Returns:
        str: ``path`` relative to the base directory.
    """
    base = os.getcwd() if not relto else relto
    return os.path.relpath(os.path.abspath(path), base)
def rm_older(directory_path, minutes=5)
Expand source code
def rm_older(directory_path, minutes=5):
    """
    Delete files under a directory tree that have not been modified recently.

    Only files are removed; directories are left in place. A file that
    cannot be inspected or removed (for example a dangling symlink, or a
    file deleted by another process meanwhile) is reported on stdout and
    skipped, so one bad entry does not abort the sweep.

    Args:
        directory_path (str): Root of the tree to clean.
        minutes (int | float): Age threshold in minutes (default 5).
    """
    cutoff = time.time() - minutes * 60  # threshold as a Unix timestamp

    for foldername, _subfolders, filenames in os.walk(directory_path):
        for filename in filenames:
            file_path = os.path.join(foldername, filename)
            try:
                # getmtime is inside the try: it can fail just like remove.
                if os.path.getmtime(file_path) < cutoff:
                    os.remove(file_path)
            except OSError as e:
                print(f"Error occurred while removing file {file_path}: {e}")
def rm_temp(prefix: str, index: int = 0) ‑> bool

Remove a temporary file with the specified prefix and index.

Args

prefix : str
The prefix of the temporary file name.
index : int
The index of the temporary file to remove (default is 0).

Returns

bool
True if the file was successfully removed, False otherwise.
Expand source code
def rm_temp(prefix: str, index: int = 0) -> bool:
    """
    Remove a temporary file with the specified prefix and index.

    Files are looked up in the system temp directory by name prefix. The
    prefix is matched literally, so glob metacharacters in it are escaped.

    Args:
        prefix (str): The prefix of the temporary file name.
        index (int): Position of the file to remove within the list of
            matches (default is 0). The order is whatever ``glob`` reports.

    Returns:
        bool: True if the selected file was removed; False if there is no
        match at that index or the removal failed (the error is printed).
    """
    pattern = os.path.join(tempfile.gettempdir(), glob.escape(prefix) + '*')
    existing = glob.glob(pattern)

    try:
        target = existing[index]
    except IndexError:
        return False

    try:
        os.remove(target)
    except OSError as e:
        print(f"Error: {e}")
        return False
    return True
def rmdir(directory_path: str) ‑> None

Remove a directory and its contents.

Args

directory_path : str
The path of the directory to remove.

Raises

Exception
Caught internally: any error while deleting the directory is silently ignored rather than raised.
Expand source code
def rmdir(directory_path: str) -> None:
    """
    Delete a directory tree on a best-effort basis.

    A confirmation is printed on success. Any failure (for example a
    missing directory) is swallowed silently; nothing is raised.

    Args:
        directory_path (str): The path of the directory to remove.
    """
    try:
        shutil.rmtree(directory_path)
        print(f"Removed directory: {directory_path}")
    except Exception:
        # Deliberately quiet: callers use this as "remove if possible".
        return None
def sort_dict(d, reverse=False)

Recursively sort a dictionary by its keys, including nested dictionaries.

Args

d : dict
Dictionary to be sorted.

Returns

dict
Sorted dictionary.
Expand source code
def sort_dict(d,reverse = False):
    """
    Recursively order a dictionary by key, descending into nested dicts.

    Values that are not dictionaries are returned untouched.

    Args:
        d (dict): Dictionary to be sorted.
        reverse (bool): Sort keys in descending order when True.

    Returns:
        dict: An OrderedDict with sorted keys, or ``d`` itself when it is
        not a dictionary.
    """
    if not isinstance(d, dict):
        return d
    ordered_keys = sorted(d, reverse=reverse)
    return OrderedDict((key, sort_dict(d[key], reverse)) for key in ordered_keys)
def terminal()
Expand source code
def terminal():
    """
    Report the size of the terminal window.

    Uses ``shutil.get_terminal_size``, which falls back to a default size
    instead of raising when output is not attached to a terminal (pipes,
    CI runs, captured output).

    Returns:
        os.terminal_size: Named tuple with ``columns`` and ``lines``.
    """
    return shutil.get_terminal_size()
def write_temp(prefix: str, content: dict) ‑> None

Write content to a temporary JSON file.

Args

prefix : str
The prefix for the temporary file name.
content : dict
The content to write to the temporary file.
Expand source code
def write_temp(prefix: str, content: dict) -> None:
    """
    Dump ``content`` as indented JSON into a new temporary file.

    The file is created in the system temp directory with the given prefix
    and a ``.json`` suffix, and is left on disk after closing.

    Args:
        prefix (str): The prefix for the temporary file name.
        content (dict): The content to serialize.
    """
    temp_file = tempfile.NamedTemporaryFile(
        mode='w', prefix=prefix, suffix='.json', delete=False
    )
    with temp_file:
        json.dump(content, temp_file, indent=4)