Здравствуйте, stump, Вы писали:
S>Кстати, я читал, что ParallelFX смотрит сколько ядер, и решает стоит ли рараллелить задачу.
Да, если мне не изменяет память, то именно так и сделано в Parallel FX. Мне нужна была подобная либа под .NET 2.0, так я на коленке написал свою реализацию, пусть и не полный функционал, но то, что требуется. Там тоже распараллеливание происходит реально по числу процессоров.
Parallel.cs
using System;
using System.Collections.Generic;
using System.Threading;
namespace Dimchansky.Framework.Threading
{
/// <summary>
/// Provides support for parallel loops and regions.
/// </summary>
public static class Parallel
{
/// <summary>
/// Number of workers thread used in parallel computations by default.
/// </summary>
private static readonly int idealNumberOfThreads = Environment.ProcessorCount;
/// <summary>
/// Executes each of the provided actions inside a discrete, asynchronous task.
/// </summary>
/// <param name="actions">An array of actions to execute.</param>
public static void Do(params Action[] actions)
{
Do(idealNumberOfThreads, actions);
}
/// <summary>
/// Executes each of the provided actions inside a discrete, asynchronous task.
/// </summary>
/// <param name="numberOfThreads">An array of actions to execute.</param>
/// <param name="actions">An array of actions to execute.</param>
public static void Do(int numberOfThreads, params Action[] actions)
{
if (actions == null || numberOfThreads < 1)
return;
if (actions.Length > 0)
{
ForEach(actions,
delegate(Action a)
{
if (a != null)
a();
},
numberOfThreads);
}
}
/// <summary>
/// Executes a for loop in which iterations may run in parallel.
/// </summary>
/// <param name="fromInclusive">The start index, inclusive.</param>
/// <param name="toExclusive">The end index, exclusive.</param>
/// <param name="body">The body to be invoked for each iteration.</param>
public static void For(int fromInclusive, int toExclusive, Action<int> body)
{
For(fromInclusive, toExclusive, body, idealNumberOfThreads);
}
/// <summary>
/// Executes a for loop in which iterations may run in parallel.
/// </summary>
/// <param name="fromInclusive">The start index, inclusive.</param>
/// <param name="toExclusive">The end index, exclusive.</param>
/// <param name="body">The body to be invoked for each iteration.</param>
/// <param name="numberOfThreads">Number of threads to use for executing action.</param>
public static void For(int fromInclusive, int toExclusive, Action<int> body, int numberOfThreads)
{
if (fromInclusive >= toExclusive || body == null || numberOfThreads < 1)
return;
parallelForWorker(fromInclusive, toExclusive, body, numberOfThreads);
}
/// <summary>
/// Executes an action for each item in the enumerable data source, where each element may potentially be processed in parallel.
/// </summary>
/// <typeparam name="T">The type of the data in the enumerable.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="body">The action to invoke for each element in the source.</param>
public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
{
ForEach(source, body, idealNumberOfThreads);
}
/// <summary>
/// Executes an action for each item in the enumerable data source, where each element may potentially be processed in parallel.
/// </summary>
/// <typeparam name="T">The type of the data in the enumerable.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="body">The action to invoke for each element in the source.</param>
/// <param name="numberOfThreads">Number of threads to use for executing action.</param>
public static void ForEach<T>(IEnumerable<T> source, Action<T> body, int numberOfThreads)
{
if (source == null || body == null || numberOfThreads < 1)
return;
T[] array = source as T[];
if (array != null)
{
parallelForEachWorker(array, body, numberOfThreads);
}
else
{
IList<T> list = source as IList<T>;
if (list != null)
{
parallelForEachWorker(list, body, numberOfThreads);
}
else
{
using (IEnumerator<T> enumerator = source.GetEnumerator())
{
parallelForEachWorker(enumerator, body, numberOfThreads);
}
}
}
}
#region Helpers
private static void parallelForWorker(int fromInclusive, int toExclusive, Action<int> body, int numberOfThreads)
{
if (toExclusive > fromInclusive)
{
int elementsCount = toExclusive - fromInclusive;
// reduce workers count if possible
int workItems = Math.Max(Math.Min(elementsCount, numberOfThreads), 1);
// Divide the list up into chunks
int chunkSize = Math.Max(elementsCount/workItems, 1);
int count = workItems;
// Use an event to wait for all work items
using (ManualResetEvent mre = new ManualResetEvent(false))
{
List<Exception> innerExceptions = null;
object innerExceptionsLocker = new object();
// Each work item processes appx 1/Nth of the data items
WaitCallback callback = delegate(object state)
{
int iteration = (int) state;
int from = chunkSize*iteration + fromInclusive;
int to = iteration == workItems - 1
? toExclusive
: chunkSize*(iteration + 1) + fromInclusive;
while (from < to)
{
try
{
body(from++);
}
catch (Exception exception)
{
lock (innerExceptionsLocker)
{
if (innerExceptions == null)
{
innerExceptions = new List<Exception>();
}
innerExceptions.Add(exception);
}
break;
}
}
if (Interlocked.Decrement(ref count) == 0)
mre.Set();
};
// The ThreadPool is used to process all but one of the
// chunks; the current thread is used for that chunk,
// rather than just blocking.
for (int i = 0; i < workItems - 1; i++)
{
ThreadPool.QueueUserWorkItem(callback, i);
}
try
{
callback(workItems - 1);
}
catch (Exception exception)
{
lock (innerExceptionsLocker)
{
if (innerExceptions == null)
{
innerExceptions = new List<Exception>();
}
innerExceptions.Add(exception);
}
}
// Wait for all work to complete
mre.WaitOne();
if (innerExceptions != null)
{
throw new AggregateException(innerExceptions);
}
}
}
}
private static void parallelForEachWorker<T>(T[] array, Action<T> body, int numberOfThreads)
{
int lowerBound = array.GetLowerBound(0);
int toExclusive = array.GetUpperBound(0) + 1;
parallelForWorker(lowerBound, toExclusive,
delegate(int i) { body(array[i]); }, numberOfThreads);
}
private static void parallelForEachWorker<T>(IList<T> list, Action<T> body, int numberOfThreads)
{
parallelForWorker(0, list.Count, delegate(int i) { body(list[i]); }, numberOfThreads);
}
private static void parallelForEachWorker<T>(IEnumerator<T> enumerator, Action<T> action, int numberOfThreads)
{
int count = numberOfThreads;
// Use an event to wait for all work items to complete
using (ManualResetEvent mre = new ManualResetEvent(false))
{
List<Exception> innerExceptions = null;
object innerExceptionsLocker = new object();
// Each work item will continually pull data from the
// enumerator and process it until there is no more data
// to process
WaitCallback callback = delegate
{
while (true)
{
T data;
lock (enumerator)
{
if (!enumerator.MoveNext()) break;
data = enumerator.Current;
}
try
{
action(data);
}
catch (Exception exception)
{
lock (innerExceptionsLocker)
{
if (innerExceptions == null)
{
innerExceptions = new List<Exception>();
}
innerExceptions.Add(exception);
}
break;
}
}
if (Interlocked.Decrement(ref count) == 0)
mre.Set();
};
// The ThreadPool is used to process all but one of the
// chunks; the current thread is used for that chunk,
// rather than just blocking.
for (int i = 0; i < numberOfThreads - 1; i++)
{
ThreadPool.QueueUserWorkItem(callback, i);
}
try
{
callback(numberOfThreads - 1);
}
catch (Exception exception)
{
lock (innerExceptionsLocker)
{
if (innerExceptions == null)
{
innerExceptions = new List<Exception>();
}
innerExceptions.Add(exception);
}
}
// Wait for all work to complete
mre.WaitOne();
if (innerExceptions != null)
{
throw new AggregateException(innerExceptions);
}
}
}
#endregion
}
}
AggregateException.cs — беспощадно выдрал из Parallel FX
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Security.Permissions;
namespace Dimchansky.Framework.Threading
{
public delegate TResult Func<T, TResult>(T arg);
[Serializable, DebuggerDisplay("Count = {InnerExceptions.Count}")]
public class AggregateException : Exception
{
// Fields
private Exception[] m_innerExceptions;
// Methods
public AggregateException()
{
}
public AggregateException(IEnumerable<Exception> innerExceptions)
: this(null, innerExceptions)
{
}
public AggregateException(string message)
: base(message)
{
}
[SecurityPermission(SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.SerializationFormatter)]
protected AggregateException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
public AggregateException(string message, IEnumerable<Exception> innerExceptions)
: this(message, (innerExceptions == null) ? null : new List<Exception>(innerExceptions))
{
}
private AggregateException(string message, List<Exception> innerExceptions)
: base(message, ((innerExceptions != null) && (innerExceptions.Count > 0)) ? innerExceptions[0] : null)
{
if (innerExceptions == null)
{
throw new ArgumentNullException("innerExceptions");
}
this.m_innerExceptions = innerExceptions.ToArray();
for (int i = 0; i < this.m_innerExceptions.Length; i++)
{
if (this.m_innerExceptions[i] == null)
{
throw new ArgumentException("An element of innerExceptions was null.");
}
}
}
public AggregateException(string message, Exception inner)
: this(message, new Exception[] { inner })
{
}
public AggregateException Flatten(params AggregateException[] exceptions)
{
if (exceptions == null)
{
throw new ArgumentNullException("exceptions");
}
List<Exception> list = new List<Exception>(this.m_innerExceptions);
AggregateException[] exceptionArray = (AggregateException[])exceptions.Clone();
for (int i = 0; i < exceptionArray.Length; i++)
{
if (exceptionArray[i] == null)
{
throw new ArgumentException("An element in exceptions was null.");
}
list.AddRange(exceptionArray[i].InnerExceptions);
}
return new AggregateException(this.Message, list.ToArray());
}
public void Handle(Func<Exception, bool> handler)
{
if (handler == null)
{
throw new ArgumentNullException("handler");
}
List<Exception> list = null;
for (int i = 0; i < this.m_innerExceptions.Length; i++)
{
if (!handler(this.m_innerExceptions[i]))
{
if (list == null)
{
list = new List<Exception>();
}
list.Add(this.m_innerExceptions[i]);
}
}
if (list != null)
{
throw new AggregateException(this.Message, list.ToArray());
}
}
public override string ToString()
{
string str = base.ToString();
for (int i = 0; i < this.m_innerExceptions.Length; i++)
{
str = string.Format("{0}{1}---> (Inner Exception #{2}) {3}{4}{5}", new object[] { str, Environment.NewLine, i, this.m_innerExceptions[i].ToString(), "<---", Environment.NewLine });
}
return str;
}
// Properties
public ReadOnlyCollection<Exception> InnerExceptions
{
get
{
return new ReadOnlyCollection<Exception>(this.m_innerExceptions);
}
}
}
}
Пример обработки исключений приводил уже
здесьАвтор: Димчанский
Дата: 28.02.08
.
Пример распареллеливания кода.
Было:
for (int i = 0; i < 100; i++) {
a[i] = a[i]*a[i];
}
Стало:
Parallel.For(0, 100, delegate(int i) {
a[i] = a[i]*a[i];
});