ECF 1.5
Communicator.cpp
1#include "ECF_base.h"
2#include <string.h>
3
4namespace Comm
5{
6
7#ifdef _MPI
8
9Communicator::Communicator()
10{
11 bInitialized_ = false;
12 idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
13 logLevel_ = 4;
14 sendCnt_ = recvCnt_ = 0;
15}
16
17
18bool Communicator::initialize(StateP state, int argc, char** argv)
19{
20 idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
21
22 // multiple runs
23 if(bInitialized_) {
24 beginTime_ = lastTime_ = MPI::Wtime();
25 ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of "
26 + uint2str(mpiGlobalSize_) + " on " + processorName_);
27 return true;
28 }
29
30 state_ = state;
31
32 MPI::Init(argc, argv);
33
34 mpiGlobalSize_ = MPI::COMM_WORLD.Get_size();
35 mpiGlobalRank_ = MPI::COMM_WORLD.Get_rank();
36
37 demeComm_ = MPI::COMM_WORLD.Dup();
38 mpiSize_ = demeComm_.Get_size();
39 mpiRank_ = demeComm_.Get_rank();
40
41 frameworkComm_ = MPI::COMM_WORLD.Dup();
42
43 char *pp, processor_name[MPI_MAX_PROCESSOR_NAME];
44 pp = &processor_name[0];
45 int namelen;
46 MPI_Get_processor_name(processor_name, &namelen);
47 processorName_ = processor_name;
48
49 beginTime_ = lastTime_ = MPI::Wtime();
50
51 ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of "
52 + uint2str(mpiGlobalSize_) + " on " + processorName_);
53
54 bInitialized_ = true;
55
56 return true;
57}
58
59
60// zove se na pocetku prije evolucije
61// stvara se posebni komunikator za svaki deme (lokalni kontekst za algoritam)
62uint Communicator::createDemeCommunicator(uint nDemes)
63{
64 // TODO: parametrize deme distribution among processes
65 uint myColor = mpiGlobalRank_ % nDemes;
66 demeMasters.resize(nDemes);
67 for(uint i = 0; i < nDemes; i++)
68 demeMasters[i] = i;
69
70 demeComm_ = MPI::COMM_WORLD.Split(myColor, mpiGlobalRank_);
71 mpiSize_ = demeComm_.Get_size();
72 mpiRank_ = demeComm_.Get_rank();
73
74 std::stringstream log;
75 log << "Global process " << mpiGlobalRank_ << " joined deme communicator with index ";
76 log << myColor << " (local rank: " << mpiRank_ << " of " << mpiSize_ << ")";
77 ECF_LOG(state_, 2, log.str());
78
79 return myColor; // deme index
80}
81
82
83bool Communicator::finalize()
84{
85 if(!bInitialized_)
86 return true;
87
88 endTime_ = MPI::Wtime();
89
90 std::stringstream times;
91 times << "Process " << mpiGlobalRank_ << ": total MPI time: " << endTime_ - beginTime_;
92 times << ", COMP: " << compTime_;
93 times << ", IDLE: " << idleTime_;
94 times << ", SEND: " << sendTime_;
95 times << ", RECV: " << recvTime_;
96 times << ", PACK: " << packTime_;
97 times << ", UNPACK: " << unpackTime_ << std::endl;
98
99 // collect and log process times at process 0
100 if(mpiGlobalRank_ == 0) {
101 std::string message;
102 MPI::Status status;
103
104 for(uint iProcess = 1; iProcess < mpiGlobalSize_; iProcess++) {
105 frameworkComm_.Probe(iProcess, T_FINAL, status);
106 uint size = status.Get_count(MPI::CHAR);
107 message.resize(size);
108 frameworkComm_.Recv(&message[0], size, MPI::CHAR, iProcess, T_FINAL, status);
109
110 times << message;
111 }
112 ECF_LOG(state_, 2, times.str());
113 state_->getLogger()->saveTo(true);
114 }
115 else {
116 std::string message = times.str();
117 frameworkComm_.Send(&message[0], (int) message.size(), MPI::CHAR, 0, T_FINAL);
118 }
119
120 if(!state_->getBatchMode())
121 MPI::Finalize();
122
123 return true;
124}
125
130double Communicator::time(enum timing T)
131{
132 currentTime_ = MPI::Wtime();
133 double elapsed = currentTime_ - lastTime_;
134 lastTime_ = currentTime_;
135
136 switch(T) {
137 case IDLE:
138 idleTime_ += elapsed;
139 break;
140 case COMP:
141 compTime_ += elapsed;
142 break;
143 case SEND:
144 sendTime_ += elapsed;
145 break;
146 case RECV:
147 recvTime_ += elapsed;
148 break;
149 case PACK:
150 packTime_ += elapsed;
151 break;
152 case UNPACK:
153 unpackTime_ += elapsed;
154 break;
155 }
156
157 return 1000 * elapsed;
158}
159
160
161uint Communicator::getDemeMaster(uint iDeme)
162{
163 return demeMasters[iDeme];
164}
165
166
167uint Communicator::getLastSource()
168{
169 return status_.Get_source();
170}
171
172
173void Communicator::synchronize()
174{
175 MPI::COMM_WORLD.Barrier();
176}
177
178
179// provjerava (trenutno) ima li neka pristigla poruka
180bool Communicator::messageWaiting(uint iProcess, uint tag)
181{
182 return demeComm_.Iprobe(iProcess, tag, status_);
183}
184
185
186bool Communicator::sendControlMessage(uint iProcess, int control)
187{
188 demeComm_.Send(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
189 return true;
190}
191
192
193int Communicator::recvControlMessage(uint iProcess)
194{
195 int control;
196 demeComm_.Recv(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
197 return control;
198}
199
200
201bool Communicator::sendTerminateMessage(uint iProcess, bool termination)
202{
203 ECF_LOG(state_, logLevel_, "Sending terminate message to process " + uint2str(iProcess));
204 uint tag = (termination == true) ? T_TERMINATE : T_CONTINUE;
205 frameworkComm_.Send(&termination, sizeof(bool), MPI::BYTE, iProcess, tag);
206 return true;
207}
208
209
210bool Communicator::recvTerminateMessage(uint iProcess)
211{
212 bool termination;
213 frameworkComm_.Recv(&termination, sizeof(bool), MPI::BYTE, iProcess, MPI::ANY_TAG, controlStatus_);
214 if(controlStatus_.Get_tag() == T_TERMINATE)
215 termination = true;
216 if(controlStatus_.Get_tag() == T_CONTINUE)
217 termination = false;
218 return termination;
219}
220
221
222bool Communicator::checkTerminationMessage(uint master)
223{
224 if(frameworkComm_.Iprobe(master, MPI::ANY_TAG, controlStatus_)) {
225 if(controlStatus_.Get_tag() == T_TERMINATE) {
226 return true;
227 }
228 else
229 recvTerminateMessage(master);
230 }
231 return false;
232}
233
234
235// salje prvih nIndividuals jedinki iz zadanog vektora (sve ako je nInvididuals == 0)
236// svakoj jedinki se dodaje njen indeks u _deme_ (ne iz vektora pool!)
237bool Communicator::sendIndividuals(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
238{
239 time(COMP);
240
241 XMLNode xAll, xIndividual;
242 xAll = XMLNode::createXMLTopNode("Pack");
243
244 if(nIndividuals == 0)
245 nIndividuals = (uint) pool.size();
246 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
247
248 for(uint ind = 0; ind < nIndividuals; ind++) {
249 pool[ind]->write(xIndividual);
250 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
251 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
252 xAll.addChild(xIndividual);
253 }
254 char *message = xAll.createXMLString(0);
255 //std::string message = m;
256
257 double createTime = time(PACK);
258
259 //demeComm_.Send(message.data(), (int) message.length(), MPI::CHAR, iProcess, 0);
260 demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
261
262 double sendTime = time(SEND);
263
264 std::stringstream log;
265 log << "sent " << nIndividuals << " individuals, " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
266 ECF_LOG(state_, logLevel_, log.str());
267
268 freeXMLString(message);
269
270 return true;
271}
272
273
274// isto kao i gornja fja, samo sa frameworkComm_
275bool Communicator::sendIndividualsGlobal(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
276{
277 time(COMP);
278
279 XMLNode xAll, xIndividual;
280 xAll = XMLNode::createXMLTopNode("Pack");
281
282 if(nIndividuals == 0)
283 nIndividuals = (uint) pool.size();
284 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
285
286 for(uint ind = 0; ind < nIndividuals; ind++) {
287 pool[ind]->write(xIndividual);
288 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
289 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
290 xAll.addChild(xIndividual);
291 }
292 char *message = xAll.createXMLString(0);
293
294 double createTime = time(PACK);
295
296 frameworkComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
297
298 double sendTime = time(SEND);
299
300 std::stringstream log;
301 log << "sent " << nIndividuals << " individuals (global), " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
302 ECF_LOG(state_, logLevel_, log.str());
303
304 freeXMLString(message);
305
306 return true;
307}
308
309
310// prima jednike na mjesto postojecih u deme objektu
311// cita se indeks iz poruke i jedinka se stavlja na odgovarajuce mjesto u deme vektoru
312uint Communicator::recvDemeIndividuals(std::vector<IndividualP>& deme, uint iProcess)
313{
314 XMLNode xAll, xIndividual;
315 MPI::Status status;
316 std::string message;
317
318 time(COMP);
319
320 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
321
322 double idle = time(IDLE);
323
324 uint length = status.Get_count(MPI::CHAR);
325 message.resize(length + 1);
326 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
327
328 double recv = time(RECV);
329
330 xAll = XMLNode::parseString(message.c_str(), "Pack");
331 uint nIndividuals = atoi(xAll.getAttribute("size"));
332 uint index, cid;
333 for(uint i = 0; i < nIndividuals; i++) {
334 //xIndividual = xAll.getChildNode("Individual", i);
335 xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
336 index = atoi(xIndividual.getAttributeValue(1));
337 cid = atoi(xIndividual.getAttributeValue(2));
338 deme[index]->read(xIndividual);
339 deme[index]->cid = cid;
340 }
341 status_ = status;
342
343 double read = time(UNPACK);
344
345 std::stringstream log;
346 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
347 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
348 ECF_LOG(state_, logLevel_, log.str());
349
350 return nIndividuals;
351}
352
353
354// prima jedinke u novi vektor
355// ujedno stvara i inicijalizira pristigle jedinke i stvara njihov fitness
356std::vector<IndividualP> Communicator::recvIndividuals(uint iProcess)
357{
358 XMLNode xAll, xIndividual;
359 MPI::Status status;
360 std::string message;
361 std::vector<IndividualP> pack;
362
363 time(COMP);
364
365 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
366
367 double idle = time(IDLE);
368
369 uint length = status.Get_count(MPI::CHAR);
370 message.resize(length + 1);
371 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
372
373 double recv = time(RECV);
374
375 xAll = XMLNode::parseString(message.c_str(), "Pack");
376 uint nIndividuals = atoi(xAll.getAttribute("size"));
377 uint index, cid;
378 for(uint i = 0; i < nIndividuals; i++) {
379 //xIndividual = xAll.getChildNode("Individual", i);
380 xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
381 pack.push_back((IndividualP) new Individual(state_));
382 index = atoi(xIndividual.getAttributeValue(1));
383 cid = atoi(xIndividual.getAttributeValue(2));
384
385 pack[i]->index = index;
386 pack[i]->cid = cid;
387 pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
388 pack[i]->read(xIndividual);
389 }
390 status_ = status;
391
392 double read = time(UNPACK);
393
394 std::stringstream log;
395 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
396 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
397 ECF_LOG(state_, logLevel_, log.str());
398
399 return pack;
400}
401
402
403// isto kao gornja fja, samo uz frameworkComm_
404std::vector<IndividualP> Communicator::recvIndividualsGlobal(uint iProcess)
405{
406 XMLNode xAll, xIndividual;
407 MPI::Status status;
408 std::string message;
409 std::vector<IndividualP> pack;
410
411 time(COMP);
412
413 frameworkComm_.Probe(iProcess, T_DEFAULT, status);
414
415 double idle = time(IDLE);
416
417 uint length = status.Get_count(MPI::CHAR);
418 message.resize(length);
419 frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
420
421 double recv = time(RECV);
422
423 xAll = XMLNode::parseString(message.c_str(), "Pack");
424 uint nIndividuals = atoi(xAll.getAttribute("size"));
425 uint index, cid;
426 for(uint i = 0; i < nIndividuals; i++) {
427 xIndividual = xAll.getChildNode(i);
428 pack.push_back((IndividualP) new Individual(state_));
429 index = atoi(xIndividual.getAttributeValue(1));
430 cid = atoi(xIndividual.getAttributeValue(2));
431
432 pack[i]->index = index;
433 pack[i]->cid = cid;
434 pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
435 pack[i]->read(xIndividual);
436 }
437 status_ = status;
438
439 double read = time(UNPACK);
440
441 std::stringstream log;
442 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
443 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
444 ECF_LOG(state_, logLevel_, log.str());
445
446 return pack;
447}
448
449
450// prima jedinke u postojeci vektor jedinki (bez obzira na indekse)
451// velicina vektora se _povecava_ na broj primljenih jedinki
452// ako je jedinki manje, vektor se _ne smanjuje_
453// vraca broj primljenih jedinki
454uint Communicator::recvReplaceIndividuals(std::vector<IndividualP>& pool, uint iProcess)
455{
456 XMLNode xAll, xIndividual;
457 MPI::Status status;
458
459 time(COMP);
460
461 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
462
463 double idle = time(IDLE);
464
465 uint length = status.Get_count(MPI::CHAR);
466 char *message = new char[length + 1];
467 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
468 status_ = status;
469
470 double recv = time(RECV);
471
472 xAll = XMLNode::parseString(message, "Pack");
473 uint nIndividuals = atoi(xAll.getAttribute("size"));
474 uint poolSize = (uint) pool.size();
475
476 if(poolSize < nIndividuals) {
477 pool.resize(nIndividuals);
478 for(uint i = poolSize; i < nIndividuals; i++) {
479 pool[i] = (IndividualP) new Individual(state_);
480 pool[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
481 }
482 }
483
484 uint index, cid;
485 for(uint i = 0; i < nIndividuals; i++) {
486 xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
487 index = atoi(xIndividual.getAttributeValue(1));
488 cid = atoi(xIndividual.getAttributeValue(2));
489 pool[i]->index = index;
490 pool[i]->cid = cid;
491 pool[i]->read(xIndividual);
492 }
493
494 double read = time(UNPACK);
495
496 std::stringstream log;
497 log << "received " << nIndividuals << " individuals, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
498 ECF_LOG(state_, logLevel_, log.str());
499
500 delete [] message;
501
502 return nIndividuals;
503}
504
505
506// salje fitnese jedinki iz zadanog vektora
507// fitnesi se pakiraju kao jedinke bez genotipa (atribut "i" je prvi po redu!)
508// svakoj jedinki se dodaje njen indeks iz _deme_ (ne iz vektora pool!)
509// ako je nIndividuals != 0, salje se samo toliko prvih iz vektora
510bool Communicator::sendFitness(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
511{
512 time(COMP);
513
514 XMLNode xAll, xIndividual, xFitness;
515 xAll = XMLNode::createXMLTopNode("Pack");
516
517 if(nIndividuals == 0)
518 nIndividuals = (uint) pool.size();
519 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
520
521 for(uint ind = 0; ind < nIndividuals; ind++) {
522 xIndividual = XMLNode::createXMLTopNode("Individual");
523 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
524 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
525
526 pool[ind]->fitness->write(xFitness);
527 xIndividual.addChild(xFitness);
528 xAll.addChild(xIndividual);
529 }
530 char *message = xAll.createXMLString(0);
531
532 double pack = time(PACK);
533
534 demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
535
536 double send = time(SEND);
537
538 std::stringstream log;
539 log << "sent " << nIndividuals << " fitness objects, " << strlen(message) << " bytes (P: " << pack << " | S: " << send << ")";
540 ECF_LOG(state_, logLevel_, log.str());
541
542 freeXMLString(message);
543
544 return true;
545}
546
547
548// prima fitnese na mjesto postojecih u deme objektu
549// fitnesi zu zapakirani u jedinke bez genotipa (atribut "i" je prvi po redu!)
550// cita se indeks iz poruke i fitnes jedinke se stavlja na odgovarajuce mjesto u deme vektoru
551uint Communicator::recvDemeFitness(std::vector<IndividualP>& deme, uint iProcess)
552{
553 XMLNode xAll, xIndividual, xFitness;
554 MPI::Status status;
555 std::string message;
556
557 time(COMP);
558
559 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
560
561 double idle = time(IDLE);
562
563 uint length = status.Get_count(MPI::CHAR);
564 message.resize(length);
565 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
566
567 double recv = time(RECV);
568
569 xAll = XMLNode::parseString(message.c_str(), "Pack");
570 uint nIndividuals = atoi(xAll.getAttribute("size"));
571 uint index, cid;
572 for(uint i = 0; i < nIndividuals; i++) {
573 xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
574 index = atoi(xIndividual.getAttributeValue(0));
575 cid = atoi(xIndividual.getAttributeValue(1));
576 xFitness = xIndividual.getChildNode(0);
577 deme[index]->fitness->read(xFitness);
578 deme[index]->fitness->cid = cid;
579
580 }
581 status_ = status;
582
583 double read = time(UNPACK);
584
585 std::stringstream log;
586 log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
587 ECF_LOG(state_, logLevel_, log.str());
588
589 return nIndividuals;
590}
591
592
593// prima fitnese u vektor fitnesa
594// fitnesi dolaze zapakirani kao jedinke bez genotipa (atribut "i" je prvi po redu!)
595// fitnesi se stavljaju u vektor (prvi argument) na mjesto koje pise kao indeks jedinke u poruci
596// funkcija vraca vektor indeksa jedinki primljenih fitnesa
597std::vector<uint> Communicator::recvFitnessVector(std::vector<IndividualP>& deme, uint iProcess)
598{
599 XMLNode xAll, xIndividual, xFitness;
600 MPI::Status status;
601 std::string message;
602 std::vector<uint> indices;
603
604 time(COMP);
605
606 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
607
608 double idle = time(IDLE);
609
610 uint length = status.Get_count(MPI::CHAR);
611 message.resize(length);
612 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
613
614 double recv = time(RECV);
615
616 xAll = XMLNode::parseString(message.c_str(), "Pack");
617 uint nIndividuals = atoi(xAll.getAttribute("size"));
618 uint index, cid;
619 for(uint i = 0; i < nIndividuals; i++) {
620 xIndividual = xAll.getChildNode(i);
621 index = atoi(xIndividual.getAttributeValue(0));
622 cid = atoi(xIndividual.getAttributeValue(1));
623 xFitness = xIndividual.getChildNode(0);
624
625 deme[index]->fitness->read(xFitness);
626 deme[index]->fitness->cid = cid;
627 indices.push_back(index);
628 }
629 status_ = status;
630
631 double read = time(UNPACK);
632
633 std::stringstream log;
634 log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
635 ECF_LOG(state_, logLevel_, log.str());
636
637 return indices;
638}
639
640
641// salje vektor vrijednosti u globalnom komunikatoru
642bool Communicator::sendValuesGlobal(std::vector<double> values, uint iProcess)
643{
644 time(COMP);
645
646 frameworkComm_.Send(&values[0], (int) values.size(), MPI::DOUBLE, iProcess, T_VALUES);
647
648 time(SEND);
649
650 std::stringstream log;
651 log << "sent " << values.size() << " doubles";
652 ECF_LOG(state_, logLevel_, log.str());
653
654 return true;
655}
656
657
658// prima vektor vrijednosti u globalnom komunikatoru
659std::vector<double> Communicator::recvValuesGlobal(uint iProcess)
660{
661 std::vector<double> values;
662 MPI::Status status;
663
664 time(COMP);
665
666 frameworkComm_.Probe(iProcess, T_VALUES, status);
667
668 double idle = time(IDLE);
669
670 uint size = status.Get_count(MPI::DOUBLE);
671 values.resize(size);
672 frameworkComm_.Recv(&values[0], size, MPI::DOUBLE, iProcess, T_VALUES, status);
673
674 double recv = time(RECV);
675
676 std::stringstream log;
677 log << "received " << values.size() << " doubles";
678 ECF_LOG(state_, logLevel_, log.str());
679
680 return values;
681}
682
683
684// prima pristigle log poruke od bilo kojih procesa u globalnom komunikatoru
685std::string Communicator::recvLogsGlobal()
686{
687 std::string logs = "", message;
688 uint logCount = 0;
689 MPI::Status status;
690
691 time(COMP);
692
693 while(frameworkComm_.Iprobe(MPI::ANY_SOURCE, T_LOGS, status)) {
694 uint iProcess = status.Get_source();
695 uint length = status.Get_count(MPI::CHAR);
696 message.resize(length);
697 frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_LOGS, status);
698 logs += message;
699 logCount++;
700 }
701
702 time(RECV);
703
704 std::stringstream log;
705 log << "received " << logCount << " logs";
706 ECF_LOG(state_, logLevel_, log.str());
707
708 return logs;
709}
710
711
712// salje log poruke u globalnom komunikatoru
713bool Communicator::sendLogsGlobal(std::string logs, uint iProcess, bool blocking)
714{
715 time(COMP);
716
717 MPI::Request request;
718
719 if(blocking)
720 frameworkComm_.Send(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
721 else
722 request = frameworkComm_.Isend(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
723
724 time(SEND);
725
726 std::stringstream log;
727 log << "sent " << logs.size() << " log chars";
728 ECF_LOG(state_, logLevel_, log.str());
729
730 return true;
731}
732
733
734// slanje bilo kakvih podataka u globalnom komunikatoru
735bool Communicator::sendDataGlobal(voidP data, uint size, uint iProcess)
736{
737 time(COMP);
738
739 frameworkComm_.Send(data.get(), size, MPI::BYTE, iProcess, T_DATA);
740
741 time(SEND);
742
743 std::stringstream log;
744 log << "sent " << size << " bytes";
745 ECF_LOG(state_, logLevel_, log.str());
746
747 return true;
748}
749
750
751// primanje bilo kakvih podataka u globalnom komunikatoru
752voidP Communicator::recvDataGlobal(uint iProcess)
753{
754 MPI::Status status;
755 voidP data;
756
757 time(COMP);
758
759 frameworkComm_.Probe(iProcess, T_DATA, status);
760
761 time(IDLE);
762
763 uint size = status.Get_count(MPI::BYTE);
764 data = (voidP) new char[size / sizeof(char) + 1];
765 frameworkComm_.Recv(data.get(), size, MPI::BYTE, iProcess, T_DATA, status);
766
767 time(RECV);
768
769 std::stringstream log;
770 log << "received " << size << " bytes";
771 ECF_LOG(state_, logLevel_, log.str());
772
773 return data;
774}
775
776#endif
777
778} // namespace
double time(enum timing T)
MPI::Intercomm frameworkComm_
Definition: Communicator.h:69
MPI::Intercomm demeComm_
Definition: Communicator.h:76
Individual class - inherits a vector of Genotype objects.
Definition: Individual.h:12
FitnessP fitness
sptr to individual's fitness object
Definition: Individual.h:38