dist_iface.cc revision 11701:5e7599457b97
/*
 * Copyright (c) 2015 ARM Limited
 * All rights reserved
 *
 * The license below extends only to copyright in the software and shall
 * not be construed as granting a license to any other intellectual
 * property including but not limited to intellectual property relating
 * to a hardware implementation of the functionality of the software
 * licensed hereunder.  You may use the software subject to the license
 * terms below provided that you ensure that this notice is replicated
 * unmodified and in its entirety in all distributions of the software,
 * modified or unmodified, in source code or in binary form.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met: redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer;
 * redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution;
 * neither the name of the copyright holders nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Authors: Gabor Dozsa
 */
399827Sakash.bagdia@arm.com
/* @file
 * The interface class for dist-gem5 simulations.
 */
439827Sakash.bagdia@arm.com
449793Sakash.bagdia@arm.com#include "dev/net/dist_iface.hh"
453691Shsul@eecs.umich.edu
468876Sandreas.hansson@arm.com#include <queue>
478876Sandreas.hansson@arm.com#include <thread>
487876Sgblack@eecs.umich.edu
499793Sakash.bagdia@arm.com#include "base/random.hh"
509793Sakash.bagdia@arm.com#include "base/trace.hh"
519793Sakash.bagdia@arm.com#include "debug/DistEthernet.hh"
529827Sakash.bagdia@arm.com#include "debug/DistEthernetPkt.hh"
539827Sakash.bagdia@arm.com#include "dev/net/etherpkt.hh"
549827Sakash.bagdia@arm.com#include "sim/sim_exit.hh"
559793Sakash.bagdia@arm.com#include "sim/sim_object.hh"
569793Sakash.bagdia@arm.com
579827Sakash.bagdia@arm.comusing namespace std;
589827Sakash.bagdia@arm.comDistIface::Sync *DistIface::sync = nullptr;
599827Sakash.bagdia@arm.comDistIface::SyncEvent *DistIface::syncEvent = nullptr;
609793Sakash.bagdia@arm.comunsigned DistIface::distIfaceNum = 0;
618713Sandreas.hansson@arm.comunsigned DistIface::recvThreadsNum = 0;
628713Sandreas.hansson@arm.comDistIface *DistIface::master = nullptr;
638713Sandreas.hansson@arm.com
649408Sandreas.hansson@arm.comvoid
658839Sandreas.hansson@arm.comDistIface::Sync::init(Tick start_tick, Tick repeat_tick)
668839Sandreas.hansson@arm.com{
673691Shsul@eecs.umich.edu    if (start_tick < firstAt) {
689826Sandreas.hansson@arm.com        firstAt = start_tick;
699826Sandreas.hansson@arm.com        inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
709826Sandreas.hansson@arm.com    }
719826Sandreas.hansson@arm.com
723691Shsul@eecs.umich.edu    if (repeat_tick == 0)
739827Sakash.bagdia@arm.com        panic("Dist synchronisation interval must be greater than zero");
749827Sakash.bagdia@arm.com
759793Sakash.bagdia@arm.com    if (repeat_tick < nextRepeat) {
769827Sakash.bagdia@arm.com        nextRepeat = repeat_tick;
779827Sakash.bagdia@arm.com        inform("Dist synchronisation interval is changed to %lu.\n",
789827Sakash.bagdia@arm.com               nextRepeat);
793691Shsul@eecs.umich.edu    }
808876Sandreas.hansson@arm.com}
818876Sandreas.hansson@arm.com
827876Sgblack@eecs.umich.eduDistIface::SyncSwitch::SyncSwitch(int num_nodes)
839793Sakash.bagdia@arm.com{
849793Sakash.bagdia@arm.com    numNodes = num_nodes;
859793Sakash.bagdia@arm.com    waitNum = num_nodes;
869827Sakash.bagdia@arm.com    numExitReq = 0;
879827Sakash.bagdia@arm.com    numCkptReq = 0;
889827Sakash.bagdia@arm.com    doExit = false;
899793Sakash.bagdia@arm.com    doCkpt = false;
909793Sakash.bagdia@arm.com    firstAt = std::numeric_limits<Tick>::max();
919827Sakash.bagdia@arm.com    nextAt = 0;
929827Sakash.bagdia@arm.com    nextRepeat = std::numeric_limits<Tick>::max();
939827Sakash.bagdia@arm.com}
949793Sakash.bagdia@arm.com
959408Sandreas.hansson@arm.comDistIface::SyncNode::SyncNode()
968839Sandreas.hansson@arm.com{
978839Sandreas.hansson@arm.com    waitNum = 0;
983691Shsul@eecs.umich.edu    needExit = ReqType::none;
999826Sandreas.hansson@arm.com    needCkpt = ReqType::none;
1009826Sandreas.hansson@arm.com    doExit = false;
1019826Sandreas.hansson@arm.com    doCkpt = false;
1028801Sgblack@eecs.umich.edu    firstAt = std::numeric_limits<Tick>::max();
1033691Shsul@eecs.umich.edu    nextAt = 0;
1043691Shsul@eecs.umich.edu    nextRepeat = std::numeric_limits<Tick>::max();
105}
106
107void
108DistIface::SyncNode::run(bool same_tick)
109{
110    std::unique_lock<std::mutex> sync_lock(lock);
111    Header header;
112
113    assert(waitNum == 0);
114    waitNum = DistIface::recvThreadsNum;
115    // initiate the global synchronisation
116    header.msgType = MsgType::cmdSyncReq;
117    header.sendTick = curTick();
118    header.syncRepeat = nextRepeat;
119    header.needCkpt = needCkpt;
120    if (needCkpt != ReqType::none)
121        needCkpt = ReqType::pending;
122    header.needExit = needExit;
123    if (needExit != ReqType::none)
124        needExit = ReqType::pending;
125    DistIface::master->sendCmd(header);
126    // now wait until all receiver threads complete the synchronisation
127    auto lf = [this]{ return waitNum == 0; };
128    cv.wait(sync_lock, lf);
129    // global synchronisation is done
130    assert(!same_tick || (nextAt == curTick()));
131}
132
133
134void
135DistIface::SyncSwitch::run(bool same_tick)
136{
137    std::unique_lock<std::mutex> sync_lock(lock);
138    Header header;
139    // Wait for the sync requests from the nodes
140    if (waitNum > 0) {
141        auto lf = [this]{ return waitNum == 0; };
142        cv.wait(sync_lock, lf);
143    }
144    assert(waitNum == 0);
145    assert(!same_tick || (nextAt == curTick()));
146    waitNum = numNodes;
147    // Complete the global synchronisation
148    header.msgType = MsgType::cmdSyncAck;
149    header.sendTick = nextAt;
150    header.syncRepeat = nextRepeat;
151    if (doCkpt || numCkptReq == numNodes) {
152        doCkpt = true;
153        header.needCkpt = ReqType::immediate;
154        numCkptReq = 0;
155    } else {
156        header.needCkpt = ReqType::none;
157    }
158    if (doExit || numExitReq == numNodes) {
159        doExit = true;
160        header.needExit = ReqType::immediate;
161    } else {
162        header.needExit = ReqType::none;
163    }
164    DistIface::master->sendCmd(header);
165}
166
167void
168DistIface::SyncSwitch::progress(Tick send_tick,
169                                 Tick sync_repeat,
170                                 ReqType need_ckpt,
171                                 ReqType need_exit)
172{
173    std::unique_lock<std::mutex> sync_lock(lock);
174    assert(waitNum > 0);
175
176    if (send_tick > nextAt)
177        nextAt = send_tick;
178    if (nextRepeat > sync_repeat)
179        nextRepeat = sync_repeat;
180
181    if (need_ckpt == ReqType::collective)
182        numCkptReq++;
183    else if (need_ckpt == ReqType::immediate)
184        doCkpt = true;
185    if (need_exit == ReqType::collective)
186        numExitReq++;
187    else if (need_exit == ReqType::immediate)
188        doExit = true;
189
190    waitNum--;
191    // Notify the simulation thread if the on-going sync is complete
192    if (waitNum == 0) {
193        sync_lock.unlock();
194        cv.notify_one();
195    }
196}
197
198void
199DistIface::SyncNode::progress(Tick max_send_tick,
200                               Tick next_repeat,
201                               ReqType do_ckpt,
202                               ReqType do_exit)
203{
204    std::unique_lock<std::mutex> sync_lock(lock);
205    assert(waitNum > 0);
206
207    nextAt = max_send_tick;
208    nextRepeat = next_repeat;
209    doCkpt = (do_ckpt != ReqType::none);
210    doExit = (do_exit != ReqType::none);
211
212    waitNum--;
213    // Notify the simulation thread if the on-going sync is complete
214    if (waitNum == 0) {
215        sync_lock.unlock();
216        cv.notify_one();
217    }
218}
219
220void
221DistIface::SyncNode::requestCkpt(ReqType req)
222{
223   std::lock_guard<std::mutex> sync_lock(lock);
224   assert(req != ReqType::none);
225   if (needCkpt != ReqType::none)
226       warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
227   if (needCkpt == ReqType::none || req == ReqType::immediate)
228       needCkpt = req;
229}
230
231void
232DistIface::SyncNode::requestExit(ReqType req)
233{
234   std::lock_guard<std::mutex> sync_lock(lock);
235   assert(req != ReqType::none);
236   if (needExit != ReqType::none)
237       warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
238   if (needExit == ReqType::none || req == ReqType::immediate)
239       needExit = req;
240}
241
242void
243DistIface::Sync::drainComplete()
244{
245    if (doCkpt) {
246        // The first DistIface object called this right before writing the
247        // checkpoint. We need to drain the underlying physical network here.
248        // Note that other gem5 peers may enter this barrier at different
249        // ticks due to draining.
250        run(false);
251        // Only the "first" DistIface object has to perform the sync
252        doCkpt = false;
253    }
254}
255
256void
257DistIface::SyncNode::serialize(CheckpointOut &cp) const
258{
259    int need_exit = static_cast<int>(needExit);
260    SERIALIZE_SCALAR(need_exit);
261}
262
263void
264DistIface::SyncNode::unserialize(CheckpointIn &cp)
265{
266    int need_exit;
267    UNSERIALIZE_SCALAR(need_exit);
268    needExit = static_cast<ReqType>(need_exit);
269}
270
271void
272DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
273{
274    SERIALIZE_SCALAR(numExitReq);
275}
276
277void
278DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
279{
280    UNSERIALIZE_SCALAR(numExitReq);
281}
282
283void
284DistIface::SyncEvent::start()
285{
286    // Note that this may be called either from startup() or drainResume()
287
288    // At this point, all DistIface objects has already called Sync::init() so
289    // we have a local minimum of the start tick and repeat for the periodic
290    // sync.
291    Tick firstAt  = DistIface::sync->firstAt;
292    repeat = DistIface::sync->nextRepeat;
293    // Do a global barrier to agree on a common repeat value (the smallest
294    // one from all participating nodes
295    DistIface::sync->run(curTick() == 0);
296
297    assert(!DistIface::sync->doCkpt);
298    assert(!DistIface::sync->doExit);
299    assert(DistIface::sync->nextAt >= curTick());
300    assert(DistIface::sync->nextRepeat <= repeat);
301
302    // if this is called at tick 0 then we use the config start param otherwise
303    // the maximum of the current tick of all participating nodes
304    if (curTick() == 0) {
305        assert(!scheduled());
306        assert(DistIface::sync->nextAt == 0);
307        schedule(firstAt);
308    } else {
309        if (scheduled())
310            reschedule(DistIface::sync->nextAt);
311        else
312            schedule(DistIface::sync->nextAt);
313    }
314    inform("Dist sync scheduled at %lu and repeats %lu\n",  when(),
315           DistIface::sync->nextRepeat);
316}
317
318void
319DistIface::SyncEvent::process()
320{
321    // We may not start a global periodic sync while draining before taking a
322    // checkpoint.  This is due to the possibility that peer gem5 processes
323    // may not hit the same periodic sync before they complete draining and
324    // that would make this periodic sync clash with sync called from
325    // DistIface::serialize() by other gem5 processes.
326    // We would need a 'distributed drain' solution to eliminate this
327    // restriction.
328    // Note that if draining was not triggered by checkpointing then we are
329    // fine since no extra global sync will happen (i.e. all peer gem5 will
330    // hit this periodic sync eventually).
331    panic_if(_draining && DistIface::sync->doCkpt,
332             "Distributed sync is hit while draining");
333    /*
334     * Note that this is a global event so this process method will be called
335     * by only exactly one thread.
336     */
337    /*
338     * We hold the eventq lock at this point but the receiver thread may
339     * need the lock to schedule new recv events while waiting for the
340     * dist sync to complete.
341     * Note that the other simulation threads also release their eventq
342     * locks while waiting for us due to the global event semantics.
343     */
344    {
345        EventQueue::ScopedRelease sr(curEventQueue());
346        // we do a global sync here that is supposed to happen at the same
347        // tick in all gem5 peers
348        DistIface::sync->run(true);
349        // global sync completed
350    }
351    if (DistIface::sync->doCkpt)
352        exitSimLoop("checkpoint");
353    if (DistIface::sync->doExit)
354        exitSimLoop("exit request from gem5 peers");
355
356    // schedule the next periodic sync
357    repeat = DistIface::sync->nextRepeat;
358    schedule(curTick() + repeat);
359}
360
361void
362DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
363{
364    // This is called from the receiver thread when it starts running. The new
365    // receiver thread shares the event queue with the simulation thread
366    // (associated with the simulated Ethernet link).
367    curEventQueue(eventManager->eventQueue());
368
369    recvDone = recv_done;
370    linkDelay = link_delay;
371}
372
373Tick
374DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
375                                          Tick send_delay,
376                                          Tick prev_recv_tick)
377{
378    Tick recv_tick = send_tick + send_delay + linkDelay;
379    // sanity check (we need atleast a send delay long window)
380    assert(recv_tick >= prev_recv_tick + send_delay);
381    panic_if(prev_recv_tick + send_delay > recv_tick,
382             "Receive window is smaller than send delay");
383    panic_if(recv_tick <= curTick(),
384             "Simulators out of sync - missed packet receive by %llu ticks"
385             "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
386             "linkDelay: %lu )",
387             curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
388             linkDelay);
389
390    return recv_tick;
391}
392
393void
394DistIface::RecvScheduler::resumeRecvTicks()
395{
396    // Schedule pending packets asap in case link speed/delay changed when
397    // restoring from the checkpoint.
398    // This may be done during unserialize except that curTick() is unknown
399    // so we call this during drainResume().
400    // If we are not restoring from a checkppint then link latency could not
401    // change so we just return.
402    if (!ckptRestore)
403        return;
404
405    std::vector<Desc> v;
406    while (!descQueue.empty()) {
407        Desc d = descQueue.front();
408        descQueue.pop();
409        d.sendTick = curTick();
410        d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
411        v.push_back(d);
412    }
413
414    for (auto &d : v)
415        descQueue.push(d);
416
417    if (recvDone->scheduled()) {
418        assert(!descQueue.empty());
419        eventManager->reschedule(recvDone, curTick());
420    } else {
421        assert(descQueue.empty() && v.empty());
422    }
423    ckptRestore = false;
424}
425
426void
427DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
428                                     Tick send_tick,
429                                     Tick send_delay)
430{
431    // Note : this is called from the receiver thread
432    curEventQueue()->lock();
433    Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
434
435    DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
436            "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
437            send_tick, send_delay, linkDelay, recv_tick);
438    // Every packet must be sent and arrive in the same quantum
439    assert(send_tick > master->syncEvent->when() -
440           master->syncEvent->repeat);
441    // No packet may be scheduled for receive in the arrival quantum
442    assert(send_tick + send_delay + linkDelay > master->syncEvent->when());
443
444    // Now we are about to schedule a recvDone event for the new data packet.
445    // We use the same recvDone object for all incoming data packets. Packet
446    // descriptors are saved in the ordered queue. The currently scheduled
447    // packet is always on the top of the queue.
448    // NOTE:  we use the event queue lock to protect the receive desc queue,
449    // too, which is accessed both by the receiver thread and the simulation
450    // thread.
451    descQueue.emplace(new_packet, send_tick, send_delay);
452    if (descQueue.size() == 1) {
453        assert(!recvDone->scheduled());
454        eventManager->schedule(recvDone, recv_tick);
455    } else {
456        assert(recvDone->scheduled());
457        panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
458                 "Out of order packet received (recv_tick: %lu top(): %lu\n",
459                 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
460    }
461    curEventQueue()->unlock();
462}
463
464EthPacketPtr
465DistIface::RecvScheduler::popPacket()
466{
467    // Note : this is called from the simulation thread when a receive done
468    // event is being processed for the link. We assume that the thread holds
469    // the event queue queue lock when this is called!
470    EthPacketPtr next_packet = descQueue.front().packet;
471    descQueue.pop();
472
473    if (descQueue.size() > 0) {
474        Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
475                                         descQueue.front().sendDelay,
476                                         curTick());
477        eventManager->schedule(recvDone, recv_tick);
478    }
479    prevRecvTick = curTick();
480    return next_packet;
481}
482
483void
484DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
485{
486        SERIALIZE_SCALAR(sendTick);
487        SERIALIZE_SCALAR(sendDelay);
488        packet->serialize("rxPacket", cp);
489}
490
491void
492DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
493{
494        UNSERIALIZE_SCALAR(sendTick);
495        UNSERIALIZE_SCALAR(sendDelay);
496        packet = std::make_shared<EthPacketData>();
497        packet->unserialize("rxPacket", cp);
498}
499
500void
501DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
502{
503    SERIALIZE_SCALAR(prevRecvTick);
504    // serialize the receive desc queue
505    std::queue<Desc> tmp_queue(descQueue);
506    unsigned n_desc_queue = descQueue.size();
507    assert(tmp_queue.size() == descQueue.size());
508    SERIALIZE_SCALAR(n_desc_queue);
509    for (int i = 0; i < n_desc_queue; i++) {
510        tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
511        tmp_queue.pop();
512    }
513    assert(tmp_queue.empty());
514}
515
516void
517DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
518{
519    assert(descQueue.size() == 0);
520    assert(!recvDone->scheduled());
521    assert(!ckptRestore);
522
523    UNSERIALIZE_SCALAR(prevRecvTick);
524    // unserialize the receive desc queue
525    unsigned n_desc_queue;
526    UNSERIALIZE_SCALAR(n_desc_queue);
527    for (int i = 0; i < n_desc_queue; i++) {
528        Desc recv_desc;
529        recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
530        descQueue.push(recv_desc);
531    }
532    ckptRestore = true;
533}
534
535DistIface::DistIface(unsigned dist_rank,
536                     unsigned dist_size,
537                     Tick sync_start,
538                     Tick sync_repeat,
539                     EventManager *em,
540                     bool is_switch, int num_nodes) :
541    syncStart(sync_start), syncRepeat(sync_repeat),
542    recvThread(nullptr), recvScheduler(em),
543    rank(dist_rank), size(dist_size)
544{
545    DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
546    isMaster = false;
547    if (master == nullptr) {
548        assert(sync == nullptr);
549        assert(syncEvent == nullptr);
550        if (is_switch)
551            sync = new SyncSwitch(num_nodes);
552        else
553            sync = new SyncNode();
554        syncEvent = new SyncEvent();
555        master = this;
556        isMaster = true;
557    }
558    distIfaceId = distIfaceNum;
559    distIfaceNum++;
560}
561
562DistIface::~DistIface()
563{
564    assert(recvThread);
565    delete recvThread;
566    if (this == master) {
567        assert(syncEvent);
568        delete syncEvent;
569        assert(sync);
570        delete sync;
571        master = nullptr;
572    }
573}
574
575void
576DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
577{
578    Header header;
579
580    // Prepare a dist header packet for the Ethernet packet we want to
581    // send out.
582    header.msgType = MsgType::dataDescriptor;
583    header.sendTick  = curTick();
584    header.sendDelay = send_delay;
585
586    header.dataPacketLength = pkt->length;
587    header.simLength = pkt->simLength;
588
589    // Send out the packet and the meta info.
590    sendPacket(header, pkt);
591
592    DPRINTF(DistEthernetPkt,
593            "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
594            pkt->length, send_delay);
595}
596
597void
598DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
599{
600    EthPacketPtr new_packet;
601    DistHeaderPkt::Header header;
602
603    // Initialize receive scheduler parameters
604    recvScheduler.init(recv_done, link_delay);
605
606    // Main loop to wait for and process any incoming message.
607    for (;;) {
608        // recvHeader() blocks until the next dist header packet comes in.
609        if (!recvHeader(header)) {
610            // We lost connection to the peer gem5 processes most likely
611            // because one of them called m5 exit. So we stop here.
612            // Grab the eventq lock to stop the simulation thread
613            curEventQueue()->lock();
614            exitSimLoop("Message server closed connection, simulator "
615                        "is exiting");
616            curEventQueue()->unlock();
617            break;
618        }
619
620        // We got a valid dist header packet, let's process it
621        if (header.msgType == MsgType::dataDescriptor) {
622            recvPacket(header, new_packet);
623            recvScheduler.pushPacket(new_packet,
624                                     header.sendTick,
625                                     header.sendDelay);
626        } else {
627            // everything else must be synchronisation related command
628            sync->progress(header.sendTick,
629                           header.syncRepeat,
630                           header.needCkpt,
631                           header.needExit);
632        }
633    }
634}
635
636void
637DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
638{
639    assert(recvThread == nullptr);
640
641    recvThread = new std::thread(&DistIface::recvThreadFunc,
642                                 this,
643                                 const_cast<Event *>(recv_done),
644                                 link_delay);
645    recvThreadsNum++;
646}
647
648DrainState
649DistIface::drain()
650{
651    DPRINTF(DistEthernet,"DistIFace::drain() called\n");
652    // This can be called multiple times in the same drain cycle.
653    if (this == master)
654        syncEvent->draining(true);
655    return DrainState::Drained;
656}
657
658void
659DistIface::drainResume() {
660    DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
661    if (this == master)
662        syncEvent->draining(false);
663    recvScheduler.resumeRecvTicks();
664}
665
666void
667DistIface::serialize(CheckpointOut &cp) const
668{
669    // Drain the dist interface before the checkpoint is taken. We cannot call
670    // this as part of the normal drain cycle because this dist sync has to be
671    // called exactly once after the system is fully drained.
672    sync->drainComplete();
673
674    unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
675
676    SERIALIZE_SCALAR(rank_orig);
677    SERIALIZE_SCALAR(dist_iface_id_orig);
678
679    recvScheduler.serializeSection(cp, "recvScheduler");
680    if (this == master) {
681        sync->serializeSection(cp, "Sync");
682    }
683}
684
685void
686DistIface::unserialize(CheckpointIn &cp)
687{
688    unsigned rank_orig, dist_iface_id_orig;
689    UNSERIALIZE_SCALAR(rank_orig);
690    UNSERIALIZE_SCALAR(dist_iface_id_orig);
691
692    panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
693             rank, rank_orig);
694    panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
695             "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
696             dist_iface_id_orig);
697
698    recvScheduler.unserializeSection(cp, "recvScheduler");
699    if (this == master) {
700        sync->unserializeSection(cp, "Sync");
701    }
702}
703
704void
705DistIface::init(const Event *done_event, Tick link_delay)
706{
707    // Init hook for the underlaying message transport to setup/finalize
708    // communication channels
709    initTransport();
710
711    // Spawn a new receiver thread that will process messages
712    // coming in from peer gem5 processes.
713    // The receive thread will also schedule a (receive) doneEvent
714    // for each incoming data packet.
715    spawnRecvThread(done_event, link_delay);
716
717
718    // Adjust the periodic sync start and interval. Different DistIface
719    // might have different requirements. The singleton sync object
720    // will select the minimum values for both params.
721    assert(sync != nullptr);
722    sync->init(syncStart, syncRepeat);
723
724    // Initialize the seed for random generator to avoid the same sequence
725    // in all gem5 peer processes
726    assert(master != nullptr);
727    if (this == master)
728        random_mt.init(5489 * (rank+1) + 257);
729}
730
731void
732DistIface::startup()
733{
734    DPRINTF(DistEthernet, "DistIface::startup() started\n");
735    if (this == master)
736        syncEvent->start();
737    DPRINTF(DistEthernet, "DistIface::startup() done\n");
738}
739
740bool
741DistIface::readyToCkpt(Tick delay, Tick period)
742{
743    bool ret = true;
744    DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
745            "period:%lu\n", delay, period);
746    if (master) {
747        if (delay == 0) {
748            inform("m5 checkpoint called with zero delay => triggering collaborative "
749                   "checkpoint\n");
750            sync->requestCkpt(ReqType::collective);
751        } else {
752            inform("m5 checkpoint called with non-zero delay => triggering immediate "
753                   "checkpoint (at the next sync)\n");
754            sync->requestCkpt(ReqType::immediate);
755        }
756        if (period != 0)
757            inform("Non-zero period for m5_ckpt is ignored in "
758                   "distributed gem5 runs\n");
759        ret = false;
760    }
761    return ret;
762}
763
764bool
765DistIface::readyToExit(Tick delay)
766{
767    bool ret = true;
768    DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
769            delay);
770    if (master) {
771        if (delay == 0) {
772            inform("m5 exit called with zero delay => triggering collaborative "
773                   "exit\n");
774            sync->requestExit(ReqType::collective);
775        } else {
776            inform("m5 exit called with non-zero delay => triggering immediate "
777                   "exit (at the next sync)\n");
778            sync->requestExit(ReqType::immediate);
779        }
780        ret = false;
781    }
782    return ret;
783}
784
785uint64_t
786DistIface::rankParam()
787{
788    uint64_t val;
789    if (master) {
790        val = master->rank;
791    } else {
792        warn("Dist-rank parameter is queried in single gem5 simulation.");
793        val = 0;
794    }
795    return val;
796}
797
798uint64_t
799DistIface::sizeParam()
800{
801    uint64_t val;
802    if (master) {
803        val = master->size;
804    } else {
805        warn("Dist-size parameter is queried in single gem5 simulation.");
806        val = 1;
807    }
808    return val;
809}
810