#include #include #include #include #include #include #include #include #define PRODUCER_SEMAPHORE_NAME "/producer" #define CONSUMER_SEMAPHORE_NAME "/consumer" // thread shared data typedef struct { size_t thread_count; size_t data_size; double* data; size_t rounds; unsigned producer_min_delay; // milliseconds unsigned producer_max_delay; // milliseconds unsigned consumer_min_delay; // milliseconds unsigned consumer_max_delay; // milliseconds sem_t* producer_semaphore; sem_t* consumer_semaphore; } shared_data_t; int analyze_arguments(int argc, char* argv[], shared_data_t* shared_data); void* produce(void* data); void* consume(void* data); int main(int argc, char* argv[]) { srand( time(NULL) ); shared_data_t shared_data; shared_data.thread_count = 2; int error = analyze_arguments(argc, argv, &shared_data); if ( error ) return error; shared_data.data = (double*) calloc(shared_data.data_size, sizeof(double)); if ( shared_data.data == NULL ) return (void)fprintf(stderr, "hello_w: error: could not allocate memory for: %zu data\n", shared_data.data_size), 8; shared_data.producer_semaphore = sem_open(PRODUCER_SEMAPHORE_NAME, O_CREAT | O_EXCL, 0644, shared_data.data_size); shared_data.consumer_semaphore = sem_open(CONSUMER_SEMAPHORE_NAME, O_CREAT | O_EXCL, 0644, 0); assert(shared_data.producer_semaphore != SEM_FAILED && shared_data.consumer_semaphore != SEM_FAILED); pthread_t* threads = (pthread_t*) malloc(shared_data.thread_count * sizeof(pthread_t)); if ( threads == NULL ) return (void)fprintf(stderr, "hello_w: error: could not allocate memory for: %zu threads\n", shared_data.thread_count), 9; struct timespec start_time; clock_gettime(CLOCK_MONOTONIC, &start_time); // Create producer pthread_create(&threads[0], NULL, produce, &shared_data); // Create consumer pthread_create(&threads[1], NULL, consume, &shared_data); for ( size_t index = 0; index < shared_data.thread_count; ++index ) pthread_join(threads[index], NULL); struct timespec finish_time; clock_gettime(CLOCK_MONOTONIC, &finish_time); double seconds = finish_time.tv_sec - start_time.tv_sec + (finish_time.tv_nsec - start_time.tv_nsec) * 1e-9; printf("Simulation time: %.9lfs\n", seconds); sem_close(shared_data.consumer_semaphore); sem_close(shared_data.producer_semaphore); sem_unlink(CONSUMER_SEMAPHORE_NAME); sem_unlink(PRODUCER_SEMAPHORE_NAME); free(shared_data.data); free(threads); return 0; } int analyze_arguments(int argc, char* argv[], shared_data_t* shared_data) { if ( argc != 7 ) return (void)fprintf(stderr, "usage: producer_consumer_2 data_size rounds min_producer_delay" " max_producer_delay min_consumer_delay max_consumer_delay\n"), 1; if ( sscanf(argv[1], "%zu", &shared_data->data_size) != 1 || shared_data->data_size == 0 ) return (void)fprintf(stderr, "producer_consumer: error: invalid data size: %s\n", argv[1]), 2; if ( sscanf(argv[2], "%zu", &shared_data->rounds) != 1 || shared_data->rounds == 0 ) return (void)fprintf(stderr, "producer_consumer: error: invalid rounds: %s\n", argv[2]), 3; if ( sscanf(argv[3], "%u", &shared_data->producer_min_delay) != 1 ) return (void)fprintf(stderr, "producer_consumer: error: invalid producer min delay: %s\n", argv[3]), 4; if ( sscanf(argv[4], "%u", &shared_data->producer_max_delay) != 1 || shared_data->producer_max_delay < shared_data->producer_min_delay ) return (void)fprintf(stderr, "producer_consumer: error: invalid producer max delay: %s\n", argv[4]), 5; if ( sscanf(argv[5], "%u", &shared_data->consumer_min_delay) != 1 ) return (void)fprintf(stderr, "producer_consumer: error: invalid consumer min delay: %s\n", argv[5]), 6; if ( sscanf(argv[6], "%u", &shared_data->consumer_max_delay) != 1 || shared_data->consumer_max_delay < shared_data->consumer_min_delay ) return (void)fprintf(stderr, "producer_consumer: error: invalid consumer max delay: %s\n", argv[6]), 7; // Success return 0; } void random_wait(unsigned min_milliseconds, unsigned max_milliseconds) { unsigned range = max_milliseconds - min_milliseconds; usleep( (min_milliseconds + rand() % (range + 1)) * 1000 ); } double generate_product(size_t round, size_t index, const shared_data_t* shared_data) { // Producer requires time to produce a value random_wait(shared_data->producer_min_delay, shared_data->producer_max_delay); // Produce a value double product = round + (index + 1) / 100.0;; printf("Produced %.2f\n", product); return product; } void consume_product(double product, const shared_data_t* shared_data) { // Consumer requires time to consume a value random_wait(shared_data->consumer_min_delay, shared_data->consumer_max_delay); printf("\t\tConsumed %.2f\n", product); } void* produce(void* data) { shared_data_t* shared_data = (shared_data_t*)data; for ( size_t round = 1; round <= shared_data->rounds; ++round ) { for ( size_t index = 0; index < shared_data->data_size; ++index ) { sem_wait( shared_data->producer_semaphore ); shared_data->data[index] = generate_product(round, index, shared_data); sem_post( shared_data->consumer_semaphore ); } } return NULL; } void* consume(void* data) { shared_data_t* shared_data = (shared_data_t*)data; for ( size_t round = 1; round <= shared_data->rounds; ++round ) { for ( size_t index = 0; index < shared_data->data_size; ++index ) { sem_wait( shared_data->consumer_semaphore ); consume_product( shared_data->data[index], shared_data ); sem_post( shared_data->producer_semaphore ); } } return NULL; }