Download c source code

/*
 * Copyright 2021 Jeisson Hidalgo-Cespedes - Universidad de Costa Rica
 */

#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

#include "dynamic_simulation.h"

// Evaluates to the smaller of a and b. The selected argument is evaluated
// twice, so do not pass expressions with side effects (e.g. MIN(i++, j)).
#define MIN(a, b) ((a) < (b) ? (a) : (b))

// Per-thread arguments handed to work() through pthread_create().
// One record is filled in for each worker before it is started.
typedef struct private_thread_data {
  // Index assigned to the thread at creation time; work() uses it to select
  // the thread's own slot in dynamic_simulation->durations
  size_t thread_number;  // rank
  // Simulation record shared by every worker (queue, semaphore, durations)
  dynamic_simulation_t* dynamic_simulation;
} private_thread_data_t;

// Starts the worker threads, produces the work, joins the workers, and
// returns an error code (0 when no error was recorded).
int dynamic_simulation_create_threads(dynamic_simulation_t* dynamic_simulation);
// Prints one "Dynamic" row to stdout: per-thread durations, the maximum
// duration, speedup, efficiency, and elapsed time.
void dynamic_simulation_print_statistics(
  dynamic_simulation_t* dynamic_simulation);
// Appends the work units and then END_OF_WORK to the work queue, posting
// can_work once per appended element. Returns the queue_append() error code.
int produce(dynamic_simulation_t* dynamic_simulation);
// Thread routine passed to pthread_create(); data points to a
// private_thread_data_t. Always returns NULL.
void* work(void* data);

/**
 * @brief Prepares a dynamic-mapping simulation record for running.
 *
 * Stores the simulation pointer, allocates one zero-initialized duration
 * accumulator per thread, and initializes the work queue and the can_work
 * semaphore (initial value 0, not shared between processes).
 *
 * Every member is initialized in the same order as before even when a step
 * fails; failures are only reported through the return value and stderr.
 *
 * @param dynamic_simulation Record to initialize; must not be NULL.
 * @param simulation Simulation parameters; the pointer is stored, not copied.
 * @return 0 on success, EXIT_FAILURE if the durations array could not be
 *   allocated or the semaphore could not be initialized.
 */
int dynamic_simulation_init(dynamic_simulation_t* dynamic_simulation,
  simulation_t* simulation) {
  assert(dynamic_simulation);
  assert(simulation);

  int error = 0;

  dynamic_simulation->simulation = simulation;
  dynamic_simulation->durations = calloc(simulation->thread_count
    , sizeof(data_t));
  // calloc() may legitimately return NULL for a zero-sized request, so a
  // NULL result is only an error when at least one thread was requested
  if (dynamic_simulation->durations == NULL && simulation->thread_count > 0) {
    fprintf(stderr, "error: could not allocate durations for %zu threads\n"
      , simulation->thread_count);
    error = EXIT_FAILURE;
  }

  queue_init(&dynamic_simulation->work_queue);
  if (sem_init(&dynamic_simulation->can_work, /*pshared*/ 0, 0) != 0) {
    fprintf(stderr, "error: could not init can_work semaphore\n");
    error = EXIT_FAILURE;
  }

  return error;
}

/**
 * @brief Releases the resources held by a dynamic simulation record.
 *
 * Destroys the can_work semaphore and the work queue, then frees the
 * per-thread durations array.
 *
 * @param dynamic_simulation Record to tear down; must not be NULL.
 */
void dynamic_simulation_destroy(dynamic_simulation_t* dynamic_simulation) {
  assert(dynamic_simulation);

  sem_destroy(&dynamic_simulation->can_work);
  queue_destroy(&dynamic_simulation->work_queue);

  free(dynamic_simulation->durations);
  // Do not leave a dangling pointer in the record: a later free() of this
  // member becomes a harmless no-op instead of a double free
  dynamic_simulation->durations = NULL;
}

/**
 * @brief Runs the simulation and, if it succeeded, prints its statistics.
 *
 * @param dynamic_simulation Initialized record; must not be NULL.
 * @return 0 on success, otherwise the error code reported by
 *   dynamic_simulation_create_threads().
 */
int dynamic_simulation_run(dynamic_simulation_t* dynamic_simulation) {
  assert(dynamic_simulation);

  const int error = dynamic_simulation_create_threads(dynamic_simulation);
  // The statistics assert that some thread accumulated a duration, which does
  // not hold when the threads could not run, so report the failure instead
  if (error == 0) {
    dynamic_simulation_print_statistics(dynamic_simulation);
  }

  return error;
}

/**
 * @brief Starts the workers, produces the work, and waits for the workers.
 *
 * Records the start time, launches up to simulation->thread_count threads
 * running work(), calls produce() to fill the work queue, and joins every
 * thread that was actually started.
 *
 * @param dynamic_simulation Initialized record shared with every worker.
 * @return 0 on success; 21 if a thread could not be created (no further
 *   threads are started after the first failure); 22 if the thread
 *   bookkeeping arrays could not be allocated; otherwise the error code
 *   returned by produce().
 */
int dynamic_simulation_create_threads(
    dynamic_simulation_t* dynamic_simulation) {
  int error = 0;
  simulation_t* simulation = dynamic_simulation->simulation;

  // calloc() also guards the count * size multiplication against overflow
  pthread_t* workers = calloc(simulation->thread_count, sizeof(*workers));
  private_thread_data_t* private_thread_data =
    calloc(simulation->thread_count, sizeof(*private_thread_data));

  if (workers && private_thread_data) {
    clock_gettime(/*clk_id*/CLOCK_MONOTONIC, &dynamic_simulation->start_time);

    // Threads that were really started; only these hold a valid pthread_t
    // and therefore only these may be joined
    size_t created_count = 0;
    for (size_t index = 0; index < simulation->thread_count; ++index) {
      private_thread_data[index].thread_number = index;
      private_thread_data[index].dynamic_simulation = dynamic_simulation;

      if (pthread_create(&workers[index], NULL, work
          , &private_thread_data[index]) != 0) {
        fprintf(stderr, "error: could not create thread %zu\n", index);
        error = 21;
        break;
      }
      ++created_count;
    }

    // Work is produced even if some threads failed to start, so the threads
    // that did start receive END_OF_WORK and can finish
    const int produce_error = produce(dynamic_simulation);
    if (error == 0) {
      error = produce_error;
    }

    for (size_t index = 0; index < created_count; ++index) {
      pthread_join(workers[index], NULL);
    }
  } else {
    fprintf(stderr, "error: could not allocate memory for %zu threads\n"
      , simulation->thread_count);
    error = 22;
  }

  // free(NULL) is a no-op, so this also releases the array that was
  // allocated when only one of the two allocations failed
  free(private_thread_data);
  free(workers);

  return error;
}

/**
 * @brief Prints the result row of the dynamic mapping to standard output.
 *
 * The row holds, in order: the "Dynamic" label, the accumulated duration of
 * each thread, the largest of those durations, the speedup (total duration
 * divided by the largest duration), the efficiency (speedup divided by the
 * thread count), and the elapsed time since start_time, followed by a
 * newline.
 *
 * @param dynamic_simulation Record whose durations are reported; must not be
 *   NULL.
 */
void dynamic_simulation_print_statistics(
    dynamic_simulation_t* dynamic_simulation) {
  assert(dynamic_simulation);
  simulation_t* const parameters = dynamic_simulation->simulation;

  printf("%-7s", "Dynamic");

  // Track the slowest thread while listing every thread's duration
  data_t max_duration = 0;
  size_t rank = 0;
  while (rank < parameters->thread_count) {
    if (max_duration < dynamic_simulation->durations[rank]) {
      max_duration = dynamic_simulation->durations[rank];
    }
    printf(" %4u", dynamic_simulation->durations[rank]);
    ++rank;
  }
  printf(" %8u", max_duration);

  // The slowest thread bounds the parallel running time
  assert(max_duration > 0);
  const double speedup = (double)parameters->total_duration / max_duration;
  const double efficiency = speedup / parameters->thread_count;

  printf(" %7.2lf", speedup);
  printf(" %10.2lf", efficiency);
  printf(" %12.9lf", calculate_elapsed_time(&dynamic_simulation->start_time));
  putchar('\n');
}

/**
 * @brief Fills the work queue with every work unit plus the end marker.
 *
 * Each successfully appended element is announced with one sem_post() on
 * can_work so that exactly one waiting worker is released per element.
 *
 * The END_OF_WORK marker is attempted even when appending a work unit
 * failed: without it the workers would wait on can_work forever and the
 * thread joining them would never return.
 *
 * @param dynamic_simulation Initialized record; must not be NULL.
 * @return EXIT_SUCCESS if everything was queued, otherwise the first error
 *   code returned by queue_append().
 */
int produce(dynamic_simulation_t* dynamic_simulation) {
  assert(dynamic_simulation);
  simulation_t* simulation = dynamic_simulation->simulation;

  int error = EXIT_SUCCESS;
  for (size_t index = 0; index < simulation->work.count; ++index) {
    error = queue_append(&dynamic_simulation->work_queue
      , simulation->work.elements[index]);
    if (error) {
      break;
    }
    sem_post(&dynamic_simulation->can_work);
  }

  // Produce the end-of-work marker, also after a failure, so consumers stop
  const int marker_error = queue_append(&dynamic_simulation->work_queue
    , END_OF_WORK);
  if (marker_error == EXIT_SUCCESS) {
    sem_post(&dynamic_simulation->can_work);
  } else if (error == EXIT_SUCCESS) {
    // Keep the first error; only report the marker's if nothing failed before
    error = marker_error;
  }

  return error;
}

/**
 * @brief Thread routine: consumes work units until END_OF_WORK is found.
 *
 * Waits on can_work before each dequeue. A regular work unit is simulated by
 * sleeping for its duration, which is then added to this thread's slot in
 * durations. On END_OF_WORK the marker is put back and can_work is posted
 * again, so the next worker also finds the marker and stops.
 *
 * @param data Pointer to this thread's private_thread_data_t.
 * @return Always NULL.
 */
void* work(void* data) {
  private_thread_data_t* const arguments = data;
  dynamic_simulation_t* const dynamic_simulation =
    arguments->dynamic_simulation;
  const size_t my_slot = arguments->thread_number;

  for (;;) {
    // Each post on can_work stands for one element available in the queue
    sem_wait(&dynamic_simulation->can_work);
    assert(queue_is_empty(&dynamic_simulation->work_queue) == false);

    const data_t unit = queue_dequeue(&dynamic_simulation->work_queue);
    if (unit == END_OF_WORK) {
      // Pass the marker on to the remaining workers before leaving
      queue_append(&dynamic_simulation->work_queue, END_OF_WORK);
      sem_post(&dynamic_simulation->can_work);
      break;
    }

    usleep(unit);
    dynamic_simulation->durations[my_slot] += unit;
  }

  return NULL;
}