ECF 1.5
Algorithm.cpp
1#include "ECF_base.h"
2
3#ifdef _MPI
4
5const int MASTER = 0;
6
7
9{
10 std::string *type = new std::string("eval");
11 state->getRegistry()->registerEntry("parallel.type", (voidP) type, ECF::STRING,
12 "implicit parallelization method: eval - evaluation, mut - mutation + eval");
13
14 uint* jobSize = new uint(10);
15 state->getRegistry()->registerEntry("parallel.jobsize", (voidP) jobSize, ECF::UINT,
16 "implicit parallelization jobsize (individuals per job)");
17
18 uint *sync = new uint(0);
19 state->getRegistry()->registerEntry("parallel.sync", (voidP) sync, ECF::UINT,
20 "implicit parallelization synchronicity: 0 - async, 1 - sync ");
21}
22
23
24bool Algorithm::initializeParallel(StateP state)
25{
26 state_ = state;
27 comm_ = state->getCommunicator();
28 selBestOp = static_cast<SelBestOpP> (new SelBestOp);
29 totalEvaluations_ = wastedEvaluations_ = 0;
31
32 // initialize protected members for population consistency
33 uint demeSize = state->getPopulation()->getLocalDeme()->getSize();
34 storedInds_.resize(demeSize);
35 sentInds_.resize(demeSize);
36 isConsistent_.resize(demeSize);
37
38 if(!state_->getRegistry()->isModified("parallel.type"))
39 bImplicitParallel_ = false;
40 else
41 bImplicitParallel_ = true;
42
43 if(this->isParallel() && bImplicitParallel_) {
44 ECF_LOG_ERROR(state_, "Error: implicit parallelization possible only with sequential algorithm!");
45 throw "";
46 }
47
48 if((this->isParallel() || bImplicitParallel_)) {
49 if(state->getPopulation()->getNoDemes() > state->getCommunicator()->getCommGlobalSize()) {
50 ECF_LOG_ERROR(state, "Error: number of processes must be equal or greater than the number of demes in parallel EA!");
51 throw "";
52 }
53 }
54 else if(state->getPopulation()->getNoDemes() != state->getCommunicator()->getCommGlobalSize()) {
55 ECF_LOG_ERROR(state, "Error: number of processes must equal the number of demes in EA with sequential algorithm!");
56 throw "";
57 }
58
60 return true;
61
62 voidP sptr = state->getRegistry()->getEntry("parallel.type");
63 std::string parallelType = *((std::string*)sptr.get());
64
65 if(parallelType == "eval") {
67 }
68 else if(parallelType == "mut") {
69 bImplicitMutation_ = true;
70 }
71 else {
72 ECF_LOG_ERROR(state, "Error: unkown implicit parallelization mode!");
73 throw "";
74 }
75
76 sptr = state->getRegistry()->getEntry("parallel.sync");
77 bSynchronized_ = ((*((uint*) sptr.get())) != 0);
78
79 voidP jobSizeP = state->getRegistry()->getEntry("parallel.jobsize");
80 jobSize_ = *((uint*) jobSizeP.get());
81
82 if(jobSize_ > state->getPopulation()->at(0)->getSize()) {
83 ECF_LOG_ERROR(state, "Error: parallel.jobsize must be less or equal to the number of individuals!");
84 throw "";
85 }
86
87 return true;
88}
89
90
91// create deme-sized vector of initialized individuals for receiving
93{
94 for(uint i = 0; i < state->getPopulation()->at(0)->size(); i++) {
95 IndividualP ind = (IndividualP) new Individual(state);
96 ind->fitness = (FitnessP) state->getFitnessObject()->copy();
97 demeCopy_.push_back(ind);
98 requestIds_.push_back(0);
99 requestMutIds_.push_back(0);
100 }
101 stored_.resize(state->getPopulation()->at(0)->size());
102 currentBest_ = selBestOp->select(*(state->getPopulation()->at(0)));
103}
104
105
106bool Algorithm::advanceGeneration(StateP state)
107{
108 // implicitly parallel: worker processes go to work
109 if(state->getCommunicator()->getCommRank() != 0 && bImplicitParallel_)
110 return implicitParallelOperate(state);
111 // otherwise, run the algorithm
112 else
113 return advanceGeneration(state, state->getPopulation()->at(0));
114}
115
116
117void Algorithm::implicitEvaluate(IndividualP ind)
118{
119 totalEvaluations_++;
120 if(requestIds_[ind->index] > 0) { // if already on evaluation
121 wastedEvaluations_++;
122 return;
123 }
124
125 DemeP workingDeme = state_->getPopulation()->getLocalDeme();
126
127 if(!isMember(ind, requests_)) {
128 requests_.push_back(ind);
129 requestIds_[ind->index] = 1; // fitness wanted
130
131 // synchronous implicit evaluation: individual to be evaluated is removed from the deme
132 // upon return, the individual is put at the end of deme
133 // individual's index is _NOT_ its place in deme vector anymore!
134 if(bSynchronized_) {
135 stored_[ind->index] = ind;
136 removeFrom(ind, *workingDeme);
137 }
138 }
139
140 if(requests_.size() < jobSize_)
141 return;
142
143 if(comm_->messageWaiting()) {
144 ECF_LOG(state_, 4, "Receiving evaluated individuals");
145 std::vector<uint> indices = comm_->recvFitnessVector(demeCopy_, Comm::ANY_PROCESS);
146 for(uint i = 0; i < indices.size(); i++) {
147 if(requestIds_[indices[i]] > 0) { // only if needs evaluation
148 if(bSynchronized_) { // return individual to deme
149 stored_[indices[i]]->fitness = demeCopy_[indices[i]]->fitness;
150 workingDeme->push_back(stored_[indices[i]]);
151 }
152 else
153 workingDeme->at(indices[i])->fitness = demeCopy_[indices[i]]->fitness;
154 requestIds_[indices[i]] = 0;
155 }
156 }
157
158 uint iWorker = comm_->getLastSource();
159 comm_->sendIndividuals(requests_, iWorker);
160 requests_.resize(0);
161 }
162 else {
163 ECF_LOG(state_, 4, "Evaluating locally...");
164 IndividualP ind = requests_.back();
165 ind->fitness = evalOp_->evaluate(ind);
166 requestIds_[ind->index] = 0;
167 if(bSynchronized_) { // return individual to deme
168 workingDeme->push_back(ind);
169 }
170 requests_.pop_back();
171 }
172
173}
174
175
176uint Algorithm::implicitMutate(IndividualP ind)
177{
178 DemeP workingDeme = state_->getPopulation()->getLocalDeme();
179
180 if(!isMember(ind, requestsMut_)) {
181 requestsMut_.push_back(ind);
182 requestMutIds_[ind->index] = 1; // mutation needed
183
184 if(bSynchronized_) { // remove from deme
185 removeFrom(ind, *workingDeme);
186 }
187 }
188 if(requestsMut_.size() < jobSize_)
189 return 0;
190
191 if(comm_->messageWaiting()) {
192 ECF_LOG(state_, 4, "Receiving mutated individuals");
193 uint received = comm_->recvReplaceIndividuals(receivedMut_, Comm::ANY_PROCESS);
194 currentBest_ = selBestOp->select(*workingDeme);
195
196 for(uint i = 0; i < received; i++) {
197 // if synchronized, return individual to deme
198 if(bSynchronized_) {
199 IndividualP newInd = (IndividualP) receivedMut_[i]->copy();
200 newInd->index = receivedMut_[i]->index;
201 workingDeme->push_back(newInd);
202 }
203 // replace individual only if mutation is needed
204 // also: only if not replacing the current best
205 else {
206 if(requestMutIds_[receivedMut_[i]->index] > 0
207 && (workingDeme->at(receivedMut_[i]->index) != currentBest_
208 || receivedMut_[i]->fitness->isBetterThan(currentBest_->fitness))) {
209 IndividualP newInd = (IndividualP) receivedMut_[i]->copy();
210 workingDeme->replace(receivedMut_[i]->index, newInd);
211 }
212 }
213 requestMutIds_[receivedMut_[i]->index] = 0;
214 }
215
216 uint iWorker = comm_->getLastSource();
217 comm_->sendIndividuals(requestsMut_, iWorker);
218
219 requestsMut_.resize(0);
220 return received;
221 }
222 else {
223 ECF_LOG(state_, 4, "Mutating locally...");
224 std::vector<IndividualP> pool;
225 pool.push_back(requestsMut_.back());
226 requestMutIds_[requestsMut_.back()->index] = 0;
227 requestsMut_.pop_back();
228
229 if(bSynchronized_) { // return individual to deme
230 workingDeme->push_back(pool[0]);
231 }
232 uint mutated = mutation_->mutation(pool);
233 return mutated;
234 }
235}
236
237
238// implicit parallelization: worker processes
240{
241 ECF_LOG(state, 4, "Worker process initiating.");
242
244 myJob_.resize(0);
245 comm_->sendFitness(myJob_, MASTER);
246 uint myJobSize = 1;
247
248 // while individuals to evaluate
249 while(myJobSize != 0) {
250
251 myJobSize = comm_->recvReplaceIndividuals(myJob_, MASTER);
252 for(uint i = 0; i < myJobSize; i++)
253 myJob_[i]->fitness = evalOp_->evaluate(myJob_[i]);
254
255 comm_->sendFitness(myJob_, MASTER, myJobSize);
256 }
257 }
258
259 // implicit mutation + evaluation
260 else if(bImplicitMutation_) {
261 std::vector<IndividualP> mutationPool;
262 myJob_.resize(0);
263 comm_->sendIndividuals(myJob_, MASTER);
264 uint myJobSize = 1;
265
266 // while individuals to mutate
267 while(myJobSize != 0) {
268
269 myJobSize = comm_->recvReplaceIndividuals(myJob_, MASTER);
270
271 mutationPool.resize(myJobSize);
272 for(uint i = 0; i < myJobSize; i++) {
273 mutationPool[i] = myJob_[i];
274 }
275
276 mutation_->mutation(mutationPool);
277
278 for(uint i = 0; i < myJobSize; i++)
279 if(!mutationPool[i]->fitness->isValid())
280 mutationPool[i]->fitness = evalOp_->evaluate(mutationPool[i]);
281
282 comm_->sendIndividuals(myJob_, MASTER, myJobSize);
283 }
284 }
285
286 return true;
287}
288
289
291{
293 // master: if termination not true, do nothing; otherwise, send workers empty jobs
294 if(state->getCommunicator()->getCommRank() == 0 && state->getTerminateCond()) {
295 std::vector<IndividualP> empty;
296 for(uint iWorker = 1; iWorker < state->getCommunicator()->getCommSize(); iWorker++) {
297 state->getCommunicator()->sendIndividuals(empty, iWorker);
298 }
299 }
300 // worker: empty job received, set terminate for local process
301 else if(state->getCommunicator()->getCommRank() != 0)
302 state->setTerminateCond();
303
304 return;
305 }
306
307 // explicitly parallel algorithm (synchronous)
308 if(state->getCommunicator()->getCommRank() == 0) {
309 for(uint i = 1; i < comm_->getCommSize(); i++)
310 comm_->sendControlMessage(i, state->getTerminateCond());
311 }
312 else {
313 if(comm_->recvControlMessage(0))
314 state->setTerminateCond();
315 }
316}
317
318
319void Algorithm::replaceWith(IndividualP oldInd, IndividualP newInd)
320{
321 if(!bSynchronized_) {
322 state_->getPopulation()->at(0)->replace(oldInd->index, newInd);
323 return;
324 }
325
326 DemeP workingDeme = state_->getPopulation()->at(0);
327
328 for(uint i = 0; i < workingDeme->size(); i++) {
329 if(workingDeme->at(i)->index == oldInd->index) {
330 workingDeme->at(i) = newInd;
331 newInd->index = oldInd->index;
332 return;
333 }
334 }
335}
336
337
339{
340 CommunicatorP comm = state->getCommunicator();
341 DemeP myDeme = state->getPopulation()->at(0);
342 uint demeSize = (uint) myDeme->size();
343
344 isConsistent_.assign(isConsistent_.size(), true);
345
346 if(comm->getCommSize() == 1) {
347 for(uint iInd = 0; iInd < demeSize; iInd++) {
348 IndividualP ind = myDeme->at(iInd);
349 state_->getContext()->evaluatedIndividual = ind;
350 ECF_LOG(state_, 5, "Evaluating individual: " + ind->toString());
351 ind->fitness = evalOp_->evaluate(ind);
352 ECF_LOG(state_, 5, "New fitness: " + ind->fitness->toString());
353 state_->increaseEvaluations();
354 }
355 return true;
356 }
357
358 if(comm->getCommRank() == 0) {
359 uint jobsPerProcess = 4;
360 uint jobSize = 1 + demeSize / comm->getCommSize() / jobsPerProcess;
361 uint remaining = demeSize;
362 std::vector<IndividualP> job;
363
364 for(uint iInd = 0; iInd < demeSize; ) {
365 for(uint i = 0; i < jobSize && iInd < demeSize; i++, iInd++) {
366 myDeme->at(iInd)->fitness = (FitnessP) state->getFitnessObject()->copy();
367 job.push_back(myDeme->at(iInd));
368 }
369 remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
370 uint iWorker = comm->getLastSource();
371 comm->sendIndividuals(job, iWorker);
372 job.resize(0);
373 }
374
375 int remainingWorkers = comm->getCommSize() - 1;
376 while(remaining > 0 || remainingWorkers > 0) {
377 remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
378
379 uint iWorker = comm->getLastSource();
380 comm->sendIndividuals(job, iWorker);
381 remainingWorkers--;
382 }
383
384 state->increaseEvaluations(demeSize);
385 }
386
387 else {
388 std::vector<IndividualP> myJob_;
389 myJob_.resize(0);
390 comm->sendFitness(myJob_, MASTER);
391
392 uint myJobSize;
393 myJobSize = comm->recvReplaceIndividuals(myJob_, MASTER);
394
395 // while individuals to evaluate
396 while(myJobSize > 0) {
397 for(uint i = 0; i < myJobSize; i++) {
398 state_->getContext()->evaluatedIndividual = myJob_[i];
399 ECF_LOG(state_, 5, "Evaluating individual: " + myJob_[i]->toString());
400 myJob_[i]->fitness = evalOp_->evaluate(myJob_[i]);
401 ECF_LOG(state_, 5, "New fitness: " + myJob_[i]->fitness->toString());
402 }
403
404 comm->sendFitness(myJob_, MASTER, myJobSize);
405 myJobSize = comm->recvReplaceIndividuals(myJob_, MASTER);
406 }
407 }
408
409 return true;
410}
411
412
413// called before genotype change
414void Algorithm::storeIndividual(IndividualP ind)
415{
416 if(isConsistent_[ind->index]) {
417 storedInds_[ind->index] = (IndividualP) ind->copy();
418 }
419 isConsistent_[ind->index] = false;
420}
421
422
423// called after individual evaluation
424void Algorithm::setConsistency(IndividualP ind)
425{
426 isConsistent_[ind->index] = true;
427 storedInds_[ind->index]->fitness = (FitnessP) ind->fitness->copy();
428}
429
430
431// called when sending individuals for evaluation
432void Algorithm::storeGenotypes(std::vector<IndividualP>& pool)
433{
434 for(uint ind = 0; ind < pool.size(); ind++) {
435 uint index = pool[ind]->index;
436
437 // find first available slot for individual
438 uint cid = 0;
439 while(cid < sentInds_[index].size()) {
440 if(!sentInds_[index][cid])
441 break;
442 cid++;
443 }
444 if(cid == sentInds_[index].size())
445 sentInds_[index].resize(cid + 1);
446
447 // assign cid, copy individual
448 pool[ind]->cid = cid;
449 sentInds_[index][cid] = (IndividualP) pool[ind]->copy();
450 }
451}
452
453
454// called when fitness is restored
455void Algorithm::restoreIndividuals(std::vector<uint> received)
456{
457 DemeP myDeme = state_->getPopulation()->getLocalDeme();
458
459 for(uint ind = 0; ind < received.size(); ind++) {
460 uint index = received[ind];
461 uint cid = myDeme->at(index)->fitness->cid;
462
463 // restore genotype
464 if(isConsistent_[index] == false) {
465 sentInds_[index][cid]->fitness = myDeme->at(index)->fitness;
466 replaceWith(myDeme->at(index), sentInds_[index][cid]);
467
468 isConsistent_[index] = true;
469 storedInds_[index]->fitness = (FitnessP) myDeme->at(index)->fitness->copy();
470 }
471 // or reject new fitness
472 else {
473 myDeme->at(index)->fitness = (FitnessP) storedInds_[index]->fitness->copy();
474 }
475 sentInds_[index][cid].reset();
476 }
477}
478
479
480// called at evolution end for fitness consistency
482{
483 DemeP myDeme = state_->getPopulation()->getLocalDeme();
484 uint demeSize = myDeme->getSize();
485
486 for(uint ind = 0; ind < demeSize; ind++) {
487 if(!isConsistent_[ind]) {
488 (*myDeme)[ind] = (IndividualP) storedInds_[ind]->copy();
489 }
490 }
491}
492
493
494#else // non _MPI
495
496
497bool Algorithm::initializePopulation(StateP state)
498{
499 for(uint iDeme = 0; iDeme < state->getPopulation()->size(); iDeme++)
500 for(uint iInd = 0; iInd < state->getPopulation()->at(iDeme)->size(); iInd++)
501 evaluate(state->getPopulation()->at(iDeme)->at(iInd));
502 return true;
503}
504
505#endif // _MPI
std::vector< IndividualP > stored_
individual vectors for implicit evaluation
Definition: Algorithm.h:52
bool implicitParallelOperate(StateP state)
Parallel ECF: Worker processes in implicit parallel algorithm.
Definition: Algorithm.cpp:239
IndividualP copy(IndividualP source)
Helper function: make a copy of an individual.
Definition: Algorithm.h:291
bool bSynchronized_
is implicit paralelization synchronous
Definition: Algorithm.h:61
void initializeImplicit(StateP state)
Parallel ECF: Initialize implicit parallel mode.
Definition: Algorithm.cpp:92
std::vector< uint > requestMutIds_
individual indexes for implicit mutation
Definition: Algorithm.h:64
bool bImplicitParallel_
implicit parallel flag
Definition: Algorithm.h:24
virtual bool advanceGeneration(StateP, DemeP)=0
Perform a single generation on a single deme.
EvaluateOpP evalOp_
sptr to evaluation operator (set by the system)
Definition: Algorithm.h:82
void storeIndividual(IndividualP)
stores the individual (if it is consistent), resets consistency flag
Definition: Algorithm.cpp:414
std::vector< IndividualP > myJob_
worker's individual vector
Definition: Algorithm.h:55
virtual bool initializePopulation(StateP)
Evaluate initial population (called by State::run before evolution starts).
Definition: Algorithm.cpp:338
bool isMember(IndividualP single, std::vector< IndividualP > &pool)
Helper function: check if individual is in the pool.
Definition: Algorithm.h:311
void setConsistency(IndividualP)
denotes current individual as consistent
Definition: Algorithm.cpp:424
void replaceWith(IndividualP oldInd, IndividualP newInd)
Helper function: replace an individual in current deme.
Definition: Algorithm.h:187
bool removeFrom(IndividualP victim, std::vector< IndividualP > &pool)
Helper function: remove victim from pool of individual pointers.
Definition: Algorithm.h:297
MutationP mutation_
sptr to container of mutation operators (set by the system)
Definition: Algorithm.h:81
uint implicitMutate(IndividualP ind)
Parallel ECF: implicitly mutate an individual (store for later mutation in implicit parallel version)...
Definition: Algorithm.cpp:176
void registerParallelParameters(StateP state)
used only in parallel ECF
Definition: Algorithm.h:199
std::vector< bool > isConsistent_
is individual (genotype-fitness pair) consistent
Definition: Algorithm.h:69
void restorePopulation()
restores inconsistent individuals to last consistent state
Definition: Algorithm.cpp:481
bool bImplicitMutation_
implicit mutation flag
Definition: Algorithm.h:60
bool initializeParallel(StateP state)
used only in parallel ECF
Definition: Algorithm.h:203
virtual void bcastTermination(StateP state)
Parallel ECF: broadcast termination to worker processes.
Definition: Algorithm.cpp:290
void evaluate(IndividualP ind)
Helper function: evaluate an individual.
Definition: Algorithm.h:157
void implicitEvaluate(IndividualP ind)
Parallel ECF: implicitly evaluate an individual (store for later evaluation in implicit parallel vers...
Definition: Algorithm.cpp:117
bool bImplicitEvaluation_
implicit evaluation flag
Definition: Algorithm.h:59
std::vector< std::vector< IndividualP > > sentInds_
individuals sent for evaluation
Definition: Algorithm.h:68
void restoreIndividuals(std::vector< uint >)
restores individuals whose fitness is received
Definition: Algorithm.cpp:455
void storeGenotypes(std::vector< IndividualP > &)
adds genotypes of individuals to 'sent' repository
Definition: Algorithm.cpp:432
std::vector< IndividualP > receivedMut_
individual vectors for implicit mutation
Definition: Algorithm.h:63
std::vector< uint > requestIds_
individual indexes for implicit evaluation
Definition: Algorithm.h:53
virtual bool isParallel()
Is algorithm parallel (false by default for all algorithms not inheriting ParallelAlgorithm class).
Definition: Algorithm.h:96
Individual class - inherits a vector of Genotype objects.
Definition: Individual.h:12
FitnessP fitness
sptr to individual's fitness object
Definition: Individual.h:38
Best individual selection operator.
Definition: SelBestOp.h:10
type
Data types used for configuration file parameters.
Definition: Registry.h:16