データを分割し、複数のプロセッサでクエリを並列実行 (execute in parallel) させられます。
並列実行されるとは限りません。実行時にクエリの形 (shape) とそれを構成する演算子が調べられ、並列化による高速化が見込めないならば順次 (sequentially) 実行されます。実行モード - PLINQ の概要 - .NET | Microsoft Learn
並列シーケンス (parallel sequence) を表せます。
public class ParallelQuery<TSource> :
System.Linq.ParallelQuery,
System.Collections.Generic.IEnumerable<TSource>
ParallelQuery<TSource> クラス (System.Linq) | Microsoft Learn
ParallelQuery<TSource>を実装するオブジェクトに対して、クエリを実行するためのメソッドが提供されます。
| メソッド | 機能 |
|---|---|
| AsParallel(IEnumerable) | クエリの並列化 (parallelization) を有効にする |
| AsSequential<TSource>(ParallelQuery<TSource>) | ParallelQuery<TSource>をIEnumerable<T>へ変換し、クエリを順に評価することを強制する |
| AsOrdered(ParallelQuery) | データソースの処理を、順序ありとできる |
| WithExecutionMode<TSource>(ParallelQuery<TSource>, ParallelExecutionMode) | クエリの実行モードを設定できる |
| WithDegreeOfParallelism<TSource>(ParallelQuery<TSource>, Int32) | 並列に実行する最大数を設定できる |
| WithCancellation<TSource>(ParallelQuery<TSource>, CancellationToken) | クエリにCancellationTokenを設定できる |
public static System.Linq.ParallelQuery<TSource> AsParallel<TSource> (
this System.Collections.Generic.IEnumerable<TSource> source
);
AsParallel<TSource>(IEnumerable<TSource>) - ParallelEnumerable.AsParallel メソッド (System.Linq) | Microsoft Learn
IEnumerable<T>に格納してクエリ式に渡すと、並列化されません。
IEnumerable<int> source = Enumerable.Range(0, 10);
Func<int, bool> selector = (num) =>
{
Task.Delay(1).Wait();
return num % 2 == 0;
};
ParallelQuery<int> parallel1 = source.AsParallel();
IEnumerable<int> query1 = from num in parallel1 where selector(num) select num;
int[] a1 = query1.ToArray(); // 2, 4, 6, 0, 8 (並列実行される)
IEnumerable<int> parallel2 = source.AsParallel();
IEnumerable<int> query2 = from num in parallel2 where selector(num) select num; // where句は呼び出し元と同じスレッドで実行される
int[] a2 = query2.ToArray(); // 0, 2, 4, 6, 8 (順次実行される)
クエリの実行モードを設定できます。そのとき並列に実行することを強制することで、クエリの構造と無関係に並列化できます。
public static System.Linq.ParallelQuery<TSource> WithExecutionMode<TSource> (
this System.Linq.ParallelQuery<TSource> source,
System.Linq.ParallelExecutionMode executionMode
);
ParallelEnumerable.WithExecutionMode<TSource> メソッド (System.Linq) | Microsoft Learn
IEnumerable<int> source = Enumerable.Range(0, 10);
Func<int, bool> selector = (num) =>
{ // ここはLINQでは呼び出し元と同じ、PLINQでは異なるスレッドで実行される
Task.Delay(100).Wait();
return num % 2 == 0;
};
IEnumerable<int> query1 = // LINQ
from num in source
where selector(num)
select num;
ParallelQuery<int> query2 = // PLINQ
from num in source.AsParallel()
where selector(num)
select num;
ParallelQuery<int> query3 = // PLINQ 並列を強制
from num in source.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism)
where selector(num)
select num;
int a1 = query1.Count(); // 約1100 ms
int a2 = query2.Count(); // 約1100 ms
int a3 = query3.Count(); // 約220 ms
int b1 = query1.Count(); // 約1100 ms
int b2 = query2.Count(); // 約220 ms
int b3 = query3.Count(); // 約220 ms
既定ではすべてのプロセッサが使用されますが、その並列度 (Degree Of Parallelism) を変更できます。Degree of Parallelism - Introduction to PLINQ - .NET | Microsoft Learn
このメソッドを呼ばないときの既定値は、Environment.ProcessorCountまたは512の小さい方です。DefaultDegreeOfParallelism - DefaultDegreeOfParallelism
// The largest number of partitions that PLINQ supports. internal const int MAX_SUPPORTED_DOP = 512; internal static int DefaultDegreeOfParallelism = Math.Min(Environment.ProcessorCount, MAX_SUPPORTED_DOP);
小さな値を指定することで、CPU時間を他の処理に割り当てられます。不用意に大きな値を指定すると、処理が遅くなります。
IEnumerable<int> source = Enumerable.Range(0, 100);
Func<int, bool> selector = (num) =>
{
Task.Delay(100).Wait();
return num % 2 == 0;
};
ParallelQuery<int> query1 = // PLINQ
from num in source.AsParallel()
where selector(num)
select num;
ParallelQuery<int> query2 = // PLINQ
from num in source.AsParallel().WithDegreeOfParallelism(1)
where selector(num)
select num;
ParallelQuery<int> query3 = // PLINQ
from num in source.AsParallel().WithDegreeOfParallelism(100)
where selector(num)
select num;
int a1 = query1.Count(); // 約1500 ms
int a2 = query2.Count(); // 約11100 ms
int a3 = query3.Count(); // 約43700 ms
int b1 = query1.Count(); // 約1500 ms
int b2 = query2.Count(); // 約11200 ms
int b3 = query3.Count(); // 約45600 ms
クエリにCancellationTokenを設定することで、クエリをキャンセルできるようになります。
IEnumerable<int> source = Enumerable.Range(0, 100); using (CancellationTokenSource tokenSource = new CancellationTokenSource()) { ParallelQuery<int> parallel = source.AsParallel().WithCancellation(tokenSource.Token); IEnumerable<int> query = from num in parallel where num % 2 == 0 select num; Parallel.Invoke( delegate { try { query.ToArray(); } catch (OperationCanceledException e) { } // キャンセルされた catch (AggregateException e) { } // 他の例外も発生している }, delegate { tokenSource.Cancel(); // キャンセルを要求する }); }方法: PLINQ クエリを取り消す - .NET | Microsoft Learn
PLINQでは並列に実行されることで、既定では順序が維持されません。これを維持するように指示できます。
public static System.Linq.ParallelQuery<TSource> AsOrdered<TSource> (
this System.Linq.ParallelQuery<TSource> source
);
AsOrdered<TSource>(ParallelQuery<TSource>) - ParallelEnumerable.AsOrdered メソッド (System.Linq) | Microsoft Learn
順序の維持が不要となったら、AsUnordered<TSource>()を呼び出しそれを無効にします。
orderbyなどで並べ替えると、AsUnordered<TSource>()が呼び出されるまで順序が維持されます。Query Operators and Ordering - Order Preservation in PLINQ - .NET | Microsoft Learn
IEnumerable<int> source = Enumerable.Range(0, 10);
Func<int, bool> selector = (num) =>
{
Task.Delay(1).Wait();
return num % 2 == 0;
};
IEnumerable<int> query1 = // LINQ
from num in source
where selector(num)
select num;
ParallelQuery<int> query2 = // PLINQ
from num in source.AsParallel()
where selector(num)
select num;
ParallelQuery<int> query3 = // PLINQ AsOrdered()指定
from num in source.AsParallel().AsOrdered()
where selector(num)
select num;
ParallelQuery<int> query4 = // PLINQ orderbyで並べ替え
from num in source.AsParallel()
where selector(num)
orderby num
select num;
int[] a1 = query1.ToArray(); // 0, 2, 4, 6, 8
int[] a2 = query2.ToArray(); // 0, 8, 2, 4, 6 (順は不定)
int[] a3 = query3.ToArray(); // 0, 2, 4, 6, 8
int[] a4 = query4.ToArray(); // 0, 2, 4, 6, 8
public static void ForAll<TSource> (
this System.Linq.ParallelQuery<TSource> source,
Action<TSource> action
);
ParallelEnumerable.ForAll<TSource> メソッド (System.Linq) | Microsoft Learn
同等の操作はForEach<TSource>()でも可能です。
IEnumerable<int> source = Enumerable.Range(0, 10);
source.AsParallel().ForAll((item) =>
{
Console.Write(item);
});
ParallelLoopResult result = Parallel.ForEach(source, (item) =>
{
Console.Write(item);
});
foreach (int item in source)
{
Console.Write(item);
}
.net - Parallel.ForEach vs AsParallel().ForAll - Stack Overflow
PLINQのクエリから例外が投げられても、並列処理されている他のスレッドの実行は継続されることがあります。そしてそれらから投げられた例外は、AggregateExceptionにまとめて投げられます。
Func<int, bool> predicate = (num) =>
{
if (num % 2 == 0) throw new NotImplementedException();
return true;
};
IEnumerable<int> source = Enumerable.Range(0, 10);
IEnumerable<int> query =
from a in source.AsParallel()
where predicate(a)
select a;
try
{
query.Count(); // クエリを実行している途中で例外が投げられ、この処理は中断される
}
catch (NotImplementedException e) { } // LINQとして実行したときは、ここで捕捉できる
catch (AggregateException e) { }
方法:PLINQ クエリの例外を処理する - .NET | Microsoft Learn
GroupBy()やJoin()ではオーバーヘッドが発生することで、遅くなることがあります。 PLINQ クエリのパフォーマンスに影響する要因 - PLINQ での高速化について - .NET | Microsoft Learn 並列処理が常に高速であると思い込まない - PLINQ の非利便性 - .NET | Microsoft Learn
int[] array = new int[1000000];
Random rnd = new Random(0);
for (int i = 0; i < array.Length; i++)
{
array[i] = rnd.Next(10);
}
IEnumerable<IGrouping<int, int>> query1 = array.AsParallel().GroupBy(x => x);
int a1 = query1.Count(); // 約47 ms
int b1 = query1.Count(); // 約43 ms
IEnumerable<IGrouping<int, int>> query2 = array.GroupBy(x => x);
int a2 = query2.Count(); // 約28 ms
int b2 = query2.Count(); // 約25 ms
たとえば比較方法をIEqualityComparer<TKey>で指定するGroupBy()を並列実行すると、同じ要素に対してGetHashCode()がくりかえし呼ばれることがあります。