generated from ModelAtlasofTheEarth/mate-model-template
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit ab851bf
Showing
26 changed files
with
933 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
--- | ||
name: Publish model | ||
description: Update model with doi and publish | ||
title: "Publish model" | ||
labels: ["model published"] | ||
|
||
body: | ||
|
||
- type: input | ||
id: doi | ||
attributes: | ||
label: -> doi | ||
placeholder: "https://doi.org/10.47366/sabia.v5n1a3" | ||
description: "Provide the doi of your published model" | ||
validations: | ||
required: true |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
import os | ||
from github import Github, Auth | ||
|
||
# Environment variables | ||
token = os.environ.get("GITHUB_TOKEN") | ||
repo_name = os.environ.get("REPO_NAME") | ||
|
||
# Get repo | ||
auth = Auth.Token(token) | ||
g = Github(auth=auth) | ||
repo = g.get_repo(repo_name) | ||
|
||
# Find if any of the issues has the published label | ||
published = False | ||
|
||
for issue in repo.get_issues(): | ||
for label in issue.labels: | ||
if 'published' in label.name: | ||
published = True | ||
|
||
print(published) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import os | ||
import base64 | ||
from github import Github, Auth | ||
|
||
# Environment variables | ||
token = os.environ.get("GITHUB_TOKEN") | ||
source_repo_owner = os.environ.get("SOURCE_REPO_OWNER") | ||
source_repo_name = os.environ.get("SOURCE_REPO_NAME") | ||
source_path = os.environ.get("SOURCE_PATH") | ||
target_repo_owner = os.environ.get("TARGET_REPO_OWNER") | ||
target_repo_name = os.environ.get("TARGET_REPO_NAME") | ||
target_branch_name = os.environ.get("TARGET_REPO_BRANCH") | ||
target_path = os.environ.get("TARGET_PATH") | ||
|
||
auth = Auth.Token(token) | ||
g = Github(auth=auth) | ||
source_repo = g.get_repo(f"{source_repo_owner}/{source_repo_name}") | ||
target_repo = g.get_repo(f"{target_repo_owner}/{target_repo_name}") | ||
|
||
def copy_files(contents, target_path): | ||
for content in contents: | ||
if content.type == "dir": | ||
# Get the contents of the directory and copy recursively | ||
copy_files(source_repo.get_contents(content.path), f"{target_path}/{content.name}") | ||
else: | ||
# Check if the file already exists in the target repo | ||
try: | ||
target_file = target_repo.get_contents(f"{target_path}/{content.name}", ref=target_branch_name) | ||
# File exists, compare contents | ||
if content.sha != target_file.sha: | ||
# Contents differ, update the file | ||
source_file_content = base64.b64decode(source_repo.get_git_blob(content.sha).content) | ||
target_repo.update_file(f"{target_path}/{content.name}",f"Updating {content.name}", source_file_content, target_file.sha, branch=target_branch_name) | ||
except: | ||
# Copy file to target repository | ||
source_file_content = base64.b64decode(source_repo.get_git_blob(content.sha).content) | ||
target_repo.create_file(f"{target_path}/{content.name}", f"Copying {content.name}", source_file_content, branch=target_branch_name) | ||
|
||
# Get contents of source directory | ||
source_contents = source_repo.get_contents(source_path) | ||
|
||
# Start copying files | ||
copy_files(source_contents, target_path) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import os | ||
from github import Github, Auth | ||
|
||
# Environment variables | ||
token = os.environ.get("GITHUB_TOKEN") | ||
repo_owner = os.environ.get("REPO_OWNER") | ||
repo_name = os.environ.get("REPO_NAME") | ||
branch_name = os.environ.get("BRANCH_NAME") | ||
|
||
auth = Auth.Token(token) | ||
g = Github(auth=auth) | ||
repo = g.get_repo(f"{repo_owner}/{repo_name}") | ||
|
||
# Check if the branch name already exists | ||
try: | ||
assert repo.get_git_ref(f"heads/{branch_name}").ref is not None | ||
print("Branch already exists") | ||
|
||
# Create new branch if it doesn't | ||
except: | ||
base_ref = repo.get_git_ref(f"heads/{repo.default_branch}") | ||
|
||
repo.create_git_ref(f"refs/heads/{branch_name}", base_ref.object.sha) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
import json | ||
from ruamel.yaml import YAML | ||
import csv | ||
import os | ||
from io import StringIO | ||
|
||
def create_or_update_json_entry(rocrate, keys_path, new_value): | ||
""" | ||
Create or update a nested JSON entry in a ro-crate structure. | ||
Args: | ||
rocrate (dict): The main ro-crate dictionary. | ||
keys_path (str): Dot-separated path to the key that needs updating. | ||
new_value (any): New value to be inserted or updated. | ||
""" | ||
# Split the keys path into individual components | ||
keys = keys_path.split('.') | ||
prefix = "" | ||
structure = rocrate | ||
|
||
# Traverse through the nested structure using keys except the last one | ||
for key in keys[:-1]: | ||
key = prefix + key | ||
|
||
# Handle potential './' prefix logic | ||
if key == "": | ||
prefix = "." | ||
continue | ||
else: | ||
prefix = "" | ||
|
||
if isinstance(structure, list): | ||
# Find the item with matching '@id' key | ||
for item in structure: | ||
if item.get("@id") == key: | ||
structure = item | ||
break | ||
else: | ||
print(f"Key '{key}' not found.") | ||
return | ||
elif key in structure: | ||
structure = structure[key] | ||
else: | ||
print(f"Key '{key}' not found.") | ||
return | ||
|
||
# The final key where the new value should be placed | ||
last_key = keys[-1] | ||
|
||
# Update the value at the final key | ||
if last_key in structure: | ||
if isinstance(structure[last_key], list): | ||
# Prepend only if the new value is not already in the list | ||
if new_value not in structure[last_key]: | ||
structure[last_key].insert(0, new_value) | ||
else: | ||
# Convert existing non-list value to a list if needed | ||
structure[last_key] = [new_value, structure[last_key]] | ||
else: | ||
# If the key doesn't exist, create a new list with the new value | ||
structure[last_key] = [new_value] | ||
|
||
|
||
def navigate_and_assign(source, path, value): | ||
"""Navigate through a nested dictionary and assign a value to the specified path.""" | ||
keys = path.split('.') | ||
for i, key in enumerate(keys[:-1]): | ||
if key.isdigit(): # If the key is a digit, it's an index for a list | ||
key = int(key) | ||
while len(source) <= key: # Extend the list if necessary | ||
source.append({}) | ||
source = source[key] | ||
else: | ||
if i < len(keys) - 2 and keys[i + 1].isdigit(): # Next key is a digit, so ensure this key leads to a list | ||
source = source.setdefault(key, []) | ||
else: # Otherwise, it leads to a dictionary | ||
source = source.setdefault(key, {}) | ||
# Assign the value to the final key | ||
if keys[-1].isdigit(): # If the final key is a digit, it's an index for a list | ||
key = int(keys[-1]) | ||
while len(source) <= key: # Extend the list if necessary | ||
source.append(None) | ||
source[key] = value | ||
else: | ||
source[keys[-1]] = value | ||
|
||
|
||
def read_yaml_with_header(file_path): | ||
""" | ||
Read YAML content inside YAML header delimiters '---' | ||
""" | ||
|
||
with open(file_path,'r') as file: | ||
data = file.read() | ||
|
||
yaml = YAML() | ||
yaml_content = yaml.load(data.strip('---\n')) | ||
|
||
return yaml_content | ||
|
||
def update_csv_content(file_path, field, value): | ||
# Read the CSV file and update the field value | ||
updated_rows = [] | ||
field_exists = False | ||
with open(file_path, mode='r', newline='') as file: | ||
reader = csv.reader(file) | ||
for row in reader: | ||
if row and row[0] == field: | ||
row[1] = value | ||
field_exists = True | ||
updated_rows.append(row) | ||
|
||
# If the field does not exist, add a new line | ||
if not field_exists: | ||
updated_rows.append([field, value]) | ||
|
||
# Convert the updated rows back into a CSV-formatted string | ||
updated_csv_content = StringIO() | ||
writer = csv.writer(updated_csv_content) | ||
writer.writerows(updated_rows) | ||
updated_csv_string = updated_csv_content.getvalue() | ||
|
||
return updated_csv_string |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import os | ||
import json | ||
import requests | ||
from github import Github, Auth | ||
|
||
# Environment variables | ||
token = os.environ.get("TOKEN") | ||
repo_name = os.environ.get("REPO") | ||
org = os.environ.get("ORG") | ||
|
||
repos = [] | ||
|
||
# Get org | ||
auth = Auth.Token(token) | ||
g = Github(auth=auth) | ||
org = g.get_organization(org) | ||
|
||
# Find repos created from this template | ||
for repo in org.get_repos(): | ||
repo_json = requests.get(repo.url).json() | ||
if 'template_repository' in repo_json: | ||
if repo_json['template_repository']['name'] == repo_name: | ||
repos.append(repo.name) | ||
|
||
print(json.dumps(repos)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
import re | ||
import yaml | ||
|
||
|
||
def extract_doi_parts(doi_string): | ||
# Regular expression to match a DOI within a string or URL | ||
# It looks for a string starting with '10.' followed by any non-whitespace characters | ||
# and optionally includes common URL prefixes | ||
# the DOI | ||
doi_pattern = re.compile(r'(10\.[0-9]+/[^ \s]+)') | ||
|
||
# Search for DOI pattern in the input string | ||
match = doi_pattern.search(doi_string) | ||
|
||
# If a DOI is found in the string | ||
if match: | ||
# Extract the DOI | ||
doi = match.group(1) | ||
|
||
# Clean up the DOI by removing any trailing characters that are not part of a standard DOI | ||
# This includes common punctuation and whitespace that might be accidentally included | ||
#doi = re.sub(r'[\s,.:;]+$', '', doi) | ||
doi = re.sub(r'[\s,.:;|\/\?:@&=+\$,]+$', '', doi) | ||
|
||
# Split the DOI into prefix and suffix at the first "/" | ||
#prefix, suffix = doi.split('/', 1) | ||
|
||
return doi | ||
else: | ||
# Return an error message if no DOI is found | ||
return "No valid DOI found in the input string." | ||
|
||
|
||
def format_citation(ro_crate): | ||
# Find the root entity (main dataset) | ||
root_entity = next((item for item in ro_crate['@graph'] if item['@id'] == './'), None) | ||
if not root_entity: | ||
return "Error: Root data entity not found." | ||
|
||
# Extract essential data: title, DOI, publication year | ||
title = root_entity.get('name', 'No title available') | ||
|
||
# Handle the case where 'identifier' might be an empty string or empty list | ||
identifier = root_entity.get('identifier') | ||
if isinstance(identifier, list): | ||
doi = identifier[0] if identifier and identifier[0] else 'No DOI available' | ||
elif isinstance(identifier, str) and identifier: | ||
doi = identifier | ||
else: | ||
doi = 'No DOI available' | ||
|
||
date_published = root_entity.get('datePublished', '')[:4] # Extract the first four characters, which represent the year | ||
|
||
# Extract publisher information, handling multiple publishers | ||
publisher_ids = root_entity.get('publisher', []) | ||
if not isinstance(publisher_ids, list): | ||
publisher_ids = [publisher_ids] | ||
publishers = [] | ||
for publisher_id in publisher_ids: | ||
publisher_entity = next((item for item in ro_crate['@graph'] if item['@id'] == publisher_id['@id']), None) | ||
if publisher_entity: | ||
publishers.append(publisher_entity.get('name', 'No publisher available')) | ||
publisher_names = ', '.join(publishers) if publishers else "No publisher available" | ||
|
||
# Extract and format author names | ||
authors = root_entity.get('creator', []) | ||
author_names = [] | ||
for author_id in authors: | ||
author_entity = next((item for item in ro_crate['@graph'] if item['@id'] == author_id['@id']), None) | ||
if author_entity: | ||
surname = author_entity.get('familyName', '') | ||
given_name_initial = author_entity.get('givenName', '')[0] if author_entity.get('givenName', '') else '' | ||
author_names.append(f"{surname}, {given_name_initial}.") | ||
|
||
# Join author names with commas, and use '&' before the last author if multiple | ||
if len(author_names) > 1: | ||
authors_formatted = ', '.join(author_names[:-1]) + f", & {author_names[-1]}" | ||
else: | ||
authors_formatted = ''.join(author_names) | ||
|
||
# Create formatted citation string | ||
citation = f"{authors_formatted} ({date_published}). {title} [Data set]. {publisher_names}. https://doi.org/{doi.split('/')[-1]}" | ||
return citation | ||
|
||
|
||
|
||
|
||
|
||
def ro_crate_to_cff(ro_crate): | ||
# Find the root entity | ||
root_entity = next((item for item in ro_crate['@graph'] if item['@id'] == './'), None) | ||
if not root_entity: | ||
return "Error: Root data entity not found." | ||
|
||
# Extract necessary fields | ||
title = root_entity.get('name', 'No title available') | ||
version = root_entity.get('version', '1.0') | ||
doi = root_entity.get('identifier', ['No DOI available'])[0] | ||
date_released = root_entity.get('datePublished', '').split('T')[0] | ||
url = root_entity.get('url', 'No URL provided') | ||
|
||
# Extract authors | ||
authors = root_entity.get('creator', []) | ||
author_list = [] | ||
for author_id in authors: | ||
author_entity = next((item for item in ro_crate['@graph'] if item['@id'] == author_id['@id']), None) | ||
if author_entity: | ||
author_list.append({ | ||
'family-names': author_entity.get('familyName', ''), | ||
'given-names': author_entity.get('givenName', ''), | ||
'orcid': author_id['@id'] | ||
}) | ||
|
||
# Construct the CFF object | ||
cff_dict = { | ||
'cff-version': '1.2.0', | ||
'message': 'If you use this model, please cite it as below.', | ||
'authors': author_list, | ||
'title': title, | ||
'version': version, | ||
'doi': doi, # Assuming DOI is a complete URL, extract just the number | ||
'date-released': date_released, | ||
'url': url, | ||
'type': 'dataset' | ||
} | ||
|
||
# Convert dict to YAML format | ||
cff_yaml = yaml.dump(cff_dict, sort_keys=False, default_flow_style=False) | ||
return cff_yaml |
Oops, something went wrong.