Projects >> cascading >> b0125e6c3faad1b1f80c87c76316a6a39ee99171

Chunk
Conflicting content
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
<<<<<<< HEAD
import cascading.util.Util;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
=======
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
>>>>>>> 91f7bc12b3fb66e9156e15d07fd4573d103dda44
import org.apache.log4j.Logger;

import java.io.IOException;
Solution content
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
import org.apache.log4j.Logger;

import java.io.IOException;
File
TapCollector.java
Developer's decision
Version 2
Kind of conflict
Import
Chunk
Conflicting content
    Hadoop18TapUtil.setupTask( conf );

<<<<<<< HEAD
    filename = String.format( filenamePattern, conf.getInt( "mapred.task.partition", 0 ) );

    conf.set( "mapred.work.output.dir", outputPath.toString() );

    if( outputFormat instanceof FileOutputFormat ) // only file based writing uses temp dirs
      fileSystem.mkdirs( new Path( conf.get( "mapred.work.output.dir" ), "_temporary" ) );

    if( conf.get( "mapred.task.id" ) == null ) // need to stuff a fake id
      conf.set( "mapred.task.id", String.format( "attempt_%12.0e_0000_m_000000_0", Math.rint( System.currentTimeMillis() ) ) );
=======
    OutputFormat outputFormat = conf.getOutputFormat();
>>>>>>> 91f7bc12b3fb66e9156e15d07fd4573d103dda44

    writer = outputFormat.getRecordWriter( null, conf, filename, Reporter.NULL );
    }
Solution content
    Hadoop18TapUtil.setupTask( conf );

    OutputFormat outputFormat = conf.getOutputFormat();

    writer = outputFormat.getRecordWriter( null, conf, filename, Reporter.NULL );
    }
File
TapCollector.java
Developer's decision
Version 2
Kind of conflict
Attribute
If statement
Method invocation
Variable
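In the chosen resolution the record writer is obtained solely through the JobConf: the registered OutputFormat is looked up with conf.getOutputFormat() and asked for a RecordWriter, replacing the HEAD-side setup of mapred.work.output.dir, the _temporary directory, and the fake task id. The sketch below shows that call chain in isolation using the old org.apache.hadoop.mapred API; NullOutputFormat and the part-00000 name are illustrative choices, not part of the original code.

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class RecordWriterSketch
  {
  public static void main( String[] args ) throws Exception
    {
    JobConf conf = new JobConf();

    // NullOutputFormat discards records, so the sketch needs no task-attempt
    // id or temporary output directory to run
    conf.setOutputFormat( NullOutputFormat.class );

    // same call chain as the resolved TapCollector code: the format is taken from the conf
    OutputFormat outputFormat = conf.getOutputFormat();
    RecordWriter writer = outputFormat.getRecordWriter( null, conf, "part-00000", Reporter.NULL );

    writer.write( NullWritable.get(), new Text( "hello" ) );
    writer.close( Reporter.NULL );
    }
  }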
Chunk
Conflicting content
      }
    }

<<<<<<< HEAD
  private void moveTaskOutputs() throws IOException
    {
    Path outputPath = FileOutputFormat.getOutputPath( conf );

    String taskIdPath = conf.get( "mapred.task.id" );
    Class[] classes = {JobConf.class, String.class};
    Object[] parameters = {conf, "_temporary/" + taskIdPath};
    Path taskPath = (Path) Util.invokeStaticMethod( FileOutputFormat.class, "getTaskOutputPath", parameters, classes );

    taskPath = taskPath.getParent();

    FileSystem fileSystem = FileSystem.get( outputPath.toUri(), conf );

    if( !fileSystem.getFileStatus( taskPath ).isDir() )
      throw new IOException( "path is not a directory: " + taskPath );


    FileStatus[] statuses = fileSystem.listStatus( taskPath );
    for( FileStatus status : statuses )
      {
      Path sourcePath = status.getPath();

      if( status.isDir() )
        throw new IOException( "path is a directory, no support for nested directories: " + sourcePath );

      Path targetPath = new Path( outputPath, sourcePath.getName() );

      fileSystem.rename( sourcePath, targetPath );

      LOG.debug( "moved " + sourcePath + " to " + targetPath );
      }

    // remove _temporary directory
    fileSystem.delete( new Path( conf.get( "mapred.work.output.dir" ), "_temporary" ), true );
    }

=======
>>>>>>> 91f7bc12b3fb66e9156e15d07fd4573d103dda44
  @Override
  public void close()
    {
Solution content
      }
    }

  @Override
  public void close()
    {
File
TapCollector.java
Developer's decision
Version 2
Kind of conflict
Method declaration
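The HEAD-side moveTaskOutputs above is the hand-rolled equivalent of what Hadoop18TapUtil.commitTask does in the kept version: promote every file the task wrote under _temporary into the final output directory, then drop the temporary directory. The following sketch reproduces that move-and-cleanup pattern against the local file system; the paths and the attempt id are invented for illustration and this is not the Hadoop18TapUtil implementation.

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class TaskCommitSketch
  {
  public static void main( String[] args ) throws IOException
    {
    JobConf conf = new JobConf();
    FileSystem fileSystem = FileSystem.getLocal( conf );

    Path outputPath = new Path( "/tmp/task-commit-sketch" );
    Path taskPath = new Path( outputPath, "_temporary/attempt_000000000000_0000_m_000000_0" );

    // stand-in for a record writer having produced a part file in the task's temp dir
    fileSystem.mkdirs( taskPath );
    fileSystem.create( new Path( taskPath, "part-00000" ) ).close();

    // promote every file written by the task into the final output directory
    for( FileStatus status : fileSystem.listStatus( taskPath ) )
      {
      Path sourcePath = status.getPath();
      Path targetPath = new Path( outputPath, sourcePath.getName() );

      fileSystem.rename( sourcePath, targetPath );
      }

    // remove the temporary directory once all task output has been promoted
    fileSystem.delete( new Path( outputPath, "_temporary" ), true );
    }
  }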
Chunk
Conflicting content
      writer.close( reporter );

<<<<<<< HEAD
      if( conf.getOutputFormat() instanceof FileOutputFormat )
        moveTaskOutputs();
=======
      if( Hadoop18TapUtil.needsTaskCommit( conf ) )
        Hadoop18TapUtil.commitTask( conf );
>>>>>>> 91f7bc12b3fb66e9156e15d07fd4573d103dda44

      Hadoop18TapUtil.cleanupJob( conf );
      }
Solution content
      writer.close( reporter );

      if( Hadoop18TapUtil.needsTaskCommit( conf ) )
        Hadoop18TapUtil.commitTask( conf );

      Hadoop18TapUtil.cleanupJob( conf );
      }
File
TapCollector.java
Developer's decision
Version 2
Kind of conflict
If statement
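Version 2 lets Hadoop18TapUtil.needsTaskCommit decide whether commitTask has anything to do. The helper below is a hypothetical stand-in, not the actual Hadoop18TapUtil code, sketching the kind of check such a method performs: only a file-based output whose temporary work directory actually exists needs committing.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class TaskCommitCheckSketch
  {
  // hypothetical stand-in for a needsTaskCommit-style check
  public static boolean needsTaskCommit( JobConf conf ) throws IOException
    {
    Path outputPath = FileOutputFormat.getOutputPath( conf );

    if( outputPath == null ) // non file based output has nothing to move
      return false;

    Path taskTempPath = new Path( outputPath, "_temporary/" + conf.get( "mapred.task.id" ) );
    FileSystem fileSystem = FileSystem.get( outputPath.toUri(), conf );

    return fileSystem.exists( taskTempPath );
    }

  public static void main( String[] args ) throws IOException
    {
    JobConf conf = new JobConf();

    FileOutputFormat.setOutputPath( conf, new Path( "/tmp/task-commit-check-sketch" ) );
    conf.set( "mapred.task.id", "attempt_000000000000_0000_m_000000_0" );

    System.out.println( "needs commit: " + needsTaskCommit( conf ) );
    }
  }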
Chunk
Conflicting content
package cascading.util;

<<<<<<< HEAD
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import java.util.Map;

=======
>>>>>>> 91f7bc12b3fb66e9156e15d07fd4573d103dda44
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.Scope;
Solution content
package cascading.util;

import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.Scope;
File
Util.java
Developer's decision
Version 2
Kind of conflict
Import
Chunk
Conflicting content
    }
    new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider ).export( writer, graph );
    }

<<<<<<< HEAD
=======
  public interface RetryOperator<T>
    {
    T operate() throws Exception;

    boolean rethrow( Exception exception );
    }

  public static <T> T retry( Logger logger, int retries, int secondsDelay, String message, RetryOperator<T> operator ) throws Exception
    {
    Exception saved = null;

    for( int i = 0; i < retries; i++ )
      {
      try
        {
        return operator.operate();
        }
      catch( Exception exception )
        {
        if( operator.rethrow( exception ) )
          {
          logger.warn( message + ", but not retrying", exception );

          throw exception;
          }

        saved = exception;

        logger.warn( message + ", attempt: " + ( i + 1 ), exception );

        try
          {
          Thread.sleep( secondsDelay * 1000 );
          }
        catch( InterruptedException exception1 )
          {
          // do nothing
          }
        }
      }

    logger.warn( message + ", done retrying after attempts: " + retries, saved );

    throw saved;
    }

  public static Object createProtectedObject( Class type, Object[] parameters, Class[] parameterTypes )
    {
    try
      {
      Constructor constructor = type.getDeclaredConstructor( parameterTypes );

      constructor.setAccessible( true );

      return constructor.newInstance( parameters );
      }
    catch( Exception exception )
      {
      exception.printStackTrace();

      throw new FlowException( "unable to instantiate type: " + type.getName(), exception );
      }
    }

  public static Thread getHDFSShutdownHook()
    {
    try
      {
      // we must init the FS so the finalizer is registered
      FileSystem.getLocal( new JobConf() );

      Field field = FileSystem.class.getDeclaredField( "clientFinalizer" );
      field.setAccessible( true );

      Thread finalizer = (Thread) field.get( null );

      if( finalizer != null )
        {
        Runtime.getRuntime().removeShutdownHook( finalizer );
        return finalizer;
        }

      }
    catch( NoSuchFieldException exception )
      {
      LOG.warn( "unable to get finalizer", exception );
      }
    catch( IllegalAccessException exception )
      {
      LOG.warn( "unable to get finalizer", exception );
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to init FileSystem", exception );
      }

    LOG.warn( "unable to find and remove client hdfs shutdown hook" );

    return null;
    }

>>>>>>> 91f7bc12b3fb66e9156e15d07fd4573d103dda44
  public static Object invokeStaticMethod( Class type, String methodName, Object[] parameters, Class[] parameterTypes )
    {
    try
Solution content
    new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider ).export( writer, graph );
    }

  public interface RetryOperator<T>
    {
    T operate() throws Exception;

    boolean rethrow( Exception exception );
    }

  public static <T> T retry( Logger logger, int retries, int secondsDelay, String message, RetryOperator<T> operator ) throws Exception
    {
    Exception saved = null;

    for( int i = 0; i < retries; i++ )
      {
      try
        {
        return operator.operate();
        }
      catch( Exception exception )
        {
        if( operator.rethrow( exception ) )
          {
          logger.warn( message + ", but not retrying", exception );

          throw exception;
          }

        saved = exception;

        logger.warn( message + ", attempt: " + ( i + 1 ), exception );

        try
          {
          Thread.sleep( secondsDelay * 1000 );
          }
        catch( InterruptedException exception1 )
          {
          // do nothing
          }
        }
      }

    logger.warn( message + ", done retrying after attempts: " + retries, saved );

    throw saved;
    }

  public static Object createProtectedObject( Class type, Object[] parameters, Class[] parameterTypes )
    {
    try
      {
      Constructor constructor = type.getDeclaredConstructor( parameterTypes );

      constructor.setAccessible( true );

      return constructor.newInstance( parameters );
      }
    catch( Exception exception )
      {
      exception.printStackTrace();

      throw new FlowException( "unable to instantiate type: " + type.getName(), exception );
      }
    }

  public static Thread getHDFSShutdownHook()
    {
    try
      {
      // we must init the FS so the finalizer is registered
      FileSystem.getLocal( new JobConf() );

      Field field = FileSystem.class.getDeclaredField( "clientFinalizer" );
      field.setAccessible( true );

      Thread finalizer = (Thread) field.get( null );

      if( finalizer != null )
        {
        Runtime.getRuntime().removeShutdownHook( finalizer );
        return finalizer;
        }

      }
    catch( NoSuchFieldException exception )
      {
      LOG.warn( "unable to get finalizer", exception );
      }
    catch( IllegalAccessException exception )
      {
      LOG.warn( "unable to get finalizer", exception );
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to init FileSystem", exception );
      }

    LOG.warn( "unable to find and remove client hdfs shutdown hook" );

    return null;
    }

  public static Object invokeStaticMethod( Class type, String methodName, Object[] parameters, Class[] parameterTypes )
    {
    try
File
Util.java
Developer's decision
Version 2
Kind of conflict
Interface declaration
Method declaration
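The retry utility kept from Version 2 is driven through the RetryOperator callback: operate() performs the work, and rethrow() decides whether a failure is fatal or should be retried after the configured delay. The usage sketch below is hypothetical; fetchStatus and the retry counts are invented for illustration, and it assumes RetryOperator is the nested interface of cascading.util.Util shown above.

import org.apache.log4j.Logger;

import cascading.util.Util;

public class RetryExample
  {
  private static final Logger LOG = Logger.getLogger( RetryExample.class );

  public static void main( String[] args ) throws Exception
    {
    // try up to 3 times, sleeping 5 seconds between failed attempts
    String result = Util.retry( LOG, 3, 5, "unable to fetch status", new Util.RetryOperator<String>()
      {
      public String operate() throws Exception
        {
        return fetchStatus(); // the work that may fail transiently
        }

      public boolean rethrow( Exception exception )
        {
        // treat argument errors as fatal, retry everything else
        return exception instanceof IllegalArgumentException;
        }
      } );

    LOG.info( "status: " + result );
    }

  private static String fetchStatus() throws Exception
    {
    return "ok"; // stand-in for a call that can fail intermittently
    }
  }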