Tpetra_Details_DistributorActor.hpp
1// @HEADER
2// *****************************************************************************
3// Tpetra: Templated Linear Algebra Services Package
4//
5// Copyright 2008 NTESS and the Tpetra contributors.
6// SPDX-License-Identifier: BSD-3-Clause
7// *****************************************************************************
8// @HEADER
9
10#ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
11#define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
12
13#include <optional>
14#include <vector>
15
16#include "Teuchos_Assert.hpp"
18#include "Tpetra_Util.hpp"
19
20#include "Teuchos_Array.hpp"
21#include "Teuchos_Comm.hpp"
24#include "Teuchos_RCP.hpp"
25
26#include "Kokkos_TeuchosCommAdapters.hpp"
27
28#ifdef HAVE_TPETRA_MPI
29#include "mpi.h"
30#include "Tpetra_Details_Ialltofewv.hpp"
31#endif
32
33namespace Tpetra::Details {
34
35template <class View>
36constexpr bool isKokkosView = Kokkos::is_view<View>::value;
37
38template <class View1, class View2>
39constexpr bool areKokkosViews = Kokkos::is_view<View1>::value && Kokkos::is_view<View2>::value;
40
41class DistributorActor {
42 using IndexView = DistributorPlan::IndexView;
43 using SubViewLimits = DistributorPlan::SubViewLimits;
44
45 public:
46 static constexpr int DEFAULT_MPI_TAG = 1;
47
48 DistributorActor();
49 DistributorActor(const DistributorActor& otherActor) = default;
50
51 template <class ExpView, class ImpView>
52 void doPostsAndWaits(const DistributorPlan& plan,
53 const ExpView& exports,
54 size_t numPackets,
55 const ImpView& imports);
56
57 template <class ExpView, class ImpView>
58 void doPostsAndWaits(const DistributorPlan& plan,
59 const ExpView& exports,
60 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
61 const ImpView& imports,
62 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
63
64 template <class ImpView>
65 void doPostRecvs(const DistributorPlan& plan,
66 size_t numPackets,
67 const ImpView& imports);
68
69 template <class ImpView>
70 void doPostRecvs(const DistributorPlan& plan,
71 const ImpView& imports,
72 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
73
74 template <class ExpView, class ImpView>
75 void doPostSends(const DistributorPlan& plan,
76 const ExpView& exports,
77 size_t numPackets,
78 const ImpView& imports);
79
80 template <class ExpView, class ImpView>
81 void doPostSends(const DistributorPlan& plan,
82 const ExpView& exports,
83 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
84 const ImpView& imports,
85 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
86
87 template <class ExpView, class ImpView>
88 void doPosts(const DistributorPlan& plan,
89 const ExpView& exports,
90 size_t numPackets,
91 const ImpView& imports);
92
93 template <class ExpView, class ImpView>
94 void doPosts(const DistributorPlan& plan,
95 const ExpView& exports,
96 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
97 const ImpView& imports,
98 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
99
100 void doWaits(const DistributorPlan& plan);
101
102 void doWaitsRecv(const DistributorPlan& plan);
103
104 void doWaitsSend(const DistributorPlan& plan);
105
106 void doWaitsIalltofewv(const DistributorPlan& plan);
107
108 bool isReady() const;
109
110 int getMpiTag() const { return mpiTag_; }
111
112 private:
113 template <class ImpView>
114 void doPostRecvsImpl(const DistributorPlan& plan,
115 const ImpView& imports,
116 const SubViewLimits& totalPacketsFrom);
117
118 template <class ExpView, class ImpView>
119 void doPostSendsImpl(const DistributorPlan& plan,
120 const ExpView& exports,
121 const SubViewLimits& exportSubViewLimits,
122 const ImpView& imports,
123 const SubViewLimits& importSubViewLimits);
124
125#ifdef HAVE_TPETRA_MPI
126 template <class ExpView, class ImpView>
127 void doPostsAllToAllImpl(const DistributorPlan& plan,
128 const ExpView& exports,
129 const SubViewLimits& exportSubViewLimits,
130 const ImpView& imports,
131 const SubViewLimits& importSubViewLimits);
132
133#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
134 template <class ExpView, class ImpView>
135 void doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
136 const ExpView& exports,
137 const SubViewLimits& exportSubViewLimits,
138 const ImpView& imports,
139 const SubViewLimits& importSubViewLimits);
140#endif // HAVE_TPETRACORE_MPI_ADVANCE
141
142 template <typename ExpView, typename ImpView>
143 void doPostsIalltofewvImpl(const DistributorPlan& plan,
144 const ExpView& exports,
145 const SubViewLimits& exportSubViewLimits,
146 const ImpView& imports,
147 const SubViewLimits& importSubViewLimits);
148
149 // ialltofewv members
150 struct {
151 Details::Ialltofewv impl;
152 std::optional<Details::Ialltofewv::Req> req;
153 Teuchos::RCP<std::vector<int>> sendcounts;
154 Teuchos::RCP<std::vector<int>> sdispls;
155 Teuchos::RCP<std::vector<int>> recvcounts;
156 Teuchos::RCP<std::vector<int>> rdispls;
157 std::vector<int> roots;
158 } ialltofewv_;
159
160#endif // HAVE_TPETRA_MPI
161
162 int mpiTag_;
163
164 Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsRecv_;
165 Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsSend_;
166};
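// Illustrative usage sketch (not part of the original header): it assumes an
// already-constructed DistributorPlan `plan` and Kokkos::Views `exports` and
// `imports` sized according to that plan; all names here are hypothetical.
//
//   Tpetra::Details::DistributorActor actor;
//   constexpr size_t numPackets = 1; // packets per LID
//
//   // One-shot: post receives and sends, then block until both complete.
//   actor.doPostsAndWaits(plan, exports, numPackets, imports);
//
//   // Or split the phases to overlap communication with local work:
//   actor.doPostRecvs(plan, numPackets, imports);
//   actor.doPostSends(plan, exports, numPackets, imports);
//   // ... local computation that does not touch exports/imports ...
//   actor.doWaits(plan);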
167
168template <class ExpView, class ImpView>
169void DistributorActor::doPosts(const DistributorPlan& plan,
170 const ExpView& exports,
171 size_t numPackets,
172 const ImpView& imports) {
173 doPostRecvs(plan, numPackets, imports);
174 doPostSends(plan, exports, numPackets, imports);
175}
176
177template <class ExpView, class ImpView>
178void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
179 const ExpView& exports,
180 size_t numPackets,
181 const ImpView& imports) {
182 static_assert(areKokkosViews<ExpView, ImpView>,
183 "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
184 doPosts(plan, exports, numPackets, imports);
185 doWaits(plan);
186}
187
188template <class ExpView, class ImpView>
189void DistributorActor::doPosts(const DistributorPlan& plan,
190 const ExpView& exports,
191 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
192 const ImpView& imports,
193 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
194 doPostRecvs(plan, imports, numImportPacketsPerLID);
195 doPostSends(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
196}
197
198template <class ExpView, class ImpView>
199void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
200 const ExpView& exports,
201 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
202 const ImpView& imports,
203 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
204 static_assert(areKokkosViews<ExpView, ImpView>,
205 "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
206 doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
207 doWaits(plan);
208}
209
210template <typename ViewType>
211using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;
212
213template <typename DstViewType, typename SrcViewType>
214using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
215 HostAccessibility<SrcViewType>::accessible>;
216
217template <typename DstViewType, typename SrcViewType>
218using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
219 !HostAccessibility<SrcViewType>::accessible>;
220
221template <typename DstViewType, typename SrcViewType>
222enableIfHostAccessible<DstViewType, SrcViewType>
223packOffset(const DstViewType& dst,
224 const SrcViewType& src,
225 const size_t dst_offset,
226 const size_t src_offset,
227 const size_t size) {
228 memcpy((void*)(dst.data() + dst_offset), src.data() + src_offset, size * sizeof(typename DstViewType::value_type));
229}
230
231template <typename DstViewType, typename SrcViewType>
232enableIfNotHostAccessible<DstViewType, SrcViewType>
233packOffset(const DstViewType& dst,
234 const SrcViewType& src,
235 const size_t dst_offset,
236 const size_t src_offset,
237 const size_t size) {
238 Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
239}
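// A minimal sketch of what packOffset does, assuming host-accessible Views
// (so the memcpy overload above would be selected); the views and values
// below are hypothetical.
//
//   Kokkos::View<double*, Kokkos::HostSpace> dst("dst", 8), src("src", 8);
//   // packOffset(dst, src, /*dst_offset=*/2, /*src_offset=*/5, /*size=*/3)
//   // copies src(5..7) into dst(2..4), equivalent to:
//   for (size_t k = 0; k < 3; ++k) {
//     dst(2 + k) = src(5 + k);
//   }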
240
241#ifdef HAVE_TPETRA_MPI
242
243template <class ExpView, class ImpView>
244void DistributorActor::doPostsIalltofewvImpl(const DistributorPlan& plan,
245 const ExpView& exports,
246 const SubViewLimits& exportSubViewLimits,
247 const ImpView& imports,
248 const SubViewLimits& importSubViewLimits) {
249 using size_type = Teuchos::Array<size_t>::size_type;
250 using ExportValue = typename ExpView::non_const_value_type;
251
252 ProfilingRegion pr("Tpetra::Distributor::doPostsIalltofewvImpl");
253
254 TEUCHOS_TEST_FOR_EXCEPTION(
255 !plan.getIndicesTo().is_null(), std::runtime_error,
256 "Send Type=\"Ialltofewv\" only works for fast-path communication.");
257
258 TEUCHOS_TEST_FOR_EXCEPTION(
259 bool(ialltofewv_.req), std::runtime_error,
260 "This actor has an active Ialltofewv already");
261
262 TEUCHOS_TEST_FOR_EXCEPTION(
263 bool(ialltofewv_.sendcounts), std::runtime_error,
264 "This actor has an active Ialltofewv already");
265
266 TEUCHOS_TEST_FOR_EXCEPTION(
267 bool(ialltofewv_.sdispls), std::runtime_error,
268 "This actor has an active Ialltofewv already");
269
270 TEUCHOS_TEST_FOR_EXCEPTION(
271 bool(ialltofewv_.recvcounts), std::runtime_error,
272 "This actor has an active Ialltofewv already");
273
274 TEUCHOS_TEST_FOR_EXCEPTION(
275 bool(ialltofewv_.rdispls), std::runtime_error,
276 "This actor has an active Ialltofewv already");
277
278 auto comm = plan.getComm();
279
280 const auto& [importStarts, importLengths] = importSubViewLimits;
281 const auto& [exportStarts, exportLengths] = exportSubViewLimits;
282
283 ialltofewv_.roots = plan.getRoots();
284 const int nroots = ialltofewv_.roots.size();
285 const int* roots = ialltofewv_.roots.data();
286 ialltofewv_.req = std::make_optional<Details::Ialltofewv::Req>();
287 ialltofewv_.sendcounts = Teuchos::rcp(new std::vector<int>(nroots));
288 ialltofewv_.sdispls = Teuchos::rcp(new std::vector<int>(nroots));
289 ialltofewv_.recvcounts = Teuchos::rcp(new std::vector<int>);
290 ialltofewv_.rdispls = Teuchos::rcp(new std::vector<int>);
291
292 for (int rootIdx = 0; rootIdx < nroots; ++rootIdx) {
293 const int root = roots[rootIdx];
294
295 // If we can't find the root proc's index in our plan, it just means we send 0 entries to it.
296 // We assume each root appears at most once in getProcsTo(), so the first match is the only one.
297 size_type rootProcIndex = plan.getProcsTo().size(); // sentinel value -> not found
298 for (size_type pi = 0; pi < plan.getProcsTo().size(); ++pi) {
299 if (plan.getProcsTo()[pi] == root) {
300 rootProcIndex = pi;
301 break;
302 }
303 }
304
305 // am I sending to root?
306 int sendcount = 0;
307 if (rootProcIndex != plan.getProcsTo().size()) {
308 sendcount = exportLengths[rootProcIndex];
309 }
310 (*ialltofewv_.sendcounts)[rootIdx] = sendcount;
311
312 int sdispl = 0;
313 if (0 != sendcount) {
314 sdispl = exportStarts[rootProcIndex];
315 }
316 (*ialltofewv_.sdispls)[rootIdx] = sdispl;
317
318 // If I happen to be this root, set up my receive metadata
319 if (comm->getRank() == root) {
320 // don't recv anything from anywhere by default
321 ialltofewv_.recvcounts->resize(comm->getSize());
322 std::fill(ialltofewv_.recvcounts->begin(), ialltofewv_.recvcounts->end(), 0);
323 ialltofewv_.rdispls->resize(comm->getSize());
324 std::fill(ialltofewv_.rdispls->begin(), ialltofewv_.rdispls->end(), 0);
325
326 const size_type actualNumReceives =
327 Teuchos::as<size_type>(plan.getNumReceives()) +
328 Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
329
330 for (size_type i = 0; i < actualNumReceives; ++i) {
331 const int src = plan.getProcsFrom()[i];
332 (*ialltofewv_.rdispls)[src] = importStarts[i];
333 (*ialltofewv_.recvcounts)[src] = Teuchos::as<int>(importLengths[i]);
334 }
335 }
336
337 } // rootIdx
338
339 // TODO: do we need to pass ExportValue{} here?
340 MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<ExportValue>::getType(ExportValue{});
341 // FIXME: is there a better way to do this?
342 Teuchos::RCP<const Teuchos::MpiComm<int>> tMpiComm =
343 Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
344 Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> oMpiComm =
345 tMpiComm->getRawMpiComm();
346 MPI_Comm mpiComm = (*oMpiComm)();
347
348 // We don't check send-side accessibility because the send buffer is not accessed
349 // through Kokkos here; we rely on MPI to handle its memory space correctly.
350 constexpr bool recvDevAccess = Kokkos::SpaceAccessibility<
351 Kokkos::DefaultExecutionSpace, typename ImpView::memory_space>::accessible;
352 constexpr bool sendDevAccess = Kokkos::SpaceAccessibility<
353 Kokkos::DefaultExecutionSpace, typename ExpView::memory_space>::accessible;
354 static_assert(recvDevAccess == sendDevAccess, "export and import views must both be device-accessible or both be host-only");
355
356 const int err = ialltofewv_.impl.post<recvDevAccess>(exports.data(), ialltofewv_.sendcounts->data(), ialltofewv_.sdispls->data(), rawType,
357 imports.data(), ialltofewv_.recvcounts->data(), ialltofewv_.rdispls->data(),
358 roots, nroots,
359 rawType,
360 mpiTag_, mpiComm, &(*ialltofewv_.req));
361
362 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
363 "ialltofewv failed with error \""
364 << Teuchos::mpiErrorCodeToString(err)
365 << "\".");
366}
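// A worked example of the count/displacement setup above, with hypothetical
// ranks: suppose roots = {2, 5}, and this rank sends 4 packets to rank 5
// starting at export offset 10 and nothing to rank 2. Then sendcounts = {0, 4}
// and sdispls = {0, 10}; both are indexed by root *index*, not by rank. Only
// the root ranks themselves fill recvcounts/rdispls, which are indexed by
// source rank and sized comm->getSize().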
367
368template <class ExpView, class ImpView>
369void DistributorActor::doPostsAllToAllImpl(const DistributorPlan& plan,
370 const ExpView& exports,
371 const SubViewLimits& exportSubViewLimits,
372 const ImpView& imports,
373 const SubViewLimits& importSubViewLimits) {
374 TEUCHOS_TEST_FOR_EXCEPTION(
375 !plan.getIndicesTo().is_null(), std::runtime_error,
376 "Send Type=\"Alltoall\" only works for fast-path communication.");
377
378 using size_type = Teuchos::Array<size_t>::size_type;
379
380 auto comm = plan.getComm();
381 std::vector<int> sendcounts(comm->getSize(), 0);
382 std::vector<int> sdispls(comm->getSize(), 0);
383 std::vector<int> recvcounts(comm->getSize(), 0);
384 std::vector<int> rdispls(comm->getSize(), 0);
385
386 auto& [importStarts, importLengths] = importSubViewLimits;
387 auto& [exportStarts, exportLengths] = exportSubViewLimits;
388
389 for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
390 sdispls[plan.getProcsTo()[pp]] = exportStarts[pp];
391 size_t numPackets = exportLengths[pp];
392 // numPackets is converted down to int, so make sure it can be represented
393 TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
394 "Tpetra::Distributor::doPostsAllToAll: "
395 "Send count for send "
396 << pp << " (" << numPackets
397 << ") is too large "
398 "to be represented as int.");
399 sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
400 }
401
402 const size_type actualNumReceives =
403 Teuchos::as<size_type>(plan.getNumReceives()) +
404 Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
405
406 for (size_type i = 0; i < actualNumReceives; ++i) {
407 rdispls[plan.getProcsFrom()[i]] = importStarts[i];
408 size_t totalPacketsFrom_i = importLengths[i];
409 // totalPacketsFrom_i is converted down to int, so make sure it can be
410 // represented
411 TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
412 std::logic_error,
413 "Tpetra::Distributor::doPostsAllToAll: "
414 "Recv count for receive "
415 << i << " (" << totalPacketsFrom_i
416 << ") is too large "
417 "to be represented as int.");
418 recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
419 }
420
421 Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
422 Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
423 Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
424 mpiComm->getRawMpiComm();
425 using T = typename ExpView::non_const_value_type;
426 MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
427
428#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
429 if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
430 MPIX_Comm* mpixComm = *plan.getMPIXComm();
431 TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
432 "MPIX_Comm is null in doPostsAllToAll \""
433 << __FILE__ << ":" << __LINE__);
434
435 const int err = MPIX_Alltoallv(
436 exports.data(), sendcounts.data(), sdispls.data(), rawType,
437 imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
438
439 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
440 "MPIX_Alltoallv failed with error \""
441 << Teuchos::mpiErrorCodeToString(err)
442 << "\".");
443
444 return;
445 }
446#endif // HAVE_TPETRACORE_MPI_ADVANCE
447
448 const int err = MPI_Alltoallv(
449 exports.data(), sendcounts.data(), sdispls.data(), rawType,
450 imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
451
452 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
453 "MPI_Alltoallv failed with error \""
454 << Teuchos::mpiErrorCodeToString(err)
455 << "\".");
456}
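// For reference, the MPI_Alltoallv calling convention relied on above, as a
// minimal standalone sketch (buffers and communicator are hypothetical):
//
//   // Each rank sends sendcounts[p] entries starting at sdispls[p] to rank p,
//   // and receives recvcounts[p] entries at offset rdispls[p] from rank p.
//   MPI_Alltoallv(sendbuf, sendcounts.data(), sdispls.data(), MPI_DOUBLE,
//                 recvbuf, recvcounts.data(), rdispls.data(), MPI_DOUBLE,
//                 MPI_COMM_WORLD);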
457
458#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
459template <class ExpView, class ImpView>
460void DistributorActor::doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
461 const ExpView& exports,
462 const SubViewLimits& exportSubViewLimits,
463 const ImpView& imports,
464 const SubViewLimits& importSubViewLimits) {
465 TEUCHOS_TEST_FOR_EXCEPTION(
466 !plan.getIndicesTo().is_null(), std::runtime_error,
467 "Send Type=\"Alltoall\" only works for fast-path communication.");
468
469 const int myRank = plan.getComm()->getRank();
470 MPIX_Comm* mpixComm = *plan.getMPIXComm();
471 using size_type = Teuchos::Array<size_t>::size_type;
472
473 const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
474 const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
475 std::vector<int> sendcounts(numSends, 0);
476 std::vector<int> sdispls(numSends, 0);
477 std::vector<int> recvcounts(numRecvs, 0);
478 std::vector<int> rdispls(numRecvs, 0);
479
480 auto& [importStarts, importLengths] = importSubViewLimits;
481 auto& [exportStarts, exportLengths] = exportSubViewLimits;
482
483 for (size_t pp = 0; pp < numSends; ++pp) {
484 sdispls[pp] = exportStarts[pp];
485 size_t numPackets = exportLengths[pp];
486 // numPackets is converted down to int, so make sure it can be represented
487 TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
488 "Tpetra::Distributor::doPostsNbrAllToAllV: "
489 "Send count for send "
490 << pp << " (" << numPackets
491 << ") is too large "
492 "to be represented as int.");
493 sendcounts[pp] = static_cast<int>(numPackets);
494 }
495
496 for (size_type i = 0; i < numRecvs; ++i) {
497 rdispls[i] = importStarts[i];
498 size_t totalPacketsFrom_i = importLengths[i];
499 // totalPacketsFrom_i is converted down to int, so make sure it can be
500 // represented
501 TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
502 std::logic_error,
503 "Tpetra::Distributor::doPostsNbrAllToAllV: "
504 "Recv count for receive "
505 << i << " (" << totalPacketsFrom_i
506 << ") is too large "
507 "to be represented as int.");
508 recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
509 }
510
511 using T = typename ExpView::non_const_value_type;
512 MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
513
514 MPIX_Info* xinfo;
515 MPIX_Topo* xtopo;
516 MPIX_Info_init(&xinfo);
517 MPIX_Topo_init(numRecvs, plan.getProcsFrom().data(), recvcounts.data(),
518 numSends, plan.getProcsTo().data(), sendcounts.data(), xinfo, &xtopo);
519 const int err = MPIX_Neighbor_alltoallv_topo(
520 exports.data(), sendcounts.data(), sdispls.data(), rawType,
521 imports.data(), recvcounts.data(), rdispls.data(), rawType, xtopo, mpixComm);
522 MPIX_Topo_free(&xtopo);
523 MPIX_Info_free(&xinfo);
524
525 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
526 "MPIX_Neighbor_alltoallv failed with error \""
527 << Teuchos::mpiErrorCodeToString(err)
528 << "\".");
529}
530#endif // HAVE_TPETRACORE_MPI_ADVANCE
531#endif // HAVE_TPETRA_MPI
532
533template <class ImpView>
534void DistributorActor::doPostRecvs(const DistributorPlan& plan,
535 size_t numPackets,
536 const ImpView& imports) {
537 auto importSubViewLimits = plan.getImportViewLimits(numPackets);
538 doPostRecvsImpl(plan, imports, importSubViewLimits);
539}
540
541template <class ImpView>
542void DistributorActor::doPostRecvs(const DistributorPlan& plan,
543 const ImpView& imports,
544 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
545 auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
546 doPostRecvsImpl(plan, imports, importSubViewLimits);
547}
548
549template <class ImpView>
550void DistributorActor::doPostRecvsImpl(const DistributorPlan& plan,
551 const ImpView& imports,
552 const SubViewLimits& importSubViewLimits) {
553 static_assert(isKokkosView<ImpView>,
554 "Data arrays for DistributorActor::doPostRecvs must be Kokkos::Views");
555 using Kokkos::Compat::subview_offset;
556 using Teuchos::Array;
557 using Teuchos::as;
558 using Teuchos::ireceive;
559 using size_type = Array<size_t>::size_type;
560 using imports_view_type = ImpView;
561
562 // Set the MPI message tag for this round of communication.
563 // The same tag will be used for recvs and sends. For every round of communication,
564 // the tag gets incremented by one, until it eventually wraps around back to a
565 // small value. This logic is implemented in Teuchos.
566 auto comm = plan.getComm();
567 {
568 auto non_const_comm = Teuchos::rcp_const_cast<Teuchos::Comm<int>>(comm);
569 mpiTag_ = non_const_comm->incrementTag();
570 }
571
572#ifdef KOKKOS_ENABLE_CUDA
573 static_assert(!std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
574 "Please do not use Tpetra::Distributor with UVM "
575 "allocations. See GitHub issue #1088.");
576#endif // KOKKOS_ENABLE_CUDA
577
578#ifdef KOKKOS_ENABLE_SYCL
579 static_assert(!std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
580 "Please do not use Tpetra::Distributor with SharedUSM "
581 "allocations. See GitHub issue #1088 (corresponding to CUDA).");
582#endif // KOKKOS_ENABLE_SYCL
583
584#if defined(HAVE_TPETRA_MPI)
585 // All-to-all communication layout is quite different from
586 // point-to-point, so we handle it separately.
587
588 // These send options require no matching receives, so we just return.
589 const Details::EDistributorSendType sendType = plan.getSendType();
590 if ((sendType == Details::DISTRIBUTOR_ALLTOALL) || (sendType == Details::DISTRIBUTOR_IALLTOFEWV)
591#ifdef HAVE_TPETRACORE_MPI_ADVANCE
592 || (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) || (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV)
593#endif
594 ) {
595 return;
596 }
597#endif // HAVE_TPETRA_MPI
598
599 ProfilingRegion pr("Tpetra::Distributor::doPostRecvs");
600
601 const int myProcID = plan.getComm()->getRank();
602
603 auto& [importStarts, importLengths] = importSubViewLimits;
604
605 // Distributor uses requestsRecv_.size() and requestsSend_.size()
606 // as the number of outstanding nonblocking message requests, so
607 // we resize to zero to maintain this invariant.
608 //
609 // getNumReceives() does _not_ include the self message, if there is
610 // one. Here, we do actually send a message to ourselves, so we
611 // include any self message in the "actual" number of receives to
612 // post.
613 //
614 // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
615 // doesn't (re)allocate its array of requests. That happens in
616 // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
617 // demand), or Resize_().
618 const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
619 as<size_type>(plan.hasSelfMessage() ? 1 : 0);
620
621#ifdef HAVE_TPETRA_DEBUG
622 size_t totalNumImportPackets = 0;
623 for (size_t i = 0; i < Teuchos::as<size_t>(actualNumReceives); ++i) {
624 totalNumImportPackets += importLengths[i];
625 }
626 TEUCHOS_TEST_FOR_EXCEPTION(
627 imports.extent(0) < totalNumImportPackets, std::runtime_error,
628 "Tpetra::Distributor::doPostRecvs: The 'imports' array must have "
629 "enough entries to hold the expected number of import packets. "
630 "imports.extent(0) = "
631 << imports.extent(0) << " < "
632 "totalNumImportPackets = "
633 << totalNumImportPackets << ".");
634 TEUCHOS_TEST_FOR_EXCEPTION(!requestsRecv_.empty(), std::logic_error,
635 "Tpetra::Distributor::"
636 "doPostRecvs: Process "
637 << myProcID << ": requestsRecv_.size () = "
638 << requestsRecv_.size() << " != 0.");
639#endif // HAVE_TPETRA_DEBUG
640
641 requestsRecv_.resize(0);
642
643 // Post the nonblocking receives. It's common MPI wisdom to post
644 // receives before sends. In MPI terms, this means favoring
645 // adding to the "posted queue" (of receive requests) over adding
646 // to the "unexpected queue" (of arrived messages not yet matched
647 // with a receive).
648 {
649 ProfilingRegion prr("Tpetra::Distributor::doPostRecvs MPI_Irecv");
650
651 for (size_type i = 0; i < actualNumReceives; ++i) {
652 size_t totalPacketsFrom_i = importLengths[Teuchos::as<size_t>(i)];
653 TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
654 std::logic_error,
655 "Tpetra::Distributor::doPostRecvs: "
656 "Recv count for receive "
657 << i << " (" << totalPacketsFrom_i << ") is too large "
658 "to be represented as int.");
659 if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
660 // If my process is receiving these packet(s) from another
661 // process (not a self-receive), and if there is at least
662 // one packet to receive:
663 //
664 // 1. Set up the persisting view (recvBuf) into the imports
665 // array, given the offset and size (total number of
666 // packets from process getProcsFrom()[i]).
667 // 2. Start the Irecv and save the resulting request.
668 imports_view_type recvBuf =
669 subview_offset(imports, importStarts[i], totalPacketsFrom_i);
670 requestsRecv_.push_back(ireceive<int>(recvBuf, plan.getProcsFrom()[i],
671 mpiTag_, *plan.getComm()));
672 }
673 }
674 }
675}
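// The receive-before-send pattern used above, in miniature (a sketch using the
// same Teuchos wrappers this file uses; buffers, ranks, and tag are hypothetical):
//
//   auto req = Teuchos::ireceive<int>(recvBuf, srcRank, tag, *comm); // post the recv first
//   Teuchos::send<int>(sendBuf, Teuchos::as<int>(sendBuf.size()), dstRank, tag, *comm);
//   Teuchos::wait(*comm, Teuchos::outArg(req)); // complete the posted recv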
676
677template <class ExpView, class ImpView>
678void DistributorActor::doPostSends(const DistributorPlan& plan,
679 const ExpView& exports,
680 size_t numPackets,
681 const ImpView& imports) {
682 auto exportSubViewLimits = plan.getExportViewLimits(numPackets);
683 auto importSubViewLimits = plan.getImportViewLimits(numPackets);
684 doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
685}
686
687template <class ExpView, class ImpView>
688void DistributorActor::doPostSends(const DistributorPlan& plan,
689 const ExpView& exports,
690 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
691 const ImpView& imports,
692 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
693 auto exportSubViewLimits = plan.getExportViewLimits(numExportPacketsPerLID);
694 auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
695 doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
696}
697
698template <class ExpView, class ImpView>
699void DistributorActor::doPostSendsImpl(const DistributorPlan& plan,
700 const ExpView& exports,
701 const SubViewLimits& exportSubViewLimits,
702 const ImpView& imports,
703 const SubViewLimits& importSubViewLimits) {
704 static_assert(areKokkosViews<ExpView, ImpView>,
705 "Data arrays for DistributorActor::doPostSends must be Kokkos::Views");
706 using Kokkos::Compat::deep_copy_offset;
707 using Kokkos::Compat::subview_offset;
708 using Teuchos::Array;
709 using Teuchos::as;
710 using Teuchos::isend;
711 using Teuchos::send;
712 using size_type = Array<size_t>::size_type;
713 using exports_view_type = ExpView;
714
715#ifdef KOKKOS_ENABLE_CUDA
716 static_assert(!std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
717 !std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
718 "Please do not use Tpetra::Distributor with UVM allocations. "
719 "See Trilinos GitHub issue #1088.");
720#endif // KOKKOS_ENABLE_CUDA
721
722#ifdef KOKKOS_ENABLE_SYCL
723 static_assert(!std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
724 !std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
725 "Please do not use Tpetra::Distributor with SharedUSM allocations. "
726 "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
727#endif // KOKKOS_ENABLE_SYCL
728
729 ProfilingRegion ps("Tpetra::Distributor::doPostSends");
730
731 const int myRank = plan.getComm()->getRank();
732 // Run-time configurable parameters that come from the input
733 // ParameterList set by setParameterList().
734 const Details::EDistributorSendType sendType = plan.getSendType();
735
736 auto& [exportStarts, exportLengths] = exportSubViewLimits;
737 auto& [importStarts, importLengths] = importSubViewLimits;
738
739#if defined(HAVE_TPETRA_MPI)
740 // All-to-all communication layout is quite different from
741 // point-to-point, so we handle it separately.
742
743 if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
744 doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
745 return;
746 } else if (sendType == Details::DISTRIBUTOR_IALLTOFEWV) {
747 doPostsIalltofewvImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
748 return;
749 }
750#ifdef HAVE_TPETRACORE_MPI_ADVANCE
751 else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
752 doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
753 return;
754 } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
755 doPostsNbrAllToAllVImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
756 return;
757 }
758#endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)
759
760#else // HAVE_TPETRA_MPI
761 if (plan.hasSelfMessage()) {
762 // This is how we "send a message to ourself": we copy from
763 // the export buffer to the import buffer. That saves
764 // Teuchos::Comm implementations other than MpiComm (in
765 // particular, SerialComm) the trouble of implementing self
766 // messages correctly. (To do this right, SerialComm would
767 // need internal buffer space for messages, keyed on the
768 // message's tag.)
769 size_t selfReceiveOffset = 0;
770 deep_copy_offset(imports, exports, selfReceiveOffset,
771 exportStarts[0],
772 exportLengths[0]);
773 }
774 // Should we just return here?
775 // Probably not, since the comm could be a SerialComm.
776#endif // HAVE_TPETRA_MPI
777
778 size_t selfReceiveOffset = 0;
779
780#ifdef HAVE_TPETRA_DEBUG
781 TEUCHOS_TEST_FOR_EXCEPTION(requestsSend_.size() != 0,
782 std::logic_error,
783 "Tpetra::Distributor::doPostSends: Process "
784 << myRank << ": requestsSend_.size() = " << requestsSend_.size() << " != 0.");
785#endif // HAVE_TPETRA_DEBUG
786
787 // Distributor uses requestsRecv_.size() and requestsSend_.size()
788 // as the number of outstanding nonblocking message requests, so
789 // we resize to zero to maintain this invariant.
790 //
791 // getNumReceives() does _not_ include the self message, if there is
792 // one. Here, we do actually send a message to ourselves, so we
793 // include any self message in the "actual" number of receives to
794 // post.
795 //
796 // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
797 // doesn't (re)allocate its array of requests. That happens in
798 // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
799 // demand), or Resize_().
800 const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
801 as<size_type>(plan.hasSelfMessage() ? 1 : 0);
802 requestsSend_.resize(0);
803
804 {
805 for (size_type i = 0; i < actualNumReceives; ++i) {
806 if (plan.getProcsFrom()[i] == myRank) { // Receiving from myself
807 selfReceiveOffset = importStarts[i]; // Remember the self-recv offset
808 }
809 }
810 }
811
812 ProfilingRegion pss("Tpetra::Distributor::doPostSends sends");
813
814 // Set up a scan through the getProcsTo() list, starting with higher-numbered procs
815 // (this should help balance message traffic).
816 //
817 // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
818 // It doesn't depend on the input at all.
819 size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
820 size_t procIndex = 0;
821 while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
822 ++procIndex;
823 }
824 if (procIndex == numBlocks) {
825 procIndex = 0;
826 }
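 // Example of the resulting visit order (hypothetical ranks): with myRank = 2
 // and getProcsTo() = {0, 1, 3, 4}, procIndex starts at the entry for proc 3,
 // so the send loop below visits procs 3, 4, 0, 1.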
827
828 size_t selfNum = 0;
829 size_t selfIndex = 0;
830
831 if (plan.getIndicesTo().is_null()) {
832 const char isend_region[] = "Tpetra::Distributor::doPostSends MPI_Isend FAST";
833 const char send_region[] = "Tpetra::Distributor::doPostSends MPI_Send FAST";
834 ProfilingRegion pssf((sendType == Details::DISTRIBUTOR_ISEND) ? isend_region : send_region);
835
836 // Data are already blocked (laid out) by process, so we don't
837 // need a separate send buffer (besides the exports array).
838 for (size_t i = 0; i < numBlocks; ++i) {
839 size_t p = i + procIndex;
840 if (p > (numBlocks - 1)) {
841 p -= numBlocks;
842 }
843
844 if (plan.getProcsTo()[p] != myRank) {
845 if (exportLengths[p] == 0) {
846 // Do not attempt to send messages of length 0.
847 continue;
848 }
849
850 exports_view_type tmpSend = subview_offset(exports, exportStarts[p], exportLengths[p]);
851
852 if (sendType == Details::DISTRIBUTOR_ISEND) {
853 // NOTE: This looks very similar to tmpSend above, but removing
854 // tmpSendBuf and using tmpSend instead leads to a performance hit on ARM
855 // SerialNode builds.
856 exports_view_type tmpSendBuf =
857 subview_offset(exports, exportStarts[p], exportLengths[p]);
858 requestsSend_.push_back(isend<int>(tmpSendBuf, plan.getProcsTo()[p],
859 mpiTag_, *plan.getComm()));
860 } else { // DISTRIBUTOR_SEND
861 send<int>(tmpSend,
862 as<int>(tmpSend.size()),
863 plan.getProcsTo()[p], mpiTag_, *plan.getComm());
864 }
865 } else { // "Sending" the message to myself
866 selfNum = p;
867 }
868 }
869
870 if (plan.hasSelfMessage()) {
871 // This is how we "send a message to ourself": we copy from
872 // the export buffer to the import buffer. That saves
873 // Teuchos::Comm implementations other than MpiComm (in
874 // particular, SerialComm) the trouble of implementing self
875 // messages correctly. (To do this right, SerialComm would
876 // need internal buffer space for messages, keyed on the
877 // message's tag.)
878 deep_copy_offset(imports, exports, selfReceiveOffset,
879 exportStarts[selfNum], exportLengths[selfNum]);
880 }
881
882 } else { // data are not blocked by proc, use send buffer
883 ProfilingRegion psss("Tpetra::Distributor::doPostSends: MPI_Send SLOW");
884
885 using Packet = typename ExpView::non_const_value_type;
886 using Layout = typename ExpView::array_layout;
887 using Device = typename ExpView::device_type;
888 using Mem = typename ExpView::memory_traits;
889
890 // This buffer is long enough for only one message at a time.
891 // Thus, we always use DISTRIBUTOR_SEND in this case, regardless
892 // of the sendType requested by the user.
893 // This code path formerly errored out with message:
894 // Tpetra::Distributor::doPosts(3 args):
895 // The "send buffer" code path
896 // doesn't currently work with nonblocking sends.
897 // Now, we opt to just do the communication in a way that works.
898#ifdef HAVE_TPETRA_DEBUG
899 if (sendType != Details::DISTRIBUTOR_SEND) {
900 if (plan.getComm()->getRank() == 0)
901 std::cout << "The requested Tpetra send type "
902 << DistributorSendTypeEnumToString(sendType)
903 << " requires Distributor data to be ordered by"
904 << " the receiving processor rank. Since these"
905 << " data are not ordered, Tpetra will use Send"
906 << " instead." << std::endl;
907 }
908#endif
909
910 size_t maxSendLength = 0;
911 for (size_t i = 0; i < numBlocks; ++i) {
912 size_t p = i + procIndex;
913 if (p > (numBlocks - 1)) {
914 p -= numBlocks;
915 }
916
917 size_t sendArrayOffset = 0;
918 size_t j = plan.getStartsTo()[p];
919 for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
920 sendArrayOffset += exportLengths[j];
921 }
922 maxSendLength = std::max(maxSendLength, sendArrayOffset);
923 }
924 Kokkos::View<Packet*, Layout, Device, Mem> sendArray("sendArray", maxSendLength);
925
926 for (size_t i = 0; i < numBlocks; ++i) {
927 size_t p = i + procIndex;
928 if (p > (numBlocks - 1)) {
929 p -= numBlocks;
930 }
931
932 if (plan.getProcsTo()[p] != myRank) {
933 size_t sendArrayOffset = 0;
934 size_t j = plan.getStartsTo()[p];
935 for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
936 packOffset(sendArray, exports, sendArrayOffset, exportStarts[j], exportLengths[j]);
937 sendArrayOffset += exportLengths[j];
938 }
939 typename ExpView::execution_space().fence();
940
941 ImpView tmpSend =
942 subview_offset(sendArray, size_t(0), sendArrayOffset);
943
944 send<int>(tmpSend,
945 as<int>(tmpSend.size()),
946 plan.getProcsTo()[p], mpiTag_, *plan.getComm());
947 } else { // "Sending" the message to myself
948 selfNum = p;
949 selfIndex = plan.getStartsTo()[p];
950 }
951 }
952
953 if (plan.hasSelfMessage()) {
954 for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
955 packOffset(imports, exports, selfReceiveOffset, exportStarts[selfIndex], exportLengths[selfIndex]);
956 selfReceiveOffset += exportLengths[selfIndex];
957 ++selfIndex;
958 }
959 }
960 }
961}
962
963} // namespace Tpetra::Details
964
965#endif