Download c source code

#include <assert.h>
#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#define PRODUCER_SEMAPHORE_NAME "/producer"
#define CONSUMER_SEMAPHORE_NAME "/consumer"

// thread shared data
typedef struct
{
	size_t thread_count;
	size_t data_size;
	double* data;
	size_t rounds;
	unsigned producer_min_delay; // milliseconds
	unsigned producer_max_delay; // milliseconds
	unsigned consumer_min_delay; // milliseconds
	unsigned consumer_max_delay; // milliseconds
	sem_t* producer_semaphore;
	sem_t* consumer_semaphore;
} shared_data_t;

int analyze_arguments(int argc, char* argv[], shared_data_t* shared_data);
void* produce(void* data);
void* consume(void* data);

int main(int argc, char* argv[])
{
	srand( time(NULL) );
	
	shared_data_t shared_data;
	shared_data.thread_count = 2;

	int error = analyze_arguments(argc, argv, &shared_data);
	if ( error )
		return error;
	
	shared_data.data = (double*) calloc(shared_data.data_size, sizeof(double));
	if ( shared_data.data == NULL )
		return (void)fprintf(stderr, "hello_w: error: could not allocate memory for: %zu data\n", shared_data.data_size), 8;

	shared_data.producer_semaphore = sem_open(PRODUCER_SEMAPHORE_NAME, O_CREAT | O_EXCL, 0644, shared_data.data_size);
	shared_data.consumer_semaphore = sem_open(CONSUMER_SEMAPHORE_NAME, O_CREAT | O_EXCL, 0644, 0);
	assert(shared_data.producer_semaphore != SEM_FAILED && shared_data.consumer_semaphore != SEM_FAILED);
	
	pthread_t* threads = (pthread_t*) malloc(shared_data.thread_count * sizeof(pthread_t));
	if ( threads == NULL )
		return (void)fprintf(stderr, "hello_w: error: could not allocate memory for: %zu threads\n", shared_data.thread_count), 9;

	struct timespec start_time;
	clock_gettime(CLOCK_MONOTONIC, &start_time);

	// Create producer
	pthread_create(&threads[0], NULL, produce, &shared_data);
	// Create consumer
	pthread_create(&threads[1], NULL, consume, &shared_data);
	
	for ( size_t index = 0; index < shared_data.thread_count; ++index )
		pthread_join(threads[index], NULL);

	struct timespec finish_time;
	clock_gettime(CLOCK_MONOTONIC, &finish_time);
	
	double seconds = finish_time.tv_sec - start_time.tv_sec
		+ (finish_time.tv_nsec - start_time.tv_nsec) * 1e-9;

	printf("Simulation time: %.9lfs\n", seconds);

	sem_close(shared_data.consumer_semaphore);
	sem_close(shared_data.producer_semaphore);
	sem_unlink(CONSUMER_SEMAPHORE_NAME);
	sem_unlink(PRODUCER_SEMAPHORE_NAME);
	free(shared_data.data);
	free(threads);

	return 0;
}

int analyze_arguments(int argc, char* argv[], shared_data_t* shared_data)
{
	if ( argc != 7 )
		return (void)fprintf(stderr, "usage: producer_consumer_2 data_size rounds min_producer_delay"
			" max_producer_delay min_consumer_delay max_consumer_delay\n"), 1;

	if ( sscanf(argv[1], "%zu", &shared_data->data_size) != 1 || shared_data->data_size == 0 )
		return (void)fprintf(stderr, "producer_consumer: error: invalid data size: %s\n", argv[1]), 2;
	if ( sscanf(argv[2], "%zu", &shared_data->rounds) != 1 || shared_data->rounds == 0 )
		return (void)fprintf(stderr, "producer_consumer: error: invalid rounds: %s\n", argv[2]), 3;
	if ( sscanf(argv[3], "%u", &shared_data->producer_min_delay) != 1 )
		return (void)fprintf(stderr, "producer_consumer: error: invalid producer min delay: %s\n", argv[3]), 4;
	if ( sscanf(argv[4], "%u", &shared_data->producer_max_delay) != 1 || shared_data->producer_max_delay < shared_data->producer_min_delay )
		return (void)fprintf(stderr, "producer_consumer: error: invalid producer max delay: %s\n", argv[4]), 5;
	if ( sscanf(argv[5], "%u", &shared_data->consumer_min_delay) != 1 )
		return (void)fprintf(stderr, "producer_consumer: error: invalid consumer min delay: %s\n", argv[5]), 6;
	if ( sscanf(argv[6], "%u", &shared_data->consumer_max_delay) != 1 || shared_data->consumer_max_delay < shared_data->consumer_min_delay )
		return (void)fprintf(stderr, "producer_consumer: error: invalid consumer max delay: %s\n", argv[6]), 7;

	// Success
	return 0;
}

void random_wait(unsigned min_milliseconds, unsigned max_milliseconds)
{
	unsigned range = max_milliseconds - min_milliseconds;
	usleep( (min_milliseconds + rand() % (range + 1)) * 1000 );
}

double generate_product(size_t round, size_t index, const shared_data_t* shared_data)
{
	// Producer requires time to produce a value
	random_wait(shared_data->producer_min_delay, shared_data->producer_max_delay);

	// Produce a value
	double product = round + (index + 1) / 100.0;;
	printf("Produced %.2f\n", product);
	return product;
}

void consume_product(double product, const shared_data_t* shared_data)
{
	// Consumer requires time to consume a value
	random_wait(shared_data->consumer_min_delay, shared_data->consumer_max_delay);
	printf("\t\tConsumed %.2f\n", product);
}

void* produce(void* data)
{
	shared_data_t* shared_data = (shared_data_t*)data;

	for ( size_t round = 1; round <= shared_data->rounds; ++round )
	{
		for ( size_t index = 0; index < shared_data->data_size; ++index )
		{
			sem_wait( shared_data->producer_semaphore );
			shared_data->data[index] = generate_product(round, index, shared_data);
			sem_post( shared_data->consumer_semaphore );
		}
	}

	return NULL;
}

void* consume(void* data)
{
	shared_data_t* shared_data = (shared_data_t*)data;

	for ( size_t round = 1; round <= shared_data->rounds; ++round )
	{
		for ( size_t index = 0; index < shared_data->data_size; ++index )
		{
			sem_wait( shared_data->consumer_semaphore );
			consume_product( shared_data->data[index], shared_data );
			sem_post( shared_data->producer_semaphore );
		}
	}

	return NULL;
}