boost::asio stream_handle && pipe::PIPE_READMODE_MESSAGE
От: Andrew S Россия http://alchemy-lab.com
Дата: 13.03.09 15:56
Оценка:
Всем привет.

Пытаюсь работать с азио и пайпами в режиме сообщений. Использую для этого stream_handle::async_read_some.
Когда буфер больше сообщения, оно принимается нормально, вызывается обработчик. Иначе — access violation (Unhandled exception at 0xfeeefeee).
В общем, портится overlapped.

Стек вызова (размер буфера 16 байт):
* feeefeee()
* boost::asio::detail::win_iocp_io_service::operation::do_completion(unsigned long last_error=0x000000ea, unsigned int bytes_transferred=0x00000010) Line 77 + 0x16 bytes C++
* boost::asio::detail::win_iocp_io_service::do_one(bool block=true, boost::system::error_code & ec={...}) Line 509 C++
* boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec={...}) Line 186 + 0xe bytes C++
* boost::asio::io_service::run() Line 58 + 0xf bytes C++

В общем, пришло ровно 16 байт, код ошибки MORE_DATA_AVAIL — что вполне нормально для режима сообщений. В этом случае надо просто повторять операции чтения, пока не придет success. К сожалению, до нас это богатство так и не добралось ввиду мусора в overlapped. Ощущение такое, что проблемы с синхронизацией, но тут весь IOCP в одном треде. Иногда "проходит" пара-тройка вызовов, но потом все опять заканчиывается ошибкой доступа.

В общем, буду благодарен за мысли по поводу. Код, воспроизводящий пример, приведен ниже (boost 1.38).

Спасибо!
--
Andrew Solodovnikov


#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <memory>

struct PipeServer
{
    PipeServer():m_hPipe(INVALID_HANDLE_VALUE), m_sName(NULL)
    {
    }
    BOOL Create(LPCTSTR szName, DWORD dwTimeout = 2000, DWORD dwBufSize = 4096)
    {
        m_bStop = FALSE;
        m_sName = _tcsdup(szName);
        if (m_sName)
        {
            m_dwTimeout = dwTimeout;
            m_dwBufSize = dwBufSize;
            InternalCreatePipe();
            if (m_hPipe != INVALID_HANDLE_VALUE)
                return TRUE;
            free(m_sName);
            m_sName = NULL;
        }
        return FALSE;
    }
    BOOL Accept(HANDLE &theStream)
    {
        if (m_hPipe != INVALID_HANDLE_VALUE)
        {
            if (ConnectNamedPipe(m_hPipe, NULL) || (GetLastError() == ERROR_PIPE_CONNECTED))
            {
                if (!m_bStop)
                {
                    theStream = m_hPipe;
                    InternalCreatePipe();
                    return TRUE;
                }
                VERIFY(::DisconnectNamedPipe(m_hPipe));
            }
            VERIFY(::CloseHandle(m_hPipe));
            m_hPipe = INVALID_HANDLE_VALUE;
        }
        return FALSE;
    }
    void Stop()
    {
        ::InterlockedExchange((long *)&m_bStop, TRUE);
        //  possible hang on ::CallNamedPipe - if another pipe connected between this calls...
        DWORD dwTemp;
        ::CallNamedPipe(m_sName, NULL, 0, NULL, 0, &dwTemp, NMPWAIT_NOWAIT);
    }
    void Close()
    {
        if (m_hPipe != INVALID_HANDLE_VALUE)
        {
            ::DisconnectNamedPipe(m_hPipe);
            ::CloseHandle(m_hPipe);
            m_hPipe = INVALID_HANDLE_VALUE;
        }
        free(m_sName);
        m_sName = NULL;
    }
private:
    void InternalCreatePipe()
    {
        m_hPipe = ::CreateNamedPipe( 
            m_sName,    // pipe name 
            PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,       // read/write access 
            PIPE_TYPE_MESSAGE |       // message type pipe 
            PIPE_READMODE_MESSAGE |   // message-read mode
            PIPE_WAIT,                // blocking mode 
            PIPE_UNLIMITED_INSTANCES, // max. instances  
            m_dwBufSize,                  // output buffer size 
            m_dwBufSize,                  // input buffer size 
            m_dwTimeout,             // client time-out 
            NULL                // no security attribute 
            );
    }
private:
    BOOL    m_bStop;
    LPTSTR    m_sName;
    HANDLE    m_hPipe;
    DWORD    m_dwTimeout;
    DWORD    m_dwBufSize;
};

#define BLOCK_SIZE 64

struct pipe_session
{
    pipe_session(boost::asio::io_service& io_service, HANDLE hClientHandle):
                        m_hClientHandle(io_service, hClientHandle)
    {
        io_service.post(boost::bind(&pipe_session::client_read, this));
    }
private:
    void client_read()
    {
        //boost::asio::async_read(m_hClientHandle,
        m_hClientHandle.async_read_some(
                                    boost::asio::buffer(&szBuffer[0], BLOCK_SIZE),
                                    boost::bind(&pipe_session::handle_client_read, this,
                                    boost::asio::placeholders::error,
                                    boost::asio::placeholders::bytes_transferred
                                    ));
    }
    void handle_client_read(const boost::system::error_code& error, size_t nReaded)
    {
        if (!error)
        {
            // end iteration
            int i = 0;
        }
        else
        {
            if (error.value() == ERROR_MORE_DATA)
            {
                //m_sbClientBuffer.commit(BLOCK_SIZE);
                client_read();
            }
        }
    }

private:
    boost::asio::windows::stream_handle m_hClientHandle;
    char szBuffer[4096];
};


void server_thread(PipeServer *server)
{
    boost::asio::io_service io_service;
    boost::asio::add_service(io_service, new boost::asio::windows::stream_handle_service(io_service));

    std::auto_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service));

    boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
    HANDLE hStream;
    while (server->Accept(hStream))
    {
        new pipe_session(io_service, hStream);
    }
    work.reset(); // Allow run() to exit. 
    t.join();
}



int _tmain(int argc, TCHAR* argv[], TCHAR* envp[])
{

        PipeServer server;
        if (server.Create(_T("\\\\.\\pipe\\test_channel")))
        {
            try
            {
                {
                    boost::thread t(boost::bind(&server_thread, &server));

                    getch();
                    HANDLE hService = CreateFile(_T("\\\\.\\pipe\\test_channel"), GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, NULL);
                    if (hService != INVALID_HANDLE_VALUE)
                    {
                        DWORD mode = PIPE_READMODE_MESSAGE;
                        ::SetNamedPipeHandleState(hService, &mode, NULL, NULL);
                        std::vector<char> data(0xa5);
                        DWORD dw;
                        ::WriteFile(hService, &data.front(), data.size(), &dw, NULL);
                    }
                    getch();

                    server.Stop();
                    t.join();
                }
            }
            catch (std::exception& e)
            {
                std::cerr << "Exception: " << e.what() << "\n";
            }
            server.Close();
        }
        return 0;
}
http://www.rusyaz.ru/pr — стараемся писАть по-русски
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.