commit 1871b3263c15a4e533b80fa90900c7d451a6ddb0
Author: Brandon Rozek
Date:   Wed Feb 27 09:54:47 2019 -0500

    Added Genetic Algorithm and Evolutionary Strategies

diff --git a/es_model_test.py b/es_model_test.py
new file mode 100644
index 0000000..d37874a
--- /dev/null
+++ b/es_model_test.py
@@ -0,0 +1,103 @@
+import numpy as np
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+from torch.distributions import Categorical
+import gym
+from copy import deepcopy
+
+class Policy(nn.Module):
+    def __init__(self, state_size, action_size):
+        super(Policy, self).__init__()
+        self.state_size = state_size
+        self.action_size = action_size
+
+        self.fc1 = nn.Linear(state_size, 125)
+        self.fc_norm = nn.LayerNorm(125)
+
+        self.fc2 = nn.Linear(125, 125)
+        self.fc2_norm = nn.LayerNorm(125)
+
+        self.action_prob = nn.Linear(125, action_size)
+
+    def forward(self, x):
+        x = F.relu(self.fc_norm(self.fc1(x)))
+        x = F.relu(self.fc2_norm(self.fc2(x)))
+        x = F.softmax(self.action_prob(x), dim = 1)
+        return x
+
+
+env = gym.make("Acrobot-v1")
+def fitness(model):
+    state = torch.from_numpy(env.reset()).float().unsqueeze(0)
+    total_reward = 0
+    done = False
+    while not done:
+        action_probabilities = model(state)
+        distribution = Categorical(action_probabilities)
+        action = distribution.sample().item()
+        next_state, reward, done, _ = env.step(action)
+        total_reward += reward
+        state = torch.from_numpy(next_state).float().unsqueeze(0)
+    # Negated so that minimizing this quantity maximizes the episode reward
+    return -total_reward
+
+# model should be an instance of nn.Module
+class Population:
+    def __init__(self, model, population_size, fitness_fn, learning_rate = 1e-1, sigma = 0.05):
+        self.model = model
+        self.optimizer = torch.optim.Adam(self.model.parameters(), lr = learning_rate)
+        self.population_size = population_size
+        self.sigma = sigma
+        self.learning_rate = learning_rate
+        assert self.sigma > 0
+        assert self.population_size > 0
+        self.calculate_fitness = fitness_fn
+
+    def __iter__(self):
+        return self
+
+    # Advance to the next generation
+    def __next__(self):
+        ## Generate noise
+        model_dict = self.model.state_dict()
+        white_noise_dict = {}
+        noise_dict = {}
+        for key in model_dict.keys():
+            white_noise_dict[key] = torch.randn(self.population_size, *model_dict[key].shape)
+            noise_dict[key] = self.sigma * white_noise_dict[key]
+
+        ## Generate candidate solutions
+        candidate_solutions = []
+        for i in range(self.population_size):
+            candidate_statedict = {}
+            for key in model_dict.keys():
+                candidate_statedict[key] = model_dict[key] + noise_dict[key][i]
+            # Copy the architecture; the weights are replaced just below
+            candidate = deepcopy(self.model)
+            candidate.load_state_dict(candidate_statedict)
+            candidate_solutions.append(candidate)
+
+        ## Calculate fitness
+        fitness_values = torch.tensor([self.calculate_fitness(x) for x in candidate_solutions])
+        print("Average fitness: ", fitness_values.mean())
+        # Mean shift, scale
+        fitness_values = (fitness_values - fitness_values.mean()) / (fitness_values.std() + np.finfo('float').eps)
+
+        ## Insert adjustments into gradients slot
+        self.optimizer.zero_grad()
+        for name, param in self.model.named_parameters():
+            if param.requires_grad:
+                # Reshape fitness to (N, 1, ..., 1) so it broadcasts against the noise
+                noise_dim_n = len(white_noise_dict[name].shape)
+                dim = [1] * (noise_dim_n - 1)
+                param.grad = (white_noise_dict[name] * fitness_values.float().reshape(self.population_size, *dim)).mean(0) / self.sigma
+        self.optimizer.step()
+
+        return deepcopy(self.model)
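+
+# Note: the update in __next__ above is essentially the evolution-strategies
+# gradient estimator of Salimans et al. (2017): with perturbations
+# eps_i ~ N(0, I) and standardized fitnesses F_i,
+#   grad ~= (1 / (N * sigma)) * sum_i F_i * eps_i.
+# Because fitness() returns the negated episode reward and Adam minimizes,
+# each step performs gradient ascent on the expected episode reward.
+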
+p = Population(Policy(6, 3), 1000, fitness)
+
+def iterate():
+    for i in range(10):
+        next(p)
\ No newline at end of file
diff --git a/es_test.py b/es_test.py
new file mode 100644
index 0000000..24e7a2d
--- /dev/null
+++ b/es_test.py
@@ -0,0 +1,44 @@
+import numpy as np
+
+
+# Let's solve the function f(x, y) = -2x^2 - 3(y - 4)^2
+def fitness(x):
+    return -2 * (x[:, 0] ** 2) - 3 * (x[:, 1] - 4)**2
+
+class Population:
+    def __init__(self, initial_guess, population_size, fitness_fn, learning_rate = 1e-4, sigma = 0.1):
+        self.current_solution = initial_guess
+        self.population_size = population_size
+        self.sigma = sigma
+        self.learning_rate = learning_rate
+        assert self.population_size > 0
+        assert self.sigma > 0
+        self.calculate_fitness = fitness_fn
+
+    def __iter__(self):
+        return self
+
+    # Advance to the next generation
+    def __next__(self):
+        ## Sample Gaussian perturbations around the current solution
+        white_noise = np.random.randn(self.population_size, *self.current_solution.shape)
+        noise = self.sigma * white_noise
+        candidate_solutions = self.current_solution + noise
+        fitness_values = self.calculate_fitness(candidate_solutions)
+        # Mean shift and scale
+        fitness_values = (fitness_values - np.mean(fitness_values)) / (np.std(fitness_values) + np.finfo('float').eps)
+        # Move along the fitness-weighted average of the perturbations
+        new_solution = self.current_solution + self.learning_rate * np.mean(white_noise.T * fitness_values, axis = 1) / self.sigma
+        self.current_solution = new_solution
+        return new_solution
+
+    def item(self):
+        return self.current_solution
+
+
+def test():
+    guess = np.random.randn(2)
+    p = Population(guess, 100, fitness)
+    for i in range(10000):
+        next(p)
+    return p.item()
\ No newline at end of file
diff --git a/ga_model_test.py b/ga_model_test.py
new file mode 100644
index 0000000..5923fad
--- /dev/null
+++ b/ga_model_test.py
@@ -0,0 +1,134 @@
+import random
+import numpy as np
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+from torch.distributions import Categorical
+import gym
+
+class Policy(nn.Module):
+    def __init__(self, state_size, action_size):
+        super(Policy, self).__init__()
+        self.state_size = state_size
+        self.action_size = action_size
+
+        self.fc1 = nn.Linear(state_size, 125)
+        self.fc_norm = nn.LayerNorm(125)
+
+        self.fc2 = nn.Linear(125, 125)
+        self.fc2_norm = nn.LayerNorm(125)
+
+        self.action_prob = nn.Linear(125, action_size)
+
+    def forward(self, x):
+        x = F.relu(self.fc_norm(self.fc1(x)))
+        x = F.relu(self.fc2_norm(self.fc2(x)))
+        x = F.softmax(self.action_prob(x), dim = 1)
+        return x
+
+
+env = gym.make("Acrobot-v1")
+def fitness(model_dict):
+    state_size = env.observation_space.shape[0]
+    action_size = env.action_space.n
+    model = Policy(state_size, action_size)
+    model.load_state_dict(model_dict)
+    state = torch.from_numpy(env.reset()).float().unsqueeze(0)
+    total_reward = 0
+    done = False
+    while not done:
+        action_probabilities = model(state)
+        distribution = Categorical(action_probabilities)
+        action = distribution.sample().item()
+        next_state, reward, done, _ = env.step(action)
+        total_reward += reward
+        state = torch.from_numpy(next_state).float().unsqueeze(0)
+    return total_reward
+
+
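+# Sketch of one generation, as implemented below (a standard generational GA):
+#   1. survivability: fitness of each member, shifted strictly positive and
+#      normalized so it can serve as a selection probability
+#   2. selection: fitness-proportionate (roulette-wheel) sampling of parents
+#   3. crossover: single-point splice of each state-dict tensor from two parents
+#   4. mutation: with probability mutation_rate, add Gaussian noise to a tensor
+#   5. elitism: the keep_best fittest members survive unchanged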
+# model should be an instance of nn.Module; each member of the population
+# is a state dict with the same shapes as model's
+class Population:
+    def __init__(self, model, population_size, fitness_fn, keep_best = 1, mutation_rate = 0.01, sigma = 0.1):
+        self.model = model
+        self.population_size = population_size
+        self.mutation_rate = mutation_rate
+        self.keep_best = keep_best
+        self.sigma = sigma
+        assert self.sigma >= 0
+        assert self.keep_best >= 0
+        assert self.population_size > 0
+        assert self.keep_best < self.population_size
+        self.pop = self._generate_population(model, population_size)
+
+        # Probability that an individual will last to the next generation
+        self.survivability = np.full(population_size, 1 / population_size)
+        self.calculate_fitness = fitness_fn
+
+    def _generate_population(self, model, population_size):
+        pop = []
+        for i in range(population_size):
+            member = {}
+            for key, value in model.state_dict().items():
+                member[key] = value + self.sigma * torch.randn(*value.shape)
+            pop.append(member)
+        return pop
+
+    def _calculate_survivability(self, pop):
+        fitness = np.array(list(map(self.calculate_fitness, pop)))
+        # Shift fitness to be strictly positive so it can act as a probability
+        if fitness.min() <= 0:
+            fitness += (-1 * fitness.min()) + 1e-10 # Add a small constant to avoid zero probabilities
+        return fitness / fitness.sum()
+
+    def _select_survivors(self, population, survivability):
+        population_size = len(population)
+        survivors_indices = np.random.choice(range(0, population_size), size=(population_size - self.keep_best) * 2, p=survivability)
+        return [population[i] for i in survivors_indices]
+
+    def _crossover(self, parents):
+        parent_ind = np.array(range(0, len(parents)))
+        parent1_ind = np.random.choice(parent_ind, size = len(parents) // 2, replace=False)
+        parent2_ind = np.setdiff1d(parent_ind, parent1_ind)
+        parents1 = [parents[i] for i in parent1_ind]
+        parents2 = [parents[i] for i in parent2_ind]
+        children = []
+        for parent1, parent2 in zip(parents1, parents2):
+            child = {}
+            for key in parent1.keys():
+                # Single-point crossover along the first dimension of each tensor
+                crossover_ind = random.randint(0, len(parent1[key]))
+                child_value = torch.cat((parent1[key][:crossover_ind], parent2[key][crossover_ind:]))
+                child_value = self._mutate(child_value)
+                child[key] = child_value
+
+            children.append(child)
+        return children
+
+    def _mutate(self, child):
+        if np.random.rand() < self.mutation_rate:
+            child += self.sigma * torch.randn(*child.shape)
+        return child
+
+    def __iter__(self):
+        return self
+
+    # Advance to the next generation
+    def __next__(self):
+        survivability = self._calculate_survivability(self.pop)
+        self.survivability = survivability
+        # Elitism: the keep_best fittest members survive unchanged
+        survivor_ind = np.argsort(survivability)[-self.keep_best:] if self.keep_best > 0 else np.array([], dtype=int)
+        parents = self._select_survivors(self.pop, survivability)
+        children = self._crossover(parents)
+        next_pop = [self.pop[i] for i in survivor_ind] + children
+        self.pop = next_pop
+        return next_pop
+
+    def solution(self):
+        # Recompute survivability, since self.pop has changed since the last selection
+        survivability = self._calculate_survivability(self.pop)
+        return self.pop[int(np.argmax(survivability))]
+
+
+def test():
+    p = Population(Policy(6, 3), 100, fitness)
+    for i in range(100):
+        next(p)
+    return p.solution()
\ No newline at end of file
diff --git a/ga_test.py b/ga_test.py
new file mode 100644
index 0000000..ad1c628
--- /dev/null
+++ b/ga_test.py
@@ -0,0 +1,85 @@
+import random
+import numpy as np
+
+
+# Let's solve the function f(x, y) = -2x^2 - 3(y - 4)^2
+def fitness(x):
+    return -2 * (x[:, 0] ** 2) - 3 * (x[:, 1] - 4)**2
+
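+# f attains its maximum of 0 at (x, y) = (0, 4), so test() at the bottom
+# of this file should converge to a point near (0, 4).
+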
+class Population:
+    def __init__(self, output_size, population_size, fitness_fn, low = 0., high = 1., keep_best = 1, mutation_rate = 0.001):
+        self.population_size = population_size
+        self.output_size = output_size
+        self.low = low
+        self.high = high
+        self.mutation_rate = mutation_rate
+        self.keep_best = keep_best
+        assert self.keep_best >= 0
+        assert self.population_size > 0
+        assert self.keep_best < self.population_size
+        self.pop = self._generate_population(output_size, population_size, low = low, high = high)
+
+        # Probability that an individual will last to the next generation
+        self.survivability = np.full(population_size, 1 / population_size)
+        self.calculate_fitness = fitness_fn
+
+    def _generate_population(self, output_size, population_size, low = 0., high = 1.):
+        return np.random.uniform(low, high, size=(population_size, output_size))
+
+    def _calculate_survivability(self, pop):
+        fitness = self.calculate_fitness(pop)
+        # Shift fitness to be strictly positive so it can act as a probability
+        if fitness.min() <= 0:
+            fitness += (-1 * fitness.min()) + np.finfo('float').eps
+        return fitness / fitness.sum()
+
+    def _select_survivors(self, population, survivability):
+        population_size = len(population)
+        survivors_indices = np.random.choice(range(0, population_size), size=(population_size - self.keep_best) * 2, p=survivability)
+        return population.take(survivors_indices, axis = 0)
+
+    def _crossover(self, parents):
+        parent_ind = np.array(range(0, len(parents)))
+        parent1_ind = np.random.choice(parent_ind, size = len(parents) // 2, replace=False)
+        parent2_ind = np.setdiff1d(parent_ind, parent1_ind)
+        parents1 = parents[parent1_ind]
+        parents2 = parents[parent2_ind]
+        children = []
+        for parent1, parent2 in zip(parents1, parents2):
+            # Single-point crossover between the two parent vectors
+            crossover_ind = random.randint(0, self.output_size)
+            child = np.zeros_like(parent1)
+            child[:crossover_ind] = parent1[:crossover_ind]
+            child[crossover_ind:] = parent2[crossover_ind:]
+            child = self._mutate(child)
+            children.append(child)
+        return np.vstack(children)
+
+    def _mutate(self, child):
+        # Each gene has an independent chance of being resampled uniformly
+        for i in range(len(child)):
+            if np.random.rand() < self.mutation_rate:
+                child[i] = np.random.uniform(self.low, self.high)
+        return child
+
+    def __iter__(self):
+        return self
+
+    # Advance to the next generation
+    def __next__(self):
+        survivability = self._calculate_survivability(self.pop)
+        self.survivability = survivability
+        # Elitism: the keep_best fittest members survive unchanged
+        survivor_ind = np.argsort(survivability)[-self.keep_best:] if self.keep_best > 0 else np.array([], dtype=int)
+        parents = self._select_survivors(self.pop, survivability)
+        children = self._crossover(parents)
+        next_pop = np.concatenate((self.pop.take(survivor_ind, axis = 0), children))
+        self.pop = next_pop
+        return next_pop
+
+    def solution(self):
+        # Recompute survivability, since self.pop has changed since the last selection
+        survivability = self._calculate_survivability(self.pop)
+        return self.pop[int(np.argmax(survivability))]
+
+
+def test():
+    p = Population(2, 100, fitness, low = -10, high = 10)
+    for i in range(10000):
+        next(p)
+    return p.solution()
\ No newline at end of file