[java] Hadoop MapReduce: Strange Result when Storing Previous Value in Memory in a Reduce Class (Java)

If I wish to store the current value of an iterator to compare to the next value of the iterator in a Reduce method, Hadoop requires that I clone it instead of simply assigning its reference to a temporary variable.

I am about to post the code to my reducer.

You will see two parts:

  1. main method for testing in Eclipse
  2. reduce method for executing in Hadoop

You will notice that the two pieces of code are identical, except for the following:

  1. The main method gets an Iterator from an ArrayList that I hard coded into it, whereas the reduce method gets an Iterator from the mapper method.
  2. The main method does not execute context.write, of course.

Here is the code that both pretty much share:

    MMI currentMMI = null;
    MMI previousMMI = null;
    UltraAggregation currentAggregation = null;

    while (values.hasNext()) {
        currentMMI = values.next();
        if (currentAggregation == null) {
            currentAggregation = new UltraAggregation(currentMMI);
        }
        if (previousMMI == null) {
            //previousMMI = new MMI(currentMMI);
            previousMMI = currentMMI;
            continue;
        }
        System.out.println();
        System.out.println("currentMMI = " + currentMMI);
        System.out.println("previousMMI = " + previousMMI);
        System.out.println("equals? " + currentMMI.equals(previousMMI));
        System.out.println("==? " + (currentMMI == previousMMI));
        System.out.println();

        // Business logic goes here and involves a context.write on certain conditions

        previousMMI = currentMMI;
    }
    //final context.write

You will notice that at the end of each iteration I assign the just-used MMI (currentMMI) to the variable previousMMI. Then, on the next iteration, I assign the result of next() to currentMMI. When I execute my main method in Eclipse, both of the following expressions evaluate to false, as expected:

    currentMMI == previousMMI;
    currentMMI.equals(previousMMI);

However, when executed in Hadoop, the same two expressions always evaluate to true:

    currentMMI == previousMMI;
    currentMMI.equals(previousMMI);

Only if I change the line previousMMI = currentMMI to previousMMI = new MMI(currentMMI) do they evaluate to false. (I wrote a copy constructor for the MMI class that essentially makes a shallow clone of the incoming parameter.)

Why would I have to clone instead of setting the reference when using the reducer in hadoop but not in the main method?

I am now going to copy and paste the reducer class, which has the 2 parts: the main method for eclipse testing and the reduce method for actual use in Hadoop.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import com.cisco.webex.hadoop.ultrautility.models.MMI;
    import com.cisco.webex.hadoop.ultrautility.models.UltraAggregation;

    public class MMIReducer extends Reducer<Text, MMI, Object, UltraAggregation> {
        public static void main(String[] args) {
            ArrayList<MMI> mmis = new ArrayList<MMI>();
            mmis.add(new MMI("961864,1,1,1,D1,10,0,2013-08-02 06:00:00.0,USA,N,N"));
            mmis.add(new MMI("961865,1,1,1,D1,10,1,2013-08-02 07:00:00.0,USA,N,N"));
            mmis.add(new MMI("961866,1,1,1,D1,10,2,2013-08-02 08:00:00.0,USA,N,N"));
            mmis.add(new MMI("961867,1,1,1,D1,10,3,2013-08-02 09:00:00.0,USA,N,N"));
            mmis.add(new MMI("961868,1,1,1,D1,10,4,2013-08-02 10:00:00.0,USA,N,N"));
            mmis.add(new MMI("961869,1,1,1,D1,10,5,2013-08-02 11:00:00.0,USA,N,N"));
            mmis.add(new MMI("961870,1,1,1,D1,10,6,2013-08-02 12:00:00.0,USA,N,N"));
            mmis.add(new MMI("961871,1,1,1,D1,10,7,2013-08-02 13:00:00.0,USA,N,N"));
            mmis.add(new MMI("961872,1,1,1,D1,10,8,2013-08-02 14:00:00.0,USA,N,N"));
            mmis.add(new MMI("961873,1,1,1,D1,10,9,2013-08-02 15:00:00.0,USA,N,N"));

            Iterator<MMI> values = mmis.iterator();

            MMI currentMMI = null;
            MMI previousMMI = null;
            UltraAggregation currentAggregation = null;

            while (values.hasNext()) {
                currentMMI = values.next();
                if (currentAggregation == null) {
                    currentAggregation = new UltraAggregation(currentMMI);
                }
                if (previousMMI == null) {
                    //previousMMI = new MMI(currentMMI);
                    previousMMI = currentMMI;
                    continue;
                }
                System.out.println();
                System.out.println("currentMMI = " + currentMMI);
                System.out.println("previousMMI = " + previousMMI);
                System.out.println("equals? " + currentMMI.equals(previousMMI));
                System.out.println("==? " + (currentMMI == previousMMI));
                System.out.println();

                // Business logic goes here and involves a context.write on certain conditions

                //previousMMI = new MMI(currentMMI);
                /*
                 * THIS DOESNT CAUSE LOGIC ERRORS IN MAIN METHOD
                 */
                previousMMI = currentMMI;
            }
            //context.write(null, currentAggregation);
        }

        @Override
        public void reduce(Text key, Iterable<MMI> vals, Context context) throws IOException, InterruptedException {
            Iterator<MMI> values = vals.iterator();

            //key = deviceId
            MMI currentMMI = null;
            MMI previousMMI = null;
            UltraAggregation currentAggregation = null;

            while (values.hasNext()) {
                currentMMI = values.next();
                if (currentAggregation == null) {
                    currentAggregation = new UltraAggregation(currentMMI);
                }
                if (previousMMI == null) {
                    System.out.println("PreviousMMI is null, setting previousMMI to current MMI and continuing");
                    //previousMMI = new MMI(currentMMI);
                    previousMMI = currentMMI;
                    continue;
                }
                System.out.println();
                System.out.println("currentMMI = " + currentMMI);
                System.out.println("previousMMI = " + previousMMI);
                System.out.println("equals? " + currentMMI.equals(previousMMI));
                System.out.println("==? " + (currentMMI == previousMMI));
                System.out.println();

                // Business logic goes here and involves a context.write on certain conditions

                //previousMMI = new MMI(currentMMI); //Acts as intended
                /*
                 * THIS CAUSES ERRORS WHEN EXECUTED THROUGH HADOOP
                 */
                previousMMI = currentMMI; // Causes errors
            }
            context.write(null, currentAggregation);
        }
    }

Here is a truncated result from stdout when I execute the main method in eclipse with the static values:

    currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 07:00:00 PDT 2013;Uptime|1.0
    previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 06:00:00 PDT 2013;Uptime|0.0
    equals? false
    ==? false


    currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 08:00:00 PDT 2013;Uptime|2.0
    previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 07:00:00 PDT 2013;Uptime|1.0
    equals? false
    ==? false

Here is a truncated result when I execute the hadoop jar:

    currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 06:00:00 PDT 2013;Uptime|0.0
    previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 06:00:00 PDT 2013;Uptime|0.0
    equals? true
    ==? true


    currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 07:00:00 PDT 2013;Uptime|1.0
    previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 07:00:00 PDT 2013;Uptime|1.0
    equals? true
    ==? true

Why must I clone it for Hadoop but not in Eclipse?


The answer is


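Holding every value for a key in memory would be very inefficient, so Hadoop loads the values one at a time and reuses the same object for each of them. Each call to next() overwrites that object's contents, so after previousMMI = currentMMI both variables point at the same instance. That is why == and equals both return true in the reducer, and why you have to copy the value (as with new MMI(currentMMI)) to keep the previous one. The ArrayList iterator in your main method returns a distinct object for each element, so the problem never shows up there. See this other SO question for a good explanation. Summary: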

[...] when looping through the Iterable value list, each Object instance is re-used, so it only keeps one instance around at a given time.
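
To see the effect without a cluster, here is a minimal sketch that imitates the reuse with plain Java. It is not Hadoop's actual implementation: Record, ReusingIterator, and countSameReference are hypothetical names made up for this illustration, with Record standing in for a class like MMI and modeling only a single field.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ReuseDemo {
    // Hypothetical stand-in for a value class such as MMI (one field only).
    static class Record {
        double uptime;
        Record(double uptime) { this.uptime = uptime; }
        // Copy constructor, analogous to new MMI(currentMMI) in the question.
        Record(Record other) { this.uptime = other.uptime; }
    }

    // Imitates an iterator that hands back the SAME instance on every call,
    // overwriting its fields with the next value.
    static class ReusingIterator implements Iterator<Record> {
        private final Iterator<Double> source;
        private final Record shared = new Record(0.0);
        ReusingIterator(List<Double> raw) { this.source = raw.iterator(); }
        @Override public boolean hasNext() { return source.hasNext(); }
        @Override public Record next() {
            shared.uptime = source.next(); // overwrite in place
            return shared;                 // same reference every time
        }
    }

    // Counts how often "previous" and "current" are the very same object.
    static int countSameReference(Iterator<Record> values, boolean copy) {
        Record previous = null;
        int same = 0;
        while (values.hasNext()) {
            Record current = values.next();
            if (previous != null && previous == current) {
                same++;
            }
            // Either keep the reference (the bug) or keep a copy (the fix).
            previous = copy ? new Record(current) : current;
        }
        return same;
    }

    public static void main(String[] args) {
        List<Double> raw = Arrays.asList(0.0, 1.0, 2.0);
        // Storing the reference: every comparison hits the shared object.
        System.out.println(countSameReference(new ReusingIterator(raw), false)); // prints 2
        // Storing a copy: previous is always a distinct object.
        System.out.println(countSameReference(new ReusingIterator(raw), true));  // prints 0
    }
}
```

With the plain reference, both comparisons after the first element find previous and current to be the same object, which matches the "==? true" lines in the Hadoop output above. With the copy, previous keeps the earlier value and the comparison behaves as it did in Eclipse.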

