@@ -133,6 +133,7 @@ async def summarize_stream(
133
133
batch_index = 0
134
134
135
135
results_queue : asyncio .Queue [SummaryChunk ] = asyncio .Queue ()
136
+ lock = asyncio .Lock ()
136
137
137
138
skip_item_hashes_set = set (skip_item_hashes ) if skip_item_hashes else None
138
139
@@ -142,14 +143,17 @@ async def run_batch():
142
143
if limit != - 1 and remaining <= 0 :
143
144
return
144
145
145
- current_index = batch_index
146
- batch_index += 1
146
+ async with lock :
147
+ current_index = batch_index
148
+ batch_index += 1
147
149
148
- # calculate the number of items to summarize in the current batch
149
- requested_count = min (self ._batch_size , remaining ) if limit != - 1 else - 1
150
- # reduce the remaining count by the number of items in the current batch
151
- # so that the other tasks will not exceed the limit
152
- remaining -= requested_count
150
+ # calculate the number of items to summarize in the current batch
151
+ requested_count = (
152
+ min (self ._batch_size , remaining ) if limit != - 1 else - 1
153
+ )
154
+ # reduce the remaining count by the number of items in the current batch
155
+ # so that the other tasks will not exceed the limit
156
+ remaining -= requested_count
153
157
154
158
parsed_result = await self ._convert (
155
159
ancestor_selector = ancestor_selector ,
@@ -179,12 +183,13 @@ async def run_batch():
179
183
await ctx .send_debug_message (f"[{ self .name } ] No items summarized" )
180
184
return
181
185
182
- items_slice = items [:requested_count ] if limit != - 1 else items
183
- summarized_count = len (items_slice )
184
- count += summarized_count
185
- # correct the remaining count in case summary returned fewer items than requested
186
- if summarized_count < requested_count :
187
- remaining += requested_count - summarized_count
186
+ async with lock :
187
+ items_slice = items [:requested_count ] if limit != - 1 else items
188
+ summarized_count = len (items_slice )
189
+ count += summarized_count
190
+ # recalculate the remaining count in case summary returned fewer items than requested
191
+ if summarized_count < requested_count :
192
+ remaining += requested_count - summarized_count
188
193
189
194
await results_queue .put (
190
195
{
0 commit comments