Tpetra parallel linear algebra Version of the Day
Loading...
Searching...
No Matches
Tpetra_Details_ReadTriples.hpp
Go to the documentation of this file.
1// @HEADER
2// *****************************************************************************
3// Tpetra: Templated Linear Algebra Services Package
4//
5// Copyright 2008 NTESS and the Tpetra contributors.
6// SPDX-License-Identifier: BSD-3-Clause
7// *****************************************************************************
8// @HEADER
9
10#ifndef TPETRA_DETAILS_READTRIPLES_HPP
11#define TPETRA_DETAILS_READTRIPLES_HPP
12
21
22#include "TpetraCore_config.h"
23#include "Tpetra_Details_PackTriples.hpp"
24#include "KokkosKernels_ArithTraits.hpp"
25#include "Teuchos_MatrixMarket_generic.hpp"
26#include "Teuchos_CommHelpers.hpp"
27#include <iostream>
28#include <typeinfo> // for debugging
29
30namespace Tpetra {
31namespace Details {
32
33//
34// Search for "SKIP DOWN TO HERE" (omit quotes) for the "public"
35// interface. I put "public" in quotes because it's public only for
36// Tpetra developers, NOT for Tpetra users.
37//
38
39namespace Impl {
40
41// mfh 01 Feb 2017: Unfortunately,
42// Teuchos::MatrixMarket::readComplexData requires Teuchos to have
43// complex arithmetic support enabled. To avoid this issue, I
44// reimplement the function here. It's not very long.
45
78template <class OrdinalType, class RealType>
79bool readComplexData(std::istream& istr,
84 const std::size_t lineNumber,
85 const bool tolerant) {
86 using ::Teuchos::MatrixMarket::readRealData;
87
90 if (tolerant) {
91 return false;
92 } else {
93 std::ostringstream os;
94 os << "Failed to read pattern data and/or real value from line "
95 << lineNumber << " of input";
96 throw std::invalid_argument(os.str());
97 }
98 }
99 if (istr.eof()) {
100 if (tolerant) {
101 return false;
102 } else {
103 std::ostringstream os;
104 os << "No more data after real value on line "
105 << lineNumber << " of input";
106 throw std::invalid_argument(os.str());
107 }
108 }
110 if (istr.fail()) {
111 if (tolerant) {
112 return false;
113 } else {
114 std::ostringstream os;
115 os << "Failed to get imaginary value from line "
116 << lineNumber << " of input";
117 throw std::invalid_argument(os.str());
118 }
119 }
122 return true;
123}
124
134template <class SC,
135 class GO,
136 const bool isComplex = ::KokkosKernels::ArithTraits<SC>::is_complex>
137struct ReadLine {
158 static int
159 readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
160 const std::string& line,
161 const std::size_t lineNumber,
162 const bool tolerant = false,
163 std::ostream* errStrm = NULL,
164 const bool debug = false);
165};
166
174template <class SC, class GO>
175struct ReadLine<SC, GO, true> {
196 static int
197 readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
198 const std::string& line,
199 const std::size_t lineNumber,
200 const bool tolerant = false,
201 std::ostream* errStrm = NULL,
202 const bool debug = false) {
203 using ::Teuchos::MatrixMarket::checkCommentLine;
204 typedef typename ::KokkosKernels::ArithTraits<SC>::mag_type real_type;
205 using std::endl;
206
207 GO rowInd, colInd;
209 std::istringstream istr(line);
210 bool success = true;
211 try {
212 // Use the version of this function in this file, not the
213 // version in Teuchos_MatrixMarket_generic.hpp, because the
214 // latter only exists if HAVE_TEUCHOS_COMPLEX is defined.
217 } catch (std::exception& e) {
218 success = false;
219 if (errStrm != NULL) {
220 std::ostringstream os;
221 os << "readLine: readComplexData threw an exception: " << e.what()
222 << endl;
223 *errStrm << os.str();
224 }
225 }
226
227 if (success) {
228 // if (debug && errStrm != NULL) {
229 // std::ostringstream os;
230 // os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
231 // << ", realPart=" << realPart << ", imagPart=" << imagPart
232 // << std::endl;
233 // *errStrm << os.str ();
234 // }
235 // This line may have side effects.
236 const int errCode =
238 if (errCode != 0 && errStrm != NULL) {
239 std::ostringstream os;
240 os << "readLine: processTriple returned " << errCode << " != 0."
241 << endl;
242 *errStrm << os.str();
243 }
244 return errCode;
245 } else {
246 return -1;
247 }
248 }
249};
250
258template <class SC, class GO>
259struct ReadLine<SC, GO, false> {
280 static int
281 readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
282 const std::string& line,
283 const std::size_t lineNumber,
284 const bool tolerant = false,
285 std::ostream* errStrm = NULL,
286 const bool debug = false) {
287 using std::endl;
288 using ::Teuchos::MatrixMarket::checkCommentLine;
289 using ::Teuchos::MatrixMarket::readRealData;
290
291 GO rowInd, colInd;
292 SC val;
293 std::istringstream istr(line);
294 bool success = true;
295 try {
298 } catch (std::exception& e) {
299 success = false;
300 if (errStrm != NULL) {
301 std::ostringstream os;
302 os << "readLine: readRealData threw an exception: " << e.what()
303 << endl;
304 *errStrm << os.str();
305 }
306 }
307
308 if (success) {
309 if (debug && errStrm != NULL) {
310 std::ostringstream os;
311 os << "readLine: Got entry: row=" << rowInd << ", col=" << colInd
312 << ", val=" << val << std::endl;
313 *errStrm << os.str();
314 }
315 // This line may have side effects.
316 const int errCode = processTriple(rowInd, colInd, val);
317 if (errCode != 0 && errStrm != NULL) {
318 std::ostringstream os;
319 os << "readLine: processTriple returned " << errCode << " != 0."
320 << endl;
321 *errStrm << os.str();
322 }
323 return errCode;
324 } else {
325 return -1;
326 }
327 }
328};
329
355template <class SC, class GO>
356int readLine(std::function<int(const GO, const GO, const SC&)> processTriple,
357 const std::string& line,
358 const std::size_t lineNumber,
359 const bool tolerant = false,
360 std::ostream* errStrm = NULL,
361 const bool debug = false) {
363 tolerant, errStrm, debug);
364}
365
396template <class SC, class GO>
397int readTriples(std::istream& inputStream,
398 std::size_t& curLineNum,
399 std::size_t& numTriplesRead,
400 std::function<int(const GO, const GO, const SC&)> processTriple,
401 const std::size_t maxNumTriplesToRead,
402 const bool tolerant = false,
403 std::ostream* errStrm = NULL,
404 const bool debug = false) {
405 using std::endl;
406 using std::size_t;
407 using Teuchos::MatrixMarket::checkCommentLine;
408
409 numTriplesRead = 0; // output argument only
410 if (inputStream.eof()) {
411 return 0; // no error, just nothing left to read
412 } else if (inputStream.fail()) {
413 if (errStrm != NULL) {
414 *errStrm << "Input stream reports a failure (not the same as "
415 "end-of-file)."
416 << endl;
417 }
418 return -1;
419 }
420
421 std::string line;
422 std::vector<size_t> badLineNumbers;
423 int errCode = 0; // 0 means success
424
425 bool inputStreamCanStillBeRead = std::getline(inputStream, line).good();
426 ++curLineNum; // we read the line; we can't put it back
428 // if (debug && errStrm != NULL) {
429 // std::ostringstream os;
430 // os << "readTriples: Got line: \"" << line << "\"" << std::endl;
431 // *errStrm << os.str ();
432 // }
433 size_t start, size;
434
435 const bool isCommentLine =
437 if (isCommentLine) {
438 // Move on to the next line, if there is a next line.
439 inputStreamCanStillBeRead = std::getline(inputStream, line).good();
440 ++curLineNum; // we read another line; we can't put it back
441 continue; // move on to the next line
442 } else { // not a comment line; should have a sparse matrix entry
443 const std::string theLine = line.substr(start, size);
444 // If the line has a valid sparse matrix entry, extract it and
445 // hand it off to the processTriple closure.
446 const int curErrCode =
448 if (curErrCode != 0) {
450 badLineNumbers.push_back(curLineNum);
451 } else {
453 }
455 inputStreamCanStillBeRead = std::getline(inputStream, line).good();
456 }
457 }
458 } // while there are lines to read and we need more triples
459
460 if (errCode != 0 && errStrm != NULL) {
461 const size_t numBadLines = badLineNumbers.size();
462 *errStrm << "Encountered " << numBadLines << " bad line"
463 << (numBadLines != size_t(1) ? "s" : "")
464 << ": [";
465 for (size_t k = 0; k < numBadLines; ++k) {
467 if (k + 1 < numBadLines) {
468 *errStrm << ", ";
469 }
470 }
471 *errStrm << "]" << endl;
472 }
473 if (!inputStream.eof() && inputStream.fail()) {
474 if (errCode == 0) {
475 errCode = -1;
476 }
477 if (errStrm != NULL) {
478 *errStrm << "The input stream is not at end-of-file, "
479 "but is in a bad state."
480 << endl;
481 }
482 }
483 return errCode;
484}
485
512template <class SC, class GO>
514 std::size_t& curLineNum,
515 std::size_t& numEntRead,
516 ::Teuchos::ArrayRCP<int>& sizeBuf,
517 ::Teuchos::ArrayRCP<char>& msgBuf,
518 std::vector<GO>& rowInds,
519 std::vector<GO>& colInds,
520 std::vector<SC>& vals,
521 const std::size_t maxNumEntPerMsg,
522 const int destRank,
523 const ::Teuchos::Comm<int>& comm,
524 const bool tolerant = false,
525 std::ostream* errStrm = NULL,
526 const bool debug = false) {
527 using std::endl;
528 using ::Teuchos::isend;
529 using ::Tpetra::Details::countPackTriples;
530 using ::Tpetra::Details::countPackTriplesCount;
531 using ::Tpetra::Details::packTriples;
532 using ::Tpetra::Details::packTriplesCount;
533
534 using ::KokkosKernels::ArithTraits;
535 // constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
536 // constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
537 constexpr int sizeTag = 42;
538 constexpr int msgTag = 43;
539 // constexpr int srcRank = 0;
540 int errCode = 0;
541
542 // This doesn't actually deallocate memory; it just changes the size
543 // back to zero, so that push_back starts over from the beginning.
544 rowInds.resize(0);
545 colInds.resize(0);
546 vals.resize(0);
547 // Closure that adds the new matrix entry to the above temp arrays.
548 auto processTriple = [&rowInds, &colInds, &vals](const GO rowInd, const GO colInd, const SC& val) {
549 try {
550 rowInds.push_back(rowInd);
551 colInds.push_back(colInd);
552 vals.push_back(val);
553 } catch (...) {
554 return -1;
555 }
556 return 0;
557 };
558 numEntRead = 0; // output argument
561 errStrm, debug);
562 if (debug && errStrm != NULL) {
563 std::ostringstream os;
564 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
565 << ", GO=" << typeid(GO).name() << ": "
566 << "readAndSendOneBatchOfTriples: readTriples read "
567 << numEntRead << " matrix entries, and returned errCode="
568 << errCode << "." << std::endl;
569 *errStrm << os.str();
570 }
571 if (numEntRead != rowInds.size() ||
572 numEntRead != colInds.size() ||
573 numEntRead != vals.size()) {
574 if (errStrm != NULL) {
575 *errStrm << "readTriples size results are not consistent. "
576 << "numEntRead = " << numEntRead
577 << ", rowInds.size() = " << rowInds.size()
578 << ", colInds.size() = " << colInds.size()
579 << ", and vals.size() = " << vals.size() << "."
580 << std::endl;
581 }
582 if (errCode == 0) {
583 errCode = -1;
584 }
585 }
586
587 // We don't consider reading having "failed" if we've reached
588 // end-of-file before reading maxNumEntPerMsg entries. It's OK if
589 // we got fewer triples than that. Furthermore, we have to send at
590 // least one message to the destination process, even if the read
591 // from the file failed.
592
593 if (numEntRead == 0 || errCode != 0) {
594 // Send a message size of zero to the receiving process, to tell
595 // it that we have no triples to send, or that there was an error
596 // reading. The latter just means that we "go through the
597 // motions," then broadcast the error code.
598 sizeBuf[0] = 0;
599 if (debug && errStrm != NULL) {
600 std::ostringstream os;
601 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
602 << ", GO=" << typeid(GO).name() << ": "
603 << "Post send (size=0, errCode=" << errCode << ") "
604 << "to " << destRank << " with tag " << sizeTag << endl;
605 *errStrm << os.str();
606 }
607 send(sizeBuf.getRawPtr(), 1, destRank, sizeTag, comm);
608 return errCode;
609 } else { // we read a nonzero # of triples, without error
610 const int numEnt = static_cast<int>(numEntRead);
611 int countSize = 0; // output argument
612 int triplesSize = 0; // output argument
613
615 // countSize should never be nonpositive, since we have to pack an
616 // integer size.
617 if (countSize <= 0 && errCode == 0) {
618 errCode = -1;
619 }
620
621 if (errCode != 0) {
622 // Send zero to the receiving process, to tell it about the error.
623 sizeBuf[0] = 0;
624 if (debug && errStrm != NULL) {
625 std::ostringstream os;
626 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
627 << ", GO=" << typeid(GO).name() << ": "
628 << "Post send (size=0, error case) to " << destRank
629 << " with tag " << sizeTag << endl;
630 *errStrm << os.str();
631 }
632 send(sizeBuf.getRawPtr(), 1, destRank, sizeTag, comm);
633 return errCode;
634 } else { // countPackTriplesCount succeeded
636 if (errCode != 0) {
637 // Send a message size of zero to the receiving process, to
638 // tell it that there was an error counting.
639 sizeBuf[0] = 0;
640 if (debug && errStrm != NULL) {
641 std::ostringstream os;
642 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
643 << ", GO=" << typeid(GO).name() << ": "
644 << "Post send (size=0, error case) to " << destRank
645 << " with tag " << sizeTag << endl;
646 *errStrm << os.str();
647 }
648 send(sizeBuf.getRawPtr(), 1, destRank, sizeTag, comm);
649 return errCode;
650 } else { // countPackTriples succeeded; message packed & ready to send
651 // Send the message size (in bytes). We can use a nonblocking
652 // send here, and try to overlap with message packing.
653 const int outBufSize = countSize + triplesSize;
654 sizeBuf[0] = outBufSize;
655 if (debug && errStrm != NULL) {
656 std::ostringstream os;
657 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
658 << ", GO=" << typeid(GO).name() << ": "
659 << "Post isend (size=" << sizeBuf[0] << ") to " << destRank
660 << " with tag " << sizeTag << endl;
661 *errStrm << os.str();
662 }
664
665 msgBuf.resize(outBufSize);
666 char* outBuf = msgBuf.getRawPtr();
667
668 // If anything goes wrong with packing, send the pack buffer
669 // anyway, since the receiving process expects a message.
670 int outBufCurPos = 0; // input/output argument
672 outBufCurPos, comm, errStrm);
673 if (errCode == 0) {
675 vals.data(), numEnt, outBuf,
677 errStrm);
678 }
679 if (debug && errStrm != NULL) {
680 std::ostringstream os;
681 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
682 << ", GO=" << typeid(GO).name() << ": "
683 << "Post isend (packed data) to " << destRank
684 << " with tag " << msgTag << endl;
685 *errStrm << os.str();
686 }
688
689 // Wait on the two messages. It doesn't matter in what order
690 // we send them, because they have different tags. The
691 // receiving process will wait on the first message first, in
692 // order to get the size of the second message.
693 if (debug && errStrm != NULL) {
694 std::ostringstream os;
695 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
696 << ", GO=" << typeid(GO).name() << ": "
697 << "Wait on isend (size)" << endl;
698 *errStrm << os.str();
699 }
700 sizeReq->wait();
701 if (debug && errStrm != NULL) {
702 std::ostringstream os;
703 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
704 << ", GO=" << typeid(GO).name() << ": "
705 << "Wait on isend (packed data)" << endl;
706 *errStrm << os.str();
707 }
708 msgReq->wait();
709
710 // This doesn't actually deallocate; it just resets sizes to zero.
711 rowInds.clear();
712 colInds.clear();
713 vals.clear();
714 }
715 }
716 }
717 return errCode;
718}
719
727
756template <class SC, class GO, class CommRequestPtr>
757int recvOneBatchOfTriples(std::vector<GO>& rowInds,
758 std::vector<GO>& colInds,
759 std::vector<SC>& vals,
760 int& numEnt,
761 ::Teuchos::ArrayRCP<int>& sizeBuf,
762 ::Teuchos::ArrayRCP<char>& msgBuf,
764 const int srcRank,
765 const ::Teuchos::Comm<int>& comm,
766 const bool tolerant = false,
767 std::ostream* errStrm = NULL,
768 const bool debug = false) {
769 using ::KokkosKernels::ArithTraits;
770 using ::Tpetra::Details::unpackTriples;
771 using ::Tpetra::Details::unpackTriplesCount;
772
774 // constexpr int msgTag = 43 + (ArithTraits<SC>::is_complex ? 100 : 0);
775 // constexpr int sizeTag = 42;
776 constexpr int msgTag = 43;
777 int errCode = 0; // return value
778 numEnt = 0; // output argument
779
780 // Wait on the ireceive we preposted before calling this function.
781 if (debug && errStrm != NULL) {
782 std::ostringstream os;
783 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
784 << ", GO=" << typeid(GO).name() << ": "
785 << "Wait on irecv (size)" << std::endl;
786 *errStrm << os.str();
787 }
788 sizeReq->wait();
790 const int inBufSize = sizeBuf[0];
791 if (debug && errStrm != NULL) {
792 std::ostringstream os;
793 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
794 << ", GO=" << typeid(GO).name() << ": "
795 << "Received size: sizeBuf[0]=" << sizeBuf[0] << std::endl;
796 *errStrm << os.str();
797 }
798
799 if (inBufSize == 0) {
800 numEnt = 0;
801 rowInds.resize(0);
802 colInds.resize(0);
803 vals.resize(0);
804 } else {
805 msgBuf.resize(inBufSize);
806 char* inBuf = msgBuf.getRawPtr();
807
808 if (debug && errStrm != NULL) {
809 std::ostringstream os;
810 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
811 << ", GO=" << typeid(GO).name() << ": "
812 << "Post irecv (packed data) "
813 << "from " << srcRank
814 << " with tag " << msgTag << std::endl;
815 *errStrm << os.str();
816 }
817 auto msgReq = ::Teuchos::ireceive(msgBuf, srcRank, msgTag, comm);
818 if (debug && errStrm != NULL) {
819 std::ostringstream os;
820 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
821 << ", GO=" << typeid(GO).name() << ": "
822 << "Wait on irecv (packed data)" << std::endl;
823 *errStrm << os.str();
824 }
825 msgReq->wait();
826
827 int inBufCurPos = 0; // output argument
829 numEnt, comm, errStrm);
830 if (errCode == 0) {
831 rowInds.resize(numEnt);
832 colInds.resize(numEnt);
833 vals.resize(numEnt);
835 rowInds.data(), colInds.data(),
836 vals.data(), numEnt, comm, errStrm);
837 }
838 }
839 return errCode;
840}
841
842} // namespace Impl
843
844//
845// SKIP DOWN TO HERE FOR "PUBLIC" INTERFACE
846//
847
883template <class SC, class GO>
884int readAndDealOutTriples(std::istream& inputStream, // only valid on Proc 0
885 std::size_t& curLineNum, // only valid on Proc 0
886 std::size_t& totalNumEntRead, // only valid on Proc 0
887 std::function<int(const GO, const GO, const SC&)> processTriple,
888 const std::size_t maxNumEntPerMsg,
889 const ::Teuchos::Comm<int>& comm,
890 const bool tolerant = false,
891 std::ostream* errStrm = NULL,
892 const bool debug = false) {
893 using KokkosKernels::ArithTraits;
894 using std::endl;
895 using std::size_t;
896
897 constexpr int srcRank = 0;
898 // constexpr int sizeTag = 42 + (ArithTraits<SC>::is_complex ? 100 : 0);
900 constexpr int sizeTag = 42;
901 // constexpr int msgTag = 43;
902 const int myRank = comm.getRank();
903 const int numProcs = comm.getSize();
904 int errCode = 0;
905
906 ::Teuchos::ArrayRCP<int> sizeBuf(1);
907 ::Teuchos::ArrayRCP<char> msgBuf; // to be resized as needed
908
909 // Temporary storage for reading & packing (on Process srcRank) or
910 // unpacking (every other process) triples.
911 std::vector<GO> rowInds;
912 std::vector<GO> colInds;
913 std::vector<SC> vals;
914 rowInds.reserve(maxNumEntPerMsg);
915 colInds.reserve(maxNumEntPerMsg);
916 vals.reserve(maxNumEntPerMsg);
917
918 totalNumEntRead = 0;
919 if (myRank == srcRank) {
920 // Loop around through all the processes, including this one, over
921 // and over until we reach the end of the file, or an error occurs.
922 int destRank = 0;
923 bool lastMessageWasLegitZero = false;
924 for (;
925 !inputStream.eof() && errCode == 0;
926 destRank = (destRank + 1) % numProcs) {
927 size_t curNumEntRead = 0; // output argument of below
928 if (destRank == srcRank) {
929 // We can read and process the triples directly. We don't
930 // need to use intermediate storage, because we don't need to
931 // pack and send the triples.
932 const int readErrCode =
933 Impl::readTriples<SC, GO>(inputStream, curLineNum, curNumEntRead,
935 errStrm, debug);
936 if (debug && errStrm != NULL) {
937 std::ostringstream os;
938 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
939 << ", GO=" << typeid(GO).name() << ": "
940 << "(dest=src) readTriples returned curNumEntRead="
941 << curNumEntRead << ", errCode=" << readErrCode << endl;
942 *errStrm << os.str();
943 }
945 } else {
946 if (false && debug && errStrm != NULL) {
947 std::ostringstream os;
948 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
949 << ", GO=" << typeid(GO).name() << ": "
950 << "Calling readAndSend... with destRank=" << destRank << endl;
951 *errStrm << os.str();
952 }
953 // Read, pack, and send the triples to destRank.
954 const int readAndSendErrCode =
955 Impl::readAndSendOneBatchOfTriples<SC, GO>(inputStream, curLineNum,
960 comm, tolerant, errStrm,
961 debug);
963 if (debug && errStrm != NULL) {
964 std::ostringstream os;
965 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
966 << ", GO=" << typeid(GO).name() << ": "
967 << "readAndSend... with destRank=" << destRank
968 << " returned curNumEntRead=" << curNumEntRead
969 << ", errCode=" << readAndSendErrCode << endl;
970 *errStrm << os.str();
971 }
973 if (readAndSendErrCode == 0 && curNumEntRead == 0) {
975 if (debug && errStrm != NULL) {
976 std::ostringstream os;
977 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
978 << ", GO=" << typeid(GO).name() << ": "
979 << "Last send to " << destRank << " with tag " << sizeTag
980 << " was legit zero, counts as termination" << endl;
981 *errStrm << os.str();
982 }
983 }
984 }
985 } // loop around through processes until done reading file, or error
986
987 // Loop around through the remaining processes, and tell them that
988 // we're done, by sending zero. If the last message we sent to
989 // destRank was zero, then skip that process, since it only
990 // expects one message of size zero. Note that destRank got
991 // incremented mod numProcs at end of loop, so we have to
992 // decrement it mod numProcs.
993 destRank = (destRank - 1) % numProcs;
994 if (destRank < 0) { // C mod operator does not promise positivity
996 }
997
999 for (int outRank = startRank; outRank < numProcs; ++outRank) {
1000 if (outRank != srcRank) {
1001 if (debug && errStrm != NULL) {
1002 std::ostringstream os;
1003 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1004 << ", GO=" << typeid(GO).name() << ": "
1005 << "Post send (size, termination msg) to " << outRank
1006 << " with tag " << sizeTag << "(was last message legit zero? "
1007 << (lastMessageWasLegitZero ? "true" : "false") << ")" << endl;
1008 *errStrm << os.str();
1009 }
1010 sizeBuf[0] = 0;
1011 ::Teuchos::send(sizeBuf.getRawPtr(), 1, outRank, sizeTag, comm);
1012 }
1013 }
1014 } else {
1015 while (true) {
1016 // Prepost a message to receive the size (in bytes) of the
1017 // incoming packet.
1018 sizeBuf[0] = 0; // superfluous, but safe
1019 if (debug && errStrm != NULL) {
1020 std::ostringstream os;
1021 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1022 << ", GO=" << typeid(GO).name() << ": "
1023 << "Post irecv (size) from " << srcRank
1024 << " with tag " << sizeTag << std::endl;
1025 *errStrm << os.str();
1026 }
1027 auto sizeReq = ::Teuchos::ireceive(sizeBuf, srcRank, sizeTag, comm);
1028
1029 int numEnt = 0; // output argument
1030 const int recvErrCode =
1031 Impl::recvOneBatchOfTriples(rowInds, colInds, vals, numEnt, sizeBuf,
1032 msgBuf, sizeReq, srcRank, comm, tolerant,
1033 errStrm, debug);
1034 if (debug && errStrm != NULL) {
1035 std::ostringstream os;
1036 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1037 << ", GO=" << typeid(GO).name() << ": "
1038 << "recvOneBatchOfTriples returned numEnt=" << numEnt
1039 << ", errCode=" << recvErrCode << endl;
1040 *errStrm << os.str();
1041 }
1043
1044 if (numEnt != static_cast<int>(rowInds.size()) ||
1045 numEnt != static_cast<int>(colInds.size()) ||
1046 numEnt != static_cast<int>(vals.size())) {
1047 errCode = (errCode == 0) ? -1 : errCode;
1048 if (errStrm != NULL) {
1049 *errStrm << "recvOneBatchOfTriples produced inconsistent data sizes. "
1050 << "numEnt = " << numEnt
1051 << ", rowInds.size() = " << rowInds.size()
1052 << ", colInds.size() = " << colInds.size()
1053 << ", vals.size() = " << vals.size() << "."
1054 << endl;
1055 }
1056 } // if sizes inconsistent
1057
1058 // Sending zero items is how Process srcRank tells this process
1059 // that it (Process srcRank) is done sending out data.
1060 if (numEnt == 0) {
1061 break;
1062 }
1063
1064 for (int k = 0; k < numEnt && errCode == 0; ++k) {
1065 const int curErrCode = processTriple(rowInds[k], colInds[k], vals[k]);
1066 errCode = (curErrCode == 0) ? errCode : curErrCode;
1067 }
1068 } // while we still get messages from srcRank
1069 }
1070
1071 if (debug && errStrm != NULL) {
1072 std::ostringstream os;
1073 os << "Proc " << comm.getRank() << ", SC=" << typeid(SC).name()
1074 << ", GO=" << typeid(GO).name() << ": "
1075 << "Done with send/recv loop" << endl;
1076 *errStrm << os.str();
1077 }
1078 // Do a bitwise OR to get an error code that is nonzero if and only
1079 // if any process' local error code is nonzero.
1080 using ::Teuchos::outArg;
1081 using ::Teuchos::REDUCE_BOR;
1082 using ::Teuchos::reduceAll;
1083 const int lclErrCode = errCode;
1085 return errCode;
1086}
1087
1088} // namespace Details
1089} // namespace Tpetra
1090
1091#endif // TPETRA_DETAILS_READTRIPLES_HPP
bool readComplexData(std::istream &istr, OrdinalType &rowIndex, OrdinalType &colIndex, RealType &realPart, RealType &imagPart, const std::size_t lineNumber, const bool tolerant)
Read "<rowIndex> <colIndex> <realPart> <imagPart>" from a line.
int readTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &numTriplesRead, std::function< int(const GO, const GO, const SC &)> processTriple, const std::size_t maxNumTriplesToRead, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Read at most numTriplesToRead triples from the given Matrix Market input stream, and pass along any r...
int recvOneBatchOfTriples(std::vector< GO > &rowInds, std::vector< GO > &colInds, std::vector< SC > &vals, int &numEnt, ::Teuchos::ArrayRCP< int > &sizeBuf, ::Teuchos::ArrayRCP< char > &msgBuf, CommRequestPtr &sizeReq, const int srcRank, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Read at most maxNumEntPerMsg sparse matrix entries from the input stream, and send them to the proces...
int readAndSendOneBatchOfTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &numEntRead, ::Teuchos::ArrayRCP< int > &sizeBuf, ::Teuchos::ArrayRCP< char > &msgBuf, std::vector< GO > &rowInds, std::vector< GO > &colInds, std::vector< SC > &vals, const std::size_t maxNumEntPerMsg, const int destRank, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Read at most maxNumEntPerMsg sparse matrix entries from the input stream, and send them to the proces...
int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
Struct that holds views of the contents of a CrsMatrix.
Implementation details of Tpetra.
int packTriplesCount(const int, char[], const int, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Pack the count (number) of matrix triples.
int countPackTriplesCount(const ::Teuchos::Comm< int > &, int &size, std::ostream *errStrm)
Compute the buffer size required by packTriples for packing the number of matrix entries ("triples").
int unpackTriplesCount(const char[], const int, int &, int &, const ::Teuchos::Comm< int > &, std::ostream *errStrm)
Unpack just the count of triples from the given input buffer.
int readAndDealOutTriples(std::istream &inputStream, std::size_t &curLineNum, std::size_t &totalNumEntRead, std::function< int(const GO, const GO, const SC &)> processTriple, const std::size_t maxNumEntPerMsg, const ::Teuchos::Comm< int > &comm, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
On Process 0 in the given communicator, read sparse matrix entries (in chunks of at most maxNumEntPer...
Namespace Tpetra contains the class and methods constituting the Tpetra library.
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...
Implementation of the readLine stand-alone function in this namespace (see below).
static int readLine(std::function< int(const GO, const GO, const SC &)> processTriple, const std::string &line, const std::size_t lineNumber, const bool tolerant=false, std::ostream *errStrm=NULL, const bool debug=false)
Take a line from the Matrix Market file or input stream, and process the sparse matrix entry in that ...