Examples¶
For many people, it is easiest to learn a new library by adapting existing examples to their purposes. For that reason, many examples are presented in this section in the hope that they will prove useful to others using inspyred.
Standard Algorithms¶
The following examples illustrate how to use the different, built-in, evolutionary computations. They are (hopefully) simple and self-explanatory. Please note that each one uses an existing benchmark problem. This is just an expedience; it is not necessary. The full list of existing benchmarks can be found in the Library Reference. See the Tutorial for examples that do not make use of a benchmark problem.
Genetic Algorithm¶
In this example, a GA is used to evolve a solution to the binary version of the Schwefel benchmark.
[download
]
from random import Random
from time import time
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Binary(inspyred.benchmarks.Schwefel(2),
dimension_bits=30)
ea = inspyred.ec.GA(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
maximize=problem.maximize,
bounder=problem.bounder,
max_evaluations=30000,
num_elites=1)
if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea
if __name__ == '__main__':
main(display=True)
Evolution Strategy¶
In this example, an ES is used to evolve a solution to the Rosenbrock benchmark.
[download
]
from random import Random
from time import time
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Rosenbrock(2)
ea = inspyred.ec.ES(prng)
ea.terminator = [inspyred.ec.terminators.evaluation_termination,
inspyred.ec.terminators.diversity_termination]
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)
if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea
if __name__ == '__main__':
main(display=True)
Simulated Annealing¶
In this example, an SA is used to evolve a solution to the Sphere benchmark.
[download
]
from random import Random
from time import time
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Sphere(2)
ea = inspyred.ec.SA(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(evaluator=problem.evaluator,
generator=problem.generator,
maximize=problem.maximize,
bounder=problem.bounder,
max_evaluations=30000)
if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea
if __name__ == '__main__':
main(display=True)
Differential Evolution Algorithm¶
In this example, a DEA is used to evolve a solution to the Griewank benchmark.
[download
]
from random import Random
from time import time
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Griewank(2)
ea = inspyred.ec.DEA(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)
if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea
if __name__ == '__main__':
main(display=True)
Estimation of Distribution Algorithm¶
In this example, an EDA is used to evolve a solution to the Rastrigin benchmark.
[download
]
from random import Random
from time import time
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Rastrigin(2)
ea = inspyred.ec.EDA(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(evaluator=problem.evaluator,
generator=problem.generator,
pop_size=1000,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000,
num_selected=500,
num_offspring=1000,
num_elites=1)
if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea
if __name__ == '__main__':
main(display=True)
Pareto Archived Evolution Strategy (PAES)¶
In this example, a PAES is used to evolve a solution to the Kursawe multiobjective benchmark.
[download
]
from random import Random
from time import time
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Kursawe(3)
ea = inspyred.ec.emo.PAES(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=10000,
max_archive_size=100,
num_grid_divisions=4)
if display:
final_arc = ea.archive
print('Best Solutions: \n')
for f in final_arc:
print(f)
import matplotlib.pyplot as plt
x = []
y = []
for f in final_arc:
x.append(f.fitness[0])
y.append(f.fitness[1])
plt.scatter(x, y, color='b')
plt.savefig('{0} Example ({1}).pdf'.format(ea.__class__.__name__,
problem.__class__.__name__),
format='pdf')
plt.show()
return ea
if __name__ == '__main__':
main(display=True)
Nondominated Sorting Genetic Algorithm (NSGA-II)¶
In this example, an NSGA2 is used to evolve a solution to the Kursawe multiobjective benchmark.
[download
]
from random import Random
from time import time
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Kursawe(3)
ea = inspyred.ec.emo.NSGA2(prng)
ea.variator = [inspyred.ec.variators.blend_crossover,
inspyred.ec.variators.gaussian_mutation]
ea.terminator = inspyred.ec.terminators.generation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
maximize=problem.maximize,
bounder=problem.bounder,
max_generations=80)
if display:
final_arc = ea.archive
print('Best Solutions: \n')
for f in final_arc:
print(f)
import matplotlib.pyplot as plt
x = []
y = []
for f in final_arc:
x.append(f.fitness[0])
y.append(f.fitness[1])
plt.scatter(x, y, color='b')
plt.savefig('{0} Example ({1}).pdf'.format(ea.__class__.__name__,
problem.__class__.__name__),
format='pdf')
plt.show()
return ea
if __name__ == '__main__':
main(display=True)
Particle Swarm Optimization¶
In this example, a PSO is used to evolve a solution to the Ackley benchmark.
[download
]
from time import time
from random import Random
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Ackley(2)
ea = inspyred.swarm.PSO(prng)
ea.terminator = inspyred.ec.terminators.evaluation_termination
ea.topology = inspyred.swarm.topologies.ring_topology
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000,
neighborhood_size=5)
if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea
if __name__ == '__main__':
main(display=True)
Ant Colony Optimization¶
In this example, an ACS is used to evolve a solution to the TSP benchmark.
[download
]
from random import Random
from time import time
import math
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
points = [(110.0, 225.0), (161.0, 280.0), (325.0, 554.0), (490.0, 285.0),
(157.0, 443.0), (283.0, 379.0), (397.0, 566.0), (306.0, 360.0),
(343.0, 110.0), (552.0, 199.0)]
weights = [[0 for _ in range(len(points))] for _ in range(len(points))]
for i, p in enumerate(points):
for j, q in enumerate(points):
weights[i][j] = math.sqrt((p[0] - q[0])**2 + (p[1] - q[1])**2)
problem = inspyred.benchmarks.TSP(weights)
ac = inspyred.swarm.ACS(prng, problem.components)
ac.terminator = inspyred.ec.terminators.generation_termination
final_pop = ac.evolve(generator=problem.constructor,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
pop_size=10,
max_generations=50)
if display:
best = max(ac.archive)
print('Best Solution:')
for b in best.candidate:
print(points[b.element[0]])
print(points[best.candidate[-1].element[1]])
print('Distance: {0}'.format(1/best.fitness))
return ac
if __name__ == '__main__':
main(display=True)
Customized Algorithms¶
The true benefit of the inspyred library is that it allows the programmer to customize almost every aspect of the algorithm. This is accomplished primarily through the use of function (or function-like) callbacks that can be specified by the programmer. The following examples show how to customize many different parts of the evolutionary computation.
Custom Evolutionary Computation¶
In this example, an evolutionary computation is created which uses tournament selection,
uniform crossover, Gaussian mutation, and steady-state replacement.
[download
]
from random import Random
from time import time
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Ackley(2)
ea = inspyred.ec.EvolutionaryComputation(prng)
ea.selector = inspyred.ec.selectors.tournament_selection
ea.variator = [inspyred.ec.variators.uniform_crossover,
inspyred.ec.variators.gaussian_mutation]
ea.replacer = inspyred.ec.replacers.steady_state_replacement
ea.terminator = inspyred.ec.terminators.generation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
tournament_size=7,
num_selected=2,
max_generations=300,
mutation_rate=0.2)
if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea
if __name__ == '__main__':
main(display=True)
Custom Archiver¶
The purpose of the archiver is to provide a mechanism for candidate solutions to be
maintained without necessarily remaining in the population. This is important for
most multiobjective evolutionary approaches, but it can also be useful for single-objective
problems, as well. In this example, an archiver is created that maintains the worst
individual found. (There is no imaginable reason why one might actually do this. It
is just for illustration purposes.)
[download
]
from random import Random
from time import time
import inspyred
def my_archiver(random, population, archive, args):
worst_in_pop = min(population)
if len(archive) > 0:
worst_in_arc = min(archive)
if worst_in_pop < worst_in_arc:
return [worst_in_pop]
else:
return archive
else:
return [worst_in_pop]
if __name__ == '__main__':
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Rosenbrock(2)
ea = inspyred.ec.ES(prng)
ea.observer = [inspyred.ec.observers.stats_observer,
inspyred.ec.observers.archive_observer]
ea.archiver = my_archiver
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
print(ea.archive)
Custom Observer¶
Sometimes it is helpful to see certain aspects of the current population as it evolves.
The purpose of the “observer” functions is to provide a function that executes at the
end of each generation so that the process can be monitored accordingly. In this example,
the only information desired at each generation is the current best individual.
[download
]
from random import Random
from time import time
import inspyred
def my_observer(population, num_generations, num_evaluations, args):
best = max(population)
print('{0:6} -- {1} : {2}'.format(num_generations,
best.fitness,
str(best.candidate)))
if __name__ == '__main__':
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Rastrigin(2)
ea = inspyred.ec.ES(prng)
ea.observer = my_observer
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
Custom Replacer¶
The replacers are used to determine which of the parents, offspring, and current population
should survive into the next generation. In this example, survivors are determined to be top 50% of
individuals from the population along with 50% chosen randomly from the offspring. (Once again, this
is simply an example. There may be no good reason to create such a replacement scheme.)
[download
]
from random import Random
from time import time
import inspyred
def my_replacer(random, population, parents, offspring, args):
psize = len(population)
population.sort(reverse=True)
survivors = population[:psize // 2]
num_remaining = psize - len(survivors)
for i in range(num_remaining):
survivors.append(random.choice(offspring))
return survivors
if __name__ == '__main__':
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Ackley(2)
ea = inspyred.ec.ES(prng)
ea.replacer = my_replacer
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
Custom Selector¶
The selectors are used to determine which individuals in the population should become parents.
In this example, parents are chosen such that 50% of the time the best individual in the population
is chosen to be a parent and 50% of the time a random individual is chosen. As before, this is an
example selector that may have little practical value.
[download
]
from random import Random
from time import time
import inspyred
def my_selector(random, population, args):
n = args.get('num_selected', 2)
best = max(population)
selected = []
for i in range(n):
if random.random() <= 0.5:
selected.append(best)
else:
selected.append(random.choice(population))
return selected
if __name__ == '__main__':
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Griewank(2)
ea = inspyred.ec.DEA(prng)
ea.selector = my_selector
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=100,
bounder=problem.bounder,
maximize=problem.maximize,
max_evaluations=30000)
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
Custom Terminator¶
The terminators are used to determine when the evolutionary process should end. All terminators
return a Boolean value where True implies that the evolution should end. In this example, the evolution
should continue until the average Hamming distance between all combinations of candidates falls below
a specified minimum.
[download
]
from random import Random
from time import time
import itertools
import inspyred
def my_terminator(population, num_generations, num_evaluations, args):
min_ham_dist = args.get('minimum_hamming_distance', 30)
ham_dist = []
for x, y in itertools.combinations(population, 2):
ham_dist.append(sum(a != b for a, b in zip(x.candidate, y.candidate)))
avg_ham_dist = sum(ham_dist) / float(len(ham_dist))
return avg_ham_dist <= min_ham_dist
if __name__ == '__main__':
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Binary(inspyred.benchmarks.Schwefel(2),
dimension_bits=30)
ea = inspyred.ec.GA(prng)
ea.terminator = my_terminator
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=10,
maximize=problem.maximize,
bounder=problem.bounder,
num_elites=1,
minimum_hamming_distance=12)
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
Custom Variator¶
The variators provide what are normally classified as “crossover” and “mutation,” however
even more exotic variators can be defined. Remember that a list of variators can be specified
that will act as a pipeline with the output of the first being used as the input to the second, etc.
In this example, the binary candidate is mutated such that two points are chosen and the bits
between each point are put in reverse order. For example, 0010100100
would become 0010010100
if the third and eighth bits are the chosen points.
[download
]
from random import Random
from time import time
import inspyred
# Note that we could have used the @inspyred.ec.variators.mutator
# decorator here and simplified our custom variator to
#
# def my_variator(random, candidate, args)
#
# where candidate is a single candidate. Such a function would
# just return the single mutant.
def my_variator(random, candidates, args):
mutants = []
for c in candidates:
points = random.sample(range(len(c)), 2)
x, y = min(points), max(points)
if x == 0:
mutants.append(c[y::-1] + c[y+1:])
else:
mutants.append(c[:x] + c[y:x-1:-1] + c[y+1:])
return mutants
if __name__ == '__main__':
prng = Random()
prng.seed(time())
problem = inspyred.benchmarks.Binary(inspyred.benchmarks.Schwefel(2),
dimension_bits=30)
ea = inspyred.ec.GA(prng)
ea.variator = my_variator
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=10,
maximize=problem.maximize,
bounder=problem.bounder,
num_elites=1,
max_evaluations=20000)
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
Advanced Usage¶
The examples in this section deal with less commonly used aspects of the library. Be aware that these parts may not have received as much testing as the more core components exemplified above.
Discrete Optimization¶
Discrete optimization problems often present difficulties for naive evolutionary computation approaches. Special care must be taken to generate and maintain feasible solutions and/or to sufficiently penalize infeasible solutions. Ant colony optimization approaches were created to deal with discrete optimization problems. In these examples, we consider two of the most famous discrete optimization benchmark problems – the Traveling Salesman Problem (TSP) and the Knapsack problem. The background on these problems is omitted here because it can easily be found elsewhere.
The Traveling Salesman Problem¶
Candidate solutions for the TSP can be most easily be represented as permutations
of the list of city numbers (enumerating the order in which cities should be
visited). For instance, if there are 5 cities, then a candidate solution might
be [4, 1, 0, 2, 3]
. This is how the TSP
benchmark represents solutions.
However, this simple representation forces us to work harder to ensure that our
solutions remain feasible during crossover and mutation. Therefore, we need
to use variators suited to the task, as shown in the example below.
[download
]
from random import Random
from time import time
import math
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
points = [(110.0, 225.0), (161.0, 280.0), (325.0, 554.0), (490.0, 285.0),
(157.0, 443.0), (283.0, 379.0), (397.0, 566.0), (306.0, 360.0),
(343.0, 110.0), (552.0, 199.0)]
weights = [[0 for _ in range(len(points))] for _ in range(len(points))]
for i, p in enumerate(points):
for j, q in enumerate(points):
weights[i][j] = math.sqrt((p[0] - q[0])**2 + (p[1] - q[1])**2)
problem = inspyred.benchmarks.TSP(weights)
ea = inspyred.ec.EvolutionaryComputation(prng)
ea.selector = inspyred.ec.selectors.tournament_selection
ea.variator = [inspyred.ec.variators.partially_matched_crossover,
inspyred.ec.variators.inversion_mutation]
ea.replacer = inspyred.ec.replacers.generational_replacement
ea.terminator = inspyred.ec.terminators.generation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
pop_size=100,
max_generations=50,
tournament_size=5,
num_selected=100,
num_elites=1)
if display:
best = max(ea.population)
print('Best Solution: {0}: {1}'.format(str(best.candidate), 1/best.fitness))
return ea
if __name__ == '__main__':
main(display=True)
As an alternative, we can use ant colony optimization to solve the TSP. This
example was shown previously, but it is presented again here for completeness.
[download
]
from random import Random
from time import time
import math
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
points = [(110.0, 225.0), (161.0, 280.0), (325.0, 554.0), (490.0, 285.0),
(157.0, 443.0), (283.0, 379.0), (397.0, 566.0), (306.0, 360.0),
(343.0, 110.0), (552.0, 199.0)]
weights = [[0 for _ in range(len(points))] for _ in range(len(points))]
for i, p in enumerate(points):
for j, q in enumerate(points):
weights[i][j] = math.sqrt((p[0] - q[0])**2 + (p[1] - q[1])**2)
problem = inspyred.benchmarks.TSP(weights)
ac = inspyred.swarm.ACS(prng, problem.components)
ac.terminator = inspyred.ec.terminators.generation_termination
final_pop = ac.evolve(generator=problem.constructor,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
pop_size=10,
max_generations=50)
if display:
best = max(ac.archive)
print('Best Solution:')
for b in best.candidate:
print(points[b.element[0]])
print(points[best.candidate[-1].element[1]])
print('Distance: {0}'.format(1/best.fitness))
return ac
if __name__ == '__main__':
main(display=True)
The Knapsack Problem¶
Candidate solutions for the Knapsack problem can be represented as either a binary
list (for the 0/1 Knapsack) or as a list of non-negative integers (for the Knapsack
with duplicates). In each case, the list is the same length as the number of items,
and each element of the list corresponds to the quantity of the corresponding item
to place in the knapsack. For the evolutionary computation, we can use
uniform_crossover
and gaussian_mutation
. The reason we are able to use
Gaussian mutation here, even though the candidates are composed of discrete values,
is because the bounder created by the Knapsack
benchmark is an instance of
DiscreteBounder
, which automatically moves an illegal component to its nearest
legal value.
[download
]
from random import Random
from time import time
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
items = [(7,369), (10,346), (11,322), (10,347), (12,348), (13,383),
(8,347), (11,364), (8,340), (8,324), (13,365), (12,314),
(13,306), (13,394), (7,326), (11,310), (9,400), (13,339),
(5,381), (14,353), (6,383), (9,317), (6,349), (11,396),
(14,353), (9,322), (5,329), (5,386), (5,382), (4,369),
(6,304), (10,392), (8,390), (8,307), (10,318), (13,359),
(9,378), (8,376), (11,330), (9,331)]
problem = inspyred.benchmarks.Knapsack(15, items, duplicates=True)
ea = inspyred.ec.EvolutionaryComputation(prng)
ea.selector = inspyred.ec.selectors.tournament_selection
ea.variator = [inspyred.ec.variators.uniform_crossover,
inspyred.ec.variators.gaussian_mutation]
ea.replacer = inspyred.ec.replacers.steady_state_replacement
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=problem.generator,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
pop_size=100,
max_evaluations=2500,
tournament_size=5,
num_selected=2)
if display:
best = max(ea.population)
print('Best Solution: {0}: {1}'.format(str(best.candidate),
best.fitness))
return ea
if __name__ == '__main__':
main(display=True)
Once again, as an alternative we can use ant colony optimization. Just for variety,
we’ll use it to solve the 0/1 Knapsack problem (duplicates=False
).
[download
]
from random import Random
from time import time
import math
import inspyred
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
items = [(7,369), (10,346), (11,322), (10,347), (12,348), (13,383),
(8,347), (11,364), (8,340), (8,324), (13,365), (12,314),
(13,306), (13,394), (7,326), (11,310), (9,400), (13,339),
(5,381), (14,353), (6,383), (9,317), (6,349), (11,396),
(14,353), (9,322), (5,329), (5,386), (5,382), (4,369),
(6,304), (10,392), (8,390), (8,307), (10,318), (13,359),
(9,378), (8,376), (11,330), (9,331)]
problem = inspyred.benchmarks.Knapsack(15, items, duplicates=False)
ac = inspyred.swarm.ACS(prng, problem.components)
ac.terminator = inspyred.ec.terminators.generation_termination
final_pop = ac.evolve(problem.constructor, problem.evaluator,
maximize=problem.maximize, pop_size=50,
max_generations=50)
if display:
best = max(ac.archive)
print('Best Solution: {0}: {1}'.format(str(best.candidate),
best.fitness))
return ac
if __name__ == '__main__':
main(display=True)
Evaluating Individuals Concurrently¶
One of the most lauded aspects of many bio-inspired algorithms is their
inherent parallelism. Taking advantage of this parallelism is important in
real-world problems, which are often computationally expensive. There are
two approaches available in inspyred to perform parallel evaluations for
candidate solutions. The first makes use of the multiprocessing
module
that exists in core Python 2.6+. The second makes use of a third-party
library called Parallel Python (pp),
which can be used for either multi-core processing on a single machine or
distributed processing across a network.
The multiprocessing
Module¶
Using the multiprocessing
approach to parallel evaluations is probably
the better choice if all evaluations are going to be split among multiple
processors or cores on a single machine. This is because the module is part
of the core Python library (2.6+) and provides a simple, standard interface
for setting up the parallelism. As shown in the example below, the only
additional parameter is the name of the “actual” evaluation function and
the (optional) number of CPUs to use.
[download
]
from random import Random
from time import time
import inspyred
import math
def generate_rastrigin(random, args):
size = args.get('num_inputs', 10)
return [random.uniform(-5.12, 5.12) for i in range(size)]
def evaluate_rastrigin(candidates, args):
fitness = []
for cs in candidates:
fit = 10 * len(cs) + sum([((x - 1)**2 - 10 *
math.cos(2 * math.pi * (x - 1)))
for x in cs])
fitness.append(fit)
return fitness
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
ea = inspyred.ec.DEA(prng)
if display:
ea.observer = inspyred.ec.observers.stats_observer
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=generate_rastrigin,
evaluator=inspyred.ec.evaluators.parallel_evaluation_mp,
mp_evaluator=evaluate_rastrigin,
mp_nprocs=8,
pop_size=8,
bounder=inspyred.ec.Bounder(-5.12, 5.12),
maximize=False,
max_evaluations=256,
num_inputs=3)
if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea
if __name__ == '__main__':
main(display=True)
Parallel Python¶
The Parallel Python approach to multiprocessing is best suited for use across
a network of computers (though it can be used on a single machine, as well).
It takes just a little effort to install and setup pp
on each machine, but once it’s complete, the network becomes a very efficient
computing cluster. This is a capability that the multiprocessing
approach
simply does not have, and it provides incredible scalability. However, pp
requires additional, non-standard parameters to be passed in, as illustrated
in the example below.
[download
]
from random import Random
from time import time
import inspyred
import math
# Define an additional "necessary" function for the evaluator
# to see how it must be handled when using pp.
def my_squaring_function(x):
return x**2
def generate_rastrigin(random, args):
size = args.get('num_inputs', 10)
return [random.uniform(-5.12, 5.12) for i in range(size)]
def evaluate_rastrigin(candidates, args):
fitness = []
for cs in candidates:
fit = 10 * len(cs) + sum([(my_squaring_function(x - 1) -
10 * math.cos(2 * math.pi * (x - 1)))
for x in cs])
fitness.append(fit)
return fitness
def main(prng=None, display=False):
if prng is None:
prng = Random()
prng.seed(time())
ea = inspyred.ec.DEA(prng)
if display:
ea.observer = inspyred.ec.observers.stats_observer
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=generate_rastrigin,
evaluator=inspyred.ec.evaluators.parallel_evaluation_pp,
pp_evaluator=evaluate_rastrigin,
pp_dependencies=(my_squaring_function,),
pp_modules=("math",),
pop_size=8,
bounder=inspyred.ec.Bounder(-5.12, 5.12),
maximize=False,
max_evaluations=256,
num_inputs=3)
if display:
best = max(final_pop)
print('Best Solution: \n{0}'.format(str(best)))
return ea
if __name__ == '__main__':
main(display=True)
Island Models¶
Along with parallel evaluation of the fitness, it is also possible to create
different populations that evolve independently but are capable of sharing
solutions among themselves. Such approaches are known as “island models” and
can be accomplished within inspyred by making use of the migrator callback.
The following example illustrates a very simple two-island model using the
inspyred.ec.migrators.MultiprocessingMigrator
migrator. Remember that
custom migrators can easily be constructed for more specific needs.
[download
]
import random
import time
import multiprocessing
import inspyred
def create_island(rand_seed, island_number, problem, mp_migrator):
evals = 200
psize = 20
rand = random.Random()
rand.seed(rand_seed)
ec = inspyred.ec.EvolutionaryComputation(rand)
ec.observer = [inspyred.ec.observers.stats_observer, inspyred.ec.observers.file_observer]
ec.terminator = inspyred.ec.terminators.evaluation_termination
ec.selector = inspyred.ec.selectors.tournament_selection
ec.replacer = inspyred.ec.replacers.generational_replacement
ec.variator = [inspyred.ec.variators.blend_crossover, inspyred.ec.variators.gaussian_mutation]
ec.migrator = mp_migrator
final_pop = ec.evolve(generator=problem.generator,
evaluator=problem.evaluator,
bounder=problem.bounder,
maximize=problem.maximize,
statistics_file=open("stats_%d.csv" % island_number, "w"),
individuals_file=open("inds_%d.csv" % island_number, "w"),
max_evaluations=evals,
num_elites=1,
pop_size=psize,
num_selected=psize,
evaluate_migrant=False)
if __name__ == "__main__":
cpus = 2
problem = inspyred.benchmarks.Rastrigin(2)
mp_migrator = inspyred.ec.migrators.MultiprocessingMigrator(1)
rand_seed = int(time.time())
jobs = []
for i in range(cpus):
p = multiprocessing.Process(target=create_island, args=(rand_seed + i, i, problem, mp_migrator))
p.start()
jobs.append(p)
for j in jobs:
j.join()
Replacement via Niching¶
An example of a not-quite-standard replacer is the crowding_replacement
which
provides a niching capability. An example using this replacer is given below. Here,
the candidates are just single numbers (but created as singleton lists so as to
be Sequence
types for some of the built-in operators) between 0 and 26. Their
fitness values are simply the value of the sine function using the candidate value
as input. Since the sine function is periodic and goes through four periods between
0 and 26, this function is multimodal. So we can use crowding_replacement
to
ensure that all maxima are found.
[download
]
import random
import time
import math
import itertools
import inspyred
def my_distance(x, y):
return sum([abs(a - b) for a, b in zip(x, y)])
def generate(random, args):
return [random.uniform(0, 26)]
def evaluate(candidates, args):
fitness = []
for cand in candidates:
fit = sum([math.sin(c) for c in cand])
fitness.append(fit)
return fitness
def main(prng=None, display=False):
if prng is None:
prng = random.Random()
prng.seed(time.time())
ea = inspyred.ec.EvolutionaryComputation(prng)
ea.selector = inspyred.ec.selectors.tournament_selection
ea.replacer = inspyred.ec.replacers.crowding_replacement
ea.variator = inspyred.ec.variators.gaussian_mutation
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generate, evaluate, pop_size=30,
bounder=inspyred.ec.Bounder(0, 26),
max_evaluations=10000,
num_selected=30,
mutation_rate=1.0,
crowding_distance=10,
distance_function=my_distance)
if display:
import matplotlib.pyplot as plt
x = []
y = []
for p in final_pop:
x.append(p.candidate[0])
y.append(math.sin(p.candidate[0]))
t = [(i / 1000.0) * 26.0 for i in range(1000)]
s = [math.sin(a) for a in t]
plt.plot(t, s, color='b')
plt.scatter(x, y, color='r')
plt.axis([0, 26, 0, 1.1])
plt.savefig('niche_example.pdf', format='pdf')
plt.show()
return ea
if __name__ == '__main__':
main(display=True)