if (!BUNDLE_RUNNING_STATUS.contains(bundle.getStatus()))
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ivory.workflow.engine;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import org.apache.commons.lang.StringUtils;
import org.apache.ivory.IvoryException;
import org.apache.ivory.Tag;
import org.apache.ivory.entity.EntityUtil;
import org.apache.ivory.entity.v0.Entity;
import org.apache.ivory.entity.v0.EntityGraph;
import org.apache.ivory.entity.v0.EntityType;
import org.apache.ivory.entity.v0.Frequency;
import org.apache.ivory.entity.v0.Frequency.TimeUnit;
import org.apache.ivory.entity.v0.SchemaHelper;
import org.apache.ivory.resource.APIResult;
import org.apache.ivory.resource.InstancesResult;
import org.apache.ivory.resource.InstancesResult.Instance;
import org.apache.ivory.resource.InstancesResult.WorkflowStatus;
import org.apache.ivory.update.UpdateHelper;
import org.apache.ivory.util.OozieUtils;
import org.apache.ivory.workflow.OozieWorkflowBuilder;
import org.apache.ivory.workflow.WorkflowBuilder;
import org.apache.log4j.Logger;
import org.apache.oozie.client.*;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.client.WorkflowJob.Status;
/**
 * Workflow engine which uses Oozie's APIs.
 */
public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private static final Logger LOG = Logger
.getLogger(OozieWorkflowEngine.class);
public static final String ENGINE = "oozie";
private static final BundleJob MISSING = new NullBundleJob();
case RUNNING:
}
return false;
}
private static final List WF_KILL_PRECOND = Arrays.asList(Status.PREP,
Status.RUNNING, Status.SUSPENDED, Status.FAILED);
private static final List WF_SUSPEND_PRECOND = Arrays
.asList(Status.RUNNING);
return true;
private static final List WF_RESUME_PRECOND = Arrays
.asList(Status.SUSPENDED);
private static final List WF_RERUN_PRECOND = Arrays.asList(Status.FAILED,
Status.KILLED, Status.SUCCEEDED);
private static final List BUNDLE_ACTIVE_STATUS = Arrays.asList(
Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED,
Job.Status.PREPSUSPENDED, Job.Status.DONEWITHERROR);
private static final List BUNDLE_SUSPENDED_STATUS = Arrays.asList(
Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED);
private static final List BUNDLE_RUNNING_STATUS = Arrays.asList(
Job.Status.PREP, Job.Status.RUNNING);
private static final List BUNDLE_SUSPEND_PRECOND = Arrays.asList(
Job.Status.PREP, Job.Status.RUNNING, Job.Status.DONEWITHERROR);
private static final List BUNDLE_RESUME_PRECOND = Arrays.asList(
Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED);
private static final String IVORY_INSTANCE_ACTION_CLUSTERS = "ivory.instance.action.clusters";
private static final String IVORY_INSTANCE_SOURCE_CLUSTERS = "ivory.instance.source.clusters";
private static final String[] BUNDLE_UPDATEABLE_PROPS = new String[] {
"parallel", "clusters.clusters[\\d+].validity.end" };
/**
 * Creates the engine and registers a new {@code OozieHouseKeepingService}
 * as a listener.
 */
public OozieWorkflowEngine() {
    OozieHouseKeepingService houseKeeping = new OozieHouseKeepingService();
    registerListener(houseKeeping);
}
/**
 * Schedules the entity on every cluster whose latest bundle is either
 * missing or in the KILLED state; clusters that already have another
 * bundle are skipped with a debug message.
 *
 * @param entity the entity to schedule
 * @throws IvoryException if looking up bundles, building the workflow or
 *         scheduling it fails
 */
@Override
public void schedule(Entity entity) throws IvoryException {
    Map<String, BundleJob> bundleMap = findLatestBundle(entity);
    List<String> schedClusters = new ArrayList<String>();
    for (Entry<String, BundleJob> entry : bundleMap.entrySet()) {
        String cluster = entry.getKey();
        BundleJob bundleJob = entry.getValue();
        // Only query the status of a real bundle: the MISSING placeholder
        // may carry no status, and constant-first equals() tolerates null.
        boolean killed = bundleJob != MISSING
                && Job.Status.KILLED.equals(bundleJob.getStatus());
        if (killed) {
            LOG.warn("Bundle id: " + bundleJob.getId() + " is in killed state, so allowing schedule");
        }
        if (bundleJob == MISSING || killed) {
            schedClusters.add(cluster);
        } else {
            LOG.debug("The entity " + entity.getName() + " is already scheduled on cluster " + cluster);
        }
    }
    if (schedClusters.isEmpty()) {
        return; // nothing left to schedule
    }

    WorkflowBuilder builder = WorkflowBuilder.getBuilder(
            ENGINE, entity);
    // NOTE(review): value type assumed to be Properties - confirm against
    // WorkflowBuilder.newWorkflowSchedule and scheduleEntity.
    Map<String, Properties> newFlows = builder.newWorkflowSchedule(
            entity, schedClusters);
    for (Entry<String, Properties> flow : newFlows.entrySet()) {
        String cluster = flow.getKey();
        LOG.info("Scheduling " + entity.toShortString()
                + " on cluster " + cluster);
        scheduleEntity(cluster, flow.getValue(), entity);
    }
}
/**
 * Checks the entity's bundles against {@link BundleStatus#ACTIVE}.
 *
 * @param entity the entity to inspect
 * @return the result of {@code isBundleInState} for the ACTIVE state
 * @throws IvoryException if the bundle state check fails
 */
@Override
public boolean isActive(Entity entity) throws IvoryException {
    final boolean active = isBundleInState(entity, BundleStatus.ACTIVE);
    return active;
}
/**
 * Checks the entity's bundles against {@link BundleStatus#SUSPENDED}.
 *
 * @param entity the entity to inspect
 * @return the result of {@code isBundleInState} for the SUSPENDED state
 * @throws IvoryException if the bundle state check fails
 */
@Override
public boolean isSuspended(Entity entity) throws IvoryException {
    final boolean suspended = isBundleInState(entity, BundleStatus.SUSPENDED);
    return suspended;
}
/** Bundle states that {@code isBundleInState} can be asked to test for. */
private enum BundleStatus {
    /** Matched against {@code BUNDLE_ACTIVE_STATUS}. */
    ACTIVE,
    /** NOTE(review): presumably matched against {@code BUNDLE_RUNNING_STATUS} - confirm. */
    RUNNING,
    /** Matched against {@code BUNDLE_SUSPENDED_STATUS}. */
    SUSPENDED
}
private boolean isBundleInState(Entity entity, BundleStatus status)
throws IvoryException {
Map bundles = findLatestBundle(entity);
for (BundleJob bundle : bundles.values()) {
if (bundle == MISSING) // There is no active bundle
return false;
switch (status) {
case ACTIVE:
if (!BUNDLE_ACTIVE_STATUS.contains(bundle.getStatus()))
return false;
break;
break;
case SUSPENDED:
if (!BUNDLE_SUSPENDED_STATUS.contains(bundle.getStatus()))
return false;
break;
}
/**
 * Finds the bundle of the entity on the given cluster whose application
 * path ends with the entity's staging path.
 *
 * @param entity  the entity whose bundle is wanted
 * @param cluster the cluster to search
 * @return the bundle info fetched for the first matching job, or
 *         {@code MISSING} when no job's application path matches
 * @throws IvoryException if listing bundles or fetching bundle info fails
 */
private BundleJob findBundle(Entity entity, String cluster)
        throws IvoryException {
    String stPath = EntityUtil.getStagingPath(entity);
    // NOTE(review): a two-argument findBundles overload is assumed to be
    // defined elsewhere in this class.
    List<BundleJob> bundles = findBundles(entity, cluster);
    for (BundleJob job : bundles) {
        if (job.getAppPath().endsWith(stPath)) {
            // Re-fetch by id rather than returning the listed job itself.
            return getBundleInfo(cluster, job.getId());
        }
    }
    return MISSING;
}
private List findBundles(Entity entity, String cluster, boolean forKill)
throws IvoryException {
try {
OozieClient client = OozieClientFactory.get(cluster);
List jobs = client.getBundleJobsInfo(
OozieClient.FILTER_NAME + "="
+ EntityUtil.getWorkflowName(entity) + ";", 0, 256);
if (jobs != null) {
List filteredJobs = new ArrayList();
for(BundleJob job : jobs) {
if(job.getStatus() != Job.Status.KILLED || job.getEndTime() == null) {
filteredJobs.add(job);
}
} |