Использование почтовых ящиков для связи между процессами - Классы CQueuedMailslotWriter и CAsyncMailslotReader

ОГЛАВЛЕНИЕ

CQueuedMailslotWriter

Этот класс реализует очередь сообщений и использует фоновый поток для записи их в экземпляр CSyncMailslotWriter. Класс реализует принцип сообщений с высоким приоритетом и сообщений с нормальным приоритетом. Есть одна очередь для каждого вида сообщений и, разумеется, сообщения в высокоприоритетной очереди отправляются первыми. Вызывающая функция задает приоритет сообщения, когда вызывает метод Write(). При желании можно расширить этот механизм до любого нужного числа уровней приоритета, но на деле достаточно двух уровней. Расширение уровней приоритета вызывает снижение производительности. Класс выглядит так:

class CQueuedMailslotWriter : public CSyncMailslotWriter
{
    class CQueuedData
    {
    public:
        CQueuedData(BYTE *pbData, DWORD dwDataLength);
        ~CQueuedData();

        DWORD       Length() const      { return m_dwDataLength; }
        BYTE        *Data() const       { return m_pbData; }

    private:
        BYTE        *m_pbData;
        DWORD       m_dwDataLength;
    };

    typedef deque<CQUEUEDDATA *> DATAQUEUE;
    typedef DATAQUEUE::const_iterator DQITER;

public:
                    CQueuedMailslotWriter(void);
    virtual         ~CQueuedMailslotWriter(void);

    virtual bool    Write(BYTE *pbData, DWORD dwDataLength,
                          BOOL bImportant);
    virtual bool    Connect(LPCTSTR szSlotname,
                            LPCTSTR szServername = _T("."));

private:
    static unsigned __stdcall ThreadStub(LPVOID data);
    virtual void    ThreadProc(CBaseThread *pThread);
    void            StopThread();

    HANDLE          m_hStopEvent,
                    m_hSignalEvent,
                    m_haSignal[2];
    CInterruptibleMutex m_imMutex;
    CBaseThread     *m_pThread;
    volatile bool   m_bStop;
    DATAQUEUE       m_highPriorityDataQueue,
                    m_normalPriorityDataQueue;
};

Основные дополнения – закрытый класс CQueuedData и несколько связанных с потоками переменных. Класс CQueuedData – просто удобный способ сохранения данных, передаваемых в каждом вызове метода Write(). Метод Write() упаковывает переданные данные и добавляет их в очередь. Позже метод ThreadProc() выведет данные из очереди и передаст их методу Write() базового класса. CQueuedMailslotWriter::Write() выглядит так:

//  Записывает сообщение в почтовый ящик. Он помещает сообщение в очередь
//  для почтового ящика, а запись фактически производит фоновый поток
bool CQueuedMailslotWriter::Write(BYTE *pbData, DWORD dwDataLength,
                                  BOOL bImportant)
{
    assert(pbData);
    assert(dwDataLength);

    //  Если почтовый ящик закрыт, пытаемся вновь подключиться к нему
    if (!IsOpen() && m_pszSlotname != LPTSTR(NULL))
        CSyncMailslotWriter::Connect();

    if (IsOpen())
    {
        //  Сперва захватываем мьютекс. Вы должны иметь мьютекс перед
        //  попыткой создания объекта QueuedData, иначе вы
        //  повредите кучу программы или подвесите блокировку кучи.
        if (m_imMutex.AquireMutex(m_hStopEvent) ==
                                  CInterruptibleMutex::eMutexAquired)
        {
            CQueuedData *pqData = new CQueuedData(pbData, dwDataLength);

            assert(pqData);

            if (bImportant)
                //  Сообщение с высоким приоритетом, помещаем его в высокоприоритетную
                //  очередь
                m_highPriorityDataQueue.push_back(pqData);
            else
                //  Сообщение с нормальным приоритетом, помещаем его в очередь
                //  с нормальным приоритетом
                m_normalPriorityDataQueue.push_back(pqData);

            m_imMutex.ReleaseMutex();

            //  Оповещаем поток обработчика очереди...
            SetEvent(m_hSignalEvent);
            return true;
        }
    }

    return false;
}

Как и с методом CSyncMailslotWriter::Write(), этот класс сначала проверяет, что подключение к почтовому ящику открыто. Если нет, то он пытается вновь подключиться к почтовому ящику. Если он имеет то, что, по его мнению, является действительным подключением к почтовому ящику, он двигается вперед и добавляет сообщение в очередь. Если нет –  он отбрасывает сообщение. Если класс добавил сообщение в очередь – он устанавливает событие, сообщающее фоновому потоку, что пришло новое сообщение и нуждается в отправке. Метод Write() и процедура потока совместно используют одни и те же очереди данных, поэтому они должны реализовывать безопасный метод добавления и удаления записей из очереди. Они делают это путем использования общего мьютекса (взаимная блокировка) (фактически, экземпляр класса CInterruptibleMutex)

Поток выглядит так:

void CQueuedMailslotWriter::ThreadProc(CBaseThread *pThread)
{
    CQueuedData *pqData;
    DQITER      pdqIterator;
    bool        bQueuePriority;

    while (!pThread->Stop())
    {
        switch (WaitForMultipleObjects(2, m_haSignal, FALSE, INFINITE))
        {
        case WAIT_OBJECT_0:
            //  Приказано остановиться, поэтому останавливаемся
            break;

        case WAIT_OBJECT_0 + 1:
            //  Захватываем мьютекс, прежде чем войти в цикл
            if (m_imMutex.AquireMutex(m_hStopEvent) !=
                                      CInterruptibleMutex::eMutexAquired)
                //  Приказано остановиться, поэтому останавливаемся...
                break;

            //  Новое сообщение добавлено в очередь, отправляем его
            while ((m_highPriorityDataQueue.size() ||
                    m_normalPriorityDataQueue.size())
                   && !pThread->Stop())
            {
                //  Продолжаем выполнять цикл, пока очередь не опустеет, или
                //  не прикажут остановиться.
                if (m_highPriorityDataQueue.size())
                {
                    pdqIterator = m_highPriorityDataQueue.begin();
                    bQueuePriority = false;
                }
                else
                {
                    pdqIterator = m_normalPriorityDataQueue.begin();
                    bQueuePriority = true;
                }

                pqData = *pdqIterator;

                //  Пока закончили, снимаем мьютекс, чтобы дать другим потокам
                //  возможность поместить данные в очередь.
                m_imMutex.ReleaseMutex();
               
                if (CSyncMailslotWriter::Write(pqData->Data(),
                        pqData->Length()) == pqData->Length())
                {
                    //  Снова захватываем мьютекс, чтобы удалить сообщение
                    //  из очереди
                    if (m_imMutex.AquireMutex(m_hStopEvent) ==
                                  CInterruptibleMutex::eMutexAquired)
                    {
                        //  Флаг bQueuePriority сообщает, из какой очереди
                        //  извлечено сообщение. Здесь нельзя использовать
                        //  размер очереди, так как очереди могли измениться
                        //  за время между текущим моментом и моментом
                        //  извлечения сообщения для отправки.
                        if (bQueuePriority == false)
                            m_highPriorityDataQueue.pop_front();
                        else
                            m_normalPriorityDataQueue.pop_front();

                        delete pqData;
                        continue;
                    }
                }
                else
                    //  Не удалось записать сообщение, оставляем его в очереди
                    //  и прерываем цикл писателя
                    break;
            }

            //  Цикл завершен, поэтому снимаем мьютекс.
            m_imMutex.ReleaseMutex();
            break;
        }
    }

    // Убеждаемся, что любой ожидающий ввод-вывод отменен, перед выходом из потока.
    if (IsOpen())
        CancelIo(m_hMailSlot);
}

Он ждет, пока ему не сообщат о приходе нового сообщения. Когда сообщение приходит, он сначала проверяет высокоприоритетную очередь, а затем очередь с нормальным приоритетом. В любом случае, он получает сообщение для отправки путем вызова CSyncMailslotWriter::Write(). Главное в этой процедуре - блокировка. Когда ему сообщают, что сообщение ждет отправки, он блокирует очередь сообщений, захватывая мьютекс. Он извлекает сообщение из одной или другой очереди и снимает мьютекст перед попыткой отправкой сообщения. После отправки сообщения он снова захватывает мьютекс, чтобы безопасно удалить сообщение из очереди. Цикл усложняется необходимостью следить, чтобы мьютекс был захвачен перед проверкой размеров и очередей, и поскольку размеры очередей являются управляющими переменными для цикла while, это означает больше вызовов для захвата мьютекса, чем ожидалось. Так как вызов CSyncMailslotWriter::Write() может блокироваться, мы снимаем мьютекс перед осуществлением этого вызова. После того как вызов вернул управление, надо снова захватить мьютекс перед переходом к началу цикла while.

CAsyncMailslotReader

Данный класс реализует высокоуровневое использование читателя почтового ящика, работающего в отдельном потоке и передающего входящие сообщения клиенту посредством виртуальной функции OnMessage().

Класс выглядит так:

class CAsyncMailslotReader : public CSyncMailslotReader
{
public:
                    CAsyncMailslotReader();
    virtual         ~CAsyncMailslotReader();

    virtual bool    OnMessage(BYTE *pbMessage, DWORD dwMessageLength) = 0;
    virtual bool    Connect(LPCTSTR szSlotName);

protected:
    static unsigned int __stdcall ThreadStub(LPVOID data);
    unsigned int    ThreadProc(LPVOID data);

    CBaseThread     *m_pThread;
};

Класс переопределяет виртуальный метод Connect(), чтобы создать контролирующий поток в момент, когда объект создает почтовый ящик. Connect() выглядит так:

bool CAsyncMailslotReader::Connect(LPCTSTR szSlotName)
{
    assert(szSlotName);
    bool bStatus = CSyncMailslotReader::Connect(szSlotName);

    if (bStatus)
    {
        //  Если почтовый ящик был успешно создан, запускается поток для
        //  контроля за ним.
        m_pThread = new CBaseThread(m_hStopEvent, &m_bStop, ThreadStub,
                                    false, this);
        assert(m_pThread);
    }

    return bStatus;
}

Что весьма просто. Процедура потока выглядит так:

unsigned int CAsyncMailslotReader::ThreadProc(LPVOID data)
{
    CBaseThread *pThread = (CBaseThread *) data;
    BYTE        *pbMessage;
    DWORD       dwMessageLength = 0;

    assert(pThread);

    while (!pThread->Stop())
    {
        //  Получаем и отправляем сообщения
        pbMessage = Read(dwMessageLength);

        if (dwMessageLength)
            OnMessage(pbMessage, dwMessageLength);
    }

    return 0;
}

Процесс просто сидит внутри вызова CSyncMailslotReader::Read(), ожидая следующего сообщения. Когда это сообщение приходит, поток вызывает виртуальный метод OnMessage(), который вы переопределяете, чтобы выполнять любую нужную для вашего приложения обработку. Заметьте, что метод CAsyncMailslotReader::OnMessage() является чисто виртуальным; вы не можете напрямую создать экземпляр CAsyncMailslotReader, но должны унаследовать свой собственный класс от него.

Знайте, что вызов вашего метода OnMessage() происходит в потоке, отличном от главного потока вашего приложения; это особенно важно помнить, если вы используете класс в приложении MFC(библиотека базовых классов Microsoft) и манипулируете CWnd из метода OnMessage().