/*
* Copyright 2021 Jeisson Hidalgo-Cespedes - Universidad de Costa Rica
*/
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include "dynamic_simulation.h"
// Evaluates to the smaller of a and b. Because this is a macro, the selected
// argument is evaluated twice: do not pass expressions with side effects.
#define MIN(a, b) ((a) < (b) ? (a) : (b))
// Per-thread argument record: one element is filled for each worker and its
// address is passed to work() through pthread_create().
typedef struct private_thread_data {
size_t thread_number; // rank: worker index, also its slot in durations[]
dynamic_simulation_t* dynamic_simulation; // record shared by all workers
} private_thread_data_t;
// Forward declarations of the routines defined later in this file.

// Launches the worker threads, feeds them through produce() and joins them.
int dynamic_simulation_create_threads(dynamic_simulation_t* dynamic_simulation);
// Prints one report row: per-thread durations, maximum, speedup, efficiency
// and elapsed time.
void dynamic_simulation_print_statistics(
dynamic_simulation_t* dynamic_simulation);
// Producer: appends every work unit, then END_OF_WORK, to the work queue.
int produce(dynamic_simulation_t* dynamic_simulation);
// Thread routine: consumes work units until it dequeues END_OF_WORK.
void* work(void* data);
/**
 * Prepares a dynamic-mapping simulation record for use.
 *
 * Stores the simulation pointer (it is not copied), allocates one zeroed
 * duration accumulator per thread, and initializes the work queue and the
 * can_work semaphore with an initial value of 0.
 *
 * @param dynamic_simulation Record to initialize. Must not be NULL.
 * @param simulation Simulation whose thread_count sizes the durations array.
 *   Must not be NULL.
 * @return 0 on success. EXIT_FAILURE if the durations array could not be
 *   allocated or the semaphore could not be initialized; in that case every
 *   resource acquired here has already been released and
 *   dynamic_simulation_destroy() must not be called on the record.
 */
int dynamic_simulation_init(dynamic_simulation_t* dynamic_simulation,
    simulation_t* simulation) {
  assert(dynamic_simulation);
  assert(simulation);

  dynamic_simulation->simulation = simulation;
  dynamic_simulation->durations = calloc(simulation->thread_count
    , sizeof(*dynamic_simulation->durations));
  // calloc() may return NULL for a zero-sized request, so only a non-empty
  // request that comes back NULL is reported as an allocation failure
  if (dynamic_simulation->durations == NULL
      && simulation->thread_count > 0) {
    fprintf(stderr, "error: could not allocate memory for %zu durations\n"
      , simulation->thread_count);
    return EXIT_FAILURE;
  }

  queue_init(&dynamic_simulation->work_queue);

  if (sem_init(&dynamic_simulation->can_work, /*pshared*/ 0, 0) != 0) {
    fprintf(stderr, "error: could not initialize semaphore\n");
    // Undo the partial initialization so nothing leaks
    queue_destroy(&dynamic_simulation->work_queue);
    free(dynamic_simulation->durations);
    dynamic_simulation->durations = NULL;
    return EXIT_FAILURE;
  }

  return 0;
}
/**
 * Releases the resources acquired by dynamic_simulation_init(): the can_work
 * semaphore, the work queue and the durations array. The record itself and
 * the simulation it points to are not freed.
 *
 * @param dynamic_simulation Initialized record. Must not be NULL.
 */
void dynamic_simulation_destroy(dynamic_simulation_t* dynamic_simulation) {
  assert(dynamic_simulation);
  sem_destroy(&dynamic_simulation->can_work);
  queue_destroy(&dynamic_simulation->work_queue);
  free(dynamic_simulation->durations);
  // Do not leave a dangling pointer behind in the record
  dynamic_simulation->durations = NULL;
}
/**
 * Runs the simulation: launches the workers, waits for them to finish and,
 * if that succeeded, prints the statistics row.
 *
 * @param dynamic_simulation Initialized record. Must not be NULL.
 * @return 0 on success, otherwise the non-zero error code reported by
 *   dynamic_simulation_create_threads(). No statistics are printed on error.
 */
int dynamic_simulation_run(dynamic_simulation_t* dynamic_simulation) {
  assert(dynamic_simulation);
  const int error = dynamic_simulation_create_threads(dynamic_simulation);
  // The statistics routine asserts that some duration was accumulated, which
  // is not guaranteed when the threads could not be run
  if (error == 0) {
    dynamic_simulation_print_statistics(dynamic_simulation);
  }
  return error;
}
/**
 * Creates the worker threads, produces the work for them and waits until all
 * created workers finish.
 *
 * The start time is recorded in dynamic_simulation->start_time just before
 * the first thread is created. Thread creation stops at the first failure;
 * the workers created so far still consume the whole work queue so they can
 * be joined cleanly.
 *
 * @param dynamic_simulation Initialized record whose simulation provides
 *   thread_count.
 * @return 0 on success; 21 if a thread could not be created; 22 if the
 *   bookkeeping arrays could not be allocated; otherwise the error code
 *   returned by produce(). The first error detected is the one reported.
 */
int dynamic_simulation_create_threads(
    dynamic_simulation_t* dynamic_simulation) {
  int error = 0;
  simulation_t* simulation = dynamic_simulation->simulation;

  pthread_t* workers = calloc(simulation->thread_count, sizeof(pthread_t));
  private_thread_data_t* private_thread_data =
      calloc(simulation->thread_count, sizeof(private_thread_data_t));

  if (workers && private_thread_data) {
    clock_gettime(/*clk_id*/CLOCK_MONOTONIC, &dynamic_simulation->start_time);

    // Number of threads that were actually started; only these may be joined
    size_t created = 0;
    for (; created < simulation->thread_count; ++created) {
      private_thread_data[created].thread_number = created;
      private_thread_data[created].dynamic_simulation = dynamic_simulation;
      if (pthread_create(&workers[created], NULL, work
          , &private_thread_data[created]) != 0) {
        fprintf(stderr, "error: could not create thread %zu\n", created);
        error = 21;
        break;
      }
    }

    if (created > 0) {
      // Workers only stop when they find the end-of-work marker, so the
      // producer must run even if some threads could not be created
      const int produce_error = produce(dynamic_simulation);
      if (error == 0) {
        error = produce_error;
      }
      for (size_t index = 0; index < created; ++index) {
        pthread_join(workers[index], NULL);
      }
    }
  } else {
    fprintf(stderr, "error: could not allocate memory for %zu threads\n"
      , simulation->thread_count);
    error = 22;
  }

  // free(NULL) is a no-op, so this also covers a partially failed allocation
  free(private_thread_data);
  free(workers);
  return error;
}
/**
 * Writes the "Dynamic" report row to standard output.
 *
 * The row holds, in order: the label, the accumulated duration of each
 * thread, the largest of those durations, the speedup (total_duration divided
 * by the largest duration), the efficiency (speedup divided by thread_count)
 * and the value returned by calculate_elapsed_time() for start_time.
 *
 * @param dynamic_simulation Record whose durations were already accumulated.
 *   Must not be NULL.
 */
void dynamic_simulation_print_statistics(
    dynamic_simulation_t* dynamic_simulation) {
  assert(dynamic_simulation);
  simulation_t* simulation = dynamic_simulation->simulation;

  printf("%-7s", "Dynamic");

  // One column per thread, tracking the slowest thread along the way
  data_t max_duration = 0;
  size_t rank = 0;
  while (rank < simulation->thread_count) {
    if (max_duration < dynamic_simulation->durations[rank]) {
      max_duration = dynamic_simulation->durations[rank];
    }
    printf(" %4u", dynamic_simulation->durations[rank]);
    ++rank;
  }
  printf(" %8u", max_duration);

  // The slowest thread is the divisor of the speedup
  assert(max_duration > 0);
  const double speedup = (double)simulation->total_duration / max_duration;
  const double efficiency = speedup / simulation->thread_count;
  printf(" %7.2lf", speedup);
  printf(" %10.2lf", efficiency);

  printf(" %12.9lf", calculate_elapsed_time(&dynamic_simulation->start_time));
  putchar('\n');
}
/**
 * Producer: appends every element of simulation->work to the work queue,
 * signaling can_work once per appended element, and finally appends the
 * END_OF_WORK marker.
 *
 * The marker is attempted even when appending a work unit fails. Workers only
 * leave their loop after dequeuing END_OF_WORK, so skipping it would leave
 * them blocked on the semaphore and the joining thread waiting forever.
 *
 * @param dynamic_simulation Initialized record. Must not be NULL.
 * @return EXIT_SUCCESS if everything was appended; otherwise the first
 *   non-zero code returned by queue_append().
 */
int produce(dynamic_simulation_t* dynamic_simulation) {
  assert(dynamic_simulation);
  simulation_t* simulation = dynamic_simulation->simulation;

  int error = EXIT_SUCCESS;
  for (size_t index = 0; index < simulation->work.count; ++index) {
    error = queue_append(&dynamic_simulation->work_queue
      , simulation->work.elements[index]);
    if (error) {
      break;
    }
    sem_post(&dynamic_simulation->can_work);
  }

  // Produce the end-of-work marker, also after a failure (best effort)
  const int marker_error = queue_append(&dynamic_simulation->work_queue
    , END_OF_WORK);
  if (marker_error == EXIT_SUCCESS) {
    sem_post(&dynamic_simulation->can_work);
  }

  // Report the earliest failure
  return error != EXIT_SUCCESS ? error : marker_error;
}
/**
 * Thread routine of a worker (consumer).
 *
 * Repeatedly waits on can_work, dequeues one value and, unless it is the
 * END_OF_WORK marker, sleeps for that many microseconds via usleep() and adds
 * the value to this worker's slot in durations[]. On END_OF_WORK the marker
 * is put back and can_work is signaled again before the worker stops, so the
 * next waiting worker finds it too.
 *
 * @param data Address of this worker's private_thread_data_t.
 * @return Always NULL.
 */
void* work(void* data) {
  private_thread_data_t* const own = (private_thread_data_t*)data;
  dynamic_simulation_t* const dynamic_simulation = own->dynamic_simulation;
  const size_t rank = own->thread_number;

  for (;;) {
    // Each signal on can_work corresponds to one value placed in the queue
    sem_wait(&dynamic_simulation->can_work);
    assert(queue_is_empty(&dynamic_simulation->work_queue) == false);
    const data_t duration = queue_dequeue(&dynamic_simulation->work_queue);

    if (duration == END_OF_WORK) {
      // Hand the marker over to the next worker before leaving
      queue_append(&dynamic_simulation->work_queue, END_OF_WORK);
      sem_post(&dynamic_simulation->can_work);
      break;
    }

    usleep(duration);
    dynamic_simulation->durations[rank] += duration;
  }

  return NULL;
}