Tpetra_Details_DistributorActor.hpp
// @HEADER
// *****************************************************************************
// Tpetra: Templated Linear Algebra Services Package
//
// Copyright 2008 NTESS and the Tpetra contributors.
// SPDX-License-Identifier: BSD-3-Clause
// *****************************************************************************
// @HEADER

#ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
#define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP

#include <optional>
#include <vector>

#include "Teuchos_Assert.hpp"
#include "Tpetra_Details_DistributorPlan.hpp"
#include "Tpetra_Details_Profiling.hpp"
#include "Tpetra_Util.hpp"

#include "Teuchos_Array.hpp"
#include "Teuchos_Comm.hpp"
#include "Teuchos_RCP.hpp"

#include "Kokkos_TeuchosCommAdapters.hpp"

#ifdef HAVE_TPETRA_MPI
#include "mpi.h"

#include "Teuchos_DefaultMpiComm.hpp"
#include "Tpetra_Details_Ialltofewv.hpp"
#include "Tpetra_Details_MpiTypeTraits.hpp"
#endif

namespace Tpetra::Details {

template <class View>
constexpr bool isKokkosView = Kokkos::is_view<View>::value;

template <class View1, class View2>
constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;

class DistributorActor {
  static constexpr int DEFAULT_MPI_TAG = 1;

  using IndexView = DistributorPlan::IndexView;
  using SubViewLimits = DistributorPlan::SubViewLimits;

 public:
  DistributorActor();
  DistributorActor(const DistributorActor& otherActor) = default;

  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       size_t numPackets,
                       const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPostsAndWaits(const DistributorPlan& plan,
                       const ExpView& exports,
                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                       const ImpView& imports,
                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

  template <class ImpView>
  void doPostRecvs(const DistributorPlan& plan,
                   size_t numPackets,
                   const ImpView& imports);

  template <class ImpView>
  void doPostRecvs(const DistributorPlan& plan,
                   const ImpView& imports,
                   const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

  template <class ExpView, class ImpView>
  void doPostSends(const DistributorPlan& plan,
                   const ExpView& exports,
                   size_t numPackets,
                   const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPostSends(const DistributorPlan& plan,
                   const ExpView& exports,
                   const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                   const ImpView& imports,
                   const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               size_t numPackets,
               const ImpView& imports);

  template <class ExpView, class ImpView>
  void doPosts(const DistributorPlan& plan,
               const ExpView& exports,
               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
               const ImpView& imports,
               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);

  void doWaits(const DistributorPlan& plan);

  void doWaitsRecv(const DistributorPlan& plan);

  void doWaitsSend(const DistributorPlan& plan);

  void doWaitsIalltofewv(const DistributorPlan& plan);

  bool isReady() const;

 private:
  template <class ImpView>
  void doPostRecvsImpl(const DistributorPlan& plan,
                       const ImpView& imports,
                       const SubViewLimits& totalPacketsFrom);

  template <class ExpView, class ImpView>
  void doPostSendsImpl(const DistributorPlan& plan,
                       const ExpView& exports,
                       const SubViewLimits& exportSubViewLimits,
                       const ImpView& imports,
                       const SubViewLimits& importSubViewLimits);

#ifdef HAVE_TPETRA_MPI
  template <class ExpView, class ImpView>
  void doPostsAllToAllImpl(const DistributorPlan& plan,
                           const ExpView& exports,
                           const SubViewLimits& exportSubViewLimits,
                           const ImpView& imports,
                           const SubViewLimits& importSubViewLimits);

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  template <class ExpView, class ImpView>
  void doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
                               const ExpView& exports,
                               const SubViewLimits& exportSubViewLimits,
                               const ImpView& imports,
                               const SubViewLimits& importSubViewLimits);
#endif  // HAVE_TPETRACORE_MPI_ADVANCE

  template <typename ExpView, typename ImpView>
  void doPostsIalltofewvImpl(const DistributorPlan& plan,
                             const ExpView& exports,
                             const SubViewLimits& exportSubViewLimits,
                             const ImpView& imports,
                             const SubViewLimits& importSubViewLimits);

  // ialltofewv members
  struct {
    Details::Ialltofewv impl;
    std::optional<Details::Ialltofewv::Req> req;
    Teuchos::RCP<std::vector<int>> sendcounts;
    Teuchos::RCP<std::vector<int>> sdispls;
    Teuchos::RCP<std::vector<int>> recvcounts;
    Teuchos::RCP<std::vector<int>> rdispls;
    std::vector<int> roots;
  } ialltofewv_;

#endif  // HAVE_TPETRA_MPI

  int mpiTag_;

  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsRecv_;
  Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsSend_;
};
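
// Usage sketch (illustrative only, not part of the Tpetra API surface): a
// caller that already owns a DistributorPlan and matching Kokkos::View
// buffers -- `plan`, `exports`, `imports`, and `numPackets` below are assumed
// to exist -- would typically drive the actor in one of two ways.
// Tpetra::Distributor wraps this pattern; the sketch only restates the
// methods declared above.
//
//   DistributorActor actor;
//
//   // Blocking variant: post receives and sends, then wait for completion.
//   actor.doPostsAndWaits(plan, exports, numPackets, imports);
//
//   // Split variant: post communication, overlap local work, then wait.
//   actor.doPostRecvs(plan, numPackets, imports);
//   actor.doPostSends(plan, exports, numPackets, imports);
//   // ... do local computation here ...
//   actor.doWaits(plan);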

template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               size_t numPackets,
                               const ImpView& imports) {
  doPostRecvs(plan, numPackets, imports);
  doPostSends(plan, exports, numPackets, imports);
}

template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       size_t numPackets,
                                       const ImpView& imports) {
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numPackets, imports);
  doWaits(plan);
}

template <class ExpView, class ImpView>
void DistributorActor::doPosts(const DistributorPlan& plan,
                               const ExpView& exports,
                               const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                               const ImpView& imports,
                               const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
  doPostRecvs(plan, imports, numImportPacketsPerLID);
  doPostSends(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
}

template <class ExpView, class ImpView>
void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                                       const ImpView& imports,
                                       const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
  doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
  doWaits(plan);
}

template <typename ViewType>
using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;

template <typename DstViewType, typename SrcViewType>
using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
                                                HostAccessibility<SrcViewType>::accessible>;

template <typename DstViewType, typename SrcViewType>
using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
                                                   !HostAccessibility<SrcViewType>::accessible>;

template <typename DstViewType, typename SrcViewType>
enableIfHostAccessible<DstViewType, SrcViewType>
packOffset(const DstViewType& dst,
           const SrcViewType& src,
           const size_t dst_offset,
           const size_t src_offset,
           const size_t size) {
  memcpy((void*)(dst.data() + dst_offset), src.data() + src_offset, size * sizeof(typename DstViewType::value_type));
}

template <typename DstViewType, typename SrcViewType>
enableIfNotHostAccessible<DstViewType, SrcViewType>
packOffset(const DstViewType& dst,
           const SrcViewType& src,
           const size_t dst_offset,
           const size_t src_offset,
           const size_t size) {
  Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
}
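
// Dispatch note (illustrative): the enable_if aliases above select between
// the two packOffset overloads at compile time. If both Views are accessible
// from the default host execution space, the first overload runs and the copy
// is a raw memcpy; otherwise the second overload forwards to
// Kokkos::Compat::deep_copy_offset. A minimal sketch, assuming 1-D double
// Views and an extent n chosen by the caller:
//
//   Kokkos::View<double*, Kokkos::HostSpace> h_dst("h_dst", n), h_src("h_src", n);
//   packOffset(h_dst, h_src, 0, 0, n);  // host-accessible: memcpy path
//
//   Kokkos::View<double*> d_dst("d_dst", n), d_src("d_src", n);
//   packOffset(d_dst, d_src, 0, 0, n);  // deep_copy_offset path when the
//                                       // default memory space is not
//                                       // host-accessible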

#ifdef HAVE_TPETRA_MPI

template <class ExpView, class ImpView>
void DistributorActor::doPostsIalltofewvImpl(const DistributorPlan& plan,
                                             const ExpView& exports,
                                             const SubViewLimits& exportSubViewLimits,
                                             const ImpView& imports,
                                             const SubViewLimits& importSubViewLimits) {
  using size_type = Teuchos::Array<size_t>::size_type;
  using ExportValue = typename ExpView::non_const_value_type;

  ProfilingRegion pr("Tpetra::Distributor::doPostsIalltofewvImpl");

  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Ialltofewv\" only works for fast-path communication.");

  TEUCHOS_TEST_FOR_EXCEPTION(
      bool(ialltofewv_.req), std::runtime_error,
      "This actor has an active Ialltofewv already");

  TEUCHOS_TEST_FOR_EXCEPTION(
      bool(ialltofewv_.sendcounts), std::runtime_error,
      "This actor has an active Ialltofewv already");

  TEUCHOS_TEST_FOR_EXCEPTION(
      bool(ialltofewv_.sdispls), std::runtime_error,
      "This actor has an active Ialltofewv already");

  TEUCHOS_TEST_FOR_EXCEPTION(
      bool(ialltofewv_.recvcounts), std::runtime_error,
      "This actor has an active Ialltofewv already");

  TEUCHOS_TEST_FOR_EXCEPTION(
      bool(ialltofewv_.rdispls), std::runtime_error,
      "This actor has an active Ialltofewv already");

  auto comm = plan.getComm();

  const auto& [importStarts, importLengths] = importSubViewLimits;
  const auto& [exportStarts, exportLengths] = exportSubViewLimits;

  ialltofewv_.roots = plan.getRoots();
  const int nroots = ialltofewv_.roots.size();
  const int* roots = ialltofewv_.roots.data();
  ialltofewv_.req = std::make_optional<Details::Ialltofewv::Req>();
  ialltofewv_.sendcounts = Teuchos::rcp(new std::vector<int>(nroots));
  ialltofewv_.sdispls = Teuchos::rcp(new std::vector<int>(nroots));
  ialltofewv_.recvcounts = Teuchos::rcp(new std::vector<int>);
  ialltofewv_.rdispls = Teuchos::rcp(new std::vector<int>);

  for (int rootIdx = 0; rootIdx < nroots; ++rootIdx) {
    const int root = roots[rootIdx];

    // if we can't find the root proc index in our plan, it just means we send 0
    // also make sure root only appears once in getProcsTo()
    size_type rootProcIndex = plan.getProcsTo().size();  // sentinel value -> not found
    for (size_type pi = 0; pi < plan.getProcsTo().size(); ++pi) {
      if (plan.getProcsTo()[pi] == root) {
        rootProcIndex = pi;
        break;
      }
    }

    // am I sending to root?
    int sendcount = 0;
    if (rootProcIndex != plan.getProcsTo().size()) {
      sendcount = exportLengths[rootProcIndex];
    }
    (*ialltofewv_.sendcounts)[rootIdx] = sendcount;

    int sdispl = 0;
    if (0 != sendcount) {
      sdispl = exportStarts[rootProcIndex];
    }
    (*ialltofewv_.sdispls)[rootIdx] = sdispl;

    // If I happen to be this root, set up my receive metadata
    if (comm->getRank() == root) {
      // don't recv anything from anywhere by default
      ialltofewv_.recvcounts->resize(comm->getSize());
      std::fill(ialltofewv_.recvcounts->begin(), ialltofewv_.recvcounts->end(), 0);
      ialltofewv_.rdispls->resize(comm->getSize());
      std::fill(ialltofewv_.rdispls->begin(), ialltofewv_.rdispls->end(), 0);

      const size_type actualNumReceives =
          Teuchos::as<size_type>(plan.getNumReceives()) +
          Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);

      for (size_type i = 0; i < actualNumReceives; ++i) {
        const int src = plan.getProcsFrom()[i];
        (*ialltofewv_.rdispls)[src] = importStarts[i];
        (*ialltofewv_.recvcounts)[src] = Teuchos::as<int>(importLengths[i]);
      }
    }

  }  // rootIdx

  // TODO: do we need to pass ExportValue{} here?
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<ExportValue>::getType(ExportValue{});
  // FIXME: is there a better way to do this?
  Teuchos::RCP<const Teuchos::MpiComm<int>> tMpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> oMpiComm =
      tMpiComm->getRawMpiComm();
  MPI_Comm mpiComm = (*oMpiComm)();

  // don't care about send-side accessibility because it's not accessed through kokkos
  // rely on MPI to do the right thing
  constexpr bool recvDevAccess = Kokkos::SpaceAccessibility<
      Kokkos::DefaultExecutionSpace, typename ImpView::memory_space>::accessible;
  constexpr bool sendDevAccess = Kokkos::SpaceAccessibility<
      Kokkos::DefaultExecutionSpace, typename ExpView::memory_space>::accessible;
  static_assert(recvDevAccess == sendDevAccess, "sending across host/device");

  const int err = ialltofewv_.impl.post<recvDevAccess>(exports.data(), ialltofewv_.sendcounts->data(), ialltofewv_.sdispls->data(), rawType,
                                                       imports.data(), ialltofewv_.recvcounts->data(), ialltofewv_.rdispls->data(),
                                                       roots, nroots,
                                                       rawType,
                                                       mpiTag_, mpiComm, &(*ialltofewv_.req));

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "ialltofewv failed with error \""
                                 << Teuchos::mpiErrorCodeToString(err)
                                 << "\".");
}

template <class ExpView, class ImpView>
void DistributorActor::doPostsAllToAllImpl(const DistributorPlan& plan,
                                           const ExpView& exports,
                                           const SubViewLimits& exportSubViewLimits,
                                           const ImpView& imports,
                                           const SubViewLimits& importSubViewLimits) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  using size_type = Teuchos::Array<size_t>::size_type;

  auto comm = plan.getComm();
  std::vector<int> sendcounts(comm->getSize(), 0);
  std::vector<int> sdispls(comm->getSize(), 0);
  std::vector<int> recvcounts(comm->getSize(), 0);
  std::vector<int> rdispls(comm->getSize(), 0);

  auto& [importStarts, importLengths] = importSubViewLimits;
  auto& [exportStarts, exportLengths] = exportSubViewLimits;

  for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
    sdispls[plan.getProcsTo()[pp]] = exportStarts[pp];
    size_t numPackets = exportLengths[pp];
    // numPackets is converted down to int, so make sure it can be represented
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPostsAllToAll: "
                               "Send count for send "
                                   << pp << " (" << numPackets
                                   << ") is too large "
                                      "to be represented as int.");
    sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
  }

  const size_type actualNumReceives =
      Teuchos::as<size_type>(plan.getNumReceives()) +
      Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);

  for (size_type i = 0; i < actualNumReceives; ++i) {
    rdispls[plan.getProcsFrom()[i]] = importStarts[i];
    size_t totalPacketsFrom_i = importLengths[i];
    // totalPacketsFrom_i is converted down to int, so make sure it can be
    // represented
    TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                               std::logic_error,
                               "Tpetra::Distributor::doPostsAllToAll: "
                               "Recv count for receive "
                                   << i << " (" << totalPacketsFrom_i
                                   << ") is too large "
                                      "to be represented as int.");
    recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
  }

  Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
      Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
  Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
      mpiComm->getRawMpiComm();
  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
  if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
    MPIX_Comm* mpixComm = *plan.getMPIXComm();
    TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
                               "MPIX_Comm is null in doPostsAllToAll \""
                                   << __FILE__ << ":" << __LINE__);

    const int err = MPIX_Alltoallv(
        exports.data(), sendcounts.data(), sdispls.data(), rawType,
        imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);

    TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                               "MPIX_Alltoallv failed with error \""
                                   << Teuchos::mpiErrorCodeToString(err)
                                   << "\".");

    return;
  }
#endif  // HAVE_TPETRACORE_MPI_ADVANCE

  const int err = MPI_Alltoallv(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPI_Alltoallv failed with error \""
                                 << Teuchos::mpiErrorCodeToString(err)
                                 << "\".");
}

#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
template <class ExpView, class ImpView>
void DistributorActor::doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
                                               const ExpView& exports,
                                               const SubViewLimits& exportSubViewLimits,
                                               const ImpView& imports,
                                               const SubViewLimits& importSubViewLimits) {
  TEUCHOS_TEST_FOR_EXCEPTION(
      !plan.getIndicesTo().is_null(), std::runtime_error,
      "Send Type=\"Alltoall\" only works for fast-path communication.");

  const int myRank = plan.getComm()->getRank();
  MPIX_Comm* mpixComm = *plan.getMPIXComm();
  using size_type = Teuchos::Array<size_t>::size_type;

  const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
  const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
  std::vector<int> sendcounts(numSends, 0);
  std::vector<int> sdispls(numSends, 0);
  std::vector<int> recvcounts(numRecvs, 0);
  std::vector<int> rdispls(numRecvs, 0);

  auto& [importStarts, importLengths] = importSubViewLimits;
  auto& [exportStarts, exportLengths] = exportSubViewLimits;

  for (size_t pp = 0; pp < numSends; ++pp) {
    sdispls[pp] = exportStarts[pp];
    size_t numPackets = exportLengths[pp];
    // numPackets is converted down to int, so make sure it can be represented
    TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
                               "Tpetra::Distributor::doPostsNbrAllToAllV: "
                               "Send count for send "
                                   << pp << " (" << numPackets
                                   << ") is too large "
                                      "to be represented as int.");
    sendcounts[pp] = static_cast<int>(numPackets);
  }

  for (size_type i = 0; i < numRecvs; ++i) {
    rdispls[i] = importStarts[i];
    size_t totalPacketsFrom_i = importLengths[i];
    // totalPacketsFrom_i is converted down to int, so make sure it can be
    // represented
    TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                               std::logic_error,
                               "Tpetra::Distributor::doPostsNbrAllToAllV: "
                               "Recv count for receive "
                                   << i << " (" << totalPacketsFrom_i
                                   << ") is too large "
                                      "to be represented as int.");
    recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
  }

  using T = typename ExpView::non_const_value_type;
  MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());

  MPIX_Info* xinfo;
  MPIX_Topo* xtopo;
  MPIX_Info_init(&xinfo);
  MPIX_Topo_init(numRecvs, plan.getProcsFrom().data(), recvcounts.data(),
                 numSends, plan.getProcsTo().data(), sendcounts.data(), xinfo, &xtopo);
  const int err = MPIX_Neighbor_alltoallv_topo(
      exports.data(), sendcounts.data(), sdispls.data(), rawType,
      imports.data(), recvcounts.data(), rdispls.data(), rawType, xtopo, mpixComm);
  MPIX_Topo_free(&xtopo);
  MPIX_Info_free(&xinfo);

  TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
                             "MPIX_Neighbor_alltoallv failed with error \""
                                 << Teuchos::mpiErrorCodeToString(err)
                                 << "\".");
}
#endif  // HAVE_TPETRACORE_MPI_ADVANCE
#endif  // HAVE_TPETRA_MPI

template <class ImpView>
void DistributorActor::doPostRecvs(const DistributorPlan& plan,
                                   size_t numPackets,
                                   const ImpView& imports) {
  auto importSubViewLimits = plan.getImportViewLimits(numPackets);
  doPostRecvsImpl(plan, imports, importSubViewLimits);
}

template <class ImpView>
void DistributorActor::doPostRecvs(const DistributorPlan& plan,
                                   const ImpView& imports,
                                   const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
  auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
  doPostRecvsImpl(plan, imports, importSubViewLimits);
}

template <class ImpView>
void DistributorActor::doPostRecvsImpl(const DistributorPlan& plan,
                                       const ImpView& imports,
                                       const SubViewLimits& importSubViewLimits) {
  static_assert(isKokkosView<ImpView>,
                "Data arrays for DistributorActor::doPostRecvs must be Kokkos::Views");
  using Kokkos::Compat::subview_offset;
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::ireceive;
  using size_type = Array<size_t>::size_type;
  using imports_view_type = ImpView;

#ifdef KOKKOS_ENABLE_CUDA
  static_assert(!std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
                "Please do not use Tpetra::Distributor with UVM "
                "allocations. See GitHub issue #1088.");
#endif  // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert(!std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
                "Please do not use Tpetra::Distributor with SharedUSM "
                "allocations. See GitHub issue #1088 (corresponding to CUDA).");
#endif  // KOKKOS_ENABLE_SYCL

#if defined(HAVE_TPETRA_MPI)
  // All-to-all communication layout is quite different from
  // point-to-point, so we handle it separately.

  // These send options require no matching receives, so we just return.
  const Details::EDistributorSendType sendType = plan.getSendType();
  if ((sendType == Details::DISTRIBUTOR_ALLTOALL) || (sendType == Details::DISTRIBUTOR_IALLTOFEWV)
#ifdef HAVE_TPETRACORE_MPI_ADVANCE
      || (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) || (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV)
#endif
  ) {
    return;
  }
#endif  // HAVE_TPETRA_MPI

  ProfilingRegion pr("Tpetra::Distributor::doPostRecvs");

  const int myProcID = plan.getComm()->getRank();

  auto& [importStarts, importLengths] = importSubViewLimits;

  // Distributor uses requestsRecv_.size() and requestsSend_.size()
  // as the number of outstanding nonblocking message requests, so
  // we resize to zero to maintain this invariant.
  //
  // getNumReceives() does _not_ include the self message, if there is
  // one. Here, we do actually send a message to ourselves, so we
  // include any self message in the "actual" number of receives to
  // post.
  //
  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
  // doesn't (re)allocate its array of requests. That happens in
  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
  // demand), or Resize_().
  const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
                                      as<size_type>(plan.hasSelfMessage() ? 1 : 0);

#ifdef HAVE_TPETRA_DEBUG
  size_t totalNumImportPackets = 0;
  for (size_t i = 0; i < Teuchos::as<size_t>(actualNumReceives); ++i) {
    totalNumImportPackets += importLengths[i];
  }
  TEUCHOS_TEST_FOR_EXCEPTION(
      imports.extent(0) < totalNumImportPackets, std::runtime_error,
      "Tpetra::Distributor::doPostRecvs: The 'imports' array must have "
      "enough entries to hold the expected number of import packets. "
      "imports.extent(0) = "
          << imports.extent(0) << " < "
             "totalNumImportPackets = "
          << totalNumImportPackets << ".");
  TEUCHOS_TEST_FOR_EXCEPTION(!requestsRecv_.empty(), std::logic_error,
                             "Tpetra::Distributor::"
                             "doPostRecvs: Process "
                                 << myProcID << ": requestsRecv_.size () = "
                                 << requestsRecv_.size() << " != 0.");
#endif  // HAVE_TPETRA_DEBUG

  requestsRecv_.resize(0);

  // Post the nonblocking receives. It's common MPI wisdom to post
  // receives before sends. In MPI terms, this means favoring
  // adding to the "posted queue" (of receive requests) over adding
  // to the "unexpected queue" (of arrived messages not yet matched
  // with a receive).
  {
    ProfilingRegion prr("Tpetra::Distributor::doPostRecvs MPI_Irecv");

    for (size_type i = 0; i < actualNumReceives; ++i) {
      size_t totalPacketsFrom_i = importLengths[Teuchos::as<size_t>(i)];
      TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
                                 std::logic_error,
                                 "Tpetra::Distributor::doPostRecvs: "
                                 "Recv count for receive "
                                     << i << " (" << totalPacketsFrom_i << ") is too large "
                                        "to be represented as int.");
      if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
        // If my process is receiving these packet(s) from another
        // process (not a self-receive), and if there is at least
        // one packet to receive:
        //
        // 1. Set up the persisting view (recvBuf) into the imports
        //    array, given the offset and size (total number of
        //    packets from process getProcsFrom()[i]).
        // 2. Start the Irecv and save the resulting request.
        imports_view_type recvBuf =
            subview_offset(imports, importStarts[i], totalPacketsFrom_i);
        requestsRecv_.push_back(ireceive<int>(recvBuf, plan.getProcsFrom()[i],
                                              mpiTag_, *plan.getComm()));
      }
    }
  }
}

template <class ExpView, class ImpView>
void DistributorActor::doPostSends(const DistributorPlan& plan,
                                   const ExpView& exports,
                                   size_t numPackets,
                                   const ImpView& imports) {
  auto exportSubViewLimits = plan.getExportViewLimits(numPackets);
  auto importSubViewLimits = plan.getImportViewLimits(numPackets);
  doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
}

template <class ExpView, class ImpView>
void DistributorActor::doPostSends(const DistributorPlan& plan,
                                   const ExpView& exports,
                                   const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
                                   const ImpView& imports,
                                   const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
  auto exportSubViewLimits = plan.getExportViewLimits(numExportPacketsPerLID);
  auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
  doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
}

template <class ExpView, class ImpView>
void DistributorActor::doPostSendsImpl(const DistributorPlan& plan,
                                       const ExpView& exports,
                                       const SubViewLimits& exportSubViewLimits,
                                       const ImpView& imports,
                                       const SubViewLimits& importSubViewLimits) {
  static_assert(areKokkosViews<ExpView, ImpView>,
                "Data arrays for DistributorActor::doPostSends must be Kokkos::Views");
  using Kokkos::Compat::deep_copy_offset;
  using Kokkos::Compat::subview_offset;
  using Teuchos::Array;
  using Teuchos::as;
  using Teuchos::isend;
  using Teuchos::send;
  using size_type = Array<size_t>::size_type;
  using exports_view_type = ExpView;

#ifdef KOKKOS_ENABLE_CUDA
  static_assert(!std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
                    !std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
                "Please do not use Tpetra::Distributor with UVM allocations. "
                "See Trilinos GitHub issue #1088.");
#endif  // KOKKOS_ENABLE_CUDA

#ifdef KOKKOS_ENABLE_SYCL
  static_assert(!std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
                    !std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
                "Please do not use Tpetra::Distributor with SharedUSM allocations. "
                "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
#endif  // KOKKOS_ENABLE_SYCL

  ProfilingRegion ps("Tpetra::Distributor::doPostSends");

  const int myRank = plan.getComm()->getRank();
  // Run-time configurable parameters that come from the input
  // ParameterList set by setParameterList().
  const Details::EDistributorSendType sendType = plan.getSendType();

  auto& [exportStarts, exportLengths] = exportSubViewLimits;
  auto& [importStarts, importLengths] = importSubViewLimits;

#if defined(HAVE_TPETRA_MPI)
  // All-to-all communication layout is quite different from
  // point-to-point, so we handle it separately.

  if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
    doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
    return;
  } else if (sendType == Details::DISTRIBUTOR_IALLTOFEWV) {
    doPostsIalltofewvImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
    return;
  }
#ifdef HAVE_TPETRACORE_MPI_ADVANCE
  else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
    doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
    return;
  } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
    doPostsNbrAllToAllVImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
    return;
  }
#endif  // defined(HAVE_TPETRACORE_MPI_ADVANCE)

#else  // HAVE_TPETRA_MPI
  if (plan.hasSelfMessage()) {
    // This is how we "send a message to ourself": we copy from
    // the export buffer to the import buffer. That saves
    // Teuchos::Comm implementations other than MpiComm (in
    // particular, SerialComm) the trouble of implementing self
    // messages correctly. (To do this right, SerialComm would
    // need internal buffer space for messages, keyed on the
    // message's tag.)
    size_t selfReceiveOffset = 0;
    deep_copy_offset(imports, exports, selfReceiveOffset,
                     exportStarts[0],
                     exportLengths[0]);
  }
  // should we just return here?
  // likely not as comm could be a serial comm
#endif  // HAVE_TPETRA_MPI

  size_t selfReceiveOffset = 0;

#ifdef HAVE_TPETRA_DEBUG
  TEUCHOS_TEST_FOR_EXCEPTION(requestsSend_.size() != 0,
                             std::logic_error,
                             "Tpetra::Distributor::doPostSends: Process "
                                 << myRank << ": requestsSend_.size() = " << requestsSend_.size() << " != 0.");
#endif  // HAVE_TPETRA_DEBUG

  // Distributor uses requestsRecv_.size() and requestsSend_.size()
  // as the number of outstanding nonblocking message requests, so
  // we resize to zero to maintain this invariant.
  //
  // getNumReceives() does _not_ include the self message, if there is
  // one. Here, we do actually send a message to ourselves, so we
  // include any self message in the "actual" number of receives to
  // post.
  //
  // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
  // doesn't (re)allocate its array of requests. That happens in
  // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
  // demand), or Resize_().
  const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
                                      as<size_type>(plan.hasSelfMessage() ? 1 : 0);
  requestsSend_.resize(0);

  {
    for (size_type i = 0; i < actualNumReceives; ++i) {
      if (plan.getProcsFrom()[i] == myRank) {  // Receiving from myself
        selfReceiveOffset = importStarts[i];   // Remember the self-recv offset
      }
    }
  }

  ProfilingRegion pss("Tpetra::Distributor::doPostSends sends");

  // setup scan through getProcsTo() list starting with higher numbered procs
  // (should help balance message traffic)
  //
  // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
  // It doesn't depend on the input at all.
  size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
  size_t procIndex = 0;
  while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
    ++procIndex;
  }
  if (procIndex == numBlocks) {
    procIndex = 0;
  }
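
  // Worked example (illustrative, hypothetical values): with
  // plan.getProcsTo() = {0, 2, 5, 7} and myRank = 4, the scan above stops at
  // procIndex = 2, the first destination whose rank is >= 4 (proc 5). The
  // send loops below then visit destinations in the wrapped order 5, 7, 0, 2
  // via "p = i + procIndex" with the wrap-around adjustment, so lower-ranked
  // processes are contacted last rather than first.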

  size_t selfNum = 0;
  size_t selfIndex = 0;

  if (plan.getIndicesTo().is_null()) {
    const char isend_region[] = "Tpetra::Distributor::doPostSends MPI_Isend FAST";
    const char send_region[] = "Tpetra::Distributor::doPostSends MPI_Send FAST";
    ProfilingRegion pssf((sendType == Details::DISTRIBUTOR_ISEND) ? isend_region : send_region);

    // Data are already blocked (laid out) by process, so we don't
    // need a separate send buffer (besides the exports array).
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        if (exportLengths[p] == 0) {
          // Do not attempt to send messages of length 0.
          continue;
        }

        exports_view_type tmpSend = subview_offset(exports, exportStarts[p], exportLengths[p]);

        if (sendType == Details::DISTRIBUTOR_ISEND) {
          // NOTE: This looks very similar to the tmpSend above, but removing
          // tmpSendBuf and using tmpSend instead leads to a performance hit
          // on Arm SerialNode builds.
          exports_view_type tmpSendBuf =
              subview_offset(exports, exportStarts[p], exportLengths[p]);
          requestsSend_.push_back(isend<int>(tmpSendBuf, plan.getProcsTo()[p],
                                             mpiTag_, *plan.getComm()));
        } else {  // DISTRIBUTOR_SEND
          send<int>(tmpSend,
                    as<int>(tmpSend.size()),
                    plan.getProcsTo()[p], mpiTag_, *plan.getComm());
        }
      } else {  // "Sending" the message to myself
        selfNum = p;
      }
    }

    if (plan.hasSelfMessage()) {
      // This is how we "send a message to ourself": we copy from
      // the export buffer to the import buffer. That saves
      // Teuchos::Comm implementations other than MpiComm (in
      // particular, SerialComm) the trouble of implementing self
      // messages correctly. (To do this right, SerialComm would
      // need internal buffer space for messages, keyed on the
      // message's tag.)
      deep_copy_offset(imports, exports, selfReceiveOffset,
                       exportStarts[selfNum], exportLengths[selfNum]);
    }

  } else {  // data are not blocked by proc, use send buffer
    ProfilingRegion psss("Tpetra::Distributor::doPostSends: MPI_Send SLOW");

    using Packet = typename ExpView::non_const_value_type;
    using Layout = typename ExpView::array_layout;
    using Device = typename ExpView::device_type;
    using Mem = typename ExpView::memory_traits;

    // This buffer is long enough for only one message at a time.
    // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
    // of sendType requested by user.
    // This code path formerly errored out with message:
    //     Tpetra::Distributor::doPosts(3 args):
    //     The "send buffer" code path
    //     doesn't currently work with nonblocking sends.
    // Now, we opt to just do the communication in a way that works.
#ifdef HAVE_TPETRA_DEBUG
    if (sendType != Details::DISTRIBUTOR_SEND) {
      if (plan.getComm()->getRank() == 0)
        std::cout << "The requested Tpetra send type "
                  << DistributorSendTypeEnumToString(sendType)
                  << " requires Distributor data to be ordered by"
                  << " the receiving processor rank. Since these"
                  << " data are not ordered, Tpetra will use Send"
                  << " instead." << std::endl;
    }
#endif

    size_t maxSendLength = 0;
    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      size_t sendArrayOffset = 0;
      size_t j = plan.getStartsTo()[p];
      for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
        sendArrayOffset += exportLengths[j];
      }
      maxSendLength = std::max(maxSendLength, sendArrayOffset);
    }
    Kokkos::View<Packet*, Layout, Device, Mem> sendArray("sendArray", maxSendLength);

    for (size_t i = 0; i < numBlocks; ++i) {
      size_t p = i + procIndex;
      if (p > (numBlocks - 1)) {
        p -= numBlocks;
      }

      if (plan.getProcsTo()[p] != myRank) {
        size_t sendArrayOffset = 0;
        size_t j = plan.getStartsTo()[p];
        for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
          packOffset(sendArray, exports, sendArrayOffset, exportStarts[j], exportLengths[j]);
          sendArrayOffset += exportLengths[j];
        }
        typename ExpView::execution_space().fence();

        ImpView tmpSend =
            subview_offset(sendArray, size_t(0), sendArrayOffset);

        send<int>(tmpSend,
                  as<int>(tmpSend.size()),
                  plan.getProcsTo()[p], mpiTag_, *plan.getComm());
      } else {  // "Sending" the message to myself
        selfNum = p;
        selfIndex = plan.getStartsTo()[p];
      }
    }

    if (plan.hasSelfMessage()) {
      for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
        packOffset(imports, exports, selfReceiveOffset, exportStarts[selfIndex], exportLengths[selfIndex]);
        selfReceiveOffset += exportLengths[selfIndex];
        ++selfIndex;
      }
    }
  }
}

}  // namespace Tpetra::Details

#endif