利用 MapReduce分析明星微博数据实战

发布时间:2024-12-10 23:30

版权声明:

本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《 阿里云开发者社区用户服务协议》和 《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写 侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

互联网时代的到来,使得名人的形象变得更加鲜活,也拉近了明星和粉丝之间的距离。歌星、影星、体育明星、作家等名人通过互联网能够轻易实现和粉丝的互动,赚钱也变得前所未有的简单。同时,互联网的飞速发展本身也造就了一批互联网明星,这些人借助新的手段,最大程度发挥了粉丝经济的能量和作用,在互联网时代赚得盆满钵满。

正是基于这样一个大背景,今天我们做一个分析明星微博数据的小项目。

1、项目需求

自定义输入格式,将明星微博数据排序后按粉丝数关注数 微博数分别输出到不同文件中。

2、数据集

明星 明星微博名称 粉丝数 关注数 微博数

俞灏明 俞灏明 10591367 206 558

李敏镐 李敏镐 22898071 11 268

林心如 林心如 57488649 214 5940

黄晓明 黄晓明 22616497 506 2011

张靓颖 张靓颖 27878708 238 3846

李娜 李娜 23309493 81 631

徐小平 徐小平 11659926 1929 13795

唐嫣 唐嫣 24301532 200 2391

有斐君 有斐君 8779383 577 4251

3、分析

自定义InputFormat读取明星微博数据,通过自定义getSortedHashtableByValue方法分别对明星的fan、followers、microblogs数据进行排序,然后利用MultipleOutputs输出不同项到不同的文件中

4、实现

1)、定义WeiBo实体类,实现WritableComparable接口

package com.buaa;    import java.io.DataInput;  import java.io.DataOutput;  import java.io.IOException;    import org.apache.hadoop.io.WritableComparable;                    public class WeiBo implements WritableComparable<Object> {            private int fan;            private int followers;            private int microblogs;            public WeiBo(){};            public WeiBo(int fan,int followers,int microblogs){          this.fan = fan;          this.followers = followers;          this.microblogs = microblogs;      }            public void set(int fan,int followers,int microblogs){          this.fan = fan;          this.followers = followers;          this.microblogs = microblogs;      }                  @Override      public void readFields(DataInput in) throws IOException {          fan  = in.readInt();          followers = in.readInt();          microblogs = in.readInt();      }                  @Override      public void write(DataOutput out) throws IOException {          out.writeInt(fan);          out.writeInt(followers);          out.writeInt(microblogs);      }            @Override      public int compareTo(Object o) {                    return 0;      }        public int getFan() {          return fan;      }        public void setFan(int fan) {          this.fan = fan;      }        public int getFollowers() {          return followers;      }        public void setFollowers(int followers) {          this.followers = followers;      }        public int getMicroblogs() {          return microblogs;      }        public void setMicroblogs(int microblogs) {          this.microblogs = microblogs;      }  } 

2)、自定义WeiboInputFormat,继承FileInputFormat抽象类

package com.buaa;    import java.io.IOException;    import org.apache.hadoop.conf.Configuration;  import org.apache.hadoop.fs.FSDataInputStream;  import org.apache.hadoop.fs.FileSystem;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.InputSplit;  import org.apache.hadoop.mapreduce.RecordReader;  import org.apache.hadoop.mapreduce.TaskAttemptContext;  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  import org.apache.hadoop.mapreduce.lib.input.FileSplit;  import org.apache.hadoop.util.LineReader;                    public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{         @Override       public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {                        return new WeiboRecordReader();       }         public class WeiboRecordReader extends RecordReader<Text, WeiBo>{              public LineReader in;                             public Text lineKey = new Text();                            public WeiBo lineValue = new WeiBo();                            @Override              public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {                                    FileSplit split = (FileSplit)input;                                    Configuration job = context.getConfiguration();                                    Path file = split.getPath();                                    FileSystem fs = file.getFileSystem(job);                                     FSDataInputStream filein = fs.open(file);                                    in = new LineReader(filein,job);               }                @Override              public boolean nextKeyValue() throws IOException, InterruptedException {                                    Text line = new Text();                                    int linesize = in.readLine(line);                                    if(linesize == 0)                       return false;                                                       String[] pieces = line.toString().split("\t");                                    if(pieces.length != 5){                        throw new IOException("Invalid record received");                    }                                     int a,b,c;                  try{                                              a = Integer.parseInt(pieces[2].trim());                                            b = Integer.parseInt(pieces[3].trim());                                            c = Integer.parseInt(pieces[4].trim());                  }catch(NumberFormatException nfe){                        throw new IOException("Error parsing floating poing value in record");                    }                                                      lineKey.set(pieces[0]);                    lineValue.set(a, b, c);                                    return true;              }                            @Override              public void close() throws IOException {                  if(in != null){                      in.close();                  }              }                @Override              public Text getCurrentKey() throws IOException, InterruptedException {                  return lineKey;              }                @Override              public WeiBo getCurrentValue() throws IOException, InterruptedException {                  return lineValue;              }                @Override              public float getProgress() throws IOException, InterruptedException {                  return 0;              }                        }  } 

3)、编写mr程序

package com.buaa;    import java.io.IOException;  import java.util.Arrays;  import java.util.Comparator;  import java.util.HashMap;  import java.util.Map;  import java.util.Map.Entry;    import org.apache.hadoop.conf.Configuration;  import org.apache.hadoop.conf.Configured;  import org.apache.hadoop.fs.FileSystem;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.IntWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.Job;  import org.apache.hadoop.mapreduce.Mapper;  import org.apache.hadoop.mapreduce.Reducer;  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;  import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  import org.apache.hadoop.util.Tool;  import org.apache.hadoop.util.ToolRunner;                    public class WeiboCount extends Configured implements Tool {            private static String TAB_SEPARATOR = "\t";            private static String FAN = "fan";            private static String FOLLOWERS = "followers";            private static String MICROBLOGS = "microblogs";            public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> {          @Override          protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException {                            context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan()));                            context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers()));                            context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs()));          }      }            public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {          private MultipleOutputs<Text, IntWritable> mos;            protected void setup(Context context) throws IOException, InterruptedException {              mos = new MultipleOutputs<Text, IntWritable>(context);          }            protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException {              Map<String,Integer> map = new HashMap< String,Integer>();                            for(Text value : Values){                                    String[] records = value.toString().split(TAB_SEPARATOR);                  map.put(records[0], Integer.parseInt(records[1].toString()));              }                                          Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(map);                            for(int i = 0; i < entries.length;i++){                  mos.write(Key.toString(),entries[i].getKey(), entries[i].getValue());              }                         }            protected void cleanup(Context context) throws IOException, InterruptedException {              mos.close();          }      }            @SuppressWarnings("deprecation")      @Override      public int run(String[] args) throws Exception {                    Configuration conf = new Configuration();                              Path mypath = new Path(args[1]);          FileSystem hdfs = mypath.getFileSystem(conf);          if (hdfs.isDirectory(mypath)) {              hdfs.delete(mypath, true);          }                              Job job = new Job(conf, "weibo");                    job.setJarByClass(WeiboCount.class);                      job.setMapperClass(WeiBoMapper.class);                    job.setMapOutputKeyClass(Text.class);                    job.setMapOutputValueClass(Text.class);                              job.setReducerClass(WeiBoReducer.class);                    job.setOutputKeyClass(Text.class);                    job.setOutputValueClass(IntWritable.class);                              FileInputFormat.addInputPath(job, new Path(args[0]));                    FileOutputFormat.setOutputPath(job, new Path(args[1]));                              job.setInputFormatClass(WeiboInputFormat.class) ;                    MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class);          MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class);          MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class);                              LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);                                return job.waitForCompletion(true)?0:1;      }                  @SuppressWarnings("unchecked")      public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {            Entry<String, Integer>[] entries = (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);                      Arrays.sort(entries, new Comparator<Entry<String, Integer>>() {              public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {                  return entry2.getValue().compareTo(entry1.getValue());              }           });          return entries;        }            public static void main(String[] args) throws Exception {          String[] args0 = {                  "hdfs://ljc:9000/buaa/microblog/weibo.txt",                  "hdfs://ljc:9000/buaa/microblog/out/"           };          int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0);          System.exit(ec);      }  } 

5、运行结果

本文作者:刘超ljc

来源:51CTO

网址:利用 MapReduce分析明星微博数据实战 http://c.mxgxt.com/news/view/136221

相关内容

微博怎么查看明星数据分析 如何查看微博明星的流量热度
微博粉丝趋势分析 分析微博粉丝
如何高效进行微博舆情分析?揭秘行业内的实战技巧与工具
明星数据分析怎么查
如何用数据中台,做一套关于明星的舆情分析系统?
粉丝数据分析
微博明星新社交商业模式中的数据能力提效
数据分析模型
虚拟服装大数据分析
新浪微博娱乐明星社会网络探析.doc

随便看看