From 115543d20103a0ce1e9701298b34fa8955982b50 Mon Sep 17 00:00:00 2001
From: Brandon Rozek
Date: Wed, 13 Feb 2019 00:36:23 -0500
Subject: [PATCH] Fixed parallel implementation of getting experiences by using a queue

---
 examples/acrobot.py              | 22 ++++++++++++++--------
 examples/pong.py                 | 15 +++++++++++----
 rltorch/mp/EnvironmentEpisode.py |  9 ++++-----
 rltorch/mp/EnvironmentRun.py     |  9 ++++-----
 4 files changed, 33 insertions(+), 22 deletions(-)

diff --git a/examples/acrobot.py b/examples/acrobot.py
index e4cbaed..8b33cc1 100644
--- a/examples/acrobot.py
+++ b/examples/acrobot.py
@@ -9,14 +9,14 @@ import rltorch.memory as M
 import rltorch.env as E
 from rltorch.action_selector import ArgMaxSelector
 from tensorboardX import SummaryWriter
-
+import torch.multiprocessing as mp
 
 class Value(nn.Module):
   def __init__(self, state_size, action_size):
     super(Value, self).__init__()
     self.state_size = state_size
     self.action_size = action_size
-    
+
     self.fc1 = rn.NoisyLinear(state_size, 64)
     self.fc_norm = nn.LayerNorm(64)
 
@@ -28,7 +28,6 @@ class Value(nn.Module):
     self.advantage_fc_norm = nn.LayerNorm(64)
     self.advantage = rn.NoisyLinear(64, action_size)
 
-
   def forward(self, x):
     x = F.relu(self.fc_norm(self.fc1(x)))
 
@@ -67,13 +66,16 @@ config['prioritized_replay_sampling_priority'] = 0.6
 # Should ideally start from 0 and move your way to 1 to prevent overfitting
 config['prioritized_replay_weight_importance'] = rltorch.scheduler.ExponentialScheduler(initial_value = 0.4, end_value = 1, iterations = 5000)
 
-def train(runner, agent, config, logwriter = None):
+def train(runner, agent, config, logwriter = None, memory = None):
   finished = False
   episode_num = 1
+  memory_queue = mp.Queue(maxsize = config['replay_skip'] + 1)
   while not finished:
-    runner.run(config['replay_skip'] + 1, printstat = runner.episode_num % config['print_stat_n_eps'] == 0)
+    runner.run(config['replay_skip'] + 1, printstat = runner.episode_num % config['print_stat_n_eps'] == 0, memory = memory_queue)
     agent.learn()
     runner.join()
+    for i in range(config['replay_skip'] + 1):
+      memory.append(*memory_queue.get())
     # When the episode number changes, write out the weight histograms
     if logwriter is not None and episode_num < runner.episode_num:
       episode_num = runner.episode_num
@@ -84,6 +86,7 @@ def train(runner, agent, config, logwriter = None):
     finished = runner.episode_num > config['total_training_episodes']
 
 
+torch.multiprocessing.set_sharing_strategy('file_system') # To not hit file descriptor memory limit
 # Setting up the environment
 rltorch.set_seed(config['seed'])
 print("Setting up environment...", end = " ")
@@ -98,11 +101,14 @@ action_size = env.action_space.n
 logger = rltorch.log.Logger()
 logwriter = rltorch.log.LogWriter(logger, SummaryWriter())
 
+
 # Setting up the networks
 device = torch.device("cuda:0" if torch.cuda.is_available() and not config['disable_cuda'] else "cpu")
 net = rn.Network(Value(state_size, action_size),
     torch.optim.Adam, config, device = device, logger = logger, name = "DQN")
 target_net = rn.TargetNetwork(net, device = device)
+net.model.share_memory()
+target_net.model.share_memory()
 
 # Actor takes a net and uses it to produce actions from given states
 actor = ArgMaxSelector(net, action_size, device = device)
@@ -111,14 +117,14 @@ memory = M.PrioritizedReplayMemory(capacity = config['memory_size'], alpha = con
 # memory = M.ReplayMemory(capacity = config['memory_size'])
 
 # Runner performs a certain number of steps in the environment
-runner = rltorch.mp.EnvironmentRun(env, actor, config, memory = memory, logger = logger, name = "Training")
+runner = rltorch.mp.EnvironmentRun(env, actor, config, logger = logger, name = "Training")
 runner.start()
 
 # Agent is what performs the training
 agent = rltorch.agents.DQNAgent(net, memory, config, target_net = target_net, logger = logger)
 
 print("Training...")
-train(runner, agent, config, logwriter = logwriter)
+train(runner, agent, config, logwriter = logwriter, memory = memory)
 
 # For profiling...
 # import cProfile
@@ -132,4 +138,4 @@ print("Evaluating...")
 rltorch.env.simulateEnvEps(env, actor, config, total_episodes = config['total_evaluation_episodes'], logger = logger, name = "Evaluation")
 print("Evaulations Done.")
 
-logwriter.close() # We don't need to write anything out to disk anymore
\ No newline at end of file
+logwriter.close() # We don't need to write anything out to disk anymore
diff --git a/examples/pong.py b/examples/pong.py
index 06e740a..a9b67e4 100644
--- a/examples/pong.py
+++ b/examples/pong.py
@@ -9,6 +9,7 @@ import rltorch.memory as M
 import rltorch.env as E
 from rltorch.action_selector import ArgMaxSelector
 from tensorboardX import SummaryWriter
+import torch.multiprocessing as mp
 
 class Value(nn.Module):
   def __init__(self, state_size, action_size):
@@ -87,13 +88,16 @@ config['prioritized_replay_sampling_priority'] = 0.6
 # Should ideally start from 0 and move your way to 1 to prevent overfitting
 config['prioritized_replay_weight_importance'] = rltorch.scheduler.ExponentialScheduler(initial_value = 0.4, end_value = 1, iterations = 5000)
 
-def train(runner, agent, config, logwriter = None):
+def train(runner, agent, config, logwriter = None, memory = None):
   finished = False
   episode_num = 1
+  memory_queue = mp.Queue(maxsize = config['replay_skip'] + 1)
   while not finished:
-    runner.run(config['replay_skip'] + 1, printstat = runner.episode_num % config['print_stat_n_eps'] == 0)
+    runner.run(config['replay_skip'] + 1, printstat = runner.episode_num % config['print_stat_n_eps'] == 0, memory = memory_queue)
     agent.learn()
     runner.join()
+    for i in range(config['replay_skip'] + 1):
+      memory.append(*memory_queue.get())
     # When the episode number changes, write out the weight histograms
     if logwriter is not None and episode_num < runner.episode_num:
       episode_num = runner.episode_num
@@ -104,6 +108,7 @@ def train(runner, agent, config, logwriter = None):
     finished = runner.episode_num > config['total_training_episodes']
 
 
+torch.multiprocessing.set_sharing_strategy('file_system') # To not hit file descriptor memory limit
 rltorch.set_seed(config['seed'])
 print("Setting up environment...", end = " ")
 env = E.FrameStack(E.TorchWrap(
@@ -125,6 +130,8 @@ device = torch.device("cuda:0" if torch.cuda.is_available() and not config['disa
 net = rn.Network(Value(state_size, action_size),
     torch.optim.Adam, config, device = device, logger = logger, name = "DQN")
 target_net = rn.TargetNetwork(net, device = device)
+net.model.share_memory()
+target_net.model.share_memory()
 
 # Actor takes a network and uses it to produce actions from given states
 actor = ArgMaxSelector(net, action_size, device = device)
@@ -132,14 +139,14 @@ actor = ArgMaxSelector(net, action_size, device = device)
 memory = M.PrioritizedReplayMemory(capacity = config['memory_size'], alpha = config['prioritized_replay_sampling_priority'])
 
 # Runner performs a certain number of steps in the environment
-runner = rltorch.mp.EnvironmentRun(env, actor, config, memory = memory, logger = logger, name = "Training")
+runner = rltorch.mp.EnvironmentRun(env, actor, config, logger = logger, name = "Training")
 runner.start()
 
 # Agent is what performs the training
 agent = rltorch.agents.DQNAgent(net, memory, config, target_net = target_net, logger = logger)
 
 print("Training...")
-train(runner, agent, config, logwriter = logwriter)
+train(runner, agent, config, logwriter = logwriter, memory = memory)
 
 # For profiling...
 # import cProfile
diff --git a/rltorch/mp/EnvironmentEpisode.py b/rltorch/mp/EnvironmentEpisode.py
index a0f03f2..7be789b 100644
--- a/rltorch/mp/EnvironmentEpisode.py
+++ b/rltorch/mp/EnvironmentEpisode.py
@@ -2,17 +2,16 @@ from copy import deepcopy
 import torch.multiprocessing as mp
 
 class EnvironmentEpisode(mp.Process):
-  def __init__(self, env, actor, config, memory = None, logger = None, name = ""):
+  def __init__(self, env, actor, config, logger = None, name = ""):
    super(EnvironmentEpisode, self).__init__()
    self.env = env
    self.actor = actor
-    self.memory = memory
    self.config = deepcopy(config)
    self.logger = logger
    self.name = name
    self.episode_num = 1
 
-  def run(self, printstat = False):
+  def run(self, printstat = False, memory = None):
    state = self.env.reset()
    done = False
    episode_reward = 0
@@ -21,8 +20,8 @@ class EnvironmentEpisode(mp.Process):
      next_state, reward, done, _ = self.env.step(action)
      episode_reward = episode_reward + reward
 
-      if self.memory is not None:
-        self.memory.append(state, action, reward, next_state, done)
+      if memory is not None:
+        memory.put((state, action, reward, next_state, done))
      state = next_state
 
    if printstat:
diff --git a/rltorch/mp/EnvironmentRun.py b/rltorch/mp/EnvironmentRun.py
index b73f8a2..cb091c3 100644
--- a/rltorch/mp/EnvironmentRun.py
+++ b/rltorch/mp/EnvironmentRun.py
@@ -2,11 +2,10 @@ from copy import deepcopy
 import torch.multiprocessing as mp
 
 class EnvironmentRun(mp.Process):
-  def __init__(self, env, actor, config, memory = None, logger = None, name = ""):
+  def __init__(self, env, actor, config, logger = None, name = ""):
    super(EnvironmentRun, self).__init__()
    self.env = env
    self.actor = actor
-    self.memory = memory
    self.config = deepcopy(config)
    self.logger = logger
    self.name = name
@@ -14,15 +13,15 @@ class EnvironmentRun(mp.Process):
    self.episode_reward = 0
    self.last_state = env.reset()
 
-  def run(self, iterations = 1, printstat = False):
+  def run(self, iterations = 1, printstat = False, memory = None):
    state = self.last_state
    for _ in range(iterations):
      action = self.actor.act(state)
      next_state, reward, done, _ = self.env.step(action)
      self.episode_reward = self.episode_reward + reward
 
-      if self.memory is not None:
-        self.memory.append(state, action, reward, next_state, done)
+      if memory is not None:
+        memory.put((state, action, reward, next_state, done))
      state = next_state
 
    if done:
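For reference, the pattern this patch settles on is: the runner process no longer appends to a replay memory it shares with the trainer; instead it puts each (state, action, reward, next_state, done) transition on a torch.multiprocessing Queue, and the main process drains that queue into the replay memory after runner.join(). Below is a minimal, self-contained sketch of that hand-off outside of rltorch; DummyEnv, the list-backed memory, and the step counts are illustrative assumptions, not part of the library.

import random
import torch.multiprocessing as mp

# Same sharing strategy the patch sets, so many shared objects do not
# exhaust file descriptors.
mp.set_sharing_strategy('file_system')

class DummyEnv:
  # Illustrative stand-in for a gym environment (not part of rltorch).
  def reset(self):
    self.t = 0
    return 0.0
  def step(self, action):
    self.t += 1
    return float(self.t), 1.0, self.t >= 10, {}

def collect(env, memory_queue, steps):
  # Runs in the worker process: act, then hand each transition to the main
  # process through the queue instead of touching its replay memory.
  state = env.reset()
  for _ in range(steps):
    action = random.randrange(2)
    next_state, reward, done, _ = env.step(action)
    memory_queue.put((state, action, reward, next_state, done))
    state = env.reset() if done else next_state

if __name__ == "__main__":
  replay_skip = 4                  # mirrors config['replay_skip']
  memory = []                      # stand-in for M.PrioritizedReplayMemory
  memory_queue = mp.Queue(maxsize = replay_skip + 1)

  worker = mp.Process(target = collect, args = (DummyEnv(), memory_queue, replay_skip + 1))
  worker.start()
  worker.join()                    # like the patch, join before draining; fine for small payloads

  for _ in range(replay_skip + 1):
    memory.append(memory_queue.get())
  print("collected", len(memory), "transitions")

The share_memory() calls the patch adds to net.model and target_net.model serve a separate purpose: they keep the model weights in shared memory so the actor inside the worker process sees the updates made by the trainer. That part is orthogonal to the queue and is omitted from the sketch.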