Skip to content

Commit

Permalink
Added new parameter to batching.batch_get_keys
Browse files Browse the repository at this point in the history
  • Loading branch information
sgeulette committed Nov 27, 2024
1 parent ce405e7 commit 6ad7fd6
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 21 deletions.
4 changes: 2 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ Changelog
1.1.1 (unreleased)
------------------

- Nothing changed yet.

- Added new parameter to `batching.batch_get_keys`.
[sgeulette]

1.1.0 (2024-10-02)
------------------
Expand Down
31 changes: 17 additions & 14 deletions src/imio/helpers/batching.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
Idea: a batch number, a commit number and a loop number are considered
1) we get a stored dictionary containing the treated keys (using load_pickle function)
2) if the key is already in the dictionary, we skip it (continue)
2) if the key is already in the dictionary, we skip it (continue). Otherwise, we increase the loop count
3) if the treated items number is >= batch number, we exit the for loop, do a commit and dump the dictionary
4) otherwise, we store the corresponding key in the dictionary and increase the loop number
4) otherwise, we store the corresponding key in the dictionary
5) when the current loop number is a multiple of the commit number, we do a commit and dump the dictionary
6) when the for loop is globally finished, we do a commit and dump the dictionary
7) when all the items are treated, we can delete the dictionary file
Expand All @@ -32,40 +32,43 @@


# 1) we get a stored dictionary containing the treated keys (using load_pickle function)
def batch_get_keys(infile, loop_length=0, a_set=None, add_files=None):
    """Returns the stored batched keys from the file.

    Must be used like this, before the loop:
    batch_keys, config = batch_get_keys(infile, batch_number, commit_number)

    Batching is driven by environment variables: BATCH (batch size; 0 disables
    batching entirely), COMMIT (commit every n items) and BATCH_LAST (flag for
    the final pass).

    :param infile: file name where the set is stored (must end with '.pkl')
    :param loop_length: the loop length number
    :param a_set: a given data structure to get the stored keys
    :param add_files: a list of additional files to consider when deleting files
    :return: 2 values: 1) a_set filled with pickled data (None when batching is
        disabled), 2) a config dict {'bn': batch_number, 'bl': batch_last,
        'cn': commit_number, 'll': loop_length, 'lc': loop_count, 'pf': infile,
        'cf': config_file, 'kc': keys_count, 'lk': last_key,
        'ldk': last_dump_key, 'fr': first_run_bool, 'af': add_files}
    :raises Exception: if infile does not end with '.pkl'
    """
    infile = os.path.abspath(infile)
    commit_number = int(os.getenv('COMMIT', '0'))
    batch_number = int(os.getenv('BATCH', '0'))
    batch_last = bool(int(os.getenv('BATCH_LAST', '0')))
    # normalize any falsy value (None, empty tuple, ...) to a fresh list,
    # avoiding a mutable default argument
    if not add_files:
        add_files = []
    if not batch_number:
        # batching disabled: no keys set, minimal config so the other
        # batch_* helpers become no-ops
        return None, {'bn': batch_number, 'bl': batch_last, 'cn': commit_number, 'll': loop_length, 'lc': 0,
                      'pf': infile, 'cf': None, 'kc': 0, 'fr': False, 'af': add_files}
    if not infile.endswith('.pkl'):
        raise Exception("The given file '{}' must be a pickle file ending with '.pkl'".format(infile))
    if a_set is None:
        a_set = set()
    load_pickle(infile, a_set)
    # the config file lives next to the pickle; its absence marks a first run
    dic_file = infile.replace('.pkl', '_config.txt')
    first_run = not os.path.exists(dic_file)
    config = {'bn': batch_number, 'bl': batch_last, 'cn': commit_number, 'll': loop_length, 'lc': 0, 'pf': infile,
              'cf': dic_file, 'kc': len(a_set), 'fr': first_run, 'af': add_files}
    dump_var(dic_file, config)
    return a_set, config


# 2) if the key is already in the dictionary, we skip it (continue)
# 2) if the key is already in the dictionary, we skip it (continue). Otherwise, we increase the loop count
def batch_skip_key(key, batch_keys, config):
"""Returns True if the key is already in the batch_keys.
Must be used like this, at the beginning of the loop:
Expand All @@ -76,7 +79,7 @@ def batch_skip_key(key, batch_keys, config):
:param batch_keys: the treated keys set
:param config: a config dict {'bn': batch_number, 'bl': batch_last, 'cn': commit_number, 'll': loop_length,
'lc': loop_count, 'pf': infile, 'cf': config_file, 'kc': keys_count, 'lk': last_key,
'ldk': last_dump_key, 'fr'; first_run_bool}
'ldk': last_dump_key, 'fr'; first_run_bool, 'af': add_files}
:return: True if a "continue" must be done. False otherwise.
"""
if batch_keys is None:
Expand All @@ -88,7 +91,7 @@ def batch_skip_key(key, batch_keys, config):


# 3) if the treated items number is higher than the batch number, we exit the loop, do a commit and dump the dictionary
# 4) otherwise, we store the corresponding key in the dictionary and increase the loop number
# 4) otherwise, we store the corresponding key in the dictionary
# 5) when the current loop number is a multiple of the commit number, we do a commit and dump the dictionary
def batch_handle_key(key, batch_keys, config):
"""Returns True if the loop must be exited.
Expand All @@ -100,7 +103,7 @@ def batch_handle_key(key, batch_keys, config):
:param batch_keys: the treated keys set
:param config: a config dict {'bn': batch_number, 'bl': batch_last, 'cn': commit_number, 'll': loop_length,
'lc': loop_count, 'pf': infile, 'cf': config_file, 'kc': keys_count, 'lk': last_key,
'ldk': last_dump_key, 'fr'; first_run_bool}
'ldk': last_dump_key, 'fr'; first_run_bool, 'af': add_files}
:return: True if the loop must be exited. False otherwise.
"""
if batch_keys is None:
Expand Down Expand Up @@ -138,7 +141,7 @@ def batch_loop_else(batch_keys, config):
:param batch_keys: the treated keys set
:param config: a config dict {'bn': batch_number, 'bl': batch_last, 'cn': commit_number, 'll': loop_length,
'lc': loop_count, 'pf': infile, 'cf': config_file, 'kc': keys_count, 'lk': last_key,
'ldk': last_dump_key, 'fr'; first_run_bool}
'ldk': last_dump_key, 'fr'; first_run_bool, 'af': add_files}
"""
if batch_keys is None:
return
Expand Down Expand Up @@ -166,7 +169,7 @@ def batch_globally_finished(batch_keys, config):
:param batch_keys: the treated keys set
:param config: a config dict {'bn': batch_number, 'bl': batch_last, 'cn': commit_number, 'll': loop_length,
'lc': loop_count, 'pf': infile, 'cf': config_file, 'kc': keys_count, 'lk': last_key,
'ldk': last_dump_key, 'fr'; first_run_bool}
'ldk': last_dump_key, 'fr'; first_run_bool, 'af': add_files}
:return: True if the loop is globally finished. False otherwise.
"""
# if not batch_keys:
Expand Down Expand Up @@ -202,7 +205,7 @@ def can_delete_batch_files(batch_keys, config):
:param batch_keys: the treated keys set
:param config: a config dict {'bn': batch_number, 'bl': batch_last, 'cn': commit_number, 'll': loop_length,
'lc': loop_count, 'pf': infile, 'cf': config_file, 'kc': keys_count, 'lk': last_key,
'ldk': last_dump_key, 'fr'; first_run_bool}
'ldk': last_dump_key, 'fr'; first_run_bool, 'af': add_files}
:return: boolean
"""
if config["fr"] and os.getenv("IU_RUN1", "0") == "1": # if first run by imio.updates, the config file is needed.
Expand Down
16 changes: 11 additions & 5 deletions src/imio/helpers/tests/test_batching.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# -*- coding: utf-8 -*-
from imio.helpers.batching import batch_delete_files
from imio.helpers.batching import batch_get_keys
from imio.helpers.batching import batch_handle_key
from imio.helpers.batching import batch_hashed_filename
from imio.helpers.batching import batch_loop_else
from imio.helpers.batching import batch_skip_key
from imio.helpers.batching import can_delete_batch_files
from imio.pyutils.batching import batch_delete_files

import logging
import os
Expand All @@ -18,12 +18,12 @@
processed = {'keys': [], 'commits': 0, 'errors': 0}


def loop_process(loop_len, batch_number, commit_number, a_set, last=False):
def loop_process(loop_len, batch_number, commit_number, a_set, last=False, add_files=()):
"""Process the loop using the batching module."""
os.environ['BATCH'] = str(batch_number)
os.environ['COMMIT'] = str(commit_number)
os.environ['BATCH_LAST'] = str(int(last))
batch_keys, config = batch_get_keys(KEYS_PKL_FILE, loop_len, a_set=a_set)
batch_keys, config = batch_get_keys(KEYS_PKL_FILE, loop_len, a_set=a_set, add_files=add_files)
for key in range(1, loop_len + 1):
if batch_skip_key(key, batch_keys, config):
continue
Expand Down Expand Up @@ -80,6 +80,7 @@ def test_batching(self):
self.assertEqual(conf['kc'], 0)
self.assertEqual(conf['lc'], 0)
self.assertEqual(conf.get('lk'), None)
self.assertListEqual(conf["af"], [])
self.assertIsNone(keys)
self.assertFalse(os.path.exists(KEYS_PKL_FILE))
self.assertFalse(os.path.exists(CONFIG_FILE))
Expand Down Expand Up @@ -108,7 +109,9 @@ def test_batching(self):
self.assertFalse(os.path.exists(CONFIG_FILE))
# batching: 2 passes with commit each item
reset_processed()
keys, conf = loop_process(5, 3, 1, a_set)
with open("various.pkl", 'w'):
pass
keys, conf = loop_process(5, 3, 1, a_set, add_files=["various.pkl"])
self.assertEqual(processed['keys'], [1, 2, 3])
self.assertEqual(processed['commits'], 3)
self.assertSetEqual(a_set, {1, 2, 3})
Expand All @@ -118,7 +121,9 @@ def test_batching(self):
self.assertSetEqual(keys, a_set)
self.assertTrue(os.path.exists(KEYS_PKL_FILE))
self.assertTrue(os.path.exists(CONFIG_FILE))
keys, conf = loop_process(5, 3, 1, a_set, last=True)
self.assertEqual(len(conf["af"]), 1)
self.assertTrue(os.path.exists("various.pkl"))
keys, conf = loop_process(5, 3, 1, a_set, last=True, add_files=["various.pkl"])
self.assertEqual(processed['keys'], [1, 2, 3, 4, 5])
self.assertEqual(processed['commits'], 5)
self.assertSetEqual(a_set, {1, 2, 3, 4, 5})
Expand All @@ -128,6 +133,7 @@ def test_batching(self):
self.assertSetEqual(keys, a_set)
self.assertFalse(os.path.exists(KEYS_PKL_FILE))
self.assertFalse(os.path.exists(CONFIG_FILE))
self.assertFalse(os.path.exists("various.pkl"))
# batching: 2 passes with commit each 3 items
reset_processed()
a_set = set()
Expand Down

0 comments on commit 6ad7fd6

Please sign in to comment.