001/*
002 *  This file is part of the Jikes RVM project (http://jikesrvm.org).
003 *
004 *  This file is licensed to You under the Eclipse Public License (EPL);
005 *  You may not use this file except in compliance with the License. You
006 *  may obtain a copy of the License at
007 *
008 *      http://www.opensource.org/licenses/eclipse-1.0.php
009 *
010 *  See the COPYRIGHT.txt file distributed with this work for information
011 *  regarding copyright ownership.
012 */
013package org.mmtk.utility.deque;
014
015import static org.mmtk.utility.Constants.*;
016
017import org.mmtk.vm.VM;
018
019import org.vmmagic.pragma.*;
020import org.vmmagic.unboxed.*;
021
022/**
023 * This class implements a local (<i>unsynchronized</i>) queue.
024 * A queue is strictly FIFO.<p>
025 *
026 * Each instance stores word-sized values into a local buffer.  When
027 * the buffer is full, or if the <code>flushLocal()</code> method is
028 * called, the buffer enqueued at the tail of a
029 * <code>SharedDeque</code>.
030 *
031 * The implementation is intended to be as efficient as possible, in
032 * time and space, and is the basis for the <code>TraceBuffer</code> used by
033 * heap trace generation. Each instance adds a single field to those inherited
034 * from the SSB: a bump pointer.
035 *
036 * Preconditions: Buffers are always aligned on buffer-size address
037 * boundaries.<p>
038 *
039 * Invariants: Buffers are filled such that tuples (of the specified
040 * arity) are packed to the low end of the buffer.  Thus buffer
041 * underflows will always arise when then cursor is buffer-size aligned.
042 */
043@Uninterruptible class LocalQueue extends LocalSSB {
044
045  /**
046   * Constructor
047   *
048   * @param queue The shared queue to which this local ssb will append
049   * its buffers (when full or flushed).
050   */
051  LocalQueue(SharedDeque queue) {
052    super(queue);
053  }
054
055 /****************************************************************************
056   *
057   * Protected instance methods and fields
058   */
059
060  /** the start of the buffer */
061  @Entrypoint
062  protected Address head;
063
064  @Override
065  public void resetLocal() {
066    super.resetLocal();
067    head = Deque.HEAD_INITIAL_VALUE;
068  }
069
070  /**
071   * Check whether there are values in the buffer for a pending dequeue.
072   * If there is not data, grab the first buffer on the shared queue
073   * (after freeing the buffer).
074   *
075   * @param arity The arity of the values stored in this queue: the
076   * buffer must contain enough space for this many words.
077   * @return whether there are values available for a dequeue
078   */
079  @Inline
080  protected final boolean checkDequeue(int arity) {
081    if (bufferOffset(head).isZero()) {
082      return dequeueUnderflow(arity);
083    } else {
084      if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(bufferOffset(head).sGE(Word.fromIntZeroExtend(arity).lsh(LOG_BYTES_IN_ADDRESS).toOffset()));
085      return true;
086    }
087  }
088
089  /**
090   * Dequeue a value from the buffer.  This is <i>unchecked</i>.  The
091   * caller must first call <code>checkDequeue()</code> to ensure the
092   * buffer has and entry to be removed.
093   *
094   * @return The first entry on the queue.
095   */
096  @Inline
097  protected final Address uncheckedDequeue() {
098    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(bufferOffset(head).sGE(Offset.fromIntZeroExtend(BYTES_IN_ADDRESS)));
099    head = head.minus(BYTES_IN_ADDRESS);
100    return head.loadAddress();
101  }
102
103  /**
104   * The head is empty (or null), and the shared queue has no buffers
105   * available.  If the tail has sufficient entries, consume the tail.
106   * Otherwise try wait on the global queue until either all other
107   * clients of the queue reach exhaustion or a buffer becomes
108   * available.
109   *
110   * @param arity The arity of this buffer
111   * @return True if the consumer has eaten all the entries
112   */
113  protected final boolean headStarved(int arity) {
114    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == queue.getArity());
115
116    // If the tail has entries...
117    if (tail.NE(tailBufferEnd)) {
118      head = normalizeTail(arity).plus(BYTES_IN_ADDRESS);
119      tail = Deque.TAIL_INITIAL_VALUE;
120      tailBufferEnd = Deque.TAIL_INITIAL_VALUE;
121      // Return that we acquired more entries
122      return false;
123    }
124    // Wait for another entry to materialize...
125    head = queue.dequeueAndWait(arity);
126    // return true if a) there is a head buffer, and b) it is non-empty
127    return (head.EQ(Deque.HEAD_INITIAL_VALUE) || bufferOffset(head).isZero());
128  }
129
130  /****************************************************************************
131   *
132   * Private instance methods
133   */
134
135  /**
136   * There are not sufficient entries in the head buffer for a pending
137   * dequeue.  Acquire a new head buffer.  If the shared queue has no
138   * buffers available, consume the tail if necessary.  Return false
139   * if entries cannot be acquired.
140   *
141   * @param arity The arity of this buffer (used for sanity test only).
142   * @return True if there the head buffer has been successfully
143   * replenished.
144   */
145  @NoInline
146  private boolean dequeueUnderflow(int arity) {
147    if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == queue.getArity());
148    do {
149      if (head.NE(Deque.HEAD_INITIAL_VALUE))
150        queue.free(head);
151      head = queue.dequeue(arity);
152    } while (head.NE(Deque.HEAD_INITIAL_VALUE) && bufferOffset(head).isZero());
153
154    if (head.EQ(Deque.HEAD_INITIAL_VALUE))
155      return !headStarved(arity);
156
157    return true;
158  }
159}