dist_iface.cc (11703:08b78e0a3717) dist_iface.cc (11757:78ef8daecd81)
1/*
1/*
2 * Copyright (c) 2015 ARM Limited
2 * Copyright (c) 2015-2016 ARM Limited
3 * All rights reserved
4 *
5 * The license below extends only to copyright in the software and shall
6 * not be construed as granting a license to any other intellectual
7 * property including but not limited to intellectual property relating
8 * to a hardware implementation of the functionality of the software
9 * licensed hereunder. You may use the software subject to the license
10 * terms below provided that you ensure that this notice is replicated

--- 67 unchanged lines hidden (view full) ---

78
79 if (repeat_tick < nextRepeat) {
80 nextRepeat = repeat_tick;
81 inform("Dist synchronisation interval is changed to %lu.\n",
82 nextRepeat);
83 }
84}
85
3 * All rights reserved
4 *
5 * The license below extends only to copyright in the software and shall
6 * not be construed as granting a license to any other intellectual
7 * property including but not limited to intellectual property relating
8 * to a hardware implementation of the functionality of the software
9 * licensed hereunder. You may use the software subject to the license
10 * terms below provided that you ensure that this notice is replicated

--- 67 unchanged lines hidden (view full) ---

78
79 if (repeat_tick < nextRepeat) {
80 nextRepeat = repeat_tick;
81 inform("Dist synchronisation interval is changed to %lu.\n",
82 nextRepeat);
83 }
84}
85
86void
87DistIface::Sync::abort()
88{
89 std::unique_lock<std::mutex> sync_lock(lock);
90 waitNum = 0;
91 isAbort = true;
92 sync_lock.unlock();
93 cv.notify_one();
94}
95
86DistIface::SyncSwitch::SyncSwitch(int num_nodes)
87{
88 numNodes = num_nodes;
89 waitNum = num_nodes;
90 numExitReq = 0;
91 numCkptReq = 0;
92 numStopSyncReq = 0;
93 doExit = false;
94 doCkpt = false;
95 doStopSync = false;
96 nextAt = std::numeric_limits<Tick>::max();
97 nextRepeat = std::numeric_limits<Tick>::max();
96DistIface::SyncSwitch::SyncSwitch(int num_nodes)
97{
98 numNodes = num_nodes;
99 waitNum = num_nodes;
100 numExitReq = 0;
101 numCkptReq = 0;
102 numStopSyncReq = 0;
103 doExit = false;
104 doCkpt = false;
105 doStopSync = false;
106 nextAt = std::numeric_limits<Tick>::max();
107 nextRepeat = std::numeric_limits<Tick>::max();
108 isAbort = false;
98}
99
100DistIface::SyncNode::SyncNode()
101{
102 waitNum = 0;
103 needExit = ReqType::none;
104 needCkpt = ReqType::none;
105 needStopSync = ReqType::none;
106 doExit = false;
107 doCkpt = false;
108 doStopSync = false;
109 nextAt = std::numeric_limits<Tick>::max();
110 nextRepeat = std::numeric_limits<Tick>::max();
109}
110
111DistIface::SyncNode::SyncNode()
112{
113 waitNum = 0;
114 needExit = ReqType::none;
115 needCkpt = ReqType::none;
116 needStopSync = ReqType::none;
117 doExit = false;
118 doCkpt = false;
119 doStopSync = false;
120 nextAt = std::numeric_limits<Tick>::max();
121 nextRepeat = std::numeric_limits<Tick>::max();
122 isAbort = false;
111}
112
123}
124
113void
125bool
114DistIface::SyncNode::run(bool same_tick)
115{
116 std::unique_lock<std::mutex> sync_lock(lock);
117 Header header;
118
119 assert(waitNum == 0);
126DistIface::SyncNode::run(bool same_tick)
127{
128 std::unique_lock<std::mutex> sync_lock(lock);
129 Header header;
130
131 assert(waitNum == 0);
132 assert(!isAbort);
120 waitNum = DistIface::recvThreadsNum;
121 // initiate the global synchronisation
122 header.msgType = MsgType::cmdSyncReq;
123 header.sendTick = curTick();
124 header.syncRepeat = nextRepeat;
125 header.needCkpt = needCkpt;
126 header.needStopSync = needStopSync;
127 if (needCkpt != ReqType::none)
128 needCkpt = ReqType::pending;
129 header.needExit = needExit;
130 if (needExit != ReqType::none)
131 needExit = ReqType::pending;
132 if (needStopSync != ReqType::none)
133 needStopSync = ReqType::pending;
134 DistIface::master->sendCmd(header);
135 // now wait until all receiver threads complete the synchronisation
136 auto lf = [this]{ return waitNum == 0; };
137 cv.wait(sync_lock, lf);
133 waitNum = DistIface::recvThreadsNum;
134 // initiate the global synchronisation
135 header.msgType = MsgType::cmdSyncReq;
136 header.sendTick = curTick();
137 header.syncRepeat = nextRepeat;
138 header.needCkpt = needCkpt;
139 header.needStopSync = needStopSync;
140 if (needCkpt != ReqType::none)
141 needCkpt = ReqType::pending;
142 header.needExit = needExit;
143 if (needExit != ReqType::none)
144 needExit = ReqType::pending;
145 if (needStopSync != ReqType::none)
146 needStopSync = ReqType::pending;
147 DistIface::master->sendCmd(header);
148 // now wait until all receiver threads complete the synchronisation
149 auto lf = [this]{ return waitNum == 0; };
150 cv.wait(sync_lock, lf);
138 // global synchronisation is done
139 assert(!same_tick || (nextAt == curTick()));
151 // global synchronisation is done.
152 assert(isAbort || !same_tick || (nextAt == curTick()));
153 return !isAbort;
140}
141
142
154}
155
156
143void
157bool
144DistIface::SyncSwitch::run(bool same_tick)
145{
146 std::unique_lock<std::mutex> sync_lock(lock);
147 Header header;
148 // Wait for the sync requests from the nodes
149 if (waitNum > 0) {
150 auto lf = [this]{ return waitNum == 0; };
151 cv.wait(sync_lock, lf);
152 }
153 assert(waitNum == 0);
158DistIface::SyncSwitch::run(bool same_tick)
159{
160 std::unique_lock<std::mutex> sync_lock(lock);
161 Header header;
162 // Wait for the sync requests from the nodes
163 if (waitNum > 0) {
164 auto lf = [this]{ return waitNum == 0; };
165 cv.wait(sync_lock, lf);
166 }
167 assert(waitNum == 0);
168 if (isAbort) // sync aborted
169 return false;
154 assert(!same_tick || (nextAt == curTick()));
155 waitNum = numNodes;
156 // Complete the global synchronisation
157 header.msgType = MsgType::cmdSyncAck;
158 header.sendTick = nextAt;
159 header.syncRepeat = nextRepeat;
160 if (doCkpt || numCkptReq == numNodes) {
161 doCkpt = true;

--- 11 unchanged lines hidden (view full) ---

173 if (doStopSync || numStopSyncReq == numNodes) {
174 doStopSync = true;
175 numStopSyncReq = 0;
176 header.needStopSync = ReqType::immediate;
177 } else {
178 header.needStopSync = ReqType::none;
179 }
180 DistIface::master->sendCmd(header);
170 assert(!same_tick || (nextAt == curTick()));
171 waitNum = numNodes;
172 // Complete the global synchronisation
173 header.msgType = MsgType::cmdSyncAck;
174 header.sendTick = nextAt;
175 header.syncRepeat = nextRepeat;
176 if (doCkpt || numCkptReq == numNodes) {
177 doCkpt = true;

--- 11 unchanged lines hidden (view full) ---

189 if (doStopSync || numStopSyncReq == numNodes) {
190 doStopSync = true;
191 numStopSyncReq = 0;
192 header.needStopSync = ReqType::immediate;
193 } else {
194 header.needStopSync = ReqType::none;
195 }
196 DistIface::master->sendCmd(header);
197 return true;
181}
182
198}
199
183void
200bool
184DistIface::SyncSwitch::progress(Tick send_tick,
185 Tick sync_repeat,
186 ReqType need_ckpt,
187 ReqType need_exit,
188 ReqType need_stop_sync)
189{
190 std::unique_lock<std::mutex> sync_lock(lock);
201DistIface::SyncSwitch::progress(Tick send_tick,
202 Tick sync_repeat,
203 ReqType need_ckpt,
204 ReqType need_exit,
205 ReqType need_stop_sync)
206{
207 std::unique_lock<std::mutex> sync_lock(lock);
208 if (isAbort) // sync aborted
209 return false;
191 assert(waitNum > 0);
192
193 if (send_tick > nextAt)
194 nextAt = send_tick;
195 if (nextRepeat > sync_repeat)
196 nextRepeat = sync_repeat;
197
198 if (need_ckpt == ReqType::collective)

--- 10 unchanged lines hidden (view full) ---

209 doStopSync = true;
210
211 waitNum--;
212 // Notify the simulation thread if the on-going sync is complete
213 if (waitNum == 0) {
214 sync_lock.unlock();
215 cv.notify_one();
216 }
210 assert(waitNum > 0);
211
212 if (send_tick > nextAt)
213 nextAt = send_tick;
214 if (nextRepeat > sync_repeat)
215 nextRepeat = sync_repeat;
216
217 if (need_ckpt == ReqType::collective)

--- 10 unchanged lines hidden (view full) ---

228 doStopSync = true;
229
230 waitNum--;
231 // Notify the simulation thread if the on-going sync is complete
232 if (waitNum == 0) {
233 sync_lock.unlock();
234 cv.notify_one();
235 }
236 // The receive thread must keep alive in the switch until the node
237 // closes the connection. Thus, we always return true here.
238 return true;
217}
218
239}
240
219void
241bool
220DistIface::SyncNode::progress(Tick max_send_tick,
221 Tick next_repeat,
222 ReqType do_ckpt,
223 ReqType do_exit,
224 ReqType do_stop_sync)
225{
226 std::unique_lock<std::mutex> sync_lock(lock);
242DistIface::SyncNode::progress(Tick max_send_tick,
243 Tick next_repeat,
244 ReqType do_ckpt,
245 ReqType do_exit,
246 ReqType do_stop_sync)
247{
248 std::unique_lock<std::mutex> sync_lock(lock);
249 if (isAbort) // sync aborted
250 return false;
227 assert(waitNum > 0);
228
229 nextAt = max_send_tick;
230 nextRepeat = next_repeat;
231 doCkpt = (do_ckpt != ReqType::none);
232 doExit = (do_exit != ReqType::none);
233 doStopSync = (do_stop_sync != ReqType::none);
234
235 waitNum--;
236 // Notify the simulation thread if the on-going sync is complete
237 if (waitNum == 0) {
238 sync_lock.unlock();
239 cv.notify_one();
240 }
251 assert(waitNum > 0);
252
253 nextAt = max_send_tick;
254 nextRepeat = next_repeat;
255 doCkpt = (do_ckpt != ReqType::none);
256 doExit = (do_exit != ReqType::none);
257 doStopSync = (do_stop_sync != ReqType::none);
258
259 waitNum--;
260 // Notify the simulation thread if the on-going sync is complete
261 if (waitNum == 0) {
262 sync_lock.unlock();
263 cv.notify_one();
264 }
265 // The receive thread must finish when simulation is about to exit
266 return !doExit;
241}
242
243void
244DistIface::SyncNode::requestCkpt(ReqType req)
245{
246 std::lock_guard<std::mutex> sync_lock(lock);
247 assert(req != ReqType::none);
248 if (needCkpt != ReqType::none)

--- 60 unchanged lines hidden (view full) ---

309 // Note that this may be called either from startup() or drainResume()
310
311 // At this point, all DistIface objects has already called Sync::init() so
312 // we have a local minimum of the start tick and repeat for the periodic
313 // sync.
314 repeat = DistIface::sync->nextRepeat;
315 // Do a global barrier to agree on a common repeat value (the smallest
316 // one from all participating nodes.
267}
268
269void
270DistIface::SyncNode::requestCkpt(ReqType req)
271{
272 std::lock_guard<std::mutex> sync_lock(lock);
273 assert(req != ReqType::none);
274 if (needCkpt != ReqType::none)

--- 60 unchanged lines hidden (view full) ---

335 // Note that this may be called either from startup() or drainResume()
336
337 // At this point, all DistIface objects has already called Sync::init() so
338 // we have a local minimum of the start tick and repeat for the periodic
339 // sync.
340 repeat = DistIface::sync->nextRepeat;
341 // Do a global barrier to agree on a common repeat value (the smallest
342 // one from all participating nodes.
317 DistIface::sync->run(false);
343 if (!DistIface::sync->run(false))
344 panic("DistIface::SyncEvent::start() aborted\n");
318
319 assert(!DistIface::sync->doCkpt);
320 assert(!DistIface::sync->doExit);
321 assert(!DistIface::sync->doStopSync);
322 assert(DistIface::sync->nextAt >= curTick());
323 assert(DistIface::sync->nextRepeat <= repeat);
324
325 if (curTick() == 0)

--- 35 unchanged lines hidden (view full) ---

361 * dist sync to complete.
362 * Note that the other simulation threads also release their eventq
363 * locks while waiting for us due to the global event semantics.
364 */
365 {
366 EventQueue::ScopedRelease sr(curEventQueue());
367 // we do a global sync here that is supposed to happen at the same
368 // tick in all gem5 peers
345
346 assert(!DistIface::sync->doCkpt);
347 assert(!DistIface::sync->doExit);
348 assert(!DistIface::sync->doStopSync);
349 assert(DistIface::sync->nextAt >= curTick());
350 assert(DistIface::sync->nextRepeat <= repeat);
351
352 if (curTick() == 0)

--- 35 unchanged lines hidden (view full) ---

388 * dist sync to complete.
389 * Note that the other simulation threads also release their eventq
390 * locks while waiting for us due to the global event semantics.
391 */
392 {
393 EventQueue::ScopedRelease sr(curEventQueue());
394 // we do a global sync here that is supposed to happen at the same
395 // tick in all gem5 peers
369 DistIface::sync->run(true);
396 if (!DistIface::sync->run(true))
397 return; // global sync aborted
370 // global sync completed
371 }
372 if (DistIface::sync->doCkpt)
373 exitSimLoop("checkpoint");
398 // global sync completed
399 }
400 if (DistIface::sync->doCkpt)
401 exitSimLoop("checkpoint");
374 if (DistIface::sync->doExit)
402 if (DistIface::sync->doExit) {
375 exitSimLoop("exit request from gem5 peers");
403 exitSimLoop("exit request from gem5 peers");
404 return;
405 }
376 if (DistIface::sync->doStopSync) {
377 DistIface::sync->doStopSync = false;
378 inform("synchronization disabled at %lu\n", curTick());
379
380 // The switch node needs to wait for the next sync immediately.
381 if (DistIface::isSwitch) {
382 start();
383 } else {

--- 216 unchanged lines hidden (view full) ---

600 }
601 distIfaceId = distIfaceNum;
602 distIfaceNum++;
603}
604
605DistIface::~DistIface()
606{
607 assert(recvThread);
406 if (DistIface::sync->doStopSync) {
407 DistIface::sync->doStopSync = false;
408 inform("synchronization disabled at %lu\n", curTick());
409
410 // The switch node needs to wait for the next sync immediately.
411 if (DistIface::isSwitch) {
412 start();
413 } else {

--- 216 unchanged lines hidden (view full) ---

630 }
631 distIfaceId = distIfaceNum;
632 distIfaceNum++;
633}
634
635DistIface::~DistIface()
636{
637 assert(recvThread);
638 recvThread->join();
608 delete recvThread;
639 delete recvThread;
609 if (this == master) {
640 if (distIfaceNum-- == 0) {
610 assert(syncEvent);
611 delete syncEvent;
612 assert(sync);
613 delete sync;
641 assert(syncEvent);
642 delete syncEvent;
643 assert(sync);
644 delete sync;
614 master = nullptr;
615 }
645 }
646 if (this == master)
647 master = nullptr;
616}
617
618void
619DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
620{
621 Header header;
622
623 // Prepare a dist header packet for the Ethernet packet we want to

--- 25 unchanged lines hidden (view full) ---

649 // Main loop to wait for and process any incoming message.
650 for (;;) {
651 // recvHeader() blocks until the next dist header packet comes in.
652 if (!recvHeader(header)) {
653 // We lost connection to the peer gem5 processes most likely
654 // because one of them called m5 exit. So we stop here.
655 // Grab the eventq lock to stop the simulation thread
656 curEventQueue()->lock();
648}
649
650void
651DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
652{
653 Header header;
654
655 // Prepare a dist header packet for the Ethernet packet we want to

--- 25 unchanged lines hidden (view full) ---

681 // Main loop to wait for and process any incoming message.
682 for (;;) {
683 // recvHeader() blocks until the next dist header packet comes in.
684 if (!recvHeader(header)) {
685 // We lost connection to the peer gem5 processes most likely
686 // because one of them called m5 exit. So we stop here.
687 // Grab the eventq lock to stop the simulation thread
688 curEventQueue()->lock();
657 exitSimLoop("Message server closed connection, simulator "
658 "is exiting");
689 exitSimLoop("connection to gem5 peer got closed");
659 curEventQueue()->unlock();
690 curEventQueue()->unlock();
691 // The simulation thread may be blocked in processing an on-going
692 // global synchronisation. Abort the sync to give the simulation
693 // thread a chance to make progress and process the exit event.
694 sync->abort();
695 // Finish receiver thread
660 break;
661 }
662
663 // We got a valid dist header packet, let's process it
664 if (header.msgType == MsgType::dataDescriptor) {
665 recvPacket(header, new_packet);
666 recvScheduler.pushPacket(new_packet,
667 header.sendTick,
668 header.sendDelay);
669 } else {
670 // everything else must be synchronisation related command
696 break;
697 }
698
699 // We got a valid dist header packet, let's process it
700 if (header.msgType == MsgType::dataDescriptor) {
701 recvPacket(header, new_packet);
702 recvScheduler.pushPacket(new_packet,
703 header.sendTick,
704 header.sendDelay);
705 } else {
706 // everything else must be synchronisation related command
671 sync->progress(header.sendTick,
672 header.syncRepeat,
673 header.needCkpt,
674 header.needExit,
675 header.needStopSync);
707 if (!sync->progress(header.sendTick,
708 header.syncRepeat,
709 header.needCkpt,
710 header.needExit,
711 header.needStopSync))
712 // Finish receiver thread if simulation is about to exit
713 break;
676 }
677 }
678}
679
680void
681DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
682{
683 assert(recvThread == nullptr);

--- 221 unchanged lines hidden ---
714 }
715 }
716}
717
718void
719DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
720{
721 assert(recvThread == nullptr);

--- 221 unchanged lines hidden ---