#include using namespace Upp; #define BUFFER_SIZE 10 #define PRODUCER_SLEEP_TIME_MS 500 #define CONSUMER_SLEEP_TIME_MS 2000 int ItemBuffer[BUFFER_SIZE]; int QueueSize; int QueueStartOffset; int TotalItemsProduced; int TotalItemsConsumed; ConditionVariable ItemBufferNotEmpty; ConditionVariable ItemBufferNotFull; Mutex ItemBufferLock; BOOL StopRequested; void ProducerThread() { LOG("Starting producer thread"); for(;;) { Sleep(Random(PRODUCER_SLEEP_TIME_MS)); { Mutex::Lock __(ItemBufferLock); while(QueueSize == BUFFER_SIZE && !StopRequested) ItemBufferNotFull.Wait(ItemBufferLock); if(StopRequested) break; int Item = Random(1000); ItemBuffer[(QueueStartOffset + QueueSize) % BUFFER_SIZE] = Item; QueueSize++; TotalItemsProduced++; LOG("Producer item " << Item << ", queue size " << QueueSize); } ItemBufferNotEmpty.Signal(); } LOG("Producer exiting"); } void ConsumerThread(int id) { LOG("Starting consumer thread " << id); for(;;) { { Mutex::Lock __(ItemBufferLock); while(QueueSize == 0 && !StopRequested) ItemBufferNotEmpty.Wait(ItemBufferLock); if(StopRequested && QueueSize == 0) break; LONG Item = ItemBuffer[QueueStartOffset]; QueueSize--; QueueStartOffset++; TotalItemsConsumed++; if(QueueStartOffset == BUFFER_SIZE) QueueStartOffset = 0; LOG("Consumer " << id << ", item " << Item << ", queue size " << QueueSize); } ItemBufferNotFull.Signal(); Sleep(Random(CONSUMER_SLEEP_TIME_MS)); } LOG("Consumer exiting"); } CONSOLE_APP_MAIN { StdLogSetup(LOG_COUT|LOG_FILE); Thread producer, consumer[2]; producer.Run(callback(ProducerThread)); for(int i = 0; i < 2; i++) consumer[i].Run(callback1(ConsumerThread, i)); LOG("Press enter to stop..."); ReadStdIn(); StopRequested = TRUE; ItemBufferNotFull.Broadcast(); ItemBufferNotEmpty.Broadcast(); producer.Wait(); for(int i = 0; i < 2; i++) consumer[i].Wait(); LOG("TotalItemsProduced: " << TotalItemsProduced << ", TotalItemsConsumed: " << TotalItemsConsumed); }