-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAtomV0-fullsample code 3-4.txt
119 lines (81 loc) · 4.65 KB
/
AtomV0-fullsample code 3-4.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
3-4
# atomv0-fullsample: Task Corpus Management Module & # atomv0-fullsample: Evaluation Module
python
# atomv0-fullsample: Task Corpus Management Module
class TaskCorpusManager:
def __init__(self, task_lifecycle):
self.task_lifecycle = task_lifecycle
def filter_tasks(self, filter_func):
return list(filter(filter_func, self.task_lifecycle.tasks))
def sort_tasks(self, sort_key, reverse=False):
return sorted(self.task_lifecycle.tasks, key=sort_key, reverse=reverse)
def get_tasks_by_priority(self, priority):
return self.filter_tasks(lambda task: task.priority == priority)
def get_tasks_by_deadline(self, deadline):
return self.filter_tasks(lambda task: task.deadline == deadline)
def get_tasks_by_dependency(self, dependency):
return self.filter_tasks(lambda task: dependency in task.dependencies)
# Example usage:
# Instantiate a Task Corpus manager with the Task Lifecycle manager
task_corpus_manager = TaskCorpusManager(task_lifecycle)
# Add tasks to the Task Lifecycle manager
task3 = Task("Submit expense report", 1, "2023-04-25", [], "pending", 3)
task_lifecycle.add_task(task3)
# Retrieve tasks with a specific priority
priority_tasks = task_corpus_manager.get_tasks_by_priority(2)
print("Priority tasks:", priority_tasks)
# Retrieve tasks with a specific deadline
deadline_tasks = task_corpus_manager.get_tasks_by_deadline("2023-04-25")
print("Deadline tasks:", deadline_tasks)
# Retrieve tasks with a specific dependency
dependency_tasks = task_corpus_manager.get_tasks_by_dependency("Gather data")
print("Dependency tasks:", dependency_tasks)
# Sort tasks by priority
sorted_tasks_priority = task_corpus_manager.sort_tasks(lambda task: task.priority)
print("Sorted by priority:", sorted_tasks_priority)
# Sort tasks by deadline
sorted_tasks_deadline = task_corpus_manager.sort_tasks(lambda task: task.deadline)
print("Sorted by deadline:", sorted_tasks_deadline)
In this sample code, I've created a Task Corpus Management module for atomv0-fullsample. The TaskCorpusManager class provides methods for filtering and sorting tasks based on various criteria, such as priority, deadline, or dependencies. It also includes specific methods for retrieving tasks based on these criteria. The example usage demonstrates how to create a Task Corpus manager, add tasks, and filter, sort, and retrieve tasks based on different criteria.
# atomv0-fullsample: Evaluation Module
import time
from collections import defaultdict
class EvaluationModule:
def __init__(self):
self.metrics = defaultdict(lambda: {'execution_time': [], 'success': [], 'resources_used': []})
def update_metric(self, task_id, metric, value):
self.metrics[task_id][metric].append(value)
def get_metric(self, task_id, metric):
return self.metrics[task_id][metric]
def get_average_metric(self, task_id, metric):
values = self.get_metric(task_id, metric)
if not values:
return None
return sum(values) / len(values)
# Example usage:
# Instantiate the Evaluation module
evaluation_module = EvaluationModule()
# Simulate a task execution
task_id = "task1"
start_time = time.time()
# Task execution code here
time.sleep(2) # Simulate task execution
end_time = time.time()
execution_time = end_time - start_time
resources_used = 500 # Simulate resources used
success = True # Simulate task success
# Update the evaluation module with the task performance
evaluation_module.update_metric(task_id, 'execution_time', execution_time)
evaluation_module.update_metric(task_id, 'resources_used', resources_used)
evaluation_module.update_metric(task_id, 'success', success)
# Access the metrics for a task
execution_times = evaluation_module.get_metric(task_id, 'execution_time')
print("Execution times:", execution_times)
resources_used_list = evaluation_module.get_metric(task_id, 'resources_used')
print("Resources used:", resources_used_list)
success_list = evaluation_module.get_metric(task_id, 'success')
print("Success:", success_list)
# Calculate the average execution time for a task
average_execution_time = evaluation_module.get_average_metric(task_id, 'execution_time')
print("Average execution time:", average_execution_time)
In this sample code, I've implemented an Evaluation module for atomv0-fullsample. The EvaluationModule class provides methods to update and access performance metrics for tasks. The module tracks execution time, success rate, and resource usage for each task. In the example usage, a task's performance metrics are updated and accessed using the update_metric and get_metric methods. The average execution time for a task can be calculated using the get_average_metric method.