Projects >> cascading >>103e9fb9969fe35fba31d2430c8fc785a2f9808d

Chunk
Conflicting content
import cascading.pipe.OperatorException;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
<<<<<<< HEAD
import cascading.property.AppProps;
=======
import cascading.property.ConfigDef;
>>>>>>> 8db4b31f78e4505c2f8336ac8ddeb05f02c67430
import cascading.property.PropertyUtil;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
Solution content
import cascading.pipe.OperatorException;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.property.AppProps;
import cascading.property.ConfigDef;
import cascading.property.PropertyUtil;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
File
FlowPlanner.java
Developer's decision
Concatenation
Kind of conflict
Import
Chunk
Conflicting content
      {
      // only restart from a checkpoint pipe or checkpoint tap below
      if( pipe instanceof Checkpoint )
<<<<<<< HEAD
        checkpointTap = makeTempTap( checkpointTapRootPath, pipe.getName() );
=======
        {
        checkpointTap = makeTempTap( checkpointRootPath, pipe.getName() );
        // mark as an anonymous checkpoint
        checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
        }
>>>>>>> 8db4b31f78e4505c2f8336ac8ddeb05f02c67430
      else
        {
        checkpointTap = makeTempTap( pipe.getName() );
Solution content
      {
      // only restart from a checkpoint pipe or checkpoint tap below
      if( pipe instanceof Checkpoint )
        {
        checkpointTap = makeTempTap( checkpointTapRootPath, pipe.getName() );
        // mark as an anonymous checkpoint
        checkpointTap.getConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "cascading.checkpoint", "true" );
        }
      else
        {
        checkpointTap = makeTempTap( pipe.getName() );
File
FlowPlanner.java
Developer's decision
Combination
Kind of conflict
Comment
Method invocation
Variable
Chunk
Conflicting content
    String filename = file.toString();

<<<<<<< HEAD
    flowElementGraph.writeDOT( filename );
    }

  protected void writeTracePlan( String flowName, String fileName, FlowStepGraph stepGraph )
    {
    String path = getPlanTracePath();
=======
        if( flowElement instanceof Merge )
          {
          merges.add( (Merge) flowElement );
          }
        else if( flowElement instanceof HashJoin )
          {
          HashJoin join = (HashJoin) flowElement;

          Map pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, join, true );

          // is this path streamed
          int pathPosition = pathPositionInto( path, join );
          boolean thisPathIsStreamed = pathPosition == 0;

          boolean isAccumulatedAndStreamed = isBothAccumulatedAndStreamedPath( pathCounts ); // has streamed and accumulated paths
          int pathCount = countPaths( pathCounts );

          int priorJoins = countTypesBetween( elementGraph, lastSourceElement, join, HashJoin.class );

          if( priorJoins == 0 )
            {
            // if same source is leading into the hashjoin, insert tap on the accumulated side
            if( pathCount == 2 && isAccumulatedAndStreamed && !thisPathIsStreamed )
              {
              tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
              break;
              }

            // if more than one path into streamed and accumulated branches, insert tap on streamed side
            if( pathCount > 2 && isAccumulatedAndStreamed && thisPathIsStreamed )
              {
              tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
              break;
              }
            }

          if( !merges.isEmpty() )
            {
            // if a Merge is prior to a HashJoin, and its an accumulated path, force Merge results to disk
            int joinPos = flowElements.indexOf( join );
            int mergePos = nearest( flowElements, joinPos, merges );

            if( mergePos != -1 && joinPos > mergePos )
              {
              // if all paths are accumulated and streamed, insert
              // else if just if this path is accumulated
              if( ( isAccumulatedAndStreamed && thisPathIsStreamed ) || !thisPathIsStreamed )
                {
                tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( join ) - 1 ) );
                break;
                }
              }
            }

          joins.add( (HashJoin) flowElement );
          }
        else if( flowElement instanceof Tap || flowElement instanceof Group )
          {
          // added for JoinFieldedPipesPlatformTest.testJoinMergeGroupBy where Merge hides streamed nature of path
          if( flowElement instanceof Group && !joins.isEmpty() )
            {
            List splices = new ArrayList();

            splices.addAll( merges );
            splices.add( (Splice) flowElement );

            Collections.reverse( splices );

            for( Splice splice : splices )
              {
              Map pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, splice, true );

              if( isBothAccumulatedAndStreamedPath( pathCounts ) )
                {
                tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( splice ) - 1 ) );
                break;
                }
              }

            if( !tapInsertions.isEmpty() )
              break;
            }

          for( int j = 0; j < joins.size(); j++ )
            {
            HashJoin join = joins.get( j );
>>>>>>> 8db4b31f78e4505c2f8336ac8ddeb05f02c67430

    if( path == null )
      return;
Solution content
    String filename = file.toString();

    flowElementGraph.writeDOT( filename );
    }

  protected void writeTracePlan( String flowName, String fileName, FlowStepGraph stepGraph )
    {
    String path = getPlanTracePath();

    if( path == null )
      return;
File
FlowPlanner.java
Developer's decision
Version 1
Kind of conflict
Cast expression
Comment
For statement
If statement
Method invocation
Method signature
Variable
Chunk
Conflicting content
  @Override
  public JobConf copyConfig( JobConf config )
    {
<<<<<<< HEAD:cascading-hadoop/src/main/shared-mr1/cascading/flow/hadoop/HadoopFlowProcess.java
    return new JobConf( config );
=======
    return HadoopUtil.copyJobConf( jobConf );
>>>>>>> 8db4b31f78e4505c2f8336ac8ddeb05f02c67430:cascading-hadoop/src/main/shared/cascading/flow/hadoop/HadoopFlowProcess.java
    }

  @Override
Solution content
  @Override
  public JobConf copyConfig( JobConf config )
    {
    return HadoopUtil.copyJobConf( jobConf );
    }

  @Override
File
HadoopFlowProcess.java
Developer's decision
Version 2
Kind of conflict
Method invocation
Return statement
Chunk
Conflicting content
package cascading.tuple.hadoop.util;

import cascading.tuple.Tuple;
<<<<<<< HEAD:cascading-hadoop/src/main/shared-mr1/cascading/tuple/hadoop/util/GroupingSortingPartitioner.java
import cascading.tuple.io.TuplePair;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

/** Class GroupingPartitioner is an implementation of {@link Partitioner}. */
public class GroupingSortingPartitioner extends HasherPartitioner implements Partitioner
=======
import org.apache.hadoop.mapred.Partitioner;

/**
 * GroupingPartitioner is a Partitioner based on the hashCode of the tuple. It honors custom Hasher implementations if
 * present.
 */
public class GroupingPartitioner  extends HasherPartitioner implements Partitioner
>>>>>>> 8db4b31f78e4505c2f8336ac8ddeb05f02c67430:cascading-hadoop/src/main/shared/cascading/tuple/hadoop/util/GroupingPartitioner.java
  {
  @Override
  public int getPartition( Tuple key, Tuple value, int numReduceTasks )
Solution content
package cascading.tuple.hadoop.util;

import cascading.tuple.Tuple;
import cascading.tuple.io.TuplePair;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

/** Class GroupingSortingPartitioner is an implementation of {@link Partitioner}. */
public class GroupingSortingPartitioner extends HasherPartitioner implements Partitioner
  {
File
GroupingSortingPartitioner.java
Developer's decision
Manual
Kind of conflict
Class signature
Comment
Import
Chunk
Conflicting content
  public static <J extends JobConf> J mergeConf( J job, Map<Object, Object> config, boolean directly )
    {
<<<<<<< HEAD
    Configuration currentConf = directly ? job : ( job instanceof JobConf ? new JobConf( job ) : new Configuration( job ) );
=======
    JobConf currentConf = directly ? job : copyJobConf( job );
>>>>>>> 8db4b31f78e4505c2f8336ac8ddeb05f02c67430

    for( String key : config.keySet() )
      {
Solution content
  public static <J extends JobConf> J mergeConf( J job, Map<Object, Object> config, boolean directly )
    {
    Configuration currentConf = directly ? job : ( job instanceof JobConf ? copyJobConf( (JobConf) job ) : new Configuration( job ) );

    for( String key : config.keySet() )
      {
File
HadoopUtil.java
Developer's decision
Manual
Kind of conflict
Method invocation
Variable
Chunk
Conflicting content
<<<<<<< HEAD
    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
    }

  @Test
  public void testGroupBySplitJoins() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );
    getPlatform().copyFromLocal( inputFileJoined );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
    Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );
    sources.put( "joined", sourceJoined );

    Tap lhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE );
    Tap rhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE );

    Map sinks = new HashMap();

    sinks.put( "lhs", lhsSink );
    sinks.put( "rhs", rhsSink );

    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
    Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
    Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined );

    Pipe pipe = new GroupBy( pipeLower, new Fields( "numA" ) );

    pipe = new Every( pipe, Fields.ALL, new TestIdentityBuffer( new Fields( "numA" ), 5, false ), Fields.RESULTS );

    Pipe lhsPipe = new Each( pipe, new Identity() );
    lhsPipe = new HashJoin( "lhs", lhsPipe, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );

    Pipe rhsPipe = new Each( pipe, new Identity() );
    rhsPipe = new HashJoin( "rhs", rhsPipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, lhsPipe, rhsPipe );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow.openSink( "lhs" ), 5, null );
    validateLength( flow.openSink( "rhs" ), 5, null );

    List lhsActual = asList( flow, lhsSink );

    assertTrue( lhsActual.contains( new Tuple( "1\ta\t1\tA" ) ) );
    assertTrue( lhsActual.contains( new Tuple( "2\tb\t2\tB" ) ) );

    List rhsActual = asList( flow, rhsSink );

    assertTrue( rhsActual.contains( new Tuple( "1\ta\t1\ta\tA" ) ) );
    assertTrue( rhsActual.contains( new Tuple( "2\tb\t2\tb\tB" ) ) );
    }

  /**
   * currently we cannot efficiently plan for this case. better to throw an error
   *
   * When run against a cluster a Merge before a GroupBy can hide the streamed/accumulated nature of a branch.
   *
   * commented code is for troubleshooting.
   *
=======
  /**
   * When run against a cluster a Merge before a GroupBy can hide the streamed/accumulated nature of a branch.
   *
   * commented code is for troubleshooting.
>>>>>>> 8db4b31f78e4505c2f8336ac8ddeb05f02c67430
   * @throws Exception
   */
  @Test

Solution content
    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
    }

  @Test
  public void testGroupBySplitJoins() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );
    getPlatform().copyFromLocal( inputFileJoined );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
    Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );
    sources.put( "joined", sourceJoined );

    Tap lhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE );
    Tap rhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE );

    Map sinks = new HashMap();

    sinks.put( "lhs", lhsSink );
    sinks.put( "rhs", rhsSink );

    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
    Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
    Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined );

    Pipe pipe = new GroupBy( pipeLower, new Fields( "numA" ) );

    pipe = new Every( pipe, Fields.ALL, new TestIdentityBuffer( new Fields( "numA" ), 5, false ), Fields.RESULTS );

    Pipe lhsPipe = new Each( pipe, new Identity() );
    lhsPipe = new HashJoin( "lhs", lhsPipe, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );

    Pipe rhsPipe = new Each( pipe, new Identity() );
    rhsPipe = new HashJoin( "rhs", rhsPipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, lhsPipe, rhsPipe );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow.openSink( "lhs" ), 5, null );
    validateLength( flow.openSink( "rhs" ), 5, null );

    List lhsActual = asList( flow, lhsSink );

    assertTrue( lhsActual.contains( new Tuple( "1\ta\t1\tA" ) ) );
    assertTrue( lhsActual.contains( new Tuple( "2\tb\t2\tB" ) ) );

    List rhsActual = asList( flow, rhsSink );

    assertTrue( rhsActual.contains( new Tuple( "1\ta\t1\ta\tA" ) ) );
    assertTrue( rhsActual.contains( new Tuple( "2\tb\t2\tb\tB" ) ) );
    }

  /**
   * currently we cannot efficiently plan for this case. better to throw an error
   *
   * When run against a cluster a Merge before a GroupBy can hide the streamed/accumulated nature of a branch.
   *
   * commented code is for troubleshooting.
   *
   * @throws Exception
   */
  @Test

File
JoinFieldedPipesPlatformTest.java
Developer's decision
Version 1
Kind of conflict
Annotation
Comment
Method declaration
Chunk
Conflicting content
      .addSource( lhs, lhsTap )
    Tap sink = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", null, getOutputPath( testJoinMerge ), SinkMode.REPLACE );

    FlowDef flowDef = FlowDef.flowDef()
<<<<<<< HEAD
      .setName( "join-merge" )
=======
>>>>>>> 8db4b31f78e4505c2f8336ac8ddeb05f02c67430
      .addSource( rhs, rhsTap )
      .addTailSink( counted, sink );
Solution content
    Tap sink = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", null, getOutputPath( testJoinMerge ), SinkMode.REPLACE );

    FlowDef flowDef = FlowDef.flowDef()
      .setName( "join-merge" )
      .addSource( rhs, rhsTap )
      .addSource( lhs, lhsTap )
      .addTailSink( counted, sink );
File
JoinFieldedPipesPlatformTest.java
Developer's decision
Version 1
Kind of conflict
Other
Chunk
Conflicting content
      .addSource( lhs, lhsTap )
      .addTailSink( counted, sink );

<<<<<<< HEAD
    boolean failOnMR = getPlatform().isMapReduce(); // plan seems reasonable in tez

    Flow flow = null;

    try
      {
      flow = getPlatform().getFlowConnector().connect( flowDef );

      if( failOnMR )
        fail( "planner should throw error on plan" );
      }
    catch( Exception exception )
      {
      if( !failOnMR )
        throw exception;
      else
        return;
      }
=======
    Flow flow = getPlatform().getFlowConnector().connect( flowDef );
>>>>>>> 8db4b31f78e4505c2f8336ac8ddeb05f02c67430

//    flow.writeDOT( "joinmerge.dot" );
//    flow.writeStepsDOT( "joinmerge-steps.dot" );
Solution content
      .addSource( lhs, lhsTap )
      .addTailSink( counted, sink );

    boolean failOnMR = getPlatform().isMapReduce(); // plan seems reasonable in tez

    Flow flow = null;

    try
      {
      flow = getPlatform().getFlowConnector().connect( flowDef );

      if( failOnMR )
        fail( "planner should throw error on plan" );
      }
    catch( Exception exception )
      {
      if( !failOnMR )
        throw exception;
      else
        return;
      }

//    flow.writeDOT( "joinmerge.dot" );
//    flow.writeStepsDOT( "joinmerge-steps.dot" );
File
JoinFieldedPipesPlatformTest.java
Developer's decision
Version 1
Kind of conflict
Comment
Method invocation
Try statement
Variable
Chunk
Conflicting content
    expected.add( new Tuple( "8", "2" ) );
    expected.add( new Tuple( "9", "2" ) );

<<<<<<< HEAD
    Collections.sort( values );
    Collections.sort( expected );
=======
    Collections.sort(values);
    Collections.sort(expected);
>>>>>>> 8db4b31f78e4505c2f8336ac8ddeb05f02c67430

    assertEquals( expected, values );
    }
Solution content
    expected.add( new Tuple( "8", "2" ) );
    expected.add( new Tuple( "9", "2" ) );

    Collections.sort( values );
    Collections.sort( expected );

    assertEquals( expected, values );
    }
File
JoinFieldedPipesPlatformTest.java
Developer's decision
Version 1
Kind of conflict
Method invocation