dist_iface.cc revision 11622:0b2aaf6f5c78
/*
 * Copyright (c) 2015 ARM Limited
 * All rights reserved
 *
 * The license below extends only to copyright in the software and shall
 * not be construed as granting a license to any other intellectual
 * property including but not limited to intellectual property relating
 * to a hardware implementation of the functionality of the software
 * licensed hereunder. You may use the software subject to the license
 * terms below provided that you ensure that this notice is replicated
 * unmodified and in its entirety in all distributions of the software,
 * modified or unmodified, in source code or in binary form.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met: redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer;
 * redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution;
 * neither the name of the copyright holders nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Authors: Gabor Dozsa
 */

/* @file
 * The interface class for dist-gem5 simulations.
426145Snate@binkert.org */ 436145Snate@binkert.org 446145Snate@binkert.org#include "dev/net/dist_iface.hh" 457039Snate@binkert.org 467039Snate@binkert.org#include <queue> 478615Snilay@cs.wisc.edu#include <thread> 488615Snilay@cs.wisc.edu 499501Snilay@cs.wisc.edu#include "base/random.hh" 506285Snate@binkert.org#include "base/trace.hh" 519501Snilay@cs.wisc.edu#include "debug/DistEthernet.hh" 529501Snilay@cs.wisc.edu#include "debug/DistEthernetPkt.hh" 538615Snilay@cs.wisc.edu#include "dev/net/etherpkt.hh" 547039Snate@binkert.org#include "sim/sim_exit.hh" 556285Snate@binkert.org#include "sim/sim_object.hh" 566285Snate@binkert.org 576763SBrad.Beckmann@amd.comusing namespace std; 586763SBrad.Beckmann@amd.comDistIface::Sync *DistIface::sync = nullptr; 599171Snilay@cs.wisc.eduDistIface::SyncEvent *DistIface::syncEvent = nullptr; 607039Snate@binkert.orgunsigned DistIface::distIfaceNum = 0; 617039Snate@binkert.orgunsigned DistIface::recvThreadsNum = 0; 626876Ssteve.reinhardt@amd.comDistIface *DistIface::master = nullptr; 637039Snate@binkert.org 647039Snate@binkert.orgvoid 656145Snate@binkert.orgDistIface::Sync::init(Tick start_tick, Tick repeat_tick) 667039Snate@binkert.org{ 677039Snate@binkert.org if (start_tick < firstAt) { 686145Snate@binkert.org firstAt = start_tick; 697055Snate@binkert.org inform("Next dist synchronisation tick is changed to %lu.\n", nextAt); 706145Snate@binkert.org } 717039Snate@binkert.org 727546SBrad.Beckmann@amd.com if (repeat_tick == 0) 737546SBrad.Beckmann@amd.com panic("Dist synchronisation interval must be greater than zero"); 747546SBrad.Beckmann@amd.com 757546SBrad.Beckmann@amd.com if (repeat_tick < nextRepeat) { 767546SBrad.Beckmann@amd.com nextRepeat = repeat_tick; 777565SBrad.Beckmann@amd.com inform("Dist synchronisation interval is changed to %lu.\n", 787565SBrad.Beckmann@amd.com nextRepeat); 797565SBrad.Beckmann@amd.com } 807565SBrad.Beckmann@amd.com} 817565SBrad.Beckmann@amd.com 
827565SBrad.Beckmann@amd.comDistIface::SyncSwitch::SyncSwitch(int num_nodes) 837565SBrad.Beckmann@amd.com{ 847039Snate@binkert.org numNodes = num_nodes; 856145Snate@binkert.org waitNum = num_nodes; 867546SBrad.Beckmann@amd.com numExitReq = 0; 877546SBrad.Beckmann@amd.com numCkptReq = 0; 887546SBrad.Beckmann@amd.com doExit = false; 897546SBrad.Beckmann@amd.com doCkpt = false; 907565SBrad.Beckmann@amd.com firstAt = std::numeric_limits<Tick>::max(); 917565SBrad.Beckmann@amd.com nextAt = 0; 927565SBrad.Beckmann@amd.com nextRepeat = std::numeric_limits<Tick>::max(); 937565SBrad.Beckmann@amd.com} 947565SBrad.Beckmann@amd.com 957565SBrad.Beckmann@amd.comDistIface::SyncNode::SyncNode() 967565SBrad.Beckmann@amd.com{ 978615Snilay@cs.wisc.edu waitNum = 0; 987039Snate@binkert.org needExit = ReqType::none; 998688Snilay@cs.wisc.edu needCkpt = ReqType::none; 1008688Snilay@cs.wisc.edu doExit = false; 1018688Snilay@cs.wisc.edu doCkpt = false; 1028688Snilay@cs.wisc.edu firstAt = std::numeric_limits<Tick>::max(); 1038688Snilay@cs.wisc.edu nextAt = 0; 1048688Snilay@cs.wisc.edu nextRepeat = std::numeric_limits<Tick>::max(); 1058688Snilay@cs.wisc.edu} 1068688Snilay@cs.wisc.edu 1078688Snilay@cs.wisc.eduvoid 1088688Snilay@cs.wisc.eduDistIface::SyncNode::run(bool same_tick) 1098688Snilay@cs.wisc.edu{ 1108688Snilay@cs.wisc.edu std::unique_lock<std::mutex> sync_lock(lock); 1116145Snate@binkert.org Header header; 1127055Snate@binkert.org 1137055Snate@binkert.org assert(waitNum == 0); 1147039Snate@binkert.org waitNum = DistIface::recvThreadsNum; 1156145Snate@binkert.org // initiate the global synchronisation 1167455Snate@binkert.org header.msgType = MsgType::cmdSyncReq; 1177039Snate@binkert.org header.sendTick = curTick(); 1188717Snilay@cs.wisc.edu header.syncRepeat = nextRepeat; 1196145Snate@binkert.org header.needCkpt = needCkpt; 1209104Shestness@cs.utexas.edu if (needCkpt != ReqType::none) 1219104Shestness@cs.utexas.edu needCkpt = ReqType::pending; 1227039Snate@binkert.org header.needExit = 
needExit; 1238615Snilay@cs.wisc.edu if (needExit != ReqType::none) 1246145Snate@binkert.org needExit = ReqType::pending; 1257546SBrad.Beckmann@amd.com DistIface::master->sendCmd(header); 1267546SBrad.Beckmann@amd.com // now wait until all receiver threads complete the synchronisation 1277560SBrad.Beckmann@amd.com auto lf = [this]{ return waitNum == 0; }; 1287565SBrad.Beckmann@amd.com cv.wait(sync_lock, lf); 1297565SBrad.Beckmann@amd.com // global synchronisation is done 1307565SBrad.Beckmann@amd.com assert(!same_tick || (nextAt == curTick())); 1317565SBrad.Beckmann@amd.com} 1327546SBrad.Beckmann@amd.com 1338615Snilay@cs.wisc.edu 1346285Snate@binkert.orgvoid 1357560SBrad.Beckmann@amd.comDistIface::SyncSwitch::run(bool same_tick) 1366145Snate@binkert.org{ 1377039Snate@binkert.org std::unique_lock<std::mutex> sync_lock(lock); 1387039Snate@binkert.org Header header; 1397039Snate@binkert.org // Wait for the sync requests from the nodes 1406145Snate@binkert.org if (waitNum > 0) { 1417039Snate@binkert.org auto lf = [this]{ return waitNum == 0; }; 1427039Snate@binkert.org cv.wait(sync_lock, lf); 1439184Sandreas.hansson@arm.com } 1446145Snate@binkert.org assert(waitNum == 0); 1457039Snate@binkert.org assert(!same_tick || (nextAt == curTick())); 1467039Snate@binkert.org waitNum = numNodes; 1476285Snate@binkert.org // Complete the global synchronisation 1487455Snate@binkert.org header.msgType = MsgType::cmdSyncAck; 1497455Snate@binkert.org header.sendTick = nextAt; 1507455Snate@binkert.org header.syncRepeat = nextRepeat; 1517039Snate@binkert.org if (doCkpt || numCkptReq == numNodes) { 1527039Snate@binkert.org doCkpt = true; 1537039Snate@binkert.org header.needCkpt = ReqType::immediate; 1546145Snate@binkert.org numCkptReq = 0; 1557039Snate@binkert.org } else { 1567039Snate@binkert.org header.needCkpt = ReqType::none; 1577039Snate@binkert.org } 1587039Snate@binkert.org if (doExit || numExitReq == numNodes) { 1596859Sdrh5@cs.wisc.edu doExit = true; 1608171Stushar@csail.mit.edu 
header.needExit = ReqType::immediate; 1618171Stushar@csail.mit.edu } else { 1627039Snate@binkert.org header.needExit = ReqType::none; 1637039Snate@binkert.org } 1647039Snate@binkert.org DistIface::master->sendCmd(header); 1657039Snate@binkert.org} 1666899SBrad.Beckmann@amd.com 1677039Snate@binkert.orgvoid 1687039Snate@binkert.orgDistIface::SyncSwitch::progress(Tick send_tick, 1697039Snate@binkert.org Tick sync_repeat, 1707039Snate@binkert.org ReqType need_ckpt, 1717039Snate@binkert.org ReqType need_exit) 1726886SBrad.Beckmann@amd.com{ 1737039Snate@binkert.org std::unique_lock<std::mutex> sync_lock(lock); 1746145Snate@binkert.org assert(waitNum > 0); 1756145Snate@binkert.org 1767055Snate@binkert.org if (send_tick > nextAt) 1777055Snate@binkert.org nextAt = send_tick; 1786145Snate@binkert.org if (nextRepeat > sync_repeat) 1797039Snate@binkert.org nextRepeat = sync_repeat; 1807055Snate@binkert.org 1817039Snate@binkert.org if (need_ckpt == ReqType::collective) 1826145Snate@binkert.org numCkptReq++; 1836145Snate@binkert.org else if (need_ckpt == ReqType::immediate) 1847039Snate@binkert.org doCkpt = true; 185 if (need_exit == ReqType::collective) 186 numExitReq++; 187 else if (need_exit == ReqType::immediate) 188 doExit = true; 189 190 waitNum--; 191 // Notify the simulation thread if the on-going sync is complete 192 if (waitNum == 0) { 193 sync_lock.unlock(); 194 cv.notify_one(); 195 } 196} 197 198void 199DistIface::SyncNode::progress(Tick max_send_tick, 200 Tick next_repeat, 201 ReqType do_ckpt, 202 ReqType do_exit) 203{ 204 std::unique_lock<std::mutex> sync_lock(lock); 205 assert(waitNum > 0); 206 207 nextAt = max_send_tick; 208 nextRepeat = next_repeat; 209 doCkpt = (do_ckpt != ReqType::none); 210 doExit = (do_exit != ReqType::none); 211 212 waitNum--; 213 // Notify the simulation thread if the on-going sync is complete 214 if (waitNum == 0) { 215 sync_lock.unlock(); 216 cv.notify_one(); 217 } 218} 219 220void 221DistIface::SyncNode::requestCkpt(ReqType req) 222{ 
223 std::lock_guard<std::mutex> sync_lock(lock); 224 assert(req != ReqType::none); 225 if (needCkpt != ReqType::none) 226 warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req)); 227 if (needCkpt == ReqType::none || req == ReqType::immediate) 228 needCkpt = req; 229} 230 231void 232DistIface::SyncNode::requestExit(ReqType req) 233{ 234 std::lock_guard<std::mutex> sync_lock(lock); 235 assert(req != ReqType::none); 236 if (needExit != ReqType::none) 237 warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req)); 238 if (needExit == ReqType::none || req == ReqType::immediate) 239 needExit = req; 240} 241 242void 243DistIface::Sync::drainComplete() 244{ 245 if (doCkpt) { 246 // The first DistIface object called this right before writing the 247 // checkpoint. We need to drain the underlying physical network here. 248 // Note that other gem5 peers may enter this barrier at different 249 // ticks due to draining. 250 run(false); 251 // Only the "first" DistIface object has to perform the sync 252 doCkpt = false; 253 } 254} 255 256void 257DistIface::SyncNode::serialize(CheckpointOut &cp) const 258{ 259 int need_exit = static_cast<int>(needExit); 260 SERIALIZE_SCALAR(need_exit); 261} 262 263void 264DistIface::SyncNode::unserialize(CheckpointIn &cp) 265{ 266 int need_exit; 267 UNSERIALIZE_SCALAR(need_exit); 268 needExit = static_cast<ReqType>(need_exit); 269} 270 271void 272DistIface::SyncSwitch::serialize(CheckpointOut &cp) const 273{ 274 SERIALIZE_SCALAR(numExitReq); 275} 276 277void 278DistIface::SyncSwitch::unserialize(CheckpointIn &cp) 279{ 280 UNSERIALIZE_SCALAR(numExitReq); 281} 282 283void 284DistIface::SyncEvent::start() 285{ 286 // Note that this may be called either from startup() or drainResume() 287 288 // At this point, all DistIface objects has already called Sync::init() so 289 // we have a local minimum of the start tick and repeat for the periodic 290 // sync. 
291 Tick firstAt = DistIface::sync->firstAt; 292 repeat = DistIface::sync->nextRepeat; 293 // Do a global barrier to agree on a common repeat value (the smallest 294 // one from all participating nodes 295 DistIface::sync->run(curTick() == 0); 296 297 assert(!DistIface::sync->doCkpt); 298 assert(!DistIface::sync->doExit); 299 assert(DistIface::sync->nextAt >= curTick()); 300 assert(DistIface::sync->nextRepeat <= repeat); 301 302 // if this is called at tick 0 then we use the config start param otherwise 303 // the maximum of the current tick of all participating nodes 304 if (curTick() == 0) { 305 assert(!scheduled()); 306 assert(DistIface::sync->nextAt == 0); 307 schedule(firstAt); 308 } else { 309 if (scheduled()) 310 reschedule(DistIface::sync->nextAt); 311 else 312 schedule(DistIface::sync->nextAt); 313 } 314 inform("Dist sync scheduled at %lu and repeats %lu\n", when(), 315 DistIface::sync->nextRepeat); 316} 317 318void 319DistIface::SyncEvent::process() 320{ 321 // We may not start a global periodic sync while draining before taking a 322 // checkpoint. This is due to the possibility that peer gem5 processes 323 // may not hit the same periodic sync before they complete draining and 324 // that would make this periodic sync clash with sync called from 325 // DistIface::serialize() by other gem5 processes. 326 // We would need a 'distributed drain' solution to eliminate this 327 // restriction. 328 // Note that if draining was not triggered by checkpointing then we are 329 // fine since no extra global sync will happen (i.e. all peer gem5 will 330 // hit this periodic sync eventually). 331 panic_if(_draining && DistIface::sync->doCkpt, 332 "Distributed sync is hit while draining"); 333 /* 334 * Note that this is a global event so this process method will be called 335 * by only exactly one thread. 
336 */ 337 /* 338 * We hold the eventq lock at this point but the receiver thread may 339 * need the lock to schedule new recv events while waiting for the 340 * dist sync to complete. 341 * Note that the other simulation threads also release their eventq 342 * locks while waiting for us due to the global event semantics. 343 */ 344 { 345 EventQueue::ScopedRelease sr(curEventQueue()); 346 // we do a global sync here that is supposed to happen at the same 347 // tick in all gem5 peers 348 DistIface::sync->run(true); 349 // global sync completed 350 } 351 if (DistIface::sync->doCkpt) 352 exitSimLoop("checkpoint"); 353 if (DistIface::sync->doExit) 354 exitSimLoop("exit request from gem5 peers"); 355 356 // schedule the next periodic sync 357 repeat = DistIface::sync->nextRepeat; 358 schedule(curTick() + repeat); 359} 360 361void 362DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay) 363{ 364 // This is called from the receiver thread when it starts running. The new 365 // receiver thread shares the event queue with the simulation thread 366 // (associated with the simulated Ethernet link). 
367 curEventQueue(eventManager->eventQueue()); 368 369 recvDone = recv_done; 370 linkDelay = link_delay; 371} 372 373Tick 374DistIface::RecvScheduler::calcReceiveTick(Tick send_tick, 375 Tick send_delay, 376 Tick prev_recv_tick) 377{ 378 Tick recv_tick = send_tick + send_delay + linkDelay; 379 // sanity check (we need atleast a send delay long window) 380 assert(recv_tick >= prev_recv_tick + send_delay); 381 panic_if(prev_recv_tick + send_delay > recv_tick, 382 "Receive window is smaller than send delay"); 383 panic_if(recv_tick <= curTick(), 384 "Simulators out of sync - missed packet receive by %llu ticks" 385 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu " 386 "linkDelay: %lu )", 387 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay, 388 linkDelay); 389 390 return recv_tick; 391} 392 393void 394DistIface::RecvScheduler::resumeRecvTicks() 395{ 396 // Schedule pending packets asap in case link speed/delay changed when 397 // restoring from the checkpoint. 398 // This may be done during unserialize except that curTick() is unknown 399 // so we call this during drainResume(). 400 // If we are not restoring from a checkppint then link latency could not 401 // change so we just return. 
402 if (!ckptRestore) 403 return; 404 405 std::vector<Desc> v; 406 while (!descQueue.empty()) { 407 Desc d = descQueue.front(); 408 descQueue.pop(); 409 d.sendTick = curTick(); 410 d.sendDelay = d.packet->size(); // assume 1 tick/byte max link speed 411 v.push_back(d); 412 } 413 414 for (auto &d : v) 415 descQueue.push(d); 416 417 if (recvDone->scheduled()) { 418 assert(!descQueue.empty()); 419 eventManager->reschedule(recvDone, curTick()); 420 } else { 421 assert(descQueue.empty() && v.empty()); 422 } 423 ckptRestore = false; 424} 425 426void 427DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet, 428 Tick send_tick, 429 Tick send_delay) 430{ 431 // Note : this is called from the receiver thread 432 curEventQueue()->lock(); 433 Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick); 434 435 DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket " 436 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n", 437 send_tick, send_delay, linkDelay, recv_tick); 438 // Every packet must be sent and arrive in the same quantum 439 assert(send_tick > master->syncEvent->when() - 440 master->syncEvent->repeat); 441 // No packet may be scheduled for receive in the arrival quantum 442 assert(send_tick + send_delay + linkDelay > master->syncEvent->when()); 443 444 // Now we are about to schedule a recvDone event for the new data packet. 445 // We use the same recvDone object for all incoming data packets. Packet 446 // descriptors are saved in the ordered queue. The currently scheduled 447 // packet is always on the top of the queue. 448 // NOTE: we use the event queue lock to protect the receive desc queue, 449 // too, which is accessed both by the receiver thread and the simulation 450 // thread. 
451 descQueue.emplace(new_packet, send_tick, send_delay); 452 if (descQueue.size() == 1) { 453 assert(!recvDone->scheduled()); 454 eventManager->schedule(recvDone, recv_tick); 455 } else { 456 assert(recvDone->scheduled()); 457 panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick, 458 "Out of order packet received (recv_tick: %lu top(): %lu\n", 459 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay); 460 } 461 curEventQueue()->unlock(); 462} 463 464EthPacketPtr 465DistIface::RecvScheduler::popPacket() 466{ 467 // Note : this is called from the simulation thread when a receive done 468 // event is being processed for the link. We assume that the thread holds 469 // the event queue queue lock when this is called! 470 EthPacketPtr next_packet = descQueue.front().packet; 471 descQueue.pop(); 472 473 if (descQueue.size() > 0) { 474 Tick recv_tick = calcReceiveTick(descQueue.front().sendTick, 475 descQueue.front().sendDelay, 476 curTick()); 477 eventManager->schedule(recvDone, recv_tick); 478 } 479 prevRecvTick = curTick(); 480 return next_packet; 481} 482 483void 484DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const 485{ 486 SERIALIZE_SCALAR(sendTick); 487 SERIALIZE_SCALAR(sendDelay); 488 packet->serialize("rxPacket", cp); 489} 490 491void 492DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp) 493{ 494 UNSERIALIZE_SCALAR(sendTick); 495 UNSERIALIZE_SCALAR(sendDelay); 496 packet = std::make_shared<EthPacketData>(16384); 497 packet->unserialize("rxPacket", cp); 498} 499 500void 501DistIface::RecvScheduler::serialize(CheckpointOut &cp) const 502{ 503 SERIALIZE_SCALAR(prevRecvTick); 504 // serialize the receive desc queue 505 std::queue<Desc> tmp_queue(descQueue); 506 unsigned n_desc_queue = descQueue.size(); 507 assert(tmp_queue.size() == descQueue.size()); 508 SERIALIZE_SCALAR(n_desc_queue); 509 for (int i = 0; i < n_desc_queue; i++) { 510 tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i)); 
511 tmp_queue.pop(); 512 } 513 assert(tmp_queue.empty()); 514} 515 516void 517DistIface::RecvScheduler::unserialize(CheckpointIn &cp) 518{ 519 assert(descQueue.size() == 0); 520 assert(!recvDone->scheduled()); 521 assert(!ckptRestore); 522 523 UNSERIALIZE_SCALAR(prevRecvTick); 524 // unserialize the receive desc queue 525 unsigned n_desc_queue; 526 UNSERIALIZE_SCALAR(n_desc_queue); 527 for (int i = 0; i < n_desc_queue; i++) { 528 Desc recv_desc; 529 recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i)); 530 descQueue.push(recv_desc); 531 } 532 ckptRestore = true; 533} 534 535DistIface::DistIface(unsigned dist_rank, 536 unsigned dist_size, 537 Tick sync_start, 538 Tick sync_repeat, 539 EventManager *em, 540 bool is_switch, int num_nodes) : 541 syncStart(sync_start), syncRepeat(sync_repeat), 542 recvThread(nullptr), recvScheduler(em), 543 rank(dist_rank), size(dist_size) 544{ 545 DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank); 546 isMaster = false; 547 if (master == nullptr) { 548 assert(sync == nullptr); 549 assert(syncEvent == nullptr); 550 if (is_switch) 551 sync = new SyncSwitch(num_nodes); 552 else 553 sync = new SyncNode(); 554 syncEvent = new SyncEvent(); 555 master = this; 556 isMaster = true; 557 } 558 distIfaceId = distIfaceNum; 559 distIfaceNum++; 560} 561 562DistIface::~DistIface() 563{ 564 assert(recvThread); 565 delete recvThread; 566 if (this == master) { 567 assert(syncEvent); 568 delete syncEvent; 569 assert(sync); 570 delete sync; 571 master = nullptr; 572 } 573} 574 575void 576DistIface::packetOut(EthPacketPtr pkt, Tick send_delay) 577{ 578 Header header; 579 580 // Prepare a dist header packet for the Ethernet packet we want to 581 // send out. 582 header.msgType = MsgType::dataDescriptor; 583 header.sendTick = curTick(); 584 header.sendDelay = send_delay; 585 586 header.dataPacketLength = pkt->size(); 587 588 // Send out the packet and the meta info. 
589 sendPacket(header, pkt); 590 591 DPRINTF(DistEthernetPkt, 592 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n", 593 pkt->size(), send_delay); 594} 595 596void 597DistIface::recvThreadFunc(Event *recv_done, Tick link_delay) 598{ 599 EthPacketPtr new_packet; 600 DistHeaderPkt::Header header; 601 602 // Initialize receive scheduler parameters 603 recvScheduler.init(recv_done, link_delay); 604 605 // Main loop to wait for and process any incoming message. 606 for (;;) { 607 // recvHeader() blocks until the next dist header packet comes in. 608 if (!recvHeader(header)) { 609 // We lost connection to the peer gem5 processes most likely 610 // because one of them called m5 exit. So we stop here. 611 // Grab the eventq lock to stop the simulation thread 612 curEventQueue()->lock(); 613 exitSimLoop("Message server closed connection, simulator " 614 "is exiting"); 615 curEventQueue()->unlock(); 616 break; 617 } 618 619 // We got a valid dist header packet, let's process it 620 if (header.msgType == MsgType::dataDescriptor) { 621 recvPacket(header, new_packet); 622 recvScheduler.pushPacket(new_packet, 623 header.sendTick, 624 header.sendDelay); 625 } else { 626 // everything else must be synchronisation related command 627 sync->progress(header.sendTick, 628 header.syncRepeat, 629 header.needCkpt, 630 header.needExit); 631 } 632 } 633} 634 635void 636DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay) 637{ 638 assert(recvThread == nullptr); 639 640 recvThread = new std::thread(&DistIface::recvThreadFunc, 641 this, 642 const_cast<Event *>(recv_done), 643 link_delay); 644 recvThreadsNum++; 645} 646 647DrainState 648DistIface::drain() 649{ 650 DPRINTF(DistEthernet,"DistIFace::drain() called\n"); 651 // This can be called multiple times in the same drain cycle. 
652 if (this == master) 653 syncEvent->draining(true); 654 return DrainState::Drained; 655} 656 657void 658DistIface::drainResume() { 659 DPRINTF(DistEthernet,"DistIFace::drainResume() called\n"); 660 if (this == master) 661 syncEvent->draining(false); 662 recvScheduler.resumeRecvTicks(); 663} 664 665void 666DistIface::serialize(CheckpointOut &cp) const 667{ 668 // Drain the dist interface before the checkpoint is taken. We cannot call 669 // this as part of the normal drain cycle because this dist sync has to be 670 // called exactly once after the system is fully drained. 671 sync->drainComplete(); 672 673 unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId; 674 675 SERIALIZE_SCALAR(rank_orig); 676 SERIALIZE_SCALAR(dist_iface_id_orig); 677 678 recvScheduler.serializeSection(cp, "recvScheduler"); 679 if (this == master) { 680 sync->serializeSection(cp, "Sync"); 681 } 682} 683 684void 685DistIface::unserialize(CheckpointIn &cp) 686{ 687 unsigned rank_orig, dist_iface_id_orig; 688 UNSERIALIZE_SCALAR(rank_orig); 689 UNSERIALIZE_SCALAR(dist_iface_id_orig); 690 691 panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)", 692 rank, rank_orig); 693 panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch " 694 "at resume (distIfaceId=%d, orig=%d)", distIfaceId, 695 dist_iface_id_orig); 696 697 recvScheduler.unserializeSection(cp, "recvScheduler"); 698 if (this == master) { 699 sync->unserializeSection(cp, "Sync"); 700 } 701} 702 703void 704DistIface::init(const Event *done_event, Tick link_delay) 705{ 706 // Init hook for the underlaying message transport to setup/finalize 707 // communication channels 708 initTransport(); 709 710 // Spawn a new receiver thread that will process messages 711 // coming in from peer gem5 processes. 712 // The receive thread will also schedule a (receive) doneEvent 713 // for each incoming data packet. 714 spawnRecvThread(done_event, link_delay); 715 716 717 // Adjust the periodic sync start and interval. 
Different DistIface 718 // might have different requirements. The singleton sync object 719 // will select the minimum values for both params. 720 assert(sync != nullptr); 721 sync->init(syncStart, syncRepeat); 722 723 // Initialize the seed for random generator to avoid the same sequence 724 // in all gem5 peer processes 725 assert(master != nullptr); 726 if (this == master) 727 random_mt.init(5489 * (rank+1) + 257); 728} 729 730void 731DistIface::startup() 732{ 733 DPRINTF(DistEthernet, "DistIface::startup() started\n"); 734 if (this == master) 735 syncEvent->start(); 736 DPRINTF(DistEthernet, "DistIface::startup() done\n"); 737} 738 739bool 740DistIface::readyToCkpt(Tick delay, Tick period) 741{ 742 bool ret = true; 743 DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu " 744 "period:%lu\n", delay, period); 745 if (master) { 746 if (delay == 0) { 747 inform("m5 checkpoint called with zero delay => triggering collaborative " 748 "checkpoint\n"); 749 sync->requestCkpt(ReqType::collective); 750 } else { 751 inform("m5 checkpoint called with non-zero delay => triggering immediate " 752 "checkpoint (at the next sync)\n"); 753 sync->requestCkpt(ReqType::immediate); 754 } 755 if (period != 0) 756 inform("Non-zero period for m5_ckpt is ignored in " 757 "distributed gem5 runs\n"); 758 ret = false; 759 } 760 return ret; 761} 762 763bool 764DistIface::readyToExit(Tick delay) 765{ 766 bool ret = true; 767 DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n", 768 delay); 769 if (master) { 770 if (delay == 0) { 771 inform("m5 exit called with zero delay => triggering collaborative " 772 "exit\n"); 773 sync->requestExit(ReqType::collective); 774 } else { 775 inform("m5 exit called with non-zero delay => triggering immediate " 776 "exit (at the next sync)\n"); 777 sync->requestExit(ReqType::immediate); 778 } 779 ret = false; 780 } 781 return ret; 782} 783 784uint64_t 785DistIface::rankParam() 786{ 787 uint64_t val; 788 if (master) { 789 val = 
master->rank; 790 } else { 791 warn("Dist-rank parameter is queried in single gem5 simulation."); 792 val = 0; 793 } 794 return val; 795} 796 797uint64_t 798DistIface::sizeParam() 799{ 800 uint64_t val; 801 if (master) { 802 val = master->size; 803 } else { 804 warn("Dist-size parameter is queried in single gem5 simulation."); 805 val = 1; 806 } 807 return val; 808} 809