Projects >> Pregel >> 08a6060a02cc76e7d3f068a4b6e12379a8f6a09d

Chunk
Conflicting content
		// assigns it to healthy nodes
		while (iter.hasNext()) {
			workerID = iter.next();
<<<<<<< HEAD
			workerStateFile = checkpointDir + File.separator + workerID;
=======
			workerStateFile = checkpointDir + File.separator + workerID + "_" + this.master.getLastCheckpointedSuperstep();		
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
			workerData = (WorkerData) GeneralUtils.deserialize(workerStateFile);
			assignRecoveredPartitions(workerID, workerData);
		}
Solution content
		// assigns it to healthy nodes
		while (iter.hasNext()) {
			workerID = iter.next();
			workerStateFile = checkpointDir + File.separator + workerID + "_" + this.master.getLastCheckpointedSuperstep();	
			workerData = (WorkerData) GeneralUtils.deserialize(workerStateFile);
			assignRecoveredPartitions(workerID, workerData);
		}
File
HealthManager.java
Developer's decision
Version 2
Kind of conflict
Attribute
Method invocation
Variable
Chunk
Conflicting content
			// Choose a random worker from the map and assign the partition to
			// it.
			WorkerProxy workerProxy = (WorkerProxy) workerProxyCollection[index];
<<<<<<< HEAD
			System.out.println("Assigning " + partition + " to "
					+ workerProxy.getWorkerID());
=======
			System.out.println("Assigning " + partition.getPartitionID() + " to " + workerProxy.getWorkerID() );
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
			partitionWorkerMap.put(partition.getPartitionID(),
					workerProxy.getWorkerID());
			// If the dead worker was active during checkpointing, add the
Solution content
			// Choose a random worker from the map and assign the partition to
			// it.
			WorkerProxy workerProxy = (WorkerProxy) workerProxyCollection[index];
			System.out.println("Assigning " + partition.getPartitionID() + " to " + workerProxy.getWorkerID() );
			partitionWorkerMap.put(partition.getPartitionID(),
					workerProxy.getWorkerID());
			// If the dead worker was active during checkpointing, add the
File
HealthManager.java
Developer's decision
Version 2
Kind of conflict
Method invocation
Chunk
Conflicting content
	/**
	 * Check point.
<<<<<<< HEAD
	 * 
	 * @throws Exception
	 *             the exception
=======
	 *
	 * @param superstep the superstep
	 * @throws Exception the exception
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
	 */
	public void checkPoint(long superstep) throws Exception;
Solution content
	/**
	 * Check point.
	 * 
	 * @param superstep the superstep
	 * @throws Exception the exception
	 */
	public void checkPoint(long superstep) throws Exception;
File
Worker.java
Developer's decision
Version 2
Kind of conflict
Comment
Chunk
Conflicting content
	/** The super step counter. */
	private long superstep = 0;

<<<<<<< HEAD
	/** The location of checkpoint directory */
=======
	/** The checkpoint file that points to the latest checkpoint */
	private String nextCheckpointFile;
	
	/** The checkpoint file that points to the last successful checkpoint (among all Workers). */
	private String currentCheckpointFile;
	
	/** The CHECKPOINTIN g_ directory. */
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
	private static String CHECKPOINTING_DIRECTORY;

	static {
Solution content
	/** The super step counter. */
	private long superstep = 0;

	/** The checkpoint file that points to the latest checkpoint */
	private String nextCheckpointFile;

	/**
	 * The checkpoint file that points to the last successful checkpoint (among
	 * all Workers).
	 */
	private String currentCheckpointFile;

	/** The CHECKPOINT directory. */
	private static String CHECKPOINTING_DIRECTORY;

	static {
File
WorkerImpl.java
Developer's decision
Manual
Kind of conflict
Attribute
Comment
Chunk
Conflicting content
		/**
		 * Check and send message.
<<<<<<< HEAD
		 * 
		 * @throws RemoteException
=======
		 *
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
		 */
		private synchronized void checkAndSendMessage() {
			// System.out.println(this + "sendingMessage: " + sendingMessage +
Solution content
		/**
		 * Check and send message. <<<<<<< HEAD
		 * 
		 * @throws RemoteException
		 *             =======
		 * 
		 *             >>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
		 */
		private synchronized void checkAndSendMessage() {
			// System.out.println(this + "sendingMessage: " + sendingMessage +
File
WorkerImpl.java
Developer's decision
Manual
Kind of conflict
Comment
Chunk
Conflicting content
			if (!stopSendingMessage
					&& (nextPartitionQueue.size() == totalPartitionsAssigned)) {
				stopSendingMessage = true;
<<<<<<< HEAD
				// System.out.println(this + "WorkerImpl: checkAndSendMessage "
				// + superstep);
=======
				System.out.println(this + " WorkerImpl: Superstep " + superstep + " completed.");
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
				startSuperStep = false;
				for (Entry>> entry : outgoingMessages
						.entrySet()) {
Solution content
			if (!stopSendingMessage
					&& (nextPartitionQueue.size() == totalPartitionsAssigned)) {
				stopSendingMessage = true;
				System.out.println(this + " WorkerImpl: Superstep " + superstep
						+ " completed.");
				startSuperStep = false;
				for (Entry>> entry : outgoingMessages
						.entrySet()) {
File
WorkerImpl.java
Developer's decision
Version 2
Kind of conflict
Comment
Method invocation
Chunk
Conflicting content
		System.out.println("Worker Machine " + workerID + " halts");
		this.restoreInitialState();
	}
<<<<<<< HEAD

	/**
	 * Restores the worker to the initial state
	 */
	private void restoreInitialState() {
=======
	
	/**
	 * Restore initial state.
	 */
	private void restoreInitialState(){
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
		this.nextPartitionQueue.clear();
		this.currentIncomingMessages.clear();
		this.outgoingMessages.clear();
Solution content
		System.out.println("Worker Machine " + workerID + " halts");
		this.restoreInitialState();
	}

	/**
	 * Restore the worker to the initial state
	 */
	private void restoreInitialState() {
		this.nextPartitionQueue.clear();
		this.currentIncomingMessages.clear();
		this.outgoingMessages.clear();
File
WorkerImpl.java
Developer's decision
Manual
Kind of conflict
Comment
Method signature
Chunk
Conflicting content
	/**
	 * Sets the worker partition info.
<<<<<<< HEAD
	 * 
	 * @param totalPartitionsAssigned
	 *            the total partitions assigned
	 * @param mapPartitionIdToWorkerId
	 *            the map partition id to worker id
	 * @param mapWorkerIdToWorker
	 *            the map worker id to worker
	 * @throws RemoteException
=======
	 *
	 * @param totalPartitionsAssigned the total partitions assigned
	 * @param mapPartitionIdToWorkerId the map partition id to worker id
	 * @param mapWorkerIdToWorker the map worker id to worker
	 * @throws RemoteException the remote exception
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
	 */
	public void setWorkerPartitionInfo(int totalPartitionsAssigned,
			Map mapPartitionIdToWorkerId,
Solution content
	/**
	 * Sets the worker partition info. <<<<<<< HEAD
	 * 
	 * @param totalPartitionsAssigned
	 *            the total partitions assigned
	 * @param mapPartitionIdToWorkerId
	 *            the map partition id to worker id
	 * @param mapWorkerIdToWorker
	 *            the map worker id to worker
	 * @throws RemoteException
	 *             =======
	 * 
	 * @param totalPartitionsAssigned
	 *            the total partitions assigned
	 * @param mapPartitionIdToWorkerId
	 *            the map partition id to worker id
	 * @param mapWorkerIdToWorker
	 *            the map worker id to worker
	 * @throws RemoteException
	 *             the remote exception >>>>>>>
	 *             42b91fb45356bdb8ce40222761cb75525693696a
	 */
	public void setWorkerPartitionInfo(int totalPartitionsAssigned,
			Map mapPartitionIdToWorkerId,
File
WorkerImpl.java
Developer's decision
Manual
Kind of conflict
Comment
Chunk
Conflicting content
	/**
	 * Receive message.
<<<<<<< HEAD
	 * 
	 * @param incomingMessages
	 *            the incoming messages
=======
	 *
	 * @param incomingMessages the incoming messages
	 * @throws RemoteException the remote exception
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
	 */
	public void receiveMessage(Map> incomingMessages)
			throws RemoteException {
Solution content
	/**
	 * Receive message. <<<<<<< HEAD
	 * 
	 * @param incomingMessages
	 *            the incoming messages =======
	 * 
	 * @param incomingMessages
	 *            the incoming messages
	 * @throws RemoteException
	 *             the remote exception >>>>>>>
	 *             42b91fb45356bdb8ce40222761cb75525693696a
	 */
	public void receiveMessage(Map> incomingMessages)
			throws RemoteException {
File
WorkerImpl.java
Developer's decision
Manual
Kind of conflict
Comment
Chunk
Conflicting content
	}

	/**
<<<<<<< HEAD
	 * 
	 * Sets the initial message for the Worker that has the source vertex.
	 * 
	 * @param initialMessage
	 *            the initial message
=======
	 * Sets the initial message for the Worker that has the source vertex.
	 *
	 * @param initialMessage the initial message
	 * @throws RemoteException the remote exception
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
	 */
	public void setInitialMessage(
			ConcurrentHashMap>> initialMessage)
Solution content
	}

	/**
	 * <<<<<<< HEAD
	 * 
	 * Sets the initial message for the Worker that has the source vertex.
	 * 
	 * @param initialMessage
	 *            the initial message ======= Sets the initial message for the
	 *            Worker that has the source vertex.
	 * 
	 * @param initialMessage
	 *            the initial message
	 * @throws RemoteException
	 *             the remote exception >>>>>>>
	 *             42b91fb45356bdb8ce40222761cb75525693696a
	 */
	public void setInitialMessage(
			ConcurrentHashMap>> initialMessage)
File
WorkerImpl.java
Developer's decision
Manual
Kind of conflict
Comment
Chunk
Conflicting content
		this.currentIncomingMessages = initialMessage;
	}

<<<<<<< HEAD
	/**
	 * Check point.
	 * 
	 * @throws Exception
	 *             the exception
	 */
	@Override
	public void checkPoint() throws Exception {
		System.out.println(this + " WorkerImpl: checkPoint");

		WorkerData wd = new WorkerData(this.nextPartitionQueue,
				this.currentIncomingMessages);
		// Serialization
		// System.out.println("Checkpointing WorkerData " + wd + " for " +
		// workerID);
		String filePath = CHECKPOINTING_DIRECTORY + File.separator + workerID;
		GeneralUtils.serialize(filePath, wd);
=======
	/* (non-Javadoc)
	 * @see system.Worker#checkPoint(long)
	 */
	@Override
	public void checkPoint(long superstep) throws Exception{
		System.out.println("WorkerImpl: checkPoint " + superstep);
		this.superstep = superstep;
		WorkerData wd = new WorkerData(
				this.nextPartitionQueue, 
				this.currentIncomingMessages
				);
		// Serialization
		
		// Don't update the currentCheckpointFile until the Master confirms that the checkpointing had succeeded in all the Workers.
		this.nextCheckpointFile = CHECKPOINTING_DIRECTORY + File.separator + workerID + "_" + superstep;
		// String newFilePath = CHECKPOINTING_DIRECTORY + File.separator + workerID;
		GeneralUtils.serialize(this.nextCheckpointFile, wd);
		//nextCheckpointFile = tmpFilePath;
		// GeneralUtils.renameFile(tmpFilePath, newFilePath);
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
	}

	
Solution content
		this.currentIncomingMessages = initialMessage;
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see system.Worker#checkPoint(long)
	 */
	@Override
	public void checkPoint(long superstep) throws Exception {
		System.out.println("WorkerImpl: checkPoint " + superstep);
		this.superstep = superstep;
		WorkerData wd = new WorkerData(this.nextPartitionQueue,
				this.currentIncomingMessages);
		// Serialization

		// Don't update the currentCheckpointFile until the Master confirms that
		// the checkpointing had succeeded in all the Workers.
		this.nextCheckpointFile = CHECKPOINTING_DIRECTORY + File.separator
				+ workerID + "_" + superstep;
		// String newFilePath = CHECKPOINTING_DIRECTORY + File.separator +
		// workerID;
		GeneralUtils.serialize(this.nextCheckpointFile, wd);
		// nextCheckpointFile = tmpFilePath;
		// GeneralUtils.renameFile(tmpFilePath, newFilePath);
	}
File
WorkerImpl.java
Developer's decision
Manual
Kind of conflict
Annotation
Attribute
Comment
Method invocation
Method signature
Variable
Chunk
Conflicting content
	}

	/* (non-Javadoc)
		this.outgoingMessages.clear();

		WorkerData workerData;
<<<<<<< HEAD
		String checkpointDir;
		try {
			checkpointDir = Props.getInstance().getStringProperty(
					"CHECKPOINT_DIR");
			String workerStateFile = checkpointDir + File.separator + workerID;
			workerData = (WorkerData) GeneralUtils.deserialize(workerStateFile);
			this.currentIncomingMessages = (ConcurrentHashMap>>) workerData
					.getMessages();
			this.nextPartitionQueue = (BlockingQueue) workerData
					.getPartitions();
			System.out.println("Restoring checkpointed data "
					+ this.nextPartitionQueue);
		} catch (PropertyNotFoundException p) {
			p.printStackTrace();
		}
=======
		// String checkpointDir;
		// checkpointDir = Props.getInstance().getStringProperty("CHECKPOINT_DIR");
        // String workerStateFile = checkpointDir + File.separator + workerID;
		workerData = (WorkerData)GeneralUtils.deserialize(this.currentCheckpointFile);
		this.currentIncomingMessages = (ConcurrentHashMap>>)workerData.getMessages();
		this.nextPartitionQueue = (BlockingQueue)workerData.getPartitions();
		// System.out.println("Restoring checkpointed data " + this.nextPartitionQueue);			
	
				
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
Solution content
		this.outgoingMessages.clear();

		WorkerData workerData;
		String checkpointDir;
		try {
			checkpointDir = Props.getInstance().getStringProperty(
					"CHECKPOINT_DIR");
			String workerStateFile = checkpointDir + File.separator + workerID;
			workerData = (WorkerData) GeneralUtils.deserialize(workerStateFile);
			this.currentIncomingMessages = (ConcurrentHashMap>>) workerData
					.getMessages();
			this.nextPartitionQueue = (BlockingQueue) workerData
					.getPartitions();
			System.out.println("Restoring checkpointed data "
					+ this.nextPartitionQueue);
		} catch (PropertyNotFoundException p) {
			p.printStackTrace();
		}
		// String checkpointDir;
		// checkpointDir =
		// Props.getInstance().getStringProperty("CHECKPOINT_DIR");
		// String workerStateFile = checkpointDir + File.separator + workerID;
		workerData = (WorkerData) GeneralUtils
				.deserialize(this.currentCheckpointFile);
		this.currentIncomingMessages = (ConcurrentHashMap>>) workerData
				.getMessages();
		this.nextPartitionQueue = (BlockingQueue) workerData
				.getPartitions();
		// System.out.println("Restoring checkpointed data " +
		// this.nextPartitionQueue);
	}
File
WorkerImpl.java
Developer's decision
Manual
Kind of conflict
Attribute
Cast expression
Comment
Try statement
Variable
Chunk
Conflicting content
	public void finishRecovery() throws RemoteException {
		System.out.println("WorkerImpl: finishRecovery");
		try {
<<<<<<< HEAD
			checkPoint();
=======
			 checkPoint(this.masterProxy.getCheckpointedSuperstep());
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
		} catch (Exception e) {
			System.out.println("checkpoint failure");
			throw new RemoteException();
Solution content
	public void finishRecovery() throws RemoteException {
		System.out.println("WorkerImpl: finishRecovery");
		try {
			checkPoint(this.masterProxy.getCheckpointedSuperstep());
		} catch (Exception e) {
			System.out.println("checkpoint failure");
			throw new RemoteException();
File
WorkerImpl.java
Developer's decision
Version 2
Kind of conflict
Method invocation
Chunk
Conflicting content
		}
	}

<<<<<<< HEAD
	/**
	 * Adds the recovered partition data to this worker
	 * 
	 * @param partition
	 *            Represents the set of vertices
	 * @param messages
	 *            Represents the map of vertes with its associated messages
	 */
	public void addRecoveredData(Partition partition,
			Map> messages) throws RemoteException {
=======
	/* (non-Javadoc)
	 * @see system.Worker#addRecoveredData(system.Partition, java.util.Map)
	 */
	public void addRecoveredData(Partition partition, Map> messages) throws RemoteException {
>>>>>>> 42b91fb45356bdb8ce40222761cb75525693696a
		System.out.println("WorkerImpl: addRecoveredData");
		// System.out.println("Partition " + partition.getPartitionID());
		// System.out.println("Messages: " + messages);
Solution content
		}
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see system.Worker#addRecoveredData(system.Partition, java.util.Map)
	 */
	public void addRecoveredData(Partition partition,
			Map> messages) throws RemoteException {
		System.out.println("WorkerImpl: addRecoveredData");
		// System.out.println("Partition " + partition.getPartitionID());
		// System.out.println("Messages: " + messages);
File
WorkerImpl.java
Developer's decision
Manual
Kind of conflict
Comment
Method signature