-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcheck_child_counts.py
71 lines (63 loc) · 3.35 KB
/
check_child_counts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# This script walks the tree of every published resource in ArchivesSpace and, for each object whose number of
# children is equal to or greater than 1000, logs that object in a .csv file
import re
import json
import csv
from secrets import *
from asnake.aspace import ASpace
from asnake.client import ASnakeClient
# Matches field names that start with "id", one or more underscores, then a digit (e.g. "id_0").
id_field_regex = re.compile(r"(^id_+\d)")
# Matches runs of non-word characters and underscores.
id_combined_regex = re.compile(r'[\W_]+', re.UNICODE)

# as_api / as_un / as_pw are presumably supplied by the wildcard import of the secrets module.
# Both ArchivesSnake objects are built from the same connection settings.
_connection_settings = {"baseurl": as_api, "username": as_un, "password": as_pw}
aspace = ASpace(**_connection_settings)
client = ASnakeClient(**_connection_settings)
client.authorize()
def check_child_counts(tree_info, child_counts, root_uri, aspace_coll_id, client, repository, top_level=False,
                       threshold=1000):
    """Record every node at or below ``tree_info`` whose child count reaches ``threshold``.

    Args:
        tree_info: Parsed JSON for one tree node. Reads "child_count" always, "uri", "title" and
            "level" when the node is recorded, and "precomputed_waypoints" when present.
        child_counts: Results gathered so far, keyed by node URI. Mutated in place.
        root_uri: URI of the resource that owns the tree; used to build the ``/tree/node`` request.
        aspace_coll_id: Resource identifier stored with every recorded node.
        client: Object whose ``get(path, params=...)`` returns a response exposing ``json()``.
        repository: Repository name stored with every recorded node.
        top_level: True when ``tree_info`` is the resource root, whose waypoints are looked up
            under the key "" instead of the node's own URI.
        threshold: Minimum child count for a node to be recorded (default 1000).

    Returns:
        ``child_counts``; each value is a
        ``(title, child_count, level, aspace_coll_id, repository)`` tuple.
    """
    if tree_info["child_count"] >= threshold and tree_info["uri"] not in child_counts:
        child_counts[tree_info["uri"]] = (
            tree_info["title"],
            tree_info["child_count"],
            tree_info["level"],
            aspace_coll_id,
            repository,
        )
        print(aspace_coll_id)

    # Nothing to descend into without waypoint data or without children.
    if "precomputed_waypoints" not in tree_info or tree_info["child_count"] == 0:
        return child_counts

    # The root's waypoints are stored under an empty key; any other node's under its own URI.
    waypoint_key = "" if top_level else tree_info["uri"]
    for waypoint in tree_info["precomputed_waypoints"][waypoint_key].values():
        for child in waypoint:
            if child["child_count"] >= threshold:
                # Keep the same five-field shape as the entry above; the repository used to be
                # left out here, which produced rows one column short.
                child_counts[child["uri"]] = (
                    child["title"],
                    child["child_count"],
                    child["level"],
                    aspace_coll_id,
                    repository,
                )
            # NOTE(review): every child is fetched, including those reporting zero children;
            # skipping leaves would save one request each — confirm the node endpoint reports
            # the same child_count before changing this.
            node = client.get(
                f"{root_uri}/tree/node",
                params={"node_uri": child["uri"], "published_only": True},
            ).json()
            check_child_counts(node, child_counts, root_uri, aspace_coll_id, client, repository,
                               top_level=False, threshold=threshold)
    return child_counts
# Walk every repository's published resources, collect the nodes flagged by check_child_counts,
# then write one CSV row per flagged node.
child_counts = {}
repos = client.get("repositories").json()
for repo in repos:
    print(repo["name"] + "\n")
    repo_id = repo["uri"].split("/")[2]
    resources = client.get(f"repositories/{repo_id}/resources", params={"all_ids": True}).json()
    for resource_id in resources:
        resource = client.get(f"repositories/{repo_id}/resources/{resource_id}")
        # Check the status before reading the body: only a successful response is treated as a
        # resource record (the body of a failed request may lack the "publish" key).
        if resource.status_code != 200:
            print(f"Skipping resource {resource_id}: HTTP {resource.status_code}")
            continue
        record = resource.json()
        if record.get("publish") is not True:
            continue
        # Hyphen-join the values of the id_<digit> fields in the order the record lists them.
        combined_id = "-".join(value for field, value in record.items() if id_field_regex.match(field))
        root_uri = f"/repositories/{repo_id}/resources/{resource_id}"
        tree_info = client.get(f"{root_uri}/tree/root").json()
        print(combined_id)
        child_counts = check_child_counts(tree_info, child_counts, root_uri, combined_id, client, repo["name"],
                                          top_level=True)

with open("data/check_child_count.csv", "w", encoding="utf8", newline="") as csv_file:
    writer = csv.writer(csv_file)
    for uri, data in child_counts.items():
        # Write the URI followed by every stored field. Unpacking the tuple avoids the fixed
        # data[0]..data[4] indexing, whose misplaced parenthesis raised TypeError.
        writer.writerow([str(uri), *(str(item) for item in data)])