Projects >> ambrose >>cf9d2a763603568b7c290dc7817557ad93538033

Chunk
Conflicting content
   */
  @Override
  public void jobStartedNotification(String scriptId, String assignedJobId) {
<<<<<<< HEAD
    log.info("jobStartedNotification - scriptId " + scriptId + "jobId " + assignedJobId);

=======
>>>>>>> e5f05a204f50f43d0050ea3704837fb6efe560b9
    // for each job in the graph, check if the stats for a job with this name is found. If so, look
    // up its scope and bind the jobId to the DAGNode with the same scope.
    for (JobStats jobStats : jobGraph) {
Solution content
   */
  @Override
  public void jobStartedNotification(String scriptId, String assignedJobId) {
    log.info("jobStartedNotification - scriptId " + scriptId + "jobId " + assignedJobId);

    // for each job in the graph, check if the stats for a job with this name is found. If so, look
    // up its scope and bind the jobId to the DAGNode with the same scope.
    for (JobStats jobStats : jobGraph) {
File
AmbrosePigProgressNotificationListener.java
Developer's decision
Version 1
Kind of conflict
Method invocation
Chunk
Conflicting content
   */
  @Override
  public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
<<<<<<< HEAD
    Workflow workflow = new Workflow(scriptId, workflowVersion, jobs);
    try {
      outputStatsData(workflow);
    } catch (IOException e) {
      log.error("Exception outputting workflow", e);
=======
    if (workflowVersion == null) {
      log.warn("scriptFingerprint not set for this script - not saving stats." );
    } else {
      Workflow workflow = new Workflow(scriptId, workflowVersion, jobs);

      try {
        outputStatsData(workflow);
      } catch (IOException e) {
        log.error("Exception outputting workflow", e);
      }
>>>>>>> e5f05a204f50f43d0050ea3704837fb6efe560b9
    }
  }
Solution content
   */
  @Override
  public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
    // Bundle the script id, the workflow version and the collected jobs into a Workflow and
    // hand it to outputStatsData. An I/O failure while writing is logged and not rethrown.
    try {
      outputStatsData(new Workflow(scriptId, workflowVersion, jobs));
    } catch (IOException ioe) {
      log.error("Exception outputting workflow", ioe);
    }
  }
File
AmbrosePigProgressNotificationListener.java
Developer's decision
Version 1
Kind of conflict
Catch clause
If statement
Method invocation
Try statement
Variable
Chunk
Conflicting content
    }
    }
  }

<<<<<<< HEAD
=======
  /**
   * Forwards an event to the stats write service. An I/O failure is logged, not rethrown.
   *
   * @param scriptId script id passed through to the service.
   * @param event the event to send.
   */
  private void pushEvent(String scriptId, Event event) {
    try {
      statsWriteService.pushEvent(scriptId, event);
    } catch (IOException ioe) {
      // A failed send is only recorded in the log; the caller carries on.
      log.error("Couldn't send event to StatsWriteService", ioe);
    }
  }

  /**
   * Looks up the running job that matches the given PigJob's id and attaches its state to the
   * PigJob: the job configuration (when any was read) and a MapReduceJobState built from the
   * running job plus its map and reduce task reports.
   *
   * <p>The method returns without touching the PigJob when the job lookup throws, when no
   * running job is found, or when the job's id cannot be obtained. An IOException raised while
   * gathering the data is logged as a warning.
   *
   * @param pigJob the job to enrich; its id is used for the lookup.
   */
  @SuppressWarnings("deprecation")
  private void addMapReduceJobState(PigJob pigJob) {
    JobClient client = getJobClient();

    try {
      String jobId = pigJob.getId();

      // Step 1: resolve the running job; any failure here just means "no state available".
      RunningJob job;
      try {
        job = client.getJob(jobId);
      } catch (Exception e) {
        log.warn("Exception while querying job status for jobId=" + jobId +" message: " + e.getMessage());
        log.debug(e);
        return;
      }
      if (job == null) {
        log.warn("Couldn't find job status for jobId=" + jobId);
        return;
      }

      // Step 2: read the job's configuration file.
      Properties confProps = getJobConfFromFile(job);

      // Step 3: obtain the job's JobID; a NullPointerException from getID() is treated as
      // "id unavailable" rather than as a failure.
      JobID hadoopJobId;
      try {
        hadoopJobId = job.getID();
      } catch (NullPointerException e) {
        log.warn("Couldn't get jobID for runningJob.");
        return;
      }

      // Step 4: collect the task reports and publish everything on the PigJob.
      TaskReport[] mapReports = client.getMapTaskReports(hadoopJobId);
      TaskReport[] reduceReports = client.getReduceTaskReports(hadoopJobId);

      boolean hasConfiguration = confProps != null && confProps.size() > 0;
      if (hasConfiguration) {
        pigJob.setConfiguration(confProps);
      }

      MapReduceJobState state = new MapReduceJobState(job, mapReports, reduceReports);
      pigJob.setMapReduceJobState(state);
    } catch (IOException e) {
      log.warn("Error occurred when retrieving job progress info. ", e);
    }
  }

  /**
   * Get the configurations at the beginning of the job flow, it will contain information
   * about the map/reduce plan and decoded pig script.
   * @param runningJob
   * @return Properties - configuration properties of the job
   */
  private Properties getJobConfFromFile(RunningJob runningJob) {
    Properties jobConfProperties = new Properties();
    try {
      log.info("RunningJob Configuration File location: " + runningJob.getJobFile());
      Path path = new Path(runningJob.getJobFile());
      Configuration conf = new Configuration(false);
      FileSystem fileSystem = FileSystem.get(new Configuration());
      InputStream inputStream = fileSystem.open(path);
      conf.addResource(inputStream);

      for (Map.Entry entry : conf) {
        if (entry.getKey().equals("pig.mapPlan")
            || entry.getKey().equals("pig.reducePlan")) {
          jobConfProperties.setProperty(entry.getKey(),
              ObjectSerializer.deserialize(entry.getValue()).toString());
        } else {
          jobConfProperties.setProperty(entry.getKey(), entry.getValue());
        }
      }
    } catch (FileNotFoundException e) {
      log.warn("Configuration file not found for old jobsflows.");
    } catch (IOException e) {
      log.warn("Error occurred when retrieving configuration info.", e);
    return jobConfProperties;
  }

>>>>>>> e5f05a204f50f43d0050ea3704837fb6efe560b9
  /**
   * Splits a comma-separated string into its parts.
   *
   * @param string the value to split, may be {@code null}.
   * @return an empty array when the input is {@code null}; otherwise the input, with leading
   *         and trailing whitespace removed from the whole string, split on {@code ","}.
   */
  private static String[] toArray(String string) {
    if (string == null) {
      return new String[0];
    }
    String trimmed = string.trim();
    return trimmed.split(",");
  }
Solution content
    }
  }

  /**
   * Splits a comma-separated string into its parts.
   *
   * @param string the value to split, may be {@code null}.
   * @return an empty array when the input is {@code null}; otherwise the input, with leading
   *         and trailing whitespace removed from the whole string, split on {@code ","}.
   */
  private static String[] toArray(String string) {
    if (string == null) {
      return new String[0];
    }
    String trimmed = string.trim();
    return trimmed.split(",");
  }
File
AmbrosePigProgressNotificationListener.java
Developer's decision
Version 1
Kind of conflict
Annotation
Comment
Method declaration