  • Archive by category "Java in Practice"

Blog Archives

Building a Mahout Project with Maven

The Hadoop family series introduces the Hadoop family of products. Commonly used projects include Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari and Chukwa; newer additions include YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue and others.

Since 2011 China has entered a turbulent era of big data, and the Hadoop family of software has taken over the big data processing landscape. Open-source projects and vendors alike have aligned their data products with Hadoop, which has grown from a niche technology into the standard for big data development. On top of the original Hadoop core, a whole family of products has emerged, continuously innovating around the concept of "big data".

As developers in the IT industry, we should keep up with the pace, seize the opportunity, and rise together with Hadoop!

About the author:

  • Zhang Dan (Conan), programmer: Java, R, PHP, Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

Please cite the source when reposting:
http://blog.fens.me/hadoop-mahout-maven-eclipse/

mahout-maven-logo

Preface

Hadoop-based projects, whether MapReduce development or Mahout development, live in a complex programming environment. Java environment problems are a nightmare for every programmer: a Java developer not only has to write Java programs, but also has to tune Linux, configure and start Hadoop, and handle operations on his own. So getting Hadoop running is really not easy for a beginner.

However, we can simplify the environment as much as possible so that programmers can focus on writing code. For algorithm developers in particular, putting their energy into algorithm design is far more valuable than spending time fighting the environment.

Contents

  1. Introduction and installation of Maven
  2. The Mahout standalone development environment
  3. Building the Mahout development environment with Maven
  4. Implementing user-based collaborative filtering (UserCF) with Mahout
  5. Implementing kmeans with Mahout
  6. Uploading the template project to github

1. Introduction and installation of Maven

Please refer to the article: Building a Hadoop Project with Maven.

Development environment

  • Win7 64bit
  • Java 1.6.0_45
  • Maven 3
  • Eclipse Juno Service Release 2
  • Mahout 0.6

A note on Mahout versions and the Hadoop versions they run on:

  • mahout-0.5, mahout-0.6 and mahout-0.7 are based on hadoop-0.20.2x.
  • mahout-0.8 and mahout-0.9 are based on hadoop-1.1.x.
  • mahout-0.7 was a major upgrade: it removed the single-machine, in-memory mode of several algorithms, and part of the API is not backward compatible.

Note: this article focuses on "building a Mahout development environment with Maven". Both examples in the article run in memory on a single machine, so version 0.6 is chosen. Running Mahout on a Hadoop cluster will be covered in the next article.

2. The Mahout standalone development environment

hadoop-mahout-dev

As shown above, we can develop either on Windows or on Linux and debug locally during development; the standard tools are Maven and Eclipse.

3. Building the Mahout development environment with Maven

  • 1. Create a standard Java project with Maven
  • 2. Import the project into Eclipse
  • 3. Add the Mahout dependency and modify pom.xml
  • 4. Download the dependencies

1). Create a standard Java project with Maven


~ D:\workspace\java>mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes 
-DgroupId=org.conan.mymahout -DartifactId=myMahout -DpackageName=org.conan.mymahout -Dversion=1.0-SNAPSHOT -DinteractiveMode=false

Enter the project directory and run the mvn command:


~ D:\workspace\java>cd myMahout
~ D:\workspace\java\myMahout>mvn clean install

2). Import the project into Eclipse

We have created a basic Maven project; now import it into Eclipse. It is best to have the Maven plugin for Eclipse installed beforehand.

mahout-eclipse-folder

3). Add the Mahout dependency and modify pom.xml

Here I use Mahout 0.6 and remove the junit dependency; modify the file pom.xml:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.conan.mymahout</groupId>
<artifactId>myMahout</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>myMahout</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mahout.version>0.6</mahout.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-core</artifactId>
<version>${mahout.version}</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-integration</artifactId>
<version>${mahout.version}</version>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
</exclusion>
<exclusion>
<groupId>me.prettyprint</groupId>
<artifactId>hector-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

4). Download the dependencies

~ mvn clean install

Refresh the project in Eclipse:

mahout-eclipse-package

The project's dependencies are automatically added under the library path.

4. Implementing user-based collaborative filtering (UserCF) with Mahout

For an in-depth analysis of Mahout's UserCF collaborative filtering algorithm, please refer to the article: Interpreting Mahout's UserCF Recommendation Algorithm with R.

Implementation steps:

  • 1. Prepare the data file: item.csv
  • 2. Java program: UserCF.java
  • 3. Run the program
  • 4. Interpret the recommendation results

1). Create the data file: item.csv


~ mkdir datafile
~ vi datafile/item.csv

1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

Data format: each row has three columns; the first is the user ID, the second is the item ID, and the third is the user's rating of the item.
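
This is exactly the (userID,itemID,rating) layout that Mahout's FileDataModel expects. As a quick sanity check, the fragment below (illustrative only, not part of the original article; it assumes the same datafile/item.csv path and the Mahout 0.6 Taste API) reads the file back and prints one user's preferences:

// Illustrative fragment: verify that FileDataModel parses item.csv as expected.
// Requires org.apache.mahout.cf.taste.impl.model.file.FileDataModel and
// org.apache.mahout.cf.taste.model.{DataModel, Preference, PreferenceArray}.
DataModel model = new FileDataModel(new File("datafile/item.csv"));
PreferenceArray prefs = model.getPreferencesFromUser(3);   // user 3 rated items 101, 104, 105, 107
for (Preference p : prefs) {
    System.out.println(p.getUserID() + "," + p.getItemID() + "," + p.getValue());
}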

2). Java program: UserCF.java

The data flow and call sequence of Mahout's collaborative filtering:

mahout-recommendation-process

The figure above is taken from Mahout in Action.

Create the Java class org.conan.mymahout.recommendation.UserCF.java:


package org.conan.mymahout.recommendation;

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.EuclideanDistanceSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class UserCF {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws IOException, TasteException {
        String file = "datafile/item.csv";
        DataModel model = new FileDataModel(new File(file));
        UserSimilarity user = new EuclideanDistanceSimilarity(model);
        NearestNUserNeighborhood neighbor = new NearestNUserNeighborhood(NEIGHBORHOOD_NUM, user, model);
        Recommender r = new GenericUserBasedRecommender(model, neighbor, user);
        LongPrimitiveIterator iter = model.getUserIDs();

        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List<RecommendedItem> list = r.recommend(uid, RECOMMENDER_NUM);
            System.out.printf("uid:%s", uid);
            for (RecommendedItem ritem : list) {
                System.out.printf("(%s,%f)", ritem.getItemID(), ritem.getValue());
            }
            System.out.println();
        }
    }
}

3). Run the program
Console output:


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
uid:1(104,4.274336)(106,4.000000)
uid:2(105,4.055916)
uid:3(103,3.360987)(102,2.773169)
uid:4(102,3.000000)
uid:5

4). Interpret the recommendation results

  • For user 1, the top 2 most relevant items are recommended: 104 and 106
  • For user 2, the top 2 most relevant items are requested, but only one qualifies: 105
  • For user 3, the top 2 most relevant items are recommended: 103 and 102
  • For user 4, the top 2 most relevant items are requested, but only one qualifies: 102
  • For user 5, the top 2 most relevant items are requested, but nothing qualifies, presumably because user 5 has already rated items 101 through 106 and his nearest neighbors contribute no unseen item (see the snippet below)
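
Why does user 5 get nothing? A quick way to check is to print the neighborhoods Mahout actually computed. The fragment below is an illustrative addition to UserCF.main(), not part of the original program; it reuses the model and neighbor objects built there and additionally needs an import of java.util.Arrays:

// Illustrative: print each user's 2 nearest neighbors as found by NearestNUserNeighborhood.
LongPrimitiveIterator it = model.getUserIDs();
while (it.hasNext()) {
    long uid = it.nextLong();
    long[] neighbors = neighbor.getUserNeighborhood(uid);
    System.out.println("user " + uid + " -> neighbors: " + Arrays.toString(neighbors));
}

User 5 has already rated items 101 through 106, so the only possible new recommendation is item 107; unless user 3 (the only rater of 107) appears in user 5's neighborhood, there is nothing left to recommend.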

5. Implementing kmeans with Mahout

  • 1. Prepare the data file: randomData.csv
  • 2. Java program: Kmeans.java
  • 3. Run the Java program
  • 4. Interpret the Mahout results
  • 5. Implement kmeans in R
  • 6. Compare the Mahout and R results

1). Prepare the data file: randomData.csv


~ vi datafile/randomData.csv

-0.883033363823402,-3.31967192630249
-2.39312626419456,3.34726861118871
2.66976353341256,1.85144276077058
-1.09922906899594,-6.06261735207489
-4.36361936997216,1.90509905380532
-0.00351835125495037,-0.610105996559153
-2.9962958796338,-3.60959839525735
-3.27529418132066,0.0230099799641799
2.17665594420569,6.77290756817957
-2.47862038335637,2.53431833167278
5.53654901906814,2.65089785582474
5.66257474538338,6.86783609641077
-0.558946883114376,1.22332819416237
5.11728525486132,3.74663871584768
1.91240516693351,2.95874731384062
-2.49747101306535,2.05006504756875
3.98781883213459,1.00780938946366

Only part of the data is shown here; see the source code for the full dataset.

Note: I generated randomData.csv with R:


x1<-cbind(x=rnorm(400,1,3),y=rnorm(400,1,3))
x2<-cbind(x=rnorm(300,1,0.5),y=rnorm(300,0,0.5))
x3<-cbind(x=rnorm(300,0,0.1),y=rnorm(300,2,0.2))
x<-rbind(x1,x2,x3)
write.table(x,file="randomData.csv",sep=",",row.names=FALSE,col.names=FALSE)

2). Java program: Kmeans.java

The kmeans algorithm process as implemented in Mahout:

mahout-kmeans-process

The figure above is taken from Mahout in Action.

Create the Java class org.conan.mymahout.cluster06.Kmeans.java (it uses a small MathUtil helper, sketched after the listing):


package org.conan.mymahout.cluster06;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.mahout.clustering.kmeans.Cluster;
import org.apache.mahout.clustering.kmeans.KMeansClusterer;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.Vector;

public class Kmeans {

    public static void main(String[] args) throws IOException {
        List<Vector> sampleData = MathUtil.readFileToVector("datafile/randomData.csv");

        int k = 3;
        double threshold = 0.01;

        List<Vector> randomPoints = MathUtil.chooseRandomPoints(sampleData, k);
        for (Vector vector : randomPoints) {
            System.out.println("Init Point center: " + vector);
        }

        List<Cluster> clusters = new ArrayList<Cluster>();
        for (int i = 0; i < k; i++) {
            clusters.add(new Cluster(randomPoints.get(i), i, new EuclideanDistanceMeasure()));
        }

        List<List<Cluster>> finalClusters = KMeansClusterer.clusterPoints(sampleData, clusters, new EuclideanDistanceMeasure(), k, threshold);
        for (Cluster cluster : finalClusters.get(finalClusters.size() - 1)) {
            System.out.println("Cluster id: " + cluster.getId() + " center: " + cluster.getCenter().asFormatString());
        }
    }

}
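
The Kmeans class above depends on a small MathUtil helper that ships with the github template project but is not listed in the article. The sketch below shows one minimal way such a helper could look; the method names follow the calls in Kmeans.java, while the implementation details are my assumptions, not the project's actual code.

package org.conan.mymahout.cluster06;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

// Minimal sketch of the MathUtil helper used by Kmeans.java (assumed implementation).
public class MathUtil {

    // Read a CSV file of "x,y" lines into a list of vectors.
    public static List<Vector> readFileToVector(String fileName) throws IOException {
        List<Vector> points = new ArrayList<Vector>();
        BufferedReader br = new BufferedReader(new FileReader(fileName));
        String line;
        while ((line = br.readLine()) != null) {
            String[] cols = line.split(",");
            double[] values = new double[cols.length];
            for (int i = 0; i < cols.length; i++) {
                values[i] = Double.parseDouble(cols[i]);
            }
            points.add(new DenseVector(values));
        }
        br.close();
        return points;
    }

    // Pick k random points from the sample as the initial cluster centers.
    public static List<Vector> chooseRandomPoints(List<Vector> sampleData, int k) {
        List<Vector> copy = new ArrayList<Vector>(sampleData);
        Collections.shuffle(copy);
        return new ArrayList<Vector>(copy.subList(0, k));
    }
}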

3). Run the Java program
Console output:


Init Point center: {0:-0.162693685149196,1:2.19951550286862}
Init Point center: {0:-0.0409782183083317,1:2.09376666042057}
Init Point center: {0:0.158401778474687,1:2.37208412905273}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Cluster id: 0 center: {0:-2.686856800552941,1:1.8939462954763795}
Cluster id: 1 center: {0:0.6334255423230666,1:0.49472852972602105}
Cluster id: 2 center: {0:3.334520309711998,1:3.2758355898247653}

4). Interpret the Mahout results

  • 1. The "Init Point center" lines show the 3 initial centers set when kmeans starts
  • 2. The "Cluster ... center" lines show the 3 centers found after clustering

5). Implement kmeans in R
Next, to make the result more intuitive, we run the same kmeans experiment in R on the same data.

R code:


> y<-read.csv(file="randomData.csv",sep=",",header=FALSE) 
> cl<-kmeans(y,3,iter.max = 10, nstart = 25) 
> cl$centers
          V1         V2
1 -0.4323971  2.2852949
2  0.9023786 -0.7011153
3  4.3725463  2.4622609

# plot the clustering and its centers
> plot(y, col=c("black","blue","green")[cl$cluster])
> points(cl$centers, col="red", pch = 19)

# draw the centers produced by Mahout's clustering
> mahout<-matrix(c(-2.686856800552941,1.8939462954763795,0.6334255423230666,0.49472852972602105,3.334520309711998,3.2758355898247653),ncol=2,byrow=TRUE) 
> points(mahout, col="violetred", pch = 19)

The clustering result plot:
kmeans-center

6). Compare the Mahout and R results
In the figure above there are hollow points in three colors, black, blue and green; these are the original data points.

The 3 solid red points are the centers produced by R's kmeans.
The 3 solid violet points are the centers produced by Mahout's kmeans.

The centers produced by R and Mahout do not coincide, for several reasons:

  • 1. The algorithms differ:
    in Mahout we used Euclidean distance (EuclideanDistanceMeasure);
    R's kmeans() defaults to the "Hartigan-Wong" algorithm.
  • 2. The initial centers are different.
  • 3. The maximum number of iterations is different.
  • 4. The convergence "threshold" used when deciding that points have merged is different.

6. Uploading the template project to github

https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.6

You can download this project as a starting point for your own development.

 
~ git clone https://github.com/bsspirit/maven_mahout_template
~ git checkout mahout-0.6

We have completed the first step; next we will move into real Mahout algorithm development and apply it in a Hadoop cluster environment.

Next article: Distributed Mahout Development: Item-Based Collaborative Filtering (ItemCF)

Please cite the source when reposting:
http://blog.fens.me/hadoop-mahout-maven-eclipse/


Analyzing Massive Web Logs: Extracting KPI Statistics with Hadoop

The Hadoop family series introduces the Hadoop family of products. Commonly used projects include Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari and Chukwa; newer additions include YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue and others.

Since 2011 China has entered a turbulent era of big data, and the Hadoop family of software has taken over the big data processing landscape. Open-source projects and vendors alike have aligned their data products with Hadoop, which has grown from a niche technology into the standard for big data development. On top of the original Hadoop core, a whole family of products has emerged, continuously innovating around the concept of "big data".

As developers in the IT industry, we should keep up with the pace, seize the opportunity, and rise together with Hadoop!

About the author:

  • Zhang Dan (Conan), programmer: Java, R, PHP, Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

Please cite the source when reposting:
http://blog.fens.me/hadoop-mapreduce-log-kpi/

hadoop-kpi-logo

Preface

Web logs contain a website's most important information. Through log analysis we can learn the site's traffic, which pages are visited most, which pages are the most valuable, and so on. A typical medium-sized website (over 100,000 PV per day) generates more than 1 GB of web log data per day; large or very large sites may produce 10 GB of data per hour.

For data at this scale, Hadoop is the perfect fit for log analysis.

Contents

  1. Overview of web log analysis
  2. Requirements analysis: KPI design
  3. Algorithm model: parallel algorithms on Hadoop
  4. Architecture design: the log KPI system
  5. Development part 1: building the Hadoop project with Maven
  6. Development part 2: implementing the MapReduce programs

1. Overview of web log analysis

Web logs are produced by web servers such as Nginx, Apache or Tomcat. From web logs we can obtain the PV (PageView, page view count) of each type of page and the number of unique IPs; with a little more work we can derive the top search keywords and the pages where users stay the longest; going further, we can build ad click models, analyze user behavior, and so on.

In a web log, each line usually represents one user visit; for example, here is an nginx log line:


222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939
 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1)
 AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"

It breaks down into the following 8 fields:

  • remote_addr: the client IP address, 222.68.172.190
  • remote_user: the client user name, -
  • time_local: the access time and time zone, [18/Sep/2013:06:49:57 +0000]
  • request: the requested URL and HTTP protocol, "GET /images/my.jpg HTTP/1.1"
  • status: the request status, 200 means success, 200
  • body_bytes_sent: the size of the response body sent to the client, 19939
  • http_referer: the page the request came from, "http://www.angularjs.cn/A00n"
  • http_user_agent: information about the client browser, "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"

Note: to capture more information you need other means, such as sending separate requests from JavaScript code or using cookies to record users' visit information.

With this log information we can start digging deep into the website's secrets.

The small-data case

For small amounts of data (10 MB, 100 MB, 10 GB), as long as a single machine can still cope, we can simply use the various Unix/Linux tools: awk, grep, sort, join and the like are all great for log analysis, and combined with perl, python and regular expressions they can solve virtually any problem.

For example, to get the 10 IPs with the most visits from the nginx log mentioned above, the implementation is simple:


~ cat access.log.10 | awk '{a[$1]++} END {for(b in a) print b"\t"a[b]}' | sort -k2 -r | head -n 10
163.177.71.12   972
101.226.68.137  972
183.195.232.138 971
50.116.27.194   97
14.17.29.86     96
61.135.216.104  94
61.135.216.105  91
61.186.190.41   9
59.39.192.108   9
220.181.51.212  9

The big-data case

When the data grows by 10 GB or 100 GB per day, a single machine can no longer keep up. We have to add complexity to the system and turn to computer clusters and storage arrays. Before Hadoop appeared, storing and analyzing massive data was very difficult; only a handful of companies mastered the core technologies of efficient parallel computing, distributed computing and distributed storage.

Hadoop dramatically lowered the barrier to processing massive data, letting small companies and even individuals handle it, and it is particularly well suited to log analysis systems.

2. Requirements analysis: KPI design

Below we start from a company case study to explain how to analyze massive web logs with Hadoop and extract KPI data.

Case description
An e-commerce website runs an online group-buying business with 1,000,000 PV and 50,000 unique IPs per day. Traffic usually peaks on workdays between 10:00-12:00 and 15:00-18:00. During the day most visits come from PC browsers; on rest days and at night more visits come from mobile devices. Search pages account for 80% of the site's page views; fewer than 1% of PC users make a purchase, versus about 5% of mobile users.

From this short description we can roughly judge how the site is doing commercially, where the paying users come from, which potential users could be cultivated, and whether the site is at risk of failing.

KPI design

  • PV (PageView): page view count
  • IP: unique IP count per page
  • Time: hourly PV count
  • Source: referrer domain count
  • Browser: user device count

Note: due to commercial confidentiality the e-commerce site's logs cannot be shared.
The content below uses my personal website as the example for data extraction and analysis.

Baidu Analytics statistics for my personal site, http://www.fens.me:

Basic statistics:
hadoop-kpi-baidu

User device statistics:
hadoop-kpi-baidu2

From a business point of view a personal site behaves differently from an e-commerce site: there is no conversion rate, and the bounce rate is fairly high. From a technical point of view, however, the same KPI design applies.

3. Algorithm model: parallel algorithms on Hadoop

hadoop-kpi-log

Designing the parallel algorithms:
Note: refer to the 8 fields defined in section 1.

PV (PageView): page view count

  • Map: {key: $request, value: 1}
  • Reduce: {key: $request, value: sum}

IP: unique IP count per page

  • Map: {key: $request, value: $remote_addr}
  • Reduce: {key: $request, value: count of distinct values (sum(unique)); a reducer sketch for this de-duplicating count follows after this list}

Time: hourly PV count

  • Map: {key: $time_local, value: 1}
  • Reduce: {key: $time_local, value: sum}

Source: referrer domain count

  • Map: {key: $http_referer, value: 1}
  • Reduce: {key: $http_referer, value: sum}

Browser: user device count

  • Map: {key: $http_user_agent, value: 1}
  • Reduce: {key: $http_user_agent, value: sum}

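Of these five indicators, IP is the only one that is not a plain sum: the reducer must de-duplicate the remote_addr values per page before counting. Below is a minimal sketch of such a reducer, written in the same old mapred API style as the KPIPV class shown later; the class and method names are illustrative, not the project's actual KPIIP implementation.

    // Sketch only: same imports as KPIPV.java plus java.util.HashSet and java.util.Set.
    // The matching mapper is assumed to emit {key: $request, value: $remote_addr} as Text/Text pairs.
    public static class KPIIPReducer extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            Set<String> ips = new HashSet<String>();   // de-duplicate the IPs seen for this page
            while (values.hasNext()) {
                ips.add(values.next().toString());
            }
            result.set(ips.size());                    // unique IP count for this page
            output.collect(key, result);
        }
    }
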
4. Architecture design: the log KPI system

hadoop-kpi-architect

In the figure above, the left side is the Application (business) system and the right side is Hadoop's HDFS and MapReduce.

  1. Logs are produced by the business system; we can have the web server create a new directory every day, containing several log files of 64 MB each.
  2. A cron job imports the previous day's log files into HDFS shortly after midnight.
  3. When the import finishes, another scheduled job starts the MapReduce programs that extract and compute the KPI statistics.
  4. When the computation finishes, a further scheduled job exports the statistics from HDFS into a database, making later ad-hoc queries easy.

hadoop-kpi-process

The figure above shows more clearly how the data flows. The parts on the blue background run inside Hadoop; our next task is to implement the MapReduce programs.

5. Development part 1: building the Hadoop project with Maven

Please refer to the article: Building a Hadoop Project with Maven.

The Windows 7 development environment and the Hadoop runtime environment were already described in that article.

We need to upload the log file into the /user/hdfs/log_kpi/ directory in HDFS; refer to the commands below.


~ hadoop fs -mkdir /user/hdfs/log_kpi
~ hadoop fs -copyFromLocal /home/conan/datafiles/access.log.10 /user/hdfs/log_kpi/

I have published the whole MapReduce implementation on github:

https://github.com/bsspirit/maven_hadoop_template/releases/tag/kpi_v1

6. Development part 2: implementing the MapReduce programs

Development flow:

  1. Parse the log lines
  2. Implement the map functions
  3. Implement the reduce functions
  4. Implement the job drivers

1). Parsing the log lines
Create the file org.conan.myhadoop.mr.kpi.KPI.java:


package org.conan.myhadoop.mr.kpi;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/*
 * KPI Object
 */
public class KPI {
    private String remote_addr;// client IP address
    private String remote_user;// client user name, "-" is ignored
    private String time_local;// access time and time zone
    private String request;// requested URL and HTTP protocol
    private String status;// request status; 200 means success
    private String body_bytes_sent;// size of the response body sent to the client
    private String http_referer;// the page the request came from
    private String http_user_agent;// client browser information

    private boolean valid = true;// whether this record is valid
    
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("valid:" + this.valid);
        sb.append("\nremote_addr:" + this.remote_addr);
        sb.append("\nremote_user:" + this.remote_user);
        sb.append("\ntime_local:" + this.time_local);
        sb.append("\nrequest:" + this.request);
        sb.append("\nstatus:" + this.status);
        sb.append("\nbody_bytes_sent:" + this.body_bytes_sent);
        sb.append("\nhttp_referer:" + this.http_referer);
        sb.append("\nhttp_user_agent:" + this.http_user_agent);
        return sb.toString();
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return time_local;
    }

    public Date getTime_local_Date() throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        return df.parse(this.time_local);
    }
    
    public String getTime_local_Date_hour() throws ParseException{
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
        return df.format(this.getTime_local_Date());
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }
    
    public String getHttp_referer_domain(){
        if(http_referer.length()<8){ 
            return http_referer;
        }
        
        String str=this.http_referer.replace("\"", "").replace("http://", "").replace("https://", "");
        return str.indexOf("/")>0?str.substring(0, str.indexOf("/")):str;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }

    public static void main(String args[]) {
        String line = "222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] \"GET /images/my.jpg HTTP/1.1\" 200 19939 \"http://www.angularjs.cn/A00n\" \"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36\"";
        System.out.println(line);
        KPI kpi = new KPI();
        String[] arr = line.split(" ");

        kpi.setRemote_addr(arr[0]);
        kpi.setRemote_user(arr[1]);
        kpi.setTime_local(arr[3].substring(1));
        kpi.setRequest(arr[6]);
        kpi.setStatus(arr[8]);
        kpi.setBody_bytes_sent(arr[9]);
        kpi.setHttp_referer(arr[10]);
        kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
        System.out.println(kpi);

        try {
            SimpleDateFormat df = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss", Locale.US);
            System.out.println(df.format(kpi.getTime_local_Date()));
            System.out.println(kpi.getTime_local_Date_hour());
            System.out.println(kpi.getHttp_referer_domain());
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }

}

We take one line from the log file and write a simple parsing test in the main method.

Console output:


222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
valid:true
remote_addr:222.68.172.190
remote_user:-
time_local:18/Sep/2013:06:49:57
request:/images/my.jpg
status:200
body_bytes_sent:19939
http_referer:"http://www.angularjs.cn/A00n"
http_user_agent:"Mozilla/5.0 (Windows
2013.09.18:06:49:57
2013091806
www.angularjs.cn

We can see that the log line was correctly parsed into the fields of the KPI object. Next we extract the parsing logic into a separate method:


    private static KPI parser(String line) {
        System.out.println(line);
        KPI kpi = new KPI();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            kpi.setRemote_addr(arr[0]);
            kpi.setRemote_user(arr[1]);
            kpi.setTime_local(arr[3].substring(1));
            kpi.setRequest(arr[6]);
            kpi.setStatus(arr[8]);
            kpi.setBody_bytes_sent(arr[9]);
            kpi.setHttp_referer(arr[10]);
            
            if (arr.length > 12) {
                kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                kpi.setHttp_user_agent(arr[11]);
            }

            if (Integer.parseInt(kpi.getStatus()) >= 400) {// status 400 or above means an HTTP error
                kpi.setValid(false);
            }
        } else {
            kpi.setValid(false);
        }
        return kpi;
    }

The map methods, reduce methods and job drivers are implemented in separate classes.

The MapReduce implementation classes are introduced below:

  • PV:org.conan.myhadoop.mr.kpi.KPIPV.java
  • IP: org.conan.myhadoop.mr.kpi.KPIIP.java
  • Time: org.conan.myhadoop.mr.kpi.KPITime.java
  • Browser: org.conan.myhadoop.mr.kpi.KPIBrowser.java

1). PV:org.conan.myhadoop.mr.kpi.KPIPV.java


package org.conan.myhadoop.mr.kpi;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class KPIPV { 

    public static class KPIPVMapper extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            KPI kpi = KPI.filterPVs(value.toString());
            if (kpi.isValid()) {
                word.set(kpi.getRequest());
                output.collect(word, one);
            }
        }
    }

    public static class KPIPVReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        String input = "hdfs://192.168.1.210:9000/user/hdfs/log_kpi/";
        String output = "hdfs://192.168.1.210:9000/user/hdfs/log_kpi/pv";

        JobConf conf = new JobConf(KPIPV.class);
        conf.setJobName("KPIPV");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(KPIPVMapper.class);
        conf.setCombinerClass(KPIPVReducer.class);
        conf.setReducerClass(KPIPVReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
        System.exit(0);
    }
}

The program calls a method of the KPI class:

KPI kpi = KPI.filterPVs(value.toString());

The filterPVs method gives us finer control over which PVs are counted.

Add the filterPVs method to KPI.java (it also needs imports of java.util.Set and java.util.HashSet):


    /**
     * Classify PVs by page
     */
    public static KPI filterPVs(String line) {
        KPI kpi = parser(line);
        Set<String> pages = new HashSet<String>();
        pages.add("/about");
        pages.add("/black-ip-list/");
        pages.add("/cassandra-clustor/");
        pages.add("/finance-rhive-repurchase/");
        pages.add("/hadoop-family-roadmap/");
        pages.add("/hadoop-hive-intro/");
        pages.add("/hadoop-zookeeper-intro/");
        pages.add("/hadoop-mahout-roadmap/");

        if (!pages.contains(kpi.getRequest())) {
            kpi.setValid(false);
        }
        return kpi;
    }

In filterPVs we define a page filter, so that PV statistics are computed only for those pages.

Now let's run KPIPV.java.


2013-10-9 11:53:28 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-9 11:53:28 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-9 11:53:28 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/log_kpi/access.log.10:0+3025757
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/log_kpi/access.log.10:0+3025757
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-9 11:53:30 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-9 11:53:30 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 213 bytes
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2013-10-9 11:53:30 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-9 11:53:30 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2013-10-9 11:53:30 org.apache.hadoop.mapred.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/log_kpi/pv
2013-10-9 11:53:31 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2013-10-9 11:53:33 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-9 11:53:33 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2013-10-9 11:53:34 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-9 11:53:34 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息: Counters: 20
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=3025757
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=183
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=545
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=6051514
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=83472
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=183
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=217
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map input records=14619
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=16
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=2004
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=376569856
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map input bytes=3025757
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=110
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=76
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=8
2013-10-9 11:53:34 org.apache.hadoop.mapred.Counters log
信息:     Map output records=76

View the HDFS output with the hadoop command:


~ hadoop fs -cat /user/hdfs/log_kpi/pv/part-00000

/about  5
/black-ip-list/ 2
/cassandra-clustor/     3
/finance-rhive-repurchase/      13
/hadoop-family-roadmap/ 13
/hadoop-hive-intro/     14
/hadoop-mahout-roadmap/ 20
/hadoop-zookeeper-intro/        6

This gives us the PV values of the specified pages from the log file we just processed.

The specified pages work like a site map: without such a filter every visited URL would be listed. By specifying a "site map" we can more easily find the information we actually need.

The remaining indicators are extracted following the same pattern as the PV implementation (see the illustrative Time mapper sketch below); you can download the source code and run it to see the results.
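
As one example of how the remaining indicators follow the same pattern, here is an illustrative sketch of an hourly-PV (Time) mapper. It is not the project's actual KPITime class; it assumes KPI.parser() is made accessible (or wrapped in a filterTimes() helper) and reuses the getTime_local_Date_hour() method shown earlier. The reducer is the same summing reducer used for PV.

    // Sketch only: hourly PV mapper in the style of KPIPVMapper.
    public static class KPITimeMapper extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            KPI kpi = KPI.parser(value.toString());   // assumption: parser() made public, or use a filterTimes() wrapper
            if (kpi.isValid()) {
                try {
                    word.set(kpi.getTime_local_Date_hour());   // e.g. "2013091806"
                    output.collect(word, one);
                } catch (java.text.ParseException e) {
                    // skip lines whose timestamp cannot be parsed
                }
            }
        }
    }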

Not satisfied with just reading? For the author's video explanation, visit: http://onbook.me/video

Please cite the source when reposting:
http://blog.fens.me/hadoop-mapreduce-log-kpi/


Building a Hadoop Project with Maven

The Hadoop family series introduces the Hadoop family of products. Commonly used projects include Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari and Chukwa; newer additions include YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue and others.

Since 2011 China has entered a turbulent era of big data, and the Hadoop family of software has taken over the big data processing landscape. Open-source projects and vendors alike have aligned their data products with Hadoop, which has grown from a niche technology into the standard for big data development. On top of the original Hadoop core, a whole family of products has emerged, continuously innovating around the concept of "big data".

As developers in the IT industry, we should keep up with the pace, seize the opportunity, and rise together with Hadoop!

About the author:

  • Zhang Dan (Conan), programmer: Java, R, PHP, Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

Please cite the source when reposting:
http://blog.fens.me/hadoop-maven-eclipse/

hadoop-maven

Preface

Hadoop's MapReduce environment is a complex programming environment, so we should simplify the process of building a MapReduce project as much as possible. Maven is an excellent automated build tool that frees us from complicated environment configuration and standardizes the development process. So before writing MapReduce, let's take a little time to sharpen the knife! Of course, besides Maven there are other choices, such as Gradle (recommended), Ivy, and so on.

Several upcoming articles on MapReduce development all rely on the Maven-based MapReduce environment built in this article.

Contents

  1. Introduction to Maven
  2. Installing Maven (Windows)
  3. The Hadoop development environment
  4. Building the Hadoop environment with Maven
  5. Developing a MapReduce program
  6. Uploading the template project to github

1. Introduction to Maven

Apache Maven is a project management and build automation tool for Java, provided by the Apache Software Foundation. Based on the concept of a Project Object Model (POM), Maven uses a central piece of information to manage a project's build, reporting and documentation. It was once a subproject of the Jakarta project and is now an independent Apache project.

Maven's developers state on their website that Maven's goal is to make project builds easier. It ties together the different stages of development, such as compiling, packaging, testing and releasing, and produces consistent, high-quality project information so that team members get timely feedback. Maven effectively supports test-first development and continuous integration, embodying the ideas of encouraging communication and fast feedback. If Ant's reuse is built on "copy and paste", Maven achieves true reuse of build logic through its plugin mechanism.

2. Installing Maven (Windows)

Download Maven: http://maven.apache.org/download.cgi

Download the latest xxx-bin.zip file and unzip it on Windows to D:\toolkit\maven3.

Add the maven/bin directory to the PATH environment variable:

win7-maven

Then open a command prompt and type mvn; you will see the following output:


~ C:\Users\Administrator>mvn
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.086s
[INFO] Finished at: Mon Sep 30 18:26:58 CST 2013
[INFO] Final Memory: 2M/179M
[INFO] ------------------------------------------------------------------------
[ERROR] No goals have been specified for this build. You must specify a valid lifecycle phase or a goal in the format : or :[:]:. Available lifecycle phases are: validate, initialize, generate-sources, process-sources, generate-resources, process-resources, compile, process-class
es, generate-test-sources, process-test-sources, generate-test-resources, process-test-resources, test-compile, process-test-classes, test, prepare-package, package, pre-integration-test, integration-test, post-integration-test, verify, install, deploy, pre-clean, clean, post-clean, pre-site, site, post-site, site-deploy. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/NoGoalSpecifiedException

Install the Maven plugin for Eclipse: Maven Integration for Eclipse.

Configuring the Maven plugin in Eclipse:

eclipse-maven

3. The Hadoop development environment

hadoop-dev

As shown above, we can develop on Windows or on Linux, starting Hadoop locally or calling a remote Hadoop; the standard tools are Maven and Eclipse.

Hadoop cluster environment:

  • Linux: Ubuntu 12.04.2 LTS 64bit Server
  • Java: 1.6.0_29
  • Hadoop: hadoop-1.0.3, single node, IP: 192.168.1.210

4. Building the Hadoop environment with Maven

  • 1. Create a standard Java project with Maven
  • 2. Import the project into Eclipse
  • 3. Add the hadoop dependency and modify pom.xml
  • 4. Download the dependencies
  • 5. Download the hadoop configuration files from the Hadoop cluster
  • 6. Configure the local hosts file

1). Create a standard Java project with Maven


~ D:\workspace\java>mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DgroupId=org.conan.myhadoop.mr
-DartifactId=myHadoop -DpackageName=org.conan.myhadoop.mr -Dversion=1.0-SNAPSHOT -DinteractiveMode=false
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom <<<
[INFO]
[INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Batch mode
[INFO] No archetype defined. Using maven-archetype-quickstart (org.apache.maven.archetypes:maven-archetype-quickstart:1.
0)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archet
ype-quickstart-1.0.jar
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archety
pe-quickstart-1.0.jar (5 KB at 4.3 KB/sec)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archet
ype-quickstart-1.0.pom
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archety
pe-quickstart-1.0.pom (703 B at 1.6 KB/sec)
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Old (1.x) Archetype: maven-archetype-quickstart:1.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.conan.myhadoop.mr
[INFO] Parameter: packageName, Value: org.conan.myhadoop.mr
[INFO] Parameter: package, Value: org.conan.myhadoop.mr
[INFO] Parameter: artifactId, Value: myHadoop
[INFO] Parameter: basedir, Value: D:\workspace\java
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] project created from Old (1.x) Archetype in dir: D:\workspace\java\myHadoop
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 8.896s
[INFO] Finished at: Sun Sep 29 20:57:07 CST 2013
[INFO] Final Memory: 9M/179M
[INFO] ------------------------------------------------------------------------

Enter the project directory and run the mvn command:


~ D:\workspace\java>cd myHadoop
~ D:\workspace\java\myHadoop>mvn clean install
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ myHadoop ---
[INFO] Building jar: D:\workspace\java\myHadoop\target\myHadoop-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ myHadoop ---
[INFO] Installing D:\workspace\java\myHadoop\target\myHadoop-1.0-SNAPSHOT.jar to C:\Users\Administrator\.m2\repository\o
rg\conan\myhadoop\mr\myHadoop\1.0-SNAPSHOT\myHadoop-1.0-SNAPSHOT.jar
[INFO] Installing D:\workspace\java\myHadoop\pom.xml to C:\Users\Administrator\.m2\repository\org\conan\myhadoop\mr\myHa
doop\1.0-SNAPSHOT\myHadoop-1.0-SNAPSHOT.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.348s
[INFO] Finished at: Sun Sep 29 20:58:43 CST 2013
[INFO] Final Memory: 11M/179M
[INFO] ------------------------------------------------------------------------

2). Import the project into Eclipse

We have created a basic Maven project; now import it into Eclipse. It is best to have the Maven plugin installed beforehand.

hadoop-eclipse

3). Add the hadoop dependency

Here I use hadoop 1.0.3; modify the file pom.xml:


~ vi pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.conan.myhadoop.mr</groupId>
<artifactId>myHadoop</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>myHadoop</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.3</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

4). Download the dependencies

Download the dependencies:

~ mvn clean install

Refresh the project in Eclipse:

hadoop-eclipse-maven

The project's dependencies are automatically added under the library path.

5). Download the hadoop configuration files from the Hadoop cluster

    • core-site.xml
    • hdfs-site.xml
    • mapred-site.xml

core-site.xml:


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://master:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/conan/hadoop/tmp</value>
</property>
<property>
<name>io.sort.mb</name>
<value>256</value>
</property>
</configuration>

hdfs-site.xml:


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>dfs.data.dir</name>
<value>/home/conan/hadoop/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>

mapred-site.xml:


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>hdfs://master:9001</value>
</property>
</configuration>

Save them under the src/main/resources/hadoop directory:

hadoop-config

Delete the auto-generated files App.java and AppTest.java.

6). Configure the local hosts file, adding a domain entry for master:


~ vi c:/Windows/System32/drivers/etc/hosts

192.168.1.210 master

5. Developing a MapReduce program

Write a simple MapReduce program that implements word count.

Create a new Java file: WordCount.java


package org.conan.myhadoop.mr;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class WordCountMapper extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }

        }
    }

    public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        String input = "hdfs://192.168.1.210:9000/user/hdfs/o_t_account";
        String output = "hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result";

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(WordCountMapper.class);
        conf.setCombinerClass(WordCountReducer.class);
        conf.setReducerClass(WordCountReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
        System.exit(0);
    }

}

Run it as a Java application.

Console error:


2013-9-30 19:25:02 org.apache.hadoop.util.NativeCodeLoader 
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-9-30 19:25:02 org.apache.hadoop.security.UserGroupInformation doAs
严重: PriviledgedActionException as:Administrator cause:java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator1702422322\.staging to 0700
Exception in thread "main" java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator1702422322\.staging to 0700
	at org.apache.hadoop.fs.FileUtil.checkReturnValue(FileUtil.java:689)
	at org.apache.hadoop.fs.FileUtil.setPermission(FileUtil.java:662)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:509)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:344)
	at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:189)
	at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:116)
	at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:856)
	at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
	at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
	at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824)
	at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261)
	at org.conan.myhadoop.mr.WordCount.main(WordCount.java:78)

This error only occurs when developing on Windows; it is a file permission problem, and the program runs fine on Linux.

The workaround is to modify the file /hadoop-1.0.3/src/core/org/apache/hadoop/fs/FileUtil.java:

comment out lines 688-692, then recompile the source code and rebuild the hadoop jar.


685 private static void checkReturnValue(boolean rv, File p,
686                                        FsPermission permission
687                                        ) throws IOException {
688     /*if (!rv) {
689       throw new IOException("Failed to set permissions of path: " + p +
690                             " to " +
691                             String.format("%04o", permission.toShort()));
692     }*/
693   }

I built a patched hadoop-core-1.0.3.jar this way and put it under lib.

We also need to replace the hadoop library in the local Maven repository:


~ cp lib/hadoop-core-1.0.3.jar C:\Users\Administrator\.m2\repository\org\apache\hadoop\hadoop-core\1.0.3\hadoop-core-1.0.3.jar

Run the Java application again; console output:


2013-9-30 19:50:49 org.apache.hadoop.util.NativeCodeLoader 
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-9-30 19:50:49 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-9-30 19:50:49 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2013-9-30 19:50:49 org.apache.hadoop.io.compress.snappy.LoadSnappy 
警告: Snappy native library not loaded
2013-9-30 19:50:49 org.apache.hadoop.mapred.FileInputFormat listStatus
信息: Total input paths to process : 4
2013-9-30 19:50:50 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
2013-9-30 19:50:50 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:50 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-9-30 19:50:51 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 0% reduce 0%
2013-9-30 19:50:53 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00003:0+119
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2013-9-30 19:50:54 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2013-9-30 19:50:56 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00000:0+113
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000001_0' done.
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000002_0 is done. And is in the process of commiting
2013-9-30 19:50:59 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00001:0+110
2013-9-30 19:50:59 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00001:0+110
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000002_0' done.
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000003_0 is done. And is in the process of commiting
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00002:0+79
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000003_0' done.
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 4 sorted segments
2013-9-30 19:51:02 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 4 segments left of total size: 442 bytes
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2013-9-30 19:51:02 org.apache.hadoop.mapred.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result
2013-9-30 19:51:05 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-9-30 19:51:05 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2013-9-30 19:51:06 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-9-30 19:51:06 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息: Counters: 20
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=421
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=348
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=7377
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=1535
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=209510
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=348
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=458
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map input records=11
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=30
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=509
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1838546944
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map input bytes=421
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=452
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=22
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=15
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=13
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=15
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=13
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output records=22

The wordcount program ran successfully; we check the output with the following commands:


~ hadoop fs -ls hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result

Found 2 items
-rw-r--r--   3 Administrator supergroup          0 2013-09-30 19:51 /user/hdfs/o_t_account/result/_SUCCESS
-rw-r--r--   3 Administrator supergroup        348 2013-09-30 19:51 /user/hdfs/o_t_account/result/part-00000

~ hadoop fs -cat hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result/part-00000

1,abc@163.com,2013-04-22        1
10,ade121@sohu.com,2013-04-23   1
11,addde@sohu.com,2013-04-23    1
17:21:24.0      5
2,dedac@163.com,2013-04-22      1
20:21:39.0      6
3,qq8fed@163.com,2013-04-22     1
4,qw1@163.com,2013-04-22        1
5,af3d@163.com,2013-04-22       1
6,ab34@163.com,2013-04-22       1
7,q8d1@gmail.com,2013-04-23     1
8,conan@gmail.com,2013-04-23    1
9,adeg@sohu.com,2013-04-23      1

With that, we have achieved development on Windows 7: Maven builds the Hadoop dependency environment, the MapReduce program is developed in Eclipse and run as a Java application. Hadoop packages our MR program into a jar, uploads it to the remote Hadoop environment to run, and the logs come back to the Eclipse console.

6. Uploading the template project to github

https://github.com/bsspirit/maven_hadoop_template

You can download this project as a starting point for your own development.

~ git clone https://github.com/bsspirit/maven_hadoop_template.git

We have completed the first step; next we will move on to real MapReduce development practice.

 

Please cite the source when reposting:
http://blog.fens.me/hadoop-maven-eclipse/


Developing a Kettle Plugin: Setting Up the Environment

The "Java Can Do Anything" series covers Java's philosophy, application development, design patterns, program architecture and more, interpreting the power of Java through my own experience.

Where to start with Java? Java is a language that has grown into every field. At the foundation there are four parts: Java syntax, the JDK, the JVM, and third-party libraries. Officially, the JDK is further divided by application area into JavaME, JavaSE and JavaEE. Java can build client GUIs, middleware, mobile systems, applications, tools, games, algorithms... Java can do almost anything.

In the world of Java, Java is everything.

About the author:

  • Zhang Dan (Conan), programmer: Java, R, PHP, Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

Please cite the source when reposting:
http://blog.fens.me/java-kettle-plugin-eclipse

kettle-plugin

Preface

Kettle is an open-source ETL tool that provides a GUI-based solution in place of custom program development. Sometimes, however, we need to develop our own plugins to meet business requirements. Kettle is built on the Eclipse architecture, implemented in Java on the client side. Its powerful ETL features and graphical interface make Kettle the first choice among free ETL tools.

Contents

  1. Introduction to Kettle plugin development
  2. Setting up the Kettle source code environment
  3. Building the Kettle project in Eclipse
  4. Building the plugin project in Eclipse
  5. Configuring the plugin into Kettle
  6. Starting the Kettle project
  7. Integrating the plugin source code into the Kettle project

1. Kettle插件开发介绍

在我们做ETL工作的时候,在某些项目中往往会遇到一些特别的流程任务,kettle原有的流程处理节点已经不能满足我们的要求,这时候我们就需要定制流程处理节点了。定制流程节点主要是针对数据的管理、数据的验证和某些特别文件数据的提取。大家通过查看kettle源代码,就可以知道怎样去创建你自己的kettle插件了。

Kettle plugin development depends on the Kettle source-code environment.
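To give a feel for what such a custom step looks like, here is a rough skeleton assuming the Kettle 4.x step API. The class and package names are hypothetical; the real structure can be copied from the TemplatePlugin project introduced in section 4.

import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;

// Rough skeleton of a custom step (Kettle 4.x style, names hypothetical).
public class MyStep extends BaseStep implements StepInterface {

    public MyStep(StepMeta stepMeta, StepDataInterface stepDataInterface,
                  int copyNr, TransMeta transMeta, Trans trans) {
        super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
    }

    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        Object[] row = getRow();          // read one row from the previous step
        if (row == null) {                // no more input rows
            setOutputDone();
            return false;
        }
        // ... transform the row here ...
        putRow(getInputRowMeta(), row);   // pass the (possibly modified) row on
        return true;
    }
}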

2. Set up the kettle source-code environment

1). My system environment

  • Win7: 64bit desktop
  • Java: 64bit 1.6.0_45

The kettle source is hosted in SVN, so we need an SVN client before we can check out the code.

2). Download the SVN client: Subversion 1.8.3 (Windows 64-bit); registration is required before downloading

http://www.collab.net/downloads/subversion

3). Install Subversion

4). Check out the kettle source code

~ D:\workspace\java>svn co http://source.pentaho.org/svnkettleroot/Kettle/tags/4.4.0-stable/ kettle

A    kettle\.directory
A    kettle\.project
A    kettle\cobertura
A    kettle\cobertura\cobertura.jar
A    kettle\cobertura\lib
A    kettle\cobertura\lib\log4j-1.2.9.jar
A    kettle\cobertura\lib\LICENSE
A    kettle\cobertura\lib\javancss.jar
A    kettle\cobertura\lib\junit.jar
A    kettle\cobertura\lib\cpl-v10.html
A    kettle\cobertura\lib\jakarta-oro-2.0.8.jar
A    kettle\cobertura\lib\asm-2.1.jar
A    kettle\cobertura\lib\ccl.jar
A    kettle\src
A    kettle\src\kettle-steps.xml
A    kettle\src\kettle-job-entries.xml
A    kettle\src\kettle-import-rules.xml
A    kettle\src\org
A    kettle\src\org\pentaho
A    kettle\src\org\pentaho\xul
A    kettle\src\org\pentaho\xul\swt
A    kettle\src\org\pentaho\reporting
A    kettle\src\org\pentaho\reporting\plugin
A    kettle\src\org\pentaho\hadoop
A    kettle\src\org\pentaho\hadoop\HadoopCompression.java
A    kettle\src\org\pentaho\di
A    kettle\src\org\pentaho\di\repository
A    kettle\src\org\pentaho\di\repository\kdr
A    kettle\src\org\pentaho\di\repository\kdr\KettleDatabaseRepositorySecurityProvider.java
A    kettle\src\org\pentaho\di\repository\kdr\KettleDatabaseRepositoryCreationHelper.java
A    kettle\src\org\pentaho\di\repository\kdr\KettleDatabaseRepositoryMeta.java
A    kettle\src\org\pentaho\di\repository\kdr\KettleDatabaseRepositoryBase.java
A    kettle\src\org\pentaho\di\repository\kdr\KettleDatabaseRepository.java
A    kettle\src\org\pentaho\di\repository\kdr\delegates
A    kettle\src\org\pentaho\di\repository\kdr\delegates\KettleDatabaseRepositoryBaseDelegate.java

The download is painfully slow; this is unbearable.

Check where the SVN server is located:

~ ping source.pentaho.org
正在 Ping source.pentaho.org [74.205.95.173] 具有 32 字节的数据:
来自 74.205.95.173 的回复: 字节=32 时间=210ms TTL=50
来自 74.205.95.173 的回复: 字节=32 时间=209ms TTL=50
来自 74.205.95.173 的回复: 字节=32 时间=211ms TTL=50
来自 74.205.95.173 的回复: 字节=32 时间=210ms TTL=50

kettle-svn

It turns out the SVN server is in the United States!! Time for another way to get the source code.

5). Make a clone on github

    • a. Check out the code over SVN from a VPS located in the US (the download finished in 30 seconds).
    • b. Create a new git repository on github.
    • c. Add a .gitignore to exclude the .svn directories.
    • d. Push the code to my own github repository.
    • e. Clone the code from github in the local development environment.
git clone https://github.com/bsspirit/kettle-4.4.0-stable.git

6). Once the download completes, run ant

~ D:\workspace\java\kettle>ant
Buildfile: D:\workspace\java\kettle\build.xml

init:
     [echo] Init...
    [mkdir] Created dir: D:\workspace\java\kettle\build
    [mkdir] Created dir: D:\workspace\java\kettle\classes
    [mkdir] Created dir: D:\workspace\java\kettle\classes\META-INF
    [mkdir] Created dir: D:\workspace\java\kettle\classes-ui
    [mkdir] Created dir: D:\workspace\java\kettle\classes-ui\ui
    [mkdir] Created dir: D:\workspace\java\kettle\classes-core
    [mkdir] Created dir: D:\workspace\java\kettle\classes-db
    [mkdir] Created dir: D:\workspace\java\kettle\classes-dbdialog
    [mkdir] Created dir: D:\workspace\java\kettle\testClasses
    [mkdir] Created dir: D:\workspace\java\kettle\lib
    [mkdir] Created dir: D:\workspace\java\kettle\distrib
    [mkdir] Created dir: D:\workspace\java\kettle\osx-distrib
    [mkdir] Created dir: D:\workspace\java\kettle\docs\api
    [mkdir] Created dir: D:\workspace\java\kettle\webstart
    [mkdir] Created dir: D:\workspace\java\kettle\junit
    [mkdir] Created dir: D:\workspace\java\kettle\pdi-ce-distrib
     [echo] Revision set to r1

compile-core:
     [echo] Compiling Kettle CORE...
    [javac] Compiling 196 source files to D:\workspace\java\kettle\classes-core

copy-core:
     [echo] Copying core images etc to classes directory...
     [copy] Copying 73 files to D:\workspace\java\kettle\classes-core

kettle-core:
     [echo] Generating the Kettle core library kettle-core.jar ...
      [jar] Building jar: D:\workspace\java\kettle\lib\kettle-core.jar

compile-db:
     [echo] Compiling Kettle DB...
    [javac] Compiling 66 source files to D:\workspace\java\kettle\classes-db

copy-db:
     [echo] Copying db images etc to classes-db directory...
     [copy] Copying 9 files to D:\workspace\java\kettle\classes-db

kettle-db:
     [echo] Generating the Kettle DB library kettle-db.jar ...
      [jar] Building jar: D:\workspace\java\kettle\lib\kettle-db.jar

compile:
     [echo] Compiling Kettle...
    [javac] Compiling 1138 source files to D:\workspace\java\kettle\classes
    [javac] D:\workspace\java\kettle\src\org\pentaho\di\job\entry\JobEntryDialogInterface.java:37: 警告:编码 GBK 的不可映射字符
    [javac]  * If the user changed any settings, the JobEntryInterface object抯 揷hanged?flag must be set to true
    [javac] ^
    [javac] D:\workspace\java\kettle\src\org\pentaho\di\job\entry\JobEntryDialogInterface.java:43: 警告:编码 GBK 的不可映射字符
    [javac]  * The JobEntryInterface object抯 揷hanged?flag must be set to the value it had at the time the dialog opened
    [javac] ^
    [javac] D:\workspace\java\kettle\src\org\pentaho\di\job\entry\JobEntryInterface.java:75: 警告:编码 GBK 的不可映射字符
    [javac]  * public void loadXML(?
    [javac] ^
    [javac] D:\workspace\java\kettle\src\org\pentaho\di\job\entry\JobEntryInterface.java:81: 警告:编码 GBK 的不可映射字符
    [javac]  * public void saveRep(?
    [javac] ^
    [javac] D:\workspace\java\kettle\src\org\pentaho\di\job\entry\JobEntryInterface.java:89: 警告:编码 GBK 的不可映射字符
    [javac]  * public void loadRep(?
    [javac] ^
    [javac] D:\workspace\java\kettle\src\org\pentaho\di\trans\steps\mondrianinput\MondrianHelper.java:121: 警告:[deprecation] mondrian.olap.Connection 中的 execute(mondrian.olap.Query) 已过时
    [javac]     result = connection.execute(query);
    [javac] ^
    [javac] 6 警告

copy:
     [echo] Copying images etc to classes directory...
     [copy] Copying 1884 files to D:\workspace\java\kettle\classes
     [copy] Copying 1 file to D:\workspace\java\kettle\classes\META-INF

kettle:
     [echo] Generating the Kettle library kettle-engine.jar ...
      [jar] Building jar: D:\workspace\java\kettle\lib\kettle-engine.jar

compile-dbdialog:
     [echo] Compiling Kettle DB...
    [javac] Compiling 5 source files to D:\workspace\java\kettle\classes-dbdialog

copy-dbdialog:
     [echo] Copying db images etc to classes-dbdialog directory...
     [copy] Copying 23 files to D:\workspace\java\kettle\classes-dbdialog

kettle-dbdialog:
     [echo] Generating the Kettle DB library kettle-dbdialog.jar ...
      [jar] Building jar: D:\workspace\java\kettle\lib\kettle-dbdialog.jar

compile-ui:
     [echo] Compiling Kettle UI...
    [javac] Compiling 585 source files to D:\workspace\java\kettle\classes-ui
    [javac] D:\workspace\java\kettle\src-ui\org\pentaho\di\ui\job\entries\getpop\JobEntryGetPOPDialog.java:2102: 警告:编码 GBK 的不可映射字符
    [javac]     mb.setMessage("Veuillez svp donner un nom 锟?cette entr锟絜 t锟絚he!");
    [javac] ^
    [javac] 1 警告
     [copy] Copying 200 files to D:\workspace\java\kettle\classes-ui
     [copy] Copying 379 files to D:\workspace\java\kettle\classes-ui\ui

kettle-ui:
     [echo] Generating the Kettle library kettle-ui-swt.jar ...
      [jar] Building jar: D:\workspace\java\kettle\lib\kettle-ui-swt.jar

antcontrib.download-check:

antcontrib.download:
    [mkdir] Created dir: C:\Users\Administrator\.subfloor\tmp
      [get] Getting: http://downloads.sourceforge.net/ant-contrib/ant-contrib-1.0b3-bin.zip
      [get] To: C:\Users\Administrator\.subfloor\tmp\antcontrib.zip
      [get] http://downloads.sourceforge.net/ant-contrib/ant-contrib-1.0b3-bin.zip permanently moved to http://downloads.sourceforge.net/project/ant-contrib/ant-contrib/1.0b3/ant-contrib-1.0b3-bin.zip
      [get] http://downloads.sourceforge.net/project/ant-contrib/ant-contrib/1.0b3/ant-contrib-1.0b3-bin.zip moved to http://jaist.dl.sourceforge.net/project/ant-contrib/ant-contrib/1.0b3/ant-contrib-1.0b3-bin.zip
    [unzip] Expanding: C:\Users\Administrator\.subfloor\tmp\antcontrib.zip into C:\Users\Administrator\.subfloor\tmp
     [copy] Copying 5 files to C:\Users\Administrator\.subfloor\ant-contrib

install-antcontrib:

compile-plugins-standalone:
     [echo] Compiling Kettle Plugin kettle-gpload-plugin...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-gpload-plugin\bin\classes
    [javac] Compiling 5 source files to D:\workspace\java\kettle\src-plugins\kettle-gpload-plugin\bin\classes
     [copy] Copying 7 files to D:\workspace\java\kettle\src-plugins\kettle-gpload-plugin\bin\classes
     [echo] Compiling Kettle Plugin kettle-palo-plugin...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-palo-plugin\bin\classes
    [javac] Compiling 17 source files to D:\workspace\java\kettle\src-plugins\kettle-palo-plugin\bin\classes
     [copy] Copying 28 files to D:\workspace\java\kettle\src-plugins\kettle-palo-plugin\bin\classes
     [echo] Compiling Kettle Plugin kettle-hl7-plugin...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-hl7-plugin\bin\classes
    [javac] Compiling 13 source files to D:\workspace\java\kettle\src-plugins\kettle-hl7-plugin\bin\classes
     [copy] Copying 14 files to D:\workspace\java\kettle\src-plugins\kettle-hl7-plugin\bin\classes
     [echo] Compiling Kettle Plugin market...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\market\bin\classes
    [javac] Compiling 9 source files to D:\workspace\java\kettle\src-plugins\market\bin\classes
    [javac] D:\workspace\java\kettle\src-plugins\market\src\org\pentaho\di\core\market\Market.java:533: 警告:[deprecation] org.pentaho.di.ui.core.gui.GUIResource 中的 reload() 已过时
    [javac]     GUIResource.getInstance().reload();
    [javac] ^
    [javac] 1 警告
     [copy] Copying 2 files to D:\workspace\java\kettle\src-plugins\market\bin\classes

compile-plugins:

kettle-plugins-jar-standalone:
     [echo] Generating the Kettle Plugin Jar ${plugin} ...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-gpload-plugin\dist
      [jar] Building jar: D:\workspace\java\kettle\src-plugins\kettle-gpload-plugin\dist\kettle-gpload-plugin.jar
     [echo] Generating the Kettle Plugin Jar ${plugin} ...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-palo-plugin\dist
      [jar] Building jar: D:\workspace\java\kettle\src-plugins\kettle-palo-plugin\dist\kettle-palo-plugin.jar
     [echo] Generating the Kettle Plugin Jar ${plugin} ...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-hl7-plugin\dist
      [jar] Building jar: D:\workspace\java\kettle\src-plugins\kettle-hl7-plugin\dist\kettle-hl7-plugin.jar
     [echo] Generating the Kettle Plugin Jar ${plugin} ...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\market\dist
      [jar] Building jar: D:\workspace\java\kettle\src-plugins\market\dist\market.jar

kettle-plugins-jar:

kettle-plugins-standalone:
     [echo] Staging the Kettle plugin kettle-gpload-plugin ...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-gpload-plugin\bin\stage\kettle-gpload-plugin
     [copy] Copying 1 file to D:\workspace\java\kettle\src-plugins\kettle-gpload-plugin\bin\stage\kettle-gpload-plugin
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-gpload-plugin\bin\stage\kettle-gpload-plugin\lib
     [echo] Staging the Kettle plugin kettle-palo-plugin ...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-palo-plugin\bin\stage\kettle-palo-plugin
     [copy] Copying 1 file to D:\workspace\java\kettle\src-plugins\kettle-palo-plugin\bin\stage\kettle-palo-plugin
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-palo-plugin\bin\stage\kettle-palo-plugin\lib
     [copy] Copying 1 file to D:\workspace\java\kettle\src-plugins\kettle-palo-plugin\bin\stage\kettle-palo-plugin\lib
     [echo] Staging the Kettle plugin kettle-hl7-plugin ...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-hl7-plugin\bin\stage\kettle-hl7-plugin
     [copy] Copying 1 file to D:\workspace\java\kettle\src-plugins\kettle-hl7-plugin\bin\stage\kettle-hl7-plugin
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\kettle-hl7-plugin\bin\stage\kettle-hl7-plugin\lib
     [copy] Copying 10 files to D:\workspace\java\kettle\src-plugins\kettle-hl7-plugin\bin\stage\kettle-hl7-plugin\lib
     [echo] Staging the Kettle plugin market ...
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\market\bin\stage\market
     [copy] Copying 1 file to D:\workspace\java\kettle\src-plugins\market\bin\stage\market
    [mkdir] Created dir: D:\workspace\java\kettle\src-plugins\market\bin\stage\market\lib
     [copy] Copying 1 file to D:\workspace\java\kettle\src-plugins\market\bin\stage\market

kettle-plugins:

compileTests:
     [echo] Compiling Kettle tests...
    [javac] Compiling 122 source files to D:\workspace\java\kettle\testClasses

kettle-test:
     [echo] Generating the Kettle library kettle-test.jar ...
      [jar] Building jar: D:\workspace\java\kettle\lib\kettle-test.jar

distrib-nodeps:
     [echo] Construct the distribution package...
     [copy] Copying 34 files to D:\workspace\java\kettle\distrib
     [copy] Copied 10 empty directories to 2 empty directories under D:\workspace\java\kettle\distrib
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\lib
     [copy] Copying 1 file to D:\workspace\java\kettle\distrib\lib
     [copy] Copying 1 file to D:\workspace\java\kettle\distrib\lib
     [copy] Copying 1 file to D:\workspace\java\kettle\distrib\lib
     [copy] Copying 1 file to D:\workspace\java\kettle\distrib\lib
     [copy] Copying 1 file to D:\workspace\java\kettle\distrib\lib
     [copy] Copying 1 file to D:\workspace\java\kettle\distrib\lib
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\libext
     [copy] Copying 214 files to D:\workspace\java\kettle\distrib\libext
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\libswt
     [copy] Copying 21 files to D:\workspace\java\kettle\distrib\libswt
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\plugins
     [copy] Copying 15 files to D:\workspace\java\kettle\distrib\plugins
     [copy] Copied 11 empty directories to 3 empty directories under D:\workspace\java\kettle\distrib\plugins
     [copy] Copying 1 file to D:\workspace\java\kettle\distrib\plugins
     [copy] Copied 2 empty directories to 1 empty directory under D:\workspace\java\kettle\distrib\plugins
     [copy] Copying 2 files to D:\workspace\java\kettle\distrib\plugins
     [copy] Copying 11 files to D:\workspace\java\kettle\distrib\plugins
     [copy] Copying 2 files to D:\workspace\java\kettle\distrib\plugins
     [copy] Copied 2 empty directories to 1 empty directory under D:\workspace\java\kettle\distrib\plugins
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\ui
     [copy] Copying 387 files to D:\workspace\java\kettle\distrib\ui
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\docs
     [copy] Copying 354 files to D:\workspace\java\kettle\distrib\docs
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\pwd
     [copy] Copying 6 files to D:\workspace\java\kettle\distrib\pwd
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\launcher
     [copy] Copying 3 files to D:\workspace\java\kettle\distrib\launcher
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\simple-jndi
     [copy] Copying 1 file to D:\workspace\java\kettle\distrib\simple-jndi
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\samples
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\samples\transformations
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\samples\jobs
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\samples\transformations\output
    [mkdir] Created dir: D:\workspace\java\kettle\distrib\samples\jobs\output
     [copy] Copying 248 files to D:\workspace\java\kettle\distrib\samples

distrib:

default:

BUILD SUCCESSFUL
Total time: 1 minute 29 seconds
Although there are some warnings, the build succeeds!!

3. Build the kettle project in Eclipse

7). Import the kettle project into Eclipse.

    kettle-eclipse

4. Build the plugin project in Eclipse

8). Build the plugin project: we can start from a template plugin.

Download the kettle-TemplatePlugin project:

    wget http://www.ahuoo.com/download/TemplateStepPlugin.rar

9). Unpack it and import it into Eclipse as the project kettle-TemplatePlugin

Copy the libraries:

    • From the kettle project, copy the *.jar files under lib into the libext directory of kettle-TemplatePlugin
    • From the kettle project, copy libswt/win64/swt.jar into the libswt/win64 directory of kettle-TemplatePlugin

10). Add the libraries we just copied to the project's build path

    kettle-eclipse-template

11). Run ant in the kettle-TemplatePlugin project

    
    ~ D:\workspace\java\kettle-TemplatePlugin>ant
    Buildfile: D:\workspace\java\kettle-TemplatePlugin\build.xml
    
    init:
         [echo] Init...
    
    compile:
         [echo] Compiling Jasper Reporting Plugin...
        [javac] D:\workspace\java\kettle-TemplatePlugin\build.xml:40: warning: 'includeantruntime' was not set, defaulting t
    o build.sysclasspath=last; set to false for repeatable builds
    
    copy:
         [echo] Copying images etc to classes directory...
    
    lib:
         [echo] Generating the Jasper Reporting library TemplateStepPlugin.jar ...
          [jar] Building jar: D:\workspace\java\kettle-TemplatePlugin\lib\TemplateStepPlugin.jar
    
    distrib:
         [echo] Copying libraries to distrib directory...
         [copy] Copying 1 file to D:\workspace\java\kettle-TemplatePlugin\distrib
    
    deploy:
         [echo] deploying plugin...
    
    default:
    
    BUILD SUCCESSFUL
    Total time: 0 seconds
    

12). Modify the files in the distrib directory

    • icon.png: the icon file
    • plugin.xml: the plugin descriptor (can be omitted in versions after 4.4)
    • TemplateStepPlugin.jar: the jar generated by ant

5. Deploy the plugin into Kettle

13). Deploy kettle-TemplatePlugin into kettle

a. Add two directories to the kettle project

    
    ~ mkdir D:\workspace\java\kettle\distrib\plugins\steps\myPlugin
    ~ mkdir D:\workspace\java\kettle\plugins\steps\myPlugin
    

b. Modify the build.xml of kettle-TemplatePlugin

    
    <!-- Point the deploy locations at the two myPlugin directories created above -->
    <property name="deploydir" location="D:\workspace\java\kettle\distrib\plugins\steps\myPlugin"/>
    <property name="projectdir" location="D:\workspace\java\kettle\plugins\steps\myPlugin"/>

    <!-- Add the 64-bit SWT jars to the classpath -->
    <fileset dir="${libswt}/win64/" includes="*.jar"/>

    <!-- Copy the distrib output into both the kettle distribution and the kettle project -->
    <target name="deploy" depends="distrib" description="Deploy distribution...">
        <echo>deploying plugin...</echo>
        <copy todir="${deploydir}">
            <fileset dir="${distrib}" includes="**/*.*"/>
        </copy>
        <copy todir="${projectdir}">
            <fileset dir="${distrib}" includes="**/*.*"/>
        </copy>
    </target>
    

c. Run ant in the kettle-TemplatePlugin project

    
    D:\workspace\java\kettle-TemplatePlugin>ant
    Buildfile: D:\workspace\java\kettle-TemplatePlugin\build.xml
    
    init:
         [echo] Init...
    
    compile:
         [echo] Compiling Jasper Reporting Plugin...
        [javac] D:\workspace\java\kettle-TemplatePlugin\build.xml:43: warning: 'includeantruntime' was not set, defaulting t
    o build.sysclasspath=last; set to false for repeatable builds
    
    copy:
         [echo] Copying images etc to classes directory...
    
    lib:
         [echo] Generating the Jasper Reporting library TemplateStepPlugin.jar ...
    
    distrib:
         [echo] Copying libraries to distrib directory...
    
    deploy:
         [echo] deploying plugin...
         [copy] Copying 3 files to D:\workspace\java\kettle\distrib\plugins\steps\myPlugin
         [copy] Copying 6 files to D:\workspace\java\kettle\distrib\plugins\steps\myPlugin
         [copy] Copying 3 files to D:\workspace\java\kettle\plugins\steps\myPlugin
    
    default:
    
    BUILD SUCCESSFUL
    Total time: 0 seconds
    

14). Check the directory in kettle: D:\workspace\java\kettle\distrib\plugins\steps\myPlugin

    kettle-dist

The three files from the kettle-TemplatePlugin project have been copied into the right places.

6. Start the project from the command line

15). Start kettle from the command line
a. Modify the Spoon startup command so that it runs directly as a Java process instead of opening a new window

    
    ~ vi D:\workspace\java\kettle\distrib\Spoon.bat
    
    @echo on
    REM start "Spoon" "%_PENTAHO_JAVA%" %OPT% -jar launcher\launcher.jar -lib ..\%LIBSPATH% %_cmdline%
    java %OPT% -jar launcher\launcher.jar -lib ..\%LIBSPATH% %_cmdline%
    

b. Run the Spoon.bat command

    
    ~ D:\workspace\java\kettle\distrib>Spoon.bat
    
    DEBUG: Using JAVA_HOME
    DEBUG: _PENTAHO_JAVA_HOME=D:\toolkit\java\jdk6
    DEBUG: _PENTAHO_JAVA=D:\toolkit\java\jdk6\bin\javaw
    
    D:\workspace\java\kettle\distrib>REM start "Spoon" "D:\toolkit\java\jdk6\bin\javaw" "-Xmx512m" "-XX:MaxPermSize=256m" "-
    Djava.library.path=libswt\win64" "-DKETTLE_HOME=" "-DKETTLE_REPOSITORY=" "-DKETTLE_USER=" "-DKETTLE_PASSWORD=" "-DKETTLE
    _PLUGIN_PACKAGES=" "-DKETTLE_LOG_SIZE_LIMIT=" -jar launcher\launcher.jar -lib ..\libswt\win64
    
    D:\workspace\java\kettle\distrib>java "-Xmx512m" "-XX:MaxPermSize=256m" "-Djava.library.path=libswt\win64" "-DKETTLE_HOM
    E=" "-DKETTLE_REPOSITORY=" "-DKETTLE_USER=" "-DKETTLE_PASSWORD=" "-DKETTLE_PLUGIN_PACKAGES=" "-DKETTLE_LOG_SIZE_LIMIT="
    -jar launcher\launcher.jar -lib ..\libswt\win64
    INFO  21-09 12:26:35,717 - Spoon - Logging goes to file:///C:/Users/ADMINI~1/AppData/Local/Temp/spoon_0042f442-2276-11e3
    -bf49-6be1282e1ee0.log
    INFO  21-09 12:26:36,655 - Spoon - 要求资源库
    INFO  21-09 12:26:36,795 - RepositoriesMeta - Reading repositories XML file: C:\Users\Administrator\.kettle\repositories
    .xml
    INFO  21-09 12:26:37,783 - Version checker - OK
    

16). Check the kettle-TemplatePlugin plugin
    kettle-debug1

7. Integrate the plugin source code into the kettle project

17). Use Eclipse's link source feature to attach the kettle-TemplatePlugin project
a. In the kettle project, choose link source
    kettle-link-source

b. Develop inside the kettle project
    kettle-source

18). Start kettle from Eclipse
a. In Eclipse, create a run configuration with Main Class: org.pentaho.di.ui.spoon.Spoon (see the note on VM arguments after these steps)
    kettle-run1

b. Add the 64-bit swt.jar library
    kettle-run2

c. Start kettle from Eclipse
    kettle-run3
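Note: when launching Spoon this way, the Eclipse run configuration also needs roughly the VM arguments that Spoon.bat passed to java above, e.g. -Xmx512m -XX:MaxPermSize=256m -Djava.library.path=libswt\win64, in addition to the 64-bit swt.jar added in step b.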

19). Call kettle-TemplatePlugin from Eclipse
a. Modify TemplateStepDialog.java: find the open() method and add one line of output

    
    public String open() {
        System.out.println("Open a dialog!!!");   // the added line of output

        ...
    }
    

b. In Eclipse, start org.pentaho.di.ui.spoon.Spoon in debug mode.
Double-click the Template Plugin icon, and the log prints "Open a dialog!!!".

    kettle-debug1

That completes the setup of the kettle plugin development environment; now we can get on with writing plugins!!

Please cite the source when reposting:
    http://blog.fens.me/java-kettle-plugin-eclipse


Mahout Learning Roadmap


About the author:

    • 张丹(Conan), 程序员Java,R,PHP,Javascript
    • weibo:@Conan_Z
    • blog: http://blog.fens.me
    • email: bsspirit@gmail.com

Please cite the source when reposting:
    http://blog.fens.me/hadoop-mahout-roadmap/

    Hadoop-mahout-roadmap

Preface

Mahout is a distinctive member of the Hadoop family: a distributed computing framework for machine learning and data mining built on top of Hadoop. It is an interdisciplinary product and, in my view, one of the most competitive, hardest to master, and most rewarding projects in the Hadoop family.

For data analysts, Mahout lowers the barrier to big data; for algorithm engineers, it provides a basic algorithm library; for Hadoop developers, it offers a standard for data modeling; for operations staff, it bridges the connection to Hadoop.

A mahout is an elephant driver: Mahout creates new intelligence on top of Hadoop!

Table of Contents

1. Introduction to Mahout
2. The Mahout learning roadmap
3. My learning experience
4. Mahout use cases

1. Introduction to Mahout

Mahout is a distributed framework for machine learning and data mining built on Hadoop. It implements part of the common data mining algorithms in MapReduce, solving the problem of mining in parallel.

According to the book "Mahout In Action", Mahout implements three major classes of algorithms: recommendation, clustering, and classification.

The learning roadmap presented below follows the structure of "Mahout In Action".

2. The Mahout learning roadmap

    HadoopMahoutRoadmap

I have laid out the Mahout knowledge points in the figure above, in the hope that it helps others get a clearer picture of Mahout.

Next comes my own learning experience. There are no shortcuts for anyone; settle down and work through it, and it is not that hard.

3. My learning experience

A while back I spent about half a year focused on Mahout. At the time there was very little material about it, and only a handful of Chinese articles. Then I found "Mahout In Action" and treated it like a treasure, reading it over and over. I did not rush to build anything; I just kept reading. Only after the third pass did I feel I had a bit of a grip on it.

I started with the "recommendation" algorithms: UserCF and ItemCF. I remember the first time I presented this to my team, I even designed a questionnaire: I listed 10 websites (6 major IT sites, 2 personal blogs, 2 social communities) and asked everyone to rate each from 0 to 5, where 0 means "don't know it" and 1-5 expresses how much they like the site.

Questionnaire result format:

    user1, website1, 5
    user1, website2, 2
    user1, website3, 4
    user2, website3, 2
    user3, website3, 5
    user4, website3, 0
    …..

We used this questionnaire to try out Mahout's recommendation models (a minimal sketch of such a user-based recommender follows below). The results looked rather odd to everyone: why would it recommend that? So I dug into the Mahout source code to see how the algorithms are implemented, and learned about similarity matrices, distance measures, recommendation algorithms, model evaluation, and so on. Different business requirements call for different algorithms, and every choice affects the results. I worked through every concept and keyword in the book (sadly, I was not blogging at the time). It took a full three months, at twelve hours a day, to work through the recommendation part from start to finish.
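A minimal user-based recommender over "userID,itemID,score" data like the questionnaire above, using the Mahout 0.6 Taste API. Note that FileDataModel expects numeric user and item IDs, and the file name ratings.csv is a hypothetical placeholder:

import java.io.File;
import java.util.List;

import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

// Minimal UserCF sketch over the questionnaire-style rating data.
public class UserCFSketch {
    public static void main(String[] args) throws Exception {
        DataModel model = new FileDataModel(new File("ratings.csv"));        // hypothetical file name
        UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
        UserNeighborhood neighborhood = new NearestNUserNeighborhood(2, similarity, model);
        Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);
        List<RecommendedItem> items = recommender.recommend(1L, 3);          // top-3 items for user 1
        for (RecommendedItem item : items) {
            System.out.println(item.getItemID() + " : " + item.getValue());
        }
    }
}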

Then I applied it to a real business problem. My task was "job recommendation", and the only data I had was user behavior: viewing a job, bookmarking a job, and applying for a job.

My first attempt simply reused the recommendation model as-is, and the results were very poor.
There were two reasons:

• 1. Job postings are time-sensitive and may expire after about 3 months, so the recommendations contained many expired jobs.
• 2. Most of the behavior data was historical, some of it 2-3 years old, so the recommendations did not match users' expectations. I estimated that a user's job level may move up every six months or so, so historical behavior cannot be used directly to compute for the current user.

The fixes:
1. Filter the behavior data set and only use behavior from the last six months.
2. Filter the result set to exclude expired jobs.
3. Compute with several different algorithm/model combinations (as I recall, Tanimoto item-based gave the best results); see the sketch after this list.
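A sketch of roughly what that final setup amounts to, under some stated assumptions: the Mahout 0.6 Taste API, a hypothetical prefs.csv already filtered to the last six months, and a hypothetical isExpired() lookup for expired jobs:

import java.io.File;
import java.util.List;

import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.TanimotoCoefficientSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.IDRescorer;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.similarity.ItemSimilarity;

// Item-based CF with Tanimoto similarity, filtering expired jobs out of the results.
public class JobRecommenderSketch {

    // Hypothetical check against the job database.
    static boolean isExpired(long jobID) { return false; }

    public static void main(String[] args) throws Exception {
        DataModel model = new FileDataModel(new File("prefs.csv"));   // behavior from the last 6 months only
        ItemSimilarity similarity = new TanimotoCoefficientSimilarity(model);
        GenericItemBasedRecommender recommender = new GenericItemBasedRecommender(model, similarity);

        IDRescorer rescorer = new IDRescorer() {                      // excludes expired jobs from the result set
            public double rescore(long id, double originalScore) { return originalScore; }
            public boolean isFiltered(long id) { return isExpired(id); }
        };

        List<RecommendedItem> jobs = recommender.recommend(1L, 10, rescorer);
        for (RecommendedItem job : jobs) {
            System.out.println(job.getItemID() + " : " + job.getValue());
        }
    }
}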

These changes improved the recommendations substantially. And that is where the story ends: I did more work on it, but the product never went live because of a company restructuring. (A programmer's sorrow!)

Clustering: I applied it to analyzing website user activity. Suppose a site has 10 million registered users but only 10 thousand log in each day; we wanted to understand what characterizes the 9.99 million users who do not log in. Using Mahout's k-means and Canopy, we assumed the 10 million users could be divided into roughly 5 large groups. In the end we produced a result and shared it with the team, and that is where that story ends too. (Such is reality!) A small in-memory sketch of this kind of clustering follows.
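For what it's worth, a tiny in-memory version of that kind of clustering, assuming the in-memory K-means API that Mahout 0.6 exposes (in the style used in "Mahout In Action"); the user feature vectors here are hypothetical activity features:

import java.util.ArrayList;
import java.util.List;

import org.apache.mahout.clustering.kmeans.Cluster;
import org.apache.mahout.clustering.kmeans.KMeansClusterer;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

// In-memory k-means sketch: cluster users described by (hypothetical) activity features.
public class UserClusterSketch {
    public static void main(String[] args) {
        List<Vector> users = new ArrayList<Vector>();
        users.add(new DenseVector(new double[]{1, 0, 0}));   // e.g. visits, posts, purchases
        users.add(new DenseVector(new double[]{0, 1, 5}));
        users.add(new DenseVector(new double[]{9, 8, 7}));

        int k = 2;
        EuclideanDistanceMeasure measure = new EuclideanDistanceMeasure();
        List<Cluster> clusters = new ArrayList<Cluster>();
        for (int i = 0; i < k; i++) {
            clusters.add(new Cluster(users.get(i), i, measure));   // seed centers from the first k points
        }

        List<List<Cluster>> iterations =
            KMeansClusterer.clusterPoints(users, clusters, measure, 10, 0.01);
        for (Cluster c : iterations.get(iterations.size() - 1)) {   // clusters after the last iteration
            System.out.println("cluster " + c.getId() + " center = " + c.getCenter());
        }
    }
}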

Classification: I tried using Naive Bayes to sort my personal email into spam and non-spam. Following the usual machine-learning workflow, the historical mail is segmented into words, a classifier is trained, and incoming messages are then run through the classifier in near real time. The whole automated pipeline was completed. And that story ends there as well. (Accept reality.)

There are a few more cases; I will try to write them all up.

Mahout does have a learning curve, and it requires knowledge from several disciplines. But keep at it and there is no gap you cannot cross. Stay optimistic and keep working!

4. Mahout use cases

Cases that have already been written up as articles

More cases are in preparation...

Related article:
Learning roadmap for the Hadoop family of products

Please cite the source when reposting:
    http://blog.fens.me/hadoop-mahout-roadmap/
