*** ВНИМАНИЕ: Блог переехал на другой адрес - demin.ws ***

вторник, 17 февраля 2009 г.

Автоматический мьютекс

Описанный мной ранее класс Mutex является базовым механизмом синхронизации потоков при параллельном программировании и применяется сплошь и рядом.

Часто бывают случаи, когда несколько функций реализуют какую-то единую функциональность, построенную на общем разделяемом ресурсе, защищенном блокировкой. В этом случае каждая функция в начале работы занимает эту блокировку, а на выходе — освобождает ее. Например, методы класса-регистратора системных событий все работают с выходным буфером и используют единую блокировку для синхронизации доступа к нему. Например:

class Logger {
public:
...
void put(const char* str) {
__lock.Lock();
__buffer.push_back(str);
__lock.Unlock();
}
void flush() {
__lock.Lock();
...
__buffer.clear();
__lock.Unlock();
}
...
private:
Mutex __lock;
std::vector<std::string> __buffer;
}
В целом, такой подход является не совсем правильным, так как данные методы могут быть весьма сложными, иметь многочисленные условные операторы, могут генерировать исключения. В этом случае программисту необходимо позаботиться о всех возможных вариантах завершения каждой функции и везде вставить оператор освобождения блокировки:
__lock.Unlock();
Если этого не сделать, то неосвобожденная по какой-то редко возникающей причине блокировка может просто "подвесить" всю программу, так как все остальные функции, работающие с этой блокировкой, более никогда не получат управления.

К счастью, в С++ есть механизм, дающий возможность очень просто избежать подобных проблем, вывозом кода освобождения блокировки при любом варианте завершения функции. Механизм называется RAII (Resource Acquisition Is Initialization). В С++ деструкторы созданных в контексте функции объектов обязательно вызываются перед завершением контекста (попросту говоря, когда функция завершается любым способом). Если возникло непойманное в функции исключение, то в процессе раскрутки стека деструкторы созданных локальных объектов тоже будут вызваны. Отсюда и идея: занимать блокировку в конструкторе созданного в функции локального объекта и затем освобождать ее в деструкторе. Использование такого метода позволило бы изменить приведенный пример так:

class Logger {
public:
...
void put(const char* str) {
AutoLock(__lock);
__buffer.push_back(str);
}
void flush() {
AutoLock(__lock);
...
__buffer.clear();
}
...
private:
Mutex __lock;
std::vector<std::string> __buffer;
}
Объект AutoLock, создаваемый первым в контексте каждой функции, будет занимать блокировку и освобождать ее при закрытии этого контекста.

Идея проста и понятна, а класс, реализующий эту логику еще проще.

Пространство имен ext можно заменить по вкусу на подходящее вам.
Файл autolock.h:
#ifndef _EXT_AUTOLOCK_H
#define _EXT_AUTOLOCK_H

#include "mutex.h"

namespace ext {

class AutoLock {
public:
// Запираем блокировку в конструторе
AutoLock(Mutex& lock) : __lock(lock) {
__lock.Lock();
}

// Освобождаем блокировку в деструкторе
~AutoLock() {
__lock.Unlock();
}
private:
// Защита от случайного копирования
AutoLock(const AutoLock&);
void operator=(const AutoLock&);

Mutex& __lock;
};

} // ext

#endif
Данный класс использует реализацию блокировки (мьютекса) Mutex.

Посмотрим, как оно будет в деле (конечно с помощью unit-тестирования).

Традиционно, для компиляции тестов нам нужна Google Test Framework. Как я уже писал, вы можете скачать мою модификацию этой библиотеки, которая сокращена без потери какой-либо функциональности до двух необходимых файлов gtest/gtest.h и gtest-all.cc.
Файл autolock_unittest.cpp:
#include "gtest/gtest.h"

#include "autolock.h"
#include "mutex.h"
#include "thread.h"

// Универсальная задержка в миллисекундах для Windows и UNIX
#ifdef WIN32
#include <windows.h>
#define msleep(x) Sleep(x)
#else
#include <unistd.h>
#define msleep(x) usleep((x)*1000)
#endif

// Тестовый поток
class T: public ext::Thread {
public:
// Параметры потока:
// flag - флаг для сигнализации о выполненном действии
// mutex - рабочий объект-блокировка
// timeout - время, которое необходимо подождать после
// установки флага
// val - значение, в которое надо установить флаг
T(volatile int& flag, ext::Mutex& mutex, int timeout, int val) :
__flag(flag), __mutex(mutex), __timeout(timeout), __val(val)
{}

// Функция потока: занять автоматическую блокировку, установить
// флаг, подождать указанное время, освободить автоматическую
// блокировку.
virtual void Execute() {
ext::AutoLock locker(__mutex);
__flag = __val;
msleep(__timeout);
}
private:
volatile int& __flag;
ext::Mutex& __mutex;
int __timeout;
int __val;
};

// Данный тест выполняет параллельно две функции, которые конкурируют
// за одну блокировку. Функция-поток 'a' занимает блокировку, устанавливает
// флаг в 1, ждет 100мс и затем освобождает блокировку. Функция-поток 'b'
// стартует, когда поток 'a' уже занял блокировку, поэтому после старта
// потока 'b' флаг еще некоторое время будет равен 1, пока поток 'a' не
// отпустит блокировку, и затем поток 'b' изменит флаг в 0, получив
// управление ожидания на блокировке.
TEST(AutoLock, ConcurrentCalls) {
volatile int flag = 0;

ext::Mutex mutex;

T a(flag, mutex, 100, 1);
T b(flag, mutex, 0, 0);

// Запускаем поток 'a'.
a.Start();
// Ждем, пока поток 'a' займет блокировку.
// Это случится, когда флаг станет равен 1.
while (!flag);

// Запускаем поток 'b'.
b.Start();
// Ждем немного, чтобы убедиться, что поток запустился
// и дошел до попытки занять блокировку.
msleep(50);

// Так как время задержки в потоке 'a' больше 50мс,
// то флаг все еще равен 1, так как поток 'a' пока не отпустил
// блокировку, не давая потоку 'b' получить управление
// и изменить флаг в 0.
EXPECT_EQ(1, flag);

// Ждем завершения потока 'a' (блокировка должна быть
// отпущена при его завершении.
a.Join();

// Ждем завершения потока 'b', который к своему завершению
// должен обнулить флаг.
b.Join();
EXPECT_EQ(0, flag);
}
Для компиляции нам также понадобятся файлы mutex.h (класс Mutex), thread.cpp и thread.h (класс Thread).

Файл для запуска тестов runner.cpp:

#include "gtest/gtest.h"
int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Компилируем:

Visual Studio:

cl /EHsc /I. /Feautolock_unittest_vs2008.exe /DWIN32 runner.cpp autolock_unittest.cpp thread.cpp gtest\gtest-all.cc
Cygwin:
g++ -I. -o autolock_unittest_cygwin.exe runner.cpp autolock_unittest.cpp thread.cpp gtest/gtest-all.cc
Запускаем:
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from AutoLock
[ RUN ] AutoLock.ConcurrentCalls
[ OK ] AutoLock.ConcurrentCalls
[----------] Global test environment tear-down
[==========] 1 test from 1 test case ran.
[ PASSED ] 1 test.
Работает, что приятно. Тест работает как ожидалось. 
Не забудьте включить файл autolock_unittest.cpp в тестовый набор вашего проекта. Не тратьте время на вылавливание неожиданных глюков тогда, когда вы уже порядком подзабыли, как тут все работает. Пусть ловлей глюков занимается тест, автоматизировано.


Другие посты по теме:

6 комментариев:

  1. Получается, что тест будет отрабатывать не быстрее, чем за 100 мс. Медленно - сотня таких тестов и тесты уже запускать слишком накладно будет.
    Думаю вот - можно ли этот тест сделать без Sleep? Например, использовать Event-ы для посылки сигналов в тех местах, где Sleep. Поток A занимает мьютекс и шлет эвент основному потоку. И начинает ждать другого события, что поток B запущен.
    Основной поток запускает поток B (вначале Execute в B можно послать событие в основной поток, что B стартовал) и посылает событие в A, что он может продолжиться. A посылает событие в основной поток, что он собирается выйти и основной проверяет значение переменной и потом разрешает выйти.
    А потом срабатывает B и приходит событие от B, что переменная изменилась.

    бррр, сложная схема :)

    ОтветитьУдалить
  2. Согласен, 100мс на тест - это много. Вообще, вставлять какие-либо явные задержки в тест - это плохо. Обычно, тесты требующие явного времени я выношу из unit-тестирования в QA тестирование, которое не запускается при каждом билде. Но в это тесте, как ты правильно сказал, необходимо иметь механизм нотификации из одного потока другой о том, что достигнута какая-то точка в программе. Наверное это реализуемо через Eventы в виндах, но теперь осталось забацать аналогичное для юникса (и сопровождать обе версии ;-). Ну а как альтернатива явной нотификаци - это явные перекрывающие друг друга задержки. Это тоже не выход, по большому счету, так как если комьютер занят чем-то (например, параллельной компиляцией), то задержка может сильно удлиниться, и тест даст сбой. Пока я решил так: данный тест удовлетворяет почти всем условиям - тестов не так много, машина быстрая. Если что-то будет вылезать за рамки - будем решать. ;-) Как, говориться, голова не должна отключаться даже после написания теста. ;-)

    ОтветитьУдалить
  3. Пока я решил так: данный тест удовлетворяет почти всем условиям - тестов не так много, машина быстрая. Если что-то будет вылезать за рамки - будем решать. ;-) Как, говориться, голова не должна отключаться даже после написания теста. ;-)

    Правильный подход и я не "проповедник книжных подходов", понимаю, что компромисы нужны.
    Просто было бы интересно понять, как можно делать такие тесты потоков без Sleep. Ибо я и сам Sleep использую в них :) А хочется избавиться.

    ОтветитьУдалить
  4. Прочёл сегодня этот пост и увидел ошибку.

    void put(const char* str) {
    AutoLock(__lock);
    __buffer.push_back(str);
    }
    void flush() {
    AutoLock(__lock);
    ...
    __buffer.clear();
    }

    В этом коде будет race, т.к. безымянные переменные, создаваемые выражением "AutoLock(__lock)", будут уничтожены немедленно по достижении ";", а вовсе не в конце блока. То есть мьютекс будет захвачен, а затем немедленно отпущен.

    Это очень опасная ошибка -- один из главных недостатков автоматического мьютекса.

    Вот цитата из стандарта, на всякий случай:

    Temporary objects are destroyed as the last step in evaluating the full-expression (1.9) that (lexically) contains the point where they were created.

    ОтветитьУдалить
  5. Да, тут жесткая опечатка. Правильно надо писать не "AutoLock(__lock)", а "AutoLock guard(__lock)", чтобы создавать не временный объект, а нормальный, живущий до конца функции.

    ОтветитьУдалить
  6. Самое печальное, что такие опечатки нередки и в промышленном коде. Их не видно, компилятор глотает не морщась, и раз в полгода у кастомеров случается развал по причине битой памяти...

    ОтветитьУдалить