Skip to content

Commit

Permalink
Properly resize the temporary "pipe" array on queuing policy change!
Browse files Browse the repository at this point in the history
  • Loading branch information
wowczarek committed Jun 25, 2023
1 parent b5727bf commit 5e0ad2c
Showing 1 changed file with 30 additions and 9 deletions.
39 changes: 30 additions & 9 deletions utils.pas
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ TBytePump = class
WakeUpTimer: TTHreadedTimer;
procedure SetFillLevel(l: integer);
procedure SetDelay(d: integer);
procedure SetQueuePolicy(qp: TQueuePolicy);
{ the actual moving of data, but without pumping }
function FEnqueue(var src: array of byte; len: Integer): Integer;
function FDequeue(var dst: array of byte; len: Integer): Integer;
Expand All @@ -51,7 +52,7 @@ TBytePump = class

public

QueuePolicy: TQueuePolicy;
FQueuePolicy: TQueuePolicy;
OnReady: TNotifyEvent;
Name: String;
procedure Flush;
Expand All @@ -67,6 +68,7 @@ TBytePump = class
property Capacity: Integer read FCapacity;
property Count: Integer read FCount;
property Left: Integer read FLeft;
property QueuePolicy: TQueuePolicy read FQueuePolicy write SetQueuePolicy;
property FillLevel: Integer read FFillLevel write SetFillLevel;
property Delay: Integer read FDelay write SetDelay;
property Ready: Boolean read FReady;
Expand Down Expand Up @@ -124,7 +126,7 @@ constructor TBytePump.Create(Capacity: Integer);
FSink := Nil;
Waiting := False;
FReady := True;
QueuePolicy := QPNone;
FQueuePolicy := QPNone;
Timer := TThreadedTimer.Create(nil);
Timer.Interval := 1;
WakeUpTimer := TThreadedTimer.Create(nil);
Expand All @@ -138,9 +140,28 @@ constructor TBytePump.Create(Capacity: Integer);

destructor TBytePump.Destroy;
begin
Timer.Enabled := False;
WakeUpTimer.Enabled := False;
Unplumb;
end;

procedure TBytePump.SetQueuePolicy(qp: TQueuePolicy);
begin
if FQueuePolicy = qp then exit;

if qp = QPNone then
begin
SetLength(Pipe, FCapacity);
end else
begin
SetLength(Pipe, FFillLevel);
end;

FQueuePolicy := qp;

end;


procedure TBytePump.SetFillLevel(l: Integer);
begin
if l < 0 then l := FCapacity;
Expand Down Expand Up @@ -250,22 +271,22 @@ procedure TBytePump.Pump;

if not assigned(FSource) or (FLeft < 1) then exit;

case QueuePolicy of
case FQueuePolicy of

{ Wait: only pull more data through the pipe if we're ready }
QPWait: begin
FillLeft := FFillLevel - FCount;
if FillLeft < 1 then FillLeft := 0;
if Ready and (FillLeft > 0) then begin
len := FSource.Dequeue(Pipe, FillLeft);
Enqueue(Pipe, len);
if len > 0 then Enqueue(Pipe, len);
end;
end;

{ None: always attempt to pull data from source }
QPNone: begin
len := FSource.Dequeue(Pipe, FLeft);
Enqueue(Pipe, len);
if len > 0 then Enqueue(Pipe, len);
end;

end;
Expand All @@ -282,15 +303,15 @@ function TBytePump.FEnqueue(var src: array of byte; len: Integer): Integer;

if len < 1 then exit;

if (QueuePolicy = QPWait) and (not Ready) then exit;
if (FQueuePolicy = QPWait) and (not Ready) then exit;

{ no space left }
if FLeft <= 0 then exit;
{ some space left but not enough - only enqueue what we can }
if FLeft < len then len := FLeft;

{ only fill up to fill level, no more }
if QueuePolicy = QPWait then
if FQueuePolicy = QPWait then
begin
if ((len + FCount) >= FFillLevel) then len := FFillLevel - FCount;
{ queue fill level reached }
Expand Down Expand Up @@ -320,7 +341,7 @@ function TBytePump.FEnqueue(var src: array of byte; len: Integer): Integer;
Dec(FLeft, len);

{ we reached our fill level - stop pumping notify }
if (QueuePolicy = QPWait) and (FCount >= FFillLevel) then
if (FQueuePolicy = QPWait) and (FCount >= FFillLevel) then
begin
FReady := False;
WakeUpTimer.Enabled := True;
Expand Down Expand Up @@ -371,7 +392,7 @@ function TBytePump.FDequeue(var dst: array of byte; len: Integer): Integer;
Inc(FLeft, len);

{ queue emptied - delay notification if delay specified }
if (QueuePolicy = QPWait) and (FCount = 0) then
if (FQueuePolicy = QPWait) and (FCount = 0) then
begin
if Delay > 0 then
begin
Expand Down

0 comments on commit 5e0ad2c

Please sign in to comment.