Re[7]: Несколько потоков - быстрее вычичсление?
От: Димчанский Литва http://dimchansky.github.io/
Дата: 23.05.08 12:10
Оценка: 9 (1)
Здравствуйте, stump, Вы писали:

S>Кстати, я читал, что ParallelFX смотрит сколько ядер, и решает стоит ли рараллелить задачу.


Да, если мне не изменяет память, то именно так и сделано в Parallel FX. Мне нужна была подобная либа под .NET 2.0, так я на коленке написал свою реализацию, пусть и не полный функционал, но то, что требуется. Там тоже распараллеливание происходит реально по числу процессоров.

Parallel.cs
using System;
using System.Collections.Generic;
using System.Threading;

namespace Dimchansky.Framework.Threading
{
    /// <summary>
    /// Provides support for parallel loops and regions.
    /// </summary>
    public static class Parallel
    {
        /// <summary>
        /// Number of workers thread used in parallel computations by default.
        /// </summary>
        private static readonly int idealNumberOfThreads = Environment.ProcessorCount;

        /// <summary>
        /// Executes each of the provided actions inside a discrete, asynchronous task. 
        /// </summary>
        /// <param name="actions">An array of actions to execute.</param>
        public static void Do(params Action[] actions)
        {
            Do(idealNumberOfThreads, actions);
        }

        /// <summary>
        /// Executes each of the provided actions inside a discrete, asynchronous task. 
        /// </summary>
        /// <param name="numberOfThreads">An array of actions to execute.</param>
        /// <param name="actions">An array of actions to execute.</param>
        public static void Do(int numberOfThreads, params Action[] actions)
        {
            if (actions == null || numberOfThreads < 1)
                return;

            if (actions.Length > 0)
            {
                ForEach(actions,
                        delegate(Action a)
                        {
                            if (a != null)
                                a();
                        },
                        numberOfThreads);
            }
        }

        /// <summary>
        /// Executes a for loop in which iterations may run in parallel. 
        /// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="body">The body to be invoked for each iteration.</param>
        public static void For(int fromInclusive, int toExclusive, Action<int> body)
        {
            For(fromInclusive, toExclusive, body, idealNumberOfThreads);
        }

        /// <summary>
        /// Executes a for loop in which iterations may run in parallel. 
        /// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="body">The body to be invoked for each iteration.</param>
        /// <param name="numberOfThreads">Number of threads to use for executing action.</param>
        public static void For(int fromInclusive, int toExclusive, Action<int> body, int numberOfThreads)
        {
            if (fromInclusive >= toExclusive || body == null || numberOfThreads < 1)
                return;

            parallelForWorker(fromInclusive, toExclusive, body, numberOfThreads);
        }

        /// <summary>
        /// Executes an action for each item in the enumerable data source, where each element may potentially be processed in parallel.
        /// </summary>
        /// <typeparam name="T">The type of the data in the enumerable.</typeparam>
        /// <param name="source">An enumerable data source.</param>
        /// <param name="body">The action to invoke for each element in the source.</param>
        public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
        {
            ForEach(source, body, idealNumberOfThreads);
        }

        /// <summary>
        /// Executes an action for each item in the enumerable data source, where each element may potentially be processed in parallel.
        /// </summary>
        /// <typeparam name="T">The type of the data in the enumerable.</typeparam>
        /// <param name="source">An enumerable data source.</param>
        /// <param name="body">The action to invoke for each element in the source.</param>
        /// <param name="numberOfThreads">Number of threads to use for executing action.</param>
        public static void ForEach<T>(IEnumerable<T> source, Action<T> body, int numberOfThreads)
        {
            if (source == null || body == null || numberOfThreads < 1)
                return;

            T[] array = source as T[];
            if (array != null)
            {
                parallelForEachWorker(array, body, numberOfThreads);
            }
            else
            {
                IList<T> list = source as IList<T>;
                if (list != null)
                {
                    parallelForEachWorker(list, body, numberOfThreads);
                }
                else
                {
                    using (IEnumerator<T> enumerator = source.GetEnumerator())
                    {
                        parallelForEachWorker(enumerator, body, numberOfThreads);
                    }
                }
            }
        }

        #region Helpers

        private static void parallelForWorker(int fromInclusive, int toExclusive, Action<int> body, int numberOfThreads)
        {
            if (toExclusive > fromInclusive)
            {
                int elementsCount = toExclusive - fromInclusive;

                // reduce workers count if possible
                int workItems = Math.Max(Math.Min(elementsCount, numberOfThreads), 1);

                // Divide the list up into chunks
                int chunkSize = Math.Max(elementsCount/workItems, 1);
                int count = workItems;

                // Use an event to wait for all work items
                using (ManualResetEvent mre = new ManualResetEvent(false))
                {
                    List<Exception> innerExceptions = null;
                    object innerExceptionsLocker = new object();

                    // Each work item processes appx 1/Nth of the data items
                    WaitCallback callback = delegate(object state)
                                                {
                                                    int iteration = (int) state;
                                                    int from = chunkSize*iteration + fromInclusive;
                                                    int to = iteration == workItems - 1
                                                                 ? toExclusive
                                                                 : chunkSize*(iteration + 1) + fromInclusive;

                                                    while (from < to)
                                                    {
                                                        try
                                                        {
                                                            body(from++);
                                                        }
                                                        catch (Exception exception)
                                                        {
                                                            lock (innerExceptionsLocker)
                                                            {
                                                                if (innerExceptions == null)
                                                                {
                                                                    innerExceptions = new List<Exception>();
                                                                }
                                                                innerExceptions.Add(exception);
                                                            }
                                                            break;
                                                        }
                                                    }

                                                    if (Interlocked.Decrement(ref count) == 0)
                                                        mre.Set();
                                                };

                    // The ThreadPool is used to process all but one of the 
                    // chunks; the current thread is used for that chunk, 
                    // rather than just blocking.
                    for (int i = 0; i < workItems - 1; i++)
                    {
                        ThreadPool.QueueUserWorkItem(callback, i);
                    }

                    try
                    {
                        callback(workItems - 1);
                    }
                    catch (Exception exception)
                    {
                        lock (innerExceptionsLocker)
                        {
                            if (innerExceptions == null)
                            {
                                innerExceptions = new List<Exception>();
                            }
                            innerExceptions.Add(exception);
                        }
                    }

                    // Wait for all work to complete
                    mre.WaitOne();

                    if (innerExceptions != null)
                    {
                        throw new AggregateException(innerExceptions);
                    }
                }
            }
        }

        private static void parallelForEachWorker<T>(T[] array, Action<T> body, int numberOfThreads)
        {
            int lowerBound = array.GetLowerBound(0);
            int toExclusive = array.GetUpperBound(0) + 1;

            parallelForWorker(lowerBound, toExclusive,
                              delegate(int i) { body(array[i]); }, numberOfThreads);
        }

        private static void parallelForEachWorker<T>(IList<T> list, Action<T> body, int numberOfThreads)
        {
            parallelForWorker(0, list.Count, delegate(int i) { body(list[i]); }, numberOfThreads);
        }

        private static void parallelForEachWorker<T>(IEnumerator<T> enumerator, Action<T> action, int numberOfThreads)
        {
            int count = numberOfThreads;

            // Use an event to wait for all work items to complete
            using (ManualResetEvent mre = new ManualResetEvent(false))
            {
                List<Exception> innerExceptions = null;
                object innerExceptionsLocker = new object();

                // Each work item will continually pull data from the 
                // enumerator and process it until there is no more data
                // to process
                WaitCallback callback = delegate
                                            {
                                                while (true)
                                                {
                                                    T data;
                                                    lock (enumerator)
                                                    {
                                                        if (!enumerator.MoveNext()) break;
                                                        data = enumerator.Current;
                                                    }

                                                    try
                                                    {
                                                        action(data);
                                                    }
                                                    catch (Exception exception)
                                                    {
                                                        lock (innerExceptionsLocker)
                                                        {
                                                            if (innerExceptions == null)
                                                            {
                                                                innerExceptions = new List<Exception>();
                                                            }
                                                            innerExceptions.Add(exception);
                                                        }
                                                        break;
                                                    }
                                                }

                                                if (Interlocked.Decrement(ref count) == 0)
                                                    mre.Set();
                                            };

                // The ThreadPool is used to process all but one of the 
                // chunks; the current  thread is used for that chunk, 
                // rather than just blocking.
                for (int i = 0; i < numberOfThreads - 1; i++)
                {
                    ThreadPool.QueueUserWorkItem(callback, i);
                }

                try
                {
                    callback(numberOfThreads - 1);
                }
                catch (Exception exception)
                {
                    lock (innerExceptionsLocker)
                    {
                        if (innerExceptions == null)
                        {
                            innerExceptions = new List<Exception>();
                        }
                        innerExceptions.Add(exception);
                    }
                }

                // Wait for all work to complete
                mre.WaitOne();

                if (innerExceptions != null)
                {
                    throw new AggregateException(innerExceptions);
                }
            }
        }

        #endregion
    }
}


AggregateException.cs — беспощадно выдрал из Parallel FX
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Security.Permissions;

namespace Dimchansky.Framework.Threading
{
    public delegate TResult Func<T, TResult>(T arg);

    [Serializable, DebuggerDisplay("Count = {InnerExceptions.Count}")]
    public class AggregateException : Exception
    {
        // Fields
        private Exception[] m_innerExceptions;

        // Methods
        public AggregateException()
        {
        }

        public AggregateException(IEnumerable<Exception> innerExceptions)
            : this(null, innerExceptions)
        {
        }

        public AggregateException(string message)
            : base(message)
        {
        }

        [SecurityPermission(SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.SerializationFormatter)]
        protected AggregateException(SerializationInfo info, StreamingContext context)
            : base(info, context)
        {
        }

        public AggregateException(string message, IEnumerable<Exception> innerExceptions)
            : this(message, (innerExceptions == null) ? null : new List<Exception>(innerExceptions))
        {
        }

        private AggregateException(string message, List<Exception> innerExceptions)
            : base(message, ((innerExceptions != null) && (innerExceptions.Count > 0)) ? innerExceptions[0] : null)
        {
            if (innerExceptions == null)
            {
                throw new ArgumentNullException("innerExceptions");
            }
            this.m_innerExceptions = innerExceptions.ToArray();
            for (int i = 0; i < this.m_innerExceptions.Length; i++)
            {
                if (this.m_innerExceptions[i] == null)
                {
                    throw new ArgumentException("An element of innerExceptions was null.");
                }
            }
        }

        public AggregateException(string message, Exception inner)
            : this(message, new Exception[] { inner })
        {
        }

        public AggregateException Flatten(params AggregateException[] exceptions)
        {
            if (exceptions == null)
            {
                throw new ArgumentNullException("exceptions");
            }
            List<Exception> list = new List<Exception>(this.m_innerExceptions);
            AggregateException[] exceptionArray = (AggregateException[])exceptions.Clone();
            for (int i = 0; i < exceptionArray.Length; i++)
            {
                if (exceptionArray[i] == null)
                {
                    throw new ArgumentException("An element in exceptions was null.");
                }
                list.AddRange(exceptionArray[i].InnerExceptions);
            }
            return new AggregateException(this.Message, list.ToArray());
        }

        public void Handle(Func<Exception, bool> handler)
        {
            if (handler == null)
            {
                throw new ArgumentNullException("handler");
            }
            List<Exception> list = null;
            for (int i = 0; i < this.m_innerExceptions.Length; i++)
            {
                if (!handler(this.m_innerExceptions[i]))
                {
                    if (list == null)
                    {
                        list = new List<Exception>();
                    }
                    list.Add(this.m_innerExceptions[i]);
                }
            }
            if (list != null)
            {
                throw new AggregateException(this.Message, list.ToArray());
            }
        }

        public override string ToString()
        {
            string str = base.ToString();
            for (int i = 0; i < this.m_innerExceptions.Length; i++)
            {
                str = string.Format("{0}{1}---> (Inner Exception #{2}) {3}{4}{5}", new object[] { str, Environment.NewLine, i, this.m_innerExceptions[i].ToString(), "<---", Environment.NewLine });
            }
            return str;
        }

        // Properties
        public ReadOnlyCollection<Exception> InnerExceptions
        {
            get
            {
                return new ReadOnlyCollection<Exception>(this.m_innerExceptions);
            }
        }
    }
}


Пример обработки исключений приводил уже здесь
Автор: Димчанский
Дата: 28.02.08
.

Пример распареллеливания кода.
Было:
for (int i = 0; i < 100; i++) { 
  a[i] = a[i]*a[i]; 
}


Стало:
Parallel.For(0, 100, delegate(int i) { 
  a[i] = a[i]*a[i]; 
});
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.