1 /*
2  *  BSD 3-Clause License
3  *  
4  *  Copyright (c) 2016, Mango-Engine Team
5  *  All rights reserved.
6  *  
7  *  Redistribution and use in source and binary forms, with or without
8  *  modification, are permitted provided that the following conditions are met:
9  *  
10  *  * Redistributions of source code must retain the above copyright notice, this
11  *    list of conditions and the following disclaimer.
12  *  
13  *  * Redistributions in binary form must reproduce the above copyright notice,
14  *    this list of conditions and the following disclaimer in the documentation
15  *    and/or other materials provided with the distribution.
16  *  
17  *  * Neither the name of the copyright holder nor the names of its
18  *    contributors may be used to endorse or promote products derived from
19  *    this software without specific prior written permission.
20  *  
21  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
24  *  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
25  *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
26  *  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
27  *  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
29  *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 module mango_engine.util;
33 
34 import std.concurrency;
35 import core.atomic;
36 
37 /// Dummy class used for locks.
38 class SyncLock {
39 
40 }
41 
42 /// Utility class to manage a group of threads.
43 class ThreadPool {
44     immutable size_t workerNumber;
45 
46     private struct Worker {
47         private shared Tid _tid;
48         private shared bool _busy = false;
49 
50         @property shared Tid tid() @trusted nothrow { return cast(Tid) _tid; }
51 
52         @property shared bool busy() @trusted nothrow { return cast(bool) _busy; }
53         @property shared void busy(bool busy) @trusted nothrow { _busy = cast(shared) busy; }
54  
55         this(Tid tid, bool busy = false) @trusted nothrow {
56             this._tid = cast(shared) tid;
57             this._busy = cast(shared) busy;
58         }
59     }
60 
61     private SyncLock workerLock;
62     private SyncLock lock2;
63 
64     private shared bool doStop = false;
65     private shared size_t workerCounter = 0;
66     private shared Worker[size_t] workers;
67 
68     this(in size_t workerNumber) @trusted {
69         this.workerNumber = workerNumber;
70 
71         for(size_t i = 0; i < workerNumber; i++) {
72             this.workers[i] = cast(shared) Worker(spawn(&spawnWorker, cast(shared) i, cast(shared) this));
73         }
74 
75         workerLock = new SyncLock();
76         lock2 = new SyncLock();
77     }
78 
79     void submitWork(WorkDelegate work) @trusted {
80         if(doStop)
81             return;
82 
83         synchronized(workerLock) {
84             foreach(id, ref worker; this.workers) {
85                 // Prioritize sending work to free workers
86                 if(!worker.busy) {
87                     send(worker.tid, Work(work));
88                     worker.busy = true;
89                     return;
90                 }
91             }
92 
93             // All workers busy
94             if(workerCounter >= workerNumber) {
95                 workerCounter = 0; // Reset workerCounter
96             }
97 
98             // Send to the next worker. workerCounter distributes evenly work among the busy workers.
99             send(workers[workerCounter].tid, Work(work));
100 
101             atomicOp!"+="(this.workerCounter, 1);
102         }
103     }
104 
105     shared package void notifyBusy(in size_t id, in bool busy) @safe {
106         synchronized(workerLock) {
107             if(id > this.workers.length) return;
108             this.workers[id].busy = busy;
109         }
110     }
111 
112     /// Each thread finishes it's current task and immediately stops. 
113     void stopImmediate() {
114         synchronized(workerLock) {
115             doStop = true;
116             foreach(id, worker; this.workers) {
117                 send(worker.tid, "stop");
118             }
119         }
120     }
121 }
122 
123 alias WorkDelegate = void delegate() @system;
124 
125 package shared struct Work {
126     WorkDelegate work;
127 }
128 
129 class ThreadWorker {
130     immutable size_t id;
131 
132     private shared(ThreadPool) pool;
133 
134     private bool running = true;
135 
136     this(in size_t id, shared(ThreadPool) pool) @safe nothrow {
137         this.id = id;
138         this.pool = pool;
139     }
140 
141     void doRun() @trusted {
142         import std.datetime;
143         import core.thread;
144 
145         do {
146             bool recieved = receiveTimeout(1000.msecs,
147                 (string s) {
148                     if(s == "stop") {
149                         running = false;
150                     }
151                 },
152                 (Work work) {
153                     pool.notifyBusy(id, true);
154                     debug(mango_concurrencyInfo) {
155                         import std.stdio;
156                         writeln("Executing work in thread, ", id);
157                     }
158                     work.work();
159                     pool.notifyBusy(id, false);
160                     debug(mango_concurrencyInfo) {
161                         import std.stdio;
162                         writeln("Executing work complete in thread, ", id);
163                     }
164                 }
165             );
166         } while(running);
167 
168         debug(mango_concurrencyInfo) {
169             import std.stdio;
170             writeln("Worker ", id, " exiting");
171         }
172     }
173 }
174 
175 private void spawnWorker(shared(size_t) id, shared(ThreadPool) pool) @system {
176     ThreadWorker worker = new ThreadWorker(id, pool);
177     worker.doRun();
178 }
179 
180 /++
181     Reads a whole file into a string.
182 
183     Params:
184             filename =  The file to be read.
185 
186     Returns: The file's contents.
187     Throws: Exception if the file does not exist.
188 +/
189 string readFileToString(in string filename) @safe {
190     import std.file : exists, readText;
191     if(exists(filename)) {
192         auto text = readText(filename);
193         return text;
194     } else throw new Exception("File does not exist!"); 
195 }