dist_iface.cc revision 10923
1/* 2 * Copyright (c) 2015 ARM Limited 3 * All rights reserved 4 * 5 * The license below extends only to copyright in the software and shall 6 * not be construed as granting a license to any other intellectual 7 * property including but not limited to intellectual property relating 8 * to a hardware implementation of the functionality of the software 9 * licensed hereunder. You may use the software subject to the license 10 * terms below provided that you ensure that this notice is replicated 11 * unmodified and in its entirety in all distributions of the software, 12 * modified or unmodified, in source code or in binary form. 13 * 14 * Redistribution and use in source and binary forms, with or without 15 * modification, are permitted provided that the following conditions are 16 * met: redistributions of source code must retain the above copyright 17 * notice, this list of conditions and the following disclaimer; 18 * redistributions in binary form must reproduce the above copyright 19 * notice, this list of conditions and the following disclaimer in the 20 * documentation and/or other materials provided with the distribution; 21 * neither the name of the copyright holders nor the names of its 22 * contributors may be used to endorse or promote products derived from 23 * this software without specific prior written permission. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 36 * 37 * Authors: Gabor Dozsa 38 */ 39 40/* @file 41 * The interface class for multi gem5 simulations. 42 */ 43 44#include "dev/multi_iface.hh" 45 46#include <queue> 47#include <thread> 48 49#include "base/random.hh" 50#include "base/trace.hh" 51#include "debug/MultiEthernet.hh" 52#include "debug/MultiEthernetPkt.hh" 53#include "dev/etherpkt.hh" 54#include "sim/sim_exit.hh" 55#include "sim/sim_object.hh" 56 57 58MultiIface::Sync *MultiIface::sync = nullptr; 59MultiIface::SyncEvent *MultiIface::syncEvent = nullptr; 60unsigned MultiIface::recvThreadsNum = 0; 61MultiIface *MultiIface::master = nullptr; 62 63bool 64MultiIface::Sync::run(SyncTrigger t, Tick sync_tick) 65{ 66 std::unique_lock<std::mutex> sync_lock(lock); 67 68 trigger = t; 69 if (trigger != SyncTrigger::periodic) { 70 DPRINTF(MultiEthernet,"MultiIface::Sync::run() trigger:%d\n", 71 (unsigned)trigger); 72 } 73 74 switch (state) { 75 case SyncState::asyncCkpt: 76 switch (trigger) { 77 case SyncTrigger::ckpt: 78 assert(MultiIface::syncEvent->interrupted == false); 79 state = SyncState::busy; 80 break; 81 case SyncTrigger::periodic: 82 if (waitNum == 0) { 83 // So all recv threads got an async checkpoint request already 84 // and a simExit is scheduled at the end of the current tick 85 // (i.e. it is a periodic sync scheduled at the same tick as 86 // the simExit). 87 state = SyncState::idle; 88 DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted " 89 "due to async ckpt scheduled\n"); 90 return false; 91 } else { 92 // we still need to wait for some receiver thread to get the 93 // aysnc ckpt request. We are going to proceed as 'interrupted' 94 // periodic sync. 95 state = SyncState::interrupted; 96 DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted " 97 "due to ckpt request is coming in\n"); 98 } 99 break; 100 case SyncTrigger::atomic: 101 assert(trigger != SyncTrigger::atomic); 102 } 103 break; 104 case SyncState::idle: 105 state = SyncState::busy; 106 break; 107 // Only one sync can be active at any time 108 case SyncState::interrupted: 109 case SyncState::busy: 110 assert(state != SyncState::interrupted); 111 assert(state != SyncState::busy); 112 break; 113 } 114 // Kick-off the sync unless we are in the middle of an interrupted 115 // periodic sync 116 if (state != SyncState::interrupted) { 117 assert(waitNum == 0); 118 waitNum = MultiIface::recvThreadsNum; 119 // initiate the global synchronisation 120 assert(MultiIface::master != nullptr); 121 MultiIface::master->syncRaw(triggerToMsg[(unsigned)trigger], sync_tick); 122 } 123 // now wait until all receiver threads complete the synchronisation 124 auto lf = [this]{ return waitNum == 0; }; 125 cv.wait(sync_lock, lf); 126 127 // we are done 128 assert(state == SyncState::busy || state == SyncState::interrupted); 129 bool ret = (state != SyncState::interrupted); 130 state = SyncState::idle; 131 return ret; 132} 133 134void 135MultiIface::Sync::progress(MsgType msg) 136{ 137 std::unique_lock<std::mutex> sync_lock(lock); 138 139 switch (msg) { 140 case MsgType::cmdAtomicSyncAck: 141 assert(state == SyncState::busy && trigger == SyncTrigger::atomic); 142 break; 143 case MsgType::cmdPeriodicSyncAck: 144 assert(state == SyncState::busy && trigger == SyncTrigger::periodic); 145 break; 146 case MsgType::cmdCkptSyncAck: 147 assert(state == SyncState::busy && trigger == SyncTrigger::ckpt); 148 break; 149 case MsgType::cmdCkptSyncReq: 150 switch (state) { 151 case SyncState::busy: 152 if (trigger == SyncTrigger::ckpt) { 153 // We are already in a checkpoint sync but got another ckpt 154 // sync request. This may happen if two (or more) peer gem5 155 // processes try to start a ckpt nearly at the same time. 156 // Incrementing waitNum here (before decrementing it below) 157 // effectively results in ignoring this new ckpt sync request. 158 waitNum++; 159 break; 160 } 161 assert (waitNum == recvThreadsNum); 162 state = SyncState::interrupted; 163 // we need to fall over here to handle "recvThreadsNum == 1" case 164 case SyncState::interrupted: 165 assert(trigger == SyncTrigger::periodic); 166 assert(waitNum >= 1); 167 if (waitNum == 1) { 168 exitSimLoop("checkpoint"); 169 } 170 break; 171 case SyncState::idle: 172 // There is no on-going sync so we got an async ckpt request. If we 173 // are the only receiver thread then we need to schedule the 174 // checkpoint. Otherwise, only change the state to 'asyncCkpt' and 175 // let the last receiver thread to schedule the checkpoint at the 176 // 'asyncCkpt' case. 177 // Note that a periodic or resume sync may start later and that can 178 // trigger a state change to 'interrupted' (so the checkpoint may 179 // get scheduled at 'interrupted' case finally). 180 assert(waitNum == 0); 181 state = SyncState::asyncCkpt; 182 waitNum = MultiIface::recvThreadsNum; 183 // we need to fall over here to handle "recvThreadsNum == 1" case 184 case SyncState::asyncCkpt: 185 assert(waitNum >= 1); 186 if (waitNum == 1) 187 exitSimLoop("checkpoint"); 188 break; 189 default: 190 panic("Unexpected state for checkpoint request message"); 191 break; 192 } 193 break; 194 default: 195 panic("Unknown msg type"); 196 break; 197 } 198 waitNum--; 199 assert(state != SyncState::idle); 200 // Notify the simultaion thread if there is an on-going sync. 201 if (state != SyncState::asyncCkpt) { 202 sync_lock.unlock(); 203 cv.notify_one(); 204 } 205} 206 207void MultiIface::SyncEvent::start(Tick start, Tick interval) 208{ 209 assert(!scheduled()); 210 if (interval == 0) 211 panic("Multi synchronisation period must be greater than zero"); 212 repeat = interval; 213 schedule(start); 214} 215 216void 217MultiIface::SyncEvent::adjust(Tick start_tick, Tick repeat_tick) 218{ 219 // The new multi interface may require earlier start of the 220 // synchronisation. 221 assert(scheduled() == true); 222 if (start_tick < when()) 223 reschedule(start_tick); 224 // The new multi interface may require more frequent synchronisation. 225 if (repeat == 0) 226 panic("Multi synchronisation period must be greater than zero"); 227 if (repeat < repeat_tick) 228 repeat = repeat_tick; 229} 230 231void 232MultiIface::SyncEvent::process() 233{ 234 /* 235 * Note that this is a global event so this process method will be called 236 * by only exactly one thread. 237 */ 238 // if we are draining the system then we must not start a periodic sync (as 239 // it is not sure that all peer gem5 will reach this tick before taking 240 // the checkpoint). 241 if (isDraining == true) { 242 assert(interrupted == false); 243 interrupted = true; 244 DPRINTF(MultiEthernet,"MultiIface::SyncEvent::process() interrupted " 245 "due to draining\n"); 246 return; 247 } 248 if (interrupted == false) 249 scheduledAt = curTick(); 250 /* 251 * We hold the eventq lock at this point but the receiver thread may 252 * need the lock to schedule new recv events while waiting for the 253 * multi sync to complete. 254 * Note that the other simulation threads also release their eventq 255 * locks while waiting for us due to the global event semantics. 256 */ 257 curEventQueue()->unlock(); 258 // we do a global sync here 259 interrupted = !MultiIface::sync->run(SyncTrigger::periodic, scheduledAt); 260 // Global sync completed or got interrupted. 261 // we are expected to exit with the eventq lock held 262 curEventQueue()->lock(); 263 // schedule the next global sync event if this one completed. Otherwise 264 // (i.e. this one was interrupted by a checkpoint request), we will 265 // reschedule this one after the draining is complete. 266 if (!interrupted) 267 schedule(scheduledAt + repeat); 268} 269 270void MultiIface::SyncEvent::resume() 271{ 272 Tick sync_tick; 273 assert(!scheduled()); 274 if (interrupted) { 275 assert(curTick() >= scheduledAt); 276 // We have to complete the interrupted periodic sync asap. 277 // Note that this sync might be interrupted now again with a checkpoint 278 // request from a peer gem5... 279 sync_tick = curTick(); 280 schedule(sync_tick); 281 } else { 282 // So we completed the last periodic sync, let's find out the tick for 283 // next one 284 assert(curTick() > scheduledAt); 285 sync_tick = scheduledAt + repeat; 286 if (sync_tick < curTick()) 287 panic("Cannot resume periodic synchronisation"); 288 schedule(sync_tick); 289 } 290 DPRINTF(MultiEthernet, 291 "MultiIface::SyncEvent periodic sync resumed at %lld " 292 "(curTick:%lld)\n", sync_tick, curTick()); 293} 294 295void MultiIface::SyncEvent::serialize(const std::string &base, 296 CheckpointOut &cp) const 297{ 298 // Save the periodic multi sync schedule information 299 paramOut(cp, base + ".periodicSyncRepeat", repeat); 300 paramOut(cp, base + ".periodicSyncInterrupted", interrupted); 301 paramOut(cp, base + ".periodicSyncAt", scheduledAt); 302} 303 304void MultiIface::SyncEvent::unserialize(const std::string &base, 305 CheckpointIn &cp) 306{ 307 paramIn(cp, base + ".periodicSyncRepeat", repeat); 308 paramIn(cp, base + ".periodicSyncInterrupted", interrupted); 309 paramIn(cp, base + ".periodicSyncAt", scheduledAt); 310} 311 312MultiIface::MultiIface(unsigned multi_rank, 313 Tick sync_start, 314 Tick sync_repeat, 315 EventManager *em) : 316 syncStart(sync_start), syncRepeat(sync_repeat), 317 recvThread(nullptr), eventManager(em), recvDone(nullptr), 318 scheduledRecvPacket(nullptr), linkDelay(0), rank(multi_rank) 319{ 320 DPRINTF(MultiEthernet, "MultiIface() ctor rank:%d\n",multi_rank); 321 if (master == nullptr) { 322 assert(sync == nullptr); 323 assert(syncEvent == nullptr); 324 sync = new Sync(); 325 syncEvent = new SyncEvent(); 326 master = this; 327 } 328} 329 330MultiIface::~MultiIface() 331{ 332 assert(recvThread); 333 delete recvThread; 334 if (this == master) { 335 assert(syncEvent); 336 delete syncEvent; 337 assert(sync); 338 delete sync; 339 } 340} 341 342void 343MultiIface::packetOut(EthPacketPtr pkt, Tick send_delay) 344{ 345 MultiHeaderPkt::Header header_pkt; 346 unsigned address_length = MultiHeaderPkt::maxAddressLength(); 347 348 // Prepare a multi header packet for the Ethernet packet we want to 349 // send out. 350 header_pkt.msgType = MsgType::dataDescriptor; 351 header_pkt.sendTick = curTick(); 352 header_pkt.sendDelay = send_delay; 353 354 // Store also the source and destination addresses. 355 pkt->packAddress(header_pkt.srcAddress, header_pkt.dstAddress, 356 address_length); 357 358 header_pkt.dataPacketLength = pkt->size(); 359 360 // Send out the multi hedare packet followed by the Ethernet packet. 361 sendRaw(&header_pkt, sizeof(header_pkt), header_pkt.dstAddress); 362 sendRaw(pkt->data, pkt->size(), header_pkt.dstAddress); 363 DPRINTF(MultiEthernetPkt, 364 "MultiIface::sendDataPacket() done size:%d send_delay:%llu " 365 "src:0x%02x%02x%02x%02x%02x%02x " 366 "dst:0x%02x%02x%02x%02x%02x%02x\n", 367 pkt->size(), send_delay, 368 header_pkt.srcAddress[0], header_pkt.srcAddress[1], 369 header_pkt.srcAddress[2], header_pkt.srcAddress[3], 370 header_pkt.srcAddress[4], header_pkt.srcAddress[5], 371 header_pkt.dstAddress[0], header_pkt.dstAddress[1], 372 header_pkt.dstAddress[2], header_pkt.dstAddress[3], 373 header_pkt.dstAddress[4], header_pkt.dstAddress[5]); 374} 375 376bool 377MultiIface::recvHeader(MultiHeaderPkt::Header &header_pkt) 378{ 379 // Blocking receive of an incoming multi header packet. 380 return recvRaw((void *)&header_pkt, sizeof(header_pkt)); 381} 382 383void 384MultiIface::recvData(const MultiHeaderPkt::Header &header_pkt) 385{ 386 // We are here beacuse a header packet has been received implying 387 // that an Ethernet (data) packet is coming in next. 388 assert(header_pkt.msgType == MsgType::dataDescriptor); 389 // Allocate storage for the incoming Ethernet packet. 390 EthPacketPtr new_packet(new EthPacketData(header_pkt.dataPacketLength)); 391 // Now execute the blocking receive and store the incoming data directly 392 // in the new EthPacketData object. 393 if (! recvRaw((void *)(new_packet->data), header_pkt.dataPacketLength)) 394 panic("Missing data packet"); 395 396 new_packet->length = header_pkt.dataPacketLength; 397 // Grab the event queue lock to schedule a new receive event for the 398 // data packet. 399 curEventQueue()->lock(); 400 // Compute the receive tick. It includes the send delay and the 401 // simulated link delay. 402 Tick recv_tick = header_pkt.sendTick + header_pkt.sendDelay + linkDelay; 403 DPRINTF(MultiEthernetPkt, "MultiIface::recvThread() packet receive, " 404 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n", 405 header_pkt.sendTick, header_pkt.sendDelay, linkDelay, recv_tick); 406 407 if (recv_tick <= curTick()) { 408 panic("Simulators out of sync - missed packet receive by %llu ticks", 409 curTick() - recv_tick); 410 } 411 // Now we are about to schedule a recvDone event for the new data packet. 412 // We use the same recvDone object for all incoming data packets. If 413 // that is already scheduled - i.e. a receive event for a previous 414 // data packet is already pending - then we have to check whether the 415 // receive tick for the new packet is earlier than that of the currently 416 // pending event. Packets may arrive out-of-order with respect to 417 // simulated receive time. If that is the case, we need to re-schedule the 418 // recvDone event for the new packet. Otherwise, we save the packet 419 // pointer and the recv tick for the new packet in the recvQueue. See 420 // the implementation of the packetIn() method for comments on how this 421 // information is retrieved from the recvQueue by the simulation thread. 422 if (!recvDone->scheduled()) { 423 assert(recvQueue.size() == 0); 424 assert(scheduledRecvPacket == nullptr); 425 scheduledRecvPacket = new_packet; 426 eventManager->schedule(recvDone, recv_tick); 427 } else if (recvDone->when() > recv_tick) { 428 recvQueue.emplace(scheduledRecvPacket, recvDone->when()); 429 eventManager->reschedule(recvDone, recv_tick); 430 scheduledRecvPacket = new_packet; 431 } else { 432 recvQueue.emplace(new_packet, recv_tick); 433 } 434 curEventQueue()->unlock(); 435} 436 437void 438MultiIface::recvThreadFunc() 439{ 440 EthPacketPtr new_packet; 441 MultiHeaderPkt::Header header; 442 443 // The new receiver thread shares the event queue with the simulation 444 // thread (associated with the simulated Ethernet link). 445 curEventQueue(eventManager->eventQueue()); 446 // Main loop to wait for and process any incoming message. 447 for (;;) { 448 // recvHeader() blocks until the next multi header packet comes in. 449 if (!recvHeader(header)) { 450 // We lost connection to the peer gem5 processes most likely 451 // because one of them called m5 exit. So we stop here. 452 exit_message("info", 0, "Message server closed connection, " 453 "simulation is exiting"); 454 } 455 // We got a valid multi header packet, let's process it 456 if (header.msgType == MsgType::dataDescriptor) { 457 recvData(header); 458 } else { 459 // everything else must be synchronisation related command 460 sync->progress(header.msgType); 461 } 462 } 463} 464 465EthPacketPtr 466MultiIface::packetIn() 467{ 468 // We are called within the process() method of the recvDone event. We 469 // return the packet that triggered the current receive event. 470 // If there is further packets in the recvQueue, we also have to schedule 471 // the recvEvent for the next packet with the smallest receive tick. 472 // The priority queue container ensures that smallest receive tick is 473 // always on the top of the queue. 474 assert(scheduledRecvPacket != nullptr); 475 EthPacketPtr next_packet = scheduledRecvPacket; 476 477 if (! recvQueue.empty()) { 478 eventManager->schedule(recvDone, recvQueue.top().second); 479 scheduledRecvPacket = recvQueue.top().first; 480 recvQueue.pop(); 481 } else { 482 scheduledRecvPacket = nullptr; 483 } 484 485 return next_packet; 486} 487 488void 489MultiIface::spawnRecvThread(Event *recv_done, Tick link_delay) 490{ 491 assert(recvThread == nullptr); 492 // all receive thread must be spawned before simulation starts 493 assert(eventManager->eventQueue()->getCurTick() == 0); 494 495 recvDone = recv_done; 496 linkDelay = link_delay; 497 498 recvThread = new std::thread(&MultiIface::recvThreadFunc, this); 499 500 recvThreadsNum++; 501} 502 503DrainState 504MultiIface::drain() 505{ 506 DPRINTF(MultiEthernet,"MultiIFace::drain() called\n"); 507 508 // This can be called multiple times in the same drain cycle. 509 if (master == this) { 510 syncEvent->isDraining = true; 511 } 512 513 return DrainState::Drained; 514} 515 516void MultiIface::drainDone() { 517 if (master == this) { 518 assert(syncEvent->isDraining == true); 519 syncEvent->isDraining = false; 520 // We need to resume the interrupted periodic sync here now that the 521 // draining is done. If the last periodic sync completed before the 522 // checkpoint then the next one is already scheduled. 523 if (syncEvent->interrupted) 524 syncEvent->resume(); 525 } 526} 527 528void MultiIface::serialize(const std::string &base, CheckpointOut &cp) const 529{ 530 // Drain the multi interface before the checkpoint is taken. We cannot call 531 // this as part of the normal drain cycle because this multi sync has to be 532 // called exactly once after the system is fully drained. 533 // Note that every peer will take a checkpoint but they may take it at 534 // different ticks. 535 // This sync request may interrupt an on-going periodic sync in some peers. 536 sync->run(SyncTrigger::ckpt, curTick()); 537 538 // Save the periodic multi sync status 539 syncEvent->serialize(base, cp); 540 541 unsigned n_rx_packets = recvQueue.size(); 542 if (scheduledRecvPacket != nullptr) 543 n_rx_packets++; 544 545 paramOut(cp, base + ".nRxPackets", n_rx_packets); 546 547 if (n_rx_packets > 0) { 548 assert(recvDone->scheduled()); 549 scheduledRecvPacket->serialize(base + ".rxPacket[0]", cp); 550 } 551 552 for (unsigned i=1; i < n_rx_packets; i++) { 553 const RecvInfo recv_info = recvQueue.impl().at(i-1); 554 recv_info.first->serialize(base + csprintf(".rxPacket[%d]", i), cp); 555 Tick rx_tick = recv_info.second; 556 paramOut(cp, base + csprintf(".rxTick[%d]", i), rx_tick); 557 } 558} 559 560void MultiIface::unserialize(const std::string &base, CheckpointIn &cp) 561{ 562 assert(recvQueue.size() == 0); 563 assert(scheduledRecvPacket == nullptr); 564 assert(recvDone->scheduled() == false); 565 566 // restore periodic sync info 567 syncEvent->unserialize(base, cp); 568 569 unsigned n_rx_packets; 570 paramIn(cp, base + ".nRxPackets", n_rx_packets); 571 572 if (n_rx_packets > 0) { 573 scheduledRecvPacket = std::make_shared<EthPacketData>(16384); 574 scheduledRecvPacket->unserialize(base + ".rxPacket[0]", cp); 575 // Note: receive event will be scheduled when the link is unserialized 576 } 577 578 for (unsigned i=1; i < n_rx_packets; i++) { 579 EthPacketPtr rx_packet = std::make_shared<EthPacketData>(16384); 580 rx_packet->unserialize(base + csprintf(".rxPacket[%d]", i), cp); 581 Tick rx_tick = 0; 582 paramIn(cp, base + csprintf(".rxTick[%d]", i), rx_tick); 583 assert(rx_tick > 0); 584 recvQueue.emplace(rx_packet,rx_tick); 585 } 586} 587 588void MultiIface::initRandom() 589{ 590 // Initialize the seed for random generator to avoid the same sequence 591 // in all gem5 peer processes 592 assert(master != nullptr); 593 if (this == master) 594 random_mt.init(5489 * (rank+1) + 257); 595} 596 597void MultiIface::startPeriodicSync() 598{ 599 DPRINTF(MultiEthernet, "MultiIface:::initPeriodicSync started\n"); 600 // Do a global sync here to ensure that peer gem5 processes are around 601 // (actually this may not be needed...) 602 sync->run(SyncTrigger::atomic, curTick()); 603 604 // Start the periodic sync if it is a fresh simulation from scratch 605 if (curTick() == 0) { 606 if (this == master) { 607 syncEvent->start(syncStart, syncRepeat); 608 inform("Multi synchronisation activated: start at %lld, " 609 "repeat at every %lld ticks.\n", 610 syncStart, syncRepeat); 611 } else { 612 // In case another multiIface object requires different schedule 613 // for periodic sync than the master does. 614 syncEvent->adjust(syncStart, syncRepeat); 615 } 616 } else { 617 // Schedule the next periodic sync if resuming from a checkpoint 618 if (this == master) 619 syncEvent->resume(); 620 } 621 DPRINTF(MultiEthernet, "MultiIface::initPeriodicSync done\n"); 622} 623