为什么共现矩阵* 评分矩阵=推荐结果
矩阵式决策: 利用矩阵图评估项目风险与收益 #生活技巧# #团队建设技巧# #团队决策方法#
举例说明用户3是否对商品102 :感兴趣
用户3对所有商品的评分,有评分表明用户3喜欢该商品
用户身份
项目ID1
用户评分
3
101
2
3
102
0
3
103
0
3
104
4
3
105
4.5
3
106
0
3
107
5
共现矩阵其实就是item之间的相关程度,它是由所有用户对所有item的评分决定的
例如第一篇文章显示有3个用户同时喜欢101和102
项目ID1
项目ID2
与商品ID1 和商品ID2 关联的重量
101
102
3
102
102
4
103
102
4
104
102
2
105
102
2
106
102
1
107
102
1
计算推荐分数的含义是:例如,在第5行中,用户3非常喜欢商品105(评分为4.5),其他用户也知道他们对商品105有多喜欢商品102(分数为2)。乘法意味着用户3,因为
喜欢项目105 推导出也喜欢项目102 的分数。
用户身份
项目ID1
用户评分
项目ID2
与商品ID1 和商品ID2 关联的重量
分数=用户评分* 与项目ID1 和项目ID2 关联的权重
3
101
2
102
3
2*3=6
3
102
0
102
4
0
3
103
0
102
4
0
3
104
4
102
2
4*2=8
3
105
4.5
102
2
4.5*2=9
3
106
0
102
1
0
3
107
5
102
1
5*1=5
用户3对商品102的兴趣总分
用户身份
项目ID2
总得分
3
102
28
这篇文章最后由pig2编辑于2014年6月17日19:43
提问指南:
1、推荐系统概述;
2、推荐系统指标设计;
3. Hadoop并行算法;
4、推荐系统架构;
5.MapReduce程序实现。
前言
Netflix 电影推荐的百万美元竞赛已经将“推荐”变成了时下最流行的数据挖掘算法之一。也正是因为Netflix的竞争,商业界和学术界之间发生了更深层次的技术碰撞。引发了各网站的“推荐”热潮,个性时代已经到来。
1. 推荐系统概述
电子商务网站是个性化推荐系统的重要应用领域之一。亚马逊是个性化推荐系统的活跃用户和推动者。亚马逊的推荐系统深入到网站上的各种商品,为亚马逊带来至少30%的销售额。
不仅是电子商务,推荐系统也无处不在。 QQ、人人网好友推荐;您可能感兴趣的新浪微博人物;优酷、土豆的电影推荐;豆瓣的图书推荐;大丛大众点评餐厅推荐;世纪佳缘相亲推荐;天际网职业推荐敬请期待。
推荐算法分类:
按数据使用情况:
协同过滤算法:UserCF、ItemCF、ModelCF
基于内容的推荐: 用户内容属性和项目内容属性
社交过滤:基于用户的社交网络关系
按型号:
最近邻模型: 基于距离的协同过滤算法
Latent Factor Mode (SVD):基于矩阵分解的模型
Graph:图模型、社交网络图模型
基于用户的协同过滤算法UserCF
基于用户的协同过滤通过不同用户对项目的评分来评估用户之间的相似度,并根据用户之间的相似度进行推荐。简单来说就是:推荐与该用户有相似兴趣的其他用户喜欢的物品。
用例描述:
算法实现和使用介绍请参考文章:Mahout推荐算法API详解
基于项目的协同过滤算法ItemCF
基于项目的协同过滤通过用户对不同项目的评分来评估项目之间的相似度,并根据项目之间的相似度进行推荐。简单来说就是:向用户推荐与他之前喜欢的物品相似的物品。
用例描述:
算法实现和使用介绍请参考文章:Mahout推荐算法API详解
注:基于项目的协同过滤算法是目前商用最广泛的推荐算法。
协同过滤算法实现,分2步
1. 计算物品之间的相似度2. 根据物品的相似度和历史用户行为为用户生成推荐列表
2.需求分析:推荐系统指标设计
下面我们将从一个公司案例出发,全面讲解如何设计推荐系统指标。
案例介绍
Netflix 电影推荐百万大奖赛,http://www.netflixprize.com/Netflix 官方网站:www.netflix.com
Netflix 成立于2006 年,是一家以在线电影租赁为生的公司。他们根据网友的评分判断用户可能喜欢什么电影,并根据会员看过的电影和口味偏好做出判断,混合搭配各种电影风格的需求。
在收集了会员的一些信息并为他们指定了个性化的电影推荐后,许多冷门电影进入了等待名单。考虑到公司电影资源的成本,热门电影的成本普遍较高。如果Netflix能够提高电影租赁中冷门电影的比例,自然会提高自身的盈利能力。
Netflix曾声称,约60%的会员根据推荐列表定制租赁订单。如果推荐系统无法准确猜测会员喜欢的电影类型,很容易造成多次租借冷门电影且不符合个人口味的会员流失。为了更高效地向会员推荐电影,Netflix一直致力于不断改进和完善个性化推荐服务。 2006年,它设立了百万美元奖项。谁能最好地优化Netflix 推荐算法,将获得100 万美元的奖励。到了2009 年,该赏金被一个七人开发团队赢得,Netflix 立即推出了第二笔百万美元的赏金。这充分说明了一个好的推荐算法系统是多么的重要和困难。
上图为各队在比赛中的排名!
补充说明:
1. Netflix的比赛是基于静态数据的,即给定“训练水平”并匹配“结果集”,“结果集”也是提前准备好的,所以这实际上和我们操作的系统不同天。
2. Netflix用于比赛的数据集较小,整个数据集只有666MB,而实际的推荐系统是基于大量的历史数据,会上传到GB、TB等。
Netflix 数据下载
部分训练集:http://graphlab.org/wp-content/uploads/2013/07/smallnetflix_mm.train_.gz 部分结果集:http://graphlab.org/wp-content/uploads/2013/07/smallnetflix_mm.validate.gz 完整数据集:http://www .lifecrunch.biz/wp-content/uploads/2011/04/nf_prize_dataset.tar.gz
因此,我们在真实环境中设计推荐时,必须充分考虑数据量、算法性能、结果准确性等指标。
推荐算法选择:基于项目的协同过滤算法ItemCF,并行实现
数据量:基于Hadoop架构,支持GB、TB、PB级数据量
算法检验:可以通过准确率、召回率、覆盖率、流行度等指标来判断。
结果解释:通过ItemCF的定义,对结果给出合理的解释
3.算法模型:Hadoop并行算法
这里我使用《Mahout In Action》一书第1 章第6 节中介绍的分步基于项目的协同过滤算法。第6:章分配推荐计算
测试数据集:small.csv
1,101,5.0 1,102,3.0 1,103,2.5 2,101,2.0 2,102,2.5 2,103,5.0 2,104,2.0 3,101,2.0 3,104,4.0 3,105,4.5 3,107,5.0 4,101,5.0 4 ,103,3.0 4,104,4.5 4,106,4.0 5,101,4.0 5,102, 3.0 5,103,2.0 5,104,4.0 5,105,3.5 5,106,4.0
复制代码
每行有3个字段,依次是用户ID、电影ID、用户对电影的评分(0-5分,每0.5就是一个评分分!)
算法思路:
1.建立项目的共现矩阵
2. 建立物品的用户评分矩阵
3. 矩阵计算推荐结果
1)。建立项目的共现矩阵
按用户分组,查找每个用户选择的项目,单独计数和成对计数。
[101] [102] [103] [104] [105] [106] [107] [101] 5 3 4 4 2 2 1 [102] 3 3 3 2 1 1 0 [103] 4 3 4 3 1 2 0 [104] 4 2 3 4 2 2 1 [105] 2 1 1 2 2 1 1 [106] 2 1 2 2 1 2 0 [107] 1 0 0 1 1 0 1
复制代码
2)。建立用户对物品的评分矩阵
按用户分组,查找每个用户选择的项目和评分
U3 [101] 2.0 [102] 0.0 [103] 0.0 [104] 4.0 [105] 4.5 [106] 0.0 [107] 5.0
复制代码
3)。矩阵计算推荐结果
共现矩阵*评分矩阵=推荐结果
图片来自“象夫在行动”
MapReduce任务设计
图片来自“象夫在行动”
MapRduce任务解读:
步骤1: 按用户分组,计算所有item的组合列表,得到用户对item的评分矩阵
步骤2: 统计项目组合列表并建立项目共现矩阵
步骤3:合并共现矩阵和评分矩阵
步骤4:推荐结果列表的计算
4.架构设计:推荐系统架构
上图中,左边是应用业务系统,右边是Hadoop的HDFS和MapReduce。
业务系统记录用户的行为以及项目的评分。设置系统定时器CRON,每隔xx小时增量导入数据(userid、itemid、value、time)到HDFS。导入完成后,设置系统定时器,启动MapReduce程序,运行推荐算法。计算完成后,设置系统定时器,将推荐结果数据从HDFS导出到数据库,方便以后及时查询。
5. 程序开发:MapReduce程序实现
Win7开发环境和Hadoop运行环境请参考文章:使用Maven构建Hadoop项目
创建一个新的Java 类:
Recommend.java,主任务启动器
Step1.java,按用户分组,计算所有item的组合列表,得到用户对该item的评分矩阵
Step2.java,统计item组合列表,建立item的共现矩阵
Step3.java,合并共现矩阵和评分矩阵
Step4.java,计算推荐结果列表
hdfsDAO.java,HDFS操作工具类
1)。 Recommend.java,主任务启动器
源代码:
1. 包org.conan.myhadoop.recommend;
2. 3. 导入java.util.HashMap;
4. 导入java.util.Map;
5. 导入java.util.regex.Pattern;
6. 7. 导入org.apache.hadoop.mapred.JobConf;
8. 9. 公开课推荐{
10. 11. public static final String HDFS='hdfs://192.168.1.210:9000';
12. public static final Pattern DELIMITER=Pattern.compile('[\t,]');
13. 14. public static void main(String[] args) throws Exception {
15.MapString, String 路径=new HashMapString, String();
16.path.put('data', 'logfile/small.csv');
17.path.put('Step1Input', HDFS + '/user/hdfs/recommend');
18.path.put('Step1Output', path.get('Step1Input') + '/step1');
19. 路径。 put('Step2Input', path.get('Step1Output'));
20. 路径。 put('Step2Output', path.get('Step1Input') + '/step2');
21.路径。 put('Step3Input1', path.get('Step1Output'));
22.path.put('Step3Output1', path.get('Step1Input') + '/step3_1');
23.路径。 put('Step3Input2', path.get('Step2Output'));
24.path.put('Step3Output2', path.get('Step1Input') + '/step3_2');
25. 路径。 put('Step4Input1', path.get('Step3Output1'));
26. 路径。 put('Step4Input2', path.get('Step3Output2'));
27. 路径。 put('Step4Output', path.get('Step1Input') + '/step4');
28. 29. 步骤1。运行(路径);
30. 步骤2。运行(路径);
31. 步骤3。运行1(路径);
32. 步骤3。运行2(路径);
33. 步骤4。运行(路径);
34. 系统。退出(0);
35.}
36. 37. public static JobConf config() {
38. JobConf conf=new JobConf(Recommend.class);
39. 会议。 setJobName('推荐');
40.conf.addResource('classpath:/hadoop/core-site.xml');
41.conf.addResource('classpath:/hadoop/hdfs-site.xml');
42.conf.addResource('classpath:/hadoop/mapred-site.xml');
43.返回conf;
44.}
45. 46. } 复制代码
1. /blockquote/div/font/divdivalign='left'style='font-size: 13px;'字体颜色='#4d4d4f'b2)。 Step1.java,按用户分组,计算所有item的组合列表,得到用户对items/b/font/div的评分矩阵
2、divalign='left'style='font-size: 13px;'font color='#4d4d4f'源代码:/font/divdivalign='left'style='font-size: 13px;'span style='color: RGB(77, 77, 79); line-height: 1.5;'div blockquotepackage org.conan.myhadoop.recommend;
3. 4. 导入java.io.IOException;
5. 导入java.util.Iterator;
6. 导入java.util.Map;
7. 8. 导入org.apache.hadoop.fs.Path;
9. 导入组织。阿帕奇。
hadoop.io.IntWritable; 10. import org.apache.hadoop.io.Text; 11. import org.apache.hadoop.mapred.FileInputFormat; 12. import org.apache.hadoop.mapred.FileOutputFormat; 13. import org.apache.hadoop.mapred.JobClient; 14. import org.apache.hadoop.mapred.JobConf; 15. import org.apache.hadoop.mapred.MapReduceBase; 16. import org.apache.hadoop.mapred.Mapper; 17. import org.apache.hadoop.mapred.OutputCollector; 18. import org.apache.hadoop.mapred.Reducer; 19. import org.apache.hadoop.mapred.Reporter; 20. import org.apache.hadoop.mapred.RunningJob; 21. import org.apache.hadoop.mapred.TextInputFormat; 22. import org.apache.hadoop.mapred.TextOutputFormat; 23. import org.conan.myhadoop.hdfs.HdfsDAO; 24. 25. public class Step1 { 26. 27. public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, IntWritable, Text> { 28. private final static IntWritable k = new IntWritable(); 29. private final static Text v = new Text(); 30. 31. @Override 32. public void map(Object key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException { 33. String[] tokens = Recommend.DELIMITER.split(value.toString()); 34. int userID = Integer.parseInt(tokens[0]); 35. String itemID = tokens[1]; 36. String pref = tokens[2]; 37. k.set(userID); 38. v.set(itemID + ":" + pref); 39. output.collect(k, v); 40. } 41. } 42. 43. public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> { 44. private final static Text v = new Text(); 45. 46. @Override 47. public void reduce(IntWritable key, Iterator values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException { 48. StringBuilder sb = new StringBuilder(); 49. while (values.hasNext()) { 50. sb.append("," + values.next()); 51. } 52. v.set(sb.toString().replaceFirst(",", "")); 53. output.collect(key, v); 54. } 55. } 56. 57. public static void run(Map<String, String> path) throws IOException { 58. JobConf conf = Recommend.config(); 59. 60. String input = path.get("Step1Input"); 61. String output = path.get("Step1Output"); 62. 63. HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); 64. hdfs.rmr(input); 65. hdfs.mkdirs(input); 66. hdfs.copyFile(path.get("data"), input); 67. 68. conf.setMapOutputKeyClass(IntWritable.class); 69. conf.setMapOutputValueClass(Text.class); 70. 71. conf.setOutputKeyClass(IntWritable.class); 72. conf.setOutputValueClass(Text.class); 73. 74. conf.setMapperClass(Step1_ToItemPreMapper.class); 75. conf.setCombinerClass(Step1_ToUserVectorReducer.class); 76. conf.setReducerClass(Step1_ToUserVectorReducer.class); 77. 78. conf.setInputFormat(TextInputFormat.class); 79. conf.setOutputFormat(TextOutputFormat.class); 80. 81. FileInputFormat.setInputPaths(conf, new Path(input)); 82. FileOutputFormat.setOutputPath(conf, new Path(output)); 83. 84. RunningJob job = JobClient.runJob(conf); 85. while (!job.isComplete()) { 86. job.waitForCompletion(); 87. } 88. } 89. 90. } 91. 复制代码 计算结果: 1. ~ hadoop fs -cat /user/hdfs/recommend/step1/part-00000 2. 3. 1 102:3.0,103:2.5,101:5.0 4. 2 101:2.0,102:2.5,103:5.0,104:2.0 5. 3 107:5.0,101:2.0,104:4.0,105:4.5 6. 4 101:5.0,103:3.0,104:4.5,106:4.0 7. 5 101:4.0,102:3.0,103:2.0,104:4.0,105:3.5,106:4.0 复制代码 3). Step2.java,对物品组合列表进行计数,建立物品的同现矩阵 源代码: 1. package org.conan.myhadoop.recommend; 2. 3. import java.io.IOException; 4. import java.util.Iterator; 5. import java.util.Map; 6. 7. import org.apache.hadoop.fs.Path; 8. import org.apache.hadoop.io.IntWritable; 9. import org.apache.hadoop.io.LongWritable; 10. import org.apache.hadoop.io.Text; 11. import org.apache.hadoop.mapred.FileInputFormat; 12. import org.apache.hadoop.mapred.FileOutputFormat; 13. import org.apache.hadoop.mapred.JobClient; 14. import org.apache.hadoop.mapred.JobConf; 15. import org.apache.hadoop.mapred.MapReduceBase; 16. import org.apache.hadoop.mapred.Mapper; 17. import org.apache.hadoop.mapred.OutputCollector; 18. import org.apache.hadoop.mapred.Reducer; 19. import org.apache.hadoop.mapred.Reporter; 20. import org.apache.hadoop.mapred.RunningJob; 21. import org.apache.hadoop.mapred.TextInputFormat; 22. import org.apache.hadoop.mapred.TextOutputFormat; 23. import org.conan.myhadoop.hdfs.HdfsDAO; 24. 25. public class Step2 { 26. public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { 27. private final static Text k = new Text(); 28. private final static IntWritable v = new IntWritable(1); 29. 30. @Override 31. public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 32. String[] tokens = Recommend.DELIMITER.split(values.toString()); 33. for (int i = 1; i < tokens.length; i++) { 34. String itemID = tokens[i].split(":")[0]; 35. for (int j = 1; j < tokens.length; j++) { 36. String itemID2 = tokens[j].split(":")[0]; 37. k.set(itemID + ":" + itemID2); 38. output.collect(k, v); 39. } 40. } 41. } 42. } 43. 44. public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { 45. private IntWritable result = new IntWritable(); 46. 47. @Override 48. public void reduce(Text key, Iterator values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 49. int sum = 0; 50. while (values.hasNext()) { 51. sum += values.next().get(); 52. } 53. result.set(sum); 54. output.collect(key, result); 55. } 56. } 57. 58. public static void run(Map<String, String> path) throws IOException { 59. JobConf conf = Recommend.config(); 60. 61. String input = path.get("Step2Input"); 62. String output = path.get("Step2Output"); 63. 64. HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); 65. hdfs.rmr(output); 66. 67. conf.setOutputKeyClass(Text.class); 68. conf.setOutputValueClass(IntWritable.class); 69. 70. conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class); 71. conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class); 72. conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class); 73. 74. conf.setInputFormat(TextInputFormat.class); 75. conf.setOutputFormat(TextOutputFormat.class); 76. 77. FileInputFormat.setInputPaths(conf, new Path(input)); 78. FileOutputFormat.setOutputPath(conf, new Path(output)); 79. 80. RunningJob job = JobClient.runJob(conf); 81. while (!job.isComplete()) { 82. job.waitForCompletion(); 83. } 84. } 85. } 复制代码 计算结果: 1. ~ hadoop fs -cat /user/hdfs/recommend/step2/part-00000 2. 3. 101:101 5 4. 101:102 3 5. 101:103 4 6. 101:104 4 7. 101:105 2 8. 101:106 2 9. 101:107 1 10. 102:101 3 11. 102:102 3 12. 102:103 3 13. 102:104 2 14. 102:105 1 15. 102:106 1 16. 103:101 4 17. 103:102 3 18. 103:103 4 19. 103:104 3 20. 103:105 1 21. 103:106 2 22. 104:101 4 23. 104:102 2 24. 104:103 3 25. 104:104 4 26. 104:105 2 27. 104:106 2 28. 104:107 1 29. 105:101 2 30. 105:102 1 31. 105:103 1 32. 105:104 2 33. 105:105 2 34. 105:106 1 35. 105:107 1 36. 106:101 2 37. 106:102 1 38. 106:103 2 39. 106:104 2 40. 106:105 1 41. 106:106 2 42. 107:101 1 43. 107:104 1 44. 107:105 1 45. 107:107 1 复制代码 4). Step3.java,合并同现矩阵和评分矩阵 源代码: 1. package org.conan.myhadoop.recommend; 2. 3. import java.io.IOException; 4. import java.util.Map; 5. 6. import org.apache.hadoop.fs.Path; 7. import org.apache.hadoop.io.IntWritable; 8. import org.apache.hadoop.io.LongWritable; 9. import org.apache.hadoop.io.Text; 10. import org.apache.hadoop.mapred.FileInputFormat; 11. import org.apache.hadoop.mapred.FileOutputFormat; 12. import org.apache.hadoop.mapred.JobClient; 13. import org.apache.hadoop.mapred.JobConf; 14. import org.apache.hadoop.mapred.MapReduceBase; 15. import org.apache.hadoop.mapred.Mapper; 16. import org.apache.hadoop.mapred.OutputCollector; 17. import org.apache.hadoop.mapred.Reporter; 18. import org.apache.hadoop.mapred.RunningJob; 19. import org.apache.hadoop.mapred.TextInputFormat; 20. import org.apache.hadoop.mapred.TextOutputFormat; 21. import org.conan.myhadoop.hdfs.HdfsDAO; 22. 23. public class Step3 { 24. 25. public static class Step31_UserVectorSplitterMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> { 26. private final static IntWritable k = new IntWritable(); 27. private final static Text v = new Text(); 28. 29. @Override 30. public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException { 31. String[] tokens = Recommend.DELIMITER.split(values.toString()); 32. for (int i = 1; i < tokens.length; i++) { 33. String[] vector = tokens[i].split(":"); 34. int itemID = Integer.parseInt(vector[0]); 35. String pref = vector[1]; 36. 37. k.set(itemID); 38. v.set(tokens[0] + ":" + pref); 39. output.collect(k, v); 40. } 41. } 42. } 43. 44. public static void run1(Map<String, String> path) throws IOException { 45. JobConf conf = Recommend.config(); 46. 47. String input = path.get("Step3Input1"); 48. String output = path.get("Step3Output1"); 49. 50. HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); 51. hdfs.rmr(output); 52. 53. conf.setOutputKeyClass(IntWritable.class); 54. conf.setOutputValueClass(Text.class); 55. 56. conf.setMapperClass(Step31_UserVectorSplitterMapper.class); 57. 58. conf.setInputFormat(TextInputFormat.class); 59. conf.setOutputFormat(TextOutputFormat.class); 60. 61. FileInputFormat.setInputPaths(conf, new Path(input)); 62. FileOutputFormat.setOutputPath(conf, new Path(output)); 63. 64. RunningJob job = JobClient.runJob(conf); 65. while (!job.isComplete()) { 66. job.waitForCompletion(); 67. } 68. } 69. 70. public static class Step32_CooccurrenceColumnWrapperMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { 71. private final static Text k = new Text(); 72. private final static IntWritable v = new IntWritable(); 73. 74. @Override 75. public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 76. String[] tokens = Recommend.DELIMITER.split(values.toString()); 77. k.set(tokens[0]); 78. v.set(Integer.parseInt(tokens[1])); 79. output.collect(k, v); 80. } 81. } 82. 83. public static void run2(Map<String, String> path) throws IOException { 84. JobConf conf = Recommend.config(); 85. 86. String input = path.get("Step3Input2"); 87. String output = path.get("Step3Output2"); 88. 89. HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); 90. hdfs.rmr(output); 91. 92. conf.setOutputKeyClass(Text.class); 93. conf.setOutputValueClass(IntWritable.class); 94. 95. conf.setMapperClass(Step32_CooccurrenceColumnWrapperMapper.class); 96. 97. conf.setInputFormat(TextInputFormat.class); 98. conf.setOutputFormat(TextOutputFormat.class); 99. 100. FileInputFormat.setInputPaths(conf, new Path(input)); 101. FileOutputFormat.setOutputPath(conf, new Path(output)); 102. 103. RunningJob job = JobClient.runJob(conf); 104. while (!job.isComplete()) { 105. job.waitForCompletion(); 106. } 107. } 108. 109. } 复制代码 计算结果: 1. ~ hadoop fs -cat /user/hdfs/recommend/step3_1/part-00000 2. 3. 101 5:4.0 4. 101 1:5.0 5. 101 2:2.0 6. 101 3:2.0 7. 101 4:5.0 8. 102 1:3.0 9. 102 5:3.0 10. 102 2:2.5 11. 103 2:5.0 12. 103 5:2.0 13. 103 1:2.5 14. 103 4:3.0 15. 104 2:2.0 16. 104 5:4.0 17. 104 3:4.0 18. 104 4:4.5 19. 105 3:4.5 20. 105 5:3.5 21. 106 5:4.0 22. 106 4:4.0 23. 107 3:5.0 24. 25. ~ hadoop fs -cat /user/hdfs/recommend/step3_2/part-00000 26. 27. 101:101 5 28. 101:102 3 29. 101:103 4 30. 101:104 4 31. 101:105 2 32. 101:106 2 33. 101:107 1 34. 102:101 3 35. 102:102 3 36. 102:103 3 37. 102:104 2 38. 102:105 1 39. 102:106 1 40. 103:101 4 41. 103:102 3 42. 103:103 4 43. 103:104 3 44. 103:105 1 45. 103:106 2 46. 104:101 4 47. 104:102 2 48. 104:103 3 49. 104:104 4 50. 104:105 2 51. 104:106 2 52. 104:107 1 53. 105:101 2 54. 105:102 1 55. 105:103 1 56. 105:104 2 57. 105:105 2 58. 105:106 1 59. 105:107 1 60. 106:101 2 61. 106:102 1 62. 106:103 2 63. 106:104 2 64. 106:105 1 65. 106:106 2 66. 107:101 1 67. 107:104 1 68. 107:105 1 69. 107:107 1 复制代码 5). Step4.java,计算推荐结果列表 源代码: 1. package org.conan.myhadoop.recommend; 2. 3. import java.io.IOException; 4. import java.util.ArrayList; 5. import java.util.HashMap; 6. import java.util.Iterator; 7. import java.util.List; 8. import java.util.Map; 9. 10. import org.apache.hadoop.fs.Path; 11. import org.apache.hadoop.io.IntWritable; 12. import org.apache.hadoop.io.LongWritable; 13. import org.apache.hadoop.io.Text; 14. import org.apache.hadoop.mapred.FileInputFormat; 15. import org.apache.hadoop.mapred.FileOutputFormat; 16. import org.apache.hadoop.mapred.JobClient; 17. import org.apache.hadoop.mapred.JobConf; 18. import org.apache.hadoop.mapred.MapReduceBase; 19. import org.apache.hadoop.mapred.Mapper; 20. import org.apache.hadoop.mapred.OutputCollector; 21. import org.apache.hadoop.mapred.Reducer; 22. import org.apache.hadoop.mapred.Reporter; 23. import org.apache.hadoop.mapred.RunningJob; 24. import org.apache.hadoop.mapred.TextInputFormat; 25. import org.apache.hadoop.mapred.TextOutputFormat; 26. import org.conan.myhadoop.hdfs.HdfsDAO; 27. 28. public class Step4 { 29. 30. public static class Step4_PartialMultiplyMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> { 31. private final static IntWritable k = new IntWritable(); 32. private final static Text v = new Text(); 33. 34. private final static Map<Integer, List> cooccurrenceMatrix = new HashMap<Integer, List>(); 35. 36. @Override 37. public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException { 38. String[] tokens = Recommend.DELIMITER.split(values.toString()); 39. 40. String[] v1 = tokens[0].split(":"); 41. String[] v2 = tokens[1].split(":"); 42. 43. if (v1.length > 1) {// cooccurrence 44. int itemID1 = Integer.parseInt(v1[0]); 45. int itemID2 = Integer.parseInt(v1[1]); 46. int num = Integer.parseInt(tokens[1]); 47. 48. List list = null; 49. if (!cooccurrenceMatrix.containsKey(itemID1)) { 50. list = new ArrayList(); 51. } else { 52. list = cooccurrenceMatrix.get(itemID1); 53. } 54. list.add(new Cooccurrence(itemID1, itemID2, num)); 55. cooccurrenceMatrix.put(itemID1, list); 56. } 57. 58. if (v2.length > 1) {// userVector 59. int itemID = Integer.parseInt(tokens[0]); 60. int userID = Integer.parseInt(v2[0]); 61. double pref = Double.parseDouble(v2[1]); 62. k.set(userID); 63. for (Cooccurrence co : cooccurrenceMatrix.get(itemID)) { 64. v.set(co.getItemID2() + "," + pref * co.getNum()); 65. output.collect(k, v); 66. } 67. 68. } 69. } 70. } 71. 72. public static class Step4_AggregateAndRecommendReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> { 73. private final static Text v = new Text(); 74. 75. @Override 76. public void reduce(IntWritable key, Iterator values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException { 77. Map<String, Double> result = new HashMap<String, Double>(); 78. while (values.hasNext()) { 79. String[] str = values.next().toString().split(","); 80. if (result.containsKey(str[0])) { 81. result.put(str[0], result.get(str[0]) + Double.parseDouble(str[1])); 82. } else { 83. result.put(str[0], Double.parseDouble(str[1])); 84. } 85. } 86. Iterator iter = result.keySet().iterator(); 87. while (iter.hasNext()) { 88. String itemID = iter.next(); 89. double score = result.get(itemID); 90. v.set(itemID + "," + score); 91. output.collect(key, v); 92. } 93. } 94. } 95. 96. public static void run(Map<String, String> path) throws IOException { 97. JobConf conf = Recommend.config(); 98. 99. String input1 = path.get("Step4Input1"); 100. String input2 = path.get("Step4Input2"); 101. String output = path.get("Step4Output"); 102. 103. HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf); 104. hdfs.rmr(output); 105. 106. conf.setOutputKeyClass(IntWritable.class); 107. conf.setOutputValueClass(Text.class); 108. 109. conf.setMapperClass(Step4_PartialMultiplyMapper.class); 110. conf.setCombinerClass(Step4_AggregateAndRecommendReducer.class); 111. conf.setReducerClass(Step4_AggregateAndRecommendReducer.class); 112. 113. conf.setInputFormat(TextInputFormat.class); 114. conf.setOutputFormat(TextOutputFormat.class); 115. 116. FileInputFormat.setInputPaths(conf, new Path(input1), new Path(input2)); 117. FileOutputFormat.setOutputPath(conf, new Path(output)); 118. 119. RunningJob job = JobClient.runJob(conf); 120. while (!job.isComplete()) { 121. job.waitForCompletion(); 122. } 123. } 124. 125. } 126. 127. class Cooccurrence { 128. private int itemID1; 129. private int itemID2; 130. private int num; 131. 132. public Cooccurrence(int itemID1, int itemID2, int num) { 133. super(); 134. this.itemID1 = itemID1; 135. this.itemID2 = itemID2; 136. this.num = num; 137. } 138. 139. public int getItemID1() { 140. return itemID1; 141. } 142. 143. public void setItemID1(int itemID1) { 144. this.itemID1 = itemID1; 145. } 146. 147. public int getItemID2() { 148. return itemID2; 149. } 150. 151. public void setItemID2(int itemID2) { 152. this.itemID2 = itemID2; 153. } 154. 155. public int getNum() { 156. return num; 157. } 158. 159. public void setNum(int num) { 160. this.num = num; 161. } 162. 163. } 164. 复制代码 计算结果: 1. ~ hadoop fs -cat /user/hdfs/recommend/step4/part-00000 2. 3. 1 107,5.0 4. 1 106,18.0 5. 1 105,15.5 6. 1 104,33.5 7. 1 103,39.0 8. 1 102,31.5 9. 1 101,44.0 10. 2 107,4.0 11. 2 106,20.5 12. 2 105,15.5 13. 2 104,36.0 14. 2 103,41.5 15. 2 102,32.5 16. 2 101,45.5 17. 3 107,15.5 18. 3 106,16.5 19. 3 105,26.0 20. 3 104,38.0 21. 3 103,24.5 22. 3 102,18.5 23. 3 101,40.0 24. 4 107,9.5 25. 4 106,33.0 26. 4 105,26.0 27. 4 104,55.0 28. 4 103,53.5 29. 4 102,37.0 30. 4 101,63.0 31. 5 107,11.5 32. 5 106,34.5 33. 5 105,32.0 34. 5 104,59.0 35. 5 103,56.5 36. 5 102,42.5 37. 5 101,68.0 复制代码 对Step4过程优化,请参考本文最后的补充内容。 6). HdfsDAO.java,HDFS操作工具类 详细解释,请参考文章:Hadoop编程调用HDFS 源代码: 1. package org.conan.myhadoop.hdfs; 2. 3. import java.io.IOException; 4. import java.net.URI; 5. 6. import org.apache.hadoop.conf.Configuration; 7. import org.apache.hadoop.fs.FSDataInputStream; 8. import org.apache.hadoop.fs.FSDataOutputStream; 9. import org.apache.hadoop.fs.FileStatus; 10. import org.apache.hadoop.fs.FileSystem; 11. import org.apache.hadoop.fs.Path; 12. import org.apache.hadoop.io.IOUtils; 13. import org.apache.hadoop.mapred.JobConf; 14. 15. public class HdfsDAO { 16. 17. private static final String HDFS = "hdfs://192.168.1.210:9000/"; 18. 19. public HdfsDAO(Configuration conf) { 20. this(HDFS, conf); 21. } 22. 23. public HdfsDAO(String hdfs, Configuration conf) { 24. this.hdfsPath = hdfs; 25. this.conf = conf; 26. } 27. 28. private String hdfsPath; 29. private Configuration conf; 30. 31. public static void main(String[] args) throws IOException { 32. JobConf conf = config(); 33. HdfsDAO hdfs = new HdfsDAO(conf); 34. hdfs.copyFile("datafile/item.csv", "/tmp/new"); 35. hdfs.ls("/tmp/new"); 36. } 37. 38. public static JobConf config(){ 39. JobConf conf = new JobConf(HdfsDAO.class); 40. conf.setJobName("HdfsDAO"); 41. conf.addResource("classpath:/hadoop/core-site.xml"); 42. conf.addResource("classpath:/hadoop/hdfs-site.xml"); 43. conf.addResource("classpath:/hadoop/mapred-site.xml"); 44. return conf; 45. } 46. 47. public void mkdirs(String folder) throws IOException { 48. Path path = new Path(folder); 49. FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); 50. if (!fs.exists(path)) { 51. fs.mkdirs(path); 52. System.out.println("Create: " + folder); 53. } 54. fs.close(); 55. } 56. 57. public void rmr(String folder) throws IOException { 58. Path path = new Path(folder); 59. FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); 60. fs.deleteOnExit(path); 61. System.out.println("Delete: " + folder); 62. fs.close(); 63. } 64. 65. public void ls(String folder) throws IOException { 66. Path path = new Path(folder); 67. FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); 68. FileStatus[] list = fs.listStatus(path); 69. System.out.println("ls: " + folder); 70. System.out.println("=========================================================="); 71. for (FileStatus f : list) { 72. System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen()); 73. } 74. System.out.println("=========================================================="); 75. fs.close(); 76. } 77. 78. public void createFile(String file, String content) throws IOException { 79. FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); 80. byte[] buff = content.getBytes(); 81. FSDataOutputStream os = null; 82. try { 83. os = fs.create(new Path(file)); 84. os.write(buff, 0, buff.length); 85. System.out.println("Create: " + file); 86. } finally { 87. if (os != null) 88. os.close(); 89. } 90. fs.close(); 91. } 92. 93. public void copyFile(String local, String remote) throws IOException { 94. FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); 95. fs.copyFromLocalFile(new Path(local), new Path(remote)); 96. System.out.println("copy from: " + local + " to " + remote); 97. fs.close(); 98. } 99. 100. public void download(String remote, String local) throws IOException { 101. Path path = new Path(remote); 102. FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); 103. fs.copyToLocalFile(path, new Path(local)); 104. System.out.println("download: from" + remote + " to " + local); 105. fs.close(); 106. } 107. 108. public void cat(String remoteFile) throws IOException { 109. Path path = new Path(remoteFile); 110. FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); 111. FSDataInputStream fsdis = null; 112. System.out.println("cat: " + remoteFile); 113. try { 114. fsdis =fs.open(path); 115. IOUtils.copyBytes(fsdis, System.out, 4096, false); 116. } finally { 117. IOUtils.closeStream(fsdis); 118. fs.close(); 119. } 120. } 121. } 复制代码 这样我们就自己编程实现了MapReduce化基于物品的协同过滤算法。 RHadoop的实现方案,请参考文章:RHadoop实践系列之三 R实现MapReduce的协同过滤算法 Mahout的实现方案,请参考文章:Mahout分步式程序开发 基于物品的协同过滤ItemCF 我已经把整个MapReduce的实现都放到了github上面: https://github.com/bsspirit/maven_hadoop_template/releases/tag/recommend
网址:为什么共现矩阵* 评分矩阵=推荐结果 https://www.yuejiaxmz.com/news/view/318325
相关内容
正交矩阵; 实对称矩阵; 为什么实对称矩阵一定可以对角化; AB=0 r(A)+r(B)<=n 证明; 初等矩阵; 初等矩阵的逆矩阵; 矩阵的左除右除;若A为三阶矩阵且A=a,则A*是多少
蓝桥杯 矩阵键盘的使用
设矩阵A=是正定矩阵,则a满足( )
Matalb for 语句 操作大矩阵 优化
线性代数中,从矩阵AB=E可以推出AB=BA吗
设矩阵A与B相似,且 求可逆矩阵P,使P
矩阵A^2=A为什么特征值只能是0和1
R(A+B)<=R(A)+R(B): R(AB)<=min(R(A)+R(B)): A为m×n矩阵,r(A)=n,则AX=0只有零解。设矩阵A为m×n的秩R(A)=m;Ax=b 有解;
Python:seaborn的散点图矩阵(Pairs Plots)可视化数据