Tpetra parallel linear algebra Version of the Day
Loading...
Searching...
No Matches
Tpetra_Details_Ialltofewv.cpp
1// @HEADER
2// *****************************************************************************
3// Tpetra: Templated Linear Algebra Services Package
4//
5// Copyright 2008 NTESS and the Tpetra contributors.
6// SPDX-License-Identifier: BSD-3-Clause
7// *****************************************************************************
8// @HEADER
9
10#include "Tpetra_Details_Ialltofewv.hpp"
11
12#include <algorithm>
13#include <cmath>
14#include <vector>
15
16#include <mpi.h>
17#include <Kokkos_Core.hpp>
18
19#ifndef NDEBUG
20#include <iostream>
21#include <sstream>
22#endif
23
24namespace {
25
26struct ProfilingRegion {
27 ProfilingRegion() = delete;
28 ProfilingRegion(const ProfilingRegion &other) = delete;
29 ProfilingRegion(ProfilingRegion &&other) = delete;
30
31 ProfilingRegion(const std::string &name) {
32 Kokkos::Profiling::pushRegion(name);
33 }
34 ~ProfilingRegion() {
35 Kokkos::Profiling::popRegion();
36 }
37};
38
39struct MemcpyArg {
40 void *dst;
41 void *src;
42 size_t count;
43};
44
45template <typename T>
46KOKKOS_INLINE_FUNCTION bool is_compatible(const MemcpyArg &arg) {
47 return (0 == (uintptr_t(arg.dst) % sizeof(T))) && (0 == (uintptr_t(arg.src) & sizeof(T))) && (0 == (arg.count % sizeof(T)));
48}
49
50template <typename T, typename Member>
51KOKKOS_INLINE_FUNCTION void team_memcpy_as(const Member &member, void *dst, void *const src, size_t count) {
52 Kokkos::parallel_for(
53 Kokkos::TeamThreadRange(member, count),
54 [&](size_t i) {
55 reinterpret_cast<T *>(dst)[i] = reinterpret_cast<T const *>(src)[i];
56 });
57}
58
59template <typename Member>
60KOKKOS_INLINE_FUNCTION void team_memcpy(const Member &member, MemcpyArg &arg) {
61 if (is_compatible<uint64_t>(arg)) {
62 team_memcpy_as<uint64_t>(member, arg.dst, arg.src, arg.count / sizeof(uint64_t));
63 } else if (is_compatible<uint32_t>(arg)) {
64 team_memcpy_as<uint32_t>(member, arg.dst, arg.src, arg.count / sizeof(uint32_t));
65 } else {
66 team_memcpy_as<uint8_t>(member, arg.dst, arg.src, arg.count);
67 }
68}
69
70} // namespace
71
72namespace Tpetra::Details {
73
74struct Ialltofewv::Cache::impl {
75 impl()
76 : rootBufDev("rootBufDev", 0)
77 , rootBufHost("rootBufHost", 0)
78 , aggBufDev("aggBufDev", 0)
79 , aggBufHost("rootBufHost", 0)
80 , argsDev("argsDev", 0)
81 , argsHost("argsHost", 0)
82 , rootBufGets_(0)
83 , rootBufHits_(0)
84 , aggBufGets_(0)
85 , aggBufHits_(0)
86 , argsGets_(0)
87 , argsHits_(0)
88 , rootBufDevSize_(0)
89 , aggBufDevSize_(0)
90 , argsDevSize_(0)
91 , argsHostSize_(0)
92 , rootBufHostSize_(0)
93 , aggBufHostSize_(0) {}
94
95 // cached views
96 Kokkos::View<uint8_t *, typename Kokkos::DefaultExecutionSpace::memory_space> rootBufDev;
97 Kokkos::View<uint8_t *, typename Kokkos::DefaultHostExecutionSpace::memory_space> rootBufHost;
98 Kokkos::View<char *, typename Kokkos::DefaultExecutionSpace::memory_space> aggBufDev;
99 Kokkos::View<char *, typename Kokkos::DefaultHostExecutionSpace::memory_space> aggBufHost;
100 Kokkos::View<MemcpyArg *, typename Kokkos::DefaultExecutionSpace::memory_space> argsDev;
101 Kokkos::View<MemcpyArg *, typename Kokkos::DefaultHostExecutionSpace::memory_space> argsHost;
102
103 size_t rootBufGets_;
104 size_t rootBufHits_;
105 size_t aggBufGets_;
106 size_t aggBufHits_;
107 size_t argsGets_;
108 size_t argsHits_;
109
110 size_t rootBufDevSize_, aggBufDevSize_;
111 size_t argsDevSize_, argsHostSize_;
112 size_t rootBufHostSize_, aggBufHostSize_;
113
114 template <typename ExecSpace>
115 auto get_rootBuf(size_t size) {
116 ++rootBufGets_;
117 if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
118 if (rootBufDev.extent(0) < size) {
119 Kokkos::resize(Kokkos::WithoutInitializing, rootBufDev, size);
120 rootBufDevSize_ = size;
121 } else {
122 ++rootBufHits_;
123 }
124 return Kokkos::subview(rootBufDev, Kokkos::pair{size_t(0), size});
125 } else {
126 if (rootBufHost.extent(0) < size) {
127 Kokkos::resize(Kokkos::WithoutInitializing, rootBufHost, size);
128 rootBufHostSize_ = size;
129 } else {
130 ++rootBufHits_;
131 }
132 return Kokkos::subview(rootBufHost, Kokkos::pair{size_t(0), size});
133 }
134 }
135
136 template <typename ExecSpace>
137 auto get_aggBuf(size_t size) {
138 ++aggBufGets_;
139 if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
140 if (aggBufDev.extent(0) < size) {
141 Kokkos::resize(Kokkos::WithoutInitializing, aggBufDev, size);
142 aggBufHostSize_ = size;
143 } else {
144 ++aggBufHits_;
145 }
146 return Kokkos::subview(aggBufDev, Kokkos::pair{size_t(0), size});
147 } else {
148 if (aggBufHost.extent(0) < size) {
149 Kokkos::resize(Kokkos::WithoutInitializing, aggBufHost, size);
150 aggBufHostSize_ = size;
151 } else {
152 ++aggBufHits_;
153 }
154 return Kokkos::subview(aggBufHost, Kokkos::pair{size_t(0), size});
155 }
156 }
157
158 template <typename ExecSpace>
159 auto get_args(size_t size) {
160 ++argsGets_;
161 if constexpr (std::is_same_v<ExecSpace, Kokkos::DefaultExecutionSpace>) {
162 if (argsDev.extent(0) < size) {
163 Kokkos::resize(Kokkos::WithoutInitializing, argsDev, size);
164 argsHostSize_ = size;
165 } else {
166 ++argsHits_;
167 }
168 return Kokkos::subview(argsDev, Kokkos::pair{size_t(0), size});
169 } else {
170 if (argsHost.extent(0) < size) {
171 Kokkos::resize(Kokkos::WithoutInitializing, argsHost, size);
172 argsHostSize_ = size;
173 } else {
174 ++argsHits_;
175 }
176 return Kokkos::subview(argsHost, Kokkos::pair{size_t(0), size});
177 }
178 }
179};
180
181Ialltofewv::Cache::Cache() = default;
182Ialltofewv::Cache::~Cache() = default;
183
184namespace {
185template <typename RecvExecSpace>
186int wait_impl(Ialltofewv::Req &req, Ialltofewv::Cache &cache) {
187 auto finalize = [&]() -> int {
188 req.completed = true;
189 return MPI_SUCCESS;
190 };
191
192 if (0 == req.nroots) {
193 return finalize();
194 }
195
196 ProfilingRegion pr("alltofewv::wait");
197
198 // lazy-init view cache
199 if (!cache.pimpl) {
200 cache.pimpl = std::make_shared<Ialltofewv::Cache::impl>();
201 }
202
203 const int rank = [&]() -> int {
204 int _rank;
205 MPI_Comm_rank(req.comm, &_rank);
206 return _rank;
207 }();
208
209 const int size = [&]() -> int {
210 int _size;
211 MPI_Comm_size(req.comm, &_size);
212 return _size;
213 }();
214
215 const size_t sendSize = [&]() -> size_t {
216 int _size;
217 MPI_Type_size(req.sendtype, &_size);
218 return _size;
219 }();
220
221 const size_t recvSize = [&]() -> size_t {
222 int _size;
223 MPI_Type_size(req.recvtype, &_size);
224 return _size;
225 }();
226
227 // is this rank a root? linear search - nroots expected to be small
228 const bool isRoot = std::find(req.roots, req.roots + req.nroots, rank) != req.roots + req.nroots;
229
230 // Balance the number of incoming messages at each phase:
231 // Aggregation = size / naggs * nroots
232 // Root = naggs
233 // so
234 // size / naggs * nroots = naggs
235 // size * nroots = naggs^2
236 // naggs = sqrt(size * nroots)
237 const int naggs = std::sqrt(size_t(size) * size_t(req.nroots)) + /*rounding*/ 0.5;
238
239 // how many srcs go to each aggregator
240 const int srcsPerAgg = (size + naggs - 1) / naggs;
241
242 // the aggregator I send to
243 const int myAgg = rank / srcsPerAgg * srcsPerAgg;
244
245 // ensure aggregators know how much data each rank is sending to the root
246 // [si * nroots + r1] -> how much si'th rank in group wants to send to root ri
247 std::vector<int> groupSendCounts(size_t(req.nroots) * size_t(srcsPerAgg));
248 std::vector<MPI_Request> reqs;
249 if (rank == myAgg) {
250 reqs.reserve(srcsPerAgg);
251 // recv counts from each member of my group
252 for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
253 MPI_Request rreq;
254
255#ifndef NDEBUG
256 if (size_t(si) * req.nroots + req.nroots > groupSendCounts.size()) {
257 std::stringstream ss;
258 ss << __FILE__ << ":" << __LINE__
259 << " [" << rank << "] tpetra internal Ialltofewv error: OOB access in recv buffer\n";
260 std::cerr << ss.str();
261 }
262#endif
263 MPI_Irecv(&groupSendCounts[size_t(si) * size_t(req.nroots)], req.nroots, MPI_INT, si + rank,
264 req.aggTag, req.comm, &rreq);
265 reqs.push_back(rreq);
266 }
267 }
268 // send sendcounts to aggregator
269 MPI_Send(req.sendcounts, req.nroots, MPI_INT, myAgg, req.aggTag, req.comm);
270
271 MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
272 reqs.resize(0);
273
274 // at this point, in each aggregator, groupSendCounts holds the send counts
275 // from each member of the aggregator. The first nroots entries are from the first rank,
276 // the second nroots entries are from the second rank, etc
277
278 // a temporary buffer to aggregate data. Data for a root is contiguous.
279 auto aggBuf = cache.pimpl->get_aggBuf<RecvExecSpace>(0);
280 std::vector<size_t> rootCount(req.nroots, 0); // [ri] the count of data held for root ri
281 if (rank == myAgg) {
282 size_t aggBytes = 0;
283 for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
284 for (int ri = 0; ri < req.nroots; ++ri) {
285 int count = groupSendCounts[si * req.nroots + ri];
286 rootCount[ri] += count;
287 aggBytes += count * sendSize;
288 }
289 }
290
291 aggBuf = cache.pimpl->get_aggBuf<RecvExecSpace>(aggBytes);
292 }
293 // now, on the aggregator ranks,
294 // * aggBuf is resized to accomodate all incoming data
295 // * rootCount holds how much data i hold for each root
296
297 // Send the actual data to the aggregator
298 if (rank == myAgg) {
299 reqs.reserve(srcsPerAgg + req.nroots);
300 // receive from all ranks in my group
301 size_t displ = 0;
302 // senders will send in root order, so we will recv in that order as well
303 // this puts all data for a root contiguous in the aggregation buffer
304 for (int ri = 0; ri < req.nroots; ++ri) {
305 for (int si = 0; si < srcsPerAgg && si + rank < size; ++si) {
306 // receive data for the ri'th root from the si'th sender
307 const int count = groupSendCounts[si * req.nroots + ri];
308 if (count) {
309#ifndef NDEBUG
310 if (displ + count * sendSize > aggBuf.size()) {
311 std::stringstream ss;
312 ss << __FILE__ << ":" << __LINE__
313 << " [" << rank << "] tpetra internal Ialltofewv error: OOB access in send buffer\n";
314 std::cerr << ss.str();
315 }
316#endif
317 MPI_Request rreq;
318 // &aggBuf(displ)
319 MPI_Irecv(aggBuf.data() + displ, count, req.sendtype, si + rank, req.aggTag, req.comm, &rreq);
320 reqs.push_back(rreq);
321 displ += size_t(count) * sendSize;
322 }
323 }
324 }
325 } else {
326 reqs.reserve(req.nroots); // prepare for one send per root
327 }
328
329 // send data to aggregator
330 for (int ri = 0; ri < req.nroots; ++ri) {
331 const size_t displ = size_t(req.sdispls[ri]) * sendSize;
332 const int count = req.sendcounts[ri];
333 if (count) {
334 MPI_Request sreq;
335 MPI_Isend(&reinterpret_cast<const char *>(req.sendbuf)[displ], req.sendcounts[ri],
336 req.sendtype, myAgg, req.aggTag, req.comm, &sreq);
337 reqs.push_back(sreq);
338 }
339 }
340
341 MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
342 reqs.resize(0);
343
344 // if I am a root, receive data from each aggregator
345 // The aggregator will send contiguous data, which we may need to spread out according to rdispls
346 auto rootBuf = cache.pimpl->get_rootBuf<RecvExecSpace>(0);
347 if (isRoot) {
348 reqs.reserve(naggs); // receive from each aggregator
349
350 const size_t totalRecvd = recvSize * [&]() -> size_t {
351 size_t acc = 0;
352 for (int i = 0; i < size; ++i) {
353 acc += req.recvcounts[i];
354 }
355 return acc;
356 }();
357 rootBuf = cache.pimpl->get_rootBuf<RecvExecSpace>(totalRecvd);
358
359 // Receive data from each aggregator.
360 // Aggregators send data in order of the ranks they're aggregating,
361 // which is also the order the root needs in its recv buffer.
362 size_t displ = 0;
363 for (int aggSrc = 0; aggSrc < size; aggSrc += srcsPerAgg) {
364 // tally up the total data to recv from the sending aggregator
365 int count = 0;
366 for (int origSrc = aggSrc;
367 origSrc < aggSrc + srcsPerAgg && origSrc < size; ++origSrc) {
368 count += req.recvcounts[origSrc];
369 }
370
371 if (count) {
372 MPI_Request rreq;
373 MPI_Irecv(rootBuf.data() + displ, count, req.recvtype, aggSrc, req.rootTag, req.comm, &rreq);
374 reqs.push_back(rreq);
375 displ += size_t(count) * recvSize;
376 }
377 }
378 }
379
380 // if I am an aggregator, forward data to the roots
381 // To each root, send my data in order of the ranks that sent to me
382 // which is the order the recvers expect
383 if (rank == myAgg) {
384 size_t displ = 0;
385 for (int ri = 0; ri < req.nroots; ++ri) {
386 const size_t count = rootCount[ri];
387 if (count) {
388 // &aggBuf[displ]
389 MPI_Send(aggBuf.data() + displ, count, req.sendtype, req.roots[ri], req.rootTag, req.comm);
390 displ += count * sendSize;
391 }
392 }
393 }
394
395 MPI_Waitall(reqs.size(), reqs.data(), MPI_STATUSES_IGNORE);
396
397 // at root, copy data from contiguous buffer into recv buffer
398 if (isRoot) {
399 // set up src and dst for each block
400 auto args = cache.pimpl->get_args<RecvExecSpace>(size);
401 auto args_h = Kokkos::create_mirror_view(Kokkos::WithoutInitializing, args);
402
403 size_t srcOff = 0;
404 for (int sRank = 0; sRank < size; ++sRank) {
405 const size_t dstOff = req.rdispls[sRank] * recvSize;
406
407 void *dst = &reinterpret_cast<char *>(req.recvbuf)[dstOff];
408 void *const src = rootBuf.data() + srcOff; // &rootBuf(srcOff);
409 const size_t count = req.recvcounts[sRank] * recvSize;
410 args_h(sRank) = MemcpyArg{dst, src, count};
411
412#ifndef NDEBUG
413 if (srcOff + count > rootBuf.extent(0)) {
414 std::stringstream ss;
415 ss << __FILE__ << ":" << __LINE__ << " Tpetra internal Ialltofewv error: src access OOB in memcpy\n";
416 std::cerr << ss.str();
417 }
418#endif
419 srcOff += count;
420 }
421
422 // Actually copy the data
423 Kokkos::deep_copy(args, args_h);
424 using Policy = Kokkos::TeamPolicy<RecvExecSpace>;
425 Policy policy(size, Kokkos::AUTO);
426 Kokkos::parallel_for(
427 "Tpetra::Details::Ialltofewv: apply rdispls to contiguous root buffer", policy,
428 KOKKOS_LAMBDA(typename Policy::member_type member) {
429 team_memcpy(member, args(member.league_rank()));
430 });
431 Kokkos::fence("Tpetra::Details::Ialltofewv: after apply rdispls to contiguous root buffer");
432 }
433
434 return finalize();
435}
436} // namespace
437
438int Ialltofewv::wait(Req &req) {
439 if (req.devAccess) {
440 return wait_impl<Kokkos::DefaultExecutionSpace>(req, cache_);
441 } else {
442 return wait_impl<Kokkos::DefaultHostExecutionSpace>(req, cache_);
443 }
444}
445
446int Ialltofewv::get_status(const Req &req, int *flag, MPI_Status * /*status*/) const {
447 *flag = req.completed;
448 return MPI_SUCCESS;
449}
450
451} // namespace Tpetra::Details
Nonmember function that computes a residual Computes R = B - A * X.
void finalize()
Finalize Tpetra.