-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcda_repos.py
191 lines (157 loc) · 6.32 KB
/
cda_repos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Given the list of repositories provided by the user, we will fetch data from GitHub API for each.
import requests
from pydantic import BaseModel, ValidationError, HttpUrl
from typing import List, Optional
from datetime import datetime
import os
import subprocess
import json
import sys
import argparse
# CLI: deploy repositories either per-runner (a group) or per-service (a single repo).
# NOTE(review): parse_args() runs at import time, not under __main__ — importing this
# module from elsewhere will consume sys.argv; consider moving into a main() function.
parser = argparse.ArgumentParser(description='Clone and deploy repositories based on runner or service.')
parser.add_argument('--runner', type=str, help='Specify the runner to deploy its services', choices=['frontend', 'backend', 'api'])
parser.add_argument('--service', type=str, help='Deploy a single service')
parser.add_argument('--dry-run', action='store_true', help='Run the script in dry-run mode without actual cloning')
args = parser.parse_args()
def get_repos_from_runner(runner_type):
    """Load the repository-name lists declared in ``<runner_type>/repos.py``.

    Loads the module directly from its file path rather than mutating
    ``sys.path`` and calling ``__import__('repos')``. This fixes two defects
    in the original: ``sys.path`` was polluted on every call, and because
    ``__import__`` caches by module name, a second call with a DIFFERENT
    runner silently returned the first runner's cached 'repos' module.

    Args:
        runner_type: Name of a subdirectory of the current working directory
            that contains a ``repos.py`` file.

    Returns:
        A flat list of every element of every module-level list attribute in
        that ``repos.py``; ``[]`` (after printing a message) when the file
        does not exist.
    """
    import importlib.util

    repos_file = os.path.join(os.getcwd(), runner_type, 'repos.py')
    if not os.path.isfile(repos_file):
        print(f"No 'repos.py' module found in the {runner_type} directory.")
        return []
    # Unique module name per runner so repeated calls never collide in sys.modules.
    spec = importlib.util.spec_from_file_location(f"{runner_type}_repos", repos_file)
    repos_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(repos_module)
    # Flatten every module-level list attribute into one list of repo names.
    return [repo for repo_list in vars(repos_module).values() if isinstance(repo_list, list) for repo in repo_list]
# repo_names = [
# 'cda.langchain-templates', 'cda.agents',
# 'cda.Juno', 'cda.actions',
# 'cda.data-lake', 'cda.ml-pipeline', 'cda.notebooks',
# 'cda.CMS_Automation_Pipeline', 'cda.ml-pipeline',
# 'cda.data', 'cda.databases', 'cda.s3',
# 'cda.docker', 'cda.kubernetes', 'cda.jenkins',
# 'cda.weaviate', 'cda.WeaviateApiFrontend', 'cda.webhooks',
# 'cda.Index-Videos',
# 'cda.dotfiles', 'cda.faas', 'cda.pull', 'cda.resumes', 'cda.snippets', 'cda.superagent', 'cda.ZoomVirtualOverlay', 'cda.knowledge-platform',
# 'cda.nginx'
# ]
class Owner(BaseModel):
    """Subset of the GitHub repository-owner object consumed by this script."""
    # NOTE(review): the GitHub repos API exposes the owner's handle as
    # 'login', not 'name' — confirm this field name matches the payload,
    # otherwise Repository(**response.json()) will fail validation.
    name: str
    id: int
    type: str
class Repository(BaseModel):
    """Pydantic model mirroring the fields of the GitHub 'repository' API object.

    Populated via ``Repository(**response.json())`` in GitHubAPI.get_repository;
    extra keys in the payload are ignored/handled per pydantic's default config.
    """
    id: int
    node_id: str
    name: str
    full_name: str
    owner: Owner
    private: bool
    html_url: HttpUrl
    # NOTE(review): in pydantic v2 an Optional field without a default is still
    # required — these Optional fields assume v1 semantics; confirm the pinned version.
    description: Optional[str]
    fork: bool
    url: HttpUrl
    created_at: datetime
    updated_at: datetime
    pushed_at: datetime
    git_url: str
    ssh_url: str
    clone_url: HttpUrl
    size: int
    stargazers_count: int
    watchers_count: int
    language: Optional[str]
    has_issues: bool
    has_projects: bool
    has_downloads: bool
    has_wiki: bool
    has_pages: bool
    forks_count: int
    mirror_url: Optional[HttpUrl]
    archived: bool
    disabled: bool
    open_issues_count: int
    # NOTE(review): the GitHub API returns 'license' as a nested object
    # (key/name/spdx_id), not a plain string — verify against a live payload.
    license: Optional[str]
    allow_forking: bool
    is_template: bool
    topics: List[str]
    visibility: str
class Project(BaseModel):
    """A named project grouping an owner with a repository."""
    name: str
    owner: Owner
    # NOTE(review): field name is plural but the type is a single Repository —
    # confirm whether this should be List[Repository].
    repositories: Repository
class Submodule(BaseModel):
    """A git submodule: where it lives, which repository it points at, and the pinned commit."""
    path: str
    repository: Repository
    # Pinned commit SHA; None when the submodule is not pinned to a specific commit.
    commit_hash: Optional[str]
class GitHubAPI:
    """Thin wrapper around the GitHub REST v3 API for fetching and cloning repositories."""

    BASE_URL = 'https://api.github.com'

    def __init__(self, access_token: str):
        """Store the token and build the auth headers sent with every request."""
        self.access_token = access_token
        self.headers = {
            'Authorization': f'token {self.access_token}',
            'Accept': 'application/vnd.github.v3+json',
        }

    def get_repository(self, repo_name: str) -> Optional[Repository]:
        """
        Fetch repository data from GitHub and return a Repository object.

        Returns None (after printing the status code) on any non-200 response.
        """
        url = f"{self.BASE_URL}/repos/{repo_name}"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return Repository(**response.json())
        print(f"Failed to fetch repository {repo_name}: {response.status_code}")
        return None

    def clone_repository(self, repository: Repository):
        """
        Clone the given repository to the current working directory.

        Skips cloning when a directory named after the repository already exists.
        NOTE(review): embedding the token in the clone URL persists it in the
        clone's .git/config and exposes it in process listings — consider a
        git credential helper instead.
        """
        # str() keeps the .replace working whether clone_url is a plain str
        # (pydantic v1 HttpUrl) or a non-str URL type (pydantic v2).
        clone_url = str(repository.clone_url).replace('https://', f'https://{self.access_token}@')
        repo_path = os.path.join(os.getcwd(), repository.name)
        if not os.path.exists(repo_path):
            subprocess.run(["git", "clone", clone_url], check=True)
            print(f"Repository {repository.name} cloned into {repo_path}")
        else:
            print(f"Repository {repository.name} already exists in the current directory.")

    def generate_build_structure_json(self, repositories: List[Repository]):
        """
        Generate a JSON file (build_structure.json) describing the cloned repositories.
        """
        build_structure = {
            'repositories': [repo.dict() for repo in repositories]
        }
        with open('build_structure.json', 'w') as f:
            # default=str: repo.dict() contains datetime (and URL) values that
            # json cannot serialize natively — the original raised TypeError here.
            json.dump(build_structure, f, indent=2, default=str)
        print("Generated build_structure.json")

    def save_repo_data_as_json(self, repo_names, filename):
        """
        Fetch each named repository once and dump the collected data to `filename`.

        Repositories that fail to fetch (get_repository returned None) are
        skipped. The original fetched every repository TWICE (once in the
        comprehension's filter, once for the value); this fetches once each.
        """
        repo_data_list = []
        for repo_name in repo_names:
            repo = self.get_repository(repo_name)  # single API call per repo
            if repo:
                repo_data_list.append(repo.dict())
        with open(filename, 'w') as json_file:
            # default=str serializes the datetime fields json can't handle natively.
            json.dump(repo_data_list, json_file, indent=4, default=str)
        # Fixed broken message: the original printed the literal "(unknown)"
        # instead of the output filename.
        print(f"Generated {filename} with repository data.")
if __name__ == '__main__':
    # NOTE(review): if GH_TOKEN is unset this becomes the header 'token None'
    # and every API call will 401 — consider failing fast with a clear message.
    github_api = GitHubAPI(access_token=os.environ.get('GH_TOKEN'))
    if args.dry_run and args.runner:
        # Dry run: fetch metadata only and write it to <runner>_runner.json; no cloning.
        print(f"Dry run activated. Fetching data for the runner: {args.runner}")
        repo_names = get_repos_from_runner(args.runner)
        github_api.save_repo_data_as_json(repo_names, f"{args.runner}_runner.json")
    elif args.runner:
        # Load repos from the specified runner's repos.py file and clone each one.
        repo_names = get_repos_from_runner(args.runner)
        for repo_name in repo_names:
            repo_data = github_api.get_repository(repo_name)
            if repo_data:
                github_api.clone_repository(repo_data)
                # ... additional logic to handle successful cloning ...
    elif args.service:
        # Deploy a single service: --service is treated as a full repo name for the API.
        repo_data = github_api.get_repository(args.service)
        if repo_data:
            github_api.clone_repository(repo_data)
            # ... additional logic to handle successful cloning ...
    else:
        # No actionable arguments given; --dry-run alone is not enough.
        print("Please specify either --runner or --service with an optional --dry-run flag.")