C++の並列処理入門

この記事は、NITMic Advent Calendar 2017 15日目の記事です。

目次

はじめに

NITMicのOBで、gutと言います。 普段C++を書いていて、並列処理をやってみたくなった人向けに書いています。 自分のモチベとしては研究で使うことが多くなったので、少しまとめてみたかったのもあります。 並列化できるとは何か、C++11で追加されたstd::thread, std::asyncの話、応用例の順で話しが進んでいきます。 応用例はかなり難しい例になっているので、分からない方は飛ばしてくれて全然OKです。

並列化できる例

並列化できるとはどういうことか例を用いて説明していきます。
次のプログラムは、vector<int>に0から4を格納し、各要素を2倍して出力するものです。

#include <iostream>
#include <vector>

int main()
{
    std::vector<int> v{ 0, 1, 2, 3, 4 };

    for(auto& vi : v) {
        vi *= 2;
    }

    for(const auto& vi : v) {
        std::cout << vi << std::endl;
    }

    return 0;
}

ここで、注目してほしいところは、各要素を2倍している部分です。

    for(auto& vi : v) {
        vi *= 2;
    }

上記のように順番に各要素を2倍しているわけですが、これって同時に2倍しても良いですよね。
つまり、

    v[0] *= 2;
    v[1] *= 2;
    v[2] *= 2;
    v[3] *= 2;
    v[4] *= 2;

これを同時に処理(並列化)しても、最終的な処理結果は変わらないので問題ないわけです。

上記の例のように、データの競合がない場合に並列化できるということになります。 また、このことをスレッドセーフと呼んだりします。

std::thread

単純になんらかの処理を別スレッドで実行したい場合は、std::threadを用います。

次のプログラムは、std::threadの基本的な使い方になります。

#include <iostream>
#include <thread> // <thread>をインクルード

void task()
{
}

void another_task()
{
}

int main()
{
    // task関数を別スレッドで実行
    auto t = std::thread([](){ task(); })

    // メインスレッドで別のタスクを実行
    another_task();

    // 別スレッドが終了するまで同期待ち
    t.join();

    return 0;
}

std::threadは、std::thread(func)とすることで、別スレッドでfunc()を実行します。 funcに引数argを渡したい場合は、std::thread(func, arg)のようにして、別スレッドでfunc(arg)を実行します。 また、メインスレッドで別スレッドの同期待ちをする必要があるので、join()を必ず実行しておきます。

実際の例

では、std::threadを使って、先ほどのプログラムを並列化してみます。

#include <iostream>
#include <thread> // <thread>をインクルード
#include <vector>

int main()
{
    std::vector<int> v{ 0, 1, 2, 3, 4 };

    std::vector<std::thread> threads;

    // 各要素をスレッド別に処理
    for(std::size_t i = 0; i < 5; ++i) {
        threads.emplace_back([i, &v](){
            v[i] *= 2;
        });
    }

    // 全スレッドの同期待ち
    for(auto& t : threads) {
        t.join();
    }

    for(const auto& vi : v) {
        std::cout << vi << std::endl;
    }

    return 0;
}

この例の場合、メインスレッドと各要素ごとにスレッドが存在するので、同時に1 + 5スレッドが動いていることになります。

std::async

std::threadの場合は、処理結果をそのままvector<int>の各要素に代入していましたが、 実際にプログラムを書いてみるとラムダ式&vをキャプチャしなければならないので面倒です。 そこで、並列処理の結果を戻り値として取得したい場合に、std::asyncを使います。

次のプログラムは、std::asyncの基本的な使い方になります。

#include <iostream>
#include <future> // <future>をインクルード
#include <vector>

int task()
{
    return 1 + 1;
}

void another_task()
{
}

int main()
{
    // task関数を別スレッドで実行
    auto f = std::async(std::launch::async, [](){ return task(); });

    // メインスレッドで別のタスクを実行
    another_task();

    // 別スレッドで実行したtask関数の同期待ちと戻り値の取得
    std::cout << f.get(); << std::endl;

    return 0;
}

std::asyncは、std::async(policy, func)のようにして別スレッドで、func()を実行します。 また、std::asyncの戻り値は、std::future<result_of<func()>>型です。 例えば、戻り値がintの場合は、std::future<int>が戻り値の型となります。

policy

policyには次の3種類があります。

  • std::launch::async: 別スレッドで実行
  • std::launch::deferred: get()を実行した時に別スレッドで実行
  • std::launch::async | std::launch::deferred: 実装依存

指定しない場合は、3つ目のstd::launch::async | std::launch::deferredになります。

また、funcに引数を渡したい場合は、std::async(policy, func, arg)とすることで、別スレッドでfunc(arg)を実行し結果を取得できます。

実際の例

では、std::asyncを使って、先ほどのプログラムを並列化してみます。

#include <iostream>
#include <future> // <future>をインクルード
#include <vector>

int main()
{
    std::vector<std::future<int>> futures;

    // 各要素をスレッド別に処理
    for(std::size_t i = 0; i < 5; ++i) {
        futures.emplace_back(std::launch::async, [i](){
            return i * 2;
        });
    }

    // 全スレッドの同期待ちと結果取得
    for(auto& f : futures) {
        std::cout << f.get() << std::endl;
    }

    return 0;
}

このように、結果を取得したい場合は、std::asyncを使うことで単純にかける。

応用例

実際にstd::threadやstd::asyncを使っている応用例をあげていきます。

std::for_eachの拡張

std::for_eachをCPUのコア数だけスレッド化して処理するように拡張したプログラムです。

template<class Iterator, class Function>
inline Function parallel_for_each(Iterator begin, Iterator end, Function f)
{
    if(begin == end) {
        return std::move(f);
    }

    std::size_t num_threads = std::thread::hardware_concurrency();
    std::size_t step = std::max<std::size_t>(1, std::distance(begin, end) / num_threads);

    std::vector<std::thread> threads;

    for(; begin < end - step; begin += step) {
        threads.emplace_back([=, &f](){
            std::for_each(begin, begin + step, f);
        });
    }

    threads.emplace_back([=, &f](){
        std::for_each(begin, end, f);
    });

    for(auto&& t : threads) {
        t.join();
    }

    return std::move(f);
}

template<class Container, class Function>
inline Function parallel_for_each(Container&& c, Function f)
{
    return parallel_for_each(std::begin(c), std::end(c), f);
}

参考文献