#include #include #include #include #include #ifdef __APPLE__ #include "pthread_barrier.h" #endif #include "levenshtein.h" /* Parallel implementation suggested by Guo et al. 2013: * * L. Guo et al., "Parallel Algorithm for Approximate String Matching with K Differences," * 2013 IEEE Eighth International Conference on Networking, Architecture and Storage, Xi'an, * 2013, pp. 257-261. doi: 10.1109/NAS.2013.40 * https://ieeexplore.ieee.org/document/6665373 */ // Size of the alphabet S (character set) #define SZ_ASCII UCHAR_MAX // rows of the distance matrix' window #define WINDOW_ROWS 2 // Shared data by all threads typedef struct { // Strings to be compared const void* text; // T (text) const void* pattern; // P (pattern) // Lengths of the strings to be compared size_t text_len; // n = |T| size_t pattern_len; // m = |P| // Total number of workers launched size_t worker_count; // w // Matrix X of |S| x (n+1), where S is the character set size_t** starts; // X // Matrix of (workers + 1) x (n + 1) distances // Actually it is a window of the real D matrix of (m + 2) x (n + 1) distances size_t** distances; // D // Threads will be calculating the distances of this row size_t current_row; // All threads have to wait until descent to the next row pthread_barrier_t barrier; // True if comparison must be done in Unicode, false for ASCII size_t unicode; // bool, but used size_t for padding } guo_shared_t; typedef struct { // Thread number size_t id; // Shared data guo_shared_t* shared; } guo_worker_data_t; void* calculate_guo(void* data); size_t levenshtein_distance_pthreads_guo(const void* s1, size_t len1, const void* s2, size_t len2, bool unicode, size_t* workers) { assert(s1); assert(s2); assert(len1); assert(len2); // Create the shared data guo_shared_t* shared = (guo_shared_t*) malloc( 1 * sizeof(guo_shared_t) ); if ( shared == NULL ) return (void)fprintf(stderr, "levdist: error: no memory for shared data\n"), (size_t)-1; // Init the shared data shared->unicode = unicode; // We use the longest string for columns (text) and shortest for rows (pattern) shared->text = len1 > len2 ? s1 : s2; shared->text_len = len1 > len2 ? len1 : len2; shared->pattern = len1 > len2 ? s2 : s1; shared->pattern_len = len1 > len2 ? len2 : len1; // Create the matrix of starting distances, shared by all workers const size_t cols = shared->text_len + 1; shared->starts = create_distance_matrix(SZ_ASCII, cols); if ( shared->starts == NULL ) return (void)fprintf(stderr, "levdist: error: no memory for starts\n"), (size_t)-1; // Create a window of the matrix of distances shared by all workers // Only two rows are required to hold in RAM shared->distances = create_distance_matrix(WINDOW_ROWS, cols); if ( shared->distances == NULL ) return (void)fprintf(stderr, "levdist: error: no memory for distances window\n"), (size_t)-1; // Fill first row to avoid an ifs in the concurrent code for ( size_t col = 0; col < cols; ++col ) shared->distances[0][col] = col; // Threads will start at row 1 shared->current_row = 1; // Do not use more workers than the number of columns shared->worker_count = *workers = min(*workers, cols); // Init the barrier that prevents all threads to descent until all the row is done pthread_barrier_init(&shared->barrier, NULL, (unsigned)shared->worker_count); // Create the exclusive data of each thread guo_worker_data_t* thread_data = (guo_worker_data_t*)malloc( *workers * sizeof(guo_worker_data_t) ); if ( thread_data == NULL ) return (void)fprintf(stderr, "levdist: error: no memory for thread data\n"), (size_t)-1; // Create the threads pthread_t* threads = (pthread_t*)malloc( *workers * sizeof(pthread_t) ); if ( threads == NULL ) return (void)fprintf(stderr, "levdist: error: no memory for threads\n"), (size_t)-1; // Start each thread for ( size_t index = 0; index < *workers; ++index ) { thread_data[index].id = index; thread_data[index].shared = shared; if ( pthread_create(threads + index, NULL, calculate_guo, thread_data + index) ) return (void)fprintf(stderr, "levdist: error: could not create thread %zu\n", index), (size_t)-1; } // Wait for all threads to finish for ( size_t index = 0; index < *workers; ++index ) pthread_join(threads[index], NULL); // Hold the result size_t result = shared->distances[ (shared->current_row - 1) % WINDOW_ROWS ][shared->text_len]; // Destroy the barrier pthread_barrier_destroy(&shared->barrier); // Free the arrays destroy_distance_matrix(shared->distances, WINDOW_ROWS); destroy_distance_matrix(shared->starts, SZ_ASCII); // Destroy the threads and shared data free(thread_data); free(threads); free(shared); // Return the Levenshtein distance return result; } void calculate_start_guo(guo_worker_data_t* worker, guo_shared_t* shared); void calculate_distance_guo(guo_worker_data_t* worker, guo_shared_t* shared); void* calculate_guo(void* data) { // Extract the given data from the void* pointers guo_worker_data_t* worker = (guo_worker_data_t*)data; guo_shared_t* shared = worker->shared; // First, all threads fill the starting distances matrix calculate_start_guo(worker, shared); // All threads must wait until the start matrix is filled pthread_barrier_wait(&shared->barrier); // Finally, all threads calculate the distance matrix calculate_distance_guo(worker, shared); return NULL; } // Fills the X starting distances matrix void calculate_start_guo(guo_worker_data_t* worker, guo_shared_t* shared) { // Distribute rows by thread const size_t equitative = SZ_ASCII / shared->worker_count; const size_t remainder = SZ_ASCII % shared->worker_count; // My first and last column to calculate const size_t my_first_row = worker->id * equitative + min(remainder, worker->id); const size_t my_last_row = (worker->id + 1) * equitative + min(remainder, worker->id + 1); // Calculate distances for all of my rows for ( size_t row = my_first_row; row < my_last_row; ++row ) { // Using formula (3) in (Guo et al. 2013) assert(row < SZ_ASCII); // The first column is always 0 shared->starts[row][0] = 0; // Traverse the remaining columns in the row for ( size_t col = 1; col <= shared->text_len; ++col ) shared->starts[row][col] = ((const char*)shared->text)[col - 1] == (char)row ? col : shared->starts[row][col - 1]; } } #define Xcj(j) shared->starts[ (unsigned char)pattern[shared->current_row - 1] ][j] void calculate_distance_guo(guo_worker_data_t* worker, guo_shared_t* shared) { // Distribute columns by thread const size_t equitative = (shared->text_len + 1) / shared->worker_count; const size_t remainder = (shared->text_len + 1) % shared->worker_count; // My first and last column to calculate const size_t my_first_col = worker->id * equitative + min(remainder, worker->id) + ! worker->id; const size_t my_last_col = (worker->id + 1) * equitative + min(remainder, worker->id + 1); // Cast the strings according to the unicode const char* text = (const char*) shared->text; const char* pattern = (const char*) shared->pattern; // All workers calculate rows until they run out // If there are no more rows to calculate, the team has finished the work // if ( worker->id == 0) // fprintf(stderr, "thrd crow row prev antp fcol lcol strt end\n"); while ( shared->current_row < shared->pattern_len + 1 ) { // The index in the matrix (not the diamond) of the current row const size_t row = shared->current_row % WINDOW_ROWS; const size_t prev = (shared->current_row + WINDOW_ROWS - 1) % WINDOW_ROWS; // fprintf(stderr, "%4zu %4zu %4zu %4zu %4zu %4zu %4zu %4zu %4zu\n", worker->id, shared->current_row, row, prev, antp, first_col, last_col, my_adj_start, my_adj_last); /* shared->unicode ? calculate_distance_guo_unicode(shared, row, prev, antp, my_adj_start, my_adj_last) : calculate_distance_guo_ascii (shared, row, prev, antp, my_adj_start, my_adj_last); */ // Avoid an if inside the next loop if ( worker->id == 0 ) shared->distances[row][0] = shared->current_row; // Update my distances using the formula (4) in (Guo et al. 2013) for ( size_t col = my_first_col; col < my_last_col; ++col ) { // First row is the distance from empty to the current text length /* if ( shared->current_row == 0 ) shared->distances[row][col] = col; // For pattern matching, change col by 0 // First column is the distance from empty to the current pattern length else if ( col == 0 ) shared->distances[row][col] = shared->current_row; // If in this cell both letters match, we keep the diagonal distance else*/ if ( text[col - 1] == pattern[shared->current_row - 1] ) shared->distances[row][col] = shared->distances[prev][col - 1]; // If I have calculated the previous cell, use it, as the original Levenshtein algorithm else if ( col > my_first_col ) shared->distances[row][col] = 1 + min3( shared->distances[prev][col], shared->distances[prev][col - 1], shared->distances[row ][col - 1]); // If it is the first occurrence of the current pattern character in the text else if ( Xcj(col) == 0 ) shared->distances[row][col] = 1 + min3( shared->distances[prev][col], shared->distances[prev][col - 1], shared->current_row + col - 1); // It is another occurrence of the char in text else shared->distances[row][col] = 1 + min3( shared->distances[prev][col], shared->distances[prev][col - 1], shared->distances[prev][Xcj(col) - 1] + (col - 1 - Xcj(col)) ); } // Make all threads to pass to the next row at the same time pthread_barrier_wait(&shared->barrier); // Only a thread updates the row index, we choose the first one if ( worker->id == 0 ) ++shared->current_row/*, fputc('\n', stderr)*/; // Wait again until the current row is updated pthread_barrier_wait(&shared->barrier); } }