10 std::string *
type =
new std::string(
"eval");
11 state->getRegistry()->registerEntry(
"parallel.type", (voidP)
type, ECF::STRING,
12 "implicit parallelization method: eval - evaluation, mut - mutation + eval");
14 uint* jobSize =
new uint(10);
15 state->getRegistry()->registerEntry(
"parallel.jobsize", (voidP) jobSize, ECF::UINT,
16 "implicit parallelization jobsize (individuals per job)");
18 uint *sync =
new uint(0);
19 state->getRegistry()->registerEntry(
"parallel.sync", (voidP) sync, ECF::UINT,
20 "implicit parallelization synchronicity: 0 - async, 1 - sync ");
27 comm_ = state->getCommunicator();
28 selBestOp =
static_cast<SelBestOpP
> (
new SelBestOp);
29 totalEvaluations_ = wastedEvaluations_ = 0;
33 uint demeSize = state->getPopulation()->getLocalDeme()->getSize();
34 storedInds_.resize(demeSize);
38 if(!state_->getRegistry()->isModified(
"parallel.type"))
44 ECF_LOG_ERROR(state_,
"Error: implicit parallelization possible only with sequential algorithm!");
49 if(state->getPopulation()->getNoDemes() > state->getCommunicator()->getCommGlobalSize()) {
50 ECF_LOG_ERROR(state,
"Error: number of processes must be equal or greater than the number of demes in parallel EA!");
54 else if(state->getPopulation()->getNoDemes() != state->getCommunicator()->getCommGlobalSize()) {
55 ECF_LOG_ERROR(state,
"Error: number of processes must equal the number of demes in EA with sequential algorithm!");
62 voidP sptr = state->getRegistry()->getEntry(
"parallel.type");
63 std::string parallelType = *((std::string*)sptr.get());
65 if(parallelType ==
"eval") {
68 else if(parallelType ==
"mut") {
72 ECF_LOG_ERROR(state,
"Error: unkown implicit parallelization mode!");
76 sptr = state->getRegistry()->getEntry(
"parallel.sync");
79 voidP jobSizeP = state->getRegistry()->getEntry(
"parallel.jobsize");
80 jobSize_ = *((uint*) jobSizeP.get());
82 if(jobSize_ > state->getPopulation()->at(0)->getSize()) {
83 ECF_LOG_ERROR(state,
"Error: parallel.jobsize must be less or equal to the number of individuals!");
94 for(uint i = 0; i < state->getPopulation()->at(0)->size(); i++) {
95 IndividualP ind = (IndividualP)
new Individual(state);
96 ind->
fitness = (FitnessP) state->getFitnessObject()->copy();
97 demeCopy_.push_back(ind);
101 stored_.resize(state->getPopulation()->at(0)->size());
102 currentBest_ = selBestOp->select(*(state->getPopulation()->at(0)));
121 wastedEvaluations_++;
125 DemeP workingDeme = state_->getPopulation()->getLocalDeme();
128 requests_.push_back(ind);
140 if(requests_.size() < jobSize_)
143 if(comm_->messageWaiting()) {
144 ECF_LOG(state_, 4,
"Receiving evaluated individuals");
145 std::vector<uint> indices = comm_->recvFitnessVector(demeCopy_, Comm::ANY_PROCESS);
146 for(uint i = 0; i < indices.size(); i++) {
149 stored_[indices[i]]->fitness = demeCopy_[indices[i]]->fitness;
150 workingDeme->push_back(
stored_[indices[i]]);
153 workingDeme->at(indices[i])->fitness = demeCopy_[indices[i]]->fitness;
158 uint iWorker = comm_->getLastSource();
159 comm_->sendIndividuals(requests_, iWorker);
163 ECF_LOG(state_, 4,
"Evaluating locally...");
164 IndividualP ind = requests_.back();
165 ind->fitness =
evalOp_->evaluate(ind);
168 workingDeme->push_back(ind);
170 requests_.pop_back();
178 DemeP workingDeme = state_->getPopulation()->getLocalDeme();
181 requestsMut_.push_back(ind);
188 if(requestsMut_.size() < jobSize_)
191 if(comm_->messageWaiting()) {
192 ECF_LOG(state_, 4,
"Receiving mutated individuals");
193 uint received = comm_->recvReplaceIndividuals(
receivedMut_, Comm::ANY_PROCESS);
194 currentBest_ = selBestOp->select(*workingDeme);
196 for(uint i = 0; i < received; i++) {
201 workingDeme->push_back(newInd);
207 && (workingDeme->at(
receivedMut_[i]->index) != currentBest_
208 ||
receivedMut_[i]->fitness->isBetterThan(currentBest_->fitness))) {
216 uint iWorker = comm_->getLastSource();
217 comm_->sendIndividuals(requestsMut_, iWorker);
219 requestsMut_.resize(0);
223 ECF_LOG(state_, 4,
"Mutating locally...");
224 std::vector<IndividualP> pool;
225 pool.push_back(requestsMut_.back());
227 requestsMut_.pop_back();
230 workingDeme->push_back(pool[0]);
232 uint mutated =
mutation_->mutation(pool);
241 ECF_LOG(state, 4,
"Worker process initiating.");
245 comm_->sendFitness(
myJob_, MASTER);
249 while(myJobSize != 0) {
251 myJobSize = comm_->recvReplaceIndividuals(
myJob_, MASTER);
252 for(uint i = 0; i < myJobSize; i++)
255 comm_->sendFitness(
myJob_, MASTER, myJobSize);
261 std::vector<IndividualP> mutationPool;
263 comm_->sendIndividuals(
myJob_, MASTER);
267 while(myJobSize != 0) {
269 myJobSize = comm_->recvReplaceIndividuals(
myJob_, MASTER);
271 mutationPool.resize(myJobSize);
272 for(uint i = 0; i < myJobSize; i++) {
273 mutationPool[i] =
myJob_[i];
278 for(uint i = 0; i < myJobSize; i++)
279 if(!mutationPool[i]->fitness->isValid())
280 mutationPool[i]->fitness =
evalOp_->evaluate(mutationPool[i]);
282 comm_->sendIndividuals(
myJob_, MASTER, myJobSize);
294 if(state->getCommunicator()->getCommRank() == 0 && state->getTerminateCond()) {
295 std::vector<IndividualP> empty;
296 for(uint iWorker = 1; iWorker < state->getCommunicator()->getCommSize(); iWorker++) {
297 state->getCommunicator()->sendIndividuals(empty, iWorker);
301 else if(state->getCommunicator()->getCommRank() != 0)
302 state->setTerminateCond();
308 if(state->getCommunicator()->getCommRank() == 0) {
309 for(uint i = 1; i < comm_->getCommSize(); i++)
310 comm_->sendControlMessage(i, state->getTerminateCond());
313 if(comm_->recvControlMessage(0))
314 state->setTerminateCond();
322 state_->getPopulation()->at(0)->replace(oldInd->index, newInd);
326 DemeP workingDeme = state_->getPopulation()->at(0);
328 for(uint i = 0; i < workingDeme->size(); i++) {
329 if(workingDeme->at(i)->index == oldInd->index) {
330 workingDeme->at(i) = newInd;
331 newInd->index = oldInd->index;
340 CommunicatorP comm = state->getCommunicator();
341 DemeP myDeme = state->getPopulation()->at(0);
342 uint demeSize = (uint) myDeme->size();
346 if(comm->getCommSize() == 1) {
347 for(uint iInd = 0; iInd < demeSize; iInd++) {
348 IndividualP ind = myDeme->at(iInd);
349 state_->getContext()->evaluatedIndividual = ind;
350 ECF_LOG(state_, 5,
"Evaluating individual: " + ind->toString());
351 ind->fitness =
evalOp_->evaluate(ind);
352 ECF_LOG(state_, 5,
"New fitness: " + ind->fitness->toString());
353 state_->increaseEvaluations();
358 if(comm->getCommRank() == 0) {
359 uint jobsPerProcess = 4;
360 uint jobSize = 1 + demeSize / comm->getCommSize() / jobsPerProcess;
361 uint remaining = demeSize;
362 std::vector<IndividualP> job;
364 for(uint iInd = 0; iInd < demeSize; ) {
365 for(uint i = 0; i < jobSize && iInd < demeSize; i++, iInd++) {
366 myDeme->at(iInd)->fitness = (FitnessP) state->getFitnessObject()->copy();
367 job.push_back(myDeme->at(iInd));
369 remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
370 uint iWorker = comm->getLastSource();
371 comm->sendIndividuals(job, iWorker);
375 int remainingWorkers = comm->getCommSize() - 1;
376 while(remaining > 0 || remainingWorkers > 0) {
377 remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
379 uint iWorker = comm->getLastSource();
380 comm->sendIndividuals(job, iWorker);
384 state->increaseEvaluations(demeSize);
388 std::vector<IndividualP>
myJob_;
390 comm->sendFitness(
myJob_, MASTER);
393 myJobSize = comm->recvReplaceIndividuals(
myJob_, MASTER);
396 while(myJobSize > 0) {
397 for(uint i = 0; i < myJobSize; i++) {
398 state_->getContext()->evaluatedIndividual =
myJob_[i];
399 ECF_LOG(state_, 5,
"Evaluating individual: " +
myJob_[i]->toString());
401 ECF_LOG(state_, 5,
"New fitness: " +
myJob_[i]->fitness->toString());
404 comm->sendFitness(
myJob_, MASTER, myJobSize);
405 myJobSize = comm->recvReplaceIndividuals(
myJob_, MASTER);
417 storedInds_[ind->index] = (IndividualP) ind->copy();
427 storedInds_[ind->index]->fitness = (FitnessP) ind->fitness->copy();
434 for(uint ind = 0; ind < pool.size(); ind++) {
435 uint index = pool[ind]->index;
448 pool[ind]->cid = cid;
457 DemeP myDeme = state_->getPopulation()->getLocalDeme();
459 for(uint ind = 0; ind < received.size(); ind++) {
460 uint index = received[ind];
461 uint cid = myDeme->at(index)->fitness->cid;
465 sentInds_[index][cid]->fitness = myDeme->at(index)->fitness;
469 storedInds_[index]->fitness = (FitnessP) myDeme->at(index)->fitness->copy();
473 myDeme->at(index)->fitness = (FitnessP) storedInds_[index]->fitness->copy();
483 DemeP myDeme = state_->getPopulation()->getLocalDeme();
484 uint demeSize = myDeme->getSize();
486 for(uint ind = 0; ind < demeSize; ind++) {
488 (*myDeme)[ind] = (IndividualP) storedInds_[ind]->
copy();
499 for(uint iDeme = 0; iDeme < state->getPopulation()->size(); iDeme++)
500 for(uint iInd = 0; iInd < state->getPopulation()->at(iDeme)->size(); iInd++)
501 evaluate(state->getPopulation()->at(iDeme)->at(iInd));
std::vector< IndividualP > stored_
individual vectors for implicit evaluation
bool implicitParallelOperate(StateP state)
Parallel ECF: Worker processes in implicit parallel algorithm.
IndividualP copy(IndividualP source)
Helper function: make a copy of an individual.
bool bSynchronized_
is implicit parallelization synchronous
void initializeImplicit(StateP state)
Parallel ECF: Initialize implicit parallel mode.
std::vector< uint > requestMutIds_
individual indexes for implicit mutation
bool bImplicitParallel_
implicit parallel flag
virtual bool advanceGeneration(StateP, DemeP)=0
Perform a single generation on a single deme.
EvaluateOpP evalOp_
sptr to evaluation operator (set by the system)
void storeIndividual(IndividualP)
stores the individual (if it is consistent), resets consistency flag
std::vector< IndividualP > myJob_
worker's individual vector
virtual bool initializePopulation(StateP)
Evaluate initial population (called by State::run before evolution starts).
bool isMember(IndividualP single, std::vector< IndividualP > &pool)
Helper function: check if individual is in the pool.
void setConsistency(IndividualP)
denotes current individual as consistent
void replaceWith(IndividualP oldInd, IndividualP newInd)
Helper function: replace an individual in current deme.
bool removeFrom(IndividualP victim, std::vector< IndividualP > &pool)
Helper function: remove victim from pool of individual pointers.
MutationP mutation_
sptr to container of mutation operators (set by the system)
uint implicitMutate(IndividualP ind)
Parallel ECF: implicitly mutate an individual (store for later mutation in implicit parallel version)...
void registerParallelParameters(StateP state)
used only in parallel ECF
std::vector< bool > isConsistent_
is individual (genotype-fitness pair) consistent
void restorePopulation()
restores inconsistent individuals to last consistent state
bool bImplicitMutation_
implicit mutation flag
bool initializeParallel(StateP state)
used only in parallel ECF
virtual void bcastTermination(StateP state)
Parallel ECF: broadcast termination to worker processes.
void evaluate(IndividualP ind)
Helper function: evaluate an individual.
void implicitEvaluate(IndividualP ind)
Parallel ECF: implicitly evaluate an individual (store for later evaluation in implicit parallel version)...
bool bImplicitEvaluation_
implicit evaluation flag
std::vector< std::vector< IndividualP > > sentInds_
individuals sent for evaluation
void restoreIndividuals(std::vector< uint >)
restores individuals whose fitness is received
void storeGenotypes(std::vector< IndividualP > &)
adds genotypes of individuals to 'sent' repository
std::vector< IndividualP > receivedMut_
individual vectors for implicit mutation
std::vector< uint > requestIds_
individual indexes for implicit evaluation
virtual bool isParallel()
Is algorithm parallel (false by default for all algorithms not inheriting ParallelAlgorithm class).
Individual class - inherits a vector of Genotype objects.
FitnessP fitness
sptr to individual's fitness object
Best individual selection operator.
type
Data types used for configuration file parameters.