Перейти к содержанию
Fire Monkey от А до Я
  • 0

TThreadedQueue некорректное "ожидание" данных очереди


ddr 2

Вопрос

Добрый день!

Заметил неприятный эффект(баг, особенность либо моё недопонимание чего либо). Через некоторое непродолжительно время работы PopItem( в версии возвращающей TWaitResult) перестаёт ожидать данные из очереди, и перестаёт  "усыплять" на(до) TThreadedQueue.FPopTimeout мсек,  поток из которого была вызвана, а возвращает wrTimeout без ожидания(менее 100 тактов). Ниже тестовый код, который демонстрирует этот эффект.

Описание теста: основной поток, создаёт поток с условным названием FirstDepthThread, который:

 -запускает MaxThread(константа)+1 потоков( по умолчанию их 10) с условными названиями SecondDepthThread_X ;

- c интервалом в 2-5 секунды в каждую из очередей записывается константный идентификатор. Кол-во очередей =MaxThread+1(в каждом из  потоков SecondDepthThread_X есть поле с очередью) .

Функционал потоков SecondDepthThread_X - ждать данных извлекать их  из очереди. 

В Procedure TSecondDepthThread.Execute есть 2-а условия( помечены в тексте, как {жучок 1} и {жучок 2}), которые в моём понимании никогда не должны выполняться, но...выполняются. Понимание, как обойти этот эффект есть, но очень хочется разобраться, что не так с указанной реализацией? Буду рад мнениям.

Условия тестирования: Delphi 10.3.1., W10.1903. Так же рекомендуется попробовать разные значения MaxThread( при MaxThread=2, эффект может и не наступить,при MaxThread= 9, на 3-х разные cpu Intel эффект наступает через ,5-5 секунд, на стареньком AMD FX-8300 лишь через 2-10 минут. при MaxThread=14, AMDешный cpu так же "сдаётся" за несколько секунд). В отладчике смотрим окно Events. Если без отладчика, то в виндовом диспетчере задач, как только загрузка процессора у данного процесс подскочила от 0 к 100%, значит эффект достигнут.

unit UThreadedQueue;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs,System.Generics.Collections;

Const MaxThread=9;

type
 TSecondDepthThread=class(TThread)
   Protected
    FThreadNomber:integer;
    FError:boolean;
    Procedure Execute;override;
   Public
    TickSending:Cardinal;
    TickInterval:Cardinal;
    SendingDataQuery:TThreadedQueue<TBytes>;
    constructor Create(AThreadCount:integer);
    Destructor Destroy;override;
    Property ThreadNomber:integer read FThreadNomber;
 end;

 TFirstDepthThread=class(TThread)
   Protected
    FSecondDepthThreads:array[0..MaxThread] of TSecondDepthThread;
    Procedure Execute;override;
   Public
    SendingDataQuery:TThreadedQueue<TBytes>;
    constructor Create;
     Destructor Destroy;override;
 end;

  TForm1 = class(TForm)
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
  private
   FFirstDepthThread:TFirstDepthThread;
   { Private declarations }
  public
    { Public declarations }
  end;

var
  Form1: TForm1;

implementation
uses SyncObjs,TypInfo;
{$R *.dfm}

Constructor TSecondDepthThread.Create(AThreadCount: Integer);
begin
 FError:=false;
 TickInterval:=2000+random(3000);
 FThreadNomber:=AThreadCount;
 SendingDataQuery:=TThreadedQueue<TBytes>.Create(100, 1, 1); //инициализация очереди
 FreeOnTerminate := false;
 Inherited Create(false);
end;

Destructor TSecondDepthThread.Destroy;
begin
 FreeAndNil(SendingDataQuery);
 inherited;
end;

Procedure TSecondDepthThread.Execute;
var iCounterPerSec,TimeStart, TimeFinish: TLargeInteger;
    QSize:integer;
    WaitResult:TWaitResult;
    ReceivedData:TBytes;
    ElapsedTime:double;
begin
 NameThreadForDebugging('SecondDepthThread_'+inttostr(FThreadNomber));
 while (not Terminated)do
  begin
    QueryPerformanceFrequency(iCounterPerSec);
    QueryPerformanceCounter(TimeStart);
    WaitResult:=SendingDataQuery.PopItem(QSize,ReceivedData);
    QueryPerformanceCounter(TimeFinish);
    ElapsedTime:= (TimeFinish - TimeStart)/iCounterPerSec;
    if (ElapsedTime<0.000082)and(WaitResult=TWaitResult.wrTimeout)and(not FError)  then
     begin   {жучок 1}
      OutputDebugString(pchar('Поток№ '+inttostr(FThreadNomber)+ ' Всё.., началось! Ответ(состояние) очереди='+
                        GetEnumName(TypeInfo(TWaitResult), Ord(WaitResult))+' выполнение за '+
                        FormatFloat('0.000000', ElapsedTime) + ' сек., а должно быть более 0.001 сек'));
      FError:=true; // дальше сообщения с wrTimeout не выводим, т.к. их слишком много
     end;
   if (WaitResult=TWaitResult.wrSignaled)and(length(ReceivedData)>0) then
    begin
     if ReceivedData[0]=100+FThreadNomber then
       {OutputDebugString(pchar('Поток№ '+inttostr(FThreadNomber)+ ' ,есть корректные данные'))};

     if ReceivedData[0]<>100+FThreadNomber then  {жучок 2}
       OutputDebugString(pchar('Поток№ '+inttostr(FThreadNomber)+ 'некорректные данные, должно прийти'+
                         inttostr(100+FThreadNomber)+' ,а пришло '+inttostr(ReceivedData[0])));

     if ReceivedData[0]=0 then {подвид 2-го жука}
       OutputDebugString(pchar('Поток№ '+inttostr(FThreadNomber)+ ' Такого быть не должно! Ответ(состояние) очереди='+
                         GetEnumName(TypeInfo(TWaitResult), Ord(WaitResult))+
                         ' сигнал о наличии данных есть, а данных нет,'+
                         ' выполнение за '+FormatFloat('0.000000', ElapsedTime) ));
      ReceivedData[0]:=0; // для контроля извлекаемых из очереди данных.
    end
  end;
end;

Constructor TFirstDepthThread.Create;
Var Count:integer;
begin
 for count := 0 to MaxThread do FSecondDepthThreads[Count]:=TSecondDepthThread.Create(Count);
 FreeOnTerminate := false;
 Inherited Create(false);
end;

Destructor TFirstDepthThread.Destroy;
Var Count:integer;
begin
 for Count := 0 to MaxThread do
  begin
   FSecondDepthThreads[Count].Terminate;
   FSecondDepthThreads[Count].WaitFor;
   FSecondDepthThreads[Count].Free;
  end;
 inherited;
end;

Procedure TFirstDepthThread.Execute;
var Count:integer;
    SendingData:Tbytes;
begin
 NameThreadForDebugging('FirstDepthThread');
 Setlength(SendingData,1);
 while not terminated  do
  begin
   for Count := 0 to MaxThread do with FSecondDepthThreads[Count] do
    if GetTickCount-TickSending>TickInterval then
      begin
       SendingData[0]:=100+ThreadNomber;
       SendingDataQuery.PushItem(SendingData);
       TickSending:=GetTickCount;
      end;
   sleep(1);
   //yield;
  end;
end;

procedure TForm1.FormCreate(Sender: TObject);
begin
 FFirstDepthThread:=TFirstDepthThread.Create;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
 FFirstDepthThread.Terminate;
 FFirstDepthThread.WaitFor;
 FFirstDepthThread.Free;
end;

end.

 

Ссылка на комментарий

Рекомендуемые сообщения

  • 0

Вам нужно провести чистый эксперимент на простом приложении и простых условиях. И не использовать Double для условий - лучше использовать целочисленные переменные, чтоб быть уверенным в отсутствии накопления ошибки.

Еще важный момент - создание очереди

TThreadedQueue<TBytes>.Create(100, 1, 1)

не совсем корректно. Вы ставите одинаковый и минимально возможный таймаут на pop и push, и рискуете словить взаимную блокировку. Пока идет push невозможен pop, и наоборот. Обычно, в зависимости от условий, ставят одно гораздо больше другого, к примеру если важна целостность подаваемых данных, то push 1000, а если нужен быстрый поток (который параллельно делает что то еще), то pop 10. Или если поток только обрабатывает входящие данные, то pop 3600 * 24, чтоб он зря не крутился без данных ))) 

Сейчас у меня приложение на Delphi 10.3.3 крутится месяц без остановки, архитектура - Основной поток -> Поток получения данных -> Поток обработки данных -> 100-200 потоков исполнителей. Весь обмен на очередях. Плюс поток логов и поток мониторинга (они получают данные, каждый по своей очереди, со всех потоков. 

И никаких аномалий я не заметил, все работает как часы.

Ссылка на комментарий
  • 0

А вот я и не прав! Теперь я тоже проклят и напоролся на точно такую же проблему. 

Действительно, PopItem в непредсказуемый момент изменяет поведение - любой таймаут (от 1 до INFINITE) превращается в 0.

И похоже проблеме уже 2 года, но Эмбаркадера делает вид что в " я в домике". Т.е. полностью игнорирует проблему. Дело похоже как обычно в злощастном TMonitor - то одна проблема, то другая, то опять внедряют предыдущую проблему.

Вот найденные ссылки по этой проблеме:

https://forums.embarcadero.com/thread.jspa?messageID=941762

https://quality.embarcadero.com/browse/RSP-19993

А вот тут речь тоже о той же самой проблеме, дата 2012 год

https://delphihaven.wordpress.com/2012/01/30/fixing-tthreadedqueue-or-in-other-words-tmonitor-again/

Ссылка на комментарий
  • 0

Форум жив! Это радует. Ответ, через 4 месяца, лучше, чем без ответа вообще (без сарказма). Благодарю за внимание! Я долго разбирался с ранее описанными проблемами. И в некотором роде разобрался. Пусть останется для истории. Ранее я несколько сумбурно описал проблемы при использовании TThreadedQueue, отчасти это связано, что на тот момент я не понимал природу этих проблем. Итак имеем:

-Проблема 1: «замусоривание» данных при использовании TThreadedQueue<T> , где <T>=Tbytes; (и как выяснилось любой динамический массив). Ранее я это называл {жучок 2}

-Проблема 2: TThreadedQueue.PopItem при определенных обстоятельствах перестаёт «ожидать» данные заданное кол-во времени соответственно не «усыпляет» поток в котором он был вызван. Ранее я это называл {жучок 1}

Начну с 2-ой проблемы. Пытаясь разобраться, я пошел по пути написать своей TThreadedQueue. Что и было сделано. Итак, оригинальный TThreadedQueue реализован через TMonitor, который в WIN32\64 реализуется через критическую секцию. Т.к. меня в первую очередь интересует windows платформа, то свой TThreadedQueue я реализовал сразу на критической секции. Далее изучая исходники TThreadedQueue, я выяснил, что PopItem реализован через WINAPIшные WaitForSingleObject(WaitForMultipleObjects) у которых один из параметров dwMilliseconds(время ожидания в мсек, которое передаётся из конструктора TThreadedQueue). И как выяснилось именно эти функции реализуют описанную проблему. В разных версиях Windows и при разном кол-ве ядер CPU функции ведут себя по-разному в плане «ожидания». Далее не буду углубляться… я пришел к выводу, что такой подход имеет смысл. Правильный подход это всегда использовать  dwMilliseconds= INFINITE. И за слишком частые вызовы WaitForMultipleObjects с параметром ожидания 1мсек, или 100мсек, Microsoft попросту «наказывает» программиста за неэффективное проектирование кода. Пробовал экспериментировать с задержками, в поиске найти «безопасную» задержку, так вот безопасная это только INFINITE. На 500мсек., при 50 потоках ждать проблему приходиться до 8 минут. При 1 секунде- не дождался, но… скорее всего если оставить программу работать на несколько суток, то эффект был бы получен. Возвращаясь к бесконечной задержке ожидания, в числе прочего меня к ней подтолкнул исходники TThreadedQueue. Если без подробностей, то в нем есть публичный метод  DoShutDown, он позволяет «выйти» из бесконечного ожидания  PopItem, при этом вызывать его естественно нужно не из потока «читателя». DoShutDown – необходимо использовать ВСЕГДА, при использовании TThreadedQueue. А использование бесконечной задержки – гарантия, что проблема не возникнет. Да проблемы проявляется скорее в «экстремальной» нагрузке, при 1-2 «читателей-писателей» или при низкой активности «писателей» проблема может и не проявиться, но лучше не рисковать. Фактически данная особенность не имеет отношение ни к Delphi ни к реализации TThreadedQueue, это WINAPI и возможно, что на других платформах этого не будет.

Что касается 1-ой проблемы с «замусориванием» данных, то решение есть – никогда не использовать динамические массивы в TThreadedQueue<T>, а значит и в любом многопоточном приложении, а т.е. в любом…  Поясню. Ранее написанный пример демонстрирует «замусоривание» данных. Если в примере заменить TBytes на  статический массив array[0..2048]of byte – проблема исчезает. Так же, если «обернуть» TBytes в class, то проблемы так же исчезает. Фактически динамические массивы нужно использовать очень осторожно, они не потокобезопасны даже при использовании критических секций. Это очень печальная «особенность» в Delphi. Опять же проявляется только под «экстремальной» нагрузкой.

Ссылка на комментарий

Присоединяйтесь к обсуждению

Вы можете написать сейчас и зарегистрироваться позже. Если у вас есть аккаунт, авторизуйтесь, чтобы опубликовать от имени своего аккаунта.

Гость
Ответить на вопрос...

×   Вставлено с форматированием.   Вставить как обычный текст

  Разрешено использовать не более 75 эмодзи.

×   Ваша ссылка была автоматически встроена.   Отображать как обычную ссылку

×   Ваш предыдущий контент был восстановлен.   Очистить редактор

×   Вы не можете вставлять изображения напрямую. Загружайте или вставляйте изображения по ссылке.

  • Последние посетители   0 пользователей онлайн

    • Ни одного зарегистрированного пользователя не просматривает данную страницу
×
×
  • Создать...