Add server side run estimates. #36

Open · wants to merge 2 commits into master
103 changes: 103 additions & 0 deletions estimator.py
@@ -0,0 +1,103 @@
import json
import os
import threading
import warnings
from numpy.polynomial import polynomial as poly
from utility import rd_print

# A global lock used to prevent estimator data files from being read and
# written at the same time.
estimator_file_lock = threading.Lock()

# Classes that extend Estimator assume that work is of a correlated type
# (i.e. RDEstimator has RDWork)
class Estimator:
    def __init__(self):
        pass
    def update(self, work, time):
        pass
    def get_estimate(self, work):
        return 0
    def finish(self):
        pass

class RDEstimator(Estimator):
    def __init__(self, run):
        super().__init__()
        self.run = run
        self.ref_scale = 0.0
        self.data = None
        input_filename = 'estimate_data/rd/{}/{}.json'.format(run.codec, run.set)
        if os.path.isfile(input_filename):
            with estimator_file_lock:
                with open(input_filename, 'r', encoding='utf-8') as input_file:
                    input_json = json.load(input_file)
            self.ref_scale = input_json['scale']
            data = {}
            for filename, quality_to_ratio in input_json['videos'].items():
                qualities = []
                ratios = []
                for quality, ratio in quality_to_ratio.items():
                    qualities.append(int(quality))
                    ratios.append(ratio)
                with warnings.catch_warnings():
                    # ignore the RankWarning that polyfit emits
                    warnings.simplefilter('ignore')
                    data[filename] = poly.Polynomial(poly.polyfit(qualities, ratios, 5))
            self.data = data
        self.scale = 0.0
        self.sample_sum = 0
        self.sample_counter = 0
    def update(self, work, time):
        ratio = 1.0
        if self.data:
            ratio = self.data[work.filename](work.quality)
        # time/ratio is the scale that would have been correct for the given
        # work; the scale is then weighted by the time it took
        self.sample_sum += (time/ratio) * time
        self.sample_counter += time
        self.scale = self.sample_sum/self.sample_counter
    def get_estimate(self, work):
        ratio = 1.0
        if self.data:
            ratio = self.data[work.filename](work.quality)
        if self.sample_counter == 0:
            return ratio * self.ref_scale
        return ratio * self.scale

# Extend RDEstimator since we still want an estimate
class RDDataCollector(RDEstimator):
    def __init__(self, run, video_filenames):
        super().__init__(run)
        collected_data = {}
        for filename in video_filenames:
            collected_data[filename] = {}
        self.collected_data = collected_data
        self.longest_work = 0.0
    def update(self, work, time):
        super().update(work, time)
        self.collected_data[work.filename][work.quality] = time
        self.longest_work = max(self.longest_work, time)
    def finish(self):
        run = self.run
        data_json = {}
        data_json['scale'] = self.longest_work
        videos_json = {}
        for filename, quality_to_time in self.collected_data.items():
            qualities_json = {}
            for quality, time_taken in quality_to_time.items():
                qualities_json[str(quality)] = time_taken / self.longest_work
            videos_json[filename] = qualities_json
        data_json['videos'] = videos_json
        output_filename = 'estimate_data/rd/{}/{}.json'.format(run.codec, run.set)
        with estimator_file_lock:
            try:
                os.makedirs(os.path.dirname(output_filename), exist_ok=True)
                with open(output_filename, 'w', encoding='utf-8') as output_file:
                    json.dump(data_json, output_file)
            except Exception:
                rd_print(run.log, 'Failed to save estimator data on ' + run.runid)
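The profile that RDDataCollector writes (and RDEstimator reads back) is a small JSON file: 'scale' holds the wall-clock time of the longest work item from the profiling run, and 'videos' maps each source filename to a table of quality → time/scale ratios, which RDEstimator fits with a degree-5 polynomial. The scale itself is a time-weighted running average, so long encodes dominate it. Below is a minimal, self-contained sketch of that update rule; the numbers are invented and only the arithmetic mirrors update() above.

sample_sum = 0.0      # accumulates (time/ratio) * time
sample_counter = 0.0  # accumulates time

def update(ratio, time):
    # time/ratio is the scale implied by one finished work item; weighting
    # it by time makes scale converge to sum(t_i**2 / r_i) / sum(t_i)
    global sample_sum, sample_counter
    sample_sum += (time / ratio) * time
    sample_counter += time
    return sample_sum / sample_counter

print(update(0.5, 30.0))   # 60.0: one item at half the reference cost
print(update(1.0, 120.0))  # 108.0: the longer item pulls the average up

get_estimate() then returns ratio(quality) * scale, falling back to the profile's reference scale until the first live sample arrives.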
66 changes: 63 additions & 3 deletions rd_server.py
@@ -3,18 +3,20 @@
import tornado.ioloop
import tornado.web
import os
-import codecs
import json
import argparse
import sshslot
import threading
import time
import awsremote
from queue import PriorityQueue
from work import *
from utility import *
from estimator import *

-video_sets_f = codecs.open('sets.json','r',encoding='utf-8')
+video_sets_f = open('sets.json','r',encoding='utf-8')
video_sets = json.load(video_sets_f)
video_sets_f.close()

machines = []
slots = []
@@ -81,7 +83,7 @@ def get(self):
            pass
        if 'qualities' in info:
            if info['qualities'] != '':
-                run.quality = info['qualities'].split()
+                run.quality = list(map(int, info['qualities'].split()))
        if 'extra_options' in info:
            run.extra_options = info['extra_options']
        if 'save_encode' in info:
@@ -91,6 +93,10 @@ def get(self):
        run.write_status()
        run_list.append(run)
        video_filenames = video_sets[run.set]['sources']
        if 'profile_set' in info and info['profile_set']:
            run.estimator = RDDataCollector(run, video_filenames)
        else:
            run.estimator = RDEstimator(run)
        run.work_items = create_rdwork(run, video_filenames)
        work_list.extend(run.work_items)
        if False:
@@ -118,10 +124,12 @@ def get(self):
class RunStatusHandler(tornado.web.RequestHandler):
    def get(self):
        self.set_header("Content-Type", "application/json")
        current_time = time.perf_counter()
        runs = []
        for run in run_list:
            run_json = {}
            run_json['run_id'] = run.runid
            run_json['eta'] = max(0, run.eta - current_time)
            run_json['completed'] = 0
            run_json['total'] = 0
            run_json['info'] = run.info
@@ -240,18 +248,30 @@ def machine_allocator():
        awsremote.stop_machines(args.awsgroup)
        time.sleep(60)

class KeyedEntry:
    def __init__(self, key, data):
        self.key = key
        self.data = data
    def __lt__(self, other):
        return self.key < other.key

def scheduler_tick():
    global free_slots
    global work_list
    global run_list
    global work_done
    active_slots = []
    update_simulation = False
    max_retries = 5
    # look for completed work
    for slot in slots:
        if slot.busy == False and slot.work != None:
            if slot.work.failed == False:
                slot.work.done = True
                work_done.append(slot.work)
                slot.work.run.completed += 1
                slot.work.update_estimator()
                update_simulation = True
                rd_print(slot.work.log,slot.work.get_name(),'finished.')
            elif slot.work.retries < max_retries:
                slot.work.retries += 1
@@ -261,9 +281,49 @@ def scheduler_tick():
            else:
                slot.work.done = True
                work_done.append(slot.work)
                slot.work.run.completed += 1
                rd_print(slot.work.log,slot.work.get_name(),'given up on.')
            slot.work = None
            free_slots.append(slot)
        elif slot.work != None:
            active_slots.append(slot)
    # update the simulation to find ETAs for runs
    if update_simulation:
        sim_queue = PriorityQueue()
        sim_completed = {}
        for run in run_list:
            sim_completed[id(run)] = run.completed
        current_time = time.perf_counter()
        # load in-progress work into the queue
        for slot in active_slots:
            work = slot.work
            remaining = max(0, work.estimate_time() - (current_time - work.start_time))
            sim_queue.put(KeyedEntry(remaining, work))
        sim_work_list = list(work_list)
        # fill any free slots
        for i in range(0, min(len(free_slots), len(sim_work_list))):
            work = sim_work_list.pop(0)
            sim_queue.put(KeyedEntry(work.estimate_time(), work))
        # run the simulation's main loop until there is no work left to add to freed slots
        while len(sim_work_list) != 0:
            finished = sim_queue.get()
            sim_time = finished.key
            work = finished.data
            run = work.run
            sim_completed[id(run)] += 1
            if sim_completed[id(run)] == len(run.work_items):
                run.eta = current_time + sim_time
            new_work = sim_work_list.pop(0)
            sim_queue.put(KeyedEntry(sim_time + new_work.estimate_time(), new_work))
        # drain the queue until all the simulated slots are empty
        while not sim_queue.empty():
            finished = sim_queue.get()
            sim_time = finished.key
            work = finished.data
            run = work.run
            sim_completed[id(run)] += 1
            if sim_completed[id(run)] == len(run.work_items):
                run.eta = current_time + sim_time
    # fill empty slots with new work
    if len(work_list) != 0:
        if len(free_slots) != 0:
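The ETA code above is an event-driven replay of the scheduler: the priority queue always pops the work item that finishes soonest in simulated time, and the slot it frees immediately starts the next queued item at that instant. Here is a self-contained sketch of the same loop with invented durations standing in for estimate_time(); KeyedEntry is copied from the diff, everything else is illustrative.

from queue import PriorityQueue

class KeyedEntry:
    def __init__(self, key, data):
        self.key = key
        self.data = data
    def __lt__(self, other):
        return self.key < other.key

pending = [5.0, 3.0, 8.0, 2.0]  # estimated seconds per queued work item
num_slots = 2

sim_queue = PriorityQueue()
for _ in range(min(num_slots, len(pending))):
    sim_queue.put(KeyedEntry(pending.pop(0), 'work'))
eta = 0.0
while not sim_queue.empty():
    finished = sim_queue.get()
    eta = finished.key  # simulated time at which this item completes
    if pending:
        # the freed slot starts the next pending item immediately
        sim_queue.put(KeyedEntry(eta + pending.pop(0), 'work'))
print(eta)  # 11.0: slot A runs 5s then 2s, slot B runs 3s then 8s

The real loop does the same bookkeeping per run: every pop increments sim_completed for the owning run, and once that count reaches len(run.work_items) the run's eta is pinned at current_time + sim_time.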
2 changes: 1 addition & 1 deletion rd_tool.py
@@ -75,7 +75,7 @@
run = Run(args.codec)
run.runid = str(args.runid)
if args.qualities:
-    run.quality = args.qualities
+    run.quality = list(map(int, args.qualities))
run.set = args.set[0]
run.bindir = args.bindir
run.save_encode = args.save_encode
1 change: 1 addition & 0 deletions sshslot.py
@@ -67,6 +67,7 @@ def gather(self):
        return self.p.communicate()
    def start_work(self, work):
        self.work = work
        work.record_start_time()
        work_thread = threading.Thread(target=self.execute, args=(work,))
        work_thread.daemon = True
        self.busy = True
17 changes: 17 additions & 0 deletions work.py
@@ -1,6 +1,8 @@
from utility import *
import subprocess
import sys
import time
from estimator import *

# Finding files such as `this_(that)` requires `'` be placed on both
# sides of the quote so the `()` are both captured. Files such as
@@ -41,6 +43,9 @@ def __init__(self, codec):
        self.rundir = None
        self.status = 'running'
        self.work_items = []
        self.estimator = Estimator()
        self.completed = 0
        self.eta = 0
    def write_status(self):
        f = open(self.rundir+'/status.txt','w')
        f.write(self.status)
@@ -53,6 +58,7 @@ def cancel(self):
    def finish(self):
        if self.log:
            self.log.close()
        self.estimator.finish()

class RDRun(Run):
    def reduce(self):
@@ -84,9 +90,18 @@ def __init__(self):
        self.failed = False
        self.runid = ''
        self.slot = None
        self.start_time = 0
        self.run = None
    def cancel(self):
        self.failed = True
        self.done = True
    def record_start_time(self):
        self.start_time = time.perf_counter()
    def update_estimator(self):
        elapsed = time.perf_counter() - self.start_time
        self.run.estimator.update(self, elapsed)
    def estimate_time(self):
        return self.run.estimator.get_estimate(self)

class RDWork(Work):
    def __init__(self):
@@ -212,6 +227,7 @@ def create_rdwork(run, video_filenames):
    for filename in video_filenames:
        for q in sorted(run.quality, reverse = True):
            work = RDWork()
            work.run = run
            work.log = run.log
            work.quality = q
            work.runid = run.runid
@@ -274,6 +290,7 @@ def create_abwork(run, video_filenames):
    for filename in video_filenames:
        for bpp in bits_per_pixel:
            work = ABWork()
            work.run = run
            work.log = run.log
            work.bpp = bpp
            work.codec = run.codec
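Taken together, the work.py hooks form a small lifecycle: sshslot.start_work() stamps start_time, scheduler_tick() calls update_estimator() when the work completes, and estimate_time() feeds the ETA simulation. A toy trace of that flow, using a stand-in estimator (FakeEstimator, FakeRun, and FakeWork are illustrative, not part of this PR):

import time

class FakeEstimator:
    # stand-in for RDEstimator: a plain average rather than the
    # ratio-weighted scale
    def __init__(self):
        self.samples = []
    def update(self, work, elapsed):
        self.samples.append(elapsed)
    def get_estimate(self, work):
        return sum(self.samples) / len(self.samples) if self.samples else 0.0

class FakeRun:
    def __init__(self):
        self.estimator = FakeEstimator()

class FakeWork:
    def __init__(self, run):
        self.run = run
        self.start_time = 0
    def record_start_time(self):             # called by sshslot.start_work()
        self.start_time = time.perf_counter()
    def update_estimator(self):              # called by scheduler_tick()
        self.run.estimator.update(self, time.perf_counter() - self.start_time)
    def estimate_time(self):                 # consumed by the ETA simulation
        return self.run.estimator.get_estimate(self)

work = FakeWork(FakeRun())
work.record_start_time()
time.sleep(0.05)                 # stands in for the actual encode
work.update_estimator()
print(work.estimate_time() > 0)  # True: estimates now reflect a real sample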