#include <assert.h>
#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#define PRODUCER_SEMAPHORE_NAME "/producer"
#define CONSUMER_SEMAPHORE_NAME "/consumer"
// thread shared data
typedef struct
{
size_t thread_count;
size_t data_size;
double* data;
size_t rounds;
unsigned producer_min_delay; // milliseconds
unsigned producer_max_delay; // milliseconds
unsigned consumer_min_delay; // milliseconds
unsigned consumer_max_delay; // milliseconds
sem_t* producer_semaphore;
sem_t* consumer_semaphore;
} shared_data_t;
int analyze_arguments(int argc, char* argv[], shared_data_t* shared_data);
void* produce(void* data);
void* consume(void* data);
int main(int argc, char* argv[])
{
srand( time(NULL) );
shared_data_t shared_data;
shared_data.thread_count = 2;
int error = analyze_arguments(argc, argv, &shared_data);
if ( error )
return error;
shared_data.data = (double*) calloc(shared_data.data_size, sizeof(double));
if ( shared_data.data == NULL )
return (void)fprintf(stderr, "hello_w: error: could not allocate memory for: %zu data\n", shared_data.data_size), 8;
shared_data.producer_semaphore = sem_open(PRODUCER_SEMAPHORE_NAME, O_CREAT | O_EXCL, 0644, shared_data.data_size);
shared_data.consumer_semaphore = sem_open(CONSUMER_SEMAPHORE_NAME, O_CREAT | O_EXCL, 0644, 0);
assert(shared_data.producer_semaphore != SEM_FAILED && shared_data.consumer_semaphore != SEM_FAILED);
pthread_t* threads = (pthread_t*) malloc(shared_data.thread_count * sizeof(pthread_t));
if ( threads == NULL )
return (void)fprintf(stderr, "hello_w: error: could not allocate memory for: %zu threads\n", shared_data.thread_count), 9;
struct timespec start_time;
clock_gettime(CLOCK_MONOTONIC, &start_time);
// Create producer
pthread_create(&threads[0], NULL, produce, &shared_data);
// Create consumer
pthread_create(&threads[1], NULL, consume, &shared_data);
for ( size_t index = 0; index < shared_data.thread_count; ++index )
pthread_join(threads[index], NULL);
struct timespec finish_time;
clock_gettime(CLOCK_MONOTONIC, &finish_time);
double seconds = finish_time.tv_sec - start_time.tv_sec
+ (finish_time.tv_nsec - start_time.tv_nsec) * 1e-9;
printf("Simulation time: %.9lfs\n", seconds);
sem_close(shared_data.consumer_semaphore);
sem_close(shared_data.producer_semaphore);
sem_unlink(CONSUMER_SEMAPHORE_NAME);
sem_unlink(PRODUCER_SEMAPHORE_NAME);
free(shared_data.data);
free(threads);
return 0;
}
int analyze_arguments(int argc, char* argv[], shared_data_t* shared_data)
{
if ( argc != 7 )
return (void)fprintf(stderr, "usage: producer_consumer_2 data_size rounds min_producer_delay"
" max_producer_delay min_consumer_delay max_consumer_delay\n"), 1;
if ( sscanf(argv[1], "%zu", &shared_data->data_size) != 1 || shared_data->data_size == 0 )
return (void)fprintf(stderr, "producer_consumer: error: invalid data size: %s\n", argv[1]), 2;
if ( sscanf(argv[2], "%zu", &shared_data->rounds) != 1 || shared_data->rounds == 0 )
return (void)fprintf(stderr, "producer_consumer: error: invalid rounds: %s\n", argv[2]), 3;
if ( sscanf(argv[3], "%u", &shared_data->producer_min_delay) != 1 )
return (void)fprintf(stderr, "producer_consumer: error: invalid producer min delay: %s\n", argv[3]), 4;
if ( sscanf(argv[4], "%u", &shared_data->producer_max_delay) != 1 || shared_data->producer_max_delay < shared_data->producer_min_delay )
return (void)fprintf(stderr, "producer_consumer: error: invalid producer max delay: %s\n", argv[4]), 5;
if ( sscanf(argv[5], "%u", &shared_data->consumer_min_delay) != 1 )
return (void)fprintf(stderr, "producer_consumer: error: invalid consumer min delay: %s\n", argv[5]), 6;
if ( sscanf(argv[6], "%u", &shared_data->consumer_max_delay) != 1 || shared_data->consumer_max_delay < shared_data->consumer_min_delay )
return (void)fprintf(stderr, "producer_consumer: error: invalid consumer max delay: %s\n", argv[6]), 7;
// Success
return 0;
}
void random_wait(unsigned min_milliseconds, unsigned max_milliseconds)
{
unsigned range = max_milliseconds - min_milliseconds;
usleep( (min_milliseconds + rand() % (range + 1)) * 1000 );
}
double generate_product(size_t round, size_t index, const shared_data_t* shared_data)
{
// Producer requires time to produce a value
random_wait(shared_data->producer_min_delay, shared_data->producer_max_delay);
// Produce a value
double product = round + (index + 1) / 100.0;;
printf("Produced %.2f\n", product);
return product;
}
void consume_product(double product, const shared_data_t* shared_data)
{
// Consumer requires time to consume a value
random_wait(shared_data->consumer_min_delay, shared_data->consumer_max_delay);
printf("\t\tConsumed %.2f\n", product);
}
void* produce(void* data)
{
shared_data_t* shared_data = (shared_data_t*)data;
for ( size_t round = 1; round <= shared_data->rounds; ++round )
{
for ( size_t index = 0; index < shared_data->data_size; ++index )
{
sem_wait( shared_data->producer_semaphore );
shared_data->data[index] = generate_product(round, index, shared_data);
sem_post( shared_data->consumer_semaphore );
}
}
return NULL;
}
void* consume(void* data)
{
shared_data_t* shared_data = (shared_data_t*)data;
for ( size_t round = 1; round <= shared_data->rounds; ++round )
{
for ( size_t index = 0; index < shared_data->data_size; ++index )
{
sem_wait( shared_data->consumer_semaphore );
consume_product( shared_data->data[index], shared_data );
sem_post( shared_data->producer_semaphore );
}
}
return NULL;
}