#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
typedef struct
{
size_t buffer_size;
double* buffer;
size_t rounds;
useconds_t min_producer_delay;
useconds_t max_producer_delay;
useconds_t min_consumer_delay;
useconds_t max_consumer_delay;
sem_t producer_semaphore;
sem_t consumer_semaphore;
pthread_mutex_t stdout_mutex;
} shared_data_t;
int analyze_arguments(int argc, char* argv[], shared_data_t* shared_data);
int create_threads(shared_data_t* shared_data);
void* produce(void* data);
void* consume(void* data);
void random_sleep(useconds_t min_milliseconds, useconds_t max_milliseconds);
int main(int argc, char* argv[])
{
srand( time(NULL) );
shared_data_t* shared_data = (shared_data_t*) calloc(1, sizeof(shared_data_t));
if ( shared_data == NULL )
return (void)fprintf(stderr, "error: could not allocate shared memory\n"), 1;
int error = analyze_arguments(argc, argv, shared_data);
if ( error == 0 )
{
shared_data->buffer = (double*) calloc(shared_data->buffer_size, sizeof(double));
if ( shared_data->buffer )
{
sem_init(&shared_data->producer_semaphore, 0 /*pshared*/, shared_data->buffer_size);
sem_init(&shared_data->consumer_semaphore, 0 /*pshared*/, 0);
pthread_mutex_init(&shared_data->stdout_mutex, /*attr*/ NULL);
struct timespec start_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
error = create_threads(shared_data);
if ( error == 0 )
{
struct timespec finish_time;
clock_gettime(CLOCK_MONOTONIC, &finish_time);
double elapsed_seconds = finish_time.tv_sec - start_time.tv_sec
+ 1e-9 * (finish_time.tv_nsec - start_time.tv_nsec);
printf("Simulation time %.9lfs\n", elapsed_seconds);
}
pthread_mutex_destroy(&shared_data->stdout_mutex);
free(shared_data->buffer);
}
else
{
fprintf(stderr, "error: could not allocate memory for %zu products\n", shared_data->buffer_size);
error = 2;
}
}
free(shared_data);
return error;
}
int analyze_arguments(int argc, char* argv[], shared_data_t* shared_data)
{
if ( argc != 7 )
{
fprintf(stderr, "usage: producer_consumer buffer_size rounds"
" min_producer_delay max_producer_delay"
" min_consumer_delay max_consumer_delay\n");
return 1;
}
shared_data->buffer_size = strtoull(argv[1], NULL, 10);
if ( shared_data->buffer_size == 0 )
return 2;
if ( sscanf(argv[2], "%zu", &shared_data->rounds) != 1 || shared_data->rounds == 0 )
return (void)fprintf(stderr, "invalid rounds: %s\n", argv[2]), 2;
if ( sscanf(argv[3], "%u", &shared_data->min_producer_delay) != 1 )
return (void)fprintf(stderr, "invalid min producer delay: %s\n", argv[3]), 3;
if ( sscanf(argv[4], "%u", &shared_data->max_producer_delay) != 1
|| shared_data->max_producer_delay < shared_data->min_producer_delay )
return (void)fprintf(stderr, "invalid max producer delay: %s\n", argv[4]), 4;
if ( sscanf(argv[5], "%u", &shared_data->min_consumer_delay) != 1 )
return (void)fprintf(stderr, "invalid min consumer delay: %s\n", argv[5]), 5;
if ( sscanf(argv[6], "%u", &shared_data->max_consumer_delay) != 1
|| shared_data->max_consumer_delay < shared_data->min_consumer_delay )
return (void)fprintf(stderr, "invalid max consumer delay: %s\n", argv[6]), 6;
return EXIT_SUCCESS;
}
int create_threads(shared_data_t* shared_data)
{
pthread_t producer_thread;
pthread_t consumer_thread;
pthread_create(&producer_thread, NULL, produce, shared_data);
pthread_create(&consumer_thread, NULL, consume, shared_data);
pthread_join(producer_thread, NULL);
pthread_join(consumer_thread, NULL);
return 0;
}
void* produce(void* data)
{
shared_data_t* shared_data = (shared_data_t*)data;
for ( size_t round = 1; round <= shared_data->rounds; ++round )
{
for ( size_t index = 0; index < shared_data->buffer_size; ++index )
{
sem_wait(&shared_data->producer_semaphore);
random_sleep(shared_data->min_producer_delay, shared_data->max_producer_delay);
shared_data->buffer[index] = round + (index + 1) / 100.0;
pthread_mutex_lock(&shared_data->stdout_mutex);
printf("Produced %.2lf\n", shared_data->buffer[index]);
pthread_mutex_unlock(&shared_data->stdout_mutex);
sem_post(&shared_data->consumer_semaphore);
}
}
return NULL;
}
void* consume(void* data)
{
shared_data_t* shared_data = (shared_data_t*)data;
for ( size_t round = 1; round <= shared_data->rounds; ++round )
{
for ( size_t index = 0; index < shared_data->buffer_size; ++index )
{
sem_wait(&shared_data->consumer_semaphore);
random_sleep(shared_data->min_consumer_delay, shared_data->max_consumer_delay);
pthread_mutex_lock(&shared_data->stdout_mutex);
printf("\t\t\tConsumed %.2lf\n", shared_data->buffer[index]);
pthread_mutex_unlock(&shared_data->stdout_mutex);
sem_post(&shared_data->producer_semaphore);
}
}
return NULL;
}
void random_sleep(useconds_t min_milliseconds, useconds_t max_milliseconds)
{
useconds_t duration = min_milliseconds;
useconds_t range = max_milliseconds - min_milliseconds;
if ( range > 0 )
duration += rand() % range;
usleep( 1000 * duration );
}