Skip to content
System Programming
Bounded buffer

Bounded buffer (producer and consumer)

The sample demonstrates how to implement a producer-consumer solution (bounded buffer) using semaphores and using conditional variables.

Using semaphores

The below solution uses a mutex for synchronizing access to the buffer and two semaphores: available_counter to indicate number of available slots and occupied_counter to indicate the number of used slots.

#include <iostream>
#include <pthread.h>
#include <string.h>
#include <semaphore.h>
#include <cerrno>
#include <unistd.h>
#include <vector>
 
int BUFFER_SIZE = 500;
int PRODUCERS = 10;
int CONSUMERS = 10;
 
// the worker context for producers and consumers
struct worker_context_t {
 
    // the limit of the buffer
    int buffer_size;
 
    // the bounded buffer
    std::vector<char>* buffer;
 
    // the lock to synchronize access to the buffer
    pthread_mutex_t* lock;
 
    // the semaphore to hold available slots
    sem_t* available_counter;
 
    // the semaphore to indicate number of occupied slots
    sem_t* occupied_counter;
};
 
 
void* producer(void* arg){
 
    // the worker context
    worker_context_t* ctx = (worker_context_t*) arg;
 
    while(true){
 
        // wait to have at least one slot to produce
        sem_wait(ctx->available_counter);
 
        // ensure exclusive access to the buffer
        pthread_mutex_lock(ctx->lock);
 
        // add new element to the buffer (produce)
        ctx->buffer->push_back('x');
        usleep(2000);
 
        printf("\rP: Elements in the buffer: %d/%d        ", (int) ctx->buffer->size(), ctx->buffer_size);
 
        // unlock the buffer
        pthread_mutex_unlock(ctx->lock);
        
        // increment the number of occupied slots
        sem_post(ctx->occupied_counter);
    }
 
    return NULL;
}
 
void* consumer(void* arg){
 
    // the worker context
    worker_context_t* ctx = (worker_context_t*) arg;
 
    while(true){
 
        // wait to have at least one full slot to consume
        sem_wait(ctx->occupied_counter);
 
        // ensure exclusive access to the buffer
        pthread_mutex_lock(ctx->lock);
 
        // remove last element to the buffer (consume)
        ctx->buffer->pop_back();
        usleep(1000);
        
        printf("\rC: Elements in the buffer: %d/%d        ", (int) ctx->buffer->size(), ctx->buffer_size);
 
        // unlock the buffer
        pthread_mutex_unlock(ctx->lock);
        
        // increment the number of available slots when removing element
        sem_post(ctx->available_counter);
    }
 
    return NULL;
}
 
int main(int argc, char** argv){
 
    // allocate the vector
    std::vector<char>* buffer = new std::vector<char>();
 
    // initialize locks
    pthread_mutex_t* lock = new pthread_mutex_t;
    pthread_mutex_init(lock, NULL);
 
    // initialize available slots counter semaphore (there are N available slots at the beginning)
    sem_t* available_counter = new sem_t;
    sem_init(available_counter, 0, BUFFER_SIZE);
 
    // initialize occupied slots counter semaphore (there are 0 occupied slots at the beginning)
    sem_t* occupied_counter = new sem_t;
    sem_init(occupied_counter, 0, 0);
 
    // initialize the common worker context
    worker_context_t* context = new worker_context_t;
    context->buffer = buffer;
    context->buffer_size = BUFFER_SIZE;
    context->lock = lock;
    context->available_counter = available_counter;
    context->occupied_counter = occupied_counter;
 
    // initialize worker threads
    pthread_t* producers = new pthread_t[PRODUCERS];
    pthread_t* consumers = new pthread_t[CONSUMERS];
 
    // create producer threads
    for(int i = 0; i < PRODUCERS; ++i){
        pthread_create(&producers[i], NULL, producer, (void*) context);
    }
 
    // create consumer threads
    for(int i = 0; i < CONSUMERS; ++i){
        pthread_create(&consumers[i], NULL, consumer, (void*) context);
    }
 
 
    // wait for all producers 
    for(int i = 0; i < PRODUCERS; ++i){
        pthread_join(producers[i], NULL);
    }
 
    // wait for all consumers
    for(int i = 0; i < CONSUMERS; ++i){
        pthread_join(consumers[i], NULL);
    }
 
    delete context;
 
    delete [] producers;
    delete [] consumers;
 
    sem_destroy(occupied_counter);
    delete occupied_counter;
 
    sem_destroy(available_counter);
    delete available_counter;
 
    pthread_mutex_destroy(lock);
    delete lock;
 
    delete buffer;
 
    return 0;
}

Using conditional variables

The above solution uses a mutex for synchronizing access to the buffer and two conditional variables: not_full to indicate that there are available slots and not_empty to indicate that there are occupied/used slots.

#include <iostream>
#include <pthread.h>
#include <string.h>
#include <cstdlib> 
#include <cerrno>
#include <unistd.h>
#include <vector>
 
int BUFFER_SIZE = 500;
int PRODUCERS = 10;
int CONSUMERS = 10;
 
// the worker context for producers and consumers
struct worker_context_t {
 
    // the limit of the buffer
    int buffer_size;
 
    // the bounded buffer
    std::vector<char>* buffer;
 
    // the lock to synchronize access to the buffer
    pthread_mutex_t* lock;
 
    // the condition variable for "not full"
    pthread_cond_t* not_full;
 
    // the condition variable for "not empty"
    pthread_cond_t* not_empty;
};
 
 
void* producer(void* arg){
 
    // the worker context
    worker_context_t* ctx = (worker_context_t*) arg;
 
    while(true){
 
        // ensure exclusive access to the buffer
        pthread_mutex_lock(ctx->lock);
 
        // wait until buffer has space to produce
        while((int) ctx->buffer->size() >= ctx->buffer_size){
            pthread_cond_wait(ctx->not_full, ctx->lock);
        }
 
        // add new element to the buffer (produce)
        ctx->buffer->push_back('x');
        usleep(2000);
 
        printf("\rP: Elements in the buffer: %d/%d        ", (int) ctx->buffer->size(), ctx->buffer_size);
 
        // signal that buffer is not empty now
        pthread_cond_signal(ctx->not_empty);
 
        // unlock the buffer
        pthread_mutex_unlock(ctx->lock);
    }
 
    return NULL;
}
 
void* consumer(void* arg){
 
    // the worker context
    worker_context_t* ctx = (worker_context_t*) arg;
 
    while(true){
 
        // ensure exclusive access to the buffer
        pthread_mutex_lock(ctx->lock);
 
        // wait until buffer has at least one element
        while(ctx->buffer->empty()){
            pthread_cond_wait(ctx->not_empty, ctx->lock);
        }
 
        // remove last element from the buffer (consume)
        ctx->buffer->pop_back();
        usleep(1000);
        
        printf("\rC: Elements in the buffer: %d/%d        ", (int) ctx->buffer->size(), ctx->buffer_size);
 
        // signal that buffer is not full now
        pthread_cond_signal(ctx->not_full);
 
        // unlock the buffer
        pthread_mutex_unlock(ctx->lock);
    }
 
    return NULL;
}
 
int main(int argc, char** argv){
 
    // allocate the vector
    std::vector<char>* buffer = new std::vector<char>();
 
    // initialize locks
    pthread_mutex_t* lock = new pthread_mutex_t;
    pthread_mutex_init(lock, NULL);
 
    // initialize condition variables
    pthread_cond_t* not_full = new pthread_cond_t;
    pthread_cond_init(not_full, NULL);
    
    pthread_cond_t* not_empty = new pthread_cond_t;
    pthread_cond_init(not_empty, NULL);
 
    // initialize the common worker context
    worker_context_t* context = new worker_context_t;
    context->buffer = buffer;
    context->buffer_size = BUFFER_SIZE;
    context->lock = lock;
    context->not_full = not_full;
    context->not_empty = not_empty;
 
    // initialize worker threads
    pthread_t* producers = new pthread_t[PRODUCERS];
    pthread_t* consumers = new pthread_t[CONSUMERS];
 
    // create producer threads
    for(int i = 0; i < PRODUCERS; ++i){
        pthread_create(&producers[i], NULL, producer, (void*) context);
    }
 
    // create consumer threads
    for(int i = 0; i < CONSUMERS; ++i){
        pthread_create(&consumers[i], NULL, consumer, (void*) context);
    }
 
    // wait for all producers 
    for(int i = 0; i < PRODUCERS; ++i){
        pthread_join(producers[i], NULL);
    }
 
    // wait for all consumers
    for(int i = 0; i < CONSUMERS; ++i){
        pthread_join(consumers[i], NULL);
    }
 
    delete context;
 
    delete [] producers;
    delete [] consumers;
 
    pthread_cond_destroy(not_full);
    delete not_full;
    
    pthread_cond_destroy(not_empty);
    delete not_empty;
 
    pthread_mutex_destroy(lock);
    delete lock;
 
    delete buffer;
 
    return 0;
}