multi_iface.cc revision 10923
/*
 * Copyright (c) 2015 ARM Limited
 * All rights reserved
 *
 * The license below extends only to copyright in the software and shall
 * not be construed as granting a license to any other intellectual
 * property including but not limited to intellectual property relating
 * to a hardware implementation of the functionality of the software
 * licensed hereunder.  You may use the software subject to the license
 * terms below provided that you ensure that this notice is replicated
 * unmodified and in its entirety in all distributions of the software,
 * modified or unmodified, in source code or in binary form.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met: redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer;
 * redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution;
 * neither the name of the copyright holders nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Authors: Gabor Dozsa
 */

/* @file
 * The interface class for multi gem5 simulations.
 */
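
/*
 * Overview (an informative summary of the implementation below, not
 * additional behaviour):
 *
 * Each MultiIface object connects a simulated Ethernet device to its
 * peers in other gem5 processes and owns a dedicated receiver thread.
 * Every message on the wire is either a data descriptor header followed
 * by the raw Ethernet payload, or a synchronisation command (periodic,
 * checkpoint or atomic). The Sync and SyncEvent helpers keep the peer
 * gem5 processes in lockstep at each sync period and coordinate
 * checkpointing across them.
 */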

#include "dev/multi_iface.hh"

#include <queue>
#include <thread>

#include "base/random.hh"
#include "base/trace.hh"
#include "debug/MultiEthernet.hh"
#include "debug/MultiEthernetPkt.hh"
#include "dev/etherpkt.hh"
#include "sim/sim_exit.hh"
#include "sim/sim_object.hh"


MultiIface::Sync *MultiIface::sync = nullptr;
MultiIface::SyncEvent *MultiIface::syncEvent = nullptr;
unsigned MultiIface::recvThreadsNum = 0;
MultiIface *MultiIface::master = nullptr;

bool
MultiIface::Sync::run(SyncTrigger t, Tick sync_tick)
{
    std::unique_lock<std::mutex> sync_lock(lock);

    trigger = t;
    if (trigger != SyncTrigger::periodic) {
        DPRINTF(MultiEthernet,"MultiIface::Sync::run() trigger:%d\n",
                (unsigned)trigger);
    }

    switch (state) {
      case SyncState::asyncCkpt:
        switch (trigger) {
          case SyncTrigger::ckpt:
            assert(MultiIface::syncEvent->interrupted == false);
            state = SyncState::busy;
            break;
          case SyncTrigger::periodic:
            if (waitNum == 0) {
                // So all recv threads got an async checkpoint request already
                // and a simExit is scheduled at the end of the current tick
                // (i.e. it is a periodic sync scheduled at the same tick as
                // the simExit).
                state = SyncState::idle;
                DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted "
                        "due to async ckpt scheduled\n");
                return false;
            } else {
                // We still need to wait for some receiver thread to get the
                // async ckpt request. We are going to proceed as an
                // 'interrupted' periodic sync.
                state = SyncState::interrupted;
                DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted "
                        "due to an incoming ckpt request\n");
            }
            break;
          case SyncTrigger::atomic:
            assert(trigger != SyncTrigger::atomic);
        }
        break;
      case SyncState::idle:
        state = SyncState::busy;
        break;
        // Only one sync can be active at any time
      case SyncState::interrupted:
      case SyncState::busy:
        assert(state != SyncState::interrupted);
        assert(state != SyncState::busy);
        break;
    }
    // Kick-off the sync unless we are in the middle of an interrupted
    // periodic sync
    if (state != SyncState::interrupted) {
        assert(waitNum == 0);
        waitNum = MultiIface::recvThreadsNum;
        // initiate the global synchronisation
        assert(MultiIface::master != nullptr);
        MultiIface::master->syncRaw(triggerToMsg[(unsigned)trigger], sync_tick);
    }
    // now wait until all receiver threads complete the synchronisation
    auto lf = [this]{ return waitNum == 0; };
    cv.wait(sync_lock, lf);

    // we are done
    assert(state == SyncState::busy || state == SyncState::interrupted);
    bool ret = (state != SyncState::interrupted);
    state = SyncState::idle;
    return ret;
}
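
/*
 * Note on Sync::run() (informative): the return value is true if the
 * global sync completed and false if it was interrupted by an incoming
 * checkpoint request. SyncEvent::process() relies on this to decide
 * whether the next periodic sync event can be scheduled.
 */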

void
MultiIface::Sync::progress(MsgType msg)
{
    std::unique_lock<std::mutex> sync_lock(lock);

    switch (msg) {
      case MsgType::cmdAtomicSyncAck:
        assert(state == SyncState::busy && trigger == SyncTrigger::atomic);
        break;
      case MsgType::cmdPeriodicSyncAck:
        assert(state == SyncState::busy && trigger == SyncTrigger::periodic);
        break;
      case MsgType::cmdCkptSyncAck:
        assert(state == SyncState::busy && trigger == SyncTrigger::ckpt);
        break;
      case MsgType::cmdCkptSyncReq:
        switch (state) {
          case SyncState::busy:
            if (trigger == SyncTrigger::ckpt) {
                // We are already in a checkpoint sync but got another ckpt
                // sync request. This may happen if two (or more) peer gem5
                // processes try to start a ckpt nearly at the same time.
                // Incrementing waitNum here (before decrementing it below)
                // effectively results in ignoring this new ckpt sync request.
                waitNum++;
                break;
            }
            assert(waitNum == recvThreadsNum);
            state = SyncState::interrupted;
            // We need to fall through here to handle the
            // "recvThreadsNum == 1" case.
          case SyncState::interrupted:
            assert(trigger == SyncTrigger::periodic);
            assert(waitNum >= 1);
            if (waitNum == 1) {
                exitSimLoop("checkpoint");
            }
            break;
          case SyncState::idle:
            // There is no on-going sync so we got an async ckpt request. If
            // we are the only receiver thread then we need to schedule the
            // checkpoint. Otherwise, we only change the state to 'asyncCkpt'
            // and let the last receiver thread schedule the checkpoint in
            // the 'asyncCkpt' case.
            // Note that a periodic or resume sync may start later and that
            // can trigger a state change to 'interrupted' (so the checkpoint
            // may finally get scheduled in the 'interrupted' case).
            assert(waitNum == 0);
            state = SyncState::asyncCkpt;
            waitNum = MultiIface::recvThreadsNum;
            // We need to fall through here to handle the
            // "recvThreadsNum == 1" case.
          case SyncState::asyncCkpt:
            assert(waitNum >= 1);
            if (waitNum == 1)
                exitSimLoop("checkpoint");
            break;
          default:
            panic("Unexpected state for checkpoint request message");
            break;
        }
        break;
      default:
        panic("Unknown msg type");
        break;
    }
    waitNum--;
    assert(state != SyncState::idle);
    // Notify the simulation thread if there is an on-going sync.
    if (state != SyncState::asyncCkpt) {
        sync_lock.unlock();
        cv.notify_one();
    }
}
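
/*
 * Informative summary of the Sync state machine implemented by run() and
 * progress() above (this comment adds no new behaviour):
 *
 *   idle        --run(any trigger)----------------> busy
 *   idle        --cmdCkptSyncReq------------------> asyncCkpt
 *   asyncCkpt   --run(ckpt)-----------------------> busy
 *   asyncCkpt   --run(periodic), waitNum > 0------> interrupted
 *   asyncCkpt   --run(periodic), waitNum == 0-----> idle (sync abandoned)
 *   busy (ckpt) --cmdCkptSyncReq------------------> busy (duplicate ignored)
 *   busy (periodic) --cmdCkptSyncReq--------------> interrupted
 *
 * Any busy/interrupted sync returns to idle once run() has collected all
 * the expected acks/requests.
 */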

void
MultiIface::SyncEvent::start(Tick start, Tick interval)
{
    assert(!scheduled());
    if (interval == 0)
        panic("Multi synchronisation period must be greater than zero");
    repeat = interval;
    schedule(start);
}

void
MultiIface::SyncEvent::adjust(Tick start_tick, Tick repeat_tick)
{
    // The new multi interface may require earlier start of the
    // synchronisation.
    assert(scheduled() == true);
    if (start_tick < when())
        reschedule(start_tick);
    // The new multi interface may require more frequent synchronisation,
    // i.e. a smaller repeat period.
    if (repeat_tick == 0)
        panic("Multi synchronisation period must be greater than zero");
    if (repeat_tick < repeat)
        repeat = repeat_tick;
}

void
MultiIface::SyncEvent::process()
{
    /*
     * Note that this is a global event so this process method will be
     * called by exactly one thread.
     */
    // If we are draining the system then we must not start a periodic sync
    // (as it is not guaranteed that all peer gem5 processes will reach this
    // tick before taking the checkpoint).
    if (isDraining == true) {
        assert(interrupted == false);
        interrupted = true;
        DPRINTF(MultiEthernet,"MultiIface::SyncEvent::process() interrupted "
                "due to draining\n");
        return;
    }
    if (interrupted == false)
        scheduledAt = curTick();
    /*
     * We hold the eventq lock at this point but the receiver thread may
     * need the lock to schedule new recv events while waiting for the
     * multi sync to complete.
     * Note that the other simulation threads also release their eventq
     * locks while waiting for us due to the global event semantics.
     */
    curEventQueue()->unlock();
    // we do a global sync here
    interrupted = !MultiIface::sync->run(SyncTrigger::periodic, scheduledAt);
    // Global sync completed or got interrupted.
    // We are expected to exit with the eventq lock held.
    curEventQueue()->lock();
    // Schedule the next global sync event if this one completed. Otherwise
    // (i.e. this one was interrupted by a checkpoint request), we will
    // reschedule this one after the draining is complete.
    if (!interrupted)
        schedule(scheduledAt + repeat);
}

void
MultiIface::SyncEvent::resume()
{
    Tick sync_tick;
    assert(!scheduled());
    if (interrupted) {
        assert(curTick() >= scheduledAt);
        // We have to complete the interrupted periodic sync asap.
        // Note that this sync might be interrupted now again with a
        // checkpoint request from a peer gem5...
        sync_tick = curTick();
        schedule(sync_tick);
    } else {
        // So we completed the last periodic sync, let's find out the tick
        // for the next one.
        assert(curTick() > scheduledAt);
        sync_tick = scheduledAt + repeat;
        if (sync_tick < curTick())
            panic("Cannot resume periodic synchronisation");
        schedule(sync_tick);
    }
    DPRINTF(MultiEthernet,
            "MultiIface::SyncEvent periodic sync resumed at %lld "
            "(curTick:%lld)\n", sync_tick, curTick());
}

void
MultiIface::SyncEvent::serialize(const std::string &base,
                                 CheckpointOut &cp) const
{
    // Save the periodic multi sync schedule information
    paramOut(cp, base + ".periodicSyncRepeat", repeat);
    paramOut(cp, base + ".periodicSyncInterrupted", interrupted);
    paramOut(cp, base + ".periodicSyncAt", scheduledAt);
}

void
MultiIface::SyncEvent::unserialize(const std::string &base,
                                   CheckpointIn &cp)
{
    paramIn(cp, base + ".periodicSyncRepeat", repeat);
    paramIn(cp, base + ".periodicSyncInterrupted", interrupted);
    paramIn(cp, base + ".periodicSyncAt", scheduledAt);
}

MultiIface::MultiIface(unsigned multi_rank,
                       Tick sync_start,
                       Tick sync_repeat,
                       EventManager *em) :
    syncStart(sync_start), syncRepeat(sync_repeat),
    recvThread(nullptr), eventManager(em), recvDone(nullptr),
    scheduledRecvPacket(nullptr), linkDelay(0), rank(multi_rank)
{
    DPRINTF(MultiEthernet, "MultiIface() ctor rank:%d\n", multi_rank);
    if (master == nullptr) {
        assert(sync == nullptr);
        assert(syncEvent == nullptr);
        sync = new Sync();
        syncEvent = new SyncEvent();
        master = this;
    }
}

MultiIface::~MultiIface()
{
    assert(recvThread);
    delete recvThread;
    if (this == master) {
        assert(syncEvent);
        delete syncEvent;
        assert(sync);
        delete sync;
    }
}

void
MultiIface::packetOut(EthPacketPtr pkt, Tick send_delay)
{
    MultiHeaderPkt::Header header_pkt;
    unsigned address_length = MultiHeaderPkt::maxAddressLength();

    // Prepare a multi header packet for the Ethernet packet we want to
    // send out.
    header_pkt.msgType = MsgType::dataDescriptor;
    header_pkt.sendTick  = curTick();
    header_pkt.sendDelay = send_delay;

    // Store also the source and destination addresses.
    pkt->packAddress(header_pkt.srcAddress, header_pkt.dstAddress,
                     address_length);

    header_pkt.dataPacketLength = pkt->size();

    // Send out the multi header packet followed by the Ethernet packet.
    sendRaw(&header_pkt, sizeof(header_pkt), header_pkt.dstAddress);
    sendRaw(pkt->data, pkt->size(), header_pkt.dstAddress);
    DPRINTF(MultiEthernetPkt,
            "MultiIface::packetOut() done size:%d send_delay:%llu "
            "src:0x%02x%02x%02x%02x%02x%02x "
            "dst:0x%02x%02x%02x%02x%02x%02x\n",
            pkt->size(), send_delay,
            header_pkt.srcAddress[0], header_pkt.srcAddress[1],
            header_pkt.srcAddress[2], header_pkt.srcAddress[3],
            header_pkt.srcAddress[4], header_pkt.srcAddress[5],
            header_pkt.dstAddress[0], header_pkt.dstAddress[1],
            header_pkt.dstAddress[2], header_pkt.dstAddress[3],
            header_pkt.dstAddress[4], header_pkt.dstAddress[5]);
}
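
/*
 * Illustration (informative only; the authoritative layout is the
 * definition of MultiHeaderPkt::Header): a call to
 * packetOut(pkt, send_delay) emits two raw messages back to back:
 *
 *   | MultiHeaderPkt::Header (msgType = dataDescriptor, sendTick,  |
 *   |   sendDelay, srcAddress, dstAddress, dataPacketLength)       |
 *   | Ethernet payload: dataPacketLength bytes of pkt->data        |
 *
 * recvHeader() and recvData() below consume the two messages in the
 * same order on the receiving side.
 */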

bool
MultiIface::recvHeader(MultiHeaderPkt::Header &header_pkt)
{
    // Blocking receive of an incoming multi header packet.
    return recvRaw((void *)&header_pkt, sizeof(header_pkt));
}

void
MultiIface::recvData(const MultiHeaderPkt::Header &header_pkt)
{
    // We are here because a header packet has been received implying
    // that an Ethernet (data) packet is coming in next.
    assert(header_pkt.msgType == MsgType::dataDescriptor);
    // Allocate storage for the incoming Ethernet packet.
    EthPacketPtr new_packet(new EthPacketData(header_pkt.dataPacketLength));
    // Now execute the blocking receive and store the incoming data directly
    // in the new EthPacketData object.
    if (! recvRaw((void *)(new_packet->data), header_pkt.dataPacketLength))
        panic("Missing data packet");

    new_packet->length = header_pkt.dataPacketLength;
    // Grab the event queue lock to schedule a new receive event for the
    // data packet.
    curEventQueue()->lock();
    // Compute the receive tick. It includes the send delay and the
    // simulated link delay.
    Tick recv_tick = header_pkt.sendTick + header_pkt.sendDelay + linkDelay;
    DPRINTF(MultiEthernetPkt, "MultiIface::recvThread() packet receive, "
            "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
            header_pkt.sendTick, header_pkt.sendDelay, linkDelay, recv_tick);

    if (recv_tick <= curTick()) {
        panic("Simulators out of sync - missed packet receive by %llu ticks",
              curTick() - recv_tick);
    }
    // Now we are about to schedule a recvDone event for the new data packet.
    // We use the same recvDone object for all incoming data packets. If
    // that is already scheduled - i.e. a receive event for a previous
    // data packet is already pending - then we have to check whether the
    // receive tick for the new packet is earlier than that of the currently
    // pending event. Packets may arrive out-of-order with respect to
    // simulated receive time. If the new packet is the earlier one, we need
    // to re-schedule the recvDone event for it. Otherwise, we save the
    // packet pointer and the recv tick for the new packet in the recvQueue.
    // See the implementation of the packetIn() method for comments on how
    // this information is retrieved from the recvQueue by the simulation
    // thread.
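    // For example (illustrative numbers): if recvDone is pending at tick
    // 1000 for packet A and packet B arrives with recv_tick 900, then A is
    // pushed into recvQueue with tick 1000 and recvDone is rescheduled to
    // tick 900 for B. Had B computed recv_tick 1100 instead, B would simply
    // be queued as (B, 1100).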
    if (!recvDone->scheduled()) {
        assert(recvQueue.size() == 0);
        assert(scheduledRecvPacket == nullptr);
        scheduledRecvPacket = new_packet;
        eventManager->schedule(recvDone, recv_tick);
    } else if (recvDone->when() > recv_tick) {
        recvQueue.emplace(scheduledRecvPacket, recvDone->when());
        eventManager->reschedule(recvDone, recv_tick);
        scheduledRecvPacket = new_packet;
    } else {
        recvQueue.emplace(new_packet, recv_tick);
    }
    curEventQueue()->unlock();
}

void
MultiIface::recvThreadFunc()
{
    EthPacketPtr new_packet;
    MultiHeaderPkt::Header header;

    // The new receiver thread shares the event queue with the simulation
    // thread (associated with the simulated Ethernet link).
    curEventQueue(eventManager->eventQueue());
    // Main loop to wait for and process any incoming message.
    for (;;) {
        // recvHeader() blocks until the next multi header packet comes in.
        if (!recvHeader(header)) {
            // We lost connection to the peer gem5 processes, most likely
            // because one of them called m5 exit. So we stop here.
            exit_message("info", 0, "Message server closed connection, "
                         "simulation is exiting");
        }
        // We got a valid multi header packet, let's process it.
        if (header.msgType == MsgType::dataDescriptor) {
            recvData(header);
        } else {
            // Everything else must be a synchronisation related command.
            sync->progress(header.msgType);
        }
    }
}

EthPacketPtr
MultiIface::packetIn()
{
    // We are called within the process() method of the recvDone event. We
    // return the packet that triggered the current receive event.
    // If there are further packets in the recvQueue, we also have to
    // schedule the recvDone event for the next packet with the smallest
    // receive tick. The priority queue container ensures that the smallest
    // receive tick is always on the top of the queue.
    assert(scheduledRecvPacket != nullptr);
    EthPacketPtr next_packet = scheduledRecvPacket;

    if (! recvQueue.empty()) {
        eventManager->schedule(recvDone, recvQueue.top().second);
        scheduledRecvPacket = recvQueue.top().first;
        recvQueue.pop();
    } else {
        scheduledRecvPacket = nullptr;
    }

    return next_packet;
}

void
MultiIface::spawnRecvThread(Event *recv_done, Tick link_delay)
{
    assert(recvThread == nullptr);
    // All receive threads must be spawned before the simulation starts.
    assert(eventManager->eventQueue()->getCurTick() == 0);

    recvDone = recv_done;
    linkDelay = link_delay;

    recvThread = new std::thread(&MultiIface::recvThreadFunc, this);

    recvThreadsNum++;
}
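
/*
 * Note (informative): recvThreadsNum is a static member, so it counts the
 * receiver threads of all MultiIface objects in this gem5 process. Sync
 * uses it as the number of acknowledgements/requests to wait for in each
 * global synchronisation (see Sync::run() and Sync::progress()).
 */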

DrainState
MultiIface::drain()
{
    DPRINTF(MultiEthernet,"MultiIface::drain() called\n");

    // This can be called multiple times in the same drain cycle.
    if (master == this) {
        syncEvent->isDraining = true;
    }

    return DrainState::Drained;
}

void
MultiIface::drainDone()
{
    if (master == this) {
        assert(syncEvent->isDraining == true);
        syncEvent->isDraining = false;
        // We need to resume the interrupted periodic sync here now that the
        // draining is done. If the last periodic sync completed before the
        // checkpoint then the next one is already scheduled.
        if (syncEvent->interrupted)
            syncEvent->resume();
    }
}

void
MultiIface::serialize(const std::string &base, CheckpointOut &cp) const
{
    // Drain the multi interface before the checkpoint is taken. We cannot
    // call this as part of the normal drain cycle because this multi sync
    // has to be called exactly once after the system is fully drained.
    // Note that every peer will take a checkpoint but they may take it at
    // different ticks.
    // This sync request may interrupt an on-going periodic sync in some
    // peers.
    sync->run(SyncTrigger::ckpt, curTick());

    // Save the periodic multi sync status
    syncEvent->serialize(base, cp);

    unsigned n_rx_packets = recvQueue.size();
    if (scheduledRecvPacket != nullptr)
        n_rx_packets++;

    paramOut(cp, base + ".nRxPackets", n_rx_packets);

    if (n_rx_packets > 0) {
        assert(recvDone->scheduled());
        scheduledRecvPacket->serialize(base + ".rxPacket[0]", cp);
    }

    for (unsigned i = 1; i < n_rx_packets; i++) {
        const RecvInfo recv_info = recvQueue.impl().at(i - 1);
        recv_info.first->serialize(base + csprintf(".rxPacket[%d]", i), cp);
        Tick rx_tick = recv_info.second;
        paramOut(cp, base + csprintf(".rxTick[%d]", i), rx_tick);
    }
}

void
MultiIface::unserialize(const std::string &base, CheckpointIn &cp)
{
    assert(recvQueue.size() == 0);
    assert(scheduledRecvPacket == nullptr);
    assert(recvDone->scheduled() == false);

    // restore periodic sync info
    syncEvent->unserialize(base, cp);

    unsigned n_rx_packets;
    paramIn(cp, base + ".nRxPackets", n_rx_packets);

    if (n_rx_packets > 0) {
        scheduledRecvPacket = std::make_shared<EthPacketData>(16384);
        scheduledRecvPacket->unserialize(base + ".rxPacket[0]", cp);
        // Note: receive event will be scheduled when the link is unserialized
    }

    for (unsigned i = 1; i < n_rx_packets; i++) {
        EthPacketPtr rx_packet = std::make_shared<EthPacketData>(16384);
        rx_packet->unserialize(base + csprintf(".rxPacket[%d]", i), cp);
        Tick rx_tick = 0;
        paramIn(cp, base + csprintf(".rxTick[%d]", i), rx_tick);
        assert(rx_tick > 0);
        recvQueue.emplace(rx_packet, rx_tick);
    }
}

void
MultiIface::initRandom()
{
    // Initialize the seed for the random generator to avoid the same
    // sequence in all gem5 peer processes.
    assert(master != nullptr);
    if (this == master)
        random_mt.init(5489 * (rank + 1) + 257);
}
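
/*
 * Illustrative example: with the formula above, rank 0 seeds random_mt
 * with 5489 * 1 + 257 = 5746 and rank 1 with 5489 * 2 + 257 = 11235, so
 * the peer processes draw different pseudo-random sequences.
 */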

void
MultiIface::startPeriodicSync()
{
    DPRINTF(MultiEthernet, "MultiIface::startPeriodicSync() started\n");
    // Do a global sync here to ensure that peer gem5 processes are around
    // (actually this may not be needed...)
    sync->run(SyncTrigger::atomic, curTick());

    // Start the periodic sync if it is a fresh simulation from scratch.
    if (curTick() == 0) {
        if (this == master) {
            syncEvent->start(syncStart, syncRepeat);
            inform("Multi synchronisation activated: start at %lld, "
                   "repeat every %lld ticks.\n",
                   syncStart, syncRepeat);
        } else {
            // In case another multiIface object requires a different
            // schedule for the periodic sync than the master does.
            syncEvent->adjust(syncStart, syncRepeat);
        }
    } else {
        // Schedule the next periodic sync if resuming from a checkpoint.
        if (this == master)
            syncEvent->resume();
    }
    DPRINTF(MultiEthernet, "MultiIface::startPeriodicSync() done\n");
}
623