getOperationTimeout());
}
<<<<<<< HEAD
@Override
public OperationFactory getOperationFactory() {
return new BinaryOperationFactory();
}
=======
@Override
protected String getName() {
return "BinaryConnectionFactory";
}
>>>>>>> bb1dc926948b439e528c846e5e902346ec7f3acd
}
Solution content
getOperationTimeout());
}
@Override
public OperationFactory getOperationFactory() {
return new BinaryOperationFactory();
}
@Override
protected String getName() {
return "BinaryConnectionFactory";
}
}
File
BinaryConnectionFactory.java
Developer's decision
Concatenation
Kind of conflict
Annotation
Method declaration
Chunk
Conflicting content
*/
*
*
*/
<<<<<<< HEAD
public class DefaultConnectionFactory extends SpyObject implements
ConnectionFactory {
/**
* Default failure mode.
*/
public static final FailureMode DEFAULT_FAILURE_MODE =
FailureMode.Redistribute;
/**
* Default hash algorithm.
*/
public static final HashAlgorithm DEFAULT_HASH =
DefaultHashAlgorithm.NATIVE_HASH;
/**
* Maximum length of the operation queue returned by this connection factory.
*/
public static final int DEFAULT_OP_QUEUE_LEN = 16384;
/**
* The maximum time to block waiting for op queue operations to complete, in
* milliseconds. The default has been set with the expectation that most
* requests are interactive and waiting for more than a few seconds is thus
* more undesirable than failing the request.
*/
public static final long DEFAULT_OP_QUEUE_MAX_BLOCK_TIME =
TimeUnit.SECONDS.toMillis(10);
/**
* The read buffer size for each server connection from this factory.
*/
public static final int DEFAULT_READ_BUFFER_SIZE = 16384;
/**
* Default operation timeout in milliseconds.
*/
public static final long DEFAULT_OPERATION_TIMEOUT = 2500;
/**
* Maximum amount of time (in seconds) to wait between reconnect attempts.
*/
public static final long DEFAULT_MAX_RECONNECT_DELAY = 30;
/**
* Maximum number + 2 of timeout exception for shutdown connection.
*/
public static final int DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD = 998;
protected final int opQueueLen;
private final int readBufSize;
private final HashAlgorithm hashAlg;
/**
* Construct a DefaultConnectionFactory with the given parameters.
*
* @param qLen the queue length.
* @param bufSize the buffer size
* @param hash the algorithm to use for hashing
*/
public DefaultConnectionFactory(int qLen, int bufSize, HashAlgorithm hash) {
super();
opQueueLen = qLen;
readBufSize = bufSize;
hashAlg = hash;
}
/**
* Create a DefaultConnectionFactory with the given maximum operation queue
* length, and the given read buffer size.
*/
public DefaultConnectionFactory(int qLen, int bufSize) {
this(qLen, bufSize, DEFAULT_HASH);
}
/**
* Create a DefaultConnectionFactory with the default parameters.
*/
public DefaultConnectionFactory() {
this(DEFAULT_OP_QUEUE_LEN, DEFAULT_READ_BUFFER_SIZE);
}
/**
 * Build the node implementation that matches the protocol of the operation
 * factory returned by {@link #getOperationFactory()}.
 *
 * @param sa the address of the server the node represents
 * @param c the channel the node communicates over
 * @param bufSize the buffer size handed to the node
 * @return an ASCII or binary node, depending on the operation factory type
 * @throws IllegalStateException if the operation factory is neither an
 *           AsciiOperationFactory nor a BinaryOperationFactory
 */
public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
    int bufSize) {
  final OperationFactory factory = getOperationFactory();

  if (factory instanceof AsciiOperationFactory) {
    return new AsciiMemcachedNodeImpl(sa, c, bufSize,
        createReadOperationQueue(),
        createWriteOperationQueue(),
        createOperationQueue(),
        getOpQueueMaxBlockTime(),
        getOperationTimeout());
  }

  if (factory instanceof BinaryOperationFactory) {
    // Resolved before the queues are created, as in the original ordering.
    final boolean authRequired = getAuthDescriptor() != null;
    return new BinaryMemcachedNodeImpl(sa, c, bufSize,
        createReadOperationQueue(),
        createWriteOperationQueue(),
        createOperationQueue(),
        getOpQueueMaxBlockTime(),
        authRequired,
        getOperationTimeout());
  }

  throw new IllegalStateException("Unhandled operation factory type "
      + factory);
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#createConnection(java.util.List)
*/
public MemcachedConnection createConnection(List addrs)
throws IOException {
return new MemcachedConnection(getReadBufSize(), this, addrs,
getInitialObservers(), getFailureMode(), getOperationFactory());
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getFailureMode()
*/
public FailureMode getFailureMode() {
return DEFAULT_FAILURE_MODE;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#createOperationQueue()
*/
public BlockingQueue createOperationQueue() {
return new ArrayBlockingQueue(getOpQueueLen());
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#createReadOperationQueue()
*/
public BlockingQueue createReadOperationQueue() {
return new LinkedBlockingQueue();
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#createWriteOperationQueue()
*/
public BlockingQueue createWriteOperationQueue() {
return new LinkedBlockingQueue();
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#createLocator(java.util.List)
*/
public NodeLocator createLocator(List nodes) {
return new ArrayModNodeLocator(nodes, getHashAlg());
}
/**
* Get the op queue length set at construct time.
*/
public int getOpQueueLen() {
return opQueueLen;
}
/**
* Get the maximum time to block waiting for op queue operations to complete.
* This implementation always returns DEFAULT_OP_QUEUE_MAX_BLOCK_TIME; the
* return type is a primitive long, so the result is never null.
*
* @return the maximum time to block waiting for op queue operations to
* complete, in milliseconds
*/
public long getOpQueueMaxBlockTime() {
return DEFAULT_OP_QUEUE_MAX_BLOCK_TIME;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getReadBufSize()
*/
public int getReadBufSize() {
return readBufSize;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getHashAlg()
*/
public HashAlgorithm getHashAlg() {
return hashAlg;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getOperationFactory()
*/
public OperationFactory getOperationFactory() {
return new AsciiOperationFactory();
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getOperationTimeout()
*/
public long getOperationTimeout() {
return DEFAULT_OPERATION_TIMEOUT;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#isDaemon()
*/
public boolean isDaemon() {
return false;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getInitialObservers()
*/
public Collection getInitialObservers() {
return Collections.emptyList();
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getDefaultTranscoder()
*/
public Transcoder
Solution content
}
*
*
*
*/
public class DefaultConnectionFactory extends SpyObject implements
ConnectionFactory {
/**
* Default failure mode.
*/
public static final FailureMode DEFAULT_FAILURE_MODE =
FailureMode.Redistribute;
/**
* Default hash algorithm.
*/
public static final HashAlgorithm DEFAULT_HASH =
DefaultHashAlgorithm.NATIVE_HASH;
/**
* Maximum length of the operation queue returned by this connection factory.
*/
public static final int DEFAULT_OP_QUEUE_LEN = 16384;
/**
* The maximum time to block waiting for op queue operations to complete, in
* milliseconds. The default has been set with the expectation that most
* requests are interactive and waiting for more than a few seconds is thus
* more undesirable than failing the request.
*/
public static final long DEFAULT_OP_QUEUE_MAX_BLOCK_TIME =
TimeUnit.SECONDS.toMillis(10);
/**
* The read buffer size for each server connection from this factory.
*/
public static final int DEFAULT_READ_BUFFER_SIZE = 16384;
/**
return true;
}
* Default operation timeout in milliseconds.
*/
public static final long DEFAULT_OPERATION_TIMEOUT = 2500;
/**
* Maximum amount of time (in seconds) to wait between reconnect attempts.
*/
public static final long DEFAULT_MAX_RECONNECT_DELAY = 30;
/**
* Maximum number + 2 of timeout exception for shutdown connection.
*/
public static final int DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD = 998;
protected final int opQueueLen;
private final int readBufSize;
private final HashAlgorithm hashAlg;
/**
* Construct a DefaultConnectionFactory with the given parameters.
*
* @param qLen the queue length.
* @param bufSize the buffer size
* @param hash the algorithm to use for hashing
*/
public DefaultConnectionFactory(int qLen, int bufSize, HashAlgorithm hash) {
super();
opQueueLen = qLen;
readBufSize = bufSize;
hashAlg = hash;
}
/**
* Create a DefaultConnectionFactory with the given maximum operation queue
* length, and the given read buffer size.
*/
public DefaultConnectionFactory(int qLen, int bufSize) {
this(qLen, bufSize, DEFAULT_HASH);
}
/**
* Create a DefaultConnectionFactory with the default parameters.
*/
public DefaultConnectionFactory() {
this(DEFAULT_OP_QUEUE_LEN, DEFAULT_READ_BUFFER_SIZE);
}
public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
int bufSize) {
OperationFactory of = getOperationFactory();
if (of instanceof AsciiOperationFactory) {
return new AsciiMemcachedNodeImpl(sa, c, bufSize,
createReadOperationQueue(),
createWriteOperationQueue(),
createOperationQueue(),
getOpQueueMaxBlockTime(),
getOperationTimeout());
} else if (of instanceof BinaryOperationFactory) {
boolean doAuth = false;
if (getAuthDescriptor() != null) {
doAuth = true;
return new BinaryMemcachedNodeImpl(sa, c, bufSize,
createReadOperationQueue(),
createWriteOperationQueue(),
createOperationQueue(),
getOpQueueMaxBlockTime(),
doAuth,
getOperationTimeout());
} else {
throw new IllegalStateException("Unhandled operation factory type " + of);
}
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#createConnection(java.util.List)
*/
public MemcachedConnection createConnection(List addrs)
throws IOException {
return new MemcachedConnection(getReadBufSize(), this, addrs,
getInitialObservers(), getFailureMode(), getOperationFactory());
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getFailureMode()
*/
public FailureMode getFailureMode() {
return DEFAULT_FAILURE_MODE;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#createOperationQueue()
*/
public BlockingQueue createOperationQueue() {
return new ArrayBlockingQueue(getOpQueueLen());
}
/*
* (non-Javadoc)
* @see net.spy.memcached.ConnectionFactory#createReadOperationQueue()
*/
public BlockingQueue createReadOperationQueue() {
return new LinkedBlockingQueue();
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#createWriteOperationQueue()
*/
public BlockingQueue createWriteOperationQueue() {
return new LinkedBlockingQueue();
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#createLocator(java.util.List)
*/
public NodeLocator createLocator(List nodes) {
return new ArrayModNodeLocator(nodes, getHashAlg());
}
/**
* Get the op queue length set at construct time.
*/
public int getOpQueueLen() {
return opQueueLen;
}
/**
* @return the maximum time to block waiting for op queue operations to
* complete, in milliseconds, or null for no waiting.
*/
public long getOpQueueMaxBlockTime() {
return DEFAULT_OP_QUEUE_MAX_BLOCK_TIME;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getReadBufSize()
*/
public int getReadBufSize() {
return readBufSize;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getHashAlg()
*/
public HashAlgorithm getHashAlg() {
return hashAlg;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getOperationFactory()
*/
public OperationFactory getOperationFactory() {
return new AsciiOperationFactory();
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getOperationTimeout()
*/
public long getOperationTimeout() {
return DEFAULT_OPERATION_TIMEOUT;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#isDaemon()
*/
public boolean isDaemon() {
return false;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getInitialObservers()
*/
public Collection getInitialObservers() {
return Collections.emptyList();
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getDefaultTranscoder()
*/
public Transcoder getDefaultTranscoder() {
return new SerializingTranscoder();
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#useNagleAlgorithm()
*/
public boolean useNagleAlgorithm() {
return false;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#shouldOptimize()
*/
public boolean shouldOptimize() {
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getMaxReconnectDelay()
*/
public long getMaxReconnectDelay() {
return DEFAULT_MAX_RECONNECT_DELAY;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getAuthDescriptor()
*/
public AuthDescriptor getAuthDescriptor() {
return null;
}
/*
* (non-Javadoc)
*
* @see net.spy.memcached.ConnectionFactory#getTimeoutExceptionThreshold()
*/
public int getTimeoutExceptionThreshold() {
return DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD;
}
protected String getName() {
return "DefaultConnectionFactory";
}
@Override
public String toString() {
return "Failure Mode: " + getFailureMode().name() + ", Hash Algorithm: "
+ ((DefaultHashAlgorithm)getHashAlg()).name() + " Max Reconnect Delay: "
+ getMaxReconnectDelay() + ", Max Op Timeout: " + getOperationTimeout()
+ ", Op Queue Length: " + getOpQueueLen() + ", Op Max Queue Block Time"
+ getOpQueueMaxBlockTime() + ", Max Timeout Exception Threshold: "
+ getTimeoutExceptionThreshold() + ", Read Buffer Size: "
+ getReadBufSize() + ", Transcoder: " + getDefaultTranscoder()
+ ", Operation Factory: " + getOperationFactory() + " isDaemon: "
+ isDaemon() + ", Optimized: " + shouldOptimize() + ", Using Nagle: "
+ useNagleAlgorithm() + ", ConnectionFactory: " + getName();
}
}
File
DefaultConnectionFactory.java
Developer's decision
Manual
Kind of conflict
Annotation
Attribute
Class signature
Comment
Method declaration
Method invocation
Chunk
Conflicting content
* }
*
*/
<<<<<<< HEAD
public class MemcachedClient extends SpyObject implements MemcachedClientIF,
ConnectionObserver {
protected volatile boolean shuttingDown = false;
protected final long operationTimeout;
protected final MemcachedConnection mconn;
protected final OperationFactory opFact;
protected final Transcoder transcoder;
protected final TranscodeService tcService;
protected final AuthDescriptor authDescriptor;
protected final ConnectionFactory connFactory;
protected final AuthThreadMonitor authMonitor = new AuthThreadMonitor();
/**
* Get a memcache client operating on the specified memcached locations.
*
* @param ia the memcached locations
* @throws IOException if connections cannot be established
*/
public MemcachedClient(InetSocketAddress... ia) throws IOException {
this(new DefaultConnectionFactory(), Arrays.asList(ia));
}
/**
* Get a memcache client over the specified memcached locations.
*
* @param addrs the socket addrs
* @throws IOException if connections cannot be established
*/
public MemcachedClient(List addrs) throws IOException {
this(new DefaultConnectionFactory(), addrs);
}
/**
* Get a memcache client over the specified memcached locations.
*
* All arguments are validated before any field is assigned or any connection
* is created.
*
* @param cf the connection factory to configure connections for this client
* @param addrs the socket addresses
* @throws IOException if connections cannot be established
* @throws NullPointerException if cf or addrs is null
* @throws IllegalArgumentException if addrs is empty or the factory's
* operation timeout is not positive
*/
public MemcachedClient(ConnectionFactory cf, List addrs)
throws IOException {
if (cf == null) {
throw new NullPointerException("Connection factory required");
}
if (addrs == null) {
throw new NullPointerException("Server list required");
}
if (addrs.isEmpty()) {
throw new IllegalArgumentException("You must have at least one server to"
+ " connect to");
}
if (cf.getOperationTimeout() <= 0) {
throw new IllegalArgumentException("Operation timeout must be positive.");
}
// Everything below is taken from the connection factory.
connFactory = cf;
tcService = new TranscodeService(cf.isDaemon());
transcoder = cf.getDefaultTranscoder();
opFact = cf.getOperationFactory();
assert opFact != null : "Connection factory failed to make op factory";
mconn = cf.createConnection(addrs);
assert mconn != null : "Connection factory failed to make a connection";
operationTimeout = cf.getOperationTimeout();
authDescriptor = cf.getAuthDescriptor();
// This client is registered via addObserver only when the factory
// supplies an auth descriptor.
if (authDescriptor != null) {
addObserver(this);
}
}
/**
* Get the addresses of available servers.
*
*
* This is based on a snapshot in time so shouldn't be considered completely
* accurate, but is a useful for getting a feel for what's working and what's
* not working.
*
*
* @return point-in-time view of currently available servers
*/
public Collection getAvailableServers() {
  // Collect the socket address of every node that reports itself active.
  final ArrayList activeAddrs = new ArrayList();
  for (MemcachedNode candidate : mconn.getLocator().getAll()) {
    if (!candidate.isActive()) {
      continue;
    }
    activeAddrs.add(candidate.getSocketAddress());
  }
  return activeAddrs;
}
/**
 * Get the addresses of unavailable servers, i.e. of every node in the
 * locator that does not currently report itself active.
 *
 * This is based on a snapshot in time so shouldn't be considered completely
 * accurate, but is useful for getting a feel for what's working and what's
 * not working.
 *
 * @return point-in-time view of currently unavailable servers
 */
public Collection getUnavailableServers() {
  final ArrayList inactiveAddrs = new ArrayList();
  for (MemcachedNode candidate : mconn.getLocator().getAll()) {
    if (candidate.isActive()) {
      continue;
    }
    inactiveAddrs.add(candidate.getSocketAddress());
  }
  return inactiveAddrs;
}
/**
*
* Get a read-only wrapper around the node locator wrapping this instance.
*
* @return this instance's NodeLocator
*/
public NodeLocator getNodeLocator() {
return mconn.getLocator().getReadonlyCopy();
}
/**
* Get the default transcoder that's in use.
*
* @return this instance's Transcoder
*/
public Transcoder getTranscoder() {
return transcoder;
}
/**
 * Reject keys that cannot be used: keys whose byte form (as produced by
 * KeyUtil.getKeyBytes) is longer than MAX_KEY_LENGTH, is empty, or contains
 * a space, newline, carriage return or NUL byte.
 *
 * @param key the key to check
 * @throws IllegalArgumentException if the key violates any of the rules
 */
private void validateKey(String key) {
  final byte[] raw = KeyUtil.getKeyBytes(key);
  final int length = raw.length;

  if (length > MAX_KEY_LENGTH) {
    throw new IllegalArgumentException("Key is too long (maxlen = "
        + MAX_KEY_LENGTH + ")");
  }
  if (length == 0) {
    throw new IllegalArgumentException(
        "Key must contain at least one character.");
  }

  // Scan for bytes that are not allowed in a key.
  for (int i = 0; i < length; i++) {
    final byte current = raw[i];
    final boolean forbidden =
        current == ' ' || current == '\n' || current == '\r' || current == 0;
    if (forbidden) {
      throw new IllegalArgumentException(
          "Key contains invalid characters: ``" + key + "''");
    }
  }
}
/**
* (internal use) Validate the key and add a raw operation for it to the
* connection. This method is exposed for testing.
*
* @param key the key the operation applies to; checked with validateKey
* @param op the operation to perform
* @return the Operation
* @throws IllegalArgumentException if the key fails validation
*/
Operation addOp(final String key, final Operation op) {
validateKey(key);
// NOTE(review): checkState() is defined on the connection; presumably it
// rejects use of a connection that is no longer usable - confirm.
mconn.checkState();
mconn.addOperation(key, op);
return op;
}
CountDownLatch broadcastOp(final BroadcastOpFactory of) {
return broadcastOp(of, mconn.getLocator().getAll(), true);
}
CountDownLatch broadcastOp(final BroadcastOpFactory of,
Collection nodes) {
return broadcastOp(of, nodes, true);
}
/**
 * Broadcast an operation to the given nodes through the connection.
 *
 * @param of factory producing the operation to broadcast
 * @param nodes the nodes to broadcast to
 * @param checkShuttingDown when true, refuse to broadcast once this client
 *          is shutting down
 * @return the latch returned by the connection's broadcastOperation call
 * @throws IllegalStateException if the shutdown check is requested and the
 *           client is shutting down
 */
private CountDownLatch broadcastOp(BroadcastOpFactory of,
    Collection nodes, boolean checkShuttingDown) {
  final boolean refused = checkShuttingDown && shuttingDown;
  if (!refused) {
    return mconn.broadcastOperation(of, nodes);
  }
  throw new IllegalStateException("Shutting down");
}
/**
* Encode the value with the given transcoder, build a store operation of the
* requested type from the operation factory, and queue it via addOp.
*
* @param storeType the kind of store to perform
* @param key the key to store under
* @param exp the expiration passed through to the operation factory
* @param value the value to encode and store
* @param tc the transcoder used to encode the value
* @return a future whose value is set from the received status and whose
* latch is released when the operation completes
*/
private OperationFuture asyncStore(StoreType storeType,
String key, int exp, T value, Transcoder tc) {
CachedData co = tc.encode(value);
// Single-count latch: released by the callback's complete().
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture rv =
new OperationFuture(key, latch, operationTimeout);
Operation op = opFact.store(storeType, key, co.getFlags(), exp,
co.getData(), new OperationCallback() {
public void receivedStatus(OperationStatus val) {
// The future's value is the status' success flag.
rv.set(val.isSuccess(), val);
}
public void complete() {
latch.countDown();
}
});
// The future is given its operation before the operation is queued.
rv.setOperation(op);
addOp(key, op);
return rv;
}
private OperationFuture asyncStore(StoreType storeType, String key,
int exp, Object value) {
return asyncStore(storeType, key, exp, value, transcoder);
}
/**
* Encode the value with the given transcoder, build a concatenation
* operation (append/prepend, per catType) from the operation factory, and
* queue it via addOp. Only the encoded data is passed to the factory; the
* encoded flags are not used here.
*
* @param catType the kind of concatenation to perform
* @param cas the cas identifier passed through to the operation factory
* @param key the key whose value is concatenated to
* @param value the value to encode and concatenate
* @param tc the transcoder used to encode the value
* @return a future whose value is set from the received status and whose
* latch is released when the operation completes
*/
private OperationFuture asyncCat(ConcatenationType catType,
long cas, String key, T value, Transcoder tc) {
CachedData co = tc.encode(value);
// Single-count latch: released by the callback's complete().
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture rv = new OperationFuture(key,
latch, operationTimeout);
Operation op = opFact.cat(catType, cas, key, co.getData(),
new OperationCallback() {
public void receivedStatus(OperationStatus val) {
// The future's value is the status' success flag.
rv.set(val.isSuccess(), val);
}
public void complete() {
latch.countDown();
}
});
// The future is given its operation before the operation is queued.
rv.setOperation(op);
addOp(key, op);
return rv;
}
/**
* Touch the given key to reset its expiration time with the default
* transcoder.
*
* @param key the key to fetch
* @param exp the new expiration to set for the given key
* @return a future that will hold the return value of whether or not the
* fetch succeeded
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public OperationFuture touch(final String key, final int exp) {
return touch(key, exp, transcoder);
}
/**
* Touch the given key to reset its expiration time.
*
* @param key the key to touch
* @param exp the new expiration to set for the given key
* @param tc the transcoder to serialize and unserialize value; not used by
* this method's body
* @return a future that will hold the return value of whether or not the
* touch succeeded
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public OperationFuture touch(final String key, final int exp,
final Transcoder tc) {
// Single-count latch: released by the callback's complete().
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture rv =
new OperationFuture(key, latch, operationTimeout);
Operation op = opFact.touch(key, exp, new OperationCallback() {
public void receivedStatus(OperationStatus status) {
// The future's value is the status' success flag.
rv.set(status.isSuccess(), status);
}
public void complete() {
latch.countDown();
}
});
// The future is given its operation before the operation is queued.
rv.setOperation(op);
addOp(key, op);
return rv;
}
/**
* Append to an existing value in the cache.
*
*
* Note that the return will be false any time a mutation has not occurred.
*
*
* @param cas cas identifier (ignored in the ascii protocol)
* @param key the key to whose value will be appended
* @param val the value to append
* @return a future indicating success, false if there was no change to the
* value
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public OperationFuture append(long cas, String key, Object val) {
return append(cas, key, val, transcoder);
}
/**
* Append to an existing value in the cache.
*
*
* Note that the return will be false any time a mutation has not occurred.
*
*
* @param
* @param cas cas identifier (ignored in the ascii protocol)
* @param key the key to whose value will be appended
* @param val the value to append
* @param tc the transcoder to serialize and unserialize the value
* @return a future indicating success
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public OperationFuture append(long cas, String key, T val,
Transcoder tc) {
return asyncCat(ConcatenationType.append, cas, key, val, tc);
}
/**
* Prepend to an existing value in the cache.
*
*
* Note that the return will be false any time a mutation has not occurred.
*
*
* @param cas cas identifier (ignored in the ascii protocol)
* @param key the key to whose value will be prepended
* @param val the value to append
* @return a future indicating success
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public OperationFuture prepend(long cas, String key, Object val) {
return prepend(cas, key, val, transcoder);
}
/**
* Prepend to an existing value in the cache.
*
*
* Note that the return will be false any time a mutation has not occurred.
*
*
* @param
* @param cas cas identifier (ignored in the ascii protocol)
* @param key the key to whose value will be prepended
* @param val the value to append
* @param tc the transcoder to serialize and unserialize the value
* @return a future indicating success
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public OperationFuture prepend(long cas, String key, T val,
Transcoder tc) {
return asyncCat(ConcatenationType.prepend, cas, key, val, tc);
}
/**
* Asynchronous CAS operation.
*
* @param
* @param key the key
* @param casId the CAS identifier (from a gets operation)
* @param value the new value
* @param tc the transcoder to serialize and unserialize the value
* @return a future that will indicate the status of the CAS
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public Future asyncCAS(String key, long casId, T value,
Transcoder tc) {
return asyncCAS(key, casId, 0, value, tc);
}
/**
* Asynchronous CAS operation.
*
* @param
* @param key the key
* @param casId the CAS identifier (from a gets operation)
* @param exp the expiration of this object
* @param value the new value
* @param tc the transcoder to serialize and unserialize the value
* @return a future that will indicate the status of the CAS
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
// Encodes the value, builds a CAS operation (always StoreType.set) from the
// operation factory and queues it via addOp. The returned future's latch is
// released when the operation completes.
public Future asyncCAS(String key, long casId, int exp,
T value, Transcoder tc) {
CachedData co = tc.encode(value);
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture rv =
new OperationFuture(key, latch, operationTimeout);
Operation op = opFact.cas(StoreType.set, key, casId, co.getFlags(), exp,
co.getData(), new OperationCallback() {
public void receivedStatus(OperationStatus val) {
if (val instanceof CASOperationStatus) {
// Normal case: expose the CAS response carried by the status.
rv.set(((CASOperationStatus) val).getCASResponse(), val);
} else if (val instanceof CancelledOperationStatus) {
// Cancellation is only logged; rv.set is not called on this path.
getLogger().debug("CAS operation cancelled");
} else {
// Any other status type is treated as a programming error.
throw new RuntimeException("Unhandled state: " + val);
}
}
public void complete() {
latch.countDown();
}
});
// The future is given its operation before the operation is queued.
rv.setOperation(op);
addOp(key, op);
return rv;
}
/**
* Asynchronous CAS operation using the default transcoder.
*
* @param key the key
* @param casId the CAS identifier (from a gets operation)
* @param value the new value
* @return a future that will indicate the status of the CAS
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public Future asyncCAS(String key, long casId, Object value) {
return asyncCAS(key, casId, value, transcoder);
}
/**
* Perform a synchronous CAS operation.
*
* @param
* @param key the key
* @param casId the CAS identifier (from a gets operation)
* @param value the new value
* @param tc the transcoder to serialize and unserialize the value
* @return a CASResponse
* @throws OperationTimeoutException if global operation timeout is exceeded
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public CASResponse cas(String key, long casId, T value,
Transcoder tc) {
return cas(key, casId, 0, value, tc);
}
/**
 * Perform a synchronous CAS operation by issuing the asynchronous CAS and
 * waiting up to the client's operation timeout for its result.
 *
 * @param key the key
 * @param casId the CAS identifier (from a gets operation)
 * @param exp the expiration of this object
 * @param value the new value
 * @param tc the transcoder to serialize and unserialize the value
 * @return a CASResponse
 * @throws OperationTimeoutException if global operation timeout is exceeded
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 * @throws RuntimeException if the wait is interrupted (the thread's
 *           interrupt status is restored first) or the operation fails
 */
public CASResponse cas(String key, long casId, int exp, T value,
    Transcoder tc) {
  try {
    return asyncCAS(key, casId, exp, value, tc).get(operationTimeout,
        TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    // Catching InterruptedException clears the interrupt flag; set it again
    // so callers further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted waiting for value", e);
  } catch (ExecutionException e) {
    throw new RuntimeException("Exception waiting for value", e);
  } catch (TimeoutException e) {
    throw new OperationTimeoutException("Timeout waiting for value", e);
  }
}
/**
* Perform a synchronous CAS operation with the default transcoder.
*
* @param key the key
* @param casId the CAS identifier (from a gets operation)
* @param value the new value
* @return a CASResponse
* @throws OperationTimeoutException if the global operation timeout is
* exceeded
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public CASResponse cas(String key, long casId, Object value) {
return cas(key, casId, value, transcoder);
}
/**
* Add an object to the cache iff it does not exist already.
*
*
* The exp value is passed along to memcached exactly as given,
* and will be processed per the memcached protocol specification:
*
*
*
* Note that the return will be false any time a mutation has not occurred.
*
*
*
*
* The actual value sent may either be Unix time (number of seconds since
* January 1, 1970, as a 32-bit value), or a number of seconds starting from
* current time. In the latter case, this number of seconds may not exceed
* 60*60*24*30 (number of seconds in 30 days); if the number sent by a client
* is larger than that, the server will consider it to be real Unix time value
* rather than an offset from current time.
*
*
*
* @param
*
* @param key the key under which this object should be added.
* @param exp the expiration of this object
* @param o the object to store
* @param tc the transcoder to serialize and unserialize the value
* @return a future representing the processing of this operation
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public OperationFuture add(String key, int exp, T o,
Transcoder tc) {
return asyncStore(StoreType.add, key, exp, o, tc);
}
/**
* Add an object to the cache (using the default transcoder) iff it does not
* exist already.
*
*
* The exp value is passed along to memcached exactly as given,
* and will be processed per the memcached protocol specification:
*
*
*
* Note that the return will be false any time a mutation has not occurred.
*
*
*
*
* The actual value sent may either be Unix time (number of seconds since
* January 1, 1970, as a 32-bit value), or a number of seconds starting from
* current time. In the latter case, this number of seconds may not exceed
* 60*60*24*30 (number of seconds in 30 days); if the number sent by a client
* is larger than that, the server will consider it to be real Unix time value
* rather than an offset from current time.
*
*
*
* @param key the key under which this object should be added.
* @param exp the expiration of this object
* @param o the object to store
* @return a future representing the processing of this operation
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public OperationFuture add(String key, int exp, Object o) {
return asyncStore(StoreType.add, key, exp, o, transcoder);
}
/**
* Set an object in the cache regardless of any existing value.
*
*
* The exp value is passed along to memcached exactly as given,
* and will be processed per the memcached protocol specification:
*
*
*
* Note that the return will be false any time a mutation has not occurred.
*
*
*
*
* The actual value sent may either be Unix time (number of seconds since
* January 1, 1970, as a 32-bit value), or a number of seconds starting from
* current time. In the latter case, this number of seconds may not exceed
* 60*60*24*30 (number of seconds in 30 days); if the number sent by a client
* is larger than that, the server will consider it to be real Unix time value
* rather than an offset from current time.
*
*
*
* @param
* @param key the key under which this object should be added.
* @param exp the expiration of this object
* @param o the object to store
* @param tc the transcoder to serialize and unserialize the value
* @return a future representing the processing of this operation
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public OperationFuture set(String key, int exp, T o,
Transcoder tc) {
return asyncStore(StoreType.set, key, exp, o, tc);
}
/**
 * Set an object in the cache (using the default transcoder) regardless of any
 * existing value.
 *
 * <p>
 * The {@code exp} value is passed along to memcached exactly as given, and
 * will be processed per the memcached protocol specification:
 * </p>
 *
 * <blockquote>
 * <p>
 * The actual value sent may either be Unix time (number of seconds since
 * January 1, 1970, as a 32-bit value), or a number of seconds starting from
 * current time. In the latter case, this number of seconds may not exceed
 * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client
 * is larger than that, the server will consider it to be real Unix time value
 * rather than an offset from current time.
 * </p>
 * </blockquote>
 *
 * <p>
 * Note that the return will be false any time a mutation has not occurred.
 * </p>
 *
 * @param key the key under which this object should be stored.
 * @param exp the expiration of this object
 * @param o the object to store
 * @return a future representing the processing of this operation
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 */
public OperationFuture<Boolean> set(String key, int exp, Object o) {
  // Delegates to the shared store path with the client's default transcoder.
  return asyncStore(StoreType.set, key, exp, o, transcoder);
}
/**
 * Replace an object with the given value iff there is already a value for the
 * given key.
 *
 * <p>
 * The {@code exp} value is passed along to memcached exactly as given, and
 * will be processed per the memcached protocol specification:
 * </p>
 *
 * <blockquote>
 * <p>
 * The actual value sent may either be Unix time (number of seconds since
 * January 1, 1970, as a 32-bit value), or a number of seconds starting from
 * current time. In the latter case, this number of seconds may not exceed
 * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client
 * is larger than that, the server will consider it to be real Unix time value
 * rather than an offset from current time.
 * </p>
 * </blockquote>
 *
 * <p>
 * Note that the return will be false any time a mutation has not occurred.
 * </p>
 *
 * @param <T> the type of the object to store
 * @param key the key whose existing value should be replaced.
 * @param exp the expiration of this object
 * @param o the object to store
 * @param tc the transcoder to serialize and unserialize the value
 * @return a future representing the processing of this operation
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 */
public <T> OperationFuture<Boolean> replace(String key, int exp, T o,
    Transcoder<T> tc) {
  // Delegates to the shared store path using StoreType.replace.
  return asyncStore(StoreType.replace, key, exp, o, tc);
}
/**
 * Replace an object with the given value (transcoded with the default
 * transcoder) iff there is already a value for the given key.
 *
 * <p>
 * The {@code exp} value is passed along to memcached exactly as given, and
 * will be processed per the memcached protocol specification:
 * </p>
 *
 * <blockquote>
 * <p>
 * The actual value sent may either be Unix time (number of seconds since
 * January 1, 1970, as a 32-bit value), or a number of seconds starting from
 * current time. In the latter case, this number of seconds may not exceed
 * 60*60*24*30 (number of seconds in 30 days); if the number sent by a client
 * is larger than that, the server will consider it to be real Unix time value
 * rather than an offset from current time.
 * </p>
 * </blockquote>
 *
 * <p>
 * Note that the return will be false any time a mutation has not occurred.
 * </p>
 *
 * @param key the key whose existing value should be replaced.
 * @param exp the expiration of this object
 * @param o the object to store
 * @return a future representing the processing of this operation
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 */
public OperationFuture<Boolean> replace(String key, int exp, Object o) {
  // Delegates to the shared store path with the client's default transcoder.
  return asyncStore(StoreType.replace, key, exp, o, transcoder);
}
/**
 * Get the given key asynchronously.
 *
 * @param <T> the type the transcoder decodes the cached value into
 * @param key the key to fetch
 * @param tc the transcoder to serialize and unserialize value
 * @return a future that will hold the return value of the fetch
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 */
public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {
  final CountDownLatch latch = new CountDownLatch(1);
  final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key);
  Operation op = opFact.get(key, new GetOperation.Callback() {
    // Remains null if gotData() is never invoked for this operation.
    private Future<T> val = null;

    public void receivedStatus(OperationStatus status) {
      // Hand whatever was decoded (possibly nothing) to the caller's future.
      rv.set(val, status);
    }

    public void gotData(String k, int flags, byte[] data) {
      assert key.equals(k) : "Wrong key returned";
      // Decoding is handed to tcService, which yields a Future for the value.
      val =
          tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));
    }

    public void complete() {
      latch.countDown();
    }
  });
  rv.setOperation(op);
  addOp(key, op);
  return rv;
}
/**
 * Get the given key asynchronously and decode with the default transcoder.
 *
 * @param key the key to fetch
 * @return a future that will hold the return value of the fetch
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 */
public GetFuture<Object> asyncGet(final String key) {
  // Same as the two-argument overload, using the client's default transcoder.
  return asyncGet(key, transcoder);
}
/**
* Gets (with CAS support) the given key asynchronously.
*
* @param
* @param key the key to fetch
* @param tc the transcoder to serialize and unserialize value
* @return a future that will hold the return value of the fetch
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public OperationFuture> asyncGets(final String key,
final Transcoder tc) {
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture> rv =
new OperationFuture>(key, latch, operationTimeout);
Operation op = opFact.gets(key, new GetsOperation.Callback() {
private CASValue val = null;
public void receivedStatus(OperationStatus status) {
rv.set(val, status);
}
public void gotData(String k, int flags, long cas, byte[] data) {
assert key.equals(k) : "Wrong key returned";
assert cas > 0 : "CAS was less than zero: " + cas;
val =
new CASValue(cas, tc.decode(new CachedData(flags, data,
tc.getMaxSize())));
}
public void complete() {
latch.countDown();
}
});
rv.setOperation(op);
addOp(key, op);
return rv;
}
/**
* Gets (with CAS support) the given key asynchronously and decode using the
* default transcoder.
*
* @param key the key to fetch
* @return a future that will hold the return value of the fetch
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public OperationFuture> asyncGets(final String key) {
return asyncGets(key, transcoder);
}
/**
 * Gets (with CAS support) with a single key.
 *
 * @param <T> the type the transcoder decodes the cached value into
 * @param key the key to get
 * @param tc the transcoder to serialize and unserialize value
 * @return the result from the cache and CAS id (null if there is none)
 * @throws OperationTimeoutException if global operation timeout is exceeded
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 * @throws RuntimeException if the calling thread is interrupted while
 *           waiting, or if the underlying operation fails
 */
public <T> CASValue<T> gets(String key, Transcoder<T> tc) {
  try {
    return asyncGets(key, tc).get(operationTimeout, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    // Re-assert the interrupt so code further up the stack can still see it
    // after the checked exception is translated to an unchecked one.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted waiting for value", e);
  } catch (ExecutionException e) {
    throw new RuntimeException("Exception waiting for value", e);
  } catch (TimeoutException e) {
    throw new OperationTimeoutException("Timeout waiting for value", e);
  }
}
/**
 * Get with a single key and reset its expiration.
 *
 * @param <T> the type the transcoder decodes the cached value into
 * @param key the key to get
 * @param exp the new expiration for the key
 * @param tc the transcoder to serialize and unserialize value
 * @return the result from the cache and CAS id (null if there is none)
 * @throws OperationTimeoutException if the global operation timeout is
 *           exceeded
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 * @throws RuntimeException if the calling thread is interrupted while
 *           waiting, or if the underlying operation fails
 */
public <T> CASValue<T> getAndTouch(String key, int exp, Transcoder<T> tc) {
  try {
    return asyncGetAndTouch(key, exp, tc).get(operationTimeout,
        TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    // Re-assert the interrupt so code further up the stack can still see it
    // after the checked exception is translated to an unchecked one.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted waiting for value", e);
  } catch (ExecutionException e) {
    throw new RuntimeException("Exception waiting for value", e);
  } catch (TimeoutException e) {
    throw new OperationTimeoutException("Timeout waiting for value", e);
  }
}
/**
 * Get a single key and reset its expiration using the default transcoder.
 *
 * @param key the key to get
 * @param exp the new expiration for the key
 * @return the result from the cache and CAS id (null if there is none)
 * @throws OperationTimeoutException if the global operation timeout is
 *           exceeded
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 */
public CASValue<Object> getAndTouch(String key, int exp) {
  // Same as the three-argument overload, using the default transcoder.
  return getAndTouch(key, exp, transcoder);
}
/**
 * Gets (with CAS support) with a single key using the default transcoder.
 *
 * @param key the key to get
 * @return the result from the cache and CAS id (null if there is none)
 * @throws OperationTimeoutException if the global operation timeout is
 *           exceeded
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 */
public CASValue<Object> gets(String key) {
  // Same as the two-argument overload, using the default transcoder.
  return gets(key, transcoder);
}
/**
 * Get with a single key.
 *
 * @param <T> the type the transcoder decodes the cached value into
 * @param key the key to get
 * @param tc the transcoder to serialize and unserialize value
 * @return the result from the cache (null if there is none)
 * @throws OperationTimeoutException if the global operation timeout is
 *           exceeded
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 * @throws RuntimeException if the calling thread is interrupted while
 *           waiting, or if the underlying operation fails
 */
public <T> T get(String key, Transcoder<T> tc) {
  try {
    return asyncGet(key, tc).get(operationTimeout, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    // Re-assert the interrupt so code further up the stack can still see it
    // after the checked exception is translated to an unchecked one.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted waiting for value", e);
  } catch (ExecutionException e) {
    throw new RuntimeException("Exception waiting for value", e);
  } catch (TimeoutException e) {
    throw new OperationTimeoutException("Timeout waiting for value", e);
  }
}
/**
 * Fetch the value stored under a single key, decoding it with the default
 * transcoder.
 *
 * @param key the key to get
 * @return the result from the cache (null if there is none)
 * @throws OperationTimeoutException if the global operation timeout is
 *           exceeded
 * @throws IllegalStateException in the rare circumstance where queue is too
 *           full to accept any more requests
 */
public Object get(String key) {
  // Defer to the transcoder-aware overload with the client's default.
  final Object cached = get(key, transcoder);
  return cached;
}
/**
* Asynchronously get a bunch of objects from the cache.
*
* @param
* @param keys the keys to request
* @param tcIter an iterator of transcoders to serialize and unserialize
* values; the transcoders are matched with the keys in the same
* order. The minimum of the key collection length and number of
* transcoders is used and no exception is thrown if they do not
* match
* @return a Future result of that fetch
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
public BulkFuture