Projects >> java-memcached-client >>49ea78144e4390b490e480330dd69137b67afe1e

Chunk
Conflicting content
        getOperationTimeout());
  }

<<<<<<< HEAD
  @Override
  public OperationFactory getOperationFactory() {
    return new BinaryOperationFactory();
  }
=======
	@Override
	protected String getName() {
		return "BinaryConnectionFactory";
	}

>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
        getOperationTimeout());
  }

  @Override
  public OperationFactory getOperationFactory() {
    return new BinaryOperationFactory();
  }

  @Override
  protected String getName() {
    return "BinaryConnectionFactory";
  }
}
File
BinaryConnectionFactory.java
Developer's decision
Concatenation
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
   */

 *
 * 

*/ <<<<<<< HEAD public class DefaultConnectionFactory extends SpyObject implements ConnectionFactory { /** * Default failure mode. */ public static final FailureMode DEFAULT_FAILURE_MODE = FailureMode.Redistribute; /** * Default hash algorithm. */ public static final HashAlgorithm DEFAULT_HASH = DefaultHashAlgorithm.NATIVE_HASH; /** * Maximum length of the operation queue returned by this connection factory. */ public static final int DEFAULT_OP_QUEUE_LEN = 16384; /** * The maximum time to block waiting for op queue operations to complete, in * milliseconds. The default has been set with the expectation that most * requests are interactive and waiting for more than a few seconds is thus * more undesirable than failing the request. */ public static final long DEFAULT_OP_QUEUE_MAX_BLOCK_TIME = TimeUnit.SECONDS.toMillis(10); /** * The read buffer size for each server connection from this factory. /* * (non-Javadoc) * return true; } public static final int DEFAULT_READ_BUFFER_SIZE = 16384; /** * Default operation timeout in milliseconds. */ public static final long DEFAULT_OPERATION_TIMEOUT = 2500; /** * Maximum amount of time (in seconds) to wait between reconnect attempts. */ public static final long DEFAULT_MAX_RECONNECT_DELAY = 30; /** * Maximum number + 2 of timeout exception for shutdown connection. */ public static final int DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD = 998; protected final int opQueueLen; private final int readBufSize; private final HashAlgorithm hashAlg; /** * Construct a DefaultConnectionFactory with the given parameters. * * @param qLen the queue length. * @param bufSize the buffer size * @param hash the algorithm to use for hashing */ public DefaultConnectionFactory(int qLen, int bufSize, HashAlgorithm hash) { super(); opQueueLen = qLen; readBufSize = bufSize; hashAlg = hash; } /** * Create a DefaultConnectionFactory with the given maximum operation queue * length, and the given read buffer size. 
*/ public DefaultConnectionFactory(int qLen, int bufSize) { this(qLen, bufSize, DEFAULT_HASH); } /** * Create a DefaultConnectionFactory with the default parameters. */ public DefaultConnectionFactory() { this(DEFAULT_OP_QUEUE_LEN, DEFAULT_READ_BUFFER_SIZE); } public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c, int bufSize) { OperationFactory of = getOperationFactory(); if (of instanceof AsciiOperationFactory) { return new AsciiMemcachedNodeImpl(sa, c, bufSize, createReadOperationQueue(), createWriteOperationQueue(), createOperationQueue(), getOpQueueMaxBlockTime(), getOperationTimeout()); } else if (of instanceof BinaryOperationFactory) { boolean doAuth = false; if (getAuthDescriptor() != null) { doAuth = true; } return new BinaryMemcachedNodeImpl(sa, c, bufSize, createReadOperationQueue(), createWriteOperationQueue(), createOperationQueue(), getOpQueueMaxBlockTime(), doAuth, getOperationTimeout()); } else { throw new IllegalStateException("Unhandled operation factory type " + of); } } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#createConnection(java.util.List) */ public MemcachedConnection createConnection(List addrs) throws IOException { return new MemcachedConnection(getReadBufSize(), this, addrs, getInitialObservers(), getFailureMode(), getOperationFactory()); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getFailureMode() */ public FailureMode getFailureMode() { return DEFAULT_FAILURE_MODE; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#createOperationQueue() */ public BlockingQueue createOperationQueue() { return new ArrayBlockingQueue(getOpQueueLen()); } * @see net.spy.memcached.ConnectionFactory#createReadOperationQueue() */ public BlockingQueue createReadOperationQueue() { return new LinkedBlockingQueue(); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#createWriteOperationQueue() */ public BlockingQueue createWriteOperationQueue() { return new 
LinkedBlockingQueue(); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#createLocator(java.util.List) */ public NodeLocator createLocator(List nodes) { return new ArrayModNodeLocator(nodes, getHashAlg()); } /** * Get the op queue length set at construct time. */ public int getOpQueueLen() { return opQueueLen; } /** * @return the maximum time to block waiting for op queue operations to * complete, in milliseconds, or null for no waiting. */ public long getOpQueueMaxBlockTime() { return DEFAULT_OP_QUEUE_MAX_BLOCK_TIME; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getReadBufSize() */ public int getReadBufSize() { return readBufSize; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getHashAlg() */ public HashAlgorithm getHashAlg() { return hashAlg; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getOperationFactory() */ public OperationFactory getOperationFactory() { return new AsciiOperationFactory(); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getOperationTimeout() */ public long getOperationTimeout() { return DEFAULT_OPERATION_TIMEOUT; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#isDaemon() */ public boolean isDaemon() { return false; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getInitialObservers() */ public Collection getInitialObservers() { return Collections.emptyList(); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getDefaultTranscoder() */ public Transcoder getDefaultTranscoder() { return new SerializingTranscoder(); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#useNagleAlgorithm() */ public boolean useNagleAlgorithm() { return false; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#shouldOptimize() */ public boolean shouldOptimize() { */ /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getMaxReconnectDelay() */ public long getMaxReconnectDelay() { 
return DEFAULT_MAX_RECONNECT_DELAY; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getAuthDescriptor() */ public AuthDescriptor getAuthDescriptor() { return null; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getTimeoutExceptionThreshold() */ public int getTimeoutExceptionThreshold() { return DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD; } ======= public class DefaultConnectionFactory extends SpyObject implements ConnectionFactory { /** * Default failure mode. */ public static final FailureMode DEFAULT_FAILURE_MODE = FailureMode.Redistribute; /** * Default hash algorithm. */ public static final HashAlgorithm DEFAULT_HASH = HashAlgorithm.NATIVE_HASH; /** * Maximum length of the operation queue returned by this connection * factory. public static final int DEFAULT_OP_QUEUE_LEN=16384; /** * The maximum time to block waiting for op queue operations to complete, * in milliseconds. The default has been set with the expectation that * most requests are interactive and waiting for more than a few seconds * is thus more undesirable than failing the request. */ public static final long DEFAULT_OP_QUEUE_MAX_BLOCK_TIME = TimeUnit.SECONDS.toMillis(10); /** * The read buffer size for each server connection from this factory. */ public static final int DEFAULT_READ_BUFFER_SIZE=16384; /** * Default operation timeout in milliseconds. */ public static final long DEFAULT_OPERATION_TIMEOUT = 2500; /** * Maximum amount of time (in seconds) to wait between reconnect attempts. */ public static final long DEFAULT_MAX_RECONNECT_DELAY = 30; /** * Maximum number + 2 of timeout exception for shutdown connection */ public static final int DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD = 998; private final int opQueueLen; private final int readBufSize; private final HashAlgorithm hashAlg; /** * Construct a DefaultConnectionFactory with the given parameters. * * @param qLen the queue length. 
* @param bufSize the buffer size * @param hash the algorithm to use for hashing */ public DefaultConnectionFactory(int qLen, int bufSize, HashAlgorithm hash) { super(); opQueueLen=qLen; readBufSize=bufSize; hashAlg=hash; } /** * Create a DefaultConnectionFactory with the given maximum operation * queue length, and the given read buffer size. */ public DefaultConnectionFactory(int qLen, int bufSize) { this(qLen, bufSize, DEFAULT_HASH); } /** * Create a DefaultConnectionFactory with the default parameters. */ public DefaultConnectionFactory() { this(DEFAULT_OP_QUEUE_LEN, DEFAULT_READ_BUFFER_SIZE); } public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c, int bufSize) { OperationFactory of = getOperationFactory(); if(of instanceof AsciiOperationFactory) { return new AsciiMemcachedNodeImpl(sa, c, bufSize, createReadOperationQueue(), createWriteOperationQueue(), createOperationQueue(), getOpQueueMaxBlockTime(), getOperationTimeout()); } else if(of instanceof BinaryOperationFactory) { boolean doAuth = false; if (getAuthDescriptor() != null) { doAuth = true; } return new BinaryMemcachedNodeImpl(sa, c, bufSize, createReadOperationQueue(), createWriteOperationQueue(), createOperationQueue(), getOpQueueMaxBlockTime(), doAuth, getOperationTimeout()); } else { throw new IllegalStateException( "Unhandled operation factory type " + of); } } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#createConnection(java.util.List) */ public MemcachedConnection createConnection(List addrs) throws IOException { return new MemcachedConnection(getReadBufSize(), this, addrs, getInitialObservers(), getFailureMode(), getOperationFactory()); } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#getFailureMode() */ public FailureMode getFailureMode() { return DEFAULT_FAILURE_MODE; } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#createOperationQueue() */ public BlockingQueue createOperationQueue() { return new ArrayBlockingQueue(getOpQueueLen()); 
} /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#createReadOperationQueue() */ public BlockingQueue createReadOperationQueue() { return new LinkedBlockingQueue(); } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#createWriteOperationQueue() */ public BlockingQueue createWriteOperationQueue() { return new LinkedBlockingQueue(); } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#createLocator(java.util.List) */ public NodeLocator createLocator(List nodes) { return new ArrayModNodeLocator(nodes, getHashAlg()); } /** * Get the op queue length set at construct time. */ public int getOpQueueLen() { return opQueueLen; } /** * @return the maximum time to block waiting for op queue operations to * complete, in milliseconds, or null for no waiting. */ public long getOpQueueMaxBlockTime() { return DEFAULT_OP_QUEUE_MAX_BLOCK_TIME; } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#getReadBufSize() */ public int getReadBufSize() { return readBufSize; } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#getHashAlg() */ public HashAlgorithm getHashAlg() { return hashAlg; } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#getOperationFactory() */ public OperationFactory getOperationFactory() { return new AsciiOperationFactory(); } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#getOperationTimeout() */ public long getOperationTimeout() { return DEFAULT_OPERATION_TIMEOUT; } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#isDaemon() */ public boolean isDaemon() { return false; } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#getInitialObservers() */ public Collection getInitialObservers() { return Collections.emptyList(); } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#getDefaultTranscoder() */ public Transcoder getDefaultTranscoder() { return new SerializingTranscoder(); } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#useNagleAlgorithm() */ public 
boolean useNagleAlgorithm() { return false; } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#shouldOptimize() */ public boolean shouldOptimize() { return true; } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#getMaxReconnectDelay() */ public long getMaxReconnectDelay() { return DEFAULT_MAX_RECONNECT_DELAY; } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#getAuthDescriptor() */ public AuthDescriptor getAuthDescriptor() { return null; } /* (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#getTimeoutExceptionThreshold() */ public int getTimeoutExceptionThreshold() { return DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD; } protected String getName() { return "DefaultConnectionFactory"; } @Override public String toString() { return "Failure Mode: " + getFailureMode().name() + ", Hash Algorithm: " + getHashAlg().name() + " Max Reconnect Delay: " + getMaxReconnectDelay() + ", Max Op Timeout: " + getOperationTimeout() + ", Op Queue Length: " + getOpQueueLen() + ", Op Max Queue Block Time" + getOpQueueMaxBlockTime() + ", Max Timeout Exception Threshold: " + getTimeoutExceptionThreshold() + ", Read Buffer Size: " + getReadBufSize() + ", Transcoder: " + getDefaultTranscoder() + ", Operation Factory: " + getOperationFactory() + " isDaemon: " + isDaemon() + ", Optimized: " + shouldOptimize() + ", Using Nagle: " + useNagleAlgorithm() + ", ConnectionFactory: " + getName(); } >>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd }
Solution content
  }

   *
 *
 * 

*/ public class DefaultConnectionFactory extends SpyObject implements ConnectionFactory { /** * Default failure mode. */ public static final FailureMode DEFAULT_FAILURE_MODE = FailureMode.Redistribute; /** * Default hash algorithm. */ public static final HashAlgorithm DEFAULT_HASH = DefaultHashAlgorithm.NATIVE_HASH; /** * Maximum length of the operation queue returned by this connection factory. */ public static final int DEFAULT_OP_QUEUE_LEN = 16384; /** * The maximum time to block waiting for op queue operations to complete, in * milliseconds. The default has been set with the expectation that most * requests are interactive and waiting for more than a few seconds is thus * more undesirable than failing the request. */ public static final long DEFAULT_OP_QUEUE_MAX_BLOCK_TIME = TimeUnit.SECONDS.toMillis(10); /** * The read buffer size for each server connection from this factory. */ public static final int DEFAULT_READ_BUFFER_SIZE = 16384; /** return true; } * Default operation timeout in milliseconds. */ public static final long DEFAULT_OPERATION_TIMEOUT = 2500; /** * Maximum amount of time (in seconds) to wait between reconnect attempts. */ public static final long DEFAULT_MAX_RECONNECT_DELAY = 30; /** * Maximum number + 2 of timeout exception for shutdown connection. */ public static final int DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD = 998; protected final int opQueueLen; private final int readBufSize; private final HashAlgorithm hashAlg; /** * Construct a DefaultConnectionFactory with the given parameters. * * @param qLen the queue length. * @param bufSize the buffer size * @param hash the algorithm to use for hashing */ public DefaultConnectionFactory(int qLen, int bufSize, HashAlgorithm hash) { super(); opQueueLen = qLen; readBufSize = bufSize; hashAlg = hash; } /** * Create a DefaultConnectionFactory with the given maximum operation queue * length, and the given read buffer size. 
*/ public DefaultConnectionFactory(int qLen, int bufSize) { this(qLen, bufSize, DEFAULT_HASH); } /** * Create a DefaultConnectionFactory with the default parameters. */ public DefaultConnectionFactory() { this(DEFAULT_OP_QUEUE_LEN, DEFAULT_READ_BUFFER_SIZE); } public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c, int bufSize) { OperationFactory of = getOperationFactory(); if (of instanceof AsciiOperationFactory) { return new AsciiMemcachedNodeImpl(sa, c, bufSize, createReadOperationQueue(), createWriteOperationQueue(), createOperationQueue(), getOpQueueMaxBlockTime(), getOperationTimeout()); } else if (of instanceof BinaryOperationFactory) { boolean doAuth = false; if (getAuthDescriptor() != null) { doAuth = true; return new BinaryMemcachedNodeImpl(sa, c, bufSize, createReadOperationQueue(), createWriteOperationQueue(), createOperationQueue(), getOpQueueMaxBlockTime(), doAuth, getOperationTimeout()); } else { throw new IllegalStateException("Unhandled operation factory type " + of); } } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#createConnection(java.util.List) */ public MemcachedConnection createConnection(List addrs) throws IOException { return new MemcachedConnection(getReadBufSize(), this, addrs, getInitialObservers(), getFailureMode(), getOperationFactory()); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getFailureMode() */ public FailureMode getFailureMode() { return DEFAULT_FAILURE_MODE; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#createOperationQueue() */ public BlockingQueue createOperationQueue() { return new ArrayBlockingQueue(getOpQueueLen()); } /* * (non-Javadoc) * @see net.spy.memcached.ConnectionFactory#createReadOperationQueue() */ public BlockingQueue createReadOperationQueue() { return new LinkedBlockingQueue(); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#createWriteOperationQueue() */ public BlockingQueue createWriteOperationQueue() { return 
new LinkedBlockingQueue(); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#createLocator(java.util.List) */ public NodeLocator createLocator(List nodes) { return new ArrayModNodeLocator(nodes, getHashAlg()); } /** * Get the op queue length set at construct time. */ public int getOpQueueLen() { return opQueueLen; } /** * @return the maximum time to block waiting for op queue operations to * complete, in milliseconds, or null for no waiting. */ public long getOpQueueMaxBlockTime() { return DEFAULT_OP_QUEUE_MAX_BLOCK_TIME; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getReadBufSize() */ public int getReadBufSize() { return readBufSize; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getHashAlg() */ public HashAlgorithm getHashAlg() { return hashAlg; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getOperationFactory() */ public OperationFactory getOperationFactory() { return new AsciiOperationFactory(); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getOperationTimeout() */ public long getOperationTimeout() { return DEFAULT_OPERATION_TIMEOUT; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#isDaemon() */ public boolean isDaemon() { return false; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getInitialObservers() */ public Collection getInitialObservers() { return Collections.emptyList(); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getDefaultTranscoder() */ public Transcoder getDefaultTranscoder() { return new SerializingTranscoder(); } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#useNagleAlgorithm() */ public boolean useNagleAlgorithm() { return false; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#shouldOptimize() */ public boolean shouldOptimize() { /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getMaxReconnectDelay() */ public long getMaxReconnectDelay() { 
return DEFAULT_MAX_RECONNECT_DELAY; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getAuthDescriptor() */ public AuthDescriptor getAuthDescriptor() { return null; } /* * (non-Javadoc) * * @see net.spy.memcached.ConnectionFactory#getTimeoutExceptionThreshold() */ public int getTimeoutExceptionThreshold() { return DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD; } protected String getName() { return "DefaultConnectionFactory"; } @Override public String toString() { return "Failure Mode: " + getFailureMode().name() + ", Hash Algorithm: " + ((DefaultHashAlgorithm)getHashAlg()).name() + " Max Reconnect Delay: " + getMaxReconnectDelay() + ", Max Op Timeout: " + getOperationTimeout() + ", Op Queue Length: " + getOpQueueLen() + ", Op Max Queue Block Time" + getOpQueueMaxBlockTime() + ", Max Timeout Exception Threshold: " + getTimeoutExceptionThreshold() + ", Read Buffer Size: " + getReadBufSize() + ", Transcoder: " + getDefaultTranscoder() + ", Operation Factory: " + getOperationFactory() + " isDaemon: " + isDaemon() + ", Optimized: " + shouldOptimize() + ", Using Nagle: " + useNagleAlgorithm() + ", ConnectionFactory: " + getName(); } }
File
DefaultConnectionFactory.java
Developer's decision
Manual
Kind of conflict
Annotation
Attribute
Class signature
Comment
Method declaration
Method invocation
Chunk
Conflicting content
 *      }
 * 
*/ <<<<<<< HEAD public class MemcachedClient extends SpyObject implements MemcachedClientIF, ConnectionObserver { protected volatile boolean shuttingDown = false; protected final long operationTimeout; protected final MemcachedConnection mconn; protected final OperationFactory opFact; protected final Transcoder transcoder; protected final TranscodeService tcService; protected final AuthDescriptor authDescriptor; protected final ConnectionFactory connFactory; protected final AuthThreadMonitor authMonitor = new AuthThreadMonitor(); /** * Get a memcache client operating on the specified memcached locations. * * @param ia the memcached locations * @throws IOException if connections cannot be established */ public MemcachedClient(InetSocketAddress... ia) throws IOException { this(new DefaultConnectionFactory(), Arrays.asList(ia)); } /** * Get a memcache client over the specified memcached locations. * * @param addrs the socket addrs * @throws IOException if connections cannot be established */ public MemcachedClient(List addrs) throws IOException { this(new DefaultConnectionFactory(), addrs); } /** * Get a memcache client over the specified memcached locations. 
* * @param cf the connection factory to configure connections for this client * @param addrs the socket addresses * @throws IOException if connections cannot be established */ public MemcachedClient(ConnectionFactory cf, List addrs) throws IOException { if (cf == null) { throw new NullPointerException("Connection factory required"); } if (addrs == null) { throw new NullPointerException("Server list required"); } if (addrs.isEmpty()) { throw new IllegalArgumentException("You must have at least one server to" + " connect to"); } if (cf.getOperationTimeout() <= 0) { throw new IllegalArgumentException("Operation timeout must be positive."); } connFactory = cf; tcService = new TranscodeService(cf.isDaemon()); transcoder = cf.getDefaultTranscoder(); opFact = cf.getOperationFactory(); assert opFact != null : "Connection factory failed to make op factory"; mconn = cf.createConnection(addrs); assert mconn != null : "Connection factory failed to make a connection"; operationTimeout = cf.getOperationTimeout(); authDescriptor = cf.getAuthDescriptor(); if (authDescriptor != null) { addObserver(this); } } /** * Get the addresses of available servers. * *

* This is based on a snapshot in time so shouldn't be considered completely * accurate, but is a useful for getting a feel for what's working and what's * not working. *

* * @return point-in-time view of currently available servers */ public Collection getAvailableServers() { ArrayList rv = new ArrayList(); for (MemcachedNode node : mconn.getLocator().getAll()) { if (node.isActive()) { rv.add(node.getSocketAddress()); } } return rv; } /** * Get the addresses of unavailable servers. * *

* This is based on a snapshot in time so shouldn't be considered completely * accurate, but is a useful for getting a feel for what's working and what's * not working. *

* * @return point-in-time view of currently available servers */ public Collection getUnavailableServers() { ArrayList rv = new ArrayList(); for (MemcachedNode node : mconn.getLocator().getAll()) { if (!node.isActive()) { rv.add(node.getSocketAddress()); } } return rv; } /** * * Get a read-only wrapper around the node locator wrapping this instance. * * @return this instance's NodeLocator */ public NodeLocator getNodeLocator() { return mconn.getLocator().getReadonlyCopy(); } /** * Get the default transcoder that's in use. * * @return this instance's Transcoder */ public Transcoder getTranscoder() { return transcoder; } private void validateKey(String key) { byte[] keyBytes = KeyUtil.getKeyBytes(key); if (keyBytes.length > MAX_KEY_LENGTH) { throw new IllegalArgumentException("Key is too long (maxlen = " + MAX_KEY_LENGTH + ")"); } if (keyBytes.length == 0) { throw new IllegalArgumentException( "Key must contain at least one character."); } // Validate the key for (byte b : keyBytes) { if (b == ' ' || b == '\n' || b == '\r' || b == 0) { throw new IllegalArgumentException( "Key contains invalid characters: ``" + key + "''"); } } } /** * (internal use) Add a raw operation to a numbered connection. This method is * exposed for testing. 
* * @param which server number * @param op the operation to perform * @return the Operation */ Operation addOp(final String key, final Operation op) { validateKey(key); mconn.checkState(); mconn.addOperation(key, op); return op; } CountDownLatch broadcastOp(final BroadcastOpFactory of) { return broadcastOp(of, mconn.getLocator().getAll(), true); } CountDownLatch broadcastOp(final BroadcastOpFactory of, Collection nodes) { return broadcastOp(of, nodes, true); } private CountDownLatch broadcastOp(BroadcastOpFactory of, Collection nodes, boolean checkShuttingDown) { if (checkShuttingDown && shuttingDown) { throw new IllegalStateException("Shutting down"); } return mconn.broadcastOperation(of, nodes); } private OperationFuture asyncStore(StoreType storeType, String key, int exp, T value, Transcoder tc) { CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); Operation op = opFact.store(storeType, key, co.getFlags(), exp, co.getData(), new OperationCallback() { public void receivedStatus(OperationStatus val) { rv.set(val.isSuccess(), val); } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } private OperationFuture asyncStore(StoreType storeType, String key, int exp, Object value) { return asyncStore(storeType, key, exp, value, transcoder); } private OperationFuture asyncCat(ConcatenationType catType, long cas, String key, T value, Transcoder tc) { CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); Operation op = opFact.cat(catType, cas, key, co.getData(), new OperationCallback() { public void receivedStatus(OperationStatus val) { rv.set(val.isSuccess(), val); } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Touch the given key to reset its 
expiration time with the default * transcoder. * * @param key the key to fetch * @param exp the new expiration to set for the given key * @return a future that will hold the return value of whether or not the * fetch succeeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture touch(final String key, final int exp) { return touch(key, exp, transcoder); } /** * Touch the given key to reset its expiration time. * * @param key the key to fetch * @param exp the new expiration to set for the given key * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of whether or not the * fetch succeeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture touch(final String key, final int exp, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); Operation op = opFact.touch(key, exp, new OperationCallback() { public void receivedStatus(OperationStatus status) { rv.set(status.isSuccess(), status); } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Append to an existing value in the cache. * *

* Note that the return will be false any time a mutation has not occurred. *

* * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be appended * @param val the value to append * @return a future indicating success, false if there was no change to the * value * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture append(long cas, String key, Object val) { return append(cas, key, val, transcoder); } /** * Append to an existing value in the cache. * *

* Note that the return will be false any time a mutation has not occurred. *

* * @param * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be appended * @param val the value to append * @param tc the transcoder to serialize and unserialize the value * @return a future indicating success * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture append(long cas, String key, T val, Transcoder tc) { return asyncCat(ConcatenationType.append, cas, key, val, tc); } /** * Prepend to an existing value in the cache. *

*

* Note that the return will be false any time a mutation has not occurred. *

* * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be prepended * @param val the value to append * @return a future indicating success * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture prepend(long cas, String key, Object val) { return prepend(cas, key, val, transcoder); } /** * Prepend to an existing value in the cache. * *

* Note that the return will be false any time a mutation has not occurred. *

* * @param * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be prepended * @param val the value to append * @param tc the transcoder to serialize and unserialize the value * @return a future indicating success * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture prepend(long cas, String key, T val, Transcoder tc) { return asyncCat(ConcatenationType.prepend, cas, key, val, tc); } /** * Asynchronous CAS operation. * * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a future that will indicate the status of the CAS * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Future asyncCAS(String key, long casId, T value, Transcoder tc) { return asyncCAS(key, casId, 0, value, tc); } /** * Asynchronous CAS operation. 
* * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param exp the expiration of this object * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a future that will indicate the status of the CAS * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Future asyncCAS(String key, long casId, int exp, T value, Transcoder tc) { CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); Operation op = opFact.cas(StoreType.set, key, casId, co.getFlags(), exp, co.getData(), new OperationCallback() { public void receivedStatus(OperationStatus val) { if (val instanceof CASOperationStatus) { rv.set(((CASOperationStatus) val).getCASResponse(), val); } else if (val instanceof CancelledOperationStatus) { getLogger().debug("CAS operation cancelled"); } else { throw new RuntimeException("Unhandled state: " + val); } } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Asynchronous CAS operation using the default transcoder. * * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @return a future that will indicate the status of the CAS * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Future asyncCAS(String key, long casId, Object value) { return asyncCAS(key, casId, value, transcoder); } /** * Perform a synchronous CAS operation. 
* * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a CASResponse * @throws OperationTimeoutException if global operation timeout is exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASResponse cas(String key, long casId, T value, Transcoder tc) { return cas(key, casId, 0, value, tc); } /** * Perform a synchronous CAS operation. * * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param exp the expiration of this object * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a CASResponse * @throws OperationTimeoutException if global operation timeout is exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASResponse cas(String key, long casId, int exp, T value, Transcoder tc) { try { return asyncCAS(key, casId, exp, value, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Perform a synchronous CAS operation with the default transcoder. 
* * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @return a CASResponse * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASResponse cas(String key, long casId, Object value) { return cas(key, casId, value, transcoder); } /** * Add an object to the cache iff it does not exist already. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* *

* Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

*
* * @param *

* @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @param tc the transcoder to serialize and unserialize the value * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture add(String key, int exp, T o, Transcoder tc) { return asyncStore(StoreType.add, key, exp, o, tc); } /** * Add an object to the cache (using the default transcoder) iff it does not * exist already. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* *

* Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

*
* * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture add(String key, int exp, Object o) { return asyncStore(StoreType.add, key, exp, o, transcoder); } /** * Set an object in the cache regardless of any existing value. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* *

* Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

*
* * @param * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @param tc the transcoder to serialize and unserialize the value * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture set(String key, int exp, T o, Transcoder tc) { return asyncStore(StoreType.set, key, exp, o, tc); } /** * Set an object in the cache (using the default transcoder) regardless of any * existing value. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* * Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

*
* * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture set(String key, int exp, Object o) { return asyncStore(StoreType.set, key, exp, o, transcoder); } /** * Replace an object with the given value iff there is already a value for the * given key. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* * Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

*
* * @param * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @param tc the transcoder to serialize and unserialize the value * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture replace(String key, int exp, T o, Transcoder tc) { return asyncStore(StoreType.replace, key, exp, o, tc); } /** * Replace an object with the given value (transcoded with the default * transcoder) iff there is already a value for the given key. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* *

* Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

*
* * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture replace(String key, int exp, Object o) { return asyncStore(StoreType.replace, key, exp, o, transcoder); } /** * Get the given key asynchronously. * * @param * @param key the key to fetch * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public GetFuture asyncGet(final String key, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); final GetFuture rv = new GetFuture(latch, operationTimeout, key); Operation op = opFact.get(key, new GetOperation.Callback() { private Future val = null; public void receivedStatus(OperationStatus status) { rv.set(val, status); } public void gotData(String k, int flags, byte[] data) { assert key.equals(k) : "Wrong key returned"; val = tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())); } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Get the given key asynchronously and decode with the default transcoder. * * @param key the key to fetch * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public GetFuture asyncGet(final String key) { return asyncGet(key, transcoder); } /** * Gets (with CAS support) the given key asynchronously. 
* * @param * @param key the key to fetch * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture> asyncGets(final String key, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = new OperationFuture>(key, latch, operationTimeout); Operation op = opFact.gets(key, new GetsOperation.Callback() { private CASValue val = null; public void receivedStatus(OperationStatus status) { rv.set(val, status); } public void gotData(String k, int flags, long cas, byte[] data) { assert key.equals(k) : "Wrong key returned"; assert cas > 0 : "CAS was less than zero: " + cas; val = new CASValue(cas, tc.decode(new CachedData(flags, data, tc.getMaxSize()))); } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Gets (with CAS support) the given key asynchronously and decode using the * default transcoder. * * @param key the key to fetch * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture> asyncGets(final String key) { return asyncGets(key, transcoder); } /** * Gets (with CAS support) with a single key. 
* * @param * @param key the key to get * @param tc the transcoder to serialize and unserialize value * @return the result from the cache and CAS id (null if there is none) * @throws OperationTimeoutException if global operation timeout is exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASValue gets(String key, Transcoder tc) { try { return asyncGets(key, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Get with a single key and reset its expiration. * * @param * @param key the key to get val = * @param exp the new expiration for the key * @param tc the transcoder to serialize and unserialize value * @return the result from the cache (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASValue getAndTouch(String key, int exp, Transcoder tc) { try { return asyncGetAndTouch(key, exp, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Get a single key and reset its expiration using the default transcoder. 
* * @param key the key to get * @param exp the new expiration for the key * @return the result from the cache and CAS id (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASValue getAndTouch(String key, int exp) { return getAndTouch(key, exp, transcoder); } /** * Gets (with CAS support) with a single key using the default transcoder. * * @param key the key to get * @return the result from the cache and CAS id (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASValue gets(String key) { return gets(key, transcoder); } /** * Get with a single key. * * @param * @param key the key to get * @param tc the transcoder to serialize and unserialize value * @return the result from the cache (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded return getBulk(Arrays.asList(keys), tc); * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public T get(String key, Transcoder tc) { try { return asyncGet(key, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Get with a single key and decode using the default transcoder. 
* * @param key the key to get * @return the result from the cache (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Object get(String key) { return get(key, transcoder); } /** * Asynchronously get a bunch of objects from the cache. * * @param * @param keys the keys to request * @param tcIter an iterator of transcoders to serialize and unserialize * values; the transcoders are matched with the keys in the same * order. The minimum of the key collection length and number of * transcoders is used and no exception is thrown if they do not * match * @return a Future result of that fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public BulkFuture> asyncGetBulk(Collection keys, Iterator> tcIter) { final Map> m = new ConcurrentHashMap>(); // This map does not need to be a ConcurrentHashMap // because it is fully populated when it is used and // used only to read the transcoder for a key. 
final Map> tcMap = new HashMap>(); // Break the gets down into groups by key final Map> chunks = new HashMap>(); final NodeLocator locator = mconn.getLocator(); Iterator keyIter = keys.iterator(); while (keyIter.hasNext() && tcIter.hasNext()) { String key = keyIter.next(); tcMap.put(key, tcIter.next()); validateKey(key); final MemcachedNode primaryNode = locator.getPrimary(key); MemcachedNode node = null; if (primaryNode.isActive()) { node = primaryNode; } else { for (Iterator i = locator.getSequence(key); node == null && i.hasNext();) { MemcachedNode n = i.next(); if (n.isActive()) { node = n; } } if (node == null) { node = primaryNode; } } assert node != null : "Didn't find a node for " + key; Collection ks = chunks.get(node); if (ks == null) { ks = new ArrayList(); chunks.put(node, ks); } ks.add(key); } final CountDownLatch latch = new CountDownLatch(chunks.size()); final Collection ops = new ArrayList(chunks.size()); final BulkGetFuture rv = new BulkGetFuture(m, ops, latch); GetOperation.Callback cb = new GetOperation.Callback() { @SuppressWarnings("synthetic-access") public void receivedStatus(OperationStatus status) { rv.setStatus(status); if (!status.isSuccess()) { getLogger().warn("Unsuccessful get: %s", status); } } public void gotData(String k, int flags, byte[] data) { Transcoder tc = tcMap.get(k); m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); } public void complete() { latch.countDown(); } }; // Now that we know how many servers it breaks down into, and the latch // is all set up, convert all of these strings collections to operations final Map mops = new HashMap(); for (Map.Entry> me : chunks.entrySet()) { Operation op = opFact.get(me.getValue(), cb); mops.put(me.getKey(), op); ops.add(op); } assert mops.size() == chunks.size(); mconn.checkState(); mconn.addOperations(mops); return rv; } /** * Asynchronously get a bunch of objects from the cache. 
* * @param * @param keys the keys to request * @param tc the transcoder to serialize and unserialize values * @return a Future result of that fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public BulkFuture> asyncGetBulk(Collection keys, Transcoder tc) { return asyncGetBulk(keys, new SingleElementInfiniteIterator>( tc)); } /** * Asynchronously get a bunch of objects from the cache and decode them with * the given transcoder. * * @param keys the keys to request * @return a Future result of that fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public BulkFuture> asyncGetBulk(Collection keys) { return asyncGetBulk(keys, transcoder); } /** * Varargs wrapper for asynchronous bulk gets. * * @param * @param tc the transcoder to serialize and unserialize value * @param keys one more more keys to get * @return the future values of those keys * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public BulkFuture> asyncGetBulk(Transcoder tc, String... keys) { return asyncGetBulk(Arrays.asList(keys), tc); } /** * Varargs wrapper for asynchronous bulk gets with the default transcoder. * * @param keys one more more keys to get * @return the future values of those keys * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public BulkFuture> asyncGetBulk(String... keys) { return asyncGetBulk(Arrays.asList(keys), transcoder); } /** * Get the given key to reset its expiration time. 
* * @param key the key to fetch * @param exp the new expiration to set for the given key * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture> asyncGetAndTouch(final String key, final int exp) { return asyncGetAndTouch(key, exp, transcoder); } /** * Get the given key to reset its expiration time. * * @param key the key to fetch * @param exp the new expiration to set for the given key * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture> asyncGetAndTouch(final String key, final int exp, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = new OperationFuture>( key, latch, operationTimeout); Operation op = opFact.getAndTouch(key, exp, new GetAndTouchOperation.Callback() { private CASValue val = null; public void receivedStatus(OperationStatus status) { rv.set(val, status); } public void complete() { latch.countDown(); } public void gotData(String k, int flags, long cas, byte[] data) { assert k.equals(key) : "Wrong key returned"; assert cas > 0 : "CAS was less than zero: " + cas; new CASValue(cas, tc.decode(new CachedData(flags, data, tc.getMaxSize()))); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Get the values for multiple keys from the cache. 
* * @param * @param keys the keys * @param tc the transcoder to serialize and unserialize value * @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map getBulk(Collection keys, Transcoder tc) { try { return asyncGetBulk(keys, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted getting bulk values", e); } catch (ExecutionException e) { throw new RuntimeException("Failed getting bulk values", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for bulkvalues", e); } } /** * Get the values for multiple keys from the cache. * * @param keys the keys * @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map getBulk(Collection keys) { return getBulk(keys, transcoder); } /** * Get the values for multiple keys from the cache. * * @param * @param tc the transcoder to serialize and unserialize value * @param keys the keys * @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map getBulk(Transcoder tc, String... keys) { } /** * Get the values for multiple keys from the cache. 
* * @param keys the keys * @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map getBulk(String... keys) { return getBulk(Arrays.asList(keys), transcoder); } /** * Get the versions of all of the connected memcacheds. * * @return a Map of SocketAddress to String for connected servers * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map getVersions() { final Map rv = new ConcurrentHashMap(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { final SocketAddress sa = n.getSocketAddress(); return opFact.version(new OperationCallback() { public void receivedStatus(OperationStatus s) { rv.put(sa, s.getMessage()); } public void complete() { latch.countDown(); } }); } }); try { blatch.await(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for versions", e); } return rv; } /** * Get all of the stats from all of the connections. * * @return a Map of a Map of stats replies by SocketAddress * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map> getStats() { return getStats(null); } /** * Get a set of stats from all connections. * * @param arg which stats to get * @return a Map of the server SocketAddress to a map of String stat keys to * String stat values. 
* @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map> getStats(final String arg) { final Map> rv = new HashMap>(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { final SocketAddress sa = n.getSocketAddress(); rv.put(sa, new HashMap()); return opFact.stats(arg, new StatsOperation.Callback() { public void gotStat(String name, String val) { rv.get(sa).put(name, val); } @SuppressWarnings("synthetic-access") public void receivedStatus(OperationStatus status) { if (!status.isSuccess()) { getLogger().warn("Unsuccessful stat fetch: %s", status); } } public void complete() { latch.countDown(); } }); } }); try { blatch.await(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for stats", e); } return rv; } private long mutate(Mutator m, String key, int by, long def, int exp) { final AtomicLong rv = new AtomicLong(); final CountDownLatch latch = new CountDownLatch(1); addOp(key, opFact.mutate(m, key, by, def, exp, new OperationCallback() { public void receivedStatus(OperationStatus s) { // XXX: Potential abstraction leak. // The handling of incr/decr in the binary protocol // Allows us to avoid string processing. rv.set(new Long(s.isSuccess() ? s.getMessage() : "-1")); } public void complete() { latch.countDown(); } })); try { if (!latch.await(operationTimeout, TimeUnit.MILLISECONDS)) { throw new OperationTimeoutException("Mutate operation timed out," + "unable to modify counter [" + key + "]"); } } catch (InterruptedException e) { throw new RuntimeException("Interrupted", e); } getLogger().debug("Mutation returned %s", rv); return rv.get(); } /** * Increment the given key by the given amount. 
* * Due to the way the memcached server operates on items, incremented and * decremented items will be returned as Strings with any operations that * return a value. * * @param key the key * @param by the amount to increment * @return the new value (-1 if the key doesn't exist) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long incr(String key, int by) { return mutate(Mutator.incr, key, by, 0, -1); } /** * Decrement the given key by the given value. * * Due to the way the memcached server operates on items, incremented and * decremented items will be returned as Strings with any operations that * return a value. * * @param key the key * @param by the value * @return the new value (-1 if the key doesn't exist) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long decr(String key, int by) { return mutate(Mutator.decr, key, by, 0, -1); } /** * Increment the given counter, returning the new value. * * Due to the way the memcached server operates on items, incremented and * decremented items will be returned as Strings with any operations that * return a value. * * @param key the key * @param by the amount to increment * @param def the default value (if the counter does not exist) * @param exp the expiration of this object * @return the new value, or -1 if we were unable to increment or add * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long incr(String key, int by, long def, int exp) { return mutateWithDefault(Mutator.incr, key, by, def, exp); } /** * Decrement the given counter, returning the new value. 
* * Due to the way the memcached server operates on items, incremented and * decremented items will be returned as Strings with any operations that * return a value. * * @param key the key * @param by the amount to decrement * @param def the default value (if the counter does not exist) * @param exp the expiration of this object * @return the new value, or -1 if we were unable to decrement or add * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long decr(String key, int by, long def, int exp) { return mutateWithDefault(Mutator.decr, key, by, def, exp); } private long mutateWithDefault(Mutator t, String key, int by, long def, int exp) { long rv = mutate(t, key, by, def, exp); // The ascii protocol doesn't support defaults, so I added them // manually here. if (rv == -1) { Future f = asyncStore(StoreType.add, key, exp, String.valueOf(def)); try { if (f.get(operationTimeout, TimeUnit.MILLISECONDS)) { rv = def; } else { rv = mutate(t, key, by, 0, exp); assert rv != -1 : "Failed to mutate or init value"; } } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for store", e); } catch (ExecutionException e) { throw new RuntimeException("Failed waiting for store", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting to mutate or init" + " value", e); } } return rv; } private OperationFuture asyncMutate(Mutator m, String key, int by, long def, int exp) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); Operation op = addOp(key, opFact.mutate(m, key, by, def, exp, new OperationCallback() { public void receivedStatus(OperationStatus s) { rv.set(new Long(s.isSuccess() ? 
s.getMessage() : "-1"), s); } public void complete() { latch.countDown(); } })); rv.setOperation(op); return rv; } /** * Asychronous increment. * * @param key key to increment * @param by the amount to increment the value by * @return a future with the incremented value, or -1 if the increment failed. * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture asyncIncr(String key, int by) { return asyncMutate(Mutator.incr, key, by, 0, -1); } /** * Asynchronous decrement. * * @param key key to increment * @param by the amount to increment the value by * @return a future with the decremented value, or -1 if the increment failed. * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture asyncDecr(String key, int by) { return asyncMutate(Mutator.decr, key, by, 0, -1); } /** * Increment the given counter, returning the new value. * * @param key the key * @param by the amount to increment * @param def the default value (if the counter does not exist) * @return the new value, or -1 if we were unable to increment or add * @throws OperationTimeoutException if the global operation timeout is * exceeded } }); // Don't care. * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long incr(String key, int by, long def) { return mutateWithDefault(Mutator.incr, key, by, def, 0); } /** * Decrement the given counter, returning the new value. 
* * @param key the key * @param by the amount to decrement * @param def the default value (if the counter does not exist) * @return the new value, or -1 if we were unable to decrement or add * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long decr(String key, int by, long def) { return mutateWithDefault(Mutator.decr, key, by, def, 0); } /** * Delete the given key from the cache. * *

* The hold argument specifies the amount of time in seconds (or Unix time * until which) the client wishes the server to refuse "add" and "replace" * commands with this key. For this amount of item, the item is put into a * delete queue, which means that it won't possible to retrieve it by the * "get" command, but "add" and "replace" command with this key will also fail * (the "set" command will succeed, however). After the time passes, the item * is finally deleted from server memory. *

* * @param key the key to delete * @param hold how long the key should be unavailable to add commands * * @return whether or not the operation was performed * @deprecated Hold values are no longer honored. */ @Deprecated public OperationFuture delete(String key, int hold) { return delete(key); } /** * Delete the given key from the cache. * * @param key the key to delete * @return whether or not the operation was performed * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture delete(String key) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); DeleteOperation op = opFact.delete(key, new OperationCallback() { public void receivedStatus(OperationStatus s) { rv.set(s.isSuccess(), s); } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Flush all caches from all servers with a delay of application. 
* * @param delay the period of time to delay, in seconds * @return whether or not the operation was accepted * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture flush(final int delay) { final AtomicReference flushResult = new AtomicReference(null); final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { Operation op = opFact.flush(delay, new OperationCallback() { public void receivedStatus(OperationStatus s) { flushResult.set(s.isSuccess()); } public void complete() { latch.countDown(); ops.add(op); return op; } }); return new OperationFuture(null, blatch, flushResult, operationTimeout) { @Override public boolean cancel(boolean ign) { boolean rv = false; for (Operation op : ops) { op.cancel(); rv |= op.getState() == OperationState.WRITE_QUEUED; } return rv; } @Override public Boolean get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException { status = new OperationStatus(true, "OK"); return super.get(duration, units); } @Override public boolean isCancelled() { boolean rv = false; for (Operation op : ops) { rv |= op.isCancelled(); } return rv; } @Override public boolean isDone() { boolean rv = true; for (Operation op : ops) { rv &= op.getState() == OperationState.COMPLETE; } return rv || isCancelled(); } }; } /** * Flush all caches from all servers immediately. 
* * @return whether or not the operation was performed * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture flush() { return flush(-1); } public Set listSaslMechanisms() { final ConcurrentMap rv = new ConcurrentHashMap(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { public Operation newOp(MemcachedNode n, final CountDownLatch latch) { return opFact.saslMechs(new OperationCallback() { public void receivedStatus(OperationStatus status) { for (String s : status.getMessage().split(" ")) { rv.put(s, s); } } public void complete() { latch.countDown(); } }); } }); try { blatch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return rv.keySet(); } /** * Shut down immediately. */ public void shutdown() { shutdown(-1, TimeUnit.MILLISECONDS); } /** * Shut down this client gracefully. * * @param timeout the amount of time time for shutdown * @param unit the TimeUnit for the timeout * @return result of the shutdown request */ public boolean shutdown(long timeout, TimeUnit unit) { // Guard against double shutdowns (bug 8). if (shuttingDown) { getLogger().info("Suppressing duplicate attempt to shut down"); return false; } shuttingDown = true; String baseName = mconn.getName(); mconn.setName(baseName + " - SHUTTING DOWN"); boolean rv = true; try { // Conditionally wait } if (timeout > 0) { mconn.setName(baseName + " - SHUTTING DOWN (waiting)"); rv = waitForQueues(timeout, unit); } } finally { // But always begin the shutdown sequence try { mconn.setName(baseName + " - SHUTTING DOWN (telling client)"); mconn.shutdown(); mconn.setName(baseName + " - SHUTTING DOWN (informed client)"); tcService.shutdown(); } catch (IOException e) { getLogger().warn("exception while shutting down", e); } } return rv; } /** * Wait for the queues to die down. 
* * @param timeout the amount of time time for shutdown * @param unit the TimeUnit for the timeout * @return result of the request for the wait * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public boolean waitForQueues(long timeout, TimeUnit unit) { CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { return opFact.noop(new OperationCallback() { public void complete() { latch.countDown(); } public void receivedStatus(OperationStatus s) { // Nothing special when receiving status, only // necessary to complete the interface } }); } }, mconn.getLocator().getAll(), false); try { // XXX: Perhaps IllegalStateException should be caught here // and the check retried. return blatch.await(timeout, unit); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for queues", e); } } /** * Add a connection observer. * * If connections are already established, your observer will be called with * the address and -1. * * @param obs the ConnectionObserver you wish to add * @return true if the observer was added. */ public boolean addObserver(ConnectionObserver obs) { boolean rv = mconn.addObserver(obs); if (rv) { for (MemcachedNode node : mconn.getLocator().getAll()) { if (node.isActive()) { obs.connectionEstablished(node.getSocketAddress(), -1); } } } return rv; } /** * Remove a connection observer. 
* * @param obs the ConnectionObserver you wish to add * @return true if the observer existed, but no longer does */ public boolean removeObserver(ConnectionObserver obs) { return mconn.removeObserver(obs); } public void connectionEstablished(SocketAddress sa, int reconnectCount) { if (authDescriptor != null) { if (authDescriptor.authThresholdReached()) { this.shutdown(); } authMonitor.authConnection(mconn, opFact, authDescriptor, findNode(sa)); } } private MemcachedNode findNode(SocketAddress sa) { MemcachedNode node = null; for (MemcachedNode n : mconn.getLocator().getAll()) { if (n.getSocketAddress().equals(sa)) { node = n; } } assert node != null : "Couldn't find node connected to " + sa; return node; } public void connectionLost(SocketAddress sa) { ======= public class MemcachedClient extends SpyThread implements MemcachedClientIF, ConnectionObserver, Reconfigurable { private volatile boolean running=true; private volatile boolean shuttingDown=false; private final long operationTimeout; private final MemcachedConnection conn; final OperationFactory opFact; final Transcoder transcoder; final TranscodeService tcService; final AuthDescriptor authDescriptor; private final ConnectionFactory connFactory; private final AuthThreadMonitor authMonitor = new AuthThreadMonitor(); private volatile boolean reconfiguring = false; private ConfigurationProvider configurationProvider; /** * Get a memcache client operating on the specified memcached locations. * * @param ia the memcached locations * @throws IOException if connections cannot be established */ public MemcachedClient(InetSocketAddress... ia) throws IOException { this(new DefaultConnectionFactory(), Arrays.asList(ia)); } /** * Get a memcache client over the specified memcached locations. 
* * @param addrs the socket addrs /** * @throws IOException if connections cannot be established */ public MemcachedClient(List addrs) throws IOException { this(new DefaultConnectionFactory(), addrs); } /** * Get a memcache client over the specified memcached locations. * * @param cf the connection factory to configure connections for this client * @param addrs the socket addresses * @throws IOException if connections cannot be established */ public MemcachedClient(ConnectionFactory cf, List addrs) throws IOException { if(cf == null) { throw new NullPointerException("Connection factory required"); } if(addrs == null) { throw new NullPointerException("Server list required"); } if(addrs.isEmpty()) { throw new IllegalArgumentException( "You must have at least one server to connect to"); } if(cf.getOperationTimeout() <= 0) { throw new IllegalArgumentException( "Operation timeout must be positive."); } connFactory = cf; tcService = new TranscodeService(cf.isDaemon()); transcoder=cf.getDefaultTranscoder(); opFact=cf.getOperationFactory(); assert opFact != null : "Connection factory failed to make op factory"; conn=cf.createConnection(addrs); assert conn != null : "Connection factory failed to make a connection"; operationTimeout = cf.getOperationTimeout(); authDescriptor = cf.getAuthDescriptor(); if(authDescriptor != null) { addObserver(this); } setName("Memcached IO over " + conn); setDaemon(cf.isDaemon()); start(); } /** * Get a MemcachedClient based on the REST response from a Membase server * where the username is different than the bucket name. * * To connect to the "default" special bucket for a given cluster, use an * empty string as the password. * * If a password has not been assigned to the bucket, it is typically an * empty string. 
* * @param baseList the URI list of one or more servers from the cluster * @param bucketName the bucket name in the cluster you wish to use * @param usr the username for the bucket; this nearly always be the same * as the bucket name * @param pwd the password for the bucket * @throws IOException if connections could not be made * @throws ConfigurationException if the configuration provided by the * server has issues or is not compatible */ public MemcachedClient(final List baseList, final String bucketName, final String usr, final String pwd) throws IOException, ConfigurationException { this(new BinaryConnectionFactory(), baseList, bucketName, usr, pwd); } /** * Get a MemcachedClient based on the REST response from a Membase server * where the username is different than the bucket name. * * Note that when specifying a ConnectionFactory you must specify a * BinaryConnectionFactory. Also the ConnectionFactory's protocol * and locator values are always overwritten. The protocol will always * be binary and the locator will be chosen based on the bucket type you * are connecting to. * * To connect to the "default" special bucket for a given cluster, use an * empty string as the password. * * If a password has not been assigned to the bucket, it is typically an * empty string. 
* * @param cf the ConnectionFactory to use to create connections * @param baseList the URI list of one or more servers from the cluster * @param bucketName the bucket name in the cluster you wish to use * @param usr the username for the bucket; this nearly always be the same * as the bucket name * @param pwd the password for the bucket * @throws IOException if connections could not be made * @throws ConfigurationException if the configuration provided by the * server has issues or is not compatible */ public MemcachedClient(ConnectionFactory cf, final List baseList, final String bucketName, final String usr, final String pwd) throws IOException, ConfigurationException { ConnectionFactoryBuilder cfb = new ConnectionFactoryBuilder(cf); for (URI bu : baseList) { if (!bu.isAbsolute()) { throw new IllegalArgumentException("The base URI must be absolute"); } } this.configurationProvider = new ConfigurationProviderHTTP(baseList, usr, pwd); Bucket bucket = this.configurationProvider.getBucketConfiguration(bucketName); Config config = bucket.getConfig(); if (cf != null && !(cf instanceof BinaryConnectionFactory)) { throw new IllegalArgumentException("ConnectionFactory must be of type " + "BinaryConnectionFactory"); } if (config.getConfigType() == ConfigType.MEMBASE) { cfb.setFailureMode(FailureMode.Retry) .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) .setHashAlg(HashAlgorithm.KETAMA_HASH) .setLocatorType(ConnectionFactoryBuilder.Locator.VBUCKET) .setVBucketConfig(bucket.getConfig()); } else if (config.getConfigType() == ConfigType.MEMCACHE) { cfb.setFailureMode(FailureMode.Retry) .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) .setHashAlg(HashAlgorithm.KETAMA_HASH) .setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT); } else { throw new ConfigurationException("Bucket type not supported or JSON response unexpected"); } if (!this.configurationProvider.getAnonymousAuthBucket().equals(bucketName) && usr != null) { AuthDescriptor ad = new 
AuthDescriptor(new String[]{"PLAIN"}, new PlainCallbackHandler(usr, pwd)); cfb.setAuthDescriptor(ad); } connFactory = cfb.build(); List addrs = AddrUtil.getAddresses(bucket.getConfig().getServers()); if(connFactory == null) { throw new NullPointerException("Connection factory required"); } if(addrs == null) { throw new NullPointerException("Server list required"); } if(addrs.isEmpty()) { throw new IllegalArgumentException( "You must have at least one server to connect to"); } if(connFactory.getOperationTimeout() <= 0) { throw new IllegalArgumentException( "Operation timeout must be positive."); } tcService = new TranscodeService(connFactory.isDaemon()); transcoder=connFactory.getDefaultTranscoder(); opFact=connFactory.getOperationFactory(); assert opFact != null : "Connection factory failed to make op factory"; conn=connFactory.createConnection(addrs); assert conn != null : "Connection factory failed to make a connection"; operationTimeout = connFactory.getOperationTimeout(); authDescriptor = connFactory.getAuthDescriptor(); if(authDescriptor != null) { addObserver(this); } setName("Memcached IO over " + conn); setDaemon(connFactory.isDaemon()); this.configurationProvider.subscribe(bucketName, this); start(); } /** * Get a MemcachedClient based on the REST response from a Membase server. * * This constructor is merely a convenience for situations where the bucket * name is the same as the user name. This is commonly the case. * * To connect to the "default" special bucket for a given cluster, use an * empty string as the password. * * If a password has not been assigned to the bucket, it is typically an * empty string. 
* * @param baseList the URI list of one or more servers from the cluster * @param bucketName the bucket name in the cluster you wish to use * @param pwd the password for the bucket * @throws IOException if connections could not be made * @throws ConfigurationException if the configuration provided by the * server has issues or is not compatible */ public MemcachedClient(List baseList, String bucketName, String pwd) throws IOException, ConfigurationException { this(baseList, bucketName, bucketName, pwd); } public void reconfigure(Bucket bucket) { reconfiguring = true; try { conn.reconfigure(bucket); } catch (IllegalArgumentException ex) { getLogger().warn("Failed to reconfigure client, staying with previous configuration.", ex); } finally { reconfiguring = false; } } /** * Get the addresses of available servers. * *

* This is based on a snapshot in time so shouldn't be considered * completely accurate, but is a useful for getting a feel for what's * working and what's not working. *

* * @return point-in-time view of currently available servers */ public Collection getAvailableServers() { ArrayList rv=new ArrayList(); for(MemcachedNode node : conn.getLocator().getAll()) { if(node.isActive()) { rv.add(node.getSocketAddress()); } } return rv; } /** * Get the addresses of unavailable servers. * *

* This is based on a snapshot in time so shouldn't be considered * completely accurate, but is a useful for getting a feel for what's * working and what's not working. *

* * @return point-in-time view of currently available servers */ public Collection getUnavailableServers() { ArrayList rv=new ArrayList(); for(MemcachedNode node : conn.getLocator().getAll()) { if(!node.isActive()) { rv.add(node.getSocketAddress()); } } return rv; } /** * Get a read-only wrapper around the node locator wrapping this instance. * * @return this instance's NodeLocator */ public NodeLocator getNodeLocator() { return conn.getLocator().getReadonlyCopy(); } /** * Get the default transcoder that's in use. * * @return this instance's Transcoder */ public Transcoder getTranscoder() { return transcoder; } private void validateKey(String key) { byte[] keyBytes=KeyUtil.getKeyBytes(key); if(keyBytes.length > MAX_KEY_LENGTH) { throw new IllegalArgumentException("Key is too long (maxlen = " + MAX_KEY_LENGTH + ")"); } if(keyBytes.length == 0) { throw new IllegalArgumentException( "Key must contain at least one character."); } // Validate the key for(byte b : keyBytes) { if(b == ' ' || b == '\n' || b == '\r' || b == 0) { throw new IllegalArgumentException( "Key contains invalid characters: ``" + key + "''"); } } } private void checkState() { if(shuttingDown) { throw new IllegalStateException("Shutting down"); } assert isAlive() : "IO Thread is not running."; } /** * (internal use) Add a raw operation to a numbered connection. * This method is exposed for testing. 
* * @param which server number * @param op the operation to perform * @return the Operation */ Operation addOp(final String key, final Operation op) { validateKey(key); checkState(); conn.addOperation(key, op); return op; } CountDownLatch broadcastOp(final BroadcastOpFactory of) { return broadcastOp(of, conn.getLocator().getAll(), true); } CountDownLatch broadcastOp(final BroadcastOpFactory of, Collection nodes) { return broadcastOp(of, nodes, true); } private CountDownLatch broadcastOp(BroadcastOpFactory of, Collection nodes, boolean checkShuttingDown) { if(checkShuttingDown && shuttingDown) { throw new IllegalStateException("Shutting down"); } return conn.broadcastOperation(of, nodes); } private OperationFuture asyncStore(StoreType storeType, String key, int exp, T value, Transcoder tc) { CachedData co=tc.encode(value); final CountDownLatch latch=new CountDownLatch(1); final OperationFuture rv=new OperationFuture(key, latch, operationTimeout); Operation op=opFact.store(storeType, key, co.getFlags(), exp, co.getData(), new OperationCallback() { public void receivedStatus(OperationStatus val) { rv.set(val.isSuccess(), val); } public void complete() { latch.countDown(); }}); rv.setOperation(op); addOp(key, op); return rv; } private OperationFuture asyncStore(StoreType storeType, String key, int exp, Object value) { return asyncStore(storeType, key, exp, value, transcoder); } private OperationFuture asyncCat( ConcatenationType catType, long cas, String key, T value, Transcoder tc) { CachedData co=tc.encode(value); final CountDownLatch latch=new CountDownLatch(1); final OperationFuture rv=new OperationFuture(key, latch, operationTimeout); Operation op=opFact.cat(catType, cas, key, co.getData(), new OperationCallback() { public void receivedStatus(OperationStatus val) { rv.set(val.isSuccess(), val); } public void complete() { latch.countDown(); }}); rv.setOperation(op); addOp(key, op); return rv; } /** * Touch the given key to reset its expiration time with the default 
* transcoder. * * @param key the key to fetch * @param exp the new expiration to set for the given key * @return a future that will hold the return value of whether or not * the fetch succeeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture touch(final String key, final int exp) { return touch(key, exp, transcoder); } /** * Touch the given key to reset its expiration time. * * @param key the key to fetch * @param exp the new expiration to set for the given key * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of whether or not * the fetch succeeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture touch(final String key, final int exp, final Transcoder tc) { final CountDownLatch latch=new CountDownLatch(1); final OperationFuture rv=new OperationFuture(key, latch, operationTimeout); Operation op=opFact.touch(key, exp, new OperationCallback() { public void receivedStatus(OperationStatus status) { rv.set(status.isSuccess(), status); } public void complete() { latch.countDown(); }}); rv.setOperation(op); addOp(key, op); return rv; } /** * Append to an existing value in the cache. * *

Note that the return will be false any time a mutation has not * occurred.

* * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be appended * @param val the value to append * @return a future indicating success, false if there was no change * to the value * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture append(long cas, String key, Object val) { return append(cas, key, val, transcoder); } /** * Append to an existing value in the cache. * *

Note that the return will be false any time a mutation has not * occurred.

* * @param * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be appended * @param val the value to append * @param tc the transcoder to serialize and unserialize the value * @return a future indicating success * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture append(long cas, String key, T val, Transcoder tc) { return asyncCat(ConcatenationType.append, cas, key, val, tc); } /** * Prepend to an existing value in the cache. * *

Note that the return will be false any time a mutation has not * occurred.

* * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be prepended * @param val the value to append * @return a future indicating success *

* @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture prepend(long cas, String key, Object val) { return prepend(cas, key, val, transcoder); } /** * Prepend to an existing value in the cache. * *

Note that the return will be false any time a mutation has not * occurred.

* * @param * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be prepended * @param val the value to append * @param tc the transcoder to serialize and unserialize the value * @return a future indicating success * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture prepend(long cas, String key, T val, Transcoder tc) { return asyncCat(ConcatenationType.prepend, cas, key, val, tc); } /** * Asynchronous CAS operation. * * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a future that will indicate the status of the CAS * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public Future asyncCAS(String key, long casId, T value, Transcoder tc) { return asyncCAS(key, casId, 0, value, tc); } /** * Asynchronous CAS operation. 
* * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param exp the expiration of this object * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a future that will indicate the status of the CAS * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public Future asyncCAS(String key, long casId, int exp, T value, Transcoder tc) { CachedData co=tc.encode(value); final CountDownLatch latch=new CountDownLatch(1); final OperationFuture rv=new OperationFuture( key, latch, operationTimeout); Operation op=opFact.cas(StoreType.set, key, casId, co.getFlags(), exp, co.getData(), new OperationCallback() { public void receivedStatus(OperationStatus val) { if(val instanceof CASOperationStatus) { rv.set(((CASOperationStatus)val).getCASResponse(), val); } else if(val instanceof CancelledOperationStatus) { // Cancelled, ignore and let it float up } else { throw new RuntimeException( "Unhandled state: " + val); } } public void complete() { latch.countDown(); }}); rv.setOperation(op); addOp(key, op); return rv; } /** * Asynchronous CAS operation using the default transcoder. * * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @return a future that will indicate the status of the CAS * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public Future asyncCAS(String key, long casId, Object value) { return asyncCAS(key, casId, value, transcoder); } /** * Perform a synchronous CAS operation. 
* * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a CASResponse * @throws OperationTimeoutException if global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public CASResponse cas(String key, long casId, T value, Transcoder tc) { return cas(key, casId, 0, value, tc); } /** * Perform a synchronous CAS operation. * * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param exp the expiration of this object * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a CASResponse * @throws OperationTimeoutException if global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public CASResponse cas(String key, long casId, int exp, T value, Transcoder tc) { try { return asyncCAS(key, casId, exp, value, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Perform a synchronous CAS operation with the default transcoder. 
* * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @return a CASResponse * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public CASResponse cas(String key, long casId, Object value) { return cas(key, casId, value, transcoder); } /** * Add an object to the cache iff it does not exist already. * *

* The exp value is passed along to memcached exactly as * given, and will be processed per the memcached protocol specification: *

* *

Note that the return will be false any time a mutation has not * occurred.

* *
*

* The actual value sent may either be * Unix time (number of seconds since January 1, 1970, as a 32-bit * value), or a number of seconds starting from current time. In the * latter case, this number of seconds may not exceed 60*60*24*30 (number * of seconds in 30 days); if the number sent by a client is larger than * that, the server will consider it to be real Unix time value rather * than an offset from current time. *

*
* * @param * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @param tc the transcoder to serialize and unserialize the value * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture add(String key, int exp, T o, Transcoder tc) { * @return a future representing the processing of this operation return asyncStore(StoreType.add, key, exp, o, tc); } /** * Add an object to the cache (using the default transcoder) * iff it does not exist already. * *

* The exp value is passed along to memcached exactly as * given, and will be processed per the memcached protocol specification: *

* *

Note that the return will be false any time a mutation has not * occurred.

* *
*

* The actual value sent may either be * Unix time (number of seconds since January 1, 1970, as a 32-bit * value), or a number of seconds starting from current time. In the * latter case, this number of seconds may not exceed 60*60*24*30 (number * of seconds in 30 days); if the number sent by a client is larger than * that, the server will consider it to be real Unix time value rather * than an offset from current time. *

*
* * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture add(String key, int exp, Object o) { return asyncStore(StoreType.add, key, exp, o, transcoder); } /** * Set an object in the cache regardless of any existing value. * *

* The exp value is passed along to memcached exactly as * given, and will be processed per the memcached protocol specification: *

* *

Note that the return will be false any time a mutation has not * occurred.

* *
*

* The actual value sent may either be * Unix time (number of seconds since January 1, 1970, as a 32-bit * value), or a number of seconds starting from current time. In the * latter case, this number of seconds may not exceed 60*60*24*30 (number * of seconds in 30 days); if the number sent by a client is larger than * that, the server will consider it to be real Unix time value rather * than an offset from current time. *

*
* * @param * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @param tc the transcoder to serialize and unserialize the value * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture set(String key, int exp, T o, Transcoder tc) { return asyncStore(StoreType.set, key, exp, o, tc); } /** * Set an object in the cache (using the default transcoder) * regardless of any existing value. * *

* The exp value is passed along to memcached exactly as * given, and will be processed per the memcached protocol specification: *

* *

Note that the return will be false any time a mutation has not * occurred.

* *
*

* The actual value sent may either be * Unix time (number of seconds since January 1, 1970, as a 32-bit * value), or a number of seconds starting from current time. In the * latter case, this number of seconds may not exceed 60*60*24*30 (number * of seconds in 30 days); if the number sent by a client is larger than * that, the server will consider it to be real Unix time value rather * than an offset from current time. *

* * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture set(String key, int exp, Object o) { return asyncStore(StoreType.set, key, exp, o, transcoder); } /** * Replace an object with the given value iff there is already a value * for the given key. * *

* The exp value is passed along to memcached exactly as * given, and will be processed per the memcached protocol specification: *

* *

Note that the return will be false any time a mutation has not * occurred.

* *
*

* The actual value sent may either be * Unix time (number of seconds since January 1, 1970, as a 32-bit * value), or a number of seconds starting from current time. In the * latter case, this number of seconds may not exceed 60*60*24*30 (number * of seconds in 30 days); if the number sent by a client is larger than * that, the server will consider it to be real Unix time value rather * than an offset from current time. *

*
* * @param * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @param tc the transcoder to serialize and unserialize the value * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture replace(String key, int exp, T o, Transcoder tc) { return asyncStore(StoreType.replace, key, exp, o, tc); } /** * Replace an object with the given value (transcoded with the default * transcoder) iff there is already a value for the given key. * *

* The exp value is passed along to memcached exactly as * given, and will be processed per the memcached protocol specification: *

* *

Note that the return will be false any time a mutation has not * occurred.

* *
*

* The actual value sent may either be * Unix time (number of seconds since January 1, 1970, as a 32-bit * value), or a number of seconds starting from current time. In the * latter case, this number of seconds may not exceed 60*60*24*30 (number * of seconds in 30 days); if the number sent by a client is larger than * that, the server will consider it to be real Unix time value rather * than an offset from current time. *

*
* * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture replace(String key, int exp, Object o) { return asyncStore(StoreType.replace, key, exp, o, transcoder); } /** * Get the given key asynchronously. * * @param * @param key the key to fetch * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public GetFuture asyncGet(final String key, final Transcoder tc) { final CountDownLatch latch=new CountDownLatch(1); final GetFuture rv=new GetFuture(latch, operationTimeout, key); Operation op=opFact.get(key, new GetOperation.Callback() { private Future val=null; public void receivedStatus(OperationStatus status) { rv.set(val, status); } public void gotData(String k, int flags, byte[] data) { assert key.equals(k) : "Wrong key returned"; val=tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())); } public void complete() { latch.countDown(); }}); rv.setOperation(op); addOp(key, op); return rv; } /** * Get the given key asynchronously and decode with the default * transcoder. * * @param key the key to fetch * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public GetFuture asyncGet(final String key) { return asyncGet(key, transcoder); } /** * Gets (with CAS support) the given key asynchronously. 
* * @param * @param key the key to fetch * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture> asyncGets(final String key, final Transcoder tc) { final CountDownLatch latch=new CountDownLatch(1); final OperationFuture> rv= new OperationFuture>(key, latch, operationTimeout); Operation op=opFact.gets(key, new GetsOperation.Callback() { private CASValue val=null; public void receivedStatus(OperationStatus status) { rv.set(val, status); } public void gotData(String k, int flags, long cas, byte[] data) { assert key.equals(k) : "Wrong key returned"; assert cas > 0 : "CAS was less than zero: " + cas; val=new CASValue(cas, tc.decode( new CachedData(flags, data, tc.getMaxSize()))); } public void complete() { latch.countDown(); }}); rv.setOperation(op); addOp(key, op); return rv; } /** * Gets (with CAS support) the given key asynchronously and decode using * the default transcoder. * * @param key the key to fetch * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture> asyncGets(final String key) { return asyncGets(key, transcoder); } /** * Gets (with CAS support) with a single key. 
* * @param * @param key the key to get * @param tc the transcoder to serialize and unserialize value * @return the result from the cache and CAS id (null if there is none) * @throws OperationTimeoutException if global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public CASValue gets(String key, Transcoder tc) { try { return asyncGets(key, tc).get( operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Get with a single key and reset its expiration. * * @param * @param key the key to get * @param exp the new expiration for the key * @param tc the transcoder to serialize and unserialize value * @return the result from the cache (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public CASValue getAndTouch(String key, int exp, Transcoder tc) { try { return asyncGetAndTouch(key, exp, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Get a single key and reset its expiration using the default transcoder. 
* * @param key the key to get * @param exp the new expiration for the key * @return the result from the cache and CAS id (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public CASValue getAndTouch(String key, int exp) { return getAndTouch(key, exp, transcoder); } /** * Gets (with CAS support) with a single key using the default transcoder. * * @param key the key to get * @return the result from the cache and CAS id (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public CASValue gets(String key) { return gets(key, transcoder); } /** * Get with a single key. * * @param * @param key the key to get * @param tc the transcoder to serialize and unserialize value * @return the result from the cache (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public T get(String key, Transcoder tc) { try { return asyncGet(key, tc).get( operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Get with a single key and decode using the default transcoder. 
* * @param key the key to get * @return the result from the cache (null if there is none) validateKey(key); * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public Object get(String key) { return get(key, transcoder); } /** * Gets and locks the given key asynchronously. By default the maximum allowed * timeout is 30 seconds. Timeouts greater than this will be set to 30 seconds. * * @param key the key to fetch and lock * @param exp the amount of time the lock should be valid for in seconds. * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture> asyncGetAndLock(final String key, int exp, final Transcoder tc) { final CountDownLatch latch=new CountDownLatch(1); final OperationFuture> rv= new OperationFuture>(key, latch, operationTimeout); Operation op=opFact.getl(key, exp, new GetlOperation.Callback() { private CASValue val=null; public void receivedStatus(OperationStatus status) { rv.set(val, status); } public void gotData(String k, int flags, long cas, byte[] data) { assert key.equals(k) : "Wrong key returned"; assert cas > 0 : "CAS was less than zero: " + cas; val=new CASValue(cas, tc.decode( new CachedData(flags, data, tc.getMaxSize()))); } public void complete() { latch.countDown(); }}); rv.setOperation(op); addOp(key, op); return rv; } /** * Get and lock the given key asynchronously and decode with the default * transcoder. By default the maximum allowed timeout is 30 seconds. * Timeouts greater than this will be set to 30 seconds. * * @param key the key to fetch and lock * @param exp the amount of time the lock should be valid for in seconds. 
* @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture> asyncGetAndLock(final String key, int exp) { return asyncGetAndLock(key, exp, transcoder); } /** * Asynchronously get a bunch of objects from the cache. * * @param * @param keys the keys to request * @param tc_iter an iterator of transcoders to serialize and * unserialize values; the transcoders are matched with * the keys in the same order. The minimum of the key * collection length and number of transcoders is used * and no exception is thrown if they do not match * @return a Future result of that fetch * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public BulkFuture> asyncGetBulk(Collection keys, Iterator> tc_iter) { final Map> m=new ConcurrentHashMap>(); // This map does not need to be a ConcurrentHashMap // because it is fully populated when it is used and // used only to read the transcoder for a key. 
final Map> tc_map = new HashMap>(); // Break the gets down into groups by key final Map> chunks =new HashMap>(); final NodeLocator locator=conn.getLocator(); Iterator key_iter=keys.iterator(); while (key_iter.hasNext() && tc_iter.hasNext()) { String key=key_iter.next(); tc_map.put(key, tc_iter.next()); final MemcachedNode primaryNode=locator.getPrimary(key); MemcachedNode node=null; if(primaryNode.isActive()) { node=primaryNode; } else { for(Iterator i=locator.getSequence(key); node == null && i.hasNext();) { MemcachedNode n=i.next(); if(n.isActive()) { node=n; } } if(node == null) { node=primaryNode; } } assert node != null : "Didn't find a node for " + key; Collection ks=chunks.get(node); if(ks == null) { ks=new ArrayList(); chunks.put(node, ks); } ks.add(key); } final CountDownLatch latch=new CountDownLatch(chunks.size()); final Collection ops=new ArrayList(chunks.size()); final BulkGetFuture rv = new BulkGetFuture(m, ops, latch); GetOperation.Callback cb=new GetOperation.Callback() { @SuppressWarnings("synthetic-access") public void receivedStatus(OperationStatus status) { rv.setStatus(status); } public void gotData(String k, int flags, byte[] data) { Transcoder tc = tc_map.get(k); m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); } public void complete() { latch.countDown(); } }; // Now that we know how many servers it breaks down into, and the latch // is all set up, convert all of these strings collections to operations final Map mops= new HashMap(); for(Map.Entry> me : chunks.entrySet()) { Operation op=opFact.get(me.getValue(), cb); mops.put(me.getKey(), op); ops.add(op); } assert mops.size() == chunks.size(); checkState(); conn.addOperations(mops); return rv; } /** * Asynchronously get a bunch of objects from the cache. 
* * @param * @param keys the keys to request * @param tc the transcoder to serialize and unserialize values * @return a Future result of that fetch * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public BulkFuture> asyncGetBulk(Collection keys, Transcoder tc) { return asyncGetBulk(keys, new SingleElementInfiniteIterator>(tc)); } /** * Asynchronously get a bunch of objects from the cache and decode them * with the given transcoder. * * @param keys the keys to request * @return a Future result of that fetch * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public BulkFuture> asyncGetBulk(Collection keys) { return asyncGetBulk(keys, transcoder); } /** * Varargs wrapper for asynchronous bulk gets. * * @param * @param tc the transcoder to serialize and unserialize value * @param keys one more more keys to get * @return the future values of those keys * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public BulkFuture> asyncGetBulk(Transcoder tc, String... keys) { return asyncGetBulk(Arrays.asList(keys), tc); } /** * Varargs wrapper for asynchronous bulk gets with the default transcoder. * * @param keys one more more keys to get * @return the future values of those keys * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public BulkFuture> asyncGetBulk(String... keys) { return asyncGetBulk(Arrays.asList(keys), transcoder); } /** * Get the given key to reset its expiration time. 
* * @param key the key to fetch * @param exp the new expiration to set for the given key * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture> asyncGetAndTouch(final String key, final int exp) { return asyncGetAndTouch(key, exp, transcoder); } /** * Get the given key to reset its expiration time. * * @param key the key to fetch * @param exp the new expiration to set for the given key * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture> asyncGetAndTouch(final String key, final int exp, final Transcoder tc) { final CountDownLatch latch=new CountDownLatch(1); final OperationFuture> rv=new OperationFuture>(key, latch, operationTimeout); Operation op=opFact.getAndTouch(key, exp, new GetAndTouchOperation.Callback() { private CASValue val=null; public void receivedStatus(OperationStatus status) { rv.set(val, status); } public void complete() { latch.countDown(); } public void gotData(String k, int flags, long cas, byte[] data) { assert k.equals(key) : "Wrong key returned"; assert cas > 0 : "CAS was less than zero: " + cas; val=new CASValue(cas, tc.decode( new CachedData(flags, data, tc.getMaxSize()))); }}); rv.setOperation(op); addOp(key, op); return rv; } /** * Get the values for multiple keys from the cache. 
* * @param * @param keys the keys * @param tc the transcoder to serialize and unserialize value * @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public Map getBulk(Collection keys, Transcoder tc) { try { return asyncGetBulk(keys, tc).get( operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted getting bulk values", e); } catch (ExecutionException e) { throw new RuntimeException("Failed getting bulk values", e); } catch (TimeoutException e) { throw new OperationTimeoutException( "Timeout waiting for bulkvalues", e); } } /** * Get the values for multiple keys from the cache. * * @param keys the keys * @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests * */ public Map getBulk(Collection keys) { return getBulk(keys, transcoder); } /** * Get the values for multiple keys from the cache. * * @param * @param tc the transcoder to serialize and unserialize value * @param keys the keys * @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public Map getBulk(Transcoder tc, String... keys) { return getBulk(Arrays.asList(keys), tc); } /** * Get the values for multiple keys from the cache. 
* * @param keys the keys * @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public Map getBulk(String... keys) { return getBulk(Arrays.asList(keys), transcoder); } /** * Get the versions of all of the connected memcacheds. * * @return a Map of SocketAddress to String for connected servers * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public Map getVersions() { final Maprv= new ConcurrentHashMap(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory(){ public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { final SocketAddress sa=n.getSocketAddress(); return opFact.version( new OperationCallback() { public void receivedStatus(OperationStatus s) { rv.put(sa, s.getMessage()); } public void complete() { latch.countDown(); } }); }}); try { blatch.await(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for versions", e); } return rv; } /** * Get all of the stats from all of the connections. * * @return a Map of a Map of stats replies by SocketAddress * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public Map> getStats() { return getStats(null); } /** * Get a set of stats from all connections. * * @param arg which stats to get * @return a Map of the server SocketAddress to a map of String stat * keys to String stat values. 
* @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public Map> getStats(final String arg) { final Map> rv =new HashMap>(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory(){ public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { final SocketAddress sa=n.getSocketAddress(); rv.put(sa, new HashMap()); return opFact.stats(arg, new StatsOperation.Callback() { * @param key the key public void gotStat(String name, String val) { rv.get(sa).put(name, val); } @SuppressWarnings("synthetic-access") // getLogger() public void receivedStatus(OperationStatus status) { if(!status.isSuccess()) { getLogger().warn("Unsuccessful stat fetch: %s", status); } } public void complete() { latch.countDown(); }}); }}); try { blatch.await(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for stats", e); } return rv; } private long mutate(Mutator m, String key, int by, long def, int exp) { final AtomicLong rv=new AtomicLong(); final CountDownLatch latch=new CountDownLatch(1); addOp(key, opFact.mutate(m, key, by, def, exp, new OperationCallback() { public void receivedStatus(OperationStatus s) { // XXX: Potential abstraction leak. // The handling of incr/decr in the binary protocol // Allows us to avoid string processing. rv.set(new Long(s.isSuccess()?s.getMessage():"-1")); } public void complete() { latch.countDown(); }})); try { if (!latch.await(operationTimeout, TimeUnit.MILLISECONDS)) { throw new OperationTimeoutException( "Mutate operation timed out, unable to modify counter [" + key + "]"); } } catch (InterruptedException e) { throw new RuntimeException("Interrupted", e); } getLogger().debug("Mutation returned %s", rv); return rv.get(); } /** * Getl with a single key. By default the maximum allowed timeout is 30 * seconds. Timeouts greater than this will be set to 30 seconds. 
* * @param key the key to get and lock * @param exp the amount of time the lock should be valid for in seconds. * @param tc the transcoder to serialize and unserialize value * @return the result from the cache (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public CASValue getAndLock(String key, int exp, Transcoder tc) { try { return asyncGetAndLock(key, exp, tc).get( operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Get and lock with a single key and decode using the default transcoder. * By default the maximum allowed timeout is 30 seconds. Timeouts greater * than this will be set to 30 seconds. * @param key the key to get and lock * @param exp the amount of time the lock should be valid for in seconds. * @return the result from the cache (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public CASValue getAndLock(String key, int exp) { return getAndLock(key, exp, transcoder); } /** * Increment the given key by the given amount. * * Due to the way the memcached server operates on items, incremented * and decremented items will be returned as Strings with any * operations that return a value. 
* @param by the amount to increment * @return the new value (-1 if the key doesn't exist) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public long incr(String key, int by) { return mutate(Mutator.incr, key, by, 0, -1); } /** * Decrement the given key by the given value. * * Due to the way the memcached server operates on items, incremented * and decremented items will be returned as Strings with any * operations that return a value. * * @param key the key * @param by the value * @return the new value (-1 if the key doesn't exist) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public long decr(String key, int by) { return mutate(Mutator.decr, key, by, 0, -1); } /** * Increment the given counter, returning the new value. * * Due to the way the memcached server operates on items, incremented * and decremented items will be returned as Strings with any * operations that return a value. * * @param key the key * @param by the amount to increment * @param def the default value (if the counter does not exist) * @param exp the expiration of this object * @return the new value, or -1 if we were unable to increment or add * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public long incr(String key, int by, long def, int exp) { return mutateWithDefault(Mutator.incr, key, by, def, exp); } /** * Decrement the given counter, returning the new value. * * Due to the way the memcached server operates on items, incremented * and decremented items will be returned as Strings with any * operations that return a value. 
* * @param key the key * @param by the amount to decrement * @param def the default value (if the counter does not exist) * @param exp the expiration of this object * @return the new value, or -1 if we were unable to decrement or add * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public long decr(String key, int by, long def, int exp) { return mutateWithDefault(Mutator.decr, key, by, def, exp); } private long mutateWithDefault(Mutator t, String key, int by, long def, int exp) { long rv=mutate(t, key, by, def, exp); // The ascii protocol doesn't support defaults, so I added them // manually here. if(rv == -1) { Future f=asyncStore(StoreType.add, key, exp, String.valueOf(def)); try { if(f.get(operationTimeout, TimeUnit.MILLISECONDS)) { rv=def; } else { rv=mutate(t, key, by, 0, exp); assert rv != -1 : "Failed to mutate or init value"; } } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for store", e); } catch (ExecutionException e) { throw new RuntimeException("Failed waiting for store", e); } catch (TimeoutException e) { throw new OperationTimeoutException( "Timeout waiting to mutate or init value", e); } } return rv; } private OperationFuture asyncMutate(Mutator m, String key, int by, long def, int exp) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); Operation op = addOp(key, opFact.mutate(m, key, by, def, exp, new OperationCallback() { public void receivedStatus(OperationStatus s) { rv.set(new Long(s.isSuccess() ? s.getMessage() : "-1"), s); } public void complete() { latch.countDown(); } })); rv.setOperation(op); return rv; } /** * Asychronous increment. 
* * @param key key to increment * @param by the amount to increment the value by * @return a future with the incremented value, or -1 if the * increment failed. * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture asyncIncr(String key, int by) { return asyncMutate(Mutator.incr, key, by, 0, -1); } * Asynchronous decrement. * * @param key key to increment * @param by the amount to increment the value by * @return a future with the decremented value, or -1 if the * increment failed. * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture asyncDecr(String key, int by) { return asyncMutate(Mutator.decr, key, by, 0, -1); } /** * Increment the given counter, returning the new value. * * @param key the key * @param by the amount to increment * @param def the default value (if the counter does not exist) * @return the new value, or -1 if we were unable to increment or add * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public long incr(String key, int by, long def) { return mutateWithDefault(Mutator.incr, key, by, def, 0); } /** * Decrement the given counter, returning the new value. * * @param key the key * @param by the amount to decrement * @param def the default value (if the counter does not exist) * @return the new value, or -1 if we were unable to decrement or add * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public long decr(String key, int by, long def) { return mutateWithDefault(Mutator.decr, key, by, def, 0); } /** * Delete the given key from the cache. * *

* The hold argument specifies the amount of time in seconds (or Unix time * until which) the client wishes the server to refuse "add" and "replace" * commands with this key. For this amount of item, the item is put into a * delete queue, which means that it won't possible to retrieve it by the * "get" command, but "add" and "replace" command with this key will also * fail (the "set" command will succeed, however). After the time passes, * the item is finally deleted from server memory. *

* * @param key the key to delete * @param hold how long the key should be unavailable to add commands * * @return whether or not the operation was performed * @deprecated Hold values are no longer honored. */ @Deprecated public OperationFuture delete(String key, int hold) { return delete(key); } /** * Delete the given key from the cache. * * @param key the key to delete * @return whether or not the operation was performed * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture delete(String key) { final CountDownLatch latch=new CountDownLatch(1); final OperationFuture rv=new OperationFuture(key, latch, operationTimeout); DeleteOperation op=opFact.delete(key, new OperationCallback() { public void receivedStatus(OperationStatus s) { rv.set(s.isSuccess(), s); } public void complete() { latch.countDown(); }}); rv.setOperation(op); addOp(key, op); return rv; } /** * Flush all caches from all servers with a delay of application. 
* @param delay the period of time to delay, in seconds * @return whether or not the operation was accepted * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture flush(final int delay) { final AtomicReference flushResult= new AtomicReference(null); final ConcurrentLinkedQueue ops= new ConcurrentLinkedQueue(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory(){ public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { Operation op=opFact.flush(delay, new OperationCallback(){ public void receivedStatus(OperationStatus s) { flushResult.set(s.isSuccess()); } public void complete() { latch.countDown(); }}); ops.add(op); return op; }}); return new OperationFuture(null, blatch, flushResult, operationTimeout) { @Override public boolean cancel(boolean ign) { boolean rv=false; for(Operation op : ops) { op.cancel(); rv |= op.getState() == OperationState.WRITE_QUEUED; } return rv; } @Override public Boolean get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException { status = new OperationStatus(true, "OK"); return super.get(duration, units); } @Override public boolean isCancelled() { boolean rv=false; for(Operation op : ops) { rv |= op.isCancelled(); } return rv; } @Override public boolean isDone() { boolean rv=true; for(Operation op : ops) { rv &= op.getState() == OperationState.COMPLETE; } return rv || isCancelled(); } }; } /** * Flush all caches from all servers immediately. 
* @return whether or not the operation was performed * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public OperationFuture flush() { return flush(-1); } public Set listSaslMechanisms() { final ConcurrentMap rv = new ConcurrentHashMap(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { public Operation newOp(MemcachedNode n, final CountDownLatch latch) { return opFact.saslMechs(new OperationCallback() { public void receivedStatus(OperationStatus status) { for(String s : status.getMessage().split(" ")) { rv.put(s, s); } } public void complete() { latch.countDown(); } }); } }); try { blatch.await(); } catch(InterruptedException e) { Thread.currentThread().interrupt(); } return rv.keySet(); } private void logRunException(Exception e) { if(shuttingDown) { // There are a couple types of errors that occur during the // shutdown sequence that are considered OK. Log at debug. getLogger().debug("Exception occurred during shutdown", e); } else { getLogger().warn("Problem handling memcached IO", e); } } /** * Infinitely loop processing IO. */ @Override public void run() { while(running) { if (!reconfiguring) { try { conn.handleIO(); } catch (IOException e) { logRunException(e); } catch (CancelledKeyException e) { logRunException(e); } catch (ClosedSelectorException e) { logRunException(e); } catch (IllegalStateException e) { logRunException(e); } } } getLogger().info("Shut down memcached client"); } /** * Shut down immediately. */ public void shutdown() { shutdown(-1, TimeUnit.MILLISECONDS); } /** * Shut down this client gracefully. * * @param timeout the amount of time time for shutdown * @param unit the TimeUnit for the timeout * @return result of the shutdown request */ public boolean shutdown(long timeout, TimeUnit unit) { // Guard against double shutdowns (bug 8). 
if(shuttingDown) { getLogger().info("Suppressing duplicate attempt to shut down"); return false; } shuttingDown=true; String baseName=getName(); setName(baseName + " - SHUTTING DOWN"); boolean rv=false; try { // Conditionally wait if(timeout > 0) { setName(baseName + " - SHUTTING DOWN (waiting)"); rv=waitForQueues(timeout, unit); } } finally { // But always begin the shutdown sequence try { setName(baseName + " - SHUTTING DOWN (telling client)"); running=false; conn.shutdown(); setName(baseName + " - SHUTTING DOWN (informed client)"); tcService.shutdown(); if (configurationProvider != null) { configurationProvider.shutdown(); } } catch (IOException e) { getLogger().warn("exception while shutting down configuration provider", e); } } return rv; } /** * Wait for the queues to die down. * * @param timeout the amount of time time for shutdown * @param unit the TimeUnit for the timeout * @return result of the request for the wait * @throws IllegalStateException in the rare circumstance where queue * is too full to accept any more requests */ public boolean waitForQueues(long timeout, TimeUnit unit) { CountDownLatch blatch = broadcastOp(new BroadcastOpFactory(){ public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { return opFact.noop( new OperationCallback() { public void complete() { latch.countDown(); } public void receivedStatus(OperationStatus s) { // Nothing special when receiving status, only // necessary to complete the interface } }); }}, conn.getLocator().getAll(), false); try { // XXX: Perhaps IllegalStateException should be caught here // and the check retried. return blatch.await(timeout, unit); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for queues", e); } } /** * Add a connection observer. * * If connections are already established, your observer will be called * with the address and -1. * * @param obs the ConnectionObserver you wish to add * @return true if the observer was added. 
*/ public boolean addObserver(ConnectionObserver obs) { boolean rv = conn.addObserver(obs); if(rv) { for(MemcachedNode node : conn.getLocator().getAll()) { if(node.isActive()) { obs.connectionEstablished(node.getSocketAddress(), -1); } } } return rv; } /** * Remove a connection observer. * * @param obs the ConnectionObserver you wish to add * @return true if the observer existed, but no longer does */ public boolean removeObserver(ConnectionObserver obs) { return conn.removeObserver(obs); } public void connectionEstablished(SocketAddress sa, int reconnectCount) { if(authDescriptor != null) { if (authDescriptor.authThresholdReached()) { this.shutdown(); } authMonitor.authConnection(conn, opFact, authDescriptor, findNode(sa)); } } private MemcachedNode findNode(SocketAddress sa) { MemcachedNode node = null; for(MemcachedNode n : conn.getLocator().getAll()) { if(n.getSocketAddress().equals(sa)) { node = n; } } assert node != null : "Couldn't find node connected to " + sa; return node; } public void connectionLost(SocketAddress sa) { // Don't care. } @Override public String toString() { return connFactory.toString(); } >>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd }
Solution content
 *      }
 * 
*/ public class MemcachedClient extends SpyObject implements MemcachedClientIF, ConnectionObserver { protected volatile boolean shuttingDown = false; protected final long operationTimeout; protected final MemcachedConnection mconn; protected final OperationFactory opFact; protected final Transcoder transcoder; protected final TranscodeService tcService; protected final AuthDescriptor authDescriptor; protected final ConnectionFactory connFactory; protected final AuthThreadMonitor authMonitor = new AuthThreadMonitor(); /** * Get a memcache client operating on the specified memcached locations. * * @param ia the memcached locations * @throws IOException if connections cannot be established */ public MemcachedClient(InetSocketAddress... ia) throws IOException { this(new DefaultConnectionFactory(), Arrays.asList(ia)); } /** * Get a memcache client over the specified memcached locations. * * @param addrs the socket addrs * @throws IOException if connections cannot be established */ public MemcachedClient(List addrs) throws IOException { this(new DefaultConnectionFactory(), addrs); } /** * Get a memcache client over the specified memcached locations. 
* * @param cf the connection factory to configure connections for this client * @param addrs the socket addresses * @throws IOException if connections cannot be established */ public MemcachedClient(ConnectionFactory cf, List addrs) throws IOException { if (cf == null) { throw new NullPointerException("Connection factory required"); } if (addrs == null) { throw new NullPointerException("Server list required"); } if (addrs.isEmpty()) { throw new IllegalArgumentException("You must have at least one server to" + " connect to"); } if (cf.getOperationTimeout() <= 0) { throw new IllegalArgumentException("Operation timeout must be positive."); } connFactory = cf; tcService = new TranscodeService(cf.isDaemon()); transcoder = cf.getDefaultTranscoder(); opFact = cf.getOperationFactory(); assert opFact != null : "Connection factory failed to make op factory"; mconn = cf.createConnection(addrs); assert mconn != null : "Connection factory failed to make a connection"; operationTimeout = cf.getOperationTimeout(); authDescriptor = cf.getAuthDescriptor(); if (authDescriptor != null) { addObserver(this); } } /** * Get the addresses of available servers. * *

* This is based on a snapshot in time so shouldn't be considered completely * accurate, but is a useful for getting a feel for what's working and what's * not working. *

* * @return point-in-time view of currently available servers */ public Collection getAvailableServers() { ArrayList rv = new ArrayList(); for (MemcachedNode node : mconn.getLocator().getAll()) { if (node.isActive()) { rv.add(node.getSocketAddress()); } } return rv; } /** * Get the addresses of unavailable servers. * *

* This is based on a snapshot in time so shouldn't be considered completely * accurate, but is a useful for getting a feel for what's working and what's * not working. *

* * @return point-in-time view of currently available servers */ public Collection getUnavailableServers() { ArrayList rv = new ArrayList(); for (MemcachedNode node : mconn.getLocator().getAll()) { if (!node.isActive()) { rv.add(node.getSocketAddress()); } } return rv; } /** * Get a read-only wrapper around the node locator wrapping this instance. * * @return this instance's NodeLocator */ public NodeLocator getNodeLocator() { return mconn.getLocator().getReadonlyCopy(); } /** * Get the default transcoder that's in use. * * @return this instance's Transcoder */ public Transcoder getTranscoder() { return transcoder; } private void validateKey(String key) { byte[] keyBytes = KeyUtil.getKeyBytes(key); if (keyBytes.length > MAX_KEY_LENGTH) { throw new IllegalArgumentException("Key is too long (maxlen = " + MAX_KEY_LENGTH + ")"); } if (keyBytes.length == 0) { throw new IllegalArgumentException( "Key must contain at least one character."); } // Validate the key for (byte b : keyBytes) { if (b == ' ' || b == '\n' || b == '\r' || b == 0) { throw new IllegalArgumentException( "Key contains invalid characters: ``" + key + "''"); } } } /** * (internal use) Add a raw operation to a numbered connection. This method is * exposed for testing. 
* * @param which server number * @param op the operation to perform * @return the Operation */ Operation addOp(final String key, final Operation op) { validateKey(key); mconn.checkState(); mconn.addOperation(key, op); return op; } CountDownLatch broadcastOp(final BroadcastOpFactory of) { return broadcastOp(of, mconn.getLocator().getAll(), true); } CountDownLatch broadcastOp(final BroadcastOpFactory of, Collection nodes) { return broadcastOp(of, nodes, true); } private CountDownLatch broadcastOp(BroadcastOpFactory of, Collection nodes, boolean checkShuttingDown) { if (checkShuttingDown && shuttingDown) { throw new IllegalStateException("Shutting down"); } return mconn.broadcastOperation(of, nodes); } private OperationFuture asyncStore(StoreType storeType, String key, int exp, T value, Transcoder tc) { CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); Operation op = opFact.store(storeType, key, co.getFlags(), exp, co.getData(), new OperationCallback() { public void receivedStatus(OperationStatus val) { rv.set(val.isSuccess(), val); } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } private OperationFuture asyncStore(StoreType storeType, String key, int exp, Object value) { return asyncStore(storeType, key, exp, value, transcoder); } private OperationFuture asyncCat(ConcatenationType catType, long cas, String key, T value, Transcoder tc) { CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); Operation op = opFact.cat(catType, cas, key, co.getData(), new OperationCallback() { public void receivedStatus(OperationStatus val) { rv.set(val.isSuccess(), val); } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Touch the given key to reset its 
expiration time with the default * transcoder. * * @param key the key to fetch * @param exp the new expiration to set for the given key * @return a future that will hold the return value of whether or not the * fetch succeeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture touch(final String key, final int exp) { return touch(key, exp, transcoder); } /** * Touch the given key to reset its expiration time. * * @param key the key to fetch * @param exp the new expiration to set for the given key * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of whether or not the * fetch succeeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture touch(final String key, final int exp, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); Operation op = opFact.touch(key, exp, new OperationCallback() { public void receivedStatus(OperationStatus status) { rv.set(status.isSuccess(), status); } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Append to an existing value in the cache. * *

* Note that the return will be false any time a mutation has not occurred. *

* * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be appended * @param val the value to append * @return a future indicating success, false if there was no change to the * value * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture append(long cas, String key, Object val) { return append(cas, key, val, transcoder); } /** * Append to an existing value in the cache. * *

* Note that the return will be false any time a mutation has not occurred. *

* * @param * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be appended * @param val the value to append * @param tc the transcoder to serialize and unserialize the value * @return a future indicating success * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture append(long cas, String key, T val, Transcoder tc) { return asyncCat(ConcatenationType.append, cas, key, val, tc); } /** * Prepend to an existing value in the cache. * *

* Note that the return will be false any time a mutation has not occurred. *

* * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be prepended * @param val the value to append * @return a future indicating success * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture prepend(long cas, String key, Object val) { return prepend(cas, key, val, transcoder); } /** * Prepend to an existing value in the cache. * *

* Note that the return will be false any time a mutation has not occurred. *

* * @param * @param cas cas identifier (ignored in the ascii protocol) * @param key the key to whose value will be prepended * @param val the value to append * @param tc the transcoder to serialize and unserialize the value * @return a future indicating success * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture prepend(long cas, String key, T val, Transcoder tc) { return asyncCat(ConcatenationType.prepend, cas, key, val, tc); } * rv.setOperation(op); /** * Asynchronous CAS operation. * * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a future that will indicate the status of the CAS * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Future asyncCAS(String key, long casId, T value, Transcoder tc) { return asyncCAS(key, casId, 0, value, tc); } /** * Asynchronous CAS operation. 
* * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param exp the expiration of this object * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a future that will indicate the status of the CAS * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Future asyncCAS(String key, long casId, int exp, T value, Transcoder tc) { CachedData co = tc.encode(value); final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); Operation op = opFact.cas(StoreType.set, key, casId, co.getFlags(), exp, co.getData(), new OperationCallback() { public void receivedStatus(OperationStatus val) { if (val instanceof CASOperationStatus) { rv.set(((CASOperationStatus) val).getCASResponse(), val); } else if (val instanceof CancelledOperationStatus) { getLogger().debug("CAS operation cancelled"); } else { throw new RuntimeException("Unhandled state: " + val); } } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Asynchronous CAS operation using the default transcoder. * * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @return a future that will indicate the status of the CAS * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Future asyncCAS(String key, long casId, Object value) { return asyncCAS(key, casId, value, transcoder); } /** * Perform a synchronous CAS operation. 
* * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a CASResponse * @throws OperationTimeoutException if global operation timeout is exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASResponse cas(String key, long casId, T value, Transcoder tc) { return cas(key, casId, 0, value, tc); } /** * Perform a synchronous CAS operation. * * @param * @param key the key * @param casId the CAS identifier (from a gets operation) * @param exp the expiration of this object * @param value the new value * @param tc the transcoder to serialize and unserialize the value * @return a CASResponse * @throws OperationTimeoutException if global operation timeout is exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASResponse cas(String key, long casId, int exp, T value, Transcoder tc) { try { return asyncCAS(key, casId, exp, value, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Perform a synchronous CAS operation with the default transcoder. 
* * @param key the key * @param casId the CAS identifier (from a gets operation) * @param value the new value * @return a CASResponse * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASResponse cas(String key, long casId, Object value) { return cas(key, casId, value, transcoder); } /** * Add an object to the cache iff it does not exist already. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* *

* Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

*
* * @param * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @param tc the transcoder to serialize and unserialize the value * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture add(String key, int exp, T o, Transcoder tc) { return asyncStore(StoreType.add, key, exp, o, tc); } /** * Add an object to the cache (using the default transcoder) iff it does not * exist already. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* *

* Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

* * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture add(String key, int exp, Object o) { return asyncStore(StoreType.add, key, exp, o, transcoder); } /** * Set an object in the cache regardless of any existing value. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* *

* Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

*
* * @param * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @param tc the transcoder to serialize and unserialize the value * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture set(String key, int exp, T o, Transcoder tc) { return asyncStore(StoreType.set, key, exp, o, tc); } /** * Set an object in the cache (using the default transcoder) regardless of any * existing value. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* *

* Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

*
* * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture set(String key, int exp, Object o) { return asyncStore(StoreType.set, key, exp, o, transcoder); } /** * Replace an object with the given value iff there is already a value for the * given key. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* *

addOp(key, op); return asyncGetBulk(keys, new SingleElementInfiniteIterator>( tc)); * Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

*
* * @param * @param key the key under which this object should be added. * @param exp the expiration of this object * @param o the object to store * @param tc the transcoder to serialize and unserialize the value * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture replace(String key, int exp, T o, Transcoder tc) { return asyncStore(StoreType.replace, key, exp, o, tc); } /** * Replace an object with the given value (transcoded with the default * transcoder) iff there is already a value for the given key. * *

* The exp value is passed along to memcached exactly as given, * and will be processed per the memcached protocol specification: *

* *

* Note that the return will be false any time a mutation has not occurred. *

* *
*

* The actual value sent may either be Unix time (number of seconds since * January 1, 1970, as a 32-bit value), or a number of seconds starting from * current time. In the latter case, this number of seconds may not exceed * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client * is larger than that, the server will consider it to be real Unix time value * rather than an offset from current time. *

*
* * @param key the key under which this object should be added. * * @param exp the expiration of this object * @param o the object to store * @return a future representing the processing of this operation * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture replace(String key, int exp, Object o) { return asyncStore(StoreType.replace, key, exp, o, transcoder); } /** * Get the given key asynchronously. * * @param * @param key the key to fetch * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public GetFuture asyncGet(final String key, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); final GetFuture rv = new GetFuture(latch, operationTimeout, key); Operation op = opFact.get(key, new GetOperation.Callback() { private Future val = null; public void receivedStatus(OperationStatus status) { rv.set(val, status); } public void gotData(String k, int flags, byte[] data) { assert key.equals(k) : "Wrong key returned"; val = tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())); } public void complete() { latch.countDown(); } }); } return rv; } /** * Get the given key asynchronously and decode with the default transcoder. * * @param key the key to fetch * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public GetFuture asyncGet(final String key) { return asyncGet(key, transcoder); } /** * Gets (with CAS support) the given key asynchronously. 
* * @param * @param key the key to fetch * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture> asyncGets(final String key, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = new OperationFuture>(key, latch, operationTimeout); Operation op = opFact.gets(key, new GetsOperation.Callback() { private CASValue val = null; public void receivedStatus(OperationStatus status) { rv.set(val, status); } public void gotData(String k, int flags, long cas, byte[] data) { assert key.equals(k) : "Wrong key returned"; assert cas > 0 : "CAS was less than zero: " + cas; val = new CASValue(cas, tc.decode(new CachedData(flags, data, tc.getMaxSize()))); } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Gets (with CAS support) the given key asynchronously and decode using the * default transcoder. * * @param key the key to fetch * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture> asyncGets(final String key) { return asyncGets(key, transcoder); } /** * Gets (with CAS support) with a single key. 
* * @param * @param key the key to get * @param tc the transcoder to serialize and unserialize value * @return the result from the cache and CAS id (null if there is none) * @throws OperationTimeoutException if global operation timeout is exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASValue gets(String key, Transcoder tc) { try { return asyncGets(key, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Get with a single key and reset its expiration. * * @param * @param key the key to get * @param exp the new expiration for the key * @param tc the transcoder to serialize and unserialize value * @return the result from the cache (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASValue getAndTouch(String key, int exp, Transcoder tc) { try { return asyncGetAndTouch(key, exp, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Get a single key and reset its expiration using the default transcoder. 
* * @param key the key to get * @param exp the new expiration for the key * @return the result from the cache and CAS id (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASValue getAndTouch(String key, int exp) { return getAndTouch(key, exp, transcoder); } /** * Gets (with CAS support) with a single key using the default transcoder. * * @param key the key to get * @return the result from the cache and CAS id (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public CASValue gets(String key) { return gets(key, transcoder); } /** * Get with a single key. * * @param * @param key the key to get * @param tc the transcoder to serialize and unserialize value * @return the result from the cache (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public T get(String key, Transcoder tc) { try { return asyncGet(key, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for value", e); } catch (ExecutionException e) { throw new RuntimeException("Exception waiting for value", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for value", e); } } /** * Get with a single key and decode using the default transcoder. 
* * @param key the key to get * @return the result from the cache (null if there is none) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Object get(String key) { return get(key, transcoder); } /** * Asynchronously get a bunch of objects from the cache. * * @param * @param keys the keys to request * @param tcIter an iterator of transcoders to serialize and unserialize * values; the transcoders are matched with the keys in the same * order. The minimum of the key collection length and number of * transcoders is used and no exception is thrown if they do not * match * @return a Future result of that fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public BulkFuture> asyncGetBulk(Collection keys, Iterator> tcIter) { final Map> m = new ConcurrentHashMap>(); // This map does not need to be a ConcurrentHashMap // because it is fully populated when it is used and // used only to read the transcoder for a key. 
final Map> tcMap = new HashMap>(); // Break the gets down into groups by key final Map> chunks = new HashMap>(); final NodeLocator locator = mconn.getLocator(); Iterator keyIter = keys.iterator(); while (keyIter.hasNext() && tcIter.hasNext()) { String key = keyIter.next(); tcMap.put(key, tcIter.next()); validateKey(key); final MemcachedNode primaryNode = locator.getPrimary(key); MemcachedNode node = null; if (primaryNode.isActive()) { node = primaryNode; } else { for (Iterator i = locator.getSequence(key); node == null && i.hasNext();) { MemcachedNode n = i.next(); if (n.isActive()) { node = n; } } if (node == null) { node = primaryNode; } } assert node != null : "Didn't find a node for " + key; Collection ks = chunks.get(node); if (ks == null) { ks = new ArrayList(); chunks.put(node, ks); } ks.add(key); } final CountDownLatch latch = new CountDownLatch(chunks.size()); final Collection ops = new ArrayList(chunks.size()); final BulkGetFuture rv = new BulkGetFuture(m, ops, latch); GetOperation.Callback cb = new GetOperation.Callback() { @SuppressWarnings("synthetic-access") public void receivedStatus(OperationStatus status) { rv.setStatus(status); } public void gotData(String k, int flags, byte[] data) { Transcoder tc = tcMap.get(k); m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); } public void complete() { latch.countDown(); } }; // Now that we know how many servers it breaks down into, and the latch // is all set up, convert all of these strings collections to operations final Map mops = new HashMap(); for (Map.Entry> me : chunks.entrySet()) { Operation op = opFact.get(me.getValue(), cb); mops.put(me.getKey(), op); ops.add(op); } assert mops.size() == chunks.size(); mconn.checkState(); mconn.addOperations(mops); return rv; } /** * Asynchronously get a bunch of objects from the cache. 
* * @param * @param keys the keys to request * @param tc the transcoder to serialize and unserialize values * @return a Future result of that fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public BulkFuture> asyncGetBulk(Collection keys, Transcoder tc) { } /** * Asynchronously get a bunch of objects from the cache and decode them with * the given transcoder. * * @param keys the keys to request * @return a Future result of that fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public BulkFuture> asyncGetBulk(Collection keys) { return asyncGetBulk(keys, transcoder); } /** * Varargs wrapper for asynchronous bulk gets. * * @param * @param tc the transcoder to serialize and unserialize value * @param keys one more more keys to get * @return the future values of those keys * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public BulkFuture> asyncGetBulk(Transcoder tc, String... keys) { return asyncGetBulk(Arrays.asList(keys), tc); } /** * Varargs wrapper for asynchronous bulk gets with the default transcoder. * * @param keys one more more keys to get * @return the future values of those keys * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public BulkFuture> asyncGetBulk(String... keys) { return asyncGetBulk(Arrays.asList(keys), transcoder); } /** * Get the given key to reset its expiration time. 
* * @param key the key to fetch * @param exp the new expiration to set for the given key * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture> asyncGetAndTouch(final String key, final int exp) { return asyncGetAndTouch(key, exp, transcoder); } /** * Get the given key to reset its expiration time. * * @param key the key to fetch * @param exp the new expiration to set for the given key * @param tc the transcoder to serialize and unserialize value * @return a future that will hold the return value of the fetch * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture> asyncGetAndTouch(final String key, final int exp, final Transcoder tc) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture> rv = new OperationFuture>( key, latch, operationTimeout); Operation op = opFact.getAndTouch(key, exp, new GetAndTouchOperation.Callback() { private CASValue val = null; public void receivedStatus(OperationStatus status) { rv.set(val, status); } public void complete() { latch.countDown(); } public void gotData(String k, int flags, long cas, byte[] data) { assert k.equals(key) : "Wrong key returned"; assert cas > 0 : "CAS was less than zero: " + cas; val = new CASValue(cas, tc.decode(new CachedData(flags, data, tc.getMaxSize()))); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Get the values for multiple keys from the cache. 
* * @param keys the keys * @param * @param keys the keys * @param tc the transcoder to serialize and unserialize value * @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map getBulk(Collection keys, Transcoder tc) { try { return asyncGetBulk(keys, tc).get(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted getting bulk values", e); } catch (ExecutionException e) { throw new RuntimeException("Failed getting bulk values", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting for bulkvalues", e); } } /** * Get the values for multiple keys from the cache. * * @param keys the keys * @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map getBulk(Collection keys) { return getBulk(keys, transcoder); } /** * Get the values for multiple keys from the cache. * * @param * @param tc the transcoder to serialize and unserialize value * @param keys the keys * @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map getBulk(Transcoder tc, String... keys) { return getBulk(Arrays.asList(keys), tc); } /** * Get the values for multiple keys from the cache. 
* @return a map of the values (for each value that exists) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map getBulk(String... keys) { return getBulk(Arrays.asList(keys), transcoder); } /** * Get the versions of all of the connected memcacheds. * * @return a Map of SocketAddress to String for connected servers * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map getVersions() { final Map rv = new ConcurrentHashMap(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { final SocketAddress sa = n.getSocketAddress(); return opFact.version(new OperationCallback() { public void receivedStatus(OperationStatus s) { rv.put(sa, s.getMessage()); } public void complete() { latch.countDown(); } }); } }); try { blatch.await(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for versions", e); return rv; } /** * Get all of the stats from all of the connections. * * @return a Map of a Map of stats replies by SocketAddress * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map> getStats() { return getStats(null); } /** * Get a set of stats from all connections. * * @param arg which stats to get * @return a Map of the server SocketAddress to a map of String stat keys to * String stat values. 
* @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public Map> getStats(final String arg) { final Map> rv = new HashMap>(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { final SocketAddress sa = n.getSocketAddress(); rv.put(sa, new HashMap()); return opFact.stats(arg, new StatsOperation.Callback() { public void gotStat(String name, String val) { rv.get(sa).put(name, val); } @SuppressWarnings("synthetic-access") public void receivedStatus(OperationStatus status) { if (!status.isSuccess()) { getLogger().warn("Unsuccessful stat fetch: %s", status); } } public void complete() { latch.countDown(); } }); } }); try { blatch.await(operationTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for stats", e); } return rv; } private long mutate(Mutator m, String key, int by, long def, int exp) { final AtomicLong rv = new AtomicLong(); final CountDownLatch latch = new CountDownLatch(1); addOp(key, opFact.mutate(m, key, by, def, exp, new OperationCallback() { public void receivedStatus(OperationStatus s) { // XXX: Potential abstraction leak. // The handling of incr/decr in the binary protocol // Allows us to avoid string processing. rv.set(new Long(s.isSuccess() ? s.getMessage() : "-1")); } public void complete() { latch.countDown(); } })); try { if (!latch.await(operationTimeout, TimeUnit.MILLISECONDS)) { throw new OperationTimeoutException("Mutate operation timed out," + "unable to modify counter [" + key + "]"); } } catch (InterruptedException e) { throw new RuntimeException("Interrupted", e); } getLogger().debug("Mutation returned %s", rv); return rv.get(); } /** * Increment the given key by the given amount. 
* * Due to the way the memcached server operates on items, incremented and * decremented items will be returned as Strings with any operations that * return a value. * * @param key the key * @param by the amount to increment * @return the new value (-1 if the key doesn't exist) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long incr(String key, int by) { return mutate(Mutator.incr, key, by, 0, -1); } /** * Decrement the given key by the given value. * * Due to the way the memcached server operates on items, incremented and * decremented items will be returned as Strings with any operations that * return a value. * * @param key the key * @param by the value * @return the new value (-1 if the key doesn't exist) * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long decr(String key, int by) { return mutate(Mutator.decr, key, by, 0, -1); } /** * Increment the given counter, returning the new value. * * Due to the way the memcached server operates on items, incremented and * decremented items will be returned as Strings with any operations that * return a value. * * @param key the key * @param by the amount to increment * @param def the default value (if the counter does not exist) * @param exp the expiration of this object * @return the new value, or -1 if we were unable to increment or add * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long incr(String key, int by, long def, int exp) { return mutateWithDefault(Mutator.incr, key, by, def, exp); } /** * Decrement the given counter, returning the new value. 
* * Due to the way the memcached server operates on items, incremented and * decremented items will be returned as Strings with any operations that * return a value. * * @param key the key * @param by the amount to decrement * @param def the default value (if the counter does not exist) * @param exp the expiration of this object * @return the new value, or -1 if we were unable to decrement or add * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long decr(String key, int by, long def, int exp) { return mutateWithDefault(Mutator.decr, key, by, def, exp); } private long mutateWithDefault(Mutator t, String key, int by, long def, int exp) { long rv = mutate(t, key, by, def, exp); // The ascii protocol doesn't support defaults, so I added them // manually here. if (rv == -1) { Future f = asyncStore(StoreType.add, key, exp, String.valueOf(def)); try { if (f.get(operationTimeout, TimeUnit.MILLISECONDS)) { rv = def; } else { rv = mutate(t, key, by, 0, exp); assert rv != -1 : "Failed to mutate or init value"; } } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for store", e); } catch (ExecutionException e) { throw new RuntimeException("Failed waiting for store", e); } catch (TimeoutException e) { throw new OperationTimeoutException("Timeout waiting to mutate or init" + " value", e); } } return rv; } private OperationFuture asyncMutate(Mutator m, String key, int by, long def, int exp) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); Operation op = addOp(key, opFact.mutate(m, key, by, def, exp, new OperationCallback() { public void receivedStatus(OperationStatus s) { rv.set(new Long(s.isSuccess() ? 
s.getMessage() : "-1"), s); } public void complete() { latch.countDown(); } })); rv.setOperation(op); return rv; } /** * Asychronous increment. * * @param key key to increment * @param by the amount to increment the value by * @return a future with the incremented value, or -1 if the increment failed. * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture asyncIncr(String key, int by) { return asyncMutate(Mutator.incr, key, by, 0, -1); } /** * Asynchronous decrement. * * @param key key to increment * @param by the amount to increment the value by * @return a future with the decremented value, or -1 if the increment failed. * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture asyncDecr(String key, int by) { return asyncMutate(Mutator.decr, key, by, 0, -1); } /** * Increment the given counter, returning the new value. * * @param key the key * @param by the amount to increment * @param def the default value (if the counter does not exist) * @return the new value, or -1 if we were unable to increment or add * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long incr(String key, int by, long def) { return mutateWithDefault(Mutator.incr, key, by, def, 0); } /** * Decrement the given counter, returning the new value. 
* * @param key the key * @param by the amount to decrement * @param def the default value (if the counter does not exist) * @return the new value, or -1 if we were unable to decrement or add * @throws OperationTimeoutException if the global operation timeout is * exceeded * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public long decr(String key, int by, long def) { return mutateWithDefault(Mutator.decr, key, by, def, 0); } /** * Delete the given key from the cache. * *

* The hold argument specifies the amount of time in seconds (or Unix time * until which) the client wishes the server to refuse "add" and "replace" * commands with this key. For this amount of item, the item is put into a * delete queue, which means that it won't possible to retrieve it by the * "get" command, but "add" and "replace" command with this key will also fail * (the "set" command will succeed, however). After the time passes, the item * is finally deleted from server memory. *

* * @param key the key to delete * @param hold how long the key should be unavailable to add commands * * @return whether or not the operation was performed * @deprecated Hold values are no longer honored. */ @Deprecated public OperationFuture delete(String key, int hold) { return delete(key); } /** * Delete the given key from the cache. * * @param key the key to delete * @return whether or not the operation was performed * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture delete(String key) { final CountDownLatch latch = new CountDownLatch(1); final OperationFuture rv = new OperationFuture(key, latch, operationTimeout); DeleteOperation op = opFact.delete(key, new OperationCallback() { public void receivedStatus(OperationStatus s) { rv.set(s.isSuccess(), s); } public void complete() { latch.countDown(); } }); rv.setOperation(op); addOp(key, op); return rv; } /** * Flush all caches from all servers with a delay of application. 
* * @param delay the period of time to delay, in seconds * @return whether or not the operation was accepted * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture flush(final int delay) { final AtomicReference flushResult = new AtomicReference(null); final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { Operation op = opFact.flush(delay, new OperationCallback() { public void receivedStatus(OperationStatus s) { flushResult.set(s.isSuccess()); } public void complete() { latch.countDown(); } }); ops.add(op); return op; } }); return new OperationFuture(null, blatch, flushResult, operationTimeout) { @Override public boolean cancel(boolean ign) { boolean rv = false; for (Operation op : ops) { op.cancel(); rv |= op.getState() == OperationState.WRITE_QUEUED; } return rv; } @Override public Boolean get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException { status = new OperationStatus(true, "OK"); return super.get(duration, units); } @Override public boolean isCancelled() { boolean rv = false; for (Operation op : ops) { rv |= op.isCancelled(); } return rv; } @Override public boolean isDone() { boolean rv = true; for (Operation op : ops) { rv &= op.getState() == OperationState.COMPLETE; } return rv || isCancelled(); } }; } /** * Flush all caches from all servers immediately. 
* * @return whether or not the operation was performed * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public OperationFuture flush() { return flush(-1); } public Set listSaslMechanisms() { final ConcurrentMap rv = new ConcurrentHashMap(); CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { public Operation newOp(MemcachedNode n, final CountDownLatch latch) { return opFact.saslMechs(new OperationCallback() { public void receivedStatus(OperationStatus status) { for (String s : status.getMessage().split(" ")) { rv.put(s, s); } } public void complete() { latch.countDown(); } }); } }); try { blatch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return rv.keySet(); } /** * Shut down immediately. */ public void shutdown() { shutdown(-1, TimeUnit.MILLISECONDS); } /** * Shut down this client gracefully. * * @param timeout the amount of time time for shutdown * @param unit the TimeUnit for the timeout * @return result of the shutdown request */ public boolean shutdown(long timeout, TimeUnit unit) { // Guard against double shutdowns (bug 8). if (shuttingDown) { getLogger().info("Suppressing duplicate attempt to shut down"); return false; } shuttingDown = true; String baseName = mconn.getName(); mconn.setName(baseName + " - SHUTTING DOWN"); boolean rv = true; try { // Conditionally wait if (timeout > 0) { mconn.setName(baseName + " - SHUTTING DOWN (waiting)"); rv = waitForQueues(timeout, unit); } } finally { // But always begin the shutdown sequence try { mconn.setName(baseName + " - SHUTTING DOWN (telling client)"); mconn.shutdown(); mconn.setName(baseName + " - SHUTTING DOWN (informed client)"); tcService.shutdown(); } catch (IOException e) { getLogger().warn("exception while shutting down", e); } } return rv; } /** * Wait for the queues to die down. 
* * @param timeout the amount of time time for shutdown * @param unit the TimeUnit for the timeout * @return result of the request for the wait * @throws IllegalStateException in the rare circumstance where queue is too * full to accept any more requests */ public boolean waitForQueues(long timeout, TimeUnit unit) { CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { public Operation newOp(final MemcachedNode n, final CountDownLatch latch) { return opFact.noop(new OperationCallback() { public void complete() { latch.countDown(); } public void receivedStatus(OperationStatus s) { // Nothing special when receiving status, only // necessary to complete the interface } }); } }, mconn.getLocator().getAll(), false); try { // XXX: Perhaps IllegalStateException should be caught here // and the check retried. return blatch.await(timeout, unit); } catch (InterruptedException e) { throw new RuntimeException("Interrupted waiting for queues", e); } } /** * Add a connection observer. * * If connections are already established, your observer will be called with * the address and -1. * * @param obs the ConnectionObserver you wish to add * @return true if the observer was added. */ public boolean addObserver(ConnectionObserver obs) { boolean rv = mconn.addObserver(obs); if (rv) { for (MemcachedNode node : mconn.getLocator().getAll()) { if (node.isActive()) { obs.connectionEstablished(node.getSocketAddress(), -1); } } } return rv; } /** * Remove a connection observer. 
* * @param obs the ConnectionObserver you wish to add * @return true if the observer existed, but no longer does */ public boolean removeObserver(ConnectionObserver obs) { return mconn.removeObserver(obs); } public void connectionEstablished(SocketAddress sa, int reconnectCount) { if (authDescriptor != null) { if (authDescriptor.authThresholdReached()) { this.shutdown(); } authMonitor.authConnection(mconn, opFact, authDescriptor, findNode(sa)); } } private MemcachedNode findNode(SocketAddress sa) { MemcachedNode node = null; for (MemcachedNode n : mconn.getLocator().getAll()) { if (n.getSocketAddress().equals(sa)) { node = n; } } assert node != null : "Couldn't find node connected to " + sa; return node; } public void connectionLost(SocketAddress sa) { // Don't care. } @Override public String toString() { return connFactory.toString(); } }
File
MemcachedClient.java
Developer's decision
Combination
Kind of conflict
Annotation
Attribute
Class signature
Comment
Method declaration
Method invocation
Chunk
Conflicting content
    return keys.iterator().next();
  }

<<<<<<< HEAD
  public Collection clone(KeyedOperation op) {
    assert (op.getState() == OperationState.WRITE_QUEUED || op.getState()
        == OperationState.RETRY) : "Who passed me an operation in the "
        + op.getState() + "state?";
    assert !op.isCancelled() : "Attempted to clone a canceled op";
    assert !op.hasErrored() : "Attempted to clone an errored op";

    Collection rv = new ArrayList(op.getKeys().size());
		return rv;
	}

    if (op instanceof GetOperation) {
      rv.addAll(cloneGet(op));
    } else if (op instanceof GetsOperation) {
      GetsOperation.Callback callback =
          (GetsOperation.Callback) op.getCallback();
      for (String k : op.getKeys()) {
        rv.add(gets(k, callback));
      }
    } else if (op instanceof CASOperation) {
      CASOperation cop = (CASOperation) op;
      rv.add(cas(cop.getStoreType(), first(op.getKeys()), cop.getCasValue(),
          cop.getFlags(), cop.getExpiration(), cop.getBytes(),
          cop.getCallback()));
    } else if (op instanceof DeleteOperation) {
      rv.add(delete(first(op.getKeys()), op.getCallback()));
    } else if (op instanceof MutatorOperation) {
      MutatorOperation mo = (MutatorOperation) op;
      rv.add(mutate(mo.getType(), first(op.getKeys()), mo.getBy(),
          mo.getDefault(), mo.getExpiration(), op.getCallback()));
    } else if (op instanceof StoreOperation) {
      StoreOperation so = (StoreOperation) op;
      rv.add(store(so.getStoreType(), first(op.getKeys()), so.getFlags(),
          so.getExpiration(), so.getData(), op.getCallback()));
    } else if (op instanceof ConcatenationOperation) {
      ConcatenationOperation c = (ConcatenationOperation) op;
      rv.add(cat(c.getStoreType(), c.getCasValue(), first(op.getKeys()),
          c.getData(), c.getCallback()));
    } else {
      assert false : "Unhandled operation type: " + op.getClass();
    }
    if (op instanceof VBucketAware) {
      VBucketAware vop = (VBucketAware) op;
      if (!vop.getNotMyVbucketNodes().isEmpty()) {
        for (Operation operation : rv) {
          if (operation instanceof VBucketAware) {
            Collection notMyVbucketNodes =
                vop.getNotMyVbucketNodes();
            ((VBucketAware) operation).setNotMyVbucketNodes(notMyVbucketNodes);
          }
        }
      }
    }
    return rv;
  }

  protected abstract Collection
  cloneGet(KeyedOperation op);
=======
		Collection rv = new ArrayList(
				op.getKeys().size());
		if(op instanceof GetOperation) {
			rv.addAll(cloneGet(op));
		} else if(op instanceof GetsOperation) {
			GetsOperation.Callback callback =
				(GetsOperation.Callback)op.getCallback();
			for(String k : op.getKeys()) {
				rv.add(gets(k, callback));
			}
		} else if(op instanceof CASOperation) {
			CASOperation cop = (CASOperation)op;
			rv.add(cas(cop.getStoreType(), first(op.getKeys()),
					cop.getCasValue(), cop.getFlags(), cop.getExpiration(),
					cop.getData(), cop.getCallback()));
		} else if(op instanceof DeleteOperation) {
			rv.add(delete(first(op.getKeys()), op.getCallback()));
		} else if(op instanceof MutatorOperation) {
			MutatorOperation mo = (MutatorOperation)op;
			rv.add(mutate(mo.getType(), first(op.getKeys()),
					mo.getBy(), mo.getDefault(), mo.getExpiration(),
					op.getCallback()));
		} else if(op instanceof StoreOperation) {
			StoreOperation so = (StoreOperation)op;
			rv.add(store(so.getStoreType(), first(op.getKeys()), so.getFlags(),
					so.getExpiration(), so.getData(), op.getCallback()));
		} else if(op instanceof ConcatenationOperation) {
			ConcatenationOperation c = (ConcatenationOperation)op;
			rv.add(cat(c.getStoreType(), c.getCasValue(), first(op.getKeys()),
					c.getData(), c.getCallback()));
		} else {
			assert false : "Unhandled operation type: " + op.getClass();
		}
		if (op instanceof VBucketAware) {
			VBucketAware vop = (VBucketAware)op;
			if (!vop.getNotMyVbucketNodes().isEmpty()) {
				for (Operation operation : rv) {
					if (operation instanceof VBucketAware) {
						Collection notMyVbucketNodes = vop.getNotMyVbucketNodes();
						((VBucketAware) operation).setNotMyVbucketNodes(notMyVbucketNodes);
					}
				}
			}
		}
	protected abstract Collection cloneGet(
			KeyedOperation op);
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    return keys.iterator().next();
  }

  /**
   * Clone a queued (or retrying) keyed operation into one or more fresh,
   * equivalent operations so it can be re-dispatched (e.g. after a node
   * failure or a vbucket remap).
   *
   * NOTE(review): generic type parameters appear stripped from this record
   * (raw {@code Collection} / {@code ArrayList}); presumably the original
   * is {@code Collection<Operation>} — confirm against the upstream file.
   *
   * @param op the operation to clone; must be WRITE_QUEUED or RETRY,
   *           not cancelled and not errored (enforced by assertions)
   * @return the replacement operation(s); multi-key gets may fan out into
   *         several operations, all other types produce exactly one
   */
  public Collection clone(KeyedOperation op) {
    // Only operations that have not started writing (or are explicitly
    // retrying) may be cloned safely.
    assert (op.getState() == OperationState.WRITE_QUEUED || op.getState()
        == OperationState.RETRY) : "Who passed me an operation in the "
        + op.getState() + "state?";
    assert !op.isCancelled() : "Attempted to clone a canceled op";
    assert !op.hasErrored() : "Attempted to clone an errored op";

    // Sized to the key count: the worst case (get fan-out) yields one
    // operation per key.
    Collection rv = new ArrayList(op.getKeys().size());
    // Dispatch on the concrete operation type; each branch rebuilds an
    // equivalent operation that reuses the original callback.
    if (op instanceof GetOperation) {
      // Multi-key get: delegate to the protocol-specific cloneGet.
      rv.addAll(cloneGet(op));
    } else if (op instanceof GetsOperation) {
      GetsOperation.Callback callback =
          (GetsOperation.Callback) op.getCallback();
      // gets has no bulk form here: one cloned operation per key.
      for (String k : op.getKeys()) {
        rv.add(gets(k, callback));
      }
    } else if (op instanceof CASOperation) {
      CASOperation cop = (CASOperation) op;
      rv.add(cas(cop.getStoreType(), first(op.getKeys()), cop.getCasValue(),
          cop.getFlags(), cop.getExpiration(), cop.getData(),
          cop.getCallback()));
    } else if(op instanceof DeleteOperation) {
      rv.add(delete(first(op.getKeys()), op.getCallback()));
    } else if (op instanceof MutatorOperation) {
      MutatorOperation mo = (MutatorOperation) op;
      rv.add(mutate(mo.getType(), first(op.getKeys()), mo.getBy(),
          mo.getDefault(), mo.getExpiration(), op.getCallback()));
    } else if (op instanceof StoreOperation) {
      StoreOperation so = (StoreOperation) op;
      rv.add(store(so.getStoreType(), first(op.getKeys()), so.getFlags(),
          so.getExpiration(), so.getData(), op.getCallback()));
    } else if (op instanceof ConcatenationOperation) {
      ConcatenationOperation c = (ConcatenationOperation) op;
      rv.add(cat(c.getStoreType(), c.getCasValue(), first(op.getKeys()),
          c.getData(), c.getCallback()));
    } else {
      // Unknown subtype: fails under -ea; otherwise returns an empty list.
      assert false : "Unhandled operation type: " + op.getClass();
    }
    // Propagate the original's "not my vbucket" node set onto every clone
    // that is vbucket-aware, so the retry avoids the same wrong nodes.
    if (op instanceof VBucketAware) {
      VBucketAware vop = (VBucketAware) op;
      if (!vop.getNotMyVbucketNodes().isEmpty()) {
        for (Operation operation : rv) {
          if (operation instanceof VBucketAware) {
            Collection notMyVbucketNodes =
                vop.getNotMyVbucketNodes();
            ((VBucketAware) operation).setNotMyVbucketNodes(notMyVbucketNodes);
          }
        }
      }
    }
    return rv;
  }

  protected abstract Collection
  cloneGet(KeyedOperation op);
}
File
BaseOperationFactory.java
Developer's decision
Manual
Kind of conflict
If statement
Method declaration
Method interface
Method invocation
Return statement
Variable
Chunk
Conflicting content
   */
  int getFlags();

<<<<<<< HEAD
  /**
   * Get the expiration to be set for this operation.
   */
  int getExpiration();

  /**
   * Get the bytes to be set during this operation.
   *
   * 

* Note, this returns an exact reference to the bytes and the data * must not be modified. *

*/ byte[] getBytes(); ======= /** * Get the bytes to be set during this operation. * *

* Note, this returns an exact reference to the bytes and the data * must not be modified. *

*/ byte[] getData(); >>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd }
Solution content
   */
  int getFlags();

  /**
   * Get the expiration to be set for this operation.
   */
  int getExpiration();

  /**
   * Get the bytes to be set during this operation.
   *
   * 

* Note, this returns an exact reference to the bytes and the data * must not be modified. *

*/ byte[] getData(); }
File
CASOperation.java
Developer's decision
Combination
Kind of conflict
Comment
Method interface
Chunk
Conflicting content
=======
	@Override
    getCallback().receivedStatus(CANCELLED);
  }
    setBuffer(b);
  }

<<<<<<< HEAD
  @Override
  protected final void wasCancelled() {
	public String toString() {
		return "Cmd: " + cmd + " Keys: " + StringUtils.join(keys, " ") + "Exp: "
			+ exp;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    setBuffer(b);
  }

  @Override
  protected final void wasCancelled() {
    getCallback().receivedStatus(CANCELLED);
  }

  @Override
  public String toString() {
    return "Cmd: " + cmd + " Keys: " + StringUtils.join(keys, " ") + "Exp: "
      + exp;
  }
}
File
BaseGetOpImpl.java
Developer's decision
Concatenation
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
    return exp;
  }

<<<<<<< HEAD
  public byte[] getData() {
    return data;
  }
}
=======
	public byte[] getData() {
		return data;
	}

	@Override
	public String toString() {
		return "Cmd: " + type + " Key: " + key + " Flags: " + flags + " Exp: "
			+ exp + " Data Length: " + data.length;
	}
}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
Solution content
  @Override
    return exp;
  }

  public byte[] getData() {
    return data;
  }

  public String toString() {
    return "Cmd: " + type + " Key: " + key + " Flags: " + flags + " Exp: "
      + exp + " Data Length: " + data.length;
  }
}
File
BaseStoreOperationImpl.java
Developer's decision
Version 2
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content

class CASOperationImpl extends OperationImpl implements CASOperation {

<<<<<<< HEAD
  // Overhead storage stuff to make sure the buffer pushes out far enough.
  // This is "cas" + length(flags) + length(length(data)) + length(cas id)
  // + spaces
  private static final int OVERHEAD = 64;

  private static final OperationStatus STORED = new CASOperationStatus(true,
      "STORED", CASResponse.OK);
  private static final OperationStatus NOT_FOUND = new CASOperationStatus(
      false, "NOT_FOUND", CASResponse.NOT_FOUND);
  private static final OperationStatus EXISTS = new CASOperationStatus(false,
      "EXISTS", CASResponse.EXISTS);

  private final String key;
  private final long casValue;
  private final int flags;
  private final int exp;
  private final byte[] data;

  public CASOperationImpl(String k, long c, int f, int e, byte[] d,
      OperationCallback cb) {
    super(cb);
    key = k;
    casValue = c;
    flags = f;
    exp = e;
    data = d;
  }

  @Override
  public void handleLine(String line) {
    assert getState() == OperationState.READING : "Read ``" + line
        + "'' when in " + getState() + " state";
    getCallback().receivedStatus(matchStatus(line, STORED, NOT_FOUND, EXISTS));
    transitionState(OperationState.COMPLETE);
  }

  @Override
  public void initialize() {
    ByteBuffer bb = ByteBuffer.allocate(data.length
        + KeyUtil.getKeyBytes(key).length + OVERHEAD);
    setArguments(bb, "cas", key, flags, exp, data.length, casValue);
    assert bb.remaining() >= data.length + 2 : "Not enough room in buffer,"
        + " need another " + (2 + data.length - bb.remaining());
    bb.put(data);
    bb.put(CRLF);
    bb.flip();
    setBuffer(bb);
  }

  @Override
  protected void wasCancelled() {
    // XXX: Replace this comment with why I did this
    getCallback().receivedStatus(CANCELLED);
  }

  public Collection getKeys() {
    return Collections.singleton(key);
  }

  public byte[] getBytes() {
    return data;
  }

  public long getCasValue() {
    return casValue;
  }

  public int getExpiration() {
    return exp;
  }

  public int getFlags() {
    return flags;
  }

  public StoreType getStoreType() {
    return StoreType.set;
  }
=======
	// Overhead storage stuff to make sure the buffer pushes out far enough.
	// This is "cas" + length(flags) + length(length(data)) + length(cas id)
	// + spaces
	private static final int OVERHEAD = 64;

	private static final OperationStatus STORED=
		new CASOperationStatus(true, "STORED", CASResponse.OK);
	private static final OperationStatus NOT_FOUND=
		new CASOperationStatus(false, "NOT_FOUND", CASResponse.NOT_FOUND);
	private static final OperationStatus EXISTS=
		new CASOperationStatus(false, "EXISTS", CASResponse.EXISTS);

	private final String key;
	private final long casValue;
	private final int flags;
	private final int exp;
	private final byte[] data;

	public CASOperationImpl(String k, long c, int f, int e,
			byte[] d, OperationCallback cb) {
		super(cb);
		key=k;
		casValue=c;
		flags=f;
		exp=e;
		data=d;
	}

	@Override
	public void handleLine(String line) {
		assert getState() == OperationState.READING
			: "Read ``" + line + "'' when in " + getState() + " state";
		getCallback().receivedStatus(matchStatus(line,
			STORED, NOT_FOUND, EXISTS));
		transitionState(OperationState.COMPLETE);
	}

	@Override
	public void initialize() {
		ByteBuffer bb=ByteBuffer.allocate(data.length
				+ KeyUtil.getKeyBytes(key).length + OVERHEAD);
		setArguments(bb, "cas", key, flags, exp, data.length, casValue);
		assert bb.remaining() >= data.length + 2
			: "Not enough room in buffer, need another "
				+ (2 + data.length - bb.remaining());
		bb.put(data);
		bb.put(CRLF);
		bb.flip();
		setBuffer(bb);
	}

	@Override
	protected void wasCancelled() {
		// XXX:  Replace this comment with why I did this
		getCallback().receivedStatus(CANCELLED);
	}

	public Collection getKeys() {
		return Collections.singleton(key);
	}

	public byte[] getData() {
		return data;
	}

	public long getCasValue() {
		return casValue;
	}

	public int getExpiration() {
		return exp;
	}
	public int getFlags() {
		return flags;
	}

	public StoreType getStoreType() {
		return StoreType.set;
	}

	@Override
	public String toString() {
		return "Cmd: cas Key: " + key + " Cas Value: " + casValue + " Flags: "
			+ flags + " Exp: " + exp + " Data Length: " + data.length;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
class CASOperationImpl extends OperationImpl implements CASOperation {

  // Overhead storage stuff to make sure the buffer pushes out far enough.
  // This is "cas" + length(flags) + length(length(data)) + length(cas id)
  // + spaces
  private static final int OVERHEAD = 64;

  private static final OperationStatus STORED = new CASOperationStatus(true,
      "STORED", CASResponse.OK);
  private static final OperationStatus NOT_FOUND = new CASOperationStatus(
      false, "NOT_FOUND", CASResponse.NOT_FOUND);
  private static final OperationStatus EXISTS = new CASOperationStatus(false,
      "EXISTS", CASResponse.EXISTS);

  private final String key;
  private final long casValue;
  private final int flags;
  private final int exp;
  private final byte[] data;

  public CASOperationImpl(String k, long c, int f, int e, byte[] d,
      OperationCallback cb) {
    super(cb);
    key = k;
    casValue = c;
    flags = f;
    exp = e;
    data = d;
  }

  @Override
  public void handleLine(String line) {
    assert getState() == OperationState.READING : "Read ``" + line
        + "'' when in " + getState() + " state";
    getCallback().receivedStatus(matchStatus(line, STORED, NOT_FOUND, EXISTS));
    transitionState(OperationState.COMPLETE);
  }

  @Override
  public void initialize() {
    ByteBuffer bb = ByteBuffer.allocate(data.length
        + KeyUtil.getKeyBytes(key).length + OVERHEAD);
    setArguments(bb, "cas", key, flags, exp, data.length, casValue);
    assert bb.remaining() >= data.length + 2 : "Not enough room in buffer,"
        + " need another " + (2 + data.length - bb.remaining());
    bb.put(data);
    bb.put(CRLF);
    bb.flip();
    setBuffer(bb);
  }

  @Override
  protected void wasCancelled() {
    // XXX: Replace this comment with why I did this
    getCallback().receivedStatus(CANCELLED);
  }

  public Collection getKeys() {
    return Collections.singleton(key);
  }

  public byte[] getData() {
    return data;
  }

  public long getCasValue() {
    return casValue;
  }

  public int getExpiration() {
    return exp;
  }

  public int getFlags() {
    return flags;
  }

  public StoreType getStoreType() {
    return StoreType.set;
  }

  @Override
  public String toString() {
    return "Cmd: cas Key: " + key + " Cas Value: " + casValue + " Flags: "
      + flags + " Exp: " + exp + " Data Length: " + data.length;
  }
}
File
CASOperationImpl.java
Developer's decision
Combination
Kind of conflict
Annotation
Attribute
Comment
Method declaration
Method invocation
Chunk
Conflicting content
    setBuffer(b);
  }

<<<<<<< HEAD
  public Collection getKeys() {
    return Collections.singleton(key);
  }
=======
	@Override
	public String toString() {
		return "Cmd: delete Key: " + key;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    setBuffer(b);
  }

  public Collection getKeys() {
    return Collections.singleton(key);
  }

  @Override
  public String toString() {
    return "Cmd: delete Key: " + key;
  }
}
File
DeleteOperationImpl.java
Developer's decision
Concatenation
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
/**
 * Memcached flush_all operation.
 */
<<<<<<< HEAD
final class FlushOperationImpl extends OperationImpl implements FlushOperation {

  private static final byte[] FLUSH = "flush_all\r\n".getBytes();

  private static final OperationStatus OK = new OperationStatus(true, "OK");

  private final int delay;

  public FlushOperationImpl(int d, OperationCallback cb) {
    super(cb);
    delay = d;
  }

  @Override
  public void handleLine(String line) {
    getLogger().debug("Flush completed successfully");
    getCallback().receivedStatus(matchStatus(line, OK));
    transitionState(OperationState.COMPLETE);
  }

  @Override
  public void initialize() {
    ByteBuffer b = null;
    if (delay == -1) {
      b = ByteBuffer.wrap(FLUSH);
    } else {
      b = ByteBuffer.allocate(32);
      b.put(("flush_all " + delay + "\r\n").getBytes());
      b.flip();
    }
    setBuffer(b);
  }
=======
final class FlushOperationImpl extends OperationImpl
	implements FlushOperation {

	private static final byte[] FLUSH="flush_all\r\n".getBytes();

	private static final OperationStatus OK=
		new OperationStatus(true, "OK");

	private final int delay;

	public FlushOperationImpl(int d, OperationCallback cb) {
		super(cb);
		delay=d;
	}

	@Override
	public void handleLine(String line) {
		getLogger().debug("Flush completed successfully");
		getCallback().receivedStatus(matchStatus(line, OK));
		transitionState(OperationState.COMPLETE);
	}

	@Override
	public void initialize() {
		ByteBuffer b=null;
		if(delay == -1) {
			b=ByteBuffer.wrap(FLUSH);
		} else {
			b=ByteBuffer.allocate(32);
			b.put( ("flush_all " + delay + "\r\n").getBytes());
			b.flip();
		}
		setBuffer(b);
	}

	@Override
	public String toString() {
		return "Cmd: flush_all Delay: " + delay;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
/**
 * Memcached flush_all operation.
 */
final class FlushOperationImpl extends OperationImpl implements FlushOperation {

  private static final byte[] FLUSH = "flush_all\r\n".getBytes();

  private static final OperationStatus OK = new OperationStatus(true, "OK");

  private final int delay;

  public FlushOperationImpl(int d, OperationCallback cb) {
    super(cb);
    delay = d;
  }

  @Override
  public void handleLine(String line) {
    getLogger().debug("Flush completed successfully");
    getCallback().receivedStatus(matchStatus(line, OK));
    transitionState(OperationState.COMPLETE);
  }

  @Override
  public void initialize() {
    ByteBuffer b = null;
    if (delay == -1) {
      b = ByteBuffer.wrap(FLUSH);
    } else {
      b = ByteBuffer.allocate(32);
      b.put(("flush_all " + delay + "\r\n").getBytes());
      b.flip();
    }
    setBuffer(b);
  }

  @Override
  public String toString() {
    return "Cmd: flush_all Delay: " + delay;
  }
}
File
FlushOperationImpl.java
Developer's decision
Version 2
Kind of conflict
Annotation
Attribute
Class signature
Method declaration
Method invocation
Chunk
Conflicting content
/**
 * Operation for mutating integers inside of memcached.
 */
<<<<<<< HEAD
final class MutatorOperationImpl extends OperationImpl implements
    MutatorOperation {

  public static final int OVERHEAD = 32;

  private static final OperationStatus NOT_FOUND = new OperationStatus(false,
      "NOT_FOUND");

  private final Mutator mutator;
  private final String key;
  private final int amount;

  public MutatorOperationImpl(Mutator m, String k, int amt,
      OperationCallback c) {
    super(c);
    mutator = m;
    key = k;
    amount = amt;
  }

  @Override
  public void handleLine(String line) {
    getLogger().debug("Result:  %s", line);
    OperationStatus found = null;
    if (line.equals("NOT_FOUND")) {
      found = NOT_FOUND;
    } else {
      found = new OperationStatus(true, line);
    }
    getCallback().receivedStatus(found);
    transitionState(OperationState.COMPLETE);
  }

  @Override
  public void initialize() {
    int size = KeyUtil.getKeyBytes(key).length + OVERHEAD;
    ByteBuffer b = ByteBuffer.allocate(size);
    setArguments(b, mutator.name(), key, amount);
    b.flip();
    setBuffer(b);
  }

  @Override
  protected void wasCancelled() {
    // XXX: Replace this comment with why the hell I did this.
    getCallback().receivedStatus(CANCELLED);
  }

  public Collection getKeys() {
    return Collections.singleton(key);
  }

  public int getBy() {
    return amount;
  }

  public long getDefault() {
    return -1;
  }

  public int getExpiration() {
    return -1;
  }

  public Mutator getType() {
    return mutator;
  }
=======
final class MutatorOperationImpl extends OperationImpl
	implements MutatorOperation {

	public static final int OVERHEAD=32;

	private static final OperationStatus NOT_FOUND=
		new OperationStatus(false, "NOT_FOUND");

	private final Mutator mutator;
	private final String key;
	private final int amount;

	public MutatorOperationImpl(Mutator m, String k, int amt,
			OperationCallback c) {
		super(c);
		mutator=m;
		key=k;
		amount=amt;
	}

	@Override
	public void handleLine(String line) {
		getLogger().debug("Result:  %s", line);
		OperationStatus found=null;
		if(line.equals("NOT_FOUND")) {
			found=NOT_FOUND;
		} else {
			found=new OperationStatus(true, line);
		}
		getCallback().receivedStatus(found);
		transitionState(OperationState.COMPLETE);
	}

	@Override
	public void initialize() {
		int size=KeyUtil.getKeyBytes(key).length + OVERHEAD;
		ByteBuffer b=ByteBuffer.allocate(size);
		setArguments(b, mutator.name(), key, amount);
		b.flip();
		setBuffer(b);
	}

	@Override
	protected void wasCancelled() {
		// XXX:  Replace this comment with why the hell I did this.
		getCallback().receivedStatus(CANCELLED);
	}

	public Collection getKeys() {
		return Collections.singleton(key);
	}

	public int getBy() {
		return amount;
	}

	public long getDefault() {
		return -1;
	}

	public int getExpiration() {
		return -1;
	}

	public Mutator getType() {
		return mutator;
	}

	@Override
	public String toString() {
		return "Cmd: " + mutator.name() + " Key: " + key + " Amount: " + amount;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
/**
 * Operation for mutating integers inside of memcached.
 */
final class MutatorOperationImpl extends OperationImpl implements
    MutatorOperation {

  public static final int OVERHEAD = 32;

  private static final OperationStatus NOT_FOUND = new OperationStatus(false,
      "NOT_FOUND");

  private final Mutator mutator;
  private final String key;
  private final int amount;

  public MutatorOperationImpl(Mutator m, String k, int amt,
      OperationCallback c) {
    super(c);
    mutator = m;
    key = k;
    amount = amt;
  }

  @Override
  public void handleLine(String line) {
    getLogger().debug("Result:  %s", line);
    OperationStatus found = null;
    if (line.equals("NOT_FOUND")) {
      found = NOT_FOUND;
    } else {
      found = new OperationStatus(true, line);
    }
    getCallback().receivedStatus(found);
    transitionState(OperationState.COMPLETE);
  }

  @Override
  public void initialize() {
    int size = KeyUtil.getKeyBytes(key).length + OVERHEAD;
    ByteBuffer b = ByteBuffer.allocate(size);
    setArguments(b, mutator.name(), key, amount);
    b.flip();
    setBuffer(b);
  }

  @Override
  protected void wasCancelled() {
    // XXX: Replace this comment with why the hell I did this.
    getCallback().receivedStatus(CANCELLED);
  }

  public Collection getKeys() {
    return Collections.singleton(key);
  }

  public int getBy() {
    return amount;
  }

  public long getDefault() {
    return -1;
  }

  public int getExpiration() {
    return -1;
  }

  public Mutator getType() {
    return mutator;
  }

  @Override
  public String toString() {
    return "Cmd: " + mutator.name() + " Key: " + key + " Amount: " + amount;
  }
}
File
MutatorOperationImpl.java
Developer's decision
Version 2
Kind of conflict
Annotation
Attribute
Class signature
Method declaration
Method invocation
Chunk
Conflicting content
    setBuffer(ByteBuffer.wrap(msg));
  }

<<<<<<< HEAD
  @Override
  protected void wasCancelled() {
    cb.receivedStatus(CANCELLED);
  }
=======
	@Override
	public String toString() {
		return "Cmd: " + msg;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    setBuffer(ByteBuffer.wrap(msg));
  }

  @Override
  protected void wasCancelled() {
    cb.receivedStatus(CANCELLED);
  }

  @Override
  public String toString() {
    return "Cmd: " + msg;
  }
}
File
StatsOperationImpl.java
Developer's decision
Concatenation
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
/**
 * Operation to request the version of a memcached server.
 */
<<<<<<< HEAD
final class VersionOperationImpl extends OperationImpl implements
    VersionOperation, NoopOperation {

  private static final byte[] REQUEST = "version\r\n".getBytes();

  public VersionOperationImpl(OperationCallback c) {
    super(c);
  }

  @Override
  public void handleLine(String line) {
    assert line.startsWith("VERSION ");
    getCallback().receivedStatus(
        new OperationStatus(true, line.substring("VERSION ".length())));
    transitionState(OperationState.COMPLETE);
  }

  @Override
  public void initialize() {
    setBuffer(ByteBuffer.wrap(REQUEST));
  }
=======
final class VersionOperationImpl extends OperationImpl
	implements VersionOperation, NoopOperation {

	private static final byte[] REQUEST="version\r\n".getBytes();

	public VersionOperationImpl(OperationCallback c) {
		super(c);
	}

	@Override
	public void handleLine(String line) {
		assert line.startsWith("VERSION ");
		getCallback().receivedStatus(
				new OperationStatus(true, line.substring("VERSION ".length())));
		transitionState(OperationState.COMPLETE);
	}

	@Override
	public void initialize() {
		setBuffer(ByteBuffer.wrap(REQUEST));
	}

	@Override
	public String toString() {
		return "Cmd: version";
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
/**
 * Operation to request the version of a memcached server.
 */
final class VersionOperationImpl extends OperationImpl implements
    VersionOperation, NoopOperation {

  private static final byte[] REQUEST = "version\r\n".getBytes();

  public VersionOperationImpl(OperationCallback c) {
    super(c);
  }

  @Override
  public void handleLine(String line) {
    assert line.startsWith("VERSION ");
    getCallback().receivedStatus(
        new OperationStatus(true, line.substring("VERSION ".length())));
    transitionState(OperationState.COMPLETE);
  }

  @Override
  public void initialize() {
    setBuffer(ByteBuffer.wrap(REQUEST));
  }

  @Override
  public String toString() {
    return "Cmd: version";
  }
}
File
VersionOperationImpl.java
Developer's decision
Version 2
Kind of conflict
Annotation
Attribute
Class signature
Method declaration
Method invocation
Chunk
Conflicting content
    return data;
  }

<<<<<<< HEAD
  public ConcatenationType getStoreType() {
    return catType;
  }
=======
	@Override
	public String toString() {
		return super.toString() + " Cas: " + cas + " Data Length: " + data.length;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    return data;
  }

  public ConcatenationType getStoreType() {
    return catType;
  }

  @Override
  public String toString() {
    return super.toString() + " Cas: " + cas + " Data Length: " + data.length;
  }
}
File
ConcatenationOperationImpl.java
Developer's decision
Concatenation
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
    cas = c;
  }

<<<<<<< HEAD
  @Override
  public void initialize() {
    prepareBuffer(key, cas, EMPTY_BYTES);
  }
=======
	@Override
	public String toString() {
		return super.toString() + " Cas: " + cas;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
  }
    cas = c;

  @Override
  public void initialize() {
    prepareBuffer(key, cas, EMPTY_BYTES);
  }

  @Override
  public String toString() {
    return super.toString() + " Cas: " + cas;
  }
}
File
DeleteOperationImpl.java
Developer's decision
Concatenation
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
    delay = d;
  }

<<<<<<< HEAD
  @Override
  public void initialize() {
    prepareBuffer("", 0, EMPTY_BYTES, delay);
  }
}
=======
	@Override
	public String toString() {
		return super.toString() + " Delay: " + delay;
	}
}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
Solution content
    delay = d;
  }

  @Override
  public void initialize() {
    prepareBuffer("", 0, EMPTY_BYTES, delay);
  }

  @Override
  public String toString() {
    return super.toString() + " Delay: " + delay;
  }
}
File
FlushOperationImpl.java
Developer's decision
Combination
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
}
    prepareBuffer(key, 0, EMPTY_BYTES, exp);
  }

<<<<<<< HEAD
  @Override
  protected void decodePayload(byte[] pl) {
    final int flags = decodeInt(pl, 0);
    final byte[] data = new byte[pl.length - EXTRA_HDR_LEN];
    System.arraycopy(pl, EXTRA_HDR_LEN, data, 0, pl.length - EXTRA_HDR_LEN);
    GetAndTouchOperation.Callback gcb =
        (GetAndTouchOperation.Callback) getCallback();
    gcb.gotData(key, flags, responseCas, data);
    getCallback().receivedStatus(STATUS_OK);
  }
=======
	@Override
	public String toString() {
		return super.toString() + " Exp: " + exp;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
Solution content
    prepareBuffer(key, 0, EMPTY_BYTES, exp);
  }

  @Override
  protected void decodePayload(byte[] pl) {
    final int flags = decodeInt(pl, 0);
    final byte[] data = new byte[pl.length - EXTRA_HDR_LEN];
    System.arraycopy(pl, EXTRA_HDR_LEN, data, 0, pl.length - EXTRA_HDR_LEN);
    GetAndTouchOperation.Callback gcb =
        (GetAndTouchOperation.Callback) getCallback();
    gcb.gotData(key, flags, responseCas, data);
    getCallback().receivedStatus(STATUS_OK);
  }

  @Override
  public String toString() {
    return super.toString() + " Exp: " + exp;
  }
}
File
GetAndTouchOperationImpl.java
Developer's decision
Concatenation
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
import net.spy.memcached.ops.GetlOperation;

<<<<<<< HEAD
/**
 * Implementation of the get and lock operation.
 */
public class GetlOperationImpl extends SingleKeyOperationImpl implements
    GetlOperation {

  static final int GETL_CMD = 0x94;

  /**
   * Length of the extra header stuff for a GET response.
   */
  static final int EXTRA_HDR_LEN = 4;

  private final int exp;

  public GetlOperationImpl(String k, int e, GetlOperation.Callback cb) {
    super(GETL_CMD, generateOpaque(), k, cb);
    exp = e;
  }

  @Override
  public void initialize() {
    prepareBuffer(key, 0, EMPTY_BYTES, exp);
  }

  @Override
  protected void decodePayload(byte[] pl) {
    final int flags = decodeInt(pl, 0);
    final byte[] data = new byte[pl.length - EXTRA_HDR_LEN];
    System.arraycopy(pl, EXTRA_HDR_LEN, data, 0, pl.length - EXTRA_HDR_LEN);
    GetlOperation.Callback gcb = (GetlOperation.Callback) getCallback();
    gcb.gotData(key, flags, responseCas, data);
    getCallback().receivedStatus(STATUS_OK);
  }
=======
public class GetlOperationImpl extends SingleKeyOperationImpl
		implements GetlOperation {

	static final int GETL_CMD=0x94;

	/**
	 * Length of the extra header stuff for a GET response.
	 */
	static final int EXTRA_HDR_LEN=4;

	private final int exp;

	public GetlOperationImpl(String k, int e, GetlOperation.Callback cb) {
		super(GETL_CMD, generateOpaque(), k, cb);
		exp=e;
	}

	@Override
	public void initialize() {
		prepareBuffer(key, 0, EMPTY_BYTES, exp);
	}

	@Override
	protected void decodePayload(byte[] pl) {
		final int flags=decodeInt(pl, 0);
		final byte[] data=new byte[pl.length - EXTRA_HDR_LEN];
		System.arraycopy(pl, EXTRA_HDR_LEN, data, 0, pl.length-EXTRA_HDR_LEN);
		GetlOperation.Callback gcb=(GetlOperation.Callback)getCallback();
		gcb.gotData(key, flags, responseCas, data);
		getCallback().receivedStatus(STATUS_OK);
	}

	@Override
	public String toString() {
		return super.toString() + " Exp: " + exp;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    exp = e;

import net.spy.memcached.ops.GetlOperation;

/**
 * Implementation of the get and lock operation.
 */
public class GetlOperationImpl extends SingleKeyOperationImpl implements
    GetlOperation {

  static final int GETL_CMD = 0x94;

  /**
   * Length of the extra header stuff for a GET response.
   */
  static final int EXTRA_HDR_LEN = 4;

  private final int exp;

  public GetlOperationImpl(String k, int e, GetlOperation.Callback cb) {
    super(GETL_CMD, generateOpaque(), k, cb);
  }

  @Override
  public void initialize() {
    prepareBuffer(key, 0, EMPTY_BYTES, exp);
  }

  @Override
  protected void decodePayload(byte[] pl) {
    final int flags = decodeInt(pl, 0);
    final byte[] data = new byte[pl.length - EXTRA_HDR_LEN];
    System.arraycopy(pl, EXTRA_HDR_LEN, data, 0, pl.length - EXTRA_HDR_LEN);
    GetlOperation.Callback gcb = (GetlOperation.Callback) getCallback();
    gcb.gotData(key, flags, responseCas, data);
    getCallback().receivedStatus(STATUS_OK);
  }

  @Override
  public String toString() {
    return super.toString() + " Exp: " + exp;
  }
}
File
GetlOperationImpl.java
Developer's decision
Combination
Kind of conflict
Annotation
Attribute
Class signature
Comment
Method declaration
Chunk
Conflicting content
 * Binary operations that contain multiple keys and are VBucket aware operations
 * should extend this class.
 */
<<<<<<< HEAD
abstract class MultiKeyOperationImpl extends OperationImpl implements
    VBucketAware, KeyedOperation {
  protected final Map vbmap = new HashMap();

  protected MultiKeyOperationImpl(int c, int o, OperationCallback cb) {
    super(c, o, cb);
  }

  public Collection getKeys() {
    return vbmap.keySet();
  }

  public Collection getNotMyVbucketNodes() {
    return notMyVbucketNodes;
  }

  public void addNotMyVbucketNode(MemcachedNode node) {
    notMyVbucketNodes.add(node);
  }

  public void setNotMyVbucketNodes(Collection nodes) {
    notMyVbucketNodes = nodes;
  }

  public void setVBucket(String k, short vb) {
    assert vbmap.containsKey(k) : "Key " + k + " not contained in operation";
    vbmap.put(k, Short.valueOf(vb));
  }

  public short getVBucket(String k) {
    assert vbmap.containsKey(k) : "Key " + k + " not contained in operation";
    return vbmap.get(k);
  }
=======
abstract class MultiKeyOperationImpl extends OperationImpl
		implements VBucketAware, KeyedOperation {
	protected final Map vbmap = new HashMap();

	protected MultiKeyOperationImpl(int c, int o, OperationCallback cb) {
		super(c, o, cb);
	}

	public Collection getKeys() {
		return vbmap.keySet();
	}

	public Collection getNotMyVbucketNodes() {
		return notMyVbucketNodes;
	}

	public void addNotMyVbucketNode(MemcachedNode node) {
		notMyVbucketNodes.add(node);
	}

	public void setNotMyVbucketNodes(Collection nodes) {
		notMyVbucketNodes = nodes;
	}

	public void setVBucket(String k, short vb) {
		assert vbmap.containsKey(k) : "Key " + k + " not contained in operation";
		vbmap.put(k, new Short(vb));
	}

	public short getVBucket(String k) {
		assert vbmap.containsKey(k) : "Key " + k + " not contained in operation" ;
		return vbmap.get(k);
	}

	@Override
	public String toString() {
		return super.toString() + " Keys: " + StringUtils.join(getKeys(), " ");
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
 * Binary operations that contain multiple keys and are VBucket aware operations
 * should extend this class.
 */
abstract class MultiKeyOperationImpl extends OperationImpl implements
    VBucketAware, KeyedOperation {
  protected final Map vbmap = new HashMap();

  protected MultiKeyOperationImpl(int c, int o, OperationCallback cb) {
    super(c, o, cb);
  }

  public Collection getKeys() {
    return vbmap.keySet();
  }

  public Collection getNotMyVbucketNodes() {
    return notMyVbucketNodes;
  }

  public void addNotMyVbucketNode(MemcachedNode node) {
    notMyVbucketNodes.add(node);
  }

  public void setNotMyVbucketNodes(Collection nodes) {
    notMyVbucketNodes = nodes;
  }

  public void setVBucket(String k, short vb) {
    assert vbmap.containsKey(k) : "Key " + k + " not contained in operation";
    vbmap.put(k, Short.valueOf(vb));
  }

  public short getVBucket(String k) {
    assert vbmap.containsKey(k) : "Key " + k + " not contained in operation";
    return vbmap.get(k);
  }

  @Override
  public String toString() {
    return super.toString() + " Keys: " + StringUtils.join(getKeys(), " ");
  }
}
File
MultiKeyOperationImpl.java
Developer's decision
Combination
Kind of conflict
Annotation
Attribute
Class signature
Method declaration
Method invocation
Chunk
Conflicting content
    return exp;
  }

<<<<<<< HEAD
  public Mutator getType() {
    return mutator;
  }
=======
	@Override
	public String toString() {
		return super.toString() + " Amount: " + by + " Default: " + def + " Exp: "
			+ exp;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    return exp;
  }

  public Mutator getType() {
    return mutator;
  }

  @Override
  public String toString() {
    return super.toString() + " Amount: " + by + " Default: " + def + " Exp: "
      + exp;
  }
}
File
MutatorOperationImpl.java
Developer's decision
Concatenation
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
 */
abstract class OperationImpl extends BaseOperationImpl implements Operation {

<<<<<<< HEAD
  protected static final byte REQ_MAGIC = (byte) 0x80;
  protected static final byte RES_MAGIC = (byte) 0x81;
  protected static final int MIN_RECV_PACKET = 24;

  /**
   * Error code for operations.
   */
  protected static final int SUCCESS = 0x00;
  protected static final int ERR_NOT_FOUND = 0x01;
  protected static final int ERR_EXISTS = 0x02;
  protected static final int ERR_2BIG = 0x03;
  protected static final int ERR_INVAL = 0x04;
  protected static final int ERR_NOT_STORED = 0x05;
  protected static final int ERR_DELTA_BADVAL = 0x06;
  protected static final int ERR_NOT_MY_VBUCKET = 0x07;
  protected static final int ERR_UNKNOWN_COMMAND = 0x81;
  protected static final int ERR_NO_MEM = 0x82;
  protected static final int ERR_NOT_SUPPORTED = 0x83;
  protected static final int ERR_INTERNAL = 0x84;
  protected static final int ERR_BUSY = 0x85;
  protected static final int ERR_TEMP_FAIL = 0x86;

  protected static final byte[] EMPTY_BYTES = new byte[0];

  protected static final OperationStatus STATUS_OK = new CASOperationStatus(
      true, "OK", CASResponse.OK);

  private static final AtomicInteger SEQ_NUMBER = new AtomicInteger(0);

  // request header fields
  private final int cmd;
  protected short vbucket = 0;
  protected final int opaque;

  private final byte[] header = new byte[MIN_RECV_PACKET];
  private int headerOffset = 0;
  private byte[] payload = null;

  // Response header fields
  protected int keyLen;
  protected int responseCmd;
  protected int errorCode;
  protected int responseOpaque;
  protected long responseCas;

  private int payloadOffset = 0;

  /**
   * Construct with opaque.
   *
   * @param o the opaque value.
   * @param cb
   */
  protected OperationImpl(int c, int o, OperationCallback cb) {
    super();
    cmd = c;
    opaque = o;
    setCallback(cb);
  }

  protected void resetInput() {
    payload = null;
    payloadOffset = 0;
    headerOffset = 0;
  }

  // Base response packet format:
  // 0 1 2 3 4 5 6 7 8 9 10 11
  // # magic, opcode, keylen, extralen, datatype, status, bodylen,
  // 12,3,4,5 16
  // opaque, cas
  // RES_PKT_FMT=">BBHBBHIIQ"

  @Override
  public void readFromBuffer(ByteBuffer b) throws IOException {
    // First process headers if we haven't completed them yet
    if (headerOffset < MIN_RECV_PACKET) {
      int toRead = MIN_RECV_PACKET - headerOffset;
      int available = b.remaining();
      toRead = Math.min(toRead, available);
      getLogger().debug("Reading %d header bytes", toRead);
      b.get(header, headerOffset, toRead);
      headerOffset += toRead;

      // We've completed reading the header. Prepare body read.
      if (headerOffset == MIN_RECV_PACKET) {
        int magic = header[0];
        assert magic == RES_MAGIC : "Invalid magic:  " + magic;
        responseCmd = header[1];
        assert cmd == -1 || responseCmd == cmd : "Unexpected response"
            + " command value";
        keyLen = decodeShort(header, 2);
        // TODO: Examine extralen and datatype
        errorCode = decodeShort(header, 6);
        int bytesToRead = decodeInt(header, 8);
        payload = new byte[bytesToRead];
        responseOpaque = decodeInt(header, 12);
        responseCas = decodeLong(header, 16);
        assert opaqueIsValid() : "Opaque is not valid";
      }
    }

    // Now process the payload if we can.
    if (headerOffset >= MIN_RECV_PACKET && payload == null) {
      finishedPayload(EMPTY_BYTES);
    } else if (payload != null) {
      int toRead = payload.length - payloadOffset;
      int available = b.remaining();
      toRead = Math.min(toRead, available);
      getLogger().debug("Reading %d payload bytes", toRead);
      b.get(payload, payloadOffset, toRead);
      payloadOffset += toRead;

      // Have we read it all?
      if (payloadOffset == payload.length) {
        finishedPayload(payload);
      }
    } else {
      // Haven't read enough to make up a payload. Must read more.
      getLogger().debug("Only read %d of the %d needed to fill a header",
          headerOffset, MIN_RECV_PACKET);
    }

  }

  protected void finishedPayload(byte[] pl) throws IOException {
    OperationStatus status = getStatusForErrorCode(errorCode, pl);

    if (status == null) {
      handleError(OperationErrorType.SERVER, new String(pl));
    } else if (errorCode == SUCCESS) {
      decodePayload(pl);
      transitionState(OperationState.COMPLETE);
    } else if (errorCode == ERR_NOT_MY_VBUCKET
        && !getState().equals(OperationState.COMPLETE)) {
      transitionState(OperationState.RETRY);
    } else {
      getCallback().receivedStatus(status);
      transitionState(OperationState.COMPLETE);
    }
  }

  /**
   * Get the OperationStatus object for the given error code.
   *
   * @param errCode the error code
   * @return the status to return, or null if this is an exceptional case
   */
  protected OperationStatus getStatusForErrorCode(int errCode, byte[] errPl)
    throws IOException {
    switch (errCode) {
    case SUCCESS:
      return STATUS_OK;
    case ERR_NOT_FOUND:
      return new CASOperationStatus(false, new String(errPl),
          CASResponse.NOT_FOUND);
    case ERR_EXISTS:
      return new CASOperationStatus(false, new String(errPl),
          CASResponse.EXISTS);
    case ERR_NOT_STORED:
      return new CASOperationStatus(false, new String(errPl),
          CASResponse.NOT_FOUND);
    case ERR_2BIG:
    case ERR_INTERNAL:
      handleError(OperationErrorType.SERVER, new String(errPl));
    case ERR_INVAL:
    case ERR_DELTA_BADVAL:
    case ERR_NOT_MY_VBUCKET:
    case ERR_UNKNOWN_COMMAND:
    case ERR_NO_MEM:
    case ERR_NOT_SUPPORTED:
    case ERR_BUSY:
    case ERR_TEMP_FAIL:
      return new OperationStatus(false, new String(errPl));
    default:
      return null;
    }
  }

  /**
   * Decode the given payload for this command.
   *
   * @param pl the payload.
   */
  protected void decodePayload(byte[] pl) {
    assert pl.length == 0 : "Payload has bytes, but decode isn't overridden";
    getCallback().receivedStatus(STATUS_OK);
  }

  /**
   * Validate an opaque value from the header. This may be overridden from a
   * subclass where the opaque isn't expected to always be the same as the
   * request opaque.
   */
  protected boolean opaqueIsValid() {
    if (responseOpaque != opaque) {
      getLogger().warn("Expected opaque:  %d, got opaque:  %d\n",
          responseOpaque, opaque);
    }
    return responseOpaque == opaque;
  }

  static int decodeShort(byte[] data, int i) {
    return (data[i] & 0xff) << 8 | (data[i + 1] & 0xff);
  }

  static int decodeInt(byte[] data, int i) {
    return (data[i] & 0xff) << 24
      | (data[i + 1] & 0xff) << 16
      | (data[i + 2] & 0xff) << 8
      | (data[i + 3] & 0xff);
  }

  static long decodeUnsignedInt(byte[] data, int i) {
    return ((long) (data[i] & 0xff) << 24)
      | ((data[i + 1] & 0xff) << 16)
      | ((data[i + 2] & 0xff) << 8)
      | (data[i + 3] & 0xff);
  }

  static long decodeLong(byte[] data, int i) {
    return (data[i] & 0xff) << 56
      | (data[i + 1] & 0xff) << 48
      | (data[i + 2] & 0xff) << 40
      | (data[i + 3] & 0xff) << 32
      | (data[i + 4] & 0xff) << 24
      | (data[i + 5] & 0xff) << 16
      | (data[i + 6] & 0xff) << 8
      | (data[i + 7] & 0xff);
  }

  /**
   * Prepare a send buffer.
   *
   * @param key the key (for keyed ops)
   * @param cas the cas value
   * @param val the data payload
   * @param extraHeaders any additional headers that need to be sent
   */
  protected void prepareBuffer(String key, long cas, byte[] val,
      Object... extraHeaders) {
    int extraLen = 0;
    for (Object o : extraHeaders) {
      if (o instanceof Integer) {
        extraLen += 4;
      } else if (o instanceof byte[]) {
        extraLen += ((byte[]) o).length;
      } else if (o instanceof Long) {
        extraLen += 8;
      } else {
        assert false : "Unhandled extra header type:  " + o.getClass();
      }
    }
    final byte[] keyBytes = KeyUtil.getKeyBytes(key);
    int bufSize = MIN_RECV_PACKET + keyBytes.length + val.length;

    // # magic, opcode, keylen, extralen, datatype, [reserved],
    // bodylen, opaque, cas
    // REQ_PKT_FMT=">BBHBBxxIIQ"

    // set up the initial header stuff
    ByteBuffer bb = ByteBuffer.allocate(bufSize + extraLen);
    assert bb.order() == ByteOrder.BIG_ENDIAN;
    bb.put(REQ_MAGIC);
    bb.put((byte) cmd);
    bb.putShort((short) keyBytes.length);
    bb.put((byte) extraLen);
    bb.put((byte) 0); // data type
    bb.putShort(vbucket); // vbucket
    bb.putInt(keyBytes.length + val.length + extraLen);
    bb.putInt(opaque);
    bb.putLong(cas);

    // Add the extra headers.
    for (Object o : extraHeaders) {
      if (o instanceof Integer) {
        bb.putInt((Integer) o);
      } else if (o instanceof byte[]) {
        bb.put((byte[]) o);
      } else if (o instanceof Long) {
        bb.putLong((Long) o);
      } else {
        assert false : "Unhandled extra header type:  " + o.getClass();
      }
    }

    // Add the normal stuff
    bb.put(keyBytes);
    bb.put(val);

    bb.flip();
    setBuffer(bb);
  }

  /**
   * Generate an opaque ID.
   */
  static int generateOpaque() {
    int rv = SEQ_NUMBER.incrementAndGet();
    while (rv < 0) {
      SEQ_NUMBER.compareAndSet(rv, 0);
      rv = SEQ_NUMBER.incrementAndGet();
    }
    return rv;
  }
=======
	protected static final byte REQ_MAGIC = (byte)0x80;
	protected static final byte RES_MAGIC = (byte)0x81;
	protected static final int MIN_RECV_PACKET=24;

	/**
	 * Error code for operations.
	 */
	protected static final int SUCCESS = 0x00;
	protected static final int ERR_NOT_FOUND = 0x01;
	protected static final int ERR_EXISTS = 0x02;
	protected static final int ERR_2BIG = 0x03;
	protected static final int ERR_INVAL = 0x04;
	protected static final int ERR_NOT_STORED = 0x05;
	protected static final int ERR_DELTA_BADVAL = 0x06;
	protected static final int ERR_NOT_MY_VBUCKET = 0x07;
	protected static final int ERR_UNKNOWN_COMMAND = 0x81;
	protected static final int ERR_NO_MEM = 0x82;
	protected static final int ERR_NOT_SUPPORTED = 0x83;
	protected static final int ERR_INTERNAL = 0x84;
	protected static final int ERR_BUSY = 0x85;
	protected static final int ERR_TEMP_FAIL = 0x86;

	protected static final byte[] EMPTY_BYTES = new byte[0];

	protected static final OperationStatus STATUS_OK =
		new CASOperationStatus(true, "OK", CASResponse.OK);

	private static final AtomicInteger seqNumber=new AtomicInteger(0);

	// request header fields
	private final int cmd;
	protected short vbucket=0;
	protected final int opaque;

	private final byte[] header=new byte[MIN_RECV_PACKET];
	private int headerOffset=0;
	private byte[] payload=null;

	// Response header fields
	protected int keyLen;
	protected int responseCmd;
	protected int errorCode;
	protected int responseOpaque;
	protected long responseCas;

	private int payloadOffset=0;

	/**
	 * Construct with opaque.
	 *
	 * @param o the opaque value.
	 * @param cb
	 */
	protected OperationImpl(int c, int o, OperationCallback cb) {
		super();
		cmd=c;
		opaque=o;
		setCallback(cb);
	}

	protected void resetInput() {
		payload=null;
		payloadOffset=0;
		headerOffset=0;
	}

	// Base response packet format:
	//    0      1       2  3    4         5         6  7    8 9 10 11
	//	# magic, opcode, keylen, extralen, datatype, status, bodylen,
	//    12,3,4,5  16
	//    opaque, cas
	//	RES_PKT_FMT=">BBHBBHIIQ"

	@Override
	public void readFromBuffer(ByteBuffer b) throws IOException {
		// First process headers if we haven't completed them yet
		if(headerOffset < MIN_RECV_PACKET) {
			int toRead=MIN_RECV_PACKET - headerOffset;
			int available=b.remaining();
			toRead=Math.min(toRead, available);
			getLogger().debug("Reading %d header bytes", toRead);
			b.get(header, headerOffset, toRead);
			headerOffset+=toRead;

			// We've completed reading the header.  Prepare body read.
			if(headerOffset == MIN_RECV_PACKET) {
				int magic=header[0];
				assert magic == RES_MAGIC : "Invalid magic:  " + magic;
				responseCmd=header[1];
				assert cmd == -1 || responseCmd == cmd
					: "Unexpected response command value";
				keyLen=decodeShort(header, 2);
				// TODO:  Examine extralen and datatype
				errorCode=decodeShort(header, 6);
				int bytesToRead=decodeInt(header, 8);
				payload=new byte[bytesToRead];
				responseOpaque=decodeInt(header, 12);
				responseCas=decodeLong(header, 16);
				assert opaqueIsValid() : "Opaque is not valid";
			}
		}

		// Now process the payload if we can.
		if(headerOffset >= MIN_RECV_PACKET && payload == null) {
			finishedPayload(EMPTY_BYTES);
		} else if(payload != null) {
			int toRead=payload.length - payloadOffset;
			int available=b.remaining();
			toRead=Math.min(toRead, available);
			getLogger().debug("Reading %d payload bytes", toRead);
			b.get(payload, payloadOffset, toRead);
			payloadOffset+=toRead;

			// Have we read it all?
			if(payloadOffset == payload.length) {
				finishedPayload(payload);
			}
		} else {
			// Haven't read enough to make up a payload.  Must read more.
			getLogger().debug("Only read %d of the %d needed to fill a header",
				headerOffset, MIN_RECV_PACKET);
		}

	}

	protected void finishedPayload(byte[] pl) throws IOException {
		OperationStatus status=getStatusForErrorCode(errorCode, pl);

		if(status == null) {
			handleError(OperationErrorType.SERVER, new String(pl));
		} else if(errorCode == SUCCESS) {
			decodePayload(pl);
			transitionState(OperationState.COMPLETE);
		} else if (errorCode == ERR_NOT_MY_VBUCKET && !getState().equals(OperationState.COMPLETE)) {
            transitionState(OperationState.RETRY);
		} else {
			getCallback().receivedStatus(status);
			transitionState(OperationState.COMPLETE);
		}
	}

	/**
	 * Get the OperationStatus object for the given error code.
	 *
	 * @param errCode the error code
	 * @return the status to return, or null if this is an exceptional case
	 */
	protected OperationStatus getStatusForErrorCode(int errCode, byte[] errPl)
			throws IOException {
		switch (errCode) {
			case SUCCESS:
				return STATUS_OK;
			case ERR_NOT_FOUND:
				return new CASOperationStatus(false, new String(errPl), CASResponse.NOT_FOUND);
			case ERR_EXISTS:
				return new CASOperationStatus(false, new String(errPl), CASResponse.EXISTS);
			case ERR_NOT_STORED:
				return new CASOperationStatus(false, new String(errPl), CASResponse.NOT_FOUND);
			case ERR_2BIG:
			case ERR_INTERNAL:
				handleError(OperationErrorType.SERVER, new String(errPl));
			case ERR_INVAL:
			case ERR_DELTA_BADVAL:
			case ERR_NOT_MY_VBUCKET:
			case ERR_UNKNOWN_COMMAND:
			case ERR_NO_MEM:
			case ERR_NOT_SUPPORTED:
			case ERR_BUSY:
			case ERR_TEMP_FAIL:
				return new OperationStatus(false, new String(errPl));
			default:
				return null;
		}
	}

	/**
	 * Decode the given payload for this command.
	 *
	 * @param pl the payload.
	 */
	protected void decodePayload(byte[] pl) {
		assert pl.length == 0 : "Payload has bytes, but decode isn't overridden";
		getCallback().receivedStatus(STATUS_OK);
	}

	/**
	 * Validate an opaque value from the header.
	 * This may be overridden from a subclass where the opaque isn't expected
	 * to always be the same as the request opaque.
	 */
	protected boolean opaqueIsValid() {
		if(responseOpaque != opaque) {
			getLogger().warn("Expected opaque:  %d, got opaque:  %d\n",
					responseOpaque, opaque);
		}
		return responseOpaque == opaque;
	}

	static int decodeShort(byte[] data, int i) {
		return (data[i] & 0xff) << 8
			| (data[i+1] & 0xff);
	}

	static int decodeInt(byte[] data, int i) {
		return (data[i]  & 0xff) << 24
			| (data[i+1] & 0xff) << 16
			| (data[i+2] & 0xff) << 8
			| (data[i+3] & 0xff);
	}

	static long decodeUnsignedInt(byte[] data, int i) {
		return ((long)(data[i]  & 0xff) << 24)
			| ((data[i+1] & 0xff) << 16)
			| ((data[i+2] & 0xff) << 8)
			| (data[i+3] & 0xff);
	}

	static long decodeLong(byte[] data, int i) {
		return(data[i  ] & 0xff) << 56
			| (data[i+1] & 0xff) << 48
			| (data[i+2] & 0xff) << 40
			| (data[i+3] & 0xff) << 32
			| (data[i+4] & 0xff) << 24
			| (data[i+5] & 0xff) << 16
			| (data[i+6] & 0xff) << 8
			| (data[i+7] & 0xff);
	}

	/**
	 * Prepare a send buffer.
	 *
	 * @param key the key (for keyed ops)
	 * @param cas the cas value
	 * @param val the data payload
	 * @param extraHeaders any additional headers that need to be sent
	 */
	protected void prepareBuffer(String key, long cas, byte[] val,
			Object... extraHeaders) {
		int extraLen=0;
		for(Object o : extraHeaders) {
			if(o instanceof Integer) {
				extraLen += 4;
			} else if(o instanceof byte[]) {
				extraLen += ((byte[])o).length;
			} else if(o instanceof Long) {
				extraLen += 8;
			} else {
				assert false : "Unhandled extra header type:  " + o.getClass();
			}
		}
		final byte[] keyBytes=KeyUtil.getKeyBytes(key);
		int bufSize=MIN_RECV_PACKET + keyBytes.length + val.length;

		//	# magic, opcode, keylen, extralen, datatype, [reserved],
		//    bodylen, opaque, cas
		//	REQ_PKT_FMT=">BBHBBxxIIQ"

		// set up the initial header stuff
		ByteBuffer bb=ByteBuffer.allocate(bufSize + extraLen);
		assert bb.order() == ByteOrder.BIG_ENDIAN;
		bb.put(REQ_MAGIC);
		bb.put((byte)cmd);
		bb.putShort((short)keyBytes.length);
		bb.put((byte)extraLen);
		bb.put((byte)0); // data type
		bb.putShort(vbucket); // vbucket
		bb.putInt(keyBytes.length + val.length + extraLen);
		bb.putInt(opaque);
		bb.putLong(cas);

		// Add the extra headers.
		for(Object o : extraHeaders) {
			if(o instanceof Integer) {
				bb.putInt((Integer)o);
			} else if(o instanceof byte[]) {
				bb.put((byte[])o);
			} else if(o instanceof Long) {
				bb.putLong((Long)o);
			} else {
				assert false : "Unhandled extra header type:  " + o.getClass();
			}
		}

		// Add the normal stuff
		bb.put(keyBytes);
		bb.put(val);

		bb.flip();
		setBuffer(bb);
	}

	/**
	 * Generate an opaque ID.
	 */
	static int generateOpaque() {
		int rv = seqNumber.incrementAndGet();
		while(rv < 0) {
			seqNumber.compareAndSet(rv, 0);
			rv=seqNumber.incrementAndGet();
		}
		return rv;
	}

	@Override
	public String toString() {
		return "Cmd: " + cmd + " Opaque: " + opaque;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
 */
abstract class OperationImpl extends BaseOperationImpl implements Operation {

  protected static final byte REQ_MAGIC = (byte) 0x80;
  protected static final byte RES_MAGIC = (byte) 0x81;
  protected static final int MIN_RECV_PACKET = 24;

  /**
   * Error code for operations.
   */
  protected static final int SUCCESS = 0x00;
  protected static final int ERR_NOT_FOUND = 0x01;
  protected static final int ERR_EXISTS = 0x02;
  protected static final int ERR_2BIG = 0x03;
  protected static final int ERR_INVAL = 0x04;
  protected static final int ERR_NOT_STORED = 0x05;
  protected static final int ERR_DELTA_BADVAL = 0x06;
  protected static final int ERR_NOT_MY_VBUCKET = 0x07;
  protected static final int ERR_UNKNOWN_COMMAND = 0x81;
  protected static final int ERR_NO_MEM = 0x82;
  protected static final int ERR_NOT_SUPPORTED = 0x83;
  protected static final int ERR_INTERNAL = 0x84;
  protected static final int ERR_BUSY = 0x85;
  protected static final int ERR_TEMP_FAIL = 0x86;

  protected static final byte[] EMPTY_BYTES = new byte[0];

  protected static final OperationStatus STATUS_OK = new CASOperationStatus(
      true, "OK", CASResponse.OK);

  private static final AtomicInteger SEQ_NUMBER = new AtomicInteger(0);

  // request header fields
  private final int cmd;
  protected short vbucket = 0;
  protected final int opaque;

  private final byte[] header = new byte[MIN_RECV_PACKET];
  private int headerOffset = 0;
  private byte[] payload = null;

  // Response header fields
  protected int keyLen;
  protected int responseCmd;
  protected int errorCode;
  protected int responseOpaque;
  protected long responseCas;

  private int payloadOffset = 0;

  /**
   * Construct with opaque.
   *
   * @param o the opaque value.
   * @param cb
   */
  protected OperationImpl(int c, int o, OperationCallback cb) {
    super();
    cmd = c;
    opaque = o;
    setCallback(cb);
  }

  protected void resetInput() {
    payload = null;
    payloadOffset = 0;
    headerOffset = 0;
  }

  // Base response packet format:
  // 0 1 2 3 4 5 6 7 8 9 10 11
  // # magic, opcode, keylen, extralen, datatype, status, bodylen,
  // 12,3,4,5 16
  // opaque, cas
  // RES_PKT_FMT=">BBHBBHIIQ"

  @Override
  public void readFromBuffer(ByteBuffer b) throws IOException {
    // First process headers if we haven't completed them yet
    if (headerOffset < MIN_RECV_PACKET) {
      int toRead = MIN_RECV_PACKET - headerOffset;
      int available = b.remaining();
      toRead = Math.min(toRead, available);
      getLogger().debug("Reading %d header bytes", toRead);
      b.get(header, headerOffset, toRead);
      headerOffset += toRead;

      // We've completed reading the header. Prepare body read.
      if (headerOffset == MIN_RECV_PACKET) {
        int magic = header[0];
        assert magic == RES_MAGIC : "Invalid magic:  " + magic;
        responseCmd = header[1];
        assert cmd == -1 || responseCmd == cmd : "Unexpected response"
            + " command value";
        keyLen = decodeShort(header, 2);
        // TODO: Examine extralen and datatype
        errorCode = decodeShort(header, 6);
        int bytesToRead = decodeInt(header, 8);
        payload = new byte[bytesToRead];
        responseOpaque = decodeInt(header, 12);
        responseCas = decodeLong(header, 16);
        assert opaqueIsValid() : "Opaque is not valid";
      }
    }

    // Now process the payload if we can.
    if (headerOffset >= MIN_RECV_PACKET && payload == null) {
      finishedPayload(EMPTY_BYTES);
    } else if (payload != null) {
      int toRead = payload.length - payloadOffset;
      int available = b.remaining();
      toRead = Math.min(toRead, available);
      getLogger().debug("Reading %d payload bytes", toRead);
      b.get(payload, payloadOffset, toRead);
      payloadOffset += toRead;

      // Have we read it all?
      if (payloadOffset == payload.length) {
        finishedPayload(payload);
      }
    } else {
      // Haven't read enough to make up a payload. Must read more.
      getLogger().debug("Only read %d of the %d needed to fill a header",
          headerOffset, MIN_RECV_PACKET);
    }

  }

  protected void finishedPayload(byte[] pl) throws IOException {
    OperationStatus status = getStatusForErrorCode(errorCode, pl);

    if (status == null) {
      handleError(OperationErrorType.SERVER, new String(pl));
    } else if (errorCode == SUCCESS) {
      decodePayload(pl);
      transitionState(OperationState.COMPLETE);
    } else if (errorCode == ERR_NOT_MY_VBUCKET
        && !getState().equals(OperationState.COMPLETE)) {
      transitionState(OperationState.RETRY);
    } else {
      getCallback().receivedStatus(status);
      transitionState(OperationState.COMPLETE);
    }
  }

  /**
   * Get the OperationStatus object for the given error code.
   *
   * @param errCode the error code
   * @return the status to return, or null if this is an exceptional case
   */
  protected OperationStatus getStatusForErrorCode(int errCode, byte[] errPl)
    throws IOException {
    switch (errCode) {
    case SUCCESS:
      return STATUS_OK;
    case ERR_NOT_FOUND:
      return new CASOperationStatus(false, new String(errPl),
          CASResponse.NOT_FOUND);
    case ERR_EXISTS:
      return new CASOperationStatus(false, new String(errPl),
          CASResponse.EXISTS);
    case ERR_NOT_STORED:
      return new CASOperationStatus(false, new String(errPl),
          CASResponse.NOT_FOUND);
    case ERR_2BIG:
    case ERR_INTERNAL:
      handleError(OperationErrorType.SERVER, new String(errPl));
    case ERR_INVAL:
    case ERR_DELTA_BADVAL:
    case ERR_NOT_MY_VBUCKET:
    case ERR_UNKNOWN_COMMAND:
    case ERR_NO_MEM:
    case ERR_NOT_SUPPORTED:
    case ERR_BUSY:
    case ERR_TEMP_FAIL:
      return new OperationStatus(false, new String(errPl));
    default:
      return null;
    }
  }

  /**
   * Decode the given payload for this command.
   *
   * @param pl the payload.
   */
  protected void decodePayload(byte[] pl) {
    assert pl.length == 0 : "Payload has bytes, but decode isn't overridden";
    getCallback().receivedStatus(STATUS_OK);
  }

  /**
   * Validate an opaque value from the header. This may be overridden from a
   * subclass where the opaque isn't expected to always be the same as the
   * request opaque.
   */
  protected boolean opaqueIsValid() {
    if (responseOpaque != opaque) {
      getLogger().warn("Expected opaque:  %d, got opaque:  %d\n",
          responseOpaque, opaque);
    }
    return responseOpaque == opaque;
  }

  static int decodeShort(byte[] data, int i) {
    return (data[i] & 0xff) << 8 | (data[i + 1] & 0xff);
  }

  static int decodeInt(byte[] data, int i) {
    return (data[i] & 0xff) << 24
      | (data[i + 1] & 0xff) << 16
      | (data[i + 2] & 0xff) << 8
      | (data[i + 3] & 0xff);
  }

  static long decodeUnsignedInt(byte[] data, int i) {
    return ((long) (data[i] & 0xff) << 24)
      | ((data[i + 1] & 0xff) << 16)
      | ((data[i + 2] & 0xff) << 8)
      | (data[i + 3] & 0xff);
  }

  static long decodeLong(byte[] data, int i) {
    return (data[i] & 0xff) << 56
      | (data[i + 1] & 0xff) << 48
      | (data[i + 2] & 0xff) << 40
      | (data[i + 3] & 0xff) << 32
      | (data[i + 4] & 0xff) << 24
      | (data[i + 5] & 0xff) << 16
      | (data[i + 6] & 0xff) << 8
      | (data[i + 7] & 0xff);
  }

  /**
   * Prepare a send buffer.
   *
   * @param key the key (for keyed ops)
   * @param cas the cas value
   * @param val the data payload
   * @param extraHeaders any additional headers that need to be sent
   */
  protected void prepareBuffer(String key, long cas, byte[] val,
      Object... extraHeaders) {
    int extraLen = 0;
    for (Object o : extraHeaders) {
      if (o instanceof Integer) {
        extraLen += 4;
      } else if (o instanceof byte[]) {
        extraLen += ((byte[]) o).length;
      } else if (o instanceof Long) {
        extraLen += 8;
      } else {
        assert false : "Unhandled extra header type:  " + o.getClass();
      }
    }
    final byte[] keyBytes = KeyUtil.getKeyBytes(key);
    int bufSize = MIN_RECV_PACKET + keyBytes.length + val.length;

    // # magic, opcode, keylen, extralen, datatype, [reserved],
    // bodylen, opaque, cas
    // REQ_PKT_FMT=">BBHBBxxIIQ"

    // set up the initial header stuff
    ByteBuffer bb = ByteBuffer.allocate(bufSize + extraLen);
    assert bb.order() == ByteOrder.BIG_ENDIAN;
    bb.put(REQ_MAGIC);
    bb.put((byte) cmd);
    bb.putShort((short) keyBytes.length);
    bb.put((byte) extraLen);
    bb.put((byte) 0); // data type
    bb.putShort(vbucket); // vbucket
    bb.putInt(keyBytes.length + val.length + extraLen);
    bb.putInt(opaque);
    bb.putLong(cas);

    // Add the extra headers.
    for (Object o : extraHeaders) {
      if (o instanceof Integer) {
        bb.putInt((Integer) o);
      } else if (o instanceof byte[]) {
        bb.put((byte[]) o);
      } else if (o instanceof Long) {
        bb.putLong((Long) o);
      } else {
        assert false : "Unhandled extra header type:  " + o.getClass();
      }
    }

    // Add the normal stuff
    bb.put(keyBytes);
    bb.put(val);

    bb.flip();
    setBuffer(bb);
  }

  /**
   * Generate an opaque ID.
   */
  static int generateOpaque() {
    int rv = SEQ_NUMBER.incrementAndGet();
    while (rv < 0) {
      SEQ_NUMBER.compareAndSet(rv, 0);
      rv = SEQ_NUMBER.incrementAndGet();
    }
    return rv;
  }

  @Override
  public String toString() {
    return "Cmd: " + cmd + " Opaque: " + opaque;
  }
}
File
OperationImpl.java
Developer's decision
Combination
Kind of conflict
Annotation
Attribute
Cast expression
Comment
Method declaration
Method invocation
Chunk
Conflicting content
 */
public class OptimizedSetImpl extends MultiKeyOperationImpl {

<<<<<<< HEAD
  private static final OperationCallback NOOP_CALLBACK = new NoopCallback();

  private final int terminalOpaque = generateOpaque();
  private final Map callbacks =
      new HashMap();
  private final List ops = new ArrayList();

  // If nothing else, this will be a NOOP.
  private int byteCount = MIN_RECV_PACKET;

  /**
   * Construct an optimized get starting with the given get operation.
   */
  public OptimizedSetImpl(CASOperation firstStore) {

    super(-1, -1, NOOP_CALLBACK);
    addOperation(firstStore);
  }

  public void addOperation(CASOperation op) {
    ops.add(op);

    // Count the bytes required by this operation.
    Iterator is = op.getKeys().iterator();
    String k = is.next();
    int keylen = KeyUtil.getKeyBytes(k).length;

    byteCount += MIN_RECV_PACKET + StoreOperationImpl.EXTRA_LEN + keylen
      + op.getBytes().length;
  }

  public int size() {
    return ops.size();
  }

  public int bytes() {
    return byteCount;
  }

  @Override
  public void initialize() {
    // Now create a buffer.
    ByteBuffer bb = ByteBuffer.allocate(byteCount);
    for (CASOperation so : ops) {
      Iterator is = so.getKeys().iterator();
      String k = is.next();
      byte[] keyBytes = KeyUtil.getKeyBytes(k);
      assert !is.hasNext();

      int myOpaque = generateOpaque();
      callbacks.put(myOpaque, so.getCallback());
      byte[] data = so.getBytes();

      // Custom header
      bb.put(REQ_MAGIC);
      bb.put((byte) cmdMap(so.getStoreType()));
      bb.putShort((short) keyBytes.length);
      bb.put((byte) StoreOperationImpl.EXTRA_LEN); // extralen
      bb.put((byte) 0); // data type
      bb.putShort(((VBucketAware) so).getVBucket(k)); // vbucket
      bb.putInt(keyBytes.length + data.length + StoreOperationImpl.EXTRA_LEN);
      bb.putInt(myOpaque);
      bb.putLong(so.getCasValue()); // cas
      // Extras
      bb.putInt(so.getFlags());
      bb.putInt(so.getExpiration());
      // the actual key
      bb.put(keyBytes);
      // And the value
      bb.put(data);
    }
    // Add the noop
    bb.put(REQ_MAGIC);
    bb.put((byte) NoopOperationImpl.CMD);
    bb.putShort((short) 0);
    bb.put((byte) 0); // extralen
    bb.put((byte) 0); // data type
    bb.putShort((short) 0); // reserved
    bb.putInt(0);
    bb.putInt(terminalOpaque);
    bb.putLong(0); // cas

    bb.flip();
    setBuffer(bb);
  }

  private static int cmdMap(StoreType t) {
    int rv;
    switch (t) {
    case set:
      rv = StoreOperationImpl.SETQ;
      break;
    case add:
      rv = StoreOperationImpl.ADDQ;
      break;
    case replace:
      rv = StoreOperationImpl.REPLACEQ;
      break;
    default:
      rv = -1;
    }
    // Check fall-through.
    assert rv != -1 : "Unhandled store type:  " + t;
    return rv;
  }

  @Override
  protected void finishedPayload(byte[] pl) throws IOException {
    if (responseOpaque == terminalOpaque) {
      for (OperationCallback cb : callbacks.values()) {
        cb.receivedStatus(STATUS_OK);
        cb.complete();
      }
      transitionState(OperationState.COMPLETE);
    } else {
      OperationCallback cb = callbacks.remove(responseOpaque);
      assert cb != null : "No callback for " + responseOpaque;
      assert errorCode != 0 : "Got no error on a quiet mutation.";
      OperationStatus status = getStatusForErrorCode(errorCode, pl);
      assert status != null : "Got no status for a quiet mutation error";
      cb.receivedStatus(status);
      cb.complete();
    }
    resetInput();
  }

  @Override
  protected boolean opaqueIsValid() {
    return responseOpaque == terminalOpaque
        || callbacks.containsKey(responseOpaque);
  }

  static class NoopCallback implements OperationCallback {

    public void complete() {
      // noop
    }

    public void receivedStatus(OperationStatus status) {
      // noop
    }
  }
=======
	private static final OperationCallback NOOP_CALLBACK = new NoopCallback();

	private final int terminalOpaque=generateOpaque();
	private final Map callbacks =
		new HashMap();
	private final List ops = new ArrayList();

	// If nothing else, this will be a NOOP.
	private int byteCount = MIN_RECV_PACKET;

	/**
	 * Construct an optimized get starting with the given get operation.
	 */
	public OptimizedSetImpl(CASOperation firstStore) {
		super(-1, -1, NOOP_CALLBACK);
		addOperation(firstStore);
	}

	public void addOperation(CASOperation op) {
		ops.add(op);
		// Count the bytes required by this operation.
		Iterator is = op.getKeys().iterator();
		String k = is.next();
		int keylen = KeyUtil.getKeyBytes(k).length;

		byteCount += MIN_RECV_PACKET + StoreOperationImpl.EXTRA_LEN
			+ keylen + op.getData().length;
	}

	public int size() {
		return ops.size();
	}

	public int bytes() {
		return byteCount;
	}

	@Override
	public void initialize() {
		// Now create a buffer.
		ByteBuffer bb=ByteBuffer.allocate(byteCount);
		for(CASOperation so : ops) {
			Iterator is = so.getKeys().iterator();
			String k = is.next();
			byte[] keyBytes = KeyUtil.getKeyBytes(k);
			assert !is.hasNext();

			int myOpaque = generateOpaque();
			callbacks.put(myOpaque, so.getCallback());
			byte[] data = so.getData();

			// Custom header
			bb.put(REQ_MAGIC);
			bb.put((byte)cmdMap(so.getStoreType()));
			bb.putShort((short)keyBytes.length);
			bb.put((byte)StoreOperationImpl.EXTRA_LEN); // extralen
			bb.put((byte)0); // data type
			bb.putShort(((VBucketAware)so).getVBucket(k)); // vbucket
			bb.putInt(keyBytes.length + data.length +
						StoreOperationImpl.EXTRA_LEN);
			bb.putInt(myOpaque);
			bb.putLong(so.getCasValue()); // cas
			// Extras
			bb.putInt(so.getFlags());
			bb.putInt(so.getExpiration());
			// the actual key
			bb.put(keyBytes);
			// And the value
			bb.put(data);
		}
		// Add the noop
		bb.put(REQ_MAGIC);
		bb.put((byte)NoopOperationImpl.CMD);
		bb.putShort((short)0);
		bb.put((byte)0); // extralen
		bb.put((byte)0); // data type
		bb.putShort((short)0); // reserved
		bb.putInt(0);
		bb.putInt(terminalOpaque);
		bb.putLong(0); // cas

		bb.flip();
		setBuffer(bb);
	}

	private static int cmdMap(StoreType t) {
		int rv=-1;
		switch(t) {
			case set: rv=StoreOperationImpl.SETQ; break;
			case add: rv=StoreOperationImpl.ADDQ; break;
			case replace: rv=StoreOperationImpl.REPLACEQ; break;
		}
		// Check fall-through.
		assert rv != -1 : "Unhandled store type:  " + t;
		return rv;
	}

	@Override
	protected void finishedPayload(byte[] pl) throws IOException {
		if(responseOpaque == terminalOpaque) {
			for(OperationCallback cb : callbacks.values()) {
				cb.receivedStatus(STATUS_OK);
				cb.complete();
			}
			transitionState(OperationState.COMPLETE);
		} else {
			OperationCallback cb = callbacks.remove(responseOpaque);
			assert cb != null : "No callback for " + responseOpaque;
			assert errorCode != 0 : "Got no error on a quiet mutation.";
			OperationStatus status=getStatusForErrorCode(errorCode, pl);
			assert status != null : "Got no status for a quiet mutation error";
			cb.receivedStatus(status);
			cb.complete();
		}
		resetInput();
	}

	@Override
	protected boolean opaqueIsValid() {
		return responseOpaque == terminalOpaque
			|| callbacks.containsKey(responseOpaque);
	}

	static class NoopCallback implements OperationCallback {

		public void complete() {
			// noop
		}

		public void receivedStatus(OperationStatus status) {
			// noop
		}

	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
 */
public class OptimizedSetImpl extends MultiKeyOperationImpl {

  private static final OperationCallback NOOP_CALLBACK = new NoopCallback();

  private final int terminalOpaque = generateOpaque();
  private final Map callbacks =
      new HashMap();
  private final List ops = new ArrayList();

  // If nothing else, this will be a NOOP.
  private int byteCount = MIN_RECV_PACKET;

  /**
   * Construct an optimized get starting with the given get operation.
   */
  public OptimizedSetImpl(CASOperation firstStore) {
    super(-1, -1, NOOP_CALLBACK);
    addOperation(firstStore);
  }

  public void addOperation(CASOperation op) {
    ops.add(op);

    // Count the bytes required by this operation.
    Iterator is = op.getKeys().iterator();
    String k = is.next();
    int keylen = KeyUtil.getKeyBytes(k).length;

    byteCount += MIN_RECV_PACKET + StoreOperationImpl.EXTRA_LEN + keylen
      + op.getData().length;
  }

  public int size() {
    return ops.size();
  }

  public int bytes() {
    return byteCount;
  }

  @Override
  public void initialize() {
    // Now create a buffer.
    ByteBuffer bb = ByteBuffer.allocate(byteCount);
    for (CASOperation so : ops) {
      Iterator is = so.getKeys().iterator();
      String k = is.next();
      byte[] keyBytes = KeyUtil.getKeyBytes(k);
      assert !is.hasNext();

      int myOpaque = generateOpaque();
      callbacks.put(myOpaque, so.getCallback());
      byte[] data = so.getData();

      // Custom header
      bb.put(REQ_MAGIC);
      bb.put((byte) cmdMap(so.getStoreType()));
      bb.putShort((short) keyBytes.length);
      bb.put((byte) StoreOperationImpl.EXTRA_LEN); // extralen
      bb.put((byte) 0); // data type
      bb.putShort(((VBucketAware) so).getVBucket(k)); // vbucket
      bb.putInt(keyBytes.length + data.length + StoreOperationImpl.EXTRA_LEN);
      bb.putInt(myOpaque);
      bb.putLong(so.getCasValue()); // cas
      // Extras
      bb.putInt(so.getFlags());
      bb.putInt(so.getExpiration());
      // the actual key
      bb.put(keyBytes);
      // And the value
      bb.put(data);
    }
    // Add the noop
    bb.put(REQ_MAGIC);
    bb.put((byte) NoopOperationImpl.CMD);
    bb.putShort((short) 0);
    bb.put((byte) 0); // extralen
    bb.put((byte) 0); // data type
    bb.putShort((short) 0); // reserved
    bb.putInt(0);
    bb.putInt(terminalOpaque);
    bb.putLong(0); // cas

    bb.flip();
    setBuffer(bb);
  }

  private static int cmdMap(StoreType t) {
    int rv;
    switch (t) {
    case set:
      rv = StoreOperationImpl.SETQ;
      break;
    case add:
      rv = StoreOperationImpl.ADDQ;
      break;
    case replace:
      rv = StoreOperationImpl.REPLACEQ;
      break;
    default:
      rv = -1;
    }
    // Check fall-through.
    assert rv != -1 : "Unhandled store type:  " + t;
    return rv;
  }

  @Override
  protected void finishedPayload(byte[] pl) throws IOException {
    if (responseOpaque == terminalOpaque) {
      for (OperationCallback cb : callbacks.values()) {
        cb.receivedStatus(STATUS_OK);
        cb.complete();
      }
      transitionState(OperationState.COMPLETE);
    } else {
      OperationCallback cb = callbacks.remove(responseOpaque);
      assert cb != null : "No callback for " + responseOpaque;
      assert errorCode != 0 : "Got no error on a quiet mutation.";
      OperationStatus status = getStatusForErrorCode(errorCode, pl);
      assert status != null : "Got no status for a quiet mutation error";
      cb.receivedStatus(status);
      cb.complete();
    }
    resetInput();
  }

  @Override
  protected boolean opaqueIsValid() {
    return responseOpaque == terminalOpaque
        || callbacks.containsKey(responseOpaque);
  }

  static class NoopCallback implements OperationCallback {

    public void complete() {
      // noop
    }

    public void receivedStatus(OperationStatus status) {
      // noop
    }
  }
}
File
OptimizedSetImpl.java
Developer's decision
Manual
Kind of conflict
Annotation
Attribute
Class declaration
Comment
Method declaration
Method invocation
Chunk
Conflicting content
    super(CMD, m, EMPTY_BYTES, s, p, h, c);
  }

<<<<<<< HEAD
  @Override
  protected byte[] buildResponse(SaslClient sc) throws SaslException {
    return sc.hasInitialResponse() ? sc.evaluateChallenge(challenge)
        : EMPTY_BYTES;
  }
=======
	}

	@Override
	public String toString() {
		return "SASL auth operation";
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    super(CMD, m, EMPTY_BYTES, s, p, h, c);
  }

  @Override
  protected byte[] buildResponse(SaslClient sc) throws SaslException {
    return sc.hasInitialResponse() ? sc.evaluateChallenge(challenge)
        : EMPTY_BYTES;
  }

  @Override
  public String toString() {
    return "SASL auth operation";
  }
}
File
SASLAuthOperationImpl.java
Developer's decision
Combination
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
    getLogger().debug("Auth response:  %s", new String(pl));
  }

<<<<<<< HEAD
  @Override
  protected void finishedPayload(byte[] pl) throws IOException {
    if (errorCode == SASL_CONTINUE) {
      getCallback().receivedStatus(new OperationStatus(true, new String(pl)));
      transitionState(OperationState.COMPLETE);
    } else if (errorCode == 0) {
      getCallback().receivedStatus(new OperationStatus(true, ""));
      transitionState(OperationState.COMPLETE);
    } else {
      super.finishedPayload(pl);
    }
  }
=======
	@Override
	public String toString() {
		return "SASL base operation";
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    getLogger().debug("Auth response:  %s", new String(pl));
  }

  @Override
  protected void finishedPayload(byte[] pl) throws IOException {
    if (errorCode == SASL_CONTINUE) {
      getCallback().receivedStatus(new OperationStatus(true, new String(pl)));
      transitionState(OperationState.COMPLETE);
    } else if (errorCode == 0) {
      getCallback().receivedStatus(new OperationStatus(true, ""));
      transitionState(OperationState.COMPLETE);
    } else {
      super.finishedPayload(pl);
    }
  }

  @Override
  public String toString() {
    return "SASL base operation";
  }
}
File
SASLBaseOperationImpl.java
Developer's decision
Concatenation
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
    prepareBuffer("", 0, EMPTY_BYTES);
  }

<<<<<<< HEAD
  @Override
  protected void decodePayload(byte[] pl) {
    getCallback().receivedStatus(new OperationStatus(true, new String(pl)));
  }
=======
	@Override
	public String toString() {
		return "SASL mechs operation";
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    prepareBuffer("", 0, EMPTY_BYTES);
  }

  @Override
  protected void decodePayload(byte[] pl) {
    getCallback().receivedStatus(new OperationStatus(true, new String(pl)));
  }

  @Override
  public String toString() {
    return "SASL mechs operation";
  }
}
File
SASLMechsOperationImpl.java
Developer's decision
Concatenation
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
    super(CMD, m, ch, s, p, h, c);
  }

<<<<<<< HEAD
  @Override
  protected byte[] buildResponse(SaslClient sc) throws SaslException {
    return sc.evaluateChallenge(challenge);
  }
=======
	}

	@Override
	public String toString() {
		return "SASL steps operation";
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    super(CMD, m, ch, s, p, h, c);
  }

  @Override
  protected byte[] buildResponse(SaslClient sc) throws SaslException {
    return sc.evaluateChallenge(challenge);
  }

  @Override
  public String toString() {
    return "SASL steps operation";
  }
}
File
SASLStepOperationImpl.java
Developer's decision
Combination
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
    vbucket = vb;
  }

<<<<<<< HEAD
  public short getVBucket(String k) {
    assert k.equals(key) : (k + " doesn't match the key " + key
        + " for this operation");
    return vbucket;
  }
=======
	public short getVBucket(String k) {
		assert k.equals(key) : (k + " doesn't match the key " + key + " for this operation");
		return vbucket;
	}

	@Override
	public String toString() {
		return super.toString() + " Key: " + key;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    vbucket = vb;
  }

  public short getVBucket(String k) {
    assert k.equals(key) : (k + " doesn't match the key " + key
        + " for this operation");
    return vbucket;
  }

  @Override
  public String toString() {
    return super.toString() + " Key: " + key;
  }
}
File
SingleKeyOperationImpl.java
Developer's decision
Version 2
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
import net.spy.memcached.ops.StoreOperation;
import net.spy.memcached.ops.StoreType;

<<<<<<< HEAD
class StoreOperationImpl extends SingleKeyOperationImpl implements
    StoreOperation, CASOperation {

  private static final int SET = 0x01;
  private static final int ADD = 0x02;
  private static final int REPLACE = 0x03;

  static final int SETQ = 0x11;
  static final int ADDQ = 0x12;
  static final int REPLACEQ = 0x13;

  // 4-byte flags, 4-byte expiration
  static final int EXTRA_LEN = 8;

  private final StoreType storeType;
  private final int flags;
  private final int exp;
  private final long cas;
  private final byte[] data;

  private static int cmdMap(StoreType t) {
    int rv;
    switch (t) {
    case set:
      rv = SET;
      break;
    case add:
      rv = ADD;
      break;
    case replace:
      rv = REPLACE;
      break;
    default:
      rv = -1;
    }
    // Check fall-through.
    assert rv != -1 : "Unhandled store type:  " + t;
    return rv;
  }

  public StoreOperationImpl(StoreType t, String k, int f, int e, byte[] d,
      long c, OperationCallback cb) {
    super(cmdMap(t), generateOpaque(), k, cb);
    flags = f;
    exp = e;
    data = d;
    cas = c;
    storeType = t;
  }

  @Override
  public void initialize() {
    prepareBuffer(key, cas, data, flags, exp);
  }

  public byte[] getBytes() {
    return data;
  }

  public long getCasValue() {
    return cas;
  }

  public int getExpiration() {
    return exp;
  }

  public int getFlags() {
    return flags;
  }

  public byte[] getData() {
    return data;
  }

  public StoreType getStoreType() {
    return storeType;
  }
=======
class StoreOperationImpl extends SingleKeyOperationImpl
	implements StoreOperation, CASOperation {

	private static final int SET=0x01;
	private static final int ADD=0x02;
	private static final int REPLACE=0x03;

	static final int SETQ=0x11;
	static final int ADDQ=0x12;
	static final int REPLACEQ=0x13;

	// 4-byte flags, 4-byte expiration
	static final int EXTRA_LEN = 8;

	private final StoreType storeType;
	private final int flags;
	private final int exp;
	private final long cas;
	private final byte[] data;

	private static int cmdMap(StoreType t) {
		int rv=-1;
		switch(t) {
			case set: rv=SET; break;
			case add: rv=ADD; break;
			case replace: rv=REPLACE; break;
		}
		// Check fall-through.
		assert rv != -1 : "Unhandled store type:  " + t;
		return rv;
	}

	public StoreOperationImpl(StoreType t, String k, int f, int e,
			byte[] d, long c, OperationCallback cb) {
		super(cmdMap(t), generateOpaque(), k, cb);
		flags=f;
		exp=e;
		data=d;
		cas=c;
		storeType=t;
	}

	@Override
	public void initialize() {
		prepareBuffer(key, cas, data, flags, exp);
	}

	public long getCasValue() {
		return cas;
	}

	public int getExpiration() {
		return exp;
	}

	public int getFlags() {
		return flags;
	}

	public byte[] getData() {
		return data;
	}

	public StoreType getStoreType() {
		return storeType;
	}

	@Override
	public String toString() {
		return super.toString() + " Cas: " + cas + " Exp: " + exp + " Flags: "
			+ flags + " Data Length: " + data.length;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
import net.spy.memcached.ops.StoreOperation;
import net.spy.memcached.ops.StoreType;

class StoreOperationImpl extends SingleKeyOperationImpl implements
    StoreOperation, CASOperation {

  private static final int SET = 0x01;
  private static final int ADD = 0x02;
  private static final int REPLACE = 0x03;

  static final int SETQ = 0x11;
  static final int ADDQ = 0x12;
  static final int REPLACEQ = 0x13;

  // 4-byte flags, 4-byte expiration
  static final int EXTRA_LEN = 8;

  private final StoreType storeType;
  private final int flags;
  private final int exp;
  private final long cas;
  private final byte[] data;

  private static int cmdMap(StoreType t) {
    int rv;
    switch (t) {
    case set:
      rv = SET;
      break;
    case add:
      rv = ADD;
      break;
    case replace:
      rv = REPLACE;
      break;
    default:
      rv = -1;
    }
    // Check fall-through.
    assert rv != -1 : "Unhandled store type:  " + t;
    return rv;
  }

  public StoreOperationImpl(StoreType t, String k, int f, int e, byte[] d,
      long c, OperationCallback cb) {
    super(cmdMap(t), generateOpaque(), k, cb);
    flags = f;
    exp = e;
    data = d;
    cas = c;
    storeType = t;
  }

  @Override
  public void initialize() {
    prepareBuffer(key, cas, data, flags, exp);
  }

  public long getCasValue() {
    return cas;
  }

  public int getExpiration() {
    return exp;
  }

  public int getFlags() {
    return flags;
  }

  public byte[] getData() {
    return data;
  }

  public StoreType getStoreType() {
    return storeType;
  }

  @Override
  public String toString() {
    return super.toString() + " Cas: " + cas + " Exp: " + exp + " Flags: "
      + flags + " Data Length: " + data.length;
  }
}
File
StoreOperationImpl.java
Developer's decision
Combination
Kind of conflict
Annotation
Attribute
Class signature
Comment
Method declaration
Chunk
Conflicting content
 * Implementation of a tap ACK operation.
 */
public class TapAckOperationImpl extends TapOperationImpl {
<<<<<<< HEAD
  private final TapOpcode opcode;
  private final int opaque;

  TapAckOperationImpl(TapOpcode opcode, int opaque, OperationCallback cb) {
    super(cb);
    this.opcode = opcode;
    this.opaque = opaque;
  }

  @Override
  public void initialize() {
    RequestMessage message = new RequestMessage();
    message.setMagic(TapMagic.PROTOCOL_BINARY_RES);
    message.setOpcode(opcode);
    message.setOpaque(opaque);
    setBuffer(message.getBytes());
  }

  @Override
  public void readFromBuffer(ByteBuffer data) {
    // Do Nothing
  }

  @Override
  public void streamClosed(OperationState state) {
    transitionState(state);
  }
=======
	private final TapOpcode opcode;
	private final int opaque;

	TapAckOperationImpl(TapOpcode opcode, int opaque, OperationCallback cb) {
		super(cb);
		this.opcode = opcode;
		this.opaque = opaque;
	}

	@Override
	public void initialize() {
		RequestMessage message = new RequestMessage();
		message.setMagic(TapMagic.PROTOCOL_BINARY_RES);
		message.setOpcode(opcode);
		message.setOpaque(opaque);
		setBuffer(message.getBytes());
	}

	@Override
	public void readFromBuffer(ByteBuffer data) {
		// Do Nothing
	}

	@Override
	public void streamClosed(OperationState state) {
		transitionState(state);
	}

	@Override
	public String toString() {
		return "Cmd: tap ack Opcode: " + opcode + " Opaque: " + opaque;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
 * Implementation of a tap ACK operation.
 */
public class TapAckOperationImpl extends TapOperationImpl {
  private final TapOpcode opcode;
  private final int opaque;

  TapAckOperationImpl(TapOpcode opcode, int opaque, OperationCallback cb) {
    super(cb);
    this.opcode = opcode;
    this.opaque = opaque;
  }

  @Override
  public void initialize() {
    RequestMessage message = new RequestMessage();
    message.setMagic(TapMagic.PROTOCOL_BINARY_RES);
    message.setOpcode(opcode);
    message.setOpaque(opaque);
    setBuffer(message.getBytes());
  }

  @Override
  public void readFromBuffer(ByteBuffer data) {
    // Do Nothing
  }

  @Override
  public void streamClosed(OperationState state) {
    transitionState(state);
  }

  @Override
  public String toString() {
    return "Cmd: tap ack Opcode: " + opcode + " Opaque: " + opaque;
  }
}
File
TapAckOperationImpl.java
Developer's decision
Version 2
Kind of conflict
Annotation
Attribute
Method declaration
Chunk
Conflicting content
import net.spy.memcached.tapmessage.TapMagic;
import net.spy.memcached.tapmessage.TapOpcode;

<<<<<<< HEAD
/**
 * Implementation of a tap backfill operation.
 */
public class TapBackfillOperationImpl extends TapOperationImpl implements
    TapOperation {
  private final String id;
  private final long date;

  TapBackfillOperationImpl(String id, long date, OperationCallback cb) {
    super(cb);
    this.id = id;
    this.date = date;
  }

  @Override
  public void initialize() {
    RequestMessage message = new RequestMessage();
    message.setMagic(TapMagic.PROTOCOL_BINARY_REQ);
    message.setOpcode(TapOpcode.REQUEST);
    message.setFlags(TapFlag.BACKFILL);
    message.setFlags(TapFlag.SUPPORT_ACK);
    if (id != null) {
      message.setName(id);
    } else {
      message.setName(UUID.randomUUID().toString());
    }

    message.setBackfill(date);
    setBuffer(message.getBytes());
  }

  @Override
  public void streamClosed(OperationState state) {
    transitionState(state);
  }
=======
public class TapBackfillOperationImpl extends TapOperationImpl implements TapOperation {
	private final String id;
	private final long date;

	TapBackfillOperationImpl(String id, long date, OperationCallback cb) {
		super(cb);
		this.id = id;
		this.date = date;
	}

	@Override
	public void initialize() {
		RequestMessage message = new RequestMessage();
		message.setMagic(TapMagic.PROTOCOL_BINARY_REQ);
		message.setOpcode(TapOpcode.REQUEST);
		message.setFlags(TapFlag.BACKFILL);
		message.setFlags(TapFlag.SUPPORT_ACK);
		if (id != null) {
			message.setName(id);
		} else {
			message.setName(UUID.randomUUID().toString());
		}

		message.setBackfill(date);
		setBuffer(message.getBytes());
	}

	@Override
	public void streamClosed(OperationState state) {
		transitionState(state);
	}

	@Override
	public String toString() {
		return "Cmd: tap dump Flags: backfill,ack";
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
import net.spy.memcached.tapmessage.TapMagic;
import net.spy.memcached.tapmessage.TapOpcode;

/**
 * Implementation of a tap backfill operation.
 */
public class TapBackfillOperationImpl extends TapOperationImpl implements
    TapOperation {
  private final String id;
  private final long date;

  TapBackfillOperationImpl(String id, long date, OperationCallback cb) {
    super(cb);
    this.id = id;
    this.date = date;
  }

  @Override
  public void initialize() {
    RequestMessage message = new RequestMessage();
    message.setMagic(TapMagic.PROTOCOL_BINARY_REQ);
    message.setOpcode(TapOpcode.REQUEST);
    message.setFlags(TapFlag.BACKFILL);
    message.setFlags(TapFlag.SUPPORT_ACK);
    if (id != null) {
      message.setName(id);
    } else {
      message.setName(UUID.randomUUID().toString());
    }
    message.setBackfill(date);
    setBuffer(message.getBytes());
  }

  @Override
  public void streamClosed(OperationState state) {
    transitionState(state);
  }

  @Override
  public String toString() {
    return "Cmd: tap dump Flags: backfill,ack";
  }
}
File
TapBackfillOperationImpl.java
Developer's decision
Combination
Kind of conflict
Annotation
Attribute
Class signature
Comment
Method declaration
Chunk
Conflicting content
    setBuffer(message.getBytes());
  }

<<<<<<< HEAD
  @Override
  public void streamClosed(OperationState state) {
    transitionState(state);
  }
=======
	@Override
	public void streamClosed(OperationState state) {
		transitionState(state);
	}

	@Override
	public String toString() {
		return "Cmd: tap custom";
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    setBuffer(message.getBytes());
  }

  @Override
  public void streamClosed(OperationState state) {
    transitionState(state);
  }

  @Override
  public String toString() {
    return "Cmd: tap custom";
  }
}
File
TapCustomOperationImpl.java
Developer's decision
Version 2
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
    setBuffer(message.getBytes());
  }

<<<<<<< HEAD
  @Override
  public void streamClosed(OperationState state) {
    transitionState(state);
  }
=======
	@Override
	public void streamClosed(OperationState state) {
		transitionState(state);
	}

	@Override
	public String toString() {
		return "Cmd: tap dump Flags: dump,ack";
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    setBuffer(message.getBytes());
  }

  @Override
  public void streamClosed(OperationState state) {
    transitionState(state);
  }

  @Override
  public String toString() {
    return "Cmd: tap dump Flags: dump,ack";
  }
}
File
TapDumpOperationImpl.java
Developer's decision
Version 2
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
import net.spy.memcached.tapmessage.TapOpcode;
import net.spy.memcached.tapmessage.Util;

<<<<<<< HEAD
/**
 * Abstract implementation of a tap operation.
 */
public abstract class TapOperationImpl extends OperationImpl implements
    TapOperation {
  private static final int TAP_FLAG_ACK = 0x1;

  private int bytesProcessed;
  private int bodylen;
  private byte[] header;
  private byte[] message;

  static final int CMD = 0;

  protected TapOperationImpl(OperationCallback cb) {
    super(CMD, generateOpaque(), cb);
    this.header = new byte[BaseMessage.HEADER_LENGTH];
    this.message = null;
  }

  public abstract void initialize();

  @Override
  public void readFromBuffer(ByteBuffer data) throws IOException {
    while (data.remaining() > 0) {
      if (bytesProcessed < BaseMessage.HEADER_LENGTH) {
        header[bytesProcessed] = data.get();
        bytesProcessed++;
      } else {
        if (message == null) {
          bodylen = (int) Util.fieldToValue(header,
            BaseMessage.TOTAL_BODY_INDEX, BaseMessage.TOTAL_BODY_FIELD_LENGTH);
          message = new byte[BaseMessage.HEADER_LENGTH + bodylen];
          System.arraycopy(header, 0, message, 0, BaseMessage.HEADER_LENGTH);
        }

        if (bytesProcessed < message.length) {
          message[bytesProcessed] = data.get();
          bytesProcessed++;
        }
        if (bytesProcessed >= message.length) {
          ResponseMessage response = new ResponseMessage(message);

          if (response.getOpcode() != TapOpcode.OPAQUE
              && response.getOpcode() != TapOpcode.NOOP) {
            if (response.getFlags() == TAP_FLAG_ACK) {
              ((Callback) getCallback()).gotAck(response.getOpcode(),
                  response.getOpaque());
            }
            ((Callback) getCallback()).gotData(response);
          }
          message = null;
          bytesProcessed = 0;
        }
      }
    }
  }
=======
public abstract class TapOperationImpl extends OperationImpl implements TapOperation {
	private static final int TAP_FLAG_ACK = 0x1;

	private int bytesProcessed;
	private int bodylen;
	private byte[] header;
	private byte[] message;

	static final int CMD=0;

	protected TapOperationImpl(OperationCallback cb) {
		super(CMD, generateOpaque(), cb);
		this.header = new byte[BaseMessage.HEADER_LENGTH];
		this.message = null;
	}

	public abstract void initialize();

	@Override
	public void readFromBuffer(ByteBuffer data) throws IOException {
		while (data.remaining() > 0) {
			if (bytesProcessed < BaseMessage.HEADER_LENGTH) {
				header[bytesProcessed] = data.get();
				bytesProcessed++;
			} else {
				if (message == null) {
					bodylen = (int) Util.fieldToValue(header, BaseMessage.TOTAL_BODY_INDEX, BaseMessage.TOTAL_BODY_FIELD_LENGTH);
					message = new byte[BaseMessage.HEADER_LENGTH + bodylen];
					System.arraycopy(header, 0, message, 0, BaseMessage.HEADER_LENGTH);
				}

				if (bytesProcessed < message.length) {
					message[bytesProcessed] = data.get();
					bytesProcessed++;
				}
				if (bytesProcessed >= message.length) {
					ResponseMessage response = new ResponseMessage(message);

					if (response.getFlags() == TAP_FLAG_ACK) {
						((Callback)getCallback()).gotAck(response.getOpcode(), response.getOpaque());
					}
					if (response.getOpcode() != TapOpcode.OPAQUE && response.getOpcode() != TapOpcode.NOOP) {
						((Callback)getCallback()).gotData(response);
					}
					message = null;
					bytesProcessed = 0;
				}
			}
		}
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
import net.spy.memcached.tapmessage.TapOpcode;
import net.spy.memcached.tapmessage.Util;

/**
 * Abstract implementation of a tap operation.
 */
public abstract class TapOperationImpl extends OperationImpl implements
    TapOperation {
  private static final int TAP_FLAG_ACK = 0x1;

  private int bytesProcessed;
  private int bodylen;
  private byte[] header;
  private byte[] message;

  static final int CMD = 0;

  protected TapOperationImpl(OperationCallback cb) {
    super(CMD, generateOpaque(), cb);
    this.header = new byte[BaseMessage.HEADER_LENGTH];
    this.message = null;
  }

  public abstract void initialize();

  @Override
  public void readFromBuffer(ByteBuffer data) throws IOException {
    while (data.remaining() > 0) {
      if (bytesProcessed < BaseMessage.HEADER_LENGTH) {
        header[bytesProcessed] = data.get();
        bytesProcessed++;
      } else {
        if (message == null) {
          bodylen = (int) Util.fieldToValue(header,
            BaseMessage.TOTAL_BODY_INDEX, BaseMessage.TOTAL_BODY_FIELD_LENGTH);
          message = new byte[BaseMessage.HEADER_LENGTH + bodylen];
          System.arraycopy(header, 0, message, 0, BaseMessage.HEADER_LENGTH);
        }

        if (bytesProcessed < message.length) {
          message[bytesProcessed] = data.get();
          bytesProcessed++;
        }
        if (bytesProcessed >= message.length) {
          ResponseMessage response = new ResponseMessage(message);

          if (response.getFlags() == TAP_FLAG_ACK) {
            ((Callback) getCallback()).gotAck(response.getOpcode(),
                response.getOpaque());
          }

          if (response.getOpcode() != TapOpcode.OPAQUE
              && response.getOpcode() != TapOpcode.NOOP) {
            ((Callback)getCallback()).gotData(response);
          }
          message = null;
          bytesProcessed = 0;
        }
      }
    }
  }
}
File
TapOperationImpl.java
Developer's decision
Combination
Kind of conflict
Annotation
Attribute
Class signature
Comment
Method declaration
Method interface
Chunk
Conflicting content
    exp = e;
  }

<<<<<<< HEAD
  @Override
  public void initialize() {
    prepareBuffer(key, 0, EMPTY_BYTES, exp);
  }
=======
	@Override
	public void initialize() {
		prepareBuffer(key, 0, EMPTY_BYTES, exp);
	}

	@Override
	public String toString() {
		return super.toString() + " Exp: " + exp;
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
    exp = e;
  }

  @Override
  public void initialize() {
    prepareBuffer(key, 0, EMPTY_BYTES, exp);
  }

  @Override
  public String toString() {
    return super.toString() + " Exp: " + exp;
  }
}
File
TouchOperationImpl.java
Developer's decision
Version 2
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
 */
public abstract class OperationFactoryTestBase extends MockObjectTestCase {

<<<<<<< HEAD
  public static final String TEST_KEY = "someKey";
  protected OperationFactory ofact = null;
  protected OperationCallback genericCallback;
  private byte[] testData;

  @Override
  protected void setUp() throws Exception {
    super.setUp();
    ofact = getOperationFactory();
    genericCallback = new OperationCallback() {
      public void complete() {
        fail("Unexpected invocation");
      }

      public void receivedStatus(OperationStatus status) {
        fail("Unexpected status:  " + status);
      }
    };

    testData = new byte[64];
    new Random().nextBytes(testData);
  }

  /**
   * Get the operation factory used by the tests.
   */
  protected abstract OperationFactory getOperationFactory();

  public void testDeleteOperationCloning() {
    DeleteOperation op = ofact.delete(TEST_KEY, genericCallback);

    DeleteOperation op2 = cloneOne(DeleteOperation.class, op);
    assertEquals(TEST_KEY, op2.getKeys().iterator().next());
    assertCallback(op2);
  }

  public void testCASOperationCloning() {
    CASOperation op = ofact.cas(StoreType.set, "someKey", 727582, 8174, 7175,
        testData, genericCallback);

    CASOperation op2 = cloneOne(CASOperation.class, op);
    assertKey(op2);
    assertEquals(727582, op2.getCasValue());
    assertEquals(8174, op2.getFlags());
    assertEquals(7175, op2.getExpiration());
    assertBytes(op2.getBytes());
    assertCallback(op2);
  }

  public void testMutatorOperationIncrCloning() {
    int exp = 823862;
    long def = 28775;
    int by = 7735;
    MutatorOperation op = ofact.mutate(Mutator.incr, TEST_KEY, by, def, exp,
        genericCallback);

  public void testStoreOperationSetCloning() {
    MutatorOperation op2 = cloneOne(MutatorOperation.class, op);
    assertKey(op2);
    assertEquals(exp, op2.getExpiration());
    assertEquals(def, op2.getDefault());
    assertEquals(by, op2.getBy());
    assertSame(Mutator.incr, op2.getType());
    assertCallback(op2);
  }

  public void testMutatorOperationDecrCloning() {
    int exp = 823862;
    long def = 28775;
    int by = 7735;
    MutatorOperation op = ofact.mutate(Mutator.decr, TEST_KEY, by, def, exp,
        genericCallback);

    MutatorOperation op2 = cloneOne(MutatorOperation.class, op);
    assertKey(op2);
    assertEquals(exp, op2.getExpiration());
    assertEquals(def, op2.getDefault());
    assertEquals(by, op2.getBy());
    assertSame(Mutator.decr, op2.getType());
    assertCallback(op2);
  }

  public void testStoreOperationAddCloning() {
    int exp = 823862;
    int flags = 7735;
    StoreOperation op = ofact.store(StoreType.add, TEST_KEY, flags, exp,
        testData, genericCallback);

    StoreOperation op2 = cloneOne(StoreOperation.class, op);
    assertKey(op2);
    assertEquals(exp, op2.getExpiration());
    assertEquals(flags, op2.getFlags());
    assertSame(StoreType.add, op2.getStoreType());
    assertCallback(op2);
  }
    int exp = 823862;
    int flags = 7735;
    StoreOperation op = ofact.store(StoreType.set, TEST_KEY, flags, exp,
        testData, genericCallback);

    StoreOperation op2 = cloneOne(StoreOperation.class, op);
    assertKey(op2);
    assertEquals(exp, op2.getExpiration());
    assertEquals(flags, op2.getFlags());
    assertSame(StoreType.set, op2.getStoreType());
    assertCallback(op2);
  }

  public void testConcatenationOperationAppendCloning() {
    long casId = 82757248;
    ConcatenationOperation op = ofact.cat(ConcatenationType.append, casId,
        TEST_KEY, testData, genericCallback);

    ConcatenationOperation op2 = cloneOne(ConcatenationOperation.class, op);
    assertKey(op2);
    assertSame(ConcatenationType.append, op2.getStoreType());
    assertCallback(op2);
  }

  public void testConcatenationOperationPrependCloning() {
    long casId = 82757248;
    ConcatenationOperation op = ofact.cat(ConcatenationType.prepend, casId,
        TEST_KEY, testData, genericCallback);

    ConcatenationOperation op2 = cloneOne(ConcatenationOperation.class, op);
    assertKey(op2);
    assertSame(ConcatenationType.prepend, op2.getStoreType());
    assertCallback(op2);
  }

  public void testSingleGetOperationCloning() {
    GetOperation.Callback callback =
        (GetOperation.Callback) mock(GetOperation.Callback.class).proxy();
    GetOperation op = ofact.get(TEST_KEY, callback);

    GetOperation op2 = cloneOne(GetOperation.class, op);
    assertKey(op2);
    assertSame(callback, op.getCallback());
  }

  public void testSingleGetsOperationCloning() {
    GetsOperation.Callback callback =
        (GetsOperation.Callback) mock(GetsOperation.Callback.class).proxy();
    GetsOperation op = ofact.gets(TEST_KEY, callback);

    GetsOperation op2 = cloneOne(GetsOperation.class, op);
    assertKey(op2);
    assertSame(callback, op.getCallback());
  }

  // These are harder cases as they fan out.
  public void testMultipleGetOperationCloning() {
    Collection<String> keys = Arrays.asList("k1", "k2", "k3");
    GetOperation.Callback callback =
        (GetOperation.Callback) mock(GetOperation.Callback.class).proxy();
    GetOperation op = ofact.get(keys, callback);

    Collection<Operation> ops = ofact.clone(op);
    assertEquals(3, ops.size());

    Collection<String> mutableKeys = new ArrayList<String>(keys);
    int i = 3;
    for (Operation o : ops) {
      assertEquals(i, mutableKeys.size()); // Starting size
      GetOperation go = (GetOperation) o;
      mutableKeys.removeAll(go.getKeys());
      // Verify we matched and removed 1
      assertEquals(--i, mutableKeys.size());
    }
  }

  public void testMultipleGetOperationFanout() {
    Collection<String> keys = Arrays.asList("k1", "k2", "k3");
    Mock m = mock(GetOperation.Callback.class);
    OperationStatus st = new OperationStatus(true, "blah");
    m.expects(once()).method("complete");
    m.expects(once()).method("receivedStatus").with(same(st));
    m.expects(once()).method("gotData").with(eq("k1"), eq(1),
        isA(byte[].class));
    m.expects(once()).method("gotData").with(eq("k2"), eq(2),
        isA(byte[].class));
    m.expects(once()).method("gotData").with(eq("k3"), eq(3),
        isA(byte[].class));

    GetOperation.Callback callback = (GetOperation.Callback) m.proxy();
    GetOperation op = ofact.get(keys, callback);

    // Transition each operation callback into the complete state.
    Iterator<String> ki = keys.iterator();
    int i = 0;
    for (Operation o : ofact.clone(op)) {
      GetOperation.Callback cb = (GetOperation.Callback) o.getCallback();
      cb.gotData(ki.next(), ++i, new byte[3]);
      cb.receivedStatus(st);
      cb.complete();
    }
  }

  protected void assertKey(KeyedOperation op) {
    assertEquals(TEST_KEY, op.getKeys().iterator().next());
  }

  protected void assertCallback(Operation op) {
    assertSame(genericCallback, op.getCallback());
  }

  private void assertBytes(byte[] bytes) {
    assertTrue(Arrays.equals(testData, bytes));
  }

  @SuppressWarnings("unchecked")
  private <T> T assertOne(Class<T> class1, Collection<Operation> ops) {
    assertEquals(1, ops.size());
    Operation op = ops.iterator().next();
    return (T) op;
  }

  protected <T> T cloneOne(Class<T> c, KeyedOperation t) {
    return assertOne(c, ofact.clone(t));
  }
=======
	public final static String TEST_KEY = "someKey";
	protected OperationFactory ofact = null;
	protected OperationCallback genericCallback;
	private byte[] testData;

	@Override
	protected void setUp() throws Exception {
		super.setUp();
		ofact = getOperationFactory();
		genericCallback = new OperationCallback() {
			public void complete() {
				fail("Unexpected invocation");
			}
			public void receivedStatus(OperationStatus status) {
				fail("Unexpected status:  " + status);
			}
		};

		testData = new byte[64];
		new Random().nextBytes(testData);
	}

	/**
	 * Get the operation factory used by the tests.
	 */
	protected abstract OperationFactory getOperationFactory();

	public void testDeleteOperationCloning() {
		DeleteOperation op = ofact.delete(TEST_KEY, genericCallback);

		DeleteOperation op2 = cloneOne(DeleteOperation.class, op);
		assertEquals(TEST_KEY, op2.getKeys().iterator().next());
		assertCallback(op2);
	}

	public void testCASOperationCloning() {
		CASOperation op = ofact.cas(StoreType.set,
			"someKey", 727582, 8174, 7175, testData, genericCallback);

		CASOperation op2 = cloneOne(CASOperation.class, op);
		assertKey(op2);
		assertEquals(727582, op2.getCasValue());
		assertEquals(8174, op2.getFlags());
		assertEquals(7175, op2.getExpiration());
		assertBytes(op2.getData());
		assertCallback(op2);
	}

	public void testMutatorOperationIncrCloning() {
		int exp = 823862;
		long def = 28775;
		int by = 7735;
		MutatorOperation op = ofact.mutate(Mutator.incr, TEST_KEY, by, def,
				exp, genericCallback);

		MutatorOperation op2 = cloneOne(MutatorOperation.class, op);
		assertKey(op2);
		assertEquals(exp, op2.getExpiration());
		assertEquals(def, op2.getDefault());
		assertEquals(by, op2.getBy());
		assertSame(Mutator.incr, op2.getType());
		assertCallback(op2);
	}

	public void testMutatorOperationDecrCloning() {
		int exp = 823862;
		long def = 28775;
		int by = 7735;
		MutatorOperation op = ofact.mutate(Mutator.decr, TEST_KEY, by, def,
				exp, genericCallback);

		MutatorOperation op2 = cloneOne(MutatorOperation.class, op);
		assertKey(op2);
		assertEquals(exp, op2.getExpiration());
		assertEquals(def, op2.getDefault());
		assertEquals(by, op2.getBy());
		assertSame(Mutator.decr, op2.getType());
		assertCallback(op2);
	}

	public void testStoreOperationAddCloning() {
		int exp = 823862;
		int flags = 7735;
		StoreOperation op = ofact.store(StoreType.add, TEST_KEY,
				flags, exp, testData, genericCallback);

		StoreOperation op2 = cloneOne(StoreOperation.class, op);
		assertKey(op2);
		assertEquals(exp, op2.getExpiration());
		assertEquals(flags, op2.getFlags());
		assertSame(StoreType.add, op2.getStoreType());
		assertCallback(op2);
	}

	public void testStoreOperationSetCloning() {
		int exp = 823862;
		int flags = 7735;
		StoreOperation op = ofact.store(StoreType.set, TEST_KEY,
				flags, exp, testData, genericCallback);

		StoreOperation op2 = cloneOne(StoreOperation.class, op);
		assertKey(op2);
		assertEquals(exp, op2.getExpiration());
		assertEquals(flags, op2.getFlags());
		assertSame(StoreType.set, op2.getStoreType());
		assertCallback(op2);
	}

	public void testConcatenationOperationAppendCloning() {
		long casId = 82757248;
		ConcatenationOperation op = ofact.cat(ConcatenationType.append, casId,
			TEST_KEY, testData, genericCallback);

		ConcatenationOperation op2 = cloneOne(
				ConcatenationOperation.class, op);
		assertKey(op2);
		assertSame(ConcatenationType.append, op2.getStoreType());
		assertCallback(op2);
	}

	public void testConcatenationOperationPrependCloning() {
		long casId = 82757248;
		ConcatenationOperation op = ofact.cat(ConcatenationType.prepend, casId,
			TEST_KEY, testData, genericCallback);

		ConcatenationOperation op2 = cloneOne(
				ConcatenationOperation.class, op);
		assertKey(op2);
		assertSame(ConcatenationType.prepend, op2.getStoreType());
		assertCallback(op2);
	}

	public void testSingleGetOperationCloning() {
		GetOperation.Callback callback =
			(GetOperation.Callback)mock(GetOperation.Callback.class).proxy();
		GetOperation op = ofact.get(TEST_KEY, callback);

		GetOperation op2 = cloneOne(GetOperation.class, op);
		assertKey(op2);
		assertSame(callback, op.getCallback());
	}

	public void testSingleGetsOperationCloning() {
		GetsOperation.Callback callback =
			(GetsOperation.Callback)mock(GetsOperation.Callback.class).proxy();
		GetsOperation op = ofact.gets(TEST_KEY, callback);

		GetsOperation op2 = cloneOne(GetsOperation.class, op);
		assertKey(op2);
		assertSame(callback, op.getCallback());
	}

	// These are harder cases as they fan out.
	public void testMultipleGetOperationCloning() {
		Collection<String> keys = Arrays.asList("k1", "k2", "k3");
		GetOperation.Callback callback =
			(GetOperation.Callback)mock(GetOperation.Callback.class).proxy();
		GetOperation op = ofact.get(keys, callback);

		Collection<Operation> ops = ofact.clone(op);
		assertEquals(3, ops.size());

		Collection<String> mutableKeys = new ArrayList<String>(keys);
		int i = 3;
		for(Operation o : ops) {
			assertEquals(i, mutableKeys.size()); // Starting size
			GetOperation go = (GetOperation)o;
			mutableKeys.removeAll(go.getKeys());
			// Verify we matched and removed 1
			assertEquals(--i, mutableKeys.size());
		}
	}

	public void testMultipleGetOperationFanout() {
		Collection<String> keys = Arrays.asList("k1", "k2", "k3");
		Mock m = mock(GetOperation.Callback.class);
		OperationStatus st=new OperationStatus(true, "blah");
		m.expects(once()).method("complete");
		m.expects(once()).method("receivedStatus").with(same(st));
		m.expects(once()).method("gotData")
			.with(eq("k1"), eq(1), isA(byte[].class));
		m.expects(once()).method("gotData")
			.with(eq("k2"), eq(2), isA(byte[].class));
		m.expects(once()).method("gotData")
			.with(eq("k3"), eq(3), isA(byte[].class));

		GetOperation.Callback callback = (GetOperation.Callback)m.proxy();
		GetOperation op = ofact.get(keys, callback);

		// Transition each operation callback into the complete state.
		Iterator<String> ki = keys.iterator();
		int i=0;
		for(Operation o : ofact.clone(op)) {
			GetOperation.Callback cb = (GetOperation.Callback)o.getCallback();
			cb.gotData(ki.next(), ++i, new byte[3]);
			cb.receivedStatus(st);
			cb.complete();
		}
	}

	protected void assertKey(KeyedOperation op) {
		assertEquals(TEST_KEY, op.getKeys().iterator().next());
	}

	protected void assertCallback(Operation op) {
		assertSame(genericCallback, op.getCallback());
	}

	private void assertBytes(byte[] bytes) {
		assertTrue(Arrays.equals(testData, bytes));
	}

	@SuppressWarnings("unchecked")
	private <T> T assertOne(Class<T> class1,
			Collection<Operation> ops) {
		assertEquals(1, ops.size());
		Operation op = ops.iterator().next();
		return (T) op;
	}

	protected <T> T cloneOne(Class<T> c, KeyedOperation t) {
		return assertOne(c, ofact.clone(t));
	}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
 */
public abstract class OperationFactoryTestBase extends MockObjectTestCase {

  public static final String TEST_KEY = "someKey";
  protected OperationFactory ofact = null;
  protected OperationCallback genericCallback;
  private byte[] testData;

  @Override
  protected void setUp() throws Exception {
    super.setUp();
    ofact = getOperationFactory();
    genericCallback = new OperationCallback() {
      public void complete() {
        fail("Unexpected invocation");
      }

      public void receivedStatus(OperationStatus status) {
        fail("Unexpected status:  " + status);
      }
    };

    testData = new byte[64];
    new Random().nextBytes(testData);
  }

  /**
   * Get the operation factory used by the tests.
   */
  protected abstract OperationFactory getOperationFactory();

  public void testDeleteOperationCloning() {
    DeleteOperation op = ofact.delete(TEST_KEY, genericCallback);

    DeleteOperation op2 = cloneOne(DeleteOperation.class, op);
    assertEquals(TEST_KEY, op2.getKeys().iterator().next());
    assertCallback(op2);
  }

  public void testCASOperationCloning() {
    CASOperation op = ofact.cas(StoreType.set, "someKey", 727582, 8174, 7175,
        testData, genericCallback);

    CASOperation op2 = cloneOne(CASOperation.class, op);
    assertKey(op2);
    assertEquals(727582, op2.getCasValue());
    assertEquals(8174, op2.getFlags());
    assertEquals(7175, op2.getExpiration());
    assertBytes(op2.getData());
    assertCallback(op2);
  }

  public void testMutatorOperationIncrCloning() {
    int exp = 823862;
    long def = 28775;
    int by = 7735;
    MutatorOperation op = ofact.mutate(Mutator.incr, TEST_KEY, by, def, exp,
        genericCallback);

    MutatorOperation op2 = cloneOne(MutatorOperation.class, op);
    assertKey(op2);
    assertEquals(exp, op2.getExpiration());
    assertEquals(def, op2.getDefault());
    assertEquals(by, op2.getBy());
    assertSame(Mutator.incr, op2.getType());
    assertCallback(op2);
  }

  public void testMutatorOperationDecrCloning() {
    int exp = 823862;
    long def = 28775;
    int by = 7735;
    MutatorOperation op = ofact.mutate(Mutator.decr, TEST_KEY, by, def, exp,
        genericCallback);

    MutatorOperation op2 = cloneOne(MutatorOperation.class, op);
    assertKey(op2);
    assertEquals(exp, op2.getExpiration());
    assertEquals(def, op2.getDefault());
    assertEquals(by, op2.getBy());
    assertSame(Mutator.decr, op2.getType());
    assertCallback(op2);
  }

  public void testStoreOperationAddCloning() {
    int exp = 823862;
    int flags = 7735;
    StoreOperation op = ofact.store(StoreType.add, TEST_KEY, flags, exp,
        testData, genericCallback);

    StoreOperation op2 = cloneOne(StoreOperation.class, op);
    assertKey(op2);
    assertEquals(exp, op2.getExpiration());
    assertEquals(flags, op2.getFlags());
    assertSame(StoreType.add, op2.getStoreType());
    assertCallback(op2);
  }

  public void testStoreOperationSetCloning() {
    int exp = 823862;
    int flags = 7735;
    StoreOperation op = ofact.store(StoreType.set, TEST_KEY, flags, exp,
        testData, genericCallback);

    StoreOperation op2 = cloneOne(StoreOperation.class, op);
    assertKey(op2);
    assertEquals(exp, op2.getExpiration());
    assertEquals(flags, op2.getFlags());
    assertSame(StoreType.set, op2.getStoreType());
    assertCallback(op2);
  }

  public void testConcatenationOperationAppendCloning() {
    long casId = 82757248;
    ConcatenationOperation op = ofact.cat(ConcatenationType.append, casId,
        TEST_KEY, testData, genericCallback);

    ConcatenationOperation op2 = cloneOne(ConcatenationOperation.class, op);
    assertKey(op2);
    assertSame(ConcatenationType.append, op2.getStoreType());
    assertCallback(op2);
  }

  public void testConcatenationOperationPrependCloning() {
    long casId = 82757248;
    ConcatenationOperation op = ofact.cat(ConcatenationType.prepend, casId,
        TEST_KEY, testData, genericCallback);

    ConcatenationOperation op2 = cloneOne(ConcatenationOperation.class, op);
    assertKey(op2);
    assertSame(ConcatenationType.prepend, op2.getStoreType());
    assertCallback(op2);
  }

  public void testSingleGetOperationCloning() {
    GetOperation.Callback callback =
        (GetOperation.Callback) mock(GetOperation.Callback.class).proxy();
    GetOperation op = ofact.get(TEST_KEY, callback);

    GetOperation op2 = cloneOne(GetOperation.class, op);
    assertKey(op2);
    assertSame(callback, op.getCallback());
  }

  public void testSingleGetsOperationCloning() {
    GetsOperation.Callback callback =
        (GetsOperation.Callback) mock(GetsOperation.Callback.class).proxy();
    GetsOperation op = ofact.gets(TEST_KEY, callback);

    GetsOperation op2 = cloneOne(GetsOperation.class, op);
    assertKey(op2);
    assertSame(callback, op.getCallback());
  }

  // These are harder cases as they fan out.
  public void testMultipleGetOperationCloning() {
    Collection<String> keys = Arrays.asList("k1", "k2", "k3");
    GetOperation.Callback callback =
        (GetOperation.Callback) mock(GetOperation.Callback.class).proxy();
    GetOperation op = ofact.get(keys, callback);

    Collection<Operation> ops = ofact.clone(op);
    assertEquals(3, ops.size());

    Collection<String> mutableKeys = new ArrayList<String>(keys);
    int i = 3;
    for (Operation o : ops) {
      assertEquals(i, mutableKeys.size()); // Starting size
      GetOperation go = (GetOperation) o;
      mutableKeys.removeAll(go.getKeys());
      // Verify we matched and removed 1
      assertEquals(--i, mutableKeys.size());
    }
  }

  public void testMultipleGetOperationFanout() {
    Collection<String> keys = Arrays.asList("k1", "k2", "k3");
    Mock m = mock(GetOperation.Callback.class);
    OperationStatus st = new OperationStatus(true, "blah");
    m.expects(once()).method("complete");
    m.expects(once()).method("receivedStatus").with(same(st));
    m.expects(once()).method("gotData").with(eq("k1"), eq(1),
        isA(byte[].class));
    m.expects(once()).method("gotData").with(eq("k2"), eq(2),
        isA(byte[].class));
    m.expects(once()).method("gotData").with(eq("k3"), eq(3),
        isA(byte[].class));

    GetOperation.Callback callback = (GetOperation.Callback) m.proxy();
    GetOperation op = ofact.get(keys, callback);

    // Transition each operation callback into the complete state.
    Iterator<String> ki = keys.iterator();
    int i = 0;
    for (Operation o : ofact.clone(op)) {
      GetOperation.Callback cb = (GetOperation.Callback) o.getCallback();
      cb.gotData(ki.next(), ++i, new byte[3]);
      cb.receivedStatus(st);
      cb.complete();
    }
  }

  protected void assertKey(KeyedOperation op) {
    assertEquals(TEST_KEY, op.getKeys().iterator().next());
  }

  protected void assertCallback(Operation op) {
    assertSame(genericCallback, op.getCallback());
  }

  private void assertBytes(byte[] bytes) {
    assertTrue(Arrays.equals(testData, bytes));
  }

  @SuppressWarnings("unchecked")
  private <T> T assertOne(Class<T> class1, Collection<Operation> ops) {
    assertEquals(1, ops.size());
    Operation op = ops.iterator().next();
    return (T) op;
  }

  protected <T> T cloneOne(Class<T> c, KeyedOperation t) {
    return assertOne(c, ofact.clone(t));
  }
}
File
OperationFactoryTestBase.java
Developer's decision
Combination
Kind of conflict
Annotation
Attribute
Comment
Method declaration
Method interface