dist_iface.cc revision 11701:5e7599457b97
/*
 * Copyright (c) 2015 ARM Limited
 * All rights reserved
 *
 * The license below extends only to copyright in the software and shall
 * not be construed as granting a license to any other intellectual
 * property including but not limited to intellectual property relating
 * to a hardware implementation of the functionality of the software
 * licensed hereunder. You may use the software subject to the license
 * terms below provided that you ensure that this notice is replicated
 * unmodified and in its entirety in all distributions of the software,
 * modified or unmodified, in source code or in binary form.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met: redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer;
 * redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution;
 * neither the name of the copyright holders nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Authors: Gabor Dozsa
 */

/* @file
 * The interface class for dist-gem5 simulations.
 */

#include "dev/net/dist_iface.hh"

#include <queue>
#include <thread>

#include "base/random.hh"
#include "base/trace.hh"
#include "debug/DistEthernet.hh"
#include "debug/DistEthernetPkt.hh"
#include "dev/net/etherpkt.hh"
#include "sim/sim_exit.hh"
#include "sim/sim_object.hh"

using namespace std;

DistIface::Sync *DistIface::sync = nullptr;
DistIface::SyncEvent *DistIface::syncEvent = nullptr;
unsigned DistIface::distIfaceNum = 0;
unsigned DistIface::recvThreadsNum = 0;
DistIface *DistIface::master = nullptr;

void
DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
{
    if (start_tick < firstAt) {
        firstAt = start_tick;
        inform("Next dist synchronisation tick is changed to %lu.\n",
               firstAt);
    }

    if (repeat_tick == 0)
        panic("Dist synchronisation interval must be greater than zero");

    if (repeat_tick < nextRepeat) {
        nextRepeat = repeat_tick;
        inform("Dist synchronisation interval is changed to %lu.\n",
               nextRepeat);
    }
}
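// A worked example of the minimum selection above (a sketch with made-up
// numbers, not values from any particular config): if one DistIface calls
// init(start_tick=1000000, repeat_tick=500000) and a second one calls
// init(start_tick=750000, repeat_tick=600000), the singleton sync object
// ends up with firstAt == 750000 (the earliest requested start) and
// nextRepeat == 500000 (the shortest requested interval), so the periodic
// sync runs as often as the most demanding interface requires.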
DistIface::SyncSwitch::SyncSwitch(int num_nodes)
{
    numNodes = num_nodes;
    waitNum = num_nodes;
    numExitReq = 0;
    numCkptReq = 0;
    doExit = false;
    doCkpt = false;
    firstAt = std::numeric_limits<Tick>::max();
    nextAt = 0;
    nextRepeat = std::numeric_limits<Tick>::max();
}

DistIface::SyncNode::SyncNode()
{
    waitNum = 0;
    needExit = ReqType::none;
    needCkpt = ReqType::none;
    doExit = false;
    doCkpt = false;
    firstAt = std::numeric_limits<Tick>::max();
    nextAt = 0;
    nextRepeat = std::numeric_limits<Tick>::max();
}

void
DistIface::SyncNode::run(bool same_tick)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    Header header;

    assert(waitNum == 0);
    waitNum = DistIface::recvThreadsNum;
    // initiate the global synchronisation
    header.msgType = MsgType::cmdSyncReq;
    header.sendTick = curTick();
    header.syncRepeat = nextRepeat;
    header.needCkpt = needCkpt;
    if (needCkpt != ReqType::none)
        needCkpt = ReqType::pending;
    header.needExit = needExit;
    if (needExit != ReqType::none)
        needExit = ReqType::pending;
    DistIface::master->sendCmd(header);
    // now wait until all receiver threads complete the synchronisation
    auto lf = [this]{ return waitNum == 0; };
    cv.wait(sync_lock, lf);
    // global synchronisation is done
    assert(!same_tick || (nextAt == curTick()));
}

void
DistIface::SyncSwitch::run(bool same_tick)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    Header header;
    // Wait for the sync requests from the nodes
    if (waitNum > 0) {
        auto lf = [this]{ return waitNum == 0; };
        cv.wait(sync_lock, lf);
    }
    assert(waitNum == 0);
    assert(!same_tick || (nextAt == curTick()));
    waitNum = numNodes;
    // Complete the global synchronisation
    header.msgType = MsgType::cmdSyncAck;
    header.sendTick = nextAt;
    header.syncRepeat = nextRepeat;
    if (doCkpt || numCkptReq == numNodes) {
        doCkpt = true;
        header.needCkpt = ReqType::immediate;
        numCkptReq = 0;
    } else {
        header.needCkpt = ReqType::none;
    }
    if (doExit || numExitReq == numNodes) {
        doExit = true;
        header.needExit = ReqType::immediate;
    } else {
        header.needExit = ReqType::none;
    }
    DistIface::master->sendCmd(header);
}
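// A sketch of one round of the request/acknowledge barrier implemented by
// the two run() methods above (the message types are the real MsgType
// values; the tick numbers are made up for illustration):
//
//   node 0 --cmdSyncReq(sendTick=100, syncRepeat=50)--> switch
//   node 1 --cmdSyncReq(sendTick=120, syncRepeat=40)--> switch
//   ... switch blocks in SyncSwitch::run() until waitNum drops to 0 ...
//   switch --cmdSyncAck(sendTick=120, syncRepeat=40)--> all nodes
//
// The switch accumulates the maximum send tick and the minimum repeat
// interval (see SyncSwitch::progress() below), so every node resumes at
// the same agreed tick with the same sync period.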
void
DistIface::SyncSwitch::progress(Tick send_tick,
                                Tick sync_repeat,
                                ReqType need_ckpt,
                                ReqType need_exit)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    assert(waitNum > 0);

    if (send_tick > nextAt)
        nextAt = send_tick;
    if (nextRepeat > sync_repeat)
        nextRepeat = sync_repeat;

    if (need_ckpt == ReqType::collective)
        numCkptReq++;
    else if (need_ckpt == ReqType::immediate)
        doCkpt = true;
    if (need_exit == ReqType::collective)
        numExitReq++;
    else if (need_exit == ReqType::immediate)
        doExit = true;

    waitNum--;
    // Notify the simulation thread if the on-going sync is complete
    if (waitNum == 0) {
        sync_lock.unlock();
        cv.notify_one();
    }
}

void
DistIface::SyncNode::progress(Tick max_send_tick,
                              Tick next_repeat,
                              ReqType do_ckpt,
                              ReqType do_exit)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    assert(waitNum > 0);

    nextAt = max_send_tick;
    nextRepeat = next_repeat;
    doCkpt = (do_ckpt != ReqType::none);
    doExit = (do_exit != ReqType::none);

    waitNum--;
    // Notify the simulation thread if the on-going sync is complete
    if (waitNum == 0) {
        sync_lock.unlock();
        cv.notify_one();
    }
}

void
DistIface::SyncNode::requestCkpt(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    assert(req != ReqType::none);
    if (needCkpt != ReqType::none)
        warn("Ckpt requested multiple times (req:%d)\n",
             static_cast<int>(req));
    if (needCkpt == ReqType::none || req == ReqType::immediate)
        needCkpt = req;
}

void
DistIface::SyncNode::requestExit(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    assert(req != ReqType::none);
    if (needExit != ReqType::none)
        warn("Exit requested multiple times (req:%d)\n",
             static_cast<int>(req));
    if (needExit == ReqType::none || req == ReqType::immediate)
        needExit = req;
}
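// Example of the request semantics above (a hypothetical call sequence,
// not taken from any gem5 test): a first requestExit(ReqType::collective)
// records the request; a second collective request only triggers the
// "requested multiple times" warning because needExit is already set; a
// subsequent requestExit(ReqType::immediate) still overwrites the pending
// collective request, since an immediate request must win. A collective
// request takes effect only once every node has made one (the
// numExitReq == numNodes test in SyncSwitch::run()), while an immediate
// request forces the action at the next sync.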
void
DistIface::Sync::drainComplete()
{
    if (doCkpt) {
        // The first DistIface object called this right before writing the
        // checkpoint. We need to drain the underlying physical network here.
        // Note that other gem5 peers may enter this barrier at different
        // ticks due to draining.
        run(false);
        // Only the "first" DistIface object has to perform the sync
        doCkpt = false;
    }
}

void
DistIface::SyncNode::serialize(CheckpointOut &cp) const
{
    int need_exit = static_cast<int>(needExit);
    SERIALIZE_SCALAR(need_exit);
}

void
DistIface::SyncNode::unserialize(CheckpointIn &cp)
{
    int need_exit;
    UNSERIALIZE_SCALAR(need_exit);
    needExit = static_cast<ReqType>(need_exit);
}

void
DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(numExitReq);
}

void
DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
{
    UNSERIALIZE_SCALAR(numExitReq);
}

void
DistIface::SyncEvent::start()
{
    // Note that this may be called either from startup() or drainResume()

    // At this point, all DistIface objects have already called Sync::init()
    // so we have a local minimum of the start tick and repeat for the
    // periodic sync.
    Tick firstAt = DistIface::sync->firstAt;
    repeat = DistIface::sync->nextRepeat;
    // Do a global barrier to agree on a common repeat value (the smallest
    // one from all participating nodes).
    DistIface::sync->run(curTick() == 0);

    assert(!DistIface::sync->doCkpt);
    assert(!DistIface::sync->doExit);
    assert(DistIface::sync->nextAt >= curTick());
    assert(DistIface::sync->nextRepeat <= repeat);

    // If this is called at tick 0 then we use the config start param,
    // otherwise the maximum of the current tick of all participating nodes.
    if (curTick() == 0) {
        assert(!scheduled());
        assert(DistIface::sync->nextAt == 0);
        schedule(firstAt);
    } else {
        if (scheduled())
            reschedule(DistIface::sync->nextAt);
        else
            schedule(DistIface::sync->nextAt);
    }
    inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
           DistIface::sync->nextRepeat);
}

void
DistIface::SyncEvent::process()
{
    // We may not start a global periodic sync while draining before taking
    // a checkpoint. This is due to the possibility that peer gem5 processes
    // may not hit the same periodic sync before they complete draining, and
    // that would make this periodic sync clash with the sync called from
    // DistIface::serialize() by other gem5 processes.
    // We would need a 'distributed drain' solution to eliminate this
    // restriction.
    // Note that if draining was not triggered by checkpointing then we are
    // fine since no extra global sync will happen (i.e. all gem5 peers will
    // hit this periodic sync eventually).
    panic_if(_draining && DistIface::sync->doCkpt,
             "Distributed sync is hit while draining");
    /*
     * Note that this is a global event, so this process method will be
     * called by exactly one thread.
     */
    /*
     * We hold the eventq lock at this point but the receiver thread may
     * need the lock to schedule new recv events while waiting for the
     * dist sync to complete.
     * Note that the other simulation threads also release their eventq
     * locks while waiting for us due to the global event semantics.
     */
    {
        EventQueue::ScopedRelease sr(curEventQueue());
        // we do a global sync here that is supposed to happen at the same
        // tick in all gem5 peers
        DistIface::sync->run(true);
        // global sync completed
    }
    if (DistIface::sync->doCkpt)
        exitSimLoop("checkpoint");
    if (DistIface::sync->doExit)
        exitSimLoop("exit request from gem5 peers");

    // schedule the next periodic sync
    repeat = DistIface::sync->nextRepeat;
    schedule(curTick() + repeat);
}
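// Illustrative timeline for the periodic sync (tick values are made up):
// with an agreed firstAt of 1000000 and repeat of 500000, process() fires
// in every peer at tick 1000000, 1500000, 2000000, ... Each firing is a
// global barrier, so all peers leave it at the same simulated tick. For
// packet delivery to work, the repeat interval should not exceed the
// simulated link delay, so that a packet sent in the current quantum is
// always received in a later one (cf. the assertions in
// RecvScheduler::pushPacket() below).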
void
DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
{
    // This is called from the receiver thread when it starts running. The
    // new receiver thread shares the event queue with the simulation thread
    // (associated with the simulated Ethernet link).
    curEventQueue(eventManager->eventQueue());

    recvDone = recv_done;
    linkDelay = link_delay;
}

Tick
DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
                                          Tick send_delay,
                                          Tick prev_recv_tick)
{
    Tick recv_tick = send_tick + send_delay + linkDelay;
    // sanity check (we need at least a send delay long receive window)
    panic_if(prev_recv_tick + send_delay > recv_tick,
             "Receive window is smaller than send delay");
    panic_if(recv_tick <= curTick(),
             "Simulators out of sync - missed packet receive by %llu ticks "
             "(prev_recv_tick: %lu send_tick: %lu send_delay: %lu "
             "linkDelay: %lu )",
             curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
             linkDelay);

    return recv_tick;
}
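// Worked example for calcReceiveTick() (the numbers are hypothetical):
// with send_tick = 2000, send_delay = 100 and linkDelay = 1000 the packet
// is due at tick 2000 + 100 + 1000 = 3100. If the previous packet was
// received at prev_recv_tick = 2900, the window check passes because
// 2900 + 100 <= 3100. Had the peers drifted so far apart that this
// simulator had already passed tick 3100, the second check would fire:
// a receive event cannot be scheduled in the past.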
void
DistIface::RecvScheduler::resumeRecvTicks()
{
    // Schedule pending packets asap in case link speed/delay changed when
    // restoring from the checkpoint.
    // This may be done during unserialize except that curTick() is unknown
    // at that point, so we call this during drainResume() instead.
    // If we are not restoring from a checkpoint then the link latency
    // could not have changed, so we simply return.
    if (!ckptRestore)
        return;

    std::vector<Desc> v;
    while (!descQueue.empty()) {
        Desc d = descQueue.front();
        descQueue.pop();
        d.sendTick = curTick();
        d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
        v.push_back(d);
    }

    for (auto &d : v)
        descQueue.push(d);

    if (recvDone->scheduled()) {
        assert(!descQueue.empty());
        eventManager->reschedule(recvDone, curTick());
    } else {
        assert(descQueue.empty() && v.empty());
    }
    ckptRestore = false;
}

void
DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
                                     Tick send_tick,
                                     Tick send_delay)
{
    // Note: this is called from the receiver thread
    curEventQueue()->lock();
    Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);

    DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
            "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
            send_tick, send_delay, linkDelay, recv_tick);
    // Every packet must be sent and arrive in the same quantum
    assert(send_tick > master->syncEvent->when() -
           master->syncEvent->repeat);
    // No packet may be scheduled for receive in the arrival quantum
    assert(send_tick + send_delay + linkDelay > master->syncEvent->when());

    // Now we are about to schedule a recvDone event for the new data packet.
    // We use the same recvDone object for all incoming data packets. Packet
    // descriptors are saved in the ordered queue. The currently scheduled
    // packet is always at the front of the queue.
    // NOTE: we use the event queue lock to protect the receive desc queue,
    // too, which is accessed both by the receiver thread and the simulation
    // thread.
    descQueue.emplace(new_packet, send_tick, send_delay);
    if (descQueue.size() == 1) {
        assert(!recvDone->scheduled());
        eventManager->schedule(recvDone, recv_tick);
    } else {
        assert(recvDone->scheduled());
        panic_if(descQueue.front().sendTick + descQueue.front().sendDelay >
                 recv_tick,
                 "Out of order packet received (recv_tick: %lu top(): %lu)\n",
                 recv_tick, descQueue.front().sendTick +
                 descQueue.front().sendDelay);
    }
    curEventQueue()->unlock();
}

EthPacketPtr
DistIface::RecvScheduler::popPacket()
{
    // Note: this is called from the simulation thread when a receive done
    // event is being processed for the link. We assume that the thread
    // holds the event queue lock when this is called!
    EthPacketPtr next_packet = descQueue.front().packet;
    descQueue.pop();

    if (descQueue.size() > 0) {
        Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
                                         descQueue.front().sendDelay,
                                         curTick());
        eventManager->schedule(recvDone, recv_tick);
    }
    prevRecvTick = curTick();
    return next_packet;
}
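// A sketch of the receive descriptor queue in action (hypothetical ticks):
// pushPacket() appends {packet, send_tick, send_delay} descriptors in
// arrival order, so after pushing packets due at ticks 3100 and 3250 the
// queue holds the 3100 descriptor at the front, with recvDone scheduled
// for tick 3100. When the link processes that event it calls popPacket(),
// which hands back the front packet and re-schedules recvDone for the next
// descriptor (the one due at 3250). The out-of-order panic in pushPacket()
// sanity-checks the ordering between a newly arrived packet and the one
// already at the front of the queue.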
void
DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(sendTick);
    SERIALIZE_SCALAR(sendDelay);
    packet->serialize("rxPacket", cp);
}

void
DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
{
    UNSERIALIZE_SCALAR(sendTick);
    UNSERIALIZE_SCALAR(sendDelay);
    packet = std::make_shared<EthPacketData>();
    packet->unserialize("rxPacket", cp);
}

void
DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(prevRecvTick);
    // serialize the receive desc queue
    std::queue<Desc> tmp_queue(descQueue);
    unsigned n_desc_queue = descQueue.size();
    assert(tmp_queue.size() == descQueue.size());
    SERIALIZE_SCALAR(n_desc_queue);
    for (int i = 0; i < n_desc_queue; i++) {
        tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
        tmp_queue.pop();
    }
    assert(tmp_queue.empty());
}

void
DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
{
    assert(descQueue.size() == 0);
    assert(!recvDone->scheduled());
    assert(!ckptRestore);

    UNSERIALIZE_SCALAR(prevRecvTick);
    // unserialize the receive desc queue
    unsigned n_desc_queue;
    UNSERIALIZE_SCALAR(n_desc_queue);
    for (int i = 0; i < n_desc_queue; i++) {
        Desc recv_desc;
        recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
        descQueue.push(recv_desc);
    }
    ckptRestore = true;
}

DistIface::DistIface(unsigned dist_rank,
                     unsigned dist_size,
                     Tick sync_start,
                     Tick sync_repeat,
                     EventManager *em,
                     bool is_switch, int num_nodes) :
    syncStart(sync_start), syncRepeat(sync_repeat),
    recvThread(nullptr), recvScheduler(em),
    rank(dist_rank), size(dist_size)
{
    DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n", dist_rank);
    isMaster = false;
    if (master == nullptr) {
        assert(sync == nullptr);
        assert(syncEvent == nullptr);
        if (is_switch)
            sync = new SyncSwitch(num_nodes);
        else
            sync = new SyncNode();
        syncEvent = new SyncEvent();
        master = this;
        isMaster = true;
    }
    distIfaceId = distIfaceNum;
    distIfaceNum++;
}

DistIface::~DistIface()
{
    assert(recvThread);
    delete recvThread;
    if (this == master) {
        assert(syncEvent);
        delete syncEvent;
        assert(sync);
        delete sync;
        master = nullptr;
    }
}

void
DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
{
    Header header;

    // Prepare a dist header packet for the Ethernet packet we want to
    // send out.
    header.msgType = MsgType::dataDescriptor;
    header.sendTick = curTick();
    header.sendDelay = send_delay;

    header.dataPacketLength = pkt->length;
    header.simLength = pkt->simLength;

    // Send out the packet and the meta info.
    sendPacket(header, pkt);

    DPRINTF(DistEthernetPkt,
            "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
            pkt->length, send_delay);
}

void
DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
{
    EthPacketPtr new_packet;
    DistHeaderPkt::Header header;

    // Initialize receive scheduler parameters
    recvScheduler.init(recv_done, link_delay);

    // Main loop to wait for and process any incoming message.
    for (;;) {
        // recvHeader() blocks until the next dist header packet comes in.
        if (!recvHeader(header)) {
            // We lost connection to the peer gem5 processes, most likely
            // because one of them called m5 exit. So we stop here.
            // Grab the eventq lock to stop the simulation thread.
            curEventQueue()->lock();
            exitSimLoop("Message server closed connection, simulator "
                        "is exiting");
            curEventQueue()->unlock();
            break;
        }

        // We got a valid dist header packet, let's process it.
        if (header.msgType == MsgType::dataDescriptor) {
            recvPacket(header, new_packet);
            recvScheduler.pushPacket(new_packet,
                                     header.sendTick,
                                     header.sendDelay);
        } else {
            // Everything else must be a synchronisation related command.
            sync->progress(header.sendTick,
                           header.syncRepeat,
                           header.needCkpt,
                           header.needExit);
        }
    }
}

void
DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
{
    assert(recvThread == nullptr);

    recvThread = new std::thread(&DistIface::recvThreadFunc,
                                 this,
                                 const_cast<Event *>(recv_done),
                                 link_delay);
    recvThreadsNum++;
}
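// Sketch of the receiver thread lifecycle (a summary of the code above,
// not additional behaviour): each DistIface spawns one std::thread running
// recvThreadFunc(), which first registers the link's receive-done event
// with the RecvScheduler and then loops on recvHeader(). Data descriptors
// feed RecvScheduler::pushPacket(); every other message type is treated as
// a sync command and forwarded to Sync::progress(), which is what
// eventually wakes up the simulation thread blocked in Sync::run().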
DrainState
DistIface::drain()
{
    DPRINTF(DistEthernet, "DistIface::drain() called\n");
    // This can be called multiple times in the same drain cycle.
    if (this == master)
        syncEvent->draining(true);
    return DrainState::Drained;
}

void
DistIface::drainResume()
{
    DPRINTF(DistEthernet, "DistIface::drainResume() called\n");
    if (this == master)
        syncEvent->draining(false);
    recvScheduler.resumeRecvTicks();
}

void
DistIface::serialize(CheckpointOut &cp) const
{
    // Drain the dist interface before the checkpoint is taken. We cannot
    // call this as part of the normal drain cycle because this dist sync
    // has to be called exactly once after the system is fully drained.
    sync->drainComplete();

    unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;

    SERIALIZE_SCALAR(rank_orig);
    SERIALIZE_SCALAR(dist_iface_id_orig);

    recvScheduler.serializeSection(cp, "recvScheduler");
    if (this == master) {
        sync->serializeSection(cp, "Sync");
    }
}

void
DistIface::unserialize(CheckpointIn &cp)
{
    unsigned rank_orig, dist_iface_id_orig;
    UNSERIALIZE_SCALAR(rank_orig);
    UNSERIALIZE_SCALAR(dist_iface_id_orig);

    panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
             rank, rank_orig);
    panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
             "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
             dist_iface_id_orig);

    recvScheduler.unserializeSection(cp, "recvScheduler");
    if (this == master) {
        sync->unserializeSection(cp, "Sync");
    }
}

void
DistIface::init(const Event *done_event, Tick link_delay)
{
    // Init hook for the underlying message transport to setup/finalize
    // communication channels
    initTransport();

    // Spawn a new receiver thread that will process messages
    // coming in from peer gem5 processes.
    // The receive thread will also schedule a (receive) doneEvent
    // for each incoming data packet.
    spawnRecvThread(done_event, link_delay);

    // Adjust the periodic sync start and interval. Different DistIface
    // objects might have different requirements. The singleton sync object
    // will select the minimum values for both params.
    assert(sync != nullptr);
    sync->init(syncStart, syncRepeat);

    // Initialize the seed for the random generator to avoid the same
    // sequence in all gem5 peer processes
    assert(master != nullptr);
    if (this == master)
        random_mt.init(5489 * (rank + 1) + 257);
}

void
DistIface::startup()
{
    DPRINTF(DistEthernet, "DistIface::startup() started\n");
    if (this == master)
        syncEvent->start();
    DPRINTF(DistEthernet, "DistIface::startup() done\n");
}

bool
DistIface::readyToCkpt(Tick delay, Tick period)
{
    bool ret = true;
    DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
            "period:%lu\n", delay, period);
    if (master) {
        if (delay == 0) {
            inform("m5 checkpoint called with zero delay => triggering "
                   "collaborative checkpoint\n");
            sync->requestCkpt(ReqType::collective);
        } else {
            inform("m5 checkpoint called with non-zero delay => triggering "
                   "immediate checkpoint (at the next sync)\n");
            sync->requestCkpt(ReqType::immediate);
        }
        if (period != 0)
            inform("Non-zero period for m5_ckpt is ignored in "
                   "distributed gem5 runs\n");
        ret = false;
    }
    return ret;
}

bool
DistIface::readyToExit(Tick delay)
{
    bool ret = true;
    DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
            delay);
    if (master) {
        if (delay == 0) {
            inform("m5 exit called with zero delay => triggering "
                   "collaborative exit\n");
            sync->requestExit(ReqType::collective);
        } else {
            inform("m5 exit called with non-zero delay => triggering "
                   "immediate exit (at the next sync)\n");
            sync->requestExit(ReqType::immediate);
        }
        ret = false;
    }
    return ret;
}
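// Hypothetical end-to-end example of a distributed exit (the names are
// real, the scenario is illustrative): a guest runs "m5 exit" with zero
// delay, the pseudo-instruction path calls DistIface::readyToExit(0),
// which requests a collective exit and returns false so the local
// simulator keeps running. Once every node has made the same request, the
// switch promotes it to ReqType::immediate at the next periodic sync (see
// SyncSwitch::run()), and each peer's SyncEvent::process() then calls
// exitSimLoop("exit request from gem5 peers").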
uint64_t
DistIface::rankParam()
{
    uint64_t val;
    if (master) {
        val = master->rank;
    } else {
        warn("Dist-rank parameter is queried in single gem5 simulation.");
        val = 0;
    }
    return val;
}

uint64_t
DistIface::sizeParam()
{
    uint64_t val;
    if (master) {
        val = master->size;
    } else {
        warn("Dist-size parameter is queried in single gem5 simulation.");
        val = 1;
    }
    return val;
}
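// Note on the fallbacks above (a reading of the code, not new behaviour):
// when no DistIface exists, i.e. in a plain single-process gem5 run,
// rankParam() returns 0 and sizeParam() returns 1, so guest software
// querying the dist rank/size sees the same values it would in a
// one-node cluster.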