Allows an object (publisher) to notify interested parties (subscribers) when the publisher changes state.
In a distributed application, a server module often uses heartbeats to inform connected clients that the server is up and running. The following example applies the Observer pattern to implement a heart beat mechanism in which a server-side class (HeartbeatPublisher) periodically sends updates to connected clients. To keep the example simple, publisher and subscribers are implemented on the same machine. A heartbeat is then conveniently implemented as a call from HeartbeatPublisher (server-side publisher) to a member function in ConnectedClient (client-side subscriber).
In a real world application where server and client modules reside on different machines, a heartbeat is often implemented using DCOM or sockets or some other similar technology.
/* Publisher */
/* Subject (Publisher): Provides an interface for attaching/detaching subscribers (observers). It
also maintains a list of its observers. In this example, the publisher maintains a list of subscribers
that are interested in receiving a heartbeat */
class Publisher
{
// Constructors/Destructor
protected:
// protected: This class cannot be instantiated directly.
Publisher(){}
public:
virtual ~Publisher(){}
// Public interface
public:
void Attach( Subscriber* pSub);
void Detach( Subscriber* pSub);
void Notify( bool bReset );
// Data members
private:
std::set<Subscriber*> setSubscribers;
// std::set has no duplicates
};
/* Concrete Subject (Concrete Publisher): Maintains state of interest and sends notification to its
subscribers when its state changes */
class HeartbeatPublisher : public Publisher
{
// Constructors/Destructor
public:
HeartbeatPublisher();
~HeartbeatPublisher();
// Public interface
public:
static unsigned __stdcall Tick(
void* pParam );
// Data members
unsigned int uiThreadID;
unsigned long lThreadHandle;
HANDLE hEvent;
};
/* Add interested subscribers to an internal set */
void Publisher::Attach( Subscriber* pSub)
{
// Add subscriber to list if not already added
std::pair< std::set<Subscriber*>::iterator, bool > prInsert = setSubscribers.insert( pSub );
if (!prInsert.second)
throw std::invalid_argument( "already exists" );
}
/* Remove subscribers from the internal set */
void Publisher::Detach( Subscriber* pSub)
{
std::set<Subscriber*>::size_type szNum = setSubscribers.erase( pSub );
if (0 == szNum)
throw std::invalid_argument("attempting to detach non-existant element");
}
/* Send notifications to all registered subscribers. The bool is used to notify
subscribers that the publisher is being destroyed */
void Publisher::Notify( bool bReset )
{
std::set<Subscriber*>::iterator itBegin
= setSubscribers.begin();
const std::set<Subscriber*>::iterator itEnd = setSubscribers.end();
// Notify each interested subscriber
for ( ; itBegin != itEnd; ++itBegin )
{
// itBegin is an iterator (ie, pointer) to a pointer
Subscriber* pSub = (Subscriber*)(*itBegin);
pSub->Update( this, bReset );
}
}
/* When this publisher is created, it creates a thread to
handle sending heartbeats */
HeartbeatPublisher::HeartbeatPublisher()
{
// Create an event to terminate thread
that sends heartbeats
hEvent = CreateEvent( NULL, TRUE, FALSE, NULL);
if ( 0 == hEvent )
throw std::runtime_error( "Failed to create event object" );
// Create a thread to emulate a timer
lThreadHandle = _beginthreadex( NULL, 0, Tick, (void*)this, 0, &uiThreadID);
if ( 0 == lThreadHandle )
throw std::runtime_error( "Failed to create thread" );
}
/* When a publisher is destroyed it should unsubscribe all
its subscribers to avoid having dangling references to subscribers */
HeartbeatPublisher::~HeartbeatPublisher()
{
// Terminate heartbeat thread
SetEvent( hEvent );
WaitForSingleObject( (HANDLE)lThreadHandle, INFINITE);
// Notify subscribers that this publisher is being destroyed so that they
// reset their references to this publisher
Notify( true );
}
/* Note: A console app like this cannot use SetTimer() because it requires
a message loop. However, this behavior can be easily emulated using a thread that uses
WaitForSingleObject() with the required time out period. */
unsigned __stdcall HeartbeatPublisher::Tick(void* pParam)
{
HeartbeatPublisher* pThis = static_cast<HeartbeatPublisher*>(pParam);
bool bLoop = true;
while (bLoop)
{
DWORD dwReason = WaitForSingleObject( pThis->hEvent, 2000);
switch( dwReason )
{
case WAIT_OBJECT_0:
// Terminate thread
std::cout << "Terminating thread ... " << std::endl;
bLoop = false;
break;
case WAIT_TIMEOUT:
// Time out period occured. Send a heartbeat to all subscribers
std::cout << "Sending notification to subscribers ... " << std::endl;
pThis->Notify( false );
break;
case WAIT_FAILED:
std::cout << "WaitForSingleObject failed!" << std::endl;
}; // switch
} // while
return 0;
}
/* Subscribers */
/* Observer (Subscriber): An ABC that defines an updating interface
for any object to be notified of changes in a subject (publisher) */
class Subscriber
{
// Constructors/Destructor
public:
virtual ~Subscriber(){}
// public interface
public:
virtual void Update( Publisher* pChangedPublisher, bool bReset ) = 0;
};
/* Concrete Observer (Concrete Subscriber): Implements the Subscriber updating interface to keep
its state consistent with the publisher. It also maintains a reference to the publisher in order
to retrieve any information required to keep it in synch with the Publisher*/
class ConnectedClient : public Subscriber
{
// Constructors/Destructor
public:
ConnectedClient( Publisher *pP);
~ConnectedClient();
// public interface
public:
void Update( Publisher *pB, bool bReset);
private:
// Use std::set<Publisher*> if
subscriber can subscribe to more than one publisher.
Publisher *pPublisher;
};
/* When a subscriber is created, it should subscribe with
a publisher */
ConnectedClient::ConnectedClient( Publisher *pP) : pPublisher( pP )
{
// Register interest with the publisher
if (pPublisher)
pPublisher->Attach( this );
}
/* When a subscriber is destroyed, it should unsubscribe
from publisher */
ConnectedClient::~ConnectedClient()
{
// On destruction, this object should no longer receive notifications from
the publisher
if (pPublisher)
pPublisher->Detach( this );
}
/* Called by publisher to inform client of a heartbeat.
Note that a publisher passes itself as a parameter to let the subscriber
identify its publisher if the subscriber had more than one publisher (in this
case the publishers must be maintained in a set). bReset flag indicates if the
publisher is being destroyed; if so, subscriber must un-subscribe from publisher
*/
void ConnectedClient::Update( Publisher *pB, bool bReset)
{
// Check that we are being notified by our publisher
if (pB == pPublisher)
{
// Is this publisher being reset
if (bReset)
{
// Yes. We no longer can receive events from it
pPublisher = NULL;
}
else
{
std::cout << "Received heartbeat (" << this << ")" << std::endl;
// Perform application-specific operations.
// ...
}
}
}
int main(int argc, char* argv[])
{
try
{
// Create a publisher (initializes a timer)
HeartbeatPublisher obPublisher;
// Create two subscribers and subscribe with obPublisher
ConnectedClient obClient1( &obPublisher );
ConnectedClient obClient2( &obPublisher );
// Place a breakpoint on Notify and note how it updates its subscriber every
// time out period
// Heartbeats are sent until we quit
application with 'q'
char a;
while (a != 'q')
{
std::cin >> a;
};
}
catch (const std::exception & e)
{
std::cout << e.what() << std::endl;
}
return 0;
}