return ( domain != null ) ? domain : host;
}
/**
 * Render a multi-line text snapshot of this queue for diagnostics:
 * one summary line (identity hash, order, acquired and host counts)
 * followed by one line per entry in _hosts.
 * @return the formatted dump text
 */
synchronized String dump()
{
    final StringBuilder text = new StringBuilder(4096);
    final long now = System.currentTimeMillis();

    String summary = String.format(
        "VisitQueue@%x Dump, orders %d, acq %d, hosts %d ::\n",
        System.identityHashCode( this ),
        orderCount(),
        acquiredCount(),
        hostCount() );
    text.append( summary );

    for( HostQueue hq : _hosts.values() ) {
        // 'R' / 'S' flag membership in the ready and sleep queues.
        final char readyFlag = _readyHosts.contains( hq ) ? 'R' : ' ';
        final char sleepFlag = _sleepHosts.contains( hq ) ? 'S' : ' ';

        String line = String.format(
            "%20s size %4d, acq %1d, next %3dms, %c %c\n",
            hq.host(),
            hq.size(),
            hq.accessCount(),
            hq.nextVisit() - now, // time remaining relative to now
            readyFlag,
            sleepFlag );
        text.append( line );
    }

    return text.toString();
}
/**
 * Take the next ready/highest priority host queue. May block up
 * to maxWait for the next ready queue.
 *
 * On each pass, sleeping hosts whose next visit time has arrived
 * are moved to the ready queue. If none are ready, this waits on
 * this object's monitor until the earliest sleeping host is due
 * or maxWait is exhausted, whichever comes first.
 *
 * @param maxWait maximum wait in milliseconds
 * @return HostQueue or null if maxWait exceeded
 * @throws InterruptedException if interrupted while waiting
 */
private synchronized HostQueue take( long maxWait )
    throws InterruptedException
{
    long now = System.currentTimeMillis();
    HostQueue ready = null;

    while( ( ( ready = _readyHosts.poll() ) == null ) &&
           ( maxWait > 0 ) ) {

        // Promote every sleeping host that is now due. On exit, next
        // is the earliest host that is still not due, or null.
        HostQueue next = null;
        while( ( next = _sleepHosts.peek() ) != null ) {
            if( ( now - next.nextVisit() ) >= 0 ) {
                addReady( _sleepHosts.remove() );
            }
            else break;
        }

        if( _readyHosts.isEmpty() ) {
            long delay = maxWait;
            if( next != null ) {
                // +1 so that we wake just after the host becomes due.
                delay = Math.min( next.nextVisit() - now + 1, maxWait );
            }
            wait( delay );
        }

        // Declared at loop scope (not inside the if above) so the
        // elapsed time is charged against maxWait on every pass,
        // whether or not we actually waited.
        long nextNow = System.currentTimeMillis();
        maxWait -= nextNow - now;
        now = nextNow;
    }

    if( ready != null ) ready.setLastTake( now );
    return ready;
}
/**
 * If the given host queue has no accesses and no orders, decrement
 * the host count. The queue is additionally removed from _hosts
 * only when its min host delay and max access count both equal the
 * defaults held by this class.
 * @param queue the host queue to check
 */
private void checkRemove( HostQueue queue )
{
    // Still accessed or still holding orders: leave everything as is.
    if( ( queue.accessCount() != 0 ) || ( queue.size() != 0 ) ) {
        return;
    }

    --_hostCount;

    // Queues with non-default settings stay registered in _hosts.
    if( queue.minHostDelay() != _defaultMinHostDelay ) {
        return;
    }
    if( queue.maxAccessCount() != _defaultMaxAccessPerHost ) {
        return;
    }

    _hosts.remove( queue.host() );
}
/**
 * Put the given host queue onto the sleep queue when it is
 * available and holds at least one order; otherwise do nothing.
 * @param queue the host queue to re-queue
 */
private void untakeImpl( HostQueue queue )
{
    if( ! queue.isAvailable() ) {
        return;
    }
    if( queue.size() > 0 ) {
        addSleep( queue );
    }
}
/**
 * Add an order to the host queue selected by orderKey(), creating
 * and registering that queue with the default settings if it does
 * not exist yet. Updates the ready queue and the host and order
 * counters accordingly.
 * @param order the order to add
 */
private void privAdd( UniMap order )
{
    final String key = orderKey( order );

    HostQueue target = _hosts.get( key );
    if( target == null ) {
        target = new HostQueue( key,
                                _defaultMinHostDelay,
                                _defaultMaxAccessPerHost );
        _hosts.put( key, target );
    }

    target.add( order );

    // A size of exactly one means the queue was empty before this add.
    final boolean wasEmpty = ( target.size() == 1 );
    if( wasEmpty ) {
        if( target.isAvailable() ) {
            addReady( target );
        }
        if( target.accessCount() == 0 ) {
            ++_hostCount;
        }
    }

    ++_orderCount;
}
/**
 * Add the given host queue to the ready queue.
 * When debug logging is enabled, the add is logged and the extra
 * checkAdd() consistency checks are run first.
 * @param queue the host queue to add
 * @throws IllegalStateException if the queue is not available
 */
private void addReady( HostQueue queue )
{
    final boolean debugging = _log.isDebugEnabled();
    if( debugging ) {
        _log.debug( "addReady: {} {}", queue.host(), queue.size() );
        checkAdd( queue );
    }

    if( queue.isAvailable() ) {
        _readyHosts.add( queue );
    }
    else {
        throw new IllegalStateException( "Unavailable addReady!");
    }
}
/**
 * Add the given host queue to the sleep queue and notify all
 * threads waiting on this object's monitor.
 * When debug logging is enabled, the add is logged and the extra
 * checkAdd() consistency checks are run first.
 * NOTE(review): notifyAll() requires the calling thread to hold
 * this object's monitor; callers are not visible here -- confirm.
 * @param queue the host queue to add
 * @throws IllegalStateException if the queue is not available
 */
private void addSleep( HostQueue queue )
{
    final boolean debugging = _log.isDebugEnabled();
    if( debugging ) {
        _log.debug( "addSleep: {} {}", queue.host(), queue.size() );
        checkAdd( queue );
    }

    if( queue.isAvailable() ) {
        _sleepHosts.add( queue );
        notifyAll();
    }
    else {
        throw new IllegalStateException( "Unavailable addSleep!");
    }
}
/**
 * Consistency check run before a host queue is added to the ready
 * or sleep queue: the queue must not already be in either one and
 * must not be empty.
 * @param queue the host queue about to be added
 * @throws IllegalStateException on the first violated condition
 */
private void checkAdd( HostQueue queue )
    throws IllegalStateException
{
    // Checks run in this order; the first failure decides the message.
    final String problem;
    if( _readyHosts.contains( queue ) ) {
        problem = "Already ready!";
    }
    else if( _sleepHosts.contains( queue ) ) {
        problem = "Already sleeping!";
    }
    else if( queue.size() == 0 ) {
        problem = "Adding empty queue!";
    }
    else {
        problem = null;
    }

    if( problem != null ) {
        throw new IllegalStateException( problem );
    }
}
// Settings passed to each HostQueue created by privAdd().
private int _defaultMinHostDelay = 500; //ms
private int _defaultMaxAccessPerHost = 1;

// Running counters maintained by the add/remove helpers.
private int _orderCount = 0;
private int _acquiredCount = 0;
private int _hostCount = 0;

// Host queues keyed by the String returned from orderKey().
// Type parameters are required: callers read HostQueue values
// from this map without casts.
private final Map<String, HostQueue> _hosts =
    new HashMap<String, HostQueue>( 2048 );

// Host queues ready to be taken, ordered by TopOrderComparator.
private PriorityQueue<HostQueue> _readyHosts =
    new PriorityQueue<HostQueue>( 1024, new HostQueue.TopOrderComparator());

// Host queues waiting for their next visit time, ordered by
// NextVisitComparator.
private PriorityQueue<HostQueue> _sleepHosts =
    new PriorityQueue<HostQueue>( 128, new HostQueue.NextVisitComparator());

private Logger _log = LoggerFactory.getLogger( getClass() );
} |