Skip to main content

MPI 学习

14 min read

初识MPI

MPI介绍

  • MPI,全称Message Passing Interface(消息传递接口),是业界定义的一种消息传递标准,用于编写并行计算的程序,广泛应用于高性能计算领域。

环境准备

  • 安装过程(ubuntu 18.04)
  • sudo apt-get update
  • sudo apt-get install -y build-essential
  • gcc -v

MPICH

  • MPICH是MPI标准的一种重要的实现,可以免费从网上下载。MPICH的开发与MPI规范的制订是同步进行的,因此MPICH最能反映MPI的变化和发展。
  • sudo apt-get install -y mpich
  • mpicc -v

第一个MPI程序

  • sudo apt-get install -y vim
  • vim mpi.c

#include <mpi.h>

#include <stdio.h>

int main(int argc, char **argv)

{

MPI_Init(&argc, &argv);

printf("Hello World!\n");

MPI_Finalize();

return 0;

}

  • mpicc mpi.c -o mpi.o
  • mpirun -np 8 ./mpi.o
  • MPI程序和普通的C程序的区别在于有一个开始的函数和结束的函数来标识MPI部分,再在这个部分进行你想要进行的操作
  • int MPI_Init(int *argc, char **argv)
  • 通过MPI_Init函数进入MPI环境并完成所有的初始化工作,标志并行代码的开始。
  • int MPI_Finalize(void)
  • 通过MPI_Finalize函数从MPI环境中退出,标志并行代码的结束,如果不是MPI程序最后一条可执行语句,则运行结果不可知。

MPI学习

获取进程数量

  • 在MPI编程中,我们常常需要获取指定通信域的进程个数,以确定程序的规模。
  • 一组可以相互发送消息的进程集合叫做通信子,通常由MPI_Init()在用户启动程序时,定义由用户启动的所有进程所组成的通信子,缺省值为 MPI_COMM_WORLD。这个参数是MPI通信操作函数中必不可少的参数,用于限定参加通信的进程的范围。
  • int MPI_Comm_size(MPI_Comm comm, int *rank)
  • 获取指定通信域的进程个数。
  • 第一个参数是通信子,第二个参数返回进程的个数。

#include <stdio.h>

#include <mpi.h>

int main(int argc, char **argv)

{

int numprocs;

MPI_Init(&argc, &argv);

//your code here

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

//end of your code

printf("Hello World! The number of processes is %d\n",numprocs);

MPI_Finalize();

return 0;

}

获取进程id

  • 常常需要输出当前进程的id,以此来判断具体哪个进程完成了对应的任务。
  • int MPI_Comm_rank(MPI_Comm comm, int *rank)
  • 获得当前进程在指定通信域中的编号,将自身与其他程序区分。
  • 第一个参数是通信子,第二个参数返回进程的编号。

#include <stdio.h>

#include <mpi.h>

int main(int argc, char **argv)

{

int myid, numprocs;

MPI_Init(&argc, &argv);

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

//your code here

MPI_Comm_rank(MPI_COMM_WORLD, &myid);

//end of your code

printf("Hello World!I'm rank %d of %d\n", myid, numprocs);

MPI_Finalize();

return 0;

}

获取处理器名

  • 在实际处理中我们可能需要将进程迁移至不同的处理器,而MPI提供了获取处理器名的函数以简单地允许这种行为。
  • 注意在MPI中不需要定义这种迁移。
  • int MPI_Get_processor_name ( char *name, int *resultlen)
  • char *name : 实际节点的唯一说明字;
  • int *resultlen:在name中返回结果的长度;
  • 返回调用时调用所在的处理器名。

#include <stdio.h>

#include <mpi.h>

int main(int argc, char **argv)

{

int len;

char name[MPI_MAX_PROCESSOR_NAME];

MPI_Init(&argc, &argv);

//your code here

MPI_Get_processor_name (name, &len);

//end of your code

printf("Hello, world. I am %s.\n", name);

MPI_Finalize();

return 0;

}

运行时间

  • 在MPI编程我们可以使用MPI_Wtime函数在并行代码中计算运行时间,用MPI_Wtick来查看精度。
  • double MPI_Wtime(void)
  • MPI_WTIME返回一个用浮点数表示的秒数, 它表示从过去某一时刻到调用时刻所经历的时间。
  • double MPI_Wtick(void)
  • MPI_WTICK返回MPI_WTIME的精度,单位是秒,可以认为是一个时钟滴答所占用的时间。

#include<stdio.h>

#include<mpi.h>

int main(int argc, char **argv)

{

int myid, numprocs;

double start, finish;

MPI_Init(&argc, &argv);

MPI_Comm_rank(MPI_COMM_WORLD, &myid);

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

//your code here

start = MPI_Wtime();

printf("The precision is: %f\n", MPI_Wtick());

finish = MPI_Wtime();

//your code here

printf("Hello World!I'm rank %d of %d, running %f seconds.\n", myid, numprocs, finish-start);

MPI_Finalize();

return 0;

}

同步

  • 在实际工作中,我们常常会因为许多原因需要进行同步操作。例如,希望保证所有进程中并行代码在某个地方同时开始运行,或者在某个函数调用结束之前不能返回。
  • int MPI_Barrier(MPI_Comm comm)
  • 阻止调用直到communicator中所有进程已经完成调用,
  • 就是说,任意一次进程的调用只能在所有communicator中的成员已经开始调用之后进行。

#include<stdio.h>

#include<mpi.h>

int main(int argc, char **argv)

{

int myid, numprocs;

double start, finish;

MPI_Init(&argc, &argv);

MPI_Comm_rank(MPI_COMM_WORLD, &myid);

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

//your code here

MPI_Barrier(MPI_COMM_WORLD);

//end of your code

start = MPI_Wtime();

printf("The precision is: %f\n", MPI_Wtick());

finish = MPI_Wtime();

printf("Hello World!I'm rank %d of %d, running %f seconds.\n", myid, numprocs, finish-start);

MPI_Finalize();

return 0;

}

消息传递

  • 在并行编程中,消息传递占了很大的比重。良好的消息传递是正常完成进程/节点之间操作的基本条件。在这里先介绍的最基本发送/接收函数。
  • 最基本的发送/接收函数都是以缓冲区作为端点,通过参数配置来完成指定操作。
  • `int MPI_Send(void* msg_buf_p, int msg_size, MPI_Datatype msg_type, int dest, int tag, MPI_Comm communicator)

`

  • 发送缓冲区中的信息到目标进程。

void* msg_buf_p : 发送缓冲区的起始地址;

int buf_size : 缓冲区大小;

MPI_Datatype msg_type : 发送信息的数据类型;

int dest :目标进程的id值;

int tag : 消息标签;

MPI_Comm communicator : 通信子;

  • `int MPI_Recv(void* msg_buf_p, int buf_size, MPI_Datatype msg_type, int source, int tag, MPI_Comm communicator, MPI_Status *status_p)

`

  • 发送缓冲区中的信息到目标进程。

void* msg_buf_p : 缓冲区的起始地址;

int buf_size : 缓冲区大小;

MPI_Datatype msg_type : 发送信息的数据类型;

int dest :目标进程的id值;

int tag : 消息标签;

MPI_Comm communicator : 通信子;

MPI_Status *status_p : status_p对象,包含实际接收到的消息的有关信息

  • 实验说明:
  • 在这里我们把id为0的进程当作根进程,然后在除此之外的进程中使用函数MPI_Send发送一句"hello world!"到根进程中,然后在根进程中把这些信息打印出来。

#include <stdio.h>

#include <mpi.h>

int main(int argc, char **argv)

{

int myid, numprocs, source;

MPI_Status status;

char message[100];

MPI_Init(&argc, &argv);

MPI_Comm_rank(MPI_COMM_WORLD, &myid);

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

if(myid != 0) {

strcpy(message, "hello world!");

//your code here

MPI_Send(message, strlen(message)+1, MPI_CHAR, 0, 99, MPI_COMM_WORLD);

//end of your code

}

else { //myid == 0

for(source=1; source<numprocs; source++) {

//your code here

MPI_Recv(message, 100, MPI_CHAR, source, 99, MPI_COMM_WORLD, &status);

//end of your code

printf("%s\n", message);

}

}

MPI_Finalize();

return 0;

}

地址偏移量

  • 在通信操作中,我们常常需要对地址进行传递或操作,例如传送/接收缓冲区。
  • 而一个位置在内存中的地址可以通过MPI_ADDRESS函数获得。
  • int MPI_Address(void* location, MPI_Aint *address)
  • void* location : 调用者的内存位置;
  • MPI_Aint *address:位置的对应地址;
  • 实验说明:
  • 给出三个临时变量a, b, c, 分别求出a与b、a与c之间的地址偏移量。
  • 输出结果:
  • 由于这里采用的变量类型为int,所以如果变量地址是连续的话应该是:

The distance between a and b is 4

The distance between a and c is 8


#include<stdio.h>

#include<mpi.h>

int main(int argc, char **argv)

{

int myid, numprocs;

MPI_Aint address1, address2, address3;

int a, b, c, dist1, dist2;

a = 1;

b = 2;

c = 3;

MPI_Init(&argc, &argv);

MPI_Comm_rank(MPI_COMM_WORLD, &myid);

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

// your code here

MPI_Address(&a, &address1);

MPI_Address(&b, &address2);

MPI_Address(&c, &address3);

// end of your code

dist1 = address2 - address1 ;

dist2 = address3 - address1 ;

if(myid == 0) {

printf("The distance between a and b is %d\n", dist1);

printf("The distance between a and c is %d\n", dist2);

}

MPI_Finalize();

return 0;

}

数据的打包(pack)

  • 有时候我们希望将不连续的数据或是不相同的数据类型的数据一起发送到其他进程,而不是效率很低地逐个发送。
  • 一个解决这个问题的方案是将数据封装成包,再将数据包放到一个连续的缓冲区,发送到接收缓冲区后再提取出来尽心解包。
  • 值得注意的是,打包/解包函数有时候还会用来代替系统缓存策略。此外,对于在MPI顶层进一步开发附加的通信库会起到辅助的作用。

int MPI_Pack(void* inbuf, int incount, MPI_datatype datatype, void *outbuf, int outcount, int *position, MPI_Comm comm)

void* inbuf : 输入缓冲区地址;

int incount :输入数据项数目;

MPI_datatype datatype :数据项的类型;

void *outbuf :输出缓冲区地址;

int outcount :输出缓冲区大小;

int *position :缓冲区当前位置;

MPI_Comm comm :通信子;

  • 实验说明:
  • 在源进程中打包发送一个数据包到进程1,进程1解包并打印出数据。
  • 输出结果:

The number is 1 and 2


#include <stdio.h>

#include <mpi.h>

int main(int argc, char **argv)

{

int myid, numprocs, source;

MPI_Status status;

int i, j, position;

int k[2];

int buf[1000];

MPI_Init(&argc, &argv);

MPI_Comm_rank(MPI_COMM_WORLD, &myid);

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

i = 1;

j = 2;

if(myid == 0) {

position = 0 ;

// your code here

MPI_Pack(&i, 1, MPI_INT, buf, 1000, &position, MPI_COMM_WORLD);

MPI_Pack(&j, 1, MPI_INT, buf, 1000, &position, MPI_COMM_WORLD);

// end of your code

MPI_Send(buf, position, MPI_PACKED, 1, 99, MPI_COMM_WORLD);

}

else if (myid == 1){

MPI_Recv(k, 2, MPI_INT, 0, 99, MPI_COMM_WORLD, &status);

position = 0 ;

MPI_Unpack(k, 2, &position, &i, 1, MPI_INT, MPI_COMM_WORLD);

MPI_Unpack(k, 2, &position, &j, 1, MPI_INT, MPI_COMM_WORLD);

printf("The number is %d and %d", i, j);

}

MPI_Finalize();

return 0;

}

数据的解包(unpack)

  • 解包是对应于打包的MPI操作。
  • 需要特别注意的是:
  • MPI_RECV和MPI_UNPACK的区别:
  • 在MPI_RECV中, count参数指明的是可以接收的最大项数. 实际接收的项数是由接收的消息的长度来决定的.
  • 在MPI_UNPACK中, count参数指明实际打包的项数; 相应消息的"size"是position的增加值.
  • 这种改动的原因是"输入消息的大小" 直到用户决定如何解包之前是不能预先确定的;
  • 从解包的项数来确定"消息大小"也是很困难的。

int MPI_Unpack(void* inbuf, int insize, int *position, void *outbuf, int outcount, MPI_Datatype datatype, MPI_Comm comm)

void* inbuf : 输入缓冲区地址;

int insize :输入数据项数目;

MPI_datatype datatype :数据项的类型;

void *outbuf :输出缓冲区地址;

int outcount :输出缓冲区大小;

int *position :缓冲区当前位置;

MPI_Comm comm :通信子;

规约(reduce)

  • 在现实生活中,我们常常需要对于数据做同一种操作,并将结果返回到指定的进程中,这个过程称为集合通信。例如,将数据分散到各个进程中,先在各个进程内进行求和,再在全局完成求和-平均这个操作,这个过程是一个规约的过程。
  • 一般来说,集合通信包括通信、同步和计算三个功能。不过,目前我们暂时不需要关注整个过程,而是先使用一个规约函数去体验一下集合通信。
  • int MPI_Reduce(void * input_data_p, void * output_data_p, int count, MPI_Datatype datatype, MPI_Op operator, int dest_process, MPI_Comm comm)
  • 规约函数,所有进程将待处理数据通过输入的操作子operator计算为最终结果并将它存入目标进程中。

void * input_data_p : 每个进程的待处理数据存放在input_data_p中;

void * output_data_p : 存放最终结果的目标进程的地址;

int count : 缓冲区中的数据个数;

MPI_Datatype datatype : 数据项的类型;

MPI_Op operator : 操作子,例如加减;

int dest_process : 目标进程的编号;

  • 实验说明:
  • 使用函数MPI_Reduce来完成加法规约到根进程的操作,并在根进程打印出总和和平均值。
  • 输出结果:
  • 由于这里是测试用例,所以每个进程的数值都是取3.0。所以,输出结果应该是总和等于进程数乘以3,平均值应该是3。

#include <stdio.h>

#include <mpi.h>

int main(int argc, char **argv)

{

int myid, numprocs;

double local_num = 3.0;

MPI_Init(&argc, &argv);

MPI_Comm_rank(MPI_COMM_WORLD, &myid);

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

double global_num;

//your code here

MPI_Reduce(&local_num, &global_num, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

//end of your code

if(myid == 0) {

printf("Total sum = %f, avg = %f\n", global_num, global_num / numprocs);

}

MPI_Finalize();

return 0;

}

广播(broadcast)

  • 在一个集合通信中,如果属于一个进程的数据被发送到通信子中的所有进程,这样的集合通信叫做广播。
  • int MPI_Bcast(void* buffer, int count, MPI_Datatype datatype, int source, MPI_Comm comm)
  • 广播函数,从一个id值为source的进程将一条消息广播发送到通信子内的所有进程,包括它本身在内。

void*  buffer   缓冲区的起始地址;

int   count    缓冲区中的数据个数;

MPI_Datatype datatype   缓冲区中的数据类型;

int   source    发送信息的进程id;

MPI_Comm comm    通信子;

  • 实验说明:
  • 使用函数MPI_Bcast在根进程中发送一个数组到其他进程,并在其他进程中打印出来。
  • 输出结果:
  • 输出应该是这样的格式:

In process 1, arr[0]=1 arr[1]=2 arr[2]=3 arr[3]=4 arr[4]=5

In process 3, arr[0]=1 arr[1]=2 arr[2]=3 arr[3]=4 arr[4]=5

...

In process n, arr[0]=1 arr[1]=2 arr[2]=3 arr[3]=4 arr[4]=5


#include<stdio.h>

#include<mpi.h>

int main(int argc, char **argv)

{

int myid, numprocs;

int source = 0;

int array[5]={1,2,3,4,5};

int i;

MPI_Init(&argc, &argv);

MPI_Comm_rank(MPI_COMM_WORLD, &myid);

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

if(myid == source) {

for(i = 1; i <= 5; i++)

array[i] = i;

}

//your code here

MPI_Bcast(array, 5, MPI_INT, source, MPI_COMM_WORLD);

//end of your code

if(myid != source) {

printf("In process %d, ", myid);

for(i = 0; i < 5; i++)

printf("arr[%d]=%d\t", i, array[i]);

printf("\n");

}

MPI_Finalize();

return 0;

}

收集(gather)

  • 同样,有时候我们希望在一个进程中从所有进程获取信息,例如将所有进程中的一个数组都收集到根进程中作进一步的处理,这样的集合通信我们叫做收集。
  • `int MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendtype,

void* recvbuf, int recvcount, MPI_Datatype recvtype,

int root, MPI_Comm comm)`

  • 收集函数,根进程(目标进程)从所有进程(包括它自己)收集发送缓冲区的数据,根进程根据发送这些数据的进程id将它们依次存放到自已的缓冲区中.

void* sendbuf 发送缓冲区的起始地址

int sendcount 发送缓冲区的数据个数

MPI_Datatype sendtype 发送缓冲区的数据类型

void* recvbuf 接收缓冲区的起始地址

int recvcount 待接收的元素个数

MPI_Datatype recvtype 接收的数据类型

int root 接收进程id

MPI_Comm comm 通信子

  • 实验说明:
  • 使用函数MPI_Gather在根进程中从所有进程接收一个数组,并在根进程中打印出来。
  • 输出结果:输出应该是这样的格式:

Now is process 1's data: arr[0]=1 arr[1]=2 arr[2]=3 arr[3]=4 arr[4]=5

Now is process 4's data: arr[0]=1 arr[1]=2 arr[2]=3 arr[3]=4 arr[4]=5

Now is process 2's data: arr[0]=1 arr[1]=2 arr[2]=3 arr[3]=4 arr[4]=5

...

Now is process n's data: arr[0]=1 arr[1]=2 arr[2]=3 arr[3]=4 arr[4]=5

...


#include<stdio.h>

#include<mpi.h>

int main(int argc, char **argv)

{

int myid, numprocs;

int dest = 0;

int array[5]={1,2,3,4,5};

int *rbuf;

int i,j;

MPI_Init(&argc, &argv);

MPI_Comm_rank(MPI_COMM_WORLD, &myid);

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

if(myid == dest) {

rbuf=(int *)malloc(numprocs*5*sizeof(int));

}

//your code here

MPI_Gather(array, 5, MPI_INT, rbuf, 5, MPI_INT, dest, MPI_COMM_WORLD);

//end of your code

if(myid == dest) {

for(i=dest+1;i<numprocs;i++) {

printf("Now is process %d's data: ", i);

for(j=0;j<5;j++) {

printf("array[%d]=%d\t", j, rbuf[i*5+j]);

}

printf("\n");

}

}

MPI_Finalize();

return 0;

}

14. 散发(scatter)

  • 在前面我们学习了收集(gather)操作,那么与之相对应也有一个相反的集合通信操作,即根进程向所有进程发送缓冲区的数据,称为散发。
  • 需要特别说明的是,散发操作和广播操作的区别在于发送到各个进程的信息可以是不同的
  • `int MPI_Scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype,

void* recvbuf, int recvcount, MPI_Datatype recvtype,

int root, MPI_Comm comm)`

  • MPI_SCATTER是MPI_GATHER的逆操作,另外一种解释是根进程通过MPI_Send发送一条消息,这条消息被分成n等份,第i份发送给组中的第i个处理器, 然后每个处理器如上所述接收相应的消息。

void* sendbuf 发送缓冲区的起始地址

int sendcount 发送的数据个数

MPI_Datatype sendtype 发送缓冲区中的数据类型

void* recvbuf 接收缓冲区的起始地址

int recvcount 待接收的元素个数

MPI_Datatype recvtype 接收的数据类型

int root 发送进程id

MPI_Comm comm 通信子

  • 实验说明:
  • 使用函数MPI_Scatter在根进程中向所有进程发送对应数组,并在对应进程中打印出来。
  • 输出结果:输出应该是这样的格式:

Now is process 1: arr[0]=5 arr[1]=6 arr[2]=7 arr[3]=8 arr[4]=9

Now is process 4: arr[0]=20 arr[1]=21 arr[2]=22 arr[3]=23 arr[4]=24

Now is process 2: arr[0]=10 arr[1]=11 arr[2]=12 arr[3]=13 arr[4]=14

...

Now is process n: arr[0]=5n arr[1]=5n+1 arr[2]=5n+2 arr[3]=5n+3 arr[4]=5*n+4

...


#include<stdio.h>

#include<mpi.h>

int main(int argc, char **argv)

{

int myid, numprocs;

int source = 0;

int *sbuf;

int rbuf[5];

int i;

MPI_Init(&argc, &argv);

MPI_Comm_rank(MPI_COMM_WORLD, &myid);

MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

if(myid == source) {

sbuf=(int *)malloc(numprocs*5*sizeof(int));

for(i=0;i<numprocs*5;i++) {

sbuf[i]=i;

}

}

// your code here

MPI_Scatter(sbuf, 5, MPI_INT, rbuf, 5, MPI_INT, source, MPI_COMM_WORLD);

// end of your code

printf("Now is process %d: ", myid);

for(i=0;i<5;i++) {

printf("array[%d]=%d\t", i, rbuf[i]);

}

printf("\n");

MPI_Finalize();

return 0;

}

15. 组的管理-创建(1)

Loading Comments...