Behavioral

Observer (Publisher-Subscriber)

Purpose

Allows an object (publisher) to notify interested parties (subscribers) when the publisher changes state.

UML

Behavior
  1. A Publisher notifies its Subscribers whenever a change occurs that would invalidate the state of the subscribers.
  2. Notified subscribers may then query the publisher for information to update their state.
Example

In a distributed application, a server module often uses heartbeats to inform connected clients that the server is up and running. The following example applies the Observer pattern to implement a heartbeat mechanism in which a server-side class (HeartbeatPublisher) periodically sends updates to connected clients. To keep the example simple, publisher and subscribers are implemented on the same machine. A heartbeat is then conveniently implemented as a call from HeartbeatPublisher (server-side publisher) to a member function in ConnectedClient (client-side subscriber).

In a real-world application where server and client modules reside on different machines, a heartbeat is often implemented using DCOM, sockets, or a similar technology.

Usage

/* Publisher */

/* Subject (Publisher): Provides an interface for attaching/detaching subscribers (observers) and keeps the set of currently attached subscribers. In this example, the attached subscribers are the ones interested in receiving a heartbeat. */

// Publisher only stores and forwards Subscriber pointers in this declaration,
// so a forward declaration is sufficient and lets the class be declared before
// Subscriber itself is defined.
class Subscriber;

class Publisher
{
// Constructors/Destructor
protected:
    // protected: This class cannot be instantiated directly, only through a derived class.
    Publisher(){}

public:
    virtual ~Publisher(){}

// Public interface
public:
    // Registers pSub; throws std::invalid_argument if pSub is already attached.
    void Attach( Subscriber* pSub);

    // Unregisters pSub; throws std::invalid_argument if pSub is not attached.
    void Detach( Subscriber* pSub);

    // Calls Update() on every attached subscriber, forwarding bReset.
    void Notify( bool bReset );

// Data members
private:
    std::set<Subscriber*> setSubscribers;     // std::set has no duplicates
};

/* Concrete Subject (Concrete Publisher): Maintains state of interest and sends notification to its subscribers when its state changes */
class HeartbeatPublisher : public Publisher
{
// Constructors/Destructor
public:
    HeartbeatPublisher();
    ~HeartbeatPublisher();

// Public interface
public:
    static unsigned __stdcall Tick( void* pParam );

// Data members
    unsigned int  uiThreadID;
    unsigned long lThreadHandle;
    HANDLE        hEvent;
};

/* Add an interested subscriber to the internal set.
   pSub   - subscriber to register
   Throws std::invalid_argument if pSub is already registered. */
void Publisher::Attach( Subscriber* pSub)
{
    // insert() returns { iterator, inserted }; 'inserted' is false when the
    // set already contains pSub
    std::pair< std::set<Subscriber*>::iterator, bool > prInsert = setSubscribers.insert( pSub );
    if (!prInsert.second)
        throw std::invalid_argument( "already exists" );
}

/* Remove a subscriber from the internal set.
   pSub   - subscriber to unregister
   Throws std::invalid_argument if pSub is not currently registered. */
void Publisher::Detach( Subscriber* pSub)
{
    // erase() by key returns the number of elements removed; zero means
    // pSub was not in the set
    const bool bWasAttached = ( setSubscribers.erase( pSub ) != 0 );
    if (!bWasAttached)
        throw std::invalid_argument("attempting to detach non-existant element");
}

/* Send notifications to all registered subscribers. The bool is used to notify subscribers that the publisher is being destroyed */
void Publisher::Notify( bool bReset )
{
    std::set<Subscriber*>::iterator itBegin     = setSubscribers.begin();
    const std::set<Subscriber*>::iterator itEnd = setSubscribers.end();

    // Notify each interested subscriber
    for ( ; itBegin != itEnd; ++itBegin )
    {
        // itBegin is an iterator (ie, pointer) to a pointer
        Subscriber* pSub = (Subscriber*)(*itBegin);
        pSub->Update( this, bReset );
    }
}

/* When this publisher is created, it creates the event used to stop the heartbeat thread and then starts that thread.
   Throws std::runtime_error if either the event or the thread cannot be created. */
HeartbeatPublisher::HeartbeatPublisher()
    : uiThreadID( 0 )
    , lThreadHandle( 0 )
    , hEvent( NULL )
{
    // Create the event that is later signalled to terminate the heartbeat thread
    hEvent = CreateEvent( NULL, TRUE, FALSE, NULL);
    if ( NULL == hEvent )
        throw std::runtime_error( "Failed to create event object" );

    // Create a thread to emulate a timer; it receives this object as its parameter
    lThreadHandle = _beginthreadex( NULL, 0, Tick, this, 0, &uiThreadID);
    if ( 0 == lThreadHandle )
    {
        // The destructor does not run when a constructor throws, so the event
        // created above must be released here or it leaks.
        CloseHandle( hEvent );
        hEvent = NULL;
        throw std::runtime_error( "Failed to create thread" );
    }
}

/* When a publisher is destroyed it stops the heartbeat thread, releases the thread and event handles, and tells all subscribers to drop their reference to it so that they are not left with a dangling pointer */
HeartbeatPublisher::~HeartbeatPublisher()
{
    HANDLE hThread = reinterpret_cast<HANDLE>( lThreadHandle );

    // Ask the heartbeat thread to terminate and wait until it has exited
    SetEvent( hEvent );
    WaitForSingleObject( hThread, INFINITE);

    // A thread started with _beginthreadex keeps its handle open until the
    // caller closes it; the event handle is likewise owned by this object.
    // Both are closed before notifying so they are released even if a
    // subscriber's Update() throws.
    CloseHandle( hThread );
    CloseHandle( hEvent );

    // Notify subscribers that this publisher is being destroyed so that they
    // reset their references to this publisher
    Notify( true );
}

/* Note: A console app like this cannot use SetTimer() because it requires a message loop. However, this behavior can be easily emulated using a thread that uses WaitForSingleObject() with the required time out period. */
unsigned __stdcall HeartbeatPublisher::Tick(void* pParam)
{
    HeartbeatPublisher* pThis = static_cast<HeartbeatPublisher*>(pParam);

    bool bLoop = true;
    while (bLoop)
    {
        DWORD dwReason = WaitForSingleObject( pThis->hEvent, 2000);
        switch( dwReason )
        {
            case WAIT_OBJECT_0:
                // Terminate thread
                std::cout << "Terminating thread ... " << std::endl;
                bLoop = false;
                break;

            case WAIT_TIMEOUT:
                // Time out period occured. Send a heartbeat to all subscribers
                std::cout << "Sending notification to subscribers ... " << std::endl;
                pThis->Notify( false );
                break;

            case WAIT_FAILED:
                std::cout << "WaitForSingleObject failed!" << std::endl;
        }; // switch
    } // while

    return 0;
}

/* Subscribers */

/* Observer (Subscriber): An ABC that defines an updating interface for any object to be notified of changes in a subject (publisher) */
class Subscriber
{
// Constructors/Destructor
public:
    virtual ~Subscriber(){}

// public interface
public:
    virtual void Update( Publisher* pChangedPublisher, bool bReset ) = 0;
};

/* Concrete Observer (Concrete Subscriber): Implements the Subscriber updating interface to keep its state consistent with the publisher. It also maintains a reference to the publisher in order to retrieve any information required to keep it in synch with the Publisher*/
class ConnectedClient : public Subscriber
{
// Constructors/Destructor
public:
    ConnectedClient( Publisher *pP);
    ~ConnectedClient();

// public interface
public:
    void Update( Publisher *pB, bool bReset);

private:
    // Use std::set<Publisher*> if subscriber can subscribe to more than one publisher.
    Publisher *pPublisher;
};


/* When a subscriber is created, it remembers its publisher and subscribes with it.
   pP - publisher to subscribe with; a null pointer leaves the client unsubscribed. */
ConnectedClient::ConnectedClient( Publisher *pP) : pPublisher( pP )
{
    // Nothing to register with when no publisher was supplied
    if (!pPublisher)
        return;

    // Register interest with the publisher
    pPublisher->Attach( this );
}

/* When a subscriber is destroyed, it unsubscribes from its publisher (if it still has one) so that it no longer receives notifications */
ConnectedClient::~ConnectedClient()
{
    // No publisher was given, or it has already reset us through Update()
    if (!pPublisher)
        return;

    try
    {
        // On destruction, this object should no longer receive notifications from the publisher
        pPublisher->Detach( this );
    }
    catch (const std::exception& e)
    {
        // Detach() throws std::invalid_argument when this client is not in the
        // publisher's set. An exception must not escape a destructor, so it is
        // reported here instead of being propagated.
        std::cout << "Failed to detach subscriber: " << e.what() << std::endl;
    }
}

/* Called by publisher to inform client of a heartbeat. Note that a publisher passes itself as a parameter to let the subscriber identify its publisher if the subscriber had more than one publisher (in this case the publishers must be maintained in a set). bReset flag indicates if the publisher is being destroyed; if so, subscriber must un-subscribe from publisher */
void ConnectedClient::Update( Publisher *pB, bool bReset)
{
    // Check that we are being notified by our publisher
    if (pB == pPublisher)
    { 
        // Is this publisher being reset
        if (bReset)
        {
            // Yes. We no longer can receive events from it
            pPublisher = NULL;
        }
        else
        {
            std::cout << "Received heartbeat (" << this << ")" << std::endl; 
            // Perform application-specific operations.

            // ...
 
       }
    }
}

int main(int argc, char* argv[])
{
    try
    {
        // Create a publisher (initializes a timer)
        HeartbeatPublisher obPublisher;

        // Create two subscribers and subscribe with obPublisher
        ConnectedClient obClient1( &obPublisher );
        ConnectedClient obClient2( &obPublisher );

        // Place a breakpoint on Notify and note how it updates its subscriber every
        // time out period

        // Heartbeats are sent until we quit application with 'q'
        char a;
        while (a != 'q')
        {
            std::cin >> a;
        };
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << std::endl;
    }

    return 0;
}

Notes