001/*
002 *  This file is part of the Jikes RVM project (http://jikesrvm.org).
003 *
004 *  This file is licensed to You under the Eclipse Public License (EPL);
005 *  You may not use this file except in compliance with the License. You
006 *  may obtain a copy of the License at
007 *
008 *      http://www.opensource.org/licenses/eclipse-1.0.php
009 *
010 *  See the COPYRIGHT.txt file distributed with this work for information
011 *  regarding copyright ownership.
012 */
013package org.mmtk.utility.deque;
014
015import static org.mmtk.utility.Constants.*;
016
017import org.mmtk.policy.RawPageSpace;
018import org.mmtk.policy.Space;
019import org.mmtk.utility.Log;
020import org.mmtk.vm.Lock;
021import org.mmtk.vm.VM;
022import org.vmmagic.pragma.Entrypoint;
023import org.vmmagic.pragma.Inline;
024import org.vmmagic.pragma.Uninterruptible;
025import org.vmmagic.unboxed.Address;
026import org.vmmagic.unboxed.Offset;
027
028/**
 * This supports enqueuing and dequeuing of buffers for shared use.  Buffers
 * can be added to and removed from either end of the deque; access to the
 * shared pool itself is synchronized by a single low-level lock, while the
 * contents of each buffer are filled and drained by clients without
 * synchronization.
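 * <p>
 * A minimal usage sketch (the {@code rawPageSpace} instance is an
 * assumption for illustration; clients normally drive this class through
 * a local deque rather than calling the package-private methods directly):
 * <pre>{@code
 * SharedDeque pool = new SharedDeque("example", rawPageSpace, 1);
 * pool.prepareNonBlocking();          // single consumer, pops never block
 * Address buf = pool.alloc();         // fresh buffer from the raw page space
 * pool.enqueue(buf, 1, true);         // push onto the tail
 * Address popped = pool.dequeue(1);   // pop from the head
 * pool.free(popped);                  // popped is a buffer start here
 * pool.reset();
 * }</pre>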
032 */
033@Uninterruptible
034public class SharedDeque extends Deque {
  /** If true, {@link #prepare()} falls back to the non-blocking mode */
  private static final boolean DISABLE_WAITING = true;
036  private static final Offset NEXT_OFFSET = Offset.zero();
037  private static final Offset PREV_OFFSET = Offset.fromIntSignExtend(BYTES_IN_ADDRESS);
038
039  private static final boolean TRACE = false;
040  private static final boolean TRACE_DETAIL = false;
041  private static final boolean TRACE_BLOCKERS = false;
042
043  /****************************************************************************
044   *
045   * Public instance methods
046   */
047
048  /**
049   * @param name the queue's human-readable name
050   * @param rps the space to get pages from
051   * @param arity the arity (number of words per entry) of this queue
052   */
053  public SharedDeque(String name, RawPageSpace rps, int arity) {
054    this.rps = rps;
055    this.arity = arity;
056    this.name = name;
057    lock = VM.newLock("SharedDeque");
058    clearCompletionFlag();
059    head = HEAD_INITIAL_VALUE;
060    tail = TAIL_INITIAL_VALUE;
061  }
062
063  /** @return the arity (words per entry) of this queue */
064  @Inline
065  final int getArity() {
066    return arity;
067  }
068
069  /**
070   * Enqueue a block on the head or tail of the shared queue
071   *
072   * @param buf the block to enqueue
073   * @param arity the arity of this queue
074   * @param toTail whether to enqueue to the tail of the shared queue
075   */
076  final void enqueue(Address buf, int arity, boolean toTail) {
077    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
078    lock();
079    if (toTail) {
080      // Add to the tail of the queue
081      setNext(buf, Address.zero());
082      if (tail.EQ(TAIL_INITIAL_VALUE))
083        head = buf;
084      else
085        setNext(tail, buf);
086      setPrev(buf, tail);
087      tail = buf;
088    } else {
089      // Add to the head of the queue
090      setPrev(buf, Address.zero());
091      if (head.EQ(HEAD_INITIAL_VALUE))
092        tail = buf;
093      else
094        setPrev(head, buf);
095      setNext(buf, head);
096      head = buf;
097    }
098    bufsenqueued++;
099    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(checkDequeLength(bufsenqueued));
100    unlock();
101  }
102
  /**
   * Drain the deque, freeing every remaining buffer, and then set the
   * completion flag.
   *
   * @param arity the arity of this queue
   */
  public final void clearDeque(int arity) {
104    Address buf = dequeue(arity);
105    while (!buf.isZero()) {
106      free(bufferStart(buf));
107      buf = dequeue(arity);
108    }
109    setCompletionFlag();
110  }
111
  /**
   * Dequeue a buffer from the head of the shared pool without waiting.
   *
   * @param arity the arity of this queue
   * @return the address of the buffer, or zero if none is available
   */
  @Inline
  final Address dequeue(int arity) {
114    return dequeue(arity, false);
115  }
116
117  final Address dequeue(int arity, boolean fromTail) {
118    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
119    return dequeue(false, fromTail);
120  }
121
  /**
   * Dequeue a buffer from the head of the shared pool, waiting for work
   * to arrive if the pool is empty and processing is not yet complete.
   *
   * @param arity the arity of this queue
   * @return the address of the buffer, or zero if processing is complete
   */
  @Inline
  final Address dequeueAndWait(int arity) {
124    return dequeueAndWait(arity, false);
125  }
126
127  final Address dequeueAndWait(int arity, boolean fromTail) {
128    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
129    Address buf = dequeue(false, fromTail);
130    if (buf.isZero() && (!complete())) {
131      buf = dequeue(true, fromTail);  // Wait inside dequeue
132    }
133    return buf;
134  }
135
136  /**
   * Prepare for parallel processing.  All active GC threads will
   * participate, and a blocking dequeue will wait until either new work
   * arrives or every consumer is waiting (at which point the round of
   * processing is complete).
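   * <p>
   * A sketch of the per-collection lifecycle (the {@code processBuffer}
   * call is a hypothetical stand-in for the client's per-buffer work):
   * <pre>{@code
   * pool.prepare();                 // once, before the parallel phase
   * Address buf;
   * while (!(buf = pool.dequeueAndWait(arity)).isZero()) {
   *   processBuffer(buf);           // drain and/or refill buffers
   * }
   * pool.reset();                   // once, after all consumers return
   * }</pre>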
140   */
141  public final void prepare() {
142    if (DISABLE_WAITING) {
143      prepareNonBlocking();
144    } else {
145      /* This should be the normal mode of operation once performance is fixed */
146      prepare(VM.activePlan.collector().parallelWorkerCount());
147    }
148  }
149
150  /**
151   * Prepare for processing where pop operations on the deques
152   * will never block.
153   */
154  public final void prepareNonBlocking() {
155    prepare(1);
156  }
157
158  /**
159   * Prepare for parallel processing where a specific number
160   * of threads take part.
161   *
   * @param consumers the number of threads taking part
163   */
164  private void prepare(int consumers) {
165    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0);
166    setNumConsumers(consumers);
167    clearCompletionFlag();
168  }
169
  /**
   * Reset the deque after a round of processing, asserting that it has
   * been fully drained.
   */
  public final void reset() {
171    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0);
172    clearCompletionFlag();
173    setNumConsumersWaiting(0);
174    assertExhausted();
175  }
176
  /** Assert that the deque has been fully drained (head and tail are zero). */
  public final void assertExhausted() {
178    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero() && tail.isZero());
179  }
180
  /**
   * Allocate a new buffer from the raw page space, failing if the request
   * cannot be satisfied.
   *
   * @return the address of the start of the new buffer
   */
  @Inline
  final Address alloc() {
183    Address rtn = rps.acquire(PAGES_PER_BUFFER);
184    if (rtn.isZero()) {
185      Space.printUsageMB();
186      VM.assertions.fail("Failed to allocate space for queue.  Is metadata virtual memory exhausted?");
187    }
188    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(rtn.EQ(bufferStart(rtn)));
189    return rtn;
190  }
191
  /**
   * Return a buffer's pages to the raw page space.
   *
   * @param buf the address of the start of the buffer
   */
  @Inline
  final void free(Address buf) {
194    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(buf.EQ(bufferStart(buf)) && !buf.isZero());
195    rps.release(buf);
196  }
197
  /** @return the number of pages occupied by the enqueued buffers */
  @Inline
  public final int enqueuedPages() {
200    return bufsenqueued * PAGES_PER_BUFFER;
201  }
202
203  /****************************************************************************
204   *
205   * Private instance methods and fields
206   */
207
208  /** The name of this shared deque - for diagnostics */
209  private final String name;
210
211  /** Raw page space from which to allocate */
212  private final RawPageSpace rps;
213
214  /** Number of words per entry */
215  private final int arity;
216
217  /** Completion flag - set when all consumers have arrived at the barrier */
218  @Entrypoint
219  private volatile int completionFlag;
220
221  /** # active threads - processing is complete when # waiting == this */
222  @Entrypoint
223  private volatile int numConsumers;
224
225  /** # threads waiting */
226  @Entrypoint
227  private volatile int numConsumersWaiting;
228
229  /** Head of the shared deque */
230  @Entrypoint
231  protected volatile Address head;
232
233  /** Tail of the shared deque */
234  @Entrypoint
235  protected volatile Address tail;
236  @Entrypoint
237  private volatile int bufsenqueued;
238  private final Lock lock;
239
  /** Warn about a stalled wait every two seconds */
  private static final long WARN_PERIOD = (long)(2 * 1E9);
  /** Fail after ten warning periods (20 seconds) of waiting */
  private static final long TIMEOUT_PERIOD = 10 * WARN_PERIOD;
242
243  /**
   * Dequeue a block from the shared pool.  If {@code waiting} is true and
   * the queue is empty, wait for either a new block to show up or all the
   * other consumers to join us.
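   * <p>
   * The wait protocol, in outline: a consumer that finds the pool empty
   * increments {@code numConsumersWaiting} and spins; once every consumer
   * is waiting, the completion flag is set and all of them return
   * {@code Address.zero()}.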
247   *
248   * @param waiting whether to wait to dequeue a block if none is present
249   * @param fromTail whether to dequeue from the tail
   * @return the address of the block, or {@code Address.zero()} if no
   *         block is available (when waiting, this means the round of
   *         processing is complete)
251   */
252  private Address dequeue(boolean waiting, boolean fromTail) {
253    lock();
254    Address rtn = ((fromTail) ? tail : head);
255    if (rtn.isZero()) {
256      if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero() && head.isZero());
257      // no buffers available
258      if (waiting) {
        int ordinal = TRACE ? VM.activePlan.collector().getId() : 0;
260        setNumConsumersWaiting(numConsumersWaiting + 1);
261        while (rtn.isZero()) {
262          if (numConsumersWaiting == numConsumers)
263            setCompletionFlag();
264          if (TRACE) {
265            Log.write("-- ("); Log.write(ordinal);
266            Log.write(") joining wait queue of SharedDeque(");
267            Log.write(name); Log.write(") ");
268            Log.write(numConsumersWaiting); Log.write("/");
269            Log.write(numConsumers);
270            Log.write(" consumers waiting");
271            if (complete()) Log.write(" WAIT COMPLETE");
272            Log.writeln();
273            if (TRACE_BLOCKERS)
274              VM.assertions.dumpStack();
275          }
276          unlock();
277          // Spin and wait
278          spinWait(fromTail);
279
280          if (complete()) {
281            if (TRACE) {
282              Log.write("-- ("); Log.write(ordinal); Log.writeln(") EXITING");
283            }
284            lock();
285            setNumConsumersWaiting(numConsumersWaiting - 1);
286            unlock();
287            return Address.zero();
288          }
289          lock();
290          // Re-get the list head/tail while holding the lock
291          rtn = ((fromTail) ? tail : head);
292        }
293        setNumConsumersWaiting(numConsumersWaiting - 1);
294        if (TRACE) {
295          Log.write("-- ("); Log.write(ordinal); Log.write(") resuming work ");
296          Log.write(" n="); Log.writeln(numConsumersWaiting);
297        }
298      } else {
299        unlock();
300        return Address.zero();
301      }
302    }
303    if (fromTail) {
304      // dequeue the tail buffer
305      setTail(getPrev(tail));
306      if (head.EQ(rtn)) {
307        setHead(Address.zero());
308        if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero());
309      } else {
310        setNext(tail, Address.zero());
311      }
312    } else {
313      // dequeue the head buffer
314      setHead(getNext(head));
315      if (tail.EQ(rtn)) {
316        setTail(Address.zero());
317        if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero());
318      } else {
319        setPrev(head, Address.zero());
320      }
321    }
322    bufsenqueued--;
323    unlock();
324    return rtn;
325  }
326
327  /**
   * Spin-wait for GC work to arrive.
   *
   * @param fromTail whether to check the tail (rather than the head) for work
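   * <p>
   * The wait is two-tier: spin on the cheap cycle counter for roughly 1e9
   * cycles at a time, and only then take the lock and consult
   * {@code nanoTime} to decide whether to warn (after {@code WARN_PERIOD})
   * or fail (after {@code TIMEOUT_PERIOD}).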
331   */
332  private void spinWait(boolean fromTail) {
333    long startNano = 0;
334    long lastElapsedNano = 0;
335    while (true) {
336      long startCycles = VM.statistics.cycles();
337      long endCycles = startCycles + ((long) 1e9); // a few hundred milliseconds more or less.
338      long nowCycles;
339      do {
340        VM.memory.isync();
341        Address rtn = ((fromTail) ? tail : head);
342        if (!rtn.isZero() || complete()) return;
343        nowCycles = VM.statistics.cycles();
344      } while (startCycles < nowCycles && nowCycles < endCycles); /* check against both ends to guard against CPU migration */
345
346      /*
347       * According to the cycle counter, we've been spinning for a while.
348       * Time to check nanoTime and see if we should print a warning and/or fail.
349       * We lock the deque while doing this to avoid interleaved messages from multiple threads.
350       */
351      lock();
352      if (startNano == 0) {
353        startNano = VM.statistics.nanoTime();
354      } else {
355        long nowNano = VM.statistics.nanoTime();
356        long elapsedNano = nowNano - startNano;
357        if (elapsedNano - lastElapsedNano > WARN_PERIOD) {
358          Log.write("GC Warning: SharedDeque("); Log.write(name);
359          Log.write(") wait has reached "); Log.write(VM.statistics.nanosToSecs(elapsedNano));
360          Log.write(", "); Log.write(numConsumersWaiting); Log.write("/");
361          Log.write(numConsumers); Log.writeln(" threads waiting");
362          lastElapsedNano = elapsedNano;
363        }
364        if (elapsedNano > TIMEOUT_PERIOD) {
365          unlock();   // To allow other GC threads to die in turn
366          VM.assertions.fail("GC Error: SharedDeque Timeout");
367        }
368      }
369      unlock();
370    }
371  }
372
373  /**
374   * Set the "next" pointer in a buffer forming the linked buffer chain.
375   *
376   * @param buf The buffer whose next field is to be set.
377   * @param next The reference to which next should point.
378   */
379  private static void setNext(Address buf, Address next) {
380    buf.store(next, NEXT_OFFSET);
381  }
382
383  /**
384   * Get the "next" pointer in a buffer forming the linked buffer chain.
385   *
386   * @param buf The buffer whose next field is to be returned.
387   * @return The next field for this buffer.
388   */
389  protected final Address getNext(Address buf) {
390    return buf.loadAddress(NEXT_OFFSET);
391  }
392
393  /**
394   * Set the "prev" pointer in a buffer forming the linked buffer chain.
395   *
   * @param buf The buffer whose prev field is to be set.
397   * @param prev The reference to which prev should point.
398   */
  private static void setPrev(Address buf, Address prev) {
400    buf.store(prev, PREV_OFFSET);
401  }
402
403  /**
404   * Get the "next" pointer in a buffer forming the linked buffer chain.
405   *
406   * @param buf The buffer whose next field is to be returned.
407   * @return The next field for this buffer.
408   */
409  protected final Address getPrev(Address buf) {
410    return buf.loadAddress(PREV_OFFSET);
411  }
412
413  /**
414   * Check the number of buffers in the work queue (for debugging
415   * purposes).
416   *
417   * @param length The number of buffers believed to be in the queue.
   * @return {@code true} if the length of the queue matches {@code length}.
419   */
420  private boolean checkDequeLength(int length) {
421    Address top = head;
422    int l = 0;
423    while (!top.isZero() && l <= length) {
424      top = getNext(top);
425      l++;
426    }
427    return l == length;
428  }
429
430  /**
431   * Lock this shared queue.  We use one simple low-level lock to
432   * synchronize access to the shared queue of buffers.
433   */
434  private void lock() {
435    lock.acquire();
436  }
437
438  /**
439   * Release the lock.  We use one simple low-level lock to synchronize
440   * access to the shared queue of buffers.
441   */
442  private void unlock() {
443    lock.release();
444  }
445
446  /**
447   * @return whether the current round of processing is complete
448   */
449  private boolean complete() {
450    return completionFlag == 1;
451  }
452
453  /**
454   * Set the completion flag.
455   */
456  @Inline
457  private void setCompletionFlag() {
458    if (TRACE_DETAIL) {
459      Log.writeln("# setCompletionFlag: ");
460    }
461    completionFlag = 1;
462  }
463
464  /**
465   * Clear the completion flag.
466   */
467  @Inline
468  private void clearCompletionFlag() {
469    if (TRACE_DETAIL) {
470      Log.writeln("# clearCompletionFlag: ");
471    }
472    completionFlag = 0;
473  }
474
475  @Inline
476  private void setNumConsumers(int newNumConsumers) {
477    if (TRACE_DETAIL) {
478      Log.write("# Num consumers "); Log.writeln(newNumConsumers);
479    }
480    numConsumers = newNumConsumers;
481  }
482
483  @Inline
484  private void setNumConsumersWaiting(int newNCW) {
485    if (TRACE_DETAIL) {
486      Log.write("# Num consumers waiting "); Log.writeln(newNCW);
487    }
488    numConsumersWaiting = newNCW;
489  }
490
491  @Inline
492  private void setHead(Address newHead) {
493    head = newHead;
494    VM.memory.sync();
495  }
496
497  @Inline
498  private void setTail(Address newTail) {
499    tail = newTail;
500    VM.memory.sync();
501  }
502}