dist_iface.cc revision 11703
/*
 * Copyright (c) 2015 ARM Limited
 * All rights reserved
 *
 * The license below extends only to copyright in the software and shall
 * not be construed as granting a license to any other intellectual
 * property including but not limited to intellectual property relating
 * to a hardware implementation of the functionality of the software
 * licensed hereunder.  You may use the software subject to the license
 * terms below provided that you ensure that this notice is replicated
 * unmodified and in its entirety in all distributions of the software,
 * modified or unmodified, in source code or in binary form.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met: redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer;
 * redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution;
 * neither the name of the copyright holders nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Authors: Gabor Dozsa
 */

/* @file
 * The interface class for dist-gem5 simulations.
 */

#include "dev/net/dist_iface.hh"

#include <queue>
#include <thread>

#include "base/random.hh"
#include "base/trace.hh"
#include "cpu/thread_context.hh"
#include "debug/DistEthernet.hh"
#include "debug/DistEthernetPkt.hh"
#include "dev/net/etherpkt.hh"
#include "sim/sim_exit.hh"
#include "sim/sim_object.hh"
#include "sim/system.hh"

using namespace std;

DistIface::Sync *DistIface::sync = nullptr;
System *DistIface::sys = nullptr;
DistIface::SyncEvent *DistIface::syncEvent = nullptr;
unsigned DistIface::distIfaceNum = 0;
unsigned DistIface::recvThreadsNum = 0;
DistIface *DistIface::master = nullptr;
bool DistIface::isSwitch = false;

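/*
 * Each DistIface object registers its preferred sync start tick and
 * repeat interval below; the shared Sync object keeps the minimum of
 * all registered values so that every peer ends up with the earliest
 * start and the shortest (most frequent) interval.
 */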
void
DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
{
    if (start_tick < nextAt) {
        nextAt = start_tick;
        inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
    }

    if (repeat_tick == 0)
        panic("Dist synchronisation interval must be greater than zero");

    if (repeat_tick < nextRepeat) {
        nextRepeat = repeat_tick;
        inform("Dist synchronisation interval is changed to %lu.\n",
               nextRepeat);
    }
}

DistIface::SyncSwitch::SyncSwitch(int num_nodes)
{
    numNodes = num_nodes;
    waitNum = num_nodes;
    numExitReq = 0;
    numCkptReq = 0;
    numStopSyncReq = 0;
    doExit = false;
    doCkpt = false;
    doStopSync = false;
    nextAt = std::numeric_limits<Tick>::max();
    nextRepeat = std::numeric_limits<Tick>::max();
}

DistIface::SyncNode::SyncNode()
{
    waitNum = 0;
    needExit = ReqType::none;
    needCkpt = ReqType::none;
    needStopSync = ReqType::none;
    doExit = false;
    doCkpt = false;
    doStopSync = false;
    nextAt = std::numeric_limits<Tick>::max();
    nextRepeat = std::numeric_limits<Tick>::max();
}

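/*
 * Node-side barrier: the simulation thread sends a cmdSyncReq message
 * towards the switch and then blocks until the receiver thread has
 * processed the matching cmdSyncAck (progress() sets waitNum back to
 * zero and signals the condition variable).
 */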
void
DistIface::SyncNode::run(bool same_tick)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    Header header;

    assert(waitNum == 0);
    waitNum = DistIface::recvThreadsNum;
    // initiate the global synchronisation
    header.msgType = MsgType::cmdSyncReq;
    header.sendTick = curTick();
    header.syncRepeat = nextRepeat;
    header.needCkpt = needCkpt;
    header.needStopSync = needStopSync;
    if (needCkpt != ReqType::none)
        needCkpt = ReqType::pending;
    header.needExit = needExit;
    if (needExit != ReqType::none)
        needExit = ReqType::pending;
    if (needStopSync != ReqType::none)
        needStopSync = ReqType::pending;
    DistIface::master->sendCmd(header);
    // now wait until all receiver threads complete the synchronisation
    auto lf = [this]{ return waitNum == 0; };
    cv.wait(sync_lock, lf);
    // global synchronisation is done
    assert(!same_tick || (nextAt == curTick()));
}

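/*
 * Switch-side barrier: wait until a cmdSyncReq has arrived from every
 * node (the receiver threads decrement waitNum in progress()), then
 * broadcast a cmdSyncAck that carries the aggregated checkpoint, exit
 * and stop-sync decisions back to the nodes.
 */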
void
DistIface::SyncSwitch::run(bool same_tick)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    Header header;
    // Wait for the sync requests from the nodes
    if (waitNum > 0) {
        auto lf = [this]{ return waitNum == 0; };
        cv.wait(sync_lock, lf);
    }
    assert(waitNum == 0);
    assert(!same_tick || (nextAt == curTick()));
    waitNum = numNodes;
    // Complete the global synchronisation
    header.msgType = MsgType::cmdSyncAck;
    header.sendTick = nextAt;
    header.syncRepeat = nextRepeat;
    if (doCkpt || numCkptReq == numNodes) {
        doCkpt = true;
        header.needCkpt = ReqType::immediate;
        numCkptReq = 0;
    } else {
        header.needCkpt = ReqType::none;
    }
    if (doExit || numExitReq == numNodes) {
        doExit = true;
        header.needExit = ReqType::immediate;
    } else {
        header.needExit = ReqType::none;
    }
    if (doStopSync || numStopSyncReq == numNodes) {
        doStopSync = true;
        numStopSyncReq = 0;
        header.needStopSync = ReqType::immediate;
    } else {
        header.needStopSync = ReqType::none;
    }
    DistIface::master->sendCmd(header);
}

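/*
 * Called by the receiver thread of the switch for each incoming
 * cmdSyncReq. Requests are aggregated across the nodes: collective
 * requests are counted and granted only once every node has asked,
 * while immediate requests take effect in the very next ack.
 */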
void
DistIface::SyncSwitch::progress(Tick send_tick,
                                Tick sync_repeat,
                                ReqType need_ckpt,
                                ReqType need_exit,
                                ReqType need_stop_sync)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    assert(waitNum > 0);

    if (send_tick > nextAt)
        nextAt = send_tick;
    if (nextRepeat > sync_repeat)
        nextRepeat = sync_repeat;

    if (need_ckpt == ReqType::collective)
        numCkptReq++;
    else if (need_ckpt == ReqType::immediate)
        doCkpt = true;
    if (need_exit == ReqType::collective)
        numExitReq++;
    else if (need_exit == ReqType::immediate)
        doExit = true;
    if (need_stop_sync == ReqType::collective)
        numStopSyncReq++;
    else if (need_stop_sync == ReqType::immediate)
        doStopSync = true;

    waitNum--;
    // Notify the simulation thread if the ongoing sync is complete
    if (waitNum == 0) {
        sync_lock.unlock();
        cv.notify_one();
    }
}

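/*
 * Called by the receiver thread of a node when the cmdSyncAck arrives
 * from the switch. It installs the globally agreed next sync tick,
 * repeat interval and checkpoint/exit/stop-sync decisions, then wakes
 * up the simulation thread blocked in run().
 */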
void
DistIface::SyncNode::progress(Tick max_send_tick,
                              Tick next_repeat,
                              ReqType do_ckpt,
                              ReqType do_exit,
                              ReqType do_stop_sync)
{
    std::unique_lock<std::mutex> sync_lock(lock);
    assert(waitNum > 0);

    nextAt = max_send_tick;
    nextRepeat = next_repeat;
    doCkpt = (do_ckpt != ReqType::none);
    doExit = (do_exit != ReqType::none);
    doStopSync = (do_stop_sync != ReqType::none);

    waitNum--;
    // Notify the simulation thread if the ongoing sync is complete
    if (waitNum == 0) {
        sync_lock.unlock();
        cv.notify_one();
    }
}

void
DistIface::SyncNode::requestCkpt(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    assert(req != ReqType::none);
    if (needCkpt != ReqType::none)
        warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
    if (needCkpt == ReqType::none || req == ReqType::immediate)
        needCkpt = req;
}

void
DistIface::SyncNode::requestExit(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    assert(req != ReqType::none);
    if (needExit != ReqType::none)
        warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
    if (needExit == ReqType::none || req == ReqType::immediate)
        needExit = req;
}

void
DistIface::Sync::drainComplete()
{
    if (doCkpt) {
        // The first DistIface object calls this method right before writing
        // the checkpoint. We need to drain the underlying physical network
        // here. Note that other gem5 peers may enter this barrier at
        // different ticks due to draining.
        run(false);
        // Only the "first" DistIface object has to perform the sync
        doCkpt = false;
    }
}

void
DistIface::SyncNode::serialize(CheckpointOut &cp) const
{
    int need_exit = static_cast<int>(needExit);
    SERIALIZE_SCALAR(need_exit);
}

void
DistIface::SyncNode::unserialize(CheckpointIn &cp)
{
    int need_exit;
    UNSERIALIZE_SCALAR(need_exit);
    needExit = static_cast<ReqType>(need_exit);
}

void
DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(numExitReq);
}

void
DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
{
    UNSERIALIZE_SCALAR(numExitReq);
}

void
DistIface::SyncEvent::start()
{
    // Note that this may be called either from startup() or drainResume()

    // At this point, all DistIface objects have already called Sync::init()
    // so we have a local minimum of the start tick and repeat for the
    // periodic sync.
    repeat = DistIface::sync->nextRepeat;
    // Do a global barrier to agree on a common repeat value (the smallest
    // one from all participating nodes).
    DistIface::sync->run(false);

    assert(!DistIface::sync->doCkpt);
    assert(!DistIface::sync->doExit);
    assert(!DistIface::sync->doStopSync);
    assert(DistIface::sync->nextAt >= curTick());
    assert(DistIface::sync->nextRepeat <= repeat);

    if (curTick() == 0)
        assert(!scheduled());

    // Use the maximum of the current tick for all participating nodes or a
    // user provided starting tick.
    if (scheduled())
        reschedule(DistIface::sync->nextAt);
    else
        schedule(DistIface::sync->nextAt);

    inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
           DistIface::sync->nextRepeat);
}

void
DistIface::SyncEvent::process()
{
    // We may not start a global periodic sync while draining before taking a
    // checkpoint. This is because peer gem5 processes may not hit the same
    // periodic sync before they complete draining, and that would make this
    // periodic sync clash with the sync called from DistIface::serialize()
    // by the other gem5 processes.
    // We would need a 'distributed drain' solution to eliminate this
    // restriction.
    // Note that if draining was not triggered by checkpointing then we are
    // fine since no extra global sync will happen (i.e. all gem5 peers will
    // hit this periodic sync eventually).
    panic_if(_draining && DistIface::sync->doCkpt,
             "Distributed sync is hit while draining");
    /*
     * Note that this is a global event so this process method will be called
     * by exactly one thread.
     */
    /*
     * We hold the eventq lock at this point but the receiver thread may
     * need the lock to schedule new recv events while waiting for the
     * dist sync to complete.
     * Note that the other simulation threads also release their eventq
     * locks while waiting for us due to the global event semantics.
     */
    {
        EventQueue::ScopedRelease sr(curEventQueue());
        // we do a global sync here that is supposed to happen at the same
        // tick in all gem5 peers
        DistIface::sync->run(true);
        // global sync completed
    }
    if (DistIface::sync->doCkpt)
        exitSimLoop("checkpoint");
    if (DistIface::sync->doExit)
        exitSimLoop("exit request from gem5 peers");
    if (DistIface::sync->doStopSync) {
        DistIface::sync->doStopSync = false;
        inform("synchronisation disabled at %lu\n", curTick());

        // The switch node needs to wait for the next sync immediately.
        if (DistIface::isSwitch) {
            start();
        } else {
            // Wake up thread contexts on non-switch nodes.
            for (int i = 0; i < DistIface::master->sys->numContexts(); i++) {
                ThreadContext *tc =
                    DistIface::master->sys->getThreadContext(i);
                if (tc->status() == ThreadContext::Suspended)
                    tc->activate();
                else
                    warn_once("Tried to wake up thread in dist-gem5, but it "
                              "was already awake!\n");
            }
        }
        return;
    }
    // schedule the next periodic sync
    repeat = DistIface::sync->nextRepeat;
    schedule(curTick() + repeat);
}

void
DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
{
    // This is called from the receiver thread when it starts running. The new
    // receiver thread shares the event queue with the simulation thread
    // (associated with the simulated Ethernet link).
    curEventQueue(eventManager->eventQueue());

    recvDone = recv_done;
    linkDelay = link_delay;
}

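/*
 * The receive tick is computed as send_tick + send_delay + linkDelay.
 * As an illustration (the numbers below are made up, not from the
 * source): with send_tick=1000, send_delay=100 and linkDelay=50 the
 * packet is delivered at tick 1150, which must still be in the future
 * and must leave a full send_delay window after the previous receive.
 */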
Tick
DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
                                          Tick send_delay,
                                          Tick prev_recv_tick)
{
    Tick recv_tick = send_tick + send_delay + linkDelay;
    // sanity check (we need at least a send delay long window)
    assert(recv_tick >= prev_recv_tick + send_delay);
    panic_if(prev_recv_tick + send_delay > recv_tick,
             "Receive window is smaller than send delay");
    panic_if(recv_tick <= curTick(),
             "Simulators out of sync - missed packet receive by %llu ticks "
             "(prev_recv_tick: %lu send_tick: %lu send_delay: %lu "
             "linkDelay: %lu )",
             curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
             linkDelay);

    return recv_tick;
}

void
DistIface::RecvScheduler::resumeRecvTicks()
{
    // Schedule pending packets asap in case link speed/delay changed when
    // restoring from the checkpoint.
    // This could be done during unserialize except that curTick() is
    // unknown at that point, so we do it here, in drainResume().
    // If we are not restoring from a checkpoint then the link latency
    // cannot have changed, so we simply return.
    if (!ckptRestore)
        return;

    std::vector<Desc> v;
    while (!descQueue.empty()) {
        Desc d = descQueue.front();
        descQueue.pop();
        d.sendTick = curTick();
        d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
        v.push_back(d);
    }

    for (auto &d : v)
        descQueue.push(d);

    if (recvDone->scheduled()) {
        assert(!descQueue.empty());
        eventManager->reschedule(recvDone, curTick());
    } else {
        assert(descQueue.empty() && v.empty());
    }
    ckptRestore = false;
}

void
DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
                                     Tick send_tick,
                                     Tick send_delay)
{
    // Note: this is called from the receiver thread
    curEventQueue()->lock();
    Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);

    DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
            "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
            send_tick, send_delay, linkDelay, recv_tick);
    // Every packet must be sent and arrive in the same quantum
    assert(send_tick > master->syncEvent->when() -
           master->syncEvent->repeat);
    // No packet may be scheduled for receive in the arrival quantum
    assert(send_tick + send_delay + linkDelay > master->syncEvent->when());

    // Now we are about to schedule a recvDone event for the new data packet.
    // We use the same recvDone object for all incoming data packets. Packet
    // descriptors are saved in the ordered queue. The currently scheduled
    // packet is always on the top of the queue.
    // NOTE: we use the event queue lock to protect the receive desc queue,
    // too, which is accessed both by the receiver thread and the simulation
    // thread.
    descQueue.emplace(new_packet, send_tick, send_delay);
    if (descQueue.size() == 1) {
        assert(!recvDone->scheduled());
        eventManager->schedule(recvDone, recv_tick);
    } else {
        assert(recvDone->scheduled());
        panic_if(descQueue.front().sendTick + descQueue.front().sendDelay >
                 recv_tick,
                 "Out of order packet received (recv_tick: %lu top(): %lu)\n",
                 recv_tick,
                 descQueue.front().sendTick + descQueue.front().sendDelay);
    }
    curEventQueue()->unlock();
}

EthPacketPtr
DistIface::RecvScheduler::popPacket()
{
    // Note: this is called from the simulation thread when a receive done
    // event is being processed for the link. We assume that the thread
    // holds the event queue lock when this is called!
    EthPacketPtr next_packet = descQueue.front().packet;
    descQueue.pop();

    if (descQueue.size() > 0) {
        Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
                                         descQueue.front().sendDelay,
                                         curTick());
        eventManager->schedule(recvDone, recv_tick);
    }
    prevRecvTick = curTick();
    return next_packet;
}

525DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
526{
527        SERIALIZE_SCALAR(sendTick);
528        SERIALIZE_SCALAR(sendDelay);
529        packet->serialize("rxPacket", cp);
530}
531
532void
533DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
534{
535        UNSERIALIZE_SCALAR(sendTick);
536        UNSERIALIZE_SCALAR(sendDelay);
537        packet = std::make_shared<EthPacketData>();
538        packet->unserialize("rxPacket", cp);
539}
540
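/*
 * std::queue provides no iterators, so serialization below works on a
 * copy of the descriptor queue and drains that copy front to back,
 * writing one section per descriptor.
 */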
void
DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
{
    SERIALIZE_SCALAR(prevRecvTick);
    // serialize the receive desc queue
    std::queue<Desc> tmp_queue(descQueue);
    unsigned n_desc_queue = descQueue.size();
    assert(tmp_queue.size() == descQueue.size());
    SERIALIZE_SCALAR(n_desc_queue);
    for (int i = 0; i < n_desc_queue; i++) {
        tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
        tmp_queue.pop();
    }
    assert(tmp_queue.empty());
}

void
DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
{
    assert(descQueue.size() == 0);
    assert(!recvDone->scheduled());
    assert(!ckptRestore);

    UNSERIALIZE_SCALAR(prevRecvTick);
    // unserialize the receive desc queue
    unsigned n_desc_queue;
    UNSERIALIZE_SCALAR(n_desc_queue);
    for (int i = 0; i < n_desc_queue; i++) {
        Desc recv_desc;
        recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
        descQueue.push(recv_desc);
    }
    ckptRestore = true;
}

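/*
 * The first DistIface constructed in a gem5 process becomes the
 * "master": it creates and owns the singleton Sync and SyncEvent
 * objects that all interfaces in this process share.
 */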
DistIface::DistIface(unsigned dist_rank,
                     unsigned dist_size,
                     Tick sync_start,
                     Tick sync_repeat,
                     EventManager *em,
                     bool use_pseudo_op,
                     bool is_switch, int num_nodes) :
    syncStart(sync_start), syncRepeat(sync_repeat),
    recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
    rank(dist_rank), size(dist_size)
{
    DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n", dist_rank);
    isMaster = false;
    if (master == nullptr) {
        assert(sync == nullptr);
        assert(syncEvent == nullptr);
        isSwitch = is_switch;
        if (is_switch)
            sync = new SyncSwitch(num_nodes);
        else
            sync = new SyncNode();
        syncEvent = new SyncEvent();
        master = this;
        isMaster = true;
    }
    distIfaceId = distIfaceNum;
    distIfaceNum++;
}

DistIface::~DistIface()
{
    assert(recvThread);
    delete recvThread;
    if (this == master) {
        assert(syncEvent);
        delete syncEvent;
        assert(sync);
        delete sync;
        master = nullptr;
    }
}

void
DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
{
    Header header;

    // Prepare a dist header packet for the Ethernet packet we want to
    // send out.
    header.msgType = MsgType::dataDescriptor;
    header.sendTick = curTick();
    header.sendDelay = send_delay;

    header.dataPacketLength = pkt->length;
    header.simLength = pkt->simLength;

    // Send out the packet and the meta info.
    sendPacket(header, pkt);

    DPRINTF(DistEthernetPkt,
            "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
            pkt->length, send_delay);
}

void
DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
{
    EthPacketPtr new_packet;
    DistHeaderPkt::Header header;

    // Initialize receive scheduler parameters
    recvScheduler.init(recv_done, link_delay);

    // Main loop to wait for and process any incoming message.
    for (;;) {
        // recvHeader() blocks until the next dist header packet comes in.
        if (!recvHeader(header)) {
            // We lost connection to the peer gem5 processes, most likely
            // because one of them called m5 exit. So we stop here.
            // Grab the eventq lock to stop the simulation thread
            curEventQueue()->lock();
            exitSimLoop("Message server closed connection, simulator "
                        "is exiting");
            curEventQueue()->unlock();
            break;
        }

        // We got a valid dist header packet, let's process it
        if (header.msgType == MsgType::dataDescriptor) {
            recvPacket(header, new_packet);
            recvScheduler.pushPacket(new_packet,
                                     header.sendTick,
                                     header.sendDelay);
        } else {
            // everything else must be a synchronisation-related command
            sync->progress(header.sendTick,
                           header.syncRepeat,
                           header.needCkpt,
                           header.needExit,
                           header.needStopSync);
        }
    }
}

void
DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
{
    assert(recvThread == nullptr);

    recvThread = new std::thread(&DistIface::recvThreadFunc,
                                 this,
                                 const_cast<Event *>(recv_done),
                                 link_delay);
    recvThreadsNum++;
}

DrainState
DistIface::drain()
{
    DPRINTF(DistEthernet, "DistIface::drain() called\n");
    // This can be called multiple times in the same drain cycle.
    if (this == master)
        syncEvent->draining(true);
    return DrainState::Drained;
}

void
DistIface::drainResume()
{
    DPRINTF(DistEthernet, "DistIface::drainResume() called\n");
    if (this == master)
        syncEvent->draining(false);
    recvScheduler.resumeRecvTicks();
}

void
DistIface::serialize(CheckpointOut &cp) const
{
    // Drain the dist interface before the checkpoint is taken. We cannot call
    // this as part of the normal drain cycle because this dist sync has to be
    // called exactly once after the system is fully drained.
    sync->drainComplete();

    unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;

    SERIALIZE_SCALAR(rank_orig);
    SERIALIZE_SCALAR(dist_iface_id_orig);

    recvScheduler.serializeSection(cp, "recvScheduler");
    if (this == master) {
        sync->serializeSection(cp, "Sync");
    }
}

void
DistIface::unserialize(CheckpointIn &cp)
{
    unsigned rank_orig, dist_iface_id_orig;
    UNSERIALIZE_SCALAR(rank_orig);
    UNSERIALIZE_SCALAR(dist_iface_id_orig);

    panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
             rank, rank_orig);
    panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
             "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
             dist_iface_id_orig);

    recvScheduler.unserializeSection(cp, "recvScheduler");
    if (this == master) {
        sync->unserializeSection(cp, "Sync");
    }
}

void
DistIface::init(const Event *done_event, Tick link_delay)
{
    // Init hook for the underlying message transport to setup/finalize
    // communication channels
    initTransport();

    // Spawn a new receiver thread that will process messages
    // coming in from peer gem5 processes.
    // The receive thread will also schedule a (receive) doneEvent
    // for each incoming data packet.
    spawnRecvThread(done_event, link_delay);


    // Adjust the periodic sync start and interval. Different DistIface
    // objects might have different requirements. The singleton sync object
    // will select the minimum values for both params.
    assert(sync != nullptr);
    sync->init(syncStart, syncRepeat);

    // Initialize the seed for the random generator to avoid the same
    // sequence in all gem5 peer processes
    assert(master != nullptr);
    if (this == master)
        random_mt.init(5489 * (rank + 1) + 257);
}

void
DistIface::startup()
{
    DPRINTF(DistEthernet, "DistIface::startup() started\n");
    // Schedule the periodic sync here unless the start of the sync is
    // deferred to a pseudo-op; the switch node always starts syncing at
    // startup.
    if (this == master && (!syncStartOnPseudoOp || isSwitch))
        syncEvent->start();
    DPRINTF(DistEthernet, "DistIface::startup() done\n");
}

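/*
 * Hook for the "m5 checkpoint" pseudo-op. In a dist-gem5 run we return
 * false so that the local simulator does not write a checkpoint on its
 * own; instead, a coordinated checkpoint request is propagated through
 * the sync object and taken at the next global sync point.
 */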
bool
DistIface::readyToCkpt(Tick delay, Tick period)
{
    bool ret = true;
    DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
            "period:%lu\n", delay, period);
    if (master) {
        if (delay == 0) {
            inform("m5 checkpoint called with zero delay => triggering "
                   "collaborative checkpoint\n");
            sync->requestCkpt(ReqType::collective);
        } else {
            inform("m5 checkpoint called with non-zero delay => triggering "
                   "immediate checkpoint (at the next sync)\n");
            sync->requestCkpt(ReqType::immediate);
        }
        if (period != 0)
            inform("Non-zero period for m5_ckpt is ignored in "
                   "distributed gem5 runs\n");
        ret = false;
    }
    return ret;
}

void
DistIface::SyncNode::requestStopSync(ReqType req)
{
    std::lock_guard<std::mutex> sync_lock(lock);
    needStopSync = req;
}

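/*
 * Toggle the periodic sync on or off at runtime (presumably driven by
 * a dist-gem5 pseudo-op; the caller passes in the current thread
 * context). Turning the sync off is itself a collective decision: the
 * stop request travels through the switch and takes effect in
 * SyncEvent::process() once every node has agreed, which is also where
 * the quiesced thread contexts get reactivated.
 */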
void
DistIface::toggleSync(ThreadContext *tc)
{
    // Unfortunately, we have to populate the system pointer member this way.
    master->sys = tc->getSystemPtr();

    // The invariant for both syncing and "unsyncing" is that all threads
    // will stop executing instructions until the desired sync state has been
    // reached for all nodes.  This is the easiest way to prevent deadlock
    // (in the case of "unsyncing") and causality errors (in the case of
    // syncing).
    if (master->syncEvent->scheduled()) {
        inform("Request toggling synchronisation off\n");
        master->sync->requestStopSync(ReqType::collective);

        // At this point, we have no clue when everyone will reach the sync
        // stop point.  Suspend execution of all local thread contexts.
        // Dist-gem5 will reactivate all thread contexts when everyone has
        // reached the sync stop point.
        for (int i = 0; i < master->sys->numContexts(); i++) {
            ThreadContext *tc = master->sys->getThreadContext(i);
            if (tc->status() == ThreadContext::Active)
                tc->quiesce();
        }
    } else {
        inform("Request toggling synchronisation on\n");
        master->syncEvent->start();

        // We need to suspend all CPUs until the sync point is reached by all
        // nodes to prevent causality errors.  We can also schedule CPU
        // activation here, since we know exactly when the next sync will
        // occur.
        for (int i = 0; i < master->sys->numContexts(); i++) {
            ThreadContext *tc = master->sys->getThreadContext(i);
            if (tc->status() == ThreadContext::Active)
                tc->quiesceTick(master->syncEvent->when() + 1);
        }
    }
}

bool
DistIface::readyToExit(Tick delay)
{
    bool ret = true;
    DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
            delay);
    if (master) {
        // To successfully coordinate an exit, all nodes must be synchronising
        if (!master->syncEvent->scheduled())
            master->syncEvent->start();

        if (delay == 0) {
            inform("m5 exit called with zero delay => triggering "
                   "collaborative exit\n");
            sync->requestExit(ReqType::collective);
        } else {
            inform("m5 exit called with non-zero delay => triggering "
                   "immediate exit (at the next sync)\n");
            sync->requestExit(ReqType::immediate);
        }
        ret = false;
    }
    return ret;
}

uint64_t
DistIface::rankParam()
{
    uint64_t val;
    if (master) {
        val = master->rank;
    } else {
        warn("Dist-rank parameter is queried in a single gem5 simulation.");
        val = 0;
    }
    return val;
}

uint64_t
DistIface::sizeParam()
{
    uint64_t val;
    if (master) {
        val = master->size;
    } else {
        warn("Dist-size parameter is queried in a single gem5 simulation.");
        val = 1;
    }
    return val;
}
905