HBase MapReduce简单实例

HBase MapReduce简单实例

前言

项目中用到了 HBase 的 MapReduce，这里写一个小例子，用 MapReduce 统计一张 HBase 表的记录（行）数。

Mapper

Mapper 需要继承 `org.apache.hadoop.hbase.mapreduce.TableMapper`，并重写 `map()` 方法：每扫描到一行，就输出一条 (表名, 1)。

/**
 * Emits one {@code (tableName, 1)} pair for every HBase row delivered by the table scan.
 *
 * <p>The table name is read from {@link TableInputFormat#INPUT_TABLE} in the job
 * configuration (set by {@code TableMapReduceUtil.initTableMapperJob} in the driver).
 */
public class CounterMapper extends TableMapper<Text, LongWritable> {
    /** Value emitted for every row; never mutated, so one shared instance suffices. */
    private static final LongWritable ONE = new LongWritable(1);

    /**
     * Output key holding the table name. Created once per task rather than once per row;
     * Hadoop serializes map output on {@code context.write}, so reusing it is safe.
     */
    private final Text tableKey = new Text();

    /**
     * Resolves the table name once per task, since it cannot change between rows.
     *
     * @throws IllegalStateException if {@link TableInputFormat#INPUT_TABLE} is not configured
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        String tableName = context.getConfiguration().get(TableInputFormat.INPUT_TABLE);
        if (tableName == null) {
            // Fail with an explicit message instead of an NPE deep inside Text.
            throw new IllegalStateException(TableInputFormat.INPUT_TABLE + " is not set in the job configuration");
        }
        tableKey.set(tableName);
    }

    /**
     * Counts a single row by emitting {@code (tableName, 1)}.
     *
     * @param key     row key of the scanned row (unused)
     * @param value   cells of the scanned row (unused; only the row's existence matters)
     * @param context task context used to emit the pair
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        context.write(tableKey, ONE);
    }
}

Reducer

Reducer 继承 `org.apache.hadoop.mapreduce.Reducer` 类，并重写 `reduce()` 方法：把同一表名下所有的 1 累加起来，得到该表的总行数。

/**
 * Sums the per-row counts emitted by the mapper and writes one {@code (key, total)} record
 * per key. Here the key is the table name, so the output is the table's row count.
 *
 * <p>The sum is kept local to each {@code reduce()} call on purpose. A {@code static}
 * accumulator would survive across calls and across jobs executed in the same JVM (for
 * example with the local job runner). Each output would then include counts from earlier
 * keys or earlier runs. Because this reducer only sums and echoes its key, it is also safe
 * to register as a combiner.
 */
public class CounterReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    /** Output value holder reused across calls; Hadoop serializes it on {@code context.write}. */
    private final LongWritable total = new LongWritable();

    /**
     * Adds up all partial counts for {@code key} and emits the sum.
     *
     * @param key     grouping key (the table name emitted by the mapper)
     * @param values  partial counts for this key
     * @param context task context used to emit the result
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        total.set(count);
        // Echo the incoming key rather than re-reading the configuration: it already is the table name.
        context.write(key, total);
    }
}

Job（驱动类）

/**
 * Driver for a MapReduce job that counts the rows of an HBase table.
 *
 * <p>Usage: {@code CounterJob <tableName> <outPath>}, optionally preceded by generic Hadoop
 * command-line options, which {@link GenericOptionsParser} consumes.
 */
public class CounterJob {

    /**
     * Parses arguments, configures the job and runs it to completion.
     *
     * @param args generic Hadoop options followed by {@code <tableName> <outPath>}
     * @throws IOException if the job does not complete successfully
     * @throws Exception   propagated from job setup or execution
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Let Hadoop consume its generic options; only job-specific arguments remain.
        String[] jobArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (jobArgs.length != 2) {
            System.err.println("Usage: CounterJob <tableName> <outPath>");
            System.exit(2);
        }
        String table = jobArgs[0];
        String outDir = jobArgs[1];

        Job job = buildJob(conf, table, outDir, buildScan());
        if (!job.waitForCompletion(true)) {
            throw new IOException("error with job!");
        }
    }

    /** Creates the full-table scan used as job input. */
    private static Scan buildScan() {
        Scan fullScan = new Scan();
        fullScan.setCaching(500);       // rows fetched per scanner round trip
        fullScan.setCacheBlocks(false); // a one-off full scan should not fill the block cache
        return fullScan;
    }

    /** Wires mapper, reducer and output path into a new job named "HbaseCounterJob". */
    private static Job buildJob(Configuration conf, String table, String outDir, Scan input) throws IOException {
        Job counterJob = Job.getInstance(conf, "HbaseCounterJob");
        counterJob.setJarByClass(CounterJob.class);

        TableMapReduceUtil.initTableMapperJob(table, input, CounterMapper.class, Text.class, LongWritable.class, counterJob);

        // One reduce task: all output ends up in a single part file.
        counterJob.setReducerClass(CounterReducer.class);
        counterJob.setNumReduceTasks(1);
        counterJob.setOutputKeyClass(Text.class);
        counterJob.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(counterJob, new Path(outDir));
        return counterJob;
    }
}