As far as I’m aware, .NET doesn’t offer a particularly convenient way to limit how many Tasks/Threads run at the same time. This becomes a problem when, for example, you queue up thousands of jobs that all compete for a finite set of resources.
Earlier today, for example, I wanted to run a few thousand tasks that process data stored in a database. The queries, upserts, and so on are fairly heavy, so letting 1,000+ tasks run simultaneously is a problem. In the past, I would have chunked the Tasks into groups and waited for each group to complete before starting the next. This time, however, I thought back to one of my earlier projects: a distributed computing system built on the actor model/pattern.
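For comparison, the chunking approach can be sketched roughly as follows. This is a minimal sketch, not code from the original project: `ProcessAsync` is a hypothetical stand-in for one heavy database job (it only waits a few milliseconds), and the counters exist solely to show how many jobs are in flight at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int running = 0, maxObserved = 0, completed = 0;

// Hypothetical stand-in for one heavy database job.
async Task ProcessAsync(int id)
{
    int now = Interlocked.Increment(ref running);

    // Record the highest number of jobs seen in flight at the same time.
    int seen;
    while (now > (seen = Volatile.Read(ref maxObserved)))
    {
        Interlocked.CompareExchange(ref maxObserved, now, seen);
    }

    await Task.Delay(10); // simulate I/O-bound work
    Interlocked.Decrement(ref running);
    Interlocked.Increment(ref completed);
}

// Start one group of at most chunkSize jobs, wait for all of them, then start the next group.
async Task RunInChunksAsync(IReadOnlyList<int> jobs, int chunkSize)
{
    for (int start = 0; start < jobs.Count; start += chunkSize)
    {
        var group = jobs.Skip(start).Take(chunkSize).Select(id => ProcessAsync(id));
        await Task.WhenAll(group);
    }
}

var ids = Enumerable.Range(1, 25).ToList();
await RunInChunksAsync(ids, chunkSize: 3);
Console.WriteLine($"completed={completed}, max in flight={maxObserved}");
```

Because a group only starts once the previous one has finished, concurrency never exceeds the chunk size. The drawback is that one slow job holds up its whole group, leaving the other slots idle until it finishes.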
That implementation already did what I needed. The “worker” has a set of actors, each actor has a mailbox, and an actor performs tasks based on the messages it receives in its mailbox. Each actor processes its mailbox messages one at a time (synchronously), so limiting the number of available actors also limits the number of Tasks running at once. This seemed like a good opportunity to reuse some code.
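To illustrate why the size of the actor pool caps concurrency, here is a minimal, self-contained sketch. It is not the BasicActor code from the earlier post: each hypothetical actor is just a `BlockingCollection<string>` mailbox drained by a single consumer loop, and `Thread.Sleep` stands in for the heavy database work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int running = 0, maxObserved = 0;
var processed = new ConcurrentBag<string>();

// A minimal closure-based "actor": one mailbox plus one consumer loop.
// Messages are handled one at a time, so each actor runs at most one job.
(BlockingCollection<string> Mailbox, Task Loop) StartActor()
{
    var mailbox = new BlockingCollection<string>();
    var loop = Task.Run(() =>
    {
        // Blocks until a message arrives; the loop ends after CompleteAdding is called.
        foreach (var message in mailbox.GetConsumingEnumerable())
        {
            int now = Interlocked.Increment(ref running);
            int seen;
            while (now > (seen = Volatile.Read(ref maxObserved)))
            {
                Interlocked.CompareExchange(ref maxObserved, now, seen);
            }

            Thread.Sleep(5); // simulate a heavy, synchronous database call
            processed.Add(message);
            Interlocked.Decrement(ref running);
        }
    });
    return (mailbox, loop);
}

const int maxActors = 3;
var actors = Enumerable.Range(0, maxActors).Select(_ => StartActor()).ToArray();

// "Tell" each job to an actor, cycling through the pool.
for (int i = 0; i < 30; i++)
{
    actors[i % maxActors].Mailbox.Add($"job-{i}");
}

// Close the mailboxes and wait for every actor to drain its queue.
foreach (var actor in actors)
{
    actor.Mailbox.CompleteAdding();
}
await Task.WhenAll(actors.Select(a => a.Loop));

Console.WriteLine($"processed={processed.Count}, max concurrent={maxObserved}");
```

Since every mailbox is drained by exactly one loop, the number of jobs running at any moment cannot exceed `maxActors`, no matter how many messages are queued.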
I stripped out the “WorkerNode” but left the BasicActor code mostly intact. You can view the code for that post at the link above. All we really need, then, is a round-robin queuing system similar to the one in the aforementioned WorkerNode. Our actors (IActor<T>) are stored in a ConcurrentDictionary, and the jobs (messages) to be processed are tracked in a second ConcurrentDictionary.
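```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

// Starts at -1 so the first call to Next returns the first actor.
private int _lastIndex = -1;

// The actor pool; its size caps how many jobs run at the same time.
private readonly ConcurrentDictionary<string, IActor<WorkerStatusMessage>> _actors =
    new ConcurrentDictionary<string, IActor<WorkerStatusMessage>>();

// Jobs still waiting to complete, keyed by job id.
private readonly ConcurrentDictionary<string, string> _jobs =
    new ConcurrentDictionary<string, string>();

// Round-robin: each call returns the next actor in the pool, wrapping around at the end.
public IActor<WorkerStatusMessage> Next
{
    get
    {
        // Snapshot the actors so the count and the selected element always agree.
        var actors = _actors.Values.ToArray();
        if (actors.Length == 0)
        {
            return null;
        }

        // Interlocked keeps the counter correct when Next is called from several threads;
        // the uint casts keep the index non-negative even if the counter overflows.
        var index = (uint)Interlocked.Increment(ref _lastIndex) % (uint)actors.Length;
        return actors[index];
    }
}
```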
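Round-robin selection has to stay correct when several threads ask for the next actor at once, and a plain read-then-increment of an index is not atomic. The following minimal sketch shows a thread-safe selector built on `Interlocked.Increment` and a modulo over a fixed array; the actor names are hypothetical plain strings standing in for `IActor<WorkerStatusMessage>` instances.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical actor names standing in for IActor<WorkerStatusMessage> instances.
var actors = new[] { "actor-a", "actor-b", "actor-c" };

// Start at -1 so the first call selects index 0.
int lastIndex = -1;

// Each caller atomically claims a distinct ticket, then maps it onto the pool.
string Next()
{
    uint ticket = (uint)Interlocked.Increment(ref lastIndex);
    return actors[ticket % (uint)actors.Length];
}

// Hand out 300 jobs from several threads at once and count assignments per actor.
var assignments = new ConcurrentDictionary<string, int>();
Parallel.For(0, 300, i => assignments.AddOrUpdate(Next(), 1, (key, n) => n + 1));

foreach (var actor in actors)
{
    Console.WriteLine($"{actor}: {assignments[actor]}"); // each actor prints 100
}
```

Every call claims a unique ticket (0 through 299), so the 300 assignments split exactly 100/100/100 across the three actors, even though the calls come from many threads.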
With these members and the round-robin scheduling mechanism in place, the rest of the code is straightforward: we get a list of data and “Tell” the actors to process it.
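```csharp
using System;
using System.Threading;

// Size of the actor pool: at most this many jobs are processed at once.
var maxActors = 3;
var source = new CancellationTokenSource();

for (var i = 0; i < maxActors; i++)
{
    var actor = new BasicActor() { CancellationToken = source.Token };

    // Called by the actor each time it finishes a job; drop that job from the pending list.
    actor.AddCallback("node", message =>
    {
        _jobs.TryRemove(message.TaskId, out _);
    });

    _actors.TryAdd(Guid.NewGuid().ToString(), actor);
}

// invoices: the list of data (job ids) loaded earlier.
foreach (var invoice in invoices)
{
    // Register the job before dispatching it, so its completion can never arrive first.
    _jobs.TryAdd(invoice, invoice);

    // The actor itself will enqueue this message and perform an action!
    this.Next.Tell<string>(invoice);
}

// Every five seconds, check whether all jobs have completed.
TaskRepeater.Interval(TimeSpan.FromSeconds(5), () =>
{
    if (_jobs.IsEmpty)
    {
        Console.WriteLine("Jobs complete");
        // Stop the repeater (and the actors, which share this token) so this runs only once.
        source.Cancel();
    }
}, source.Token);
```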
Notice that each actor has a callback that it calls whenever it completes a job/Task. As these completion messages arrive, the corresponding job is removed from _jobs. Finally, a repeating Task checks every five seconds whether _jobs is empty; once it is, all of the jobs have completed.
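The completion-tracking pattern can also be sketched on its own. This minimal sketch rests on a few assumptions: three `Task.Run` workers stand in for the actors, `OnJobCompleted` is a hypothetical name for the callback, the invoice ids are made up, and a simple `Task.Delay` polling loop replaces `TaskRepeater`, which is a helper from the original code rather than part of .NET.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Pending jobs keyed by job id, playing the role of _jobs.
var jobs = new ConcurrentDictionary<string, string>();
int completedCount = 0;

// Hypothetical completion callback: a finished job leaves the pending set.
void OnJobCompleted(string jobId)
{
    jobs.TryRemove(jobId, out _);
    Interlocked.Increment(ref completedCount);
}

// Register every job before any work starts.
var jobIds = Enumerable.Range(1, 20).Select(i => $"invoice-{i}").ToList();
foreach (var id in jobIds)
{
    jobs.TryAdd(id, id);
}

// Three workers stand in for the actors; worker w handles every third job.
var workers = Enumerable.Range(0, 3).Select(w => Task.Run(async () =>
{
    for (int i = w; i < jobIds.Count; i += 3)
    {
        await Task.Delay(5); // simulate work
        OnJobCompleted(jobIds[i]);
    }
})).ToArray();

// Poll the pending set until it is empty, in place of TaskRepeater.Interval.
while (!jobs.IsEmpty)
{
    await Task.Delay(TimeSpan.FromMilliseconds(50));
}
Console.WriteLine("Jobs complete");

await Task.WhenAll(workers);
```

Registering each job before dispatching it matters: otherwise a fast job could finish and attempt its removal before it was ever added, leaving a stale entry that keeps the pending set from ever becoming empty.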