1/*
2 * Copyright (c) 2015-2016 ARM Limited
3 * All rights reserved
4 *
5 * The license below extends only to copyright in the software and shall
6 * not be construed as granting a license to any other intellectual
7 * property including but not limited to intellectual property relating
8 * to a hardware implementation of the functionality of the software
9 * licensed hereunder.  You may use the software subject to the license
10 * terms below provided that you ensure that this notice is replicated
11 * unmodified and in its entirety in all distributions of the software,
12 * modified or unmodified, in source code or in binary form.
13 *
14 * Redistribution and use in source and binary forms, with or without
15 * modification, are permitted provided that the following conditions are
16 * met: redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer;
18 * redistributions in binary form must reproduce the above copyright
19 * notice, this list of conditions and the following disclaimer in the
20 * documentation and/or other materials provided with the distribution;
21 * neither the name of the copyright holders nor the names of its
22 * contributors may be used to endorse or promote products derived from
23 * this software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 * Authors: Gabor Dozsa
38 */
39
40/* @file
41 * The interface class for dist-gem5 simulations.
42 */
43
44#include "dev/net/dist_iface.hh"
45
46#include <queue>
47#include <thread>
48
49#include "base/random.hh"
50#include "base/trace.hh"
51#include "cpu/thread_context.hh"
52#include "debug/DistEthernet.hh"
53#include "debug/DistEthernetPkt.hh"
54#include "dev/net/etherpkt.hh"
55#include "sim/sim_exit.hh"
56#include "sim/sim_object.hh"
57#include "sim/system.hh"
58
59using namespace std;
60DistIface::Sync *DistIface::sync = nullptr;
61System *DistIface::sys = nullptr;
62DistIface::SyncEvent *DistIface::syncEvent = nullptr;
63unsigned DistIface::distIfaceNum = 0;
64unsigned DistIface::recvThreadsNum = 0;
65DistIface *DistIface::master = nullptr;
66bool DistIface::isSwitch = false;
67
68void
69DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
70{
71    if (start_tick < nextAt) {
72        nextAt = start_tick;
73        inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
74    }
75
76    if (repeat_tick == 0)
77        panic("Dist synchronisation interval must be greater than zero");
78
79    if (repeat_tick < nextRepeat) {
80        nextRepeat = repeat_tick;
81        inform("Dist synchronisation interval is changed to %lu.\n",
82               nextRepeat);
83    }
84}
85
86void
87DistIface::Sync::abort()
88{
89    std::unique_lock<std::mutex> sync_lock(lock);
90    waitNum = 0;
91    isAbort = true;
92    sync_lock.unlock();
93    cv.notify_one();
94}
95
96DistIface::SyncSwitch::SyncSwitch(int num_nodes)
97{
98    numNodes = num_nodes;
99    waitNum = num_nodes;
100    numExitReq = 0;
101    numCkptReq = 0;
102    numStopSyncReq = 0;
103    doExit = false;
104    doCkpt = false;
105    doStopSync = false;
106    nextAt = std::numeric_limits<Tick>::max();
107    nextRepeat = std::numeric_limits<Tick>::max();
108    isAbort = false;
109}
110
111DistIface::SyncNode::SyncNode()
112{
113    waitNum = 0;
114    needExit = ReqType::none;
115    needCkpt = ReqType::none;
116    needStopSync = ReqType::none;
117    doExit = false;
118    doCkpt = false;
119    doStopSync = false;
120    nextAt = std::numeric_limits<Tick>::max();
121    nextRepeat = std::numeric_limits<Tick>::max();
122    isAbort = false;
123}
124
125bool
126DistIface::SyncNode::run(bool same_tick)
127{
128    std::unique_lock<std::mutex> sync_lock(lock);
129    Header header;
130
131    assert(waitNum == 0);
132    assert(!isAbort);
133    waitNum = DistIface::recvThreadsNum;
134    // initiate the global synchronisation
135    header.msgType = MsgType::cmdSyncReq;
136    header.sendTick = curTick();
137    header.syncRepeat = nextRepeat;
138    header.needCkpt = needCkpt;
139    header.needStopSync = needStopSync;
140    if (needCkpt != ReqType::none)
141        needCkpt = ReqType::pending;
142    header.needExit = needExit;
143    if (needExit != ReqType::none)
144        needExit = ReqType::pending;
145    if (needStopSync != ReqType::none)
146        needStopSync = ReqType::pending;
147    DistIface::master->sendCmd(header);
148    // now wait until all receiver threads complete the synchronisation
149    auto lf = [this]{ return waitNum == 0; };
150    cv.wait(sync_lock, lf);
151    // global synchronisation is done.
152    assert(isAbort || !same_tick || (nextAt == curTick()));
153    return !isAbort;
154}
155
156
157bool
158DistIface::SyncSwitch::run(bool same_tick)
159{
160    std::unique_lock<std::mutex> sync_lock(lock);
161    Header header;
162    // Wait for the sync requests from the nodes
163    if (waitNum > 0) {
164        auto lf = [this]{ return waitNum == 0; };
165        cv.wait(sync_lock, lf);
166    }
167    assert(waitNum == 0);
168    if (isAbort) // sync aborted
169        return false;
170    assert(!same_tick || (nextAt == curTick()));
171    waitNum = numNodes;
172    // Complete the global synchronisation
173    header.msgType = MsgType::cmdSyncAck;
174    header.sendTick = nextAt;
175    header.syncRepeat = nextRepeat;
176    if (doCkpt || numCkptReq == numNodes) {
177        doCkpt = true;
178        header.needCkpt = ReqType::immediate;
179        numCkptReq = 0;
180    } else {
181        header.needCkpt = ReqType::none;
182    }
183    if (doExit || numExitReq == numNodes) {
184        doExit = true;
185        header.needExit = ReqType::immediate;
186    } else {
187        header.needExit = ReqType::none;
188    }
189    if (doStopSync || numStopSyncReq == numNodes) {
190        doStopSync = true;
191        numStopSyncReq = 0;
192        header.needStopSync = ReqType::immediate;
193    } else {
194        header.needStopSync = ReqType::none;
195    }
196    DistIface::master->sendCmd(header);
197    return true;
198}
199
200bool
201DistIface::SyncSwitch::progress(Tick send_tick,
202                                 Tick sync_repeat,
203                                 ReqType need_ckpt,
204                                 ReqType need_exit,
205                                 ReqType need_stop_sync)
206{
207    std::unique_lock<std::mutex> sync_lock(lock);
208    if (isAbort) // sync aborted
209        return false;
210    assert(waitNum > 0);
211
212    if (send_tick > nextAt)
213        nextAt = send_tick;
214    if (nextRepeat > sync_repeat)
215        nextRepeat = sync_repeat;
216
217    if (need_ckpt == ReqType::collective)
218        numCkptReq++;
219    else if (need_ckpt == ReqType::immediate)
220        doCkpt = true;
221    if (need_exit == ReqType::collective)
222        numExitReq++;
223    else if (need_exit == ReqType::immediate)
224        doExit = true;
225    if (need_stop_sync == ReqType::collective)
226        numStopSyncReq++;
227    else if (need_stop_sync == ReqType::immediate)
228        doStopSync = true;
229
230    waitNum--;
231    // Notify the simulation thread if the on-going sync is complete
232    if (waitNum == 0) {
233        sync_lock.unlock();
234        cv.notify_one();
235    }
236    // The receive thread must keep alive in the switch until the node
237    // closes the connection. Thus, we always return true here.
238    return true;
239}
240
241bool
242DistIface::SyncNode::progress(Tick max_send_tick,
243                               Tick next_repeat,
244                               ReqType do_ckpt,
245                               ReqType do_exit,
246                               ReqType do_stop_sync)
247{
248    std::unique_lock<std::mutex> sync_lock(lock);
249    if (isAbort) // sync aborted
250        return false;
251    assert(waitNum > 0);
252
253    nextAt = max_send_tick;
254    nextRepeat = next_repeat;
255    doCkpt = (do_ckpt != ReqType::none);
256    doExit = (do_exit != ReqType::none);
257    doStopSync = (do_stop_sync != ReqType::none);
258
259    waitNum--;
260    // Notify the simulation thread if the on-going sync is complete
261    if (waitNum == 0) {
262        sync_lock.unlock();
263        cv.notify_one();
264    }
265    // The receive thread must finish when simulation is about to exit
266    return !doExit;
267}
268
269void
270DistIface::SyncNode::requestCkpt(ReqType req)
271{
272   std::lock_guard<std::mutex> sync_lock(lock);
273   assert(req != ReqType::none);
274   if (needCkpt != ReqType::none)
275       warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
276   if (needCkpt == ReqType::none || req == ReqType::immediate)
277       needCkpt = req;
278}
279
280void
281DistIface::SyncNode::requestExit(ReqType req)
282{
283   std::lock_guard<std::mutex> sync_lock(lock);
284   assert(req != ReqType::none);
285   if (needExit != ReqType::none)
286       warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
287   if (needExit == ReqType::none || req == ReqType::immediate)
288       needExit = req;
289}
290
291void
292DistIface::Sync::drainComplete()
293{
294    if (doCkpt) {
295        // The first DistIface object called this right before writing the
296        // checkpoint. We need to drain the underlying physical network here.
297        // Note that other gem5 peers may enter this barrier at different
298        // ticks due to draining.
299        run(false);
300        // Only the "first" DistIface object has to perform the sync
301        doCkpt = false;
302    }
303}
304
305void
306DistIface::SyncNode::serialize(CheckpointOut &cp) const
307{
308    int need_exit = static_cast<int>(needExit);
309    SERIALIZE_SCALAR(need_exit);
310}
311
312void
313DistIface::SyncNode::unserialize(CheckpointIn &cp)
314{
315    int need_exit;
316    UNSERIALIZE_SCALAR(need_exit);
317    needExit = static_cast<ReqType>(need_exit);
318}
319
320void
321DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
322{
323    SERIALIZE_SCALAR(numExitReq);
324}
325
326void
327DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
328{
329    UNSERIALIZE_SCALAR(numExitReq);
330}
331
332void
333DistIface::SyncEvent::start()
334{
335    // Note that this may be called either from startup() or drainResume()
336
337    // At this point, all DistIface objects has already called Sync::init() so
338    // we have a local minimum of the start tick and repeat for the periodic
339    // sync.
340    repeat = DistIface::sync->nextRepeat;
341    // Do a global barrier to agree on a common repeat value (the smallest
342    // one from all participating nodes.
343    if (!DistIface::sync->run(false))
344        panic("DistIface::SyncEvent::start() aborted\n");
345
346    assert(!DistIface::sync->doCkpt);
347    assert(!DistIface::sync->doExit);
348    assert(!DistIface::sync->doStopSync);
349    assert(DistIface::sync->nextAt >= curTick());
350    assert(DistIface::sync->nextRepeat <= repeat);
351
352    if (curTick() == 0)
353        assert(!scheduled());
354
355    // Use the maximum of the current tick for all participating nodes or a
356    // user provided starting tick.
357    if (scheduled())
358        reschedule(DistIface::sync->nextAt);
359    else
360        schedule(DistIface::sync->nextAt);
361
362    inform("Dist sync scheduled at %lu and repeats %lu\n",  when(),
363           DistIface::sync->nextRepeat);
364}
365
366void
367DistIface::SyncEvent::process()
368{
369    // We may not start a global periodic sync while draining before taking a
370    // checkpoint.  This is due to the possibility that peer gem5 processes
371    // may not hit the same periodic sync before they complete draining and
372    // that would make this periodic sync clash with sync called from
373    // DistIface::serialize() by other gem5 processes.
374    // We would need a 'distributed drain' solution to eliminate this
375    // restriction.
376    // Note that if draining was not triggered by checkpointing then we are
377    // fine since no extra global sync will happen (i.e. all peer gem5 will
378    // hit this periodic sync eventually).
379    panic_if(_draining && DistIface::sync->doCkpt,
380             "Distributed sync is hit while draining");
381    /*
382     * Note that this is a global event so this process method will be called
383     * by only exactly one thread.
384     */
385    /*
386     * We hold the eventq lock at this point but the receiver thread may
387     * need the lock to schedule new recv events while waiting for the
388     * dist sync to complete.
389     * Note that the other simulation threads also release their eventq
390     * locks while waiting for us due to the global event semantics.
391     */
392    {
393        EventQueue::ScopedRelease sr(curEventQueue());
394        // we do a global sync here that is supposed to happen at the same
395        // tick in all gem5 peers
396        if (!DistIface::sync->run(true))
397            return; // global sync aborted
398        // global sync completed
399    }
400    if (DistIface::sync->doCkpt)
401        exitSimLoop("checkpoint");
402    if (DistIface::sync->doExit) {
403        exitSimLoop("exit request from gem5 peers");
404        return;
405    }
406    if (DistIface::sync->doStopSync) {
407        DistIface::sync->doStopSync = false;
408        inform("synchronization disabled at %lu\n", curTick());
409
410        // The switch node needs to wait for the next sync immediately.
411        if (DistIface::isSwitch) {
412            start();
413        } else {
414            // Wake up thread contexts on non-switch nodes.
415            for (int i = 0; i < DistIface::master->sys->numContexts(); i++) {
416                ThreadContext *tc =
417                    DistIface::master->sys->getThreadContext(i);
418                if (tc->status() == ThreadContext::Suspended)
419                    tc->activate();
420                else
421                    warn_once("Tried to wake up thread in dist-gem5, but it "
422                              "was already awake!\n");
423            }
424        }
425        return;
426    }
427    // schedule the next periodic sync
428    repeat = DistIface::sync->nextRepeat;
429    schedule(curTick() + repeat);
430}
431
432void
433DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
434{
435    // This is called from the receiver thread when it starts running. The new
436    // receiver thread shares the event queue with the simulation thread
437    // (associated with the simulated Ethernet link).
438    curEventQueue(eventManager->eventQueue());
439
440    recvDone = recv_done;
441    linkDelay = link_delay;
442}
443
444Tick
445DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
446                                          Tick send_delay,
447                                          Tick prev_recv_tick)
448{
449    Tick recv_tick = send_tick + send_delay + linkDelay;
450    // sanity check (we need atleast a send delay long window)
451    assert(recv_tick >= prev_recv_tick + send_delay);
452    panic_if(prev_recv_tick + send_delay > recv_tick,
453             "Receive window is smaller than send delay");
454    panic_if(recv_tick <= curTick(),
455             "Simulators out of sync - missed packet receive by %llu ticks"
456             "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
457             "linkDelay: %lu )",
458             curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
459             linkDelay);
460
461    return recv_tick;
462}
463
464void
465DistIface::RecvScheduler::resumeRecvTicks()
466{
467    // Schedule pending packets asap in case link speed/delay changed when
468    // restoring from the checkpoint.
469    // This may be done during unserialize except that curTick() is unknown
470    // so we call this during drainResume().
471    // If we are not restoring from a checkppint then link latency could not
472    // change so we just return.
473    if (!ckptRestore)
474        return;
475
476    std::vector<Desc> v;
477    while (!descQueue.empty()) {
478        Desc d = descQueue.front();
479        descQueue.pop();
480        d.sendTick = curTick();
481        d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
482        v.push_back(d);
483    }
484
485    for (auto &d : v)
486        descQueue.push(d);
487
488    if (recvDone->scheduled()) {
489        assert(!descQueue.empty());
490        eventManager->reschedule(recvDone, curTick());
491    } else {
492        assert(descQueue.empty() && v.empty());
493    }
494    ckptRestore = false;
495}
496
497void
498DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
499                                     Tick send_tick,
500                                     Tick send_delay)
501{
502    // Note : this is called from the receiver thread
503    curEventQueue()->lock();
504    Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
505
506    DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
507            "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
508            send_tick, send_delay, linkDelay, recv_tick);
509    // Every packet must be sent and arrive in the same quantum
510    assert(send_tick > master->syncEvent->when() -
511           master->syncEvent->repeat);
512    // No packet may be scheduled for receive in the arrival quantum
513    assert(send_tick + send_delay + linkDelay > master->syncEvent->when());
514
515    // Now we are about to schedule a recvDone event for the new data packet.
516    // We use the same recvDone object for all incoming data packets. Packet
517    // descriptors are saved in the ordered queue. The currently scheduled
518    // packet is always on the top of the queue.
519    // NOTE:  we use the event queue lock to protect the receive desc queue,
520    // too, which is accessed both by the receiver thread and the simulation
521    // thread.
522    descQueue.emplace(new_packet, send_tick, send_delay);
523    if (descQueue.size() == 1) {
524        assert(!recvDone->scheduled());
525        eventManager->schedule(recvDone, recv_tick);
526    } else {
527        assert(recvDone->scheduled());
528        panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
529                 "Out of order packet received (recv_tick: %lu top(): %lu\n",
530                 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
531    }
532    curEventQueue()->unlock();
533}
534
535EthPacketPtr
536DistIface::RecvScheduler::popPacket()
537{
538    // Note : this is called from the simulation thread when a receive done
539    // event is being processed for the link. We assume that the thread holds
540    // the event queue queue lock when this is called!
541    EthPacketPtr next_packet = descQueue.front().packet;
542    descQueue.pop();
543
544    if (descQueue.size() > 0) {
545        Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
546                                         descQueue.front().sendDelay,
547                                         curTick());
548        eventManager->schedule(recvDone, recv_tick);
549    }
550    prevRecvTick = curTick();
551    return next_packet;
552}
553
554void
555DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
556{
557        SERIALIZE_SCALAR(sendTick);
558        SERIALIZE_SCALAR(sendDelay);
559        packet->serialize("rxPacket", cp);
560}
561
562void
563DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
564{
565        UNSERIALIZE_SCALAR(sendTick);
566        UNSERIALIZE_SCALAR(sendDelay);
567        packet = std::make_shared<EthPacketData>();
568        packet->unserialize("rxPacket", cp);
569}
570
571void
572DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
573{
574    SERIALIZE_SCALAR(prevRecvTick);
575    // serialize the receive desc queue
576    std::queue<Desc> tmp_queue(descQueue);
577    unsigned n_desc_queue = descQueue.size();
578    assert(tmp_queue.size() == descQueue.size());
579    SERIALIZE_SCALAR(n_desc_queue);
580    for (int i = 0; i < n_desc_queue; i++) {
581        tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
582        tmp_queue.pop();
583    }
584    assert(tmp_queue.empty());
585}
586
587void
588DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
589{
590    assert(descQueue.size() == 0);
591    assert(!recvDone->scheduled());
592    assert(!ckptRestore);
593
594    UNSERIALIZE_SCALAR(prevRecvTick);
595    // unserialize the receive desc queue
596    unsigned n_desc_queue;
597    UNSERIALIZE_SCALAR(n_desc_queue);
598    for (int i = 0; i < n_desc_queue; i++) {
599        Desc recv_desc;
600        recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
601        descQueue.push(recv_desc);
602    }
603    ckptRestore = true;
604}
605
606DistIface::DistIface(unsigned dist_rank,
607                     unsigned dist_size,
608                     Tick sync_start,
609                     Tick sync_repeat,
610                     EventManager *em,
611                     bool use_pseudo_op,
612                     bool is_switch, int num_nodes) :
613    syncStart(sync_start), syncRepeat(sync_repeat),
614    recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
615    rank(dist_rank), size(dist_size)
616{
617    DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
618    isMaster = false;
619    if (master == nullptr) {
620        assert(sync == nullptr);
621        assert(syncEvent == nullptr);
622        isSwitch = is_switch;
623        if (is_switch)
624            sync = new SyncSwitch(num_nodes);
625        else
626            sync = new SyncNode();
627        syncEvent = new SyncEvent();
628        master = this;
629        isMaster = true;
630    }
631    distIfaceId = distIfaceNum;
632    distIfaceNum++;
633}
634
635DistIface::~DistIface()
636{
637    assert(recvThread);
638    recvThread->join();
639    delete recvThread;
640    if (distIfaceNum-- == 0) {
641        assert(syncEvent);
642        delete syncEvent;
643        assert(sync);
644        delete sync;
645    }
646    if (this == master)
647        master = nullptr;
648}
649
650void
651DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
652{
653    Header header;
654
655    // Prepare a dist header packet for the Ethernet packet we want to
656    // send out.
657    header.msgType = MsgType::dataDescriptor;
658    header.sendTick  = curTick();
659    header.sendDelay = send_delay;
660
661    header.dataPacketLength = pkt->length;
662    header.simLength = pkt->simLength;
663
664    // Send out the packet and the meta info.
665    sendPacket(header, pkt);
666
667    DPRINTF(DistEthernetPkt,
668            "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
669            pkt->length, send_delay);
670}
671
672void
673DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
674{
675    EthPacketPtr new_packet;
676    DistHeaderPkt::Header header;
677
678    // Initialize receive scheduler parameters
679    recvScheduler.init(recv_done, link_delay);
680
681    // Main loop to wait for and process any incoming message.
682    for (;;) {
683        // recvHeader() blocks until the next dist header packet comes in.
684        if (!recvHeader(header)) {
685            // We lost connection to the peer gem5 processes most likely
686            // because one of them called m5 exit. So we stop here.
687            // Grab the eventq lock to stop the simulation thread
688            curEventQueue()->lock();
689            exitSimLoop("connection to gem5 peer got closed");
690            curEventQueue()->unlock();
691            // The simulation thread may be blocked in processing an on-going
692            // global synchronisation. Abort the sync to give the simulation
693            // thread a chance to make progress and process the exit event.
694            sync->abort();
695            // Finish receiver thread
696            break;
697        }
698
699        // We got a valid dist header packet, let's process it
700        if (header.msgType == MsgType::dataDescriptor) {
701            recvPacket(header, new_packet);
702            recvScheduler.pushPacket(new_packet,
703                                     header.sendTick,
704                                     header.sendDelay);
705        } else {
706            // everything else must be synchronisation related command
707            if (!sync->progress(header.sendTick,
708                                header.syncRepeat,
709                                header.needCkpt,
710                                header.needExit,
711                                header.needStopSync))
712                // Finish receiver thread if simulation is about to exit
713                break;
714        }
715    }
716}
717
718void
719DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
720{
721    assert(recvThread == nullptr);
722
723    recvThread = new std::thread(&DistIface::recvThreadFunc,
724                                 this,
725                                 const_cast<Event *>(recv_done),
726                                 link_delay);
727    recvThreadsNum++;
728}
729
730DrainState
731DistIface::drain()
732{
733    DPRINTF(DistEthernet,"DistIFace::drain() called\n");
734    // This can be called multiple times in the same drain cycle.
735    if (this == master)
736        syncEvent->draining(true);
737    return DrainState::Drained;
738}
739
740void
741DistIface::drainResume() {
742    DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
743    if (this == master)
744        syncEvent->draining(false);
745    recvScheduler.resumeRecvTicks();
746}
747
748void
749DistIface::serialize(CheckpointOut &cp) const
750{
751    // Drain the dist interface before the checkpoint is taken. We cannot call
752    // this as part of the normal drain cycle because this dist sync has to be
753    // called exactly once after the system is fully drained.
754    sync->drainComplete();
755
756    unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
757
758    SERIALIZE_SCALAR(rank_orig);
759    SERIALIZE_SCALAR(dist_iface_id_orig);
760
761    recvScheduler.serializeSection(cp, "recvScheduler");
762    if (this == master) {
763        sync->serializeSection(cp, "Sync");
764    }
765}
766
767void
768DistIface::unserialize(CheckpointIn &cp)
769{
770    unsigned rank_orig, dist_iface_id_orig;
771    UNSERIALIZE_SCALAR(rank_orig);
772    UNSERIALIZE_SCALAR(dist_iface_id_orig);
773
774    panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
775             rank, rank_orig);
776    panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
777             "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
778             dist_iface_id_orig);
779
780    recvScheduler.unserializeSection(cp, "recvScheduler");
781    if (this == master) {
782        sync->unserializeSection(cp, "Sync");
783    }
784}
785
786void
787DistIface::init(const Event *done_event, Tick link_delay)
788{
789    // Init hook for the underlaying message transport to setup/finalize
790    // communication channels
791    initTransport();
792
793    // Spawn a new receiver thread that will process messages
794    // coming in from peer gem5 processes.
795    // The receive thread will also schedule a (receive) doneEvent
796    // for each incoming data packet.
797    spawnRecvThread(done_event, link_delay);
798
799
800    // Adjust the periodic sync start and interval. Different DistIface
801    // might have different requirements. The singleton sync object
802    // will select the minimum values for both params.
803    assert(sync != nullptr);
804    sync->init(syncStart, syncRepeat);
805
806    // Initialize the seed for random generator to avoid the same sequence
807    // in all gem5 peer processes
808    assert(master != nullptr);
809    if (this == master)
810        random_mt.init(5489 * (rank+1) + 257);
811}
812
813void
814DistIface::startup()
815{
816    DPRINTF(DistEthernet, "DistIface::startup() started\n");
817    // Schedule synchronization unless we are not a switch in pseudo_op mode.
818    if (this == master && (!syncStartOnPseudoOp || isSwitch))
819        syncEvent->start();
820    DPRINTF(DistEthernet, "DistIface::startup() done\n");
821}
822
823bool
824DistIface::readyToCkpt(Tick delay, Tick period)
825{
826    bool ret = true;
827    DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
828            "period:%lu\n", delay, period);
829    if (master) {
830        if (delay == 0) {
831            inform("m5 checkpoint called with zero delay => triggering collaborative "
832                   "checkpoint\n");
833            sync->requestCkpt(ReqType::collective);
834        } else {
835            inform("m5 checkpoint called with non-zero delay => triggering immediate "
836                   "checkpoint (at the next sync)\n");
837            sync->requestCkpt(ReqType::immediate);
838        }
839        if (period != 0)
840            inform("Non-zero period for m5_ckpt is ignored in "
841                   "distributed gem5 runs\n");
842        ret = false;
843    }
844    return ret;
845}
846
847void
848DistIface::SyncNode::requestStopSync(ReqType req)
849{
850   std::lock_guard<std::mutex> sync_lock(lock);
851   needStopSync = req;
852}
853
854void
855DistIface::toggleSync(ThreadContext *tc)
856{
857    // Unforunate that we have to populate the system pointer member this way.
858    master->sys = tc->getSystemPtr();
859
860    // The invariant for both syncing and "unsyncing" is that all threads will
861    // stop executing intructions until the desired sync state has been reached
862    // for all nodes.  This is the easiest way to prevent deadlock (in the case
863    // of "unsyncing") and causality errors (in the case of syncing).
864    if (master->syncEvent->scheduled()) {
865        inform("Request toggling syncronization off\n");
866        master->sync->requestStopSync(ReqType::collective);
867
868        // At this point, we have no clue when everyone will reach the sync
869        // stop point.  Suspend execution of all local thread contexts.
870        // Dist-gem5 will reactivate all thread contexts when everyone has
871        // reached the sync stop point.
872#if THE_ISA != NULL_ISA
873        for (int i = 0; i < master->sys->numContexts(); i++) {
874            ThreadContext *tc = master->sys->getThreadContext(i);
875            if (tc->status() == ThreadContext::Active)
876                tc->quiesce();
877        }
878#endif
879    } else {
880        inform("Request toggling syncronization on\n");
881        master->syncEvent->start();
882
883        // We need to suspend all CPUs until the sync point is reached by all
884        // nodes to prevent causality errors.  We can also schedule CPU
885        // activation here, since we know exactly when the next sync will
886        // occur.
887#if THE_ISA != NULL_ISA
888        for (int i = 0; i < master->sys->numContexts(); i++) {
889            ThreadContext *tc = master->sys->getThreadContext(i);
890            if (tc->status() == ThreadContext::Active)
891                tc->quiesceTick(master->syncEvent->when() + 1);
892        }
893#endif
894    }
895}
896
897bool
898DistIface::readyToExit(Tick delay)
899{
900    bool ret = true;
901    DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
902            delay);
903    if (master) {
904        // To successfully coordinate an exit, all nodes must be synchronising
905        if (!master->syncEvent->scheduled())
906            master->syncEvent->start();
907
908        if (delay == 0) {
909            inform("m5 exit called with zero delay => triggering collaborative "
910                   "exit\n");
911            sync->requestExit(ReqType::collective);
912        } else {
913            inform("m5 exit called with non-zero delay => triggering immediate "
914                   "exit (at the next sync)\n");
915            sync->requestExit(ReqType::immediate);
916        }
917        ret = false;
918    }
919    return ret;
920}
921
922uint64_t
923DistIface::rankParam()
924{
925    uint64_t val;
926    if (master) {
927        val = master->rank;
928    } else {
929        warn("Dist-rank parameter is queried in single gem5 simulation.");
930        val = 0;
931    }
932    return val;
933}
934
935uint64_t
936DistIface::sizeParam()
937{
938    uint64_t val;
939    if (master) {
940        val = master->size;
941    } else {
942        warn("Dist-size parameter is queried in single gem5 simulation.");
943        val = 1;
944    }
945    return val;
946}
947