2-pthreads/{position_race/position_race.c → producer_consumer/producer_consumer.c} RENAMED
@@ -1,95 +1,134 @@
1
  #include <pthread.h>
 
2
  #include <stdio.h>
3
  #include <stdlib.h>
4
  #include <time.h>
5
  #include <unistd.h>
6
 
7
- /*
8
- int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
9
- void* (*start_routine)(void*), void *arg);
10
- */
11
-
// Data shared by every thread of the program.
typedef struct
{
  // Number of worker threads; printed by each worker next to its own id
  size_t thread_count;
  // Arrival counter: each worker increments it once while holding mutex
  size_t position;
  // Serializes updates of position and the printing done by the threads
  pthread_mutex_t mutex;
} shared_data_t;
19
 
20
- // thread private data
21
- typedef struct
22
  {
23
- size_t thread_id;
24
- shared_data_t* shared_data;
25
- } private_data_t;
26
 
27
- void* run(void* data)
28
  {
29
- private_data_t* private_data = (private_data_t*)data;
30
- shared_data_t* shared_data = private_data->shared_data;
 
31
 
32
- pthread_mutex_lock( &shared_data->mutex );
33
- ++shared_data->position;
 
34
 
35
- printf("thread %zu/%zu I arrived at position %zu\n"
36
- , private_data->thread_id, shared_data->thread_count
37
- , shared_data->position);
38
 
39
- pthread_mutex_unlock( &shared_data->mutex );
 
 
 
 
 
 
40
 
41
  return NULL;
42
  }
43
 
44
- int main(int argc, char* argv[])
45
  {
46
- size_t thread_count = sysconf(_SC_NPROCESSORS_ONLN);
47
- if ( argc >= 2 )
 
 
 
48
  {
49
- if ( sscanf(argv[1], "%zu", &thread_count) != 1 || thread_count == 0 )
50
- return (void)fprintf(stderr, "hello_w: error: invalid thread count: %s\n", argv[1]), 1;
 
 
 
 
 
 
 
 
 
 
51
  }
52
 
53
- pthread_t* threads = (pthread_t*)malloc(thread_count * sizeof(pthread_t));
54
- if ( threads == NULL )
55
- return (void)fprintf(stderr, "hello_w: error: could not allocate memory for: %zu threads\n", thread_count), 2;
56
 
57
- struct timespec start_time;
58
- clock_gettime(CLOCK_MONOTONIC, &start_time);
 
59
 
60
  shared_data_t shared_data;
61
- shared_data.thread_count = thread_count;
62
- shared_data.position = 0;
63
- pthread_mutex_init(&shared_data.mutex, NULL);
64
-
65
- private_data_t* private_data = (private_data_t*) calloc(thread_count, sizeof(private_data_t));
66
 
67
- for ( size_t index = 0; index < thread_count; ++index )
68
  {
69
- private_data[index].thread_id = index;
70
- private_data[index].shared_data = &shared_data;
71
- pthread_create(&threads[index], NULL, run, private_data + index);
 
 
 
 
 
72
  }
 
 
73
 
74
- pthread_mutex_lock( &shared_data.mutex );
75
- printf("Hello from main thread\n");
76
- pthread_mutex_unlock( &shared_data.mutex );
 
 
 
 
77
 
 
 
 
 
 
 
 
 
 
 
 
78
 
79
- for ( size_t index = 0; index < thread_count; ++index )
80
  pthread_join(threads[index], NULL);
81
 
82
  struct timespec finish_time;
83
  clock_gettime(CLOCK_MONOTONIC, &finish_time);
84
 
85
  double seconds = finish_time.tv_sec - start_time.tv_sec
86
  + (finish_time.tv_nsec - start_time.tv_nsec) * 1e-9;
87
 
88
- printf("Thread creation and join time: %.9lfs\n", seconds);
 
 
 
 
89
 
90
- pthread_mutex_destroy(&shared_data.mutex);
91
- free(private_data);
92
  free(threads);
93
 
94
  return 0;
95
  }
1
  #include <pthread.h>
2
+ #include <semaphore.h>
3
  #include <stdio.h>
4
  #include <stdlib.h>
5
  #include <time.h>
6
  #include <unistd.h>
7
 
 
 
 
 
 
// Data shared by the main, producer and consumer threads.
typedef struct
{
  // Number of worker threads (main sets it to 2: producer and consumer)
  size_t thread_count;
  // Number of elements of the data buffer
  size_t data_size;
  // Buffer written by the producer and read by the consumer
  double* data;
  // Number of times the whole buffer is produced and consumed
  size_t rounds;
  // Upper bound, in milliseconds, of the random delay before producing
  unsigned producer_max_delay;
  // Upper bound, in milliseconds, of the random delay before consuming
  unsigned consumer_max_delay;
  // The producer waits on it before writing; the consumer posts it after reading
  sem_t producer_semaphore;
  // The consumer waits on it before reading; the producer posts it after writing
  sem_t consumer_semaphore;
  // Serializes the lines printed by both threads
  pthread_mutex_t stdout_mutex;
} shared_data_t;
21
 
22
+ void* produce(void* data)
 
23
  {
24
+ shared_data_t* shared_data = (shared_data_t*)data;
 
 
25
 
26
+ for ( size_t round = 1; round <= shared_data->rounds; ++round )
27
  {
28
+ for ( size_t index = 0; index < shared_data->data_size; ++index )
29
+ {
30
+ sem_wait( &shared_data->producer_semaphore );
31
 
32
+ // Producer requires time to produce a value
33
+ usleep( rand() % (shared_data->producer_max_delay + 1) * 1000 );
34
+ double product = round + (index + 1) / 100.0;
35
 
36
+ shared_data->data[index] = product;
 
 
37
 
38
+ pthread_mutex_lock( &shared_data->stdout_mutex );
39
+ printf("Produced %.2f\n", product);
40
+ pthread_mutex_unlock( &shared_data->stdout_mutex );
41
+
42
+ sem_post( &shared_data->consumer_semaphore );
43
+ }
44
+ }
45
 
46
  return NULL;
47
  }
48
 
49
+ void* consume(void* data)
50
  {
51
+ shared_data_t* shared_data = (shared_data_t*)data;
52
+
53
+ for ( size_t round = 1; round <= shared_data->rounds; ++round )
54
+ {
55
+ for ( size_t index = 0; index < shared_data->data_size; ++index )
56
  {
57
+ sem_wait( &shared_data->consumer_semaphore );
58
+
59
+ // Consumer requires time to consume a value
60
+ usleep( rand() % (shared_data->consumer_max_delay + 1) * 1000 );
61
+ double product = shared_data->data[index];
62
+
63
+ pthread_mutex_lock( &shared_data->stdout_mutex );
64
+ printf("\t\t\tConsumed %.2f\n", product);
65
+ pthread_mutex_unlock( &shared_data->stdout_mutex );
66
+
67
+ sem_post( &shared_data->producer_semaphore );
68
+ }
69
  }
70
 
71
+ return NULL;
72
+ }
 
73
 
74
+ int main(int argc, char* argv[])
75
+ {
76
+ srand( time(NULL) );
77
 
78
  shared_data_t shared_data;
79
+ shared_data.thread_count = 2;
 
 
 
 
80
 
81
+ if ( argc == 5 )
82
  {
83
+ if ( sscanf(argv[1], "%zu", &shared_data.data_size) != 1 || shared_data.data_size == 0 )
84
+ return (void)fprintf(stderr, "producer_consumer: error: invalid data size: %s\n", argv[1]), 1;
85
+ if ( sscanf(argv[2], "%zu", &shared_data.rounds) != 1 || shared_data.rounds == 0 )
86
+ return (void)fprintf(stderr, "producer_consumer: error: invalid rounds: %s\n", argv[2]), 1;
87
+ if ( sscanf(argv[3], "%u", &shared_data.producer_max_delay) != 1 )
88
+ return (void)fprintf(stderr, "producer_consumer: error: invalid producer max delay: %s\n", argv[3]), 1;
89
+ if ( sscanf(argv[4], "%u", &shared_data.consumer_max_delay) != 1 )
90
+ return (void)fprintf(stderr, "producer_consumer: error: invalid consumer max delay: %s\n", argv[4]), 1;
91
  }
92
+ else
93
+ return (void)fprintf(stderr, "usage: producer_consumer data_size rounds producer_delay consumer_delay\n"), 1;
94
 
95
+ shared_data.data = (double*) calloc(shared_data.data_size, sizeof(double));
96
+ if ( shared_data.data == NULL )
97
+ return 6;
98
+
99
+ sem_init( &shared_data.producer_semaphore, 0, shared_data.data_size );
100
+ sem_init( &shared_data.consumer_semaphore, 0, 0);
101
+ pthread_mutex_init( &shared_data.stdout_mutex, NULL );
102
 
103
+ pthread_t* threads = (pthread_t*)malloc(shared_data.thread_count * sizeof(pthread_t));
104
+ if ( threads == NULL )
105
+ return (void)fprintf(stderr, "hello_w: error: could not allocate memory for: %zu threads\n", shared_data.thread_count), 2;
106
+
107
+ struct timespec start_time;
108
+ clock_gettime(CLOCK_MONOTONIC, &start_time);
109
+
110
+ // Create producer
111
+ pthread_create(&threads[0], NULL, produce, &shared_data);
112
+ // Create consumer
113
+ pthread_create(&threads[1], NULL, consume, &shared_data);
114
 
115
+ for ( size_t index = 0; index < shared_data.thread_count; ++index )
116
  pthread_join(threads[index], NULL);
117
 
118
  struct timespec finish_time;
119
  clock_gettime(CLOCK_MONOTONIC, &finish_time);
120
 
121
  double seconds = finish_time.tv_sec - start_time.tv_sec
122
  + (finish_time.tv_nsec - start_time.tv_nsec) * 1e-9;
123
 
124
+ printf("Simulation time: %.9lfs\n", seconds);
125
+
126
+ pthread_mutex_destroy( &shared_data.stdout_mutex );
127
+ sem_destroy(&shared_data.consumer_semaphore);
128
+ sem_destroy(&shared_data.producer_semaphore);
129
 
130
+ free(shared_data.data);
 
131
  free(threads);
132
 
133
  return 0;
134
  }