-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
123 lines (95 loc) · 3.56 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import csv
import argparse
import sys
from task import Task, PERIODIC
from task_set import TaskSet
from RTOS import RTOS
from schedular import RM_MODE, DM_MODE, EDF_MODE
# setting command line arguments
parser = argparse.ArgumentParser()
parser.add_argument("-m", "--mode", help="Set scheduling mode")
parser.add_argument("-p", "--preemptive", help="Preemptive scheduling or not")
parser.add_argument("-f", "--file", help="File name of tasks")
parser.add_argument("-d", "--duration", help="Program duration", type=int)
class Main:
"""Main class of the OS."""
def __init__(self, duration=100):
self.task_set = TaskSet()
self.duration = duration
def run(self, file, mode=RM_MODE, preemptive=False):
"""Run the main function
Args:
file (string): file name
mode (int, optional): Set algorithm mode. Defaults to RM_MODE.
preemptive (bool, optional): Preemptive or not. Defaults to False.
"""
# create tasks and add them to task set
self.read_tasks_from_csv(filename=file)
# creating our rtos
rtos = RTOS(self.task_set, mode=mode, preemptive=preemptive)
# schedule tasks using selected algorithm
rtos.run(self.duration)
def read_tasks_from_csv(self, filename):
"""Read tasks from CSV
Args:
filename (string): task set file
"""
with open(filename, 'r') as csvfile:
taskreader = csv.reader(csvfile)
header = True
for row in taskreader:
priority, name, state, type, act_time, period, wcet, deadline = row
# don't read headers
if header:
header = False
continue
task = Task(
priority=int(priority),
name=name,
state=int(state),
type=int(type),
act_time=int(act_time),
period=int(period),
wcet=int(wcet),
deadline=int(deadline)
)
self.task_set.add_task(task)
if task.type == PERIODIC:
i = task.act_time + task.period
while i < self.duration:
tmp = Task(
priority=int(priority),
name=name,
state=int(state),
type=int(type),
act_time=i,
period=int(period),
wcet=int(wcet),
deadline=task.deadline
)
self.task_set.add_task(tmp)
i = tmp.act_time + tmp.period
if __name__ == '__main__':
# parsing command line arguments
args = parser.parse_args()
# select the mode
mode = 0
if args.mode == "RM":
mode = RM_MODE
elif args.mode == "DM":
mode = DM_MODE
elif args.mode == "EDF":
mode = EDF_MODE
else:
print("[error] Mode should be RM, DM, or EDF!")
sys.exit(-1)
pre = False
if args.preemptive == "True":
pre = True
try:
# start main
main = Main(duration=args.duration)
main.run(args.file, mode=mode, preemptive=pre)
except KeyboardInterrupt:
print("[info] Termiated!")
sys.exit(1)