/*
 * Copyright (c) 2015-2016 ARM Limited
 * All rights reserved
 *
 * The license below extends only to copyright in the software and shall
 * not be construed as granting a license to any other intellectual
 * property including but not limited to intellectual property relating
 * to a hardware implementation of the functionality of the software
 * licensed hereunder. You may use the software subject to the license
 * terms below provided that you ensure that this notice is replicated
 * unmodified and in its entirety in all distributions of the software,
 * modified or unmodified, in source code or in binary form.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met: redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer;
 * redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution;
 * neither the name of the copyright holders nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Authors: Gabor Dozsa
 */

/* @file
 * The interface class for dist-gem5 simulations.
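 *
 * This file implements the global synchronisation machinery (the Sync
 * class with its SyncNode and SyncSwitch specialisations plus the
 * periodic SyncEvent), the receive side packet scheduling
 * (RecvScheduler), and the checkpoint/exit coordination logic shared by
 * the concrete dist-gem5 transport implementations.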
 */

#include "dev/net/dist_iface.hh"

#include <queue>
#include <thread>

#include "base/random.hh"
#include "base/trace.hh"
#include "cpu/thread_context.hh"
#include "debug/DistEthernet.hh"
#include "debug/DistEthernetPkt.hh"
#include "dev/net/etherpkt.hh"
#include "sim/sim_exit.hh"
#include "sim/sim_object.hh"
#include "sim/system.hh"

using namespace std;

DistIface::Sync *DistIface::sync = nullptr;
System *DistIface::sys = nullptr;
DistIface::SyncEvent *DistIface::syncEvent = nullptr;
unsigned DistIface::distIfaceNum = 0;
unsigned DistIface::recvThreadsNum = 0;
DistIface *DistIface::master = nullptr;
bool DistIface::isSwitch = false;

void
DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
{
    if (start_tick < nextAt) {
        nextAt = start_tick;
        inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
    }

    if (repeat_tick == 0)
        panic("Dist synchronisation interval must be greater than zero");

    if (repeat_tick < nextRepeat) {
        nextRepeat = repeat_tick;
        inform("Dist synchronisation interval is changed to %lu.\n",
               nextRepeat);
    }
}

void
DistIface::Sync::abort()
{
    std::unique_lock<std::mutex> sync_lock(lock);
    waitNum = 0;
    isAbort = true;
    sync_lock.unlock();
    cv.notify_one();
}

DistIface::SyncSwitch::SyncSwitch(int num_nodes)
{
    numNodes = num_nodes;
    waitNum = num_nodes;
    numExitReq = 0;
    numCkptReq = 0;
    numStopSyncReq = 0;
    doExit = false;
    doCkpt = false;
    doStopSync = false;
    nextAt = std::numeric_limits<Tick>::max();
    nextRepeat = std::numeric_limits<Tick>::max();
    isAbort = false;
}

DistIface::SyncNode::SyncNode()
{
    waitNum = 0;
    needExit = ReqType::none;
    needCkpt = ReqType::none;
    needStopSync = ReqType::none;
    doExit = false;
    doCkpt = false;
    doStopSync = false;
    nextAt = std::numeric_limits<Tick>::max();
    nextRepeat = std::numeric_limits<Tick>::max();
    isAbort = false;
}

bool
DistIface::SyncNode::run(bool same_tick)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    Header header;

    assert(waitNum == 0);
    assert(!isAbort);
    waitNum = DistIface::recvThreadsNum;
    // initiate the global synchronisation
    header.msgType = MsgType::cmdSyncReq;
    header.sendTick = curTick();
    header.syncRepeat = nextRepeat;
    header.needCkpt = needCkpt;
    header.needStopSync = needStopSync;
    if (needCkpt != ReqType::none)
        needCkpt = ReqType::pending;
    header.needExit = needExit;
    if (needExit != ReqType::none)
        needExit = ReqType::pending;
    if (needStopSync != ReqType::none)
        needStopSync = ReqType::pending;
    DistIface::master->sendCmd(header);
    // now wait until all receiver threads complete the synchronisation
    auto lf = [this]{ return waitNum == 0; };
    cv.wait(sync_lock, lf);
    // global synchronisation is done.
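    // If this is a periodic sync (same_tick) and it was not aborted, all
    // peers must have reached it at the very same tick, so the agreed
    // next sync tick has to equal the current tick.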
    assert(isAbort || !same_tick || (nextAt == curTick()));
    return !isAbort;
}


bool
DistIface::SyncSwitch::run(bool same_tick)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    Header header;
    // Wait for the sync requests from the nodes
    if (waitNum > 0) {
        auto lf = [this]{ return waitNum == 0; };
        cv.wait(sync_lock, lf);
    }
    assert(waitNum == 0);
    if (isAbort) // sync aborted
        return false;
    assert(!same_tick || (nextAt == curTick()));
    waitNum = numNodes;
    // Complete the global synchronisation
    header.msgType = MsgType::cmdSyncAck;
    header.sendTick = nextAt;
    header.syncRepeat = nextRepeat;
    if (doCkpt || numCkptReq == numNodes) {
        doCkpt = true;
        header.needCkpt = ReqType::immediate;
        numCkptReq = 0;
    } else {
        header.needCkpt = ReqType::none;
    }
    if (doExit || numExitReq == numNodes) {
        doExit = true;
        header.needExit = ReqType::immediate;
    } else {
        header.needExit = ReqType::none;
    }
    if (doStopSync || numStopSyncReq == numNodes) {
        doStopSync = true;
        numStopSyncReq = 0;
        header.needStopSync = ReqType::immediate;
    } else {
        header.needStopSync = ReqType::none;
    }
    DistIface::master->sendCmd(header);
    return true;
}

bool
DistIface::SyncSwitch::progress(Tick send_tick,
                                Tick sync_repeat,
                                ReqType need_ckpt,
                                ReqType need_exit,
                                ReqType need_stop_sync)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    if (isAbort) // sync aborted
        return false;
    assert(waitNum > 0);

    if (send_tick > nextAt)
        nextAt = send_tick;
    if (nextRepeat > sync_repeat)
        nextRepeat = sync_repeat;

    if (need_ckpt == ReqType::collective)
        numCkptReq++;
    else if (need_ckpt == ReqType::immediate)
        doCkpt = true;
    if (need_exit == ReqType::collective)
        numExitReq++;
    else if (need_exit == ReqType::immediate)
        doExit = true;
    if (need_stop_sync == ReqType::collective)
        numStopSyncReq++;
    else if (need_stop_sync == ReqType::immediate)
        doStopSync = true;

    waitNum--;
    // Notify the simulation thread if the on-going sync is complete
    if (waitNum == 0) {
        sync_lock.unlock();
        cv.notify_one();
    }
    // The receive thread must stay alive in the switch until the node
    // closes the connection. Thus, we always return true here.
    return true;
}

bool
DistIface::SyncNode::progress(Tick max_send_tick,
                              Tick next_repeat,
                              ReqType do_ckpt,
                              ReqType do_exit,
                              ReqType do_stop_sync)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    if (isAbort) // sync aborted
        return false;
    assert(waitNum > 0);

    nextAt = max_send_tick;
    nextRepeat = next_repeat;
    doCkpt = (do_ckpt != ReqType::none);
    doExit = (do_exit != ReqType::none);
    doStopSync = (do_stop_sync != ReqType::none);

    waitNum--;
    // Notify the simulation thread if the on-going sync is complete
    if (waitNum == 0) {
        sync_lock.unlock();
        cv.notify_one();
    }
    // The receive thread must finish when simulation is about to exit
    return !doExit;
}

void
DistIface::SyncNode::requestCkpt(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    assert(req != ReqType::none);
    if (needCkpt != ReqType::none)
        warn("Ckpt requested multiple times (req:%d)\n",
             static_cast<int>(req));
    if (needCkpt == ReqType::none || req == ReqType::immediate)
        needCkpt = req;
}

void
DistIface::SyncNode::requestExit(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    assert(req != ReqType::none);
    if (needExit != ReqType::none)
        warn("Exit requested multiple times (req:%d)\n",
             static_cast<int>(req));
    if (needExit == ReqType::none || req == ReqType::immediate)
        needExit = req;
}

void
DistIface::Sync::drainComplete()
{
    if (doCkpt) {
        // The first DistIface object called this right before writing the
        // checkpoint. We need to drain the underlying physical network here.
        // Note that other gem5 peers may enter this barrier at different
        // ticks due to draining.
        run(false);
        // Only the "first" DistIface object has to perform the sync
        doCkpt = false;
    }
}

void
DistIface::SyncNode::serialize(CheckpointOut &cp) const
{
    int need_exit = static_cast<int>(needExit);
    SERIALIZE_SCALAR(need_exit);
}

void
DistIface::SyncNode::unserialize(CheckpointIn &cp)
{
    int need_exit;
    UNSERIALIZE_SCALAR(need_exit);
    needExit = static_cast<ReqType>(need_exit);
}

void
DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(numExitReq);
}

void
DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
{
    UNSERIALIZE_SCALAR(numExitReq);
}

void
DistIface::SyncEvent::start()
{
    // Note that this may be called either from startup() or drainResume()

    // At this point, all DistIface objects have already called Sync::init()
    // so we have a local minimum of the start tick and repeat for the
    // periodic sync.
    repeat = DistIface::sync->nextRepeat;
    // Do a global barrier to agree on a common repeat value (the smallest
    // one from all participating nodes).
    if (!DistIface::sync->run(false))
        panic("DistIface::SyncEvent::start() aborted\n");

    assert(!DistIface::sync->doCkpt);
    assert(!DistIface::sync->doExit);
    assert(!DistIface::sync->doStopSync);
    assert(DistIface::sync->nextAt >= curTick());
    assert(DistIface::sync->nextRepeat <= repeat);

    if (curTick() == 0)
        assert(!scheduled());

    // Use the maximum of the current tick for all participating nodes or a
    // user provided starting tick.
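    // nextAt holds that maximum: the switch collects the peers' send
    // ticks during the barrier above and acks back the largest one.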
    if (scheduled())
        reschedule(DistIface::sync->nextAt);
    else
        schedule(DistIface::sync->nextAt);

    inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
           DistIface::sync->nextRepeat);
}

void
DistIface::SyncEvent::process()
{
    // We may not start a global periodic sync while draining before taking a
    // checkpoint. This is due to the possibility that peer gem5 processes
    // may not hit the same periodic sync before they complete draining and
    // that would make this periodic sync clash with the sync called from
    // DistIface::serialize() by other gem5 processes.
    // We would need a 'distributed drain' solution to eliminate this
    // restriction.
    // Note that if draining was not triggered by checkpointing then we are
    // fine since no extra global sync will happen (i.e. all peer gem5
    // processes will hit this periodic sync eventually).
    panic_if(_draining && DistIface::sync->doCkpt,
             "Distributed sync is hit while draining");
    /*
     * Note that this is a global event so this process method will be called
     * by exactly one thread.
     */
    /*
     * We hold the eventq lock at this point but the receiver thread may
     * need the lock to schedule new recv events while waiting for the
     * dist sync to complete.
     * Note that the other simulation threads also release their eventq
     * locks while waiting for us due to the global event semantics.
     */
    {
        EventQueue::ScopedRelease sr(curEventQueue());
        // we do a global sync here that is supposed to happen at the same
        // tick in all gem5 peers
        if (!DistIface::sync->run(true))
            return; // global sync aborted
        // global sync completed
    }
    if (DistIface::sync->doCkpt)
        exitSimLoop("checkpoint");
    if (DistIface::sync->doExit) {
        exitSimLoop("exit request from gem5 peers");
        return;
    }
    if (DistIface::sync->doStopSync) {
        DistIface::sync->doStopSync = false;
        inform("synchronisation disabled at %lu\n", curTick());

        // The switch node needs to wait for the next sync immediately.
        if (DistIface::isSwitch) {
            start();
        } else {
            // Wake up thread contexts on non-switch nodes.
            for (int i = 0; i < DistIface::master->sys->numContexts(); i++) {
                ThreadContext *tc =
                    DistIface::master->sys->getThreadContext(i);
                if (tc->status() == ThreadContext::Suspended)
                    tc->activate();
                else
                    warn_once("Tried to wake up thread in dist-gem5, but it "
                              "was already awake!\n");
            }
        }
        return;
    }
    // schedule the next periodic sync
    repeat = DistIface::sync->nextRepeat;
    schedule(curTick() + repeat);
}

void
DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
{
    // This is called from the receiver thread when it starts running. The
    // new receiver thread shares the event queue with the simulation thread
    // (associated with the simulated Ethernet link).
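    // curEventQueue() is a thread-local pointer in gem5, so this call
    // binds the receiver thread to the event queue of the simulated link.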
    curEventQueue(eventManager->eventQueue());

    recvDone = recv_done;
    linkDelay = link_delay;
}

Tick
DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
                                          Tick send_delay,
                                          Tick prev_recv_tick)
{
    Tick recv_tick = send_tick + send_delay + linkDelay;
    // sanity check (we need at least a send delay long window)
    assert(recv_tick >= prev_recv_tick + send_delay);
    panic_if(prev_recv_tick + send_delay > recv_tick,
             "Receive window is smaller than send delay");
    panic_if(recv_tick <= curTick(),
             "Simulators out of sync - missed packet receive by %llu ticks "
             "(prev_recv_tick: %lu send_tick: %lu send_delay: %lu "
             "linkDelay: %lu)",
             curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
             linkDelay);

    return recv_tick;
}

void
DistIface::RecvScheduler::resumeRecvTicks()
{
    // Schedule pending packets asap in case link speed/delay changed when
    // restoring from the checkpoint.
    // This may be done during unserialize except that curTick() is unknown
    // so we call this during drainResume().
    // If we are not restoring from a checkpoint then the link latency could
    // not have changed so we just return.
    if (!ckptRestore)
        return;

    std::vector<Desc> v;
    while (!descQueue.empty()) {
        Desc d = descQueue.front();
        descQueue.pop();
        d.sendTick = curTick();
        d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
        v.push_back(d);
    }

    for (auto &d : v)
        descQueue.push(d);

    if (recvDone->scheduled()) {
        assert(!descQueue.empty());
        eventManager->reschedule(recvDone, curTick());
    } else {
        assert(descQueue.empty() && v.empty());
    }
    ckptRestore = false;
}

void
DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
                                     Tick send_tick,
                                     Tick send_delay)
{
    // Note: this is called from the receiver thread
    curEventQueue()->lock();
    Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);

    DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
            "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
            send_tick, send_delay, linkDelay, recv_tick);
    // Every packet must be sent and arrive in the same quantum
    assert(send_tick > master->syncEvent->when() -
           master->syncEvent->repeat);
    // No packet may be scheduled for receive in the arrival quantum
    assert(send_tick + send_delay + linkDelay > master->syncEvent->when());

    // Now we are about to schedule a recvDone event for the new data packet.
    // We use the same recvDone object for all incoming data packets. Packet
    // descriptors are saved in the ordered queue. The currently scheduled
    // packet is always at the front of the queue.
    // NOTE: we use the event queue lock to protect the receive desc queue,
    // too, which is accessed both by the receiver thread and the simulation
    // thread.
    descQueue.emplace(new_packet, send_tick, send_delay);
    if (descQueue.size() == 1) {
        assert(!recvDone->scheduled());
        eventManager->schedule(recvDone, recv_tick);
    } else {
        assert(recvDone->scheduled());
        panic_if(descQueue.front().sendTick +
                 descQueue.front().sendDelay > recv_tick,
                 "Out of order packet received (recv_tick: %lu "
                 "front(): %lu)\n",
                 recv_tick,
                 descQueue.front().sendTick + descQueue.front().sendDelay);
    }
    curEventQueue()->unlock();
}

EthPacketPtr
DistIface::RecvScheduler::popPacket()
{
    // Note: this is called from the simulation thread when a receive done
    // event is being processed for the link. We assume that the thread
    // holds the event queue lock when this is called!
    EthPacketPtr next_packet = descQueue.front().packet;
    descQueue.pop();

    if (descQueue.size() > 0) {
        Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
                                         descQueue.front().sendDelay,
                                         curTick());
        eventManager->schedule(recvDone, recv_tick);
    }
    prevRecvTick = curTick();
    return next_packet;
}

void
DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(sendTick);
    SERIALIZE_SCALAR(sendDelay);
    packet->serialize("rxPacket", cp);
}

void
DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
{
    UNSERIALIZE_SCALAR(sendTick);
    UNSERIALIZE_SCALAR(sendDelay);
    packet = std::make_shared<EthPacketData>();
    packet->unserialize("rxPacket", cp);
}

void
DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(prevRecvTick);
    // serialize the receive desc queue
    std::queue<Desc> tmp_queue(descQueue);
    unsigned n_desc_queue = descQueue.size();
    assert(tmp_queue.size() == descQueue.size());
    SERIALIZE_SCALAR(n_desc_queue);
    for (int i = 0; i < n_desc_queue; i++) {
        tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
        tmp_queue.pop();
    }
    assert(tmp_queue.empty());
}

void
DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
{
    assert(descQueue.size() == 0);
    assert(!recvDone->scheduled());
    assert(!ckptRestore);

    UNSERIALIZE_SCALAR(prevRecvTick);
    // unserialize the receive desc queue
    unsigned n_desc_queue;
    UNSERIALIZE_SCALAR(n_desc_queue);
    for (int i = 0; i < n_desc_queue; i++) {
        Desc recv_desc;
        recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
        descQueue.push(recv_desc);
    }
    ckptRestore = true;
}

DistIface::DistIface(unsigned dist_rank,
                     unsigned dist_size,
                     Tick sync_start,
                     Tick sync_repeat,
                     EventManager *em,
                     bool use_pseudo_op,
                     bool is_switch, int num_nodes) :
    syncStart(sync_start), syncRepeat(sync_repeat),
    recvThread(nullptr), recvScheduler(em),
    syncStartOnPseudoOp(use_pseudo_op),
    rank(dist_rank), size(dist_size)
{
    DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n", dist_rank);
    isMaster = false;
    if (master == nullptr) {
        assert(sync == nullptr);
        assert(syncEvent == nullptr);
        isSwitch = is_switch;
        if (is_switch)
            sync = new SyncSwitch(num_nodes);
        else
            sync = new SyncNode();
        syncEvent = new SyncEvent();
        master = this;
        isMaster = true;
    }
    distIfaceId = distIfaceNum;
    distIfaceNum++;
}

DistIface::~DistIface()
{
    assert(recvThread);
    recvThread->join();
    delete recvThread;
    // Clean up the shared singletons when the last DistIface goes away.
    if (--distIfaceNum == 0) {
        assert(syncEvent);
        delete syncEvent;
        assert(sync);
        delete sync;
    }
    if (this == master)
        master = nullptr;
}

void
DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
{
    Header header;

    // Prepare a dist header packet for the Ethernet packet we want to
    // send out.
    header.msgType = MsgType::dataDescriptor;
    header.sendTick = curTick();
    header.sendDelay = send_delay;

    header.dataPacketLength = pkt->length;
    header.simLength = pkt->simLength;

    // Send out the packet and the meta info.
    sendPacket(header, pkt);

    DPRINTF(DistEthernetPkt,
            "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
            pkt->length, send_delay);
}

void
DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
{
    EthPacketPtr new_packet;
    DistHeaderPkt::Header header;

    // Initialize receive scheduler parameters
    recvScheduler.init(recv_done, link_delay);

    // Main loop to wait for and process any incoming message.
    for (;;) {
        // recvHeader() blocks until the next dist header packet comes in.
        if (!recvHeader(header)) {
            // We lost connection to the peer gem5 processes most likely
            // because one of them called m5 exit. So we stop here.
            // Grab the eventq lock to stop the simulation thread
            curEventQueue()->lock();
            exitSimLoop("connection to gem5 peer got closed");
            curEventQueue()->unlock();
            // The simulation thread may be blocked in processing an
            // on-going global synchronisation. Abort the sync to give the
            // simulation thread a chance to make progress and process the
            // exit event.
            sync->abort();
            // Finish receiver thread
            break;
        }

        // We got a valid dist header packet, let's process it
        if (header.msgType == MsgType::dataDescriptor) {
            recvPacket(header, new_packet);
            recvScheduler.pushPacket(new_packet,
                                     header.sendTick,
                                     header.sendDelay);
        } else {
            // everything else must be a synchronisation related command
            if (!sync->progress(header.sendTick,
                                header.syncRepeat,
                                header.needCkpt,
                                header.needExit,
                                header.needStopSync))
                // Finish receiver thread if simulation is about to exit
                break;
        }
    }
}

void
DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
{
    assert(recvThread == nullptr);

    recvThread = new std::thread(&DistIface::recvThreadFunc,
                                 this,
                                 const_cast<Event *>(recv_done),
                                 link_delay);
    recvThreadsNum++;
}

DrainState
DistIface::drain()
{
    DPRINTF(DistEthernet, "DistIface::drain() called\n");
    // This can be called multiple times in the same drain cycle.
    if (this == master)
        syncEvent->draining(true);
    return DrainState::Drained;
}

void
DistIface::drainResume()
{
    DPRINTF(DistEthernet, "DistIface::drainResume() called\n");
    if (this == master)
        syncEvent->draining(false);
    recvScheduler.resumeRecvTicks();
}

void
DistIface::serialize(CheckpointOut &cp) const
{
    // Drain the dist interface before the checkpoint is taken. We cannot
    // call this as part of the normal drain cycle because this dist sync
    // has to be called exactly once after the system is fully drained.
    sync->drainComplete();

    unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;

    SERIALIZE_SCALAR(rank_orig);
    SERIALIZE_SCALAR(dist_iface_id_orig);

    recvScheduler.serializeSection(cp, "recvScheduler");
    if (this == master) {
        sync->serializeSection(cp, "Sync");
    }
}

void
DistIface::unserialize(CheckpointIn &cp)
{
    unsigned rank_orig, dist_iface_id_orig;
    UNSERIALIZE_SCALAR(rank_orig);
    UNSERIALIZE_SCALAR(dist_iface_id_orig);

    panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
             rank, rank_orig);
    panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
             "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
             dist_iface_id_orig);

    recvScheduler.unserializeSection(cp, "recvScheduler");
    if (this == master) {
        sync->unserializeSection(cp, "Sync");
    }
}

void
DistIface::init(const Event *done_event, Tick link_delay)
{
    // Init hook for the underlying message transport to setup/finalize
    // communication channels
    initTransport();

    // Spawn a new receiver thread that will process messages
    // coming in from peer gem5 processes.
    // The receive thread will also schedule a (receive) doneEvent
    // for each incoming data packet.
    spawnRecvThread(done_event, link_delay);

    // Adjust the periodic sync start and interval. Different DistIface
    // objects may have different requirements. The singleton sync object
    // will select the minimum values for both params.
    assert(sync != nullptr);
    sync->init(syncStart, syncRepeat);

    // Initialize the seed for the random generator to avoid the same
    // sequence in all gem5 peer processes
    assert(master != nullptr);
    if (this == master)
        random_mt.init(5489 * (rank + 1) + 257);
}

void
DistIface::startup()
{
    DPRINTF(DistEthernet, "DistIface::startup() started\n");
    // Schedule synchronisation at startup unless it is deferred until a
    // pseudo-op requests it; the switch node always starts synchronised.
    if (this == master && (!syncStartOnPseudoOp || isSwitch))
        syncEvent->start();
    DPRINTF(DistEthernet, "DistIface::startup() done\n");
}

bool
DistIface::readyToCkpt(Tick delay, Tick period)
{
    bool ret = true;
    DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
            "period:%lu\n", delay, period);
    if (master) {
        if (delay == 0) {
            inform("m5 checkpoint called with zero delay => triggering "
                   "collaborative checkpoint\n");
            sync->requestCkpt(ReqType::collective);
        } else {
            inform("m5 checkpoint called with non-zero delay => triggering "
                   "immediate checkpoint (at the next sync)\n");
            sync->requestCkpt(ReqType::immediate);
        }
        if (period != 0)
            inform("Non-zero period for m5_ckpt is ignored in "
                   "distributed gem5 runs\n");
        ret = false;
    }
    return ret;
}

void
DistIface::SyncNode::requestStopSync(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    needStopSync = req;
}

void
DistIface::toggleSync(ThreadContext *tc)
{
    // Unfortunate that we have to populate the system pointer member this
    // way.
    master->sys = tc->getSystemPtr();

    // The invariant for both syncing and "unsyncing" is that all threads
    // will stop executing instructions until the desired sync state has
    // been reached for all nodes. This is the easiest way to prevent
    // deadlock (in the case of "unsyncing") and causality errors (in the
    // case of syncing).
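    // A scheduled periodic sync event means synchronisation is currently
    // on, so this call toggles it off (and vice versa).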
    if (master->syncEvent->scheduled()) {
        inform("Request toggling synchronisation off\n");
        master->sync->requestStopSync(ReqType::collective);

        // At this point, we have no clue when everyone will reach the sync
        // stop point. Suspend execution of all local thread contexts.
        // Dist-gem5 will reactivate all thread contexts when everyone has
        // reached the sync stop point.
#if THE_ISA != NULL_ISA
        for (int i = 0; i < master->sys->numContexts(); i++) {
            ThreadContext *tc = master->sys->getThreadContext(i);
            if (tc->status() == ThreadContext::Active)
                tc->quiesce();
        }
#endif
    } else {
        inform("Request toggling synchronisation on\n");
        master->syncEvent->start();

        // We need to suspend all CPUs until the sync point is reached by
        // all nodes to prevent causality errors. We can also schedule CPU
        // activation here, since we know exactly when the next sync will
        // occur.
#if THE_ISA != NULL_ISA
        for (int i = 0; i < master->sys->numContexts(); i++) {
            ThreadContext *tc = master->sys->getThreadContext(i);
            if (tc->status() == ThreadContext::Active)
                tc->quiesceTick(master->syncEvent->when() + 1);
        }
#endif
    }
}

bool
DistIface::readyToExit(Tick delay)
{
    bool ret = true;
    DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
            delay);
    if (master) {
        // To successfully coordinate an exit, all nodes must be
        // synchronising
        if (!master->syncEvent->scheduled())
            master->syncEvent->start();

        if (delay == 0) {
            inform("m5 exit called with zero delay => triggering "
                   "collaborative exit\n");
            sync->requestExit(ReqType::collective);
        } else {
            inform("m5 exit called with non-zero delay => triggering "
                   "immediate exit (at the next sync)\n");
            sync->requestExit(ReqType::immediate);
        }
        ret = false;
    }
    return ret;
}

uint64_t
DistIface::rankParam()
{
    uint64_t val;
    if (master) {
        val = master->rank;
    } else {
        warn("Dist-rank parameter is queried in single gem5 simulation.");
        val = 0;
    }
    return val;
}

uint64_t
DistIface::sizeParam()
{
    uint64_t val;
    if (master) {
        val = master->size;
    } else {
        warn("Dist-size parameter is queried in single gem5 simulation.");
        val = 1;
    }
    return val;
}