Re: active object pattern
От: MaximE Великобритания  
Дата: 12.02.05 20:07
Оценка: 17 (6)
gid_vvp wrote:

> Необходимо сделать так чтобы некий объект (класс) полностью жил в отдельном потоке, т.е. чтобы все вызовы функций данного класса происходили в контексте его потока, даже если они вызываются из других потоков.


Паттерн называется active object, можешь google по этой фразе.

Основная сложность реализации заключается в передаче аргументов асинхронного вызова и обратно результата вызова между потоками.

Если аргументы и результат передаются по ссылке (указателю) то необходимо гарантировать, что объект, на который ссылаются, не был разрушен до того момента, как его начнут использовать в другом потоке. Проше всего такую гарантию обеспечить при помощи умных указателей с семантикой shared ownership, как, например, boost::shared_ptr или boost::intrusive_ptr.

boost содержит достаточно примитивов, чтобы быстренько набросать портабельную реализацию active object. Нам понадобятся thread, function, bind.

Классы поддержки:
////////////////////////////////////////////////////////////////////////////////////////////////

#include <queue>

#include "boost/thread/thread.hpp"
#include "boost/thread/mutex.hpp"
#include "boost/thread/condition.hpp"
#include "boost/thread/xtime.hpp"
#include "libs/thread/src/timeconv.inl"

#include "boost/function.hpp"
#include "boost/bind.hpp"

////////////////////////////////////////////////////////////////////////////////////////////////

void sleep(int ms)
{
     boost::xtime delay;
     to_time(ms, delay);
     boost::thread().sleep(delay);
}

////////////////////////////////////////////////////////////////////////////////////////////////

class worker_thread : private boost::noncopyable
{
public:
     typedef boost::function<void()> job;

public:
     worker_thread()
         : stop_(false)
         , thr_(boost::bind(&worker_thread::thread_fun, this))
     {}

     ~worker_thread()
     {
         {
             boost::mutex::scoped_lock l(mtx_);
             stop_ = true;
         }
         cnd_.notify_one();
         thr_.join();
     }

     void queue(job const& j)
     {
         {
             boost::mutex::scoped_lock l(mtx_);
             jobs_.push(j);
         }
         cnd_.notify_one();
     }

private:
     void thread_fun()
     {
         job j;
         while(true)
         {
             {
                 boost::mutex::scoped_lock l(mtx_);

                 while(!stop_ && jobs_.empty())
                     cnd_.wait(l);
                 if(stop_)
                     break;

                 jobs_.front().swap(j);
                 jobs_.pop();
             }
             j();
         }
     }

private:
     boost::mutex mtx_;
     boost::condition cnd_;
     std::queue<job> jobs_;

     bool stop_;
     boost::thread thr_;
};

////////////////////////////////////////////////////////////////////////////////////////////////

template<class T>
class async_result : private boost::noncopyable
{
public:
     async_result(T const& t = T())
         : result_(t)
         , ready_(false)
     {}

     ~async_result()
     {
         static_cast<void>(this->result()); // wait till it has been used by an active object
     }

     void set(T const& t)
     {
         {
             boost::mutex::scoped_lock l(mtx_);
             result_ = t;
             ready_ = true;
         }
         cnd_.notify_all();
     }

     bool ready()
     {
         boost::mutex::scoped_lock l(mtx_);
         return ready_;
     }

     T result()
     {
         boost::mutex::scoped_lock l(mtx_);
         while(!ready_)
             cnd_.wait(l);
         return result_;
     }

private:
     boost::mutex mtx_;
     boost::condition cnd_;
     T result_;
     bool ready_;
};

////////////////////////////////////////////////////////////////////////////////////////////////

template<class T>
struct active_object : private boost::noncopyable
{
     active_object(T const& t = T())
         : object(t)
     {}

     T object;
     worker_thread thread;
};

////////////////////////////////////////////////////////////////////////////////////////////////


Использование:
struct some
{
     void foo() { printf("%s\n", __FUNCTION__); sleep(1000); }
     int bar(int n) { printf("%s\n", __FUNCTION__); sleep(1000); return n * 2; }
};

struct some_proxy
{
     some_proxy(active_object<some>& a)
         : a_(&a)
     {}

     void foo()
     {
         a_->thread.queue(boost::bind(&some::foo, &a_->object));
     }

     void bar(int n, async_result<int>* r)
     {
         a_->thread.queue(boost::bind(&async_result<int>::set, r, boost::bind(&some::bar, &a_->object, n)));
     }

     active_object<some>* a_;
};

int main()
{
     active_object<some> a;
     some_proxy p(a);

     printf("calling foo\n");
     p.foo();

     async_result<int> r;
     printf("calling bar\n");
     p.bar(1, &r);
     printf("calling bar done: %d\n", r.result());
}


Небольшая проблема связана с реализацией прокси объектов: руками не очень хочется дублировать интерфейс active object, но просто по данному определению класса существующими средствами языка и препроцессора сгенерить реализацию прокси затруднительно. Как вариант, можно избавиться от прокси объектов вообще и нагенерить несколько перегрузок ф-ций упаковки аргументов вызова с разным числом параметров, как для boost::bind. Тогда синтаксис вызова будет выглядеть так:

template<class T, class F, class R, class A1>
void async_call(active_object<T>& a, F f, A1 a1, async_result<R>* r)
{
     a.thread.queue(boost::bind(&async_result<int>::set, r, boost::bind(f, &a.object, a1)));
}

//...

     async_result<int> s;
     printf("calling bar\n");
     async_call(a, &some::bar, 2, &s);
     printf("calling bar done: %d\n", s.result());


--
Maxim Yegorushkin

Those who do not understand Unix are condemned to reinvent it, poorly. © Henry Spencer
Posted via RSDN NNTP Server 1.9
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.