PLINQ (Parallel LINQ)

データを分割し、複数のプロセッサでクエリを並列実行 (execute in parallel) させられます。

並列実行されるとは限りません。実行時にクエリの形 (shape) とそれを構成する演算子が調べられ、並列化による高速化が見込めないならば順次 (sequentially) 実行されます。実行モード - PLINQ の概要 - .NET | Microsoft Learn

ParallelQuery<TSource>クラス

並列シーケンス (parallel sequence) を表せます。

public class ParallelQuery<TSource> :
    System.Linq.ParallelQuery,
    System.Collections.Generic.IEnumerable<TSource>
ParallelQuery<TSource> クラス (System.Linq) | Microsoft Learn

ParallelEnumerableクラス

ParallelQuery<TSource>を実装するオブジェクトに対して、クエリを実行するためのメソッドが提供されます。

メソッド 機能
AsParallel(IEnumerable) クエリの並列化 (parallelization) を有効にする
AsSequential<TSource>(ParallelQuery<TSource>) ParallelQuery<TSource>をIEnumerable<T>へ変換し、クエリを順に評価することを強制する
AsOrdered(ParallelQuery) データソースの処理を、順序ありとできる
WithExecutionMode<TSource>(ParallelQuery<TSource>, ParallelExecutionMode) クエリの実行モードを設定できる
WithDegreeOfParallelism<TSource>(ParallelQuery<TSource>, Int32) 並列に実行する最大数を設定できる
WithCancellation<TSource>(ParallelQuery<TSource>, CancellationToken) クエリにCancellationTokenを設定できる
   
ParallelEnumerable クラス (System.Linq) | Microsoft Learn

AsParallel()

public static System.Linq.ParallelQuery<TSource> AsParallel<TSource> (
    this System.Collections.Generic.IEnumerable<TSource> source
    );
AsParallel<TSource>(IEnumerable<TSource>) - ParallelEnumerable.AsParallel メソッド (System.Linq) | Microsoft Learn

IEnumerable<T>に格納してクエリ式に渡すと、並列化されません。

IEnumerable<int> source = Enumerable.Range(0, 10);
Func<int, bool> selector = (num) =>
{
    Task.Delay(1).Wait();
    return num % 2 == 0;
};


ParallelQuery<int> parallel1 = source.AsParallel();
IEnumerable<int> query1 = from num in parallel1 where selector(num) select num;
int[] a1 = query1.ToArray(); // 2, 4, 6, 0, 8 (並列実行される)

IEnumerable<int> parallel2 = source.AsParallel();
IEnumerable<int> query2 = from num in parallel2 where selector(num) select num; // where句は呼び出し元と同じスレッドで実行される
int[] a2 = query2.ToArray(); // 0, 2, 4, 6, 8 (順次実行される)

WithExecutionMode<TSource>()

クエリの実行モードを設定できます。そのとき並列に実行することを強制することで、クエリの構造と無関係に並列化できます。

public static System.Linq.ParallelQuery<TSource> WithExecutionMode<TSource> (
    this System.Linq.ParallelQuery<TSource> source,
    System.Linq.ParallelExecutionMode executionMode
    );
ParallelEnumerable.WithExecutionMode<TSource> メソッド (System.Linq) | Microsoft Learn
IEnumerable<int> source = Enumerable.Range(0, 10);
Func<int, bool> selector = (num) =>
{ // ここはLINQでは呼び出し元と同じ、PLINQでは異なるスレッドで実行される
    Task.Delay(100).Wait();
    return num % 2 == 0;
};


IEnumerable<int> query1 = // LINQ
    from num in source
    where selector(num)
    select num;

ParallelQuery<int> query2 = // PLINQ
    from num in source.AsParallel()
    where selector(num)
    select num;

ParallelQuery<int> query3 = // PLINQ 並列を強制
    from num in source.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    where selector(num)
    select num;

int a1 = query1.Count(); // 約1100 ms
int a2 = query2.Count(); // 約1100 ms
int a3 = query3.Count(); // 約220 ms

int b1 = query1.Count(); // 約1100 ms
int b2 = query2.Count(); // 約220 ms
int b3 = query3.Count(); // 約220 ms

WithCancellation<TSource>()

クエリにCancellationTokenを設定することで、クエリをキャンセルできるようになります。

IEnumerable<int> source = Enumerable.Range(0, 100);
using (CancellationTokenSource tokenSource = new CancellationTokenSource())
{
    ParallelQuery<int> parallel = source.AsParallel().WithCancellation(tokenSource.Token);
    IEnumerable<int> query = from num in parallel where num % 2 == 0 select num;

    Parallel.Invoke(
    delegate
    {
        try
        {
            query.ToArray();
        }
        catch (OperationCanceledException e) { } // キャンセルされた
        catch (AggregateException e) { } // 他の例外も発生している
    },
    delegate
    {
        tokenSource.Cancel(); // キャンセルを要求する
    });
}
方法: PLINQ クエリを取り消す - .NET | Microsoft Learn

AsOrdered<TSource>()

PLINQでは並列に実行されることで、既定では順序が維持されません。これを維持するように指示できます。

public static System.Linq.ParallelQuery<TSource> AsOrdered<TSource> (
    this System.Linq.ParallelQuery<TSource> source
    );
AsOrdered<TSource>(ParallelQuery<TSource>) - ParallelEnumerable.AsOrdered メソッド (System.Linq) | Microsoft Learn

順序の維持が不要となったら、AsUnordered<TSource>()を呼び出しそれを無効にします。

orderbyなどで並べ替えると、AsUnordered<TSource>()が呼び出されるまで順序が維持されます。Query Operators and Ordering - Order Preservation in PLINQ - .NET | Microsoft Learn

IEnumerable<int> source = Enumerable.Range(0, 10);

Func<int, bool> selector = (num) =>
{
    Task.Delay(1).Wait();
    return num % 2 == 0;
};


IEnumerable<int> query1 = // LINQ
    from num in source
    where selector(num)
    select num;

ParallelQuery<int> query2 = // PLINQ
    from num in source.AsParallel()
    where selector(num)
    select num;

ParallelQuery<int> query3 = // PLINQ AsOrdered()指定
    from num in source.AsParallel().AsOrdered()
    where selector(num)
    select num;

ParallelQuery<int> query4 = // PLINQ orderbyで並べ替え
    from num in source.AsParallel()
    where selector(num)
    orderby num
    select num;

int[] a1 = query1.ToArray(); // 0, 2, 4, 6, 8
int[] a2 = query2.ToArray(); // 0, 8, 2, 4, 6 (順は不定)
int[] a3 = query3.ToArray(); // 0, 2, 4, 6, 8
int[] a4 = query4.ToArray(); // 0, 2, 4, 6, 8

ForAll<TSource>()

public static void ForAll<TSource> (
    this System.Linq.ParallelQuery<TSource> source,
    Action<TSource> action
    );
ParallelEnumerable.ForAll<TSource> メソッド (System.Linq) | Microsoft Learn

同等の操作はForEach<TSource>()でも可能です。

IEnumerable<int> source = Enumerable.Range(0, 10);
source.AsParallel().ForAll((item) =>
{
    Console.Write(item);
});


ParallelLoopResult result = Parallel.ForEach(source, (item) =>
{
    Console.Write(item);
});


foreach (int item in source)
{
    Console.Write(item);
}
.net - Parallel.ForEach vs AsParallel().ForAll - Stack Overflow

例外

PLINQのクエリから例外が投げられても、並列処理されている他のスレッドの実行は継続されることがあります。そしてそれらから投げられた例外は、AggregateExceptionにまとめて投げられます。

Func<int, bool> predicate = (num) =>
{
    if (num % 2 == 0) throw new NotImplementedException();
    return true;
};

IEnumerable<int> source = Enumerable.Range(0, 10);
IEnumerable<int> query =
    from a in source.AsParallel()
    where predicate(a)
    select a;

try
{
    query.Count(); // クエリを実行している途中で例外が投げられ、この処理は中断される
}
catch (NotImplementedException e) { } // LINQとして実行したときは、ここで捕捉できる
catch (AggregateException e) { }
方法:PLINQ クエリの例外を処理する - .NET | Microsoft Learn

注意点

つねに高速化されるとは限らない

GroupBy()やJoin()ではオーバーヘッドが発生することで、遅くなることがあります。 PLINQ クエリのパフォーマンスに影響する要因 - PLINQ での高速化について - .NET | Microsoft Learn 並列処理が常に高速であると思い込まない - PLINQ の非利便性 - .NET | Microsoft Learn

int[] array = new int[1000000];

Random rnd = new Random(0);
for (int i = 0; i < array.Length; i++)
{
    array[i] = rnd.Next(10);
}

IEnumerable<IGrouping<int, int>> query1 = array.AsParallel().GroupBy(x => x);
int a1 = query1.Count(); // 約47 ms
int b1 = query1.Count(); // 約43 ms

IEnumerable<IGrouping<int, int>> query2 = array.GroupBy(x => x);
int a2 = query2.Count(); // 約28 ms
int b2 = query2.Count(); // 約25 ms

たとえば比較方法をIEqualityComparer<TKey>で指定するGroupBy()を並列実行すると、同じ要素に対してGetHashCode()がくりかえし呼ばれることがあります。

Microsoft Learnから検索