-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathsimple_read.cpp
More file actions
175 lines (142 loc) · 5.96 KB
/
Copy pathsimple_read.cpp
File metadata and controls
175 lines (142 loc) · 5.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
//
// MPI / InfiniBand Verbs simple read demo
//
// Run on Sampa cluster with command like:
// make && srun --label --nodes=2 --ntasks-per-node=3 ./simple_read
//
#include "MPIConnection.hpp"
#include "Verbs.hpp"
#include "SymmetricMemoryRegion.hpp"
#include "SymmetricAllocator.hpp"
#include <cstring>
#include <sys/types.h>
#include <unistd.h>
/// Entry point for the RDMA-read demo.
///
/// Every rank fills a symmetric array so that slot n holds n * (its own rank),
/// then each rank issues one RDMA read per peer, fetching the slot indexed by
/// its own rank from that peer. The value read from rank m by rank n must
/// therefore be n * m. Per-rank results are AND-reduced onto rank 0, which
/// prints PASS or FAIL.
///
/// @param argc argument count, forwarded to MPIConnection
/// @param argv argument vector, forwarded to MPIConnection
/// @return always 0; a failed check is reported on stdout only
int main( int argc, char * argv[] ) {
  // set up MPI communication between all processes in job
  MPIConnection mpi( &argc, &argv );

  // set up IBVerbs queue pairs between all processes in job
  Verbs verbs( mpi );

  // set up symmetric allocator
  SymmetricAllocator allocator( mpi );

#ifdef VERBOSE
  std::cout << "hostname " << mpi.hostname()
            << " MPI rank " << mpi.rank
            << " ranks " << mpi.ranks
            << " locale " << mpi.locale
            << " locales " << mpi.locales
            << " locale rank " << mpi.locale_rank
            << " locale ranks " << mpi.locale_size
            << " pid " << getpid()
            << "\n";
#endif

  //
  // verify reads are working by reading a value from an array on
  // every rank. Rank n will read the product n*m from the nth
  // location of an array on rank m.
  //

  // create space to store data for remote ranks
  //
  // There are at least three options for doing this. I'll illustrate
  // two here.
  //
  // First, we can allocate the data statically with a fixed size so
  // that it's at the same base address on every core, like this:
  // static int64_t remote_rank_data[ 1 << 20 ]; // 2^20 endpoints should be enough. :-)
  // SymmetricMemoryRegion source_mr( verbs, &remote_rank_data[0], sizeof(remote_rank_data) ); // register allocated memory

  // Second, we can use my symmetric allocator code from Grappa to
  // dynamically allocate space at the same address on all cores.
  int64_t * remote_rank_data = allocator.alloc< int64_t >( mpi.size );
  SymmetricMemoryRegion source_mr( verbs, &remote_rank_data[0], sizeof(int64_t) * mpi.size ); // register allocated memory

  // The third option would be to allocate memory using normal
  // mechanisms on all cores, without caring what addresses the blocks
  // were allocated at, and then communicate each base address to
  // all other cores so that they can form a proper virtual address
  // for whatever core they're accessing. This gets a little ugly, so
  // I'm not doing it here.

  // initialize array: slot i holds i * (this rank)
  for( int64_t i = 0; i < mpi.size; ++i ) {
    remote_rank_data[i] = i * mpi.rank;
  }

  // ensure initialization is done before anybody reads anything
  mpi.barrier();

#ifdef VERBOSE
  std::cout << "Base address of remote_rank_data is " << &remote_rank_data[0] << std::endl;
#endif

  // create storage for destination data
  int64_t my_data;

  // we can use the SymmetricMemoryRegion wrapper since all the cores
  // are executing this right now; since this is for local access
  // only, we'll just ignore the rkeys it exchanges.
  //SymmetricMemoryRegion dest_mr( verbs, &my_data, sizeof(my_data) );

  // alternatively, you could ignore my SymmetricMemoryRegion wrapper
  // and call the Verbs memory region registration wrapper instead
  ibv_mr * dest_mr_p = verbs.register_memory_region( &my_data, sizeof(my_data) );

  // assume that we'll get all values correctly; check as we receive them in the loop below.
  bool pass = true;

  // read our slot from each remote rank's array, one rank at a time
  for( int i = 0; i < mpi.size; ++i ) {
    // reset destination location to wrong value to detect error
    my_data = -1;

    // point scatter/gather element at destination data
    ibv_sge sge;
    std::memset( &sge, 0, sizeof(sge) );

    // code that goes with SymmetricMemoryRegion above
    //sge.addr = (uintptr_t) dest_mr.base();
    //sge.length = dest_mr.size();
    //sge.lkey = dest_mr.lkey();

    // code that goes with ibv_mr allocation instead of SymmetricMemoryRegion above
    sge.addr = reinterpret_cast< uintptr_t >( &my_data );
    sge.length = sizeof(my_data);
    sge.lkey = dest_mr_p->lkey; // local memory region key

    // create work request for RDMA read
    ibv_send_wr wr;
    std::memset( &wr, 0, sizeof(wr) );
    wr.wr_id = i; // unused here
    wr.next = nullptr; // only one send WR in this linked list
    wr.sg_list = &sge;
    wr.num_sge = 1;
    wr.imm_data = 0; // unused here
    wr.opcode = IBV_WR_RDMA_READ;
    wr.send_flags = IBV_SEND_SIGNALED; // create completion queue entry once this operation has completed
    // The array is allocated symmetrically, so the local address of our
    // slot is used as the address of the same slot on rank i.
    wr.wr.rdma.remote_addr = reinterpret_cast< uintptr_t >( &remote_rank_data[ mpi.rank ] ); // read from this rank's slot of remote array
    wr.wr.rdma.rkey = source_mr.rkey( i );

    // hand WR to library/card to send
    verbs.post_send( i, &wr );

    // wait until WR is complete before continuing.
    //
    // If you don't want to wait, you must ensure that 1) the local
    // destination buffer is not touched until the WR has completed,
    // and 2) you don't post WRs too fast for the card.
    while( !verbs.poll() ) {
      ; // poll until we get a completion queue entry
    }

    // check that we read the right value
    const int64_t expected_value = i * mpi.rank;
    if( expected_value != my_data ) {
      pass = false;
      // report the value actually read (my_data), not the local source
      // array, which always holds the expected value at index i
      std::cout << "Rank " << mpi.rank
                << " got bad data from rank " << i
                << ": expected " << expected_value
                << ", got " << my_data
                << std::endl;
    }
  }

  // wait for everyone to finish all reads
  mpi.barrier();

  // Use MPI reduction operation to AND together all ranks' "pass" value.
  // NOTE(review): MPI_C_BOOL is the datatype for C's _Bool; MPI_CXX_BOOL is
  // the one defined for C++ bool -- confirm which the target MPI provides.
  bool overall_pass = false;
  MPI_CHECK( MPI_Reduce( &pass, &overall_pass, 1, MPI_C_BOOL,
                         MPI_LAND, // logical and
                         0,        // destination rank
                         mpi.main_communicator_ ) );

  // have one rank check the reduced value
  if( 0 == mpi.rank ) {
    if( overall_pass ) {
      std::cout << "PASS: All ranks received correct data." << std::endl;
    } else {
      std::cout << "FAIL: Some rank(s) received incorrect data!" << std::endl;
    }
  }

  mpi.finalize();

  return 0;
}