/*
 * Copyright 2021 Jeisson Hidalgo-Cespedes - Universidad de Costa Rica
 * Simulates the producer-consumer problem: producer threads append numbered
 * products to a thread-safe queue and consumer threads remove them. Each
 * side sleeps a random delay per product.
 */

// Makes the POSIX/BSD declarations used below (clock_gettime, rand_r, usleep,
// useconds_t) visible when compiling in strict ISO mode such as -std=c11.
// It must appear before the first #include to have any effect.
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#endif

// NOTE(review): the header names were missing in the reviewed copy of this
// file; this list is derived from the identifiers the code uses -- confirm
// against the original.
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

/**
 * Node of the singly linked list that backs the queue
 */
typedef struct queue_node {
  size_t product_number;
  struct queue_node* next;
} queue_node_t;

/**
 * Thread-safe queue
 */
typedef struct queue {
  queue_node_t* head;
  queue_node_t* tail;
  pthread_mutex_t mutex;  // protects head and tail
} queue_t;

/**
 * Leaves the queue empty and initializes its mutex
 * @return 0 on success, the pthread_mutex_init() error code otherwise
 */
int queue_init(queue_t* queue);

/**
 * Destroys the queue's mutex. It does not release nodes, see queue_free()
 * @return the pthread_mutex_destroy() result
 */
int queue_destroy(queue_t* queue);

/**
 * @return true if the queue had no elements when it was inspected. Another
 * thread may change that before the caller acts on the answer; use
 * queue_try_dequeue() to test and remove in a single step.
 */
bool queue_is_empty(queue_t* queue);

/**
 * Appends @a data at the end of the queue
 * @return 0 on success, 41 if the node could not be allocated
 */
int queue_append(queue_t* queue, size_t data);

/**
 * Removes the first element and returns its value
 * @remarks Queue must be not empty, otherwise it will crash
 */
size_t queue_dequeue(queue_t* queue);

/**
 * Removes the first element, if any, while holding the queue's mutex
 * @param data Receives the removed value. Untouched if the queue is empty
 * @return true if an element was removed, false if the queue was empty
 */
bool queue_try_dequeue(queue_t* queue, size_t* data);

/**
 * Releases every node that is still stored in the queue
 * @return 0 always
 */
int queue_free(queue_t* queue);

/**
 * State shared by all the threads of the simulation
 */
typedef struct shared_data {
  size_t product_count;  // total number of products to produce
  size_t next_product_index;  // next product number to hand to a producer
  pthread_mutex_t next_product_mutex;  // protects next_product_index
  queue_t queue;  // products travel from producers to consumers through it
  size_t producer_count;
  size_t consumer_count;
  // Delay bounds in milliseconds (random_delay() multiplies them by 1000
  // before calling usleep())
  useconds_t min_producer_delay;
  useconds_t max_producer_delay;
  useconds_t min_consumer_delay;
  useconds_t max_consumer_delay;
  // Posted once per appended product and once per consumer at shutdown
  sem_t can_consume;
  pthread_mutex_t stdout_mutex;  // serializes the report lines
} shared_thread_data_t;

/**
 * Per-thread record handed to produce() and consume()
 */
typedef struct private_thread_data {
  size_t thread_number;  // rank
  shared_thread_data_t* shared_data;
} private_thread_data_t;

int analyze_arguments(int argc, char* argv[]
  , shared_thread_data_t* shared_data);
int simulate_producer_consumer(shared_thread_data_t* shared_data);
int create_threads(shared_thread_data_t* shared_data);
void* produce(void* data);
void* consume(void* data);

/**
 * Sleeps a random number of milliseconds in [min, max[, or exactly @a min
 * milliseconds when both bounds are equal
 * @param min must be less than or equal to @a max
 * @param max must be greater than or equal to @a min
 * @param seedp state of the caller's pseudo-random sequence (see rand_r())
 */
void random_delay(useconds_t min, useconds_t max, unsigned* seedp);

/**
 * Allocates the shared record, fills it from the command line and runs the
 * simulation. The record is owned here and released on every path.
 * @return 0 on success; otherwise the error code reported by the failing
 * step (12 if the shared record cannot be allocated)
 */
int main(int argc, char* argv[]) {
  shared_thread_data_t* shared_data = calloc(1, sizeof *shared_data);
  if (shared_data == NULL) {
    // Checked before analyze_arguments() writes through the pointer
    fprintf(stderr, "error: could not allocate shared memory\n");
    return 12;
  }

  int error = analyze_arguments(argc, argv, shared_data);
  if (error == EXIT_SUCCESS) {
    error = simulate_producer_consumer(shared_data);
  }

  free(shared_data);
  return error;
}

/**
 * Parses the seven command line arguments into @a shared_data
 * @return 0 on success, 1 if the argument count is wrong, or 2..8 to tell
 * which argument (in command line order) is invalid. A maximum delay that is
 * less than its minimum is reported as an invalid maximum.
 */
int analyze_arguments(int argc, char* argv[]
  , shared_thread_data_t* shared_data) {
  int error = 0;
  if (argc == 8) {
    if (sscanf(argv[1], "%zu", &shared_data->product_count) != 1
      || shared_data->product_count == 0) {
      fprintf(stderr, "error: invalid product count\n");
      error = 2;
    } else if (sscanf(argv[2], "%zu", &shared_data->producer_count) != 1
      || shared_data->producer_count == 0) {
      fprintf(stderr, "error: invalid producer count\n");
      error = 3;
    } else if (sscanf(argv[3], "%zu", &shared_data->consumer_count) != 1
      || shared_data->consumer_count == 0) {
      fprintf(stderr, "error: invalid consumer count\n");
      error = 4;
    } else if (sscanf(argv[4], "%u"
      , &shared_data->min_producer_delay) != 1) {
      fprintf(stderr, "error: invalid min producer delay\n");
      error = 5;
    } else if (sscanf(argv[5], "%u"
      , &shared_data->max_producer_delay) != 1
      || shared_data->max_producer_delay < shared_data->min_producer_delay) {
      // random_delay() requires min <= max
      fprintf(stderr, "error: invalid max producer delay\n");
      error = 6;
    } else if (sscanf(argv[6], "%u"
      , &shared_data->min_consumer_delay) != 1) {
      fprintf(stderr, "error: invalid min consumer delay\n");
      error = 7;
    } else if (sscanf(argv[7], "%u"
      , &shared_data->max_consumer_delay) != 1
      || shared_data->max_consumer_delay < shared_data->min_consumer_delay) {
      // random_delay() requires min <= max
      fprintf(stderr, "error: invalid max consumer delay\n");
      error = 8;
    }
  } else {
    fprintf(stderr, "usage: producer_consumer product_count producers consumers"
      " min_producer_delay max_producer_delay"
      " min_consumer_delay max_consumer_delay\n");
    error = 1;
  }
  return error;
}

/**
 * Runs the thread teams and reports the elapsed wall time on stdout
 * @return the create_threads() result
 */
static int run_timed_simulation(shared_thread_data_t* shared_data) {
  struct timespec start_time, finish_time;
  clock_gettime(/*clk_id*/CLOCK_MONOTONIC, &start_time);
  const int error = create_threads(shared_data);
  clock_gettime(/*clk_id*/CLOCK_MONOTONIC, &finish_time);

  const double elapsed_time = finish_time.tv_sec - start_time.tv_sec
    + (finish_time.tv_nsec - start_time.tv_nsec) * 1e-9;
  printf("execution time: %.9lfs\n", elapsed_time);
  return error;
}

/**
 * Initializes the synchronization objects of @a shared_data, runs the
 * simulation and destroys exactly the objects that were initialized.
 * The record itself is not released; that is the caller's job.
 * @return 0 on success, 11 if a synchronization object could not be
 * initialized, 12 if @a shared_data is NULL, or the create_threads() error
 */
int simulate_producer_consumer(shared_thread_data_t* shared_data) {
  assert(shared_data);
  if (shared_data == NULL) {
    fprintf(stderr, "error: could not allocate shared memory\n");
    return 12;
  }

  int error = 0;
  bool initialized = false;

  // Each failure jumps to the label that undoes only the previous steps
  if (queue_init(&shared_data->queue) != 0) {
    goto finish;
  }
  if (sem_init(&shared_data->can_consume, /*pshared*/0, 0) != 0) {
    goto destroy_queue;
  }
  if (pthread_mutex_init(&shared_data->stdout_mutex, /*attr*/NULL) != 0) {
    goto destroy_semaphore;
  }
  if (pthread_mutex_init(&shared_data->next_product_mutex
    , /*attr*/NULL) != 0) {
    goto destroy_stdout_mutex;
  }

  initialized = true;
  error = run_timed_simulation(shared_data);

  pthread_mutex_destroy(&shared_data->next_product_mutex);
destroy_stdout_mutex:
  pthread_mutex_destroy(&shared_data->stdout_mutex);
destroy_semaphore:
  sem_destroy(&shared_data->can_consume);
destroy_queue:
  // Products may remain if consumers could not be created
  queue_free(&shared_data->queue);
  queue_destroy(&shared_data->queue);
finish:
  if (!initialized) {
    fprintf(stderr, "error: could not init mutex\n");
    error = 11;
  }
  return error;
}

/**
 * Starts up to @a count threads running @a routine. The thread with rank i
 * uses slot @a first_index + i of both arrays.
 * @return the number of threads actually created; it is less than @a count
 * only if pthread_create() failed, which is reported on stderr
 */
static size_t start_threads(pthread_t* threads
  , private_thread_data_t* private_data, size_t first_index, size_t count
  , void* (*routine)(void*), shared_thread_data_t* shared_data) {
  size_t created = 0;
  while (created < count) {
    const size_t array_index = first_index + created;
    private_data[array_index].thread_number = created;
    private_data[array_index].shared_data = shared_data;
    if (pthread_create(&threads[array_index], /*attr*/NULL, routine
      , &private_data[array_index]) != 0) {
      fprintf(stderr, "error: could not create thread %zu\n", array_index);
      break;
    }
    ++created;
  }
  return created;
}

/**
 * Creates the producer and consumer teams and waits for both to finish.
 * Consumers are released from here, after every producer has been joined,
 * so the shutdown also works when some threads could not be created.
 * @return 0 on success, 21 if a thread could not be created, 22 if the
 * bookkeeping arrays could not be allocated
 */
int create_threads(shared_thread_data_t* shared_data) {
  assert(shared_data);
  int error = 0;

  const size_t producer_count = shared_data->producer_count;
  const size_t consumer_count = shared_data->consumer_count;
  const size_t thread_count = producer_count + consumer_count;

  pthread_t* threads = NULL;
  private_thread_data_t* private_data = NULL;
  // An unsigned sum smaller than one of its terms means it wrapped around
  if (thread_count >= producer_count) {
    threads = calloc(thread_count, sizeof *threads);
    private_data = calloc(thread_count, sizeof *private_data);
  }

  if (threads == NULL || private_data == NULL) {
    fprintf(stderr, "error: could not allocate memory for %zu threads\n"
      , thread_count);
    free(private_data);
    free(threads);
    return 22;
  }

  const size_t created_producers = start_threads(threads, private_data
    , /*first_index*/0, producer_count, produce, shared_data);

  size_t created_consumers = 0;
  if (created_producers == producer_count) {
    created_consumers = start_threads(threads, private_data
      , /*first_index*/producer_count, consumer_count, consume, shared_data);
  }

  if (created_producers < producer_count
    || created_consumers < consumer_count) {
    error = 21;
  }

  // Only handles filled by a successful pthread_create() may be joined
  for (size_t index = 0; index < created_producers; ++index) {
    pthread_join(threads[index], /*value_ptr*/NULL);
  }

  if (created_producers > 0) {
    pthread_mutex_lock(&shared_data->stdout_mutex);
    printf("All producers finished\n");
    pthread_mutex_unlock(&shared_data->stdout_mutex);
  }

  // Give all consumers an opportunity to leave the semaphore: each of them
  // stops when it is woken up and finds the queue empty
  for (size_t index = 0; index < created_consumers; ++index) {
    sem_post(&shared_data->can_consume);
  }

  for (size_t index = 0; index < created_consumers; ++index) {
    pthread_join(threads[producer_count + index], /*value_ptr*/NULL);
  }

  free(private_data);
  free(threads);
  return error;
}

/**
 * Producer thread routine: repeatedly claims the next product number,
 * sleeps a random delay, reports the product and appends it to the queue.
 * It stops when all product numbers have been claimed.
 * @param data address of this thread's private_thread_data_t
 * @return NULL always
 */
void* produce(void* data) {
  assert(data);
  private_thread_data_t* private_data = (private_thread_data_t*)data;
  shared_thread_data_t* shared_data = private_data->shared_data;

  struct timespec now;
  clock_gettime(/*clk_id*/CLOCK_MONOTONIC, &now);
  unsigned int seed = (unsigned int)now.tv_nsec;

  while (true) {
    pthread_mutex_lock(&shared_data->next_product_mutex);
    const size_t product_index = shared_data->next_product_index++;
    pthread_mutex_unlock(&shared_data->next_product_mutex);

    if (product_index >= shared_data->product_count) {
      break;
    }

    random_delay(shared_data->min_producer_delay
      , shared_data->max_producer_delay, &seed);

    pthread_mutex_lock(&shared_data->stdout_mutex);
    printf("%zu produced %zu\n", private_data->thread_number
      , product_index + 1);
    pthread_mutex_unlock(&shared_data->stdout_mutex);

    // Signal a consumer only if the product really is in the queue;
    // queue_append() reports its own failure on stderr
    if (queue_append(&shared_data->queue, product_index) == 0) {
      sem_post(&shared_data->can_consume);
    }
  }

  return NULL;
}

/**
 * Consumer thread routine: waits for a signal, removes one product from the
 * queue, reports it and sleeps a random delay. It stops the first time it is
 * signaled and the queue has nothing for it.
 * @param data address of this thread's private_thread_data_t
 * @return NULL always
 */
void* consume(void* data) {
  assert(data);
  private_thread_data_t* private_data = (private_thread_data_t*)data;
  shared_thread_data_t* shared_data = private_data->shared_data;

  struct timespec now;
  clock_gettime(/*clk_id*/CLOCK_MONOTONIC, &now);
  unsigned int seed = (unsigned int)now.tv_nsec;

  while (true) {
    sem_wait(&shared_data->can_consume);

    // Test and removal must be one atomic step: with shutdown signals in
    // the semaphore, several consumers can be awake for a single product
    size_t product_index = 0;
    if (!queue_try_dequeue(&shared_data->queue, &product_index)) {
      break;
    }

    pthread_mutex_lock(&shared_data->stdout_mutex);
    printf("\t\t%zu consuming %zu\n", private_data->thread_number
      , product_index + 1);
    pthread_mutex_unlock(&shared_data->stdout_mutex);

    random_delay(shared_data->min_consumer_delay
      , shared_data->max_consumer_delay, &seed);

    // pthread_mutex_lock(&shared_data->stdout_mutex);
    // printf("\t\t%zu consumed %zu\n", private_data->thread_number
    //   , product_index + 1);
    // pthread_mutex_unlock(&shared_data->stdout_mutex);
  }

  return NULL;
}

void random_delay(useconds_t min, useconds_t max, unsigned* seedp) {
  assert(min <= max);
  useconds_t milliseconds = min;
  if (max > min) {
    milliseconds += rand_r(seedp) % (max - min);
  }
  usleep(milliseconds * 1000);
}

// ------------ queue.c

bool queue_is_empty_private(const queue_t* queue);

/**
 * Unlinks and releases the first node
 * @remarks The caller must hold the queue's mutex and the queue must not be
 * empty
 * @return the value stored in the removed node
 */
static size_t queue_pop_private(queue_t* queue) {
  queue_node_t* first = queue->head;
  const size_t data = first->product_number;
  queue->head = first->next;
  if (queue->head == NULL) {
    queue->tail = NULL;
  }
  free(first);
  return data;
}

int queue_init(queue_t* queue) {
  assert(queue);
  // Do not rely on the caller having zeroed the record
  queue->head = NULL;
  queue->tail = NULL;
  return pthread_mutex_init(&queue->mutex, /*attr*/NULL);
}

int queue_destroy(queue_t* queue) {
  assert(queue);
  return pthread_mutex_destroy(&queue->mutex);
}

/**
 * Emptiness test without locking
 * @remarks The caller must hold the queue's mutex
 */
bool queue_is_empty_private(const queue_t* queue) {
  assert(queue);
  return queue->head == NULL;
}

int queue_append(queue_t* queue, size_t data) {
  assert(queue);
  queue_node_t* new_node = calloc(1, sizeof *new_node);
  if (new_node == NULL) {
    fprintf(stderr, "error: could not allocate memory for a queue node\n");
    return 41;
  }
  new_node->product_number = data;

  pthread_mutex_lock(&queue->mutex);
  if (queue_is_empty_private(queue)) {
    queue->head = queue->tail = new_node;
  } else {
    queue->tail = queue->tail->next = new_node;
  }
  pthread_mutex_unlock(&queue->mutex);

  return 0;
}

bool queue_is_empty(queue_t* queue) {
  assert(queue);
  pthread_mutex_lock(&queue->mutex);
  const bool result = queue_is_empty_private(queue);
  pthread_mutex_unlock(&queue->mutex);
  return result;
}

int queue_free(queue_t* queue) {
  assert(queue);
  pthread_mutex_lock(&queue->mutex);
  while (!queue_is_empty_private(queue)) {
    queue_pop_private(queue);
  }
  pthread_mutex_unlock(&queue->mutex);
  return 0;
}

size_t queue_dequeue(queue_t* queue) {
  assert(queue);
  pthread_mutex_lock(&queue->mutex);
  assert(queue_is_empty_private(queue) == false);
  const size_t data = queue_pop_private(queue);
  pthread_mutex_unlock(&queue->mutex);
  return data;
}

bool queue_try_dequeue(queue_t* queue, size_t* data) {
  assert(queue);
  assert(data);
  pthread_mutex_lock(&queue->mutex);
  const bool removed = !queue_is_empty_private(queue);
  if (removed) {
    *data = queue_pop_private(queue);
  }
  pthread_mutex_unlock(&queue->mutex);
  return removed;
}

// Disabled experiments with a hand-written pseudo-random generator.
// This section is not compiled.
#if 0
static unsigned xn = 0;

unsigned my_rand(void) {
  unsigned xn_1 = (1103515245 * xn + 12345) % 100;
  return xn = xn_1;
}

void my_srand(unsigned seed_x0) {
  xn = seed_x0;
}

unsigned my_rand_r(unsigned* xn) {
  unsigned xn_1 = (1103515245 * *xn + 12345) % 100;
  return *xn = xn_1;
}

int main2(void) {
  my_srand(time(NULL));
  for (int index = 0; index < 20; ++index) {
    printf("%d: %u\n", index, my_rand());
  }
}

int run0(void) {
  unsigned xn1 = time(NULL);
  unsigned xn2 = time(NULL);
  printf("%d: %u\n", index, my_rand_r(&xn1));
  printf("%d: %u\n", index, my_rand_r(&xn2));
}

int run1(void) {
  unsigned xn = time(NULL);
  for (int index = 0; index < 20; ++index) {
    printf("%d: %u\n", index, my_rand_r(&xn));
  }
}
#endif