/* * Copyright 2021 Jeisson Hidalgo-Cespedes - Universidad de Costa Rica */ #include #include #include #include #include #include "dynamic_simulation.h" #define MIN(a, b) ((a) < (b) ? (a) : (b)) typedef struct private_thread_data { size_t thread_number; // rank dynamic_simulation_t* dynamic_simulation; } private_thread_data_t; int dynamic_simulation_create_threads(dynamic_simulation_t* dynamic_simulation); void dynamic_simulation_print_statistics( dynamic_simulation_t* dynamic_simulation); int produce(dynamic_simulation_t* dynamic_simulation); void* work(void* data); int dynamic_simulation_init(dynamic_simulation_t* dynamic_simulation, simulation_t* simulation) { assert(dynamic_simulation); assert(simulation); dynamic_simulation->simulation = simulation; dynamic_simulation->durations = calloc(simulation->thread_count , sizeof(data_t)); queue_init(&dynamic_simulation->work_queue); sem_init(&dynamic_simulation->can_work, /*pshared*/ 0, 0); return 0; } void dynamic_simulation_destroy(dynamic_simulation_t* dynamic_simulation) { assert(dynamic_simulation); sem_destroy(&dynamic_simulation->can_work); queue_destroy(&dynamic_simulation->work_queue); free(dynamic_simulation->durations); } int dynamic_simulation_run(dynamic_simulation_t* dynamic_simulation) { assert(dynamic_simulation); dynamic_simulation_create_threads(dynamic_simulation); dynamic_simulation_print_statistics(dynamic_simulation); return 0; } int dynamic_simulation_create_threads( dynamic_simulation_t* dynamic_simulation) { int error = 0; simulation_t* simulation = dynamic_simulation->simulation; pthread_t* workers = (pthread_t*) malloc(simulation->thread_count * sizeof(pthread_t)); private_thread_data_t* private_thread_data = (private_thread_data_t*) calloc(simulation->thread_count, sizeof(private_thread_data_t)); if (workers && private_thread_data) { clock_gettime(/*clk_id*/CLOCK_MONOTONIC, &dynamic_simulation->start_time); for (size_t index = 0; index < simulation->thread_count; ++index) { private_thread_data[index].thread_number = index; private_thread_data[index].dynamic_simulation = dynamic_simulation; error = pthread_create(&workers[index], NULL, work , &private_thread_data[index]); if (error) { fprintf(stderr, "error: could not create thread %zu\n", index); error = 21; } } produce(dynamic_simulation); for (size_t index = 0; index < simulation->thread_count; ++index) { pthread_join(workers[index], NULL); } free(private_thread_data); free(workers); } else { fprintf(stderr, "error: could not allocate memory for %zu threads\n" , simulation->thread_count); error = 22; } return error; } void dynamic_simulation_print_statistics( dynamic_simulation_t* dynamic_simulation) { assert(dynamic_simulation); simulation_t* simulation = dynamic_simulation->simulation; printf("%-7s", "Dynamic"); data_t max_duration = 0; for (size_t index = 0; index < simulation->thread_count; ++index) { if (dynamic_simulation->durations[index] > max_duration) { max_duration = dynamic_simulation->durations[index]; } printf(" %4u", dynamic_simulation->durations[index]); } printf(" %8u", max_duration); assert(max_duration > 0); const double speedup = (double)dynamic_simulation->simulation->total_duration / max_duration; printf(" %7.2lf", speedup); const double efficiency = speedup / simulation->thread_count; printf(" %10.2lf", efficiency); printf(" %12.9lf", calculate_elapsed_time(&dynamic_simulation->start_time)); putchar('\n'); } int produce(dynamic_simulation_t* dynamic_simulation) { assert(dynamic_simulation); simulation_t* simulation = dynamic_simulation->simulation; int error = EXIT_SUCCESS; for (size_t index = 0; index < simulation->work.count; ++index) { error = queue_append(&dynamic_simulation->work_queue , simulation->work.elements[index]); if (error) { break; } sem_post(&dynamic_simulation->can_work); } // Produce the end-of-work marker if (error == EXIT_SUCCESS) { error = queue_append(&dynamic_simulation->work_queue, END_OF_WORK); if (error == EXIT_SUCCESS) { sem_post(&dynamic_simulation->can_work); } } return error; } void* work(void* data) { private_thread_data_t* private_data = (private_thread_data_t*)data; dynamic_simulation_t* dynamic_simulation = private_data->dynamic_simulation; const size_t rank = private_data->thread_number; while (true) { sem_wait(&dynamic_simulation->can_work); assert(queue_is_empty(&dynamic_simulation->work_queue) == false); const data_t duration = queue_dequeue(&dynamic_simulation->work_queue); if (duration != END_OF_WORK) { usleep(duration); dynamic_simulation->durations[rank] += duration; } else { queue_append(&dynamic_simulation->work_queue, END_OF_WORK); sem_post(&dynamic_simulation->can_work); break; } } return NULL; }