#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
// State shared by the main thread, the producer thread and the consumer
// thread. main() fills it in and passes its address to both threads.
typedef struct
{
// Number of worker threads; main() sets it to 2 (one producer, one consumer)
// and uses it to size the thread array and to drive the join loop.
size_t thread_count;
// Number of elements in `data`; also the initial value of producer_semaphore.
size_t data_size;
// Buffer of `data_size` doubles: the producer writes data[index] and the
// consumer reads data[index].
double* data;
// Number of full passes over the buffer that each thread performs.
size_t rounds;
// Upper bound of the random delay taken before each value is produced.
unsigned producer_max_delay; // milliseconds
// Upper bound of the random delay taken before each value is consumed.
unsigned consumer_max_delay; // milliseconds
// Waited on by the producer before writing a slot and posted by the consumer
// after reading one; initialized to data_size in main().
sem_t producer_semaphore;
// Waited on by the consumer before reading a slot and posted by the producer
// after writing one; initialized to 0 in main().
sem_t consumer_semaphore;
// Held around each printf() call made by the producer and consumer threads.
pthread_mutex_t stdout_mutex;
} shared_data_t;
/**
 * Suspends the calling thread for a random whole number of milliseconds
 * between 0 and max_delay_ms (the upper end is also bounded by RAND_MAX).
 *
 * @param max_delay_ms Largest delay to wait, in milliseconds.
 */
static void sleep_random_ms(unsigned max_delay_ms)
{
    // Widen before adding 1: in unsigned arithmetic UINT_MAX + 1 wraps to 0,
    // which would make the modulo below a division by zero.
    const unsigned long long span = (unsigned long long)max_delay_ms + 1;

    // NOTE(review): POSIX does not require rand() to be thread-safe, and the
    // producer and consumer threads both call it concurrently. Consider
    // giving each thread its own generator state.
    const unsigned long long delay_ms = (unsigned long long)rand() % span;

    // nanosleep() takes seconds and nanoseconds separately, so long delays
    // neither overflow a microsecond count nor hit the one-second limit that
    // some systems impose on usleep().
    struct timespec delay;
    delay.tv_sec = (time_t)(delay_ms / 1000);
    delay.tv_nsec = (long)(delay_ms % 1000) * 1000000L;
    nanosleep(&delay, NULL);
}

/**
 * Producer thread routine: for every round, fills each slot of the shared
 * buffer with the value `round + (index + 1) / 100.0` and reports it on
 * stdout.
 *
 * @param data Pointer to the shared_data_t describing the simulation.
 * @return Always NULL.
 */
void* produce(void* data)
{
    shared_data_t* shared_data = (shared_data_t*)data;

    for ( size_t round = 1; round <= shared_data->rounds; ++round )
    {
        for ( size_t index = 0; index < shared_data->data_size; ++index )
        {
            // Wait until this slot may be overwritten
            sem_wait( &shared_data->producer_semaphore );

            // Producer requires time to produce a value
            sleep_random_ms( shared_data->producer_max_delay );

            double product = round + (index + 1) / 100.0;
            shared_data->data[index] = product;

            // Keep output lines from the two threads from interleaving
            pthread_mutex_lock( &shared_data->stdout_mutex );
            printf("Produced %.2f\n", product);
            pthread_mutex_unlock( &shared_data->stdout_mutex );

            // Tell the consumer that one more value is available
            sem_post( &shared_data->consumer_semaphore );
        }
    }

    return NULL;
}

/**
 * Consumer thread routine: for every round, reads each slot of the shared
 * buffer once the producer has filled it and reports the value on stdout.
 *
 * @param data Pointer to the shared_data_t describing the simulation.
 * @return Always NULL.
 */
void* consume(void* data)
{
    shared_data_t* shared_data = (shared_data_t*)data;

    for ( size_t round = 1; round <= shared_data->rounds; ++round )
    {
        for ( size_t index = 0; index < shared_data->data_size; ++index )
        {
            // Wait until the producer has written this slot
            sem_wait( &shared_data->consumer_semaphore );

            // Consumer requires time to consume a value
            sleep_random_ms( shared_data->consumer_max_delay );

            double product = shared_data->data[index];

            // Keep output lines from the two threads from interleaving
            pthread_mutex_lock( &shared_data->stdout_mutex );
            printf("\t\t\tConsumed %.2f\n", product);
            pthread_mutex_unlock( &shared_data->stdout_mutex );

            // Tell the producer that one more slot is free
            sem_post( &shared_data->producer_semaphore );
        }
    }

    return NULL;
}
int main(int argc, char* argv[])
{
srand( time(NULL) );
shared_data_t shared_data;
shared_data.thread_count = 2;
if ( argc == 5 )
{
if ( sscanf(argv[1], "%zu", &shared_data.data_size) != 1 || shared_data.data_size == 0 )
return (void)fprintf(stderr, "producer_consumer: error: invalid data size: %s\n", argv[1]), 1;
if ( sscanf(argv[2], "%zu", &shared_data.rounds) != 1 || shared_data.rounds == 0 )
return (void)fprintf(stderr, "producer_consumer: error: invalid rounds: %s\n", argv[2]), 1;
if ( sscanf(argv[3], "%u", &shared_data.producer_max_delay) != 1 )
return (void)fprintf(stderr, "producer_consumer: error: invalid producer max delay: %s\n", argv[3]), 1;
if ( sscanf(argv[4], "%u", &shared_data.consumer_max_delay) != 1 )
return (void)fprintf(stderr, "producer_consumer: error: invalid consumer max delay: %s\n", argv[4]), 1;
}
else
return (void)fprintf(stderr, "usage: producer_consumer data_size rounds producer_delay consumer_delay\n"), 1;
shared_data.data = (double*) calloc(shared_data.data_size, sizeof(double));
if ( shared_data.data == NULL )
return 6;
sem_init( &shared_data.producer_semaphore, 0, shared_data.data_size );
sem_init( &shared_data.consumer_semaphore, 0, 0);
pthread_mutex_init( &shared_data.stdout_mutex, NULL );
pthread_t* threads = (pthread_t*)malloc(shared_data.thread_count * sizeof(pthread_t));
if ( threads == NULL )
return (void)fprintf(stderr, "hello_w: error: could not allocate memory for: %zu threads\n", shared_data.thread_count), 2;
struct timespec start_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
// Create producer
pthread_create(&threads[0], NULL, produce, &shared_data);
// Create consumer
pthread_create(&threads[1], NULL, consume, &shared_data);
for ( size_t index = 0; index < shared_data.thread_count; ++index )
pthread_join(threads[index], NULL);
struct timespec finish_time;
clock_gettime(CLOCK_MONOTONIC, &finish_time);
double seconds = finish_time.tv_sec - start_time.tv_sec
+ (finish_time.tv_nsec - start_time.tv_nsec) * 1e-9;
printf("Simulation time: %.9lfs\n", seconds);
pthread_mutex_destroy( &shared_data.stdout_mutex );
sem_destroy(&shared_data.consumer_semaphore);
sem_destroy(&shared_data.producer_semaphore);
free(shared_data.data);
free(threads);
return 0;
}