| Chunk |
|---|
| Conflicting content |
|---|
// assigns it to healthy nodes
while (iter.hasNext()) {
workerID = iter.next();
<<<<<<< HEAD
workerStateFile = checkpointDir + File.separator + workerID;
=======
workerStateFile = checkpointDir + File.separator + workerID + "_" + this.master.getLastCheckpointedSuperstep();
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
workerData = (WorkerData) GeneralUtils.deserialize(workerStateFile);
assignRecoveredPartitions(workerID, workerData);
} |
| Solution content |
|---|
// assigns it to healthy nodes
while (iter.hasNext()) {
workerID = iter.next();
workerStateFile = checkpointDir + File.separator + workerID + "_" + this.master.getLastCheckpointedSuperstep();
workerData = (WorkerData) GeneralUtils.deserialize(workerStateFile);
assignRecoveredPartitions(workerID, workerData);
} |
| File |
|---|
| HealthManager.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Attribute |
| Method invocation |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
// Choose a random worker from the map and assign the partition to
// it.
WorkerProxy workerProxy = (WorkerProxy) workerProxyCollection[index];
<<<<<<< HEAD
System.out.println("Assigning " + partition + " to "
+ workerProxy.getWorkerID());
=======
System.out.println("Assigning " + partition.getPartitionID() + " to " + workerProxy.getWorkerID() );
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
partitionWorkerMap.put(partition.getPartitionID(),
workerProxy.getWorkerID());
// If the dead worker was active during checkpointing, add the |
| Solution content |
|---|
// Choose a random worker from the map and assign the partition to
// it.
WorkerProxy workerProxy = (WorkerProxy) workerProxyCollection[index];
System.out.println("Assigning " + partition.getPartitionID() + " to " + workerProxy.getWorkerID() );
partitionWorkerMap.put(partition.getPartitionID(),
workerProxy.getWorkerID());
// If the dead worker was active during checkpointing, add the |
| File |
|---|
| HealthManager.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
/** * Check point. <<<<<<< HEAD * * @throws Exception * the exception ======= * * @param superstep the superstep * @throws Exception the exception >>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a */ public void checkPoint(long superstep) throws Exception; |
| Solution content |
|---|
/** * Check point. * * @param superstep the superstep * @throws Exception the exception */ public void checkPoint(long superstep) throws Exception; |
| File |
|---|
| Worker.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
/** The super step counter. */
private long superstep = 0;
<<<<<<< HEAD
/** The location of checkpoint directory */
=======
/** The checkpoint file that points to the latest checkpoint */
private String nextCheckpointFile;
/** The checkpoint file that points to the last successful checkpoint (among all Workers). */
private String currentCheckpointFile;
/** The CHECKPOINTIN g_ directory. */
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
private static String CHECKPOINTING_DIRECTORY;
static { |
| Solution content |
|---|
/** The super step counter. */
private long superstep = 0;
/** The checkpoint file that points to the latest checkpoint */
private String nextCheckpointFile;
/**
* The checkpoint file that points to the last successful checkpoint (among
* all Workers).
*/
private String currentCheckpointFile;
/** The CHECKPOINT directory. */
private static String CHECKPOINTING_DIRECTORY;
static { |
| File |
|---|
| WorkerImpl.java |
| Developer's decision |
|---|
| Manual |
| Kind of conflict |
|---|
| Attribute |
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
/**
* Check and send message.
<<<<<<< HEAD
*
* @throws RemoteException
=======
*
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
*/
private synchronized void checkAndSendMessage() {
// System.out.println(this + "sendingMessage: " + sendingMessage + |
| Solution content |
|---|
/**
* Check and send message. <<<<<<< HEAD
*
* @throws RemoteException
* =======
*
* >>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
*/
private synchronized void checkAndSendMessage() {
// System.out.println(this + "sendingMessage: " + sendingMessage + |
| File |
|---|
| WorkerImpl.java |
| Developer's decision |
|---|
| Manual |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
if (!stopSendingMessage
&& (nextPartitionQueue.size() == totalPartitionsAssigned)) {
stopSendingMessage = true;
<<<<<<< HEAD
// System.out.println(this + "WorkerImpl: checkAndSendMessage "
// + superstep);
=======
System.out.println(this + " WorkerImpl: Superstep " + superstep + " completed.");
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
startSuperStep = false;
for (Entry |
| Solution content |
|---|
if (!stopSendingMessage
&& (nextPartitionQueue.size() == totalPartitionsAssigned)) {
stopSendingMessage = true;
System.out.println(this + " WorkerImpl: Superstep " + superstep
+ " completed.");
startSuperStep = false;
for (Entry |
| File |
|---|
| WorkerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Comment |
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
System.out.println("Worker Machine " + workerID + " halts");
this.restoreInitialState();
}
<<<<<<< HEAD
/**
* Restores the worker to the initial state
*/
private void restoreInitialState() {
=======
/**
* Restore initial state.
*/
private void restoreInitialState(){
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
this.nextPartitionQueue.clear();
this.currentIncomingMessages.clear();
this.outgoingMessages.clear(); |
| Solution content |
|---|
System.out.println("Worker Machine " + workerID + " halts");
this.restoreInitialState();
}
/**
* Restore the worker to the initial state
*/
private void restoreInitialState() {
this.nextPartitionQueue.clear();
this.currentIncomingMessages.clear();
this.outgoingMessages.clear(); |
| File |
|---|
| WorkerImpl.java |
| Developer's decision |
|---|
| Manual |
| Kind of conflict |
|---|
| Comment |
| Method signature |
| Chunk |
|---|
| Conflicting content |
|---|
/** * Sets the worker partition info. <<<<<<< HEAD * * @param totalPartitionsAssigned * the total partitions assigned * @param mapPartitionIdToWorkerId * the map partition id to worker id * @param mapWorkerIdToWorker * the map worker id to worker * @throws RemoteException ======= * * @param totalPartitionsAssigned the total partitions assigned * @param mapPartitionIdToWorkerId the map partition id to worker id * @param mapWorkerIdToWorker the map worker id to worker * @throws RemoteException the remote exception >>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a */ public void setWorkerPartitionInfo(int totalPartitionsAssigned, Map |
| Solution content |
|---|
/** * Sets the worker partition info. <<<<<<< HEAD * * @param totalPartitionsAssigned * the total partitions assigned * @param mapPartitionIdToWorkerId * the map partition id to worker id * @param mapWorkerIdToWorker * the map worker id to worker * @throws RemoteException * ======= * * @param totalPartitionsAssigned * the total partitions assigned * @param mapPartitionIdToWorkerId * the map partition id to worker id * @param mapWorkerIdToWorker * the map worker id to worker * @throws RemoteException * the remote exception >>>>>>> * 42b91fb45356bdb8ce40222761cb75525693696a */ public void setWorkerPartitionInfo(int totalPartitionsAssigned, Map |
| File |
|---|
| WorkerImpl.java |
| Developer's decision |
|---|
| Manual |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
/** * Receive message. <<<<<<< HEAD * * @param incomingMessages * the incoming messages ======= * * @param incomingMessages the incoming messages * @throws RemoteException the remote exception >>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a */ public void receiveMessage(Map |
| Solution content |
|---|
/** * Receive message. <<<<<<< HEAD * * @param incomingMessages * the incoming messages ======= * * @param incomingMessages * the incoming messages * @throws RemoteException * the remote exception >>>>>>> * 42b91fb45356bdb8ce40222761cb75525693696a */ public void receiveMessage(Map |
| File |
|---|
| WorkerImpl.java |
| Developer's decision |
|---|
| Manual |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
} /** <<<<<<< HEAD * * Sets the initial message for the Worker that has the source vertex. * * @param initialMessage * the initial message ======= * Sets the initial message for the Worker that has the source vertex. * * @param initialMessage the initial message * @throws RemoteException the remote exception >>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a */ public void setInitialMessage( ConcurrentHashMap |
| Solution content |
|---|
} /** * <<<<<<< HEAD * * Sets the initial message for the Worker that has the source vertex. * * @param initialMessage * the initial message ======= Sets the initial message for the * Worker that has the source vertex. * * @param initialMessage * the initial message * @throws RemoteException * the remote exception >>>>>>> * 42b91fb45356bdb8ce40222761cb75525693696a */ public void setInitialMessage( ConcurrentHashMap |
| File |
|---|
| WorkerImpl.java |
| Developer's decision |
|---|
| Manual |
| Kind of conflict |
|---|
| Comment |
| Chunk |
|---|
| Conflicting content |
|---|
this.currentIncomingMessages = initialMessage;
}
<<<<<<< HEAD
/**
* Check point.
*
* @throws Exception
* the exception
*/
@Override
public void checkPoint() throws Exception {
System.out.println(this + " WorkerImpl: checkPoint");
WorkerData wd = new WorkerData(this.nextPartitionQueue,
this.currentIncomingMessages);
// Serialization
// System.out.println("Checkpointing WorkerData " + wd + " for " +
// workerID);
String filePath = CHECKPOINTING_DIRECTORY + File.separator + workerID;
GeneralUtils.serialize(filePath, wd);
=======
/* (non-Javadoc)
* @see system.Worker#checkPoint(long)
*/
@Override
public void checkPoint(long superstep) throws Exception{
System.out.println("WorkerImpl: checkPoint " + superstep);
this.superstep = superstep;
WorkerData wd = new WorkerData(
this.nextPartitionQueue,
this.currentIncomingMessages
);
// Serialization
// Don't update the currentCheckpointFile until the Master confirms that the checkpointing had succeeded in all the Workers.
this.nextCheckpointFile = CHECKPOINTING_DIRECTORY + File.separator + workerID + "_" + superstep;
// String newFilePath = CHECKPOINTING_DIRECTORY + File.separator + workerID;
GeneralUtils.serialize(this.nextCheckpointFile, wd);
//nextCheckpointFile = tmpFilePath;
// GeneralUtils.renameFile(tmpFilePath, newFilePath);
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
}
|
| Solution content |
|---|
this.currentIncomingMessages = initialMessage;
}
/*
* (non-Javadoc)
*
* @see system.Worker#checkPoint(long)
*/
@Override
public void checkPoint(long superstep) throws Exception {
System.out.println("WorkerImpl: checkPoint " + superstep);
this.superstep = superstep;
WorkerData wd = new WorkerData(this.nextPartitionQueue,
this.currentIncomingMessages);
// Serialization
// Don't update the currentCheckpointFile until the Master confirms that
// the checkpointing had succeeded in all the Workers.
this.nextCheckpointFile = CHECKPOINTING_DIRECTORY + File.separator
+ workerID + "_" + superstep;
// String newFilePath = CHECKPOINTING_DIRECTORY + File.separator +
// workerID;
GeneralUtils.serialize(this.nextCheckpointFile, wd);
// nextCheckpointFile = tmpFilePath;
// GeneralUtils.renameFile(tmpFilePath, newFilePath);
}
|
| File |
|---|
| WorkerImpl.java |
| Developer's decision |
|---|
| Manual |
| Kind of conflict |
|---|
| Annotation |
| Attribute |
| Comment |
| Method invocation |
| Method signature |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
}
/* (non-Javadoc)
this.outgoingMessages.clear();
WorkerData workerData;
<<<<<<< HEAD
String checkpointDir;
try {
checkpointDir = Props.getInstance().getStringProperty(
"CHECKPOINT_DIR");
String workerStateFile = checkpointDir + File.separator + workerID;
workerData = (WorkerData) GeneralUtils.deserialize(workerStateFile);
this.currentIncomingMessages = (ConcurrentHashMap |
| Solution content |
|---|
this.outgoingMessages.clear();
WorkerData workerData;
String checkpointDir;
try {
checkpointDir = Props.getInstance().getStringProperty(
"CHECKPOINT_DIR");
String workerStateFile = checkpointDir + File.separator + workerID;
workerData = (WorkerData) GeneralUtils.deserialize(workerStateFile);
this.currentIncomingMessages = (ConcurrentHashMap |
| File |
|---|
| WorkerImpl.java |
| Developer's decision |
|---|
| Manual |
| Kind of conflict |
|---|
| Attribute |
| Cast expression |
| Comment |
| Try statement |
| Variable |
| Chunk |
|---|
| Conflicting content |
|---|
public void finishRecovery() throws RemoteException {
System.out.println("WorkerImpl: finishRecovery");
try {
<<<<<<< HEAD
checkPoint();
=======
checkPoint(this.masterProxy.getCheckpointedSuperstep());
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
} catch (Exception e) {
System.out.println("checkpoint failure");
throw new RemoteException(); |
| Solution content |
|---|
public void finishRecovery() throws RemoteException {
System.out.println("WorkerImpl: finishRecovery");
try {
checkPoint(this.masterProxy.getCheckpointedSuperstep());
} catch (Exception e) {
System.out.println("checkpoint failure");
throw new RemoteException(); |
| File |
|---|
| WorkerImpl.java |
| Developer's decision |
|---|
| Version 2 |
| Kind of conflict |
|---|
| Method invocation |
| Chunk |
|---|
| Conflicting content |
|---|
} } <<<<<<< HEAD /** * Adds the recovered partition data to this worker * * @param partition * Represents the set of vertices * @param messages * Represents the map of vertes with its associated messages */ public void addRecoveredData(Partition partition, Map |
| Solution content |
|---|
} } /* * (non-Javadoc) * * @see system.Worker#addRecoveredData(system.Partition, java.util.Map) */ public void addRecoveredData(Partition partition, Map |
| File |
|---|
| WorkerImpl.java |
| Developer's decision |
|---|
| Manual |
| Kind of conflict |
|---|
| Comment |
| Method signature |