在map端使用关联数组实现wordcount

系统 1648 0

  今天看Data-Intensive Text Processing with MapReduce 这本书的第三章的时候,里面有写到在map端优化wordcount。

  对数据密集型数据进行分布式处理的时候,影响数据处理速度的非常重要的一个方面就是map的输出中间结果,在传送到reduce的过程中,很多的中间数据需要进行交换以及包括一些相应的处理,然后再交给相应的reduce。其中中间数据需要在网络中传输,另外中间数据在发送到网络上之前还要写到本地磁盘上,因为网络带宽和磁盘I/O是非常耗时的相比与其他的操作,所以减少中间数据的传输将会增加算法的执行效率,通过使用combiner函数或者其他的方式减少key-value对的个数。下面是一个改进的wordcount算法。

 基本的思想是:

  在map处理的时候定义一个关联数组,然后对文档进行处理,将<word,次数>加入到关联数组中,word存在,则将相应的次数加1,不存在则直接加入到关联数组中。所有的map任务结束后,然后再在run函数中输出处理结果。

伪代码:

class Mapper

  method Map(docid a,doc d)

            H =new AssociativeArray

     for all term t 属于doc  d  do

                     H{t}=H{t}+1;

                 for all term t 属于 H do

                EMIT(term t,count H{t})

class REDUCER

     method REDUCE(term t,counts[c1,c2,...])

                sum=0

               for  all count c 属于 counts[c1,c2,...]  do

                   sum+=c

             EMIT(term t,count sum)

代码如下:

      
        import
      
       java.io.IOException;
      
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;


public class Mapper extends
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {

int c;
HashMap<String,IntWritable> map= new HashMap<String,IntWritable>();
@Override
protected void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException {
String str=value.toString();
StringTokenizer token= new StringTokenizer(str);
while (token.hasMoreTokens()){
String value1=token.nextToken();
if (map.containsKey(value1)){
// System.out.println("ni");
int p=map.get(value1).get()+1;
map.remove(value1);
map.put(value1, new IntWritable(p));
}
else {
// System.out.println("ni");
map.put(value1, new IntWritable(1));
}
}
// TODO Auto-generated method stub

c++;
System.out.println(c);



}
@Override
protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
System.out.println("cleanup");
super .cleanup(context);
}

@Override
public void run(Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
super .run(context);
System.out.println("run");
Iterator it=map.entrySet().iterator();
while (it.hasNext()){
// System.out.println("nihe");
Map.Entry<String, IntWritable> entry=(Map.Entry<String, IntWritable>) it.next();
// System.out.println("nihe");
context.write( new Text(entry.getKey()), entry.getValue());

}

}

@Override
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
// System.out.println(context.getInputSplit().toString());
        // System.out.println(context.getJobID());
  // FileSplit input=(FileSplit)context.getInputSplit();
// String path=input.getPath().toString();
// Configuration conf= new Configuration();
  // System.out.println(input.getPath().toString());
   // FileSystem fs=FileSystem.get(URI.create(path), conf);
// FSDataInputStream filein=fs.open(input.getPath());
   //  LineReader in= new LineReader(filein,conf);
// Text line= new Text();
//   int cd=in.readLine(line);
//   System.out.println(line);
     }
 }



      
        import
      
       java.io.IOException;
      

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;


public class Reducer extends
org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {

@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
int sum=0;
for (IntWritable it:values){
sum+=it.get();
}
context.write(key, new IntWritable(sum));
}




}



      
        import
      
       java.io.IOException;
      
import java.net.URI;



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class Word {

/**
*
@param args
*
@throws IOException
*
@throws ClassNotFoundException
*
@throws InterruptedException
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// TODO Auto-generated method stub
Job job= new Job();
Configuration conf= new Configuration();

Path in= new Path(args[0]);
Path out= new Path(args[1]);

FileSystem fs=FileSystem.get(URI.create(args[1]), conf);
fs.delete(out);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(Mapper. class );
job.setMapOutputKeyClass(Text. class );
job.setMapOutputValueClass(IntWritable. class );



job.waitForCompletion( false );



}

}




在map端使用关联数组实现wordcount


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论