-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdown.py
151 lines (118 loc) · 4.45 KB
/
down.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import re
import threading
import requests
import yaml
import pathlib
from functools import wraps
import time
from typing import List, Tuple, Optional, Callable
def parse_yaml(yaml_data: str) -> Tuple[List[str], List[str]]:
    """
    Parse YAML text and collect GitHub repository links and direct download links.

    Two top-level keys are read: ``github`` and ``direct_download_link``.
    Each is iterated as a list of ``name: link`` mappings; only the links are
    kept, in document order. An empty document, or a section that is missing
    or has no value, yields an empty list instead of raising.

    :param yaml_data: YAML-formatted string
    :return: tuple ``(github_repos, direct_download_links)``
    """
    # safe_load returns None for an empty document; treat that as "no sections".
    parsed_yaml = yaml.safe_load(yaml_data) or {}

    def collect_links(section: str) -> List[str]:
        # ``or []`` also covers a key that is present but has no value.
        entries = parsed_yaml.get(section) or []
        return [link for entry in entries for link in entry.values()]

    return collect_links('github'), collect_links('direct_download_link')
import json
def extract_latest_release_urls(github_repos: List[str]) -> List[str]:
    """
    Collect the asset download URLs of each repository's latest GitHub release.

    For every repository link, the owner and name are taken from the last two
    ``/``-separated segments and the ``releases/latest`` endpoint of the GitHub
    API is queried through ``request_ex``. A repository is skipped when its
    link does not contain an owner/name pair, when the request fails, or when
    the response body is not a JSON object.

    :param github_repos: list of GitHub repository links
    :return: the non-empty ``browser_download_url`` values of all release assets
    """
    # The headers never change, so build them once instead of per repository.
    headers = {
        "Accept": "application/vnd.github+json",
    }
    download_links = []
    for repo in github_repos:
        # Tolerate a trailing slash and a clone-style ".git" suffix.
        trimmed = repo.strip().rstrip('/')
        if trimmed.endswith('.git'):
            trimmed = trimmed[:-len('.git')]
        parts = trimmed.split('/')[-2:]
        if len(parts) != 2 or not all(parts):
            print(f"Skipping malformed repository link: {repo}")
            continue
        repo_owner, repo_name = parts

        api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/releases/latest"
        response = request_ex(api_url, headers=headers)
        if response is None:
            continue

        try:
            response_content = json.loads(response.text)
        except ValueError:
            # Body was not valid JSON (json.JSONDecodeError is a ValueError).
            continue
        if not isinstance(response_content, dict):
            continue

        for asset in response_content.get("assets") or []:
            asset_url = asset.get("browser_download_url")
            # An asset without a URL would otherwise become an empty link
            # that cannot be downloaded.
            if asset_url:
                download_links.append(asset_url)
    return download_links
def download_file(file_name: str, url: str, output_dir: str) -> None:
    """
    Download ``url`` and save it as ``file_name`` inside ``output_dir``.

    The request is made through ``request_ex`` with a 10 second timeout and
    the body is streamed to disk in 8192-byte chunks. A failed request, a
    network error while streaming, or a file-system error is reported on
    stdout instead of being raised.

    :param file_name: name of the file to write
    :param url: download link
    :param output_dir: directory the file is written into
    """
    print(url)
    response = request_ex(url, timeout=10, stream=True)
    if response is None:
        print(f"Failed to download {file_name} from {url}")
        return

    target_path = os.path.join(output_dir, file_name)
    try:
        with open(target_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
    except (requests.exceptions.RequestException, OSError):
        print(f"Failed to download {file_name} from {url}")
        # Do not leave a truncated file behind; removal is best-effort because
        # the file may never have been created.
        try:
            os.remove(target_path)
        except OSError:
            pass
    finally:
        # A streamed response holds its connection until it is closed.
        response.close()
def retry_until_true(retries: int = 3, interval: int = 1) -> Callable:
    """
    Build a decorator that re-invokes a function until it returns a truthy value.

    Exceptions raised by the decorated function are not caught; they propagate
    to the caller on the attempt that raised them.

    :param retries: maximum number of calls to make
    :param interval: seconds to sleep between two consecutive attempts
    :return: a decorator; the wrapped function returns the first truthy result,
             or None when every attempt returned a falsy value
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Optional[object]:
            for attempt in range(retries):
                # Pause only between attempts: sleeping after the last failure
                # would just delay returning None.
                if attempt:
                    time.sleep(interval)
                result = func(*args, **kwargs)
                if result:
                    return result
            return None
        return wrapper
    return decorator
@retry_until_true(retries=3, interval=1)
def request_ex(url: str, *args, **kwargs) -> Optional[requests.Response]:
    """
    Issue an HTTP GET with ``requests.get``, retrying when the request fails.

    A single attempt returns None when the request raises a
    ``requests`` exception or the response has an error status; the
    ``retry_until_true`` decorator then repeats the attempt.

    :param url: URL to request
    :param args: extra positional arguments for ``requests.get``
    :param kwargs: extra keyword arguments for ``requests.get``; ``timeout``
                   defaults to 10 seconds when the caller does not pass one
    :return: the ``requests.Response``, or None when the request failed and
             all retries were used up
    """
    # Without a timeout requests can wait on a silent server indefinitely.
    kwargs.setdefault("timeout", 10)
    try:
        response = requests.get(url, *args, **kwargs)
    except requests.exceptions.RequestException:
        return None
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError:
        # Release the connection of the rejected response (matters when the
        # caller asked for stream=True) before reporting failure.
        response.close()
        return None
    return response
def download_files(urls: List[str], output_dir: str) -> List[str]:
    """
    Download every URL into ``output_dir``, one thread per URL.

    The directory is created when missing. Each file is named after the last
    segment of the URL's path, so a query string or fragment never ends up in
    the file name. URLs from which no file name can be derived are reported on
    stdout and skipped. The call returns once all download threads finished.

    :param urls: download links
    :param output_dir: directory the files are written into
    :return: target path of every download that was started, in input order;
             a path is listed whether or not its download succeeded
    """
    from urllib.parse import urlsplit

    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(output_dir, exist_ok=True)

    threads = []
    local_files = []
    for url in urls:
        file_name = os.path.basename(urlsplit(url).path)
        if not file_name:
            # Writing to an empty name would target the directory itself.
            print(f"Skipping {url}: cannot derive a file name")
            continue
        local_files.append(os.path.join(output_dir, file_name))
        thread = threading.Thread(target=download_file, args=(file_name, url, output_dir))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()
    return local_files
def main():
    """
    Download everything listed in the ``repo.yaml`` next to this script.

    Reads the YAML file, resolves the latest-release asset links of the listed
    GitHub repositories, adds the direct download links, downloads all of them
    into ``downloaded_files`` and prints the resulting local paths.
    """
    # Read ./repo.yaml relative to this file. Decode explicitly as UTF-8 so the
    # result does not depend on the platform's locale encoding.
    yaml_path = pathlib.Path(__file__).parent.joinpath("repo.yaml")
    yaml_data = yaml_path.read_text(encoding="utf-8")

    github_repos, direct_download_links = parse_yaml(yaml_data)
    download_links = extract_latest_release_urls(github_repos)
    all_links = download_links + direct_download_links

    output_dir = "downloaded_files"
    local_files = download_files(all_links, output_dir)

    print("Downloaded files:")
    for file in local_files:
        print(file)


# Run the download only when executed as a script, not when imported.
if __name__ == "__main__":
    main()