開発

[rust] tokioで様々なasync/awaitの使い方を試してみる

前回のasync/awaitを使ったコードが非常に遅かったので様々な使い方を試してどうすれば早く処理できるか探ってみます。

tokioの基本動作

そもそも適当にasync/awaitなコードを書いたので根本のtokioのランタイム動作から学び直してみます。そこから得られた理解としては、単純にasyncな関数を書いてもその処理は別のスレッドが処理してくれるわけではない、ということ。

tokio::task::spawnなどを使用しないとでスケジューラーがFutureを別スレッドへ割り当ててくれないので効率的に処理してくれない、ということらしいです。

スレッドが配達員、Futureが荷物、tokioランタイムが配達員を管理するマネージャーだとすると、重量の異なるそれぞれの荷物を配達員が目的地まで運ぶことに例えることができるかもしれません。

マネージャーが配達員をアサインして配達員が担当する荷物を運ぶ。しかし、荷物は一人の配達員が終着まで運ぶのではなく途中交代される可能性がある。荷物の配達員の負荷を見てマネージャーがそのあたりを調整する。無理矢理シーケンス図にするとこんな感じだろうか。

spawnは荷物の配送を依頼するようなもの、と理解するとわかりやすいかもしれません。依頼しないとシングルスレッドで動かすことと同じになるのでパフォーマンスが上がらない、というのがざっくりとした理解です。

ということで、spawnを使用して多くのFutureをランタイムへ送り込めば上手い具合に処理してくれるであろうという理解の元、様々なコードを試してみます。ちなみにマルチスレッドプログラミングもやったことはないので手探りです。

再帰処理を使用しないパターン

async関数の中からasync関数を呼ぶとデッドロックしそうなので再帰処理をしないコードで書き直してみました。グローバル変数にディレクトリ情報を管理させることでループ処理にしています。これを基本として様々な書き方を試していきたいと思います。

use anyhow::Result; // 1.0.71
use clap::Parser; // 4.3.11

use tokio::fs; // 1.32.0
use tokio::sync::{Mutex, OnceCell};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// target directory
    #[arg(short, long, default_value_t = String::from(".") )]
    dir: String,
}

static LIST_FROM_THD: tokio::sync::OnceCell<tokio::sync::Mutex<Vec<String>>> = OnceCell::const_new();

#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();

    let dir_string = args.dir.to_string();
    println!("{}", dir_string);

    let _ = LIST_FROM_THD.set(Mutex::new(Vec::<String>::new()));
    let mut dir_list;

    // Get lock and push dir
    {
        let mut lock = LIST_FROM_THD
            .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
            .await
            .lock()
            .await;
        lock.push(dir_string);
    }

    // Like a do-while
    while {
        // Get lock and take array as snapshot
        {
            let mut lock = LIST_FROM_THD
                .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
                .await
                .lock()
                .await;
            dir_list = lock.to_vec();
            lock.clear();
        }

        !dir_list.is_empty()
    } {
        let mut dir_list = dir_list.iter();
        while let Some(item) = dir_list.next() {
            let _ = get_dirs(item.to_string()).await?;
        }
    }

    Ok(())
}

async fn get_dirs(dir: String) -> Result<()> {
    let mut entries = fs::read_dir(dir).await?;

    // Folder list
    let mut dirs = Vec::new();

    while let Some(entry) = entries.next_entry().await? {
        let metadata = entry.metadata().await?;
        let path = entry.path();

        if metadata.is_dir() {
            let path = path.display().to_string();
            println!("{}", path);
            dirs.push(path);

            // continue;
        }

        if let Ok(symlink) = fs::read_link(&path).await {
            if path.is_dir() {
                println!("{}@ -> {}", path.display(), symlink.display());
            }
        }
    }

    // Get lock and add array
    {
        let mut lock = LIST_FROM_THD
            .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
            .await
            .lock()
            .await;
        lock.append(&mut dirs);
    }

    Ok(())
}

リファレンスとしてmultitimeで50回ループして計測してみます。ただ後述しますが、環境条件によって大きく値が変わる場合があるので参考としてみておいた方がよいです。

参考としてvCPUが4個の環境です。

ちなみにmtimeというRustポーティングがあるのですが動作は少し怪しかったです。

MeanStd.Dev.MinMedianMax
real34.5450.55634.07834.47638.239
user7.6450.6976.1957.639.068
sys38.6310.85437.21838.6142.437

ちなみにasync/awaitを使わないバージョンの場合は、

MeanStd.Dev.MinMedianMax
real0.6910.3180.6310.6432.912
user0.1190.0230.0760.1160.176
sys0.5470.1380.4550.5321.496

async/awaitバージョンは約60倍遅いですね。

関数内の処理を分割してspawnする

1つのブロックをspawn

最初のコードはget_dirs関数をまるごとasync関数にしてみましたが、get_dirsのループ処理は大きく2つのブロックに分かれるので、そのうちの一つをspawnすることで別スレッドでの処理を狙ってみたいと思います。

spawnする処理で共通する配列に書き込むのでArcでテンポラリの配列を共有することにします。

結果としては、

MeanStd.Dev.MinMedianMax
real36.1840.78834.63936.19538.428
user7.4030.7515.7887.4129.604
sys40.8451.32536.8940.76644.002

さらに若干遅くなりました。

use anyhow::Result; // 1.0.71
use clap::Parser; // 4.3.11

use std::sync::Arc;

use tokio::fs; // 1.32.0
use tokio::sync::{Mutex, OnceCell};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// target directory
    #[arg(short, long, default_value_t = String::from(".") )]
    dir: String,
}

static LIST_FROM_THD: tokio::sync::OnceCell<tokio::sync::Mutex<Vec<String>>> = OnceCell::const_new();

#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();

    let dir_string = args.dir.to_string();
    println!("{}", dir_string);

    let _ = LIST_FROM_THD.set(Mutex::new(Vec::<String>::new()));
    let mut dir_list;

    // Get lock and push dir
    {
        let mut lock = LIST_FROM_THD
            .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
            .await
            .lock()
            .await;
        lock.push(dir_string);
    }

    // Like a do-while
    while {
        // Get lock and take array as snapshot
        {
            let mut lock = LIST_FROM_THD
                .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
                .await
                .lock()
                .await;
            dir_list = lock.to_vec();
            lock.clear();
        }

        !dir_list.is_empty()
    } {
        let mut dir_list = dir_list.iter();
        while let Some(item) = dir_list.next() {
            let _ = get_dirs(item.to_string()).await?;
        }
    }

    Ok(())
}

async fn get_dirs(dir: String) -> Result<()> {
    let mut entries = fs::read_dir(dir).await?;

    // Folder list
    let dirs = Arc::new(Mutex::new(Vec::new()));

    while let Some(entry) = entries.next_entry().await? {
        let metadata = entry.metadata().await?;
        let path = entry.path();

        let _ = tokio::spawn(
            (|path: std::path::PathBuf, dirs: std::sync::Arc<Mutex<Vec<String>>>| async move {
                if metadata.is_dir() {
                    let path = path.display().to_string();
                    println!("{}", path);

                    // Get lock and add array
                    {
                        let mut lock = dirs.lock().await;
                        lock.push(path);
                    }
                    // continue;
                }
            })(path.clone(), Arc::clone(&dirs)),
        )
        .await;

        if let Ok(symlink) = fs::read_link(&path).await {
            if path.is_dir() {
                println!("{}@ -> {}", path.display(), symlink.display());
            }
        }
    }

    // Get lock and add array
    {
        let mut lock = LIST_FROM_THD
            .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
            .await
            .lock()
            .await;
        let mut dirs = dirs.lock().await;
        lock.append(&mut dirs);
    }

    Ok(())
}

2つのブロックをspawn

今度は2つのブロックを両方ともspawnしてみます。

結果としては、

MeanStd.Dev.MinMedianMax
real86.7731.27883.40686.94789.449
user18.7940.91117.28918.65820.741
sys119.0492.265112.294119.183122.94

めちゃくちゃ遅くなりました。約150倍ぐらいでしょうか。

use anyhow::Result; // 1.0.71
use clap::Parser; // 4.3.11

use std::sync::Arc;

use tokio::fs; // 1.32.0
use tokio::sync::{Mutex, OnceCell};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// target directory
    #[arg(short, long, default_value_t = String::from(".") )]
    dir: String,
}

static LIST_FROM_THD: tokio::sync::OnceCell<tokio::sync::Mutex<Vec<String>>> = OnceCell::const_new();

#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();

    let dir_string = args.dir.to_string();
    println!("{}", dir_string);

    let _ = LIST_FROM_THD.set(Mutex::new(Vec::<String>::new()));
    let mut dir_list;

    // Get lock and push dir
    {
        let mut lock = LIST_FROM_THD
            .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
            .await
            .lock()
            .await;
        lock.push(dir_string);
    }

    // Like a do-while
    while {
        // Get lock and take array as snapshot
        {
            let mut lock = LIST_FROM_THD
                .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
                .await
                .lock()
                .await;
            dir_list = lock.to_vec();
            lock.clear();
        }

        !dir_list.is_empty()
    } {
        let mut dir_list = dir_list.iter();
        while let Some(item) = dir_list.next() {
            let _ = get_dirs(item.to_string()).await?;
        }
    }

    Ok(())
}

async fn get_dirs(dir: String) -> Result<()> {
    let mut entries = fs::read_dir(dir).await?;

    // Folder list
    let dirs = Arc::new(Mutex::new(Vec::new()));

    while let Some(entry) = entries.next_entry().await? {
        let metadata = entry.metadata().await?;
        let path = entry.path();

        let _ = tokio::spawn(
            (|path: std::path::PathBuf, dirs: std::sync::Arc<Mutex<Vec<String>>>| async move {
                let path = path.display().to_string();
                if metadata.is_dir() {
                    println!("{}", path);

                    // Get lock and add array
                    {
                        let mut lock = dirs.lock().await;
                        lock.push(path);
                    }
                    //                continue;
                }
            })(path.clone(), Arc::clone(&dirs)),
        )
        .await;

        let _ = tokio::spawn((|| async move {
            if let Ok(symlink) = fs::read_link(&path).await {
                if path.is_dir() {
                    println!("{}@ -> {}", path.display(), symlink.display());
                }
            }
        })())
        .await;
    }

    // Get lock and add array
    {
        let mut lock = LIST_FROM_THD
            .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
            .await
            .lock()
            .await;
        let mut dirs = dirs.lock().await;
        lock.append(&mut dirs);
    }

    Ok(())
}

2つのブロックをspawnしてjoin

今度はjoinを使用してみます。joinを使わないと並行処理にならない様なので2つのブロックのspawnをjoinしてみます。

結果としては、

MeanStd.Dev.MinMedianMax
real65.7481.04465.10265.51172.151
user10.5470.4759.64310.47811.71
sys100.71.04399.184100.507105.5

若干早くなりましたがやはり依然として遅いです。

use anyhow::Result; // 1.0.71
use clap::Parser; // 4.3.11

use std::sync::Arc;

use tokio::fs; // 1.32.0
use tokio::sync::{Mutex, OnceCell};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// target directory
    #[arg(short, long, default_value_t = String::from(".") )]
    dir: String,
}

static LIST_FROM_THD: tokio::sync::OnceCell<tokio::sync::Mutex<Vec<String>>> = OnceCell::const_new();

#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();

    let dir_string = args.dir.to_string();
    println!("{}", dir_string);

    let _ = LIST_FROM_THD.set(Mutex::new(Vec::<String>::new()));
    let mut dir_list;

    // Get lock and push dir
    {
        let mut lock = LIST_FROM_THD
            .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
            .await
            .lock()
            .await;
        lock.push(dir_string);
    }

    // Like a do-while
    while {
        // Get lock and take array as snapshot
        {
            let mut lock = LIST_FROM_THD
                .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
                .await
                .lock()
                .await;
            dir_list = lock.to_vec();
            lock.clear();
        }

        !dir_list.is_empty()
    } {
        let mut dir_list = dir_list.iter();
        while let Some(item) = dir_list.next() {
            let _ = get_dirs(item.to_string()).await?;
        }
    }

    Ok(())
}

async fn get_dirs(dir: String) -> Result<()> {
    let mut entries = fs::read_dir(dir).await?;

    // Folder list
    let dirs = Arc::new(Mutex::new(Vec::new()));

    while let Some(entry) = entries.next_entry().await? {
        let metadata = entry.metadata().await?;
        let path = entry.path();

        let thd_1 = tokio::spawn(
            (|path: std::path::PathBuf, dirs: std::sync::Arc<Mutex<Vec<String>>>| async move {
                let path = path.display().to_string();
                if metadata.is_dir() {
                    println!("{}", path);

                    // Get lock and add array
                    {
                        let mut lock = dirs.lock().await;
                        lock.push(path);
                    }
                    //                continue;
                }
            })(path.clone(), dirs.clone()),
        );

        let thd_2 = tokio::spawn(async move {
            if let Ok(symlink) = fs::read_link(&path).await {
                if path.is_dir() {
                    println!("{}@ -> {}", path.display(), symlink.display());
                }
            }
        });

        tokio::join!(thd_1, thd_2);
    }

    // Get lock and add array
    {
        let mut lock = LIST_FROM_THD
            .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
            .await
            .lock()
            .await;
        let mut dirs = dirs.lock().await;
        lock.append(&mut dirs);
    }

    Ok(())
}

Futureをコレクションに溜め込んでawaitしてみる

多数のFutureがある場合はどうすればよいでしょうか

Vec等のコレクションにFutureを溜め込み、後にまとめて処理すると並行処理になるということをこちらで書かれていたのでコードを参考にネット上で集めた幾つかのパターンを検証してみます。

ループの中で新規Futureを作ってawaitする、を繰り返す事とFutureをまとめてループでawaitする事は同じ気がするのですが実行結果より違うことがわかります。これの違いが何かよくわかっていませんが溜め込む事が並行処理になるのでこういう書き方のパターンとして覚えておくことにします。

tokio::task::JoinSetとFuturesUnorderedは処理が終了したものから結果を取り出すようです。

join_allとVecとFuturesOrderedは同等の出力結果が得られました。

// [dependencies]
// tokio = { version = "1.32.0", features = ["full"] }
// futures = "0.3"

use futures::future;
// use std::time::Duration;
use tokio::time::{Duration};
use tokio::task;

// use futures::{stream::FuturesUnordered, stream::FuturesOrdered, StreamExt};
use futures::stream::{FuturesUnordered, FuturesOrdered, StreamExt};

// 5秒待って引数をそのまま返す非同期関数
async fn some_heavy_work(id: i64) -> i64 {
    tokio::time::sleep(Duration::from_secs(5)).await;
    id
}

#[tokio::main]
async fn main() {

// 参照
// https://zenn.dev/nojima/articles/30bef27473a6fd

    println!("Case: join_all");
    let now = tokio::time::Instant::now();
    // 1000個の Future を作る (このタイミングでは実行されていない)
    // let works: Vec<_> = (0..1000).map(|id| some_heavy_work(id)).collect();
    let works: Vec<_> = (0..1000).map(|id| task::spawn(some_heavy_work(id))).collect();
    // 1000個の Future を並列実行する
    let ret = future::join_all(works).await;
    let ret: Vec<_> = ret.iter().map(|res| res.as_ref().unwrap()).collect();
    println!("ret = {:?}", ret);

    let duration = now.elapsed();
    println!("duration = {:?}", duration);

///////////////////////////////////////////////////////////////////

    println!("Case: Vec");
    let mut ret = Vec::new();
    let mut thds = Vec::new();
    let now = tokio::time::Instant::now();

    for id in 0..1000 {
        thds.push(task::spawn(some_heavy_work(id)));
    }

    for thd in thds {
        if let Ok(id) = thd.await {
            ret.push(id);
        }
    }
    println!("ret = {:?}", ret);

    let duration = now.elapsed();
    println!("duration = {:?}", duration);

///////////////////////////////////////////////////////////////////

    println!("Case: FuturesUnordered");
    let mut ret = Vec::new();
    let mut thds = FuturesUnordered::new();
    let now = tokio::time::Instant::now();

    for id in 0..1000 {
        thds.push(task::spawn(some_heavy_work(id)));
    }

    // let ret: Vec<_>  = thds.map(|res| res.unwrap()).collect().await;
    while let Some(Ok(id)) = thds.next().await {
        ret.push(id);
    }
    println!("ret = {:?}", ret);

    let duration = now.elapsed();
    println!("duration = {:?}", duration);

///////////////////////////////////////////////////////////////////

    println!("Case: tokio::task::JoinSet");
    let mut ret = Vec::new();
    let mut thds = task::JoinSet::new();
    let now = tokio::time::Instant::now();

    for id in 0..1000 {
        thds.spawn(some_heavy_work(id));
    }

    while let Some(Ok(id)) = thds.join_next().await {
        ret.push(id);
    }
    println!("ret = {:?}", ret);

    let duration = now.elapsed();
    println!("duration = {:?}", duration);

///////////////////////////////////////////////////////////////////

    println!("Case: FuturesOrdered");
    let mut ret = Vec::new();
    let mut thds = FuturesOrdered::new();
    let now = tokio::time::Instant::now();

    for id in 0..1000 {
        thds.push_back(task::spawn(some_heavy_work(id)));
    }

    // let ret: Vec<_>  = thds.map(|res| res.unwrap()).collect().await;
    while let Some(Ok(id)) = thds.next().await {
        ret.push(id);
    }
    println!("ret = {:?}", ret);

    let duration = now.elapsed();
    println!("duration = {:?}", duration);

///////////////////////////////////////////////////////////////////

    println!("Case: spawn and await in loop");
    let mut ret = Vec::new();
    let now = tokio::time::Instant::now();

    // 時間がかかりすぎるので4つまで
    for i in 0..4 {
        let thd = task::spawn(some_heavy_work(i));
        if let Ok(id) = thd.await {
            ret.push(id);
        }
    }
    println!("ret = {:?}", ret);

    let duration = now.elapsed();
    println!("duration = {:?}", duration);

}
Case: join_all
ret = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, ...]
duration = 5.003171598s
Case: Vec
ret = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, ...]
duration = 5.003351963s
Case: FuturesUnordered
ret = [804, 784, 746, 785, 747, 748, 749, 810, 750, 811, 751, 812, 752, 813, 753, 814, 754, 815, 755, 816, 756, 817, 818, 830, 819, 831, 820, 832, 821, 833, 822, 834, 823, 835, 824, 836, 825, 837, 826, 838, 827, 839, 828, 840, 829, 841, 842, 846, 843, 847, 844, 848, 845, 849, 850, 861, 851, 862, 852, 863, 853, 864, 854, 865, 873, 866, 867, 874, 868, 882, 883, 855, 875, 856, 876, 857, 869, 877, 870, ...]
duration = 5.003634026s
Case: tokio::task::JoinSet
ret = [855, 777, 778, 807, 741, 779, 808, 780, 809, 781, 782, 783, 810, 784, 811, 785, 812, 786, 813, 787, 814, 788, 721, 815, 722, 816, 856, 817, 857, 723, 858, 859, 893, 860, 894, 861, 895, 862, 896, 863, 897, 898, 0, 864, 899, 865, 900, 866, 1, 901, 867, 902, 903, 868, 904, 869, 905, 870, 906, 871, 907, 872, 818, 873, 819, 874, 920, 875, 820, 821, 822, 876, 823, 877, 824, 878, 879, 880, 881, 882, ...]
duration = 5.004296013s
Case: FuturesOrdered
ret = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, ...]
duration = 5.003727411s
Case: spawn and await
ret = [0, 1, 2, 3]
duration = 20.006742359s

JoinSetでまとめて処理する

並行処理性を高める書き方が分かったのでspawnできそうな処理をJoinSetにガンガン追加してみました。

その結果、劇的な速度の改善がありました。対象データが増えてもvCPUを増やすことでスケールアウトしていくことができそうです。

と、思ったのですが次章では結果が想定と異なりました。

MeanStd.Dev.MinMedianMax
real3.3280.2223.1393.2634.465
user2.5660.1892.2842.5393.378
sys8.9020.4968.348.79211.126
use anyhow::Result; // 1.0.71
use clap::Parser; // 4.3.11

use std::sync::Arc;

use tokio::fs; // 1.32.0
use tokio::sync::{Mutex, OnceCell};
use tokio::task::JoinSet;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// target directory
    #[arg(short, long, default_value_t = String::from(".") )]
    dir: String,
}

static LIST_FROM_THD: tokio::sync::OnceCell<tokio::sync::Mutex<Vec<String>>> =
    OnceCell::const_new();

#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();

    let dir_string = args.dir.to_string();
    println!("{}", dir_string);

    let _ = LIST_FROM_THD.set(Mutex::new(Vec::<String>::new()));
    let mut dir_list;

    // Get lock and push dir
    {
        let mut lock = LIST_FROM_THD
            .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
            .await
            .lock()
            .await;
        lock.push(dir_string);
    }

    // Like a do-while
    while {
        // Get lock and take array as snapshot
        {
            let mut lock = LIST_FROM_THD
                .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
                .await
                .lock()
                .await;
            dir_list = lock.to_vec();
            lock.clear();
        }

        !dir_list.is_empty()
    } {
        let mut thds = JoinSet::new();
        let mut dir_list = dir_list.iter();
        while let Some(item) = dir_list.next() {
            thds.spawn(get_dirs(item.to_string()));
        }

        while let Some(thd) = thds.join_next().await {
            let _ = thd;
        }
    }

    Ok(())
}

async fn get_dirs(dir: String) -> Result<()> {
    let mut entries = fs::read_dir(dir).await?;

    // Folder list
    let dirs = Arc::new(Mutex::new(Vec::new()));

    let mut thds = JoinSet::new();

    while let Some(entry) = entries.next_entry().await? {
        let metadata = entry.metadata().await?;
        let path = entry.path();

        thds.spawn(
            (|path: std::path::PathBuf, dirs: std::sync::Arc<Mutex<Vec<String>>>| async move {
                let path = path.display().to_string();
                if metadata.is_dir() {
                    println!("{}", path);

                    // Get lock and add array
                    {
                        let mut lock = dirs.lock().await;
                        lock.push(path);
                    }
                    //                continue;
                }
            })(path.clone(), dirs.clone()),
        );

        thds.spawn(async move {
            if let Ok(symlink) = fs::read_link(&path).await {
                if path.is_dir() {
                    println!("{}@ -> {}", path.display(), symlink.display());
                }
            }
        });

    }

    while let Some(thd) = thds.join_next().await {
        let _ = thd;
    }

    // Get lock and add array
    {
        let mut lock = LIST_FROM_THD
            .get_or_init(|| async move { Mutex::new(Vec::<String>::new()) })
            .await
            .lock()
            .await;
        let mut dirs = dirs.lock().await;
        lock.append(&mut dirs);
    }

    Ok(())
}

VirtualBoxの動作

物理コア=8(vCPU Max 16)の環境だったのでvCPU=8を使うようにVirtualBoxの設定を変更して検証したのですが劇的に遅くなりました。

考えられる可能性としては主に4つ。

  • vCPU=8だといずれかのCPUでOSなりアプリケーションが動いているので遅いものがあるはずです。VirtualBoxがソレを掴んでしまうのでそこで動くスレッドも遅くなり、結果ボトルネックとなっている可能性。
  • vCPUが増えたことにより、スケジューラーが各vCPUの動作状況を確認するコスト等が増えて結果として遅くなった可能性。
  • Linuxカーネルやライブラリ等がかなり古くマルチコアの性能を活かしきれていない環境起因の可能性。
  • mutexのロック解除待ちが発生している可能性。

極端に遅い場合はVirtualBoxを立ち上げ直してパフォーマンスが変わるかどうか確認したり、vCPUを減らすことも経験上有効なようです。

vCPU=6

MeanStd.Dev.MinMedianMax
real4.50.2594.284.4476.13
user3.7980.1573.5053.7844.247
sys17.1740.80816.36617.05922.184

vCPU=8

MeanStd.Dev.MinMedianMax
real18.0490.9915.9417.93321.145
user5.7380.2465.1185.7396.59
sys85.4464.44373.92285.46895.867

まとめ

tokioランタイムと組み合わせてasync/awaitを使えば非同期コードを書けることがわかりました。ただし、並列処理をするにはspawnして明示的にスレッドに渡すようにしないといけません。ただspawnするだけではパフォーマンスが出ないので書き方に注意が必要です。JavaScriptとは違い注意が必要です。

またFutureをまとめてspawnすることによってtokioランタイムのスケジューラーが上手い事、並行処理してくれる事がわかりました。ただ、理屈がよく理解できていません。

JoinSetはPromise.all的な使い方ができそうで、グローバル変数をスレッドで共有していますがこれも不要にする書き方ができそうです。

VirtualBoxの動作はちょっと分からない事が多いです。要因が多いのでできるところから潰してボトルネックを特定していくしかなさそうです。

次回はボトルネックの可能性があるMutexを調べてみます。

同じようなコードばかりでスミマセン。

Tags: rust
管理人

Recent Posts

CanvaがSerif (Affinity) を買収

私は使ったことがないのですが名前はよく聞…

4週間 ago

Serifのスプリングセール – アドオンが50%オフ

Affinity Photoなどレタッチ…

2か月 ago

音声がロボットのようになるときの対処

リモート会議などでたまに相手の音声がおか…

3か月 ago

Serifのブラックフライデー – 全品40%オフ V1ユーザは更にお得!

恒例のブラックフライデーセールが始まりま…

5か月 ago

[rust] rayonで書き直してみました

前回のコードを元にrayonを使った処理…

6か月 ago

[rust] async-stdで書き直してみました

前回のコードをasync-stdで書き直…

6か月 ago