Module cvtool.core.version_control
This file contains all the git, github and versioning functions for the cvtools library.
Expand source code
'''
This file contains all the git, github and versioning functions for the cvtools library.
'''
import requests
import subprocess
import os,re,glob
from typing import Dict
from .custom_errors import GitAPIError
from .io import read_temp, write_temp, exists, rm_temp
from git import Repo
from datetime import datetime
def last_commit(repo_owner: str, repo_name: str) -> Dict[str, str]:
"""
Retrieve information about the latest commit of a GitHub repository.
Args:
repo_owner (str): Owner of the repository.
repo_name (str): Repository name.
Returns:
dict: Dictionary containing details of the latest commit.
"""
api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/commits"
today_date = datetime.today().strftime("%Y%m%d")
envname = f'cvtool.{repo_owner}.{repo_name}.{today_date}'
commit_info = read_temp(envname)
if commit_info:
print(commit_info.get('api_url') , api_url)
if commit_info.get('api_url') == api_url:
print('Loading saved repo metadata from git.')
return commit_info
else:
response = requests.get(api_url)
if response.status_code == 200:
commits_data = response.json()
if commits_data:
latest_commit = commits_data[0]
commit_info = {
"api_url": api_url,
"SHA": latest_commit["sha"],
"Message": latest_commit["commit"]["message"],
"Author": f"{latest_commit['commit']['author']['name']} <{latest_commit['commit']['author']['email']}>",
"Committer": f"{latest_commit['commit']['committer']['name']} <{latest_commit['commit']['committer']['email']}>",
"Date": latest_commit["commit"]["author"]["date"]
}
write_temp(envname, commit_info)
return commit_info
else:
raise GitAPIError(f"No commits found in the repository: {api_url}")
else:
raise GitAPIError(f"Failed to fetch commit information from the GitHub API: {api_url}")
def clear_last(repo_owner: str, repo_name: str) -> Dict[str, str]:
today_date = datetime.today().strftime("%Y%m%d")
envname = f'cvtool.{repo_owner}.{repo_name}.{today_date}'
rm_temp(envname)
def git_user() -> Dict[str, str]:
"""
Retrieve the Git username and email.
Returns:
dict: Dictionary containing 'user' and 'email' keys with corresponding values.
"""
try:
return get_user()
except subprocess.CalledProcessError:
return get_user(shell=True)
def get_github_version(repo_owner: str = '', repo_name: str = '') -> str:
"""
Get the latest release version from GitHub API.
Args:
repo_owner (str): Owner of the repository.
repo_name (str): Repository name.
Returns:
str: Latest release version or '-0.0.0' if not available.
"""
if not repo_owner and not repo_name:
if 'cmor_github_owner' in os.environ and 'cmor_github_repo' in os.environ:
owner = os.environ['cmor_github_owner']
repo = os.environ['cmor_github_repo']
else:
command = ["git", "config", "--get", "remote.origin.url"]
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True)
output, _ = process.communicate()
if process.returncode == 0:
remote_url = output.strip()
owner, repo = remote_url.split("/")[-2:]
os.environ['cmor_github_owner'] = owner
os.environ['cmor_github_repo'] = repo
else:
raise RuntimeError("Failed to get repository information.")
try:
today_date = datetime.today().strftime("%Y%m%d")
envname = f'cvtool.{repo_owner}.{repo_name}.{today_date}'
json_data = read_temp(envname)
if not json_data or "tag_name" not in json_data:
api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/releases/latest"
response = requests.get(api_url)
response.raise_for_status()
# GitAPIError(f"Failed to fetch commit information from the GitHub API: {api_url}")
json_data = response.json()
write_temp(envname, json_data)
version = json_data.get("tag_name")
return version if version else "-0.0.0"
except (requests.RequestException, ValueError, IndexError) as e:
print(f"Error: {str(e)}")
return "-0.0.0"
def get_commit_hashes(owner: str, repo: str, latest: bool = True) -> str:
"""
Get the latest or previous commit hash from GitHub API.
Args:
owner (str): Owner of the repository.
repo (str): Repository name.
latest (bool): True to get the latest commit hash, False to get the previous commit hash.
Returns:
str: Commit hash or None if failed to retrieve.
"""
if 'cmor_github_hashes' in os.environ:
hashes = os.environ['cmor_github_hashes'].split('~')
return hashes[1 if latest else 0]
else:
try:
api_url = f"https://api.github.com/repos/{owner}/{repo}/commits"
response = requests.get(api_url)
response.raise_for_status()
json_data = response.json()
latest_commit = json_data[0]["sha"]
previous_commit = json_data[1]["sha"]
os.environ['cmor_github_hashes'] = f"{previous_commit}~{latest_commit}"
return latest_commit if latest else previous_commit
except (requests.RequestException, ValueError, IndexError) as e:
print(f"Error: {str(e)}")
return None
def repo_commits(repo):
"""
Print information about commits in a Git repository.
Args:
repo: Git repository object.
"""
for commit in repo.iter_commits():
print("Commit SHA:", commit.hexsha)
print("Author:", commit.author.name, "<" + commit.author.email + ">")
print("Committer:", commit.committer.name, "<" + commit.committer.email + ">")
print("Date:", commit.authored_datetime)
print("Message:", commit.message)
print("-" * 50)
def query_repo(repo_path,verbose = True):
exists(repo_path)
repo = Repo(repo_path)
# Get the current commit SHA
current_commit = repo.head.object.hexsha
# Get the latest tag version
tags = repo.tags
latest_tag = tags[-1]
# Find the closest tag released prior to the target commit
closest_previous_tag = None
closest_distance = float('inf')
for tag in repo.tags:
tag_commit = repo.commit(tag.commit)
distance = len(list(repo.iter_commits(rev=f'{tag_commit}..{current_commit}')))
if distance < closest_distance:
closest_distance = distance
closest_previous_tag = tag
# Get the current commit SHA
current_commit = repo.head.object.hexsha
# Check if the current commit is the latest commit
is_latest_commit = current_commit == repo.heads[repo.active_branch.name].commit.hexsha
# Get the repository URL
repo_url = repo.remotes.origin.url
# Check if there is a Zenodo DOI attached to the repository (assuming it's in the README file)
zenodo_doi = None
readme_path = os.path.join(repo_path, 'README.md')
if os.path.exists(readme_path):
with open(readme_path, 'r', encoding='utf-8') as readme_file:
readme_content = readme_file.read()
match = re.search(r'zenodo\.org/record/(\d+)', readme_content)
if match:
zenodo_doi = match.group(1)
# Store the information in a dictionary
if verbose:
output_dict = {
"tag":{
"version": str(closest_previous_tag),
"latest": latest_tag == closest_previous_tag,
},
"commit":{
"SHA": current_commit,
"latest": is_latest_commit,
},
"Repository URL": repo_url,
# "DOI": zenodo_doi
}
else:
output_dict = {
"tag":str(closest_previous_tag),
"commit":current_commit,
"Repository URL": repo_url,
# "DOI": zenodo_doi
}
# license
license_files = glob.glob(os.path.join(repo_path, '*LICENSE*'))
for license_file in license_files:
try:
if license_file == 'LICENSE':
with open(license_file, 'r', encoding='utf-8') as file:
lines = file.readlines()
if len(lines) >= 3:
license_info = lines[2].strip()
if license_info:
output_dict['license'] = license_info
break # Stop searching if a valid license is found
else:
output_dict['license'] = license_file.split('/')[-1]
except FileNotFoundError:
print(f'License file {license_file} not found.')
if 'license' not in output_dict:
print('No valid license information found in the specified files.')
return output_dict
Functions
def clear_last(repo_owner: str, repo_name: str) ‑> Dict[str, str]-
Expand source code
def clear_last(repo_owner: str, repo_name: str) -> Dict[str, str]: today_date = datetime.today().strftime("%Y%m%d") envname = f'cvtool.{repo_owner}.{repo_name}.{today_date}' rm_temp(envname) def get_commit_hashes(owner: str, repo: str, latest: bool = True) ‑> str-
Get the latest or previous commit hash from GitHub API.
Args
owner:str- Owner of the repository.
repo:str- Repository name.
latest:bool- True to get the latest commit hash, False to get the previous commit hash.
Returns
str- Commit hash or None if failed to retrieve.
Expand source code
def get_commit_hashes(owner: str, repo: str, latest: bool = True) -> str: """ Get the latest or previous commit hash from GitHub API. Args: owner (str): Owner of the repository. repo (str): Repository name. latest (bool): True to get the latest commit hash, False to get the previous commit hash. Returns: str: Commit hash or None if failed to retrieve. """ if 'cmor_github_hashes' in os.environ: hashes = os.environ['cmor_github_hashes'].split('~') return hashes[1 if latest else 0] else: try: api_url = f"https://api.github.com/repos/{owner}/{repo}/commits" response = requests.get(api_url) response.raise_for_status() json_data = response.json() latest_commit = json_data[0]["sha"] previous_commit = json_data[1]["sha"] os.environ['cmor_github_hashes'] = f"{previous_commit}~{latest_commit}" return latest_commit if latest else previous_commit except (requests.RequestException, ValueError, IndexError) as e: print(f"Error: {str(e)}") return None def get_github_version(repo_owner: str = '', repo_name: str = '') ‑> str-
Get the latest release version from GitHub API.
Args
repo_owner:str- Owner of the repository.
repo_name:str- Repository name.
Returns
str- Latest release version or '-0.0.0' if not available.
Expand source code
def get_github_version(repo_owner: str = '', repo_name: str = '') -> str: """ Get the latest release version from GitHub API. Args: repo_owner (str): Owner of the repository. repo_name (str): Repository name. Returns: str: Latest release version or '-0.0.0' if not available. """ if not repo_owner and not repo_name: if 'cmor_github_owner' in os.environ and 'cmor_github_repo' in os.environ: owner = os.environ['cmor_github_owner'] repo = os.environ['cmor_github_repo'] else: command = ["git", "config", "--get", "remote.origin.url"] process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True) output, _ = process.communicate() if process.returncode == 0: remote_url = output.strip() owner, repo = remote_url.split("/")[-2:] os.environ['cmor_github_owner'] = owner os.environ['cmor_github_repo'] = repo else: raise RuntimeError("Failed to get repository information.") try: today_date = datetime.today().strftime("%Y%m%d") envname = f'cvtool.{repo_owner}.{repo_name}.{today_date}' json_data = read_temp(envname) if not json_data or "tag_name" not in json_data: api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/releases/latest" response = requests.get(api_url) response.raise_for_status() # GitAPIError(f"Failed to fetch commit information from the GitHub API: {api_url}") json_data = response.json() write_temp(envname, json_data) version = json_data.get("tag_name") return version if version else "-0.0.0" except (requests.RequestException, ValueError, IndexError) as e: print(f"Error: {str(e)}") return "-0.0.0" def git_user() ‑> Dict[str, str]-
Retrieve the Git username and email.
Returns
dict- Dictionary containing 'user' and 'email' keys with corresponding values.
Expand source code
def git_user() -> Dict[str, str]: """ Retrieve the Git username and email. Returns: dict: Dictionary containing 'user' and 'email' keys with corresponding values. """ try: return get_user() except subprocess.CalledProcessError: return get_user(shell=True) def last_commit(repo_owner: str, repo_name: str) ‑> Dict[str, str]-
Retrieve information about the latest commit of a GitHub repository.
Args
repo_owner:str- Owner of the repository.
repo_name:str- Repository name.
Returns
dict- Dictionary containing details of the latest commit.
Expand source code
def last_commit(repo_owner: str, repo_name: str) -> Dict[str, str]: """ Retrieve information about the latest commit of a GitHub repository. Args: repo_owner (str): Owner of the repository. repo_name (str): Repository name. Returns: dict: Dictionary containing details of the latest commit. """ api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/commits" today_date = datetime.today().strftime("%Y%m%d") envname = f'cvtool.{repo_owner}.{repo_name}.{today_date}' commit_info = read_temp(envname) if commit_info: print(commit_info.get('api_url') , api_url) if commit_info.get('api_url') == api_url: print('Loading saved repo metadata from git.') return commit_info else: response = requests.get(api_url) if response.status_code == 200: commits_data = response.json() if commits_data: latest_commit = commits_data[0] commit_info = { "api_url": api_url, "SHA": latest_commit["sha"], "Message": latest_commit["commit"]["message"], "Author": f"{latest_commit['commit']['author']['name']} <{latest_commit['commit']['author']['email']}>", "Committer": f"{latest_commit['commit']['committer']['name']} <{latest_commit['commit']['committer']['email']}>", "Date": latest_commit["commit"]["author"]["date"] } write_temp(envname, commit_info) return commit_info else: raise GitAPIError(f"No commits found in the repository: {api_url}") else: raise GitAPIError(f"Failed to fetch commit information from the GitHub API: {api_url}") def query_repo(repo_path, verbose=True)-
Expand source code
def query_repo(repo_path,verbose = True): exists(repo_path) repo = Repo(repo_path) # Get the current commit SHA current_commit = repo.head.object.hexsha # Get the latest tag version tags = repo.tags latest_tag = tags[-1] # Find the closest tag released prior to the target commit closest_previous_tag = None closest_distance = float('inf') for tag in repo.tags: tag_commit = repo.commit(tag.commit) distance = len(list(repo.iter_commits(rev=f'{tag_commit}..{current_commit}'))) if distance < closest_distance: closest_distance = distance closest_previous_tag = tag # Get the current commit SHA current_commit = repo.head.object.hexsha # Check if the current commit is the latest commit is_latest_commit = current_commit == repo.heads[repo.active_branch.name].commit.hexsha # Get the repository URL repo_url = repo.remotes.origin.url # Check if there is a Zenodo DOI attached to the repository (assuming it's in the README file) zenodo_doi = None readme_path = os.path.join(repo_path, 'README.md') if os.path.exists(readme_path): with open(readme_path, 'r', encoding='utf-8') as readme_file: readme_content = readme_file.read() match = re.search(r'zenodo\.org/record/(\d+)', readme_content) if match: zenodo_doi = match.group(1) # Store the information in a dictionary if verbose: output_dict = { "tag":{ "version": str(closest_previous_tag), "latest": latest_tag == closest_previous_tag, }, "commit":{ "SHA": current_commit, "latest": is_latest_commit, }, "Repository URL": repo_url, # "DOI": zenodo_doi } else: output_dict = { "tag":str(closest_previous_tag), "commit":current_commit, "Repository URL": repo_url, # "DOI": zenodo_doi } # license license_files = glob.glob(os.path.join(repo_path, '*LICENSE*')) for license_file in license_files: try: if license_file == 'LICENSE': with open(license_file, 'r', encoding='utf-8') as file: lines = file.readlines() if len(lines) >= 3: license_info = lines[2].strip() if license_info: output_dict['license'] = license_info break # Stop searching if a valid license is found else: output_dict['license'] = license_file.split('/')[-1] except FileNotFoundError: print(f'License file {license_file} not found.') if 'license' not in output_dict: print('No valid license information found in the specified files.') return output_dict def repo_commits(repo)-
Print information about commits in a Git repository.
Args
repo- Git repository object.
Expand source code
def repo_commits(repo): """ Print information about commits in a Git repository. Args: repo: Git repository object. """ for commit in repo.iter_commits(): print("Commit SHA:", commit.hexsha) print("Author:", commit.author.name, "<" + commit.author.email + ">") print("Committer:", commit.committer.name, "<" + commit.committer.email + ">") print("Date:", commit.authored_datetime) print("Message:", commit.message) print("-" * 50)