Zoltan2_Directory_Comm.cpp
1// @HEADER
2// *****************************************************************************
3// Zoltan2: A package of combinatorial algorithms for scientific computing
4//
5// Copyright 2012 NTESS and the Zoltan2 contributors.
6// SPDX-License-Identifier: BSD-3-Clause
7// *****************************************************************************
8// @HEADER
9
10#include "Zoltan2_Directory_Comm.hpp"
11#include <stdexcept>
12#include <memory>
13
14namespace Zoltan2 {
15
16void Zoltan2_Directory_Plan::getInvertedValues(Zoltan2_Directory_Plan * from) {
17 total_recv_size = 0;
18 for (int i = 0; i < from->nsends + from->self_msg; i++) {
19 total_recv_size += from->lengths_to[i];
20 }
21
22 max_send_size = 0;
23 for (int i = 0; i < from->nrecvs; i++) {
24 if (from->lengths_from[i] > max_send_size) {
25 max_send_size = from->lengths_from[i];
26 }
27 }
28
29 nvals = from->nvals_recv;
30 nvals_recv = from->nvals;
31 lengths_to = from->lengths_from;
32 procs_to = from->procs_from;
33 indices_to = from->indices_from;
34
35 starts_to = from->starts_from;
36 lengths_from = from->lengths_to;
37 procs_from = from->procs_to;
38 indices_from = from->indices_to;
39
40 starts_from = from->starts_to;
41 nrecvs = from->nsends;
42 nsends = from->nrecvs;
43 self_msg = from->self_msg;
44 comm = from->comm;
45}
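// The assignments above build the mirror image of 'from': everything 'from'
// sends, this plan receives, and vice versa, while total_recv_size and
// max_send_size are recomputed for the flipped direction. create_reverse_plan()
// below uses exactly this so that do_reverse() can return data along the same
// links that do_forward() used:
//
//   plan_forward->plan_reverse = new Zoltan2_Directory_Plan;
//   plan_forward->plan_reverse->getInvertedValues(plan_forward);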
46
47
48void Zoltan2_Directory_Plan::print(const std::string& headerMessage) const {
49
50 #define PRINT_VECTOR(v) \
51 if(v != Teuchos::null) { \
52 std::cout << " " << #v << " "; \
53 for(Teuchos::ArrayRCP<int>::size_type n = 0; n < v.size(); ++n) { \
54 std::cout << v[n] << " "; \
55 } \
56 std::cout << std::endl; \
57 }
58
59 #define PRINT_VAL(val) std::cout << " " << #val << ": " << val << std::endl;
60
61 for(int proc = 0; proc < comm->getSize(); ++proc) {
62 comm->barrier();
63 if(proc == comm->getRank()) {
64
65 std::cout << "Rank " << proc << " " << headerMessage << std::endl;
66
82
91 }
92 }
93 comm->barrier();
94}
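// print() emits one rank's state at a time: every rank passes through the same
// barriers, and only the rank whose turn it is writes, so the output appears in
// rank order rather than interleaved. The PRINT_VAL / PRINT_VECTOR macros above
// simply stream a member's name and contents to std::cout.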
95
96Zoltan2_Directory_Comm::Zoltan2_Directory_Comm(
97 int nvals, /* number of values I currently own */
98 const Teuchos::ArrayRCP<int> &assign, /* processor assignment for all my values */
99 Teuchos::RCP<const Teuchos::Comm<int> > comm, /* communicator */
100 int tag) : /* message tag I can use */
101 comm_(comm),
102 plan_forward(NULL)
103{
104 if (comm == Teuchos::null){
105 throw std::logic_error("Invalid communicator: MPI_COMM_NULL.");
106 }
107
108 int my_proc = comm->getRank(); /* my processor tag in communicator */
109 int nprocs = comm->getSize(); /* number of processors in communicator */
110
111 /* First check to see if items are grouped by processor with no gaps. */
112 /* If so, indices_to should be NULL (= identity) */
113
114 /* Make data structures that will allow me to traverse arrays quickly. */
115 /* Begin by determining number of objects I'm sending to each processor. */
116 Teuchos::ArrayRCP<int> starts(new int[nprocs + 1], 0, nprocs + 1, true);
117 for(int n = 0; n < starts.size(); ++n) {
118 starts[n] = 0;
119 }
120
121 /* Note: Negative assign value means ignore item. */
122 /* Non-trailing negatives mean data not packed so need send_buf. */
123 /* Could (but don't) allow negatives between processor blocks w/o buf. */
124 int nactive = 0; /* number of values to remap */
125
126 int no_send_buff = 1; /* is data nicely grouped by processor? */
127
128 int prev_proc = nprocs; /* processor on previous loop pass */
129
130 for (int i = 0; i < nvals; i++) {
131 int proc = assign[i];
132 if (no_send_buff && proc != prev_proc) { /* Checks if blocked by proc */
133 if (proc >= 0 && (starts[proc] || prev_proc < 0)) {
134 no_send_buff = 0;
135 }
136 else {
137 prev_proc = proc;
138 }
139 }
140 if (proc >= 0) {
141 ++starts[proc];
142 ++nactive;
143 }
144 }
145
146 int self_msg = (starts[my_proc] != 0); /* do I have data for myself? */
147
148 Teuchos::ArrayRCP<int> lengths_to; /* lengths I'll send to */
149 Teuchos::ArrayRCP<int> procs_to; /* processors I'll send to */
150 Teuchos::ArrayRCP<int> starts_to; /* where in list my sends begin */
151 Teuchos::ArrayRCP<int> indices_to; /* local_id values I'll be sending */
152
153 int max_send_size = 0; /* size of longest message I send */
154 int nsends = 0; /* # procs I'll send to (including self) */
155 int nrecvs = 0; /* # procs I'll recv from (including self) */
156
157 if (no_send_buff) {
158 /* Grouped by processor. Array indices_to can be NULL (= identity) */
159 nsends = 0;
160 for (int i = 0; i < nprocs; i++) {
161 if (starts[i] != 0) ++nsends;
162 }
163
164 lengths_to.resize(nsends);
165 starts_to.resize(nsends);
166 procs_to.resize(nsends);
167
168 int index = 0; /* index into list of objects */
169 /* Note that procs_to is in the order the data was passed in. */
170 for (int i = 0; i < nsends; i++) {
171 starts_to[i] = index;
172 int proc = assign[index];
173 procs_to[i] = proc;
174 index += starts[proc];
175 }
176
177 /* Now sort the outgoing procs. */
178 /* This keeps recvs deterministic if I ever invert communication */
179 /* It also allows for better balance of traffic in comm_do */
180 sort_ints(procs_to, starts_to);
181
182 max_send_size = 0;
183 for (int i = 0; i < nsends; i++) {
184 int proc = procs_to[i];
185 lengths_to[i] = starts[proc];
186 if (proc != my_proc && lengths_to[i] > max_send_size) {
187 max_send_size = lengths_to[i];
188 }
189 }
190 }
191 else { /* Not grouped by processor. More complex data structures. */
192 /* Sum starts values to be offsets into indices_to array. */
193 nsends = (starts[0] != 0);
194 for (int i = 1; i < nprocs; i++) {
195 if (starts[i] != 0)
196 ++nsends;
197 starts[i] += starts[i - 1];
198 }
199
200 for (int i = nprocs - 1; i; i--) {
201 starts[i] = starts[i - 1];
202 }
203
204 starts[0] = 0;
205
206 indices_to = (nactive > 0) ?
207 Teuchos::arcp(new int[nactive], 0, nactive, true) : Teuchos::null;
208
209 for (int i = 0; i < nvals; i++) {
210 int proc = assign[i];
211 if (proc >= 0) {
212 indices_to[starts[proc]] = i;
213 ++starts[proc];
214 }
215 }
216
217 /* Indices_to array now has the data in clumps for each processor. */
218 /* Now reconstruct starts array to index into indices_to. */
219 for (int i = nprocs - 1; i; i--) {
220 starts[i] = starts[i - 1];
221 }
222 starts[0] = 0;
223 starts[nprocs] = nactive;
224
225 /* Construct lengths_to, starts_to and procs_to arrays. */
226 /* Note: If indices_to is needed, procs are in increasing order */
227 lengths_to.resize(nsends);
228 starts_to.resize(nsends);
229 procs_to.resize(nsends);
230
231 int j = 0;
232 max_send_size = 0;
233 for (int i = 0; i < nprocs; i++) {
234 if (starts[i + 1] != starts[i]) {
235 starts_to[j] = starts[i];
236 lengths_to[j] = starts[i + 1] - starts[i];
237 if (i != my_proc && lengths_to[j] > max_send_size) {
238 max_send_size = lengths_to[j];
239 }
240 procs_to[j] = i;
241 j++;
242 }
243 }
244 }
245
246 /* Now change nsends to count only non-self messages */
247 nsends -= self_msg;
248
249 /* Determine how many messages & what length I'll receive. */
250 Teuchos::ArrayRCP<int> lengths_from; /* lengths of messages I'll receive */
251 Teuchos::ArrayRCP<int> procs_from; /* processors I'll receive from */
252 int out_of_mem = 0; // TODO refactor this bit
253
254 int comm_flag = invert_map(lengths_to, procs_to, nsends, self_msg,
255 lengths_from, procs_from, &nrecvs, my_proc, nprocs,
256 out_of_mem, tag, comm);
257
258 /* pointers for where to put recv data */
259 Teuchos::ArrayRCP<int> starts_from(
260 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
261 int j = 0;
262 for (int i = 0; i < nrecvs + self_msg; i++) {
263 starts_from[i] = j;
264 j += lengths_from[i];
265 }
266
267 if (comm_flag != 0) {
268 throw std::logic_error("Failed to construct Zoltan2_Directory_Comm");
269 }
270
271 int total_recv_size = 0; /* total size of messages I recv */
272 for (int i = 0; i < nrecvs + self_msg; i++) {
273 total_recv_size += lengths_from[i];
274 }
275
276 plan_forward = new Zoltan2_Directory_Plan;
277 plan_forward->lengths_to = lengths_to;
278 plan_forward->starts_to = starts_to;
279 plan_forward->procs_to = procs_to;
280 plan_forward->indices_to = indices_to;
281 plan_forward->lengths_from = lengths_from;
282 plan_forward->starts_from = starts_from;
283 plan_forward->procs_from = procs_from;
284 plan_forward->nvals = nvals;
285 plan_forward->nvals_recv = total_recv_size;
286 plan_forward->nrecvs = nrecvs;
287 plan_forward->nsends = nsends;
288 plan_forward->self_msg = self_msg;
289 plan_forward->max_send_size = max_send_size;
290 plan_forward->total_recv_size = total_recv_size;
291 plan_forward->maxed_recvs = 0;
292 plan_forward->comm = comm;
293
294 if (MPI_RECV_LIMIT > 0) {
295
296 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (1)"); // needs unit testing
297
298 /* If we have a limit to the number of posted receives we are allowed,
299 ** and our plan has exceeded that, then switch to an MPI_Alltoallv so
300 ** that we will have fewer receives posted when we do the communication.
301 */
302 int global_nrecvs;
303 Teuchos::reduceAll(*comm, Teuchos::REDUCE_MAX, 1, &nrecvs, &global_nrecvs);
304 if (global_nrecvs > MPI_RECV_LIMIT){
305 plan_forward->maxed_recvs = 1;
306 }
307 }
308
309 if (plan_forward->maxed_recvs == 0) {
310 // See notes in header for MPI_Request
311 plan_forward->request.resize(plan_forward->nrecvs);
312 }
313
314 nrec = total_recv_size;
315}
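// A minimal usage sketch for the constructor above (illustrative only: 'comm',
// 'tag', 'nbytes', 'send' and 'recv' are assumed to be prepared by the caller,
// and the assignments shown are arbitrary):
//
//   int nvals = 2;                              // items this rank currently owns
//   Teuchos::ArrayRCP<int> assign(new int[nvals], 0, nvals, true);
//   assign[0] = 0; assign[1] = 1;               // destination rank for each item
//   Zoltan2_Directory_Comm directoryComm(nvals, assign, comm, tag);
//   // recv must hold everything this rank receives (nrec items of nbytes each).
//   directoryComm.do_forward(tag + 1, send, nbytes, recv);
//
// The plan is built once and can then move any fixed-size payload with
// do_forward(); per-item sizes can be supplied later through resize().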
316
317Zoltan2_Directory_Comm::~Zoltan2_Directory_Comm()
318{
319 delete plan_forward;
320}
321
322int Zoltan2_Directory_Comm::invert_map(
323 const Teuchos::ArrayRCP<int> &lengths_to, /* number of items I'm sending */
324 const Teuchos::ArrayRCP<int> &procs_to, /* procs I send to */
325 int nsends, /* number of messages I'll send */
326 int self_msg, /* do I copy data to myself? */
327 Teuchos::ArrayRCP<int> &lengths_from, /* number of items I'm receiving */
328 Teuchos::ArrayRCP<int> &procs_from, /* procs I recv lengths from */
329 int *pnrecvs, /* number of messages I receive */
330 int my_proc, /* my processor number */
331 int nprocs, /* total number of processors */
332 int /* out_of_mem */, /* tell everyone I'm out of memory? */
333 int tag, /* message tag I can use */
334 Teuchos::RCP<const Teuchos::Comm<int> > comm) /* communicator */
335{
336 Teuchos::ArrayRCP<int> msg_count(new int[nprocs], 0, nprocs, true);
337 Teuchos::ArrayRCP<int> counts(new int[nprocs], 0, nprocs, true);
338 for(int i = 0; i < nprocs; ++i) {
339 msg_count[i] = 0;
340 counts[i] = 1;
341 }
342
343 for (int i = 0; i < nsends + self_msg; i++) {
344 msg_count[procs_to[i]] = 1;
345 }
346
347 /*
348 * KDDKDD: Replaced MPI_Reduce_scatter with MPI_Reduce and MPI_Scatter
349 * KDDKDD: to avoid reported problems with MPICH 1.5.2.1.
350 * KDDKDD: Some sort of MPI_TYPE_INDEXED error.
351 * KDDKDD: Bug fix suggested by Clark Dohrmann and Rob Hoekstra.
352 * KDDKDD: July 20, 2004
353
354 MPI_Reduce_scatter((void *) msg_count, (void *) &nrecvs, counts, MPI_INT,
355 MPI_SUM, comm);
356 */
357 Teuchos::reduceAll<int>(*comm, Teuchos::REDUCE_SUM, nprocs,
358 msg_count.getRawPtr(), counts.getRawPtr());
359
360 int nrecvs = 0; /* number of messages I'll receive */
361
362 Teuchos::scatter<int, int>(&(counts[0]), 1, &nrecvs, 1, 0, *comm);
363
364 int max_nrecvs = 0;
365 if (my_proc == 0) {
366 for (int i=0; i < nprocs; i++) {
367 if (counts[i] > max_nrecvs) {
368 max_nrecvs = counts[i];
369 }
370 }
371 }
372
373 Teuchos::broadcast(*comm, 0, &max_nrecvs);
374
375 if(nrecvs > 0) {
376 lengths_from.resize(nrecvs); /* number of items I'm receiving */
377 procs_from.resize(nrecvs); /* processors I'll receive from */
378 }
379
380 if (MPI_RECV_LIMIT == 0 || max_nrecvs <= MPI_RECV_LIMIT) {
381 // See notes in header for MPI_Request
382 Teuchos::ArrayRCP<Teuchos::RCP<Teuchos::CommRequest<int> > > requests(nrecvs);
383
384 /* Note: I'm counting on having a unique tag or some of my incoming */
385 /* messages might get confused with others. */
386 for (int i=0; i < nrecvs; i++) {
387#ifdef HAVE_MPI // Teuchos::ireceive not implemented for Serial - Serial is just for debugging
388 Teuchos::ArrayRCP<int> single_elem(&lengths_from[i], 0, 1, false);
389 requests[i] = Teuchos::ireceive(single_elem, MPI_ANY_SOURCE, tag, *comm);
390#endif
391 }
392
393 for (int i=0; i < nsends+self_msg; i++) {
394#ifdef HAVE_MPI // Teuchos::send not implemented for Serial - Serial is just for debugging
395 Teuchos::send(&lengths_to[i], 1, procs_to[i], tag, *comm);
396#endif
397 }
398
399 for (int i=0; i < nrecvs; i++) {
400#ifdef HAVE_MPI
401 procs_from[i] = requests[i]->wait()->getSourceRank();
402#else
403 // above Teuchos MPI calls not supported for Serial so manually do the transfer.
404 // We don't really need Serial for this class but it helps with debugging to have a serial test that can run.
405 lengths_from[i] = lengths_to[i];
406#endif
407 }
408
409 }
410 else { /* some large HPC machines have a limit on number of posted receives */
411 Teuchos::ArrayRCP<int> sendbuf(new int[nprocs], 0, nprocs, true);
412 Teuchos::ArrayRCP<int> recvbuf(new int[nprocs], 0, nprocs, true);
413
414 for (int i=0; i < nsends + self_msg; i++) {
415 sendbuf[procs_to[i]] = lengths_to[i];
416 }
417
418 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (2)"); // needs unit testing
419 // Did not refactor this - need Teuchos form but this is not tested code.
420 // MPI_Alltoall(&(sendbuf[0]), 1, MPI_INT, &(recvbuf[0]), 1, MPI_INT, getRawComm());
421
422 for (int i=0, j=0; i < nprocs; i++) {
423 if (recvbuf[i] > 0){
424 lengths_from[j] = recvbuf[i];
425 procs_from[j] = i;
426 if (++j == nrecvs) {
427 break;
428 }
429 }
430 }
431 }
432
433 /* Sort recv lists to keep execution deterministic (e.g. for debugging) */
434 sort_ints(procs_from, lengths_from);
435
436 *pnrecvs = nrecvs - self_msg; /* Only return number of true messages */
437
438 return 0;
439}
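// invert_map() derives "who sends to me, and how much" from each rank's purely
// local "who I send to, and how much": a global sum of per-destination message
// counts (scattered back so every rank learns how many messages to expect),
// followed by point-to-point exchanges of the individual lengths, whose source
// ranks identify the senders. The Alltoall fallback exists for machines that
// cap the number of posted receives.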
440
441int Zoltan2_Directory_Comm::sort_ints(
442 Teuchos::ArrayRCP<int> &vals_sort, /* values to be sorted */
443 Teuchos::ArrayRCP<int> &vals_other) /* other array to be reordered w/ sort */
444{
445 // TODO: Check - perhaps we can skip all of these for efficiency
446 if (vals_sort == Teuchos::null || vals_sort.size() == 0) {
447 return 1;
448 }
449 if (vals_other == Teuchos::null || vals_other.size() == 0) {
450 return 1;
451 }
452 if (vals_sort == Teuchos::null || vals_sort.size() == 1) {
453 return 0; /* fastest way to sort 1 item is to return */
454 }
455
456 /* find largest value (sort sometimes used for non processor lists) */
457 int already_sorted = 1; /* flag indicating whether vals_sort is
458 already sorted; can exit early and skip
459 memory allocations if it is. */
460 int top = vals_sort[0]; /* largest integer to sort, smallest is assumed 0 */
461 for (Teuchos::ArrayRCP<int>::size_type i = 1; i < vals_sort.size(); i++) {
462 if (vals_sort[i-1] > vals_sort[i]) {
463 already_sorted = 0;
464 }
465 if (top < vals_sort[i]) {
466 top = vals_sort[i];
467 }
468 }
469
470 if (already_sorted) {
471 return 0;
472 }
473
474 Teuchos::ArrayRCP<int> store(new int[top+2], 0, top+2, true);
475 for(int n = 0; n < store.size(); ++n) {
476 store[n] = 0;
477 }
478
479 Teuchos::ArrayRCP<int> copy_sort(new int[vals_sort.size()], 0, vals_sort.size(), true);
480 for(Teuchos::ArrayRCP<int>::size_type n = 0; n < copy_sort.size(); ++n) {
481 copy_sort[n] = vals_sort[n]; // TODO - use deepCopy method?
482 }
483
484 Teuchos::ArrayRCP<int> copy_other(new int[vals_other.size()], 0, vals_other.size(), true);
485 for(Teuchos::ArrayRCP<int>::size_type n = 0; n < copy_other.size(); ++n) {
486 copy_other[n] = vals_other[n]; // TODO - use deepCopy method?
487 }
488
489 // TODO: May want to modernize this ptr handling - however I didn't want
490 // to introduce inefficiencies so for now have kept the original structure
491 int *p = &(store[1]);
492 for (Teuchos::ArrayRCP<int>::size_type i = 0; i < vals_sort.size(); i++) {
493 p[copy_sort[i]]++; /* count number of occurrences */
494 }
495
496 for (int i = 1; i < top+1; i++) {
497 p[i] += p[i-1]; /* compute partial sums */
498 }
499 /* assert: p[top] = nvals */
500 p = &(store[0]); /* effectively shifts down by one */
501 for (Teuchos::ArrayRCP<int>::size_type i = 0; i < vals_sort.size(); i++) {
502 vals_sort[p[copy_sort[i]]] = copy_sort[i];
503 vals_other[p[copy_sort[i]]] = copy_other[i];
504 ++p[copy_sort[i]];
505 }
506
507 return 0;
508}
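// sort_ints() is a counting sort keyed on the non-negative values in vals_sort:
// it tallies occurrences, converts the tallies into prefix sums, and scatters
// both arrays into sorted position, reordering vals_other in lockstep. The cost
// is O(n + top), which is acceptable here because the keys are (usually)
// processor ranks.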
509
510int Zoltan2_Directory_Comm::do_forward(
511 int tag, /* message tag for communicating */
512 const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
513 int nbytes, /* msg size */
514 Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
515{
516 int status = 0;
517
518 if (!plan_forward->maxed_recvs) {
519 status = do_post (plan_forward, tag, send_data, nbytes, recv_data);
520 if (status == 0) {
521 status = do_wait (plan_forward, tag, send_data, nbytes, recv_data);
522 }
523 }
524 else {
525 status = do_all_to_all(plan_forward, send_data, nbytes, recv_data);
526 }
527
528 return status;
529}
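// do_forward() is deliberately split into do_post() (post receives, then send)
// and do_wait() (block until the receives complete and unpack), so the two
// phases could in principle be overlapped with other work. When the plan was
// flagged as exceeding the platform's posted-receive limit, the collective
// do_all_to_all() path is used instead.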
530
531int Zoltan2_Directory_Comm::do_post(
532 Zoltan2_Directory_Plan *plan, /* communication data structure */
533 int tag, /* message tag for communicating */
534 const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
535 int nbytes, /* msg size */
536 Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
537{
538 /* Check input parameters */
539 if (!plan) {
540 throw std::logic_error("Communication plan = NULL");
541 }
542
543 /* If not point-to-point, currently we do synchronous communication */
544 if (plan->maxed_recvs) {
545 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (3)"); // needs unit testing
546 return do_all_to_all(plan, send_data, nbytes, recv_data);
547 }
548
549 int my_proc = plan->comm->getRank(); /* processor ID */
550 if ((plan->nsends + plan->self_msg) && !send_data.size()) {
551 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (4)"); // needs unit testing
552 size_t sum = 0;
553 if (plan->sizes_to.size()) { /* Not an error if all sizes_to == 0 */
554 for (int i = 0; i < (plan->nsends + plan->self_msg); i++) {
555 sum += plan->sizes_to[i];
556 }
557 }
558 if (!plan->sizes_to.size() || (plan->sizes_to.size() && sum)) {
559 throw std::logic_error("nsends not zero, but send_data = NULL");
560 }
561 }
562 if ((plan->nrecvs + plan->self_msg) && recv_data == Teuchos::null) {
563 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (5)"); // needs unit testing
564 size_t sum = 0;
565 if (plan->sizes_from != Teuchos::null) /* Not an error if all sizes_from == 0 */
566 for (int i = 0; i < (plan->nrecvs + plan->self_msg); i++)
567 sum += plan->sizes_from[i];
568 if (!plan->sizes_from.size() || (plan->sizes_from.size() && sum)) {
569 throw std::logic_error("nrecvs not zero, but recv_data = NULL");
570 }
571 }
572
573 /* Post irecvs */
574 if (plan->indices_from == Teuchos::null) {
575 /* Data can go directly into user space. */
576 plan->recv_buff = recv_data;
577 }
578 else { /* Need to buffer receive to reorder */
579 size_t rsize = (size_t) (plan->total_recv_size) * (size_t) nbytes;
580 plan->recv_buff = Teuchos::arcp(new char[rsize], 0, rsize, true);
581 }
582
583 size_t self_recv_address = 0; /* where in recv_data self info starts */
584 if (!plan->using_sizes) { /* All data the same size */
585 int k = 0;
586 for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
587 if (plan->procs_from[i] != my_proc) {
588 Teuchos::ArrayRCP<char> subview(
589 &plan->getRecvBuff().getRawPtr()[plan->starts_from[i] * nbytes],
590 0, plan->lengths_from[i] * nbytes, false);
591 plan->request[k] = Teuchos::ireceive(subview, plan->procs_from[i], tag, *comm_);
592 k++;
593 }
594 else {
595 self_recv_address = (size_t)(plan->starts_from[i]) * (size_t)nbytes;
596 }
597 }
598 }
599 else { /* Data of varying sizes */
600 int k = 0;
601 for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
602 if (plan->procs_from[i] != my_proc) {
603 if (plan->sizes_from[i]) {
604 Teuchos::ArrayRCP<char> subview(
605 &plan->getRecvBuff().getRawPtr()[(size_t)(plan->starts_from_ptr[i])
606 * (size_t)nbytes],
607 0, plan->sizes_from[i] * nbytes, false);
608 plan->request[k] = Teuchos::ireceive(subview, plan->procs_from[i], tag, *comm_);
609 }
610 else {
611 plan->request[k] = Teuchos::null;
612 }
613 k++;
614 }
615 else {
616 self_recv_address =
617 (size_t)(plan->starts_from_ptr[i]) * (size_t)nbytes;
618 }
619 }
620 }
621
622 Teuchos::ArrayRCP<char> send_buff;
623 if(plan->indices_to != Teuchos::null) {
624 send_buff = Teuchos::arcp(new char[plan->max_send_size * nbytes], 0, plan->max_send_size * nbytes, true);
625 }
626
627 /* Barrier to ensure irecvs are posted before doing any sends. */
628 /* Simultaneously see if anyone out of memory */
629 int out_of_mem = 0;
630 // WARNING - do not delete this without adding a proper barrier as a replacement.
631 // I'm refactoring memory handling, so the new version probably won't use
632 // out_of_mem, but we must still preserve a barrier here or we get
633 // intermittent failures.
634 // I'll keep the memory reduce for now since we may end up with memory
635 // handling anyway.
636 int global_out_of_mem;
637 Teuchos::reduceAll(*plan->comm, Teuchos::REDUCE_SUM, 1, &out_of_mem,
638 &global_out_of_mem);
639
640 /* Send out data */
641
642 /* Scan through procs_to list to start w/ higher numbered procs */
643 /* This should balance message traffic. */
644
645 int nblocks = plan->nsends + plan->self_msg; /* # procs needing my data */
646 int proc_index = 0; /* loop counter over procs to send to */
647 while (proc_index < nblocks && plan->procs_to[proc_index] < my_proc) {
648 proc_index++;
649 }
650 if (proc_index == nblocks) {
651 proc_index = 0;
652 }
653
654 if (!plan->using_sizes) { /* Data all of same size */
655 if (plan->indices_to == Teuchos::null) { /* data already blocked by processor. */
656 int self_num = 0; /* where in send list my_proc appears */
657 for (int i = proc_index, j = 0; j < nblocks; j++) {
658 if (plan->procs_to[i] != my_proc) {
659 Teuchos::ArrayRCP<char> subview(
660 &send_data[plan->starts_to[i] * nbytes],
661 0, plan->lengths_to[i] * nbytes, false);
662 Teuchos::readySend(subview.getRawPtr(), static_cast<int>(subview.size()), plan->procs_to[i], tag, *comm_);
663 }
664 else {
665 self_num = i;
666 }
667 if (++i == nblocks) {
668 i = 0;
669 }
670 }
671
672 if (plan->self_msg) { /* Copy data to self. */
673 /* I use array+offset instead of &(array[offset]) because of
674 a bug with PGI v9 */
675 /* I use memmove because I'm not sure that the pointer are not
676 overlapped. */
677
678 // TODO: Refactor to make C++ and remove getRawPtr, etc
679 memmove(
680 plan->getRecvBuff().getRawPtr()+self_recv_address,
681 send_data.getRawPtr()+(size_t)(plan->starts_to[self_num])*(size_t)nbytes,
682 (size_t) (plan->lengths_to[self_num]) * (size_t) nbytes);
683 }
684 }
685 else { /* Not blocked by processor. Need to buffer. */
686 int self_index = 0; /* send offset for data I'm keeping */
687 int self_num = 0; /* where in send list my_proc appears */
688 for (int i = proc_index, jj = 0; jj < nblocks; jj++) {
689 if (plan->procs_to[i] != my_proc) {
690 /* Need to pack message first. */
691 size_t offset = 0; /* offset into array I'm copying into */
692 int j = plan->starts_to[i];
693 for (int k = 0; k < plan->lengths_to[i]; k++) {
694 memcpy(&send_buff[offset],
695 &send_data[(size_t)(plan->indices_to[j++]) * (size_t)nbytes], nbytes);
696 offset += nbytes;
697 }
698 Teuchos::readySend(&send_buff[0], plan->lengths_to[i] * nbytes,
699 plan->procs_to[i], tag, *comm_);
700 }
701 else {
702 self_num = i;
703 self_index = plan->starts_to[i];
704 }
705 if (++i == nblocks)
706 i = 0;
707 }
708
709 if (plan->self_msg) { /* Copy data to self. */
710 for (int k = 0; k < plan->lengths_to[self_num]; k++) {
711 memcpy(&(plan->getRecvBuff())[self_recv_address],
712 &send_data[
713 (size_t)(plan->indices_to[self_index++]) * (size_t)nbytes],
714 nbytes);
715 self_recv_address += nbytes;
716 }
717 }
718 }
719 }
720 else { /* Data of differing sizes */
721 if (plan->indices_to == Teuchos::null) { /* data already blocked by processor. */
722 int self_num = 0; /* where in send list my_proc appears */
723 for (int i = proc_index, j = 0; j < nblocks; j++) {
724 if (plan->procs_to[i] != my_proc) {
725 if (plan->sizes_to[i]) {
726 Teuchos::readySend(&send_data[plan->starts_to_ptr[i] * nbytes], plan->sizes_to[i] * nbytes,
727 plan->procs_to[i], tag, *comm_);
728 }
729 }
730 else
731 self_num = i;
732
733 if (++i == nblocks)
734 i = 0;
735 }
736 if (plan->self_msg) { /* Copy data to self. */
737 if (plan->sizes_to[self_num]) {
738 char* lrecv = &plan->getRecvBuff().getRawPtr()[self_recv_address];
739 const char* lsend =
740 &send_data.getRawPtr()[(size_t)(plan->starts_to_ptr[self_num]) * (size_t)nbytes];
741 int sindex = plan->sizes_to[self_num], idx;
742 for (idx=0; idx<nbytes; idx++) {
743 memcpy(lrecv, lsend, sindex);
744 lrecv += sindex;
745 lsend += sindex;
746 }
747 }
748 }
749 }
750 else { /* Not blocked by processor. Need to buffer. */
751 int self_num = 0; /* where in send list my_proc appears */
752 for (int i = proc_index, jj = 0; jj < nblocks; jj++) {
753 if (plan->procs_to[i] != my_proc) {
754 size_t offset = 0; /* offset into array I'm copying into */
755 int j = plan->starts_to[i];
756 for (int k = 0; k < plan->lengths_to[i]; k++) {
757 if (plan->sizes[plan->indices_to[j]]) {
758 memcpy(&send_buff[offset],
759 &send_data[(size_t)(plan->indices_to_ptr[j]) * (size_t)nbytes],
760 (size_t)(plan->sizes[plan->indices_to[j]]) * (size_t)nbytes);
761 offset +=
762 (size_t)(plan->sizes[plan->indices_to[j]]) * (size_t)nbytes;
763 }
764 j++;
765 }
766 if (plan->sizes_to[i]) {
767 Teuchos::readySend(&send_buff[0], plan->sizes_to[i] * nbytes,
768 plan->procs_to[i], tag, *comm_);
769 }
770 }
771 else
772 self_num = i;
773
774 if (++i == nblocks)
775 i = 0;
776 }
777 if (plan->self_msg) { /* Copy data to self. */
778 if (plan->sizes_to[self_num]) {
779 int j = plan->starts_to[self_num];
780 for (int k = 0; k < plan->lengths_to[self_num]; k++) {
781 int kk = plan->indices_to_ptr[j];
782 char* lrecv = &(plan->getRecvBuff())[self_recv_address];
783 size_t send_idx = (size_t)kk * (size_t)nbytes;
784 const char* lsend = &send_data[send_idx];
785 int sindex = plan->sizes[plan->indices_to[j]], idx;
786 for (idx=0; idx<nbytes; idx++) {
787 memcpy(lrecv, lsend, sindex);
788 lrecv += sindex;
789 lsend += sindex;
790 }
791 self_recv_address += (size_t)(plan->sizes[plan->indices_to[j]])
792 * (size_t) nbytes;
793 j++;
794 }
795 }
796 }
797 }
798 }
799
800 return 0;
801}
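// do_post() above proceeds in three steps: (1) post one irecv per sending rank
// (or just record the offset for the self-message), (2) a global reduce that
// doubles as the barrier guaranteeing all receives are posted before any sends
// start, and (3) ready-mode sends, beginning with the ranks numbered above my
// own so that traffic is spread out rather than everyone targeting rank 0
// first. Data is packed into send_buff only when indices_to says the items are
// not already contiguous per destination.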
802
803int Zoltan2_Directory_Comm::do_wait(
804 Zoltan2_Directory_Plan *plan, /* communication data structure */
805 int /* tag */, /* message tag for communicating */
806 const Teuchos::ArrayRCP<char> &/* send_data */, /* array of data I currently own */
807 int nbytes, /* msg size */
808 Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
809{
810 /* If not point-to-point, currently we do synchronous communication */
811 if (plan->maxed_recvs){
812 /* Do nothing */
813 return 0;
814 }
815
816 int my_proc = plan->comm->getRank(); /* processor ID */
817
818 /* Wait for messages to arrive & unpack them if necessary. */
819 /* Note: since request is in plan, could wait in later routine. */
820 if (plan->indices_from == Teuchos::null) { /* No copying required */
821 if (plan->nrecvs > 0) {
822 Teuchos::waitAll(*comm_, plan->request());
823 }
824 }
825 else { /* Need to copy into recv_data. */
826 int self_num; /* where in recv list my_proc appears */
827 size_t offsetDst = 0;
828 if (plan->self_msg) { /* Unpack own data before waiting */
829 for (self_num = 0; self_num < plan->nrecvs + plan->self_msg;
830 self_num++) {
831 if (plan->procs_from[self_num] == my_proc) {
832 break;
833 }
834 }
835
836 if(plan->sizes_from.size()) {
837 // NEW METHOD for variable sized data
838 // This will NOT put the data in order, but during the directory read
839 // of the buffer which follows, this data gets sorted anyway since each
840 // element has an index to tell where it belongs. I am not sure yet if
841 // we must sort here, or if it's fine to allow the sort to happen through
842 // the index value. This needs further discussion.
843 memcpy(&recv_data[offsetDst * (size_t)nbytes],
844 &(plan->getRecvBuff())[plan->starts_from_ptr[self_num] * (size_t)nbytes],
845 plan->sizes_from[self_num] * (size_t)nbytes);
846 offsetDst += plan->sizes_from[self_num];
847 }
848 else {
849 int k = plan->starts_from[self_num];
850 for (int j = plan->lengths_from[self_num]; j; j--) {
851 memcpy(&recv_data[(size_t)(plan->indices_from[k]) * (size_t)nbytes],
852 &(plan->getRecvBuff())[(size_t)k * (size_t)nbytes], nbytes);
853 k++;
854 }
855 }
856 }
857 else {
858 self_num = plan->nrecvs;
859 }
860
861 for (int jj = 0; jj < plan->nrecvs; jj++) {
862 // TODO: Refactored directory to use Teuchos comm but we have no Teuchos::waitAny
863 // Short term fix: we just call wait() on each request in serial.
864 // When Teuchos::waitAny becomes available we can switch back to the
865 // old form, which is commented out below.
866 plan->request[jj]->wait();
867 int index = jj;
868
869 // Old form with MPI_Waitany
870 // MPI_Status status; /* return from Waitany */
871 // int index;
872 // MPI_Waitany(plan->nrecvs, &plan->request[0], &index, &status);
873
874 if (index >= self_num) {
875 index++;
876 }
877
878 if(plan->sizes_from.size()) {
879 // NEW METHOD for variable sized data
880 // This will NOT put the data in order, but during the directory read
881 // of the buffer which follows, this data gets sorted anyway since each
882 // element has an index to tell where it belongs. I am not sure yet if
883 // we must sort here, or if it's fine to allow the sort to happen through
884 // the index value. This needs further discussion.
885 memcpy(&recv_data[offsetDst * (size_t)nbytes],
886 &plan->getRecvBuff().getRawPtr()[plan->starts_from_ptr[index] * (size_t)nbytes],
887 plan->sizes_from[index] * (size_t)nbytes);
888 offsetDst += plan->sizes_from[index];
889 }
890 else {
891 int k = plan->starts_from[index];
892 for (int j = plan->lengths_from[index]; j; j--) {
893 memcpy(&recv_data.getRawPtr()[(size_t)(plan->indices_from[k]) * (size_t)nbytes],
894 &plan->getRecvBuff().getRawPtr()[(size_t)k * (size_t)nbytes], nbytes);
895 k++;
896 }
897 }
898 }
899 }
900
901 return 0;
902}
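// do_wait() only has real unpacking to do when indices_from is set: messages
// land in plan->recv_buff in arrival order and are then copied item by item to
// the positions recorded in indices_from (or, for variable-sized data, appended
// in arrival order and left for the directory layer to reorder by each item's
// embedded index, as the comments above note).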
903
904/* Do_Post would require posting more receives than allowed on this platform.
905* We use MPI_AlltoAllv instead, which is probably implemented such that each
906* process does one receive at a time.
907*/
908
909int Zoltan2_Directory_Comm::do_all_to_all(
910 Zoltan2_Directory_Plan *plan, /* communication data structure */
911 const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
912 int nbytes, /* msg size */
913 Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
914{
915 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (6)"); // needs unit testing
916
917 int sm = (plan->self_msg > 0) ? 1 : 0;
918
919 int nSendMsgs = plan->nsends + sm;
920 int nRecvMsgs = plan->nrecvs + sm;
921
922 int nSendItems = 0;
923 for (int i=0; i <nSendMsgs; i++) {
924 nSendItems += plan->lengths_to[i];
925 }
926 int nRecvItems = 0;
927 for (int i=0; i <nRecvMsgs; i++) {
928 nRecvItems += plan->lengths_from[i];
929 }
930
931 int nprocs = plan->comm->getSize();
932
933 Teuchos::ArrayRCP<int> outbufCounts(new int[nprocs], 0, nprocs, true);
934 Teuchos::ArrayRCP<int> outbufOffsets(new int[nprocs], 0, nprocs, true);
935 Teuchos::ArrayRCP<int> inbufCounts(new int[nprocs], 0, nprocs, true);
936 Teuchos::ArrayRCP<int> inbufOffsets(new int[nprocs], 0, nprocs, true);
937
938 /* The *_to fields of the plan refer to the items in the send_data buffer,
939 * and how to pull out the correct items for each receiver. The
940 * *_from fields of the plan refer to the recv_data buffer. Items
941 * arrive in process rank order, and these fields tell us where to
942 * put them in the recv_data buffer.
943 */
944
945 /* CREATE SEND BUFFER */
946
947 int sorted = 0;
948 if (plan->indices_to == Teuchos::null){
949 sorted = 1;
950 for (int i=1; i< nSendMsgs; i++){
951 if (plan->starts_to[i] < plan->starts_to[i-1]){
952 sorted = 0;
953 break;
954 }
955 }
956 }
957
958 Teuchos::ArrayRCP<char> outbuf;
959 Teuchos::ArrayRCP<char> inbuf;
960 Teuchos::ArrayRCP<char> buf;
961
962 if (plan->sizes_to.size()){
963 /*
964 * Each message contains items for a process, and each item may be
965 * a different size.
966 */
967
968 int outbufLen = 0;
969 for (int i = 0; i < nSendMsgs; i++){
970 outbufLen += plan->sizes_to[i];
971 }
972
973 if (plan->indices_to != Teuchos::null) {
974 /*
975 * items are not grouped by message
976 */
977 buf.resize(outbufLen*nbytes);
978 outbuf.resize(outbufLen*nbytes);
979 char * pBufPtr = &(outbuf[0]);
980 int i = 0;
981 int k = 0;
982 for (int p = 0; p < nprocs; p++) {
983
984 int length = 0;
985
986 if (i < nSendMsgs){
987 if (plan->procs_to[i] == p){ /* procs_to is sorted */
988
989 for (int j=0; j < plan->lengths_to[i]; j++,k++){
990 int itemSize = plan->sizes[plan->indices_to[k]] * nbytes;
991 int offset = plan->indices_to_ptr[k] * nbytes;
992
993 memcpy(pBufPtr, &(send_data[0]) + offset, itemSize);
994
995 pBufPtr += itemSize;
996 length += itemSize;
997 }
998 i++;
999 }
1000 }
1001
1002 outbufCounts[p] = length;
1003 if (p){
1004 outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1005 }
1006 }
1007 }
1008 else{
1009 /*
1010 * items are stored contiguously for each message
1011 */
1012
1013 if (!sorted || (plan->nvals > nSendItems) ){
1014 buf.resize(outbufLen*nbytes);
1015 outbuf.resize(outbufLen*nbytes);
1016 }
1017 else{
1018 /* All items in send_data are being sent, and they are sorted
1019 * in process rank order.
1020 */
1021 // TODO: Optimize - original just set the ptr... outbuf was never allocated
1022 // in this branch, so copying into it element by element would be out of
1023 // bounds; alias send_data instead (as the fixed-size block below does).
1024 outbuf = send_data;
1025 }
1026
1027 char * pBufPtr = &(outbuf[0]);
1028
1029 int i = 0;
1030 for (int p = 0; p < nprocs; p++) {
1031
1032 int length = 0;
1033
1034 if (i < nSendMsgs){
1035 if (plan->procs_to[i] == p){ /* procs_to is sorted */
1036 length = plan->sizes_to[i] * nbytes;
1037 int offset = plan->starts_to_ptr[i] * nbytes;
1038
1039 if ((!sorted || (plan->nvals > nSendItems)) && length){
1040 memcpy(pBufPtr, &(send_data[0]) + offset, length);
1041 pBufPtr += length;
1042 }
1043 i++;
1044 }
1045 }
1046
1047 outbufCounts[p] = length;
1048 if (p){
1049 outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1050 }
1051 }
1052 }
1053 }
1054 else if (plan->indices_to != Teuchos::null) {
1055 /*
1056 * item sizes are constant, however the items belonging in a given
1057 * message may not be contiguous in send_data
1058 */
1059 buf.resize(nSendItems*nbytes);
1060 outbuf.resize(nSendItems*nbytes);
1061 char * pBufPtr = &(outbuf[0]);
1062 int i = 0;
1063 int k = 0;
1064 for (int p = 0; p < nprocs; p++){
1065
1066 int length = 0;
1067
1068 if (i < nSendMsgs){
1069 if (plan->procs_to[i] == p){ /* procs_to is sorted */
1070 for (int j=0; j < plan->lengths_to[i]; j++,k++) {
1071 int offset = plan->indices_to[k] * nbytes;
1072 memcpy(pBufPtr, &(send_data[0]) + offset, nbytes);
1073 pBufPtr += nbytes;
1074 }
1075 length = plan->lengths_to[i] * nbytes;
1076 i++;
1077 }
1078 }
1079
1080 outbufCounts[p] = length;
1081 if (p){
1082 outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1083 }
1084 }
1085 }
1086 else{
1087
1088 /* item sizes are constant, and items belonging to a
1089 * given message are always stored contiguously in send_data
1090 */
1091
1092 if (!sorted || (plan->nvals > nSendItems)){
1093 buf.resize(nSendItems*nbytes);
1094 outbuf.resize(nSendItems*nbytes);
1095 }
1096 else{
1097 /* send_data is sorted by process, and we don't skip
1098 * any of the data in the buffer, so we can use send_data
1099 * in the alltoall call
1100 */
1101 // TODO: Optimize - original just set ptr
1102 outbuf = send_data;
1103 }
1104
1105 char * pBufPtr = &(outbuf[0]);
1106
1107 int i = 0;
1108 for (int p=0; p < nprocs; p++) {
1109
1110 int length = 0;
1111
1112 if (i < nSendMsgs){
1113 if (plan->procs_to[i] == p){ /* procs_to is sorted */
1114 int offset = plan->starts_to[i] * nbytes;
1115 length = plan->lengths_to[i] * nbytes;
1116
1117 if ((!sorted || (plan->nvals > nSendItems)) && length){
1118 memcpy(pBufPtr, &(send_data[0]) + offset, length);
1119 pBufPtr += length;
1120 }
1121 i++;
1122 }
1123 }
1124
1125 outbufCounts[p] = length;
1126 if (p){
1127 outbufOffsets[p] = outbufOffsets[p-1] + outbufCounts[p-1];
1128 }
1129 }
1130 }
1131
1132 /* CREATE RECEIVE BUFFER */
1133
1134 sorted = 0;
1135 int i;
1136 if (plan->indices_from == Teuchos::null) {
1137 sorted = 1;
1138 for (i=1; i< nRecvMsgs; i++) {
1139 if (plan->starts_from[i] < plan->starts_from[i-1]){
1140 sorted = 0;
1141 break;
1142 }
1143 }
1144 }
1145
1146 if (sorted){
1147 /* Caller already expects received data to be ordered by
1148 * the sending process rank.
1149 */
1150
1151 // TODO: Optimize - original just set ptr
1152 // (outbuf was already prepared above; only the receive buffer is aliased here)
1153 inbuf = recv_data;
1154 }
1155 else {
1156 inbuf.resize(plan->total_recv_size * nbytes);
1157 }
1158 i = 0; /* restart the receive-message scan for the counts/offsets pass */
1159 for (int p = 0; p < nprocs; p++) {
1160 int length = 0;
1161
1162 if (i < nRecvMsgs){
1163 if (plan->procs_from[i] == p){
1164
1165 if (!plan->using_sizes){
1166 length = plan->lengths_from[i] * nbytes;
1167 }
1168 else{
1169 length = plan->sizes_from[i] * nbytes;
1170 }
1171 i++;
1172 }
1173 }
1174
1175 inbufCounts[p] = length;
1176 if (p){
1177 inbufOffsets[p] = inbufOffsets[p-1] + inbufCounts[p-1];
1178 }
1179 }
1180
1181 /* EXCHANGE DATA */
1182 // Did not refactor this - need Teuchos form but not tested code
1183 // MPI_Alltoallv(&(outbuf[0]), &(outbufCounts[0]), &(outbufOffsets[0]), MPI_CHAR,
1184 //   &(inbuf[0]), &(inbufCounts[0]), &(inbufOffsets[0]), MPI_CHAR, getRawComm());
1185
1186 /* WRITE RECEIVED DATA INTO USER'S BUFFER WHERE IT'S EXPECTED */
1187
1188 if (!sorted){
1189
1190 char * pBufPtr = &(inbuf[0]);
1191
1192 if (!plan->using_sizes){
1193
1194 /* each item in each message is nbytes long */
1195
1196 if (plan->indices_from == Teuchos::null) {
1197 for (i=0; i < nRecvMsgs; i++){
1198 int offset = plan->starts_from[i] * nbytes;
1199 int length = plan->lengths_from[i] * nbytes;
1200 memcpy(&(recv_data[0]) + offset, pBufPtr, length);
1201 pBufPtr += length;
1202 }
1203 }
1204 else{
1205 int k = 0;
1206 for (i=0; i < nRecvMsgs; i++) {
1207
1208 for (int j=0; j < plan->lengths_from[i]; j++,k++){
1209 int offset = plan->indices_from[k] * nbytes;
1210 memcpy(&(recv_data[0]) + offset, pBufPtr, nbytes);
1211 pBufPtr += nbytes;
1212 }
1213 }
1214 }
1215 }
1216 else{ /* (sizes!=NULL) && (indices_from!=NULL) not allowed by Zoltan_Comm_Resize */
1217
1218 /* items can be different sizes */
1219
1220 for (i=0; i < nRecvMsgs; i++){
1221 int offset = plan->starts_from_ptr[i] * nbytes;
1222 int length = plan->sizes_from[i] * nbytes;
1223 memcpy(&(recv_data[0]) + offset, pBufPtr, length);
1224 pBufPtr += length;
1225 }
1226 }
1227 }
1228
1229 return 0;
1230}
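// do_all_to_all() packs one contiguous outgoing buffer ordered by destination
// rank (with per-rank counts and offsets), expects the incoming buffer in
// source-rank order, and finally scatters it into the caller's recv_data using
// starts_from / indices_from. The actual Alltoallv exchange is still a TODO
// above, which is why this path currently throws until it gets unit tests.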
1231
1232int Zoltan2_Directory_Comm::do_reverse(
1233 int tag, /* message tag for communicating */
1234 const Teuchos::ArrayRCP<char> &send_data, /* array of data I currently own */
1235 int nbytes, /* msg size */
1236 const Teuchos::ArrayRCP<int> &sizes,
1237 Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after reverse comm */
1238{
1239 /* create plan->plan_reverse
1240 */
1241 int status = create_reverse_plan(tag, sizes);
1242
1243 // NEW METHOD
1244 // Set up recv_data with the proper size
1245 // This information is only available after create_reverse_plan above has been
1246 // called, so we can set up the return data with the proper buffer size now.
1247 // However, should we do this here?
1248 size_t new_size = plan_forward->plan_reverse->total_recv_size*nbytes;
1249 if(new_size > 0) {
1250 // Have to be careful with new[0]: a zero-length allocation would, for example,
1251 // turn a valid NULL ArrayRCP::getRawPtr() read into a debug assert failure.
1252 recv_data = Teuchos::arcp(new char[new_size], 0, new_size, true);
1253 }
1254
1255 if (status == 0) {
1256
1257 if (plan_forward->plan_reverse->maxed_recvs) {
1258
1259 throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (7)"); // needs unit testing
1260
1261 /* use MPI_Alltoallv to implement plan->plan_reverse, because comm_do_post
1262 * would post more receives that allowed on this machine
1263 */
1264
1265 status = do_all_to_all(plan_forward->plan_reverse, send_data,
1266 nbytes, recv_data);
1267 }
1268 else {
1269 /* use post/wait which is faster when each sends to few
1270 */
1271 status = do_post(plan_forward->plan_reverse, tag, send_data,
1272 nbytes, recv_data);
1273
1274 if (status == 0) {
1275 status = do_wait (plan_forward->plan_reverse, tag, send_data,
1276 nbytes, recv_data);
1277 }
1278 }
1279 }
1280
1281 free_reverse_plan(plan_forward);
1282
1283 return status;
1284}
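// do_reverse() ships data back along the forward plan's links: it builds the
// inverted plan, sizes recv_data from that plan's total receive size, reuses
// the same post/wait (or all-to-all) machinery, and then frees the reverse
// plan. The sizes argument allows the returned items to have varying lengths,
// in which case resize() computes the per-message byte counts first.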
1285
1286void Zoltan2_Directory_Comm::free_reverse_plan(Zoltan2_Directory_Plan *plan)
1287{
1288 if(!plan) {
1289 throw std::logic_error("Plan is NULL!");
1290 }
1291 delete plan->plan_reverse;
1292 plan->plan_reverse = NULL;
1293}
1294
1295int Zoltan2_Directory_Comm::create_reverse_plan(
1296 int tag,
1297 const Teuchos::ArrayRCP<int> &sizes)/* variable size of objects (if not size 0) */
1298{
1299 /* Check input parameters */
1300 if (!plan_forward){
1301 throw std::logic_error("memory error");
1302 }
1303
1304 /* Let Zoltan_Comm_Do check the remaining parameters. */
1305 plan_forward->plan_reverse = new Zoltan2_Directory_Plan;
1306 plan_forward->plan_reverse->getInvertedValues(plan_forward);
1307
1308 if (MPI_RECV_LIMIT > 0){
1309 /* If we have a limit to the number of posted receives we are allowed,
1310 ** and our plan has exceeded that, then switch to an MPI_Alltoallv so
1311 ** that we will have fewer receives posted when we do the communication.
1312 */
1313 int global_nsends;
1314 Teuchos::reduceAll<int>(*plan_forward->comm, Teuchos::REDUCE_SUM, 1,
1315 &plan_forward->nsends, &global_nsends);
1316 if (global_nsends > MPI_RECV_LIMIT){
1317 plan_forward->plan_reverse->maxed_recvs = 1;
1318 }
1319 }
1320
1321 if (plan_forward->plan_reverse->maxed_recvs == 0) {
1322 // See notes in header for MPI_Request
1323 // plan_forward->plan_reverse->request = Teuchos::arcp(new MPI_Request[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
1324 // plan_forward->plan_reverse->status = Teuchos::arcp(new MPI_Status[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
1325 plan_forward->plan_reverse->request.resize(plan_forward->plan_reverse->nrecvs);
1326 }
1327
1328 int sum_recv_sizes;
1329 int comm_flag = resize( plan_forward->plan_reverse,
1330 sizes, tag, &sum_recv_sizes);
1331
1332 if (comm_flag != 0) {
1333 return(comm_flag);
1334 }
1335
1336 if (sum_recv_sizes != plan_forward->plan_reverse->total_recv_size){
1337 /* Sanity check */
1338 return 1;
1339 }
1340
1341 return 0;
1342}
1343
1344int Zoltan2_Directory_Comm::resize(
1345 const Teuchos::ArrayRCP<int> &sizes, /* size of each item I'm sending */
1346 int tag, /* message tag I can use */
1347 int *sum_recv_sizes) /* sum of the sizes of the items I'll receive */
1348{
1349 return resize(plan_forward, sizes, tag, sum_recv_sizes);
1350}
1351
1352int Zoltan2_Directory_Comm::resize(
1353 Zoltan2_Directory_Plan *plan, /* communication plan object */
1354 const Teuchos::ArrayRCP<int> &sizes, /* size of each item I'm sending */
1355 int tag, /* message tag I can use */
1356 int *sum_recv_sizes) /* sum of the sizes of the items I'll receive */
1357{
1358 /* If sizes vary, then I need to compute and communicate message lengths */
1359 /* First check if sizes array is NULL on all procs. */
1360 int my_proc = plan->comm->getRank(); /* my processor ID */
1361 int has_sizes = (sizes.size() != 0);
1362 int var_sizes; /* items have variable sizes? */
1363
1364 Teuchos::reduceAll(*comm_, Teuchos::REDUCE_BOR, 1, &has_sizes, &var_sizes);
1365
1366 if (var_sizes && plan->indices_from != Teuchos::null) {
1367 // NEW METHOD
1368 // Allow this to run now - the below implementation is working but perhaps
1369 // not done correctly for other usage cases I have not considered yet.
1370 // throw std::logic_error("Non-blocked, variable-sized recvs not supported");
1371 }
1372
1373 int nsends = plan->nsends; /* number of msgs I'll send */
1374 int nrecvs = plan->nrecvs; /* number of msgs I'll recv */
1375 int self_msg = plan->self_msg;
1376
1377 Teuchos::ArrayRCP<int> sizes_to;
1378 Teuchos::ArrayRCP<int> sizes_from;
1379 Teuchos::ArrayRCP<int> starts_to_ptr;
1380 Teuchos::ArrayRCP<int> starts_from_ptr;
1381 Teuchos::ArrayRCP<int> indices_to_ptr;
1382 Teuchos::ArrayRCP<int> indices_from_ptr;
1383
1384 if (!var_sizes) { /* Easy case. Size = length */
1385 plan->total_recv_size = 0;
1386 for (int i = 0; i < nrecvs + self_msg; i++) {
1387 plan->total_recv_size += plan->lengths_from[i];
1388 }
1389
1390 plan->max_send_size = 0;
1391 for (int i = 0; i < nsends + self_msg; i++) {
1392 if (plan->procs_to[i] != my_proc &&
1393 plan->lengths_to[i] > plan->max_send_size) {
1394 plan->max_send_size = plan->lengths_to[i];
1395 }
1396 }
1397 }
1398 else { /* Need to actually compute message sizes */
1399
1400 // TODO Investigate purpose of the +1 in the old code. Is that used?
1401
1402 // OLD CODE
1403 // plan->sizes.resize(plan->nvals + 1);
1404 // for (int i = 0; i < plan->nvals; i++) {
1405 // plan->sizes[i] = sizes[i];
1406 // }
1407
1408 // NEW CODE
1409 plan->sizes = sizes; // can we just copy?
1410 plan->using_sizes = true;
1411
1412 if(nsends + self_msg > 0) {
1413 sizes_to = Teuchos::arcp(
1414 new int[nsends + self_msg], 0, nsends + self_msg, true);
1415 for(int n = 0; n < sizes_to.size(); ++n) {
1416 sizes_to[n] = 0;
1417 }
1418 }
1419 if(nrecvs + self_msg > 0) {
1420 sizes_from = Teuchos::arcp(
1421 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1422 }
1423
1424 /* Several cases:
1425 1. indices_to == NULL
1426 => starts_to != NULL, need to allocate, set starts_to_ptr
1427 2. indices_to != NULL (=> starts_to == NULL)
1428 need to allocate, set indices_to_ptr
1429 3,4. mirror cases for _from
1430 */
1431 if(nsends + self_msg > 0) {
1432 starts_to_ptr = Teuchos::arcp(
1433 new int[nsends + self_msg], 0, nsends + self_msg, true);
1434 }
1435 if (plan->indices_to == Teuchos::null) {
1436 /* Simpler case; sends already blocked by processor */
1437 Teuchos::ArrayRCP<int> index;
1438 Teuchos::ArrayRCP<int> sort_val;
1439 if(nsends + self_msg > 0) {
1440 index = Teuchos::arcp(
1441 new int[nsends + self_msg], 0, nsends + self_msg, true);
1442 sort_val = Teuchos::arcp(
1443 new int[nsends + self_msg], 0, nsends + self_msg, true);
1444 }
1445 for (int i = 0; i < nsends + self_msg; i++) {
1446 int j = plan->starts_to[i];
1447
1448 for (int k = 0; k < plan->lengths_to[i]; k++) {
1449 sizes_to[i] += sizes[j++];
1450 }
1451 if (sizes_to[i] > plan->max_send_size &&
1452 plan->procs_to[i] != my_proc)
1453 plan->max_send_size = sizes_to[i];
1454 }
1455 for (int i = 0; i < nsends + self_msg; i++) {
1456 sort_val[i] = plan->starts_to[i];
1457 index[i] = i;
1458 }
1459 sort_ints(sort_val, index);
1460 int sum = 0;
1461 for (int i = 0; i < nsends + self_msg; i++) {
1462 starts_to_ptr[index[i]] = sum;
1463 sum += sizes_to[index[i]];
1464 }
1465 }
1466 else { /* Harder case, sends not blocked */
1467 Teuchos::ArrayRCP<int> offset;
1468 if(plan->nvals > 0) {
1469 offset = Teuchos::arcp(new int[plan->nvals], 0, plan->nvals, true);
1470 }
1471 indices_to_ptr.resize(plan->nvals);
1472
1473 /* Compute address for every item in send array */
1474 int sum = 0;
1475 for (int i = 0; i < plan->nvals; i++) {
1476 offset[i] = sum;
1477 sum += sizes[i];
1478 }
1479
1480 sum = 0;
1481 plan->max_send_size = 0;
1482 for (int i = 0; i < nsends + self_msg; i++) {
1483 starts_to_ptr[i] = sum;
1484 int j = plan->starts_to[i];
1485 for (int k = 0; k < plan->lengths_to[i]; k++) {
1486 indices_to_ptr[j] = offset[plan->indices_to[j]];
1487 sizes_to[i] += sizes[plan->indices_to[j++]];
1488 }
1489 if (sizes_to[i] > plan->max_send_size &&
1490 plan->procs_to[i] != my_proc)
1491 plan->max_send_size = sizes_to[i];
1492 sum += sizes_to[i];
1493 }
1494 }
1495
1496 /* Note: This routine only gets message sizes, not object sizes. */
1497 /* Anything requiring item sizes requires more code */
1498 /* Should such functionality reside here? */
1499 exchange_sizes(sizes_to, plan->procs_to, nsends, self_msg,
1500 sizes_from, plan->procs_from, nrecvs,
1501 &plan->total_recv_size, my_proc, tag, plan->comm);
1502
1503 if(nrecvs + self_msg > 0) {
1504 starts_from_ptr = Teuchos::arcp(
1505 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1506 }
1507
1508 if (plan->indices_from == Teuchos::null) {
1509 /* Simpler case; recvs already blocked by processor */
1510 Teuchos::ArrayRCP<int> index;
1511 Teuchos::ArrayRCP<int> sort_val;
1512 if(nrecvs + self_msg > 0) {
1513 index = Teuchos::arcp(
1514 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1515 sort_val = Teuchos::arcp<int>(
1516 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1517 }
1518
1519 for (int i = 0; i < nrecvs + self_msg; i++) {
1520 sort_val[i] = plan->starts_from[i];
1521 index[i] = i;
1522 }
1523 sort_ints(sort_val, index);
1524
1525 int sum = 0;
1526 for (int i = 0; i < nrecvs + self_msg; i++) {
1527 starts_from_ptr[index[i]] = sum;
1528 sum += sizes_from[index[i]];
1529 }
1530 }
1531 else {
1532 // OLD COMMENT left here for reference but to be deleted
1533 /* Harder case, recvs not blocked */
1534 /* Not currently supported */
1535 /* Can't do w/o individual item sizes */
1536 /* Currently checked for at top of file */
1537
1538 // NEW METHOD
1539 // Note this is currently just a duplicate of above block which is working
1540 // since I've set up do_wait to just copy the entire block. However I am
1541 // not sure yet how to organize this implementation and suspect this may
1542 // not be correct anyways - though it seems to work. After do_wait copies
1543 // the data (out of order) the directory will handle resorting it since
1544 // each element has index to indicate where it goes in the array. In the
1545 // original do_wait implementation it seemed it was setup to place the
1546 // elements in order, even though they would be sorted later, so that
1547 // part I need to discuss further
1548 Teuchos::ArrayRCP<int> index;
1549 Teuchos::ArrayRCP<int> sort_val;
1550 if(nrecvs + self_msg > 0) {
1551 index = Teuchos::arcp(
1552 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1553 sort_val = Teuchos::arcp(
1554 new int[nrecvs + self_msg], 0, nrecvs + self_msg, true);
1555 }
1556
1557 for (int i = 0; i < nrecvs + self_msg; i++) {
1558 sort_val[i] = plan->starts_from[i];
1559 index[i] = i;
1560 }
1561 sort_ints(sort_val, index);
1562
1563 int sum = 0;
1564 for (int i = 0; i < nrecvs + self_msg; i++) {
1565 starts_from_ptr[index[i]] = sum;
1566 sum += sizes_from[index[i]];
1567 }
1568 }
1569 }
1570 plan->sizes_to = sizes_to;
1571 plan->sizes_from = sizes_from;
1572 plan->starts_to_ptr = starts_to_ptr;
1573 plan->starts_from_ptr = starts_from_ptr;
1574 plan->indices_to_ptr = indices_to_ptr;
1575 plan->indices_from_ptr = indices_from_ptr;
1576
1577 if (sum_recv_sizes) {
1578 *sum_recv_sizes = plan->total_recv_size;
1579 }
1580
1581 return 0;
1582}
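// resize() upgrades a plan built for fixed-size items to variable-sized ones:
// per-message byte counts (sizes_to / sizes_from) are accumulated from the
// per-item sizes, byte offsets (starts_to_ptr / starts_from_ptr /
// indices_to_ptr) are derived so packing and unpacking can address the right
// spot in the buffers, and exchange_sizes() below tells each receiver how many
// bytes to expect from every sender.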
1583
1584int Zoltan2_Directory_Comm::exchange_sizes(
1585 const Teuchos::ArrayRCP<int> &sizes_to, /* value I need to exchange (size of true msg) */
1586 const Teuchos::ArrayRCP<int> &procs_to, /* procs I send to */
1587 int nsends, /* number of messages I'll send */
1588 int self_msg, /* do I copy data to myself? */
1589 Teuchos::ArrayRCP<int> &sizes_from, /* (returned) size of all my receives */
1590 const Teuchos::ArrayRCP<int> &procs_from, /* procs I recv from */
1591 int nrecvs, /* number of messages I receive */
1592 int *total_recv_size, /* (returned) sum of all incoming sizes */
1593 int my_proc, /* my processor number */
1594 int tag, /* message tag I can use */
1595 Teuchos::RCP<const Teuchos::Comm<int> > /* comm */) { /* communicator */
1596
1597 /* If sizes vary, then I need to communicate message lengths */
1598 int self_index_to = -1; /* location of self in procs_to */
1599 for (int i = 0; i < nsends + self_msg; i++) {
1600 if (procs_to[i] != my_proc) {
1601#ifdef HAVE_MPI // Teuchos::send not implemented for Serial - Serial is just for debugging
1602 Teuchos::send(*comm_, 1, &sizes_to[i], procs_to[i]);
1603#endif
1604 }
1605 else {
1606 self_index_to = i;
1607 }
1608 }
1609
1610 *total_recv_size = 0;
1611
1612 for (int i = 0; i < nrecvs + self_msg; i++) {
1613 if (procs_from[i] != my_proc) {
1614#ifdef HAVE_MPI // Teuchos::receive not implemented for Serial - Serial is just for debugging
1615 Teuchos::receive(*comm_, procs_from[i], 1, &sizes_from[i]);
1616#endif
1617 }
1618 else {
1619 sizes_from[i] = sizes_to[self_index_to];
1620 }
1621 *total_recv_size += sizes_from[i];
1622 }
1623 return 0;
1624}
1625
1626} // end namespace Zoltan2