Sunday, April 14, 2024

FuturesOrdered: Keeping Concurrency in Order In Rust

The use case is as follows: You have some tasks you want to execute concurrently, but you want the results to be returned in the same order the tasks were defined to be executed.

As an example, let's say you have some records returned from the database by some query. Before returning the records, each one needs a transformation, and these transformations can run concurrently. Because concurrent tasks finish in a nondeterministic order, the transformed records can end up in a different order from the one in which they were retrieved. This is something you do not want; you want the transformed records to be in the same order as specified in the original query that retrieved them from the database.

This sounds like the perfect task for Rust's FuturesOrdered type from the futures crate.

The documentation states:

"...it imposes a FIFO order on top of the set of futures. While futures in the set will race to completion in parallel, results will only be returned in the order their originating futures were added to the queue."

Just what we want.

Let us illustrate with some code.

The dependencies:
[dependencies]
futures = "0.3.30"
tokio = { version = "1.37.0", features = ["full"]}
rand = "0.8.5"
The code:
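use std::time::Duration;

use futures::stream::{FuturesOrdered, StreamExt};
use rand::Rng;

// Simulates a slow transformation: sleep 1 to 5 seconds, then return the input.
async fn transform(number: i32) -> i32 {
    let random_number = rand::thread_rng().gen_range(1..=5);
    tokio::time::sleep(Duration::from_secs(random_number)).await;
    println!("Done from {number}");
    number
}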

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut tasks = FuturesOrdered::new();
    // 0..=9 is inclusive, giving ten tasks (0 through 9).
    for n in 0..=9 {
        tasks.push_back(transform(n));
    }
    let mut results = vec![];
    while let Some(result) = tasks.next().await {
        results.push(result)
    }
    println!("{results:?}");
    Ok(())
}

The transform function represents the concurrent task. It sleeps for a random number of seconds between 1 and 5, prints that it is done, and then returns the transformed value, which in this case is just the original value passed to the function.

Running the code above should give something like:

Done from 1
Done from 3
Done from 6
Done from 9
Done from 5
Done from 7
Done from 0
Done from 2
Done from 4
Done from 8
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

The "Done from n" lines show that the tasks finish in a different order from the one in which they were added, but the final printed results are still in order. Even though the tasks execute concurrently and finish out of order, each call to `tasks.next().await` returns the results in the order the tasks were added.
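To make the "finish out of order, return in order" behaviour more concrete, here is a small model of the idea using only the standard library. It is a simplified sketch, not the futures crate's actual code: ReorderBuffer, complete and replay are hypothetical names made up for this illustration. The buffer holds on to any result that finished early and only releases it once every result added before it has been released.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper, not part of the futures crate: holds results that
/// finished early until every earlier-added result has been released.
struct ReorderBuffer<T> {
    next_index: usize,           // index of the next result allowed out
    pending: BTreeMap<usize, T>, // finished results still waiting their turn
}

impl<T> ReorderBuffer<T> {
    fn new() -> Self {
        Self { next_index: 0, pending: BTreeMap::new() }
    }

    /// Record that the task added at position `index` finished with `value`,
    /// and return every result that can now be released in order.
    fn complete(&mut self, index: usize, value: T) -> Vec<T> {
        self.pending.insert(index, value);
        let mut ready = Vec::new();
        while let Some(v) = self.pending.remove(&self.next_index) {
            ready.push(v);
            self.next_index += 1;
        }
        ready
    }
}

/// Feed completions in the given order and collect what gets released.
fn replay(finish_order: &[usize]) -> Vec<usize> {
    let mut buffer = ReorderBuffer::new();
    let mut released = Vec::new();
    for &n in finish_order {
        released.extend(buffer.complete(n, n));
    }
    released
}

fn main() {
    // Completion order taken from the sample run above.
    let released = replay(&[1, 3, 6, 9, 5, 7, 0, 2, 4, 8]);
    println!("{released:?}");
}
```

In this model nothing is released until task 0 completes, which matches the sample run: results 1, 3, 6, 9, 5 and 7 are all ready early, but they have to wait for the results added before them.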

Another way is to collect the results directly into a Vec instead of looping over `next()`. That is:
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut tasks = FuturesOrdered::new();
    for n in 0..=9 {
        tasks.push_back(transform(n))
    }
    let results = tasks.collect::<Vec<i32>>().await;
    println!("{results:?}");
    Ok(())
}

With this, the tasks also execute concurrently and finish in a nondeterministic order, but the results are collected in the order in which the tasks were added.