00001
00014 #ifndef _DLR_THREAD_DISTRIBUTIONQUEUE_H_
00015 #define _DLR_THREAD_DISTRIBUTIONQUEUE_H_
00016
00017 #include <map>
00018 #include <boost/thread.hpp>
00019 #include <dlrThread/clientID.h>
00020 #include <dlrThread/exception.h>
00021 #include <dlrThread/monitor.h>
00022 #include <dlrUtilities/timeUtilities.h>
00023
00024
00025 namespace dlr {
00026
00027 namespace thread {
00028
00056 template <class Type>
00057 class DistributionQueue
00058 : public Monitor
00059 {
00060 public:
00061
00073 typedef ClientID<Type> ClientID;
00074
00075
00093 DistributionQueue(size_t maximumLength, size_t numberOfClients=2);
00094
00095
00103 DistributionQueue(const DistributionQueue<Type>& source);
00104
00105
00109 virtual
00110 ~DistributionQueue();
00111
00112
00121 DistributionQueue<Type>&
00122 operator=(const DistributionQueue<Type>& source);
00123
00124
00144 void
00145 bufferBack(const ClientID& clientID);
00146
00147
00167 bool
00168 bufferBack(const ClientID& clientID, double timeout);
00169
00170
00174 void
00175 clear();
00176
00177
00194 void
00195 copyBuffer(const ClientID& clientID, Type& target);
00196
00197
00206 size_t
00207 getMaximumLength() {return m_size;}
00208
00209
00253 void
00254 lockBack(const ClientID& clientID);
00255
00256
00278 bool
00279 lockBack(const ClientID& clientID, double timeout);
00280
00281
00295 void
00296 pushFront(const ClientID& clientID, const Type& element);
00297
00298
00325 void
00326 registerClient(ClientID& clientID,
00327 const std::string& clientName="anonymous");
00328
00329
00339 void
00340 unlockBack(const ClientID& clientID);
00341
00342 private:
00343
00344 inline void
00345 releaseLock();
00346
00347
00348 inline void
00349 waitForLock(Token& token);
00350
00351
00352 inline bool
00353 waitForLock(Token& token, double timeout);
00354
00355
00356
00357 size_t* m_headIndexPtr;
00358 size_t m_size;
00359 size_t* m_tailIndexPtr;
00360
00361 Type** m_bufferArrayPtr;
00362 std::map<size_t, std::string>* m_clientNameMapPtr;
00363 Type* m_dataArray;
00364 size_t* m_maxNumberOfClientsPtr;
00365 size_t* m_numberOfClientsPtr;
00366
00367 Condition m_headChangedCondition;
00368 bool* m_isLockedPtr;
00369 Condition m_isLockedCondition;
00370
00371
00372
00373
00374 ReferenceCount* m_referenceCountPtr;
00375 };
00376
00377 }
00378
00379 }
00380
00381
00382
00383
00384 #include <algorithm>
00385 #include <iostream>
00386 #include <sstream>
00387 #include <dlrThread/private.h>
00388
00389
00390 namespace dlr {
00391
00392 namespace thread {
00393
00394
00395
00396 template <class Type>
00397 DistributionQueue<Type>::
00398 DistributionQueue(size_t maximumLength, size_t numberOfClients)
00399 : Monitor(),
00400 m_headIndexPtr(new size_t),
00401 m_size(maximumLength),
00402 m_tailIndexPtr(new size_t),
00403 m_bufferArrayPtr(new Type*),
00404 m_clientNameMapPtr(new std::map<size_t, std::string>),
00405 m_dataArray(new Type[maximumLength]),
00406 m_maxNumberOfClientsPtr(new size_t),
00407 m_numberOfClientsPtr(new size_t),
00408 m_headChangedCondition(Monitor::createCondition()),
00409 m_isLockedPtr(new bool),
00410 m_isLockedCondition(Monitor::createCondition()),
00411 m_referenceCountPtr(new ReferenceCount)
00412 {
00413 if(numberOfClients == 0) {
00414 DLR_THROW(ValueException, "DistributionQueue::DistributionQueue()",
00415 "Argument numberOfClients must be greater than zero.");
00416 }
00417
00418
00419 *m_headIndexPtr = 0;
00420 *m_tailIndexPtr = 0;
00421
00422
00423 *m_bufferArrayPtr = new Type[numberOfClients];
00424 *m_maxNumberOfClientsPtr = numberOfClients;
00425 *m_numberOfClientsPtr = 0;
00426
00427
00428 *m_isLockedPtr = false;
00429
00430
00431 this->makeCopyable();
00432 }
00433
00434
00435
00436 template <class Type>
00437 DistributionQueue<Type>::
00438 DistributionQueue(const DistributionQueue<Type>& source)
00439 : Monitor(source),
00440 m_headIndexPtr(0),
00441 m_size(0),
00442 m_tailIndexPtr(0),
00443 m_bufferArrayPtr(0),
00444 m_clientNameMapPtr(0),
00445 m_dataArray(0),
00446 m_maxNumberOfClientsPtr(0),
00447 m_numberOfClientsPtr(0),
00448 m_headChangedCondition(Monitor::createCondition(false)),
00449 m_isLockedPtr(0),
00450 m_isLockedCondition(Monitor::createCondition(false)),
00451 m_referenceCountPtr(0)
00452 {
00453 Token token = this->getToken();
00454
00455
00456
00457 m_headIndexPtr = source.m_headIndexPtr;
00458 m_size = source.m_size;
00459 m_tailIndexPtr = source.m_tailIndexPtr;
00460 m_bufferArrayPtr = source.m_bufferArrayPtr;
00461 m_clientNameMapPtr = source.m_clientNameMapPtr;
00462 m_dataArray = source.m_dataArray;
00463 m_maxNumberOfClientsPtr = source.m_maxNumberOfClientsPtr;
00464 m_numberOfClientsPtr = source.m_numberOfClientsPtr;
00465 m_headChangedCondition = source.m_headChangedCondition;
00466 m_isLockedPtr = source.m_isLockedPtr;
00467 m_isLockedCondition = source.m_isLockedCondition;
00468 m_referenceCountPtr = source.m_referenceCountPtr;
00469 ++(*m_referenceCountPtr);
00470 }
00471
00472
00473
00474 template <class Type>
00475 DistributionQueue<Type>::
00476 ~DistributionQueue()
00477 {
00478 Token token = this->getToken();
00479
00480
00481
00482
00483 if(!(m_referenceCountPtr->isShared())) {
00484 delete m_headIndexPtr;
00485 delete m_tailIndexPtr;
00486 delete m_bufferArrayPtr;
00487 delete m_clientNameMapPtr;
00488 delete m_dataArray;
00489 delete m_maxNumberOfClientsPtr;
00490 delete m_numberOfClientsPtr;
00491 delete m_isLockedPtr;
00492 delete m_referenceCountPtr;
00493 } else {
00494 --(*m_referenceCountPtr);
00495 }
00496 }
00497
00498
00499
00500 template <class Type>
00501 DistributionQueue<Type>&
00502 DistributionQueue<Type>::
00503 operator=(const DistributionQueue<Type>& source)
00504 {
00505 if(&source != this) {
00506 Monitor::operator=(source);
00507 Token token = this->getToken();
00508
00509
00510
00511
00512 this->release();
00513 this->copyOther(source);
00514 }
00515 }
00516
00517
00518
00519
00520 template <class Type>
00521 void
00522 DistributionQueue<Type>::
00523 bufferBack(const ClientID& clientID)
00524 {
00525
00526 this->lockBack(clientID);
00527
00528 try {
00529 (*m_bufferArrayPtr)[clientID.m_idNumber]
00530 = m_dataArray[*m_tailIndexPtr];
00531 } catch(...) {
00532 this->unlockBack(clientID);
00533 throw;
00534 }
00535
00536 ++(*m_tailIndexPtr);
00537 if(*m_tailIndexPtr == m_size) {
00538 *m_tailIndexPtr = 0;
00539 }
00540
00541 this->unlockBack(clientID);
00542 }
00543
00544
00545
00546
00547 template <class Type>
00548 bool
00549 DistributionQueue<Type>::
00550 bufferBack(const ClientID& clientID, double timeout)
00551 {
00552
00553 if(this->lockBack(clientID, timeout) == false) {
00554 return false;
00555 }
00556
00557 try {
00558 (*m_bufferArrayPtr)[clientID.m_idNumber]
00559 = m_dataArray[*m_tailIndexPtr];
00560 } catch(...) {
00561 this->unlockBack(clientID);
00562 throw;
00563 }
00564
00565 ++(*m_tailIndexPtr);
00566 if(*m_tailIndexPtr == m_size) {
00567 *m_tailIndexPtr = 0;
00568 }
00569
00570 this->unlockBack(clientID);
00571 return true;
00572 }
00573
00574
00575
00576 template <class Type>
00577 void
00578 DistributionQueue<Type>::
00579 clear()
00580 {
00581 Token token = this->getToken();
00582 this->waitForLock();
00583 *m_tailIndexPtr = *m_headIndexPtr;
00584 this->releaseLock();
00585 }
00586
00587
00588
00589
00590
00591 template <class Type>
00592 void
00593 DistributionQueue<Type>::
00594 copyBuffer(const ClientID& clientID, Type& target)
00595 {
00596 target = (*m_bufferArrayPtr)[clientID.m_idNumber];
00597 }
00598
00599
00600
00601
00602 template <class Type>
00603 void
00604 DistributionQueue<Type>::
00605 lockBack(const ClientID& clientID)
00606 {
00607 Token token = this->getToken();
00608 this->waitForLock(token);
00609
00610
00611
00612
00613 while(*m_tailIndexPtr == *m_headIndexPtr) {
00614 this->releaseLock();
00615 this->wait(m_headChangedCondition, token);
00616 this->waitForLock(token);
00617 }
00618
00619
00620 return;
00621 }
00622
00623
00624
00625
00626 template <class Type>
00627 bool
00628 DistributionQueue<Type>::
00629 lockBack(const ClientID& clientID, double timeout)
00630 {
00631
00632 double deadline = getCurrentTime() + timeout;
00633 Token token = this->getToken(timeout);
00634 if(!token) {
00635 return false;
00636 }
00637 if(!this->waitForLock(token, deadline - getCurrentTime())) {
00638 return false;
00639 }
00640
00641
00642
00643
00644 while(*m_tailIndexPtr == *m_headIndexPtr) {
00645 this->releaseLock();
00646 if(!this->wait(m_headChangedCondition, token,
00647 deadline - getCurrentTime())) {
00648 return false;
00649 }
00650 if(!this->waitForLock(token, deadline - getCurrentTime())) {
00651 return false;
00652 }
00653 }
00654
00655
00656 return true;
00657 }
00658
00659
00660 template <class Type>
00661 void
00662 DistributionQueue<Type>::
00663 pushFront(const ClientID& clientID, const Type& element)
00664 {
00665
00666
00667
00668 bool overflowFlag = false;
00669 {
00670
00671 Token token = this->getToken();
00672 this->waitForLock(token);
00673 try {
00674
00675 size_t newHeadIndex = *m_headIndexPtr + 1;
00676 if(newHeadIndex == m_size) {
00677 newHeadIndex = 0;
00678 }
00679
00680
00681 if(newHeadIndex == *m_tailIndexPtr) {
00682 overflowFlag = true;
00683 } else {
00684
00685 m_dataArray[*m_headIndexPtr] = element;
00686 }
00687 *m_headIndexPtr = newHeadIndex;
00688
00689
00690 this->signalAll(m_headChangedCondition);
00691 } catch(...) {
00692 this->releaseLock();
00693 throw;
00694 }
00695 this->releaseLock();
00696 }
00697
00698
00699 if(overflowFlag) {
00700 std::ostringstream message;
00701 message << "Queue size exceeds the allowable maximum of "
00702 << m_size << " " << "on a function call from thread "
00703 << (m_clientNameMapPtr->find(clientID.m_idNumber))->second
00704 << ".";
00705 DLR_THROW(OverflowException, "DistributionQueue::pushFront()",
00706 message.str().c_str());
00707 }
00708 }
00709
00710
00711
00712
00713 template <class Type>
00714 void
00715 DistributionQueue<Type>::
00716 registerClient(ClientID& clientID, const std::string& clientName)
00717 {
00718
00719
00720 Token token = this->getToken();
00721 this->waitForLock(token);
00722
00723 try {
00724 m_clientNameMapPtr->insert(
00725 std::make_pair(*m_numberOfClientsPtr, clientName));
00726 clientID.m_count = 0;
00727 clientID.m_idNumber = (*m_numberOfClientsPtr)++;
00728 clientID.m_index = 0;
00729
00730
00731
00732 if(*m_numberOfClientsPtr > *m_maxNumberOfClientsPtr) {
00733 size_t oldMaxNumberOfClients = *m_maxNumberOfClientsPtr;
00734 Type* oldBufferArray = *m_bufferArrayPtr;
00735
00736 *m_maxNumberOfClientsPtr *= 2;
00737 *m_bufferArrayPtr = new Type[*m_maxNumberOfClientsPtr];
00738
00739 std::copy(oldBufferArray, oldBufferArray + oldMaxNumberOfClients,
00740 *m_bufferArrayPtr);
00741 delete[] oldBufferArray;
00742 }
00743 } catch(...) {
00744 this->releaseLock();
00745 throw;
00746 }
00747 this->releaseLock();
00748 }
00749
00750
00751
00752
00753 template <class Type>
00754 void
00755 DistributionQueue<Type>::
00756 unlockBack(const ClientID& clientID)
00757 {
00758 Token token = this->getToken();
00759 this->releaseLock();
00760 }
00761
00762
00763 template <class Type>
00764 void
00765 DistributionQueue<Type>::
00766 releaseLock()
00767 {
00768 *m_isLockedPtr = false;
00769 this->signalOne(m_isLockedCondition);
00770 }
00771
00772
00773 template <class Type>
00774 void
00775 DistributionQueue<Type>::
00776 waitForLock(Token& token)
00777 {
00778 while(*m_isLockedPtr) {
00779 this->wait(this->m_isLockedCondition, token);
00780 }
00781 *m_isLockedPtr = true;
00782 }
00783
00784
00785 template <class Type>
00786 bool
00787 DistributionQueue<Type>::
00788 waitForLock(Token& token, double timeout)
00789 {
00790 double deadline = getCurrentTime() + timeout;
00791 while(*m_isLockedPtr) {
00792 if(this->wait(this->m_isLockedCondition, token,
00793 deadline - getCurrentTime()) == false) {
00794 return false;
00795 }
00796 }
00797 *m_isLockedPtr = true;
00798 return true;
00799 }
00800
00801
00802 }
00803
00804 }
00805
00806 #endif