Skip to content

赵隐剑老师_并行编程入门实践课程 02

其它 MPI 教程: * Paul Norvig's basic Guide * MPI tutorial 本篇笔记主要参考的是这个


MPI

MPI(Message Passing Interface),主要是处理进程之间消息通信。在 MPI 中,通常运行同一个程序的多个实例,每个实例是一个 MPI 进程。与线程不同的是,每个进程都有自己的独立内存空间。

通讯器(communicator)。通讯器定义了一组能够互相发消息的进程。在这组进程中,每个进程会被分配一个序号,称作秩(rank),通常从 0 开始,到 size-1,其中 size 是该通信器中进程的总数。默认的通信器是 MPI_COMM_WORLD,它包含了所有的进程。

进程之间通过消息传递来交换数据。MPI 提供了多种消息传递函数,包括阻塞和非阻塞的、点对点和集合通信的消息传递方式。

通信的基础建立在不同进程间发送和接收操作。一个进程可以通过指定另一个进程的秩以及一个独一无二的消息标签(tag)来发送消息给另一个进程。接受者可以发送一个接收特定标签标记的消息的请求(或者也可以完全不管标签,接收任何消息),然后依次处理接收到的数据。类似这样的涉及一个发送者以及一个接受者的通信被称作点对点(point-to-point)通信。

当然在很多情况下,某个进程可能需要跟所有其他进程通信。比如主进程想发一个广播给所有的从进程。在这种情况下,手动去写一个个进程点对点的信息传递就显得很笨拙。而且事实上这样会导致网络利用率低下。MPI 有专门的接口来帮我们处理这类所有进程间的集体性(collective)通信。

编译:

mpic++ main.cpp -o main
mpirun -np 4 ./main

Initializing and Finalizing MPI

  • MPI_Init: Initializes the MPI environment. It must be called before any other MPI functions.
  • MPI_Finalize: Cleans up the MPI environment. After calling this, no more MPI functions can be used.
#include <mpi.h>
#include <iostream>

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);  // Initialize MPI

    // ... MPI code here ...

    MPI_Finalize();  // Finalize MPI
    return 0;
}

Querying Process Information

  • MPI_Comm_size: Returns the total number of processes in a given communicator.
  • MPI_Comm_rank: Returns the rank of the current process within that communicator.
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);

int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

std::cout << "Hello from rank " << world_rank << " out of " << world_size << "!\n";

Synchronization

  • MPI_Barrier: Blocks until all processes in the communicator have reached the barrier. Useful for synchronization points in the code.
MPI_Barrier(MPI_COMM_WORLD);
// All processes reach this point before continuing.

Sending and Receiving Messages

Point-to-Point Communication

  • MPI_Send:阻塞发送。在 MPI 库将数据安全复制出用户缓冲区后,函数返回。
  • MPI_Recv:阻塞接收。在接收到数据后,函数返回。

Parameters often include the buffer, count, datatype, destination/source rank, tag, and communicator.

#include <mpi.h>        // 包含MPI库的头文件
#include <iostream>    
int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);

    // 获取当前进程的总数(即运行的进程数量)
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    // 获取当前进程的排名(唯一标识符)
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    // 如果进程排名为0,执行发送操作
    if (world_rank == 0) {
        int data = 42; // 要发送的数据

        /*
         * MPI_Send函数用于发送消息。
         * 参数解释:
         * &data         - 指向要发送的数据的指针
         * 1             - 发送的数据元素数量
         * MPI_INT       - 数据类型(整数)
         * 1             - 目标进程的排名(接收方为排名1的进程)
         * 0             - 消息标签(用于标识消息类型,可用于匹配发送和接收)
         * MPI_COMM_WORLD - 通信域(所有进程都在这个通信域中)
         */
        MPI_Send(&data, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
        std::cout << "Rank 0 sent data: " << data << " to rank 1\n";
    }
    // 如果进程排名为1,执行接收操作
    else if (world_rank == 1) {
        int recv_data; // 用于存储接收到的数据

        /*
         * MPI_Recv函数用于接收消息。
         * 参数解释:
         * &recv_data          - 指向用于存储接收数据的缓冲区的指针
         * 1                   - 接收的数据元素数量
         * MPI_INT             - 数据类型(整数)
         * 0                   - 源进程的排名(发送方为排名0的进程)
         * 0                   - 消息标签(需要与发送时的标签匹配)
         * MPI_COMM_WORLD      - 通信域(所有进程都在这个通信域中)
         * MPI_STATUS_IGNORE    - 忽略状态信息(不需要接收消息的状态)
         */
        MPI_Recv(&recv_data, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        std::cout << "Rank 1 received data: " << recv_data << "\n";
    }

    MPI_Finalize();
    return 0;
}

集合通信

集合通信涉及通信器中的所有进程,常用的函数包括:

  • MPI_Bcast:从一个进程向所有其他进程广播数据。
  • MPI_Scatter:将一个数组的不同部分分发到各个进程。
  • MPI_Gather:将各进程的数据收集到一个进程中。
  • MPI_Allgather:收集所有进程的数据,并将结果分发到所有进程。
  • MPI_Reduce:对所有进程的数据进行归约操作(如求和、最大值、最小值),结果存储到一个进程中。
  • MPI_Allreduce:类似 MPI_Reduce,但将结果分发到所有进程。
int data;
if (world_rank == 0) {
    data = 100;
}
// Broadcast data from process 0 to all others
    /*
     * MPI_Bcast函数用于在所有进程之间广播数据。
     * 参数解释:
     * &data         - 指向要广播的数据的指针
     * 1             - 广播的数据元素数量
     * MPI_INT       - 数据类型(整数)
     * 0             - 广播的根进程的排名(即进程0将数据广播给所有其他进程)
     * MPI_COMM_WORLD - 通信域(所有进程都在这个通信域中)
     *
     * 重要说明:
     * MPI_Bcast是一个集体通信操作,这意味着所有参与通信的进程都必须调用这个函数。
     * 即使某些进程不需要发送数据,它们仍然需要参与广播,以确保通信的同步性。
     */
MPI_Bcast(&data, 1, MPI_INT, 0, MPI_COMM_WORLD);
std::cout << "Process " << world_rank << " received data: " << data << "\n";

MPI_Bcast内部机制会自动处理数据的发送和接收,根进程发送数据,其他进程接收数据。无需在代码中显式区分发送和接收过程。

计算 \(\pi\)

对应 cpp 程序:

#include <mpi.h>
#include <omp.h>

#include <iostream>
#include <cmath>
#include <cstdint> // For fixed-width integer types

// RAII wrapper for MPI
class MPIEnvironment {
public:
    MPIEnvironment(int& argc, char**& argv) {
        MPI_Init(&argc, &argv);
    }
    ~MPIEnvironment() {
        MPI_Finalize();
    }
};

int main(int argc, char** argv) {
    MPIEnvironment mpi_env(argc, argv);

    int mpi_i = 0;        // Rank of the current MPI process
    int mpi_n = 0;        // Total number of MPI processes
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_i);
    MPI_Comm_size(MPI_COMM_WORLD, &mpi_n);

    // Using a 64-bit integer (long long) for large n
    // Original code: n = 1000000000000
    // Note: This is a large number and may take a long time to run in practice.
    long long n = 1000000000000LL;

    // pi0: High-precision approximation of pi from the Fortran code
    // We'll keep this as a long double for higher precision in comparisons.
    constexpr long double pi0 = 3.14159265358979323846264338327950288419716939937510582097L;

    // Divide work among MPI processes
    // Each process computes a portion of the integral from i1 to i2
    double n_part = static_cast<double>(n) / static_cast<double>(mpi_n);
    long long n_part_int = static_cast<long long>(n_part);

    long long i1 = mpi_i * n_part_int + 1;
    long long i2 = i1 + n_part_int - 1;
    if (mpi_i == mpi_n - 1) {
        i2 = n;
    }

    MPI_Barrier(MPI_COMM_WORLD);

    // We are integrating f(x) = 4/(1+x²) from 0 to 1 to approximate π
    // dx = 1/n
    double dx = 1.0 / static_cast<double>(n);
    double pi = 0.0;
    double pi_sum = 0.0;

    // Use MPI_Wtime for timing
    double t = 0.0;
    if (mpi_i == 0) {
        t = MPI_Wtime();
    }

    // OpenMP parallel loop with reduction
    // Each thread adds to `pi` and at the end of the parallel loop,
    // the local results are combined.
    #pragma omp parallel for reduction(+:pi) default(none) shared(i1, i2, dx)
    for (long long i = i1; i <= i2; ++i) {
        double x = static_cast<double>(i) * dx;
        pi += 1.0 / (1.0 + x * x);
    }

    // Multiply by dx and 4 to get the local estimate of π
    pi = pi * 4.0 * dx;

    // Reduce all partial results to process 0
    MPI_Reduce(&pi, &pi_sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

    // On the root process, print the results
    if (mpi_i == 0) {
        double elapsed = MPI_Wtime() - t;
        double error = std::abs(pi_sum - static_cast<double>(pi0)) / static_cast<double>(pi0);

        // Print: Number of processes, OMP threads, time, relative error, and computed pi
        std::cout << mpi_n << " " 
                  << omp_get_max_threads() << " "
                  << elapsed << " "
                  << error << " "
                  << pi_sum << "\n";
    }

    return 0;
}