Перейти к содержанию
Fire Monkey от А до Я
  • 0

Быстродействие при использовании TCrititcalSection и TThread


Камышев Александр

Вопрос

Windows, FMX

Возможно не совсем в тему форума, вопрос по архитектуре серверных служб, хотелось бы услышать мнения.

Ситуация:

IdHTTTPServer на каждый запрос создает поток, в этом потоке не обойтись без обращения к пулу данных в памяти. Пул - несколько наборов актуальных данных. Наборы данных асинхронно получаются из БД, имеют связи многие ко многим, один ко многим и периодически кэшируются в память в основном потоке. Т.к. обращение к пулу из потока - соответственно пул должен быть потокозащищенным. После обработки запроса, данные также отправляются в  основном потоке в очередь БД.

1. Если весь пул закрыть в TCriticalSection - то на время использования его одним потоком все остальные будут ожидать. Обращение к очереди БД из потока получается также должно быть потокозащищенным. Не изящно.

2. Можно задачу обработки положить в некую потокозащищенную очередь и остановить поток c помощью TSimpleEvent->WaitFor( INFINITE ). Далее в основном потоке работать с пулом данных и очередью БД без критических сессий и, после получения результата, запустить поток SetEvent(). Код будет проще и понятней, однако задачи будут выполняться синхронно, последовательно как и в первом случае.

3. Можно закрывать TCriticalSection отдельные наборы данных, это возможно несколько увеличит быстродействие (не факт!), усложнит код и увеличит вероятность deadlock, т.к. для обработки одного запроса используется несколько наборов данных. Deadlock не будет, если перед обращением к следующему набору ( critical_section->Enter() ) копировать что необходимо из предыдущего и отпускать его ( critical_section->Leave() ) - тут становится важна стоимость операторов копирования.При больших объемах, стоимость копирования может перекрыть весь профит от частного обращения к наборам.

TThread полезно использовать при длительных операциях ввода вывода и ресурсоемких операциях, т.е. когда нужно подождать, не останавливая основной поток. Выигрыша в производительности полагаю нет, к тому же переключения критических секций также имеют стоимость. 

Вопросы:

1. Какой вариант предпочтительней? Есть стандартные схемы?

2. Как влияет количество ядер, процессоров, на быстродействие во втором и третьем случае?

Ссылка на комментарий

Рекомендуемые сообщения

  • 0

Вот вам наглядная причина, от которой открещиваются разработчики индейцев. Они пишут "создание потока на каждый чих - это нормально, в операционной системе толпы потоков, одним больше-одним меньше разницы нет".

Представьте: 1500 потоков. Помимо того, что создание потока сама по себе затратная операция, на каждый из них выделяются системные ресурсы, в том числе - память. Каждый поток инициализирует свои данные, это еще память. По завершению обработки поток скорее всего не убьется сразу, а пройдет энное количество миллисекунд (чем загруженнее процессор - тем больше времени). Вот вам и постепенное накопление требуемой памяти.

Скорее всего, в ProcessExplorer вы увидите, что ваш процесс "сожрал" гектара полтора в PrivateBytes.

 

Собственно, об этом я раньше уже писал. В этой теме.

Изменено пользователем kami
Ссылка на комментарий
  • 0
4 минуты назад, kami сказал:

Скорее всего, в ProcessExplorer вы увидите, что ваш процесс "сожрал" гектара полтора в PrivateBytes.

в процессах пиковая память 80м - ни о чём.

http://www.transl-gunsmoker.ru/2010/09/windows-2000.html тут пишут что в принципе больше 2k  средов один процесс не может сделать.

проблема не в этом, винда при вызове API вернула ошибку - ок обработали, работаем дальше, так нет в ошибку доступа и крашиться процесс, это беда библиотеки

Ссылка на комментарий
  • 0

Ограничте количество соединений самостоятельно разумной цифрой, при попытке соединения сверх лимита отдавайте в хеадере редирект на следующий сервер в кластере. Если конечно есть следующий сервер...

Ссылка на комментарий
  • 0
Только что, Евгений Корепов сказал:

Ограничте количество соединений самостоятельно разумной цифрой, при попытке соединения сверх лимита отдавайте в хеадере редирект на следующий сервер в кластере. Если конечно есть следующий сервер...

инди создает срэд перед вызовом OnCommandGet, есть HeadersAvailable но оттуда редиректнуть нельзя, а смысл перенаправлять если ресурсы уже заняты...

MaxConnections есть ограничение в IdHTTTPServer, и да, разумная цифра 1024 там не будет лишней.

Ссылка на комментарий
  • 0
6 минут назад, Камышев Александр сказал:

в процессах пиковая память 80м - ни о чём.

Я бы не стал доверять стандартному диспетчеру задач. Не знаю, что означает "пиковая память", но скорее всего - это максимальное значение памяти, которое процесс занимал в физической оперативке. Это совсем не соответствует тому, сколько процесс реально взял памяти у системы.

32 разрядному процессу доступно в теории 4Гб. На практике - больше 2Гб (а в подавляющем большинстве случаев - 1,5Гб) получить не удается.

9 минут назад, Камышев Александр сказал:

ок обработали, работаем дальше,

Пардон, а как работать дальше? На каждый чих, на почти каждую строчку кода требуется выделение памяти. Где ее взять, если она закончилась?

Правильно процесс вылетает.

Ссылка на комментарий
  • 0
29 минут назад, kami сказал:

Я бы не стал доверять стандартному диспетчеру задач. Не знаю, что означает "пиковая память", но скорее всего - это максимальное значение памяти, которое процесс занимал в физической оперативке. Это совсем не соответствует тому, сколько процесс реально взял памяти у системы.

означает "Пиковый рабочий набор", Process Explorer показал Private Bytes 100м и Working Set 63м.

Может я не туда смотрю... где гектары?

Полагаю ограничение все-таки размер стэка, т.е. размер резервируемых адресов там превышен.

Изменено пользователем Камышев Александр
Ссылка на комментарий
  • 0
37 минут назад, Камышев Александр сказал:

инди создает срэд перед вызовом OnCommandGet, есть HeadersAvailable но оттуда редиректнуть нельзя, а смысл перенаправлять если ресурсы уже заняты...

MaxConnections есть ограничение в IdHTTTPServer, и да, разумная цифра 1024 там не будет лишней.

Смысл в том что в комбинации с IdHTTPServer1.KeepAlive:=False, код ниже будет сразу освобождать ресурсы, т.е. количество соединений не превысит 1024 плюс несколько десятков отправленных в редирект с последующим отключением.

procedure TForm1.IdHTTPServer1CommandGet(AContext: TIdContext;
  ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo);
begin
  if CurrentConnectionCount>1024 then
    AResponseInfo.Redirect('sadas.asdasf.com');
end;

 

Ссылка на комментарий
  • 0
  • Администраторы

Есть примитивы синхронизации блокирующие только запись. Запись делают потокобезопасным. В этом случае не будет лока данных на чтение из других потоков, если запись не залочена. В отличии от критической секции, которая всегда лочит данные.

Еще есть LockFree концепция, которая сводит почти к минимому использование блокировок. Главная идея - это использование атомарных функций и специальных флагов помечающих актуальность данных.

Ссылка на комментарий
  • 0
Цитата

Вы можете попробовать втиснуть больше потоков в ваш процесс уменьшением начального размера стека - вы можете сделать это либо указанием размера в заголовке PE-файле, либо вручную указав размер стека в функции CreateThread, как указано в MSDN:
1
h := CreateThread(nil, 4096, @ThreadProc, nil, STACK_SIZE_PARAM_IS_A_RESERVATION, id);

Пытаюсь изменить код в исходниках System.Classes.pas:

constructor TThread.Create(CreateSuspended: Boolean);
-//-
{$IF Defined(MSWINDOWS)}
  
//#define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000
//#define CREATE_SUSPENDED                  0x00000004
    
	//FHandle := BeginThread(nil, 0, @ThreadProc, Pointer(Self), CREATE_SUSPENDED, FThreadID);
    FHandle := BeginThread(nil, 65536, @ThreadProc, Pointer(Self), $00010004, FThreadID); // изменения тут

    if FHandle = 0 then
      raise EThread.CreateResFmt(@SThreadCreateError, [SysErrorMessage(GetLastError)]);

компилируется, создаю System.Classes.hpp, однако нет эффекта совсем,

удалял совсем стоку создания потока BeginThread - никакого эффекта, видимо rtl не так просто изменить.

как применить изменения?

Ссылка на комментарий
  • 0

:D заработало:D

3600 запросов в секунду, пока не падает, в клиенте время запроса 0,7 секунды поднялось с 0,1 и 70% процессорного времени

попробую теперь без обращения к пулу, без критических секций, как время ответа измениться и до краша догнать

Изменено пользователем Камышев Александр
Ссылка на комментарий
  • 0
//-----------------------------------------------------------------------

void __fastcall TdmSkyUpdate::serverCommandGet(TIdContext *AContext,
				TIdHTTPRequestInfo *ARequestInfo, TIdHTTPResponseInfo *AResponseInfo)
{
strCommandData cd;
#ifdef AUX_MODE
	#ifdef MY_DEBUG_MODE
		cd.log_queue.push_back( ARequestInfo->RawHTTPCommand ); // debug
		cd.log_queue.push_back( ARequestInfo->RawHeaders->Text ); // debug
	#endif
	MyTimer timer; timer.Start();
#endif
if ( db_queue_size >= max_db_queue_size )
	{
		AResponseInfo->ResponseNo = 503;
		#ifdef MY_DEBUG_MODE
			cd.log_queue.push_back( "Переполнение очереди в бд, ограничение "
				+ (AnsiString)max_db_queue_size ); // debug
			PushToLogQueue( &cd.log_queue );
			TIdNotify::NotifyMethod( OnLogQueue );
		#endif
		return;
	}
cd.req = ARequestInfo; cd.resp = AResponseInfo;
cd.user = cd.req->AuthUsername; cd.pass = cd.req->AuthPassword;
AResponseInfo->CloseConnection = ARequestInfo->Connection.LowerCase() != "keep-alive";
switch ( ARequestInfo->CommandType )
	{
		case hcGET: GetCommand( &cd ); break;
		case hcPOST: PostCommand( &cd ); break;
		case hcHEAD: HeadCommand( &cd ); break;
		case hcPUT: break;
	}
#ifdef MY_DEBUG_MODE
PushToLogQueue( &cd.log_queue );
#endif
#ifdef AUX_MODE
	int req_time = timer.GetTimeMSec();
	if ( stat.dev_req_time < req_time ) Interlocked->Exchange( stat.dev_req_time, req_time );
#endif
}
//---------------------------------------------------------------------------

 

Изменено пользователем Камышев Александр
Ссылка на комментарий
  • 0
//---------------------------------------------------------------------------

void TdmSkyUpdate::PostCommand( strCommandData *cd )
{
bool is_client = false;
if ( !cd->req->AuthExists )
	{
		cd->sx_auth = cd->req->RawHeaders->Values["X-SX-auth"];
		if ( cd->sx_auth.IsEmpty() ) { cd->resp->ResponseNo = 401; return; }
		is_client = true;
	}
if ( !is_client )
	{
		strDeviceCmd dc; dc.cd = cd;
		cs_pool->Enter();
		PostDevice( &dc );
		cs_pool->Leave();
		if ( !dc.db_queue.size() ) return;
		if ( !PushToDBQueue( &dc.db_queue ) ) { cd->resp->ResponseNo = 500; return; }
	}
else
	{
		strClientCmd cc; cc.cd = cd;
		cs_pool->Enter();
		PostClient( &cc );
		cs_pool->Leave();
		if ( !cc.db_queue.size() ) return;
		if ( !PushToDBQueue( &cc.db_queue ) ) { cd->resp->ResponseNo = 500; return; }
		if ( cc.file_data.data ) SaveTaskFile( &cc.file_data, cc.file_id );
	}
}
//---------------------------------------------------------------------------

 

Ссылка на комментарий
  • 0

думаю эти два куска показывают основную обработку OnCommandGet,

из хедера:

typedef std::deque< strDBQueueMember* > db_queue_deque;
db_queue_deque db_queue;
TCriticalSection *cs_pool, *cs_queue, *cs_files;
TInterlocked *Interlocked;

3600 это запрос без обращения к бд, только данные из пула с критическими секциями 

ну и вот это обязательно в System.Classes.pas  в Embarcadero\Studio\17.0\source\rtl\common\

constructor TThread.Create(CreateSuspended: Boolean);
-//-
{$IF Defined(MSWINDOWS)}
  
//#define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000 - это для информации
//#define CREATE_SUSPENDED                  0x00000004 - это для информации
    // заменить  FHandle := BeginThread(nil, 0, @ThreadProc, Pointer(Self), CREATE_SUSPENDED, FThreadID);
    // на
	FHandle := BeginThread(nil, 65536, @ThreadProc, Pointer(Self), $00010004, FThreadID);

System.Classes.pas добавить в проект.

и будет вам счастье :)

Изменено пользователем Камышев Александр
Ссылка на комментарий
  • 0

А если  TIdHTTPServer заменить на TIdTCPServer, интересно на сколько увеличится быстродействие? И еще вопрос - по ядрам процессора нагрузка распределяется равномерно или грузит только одно ядро в вашем случае?

 

Ссылка на комментарий
  • 0
9 минут назад, Евгений Корепов сказал:

А если  TIdHTTPServer заменить на TIdTCPServer, интересно на сколько увеличится быстродействие? И еще вопрос - по ядрам процессора нагрузка распределяется равномерно или грузит только одно ядро в вашем случае?

 

Не думаю что сколько нибудь заметно, если брать http пакеты, ведь TIdHTTPServer наследник от TIdCustomTCPServer. Если пересылать бинарные массивы, без парсинга хедеров - возможно и будет небольшое увеличение.

Был приятно удивлен, по всем ядрам ровная нагрузка.

Изменено пользователем Камышев Александр
Ссылка на комментарий
  • 0
В 22.09.2016 в 12:01, kami сказал:

На Indy, которые генерят на каждый запрос отдельный поток? Хммм....Несколько тысяч потоков, инициализация каждого - затратная операция с т.з. ОС... ну да, ну да...

результаты краш-теста - упал в "Недостаточно памяти" после 5770 запросов в секунду, как бы и ничего так :) 

С обращением к пулу - 3к и без задержек на доступ к критическим секциям 5к запросов с задержкой 200 мсек. - непринужденно, конечно после допила системного TThread, беда кстати была не в инди, а в теплом ламповом borland.

тестировалось на ПК разработчика, сервер и эмулятор на localhost:

2017-01-13_15-58-44.png

Изменено пользователем Камышев Александр
Ссылка на комментарий
  • 0

Как-то вы поступили очень грубо :)

В ваших целях достаточно было воспользоваться директивами препроцессора:

http://docs.embarcadero.com/products/rad_studio/delphiAndcpp2009/HelpUpdate2/EN/html/devcommon/compdirsmemoryallocsizes_xml.html

 

type
  TTh = class(TThread)
  protected
    procedure Execute; override;
  end;

implementation

procedure Do;
var
  I: Integer;
begin
  for I := 1 to 5000 do
    TTh.Create(False);
end;

{ TTh }

{$M 16384, 65535}
procedure TTh.Execute;
begin
  while True do
    Sleep(50);
end;

 

Ссылка на комментарий
  • 0
В 13.01.2017 в 23:59, egorea1999 сказал:

 

Как-то вы поступили очень грубо :)

 

Бывает B)

Про директивы не знал, теперь буду в курсе, так изящнее, спасибо, но увы и ах:

The memory allocation directives are meaningful only in a program. They should not be used in a library or a unit.

т.е. в System.Classes.pas и в IdThread.pas вставлять нельзя, ибо should not be used in a library or a unit.:( Indy сама создает потоки в коде библиотеки IdCustomTCPServer.pas.

и еще {$M 16384, 65535} - полагаю размеры указаны неверно, от 64К до 1М, возможно я ошибаюсь, вроде: гранулярность выделения адресного пространства равна 64 Кб, сдругой стороны из описания

Default  
{$M 16384,1048576}  

 

В общем, все таки брутальный подход, только хардкор...

Изменено пользователем Камышев Александр
Ссылка на комментарий
  • 0
Цитата

вот, отсюда поподробней, как реализовать пул потоков?

У Delphi же есть TThreadPool и  TTask. Также смотри TParallel.For

Т.е. можно сделать что то типа

 Pool := TThreadPool.Create;  

 Pool.SetMaxWorkerThreads(10);

 TTask.Run(
    procedure
    begin
      ...

      TThread.Synchronize(nil,
        procedure
        begin
          что то делаем в GUI 
        end);
    end, Pool ); 

 

Или вот код из реального проекта.

Качаем файлы в разных потоках.

Pool := TThreadPool.Create;  
Pool.SetMaxWorkerThreads(10);

TParallel.For(0, FMembersList.Count - 1,
    procedure(Idx: Integer)
    var
      AHTTP: TIdHTTP;
    begin
      if not FileExists(FMembersList[Idx].FileName) then
      begin
        AHTTP := TIdHTTP.Create(nil);
        try
          AHTTP.HandleRedirects := true;
          FStreamList[Idx].Clear;
          AHTTP.Get(FMembersList[Idx].URL, FStreamList[Idx]);
        finally
          FreeAndNil(AHTTP);
        end;

        TThread.Queue(TThread.CurrentThread,
          procedure
          var
            AItem: Integer;
          begin
            ...
            end;
            Log.d('Current Thread = ' + inttostr(Idx));
          end);
      end;
    end, Pool); 

 

Изменено пользователем ENRGY
Ссылка на комментарий

Присоединяйтесь к обсуждению

Вы можете написать сейчас и зарегистрироваться позже. Если у вас есть аккаунт, авторизуйтесь, чтобы опубликовать от имени своего аккаунта.

Гость
Ответить на вопрос...

×   Вставлено с форматированием.   Вставить как обычный текст

  Разрешено использовать не более 75 эмодзи.

×   Ваша ссылка была автоматически встроена.   Отображать как обычную ссылку

×   Ваш предыдущий контент был восстановлен.   Очистить редактор

×   Вы не можете вставлять изображения напрямую. Загружайте или вставляйте изображения по ссылке.

×
×
  • Создать...