Recipes
This section provides a set of recipes that can be used to add functionality to inspyred. These recipes are not part of the core library, but they have proven useful in real-world programs. If they continue to be useful, they may be incorporated into inspyred in a future version.
Lexicographic Ordering
In multiobjective optimization problems, alternatives to Pareto preference include linear weighting of the objectives and prioritizing the objectives from most to least important. Both of these methods essentially reduce the problem to a single-objective optimization. Weighting the objectives can be handled entirely in the evaluator for the problem, so no special recipe is needed. Prioritizing the objectives, also known as lexicographic ordering, requires some additional effort.
The fitness values for two individuals, x and y, should be compared such that, if the first objective for x is “better” (i.e., lower when minimizing or higher when maximizing) than the first objective for y, then x is considered better than y. If they are equal in that objective, the second objective is considered in the same way, and the process repeats through the remaining objectives. For example, when maximizing, a fitness of (3, 1) beats (2, 9) because the first objective alone decides the comparison, whereas (1, 3) beats (1, 2) only after the tie in the first objective defers the decision to the second.
The following recipe provides a class to deal with such comparisons that is intended to function much like
the inspyred.ec.emo.Pareto
class.
import functools
@functools.total_ordering
class Lexicographic(object):
def __init__(self, values=None, maximize=True):
if values is None:
values = []
self.values = values
try:
iter(maximize)
except TypeError:
maximize = [maximize for v in values]
self.maximize = maximize
def __len__(self):
return len(self.values)
def __getitem__(self, key):
return self.values[key]
def __iter__(self):
return iter(self.values)
def __lt__(self, other):
for v, o, m in zip(self.values, other.values, self.maximize):
if m:
if v < o:
return True
elif v > o:
return False
else:
if v > o:
return True
elif v < o:
return False
return False
def __eq__(self, other):
return (self.values == other.values and self.maximize == other.maximize)
def __str__(self):
return str(self.values)
def __repr__(self):
return str(self.values)
def my_evaluator(candidates, args):
fitness = []
for candidate in candidates:
f = candidate[0] ** 2 + 1
g = candidate[0] ** 2 - 1
fitness.append(Lexicographic([f, g], maximize=False))
return fitness
def my_generator(random, args):
return [random.random()]
if __name__ == '__main__':
a = Lexicographic([1, 2, 3], maximize=True)
b = Lexicographic([1, 3, 2], maximize=True)
c = Lexicographic([2, 1, 3], maximize=True)
d = Lexicographic([2, 3, 1], maximize=True)
e = Lexicographic([3, 1, 2], maximize=True)
f = Lexicographic([3, 2, 1], maximize=True)
u = Lexicographic([1, 2, 3], maximize=False)
v = Lexicographic([1, 3, 2], maximize=False)
w = Lexicographic([2, 1, 3], maximize=False)
x = Lexicographic([2, 3, 1], maximize=False)
y = Lexicographic([3, 1, 2], maximize=False)
z = Lexicographic([3, 2, 1], maximize=False)
for p in [a, b, c, d, e, f]:
for q in [a, b, c, d, e, f]:
print('%s < %s : %s' % (p, q, p < q))
print('----------------------------------------')
for p in [u, v, w, x, y, z]:
for q in [u, v, w, x, y, z]:
print('%s < %s : %s' % (p, q, p < q))
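The recipe above defines my_generator and my_evaluator but never runs them through an evolutionary computation. The following is a minimal sketch of one way to do so, using only inspyred's standard operators; the population size, mutation rate, bounder, and evaluation budget are illustrative values rather than part of the recipe. One subtlety about the class as written: because each Lexicographic object encodes the sense (minimize or maximize) of its own objectives, “greater” already means “better,” so the evolve call uses maximize=True even though both objectives in my_evaluator are minimized.
import random
import inspyred

prng = random.Random()
ea = inspyred.ec.EvolutionaryComputation(prng)
ea.selector = inspyred.ec.selectors.tournament_selection
ea.variator = inspyred.ec.variators.gaussian_mutation
ea.replacer = inspyred.ec.replacers.generational_replacement
ea.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = ea.evolve(generator=my_generator,
                      evaluator=my_evaluator,
                      pop_size=20,
                      maximize=True,   # Lexicographic handles min/max internally
                      bounder=inspyred.ec.Bounder(0, 1),
                      num_selected=20,
                      mutation_rate=0.25,
                      gaussian_stdev=0.1,
                      max_evaluations=1000)
final_pop.sort(reverse=True)
print(final_pop[0])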
Constraint Selection
Optimization problems often have to deal with constraints and constraint violations. The following recipe
provides one example of how to handle such a thing with inspyred. Here, candidates represent ordered pairs
and their fitness is simply their distance from the origin. However, we provide a constraint that punishes
candidates that lie outside of the unit circle. Such a scenario should produce a candidate that lies on the
unit circle. Note also that crowding_replacement or some other fitness-sharing or niching scheme could be used to generate many such points on the circle; a sketch of that variation follows the recipe.
import random
from inspyred import ec
from inspyred.ec import variators
from inspyred.ec import replacers
from inspyred.ec import terminators
from inspyred.ec import observers
def my_constraint_function(candidate):
"""Return the number of constraints that candidate violates."""
# In this case, we'll just say that the point has to lie
# within a circle centered at (0, 0) of radius 1.
if candidate[0]**2 + candidate[1]**2 > 1:
return 1
else:
return 0
def my_generator(random, args):
# Create pairs in the range [-2, 2].
return [random.uniform(-2.0, 2.0) for i in range(2)]
def my_evaluator(candidates, args):
# The fitness will be how far the point is from
# the origin. (We're maximizing, in this case.)
# Note that the constraint heavily punishes individuals
# who go beyond the unit circle. Therefore, these
# two functions combined focus the evolution toward
    # finding individuals that lie ON the circle.
fitness = []
for c in candidates:
if my_constraint_function(c) > 0:
fitness.append(-1)
else:
fitness.append(c[0]**2 + c[1]**2)
return fitness
def constrained_tournament_selection(random, population, args):
num_selected = args.setdefault('num_selected', 1)
    constraint_func = args.setdefault('constraint_func', None)
tournament_size = 2
pop = list(population)
selected = []
for _ in range(num_selected):
tournament = random.sample(pop, tournament_size)
# If there is not a constraint function,
# just do regular tournament selection.
if constraint_func is None:
selected.append(max(tournament))
else:
cons = [constraint_func(t.candidate) for t in tournament]
# If no constraints are violated, just do
# regular tournament selection.
if max(cons) == 0:
selected.append(max(tournament))
# Otherwise, choose the least violator
# (which may be a non-violator).
else:
selected.append(tournament[cons.index(min(cons))])
return selected
r = random.Random()
myec = ec.EvolutionaryComputation(r)
myec.selector = constrained_tournament_selection
myec.variator = variators.gaussian_mutation
myec.replacer = replacers.generational_replacement
myec.terminator = terminators.evaluation_termination
myec.observer = observers.stats_observer
pop = myec.evolve(my_generator, my_evaluator,
pop_size=100,
bounder=ec.Bounder(-2, 2),
num_selected=100,
constraint_func=my_constraint_function,
mutation_rate=0.5,
max_evaluations=2000)
import matplotlib.pyplot as plt
import numpy
x = []
y = []
c = []
pop.sort()
num_feasible = len([p for p in pop if p.fitness >= 0])
feasible_count = 0
for i, p in enumerate(pop):
x.append(p.candidate[0])
y.append(p.candidate[1])
if i == len(pop) - 1:
c.append('r')
elif p.fitness < 0:
c.append('0.98')
else:
c.append(str(1 - feasible_count / float(num_feasible)))
feasible_count += 1
angles = numpy.linspace(0, 2*numpy.pi, 100)
plt.plot(numpy.cos(angles), numpy.sin(angles), color='b')
plt.scatter(x, y, color=c)
plt.savefig('constraint_example.pdf', format='pdf')
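As noted above, a niching scheme such as crowding_replacement can preserve many distinct feasible points instead of letting the population collapse to a single one. The following is a minimal sketch of that variation, reusing the functions from the recipe; the distance function and parameter values are illustrative assumptions. Crowding compares each offspring against the most similar current individuals, so feasible candidates in different regions of the circle can survive side by side.
def candidate_distance(x, y):
    # Euclidean distance between two candidate points.
    return sum((a - b)**2 for a, b in zip(x, y)) ** 0.5

r = random.Random()
niching_ec = ec.EvolutionaryComputation(r)
niching_ec.selector = constrained_tournament_selection
niching_ec.variator = variators.gaussian_mutation
niching_ec.replacer = replacers.crowding_replacement
niching_ec.terminator = terminators.evaluation_termination
pop = niching_ec.evolve(my_generator, my_evaluator,
                        pop_size=100,
                        bounder=ec.Bounder(-2, 2),
                        num_selected=100,
                        constraint_func=my_constraint_function,
                        mutation_rate=0.5,
                        distance_function=candidate_distance,
                        max_evaluations=2000)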
Meta-Evolutionary Computation
The following recipe shows how an evolutionary computation can be used to evolve near-optimal operators and
parameters for another evolutionary computation. In the EC literature, such a thing is generally referred
to as a “meta-EC”.
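Before presenting the full recipe, a minimal conceptual sketch may help: the outer EC's evaluator runs a complete inner EC for each candidate and scores the candidate by the best fitness that inner run achieves. The dict-based candidate encoding and every parameter value below are illustrative simplifications; the recipe that follows encodes the choice of operators as well as their parameters and manages the evaluation budget explicitly.
import random
from inspyred import ec
from inspyred.ec import selectors, variators, replacers, terminators

def meta_generator(random, args):
    # An outer candidate is just a pair of inner-EC parameters in this sketch.
    return {'pop_size': random.randint(10, 50),
            'mutation_rate': random.random()}

def meta_evaluator(candidates, args):
    # The underlying problem is passed through args, mirroring the recipe's
    # 'the_generator', 'the_evaluator', and 'do_maximize' convention.
    fitness = []
    for params in candidates:
        inner = ec.EvolutionaryComputation(random.Random())
        inner.selector = selectors.tournament_selection
        inner.variator = variators.gaussian_mutation
        inner.replacer = replacers.generational_replacement
        inner.terminator = terminators.evaluation_termination
        final_pop = inner.evolve(args['the_generator'], args['the_evaluator'],
                                 pop_size=params['pop_size'],
                                 maximize=args['do_maximize'],
                                 num_selected=params['pop_size'],
                                 mutation_rate=params['mutation_rate'],
                                 max_evaluations=500)
        final_pop.sort(reverse=True)
        # Score the outer candidate by the best fitness the inner run achieved.
        fitness.append(final_pop[0].fitness)
    return fitness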
import csv
import time
import random
from inspyred import ec
from inspyred.ec import selectors
from inspyred.ec import replacers
from inspyred.ec import variators
from inspyred.ec import terminators
from inspyred.ec import observers
class MetaEC(ec.EvolutionaryComputation):
def __init__(self, random):
ec.EvolutionaryComputation.__init__(self, random)
self.selector = selectors.tournament_selection
self.replacer = replacers.generational_replacement
self.variator = [variators.uniform_crossover, self._internal_variator]
self.terminator = self._internal_meta_terminator
def _create_selector_replacer(self, random):
pop_size = random.randint(1, 100)
selector = random.choice(range(0, 5))
replacer = random.choice(range(0, 8))
sel = [selector]
if selector > 0:
if replacer == 0 or replacer == 2 or replacer == 3:
sel.append(pop_size)
else:
sel.append(random.randint(1, pop_size))
if selector == 2:
sel.append(random.randint(min(2, pop_size), pop_size))
rep = [replacer]
if replacer == 1:
rep.append(random.randint(min(2, pop_size), pop_size))
elif replacer == 3 or replacer == 5:
rep.append(random.randint(0, pop_size))
return [pop_size, sel, rep]
def _create_variators(self, random):
crossover = random.choice([0, 1, 2, 3, 4, 6])
mutator = random.choice([5, 6])
variators = ([crossover], [mutator])
if crossover == 0 or crossover == 4:
variators[0].append(random.random())
variators[0].append(random.random())
elif crossover == 1:
variators[0].append(random.random())
elif crossover == 2:
variators[0].append(random.random())
variators[0].append(random.randint(1, 10))
elif crossover == 3:
variators[0].append(random.randint(0, 30))
if mutator == 5:
variators[1].append(random.random())
variators[1].append(random.random())
return variators
def _internal_generator(self, random, args):
cross, mut = self._create_variators(random)
return [self._create_selector_replacer(random), cross, mut]
def _internal_variator(self, random, candidates, args):
cs_copy = list(candidates)
for i, cs in enumerate(cs_copy):
if random.random() < 0.1:
cs_copy[i][0] = self._create_selector_replacer(random)
if random.random() < 0.1:
cross, mut = self._create_variators(random)
cs_copy[i][1] = cross
cs_copy[i][2] = mut
return cs_copy
def _internal_observer(self, population, num_generations, num_evaluations, args):
for i, p in enumerate(population):
self._observer_file.write('{0}, {1}, {2}\n'.format(i, p.fitness, str(p.candidate)))
self._observer_file.flush()
def _internal_terminator(self, population, num_generations, num_evaluations, args):
maxevals = args.get('max_evaluations', 0)
self._meta_evaluations += num_evaluations
return num_evaluations >= maxevals or self._meta_evaluations >= self._max_meta_evaluations
def _internal_meta_terminator(self, population, num_generations, num_evaluations, args):
return self._meta_evaluations >= self._max_meta_evaluations
def _internal_evaluator(self, candidates, args):
the_generator = args.get('the_generator')
the_evaluator = args.get('the_evaluator')
do_maximize = args.get('do_maximize', True)
fitness = []
for candidate in candidates:
popsize, selector, replacer, crossover, mutator, myargs = self.interpret_candidate(candidate)
myargs['max_evaluations'] = args.get('num_trial_evaluations', popsize * 10)
num_trials = args.get('num_trials', 1)
evo = ec.EvolutionaryComputation(self._random)
evo.terminator = self._internal_terminator
evo.observer = self._internal_observer
evo.selector = selector
evo.variator = [crossover, mutator]
evo.replacer = replacer
best_fit = []
for i in range(num_trials):
final_pop = evo.evolve(generator=the_generator,
evaluator=the_evaluator,
pop_size=popsize,
maximize=do_maximize,
args=myargs)
best_fit.append(final_pop[0].fitness)
fitness.append(sum(best_fit) / float(len(best_fit)))
return fitness
def interpret_candidate(self, candidate):
selector_mapping = (selectors.default_selection,
selectors.rank_selection,
selectors.tournament_selection,
selectors.truncation_selection,
selectors.uniform_selection)
variator_mapping = (variators.blend_crossover,
variators.heuristic_crossover,
variators.n_point_crossover,
variators.simulated_binary_crossover,
variators.uniform_crossover,
variators.gaussian_mutation,
variators.default_variation)
replacer_mapping = (replacers.comma_replacement,
replacers.crowding_replacement,
replacers.default_replacement,
replacers.generational_replacement,
replacers.plus_replacement,
replacers.random_replacement,
replacers.steady_state_replacement,
replacers.truncation_replacement)
myargs = dict()
# Selectors
if candidate[0][1][0] == 1:
myargs['num_selected'] = candidate[0][1][1]
elif candidate[0][1][0] == 2:
myargs['num_selected'] = candidate[0][1][1]
myargs['tournament_size'] = candidate[0][1][2]
elif candidate[0][1][0] == 3:
myargs['num_selected'] = candidate[0][1][1]
elif candidate[0][1][0] == 4:
myargs['num_selected'] = candidate[0][1][1]
# Replacers
if candidate[0][2][0] == 1:
myargs['crowding_distance'] = candidate[0][2][1]
elif candidate[0][2][0] == 3:
myargs['num_elites'] = candidate[0][2][1]
elif candidate[0][2][0] == 5:
myargs['num_elites'] = candidate[0][2][1]
# Crossovers
if candidate[1][0] == 0:
myargs['crossover_rate'] = candidate[1][1]
myargs['blx_alpha'] = candidate[1][2]
elif candidate[1][0] == 1:
myargs['crossover_rate'] = candidate[1][1]
elif candidate[1][0] == 2:
myargs['crossover_rate'] = candidate[1][1]
myargs['num_crossover_points'] = candidate[1][2]
elif candidate[1][0] == 3:
myargs['sbx_distribution_index'] = candidate[1][1]
elif candidate[1][0] == 4:
myargs['crossover_rate'] = candidate[1][1]
myargs['ux_bias'] = candidate[1][2]
# Mutators
if candidate[2][0] == 5:
myargs['mutation_rate'] = candidate[2][1]
myargs['gaussian_stdev'] = candidate[2][2]
return (candidate[0][0],
selector_mapping[candidate[0][1][0]],
replacer_mapping[candidate[0][2][0]],
variator_mapping[candidate[1][0]],
variator_mapping[candidate[2][0]],
myargs)
def evolve(self, generator, evaluator, pop_size=100, seeds=[], maximize=True, **args):
args.setdefault('the_generator', generator)
args.setdefault('the_evaluator', evaluator)
args.setdefault('do_maximize', maximize)
args.setdefault('num_elites', 1)
args.setdefault('num_selected', pop_size)
self._observer_file = open('metaec-individuals-file-' + time.strftime('%m%d%Y-%H%M%S') + '.csv', 'w')
self._meta_evaluations = 0
self._max_meta_evaluations = args.get('max_evaluations', 0)
final_pop = ec.EvolutionaryComputation.evolve(self, self._internal_generator,
self._internal_evaluator, pop_size,
seeds, maximize, **args)
self._observer_file.close()
return final_pop
if __name__ == '__main__':
import math
import inspyred
prng = random.Random()
prng.seed(time.time())
problem = inspyred.benchmarks.Rastrigin(3)
mec = MetaEC(prng)
mec.observer = observers.stats_observer
final_pop = mec.evolve(generator=problem.generator,
evaluator=problem.evaluator,
pop_size=10,
maximize=problem.maximize,
bounder=problem.bounder,
num_trials=1,
num_trial_evaluations=5000,
max_evaluations=100000)
pop_size, selector, replacer, crossover, mutator, args = mec.interpret_candidate(final_pop[0].candidate)
print('Best Fitness: {0}'.format(final_pop[0].fitness))
print('Population Size: {0}'.format(pop_size))
print('Selector: {0}'.format(selector.__name__))
print('Replacer: {0}'.format(replacer.__name__))
print('Crossover: {0}'.format(crossover.__name__))
print('Mutator: {0}'.format(mutator.__name__))
print('Parameters:')
for key in args:
print(' {0}: {1}'.format(key, args[key]))
print('Actual Evaluations Used: {0}'.format(mec._meta_evaluations))
Micro-Evolutionary Computation
Another approach that has been successfully applied to some difficult problems is to use many small-population ECs
for small numbers of evaluations in succession. Each succeeding EC is seeded with the best solution from the
previous run. This is somewhat akin to a random-restart hill-climbing approach, except that information about the
best solution so far is passed along during each restart.
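The core of the idea fits in a short loop: run a sequence of small, short ECs and seed each run with the best candidate from the previous one. The sphere function and all parameter values below are illustrative assumptions used only to make the sketch self-contained; the MicroEC class in the recipe packages the same pattern behind the usual evolve interface and also handles migration, archiving, and an overall evaluation budget.
import random
import inspyred

def sphere_generator(random, args):
    return [random.uniform(-5.0, 5.0) for _ in range(3)]

def sphere_evaluator(candidates, args):
    return [sum(x * x for x in c) for c in candidates]

prng = random.Random()
seeds = []
for run in range(10):
    ea = inspyred.ec.EvolutionaryComputation(prng)
    ea.selector = inspyred.ec.selectors.tournament_selection
    ea.variator = inspyred.ec.variators.gaussian_mutation
    ea.replacer = inspyred.ec.replacers.generational_replacement
    ea.terminator = inspyred.ec.terminators.evaluation_termination
    pop = ea.evolve(sphere_generator, sphere_evaluator,
                    pop_size=10, maximize=False, seeds=seeds,
                    num_selected=10, num_elites=1,
                    mutation_rate=0.5, gaussian_stdev=0.1,
                    max_evaluations=300)
    pop.sort(reverse=True)
    # Seed the next short run with the best candidate found so far.
    seeds = [pop[0].candidate]
print(pop[0])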
import collections
import inspyred
class MicroEC(inspyred.ec.EvolutionaryComputation):
def __init__(self, random):
inspyred.ec.EvolutionaryComputation.__init__(self, random)
def evolve(self, generator, evaluator, pop_size=10, seeds=None, maximize=True, bounder=None, **args):
self._kwargs = args
self._kwargs['_ec'] = self
if seeds is None:
seeds = []
if bounder is None:
bounder = inspyred.ec.Bounder()
self.termination_cause = None
self.generator = generator
self.evaluator = evaluator
self.bounder = bounder
self.maximize = maximize
self.population = []
self.archive = []
microseeds = seeds
while not self._should_terminate(list(self.population), self.num_generations, self.num_evaluations):
microec = inspyred.ec.EvolutionaryComputation(self._random)
microec.selector = self.selector
microec.variator = self.variator
microec.replacer = self.replacer
microec.observer = self.observer
microec.terminator = inspyred.ec.terminators.evaluation_termination
maxevals = args['max_evaluations']
args['max_evaluations'] = args['micro_evaluations']
result = microec.evolve(generator=generator, evaluator=evaluator,
pop_size=pop_size, seeds=microseeds,
maximize=maximize, **args)
self.population = list(result)
args['max_evaluations'] = maxevals
result.sort(reverse=True)
microseeds = [result[0].candidate]
self.num_evaluations += microec.num_evaluations
# Migrate individuals.
self.population = self.migrator(random=self._random,
population=self.population,
args=self._kwargs)
# Archive individuals.
self.archive = self.archiver(random=self._random, archive=self.archive,
population=list(self.population), args=self._kwargs)
self.num_generations += microec.num_generations
if isinstance(self.observer, collections.Iterable):
for obs in self.observer:
obs(population=list(self.population), num_generations=self.num_generations,
num_evaluations=self.num_evaluations, args=self._kwargs)
else:
self.observer(population=list(self.population), num_generations=self.num_generations,
num_evaluations=self.num_evaluations, args=self._kwargs)
return self.population
if __name__ == '__main__':
import random
import math
import time
def rastrigin_generator(random, args):
return [random.uniform(-5.12, 5.12) for _ in range(2)]
def rastrigin_evaluator(candidates, args):
fitness = []
for cand in candidates:
fitness.append(10 * len(cand) + sum([x**2 - 10 * (math.cos(2*math.pi*x)) for x in cand]))
return fitness
prng = random.Random()
prng.seed(time.time())
micro = MicroEC(prng)
micro.selector = inspyred.ec.selectors.tournament_selection
micro.replacer = inspyred.ec.replacers.steady_state_replacement
micro.variator = [inspyred.ec.variators.uniform_crossover, inspyred.ec.variators.gaussian_mutation]
micro.archiver = inspyred.ec.archivers.best_archiver
micro.observer = inspyred.ec.observers.stats_observer
micro.terminator = inspyred.ec.terminators.evaluation_termination
final_pop = micro.evolve(rastrigin_generator,
rastrigin_evaluator,
pop_size=10,
maximize=False,
bounder=inspyred.ec.Bounder(-5.12, 5.12),
max_evaluations=3000,
micro_evaluations=300,
num_selected=2,
gaussian_stdev=0.1)
print('Actual evaluations: {0}'.format(micro.num_evaluations))
for p in micro.archive:
        print(p)
Network Migrator
The following custom migrator is a callable class (because the migrator must behave like a callback function)
that allows solutions to migrate from one network machine to another. It is assumed that the EC islands are
running on the given IP:port combinations.
import sys
import socket
import pickle
import threading
import collections
import SocketServer
class NetworkMigrator(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
"""Defines a migration function across a network.
This callable class acts as a migration function that
allows candidate solutions to migrate from one population
to another via TCP/IP connections.
The migrator is constructed by specifying the IP address
of the server (hosting the population from which individuals
emigrate) as an IP-port tuple and the addresses of the clients
(hosting the populations to which individuals from the server
immigrate) as a list of IP-port tuples. The ``max_migrants``
parameter specifies the size of the queue of migrants waiting
to immigrate to the server from the clients; the newest migrants
replace older ones in the queue.
Note: In order to use this migration operator, individuals
must be pickle-able.
The following is an example of the use of this operator::
m = NetworkMigrator(('192.168.1.10', 25125),
[('192.168.1.11', 12345), ('192.168.1.12', 54321)],
max_migrants=3)
Since the NetworkMigrator object is a server, it should always
call the ``shutdown()`` method when it is no longer needed, in
order to give back its resources.
Public Attributes:
- *client_addresses* -- the list of IP address tuples
(IP, port) to which individuals should migrate
- *migrants* -- the deque of migrants (of maximum size
specified by ``max_migrants``) waiting to immigrate
to client populations
"""
def __init__(self, server_address, client_addresses, max_migrants=1):
self._lock = threading.Lock()
SocketServer.TCPServer.__init__(self, server_address, None)
self.client_addresses = client_addresses
self.migrants = collections.deque(maxlen=max_migrants)
t = threading.Thread(target=self.serve_forever)
t.setDaemon(True)
t.start()
self.__name__ = self.__class__.__name__
def finish_request(self, request, client_address):
try:
rbufsize = -1
wbufsize = 0
rfile = request.makefile('rb', rbufsize)
wfile = request.makefile('wb', wbufsize)
pickle_data = rfile.readline().strip()
migrant = pickle.loads(pickle_data)
with self._lock:
self.migrants.append(migrant)
if not wfile.closed:
wfile.flush()
wfile.close()
rfile.close()
finally:
sys.exc_traceback = None
def __call__(self, random, population, args):
"""Perform the migration.
This function serves as the migration operator. Here, a random address
is chosen from the ``client_addresses`` list, and a random individual
is chosen from the population to become the migrant. A socket is opened
to the chosen client address, and the chosen migrant is pickled and
sent to the NetworkMigrator object running at the client address. Then
the migrant queue on the current machine is queried for a migrant
to replace the one sent. If one is found, it replaces the newly
migrated individual; otherwise, the individual remains in the population.
Any immigrants may also be re-evaluated before insertion into the
current population by setting the ``evaluate_migrant`` keyword
argument in ``args`` to True. This is useful if the evaluation
functions in different populations are different and we want to compare
"apples to apples," as they say.
Arguments:
- *random* -- the random number generator object
- *population* -- the population of Individuals
- *args* -- a dictionary of keyword arguments
Optional keyword arguments in the ``args`` parameter:
- *evaluate_migrant* -- whether to re-evaluate the immigrant (default False)
"""
evaluate_migrant = args.setdefault('evaluate_migrant', False)
client_address = random.choice(self.client_addresses)
migrant_index = random.randint(0, len(population) - 1)
pickle_data = pickle.dumps(population[migrant_index])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(client_address)
sock.send(pickle_data + '\n')
finally:
sock.close()
migrant = None
with self._lock:
if len(self.migrants) > 0:
migrant = self.migrants.popleft()
if migrant is not None:
if evaluate_migrant:
                fit = args['_ec'].evaluator([migrant], args)
                migrant.fitness = fit[0]
                args['_ec'].num_evaluations += 1
population[migrant_index] = migrant
return population
def __str__(self):
return str(self.migrants)
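A minimal sketch of attaching the migrator to an island follows; the addresses, problem, and parameter values are illustrative assumptions, and the island at the client address is assumed to be running its own NetworkMigrator, as the introduction notes. Each inspyred EC exposes a migrator attribute that is invoked every generation, so the object defined above can simply be assigned to it.
import random
import inspyred

def island_generator(random, args):
    return [random.uniform(-5.12, 5.12) for _ in range(2)]

def island_evaluator(candidates, args):
    return [sum(x**2 for x in c) for c in candidates]

# This island listens on port 25125 and sends emigrants to a (hypothetical)
# island listening on port 25126.
migrator = NetworkMigrator(('127.0.0.1', 25125),
                           [('127.0.0.1', 25126)],
                           max_migrants=3)
prng = random.Random()
island = inspyred.ec.EvolutionaryComputation(prng)
island.selector = inspyred.ec.selectors.tournament_selection
island.variator = inspyred.ec.variators.gaussian_mutation
island.replacer = inspyred.ec.replacers.generational_replacement
island.terminator = inspyred.ec.terminators.evaluation_termination
island.migrator = migrator
final_pop = island.evolve(island_generator, island_evaluator,
                          pop_size=50, maximize=False,
                          bounder=inspyred.ec.Bounder(-5.12, 5.12),
                          num_selected=50, mutation_rate=0.3,
                          max_evaluations=5000)
# Release the migrator's server thread and socket when finished.
migrator.shutdown()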