博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop/MapReduce反转排序:控制规约器Reducer值的顺序
阅读量:2490 次
发布时间:2019-05-11

本文共 15251 字,大约阅读时间需要 50 分钟。

例子:计算一个给定文档集中单词的相对频度。目标是建立一个N*N矩阵M,其中N为所有给定文档的单词量,每个单元Mij包含一个特定上下文单词Wi与Wj共同出现的次数。为简单起见,将这个上下文定义为Wi的邻域。例如:给定以下单词:W1,W2,W3,W4,W5,W6

如果定义一个单词的邻域为这个单词的前两个单词和后两个单词,那么这6个单词的邻域如下:

单词    领域+-2

W1    W2,W3

W2    W1,W3,W4

W3    W1,W2,W4,W5

W4    W2,W3,W5,W6

W5    W3,W4,W6

W6    W4,W5

反转排序的MapReduce/Hadoop实现:

我们要生成两个数据序列。第一个序列三单词的总领域计数(一个单词的总共出现次数),用组合键(W,*)表示,其中W表示该单词。

第二个序列是这个单词和其他特定单词出现次数,用组合键(W,W2)表示

Hadoop的定制分区器实现

定制分区器必须确保含有相同左词(即自然键)的所有词对要发送到相同的规约器。例如,组合键{(man,tall),(man,strong),(man,moon)...}都要发送到同一个规约器。要把这些键发送到相同的规约器,必须定义一个定制分区器,它只关心左词,如例中man。

相对频度映射器

以W1,W2,W3,W4,W5,W6为例:

键    值

(W1,W2) 1

(W1,W3) 1

(W1,*) 2

(W2,W1) 1

(W2,W3) 1

(W2,W4) 1

(W2,*) 3

(W3,W1) 1

(W3,W2) 1

(W3,W4) 1

(W3,W5) 1

(W3,*) 4

(W4,W2) 1

(W4,W3) 1

(W4,W5) 1

(W4,W6) 1

(W4,*) 4

(W5,W3) 1

(W5,W4) 1

(W5,W6) 1

(W5,*) 3

(W6,W4) 1

(W6,W6) 1

(W6,*) 1

相对频度规约器

规约器的输入:

键    值

(W1,*),(W1,W2) ,(W1,W3)    2,1,1

(W2,*),(W2,W1),(W2,W3),(W2,W4) 3,1,1,1

....

代码实现:

自定义组合键

package fanzhuanpaixu; import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;//import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;import org.apache.hadoop.io.WritableUtils;/** * Adapted from Cloud9: A Hadoop toolkit for working with big data * edu.umd.cloud9.io.pair.PairOfStrings  * * WritableComparable representing a pair of Strings.  * The elements in the pair are referred to as the left (word)  * and right (neighbor) elements. The natural sort order is: * first by the left element, and then by the right element. *  * @author Mahmoud Parsian (added few new convenient methods) * */public class PairOfWords implements WritableComparable
{ private String leftElement; private String rightElement; /** * Creates a pair. */ public PairOfWords() { } /** * Creates a pair. * * @param left the left element * @param right the right element */ public PairOfWords(String left, String right) { set(left, right); } /** * Deserializes the pair. * * @param in source for raw byte representation */ @Override public void readFields(DataInput in) throws IOException { leftElement = Text.readString(in); rightElement = Text.readString(in); } /** * Serializes this pair. * * @param out where to write the raw byte representation */ @Override public void write(DataOutput out) throws IOException { Text.writeString(out, leftElement); Text.writeString(out, rightElement); } public void setLeftElement(String leftElement) { this.leftElement = leftElement; } public void setWord(String leftElement) { setLeftElement(leftElement); } /** * Returns the left element. * * @return the left element */ public String getWord() { return leftElement; } /** * Returns the left element. * * @return the left element */ public String getLeftElement() { return leftElement; } public void setRightElement(String rightElement) { this.rightElement = rightElement; } public void setNeighbor(String rightElement) { setRightElement(rightElement); } /** * Returns the right element. * * @return the right element */ public String getRightElement() { return rightElement; } public String getNeighbor() { return rightElement; } /** * Returns the key (left element). * * @return the key */ public String getKey() { return leftElement; } /** * Returns the value (right element). * * @return the value */ public String getValue() { return rightElement; } /** * Sets the right and left elements of this pair. * * @param left the left element * @param right the right element */ public void set(String left, String right) { leftElement = left; rightElement = right; } /** * Checks two pairs for equality. * * @param obj object for comparison * @return
true if
obj is equal to this object,
false otherwise */ @Override public boolean equals(Object obj) { if (obj == null) { return false; } // if (!(obj instanceof PairOfWords)) { return false; } // PairOfWords pair = (PairOfWords) obj; return leftElement.equals(pair.getLeftElement()) && rightElement.equals(pair.getRightElement()); } /** * Defines a natural sort order for pairs. Pairs are sorted first by the left element, and then by the right * element. * * @return a value less than zero, a value greater than zero, or zero if this pair should be sorted before, sorted * after, or is equal to
obj. */ @Override public int compareTo(PairOfWords pair) { String pl = pair.getLeftElement(); String pr = pair.getRightElement(); if (leftElement.equals(pl)) { return rightElement.compareTo(pr); } return leftElement.compareTo(pl); } /** * Returns a hash code value for the pair. * * @return hash code for the pair */ @Override public int hashCode() { return leftElement.hashCode() + rightElement.hashCode(); } /** * Generates human-readable String representation of this pair. * * @return human-readable String representation of this pair */ @Override public String toString() { return "(" + leftElement + ", " + rightElement + ")"; } /** * Clones this object. * * @return clone of this object */ @Override public PairOfWords clone() { return new PairOfWords(this.leftElement, this.rightElement); } /** * Comparator optimized for
PairOfWords. */ public static class Comparator extends WritableComparator { /** * Creates a new Comparator optimized for
PairOfWords. */ public Comparator() { super(PairOfWords.class); } /** * Optimization hook. */ @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int firstVIntL1 = WritableUtils.decodeVIntSize(b1[s1]); int firstVIntL2 = WritableUtils.decodeVIntSize(b2[s2]); int firstStrL1 = readVInt(b1, s1); int firstStrL2 = readVInt(b2, s2); int cmp = compareBytes(b1, s1 + firstVIntL1, firstStrL1, b2, s2 + firstVIntL2, firstStrL2); if (cmp != 0) { return cmp; } int secondVIntL1 = WritableUtils.decodeVIntSize(b1[s1 + firstVIntL1 + firstStrL1]); int secondVIntL2 = WritableUtils.decodeVIntSize(b2[s2 + firstVIntL2 + firstStrL2]); int secondStrL1 = readVInt(b1, s1 + firstVIntL1 + firstStrL1); int secondStrL2 = readVInt(b2, s2 + firstVIntL2 + firstStrL2); return compareBytes(b1, s1 + firstVIntL1 + firstStrL1 + secondVIntL1, secondStrL1, b2, s2 + firstVIntL2 + firstStrL2 + secondVIntL2, secondStrL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } } static { // register this comparator WritableComparator.define(PairOfWords.class, new Comparator()); }}

映射器

package fanzhuanpaixu;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;//import org.apache.commons.lang.StringUtils;//import java.io.IOException;/** * RelativeFrequencyMapper implements the map() function for Relative Frequency of words. * * @author Mahmoud Parsian * */public class RelativeFrequencyMapper        extends Mapper
{ private int neighborWindow = 2; // pair = (leftElement, rightElement) private final PairOfWords pair = new PairOfWords(); private final IntWritable totalCount = new IntWritable(); private static final IntWritable ONE = new IntWritable(1); @Override public void setup(Context context) { this.neighborWindow = context.getConfiguration().getInt("neighbor.window", 2); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] tokens = StringUtils.split(value.toString(), " "); //String[] tokens = StringUtils.split(value.toString(), "\\s+"); if ((tokens == null) || (tokens.length < 2)) { return; } for (int i = 0; i < tokens.length; i++) { tokens[i] = tokens[i].replaceAll("\\W+", ""); if (tokens[i].equals("")) { continue; } pair.setWord(tokens[i]); int start = (i - neighborWindow < 0) ? 0 : i - neighborWindow; int end = (i + neighborWindow >= tokens.length) ? tokens.length - 1 : i + neighborWindow; for (int j = start; j <= end; j++) { if (j == i) { continue; } pair.setNeighbor(tokens[j].replaceAll("\\W", "")); context.write(pair, ONE); } // pair.setNeighbor("*"); totalCount.set(end - start); context.write(pair, totalCount); } }}

分区器

package fanzhuanpaixu;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Partitioner;/**  * This is an plug-in class. * The OrderInversionPartitioner class indicates how to partition data * based on the word only (the natural key). *  * @author Mahmoud Parsian * */public class OrderInversionPartitioner    extends Partitioner
{ @Override public int getPartition(PairOfWords key, IntWritable value, int numberOfPartitions) { // key = (leftWord, rightWord) = (word, neighbor) String leftWord = key.getLeftElement(); return Math.abs( ((int) hash(leftWord)) % numberOfPartitions); } /** * Return a hashCode() of a given String object. * This is adapted from String.hashCode() * * @param str a string object * */ private static long hash(String str) { long h = 1125899906842597L; // prime int length = str.length(); for (int i = 0; i < length; i++) { h = 31*h + str.charAt(i); } return h; } }

组合器

package fanzhuanpaixu;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/**  * RelativeFrequencyCombiner implements the combine() [in Hadoop,  * we use reduce() for implementing the combine() function] function  * for MapReduce/Hadoop. * * @author Mahmoud Parsian * */public class RelativeFrequencyCombiner        extends Reducer
{ @Override protected void reduce(PairOfWords key, Iterable
values, Context context) throws IOException, InterruptedException { // int partialSum = 0; for (IntWritable value : values) { partialSum += value.get(); } // context.write(key, new IntWritable(partialSum)); }}

规约器

package fanzhuanpaixu;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** * RelativeFrequencyReducer implements the reduce() function for Relative Frequency of words. * * @author Mahmoud Parsian * */public class RelativeFrequencyReducer        extends Reducer
{ private double totalCount = 0; private final DoubleWritable relativeCount = new DoubleWritable(); private String currentWord = "NOT_DEFINED"; @Override protected void reduce(PairOfWords key, Iterable
values, Context context) throws IOException, InterruptedException { if (key.getNeighbor().equals("*")) { if (key.getWord().equals(currentWord)) { totalCount += totalCount + getTotalCount(values); } else { currentWord = key.getWord(); totalCount = getTotalCount(values); } } else { int count = getTotalCount(values); relativeCount.set((double) count / totalCount); context.write(key, relativeCount); } } private int getTotalCount(Iterable
values) { int sum = 0; for (IntWritable value : values) { sum += value.get(); } return sum; }}

驱动

package fanzhuanpaixu;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;//import org.apache.log4j.Logger;/** * RelativeFrequencyDriver is driver class for computing relative frequency of words. * * @author Mahmoud Parsian * */public class RelativeFrequencyDriver        extends Configured implements Tool {    private static final Logger THE_LOGGER = Logger.getLogger(RelativeFrequencyDriver.class);    /**     * Dispatches command-line arguments to the tool by the ToolRunner.     */    public static void main(String[] args) throws Exception {    	args = new String[1];    	args[0] = "2";        int status = ToolRunner.run(new RelativeFrequencyDriver(), args);        System.exit(status);    }    @Override    public int run(String[] args) throws Exception {        int neighborWindow = Integer.parseInt(args[0]);        Path inputPath = new Path("input/paper.txt");        Path outputPath = new Path("output/fanzhuanpaixu");        Job job = new Job(new Configuration(), "RelativeFrequencyDriver");        job.setJarByClass(RelativeFrequencyDriver.class);        job.setJobName("RelativeFrequencyDriver");        // Delete the output directory if it exists already        FileSystem.get(getConf()).delete(outputPath, true);        job.getConfiguration().setInt("neighbor.window", neighborWindow);        FileInputFormat.setInputPaths(job, inputPath);        FileOutputFormat.setOutputPath(job, outputPath);        // (key,value) generated by map()        job.setMapOutputKeyClass(PairOfWords.class);        job.setMapOutputValueClass(IntWritable.class);        // (key,value) generated by reduce()        job.setOutputKeyClass(PairOfWords.class);        job.setOutputValueClass(DoubleWritable.class);        job.setMapperClass(RelativeFrequencyMapper.class);        job.setReducerClass(RelativeFrequencyReducer.class);        job.setCombinerClass(RelativeFrequencyCombiner.class);        job.setPartitionerClass(OrderInversionPartitioner.class);        job.setNumReduceTasks(3);        long startTime = System.currentTimeMillis();        job.waitForCompletion(true);        THE_LOGGER.info("Job Finished in milliseconds: " + (System.currentTimeMillis() - startTime));        return 0;    }}
运行结果:

你可能感兴趣的文章
关闭selinx nginx无法使用代理
查看>>
shell 脚本部署项目
查看>>
spring cloud zuul网关上传大文件
查看>>
springboot+mybatis日志显示SQL
查看>>
工作流中文乱码问题解决
查看>>
maven打包本地依赖包
查看>>
spring boot jpa 实现拦截器
查看>>
jenkins + maven+ gitlab 自动化部署
查看>>
Pull Request流程
查看>>
Lambda 表达式
查看>>
函数式数据处理(一)--流
查看>>
java 流使用
查看>>
java 用流收集数据
查看>>
java并行流
查看>>
CompletableFuture 组合式异步编程
查看>>
mysql查询某一个字段是否包含中文字符
查看>>
Java中equals和==的区别
查看>>
JVM内存管理及GC机制
查看>>
Java:按值传递还是按引用传递详细解说
查看>>
Java中Synchronized的用法
查看>>