Tpetra parallel linear algebra Version of the Day
Loading...
Searching...
No Matches
Tpetra_Details_Ialltofewv.cpp
1// @HEADER
2// *****************************************************************************
3// Tpetra: Templated Linear Algebra Services Package
4//
5// Copyright 2008 NTESS and the Tpetra contributors.
6// SPDX-License-Identifier: BSD-3-Clause
7// *****************************************************************************
8// @HEADER
9
10#include "Tpetra_Details_Ialltofewv.hpp"
11
12#include <algorithm>
13#include <cmath>
14#include <vector>
15
16#include <mpi.h>
17#include <Kokkos_Core.hpp>
18
19#ifndef NDEBUG
20#include <iostream>
21#include <sstream>
22#endif
23
24namespace {
25
26struct ProfilingRegion {
27 ProfilingRegion() = delete;
28 ProfilingRegion(const ProfilingRegion &other) = delete;
29 ProfilingRegion(ProfilingRegion &&other) = delete;
30
31 ProfilingRegion(const std::string &name) {
32 Kokkos::Profiling::pushRegion(name);
33 }
34 ~ProfilingRegion() {
35 Kokkos::Profiling::popRegion();
36 }
37};
38
39struct MemcpyArg {
40 void *dst;
41 void *src;
42 size_t count;
43};
44
45template <typename T>
46KOKKOS_INLINE_FUNCTION bool is_compatible(const MemcpyArg &arg) {
47 return (0 == (uintptr_t(arg.dst) % sizeof(T))) && (0 == (uintptr_t(arg.src) & sizeof(T))) && (0 == (arg.count % sizeof(T)));
48}
49
50template <typename T, typename Member>
51KOKKOS_INLINE_FUNCTION void team_memcpy_as(const Member &member, void *dst, void *const src, size_t count) {
52 Kokkos::parallel_for(
53 Kokkos::TeamThreadRange(member, count),
54 [&](size_t i) {
55 reinterpret_cast<T *>(dst)[i] = reinterpret_cast<T const *>(src)[i];
56 });
57}
58
59template <typename Member>
60KOKKOS_INLINE_FUNCTION void team_memcpy(const Member &member, MemcpyArg &arg) {
61 if (is_compatible<uint64_t>(arg)) {
62 team_memcpy_as<uint64_t>(member, arg.dst, arg.src, arg.count / sizeof(uint64_t));
63 } else if (is_compatible<uint32_t>(arg)) {
64 team_memcpy_as<uint32_t>(member, arg.dst, arg.src, arg.count / sizeof(uint32_t));
65 } else {
66 team_memcpy_as<uint8_t>(member, arg.dst, arg.src, arg.count);
67 }
68}
69
70} // namespace
71
72namespace Tpetra::Details {
73
74struct Ialltofewv::Cache::impl {
75 impl()
76 : rootBufDev("rootBufDev", 0)
77 , rootBufHost("rootBufHost", 0)
78 , aggBufDev("aggBufDev", 0)
79 , aggBufHost("rootBufHost", 0)
80 , argsDev("argsDev", 0)
81 , argsHost("argsHost", 0)
82 , rootBufGets_(0)
83 , rootBufHits_(0)
84 , aggBufGets_(0)
85 , aggBufHits_(0)
86 , argsGets_(0)
87 , argsHits_(0)
88 , rootBufDevSize_(0)
89 , aggBufDevSize_(0)
90 , argsDevSize_(0)
91 , argsHostSize_(0)
92 , rootBufHostSize_(0)
93 , aggBufHostSize_(0) {}
94
95 // cached views
96 Kokkos::View<uint8_t *, typename Kokkos::DefaultExecutionSpace::memory_space> rootBufDev;
97 Kokkos::View<uint8_t *, typename Kokkos::DefaultHostExecutionSpace::memory_space> rootBufHost;
98 Kokkos::View<char *, typename Kokkos::DefaultExecutionSpace::memory_space> aggBufDev;
99 Kokkos::View<char *, typename Kokkos::DefaultHostExecutionSpace::memory_space> aggBufHost;
100 Kokkos::View<MemcpyArg *, typename Kokkos::DefaultExecutionSpace::memory_space> argsDev;
101 Kokkos::View<MemcpyArg *, typename Kokkos::DefaultHostExecutionSpace::memory_space> argsHost;
102
103 size_t rootBufGets_;
104 size_t rootBufHits_;
105 size_t aggBufGets_;
106 size_t aggBufHits_;
107 size_t argsGets_;
108 size_t argsHits_;
109
110 size_t rootBufDevSize_, aggBufDevSize_;
111 size_t argsDevSize_, argsHostSize_;
112 size_t rootBufHostSize_, aggBufHostSize_;
113
114 template <typename ExecSpace>
115 auto get_rootBuf(size_t size) {
116 ++rootBufGets_;
117 if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
118 if (rootBufDev.extent(0) < size) {
119 Kokkos::resize(Kokkos::WithoutInitializing, rootBufDev, size);
120 rootBufDevSize_ = size;
121 } else {
122 ++rootBufHits_;
123 }
124 return Kokkos::subview(rootBufDev, Kokkos::pair{size_t(0), size});
125 } else {
126 if (rootBufHost.extent(0) < size) {
127 Kokkos::resize(Kokkos::WithoutInitializing, rootBufHost, size);
128 rootBufHostSize_ = size;
129 } else {
130 ++rootBufHits_;
131 }
132 return Kokkos::subview(rootBufHost, Kokkos::pair{size_t(0), size});
133 }
134 }
135
136 template <typename ExecSpace>
137 auto get_aggBuf(size_t size) {
138 ++aggBufGets_;
139 if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
140 if (aggBufDev.extent(0) < size) {
141 Kokkos::resize(Kokkos::WithoutInitializing, aggBufDev, size);
142 aggBufHostSize_ = size;
143 } else {
144 ++aggBufHits_;
145 }
146 return Kokkos::subview(aggBufDev, Kokkos::pair{size_t(0), size});
147 } else {
148 if (aggBufHost.extent(0) < size) {
149 Kokkos::resize(Kokkos::WithoutInitializing, aggBufHost, size);
150 aggBufHostSize_ = size;
151 } else {
152 ++aggBufHits_;
153 }
154 return Kokkos::subview(aggBufHost, Kokkos::pair{size_t(0), size});
155 }
156 }
157
158 template <typename ExecSpace>
159 auto get_args(size_t size) {
160 ++argsGets_;
161 if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
162 if (argsDev.extent(0) < size) {
163 Kokkos::resize(Kokkos::WithoutInitializing, argsDev, size);
164 argsHostSize_ = size;
165 } else {
166 ++argsHits_;
167 }
168 return Kokkos::subview(argsDev, Kokkos::pair{size_t(0), size});
169 } else {
170 if (argsHost.extent(0) < size) {
171 Kokkos::resize(Kokkos::WithoutInitializing, argsHost, size);
172 argsHostSize_ = size;
173 } else {
174 ++argsHits_;
175 }
176 return Kokkos::subview(argsHost, Kokkos::pair{size_t(0), size});
177 }
178 }
179};
180
181Ialltofewv::Cache::Cache() = default;
182Ialltofewv::Cache::~Cache() = default;
183
184namespace {
185template <typename RecvExecSpace>
186int wait_impl(Ialltofewv::Req &req, Ialltofewv::Cache &cache) {
187 auto finalize = [&]() -> int {
188 req.completed = true;
189 return MPI_SUCCESS;
190 };
191
192 if (0 == req.nroots) {
193 return finalize();
194 }
195
196 ProfilingRegion pr("alltofewv::wait");
197
198 // lazy-init view cache
199 if (!cache.pimpl) {
200 cache.pimpl = std::make_shared<Ialltofewv::Cache::impl>();
201 }
202
203 const int rank = [&]() -> int {
204 int _rank;
205 MPI_Comm_rank(req.comm, &_rank);
206 return _rank;
207 }();
208
209 const int size = [&]() -> int {
210 int _size;
211 MPI_Comm_size(req.comm, &_size);
212 return _size;
213 }();
214
215 const size_t sendSize = [&]() -> size_t {
216 int _size;
217 MPI_Type_size(req.sendtype, &_size);
218 return _size;
219 }();
220
221 const size_t recvSize = [&]() -> size_t {
222 int _size;
223 MPI_Type_size(req.recvtype, &_size);
224 return _size;
225 }();
226
227 // is this rank a root? linear search - nroots expected to be small
228 const bool isRoot = std::find(req.roots, req.roots + req.nroots, rank) != req.roots + req.nroots;
229
230 const int AGG_TAG = req.tag + 0;
231 const int ROOT_TAG = req.tag + 1;
232
233 // Balance the number of incoming messages at each phase:
234 // Aggregation = size / naggs * nroots
235 // Root = naggs
236 // so
237 // size / naggs * nroots = naggs
238 // size * nroots = naggs^2
239 // naggs = sqrt(size * nroots)
240 const int naggs = std::sqrt(size_t(size) * size_t(req.nroots)) + /*rounding*/ 0.5;
241
242 // how many srcs go to each aggregator
243 const int srcsPerAgg = (size + naggs - 1) / naggs;
244
245 // the aggregator I send to
246 const int myAgg = rank / srcsPerAgg * srcsPerAgg;
247
248 // ensure aggregators know how much data each rank is sending to the root
249 // [si * nroots + r1] -> how much si'th rank in group wants to send to root ri
250 std::vector<int> groupSendCounts(size_t(req.nroots) * size_t(srcsPerAgg));
251 std::vector<MPI_Request> reqs;
252 if (rank == myAgg) {
253 reqs.reserve(srcsPerAgg);
254 // recv counts from each member of my group
255 for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
256 MPI_Request rreq;
257
258#ifndef NDEBUG
259 if (size_t(si) * req.nroots + req.nroots > groupSendCounts.size()) {
260 std::stringstream ss;
261 ss << __FILE__ << ":" << __LINE__
262 << " [" << rank << "] tpetra internal Ialltofewv error: OOB access in recv buffer\n";
263 std::cerr << ss.str();
264 }
265#endif
266 MPI_Irecv(&groupSendCounts[size_t(si) * size_t(req.nroots)], req.nroots, MPI_INT, si + rank,
267 req.tag, req.comm, &rreq);
268 reqs.push_back(rreq);
269 }
270 }
271 // send sendcounts to aggregator
272 MPI_Send(req.sendcounts, req.nroots, MPI_INT, myAgg, req.tag, req.comm);
273
274 MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
275 reqs.resize(0);
276
277 // at this point, in each aggregator, groupSendCounts holds the send counts
278 // from each member of the aggregator. The first nroots entries are from the first rank,
279 // the second nroots entries are from the second rank, etc
280
281 // a temporary buffer to aggregate data. Data for a root is contiguous.
282 auto aggBuf = cache.pimpl->get_aggBuf<RecvExecSpace>(0);
283 std::vector<size_t> rootCount(req.nroots, 0); // [ri] the count of data held for root ri
284 if (rank == myAgg) {
285 size_t aggBytes = 0;
286 for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
287 for (int ri = 0; ri < req.nroots; ++ri) {
288 int count = groupSendCounts[si * req.nroots + ri];
289 rootCount[ri] += count;
290 aggBytes += count * sendSize;
291 }
292 }
293
294 aggBuf = cache.pimpl->get_aggBuf<RecvExecSpace>(aggBytes);
295 }
296 // now, on the aggregator ranks,
297 // * aggBuf is resized to accomodate all incoming data
298 // * rootCount holds how much data i hold for each root
299
300 // Send the actual data to the aggregator
301 if (rank == myAgg) {
302 reqs.reserve(srcsPerAgg + req.nroots);
303 // receive from all ranks in my group
304 size_t displ = 0;
305 // senders will send in root order, so we will recv in that order as well
306 // this puts all data for a root contiguous in the aggregation buffer
307 for (int ri = 0; ri < req.nroots; ++ri) {
308 for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
309 // receive data for the ri'th root from the si'th sender
310 const int count = groupSendCounts[si * req.nroots + ri];
311 if (count) {
312#ifndef NDEBUG
313 if (displ + count * sendSize > aggBuf.size()) {
314 std::stringstream ss;
315 ss << __FILE__ << ":" << __LINE__
316 << " [" << rank << "] tpetra internal Ialltofewv error: OOB access in send buffer\n";
317 std::cerr << ss.str();
318 }
319#endif
320 MPI_Request rreq;
321 // &aggBuf(displ)
322 MPI_Irecv(aggBuf.data() + displ, count, req.sendtype, si + rank, req.tag, req.comm, &rreq);
323 reqs.push_back(rreq);
324 displ += size_t(count) * sendSize;
325 }
326 }
327 }
328 } else {
329 reqs.reserve(req.nroots); // prepare for one send per root
330 }
331
332 // send data to aggregator
333 for (int ri = 0; ri < req.nroots; ++ri) {
334 const size_t displ = size_t(req.sdispls[ri]) * sendSize;
335 const int count = req.sendcounts[ri];
336 if (count) {
337 MPI_Request sreq;
338 MPI_Isend(&reinterpret_cast<const char *>(req.sendbuf)[displ], req.sendcounts[ri],
339 req.sendtype, myAgg, AGG_TAG, req.comm, &sreq);
340 reqs.push_back(sreq);
341 }
342 }
343
344 MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
345 reqs.resize(0);
346
347 // if I am a root, receive data from each aggregator
348 // The aggregator will send contiguous data, which we may need to spread out according to rdispls
349 auto rootBuf = cache.pimpl->get_rootBuf<RecvExecSpace>(0);
350 if (isRoot) {
351 reqs.reserve(naggs); // receive from each aggregator
352
353 const size_t totalRecvd = recvSize * [&]() -> size_t {
354 size_t acc = 0;
355 for (int i = 0; i < size; ++i) {
356 acc += req.recvcounts[i];
357 }
358 return acc;
359 }();
360 rootBuf = cache.pimpl->get_rootBuf<RecvExecSpace>(totalRecvd);
361
362 // Receive data from each aggregator.
363 // Aggregators send data in order of the ranks they're aggregating,
364 // which is also the order the root needs in its recv buffer.
365 size_t displ = 0;
366 for (int aggSrc = 0; aggSrc < size; aggSrc += srcsPerAgg) {
367 // tally up the total data to recv from the sending aggregator
368 int count = 0;
369 for (int origSrc = aggSrc;
370 origSrc < aggSrc + srcsPerAgg && origSrc < size; ++origSrc) {
371 count += req.recvcounts[origSrc];
372 }
373
374 if (count) {
375 MPI_Request rreq;
376 MPI_Irecv(rootBuf.data() + displ, count, req.recvtype, aggSrc, ROOT_TAG, req.comm, &rreq);
377 reqs.push_back(rreq);
378 displ += size_t(count) * recvSize;
379 }
380 }
381 }
382
383 // if I am an aggregator, forward data to the roots
384 // To each root, send my data in order of the ranks that sent to me
385 // which is the order the recvers expect
386 if (rank == myAgg) {
387 size_t displ = 0;
388 for (int ri = 0; ri < req.nroots; ++ri) {
389 const size_t count = rootCount[ri];
390 if (count) {
391 // &aggBuf[displ]
392 MPI_Send(aggBuf.data() + displ, count, req.sendtype, req.roots[ri], ROOT_TAG, req.comm);
393 displ += count * sendSize;
394 }
395 }
396 }
397
398 MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
399
400 // at root, copy data from contiguous buffer into recv buffer
401 if (isRoot) {
402 // set up src and dst for each block
403 auto args = cache.pimpl->get_args<RecvExecSpace>(size);
404 auto args_h = Kokkos::create_mirror_view(Kokkos::WithoutInitializing, args);
405
406 size_t srcOff = 0;
407 for (int sRank = 0; sRank < size; ++sRank) {
408 const size_t dstOff = req.rdispls[sRank] * recvSize;
409
410 void *dst = &reinterpret_cast<char *>(req.recvbuf)[dstOff];
411 void *const src = rootBuf.data() + srcOff; // &rootBuf(srcOff);
412 const size_t count = req.recvcounts[sRank] * recvSize;
413 args_h(sRank) = MemcpyArg{dst, src, count};
414
415#ifndef NDEBUG
416 if (srcOff + count > rootBuf.extent(0)) {
417 std::stringstream ss;
418 ss << __FILE__ << ":" << __LINE__ << " Tpetra internal Ialltofewv error: src access OOB in memcpy\n";
419 std::cerr << ss.str();
420 }
421#endif
422 srcOff += count;
423 }
424
425 // Actually copy the data
426 Kokkos::deep_copy(args, args_h);
427 using Policy = Kokkos::TeamPolicy<RecvExecSpace>;
428 Policy policy(size, Kokkos::AUTO);
429 Kokkos::parallel_for(
430 "Tpetra::Details::Ialltofewv: apply rdispls to contiguous root buffer", policy,
431 KOKKOS_LAMBDA(typename Policy::member_type member) {
432 team_memcpy(member, args(member.league_rank()));
433 });
434 Kokkos::fence("Tpetra::Details::Ialltofewv: after apply rdispls to contiguous root buffer");
435 }
436
437 return finalize();
438}
439} // namespace
440
441int Ialltofewv::wait(Req &req) {
442 if (req.devAccess) {
443 return wait_impl<Kokkos::DefaultExecutionSpace>(req, cache_);
444 } else {
445 return wait_impl<Kokkos::DefaultHostExecutionSpace>(req, cache_);
446 }
447}
448
449int Ialltofewv::get_status(const Req &req, int *flag, MPI_Status * /*status*/) const {
450 *flag = req.completed;
451 return MPI_SUCCESS;
452}
453
454} // namespace Tpetra::Details
Nonmember function that computes a residual Computes R = B - A * X.
void finalize()
Finalize Tpetra.