created: 2025-05-26 tags:

  • 程序设计
  • 并行
  • 算法

奇偶排序 ( MPI实现 )

  • 待排序的数字写在输入文件 (file) 中, 由 0 号进程读取并分发给各个进程; 各进程按照奇偶排序排好之后, 再交还给 0 号进程, 由其将结果写入新的文件
  • 要求: 排序量在1000个以上
  • 包含: mpi程序 + 输入file , 输出file

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * qsort/bsearch-style three-way comparator for int values.
 *
 * a, b: pointers to the two ints being compared.
 * Returns a negative value if *a < *b, zero if they are equal, and a
 * positive value if *a > *b.
 *
 * The result is built from two comparisons rather than a subtraction:
 * `*a - *b` overflows (undefined behaviour) when the operands are far
 * apart, e.g. INT_MAX and -1, and then reports the wrong ordering.
 */
int compare_integers(const void *a, const void *b) {
    const int lhs = *(const int *)a;
    const int rhs = *(const int *)b;
    return (lhs > rhs) - (lhs < rhs);
}

/*
 * In-place insertion sort, ascending order.
 *
 * arr: array to sort.
 * n:   number of elements in arr; values below 2 leave the array untouched.
 */
void insertion_sort(int arr[], int n) {
    for (int pos = 1; pos < n; ++pos) {
        const int pending = arr[pos];
        int slot = pos;

        // Shift every larger element of the sorted prefix one step right,
        // opening the slot where the pending value belongs.
        for (; slot > 0 && arr[slot - 1] > pending; --slot) {
            arr[slot] = arr[slot - 1];
        }
        arr[slot] = pending;
    }
}

/*
 * Merge two ascending arrays into one ascending array.
 *
 * arr1, n1: first sorted input and its length.
 * arr2, n2: second sorted input and its length.
 * result:   output buffer; receives n1 + n2 elements and must not overlap
 *           either input.
 *
 * When both heads compare equal, the element from arr1 is emitted first.
 */
void merge_sorted_arrays(int arr1[], int n1, int arr2[], int n2, int result[]) {
    int left = 0;
    int right = 0;
    int *out = result;

    for (;;) {
        if (left >= n1) {
            // First input exhausted: flush whatever remains of the second.
            while (right < n2) {
                *out++ = arr2[right++];
            }
            return;
        }
        if (right >= n2) {
            // Second input exhausted: flush whatever remains of the first.
            while (left < n1) {
                *out++ = arr1[left++];
            }
            return;
        }

        if (arr1[left] <= arr2[right]) {
            *out++ = arr1[left++];
        } else {
            *out++ = arr2[right++];
        }
    }
}

/*
 * Pick the rank this process exchanges data with in a given phase of
 * odd-even transposition sort.
 *
 * phase:   phase index; even phases pair (0,1), (2,3), ...,
 *          odd phases pair (1,2), (3,4), ...
 * my_rank: rank of the calling process.
 * comm_sz: number of processes in the communicator.
 *
 * Returns the partner's rank, or MPI_PROC_NULL when the computed partner
 * would be -1 or comm_sz (the process sits idle in that phase).
 */
int Compute_partner(int phase, int my_rank, int comm_sz) {
    const int phase_is_even = (phase % 2 == 0);
    const int rank_is_odd = (my_rank % 2 != 0);

    // An odd rank looks left in even phases; an even rank looks left in odd
    // phases. Every other combination looks right.
    const int partner = (phase_is_even == rank_is_odd) ? my_rank - 1
                                                       : my_rank + 1;

    return (partner == -1 || partner == comm_sz) ? MPI_PROC_NULL : partner;
}


/*
 * Allocate room for `count` ints and abort the whole MPI job on failure.
 * At least one element is always requested so that a rank holding zero
 * elements still gets a valid, non-NULL buffer.
 */
static int *alloc_ints_or_abort(int count) {
    size_t elems = (count > 0) ? (size_t)count : 1;
    int *buf = malloc(elems * sizeof *buf);
    if (buf == NULL) {
        fprintf(stderr, "内存分配失败\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    return buf;
}

/*
 * Parallel odd-even transposition sort.
 *
 * Rank 0 reads whitespace-separated integers from "input.txt", scatters them
 * across all ranks, every rank sorts its slice locally, then comm_sz
 * exchange phases merge neighbouring slices. Rank 0 gathers the result and
 * writes it to "output.txt".
 *
 * Returns 0 on success; rank 0 returns EXIT_FAILURE if the output file could
 * not be opened or written. Input and allocation errors abort the MPI job.
 */
int main(int argc, char* argv[]) {
    int my_rank, comm_sz;
    int N = 0; // total number of elements
    int local_n; // number of elements owned by this process
    int *global_data = NULL;
    int *local_keys = NULL;
    int *sorted_global_data = NULL;
    char input_filename[] = "input.txt";
    char output_filename[] = "output.txt";
    int exit_code = 0;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &comm_sz);

    if (my_rank == 0) {
        FILE *infile = fopen(input_filename, "r");
        if (!infile) {
            fprintf(stderr, "打开输入文件 %s 出错\n", input_filename);
            MPI_Abort(MPI_COMM_WORLD, 1);
        }

        // First pass: count the elements so the array can be sized exactly.
        int temp;
        while (fscanf(infile, "%d", &temp) == 1) {
            N++;
        }

        if (N <= 0) {
            fprintf(stderr, "输入文件为空或格式错误\n");
            fclose(infile);
            MPI_Abort(MPI_COMM_WORLD, 1);
        }

        // Second pass: read the values themselves.
        rewind(infile);
        global_data = alloc_ints_or_abort(N);
        for (int i = 0; i < N; i++) {
            if (fscanf(infile, "%d", &global_data[i]) != 1) {
                // The file changed or became unreadable between the passes.
                fprintf(stderr, "读取输入文件 %s 出错\n", input_filename);
                fclose(infile);
                MPI_Abort(MPI_COMM_WORLD, 1);
            }
        }
        fclose(infile);
    }

    // Only the element count is broadcast; the data goes out via Scatterv.
    MPI_Bcast(&N, 1, MPI_INT, 0, MPI_COMM_WORLD);

    // Every process derives the same distribution on its own: the first
    // `remainder` ranks hold one element more than the rest.
    int base_n = N / comm_sz;
    int remainder = N % comm_sz;
    local_n = base_n + (my_rank < remainder ? 1 : 0);

    local_keys = alloc_ints_or_abort(local_n);

    // Per-rank counts and offsets used by Scatterv, Gatherv and the
    // exchange phases.
    int *local_n_array = alloc_ints_or_abort(comm_sz);
    int *displacements = alloc_ints_or_abort(comm_sz);

    for (int i = 0; i < comm_sz; i++) {
        local_n_array[i] = base_n + (i < remainder ? 1 : 0);
        displacements[i] = (i == 0) ? 0 : displacements[i-1] + local_n_array[i-1];
    }

    MPI_Scatterv(global_data, local_n_array, displacements, MPI_INT,
                 local_keys, local_n, MPI_INT, 0, MPI_COMM_WORLD);

    // global_data is NULL on every rank except 0, and free(NULL) is a no-op.
    free(global_data);
    global_data = NULL;

    // Initial local sort.
    insertion_sort(local_keys, local_n);

    // Exchange buffers are allocated once, sized for the largest slice any
    // partner can hold, instead of being reallocated in every phase.
    int max_partner_n = base_n + (remainder > 0 ? 1 : 0);
    int *partner_keys = alloc_ints_or_abort(max_partner_n);
    int *temp_merged_keys = alloc_ints_or_abort(local_n + max_partner_n);

    // Odd-even transposition phases; slices may differ in size by one.
    for (int phase = 0; phase < comm_sz; phase++) {
        int partner = Compute_partner(phase, my_rank, comm_sz);

        if (partner == MPI_PROC_NULL) {
            continue; // idle in this phase
        }

        int partner_n = local_n_array[partner];

        MPI_Sendrecv(local_keys, local_n, MPI_INT, partner, 0,
                     partner_keys, partner_n, MPI_INT, partner, 0,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);

        if (my_rank < partner) {
            // Lower rank keeps the smallest local_n keys of the merged run.
            merge_sorted_arrays(local_keys, local_n, partner_keys, partner_n, temp_merged_keys);
            memcpy(local_keys, temp_merged_keys, local_n * sizeof(int));
        } else {
            // Higher rank keeps the largest local_n keys, i.e. everything
            // after the partner_n keys the partner retains.
            merge_sorted_arrays(partner_keys, partner_n, local_keys, local_n, temp_merged_keys);
            memcpy(local_keys, temp_merged_keys + partner_n, local_n * sizeof(int));
        }
    }

    free(partner_keys);
    free(temp_merged_keys);

    if (my_rank == 0) {
        sorted_global_data = alloc_ints_or_abort(N);
    }

    MPI_Gatherv(local_keys, local_n, MPI_INT,
                sorted_global_data, local_n_array, displacements, MPI_INT, 0, MPI_COMM_WORLD);

    if (my_rank == 0) {
        FILE *outfile = fopen(output_filename, "w");
        if (!outfile) {
            fprintf(stderr, "打开输出文件 %s 出错\n", output_filename);
            exit_code = EXIT_FAILURE;
        } else {
            for (int i = 0; i < N; i++) {
                fprintf(outfile, "%d ", sorted_global_data[i]);
            }
            fprintf(outfile, "\n");

            // fclose flushes buffered output, so its result matters as much
            // as the stream's error flag.
            int write_failed = ferror(outfile);
            if (fclose(outfile) != 0 || write_failed) {
                fprintf(stderr, "写入输出文件 %s 出错\n", output_filename);
                exit_code = EXIT_FAILURE;
            }
        }
        free(sorted_global_data);
    }

    free(local_keys);
    free(local_n_array);
    free(displacements);
    MPI_Finalize();
    return exit_code;
}
最后修改:2025 年 05 月 29 日
如果觉得我的文章对你有用,请随意赞赏