• Archive by category "架构设计"
  • (Page 3)

Blog Archives

Mahout分步式程序开发 基于物品的协同过滤ItemCF

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mahout-mapreduce-itemcf/

mahout-hadoop-itemcf

前言

Mahout是Hadoop家族一员,从血缘就继承了Hadoop程序的特点,支持HDFS访问和MapReduce分步式算法。随着Mahout的发展,从0.7版本开始,Mahout做了重大的升级。移除了部分算法的单机内存计算,只支持基于Hadoop的MapReduce平行计算。

从这点上,我们能看出Mahout走向大数据,坚持并行化的决心!相信在Hadoop的大框架下,Mahout最终能成为一个大数据的明星产品!

目录

  1. Mahout开发环境介绍
  2. Mahout基于Hadoop的分步环境介绍
  3. 用Mahout实现协同过滤ItemCF
  4. 模板项目上传github

1. Mahout开发环境介绍

用Maven构建Mahout项目 文章中,我们已经配置好了基于Maven的Mahout的开发环境,我们将继续完成Mahout的分步式的程序开发。

本文的mahout版本为0.8。

开发环境:

  • Win7 64bit
  • Java 1.6.0_45
  • Maven 3
  • Eclipse Juno Service Release 2
  • Mahout 0.8
  • Hadoop 1.1.2

找到pom.xml,修改mahout版本为0.8

<mahout.version>0.8</mahout.version>

然后,下载依赖库。

~ mvn clean install

由于 org.conan.mymahout.cluster06.Kmeans.java 类代码,是基于mahout-0.6的,所以会报错。我们可以先注释这个文件。

2. Mahout基于Hadoop的分步环境介绍

hadoop-mahout-cluster-dev

如上图所示,我们可以选择在win7中开发,也可以在linux中开发,开发过程我们可以在本地环境进行调试,标配的工具都是Maven和Eclipse。

Mahout在运行过程中,会把MapReduce的算法程序包,自动发布的Hadoop的集群环境中,这种开发和运行模式,就和真正的生产环境差不多了。

3. 用Mahout实现协同过滤ItemCF

实现步骤:

  • 1. 准备数据文件: item.csv
  • 2. Java程序:HdfsDAO.java
  • 3. Java程序:ItemCFHadoop.java
  • 4. 运行程序
  • 5. 推荐结果解读

1). 准备数据文件: item.csv
上传测试数据到HDFS,单机内存实验请参考文章:用Maven构建Mahout项目


~ hadoop fs -mkdir /user/hdfs/userCF
~ hadoop fs -copyFromLocal /home/conan/datafiles/item.csv /user/hdfs/userCF

~ hadoop fs -cat /user/hdfs/userCF/item.csv
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

2). Java程序:HdfsDAO.java
HdfsDAO.java,是一个HDFS操作的工具,用API实现Hadoop的各种HDFS命令,请参考文章:Hadoop编程调用HDFS

我们这里会用到HdfsDAO.java类中的一些方法:


        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(inPath);
        hdfs.mkdirs(inPath);
        hdfs.copyFile(localFile, inPath);
        hdfs.ls(inPath);
        hdfs.cat(inFile);

3). Java程序:ItemCFHadoop.java
用Mahout实现分步式算法,我们看到Mahout in Action中的解释。

aglorithm_2

实现程序:


package org.conan.mymahout.recommendation;

import org.apache.hadoop.mapred.JobConf;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
import org.conan.mymahout.hdfs.HdfsDAO;

public class ItemCFHadoop {

    private static final String HDFS = "hdfs://192.168.1.210:9000";

    public static void main(String[] args) throws Exception {
        String localFile = "datafile/item.csv";
        String inPath = HDFS + "/user/hdfs/userCF";
        String inFile = inPath + "/item.csv";
        String outPath = HDFS + "/user/hdfs/userCF/result/";
        String outFile = outPath + "/part-r-00000";
        String tmpPath = HDFS + "/tmp/" + System.currentTimeMillis();

        JobConf conf = config();
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(inPath);
        hdfs.mkdirs(inPath);
        hdfs.copyFile(localFile, inPath);
        hdfs.ls(inPath);
        hdfs.cat(inFile);

        StringBuilder sb = new StringBuilder();
        sb.append("--input ").append(inPath);
        sb.append(" --output ").append(outPath);
        sb.append(" --booleanData true");
        sb.append(" --similarityClassname org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.EuclideanDistanceSimilarity");
        sb.append(" --tempDir ").append(tmpPath);
        args = sb.toString().split(" ");

        RecommenderJob job = new RecommenderJob();
        job.setConf(conf);
        job.run(args);

        hdfs.cat(outFile);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(ItemCFHadoop.class);
        conf.setJobName("ItemCFHadoop");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
}

RecommenderJob.java,实际上就是封装了,上面整个图的分步式并行算法的执行过程!如果没有这层封装,我们需要自己去实现图中8个步骤MapReduce算法。

关于上面算法的深度剖析,请参考文章:R实现MapReduce的协同过滤算法

4). 运行程序
控制台输出:


Delete: hdfs://192.168.1.210:9000/user/hdfs/userCF
Create: hdfs://192.168.1.210:9000/user/hdfs/userCF
copy from: datafile/item.csv to hdfs://192.168.1.210:9000/user/hdfs/userCF
ls: hdfs://192.168.1.210:9000/user/hdfs/userCF
==========================================================
name: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv, folder: false, size: 229
==========================================================
cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2013-10-14 10:26:35 org.apache.hadoop.util.NativeCodeLoader 
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-10-14 10:26:35 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:35 org.apache.hadoop.io.compress.snappy.LoadSnappy 
警告: Snappy native library not loaded
2013-10-14 10:26:36 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:36 org.apache.hadoop.io.compress.CodecPool getCompressor
信息: Got brand-new compressor
2013-10-14 10:26:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:36 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:36 org.apache.hadoop.io.compress.CodecPool getDecompressor
信息: Got brand-new decompressor
2013-10-14 10:26:36 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 42 bytes
2013-10-14 10:26:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2013-10-14 10:26:36 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/preparePreferenceMatrix/itemIDIndex
2013-10-14 10:26:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2013-10-14 10:26:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=187
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=3287330
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=916
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=3443292
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=645
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=229
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=46
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Map input records=21
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=14
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=84
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=376569856
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=116
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=21
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=7
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=7
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=7
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=7
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Map output records=21
2013-10-14 10:26:37 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0002
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0002_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0002_m_000000_0' done.
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 68 bytes
2013-10-14 10:26:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0002_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0002_r_000000_0 is allowed to commit now
2013-10-14 10:26:37 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0002_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/preparePreferenceMatrix/userVectors
2013-10-14 10:26:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0002_r_000000_0' done.
2013-10-14 10:26:38 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:38 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0002
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息: Counters: 20
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:   org.apache.mahout.cf.taste.hadoop.item.ToUserVectorsReducer$Counters
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     USERS=5
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=288
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=6574274
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=1374
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=6887592
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=1120
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=229
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=72
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Map input records=21
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=42
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=63
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=575930368
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=116
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=21
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=5
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=5
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Map output records=21
2013-10-14 10:26:38 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:38 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0003
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0003_m_000000_0' done.
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:38 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 89 bytes
2013-10-14 10:26:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0003_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0003_r_000000_0 is allowed to commit now
2013-10-14 10:26:38 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0003_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/preparePreferenceMatrix/ratingMatrix
2013-10-14 10:26:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0003_r_000000_0' done.
2013-10-14 10:26:39 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:39 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0003
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息: Counters: 21
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=335
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:   org.apache.mahout.cf.taste.hadoop.preparation.ToItemVectorsMapper$Elements
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     USER_RATINGS_NEGLECTED=0
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     USER_RATINGS_USED=21
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=9861349
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=1950
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=10331958
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=1751
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=288
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=93
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Map input records=5
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=14
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=336
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=775290880
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=157
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=21
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=7
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=7
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=7
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=7
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Map output records=21
2013-10-14 10:26:39 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:39 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0004
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0004_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:39 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0004_m_000000_0' done.
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:39 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:39 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 118 bytes
2013-10-14 10:26:39 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0004_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:39 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0004_r_000000_0 is allowed to commit now
2013-10-14 10:26:39 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0004_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/weights
2013-10-14 10:26:39 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0004_r_000000_0' done.
2013-10-14 10:26:40 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:40 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0004
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息: Counters: 20
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=381
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=13148476
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=2628
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=13780408
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=2551
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=335
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:   org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob$Counters
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     ROWS=7
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=122
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Map input records=7
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=16
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=516
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=974651392
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=158
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=24
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=8
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=8
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=8
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=5
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Map output records=24
2013-10-14 10:26:40 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:40 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0005
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0005_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0005_m_000000_0' done.
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:40 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 121 bytes
2013-10-14 10:26:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0005_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0005_r_000000_0 is allowed to commit now
2013-10-14 10:26:40 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0005_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/pairwiseSimilarity
2013-10-14 10:26:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0005_r_000000_0' done.
2013-10-14 10:26:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0005
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息: Counters: 21
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=392
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=16435577
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=3488
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=17230010
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=3408
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=381
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:   org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob$Counters
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     PRUNED_COOCCURRENCES=0
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     COOCCURRENCES=57
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=125
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Map input records=5
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=14
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=744
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1174011904
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=129
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=21
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=7
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=7
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=7
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=7
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Map output records=21
2013-10-14 10:26:41 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0006
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0006_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0006_m_000000_0' done.
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:41 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 158 bytes
2013-10-14 10:26:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0006_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0006_r_000000_0 is allowed to commit now
2013-10-14 10:26:41 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0006_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/similarityMatrix
2013-10-14 10:26:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0006_r_000000_0' done.
2013-10-14 10:26:42 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:42 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0006
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=554
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=19722740
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=4342
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=20674772
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=4354
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=392
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=162
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Map input records=7
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=14
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=599
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1373372416
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=140
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=25
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=7
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=7
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=7
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=7
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Map output records=25
2013-10-14 10:26:42 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:42 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:42 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0007
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0007_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0007_m_000000_0' done.
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0007_m_000001_0 is done. And is in the process of commiting
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0007_m_000001_0' done.
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 2 sorted segments
2013-10-14 10:26:42 org.apache.hadoop.io.compress.CodecPool getDecompressor
信息: Got brand-new decompressor
2013-10-14 10:26:42 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 2 segments left of total size: 233 bytes
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0007_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0007_r_000000_0 is allowed to commit now
2013-10-14 10:26:42 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0007_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/partialMultiply
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0007_r_000000_0' done.
2013-10-14 10:26:43 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:43 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0007
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=572
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=34517913
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=8751
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=36182630
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=7934
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=0
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=241
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Map input records=12
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=56
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=453
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=2558459904
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=665
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=28
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=7
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=7
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Map output records=28
2013-10-14 10:26:43 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:43 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0008
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0008_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0008_m_000000_0' done.
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:43 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 206 bytes
2013-10-14 10:26:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0008_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0008_r_000000_0 is allowed to commit now
2013-10-14 10:26:43 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0008_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/userCF/result
2013-10-14 10:26:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0008_r_000000_0' done.
2013-10-14 10:26:44 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:44 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0008
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=217
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=26299802
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=7357
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=27566408
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=6269
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=572
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=210
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Map input records=7
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=42
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=927
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1971453952
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=137
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=21
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=5
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=5
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Map output records=21
cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/result//part-r-00000
1	[104:1.280239,106:1.1462644,105:1.0653841,107:0.33333334]
2	[106:1.560478,105:1.4795978,107:0.69935876]
3	[103:1.2475469,106:1.1944525,102:1.1462644]
4	[102:1.6462644,105:1.5277859,107:0.69935876]
5	[107:1.1993587]

5). 推荐结果解读
我们可以把上面的日志分解析成3个部分解读

  • a. 初始化环境
  • b. 算法执行
  • c. 打印推荐结果

a. 初始化环境
出初HDFS的数据目录和工作目录,并上传数据文件。


Delete: hdfs://192.168.1.210:9000/user/hdfs/userCF
Create: hdfs://192.168.1.210:9000/user/hdfs/userCF
copy from: datafile/item.csv to hdfs://192.168.1.210:9000/user/hdfs/userCF
ls: hdfs://192.168.1.210:9000/user/hdfs/userCF
==========================================================
name: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv, folder: false, size: 229
==========================================================
cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv

b. 算法执行
分别执行,上图中对应的8种MapReduce算法。
Job complete: job_local_0001
Job complete: job_local_0002
Job complete: job_local_0003
Job complete: job_local_0004
Job complete: job_local_0005
Job complete: job_local_0006
Job complete: job_local_0007
Job complete: job_local_0008

c. 打印推荐结果

方便我们看到计算后的推荐结果


cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/result//part-r-00000
1	[104:1.280239,106:1.1462644,105:1.0653841,107:0.33333334]
2	[106:1.560478,105:1.4795978,107:0.69935876]
3	[103:1.2475469,106:1.1944525,102:1.1462644]
4	[102:1.6462644,105:1.5277859,107:0.69935876]
5	[107:1.1993587]

4. 模板项目上传github

https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.8

大家可以下载这个项目,做为开发的起点。


~ git clone https://github.com/bsspirit/maven_mahout_template
~ git checkout mahout-0.8

我们完成了基于物品的协同过滤分步式算法实现,下面将继续介绍Mahout的Kmeans的分步式算法实现,请参考文章:Mahout分步式程序开发 聚类Kmeans

转载请注明出处:
http://blog.fens.me/hadoop-mahout-mapreduce-itemcf/

打赏作者

用Maven构建Mahout项目

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mahout-maven-eclipse/

mahout-maven-logo

前言

基于Hadoop的项目,不管是MapReduce开发,还是Mahout的开发都是在一个复杂的编程环境中开发。Java的环境问题,是困扰着每个程序员的噩梦。Java程序员,不仅要会写Java程序,还要会调linux,会配hadoop,启动hadoop,还要会自己运维。所以,新手想玩起Hadoop真不是件简单的事。

不过,我们可以尽可能的简化环境问题,让程序员只关注于写程序。特别是像算法程序员,把精力投入在算法设计上,要比花时间解决环境问题有价值的多。

目录

  1. Maven介绍和安装
  2. Mahout单机开发环境介绍
  3. 用Maven构建Mahout开发环境
  4. 用Mahout实现协同过滤userCF
  5. 用Mahout实现kmeans
  6. 模板项目上传github

1. Maven介绍和安装

请参考文章:用Maven构建Hadoop项目

开发环境

  • Win7 64bit
  • Java 1.6.0_45
  • Maven 3
  • Eclipse Juno Service Release 2
  • Mahout 0.6

这里要说明一下mahout的运行版本。

  • mahout-0.5, mahout-0.6, mahout-0.7,是基于hadoop-0.20.2x的。
  • mahout-0.8, mahout-0.9,是基于hadoop-1.1.x的。
  • mahout-0.7,有一次重大升级,去掉了多个算法的单机内存运行,并且了部分API不向前兼容。

注:本文关注于“用Maven构建Mahout的开发环境”,文中的 2个例子都是基于单机的内存实现,因此选择0.6版本。Mahout在Hadoop集群中运行会在下一篇文章介绍。

2. Mahout单机开发环境介绍

hadoop-mahout-dev

如上图所示,我们可以选择在win中开发,也可以在linux中开发,开发过程我们可以在本地环境进行调试,标配的工具都是Maven和Eclipse。

3. 用Maven构建Mahout开发环境

  • 1. 用Maven创建一个标准化的Java项目
  • 2. 导入项目到eclipse
  • 3. 增加mahout依赖,修改pom.xml
  • 4. 下载依赖

1). 用Maven创建一个标准化的Java项目


~ D:\workspace\java>mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes 
-DgroupId=org.conan.mymahout -DartifactId=myMahout -DpackageName=org.conan.mymahout -Dversion=1.0-SNAPSHOT -DinteractiveMode=false

进入项目,执行mvn命令


~ D:\workspace\java>cd myMahout
~ D:\workspace\java\myMahout>mvn clean install

2). 导入项目到eclipse

我们创建好了一个基本的maven项目,然后导入到eclipse中。 这里我们最好已安装好了Maven的插件。

mahout-eclipse-folder

3). 增加mahout依赖,修改pom.xml

这里我使用hadoop-0.6版本,同时去掉对junit的依赖,修改文件:pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.conan.mymahout</groupId>
<artifactId>myMahout</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>myMahout</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mahout.version>0.6</mahout.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-core</artifactId>
<version>${mahout.version}</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-integration</artifactId>
<version>${mahout.version}</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
</exclusion>
<exclusion>
<groupId>me.prettyprint</groupId>
<artifactId>hector-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

4). 下载依赖

~ mvn clean install

在eclipse中刷新项目:

mahout-eclipse-package

项目的依赖程序,被自动加载的库路径下面。

4. 用Mahout实现协同过滤userCF

Mahout协同过滤UserCF深度算法剖析,请参考文章:用R解析Mahout用户推荐协同过滤算法(UserCF)

实现步骤:

  • 1. 准备数据文件: item.csv
  • 2. Java程序:UserCF.java
  • 3. 运行程序
  • 4. 推荐结果解读

1). 新建数据文件: item.csv


~ mkdir datafile
~ vi datafile/item.csv

1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

数据解释:每一行有三列,第一列是用户ID,第二列是物品ID,第三列是用户对物品的打分。

2). Java程序:UserCF.java

Mahout协同过滤的数据流,调用过程。

mahout-recommendation-process

上图摘自:Mahout in Action

新建JAVA类:org.conan.mymahout.recommendation.UserCF.java


package org.conan.mymahout.recommendation;

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.EuclideanDistanceSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class UserCF {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws IOException, TasteException {
        String file = "datafile/item.csv";
        DataModel model = new FileDataModel(new File(file));
        UserSimilarity user = new EuclideanDistanceSimilarity(model);
        NearestNUserNeighborhood neighbor = new NearestNUserNeighborhood(NEIGHBORHOOD_NUM, user, model);
        Recommender r = new GenericUserBasedRecommender(model, neighbor, user);
        LongPrimitiveIterator iter = model.getUserIDs();

        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = r.recommend(uid, RECOMMENDER_NUM);
            System.out.printf("uid:%s", uid);
            for (RecommendedItem ritem : list) {
                System.out.printf("(%s,%f)", ritem.getItemID(), ritem.getValue());
            }
            System.out.println();
        }
    }
}

3). 运行程序
控制台输出:


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
uid:1(104,4.274336)(106,4.000000)
uid:2(105,4.055916)
uid:3(103,3.360987)(102,2.773169)
uid:4(102,3.000000)
uid:5

4). 推荐结果解读

  • 向用户ID1,推荐前二个最相关的物品, 104和106
  • 向用户ID2,推荐前二个最相关的物品, 但只有一个105
  • 向用户ID3,推荐前二个最相关的物品, 103和102
  • 向用户ID4,推荐前二个最相关的物品, 但只有一个102
  • 向用户ID5,推荐前二个最相关的物品, 没有符合的

5. 用Mahout实现kmeans

  • 1. 准备数据文件: randomData.csv
  • 2. Java程序:Kmeans.java
  • 3. 运行Java程序
  • 4. mahout结果解读
  • 5. 用R语言实现Kmeans算法
  • 6. 比较Mahout和R的结果

1). 准备数据文件: randomData.csv


~ vi datafile/randomData.csv

-0.883033363823402,-3.31967192630249
-2.39312626419456,3.34726861118871
2.66976353341256,1.85144276077058
-1.09922906899594,-6.06261735207489
-4.36361936997216,1.90509905380532
-0.00351835125495037,-0.610105996559153
-2.9962958796338,-3.60959839525735
-3.27529418132066,0.0230099799641799
2.17665594420569,6.77290756817957
-2.47862038335637,2.53431833167278
5.53654901906814,2.65089785582474
5.66257474538338,6.86783609641077
-0.558946883114376,1.22332819416237
5.11728525486132,3.74663871584768
1.91240516693351,2.95874731384062
-2.49747101306535,2.05006504756875
3.98781883213459,1.00780938946366

这里只截取了一部分,更多的数据请查看源代码。

注:我是通过R语言生成的randomData.csv


x1<-cbind(x=rnorm(400,1,3),y=rnorm(400,1,3))
x2<-cbind(x=rnorm(300,1,0.5),y=rnorm(300,0,0.5))
x3<-cbind(x=rnorm(300,0,0.1),y=rnorm(300,2,0.2))
x<-rbind(x1,x2,x3)
write.table(x,file="randomData.csv",sep=",",row.names=FALSE,col.names=FALSE)

2). Java程序:Kmeans.java

Mahout中kmeans方法的算法实现过程。

mahout-kmeans-process

上图摘自:Mahout in Action

新建JAVA类:org.conan.mymahout.cluster06.Kmeans.java


package org.conan.mymahout.cluster06;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.mahout.clustering.kmeans.Cluster;
import org.apache.mahout.clustering.kmeans.KMeansClusterer;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.Vector;

public class Kmeans {

    public static void main(String[] args) throws IOException {
        List sampleData = MathUtil.readFileToVector("datafile/randomData.csv");

        int k = 3;
        double threshold = 0.01;

        List randomPoints = MathUtil.chooseRandomPoints(sampleData, k);
        for (Vector vector : randomPoints) {
            System.out.println("Init Point center: " + vector);
        }

        List clusters = new ArrayList();
        for (int i = 0; i < k; i++) {
            clusters.add(new Cluster(randomPoints.get(i), i, new EuclideanDistanceMeasure()));
        }

        List<List> finalClusters = KMeansClusterer.clusterPoints(sampleData, clusters, new EuclideanDistanceMeasure(), k, threshold);
        for (Cluster cluster : finalClusters.get(finalClusters.size() - 1)) {
            System.out.println("Cluster id: " + cluster.getId() + " center: " + cluster.getCenter().asFormatString());
        }
    }

}

3). 运行Java程序
控制台输出:


Init Point center: {0:-0.162693685149196,1:2.19951550286862}
Init Point center: {0:-0.0409782183083317,1:2.09376666042057}
Init Point center: {0:0.158401778474687,1:2.37208412905273}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Cluster id: 0 center: {0:-2.686856800552941,1:1.8939462954763795}
Cluster id: 1 center: {0:0.6334255423230666,1:0.49472852972602105}
Cluster id: 2 center: {0:3.334520309711998,1:3.2758355898247653}

4). mahout结果解读

  • 1. Init Point center表示,kmeans算法初始时的设置的3个中心点
  • 2. Cluster center表示,聚类后找到3个中心点

5). 用R语言实现Kmeans算法
接下来为了让结果更直观,我们再用R语言,进行kmeans实验,操作相同的数据。

R语言代码:


> y<-read.csv(file="randomData.csv",sep=",",header=FALSE) 
> cl<-kmeans(y,3,iter.max = 10, nstart = 25) 
> cl$centers
          V1         V2
1 -0.4323971  2.2852949
2  0.9023786 -0.7011153
3  4.3725463  2.4622609

# 生成聚类中心的图形
> plot(y, col=c("black","blue","green")[cl$cluster])
> points(cl$centers, col="red", pch = 19)

# 画出Mahout聚类的中心
> mahout<-matrix(c(-2.686856800552941,1.8939462954763795,0.6334255423230666,0.49472852972602105,3.334520309711998,3.2758355898247653),ncol=2,byrow=TRUE) 
> points(mahout, col="violetred", pch = 19)

聚类的效果图:
kmeans-center

6). 比较Mahout和R的结果
从上图中,我们看到有 黑,蓝,绿,三种颜色的空心点,这些点就是原始的数据。

3个红色实点,是R语言kmeans后生成的3个中心。
3个紫色实点,是Mahout的kmeans后生成的3个中心。

R语言和Mahout生成的点,并不是重合的,原因有几点:

  • 1. 距离算法不一样:
    Mahout中,我们用的 “欧氏距离(EuclideanDistanceMeasure)”
    R语言中,默认是”Hartigan and Wong”
  • 2. 初始化的中心是不一样的。
  • 3. 最大迭代次数是不一样的。
  • 4. 点合并时,判断的”阈值(threshold)”是不一样的。

6. 模板项目上传github

https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.6

大家可以下载这个项目,做为开发的起点。

 
~ git clone https://github.com/bsspirit/maven_mahout_template
~ git checkout mahout-0.6

我们完成了第一步,下面就将正式进入mahout算法的开发实践,并且应用到hadoop集群的环境中。

下一篇:Mahout分步式程序开发 基于物品的协同过滤ItemCF

转载请注明出处:
http://blog.fens.me/hadoop-mahout-maven-eclipse/

打赏作者

海量Web日志分析 用Hadoop提取KPI统计指标

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mapreduce-log-kpi/

hadoop-kpi-logo

前言

Web日志包含着网站最重要的信息,通过日志分析,我们可以知道网站的访问量,哪个网页访问人数最多,哪个网页最有价值等。一般中型的网站(10W的PV以上),每天会产生1G以上Web日志文件。大型或超大型的网站,可能每小时就会产生10G的数据量。

对于日志的这种规模的数据,用Hadoop进行日志分析,是最适合不过的了。

目录

  1. Web日志分析概述
  2. 需求分析:KPI指标设计
  3. 算法模型:Hadoop并行算法
  4. 架构设计:日志KPI系统架构
  5. 程序开发1:用Maven构建Hadoop项目
  6. 程序开发2:MapReduce程序实现

1. Web日志分析概述

Web日志由Web服务器产生,可能是Nginx, Apache, Tomcat等。从Web日志中,我们可以获取网站每类页面的PV值(PageView,页面访问量)、独立IP数;稍微复杂一些的,可以计算得出用户所检索的关键词排行榜、用户停留时间最高的页面等;更复杂的,构建广告点击模型、分析用户行为特征等等。

在Web日志中,每条日志通常代表着用户的一次访问行为,例如下面就是一条nginx日志:


222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939
 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1)
 AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"

拆解为以下8个变量

  • remote_addr: 记录客户端的ip地址, 222.68.172.190
  • remote_user: 记录客户端用户名称, –
  • time_local: 记录访问时间与时区, [18/Sep/2013:06:49:57 +0000]
  • request: 记录请求的url与http协议, “GET /images/my.jpg HTTP/1.1”
  • status: 记录请求状态,成功是200, 200
  • body_bytes_sent: 记录发送给客户端文件主体内容大小, 19939
  • http_referer: 用来记录从那个页面链接访问过来的, “http://www.angularjs.cn/A00n”
  • http_user_agent: 记录客户浏览器的相关信息, “Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36”

注:要更多的信息,则要用其它手段去获取,通过js代码单独发送请求,使用cookies记录用户的访问信息。

利用这些日志信息,我们可以深入挖掘网站的秘密了。

少量数据的情况

少量数据的情况(10Mb,100Mb,10G),在单机处理尚能忍受的时候,我可以直接利用各种Unix/Linux工具,awk、grep、sort、join等都是日志分析的利器,再配合perl, python,正则表达工,基本就可以解决所有的问题。

例如,我们想从上面提到的nginx日志中得到访问量最高前10个IP,实现很简单:


~ cat access.log.10 | awk '{a[$1]++} END {for(b in a) print b"\t"a[b]}' | sort -k2 -r | head -n 10
163.177.71.12   972
101.226.68.137  972
183.195.232.138 971
50.116.27.194   97
14.17.29.86     96
61.135.216.104  94
61.135.216.105  91
61.186.190.41   9
59.39.192.108   9
220.181.51.212  9

海量数据的情况

当数据量每天以10G、100G增长的时候,单机处理能力已经不能满足需求。我们就需要增加系统的复杂性,用计算机集群,存储阵列来解决。在Hadoop出现之前,海量数据存储,和海量日志分析都是非常困难的。只有少数一些公司,掌握着高效的并行计算,分步式计算,分步式存储的核心技术。

Hadoop的出现,大幅度的降低了海量数据处理的门槛,让小公司甚至是个人都能力,搞定海量数据。并且,Hadoop非常适用于日志分析系统。

2.需求分析:KPI指标设计

下面我们将从一个公司案例出发来全面的解释,如何用进行海量Web日志分析,提取KPI数据

案例介绍
某电子商务网站,在线团购业务。每日PV数100w,独立IP数5w。用户通常在工作日上午10:00-12:00和下午15:00-18:00访问量最大。日间主要是通过PC端浏览器访问,休息日及夜间通过移动设备访问较多。网站搜索浏量占整个网站的80%,PC用户不足1%的用户会消费,移动用户有5%会消费。

通过简短的描述,我们可以粗略地看出,这家电商网站的经营状况,并认识到愿意消费的用户从哪里来,有哪些潜在的用户可以挖掘,网站是否存在倒闭风险等。

KPI指标设计

  • PV(PageView): 页面访问量统计
  • IP: 页面独立IP的访问量统计
  • Time: 用户每小时PV的统计
  • Source: 用户来源域名的统计
  • Browser: 用户的访问设备统计

注:商业保密限制,无法提供电商网站的日志。
下面的内容,将以我的个人网站为例提取数据进行分析。

百度统计,对我个人网站做的统计!http://www.fens.me

基本统计指标:
hadoop-kpi-baidu

用户的访问设备统计指标:
hadoop-kpi-baidu2

从商业的角度,个人网站的特征与电商网站不太一样,没有转化率,同时跳出率也比较高。从技术的角度,同样都关注KPI指标设计。

3.算法模型:Hadoop并行算法

hadoop-kpi-log

并行算法的设计:
注:找到第一节有定义的8个变量

PV(PageView): 页面访问量统计

  • Map过程{key:$request,value:1}
  • Reduce过程{key:$request,value:求和(sum)}

IP: 页面独立IP的访问量统计

  • Map: {key:$request,value:$remote_addr}
  • Reduce: {key:$request,value:去重再求和(sum(unique))}

Time: 用户每小时PV的统计

  • Map: {key:$time_local,value:1}
  • Reduce: {key:$time_local,value:求和(sum)}

Source: 用户来源域名的统计

  • Map: {key:$http_referer,value:1}
  • Reduce: {key:$http_referer,value:求和(sum)}

Browser: 用户的访问设备统计

  • Map: {key:$http_user_agent,value:1}
  • Reduce: {key:$http_user_agent,value:求和(sum)}

4.架构设计:日志KPI系统架构

hadoop-kpi-architect

上图中,左边是Application业务系统,右边是Hadoop的HDFS, MapReduce。

  1. 日志是由业务系统产生的,我们可以设置web服务器每天产生一个新的目录,目录下面会产生多个日志文件,每个日志文件64M。
  2. 设置系统定时器CRON,夜间在0点后,向HDFS导入昨天的日志文件。
  3. 完成导入后,设置系统定时器,启动MapReduce程序,提取并计算统计指标。
  4. 完成计算后,设置系统定时器,从HDFS导出统计指标数据到数据库,方便以后的即使查询。

hadoop-kpi-process

上面这幅图,我们可以看得更清楚,数据是如何流动的。蓝色背景的部分是在Hadoop中的,接下来我们的任务就是完成MapReduce的程序实现。

5.程序开发1:用Maven构建Hadoop项目

请参考文章:用Maven构建Hadoop项目

win7的开发环境 和 Hadoop的运行环境 ,在上面文章中已经介绍过了。

我们需要放日志文件,上传的HDFS里/user/hdfs/log_kpi/目录,参考下面的命令操作


~ hadoop fs -mkdir /user/hdfs/log_kpi
~ hadoop fs -copyFromLocal /home/conan/datafiles/access.log.10 /user/hdfs/log_kpi/

我已经把整个MapReduce的实现都放到了github上面:

https://github.com/bsspirit/maven_hadoop_template/releases/tag/kpi_v1

6.程序开发2:MapReduce程序实现

开发流程:

  1. 对日志行的解析
  2. Map函数实现
  3. Reduce函数实现
  4. 启动程序实现

1). 对日志行的解析
新建文件:org.conan.myhadoop.mr.kpi.KPI.java


package org.conan.myhadoop.mr.kpi;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/*
 * KPI Object
 */
public class KPI {
    private String remote_addr;// 记录客户端的ip地址
    private String remote_user;// 记录客户端用户名称,忽略属性"-"
    private String time_local;// 记录访问时间与时区
    private String request;// 记录请求的url与http协议
    private String status;// 记录请求状态;成功是200
    private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
    private String http_referer;// 用来记录从那个页面链接访问过来的
    private String http_user_agent;// 记录客户浏览器的相关信息

    private boolean valid = true;// 判断数据是否合法
    
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("valid:" + this.valid);
        sb.append("\nremote_addr:" + this.remote_addr);
        sb.append("\nremote_user:" + this.remote_user);
        sb.append("\ntime_local:" + this.time_local);
        sb.append("\nrequest:" + this.request);
        sb.append("\nstatus:" + this.status);
        sb.append("\nbody_bytes_sent:" + this.body_bytes_sent);
        sb.append("\nhttp_referer:" + this.http_referer);
        sb.append("\nhttp_user_agent:" + this.http_user_agent);
        return sb.toString();
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return time_local;
    }

    public Date getTime_local_Date() throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        return df.parse(this.time_local);
    }
    
    public String getTime_local_Date_hour() throws ParseException{
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
        return df.format(this.getTime_local_Date());
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }
    
    public String getHttp_referer_domain(){
        if(http_referer.length()<8){ 
            return http_referer;
        }
        
        String str=this.http_referer.replace("\"", "").replace("http://", "").replace("https://", "");
        return str.indexOf("/")>0?str.substring(0, str.indexOf("/")):str;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }

    public static void main(String args[]) {
        String line = "222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] \"GET /images/my.jpg HTTP/1.1\" 200 19939 \"http://www.angularjs.cn/A00n\" \"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36\"";
        System.out.println(line);
        KPI kpi = new KPI();
        String[] arr = line.split(" ");

        kpi.setRemote_addr(arr[0]);
        kpi.setRemote_user(arr[1]);
        kpi.setTime_local(arr[3].substring(1));
        kpi.setRequest(arr[6]);
        kpi.setStatus(arr[8]);
        kpi.setBody_bytes_sent(arr[9]);
        kpi.setHttp_referer(arr[10]);
        kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
        System.out.println(kpi);

        try {
            SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss", Locale.US);
            System.out.println(df.format(kpi.getTime_local_Date()));
            System.out.println(kpi.getTime_local_Date_hour());
            System.out.println(kpi.getHttp_referer_domain());
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }

}

从日志文件中,取一行通过main函数写一个简单的解析测试。

控制台输出:


222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
valid:true
remote_addr:222.68.172.190
remote_user:-
time_local:18/Sep/2013:06:49:57
request:/images/my.jpg
status:200
body_bytes_sent:19939
http_referer:"http://www.angularjs.cn/A00n"
http_user_agent:"Mozilla/5.0 (Windows
2013.09.18:06:49:57
2013091806
www.angularjs.cn

我们看到日志行,被正确的解析成了kpi对象的属性。我们把解析过程,单独封装成一个方法。


    private static KPI parser(String line) {
        System.out.println(line);
        KPI kpi = new KPI();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            kpi.setRemote_addr(arr[0]);
            kpi.setRemote_user(arr[1]);
            kpi.setTime_local(arr[3].substring(1));
            kpi.setRequest(arr[6]);
            kpi.setStatus(arr[8]);
            kpi.setBody_bytes_sent(arr[9]);
            kpi.setHttp_referer(arr[10]);
            
            if (arr.length > 12) {
                kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                kpi.setHttp_user_agent(arr[11]);
            }

            if (Integer.parseInt(kpi.getStatus()) >= 400) {// 大于400,HTTP错误
                kpi.setValid(false);
            }
        } else {
            kpi.setValid(false);
        }
        return kpi;
    }

对map方法,reduce方法,启动方法,我们单独写一个类来实现

下面将分别介绍MapReduce的实现类:

  • PV:org.conan.myhadoop.mr.kpi.KPIPV.java
  • IP: org.conan.myhadoop.mr.kpi.KPIIP.java
  • Time: org.conan.myhadoop.mr.kpi.KPITime.java
  • Browser: org.conan.myhadoop.mr.kpi.KPIBrowser.java

1). PV:org.conan.myhadoop.mr.kpi.KPIPV.java


package org.conan.myhadoop.mr.kpi;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class KPIPV { 

    public static class KPIPVMapper extends MapReduceBase implements Mapper {
        private IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector output, Reporter reporter) throws IOException {
            KPI kpi = KPI.filterPVs(value.toString());
            if (kpi.isValid()) {
                word.set(kpi.getRequest());
                output.collect(word, one);
            }
        }
    }

    public static class KPIPVReducer extends MapReduceBase implements Reducer {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        String input = "hdfs://192.168.1.210:9000/user/hdfs/log_kpi/";
        String output = "hdfs://192.168.1.210:9000/user/hdfs/log_kpi/pv";

        JobConf conf = new JobConf(KPIPV.class);
        conf.setJobName("KPIPV");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(KPIPVMapper.class);
        conf.setCombinerClass(KPIPVReducer.class);
        conf.setReducerClass(KPIPVReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
        System.exit(0);
    }
}

在程序中会调用KPI类的方法

KPI kpi = KPI.filterPVs(value.toString());

通过filterPVs方法,我们可以实现对PV,更多的控制。

在KPK.java中,增加filterPVs方法


    /**
     * 按page的pv分类
     */
    public static KPI filterPVs(String line) {
        KPI kpi = parser(line);
        Set pages = new HashSet();
        pages.add("/about");
        pages.add("/black-ip-list/");
        pages.add("/cassandra-clustor/");
        pages.add("/finance-rhive-repurchase/");
        pages.add("/hadoop-family-roadmap/");
        pages.add("/hadoop-hive-intro/");
        pages.add("/hadoop-zookeeper-intro/");
        pages.add("/hadoop-mahout-roadmap/");

        if (!pages.contains(kpi.getRequest())) {
            kpi.setValid(false);
        }
        return kpi;
    }

在filterPVs方法,我们定义了一个pages的过滤,就是只对这个页面进行PV统计。

我们运行一下KPIPV.java


2013-10-9 11:53:28 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-9 11:53:28 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-9 11:53:28 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/log_kpi/access.log.10:0+3025757
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/log_kpi/access.log.10:0+3025757
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-9 11:53:30 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-9 11:53:30 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 213 bytes
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2013-10-9 11:53:30 org.apache.hadoop.mapred.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/log_kpi/pv
2013-10-9 11:53:31 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2013-10-9 11:53:33 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-9 11:53:33 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2013-10-9 11:53:34 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-9 11:53:34 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息: Counters: 20
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=3025757
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=183
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=545
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=6051514
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=83472
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=183
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=217
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map input records=14619
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=16
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=2004
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=376569856
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map input bytes=3025757
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=110
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=76
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map output records=76

用hadoop命令查看HDFS文件


~ hadoop fs -cat /user/hdfs/log_kpi/pv/part-00000

/about  5
/black-ip-list/ 2
/cassandra-clustor/     3
/finance-rhive-repurchase/      13
/hadoop-family-roadmap/ 13
/hadoop-hive-intro/     14
/hadoop-mahout-roadmap/ 20
/hadoop-zookeeper-intro/        6

这样我们就得到了,刚刚日志文件中的,指定页面的PV值。

指定页面,就像网站的站点地图一样,如果没有指定所有访问链接都会被找出来,通过“站点地图”的指定,我们可以更容易地找到,我们所需要的信息。

后面,其他的统计指标的提取思路,和PV的实现过程都是类似的,大家可以直接下载源代码,运行看到结果!!

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-mapreduce-log-kpi/

打赏作者

用Maven构建Hadoop项目

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-maven-eclipse/

hadoop-maven

前言

Hadoop的MapReduce环境是一个复杂的编程环境,所以我们要尽可能地简化构建MapReduce项目的过程。Maven是一个很不错的自动化项目构建工具,通过Maven来帮助我们从复杂的环境配置中解脱出来,从而标准化开发过程。所以,写MapReduce之前,让我们先花点时间把刀磨快!!当然,除了Maven还有其他的选择Gradle(推荐), Ivy….

后面将会有介绍几篇MapReduce开发的文章,都要依赖于本文中Maven的构建的MapReduce环境。

目录

  1. Maven介绍
  2. Maven安装(win)
  3. Hadoop开发环境介绍
  4. 用Maven构建Hadoop环境
  5. MapReduce程序开发
  6. 模板项目上传github

1. Maven介绍

Apache Maven,是一个Java的项目管理及自动构建工具,由Apache软件基金会所提供。基于项目对象模型(缩写:POM)概念,Maven利用一个中央信息片断能管理一个项目的构建、报告和文档等步骤。曾是Jakarta项目的子项目,现为独立Apache项目。

maven的开发者在他们开发网站上指出,maven的目标是要使得项目的构建更加容易,它把编译、打包、测试、发布等开发过程中的不同环节有机的串联了起来,并产生一致的、高质量的项目信息,使得项目成员能够及时地得到反馈。maven有效地支持了测试优先、持续集成,体现了鼓励沟通,及时反馈的软件开发理念。如果说Ant的复用是建立在”拷贝–粘贴”的基础上的,那么Maven通过插件的机制实现了项目构建逻辑的真正复用。

2. Maven安装(win)

下载Maven:http://maven.apache.org/download.cgi

下载最新的xxx-bin.zip文件,在win上解压到 D:\toolkit\maven3

并把maven/bin目录设置在环境变量PATH:

win7-maven

然后,打开命令行输入mvn,我们会看到mvn命令的运行效果


~ C:\Users\Administrator>mvn
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.086s
[INFO] Finished at: Mon Sep 30 18:26:58 CST 2013
[INFO] Final Memory: 2M/179M
[INFO] ------------------------------------------------------------------------
[ERROR] No goals have been specified for this build. You must specify a valid lifecycle phase or a goal in the format : or :[:]:. Available lifecycle phases are: validate, initialize, generate-sources, process-sources, generate-resources, process-resources, compile, process-class
es, generate-test-sources, process-test-sources, generate-test-resources, process-test-resources, test-compile, process-test-classes, test, prepare-package, package, pre-integration-test, integration-test, post-integration-test, verify, install, deploy, pre-clean, clean, post-clean, pre-site, site, post-site, site-deploy. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/NoGoalSpecifiedException

安装Eclipse的Maven插件:Maven Integration for Eclipse

Maven的Eclipse插件配置

eclipse-maven

3. Hadoop开发环境介绍

hadoop-dev

如上图所示,我们可以选择在win中开发,也可以在linux中开发,本地启动Hadoop或者远程调用Hadoop,标配的工具都是Maven和Eclipse。

Hadoop集群系统环境:

  • Linux: Ubuntu 12.04.2 LTS 64bit Server
  • Java: 1.6.0_29
  • Hadoop: hadoop-1.0.3,单节点,IP:192.168.1.210

4. 用Maven构建Hadoop环境

  • 1. 用Maven创建一个标准化的Java项目
  • 2. 导入项目到eclipse
  • 3. 增加hadoop依赖,修改pom.xml
  • 4. 下载依赖
  • 5. 从Hadoop集群环境下载hadoop配置文件
  • 6. 配置本地host

1). 用Maven创建一个标准化的Java项目


~ D:\workspace\java>mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DgroupId=org.conan.myhadoop.mr
-DartifactId=myHadoop -DpackageName=org.conan.myhadoop.mr -Dversion=1.0-SNAPSHOT -DinteractiveMode=false
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom <<<
[INFO]
[INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Batch mode
[INFO] No archetype defined. Using maven-archetype-quickstart (org.apache.maven.archetypes:maven-archetype-quickstart:1.
0)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archet
ype-quickstart-1.0.jar
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archety
pe-quickstart-1.0.jar (5 KB at 4.3 KB/sec)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archet
ype-quickstart-1.0.pom
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archety
pe-quickstart-1.0.pom (703 B at 1.6 KB/sec)
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Old (1.x) Archetype: maven-archetype-quickstart:1.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.conan.myhadoop.mr
[INFO] Parameter: packageName, Value: org.conan.myhadoop.mr
[INFO] Parameter: package, Value: org.conan.myhadoop.mr
[INFO] Parameter: artifactId, Value: myHadoop
[INFO] Parameter: basedir, Value: D:\workspace\java
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] project created from Old (1.x) Archetype in dir: D:\workspace\java\myHadoop
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 8.896s
[INFO] Finished at: Sun Sep 29 20:57:07 CST 2013
[INFO] Final Memory: 9M/179M
[INFO] ------------------------------------------------------------------------

进入项目,执行mvn命令


~ D:\workspace\java>cd myHadoop
~ D:\workspace\java\myHadoop>mvn clean install
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ myHadoop ---
[INFO] Building jar: D:\workspace\java\myHadoop\target\myHadoop-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ myHadoop ---
[INFO] Installing D:\workspace\java\myHadoop\target\myHadoop-1.0-SNAPSHOT.jar to C:\Users\Administrator\.m2\repository\o
rg\conan\myhadoop\mr\myHadoop\1.0-SNAPSHOT\myHadoop-1.0-SNAPSHOT.jar
[INFO] Installing D:\workspace\java\myHadoop\pom.xml to C:\Users\Administrator\.m2\repository\org\conan\myhadoop\mr\myHa
doop\1.0-SNAPSHOT\myHadoop-1.0-SNAPSHOT.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.348s
[INFO] Finished at: Sun Sep 29 20:58:43 CST 2013
[INFO] Final Memory: 11M/179M
[INFO] ------------------------------------------------------------------------

2). 导入项目到eclipse

我们创建好了一个基本的maven项目,然后导入到eclipse中。 这里我们最好已安装好了Maven的插件。

hadoop-eclipse

3). 增加hadoop依赖

这里我使用hadoop-1.0.3版本,修改文件:pom.xml


~ vi pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.conan.myhadoop.mr</groupId>
<artifactId>myHadoop</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>myHadoop</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.3</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

4). 下载依赖

下载依赖:

~ mvn clean install

在eclipse中刷新项目:

hadoop-eclipse-maven

项目的依赖程序,被自动加载的库路径下面。

5). 从Hadoop集群环境下载hadoop配置文件

    • core-site.xml
    • hdfs-site.xml
    • mapred-site.xml

查看core-site.xml


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://master:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/conan/hadoop/tmp</value>
</property>
<property>
<name>io.sort.mb</name>
<value>256</value>
</property>
</configuration>

查看hdfs-site.xml


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>dfs.data.dir</name>
<value>/home/conan/hadoop/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>

查看mapred-site.xml


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>hdfs://master:9001</value>
</property>
</configuration>

保存在src/main/resources/hadoop目录下面

hadoop-config

删除原自动生成的文件:App.java和AppTest.java

6).配置本地host,增加master的域名指向


~ vi c:/Windows/System32/drivers/etc/hosts

192.168.1.210 master

6. MapReduce程序开发

编写一个简单的MapReduce程序,实现wordcount功能。

新一个Java文件:WordCount.java


package org.conan.myhadoop.mr;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class WordCountMapper extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }

        }
    }

    public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        String input = "hdfs://192.168.1.210:9000/user/hdfs/o_t_account";
        String output = "hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result";

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(WordCountMapper.class);
        conf.setCombinerClass(WordCountReducer.class);
        conf.setReducerClass(WordCountReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
        System.exit(0);
    }

}

启动Java APP.

控制台错误


2013-9-30 19:25:02 org.apache.hadoop.util.NativeCodeLoader 
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-9-30 19:25:02 org.apache.hadoop.security.UserGroupInformation doAs
严重: PriviledgedActionException as:Administrator cause:java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator1702422322\.staging to 0700
Exception in thread "main" java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator1702422322\.staging to 0700
	at org.apache.hadoop.fs.FileUtil.checkReturnValue(FileUtil.java:689)
	at org.apache.hadoop.fs.FileUtil.setPermission(FileUtil.java:662)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:509)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:344)
	at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:189)
	at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:116)
	at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:856)
	at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
	at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
	at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824)
	at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261)
	at org.conan.myhadoop.mr.WordCount.main(WordCount.java:78)

这个错误是win中开发特有的错误,文件权限问题,在Linux下可以正常运行。

解决方法是,修改/hadoop-1.0.3/src/core/org/apache/hadoop/fs/FileUtil.java文件

688-692行注释,然后重新编译源代码,重新打一个hadoop.jar的包。


685 private static void checkReturnValue(boolean rv, File p,
686                                        FsPermission permission
687                                        ) throws IOException {
688     /*if (!rv) {
689       throw new IOException("Failed to set permissions of path: " + p +
690                             " to " +
691                             String.format("%04o", permission.toShort()));
692     }*/
693   }

我这里自己打了一个hadoop-core-1.0.3.jar包,放到了lib下面。

我们还要替换maven中的hadoop类库。


~ cp lib/hadoop-core-1.0.3.jar C:\Users\Administrator\.m2\repository\org\apache\hadoop\hadoop-core\1.0.3\hadoop-core-1.0.3.jar

再次启动Java APP,控制台输出:


2013-9-30 19:50:49 org.apache.hadoop.util.NativeCodeLoader 
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-9-30 19:50:49 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-9-30 19:50:49 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2013-9-30 19:50:49 org.apache.hadoop.io.compress.snappy.LoadSnappy 
警告: Snappy native library not loaded
2013-9-30 19:50:49 org.apache.hadoop.mapred.FileInputFormat listStatus
信息: Total input paths to process : 4
2013-9-30 19:50:50 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
2013-9-30 19:50:50 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:50 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-9-30 19:50:51 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 0% reduce 0%
2013-9-30 19:50:53 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00003:0+119
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2013-9-30 19:50:54 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2013-9-30 19:50:56 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00000:0+113
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000001_0' done.
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000002_0 is done. And is in the process of commiting
2013-9-30 19:50:59 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00001:0+110
2013-9-30 19:50:59 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00001:0+110
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000002_0' done.
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000003_0 is done. And is in the process of commiting
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00002:0+79
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000003_0' done.
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 4 sorted segments
2013-9-30 19:51:02 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 4 segments left of total size: 442 bytes
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2013-9-30 19:51:02 org.apache.hadoop.mapred.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result
2013-9-30 19:51:05 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-9-30 19:51:05 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2013-9-30 19:51:06 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-9-30 19:51:06 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息: Counters: 20
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=421
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=348
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=7377
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=1535
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=209510
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=348
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=458
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map input records=11
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=30
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=509
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1838546944
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map input bytes=421
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=452
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=22
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=15
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=13
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=15
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=13
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output records=22

成功运行了wordcount程序,通过命令我们查看输出结果


~ hadoop fs -ls hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result

Found 2 items
-rw-r--r--   3 Administrator supergroup          0 2013-09-30 19:51 /user/hdfs/o_t_account/result/_SUCCESS
-rw-r--r--   3 Administrator supergroup        348 2013-09-30 19:51 /user/hdfs/o_t_account/result/part-00000

~ hadoop fs -cat hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result/part-00000

1,abc@163.com,2013-04-22        1
10,ade121@sohu.com,2013-04-23   1
11,addde@sohu.com,2013-04-23    1
17:21:24.0      5
2,dedac@163.com,2013-04-22      1
20:21:39.0      6
3,qq8fed@163.com,2013-04-22     1
4,qw1@163.com,2013-04-22        1
5,af3d@163.com,2013-04-22       1
6,ab34@163.com,2013-04-22       1
7,q8d1@gmail.com,2013-04-23     1
8,conan@gmail.com,2013-04-23    1
9,adeg@sohu.com,2013-04-23      1

这样,我们就实现了在win7中的开发,通过Maven构建Hadoop依赖环境,在Eclipse中开发MapReduce的程序,然后运行JavaAPP。Hadoop应用会自动把我们的MR程序打成jar包,再上传的远程的hadoop环境中运行,返回日志在Eclipse控制台输出。

7. 模板项目上传github

https://github.com/bsspirit/maven_hadoop_template

大家可以下载这个项目,做为开发的起点。

~ git clone https://github.com/bsspirit/maven_hadoop_template.git

我们完成第一步,下面就将正式进入MapReduce开发实践。

 

转载请注明出处:
http://blog.fens.me/hadoop-maven-eclipse/

打赏作者

解读Nodejs多核处理模块cluster

从零开始nodejs系列文章,将介绍如何利Javascript做为服务端脚本,通过Nodejs框架web开发。Nodejs框架是基于V8的引擎,是目前速度最快的Javascript引擎。chrome浏览器就基于V8,同时打开20-30个网页都很流畅。Nodejs标准的web开发框架Express,可以帮助我们迅速建立web站点,比起PHP的开发效率更高,而且学习曲线更低。非常适合小型网站,个性化网站,我们自己的Geek网站!!

关于作者

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/nodejs-core-cluster/

nodejs-cluster

前言
大家都知道nodejs是一个单进程单线程的服务器引擎,不管有多么的强大硬件,只能利用到单个CPU进行计算。所以,有人开发了第三方的cluster,让node可以利用多核CPU实现并行。

随着nodejs的发展,让nodejs上生产环境,就必须是支持多进程多核处理!在V0.6.0版本,Nodejs内置了cluster的特性。自此,Nodejs终于可以作为一个独立的应用开发解决方案,映入大家眼帘了。

目录

  1. cluster介绍
  2. cluster的简单使用
  3. cluster的工作原理
  4. cluster的API
  5. master和worker的通信
  6. 用cluster实现负载均衡(Load Balance) — win7失败
  7. 用cluster实现负载均衡(Load Balance) — ubuntu成功
  8. cluster负载均衡策略的测试

1. cluster介绍

cluster是一个nodejs内置的模块,用于nodejs多核处理。cluster模块,可以帮助我们简化多进程并行化程序的开发难度,轻松构建一个用于负载均衡的集群。

2. cluster的简单使用

我的系统环境

  • win7 64bit
  • Nodejs:v0.10.5
  • Npm:1.2.19

在win的环境中,我们通过cluster启动多核的node提供web服务。

新建工程目录:


~ D:\workspace\javascript>mkdir nodejs-cluster && cd nodejs-cluster

新建文件:app.js


~ vi app.js

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log("master start...");

    // Fork workers.
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    cluster.on('listening',function(worker,address){
        console.log('listening: worker ' + worker.process.pid +', Address: '+address.address+":"+address.port);
    });

    cluster.on('exit', function(worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
    });
} else {
    http.createServer(function(req, res) {
        res.writeHead(200);
        res.end("hello world\n");
    }).listen(0);
}

在控制台启动node程序


~ D:\workspace\javascript\nodejs-cluster>node app.js

master start...
listening: worker 2368, Address: 0.0.0.0:57132
listening: worker 1880, Address: 0.0.0.0:57132
listening: worker 1384, Address: 0.0.0.0:57132
listening: worker 1652, Address: 0.0.0.0:57132

master是总控节点,worker是运行节点。然后根据CPU的数量,启动worker。我本地是双核双通道的CPU,所以被检测为4核,启动了4个worker。

3. cluster的工作原理

每个worker进程通过使用child_process.fork()函数,基于IPC(Inter-Process Communication,进程间通信),实现与master进程间通信。

当worker使用server.listen(...)函数时 ,将参数序列传递给master进程。如果master进程已经匹配workers,会将传递句柄给工人。如果master没有匹配好worker,那么会创建一个worker,再传递并句柄传递给worker。

在边界条件,有3个有趣的行为:
注:下面server.listen(),是对底层“http.Server-->net.Server”类的调用。

  • 1. server.listen({fd: 7}):在master和worker通信过程,通过传递文件,master会监听“文件描述为7”,而不是传递“文件描述为7”的引用。
  • 2. server.listen(handle):master和worker通信过程,通过handle函数进行通信,而不用进程联系
  • 3. server.listen(0):在master和worker通信过程,集群中的worker会打开一个随机端口共用,通过socket通信,像上例中的57132

当多个进程都在 accept() 同样的资源的时候,操作系统的负载均衡非常高效。Node.js没有路由逻辑,worker之间没有共享状态。所以,程序要设计得简单一些,比如基于内存的session。

因为workers都是独力运行的,根据程序的需要,它们可以被独立删除或者重启,worker并不相互影响。只要还有workers存活,则master将继续接收连接。Node不会自动维护workers的数目。我们可以建立自己的连接池。

4. cluster的API

官网地址:http://nodejs.org/api/cluster.html#cluster_cluster

cluster对象
cluster的各种属性和函数

  • cluster.setttings:配置集群参数对象
  • cluster.isMaster:判断是不是master节点
  • cluster.isWorker:判断是不是worker节点
  • Event: 'fork': 监听创建worker进程事件
  • Event: 'online': 监听worker创建成功事件
  • Event: 'listening': 监听worker向master状态事件
  • Event: 'disconnect': 监听worker断线事件
  • Event: 'exit': 监听worker退出事件
  • Event: 'setup': 监听setupMaster事件
  • cluster.setupMaster([settings]): 设置集群参数
  • cluster.fork([env]): 创建worker进程
  • cluster.disconnect([callback]): 关闭worket进程
  • cluster.worker: 获得当前的worker对象
  • cluster.workers: 获得集群中所有存活的worker对象

worker对象
worker的各种属性和函数:可以通过cluster.workers, cluster.worket获得。

  • worker.id: 进程ID号
  • worker.process: ChildProcess对象
  • worker.suicide: 在disconnect()后,判断worker是否自杀
  • worker.send(message, [sendHandle]): master给worker发送消息。注:worker给发master发送消息要用process.send(message)
  • worker.kill([signal='SIGTERM']): 杀死指定的worker,别名destory()
  • worker.disconnect(): 断开worker连接,让worker自杀
  • Event: 'message': 监听master和worker的message事件
  • Event: 'online': 监听指定的worker创建成功事件
  • Event: 'listening': 监听master向worker状态事件
  • Event: 'disconnect': 监听worker断线事件
  • Event: 'exit': 监听worker退出事件

5. master和worker的通信

实现cluster的API,让master和worker相互通信。

新建文件: cluster.js


~ vi cluster.js

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log('[master] ' + "start master...");

    for (var i = 0; i < numCPUs; i++) {
        var wk = cluster.fork();
        wk.send('[master] ' + 'hi worker' + wk.id);
    }

    cluster.on('fork', function (worker) {
        console.log('[master] ' + 'fork: worker' + worker.id);
    });

    cluster.on('online', function (worker) {
        console.log('[master] ' + 'online: worker' + worker.id);
    });

    cluster.on('listening', function (worker, address) {
        console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
    });

    cluster.on('disconnect', function (worker) {
        console.log('[master] ' + 'disconnect: worker' + worker.id);
    });

    cluster.on('exit', function (worker, code, signal) {
        console.log('[master] ' + 'exit worker' + worker.id + ' died');
    });

    function eachWorker(callback) {
        for (var id in cluster.workers) {
            callback(cluster.workers[id]);
        }
    }

    setTimeout(function () {
        eachWorker(function (worker) {
            worker.send('[master] ' + 'send message to worker' + worker.id);
        });
    }, 3000);

    Object.keys(cluster.workers).forEach(function(id) {
        cluster.workers[id].on('message', function(msg){
            console.log('[master] ' + 'message ' + msg);
        });
    });

} else if (cluster.isWorker) {
    console.log('[worker] ' + "start worker ..." + cluster.worker.id);

    process.on('message', function(msg) {
        console.log('[worker] '+msg);
        process.send('[worker] worker'+cluster.worker.id+' received!');
    });

    http.createServer(function (req, res) {
            res.writeHead(200, {"content-type": "text/html"});
            res.end('worker'+cluster.worker.id+',PID:'+process.pid);
    }).listen(3000);

}

控制台日志:


~ D:\workspace\javascript\nodejs-cluster>node cluster.js

[master] start master...
[worker] start worker ...1
[worker] [master] hi worker1
[worker] start worker ...2
[worker] [master] hi worker2
[master] fork: worker1
[master] fork: worker2
[master] fork: worker3
[master] fork: worker4
[master] online: worker1
[master] online: worker2
[master] message [worker] worker1 received!
[master] message [worker] worker2 received!
[master] listening: worker1,pid:6068, Address:0.0.0.0:3000
[master] listening: worker2,pid:1408, Address:0.0.0.0:3000
[master] online: worker3
[worker] start worker ...3
[worker] [master] hi worker3
[master] message [worker] worker3 received!
[master] listening: worker3,pid:3428, Address:0.0.0.0:3000
[master] online: worker4
[worker] start worker ...4
[worker] [master] hi worker4
[master] message [worker] worker4 received!
[master] listening: worker4,pid:6872, Address:0.0.0.0:3000
[worker] [master] send message to worker1
[worker] [master] send message to worker2
[worker] [master] send message to worker3
[worker] [master] send message to worker4
[master] message [worker] worker1 received!
[master] message [worker] worker2 received!
[master] message [worker] worker3 received!
[master] message [worker] worker4 received!

6. 用cluster实现负载均衡(Load Balance) -- win7失败

新建文件: server.js


~ vi server.js

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log('[master] ' + "start master...");

    for (var i = 0; i < numCPUs; i++) {
         cluster.fork();
    }

    cluster.on('listening', function (worker, address) {
        console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
    });

} else if (cluster.isWorker) {
    console.log('[worker] ' + "start worker ..." + cluster.worker.id);
    http.createServer(function (req, res) {
        console.log('worker'+cluster.worker.id);
        res.end('worker'+cluster.worker.id+',PID:'+process.pid);
    }).listen(3000);
}

启动服务器:


~ D:\workspace\javascript\nodejs-cluster>node server.js
[master] start master...
[worker] start worker ...1
[worker] start worker ...2
[master] listening: worker1,pid:1536, Address:0.0.0.0:3000
[master] listening: worker2,pid:5920, Address:0.0.0.0:3000
[worker] start worker ...3
[master] listening: worker3,pid:7156, Address:0.0.0.0:3000
[worker] start worker ...4
[master] listening: worker4,pid:2868, Address:0.0.0.0:3000
worker4
worker4
worker4
worker4
worker4
worker4
worker4
worker4

用curl工具访问


C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868

我们发现了cluster在win中的bug,只用到worker4。果断切换到Linux测试。

7. 用cluster实现负载均衡(Load Balance) -- ubuntu成功

Linux的系统环境

  • Linux: Ubuntu 12.04.2 64bit Server
  • Node: v0.11.2
  • Npm: 1.2.21

构建项目:不多解释


~ cd :/home/conan/nodejs/
~ mkdir nodejs-cluster && cd nodejs-cluster
~ vi server.js

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log('[master] ' + "start master...");

    for (var i = 0; i < numCPUs; i++) {
         cluster.fork();
    }

    cluster.on('listening', function (worker, address) {
        console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
    });

} else if (cluster.isWorker) {
    console.log('[worker] ' + "start worker ..." + cluster.worker.id);
    http.createServer(function (req, res) {
        console.log('worker'+cluster.worker.id);
        res.end('worker'+cluster.worker.id+',PID:'+process.pid);
    }).listen(3000);
}

启动服务器


conan@conan-deskop:~/nodejs/nodejs-cluster$ node server.js
[master] start master...
[worker] start worker ...1
[master] listening: worker1,pid:2925, Address:0.0.0.0:3000
[worker] start worker ...3
[master] listening: worker3,pid:2931, Address:0.0.0.0:3000
[worker] start worker ...4
[master] listening: worker4,pid:2932, Address:0.0.0.0:3000
[worker] start worker ...2
[master] listening: worker2,pid:2930, Address:0.0.0.0:3000
worker4
worker2
worker1
worker3
worker4
worker2
worker1

用curl工具访问


C:\Users\Administrator>curl 192.168.1.20:3000
worker4,PID:2932
C:\Users\Administrator>curl 192.168.1.20:3000
worker2,PID:2930
C:\Users\Administrator>curl 192.168.1.20:3000
worker1,PID:2925
C:\Users\Administrator>curl 192.168.1.20:3000
worker3,PID:2931
C:\Users\Administrator>curl 192.168.1.20:3000
worker4,PID:2932
C:\Users\Administrator>curl 192.168.1.20:3000
worker2,PID:2930
C:\Users\Administrator>curl 192.168.1.20:3000
worker1,PID:2925

在Linux环境中,cluster是运行正确的!!!

8. cluster负载均衡策略的测试

我们在Linux下面,完成测试,用过测试软件: siege

安装siege

~ sudo apt-get install siege

启动node cluster

~ node server.js > server.log

运行siege启动命令,每秒50个并发请求。


~ sudo siege -c 50 http://localhost:3000

HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.01 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.01 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.02 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.02 secs:      16 bytes ==> /
HTTP/1.1 200   0.01 secs:      16 bytes ==> /
HTTP/1.1 200   0.01 secs:      16 bytes ==> /
.....

^C
Lifting the server siege...      done.                                                                Transactions:                    3760 hits
Availability:                 100.00 %
Elapsed time:                  39.66 secs
Data transferred:               0.06 MB
Response time:                  0.01 secs
Transaction rate:              94.81 trans/sec
Throughput:                     0.00 MB/sec
Concurrency:                    1.24
Successful transactions:        3760
Failed transactions:               0
Longest transaction:            0.20
Shortest transaction:           0.00

FILE: /var/siege.log
You can disable this annoying message by editing
the .siegerc file in your home directory; change
the directive 'show-logfile' to false.

我们统计结果,执行3760次请求,消耗39.66秒,每秒处理94.81次请求。

查看server.log文件,


~  ls -l
total 64
-rw-rw-r-- 1 conan conan   756  9月 28 15:48 server.js
-rw-rw-r-- 1 conan conan 50313  9月 28 16:26 server.log

~ tail server.log
worker4
worker1
worker2
worker4
worker1
worker2
worker4
worker3
worker2
worker1

最后,用R语言分析一下:server.log


~ R 

> df<-read.table(file="server.log",skip=9,header=FALSE)
> summary(df)
       V1
 worker1:1559
 worker2:1579
 worker3:1570
 worker4:1535

我们看到,请求被分配到worker数据量相当。所以,cluster的负载均衡的策略,应该是随机分配的。

好了,我们又学了一个很有用的技能!利用cluster可以构建出多核应用,充分的利用多CPU带业的性能吧!!

转载请注明出处:
http://blog.fens.me/nodejs-core-cluster/

打赏作者