Tpetra parallel linear algebra Version of the Day
Loading...
Searching...
No Matches
Tpetra_Details_ReadTriples.hpp
Go to the documentation of this file.
1// @HEADER
2// *****************************************************************************
3// Tpetra: Templated Linear Algebra Services Package
4//
5// Copyright 2008 NTESS and the Tpetra contributors.
6// SPDX-License-Identifier: BSD-3-Clause
7// *****************************************************************************
8// @HEADER
9
10#ifndef TPETRA_DETAILS_READTRIPLES_HPP
11#define TPETRA_DETAILS_READTRIPLES_HPP
12
21
22#include "TpetraCore_config.h"
23#include "Tpetra_Details_PackTriples.hpp"
24#if KOKKOS_VERSION >= 40799
25#include "KokkosKernels_ArithTraits.hpp"
26#else
27#include "Kokkos_ArithTraits.hpp"
28#endif
29#include "Teuchos_MatrixMarket_generic.hpp"
30#include "Teuchos_CommHelpers.hpp"
31#include <iostream>
32#include <typeinfo> // for debugging
33
34namespace Tpetra {
35namespace Details {
36
37//
38// Search for "SKIP DOWN TO HERE" (omit quotes) for the "public"
39// interface. I put "public" in quotes because it's public only for
40// Tpetra developers, NOT for Tpetra users.
41//
42
43namespace Impl {
44
45// mfh 01 Feb 2017: Unfortunately,
46// Teuchos::MatrixMarket::readComplexData requires Teuchos to have
47// complex arithmetic support enabled. To avoid this issue, I
48// reimplement the function here. It's not very long.
49
// Read "<rowIndex> <colIndex> <realPart> <imagPart>" from a line of input.
//
// NOTE(review): this listing omits several original lines of the function
// (the parameters between istr and lineNumber, and the statements that
// perform the reads), so the notes here cover only the visible error
// handling.
//
// Every visible failure path behaves the same way: if tolerant is true the
// function returns false; otherwise it throws std::invalid_argument with a
// message that includes lineNumber. If no failure path is taken, the
// function returns true.
82template <class OrdinalType, class RealType>
83bool readComplexData(std::istream& istr,
88 const std::size_t lineNumber,
89 const bool tolerant) {
90 using ::Teuchos::MatrixMarket::readRealData;
91
// The condition guarding this first failure branch is on a line that this
// listing omits; per the message, it covers the indices and the real value.
94 if (tolerant) {
95 return false;
96 } else {
97 std::ostringstream os;
98 os << "Failed to read pattern data and/or real value from line "
99 << lineNumber << " of input";
100 throw std::invalid_argument(os.str());
101 }
102 }
// The input ended right after the real value, so there is no imaginary part.
103 if (istr.eof()) {
104 if (tolerant) {
105 return false;
106 } else {
107 std::ostringstream os;
108 os << "No more data after real value on line "
109 << lineNumber << " of input";
110 throw std::invalid_argument(os.str());
111 }
112 }
// Per the message, this checks the read of the imaginary value (the read
// itself is on an omitted line).
114 if (istr.fail()) {
115 if (tolerant) {
116 return false;
117 } else {
118 std::ostringstream os;
119 os << "Failed to get imaginary value from line "
120 << lineNumber << " of input";
121 throw std::invalid_argument(os.str());
122 }
123 }
126 return true;
127}
128
// Implementation of the stand-alone readLine function, selected by whether
// the scalar type SC is complex.
//
// isComplex defaults to ArithTraits<SC>::is_complex, taken from
// KokkosKernels when KOKKOS_VERSION >= 40799 and from Kokkos otherwise.
// This primary template only declares readLine; the bodies are in the
// ReadLine<SC, GO, true> and ReadLine<SC, GO, false> partial
// specializations.
138template <class SC,
139 class GO,
140#if KOKKOS_VERSION >= 40799
141 const bool isComplex = ::KokkosKernels::ArithTraits<SC>::is_complex>
142#else
143 const bool isComplex = ::Kokkos::ArithTraits<SC>::is_complex>
144#endif
145struct ReadLine {
// Take one line of Matrix Market input and process the sparse matrix entry
// on it.
//   processTriple: closure called with (row index, column index, value);
//                  returns an int error code.
//   line:          the text of the line.
//   lineNumber:    the line's number in the input.
//   tolerant:      defaults to false.
//   errStrm:       optional stream for messages; defaults to NULL.
//   debug:         defaults to false.
166 static int
167 readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
168 const std::string& line,
169 const std::size_t lineNumber,
170 const bool tolerant = false,
171 std::ostream* errStrm = NULL,
172 const bool debug = false);
173};
174
// Partial specialization of ReadLine for the isComplex == true case.
//
// NOTE(review): this listing omits the line(s) that declare the real and
// imaginary parts, the call inside the try block, and the right-hand side of
// the errCode initialization. Judging by the surviving comment and error
// message, the try block calls the readComplexData defined in this file and
// errCode comes from processTriple -- confirm against the full source.
182template <class SC, class GO>
183struct ReadLine<SC, GO, true> {
// Parse one line and pass its entry to processTriple.
// Returns -1 if the parsing step threw a std::exception (the message goes to
// errStrm when errStrm is non-null); otherwise returns errCode, which is
// also reported to errStrm when it is nonzero.
204 static int
205 readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
206 const std::string& line,
207 const std::size_t lineNumber,
208 const bool tolerant = false,
209 std::ostream* errStrm = NULL,
210 const bool debug = false) {
211 using ::Teuchos::MatrixMarket::checkCommentLine;
// real_type is the magnitude type of SC.
212#if KOKKOS_VERSION >= 40799
213 typedef typename ::KokkosKernels::ArithTraits<SC>::mag_type real_type;
214#else
215 typedef typename ::Kokkos::ArithTraits<SC>::mag_type real_type;
216#endif
217 using std::endl;
218
219 GO rowInd, colInd;
221 std::istringstream istr(line);
222 bool success = true;
// Any std::exception from the parsing step is turned into success == false
// instead of propagating to the caller.
223 try {
224 // Use the version of this function in this file, not the
225 // version in Teuchos_MatrixMarket_generic.hpp, because the
226 // latter only exists if HAVE_TEUCHOS_COMPLEX is defined.
229 } catch (std::exception& e) {
230 success = false;
231 if (errStrm != NULL) {
232 std::ostringstream os;
233 os << "readLine: readComplexData threw an exception: " << e.what()
234 << endl;
235 *errStrm << os.str();
236 }
237 }
238
239 if (success) {
240 // if (debug && errStrm != NULL) {
241 // std::ostringstream os;
242 // os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
243 // << ", realPart=" << realPart << ", imagPart=" << imagPart
244 // << std::endl;
245 // *errStrm << os.str ();
246 // }
247 // This line may have side effects.
248 const int errCode =
250 if (errCode != 0 && errStrm != NULL) {
251 std::ostringstream os;
252 os << "readLine: processTriple returned " << errCode << " != 0."
253 << endl;
254 *errStrm << os.str();
255 }
256 return errCode;
257 } else {
258 return -1;
259 }
260 }
261};
262
// Partial specialization of ReadLine for the isComplex == false case.
//
// NOTE(review): this listing omits the statement inside the try block.
// Judging by the using-declaration and the error message, it calls
// Teuchos::MatrixMarket::readRealData to fill rowInd, colInd, and val --
// confirm against the full source.
270template <class SC, class GO>
271struct ReadLine<SC, GO, false> {
// Parse one line and pass its entry to processTriple.
// Returns -1 if the parsing step threw a std::exception (the message goes to
// errStrm when errStrm is non-null); otherwise returns the value returned by
// processTriple(rowInd, colInd, val), which is also reported to errStrm when
// it is nonzero.
292 static int
293 readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
294 const std::string& line,
295 const std::size_t lineNumber,
296 const bool tolerant = false,
297 std::ostream* errStrm = NULL,
298 const bool debug = false) {
299 using std::endl;
300 using ::Teuchos::MatrixMarket::checkCommentLine;
301 using ::Teuchos::MatrixMarket::readRealData;
302
303 GO rowInd, colInd;
304 SC val;
305 std::istringstream istr(line);
306 bool success = true;
// Any std::exception from the parsing step is turned into success == false
// instead of propagating to the caller.
307 try {
310 } catch (std::exception& e) {
311 success = false;
312 if (errStrm != NULL) {
313 std::ostringstream os;
314 os << "readLine: readRealData threw an exception: " << e.what()
315 << endl;
316 *errStrm << os.str();
317 }
318 }
319
320 if (success) {
// In debug mode, echo the parsed entry before handing it to the closure.
321 if (debug && errStrm != NULL) {
322 std::ostringstream os;
323 os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
324 << ", val=" << val << std::endl;
325 *errStrm << os.str();
326 }
327 // This line may have side effects.
328 const int errCode = processTriple(rowInd, colInd, val);
329 if (errCode != 0 && errStrm != NULL) {
330 std::ostringstream os;
331 os << "readLine: processTriple returned " << errCode << " != 0."
332 << endl;
333 *errStrm << os.str();
334 }
335 return errCode;
336 } else {
337 return -1;
338 }
339 }
340};
341
// Take a line from the Matrix Market file or input stream, and process the
// sparse matrix entry on it.
//
// NOTE(review): the first line of the body is omitted from this listing; only
// the tail of the forwarded argument list (tolerant, errStrm, debug) is
// visible. Presumably this forwards to ReadLine<SC, GO>::readLine -- confirm
// against the full source.
367template <class SC, class GO>
368int readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
369 const std::string& line,
370 const std::size_t lineNumber,
371 const bool tolerant = false,
372 std::ostream* errStrm = NULL,
373 const bool debug = false) {
375 tolerant, errStrm, debug);
376}
377
// Read at most maxNumTriplesToRead triples from the given Matrix Market
// input stream.
//
// NOTE(review): this listing omits several original lines, including the
// loop header, the comment-line test, the call that parses a line, and the
// updates of errCode and numTriplesRead. The notes here cover only what the
// visible lines establish.
//
// - numTriplesRead is reset to 0 on entry (it is output-only).
// - Returns 0 at once if inputStream is already at end-of-file, and -1 if it
//   is already in a failed state.
// - Comment lines are skipped. For any other line, line.substr(start, size)
//   is the text that gets parsed.
// - The line numbers of entries whose parse returned nonzero are collected
//   in badLineNumbers and, when errCode != 0, listed on errStrm (if
//   non-null).
// - If the stream ends up failed without being at end-of-file, a zero
//   errCode is replaced by -1.
// Returns errCode.
408template <class SC, class GO>
409int readTriples(std::istream& inputStream,
410 std::size_t& curLineNum,
411 std::size_t& numTriplesRead,
412 std::function<int(const GO, const GO, const SC&)> processTriple,
413 const std::size_t maxNumTriplesToRead,
414 const bool tolerant = false,
415 std::ostream* errStrm = NULL,
416 const bool debug = false) {
417 using std::endl;
418 using std::size_t;
419 using Teuchos::MatrixMarket::checkCommentLine;
420
421 numTriplesRead = 0; // output argument only
422 if (inputStream.eof()) {
423 return 0; // no error, just nothing left to read
424 } else if (inputStream.fail()) {
425 if (errStrm != NULL) {
426 *errStrm << "Input stream reports a failure (not the same as "
427 "end-of-file)."
428 << endl;
429 }
430 return -1;
431 }
432
433 std::string line;
434 std::vector<size_t> badLineNumbers;
435 int errCode = 0; // 0 means success
436
// Prime the loop with the first line. good() is false once the stream hits
// end-of-file or an error, even if this getline still produced text.
437 bool inputStreamCanStillBeRead = std::getline(inputStream, line).good();
438 ++curLineNum; // we read the line; we can't put it back
440 // if (debug && errStrm != NULL) {
441 // std::ostringstream os;
442 // os << "readTriples: Got line: \"" << line << "\"" << std::endl;
443 // *errStrm << os.str ();
444 // }
// start and size are used below as the bounds of the substring to parse;
// the call that sets them is on an omitted line.
445 size_t start, size;
446
447 const bool isCommentLine =
449 if (isCommentLine) {
450 // Move on to the next line, if there is a next line.
451 inputStreamCanStillBeRead = std::getline(inputStream, line).good();
452 ++curLineNum; // we read another line; we can't put it back
453 continue; // move on to the next line
454 } else { // not a comment line; should have a sparse matrix entry
455 const std::string theLine = line.substr(start, size);
456 // If the line has a valid sparse matrix entry, extract it and
457 // hand it off to the processTriple closure.
458 const int curErrCode =
460 if (curErrCode != 0) {
462 badLineNumbers.push_back(curLineNum);
463 } else {
465 }
467 inputStreamCanStillBeRead = std::getline(inputStream, line).good();
468 }
469 }
470 } // while there are lines to read and we need more triples
471
// Summarize every bad line in a single message.
472 if (errCode != 0 && errStrm != NULL) {
473 const size_t numBadLines = badLineNumbers.size();
474 *errStrm << "Encountered " << numBadLines << " bad line"
475 << (numBadLines != size_t(1) ? "s" : "")
476 << ": [";
477 for (size_t k = 0; k < numBadLines; ++k) {
479 if (k + 1 < numBadLines) {
480 *errStrm << ", ";
481 }
482 }
483 *errStrm << "]" << endl;
484 }
// Hitting end-of-file is not an error; a failed stream that is not at
// end-of-file is.
485 if (!inputStream.eof() && inputStream.fail()) {
486 if (errCode == 0) {
487 errCode = -1;
488 }
489 if (errStrm != NULL) {
490 *errStrm << "The input stream is not at end-of-file, "
491 "but is in a bad state."
492 << endl;
493 }
494 }
495 return errCode;
496}
497
// Read one batch of at most maxNumEntPerMsg sparse matrix entries from the
// input stream and send it to the process with rank destRank.
//
// NOTE(review): this listing omits several original lines, including the
// line holding the function name and its first parameter, the readTriples
// call, the count/pack calls, and the two isend calls that create sizeReq
// and msgReq. The notes here cover only what the visible lines establish.
//
// - rowInds, colInds, and vals are emptied on entry and filled through the
//   processTriple closure defined below; numEntRead is reset to 0 before
//   reading.
// - sizeBuf[0] carries the message size in bytes to destRank under tag
//   sizeTag (42). A size of 0 is sent when no entries were read or when any
//   step reported an error; the function then returns right after that send.
// - Otherwise msgBuf is resized to countSize + triplesSize, the function
//   waits on sizeReq and then msgReq, and the three vectors are cleared.
// Returns errCode, which starts at 0 and is set to -1 by the visible
// consistency checks when they fail.
523template <class SC, class GO>
525 std::size_t& curLineNum,
526 std::size_t& numEntRead,
527 ::Teuchos::ArrayRCP<int>& sizeBuf,
528 ::Teuchos::ArrayRCP<char>& msgBuf,
529 std::vector<GO>& rowInds,
530 std::vector<GO>& colInds,
531 std::vector<SC>& vals,
532 const std::size_t maxNumEntPerMsg,
533 const int destRank,
534 const ::Teuchos::Comm<int>& comm,
535 const bool tolerant = false,
536 std::ostream* errStrm = NULL,
537 const bool debug = false) {
538 using std::endl;
539 using ::Teuchos::isend;
540 using ::Tpetra::Details::countPackTriples;
541 using ::Tpetra::Details::countPackTriplesCount;
542 using ::Tpetra::Details::packTriples;
543 using ::Tpetra::Details::packTriplesCount;
544
545#if KOKKOS_VERSION >= 40799
546 using ::KokkosKernels::ArithTraits;
547#else
548 using ::Kokkos::ArithTraits;
549#endif
// The tags are the same for every SC; the complex-dependent variants are
// kept only as comments.
550 // constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
551 // constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
552 constexpr int sizeTag = 42;
553 constexpr int msgTag = 43;
554 // constexpr int srcRank = 0;
555 int errCode = 0;
556
557 // This doesn't actually deallocate memory; it just changes the size
558 // back to zero, so that push_back starts over from the beginning.
559 rowInds.resize(0);
560 colInds.resize(0);
561 vals.resize(0);
562 // Closure that adds the new matrix entry to the above temp arrays.
// It returns -1 instead of letting a push_back exception escape.
563 auto processTriple = [&rowInds, &colInds, &vals](const GO rowInd, const GO colInd, const SC& val) {
564 try {
565 rowInds.push_back(rowInd);
566 colInds.push_back(colInd);
567 vals.push_back(val);
568 } catch (...) {
569 return -1;
570 }
571 return 0;
572 };
573 numEntRead = 0; // output argument
576 errStrm, debug);
577 if (debug && errStrm != NULL) {
578 std::ostringstream os;
579 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
580 << ", GO=" << typeid(GO).name() << ": "
581 << "readAndSendOneBatchOfTriples: readTriples read "
582 << numEntRead << " matrix entries, and returned errCode="
583 << errCode << "." << std::endl;
584 *errStrm << os.str();
585 }
// The reported count must match what the closure actually stored.
586 if (numEntRead != rowInds.size() ||
587 numEntRead != colInds.size() ||
588 numEntRead != vals.size()) {
589 if (errStrm != NULL) {
590 *errStrm << "readTriples size results are not consistent. "
591 << "numEntRead = " << numEntRead
592 << ", rowInds.size() = " << rowInds.size()
593 << ", colInds.size() = " << colInds.size()
594 << ", and vals.size() = " << vals.size() << "."
595 << std::endl;
596 }
597 if (errCode == 0) {
598 errCode = -1;
599 }
600 }
601
602 // We don't consider reading having "failed" if we've reached
603 // end-of-file before reading maxNumEntPerMsg entries. It's OK if
604 // we got fewer triples than that. Furthermore, we have to send at
605 // least one message to the destination process, even if the read
606 // from the file failed.
607
608 if (numEntRead == 0 || errCode != 0) {
609 // Send a message size of zero to the receiving process, to tell
610 // it that we have no triples to send, or that there was an error
611 // reading. The latter just means that we "go through the
612 // motions," then broadcast the error code.
613 sizeBuf[0] = 0;
614 if (debug && errStrm != NULL) {
615 std::ostringstream os;
616 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
617 << ", GO=" << typeid(GO).name() << ": "
618 << "Post send (size=0, errCode=" << errCode << ") "
619 << "to " << destRank << " with tag " << sizeTag << endl;
620 *errStrm << os.str();
621 }
622 send(sizeBuf.getRawPtr(), 1, destRank, sizeTag, comm);
623 return errCode;
624 } else { // we read a nonzero # of triples, without error
// NOTE(review): this narrows std::size_t to int; whether maxNumEntPerMsg is
// guaranteed to fit in an int cannot be seen from here.
625 const int numEnt = static_cast<int>(numEntRead);
626 int countSize = 0; // output argument
627 int triplesSize = 0; // output argument
628
630 // countSize should never be nonpositive, since we have to pack an
631 // integer size.
632 if (countSize <= 0 && errCode == 0) {
633 errCode = -1;
634 }
635
636 if (errCode != 0) {
637 // Send zero to the receiving process, to tell it about the error.
638 sizeBuf[0] = 0;
639 if (debug && errStrm != NULL) {
640 std::ostringstream os;
641 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
642 << ", GO=" << typeid(GO).name() << ": "
643 << "Post send (size=0, error case) to " << destRank
644 << " with tag " << sizeTag << endl;
645 *errStrm << os.str();
646 }
647 send(sizeBuf.getRawPtr(), 1, destRank, sizeTag, comm);
648 return errCode;
649 } else { // countPackTriplesCount succeeded
651 if (errCode != 0) {
652 // Send a message size of zero to the receiving process, to
653 // tell it that there was an error counting.
654 sizeBuf[0] = 0;
655 if (debug && errStrm != NULL) {
656 std::ostringstream os;
657 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
658 << ", GO=" << typeid(GO).name() << ": "
659 << "Post send (size=0, error case) to " << destRank
660 << " with tag " << sizeTag << endl;
661 *errStrm << os.str();
662 }
663 send(sizeBuf.getRawPtr(), 1, destRank, sizeTag, comm);
664 return errCode;
665 } else { // countPackTriples succeeded; message packed & ready to send
666 // Send the message size (in bytes). We can use a nonblocking
667 // send here, and try to overlap with message packing.
668 const int outBufSize = countSize + triplesSize;
669 sizeBuf[0] = outBufSize;
670 if (debug && errStrm != NULL) {
671 std::ostringstream os;
672 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
673 << ", GO=" << typeid(GO).name() << ": "
674 << "Post isend (size=" << sizeBuf[0] << ") to " << destRank
675 << " with tag " << sizeTag << endl;
676 *errStrm << os.str();
677 }
679
680 msgBuf.resize(outBufSize);
681 char* outBuf = msgBuf.getRawPtr();
682
683 // If anything goes wrong with packing, send the pack buffer
684 // anyway, since the receiving process expects a message.
685 int outBufCurPos = 0; // input/output argument
687 outBufCurPos, comm, errStrm);
688 if (errCode == 0) {
690 vals.data(), numEnt, outBuf,
692 errStrm);
693 }
694 if (debug && errStrm != NULL) {
695 std::ostringstream os;
696 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
697 << ", GO=" << typeid(GO).name() << ": "
698 << "Post isend (packed data) to " << destRank
699 << " with tag " << msgTag << endl;
700 *errStrm << os.str();
701 }
703
704 // Wait on the two messages. It doesn't matter in what order
705 // we send them, because they have different tags. The
706 // receiving process will wait on the first message first, in
707 // order to get the size of the second message.
708 if (debug && errStrm != NULL) {
709 std::ostringstream os;
710 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
711 << ", GO=" << typeid(GO).name() << ": "
712 << "Wait on isend (size)" << endl;
713 *errStrm << os.str();
714 }
715 sizeReq->wait();
716 if (debug && errStrm != NULL) {
717 std::ostringstream os;
718 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
719 << ", GO=" << typeid(GO).name() << ": "
720 << "Wait on isend (packed data)" << endl;
721 *errStrm << os.str();
722 }
723 msgReq->wait();
724
725 // This doesn't actually deallocate; it just resets sizes to zero.
726 rowInds.clear();
727 colInds.clear();
728 vals.clear();
729 }
730 }
731 }
732 return errCode;
733}
734
742
// Receive one batch of sparse matrix entries from the process with rank
// srcRank and unpack it into rowInds, colInds, and vals.
//
// NOTE(review): this listing omits some original lines, including the
// sizeReq parameter and the two unpack calls. The notes here cover only what
// the visible lines establish.
//
// - Waits on sizeReq, the size receive that the caller posted before calling
//   this function, then reads the incoming size from sizeBuf[0].
// - Size 0: numEnt is set to 0 and the three vectors are emptied.
// - Otherwise: msgBuf is resized to that size, a receive from srcRank with
//   tag msgTag (43) is posted and waited on, and, if errCode is still 0 after
//   the count is unpacked, the three vectors are resized to numEnt.
// Returns errCode, which starts at 0.
770template <class SC, class GO, class CommRequestPtr>
771int recvOneBatchOfTriples(std::vector<GO>& rowInds,
772 std::vector<GO>& colInds,
773 std::vector<SC>& vals,
774 int& numEnt,
775 ::Teuchos::ArrayRCP<int>& sizeBuf,
776 ::Teuchos::ArrayRCP<char>& msgBuf,
778 const int srcRank,
779 const ::Teuchos::Comm<int>& comm,
780 const bool tolerant = false,
781 std::ostream* errStrm = NULL,
782 const bool debug = false) {
783#if KOKKOS_VERSION >= 40799
784 using ::KokkosKernels::ArithTraits;
785#else
786 using ::Kokkos::ArithTraits;
787#endif
788 using ::Tpetra::Details::unpackTriples;
789 using ::Tpetra::Details::unpackTriplesCount;
790
792 // constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
793 // constexpr int sizeTag = 42;
794 constexpr int msgTag = 43;
795 int errCode = 0; // return value
796 numEnt = 0; // output argument
797
798 // Wait on the ireceive we preposted before calling this function.
799 if (debug && errStrm != NULL) {
800 std::ostringstream os;
801 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
802 << ", GO=" << typeid(GO).name() << ": "
803 << "Wait on irecv (size)" << std::endl;
804 *errStrm << os.str();
805 }
806 sizeReq->wait();
808 const int inBufSize = sizeBuf[0];
809 if (debug && errStrm != NULL) {
810 std::ostringstream os;
811 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
812 << ", GO=" << typeid(GO).name() << ": "
813 << "Received size: sizeBuf[0]=" << sizeBuf[0] << std::endl;
814 *errStrm << os.str();
815 }
816
// A zero size means there is no packed-data message to receive.
817 if (inBufSize == 0) {
818 numEnt = 0;
819 rowInds.resize(0);
820 colInds.resize(0);
821 vals.resize(0);
822 } else {
823 msgBuf.resize(inBufSize);
824 char* inBuf = msgBuf.getRawPtr();
825
826 if (debug && errStrm != NULL) {
827 std::ostringstream os;
828 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
829 << ", GO=" << typeid(GO).name() << ": "
830 << "Post irecv (packed data) "
831 << "from " << srcRank
832 << " with tag " << msgTag << std::endl;
833 *errStrm << os.str();
834 }
835 auto msgReq = ::Teuchos::ireceive(msgBuf, srcRank, msgTag, comm);
836 if (debug && errStrm != NULL) {
837 std::ostringstream os;
838 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
839 << ", GO=" << typeid(GO).name() << ": "
840 << "Wait on irecv (packed data)" << std::endl;
841 *errStrm << os.str();
842 }
843 msgReq->wait();
844
845 int inBufCurPos = 0; // output argument
847 numEnt, comm, errStrm);
// Size the output vectors only once the count has been unpacked without
// error.
848 if (errCode == 0) {
849 rowInds.resize(numEnt);
850 colInds.resize(numEnt);
851 vals.resize(numEnt);
853 rowInds.data(), colInds.data(),
854 vals.data(), numEnt, comm, errStrm);
855 }
856 }
857 return errCode;
858}
859
860} // namespace Impl
861
862//
863// SKIP DOWN TO HERE FOR "PUBLIC" INTERFACE
864//
865
// On the process with rank srcRank (0), read sparse matrix entries from
// inputStream in batches and deal the batches out round-robin to the
// processes in comm, itself included. On every other process, receive
// batches from srcRank and pass each entry to processTriple.
//
// NOTE(review): this listing omits several original lines, including parts
// of the argument lists of the Impl::readTriples and
// Impl::readAndSendOneBatchOfTriples calls, the statements that fold the
// per-call error codes into errCode, the update of totalNumEntRead, and the
// final reduceAll call. The notes here cover only what the visible lines
// establish.
//
// - inputStream, curLineNum, and totalNumEntRead are only valid on Process 0.
// - maxNumEntPerMsg is used to reserve capacity in the temporary vectors.
// - A size message of 0 (tag sizeTag = 42) tells a receiving process that
//   srcRank is done; a receiver leaves its loop when numEnt == 0.
// - The local error codes are combined across processes (see the comment
//   before the end of the function) and the result is returned.
899template <class SC, class GO>
900int readAndDealOutTriples(std::istream& inputStream, // only valid on Proc 0
901 std::size_t& curLineNum, // only valid on Proc 0
902 std::size_t& totalNumEntRead, // only valid on Proc 0
903 std::function<int(const GO, const GO, const SC&)> processTriple,
904 const std::size_t maxNumEntPerMsg,
905 const ::Teuchos::Comm<int>& comm,
906 const bool tolerant = false,
907 std::ostream* errStrm = NULL,
908 const bool debug = false) {
909#if KOKKOS_VERSION >= 40799
910 using KokkosKernels::ArithTraits;
911#else
912 using Kokkos::ArithTraits;
913#endif
914 using std::endl;
915 using std::size_t;
916
917 constexpr int srcRank = 0;
918 // constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
920 constexpr int sizeTag = 42;
921 // constexpr int msgTag = 43;
922 const int myRank = comm.getRank();
923 const int numProcs = comm.getSize();
924 int errCode = 0;
925
// One-element buffer for the message size.
926 ::Teuchos::ArrayRCP<int> sizeBuf(1);
927 ::Teuchos::ArrayRCP<char> msgBuf; // to be resized as needed
928
929 // Temporary storage for reading & packing (on Process srcRank) or
930 // unpacking (every other process) triples.
931 std::vector<GO> rowInds;
932 std::vector<GO> colInds;
933 std::vector<SC> vals;
934 rowInds.reserve(maxNumEntPerMsg);
935 colInds.reserve(maxNumEntPerMsg);
936 vals.reserve(maxNumEntPerMsg);
937
938 totalNumEntRead = 0;
939 if (myRank == srcRank) {
940 // Loop around through all the processes, including this one, over
941 // and over until we reach the end of the file, or an error occurs.
942 int destRank = 0;
943 bool lastMessageWasLegitZero = false;
944 for (;
945 !inputStream.eof() && errCode == 0;
946 destRank = (destRank + 1) % numProcs) {
947 size_t curNumEntRead = 0; // output argument of below
948 if (destRank == srcRank) {
949 // We can read and process the triples directly. We don't
950 // need to use intermediate storage, because we don't need to
951 // pack and send the triples.
952 const int readErrCode =
953 Impl::readTriples<SC, GO>(inputStream, curLineNum, curNumEntRead,
955 errStrm, debug);
956 if (debug && errStrm != NULL) {
957 std::ostringstream os;
958 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
959 << ", GO=" << typeid(GO).name() << ": "
960 << "(dest=src) readTriples returned curNumEntRead="
961 << curNumEntRead << ", errCode=" << readErrCode << endl;
962 *errStrm << os.str();
963 }
965 } else {
// This debug output is disabled by the leading `false &&`.
966 if (false && debug && errStrm != NULL) {
967 std::ostringstream os;
968 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
969 << ", GO=" << typeid(GO).name() << ": "
970 << "Calling readAndSend... with destRank=" << destRank << endl;
971 *errStrm << os.str();
972 }
973 // Read, pack, and send the triples to destRank.
974 const int readAndSendErrCode =
975 Impl::readAndSendOneBatchOfTriples<SC, GO>(inputStream, curLineNum,
980 comm, tolerant, errStrm,
981 debug);
983 if (debug && errStrm != NULL) {
984 std::ostringstream os;
985 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
986 << ", GO=" << typeid(GO).name() << ": "
987 << "readAndSend... with destRank=" << destRank
988 << " returned curNumEntRead=" << curNumEntRead
989 << ", errCode=" << readAndSendErrCode << endl;
990 *errStrm << os.str();
991 }
// No error and nothing read: per the debug message below, the zero-size
// message just sent to destRank already counts as its termination message.
// The statement that records this is on an omitted line; presumably it sets
// lastMessageWasLegitZero.
993 if (readAndSendErrCode == 0 && curNumEntRead == 0) {
995 if (debug && errStrm != NULL) {
996 std::ostringstream os;
997 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
998 << ", GO=" << typeid(GO).name() << ": "
999 << "Last send to " << destRank << " with tag " << sizeTag
1000 << " was legit zero, counts as termination" << endl;
1001 *errStrm << os.str();
1002 }
1003 }
1004 }
1005 } // loop around through processes until done reading file, or error
1006
1007 // Loop around through the remaining processes, and tell them that
1008 // we're done, by sending zero. If the last message we sent to
1009 // destRank was zero, then skip that process, since it only
1010 // expects one message of size zero. Note that destRank got
1011 // incremented mod numProcs at end of loop, so we have to
1012 // decrement it mod numProcs.
1013 destRank = (destRank - 1) % numProcs;
1014 if (destRank < 0) { // C mod operator does not promise positivity
1016 }
1017
// NOTE(review): the loop below only visits ranks >= startRank. Ranks below
// startRank that already received data are still in their receive loop,
// which exits only on a zero-size message -- confirm that they are
// terminated somewhere, otherwise they would wait forever.
1018 const int startRank = lastMessageWasLegitZero ? (destRank + 1) : destRank;
1019 for (int outRank = startRank; outRank < numProcs; ++outRank) {
1020 if (outRank != srcRank) {
1021 if (debug && errStrm != NULL) {
1022 std::ostringstream os;
1023 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1024 << ", GO=" << typeid(GO).name() << ": "
1025 << "Post send (size, termination msg) to " << outRank
1026 << " with tag " << sizeTag << "(was last message legit zero? "
1027 << (lastMessageWasLegitZero ? "true" : "false") << ")" << endl;
1028 *errStrm << os.str();
1029 }
1030 sizeBuf[0] = 0;
1031 ::Teuchos::send(sizeBuf.getRawPtr(), 1, outRank, sizeTag, comm);
1032 }
1033 }
1034 } else {
1035 while (true) {
1036 // Prepost a message to receive the size (in bytes) of the
1037 // incoming packet.
1038 sizeBuf[0] = 0; // superfluous, but safe
1039 if (debug && errStrm != NULL) {
1040 std::ostringstream os;
1041 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1042 << ", GO=" << typeid(GO).name() << ": "
1043 << "Post irecv (size) from " << srcRank
1044 << " with tag " << sizeTag << std::endl;
1045 *errStrm << os.str();
1046 }
1047 auto sizeReq = ::Teuchos::ireceive(sizeBuf, srcRank, sizeTag, comm);
1048
1049 int numEnt = 0; // output argument
1050 const int recvErrCode =
1051 Impl::recvOneBatchOfTriples(rowInds, colInds, vals, numEnt, sizeBuf,
1052 msgBuf, sizeReq, srcRank, comm, tolerant,
1053 errStrm, debug);
1054 if (debug && errStrm != NULL) {
1055 std::ostringstream os;
1056 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1057 << ", GO=" << typeid(GO).name() << ": "
1058 << "recvOneBatchOfTriples returned numEnt=" << numEnt
1059 << ", errCode=" << recvErrCode << endl;
1060 *errStrm << os.str();
1061 }
1063
// The count reported by the receive must match the unpacked vector sizes.
1064 if (numEnt != static_cast<int>(rowInds.size()) ||
1065 numEnt != static_cast<int>(colInds.size()) ||
1066 numEnt != static_cast<int>(vals.size())) {
1067 errCode = (errCode == 0) ? -1 : errCode;
1068 if (errStrm != NULL) {
1069 *errStrm << "recvOneBatchOfTriples produced inconsistent data sizes. "
1070 << "numEnt = " << numEnt
1071 << ", rowInds.size() = " << rowInds.size()
1072 << ", colInds.size() = " << colInds.size()
1073 << ", vals.size() = " << vals.size() << "."
1074 << endl;
1075 }
1076 } // if sizes inconsistent
1077
1078 // Sending zero items is how Process srcRank tells this process
1079 // that it (Process srcRank) is done sending out data.
1080 if (numEnt == 0) {
1081 break;
1082 }
1083
// Once errCode is nonzero no further entries are processed, but the
// enclosing loop keeps receiving until the zero-size message arrives.
1084 for (int k = 0; k < numEnt && errCode == 0; ++k) {
1085 const int curErrCode = processTriple(rowInds[k], colInds[k], vals[k]);
1086 errCode = (curErrCode == 0) ? errCode : curErrCode;
1087 }
1088 } // while we still get messages from srcRank
1089 }
1090
1091 if (debug && errStrm != NULL) {
1092 std::ostringstream os;
1093 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1094 << ", GO=" << typeid(GO).name() << ": "
1095 << "Done with send/recv loop" << endl;
1096 *errStrm << os.str();
1097 }
1098 // Do a bitwise OR to get an error code that is nonzero if and only
1099 // if any process' local error code is nonzero.
1100 using ::Teuchos::outArg;
1101 using ::Teuchos::REDUCE_BOR;
1102 using ::Teuchos::reduceAll;
1103 const int lclErrCode = errCode;
1105 return errCode;
1106}
1107
1108} // namespace Details
1109} // namespace Tpetra
1110
1111#endif // TPETRA_DETAILS_READTRIPLES_HPP
bool readComplexData(std::istream &istr, OrdinalType &rowIndex, OrdinalType &colIndex, RealType &realPart, RealType &imagPart, const std::size_t lineNumber, const bool tolerant)
Read "<rowIndex> <colIndex> <realPart> <imagPart>" from a line.
int readTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &numTriplesRead, std::function< int(const GO, const GO, const SC &)> processTriple, const std::size_t maxNumTriplesToRead, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Read at most maxNumTriplesToRead triples from the given Matrix Market input stream, and pass along any resulting matrix entries to the processTriple closure.
int recvOneBatchOfTriples(std::vector< GO > &rowInds, std::vector< GO > &colInds, std::vector< SC > &vals, int &numEnt, ::Teuchos::ArrayRCP< int > &sizeBuf, ::Teuchos::ArrayRCP< char > &msgBuf, CommRequestPtr &sizeReq, const int srcRank, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Wait for the message size from the process with rank srcRank and, if it is nonzero, receive that batch of sparse matrix entries and unpack it into rowInds, colInds, and vals.
int readAndSendOneBatchOfTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &numEntRead, ::Teuchos::ArrayRCP< int > &sizeBuf, ::Teuchos::ArrayRCP< char > &msgBuf, std::vector< GO > &rowInds, std::vector< GO > &colInds, std::vector< SC > &vals, const std::size_t maxNumEntPerMsg, const int destRank, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Read at most maxNumEntPerMsg sparse matrix entries from the input stream, and send them to the process with rank destRank.
int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that line.
Struct that holds views of the contents of a CrsMatrix.
Implementation details of Tpetra.
int packTriplesCount(const int, char[], const int, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Pack the count (number) of matrix triples.
int countPackTriplesCount(const ::Teuchos::Comm< int > &, int &size, std::ostream *errStrm)
Compute the buffer size required by packTriples for packing the number of matrix entries ("triples").
int unpackTriplesCount(const char[], const int, int &, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Unpack just the count of triples from the given input buffer.
int readAndDealOutTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &totalNumEntRead, std::function< int(const GO, const GO, const SC &)> processTriple, const std::size_t maxNumEntPerMsg, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
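On Process 0 in the given communicator, read sparse matrix entries (in chunks of at most maxNumEntPerMsg entries at a time) from the input stream, and deal them out to the processes in the communicator, Process 0 included.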
Namespace Tpetra contains the class and methods constituting the Tpetra library.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that line.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that line.
Implementation of the readLine stand-alone function in this namespace (see below).
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that line.