Landman Code Exploring my outer regions of coding.

A blog about C#, Delphi, assembler and general developer stuff.

Landman Code Exploring my outer regions of coding.

A blog about C#, Delphi, assembler and general developer stuff.

Thread safe TStack (TThreadStack)

This post was migrated from my old blog delphi-snippets.blogspot.com, for explanation about this switch see my introduction post.

Recently i've been doing a lot of multithreading, and luckily Delphi provides with a handy bunch of classes to make the developers life easier. You've got some basic synchronization classes (TMutex, TEvent, TCriticalSection, TMultiReadExclusiveWriteSynchronizer...) and an basic data container (TThreadList). But for one program I needed a thread safe stack. In this post I will describe how I created my own.

There is not thread safe stack in Borland Turbo Delphi 2006, so off course I started with a Google for delphi TThreadStack, which at this time gives zero results. Searching the newsgroups I found an interesting group called comp.programming.threads, searching that group I found one Pascal Lock-Free stack (which basically means not using a critical section, or any other mechanism, to lock the data). But after testing it, FastMM pointed out an memory leak. I provided the author with a test case to cause the memory leak, but after 5 months no reply.

But I'm a hard person to please, and although it is indeed lock free, I was wondering if in my situation this lock-free solution wasn't to complicated and perhaps slower. So I created an simple TThreadStack from looking at the principle of the TThreadList (Very simple!), and compared it with the lock-free solution.

The source demonstrates is the test project.

program Project2;

{$APPTYPE CONSOLE}

uses
  FastMM4,
  unThreadStack,
  FreeStack, Math,
  Windows, SysUtils, Classes;
const
  NumberOfAllocations = 1000;
  PopTimeOut = 1;
  PushTimeOut = 1;
  NumberOfThreads = 8; // must be an multiple of 4
  NumberOfTest = 10;
type
  TTestRec = packed record
    BigField: array[0..254] of Char;
    SmallerField: Extended;
    SmallField: Byte;
  end;
  PTestRec = ^TTestRec;

  TFreeStackThread = class(TThread)
    FPopper: Boolean;
    FDestination: TFreeStack;
    FFinished: THandle;
  protected
    procedure Execute; override;
  public
    constructor Create(ADestination: TFreeStack; APopper: Boolean; AFinished: THandle);
  end;

  TThreadStackThread = class(TThread)
    FPopper: Boolean;
    FDestination: TThreadStack;
    FFinished: THandle;
  protected
    procedure Execute; override;
  public
    constructor Create(ADestination: TThreadStack; APopper: Boolean; AFinished: THandle);
  end;

  { TThreadStackThread }

constructor TThreadStackThread.Create(ADestination: TThreadStack;
  APopper: Boolean; AFinished: THandle);
begin
  FDestination := ADestination;
  FPopper := APopper;
  FFinished := AFinished;
  inherited Create(False);
end;

procedure TThreadStackThread.Execute;
var
  TempStack: TStack;
  p: PTestRec;
  counter: Int64;
  temp: LongWord;
begin
  counter := 0;
  while (not Terminated) and (counter < NumberOfAllocations) do
  begin
    TempStack := FDestination.LockStack;
    try
      if FPopper then
      begin
        if TempStack.Count > 0 then
        begin
          Dispose(PTestRec(TempStack.Pop));
          inc(counter);
        end;
      end
      else
      begin
        New(p);
        TempStack.Push(p);
        inc(counter);
      end;
    finally
      FDestination.UnlockStack;
    end;
    if FPopper then
      Sleep(PopTimeOut)
    else
      Sleep(PushTimeOut);
  end;
  ReleaseSemaphore(FFinished, 1, @temp);
end;

{ TFreeStackThread }

constructor TFreeStackThread.Create(ADestination: TFreeStack; APopper: Boolean; AFinished: THandle);
begin
  FDestination := ADestination;
  FPopper := APopper;
  FFinished := AFinished;
  inherited Create(False);
end;

procedure TFreeStackThread.Execute;
var
  p: PTestRec;
  counter: Int64;
  temp: LongWord;
begin
  counter := 0;
  p := nil;
  while (not Terminated) and (counter < NumberOfAllocations) do
  begin
    if FPopper then
    begin
      if FDestination.Count > 0 then
      begin
        if FDestination.Pop(TObject(p)) then
        begin
          Dispose(p);
          inc(counter);
        end;
      end;
    end
    else
    begin
      if p = nil then
        New(p);
      if FDestination.Push(TObject(p)) then
      begin
        p := nil;
        inc(counter);
      end;
    end;
    if FPopper then
      Sleep(PopTimeOut)
    else
      Sleep(PushTimeOut);
  end;
  ReleaseSemaphore(FFinished, 1, @temp);
end;

var
  AThreadStacksTests: array[0..NumberOfThreads - 1] of TThreadStackThread;
  AFreeStacksTests: array[0..NumberOfThreads - 1] of TFreeStackThread;
  i, j: Integer;
  Start, Stop, Freq: Int64;
  ThreadStacks: array[0..1] of TThreadStack;
  FreeStacks: array[0..1] of TFreeStack;
  ResultThread: array[0..NumberOfTest - 1] of Double;
  ResultFree: array[0..NumberOfTest - 1] of Double;
  Finished: THandle;
  Mean, StdDev: Extended;
begin
  ThreadStacks[0] := TThreadStack.Create;
  ThreadStacks[1] := TThreadStack.Create;
  FreeStacks[0] := TFreeStack.Create;
  FreeStacks[1] := TFreeStack.Create;
  Finished := CreateSemaphore(nil, 0, NumberOfThreads, 'Thread runners');
  Writeln('Starting ThreadStack threads');
  for j := 0 to NumberOfTest - 1 do
  begin
    QueryPerformanceCounter(Start);
    for I := 0 to NumberOfThreads - 1 do
      AThreadStacksTests[i] := TThreadStackThread.Create(ThreadStacks[i mod 2], (i mod 4) >= 2, Finished);
    for I := 0 to NumberOfThreads - 1 do
      WaitForSingleObject(Finished, INFINITE);
    QueryPerformanceCounter(Stop);
    for I := 0 to NumberOfThreads - 1 do
      AThreadStacksTests[i].Free;
    ResultThread[j] := Stop - Start;
  end;
  Writeln('ThreadStack done.');
  Writeln('Starting TFreeStack threads');
  for j := 0 to NumberOfTest - 1 do
  begin
    QueryPerformanceCounter(Start);
    for I := 0 to NumberOfThreads - 1 do
      AFreeStacksTests[i] := TFreeStackThread.Create(FreeStacks[i mod 2], (i mod 4) >= 2, Finished);
    for I := 0 to NumberOfThreads - 1 do
      WaitForSingleObject(Finished, INFINITE);
    QueryPerformanceCounter(Stop);
    for I := 0 to NumberOfThreads - 1 do
      AFreeStacksTests[i].Free;
    ResultFree[j] := Stop - Start;
  end;
  Writeln('TFreeStack done.');
  Writeln(Format('Calculating the mean and the standard deviation out of %d runs.', [NumberOfTest]));
  QueryPerformanceFrequency(Freq);
  MeanAndStdDev(ResultThread, Mean, StdDev);
  Writeln(Format('TThreadStack: %f (%f)', [Mean, StdDev]));
  Writeln(Format('TThreadStack: %fms (%fms)', [Mean / (Freq / 1000), StdDev / (Freq / 1000)]));
  MeanAndStdDev(ResultFree, Mean, StdDev);
  Writeln(Format('TFreeStack:   %f (%f)', [Mean, StdDev]));
  Writeln(Format('TFreeStack:   %fms (%fms)', [Mean / (Freq / 1000), StdDev / (Freq / 1000)]));
  if DebugHook <> 0 then
    Readln;
  { freeing everything}
  ThreadStacks[0].Free;
  ThreadStacks[1].Free;
  FreeStacks[0].Free;
  FreeStacks[1].Free;
  CloseHandle(Finished);
end.


This one also creates the memory leak. Running the code on a P4 2.26 returns this result:

Starting ThreadStack threads
ThreadStack done.
Starting TFreeStack threads
TFreeStack done.
Calculating the mean and the standard deviation out of 10 runs.
TThreadStack: 7152374,20 (26574,65)
TThreadStack: 1998,12ms (7,42ms)
TFreeStack: 7339945,80 (310570,01)
TFreeStack: 2050,52ms (86,76ms)

So the TFreestack is all most the same speed as the simple critical section based TThreadStack, although I know now the test isn't what you'd call regular, because it's constantly trying to push and pop. So perhaps a more normal situation would result differently. But in my program this situation was expected.

I also liked the simplicity of the TThreadStack above the complexity of the FreeStack, were you had to compile a piece TASM (containing the CAS) for it to work. Another show stopper was the memory leak. But actually I just wanted to post my simple TThreadStack so that next time I have to use it, I can just wander to my own blog. So without further ado, I bring you the following source.

unit unThreadStack;

interface
uses
  Windows, Contnrs;
type
  TStack = Contnrs.TStack;
  TThreadStack = class
  private
    FStack: TStack;
    FLock :TRTLCriticalSection;
  public
    constructor Create();
    destructor Destroy;  override;
    function LockStack : TStack;
    procedure UnlockStack;
    function Count: Integer;
    function Push(AItem: Pointer): Pointer;
    function Pop: Pointer;
    function Peek: Pointer;
  end;

implementation

{ TThreadStack }

function TThreadStack.LockStack: TStack;
begin
  EnterCriticalSection(FLock);
  Result := FStack;
end;

function TThreadStack.Count: Integer;
begin
  EnterCriticalSection(FLock);
  try
    Result := FStack.Count;
  finally
    LeaveCriticalSection(FLock);
  end;
end;

constructor TThreadStack.Create();
begin
  inherited Create();
  InitializeCriticalSection(FLock);
  FStack := Contnrs.TStack.Create;
end;

destructor TThreadStack.Destroy;
begin
  DeleteCriticalSection(FLock);
  FStack.Free;
  inherited;
end;

function TThreadStack.Peek: Pointer;
begin
  EnterCriticalSection(FLock);
  try
    Result := FStack.Peek;
  finally
    LeaveCriticalSection(FLock);
  end;
end;

function TThreadStack.Pop: Pointer;
begin
  EnterCriticalSection(FLock);
  try
    Result := FStack.Pop;
  finally
    LeaveCriticalSection(FLock);
  end;
end;

function TThreadStack.Push(AItem: Pointer): Pointer;
begin
  EnterCriticalSection(FLock);
  try
    Result := FStack.Push(AItem);
  finally
    LeaveCriticalSection(FLock);
  end;
end;

procedure TThreadStack.UnlockStack;
begin
  LeaveCriticalSection(FLock);
end;

end.

If you use this source, I would like it if you just left a comment on this blog.

Tags: , ,

Post a Comment