fetchNext

Retrieve the next available unread output from reinforcement learning environment simulations running on workers

Since R2022a

    Description

    [idx,out] = fetchNext(F) blocks the command prompt and waits for an unread element of F (which corresponds to a simulation scheduled on a worker) to reach a finished state. It then returns the index idx of the simulation that finished and the corresponding output out, which is a scalar structure consistent with the output of runEpisode.

    [idx,out] = fetchNext(F,timeout) waits for a maximum of timeout seconds for a result to become available. If no simulation is finished after timeout seconds, then both outputs are returned as empty arrays.
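
The timeout syntax lends itself to polling: call fetchNext repeatedly with a short timeout and do other work whenever both outputs come back empty. The following is a minimal sketch, not a definitive pattern. It reuses the environment and agent file from the second example on this page; the variable names F and nPolls and the 0.5-second timeout are illustrative choices.

```matlab
% Set up an environment and agent (the same ones used in the second example).
env = rlPredefinedEnv("CartPole-Discrete");
load("MATLABCartpoleAC.mat","agent")
pp = parpool(2);
setup(env,UseParallel=true);

% Schedule one simulation; runEpisode returns a Future object.
F = runEpisode(env,agent,CleanupPostSim=false);

% Poll every 0.5 seconds until the result is available.
idx = [];
nPolls = 0;
while isempty(idx)
    [idx,out] = fetchNext(F,0.5);   % empty idx and out mean the wait timed out
    nPolls = nPolls + 1;
    % Other work could go here between polls.
end
fprintf("Result %d retrieved after %d poll(s).\n",idx,nPolls);

% Clean up in the reverse order of creation.
clear F
clear env
delete(pp)
```

Compared with the syntax without a timeout, this keeps the command prompt responsive between polls at the cost of a small delay in noticing a finished simulation.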

    Examples

    This example shows how to use fetchNext to retrieve the next available unread result from reinforcement learning environment simulations running on workers.

    Load a predefined environment and a suitable agent. For this example, use both the environment and the agent described in Train MBPO Agent to Balance Cart-Pole System.

    env = rlPredefinedEnv("CartPole-Continuous");
    load("MATLABCartpoleMBPO.mat","agent");

    Start a parallel pool and set up the environment so that it simulates on workers.

    pp = parpool(2);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 2 workers.
    
    setup(env,UseParallel=true);

    To record the completion time of each simulation, start a timer.

    tic

    Initialize two vectors to store the average reward values and the completion time for each simulation, respectively.

    avr = zeros(6,1); 
    sct = zeros(6,1);

    Schedule six simulations to run on the available workers. At the beginning of each simulation, the reset function of the environment sets the initial angle of the pole randomly around zero (the upward position), thereby ensuring that each simulation is unique.

    for i=1:6  
        ftr(i) = runEpisode(env,agent,CleanupPostSim=false); 
    end

    Use fetchNext in a loop to retrieve results.

    while ~all([ftr.Read] == true)
    
        % Wait until an output is available then retrieve it 
        [i,out] = fetchNext(ftr);
    
        % Store the simulation completion time
        sct(i) = toc;
    
        % Store the average reward value
        avr(i) = mean([out.AgentData.Experiences.Reward]);
    
    end

    Plot average reward and timing for each simulation.

    figure
    
    subplot(2,1,1);
    plot(sct)
    title('Simulation completion times (seconds)');
    
    subplot(2,1,2)
    plot(avr)
    title('Average reward value');
    
    xlabel('Simulation number');

    [Figure: two axes. Top: "Simulation completion times (seconds)", one line. Bottom: "Average reward value", x-axis "Simulation number", one line.]

    As expected, simulations run in parallel (and therefore terminate at about the same time) in groups of two.

    Clear the array of Future objects, clear the environment, and then delete the parallel pool (the reverse of the order in which they were created).

    clear ftr
    clear env
    delete(pp)

    This example shows how to use Future objects and their methods fetchNext, fetchOutputs, cancel, and wait to defer output retrieval for environment simulations running on workers. With these methods you can monitor the status of ongoing simulations, fetch the outputs of completed simulations, cancel ongoing simulations, or wait for ongoing simulations to complete.

    Load a predefined environment and a suitable agent. For this example, use both the environment and the agent described in Train AC Agent to Balance Cart-Pole System.

    env = rlPredefinedEnv("CartPole-Discrete");
    load("MATLABCartpoleAC.mat","agent")

    Start a parallel pool and set up the environment so that it simulates on workers.

    pp = parpool(2);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 2 workers.
    
    setup(env,UseParallel=true);

    To display the simulation completion times, start a timer.

    tic

    Schedule six simulations to run on the available workers. At the beginning of each simulation, the reset function of the cart-pole environment sets the initial angle of the pole to a random position in the neighborhood of zero (the upward position). This randomization ensures that each simulation is different.

    for i=1:6  
        ftr(i) = runEpisode(env,agent,CleanupPostSim=false); 
    end

    Each element of the Future array ftr represents a scheduled simulation.

    ftr
    ftr=1×6 object
      1×6 Future array with properties:
    
        Read
        State
        Diary
        ID
    
    

    Display the state of each simulation.

    ftr.State
    ans = 
    'running'
    
    ans = 
    'running'
    
    ans = 
    'queued'
    
    ans = 
    'queued'
    
    ans = 
    'queued'
    
    ans = 
    'queued'
    

    Two simulations are ongoing while the others are queued.
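
For larger arrays, a tally of states is easier to scan than one display per element. The sketch below is illustrative: the vector states is typed in by hand from the display above so that the snippet runs on its own, and the variable names are arbitrary. With a live Future array, the same vector can presumably be built as string({ftr.State}), assuming State is a character vector for every element, as the display suggests.

```matlab
% States copied from the display above (hand-typed stand-in for string({ftr.State})).
states = ["running" "running" "queued" "queued" "queued" "queued"];

% Count the simulations in each state.
nRunning  = nnz(states == "running");
nQueued   = nnz(states == "queued");
nFinished = nnz(states == "finished");

fprintf("%d running, %d queued, %d finished\n",nRunning,nQueued,nFinished);
```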

    Use fetchNext with a timeout of 0.1 seconds to retrieve results for simulations that complete within that time (if any).

    [idx,out] = fetchNext(ftr,0.1)
    idx =
    
         []
    
    
    out =
    
         []
    

    Both outputs are empty, which means that none of the six simulations has completed yet.

    Display which outputs have already been retrieved.

    ftr.Read
    ans = logical
       0
    
    
    ans = logical
       0
    
    
    ans = logical
       0
    
    
    ans = logical
       0
    
    
    ans = logical
       0
    
    
    ans = logical
       0
    
    

    Use fetchNext without any timeout to wait until an unretrieved simulation output becomes available and then return the results.

    [idx,out] = fetchNext(ftr)
    idx = 2
    
    out = struct with fields:
        SimulationInfo: [1×1 struct]
             AgentData: [1×1 struct]
    
    

    Display the state of the simulations.

    ftr.State
    ans = 
    'finished'
    
    ans = 
    'finished'
    
    ans = 
    'running'
    
    ans = 
    'running'
    
    ans = 
    'queued'
    
    ans = 
    'queued'
    

    As expected, the first two simulations, which were running in parallel on the two workers, are finished, while the next two, which were previously queued, are now running, and the final two are still queued.

    Display the time taken for the first two simulations to complete.

    toc
    Elapsed time is 10.451231 seconds.
    

    Note that once the results from a simulation have been retrieved, any attempt to use fetchNext to retrieve them again, such as fetchNext(ftr(2)), results in an error. To retrieve the results from a Future object that has already been read, use fetchOutputs, such as fetchOutputs(ftr(2)).
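
One way to avoid this error is to check the Read property before choosing the retrieval function. The helper below is a hypothetical sketch (getOutput is not a toolbox function); it assumes f is a single Future object returned by runEpisode that has not been canceled.

```matlab
function out = getOutput(f)
% getOutput  Hypothetical helper: return the output of one Future object,
% whether or not it has been read before.
    if f.Read
        % Already read: fetchNext would error, so use fetchOutputs.
        out = fetchOutputs(f);
    else
        % Not read yet: fetchNext waits for completion and returns the output.
        [~,out] = fetchNext(f);
    end
end
```

For example, out2 = getOutput(ftr(2)) works both the first time it is called and on any later call.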

    Retrieve the next available result, and display the time elapsed since the simulations started.

    [idx,out] = fetchNext(ftr)
    idx = 1
    
    out = struct with fields:
        SimulationInfo: [1×1 struct]
             AgentData: [1×1 struct]
    
    
    toc
    Elapsed time is 11.945070 seconds.
    

    As expected, fetchNext promptly returns the results from the first simulation (idx = 1), since they were already available.

    Display which outputs have already been retrieved.

    ftr.Read
    ans = logical
       1
    
    
    ans = logical
       1
    
    
    ans = logical
       0
    
    
    ans = logical
       0
    
    
    ans = logical
       0
    
    
    ans = logical
       0
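
Because fetchNext only returns unread results, the Read flags also show which elements it can still return. The following is a minimal sketch that uses a hand-typed copy of the flags displayed above so that it runs on its own. With a live array, [ftr.Read] should give the same vector; the variable names are illustrative.

```matlab
% Read flags copied from the display above (stand-in for [ftr.Read]).
readFlags = logical([1 1 0 0 0 0]);

nRead     = nnz(readFlags);     % number of outputs already retrieved
unreadIdx = find(~readFlags);   % elements that fetchNext can still return

fprintf("%d of %d outputs retrieved; unread elements: %s\n", ...
    nRead,numel(readFlags),mat2str(unreadIdx));
```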
    
    

    Cancel the last simulation.

    cancel(ftr(6))

    Wait for the fourth simulation to complete. The wait function blocks the command prompt until the fourth simulation is completed.

    wait(ftr(4))

    Display the elapsed time since the simulations started.

    toc
    Elapsed time is 12.414076 seconds.
    

    Display the state of the simulations.

    ftr.State
    ans = 
    'finished'
    
    ans = 
    'finished'
    
    ans = 
    'finished'
    
    ans = 
    'finished'
    
    ans = 
    'running'
    
    ans = 
    'finished'
    

    The State property of the last element of the array, for which the simulation has been canceled, is 'finished'.

    Since any attempt to retrieve results from a simulation that has been canceled will result in an error, remove the canceled object from the array.

    ftr(6)=[]
    ftr=1×5 object
      1×5 Future array with properties:
    
        Read
        State
        Diary
        ID
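
If you do not know in advance which elements were canceled, an alternative to deleting them by hand is to retrieve outputs one element at a time and catch the error. This is a sketch under the assumption stated above, namely that retrieving results from a canceled simulation throws an error. The name fetchOutputsSafely is hypothetical, not a toolbox function.

```matlab
function [outs,failed] = fetchOutputsSafely(F)
% fetchOutputsSafely  Hypothetical helper: fetch outputs element by element,
% skipping elements whose retrieval throws an error (for example, canceled ones).
    n = numel(F);
    outs = cell(n,1);       % outs{k} stays empty if element k fails
    failed = false(n,1);    % failed(k) is true if element k threw an error
    for k = 1:n
        try
            outs{k} = fetchOutputs(F(k));   % waits for element k to finish
        catch err
            failed(k) = true;
            warning("Skipping element %d: %s",k,err.message);
        end
    end
end
```

The cell array keeps one slot per element of F, so the indices of the outputs still match the indices of the original simulations.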
    
    

    Use fetchOutputs to wait until all remaining simulations are completed and then retrieve all outputs.

    outs = fetchOutputs(ftr)
    outs=5×1 struct array with fields:
        SimulationInfo
        AgentData
    
    

    Display the elapsed time.

    toc
    Elapsed time is 16.265069 seconds.
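
The per-simulation average reward computed in the first example can also be obtained from the array returned by fetchOutputs. The sketch below uses a small hand-built structure array with made-up reward values, which mimics only the AgentData.Experiences.Reward nesting used earlier on this page, so that it runs without a parallel pool. With real results, apply the same arrayfun call to outs.

```matlab
% Hypothetical stand-in for the output of fetchOutputs: two simulations
% with three experiences each. The reward values are made up.
mockOuts = [ ...
    struct("AgentData",struct("Experiences",struct("Reward",{1,1,1}))); ...
    struct("AgentData",struct("Experiences",struct("Reward",{1,1,-5})))];

% Average reward of each simulation (same expression as in the first example).
avr = arrayfun(@(o) mean([o.AgentData.Experiences.Reward]),mockOuts);
disp(avr)
```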
    

    Plot the action and observations from the fifth simulation.

    figure
    
    subplot(2,1,1);
    plot(outs(5).AgentData.Time(2:end), ...
         cell2mat([outs(5).AgentData.Experiences.Action]))
    title('Simulation #5: action');
    xlabel('time');
    
    subplot(2,1,2)
    plot(outs(5).AgentData.Time(2:end), ...
         cell2mat([outs(5).AgentData.Experiences.Observation]))
    title('Simulation #5: observations')
    xlabel('time');

    [Figure: two axes. Top: "Simulation #5: action", one line. Bottom: "Simulation #5: observations", four lines.]

    Clear the array of Future objects, clear the environment, and then delete the parallel pool (the reverse of the order in which they were created).

    clear ftr
    clear env
    delete(pp)

    Input Arguments

    Future simulation outputs, specified as a Future object or as an array of Future objects. To create an element of F, set the UseParallel property of a reinforcement learning environment to true, and then use runEpisode to simulate an agent or a policy within this environment. Assign the output of runEpisode to the element of F.

    Note

    fetchNext can only retrieve results from elements of F that have not been read before. Any attempt to retrieve results from an element that has its Read property set to true will result in an error.

    Maximum number of seconds that fetchNext waits for a result to become available (the timeout argument), specified as a positive scalar.

    Example: 5

    Output Arguments

    Index of the finished simulation that has its output out returned by fetchNext, returned as a positive integer scalar.

    Output returned from the finished simulation indicated by idx, returned as a structure with the fields AgentData and SimulationInfo, as described in the outputs section of runEpisode.

    Version History

    Introduced in R2022a