#include #include #include #include #ifdef __APPLE__ #include "pthread_barrier.h" #endif #include "levenshtein.h" // Pthreads diagonal Levenshtein ================================================================== typedef struct { // Strings to be compared const void* col_str; const void* row_str; // Lengths of the strings to be compared size_t col_len; size_t row_len; // Total number of workers launched size_t worker_count; // Matrix of 3*(max_len+1) distances size_t** distances; // Threads will be calculating the distances of this row size_t current_row; pthread_barrier_t barrier; // True if comparison must be done in Unicode, false for ASCII size_t unicode; // bool, but used size_t for padding } diagonal_shared_t; typedef struct { // Thread number size_t id; // Shared data diagonal_shared_t* shared; } diagonal_worker_data_t; void* calculate_distance_diagonal(void* data); size_t levenshtein_distance_pthreads_diagonal(const void* s1, size_t len1, const void* s2, size_t len2, bool unicode, size_t* workers) { assert(s1); assert(s2); assert(len1); assert(len2); // Create the shared data diagonal_shared_t* shared = (diagonal_shared_t*) malloc( 1 * sizeof(diagonal_shared_t) ); if ( shared == NULL ) return (void)fprintf(stderr, "levdist: error: no memory for shared data\n"), (size_t)-1; // Init the shared data shared->unicode = unicode; shared->current_row = 0; // We use the longest string for columns and shortest for rows shared->col_str = len1 > len2 ? s2 : s1; shared->row_str = len1 > len2 ? s1 : s2; shared->col_len = len1 > len2 ? len2 : len1;; shared->row_len = len1 > len2 ? len1 : len2; // Create a matrix of distances shared by all workers // Only 3 rows are required to hold in RAM const size_t cols = shared->row_len + 1; shared->distances = create_distance_matrix(3, cols); // Do not use more workers than the number of columns *workers = min(*workers, cols); shared->worker_count = *workers; // Init the barrier pthread_barrier_init(&shared->barrier, NULL, (unsigned)shared->worker_count); // Create the exclusive data of each thread diagonal_worker_data_t* thread_data = (diagonal_worker_data_t*)malloc( *workers * sizeof(diagonal_worker_data_t) ); if ( thread_data == NULL ) return (void)fprintf(stderr, "levdist: error: no memory for thread data\n"), (size_t)-1; // Create the threads pthread_t* threads = (pthread_t*)malloc( *workers * sizeof(pthread_t) ); if ( threads == NULL ) return (void)fprintf(stderr, "levdist: error: no memory for threads\n"), (size_t)-1; // Start each thread for ( size_t index = 0; index < *workers; ++index ) { thread_data[index].id = index; thread_data[index].shared = shared; if ( pthread_create( threads + index, NULL, calculate_distance_diagonal, thread_data + index) ) return (void)fprintf(stderr, "levdist: error: could not create thread %zu\n", index), (size_t)-1; } // Wait for all threads to finish for ( size_t index = 0; index < *workers; ++index ) pthread_join(threads[index], NULL); // Hold the result size_t result = shared->distances[ (shared->current_row - 1) % 3 ][shared->row_len]; // Destroy the barrier pthread_barrier_destroy(&shared->barrier); // Free the arrays destroy_distance_matrix(shared->distances, 3); // Destroy the threads and shared data free(thread_data); free(threads); free(shared); // Return the Levenshtein distance return result; } static inline void calculate_distance_diagonal_ascii(diagonal_shared_t* shared, const size_t row, const size_t prev, const size_t antp, const size_t my_adj_start, const size_t my_adj_last) { for ( size_t col = my_adj_start; col < my_adj_last; ++col ) { if ( col == 0 || col == shared->current_row ) shared->distances[row][col] = shared->current_row; else { size_t diff = (size_t)( ((const char*)shared->col_str)[shared->current_row - col - 1] != ((const char*)shared->row_str)[col - 1] ); shared->distances[row][col] = min3( shared->distances[prev][col] + 1, shared->distances[prev][col - 1] + 1, shared->distances[antp][col - 1] + diff); } } } // Incorrect slight optimization of calculate_distance_diagonal_ascii(): //if ( my_adj_start == 0 ) // shared->distances[row][my_adj_start] = shared->current_row; //else if ( my_adj_last - 1 == shared->current_row ) // shared->distances[row][my_adj_last - 1] = shared->current_row; //const char* col_str = (const char*)shared->col_str; //const char* row_str = (const char*)shared->row_str; //for ( size_t col = my_adj_start; col < my_adj_last; ++col ) //{ // size_t diff = (size_t)(col_str[shared->current_row - col - 1] != row_str[col - 1] ); // shared->distances[row][col] = min3( // shared->distances[prev][col] + 1, // shared->distances[prev][col - 1] + 1, // shared->distances[antp][col - 1] + diff); //} static inline void calculate_distance_diagonal_unicode(diagonal_shared_t* shared, const size_t row, const size_t prev, const size_t antp, const size_t my_adj_start, const size_t my_adj_last) { for ( size_t col = my_adj_start; col < my_adj_last; ++col ) { if ( col == 0 || col == shared->current_row ) shared->distances[row][col] = shared->current_row; else { size_t diff = (size_t)( ((const wchar_t*)shared->col_str)[shared->current_row - col - 1] != ((const wchar_t*)shared->row_str)[col - 1] ); shared->distances[row][col] = min3( shared->distances[prev][col] + 1, shared->distances[prev][col - 1] + 1, shared->distances[antp][col - 1] + diff); } } } void* calculate_distance_diagonal(void* data) { // Extract the given data from the void* pointers diagonal_worker_data_t* worker = (diagonal_worker_data_t*)data; diagonal_shared_t* shared = worker->shared; // Dimensions of the diamond to be traversed const size_t diamond_rows = shared->col_len + shared->row_len + 1; const size_t diamond_cols = shared->row_len + 1; // Distribute columns by thread const size_t equitative = diamond_cols / shared->worker_count; const size_t remainder = diamond_cols % shared->worker_count; // My first and last column to calculate const size_t my_first_col = worker->id * equitative + min(remainder, worker->id); const size_t my_last_col = (worker->id + 1) * equitative + min(remainder, worker->id + 1); // All workers calculate rows until they run out // If there are no more rows to calculate, the team has finished the work // if ( worker->id == 0) // fprintf(stderr, "thrd crow row prev antp fcol lcol strt end\n"); while ( shared->current_row < diamond_rows ) { // The index in the matrix (not the diamond) of the current row const size_t row = shared->current_row % 3; const size_t prev = (shared->current_row + 3 - 1) % 3; const size_t antp = (shared->current_row + 3 - 2) % 3; // The columns to be updated in this row const size_t first_col = shared->current_row > shared->col_len ? shared->current_row - shared->col_len : 0; const size_t last_col = min(shared->current_row + 1, diamond_cols); // The number of columns I have to update const size_t my_adj_start = max(my_first_col, first_col); const size_t my_adj_last = min(my_last_col, last_col); // Update the distances using the Levenshtein algoritm // fprintf(stderr, "%4zu %4zu %4zu %4zu %4zu %4zu %4zu %4zu %4zu\n", worker->id, shared->current_row, row, prev, antp, first_col, last_col, my_adj_start, my_adj_last); shared->unicode ? calculate_distance_diagonal_unicode(shared, row, prev, antp, my_adj_start, my_adj_last) : calculate_distance_diagonal_ascii (shared, row, prev, antp, my_adj_start, my_adj_last); // Make all threads to pass to the next row at the same time pthread_barrier_wait(&shared->barrier); // Only a thread updates the row index, we choose the first one if ( worker->id == 0 ) ++shared->current_row/*, fputc('\n', stderr)*/; // Wait again until the current row is updated pthread_barrier_wait(&shared->barrier); } return NULL; }