Многопоточность

 
0
 
.NET
ava
Tokatak | 22.01.2013, 20:41
Работаю с задачей организации очереди поставщик-потребитель , в роли потребителя выступает отдельный поток.
Собственно рабочий код ниже




using System.Threading;

namespace multySteam
{
    class Program
    {
        
       
            class ProducerConsumerQueue : IDisposable
    {
      EventWaitHandle wh = new AutoResetEvent(false);
    

      Thread worker;
    

      object locker = new object();
      Queue<string> tasks = new Queue<string>();

      public ProducerConsumerQueue() 
      {
          
              worker = new Thread(Work);
              worker.Name = "worker 1";
              worker.Start();


       
      }

      public void EnqueueTask(string task)
      {
        lock (locker)
          tasks.Enqueue(task);

        wh.Set();
         
      }

      public void Dispose() 
      {
        EnqueueTask(null);      // Сигнал Потребителю на завершение
        worker.Join();          // Ожидание завершения Потребителя
        wh.Close();             // Освобождение ресурсов

       
      }

      void Work() 
      {
        while (true) 
        {
                      string task = null;

                      lock (locker)
                      {
                        if (tasks.Count > 0) 
                        {
                                        task = tasks.Dequeue();
                                        if (task == null)
                                        return;
                        }
                      }

                      if (task != null) 
                      {
                                                   Console.WriteLine("Выполняется задача: " + task);
                                                    Thread.Sleep(1000); // симуляция работы...

                          


                      }
                      else
                                                   wh.WaitOne();       // Больше задач нет, ждем сигнала...
                        
        }   
      }
    }


    class Test
    {
        static int  j= 0;

      static void Main() 
      {
        
          System.Timers.Timer T = new System.Timers.Timer(1000);
          T.Elapsed += new System.Timers.ElapsedEventHandler(T_Elapsed);

          T.Start();

        using(ProducerConsumerQueue q = new ProducerConsumerQueue()) 
        {


           
   
            q.EnqueueTask("Привет!");

          for (int i = 0; i < 150; i++)
            q.EnqueueTask("Сообщение " + i);

          q.EnqueueTask("Пока!");
        }


        T.Stop();
        Console.WriteLine("Заняло "+ j.ToString());
        Console.ReadKey();

        // Выход из using приводит к вызову Dispose, который ставит
        // в очередь null-задачу и ожидает, пока Потребитель не завершится.
      }

      static void T_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
      {
          j++;
          if (j % 10 == 0)
          { Console.WriteLine("_"+j.ToString()+"_"); }
      }

     
    }





Проблемы возникают при попытке  увеличить количество потоков запрашивающих очередь ,модифицируя метод  ProducerConsumerQueue :

     public ProducerConsumerQueue() 
      {
          for (int i = 0; i < 5; i++)
          {
              worker = new Thread(Work);
              worker.Name = "worker 1";
              worker.Start();

          }
      
      }




После отработки всей очереди не проходит Dispose и таймер продолжает тикать.
Заранее спасибо за все возможные тычки моей мордой в недоработки и рекомендованные изменения.


p.s. FAQ страничка http://base.vingrad.ru/NET-996 лежит
Kommentare (6)
ava
Tokatak | 23.01.2013, 00:07 #
Частично решается добавлением

void Work() 
      {
        while (true) 
        {
                      string task = null;

                      lock (locker)
                      {

                          if (tasks.Count > 0)
                          {
                              task = tasks.Dequeue();
                              if (task == null)
                                  return;
                          }
                          else return;
                      }
...


но возникает другая, не открыаются все потоки =\ 
ava
infarch | 23.01.2013, 10:47 #
Посмотрите на неймспейс System.Collections.Concurrent - там уже есть потокобезопасная очередь и примеры ее использования думаю найдутся.
ava
Tokatak | 23.01.2013, 15:06 #
infarch , cпасибо за наводку еще копаю, Судя по всему изобретенный велосипед не жизнеспособен.
ava
Tokatak | 23.01.2013, 17:02 #
Найдено решение .

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using System.Threading;

namespace Threding2
{
    class Program
    {

        public class TaskQueue : IDisposable
        {
            object locker = new object();
            Thread[] workers;
            Queue<string> taskQ = new Queue<string>();

            public TaskQueue(int workerCount)
            {
                workers = new Thread[workerCount];

                // Создать и запустить отдельный поток на каждого потребителя
                for (int i = 0; i < workerCount; i++)
                    (workers[i] = new Thread(Consume)).Start();
            }

            public void Dispose()
            {
                // Добавить по null-задаче на каждого завершаемого потребителя
                foreach (Thread worker in workers)
                    EnqueueTask(null);

                foreach (Thread worker in workers)
                    worker.Join();
            }

            public void EnqueueTask(string task)
            {
                lock (locker)
                {
                    taskQ.Enqueue(task);
                    Monitor.PulseAll(locker);
                }
            }

            void Consume()
            {
                while (true)
                {
                    string task;

                    lock (locker)
                    {
                        while (taskQ.Count == 0)
                            Monitor.Wait(locker);

                        task = taskQ.Dequeue();
                    }

                    if (task == null)
                        return;  // Сигнал на выход

                    Console.Write(task);
                    Thread.Sleep(1000);       // Имитация длительной работы
                }
            }
        }


        static void Main(string[] args)
        {

            using (TaskQueue q = new TaskQueue(5))
            {
                Console.WriteLine("Помещаем в очередь 10 задач");
                Console.WriteLine("Ожидаем завершения задач...");

                for (int i = 0; i < 100; i++)
                    q.EnqueueTask(" Задача" + i);
            }

            // Выход из using приводит к вызову метода Dispose двух TaskQueue,
            // завершая потребителей после выполнения всех задач.
            Console.WriteLine("\r\nВсе задачи выполнены!");
        }
    }
}

ava
infarch | 24.01.2013, 11:13 #
Таки без велоспеда не обощлось? Чем вам не угодила ConcurrentQueue если не секрет?
ava
Tokatak | 25.01.2013, 02:54 #
Пишу все с нуля, хотелось бы полностью отслеживать логику работы.
Тараканы  , одним словом.
Registrieren Sie sich oder melden Sie sich an, um schreiben zu können.
Unternehmen des Tages
Вы также можете добавить свою фирму в каталог IT-фирм, и публиковать статьи, новости, вакансии и другую информацию от имени фирмы.
Подробнее
Mitwirkende
advanced
Absenden