# How to create a custom Reinforcement Learning Environment in Gymnasium with Ray

Hello everyone, today we are going to discuss how to create a custom Reinforcement Learning (RL) environment with Ray, Pygame and Gymnasium.

Ray is a high-performance distributed execution framework targeted at large-scale machine learning and reinforcement learning applications. It achieves scalability and fault tolerance by abstracting the control state of the system in a global control store and keeping all other components stateless.

We are interested in understanding the building blocks that allow us to create a custom environment for any software we want to build using the RL approach.

I will show several code examples to illustrate the RL algorithm in practice.

There are at least three essential components that we need to understand (a minimal sketch of how they interact follows this list):

1. The environment. Here we define the rules of the game; these are the constraints that we encode in the environment class.
2. The agent. This is the worker, the entity that plays the environment by performing actions. Given an observation, you get a state from which the agent decides what to do by performing an action. Once the action is performed, you get a reward defined by the environment.
3. The RL model. Once we get the state returned by the environment, we need to remember the states (observations) together with their respective rewards. This memorization is handled by any standard RL model.
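
To make these three pieces concrete, here is a minimal sketch of the observation → action → reward loop, using the classic gym API that the rest of this post follows and a random agent standing in for a trained RL model:

```python
import gym

env = gym.make("CartPole-v0")          # any registered environment works here
observation = env.reset()              # initial observation (state)
done = False
while not done:
    action = env.action_space.sample()                   # the agent chooses an action
    observation, reward, done, info = env.step(action)   # the environment returns a reward
env.close()
```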

In order to play with our environments I will use Pygame, which is a cross-platform set of Python modules designed for writing video games. It includes computer graphics and sound libraries designed to be used with the Python programming language.
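
As a minimal sketch of what Pygame provides (a window surface we can draw on, which later becomes the RL game screen), the following opens a small window and draws a blue dot:

```python
import pygame

pygame.init()
screen = pygame.display.set_mode((200, 200))                          # a small game window
screen.fill(pygame.Color(255, 255, 255))                              # white background
pygame.draw.circle(screen, pygame.Color(0, 0, 255), (100, 100), 6)    # a blue dot
pygame.display.update()
pygame.quit()
```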

Let us now set up everything we need to work with these components.

## Step 1. Creation of Python environment

I will create a conda environment called gym, because we are interested in the Gymnasium library.

Gymnasium is an open source Python library for developing and comparing reinforcement learning algorithms by providing a standard API to communicate between learning algorithms and environments, as well as a standard set of environments compliant with that API.

First you need to install Anaconda; then update conda and add the conda-forge channel:

conda update --all

conda config --append channels conda-forge


then create the environment by typing in your terminal

conda create -n gym python==3.8 gym==0.18.0


then

conda activate gym


# Creation of the project

and then we clone our repository

git clone https://github.com/ruslanmv/How-to-create-custom-Reinforcement-Learning-environment.git

cd How-to-create-custom-Reinforcement-Learning-environment


then we install the following libraries

pip install -r requirements.txt


or

pip install torch==1.7.1 keras tensorflow keras-rl2 opencv-python seaborn scipy tensorflow_probability lz4 gputil "ray[rllib]" pandas pygame matplotlib dm_tree


then in your terminal type the following commands:

conda install ipykernel


then we register the kernel and install the snake environment

python -m ipykernel install --user --name gym --display-name "Python (Gym)"

pip install -e snake


then we type

jupyter notebook


and we choose our Python (Gym) kernel.

If you want to uninstall your environment

conda env remove -n gym


and by typing

jupyter kernelspec list


to get the paths of all your kernels. Then simply uninstall the unwanted kernel

jupyter kernelspec uninstall gym


# Creating an OpenAI Gym Environment

Once the Python (Gym) kernel is loaded, you can open the example notebooks.

The first notebook is simply the game for which we want to develop the appropriate environment:

0-Custom-Snake-Game.ipynb

The second notebook is an example of how to initialize the custom environment whose structure is defined in snake_env.py; in our case:

1-Creating-a-Gym-Environment.ipynb

and finally the third notebook is simply an application of the Gym environment to an RL model:

2-Applying-a-Custom-Environment.ipynb

# Custom environment game

We are interested in building a program that will find the best desktop.

The first program is the game for which the gym environment will be developed:

12_many_office_detection.py

After the repo is cloned, open it in Visual Studio Code:

code .


you can open the program

How-to-create-custom-Reinforcement-Learning-environment\custom_game\12_many_office_detection.py


Make sure the gym conda environment is active, run the program by pressing Ctrl+F5, and move the desired desktop with the arrow keys.

# 3 - RL Environment

In this part we are going to build the most essential environment needed to create an RL pipeline.

The first framework that we are going to use is Ray.

We are going to pass either a string name or a Python class to specify an environment. In particular, we are going to choose the simplest local environment.

Custom env classes passed directly to the algorithm must take a single env_config parameter in their constructor:

### Example 1 - Gym + Ray

import gym, ray
from gym import spaces
from ray.rllib.algorithms import ppo

class MyEnv(gym.Env):
    def __init__(self, env_config=None):
        # There are five possible actions; action 2 gives a reward of 1, any other action a reward of -1.
        self.action_space = spaces.Discrete(5)      # <gym.Space>
        self.observation_space = spaces.Discrete(2) # <gym.Space>

    def reset(self):
        state = 0
        # return <obs>
        return state

    def step(self, action):

        # if we took an action, we were in state 1
        state = 1

        if action == 2:
            reward = 1
        else:
            reward = -1

        # regardless of the action, the game is done after a single step
        done = True

        info = {}
        # return <obs>, <reward: float>, <done: bool>, <info: dict>
        return state, reward, done, info
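
Before handing MyEnv to RLlib, we can optionally sanity-check it by hand (a quick sketch, not part of the original notebook):

```python
# Quick local sanity check of MyEnv before training.
env = MyEnv()
obs = env.reset()                                    # -> 0
obs, reward, done, info = env.step(env.action_space.sample())
print(obs, reward, done, info)                       # e.g. 1 -1 True {}
```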

ray.shutdown()
ray.init()
algo = ppo.PPO(env=MyEnv, config={"num_workers": 4})

mean_ppo = []
for _ in range(5):
    result = algo.train()
    print("episode reward mean:", _, result['episode_reward_mean'])
    mean_ppo.append(result['episode_reward_mean'])

episode reward mean: 0 -0.592
episode reward mean: 1 -0.1035
episode reward mean: 2 0.1895
episode reward mean: 3 0.591
episode reward mean: 4 0.7615

import matplotlib.pyplot as plt

xs = [x for x in range(len(mean_ppo))]

plt.plot(xs, mean_ppo)
plt.show()


from ray.rllib.algorithms.ppo import PPOConfig
config = PPOConfig()
config = config.training(gamma=0.9, lr=0.01, kl_coeff=0.3)
config = config.resources(num_gpus=0)
config = config.rollouts(num_rollout_workers=4)
algo = ppo.PPO(env=MyEnv, config=config)

mean_ppo = []
for _ in range(10):
    result = algo.train()
    print("episode reward mean:", _, result['episode_reward_mean'])
    mean_ppo.append(result['episode_reward_mean'])

episode reward mean: 0 -0.6125
episode reward mean: 1 -0.133
episode reward mean: 2 0.1735
episode reward mean: 3 0.533
episode reward mean: 4 0.705
episode reward mean: 5 0.7445
episode reward mean: 6 0.764
episode reward mean: 7 0.8955
episode reward mean: 8 0.919
episode reward mean: 9 0.9695

import matplotlib.pyplot as plt
xs = [x for x in range(len(mean_ppo))]
plt.plot(xs, mean_ppo)
plt.show()


### How to use the trained algorithm in RL with PPO

trainer=algo

checkpoint = trainer.save()

print(checkpoint)

#TODO Fix the windows path
#evaluation = trainer.evaluate(checkpoint)
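
To reuse the saved checkpoint later, you can rebuild the algorithm from it. A hedged sketch: `Algorithm.from_checkpoint` is available in recent Ray versions, while older versions use `algo.restore(checkpoint)` instead.

```python
from ray.rllib.algorithms.algorithm import Algorithm

restored = Algorithm.from_checkpoint(checkpoint)   # rebuild the trained PPO from disk
```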


## Computing actions

env = MyEnv()

done = False
total_reward = 0

observations = env.reset()

print(observations) # The state from which you should determine the action


Given any state, compute the action that yields the maximum reward according to the training:

action = trainer.compute_single_action(observations)

action

while not done:
    action = trainer.compute_single_action(observations)
    observations, reward, done, info = env.step(action)
    total_reward += reward
    print("observations, reward, done, info", observations, reward, done, info)


action = trainer.compute_actions({"obs_1": observations, "obs_2": observations})
print(action)
# {'obs_1': 0, 'ob


### Example 2 - Gym + Ray + Pygame

In order to create an appropriate gym environment that works with Ray and Pygame, we need to pay attention to two gym objects:

- action (object): the action to be performed in the step() function. In a game of chess, the action would be the specific, legal move performed by a player.

- observation (object): all the information available to the agent to choose the next action. The observation is based only on the current state of the environment.

## Observation_space and Action_space

In particular, for the observation_space and action_space: only certain actions and observations are valid for a specific environment.

To define a format, the observation_space and action_space variables need to be assigned to a respective gym.space class.

Spaces can differ in their dimensionality and their value range. Continuous and discrete spaces are both possible.

The structure of the __init__ method of the environment class is:

self.observation_space = <gym.space>
self.action_space = <gym.space>
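
For instance, a minimal illustrative sketch of how these placeholders could be filled in (the class name and the values are hypothetical):

```python
import numpy as np
import gym
from gym import spaces

class PointEnv(gym.Env):
    """Hypothetical example: 4 discrete actions, a 2-D continuous observation."""
    def __init__(self, env_config=None):
        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Box(low=0.0, high=50.0, shape=(2,), dtype=np.float32)
```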


We are going to consider an environment where there are two points, one red and one blue. The purpose of the game is to move the blue point (the worker) so that it intercepts the red point (the target).

## Definition of action space

We want to control the position of the blue point.

So the action is the position: the coordinates that you provide to the environment,

action = [x, y]

The value of each coordinate is continuous and must lie within the bounds of the box:

gym.spaces.Box(low=min_x, high=max_x, shape=(2,), dtype=np.float32)
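
As a quick sanity check, here is a hedged sketch of that action space with concrete bounds (assuming a 50 x 50 window, so min_x = 0 and max_x = 50):

```python
import numpy as np
import gym

min_x, max_x = 0.0, 50.0   # assumed window bounds, for illustration only
action_space = gym.spaces.Box(low=min_x, high=max_x, shape=(2,), dtype=np.float32)
print(action_space.sample())   # a random [x, y] action inside the box
```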

## Analysis of Spaces

Before we continue, let us check some examples of spaces in order to understand how they work.

## Box

Box - Supports continuous (and discrete) vectors or matrices, used for vector observations, images, etc

from gym.spaces import Box, Discrete, MultiBinary, MultiDiscrete
import numpy as np

# Example 1
observation_space = Box(low=-1.0, high=2.0, shape=(3, 4), dtype=np.float32)
print(observation_space.sample().shape)
observation_space.sample()

# Example 2
observation_space = Box(low=np.array([-1.0, -2.0]), high=np.array([2.0, 4.0]), dtype=np.float32)
print(observation_space.sample().shape)
observation_space.sample()

# Example 3
observation_space = Box(low=np.array(-1.0), high=np.array(2.0), dtype=np.float32)
print(observation_space.sample().shape)
observation_space.sample()

# Example 4
observation_space = Box(low=0, high=200, shape=(2,), dtype=np.float32)
print(observation_space.sample().shape)
observation_space.sample()

print(type(observation_space.sample()))


## Discrete

# Example 5
observation_space = Discrete(2)  # {0, 1}
observation_space.sample()

# Example 6
observation_space = Discrete(3)  # {0, 1, 2}
observation_space.sample()


## MultiBinary

# Example 7
observation_space = MultiBinary(5)
print(observation_space.sample().shape)
observation_space.sample()

# Example 8
observation_space = MultiBinary(2)
print(observation_space.sample().shape)
observation_space.sample()


## MultiDiscrete

# Example 9
observation_space = MultiDiscrete(np.array([[1, 2], [3, 4]]))
print(observation_space.sample().shape)
observation_space.sample()


## Text

# Example 10a
from gym.spaces import Text  # the Text space requires a recent gym/gymnasium version
observation_space = Text(5)
# {"0", "42", "0123456789", ...}
observation_space.sample()

# Example 10b
import string
observation_space = Text(min_length=1,
                         max_length=10,
                         charset=string.digits)
observation_space.sample()


## Dict

Elements of this space are (ordered) dictionaries of elements from the constituent spaces

# Example 11
from gym.spaces import Dict, Discrete
observation_space = Dict({"position": Discrete(2), "velocity": Discrete(3)})
observation_space.sample()

# Example 12 [nested]
from gym.spaces import Box, Dict, Discrete, MultiBinary, MultiDiscrete
observation_space = Dict(
    {
        "ext_controller": MultiDiscrete([5, 2, 2]),
        "inner_state": Dict(
            {
                "charge": Discrete(100),
                "system_checks": MultiBinary(10),
                "job_status": Dict(
                    {
                        "progress": Box(low=0, high=100, shape=()),
                    }
                ),
            }
        ),
    }
)

type(observation_space.sample())

observation_space.sample()

# Example 13
from gym.spaces import Box, Dict, Discrete
observation_space = Dict({"position": Box(-1, 1, shape=(2,)), "color": Discrete(3)})

observation_space.sample()


## Tuple

# Example 14
from gym.spaces import Box, Discrete, Tuple
observation_space = Tuple((Discrete(2), Box(-1, 1, shape=(2,))))
observation_space.sample()


## Sequence

# Example 15
from gym.spaces import Box, Sequence
space = Sequence(Box(0, 1))
space.sample()


For further information visit https://gymnasium.farama.org/api/spaces/composite/#gymnasium.spaces.Dict

# Summary

Discrete - Supports a single discrete number of values with an optional start for the values

MultiDiscrete - Supports multiple discrete values with multiple axes, used for controller actions

MultiBinary - Supports single or matrices of binary values, used for holding down a button or if an agent has an object

Text - Supports strings, used for passing agent messages, mission details, etc

Composite spaces: often environment spaces require joining fundamental spaces together for vectorised environments, separate agents, or readability of the space.

Dict - Supports a dictionary of keys and subspaces, used for a fixed number of unordered spaces

Tuple - Supports a tuple of subspaces, used for a fixed number of ordered spaces

Sequence - Supports a variable number of instances of a single subspace, used for entities spaces or selecting a variable number of actions

Graph - Supports graph based actions or observations with discrete or continuous nodes and edge values.

Example: if we want to use an image as the observation, you can use the following:

# The action and observation spaces need to be gym.spaces objects:
self.action_space = Discrete(4)  # up, left, right, down
# Here's an observation space for 200 wide x 100 high RGB image inputs:
self.observation_space = Box(
    low=0, high=255, shape=(100, 200, 3), dtype=np.uint8)
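
As a quick check (a sketch, not part of the original code), you can verify that a raw frame actually fits such a space:

```python
import numpy as np
from gym.spaces import Box

observation_space = Box(low=0, high=255, shape=(100, 200, 3), dtype=np.uint8)
frame = np.zeros((100, 200, 3), dtype=np.uint8)   # e.g. a screenshot converted to an array
print(observation_space.contains(frame))          # True: the frame fits the space
```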


The standard structure of the Ray environment (https://docs.ray.io/en/latest/rllib/rllib-env.html) should be:

import gym, ray
from ray.rllib.algorithms import ppo

class MyEnv(gym.Env):
    def __init__(self, env_config):
        self.action_space = <gym.Space>
        self.observation_space = <gym.Space>
    def reset(self):
        return <obs>
    def step(self, action):
        return <obs>, <reward: float>, <done: bool>, <info: dict>

ray.init()
algo = ppo.PPO(env=MyEnv, config={
    "env_config": {},  # config to pass to env class
})

while True:
    print(algo.train())


## Reinforcement Learning Algorithms in Ray

There are different algorithms that can be used in Ray; among them we choose PPO.


### Proximal Policy Optimization (PPO)

PPO’s clipped objective supports multiple SGD passes over the same batch of experiences. RLlib’s multi-GPU optimizer pins that data in GPU memory to avoid unnecessary transfers from host memory, substantially improving performance over a naive implementation. PPO scales out using multiple workers for experience collection, and also to multiple GPUs for SGD.

(PPO architecture diagram; see the RLlib documentation for more details.)
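
As a rough sketch of how that scaling is expressed in RLlib (the exact keys assume a Ray 2.x PPOConfig; adjust num_rollout_workers and num_gpus to your machine):

```python
from ray.rllib.algorithms.ppo import PPOConfig

config = (
    PPOConfig()
    .rollouts(num_rollout_workers=4)   # parallel workers for experience collection
    .resources(num_gpus=0)             # set > 0 to run SGD on GPUs
    .training(train_batch_size=4000, sgd_minibatch_size=128, num_sgd_iter=30)
)
algo = config.build(env="CartPole-v0")  # any registered env works here
```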

# Custom Example of an Environment with Pygame, Ray and Gym

import gym
from gym import spaces
import numpy as np
import pygame
from pygame import display
from pygame.surfarray import array3d
import random
import sys

BLACK = pygame.Color(0, 0, 0)
WHITE = pygame.Color(255, 255, 255)
RED = pygame.Color(255, 0, 0)
GREEN = pygame.Color(0, 255, 0)
BLUE = pygame.Color(0, 0, 255)
worker_pos = [0, 0]

resolution = 50

class MyEnv(gym.Env):

    def __init__(self, env_config):

        # We initialize the display
        self.frame_size_x = resolution
        self.frame_size_y = resolution
        self.game_window = pygame.display.set_mode((self.frame_size_x, self.frame_size_y))

        # Define action and observation space
        # They must be gym.spaces objects

        # ------------------ACTION SPACE----------------------------------
        # The action space contains the possible actions allowed for the worker.
        # Previously it was a simple number:
        # self.action_space = spaces.Discrete(1)
        # Now in this example an action is a single coordinate pair (x, y)
        self.action_space = gym.spaces.Box(low=0, high=resolution, shape=(2,), dtype=np.int32)  # dtype=np.float32

        # ------------------OBSERVATION SPACE-------------------------------
        # This is what the agent observes as the state.
        # Previously it was a single discrete number:
        # self.observation_space = gym.spaces.Discrete(2)
        # For this example we choose a numpy array of shape=(2,) or an image.
        # Attention!!! The observation space must have the same shape as the observation
        # returned by reset(), otherwise the RL training will fail!!
        self.observation_space = spaces.Box(low=0, high=resolution,
                                            shape=(2,), dtype=np.int32)  # np.float32

        # Initial conditions
        self.game_window.fill(WHITE)
        # Moreover we add a position in the screen display

        # Initial worker position
        self.worker_pos = [0, 0]

        # We assign the rect returned by pygame to the worker circle
        self.worker_rect = pygame.draw.circle(self.game_window, BLUE, (self.worker_pos[0], self.worker_pos[1]), 6)  # DRAW CIRCLE

        # Initial target position
        self.target_pos = [int(resolution / 2), int(resolution / 2)]
        print('Initial target position', self.target_pos[0], self.target_pos[1])
        self.target_rect = pygame.draw.circle(self.game_window, RED, (self.target_pos[0], self.target_pos[1]), 6)  # DRAW CIRCLE
        self.steps = 0

    def reward_value(self, worker, target):
        # Check for collision between the two rects
        if worker.colliderect(target):
            # print("worker, target", worker, target)
            reward = 1.0
            done = True
        else:
            reward = -1.0
            done = False
        return reward

    def step(self, action):
        # reward = 0.0

        self.worker_pos = action
        # print('worker_pos:', self.worker_pos)

        # We update the game state (in other words, plot the points according to the action)
        self.update_game_state()

        # print(self.worker_rect, self.target_rect)

        reward = self.reward_value(self.worker_rect, self.target_rect)

        # the episode is done once the worker intercepts the target (see game_over)
        reward_tmp, done = self.game_over(reward)

        # self.reward = self.reward + reward_tmp
        self.reward = reward_tmp
        info = {}

        # Accumulative reward
        # print('Reward in step:', self.steps, self.reward)

        # -----Under the assumption that we deal with an image---
        # img = self.get_image_array_from_game()
        # state = img
        # --------------------------------------------------------
        self.state = [self.target_pos[0], self.target_pos[1]]

        # print('step:', self.steps)
        self.steps += 1

        observation = np.array(self.state, dtype=np.int32)  # float32
        return observation, self.reward, done, info

    def worker_step(self, event):
        '''
        Takes a human keyboard/mouse event and returns it as an action
        '''
        action = None
        if event.type == pygame.QUIT:
            pygame.quit()
            sys.exit()

        # Move based on mouse clicks
        if event.type == pygame.MOUSEBUTTONDOWN:
            # print(event)
            mouse_x = event.pos[0]
            mouse_y = event.pos[1]
            self.worker_pos[0] = mouse_x
            self.worker_pos[1] = mouse_y
            action = [self.worker_pos[0], self.worker_pos[1]]

        # Drag the object while the mouse button is held down
        if event.type == pygame.MOUSEMOTION and event.buttons[0] == 1:
            # print(event)
            mouse_x = event.pos[0]
            mouse_y = event.pos[1]
            self.worker_pos[0] = mouse_x
            self.worker_pos[1] = mouse_y
            action = [self.worker_pos[0], self.worker_pos[1]]

        elif event.type == pygame.KEYDOWN:
            # Esc -> Create an event to quit the game
            if event.key == pygame.K_ESCAPE:
                pygame.event.post(pygame.event.Event(pygame.QUIT))

        return action

    def update_game_state(self):

        # We fill the screen with white
        self.game_window.fill(WHITE)

        # Draw circles at the rect positions of each object
        # For the worker
        self.worker_rect.x = self.worker_pos[0]
        self.worker_rect.y = self.worker_pos[1]
        self.worker_rect = pygame.draw.circle(self.game_window, BLUE, (self.worker_rect.x, self.worker_rect.y), 6)  # DRAW CIRCLE

        # For the target
        self.target_rect.x = self.target_pos[0]
        self.target_rect.y = self.target_pos[1]
        pygame.draw.circle(self.game_window, RED, (self.target_rect.x, self.target_rect.y), 6)  # DRAW CIRCLE

    def get_image_array_from_game(self):
        img = array3d(display.get_surface())
        # Preprocessing of channels (needed for tensorflow)
        img = np.swapaxes(img, 0, 1)
        return img

    def reset(self):

        # Type 1 Observation
        # print('Target position', self.target_pos[0], self.target_pos[1])
        # observation = np.array([self.target_pos[0], self.target_pos[1]])

        # Type 2 Observation: a sample from the observation space
        # self.state = self.observation_space.sample()
        # observation = np.array(self.state, dtype=np.int32)  # float32

        # Type 3 Observation: from a random sit
        # sit_random = np.random.randint(1, 9)
        # self.state['x_position'][sit_random:] = 0
        # self.state['y_position'][sit_random:] = 0
        # observation = self.state

        # Type 4 Observation: from the target position (the one used here)
        self.state = [self.target_pos[0], self.target_pos[1]]
        observation = np.array(self.state, dtype=np.int32)  # float32

        # Type 5 Observation: the game screen as an image
        # img = array3d(display.get_surface())
        # img = np.swapaxes(img, 0, 1)
        # observation = img

        self.steps = 0
        self.reward = 0

        # print("Game Reset.")
        # print('observation', observation)
        return observation

    def render(self, mode='human'):
        if mode == "human":
            display.update()

    def close(self):
        pass

    def game_over(self, reward):
        # Returns (reward, done): done is True only when the worker hits the target
        # (reward == 1) within the first 1000 steps; otherwise the reward is -1 and
        # the episode continues.
        if (reward < 1) or (self.steps >= 1000):
            return -1, False
        else:
            return reward, True

pygame 2.1.2 (SDL 2.0.18, Python 3.8.0)
Hello from the pygame community. https://www.pygame.org/contribute.html


# Testing Environment

# method 1 - use local test class
# Testing local frame
env = MyEnv(env_config={})

Initial target position 25 25

test=env.reset()

test.shape

(2,)

action = env.action_space.sample()
print('action',action)

action [23 38]

#action=[100,100]
state, reward, done, info = env.step(action)
print("Reward = {} with action = {}".format(reward,action))
import matplotlib.pyplot as plt
print(reward, done, info)
#state = np.array(state)

print(state,type(state))
#plt.figure()
#plt.imshow(state)

Reward = 1.0 with action = [23 38]
1.0 True {}
[25 25] <class 'numpy.ndarray'>

# Testing multiple frames
import sys
env = MyEnv(env_config={})
env.reset()
# This is technically an FPS refresh rate
FPS = 10
# FPS (frames per second) controller
fps_controller = pygame.time.Clock()
# Checks for errors encountered
check_errors = pygame.init()
# Initialise game window
pygame.display.set_caption('Testing Game')
# The main game loop
running = True
while running:
    # Check input from the human step
    for event in pygame.event.get():
        action = env.worker_step(event)
        if event.type == pygame.QUIT:
            running = False
        pygame.display.update()
        if action is not None:
            state, reward, done, info = env.step(action)
            print("Reward = {} with action = {} , done = {}".format(reward, action, done))
    # Refresh game screen
    # Refresh rate
    fps_controller.tick(FPS)
    img = array3d(env.game_window)

Initial target position 25 25
Reward = -1 with action = [13, 17] , done = False
Reward = -1 with action = [32, 14] , done = False
Reward = 1.0 with action = [39, 33] , done = True
Reward = 1.0 with action = [27, 40] , done = True
Reward = -1 with action = [16, 39] , done = False
Reward = -1 with action = [17, 9] , done = False
Reward = -1 with action = [43, 14] , done = False
Reward = 1.0 with action = [24, 24] , done = True


# Ray testing

config={"env_config": {},}  # config to pass to env class

import gym, ray
from ray.rllib.algorithms import ppo
ray.shutdown()
ray.init()
algo = ppo.PPO(env=MyEnv, config=config
)

mean_ppo = []
for _ in range(25):
    result = algo.train()
    # print("episode reward mean:", _, result['episode_reward_mean'])
    mean_ppo.append(result['episode_reward_mean'])

import matplotlib.pyplot as plt

xs = [x for x in range(len(mean_ppo))]

plt.plot(xs, mean_ppo)
plt.show()



## Computing actions

# Testing local frame
env = MyEnv(env_config={})

Initial target position 25 25

done = False
total_reward = 0

observations = env.reset()

print(observations) # The state from which you should determine the action

[25 25]


Given any state, compute the action that yields the maximum reward according to the training:

action = algo.compute_single_action(observations)

action

array([37, 23])

while not done:
    action = algo.compute_single_action(observations)
    observations, reward, done, info = env.step(action)
    total_reward += reward
    print("observations, reward, done, info", observations, reward, done, info)


action = algo.compute_actions({"obs_1": observations, "obs_2": observations})
print(action)
# {'obs_1': 0, 'ob

{'obs_1': array([23, 40]), 'obs_2': array([25, 20])}


Congratulations! We have created several custom environments and trained them with Reinforcement Learning using Ray.
