// dist_iface.cc revision 11703
1/* 2 * Copyright (c) 2015 ARM Limited 3 * All rights reserved 4 * 5 * The license below extends only to copyright in the software and shall 6 * not be construed as granting a license to any other intellectual 7 * property including but not limited to intellectual property relating 8 * to a hardware implementation of the functionality of the software 9 * licensed hereunder. You may use the software subject to the license 10 * terms below provided that you ensure that this notice is replicated 11 * unmodified and in its entirety in all distributions of the software, 12 * modified or unmodified, in source code or in binary form. 13 * 14 * Redistribution and use in source and binary forms, with or without 15 * modification, are permitted provided that the following conditions are 16 * met: redistributions of source code must retain the above copyright 17 * notice, this list of conditions and the following disclaimer; 18 * redistributions in binary form must reproduce the above copyright 19 * notice, this list of conditions and the following disclaimer in the 20 * documentation and/or other materials provided with the distribution; 21 * neither the name of the copyright holders nor the names of its 22 * contributors may be used to endorse or promote products derived from 23 * this software without specific prior written permission. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 28 * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT 29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 36 * 37 * Authors: Gabor Dozsa 38 */ 39 40/* @file 41 * The interface class for dist-gem5 simulations. 42 */ 43 44#include "dev/net/dist_iface.hh" 45 46#include <queue> 47#include <thread> 48 49#include "base/random.hh" 50#include "base/trace.hh" 51#include "cpu/thread_context.hh" 52#include "debug/DistEthernet.hh" 53#include "debug/DistEthernetPkt.hh" 54#include "dev/net/etherpkt.hh" 55#include "sim/sim_exit.hh" 56#include "sim/sim_object.hh" 57#include "sim/system.hh" 58 59using namespace std; 60DistIface::Sync *DistIface::sync = nullptr; 61System *DistIface::sys = nullptr; 62DistIface::SyncEvent *DistIface::syncEvent = nullptr; 63unsigned DistIface::distIfaceNum = 0; 64unsigned DistIface::recvThreadsNum = 0; 65DistIface *DistIface::master = nullptr; 66bool DistIface::isSwitch = false; 67 68void 69DistIface::Sync::init(Tick start_tick, Tick repeat_tick) 70{ 71 if (start_tick < nextAt) { 72 nextAt = start_tick; 73 inform("Next dist synchronisation tick is changed to %lu.\n", nextAt); 74 } 75 76 if (repeat_tick == 0) 77 panic("Dist synchronisation interval must be greater than zero"); 78 79 if (repeat_tick < nextRepeat) { 80 nextRepeat = repeat_tick; 81 inform("Dist synchronisation interval is changed to %lu.\n", 82 nextRepeat); 83 } 84} 85 86DistIface::SyncSwitch::SyncSwitch(int num_nodes) 87{ 88 numNodes = num_nodes; 89 waitNum = num_nodes; 90 numExitReq = 0; 91 numCkptReq = 0; 92 numStopSyncReq = 0; 93 doExit = false; 94 
doCkpt = false; 95 doStopSync = false; 96 nextAt = std::numeric_limits<Tick>::max(); 97 nextRepeat = std::numeric_limits<Tick>::max(); 98} 99 100DistIface::SyncNode::SyncNode() 101{ 102 waitNum = 0; 103 needExit = ReqType::none; 104 needCkpt = ReqType::none; 105 needStopSync = ReqType::none; 106 doExit = false; 107 doCkpt = false; 108 doStopSync = false; 109 nextAt = std::numeric_limits<Tick>::max(); 110 nextRepeat = std::numeric_limits<Tick>::max(); 111} 112 113void 114DistIface::SyncNode::run(bool same_tick) 115{ 116 std::unique_lock<std::mutex> sync_lock(lock); 117 Header header; 118 119 assert(waitNum == 0); 120 waitNum = DistIface::recvThreadsNum; 121 // initiate the global synchronisation 122 header.msgType = MsgType::cmdSyncReq; 123 header.sendTick = curTick(); 124 header.syncRepeat = nextRepeat; 125 header.needCkpt = needCkpt; 126 header.needStopSync = needStopSync; 127 if (needCkpt != ReqType::none) 128 needCkpt = ReqType::pending; 129 header.needExit = needExit; 130 if (needExit != ReqType::none) 131 needExit = ReqType::pending; 132 if (needStopSync != ReqType::none) 133 needStopSync = ReqType::pending; 134 DistIface::master->sendCmd(header); 135 // now wait until all receiver threads complete the synchronisation 136 auto lf = [this]{ return waitNum == 0; }; 137 cv.wait(sync_lock, lf); 138 // global synchronisation is done 139 assert(!same_tick || (nextAt == curTick())); 140} 141 142 143void 144DistIface::SyncSwitch::run(bool same_tick) 145{ 146 std::unique_lock<std::mutex> sync_lock(lock); 147 Header header; 148 // Wait for the sync requests from the nodes 149 if (waitNum > 0) { 150 auto lf = [this]{ return waitNum == 0; }; 151 cv.wait(sync_lock, lf); 152 } 153 assert(waitNum == 0); 154 assert(!same_tick || (nextAt == curTick())); 155 waitNum = numNodes; 156 // Complete the global synchronisation 157 header.msgType = MsgType::cmdSyncAck; 158 header.sendTick = nextAt; 159 header.syncRepeat = nextRepeat; 160 if (doCkpt || numCkptReq == numNodes) { 161 
doCkpt = true; 162 header.needCkpt = ReqType::immediate; 163 numCkptReq = 0; 164 } else { 165 header.needCkpt = ReqType::none; 166 } 167 if (doExit || numExitReq == numNodes) { 168 doExit = true; 169 header.needExit = ReqType::immediate; 170 } else { 171 header.needExit = ReqType::none; 172 } 173 if (doStopSync || numStopSyncReq == numNodes) { 174 doStopSync = true; 175 numStopSyncReq = 0; 176 header.needStopSync = ReqType::immediate; 177 } else { 178 header.needStopSync = ReqType::none; 179 } 180 DistIface::master->sendCmd(header); 181} 182 183void 184DistIface::SyncSwitch::progress(Tick send_tick, 185 Tick sync_repeat, 186 ReqType need_ckpt, 187 ReqType need_exit, 188 ReqType need_stop_sync) 189{ 190 std::unique_lock<std::mutex> sync_lock(lock); 191 assert(waitNum > 0); 192 193 if (send_tick > nextAt) 194 nextAt = send_tick; 195 if (nextRepeat > sync_repeat) 196 nextRepeat = sync_repeat; 197 198 if (need_ckpt == ReqType::collective) 199 numCkptReq++; 200 else if (need_ckpt == ReqType::immediate) 201 doCkpt = true; 202 if (need_exit == ReqType::collective) 203 numExitReq++; 204 else if (need_exit == ReqType::immediate) 205 doExit = true; 206 if (need_stop_sync == ReqType::collective) 207 numStopSyncReq++; 208 else if (need_stop_sync == ReqType::immediate) 209 doStopSync = true; 210 211 waitNum--; 212 // Notify the simulation thread if the on-going sync is complete 213 if (waitNum == 0) { 214 sync_lock.unlock(); 215 cv.notify_one(); 216 } 217} 218 219void 220DistIface::SyncNode::progress(Tick max_send_tick, 221 Tick next_repeat, 222 ReqType do_ckpt, 223 ReqType do_exit, 224 ReqType do_stop_sync) 225{ 226 std::unique_lock<std::mutex> sync_lock(lock); 227 assert(waitNum > 0); 228 229 nextAt = max_send_tick; 230 nextRepeat = next_repeat; 231 doCkpt = (do_ckpt != ReqType::none); 232 doExit = (do_exit != ReqType::none); 233 doStopSync = (do_stop_sync != ReqType::none); 234 235 waitNum--; 236 // Notify the simulation thread if the on-going sync is complete 237 if 
(waitNum == 0) { 238 sync_lock.unlock(); 239 cv.notify_one(); 240 } 241} 242 243void 244DistIface::SyncNode::requestCkpt(ReqType req) 245{ 246 std::lock_guard<std::mutex> sync_lock(lock); 247 assert(req != ReqType::none); 248 if (needCkpt != ReqType::none) 249 warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req)); 250 if (needCkpt == ReqType::none || req == ReqType::immediate) 251 needCkpt = req; 252} 253 254void 255DistIface::SyncNode::requestExit(ReqType req) 256{ 257 std::lock_guard<std::mutex> sync_lock(lock); 258 assert(req != ReqType::none); 259 if (needExit != ReqType::none) 260 warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req)); 261 if (needExit == ReqType::none || req == ReqType::immediate) 262 needExit = req; 263} 264 265void 266DistIface::Sync::drainComplete() 267{ 268 if (doCkpt) { 269 // The first DistIface object called this right before writing the 270 // checkpoint. We need to drain the underlying physical network here. 271 // Note that other gem5 peers may enter this barrier at different 272 // ticks due to draining. 
273 run(false); 274 // Only the "first" DistIface object has to perform the sync 275 doCkpt = false; 276 } 277} 278 279void 280DistIface::SyncNode::serialize(CheckpointOut &cp) const 281{ 282 int need_exit = static_cast<int>(needExit); 283 SERIALIZE_SCALAR(need_exit); 284} 285 286void 287DistIface::SyncNode::unserialize(CheckpointIn &cp) 288{ 289 int need_exit; 290 UNSERIALIZE_SCALAR(need_exit); 291 needExit = static_cast<ReqType>(need_exit); 292} 293 294void 295DistIface::SyncSwitch::serialize(CheckpointOut &cp) const 296{ 297 SERIALIZE_SCALAR(numExitReq); 298} 299 300void 301DistIface::SyncSwitch::unserialize(CheckpointIn &cp) 302{ 303 UNSERIALIZE_SCALAR(numExitReq); 304} 305 306void 307DistIface::SyncEvent::start() 308{ 309 // Note that this may be called either from startup() or drainResume() 310 311 // At this point, all DistIface objects has already called Sync::init() so 312 // we have a local minimum of the start tick and repeat for the periodic 313 // sync. 314 repeat = DistIface::sync->nextRepeat; 315 // Do a global barrier to agree on a common repeat value (the smallest 316 // one from all participating nodes. 317 DistIface::sync->run(false); 318 319 assert(!DistIface::sync->doCkpt); 320 assert(!DistIface::sync->doExit); 321 assert(!DistIface::sync->doStopSync); 322 assert(DistIface::sync->nextAt >= curTick()); 323 assert(DistIface::sync->nextRepeat <= repeat); 324 325 if (curTick() == 0) 326 assert(!scheduled()); 327 328 // Use the maximum of the current tick for all participating nodes or a 329 // user provided starting tick. 330 if (scheduled()) 331 reschedule(DistIface::sync->nextAt); 332 else 333 schedule(DistIface::sync->nextAt); 334 335 inform("Dist sync scheduled at %lu and repeats %lu\n", when(), 336 DistIface::sync->nextRepeat); 337} 338 339void 340DistIface::SyncEvent::process() 341{ 342 // We may not start a global periodic sync while draining before taking a 343 // checkpoint. 
This is due to the possibility that peer gem5 processes 344 // may not hit the same periodic sync before they complete draining and 345 // that would make this periodic sync clash with sync called from 346 // DistIface::serialize() by other gem5 processes. 347 // We would need a 'distributed drain' solution to eliminate this 348 // restriction. 349 // Note that if draining was not triggered by checkpointing then we are 350 // fine since no extra global sync will happen (i.e. all peer gem5 will 351 // hit this periodic sync eventually). 352 panic_if(_draining && DistIface::sync->doCkpt, 353 "Distributed sync is hit while draining"); 354 /* 355 * Note that this is a global event so this process method will be called 356 * by only exactly one thread. 357 */ 358 /* 359 * We hold the eventq lock at this point but the receiver thread may 360 * need the lock to schedule new recv events while waiting for the 361 * dist sync to complete. 362 * Note that the other simulation threads also release their eventq 363 * locks while waiting for us due to the global event semantics. 364 */ 365 { 366 EventQueue::ScopedRelease sr(curEventQueue()); 367 // we do a global sync here that is supposed to happen at the same 368 // tick in all gem5 peers 369 DistIface::sync->run(true); 370 // global sync completed 371 } 372 if (DistIface::sync->doCkpt) 373 exitSimLoop("checkpoint"); 374 if (DistIface::sync->doExit) 375 exitSimLoop("exit request from gem5 peers"); 376 if (DistIface::sync->doStopSync) { 377 DistIface::sync->doStopSync = false; 378 inform("synchronization disabled at %lu\n", curTick()); 379 380 // The switch node needs to wait for the next sync immediately. 381 if (DistIface::isSwitch) { 382 start(); 383 } else { 384 // Wake up thread contexts on non-switch nodes. 
385 for (int i = 0; i < DistIface::master->sys->numContexts(); i++) { 386 ThreadContext *tc = 387 DistIface::master->sys->getThreadContext(i); 388 if (tc->status() == ThreadContext::Suspended) 389 tc->activate(); 390 else 391 warn_once("Tried to wake up thread in dist-gem5, but it " 392 "was already awake!\n"); 393 } 394 } 395 return; 396 } 397 // schedule the next periodic sync 398 repeat = DistIface::sync->nextRepeat; 399 schedule(curTick() + repeat); 400} 401 402void 403DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay) 404{ 405 // This is called from the receiver thread when it starts running. The new 406 // receiver thread shares the event queue with the simulation thread 407 // (associated with the simulated Ethernet link). 408 curEventQueue(eventManager->eventQueue()); 409 410 recvDone = recv_done; 411 linkDelay = link_delay; 412} 413 414Tick 415DistIface::RecvScheduler::calcReceiveTick(Tick send_tick, 416 Tick send_delay, 417 Tick prev_recv_tick) 418{ 419 Tick recv_tick = send_tick + send_delay + linkDelay; 420 // sanity check (we need atleast a send delay long window) 421 assert(recv_tick >= prev_recv_tick + send_delay); 422 panic_if(prev_recv_tick + send_delay > recv_tick, 423 "Receive window is smaller than send delay"); 424 panic_if(recv_tick <= curTick(), 425 "Simulators out of sync - missed packet receive by %llu ticks" 426 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu " 427 "linkDelay: %lu )", 428 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay, 429 linkDelay); 430 431 return recv_tick; 432} 433 434void 435DistIface::RecvScheduler::resumeRecvTicks() 436{ 437 // Schedule pending packets asap in case link speed/delay changed when 438 // restoring from the checkpoint. 439 // This may be done during unserialize except that curTick() is unknown 440 // so we call this during drainResume(). 441 // If we are not restoring from a checkppint then link latency could not 442 // change so we just return. 
443 if (!ckptRestore) 444 return; 445 446 std::vector<Desc> v; 447 while (!descQueue.empty()) { 448 Desc d = descQueue.front(); 449 descQueue.pop(); 450 d.sendTick = curTick(); 451 d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed 452 v.push_back(d); 453 } 454 455 for (auto &d : v) 456 descQueue.push(d); 457 458 if (recvDone->scheduled()) { 459 assert(!descQueue.empty()); 460 eventManager->reschedule(recvDone, curTick()); 461 } else { 462 assert(descQueue.empty() && v.empty()); 463 } 464 ckptRestore = false; 465} 466 467void 468DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet, 469 Tick send_tick, 470 Tick send_delay) 471{ 472 // Note : this is called from the receiver thread 473 curEventQueue()->lock(); 474 Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick); 475 476 DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket " 477 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n", 478 send_tick, send_delay, linkDelay, recv_tick); 479 // Every packet must be sent and arrive in the same quantum 480 assert(send_tick > master->syncEvent->when() - 481 master->syncEvent->repeat); 482 // No packet may be scheduled for receive in the arrival quantum 483 assert(send_tick + send_delay + linkDelay > master->syncEvent->when()); 484 485 // Now we are about to schedule a recvDone event for the new data packet. 486 // We use the same recvDone object for all incoming data packets. Packet 487 // descriptors are saved in the ordered queue. The currently scheduled 488 // packet is always on the top of the queue. 489 // NOTE: we use the event queue lock to protect the receive desc queue, 490 // too, which is accessed both by the receiver thread and the simulation 491 // thread. 
492 descQueue.emplace(new_packet, send_tick, send_delay); 493 if (descQueue.size() == 1) { 494 assert(!recvDone->scheduled()); 495 eventManager->schedule(recvDone, recv_tick); 496 } else { 497 assert(recvDone->scheduled()); 498 panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick, 499 "Out of order packet received (recv_tick: %lu top(): %lu\n", 500 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay); 501 } 502 curEventQueue()->unlock(); 503} 504 505EthPacketPtr 506DistIface::RecvScheduler::popPacket() 507{ 508 // Note : this is called from the simulation thread when a receive done 509 // event is being processed for the link. We assume that the thread holds 510 // the event queue queue lock when this is called! 511 EthPacketPtr next_packet = descQueue.front().packet; 512 descQueue.pop(); 513 514 if (descQueue.size() > 0) { 515 Tick recv_tick = calcReceiveTick(descQueue.front().sendTick, 516 descQueue.front().sendDelay, 517 curTick()); 518 eventManager->schedule(recvDone, recv_tick); 519 } 520 prevRecvTick = curTick(); 521 return next_packet; 522} 523 524void 525DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const 526{ 527 SERIALIZE_SCALAR(sendTick); 528 SERIALIZE_SCALAR(sendDelay); 529 packet->serialize("rxPacket", cp); 530} 531 532void 533DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp) 534{ 535 UNSERIALIZE_SCALAR(sendTick); 536 UNSERIALIZE_SCALAR(sendDelay); 537 packet = std::make_shared<EthPacketData>(); 538 packet->unserialize("rxPacket", cp); 539} 540 541void 542DistIface::RecvScheduler::serialize(CheckpointOut &cp) const 543{ 544 SERIALIZE_SCALAR(prevRecvTick); 545 // serialize the receive desc queue 546 std::queue<Desc> tmp_queue(descQueue); 547 unsigned n_desc_queue = descQueue.size(); 548 assert(tmp_queue.size() == descQueue.size()); 549 SERIALIZE_SCALAR(n_desc_queue); 550 for (int i = 0; i < n_desc_queue; i++) { 551 tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i)); 552 
tmp_queue.pop(); 553 } 554 assert(tmp_queue.empty()); 555} 556 557void 558DistIface::RecvScheduler::unserialize(CheckpointIn &cp) 559{ 560 assert(descQueue.size() == 0); 561 assert(!recvDone->scheduled()); 562 assert(!ckptRestore); 563 564 UNSERIALIZE_SCALAR(prevRecvTick); 565 // unserialize the receive desc queue 566 unsigned n_desc_queue; 567 UNSERIALIZE_SCALAR(n_desc_queue); 568 for (int i = 0; i < n_desc_queue; i++) { 569 Desc recv_desc; 570 recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i)); 571 descQueue.push(recv_desc); 572 } 573 ckptRestore = true; 574} 575 576DistIface::DistIface(unsigned dist_rank, 577 unsigned dist_size, 578 Tick sync_start, 579 Tick sync_repeat, 580 EventManager *em, 581 bool use_pseudo_op, 582 bool is_switch, int num_nodes) : 583 syncStart(sync_start), syncRepeat(sync_repeat), 584 recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op), 585 rank(dist_rank), size(dist_size) 586{ 587 DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank); 588 isMaster = false; 589 if (master == nullptr) { 590 assert(sync == nullptr); 591 assert(syncEvent == nullptr); 592 isSwitch = is_switch; 593 if (is_switch) 594 sync = new SyncSwitch(num_nodes); 595 else 596 sync = new SyncNode(); 597 syncEvent = new SyncEvent(); 598 master = this; 599 isMaster = true; 600 } 601 distIfaceId = distIfaceNum; 602 distIfaceNum++; 603} 604 605DistIface::~DistIface() 606{ 607 assert(recvThread); 608 delete recvThread; 609 if (this == master) { 610 assert(syncEvent); 611 delete syncEvent; 612 assert(sync); 613 delete sync; 614 master = nullptr; 615 } 616} 617 618void 619DistIface::packetOut(EthPacketPtr pkt, Tick send_delay) 620{ 621 Header header; 622 623 // Prepare a dist header packet for the Ethernet packet we want to 624 // send out. 
625 header.msgType = MsgType::dataDescriptor; 626 header.sendTick = curTick(); 627 header.sendDelay = send_delay; 628 629 header.dataPacketLength = pkt->length; 630 header.simLength = pkt->simLength; 631 632 // Send out the packet and the meta info. 633 sendPacket(header, pkt); 634 635 DPRINTF(DistEthernetPkt, 636 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n", 637 pkt->length, send_delay); 638} 639 640void 641DistIface::recvThreadFunc(Event *recv_done, Tick link_delay) 642{ 643 EthPacketPtr new_packet; 644 DistHeaderPkt::Header header; 645 646 // Initialize receive scheduler parameters 647 recvScheduler.init(recv_done, link_delay); 648 649 // Main loop to wait for and process any incoming message. 650 for (;;) { 651 // recvHeader() blocks until the next dist header packet comes in. 652 if (!recvHeader(header)) { 653 // We lost connection to the peer gem5 processes most likely 654 // because one of them called m5 exit. So we stop here. 655 // Grab the eventq lock to stop the simulation thread 656 curEventQueue()->lock(); 657 exitSimLoop("Message server closed connection, simulator " 658 "is exiting"); 659 curEventQueue()->unlock(); 660 break; 661 } 662 663 // We got a valid dist header packet, let's process it 664 if (header.msgType == MsgType::dataDescriptor) { 665 recvPacket(header, new_packet); 666 recvScheduler.pushPacket(new_packet, 667 header.sendTick, 668 header.sendDelay); 669 } else { 670 // everything else must be synchronisation related command 671 sync->progress(header.sendTick, 672 header.syncRepeat, 673 header.needCkpt, 674 header.needExit, 675 header.needStopSync); 676 } 677 } 678} 679 680void 681DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay) 682{ 683 assert(recvThread == nullptr); 684 685 recvThread = new std::thread(&DistIface::recvThreadFunc, 686 this, 687 const_cast<Event *>(recv_done), 688 link_delay); 689 recvThreadsNum++; 690} 691 692DrainState 693DistIface::drain() 694{ 695 
DPRINTF(DistEthernet,"DistIFace::drain() called\n"); 696 // This can be called multiple times in the same drain cycle. 697 if (this == master) 698 syncEvent->draining(true); 699 return DrainState::Drained; 700} 701 702void 703DistIface::drainResume() { 704 DPRINTF(DistEthernet,"DistIFace::drainResume() called\n"); 705 if (this == master) 706 syncEvent->draining(false); 707 recvScheduler.resumeRecvTicks(); 708} 709 710void 711DistIface::serialize(CheckpointOut &cp) const 712{ 713 // Drain the dist interface before the checkpoint is taken. We cannot call 714 // this as part of the normal drain cycle because this dist sync has to be 715 // called exactly once after the system is fully drained. 716 sync->drainComplete(); 717 718 unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId; 719 720 SERIALIZE_SCALAR(rank_orig); 721 SERIALIZE_SCALAR(dist_iface_id_orig); 722 723 recvScheduler.serializeSection(cp, "recvScheduler"); 724 if (this == master) { 725 sync->serializeSection(cp, "Sync"); 726 } 727} 728 729void 730DistIface::unserialize(CheckpointIn &cp) 731{ 732 unsigned rank_orig, dist_iface_id_orig; 733 UNSERIALIZE_SCALAR(rank_orig); 734 UNSERIALIZE_SCALAR(dist_iface_id_orig); 735 736 panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)", 737 rank, rank_orig); 738 panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch " 739 "at resume (distIfaceId=%d, orig=%d)", distIfaceId, 740 dist_iface_id_orig); 741 742 recvScheduler.unserializeSection(cp, "recvScheduler"); 743 if (this == master) { 744 sync->unserializeSection(cp, "Sync"); 745 } 746} 747 748void 749DistIface::init(const Event *done_event, Tick link_delay) 750{ 751 // Init hook for the underlaying message transport to setup/finalize 752 // communication channels 753 initTransport(); 754 755 // Spawn a new receiver thread that will process messages 756 // coming in from peer gem5 processes. 
757 // The receive thread will also schedule a (receive) doneEvent 758 // for each incoming data packet. 759 spawnRecvThread(done_event, link_delay); 760 761 762 // Adjust the periodic sync start and interval. Different DistIface 763 // might have different requirements. The singleton sync object 764 // will select the minimum values for both params. 765 assert(sync != nullptr); 766 sync->init(syncStart, syncRepeat); 767 768 // Initialize the seed for random generator to avoid the same sequence 769 // in all gem5 peer processes 770 assert(master != nullptr); 771 if (this == master) 772 random_mt.init(5489 * (rank+1) + 257); 773} 774 775void 776DistIface::startup() 777{ 778 DPRINTF(DistEthernet, "DistIface::startup() started\n"); 779 // Schedule synchronization unless we are not a switch in pseudo_op mode. 780 if (this == master && (!syncStartOnPseudoOp || isSwitch)) 781 syncEvent->start(); 782 DPRINTF(DistEthernet, "DistIface::startup() done\n"); 783} 784 785bool 786DistIface::readyToCkpt(Tick delay, Tick period) 787{ 788 bool ret = true; 789 DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu " 790 "period:%lu\n", delay, period); 791 if (master) { 792 if (delay == 0) { 793 inform("m5 checkpoint called with zero delay => triggering collaborative " 794 "checkpoint\n"); 795 sync->requestCkpt(ReqType::collective); 796 } else { 797 inform("m5 checkpoint called with non-zero delay => triggering immediate " 798 "checkpoint (at the next sync)\n"); 799 sync->requestCkpt(ReqType::immediate); 800 } 801 if (period != 0) 802 inform("Non-zero period for m5_ckpt is ignored in " 803 "distributed gem5 runs\n"); 804 ret = false; 805 } 806 return ret; 807} 808 809void 810DistIface::SyncNode::requestStopSync(ReqType req) 811{ 812 std::lock_guard<std::mutex> sync_lock(lock); 813 needStopSync = req; 814} 815 816void 817DistIface::toggleSync(ThreadContext *tc) 818{ 819 // Unforunate that we have to populate the system pointer member this way. 
820 master->sys = tc->getSystemPtr(); 821 822 // The invariant for both syncing and "unsyncing" is that all threads will 823 // stop executing intructions until the desired sync state has been reached 824 // for all nodes. This is the easiest way to prevent deadlock (in the case 825 // of "unsyncing") and causality errors (in the case of syncing). 826 if (master->syncEvent->scheduled()) { 827 inform("Request toggling syncronization off\n"); 828 master->sync->requestStopSync(ReqType::collective); 829 830 // At this point, we have no clue when everyone will reach the sync 831 // stop point. Suspend execution of all local thread contexts. 832 // Dist-gem5 will reactivate all thread contexts when everyone has 833 // reached the sync stop point. 834 for (int i = 0; i < master->sys->numContexts(); i++) { 835 ThreadContext *tc = master->sys->getThreadContext(i); 836 if (tc->status() == ThreadContext::Active) 837 tc->quiesce(); 838 } 839 } else { 840 inform("Request toggling syncronization on\n"); 841 master->syncEvent->start(); 842 843 // We need to suspend all CPUs until the sync point is reached by all 844 // nodes to prevent causality errors. We can also schedule CPU 845 // activation here, since we know exactly when the next sync will 846 // occur. 
847 for (int i = 0; i < master->sys->numContexts(); i++) { 848 ThreadContext *tc = master->sys->getThreadContext(i); 849 if (tc->status() == ThreadContext::Active) 850 tc->quiesceTick(master->syncEvent->when() + 1); 851 } 852 } 853} 854 855bool 856DistIface::readyToExit(Tick delay) 857{ 858 bool ret = true; 859 DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n", 860 delay); 861 if (master) { 862 // To successfully coordinate an exit, all nodes must be synchronising 863 if (!master->syncEvent->scheduled()) 864 master->syncEvent->start(); 865 866 if (delay == 0) { 867 inform("m5 exit called with zero delay => triggering collaborative " 868 "exit\n"); 869 sync->requestExit(ReqType::collective); 870 } else { 871 inform("m5 exit called with non-zero delay => triggering immediate " 872 "exit (at the next sync)\n"); 873 sync->requestExit(ReqType::immediate); 874 } 875 ret = false; 876 } 877 return ret; 878} 879 880uint64_t 881DistIface::rankParam() 882{ 883 uint64_t val; 884 if (master) { 885 val = master->rank; 886 } else { 887 warn("Dist-rank parameter is queried in single gem5 simulation."); 888 val = 0; 889 } 890 return val; 891} 892 893uint64_t 894DistIface::sizeParam() 895{ 896 uint64_t val; 897 if (master) { 898 val = master->size; 899 } else { 900 warn("Dist-size parameter is queried in single gem5 simulation."); 901 val = 1; 902 } 903 return val; 904} 905