/*
 * BSD 3-Clause License
 *
 * Copyright (c) 2016, Mango-Engine Team
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * * Redistributions of source code must retain the above copyright notice, this
 *   list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 *
 * * Neither the name of the copyright holder nor the names of its
 *   contributors may be used to endorse or promote products derived from
 *   this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
module mango_engine.util;

import std.concurrency;
import core.atomic;

/++
    Empty marker class used for locks.

    It declares no members; instances exist only to be handed to a
    `synchronized(...)` statement as the object to lock on.
+/
class SyncLock {
}

/// Utility class to manage a group of threads.
/++
    Dispatches submitted delegates to a fixed number of worker threads.

    Every worker is a thread started through `std.concurrency.spawn` that runs
    a `ThreadWorker` loop. Submitted work goes to the first worker that is not
    marked busy; when all workers are busy it is sent round-robin to the
    workers' mailboxes.
+/
class ThreadPool {
    /// Number of worker threads spawned by the constructor.
    immutable size_t workerNumber;

    /// Bookkeeping record for a single worker thread.
    private struct Worker {
        private shared Tid _tid;
        private shared bool _busy = false;

        /// Thread id used to send messages to the worker.
        @property shared Tid tid() @trusted nothrow { return cast(Tid) _tid; }

        /// Whether the worker is currently marked as executing work.
        @property shared bool busy() @trusted nothrow { return cast(bool) _busy; }
        /// ditto
        @property shared void busy(bool busy) @trusted nothrow { _busy = cast(shared) busy; }

        this(Tid tid, bool busy = false) @trusted nothrow {
            this._tid = cast(shared) tid;
            this._busy = cast(shared) busy;
        }
    }

    /// Guards `workers`, `workerCounter` and `doStop`.
    private SyncLock workerLock;
    /// Created by the constructor; not used by any code in this class.
    private SyncLock lock2;

    private shared bool doStop = false;
    /// Index of the next worker to receive work when every worker is busy.
    private shared size_t workerCounter = 0;
    private shared Worker[size_t] workers;

    /++
        Creates the pool and spawns its worker threads.

        Params:
                workerNumber = How many worker threads to spawn.
    +/
    this(in size_t workerNumber) @trusted {
        this.workerNumber = workerNumber;

        // The locks must exist before any worker thread is handed a reference
        // to this pool, otherwise a worker could synchronize on a null lock.
        workerLock = new SyncLock();
        lock2 = new SyncLock();

        for(size_t i = 0; i < workerNumber; i++) {
            this.workers[i] = cast(shared) Worker(spawn(&spawnWorker, cast(shared) i, cast(shared) this));
        }
    }

    /++
        Hands a delegate to one of the worker threads.

        The call does nothing once `stopImmediate` has been called.

        Params:
                work = The delegate to execute on a worker thread.

        Throws: Exception if the pool was created with zero workers.
    +/
    void submitWork(WorkDelegate work) @trusted {
        if(workerNumber == 0)
            throw new Exception("ThreadPool has no workers to execute the submitted work!");

        synchronized(workerLock) {
            // Checked under the lock so that work is never sent to a worker
            // that stopImmediate() has already told to stop.
            if(doStop)
                return;

            foreach(id, ref worker; this.workers) {
                // Prioritize sending work to free workers
                if(!worker.busy) {
                    send(worker.tid, Work(work));
                    worker.busy = true;
                    return;
                }
            }

            // All workers busy
            if(workerCounter >= workerNumber) {
                workerCounter = 0; // Reset workerCounter
            }

            // Send to the next worker. workerCounter distributes work evenly among the busy workers.
            send(workers[workerCounter].tid, Work(work));

            atomicOp!"+="(this.workerCounter, 1);
        }
    }

    /++
        Records whether a worker is currently executing work.

        Called by `ThreadWorker` right before and right after it runs a
        delegate. Ids that do not belong to a worker of this pool are ignored.

        Params:
                id = The worker's id (its key in the worker table).
                busy = The new busy state.
    +/
    shared package void notifyBusy(in size_t id, in bool busy) @safe {
        synchronized(workerLock) {
            // Worker ids are 0 .. length - 1, so id == length is already out of range.
            if(id >= this.workers.length) return;
            this.workers[id].busy = busy;
        }
    }

    /++
        Stops accepting new work and sends every worker the "stop" message.

        A worker leaves its loop once it has received that message.
        NOTE(review): work already queued in a worker's mailbox ahead of the
        stop message is presumably still executed first - confirm against the
        std.concurrency mailbox ordering.
    +/
    void stopImmediate() {
        synchronized(workerLock) {
            doStop = true;
            foreach(id, worker; this.workers) {
                send(worker.tid, "stop");
            }
        }
    }
}

/// Signature of the work items accepted by `ThreadPool.submitWork`.
alias WorkDelegate = void delegate() @system;

/// Message wrapper used to send a `WorkDelegate` to a worker thread.
package shared struct Work {
    WorkDelegate work;
}

/// Message loop executed inside each worker thread of a `ThreadPool`.
class ThreadWorker {
    /// Id of this worker inside its pool.
    immutable size_t id;

    private shared(ThreadPool) pool;

    private bool running = true;

    /++
        Params:
                id = The id this worker reports to the pool.
                pool = The pool that is notified about the busy state.
    +/
    this(in size_t id, shared(ThreadPool) pool) @safe nothrow {
        this.id = id;
        this.pool = pool;
    }

    /++
        Processes messages until the string "stop" is received.

        The mailbox is polled with a one second timeout. Each `Work` message
        is executed between two `notifyBusy` calls on the pool. An `Exception`
        thrown by the work is reported on stderr and does not end the loop.
    +/
    void doRun() @trusted {
        import core.time : msecs;

        do {
            receiveTimeout(1000.msecs,
                (string s) {
                    if(s == "stop") {
                        running = false;
                    }
                },
                (Work work) {
                    pool.notifyBusy(id, true);
                    debug(mango_concurrencyInfo) {
                        import std.stdio;
                        writeln("Executing work in thread, ", id);
                    }
                    try {
                        work.work();
                    } catch(Exception e) {
                        // An escaping exception would end this thread while the
                        // pool keeps sending it work, so report it and carry on.
                        import std.stdio : stderr;
                        stderr.writeln("Worker ", id, ": work threw an exception: ", e.msg);
                    }
                    pool.notifyBusy(id, false);
                    debug(mango_concurrencyInfo) {
                        import std.stdio;
                        writeln("Executing work complete in thread, ", id);
                    }
                }
            );
        } while(running);

        debug(mango_concurrencyInfo) {
            import std.stdio;
            writeln("Worker ", id, " exiting");
        }
    }
}

/// Thread entry point: creates a `ThreadWorker` for the given pool and runs its loop.
private void spawnWorker(shared(size_t) id, shared(ThreadPool) pool) @system {
    ThreadWorker worker = new ThreadWorker(id, pool);
    worker.doRun();
}

/++
    Reads a whole file into a string.

    Params:
            filename = The file to be read.

    Returns: The file's contents.
    Throws: Exception if the file does not exist. `readText` may throw as well
            (see the std.file documentation).
+/
string readFileToString(in string filename) @safe {
    import std.file : exists, readText;

    if(!exists(filename))
        throw new Exception("File does not exist!");

    return readText(filename);
}