-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMagnitudeRange.java
131 lines (104 loc) · 3.79 KB
/
MagnitudeRange.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import java.io.IOException;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.*;
import java.util.Scanner;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MagnitudeRange extends Configured implements Tool{
//public static String usrCh;
public static int colFlag = 0;
public static class MagnitudeRangeMapper extends MapReduceBase
implements Mapper <LongWritable, Text, Text, IntWritable> {
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter rep)
throws IOException {
//reading each line and splitting by ,
String line = value.toString();
String [] data = line.split(",");
System.out.println("\tMapper Starts Here");
System.out.println();
System.out.println("|"+data+"|");
String str_mag = data[4].toString();
System.out.println("Magnitude: " +str_mag);
Double mag = Double.parseDouble(str_mag);
String mag_range;
if(mag>=1.00&&mag<2.00){
mag_range = "1-2";
} else if(mag>=2.00&&mag<3.00){
mag_range = "2-3";
} else if(mag>=3.00&&mag<4.00){
mag_range = "3-4";
} else if(mag>=4.00&&mag<5.00){
mag_range = "4-5";
} else if (mag>=5.00&&mag<6.00){
mag_range = "5-6";
} else{
mag_range = "0-0";
}
int count =1;
output.collect(new Text(mag_range), new IntWritable(count));
}
}
public static class MagnitudeRangeReducer extends MapReduceBase
implements Reducer <Text, IntWritable, Text, Text> {
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text,Text> output, Reporter rep)
throws IOException {
System.out.println("\tReducer Starts Here");
System.out.println();
int sum = 0;
if(colFlag==0){
output.collect(new Text("Range"),new Text("Total"));
colFlag = 1;
}
while(values.hasNext()){
IntWritable val = values.next();
sum += val.get();
}
String str_sum = sum+"";
output.collect(key ,new Text(str_sum));
}
}
//@Override
public int run(String[] args) throws IOException
{
return 0;
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
/*Scanner s = new Scanner(System.in);
System.out.print("\nEnter Col: ");
String col = s.next();
System.out.println("User Input: " + col);*/
BufferedReader br = new BufferedReader(new FileReader("/home/ubuntu/user_input/uip.txt"));
String line = br.readLine();
String [] input = line.split("_");
System.out.println("User Input: " + input[0] + " | " + input[1]);
int mapperCount = Integer.parseInt(input[0]);
int reducerCount = Integer.parseInt(input[1]);
JobConf conf = new JobConf(MagnitudeRange.class);
conf.setJobName("magRangeAnalysis");
conf.setNumMapTasks(mapperCount);
conf.setNumReduceTasks(reducerCount);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MagnitudeRangeMapper.class);
//conf.setCombinerClass(MagnitudeRangeReducer.class);
conf.setReducerClass(MagnitudeRangeReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
long startTime = System.currentTimeMillis();
JobClient.runJob(conf);
long totalTime = System.currentTimeMillis()- startTime;
System.out.println("\t Total Execution Time:\t"+ (totalTime/1000)+" seconds");
}
}