Experiment Procedure

  1. Log in to the cluster
ssh -p 9922 username@172.28.9.54
cd data
mkdir mpi
cd mpi
  2. Create the job files
cat > sparse.cpp
cat > sparse.pbs
  3. Paste the corresponding code into each file
#include <mpi.h>
#include <iostream>
#include <fstream>
#include <cmath>
#include <vector>
#include <cstdlib>
#include <algorithm>

using namespace std;

// Read a matrix stored in CSR format
void read_matrix(const string& filename, int& n, vector<double>& values,
                 vector<int>& row_ptr, vector<int>& col_idx) {
    ifstream fin(filename.c_str());
    if (!fin) {
        cerr << "Failed to open matrix file: " << filename << endl;
        exit(1);
    }
    fin >> n;
    int nnz;
    fin >> nnz;
    values.resize(nnz);
    row_ptr.resize(n + 1);
    col_idx.resize(nnz);
    for (int i = 0; i <= n; i++) fin >> row_ptr[i];
    for (int i = 0; i < nnz; i++) fin >> col_idx[i];
    for (int i = 0; i < nnz; i++) fin >> values[i];
    fin.close();
}

// Read the right-hand-side vector b
void read_vector(const string& filename, int n, vector<double>& b) {
    ifstream fin(filename.c_str());
    if (!fin) {
        cerr << "Failed to open vector file: " << filename << endl;
        exit(1);
    }
    b.resize(n);
    for (int i = 0; i < n; i++) fin >> b[i];
    fin.close();
}

int main(int argc, char* argv[]) {
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (argc != 3) {
        if (rank == 0)
            cerr << "Usage: " << argv[0] << " matrix.txt vector.txt" << endl;
        MPI_Finalize();
        return 1;
    }

    string matrix_file = argv[1];
    string vector_file = argv[2];

    // Global matrix data
    int n = 0;
    vector<double> values;
    vector<int> row_ptr, col_idx;
    vector<double> b;

    // Only rank 0 reads the matrix and vector from disk
    if (rank == 0) {
        read_matrix(matrix_file, n, values, row_ptr, col_idx);
        read_vector(vector_file, n, b);
    }

    // Broadcast the matrix dimension
    MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);

    // Redundant storage: every process allocates the full-length vectors
    vector<double> x(n, 0.0);
    vector<double> r(n);
    vector<double> p(n);
    vector<double> Ap(n);
    vector<double> b_local(n);

    // Broadcast b (replicated on every rank)
    if (rank == 0) {
        for (int i = 0; i < n; i++) b_local[i] = b[i];
    }
    MPI_Bcast(b_local.data(), n, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    // Broadcast the CSR arrays (every process stores the full matrix)
    int nnz = 0;
    if (rank == 0) nnz = values.size();
    MPI_Bcast(&nnz, 1, MPI_INT, 0, MPI_COMM_WORLD);
    if (rank != 0) {
        values.resize(nnz);
        row_ptr.resize(n + 1);
        col_idx.resize(nnz);
    }
    MPI_Bcast(values.data(), nnz, MPI_DOUBLE, 0, MPI_COMM_WORLD);
    MPI_Bcast(row_ptr.data(), n + 1, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Bcast(col_idx.data(), nnz, MPI_INT, 0, MPI_COMM_WORLD);

    // Determine the row-block range of each process (row-wise partition for SpMV)
    int rows_per_proc = n / size;
    int remainder = n % size;
    int start_row = rank * rows_per_proc + min(rank, remainder);
    int end_row = start_row + rows_per_proc + (rank < remainder ? 1 : 0);

    // Prepare the Allgatherv parameters (used to gather the Ap pieces)
    vector<int> recv_counts(size), displs(size);
    int total = 0;
    for (int i = 0; i < size; i++) {
        int rows = rows_per_proc + (i < remainder ? 1 : 0);
        recv_counts[i] = rows;
        displs[i] = total;
        total += rows;
    }

    // Initialization: x = 0, r = b, p = b (done identically on every rank)
    for (int i = 0; i < n; i++) {
        x[i] = 0.0;
        r[i] = b_local[i];
        p[i] = b_local[i];
    }

    // Initial residual inner product. Note: because r is replicated, every rank
    // computes the full dot product, so the Allreduce sum scales it by `size`;
    // this common factor cancels in alpha and beta below.
    double rho0 = 0.0;
    for (int i = 0; i < n; i++) rho0 += r[i] * r[i];
    double global_rho0;
    MPI_Allreduce(&rho0, &global_rho0, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);

    double tol = 1e-8;
    int max_iter = 1000;
    if (sqrt(global_rho0) < tol) {
        if (rank == 0) cout << "Initial residual satisfies tolerance" << endl;
        MPI_Finalize();
        return 0;
    }

    double rho_old = global_rho0;
    int iter;
    bool converged = false;

    // Timing accumulators
    double total_compute_time = 0.0;
    double total_comm_time = 0.0;
    double start_time = MPI_Wtime();

    // Main conjugate gradient loop
    for (iter = 0; iter < max_iter; iter++) {
        // ========== Compute phase: SpMV ==========
        double compute_start = MPI_Wtime();

        // Each process computes its own slice of Ap (using the full p vector)
        vector<double> Ap_local(n, 0.0);
        for (int i = start_row; i < end_row; i++) {
            double sum = 0.0;
            for (int j = row_ptr[i]; j < row_ptr[i + 1]; j++) {
                sum += values[j] * p[col_idx[j]];
            }
            Ap_local[i] = sum;
        }

        double compute_end = MPI_Wtime();
        total_compute_time += (compute_end - compute_start);

        // ========== Communication phase: Allgatherv to assemble Ap ==========
        double comm_start = MPI_Wtime();

        MPI_Allgatherv(Ap_local.data() + start_row, end_row - start_row, MPI_DOUBLE,
                       Ap.data(), recv_counts.data(), displs.data(),
                       MPI_DOUBLE, MPI_COMM_WORLD);

        double comm_end = MPI_Wtime();
        total_comm_time += (comm_end - comm_start);

        // ========== Compute phase: dot product ==========
        compute_start = MPI_Wtime();

        double pAp_local = 0.0;
        for (int i = 0; i < n; i++) pAp_local += p[i] * Ap[i];

        compute_end = MPI_Wtime();
        total_compute_time += (compute_end - compute_start);

        // ========== Communication phase: Allreduce of pAp ==========
        comm_start = MPI_Wtime();

        double pAp_global;
        MPI_Allreduce(&pAp_local, &pAp_global, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);

        comm_end = MPI_Wtime();
        total_comm_time += (comm_end - comm_start);

        if (fabs(pAp_global) < 1e-15) {
            // Search direction has (numerically) zero curvature; check whether
            // the residual already satisfies the tolerance before giving up.
            double rho_check = 0.0;
            for (int i = 0; i < n; i++) rho_check += r[i] * r[i];
            double global_rho_check;
            MPI_Allreduce(&rho_check, &global_rho_check, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
            if (sqrt(global_rho_check) < tol) {
                converged = true;
            }
            break;
        }

        // Step length alpha
        double alpha = rho_old / pAp_global;

        // ========== Compute phase: vector updates ==========
        compute_start = MPI_Wtime();

        // Update x and r (every rank updates the full vectors, no communication)
        for (int i = 0; i < n; i++) {
            x[i] += alpha * p[i];
            r[i] -= alpha * Ap[i];
        }

        compute_end = MPI_Wtime();
        total_compute_time += (compute_end - compute_start);

        // ========== Compute phase: new residual inner product ==========
        compute_start = MPI_Wtime();

        double rho_new_local = 0.0;
        for (int i = 0; i < n; i++) rho_new_local += r[i] * r[i];

        compute_end = MPI_Wtime();
        total_compute_time += (compute_end - compute_start);

        // ========== Communication phase: Allreduce of the residual ==========
        comm_start = MPI_Wtime();

        double rho_new;
        MPI_Allreduce(&rho_new_local, &rho_new, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);

        comm_end = MPI_Wtime();
        total_comm_time += (comm_end - comm_start);

        // Convergence test
        if (sqrt(rho_new) < tol) {
            converged = true;
            if (rank == 0) cout << "Converged at iteration " << iter + 1 << endl;
            break;
        }

        // ========== Compute phase: update p ==========
        compute_start = MPI_Wtime();

        double beta = rho_new / rho_old;
        for (int i = 0; i < n; i++) {
            p[i] = r[i] + beta * p[i];
        }
        rho_old = rho_new;

        compute_end = MPI_Wtime();
        total_compute_time += (compute_end - compute_start);
    }

    double end_time = MPI_Wtime();
    double total_time = end_time - start_time;

    if (rank == 0) {
        // Verify the solution: recompute Ax and compare against b
        vector<double> Ax(n, 0.0);
        for (int i = 0; i < n; i++) {
            double sum = 0.0;
            for (int j = row_ptr[i]; j < row_ptr[i + 1]; j++) {
                sum += values[j] * x[col_idx[j]];
            }
            Ax[i] = sum;
        }

        double residual = 0.0;
        for (int i = 0; i < n; i++) {
            double diff = Ax[i] - b_local[i];
            residual += diff * diff;
        }
        residual = sqrt(residual);

        cout << "\n========== Results ==========" << endl;
        cout << "Iterations: " << iter + 1 << endl;
        cout << "Verification residual ||Ax - b||: " << residual << endl;
        cout << "Total time: " << total_time << " s" << endl;
        cout << "  - compute time: " << total_compute_time << " s ("
             << (total_compute_time/total_time*100) << "%)" << endl;
        cout << "  - communication time: " << total_comm_time << " s ("
             << (total_comm_time/total_time*100) << "%)" << endl;
        cout << "Per-iteration averages:" << endl;
        cout << "  - compute: " << total_compute_time/(iter+1) << " s" << endl;
        cout << "  - communication: " << total_comm_time/(iter+1) << " s" << endl;
        cout << "First 10 entries of the solution: ";
        for (int i = 0; i < min(10, n); i++) cout << x[i] << " ";
        cout << endl;
        cout << "==============================" << endl;
    }

    MPI_Finalize();
    return 0;
}
#!/bin/bash
#PBS -N sparse_mpi
#PBS -l nodes=2:ppn=4
#PBS -j oe

cd $PBS_O_WORKDIR

# Set up the OpenMPI paths
export PATH=/usr/lib64/openmpi/bin:$PATH
export LD_LIBRARY_PATH=/usr/lib64/openmpi/lib:$LD_LIBRARY_PATH

echo "Nodes: $(sort -u $PBS_NODEFILE | tr '\n' ' ')"
echo "Total processes: $(cat $PBS_NODEFILE | wc -l)"
echo "PPN: $(($(cat $PBS_NODEFILE | wc -l) / $(sort -u $PBS_NODEFILE | wc -l)))"

procs=$(cat $PBS_NODEFILE | wc -l)

mpirun -np $procs ./sparse.o matrix.txt vector.txt
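
The scaling runs described in the submission step below (one node with different ppn values, then two nodes) only require editing the resource-request line of this script before each qsub; for example, assuming the queue permits these node/ppn combinations:

#PBS -l nodes=1:ppn=1
#PBS -l nodes=1:ppn=4
#PBS -l nodes=1:ppn=8
#PBS -l nodes=1:ppn=16
#PBS -l nodes=2:ppn=8
#PBS -l nodes=2:ppn=16

Because procs is taken from $PBS_NODEFILE, mpirun automatically launches one process per allocated slot, so nothing else in the script needs to change.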
  4. Compile
module load mpi
mpic++ -o sparse.o sparse.cpp
  5. Script for generating the input data
# -*- coding: utf-8 -*-
import argparse
import os
import sys
import math

def generate_11diagonal(n):
    """
    Generate an 11-diagonal matrix (at most 11 nonzeros per row).
    Half-bandwidth 5; models a somewhat more complex physical problem.
    """
    values = []
    col_idx = []
    row_ptr = [0]

    for i in range(n):
        # 5th sub-diagonal (i-5)
        if i > 4:
            values.append(-0.1)
            col_idx.append(i - 5)

        # 4th sub-diagonal (i-4)
        if i > 3:
            values.append(-0.15)
            col_idx.append(i - 4)

        # 3rd sub-diagonal (i-3)
        if i > 2:
            values.append(-0.2)
            col_idx.append(i - 3)

        # 2nd sub-diagonal (i-2)
        if i > 1:
            values.append(-0.3)
            col_idx.append(i - 2)

        # 1st sub-diagonal (i-1)
        if i > 0:
            values.append(-0.5)
            col_idx.append(i - 1)

        # main diagonal
        values.append(6.0)
        col_idx.append(i)

        # 1st super-diagonal (i+1)
        if i < n - 1:
            values.append(-0.5)
            col_idx.append(i + 1)

        # 2nd super-diagonal (i+2)
        if i < n - 2:
            values.append(-0.3)
            col_idx.append(i + 2)

        # 3rd super-diagonal (i+3)
        if i < n - 3:
            values.append(-0.2)
            col_idx.append(i + 3)

        # 4th super-diagonal (i+4)
        if i < n - 4:
            values.append(-0.15)
            col_idx.append(i + 4)

        # 5th super-diagonal (i+5)
        if i < n - 5:
            values.append(-0.1)
            col_idx.append(i + 5)

        row_ptr.append(len(values))

    return values, col_idx, row_ptr

def generate_banded_11diag(n):
    """
    Generate a banded 11-diagonal matrix (11 nonzeros per row except near the boundary).
    """
    values = []
    col_idx = []
    row_ptr = [0]

    bandwidth = 5  # half-bandwidth 5, total bandwidth 11

    for i in range(n):
        for j in range(max(0, i - bandwidth), min(n, i + bandwidth + 1)):
            if i == j:
                values.append(10.0)
            else:
                dist = abs(i - j)
                val = -1.0 / (dist + 0.5)
                values.append(val)
            col_idx.append(j)
        row_ptr.append(len(values))

    return values, col_idx, row_ptr

def save_csr_matrix(n, values, col_idx, row_ptr, filename):
    nnz = len(values)
    with open(filename, 'w') as f:
        f.write("{}\n".format(n))
        f.write("{}\n".format(nnz))

        for i, ptr in enumerate(row_ptr):
            f.write(str(ptr))
            if i < n:
                f.write(" ")
        f.write("\n")

        for i, idx in enumerate(col_idx):
            f.write(str(idx))
            if i < nnz - 1:
                f.write(" ")
        f.write("\n")

        for i, val in enumerate(values):
            f.write("{:.10f}".format(val))
            if i < nnz - 1:
                f.write(" ")
        f.write("\n")

    print("Saved to {}, size: {}x{}, nnz: {}".format(filename, n, n, nnz))

def save_vector(b, filename):
    with open(filename, 'w') as f:
        for i, val in enumerate(b):
            f.write("{:.10f}".format(val))
            if i < len(b) - 1:
                f.write(" ")
        f.write("\n")
    print("Saved to {}, size: {}".format(filename, len(b)))

def main():
    parser = argparse.ArgumentParser(description='Generate sparse matrix with ~11 nonzeros per row')
    parser.add_argument('--size', '-n', type=int, default=500000,
                        help='Matrix dimension (default: 500000)')
    parser.add_argument('--type', '-t',
                        choices=['11diag', 'banded11'],
                        default='11diag',
                        help='Matrix type: 11diag (11-diagonal), banded11 (banded)')
    args = parser.parse_args()

    n = args.size

    print("=" * 60)
    print("Generating {} matrix: {}x{}".format(args.type, n, n))
    print("=" * 60)

    if args.type == '11diag':
        values, col_idx, row_ptr = generate_11diagonal(n)
    elif args.type == 'banded11':
        values, col_idx, row_ptr = generate_banded_11diag(n)
    else:
        print("Unknown type")
        return

    nnz = len(values)
    avg_per_row = nnz / float(n)
    print("nnz = {}".format(nnz))
    print("Average nonzeros per row: {:.1f}".format(avg_per_row))

    # Exact solution x = [1, 1, ..., 1]
    print("Generating exact solution x = [1,1,...,1]...")
    x_true = [1.0] * n

    # Compute b = A * x
    print("Computing RHS vector b (this may take a while)...")
    b = [0.0] * n
    for i in range(n):
        s = 0.0
        for j in range(row_ptr[i], row_ptr[i + 1]):
            s += values[j] * x_true[col_idx[j]]
        b[i] = s

        if n > 100000 and i % 50000 == 0 and i > 0:
            print("  Processed {}/{} rows...".format(i, n))

    # Save the output files (fixed names matrix.txt and vector.txt)
    print("Saving files...")
    save_csr_matrix(n, values, col_idx, row_ptr, "matrix.txt")
    save_vector(b, "vector.txt")

    # Report file sizes
    for f in ["matrix.txt", "vector.txt"]:
        if os.path.exists(f):
            size_mb = os.path.getsize(f) / (1024.0 * 1024.0)
            print("  {}: {:.2f} MB".format(f, size_mb))

    print("=" * 60)
    print("Done!")
    print("\nRun with:")
    print("  mpirun -np 4 ./sparse.o matrix.txt vector.txt")

if __name__ == "__main__":
    main()
# Generate a 500,000-dimension 11-diagonal matrix (writes matrix.txt and vector.txt)
python gen.py -n 500000 -t 11diag
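
For reference, the script writes matrix.txt in the layout read by read_matrix in sparse.cpp: line 1 is the dimension n, line 2 the number of nonzeros nnz, followed by one space-separated line each for row_ptr (n + 1 entries), col_idx, and values; vector.txt is a single space-separated line of n values. A hypothetical tiny example, the 11diag matrix for n = 3, would look like:

3
9
0 3 6 9
0 1 2 0 1 2 0 1 2
6.0000000000 -0.5000000000 -0.3000000000 -0.5000000000 6.0000000000 -0.5000000000 -0.3000000000 -0.5000000000 6.0000000000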
  6. Submit the job and record the results
qsub sparse.pbs
qstat
cat ****.o****  # check what the generated output file is actually named
  7. First test with nodes=1 and different ppn values (1, 4, 8, 16), then test with nodes=2
  8. Data to record: pure compute time, communication time, total time, speedup, and efficiency (see the formulas below)
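
The speedup and efficiency can be computed with the standard definitions (a common convention, not spelled out in the original notes): with $T_1$ the single-process time and $T_p$ the time on $p$ processes,

$S(p) = T_1 / T_p$,    $E(p) = S(p) / p$.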

Experiment Overview

  1. Experiment name and content
    1. Name: Solving sparse linear systems with multiple MPI processes
    2. Content: this experiment uses the MPI parallel programming model to implement the conjugate gradient (CG) method for solving a large-scale sparse linear system Ax = b.
  2. Experiment environment
    (CPU/GPU model and parameters, memory capacity and bandwidth, interconnect parameters, etc.)
    CPU: 2 × Intel(R) Xeon(R) Gold 5218
    Memory: 256 GB
    Disks: 3 × 600 GB in a RAID 5 array, about 1.2 TB usable capacity
    Operating system: CentOS 7
    Toolchain: GCC 4.8.5, OpenMPI 1.10.7
  3. Problem analysis
    1. The problem is a compute-intensive scientific computing task: sparse matrix-vector multiplication (SpMV), vector dot products, and vector updates.
    2. Candidate parallelization schemes:
      1. Row-block partitioning: the N matrix rows are split into contiguous blocks, one block per process. Data locality and cache hit rates are good, near-equal row counts keep the load balanced, and the implementation is simple with no complex communication; the drawback is that the dot products require a global reduction (a synchronization point).
      2. Cyclic partitioning: rows are assigned round-robin. Load balance is better, but data locality and cache hit rates suffer.
      3. Task pool: row blocks are handed out dynamically as workers finish. Load balances itself, but the implementation is more complex and task assignment adds overhead.
    3. This experiment uses row-block partitioning (see the sketch below).
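
As referenced above, a minimal standalone sketch of the row-block split formula used in sparse.cpp (the values n = 10 and 4 ranks are hypothetical, chosen only for illustration):

#include <algorithm>
#include <iostream>

// Contiguous row-block partition of n rows over `size` ranks: the first
// n % size ranks get one extra row, so block sizes differ by at most 1.
void block_range(int n, int size, int rank, int& start_row, int& end_row) {
    int rows_per_proc = n / size;
    int remainder = n % size;
    start_row = rank * rows_per_proc + std::min(rank, remainder);
    end_row = start_row + rows_per_proc + (rank < remainder ? 1 : 0);
}

int main() {
    // Example: n = 10 rows over 4 ranks -> [0,3) [3,6) [6,8) [8,10)
    int n = 10, size = 4;
    for (int rank = 0; rank < size; rank++) {
        int s, e;
        block_range(n, size, rank, s, e);
        std::cout << "rank " << rank << ": rows [" << s << ", " << e << ")\n";
    }
    return 0;
}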
  4. Design
    1. Overall approach: the solver parallelizes the conjugate gradient method with MPI. Rank 0 parses the command-line arguments (matrix file and vector file), reads the CSR sparse matrix A and the right-hand side b, and broadcasts the dimension, the CSR arrays, and b to all processes, so every rank holds a full (redundant) copy of the data; each rank then initializes x = 0, r = b, p = b. The N rows are split into contiguous blocks, one block per process. In each iteration, every process computes the SpMV Ap = A*p for its own row block, the pieces are assembled into the full Ap with MPI_Allgatherv, and the inner products r·r and p·Ap are made globally consistent with MPI_Allreduce so that all ranks use the same alpha and beta; each rank then updates the full x, r, and p vectors and checks whether the residual norm has dropped below the tolerance. Finally, rank 0 verifies the solution and reports the timing results.
    2. Flowchart
    3. Pseudocode (see the CG recurrences below)
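
In place of the flowchart and pseudocode placeholders, a compact summary of the iteration actually implemented in sparse.cpp: starting from $x_0 = 0$, $r_0 = p_0 = b$, each iteration computes

$\alpha_k = \dfrac{r_k^T r_k}{p_k^T A p_k}$,  $x_{k+1} = x_k + \alpha_k p_k$,  $r_{k+1} = r_k - \alpha_k A p_k$,  $\beta_k = \dfrac{r_{k+1}^T r_{k+1}}{r_k^T r_k}$,  $p_{k+1} = r_{k+1} + \beta_k p_k$,

stopping when the reduced residual norm falls below tol = 1e-8 or after max_iter = 1000 iterations. Only the SpMV $A p_k$ is distributed over the row blocks (assembled with MPI_Allgatherv); the inner products are made consistent across ranks with MPI_Allreduce.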
  5. Implementation
    1. sparse.cpp
    2. sparse.pbs
  6. Results and analysis
    1. Data
    2. Theoretical performance analysis: each CG iteration is dominated by the sparse matrix-vector product (SpMV, roughly 85% of the work), with vector dot products (about 10%) and vector updates (about 5%) making up the rest. By Amdahl's law (formula below), with a parallel fraction of about 98% the ideal speedup is 3.88 on 4 processes, 7.27 on 8 processes, and 13.91 on 16 processes. The achievable speedup is further limited by parallel overhead: synchronization, communication latency, and load imbalance. Within a single node, communication goes through shared memory with very low latency (nanoseconds) and high bandwidth (roughly 50 GB/s); between the two nodes it goes over Gigabit Ethernet with high latency (about 50 microseconds) and low bandwidth (on the order of 1 Gb/s), so cross-node communication is far more expensive than intra-node communication.
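
For reference, the Amdahl's law formula invoked above: with parallel fraction $f$ and $p$ processes, the ideal speedup is

$S_{\text{ideal}}(p) = \dfrac{1}{(1 - f) + f/p}$,

which is bounded above by $1/(1 - f)$ no matter how many processes are used.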
    3. Experimental results: Single node: as the process count grows from 1 to 16, the solve time drops from 0.855 s to 0.310 s, a 2.76x speedup. The communication share rises from 2.0% to 12.2% but stays low, confirming that shared-memory communication is cheap. The speedup is 2.18 with 4 processes, 2.51 with 8, and 2.76 with 16; the marginal gain shrinks, so 4-8 processes are recommended. Two nodes: as the process count grows from 2 to 32, the total time increases from 7.619 s to 13.527 s, i.e. the runs get slower. Communication accounts for 91.7%-97.4% of the time and every speedup is below 1 (only 0.56 at 32 processes); cross-node communication completely dominates the computation. Comparison: 16 processes on one node take only 0.310 s, while 16 processes across two nodes take 10.5 s, about 33x slower; 4 processes on one node take 0.392 s, while 8 processes across two nodes take 9.7 s, about 24x slower. Cross-node latency and bandwidth limits therefore prevent the CG solver from obtaining any speedup. Conclusion: for this million-dimension sparse linear system, shared-memory parallelism on a single node is effective (2.76x speedup with 16 processes), while parallelism across two nodes fails because communication overhead dominates.
  7. Personal summary