dist_iface.cc revision 11622:0b2aaf6f5c78
/*
 * Copyright (c) 2015 ARM Limited
 * All rights reserved
 *
 * The license below extends only to copyright in the software and shall
 * not be construed as granting a license to any other intellectual
 * property including but not limited to intellectual property relating
 * to a hardware implementation of the functionality of the software
 * licensed hereunder.  You may use the software subject to the license
 * terms below provided that you ensure that this notice is replicated
 * unmodified and in its entirety in all distributions of the software,
 * modified or unmodified, in source code or in binary form.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met: redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer;
 * redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution;
 * neither the name of the copyright holders nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Authors: Gabor Dozsa
 */

/* @file
 * The interface class for dist-gem5 simulations.
 */

#include "dev/net/dist_iface.hh"

#include <queue>
#include <thread>

#include "base/random.hh"
#include "base/trace.hh"
#include "debug/DistEthernet.hh"
#include "debug/DistEthernetPkt.hh"
#include "dev/net/etherpkt.hh"
#include "sim/sim_exit.hh"
#include "sim/sim_object.hh"

using namespace std;

DistIface::Sync *DistIface::sync = nullptr;
DistIface::SyncEvent *DistIface::syncEvent = nullptr;
unsigned DistIface::distIfaceNum = 0;
unsigned DistIface::recvThreadsNum = 0;
DistIface *DistIface::master = nullptr;

void
DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
{
    if (start_tick < firstAt) {
        firstAt = start_tick;
        inform("Next dist synchronisation tick is changed to %lu.\n",
               firstAt);
    }

    if (repeat_tick == 0)
        panic("Dist synchronisation interval must be greater than zero");

    if (repeat_tick < nextRepeat) {
        nextRepeat = repeat_tick;
        inform("Dist synchronisation interval is changed to %lu.\n",
               nextRepeat);
    }
}
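
// Worked example for the min-selection above (hypothetical values): if
// one DistIface calls init(1000000, 500000) and another calls
// init(2000000, 250000), the shared Sync object ends up with
// firstAt == 1000000 and nextRepeat == 250000, i.e. the earliest start
// tick and the shortest repeat interval across all interfaces win.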

DistIface::SyncSwitch::SyncSwitch(int num_nodes)
{
    numNodes = num_nodes;
    waitNum = num_nodes;
    numExitReq = 0;
    numCkptReq = 0;
    doExit = false;
    doCkpt = false;
    firstAt = std::numeric_limits<Tick>::max();
    nextAt = 0;
    nextRepeat = std::numeric_limits<Tick>::max();
}

DistIface::SyncNode::SyncNode()
{
    waitNum = 0;
    needExit = ReqType::none;
    needCkpt = ReqType::none;
    doExit = false;
    doCkpt = false;
    firstAt = std::numeric_limits<Tick>::max();
    nextAt = 0;
    nextRepeat = std::numeric_limits<Tick>::max();
}

void
DistIface::SyncNode::run(bool same_tick)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    Header header;

    assert(waitNum == 0);
    waitNum = DistIface::recvThreadsNum;
    // initiate the global synchronisation
    header.msgType = MsgType::cmdSyncReq;
    header.sendTick = curTick();
    header.syncRepeat = nextRepeat;
    header.needCkpt = needCkpt;
    if (needCkpt != ReqType::none)
        needCkpt = ReqType::pending;
    header.needExit = needExit;
    if (needExit != ReqType::none)
        needExit = ReqType::pending;
    DistIface::master->sendCmd(header);
    // now wait until all receiver threads complete the synchronisation
    auto lf = [this]{ return waitNum == 0; };
    cv.wait(sync_lock, lf);
    // global synchronisation is done
    assert(!same_tick || (nextAt == curTick()));
}
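
// A sketch of the node-side barrier protocol implemented above (message
// names are the real MsgType values; the layout is illustrative):
//
//   node simulation thread      node receiver thread        switch
//   ----------------------      --------------------        ------
//   sendCmd(cmdSyncReq) --------------------------------->  collect
//   cv.wait(waitNum == 0)                                   requests
//                        <--- progress() on cmdSyncAck ---  from all
//                             (decrements waitNum and       nodes,
//                              notifies the cv)             then ack
//
// The simulation thread stays blocked until every receiver thread of
// this process has seen the cmdSyncAck broadcast by the switch.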

void
DistIface::SyncSwitch::run(bool same_tick)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    Header header;
    // Wait for the sync requests from the nodes
    if (waitNum > 0) {
        auto lf = [this]{ return waitNum == 0; };
        cv.wait(sync_lock, lf);
    }
    assert(waitNum == 0);
    assert(!same_tick || (nextAt == curTick()));
    waitNum = numNodes;
    // Complete the global synchronisation
    header.msgType = MsgType::cmdSyncAck;
    header.sendTick = nextAt;
    header.syncRepeat = nextRepeat;
    if (doCkpt || numCkptReq == numNodes) {
        doCkpt = true;
        header.needCkpt = ReqType::immediate;
        numCkptReq = 0;
    } else {
        header.needCkpt = ReqType::none;
    }
    if (doExit || numExitReq == numNodes) {
        doExit = true;
        header.needExit = ReqType::immediate;
    } else {
        header.needExit = ReqType::none;
    }
    DistIface::master->sendCmd(header);
}
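
// Resolution rule illustrated (hypothetical 4-node run): a collective
// checkpoint/exit request only takes effect once all numNodes peers have
// requested it in the same round (numCkptReq/numExitReq == numNodes),
// whereas a single immediate request from any one node is enough to set
// doCkpt/doExit. Three nodes sending ReqType::collective and one sending
// nothing therefore leaves the flag false for that round.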

void
DistIface::SyncSwitch::progress(Tick send_tick,
                                Tick sync_repeat,
                                ReqType need_ckpt,
                                ReqType need_exit)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    assert(waitNum > 0);

    if (send_tick > nextAt)
        nextAt = send_tick;
    if (nextRepeat > sync_repeat)
        nextRepeat = sync_repeat;

    if (need_ckpt == ReqType::collective)
        numCkptReq++;
    else if (need_ckpt == ReqType::immediate)
        doCkpt = true;
    if (need_exit == ReqType::collective)
        numExitReq++;
    else if (need_exit == ReqType::immediate)
        doExit = true;

    waitNum--;
    // Notify the simulation thread if the on-going sync is complete
    if (waitNum == 0) {
        sync_lock.unlock();
        cv.notify_one();
    }
}

void
DistIface::SyncNode::progress(Tick max_send_tick,
                              Tick next_repeat,
                              ReqType do_ckpt,
                              ReqType do_exit)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    assert(waitNum > 0);

    nextAt = max_send_tick;
    nextRepeat = next_repeat;
    doCkpt = (do_ckpt != ReqType::none);
    doExit = (do_exit != ReqType::none);

    waitNum--;
    // Notify the simulation thread if the on-going sync is complete
    if (waitNum == 0) {
        sync_lock.unlock();
        cv.notify_one();
    }
}

void
DistIface::SyncNode::requestCkpt(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    assert(req != ReqType::none);
    if (needCkpt != ReqType::none)
        warn("Ckpt requested multiple times (req:%d)\n",
             static_cast<int>(req));
    if (needCkpt == ReqType::none || req == ReqType::immediate)
        needCkpt = req;
}

void
DistIface::SyncNode::requestExit(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    assert(req != ReqType::none);
    if (needExit != ReqType::none)
        warn("Exit requested multiple times (req:%d)\n",
             static_cast<int>(req));
    if (needExit == ReqType::none || req == ReqType::immediate)
        needExit = req;
}

void
DistIface::Sync::drainComplete()
{
    if (doCkpt) {
        // The first DistIface object called this right before writing the
        // checkpoint. We need to drain the underlying physical network here.
        // Note that other gem5 peers may enter this barrier at different
        // ticks due to draining.
        run(false);
        // Only the "first" DistIface object has to perform the sync
        doCkpt = false;
    }
}

void
DistIface::SyncNode::serialize(CheckpointOut &cp) const
{
    int need_exit = static_cast<int>(needExit);
    SERIALIZE_SCALAR(need_exit);
}

void
DistIface::SyncNode::unserialize(CheckpointIn &cp)
{
    int need_exit;
    UNSERIALIZE_SCALAR(need_exit);
    needExit = static_cast<ReqType>(need_exit);
}

void
DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(numExitReq);
}

void
DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
{
    UNSERIALIZE_SCALAR(numExitReq);
}

void
DistIface::SyncEvent::start()
{
    // Note that this may be called either from startup() or drainResume()

    // At this point, all DistIface objects have already called Sync::init()
    // so we have a local minimum of the start tick and repeat for the
    // periodic sync.
    Tick firstAt = DistIface::sync->firstAt;
    repeat = DistIface::sync->nextRepeat;
    // Do a global barrier to agree on a common repeat value (the smallest
    // one from all participating nodes).
    DistIface::sync->run(curTick() == 0);

    assert(!DistIface::sync->doCkpt);
    assert(!DistIface::sync->doExit);
    assert(DistIface::sync->nextAt >= curTick());
    assert(DistIface::sync->nextRepeat <= repeat);

    // If this is called at tick 0 then we use the config start param,
    // otherwise the maximum of the current ticks of all participating nodes.
    if (curTick() == 0) {
        assert(!scheduled());
        assert(DistIface::sync->nextAt == 0);
        schedule(firstAt);
    } else {
        if (scheduled())
            reschedule(DistIface::sync->nextAt);
        else
            schedule(DistIface::sync->nextAt);
    }
    inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
           DistIface::sync->nextRepeat);
}
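
// Scheduling sketch for the two cases above (hypothetical ticks): at a
// cold start (curTick() == 0) with config params syncStart = 1000000 and
// syncRepeat = 500000, the first sync event fires at tick 1000000. When
// resuming from a checkpoint with peers drained at ticks 1000 and 1500,
// the barrier in sync->run() propagates the maximum send tick (1500)
// into sync->nextAt, so every peer schedules its next sync at the same
// future tick 1500.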

void
DistIface::SyncEvent::process()
{
    // We may not start a global periodic sync while draining before taking a
    // checkpoint.  This is due to the possibility that peer gem5 processes
    // may not hit the same periodic sync before they complete draining and
    // that would make this periodic sync clash with the sync called from
    // DistIface::serialize() by other gem5 processes.
    // We would need a 'distributed drain' solution to eliminate this
    // restriction.
    // Note that if draining was not triggered by checkpointing then we are
    // fine since no extra global sync will happen (i.e. all peer gem5
    // processes will hit this periodic sync eventually).
    panic_if(_draining && DistIface::sync->doCkpt,
             "Distributed sync is hit while draining");
    /*
     * Note that this is a global event so this process method will be called
     * by exactly one thread.
     */
    /*
     * We hold the eventq lock at this point but the receiver thread may
     * need the lock to schedule new recv events while waiting for the
     * dist sync to complete.
     * Note that the other simulation threads also release their eventq
     * locks while waiting for us due to the global event semantics.
     */
    {
        EventQueue::ScopedRelease sr(curEventQueue());
        // we do a global sync here that is supposed to happen at the same
        // tick in all gem5 peers
        DistIface::sync->run(true);
        // global sync completed
    }
    if (DistIface::sync->doCkpt)
        exitSimLoop("checkpoint");
    if (DistIface::sync->doExit)
        exitSimLoop("exit request from gem5 peers");

    // schedule the next periodic sync
    repeat = DistIface::sync->nextRepeat;
    schedule(curTick() + repeat);
}

void
DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
{
    // This is called from the receiver thread when it starts running. The new
    // receiver thread shares the event queue with the simulation thread
    // (associated with the simulated Ethernet link).
    curEventQueue(eventManager->eventQueue());

    recvDone = recv_done;
    linkDelay = link_delay;
}

Tick
DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
                                          Tick send_delay,
                                          Tick prev_recv_tick)
{
    Tick recv_tick = send_tick + send_delay + linkDelay;
    // sanity check (we need at least a send delay long window)
    assert(recv_tick >= prev_recv_tick + send_delay);
    panic_if(prev_recv_tick + send_delay > recv_tick,
             "Receive window is smaller than send delay");
    panic_if(recv_tick <= curTick(),
             "Simulators out of sync - missed packet receive by %llu ticks "
             "(prev_recv_tick: %lu send_tick: %lu send_delay: %lu "
             "linkDelay: %lu )",
             curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
             linkDelay);

    return recv_tick;
}
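
// Worked example with hypothetical values: send_tick = 1000,
// send_delay = 100 and linkDelay = 500 give
// recv_tick = 1000 + 100 + 500 = 1600. The checks above then require
// prev_recv_tick + 100 <= 1600 (consecutive receives must be at least a
// send delay apart) and curTick() < 1600 (the receive must still be in
// the future; otherwise the peers have drifted apart further than the
// sync quantum allows).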

void
DistIface::RecvScheduler::resumeRecvTicks()
{
    // Schedule pending packets asap in case link speed/delay changed when
    // restoring from the checkpoint.
    // This may be done during unserialize except that curTick() is unknown
    // at that point, so we call this during drainResume().
    // If we are not restoring from a checkpoint then the link latency could
    // not have changed, so we just return.
    if (!ckptRestore)
        return;

    std::vector<Desc> v;
    while (!descQueue.empty()) {
        Desc d = descQueue.front();
        descQueue.pop();
        d.sendTick = curTick();
        d.sendDelay = d.packet->size(); // assume 1 tick/byte max link speed
        v.push_back(d);
    }

    for (auto &d : v)
        descQueue.push(d);

    if (recvDone->scheduled()) {
        assert(!descQueue.empty());
        eventManager->reschedule(recvDone, curTick());
    } else {
        assert(descQueue.empty() && v.empty());
    }
    ckptRestore = false;
}
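
// Re-stamping sketch (hypothetical packet): after a restore, a queued
// 1500-byte packet gets sendTick = curTick() and sendDelay = 1500 ticks.
// Under the assumed 1 tick/byte ceiling on link speed, the receive ticks
// recomputed by calcReceiveTick() keep consecutive packets at least a
// send delay apart even if the restored config uses a faster link than
// the one the checkpoint was taken with.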

void
DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
                                     Tick send_tick,
                                     Tick send_delay)
{
    // Note: this is called from the receiver thread
    curEventQueue()->lock();
    Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);

    DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
            "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
            send_tick, send_delay, linkDelay, recv_tick);
    // Every packet must be sent and arrive in the same quantum
    assert(send_tick > master->syncEvent->when() -
           master->syncEvent->repeat);
    // No packet may be scheduled for receive in the arrival quantum
    assert(send_tick + send_delay + linkDelay > master->syncEvent->when());

    // Now we are about to schedule a recvDone event for the new data packet.
    // We use the same recvDone object for all incoming data packets. Packet
    // descriptors are saved in the ordered queue. The currently scheduled
    // packet is always at the front of the queue.
    // NOTE: we use the event queue lock to protect the receive desc queue,
    // too, which is accessed both by the receiver thread and the simulation
    // thread.
    descQueue.emplace(new_packet, send_tick, send_delay);
    if (descQueue.size() == 1) {
        assert(!recvDone->scheduled());
        eventManager->schedule(recvDone, recv_tick);
    } else {
        assert(recvDone->scheduled());
        panic_if(descQueue.front().sendTick + descQueue.front().sendDelay >
                 recv_tick,
                 "Out of order packet received (recv_tick: %lu top(): %lu)\n",
                 recv_tick,
                 descQueue.front().sendTick + descQueue.front().sendDelay);
    }
    curEventQueue()->unlock();
}
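
// Quantum invariants illustrated (hypothetical ticks): with the next sync
// scheduled at tick 2000 and repeat = 1000, the asserts above require
// send_tick > 2000 - 1000 = 1000 (the packet was sent after the previous
// sync) and send_tick + send_delay + linkDelay > 2000 (it only becomes
// visible after the upcoming sync), so a packet can never skip past a
// global barrier.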

EthPacketPtr
DistIface::RecvScheduler::popPacket()
{
    // Note: this is called from the simulation thread when a receive done
    // event is being processed for the link. We assume that the thread holds
    // the event queue lock when this is called!
    EthPacketPtr next_packet = descQueue.front().packet;
    descQueue.pop();

    if (descQueue.size() > 0) {
        Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
                                         descQueue.front().sendDelay,
                                         curTick());
        eventManager->schedule(recvDone, recv_tick);
    }
    prevRecvTick = curTick();
    return next_packet;
}

void
DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(sendTick);
    SERIALIZE_SCALAR(sendDelay);
    packet->serialize("rxPacket", cp);
}

void
DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
{
    UNSERIALIZE_SCALAR(sendTick);
    UNSERIALIZE_SCALAR(sendDelay);
    packet = std::make_shared<EthPacketData>(16384);
    packet->unserialize("rxPacket", cp);
}

void
DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(prevRecvTick);
    // serialize the receive desc queue
    std::queue<Desc> tmp_queue(descQueue);
    unsigned n_desc_queue = descQueue.size();
    assert(tmp_queue.size() == descQueue.size());
    SERIALIZE_SCALAR(n_desc_queue);
    for (int i = 0; i < n_desc_queue; i++) {
        tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
        tmp_queue.pop();
    }
    assert(tmp_queue.empty());
}
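
// std::queue provides no iterators, hence the copy-and-drain idiom above.
// A minimal sketch of the same pattern for an arbitrary std::queue<T> q
// (T and visit() are placeholders):
//
//   std::queue<T> tmp(q);
//   while (!tmp.empty()) { visit(tmp.front()); tmp.pop(); }
//
// This visits every element in FIFO order while leaving q intact.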

void
DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
{
    assert(descQueue.size() == 0);
    assert(!recvDone->scheduled());
    assert(!ckptRestore);

    UNSERIALIZE_SCALAR(prevRecvTick);
    // unserialize the receive desc queue
    unsigned n_desc_queue;
    UNSERIALIZE_SCALAR(n_desc_queue);
    for (int i = 0; i < n_desc_queue; i++) {
        Desc recv_desc;
        recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
        descQueue.push(recv_desc);
    }
    ckptRestore = true;
}

DistIface::DistIface(unsigned dist_rank,
                     unsigned dist_size,
                     Tick sync_start,
                     Tick sync_repeat,
                     EventManager *em,
                     bool is_switch, int num_nodes) :
    syncStart(sync_start), syncRepeat(sync_repeat),
    recvThread(nullptr), recvScheduler(em),
    rank(dist_rank), size(dist_size)
{
    DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n", dist_rank);
    isMaster = false;
    if (master == nullptr) {
        assert(sync == nullptr);
        assert(syncEvent == nullptr);
        if (is_switch)
            sync = new SyncSwitch(num_nodes);
        else
            sync = new SyncNode();
        syncEvent = new SyncEvent();
        master = this;
        isMaster = true;
    }
    distIfaceId = distIfaceNum;
    distIfaceNum++;
}

DistIface::~DistIface()
{
    assert(recvThread);
    delete recvThread;
    if (this == master) {
        assert(syncEvent);
        delete syncEvent;
        assert(sync);
        delete sync;
        master = nullptr;
    }
}

void
DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
{
    Header header;

    // Prepare a dist header packet for the Ethernet packet we want to
    // send out.
    header.msgType = MsgType::dataDescriptor;
    header.sendTick  = curTick();
    header.sendDelay = send_delay;

    header.dataPacketLength = pkt->size();

    // Send out the packet and the meta info.
    sendPacket(header, pkt);

    DPRINTF(DistEthernetPkt,
            "DistIface::packetOut() done size:%d send_delay:%llu\n",
            pkt->size(), send_delay);
}

void
DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
{
    EthPacketPtr new_packet;
    DistHeaderPkt::Header header;

    // Initialize receive scheduler parameters
    recvScheduler.init(recv_done, link_delay);

    // Main loop to wait for and process any incoming message.
    for (;;) {
        // recvHeader() blocks until the next dist header packet comes in.
        if (!recvHeader(header)) {
            // We lost connection to the peer gem5 processes, most likely
            // because one of them called m5 exit. So we stop here.
            // Grab the eventq lock to stop the simulation thread.
            curEventQueue()->lock();
            exitSimLoop("Message server closed connection, simulator "
                        "is exiting");
            curEventQueue()->unlock();
            break;
        }

        // We got a valid dist header packet, let's process it
        if (header.msgType == MsgType::dataDescriptor) {
            recvPacket(header, new_packet);
            recvScheduler.pushPacket(new_packet,
                                     header.sendTick,
                                     header.sendDelay);
        } else {
            // everything else must be a synchronisation related command
            sync->progress(header.sendTick,
                           header.syncRepeat,
                           header.needCkpt,
                           header.needExit);
        }
    }
}

void
DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
{
    assert(recvThread == nullptr);

    recvThread = new std::thread(&DistIface::recvThreadFunc,
                                 this,
                                 const_cast<Event *>(recv_done),
                                 link_delay);
    recvThreadsNum++;
}

DrainState
DistIface::drain()
{
    DPRINTF(DistEthernet, "DistIFace::drain() called\n");
    // This can be called multiple times in the same drain cycle.
    if (this == master)
        syncEvent->draining(true);
    return DrainState::Drained;
}

void
DistIface::drainResume()
{
    DPRINTF(DistEthernet, "DistIFace::drainResume() called\n");
    if (this == master)
        syncEvent->draining(false);
    recvScheduler.resumeRecvTicks();
}

void
DistIface::serialize(CheckpointOut &cp) const
{
    // Drain the dist interface before the checkpoint is taken. We cannot
    // call this as part of the normal drain cycle because this dist sync
    // has to be called exactly once after the system is fully drained.
    sync->drainComplete();

    unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;

    SERIALIZE_SCALAR(rank_orig);
    SERIALIZE_SCALAR(dist_iface_id_orig);

    recvScheduler.serializeSection(cp, "recvScheduler");
    if (this == master) {
        sync->serializeSection(cp, "Sync");
    }
}

void
DistIface::unserialize(CheckpointIn &cp)
{
    unsigned rank_orig, dist_iface_id_orig;
    UNSERIALIZE_SCALAR(rank_orig);
    UNSERIALIZE_SCALAR(dist_iface_id_orig);

    panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
             rank, rank_orig);
    panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
             "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
             dist_iface_id_orig);

    recvScheduler.unserializeSection(cp, "recvScheduler");
    if (this == master) {
        sync->unserializeSection(cp, "Sync");
    }
}

void
DistIface::init(const Event *done_event, Tick link_delay)
{
    // Init hook for the underlying message transport to setup/finalize
    // communication channels
    initTransport();

    // Spawn a new receiver thread that will process messages
    // coming in from peer gem5 processes.
    // The receive thread will also schedule a (receive) doneEvent
    // for each incoming data packet.
    spawnRecvThread(done_event, link_delay);

    // Adjust the periodic sync start and interval. Different DistIface
    // objects might have different requirements. The singleton sync object
    // will select the minimum values for both params.
    assert(sync != nullptr);
    sync->init(syncStart, syncRepeat);

    // Initialize the seed of the random generator to avoid the same
    // sequence in all gem5 peer processes
    assert(master != nullptr);
    if (this == master)
        random_mt.init(5489 * (rank + 1) + 257);
}
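
// Seed arithmetic for the formula above: rank 0 seeds random_mt with
// 5489 * 1 + 257 = 5746 and rank 1 with 5489 * 2 + 257 = 11235, so every
// gem5 peer draws a distinct pseudo-random sequence (5489 is the default
// seed of the MT19937 Mersenne Twister).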

void
DistIface::startup()
{
    DPRINTF(DistEthernet, "DistIface::startup() started\n");
    if (this == master)
        syncEvent->start();
    DPRINTF(DistEthernet, "DistIface::startup() done\n");
}

bool
DistIface::readyToCkpt(Tick delay, Tick period)
{
    bool ret = true;
    DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
            "period:%lu\n", delay, period);
    if (master) {
        if (delay == 0) {
            inform("m5 checkpoint called with zero delay => triggering "
                   "collaborative checkpoint\n");
            sync->requestCkpt(ReqType::collective);
        } else {
            inform("m5 checkpoint called with non-zero delay => triggering "
                   "immediate checkpoint (at the next sync)\n");
            sync->requestCkpt(ReqType::immediate);
        }
        if (period != 0)
            inform("Non-zero period for m5_ckpt is ignored in "
                   "distributed gem5 runs\n");
        ret = false;
    }
    return ret;
}

bool
DistIface::readyToExit(Tick delay)
{
    bool ret = true;
    DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
            delay);
    if (master) {
        if (delay == 0) {
            inform("m5 exit called with zero delay => triggering "
                   "collaborative exit\n");
            sync->requestExit(ReqType::collective);
        } else {
            inform("m5 exit called with non-zero delay => triggering "
                   "immediate exit (at the next sync)\n");
            sync->requestExit(ReqType::immediate);
        }
        ret = false;
    }
    return ret;
}

uint64_t
DistIface::rankParam()
{
    uint64_t val;
    if (master) {
        val = master->rank;
    } else {
        warn("Dist-rank parameter is queried in a single gem5 simulation.");
        val = 0;
    }
    return val;
}

uint64_t
DistIface::sizeParam()
{
    uint64_t val;
    if (master) {
        val = master->size;
    } else {
        warn("Dist-size parameter is queried in a single gem5 simulation.");
        val = 1;
    }
    return val;
}
809