// Default constructor: flags the communicator as uninitialized and clears
// every timing accumulator and both message counters.
9Communicator::Communicator()
11 bInitialized_ =
false;
// These accumulators are incremented by the timing helper and printed in finalize().
12 idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
14 sendCnt_ = recvCnt_ = 0;
// Sets up MPI for this process and records its identity.
//   state      - framework state; the log calls below use the member state_
//                (the assignment to state_ is not visible in this listing)
//   argc, argv - forwarded unchanged to MPI::Init
// Returns bool; the return statements are not visible in this listing.
18bool Communicator::initialize(StateP state,
int argc,
char** argv)
// Timers are cleared on every call, not only in the constructor.
20 idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
// The next two statements restart the wall clock and log the process identity
// from the already-stored rank/size/name.
// NOTE(review): presumably this is the "already initialized" path for repeated
// runs - the guarding condition is not visible here; confirm.
24 beginTime_ = lastTime_ = MPI::Wtime();
25 ECF_LOG(state_, 2,
"Process " + uint2str(mpiGlobalRank_) +
" of "
26 + uint2str(mpiGlobalSize_) +
" on " + processorName_);
// Start MPI, then cache the world size and this process' rank in it.
32 MPI::Init(argc, argv);
34 mpiGlobalSize_ = MPI::COMM_WORLD.Get_size();
35 mpiGlobalRank_ = MPI::COMM_WORLD.Get_rank();
// NOTE(review): pp is assigned but has no visible use in this listing.
43 char *pp, processor_name[MPI_MAX_PROCESSOR_NAME];
44 pp = &processor_name[0];
// namelen's declaration is not visible in this listing.
46 MPI_Get_processor_name(processor_name, &namelen);
47 processorName_ = processor_name;
// beginTime_ is the reference finalize() subtracts from endTime_.
49 beginTime_ = lastTime_ = MPI::Wtime();
51 ECF_LOG(state_, 2,
"Process " + uint2str(mpiGlobalRank_) +
" of "
52 + uint2str(mpiGlobalSize_) +
" on " + processorName_);
// Splits MPI::COMM_WORLD into per-deme communicators and stores this process'
// one in demeComm_.
//   nDemes - number of demes; used as the modulus for colour assignment.
//            NOTE(review): nDemes == 0 would make the modulo below undefined -
//            confirm callers always pass >= 1.
// The returned uint is not visible in this listing.
62uint Communicator::createDemeCommunicator(uint nDemes)
// Colour = global rank modulo nDemes, i.e. ranks are dealt round-robin to demes.
65 uint myColor = mpiGlobalRank_ % nDemes;
66 demeMasters.resize(nDemes);
// Loop body is not visible in this listing (presumably it fills demeMasters).
67 for(uint i = 0; i < nDemes; i++)
// The global rank is passed as the split key.
70 demeComm_ = MPI::COMM_WORLD.Split(myColor, mpiGlobalRank_);
// mpiRank_ / mpiSize_ are logged as the local rank and size; where they are
// assigned is not visible in this listing.
74 std::stringstream log;
75 log <<
"Global process " << mpiGlobalRank_ <<
" joined deme communicator with index ";
76 log << myColor <<
" (local rank: " << mpiRank_ <<
" of " << mpiSize_ <<
")";
77 ECF_LOG(state_, 2, log.str());
// Builds this process' timing summary and gathers all summaries on global rank 0.
// Returns bool; the return statement and the end of the function are not
// visible in this listing.
83bool Communicator::finalize()
88 endTime_ = MPI::Wtime();
// One-line report: total wall time plus each accumulated category.
90 std::stringstream times;
91 times <<
"Process " << mpiGlobalRank_ <<
": total MPI time: " << endTime_ - beginTime_;
92 times <<
", COMP: " << compTime_;
93 times <<
", IDLE: " << idleTime_;
94 times <<
", SEND: " << sendTime_;
95 times <<
", RECV: " << recvTime_;
96 times <<
", PACK: " << packTime_;
97 times <<
", UNPACK: " << unpackTime_ << std::endl;
// Rank 0 receives one T_FINAL message from every other global rank.
100 if(mpiGlobalRank_ == 0) {
104 for(uint iProcess = 1; iProcess < mpiGlobalSize_; iProcess++) {
// status and message are declared/filled on lines not visible here
// (presumably a Probe precedes Get_count - confirm).
106 uint size = status.Get_count(MPI::CHAR);
107 message.resize(size);
108 frameworkComm_.Recv(&message[0], size, MPI::CHAR, iProcess, T_FINAL, status);
112 ECF_LOG(state_, 2, times.str());
113 state_->getLogger()->saveTo(
true);
// Sends this process' own summary to rank 0.
// NOTE(review): presumably the non-root branch - the else is not visible here.
116 std::string message = times.str();
117 frameworkComm_.Send(&message[0], (
int) message.size(), MPI::CHAR, 0, T_FINAL);
// The statement guarded by this condition is not visible in this listing.
120 if(!state_->getBatchMode())
// Body of the timing helper that call sites invoke as time(IDLE), time(PACK),
// time(SEND), time(RECV) and time(UNPACK). Its signature line is not visible
// in this listing.
// Measures the time since the previous call and moves the reference point forward.
132 currentTime_ = MPI::Wtime();
133 double elapsed = currentTime_ - lastTime_;
134 lastTime_ = currentTime_;
// The statements that choose which accumulator receives `elapsed` are not
// visible here; presumably exactly one of the following runs per call,
// selected by the timing category argument - confirm.
138 idleTime_ += elapsed;
141 compTime_ += elapsed;
144 sendTime_ += elapsed;
147 recvTime_ += elapsed;
150 packTime_ += elapsed;
153 unpackTime_ += elapsed;
// Returns the interval scaled by 1000 (presumably milliseconds, if MPI::Wtime
// reports seconds - confirm); the accumulators above keep the unscaled value.
157 return 1000 * elapsed;
// Returns the demeMasters entry for deme iDeme.
// No bounds check: iDeme must be a valid index into demeMasters.
161uint Communicator::getDemeMaster(uint iDeme)
163 return demeMasters[iDeme];
// Returns the source rank recorded in status_, which messageWaiting() fills
// through its Iprobe call.
167uint Communicator::getLastSource()
169 return status_.Get_source();
// Calls Barrier() on MPI::COMM_WORLD (the world communicator, not demeComm_).
173void Communicator::synchronize()
175 MPI::COMM_WORLD.Barrier();
// Asks demeComm_ (via Iprobe) whether a message from iProcess with the given
// tag is pending and returns the answer; the probe result is stored in status_.
180bool Communicator::messageWaiting(uint iProcess, uint tag)
182 return demeComm_.Iprobe(iProcess, tag, status_);
// Sends the integer `control` to iProcess over demeComm_ with tag T_CONTROL.
// The value travels as sizeof(int) raw bytes (MPI::BYTE).
// Returns bool; the return statement is not visible in this listing.
186bool Communicator::sendControlMessage(uint iProcess,
int control)
188 demeComm_.Send(&control,
sizeof(
int), MPI::BYTE, iProcess, T_CONTROL);
// Receives sizeof(int) raw bytes tagged T_CONTROL from iProcess over demeComm_
// into `control`. The declaration of `control` and the return statement are not
// visible in this listing (presumably the received value is returned).
193int Communicator::recvControlMessage(uint iProcess)
196 demeComm_.Recv(&control,
sizeof(
int), MPI::BYTE, iProcess, T_CONTROL);
// Tells iProcess whether to terminate or continue, over frameworkComm_.
//   termination - true selects tag T_TERMINATE, false selects T_CONTINUE.
// Returns bool; the return statement is not visible in this listing.
201bool Communicator::sendTerminateMessage(uint iProcess,
bool termination)
203 ECF_LOG(state_, logLevel_,
"Sending terminate message to process " + uint2str(iProcess));
// The decision is carried by the tag; the bool is also sent as the payload.
204 uint tag = (termination ==
true) ? T_TERMINATE : T_CONTINUE;
205 frameworkComm_.Send(&termination,
sizeof(
bool), MPI::BYTE, iProcess, tag);
// Receives a terminate/continue message from iProcess over frameworkComm_.
// Any tag is accepted; the tag actually received is then read from
// controlStatus_. The declaration of `termination` and the statements run
// for each tag are not visible in this listing.
210bool Communicator::recvTerminateMessage(uint iProcess)
213 frameworkComm_.Recv(&termination,
sizeof(
bool), MPI::BYTE, iProcess, MPI::ANY_TAG, controlStatus_);
214 if(controlStatus_.Get_tag() == T_TERMINATE)
216 if(controlStatus_.Get_tag() == T_CONTINUE)
// Inspects controlStatus_ for a T_TERMINATE tag and calls
// recvTerminateMessage(master). The statement that fills controlStatus_ before
// the test, the branch layout, and the return value are not visible here.
222bool Communicator::checkTerminationMessage(uint master)
225 if(controlStatus_.Get_tag() == T_TERMINATE) {
229 recvTerminateMessage(master);
// Serializes individuals to an XML "Pack" string and sends it to iProcess over
// demeComm_ with tag T_DEFAULT.
//   pool         - source individuals (taken by value)
//   iProcess     - destination rank in demeComm_
//   nIndividuals - how many to send from the front of pool; 0 means all
// Returns bool; the return statement is not visible in this listing.
// NOTE(review): no visible check that nIndividuals <= pool.size().
237bool Communicator::sendIndividuals(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
241 XMLNode xAll, xIndividual;
242 xAll = XMLNode::createXMLTopNode(
"Pack");
244 if(nIndividuals == 0)
245 nIndividuals = (uint) pool.size();
// The receivers read this "size" attribute to know how many children to expect.
246 xAll.addAttribute(
"size", uint2str(nIndividuals).c_str());
248 for(uint ind = 0; ind < nIndividuals; ind++) {
249 pool[ind]->write(xIndividual);
// "i" and "c" are appended after whatever write() produced; the receivers
// fetch them by attribute position, so the order here matters.
250 xIndividual.addAttribute(
"i", uint2str(pool[ind]->index).c_str());
251 xIndividual.addAttribute(
"c", uint2str(pool[ind]->cid).c_str());
252 xAll.addChild(xIndividual);
254 char *message = xAll.createXMLString(0);
257 double createTime =
time(PACK);
// +1 so the terminating NUL is transmitted with the text.
260 demeComm_.Send(message, (
int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
262 double sendTime =
time(SEND);
264 std::stringstream log;
265 log <<
"sent " << nIndividuals <<
" individuals, " << strlen(message) <<
" bytes (P: " << createTime <<
" | S: " << sendTime <<
")";
266 ECF_LOG(state_, logLevel_, log.str());
// Released only after the log line above, which still reads message.
268 freeXMLString(message);
// Same packing as sendIndividuals, but the message goes over frameworkComm_
// instead of demeComm_ (tag T_DEFAULT).
//   pool         - source individuals (taken by value)
//   iProcess     - destination rank in frameworkComm_
//   nIndividuals - how many to send from the front of pool; 0 means all
// Returns bool; the return statement is not visible in this listing.
// NOTE(review): no visible check that nIndividuals <= pool.size().
275bool Communicator::sendIndividualsGlobal(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
279 XMLNode xAll, xIndividual;
280 xAll = XMLNode::createXMLTopNode(
"Pack");
282 if(nIndividuals == 0)
283 nIndividuals = (uint) pool.size();
284 xAll.addAttribute(
"size", uint2str(nIndividuals).c_str());
286 for(uint ind = 0; ind < nIndividuals; ind++) {
287 pool[ind]->write(xIndividual);
// Index and cid are appended as attributes "i" and "c" after write().
288 xIndividual.addAttribute(
"i", uint2str(pool[ind]->index).c_str());
289 xIndividual.addAttribute(
"c", uint2str(pool[ind]->cid).c_str());
290 xAll.addChild(xIndividual);
292 char *message = xAll.createXMLString(0);
294 double createTime =
time(PACK);
// Length includes the terminating NUL.
296 frameworkComm_.Send(message, (
int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
298 double sendTime =
time(SEND);
300 std::stringstream log;
301 log <<
"sent " << nIndividuals <<
" individuals (global), " << strlen(message) <<
" bytes (P: " << createTime <<
" | S: " << sendTime <<
")";
302 ECF_LOG(state_, logLevel_, log.str());
304 freeXMLString(message);
// Receives a "Pack" of individuals from iProcess over demeComm_ and reads each
// one into the existing deme entry selected by the transmitted index.
//   deme     - target individuals, updated in place; must already hold an
//              object at every received index (no bounds check is visible)
//   iProcess - source rank in demeComm_
// The returned uint is not visible in this listing. Declarations of status,
// message, index and cid are also not visible.
312uint Communicator::recvDemeIndividuals(std::vector<IndividualP>& deme, uint iProcess)
314 XMLNode xAll, xIndividual;
// Probe first so the message length is known before the buffer is sized.
320 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
322 double idle =
time(IDLE);
324 uint length = status.Get_count(MPI::CHAR);
325 message.resize(length + 1);
// NOTE(review): the probe accepts any tag but the receive asks for T_DEFAULT;
// confirm no other tag can arrive from iProcess on this communicator.
326 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
328 double recv =
time(RECV);
330 xAll = XMLNode::parseString(message.c_str(),
"Pack");
331 uint nIndividuals = atoi(xAll.getAttribute(
"size"));
333 for(uint i = 0; i < nIndividuals; i++) {
335 xIndividual = xAll.getChildNode(i);
// Attributes are fetched by position (1 and 2).
// NOTE(review): assumes position 0 is written by Individual::write and that
// "i" and "c" follow, as appended by the send functions - confirm.
336 index = atoi(xIndividual.getAttributeValue(1));
337 cid = atoi(xIndividual.getAttributeValue(2));
338 deme[index]->read(xIndividual);
339 deme[index]->cid = cid;
343 double read =
time(UNPACK);
345 std::stringstream log;
346 log <<
"received " << nIndividuals <<
" individuals, " << length <<
" bytes (";
347 log <<
"I: " << idle <<
" | R: " << recv <<
" | U: " << read <<
")";
348 ECF_LOG(state_, logLevel_, log.str());
// Receives a "Pack" of individuals from iProcess over demeComm_ and builds a
// new vector of freshly allocated Individual objects from it.
//   iProcess - source rank in demeComm_
// Declarations of status, message, index and cid, and the return statement,
// are not visible in this listing (presumably `pack` is returned).
356std::vector<IndividualP> Communicator::recvIndividuals(uint iProcess)
358 XMLNode xAll, xIndividual;
361 std::vector<IndividualP> pack;
// Probe first so the message length is known before the buffer is sized.
365 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
367 double idle =
time(IDLE);
369 uint length = status.Get_count(MPI::CHAR);
370 message.resize(length + 1);
371 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
373 double recv =
time(RECV);
375 xAll = XMLNode::parseString(message.c_str(),
"Pack");
376 uint nIndividuals = atoi(xAll.getAttribute(
"size"));
378 for(uint i = 0; i < nIndividuals; i++) {
380 xIndividual = xAll.getChildNode(i);
381 pack.push_back((IndividualP)
new Individual(state_));
// Positional attribute access: 1 = index, 2 = cid.
382 index = atoi(xIndividual.getAttributeValue(1));
383 cid = atoi(xIndividual.getAttributeValue(2));
// Where cid is stored is not visible in this listing.
385 pack[i]->index = index;
// The new individual gets its own copy of the state's fitness object before
// read() is called on it.
387 pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
388 pack[i]->read(xIndividual);
392 double read =
time(UNPACK);
394 std::stringstream log;
395 log <<
"received " << nIndividuals <<
" individuals, " << length <<
" bytes (";
396 log <<
"I: " << idle <<
" | R: " << recv <<
" | U: " << read <<
")";
397 ECF_LOG(state_, logLevel_, log.str());
// Receives a "Pack" of individuals from iProcess over frameworkComm_ and builds
// a new vector of freshly allocated Individual objects from it.
//   iProcess - source rank in frameworkComm_
// The statement that fills `status` before Get_count (presumably a Probe), the
// declarations of status, message, index and cid, and the return statement are
// not visible in this listing.
404std::vector<IndividualP> Communicator::recvIndividualsGlobal(uint iProcess)
406 XMLNode xAll, xIndividual;
409 std::vector<IndividualP> pack;
415 double idle =
time(IDLE);
417 uint length = status.Get_count(MPI::CHAR);
// Sized to exactly `length` here; the deme-communicator receive functions
// above use length + 1.
418 message.resize(length);
419 frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
421 double recv =
time(RECV);
423 xAll = XMLNode::parseString(message.c_str(),
"Pack");
424 uint nIndividuals = atoi(xAll.getAttribute(
"size"));
426 for(uint i = 0; i < nIndividuals; i++) {
427 xIndividual = xAll.getChildNode(i);
428 pack.push_back((IndividualP)
new Individual(state_));
// Positional attribute access: 1 = index, 2 = cid.
429 index = atoi(xIndividual.getAttributeValue(1));
430 cid = atoi(xIndividual.getAttributeValue(2));
// Where cid is stored is not visible in this listing.
432 pack[i]->index = index;
434 pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
435 pack[i]->read(xIndividual);
439 double read =
time(UNPACK);
441 std::stringstream log;
442 log <<
"received " << nIndividuals <<
" individuals, " << length <<
" bytes (";
443 log <<
"I: " << idle <<
" | R: " << recv <<
" | U: " << read <<
")";
444 ECF_LOG(state_, logLevel_, log.str());
// Receives a "Pack" of individuals from iProcess over demeComm_ and reads them
// into pool[0..n-1] in arrival order, growing pool first if it is too small.
//   pool     - overwritten in place; existing objects are reused
//   iProcess - source rank in demeComm_
// The returned uint and the declarations of status, index and cid are not
// visible in this listing.
454uint Communicator::recvReplaceIndividuals(std::vector<IndividualP>& pool, uint iProcess)
456 XMLNode xAll, xIndividual;
461 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
463 double idle =
time(IDLE);
465 uint length = status.Get_count(MPI::CHAR);
// NOTE(review): raw new[]; the matching delete[] is not visible in this
// listing - confirm the buffer is released on every path.
// NOTE(review): message[length] is never written here; parsing relies on the
// sender having included the terminating NUL in the transmitted length.
466 char *message =
new char[length + 1];
467 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
470 double recv =
time(RECV);
472 xAll = XMLNode::parseString(message,
"Pack");
473 uint nIndividuals = atoi(xAll.getAttribute(
"size"));
474 uint poolSize = (uint) pool.size();
// Grow the pool and give each new slot an Individual with its own fitness copy.
476 if(poolSize < nIndividuals) {
477 pool.resize(nIndividuals);
478 for(uint i = poolSize; i < nIndividuals; i++) {
479 pool[i] = (IndividualP)
new Individual(state_);
480 pool[i]->
fitness = (FitnessP) state_->getFitnessObject()->copy();
// Unlike recvDemeIndividuals, the destination slot is the loop counter i, not
// the transmitted index; the index is only stored on the individual.
485 for(uint i = 0; i < nIndividuals; i++) {
486 xIndividual = xAll.getChildNode(i);
487 index = atoi(xIndividual.getAttributeValue(1));
488 cid = atoi(xIndividual.getAttributeValue(2));
489 pool[i]->index = index;
491 pool[i]->read(xIndividual);
494 double read =
time(UNPACK);
496 std::stringstream log;
497 log <<
"received " << nIndividuals <<
" individuals, " << length <<
" bytes (I: " << idle <<
" | R: " << recv <<
" | U: " << read <<
")";
498 ECF_LOG(state_, logLevel_, log.str());
// Sends only the fitness objects (not the full individuals) of the first
// nIndividuals entries of pool to iProcess over demeComm_, tag T_DEFAULT.
//   pool         - individuals whose fitness is sent (taken by value)
//   iProcess     - destination rank in demeComm_
//   nIndividuals - how many to send from the front of pool; 0 means all
// Returns bool; the return statement is not visible in this listing.
// NOTE(review): no visible check that nIndividuals <= pool.size().
510bool Communicator::sendFitness(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
514 XMLNode xAll, xIndividual, xFitness;
515 xAll = XMLNode::createXMLTopNode(
"Pack");
517 if(nIndividuals == 0)
518 nIndividuals = (uint) pool.size();
519 xAll.addAttribute(
"size", uint2str(nIndividuals).c_str());
521 for(uint ind = 0; ind < nIndividuals; ind++) {
// A fresh "Individual" node is created here, so "i" and "c" are its first two
// attributes; the fitness receivers read them at positions 0 and 1.
522 xIndividual = XMLNode::createXMLTopNode(
"Individual");
523 xIndividual.addAttribute(
"i", uint2str(pool[ind]->index).c_str());
524 xIndividual.addAttribute(
"c", uint2str(pool[ind]->cid).c_str());
526 pool[ind]->fitness->write(xFitness);
527 xIndividual.addChild(xFitness);
528 xAll.addChild(xIndividual);
530 char *message = xAll.createXMLString(0);
532 double pack =
time(PACK);
// Length includes the terminating NUL.
534 demeComm_.Send(message, (
int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
536 double send =
time(SEND);
538 std::stringstream log;
539 log <<
"sent " << nIndividuals <<
" fitness objects, " << strlen(message) <<
" bytes (P: " << pack <<
" | S: " << send <<
")";
540 ECF_LOG(state_, logLevel_, log.str());
542 freeXMLString(message);
// Receives a "Pack" of fitness objects from iProcess over demeComm_ and reads
// each into the fitness of the deme entry selected by the transmitted index.
//   deme     - individuals whose fitness is updated in place; must hold an
//              object at every received index (no bounds check is visible)
//   iProcess - source rank in demeComm_
// The returned uint and the declarations of status, message, index and cid are
// not visible in this listing.
551uint Communicator::recvDemeFitness(std::vector<IndividualP>& deme, uint iProcess)
553 XMLNode xAll, xIndividual, xFitness;
559 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
561 double idle =
time(IDLE);
563 uint length = status.Get_count(MPI::CHAR);
564 message.resize(length);
565 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
567 double recv =
time(RECV);
569 xAll = XMLNode::parseString(message.c_str(),
"Pack");
570 uint nIndividuals = atoi(xAll.getAttribute(
"size"));
572 for(uint i = 0; i < nIndividuals; i++) {
573 xIndividual = xAll.getChildNode(i);
// Positions 0 and 1 match the "i" and "c" attributes sendFitness adds first.
574 index = atoi(xIndividual.getAttributeValue(0));
575 cid = atoi(xIndividual.getAttributeValue(1));
576 xFitness = xIndividual.getChildNode(0);
577 deme[index]->fitness->read(xFitness);
// cid is stored on the fitness object here, not on the individual.
578 deme[index]->fitness->cid = cid;
583 double read =
time(UNPACK);
// NOTE(review): "fitnes" in the log text below looks like a typo for "fitness".
585 std::stringstream log;
586 log <<
"received " << nIndividuals <<
" fitnes objects, " << length <<
" bytes (I: " << idle <<
" | R: " << recv <<
" | U: " << read <<
")";
587 ECF_LOG(state_, logLevel_, log.str());
// Like recvDemeFitness, but also collects the index of every individual whose
// fitness was received.
//   deme     - individuals whose fitness is updated in place; must hold an
//              object at every received index (no bounds check is visible)
//   iProcess - source rank in demeComm_
// The return statement is not visible in this listing (presumably `indices`).
// Declarations of status, message, index and cid are also not visible.
597std::vector<uint> Communicator::recvFitnessVector(std::vector<IndividualP>& deme, uint iProcess)
599 XMLNode xAll, xIndividual, xFitness;
602 std::vector<uint> indices;
606 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
608 double idle =
time(IDLE);
610 uint length = status.Get_count(MPI::CHAR);
611 message.resize(length);
612 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
614 double recv =
time(RECV);
616 xAll = XMLNode::parseString(message.c_str(),
"Pack");
617 uint nIndividuals = atoi(xAll.getAttribute(
"size"));
619 for(uint i = 0; i < nIndividuals; i++) {
620 xIndividual = xAll.getChildNode(i);
// Positional attribute access: 0 = index, 1 = cid.
621 index = atoi(xIndividual.getAttributeValue(0));
622 cid = atoi(xIndividual.getAttributeValue(1));
623 xFitness = xIndividual.getChildNode(0);
625 deme[index]->fitness->read(xFitness);
626 deme[index]->fitness->cid = cid;
// Indices are recorded in arrival order.
627 indices.push_back(index);
631 double read =
time(UNPACK);
// NOTE(review): "fitnes" in the log text below looks like a typo for "fitness".
633 std::stringstream log;
634 log <<
"received " << nIndividuals <<
" fitnes objects, " << length <<
" bytes (I: " << idle <<
" | R: " << recv <<
" | U: " << read <<
")";
635 ECF_LOG(state_, logLevel_, log.str());
// Sends a vector of doubles to iProcess over frameworkComm_ with tag T_VALUES.
//   values   - payload (taken by value); values.size() elements are sent
//   iProcess - destination rank in frameworkComm_
// Returns bool; the return statement is not visible in this listing.
// NOTE(review): &values[0] is evaluated even when values is empty - confirm
// callers never pass an empty vector.
642bool Communicator::sendValuesGlobal(std::vector<double> values, uint iProcess)
646 frameworkComm_.Send(&values[0], (
int) values.size(), MPI::DOUBLE, iProcess, T_VALUES);
650 std::stringstream log;
651 log <<
"sent " << values.size() <<
" doubles";
652 ECF_LOG(state_, logLevel_, log.str());
// Receives a vector of doubles tagged T_VALUES from iProcess over frameworkComm_.
//   iProcess - source rank in frameworkComm_
// The statement that fills `status`, the sizing of `values` before the receive,
// and the return statement are not visible in this listing.
// NOTE(review): confirm values is resized to `size` before Recv writes into it.
659std::vector<double> Communicator::recvValuesGlobal(uint iProcess)
661 std::vector<double> values;
668 double idle =
time(IDLE);
// Element count (in doubles) of the pending message.
670 uint size = status.Get_count(MPI::DOUBLE);
672 frameworkComm_.Recv(&values[0], size, MPI::DOUBLE, iProcess, T_VALUES, status);
// idle and recv are measured but do not appear in the log line below.
674 double recv =
time(RECV);
676 std::stringstream log;
677 log <<
"received " << values.size() <<
" doubles";
678 ECF_LOG(state_, logLevel_, log.str());
// Receives log text tagged T_LOGS over frameworkComm_.
// The surrounding loop/probe, the accumulation into `logs`, the declaration of
// `status` and `logCount`, and the return statement are not visible in this
// listing (presumably the concatenated logs are returned).
685std::string Communicator::recvLogsGlobal()
687 std::string logs =
"", message;
// The sender is taken from the status, so the receive below targets whichever
// process the (not visible) probe matched.
694 uint iProcess = status.Get_source();
695 uint length = status.Get_count(MPI::CHAR);
696 message.resize(length);
697 frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_LOGS, status);
704 std::stringstream log;
705 log <<
"received " << logCount <<
" logs";
706 ECF_LOG(state_, logLevel_, log.str());
// Sends log text to iProcess over frameworkComm_ with tag T_LOGS.
//   logs     - text to send (taken by value); logs.size() chars, no NUL added
//   iProcess - destination rank in frameworkComm_
//   blocking - presumably selects between the Send and the Isend below; the
//              condition itself is not visible in this listing
// Returns bool; the return statement is not visible in this listing.
713bool Communicator::sendLogsGlobal(std::string logs, uint iProcess,
bool blocking)
717 MPI::Request request;
720 frameworkComm_.Send(&logs[0], (
int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
// NOTE(review): logs is a by-value parameter, so its buffer dies when this
// function returns. No wait on `request` is visible here - confirm the
// non-blocking send completes before the buffer is destroyed.
722 request =
frameworkComm_.Isend(&logs[0], (
int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
726 std::stringstream log;
727 log <<
"sent " << logs.size() <<
" log chars";
728 ECF_LOG(state_, logLevel_, log.str());
// Sends `size` raw bytes starting at data.get() to iProcess over frameworkComm_
// with tag T_DATA.
//   data     - buffer holder; only data.get() is used here
//   size     - number of bytes to send
//   iProcess - destination rank in frameworkComm_
// Returns bool; the return statement is not visible in this listing.
735bool Communicator::sendDataGlobal(voidP data, uint size, uint iProcess)
739 frameworkComm_.Send(data.get(), size, MPI::BYTE, iProcess, T_DATA);
743 std::stringstream log;
744 log <<
"sent " << size <<
" bytes";
745 ECF_LOG(state_, logLevel_, log.str());
// Receives a raw byte buffer tagged T_DATA from iProcess over frameworkComm_
// into newly allocated storage.
//   iProcess - source rank in frameworkComm_
// The statement that fills `status`, the declarations of `status` and `data`,
// and the return statement are not visible in this listing (presumably `data`
// is returned).
752voidP Communicator::recvDataGlobal(uint iProcess)
763 uint size = status.Get_count(MPI::BYTE);
// One byte more than the message size is allocated.
// NOTE(review): the storage comes from new char[] but is handed to voidP -
// confirm how voidP releases it (array delete vs. scalar delete).
764 data = (voidP)
new char[size /
sizeof(
char) + 1];
765 frameworkComm_.Recv(data.get(), size, MPI::BYTE, iProcess, T_DATA, status);
769 std::stringstream log;
770 log <<
"received " << size <<
" bytes";
771 ECF_LOG(state_, logLevel_, log.str());
double time(enum timing T)
MPI::Intercomm frameworkComm_
Individual class - inherits a vector of Genotype objects.
FitnessP fitness
sptr to individual's fitness object