Tpetra parallel linear algebra Version of the Day
Loading...
Searching...
No Matches
Tpetra_Details_DistributorActor.hpp
1// @HEADER
2// *****************************************************************************
3// Tpetra: Templated Linear Algebra Services Package
4//
5// Copyright 2008 NTESS and the Tpetra contributors.
6// SPDX-License-Identifier: BSD-3-Clause
7// *****************************************************************************
8// @HEADER
9
10#ifndef TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
11#define TPETRA_DETAILS_DISTRIBUTOR_ACTOR_HPP
12
13#include <optional>
14#include <vector>
15
16#include "Teuchos_Assert.hpp"
18#include "Tpetra_Util.hpp"
19
20#include "Teuchos_Array.hpp"
21#include "Teuchos_Comm.hpp"
24#include "Teuchos_RCP.hpp"
25
26#include "Kokkos_TeuchosCommAdapters.hpp"
27
28#ifdef HAVE_TPETRA_MPI
29#include "mpi.h"
30#include "Tpetra_Details_Ialltofewv.hpp"
31#endif
32
33namespace Tpetra::Details {
34
35template <class View>
36constexpr bool isKokkosView = Kokkos::is_view<View>::value;
37
38template <class View1, class View2>
39constexpr bool areKokkosViews = Kokkos::is_view<View1>::value&& Kokkos::is_view<View2>::value;
40
41class DistributorActor {
42 using IndexView = DistributorPlan::IndexView;
43 using SubViewLimits = DistributorPlan::SubViewLimits;
44
45 public:
46 static constexpr int DEFAULT_MPI_TAG = 1;
47
48 DistributorActor();
49 DistributorActor(const DistributorActor& otherActor) = default;
50
51 template <class ExpView, class ImpView>
52 void doPostsAndWaits(const DistributorPlan& plan,
53 const ExpView& exports,
54 size_t numPackets,
55 const ImpView& imports);
56
57 template <class ExpView, class ImpView>
58 void doPostsAndWaits(const DistributorPlan& plan,
59 const ExpView& exports,
60 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
61 const ImpView& imports,
62 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
63
64 template <class ImpView>
65 void doPostRecvs(const DistributorPlan& plan,
66 size_t numPackets,
67 const ImpView& imports);
68
69 template <class ImpView>
70 void doPostRecvs(const DistributorPlan& plan,
71 const ImpView& imports,
72 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
73
74 template <class ExpView, class ImpView>
75 void doPostSends(const DistributorPlan& plan,
76 const ExpView& exports,
77 size_t numPackets,
78 const ImpView& imports);
79
80 template <class ExpView, class ImpView>
81 void doPostSends(const DistributorPlan& plan,
82 const ExpView& exports,
83 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
84 const ImpView& imports,
85 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
86
87 template <class ExpView, class ImpView>
88 void doPosts(const DistributorPlan& plan,
89 const ExpView& exports,
90 size_t numPackets,
91 const ImpView& imports);
92
93 template <class ExpView, class ImpView>
94 void doPosts(const DistributorPlan& plan,
95 const ExpView& exports,
96 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
97 const ImpView& imports,
98 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID);
99
100 void doWaits(const DistributorPlan& plan);
101
102 void doWaitsRecv(const DistributorPlan& plan);
103
104 void doWaitsSend(const DistributorPlan& plan);
105
106 void doWaitsIalltofewv(const DistributorPlan& plan);
107
108 bool isReady() const;
109
110 int getMpiTag() const { return mpiTag_; };
111
112 private:
113 template <class ImpView>
114 void doPostRecvsImpl(const DistributorPlan& plan,
115 const ImpView& imports,
116 const SubViewLimits& totalPacketsFrom);
117
118 template <class ExpView, class ImpView>
119 void doPostSendsImpl(const DistributorPlan& plan,
120 const ExpView& exports,
121 const SubViewLimits& exportSubViewLimits,
122 const ImpView& imports,
123 const SubViewLimits& importSubViewLimits);
124
125#ifdef HAVE_TPETRA_MPI
126 template <class ExpView, class ImpView>
127 void doPostsAllToAllImpl(const DistributorPlan& plan,
128 const ExpView& exports,
129 const SubViewLimits& exportSubViewLimits,
130 const ImpView& imports,
131 const SubViewLimits& importSubViewLimits);
132
133#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
134 template <class ExpView, class ImpView>
135 void doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
136 const ExpView& exports,
137 const SubViewLimits& exportSubViewLimits,
138 const ImpView& imports,
139 const SubViewLimits& importSubViewLimits);
140#endif // HAVE_TPETRACORE_MPI_ADVANCE
141
142 template <typename ExpView, typename ImpView>
143 void doPostsIalltofewvImpl(const DistributorPlan& plan,
144 const ExpView& exports,
145 const SubViewLimits& exportSubViewLimits,
146 const ImpView& imports,
147 const SubViewLimits& importSubViewLimits);
148
149 // ialltofewv members
150 struct {
151 Details::Ialltofewv impl;
152 std::optional<Details::Ialltofewv::Req> req;
153 Teuchos::RCP<std::vector<int>> sendcounts;
154 Teuchos::RCP<std::vector<int>> sdispls;
155 Teuchos::RCP<std::vector<int>> recvcounts;
156 Teuchos::RCP<std::vector<int>> rdispls;
157 std::vector<int> roots;
158 } ialltofewv_;
159
160#endif // HAVE_TPETRA_MPI
161
162 int mpiTag_;
163 int ialltofewvRootTag_;
164
165 Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsRecv_;
166 Teuchos::Array<Teuchos::RCP<Teuchos::CommRequest<int>>> requestsSend_;
167};
168
169template <class ExpView, class ImpView>
170void DistributorActor::doPosts(const DistributorPlan& plan,
171 const ExpView& exports,
172 size_t numPackets,
173 const ImpView& imports) {
174 doPostRecvs(plan, numPackets, imports);
175 doPostSends(plan, exports, numPackets, imports);
176}
177
178template <class ExpView, class ImpView>
179void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
180 const ExpView& exports,
181 size_t numPackets,
182 const ImpView& imports) {
183 static_assert(areKokkosViews<ExpView, ImpView>,
184 "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
185 doPosts(plan, exports, numPackets, imports);
186 doWaits(plan);
187}
188
189template <class ExpView, class ImpView>
190void DistributorActor::doPosts(const DistributorPlan& plan,
191 const ExpView& exports,
192 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
193 const ImpView& imports,
194 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
195 doPostRecvs(plan, imports, numImportPacketsPerLID);
196 doPostSends(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
197}
198
199template <class ExpView, class ImpView>
200void DistributorActor::doPostsAndWaits(const DistributorPlan& plan,
201 const ExpView& exports,
202 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
203 const ImpView& imports,
204 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
205 static_assert(areKokkosViews<ExpView, ImpView>,
206 "Data arrays for DistributorActor::doPostsAndWaits must be Kokkos::Views");
207 doPosts(plan, exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
208 doWaits(plan);
209}
210
211template <typename ViewType>
212using HostAccessibility = Kokkos::SpaceAccessibility<Kokkos::DefaultHostExecutionSpace, typename ViewType::memory_space>;
213
214template <typename DstViewType, typename SrcViewType>
215using enableIfHostAccessible = std::enable_if_t<HostAccessibility<DstViewType>::accessible &&
216 HostAccessibility<SrcViewType>::accessible>;
217
218template <typename DstViewType, typename SrcViewType>
219using enableIfNotHostAccessible = std::enable_if_t<!HostAccessibility<DstViewType>::accessible ||
220 !HostAccessibility<SrcViewType>::accessible>;
221
222template <typename DstViewType, typename SrcViewType>
223enableIfHostAccessible<DstViewType, SrcViewType>
224packOffset(const DstViewType& dst,
225 const SrcViewType& src,
226 const size_t dst_offset,
227 const size_t src_offset,
228 const size_t size) {
229 memcpy((void*)(dst.data() + dst_offset), src.data() + src_offset, size * sizeof(typename DstViewType::value_type));
230}
231
232template <typename DstViewType, typename SrcViewType>
233enableIfNotHostAccessible<DstViewType, SrcViewType>
234packOffset(const DstViewType& dst,
235 const SrcViewType& src,
236 const size_t dst_offset,
237 const size_t src_offset,
238 const size_t size) {
239 Kokkos::Compat::deep_copy_offset(dst, src, dst_offset, src_offset, size);
240}
241
242#ifdef HAVE_TPETRA_MPI
243
244template <class ExpView, class ImpView>
245void DistributorActor::doPostsIalltofewvImpl(const DistributorPlan& plan,
246 const ExpView& exports,
247 const SubViewLimits& exportSubViewLimits,
248 const ImpView& imports,
249 const SubViewLimits& importSubViewLimits) {
250 using size_type = Teuchos::Array<size_t>::size_type;
251 using ExportValue = typename ExpView::non_const_value_type;
252
253 ProfilingRegion pr("Tpetra::Distributor::doPostsIalltofewvImpl");
254
255 TEUCHOS_TEST_FOR_EXCEPTION(
256 !plan.getIndicesTo().is_null(), std::runtime_error,
257 "Send Type=\"Ialltofewv\" only works for fast-path communication.");
258
259 TEUCHOS_TEST_FOR_EXCEPTION(
260 bool(ialltofewv_.req), std::runtime_error,
261 "This actor has an active Ialltofewv already");
262
263 TEUCHOS_TEST_FOR_EXCEPTION(
264 bool(ialltofewv_.sendcounts), std::runtime_error,
265 "This actor has an active Ialltofewv already");
266
267 TEUCHOS_TEST_FOR_EXCEPTION(
268 bool(ialltofewv_.sdispls), std::runtime_error,
269 "This actor has an active Ialltofewv already");
270
271 TEUCHOS_TEST_FOR_EXCEPTION(
272 bool(ialltofewv_.recvcounts), std::runtime_error,
273 "This actor has an active Ialltofewv already");
274
275 TEUCHOS_TEST_FOR_EXCEPTION(
276 bool(ialltofewv_.rdispls), std::runtime_error,
277 "This actor has an active Ialltofewv already");
278
279 auto comm = plan.getComm();
280
281 const auto& [importStarts, importLengths] = importSubViewLimits;
282 const auto& [exportStarts, exportLengths] = exportSubViewLimits;
283
284 ialltofewv_.roots = plan.getRoots();
285 const int nroots = ialltofewv_.roots.size();
286 const int* roots = ialltofewv_.roots.data();
287 ialltofewv_.req = std::make_optional<Details::Ialltofewv::Req>();
288 ialltofewv_.sendcounts = Teuchos::rcp(new std::vector<int>(nroots));
289 ialltofewv_.sdispls = Teuchos::rcp(new std::vector<int>(nroots));
290 ialltofewv_.recvcounts = Teuchos::rcp(new std::vector<int>);
291 ialltofewv_.rdispls = Teuchos::rcp(new std::vector<int>);
292
293 for (int rootIdx = 0; rootIdx < nroots; ++rootIdx) {
294 const int root = roots[rootIdx];
295
296 // if we can't find the root proc index in our plan, it just means we send 0
297 // also make sure root only appears once in getProcsTo()
298 size_type rootProcIndex = plan.getProcsTo().size(); // sentinel value -> not found
299 for (size_type pi = 0; pi < plan.getProcsTo().size(); ++pi) {
300 if (plan.getProcsTo()[pi] == root) {
301 rootProcIndex = pi;
302 break;
303 }
304 }
305
306 // am I sending to root?
307 int sendcount = 0;
308 if (rootProcIndex != plan.getProcsTo().size()) {
309 sendcount = exportLengths[rootProcIndex];
310 }
311 (*ialltofewv_.sendcounts)[rootIdx] = sendcount;
312
313 int sdispl = 0;
314 if (0 != sendcount) {
315 sdispl = exportStarts[rootProcIndex];
316 }
317 (*ialltofewv_.sdispls)[rootIdx] = sdispl;
318
319 // If I happen to be this root, set up my receive metadata
320 if (comm->getRank() == root) {
321 // don't recv anything from anywhere by default
322 ialltofewv_.recvcounts->resize(comm->getSize());
323 std::fill(ialltofewv_.recvcounts->begin(), ialltofewv_.recvcounts->end(), 0);
324 ialltofewv_.rdispls->resize(comm->getSize());
325 std::fill(ialltofewv_.rdispls->begin(), ialltofewv_.rdispls->end(), 0);
326
327 const size_type actualNumReceives =
328 Teuchos::as<size_type>(plan.getNumReceives()) +
329 Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
330
331 for (size_type i = 0; i < actualNumReceives; ++i) {
332 const int src = plan.getProcsFrom()[i];
333 (*ialltofewv_.rdispls)[src] = importStarts[i];
334 (*ialltofewv_.recvcounts)[src] = Teuchos::as<int>(importLengths[i]);
335 }
336 }
337
338 } // rootIdx
339
340 // TODO: do we need to pass ExportValue{} here?
341 MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<ExportValue>::getType(ExportValue{});
342 // FIXME: is there a better way to do this?
343 Teuchos::RCP<const Teuchos::MpiComm<int>> tMpiComm =
344 Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
345 Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> oMpiComm =
346 tMpiComm->getRawMpiComm();
347 MPI_Comm mpiComm = (*oMpiComm)();
348
349 // don't care about send-side accessibility because it's not accessed through kokkos
350 // rely on MPI to do the right thing
351 constexpr bool recvDevAccess = Kokkos::SpaceAccessibility<
352 Kokkos::DefaultExecutionSpace, typename ImpView::memory_space>::accessible;
353 constexpr bool sendDevAccess = Kokkos::SpaceAccessibility<
354 Kokkos::DefaultExecutionSpace, typename ExpView::memory_space>::accessible;
355 static_assert(recvDevAccess == sendDevAccess, "sending across host/device");
356
357 const int err = ialltofewv_.impl.post<recvDevAccess>(exports.data(), ialltofewv_.sendcounts->data(), ialltofewv_.sdispls->data(), rawType,
358 imports.data(), ialltofewv_.recvcounts->data(), ialltofewv_.rdispls->data(),
359 roots, nroots,
360 rawType,
361 mpiTag_, ialltofewvRootTag_, mpiComm, &(*ialltofewv_.req));
362
363 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
364 "ialltofewv failed with error \""
365 << Teuchos::mpiErrorCodeToString(err)
366 << "\".");
367}
368
369template <class ExpView, class ImpView>
370void DistributorActor::doPostsAllToAllImpl(const DistributorPlan& plan,
371 const ExpView& exports,
372 const SubViewLimits& exportSubViewLimits,
373 const ImpView& imports,
374 const SubViewLimits& importSubViewLimits) {
375 TEUCHOS_TEST_FOR_EXCEPTION(
376 !plan.getIndicesTo().is_null(), std::runtime_error,
377 "Send Type=\"Alltoall\" only works for fast-path communication.");
378
379 using size_type = Teuchos::Array<size_t>::size_type;
380
381 auto comm = plan.getComm();
382 std::vector<int> sendcounts(comm->getSize(), 0);
383 std::vector<int> sdispls(comm->getSize(), 0);
384 std::vector<int> recvcounts(comm->getSize(), 0);
385 std::vector<int> rdispls(comm->getSize(), 0);
386
387 auto& [importStarts, importLengths] = importSubViewLimits;
388 auto& [exportStarts, exportLengths] = exportSubViewLimits;
389
390 for (size_t pp = 0; pp < plan.getNumSends(); ++pp) {
391 sdispls[plan.getProcsTo()[pp]] = exportStarts[pp];
392 size_t numPackets = exportLengths[pp];
393 // numPackets is converted down to int, so make sure it can be represented
394 TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
395 "Tpetra::Distributor::doPostsAllToAll: "
396 "Send count for send "
397 << pp << " (" << numPackets
398 << ") is too large "
399 "to be represented as int.");
400 sendcounts[plan.getProcsTo()[pp]] = static_cast<int>(numPackets);
401 }
402
403 const size_type actualNumReceives =
404 Teuchos::as<size_type>(plan.getNumReceives()) +
405 Teuchos::as<size_type>(plan.hasSelfMessage() ? 1 : 0);
406
407 for (size_type i = 0; i < actualNumReceives; ++i) {
408 rdispls[plan.getProcsFrom()[i]] = importStarts[i];
409 size_t totalPacketsFrom_i = importLengths[i];
410 // totalPacketsFrom_i is converted down to int, so make sure it can be
411 // represented
412 TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
413 std::logic_error,
414 "Tpetra::Distributor::doPostsAllToAll: "
415 "Recv count for receive "
416 << i << " (" << totalPacketsFrom_i
417 << ") is too large "
418 "to be represented as int.");
419 recvcounts[plan.getProcsFrom()[i]] = static_cast<int>(totalPacketsFrom_i);
420 }
421
422 Teuchos::RCP<const Teuchos::MpiComm<int>> mpiComm =
423 Teuchos::rcp_dynamic_cast<const Teuchos::MpiComm<int>>(comm);
424 Teuchos::RCP<const Teuchos::OpaqueWrapper<MPI_Comm>> rawComm =
425 mpiComm->getRawMpiComm();
426 using T = typename ExpView::non_const_value_type;
427 MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
428
429#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
430 if (Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL == plan.getSendType()) {
431 MPIX_Comm* mpixComm = *plan.getMPIXComm();
432 TEUCHOS_TEST_FOR_EXCEPTION(!mpixComm, std::runtime_error,
433 "MPIX_Comm is null in doPostsAllToAll \""
434 << __FILE__ << ":" << __LINE__);
435
436 const int err = MPIX_Alltoallv(
437 exports.data(), sendcounts.data(), sdispls.data(), rawType,
438 imports.data(), recvcounts.data(), rdispls.data(), rawType, mpixComm);
439
440 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
441 "MPIX_Alltoallv failed with error \""
442 << Teuchos::mpiErrorCodeToString(err)
443 << "\".");
444
445 return;
446 }
447#endif // HAVE_TPETRACORE_MPI_ADVANCE
448
449 const int err = MPI_Alltoallv(
450 exports.data(), sendcounts.data(), sdispls.data(), rawType,
451 imports.data(), recvcounts.data(), rdispls.data(), rawType, (*rawComm)());
452
453 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
454 "MPI_Alltoallv failed with error \""
455 << Teuchos::mpiErrorCodeToString(err)
456 << "\".");
457}
458
459#if defined(HAVE_TPETRACORE_MPI_ADVANCE)
460template <class ExpView, class ImpView>
461void DistributorActor::doPostsNbrAllToAllVImpl(const DistributorPlan& plan,
462 const ExpView& exports,
463 const SubViewLimits& exportSubViewLimits,
464 const ImpView& imports,
465 const SubViewLimits& importSubViewLimits) {
466 TEUCHOS_TEST_FOR_EXCEPTION(
467 !plan.getIndicesTo().is_null(), std::runtime_error,
468 "Send Type=\"Alltoall\" only works for fast-path communication.");
469
470 const int myRank = plan.getComm()->getRank();
471 MPIX_Comm* mpixComm = *plan.getMPIXComm();
472 using size_type = Teuchos::Array<size_t>::size_type;
473
474 const size_t numSends = plan.getNumSends() + plan.hasSelfMessage();
475 const size_t numRecvs = plan.getNumReceives() + plan.hasSelfMessage();
476 std::vector<int> sendcounts(numSends, 0);
477 std::vector<int> sdispls(numSends, 0);
478 std::vector<int> recvcounts(numRecvs, 0);
479 std::vector<int> rdispls(numRecvs, 0);
480
481 auto& [importStarts, importLengths] = importSubViewLimits;
482 auto& [exportStarts, exportLengths] = exportSubViewLimits;
483
484 for (size_t pp = 0; pp < numSends; ++pp) {
485 sdispls[pp] = exportStarts[pp];
486 size_t numPackets = exportLengths[pp];
487 // numPackets is converted down to int, so make sure it can be represented
488 TEUCHOS_TEST_FOR_EXCEPTION(numPackets > size_t(INT_MAX), std::logic_error,
489 "Tpetra::Distributor::doPostsNbrAllToAllV: "
490 "Send count for send "
491 << pp << " (" << numPackets
492 << ") is too large "
493 "to be represented as int.");
494 sendcounts[pp] = static_cast<int>(numPackets);
495 }
496
497 for (size_type i = 0; i < numRecvs; ++i) {
498 rdispls[i] = importStarts[i];
499 size_t totalPacketsFrom_i = importLengths[i];
500 // totalPacketsFrom_i is converted down to int, so make sure it can be
501 // represented
502 TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
503 std::logic_error,
504 "Tpetra::Distributor::doPostsNbrAllToAllV: "
505 "Recv count for receive "
506 << i << " (" << totalPacketsFrom_i
507 << ") is too large "
508 "to be represented as int.");
509 recvcounts[i] = static_cast<int>(totalPacketsFrom_i);
510 }
511
512 using T = typename ExpView::non_const_value_type;
513 MPI_Datatype rawType = ::Tpetra::Details::MpiTypeTraits<T>::getType(T());
514
515 MPIX_Info* xinfo;
516 MPIX_Topo* xtopo;
517 MPIX_Info_init(&xinfo);
518 MPIX_Topo_init(numRecvs, plan.getProcsFrom().data(), recvcounts.data(),
519 numSends, plan.getProcsTo().data(), sendcounts.data(), xinfo, &xtopo);
520 const int err = MPIX_Neighbor_alltoallv_topo(
521 exports.data(), sendcounts.data(), sdispls.data(), rawType,
522 imports.data(), recvcounts.data(), rdispls.data(), rawType, xtopo, mpixComm);
523 MPIX_Topo_free(&xtopo);
524 MPIX_Info_free(&xinfo);
525
526 TEUCHOS_TEST_FOR_EXCEPTION(err != MPI_SUCCESS, std::runtime_error,
527 "MPIX_Neighbor_alltoallv failed with error \""
528 << Teuchos::mpiErrorCodeToString(err)
529 << "\".");
530}
531#endif // HAVE_TPETRACORE_MPI_ADVANCE
532#endif // HAVE_TPETRA_MPI
533
534template <class ImpView>
535void DistributorActor::doPostRecvs(const DistributorPlan& plan,
536 size_t numPackets,
537 const ImpView& imports) {
538 auto importSubViewLimits = plan.getImportViewLimits(numPackets);
539 doPostRecvsImpl(plan, imports, importSubViewLimits);
540}
541
542template <class ImpView>
543void DistributorActor::doPostRecvs(const DistributorPlan& plan,
544 const ImpView& imports,
545 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
546 auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
547 doPostRecvsImpl(plan, imports, importSubViewLimits);
548}
549
550template <class ImpView>
551void DistributorActor::doPostRecvsImpl(const DistributorPlan& plan,
552 const ImpView& imports,
553 const SubViewLimits& importSubViewLimits) {
554 static_assert(isKokkosView<ImpView>,
555 "Data arrays for DistributorActor::doPostRecvs must be Kokkos::Views");
556 using Kokkos::Compat::subview_offset;
557 using Teuchos::Array;
558 using Teuchos::as;
559 using Teuchos::ireceive;
560 using size_type = Array<size_t>::size_type;
561 using imports_view_type = ImpView;
562
563 // Set the MPI message tag for this round of communication.
564 // The same tag will be used for recvs and sends. For every round of communication,
565 // the tag gets incremented by one, until it eventually gets looped around back to a
566 // small value. This logic is implemented in Teuchos.
567 auto comm = plan.getComm();
568#if defined(HAVE_TPETRA_MPI)
569 const Details::EDistributorSendType sendType = plan.getSendType();
570#endif
571 {
572 auto non_const_comm = Teuchos::rcp_const_cast<Teuchos::Comm<int>>(comm);
573 mpiTag_ = non_const_comm->incrementTag();
574#if defined(HAVE_TPETRA_MPI)
575 // Ialltofewv uses a separate tag for its aggregator -> root phase.
576 ialltofewvRootTag_ = sendType == Details::DISTRIBUTOR_IALLTOFEWV
577 ? non_const_comm->incrementTag()
578 : mpiTag_;
579#endif
580 }
581
582#ifdef KOKKOS_ENABLE_CUDA
583 static_assert(!std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
584 "Please do not use Tpetra::Distributor with UVM "
585 "allocations. See GitHub issue #1088.");
586#endif // KOKKOS_ENABLE_CUDA
587
588#ifdef KOKKOS_ENABLE_SYCL
589 static_assert(!std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
590 "Please do not use Tpetra::Distributor with SharedUSM "
591 "allocations. See GitHub issue #1088 (corresponding to CUDA).");
592#endif // KOKKOS_ENABLE_SYCL
593
594#if defined(HAVE_TPETRA_MPI)
595 // All-to-all communication layout is quite different from
596 // point-to-point, so we handle it separately.
597
598 // These send options require no matching receives, so we just return.
599 if ((sendType == Details::DISTRIBUTOR_ALLTOALL) || (sendType == Details::DISTRIBUTOR_IALLTOFEWV)
600#ifdef HAVE_TPETRACORE_MPI_ADVANCE
601 || (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) || (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV)
602#endif
603 ) {
604 return;
605 }
606#endif // HAVE_TPETRA_MPI
607
608 ProfilingRegion pr("Tpetra::Distributor::doPostRecvs");
609
610 const int myProcID = plan.getComm()->getRank();
611
612 auto& [importStarts, importLengths] = importSubViewLimits;
613
614 // Distributor uses requestsRecv_.size() and requestsSend_.size()
615 // as the number of outstanding nonblocking message requests, so
616 // we resize to zero to maintain this invariant.
617 //
618 // getNumReceives() does _not_ include the self message, if there is
619 // one. Here, we do actually send a message to ourselves, so we
620 // include any self message in the "actual" number of receives to
621 // post.
622 //
623 // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
624 // doesn't (re)allocate its array of requests. That happens in
625 // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
626 // demand), or Resize_().
627 const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
628 as<size_type>(plan.hasSelfMessage() ? 1 : 0);
629
630#ifdef HAVE_TPETRA_DEBUG
631 size_t totalNumImportPackets = 0;
632 for (size_t i = 0; i < Teuchos::as<size_t>(actualNumReceives); ++i) {
633 totalNumImportPackets += importLengths[i];
634 }
635 TEUCHOS_TEST_FOR_EXCEPTION(
636 imports.extent(0) < totalNumImportPackets, std::runtime_error,
637 "Tpetra::Distributor::doPostRecvs: The 'imports' array must have "
638 "enough entries to hold the expected number of import packets. "
639 "imports.extent(0) = "
640 << imports.extent(0) << " < "
641 "totalNumImportPackets = "
642 << totalNumImportPackets << ".");
643 TEUCHOS_TEST_FOR_EXCEPTION(!requestsRecv_.empty(), std::logic_error,
644 "Tpetra::Distributor::"
645 "doPostRecvs: Process "
646 << myProcID << ": requestsRecv_.size () = "
647 << requestsRecv_.size() << " != 0.");
648#endif // HAVE_TPETRA_DEBUG
649
650 requestsRecv_.resize(0);
651
652 // Post the nonblocking receives. It's common MPI wisdom to post
653 // receives before sends. In MPI terms, this means favoring
654 // adding to the "posted queue" (of receive requests) over adding
655 // to the "unexpected queue" (of arrived messages not yet matched
656 // with a receive).
657 {
658 ProfilingRegion prr("Tpetra::Distributor::doPostRecvs MPI_Irecv");
659
660 for (size_type i = 0; i < actualNumReceives; ++i) {
661 size_t totalPacketsFrom_i = importLengths[Teuchos::as<size_t>(i)];
662 TEUCHOS_TEST_FOR_EXCEPTION(totalPacketsFrom_i > size_t(INT_MAX),
663 std::logic_error,
664 "Tpetra::Distributor::doPostRecvs: "
665 "Recv count for receive "
666 << i << " (" << totalPacketsFrom_i << ") is too large "
667 "to be represented as int.");
668 if (plan.getProcsFrom()[i] != myProcID && totalPacketsFrom_i) {
669 // If my process is receiving these packet(s) from another
670 // process (not a self-receive), and if there is at least
671 // one packet to receive:
672 //
673 // 1. Set up the persisting view (recvBuf) into the imports
674 // array, given the offset and size (total number of
675 // packets from process getProcsFrom()[i]).
676 // 2. Start the Irecv and save the resulting request.
677 imports_view_type recvBuf =
678 subview_offset(imports, importStarts[i], totalPacketsFrom_i);
679 requestsRecv_.push_back(ireceive<int>(recvBuf, plan.getProcsFrom()[i],
680 mpiTag_, *plan.getComm()));
681 }
682 }
683 }
684}
685
686template <class ExpView, class ImpView>
687void DistributorActor::doPostSends(const DistributorPlan& plan,
688 const ExpView& exports,
689 size_t numPackets,
690 const ImpView& imports) {
691 auto exportSubViewLimits = plan.getExportViewLimits(numPackets);
692 auto importSubViewLimits = plan.getImportViewLimits(numPackets);
693 doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
694}
695
696template <class ExpView, class ImpView>
697void DistributorActor::doPostSends(const DistributorPlan& plan,
698 const ExpView& exports,
699 const Teuchos::ArrayView<const size_t>& numExportPacketsPerLID,
700 const ImpView& imports,
701 const Teuchos::ArrayView<const size_t>& numImportPacketsPerLID) {
702 auto exportSubViewLimits = plan.getExportViewLimits(numExportPacketsPerLID);
703 auto importSubViewLimits = plan.getImportViewLimits(numImportPacketsPerLID);
704 doPostSendsImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
705}
706
707template <class ExpView, class ImpView>
708void DistributorActor::doPostSendsImpl(const DistributorPlan& plan,
709 const ExpView& exports,
710 const SubViewLimits& exportSubViewLimits,
711 const ImpView& imports,
712 const SubViewLimits& importSubViewLimits) {
713 static_assert(areKokkosViews<ExpView, ImpView>,
714 "Data arrays for DistributorActor::doPostSends must be Kokkos::Views");
715 using Kokkos::Compat::deep_copy_offset;
716 using Kokkos::Compat::subview_offset;
717 using Teuchos::Array;
718 using Teuchos::as;
719 using Teuchos::isend;
720 using Teuchos::send;
721 using size_type = Array<size_t>::size_type;
722 using exports_view_type = ExpView;
723
724#ifdef KOKKOS_ENABLE_CUDA
725 static_assert(!std::is_same<typename ExpView::memory_space, Kokkos::CudaUVMSpace>::value &&
726 !std::is_same<typename ImpView::memory_space, Kokkos::CudaUVMSpace>::value,
727 "Please do not use Tpetra::Distributor with UVM allocations. "
728 "See Trilinos GitHub issue #1088.");
729#endif // KOKKOS_ENABLE_CUDA
730
731#ifdef KOKKOS_ENABLE_SYCL
732 static_assert(!std::is_same<typename ExpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value &&
733 !std::is_same<typename ImpView::memory_space, Kokkos::Experimental::SYCLSharedUSMSpace>::value,
734 "Please do not use Tpetra::Distributor with SharedUSM allocations. "
735 "See Trilinos GitHub issue #1088 (corresponding to CUDA).");
736#endif // KOKKOS_ENABLE_SYCL
737
738 ProfilingRegion ps("Tpetra::Distributor::doPostSends");
739
740 const int myRank = plan.getComm()->getRank();
741 // Run-time configurable parameters that come from the input
742 // ParameterList set by setParameterList().
743 const Details::EDistributorSendType sendType = plan.getSendType();
744
745 auto& [exportStarts, exportLengths] = exportSubViewLimits;
746 auto& [importStarts, importLengths] = importSubViewLimits;
747
748#if defined(HAVE_TPETRA_MPI)
749 // All-to-all communication layout is quite different from
750 // point-to-point, so we handle it separately.
751
752 if (sendType == Details::DISTRIBUTOR_ALLTOALL) {
753 doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
754 return;
755 } else if (sendType == Details::DISTRIBUTOR_IALLTOFEWV) {
756 doPostsIalltofewvImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
757 return;
758 }
759#ifdef HAVE_TPETRACORE_MPI_ADVANCE
760 else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_ALLTOALL) {
761 doPostsAllToAllImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
762 return;
763 } else if (sendType == Details::DISTRIBUTOR_MPIADVANCE_NBRALLTOALLV) {
764 doPostsNbrAllToAllVImpl(plan, exports, exportSubViewLimits, imports, importSubViewLimits);
765 return;
766 }
767#endif // defined(HAVE_TPETRACORE_MPI_ADVANCE)
768
769#else // HAVE_TPETRA_MPI
770 if (plan.hasSelfMessage()) {
771 // This is how we "send a message to ourself": we copy from
772 // the export buffer to the import buffer. That saves
773 // Teuchos::Comm implementations other than MpiComm (in
774 // particular, SerialComm) the trouble of implementing self
775 // messages correctly. (To do this right, SerialComm would
776 // need internal buffer space for messages, keyed on the
777 // message's tag.)
778 size_t selfReceiveOffset = 0;
779 deep_copy_offset(imports, exports, selfReceiveOffset,
780 exportStarts[0],
781 exportLengths[0]);
782 }
783 // should we just return here?
784 // likely not as comm could be a serial comm
785#endif // HAVE_TPETRA_MPI
786
787 size_t selfReceiveOffset = 0;
788
789#ifdef HAVE_TPETRA_DEBUG
790 TEUCHOS_TEST_FOR_EXCEPTION(requestsSend_.size() != 0,
791 std::logic_error,
792 "Tpetra::Distributor::doPostSends: Process "
793 << myRank << ": requestsSend_.size() = " << requestsSend_.size() << " != 0.");
794#endif // HAVE_TPETRA_DEBUG
795
796 // Distributor uses requestsRecv_.size() and requestsSend_.size()
797 // as the number of outstanding nonblocking message requests, so
798 // we resize to zero to maintain this invariant.
799 //
800 // getNumReceives() does _not_ include the self message, if there is
801 // one. Here, we do actually send a message to ourselves, so we
802 // include any self message in the "actual" number of receives to
803 // post.
804 //
805 // NOTE (mfh 19 Mar 2012): Epetra_MpiDistributor::DoPosts()
806 // doesn't (re)allocate its array of requests. That happens in
807 // CreateFromSends(), ComputeRecvs_(), DoReversePosts() (on
808 // demand), or Resize_().
809 const size_type actualNumReceives = as<size_type>(plan.getNumReceives()) +
810 as<size_type>(plan.hasSelfMessage() ? 1 : 0);
811 requestsSend_.resize(0);
812
813 {
814 for (size_type i = 0; i < actualNumReceives; ++i) {
815 if (plan.getProcsFrom()[i] == myRank) { // Receiving from myself
816 selfReceiveOffset = importStarts[i]; // Remember the self-recv offset
817 }
818 }
819 }
820
821 ProfilingRegion pss("Tpetra::Distributor::doPostSends sends");
822
823 // setup scan through getProcsTo() list starting with higher numbered procs
824 // (should help balance message traffic)
825 //
826 // FIXME (mfh 20 Feb 2013) Why haven't we precomputed this?
827 // It doesn't depend on the input at all.
828 size_t numBlocks = plan.getNumSends() + plan.hasSelfMessage();
829 size_t procIndex = 0;
830 while ((procIndex < numBlocks) && (plan.getProcsTo()[procIndex] < myRank)) {
831 ++procIndex;
832 }
833 if (procIndex == numBlocks) {
834 procIndex = 0;
835 }
836
837 size_t selfNum = 0;
838 size_t selfIndex = 0;
839
840 if (plan.getIndicesTo().is_null()) {
841 const char isend_region[] = "Tpetra::Distributor::doPostSends MPI_Isend FAST";
842 const char send_region[] = "Tpetra::Distributor::doPostSends MPI_Send FAST";
843 ProfilingRegion pssf((sendType == Details::DISTRIBUTOR_ISEND) ? isend_region : send_region);
844
845 // Data are already blocked (laid out) by process, so we don't
846 // need a separate send buffer (besides the exports array).
847 for (size_t i = 0; i < numBlocks; ++i) {
848 size_t p = i + procIndex;
849 if (p > (numBlocks - 1)) {
850 p -= numBlocks;
851 }
852
853 if (plan.getProcsTo()[p] != myRank) {
854 if (exportLengths[p] == 0) {
855 // Do not attempt to send messages of length 0.
856 continue;
857 }
858
859 exports_view_type tmpSend = subview_offset(exports, exportStarts[p], exportLengths[p]);
860
861 if (sendType == Details::DISTRIBUTOR_ISEND) {
862 // NOTE: This looks very similar to the tmpSend above, but removing
863 // tmpSendBuf and using tmpSend instead leads to a performance hit on
864 // Arm SerialNode builds.
865 exports_view_type tmpSendBuf =
866 subview_offset(exports, exportStarts[p], exportLengths[p]);
867 requestsSend_.push_back(isend<int>(tmpSendBuf, plan.getProcsTo()[p],
868 mpiTag_, *plan.getComm()));
869 } else { // DISTRIBUTOR_SEND
870 send<int>(tmpSend,
871 as<int>(tmpSend.size()),
872 plan.getProcsTo()[p], mpiTag_, *plan.getComm());
873 }
874 } else { // "Sending" the message to myself
875 selfNum = p;
876 }
877 }
878
879 if (plan.hasSelfMessage()) {
880 // This is how we "send a message to ourself": we copy from
881 // the export buffer to the import buffer. That saves
882 // Teuchos::Comm implementations other than MpiComm (in
883 // particular, SerialComm) the trouble of implementing self
884 // messages correctly. (To do this right, SerialComm would
885 // need internal buffer space for messages, keyed on the
886 // message's tag.)
887 deep_copy_offset(imports, exports, selfReceiveOffset,
888 exportStarts[selfNum], exportLengths[selfNum]);
889 }
890
891 } else { // data are not blocked by proc, use send buffer
892 ProfilingRegion psss("Tpetra::Distributor::doPostSends: MPI_Send SLOW");
893
894 using Packet = typename ExpView::non_const_value_type;
895 using Layout = typename ExpView::array_layout;
896 using Device = typename ExpView::device_type;
897 using Mem = typename ExpView::memory_traits;
898
899 // This buffer is long enough for only one message at a time.
900 // Thus, we use DISTRIBUTOR_SEND always in this case, regardless
901 // of sendType requested by user.
902 // This code path formerly errored out with message:
903 // Tpetra::Distributor::doPosts(3 args):
904 // The "send buffer" code path
905 // doesn't currently work with nonblocking sends.
906 // Now, we opt to just do the communication in a way that works.
907#ifdef HAVE_TPETRA_DEBUG
908 if (sendType != Details::DISTRIBUTOR_SEND) {
909 if (plan.getComm()->getRank() == 0)
910 std::cout << "The requested Tpetra send type "
912 << " requires Distributor data to be ordered by"
913 << " the receiving processor rank. Since these"
914 << " data are not ordered, Tpetra will use Send"
915 << " instead." << std::endl;
916 }
917#endif
918
919 size_t maxSendLength = 0;
920 for (size_t i = 0; i < numBlocks; ++i) {
921 size_t p = i + procIndex;
922 if (p > (numBlocks - 1)) {
923 p -= numBlocks;
924 }
925
926 size_t sendArrayOffset = 0;
927 size_t j = plan.getStartsTo()[p];
928 for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
929 sendArrayOffset += exportLengths[j];
930 }
931 maxSendLength = std::max(maxSendLength, sendArrayOffset);
932 }
933 Kokkos::View<Packet*, Layout, Device, Mem> sendArray("sendArray", maxSendLength);
934
935 for (size_t i = 0; i < numBlocks; ++i) {
936 size_t p = i + procIndex;
937 if (p > (numBlocks - 1)) {
938 p -= numBlocks;
939 }
940
941 if (plan.getProcsTo()[p] != myRank) {
942 size_t sendArrayOffset = 0;
943 size_t j = plan.getStartsTo()[p];
944 for (size_t k = 0; k < plan.getLengthsTo()[p]; ++k, ++j) {
945 packOffset(sendArray, exports, sendArrayOffset, exportStarts[j], exportLengths[j]);
946 sendArrayOffset += exportLengths[j];
947 }
948 typename ExpView::execution_space().fence();
949
950 ImpView tmpSend =
951 subview_offset(sendArray, size_t(0), sendArrayOffset);
952
953 send<int>(tmpSend,
954 as<int>(tmpSend.size()),
955 plan.getProcsTo()[p], mpiTag_, *plan.getComm());
956 } else { // "Sending" the message to myself
957 selfNum = p;
958 selfIndex = plan.getStartsTo()[p];
959 }
960 }
961
962 if (plan.hasSelfMessage()) {
963 for (size_t k = 0; k < plan.getLengthsTo()[selfNum]; ++k) {
964 packOffset(imports, exports, selfReceiveOffset, exportStarts[selfIndex], exportLengths[selfIndex]);
965 selfReceiveOffset += exportLengths[selfIndex];
966 ++selfIndex;
967 }
968 }
969 }
970}
971
972} // namespace Tpetra::Details
973
974#endif
Add specializations of Teuchos::Details::MpiTypeTraits for Kokkos::complex<float> and Kokkos::complex...
Declaration of Tpetra::Details::Profiling, a scope guard for Kokkos Profiling.
Stand-alone utility functions and macros.
Nonmember function that computes a residual. Computes R = B - A * X.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
EDistributorSendType
The type of MPI send that Distributor should use.