本文共 15251 字,大约阅读时间需要 50 分钟。
例子:计算一个给定文档集中单词的相对频度。目标是建立一个N*N矩阵M,其中N为所有给定文档的单词量,每个单元Mij包含一个特定上下文单词Wi与Wj共同出现的次数。为简单起见,将这个上下文定义为Wi的邻域。例如:给定以下单词:W1,W2,W3,W4,W5,W6
如果定义一个单词的邻域为这个单词的前两个单词和后两个单词,那么这6个单词的邻域如下:
单词 邻域(±2)
W1 W2,W3
W2 W1,W3,W4
W3 W1,W2,W4,W5
W4 W2,W3,W5,W6
W5 W3,W4,W6
W6 W4,W5
反转排序的MapReduce/Hadoop实现:
我们要生成两个数据序列。第一个序列是单词的总邻域计数(即该单词邻域中单词的总个数),用组合键(W,*)表示,其中W表示该单词。
第二个序列是这个单词和其他特定单词出现次数,用组合键(W,W2)表示
Hadoop的定制分区器实现
定制分区器必须确保含有相同左词(即自然键)的所有词对要发送到相同的规约器。例如,组合键{(man,tall),(man,strong),(man,moon)...}都要发送到同一个规约器。要把这些键发送到相同的规约器,必须定义一个定制分区器,它只关心左词,如例中man。
相对频度映射器
以W1,W2,W3,W4,W5,W6为例:
键 值
(W1,W2) 1
(W1,W3) 1
(W1,*) 2
(W2,W1) 1
(W2,W3) 1
(W2,W4) 1
(W2,*) 3
(W3,W1) 1
(W3,W2) 1
(W3,W4) 1
(W3,W5) 1
(W3,*) 4
(W4,W2) 1
(W4,W3) 1
(W4,W5) 1
(W4,W6) 1
(W4,*) 4
(W5,W3) 1
(W5,W4) 1
(W5,W6) 1
(W5,*) 3
(W6,W4) 1
(W6,W5) 1
(W6,*) 2
相对频度规约器
规约器的输入:
键 值
(W1,*),(W1,W2) ,(W1,W3) 2,1,1
(W2,*),(W2,W1),(W2,W3),(W2,W4) 3,1,1,1
....
代码实现:
自定义组合键
package fanzhuanpaixu;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

/**
 * Adapted from Cloud9: A Hadoop toolkit for working with big data
 * edu.umd.cloud9.io.pair.PairOfStrings
 *
 * WritableComparable representing a pair of Strings.
 * The elements in the pair are referred to as the left (word)
 * and right (neighbor) elements. The natural sort order is:
 * first by the left element, and then by the right element.
 *
 * @author Mahmoud Parsian (added few new convenient methods)
 */
public class PairOfWords implements WritableComparable<PairOfWords> {

    private String leftElement;
    private String rightElement;

    /**
     * Creates an empty pair; both elements stay {@code null} until
     * {@link #set(String, String)} or {@link #readFields(DataInput)} is called.
     */
    public PairOfWords() {
    }

    /**
     * Creates a pair.
     *
     * @param left the left element
     * @param right the right element
     */
    public PairOfWords(String left, String right) {
        set(left, right);
    }

    /**
     * Deserializes the pair: left element first, then right element.
     *
     * @param in source for raw byte representation
     * @throws IOException if reading from {@code in} fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        leftElement = Text.readString(in);
        rightElement = Text.readString(in);
    }

    /**
     * Serializes this pair: left element first, then right element.
     *
     * @param out where to write the raw byte representation
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, leftElement);
        Text.writeString(out, rightElement);
    }

    /**
     * Sets the left element.
     *
     * @param leftElement the new left element
     */
    public void setLeftElement(String leftElement) {
        this.leftElement = leftElement;
    }

    /**
     * Sets the word (alias for {@link #setLeftElement(String)}).
     *
     * @param leftElement the new left element
     */
    public void setWord(String leftElement) {
        setLeftElement(leftElement);
    }

    /**
     * Returns the left element.
     *
     * @return the left element
     */
    public String getWord() {
        return leftElement;
    }

    /**
     * Returns the left element.
     *
     * @return the left element
     */
    public String getLeftElement() {
        return leftElement;
    }

    /**
     * Sets the right element.
     *
     * @param rightElement the new right element
     */
    public void setRightElement(String rightElement) {
        this.rightElement = rightElement;
    }

    /**
     * Sets the neighbor (alias for {@link #setRightElement(String)}).
     *
     * @param rightElement the new right element
     */
    public void setNeighbor(String rightElement) {
        setRightElement(rightElement);
    }

    /**
     * Returns the right element.
     *
     * @return the right element
     */
    public String getRightElement() {
        return rightElement;
    }

    /**
     * Returns the right element.
     *
     * @return the right element
     */
    public String getNeighbor() {
        return rightElement;
    }

    /**
     * Returns the key (left element).
     *
     * @return the key
     */
    public String getKey() {
        return leftElement;
    }

    /**
     * Returns the value (right element).
     *
     * @return the value
     */
    public String getValue() {
        return rightElement;
    }

    /**
     * Sets the right and left elements of this pair.
     *
     * @param left the left element
     * @param right the right element
     */
    public void set(String left, String right) {
        leftElement = left;
        rightElement = right;
    }

    /**
     * Checks two pairs for equality.
     *
     * @param obj object for comparison
     * @return {@code true} if {@code obj} is equal to this object, {@code false} otherwise
     */
    @Override
    public boolean equals(Object obj) {
        // instanceof is false for null, so this also covers the null check.
        if (!(obj instanceof PairOfWords)) {
            return false;
        }
        PairOfWords pair = (PairOfWords) obj;
        return leftElement.equals(pair.getLeftElement())
                && rightElement.equals(pair.getRightElement());
    }

    /**
     * Defines a natural sort order for pairs. Pairs are sorted first by the left element,
     * and then by the right element.
     *
     * @param pair the pair to compare against
     * @return a value less than zero, a value greater than zero, or zero if this pair should
     *         be sorted before, sorted after, or is equal to {@code pair}
     */
    @Override
    public int compareTo(PairOfWords pair) {
        String pl = pair.getLeftElement();
        String pr = pair.getRightElement();
        if (leftElement.equals(pl)) {
            return rightElement.compareTo(pr);
        }
        return leftElement.compareTo(pl);
    }

    /**
     * Returns a hash code value for the pair.
     *
     * @return hash code for the pair
     */
    @Override
    public int hashCode() {
        return leftElement.hashCode() + rightElement.hashCode();
    }

    /**
     * Generates human-readable String representation of this pair.
     *
     * @return human-readable String representation of this pair
     */
    @Override
    public String toString() {
        return "(" + leftElement + ", " + rightElement + ")";
    }

    /**
     * Clones this object.
     *
     * @return clone of this object
     */
    @Override
    public PairOfWords clone() {
        return new PairOfWords(this.leftElement, this.rightElement);
    }

    /**
     * Comparator optimized for {@code PairOfWords}: it compares the serialized
     * bytes directly instead of deserializing both keys.
     */
    public static class Comparator extends WritableComparator {

        /**
         * Creates a new Comparator optimized for {@code PairOfWords}.
         */
        public Comparator() {
            super(PairOfWords.class);
        }

        /**
         * Optimization hook. Each serialized string is read as a variable-length
         * int (its byte length) followed by that many bytes; the left strings are
         * compared first and the right strings only when the left ones are equal.
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            try {
                int firstVIntL1 = WritableUtils.decodeVIntSize(b1[s1]);
                int firstVIntL2 = WritableUtils.decodeVIntSize(b2[s2]);
                int firstStrL1 = readVInt(b1, s1);
                int firstStrL2 = readVInt(b2, s2);
                int cmp = compareBytes(b1, s1 + firstVIntL1, firstStrL1,
                        b2, s2 + firstVIntL2, firstStrL2);
                if (cmp != 0) {
                    return cmp;
                }

                // Offsets of the second (right) string's length prefix in each buffer.
                int secondOff1 = s1 + firstVIntL1 + firstStrL1;
                int secondOff2 = s2 + firstVIntL2 + firstStrL2;
                int secondVIntL1 = WritableUtils.decodeVIntSize(b1[secondOff1]);
                int secondVIntL2 = WritableUtils.decodeVIntSize(b2[secondOff2]);
                int secondStrL1 = readVInt(b1, secondOff1);
                int secondStrL2 = readVInt(b2, secondOff2);
                return compareBytes(b1, secondOff1 + secondVIntL1, secondStrL1,
                        b2, secondOff2 + secondVIntL2, secondStrL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }
    }

    static {
        // register this comparator
        WritableComparator.define(PairOfWords.class, new Comparator());
    }
}
映射器
package fanzhuanpaixu;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * RelativeFrequencyMapper implements the map() function for Relative Frequency of words.
 *
 * For every word of an input line it emits one {@code ((word, neighbor), 1)} record per
 * neighbor inside the configured window, followed by one {@code ((word, "*"), n)} record
 * where {@code n} is the number of neighbor records just emitted for that word.
 *
 * @author Mahmoud Parsian
 */
public class RelativeFrequencyMapper
        extends Mapper<LongWritable, Text, PairOfWords, IntWritable> {

    /** Matches runs of non-word characters; compiled once instead of per token. */
    private static final Pattern NON_WORD = Pattern.compile("\\W+");

    private static final IntWritable ONE = new IntWritable(1);

    private int neighborWindow = 2;

    // pair = (leftElement, rightElement); reused across writes to avoid allocations
    private final PairOfWords pair = new PairOfWords();
    private final IntWritable totalCount = new IntWritable();

    /**
     * Reads the window size from the job configuration key {@code neighbor.window}
     * (default 2).
     *
     * @param context the task context supplying the configuration
     */
    @Override
    public void setup(Context context) {
        this.neighborWindow = context.getConfiguration().getInt("neighbor.window", 2);
    }

    /**
     * Emits the neighbor pairs and the per-word total for one input line.
     *
     * @param key     the input key (unused)
     * @param value   one line of text, tokenized on spaces
     * @param context sink for the emitted records
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] tokens = StringUtils.split(value.toString(), " ");
        if ((tokens == null) || (tokens.length < 2)) {
            return;
        }

        // Strip non-word characters once, up front, so words and neighbors are
        // cleaned identically and punctuation-only tokens become empty strings.
        for (int i = 0; i < tokens.length; i++) {
            tokens[i] = NON_WORD.matcher(tokens[i]).replaceAll("");
        }

        for (int i = 0; i < tokens.length; i++) {
            if (tokens[i].isEmpty()) {
                continue;
            }
            pair.setWord(tokens[i]);

            int start = (i - neighborWindow < 0) ? 0 : i - neighborWindow;
            int end = (i + neighborWindow >= tokens.length) ? tokens.length - 1 : i + neighborWindow;

            int emitted = 0;
            for (int j = start; j <= end; j++) {
                // Skip the word itself and tokens that were pure punctuation: an
                // empty neighbor is not a word and must not inflate the total.
                if (j == i || tokens[j].isEmpty()) {
                    continue;
                }
                pair.setNeighbor(tokens[j]);
                context.write(pair, ONE);
                emitted++;
            }

            if (emitted == 0) {
                // No real neighbors: a (word, *) record with 0 carries no information.
                continue;
            }

            // The total must equal the number of pairs actually written above so
            // that the relative frequencies of a word sum to 1.
            pair.setNeighbor("*");
            totalCount.set(emitted);
            context.write(pair, totalCount);
        }
    }
}
分区器
package fanzhuanpaixu;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Partitioner;/** * This is an plug-in class. * The OrderInversionPartitioner class indicates how to partition data * based on the word only (the natural key). * * @author Mahmoud Parsian * */public class OrderInversionPartitioner extends Partitioner{ @Override public int getPartition(PairOfWords key, IntWritable value, int numberOfPartitions) { // key = (leftWord, rightWord) = (word, neighbor) String leftWord = key.getLeftElement(); return Math.abs( ((int) hash(leftWord)) % numberOfPartitions); } /** * Return a hashCode() of a given String object. * This is adapted from String.hashCode() * * @param str a string object * */ private static long hash(String str) { long h = 1125899906842597L; // prime int length = str.length(); for (int i = 0; i < length; i++) { h = 31*h + str.charAt(i); } return h; } }
组合器
package fanzhuanpaixu;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * RelativeFrequencyCombiner implements the combine() [in Hadoop,
 * we use reduce() for implementing the combine() function] function
 * for MapReduce/Hadoop.
 *
 * @author Mahmoud Parsian
 */
public class RelativeFrequencyCombiner
        extends Reducer<PairOfWords, IntWritable, PairOfWords, IntWritable> {

    /**
     * Sums the counts received for one key and writes a single record
     * carrying that partial sum under the same key.
     *
     * @param key     the composite key (word, neighbor) or (word, "*")
     * @param values  the counts received for {@code key}
     * @param context sink for the combined record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(PairOfWords key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int partialSum = 0;
        for (IntWritable value : values) {
            partialSum += value.get();
        }
        context.write(key, new IntWritable(partialSum));
    }
}
规约器
package fanzhuanpaixu;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** * RelativeFrequencyReducer implements the reduce() function for Relative Frequency of words. * * @author Mahmoud Parsian * */public class RelativeFrequencyReducer extends Reducer驱动{ private double totalCount = 0; private final DoubleWritable relativeCount = new DoubleWritable(); private String currentWord = "NOT_DEFINED"; @Override protected void reduce(PairOfWords key, Iterable values, Context context) throws IOException, InterruptedException { if (key.getNeighbor().equals("*")) { if (key.getWord().equals(currentWord)) { totalCount += totalCount + getTotalCount(values); } else { currentWord = key.getWord(); totalCount = getTotalCount(values); } } else { int count = getTotalCount(values); relativeCount.set((double) count / totalCount); context.write(key, relativeCount); } } private int getTotalCount(Iterable values) { int sum = 0; for (IntWritable value : values) { sum += value.get(); } return sum; }}
package fanzhuanpaixu;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;//import org.apache.log4j.Logger;/** * RelativeFrequencyDriver is driver class for computing relative frequency of words. * * @author Mahmoud Parsian * */public class RelativeFrequencyDriver extends Configured implements Tool { private static final Logger THE_LOGGER = Logger.getLogger(RelativeFrequencyDriver.class); /** * Dispatches command-line arguments to the tool by the ToolRunner. */ public static void main(String[] args) throws Exception { args = new String[1]; args[0] = "2"; int status = ToolRunner.run(new RelativeFrequencyDriver(), args); System.exit(status); } @Override public int run(String[] args) throws Exception { int neighborWindow = Integer.parseInt(args[0]); Path inputPath = new Path("input/paper.txt"); Path outputPath = new Path("output/fanzhuanpaixu"); Job job = new Job(new Configuration(), "RelativeFrequencyDriver"); job.setJarByClass(RelativeFrequencyDriver.class); job.setJobName("RelativeFrequencyDriver"); // Delete the output directory if it exists already FileSystem.get(getConf()).delete(outputPath, true); job.getConfiguration().setInt("neighbor.window", neighborWindow); FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // (key,value) generated by map() job.setMapOutputKeyClass(PairOfWords.class); job.setMapOutputValueClass(IntWritable.class); // (key,value) generated by reduce() job.setOutputKeyClass(PairOfWords.class); job.setOutputValueClass(DoubleWritable.class); 
job.setMapperClass(RelativeFrequencyMapper.class); job.setReducerClass(RelativeFrequencyReducer.class); job.setCombinerClass(RelativeFrequencyCombiner.class); job.setPartitionerClass(OrderInversionPartitioner.class); job.setNumReduceTasks(3); long startTime = System.currentTimeMillis(); job.waitForCompletion(true); THE_LOGGER.info("Job Finished in milliseconds: " + (System.currentTimeMillis() - startTime)); return 0; }}运行结果: