| Chunk |
|---|
| Conflicting content |
|---|
*/
public ClientCommand(ByteBuffer input) {
commandType = CommandType.values()[input.getInt()];
<<<<<<< HEAD
// Discard the next int, size of request.
input.getInt();
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
request = ClientRequest.create(input);
}
|
| Solution content |
|---|
*/
public ClientCommand(ByteBuffer input) {
commandType = CommandType.values()[input.getInt()];
request = ClientRequest.create(input);
}
|
| File |
|---|
| ClientCommand.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
*
* @see Reply
*/
<<<<<<< HEAD:src/lsr/common/ClientRequest.java
public final class ClientRequest implements Serializable {
=======
public final class ClientRequest implements Serializable, RequestType {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/common/ClientRequest.java
/*
* The Request class should be final. The custome deserialization does not
* respect class hierarchy, so any class derived from request would be |
| Solution content |
|---|
*
* @see Reply
*/
public final class ClientRequest implements Serializable, RequestType {
/*
* The Request class should be final. The custome deserialization does not
* respect class hierarchy, so any class derived from request would be |
| File |
|---|
| ClientRequest.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Class signature |
| Chunk |
|---|
| Conflicting content |
|---|
if (obj == null || obj.getClass() != this.getClass()) {
return false;
}
<<<<<<< HEAD:src/lsr/common/ClientRequest.java
ClientRequest request = (ClientRequest) obj;
=======
ClientRequest request = (ClientRequest) obj;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/common/ClientRequest.java
if (requestId.equals(request.requestId)) {
assert Arrays.equals(value, request.value) : "Critical: identical RequestID, different value";
return true; |
| Solution content |
|---|
if (obj == null || obj.getClass() != this.getClass()) {
return false;
}
ClientRequest request = (ClientRequest) obj;
if (requestId.equals(request.requestId)) {
assert Arrays.equals(value, request.value) : "Critical: identical RequestID, different value";
return true; |
| File |
|---|
| ClientRequest.java |
| Developer's decision |
|---|
| Version 1 |
| Kind of conflict |
|---|
| Cast expression |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
public interface Dispatcher {
<<<<<<< HEAD
=======
public static interface Task {
/**
* Attempts to cancel execution of this task. This attempt will fail if
* the task has already completed, has already been canceled, or could
* not be canceled for some other reason. If successful, and this task
* has not started when cancel is called, this task should never run.
*
* Subsequent calls to isCancelled() will always return true if this
* method was called.
*/
void cancel();
/**
* Returns the remaining delay associated with this task, in
* milliseconds.
*
* @return the remaining delay; zero or negative values indicate that
* the delay has already elapsed
*/
long getDelay();
/**
* Returns true if the |
| Solution content |
|---|
public interface Dispatcher {
public static interface Task {
/**
* Attempts to cancel execution of this task. This attempt will fail if
* the task has already completed, has already been canceled, or could
* not be canceled for some other reason. If successful, and this task
* has not started when cancel is called, this task should never run.
*
* Subsequent calls to isCancelled() will always return true if this
* method was called.
*/
void cancel();
/**
* Returns the remaining delay associated with this task, in
* milliseconds.
*
* @return the remaining delay; zero or negative values indicate that
* the delay has already elapsed
*/
long getDelay();
/**
* Returns true if the |
| File |
|---|
| Dispatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Interface declaration |
| Chunk |
|---|
| Conflicting content |
|---|
*
* @return a PriorityTask representing pending completion of the task.
*/
<<<<<<< HEAD
PriorityTask dispatch(Runnable task);
=======
Task dispatch(Runnable task);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
/**
* Creates and executes one-shot action that becomes enabled after the given |
| Solution content |
|---|
*
* @return a PriorityTask representing pending completion of the task.
*/
Task dispatch(Runnable task);
/**
* Creates and executes one-shot action that becomes enabled after the given |
| File |
|---|
| Dispatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method interface |
| Chunk |
|---|
| Conflicting content |
|---|
/** If a TCP connection fails, how much to wait for another try */
public static final String TCP_RECONNECT_TIMEOUT = "TcpReconnectMilisecs";
public static final long DEFAULT_TCP_RECONNECT_TIMEOUT = 1000;
<<<<<<< HEAD
=======
/** ??? Corresponds to a ethernet frame */
public final static String FORWARD_MAX_BATCH_SIZE = "replica.ForwardMaxBatchSize";
public final static int DEFAULT_FORWARD_MAX_BATCH_SIZE = 1450;
/** ??? In milliseconds */
public final static String FORWARD_MAX_BATCH_DELAY = "replica.ForwardMaxBatchDelay";
public final static int DEFAULT_FORWARD_MAX_BATCH_DELAY = 50;
/** How many selector threads to use */
public static final String SELECTOR_THREADS = "replica.SelectorThreads";
public static final int DEFAULT_SELECTOR_THREADS = -1;
/**
* Size of a buffer for reading client requests; larger requests than this
* size will cause extra memory allocation and freeing at each such request.
* This variable impacts memory usage, as each client connection
* pre-allocates such buffer.
*/
public static final String CLIENT_REQUEST_BUFFER_SIZE = "replica.ClientRequestBufferSize";
public static final int DEFAULT_CLIENT_REQUEST_BUFFER_SIZE = 8 * 1024 + ClientCommand.HEADERS_SIZE;
/**
* How long can the proposer / catch-up wait for batch values during view
* change / catching up, in milliseconds
*/
private static final String MAX_BATCH_FETCHING_TIME_MS = "TimeoutFetchBatchValue";
private static final int DEFAULT_MAX_BATCH_FETCHING_TIME_MS = 2500;
private static final String MULTICAST_PORT = "MulticastPort";
private static final int DEFAULT_MULTICAST_PORT = 3000;
private static final String MULTICAST_IP_ADDRESS = "MulticastIpAddress";
private static final String DEFAULT_MULTICAST_IP_ADDRESS = "224.0.0.144";
private static final String MTU = "NetworkMtuSize";
private static final int DEFAULT_MTU = 1492;
private static final String INDIRECT_CONSENSUS = "IndirectConsensus";
private static final boolean DEFAULT_INDIRECT_CONSENSUS = false;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
/*
* Exposing fields is generally not good practice, but here they are made |
| Solution content |
|---|
/** If a TCP connection fails, how much to wait for another try */
public static final String TCP_RECONNECT_TIMEOUT = "TcpReconnectMilisecs";
public static final long DEFAULT_TCP_RECONNECT_TIMEOUT = 1000;
/** ??? Corresponds to a ethernet frame */
public final static String FORWARD_MAX_BATCH_SIZE = "replica.ForwardMaxBatchSize";
public final static int DEFAULT_FORWARD_MAX_BATCH_SIZE = 1450;
/** ??? In milliseconds */
public final static String FORWARD_MAX_BATCH_DELAY = "replica.ForwardMaxBatchDelay";
public final static int DEFAULT_FORWARD_MAX_BATCH_DELAY = 50;
/** How many selector threads to use */
public static final String SELECTOR_THREADS = "replica.SelectorThreads";
public static final int DEFAULT_SELECTOR_THREADS = -1;
/**
* Size of a buffer for reading client requests; larger requests than this
* size will cause extra memory allocation and freeing at each such request.
* This variable impacts memory usage, as each client connection
* pre-allocates such buffer.
*/
public static final String CLIENT_REQUEST_BUFFER_SIZE = "replica.ClientRequestBufferSize";
public static final int DEFAULT_CLIENT_REQUEST_BUFFER_SIZE = 8 * 1024 + ClientCommand.HEADERS_SIZE;
/**
* How long can the proposer / catch-up wait for batch values during view
* change / catching up, in milliseconds
*/
private static final String MAX_BATCH_FETCHING_TIME_MS = "TimeoutFetchBatchValue";
private static final int DEFAULT_MAX_BATCH_FETCHING_TIME_MS = 2500;
private static final String MULTICAST_PORT = "MulticastPort";
private static final int DEFAULT_MULTICAST_PORT = 3000;
private static final String MULTICAST_IP_ADDRESS = "MulticastIpAddress";
private static final String DEFAULT_MULTICAST_IP_ADDRESS = "224.0.0.144";
private static final String MTU = "NetworkMtuSize";
private static final int DEFAULT_MTU = 1492;
private static final String INDIRECT_CONSENSUS = "IndirectConsensus";
private static final boolean DEFAULT_INDIRECT_CONSENSUS = false;
/*
* Exposing fields is generally not good practice, but here they are made |
| File |
|---|
| ProcessDescriptor.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
public final int fdSuspectTimeout;
public final int fdSendTimeout;
<<<<<<< HEAD
/*
* Singleton class with static access. This allows any class on the JVM to
* statically access the process descriptor without needing to be given a
* reference.
*/
public static ProcessDescriptor processDescriptor;
public static void initialize(Configuration config, int localId) {
ProcessDescriptor.processDescriptor = new ProcessDescriptor(config, localId);
}
public static ProcessDescriptor getInstance() {
return processDescriptor;
=======
public final int forwardBatchMaxSize;
public final int forwardBatchMaxDelay;
public final int selectorThreadCount;
public final int clientRequestBufferSize;
/** ⌊(n+1)/2⌋ */
public final int majority;
public final long maxBatchFetchingTimeoutMs;
public final int multicastPort;
public final String multicastIpAddress;
public final int mtu;
public final boolean indirectConsensus;
/**
* The singleton instance of process descriptor. Must be initialized before
* use.
*/
public static ProcessDescriptor processDescriptor = null;
public static void initialize(Configuration config, int localId) {
ProcessDescriptor.processDescriptor = new ProcessDescriptor(config, localId);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
private ProcessDescriptor(Configuration config, int localId) { |
| Solution content |
|---|
public final int fdSuspectTimeout;
public final int fdSendTimeout;
public final int forwardBatchMaxSize;
public final int forwardBatchMaxDelay;
public final int selectorThreadCount;
public final int clientRequestBufferSize;
/** ⌊(n+1)/2⌋ */
public final int majority;
public final long maxBatchFetchingTimeoutMs;
public final int multicastPort;
public final String multicastIpAddress;
public final int mtu;
public final boolean indirectConsensus;
/**
* The singleton instance of process descriptor. Must be initialized before
* use.
*/
public static ProcessDescriptor processDescriptor = null;
public static void initialize(Configuration config, int localId) {
ProcessDescriptor.processDescriptor = new ProcessDescriptor(config, localId);
}
private ProcessDescriptor(Configuration config, int localId) { |
| File |
|---|
| ProcessDescriptor.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| Method declaration |
| Method invocation |
| Method signature |
| Return statement |
| Chunk |
|---|
| Conflicting content |
|---|
}
this.crashModel = crashModel;
<<<<<<< HEAD
this.firstSnapshotSizeEstimate = config.getIntProperty(
FIRST_SNAPSHOT_SIZE_ESTIMATE,
DEFAULT_FIRST_SNAPSHOT_SIZE_ESTIMATE);
this.snapshotMinLogSize = Math.max(1, config.getIntProperty(SNAPSHOT_MIN_LOG_SIZE,
DEFAULT_SNAPSHOT_MIN_LOG_SIZE));
this.snapshotAskRatio = config.getDoubleProperty(SNAPSHOT_ASK_RATIO,
DEFAULT_SNAPSHOT_ASK_RATIO);
this.snapshotForceRatio = config.getDoubleProperty(SNAPSHOT_FORCE_RATIO,
DEFAULT_SNAPSHOT_FORCE_RATIO);
this.minSnapshotSampling = config.getIntProperty(MIN_SNAPSHOT_SAMPLING,
DEFAULT_MIN_SNAPSHOT_SAMPLING);
this.retransmitTimeout = config.getLongProperty(RETRANSMIT_TIMEOUT,
DEFAULT_RETRANSMIT_TIMEOUT);
this.periodicCatchupTimeout = config.getLongProperty(PERIODIC_CATCHUP_TIMEOUT,
DEFAULT_PERIODIC_CATCHUP_TIMEOUT);
this.tcpReconnectTimeout = config.getLongProperty(TCP_RECONNECT_TIMEOUT,
DEFAULT_TCP_RECONNECT_TIMEOUT);
this.fdSuspectTimeout = config.getIntProperty(FD_SUSPECT_TO,
DEFAULT_FD_SUSPECT_TO);
this.fdSendTimeout = config.getIntProperty(FD_SEND_TO,
DEFAULT_FD_SEND_TO);
logger.config(config.toString());
logger.config("Configuration: " + WINDOW_SIZE + "=" + windowSize + ", " +
BATCH_SIZE + "=" + batchingLevel + ", " + MAX_BATCH_DELAY +
"=" + maxBatchDelay + ", " + MAX_UDP_PACKET_SIZE + "=" +
maxUdpPacketSize + ", " + NETWORK + "=" + network + ", " +
MAY_SHARE_SNAPSHOTS + "=" + mayShareSnapshots + ", " +
BENCHMARK_RUN_REPLICA + "=" + benchmarkRunReplica + ", " +
CLIENT_ID_GENERATOR + "=" + clientIDGenerator);
=======
majority = (numReplicas + 1) / 2;
printProcessDescriptor(config, crashModel);
}
private void printProcessDescriptor(Configuration config, CrashModel crashModel) {
logger.config(config.toString());
logger.config("Configuration: " + WINDOW_SIZE + "=" + windowSize + ", " +
BATCH_SIZE + "=" + batchingLevel + ", " + MAX_BATCH_DELAY +
"=" + maxBatchDelay + ", " + MAX_UDP_PACKET_SIZE + "=" +
maxUdpPacketSize + ", " + NETWORK + "=" + network + ", " +
CLIENT_ID_GENERATOR + "=" + clientIDGenerator);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
logger.config("Failure Detection: " + FD_SEND_TO + "=" + fdSendTimeout + ", " +
FD_SUSPECT_TO + "=" + fdSuspectTimeout);
logger.config("Crash model: " + crashModel + ", LogPath: " + logPath); |
| Solution content |
|---|
}
this.crashModel = crashModel;
majority = (numReplicas + 1) / 2;
printProcessDescriptor(config, crashModel);
}
private void printProcessDescriptor(Configuration config, CrashModel crashModel) {
logger.config(config.toString());
logger.config("Configuration: " + WINDOW_SIZE + "=" + windowSize + ", " +
BATCH_SIZE + "=" + batchingLevel + ", " + MAX_BATCH_DELAY +
"=" + maxBatchDelay + ", " + MAX_UDP_PACKET_SIZE + "=" +
maxUdpPacketSize + ", " + NETWORK + "=" + network + ", " +
CLIENT_ID_GENERATOR + "=" + clientIDGenerator);
logger.config("Failure Detection: " + FD_SEND_TO + "=" + fdSendTimeout + ", " +
FD_SUSPECT_TO + "=" + fdSuspectTimeout);
logger.config("Crash model: " + crashModel + ", LogPath: " + logPath); |
| File |
|---|
| ProcessDescriptor.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
* The reply to client request. It is send to client when replica execute this
* command on state machine.
*
<<<<<<< HEAD
* @see ClientRequest
=======
* @see ClientBatch
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
*/
public class Reply implements Serializable {
private static final long serialVersionUID = 1L; |
| Solution content |
|---|
* The reply to client request. It is send to client when replica execute this
* command on state machine.
*
* @see ClientBatch
*/
public class Reply implements Serializable {
private static final long serialVersionUID = 1L; |
| File |
|---|
| Reply.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
public final class SelectorThread extends Thread {
private final Selector selector;
<<<<<<< HEAD
=======
/** lock for tasks object; tasks cannot be used, as it is recreated often */
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
private final Object taskLock = new Object();
/** list of active tasks waiting for execution in selector thread */ |
| Solution content |
|---|
public final class SelectorThread extends Thread {
private final Selector selector;
/** lock for tasks object; tasks cannot be used, as it is recreated often */
private final Object taskLock = new Object();
/** list of active tasks waiting for execution in selector thread */ |
| File |
|---|
| SelectorThread.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
*/
public SelectorThread(int i) throws IOException {
super("ClientIO-" + i);
<<<<<<< HEAD
=======
setDaemon(true);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
setDefaultUncaughtExceptionHandler(new KillOnExceptionHandler());
selector = Selector.open();
} |
| Solution content |
|---|
*/
public SelectorThread(int i) throws IOException {
super("ClientIO-" + i);
setDaemon(true);
setDefaultUncaughtExceptionHandler(new KillOnExceptionHandler());
selector = Selector.open();
} |
| File |
|---|
| SelectorThread.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
runScheduleTasks();
try {
<<<<<<< HEAD
// TODO: JK: measure if the hell it is really needed to stop every
// 10ms and if wakeup is a bad idea
// Check the scheduleTasks queue at least once every 10ms
// In some cases, this might require skipping a call to select
// in some iteration, if handling the previous iteration took
// more than 10ms
=======
// FIXME: JK investigate if it is better to poll the tasks or to
// use event-driven approach. Now polling is made, the previous
// comment say it's better
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
int selectedCount = selector.select(10);
if (selectedCount > 0) { |
| Solution content |
|---|
runScheduleTasks();
try {
// FIXME: JK investigate if it is better to poll the tasks or to
// use event-driven approach. Now polling is made, the previous
// comment say it's better
int selectedCount = selector.select(10);
if (selectedCount > 0) { |
| File |
|---|
| SelectorThread.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
synchronized (taskLock) {
tasks.add(task);
// Do not wakeup the Selector thread by calling selector.wakeup().
<<<<<<< HEAD
// Doing so generates too much contention on the selector internal
// lock.
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
// Instead, the selector will periodically poll the array with tasks
// // selector.wakeup();
} |
| Solution content |
|---|
synchronized (taskLock) {
tasks.add(task);
// Do not wakeup the Selector thread by calling selector.wakeup().
// Instead, the selector will periodically poll the array with tasks
// // selector.wakeup();
} |
| File |
|---|
| SelectorThread.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
package lsr.paxos; <<<<<<< HEAD ======= import static lsr.common.ProcessDescriptor.processDescriptor; >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; |
| Solution content |
|---|
package lsr.paxos; import static lsr.common.ProcessDescriptor.processDescriptor; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
import java.util.logging.Level;
import java.util.logging.Logger;
<<<<<<< HEAD
import lsr.common.Dispatcher;
import lsr.common.ProcessDescriptor;
import lsr.common.ClientRequest;
import lsr.common.RequestId;
import lsr.paxos.core.Paxos;
import lsr.paxos.core.Proposer;
import lsr.paxos.statistics.QueueMonitor;
/**
* Thread responsible to receive and queue client requests and to prepare
* batches for proposing.
*
* The Selector thread calls the {@link #enqueueClientRequest(ClientRequest)}
=======
import lsr.common.MovingAverage;
import lsr.common.RequestType;
import lsr.common.SingleThreadDispatcher;
import lsr.paxos.core.Paxos;
import lsr.paxos.core.ProposerImpl;
import lsr.paxos.replica.ClientBatchID;
import lsr.paxos.replica.ClientRequestBatcher;
import lsr.paxos.replica.DecideCallback;
/**
* This batcher is only active on the leader replica. It receives requests or
* batch IDs, packs them and passes to Paxos for voting.
*/
/**
* Thread responsible to receive and queue client requests and to prepare
* batches for proposing.
*
* The Selector thread calls the {@link #enqueueClientRequest(ClientBatch)}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
* method when it reads a new request. This method places the request in an
* internal queue. An internal thread, called Batcher, reads from this queue and
* packs the request in batches, respecting the size limits, the maximum batch |
| Solution content |
|---|
import java.util.logging.Level;
import java.util.logging.Logger;
import lsr.common.MovingAverage;
import lsr.common.RequestType;
import lsr.common.SingleThreadDispatcher;
import lsr.paxos.core.Paxos;
import lsr.paxos.core.ProposerImpl;
import lsr.paxos.replica.ClientBatchID;
import lsr.paxos.replica.ClientRequestBatcher;
import lsr.paxos.replica.DecideCallback;
/**
* This batcher is only active on the leader replica. It receives requests or
* batch IDs, packs them and passes to Paxos for voting.
*/
/**
* Thread responsible to receive and queue client requests and to prepare
* batches for proposing.
*
* The Selector thread calls the {@link #enqueueClientRequest(ClientBatch)}
* method when it reads a new request. This method places the request in an
* internal queue. An internal thread, called Batcher, reads from this queue and
* packs the request in batches, respecting the size limits, the maximum batch |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
* delay and the number of available slots on the send window. When a new batch * is ready, the Batcher thread submits to the Dispatcher thread a task that * will initiate the proposal of the new batch. <<<<<<< HEAD * * A new batch is created only when there are slot available on the send window. * This ensures that in case of overload, the requests that cannot be served * immediately are kept on the internal queue and that the task queue of the * Dispatcher thread is not overloaded with batches that cannot be proposed * right away. * ======= * * A new batch is created only when there are slot available on the send window. * This ensures that in case of overload, the requests that cannot be served * immediately are kept on the internal queue and that the task queue of the * Dispatcher thread is not overloaded with batches that cannot be proposed * right away. * >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 * The internal request queue is also used to throttle the clients, by blocking * the selector thread whenever the request queue is full. * |
| Solution content |
|---|
* delay and the number of available slots on the send window. When a new batch * is ready, the Batcher thread submits to the Dispatcher thread a task that * will initiate the proposal of the new batch. * * A new batch is created only when there are slot available on the send window. * This ensures that in case of overload, the requests that cannot be served * immediately are kept on the internal queue and that the task queue of the * Dispatcher thread is not overloaded with batches that cannot be proposed * right away. * * The internal request queue is also used to throttle the clients, by blocking * the selector thread whenever the request queue is full. * |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 1 |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
*
* Proposer.proposeNext() - single point to start new proposals. Called
* whenever one of the conditions above may have become true. Tries to start
<<<<<<< HEAD
* as many instances as possible, ie, until either pendingProposals is empty
* or all the window slots are taken.
=======
* as many instances as possible, i.e., until either pendingProposals is
* empty or all the window slots are taken.
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
*
* proposeNext() is called in the following situations: - Protocol calls
* nextPropose() whenever it decides an instance. - Batcher adds a request |
| Solution content |
|---|
*
* Proposer.proposeNext() - single point to start new proposals. Called
* whenever one of the conditions above may have become true. Tries to start
* as many instances as possible, i.e., until either pendingProposals is
* empty or all the window slots are taken.
*
* proposeNext() is called in the following situations: - Protocol calls
* nextPropose() whenever it decides an instance. - Batcher adds a request |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
* called when the next consensus is decided and in the second case the
* propose task enqueued by the Batcher thread is still waiting to be
* executed.
<<<<<<< HEAD
*/
/**
* Stores client requests. Selector thread enqueues requests, Batcher thread
* dequeues.
*/
private final static int MAX_QUEUE_SIZE = 10 * 1024;
private final BlockingQueue |
| Solution content |
|---|
* called when the next consensus is decided and in the second case the
* propose task enqueued by the Batcher thread is still waiting to be
* executed.
*/
/**
* Stores client requests. Selector thread enqueues requests, Batcher thread
* dequeues.
*/
private final static int MAX_QUEUE_SIZE = 2 * 1024;
private final BlockingQueue |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
*/
private volatile boolean suspended = true;
<<<<<<< HEAD
private final Dispatcher paxosDispatcher;
public ActiveBatcher(Paxos paxos) {
this.proposer = paxos.getProposer();
this.paxosDispatcher = paxos.getDispatcher();
this.maxBatchDelay = ProcessDescriptor.getInstance().maxBatchDelay;
this.maxBatchSize = ProcessDescriptor.getInstance().batchingLevel;
QueueMonitor.getInstance().registerQueue("BatcherQueue", queue);
=======
private final SingleThreadDispatcher paxosDispatcher;
public ActiveBatcher(Paxos paxos) {
this.proposer = (ProposerImpl) paxos.getProposer();
this.paxosDispatcher = paxos.getDispatcher();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
public void start() { |
| Solution content |
|---|
*/
private volatile boolean suspended = true;
private final SingleThreadDispatcher paxosDispatcher;
public ActiveBatcher(Paxos paxos) {
this.proposer = (ProposerImpl) paxos.getProposer();
this.paxosDispatcher = paxos.getDispatcher();
}
public void start() { |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Cast expression |
| Method invocation |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
* Called from the Client manager thread when it
*
* @param request
<<<<<<< HEAD
* @throws InterruptedException
*/
public boolean enqueueClientRequest(ClientRequest request) throws InterruptedException {
// This block is not atomic, so it may happen that suspended is false
// when
// the test below is done but becomes true before this thread has time
// to
// put the request in the queue. So some requests might stay in the
// queue between
// view changes and be re-proposed later. The request will be ignored,
// so it
// does not violate safety. And it should be rare. Avoiding this
// possibility
// would require a lock between suspended and put, which would slow down
// considerably the good case.
assert !request.equals(SENTINEL);
if (suspended) {
logger.warning("Enqueueing client request on follower!" + request);
return false;
}
// queue.put(request);
// This queue should never fill up, the RequestManager.pendingRequests
// queues will enforce flow control.
// Use add() to throw an exception if the queue fills up.
=======
* @throws NotLeaderException
* @throws InterruptedException
*/
public boolean enqueueClientRequest(RequestType request) {
/*
* This block is not atomic, so it may happen that suspended is false
* when the test below is done, but becomes true before this thread has
* time to put the request in the queue. So some requests might stay in
* the queue between view changes and be re-proposed later. The request
* will be ignored, so it does not violate safety. And it should be
* rare. Avoiding this possibility would require a lock between
* suspended and put, which would slow down considerably the good case.
*/
assert !SENTINEL.equals(request) : request + " " + SENTINEL;
if (suspended) {
logger.warning("Cannot enqueue proposal. Batcher is suspended.");
return false;
}
// This queue should never fill up, the RequestManager.pendingRequests
// queues will enforce flow control. Use add() instead of put() to throw
// an exception if the queue fills up.
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
queue.add(request);
return true;
} |
| Solution content |
|---|
* Called from the Client manager thread when it
*
* @param request
* @throws NotLeaderException
* @throws InterruptedException
*/
public boolean enqueueClientRequest(RequestType request) {
/*
* This block is not atomic, so it may happen that suspended is false
* when the test below is done, but becomes true before this thread has
* time to put the request in the queue. So some requests might stay in
* the queue between view changes and be re-proposed later. The request
* will be ignored, so it does not violate safety. And it should be
* rare. Avoiding this possibility would require a lock between
* suspended and put, which would slow down considerably the good case.
*/
assert !SENTINEL.equals(request) : request + " " + SENTINEL;
if (suspended) {
logger.warning("Cannot enqueue proposal. Batcher is suspended.");
return false;
}
// This queue should never fill up, the RequestManager.pendingRequests
// queues will enforce flow control. Use add() instead of put() to throw
// an exception if the queue fills up.
queue.add(request);
return true;
} |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Comment |
| If statement |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
* proposals until the size of the batch is known, it's possible to
* create a byte[] for the batch with the exact size, therefore avoiding
* the creation of a temporary buffer.
<<<<<<< HEAD
*/
ArrayList |
| Solution content |
|---|
* proposals until the size of the batch is known, it's possible to
* create a byte[] for the batch with the exact size, therefore avoiding
* the creation of a temporary buffer.
*/
ArrayList |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Try statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
// The header takes 4 bytes
int batchSize = BATCH_HEADER_SIZE;
<<<<<<< HEAD
if (request == null) {
request = queue.take();
if (request == SENTINEL) {
// batcher asked to stop, ignore this request
=======
RequestType request;
if (overflowRequest == null) {
// (possibly) wait for a new request
request = queue.take();
if (SENTINEL.equals(request)) {
// No longer being the leader. Abort this batch
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
if (logger.isLoggable(Level.FINE)) {
logger.fine("Discarding end of epoch marker.");
} |
| Solution content |
|---|
// The header takes 4 bytes
int batchSize = 4;
RequestType request;
if (overflowRequest == null) {
// (possibly) wait for a new request
request = queue.take();
if (SENTINEL.equals(request)) {
// No longer being the leader. Abort this batch
if (logger.isLoggable(Level.FINE)) {
logger.fine("Discarding end of epoch marker.");
} |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| If statement |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
averageRequestSize.add(request.byteSize());
batchSize += request.byteSize();
batchReqs.add(request);
<<<<<<< HEAD
request = null;
if (batchSize < maxBatchSize) {
// Deadline for sending this batch
long batchDeadline = System.currentTimeMillis() + maxBatchDelay;
if (logger.isLoggable(Level.FINE)) {
logger.fine("Starting batch.");
}
// Fill the batch
while (true) {
long maxWait = batchDeadline - System.currentTimeMillis();
// wait for additional requests until either the batch
// timeout expires or the batcher is suspended at least
// once.
request = queue.poll(maxWait, TimeUnit.MILLISECONDS);
if (request == null) {
if (logger.isLoggable(Level.FINE)) {
logger.fine("Batch timeout");
}
=======
// Deadline for sending this batch
long batchDeadline = System.currentTimeMillis() + processDescriptor.maxBatchDelay;
if (logger.isLoggable(Level.FINE)) {
logger.fine("Starting batch.");
}
// Fill the batch
while (true) {
if (batchSize >= processDescriptor.batchingLevel) {
// already full, let's break.
if (logger.isLoggable(Level.FINE)) {
logger.fine("Batch full");
}
break;
}
if (batchSize + (averageRequestSize.get() / 2) >= processDescriptor.batchingLevel) {
// small chance to fit the next request.
if (queue.isEmpty()) {
if (logger.isLoggable(Level.FINER)) {
logger.finer("Predicting that next request won't fit. Left with " +
(batchSize - processDescriptor.batchingLevel) +
"bytes, estimated request size:" +
averageRequestSize.get());
}
break;
}
}
long maxWait = batchDeadline - System.currentTimeMillis();
// wait for additional requests until either the batch
// timeout expires or the batcher is suspended at least
// once.
request = queue.poll(maxWait, TimeUnit.MILLISECONDS);
if (request == null) {
if (decideCallback != null &&
decideCallback.hasDecidedNotExecutedOverflow()) {
batchDeadline = System.currentTimeMillis() +
Math.max(processDescriptor.maxBatchDelay,
PRELONGED_BATCHING_TIME);;
logger.info("Prelonging batching in ActiveBatcher");
continue;
} else {
if (logger.isLoggable(Level.FINE)) {
logger.fine("Batch timeout");
}
break;
}
} else if (SENTINEL.equals(request)) {
if (logger.isLoggable(Level.FINE)) {
logger.fine("Discarding end of epoch marker and partial batch.");
}
break;
} else {
if (batchSize + request.byteSize() > processDescriptor.batchingLevel) {
// Can't include it in the current batch, as it
// would exceed size limit.
// Save it for the next batch.
overflowRequest = request;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
break;
} else if (request == SENTINEL) {
// batcher asked to stop, ignore this request and |
| Solution content |
|---|
averageRequestSize.add(request.byteSize());
batchSize += request.byteSize();
batchReqs.add(request);
// Deadline for sending this batch
long batchDeadline = System.currentTimeMillis() + processDescriptor.maxBatchDelay;
if (logger.isLoggable(Level.FINE)) {
logger.fine("Starting batch.");
}
// Fill the batch
while (true) {
if (batchSize >= processDescriptor.batchingLevel) {
// already full, let's break.
if (logger.isLoggable(Level.FINE)) {
logger.fine("Batch full");
}
break;
}
if (batchSize + (averageRequestSize.get() / 2) >= processDescriptor.batchingLevel) {
// small chance to fit the next request.
if (queue.isEmpty()) {
if (logger.isLoggable(Level.FINER)) {
logger.finer("Predicting that next request won't fit. Left with " +
(batchSize - processDescriptor.batchingLevel) +
"bytes, estimated request size:" +
averageRequestSize.get());
}
break;
}
}
long maxWait = batchDeadline - System.currentTimeMillis();
// wait for additional requests until either the batch
// timeout expires or the batcher is suspended at least
// once.
request = queue.poll(maxWait, TimeUnit.MILLISECONDS);
if (request == null) {
if (decideCallback != null &&
decideCallback.hasDecidedNotExecutedOverflow()) {
batchDeadline = System.currentTimeMillis() +
Math.max(processDescriptor.maxBatchDelay,
PRELONGED_BATCHING_TIME);;
logger.info("Prelonging batching in ActiveBatcher");
continue;
} else {
if (logger.isLoggable(Level.FINE)) {
logger.fine("Batch timeout");
}
break;
}
} else if (SENTINEL.equals(request)) {
if (logger.isLoggable(Level.FINE)) {
logger.fine("Discarding end of epoch marker and partial batch.");
}
break;
} else {
if (batchSize + request.byteSize() > processDescriptor.batchingLevel) {
// Can't include it in the current batch, as it
// would exceed size limit.
// Save it for the next batch.
overflowRequest = request;
break; |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Break statement |
| Comment |
| If statement |
| Method invocation |
| Variable |
| While statement |
| Chunk |
|---|
| Conflicting content |
|---|
request = null;
continue mainloop;
} else {
<<<<<<< HEAD
if (batchSize + request.byteSize() > maxBatchSize) {
// Can't include it in the current batch, as it
// would exceed size limit.
// Save it for the next batch.
break;
} else {
batchSize += request.byteSize();
batchReqs.add(request);
request = null;
if (batchSize == maxBatchSize)
break;
}
}
}
}
=======
averageRequestSize.add(request.byteSize());
batchSize += request.byteSize();
batchReqs.add(request);
}
}
}
// Lost leadership, drop the batch.
if (SENTINEL.equals(request)) {
continue;
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
// Serialize the batch
ByteBuffer bb = ByteBuffer.allocate(batchSize); |
| Solution content |
|---|
} else {
averageRequestSize.add(request.byteSize());
batchSize += request.byteSize();
batchReqs.add(request);
}
}
}
// Lost leadership, drop the batch.
if (SENTINEL.equals(request)) {
continue;
}
// Serialize the batch
ByteBuffer bb = ByteBuffer.allocate(batchSize); |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| If statement |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
// Serialize the batch
ByteBuffer bb = ByteBuffer.allocate(batchSize);
bb.putInt(batchReqs.size());
<<<<<<< HEAD
for (ClientRequest req : batchReqs) {
=======
for (RequestType req : batchReqs) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
req.writeTo(bb);
}
byte[] value = bb.array(); |
| Solution content |
|---|
// Serialize the batch
ByteBuffer bb = ByteBuffer.allocate(batchSize);
bb.putInt(batchReqs.size());
for (RequestType req : batchReqs) {
req.writeTo(bb);
}
byte[] value = bb.array(); |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| For statement |
| Chunk |
|---|
| Conflicting content |
|---|
<<<<<<< HEAD
}
byte[] value = bb.array();
// FIXME: ignore Nuno's logging and make something in order to
// get rid of redundant bricks
// Must also pass an array with the request so that the
// dispatcher thread
// has enough information for logging the batch
ClientRequest[] requests = batchReqs.toArray(new ClientRequest[batchReqs.size()]);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Batch ready. Number of requests: " + requests.length +
", queued reqs: " + queue.size());
}
// Can block if the Proposer internal propose queue is full
proposer.enqueueProposal(requests, value);
=======
if (logger.isLoggable(Level.FINE)) {
logger.fine("Batch ready. Number of requests: " + batchReqs.size() +
", queued reqs: " + queue.size());
}
// Can block if the Proposer internal propose queue is full
proposer.enqueueProposal(value);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
if (logger.isLoggable(Level.FINE)) {
logger.fine("Batch dispatched.");
} |
| Solution content |
|---|
}
byte[] value = bb.array();
if (logger.isLoggable(Level.FINE)) {
logger.fine("Batch ready. Number of requests: " + batchReqs.size() +
", queued reqs: " + queue.size());
}
// Can block if the Proposer internal propose queue is full
proposer.enqueueProposal(value);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Batch dispatched.");
} |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| If statement |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
} catch (InterruptedException ex) {
logger.warning("Thread dying: " + ex.getMessage());
}
<<<<<<< HEAD
}
/**
* Stops the batcher from creating new batches. Called when the process is
* demoted
=======
logger.warning("Thread dying");
throw new RuntimeException("Escaped an ever-lasting loop. should-never-hapen");
}
/**
* Stops the batcher from creating new batches. Called when the process
* stops being a leader
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
*/
public void suspendBatcher() {
assert paxosDispatcher.amIInDispatcher(); |
| Solution content |
|---|
} catch (InterruptedException ex) {
logger.warning("Thread dying: " + ex.getMessage());
}
logger.warning("Thread dying");
throw new RuntimeException("Escaped an ever-lasting loop. should-never-hapen");
}
/**
* Stops the batcher from creating new batches. Called when the process
* stops being a leader
*/
public void suspendBatcher() {
assert paxosDispatcher.amIInDispatcher(); |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Throw statement |
| Chunk |
|---|
| Conflicting content |
|---|
if (suspended) {
// Can happen when the leader advances view before finishing
// preparing.
<<<<<<< HEAD
// Batcher is started only when the
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
return;
}
if (logger.isLoggable(Level.INFO)) { |
| Solution content |
|---|
if (suspended) {
// Can happen when the leader advances view before finishing
// preparing.
return;
}
if (logger.isLoggable(Level.INFO)) { |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
if (logger.isLoggable(Level.INFO)) {
logger.info("Suspend batcher. Discarding " + queue.size() + " queued requests.");
}
<<<<<<< HEAD
// volatile, ensures that no request are put in the queue after
// this line is executed
=======
// volatile, but does not ensure that no request are put in the queue
// after this line is executed; to discard the requests sentinel is used
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
suspended = true;
queue.clear();
try { |
| Solution content |
|---|
if (logger.isLoggable(Level.INFO)) {
logger.info("Suspend batcher. Discarding " + queue.size() + " queued requests.");
}
// volatile, but does not ensure that no request are put in the queue
// after this line is executed; to discard the requests sentinel is used
suspended = true;
queue.clear();
try { |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
try {
queue.put(SENTINEL);
} catch (InterruptedException e) {
<<<<<<< HEAD
throw new RuntimeException("Interrupted by adding to just emptied batcher queue.");
=======
throw new RuntimeException("should-never-happen", e);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
|
| Solution content |
|---|
try {
queue.put(SENTINEL);
} catch (InterruptedException e) {
throw new RuntimeException("should-never-happen", e);
}
}
|
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Throw statement |
| Chunk |
|---|
| Conflicting content |
|---|
}
/** Restarts the batcher, giving it an initial window size. */
<<<<<<< HEAD
public void resumeBatcher(int currentWndSize) {
=======
public void resumeBatcher() {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
assert paxosDispatcher.amIInDispatcher();
assert suspended;
logger.info("Resuming batcher."); |
| Solution content |
|---|
}
/** Restarts the batcher, giving it an initial window size. */
public void resumeBatcher() {
assert paxosDispatcher.amIInDispatcher();
assert suspended;
logger.info("Resuming batcher."); |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
suspended = false;
}
<<<<<<< HEAD
private final static Logger logger =
Logger.getLogger(ActiveBatcher.class.getCanonicalName());
=======
public void setDecideCallback(DecideCallback decideCallback) {
this.decideCallback = decideCallback;
}
private final static Logger logger =
Logger.getLogger(ActiveBatcher.class.getCanonicalName());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
} |
| Solution content |
|---|
suspended = false;
}
public void setDecideCallback(DecideCallback decideCallback) {
this.decideCallback = decideCallback;
}
private final static Logger logger =
Logger.getLogger(ActiveBatcher.class.getCanonicalName());
} |
| File |
|---|
| ActiveBatcher.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method declaration |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
private final Thread thread;
private int view;
<<<<<<< HEAD
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
/** Follower role: reception time of the last heartbeat from the leader */
private volatile long lastHeartbeatRcvdTS;
/** Leader role: time when the last message or heartbeat was sent to all */ |
| Solution content |
|---|
private final Thread thread;
private int view;
/** Follower role: reception time of the last heartbeat from the leader */
private volatile long lastHeartbeatRcvdTS;
/** Leader role: time when the last message or heartbeat was sent to all */ |
| File |
|---|
| ActiveFailureDetector.java |
| Developer's decision |
|---|
| Version 1 |
| Kind of conflict |
|---|
| Blank |
| Chunk |
|---|
| Conflicting content |
|---|
this.fdListener = fdListener;
this.network = network;
this.storage = storage;
<<<<<<< HEAD
this.suspectTimeout = processDescriptor.fdSuspectTimeout;
this.sendTimeout = processDescriptor.fdSendTimeout;
this.thread = new Thread(this, "FailureDetector");
this.innerListener = new InnerMessageHandler();
=======
suspectTimeout = processDescriptor.fdSuspectTimeout;
sendTimeout = processDescriptor.fdSendTimeout;
thread = new Thread(this, "FailureDetector");
thread.setDaemon(true);
innerListener = new InnerMessageHandler();
storage.addViewChangeListener(viewCahngeListener);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
/** |
| Solution content |
|---|
this.fdListener = fdListener;
this.network = network;
this.storage = storage;
suspectTimeout = processDescriptor.fdSuspectTimeout;
sendTimeout = processDescriptor.fdSendTimeout;
thread = new Thread(this, "FailureDetector");
thread.setDaemon(true);
innerListener = new InnerMessageHandler();
storage.addViewChangeListener(viewCahngeListener);
}
/** |
| File |
|---|
| ActiveFailureDetector.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
*/
protected Storage.ViewChangeListener viewCahngeListener = new Storage.ViewChangeListener() {
<<<<<<< HEAD
=======
public void viewChanged(int newView, int newLeader) {
synchronized (ActiveFailureDetector.this) {
logger.fine("FD has been informed about view " + newView);
view = newView;
ActiveFailureDetector.this.notify();
}
}
};
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
public void run() {
logger.info("Starting failure detector");
try { |
| Solution content |
|---|
*/
protected Storage.ViewChangeListener viewCahngeListener = new Storage.ViewChangeListener() {
public void viewChanged(int newView, int newLeader) {
synchronized (ActiveFailureDetector.this) {
logger.fine("FD has been informed about view " + newView);
view = newView;
ActiveFailureDetector.this.notify();
}
}
};
public void run() {
logger.info("Starting failure detector");
try { |
| File |
|---|
| ActiveFailureDetector.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method declaration |
| Chunk |
|---|
| Conflicting content |
|---|
if (processDescriptor.isLocalProcessLeader(view)) {
// Send
Alive alive = new Alive(view, storage.getLog().getNextId());
<<<<<<< HEAD
network.sendToAllButMe(alive);
=======
network.sendToOthers(alive);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
lastHeartbeatSentTS = now;
long nextSend = lastHeartbeatSentTS + sendTimeout;
|
| Solution content |
|---|
if (processDescriptor.isLocalProcessLeader(view)) {
// Send
Alive alive = new Alive(view, storage.getLog().getNextId());
network.sendToOthers(alive);
lastHeartbeatSentTS = now;
long nextSend = lastHeartbeatSentTS + sendTimeout;
|
| File |
|---|
| ActiveFailureDetector.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
// monitor, thereby unlocking this thread.
int oldView = view;
while (oldView == view) {
<<<<<<< HEAD
=======
logger.fine("FD is waiting for view change from " + oldView);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
wait();
}
logger.fine("FD now knows about new view"); |
| Solution content |
|---|
// monitor, thereby unlocking this thread.
int oldView = view;
while (oldView == view) {
logger.fine("FD is waiting for view change from " + oldView);
wait();
}
logger.fine("FD now knows about new view"); |
| File |
|---|
| ActiveFailureDetector.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
final class InnerMessageHandler implements MessageHandler {
public void onMessageReceived(Message message, int sender) {
<<<<<<< HEAD
if (!processDescriptor.isLocalProcessLeader(view) &&
sender == processDescriptor.getLeaderOfView(view)) {
=======
// followers only.
if (processDescriptor.isLocalProcessLeader(view))
return;
// Use the message as heartbeat if the local process is
// a follower and the sender is the leader of the current view
if (sender == processDescriptor.getLeaderOfView(view)) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
lastHeartbeatRcvdTS = getTime();
}
} |
| Solution content |
|---|
final class InnerMessageHandler implements MessageHandler {
public void onMessageReceived(Message message, int sender) {
// followers only.
if (processDescriptor.isLocalProcessLeader(view))
return;
// Use the message as heartbeat if the local process is
// a follower and the sender is the leader of the current view
if (sender == processDescriptor.getLeaderOfView(view)) {
lastHeartbeatRcvdTS = getTime();
}
} |
| File |
|---|
| ActiveFailureDetector.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| If statement |
| Chunk |
|---|
| Conflicting content |
|---|
}
public void onMessageSent(Message message, BitSet destinations) {
<<<<<<< HEAD
=======
// leader only.
if (!processDescriptor.isLocalProcessLeader(view))
return;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
// Ignore Alive messages, the clock was already reset when the
// message was sent.
if (message.getType() == MessageType.Alive) { |
| Solution content |
|---|
}
public void onMessageSent(Message message, BitSet destinations) {
// leader only.
if (!processDescriptor.isLocalProcessLeader(view))
return;
// Ignore Alive messages, the clock was already reset when the
// message was sent.
if (message.getType() == MessageType.Alive) { |
| File |
|---|
| ActiveFailureDetector.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| If statement |
| Chunk |
|---|
| Conflicting content |
|---|
if (destinations.cardinality() < processDescriptor.numReplicas - 1) {
return;
}
<<<<<<< HEAD
// This process just sent a message to all.
// If leader, update the lastHeartbeatSentTS
if (processDescriptor.isLocalProcessLeader(view)) {
lastHeartbeatSentTS = getTime();
}
=======
// Check if comment above is true
assert !destinations.get(processDescriptor.localId) : message;
// This process just sent a message to all. Reset the timeout.
lastHeartbeatSentTS = getTime();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
|
| Solution content |
|---|
if (destinations.cardinality() < processDescriptor.numReplicas - 1) {
return;
}
// Check if comment above is true
assert !destinations.get(processDescriptor.localId) : message;
// This process just sent a message to all. Reset the timeout.
lastHeartbeatSentTS = getTime();
}
}
|
| File |
|---|
| ActiveFailureDetector.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Attribute |
| Comment |
| If statement |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
}
static long getTime() {
<<<<<<< HEAD
// TODO: JK: out of pure interests: why nanotime?
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
// return System.currentTimeMillis();
return System.nanoTime() / 1000000;
} |
| Solution content |
|---|
}
static long getTime() {
// return System.currentTimeMillis();
return System.nanoTime() / 1000000;
} |
| File |
|---|
| ActiveFailureDetector.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
import lsr.common.MovingAverage;
import lsr.paxos.messages.Message;
import lsr.paxos.network.Network;
<<<<<<< HEAD
import lsr.paxos.statistics.ReplicaStats;
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
/**
* Manages retransmissions of messages using a dedicated thread and a delay |
| Solution content |
|---|
import lsr.common.MovingAverage;
import lsr.paxos.messages.Message;
import lsr.paxos.network.Network;
/**
* Manages retransmissions of messages using a dedicated thread and a delay |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
* and re-enqueues for further retransmission.
*/
public final class ActiveRetransmitter implements Runnable, Retransmitter {
<<<<<<< HEAD
private final static BitSet ALL_BUT_ME = ALL_BUT_ME_initializer();
private static BitSet ALL_BUT_ME_initializer() {
BitSet bs = new BitSet(ProcessDescriptor.getInstance().numReplicas);
bs.set(0, ProcessDescriptor.getInstance().numReplicas);
bs.clear(ProcessDescriptor.getInstance().localId);
return bs;
}
private final Network network;
private final DelayQueue |
| Solution content |
|---|
* and re-enqueues for further retransmission.
*/
public final class ActiveRetransmitter implements Runnable, Retransmitter {
private final Network network;
private final String name;
private Thread thread; |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method declaration |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
private Thread thread;
<<<<<<< HEAD
volatile boolean stopThread = false;
=======
private final DelayQueue |
| Solution content |
|---|
private Thread thread;
private final DelayQueue |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
this.name = name;
}
<<<<<<< HEAD
public synchronized void init() {
thread = new Thread(this, name);
thread.start();
=======
@Override
public void init() {
thread = new Thread(this, name);
thread.start();
}
@Override
public void close() {
stopAll();
thread.interrupt();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
/** |
| Solution content |
|---|
this.name = name;
}
@Override
public void init() {
thread = new Thread(this, name);
thread.start();
}
@Override
public void close() {
stopAll();
thread.interrupt();
}
/** |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Annotation |
| Attribute |
| Method declaration |
| Method invocation |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
* @return the handler used to control retransmitting message
*/
public RetransmittedMessage startTransmitting(Message message) {
<<<<<<< HEAD
// destination gets cloned in the InnerRetransmittedMessage, so using
// here a constant is correct
return startTransmitting(message, ALL_BUT_ME);
}
public RetransmittedMessage startTransmitting(Message message, BitSet destinations) {
return startTransmitting(message, destinations, -1);
=======
// no need to clone ALL_BUT_ME - the constructor down there does this
return startTransmitting(message, Network.OTHERS);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
/** |
| Solution content |
|---|
* @return the handler used to control retransmitting message
*/
public RetransmittedMessage startTransmitting(Message message) {
// no need to clone ALL_BUT_ME - the constructor down there does this
return startTransmitting(message, Network.OTHERS);
}
/** |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Method signature |
| Return statement |
| Chunk |
|---|
| Conflicting content |
|---|
* this method.
* @return the handler used to control retransmitting message
*/
<<<<<<< HEAD
public RetransmittedMessage startTransmitting(Message message, BitSet destinations, int cid) {
InnerRetransmittedMessage handler = new InnerRetransmittedMessage(message, destinations,
cid);
// First attempt is done directly by the dispatcher thread. Therefore,
// in the normal
// case, there is no additional context switch to send a message.
// retransmit() will enqueue the message for additional retransmission.
=======
public RetransmittedMessage startTransmitting(Message message, BitSet destinations) {
InnerRetransmittedMessage handler = new InnerRetransmittedMessage(message, destinations);
/*
* First attempt is done directly by the dispatcher thread. Therefore,
* in the normal case, there is no additional context switch to send a
* message. retransmit() will enqueue the message for additional
* retransmission.
*/
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
handler.retransmit();
return handler;
} |
| Solution content |
|---|
* this method.
* @return the handler used to control retransmitting message
*/
public RetransmittedMessage startTransmitting(Message message, BitSet destinations) {
InnerRetransmittedMessage handler = new InnerRetransmittedMessage(message, destinations);
/*
* First attempt is done directly by the dispatcher thread. Therefore,
* in the normal case, there is no additional context switch to send a
* message. retransmit() will enqueue the message for additional
* retransmission.
*/
handler.retransmit();
return handler;
} |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Method signature |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
@Override
public void run() {
<<<<<<< HEAD
logger.info("ActiveRetransmitter " + name + " starting");
try {
while (!Thread.interrupted()) {
// The message might be canceled between take() returns and
// retrasmit() is called.
// To avoid retransmitting canceled messages, the
// RetransMessage.stop() sets a
// canceled flag to false, which is checked before
// retransmission
=======
logger.info("ActiveRetransmitter '" + name + "' starting");
try {
while (!Thread.interrupted()) {
/*
* The message might be canceled between take() returns and
* retrasmit() is called. To avoid retransmitting canceled
* messages, the RetransMessage.stop() sets a canceled flag to
* false, which is checked before retransmission
*/
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
InnerRetransmittedMessage rMsg = queue.take();
rMsg.retransmit();
} |
| Solution content |
|---|
public void run() {
logger.info("ActiveRetransmitter '" + name + "' starting");
try {
while (!Thread.interrupted()) {
/*
* The message might be canceled between take() returns and
* retrasmit() is called. To avoid retransmitting canceled
* messages, the RetransMessage.stop() sets a canceled flag to
* false, which is checked before retransmission
*/
InnerRetransmittedMessage rMsg = queue.take();
rMsg.retransmit();
} |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Try statement |
| While statement |
| Chunk |
|---|
| Conflicting content |
|---|
rMsg.retransmit();
}
} catch (InterruptedException e) {
<<<<<<< HEAD
if (stopThread) {
stopThread = false;
logger.info("Closing retransmitter thread: " + name);
} else
logger.warning("Thread dying: " + e.getMessage());
=======
logger.warning("ActiveRetransmitter '" + name + "' closing: " + e.getMessage());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
|
| Solution content |
|---|
rMsg.retransmit();
}
} catch (InterruptedException e) {
logger.warning("ActiveRetransmitter '" + name + "' closing: " + e.getMessage());
}
}
|
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| If statement |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
/** The time the task is enabled to execute in milliseconds */
private volatile long time = -1;
private boolean cancelled = false;
<<<<<<< HEAD
private final int cid;
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
InnerRetransmittedMessage(Message message, BitSet destinations) {
this.message = message; |
| Solution content |
|---|
/** The time the task is enabled to execute in milliseconds */
private volatile long time = -1;
private boolean cancelled = false;
InnerRetransmittedMessage(Message message, BitSet destinations) {
this.message = message; |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Chunk |
|---|
| Conflicting content |
|---|
// -----------------------------------------
// RetransmittedMessage interface implementation
// -----------------------------------------
<<<<<<< HEAD
=======
public synchronized void start(int destination) {
destinations.set(destination);
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
public synchronized void stop(int destination) {
this.destinations.clear(destination);
if (this.destinations.isEmpty()) { |
| Solution content |
|---|
// -----------------------------------------
// RetransmittedMessage interface implementation
// -----------------------------------------
public synchronized void start(int destination) {
destinations.set(destination);
}
public synchronized void stop(int destination) {
this.destinations.clear(destination);
if (this.destinations.isEmpty()) { |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method declaration |
| Chunk |
|---|
| Conflicting content |
|---|
cancelled = true;
}
<<<<<<< HEAD
public synchronized void start(int destination) {
destinations.set(destination);
}
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
// -----------------------------------------
// Delayed interface implementation
// ----------------------------------------- |
| Solution content |
|---|
cancelled = true;
}
// -----------------------------------------
// Delayed interface implementation
// ----------------------------------------- |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method declaration |
| Chunk |
|---|
| Conflicting content |
|---|
long diff = time - x.time;
if (diff < 0)
return -1;
<<<<<<< HEAD
else if (diff > 0)
return 1;
else if (this.hashCode() < other.hashCode())
return 1;
else
return -1;
}
// Release the lock
// TODO: JK: what lock?
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
int res = (d == 0) ? 0 : ((d < 0) ? -1 : 1);
logger.severe("Investigate!!! two different objects return 0 in compareTo function");
return res;
=======
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d < 0) ? -1 : 1;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
// ////////////////////////////////////// |
| Solution content |
|---|
long diff = time - x.time;
if (diff < 0)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d < 0) ? -1 : 1;
}
// ////////////////////////////////////// |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| If statement |
| Method invocation |
| Return statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
logger.warning("Trying to retransmit a cancelled message");
return;
}
<<<<<<< HEAD
if (cid != -1) {
ReplicaStats.getInstance().retransmit(cid);
}
// Can be called either by Dispatcher (first time message is sent)
// or by Retransmitter thread (retransmissions)
sendTs = System.currentTimeMillis();
// Due to the lock on "this", destinations does not change while
// this method
// is called.
network.sendMessage(message, destinations);
// Schedule the next attempt
// Impose a lower bound on retransmission frequency to prevent
// excessive retransmission
time = sendTs + Math.max((int) (ma.get() * 3), 5000);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Resending in: " + getDelay(TimeUnit.MILLISECONDS) + " to " +
destinations);
=======
sendTs = System.currentTimeMillis();
network.sendMessage(message, destinations);
// Schedule the next attempt
time = sendTs + Math.max((int) (ma.get() * 3), MIN_RETRANSMIT_TIME);
if (logger.isLoggable(Level.FINER)) {
logger.finer("Resending in: " + getDelay(TimeUnit.MILLISECONDS));
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
queue.offer(this);
} |
| Solution content |
|---|
logger.warning("Trying to retransmit a cancelled message");
return;
}
sendTs = System.currentTimeMillis();
network.sendMessage(message, destinations);
// Schedule the next attempt
time = sendTs + Math.max((int) (ma.get() * 3), MIN_RETRANSMIT_TIME);
if (logger.isLoggable(Level.FINER)) {
logger.finer("Resending in: " + getDelay(TimeUnit.MILLISECONDS));
}
queue.offer(this);
} |
| File |
|---|
| ActiveRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| If statement |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
package lsr.paxos; <<<<<<< HEAD import java.util.Arrays; ======= import static lsr.common.ProcessDescriptor.processDescriptor; >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import java.util.BitSet; import java.util.logging.Level; import java.util.logging.Logger; |
| Solution content |
|---|
package lsr.paxos; import static lsr.common.ProcessDescriptor.processDescriptor; import java.util.BitSet; |
| File |
|---|
| EpochPrepareRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
import lsr.paxos.storage.Storage;
public class EpochPrepareRetransmitter implements PrepareRetransmitter {
<<<<<<< HEAD
private final Retransmitter retransmitter;
=======
private final ActiveRetransmitter retransmitter;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
private RetransmittedMessage prepareRetransmitter;
// keeps epochs of received prepareOk messages.
private long[] prepareEpoch; |
| Solution content |
|---|
import lsr.paxos.storage.Storage;
public class EpochPrepareRetransmitter implements PrepareRetransmitter {
private final ActiveRetransmitter retransmitter;
private RetransmittedMessage prepareRetransmitter;
// keeps epochs of received prepareOk messages.
private long[] prepareEpoch; |
| File |
|---|
| EpochPrepareRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Chunk |
|---|
| Conflicting content |
|---|
}
public void update(PrepareOK message, int sender) {
<<<<<<< HEAD
if (logger.isLoggable(Level.FINE)) {
logger.fine("Updating from " + Arrays.toString(message.getEpoch()) +
" (current answers: " + Arrays.toString(prepareEpoch) + ")" +
" knowledge: " + Arrays.toString(storage.getEpoch()));
}
=======
if (sender == processDescriptor.localId) {
prepareEpoch[sender] = storage.getEpoch()[sender];
prepared.set(sender);
return;
}
// update storage - storage has greatest seen epochs.
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
storage.updateEpoch(message.getEpoch());
// Mark that we got prepareOk; overwrite received epoch only if we got |
| Solution content |
|---|
}
public void update(PrepareOK message, int sender) {
if (sender == processDescriptor.localId) {
prepareEpoch[sender] = storage.getEpoch()[sender];
prepared.set(sender);
return;
}
// update storage - storage has greatest seen epochs.
storage.updateEpoch(message.getEpoch());
// Mark that we got prepareOk; overwrite received epoch only if we got |
| File |
|---|
| EpochPrepareRetransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| If statement |
| Chunk |
|---|
| Conflicting content |
|---|
public interface FailureDetector {
public interface FailureDetectorListener {
<<<<<<< HEAD
public void suspect(int process);
=======
/**
* The failure detector suspected the leader of the current view
*
* The FD will call this method ONCE a view.
*
* @param view The view whose leader was suspected.
*/
public void suspect(int view);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
public void start(int initialView); |
| Solution content |
|---|
public interface FailureDetector {
public interface FailureDetectorListener {
/**
* The failure detector suspected the leader of the current view
*
* The FD will call this method ONCE a view.
*
* @param view The view whose leader was suspected.
*/
public void suspect(int view);
}
public void start(int initialView); |
| File |
|---|
| FailureDetector.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method interface |
| Chunk |
|---|
| Conflicting content |
|---|
public void start(int initialView);
public void stop();
<<<<<<< HEAD
public void viewChange(int newView);
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
} |
| Solution content |
|---|
public void start(int initialView);
public void stop();
} |
| File |
|---|
| FailureDetector.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method interface |
| Chunk |
|---|
| Conflicting content |
|---|
* using the |
| Solution content |
|---|
* using the |
| File |
|---|
| PrepareRetransmitterImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Chunk |
|---|
| Conflicting content |
|---|
*/
void init();
=======
public interface Retransmitter {
/**
<<<<<<< HEAD
* Starts the process of handling retransmission.Messages are sent only if
* the retransmitter has been started.
* Starts the process of handling retransmission. Messages are sent only if
* the retransmitter has been started.
*/
void init();
/**
* Stops retransmitting all messages.
*/
void stopAll();
/**
* Disables retransmitter, opposite to #init().
*/
void close();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
/**
* Starts retransmitting specified message to all processes except local |
| Solution content |
|---|
public interface Retransmitter {
/**
* Starts the process of handling retransmission. Messages are sent only if
* the retransmitter has been started.
*/
void init();
/**
* Stops retransmitting all messages.
*/
void stopAll();
/**
* Disables retransmitter, opposite to #init().
*/
void close();
/**
* Starts retransmitting specified message to all processes except local |
| File |
|---|
| Retransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method interface |
| Chunk |
|---|
| Conflicting content |
|---|
* @return the handler used to control retransmitting message
*/
RetransmittedMessage startTransmitting(Message message, BitSet destinations);
<<<<<<< HEAD
/**
* @param id TODO: JK: what is this Why is this here?
*/
RetransmittedMessage startTransmitting(Message message, BitSet destinations, int id);
/**
* Stops retransmitting all messages.
*/
void stopAll();
/**
* Disables retransmitter, opposite to #init().
*/
void close();
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
} |
| Solution content |
|---|
* @return the handler used to control retransmitting message
*/
RetransmittedMessage startTransmitting(Message message, BitSet destinations);
} |
| File |
|---|
| Retransmitter.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method interface |
| Chunk |
|---|
| Conflicting content |
|---|
double sizeRatio = logByteSize / snapshotByteSizeEstimate.get();
if (!askedForSnapshot) {
<<<<<<< HEAD
if (sizeRatio < ProcessDescriptor.getInstance().snapshotAskRatio) {
=======
if (sizeRatio < processDescriptor.snapshotAskRatio) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
return;
}
|
| Solution content |
|---|
double sizeRatio = logByteSize / snapshotByteSizeEstimate.get();
if (!askedForSnapshot) {
if (sizeRatio < processDescriptor.snapshotAskRatio) {
return;
}
|
| File |
|---|
| SnapshotMaintainer.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| If statement |
| Chunk |
|---|
| Conflicting content |
|---|
}
if (!forcedSnapshot) {
<<<<<<< HEAD
if (sizeRatio < ProcessDescriptor.getInstance().snapshotForceRatio) {
=======
if (sizeRatio < processDescriptor.snapshotForceRatio) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
return;
}
|
| Solution content |
|---|
}
if (!forcedSnapshot) {
if (sizeRatio < processDescriptor.snapshotForceRatio) {
return;
}
|
| File |
|---|
| SnapshotMaintainer.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| If statement |
| Chunk |
|---|
| Conflicting content |
|---|
import lsr.common.PID; import lsr.common.Reply; import lsr.common.RequestId; <<<<<<< HEAD import lsr.common.ClientCommand.CommandType; import lsr.paxos.ReplicationException; import lsr.paxos.statistics.ClientStats; ======= >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 /** * Class represents TCP connection to replica. It should be used by clients, to |
| Solution content |
|---|
import lsr.common.PID; import lsr.common.Reply; import lsr.common.RequestId; /** * Class represents TCP connection to replica. It should be used by clients, to |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
* client is bound to.
*/
public class Client {
<<<<<<< HEAD
/*
* Minimum time to wait before reconnecting after a connection failure
* (connection reset or refused). Must be large enough to allow the system
* to elect a new leader.
*/
private static final int CONNECTION_FAILURE_TIMEOUT = 500;
/*
* Minimum time to wait before reconnecting to a new replica after receiving
* a redirect
*/
private static final int REDIRECT_TIMEOUT = 100;
/*
* How long to wait for an answer from the replica before connecting to
* another replica.
*
* Should be long enough for the replicas to suspect a failed replica and to
* elect a new leader.
*/
private static final int SOCKET_TIMEOUT = 1000;
// Connection timeout management - exponential moving average with upper
// bound on max timeout. Timeout == TO_MULTIPLIER*average
private static final int TO_MULTIPLIER = 3;
private static final int MAX_TIMEOUT = 10000;
private static final Random r = new Random();
public static final String BENCHMARK_RUN_CLIENT = "BenchmarkRunClient";
public static final boolean DEFAULT_BENCHMARK_RUN_CLIENT = false;
public final boolean benchmarkRun;
private final MovingAverage timeoutAverage = new MovingAverage(0.2, SOCKET_TIMEOUT);
private int timeout;
// List of replicas, and information who's the leader
private final List |
| Solution content |
|---|
* client is bound to.
*/
public class Client {
// TODO: JK: export tunable client parameters to a configuration file / as
// constructor parameter
/*
* Minimum time to wait before reconnecting after a connection failure
* (connection reset or refused).
*/
private static final int CONNECTION_FAILURE_TIMEOUT = 500;
/* Timeout == TO_MULTIPLIER*average */
private static final int TO_MULTIPLIER = 3;
/*
* Initial time to wait for an answer from the replica before connecting to
* another replica.
*/
private static final int INITIAL_TIMEOUT = 3000 / TO_MULTIPLIER;
/* Maximum time to wait for an answer from the replica before reconnect */
private static final int MAX_TIMEOUT = 10000;
private final MovingAverage average = new MovingAverage(0.2, INITIAL_TIMEOUT);
private int timeout;
// Constants exchanged with replica to manage client ID's
public static final char HAVE_CLIENT_ID = 'F';
public static final char REQUEST_NEW_ID = 'T';
// List of replicas, and information who's the leader
private final List |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
*/
public Client(Configuration config) throws IOException {
this.replicas = config.getProcesses();
<<<<<<< HEAD
this.n = replicas.size();
/*
* Randomize replica for initial connection. This avoids the thundering
* herd problem when many clients are started simultaneously and all
* connect to the same replicas.
*/
primary = r.nextInt(n);
this.benchmarkRun = config.getBooleanProperty(BENCHMARK_RUN_CLIENT,
DEFAULT_BENCHMARK_RUN_CLIENT);
=======
contactReplicaId = null;
}
/**
* Creates new connection used by client to connect to replicas.
*
* Unless a redirect is received, uses ONLY the replica whose ID is declared
* as contactReplicaId.
*
* @param config - the configuration with information about replicas to
* connect to
* @throws IOException if I/O error occurs while reading configuration
*/
public Client(Configuration config, int contactReplicaId) {
this.contactReplicaId = contactReplicaId;
this.replicas = config.getProcesses();
}
/**
* Creates new connection used by client to connect to replicas.
*
* Loads the configuration from the default configuration file, as defined
* in the class {@link Configuration}
*
*
* Unless a redirect is received, uses ONLY the replica whose ID is declared
* as contactReplicaId.
*
* @param config - the configuration with information about replicas to
* connect to
* @throws IOException if I/O error occurs while reading configuration
*/
public Client(int contactReplicaId) throws IOException {
this.replicas = new Configuration().getProcesses();
this.contactReplicaId = contactReplicaId;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
/** |
| Solution content |
|---|
*/
public Client(Configuration config) throws IOException {
this.replicas = config.getProcesses();
contactReplicaId = null;
}
/**
* Creates new connection used by client to connect to replicas.
*
* Unless a redirect is received, uses ONLY the replica whose ID is declared
* as contactReplicaId.
*
* @param config - the configuration with information about replicas to
* connect to
* @throws IOException if I/O error occurs while reading configuration
*/
public Client(Configuration config, int contactReplicaId) {
this.contactReplicaId = contactReplicaId;
this.replicas = config.getProcesses();
}
/**
* Creates new connection used by client to connect to replicas.
*
* Loads the configuration from the default configuration file, as defined
* in the class {@link Configuration}
*
*
* Unless a redirect is received, uses ONLY the replica whose ID is declared
* as contactReplicaId.
*
* @param config - the configuration with information about replicas to
* connect to
* @throws IOException if I/O error occurs while reading configuration
*/
public Client(int contactReplicaId) throws IOException {
this.replicas = new Configuration().getProcesses();
this.contactReplicaId = contactReplicaId;
}
/** |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| Method declaration |
| Method invocation |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
public synchronized byte[] execute(byte[] bytes) throws ReplicationException {
ClientRequest request = new ClientRequest(nextRequestId(), bytes);
ClientCommand command = new ClientCommand(CommandType.REQUEST, request);
<<<<<<< HEAD
=======
ByteBuffer bb = ByteBuffer.allocate(command.byteSize());
command.writeTo(bb);
bb.flip();
byte[] requestBA = bb.array();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
long start = System.currentTimeMillis();
while (true) { |
| Solution content |
|---|
public synchronized byte[] execute(byte[] bytes) throws ReplicationException {
ClientRequest request = new ClientRequest(nextRequestId(), bytes);
ClientCommand command = new ClientCommand(CommandType.REQUEST, request);
ByteBuffer bb = ByteBuffer.allocate(command.byteSize());
command.writeTo(bb);
bb.flip();
byte[] requestBA = bb.array();
long start = System.currentTimeMillis();
while (true) { |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
logger.fine("Sending " + request.getRequestId());
}
<<<<<<< HEAD
ByteBuffer bb = ByteBuffer.allocate(command.byteSize());
command.writeTo(bb);
// TODO: JK: why flip? Is this needed?
bb.flip();
output.write(bb.array());
output.flush();
// Blocks only for Socket.SO_TIMEOUT
stats.requestSent(request.getRequestId());
=======
output.write(requestBA);
output.flush();
// Blocks only for socket timeout
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
ClientReply clientReply = new ClientReply(input);
switch (clientReply.getResult()) { |
| Solution content |
|---|
logger.fine("Sending " + request.getRequestId());
}
output.write(requestBA);
output.flush();
// Blocks only for socket timeout
ClientReply clientReply = new ClientReply(input);
switch (clientReply.getResult()) { |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
reply.getRequestId();
long time = System.currentTimeMillis() - start;
<<<<<<< HEAD
stats.replyOk(reply.getRequestId());
timeoutAverage.add(time);
timeout = (int) timeoutAverage.get() * TO_MULTIPLIER;
socket.setSoTimeout(Math.min(timeout, MAX_TIMEOUT));
=======
average.add(time);
// update socket timeout as 3 times average response
// time
updateTimeout();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
return reply.getValue();
case REDIRECT: |
| Solution content |
|---|
reply.getRequestId();
long time = System.currentTimeMillis() - start;
average.add(time);
// update socket timeout as 3 times average response
// time
updateTimeout();
return reply.getValue();
case REDIRECT: |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Cast expression |
| Comment |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
// Invalid ID. Ignore redirect and try next replica.
logger.warning("Reply: Invalid redirect received: " + currentPrimary +
". Proceeding with next replica.");
<<<<<<< HEAD
currentPrimary = (primary + 1) % n;
=======
currentPrimary = nextReplica();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
} else {
logger.info("Reply REDIRECT to " + currentPrimary);
} |
| Solution content |
|---|
// Invalid ID. Ignore redirect and try next replica.
logger.warning("Reply: Invalid redirect received: " + currentPrimary +
". Proceeding with next replica.");
currentPrimary = nextReplica();
} else {
logger.info("Reply REDIRECT to " + currentPrimary);
} |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
} else {
logger.info("Reply REDIRECT to " + currentPrimary);
}
<<<<<<< HEAD
waitForReconnect(REDIRECT_TIMEOUT);
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
reconnect(currentPrimary);
break;
|
| Solution content |
|---|
} else {
logger.info("Reply REDIRECT to " + currentPrimary);
}
reconnect(currentPrimary);
break;
|
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
case NACK:
throw new ReplicationException("Nack received: " +
new String(clientReply.getValue()));
<<<<<<< HEAD
case BUSY:
stats.replyBusy();
throw new ReplicationException(new String(clientReply.getValue()));
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
default:
throw new RuntimeException("Unknown reply type"); |
| Solution content |
|---|
case NACK:
throw new ReplicationException("Nack received: " +
new String(clientReply.getValue()));
default:
throw new RuntimeException("Unknown reply type"); |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Case statement |
| Method invocation |
| Throw statement |
| Chunk |
|---|
| Conflicting content |
|---|
} catch (SocketTimeoutException e) {
logger.warning("Error waiting for answer: " + e.getMessage() + ", Request: " +
<<<<<<< HEAD
request.getRequestId() + ", node: " + primary);
stats.replyTimeout();
=======
request.getRequestId());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
cleanClose();
increaseTimeout();
connect(); |
| Solution content |
|---|
} catch (SocketTimeoutException e) {
logger.warning("Error waiting for answer: " + e.getMessage() + ", Request: " +
request.getRequestId());
cleanClose();
increaseTimeout();
connect(); |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
connect();
} catch (IOException e) {
logger.warning("Error reading socket: " + e.toString() + ". Request: " +
<<<<<<< HEAD
request.getRequestId() + ", node: " + primary);
=======
request.getRequestId());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
waitForReconnect(CONNECTION_FAILURE_TIMEOUT);
connect();
} |
| Solution content |
|---|
connect();
} catch (IOException e) {
logger.warning("Error reading socket: " + e.toString() + ". Request: " +
request.getRequestId());
waitForReconnect(CONNECTION_FAILURE_TIMEOUT);
connect();
} |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
* client id is granted which will be used for sending all messages.
*/
public synchronized void connect() {
<<<<<<< HEAD
reconnect((primary + 1) % n);
}
private RequestId nextRequestId() {
return new RequestId(clientId, ++sequenceId);
}
private void increaseTimeout() {
timeoutAverage.add(timeout * TO_MULTIPLIER);
if (timeoutAverage.get() <= 0 || timeoutAverage.get() > 600000) {
timeoutAverage.reset(600000);
}
=======
reconnect(nextReplica());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
/** |
| Solution content |
|---|
*/
* client id is granted which will be used for sending all messages.
public synchronized void connect() {
reconnect(nextReplica());
}
/** |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| If statement |
| Method declaration |
| Method invocation |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
return;
} catch (IOException e) {
cleanClose();
<<<<<<< HEAD
logger.warning("Connect to " + nextNode + " failed: " + e.getMessage());
nextNode = (nextNode + 1) % n;
waitForReconnect(CONNECTION_FAILURE_TIMEOUT);
}
}
}
private void waitForReconnect(int timeout) {
try {
// random backoff
timeout += r.nextInt(500);
logger.warning("Reconnecting in " + timeout + "ms.");
Thread.sleep(timeout);
} catch (InterruptedException e) {
logger.warning("Interrupted while sleeping: " + e.getMessage());
// Set the interrupt flag again, it will result in an
// InterruptException being thrown again the next time this thread
// tries to block.
Thread.currentThread().interrupt();
}
}
private void cleanClose() {
try {
if (socket != null) {
socket.shutdownOutput();
socket.close();
socket = null;
logger.info("Closing socket");
=======
logger.warning("Connect to " + nr + " failed: " + e.getMessage());
waitForReconnect(CONNECTION_FAILURE_TIMEOUT);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
} |
| Solution content |
|---|
return;
} catch (IOException e) {
cleanClose();
logger.warning("Connect to " + nr + " failed: " + e.getMessage());
waitForReconnect(CONNECTION_FAILURE_TIMEOUT);
}
}
} |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| If statement |
| Method declaration |
| Method invocation |
| Method signature |
| Try statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
cleanClose();
PID replica = replicas.get(replicaId);
<<<<<<< HEAD
String host = replica.getHostname();
int port = replica.getClientPort();
logger.info("Connecting to " + host + ":" + port);
socket = new Socket(host, port);
timeout = (int) timeoutAverage.get() * TO_MULTIPLIER;
socket.setSoTimeout(Math.min(timeout, MAX_TIMEOUT));
=======
String host = replica.getHostname();
int port = replica.getClientPort();
logger.info("Connecting to " + replica);
socket = new Socket(host, port);
updateTimeout();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
socket.setReuseAddress(true);
socket.setTcpNoDelay(true);
output = new DataOutputStream(socket.getOutputStream()); |
| Solution content |
|---|
cleanClose();
PID replica = replicas.get(replicaId);
String host = replica.getHostname();
int port = replica.getClientPort();
logger.info("Connecting to " + replica);
socket = new Socket(host, port);
updateTimeout();
socket.setReuseAddress(true);
socket.setTcpNoDelay(true);
output = new DataOutputStream(socket.getOutputStream()); |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Cast expression |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
output.write(REQUEST_NEW_ID);
output.flush();
clientId = input.readLong();
<<<<<<< HEAD
this.stats = benchmarkRun ? new ClientStats.ClientStatsImpl(clientId)
: new ClientStats.ClientStatsNull();
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
logger.fine("New client id: " + clientId);
} else {
output.write(HAVE_CLIENT_ID); |
| Solution content |
|---|
output.write(REQUEST_NEW_ID);
output.flush();
clientId = input.readLong();
logger.fine("New client id: " + clientId);
} else {
output.write(HAVE_CLIENT_ID); |
| File |
|---|
| Client.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
super();
}
<<<<<<< HEAD
public SerializableClient(Configuration config) throws IOException {
super(config);
}
=======
public SerializableClient(Configuration conf) throws IOException {
super(conf);
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
/**
* Creates new connection used by client to connect to replicas.
* |
| Solution content |
|---|
super();
}
public SerializableClient(Configuration conf) throws IOException {
super(conf);
}
/**
* Creates new connection used by client to connect to replicas.
* |
| File |
|---|
| SerializableClient.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method declaration |
| Chunk |
|---|
| Conflicting content |
|---|
package lsr.paxos.core; <<<<<<< HEAD:src/lsr/paxos/core/Learner.java ======= import static lsr.common.ProcessDescriptor.processDescriptor; >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/Learner.java import java.util.logging.Level; import java.util.logging.Logger; |
| Solution content |
|---|
package lsr.paxos.core; import static lsr.common.ProcessDescriptor.processDescriptor; import java.util.logging.Level; import java.util.logging.Logger; |
| File |
|---|
| Learner.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
proposer.stopPropose(instance.getId(), sender);
}
<<<<<<< HEAD:src/lsr/paxos/core/Learner.java
if (instance.acceptedByMajority()) {
=======
if (instance.isMajority()) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/Learner.java
if (instance.getValue() == null) {
if (logger.isLoggable(Level.FINE)) {
logger.fine("Majority but no value. Delaying deciding. Instance: " + |
| Solution content |
|---|
proposer.stopPropose(instance.getId(), sender);
}
if (instance.isMajority()) {
if (instance.getValue() == null) {
if (logger.isLoggable(Level.FINE)) {
logger.fine("Majority but no value. Delaying deciding. Instance: " + |
| File |
|---|
| Learner.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| If statement |
| Chunk |
|---|
| Conflicting content |
|---|
package lsr.paxos.core;
<<<<<<< HEAD:src/lsr/paxos/core/Proposer.java
import lsr.common.ClientRequest;
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/Proposer.java
import lsr.paxos.messages.PrepareOK;
public interface Proposer { |
| Solution content |
|---|
package lsr.paxos.core;
import lsr.paxos.messages.PrepareOK;
public interface Proposer { |
| File |
|---|
| Proposer.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
public void onPrepareOK(PrepareOK msg, int sender);
<<<<<<< HEAD:src/lsr/paxos/core/Proposer.java
public void propose(ClientRequest[] requests, byte[] value);
=======
public void propose(byte[] value);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/Proposer.java
public void prepareNextView();
|
| Solution content |
|---|
public void onPrepareOK(PrepareOK msg, int sender);
public void propose(byte[] value);
public void prepareNextView();
|
| File |
|---|
| Proposer.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method interface |
| Chunk |
|---|
| Conflicting content |
|---|
package lsr.paxos.core; <<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java ======= import static lsr.common.ProcessDescriptor.processDescriptor; >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java import java.nio.ByteBuffer; import java.util.ArrayDeque; |
| Solution content |
|---|
package lsr.paxos.core; import static lsr.common.ProcessDescriptor.processDescriptor; import java.nio.ByteBuffer; import java.util.ArrayDeque; |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
import java.util.logging.Level; import java.util.logging.Logger; <<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java import lsr.common.ProcessDescriptor; import lsr.common.ClientRequest; import lsr.paxos.ActiveRetransmitter; import lsr.paxos.EpochPrepareRetransmitter; import lsr.paxos.FailureDetector; import lsr.paxos.PrepareRetransmitter; import lsr.paxos.PrepareRetransmitterImpl; import lsr.paxos.RetransmittedMessage; import lsr.paxos.Retransmitter; ======= import lsr.common.CrashModel; import lsr.paxos.ActiveRetransmitter; import lsr.paxos.EpochPrepareRetransmitter; import lsr.paxos.PrepareRetransmitter; import lsr.paxos.PrepareRetransmitterImpl; import lsr.paxos.RetransmittedMessage; >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java import lsr.paxos.messages.Message; import lsr.paxos.messages.Prepare; import lsr.paxos.messages.PrepareOK; |
| Solution content |
|---|
import java.util.logging.Level; import java.util.logging.Logger; import lsr.common.CrashModel; import lsr.paxos.ActiveRetransmitter; import lsr.paxos.EpochPrepareRetransmitter; import lsr.paxos.PrepareRetransmitter; import lsr.paxos.PrepareRetransmitterImpl; import lsr.paxos.RetransmittedMessage; import lsr.paxos.messages.Message; import lsr.paxos.messages.Prepare; import lsr.paxos.messages.PrepareOK; |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
private final ActiveRetransmitter retransmitter;
private final Paxos paxos;
private final Storage storage;
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
private final FailureDetector failureDetector;
private ProposerState state;
=======
private ProposerState state;
private ClientBatchManager cliBatchManager;
/** Locked on the array, modifies the int inside. */
private final int[] waitingHooks = new int[] {0};
private final ArrayList |
| Solution content |
|---|
private final ActiveRetransmitter retransmitter;
private final Paxos paxos;
private final Storage storage;
private ProposerState state;
private ClientBatchManager cliBatchManager;
/** Locked on the array, modifies the int inside. */
private final int[] waitingHooks = new int[] {0};
private final ArrayList |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
*/
public ProposerImpl(Paxos paxos,
Network network,
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
FailureDetector failureDetector,
Storage storage,
CrashModel crashModel) {
this.paxos = paxos;
this.failureDetector = failureDetector;
this.storage = storage;
=======
Storage storage,
CrashModel crashModel)
{
this.paxos = paxos;
this.storage = storage;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
retransmitter = new ActiveRetransmitter(network, "ProposerRetransmitter");
this.state = ProposerState.INACTIVE; |
| Solution content |
|---|
*/
public ProposerImpl(Paxos paxos,
Network network,
Storage storage,
CrashModel crashModel)
{
this.paxos = paxos;
this.storage = storage;
retransmitter = new ActiveRetransmitter(network, "ProposerRetransmitter");
this.state = ProposerState.INACTIVE; |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
this.state = ProposerState.INACTIVE;
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
Retransmitter retransmitter = new ActiveRetransmitter(network, "PrepareRetransmitter");
retransmitter.init();
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
if (crashModel == CrashModel.EpochSS) {
prepareRetransmitter = new EpochPrepareRetransmitter(retransmitter, storage);
} else { |
| Solution content |
|---|
this.state = ProposerState.INACTIVE;
if (crashModel == CrashModel.EpochSS) {
prepareRetransmitter = new EpochPrepareRetransmitter(retransmitter, storage);
} else { |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
} else {
prepareRetransmitter = new PrepareRetransmitterImpl(retransmitter);
}
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
QueueMonitor.getInstance().registerQueue("pendingProposals", pendingProposals);
}
public void start() {
=======
}
public void setClientRequestManager(ClientRequestManager requestManager) {
cliBatchManager = requestManager.getClientBatchManager();
}
public void start() {
assert !processDescriptor.indirectConsensus || cliBatchManager != null;
retransmitter.init();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
}
/** |
| Solution content |
|---|
} else {
prepareRetransmitter = new PrepareRetransmitterImpl(retransmitter);
}
}
public void setClientRequestManager(ClientRequestManager requestManager) {
cliBatchManager = requestManager.getClientBatchManager();
}
public void start() {
assert !processDescriptor.indirectConsensus || cliBatchManager != null;
retransmitter.init();
}
/** |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Method declaration |
| Method invocation |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
state = ProposerState.PREPARING;
setNextViewNumber();
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
failureDetector.viewChange(storage.getView());
Prepare prepare = new Prepare(storage.getView(), storage.getFirstUncommitted());
logger.warning("Preparing view: " + storage.getView());
BitSet acceptors = storage.getAcceptors();
acceptors.clear(ProcessDescriptor.processDescriptor.localId);
prepareRetransmitter.startTransmitting(prepare, acceptors);
onPrepareOK(new PrepareOK(storage.getView(), new ConsensusInstance[0], storage.getEpoch()),
ProcessDescriptor.processDescriptor.localId);
=======
logger.warning("Preparing view: " + storage.getView());
Prepare prepare = new Prepare(storage.getView(), storage.getFirstUncommitted());
prepareRetransmitter.startTransmitting(prepare, Network.OTHERS);
if (processDescriptor.indirectConsensus)
fetchLocalMissingBatches();
// tell that local process is already prepared
prepareRetransmitter.update(null, processDescriptor.localId);
// unlikely, unless N==1
if (prepareRetransmitter.isMajority()) {
onMajorityOfPrepareOK();
}
}
private void fetchLocalMissingBatches() {
for (ConsensusInstance instane : storage.getLog().getInstanceMap().tailMap(
storage.getFirstUncommitted()).values()) {
if (instane.getState() == LogEntryState.KNOWN &&
!ClientBatchStore.instance.hasAllBatches(instane.getClientBatchIds())) {
waitingHooks[0]++;
FwdBatchRetransmitter fbr = cliBatchManager.fetchMissingBatches(
instane.getClientBatchIds(),
new ClientBatchManager.Hook() {
public void hook() {
synchronized (waitingHooks) {
if (Thread.interrupted())
return;
waitingHooks[0]--;
waitingHooks.notifyAll();
}
}
}, true);
waitingFBRs.add(fbr);
}
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
}
private void setNextViewNumber() { |
| Solution content |
|---|
state = ProposerState.PREPARING;
setNextViewNumber();
logger.warning("Preparing view: " + storage.getView());
Prepare prepare = new Prepare(storage.getView(), storage.getFirstUncommitted());
prepareRetransmitter.startTransmitting(prepare, Network.OTHERS);
if (processDescriptor.indirectConsensus)
fetchLocalMissingBatches();
// tell that local process is already prepared
prepareRetransmitter.update(null, processDescriptor.localId);
// unlikely, unless N==1
if (prepareRetransmitter.isMajority()) {
onMajorityOfPrepareOK();
}
}
private void fetchLocalMissingBatches() {
for (ConsensusInstance instane : storage.getLog().getInstanceMap().tailMap(
storage.getFirstUncommitted()).values()) {
if (instane.getState() == LogEntryState.KNOWN &&
!ClientBatchStore.instance.hasAllBatches(instane.getClientBatchIds())) {
waitingHooks[0]++;
FwdBatchRetransmitter fbr = cliBatchManager.fetchMissingBatches(
instane.getClientBatchIds(),
new ClientBatchManager.Hook() {
public void hook() {
synchronized (waitingHooks) {
if (Thread.interrupted())
return;
waitingHooks[0]--;
waitingHooks.notifyAll();
}
}
}, true);
waitingFBRs.add(fbr);
}
}
}
private void setNextViewNumber() { |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| For statement |
| If statement |
| Method invocation |
| Method signature |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
// asserting the same again. Who knows what happens in between?
assert message.getView() == storage.getView() : "Received a PrepareOK for a higher or lower view. " +
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
"Msg.view: " +
message.getView() +
", view: " + storage.getView();
=======
"Msg.view: " + message.getView() +
", view: " + storage.getView();
logger.info("Received from " + sender + ": " + message);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
// Ignore prepareOK messages if we have finished preparing
if (state == ProposerState.PREPARED) { |
| Solution content |
|---|
// asserting the same again. Who knows what happens in between?
assert message.getView() == storage.getView() : "Received a PrepareOK for a higher or lower view. " +
"Msg.view: " + message.getView() +
", view: " + storage.getView();
logger.info("Received from " + sender + ": " + message);
// Ignore prepareOK messages if we have finished preparing
if (state == ProposerState.PREPARED) { |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
if (state == ProposerState.PREPARED) {
if (logger.isLoggable(Level.FINE)) {
logger.fine("View " + storage.getView() +
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
" already prepared. Ignoring PrepareOK from " + sender);
=======
" already prepared. Ignoring message.");
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
}
return;
} |
| Solution content |
|---|
if (state == ProposerState.PREPARED) {
if (logger.isLoggable(Level.FINE)) {
logger.fine("View " + storage.getView() +
" already prepared. Ignoring message.");
}
return;
} |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
}
}
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
private void stopPreparingStartProposing() {
prepareRetransmitter.stopAndDestroy();
retransmitter.init();
=======
private void onMajorityOfPrepareOK() {
prepareRetransmitter.stop();
logger.info("Majority of PrepareOK gathered. Waiting for " + waitingHooks[0] +
" missing batch values");
long timeout = System.currentTimeMillis() + processDescriptor.maxBatchFetchingTimeoutMs;
// wait for all batch values to arrive
synchronized (waitingHooks) {
while (waitingHooks[0] > 0)
try {
long timeLeft = timeout - System.currentTimeMillis();
if (timeLeft <= 0) {
logger.warning("Could not fetch batch values - restarting view change");
for (FwdBatchRetransmitter fbr : waitingFBRs)
cliBatchManager.removeTask(fbr);
waitingFBRs.clear();
waitingHooks[0] = 0;
prepareNextView();
return;
}
waitingHooks.wait(timeLeft);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
waitingFBRs.clear();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
state = ProposerState.PREPARED;
logger.warning("View prepared " + storage.getView()); |
| Solution content |
|---|
}
}
private void onMajorityOfPrepareOK() {
prepareRetransmitter.stop();
logger.info("Majority of PrepareOK gathered. Waiting for " + waitingHooks[0] +
" missing batch values");
long timeout = System.currentTimeMillis() + processDescriptor.maxBatchFetchingTimeoutMs;
// wait for all batch values to arrive
synchronized (waitingHooks) {
while (waitingHooks[0] > 0)
try {
long timeLeft = timeout - System.currentTimeMillis();
if (timeLeft <= 0) {
logger.warning("Could not fetch batch values - restarting view change");
for (FwdBatchRetransmitter fbr : waitingFBRs)
cliBatchManager.removeTask(fbr);
waitingFBRs.clear();
waitingHooks[0] = 0;
prepareNextView();
return;
}
waitingHooks.wait(timeLeft);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
waitingFBRs.clear();
state = ProposerState.PREPARED;
logger.warning("View prepared " + storage.getView()); |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Method signature |
| Synchronized statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
break;
case KNOWN:
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
// No decision, but some process already accepted it.
=======
// No decision, but some value is known
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
logger.info("Proposing value from previous view: " + instance);
instance.setView(storage.getView());
continueProposal(instance); |
| Solution content |
|---|
break;
case KNOWN:
// No decision, but some value is known
logger.info("Proposing value from previous view: " + instance);
instance.setView(storage.getView());
continueProposal(instance); |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
}
private void fillWithNoOperation(ConsensusInstance instance) {
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
ByteBuffer bb = ByteBuffer.allocate(4 + ClientRequest.NOP.byteSize());
bb.putInt(1); // Size of batch
ClientRequest.NOP.writeTo(bb); // request
instance.setValue(storage.getView(), bb.array());
=======
ByteBuffer bb = ByteBuffer.allocate(4 + ClientBatchID.NOP.byteSize());
bb.putInt(1); // Size of batch
ClientBatchID.NOP.writeTo(bb); // request
instance.updateStateFromKnown(storage.getView(), bb.array());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
continueProposal(instance);
}
|
| Solution content |
|---|
}
private void fillWithNoOperation(ConsensusInstance instance) {
ByteBuffer bb = ByteBuffer.allocate(4 + ClientBatchID.NOP.byteSize());
bb.putInt(1); // Size of batch
ClientBatchID.NOP.writeTo(bb); // request
instance.updateStateFromKnown(storage.getView(), bb.array());
continueProposal(instance);
}
|
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
if (message.getPrepared() == null) {
return;
}
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
// logger.warning(message.toString());
// Update the local log with the data sent by this process
for (int i = 0; i < message.getPrepared().length; i++) {
ConsensusInstance ci = message.getPrepared()[i];
// logger.warning(ci.toString());
=======
// Update the local log with the data sent by this process
for (final ConsensusInstance ci : message.getPrepared()) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
// Algorithm: The received instance can be either
// Decided - Set the local log entry to decided.
// Accepted - If the local log entry is decided, ignore. |
| Solution content |
|---|
if (message.getPrepared() == null) {
return;
}
// Update the local log with the data sent by this process
for (final ConsensusInstance ci : message.getPrepared()) {
// Algorithm: The received instance can be either
// Decided - Set the local log entry to decided.
// Accepted - If the local log entry is decided, ignore. |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Array access |
| Comment |
| For statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
}
final class Proposal implements Runnable {
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
final ClientRequest[] requests;
final byte[] value;
public Proposal(ClientRequest[] requests, byte[] value) {
this.requests = requests;
=======
final byte[] value;
public Proposal(byte[] value) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
this.value = value;
}
|
| Solution content |
|---|
}
final class Proposal implements Runnable {
final byte[] value;
public Proposal(byte[] value) {
this.value = value;
}
|
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
=======
*/
private boolean acceptNewBatches = false;
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
@Override
public void enqueueProposal(ClientRequest[] requests, byte[] value)
throws InterruptedException {
// Called from batcher thread
Proposal proposal = new Proposal(requests, value);
public void enqueueProposal(byte[] value)
throws InterruptedException
{
// Called from batcher thread
Proposal proposal = new Proposal(value);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
if (logger.isLoggable(Level.FINE)) {
logger.fine("pendingProposals.size() = " + pendingProposals.size() + ", MAX: " +
MAX_QUEUED_PROPOSALS); |
| Solution content |
|---|
*/
private boolean acceptNewBatches = false;
public void enqueueProposal(byte[] value)
throws InterruptedException
{
// Called from batcher thread
Proposal proposal = new Proposal(value);
if (logger.isLoggable(Level.FINE)) {
logger.fine("pendingProposals.size() = " + pendingProposals.size() + ", MAX: " +
MAX_QUEUED_PROPOSALS); |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Annotation |
| Comment |
| Method invocation |
| Method signature |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
logger.fine("Proposer not active.");
return;
}
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
// Do we have to "wake up" the Protocol thread? Remember flag to do
// it after releasing the lock
wasEmpty = pendingProposals.isEmpty();
pendingProposals.addLast(proposal);
}
if (wasEmpty) {
paxos.getDispatcher().dispatch(proposal);
=======
boolean wasEmpty = pendingProposals.isEmpty();
pendingProposals.addLast(proposal);
if (wasEmpty) {
logger.info("Scheduling proposal task");
try {
paxos.getDispatcher().submit(proposal);
} catch (Exception e) {
e.printStackTrace();
}
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
}
if (logger.isLoggable(Level.FINE)) {
logger.fine("Enqueued proposal. pendingProposal.size(): " + pendingProposals.size()); |
| Solution content |
|---|
logger.fine("Proposer not active.");
return;
}
boolean wasEmpty = pendingProposals.isEmpty();
pendingProposals.addLast(proposal);
if (wasEmpty) {
logger.info("Scheduling proposal task");
try {
paxos.getDispatcher().submit(proposal);
} catch (Exception e) {
e.printStackTrace();
}
}
}
if (logger.isLoggable(Level.FINE)) {
logger.fine("Enqueued proposal. pendingProposal.size(): " + pendingProposals.size()); |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| If statement |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
}
}
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
private void proposeNext() {
Proposal proposal;
=======
public void proposeNext() {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
if (logger.isLoggable(Level.FINE)) {
logger.info("Proposing. pendingProposals.size(): " + pendingProposals.size() +
", window used: " + storage.getWindowUsed()); |
| Solution content |
|---|
}
}
public void proposeNext() {
if (logger.isLoggable(Level.FINE)) {
logger.info("Proposing. pendingProposals.size(): " + pendingProposals.size() +
", window used: " + storage.getWindowUsed()); |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method signature |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
* @param value - the value to propose
* @throws InterruptedException
*/
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
public void propose(ClientRequest[] requests, byte[] value) {
assert paxos.getDispatcher().amIInDispatcher();
if (state != ProposerState.PREPARED) {
// This can happen if there is a Propose event queued on the
// Dispatcher when the view changes.
=======
public void propose(byte[] value) {
assert paxos.getDispatcher().amIInDispatcher();
if (state != ProposerState.PREPARED) {
/*
* This can happen if there is a Propose event queued on the
* Dispatcher when the view changes.
*/
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
logger.warning("Cannot propose in INACTIVE or PREPARING state. Discarding batch");
return;
} |
| Solution content |
|---|
* @param value - the value to propose
* @throws InterruptedException
*/
public void propose(byte[] value) {
assert paxos.getDispatcher().amIInDispatcher();
if (state != ProposerState.PREPARED) {
/*
* This can happen if there is a Propose event queued on the
* Dispatcher when the view changes.
*/
logger.warning("Cannot propose in INACTIVE or PREPARING state. Discarding batch");
return;
} |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Comment |
| If statement |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
if (logger.isLoggable(Level.INFO)) {
/** Builds the string with the log message */
StringBuilder sb = new StringBuilder(64);
sb.append("Proposing: ").append(storage.getLog().getNextId()).append(", Reqs:");
for (ClientRequest req : requests) {
sb.append(req.getRequestId().toString()).append(",");
}
sb.append(" Size:").append(value.length);
sb.append(", k=").append(requests.length);
=======
sb.append("Proposing: ").append(storage.getLog().getNextId()).append(", Size:");
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
logger.info(sb.toString());
}
|
| Solution content |
|---|
if (logger.isLoggable(Level.INFO)) {
/** Builds the string with the log message */
StringBuilder sb = new StringBuilder(64);
sb.append("Proposing: ").append(storage.getLog().getNextId()).append(", Size:");
logger.info(sb.toString());
}
|
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| For statement |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
ConsensusInstance instance = storage.getLog().append(storage.getView(), value);
<<<<<<< HEAD:src/lsr/paxos/core/ProposerImpl.java
ReplicaStats.getInstance().consensusStart(
instance.getId(),
value.length,
requests.length,
storage.getWindowUsed());
// creating retransmitter, which automatically starts
// sending propose message to all acceptors
Message message = new Propose(instance);
BitSet destinations = storage.getAcceptors();
=======
assert !processDescriptor.indirectConsensus ||
ClientBatchStore.instance.hasAllBatches(instance.getClientBatchIds());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/core/ProposerImpl.java
// Mark the instance as accepted locally
instance.getAccepts().set(processDescriptor.localId); |
| Solution content |
|---|
ConsensusInstance instance = storage.getLog().append(storage.getView(), value);
assert !processDescriptor.indirectConsensus ||
ClientBatchStore.instance.hasAllBatches(instance.getClientBatchIds());
// Mark the instance as accepted locally
instance.getAccepts().set(processDescriptor.localId); |
| File |
|---|
| ProposerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Comment |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
CatchUpResponse,
CatchUpSnapshot,
<<<<<<< HEAD
Ping,
Pong,
Start,
Report,
ForwardedRequest,
ViewPrepared,
=======
ForwardedClientBatch,
AskForClientBatch,
ForwardedClientRequests,
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
// Special markers used by the network implementation to raise callbacks
// There are no classes with this messages types |
| Solution content |
|---|
CatchUpResponse,
CatchUpSnapshot,
ForwardedClientBatch,
AskForClientBatch,
ForwardedClientRequests,
// Special markers used by the network implementation to raise callbacks
// There are no classes with this messages types |
| File |
|---|
| MessageType.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Enum value |
| Chunk |
|---|
| Conflicting content |
|---|
import java.util.BitSet;
import java.util.logging.Logger;
<<<<<<< HEAD
import lsr.common.ProcessDescriptor;
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
import lsr.paxos.messages.Message;
public class GenericNetwork extends Network { |
| Solution content |
|---|
import java.util.BitSet;
import java.util.logging.Logger;
import lsr.paxos.messages.Message;
public class GenericNetwork extends Network { |
| File |
|---|
| GenericNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
private final TcpNetwork tcpNetwork;
public GenericNetwork(TcpNetwork tcpNetwork, UdpNetwork udpNetwork) {
<<<<<<< HEAD
super();
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
this.tcpNetwork = tcpNetwork;
this.udpNetwork = udpNetwork;
|
| Solution content |
|---|
private final TcpNetwork tcpNetwork;
public GenericNetwork(TcpNetwork tcpNetwork, UdpNetwork udpNetwork) {
this.tcpNetwork = tcpNetwork;
this.udpNetwork = udpNetwork; |
| File |
|---|
| GenericNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
}
// we using internal methods in networks, so listeners has to be handled
<<<<<<< HEAD
public void sendMessage(Message message, BitSet destinations) {
assert !destinations.isEmpty() : "Sending a message to noone";
int localId = ProcessDescriptor.processDescriptor.localId;
BitSet dests = (BitSet) destinations.clone();
if (dests.get(localId)) {
fireReceiveMessage(message, localId);
dests.clear(localId);
}
// serialize message to discover its size
byte[] data = message.toByteArray();
// send message using UDP or TCP
if (data.length < ProcessDescriptor.processDescriptor.maxUdpPacketSize) {
=======
public void send(Message message, BitSet destinations) {
// send message using UDP or TCP
if (message.byteSize() < processDescriptor.maxUdpPacketSize) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
// packet small enough to send using UDP
udpNetwork.sendMessage(message, destinations);
} else { |
| Solution content |
|---|
}
// we using internal methods in networks, so listeners has to be handled
public void send(Message message, BitSet destinations) {
// send message using UDP or TCP
if (message.byteSize() < processDescriptor.maxUdpPacketSize) {
// packet small enough to send using UDP
udpNetwork.sendMessage(message, destinations);
} else { |
| File |
|---|
| GenericNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Cast expression |
| Comment |
| If statement |
| Method invocation |
| Method signature |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
udpNetwork.sendMessage(message, destinations);
} else {
// big packet so send using TCP
<<<<<<< HEAD
for (int i = dests.nextSetBit(0); i >= 0; i = dests.nextSetBit(i + 1)) {
tcpNetwork.sendBytes(data, i);
}
=======
tcpNetwork.sendMessage(message, destinations);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
|
| Solution content |
|---|
udpNetwork.sendMessage(message, destinations);
} else {
// big packet so send using TCP
tcpNetwork.sendMessage(message, destinations);
}
}
|
| File |
|---|
| GenericNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| For statement |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
<<<<<<< HEAD
}
}
public void sendToAllButMe(Message message) {
sendMessage(message, allButMe);
=======
@Override
protected void send(Message message, int destination) {
// send message using UDP or TCP
if (message.byteSize() < processDescriptor.maxUdpPacketSize) {
// packet small enough to send using UDP
udpNetwork.send(message, destination);
} else {
// big packet so send using TCP
tcpNetwork.send(message, destination);
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
@SuppressWarnings("unused") |
| Solution content |
|---|
}
}
@Override
protected void send(Message message, int destination) {
// send message using UDP or TCP
if (message.byteSize() < processDescriptor.maxUdpPacketSize) {
// packet small enough to send using UDP
udpNetwork.send(message, destination);
} else {
// big packet so send using TCP
tcpNetwork.send(message, destination);
}
}
@SuppressWarnings("unused") |
| File |
|---|
| GenericNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Annotation |
| Comment |
| If statement |
| Method invocation |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
* @param message the message to send
* @param destination the id of replica to send message to
*/
<<<<<<< HEAD
public abstract void sendMessage(Message message, int destination);
/**
* ???
*
* @Deprecated: this method is extremely error-prone.
*/
@Deprecated
public abstract boolean send(byte[] message, int destination);
=======
final public void sendMessage(Message message, int destination) {
assert destination != localId : "sending unicast to self";
BitSet bs = new BitSet();
bs.set(destination);
send(message, destination);
fireSentMessage(message, bs);
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
/**
* Sends the message to processes with specified ids as a bitset |
| Solution content |
|---|
* @param message the message to send
* @param destination the id of replica to send message to
*/
final public void sendMessage(Message message, int destination) {
assert destination != localId : "sending unicast to self";
BitSet bs = new BitSet();
bs.set(destination);
send(message, destination);
fireSentMessage(message, bs);
}
/**
* Sends the message to processes with specified ids as a bitset |
| File |
|---|
| Network.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Annotation |
| Comment |
| Method declaration |
| Method interface |
| Chunk |
|---|
| Conflicting content |
|---|
}
/**
<<<<<<< HEAD
* Sends the message to all processes except self.
*
* @param message the message to send
*/
public abstract void sendToAllButMe(Message message);
public abstract void start();
=======
* Sends the message to all processes but the sender
*
* @param message the message to send
*/
final public void sendToOthers(Message message) {
sendMessage(message, OTHERS);
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
/**
* Adds a new message listener for a certain type of message or all messages |
| Solution content |
|---|
}
/**
* Sends the message to all processes but the sender
*
* @param message the message to send
*/
final public void sendToOthers(Message message) {
sendMessage(message, OTHERS);
}
/**
* Adds a new message listener for a certain type of message or all messages |
| File |
|---|
| Network.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method declaration |
| Method interface |
| Chunk |
|---|
| Conflicting content |
|---|
import lsr.common.KillOnExceptionHandler; import lsr.common.PID; <<<<<<< HEAD import lsr.common.ProcessDescriptor; ======= >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import lsr.paxos.messages.Message; import lsr.paxos.messages.MessageFactory; import lsr.paxos.statistics.QueueMonitor; |
| Solution content |
|---|
import lsr.common.KillOnExceptionHandler; import lsr.common.PID; import lsr.paxos.messages.Message; import lsr.paxos.messages.MessageFactory; |
| File |
|---|
| TcpConnection.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
private final Thread receiverThread;
private final ArrayBlockingQueue |
| Solution content |
|---|
private final Thread receiverThread;
private final ArrayBlockingQueue |
| File |
|---|
| TcpConnection.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Chunk |
|---|
| Conflicting content |
|---|
logger.info("Creating connection: " + replica + " - " + active);
<<<<<<< HEAD
this.receiverThread = new Thread(new ReceiverThread(), "ReplicaIORcv-" +
this.replica.getId());
this.senderThread = new Thread(new Sender(), "ReplicaIOSnd-" + this.replica.getId());
=======
receiverThread = new Thread(new ReceiverThread(), "ReplicaIORcv-" + replica.getId());
senderThread = new Thread(new Sender(), "ReplicaIOSnd-" + replica.getId());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
receiverThread.setUncaughtExceptionHandler(new KillOnExceptionHandler());
senderThread.setUncaughtExceptionHandler(new KillOnExceptionHandler());
|
| Solution content |
|---|
logger.info("Creating connection: " + replica + " - " + active);
receiverThread = new Thread(new ReceiverThread(), "ReplicaIORcv-" + replica.getId());
senderThread = new Thread(new Sender(), "ReplicaIOSnd-" + replica.getId());
receiverThread.setUncaughtExceptionHandler(new KillOnExceptionHandler());
senderThread.setUncaughtExceptionHandler(new KillOnExceptionHandler());
|
| File |
|---|
| TcpConnection.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
logger.info("Sender thread started.");
try {
while (true) {
<<<<<<< HEAD
byte[] msg = sendQueue.take();
// ignore message if not connected
// Works without memory barrier because connected is
// volatile
if (!connected) {
logger.log(Level.FINE, "Ignoting " + msg.length + " bytes");
continue;
}
try {
logger.log(Level.FINE, "Sending " + msg.length + " bytes");
output.write(msg);
output.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "Error sending message", e);
=======
// wait for connection
synchronized (connectedLock) {
while (!connected)
connectedLock.wait();
}
while (true) {
if (Thread.interrupted()) {
if (!closing)
logger.severe("Sender " + Thread.currentThread().getName() +
" thread has been interupted and stopped.");
return;
}
byte[] msg = sendQueue.take();
// ignore message if not connected
// Works without memory barrier because connected is
// volatile
if (!connected) {
sendQueue.offer(msg);
break;
}
try {
output.write(msg);
output.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "Error sending message", e);
close();
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
} catch (InterruptedException e) { |
| Solution content |
|---|
logger.info("Sender thread started.");
try {
while (true) {
// wait for connection
synchronized (connectedLock) {
while (!connected)
connectedLock.wait();
}
while (true) {
if (Thread.interrupted()) {
if (!closing)
logger.severe("Sender " + Thread.currentThread().getName() +
" thread has been interupted and stopped.");
return;
}
byte[] msg = sendQueue.take();
// ignore message if not connected
// Works without memory barrier because connected is
// volatile
if (!connected) {
sendQueue.offer(msg);
break;
}
try {
output.write(msg);
output.flush();
} catch (IOException e) {
logger.log(Level.WARNING, "Error sending message", e);
close();
}
}
}
} catch (InterruptedException e) { |
| File |
|---|
| TcpConnection.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Catch clause |
| Comment |
| If statement |
| Method invocation |
| Synchronized statement |
| Try statement |
| Variable |
| While statement |
| Chunk |
|---|
| Conflicting content |
|---|
logger.severe("Receiver thread has been interupted.");
break;
}
<<<<<<< HEAD
logger.warning("Tcp connected " + replica.getId());
=======
logger.info("Tcp connected " + replica.getId());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
while (true) {
if (Thread.interrupted()) { |
| Solution content |
|---|
logger.severe("Receiver thread has been interupted.");
break;
}
logger.info("Tcp connected " + replica.getId());
while (true) {
if (Thread.interrupted()) { |
| File |
|---|
| TcpConnection.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
* @param message - binary packet to send
* @return true if sending message was successful
*/
<<<<<<< HEAD
public boolean send(byte[] message) {
// TODO: JK: warning log messages here? Why's that?
try {
if (connected) {
// TODO: JK: why wait 10 milliseconds? What does it change?
// boolean enqueued = sendQueue.offer(message);
boolean enqueued = sendQueue.offer(message, 10, TimeUnit.MILLISECONDS);
if (!enqueued) {
if (droppedFull % 16 == 0) {
logger.warning("Dropping message, send queue full. To: " + replica.getId() +
". " + droppedFull);
}
droppedFull++;
}
} else {
if (dropped % 512 == 0) {
logger.warning("Dropping message, not connected. To: " + replica.getId() +
". " + dropped);
}
dropped++;
}
} catch (InterruptedException e) {
logger.warning("Thread interrupted. Terminating.");
Thread.currentThread().interrupt();
=======
public void send(byte[] message) {
try {
if (connected) {
long start = System.currentTimeMillis();
sendQueue.put(message);
int delta = (int) (System.currentTimeMillis() - start);
if (delta > 10) {
logger.warning("Wait time: " + delta);
}
} else {
// keep last n messages
while (!sendQueue.offer(message)) {
sendQueue.poll();
}
}
} catch (InterruptedException e) {
if (!closing) {
logger.warning("Thread interrupted. Terminating.");
Thread.currentThread().interrupt();
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
|
| Solution content |
|---|
* @param message - binary packet to send
* @return true if sending message was successful
*/
public void send(byte[] message) {
try {
if (connected) {
long start = System.currentTimeMillis();
sendQueue.put(message);
int delta = (int) (System.currentTimeMillis() - start);
if (delta > 10) {
logger.warning("Wait time: " + delta);
}
} else {
// keep last n messages
while (!sendQueue.offer(message)) {
sendQueue.poll();
}
}
} catch (InterruptedException e) {
if (!closing) {
logger.warning("Thread interrupted. Terminating.");
Thread.currentThread().interrupt();
}
}
}
|
| File |
|---|
| TcpConnection.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Catch clause |
| Comment |
| If statement |
| Method invocation |
| Method signature |
| Try statement |
| Chunk |
|---|
| Conflicting content |
|---|
* @param output - output stream from this socket
*/
public synchronized void setConnection(Socket socket, DataInputStream input,
<<<<<<< HEAD
OutputStream output) {
assert socket.isConnected() : "Invalid socket state";
// first close old connection
close();
=======
DataOutputStream output) {
assert socket != null : "Invalid socket state";
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
// initialize new connection
this.socket = socket; |
| Solution content |
|---|
* @param output - output stream from this socket
*/
public synchronized void setConnection(Socket socket, DataInputStream input,
DataOutputStream output) {
assert socket != null : "Invalid socket state";
// initialize new connection
this.socket = socket; |
| File |
|---|
| TcpConnection.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Comment |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
socket.setReceiveBufferSize(TCP_BUFFER_SIZE);
socket.setSendBufferSize(TCP_BUFFER_SIZE);
logger.warning("RcvdBuffer: " + socket.getReceiveBufferSize() +
<<<<<<< HEAD
", SendBuffer: " +
socket.getSendBufferSize());
=======
", SendBuffer: " + socket.getSendBufferSize());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
socket.setTcpNoDelay(true);
logger.warning("Connecting to: " + replica); |
| Solution content |
|---|
socket.setReceiveBufferSize(TCP_BUFFER_SIZE);
socket.setSendBufferSize(TCP_BUFFER_SIZE);
logger.warning("RcvdBuffer: " + socket.getReceiveBufferSize() +
", SendBuffer: " + socket.getSendBufferSize());
socket.setTcpNoDelay(true);
|
| File |
|---|
| TcpConnection.java |
| Developer's decision |
|---|
| Version 1 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
continue;
}
<<<<<<< HEAD
input = new DataInputStream(
new BufferedInputStream(socket.getInputStream()));
output = socket.getOutputStream();
int v = ProcessDescriptor.getInstance().localId;
output.write((v >>> 24) & 0xFF);
output.write((v >>> 16) & 0xFF);
output.write((v >>> 8) & 0xFF);
output.write((v >>> 0) & 0xFF);
=======
input = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
output = socket.getOutputStream();
byte buf[] = new byte[4];
ByteBuffer.wrap(buf).putInt(processDescriptor.localId);
output.write(buf);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
output.flush();
// connection established
break; |
| Solution content |
|---|
continue;
}
input = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
output = socket.getOutputStream();
byte buf[] = new byte[4];
ByteBuffer.wrap(buf).putInt(processDescriptor.localId);
output.write(buf);
output.flush();
// connection established
break; |
| File |
|---|
| TcpConnection.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
// connection established
break;
} catch (IOException e) {
<<<<<<< HEAD
// some other problem (possibly other side closes
// connection while initializing connection); for debug
// purpose we print this message
long sleepTime = ProcessDescriptor.getInstance().tcpReconnectTimeout;
logger.log(Level.WARNING, "Error connecting to " + replica +
". Reconnecting in " + sleepTime, e);
Thread.sleep(sleepTime);
=======
logger.log(Level.WARNING, "Error connecting to " + replica, e);
Thread.sleep(processDescriptor.tcpReconnectTimeout);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
|
| Solution content |
|---|
// connection established
break;
} catch (IOException e) {
logger.log(Level.WARNING, "Error connecting to " + replica, e);
Thread.sleep(processDescriptor.tcpReconnectTimeout);
}
}
|
| File |
|---|
| TcpConnection.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
<<<<<<< HEAD
closing = true;
connected = false;
if (socket != null && socket.isConnected()) {
logger.info("Closing socket ...");
connected = false;
=======
logger.info("Closing TCP connection to " + replica);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
try {
socket.shutdownOutput();
socket.close(); |
| Solution content |
|---|
closing = true;
connected = false;
if (socket != null && socket.isConnected()) {
logger.info("Closing TCP connection to " + replica);
try {
socket.shutdownOutput();
socket.close(); |
| File |
|---|
| TcpConnection.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
import lsr.paxos.messages.Message;
public class TcpNetwork extends Network implements Runnable {
<<<<<<< HEAD
private final TcpConnection[] connections;
=======
private final TcpConnection[][] activeConnections;
private final List |
| Solution content |
|---|
import lsr.paxos.messages.Message;
public class TcpNetwork extends Network implements Runnable {
private final TcpConnection[][] activeConnections;
private final List |
| File |
|---|
| TcpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
* @throws IOException if opening server socket fails
*/
public TcpNetwork() throws IOException {
<<<<<<< HEAD
super();
this.connections = new TcpConnection[processDescriptor.numReplicas];
logger.fine("Opening port: " + processDescriptor.getLocalProcess().getReplicaPort());
this.server = new ServerSocket();
=======
activeConnections = new TcpConnection[processDescriptor.numReplicas][2];
logger.info("Opening port: " + processDescriptor.getLocalProcess().getReplicaPort());
server = new ServerSocket();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
server.setReceiveBufferSize(256 * 1024);
server.bind(new InetSocketAddress((InetAddress) null,
processDescriptor.getLocalProcess().getReplicaPort())); |
| Solution content |
|---|
* @throws IOException if opening server socket fails
*/
public TcpNetwork() throws IOException {
activeConnections = new TcpConnection[processDescriptor.numReplicas][2];
logger.info("Opening port: " + processDescriptor.getLocalProcess().getReplicaPort());
server = new ServerSocket();
server.setReceiveBufferSize(256 * 1024);
server.bind(new InetSocketAddress((InetAddress) null,
processDescriptor.getLocalProcess().getReplicaPort())); |
| File |
|---|
| TcpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
@Override
public synchronized void start() {
if (!started) {
<<<<<<< HEAD
logger.fine("Starting TcpNetwork");
for (int i = 0; i < connections.length; i++) {
if (i < processDescriptor.localId) {
connections[i] = new TcpConnection(this,
processDescriptor.config.getProcess(i), false);
connections[i].start();
}
if (i > processDescriptor.localId) {
connections[i] = new TcpConnection(this,
processDescriptor.config.getProcess(i), true);
connections[i].start();
}
=======
for (int i = 0; i < activeConnections.length; i++) {
activeConnections[i][0] = null;
activeConnections[i][1] = null;
if (i == processDescriptor.localId)
continue;
TcpConnection tcpConn = new TcpConnection(this,
processDescriptor.config.getProcess(i), i, true);
allConnections.add(tcpConn);
tcpConn.start();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
acceptorThread.start();
started = true; |
| Solution content |
|---|
@Override
public void start() {
if (!started) {
for (int i = 0; i < activeConnections.length; i++) {
activeConnections[i][0] = null;
activeConnections[i][1] = null;
if (i == processDescriptor.localId)
continue;
TcpConnection tcpConn = new TcpConnection(this,
processDescriptor.config.getProcess(i), i, true);
allConnections.add(tcpConn);
tcpConn.start();
}
acceptorThread.start();
started = true; |
| File |
|---|
| TcpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Array access |
| For statement |
| If statement |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
acceptorThread.start();
started = true;
} else {
<<<<<<< HEAD
logger.warning("Starting TcpNetwork multiple times!");
=======
logger.warning("Starting TCP networkmultiple times!");
assert false;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
|
| Solution content |
|---|
acceptorThread.start();
started = true;
} else {
logger.warning("Starting TCP networkmultiple times!");
assert false;
}
}
|
| File |
|---|
| TcpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
* @param destination - id of replica to send data to
* @return true if message was sent; false if some error occurred
*/
<<<<<<< HEAD
@Deprecated
public boolean send(byte[] message, int destination) {
return sendBytes(message, destination);
}
/* package access */boolean sendBytes(byte[] message, int destination) {
assert destination != processDescriptor.localId;
return connections[destination].send(message);
=======
protected void send(byte[] message, int destination) {
if (activeConnections[destination][0] != null)
activeConnections[destination][0].send(message);
}
protected void send(Message message, int destination) {
send(message.toByteArray(), destination);
}
@Override
public void send(Message message, BitSet destinations) {
assert !destinations.isEmpty() : "Sending a message to no one";
byte[] bytes = message.toByteArray();
for (int i = destinations.nextSetBit(0); i >= 0; i = destinations.nextSetBit(i + 1)) {
send(bytes, i);
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
/** |
| Solution content |
|---|
* @param destination - id of replica to send data to
* @return true if message was sent; false if some error occurred
*/
protected void send(byte[] message, int destination) {
if (activeConnections[destination][0] != null)
activeConnections[destination][0].send(message);
}
protected void send(Message message, int destination) {
send(message.toByteArray(), destination);
}
@Override
public void send(Message message, BitSet destinations) {
assert !destinations.isEmpty() : "Sending a message to no one";
byte[] bytes = message.toByteArray();
for (int i = destinations.nextSetBit(0); i >= 0; i = destinations.nextSetBit(i + 1)) {
send(bytes, i);
}
}
/** |
| File |
|---|
| TcpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Annotation |
| Assert statement |
| For statement |
| Method declaration |
| Method invocation |
| Method signature |
| Return statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
socket.close();
return;
}
<<<<<<< HEAD
assert replicaId != processDescriptor.localId : "Remote replica has same id as local";
=======
if (replicaId == processDescriptor.localId) {
logger.warning("Remote replica has same id as local: " + replicaId);
socket.close();
return;
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
TcpConnection tcpConn = new TcpConnection(this,
processDescriptor.config.getProcess(replicaId), replicaId, false); |
| Solution content |
|---|
socket.close();
return;
}
if (replicaId == processDescriptor.localId) {
logger.warning("Remote replica has same id as local: " + replicaId);
socket.close();
return;
}
TcpConnection tcpConn = new TcpConnection(this,
processDescriptor.config.getProcess(replicaId), replicaId, false); |
| File |
|---|
| TcpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| If statement |
| Chunk |
|---|
| Conflicting content |
|---|
}
}
<<<<<<< HEAD
public void sendMessage(Message message, BitSet destinations) {
assert !destinations.isEmpty() : "Sending a message to no one";
// do not send message to self (just fire event)
if (destinations.get(processDescriptor.localId)) {
logger.warning("Sending message to self: " + message);
fireReceiveMessage(message, processDescriptor.localId);
destinations = (BitSet) destinations.clone();
destinations.clear(processDescriptor.localId);
}
byte[] bytes = message.toByteArray();
for (int i = destinations.nextSetBit(0); i >= 0; i = destinations.nextSetBit(i + 1)) {
sendBytes(bytes, i);
=======
/* package private */void addConnection(int replicaId, TcpConnection tcpConn) {
synchronized (activeConnections) {
if (activeConnections[replicaId][0] == null) {
activeConnections[replicaId][0] = tcpConn;
} else {
if (activeConnections[replicaId][1] == null) {
if (activeConnections[replicaId][0].isActive() ^ tcpConn.isActive()) {
activeConnections[replicaId][1] = tcpConn;
} else {
activeConnections[replicaId][0].stopAsync();
activeConnections[replicaId][0] = tcpConn;
}
} else {
if (activeConnections[replicaId][1].isActive() ^ tcpConn.isActive()) {
activeConnections[replicaId][0].stopAsync();
activeConnections[replicaId][0] = tcpConn;
} else {
activeConnections[replicaId][1].stopAsync();
activeConnections[replicaId][1] = tcpConn;
}
}
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
|
| Solution content |
|---|
}
}
/* package private */void addConnection(int replicaId, TcpConnection tcpConn) {
synchronized (activeConnections) {
if (activeConnections[replicaId][0] == null) {
activeConnections[replicaId][0] = tcpConn;
} else {
if (activeConnections[replicaId][1] == null) {
if (activeConnections[replicaId][0].isActive() ^ tcpConn.isActive()) {
activeConnections[replicaId][1] = tcpConn;
} else {
activeConnections[replicaId][0].stopAsync();
activeConnections[replicaId][0] = tcpConn;
}
} else {
if (activeConnections[replicaId][1].isActive() ^ tcpConn.isActive()) {
activeConnections[replicaId][0].stopAsync();
activeConnections[replicaId][0] = tcpConn;
} else {
activeConnections[replicaId][1].stopAsync();
activeConnections[replicaId][1] = tcpConn;
}
}
}
}
}
|
| File |
|---|
| TcpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Comment |
| For statement |
| If statement |
| Method invocation |
| Method signature |
| Synchronized statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
}
}
<<<<<<< HEAD
public void sendToAllButMe(Message message) {
sendMessage(message, allButMe);
}
public void closeAll() {
for (TcpConnection c : connections) {
=======
public void closeAll() {
for (TcpConnection c : allConnections) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
try {
if (c != null)
c.stop(); |
| Solution content |
|---|
}
}
public void closeAll() {
for (TcpConnection c : allConnections) {
try {
if (c != null)
c.stop(); |
| File |
|---|
| TcpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| For statement |
| Method declaration |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
import lsr.common.Configuration; import lsr.common.KillOnExceptionHandler; import lsr.common.PID; <<<<<<< HEAD import static lsr.common.ProcessDescriptor.processDescriptor; ======= >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import lsr.paxos.messages.Message; import lsr.paxos.messages.MessageFactory; |
| Solution content |
|---|
import lsr.common.Configuration; import lsr.common.KillOnExceptionHandler; import lsr.common.PID; import lsr.paxos.messages.Message; import lsr.paxos.messages.MessageFactory; |
| File |
|---|
| UdpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
* @throws SocketException
*/
public UdpNetwork() throws SocketException {
<<<<<<< HEAD
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
addresses = new SocketAddress[processDescriptor.numReplicas];
for (int i = 0; i < addresses.length; i++) {
PID pid = processDescriptor.config.getProcess(i); |
| Solution content |
|---|
* @throws SocketException
*/
public UdpNetwork() throws SocketException {
addresses = new SocketAddress[processDescriptor.numReplicas];
for (int i = 0; i < addresses.length; i++) {
PID pid = processDescriptor.config.getProcess(i); |
| File |
|---|
| UdpNetwork.java |
| Developer's decision |
|---|
| Version 1 |
| Kind of conflict |
|---|
| Blank |
| Chunk |
|---|
| Conflicting content |
|---|
readThread.start();
started = true;
} else {
<<<<<<< HEAD
logger.warning("Starting UdpNetwork multiple times!");
=======
logger.warning("Starting UDP networkmultiple times!");
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
|
| Solution content |
|---|
readThread.start();
started = true;
} else {
logger.warning("Starting UDP networkmultiple times!");
}
}
|
| File |
|---|
| UdpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
logger.info(Thread.currentThread().getName() +
" thread started. Waiting for UDP messages");
try {
<<<<<<< HEAD
while (true) {
=======
while (!Thread.interrupted()) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
byte[] buffer = new byte[processDescriptor.maxUdpPacketSize + 4];
// Read message and enqueue it for processing.
DatagramPacket dp = new DatagramPacket(buffer, buffer.length); |
| Solution content |
|---|
logger.info(Thread.currentThread().getName() +
" thread started. Waiting for UDP messages");
try {
while (!Thread.interrupted()) {
byte[] buffer = new byte[processDescriptor.maxUdpPacketSize + 4];
// Read message and enqueue it for processing.
DatagramPacket dp = new DatagramPacket(buffer, buffer.length); |
| File |
|---|
| UdpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| While statement |
| Chunk |
|---|
| Conflicting content |
|---|
byte[] data = new byte[dp.getLength() - 4];
dis.read(data);
<<<<<<< HEAD
try {
Message message = MessageFactory.readByteArray(data);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Received from " + sender + ":" + message);
}
fireReceiveMessage(message, sender);
} catch (ClassNotFoundException e) {
logger.log(Level.WARNING, "Error deserializing message", e);
=======
Message message = MessageFactory.readByteArray(data);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Received from " + sender + ":" + message);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
fireReceiveMessage(message, sender);
} |
| Solution content |
|---|
byte[] data = new byte[dp.getLength() - 4];
dis.read(data);
Message message = MessageFactory.readByteArray(data);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Received from " + sender + ":" + message);
}
fireReceiveMessage(message, sender);
} |
| File |
|---|
| UdpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Catch clause |
| If statement |
| Method invocation |
| Try statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
* @param destinations - the id's of replicas to send message to
* @throws IOException if an I/O error occurs
*/
<<<<<<< HEAD
/* package access */void send(byte[] message, BitSet destinations) {
=======
protected void send(byte[] message, BitSet destinations) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
// prepare packet to send
byte[] data = new byte[message.length + 4];
ByteBuffer.wrap(data).putInt(processDescriptor.localId).put(message); |
| Solution content |
|---|
* @param destinations - the id's of replicas to send message to
* @throws IOException if an I/O error occurs
*/
protected void send(byte[] message, BitSet destinations) {
// prepare packet to send
byte[] data = new byte[message.length + 4];
ByteBuffer.wrap(data).putInt(processDescriptor.localId).put(message); |
| File |
|---|
| UdpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method signature |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
@Override
public void send(Message message, BitSet destinations) {
message.setSentTime();
<<<<<<< HEAD
if (logger.isLoggable(Level.FINEST)) {
logger.finest("Sending " + message + " to " + destinations);
}
byte[] messageBytes = message.toByteArray();
=======
byte[] messageBytes = message.toByteArray();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
if (messageBytes.length > processDescriptor.maxUdpPacketSize + 4) {
throw new RuntimeException("Data packet too big. Size: " +
messageBytes.length + ", limit: " + |
| Solution content |
|---|
@Override
public void send(Message message, BitSet destinations) {
message.setSentTime();
byte[] messageBytes = message.toByteArray();
if (messageBytes.length > processDescriptor.maxUdpPacketSize + 4) {
throw new RuntimeException("Data packet too big. Size: " +
messageBytes.length + ", limit: " + |
| File |
|---|
| UdpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| If statement |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
messageBytes.length + ", limit: " +
processDescriptor.maxUdpPacketSize +
". Packet not sent.");
<<<<<<< HEAD
}
if (destinations.get(processDescriptor.localId)) {
logger.warning("Sending message to self: " + message);
fireReceiveMessage(message, processDescriptor.localId);
destinations = (BitSet) destinations.clone();
destinations.clear(processDescriptor.localId);
}
send(messageBytes, destinations);
fireSentMessage(message, destinations);
}
public void sendMessage(Message message, int destination) {
BitSet all = new BitSet();
all.set(destination);
sendMessage(message, all);
}
public void sendToAllButMe(Message message) {
sendMessage(message, allButMe);
=======
}
send(messageBytes, destinations);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
private final static Logger logger = Logger.getLogger(UdpNetwork.class.getCanonicalName()); |
| Solution content |
|---|
messageBytes.length + ", limit: " +
processDescriptor.maxUdpPacketSize +
". Packet not sent.");
}
send(messageBytes, destinations);
}
private final static Logger logger = Logger.getLogger(UdpNetwork.class.getCanonicalName()); |
| File |
|---|
| UdpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| If statement |
| Method declaration |
| Method invocation |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
private final static Logger logger = Logger.getLogger(UdpNetwork.class.getCanonicalName());
<<<<<<< HEAD
@Deprecated
public boolean send(byte[] message, int destination) {
BitSet bs = new BitSet();
bs.set(destination);
send(message, bs);
return true;
=======
@Override
protected void send(Message message, int destination) {
// prepare packet to send
byte[] data = new byte[message.byteSize() + 4];
ByteBuffer bb = ByteBuffer.wrap(data);
bb.putInt(processDescriptor.localId);
message.writeTo(bb);
DatagramPacket dp = new DatagramPacket(data, data.length);
dp.setSocketAddress(addresses[destination]);
try {
datagramSocket.send(dp);
} catch (IOException e) {
throw new RuntimeException(e);
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
} |
| Solution content |
|---|
private final static Logger logger = Logger.getLogger(UdpNetwork.class.getCanonicalName());
@Override
protected void send(Message message, int destination) {
// prepare packet to send
byte[] data = new byte[message.byteSize() + 4];
ByteBuffer bb = ByteBuffer.wrap(data);
bb.putInt(processDescriptor.localId);
message.writeTo(bb);
DatagramPacket dp = new DatagramPacket(data, data.length);
dp.setSocketAddress(addresses[destination]);
try {
datagramSocket.send(dp);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
} |
| File |
|---|
| UdpNetwork.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Annotation |
| Comment |
| Method invocation |
| Method signature |
| Return statement |
| Try statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
import java.io.IOException; <<<<<<< HEAD import lsr.common.ProcessDescriptor; import lsr.paxos.ReplicaCallback; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; import lsr.paxos.core.PaxosImpl; ======= import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import lsr.paxos.storage.InMemoryStorage; import lsr.paxos.storage.Storage; |
| Solution content |
|---|
import java.io.IOException; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; import lsr.paxos.storage.InMemoryStorage; import lsr.paxos.storage.Storage; |
| File |
|---|
| CrashStopRecovery.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
import java.util.logging.Level; import java.util.logging.Logger; <<<<<<< HEAD import lsr.common.Dispatcher; import lsr.common.ProcessDescriptor; import lsr.paxos.ActiveRetransmitter; import lsr.paxos.ReplicaCallback; ======= import lsr.common.SingleThreadDispatcher; import lsr.paxos.ActiveRetransmitter; >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import lsr.paxos.RetransmittedMessage; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; |
| Solution content |
|---|
import java.util.logging.Level; import java.util.logging.Logger; import lsr.common.SingleThreadDispatcher; import lsr.paxos.ActiveRetransmitter; import lsr.paxos.RetransmittedMessage; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; |
| File |
|---|
| EpochSSRecovery.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
import lsr.paxos.RetransmittedMessage; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; <<<<<<< HEAD import lsr.paxos.core.PaxosImpl; ======= >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import lsr.paxos.messages.Message; import lsr.paxos.messages.MessageType; import lsr.paxos.messages.Recovery; |
| Solution content |
|---|
import lsr.paxos.RetransmittedMessage; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; import lsr.paxos.messages.Message; import lsr.paxos.messages.MessageType; import lsr.paxos.messages.Recovery; |
| File |
|---|
| EpochSSRecovery.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
answerFromLeader = null;
}
<<<<<<< HEAD
if (paxos.getLeaderId() == sender) {
=======
if (processDescriptor.getLeaderOfView(storage.getView()) == sender) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
answerFromLeader = recoveryAnswer;
}
|
| Solution content |
|---|
answerFromLeader = null;
}
if (processDescriptor.getLeaderOfView(storage.getView()) == sender) {
answerFromLeader = recoveryAnswer;
}
|
| File |
|---|
| EpochSSRecovery.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| If statement |
| Chunk |
|---|
| Conflicting content |
|---|
import java.io.IOException; import java.util.logging.Logger; <<<<<<< HEAD import static lsr.common.ProcessDescriptor.processDescriptor; import lsr.paxos.ReplicaCallback; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; import lsr.paxos.core.PaxosImpl; ======= import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import lsr.paxos.storage.FullSSDiscWriter; import lsr.paxos.storage.SingleNumberWriter; import lsr.paxos.storage.Storage; |
| Solution content |
|---|
import java.io.IOException; import java.util.logging.Logger; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; import lsr.paxos.storage.FullSSDiscWriter; import lsr.paxos.storage.SingleNumberWriter; import lsr.paxos.storage.Storage; |
| File |
|---|
| FullSSRecovery.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
}
private Storage createStorage() throws IOException {
<<<<<<< HEAD
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
logger.info("Reading log from: " + logPath);
FullSSDiscWriter writer = new FullSSDiscWriter(logPath);
|
| Solution content |
|---|
}
private Storage createStorage() throws IOException {
logger.info("Reading log from: " + logPath);
FullSSDiscWriter writer = new FullSSDiscWriter(logPath);
|
| File |
|---|
| FullSSRecovery.java |
| Developer's decision |
|---|
| Version 1 |
| Kind of conflict |
|---|
| Blank |
| Chunk |
|---|
| Conflicting content |
|---|
FullSSDiscWriter writer = new FullSSDiscWriter(logPath);
Storage storage = new SynchronousStorage(writer);
<<<<<<< HEAD
if (storage.getView() % processDescriptor.numReplicas == processDescriptor.localId) {
=======
// Client batches and ViewEpochIdGenerator use epoch in FullSS
SingleNumberWriter epochFile = new SingleNumberWriter(logPath, "sync.epoch");
storage.setEpoch(new long[] {epochFile.readNumber() + 1});
epochFile.writeNumber(storage.getEpoch()[0]);
if (processDescriptor.isLocalProcessLeader(storage.getView())) {
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
storage.setView(storage.getView() + 1);
}
return storage; |
| Solution content |
|---|
FullSSDiscWriter writer = new FullSSDiscWriter(logPath);
Storage storage = new SynchronousStorage(writer);
// Client batches and ViewEpochIdGenerator use epoch in FullSS
SingleNumberWriter epochFile = new SingleNumberWriter(logPath, "sync.epoch");
storage.setEpoch(new long[] {epochFile.readNumber() + 1});
epochFile.writeNumber(storage.getEpoch()[0]);
if (processDescriptor.isLocalProcessLeader(storage.getView())) {
storage.setView(storage.getView() + 1);
}
return storage; |
| File |
|---|
| FullSSRecovery.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| If statement |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
package lsr.paxos.recovery; <<<<<<< HEAD ======= import java.util.logging.Logger; >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import lsr.paxos.CatchUpListener; import lsr.paxos.core.CatchUp; import lsr.paxos.storage.Storage; |
| Solution content |
|---|
package lsr.paxos.recovery; import java.util.logging.Logger; import lsr.paxos.CatchUpListener; import lsr.paxos.core.CatchUp; import lsr.paxos.storage.Storage; |
| File |
|---|
| RecoveryCatchUp.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
catchUp.addListener(new CatchUpListener() {
public void catchUpAdvanced() {
if (storage.getFirstUncommitted() >= firstUncommitted) {
<<<<<<< HEAD
=======
logger.info("Recovery catch-up succeeded");
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
catchUp.removeListener(this);
callback.run();
} else { |
| Solution content |
|---|
catchUp.addListener(new CatchUpListener() {
public void catchUpAdvanced() {
if (storage.getFirstUncommitted() >= firstUncommitted) {
logger.info("Recovery catch-up succeeded");
catchUp.removeListener(this);
callback.run();
} else { |
| File |
|---|
| RecoveryCatchUp.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
import java.util.logging.Level; import java.util.logging.Logger; <<<<<<< HEAD import lsr.common.Dispatcher; import lsr.common.ProcessDescriptor; import lsr.paxos.ActiveRetransmitter; import lsr.paxos.ReplicaCallback; ======= import lsr.common.SingleThreadDispatcher; import lsr.paxos.ActiveRetransmitter; >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import lsr.paxos.RetransmittedMessage; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; |
| Solution content |
|---|
import java.util.logging.Level; import java.util.logging.Logger; import lsr.common.SingleThreadDispatcher; import lsr.paxos.ActiveRetransmitter; import lsr.paxos.RetransmittedMessage; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; |
| File |
|---|
| ViewSSRecovery.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
import lsr.paxos.RetransmittedMessage; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; <<<<<<< HEAD import lsr.paxos.core.PaxosImpl; ======= >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import lsr.paxos.messages.Message; import lsr.paxos.messages.MessageType; import lsr.paxos.messages.Recovery; |
| Solution content |
|---|
import lsr.paxos.RetransmittedMessage; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; import lsr.paxos.messages.Message; import lsr.paxos.messages.MessageType; import lsr.paxos.messages.Recovery; |
| File |
|---|
| ViewSSRecovery.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
private boolean firstRun;
private final Paxos paxos;
private final int numReplicas;
<<<<<<< HEAD
private final int localId;
private final Storage storage;
private final Dispatcher dispatcher;
private Retransmitter retransmitter;
=======
private Storage storage;
private SingleThreadDispatcher dispatcher;
private ActiveRetransmitter retransmitter;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
private RetransmittedMessage recoveryRetransmitter;
public ViewSSRecovery(SnapshotProvider snapshotProvider, String stableStoragePath) |
| Solution content |
|---|
private boolean firstRun;
private Paxos paxos;
private final int numReplicas;
private Storage storage;
private SingleThreadDispatcher dispatcher;
private ActiveRetransmitter retransmitter;
private RetransmittedMessage recoveryRetransmitter;
public ViewSSRecovery(SnapshotProvider snapshotProvider, String stableStoragePath) |
| File |
|---|
| ViewSSRecovery.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Chunk |
|---|
| Conflicting content |
|---|
* @see NioClientProxy
*/
public class NioClientManager implements AcceptHandler {
<<<<<<< HEAD
/** How many selector threads to use */
public static final String SELECTOR_THREADS = "replica.SelectorThreads";
public static final int DEFAULT_SELECTOR_THREADS = -1;
private final SelectorThread[] selectorThreads;
private int nextThread = 0;
private final int localPort;
private final IdGenerator idGenerator;
private final RequestManager requestManager;
private ServerSocketChannel serverSocketChannel;
private volatile boolean started = false;
=======
private final SelectorThread[] selectorThreads;
private int nextThread = 0;
private final ClientRequestManager requestManager;
private ServerSocketChannel serverSocketChannel = null;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
/**
* Creates new client manager. |
| Solution content |
|---|
* @see NioClientProxy
*/
public class NioClientManager implements AcceptHandler {
private final SelectorThread[] selectorThreads;
private int nextThread = 0;
private final ClientRequestManager requestManager;
private ServerSocketChannel serverSocketChannel = null;
/**
* Creates new client manager. |
| File |
|---|
| NioClientManager.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
requestManager.setClientManager(this);
this.requestManager = requestManager;
<<<<<<< HEAD
// TODO: JK: Why is it here? Transport it to process descriptor.
int nSelectors = ProcessDescriptor.getInstance().config.getIntProperty(
SELECTOR_THREADS,
DEFAULT_SELECTOR_THREADS);
=======
int nSelectors = processDescriptor.selectorThreadCount;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
if (nSelectors == -1) {
nSelectors = NioClientManager.computeNSelectors();
} else { |
| Solution content |
|---|
this.requestManager = requestManager;
requestManager.setClientManager(this);
int nSelectors = processDescriptor.selectorThreadCount;
if (nSelectors == -1) {
nSelectors = NioClientManager.computeNSelectors();
} else { |
| File |
|---|
| NioClientManager.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
nSelectors = NioClientManager.computeNSelectors();
} else {
if (nSelectors < 0) {
<<<<<<< HEAD
throw new IOException("Invalid value for property " + SELECTOR_THREADS + ": " +
nSelectors);
}
}
logger.config(SELECTOR_THREADS + "=" + nSelectors);
=======
throw new IOException("Invalid value for property " +
ProcessDescriptor.SELECTOR_THREADS + ": " + nSelectors);
}
}
logger.config("Real " + ProcessDescriptor.SELECTOR_THREADS + "=" + nSelectors);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
selectorThreads = new SelectorThread[nSelectors];
for (int i = 0; i < selectorThreads.length; i++) { |
| Solution content |
|---|
nSelectors = NioClientManager.computeNSelectors();
} else {
if (nSelectors < 0) {
throw new IOException("Invalid value for property " +
ProcessDescriptor.SELECTOR_THREADS + ": " + nSelectors);
}
}
logger.config("Real " + ProcessDescriptor.SELECTOR_THREADS + "=" + nSelectors);
selectorThreads = new SelectorThread[nSelectors];
for (int i = 0; i < selectorThreads.length; i++) { |
| File |
|---|
| NioClientManager.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Throw statement |
| Chunk |
|---|
| Conflicting content |
|---|
for (int i = 0; i < selectorThreads.length; i++) {
selectorThreads[i] = new SelectorThread(i);
}
<<<<<<< HEAD
requestManager.setNioClientManager(this);
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
public void executeInAllSelectors(Runnable r) { |
| Solution content |
|---|
for (int i = 0; i < selectorThreads.length; i++) {
selectorThreads[i] = new SelectorThread(i);
}
}
public void executeInAllSelectors(Runnable r) { |
| File |
|---|
| NioClientManager.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
}
}
<<<<<<< HEAD
private static int computeNSelectors() {
int nProcessors = Runtime.getRuntime().availableProcessors();
int n;
// Values determined empirically based on tests on 24 core Opteron
// system.
if (nProcessors < 3) {
n = 1;
} else if (nProcessors < 5) {
n = 2;
} else if (nProcessors < 7) {
n = 3;
} else if (nProcessors < 9) {
n = 4;
} else if (nProcessors < 17) {
n = 5;
} else {
n = 6;
}
logger.info("Number of selector threads computed dynamically. Processors: " + nProcessors +
", selectors: " + n);
return n;
}
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
/**
* Starts listening and handling client connections.
* |
| Solution content |
|---|
}
}
/**
* Starts listening and handling client connections.
* |
| File |
|---|
| NioClientManager.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method declaration |
| Chunk |
|---|
| Conflicting content |
|---|
for (int i = 0; i < selectorThreads.length; i++) {
selectorThreads[i].start();
}
<<<<<<< HEAD
started = true;
}
public boolean isStarted() {
return started;
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
/** |
| Solution content |
|---|
for (int i = 0; i < selectorThreads.length; i++) {
selectorThreads[i].start();
}
}
/** |
| File |
|---|
| NioClientManager.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method signature |
| Return statement |
| Chunk |
|---|
| Conflicting content |
|---|
}
}
selectorThreads[0].addChannelInterest(serverSocketChannel, SelectionKey.OP_ACCEPT);
<<<<<<< HEAD
// if accepting was successful create new client proxy
if (socketChannel != null) {
try {
SelectorThread selectorThread = getNextThread();
ReaderAndWriter raw = new ReaderAndWriter(socketChannel, selectorThread);
new NioClientProxy(raw, requestManager, idGenerator);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Connection from " + socketChannel.socket().getInetAddress());
} catch (IOException e) {
// TODO: probably registering to selector has failed; should we
// just close the client connection?
logger.log(
Level.SEVERE,
"probably registering to selector has failed; should we just close the client connection?",
e);
=======
assert socketChannel != null;
try {
SelectorThread selectorThread = getNextThread();
ReaderAndWriter raw = new ReaderAndWriter(socketChannel, selectorThread);
new NioClientProxy(raw, requestManager);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Connection from " + socketChannel.socket().getInetAddress());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
} catch (IOException e) {
throw new RuntimeException("Failed to set tcpNoDelay somewhere below. Let's die.", |
| Solution content |
|---|
}
selectorThreads[0].addChannelInterest(serverSocketChannel, SelectionKey.OP_ACCEPT);
assert socketChannel != null;
try {
SelectorThread selectorThread = getNextThread();
ReaderAndWriter raw = new ReaderAndWriter(socketChannel, selectorThread);
new NioClientProxy(raw, requestManager);
if (logger.isLoggable(Level.FINE)) {
logger.fine("Connection from " + socketChannel.socket().getInetAddress());
}
} catch (IOException e) {
throw new RuntimeException("Failed to set tcpNoDelay somewhere below. Let's die.", |
| File |
|---|
| NioClientManager.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Catch clause |
| Comment |
| If statement |
| Method invocation |
| Try statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
*/
private void execute(ByteBuffer buffer) throws InterruptedException {
ClientCommand command = new ClientCommand(buffer);
<<<<<<< HEAD
if (logger.isLoggable(Level.FINE)) {
logger.fine("Received client request: " + command.getRequest());
}
requestManager.processClientRequest(command, this);
=======
requestManager.onClientRequest(command, this);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
/** |
| Solution content |
|---|
*/
private void execute(ByteBuffer buffer) throws InterruptedException {
ClientCommand command = new ClientCommand(buffer);
requestManager.onClientRequest(command, this);
}
/** |
| File |
|---|
| NioClientProxy.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| If statement |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
/**
* Waits for the header and then for the message from the client.
*/
<<<<<<< HEAD
private class MyClientCommandPacketHandler implements PacketHandler {
/** Buffer passed in constructor, used as long as has enough capacity */
private final ByteBuffer defaultBuffer;
/**
* Buffer used by this PaketHaldler Points to:
*
|
| Solution content |
|---|
/**
* Waits for the header and then for the message from the client.
*/
private class ClientCommandPacketHandler implements PacketHandler {
// TODO: (JK) consider keeping the once allocated bigger buffer
/**
* Client request may be bigger than default buffer size; in such case,
* new buffer will be created and used for the request
*/
private ByteBuffer currentBuffer;
private boolean header = true;
public ClientCommandPacketHandler() { |
| File |
|---|
| NioClientProxy.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Class signature |
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
public void finished() throws InterruptedException {
if (header) {
<<<<<<< HEAD
assert buffer == defaultBuffer : "Default buffer should be used for reading header";
defaultBuffer.rewind();
int firstNumber = defaultBuffer.getInt();
int sizeOfValue = defaultBuffer.getInt();
if (8 + sizeOfValue <= defaultBuffer.capacity()) {
defaultBuffer.limit(8 + sizeOfValue);
} else {
buffer = ByteBuffer.allocate(8 + sizeOfValue);
buffer.putInt(firstNumber);
buffer.putInt(sizeOfValue);
}
} else {
buffer.flip();
execute(buffer);
// for reading header we can use default buffer
buffer = defaultBuffer;
defaultBuffer.clear();
defaultBuffer.limit(8);
=======
assert currentBuffer == readBuffer : "Default buffer should be used for reading header";
readBuffer.position(ClientCommand.HEADER_VALUE_SIZE_OFFSET);
int sizeOfValue = readBuffer.getInt();
if (ClientCommand.HEADERS_SIZE + sizeOfValue <= readBuffer.capacity()) {
readBuffer.limit(ClientCommand.HEADERS_SIZE + sizeOfValue);
} else {
currentBuffer = ByteBuffer.allocate(ClientCommand.HEADERS_SIZE + sizeOfValue);
currentBuffer.put(readBuffer.array(), 0, ClientCommand.HEADERS_SIZE);
}
} else {
currentBuffer.flip();
execute(currentBuffer);
// for reading header we can use default buffer
currentBuffer = readBuffer;
readBuffer.clear();
readBuffer.limit(ClientCommand.HEADERS_SIZE);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
header = !header;
readerAndWriter.setPacketHandler(this); |
| Solution content |
|---|
public void finished() throws InterruptedException {
if (header) {
assert currentBuffer == readBuffer : "Default buffer should be used for reading header";
readBuffer.position(ClientCommand.HEADER_VALUE_SIZE_OFFSET);
int sizeOfValue = readBuffer.getInt();
if (ClientCommand.HEADERS_SIZE + sizeOfValue <= readBuffer.capacity()) {
readBuffer.limit(ClientCommand.HEADERS_SIZE + sizeOfValue);
} else {
currentBuffer = ByteBuffer.allocate(ClientCommand.HEADERS_SIZE + sizeOfValue);
currentBuffer.put(readBuffer.array(), 0, ClientCommand.HEADERS_SIZE);
}
} else {
currentBuffer.flip();
execute(currentBuffer);
// for reading header we can use default buffer
currentBuffer = readBuffer;
readBuffer.clear();
readBuffer.limit(ClientCommand.HEADERS_SIZE);
}
header = !header;
readerAndWriter.setPacketHandler(this); |
| File |
|---|
| NioClientProxy.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Attribute |
| Comment |
| If statement |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
}
public ByteBuffer getByteBuffer() {
<<<<<<< HEAD
return buffer;
=======
return currentBuffer;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
}
|
| Solution content |
|---|
}
public ByteBuffer getByteBuffer() {
return currentBuffer;
}
}
|
| File |
|---|
| NioClientProxy.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Return statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
import lsr.common.CrashModel; import lsr.common.ProcessDescriptor; import lsr.common.Reply; <<<<<<< HEAD import lsr.common.ClientRequest; import lsr.common.SingleThreadDispatcher; import lsr.paxos.BatchUnpacker; import lsr.paxos.BatchUnpackerImpl; import lsr.paxos.ReplicaCallback; ======= import lsr.common.RequestId; import lsr.common.SingleThreadDispatcher; >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import lsr.paxos.Snapshot; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; |
| Solution content |
|---|
import lsr.common.CrashModel; import lsr.common.ProcessDescriptor; import lsr.common.Reply; import lsr.common.RequestId; import lsr.common.SingleThreadDispatcher; import lsr.paxos.Snapshot; import lsr.paxos.SnapshotProvider; import lsr.paxos.core.Paxos; |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
*
*/
public class Replica {
<<<<<<< HEAD
/**
* Represents different crash models.
*/
public enum CrashModel {
/**
* Synchronous writes to disk are made on critical path of paxos
* execution. Catastrophic failures will be handled correctly. It is
* slower than other algorithms.
*/
FullStableStorage,
=======
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
// // // // // // // // // // // // // // // //
// External modules accessed by the replica. // |
| Solution content |
|---|
*
*/
public class Replica {
// // // // // // // // // // // // // // // //
// External modules accessed by the replica. // |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Enum signature |
| Enum value |
| Chunk |
|---|
| Conflicting content |
|---|
/** Thread for handling events connected to the replica */
private final SingleThreadDispatcher replicaDispatcher;
<<<<<<< HEAD
// TODO: JK check if this map is cleared where possible
/** caches responses for clients for a specific instance; used by snapshot */
private final NavigableMap |
| Solution content |
|---|
/** Thread for handling events connected to the replica */
private final SingleThreadDispatcher replicaDispatcher;
// // // // // // // // // // // // // // // // // // // //
// Cached replies and past replies for snapshot creation //
// // // // // // // // // // // // // // // // // // // //
/**
* For each client, keeps the sequence id of the last request executed from |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
* the time stamp, it's already a new request, right? Client with broken
* clocks will have bad luck.
*/
<<<<<<< HEAD
private final Map |
| Solution content |
|---|
* the time stamp, it's already a new request, right? Client with broken
* clocks will have bad luck.
*/
// // // // // // //
// Public methods //
// // // // // // //
/**
* Initializes new instance of |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
* @param localId - the id of replica to create
* @param service - the state machine to execute request on
*/
<<<<<<< HEAD
public Replica(Configuration config, int localId, Service service) throws IOException {
this.innerDecideCallback = new InnerDecideCallback();
this.innerSnapshotListener2 = new InnerSnapshotListener2();
this.innerSnapshotProvider = new InnerSnapshotProvider();
this.dispatcher = new SingleThreadDispatcher("Replica");
ProcessDescriptor.initialize(config, localId);
logPath = ProcessDescriptor.getInstance().logPath + '/' + localId;
=======
public Replica(Configuration config, int localId, Service service) {
ProcessDescriptor.initialize(config, localId);
stableStoragePath = processDescriptor.logPath + '/' + localId;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
innerSnapshotListener2 = new InnerSnapshotListener2();
innerSnapshotProvider = new InnerSnapshotProvider(); |
| Solution content |
|---|
* @param localId - the id of replica to create
* @param service - the state machine to execute request on
*/
public Replica(Configuration config, int localId, Service service) {
ProcessDescriptor.initialize(config, localId);
stableStoragePath = processDescriptor.logPath + '/' + localId;
innerSnapshotListener2 = new InnerSnapshotListener2();
innerSnapshotProvider = new InnerSnapshotProvider(); |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Method signature |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
public void start() throws IOException {
logger.info("Recovery phase started.");
<<<<<<< HEAD
RecoveryAlgorithm recovery = createRecoveryAlgorithm(ProcessDescriptor.getInstance().crashModel);
paxos = recovery.getPaxos();
// the dispatcher and network has to be started before recovery phase.
paxos.prepareForRecovery();
=======
replicaDispatcher.start();
RecoveryAlgorithm recovery = createRecoveryAlgorithm(processDescriptor.crashModel);
paxos = recovery.getPaxos();
decideCallback = new DecideCallbackImpl(paxos, this, executeUB);
paxos.setDecideCallback(decideCallback);
if (processDescriptor.indirectConsensus) {
batchManager = new ClientBatchManager(paxos, this);
batchManager.start();
requestForwarder = null;
ClientBatchStore.instance.setClientBatchManager(batchManager);
} else {
batchManager = null;
requestForwarder = new ClientRequestForwarder(paxos);
requestForwarder.start();
}
paxos.startPassivePaxos();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
recovery.addRecoveryListener(new InnerRecoveryListener());
recovery.start(); |
| Solution content |
|---|
public void start() throws IOException {
logger.info("Recovery phase started.");
replicaDispatcher.start();
RecoveryAlgorithm recovery = createRecoveryAlgorithm(processDescriptor.crashModel);
paxos = recovery.getPaxos();
decideCallback = new DecideCallbackImpl(paxos, this, executeUB);
paxos.setDecideCallback(decideCallback);
if (processDescriptor.indirectConsensus) {
batchManager = new ClientBatchManager(paxos, this);
batchManager.start();
requestForwarder = null;
ClientBatchStore.instance.setClientBatchManager(batchManager);
} else {
batchManager = null;
requestForwarder = new ClientRequestForwarder(paxos);
requestForwarder.start();
}
paxos.startPassivePaxos();
recovery.addRecoveryListener(new InnerRecoveryListener());
recovery.start(); |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| If statement |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
<<<<<<< HEAD
public String getLogPath() {
*
* @return path
*/
return logPath;
=======
public String getStableStoragePath() {
return stableStoragePath;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
/** |
| Solution content |
|---|
*
* @return path
*/
public String getStableStoragePath() {
return stableStoragePath;
}
/** |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method signature |
| Return statement |
| Chunk |
|---|
| Conflicting content |
|---|
* @throws IllegalStateException if the method is called before the recovery
* has finished
*/
<<<<<<< HEAD
private void executeDecided() {
while (true) {
Deque |
| Solution content |
|---|
* @throws IllegalStateException if the method is called before the recovery
* has finished
*/
public void executeNonFifo(byte[] requestValue) throws IllegalStateException {
if (intCli == null)
throw new IllegalStateException(
"Request cannot be executed before recovery has finished");
intCli.executeNonFifo(requestValue);
}
/** Returns the current view */
public int getView() { |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method declaration |
| Method signature |
| Synchronized statement |
| Variable |
| While statement |
| Chunk |
|---|
| Conflicting content |
|---|
return paxos.getStorage().addViewChangeListener(listener);
}
<<<<<<< HEAD
for (ClientRequest request : requestByteArray) {
if (request.isNop()) {
// TODO: handling a no-op request
logger.warning("Executing a nop request. Instance: " + executeUB);
serviceProxy.executeNop();
} else {
Reply lastReply = executedRequests.get(request.getRequestId().getClientId());
if (lastReply != null) {
int lastSequenceNumberFromClient = lastReply.getRequestId().getSeqNumber();
// prevents executing the same request few times.
// Do not execute the same request several times.
int requestSeqNumber = request.getRequestId().getSeqNumber();
if (requestSeqNumber <= lastSequenceNumberFromClient) {
logger.warning("Request ordered multiple times. Not executing " +
executeUB + ", " + request);
continue;
}
if (requestSeqNumber != lastSequenceNumberFromClient + 1) {
logger.severe("Client seq number is " + requestSeqNumber + " after " +
lastSequenceNumberFromClient + " -- something gone bad!");
}
}
=======
/**
* Removes a listener previously added by
* {@link #addViewChangeListener(Storage.ViewChangeListener)}
*/
public boolean removeViewChangeListener(Storage.ViewChangeListener listener) {
return paxos.getStorage().removeViewChangeListener(listener);
}
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
// // // // // // // // // // // //
// Callback's for JPaxos modules // |
| Solution content |
|---|
return paxos.getStorage().addViewChangeListener(listener);
}
/**
* Removes a listener previously added by
* {@link #addViewChangeListener(Storage.ViewChangeListener)}
*/
public boolean removeViewChangeListener(Storage.ViewChangeListener listener) {
return paxos.getStorage().removeViewChangeListener(listener);
}
// // // // // // // // // // // //
// Callback's for JPaxos modules // |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| For statement |
| If statement |
| Method declaration |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
// Internal methods and classes. //
// // // // // // // // // // // //
<<<<<<< HEAD
// Can this ever be null?
// JK: yes, on recovery, when method execute decided is
// called this object is null
if (requestManager != null) {
requestManager.handleReply(request, reply);
=======
/**
* Called by the RequestManager when it has the ClientRequest that should be
* executed next.
*
* @param instance
* @param bInfo
*/
private void innerExecuteClientBatch(int instance, ClientRequest[] requests) {
assert replicaDispatcher.amIInDispatcher() : "Wrong thread: " +
Thread.currentThread().getName();
for (ClientRequest cRequest : requests) {
RequestId rID = cRequest.getRequestId();
Reply lastReply = executedRequests.get(rID.getClientId());
if (lastReply != null) {
int lastSequenceNumberFromClient = lastReply.getRequestId().getSeqNumber();
// Do not execute the same request several times.
if (rID.getSeqNumber() <= lastSequenceNumberFromClient) {
logger.warning("Request ordered multiple times. " +
instance + ", " + cRequest +
", lastSequenceNumberFromClient: " +
lastSequenceNumberFromClient);
// Send the cached reply back to the client
if (rID.getSeqNumber() == lastSequenceNumberFromClient) {
// req manager can be null on fullss disk read
if (requestManager != null)
requestManager.onRequestExecuted(cRequest, lastReply);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
continue;
} |
| Solution content |
|---|
// Internal methods and classes. //
// // // // // // // // // // // //
/**
* Called by the RequestManager when it has the ClientRequest that should be
* executed next.
*
* @param instance
* @param bInfo
*/
private void innerExecuteClientBatch(int instance, ClientRequest[] requests) {
assert replicaDispatcher.amIInDispatcher() : "Wrong thread: " +
Thread.currentThread().getName();
for (ClientRequest cRequest : requests) {
RequestId rID = cRequest.getRequestId();
Reply lastReply = executedRequests.get(rID.getClientId());
if (lastReply != null) {
int lastSequenceNumberFromClient = lastReply.getRequestId().getSeqNumber();
// Do not execute the same request several times.
if (rID.getSeqNumber() <= lastSequenceNumberFromClient) {
logger.warning("Request ordered multiple times. " +
instance + ", " + cRequest +
", lastSequenceNumberFromClient: " +
lastSequenceNumberFromClient);
// Send the cached reply back to the client
if (rID.getSeqNumber() == lastSequenceNumberFromClient) {
// req manager can be null on fullss disk read
if (requestManager != null)
requestManager.onRequestExecuted(cRequest, lastReply);
}
continue;
} |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Comment |
| For statement |
| If statement |
| Method invocation |
| Method signature |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
instances = instances.tailMap(snapshot.getNextInstanceId());
}
<<<<<<< HEAD
BatchUnpacker batcher = new BatchUnpackerImpl();
for (ConsensusInstance instance : instances.values()) {
if (instance.getState() == LogEntryState.DECIDED) {
Deque |
| Solution content |
|---|
instances = instances.tailMap(snapshot.getNextInstanceId());
}
for (ConsensusInstance instance : instances.values()) {
if (instance.getState() == LogEntryState.DECIDED) {
decideCallback.onRequestOrdered(instance.getId(), instance);
}
}
storage.updateFirstUncommitted(); |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| For statement |
| If statement |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
}
storage.updateFirstUncommitted();
}
<<<<<<< HEAD
private void createAndStartClientManager(Paxos paxos) {
IdGenerator idGenerator = createIdGenerator();
int clientPort = ProcessDescriptor.getInstance().getLocalProcess().getClientPort();
requestManager = new RequestManager(paxos, executedRequests);
try {
clientManager = new NioClientManager(clientPort, requestManager, idGenerator);
clientManager.start();
} catch (IOException e) {
throw new RuntimeException("Could not prepare the socket for clients! Aborting.");
}
}
private IdGenerator createIdGenerator() {
ProcessDescriptor descriptor = ProcessDescriptor.getInstance();
String generatorName = descriptor.clientIDGenerator;
if (generatorName.equals("TimeBased")) {
return new TimeBasedIdGenerator(descriptor.localId, descriptor.numReplicas);
}
if (generatorName.equals("Simple")) {
return new SimpleIdGenerator(descriptor.localId, descriptor.numReplicas);
}
throw new RuntimeException("Unknown id generator: " + generatorName +
". Valid options: {TimeBased, Simple}");
}
}
private class InnerDecideCallback implements ReplicaCallback {
/** Called by the paxos box when a new request is ordered. */
public void onRequestOrdered(int instance, Deque |
| Solution content |
|---|
}
storage.updateFirstUncommitted();
}
}
private class InnerSnapshotListener2 implements SnapshotListener2 { |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Annotation |
| Class signature |
| Comment |
| Method declaration |
| Chunk |
|---|
| Conflicting content |
|---|
logger.warning("Updating machine state from " + snapshot);
serviceProxy.updateToSnapshot(snapshot);
<<<<<<< HEAD
synchronized (decidedWaitingExecution) {
if (!decidedWaitingExecution.isEmpty()) {
if (decidedWaitingExecution.lastKey() < snapshot.getNextInstanceId()) {
decidedWaitingExecution.clear();
} else {
// TODO: JK: would tailMap free memory here? If so, it'd
// be
// more efficient
while (decidedWaitingExecution.firstKey() < snapshot.getNextInstanceId()) {
decidedWaitingExecution.pollFirstEntry();
}
}
}
}
=======
decideCallback.atRestoringStateFromSnapshot(snapshot.getNextInstanceId());
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
executedRequests.clear();
executedDifference.clear(); |
| Solution content |
|---|
logger.warning("Updating machine state from " + snapshot);
serviceProxy.updateToSnapshot(snapshot);
decideCallback.atRestoringStateFromSnapshot(snapshot.getNextInstanceId());
executedRequests.clear();
executedDifference.clear(); |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Synchronized statement |
| Chunk |
|---|
| Conflicting content |
|---|
final Object snapshotLock = new Object();
synchronized (snapshotLock) {
<<<<<<< HEAD
AfterCatchupSnapshotEvent event;
event = paxos.dispatchAfterCatchupSnapshotEvent(snapshot, snapshotLock);
=======
AfterCatchupSnapshotEvent event = new
AfterCatchupSnapshotEvent(snapshot,
paxos.getStorage(), snapshotLock);
paxos.getDispatcher().submit(event);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
try {
while (!event.isFinished()) { |
| Solution content |
|---|
final Object snapshotLock = new Object();
synchronized (snapshotLock) {
AfterCatchupSnapshotEvent event = new
AfterCatchupSnapshotEvent(snapshot,
paxos.getStorage(), snapshotLock);
paxos.getDispatcher().submit(event);
try {
while (!event.isFinished()) { |
| File |
|---|
| Replica.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
import java.util.List; import java.util.Map; import java.util.Queue; <<<<<<< HEAD import java.util.Vector; ======= >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import java.util.logging.Level; import java.util.logging.Logger; |
| Solution content |
|---|
import java.util.List; import java.util.Map; import java.util.Queue; import java.util.logging.Level; import java.util.logging.Logger; |
| File |
|---|
| ServiceProxy.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
import lsr.common.ClientRequest; import lsr.common.Pair; import lsr.common.Reply; <<<<<<< HEAD import lsr.common.ClientRequest; ======= >>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11 import lsr.common.SingleThreadDispatcher; import lsr.paxos.Snapshot; import lsr.service.Service; |
| Solution content |
|---|
import lsr.common.ClientRequest; import lsr.common.Pair; import lsr.common.Reply; import lsr.common.SingleThreadDispatcher; import lsr.paxos.Snapshot; import lsr.service.Service; |
| File |
|---|
| ServiceProxy.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Import |
| Chunk |
|---|
| Conflicting content |
|---|
currentRequest = request;
long nanos = 0;
if (logger.isLoggable(Level.FINE)) {
<<<<<<< HEAD
logger.fine("Passing request to be executed to service: " + request + " as " + (nextSeqNo - 1));
=======
logger.fine("Passing request to be executed to service: " + request + " as " +
(nextSeqNo - 1));
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
nanos = System.nanoTime();
}
byte[] result = service.execute(request.getValue(), nextSeqNo - 1); |
| Solution content |
|---|
currentRequest = request;
long nanos = 0;
if (logger.isLoggable(Level.FINE)) {
logger.fine("Passing request to be executed to service: " + request + " as " +
(nextSeqNo - 1));
nanos = System.nanoTime();
}
byte[] result = service.execute(request.getValue(), nextSeqNo - 1); |
| File |
|---|
| ServiceProxy.java |
| Developer's decision |
|---|
| Version 1 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
lastSnapshotNextSeqNo);
}
if (nextRequestSeqNo > nextSeqNo) {
<<<<<<< HEAD
logger.warning("The snapshot marked as newer than current state. " +
"nextRequestSeqNo: " + nextRequestSeqNo + ", nextSeqNo: " +
nextSeqNo);
throw new IllegalArgumentException(
"The snapshot marked as newer than current state. " +
"nextRequestSeqNo: " + nextRequestSeqNo + ", nextSeqNo: "
+ nextSeqNo);
=======
throw new IllegalArgumentException(
"The snapshot marked as newer than current state. " +
"nextRequestSeqNo: " + nextRequestSeqNo + ", nextSeqNo: " +
nextSeqNo);
}
if (logger.isLoggable(Level.INFO)) {
logger.info("Snapshot up to: " + nextRequestSeqNo);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
truncateStartingSeqNo(nextRequestSeqNo); |
| Solution content |
|---|
lastSnapshotNextSeqNo);
}
if (nextRequestSeqNo > nextSeqNo) {
throw new IllegalArgumentException(
"The snapshot marked as newer than current state. " +
"nextRequestSeqNo: " + nextRequestSeqNo + ", nextSeqNo: " +
nextSeqNo);
}
if (logger.isLoggable(Level.INFO)) {
logger.info("Snapshot up to: " + nextRequestSeqNo);
}
truncateStartingSeqNo(nextRequestSeqNo); |
| File |
|---|
| ServiceProxy.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| If statement |
| Method invocation |
| Throw statement |
| Chunk |
|---|
| Conflicting content |
|---|
input.readFully(value);
}
<<<<<<< HEAD
=======
onValueChange();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
assertInvariant();
}
|
| Solution content |
|---|
input.readFully(value);
}
onValueChange();
assertInvariant();
}
|
| File |
|---|
| ConsensusInstance.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
private void assertInvariant() {
// If value is non null, the state must be either Decided or Known.
// If value is null, it must be unknown
<<<<<<< HEAD
assert (value != null && state != LogEntryState.UNKNOWN) ||
(value == null && state == LogEntryState.UNKNOWN) : "Invalid state. Value=" +
value + " state " + state;
=======
assert value == null ^ state != LogEntryState.UNKNOWN : "Invalid state. Value=" + value +
": " + toString();
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
/** |
| Solution content |
|---|
private void assertInvariant() {
// If value is non null, the state must be either Decided or Known.
// If value is null, it must be unknown
assert value == null ^ state != LogEntryState.UNKNOWN : "Invalid state. Value=" + value +
": " + toString();
}
/** |
| File |
|---|
| ConsensusInstance.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Assert statement |
| Chunk |
|---|
| Conflicting content |
|---|
if (state == LogEntryState.UNKNOWN) {
state = LogEntryState.KNOWN;
<<<<<<< HEAD
} else if (state == LogEntryState.DECIDED && !Arrays.equals(this.value, value)) {
throw new RuntimeException("Cannot change values on a decided instance: " + this);
}
if (view > this.view) {
// Higher view value. Accept any value.
this.view = view;
} else {
assert this.view == view;
// Same view. Accept a value only if the current value is null
// or if the current value is equal to the new value.
assert this.value == null || Arrays.equals(value, this.value) : "Different value for the same view. " +
"View: " +
view +
", current value: " +
this.value +
", new value: " + value;
}
=======
}
setView(view);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
this.value = value;
|
| Solution content |
|---|
if (state == LogEntryState.UNKNOWN) {
state = LogEntryState.KNOWN;
}
setView(view);
this.value = value;
|
| File |
|---|
| ConsensusInstance.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| If statement |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
return accepts;
}
<<<<<<< HEAD
public boolean acceptedByMajority() {
return accepts.cardinality() > (ProcessDescriptor.getInstance().numReplicas / 2);
=======
/** Returns if the instances is accepted by the majority */
public boolean isMajority() {
return accepts.cardinality() >= processDescriptor.majority;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
/** |
| Solution content |
|---|
return accepts;
}
/** Returns if the instances is accepted by the majority */
public boolean isMajority() {
return accepts.cardinality() >= processDescriptor.majority;
}
/** |
| File |
|---|
| ConsensusInstance.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Method signature |
| Return statement |
| Chunk |
|---|
| Conflicting content |
|---|
}
public boolean isWindowFull() {
<<<<<<< HEAD
return getWindowUsed() >= ProcessDescriptor.getInstance().windowSize;
=======
return getWindowUsed() == processDescriptor.windowSize;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
}
public boolean isIdle() { |
| Solution content |
|---|
}
public boolean isWindowFull() {
return getWindowUsed() == processDescriptor.windowSize;
}
public boolean isIdle() { |
| File |
|---|
| InMemoryStorage.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Return statement |
| Chunk |
|---|
| Conflicting content |
|---|
*/
boolean isInWindow(int instanceId);
<<<<<<< HEAD
/** Returns number of parallel instances that are voted currently */
int getWindowUsed();
/**
* returns true if the window is full, i.e. we reached maximum number of
=======
/** Number of instances from lowest not yet decided to highest known */
int getWindowUsed();
/**
* returns true if the window is full, i.e., we reached maximum number of
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
* open parallel instances
*/
boolean isWindowFull(); |
| Solution content |
|---|
*/
boolean isInWindow(int instanceId);
/** Number of instances from lowest not yet decided to highest known */
int getWindowUsed();
/**
* returns true if the window is full, i.e., we reached maximum number of
* open parallel instances
*/
boolean isWindowFull(); |
| File |
|---|
| Storage.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method interface |
| Chunk |
|---|
| Conflicting content |
|---|
public static void main(String[] args) throws Exception {
int localId = Integer.parseInt(args[0]);
Configuration config = new Configuration();
<<<<<<< HEAD
String logPath = config.getProperty(ProcessDescriptor.LOG_PATH,
ProcessDescriptor.DEFAULT_LOG_PATH);
DigestService service = new DigestService(localId, logPath);
=======
DigestService service = new DigestService(localId);
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11
Replica replica = new Replica(config, localId, service);
service.initLogFile(processDescriptor.logPath);
replica.start(); |
| Solution content |
|---|
public static void main(String[] args) throws Exception {
int localId = Integer.parseInt(args[0]);
Configuration config = new Configuration();
DigestService service = new DigestService(localId);
Replica replica = new Replica(config, localId, service);
service.initLogFile(processDescriptor.logPath);
replica.start(); |
| File |
|---|
| DigestService.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
Integer count;
count = sends.take();
for (int i = 0; i < count; i++) {
<<<<<<< HEAD:src/lsr/paxos/test/MultiClient.java
// byte[] request = requestGenerator.generate();
if (Thread.interrupted()) {
return;
=======
if (randomRequests)
rnd.nextBytes(request);
if (Thread.interrupted()) {
break;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/test/GenericMultiClient.java
}
@SuppressWarnings("unused")
byte[] response; |
| Solution content |
|---|
Integer count;
count = sends.take();
for (int i = 0; i < count; i++) {
if (randomRequests)
rnd.nextBytes(request);
if (Thread.interrupted()) {
break;
}
@SuppressWarnings("unused")
byte[] response; |
| File |
|---|
| GenericMultiClient.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Break statement |
| Comment |
| If statement |
| Return statement |
| Chunk |
|---|
| Conflicting content |
|---|
if (args[0].equals("kill")) {
for (ClientThread client : clients)
client.interrupt();
<<<<<<< HEAD:src/lsr/paxos/test/MultiClient.java
}
System.exit(1);
break;
=======
continue;
>>>>>>> 19d52295bd105caff93ed24fb5594ac344414f11:src/lsr/paxos/test/GenericMultiClient.java
}
if (args.length < 2) { |
| Solution content |
|---|
if (args[0].equals("kill")) {
for (ClientThread client : clients)
client.interrupt();
continue;
}
if (args.length < 2) { |
| File |
|---|
| GenericMultiClient.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Break statement |
| Continue statement |
| Method invocation |