/*
 * Copyright 2021 Jeisson Hidalgo-Cespedes - Universidad de Costa Rica
 * Simulates the bounded-buffer producer-consumer problem: one producer thread
 * and one consumer thread share a fixed-size buffer and are synchronized with
 * two POSIX semaphores. Progress is reported on the standard output.
 */

/* Expose the POSIX declarations used below (useconds_t, clock_gettime,
 * nanosleep) even when the compiler runs in a strict ISO mode such as
 * -std=c11. This must stay above every #include. */
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#endif

#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

/* Largest buffer size accepted: the buffer size is the initial value of the
 * can_produce semaphore, so it must fit in a semaphore counter. */
#ifdef SEM_VALUE_MAX
#define MAX_BUFFER_SIZE ((unsigned long long)SEM_VALUE_MAX)
#else
#define MAX_BUFFER_SIZE ((unsigned long long)INT_MAX)
#endif

/**
 * State shared by the producer and the consumer threads.
 * All delays are handled as milliseconds by random_delay().
 */
typedef struct shared_thread_data {
  size_t buffer_size;  /* number of slots in @a buffer */
  double* buffer;  /* allocated by simulate_producer_consumer() */
  size_t rounds;  /* times the whole buffer is filled and emptied */
  useconds_t min_producer_delay;
  useconds_t max_producer_delay;
  useconds_t min_consumer_delay;
  useconds_t max_consumer_delay;
  sem_t can_produce;  /* starts at buffer_size; consumer posts, producer waits */
  sem_t can_consume;  /* starts at 0; producer posts, consumer waits */
  pthread_mutex_t stdout_mutex;  /* serializes the printf() calls */
} shared_thread_data_t;

/**
 * Parses and validates the six command line arguments into the shared record.
 * @param argc Argument count; must be 7 (program name plus six values)
 * @param argv buffer_size rounds min_producer_delay max_producer_delay
 *   min_consumer_delay max_consumer_delay
 * @param shared_thread_data Record that receives the parsed values
 * @return 0 on success; 1 for a wrong argument count; 2 to 7 when the
 *   corresponding argument is invalid; 8 or 9 when a producer or consumer
 *   minimum delay exceeds its maximum. A message is printed to stderr.
 */
int analyze_arguments(int argc, char* argv[]
  , shared_thread_data_t* shared_thread_data);

/**
 * Allocates the buffer, initializes the semaphores and the mutex, runs both
 * threads, prints the elapsed time and releases everything it acquired.
 * The record itself stays owned by the caller.
 * @param shared_thread_data Record already filled by analyze_arguments()
 * @return 0 on success, 11 if a semaphore or the mutex could not be
 *   initialized, 12 if the record is NULL, 13 if the buffer could not be
 *   allocated, or the error code of create_threads()
 */
int simulate_producer_consumer(shared_thread_data_t* shared_thread_data);

/**
 * Starts the producer and the consumer threads and waits for both to finish.
 * @return 0 on success, 21 if the producer could not be created, 22 if the
 *   consumer could not be created
 */
int create_threads(shared_thread_data_t* shared_thread_data);

/** Thread routine: fills the buffer @a rounds times. @a data is the record */
void* produce(void* data);

/** Thread routine: empties the buffer @a rounds times. @a data is the record */
void* consume(void* data);

/**
 * Sleeps a random number of milliseconds taken from [min, max).
 * When @a min equals @a max the delay is exactly @a min milliseconds.
 * @param min Shortest delay in milliseconds; must not exceed @a max
 * @param max Exclusive upper bound of the delay in milliseconds
 */
void random_delay(useconds_t min, useconds_t max);

/**
 * Converts a decimal text to an unsigned value, rejecting anything that is not
 * made only of digits (sign, spaces, trailing characters) or exceeds a limit.
 * @param text Null-terminated argument to convert
 * @param max_value Largest value accepted
 * @param value Receives the number; untouched on failure
 * @return 1 on success, 0 if @a text is not a valid number in [0, max_value]
 */
static int parse_unsigned(const char* text, unsigned long long max_value
  , unsigned long long* value) {
  /* strtoull() would accept leading spaces and a sign, "-1" included */
  if (!isdigit((unsigned char)text[0])) {
    return 0;
  }

  char* end = NULL;
  errno = 0;
  const unsigned long long parsed = strtoull(text, &end, 10);
  if (errno != 0 || *end != '\0' || parsed > max_value) {
    return 0;
  }

  *value = parsed;
  return 1;
}

/** Parses a size or count in [1, max_value]. Returns 1 on success, else 0 */
static int parse_count(const char* text, unsigned long long max_value
  , size_t* count) {
  unsigned long long value = 0;
  if (!parse_unsigned(text, max_value, &value) || value == 0) {
    return 0;
  }
  *count = (size_t)value;
  return 1;
}

/** Parses a delay that must fit in useconds_t. Returns 1 on success, else 0 */
static int parse_delay(const char* text, useconds_t* delay) {
  unsigned long long value = 0;
  if (!parse_unsigned(text, (useconds_t)-1, &value)) {
    return 0;
  }
  *delay = (useconds_t)value;
  return 1;
}

int main(int argc, char* argv[]) {
  /* Seed the generator with the clock so that each run gets other delays */
  struct timespec now = {0, 0};
  clock_gettime(/*clk_id*/CLOCK_MONOTONIC, &now);
  srand((unsigned int)now.tv_nsec);

  shared_thread_data_t* shared_thread_data =
    calloc(1, sizeof(*shared_thread_data));
  if (shared_thread_data == NULL) {
    fprintf(stderr, "error: could not allocated shared memory\n");
    return 12;
  }

  int error = analyze_arguments(argc, argv, shared_thread_data);
  if (error == EXIT_SUCCESS) {
    error = simulate_producer_consumer(shared_thread_data);
  }

  /* main() allocated the record, so main() releases it on every path */
  free(shared_thread_data);
  return error;
}

int analyze_arguments(int argc, char* argv[]
  , shared_thread_data_t* shared_thread_data) {
  assert(shared_thread_data);

  if (argc != 7) {
    fprintf(stderr, "usage: producer_consumer buffer_size rounds"
      " min_producer_delay max_producer_delay"
      " min_consumer_delay max_consumer_delay\n");
    return 1;
  }

  if (!parse_count(argv[1], MAX_BUFFER_SIZE
      , &shared_thread_data->buffer_size)) {
    fprintf(stderr, "error: invalid buffer size\n");
    return 2;
  }
  /* SIZE_MAX itself is refused: the loops test `round <= rounds` and would
   * never end because the counter wraps before exceeding it */
  if (!parse_count(argv[2], SIZE_MAX - 1, &shared_thread_data->rounds)) {
    fprintf(stderr, "error: invalid round count\n");
    return 3;
  }
  if (!parse_delay(argv[3], &shared_thread_data->min_producer_delay)) {
    fprintf(stderr, "error: invalid min producer delay\n");
    return 4;
  }
  if (!parse_delay(argv[4], &shared_thread_data->max_producer_delay)) {
    fprintf(stderr, "error: invalid max producer delay\n");
    return 5;
  }
  if (!parse_delay(argv[5], &shared_thread_data->min_consumer_delay)) {
    fprintf(stderr, "error: invalid min consumer delay\n");
    return 6;
  }
  if (!parse_delay(argv[6], &shared_thread_data->max_consumer_delay)) {
    fprintf(stderr, "error: invalid max consumer delay\n");
    return 7;
  }

  /* random_delay() requires min <= max; report it here instead of aborting
   * later inside a thread */
  if (shared_thread_data->min_producer_delay
      > shared_thread_data->max_producer_delay) {
    fprintf(stderr
      , "error: min producer delay exceeds max producer delay\n");
    return 8;
  }
  if (shared_thread_data->min_consumer_delay
      > shared_thread_data->max_consumer_delay) {
    fprintf(stderr
      , "error: min consumer delay exceeds max consumer delay\n");
    return 9;
  }

  return 0;
}

int simulate_producer_consumer(shared_thread_data_t* shared_thread_data) {
  assert(shared_thread_data);
  if (shared_thread_data == NULL) {
    fprintf(stderr, "error: could not allocated shared memory\n");
    return 12;
  }

  int error = 0;
  struct timespec start_time = {0, 0};
  struct timespec finish_time = {0, 0};
  double elapsed_time = 0.0;

  shared_thread_data->buffer =
    calloc(shared_thread_data->buffer_size, sizeof(double));
  if (shared_thread_data->buffer == NULL) {
    fprintf(stderr, "error: could not allocated shared memory\n");
    return 13;
  }

  /* Each resource is checked on its own and released in reverse order */
  if (sem_init(&shared_thread_data->can_produce, /*pshared*/0
      , (unsigned int)shared_thread_data->buffer_size) != 0) {
    fprintf(stderr, "error: could not init semaphore\n");
    error = 11;
    goto free_buffer;
  }
  if (sem_init(&shared_thread_data->can_consume, /*pshared*/0, 0) != 0) {
    fprintf(stderr, "error: could not init semaphore\n");
    error = 11;
    goto destroy_can_produce;
  }
  if (pthread_mutex_init(&shared_thread_data->stdout_mutex
      , /*attr*/NULL) != 0) {
    fprintf(stderr, "error: could not init mutex\n");
    error = 11;
    goto destroy_can_consume;
  }

  clock_gettime(/*clk_id*/CLOCK_MONOTONIC, &start_time);
  error = create_threads(shared_thread_data);
  clock_gettime(/*clk_id*/CLOCK_MONOTONIC, &finish_time);

  elapsed_time = finish_time.tv_sec - start_time.tv_sec
    + (finish_time.tv_nsec - start_time.tv_nsec) * 1e-9;
  printf("execution time: %.9lfs\n", elapsed_time);

  pthread_mutex_destroy(&shared_thread_data->stdout_mutex);
destroy_can_consume:
  sem_destroy(&shared_thread_data->can_consume);
destroy_can_produce:
  sem_destroy(&shared_thread_data->can_produce);
free_buffer:
  free(shared_thread_data->buffer);
  shared_thread_data->buffer = NULL;
  return error;
}

int create_threads(shared_thread_data_t* shared_thread_data) {
  assert(shared_thread_data);
  int error = 0;

  pthread_t producer_thread;
  pthread_t consumer_thread;

  error = pthread_create(&producer_thread, NULL, produce, shared_thread_data);
  if (error == 0) {
    error = pthread_create(&consumer_thread, NULL, consume
      , shared_thread_data);
    if (error == 0) {
      pthread_join(consumer_thread, NULL);
    } else {
      fprintf(stderr, "error: could not create consumer thread\n");
      error = 22;
    }

    /* NOTE(review): without a consumer nobody posts can_produce, so with
     * more than one round the producer looks like it blocks forever and this
     * join never returns -- confirm and decide how to stop the producer */
    pthread_join(producer_thread, NULL);
  } else {
    fprintf(stderr, "error: could not create producer thread\n");
    error = 21;
  }

  return error;
}

void* produce(void* data) {
  assert(data);
  shared_thread_data_t* shared_data = (shared_thread_data_t*)data;

  for (size_t round = 1; round <= shared_data->rounds; ++round) {
    for (size_t index = 0; index < shared_data->buffer_size; ++index) {
      /* Wait until the consumer has released a slot */
      sem_wait(&shared_data->can_produce);

      random_delay(shared_data->min_producer_delay
        , shared_data->max_producer_delay);
      /* The product encodes its origin as round.slot, e.g. 2.03 */
      shared_data->buffer[index] = round + (index + 1) / 100.0;

      pthread_mutex_lock(&shared_data->stdout_mutex);
      printf("Produced %.2lf\n", shared_data->buffer[index]);
      pthread_mutex_unlock(&shared_data->stdout_mutex);

      /* Tell the consumer that one more product is available */
      sem_post(&shared_data->can_consume);
    }
  }

  return NULL;
}

void* consume(void* data) {
  assert(data);
  shared_thread_data_t* shared_data = (shared_thread_data_t*)data;

  for (size_t round = 1; round <= shared_data->rounds; ++round) {
    for (size_t index = 0; index < shared_data->buffer_size; ++index) {
      /* Wait until the producer has stored a product */
      sem_wait(&shared_data->can_consume);

      random_delay(shared_data->min_consumer_delay
        , shared_data->max_consumer_delay);
      double value = shared_data->buffer[index];

      pthread_mutex_lock(&shared_data->stdout_mutex);
      printf("\t\tConsumed %.2lf\n", value);
      pthread_mutex_unlock(&shared_data->stdout_mutex);

      /* Hand the slot back to the producer */
      sem_post(&shared_data->can_produce);
    }
  }

  return NULL;
}

void random_delay(useconds_t min, useconds_t max) {
  assert(min <= max);

  /* rand() keeps hidden state and POSIX does not require it to be
   * thread-safe; both threads call this function, so serialize the access */
  static pthread_mutex_t rand_mutex = PTHREAD_MUTEX_INITIALIZER;

  useconds_t milliseconds = min;
  if (max > min) {
    pthread_mutex_lock(&rand_mutex);
    const int random_value = rand();
    pthread_mutex_unlock(&rand_mutex);
    milliseconds += (useconds_t)random_value % (max - min);
  }

  /* nanosleep() takes seconds and nanoseconds separately, which avoids both
   * the overflow of `milliseconds * 1000` and the one-second limit that POSIX
   * places on the argument of usleep() */
  const struct timespec delay = {
    .tv_sec = (time_t)(milliseconds / 1000),
    .tv_nsec = (long)(milliseconds % 1000) * 1000000L
  };
  nanosleep(&delay, NULL);
}