}
}
}
<<<<<<< HEAD
=======
private void pushEvent(String scriptId, Event event) {
try {
statsWriteService.pushEvent(scriptId, event);
} catch (IOException e) {
log.error("Couldn't send event to StatsWriteService", e);
}
}
@SuppressWarnings("deprecation")
private void addMapReduceJobState(PigJob pigJob) {
JobClient jobClientLocal = getJobClient();
try {
String id = pigJob.getId();
RunningJob runningJob = null;
try {
runningJob = jobClientLocal.getJob(id);
} catch (Exception e) {
log.warn("Exception while querying job status for jobId=" + id +" message: " + e.getMessage());
log.debug(e);
return;
}
if (runningJob == null) {
log.warn("Couldn't find job status for jobId=" + id);
return;
}
Properties jobConfProperties = getJobConfFromFile(runningJob);
JobID jobID = null;
try {
jobID = runningJob.getID();
} catch (NullPointerException e) {
log.warn("Couldn't get jobID for runningJob.");
return;
}
TaskReport[] mapTaskReport = jobClientLocal.getMapTaskReports(jobID);
TaskReport[] reduceTaskReport = jobClientLocal.getReduceTaskReports(jobID);
if (jobConfProperties != null && jobConfProperties.size() > 0) {
pigJob.setConfiguration(jobConfProperties);
}
pigJob.setMapReduceJobState(
new MapReduceJobState(runningJob, mapTaskReport, reduceTaskReport));
} catch (IOException e) {
log.warn("Error occurred when retrieving job progress info. ", e);
}
}
/**
* Get the configurations at the beginning of the job flow, it will contain information
* about the map/reduce plan and decoded pig script.
* @param runningJob
* @return Properties - configuration properties of the job
*/
private Properties getJobConfFromFile(RunningJob runningJob) {
Properties jobConfProperties = new Properties();
try {
log.info("RunningJob Configuration File location: " + runningJob.getJobFile());
Path path = new Path(runningJob.getJobFile());
Configuration conf = new Configuration(false);
FileSystem fileSystem = FileSystem.get(new Configuration());
InputStream inputStream = fileSystem.open(path);
conf.addResource(inputStream);
for (Map.Entry entry : conf) {
if (entry.getKey().equals("pig.mapPlan")
|| entry.getKey().equals("pig.reducePlan")) {
jobConfProperties.setProperty(entry.getKey(),
ObjectSerializer.deserialize(entry.getValue()).toString());
} else {
jobConfProperties.setProperty(entry.getKey(), entry.getValue());
}
}
} catch (FileNotFoundException e) {
log.warn("Configuration file not found for old jobsflows.");
} catch (IOException e) {
log.warn("Error occurred when retrieving configuration info.", e);
return jobConfProperties;
}
>>>>>>> e5f05a204f50f43d0050ea3704837fb6efe560b9
private static String[] toArray(String string) {
return string == null ? new String[0] : string.trim().split(",");
} |