dist_iface.cc (11703:08b78e0a3717) | dist_iface.cc (11757:78ef8daecd81) |
---|---|
1/* | 1/* |
2 * Copyright (c) 2015 ARM Limited | 2 * Copyright (c) 2015-2016 ARM Limited |
3 * All rights reserved 4 * 5 * The license below extends only to copyright in the software and shall 6 * not be construed as granting a license to any other intellectual 7 * property including but not limited to intellectual property relating 8 * to a hardware implementation of the functionality of the software 9 * licensed hereunder. You may use the software subject to the license 10 * terms below provided that you ensure that this notice is replicated --- 67 unchanged lines hidden (view full) --- 78 79 if (repeat_tick < nextRepeat) { 80 nextRepeat = repeat_tick; 81 inform("Dist synchronisation interval is changed to %lu.\n", 82 nextRepeat); 83 } 84} 85 | 3 * All rights reserved 4 * 5 * The license below extends only to copyright in the software and shall 6 * not be construed as granting a license to any other intellectual 7 * property including but not limited to intellectual property relating 8 * to a hardware implementation of the functionality of the software 9 * licensed hereunder. You may use the software subject to the license 10 * terms below provided that you ensure that this notice is replicated --- 67 unchanged lines hidden (view full) --- 78 79 if (repeat_tick < nextRepeat) { 80 nextRepeat = repeat_tick; 81 inform("Dist synchronisation interval is changed to %lu.\n", 82 nextRepeat); 83 } 84} 85 |
86void 87DistIface::Sync::abort() 88{ 89 std::unique_lock<std::mutex> sync_lock(lock); 90 waitNum = 0; 91 isAbort = true; 92 sync_lock.unlock(); 93 cv.notify_one(); 94} 95 |
|
86DistIface::SyncSwitch::SyncSwitch(int num_nodes) 87{ 88 numNodes = num_nodes; 89 waitNum = num_nodes; 90 numExitReq = 0; 91 numCkptReq = 0; 92 numStopSyncReq = 0; 93 doExit = false; 94 doCkpt = false; 95 doStopSync = false; 96 nextAt = std::numeric_limits<Tick>::max(); 97 nextRepeat = std::numeric_limits<Tick>::max(); | 96DistIface::SyncSwitch::SyncSwitch(int num_nodes) 97{ 98 numNodes = num_nodes; 99 waitNum = num_nodes; 100 numExitReq = 0; 101 numCkptReq = 0; 102 numStopSyncReq = 0; 103 doExit = false; 104 doCkpt = false; 105 doStopSync = false; 106 nextAt = std::numeric_limits<Tick>::max(); 107 nextRepeat = std::numeric_limits<Tick>::max(); |
108 isAbort = false; |
|
98} 99 100DistIface::SyncNode::SyncNode() 101{ 102 waitNum = 0; 103 needExit = ReqType::none; 104 needCkpt = ReqType::none; 105 needStopSync = ReqType::none; 106 doExit = false; 107 doCkpt = false; 108 doStopSync = false; 109 nextAt = std::numeric_limits<Tick>::max(); 110 nextRepeat = std::numeric_limits<Tick>::max(); | 109} 110 111DistIface::SyncNode::SyncNode() 112{ 113 waitNum = 0; 114 needExit = ReqType::none; 115 needCkpt = ReqType::none; 116 needStopSync = ReqType::none; 117 doExit = false; 118 doCkpt = false; 119 doStopSync = false; 120 nextAt = std::numeric_limits<Tick>::max(); 121 nextRepeat = std::numeric_limits<Tick>::max(); |
122 isAbort = false; |
|
111} 112 | 123} 124 |
113void | 125bool |
114DistIface::SyncNode::run(bool same_tick) 115{ 116 std::unique_lock<std::mutex> sync_lock(lock); 117 Header header; 118 119 assert(waitNum == 0); | 126DistIface::SyncNode::run(bool same_tick) 127{ 128 std::unique_lock<std::mutex> sync_lock(lock); 129 Header header; 130 131 assert(waitNum == 0); |
132 assert(!isAbort); |
|
120 waitNum = DistIface::recvThreadsNum; 121 // initiate the global synchronisation 122 header.msgType = MsgType::cmdSyncReq; 123 header.sendTick = curTick(); 124 header.syncRepeat = nextRepeat; 125 header.needCkpt = needCkpt; 126 header.needStopSync = needStopSync; 127 if (needCkpt != ReqType::none) 128 needCkpt = ReqType::pending; 129 header.needExit = needExit; 130 if (needExit != ReqType::none) 131 needExit = ReqType::pending; 132 if (needStopSync != ReqType::none) 133 needStopSync = ReqType::pending; 134 DistIface::master->sendCmd(header); 135 // now wait until all receiver threads complete the synchronisation 136 auto lf = [this]{ return waitNum == 0; }; 137 cv.wait(sync_lock, lf); | 133 waitNum = DistIface::recvThreadsNum; 134 // initiate the global synchronisation 135 header.msgType = MsgType::cmdSyncReq; 136 header.sendTick = curTick(); 137 header.syncRepeat = nextRepeat; 138 header.needCkpt = needCkpt; 139 header.needStopSync = needStopSync; 140 if (needCkpt != ReqType::none) 141 needCkpt = ReqType::pending; 142 header.needExit = needExit; 143 if (needExit != ReqType::none) 144 needExit = ReqType::pending; 145 if (needStopSync != ReqType::none) 146 needStopSync = ReqType::pending; 147 DistIface::master->sendCmd(header); 148 // now wait until all receiver threads complete the synchronisation 149 auto lf = [this]{ return waitNum == 0; }; 150 cv.wait(sync_lock, lf); |
138 // global synchronisation is done 139 assert(!same_tick || (nextAt == curTick())); | 151 // global synchronisation is done. 152 assert(isAbort || !same_tick || (nextAt == curTick())); 153 return !isAbort; |
140} 141 142 | 154} 155 156 |
143void | 157bool |
144DistIface::SyncSwitch::run(bool same_tick) 145{ 146 std::unique_lock<std::mutex> sync_lock(lock); 147 Header header; 148 // Wait for the sync requests from the nodes 149 if (waitNum > 0) { 150 auto lf = [this]{ return waitNum == 0; }; 151 cv.wait(sync_lock, lf); 152 } 153 assert(waitNum == 0); | 158DistIface::SyncSwitch::run(bool same_tick) 159{ 160 std::unique_lock<std::mutex> sync_lock(lock); 161 Header header; 162 // Wait for the sync requests from the nodes 163 if (waitNum > 0) { 164 auto lf = [this]{ return waitNum == 0; }; 165 cv.wait(sync_lock, lf); 166 } 167 assert(waitNum == 0); |
168 if (isAbort) // sync aborted 169 return false; |
|
154 assert(!same_tick || (nextAt == curTick())); 155 waitNum = numNodes; 156 // Complete the global synchronisation 157 header.msgType = MsgType::cmdSyncAck; 158 header.sendTick = nextAt; 159 header.syncRepeat = nextRepeat; 160 if (doCkpt || numCkptReq == numNodes) { 161 doCkpt = true; --- 11 unchanged lines hidden (view full) --- 173 if (doStopSync || numStopSyncReq == numNodes) { 174 doStopSync = true; 175 numStopSyncReq = 0; 176 header.needStopSync = ReqType::immediate; 177 } else { 178 header.needStopSync = ReqType::none; 179 } 180 DistIface::master->sendCmd(header); | 170 assert(!same_tick || (nextAt == curTick())); 171 waitNum = numNodes; 172 // Complete the global synchronisation 173 header.msgType = MsgType::cmdSyncAck; 174 header.sendTick = nextAt; 175 header.syncRepeat = nextRepeat; 176 if (doCkpt || numCkptReq == numNodes) { 177 doCkpt = true; --- 11 unchanged lines hidden (view full) --- 189 if (doStopSync || numStopSyncReq == numNodes) { 190 doStopSync = true; 191 numStopSyncReq = 0; 192 header.needStopSync = ReqType::immediate; 193 } else { 194 header.needStopSync = ReqType::none; 195 } 196 DistIface::master->sendCmd(header); |
197 return true; |
|
181} 182 | 198} 199 |
183void | 200bool |
184DistIface::SyncSwitch::progress(Tick send_tick, 185 Tick sync_repeat, 186 ReqType need_ckpt, 187 ReqType need_exit, 188 ReqType need_stop_sync) 189{ 190 std::unique_lock<std::mutex> sync_lock(lock); | 201DistIface::SyncSwitch::progress(Tick send_tick, 202 Tick sync_repeat, 203 ReqType need_ckpt, 204 ReqType need_exit, 205 ReqType need_stop_sync) 206{ 207 std::unique_lock<std::mutex> sync_lock(lock); |
208 if (isAbort) // sync aborted 209 return false; |
|
191 assert(waitNum > 0); 192 193 if (send_tick > nextAt) 194 nextAt = send_tick; 195 if (nextRepeat > sync_repeat) 196 nextRepeat = sync_repeat; 197 198 if (need_ckpt == ReqType::collective) --- 10 unchanged lines hidden (view full) --- 209 doStopSync = true; 210 211 waitNum--; 212 // Notify the simulation thread if the on-going sync is complete 213 if (waitNum == 0) { 214 sync_lock.unlock(); 215 cv.notify_one(); 216 } | 210 assert(waitNum > 0); 211 212 if (send_tick > nextAt) 213 nextAt = send_tick; 214 if (nextRepeat > sync_repeat) 215 nextRepeat = sync_repeat; 216 217 if (need_ckpt == ReqType::collective) --- 10 unchanged lines hidden (view full) --- 228 doStopSync = true; 229 230 waitNum--; 231 // Notify the simulation thread if the on-going sync is complete 232 if (waitNum == 0) { 233 sync_lock.unlock(); 234 cv.notify_one(); 235 } |
236 // The receive thread must keep alive in the switch until the node 237 // closes the connection. Thus, we always return true here. 238 return true; |
|
217} 218 | 239} 240 |
219void | 241bool |
220DistIface::SyncNode::progress(Tick max_send_tick, 221 Tick next_repeat, 222 ReqType do_ckpt, 223 ReqType do_exit, 224 ReqType do_stop_sync) 225{ 226 std::unique_lock<std::mutex> sync_lock(lock); | 242DistIface::SyncNode::progress(Tick max_send_tick, 243 Tick next_repeat, 244 ReqType do_ckpt, 245 ReqType do_exit, 246 ReqType do_stop_sync) 247{ 248 std::unique_lock<std::mutex> sync_lock(lock); |
249 if (isAbort) // sync aborted 250 return false; |
|
227 assert(waitNum > 0); 228 229 nextAt = max_send_tick; 230 nextRepeat = next_repeat; 231 doCkpt = (do_ckpt != ReqType::none); 232 doExit = (do_exit != ReqType::none); 233 doStopSync = (do_stop_sync != ReqType::none); 234 235 waitNum--; 236 // Notify the simulation thread if the on-going sync is complete 237 if (waitNum == 0) { 238 sync_lock.unlock(); 239 cv.notify_one(); 240 } | 251 assert(waitNum > 0); 252 253 nextAt = max_send_tick; 254 nextRepeat = next_repeat; 255 doCkpt = (do_ckpt != ReqType::none); 256 doExit = (do_exit != ReqType::none); 257 doStopSync = (do_stop_sync != ReqType::none); 258 259 waitNum--; 260 // Notify the simulation thread if the on-going sync is complete 261 if (waitNum == 0) { 262 sync_lock.unlock(); 263 cv.notify_one(); 264 } |
265 // The receive thread must finish when simulation is about to exit 266 return !doExit; |
|
241} 242 243void 244DistIface::SyncNode::requestCkpt(ReqType req) 245{ 246 std::lock_guard<std::mutex> sync_lock(lock); 247 assert(req != ReqType::none); 248 if (needCkpt != ReqType::none) --- 60 unchanged lines hidden (view full) --- 309 // Note that this may be called either from startup() or drainResume() 310 311 // At this point, all DistIface objects has already called Sync::init() so 312 // we have a local minimum of the start tick and repeat for the periodic 313 // sync. 314 repeat = DistIface::sync->nextRepeat; 315 // Do a global barrier to agree on a common repeat value (the smallest 316 // one from all participating nodes. | 267} 268 269void 270DistIface::SyncNode::requestCkpt(ReqType req) 271{ 272 std::lock_guard<std::mutex> sync_lock(lock); 273 assert(req != ReqType::none); 274 if (needCkpt != ReqType::none) --- 60 unchanged lines hidden (view full) --- 335 // Note that this may be called either from startup() or drainResume() 336 337 // At this point, all DistIface objects has already called Sync::init() so 338 // we have a local minimum of the start tick and repeat for the periodic 339 // sync. 340 repeat = DistIface::sync->nextRepeat; 341 // Do a global barrier to agree on a common repeat value (the smallest 342 // one from all participating nodes. |
317 DistIface::sync->run(false); | 343 if (!DistIface::sync->run(false)) 344 panic("DistIface::SyncEvent::start() aborted\n"); |
318 319 assert(!DistIface::sync->doCkpt); 320 assert(!DistIface::sync->doExit); 321 assert(!DistIface::sync->doStopSync); 322 assert(DistIface::sync->nextAt >= curTick()); 323 assert(DistIface::sync->nextRepeat <= repeat); 324 325 if (curTick() == 0) --- 35 unchanged lines hidden (view full) --- 361 * dist sync to complete. 362 * Note that the other simulation threads also release their eventq 363 * locks while waiting for us due to the global event semantics. 364 */ 365 { 366 EventQueue::ScopedRelease sr(curEventQueue()); 367 // we do a global sync here that is supposed to happen at the same 368 // tick in all gem5 peers | 345 346 assert(!DistIface::sync->doCkpt); 347 assert(!DistIface::sync->doExit); 348 assert(!DistIface::sync->doStopSync); 349 assert(DistIface::sync->nextAt >= curTick()); 350 assert(DistIface::sync->nextRepeat <= repeat); 351 352 if (curTick() == 0) --- 35 unchanged lines hidden (view full) --- 388 * dist sync to complete. 389 * Note that the other simulation threads also release their eventq 390 * locks while waiting for us due to the global event semantics. 391 */ 392 { 393 EventQueue::ScopedRelease sr(curEventQueue()); 394 // we do a global sync here that is supposed to happen at the same 395 // tick in all gem5 peers |
369 DistIface::sync->run(true); | 396 if (!DistIface::sync->run(true)) 397 return; // global sync aborted |
370 // global sync completed 371 } 372 if (DistIface::sync->doCkpt) 373 exitSimLoop("checkpoint"); | 398 // global sync completed 399 } 400 if (DistIface::sync->doCkpt) 401 exitSimLoop("checkpoint"); |
374 if (DistIface::sync->doExit) | 402 if (DistIface::sync->doExit) { |
375 exitSimLoop("exit request from gem5 peers"); | 403 exitSimLoop("exit request from gem5 peers"); |
404 return; 405 } |
|
376 if (DistIface::sync->doStopSync) { 377 DistIface::sync->doStopSync = false; 378 inform("synchronization disabled at %lu\n", curTick()); 379 380 // The switch node needs to wait for the next sync immediately. 381 if (DistIface::isSwitch) { 382 start(); 383 } else { --- 216 unchanged lines hidden (view full) --- 600 } 601 distIfaceId = distIfaceNum; 602 distIfaceNum++; 603} 604 605DistIface::~DistIface() 606{ 607 assert(recvThread); | 406 if (DistIface::sync->doStopSync) { 407 DistIface::sync->doStopSync = false; 408 inform("synchronization disabled at %lu\n", curTick()); 409 410 // The switch node needs to wait for the next sync immediately. 411 if (DistIface::isSwitch) { 412 start(); 413 } else { --- 216 unchanged lines hidden (view full) --- 630 } 631 distIfaceId = distIfaceNum; 632 distIfaceNum++; 633} 634 635DistIface::~DistIface() 636{ 637 assert(recvThread); |
638 recvThread->join(); |
|
608 delete recvThread; | 639 delete recvThread; |
609 if (this == master) { | 640 if (distIfaceNum-- == 0) { |
610 assert(syncEvent); 611 delete syncEvent; 612 assert(sync); 613 delete sync; | 641 assert(syncEvent); 642 delete syncEvent; 643 assert(sync); 644 delete sync; |
614 master = nullptr; | |
615 } | 645 } |
646 if (this == master) 647 master = nullptr; |
|
616} 617 618void 619DistIface::packetOut(EthPacketPtr pkt, Tick send_delay) 620{ 621 Header header; 622 623 // Prepare a dist header packet for the Ethernet packet we want to --- 25 unchanged lines hidden (view full) --- 649 // Main loop to wait for and process any incoming message. 650 for (;;) { 651 // recvHeader() blocks until the next dist header packet comes in. 652 if (!recvHeader(header)) { 653 // We lost connection to the peer gem5 processes most likely 654 // because one of them called m5 exit. So we stop here. 655 // Grab the eventq lock to stop the simulation thread 656 curEventQueue()->lock(); | 648} 649 650void 651DistIface::packetOut(EthPacketPtr pkt, Tick send_delay) 652{ 653 Header header; 654 655 // Prepare a dist header packet for the Ethernet packet we want to --- 25 unchanged lines hidden (view full) --- 681 // Main loop to wait for and process any incoming message. 682 for (;;) { 683 // recvHeader() blocks until the next dist header packet comes in. 684 if (!recvHeader(header)) { 685 // We lost connection to the peer gem5 processes most likely 686 // because one of them called m5 exit. So we stop here. 687 // Grab the eventq lock to stop the simulation thread 688 curEventQueue()->lock(); |
657 exitSimLoop("Message server closed connection, simulator " 658 "is exiting"); | 689 exitSimLoop("connection to gem5 peer got closed"); |
659 curEventQueue()->unlock(); | 690 curEventQueue()->unlock(); |
691 // The simulation thread may be blocked in processing an on-going 692 // global synchronisation. Abort the sync to give the simulation 693 // thread a chance to make progress and process the exit event. 694 sync->abort(); 695 // Finish receiver thread |
|
660 break; 661 } 662 663 // We got a valid dist header packet, let's process it 664 if (header.msgType == MsgType::dataDescriptor) { 665 recvPacket(header, new_packet); 666 recvScheduler.pushPacket(new_packet, 667 header.sendTick, 668 header.sendDelay); 669 } else { 670 // everything else must be synchronisation related command | 696 break; 697 } 698 699 // We got a valid dist header packet, let's process it 700 if (header.msgType == MsgType::dataDescriptor) { 701 recvPacket(header, new_packet); 702 recvScheduler.pushPacket(new_packet, 703 header.sendTick, 704 header.sendDelay); 705 } else { 706 // everything else must be synchronisation related command |
671 sync->progress(header.sendTick, 672 header.syncRepeat, 673 header.needCkpt, 674 header.needExit, 675 header.needStopSync); | 707 if (!sync->progress(header.sendTick, 708 header.syncRepeat, 709 header.needCkpt, 710 header.needExit, 711 header.needStopSync)) 712 // Finish receiver thread if simulation is about to exit 713 break; |
676 } 677 } 678} 679 680void 681DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay) 682{ 683 assert(recvThread == nullptr); --- 221 unchanged lines hidden --- | 714 } 715 } 716} 717 718void 719DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay) 720{ 721 assert(recvThread == nullptr); --- 221 unchanged lines hidden --- |