Projects >> Ivory >>77f22923095fe6d244c7f697094f1bfada08495c

Chunk
Conflicting content
							+ EntityUtil.getWorkflowName(entity) + ";", 0, 256);
			if (jobs != null) {
			    List filteredJobs = new ArrayList();
<<<<<<< HEAD
			    for(BundleJob job : jobs) {
			        if (forKill || (job.getStatus() != Job.Status.KILLED || job.getEndTime() == null)) {
=======
			    for(BundleJob job : jobs)
			        if(job.getStatus() != Job.Status.KILLED || job.getEndTime() == null)
>>>>>>> ef7bd45ecb5a925af2f7594e75c044da4bd9ac7a
			            filteredJobs.add(job);
                    }
                }
Solution content
                    if (!BUNDLE_RUNNING_STATUS.contains(bundle.getStatus()))
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ivory.workflow.engine;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;

import org.apache.commons.lang.StringUtils;
import org.apache.ivory.IvoryException;
import org.apache.ivory.Tag;
import org.apache.ivory.entity.EntityUtil;
import org.apache.ivory.entity.v0.Entity;
import org.apache.ivory.entity.v0.EntityGraph;
import org.apache.ivory.entity.v0.EntityType;
import org.apache.ivory.entity.v0.Frequency;
import org.apache.ivory.entity.v0.Frequency.TimeUnit;
import org.apache.ivory.entity.v0.SchemaHelper;
import org.apache.ivory.resource.APIResult;
import org.apache.ivory.resource.InstancesResult;
import org.apache.ivory.resource.InstancesResult.Instance;
import org.apache.ivory.resource.InstancesResult.WorkflowStatus;
import org.apache.ivory.update.UpdateHelper;
import org.apache.ivory.util.OozieUtils;
import org.apache.ivory.workflow.OozieWorkflowBuilder;
import org.apache.ivory.workflow.WorkflowBuilder;
import org.apache.log4j.Logger;
import org.apache.oozie.client.*;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.client.WorkflowJob.Status;

/**
 * Workflow engine which uses Oozie's APIs.
 */
public class OozieWorkflowEngine extends AbstractWorkflowEngine {

    // Class-wide log4j logger.
    private static final Logger LOG = Logger
            .getLogger(OozieWorkflowEngine.class);

    // Engine name passed to WorkflowBuilder.getBuilder(...) when scheduling.
    public static final String ENGINE = "oozie";
    // Placeholder bundle; callers compare against it by identity (==) to detect "no bundle found".
    private static final BundleJob MISSING = new NullBundleJob();

                case RUNNING:
            }
                        return false;
        }
    // Workflow job statuses (WorkflowJob.Status). Presumably the states from which a kill
    // is permitted; the code that consults this list is not visible here - confirm.
    private static final List WF_KILL_PRECOND = Arrays.asList(Status.PREP,
            Status.RUNNING, Status.SUSPENDED, Status.FAILED);
    // Presumably the states from which a suspend is permitted - confirm against the caller.
    private static final List WF_SUSPEND_PRECOND = Arrays
            .asList(Status.RUNNING);
        return true;
    // Presumably the workflow states from which a resume is permitted; usage is not
    // visible here - confirm.
    private static final List WF_RESUME_PRECOND = Arrays
            .asList(Status.SUSPENDED);
    // Presumably the workflow states from which a rerun is permitted - confirm against the caller.
    private static final List WF_RERUN_PRECOND = Arrays.asList(Status.FAILED,
            Status.KILLED, Status.SUCCEEDED);

    // Bundle statuses accepted by isBundleInState() when asked about BundleStatus.ACTIVE.
    private static final List BUNDLE_ACTIVE_STATUS = Arrays.asList(
            Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED,
            Job.Status.PREPSUSPENDED, Job.Status.DONEWITHERROR);
    // Bundle statuses accepted by isBundleInState() when asked about BundleStatus.SUSPENDED.
    private static final List BUNDLE_SUSPENDED_STATUS = Arrays.asList(
            Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED);
    // PREP and RUNNING; presumably backs the BundleStatus.RUNNING check - confirm against isBundleInState().
    private static final List BUNDLE_RUNNING_STATUS = Arrays.asList(
            Job.Status.PREP, Job.Status.RUNNING);

    // Presumably the bundle states from which a suspend is permitted; usage is not visible here - confirm.
    private static final List BUNDLE_SUSPEND_PRECOND = Arrays.asList(
            Job.Status.PREP, Job.Status.RUNNING, Job.Status.DONEWITHERROR);
    // Presumably the bundle states from which a resume is permitted; usage is not visible here - confirm.
    private static final List BUNDLE_RESUME_PRECOND = Arrays.asList(
            Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED);
    // Property-key strings. NOTE(review): where these keys are read or written is outside this view.
    private static final String IVORY_INSTANCE_ACTION_CLUSTERS = "ivory.instance.action.clusters";
    private static final String IVORY_INSTANCE_SOURCE_CLUSTERS = "ivory.instance.source.clusters";

    // Two property-path strings, the second containing a "[\d+]" segment.
    // NOTE(review): the consumer is not visible here - confirm how these are matched.
    private static final String[] BUNDLE_UPDATEABLE_PROPS = new String[] {
            "parallel", "clusters.clusters[\\d+].validity.end" };

    /**
     * Creates the engine and registers a fresh {@code OozieHouseKeepingService}
     * as a listener.
     */
    public OozieWorkflowEngine() {
        OozieHouseKeepingService houseKeeping = new OozieHouseKeepingService();
        registerListener(houseKeeping);
    }

    /**
     * Schedules the entity on every cluster that has no usable bundle yet.
     *
     * <p>The latest bundle per cluster is obtained from
     * {@code findLatestBundle(entity)}. A cluster is selected for scheduling
     * when its bundle is the {@code MISSING} placeholder or is in
     * {@code KILLED} state (the latter is logged as a warning). Every other
     * cluster is only reported at debug level as already scheduled. New
     * workflows are then built for the selected clusters and submitted one
     * cluster at a time.
     *
     * @param entity entity to schedule
     * @throws IvoryException propagated from the lookup, build or submit helpers
     */
    @Override
    public void schedule(Entity entity) throws IvoryException {
        Map<String, BundleJob> bundleMap = findLatestBundle(entity);
        List<String> schedClusters = new ArrayList<String>();
        for (Entry<String, BundleJob> latest : bundleMap.entrySet()) {
            String cluster = latest.getKey();
            BundleJob bundleJob = latest.getValue();
            // Constant-first comparison: a bundle reporting a null status (as the
            // MISSING placeholder may) yields false here instead of throwing.
            boolean killed = Job.Status.KILLED.equals(bundleJob.getStatus());
            if (bundleJob == MISSING || killed) {
                if (killed) {
                    LOG.warn("Bundle id: " + bundleJob.getId() + " is in killed state, so allowing schedule");
                }
                schedClusters.add(cluster);
            } else {
                LOG.debug("The entity " + entity.getName() + " is already scheduled on cluster " + cluster);
            }
        }

        if (schedClusters.isEmpty()) {
            return; // nothing left to schedule
        }

        // NOTE(review): the value type of this map is not visible in this chunk;
        // Properties is assumed from the file's imports - confirm against
        // WorkflowBuilder.newWorkflowSchedule and scheduleEntity.
        Map<String, Properties> newFlows = WorkflowBuilder.getBuilder(ENGINE, entity)
                .newWorkflowSchedule(entity, schedClusters);
        for (Entry<String, Properties> flow : newFlows.entrySet()) {
            String cluster = flow.getKey();
            LOG.info("Scheduling " + entity.toShortString()
                    + " on cluster " + cluster);
            scheduleEntity(cluster, flow.getValue(), entity);
        }
    }

    /**
     * Reports whether the entity's bundles are in the {@code ACTIVE} state.
     *
     * @param entity entity to check
     * @return the result of {@code isBundleInState(entity, BundleStatus.ACTIVE)}
     * @throws IvoryException propagated from {@code isBundleInState}
     */
    @Override
    public boolean isActive(Entity entity) throws IvoryException {
        final boolean active = isBundleInState(entity, BundleStatus.ACTIVE);
        return active;
    }

    /**
     * Reports whether the entity's bundles are in the {@code SUSPENDED} state.
     *
     * @param entity entity to check
     * @return the result of {@code isBundleInState(entity, BundleStatus.SUSPENDED)}
     * @throws IvoryException propagated from {@code isBundleInState}
     */
    @Override
    public boolean isSuspended(Entity entity) throws IvoryException {
        final boolean suspended = isBundleInState(entity, BundleStatus.SUSPENDED);
        return suspended;
    }

    /** Bundle states that {@code isBundleInState} can be asked to check for. */
    private enum BundleStatus {
        ACTIVE,
        RUNNING,
        SUSPENDED
    }

    private boolean isBundleInState(Entity entity, BundleStatus status)
            throws IvoryException {
        Map bundles = findLatestBundle(entity);
        for (BundleJob bundle : bundles.values()) {
            if (bundle == MISSING) // There is no active bundle
                return false;

            switch (status) {
                case ACTIVE:
                    if (!BUNDLE_ACTIVE_STATUS.contains(bundle.getStatus()))
                        return false;
                    break;

                    break;

                case SUSPENDED:
                    if (!BUNDLE_SUSPENDED_STATUS.contains(bundle.getStatus()))
                        return false;
                    break;
    }

    /**
     * Finds the entity's bundle on the given cluster by staging path.
     *
     * <p>Walks the bundles returned by {@code findBundles} and, for the first
     * one whose application path ends with the entity's staging path, returns
     * the result of {@code getBundleInfo} for that bundle id.
     *
     * @param entity  entity whose bundle is wanted
     * @param cluster cluster to search
     * @return the matching bundle's info, or the {@code MISSING} placeholder
     *         when no bundle matches
     * @throws IvoryException propagated from the lookup helpers
     */
    private BundleJob findBundle(Entity entity, String cluster)
            throws IvoryException {
        String stPath = EntityUtil.getStagingPath(entity);
        // NOTE(review): the only findBundles visible in this chunk takes a third
        // (forKill) argument - confirm that a two-argument overload exists.
        List<BundleJob> bundles = findBundles(entity, cluster);
        for (BundleJob job : bundles) {
            if (job.getAppPath().endsWith(stPath)) {
                return getBundleInfo(cluster, job.getId());
            }
        }
        return MISSING;
    }

    private List findBundles(Entity entity, String cluster, boolean forKill)
            throws IvoryException {
        try {
            OozieClient client = OozieClientFactory.get(cluster);
            List jobs = client.getBundleJobsInfo(
                    OozieClient.FILTER_NAME + "="
                            + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
            if (jobs != null) {
                List filteredJobs = new ArrayList();
                for(BundleJob job : jobs) {
                    if(job.getStatus() != Job.Status.KILLED || job.getEndTime() == null) {
                        filteredJobs.add(job);
                    }
                }
File
OozieWorkflowEngine.java
Developer's decision
Manual
Kind of conflict
For statement
If statement