Перейти к содержанию
  • Регистрация
  • 0
Авторизация  
ddr 2

TThreadedQueue некорректное "ожидание" данных очереди

Вопрос

Добрый день!

Заметил неприятный эффект(баг, особенность либо моё недопонимание чего либо). Через некоторое непродолжительно время работы PopItem( в версии возвращающей TWaitResult) перестаёт ожидать данные из очереди, и перестаёт  "усыплять" на(до) TThreadedQueue.FPopTimeout мсек,  поток из которого была вызвана, а возвращает wrTimeout без ожидания(менее 100 тактов). Ниже тестовый код, который демонстрирует этот эффект.

Описание теста: основной поток, создаёт поток с условным названием FirstDepthThread, который:

 -запускает MaxThread(константа)+1 потоков( по умолчанию их 10) с условными названиями SecondDepthThread_X ;

- c интервалом в 2-5 секунды в каждую из очередей записывается константный идентификатор. Кол-во очередей =MaxThread+1(в каждом из  потоков SecondDepthThread_X есть поле с очередью) .

Функционал потоков SecondDepthThread_X - ждать данных извлекать их  из очереди. 

В Procedure TSecondDepthThread.Execute есть 2-а условия( помечены в тексте, как {жучок 1} и {жучок 2}), которые в моём понимании никогда не должны выполняться, но...выполняются. Понимание, как обойти этот эффект есть, но очень хочется разобраться, что не так с указанной реализацией? Буду рад мнениям.

Условия тестирования: Delphi 10.3.1., W10.1903. Так же рекомендуется попробовать разные значения MaxThread( при MaxThread=2, эффект может и не наступить,при MaxThread= 9, на 3-х разные cpu Intel эффект наступает через ,5-5 секунд, на стареньком AMD FX-8300 лишь через 2-10 минут. при MaxThread=14, AMDешный cpu так же "сдаётся" за несколько секунд). В отладчике смотрим окно Events. Если без отладчика, то в виндовом диспетчере задач, как только загрузка процессора у данного процесс подскочила от 0 к 100%, значит эффект достигнут.

unit UThreadedQueue;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs,System.Generics.Collections;

Const MaxThread=9;

type
 TSecondDepthThread=class(TThread)
   Protected
    FThreadNomber:integer;
    FError:boolean;
    Procedure Execute;override;
   Public
    TickSending:Cardinal;
    TickInterval:Cardinal;
    SendingDataQuery:TThreadedQueue<TBytes>;
    constructor Create(AThreadCount:integer);
    Destructor Destroy;override;
    Property ThreadNomber:integer read FThreadNomber;
 end;

 TFirstDepthThread=class(TThread)
   Protected
    FSecondDepthThreads:array[0..MaxThread] of TSecondDepthThread;
    Procedure Execute;override;
   Public
    SendingDataQuery:TThreadedQueue<TBytes>;
    constructor Create;
     Destructor Destroy;override;
 end;

  TForm1 = class(TForm)
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
  private
   FFirstDepthThread:TFirstDepthThread;
   { Private declarations }
  public
    { Public declarations }
  end;

var
  Form1: TForm1;

implementation
uses SyncObjs,TypInfo;
{$R *.dfm}

Constructor TSecondDepthThread.Create(AThreadCount: Integer);
begin
 FError:=false;
 TickInterval:=2000+random(3000);
 FThreadNomber:=AThreadCount;
 SendingDataQuery:=TThreadedQueue<TBytes>.Create(100, 1, 1); //инициализация очереди
 FreeOnTerminate := false;
 Inherited Create(false);
end;

Destructor TSecondDepthThread.Destroy;
begin
 FreeAndNil(SendingDataQuery);
 inherited;
end;

Procedure TSecondDepthThread.Execute;
var iCounterPerSec,TimeStart, TimeFinish: TLargeInteger;
    QSize:integer;
    WaitResult:TWaitResult;
    ReceivedData:TBytes;
    ElapsedTime:double;
begin
 NameThreadForDebugging('SecondDepthThread_'+inttostr(FThreadNomber));
 while (not Terminated)do
  begin
    QueryPerformanceFrequency(iCounterPerSec);
    QueryPerformanceCounter(TimeStart);
    WaitResult:=SendingDataQuery.PopItem(QSize,ReceivedData);
    QueryPerformanceCounter(TimeFinish);
    ElapsedTime:= (TimeFinish - TimeStart)/iCounterPerSec;
    if (ElapsedTime<0.000082)and(WaitResult=TWaitResult.wrTimeout)and(not FError)  then
     begin   {жучок 1}
      OutputDebugString(pchar('Поток№ '+inttostr(FThreadNomber)+ ' Всё.., началось! Ответ(состояние) очереди='+
                        GetEnumName(TypeInfo(TWaitResult), Ord(WaitResult))+' выполнение за '+
                        FormatFloat('0.000000', ElapsedTime) + ' сек., а должно быть более 0.001 сек'));
      FError:=true; // дальше сообщения с wrTimeout не выводим, т.к. их слишком много
     end;
   if (WaitResult=TWaitResult.wrSignaled)and(length(ReceivedData)>0) then
    begin
     if ReceivedData[0]=100+FThreadNomber then
       {OutputDebugString(pchar('Поток№ '+inttostr(FThreadNomber)+ ' ,есть корректные данные'))};

     if ReceivedData[0]<>100+FThreadNomber then  {жучок 2}
       OutputDebugString(pchar('Поток№ '+inttostr(FThreadNomber)+ 'некорректные данные, должно прийти'+
                         inttostr(100+FThreadNomber)+' ,а пришло '+inttostr(ReceivedData[0])));

     if ReceivedData[0]=0 then {подвид 2-го жука}
       OutputDebugString(pchar('Поток№ '+inttostr(FThreadNomber)+ ' Такого быть не должно! Ответ(состояние) очереди='+
                         GetEnumName(TypeInfo(TWaitResult), Ord(WaitResult))+
                         ' сигнал о наличии данных есть, а данных нет,'+
                         ' выполнение за '+FormatFloat('0.000000', ElapsedTime) ));
      ReceivedData[0]:=0; // для контроля извлекаемых из очереди данных.
    end
  end;
end;

Constructor TFirstDepthThread.Create;
Var Count:integer;
begin
 for count := 0 to MaxThread do FSecondDepthThreads[Count]:=TSecondDepthThread.Create(Count);
 FreeOnTerminate := false;
 Inherited Create(false);
end;

Destructor TFirstDepthThread.Destroy;
Var Count:integer;
begin
 for Count := 0 to MaxThread do
  begin
   FSecondDepthThreads[Count].Terminate;
   FSecondDepthThreads[Count].WaitFor;
   FSecondDepthThreads[Count].Free;
  end;
 inherited;
end;

Procedure TFirstDepthThread.Execute;
var Count:integer;
    SendingData:Tbytes;
begin
 NameThreadForDebugging('FirstDepthThread');
 Setlength(SendingData,1);
 while not terminated  do
  begin
   for Count := 0 to MaxThread do with FSecondDepthThreads[Count] do
    if GetTickCount-TickSending>TickInterval then
      begin
       SendingData[0]:=100+ThreadNomber;
       SendingDataQuery.PushItem(SendingData);
       TickSending:=GetTickCount;
      end;
   sleep(1);
   //yield;
  end;
end;

procedure TForm1.FormCreate(Sender: TObject);
begin
 FFirstDepthThread:=TFirstDepthThread.Create;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
 FFirstDepthThread.Terminate;
 FFirstDepthThread.WaitFor;
 FFirstDepthThread.Free;
end;

end.

 

Поделиться сообщением


Ссылка на сообщение
Поделиться на другие сайты

Рекомендуемые сообщения

Ответы на этот вопрос пока отсутствуют

Присоединяйтесь к обсуждению

Вы можете написать сейчас и зарегистрироваться позже. Если у вас есть аккаунт, авторизуйтесь, чтобы опубликовать от имени своего аккаунта.

Гость
Ответить на вопрос...

×   Вставлено с форматированием.   Вставить как обычный текст

  Разрешено использовать не более 75 эмодзи.

×   Ваша ссылка была автоматически встроена.   Отображать как обычную ссылку

×   Ваш предыдущий контент был восстановлен.   Очистить редактор

×   Вы не можете вставлять изображения напрямую. Загружайте или вставляйте изображения по ссылке.

Авторизация  

  • Последние посетители   0 пользователей онлайн

    Ни одного зарегистрированного пользователя не просматривает данную страницу

×
×
  • Создать...