المساعد الشخصي الرقمي

مشاهدة النسخة كاملة : Need assistance with a Producer/Consumer Queue logic where the Consumer can create a



C# Programming
04-25-2009, 03:52 AM
Hello,

I am new to threading. I have developed a synchronized producer/consumer queue where the queue supporting the producer/consumer logic is both bounded (allows only x items in the queue at one time) and blocking (if the queue is full/data is unavailable it will block the threads until room/data is available on the queue). This code is going to drive a windows service that received information from MSMQ and sends a request for data out via TCPIP.

The producer, running on it's own thread, can produce requests up to the predefined size of the boundedblockingqueue. When requests have been pulled off the queue by the consumer the producer adds additional requests to the boundedblockingqueue. This logic is working fine.

The boundedblockingqueue is based off one found here on codeproject.

I have a problem with the consumer. Due to connectivity constraints, I can only allow the consumer to spawn 5 worker threads to send data out over TCPIP. Initially I was tracking these additionally thread in a hashtable. When the consumer found work in the queue it would spawn 5 additional threads, placing a reference to each in a hashtable. When these threads had completed their work, an async callback is made to consumer to remove them from the hashtable. The consumer logic's initial design was to check the hashtable for count less than 5. If the count was less than 5, additional worker threads would be spawned...if greater than 5 nothing would be added.

With this logic, the first 5 entries in the boundblockingqueue driving the producer/consumer get run. Then the whole process stops. If I remove the hashtable thread tracking logic from the consumer code, the producer/consumer queue keeps working as designed - producer generates data and consumer consumers the data ad infinitum until I stop the project.

Needless to say I am a tad bit frustrated with this and I am soooo close. http://www.barakasoft.com/script/Forums/Images/smiley_sigh.gif I am including my code base here - the code has been stripped down to the base consumer/producer functionality. Would someone please take some time and guide me on this issue! I will be more than glad to create an article on codeproject once this is accomplished and give the appropriate credit.

Remember I am new at threading, this has been quite a challenge to get this far. Thanks In Advance!

class Consumer
{
public delegate void threadCompleteHandler(Thread myThread);

private BoundedBlockingQueue _queue;
private SyncEvents _syncEvents;
private static Hashtable _trackerThreads = null;


public Consumer(BoundedBlockingQueue q, SyncEvents e)
{
_queue = q;
_syncEvents = e;
_trackerThreads = new Hashtable();
}

public void ThreadRun()
{
while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
{
if (_trackerThreads.Count < 7)
{
lock (((ICollection)_queue).SyncRoot)
{
Console.WriteLine("Item Consumed!");
int item = (int)_queue.Dequeue();

CP_Request_Handler.Request requestHandler = new WindowsService2.CP_Request_Handler.Request();

Thread workerThread = new Thread(requestHandler.DoWork);
workerThread.Name = String.Format("Worker Thread - {0}", item);

requestHandler.OnWorkDone += new WindowsService2.CP_Request_Handler.Request.BgWorkDoneHandler(BackgroundWorkerThread_OnWorkDone);

_trackerThreads.Add(workerThread, workerThread);

workerThread.Start();
Console.WriteLine("Thread Spawned!");

}
}
}
}

public void BackgroundWorkerThread_OnWorkDone(EventArgs e, string asdf, System.Threading.Thread thread)
{

_trackerThreads.Remove(thread);

Console.WriteLine("Thread Completed");
}
}

class Producer
{
private BoundedBlockingQueue _queue;
private SyncEvents _syncEvents;

public Producer(BoundedBlockingQueue q, SyncEvents e)
{
_queue = q;
_syncEvents = e;
}

public void ThreadRun()
{
Random r = new Random();
while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
{
lock (((ICollection)_queue).SyncRoot)
{
//while (_queue.Count < _queue.Size)
//{
_queue.Enqueue(r.Next(0, 100));
Console.WriteLine("Producer Fired !");
_syncEvents.NewItemEvent.Set();
Console.WriteLine("New Item Set!");
//}
}
}
}
}

class SyncEvents
{
private EventWaitHandle _newItemEvent;
private EventWaitHandle _exitThreadEvent;
private WaitHandle[] _eventArray;

public SyncEvents()
{
_newItemEvent = new AutoResetEvent(false);
_exitThreadEvent = new ManualResetEvent(false);

_eventArray = new WaitHandle[2];
_eventArray[0] = _newItemEvent;
_eventArray[1] = _exitThreadEvent;
}

public EventWaitHandle ExitThreadEvent
{
get { return _exitThreadEvent; }
}

public EventWaitHandle NewItemEvent
{
get { return _newItemEvent; }
}

public WaitHandle[] EventArray
{
get { return _eventArray; }
}
}

public sealed class BoundedBlockingQueue : ICollection
{
private Object[] _buffer;
private int _count;
private int _size;
private int _head;
private int _tail;
private readonly Object _syncRoot;

public BoundedBlockingQueue(int size)
{
if (size