// dist_iface.cc (11290:1640dd68b0a4) dist_iface.cc (11325:67cc559d513a)
1/*
2 * Copyright (c) 2015 ARM Limited
3 * All rights reserved
4 *
5 * The license below extends only to copyright in the software and shall
6 * not be construed as granting a license to any other intellectual
7 * property including but not limited to intellectual property relating
8 * to a hardware implementation of the functionality of the software
9 * licensed hereunder. You may use the software subject to the license
10 * terms below provided that you ensure that this notice is replicated
11 * unmodified and in its entirety in all distributions of the software,
12 * modified or unmodified, in source code or in binary form.
13 *
14 * Redistribution and use in source and binary forms, with or without
15 * modification, are permitted provided that the following conditions are
16 * met: redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer;
18 * redistributions in binary form must reproduce the above copyright
19 * notice, this list of conditions and the following disclaimer in the
20 * documentation and/or other materials provided with the distribution;
21 * neither the name of the copyright holders nor the names of its
22 * contributors may be used to endorse or promote products derived from
23 * this software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 * Authors: Gabor Dozsa
38 */
39
40/* @file
41 * The interface class for dist-gem5 simulations.
42 */
43
44#include "dev/net/dist_iface.hh"
45
46#include <queue>
47#include <thread>
48
49#include "base/random.hh"
50#include "base/trace.hh"
51#include "debug/DistEthernet.hh"
52#include "debug/DistEthernetPkt.hh"
53#include "dev/net/etherpkt.hh"
54#include "sim/sim_exit.hh"
55#include "sim/sim_object.hh"
56
57using namespace std;
58DistIface::Sync *DistIface::sync = nullptr;
59DistIface::SyncEvent *DistIface::syncEvent = nullptr;
60unsigned DistIface::distIfaceNum = 0;
61unsigned DistIface::recvThreadsNum = 0;
62DistIface *DistIface::master = nullptr;
63
64void
65DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
66{
67 if (start_tick < firstAt) {
68 firstAt = start_tick;
69 inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
70 }
71
72 if (repeat_tick == 0)
73 panic("Dist synchronisation interval must be greater than zero");
74
75 if (repeat_tick < nextRepeat) {
76 nextRepeat = repeat_tick;
77 inform("Dist synchronisation interval is changed to %lu.\n",
78 nextRepeat);
79 }
80}
81
82DistIface::SyncSwitch::SyncSwitch(int num_nodes)
83{
84 numNodes = num_nodes;
85 waitNum = num_nodes;
86 numExitReq = 0;
87 numCkptReq = 0;
88 doExit = false;
89 doCkpt = false;
90 firstAt = std::numeric_limits<Tick>::max();
91 nextAt = 0;
92 nextRepeat = std::numeric_limits<Tick>::max();
93}
94
95DistIface::SyncNode::SyncNode()
96{
97 waitNum = 0;
98 needExit = ReqType::none;
99 needCkpt = ReqType::none;
100 doExit = false;
101 doCkpt = false;
102 firstAt = std::numeric_limits<Tick>::max();
103 nextAt = 0;
104 nextRepeat = std::numeric_limits<Tick>::max();
105}
106
107void
108DistIface::SyncNode::run(bool same_tick)
109{
110 std::unique_lock<std::mutex> sync_lock(lock);
111 Header header;
112
113 assert(waitNum == 0);
114 waitNum = DistIface::recvThreadsNum;
115 // initiate the global synchronisation
116 header.msgType = MsgType::cmdSyncReq;
117 header.sendTick = curTick();
118 header.syncRepeat = nextRepeat;
119 header.needCkpt = needCkpt;
120 if (needCkpt != ReqType::none)
121 needCkpt = ReqType::pending;
122 header.needExit = needExit;
123 if (needExit != ReqType::none)
124 needExit = ReqType::pending;
125 DistIface::master->sendCmd(header);
126 // now wait until all receiver threads complete the synchronisation
127 auto lf = [this]{ return waitNum == 0; };
128 cv.wait(sync_lock, lf);
129 // global synchronisation is done
130 assert(!same_tick || (nextAt == curTick()));
131}
132
133
134void
135DistIface::SyncSwitch::run(bool same_tick)
136{
137 std::unique_lock<std::mutex> sync_lock(lock);
138 Header header;
139 // Wait for the sync requests from the nodes
140 if (waitNum > 0) {
141 auto lf = [this]{ return waitNum == 0; };
142 cv.wait(sync_lock, lf);
143 }
144 assert(waitNum == 0);
145 assert(!same_tick || (nextAt == curTick()));
146 waitNum = numNodes;
147 // Complete the global synchronisation
148 header.msgType = MsgType::cmdSyncAck;
149 header.sendTick = nextAt;
150 header.syncRepeat = nextRepeat;
151 if (doCkpt || numCkptReq == numNodes) {
152 doCkpt = true;
153 header.needCkpt = ReqType::immediate;
154 numCkptReq = 0;
155 } else {
156 header.needCkpt = ReqType::none;
157 }
158 if (doExit || numExitReq == numNodes) {
159 doExit = true;
160 header.needExit = ReqType::immediate;
161 } else {
162 header.needExit = ReqType::none;
163 }
164 DistIface::master->sendCmd(header);
165}
166
167void
168DistIface::SyncSwitch::progress(Tick send_tick,
169 Tick sync_repeat,
170 ReqType need_ckpt,
171 ReqType need_exit)
172{
173 std::unique_lock<std::mutex> sync_lock(lock);
174 assert(waitNum > 0);
175
176 if (send_tick > nextAt)
177 nextAt = send_tick;
178 if (nextRepeat > sync_repeat)
179 nextRepeat = sync_repeat;
180
181 if (need_ckpt == ReqType::collective)
182 numCkptReq++;
183 else if (need_ckpt == ReqType::immediate)
184 doCkpt = true;
185 if (need_exit == ReqType::collective)
186 numExitReq++;
187 else if (need_exit == ReqType::immediate)
188 doExit = true;
189
190 waitNum--;
191 // Notify the simulation thread if the on-going sync is complete
192 if (waitNum == 0) {
193 sync_lock.unlock();
194 cv.notify_one();
195 }
196}
197
198void
199DistIface::SyncNode::progress(Tick max_send_tick,
200 Tick next_repeat,
201 ReqType do_ckpt,
202 ReqType do_exit)
203{
204 std::unique_lock<std::mutex> sync_lock(lock);
205 assert(waitNum > 0);
206
207 nextAt = max_send_tick;
208 nextRepeat = next_repeat;
209 doCkpt = (do_ckpt != ReqType::none);
210 doExit = (do_exit != ReqType::none);
211
212 waitNum--;
213 // Notify the simulation thread if the on-going sync is complete
214 if (waitNum == 0) {
215 sync_lock.unlock();
216 cv.notify_one();
217 }
218}
219
220void
221DistIface::SyncNode::requestCkpt(ReqType req)
222{
223 std::lock_guard<std::mutex> sync_lock(lock);
224 assert(req != ReqType::none);
225 if (needCkpt != ReqType::none)
226 warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
227 if (needCkpt == ReqType::none || req == ReqType::immediate)
228 needCkpt = req;
229}
230
231void
232DistIface::SyncNode::requestExit(ReqType req)
233{
234 std::lock_guard<std::mutex> sync_lock(lock);
235 assert(req != ReqType::none);
236 if (needExit != ReqType::none)
237 warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
238 if (needExit == ReqType::none || req == ReqType::immediate)
239 needExit = req;
240}
241
242void
243DistIface::Sync::drainComplete()
244{
245 if (doCkpt) {
246 // The first DistIface object called this right before writing the
247 // checkpoint. We need to drain the underlying physical network here.
248 // Note that other gem5 peers may enter this barrier at different
249 // ticks due to draining.
250 run(false);
251 // Only the "first" DistIface object has to perform the sync
252 doCkpt = false;
253 }
254}
255
void
DistIface::SyncNode::serialize(CheckpointOut &cp) const
{
    // Only the pending exit request needs to survive a checkpoint (the
    // matching unserialize restores it). The enum is stored as an int
    // because SERIALIZE_SCALAR works on the named scalar variable.
    int need_exit = static_cast<int>(needExit);
    SERIALIZE_SCALAR(need_exit);
}
262
void
DistIface::SyncNode::unserialize(CheckpointIn &cp)
{
    // Restore the pending exit request saved by serialize(); the int is
    // cast back to the ReqType enum.
    int need_exit;
    UNSERIALIZE_SCALAR(need_exit);
    needExit = static_cast<ReqType>(need_exit);
}
270
void
DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
{
    // Preserve the exit-vote count across a checkpoint so collective
    // exit requests already received from nodes are not lost on restore.
    SERIALIZE_SCALAR(numExitReq);
}
276
void
DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
{
    // Restore the exit-vote count saved by serialize().
    UNSERIALIZE_SCALAR(numExitReq);
}
282
void
DistIface::SyncEvent::start()
{
    // Note that this may be called either from startup() or drainResume().

    // At this point, all DistIface objects have already called Sync::init()
    // so we have a local minimum of the start tick and repeat for the
    // periodic sync.
    Tick firstAt = DistIface::sync->firstAt;
    repeat = DistIface::sync->nextRepeat;
    // Do a global barrier to agree on a common repeat value (the smallest
    // one from all participating nodes).
    DistIface::sync->run(curTick() == 0);

    // The barrier itself must not have triggered a checkpoint or exit,
    // and the agreed values must be consistent with our local view.
    assert(!DistIface::sync->doCkpt);
    assert(!DistIface::sync->doExit);
    assert(DistIface::sync->nextAt >= curTick());
    assert(DistIface::sync->nextRepeat <= repeat);

    // If this is called at tick 0 then we use the config start param,
    // otherwise the maximum of the current tick of all participating nodes.
    if (curTick() == 0) {
        assert(!scheduled());
        assert(DistIface::sync->nextAt == 0);
        schedule(firstAt);
    } else {
        // drainResume() path: the periodic sync event may or may not
        // still be on the queue, hence the reschedule/schedule split.
        if (scheduled())
            reschedule(DistIface::sync->nextAt);
        else
            schedule(DistIface::sync->nextAt);
    }
    inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
           DistIface::sync->nextRepeat);
}
317
void
DistIface::SyncEvent::process()
{
    // We may not start a global periodic sync while draining before taking a
    // checkpoint. This is due to the possibility that peer gem5 processes
    // may not hit the same periodic sync before they complete draining and
    // that would make this periodic sync clash with sync called from
    // DistIface::serialize() by other gem5 processes.
    // We would need a 'distributed drain' solution to eliminate this
    // restriction.
    // Note that if draining was not triggered by checkpointing then we are
    // fine since no extra global sync will happen (i.e. all peer gem5 will
    // hit this periodic sync eventually).
    panic_if(_draining && DistIface::sync->doCkpt,
             "Distributed sync is hit while draining");
    /*
     * Note that this is a global event so this process method will be called
     * by only exactly one thread.
     */
    /*
     * We hold the eventq lock at this point but the receiver thread may
     * need the lock to schedule new recv events while waiting for the
     * dist sync to complete.
     * Note that the other simulation threads also release their eventq
     * locks while waiting for us due to the global event semantics.
     */
    {
        EventQueue::ScopedRelease sr(curEventQueue());
        // we do a global sync here that is supposed to happen at the same
        // tick in all gem5 peers
        DistIface::sync->run(true);
        // global sync completed
    }
    if (DistIface::sync->doCkpt)
        exitSimLoop("checkpoint");
    if (DistIface::sync->doExit)
        exitSimLoop("exit request from gem5 peers");

    // Schedule the next periodic sync. NOTE(review): this happens even
    // when an exit/checkpoint was just requested above -- presumably
    // exitSimLoop() only queues the exit and the sync must stay armed in
    // case simulation resumes; confirm against sim/sim_exit semantics.
    repeat = DistIface::sync->nextRepeat;
    schedule(curTick() + repeat);
}
360
void
DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
{
    // This is called from the receiver thread when it starts running. The
    // new receiver thread shares the event queue with the simulation thread
    // (associated with the simulated Ethernet link).
    curEventQueue(eventManager->eventQueue());

    // recv_done: the event to schedule when a received packet becomes due
    // at the simulated link; link_delay: the simulated link latency that
    // is added to every packet's receive tick.
    recvDone = recv_done;
    linkDelay = link_delay;
}
372
373Tick
374DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
375 Tick send_delay,
376 Tick prev_recv_tick)
377{
378 Tick recv_tick = send_tick + send_delay + linkDelay;
379 // sanity check (we need atleast a send delay long window)
380 assert(recv_tick >= prev_recv_tick + send_delay);
381 panic_if(prev_recv_tick + send_delay > recv_tick,
382 "Receive window is smaller than send delay");
383 panic_if(recv_tick <= curTick(),
384 "Simulators out of sync - missed packet receive by %llu ticks"
385 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
386 "linkDelay: %lu )",
387 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
388 linkDelay);
389
390 return recv_tick;
391}
392
393void
394DistIface::RecvScheduler::resumeRecvTicks()
395{
396 // Schedule pending packets asap in case link speed/delay changed when
397 // restoring from the checkpoint.
398 // This may be done during unserialize except that curTick() is unknown
399 // so we call this during drainResume().
400 // If we are not restoring from a checkppint then link latency could not
401 // change so we just return.
402 if (!ckptRestore)
403 return;
404
405 std::vector<Desc> v;
406 while (!descQueue.empty()) {
407 Desc d = descQueue.front();
408 descQueue.pop();
409 d.sendTick = curTick();
410 d.sendDelay = d.packet->size(); // assume 1 tick/byte max link speed
411 v.push_back(d);
412 }
413
414 for (auto &d : v)
415 descQueue.push(d);
416
417 if (recvDone->scheduled()) {
418 assert(!descQueue.empty());
419 eventManager->reschedule(recvDone, curTick());
420 } else {
421 assert(descQueue.empty() && v.empty());
422 }
423 ckptRestore = false;
424}
425
void
DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
                                     Tick send_tick,
                                     Tick send_delay)
{
    // Note: this is called from the receiver thread, so take the event
    // queue lock before touching the queue or the descriptor queue.
    curEventQueue()->lock();
    Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);

    DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
            "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
            send_tick, send_delay, linkDelay, recv_tick);
    // Every packet must be sent and arrive in the same quantum
    assert(send_tick > master->syncEvent->when() -
           master->syncEvent->repeat);
    // No packet may be scheduled for receive in the arrival quantum
    assert(send_tick + send_delay + linkDelay > master->syncEvent->when());

    // Now we are about to schedule a recvDone event for the new data packet.
    // We use the same recvDone object for all incoming data packets. Packet
    // descriptors are saved in the ordered queue. The currently scheduled
    // packet is always on the top of the queue.
    // NOTE: we use the event queue lock to protect the receive desc queue,
    // too, which is accessed both by the receiver thread and the simulation
    // thread.
    descQueue.emplace(new_packet, send_tick, send_delay);
    if (descQueue.size() == 1) {
        // Queue was empty: this packet is the next one due at the link.
        assert(!recvDone->scheduled());
        eventManager->schedule(recvDone, recv_tick);
    } else {
        // Another packet is already scheduled; the new one must not be
        // due before the one at the front (packets must stay in order).
        assert(recvDone->scheduled());
        panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
                 "Out of order packet received (recv_tick: %lu top(): %lu\n",
                 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
    }
    curEventQueue()->unlock();
}
463
464EthPacketPtr
465DistIface::RecvScheduler::popPacket()
466{
467 // Note : this is called from the simulation thread when a receive done
468 // event is being processed for the link. We assume that the thread holds
469 // the event queue queue lock when this is called!
470 EthPacketPtr next_packet = descQueue.front().packet;
471 descQueue.pop();
472
473 if (descQueue.size() > 0) {
474 Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
475 descQueue.front().sendDelay,
476 curTick());
477 eventManager->schedule(recvDone, recv_tick);
478 }
479 prevRecvTick = curTick();
480 return next_packet;
481}
482
void
DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
{
    // A descriptor is the packet plus the timing data needed to
    // recompute its receive tick after a restore.
    SERIALIZE_SCALAR(sendTick);
    SERIALIZE_SCALAR(sendDelay);
    packet->serialize("rxPacket", cp);
}
490
void
DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
{
    UNSERIALIZE_SCALAR(sendTick);
    UNSERIALIZE_SCALAR(sendDelay);
    // Allocate a fresh packet buffer before restoring its contents.
    // NOTE(review): 16384 looks like an upper bound on the serialized
    // packet size (jumbo-frame headroom) -- confirm against EthPacketData.
    packet = std::make_shared<EthPacketData>(16384);
    packet->unserialize("rxPacket", cp);
}
499
500void
501DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
502{
503 SERIALIZE_SCALAR(prevRecvTick);
504 // serialize the receive desc queue
505 std::queue<Desc> tmp_queue(descQueue);
506 unsigned n_desc_queue = descQueue.size();
507 assert(tmp_queue.size() == descQueue.size());
508 SERIALIZE_SCALAR(n_desc_queue);
509 for (int i = 0; i < n_desc_queue; i++) {
510 tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
511 tmp_queue.pop();
512 }
513 assert(tmp_queue.empty());
514}
515
516void
517DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
518{
519 assert(descQueue.size() == 0);
1/*
2 * Copyright (c) 2015 ARM Limited
3 * All rights reserved
4 *
5 * The license below extends only to copyright in the software and shall
6 * not be construed as granting a license to any other intellectual
7 * property including but not limited to intellectual property relating
8 * to a hardware implementation of the functionality of the software
9 * licensed hereunder. You may use the software subject to the license
10 * terms below provided that you ensure that this notice is replicated
11 * unmodified and in its entirety in all distributions of the software,
12 * modified or unmodified, in source code or in binary form.
13 *
14 * Redistribution and use in source and binary forms, with or without
15 * modification, are permitted provided that the following conditions are
16 * met: redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer;
18 * redistributions in binary form must reproduce the above copyright
19 * notice, this list of conditions and the following disclaimer in the
20 * documentation and/or other materials provided with the distribution;
21 * neither the name of the copyright holders nor the names of its
22 * contributors may be used to endorse or promote products derived from
23 * this software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 * Authors: Gabor Dozsa
38 */
39
40/* @file
41 * The interface class for dist-gem5 simulations.
42 */
43
44#include "dev/net/dist_iface.hh"
45
46#include <queue>
47#include <thread>
48
49#include "base/random.hh"
50#include "base/trace.hh"
51#include "debug/DistEthernet.hh"
52#include "debug/DistEthernetPkt.hh"
53#include "dev/net/etherpkt.hh"
54#include "sim/sim_exit.hh"
55#include "sim/sim_object.hh"
56
57using namespace std;
58DistIface::Sync *DistIface::sync = nullptr;
59DistIface::SyncEvent *DistIface::syncEvent = nullptr;
60unsigned DistIface::distIfaceNum = 0;
61unsigned DistIface::recvThreadsNum = 0;
62DistIface *DistIface::master = nullptr;
63
64void
65DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
66{
67 if (start_tick < firstAt) {
68 firstAt = start_tick;
69 inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
70 }
71
72 if (repeat_tick == 0)
73 panic("Dist synchronisation interval must be greater than zero");
74
75 if (repeat_tick < nextRepeat) {
76 nextRepeat = repeat_tick;
77 inform("Dist synchronisation interval is changed to %lu.\n",
78 nextRepeat);
79 }
80}
81
82DistIface::SyncSwitch::SyncSwitch(int num_nodes)
83{
84 numNodes = num_nodes;
85 waitNum = num_nodes;
86 numExitReq = 0;
87 numCkptReq = 0;
88 doExit = false;
89 doCkpt = false;
90 firstAt = std::numeric_limits<Tick>::max();
91 nextAt = 0;
92 nextRepeat = std::numeric_limits<Tick>::max();
93}
94
95DistIface::SyncNode::SyncNode()
96{
97 waitNum = 0;
98 needExit = ReqType::none;
99 needCkpt = ReqType::none;
100 doExit = false;
101 doCkpt = false;
102 firstAt = std::numeric_limits<Tick>::max();
103 nextAt = 0;
104 nextRepeat = std::numeric_limits<Tick>::max();
105}
106
107void
108DistIface::SyncNode::run(bool same_tick)
109{
110 std::unique_lock<std::mutex> sync_lock(lock);
111 Header header;
112
113 assert(waitNum == 0);
114 waitNum = DistIface::recvThreadsNum;
115 // initiate the global synchronisation
116 header.msgType = MsgType::cmdSyncReq;
117 header.sendTick = curTick();
118 header.syncRepeat = nextRepeat;
119 header.needCkpt = needCkpt;
120 if (needCkpt != ReqType::none)
121 needCkpt = ReqType::pending;
122 header.needExit = needExit;
123 if (needExit != ReqType::none)
124 needExit = ReqType::pending;
125 DistIface::master->sendCmd(header);
126 // now wait until all receiver threads complete the synchronisation
127 auto lf = [this]{ return waitNum == 0; };
128 cv.wait(sync_lock, lf);
129 // global synchronisation is done
130 assert(!same_tick || (nextAt == curTick()));
131}
132
133
134void
135DistIface::SyncSwitch::run(bool same_tick)
136{
137 std::unique_lock<std::mutex> sync_lock(lock);
138 Header header;
139 // Wait for the sync requests from the nodes
140 if (waitNum > 0) {
141 auto lf = [this]{ return waitNum == 0; };
142 cv.wait(sync_lock, lf);
143 }
144 assert(waitNum == 0);
145 assert(!same_tick || (nextAt == curTick()));
146 waitNum = numNodes;
147 // Complete the global synchronisation
148 header.msgType = MsgType::cmdSyncAck;
149 header.sendTick = nextAt;
150 header.syncRepeat = nextRepeat;
151 if (doCkpt || numCkptReq == numNodes) {
152 doCkpt = true;
153 header.needCkpt = ReqType::immediate;
154 numCkptReq = 0;
155 } else {
156 header.needCkpt = ReqType::none;
157 }
158 if (doExit || numExitReq == numNodes) {
159 doExit = true;
160 header.needExit = ReqType::immediate;
161 } else {
162 header.needExit = ReqType::none;
163 }
164 DistIface::master->sendCmd(header);
165}
166
167void
168DistIface::SyncSwitch::progress(Tick send_tick,
169 Tick sync_repeat,
170 ReqType need_ckpt,
171 ReqType need_exit)
172{
173 std::unique_lock<std::mutex> sync_lock(lock);
174 assert(waitNum > 0);
175
176 if (send_tick > nextAt)
177 nextAt = send_tick;
178 if (nextRepeat > sync_repeat)
179 nextRepeat = sync_repeat;
180
181 if (need_ckpt == ReqType::collective)
182 numCkptReq++;
183 else if (need_ckpt == ReqType::immediate)
184 doCkpt = true;
185 if (need_exit == ReqType::collective)
186 numExitReq++;
187 else if (need_exit == ReqType::immediate)
188 doExit = true;
189
190 waitNum--;
191 // Notify the simulation thread if the on-going sync is complete
192 if (waitNum == 0) {
193 sync_lock.unlock();
194 cv.notify_one();
195 }
196}
197
198void
199DistIface::SyncNode::progress(Tick max_send_tick,
200 Tick next_repeat,
201 ReqType do_ckpt,
202 ReqType do_exit)
203{
204 std::unique_lock<std::mutex> sync_lock(lock);
205 assert(waitNum > 0);
206
207 nextAt = max_send_tick;
208 nextRepeat = next_repeat;
209 doCkpt = (do_ckpt != ReqType::none);
210 doExit = (do_exit != ReqType::none);
211
212 waitNum--;
213 // Notify the simulation thread if the on-going sync is complete
214 if (waitNum == 0) {
215 sync_lock.unlock();
216 cv.notify_one();
217 }
218}
219
220void
221DistIface::SyncNode::requestCkpt(ReqType req)
222{
223 std::lock_guard<std::mutex> sync_lock(lock);
224 assert(req != ReqType::none);
225 if (needCkpt != ReqType::none)
226 warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
227 if (needCkpt == ReqType::none || req == ReqType::immediate)
228 needCkpt = req;
229}
230
231void
232DistIface::SyncNode::requestExit(ReqType req)
233{
234 std::lock_guard<std::mutex> sync_lock(lock);
235 assert(req != ReqType::none);
236 if (needExit != ReqType::none)
237 warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
238 if (needExit == ReqType::none || req == ReqType::immediate)
239 needExit = req;
240}
241
242void
243DistIface::Sync::drainComplete()
244{
245 if (doCkpt) {
246 // The first DistIface object called this right before writing the
247 // checkpoint. We need to drain the underlying physical network here.
248 // Note that other gem5 peers may enter this barrier at different
249 // ticks due to draining.
250 run(false);
251 // Only the "first" DistIface object has to perform the sync
252 doCkpt = false;
253 }
254}
255
256void
257DistIface::SyncNode::serialize(CheckpointOut &cp) const
258{
259 int need_exit = static_cast<int>(needExit);
260 SERIALIZE_SCALAR(need_exit);
261}
262
263void
264DistIface::SyncNode::unserialize(CheckpointIn &cp)
265{
266 int need_exit;
267 UNSERIALIZE_SCALAR(need_exit);
268 needExit = static_cast<ReqType>(need_exit);
269}
270
271void
272DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
273{
274 SERIALIZE_SCALAR(numExitReq);
275}
276
277void
278DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
279{
280 UNSERIALIZE_SCALAR(numExitReq);
281}
282
283void
284DistIface::SyncEvent::start()
285{
286 // Note that this may be called either from startup() or drainResume()
287
288 // At this point, all DistIface objects has already called Sync::init() so
289 // we have a local minimum of the start tick and repeat for the periodic
290 // sync.
291 Tick firstAt = DistIface::sync->firstAt;
292 repeat = DistIface::sync->nextRepeat;
293 // Do a global barrier to agree on a common repeat value (the smallest
294 // one from all participating nodes
295 DistIface::sync->run(curTick() == 0);
296
297 assert(!DistIface::sync->doCkpt);
298 assert(!DistIface::sync->doExit);
299 assert(DistIface::sync->nextAt >= curTick());
300 assert(DistIface::sync->nextRepeat <= repeat);
301
302 // if this is called at tick 0 then we use the config start param otherwise
303 // the maximum of the current tick of all participating nodes
304 if (curTick() == 0) {
305 assert(!scheduled());
306 assert(DistIface::sync->nextAt == 0);
307 schedule(firstAt);
308 } else {
309 if (scheduled())
310 reschedule(DistIface::sync->nextAt);
311 else
312 schedule(DistIface::sync->nextAt);
313 }
314 inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
315 DistIface::sync->nextRepeat);
316}
317
318void
319DistIface::SyncEvent::process()
320{
321 // We may not start a global periodic sync while draining before taking a
322 // checkpoint. This is due to the possibility that peer gem5 processes
323 // may not hit the same periodic sync before they complete draining and
324 // that would make this periodic sync clash with sync called from
325 // DistIface::serialize() by other gem5 processes.
326 // We would need a 'distributed drain' solution to eliminate this
327 // restriction.
328 // Note that if draining was not triggered by checkpointing then we are
329 // fine since no extra global sync will happen (i.e. all peer gem5 will
330 // hit this periodic sync eventually).
331 panic_if(_draining && DistIface::sync->doCkpt,
332 "Distributed sync is hit while draining");
333 /*
334 * Note that this is a global event so this process method will be called
335 * by only exactly one thread.
336 */
337 /*
338 * We hold the eventq lock at this point but the receiver thread may
339 * need the lock to schedule new recv events while waiting for the
340 * dist sync to complete.
341 * Note that the other simulation threads also release their eventq
342 * locks while waiting for us due to the global event semantics.
343 */
344 {
345 EventQueue::ScopedRelease sr(curEventQueue());
346 // we do a global sync here that is supposed to happen at the same
347 // tick in all gem5 peers
348 DistIface::sync->run(true);
349 // global sync completed
350 }
351 if (DistIface::sync->doCkpt)
352 exitSimLoop("checkpoint");
353 if (DistIface::sync->doExit)
354 exitSimLoop("exit request from gem5 peers");
355
356 // schedule the next periodic sync
357 repeat = DistIface::sync->nextRepeat;
358 schedule(curTick() + repeat);
359}
360
361void
362DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
363{
364 // This is called from the receiver thread when it starts running. The new
365 // receiver thread shares the event queue with the simulation thread
366 // (associated with the simulated Ethernet link).
367 curEventQueue(eventManager->eventQueue());
368
369 recvDone = recv_done;
370 linkDelay = link_delay;
371}
372
373Tick
374DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
375 Tick send_delay,
376 Tick prev_recv_tick)
377{
378 Tick recv_tick = send_tick + send_delay + linkDelay;
379 // sanity check (we need atleast a send delay long window)
380 assert(recv_tick >= prev_recv_tick + send_delay);
381 panic_if(prev_recv_tick + send_delay > recv_tick,
382 "Receive window is smaller than send delay");
383 panic_if(recv_tick <= curTick(),
384 "Simulators out of sync - missed packet receive by %llu ticks"
385 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
386 "linkDelay: %lu )",
387 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
388 linkDelay);
389
390 return recv_tick;
391}
392
393void
394DistIface::RecvScheduler::resumeRecvTicks()
395{
396 // Schedule pending packets asap in case link speed/delay changed when
397 // restoring from the checkpoint.
398 // This may be done during unserialize except that curTick() is unknown
399 // so we call this during drainResume().
400 // If we are not restoring from a checkppint then link latency could not
401 // change so we just return.
402 if (!ckptRestore)
403 return;
404
405 std::vector<Desc> v;
406 while (!descQueue.empty()) {
407 Desc d = descQueue.front();
408 descQueue.pop();
409 d.sendTick = curTick();
410 d.sendDelay = d.packet->size(); // assume 1 tick/byte max link speed
411 v.push_back(d);
412 }
413
414 for (auto &d : v)
415 descQueue.push(d);
416
417 if (recvDone->scheduled()) {
418 assert(!descQueue.empty());
419 eventManager->reschedule(recvDone, curTick());
420 } else {
421 assert(descQueue.empty() && v.empty());
422 }
423 ckptRestore = false;
424}
425
426void
427DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
428 Tick send_tick,
429 Tick send_delay)
430{
431 // Note : this is called from the receiver thread
432 curEventQueue()->lock();
433 Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
434
435 DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
436 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
437 send_tick, send_delay, linkDelay, recv_tick);
438 // Every packet must be sent and arrive in the same quantum
439 assert(send_tick > master->syncEvent->when() -
440 master->syncEvent->repeat);
441 // No packet may be scheduled for receive in the arrival quantum
442 assert(send_tick + send_delay + linkDelay > master->syncEvent->when());
443
444 // Now we are about to schedule a recvDone event for the new data packet.
445 // We use the same recvDone object for all incoming data packets. Packet
446 // descriptors are saved in the ordered queue. The currently scheduled
447 // packet is always on the top of the queue.
448 // NOTE: we use the event queue lock to protect the receive desc queue,
449 // too, which is accessed both by the receiver thread and the simulation
450 // thread.
451 descQueue.emplace(new_packet, send_tick, send_delay);
452 if (descQueue.size() == 1) {
453 assert(!recvDone->scheduled());
454 eventManager->schedule(recvDone, recv_tick);
455 } else {
456 assert(recvDone->scheduled());
457 panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
458 "Out of order packet received (recv_tick: %lu top(): %lu\n",
459 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
460 }
461 curEventQueue()->unlock();
462}
463
464EthPacketPtr
465DistIface::RecvScheduler::popPacket()
466{
467 // Note : this is called from the simulation thread when a receive done
468 // event is being processed for the link. We assume that the thread holds
469 // the event queue queue lock when this is called!
470 EthPacketPtr next_packet = descQueue.front().packet;
471 descQueue.pop();
472
473 if (descQueue.size() > 0) {
474 Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
475 descQueue.front().sendDelay,
476 curTick());
477 eventManager->schedule(recvDone, recv_tick);
478 }
479 prevRecvTick = curTick();
480 return next_packet;
481}
482
void
DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
{
    // Checkpoint a single receive descriptor: scheduling metadata first,
    // then the raw Ethernet packet payload under the "rxPacket" name.
    SERIALIZE_SCALAR(sendTick);
    SERIALIZE_SCALAR(sendDelay);
    packet->serialize("rxPacket", cp);
}
490
void
DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
{
    // Restore the scheduling metadata, then rebuild the packet object and
    // let it read its own payload back from the checkpoint.
    UNSERIALIZE_SCALAR(sendTick);
    UNSERIALIZE_SCALAR(sendDelay);
    // 16384 is the allocated buffer capacity; presumably an upper bound on
    // the Ethernet packet size handled here -- TODO(review): confirm and
    // consider naming this constant.
    packet = std::make_shared<EthPacketData>(16384);
    packet->unserialize("rxPacket", cp);
}
499
500void
501DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
502{
503 SERIALIZE_SCALAR(prevRecvTick);
504 // serialize the receive desc queue
505 std::queue<Desc> tmp_queue(descQueue);
506 unsigned n_desc_queue = descQueue.size();
507 assert(tmp_queue.size() == descQueue.size());
508 SERIALIZE_SCALAR(n_desc_queue);
509 for (int i = 0; i < n_desc_queue; i++) {
510 tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
511 tmp_queue.pop();
512 }
513 assert(tmp_queue.empty());
514}
515
516void
517DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
518{
519 assert(descQueue.size() == 0);
520 assert(recvDone->scheduled() == false);
521 assert(ckptRestore == false);
520 assert(!recvDone->scheduled());
521 assert(!ckptRestore);
522
523 UNSERIALIZE_SCALAR(prevRecvTick);
524 // unserialize the receive desc queue
525 unsigned n_desc_queue;
526 UNSERIALIZE_SCALAR(n_desc_queue);
527 for (int i = 0; i < n_desc_queue; i++) {
528 Desc recv_desc;
529 recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
530 descQueue.push(recv_desc);
531 }
532 ckptRestore = true;
533}
534
DistIface::DistIface(unsigned dist_rank,
                     unsigned dist_size,
                     Tick sync_start,
                     Tick sync_repeat,
                     EventManager *em,
                     bool is_switch, int num_nodes) :
    syncStart(sync_start), syncRepeat(sync_repeat),
    recvThread(nullptr), recvScheduler(em),
    rank(dist_rank), size(dist_size)
{
    DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
    isMaster = false;
    // The first DistIface constructed in this process becomes the master:
    // it creates and owns the singleton sync object and the periodic sync
    // event shared by all interfaces.
    if (master == nullptr) {
        assert(sync == nullptr);
        assert(syncEvent == nullptr);
        // Only the switch flavour of the sync object needs to know how
        // many nodes are attached.
        if (is_switch)
            sync = new SyncSwitch(num_nodes);
        else
            sync = new SyncNode();
        syncEvent = new SyncEvent();
        master = this;
        isMaster = true;
    }
    // Hand out a unique, monotonically increasing id per interface.
    distIfaceId = distIfaceNum;
    distIfaceNum++;
}
561
DistIface::~DistIface()
{
    assert(recvThread);
    // NOTE(review): recvThread is deleted without an explicit join();
    // deleting a still-joinable std::thread calls std::terminate().
    // Presumably the receiver thread has already terminated (or the
    // process is exiting) by this point -- confirm the shutdown path.
    delete recvThread;
    // The master owns the shared sync objects; tear them down exactly once
    // and clear the singleton pointer.
    if (this == master) {
        assert(syncEvent);
        delete syncEvent;
        assert(sync);
        delete sync;
        master = nullptr;
    }
}
574
575void
576DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
577{
578 Header header;
579
580 // Prepare a dist header packet for the Ethernet packet we want to
581 // send out.
582 header.msgType = MsgType::dataDescriptor;
583 header.sendTick = curTick();
584 header.sendDelay = send_delay;
585
586 header.dataPacketLength = pkt->size();
587
588 // Send out the packet and the meta info.
589 sendPacket(header, pkt);
590
591 DPRINTF(DistEthernetPkt,
592 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
593 pkt->size(), send_delay);
594}
595
void
DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
{
    // Body of the dedicated receiver thread: block on the transport for
    // incoming dist messages and dispatch them until the peer goes away.
    EthPacketPtr new_packet;
    DistHeaderPkt::Header header;

    // Initialize receive scheduler parameters
    recvScheduler.init(recv_done, link_delay);

    // Main loop to wait for and process any incoming message.
    for (;;) {
        // recvHeader() blocks until the next dist header packet comes in.
        if (!recvHeader(header)) {
            // We lost connection to the peer gem5 processes most likely
            // because one of them called m5 exit. So we stop here.
            // Grab the eventq lock to stop the simulation thread.
            // NOTE(review): presumably exit_message() does not return --
            // otherwise the loop would continue with a stale header;
            // confirm.
            curEventQueue()->lock();
            exit_message("info",
                         0,
                         "Message server closed connection, "
                         "simulation is exiting");
        }

        // We got a valid dist header packet, let's process it
        if (header.msgType == MsgType::dataDescriptor) {
            // Data packet: fetch the payload and queue it for delivery to
            // the simulated link.
            recvPacket(header, new_packet);
            recvScheduler.pushPacket(new_packet,
                                     header.sendTick,
                                     header.sendDelay);
        } else {
            // everything else must be synchronisation related command
            sync->progress(header.sendTick,
                           header.syncRepeat,
                           header.needCkpt,
                           header.needExit);
        }
    }
}
634
void
DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
{
    assert(recvThread == nullptr);

    // Launch the receiver thread. The const_cast is needed because
    // recvThreadFunc() takes a mutable Event* (the event gets scheduled).
    recvThread = new std::thread(&DistIface::recvThreadFunc,
                                 this,
                                 const_cast<Event *>(recv_done),
                                 link_delay);
    recvThreadsNum++;
}
646
647DrainState
648DistIface::drain()
649{
650 DPRINTF(DistEthernet,"DistIFace::drain() called\n");
651 // This can be called multiple times in the same drain cycle.
652 if (this == master)
653 syncEvent->draining(true);
654 return DrainState::Drained;
655}
656
657void
658DistIface::drainResume() {
659 DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
660 if (this == master)
661 syncEvent->draining(false);
662 recvScheduler.resumeRecvTicks();
663}
664
void
DistIface::serialize(CheckpointOut &cp) const
{
    // Drain the dist interface before the checkpoint is taken. We cannot call
    // this as part of the normal drain cycle because this dist sync has to be
    // called exactly once after the system is fully drained.
    sync->drainComplete();

    // Record rank and interface id so a resume can verify the checkpoint
    // is loaded into the matching process/interface.
    unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;

    SERIALIZE_SCALAR(rank_orig);
    SERIALIZE_SCALAR(dist_iface_id_orig);

    recvScheduler.serializeSection(cp, "recvScheduler");
    // The shared sync object is checkpointed only once, by the master.
    if (this == master) {
        sync->serializeSection(cp, "Sync");
    }
}
683
void
DistIface::unserialize(CheckpointIn &cp)
{
    // Cross-check the identity of this interface: the checkpoint must have
    // been taken by the same rank and interface id.
    unsigned rank_orig, dist_iface_id_orig;
    UNSERIALIZE_SCALAR(rank_orig);
    UNSERIALIZE_SCALAR(dist_iface_id_orig);

    panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
             rank, rank_orig);
    panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
             "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
             dist_iface_id_orig);

    recvScheduler.unserializeSection(cp, "recvScheduler");
    // The shared sync object was checkpointed only by the master.
    if (this == master) {
        sync->unserializeSection(cp, "Sync");
    }
}
702
void
DistIface::init(const Event *done_event, Tick link_delay)
{
    // Init hook for the underlaying message transport to setup/finalize
    // communication channels
    initTransport();

    // Spawn a new receiver thread that will process messages
    // coming in from peer gem5 processes.
    // The receive thread will also schedule a (receive) doneEvent
    // for each incoming data packet.
    spawnRecvThread(done_event, link_delay);


    // Adjust the periodic sync start and interval. Different DistIface
    // might have different requirements. The singleton sync object
    // will select the minimum values for both params.
    assert(sync != nullptr);
    sync->init(syncStart, syncRepeat);

    // Initialize the seed for random generator to avoid the same sequence
    // in all gem5 peer processes
    // (presumably 5489 is chosen as the default Mersenne Twister seed,
    // offset per rank -- each peer gets a distinct stream).
    assert(master != nullptr);
    if (this == master)
        random_mt.init(5489 * (rank+1) + 257);
}
729
730void
731DistIface::startup()
732{
733 DPRINTF(DistEthernet, "DistIface::startup() started\n");
734 if (this == master)
735 syncEvent->start();
736 DPRINTF(DistEthernet, "DistIface::startup() done\n");
737}
738
739bool
740DistIface::readyToCkpt(Tick delay, Tick period)
741{
742 bool ret = true;
743 DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
744 "period:%lu\n", delay, period);
745 if (master) {
746 if (delay == 0) {
747 inform("m5 checkpoint called with zero delay => triggering collaborative "
748 "checkpoint\n");
749 sync->requestCkpt(ReqType::collective);
750 } else {
751 inform("m5 checkpoint called with non-zero delay => triggering immediate "
752 "checkpoint (at the next sync)\n");
753 sync->requestCkpt(ReqType::immediate);
754 }
755 if (period != 0)
756 inform("Non-zero period for m5_ckpt is ignored in "
757 "distributed gem5 runs\n");
758 ret = false;
759 }
760 return ret;
761}
762
763bool
764DistIface::readyToExit(Tick delay)
765{
766 bool ret = true;
767 DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
768 delay);
769 if (master) {
770 if (delay == 0) {
771 inform("m5 exit called with zero delay => triggering collaborative "
772 "exit\n");
773 sync->requestExit(ReqType::collective);
774 } else {
775 inform("m5 exit called with non-zero delay => triggering immediate "
776 "exit (at the next sync)\n");
777 sync->requestExit(ReqType::immediate);
778 }
779 ret = false;
780 }
781 return ret;
782}
783
784uint64_t
785DistIface::rankParam()
786{
787 uint64_t val;
788 if (master) {
789 val = master->rank;
790 } else {
791 warn("Dist-rank parameter is queried in single gem5 simulation.");
792 val = 0;
793 }
794 return val;
795}
796
797uint64_t
798DistIface::sizeParam()
799{
800 uint64_t val;
801 if (master) {
802 val = master->size;
803 } else {
804 warn("Dist-size parameter is queried in single gem5 simulation.");
805 val = 1;
806 }
807 return val;
808}
522
523 UNSERIALIZE_SCALAR(prevRecvTick);
524 // unserialize the receive desc queue
525 unsigned n_desc_queue;
526 UNSERIALIZE_SCALAR(n_desc_queue);
527 for (int i = 0; i < n_desc_queue; i++) {
528 Desc recv_desc;
529 recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
530 descQueue.push(recv_desc);
531 }
532 ckptRestore = true;
533}
534
// NOTE(review): this definition duplicates an identical one earlier in the
// chunk -- the file appears to contain two concatenated revisions.
DistIface::DistIface(unsigned dist_rank,
                     unsigned dist_size,
                     Tick sync_start,
                     Tick sync_repeat,
                     EventManager *em,
                     bool is_switch, int num_nodes) :
    syncStart(sync_start), syncRepeat(sync_repeat),
    recvThread(nullptr), recvScheduler(em),
    rank(dist_rank), size(dist_size)
{
    DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
    isMaster = false;
    // The first DistIface constructed in this process becomes the master:
    // it creates and owns the singleton sync object and the periodic sync
    // event shared by all interfaces.
    if (master == nullptr) {
        assert(sync == nullptr);
        assert(syncEvent == nullptr);
        // Only the switch flavour of the sync object needs to know how
        // many nodes are attached.
        if (is_switch)
            sync = new SyncSwitch(num_nodes);
        else
            sync = new SyncNode();
        syncEvent = new SyncEvent();
        master = this;
        isMaster = true;
    }
    // Hand out a unique, monotonically increasing id per interface.
    distIfaceId = distIfaceNum;
    distIfaceNum++;
}
561
DistIface::~DistIface()
{
    assert(recvThread);
    // NOTE(review): recvThread is deleted without an explicit join();
    // deleting a still-joinable std::thread calls std::terminate().
    // Presumably the receiver thread has already terminated (or the
    // process is exiting) by this point -- confirm the shutdown path.
    delete recvThread;
    // The master owns the shared sync objects; tear them down exactly once
    // and clear the singleton pointer.
    if (this == master) {
        assert(syncEvent);
        delete syncEvent;
        assert(sync);
        delete sync;
        master = nullptr;
    }
}
574
575void
576DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
577{
578 Header header;
579
580 // Prepare a dist header packet for the Ethernet packet we want to
581 // send out.
582 header.msgType = MsgType::dataDescriptor;
583 header.sendTick = curTick();
584 header.sendDelay = send_delay;
585
586 header.dataPacketLength = pkt->size();
587
588 // Send out the packet and the meta info.
589 sendPacket(header, pkt);
590
591 DPRINTF(DistEthernetPkt,
592 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
593 pkt->size(), send_delay);
594}
595
void
DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
{
    // Body of the dedicated receiver thread: block on the transport for
    // incoming dist messages and dispatch them until the peer goes away.
    EthPacketPtr new_packet;
    DistHeaderPkt::Header header;

    // Initialize receive scheduler parameters
    recvScheduler.init(recv_done, link_delay);

    // Main loop to wait for and process any incoming message.
    for (;;) {
        // recvHeader() blocks until the next dist header packet comes in.
        if (!recvHeader(header)) {
            // We lost connection to the peer gem5 processes most likely
            // because one of them called m5 exit. So we stop here.
            // Grab the eventq lock to stop the simulation thread.
            // NOTE(review): presumably exit_message() does not return --
            // otherwise the loop would continue with a stale header;
            // confirm.
            curEventQueue()->lock();
            exit_message("info",
                         0,
                         "Message server closed connection, "
                         "simulation is exiting");
        }

        // We got a valid dist header packet, let's process it
        if (header.msgType == MsgType::dataDescriptor) {
            // Data packet: fetch the payload and queue it for delivery to
            // the simulated link.
            recvPacket(header, new_packet);
            recvScheduler.pushPacket(new_packet,
                                     header.sendTick,
                                     header.sendDelay);
        } else {
            // everything else must be synchronisation related command
            sync->progress(header.sendTick,
                           header.syncRepeat,
                           header.needCkpt,
                           header.needExit);
        }
    }
}
634
void
DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
{
    assert(recvThread == nullptr);

    // Launch the receiver thread. The const_cast is needed because
    // recvThreadFunc() takes a mutable Event* (the event gets scheduled).
    recvThread = new std::thread(&DistIface::recvThreadFunc,
                                 this,
                                 const_cast<Event *>(recv_done),
                                 link_delay);
    recvThreadsNum++;
}
646
647DrainState
648DistIface::drain()
649{
650 DPRINTF(DistEthernet,"DistIFace::drain() called\n");
651 // This can be called multiple times in the same drain cycle.
652 if (this == master)
653 syncEvent->draining(true);
654 return DrainState::Drained;
655}
656
657void
658DistIface::drainResume() {
659 DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
660 if (this == master)
661 syncEvent->draining(false);
662 recvScheduler.resumeRecvTicks();
663}
664
void
DistIface::serialize(CheckpointOut &cp) const
{
    // Drain the dist interface before the checkpoint is taken. We cannot call
    // this as part of the normal drain cycle because this dist sync has to be
    // called exactly once after the system is fully drained.
    sync->drainComplete();

    // Record rank and interface id so a resume can verify the checkpoint
    // is loaded into the matching process/interface.
    unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;

    SERIALIZE_SCALAR(rank_orig);
    SERIALIZE_SCALAR(dist_iface_id_orig);

    recvScheduler.serializeSection(cp, "recvScheduler");
    // The shared sync object is checkpointed only once, by the master.
    if (this == master) {
        sync->serializeSection(cp, "Sync");
    }
}
683
void
DistIface::unserialize(CheckpointIn &cp)
{
    // Cross-check the identity of this interface: the checkpoint must have
    // been taken by the same rank and interface id.
    unsigned rank_orig, dist_iface_id_orig;
    UNSERIALIZE_SCALAR(rank_orig);
    UNSERIALIZE_SCALAR(dist_iface_id_orig);

    panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
             rank, rank_orig);
    panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
             "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
             dist_iface_id_orig);

    recvScheduler.unserializeSection(cp, "recvScheduler");
    // The shared sync object was checkpointed only by the master.
    if (this == master) {
        sync->unserializeSection(cp, "Sync");
    }
}
702
void
DistIface::init(const Event *done_event, Tick link_delay)
{
    // Init hook for the underlaying message transport to setup/finalize
    // communication channels
    initTransport();

    // Spawn a new receiver thread that will process messages
    // coming in from peer gem5 processes.
    // The receive thread will also schedule a (receive) doneEvent
    // for each incoming data packet.
    spawnRecvThread(done_event, link_delay);


    // Adjust the periodic sync start and interval. Different DistIface
    // might have different requirements. The singleton sync object
    // will select the minimum values for both params.
    assert(sync != nullptr);
    sync->init(syncStart, syncRepeat);

    // Initialize the seed for random generator to avoid the same sequence
    // in all gem5 peer processes
    // (presumably 5489 is chosen as the default Mersenne Twister seed,
    // offset per rank -- each peer gets a distinct stream).
    assert(master != nullptr);
    if (this == master)
        random_mt.init(5489 * (rank+1) + 257);
}
729
730void
731DistIface::startup()
732{
733 DPRINTF(DistEthernet, "DistIface::startup() started\n");
734 if (this == master)
735 syncEvent->start();
736 DPRINTF(DistEthernet, "DistIface::startup() done\n");
737}
738
739bool
740DistIface::readyToCkpt(Tick delay, Tick period)
741{
742 bool ret = true;
743 DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
744 "period:%lu\n", delay, period);
745 if (master) {
746 if (delay == 0) {
747 inform("m5 checkpoint called with zero delay => triggering collaborative "
748 "checkpoint\n");
749 sync->requestCkpt(ReqType::collective);
750 } else {
751 inform("m5 checkpoint called with non-zero delay => triggering immediate "
752 "checkpoint (at the next sync)\n");
753 sync->requestCkpt(ReqType::immediate);
754 }
755 if (period != 0)
756 inform("Non-zero period for m5_ckpt is ignored in "
757 "distributed gem5 runs\n");
758 ret = false;
759 }
760 return ret;
761}
762
763bool
764DistIface::readyToExit(Tick delay)
765{
766 bool ret = true;
767 DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
768 delay);
769 if (master) {
770 if (delay == 0) {
771 inform("m5 exit called with zero delay => triggering collaborative "
772 "exit\n");
773 sync->requestExit(ReqType::collective);
774 } else {
775 inform("m5 exit called with non-zero delay => triggering immediate "
776 "exit (at the next sync)\n");
777 sync->requestExit(ReqType::immediate);
778 }
779 ret = false;
780 }
781 return ret;
782}
783
784uint64_t
785DistIface::rankParam()
786{
787 uint64_t val;
788 if (master) {
789 val = master->rank;
790 } else {
791 warn("Dist-rank parameter is queried in single gem5 simulation.");
792 val = 0;
793 }
794 return val;
795}
796
797uint64_t
798DistIface::sizeParam()
799{
800 uint64_t val;
801 if (master) {
802 val = master->size;
803 } else {
804 warn("Dist-size parameter is queried in single gem5 simulation.");
805 val = 1;
806 }
807 return val;
808}