实验操作

  1. 登录集群
1
2
ssh -p 9922 用户名@172.28.9.54
cd data
  1. 创建作业文件
1
2
cat > sparse.cpp
cat > sparse.pbs
  1. 分别粘贴作业代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
#include <iostream>
#include <fstream>
#include <cmath>
#include <pthread.h>
#include <cstring>
#include <cstdlib>
#include <iomanip>
#include <sys/time.h>

using namespace std;

// CSR 格式稀疏矩阵
double *values;
int *col_idx;
int *row_ptr;
int n;
int nnz;

// 向量
double *b;
double *x;
double *r;
double *p;
double *Ap;

// 迭代控制参数
int max_iter = 10000;
double tol = 1e-6;

// 多线程相关
int num_threads;
pthread_t *threads;
pthread_barrier_t barrier;

// 线程局部数据
double *local_rho_array;
double *local_pAp_array;

// 行划分
int *start_row;
int *end_row;

// 全局迭代变量
double rho;
double rho_old;
double alpha;
double beta;
int iter_count;
bool converged;

// 时间统计
double total_time;
double spmv_time;
double dot_time;
double update_time;

void read_matrix(const char *mat_file, const char *vec_file);
void init_vectors();
void partition_rows();
void* cg_thread_work(void *arg);
void print_results();
void free_memory();

void read_matrix(const char *mat_file, const char *vec_file) {
ifstream fin(mat_file);
if (!fin.is_open()) {
cerr << "Error: Cannot open matrix file: " << mat_file << endl;
exit(1);
}

fin >> n >> nnz;
values = new double[nnz];
col_idx = new int[nnz];
row_ptr = new int[n + 1];

for (int i = 0; i < nnz; i++) fin >> values[i];
for (int i = 0; i < nnz; i++) fin >> col_idx[i];
for (int i = 0; i <= n; i++) fin >> row_ptr[i];
fin.close();

ifstream fvec(vec_file);
if (!fvec.is_open()) {
cerr << "Error: Cannot open vector file: " << vec_file << endl;
exit(1);
}

b = new double[n];
for (int i = 0; i < n; i++) fvec >> b[i];
fvec.close();
}

void init_vectors() {
x = new double[n]();
r = new double[n];
p = new double[n];
Ap = new double[n];

for (int i = 0; i < n; i++) {
r[i] = b[i];
p[i] = b[i];
x[i] = 0.0;
}

rho = 0.0;
for (int i = 0; i < n; i++) rho += r[i] * r[i];

cout << "初始残差范数: " << sqrt(rho) << endl;

if (sqrt(rho) < tol) {
converged = true;
} else {
converged = false;
}

iter_count = 0;
}

void partition_rows() {
start_row = new int[num_threads];
end_row = new int[num_threads];

int rows_per_thread = n / num_threads;
int remainder = n % num_threads;
int current = 0;

for (int i = 0; i < num_threads; i++) {
start_row[i] = current;
current += rows_per_thread + (i < remainder ? 1 : 0);
end_row[i] = current;
}
}

void* cg_thread_work(void *arg) {
int tid = *(int*)arg;
int start = start_row[tid];
int end = end_row[tid];

struct timeval tv_start, tv_end;

while (!converged && iter_count < max_iter) {
// 1. 所有线程并行计算 SpMV: Ap = A * p
for (int i = start; i < end; i++) {
double sum = 0.0;
for (int j = row_ptr[i]; j < row_ptr[i + 1]; j++) {
sum += values[j] * p[col_idx[j]];
}
Ap[i] = sum;
}
pthread_barrier_wait(&barrier);

// 2. 并行计算点积
double local_pAp_val = 0.0;
double local_rho_val = 0.0;

for (int i = start; i < end; i++) {
local_pAp_val += p[i] * Ap[i];
local_rho_val += r[i] * r[i];
}
// 存储局部结果
local_pAp_array[tid] = local_pAp_val;
local_rho_array[tid] = local_rho_val;
pthread_barrier_wait(&barrier);

// 主线程汇总并计算参数
if (tid == 0) {
double pap = 0.0;
double rho_new = 0.0;

for (int i = 0; i < num_threads; i++) {
pap += local_pAp_array[i];
rho_new += local_rho_array[i];
}
// 数值稳定性检查
if (pap <= 0) {
pap = 1e-15;
}
// 保存旧残差内积
rho_old = rho;
rho = rho_new;
// 计算步长 alpha = rho / (p·Ap)
alpha = rho / pap;
// 检查收敛
double residual_norm = sqrt(rho);
if (residual_norm < tol) {
converged = true;
}
// 计算 beta
if (rho_old > 0 && !converged) {
beta = rho / rho_old;
} else {
beta = 0.0;
}
iter_count++;
}
pthread_barrier_wait(&barrier);

if (converged) break;

// 3. 并行更新 x 和 r
for (int i = start; i < end; i++) {
x[i] += alpha * p[i];
r[i] -= alpha * Ap[i];
}
pthread_barrier_wait(&barrier);

// 4. 并行更新 p = r + beta * p
for (int i = start; i < end; i++) {
p[i] = r[i] + beta * p[i];
}

pthread_barrier_wait(&barrier);
}

return NULL;
}

// 打印结果
void print_results() {
cout << "迭代" << iter_count << " 次后达到收敛" << endl;

double final_residual = sqrt(rho);
if (!isnan(final_residual) && !isinf(final_residual)) {
cout << "最终残差范数: " << final_residual << endl;
} else {
cout << "Final residual norm: Numerical issue detected" << endl;
}

// 打印前10个解
cout << "解向量(前10个):" << endl;
int print_n = (n < 10) ? n : 10;
for (int i = 0; i < print_n; i++) {
if (!isnan(x[i]) && !isinf(x[i])) {
cout << "x[" << i << "] = " << scientific << setprecision(8) << x[i] << endl;
}
}

// 打印性能统计
cout << "Total time: " << total_time << " s" << endl;
if (spmv_time > 0) {
cout << "SpMV time: " << spmv_time << " seconds ("
<< (spmv_time / total_time * 100) << "%)" << endl;
}
if (dot_time > 0) {
cout << "Dot product time: " << dot_time << " seconds ("
<< (dot_time / total_time * 100) << "%)" << endl;
}
if (update_time > 0) {
cout << "Vector update time: " << update_time << " seconds ("
<< (update_time / total_time * 100) << "%)" << endl;
}
}

void free_memory() {
delete[] values;
delete[] col_idx;
delete[] row_ptr;
delete[] b;
delete[] x;
delete[] r;
delete[] p;
delete[] Ap;
delete[] start_row;
delete[] end_row;
delete[] threads;
delete[] local_rho_array;
delete[] local_pAp_array;
}

int main(int argc, char *argv[]) {
if (argc != 4) {
cerr << "Usage: " << argv[0] << " matrix.txt vector.txt num_threads" << endl;
return 1;
}

const char *mat_file = argv[1];
const char *vec_file = argv[2];
num_threads = atoi(argv[3]);

if (num_threads <= 0) {
cerr << "Error: num_threads must be positive" << endl;
return 1;
}
cout << "线程数: " << num_threads << endl;

read_matrix(mat_file, vec_file);
init_vectors();
partition_rows();

// 分配线程相关内存
threads = new pthread_t[num_threads];
local_rho_array = new double[num_threads];
local_pAp_array = new double[num_threads];

// 初始化屏障
pthread_barrier_init(&barrier, NULL, num_threads);

// 初始化时间统计
total_time = 0.0;
spmv_time = 0.0;
dot_time = 0.0;
update_time = 0.0;

// 创建线程
int *tids = new int[num_threads];
struct timeval tv_start, tv_end;
gettimeofday(&tv_start, NULL);

for (int i = 0; i < num_threads; i++) {
tids[i] = i;
if (pthread_create(&threads[i], NULL, cg_thread_work, &tids[i]) != 0) {
cerr << "Error: Failed to create thread " << i << endl;
return 1;
}
}

// 等待所有线程完成
for (int i = 0; i < num_threads; i++) {
pthread_join(threads[i], NULL);
}

gettimeofday(&tv_end, NULL);
total_time = (tv_end.tv_sec - tv_start.tv_sec) +
(tv_end.tv_usec - tv_start.tv_usec) / 1000000.0;

// 打印结果
print_results();

// 清理资源
free_memory();
delete[] tids;
pthread_barrier_destroy(&barrier);

return 0;
}
1
2
3
4
5
6
7
8
#! /bin/bash
#PBS -N sparse
#PBS -l nodes=1:ppn=1
#PBS -j oe

cd $PBS_O_WORKDIR
procs=$(cat $PBS_NODEFILE | wc -l)
./sparse.o matrix.txt vector.txt $procs &> run.log
  1. 编译
1
g++ -pthread -o sparse.o sparse.cpp
  1. 准备输入指导书的数据文件
1
2
3
4
5
6
cat > matrix.txt << 'EOF'
3 7
3.0 1.0 1.0 4.0 1.0 1.0 5.0
0 1 0 1 2 1 2
0 2 5 7
EOF
1
2
3
cat > vector.txt << 'EOF'
4.0 6.0 6.0
EOF
  1. 提交至作业集群并查看(这次sparse.o*为空)
1
2
3
qsub sparse.pbs
qstat
cat run.log

记录迭代次数和计算时间

  1. 更改ppn值为4 8 16,重复步骤7并记录(每线程测一次即可)

    不更改nodes值,因为此实验为多线程并行计算
    发现如下,很明显不符合并行计算期望的性能提升
    ppn=1 ?次 ?s
    ppn=4
    ppn=8
    ppn=16

  2. 可能是数据量太小,于是乎用脚本生成数据量更大的样例(200000)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
cat > gen_matrix.py << 'EOF'
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
import argparse
from scipy.sparse import diags, csr_matrix
import os

def generate_tridiagonal(n):
print(f"Generating {n}x{n} tridiagonal matrix...")
main_diag = 4.0 * np.ones(n)
off_diag = -1.0 * np.ones(n - 1)
A = diags([main_diag, off_diag, off_diag], [0, -1, 1], format='csr')
return A

def save_csr_matrix(A, filename):
n = A.shape[0]
nnz = A.nnz
values = A.data
col_idx = A.indices
row_ptr = A.indptr

with open(filename, 'w') as f:
f.write(f"{n} {nnz}\n")

for i, v in enumerate(values):
f.write(f"{v:.10f}")
if i < nnz - 1:
f.write(" ")
f.write("\n")

for i, idx in enumerate(col_idx):
f.write(f"{idx}")
if i < nnz - 1:
f.write(" ")
f.write("\n")

for i, ptr in enumerate(row_ptr):
f.write(f"{ptr}")
if i < n:
f.write(" ")
f.write("\n")

print(f"Saved to {filename}, size: {n}x{n}, nnz: {nnz}")

def save_vector(b, filename):
with open(filename, 'w') as f:
for i, val in enumerate(b):
f.write(f"{val:.10f}")
if i < len(b) - 1:
f.write(" ")
f.write("\n")
print(f"Saved to {filename}, size: {len(b)}")

def main():
parser = argparse.ArgumentParser()
parser.add_argument('--size', '-n', type=int, default=1000)
parser.add_argument('--output', '-o', default='matrix')
args = parser.parse_args()

print("="*60)
print(f"Generating {args.size}x{args.size} matrix")
print("="*60)

A = generate_tridiagonal(args.size)

x_true = np.ones(args.size)
b = A.dot(x_true)

save_csr_matrix(A, f"{args.output}.txt")
save_vector(b, f"{args.output}_b.txt")

for f in [f"{args.output}.txt", f"{args.output}_b.txt"]:
if os.path.exists(f):
size_mb = os.path.getsize(f) / (1024*1024)
print(f" {f}: {size_mb:.2f} MB")

print("="*60)
print("Done!")

if __name__ == "__main__":
main()
EOF

python gen_matrix.py --size 200000 --output matrix # 运行脚本
mv matrix_b.txt vector.txt
  1. 提交至作业集群并查看(这次sparse.o*为空)
1
2
3
qsub sparse.pbs
qstat
cat run.log

记录迭代次数和计算时间,重复10次取平均值

  1. 更改ppn值为4 8 16,重复步骤10

实验概述

  1. 实验名称与内容
    多线程求解稀疏线性方程组
    本实验要求使Pthread多线程并行技术,实现共轭梯度法求解对称正定稀疏线性方程组 Ax = b
    原理:(自己粘图)
  2. 实验环境的配置参数:
    CPU:2Intel(R) Xeon(R) Gold 5218
    内存:256G
    硬盘:3
    600G,使用RAID5磁盘阵列技术,可用容量为1.2T
    操作系统:CentOS7
    编译环境:GCC4.8.5、OpenMPI 1.10.7
  3. 实验题目问题分析
    1. 科学计算密集型问题,主要计算包括稀疏矩阵向量乘(SpMV)、向量点积和向量更新,均具有数据并行特征,可采用数据划分并行策略
    2. 采用行块划分,将矩阵的N行连续划分给t个线程,每个线程处理连续的行块
  4. 方案设计
    1. 大概思路:
      1. 输入矩阵文件、向量文件和线程数,主线程解析并存储参数
      2. 主线程读取CSR格式的稀疏矩阵A和右端向量b,初始化解向量x=0、残差r=b、搜索方向p=b,
      3. 将矩阵的N行均匀分配给t个线程,计算每个线程的起始行和结束行
      4. 创建t个子线程,每个线程获得自己的行块范围
      5. 子线程进行局部计算(各线程并行计算自己行块的SpMV、局部点积,并行更新自己范围的x、r、p向量)
      6. 主线程收集各线程的局部点积,计算全局α和β(通过屏障同步,确保所有线程使用相同的 α 和 β 进行更新)
      7. 主线程检查残差是否小于容差,若未收敛则继续迭代
      8. 主线程输出迭代次数、最终残差、解向量和运行时间
    2. 流程图
    3. 伪代码
  5. 实现方法:
    1. sparse.cpp(粘)
    2. sparse.pbs(粘):
      1. 申请1个节点,每个节点16个CPU核心
      2. 标准输出和错误输出合并到同一个文件
      3. 切换到提交作业时的目录
      4. 计算分配的CPU核心数
      5. 运行程序,将输出重定向到run.log
  6. 结果分析
    1. 数据(run.log截图、time、迭代次数、加速比以及加速比曲线图)
    2. 理论性能分析:本实验中,可并行部分(SpMV、点积、向量更新)占总计算量的约95%,不可并行部分(同步、汇总)约占5%。根据阿姆达尔定律:(根据实际结果自己计算)
      4线程:理论加速比 倍,效率
      8线程:理论加速比 倍,效率
      16线程:理论加速比 倍,效率
    3. 实验结果分析:
      1. 3*3小矩阵:4线程加速比仅为 ,8线程降至 ,16线程更是低至 。加速比远小于1,且随线程数增加而急剧下降。可能原因:问题规模太小,计算量远小于并行开销
      2. 20w阶大矩阵:4线程达到 倍,8线程达到 倍,16线程达到 倍。加速比随线程数增加而持续提升,说明计算量足够大时,并行能够带来明显的性能收益
      3. 小矩阵的并行效率几乎为零,因为计算时间远小于开销。大矩阵的并行效率虽然随线程数增加而递减,但整体仍然有效,说明存在一个临界规模,只有当问题规模超过该临界值时,并行才能带来正收益。
  7. 个人总结(自己用ai跑一下)