• Archive by category "架构设计"
  • (Page 3)

Blog Archives

用Maven构建Mahout项目

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mahout-maven-eclipse/

mahout-maven-logo

前言

基于Hadoop的项目,不管是MapReduce开发,还是Mahout的开发都是在一个复杂的编程环境中开发。Java的环境问题,是困扰着每个程序员的噩梦。Java程序员,不仅要会写Java程序,还要会调linux,会配hadoop,启动hadoop,还要会自己运维。所以,新手想玩起Hadoop真不是件简单的事。

不过,我们可以尽可能的简化环境问题,让程序员只关注于写程序。特别是像算法程序员,把精力投入在算法设计上,要比花时间解决环境问题有价值的多。

目录

  1. Maven介绍和安装
  2. Mahout单机开发环境介绍
  3. 用Maven构建Mahout开发环境
  4. 用Mahout实现协同过滤userCF
  5. 用Mahout实现kmeans
  6. 模板项目上传github

1. Maven介绍和安装

请参考文章:用Maven构建Hadoop项目

开发环境

  • Win7 64bit
  • Java 1.6.0_45
  • Maven 3
  • Eclipse Juno Service Release 2
  • Mahout 0.6

这里要说明一下mahout的运行版本。

  • mahout-0.5, mahout-0.6, mahout-0.7,是基于hadoop-0.20.2x的。
  • mahout-0.8, mahout-0.9,是基于hadoop-1.1.x的。
  • mahout-0.7,有一次重大升级,去掉了多个算法的单机内存运行,并且了部分API不向前兼容。

注:本文关注于“用Maven构建Mahout的开发环境”,文中的 2个例子都是基于单机的内存实现,因此选择0.6版本。Mahout在Hadoop集群中运行会在下一篇文章介绍。

2. Mahout单机开发环境介绍

hadoop-mahout-dev

如上图所示,我们可以选择在win中开发,也可以在linux中开发,开发过程我们可以在本地环境进行调试,标配的工具都是Maven和Eclipse。

3. 用Maven构建Mahout开发环境

  • 1. 用Maven创建一个标准化的Java项目
  • 2. 导入项目到eclipse
  • 3. 增加mahout依赖,修改pom.xml
  • 4. 下载依赖

1). 用Maven创建一个标准化的Java项目


~ D:\workspace\java>mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes 
-DgroupId=org.conan.mymahout -DartifactId=myMahout -DpackageName=org.conan.mymahout -Dversion=1.0-SNAPSHOT -DinteractiveMode=false

进入项目,执行mvn命令


~ D:\workspace\java>cd myMahout
~ D:\workspace\java\myMahout>mvn clean install

2). 导入项目到eclipse

我们创建好了一个基本的maven项目,然后导入到eclipse中。 这里我们最好已安装好了Maven的插件。

mahout-eclipse-folder

3). 增加mahout依赖,修改pom.xml

这里我使用hadoop-0.6版本,同时去掉对junit的依赖,修改文件:pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.conan.mymahout</groupId>
<artifactId>myMahout</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>myMahout</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mahout.version>0.6</mahout.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-core</artifactId>
<version>${mahout.version}</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-integration</artifactId>
<version>${mahout.version}</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
</exclusion>
<exclusion>
<groupId>me.prettyprint</groupId>
<artifactId>hector-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

4). 下载依赖

~ mvn clean install

在eclipse中刷新项目:

mahout-eclipse-package

项目的依赖程序,被自动加载的库路径下面。

4. 用Mahout实现协同过滤userCF

Mahout协同过滤UserCF深度算法剖析,请参考文章:用R解析Mahout用户推荐协同过滤算法(UserCF)

实现步骤:

  • 1. 准备数据文件: item.csv
  • 2. Java程序:UserCF.java
  • 3. 运行程序
  • 4. 推荐结果解读

1). 新建数据文件: item.csv


~ mkdir datafile
~ vi datafile/item.csv

1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

数据解释:每一行有三列,第一列是用户ID,第二列是物品ID,第三列是用户对物品的打分。

2). Java程序:UserCF.java

Mahout协同过滤的数据流,调用过程。

mahout-recommendation-process

上图摘自:Mahout in Action

新建JAVA类:org.conan.mymahout.recommendation.UserCF.java


package org.conan.mymahout.recommendation;

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.EuclideanDistanceSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class UserCF {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws IOException, TasteException {
        String file = "datafile/item.csv";
        DataModel model = new FileDataModel(new File(file));
        UserSimilarity user = new EuclideanDistanceSimilarity(model);
        NearestNUserNeighborhood neighbor = new NearestNUserNeighborhood(NEIGHBORHOOD_NUM, user, model);
        Recommender r = new GenericUserBasedRecommender(model, neighbor, user);
        LongPrimitiveIterator iter = model.getUserIDs();

        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = r.recommend(uid, RECOMMENDER_NUM);
            System.out.printf("uid:%s", uid);
            for (RecommendedItem ritem : list) {
                System.out.printf("(%s,%f)", ritem.getItemID(), ritem.getValue());
            }
            System.out.println();
        }
    }
}

3). 运行程序
控制台输出:


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
uid:1(104,4.274336)(106,4.000000)
uid:2(105,4.055916)
uid:3(103,3.360987)(102,2.773169)
uid:4(102,3.000000)
uid:5

4). 推荐结果解读

  • 向用户ID1,推荐前二个最相关的物品, 104和106
  • 向用户ID2,推荐前二个最相关的物品, 但只有一个105
  • 向用户ID3,推荐前二个最相关的物品, 103和102
  • 向用户ID4,推荐前二个最相关的物品, 但只有一个102
  • 向用户ID5,推荐前二个最相关的物品, 没有符合的

5. 用Mahout实现kmeans

  • 1. 准备数据文件: randomData.csv
  • 2. Java程序:Kmeans.java
  • 3. 运行Java程序
  • 4. mahout结果解读
  • 5. 用R语言实现Kmeans算法
  • 6. 比较Mahout和R的结果

1). 准备数据文件: randomData.csv


~ vi datafile/randomData.csv

-0.883033363823402,-3.31967192630249
-2.39312626419456,3.34726861118871
2.66976353341256,1.85144276077058
-1.09922906899594,-6.06261735207489
-4.36361936997216,1.90509905380532
-0.00351835125495037,-0.610105996559153
-2.9962958796338,-3.60959839525735
-3.27529418132066,0.0230099799641799
2.17665594420569,6.77290756817957
-2.47862038335637,2.53431833167278
5.53654901906814,2.65089785582474
5.66257474538338,6.86783609641077
-0.558946883114376,1.22332819416237
5.11728525486132,3.74663871584768
1.91240516693351,2.95874731384062
-2.49747101306535,2.05006504756875
3.98781883213459,1.00780938946366

这里只截取了一部分,更多的数据请查看源代码。

注:我是通过R语言生成的randomData.csv


x1<-cbind(x=rnorm(400,1,3),y=rnorm(400,1,3))
x2<-cbind(x=rnorm(300,1,0.5),y=rnorm(300,0,0.5))
x3<-cbind(x=rnorm(300,0,0.1),y=rnorm(300,2,0.2))
x<-rbind(x1,x2,x3)
write.table(x,file="randomData.csv",sep=",",row.names=FALSE,col.names=FALSE)

2). Java程序:Kmeans.java

Mahout中kmeans方法的算法实现过程。

mahout-kmeans-process

上图摘自:Mahout in Action

新建JAVA类:org.conan.mymahout.cluster06.Kmeans.java


package org.conan.mymahout.cluster06;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.mahout.clustering.kmeans.Cluster;
import org.apache.mahout.clustering.kmeans.KMeansClusterer;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.Vector;

public class Kmeans {

    public static void main(String[] args) throws IOException {
        List sampleData = MathUtil.readFileToVector("datafile/randomData.csv");

        int k = 3;
        double threshold = 0.01;

        List randomPoints = MathUtil.chooseRandomPoints(sampleData, k);
        for (Vector vector : randomPoints) {
            System.out.println("Init Point center: " + vector);
        }

        List clusters = new ArrayList();
        for (int i = 0; i < k; i++) {
            clusters.add(new Cluster(randomPoints.get(i), i, new EuclideanDistanceMeasure()));
        }

        List<List> finalClusters = KMeansClusterer.clusterPoints(sampleData, clusters, new EuclideanDistanceMeasure(), k, threshold);
        for (Cluster cluster : finalClusters.get(finalClusters.size() - 1)) {
            System.out.println("Cluster id: " + cluster.getId() + " center: " + cluster.getCenter().asFormatString());
        }
    }

}

3). 运行Java程序
控制台输出:


Init Point center: {0:-0.162693685149196,1:2.19951550286862}
Init Point center: {0:-0.0409782183083317,1:2.09376666042057}
Init Point center: {0:0.158401778474687,1:2.37208412905273}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Cluster id: 0 center: {0:-2.686856800552941,1:1.8939462954763795}
Cluster id: 1 center: {0:0.6334255423230666,1:0.49472852972602105}
Cluster id: 2 center: {0:3.334520309711998,1:3.2758355898247653}

4). mahout结果解读

  • 1. Init Point center表示,kmeans算法初始时的设置的3个中心点
  • 2. Cluster center表示,聚类后找到3个中心点

5). 用R语言实现Kmeans算法
接下来为了让结果更直观,我们再用R语言,进行kmeans实验,操作相同的数据。

R语言代码:


> y<-read.csv(file="randomData.csv",sep=",",header=FALSE) 
> cl<-kmeans(y,3,iter.max = 10, nstart = 25) 
> cl$centers
          V1         V2
1 -0.4323971  2.2852949
2  0.9023786 -0.7011153
3  4.3725463  2.4622609

# 生成聚类中心的图形
> plot(y, col=c("black","blue","green")[cl$cluster])
> points(cl$centers, col="red", pch = 19)

# 画出Mahout聚类的中心
> mahout<-matrix(c(-2.686856800552941,1.8939462954763795,0.6334255423230666,0.49472852972602105,3.334520309711998,3.2758355898247653),ncol=2,byrow=TRUE) 
> points(mahout, col="violetred", pch = 19)

聚类的效果图:
kmeans-center

6). 比较Mahout和R的结果
从上图中,我们看到有 黑,蓝,绿,三种颜色的空心点,这些点就是原始的数据。

3个红色实点,是R语言kmeans后生成的3个中心。
3个紫色实点,是Mahout的kmeans后生成的3个中心。

R语言和Mahout生成的点,并不是重合的,原因有几点:

  • 1. 距离算法不一样:
    Mahout中,我们用的 “欧氏距离(EuclideanDistanceMeasure)”
    R语言中,默认是”Hartigan and Wong”
  • 2. 初始化的中心是不一样的。
  • 3. 最大迭代次数是不一样的。
  • 4. 点合并时,判断的”阈值(threshold)”是不一样的。

6. 模板项目上传github

https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.6

大家可以下载这个项目,做为开发的起点。

 
~ git clone https://github.com/bsspirit/maven_mahout_template
~ git checkout mahout-0.6

我们完成了第一步,下面就将正式进入mahout算法的开发实践,并且应用到hadoop集群的环境中。

下一篇:Mahout分步式程序开发 基于物品的协同过滤ItemCF

转载请注明出处:
http://blog.fens.me/hadoop-mahout-maven-eclipse/

打赏作者

海量Web日志分析 用Hadoop提取KPI统计指标

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mapreduce-log-kpi/

hadoop-kpi-logo

前言

Web日志包含着网站最重要的信息,通过日志分析,我们可以知道网站的访问量,哪个网页访问人数最多,哪个网页最有价值等。一般中型的网站(10W的PV以上),每天会产生1G以上Web日志文件。大型或超大型的网站,可能每小时就会产生10G的数据量。

对于日志的这种规模的数据,用Hadoop进行日志分析,是最适合不过的了。

目录

  1. Web日志分析概述
  2. 需求分析:KPI指标设计
  3. 算法模型:Hadoop并行算法
  4. 架构设计:日志KPI系统架构
  5. 程序开发1:用Maven构建Hadoop项目
  6. 程序开发2:MapReduce程序实现

1. Web日志分析概述

Web日志由Web服务器产生,可能是Nginx, Apache, Tomcat等。从Web日志中,我们可以获取网站每类页面的PV值(PageView,页面访问量)、独立IP数;稍微复杂一些的,可以计算得出用户所检索的关键词排行榜、用户停留时间最高的页面等;更复杂的,构建广告点击模型、分析用户行为特征等等。

在Web日志中,每条日志通常代表着用户的一次访问行为,例如下面就是一条nginx日志:


222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939
 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1)
 AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"

拆解为以下8个变量

  • remote_addr: 记录客户端的ip地址, 222.68.172.190
  • remote_user: 记录客户端用户名称, –
  • time_local: 记录访问时间与时区, [18/Sep/2013:06:49:57 +0000]
  • request: 记录请求的url与http协议, “GET /images/my.jpg HTTP/1.1”
  • status: 记录请求状态,成功是200, 200
  • body_bytes_sent: 记录发送给客户端文件主体内容大小, 19939
  • http_referer: 用来记录从那个页面链接访问过来的, “http://www.angularjs.cn/A00n”
  • http_user_agent: 记录客户浏览器的相关信息, “Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36”

注:要更多的信息,则要用其它手段去获取,通过js代码单独发送请求,使用cookies记录用户的访问信息。

利用这些日志信息,我们可以深入挖掘网站的秘密了。

少量数据的情况

少量数据的情况(10Mb,100Mb,10G),在单机处理尚能忍受的时候,我可以直接利用各种Unix/Linux工具,awk、grep、sort、join等都是日志分析的利器,再配合perl, python,正则表达工,基本就可以解决所有的问题。

例如,我们想从上面提到的nginx日志中得到访问量最高前10个IP,实现很简单:


~ cat access.log.10 | awk '{a[$1]++} END {for(b in a) print b"\t"a[b]}' | sort -k2 -r | head -n 10
163.177.71.12   972
101.226.68.137  972
183.195.232.138 971
50.116.27.194   97
14.17.29.86     96
61.135.216.104  94
61.135.216.105  91
61.186.190.41   9
59.39.192.108   9
220.181.51.212  9

海量数据的情况

当数据量每天以10G、100G增长的时候,单机处理能力已经不能满足需求。我们就需要增加系统的复杂性,用计算机集群,存储阵列来解决。在Hadoop出现之前,海量数据存储,和海量日志分析都是非常困难的。只有少数一些公司,掌握着高效的并行计算,分步式计算,分步式存储的核心技术。

Hadoop的出现,大幅度的降低了海量数据处理的门槛,让小公司甚至是个人都能力,搞定海量数据。并且,Hadoop非常适用于日志分析系统。

2.需求分析:KPI指标设计

下面我们将从一个公司案例出发来全面的解释,如何用进行海量Web日志分析,提取KPI数据

案例介绍
某电子商务网站,在线团购业务。每日PV数100w,独立IP数5w。用户通常在工作日上午10:00-12:00和下午15:00-18:00访问量最大。日间主要是通过PC端浏览器访问,休息日及夜间通过移动设备访问较多。网站搜索浏量占整个网站的80%,PC用户不足1%的用户会消费,移动用户有5%会消费。

通过简短的描述,我们可以粗略地看出,这家电商网站的经营状况,并认识到愿意消费的用户从哪里来,有哪些潜在的用户可以挖掘,网站是否存在倒闭风险等。

KPI指标设计

  • PV(PageView): 页面访问量统计
  • IP: 页面独立IP的访问量统计
  • Time: 用户每小时PV的统计
  • Source: 用户来源域名的统计
  • Browser: 用户的访问设备统计

注:商业保密限制,无法提供电商网站的日志。
下面的内容,将以我的个人网站为例提取数据进行分析。

百度统计,对我个人网站做的统计!http://www.fens.me

基本统计指标:
hadoop-kpi-baidu

用户的访问设备统计指标:
hadoop-kpi-baidu2

从商业的角度,个人网站的特征与电商网站不太一样,没有转化率,同时跳出率也比较高。从技术的角度,同样都关注KPI指标设计。

3.算法模型:Hadoop并行算法

hadoop-kpi-log

并行算法的设计:
注:找到第一节有定义的8个变量

PV(PageView): 页面访问量统计

  • Map过程{key:$request,value:1}
  • Reduce过程{key:$request,value:求和(sum)}

IP: 页面独立IP的访问量统计

  • Map: {key:$request,value:$remote_addr}
  • Reduce: {key:$request,value:去重再求和(sum(unique))}

Time: 用户每小时PV的统计

  • Map: {key:$time_local,value:1}
  • Reduce: {key:$time_local,value:求和(sum)}

Source: 用户来源域名的统计

  • Map: {key:$http_referer,value:1}
  • Reduce: {key:$http_referer,value:求和(sum)}

Browser: 用户的访问设备统计

  • Map: {key:$http_user_agent,value:1}
  • Reduce: {key:$http_user_agent,value:求和(sum)}

4.架构设计:日志KPI系统架构

hadoop-kpi-architect

上图中,左边是Application业务系统,右边是Hadoop的HDFS, MapReduce。

  1. 日志是由业务系统产生的,我们可以设置web服务器每天产生一个新的目录,目录下面会产生多个日志文件,每个日志文件64M。
  2. 设置系统定时器CRON,夜间在0点后,向HDFS导入昨天的日志文件。
  3. 完成导入后,设置系统定时器,启动MapReduce程序,提取并计算统计指标。
  4. 完成计算后,设置系统定时器,从HDFS导出统计指标数据到数据库,方便以后的即使查询。

hadoop-kpi-process

上面这幅图,我们可以看得更清楚,数据是如何流动的。蓝色背景的部分是在Hadoop中的,接下来我们的任务就是完成MapReduce的程序实现。

5.程序开发1:用Maven构建Hadoop项目

请参考文章:用Maven构建Hadoop项目

win7的开发环境 和 Hadoop的运行环境 ,在上面文章中已经介绍过了。

我们需要放日志文件,上传的HDFS里/user/hdfs/log_kpi/目录,参考下面的命令操作


~ hadoop fs -mkdir /user/hdfs/log_kpi
~ hadoop fs -copyFromLocal /home/conan/datafiles/access.log.10 /user/hdfs/log_kpi/

我已经把整个MapReduce的实现都放到了github上面:

https://github.com/bsspirit/maven_hadoop_template/releases/tag/kpi_v1

6.程序开发2:MapReduce程序实现

开发流程:

  1. 对日志行的解析
  2. Map函数实现
  3. Reduce函数实现
  4. 启动程序实现

1). 对日志行的解析
新建文件:org.conan.myhadoop.mr.kpi.KPI.java


package org.conan.myhadoop.mr.kpi;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/*
 * KPI Object
 */
public class KPI {
    private String remote_addr;// 记录客户端的ip地址
    private String remote_user;// 记录客户端用户名称,忽略属性"-"
    private String time_local;// 记录访问时间与时区
    private String request;// 记录请求的url与http协议
    private String status;// 记录请求状态;成功是200
    private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
    private String http_referer;// 用来记录从那个页面链接访问过来的
    private String http_user_agent;// 记录客户浏览器的相关信息

    private boolean valid = true;// 判断数据是否合法
    
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("valid:" + this.valid);
        sb.append("\nremote_addr:" + this.remote_addr);
        sb.append("\nremote_user:" + this.remote_user);
        sb.append("\ntime_local:" + this.time_local);
        sb.append("\nrequest:" + this.request);
        sb.append("\nstatus:" + this.status);
        sb.append("\nbody_bytes_sent:" + this.body_bytes_sent);
        sb.append("\nhttp_referer:" + this.http_referer);
        sb.append("\nhttp_user_agent:" + this.http_user_agent);
        return sb.toString();
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return time_local;
    }

    public Date getTime_local_Date() throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        return df.parse(this.time_local);
    }
    
    public String getTime_local_Date_hour() throws ParseException{
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
        return df.format(this.getTime_local_Date());
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }
    
    public String getHttp_referer_domain(){
        if(http_referer.length()<8){ 
            return http_referer;
        }
        
        String str=this.http_referer.replace("\"", "").replace("http://", "").replace("https://", "");
        return str.indexOf("/")>0?str.substring(0, str.indexOf("/")):str;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }

    public static void main(String args[]) {
        String line = "222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] \"GET /images/my.jpg HTTP/1.1\" 200 19939 \"http://www.angularjs.cn/A00n\" \"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36\"";
        System.out.println(line);
        KPI kpi = new KPI();
        String[] arr = line.split(" ");

        kpi.setRemote_addr(arr[0]);
        kpi.setRemote_user(arr[1]);
        kpi.setTime_local(arr[3].substring(1));
        kpi.setRequest(arr[6]);
        kpi.setStatus(arr[8]);
        kpi.setBody_bytes_sent(arr[9]);
        kpi.setHttp_referer(arr[10]);
        kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
        System.out.println(kpi);

        try {
            SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss", Locale.US);
            System.out.println(df.format(kpi.getTime_local_Date()));
            System.out.println(kpi.getTime_local_Date_hour());
            System.out.println(kpi.getHttp_referer_domain());
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }

}

从日志文件中,取一行通过main函数写一个简单的解析测试。

控制台输出:


222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
valid:true
remote_addr:222.68.172.190
remote_user:-
time_local:18/Sep/2013:06:49:57
request:/images/my.jpg
status:200
body_bytes_sent:19939
http_referer:"http://www.angularjs.cn/A00n"
http_user_agent:"Mozilla/5.0 (Windows
2013.09.18:06:49:57
2013091806
www.angularjs.cn

我们看到日志行,被正确的解析成了kpi对象的属性。我们把解析过程,单独封装成一个方法。


    private static KPI parser(String line) {
        System.out.println(line);
        KPI kpi = new KPI();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            kpi.setRemote_addr(arr[0]);
            kpi.setRemote_user(arr[1]);
            kpi.setTime_local(arr[3].substring(1));
            kpi.setRequest(arr[6]);
            kpi.setStatus(arr[8]);
            kpi.setBody_bytes_sent(arr[9]);
            kpi.setHttp_referer(arr[10]);
            
            if (arr.length > 12) {
                kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                kpi.setHttp_user_agent(arr[11]);
            }

            if (Integer.parseInt(kpi.getStatus()) >= 400) {// 大于400,HTTP错误
                kpi.setValid(false);
            }
        } else {
            kpi.setValid(false);
        }
        return kpi;
    }

对map方法,reduce方法,启动方法,我们单独写一个类来实现

下面将分别介绍MapReduce的实现类:

  • PV:org.conan.myhadoop.mr.kpi.KPIPV.java
  • IP: org.conan.myhadoop.mr.kpi.KPIIP.java
  • Time: org.conan.myhadoop.mr.kpi.KPITime.java
  • Browser: org.conan.myhadoop.mr.kpi.KPIBrowser.java

1). PV:org.conan.myhadoop.mr.kpi.KPIPV.java


package org.conan.myhadoop.mr.kpi;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class KPIPV { 

    public static class KPIPVMapper extends MapReduceBase implements Mapper {
        private IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector output, Reporter reporter) throws IOException {
            KPI kpi = KPI.filterPVs(value.toString());
            if (kpi.isValid()) {
                word.set(kpi.getRequest());
                output.collect(word, one);
            }
        }
    }

    public static class KPIPVReducer extends MapReduceBase implements Reducer {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        String input = "hdfs://192.168.1.210:9000/user/hdfs/log_kpi/";
        String output = "hdfs://192.168.1.210:9000/user/hdfs/log_kpi/pv";

        JobConf conf = new JobConf(KPIPV.class);
        conf.setJobName("KPIPV");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(KPIPVMapper.class);
        conf.setCombinerClass(KPIPVReducer.class);
        conf.setReducerClass(KPIPVReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
        System.exit(0);
    }
}

在程序中会调用KPI类的方法

KPI kpi = KPI.filterPVs(value.toString());

通过filterPVs方法,我们可以实现对PV,更多的控制。

在KPK.java中,增加filterPVs方法


    /**
     * 按page的pv分类
     */
    public static KPI filterPVs(String line) {
        KPI kpi = parser(line);
        Set pages = new HashSet();
        pages.add("/about");
        pages.add("/black-ip-list/");
        pages.add("/cassandra-clustor/");
        pages.add("/finance-rhive-repurchase/");
        pages.add("/hadoop-family-roadmap/");
        pages.add("/hadoop-hive-intro/");
        pages.add("/hadoop-zookeeper-intro/");
        pages.add("/hadoop-mahout-roadmap/");

        if (!pages.contains(kpi.getRequest())) {
            kpi.setValid(false);
        }
        return kpi;
    }

在filterPVs方法,我们定义了一个pages的过滤,就是只对这个页面进行PV统计。

我们运行一下KPIPV.java


2013-10-9 11:53:28 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-9 11:53:28 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-9 11:53:28 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/log_kpi/access.log.10:0+3025757
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/log_kpi/access.log.10:0+3025757
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-9 11:53:30 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-9 11:53:30 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 213 bytes
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2013-10-9 11:53:30 org.apache.hadoop.mapred.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/log_kpi/pv
2013-10-9 11:53:31 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2013-10-9 11:53:33 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-9 11:53:33 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2013-10-9 11:53:34 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-9 11:53:34 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息: Counters: 20
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=3025757
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=183
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=545
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=6051514
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=83472
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=183
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=217
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map input records=14619
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=16
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=2004
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=376569856
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map input bytes=3025757
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=110
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=76
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map output records=76

用hadoop命令查看HDFS文件


~ hadoop fs -cat /user/hdfs/log_kpi/pv/part-00000

/about  5
/black-ip-list/ 2
/cassandra-clustor/     3
/finance-rhive-repurchase/      13
/hadoop-family-roadmap/ 13
/hadoop-hive-intro/     14
/hadoop-mahout-roadmap/ 20
/hadoop-zookeeper-intro/        6

这样我们就得到了,刚刚日志文件中的,指定页面的PV值。

指定页面,就像网站的站点地图一样,如果没有指定所有访问链接都会被找出来,通过“站点地图”的指定,我们可以更容易地找到,我们所需要的信息。

后面,其他的统计指标的提取思路,和PV的实现过程都是类似的,大家可以直接下载源代码,运行看到结果!!

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-mapreduce-log-kpi/

打赏作者

用Maven构建Hadoop项目

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-maven-eclipse/

hadoop-maven

前言

Hadoop的MapReduce环境是一个复杂的编程环境,所以我们要尽可能地简化构建MapReduce项目的过程。Maven是一个很不错的自动化项目构建工具,通过Maven来帮助我们从复杂的环境配置中解脱出来,从而标准化开发过程。所以,写MapReduce之前,让我们先花点时间把刀磨快!!当然,除了Maven还有其他的选择Gradle(推荐), Ivy….

后面将会有介绍几篇MapReduce开发的文章,都要依赖于本文中Maven的构建的MapReduce环境。

目录

  1. Maven介绍
  2. Maven安装(win)
  3. Hadoop开发环境介绍
  4. 用Maven构建Hadoop环境
  5. MapReduce程序开发
  6. 模板项目上传github

1. Maven介绍

Apache Maven,是一个Java的项目管理及自动构建工具,由Apache软件基金会所提供。基于项目对象模型(缩写:POM)概念,Maven利用一个中央信息片断能管理一个项目的构建、报告和文档等步骤。曾是Jakarta项目的子项目,现为独立Apache项目。

maven的开发者在他们开发网站上指出,maven的目标是要使得项目的构建更加容易,它把编译、打包、测试、发布等开发过程中的不同环节有机的串联了起来,并产生一致的、高质量的项目信息,使得项目成员能够及时地得到反馈。maven有效地支持了测试优先、持续集成,体现了鼓励沟通,及时反馈的软件开发理念。如果说Ant的复用是建立在”拷贝–粘贴”的基础上的,那么Maven通过插件的机制实现了项目构建逻辑的真正复用。

2. Maven安装(win)

下载Maven:http://maven.apache.org/download.cgi

下载最新的xxx-bin.zip文件,在win上解压到 D:\toolkit\maven3

并把maven/bin目录设置在环境变量PATH:

win7-maven

然后,打开命令行输入mvn,我们会看到mvn命令的运行效果


~ C:\Users\Administrator>mvn
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.086s
[INFO] Finished at: Mon Sep 30 18:26:58 CST 2013
[INFO] Final Memory: 2M/179M
[INFO] ------------------------------------------------------------------------
[ERROR] No goals have been specified for this build. You must specify a valid lifecycle phase or a goal in the format : or :[:]:. Available lifecycle phases are: validate, initialize, generate-sources, process-sources, generate-resources, process-resources, compile, process-class
es, generate-test-sources, process-test-sources, generate-test-resources, process-test-resources, test-compile, process-test-classes, test, prepare-package, package, pre-integration-test, integration-test, post-integration-test, verify, install, deploy, pre-clean, clean, post-clean, pre-site, site, post-site, site-deploy. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/NoGoalSpecifiedException

安装Eclipse的Maven插件:Maven Integration for Eclipse

Maven的Eclipse插件配置

eclipse-maven

3. Hadoop开发环境介绍

hadoop-dev

如上图所示,我们可以选择在win中开发,也可以在linux中开发,本地启动Hadoop或者远程调用Hadoop,标配的工具都是Maven和Eclipse。

Hadoop集群系统环境:

  • Linux: Ubuntu 12.04.2 LTS 64bit Server
  • Java: 1.6.0_29
  • Hadoop: hadoop-1.0.3,单节点,IP:192.168.1.210

4. 用Maven构建Hadoop环境

  • 1. 用Maven创建一个标准化的Java项目
  • 2. 导入项目到eclipse
  • 3. 增加hadoop依赖,修改pom.xml
  • 4. 下载依赖
  • 5. 从Hadoop集群环境下载hadoop配置文件
  • 6. 配置本地host

1). 用Maven创建一个标准化的Java项目


~ D:\workspace\java>mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DgroupId=org.conan.myhadoop.mr
-DartifactId=myHadoop -DpackageName=org.conan.myhadoop.mr -Dversion=1.0-SNAPSHOT -DinteractiveMode=false
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom <<<
[INFO]
[INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Batch mode
[INFO] No archetype defined. Using maven-archetype-quickstart (org.apache.maven.archetypes:maven-archetype-quickstart:1.
0)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archet
ype-quickstart-1.0.jar
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archety
pe-quickstart-1.0.jar (5 KB at 4.3 KB/sec)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archet
ype-quickstart-1.0.pom
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archety
pe-quickstart-1.0.pom (703 B at 1.6 KB/sec)
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Old (1.x) Archetype: maven-archetype-quickstart:1.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.conan.myhadoop.mr
[INFO] Parameter: packageName, Value: org.conan.myhadoop.mr
[INFO] Parameter: package, Value: org.conan.myhadoop.mr
[INFO] Parameter: artifactId, Value: myHadoop
[INFO] Parameter: basedir, Value: D:\workspace\java
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] project created from Old (1.x) Archetype in dir: D:\workspace\java\myHadoop
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 8.896s
[INFO] Finished at: Sun Sep 29 20:57:07 CST 2013
[INFO] Final Memory: 9M/179M
[INFO] ------------------------------------------------------------------------

进入项目,执行mvn命令


~ D:\workspace\java>cd myHadoop
~ D:\workspace\java\myHadoop>mvn clean install
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ myHadoop ---
[INFO] Building jar: D:\workspace\java\myHadoop\target\myHadoop-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ myHadoop ---
[INFO] Installing D:\workspace\java\myHadoop\target\myHadoop-1.0-SNAPSHOT.jar to C:\Users\Administrator\.m2\repository\o
rg\conan\myhadoop\mr\myHadoop\1.0-SNAPSHOT\myHadoop-1.0-SNAPSHOT.jar
[INFO] Installing D:\workspace\java\myHadoop\pom.xml to C:\Users\Administrator\.m2\repository\org\conan\myhadoop\mr\myHa
doop\1.0-SNAPSHOT\myHadoop-1.0-SNAPSHOT.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.348s
[INFO] Finished at: Sun Sep 29 20:58:43 CST 2013
[INFO] Final Memory: 11M/179M
[INFO] ------------------------------------------------------------------------

2). 导入项目到eclipse

我们创建好了一个基本的maven项目,然后导入到eclipse中。 这里我们最好已安装好了Maven的插件。

hadoop-eclipse

3). 增加hadoop依赖

这里我使用hadoop-1.0.3版本,修改文件:pom.xml


~ vi pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.conan.myhadoop.mr</groupId>
<artifactId>myHadoop</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>myHadoop</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.3</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

4). 下载依赖

下载依赖:

~ mvn clean install

在eclipse中刷新项目:

hadoop-eclipse-maven

项目的依赖程序,被自动加载的库路径下面。

5). 从Hadoop集群环境下载hadoop配置文件

    • core-site.xml
    • hdfs-site.xml
    • mapred-site.xml

查看core-site.xml


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://master:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/conan/hadoop/tmp</value>
</property>
<property>
<name>io.sort.mb</name>
<value>256</value>
</property>
</configuration>

查看hdfs-site.xml


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>dfs.data.dir</name>
<value>/home/conan/hadoop/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>

查看mapred-site.xml


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>hdfs://master:9001</value>
</property>
</configuration>

保存在src/main/resources/hadoop目录下面

hadoop-config

删除原自动生成的文件:App.java和AppTest.java

6).配置本地host,增加master的域名指向


~ vi c:/Windows/System32/drivers/etc/hosts

192.168.1.210 master

6. MapReduce程序开发

编写一个简单的MapReduce程序,实现wordcount功能。

新一个Java文件:WordCount.java


package org.conan.myhadoop.mr;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class WordCountMapper extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }

        }
    }

    public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        String input = "hdfs://192.168.1.210:9000/user/hdfs/o_t_account";
        String output = "hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result";

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(WordCountMapper.class);
        conf.setCombinerClass(WordCountReducer.class);
        conf.setReducerClass(WordCountReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
        System.exit(0);
    }

}

启动Java APP.

控制台错误


2013-9-30 19:25:02 org.apache.hadoop.util.NativeCodeLoader 
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-9-30 19:25:02 org.apache.hadoop.security.UserGroupInformation doAs
严重: PriviledgedActionException as:Administrator cause:java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator1702422322\.staging to 0700
Exception in thread "main" java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator1702422322\.staging to 0700
	at org.apache.hadoop.fs.FileUtil.checkReturnValue(FileUtil.java:689)
	at org.apache.hadoop.fs.FileUtil.setPermission(FileUtil.java:662)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:509)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:344)
	at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:189)
	at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:116)
	at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:856)
	at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
	at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
	at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824)
	at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261)
	at org.conan.myhadoop.mr.WordCount.main(WordCount.java:78)

这个错误是win中开发特有的错误,文件权限问题,在Linux下可以正常运行。

解决方法是,修改/hadoop-1.0.3/src/core/org/apache/hadoop/fs/FileUtil.java文件

688-692行注释,然后重新编译源代码,重新打一个hadoop.jar的包。


685 private static void checkReturnValue(boolean rv, File p,
686                                        FsPermission permission
687                                        ) throws IOException {
688     /*if (!rv) {
689       throw new IOException("Failed to set permissions of path: " + p +
690                             " to " +
691                             String.format("%04o", permission.toShort()));
692     }*/
693   }

我这里自己打了一个hadoop-core-1.0.3.jar包,放到了lib下面。

我们还要替换maven中的hadoop类库。


~ cp lib/hadoop-core-1.0.3.jar C:\Users\Administrator\.m2\repository\org\apache\hadoop\hadoop-core\1.0.3\hadoop-core-1.0.3.jar

再次启动Java APP,控制台输出:


2013-9-30 19:50:49 org.apache.hadoop.util.NativeCodeLoader 
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-9-30 19:50:49 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-9-30 19:50:49 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2013-9-30 19:50:49 org.apache.hadoop.io.compress.snappy.LoadSnappy 
警告: Snappy native library not loaded
2013-9-30 19:50:49 org.apache.hadoop.mapred.FileInputFormat listStatus
信息: Total input paths to process : 4
2013-9-30 19:50:50 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
2013-9-30 19:50:50 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:50 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-9-30 19:50:51 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 0% reduce 0%
2013-9-30 19:50:53 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00003:0+119
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2013-9-30 19:50:54 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2013-9-30 19:50:56 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00000:0+113
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000001_0' done.
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000002_0 is done. And is in the process of commiting
2013-9-30 19:50:59 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00001:0+110
2013-9-30 19:50:59 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00001:0+110
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000002_0' done.
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000003_0 is done. And is in the process of commiting
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00002:0+79
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000003_0' done.
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 4 sorted segments
2013-9-30 19:51:02 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 4 segments left of total size: 442 bytes
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2013-9-30 19:51:02 org.apache.hadoop.mapred.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result
2013-9-30 19:51:05 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-9-30 19:51:05 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2013-9-30 19:51:06 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-9-30 19:51:06 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息: Counters: 20
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=421
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=348
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=7377
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=1535
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=209510
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=348
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=458
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map input records=11
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=30
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=509
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1838546944
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map input bytes=421
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=452
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=22
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=15
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=13
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=15
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=13
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output records=22

成功运行了wordcount程序,通过命令我们查看输出结果


~ hadoop fs -ls hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result

Found 2 items
-rw-r--r--   3 Administrator supergroup          0 2013-09-30 19:51 /user/hdfs/o_t_account/result/_SUCCESS
-rw-r--r--   3 Administrator supergroup        348 2013-09-30 19:51 /user/hdfs/o_t_account/result/part-00000

~ hadoop fs -cat hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result/part-00000

1,abc@163.com,2013-04-22        1
10,ade121@sohu.com,2013-04-23   1
11,addde@sohu.com,2013-04-23    1
17:21:24.0      5
2,dedac@163.com,2013-04-22      1
20:21:39.0      6
3,qq8fed@163.com,2013-04-22     1
4,qw1@163.com,2013-04-22        1
5,af3d@163.com,2013-04-22       1
6,ab34@163.com,2013-04-22       1
7,q8d1@gmail.com,2013-04-23     1
8,conan@gmail.com,2013-04-23    1
9,adeg@sohu.com,2013-04-23      1

这样,我们就实现了在win7中的开发,通过Maven构建Hadoop依赖环境,在Eclipse中开发MapReduce的程序,然后运行JavaAPP。Hadoop应用会自动把我们的MR程序打成jar包,再上传的远程的hadoop环境中运行,返回日志在Eclipse控制台输出。

7. 模板项目上传github

https://github.com/bsspirit/maven_hadoop_template

大家可以下载这个项目,做为开发的起点。

~ git clone https://github.com/bsspirit/maven_hadoop_template.git

我们完成第一步,下面就将正式进入MapReduce开发实践。

 

转载请注明出处:
http://blog.fens.me/hadoop-maven-eclipse/

打赏作者

解读Nodejs多核处理模块cluster

从零开始nodejs系列文章,将介绍如何利Javascript做为服务端脚本,通过Nodejs框架web开发。Nodejs框架是基于V8的引擎,是目前速度最快的Javascript引擎。chrome浏览器就基于V8,同时打开20-30个网页都很流畅。Nodejs标准的web开发框架Express,可以帮助我们迅速建立web站点,比起PHP的开发效率更高,而且学习曲线更低。非常适合小型网站,个性化网站,我们自己的Geek网站!!

关于作者

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/nodejs-core-cluster/

nodejs-cluster

前言
大家都知道nodejs是一个单进程单线程的服务器引擎,不管有多么的强大硬件,只能利用到单个CPU进行计算。所以,有人开发了第三方的cluster,让node可以利用多核CPU实现并行。

随着nodejs的发展,让nodejs上生产环境,就必须是支持多进程多核处理!在V0.6.0版本,Nodejs内置了cluster的特性。自此,Nodejs终于可以作为一个独立的应用开发解决方案,映入大家眼帘了。

目录

  1. cluster介绍
  2. cluster的简单使用
  3. cluster的工作原理
  4. cluster的API
  5. master和worker的通信
  6. 用cluster实现负载均衡(Load Balance) — win7失败
  7. 用cluster实现负载均衡(Load Balance) — ubuntu成功
  8. cluster负载均衡策略的测试

1. cluster介绍

cluster是一个nodejs内置的模块,用于nodejs多核处理。cluster模块,可以帮助我们简化多进程并行化程序的开发难度,轻松构建一个用于负载均衡的集群。

2. cluster的简单使用

我的系统环境

  • win7 64bit
  • Nodejs:v0.10.5
  • Npm:1.2.19

在win的环境中,我们通过cluster启动多核的node提供web服务。

新建工程目录:


~ D:\workspace\javascript>mkdir nodejs-cluster && cd nodejs-cluster

新建文件:app.js


~ vi app.js

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log("master start...");

    // Fork workers.
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    cluster.on('listening',function(worker,address){
        console.log('listening: worker ' + worker.process.pid +', Address: '+address.address+":"+address.port);
    });

    cluster.on('exit', function(worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
    });
} else {
    http.createServer(function(req, res) {
        res.writeHead(200);
        res.end("hello world\n");
    }).listen(0);
}

在控制台启动node程序


~ D:\workspace\javascript\nodejs-cluster>node app.js

master start...
listening: worker 2368, Address: 0.0.0.0:57132
listening: worker 1880, Address: 0.0.0.0:57132
listening: worker 1384, Address: 0.0.0.0:57132
listening: worker 1652, Address: 0.0.0.0:57132

master是总控节点,worker是运行节点。然后根据CPU的数量,启动worker。我本地是双核双通道的CPU,所以被检测为4核,启动了4个worker。

3. cluster的工作原理

每个worker进程通过使用child_process.fork()函数,基于IPC(Inter-Process Communication,进程间通信),实现与master进程间通信。

当worker使用server.listen(...)函数时 ,将参数序列传递给master进程。如果master进程已经匹配workers,会将传递句柄给工人。如果master没有匹配好worker,那么会创建一个worker,再传递并句柄传递给worker。

在边界条件,有3个有趣的行为:
注:下面server.listen(),是对底层“http.Server-->net.Server”类的调用。

  • 1. server.listen({fd: 7}):在master和worker通信过程,通过传递文件,master会监听“文件描述为7”,而不是传递“文件描述为7”的引用。
  • 2. server.listen(handle):master和worker通信过程,通过handle函数进行通信,而不用进程联系
  • 3. server.listen(0):在master和worker通信过程,集群中的worker会打开一个随机端口共用,通过socket通信,像上例中的57132

当多个进程都在 accept() 同样的资源的时候,操作系统的负载均衡非常高效。Node.js没有路由逻辑,worker之间没有共享状态。所以,程序要设计得简单一些,比如基于内存的session。

因为workers都是独力运行的,根据程序的需要,它们可以被独立删除或者重启,worker并不相互影响。只要还有workers存活,则master将继续接收连接。Node不会自动维护workers的数目。我们可以建立自己的连接池。

4. cluster的API

官网地址:http://nodejs.org/api/cluster.html#cluster_cluster

cluster对象
cluster的各种属性和函数

  • cluster.setttings:配置集群参数对象
  • cluster.isMaster:判断是不是master节点
  • cluster.isWorker:判断是不是worker节点
  • Event: 'fork': 监听创建worker进程事件
  • Event: 'online': 监听worker创建成功事件
  • Event: 'listening': 监听worker向master状态事件
  • Event: 'disconnect': 监听worker断线事件
  • Event: 'exit': 监听worker退出事件
  • Event: 'setup': 监听setupMaster事件
  • cluster.setupMaster([settings]): 设置集群参数
  • cluster.fork([env]): 创建worker进程
  • cluster.disconnect([callback]): 关闭worket进程
  • cluster.worker: 获得当前的worker对象
  • cluster.workers: 获得集群中所有存活的worker对象

worker对象
worker的各种属性和函数:可以通过cluster.workers, cluster.worket获得。

  • worker.id: 进程ID号
  • worker.process: ChildProcess对象
  • worker.suicide: 在disconnect()后,判断worker是否自杀
  • worker.send(message, [sendHandle]): master给worker发送消息。注:worker给发master发送消息要用process.send(message)
  • worker.kill([signal='SIGTERM']): 杀死指定的worker,别名destory()
  • worker.disconnect(): 断开worker连接,让worker自杀
  • Event: 'message': 监听master和worker的message事件
  • Event: 'online': 监听指定的worker创建成功事件
  • Event: 'listening': 监听master向worker状态事件
  • Event: 'disconnect': 监听worker断线事件
  • Event: 'exit': 监听worker退出事件

5. master和worker的通信

实现cluster的API,让master和worker相互通信。

新建文件: cluster.js


~ vi cluster.js

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log('[master] ' + "start master...");

    for (var i = 0; i < numCPUs; i++) {
        var wk = cluster.fork();
        wk.send('[master] ' + 'hi worker' + wk.id);
    }

    cluster.on('fork', function (worker) {
        console.log('[master] ' + 'fork: worker' + worker.id);
    });

    cluster.on('online', function (worker) {
        console.log('[master] ' + 'online: worker' + worker.id);
    });

    cluster.on('listening', function (worker, address) {
        console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
    });

    cluster.on('disconnect', function (worker) {
        console.log('[master] ' + 'disconnect: worker' + worker.id);
    });

    cluster.on('exit', function (worker, code, signal) {
        console.log('[master] ' + 'exit worker' + worker.id + ' died');
    });

    function eachWorker(callback) {
        for (var id in cluster.workers) {
            callback(cluster.workers[id]);
        }
    }

    setTimeout(function () {
        eachWorker(function (worker) {
            worker.send('[master] ' + 'send message to worker' + worker.id);
        });
    }, 3000);

    Object.keys(cluster.workers).forEach(function(id) {
        cluster.workers[id].on('message', function(msg){
            console.log('[master] ' + 'message ' + msg);
        });
    });

} else if (cluster.isWorker) {
    console.log('[worker] ' + "start worker ..." + cluster.worker.id);

    process.on('message', function(msg) {
        console.log('[worker] '+msg);
        process.send('[worker] worker'+cluster.worker.id+' received!');
    });

    http.createServer(function (req, res) {
            res.writeHead(200, {"content-type": "text/html"});
            res.end('worker'+cluster.worker.id+',PID:'+process.pid);
    }).listen(3000);

}

控制台日志:


~ D:\workspace\javascript\nodejs-cluster>node cluster.js

[master] start master...
[worker] start worker ...1
[worker] [master] hi worker1
[worker] start worker ...2
[worker] [master] hi worker2
[master] fork: worker1
[master] fork: worker2
[master] fork: worker3
[master] fork: worker4
[master] online: worker1
[master] online: worker2
[master] message [worker] worker1 received!
[master] message [worker] worker2 received!
[master] listening: worker1,pid:6068, Address:0.0.0.0:3000
[master] listening: worker2,pid:1408, Address:0.0.0.0:3000
[master] online: worker3
[worker] start worker ...3
[worker] [master] hi worker3
[master] message [worker] worker3 received!
[master] listening: worker3,pid:3428, Address:0.0.0.0:3000
[master] online: worker4
[worker] start worker ...4
[worker] [master] hi worker4
[master] message [worker] worker4 received!
[master] listening: worker4,pid:6872, Address:0.0.0.0:3000
[worker] [master] send message to worker1
[worker] [master] send message to worker2
[worker] [master] send message to worker3
[worker] [master] send message to worker4
[master] message [worker] worker1 received!
[master] message [worker] worker2 received!
[master] message [worker] worker3 received!
[master] message [worker] worker4 received!

6. 用cluster实现负载均衡(Load Balance) -- win7失败

新建文件: server.js


~ vi server.js

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log('[master] ' + "start master...");

    for (var i = 0; i < numCPUs; i++) {
         cluster.fork();
    }

    cluster.on('listening', function (worker, address) {
        console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
    });

} else if (cluster.isWorker) {
    console.log('[worker] ' + "start worker ..." + cluster.worker.id);
    http.createServer(function (req, res) {
        console.log('worker'+cluster.worker.id);
        res.end('worker'+cluster.worker.id+',PID:'+process.pid);
    }).listen(3000);
}

启动服务器:


~ D:\workspace\javascript\nodejs-cluster>node server.js
[master] start master...
[worker] start worker ...1
[worker] start worker ...2
[master] listening: worker1,pid:1536, Address:0.0.0.0:3000
[master] listening: worker2,pid:5920, Address:0.0.0.0:3000
[worker] start worker ...3
[master] listening: worker3,pid:7156, Address:0.0.0.0:3000
[worker] start worker ...4
[master] listening: worker4,pid:2868, Address:0.0.0.0:3000
worker4
worker4
worker4
worker4
worker4
worker4
worker4
worker4

用curl工具访问


C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868
C:\Users\Administrator>curl localhost:3000
worker4,PID:2868

我们发现了cluster在win中的bug,只用到worker4。果断切换到Linux测试。

7. 用cluster实现负载均衡(Load Balance) -- ubuntu成功

Linux的系统环境

  • Linux: Ubuntu 12.04.2 64bit Server
  • Node: v0.11.2
  • Npm: 1.2.21

构建项目:不多解释


~ cd :/home/conan/nodejs/
~ mkdir nodejs-cluster && cd nodejs-cluster
~ vi server.js

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log('[master] ' + "start master...");

    for (var i = 0; i < numCPUs; i++) {
         cluster.fork();
    }

    cluster.on('listening', function (worker, address) {
        console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
    });

} else if (cluster.isWorker) {
    console.log('[worker] ' + "start worker ..." + cluster.worker.id);
    http.createServer(function (req, res) {
        console.log('worker'+cluster.worker.id);
        res.end('worker'+cluster.worker.id+',PID:'+process.pid);
    }).listen(3000);
}

启动服务器


conan@conan-deskop:~/nodejs/nodejs-cluster$ node server.js
[master] start master...
[worker] start worker ...1
[master] listening: worker1,pid:2925, Address:0.0.0.0:3000
[worker] start worker ...3
[master] listening: worker3,pid:2931, Address:0.0.0.0:3000
[worker] start worker ...4
[master] listening: worker4,pid:2932, Address:0.0.0.0:3000
[worker] start worker ...2
[master] listening: worker2,pid:2930, Address:0.0.0.0:3000
worker4
worker2
worker1
worker3
worker4
worker2
worker1

用curl工具访问


C:\Users\Administrator>curl 192.168.1.20:3000
worker4,PID:2932
C:\Users\Administrator>curl 192.168.1.20:3000
worker2,PID:2930
C:\Users\Administrator>curl 192.168.1.20:3000
worker1,PID:2925
C:\Users\Administrator>curl 192.168.1.20:3000
worker3,PID:2931
C:\Users\Administrator>curl 192.168.1.20:3000
worker4,PID:2932
C:\Users\Administrator>curl 192.168.1.20:3000
worker2,PID:2930
C:\Users\Administrator>curl 192.168.1.20:3000
worker1,PID:2925

在Linux环境中,cluster是运行正确的!!!

8. cluster负载均衡策略的测试

我们在Linux下面,完成测试,用过测试软件: siege

安装siege

~ sudo apt-get install siege

启动node cluster

~ node server.js > server.log

运行siege启动命令,每秒50个并发请求。


~ sudo siege -c 50 http://localhost:3000

HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.01 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.01 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.02 secs:      16 bytes ==> /
HTTP/1.1 200   0.00 secs:      16 bytes ==> /
HTTP/1.1 200   0.02 secs:      16 bytes ==> /
HTTP/1.1 200   0.01 secs:      16 bytes ==> /
HTTP/1.1 200   0.01 secs:      16 bytes ==> /
.....

^C
Lifting the server siege...      done.                                                                Transactions:                    3760 hits
Availability:                 100.00 %
Elapsed time:                  39.66 secs
Data transferred:               0.06 MB
Response time:                  0.01 secs
Transaction rate:              94.81 trans/sec
Throughput:                     0.00 MB/sec
Concurrency:                    1.24
Successful transactions:        3760
Failed transactions:               0
Longest transaction:            0.20
Shortest transaction:           0.00

FILE: /var/siege.log
You can disable this annoying message by editing
the .siegerc file in your home directory; change
the directive 'show-logfile' to false.

我们统计结果,执行3760次请求,消耗39.66秒,每秒处理94.81次请求。

查看server.log文件,


~  ls -l
total 64
-rw-rw-r-- 1 conan conan   756  9月 28 15:48 server.js
-rw-rw-r-- 1 conan conan 50313  9月 28 16:26 server.log

~ tail server.log
worker4
worker1
worker2
worker4
worker1
worker2
worker4
worker3
worker2
worker1

最后,用R语言分析一下:server.log


~ R 

> df<-read.table(file="server.log",skip=9,header=FALSE)
> summary(df)
       V1
 worker1:1559
 worker2:1579
 worker3:1570
 worker4:1535

我们看到,请求被分配到worker数据量相当。所以,cluster的负载均衡的策略,应该是随机分配的。

好了,我们又学了一个很有用的技能!利用cluster可以构建出多核应用,充分的利用多CPU带业的性能吧!!

转载请注明出处:
http://blog.fens.me/nodejs-core-cluster/

打赏作者

Nodejs基础中间件Connect

从零开始nodejs系列文章,将介绍如何利Javascript做为服务端脚本,通过Nodejs框架web开发。Nodejs框架是基于V8的引擎,是目前速度最快的Javascript引擎。chrome浏览器就基于V8,同时打开20-30个网页都很流畅。Nodejs标准的web开发框架Express,可以帮助我们迅速建立web站点,比起PHP的开发效率更高,而且学习曲线更低。非常适合小型网站,个性化网站,我们自己的Geek网站!!

关于作者

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:

http://blog.fens.me/nodejs-connect/

connect

前言

“中间件”在软件领域是一个非常广的概念,除操作系统的软件都可以称为中间件,比如,消息中间件,ESB中间件,日志中间件,数据库中间件等等。

Connect被定义为Node平台的中间件框架,从定位上看Connect一定是出众的,广泛兼容的,稳定的,基础的平台性框架。如果攻克Connect,会有助于我们更了解Node的世界。Express就是基于Connect开发的。

让我们开始探索Connect中间件。

目录

  1. Connect介绍
  2. Connect安装
  3. Connect内置中间件介绍
  4. logger
  5. cookieParser
  6. session
  7. cookieSession
  8. compress
  9. basicAuth
  10. bodyParser
  11. json
  12. urlencoded
  13. multipart
  14. timeout
  15. reponseTime
  16. methodOverride
  17. csrf
  18. static
  19. staticCache
  20. directory
  21. vhost
  22. favicon
  23. limit
  24. query
  25. errorHadnler

1. Connect介绍

Connect是一个node中间件(middleware)框架。如果把一个http处理过程比作是污水处理,中间件就像是一层层的过滤网。每个中间件在http处理过程中通过改写request或(和)response的数据、状态,实现了特定的功能。这些功能非常广泛,下图列出了connect所有内置中间件和部分第三方中间件。 这里能看到完整的中间件列表

下图根据中间件在整个http处理流程的位置,将中间件大致分为3类:

  • 1. Pre-Request 通常用来改写request的原始数据
  • 2. Request/Response 大部分中间件都在这里,功能各异
  • 3. Post-Response 全局异常处理,改写response数据等

connect-middleware

关于Connect介绍部分,摘自:http://www.cnblogs.com/luics/archive/2012/11/28/2775206.html

2. Connect安装

我的系统环境

  • win7 64bit
  • Nodejs:v0.10.5
  • Npm:1.2.19

通过nodejs安装Connect


~ D:\workspace\javascript>mkdir nodejs-connect && cd nodejs-connect
~ D:\workspace\javascript\nodejs-connect> npm install connect
connect@2.9.0 node_modules\connect
├── methods@0.0.1
├── uid2@0.0.2
├── pause@0.0.1
├── cookie-signature@1.0.1
├── fresh@0.2.0
├── qs@0.6.5
├── bytes@0.2.0
├── buffer-crc32@0.2.1
├── cookie@0.1.0
├── debug@0.7.2
├── send@0.1.4 (range-parser@0.0.4, mime@1.2.11)
└── multiparty@2.1.8 (stream-counter@0.1.0, readable-stream@1.0.17)

尝试做一个最简单的web服务器

增加一个文件:app.js


var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(function (req, res) {
        res.end('hello world\n');
    })
    .listen(3000);

启动node


~ D:\workspace\javascript\nodejs-connect>node app.js
GET / 200 5ms
GET /favicon.ico 200 0ms

打开浏览器:http://localhost:3000/

connect-web

3. Connect内置中间件介绍

22个内置中间件列表

下面将分别介绍这22个中间件。

4. logger

描述:用来输出用户请求日志。

参数:options或者format字符串

options:

  • format:参考下面的tokens
  • stream:输出流,默认是stdout
  • buffer:缓冲时间,默认是1000ms
  • immediate:立刻打印日志

tokens: format格式

  • :req[header] ex: :req[Accept]
  • :res[header] ex: :res[Content-Length]
  • :http-version
  • :response-time
  • :remote-addr
  • :date
  • :method
  • :url
  • :referrer
  • :user-agent
  • :status

Formats:缩写

  • default ‘:remote-addr – – [:date] “:method :url HTTP/:http-version” :status :res[content-length] “:referrer” “:user-agent”‘
  • short ‘:remote-addr – :method :url HTTP/:http-version :status :res[content-length] – :response-time ms’
  • tiny ‘:method :url :status :res[content-length] – :response-time ms’
  • dev concise output colored by response status for development use

例子:新建logger.js


var connect = require('connect');
var app = connect()
    .use(connect.logger())
    .use(function (req, res) {
        res.end('hello world\n');
    })
    .listen(3000);

connect.logger()


127.0.0.1 - - [Mon, 23 Sep 2013 05:14:18 GMT] "GET / HTTP/1.1" 200 - "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKi
t/537.36 (KHTML, like Gecko) Chrome/28.0.1500.95 Safari/537.36"
127.0.0.1 - - [Mon, 23 Sep 2013 05:14:18 GMT] "GET /favicon.ico HTTP/1.1" 200 - "-" "Mozilla/5.0 (Windows NT 6.1; WOW64)
 AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.95 Safari/537.36"

connect.logger(‘short’)


127.0.0.1 - GET / HTTP/1.1 200 - - 9 ms
127.0.0.1 - GET /favicon.ico HTTP/1.1 200 - - 1 ms

connect.logger(‘dev’)


GET / 200 5ms
GET /favicon.ico 200 1ms

connect.logger(function(tokens, req, res){ return ‘some format string’ })


some format string
some format string

所以在开发环境,我们日志设置成dev就好了。

5. cookieParser

描述:cookie解析中间件,解析Cookies的头通过req.cookies得到cookies。还可以通过req.secret加密cookies。

例子:新建cookieParser.js


var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(connect.cookieParser('secret string'))
    .use(function (req, res, next) {
        req.cookies.website="blog.fens.me";
        res.end(JSON.stringify(req.cookies));
    })
    .listen(3000);

connect-cookiesParser

6. session

描述:会话管理中间件

依赖:cookieParser

参数:options

options:

  • key:Cookies名,默认值为connect.sid
  • store: session存储实例
  • secret: session的cookie加密
  • cookie: session的cookie配置,默认值为{path: ‘/’, httpOnly: true, maxAge: null}
  • proxy:安全cookie的反向代理,通过x-forwarded-proto实现

Cookie option:

cookie.maxAge: 默认值null,表示当浏览器关闭后cookie被删除。

例子:新建session.js


var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(connect.cookieParser())
    .use(connect.session({secret: '123', cookie: { maxAge: 60000 }}))
    .use(function (req, res, next) {
        if(req.session.pv){
            res.setHeader('Content-Type', 'text/html');
            res.write('views: ' + req.session.pv);
            req.session.pv++;
            res.end();
        }else{
            req.session.pv = 1;
            res.end('Refresh');
        }

    })
    .listen(3000);

connect-session

7. cookieSession

描述:基于cookies的会话中间件

参数:options:

options:

  • key:Cookies名,默认值为connect.sess
  • secret: 防止cookie窃取
  • cookie: session的cookie配置,默认值为{path: ‘/’, httpOnly: true, maxAge: null}
  • proxy:安全cookie的反向代理,通过x-forwarded-proto实现

Clearing sessions:

req.session = null;

例子:新建cookieSession.js


var connect = require('connect');
var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(connect.cookieParser())
    .use(connect.cookieSession({ secret: 'adddaa!', cookie: { maxAge: 60 * 60 * 1000 }}))
    .use(function (req, res, next) {
        if(req.session.pv){
            res.setHeader('Content-Type', 'text/html');
            res.write('views: ' + req.session.pv );
            req.session.pv++;
            console.log(req.session.pv);
            res.end();
        }else{
            req.session.pv = 1;
            res.end('Refresh');
        }

    })
    .listen(3000);

connect-cookieSession

我们发现,这次不管刷新多少次页面,req.session.pv始终是1.

8. compress

描述:gzip压缩中间件,通过filter函数设置,需要压缩的文件类型。压缩算法为gzip/deflate。

filter函数


exports.filter = function(req, res){
  return /json|text|javascript|dart|image\/svg\+xml|application\/x-font-ttf|application\/vnd\.ms-opentype|application\/vnd\.ms-fontobject/.test(res.getHeader('Content-Type'));
};

Threshold压缩阈值:当响应请求大于阈值,则压缩响应请求。

参数:options

  • chunkSize (default: 16*1024)
  • windowBits
  • level: 0-9 where 0 is no compression, and 9 is slow but best compression
  • memLevel: 1-9 low is slower but uses less memory, high is fast but uses more
  • strategy: compression strategy

例子:新建compress.js


var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(connect.compress({level:9}))
    .use(function (req, res) {
        res.setHeader('Content-Type', 'text/html');
        res.write(res);
        res.end('hello world\n');
    })
    .listen(3000);

connect-compress

9. basicAuth

描述:basic认证中间件,实现简单的用户密码登陆,当用户和密码验证通过后,通过一个callback方法继续执行。

同步验证:


connect()
  .use(connect.basicAuth(function(user, pass){
    return 'tj' == user && 'wahoo' == pass;
  }))

异步验证:


connect()
  .use(connect.basicAuth(function(user, pass, fn){
    User.authenticate({ user: user, pass: pass }, fn);
  }))

例子:新建basicAuth.js


var connect = require('connect');
var app = connect();
app.use(connect.logger('dev'));

//  基本用法
//  app.use(connect.basicAuth('fens','fens'))

//  同步验证
app.use(connect.basicAuth(function (user, pass) {
    var isLogin = 'fens' == user && 'fens' == pass;
    console.log("Login:" + isLogin);
    return isLogin;
}))
app.use(function (req, res) {
    res.end('hello world\n');
})
app.listen(3000);

验证弹出框

connect-basicAuth1

正确输入用户和密码后,访问页面

connect-basicAuth2

10. bodyParser

描述:请求内容解析中间件,支持多种类型application/json,

application/x-www-form-urlencoded, multipart/form-data.

与其他的3个中间件相同:


app.use(connect.json());
app.use(connect.urlencoded());
app.use(connect.multipart());

例子:新建bodyParser.js


var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(connect.bodyParser())
    .use(function(req, res) {
        res.end('req.body=>' + JSON.stringify(req.body));
    })
    .listen(3000);

POST方法:


~ curl -d 'user[name]=tj' http://localhost:3000/
req.body=>{"'user":{"name":"tj'"}}

GET方法:


~ curl http://localhost:3000/?user=123
req.body=>{}

11. json

描述:JSON解析中间件,req.body传值

参数:options

  • strict: when false anything JSON.parse() accepts will be parsed
  • reviver: used as the second “reviver” argument for JSON.parse
  • limit: byte limit [1mb]

同bodyParser。

12. urlencoded

描述:application/x-www-form-urlencode请求解析中间件

参数:options

  • limit: byte limit [1mb]

同bodyParser。

13. multipart

描述:multipart/form-data请求解析中间件,解析req.body和req.files.

上传文件:uploadDir


app.use(connect.multipart({ uploadDir: path }))

参数:options

  • limit: byte limit defaulting to [100mb]
  • defer: 在不等待“结束”事件,通过调用req.form.next()函数,可以缓冲并处理多个表单对象,还可以绑定到“progress”或“events”的事件。

Temporary Files:临时文件

默认情况下,临时文件会被保存在os.tmpDir()目录,但不会自动回归,我们必须手动处理。如果没有使用defer选项时,你可以通过req.files来获得对象的使用。


req.files.images.forEach(function(file){
  console.log('  uploaded : %s %skb : %s', file.originalFilename, file.size / 1024 | 0, file.path);
});

Streaming:流式处理

当使用defer选项时,文件在上传过程中,你可以通过”part”事件和流控制访问文件。


req.form.on('part', function(part){
  // transfer to s3 etc
  console.log('upload %s %s', part.name, part.filename);
  var out = fs.createWriteStream('/tmp/' + part.filename);
  part.pipe(out);
});

req.form.on('close', function(){
  res.end('uploaded!');
});

例子:新建multipart.js


var connect = require('connect');
var app = connect()
.use(connect.logger('dev'))
.use(connect.multipart({ uploadDir: 'd:\\tmp' }))
.use(function (req, res) {
if(req.method=='POST'){
console.log(req.files);
res.end('Upload ==>'+ req.files.file.path);
}
res.setHeader('Content-Type', 'text/html');
res.write('<form enctype="multipart/form-data" method="POST"><input type="file" name="file">');
res.write('<input type="submit" value="submit"/>');
res.write('</form>');
res.end('hello world\n');
})
.listen(3000);

通过form表单选择文件

connect-multipart1

POST请求解析

connect-multipart2

14. timeout

描述:请求超时中间件,默认超时时间是5000ms,可以清除这个时间通过req.clearTimeout()函数。超时的错误,可以通过next()函数传递。我们也可以设置超时的响应错误状态码:.timeout=503

例子:新建timeout.js,模拟超时


var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(connect.timeout(1000))
    .use(function (req, res) {
        setTimeout(function(){
            res.end('hello world\n');
        },5000)
    })
    .listen(3000);

控制台输出:


Error: Response timeout
    at IncomingMessage. (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\middleware\timeout.j
s:39:17)
    at IncomingMessage.EventEmitter.emit (events.js:95:17)
    at null._onTimeout (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\middleware\timeout.js:34:11)
    at Timer.listOnTimeout [as ontimeout] (timers.js:110:15)
GET / 503 1030ms - 389b
Error: Response timeout
    at IncomingMessage. (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\middleware\timeout.j
s:39:17)
    at IncomingMessage.EventEmitter.emit (events.js:95:17)
    at null._onTimeout (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\middleware\timeout.js:34:11)
    at Timer.listOnTimeout [as ontimeout] (timers.js:110:15)
GET /favicon.ico 503 1006ms - 389b

15. reponseTime

描述:计算响应时间中间件,在response的header增加X-Response-Time,计算响应时间。

例子:新建reponseTime.js


var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(connect.responseTime())
    .use(function (req, res) {
        res.end('hello world\n');
    })
    .listen(3000);

connect-responseTime

16. methodOverride

无法模拟出效果,暂时先跳过

描述: 提供伪造HTTP中间件,检查一个method是否被重写,通过传递一个key,得到_method,原始的方法通过req.originalMethod获得。

例子:新建methodOverride.js


var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(connect.methodOverride('KEY'))
    .use(function (req, res) {
        res.end(JSON.stringify(req.headers));
    })
    .listen(3000);

17. csrf

描述:跨域请求csrf保护中间件,通过req.csrfToken()令牌函数绑定到请求的表单字段。这个令牌会对访客会话进行验证。

默认情况会检查通过bodyParser()产生的req.body,query()函数产生的req.query,和X-CSRF-Token的header。

依赖:session(), cookieParser()

参数:options:

  • value: 一个函数接受请求,返回的令牌

例子:新建csrf.js


var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(connect.cookieParser())
    .use(connect.session({secret: '123', cookie: { maxAge: 60000 }}))
    .use(connect.csrf({value:'admin'}))
    .use(function (req, res) {
        res.setHeader('Content-Type', 'text/html');
        res.end('hello world\n');
        console.log(req.csrfToken());
    })
    .listen(3000);

生成的csrf Token:


1YZ72JuGRTOh/mzqByktPoyz2C+Dk1E5wXyj0=
GET / 200 356ms
bItSjAAXydK4jkYYLDnc1c0+7AGDFwGX6r8ns=
GET /favicon.ico 200 3ms

18. static

描述: 静态文件处理中间件,设置root路径作为静态文件服务器

参数:options:

options:

  • maxAge:浏览器缓存存活时间(毫秒),默认值0
  • hidden:是否允许传递隐藏类型的文件,默认值false
  • redirect:是否允许当访问名是一个目录,结尾增加”/”,默认值true
  • index:设置默认的文件名,默认值index.html

例子:新建static.js


var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(connect.static(__dirname+'/static',{maxAge:60*60*1000,hidden:false}))
    .use(function (req, res) {
        res.setHeader('Content-Type', 'text/html');
        res.write('static:');
        res.write('');
        res.write('hidden:');
        res.end('');
    })
    .listen(3000);

隐藏类型的文件,没有被显示出来。

connect-static

19. staticCache

描述: 静态文件缓存中间件,最大可以缓存128个对象,每个对象最大256K,总加起来32mb。

缓存算法采用LRU(最近最少使用)算法,让最活跃的对象保存在缓存中,从而增加命中。

Benchmarks:性能测试

  • static(): 2700 rps
  • node-static: 5300 rps
  • static() + staticCache(): 7500 rps

依赖:static()

参数:options:

  • maxObjects:最多能缓存的对象,默认值128个
  • maxLength:最大缓存的对象,默认值256kb

例子:新建staticCache.js


var connect = require('connect');
var app = connect()
.use(connect.logger('dev'))
.use(connect.static(__dirname+'/static',{maxAge:60*60*1000,hidden:false}))
.use(connect.staticCache())
.use(function (req, res) {
res.setHeader('Content-Type', 'text/html');
res.write('static:');
res.write('<img src="static.png" width="100px"/>');
res.write('hidden:');
res.end('<img src=".png" width="100px"/>');
})
.listen(3000);

控制台日志:


connect.staticCache() is deprecated and will be removed in 3.0
use varnish or similar reverse proxy caches.
GET / 200 11ms
GET /.png 200 0ms

建议用varnish或专门的缓存工具,来代替staticCache()。

20. directory

描述: 目录列表中间件,列出指定目录下的文件

参数:options:

  • hidden:是否显示隐藏文件,默认值false.
  • icons:是否显示网站图标,默认值false.
  • filter:是否过滤文件,默认值false.

例子:新建directory.js

var connect = require('connect');
var app = connect()
    .use(connect.logger('dev'))
    .use(connect.directory(__dirname+'/static',{hidden:true}))
    .use(function (req, res) {
        res.end();
    })
    .listen(3000);

connect-directory

21. vhost

无法模拟出效果,暂时先跳过

描述: 虚拟二级域名映射中间件,设置hostname和server。

例子:新建vhost.js


var connect = require('connect');
var app = connect();
app.use(connect.logger('dev'));
app.use(function (req, res) {
    res.end(JSON.stringify(req.headers.host));
});

var fooApp = connect();
fooApp.use(connect.logger('dev'));
fooApp.use(function (req, res) {
    res.end('hello fooApp\n');
});

var barApp = connect();
barApp.use(connect.logger('dev'));
barApp.use(function (req, res) {
    res.end('hello barApp\n');
});

app.use(connect.vhost('foo.com', fooApp));
app.use(connect.vhost('bar.com', barApp));

app.listen(3000);

22. favicon

描述:网页图标中间件,指定favicon的路径

参数:options:

  • maxAge:缓存存活时间(毫秒),默认值1天

例子:新建favicon.js


var connect = require('connect');
var app = connect()
    .use(connect.favicon('static/favicon.ico'))
    .use(connect.logger('dev'))
    .use(function (req, res) {
        res.end('hello world\n');
    })
    .listen(3000);

网站图标

connect-favicon

23. limit

描述: 请求内容大小限制中间件,可以用mb,kb,gb表示单位。

例子:新建limit.js


var connect = require('connect');
var app = connect()
.use(connect.logger('dev'))
.use(connect.limit('1mb'))
.use(connect.multipart({ uploadDir: 'd:\\tmp' }))
.use(function (req, res) {
if(req.method=='POST'){
console.log(req.files);
res.end('Upload ==>'+ req.files.file.path);
}
res.setHeader('Content-Type', 'text/html');
res.write('<form enctype="multipart/form-data" method="POST"><input type="file" name="file">');
res.write('<input type="submit" value="submit"/>');
res.write('</form>');
res.end('hello world\n');
})
.listen(3000);

控制台日志,上传大于1mb的文件。


GET / 200 8ms
Error: Request Entity Too Large
    at Object.exports.error (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\utils.js:62:13)
    at Object.limit [as handle] (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\middleware\limit.js:46:
47)
    at next (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\proto.js:190:15)
    at Object.logger (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\middleware\logger.js:156:5)
    at next (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\proto.js:190:15)
    at Function.app.handle (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\proto.js:198:3)
    at Server.app (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\connect.js:65:37)
    at Server.EventEmitter.emit (events.js:98:17)
    at HTTPParser.parser.onIncoming (http.js:2027:12)
    at HTTPParser.parserOnHeadersComplete [as onHeadersComplete] (http.js:119:23)
POST / 413 8ms - 961b

24. query

描述:URL解析中间件,自动解析URL查询参数req.query

参数:options,qs.parse函数

例子:新建query.js


var connect = require('connect');
var app = connect()
    .use(connect.query())
    .use(connect.logger('dev'))
    .use(function (req, res) {
        res.end(JSON.stringify(req.query));
    })
    .listen(3000);

通过CURL测试:


curl -d '{name:'aad'}' http://localhost:3000?pass=did
{"pass":"did"}

req.query会自动解析URL的查询参数,不解析POST的数据。

25. errorHadnler

无法模拟出效果,暂时先跳过

描述:错误处理中间件,对于开发过程中的错误,提供栈跟踪和错误响应,接受3种类型text,html,json。

Text: text/plain是默认类型,返回一个简单的栈跟踪和错误消息

JSON:application/json,返回{‘error’:error}对象

HTML: 返回一个HTML错误页面

参数:options:

  • showStack:返回错误信息和错误栈.默认值false
  • showMessage,只返回错误信息,默认值false
  • dumpExceptions, 输出异常日志,默认值false
  • logErrors,输出错误日志到文件,默认值false

例子:新建errorHadnler.js


var connect = require('connect');
var app = connect()
    .use(connect.logger())
    .use(connect.errorHandler({ dumpExceptions: true, showStack: true }))
    .use(function (req, res) {
        req.headers.accept='html';
        res.write(JSON.stringify(req.headers.accept));
        throw new Error('my errorHandler!!!');
        res.end('Hello');
    })
    .listen(3000);

控制台输出


Error: my errorHandler!!!
    at Object.handle (D:\workspace\javascript\nodejs-connect\errorHadnler.js:8:15)
    at next (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\proto.js:190:15)
    at next (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\proto.js:192:9)
    at Object.logger (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\middleware\logger.js:156:5)
    at next (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\proto.js:190:15)
    at Function.app.handle (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\proto.js:198:3)
    at Server.app (D:\workspace\javascript\nodejs-connect\node_modules\connect\lib\connect.js:65:37)
    at Server.EventEmitter.emit (events.js:98:17)
    at HTTPParser.parser.onIncoming (http.js:2027:12)
    at HTTPParser.parserOnHeadersComplete [as onHeadersComplete] (http.js:119:23)

转载请注明出处:
http://blog.fens.me/nodejs-connect/

打赏作者