• Archive by category "架构设计"

Blog Archives

如何确定数据分析目标

架构师的信仰系列文章,主要介绍我对系统架构的理解,从我的视角描述各种软件应用系统的架构设计思想和实现思路。

从程序员开始,到架构师一路走来,经历过太多的系统和应用。做过手机游戏,写过编程工具;做过大型Web应用系统,写过公司内部CRM;做过SOA的系统集成,写过基于Hadoop的大数据工具;做过外包,做过电商,做过团购,做过支付,做过SNS,也做过移动SNS。以前只用Java,然后学了PHP,现在用R和Javascript。最后跳出IT圈,进入金融圈,研发量化交易软件。

架构设计就是定义一套完整的程序规范,坚持架构师的信仰,做自己想做的东西。

关于作者:

  • 张丹,数据分析师/程序员/Quant: R,Java,Nodejs
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/data-analysis-goal/

前言

数据分析核心要解决业务问题,通过数据发现规律,驱动业务创新发展。那么如何确定一个核心的分析目标就变得尤为重要,如果目标找的不对,那么就会一直原地打转。花费了无数时间,也不过是徒劳。

目录

  1. 如何设定分析目标
  2. 目标分解
  3. 人是最核心的

1. 如何设定分析目标

设定数据分析目标,是为了能让数据能更好的落地。那么在设计目标前,需要先理解数据落地是什么,请参考文章怎么理解数据分析落地

如何设定分析目标?首先,需要与业务团队紧密合作,了解他们的需求和目标。确保数据分析的目标与实际业务目标紧密相关,以便分析结果能够为业务决策提供有意义的指导。然后,具体定义需要解决的分析问题。问题应具体明确,具备可操作性。

目标,最好是一句话能说明白,到底要做什么,能达到什么样的结果。

在不同场景下,我们定义的分析目标是不一样的。

  • 量化投资,通过程序算法可直接变现。
  • 电商商品推荐,增加客户购买效率。
  • 地图导航:帮助用户规划路线
  • 风险识别:精准发现风险问题
  • 医疗影像识别:给出辅助诊断结果
  • 自动驾驶:帮助用户自动开车

在每个生活的领域中,都已经存在大量的数据分析的案例。

那么,在你做的数据分析的工作,你能用一句话说清楚吗,数据分析到底是为了什么?这个点上,大家通常都是迷茫的。可能是为了发论文,可能是领导布置的工作,可能是照搬之前形成的模板…

所以怎么把思路改变,怎么能让工作变得有意义,就需要从定义一个好的目标开始。然后,命中靶心!

2. 目标分解

目标一定是明确的,通过目标分解,可以把一个宏大目标分解成多少小的目标来执行,最终实现最初的目标。

量化投资,通过程序算法可直接变现,以一个金融市场交易策略来举例。

我们要达成 “通过程序算法可直接变现” 的目标,就需要把目标进行分解,变成可操作的任务。从而把一个大的抽象的目标,变成多个小的更具体的目标。

  1. 设定目标:在金融市场上,分析关税对于大宗商品的影响,从而进行期货交易实现套利。
  2. 定义逻辑和输出结果:分析受到关税影响的商品,以及进出口的变化,结合期货市场的价格和持仓,找到合适的交易买卖点,生成交易信号。
  3. 技术方法:以均线策略和布林线策略为核心,结合基本面的关税数据,通过R语言编写程序,计算出交易信号。
  4. 回测验证:把交易信号放到回测程序中,通过历史数据验证交易是否赚钱,计算收益率,夏普率,胜率等指标,买卖滑点,资金容量等
  5. 实盘验证:金融白银的投入到策略中,策略出什么信号,就买什么票,直到实现盈利。

然后,再近一步拆解,拆成更细粒度的,通过数据能体现的。

  1. 采集国内期货市场数据,取1分钟周期的数据,提取数据字段包括交易时间、开高低收价格、持仓量、交易金额。
  2. 以跨期交易为核心统计套利思路,叠加关税变化在相同商品,不同合约的影响,找到变化大小的规律。
  3. 以均线策略和布林线策略为核心,结合基本面的关税数据,计算出交易信号,并考虑手续费、滑点等影响因素。
  4. 构建回测功能函数,支持按手交易,或者按资金体量交易,能够进行收益率,夏普率,胜率等指标计算。

通过一层一层的分析,在我们梳理业务逻辑并结合数据,就可以把最初的目标,逐步分解成可执行小任务小目标,从而完成整个项目方案。

当然,对于新的数据分析方向来说,你可能不清楚怎么分解更合理,这需求大量来补充行业经验和知识。人的经验很重要。

3. 人是最核心的

数据分析核心要解决业务问题,通过数据发现规律,为业务服务,产生价值,获得收益,从而实现数据落地。

整个一套流程下来人是最核心的。人需要理解业务,人需要发现规律,人需要把业务和数据连接,人需要编写逻辑程序,人需要验证逻辑,人需要验证价值。

大模型虽然可以辅助人来做其中的一部分或者多部分工作,但是还远远达不到人的能力水平。每个都是个性化的个体,有着不是生长环境,不同的工作经历,掌握不同专业技能,因此我们每个看待同一件事情的角度,都是不一样的。这种不一样的东西,才是创新的原动力。

希望每一位数据分析师,都能发挥出自己的聪明才智,实现自己的目标突破,完成数据落地。

转载请注明出处:
http://blog.fens.me/data-analysis-goal/

影响数据分析落地的9大影响因素

架构师的信仰系列文章,主要介绍我对系统架构的理解,从我的视角描述各种软件应用系统的架构设计思想和实现思路。

从程序员开始,到架构师一路走来,经历过太多的系统和应用。做过手机游戏,写过编程工具;做过大型Web应用系统,写过公司内部CRM;做过SOA的系统集成,写过基于Hadoop的大数据工具;做过外包,做过电商,做过团购,做过支付,做过SNS,也做过移动SNS。以前只用Java,然后学了PHP,现在用R和Javascript。最后跳出IT圈,进入金融圈,研发量化交易软件。

架构设计就是定义一套完整的程序规范,坚持架构师的信仰,做自己想做的东西。

关于作者:

  • 张丹,数据分析师/程序员/Quant: R,Java,Nodejs
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/data-analysis-factor-top9/

前言

现在我们正处于大数据时代,处处都产生数据,大部分数据已经不在稀缺,分析方法和算法模型都也写在了教课书中,如何挖掘出数据的价值,让数据分析落地,把数据价值转换为业务价值,是数据分析师核心要考虑的。数据分析要解决实际业务场景问题,伪需求、不清晰的目标,都会造成项目失败。

本文总结了影响数据分析落地的9大影响因素。

目录

  1. 影响数据分析的因素
  2. 无处不在的伪需求
  3. 目标不明确
  4. 口嗨
  5. 自我陶醉
  6. 业务理解不到位
  7. 炫技
  8. 只用自己会的技术
  9. 模型是万能的
  10. 既要..又要..也要..还要

1. 影响数据分析的因素

影响数据落地的因素有很多,可能数据分析做着做着,就会偏离原先既定的目标。

做数据分析项目,通常在开始时,第一步,我们都要先设定分析目标是什么,要解决一个怎样的问题。第二步,要思考如何用数据来解决,能产生什么结果,这个结果对于我们目标有什么样的促进作用。第三步,方法路径是什么,用什么技术,用什么模型,怎么做数据,怎么进行训练。第四步,是对模型结果进行验证,是否结果是有效的,是否正确,又是否准确的。

一个目标明确的数据分析项目,我们通常把具体做什么,提前设计技术框架,来保证数据分析的稳定执行。

但是,在执行过程中,会遇到各样的问题。可能有我们自身思路的限制的影响,也有外界市场变化,甲方领导带来的影响,导致我们在做分析时,会有各种各样的偏差。

2. 无处不在的伪需求

伪需求是指那些表面上看起来像是需求,实际上并不是真正必需的需求,通常是由于外部压力、误解或者不合理的预期所产生的。简单来说,伪需求并非真正影响项目目标的关键需求,它可能会浪费时间和资源,导致不必要的复杂性。在一些领域,特别是在产品开发或项目管理中,伪需求可能会引发团队的误方向。比如客户可能要求一些看似有用但实际无关紧要的功能,而这些功能最终并没有为项目带来实际价值。

很多人都不能分辨,什么是真正的需求和伪需求。以为自己需要的,自己想到的,就是真正需求,而并不关心,客户想要什么。这样提出的需求,就很有局限性,通常就不是一个共性问题。

在数据分析领域,伪需求通常表现为对数据的误解或不必要的分析,导致资源浪费和效率低下。以下是一个具体的例子:

案例:缺陷状态数据分析

某公司要求数据分析师对产品缺陷的状态进行详细分析,包括每个缺陷的当前状态、历史状态变化等。然而,深入分析后发现,这些状态数据并未对产品质量改进提供实质性帮助。真正有价值的指标是缺陷的严重程度、发生频率以及修复时间等。对缺陷状态的过度分析反而分散了团队对关键问题的关注,导致资源浪费。

在上述案例中,对缺陷状态的详细分析被认为是必要的,但实际上并未对产品改进带来实际价值。这类伪需求可能源于对数据价值的误解,或是未能准确识别业务需求。为了避免陷入伪需求的陷阱,数据分析师应与业务部门密切沟通,明确分析目标,聚焦于能为决策提供支持的关键指标。

通过识别和避免伪需求,数据分析团队可以更有效地利用资源,专注于真正能推动业务发展的分析工作。

3. 目标不明确

数据分析的核心,是就数据思路解决实际的业务问题。在不了解业务的时候,可能提不出来明确的目标。另一方面,在做数据分析的过程中,可能一会儿想一个,一会儿又想一个,造成目标很多。一直就不能定位到核心目标是什么。比如,我们的目标是从期货市场赚到钱。首先,要研究期货数据,然后研究策略,然后进行交易,最后赚到钱。

但目标不明确,导致分析方向不明确,难以得出有价值的结论。

如果甲方没思路,让你来提供思路,恭喜你,你不做出10个版本,甲方是不会放过你的。如果传达不准确,领导开会一句话,你为了这句话,绞尽脑汁,也不可能想到他的心理去。如果多个决策人,A处说一,B处说二,C处说三,都对,都是领导,后面就可以自行脑补了。

4. 口嗨

口嗨:大厂的人都说了能实现,所以这个技术一点也不难。

由于现在的短视频很发达,我们经常会刷到各种神乎其神的技术,解决了人类一大痛点问题。现在可能只是一个很小的突破,未来真的有可能解决。也被自媒体放大或者错误解读,让大家就真的感受到未来已来的幻觉。

有了这种幻觉后,就感觉自己作为数据分析的从业者,也能做出和报道中一样的效果。

有些大公司团队,甚至把这种虚幻的内容,当成碾压小公司的亮点技术,在客户这边打散吹嘘。一旦甲方领导信了这些飘在天上的,就会对自己的下属,乃至其他的乙方团队提出相当大的质疑。

为什么大公司都说,这个实现起来很容易,你们就是不会做呢?最后领导找大公司来做了,并花费了不少的钱,搞几年烂尾了,终于知道当时那些大厂销售,是一时的口嗨。

这个故事在不断的重复。

5. 自我陶醉

自我陶醉:我做的模型非常好:查准率,查全率,F1,AUC/ROC,接近完美。

在面试做数据分析的小朋友,很多人都处于“自我陶醉”中。在完成了一些老师给的任务,或者参加kaggle的比赛,得到了一些鼓励,就觉得自己行了。

比如,基于给定的数据做分类训练,比如纽约出租车数据分析、xx地区犯罪案件分析、xx地区房屋租金分析、xx企业零售商品分析等。

当问道为什么会有这么好的结果时,回复通常是,“教课书就这么说的,我按步骤做的,就应该有这样的结果,难道不是么”。然后,一脸质疑地看着我。

只有陶醉,过于入戏了。

6. 业务理解不到位

业务理解不到位:我能找出异常点,具体对应什么问题,你们一看应该就知道。

很多从软件开发转型做数据分析的人,绝大部分精力都关注在技术实现,比如何做数据结构,如何如做ETL,如何调用某个模型等,但是对于业务逻辑,并没有清晰的认识。

他们特别擅长用神经网络一类的模型,把数据填入模型,根据技术指标做做调优。把结果成功的计算出来,认为这一定是业务需要的,他的工作就完成了。并不对结果进行解析,觉得你们一看应该就知道,这个结果怎么来的。

这将导致,结果在业务上不可用。因为业务人员也不能理解,这些结果有什么用,是什么逻辑产生的,有什么价值等。

7. 炫技

炫技:有一种新算法,这个项目一定要使用。

这可是最新的论文,能解决xx问题,在哪个杂志发表了,设计到几百个特征,几十的调参变量,50层以上的神经网络。

这种做法,会把项目带入无尽的复杂度。如果真是复杂度匹配的项目,那么这么做没问题。但如果是一个一般的项目,一个逻辑回归就能很好解决的问题,用上了复杂度过高的模型,不仅成本会大大超过,而且后期维护调优,更是噩梦。

就类似于糟糕的程序员10年攒下的屎山一样。

8. 只用自己会的技术

只用自己会的技术:我在学校里学过逻辑回归,所以我要用这个模型。

现在国内外的本科和研究生都开了数据科学的课程,在课程中会学到统计学、机器学习等的课程,大家也会掌握一些技术方法和代码的写法。

有些同学结合课堂练习,有可能掌握了1-2种的模型。后面遇到的问题,就全往这1-2个模型中套。有些问题是类似的直接套上直接能用,但是实现的问题,更具有多样性和特殊性,要因地制宜的思考解决办法,而不是套用。

在统计学领域中,有非常多的模型,不可能全在课堂上都学到,我们需要私下里努力从而掌握更多的知识。如果只用自己会的技术,就必然导致对于模型结果的偏差。

9. 模型是万能的

模型是万能的:训练一个模型,想让它干什么,它就能干什么。

说到模型,很多人就把模型直接和AI画上等号,以为AI应该是万能的。

当然chatgpt的横空出世,也进一步推升了这样的想法。现在,随着对chatgpt的熟悉,我们也发现了,在通用领域有大量的使用场景,但在专业领域还是需要专业模型。

因此,模型不是万能的,一个模型也只能解决一类的问题,如果想让他万能,就多种一些有效的专业模型,形成模型集群吧。

9. 模型应该是不花钱的

模型应该是不花钱的:既想马而跑得快,又想马儿不吃草。

训练一个模型有着巨大的成本开销,包括是高级人才的成本,服务器算力的成本,数据采集的成本,数据存储的成本,数据标记的成本等等。

人才是最宝贵,掌握核心技术的人才,可以真的让我们梦想成真。但奇葩的是,国内大部分信息化项目,人都按外包成本算,按岁数划分人头的成本,设备和数据都比人值钱。

如果要想在人把成本拉平,就需要多多的报人头。为啥科技含量这么高的项目,人这么不值钱呢。

10. 既要..又要..也要..还要

当一个模型被赋予了诸多的使命时,做出来是个四不像,谁都想用,谁都用不起来,那么这个项目必然会失败。

甲方客户,通常都有很多的需求。

  • 既要模型准,又要速度快,也要使用简单,还要成本低。
  • 既要解决业务问题,又要能模型自己能学业务逻辑,也要能说数据进行预警,还要对未来公司战略给出建议。
  • 既要服务于一线工作人员解决业务落地,又要服务于公司运营能自己出完美绩效报表,也要智能设计公司宣传文案,还要自动生成给领导汇报的材料。

一个做了数据分析多年的老兵,自我发出一些感慨!

数据分析是一件很有价值的事情,但是我们也有太多的枷锁需要去克服。希望2025年,数据分析师们,用专业创造未来。争取有点时间,写个数据分析的系列出来!

转载请注明出处:
http://blog.fens.me/data-analysis-factor-top9/

打赏作者

Nifi从安装开始

解构数据平台系列文章,数据越来越重要,针对数据平台系统要求和设计规范也越来越多,为了匹配数据处理个性化和复杂度的要求,新的理论和名词层出不穷,同时也诞生了各种各样的新工具。

本系列文章主要介绍数据平台架构设计,搭建过程,数据处理的各种新工具的使用,从数据采集、数据清洗,到数据交换、数据治理,再到工作流引擎、规则引擎,最后到数据可视化的整套数据系统全流程。

关于作者:

  • 张丹,分析师/程序员/Quant: R,Java,Nodejs
  • blog: http://fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/linux-nifi-install/

前言

最近有一个数据项目要搭建数据仓库平台,最核心的一个功能,就是从多数据源中同步数据。网上有不少的解决方案,就放弃了自己造车轮的想法,相比较最后选择了Nifi。关于Nifi的文档不多,我就自己一步一步来使用,把整个实现的过程记录下来,方便给后人参考。

目录

  1. Apache Nifi是什么
  2. Apache Nifi安装
  3. Apache Nifi运行第一个实例

1. Apache Nifi是什么

Apache Nifi是一个易于使用,功能强大且可靠的系统来处理和分发数据。

Apache NiFi 最初由美国国家安全局(NSA)开发和使用的一个可视化、可定制的数据集成产品。2014 年 NSA 将其贡献给了 Apache 开源社区,2015 年 7 月成为 Apache 顶级项目。

Apache NiFi支持数据路由,转换和系统中介逻辑的强大且可扩展的有向图。NiFi 通过拖拽界面、配置参数、简单地连接,即可完成对数据流的托管和系统间的自动化传输,使用者可以可视化整个过程并实时进行更改。

Apache NiFi遵循APACHE LICENSE, VERSION 2.0 开源协议。

2. Apache Nifi安装

首先,下载Nifi的最新版本1.12.1二进制版本,下载地址为: http://nifi.apache.org/download.html

本机系统环境

  • 操作系统:Ubuntu 20.04.1 LTS
  • Java版本:java version “11.0.7” 2020-04-14 LTS
# 登陆Ubuntu系统
~ pwd
/home/conan

# 新建文件夹
~ mkdir deploy

# 进入deploy目录  
~ cd deploy

# 下载nifi软件
~ wget https://mirrors.tuna.tsinghua.edu.cn/apache/nifi/1.12.1/nifi-1.12.1-bin.zip

# 解压
~ unzip nifi-1.12.1-bin.zip

# 进入nifi目录 
~ cd nifi-1.12.1

# 查看nifi状态
~ bin/nifi.sh status
nifi.sh: JAVA_HOME not set; results may vary
nifi.sh: java command not found

提示缺少JAVA环境变量,我们来配置Java环境变量。

# 打开系统文件(在文件最后面增加配置)
~ sudo vi /etc/environment

JAVA_HOME=/home/conan/deploy/jdk-11.0.7
PATH=$PATH:$JAVA_HOME/bin

让配置生效

# 执行命令
~ source /etc/environment

# 查看nifi状态,JAVA路径已经生效
~ bin/nifi.sh status
Java home: /home/conan/deploy/jdk-11.0.7
NiFi home: /home/conan/deploy/nifi-1.12.1

Bootstrap Config File: /home/conan/deploy/nifi-1.12.1/conf/bootstrap.conf

2020-11-05 09:38:44,837 INFO [main] org.apache.nifi.bootstrap.Command Apache NiFi is not running

# 安装nifi
~ sudo bin/nifi.sh install
Service nifi installed

启动nifi

~ bin/nifi.sh start

Java home: /home/conan/deploy/jdk-11.0.7
NiFi home: /home/conan/deploy/nifi-1.12.1

Bootstrap Config File: /home/conan/deploy/nifi-1.12.1/conf/bootstrap.conf

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.nifi.bootstrap.util.OSUtils (file:/home/conan/deploy/nifi-1.12.1/lib/bootstrap/nifi-bootstrap-1.12.1.jar) to method java.lang.ProcessImpl.pid()
WARNING: Please consider reporting this to the maintainers of org.apache.nifi.bootstrap.util.OSUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

查看nifi运行状态,

~ bin/nifi.sh status

Java home: /home/conan/deploy/jdk-11.0.7
NiFi home: /home/conan/deploy/nifi-1.12.1

Bootstrap Config File: /home/conan/deploy/nifi-1.12.1/conf/bootstrap.conf

2020-11-05 09:41:11,846 INFO [main] org.apache.nifi.bootstrap.Command Apache NiFi is currently running, listening to Bootstrap on port 42913, PID=3498

# 查看系统进程,3498进程号为nifi主进程
~ ps -aux|grep nifi
conan       3475  0.0  0.0   2608   152 pts/0    S    09:40   0:00 /bin/sh bin/nifi.sh start
conan       3477  0.2  0.3 3620512 61732 pts/0   Sl   09:40   0:01 /home/conan/deploy/jdk-11.0.7/bin/java -cp /home/conan/deploy/nifi-1.12.1/conf:/home/conan/deploy/nifi-1.12.1/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.log.dir=/home/conan/deploy/nifi-1.12.1/logs -Dorg.apache.nifi.bootstrap.config.pid.dir=/home/conan/deploy/nifi-1.12.1/run -Dorg.apache.nifi.bootstrap.config.file=/home/conan/deploy/nifi-1.12.1/conf/bootstrap.conf org.apache.nifi.bootstrap.RunNiFi start
conan       3498  9.6  5.6 4429452 898504 pts/0  Sl   09:40   1:15 /home/conan/deploy/jdk-11.0.7/bin/java -classpath /home/conan/deploy/nifi-1.12.1/./conf:/home/conan/deploy/nifi-1.12.1/./lib/nifi-nar-utils-1.12.1.jar:/home/conan/deploy/nifi-1.12.1/./lib/logback-classic-1.2.3.jar:/home/conan/deploy/nifi-1.12.1/./lib/slf4j-api-1.7.30.jar:/home/conan/deploy/nifi-1.12.1/./lib/nifi-framework-api-1.12.1.jar:/home/conan/deploy/nifi-1.12.1/./lib/jul-to-slf4j-1.7.30.jar:/home/conan/deploy/nifi-1.12.1/./lib/nifi-runtime-1.12.1.jar:/home/conan/deploy/nifi-1.12.1/./lib/nifi-properties-1.12.1.jar:/home/conan/deploy/nifi-1.12.1/./lib/javax.servlet-api-3.1.0.jar:/home/conan/deploy/nifi-1.12.1/./lib/jetty-schemas-3.1.jar:/home/conan/deploy/nifi-1.12.1/./lib/jcl-over-slf4j-1.7.30.jar:/home/conan/deploy/nifi-1.12.1/./lib/log4j-over-slf4j-1.7.30.jar:/home/conan/deploy/nifi-1.12.1/./lib/nifi-api-1.12.1.jar:/home/conan/deploy/nifi-1.12.1/./lib/logback-core-1.2.3.jar:/home/conan/deploy/nifi-1.12.1/./lib/java11/javax.activation-api-1.2.0.jar:/home/conan/deploy/nifi-1.12.1/./lib/java11/jaxb-api-2.3.0.jar:/home/conan/deploy/nifi-1.12.1/./lib/java11/jaxb-impl-2.3.0.jar:/home/conan/deploy/nifi-1.12.1/./lib/java11/javax.annotation-api-1.3.2.jar:/home/conan/deploy/nifi-1.12.1/./lib/java11/jaxb-core-2.3.0.jar -Dorg.apache.jasper.compiler.disablejsr199=true -Xmx512m -Xms512m -Dcurator-log-only-first-connection-issue-as-error-level=true -Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.egd=file:/dev/urandom -Dzookeeper.admin.enableServer=false -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dnifi.properties.file.path=/home/conan/deploy/nifi-1.12.1/./conf/nifi.properties -Dnifi.bootstrap.listen.port=37955 -Dapp=NiFi -Dorg.apache.nifi.bootstrap.config.log.dir=/home/conan/deploy/nifi-1.12.1/logs org.apache.nifi.NiFi

# 查看系统端口, 默认8080为web界面端口
~ netstat -nltp
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 127.0.0.1:42913         0.0.0.0:*               LISTEN      3498/java
tcp        0      0 0.0.0.0:8080            0.0.0.0:*               LISTEN      3498/java
tcp        0      0 127.0.0.53:53           0.0.0.0:*               LISTEN      -
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      -
tcp6       0      0 127.0.0.1:37955         :::*                    LISTEN      3477/java
tcp6       0      0 :::22                   :::*                    LISTEN      -

在浏览器中打开,http://192.168.1.13:8080。你可以输入你自己的IP进行访问。初始化时是不带权限的,后面可以配置启用全新设置。

界面还是很不错的,打开后就是一个完整的画布,我们可以在画布上来进行数据调用任务的编排。

3.Apache Nifi运行第一个实例

任务:产生一组随机文件,写到/home/conan/demo目录下的文件中。

为完成第一个任务,我们来建立一个工作流图,一共需要5个操作。

  • 第一个操作,GenerateFlowFile,用于产生随机文件。
  • 第二个操作,PutFile,用于写入本地文件。
  • 第三个操作,success,用于结束标识位。
  • 第四个操作,Name success,是执行队列,用于连接GenerateFlowFile 和 PutFile,当GenerateFlowFile成功后,产生数据会存在这个过程中。
  • 第五个操作,Name failure success,是执行队列,用于连接PutFile和结束标识,PutFile无论成功或失败,都会起这个流程到结束。

我们需要对 GenerateFlowFile 进行配置,在PROPERTIES标签页,可以设置随机生成的文件大小,也可以自定义一些文字的信息,我把输出文件设置为Hello world,会被写到生成的文件中。

在第四步的连接节点,设置压力阈值为2,可以同时生成2个文件,默认为10000。

再调整 PutFile 配置,在PROPERTIES标签页,可以设置文件的输出目录为/home/conan/demo/ ,我们生成的文件可以到这个目录中找到。

在第五步连接节点,设置压力阈值为5,可以同时写5个文件到磁盘,默认为10000。

然后,启动每一个流程,变为运行状态,这样生成的文件就会被写入到指定的目录了。在输出目录,查看生成的文件。


# 查看输出目录,共有5个文件
~ ll /home/conan/demo
total 1276
drwxrwxr-x 2 conan conan 1282048 Nov  6 01:09 ./
drwxr-xr-x 6 conan conan    4096 Nov  6 01:09 ../
-rw-rw-r-- 1 conan conan      11 Nov  6 01:09 1c108cb4-6d8d-43c7-b8f1-01f796103eb0
-rw-rw-r-- 1 conan conan      11 Nov  6 01:09 2352fea5-2e7a-43e8-8c59-7059d4d6f878
-rw-rw-r-- 1 conan conan      11 Nov  6 01:09 2d3e9fad-fe6e-46d4-8dc1-3e0465994130
-rw-rw-r-- 1 conan conan      11 Nov  6 01:09 965d6888-b612-40cc-af73-46a1c5ed74ec
-rw-rw-r-- 1 conan conan      11 Nov  6 01:09 cccf11bf-6ef3-418b-b2c8-22c9222bc55a

# 查看其中一个文件内容
~ cat /home/conan/demo/1c108cb4-6d8d-43c7-b8f1-01f796103eb0
Hello world

这样就完成了第一个Nifi的实例,总体来说还是比较容易上手的,没有遇到太复杂的环境配置问题。Nifi是一个功能强大的工具,后面我会继续介绍Nifi的使用的详细内容。

转载请注明出处:
http://blog.fens.me/linux-nifi-install/

打赏作者

基于Zookeeper的分步式队列系统集成案例

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

zookeeper-case

前言

软件系统集成一直是工业界的一个难题,像10年以上的遗留系统集成,公司收购后的多系统集成,全球性的分步式系统集成等。虽然基于SOA的软件架构,从理论上都可以解决这些集成的问题,但是具体实施过程,有些集成项目过于复杂而失败。

随着技术的创新和发展,对于分步式集群应用的集成,有了更好的开源软件的支持,像zookeeper就是一个不错的分步式协作软件平台。本文将通过一个案例介绍Zookeeper的强大。

目录

  1. 项目背景:分布式消息中间件
  2. 需求分析:业务系统升级方案
  3. 架构设计:搭建Zookeeper的分步式协作平台
  4. 程序开发:基于Zookeeper的程序设计
  5. 程序运行

1. 项目背景:分布式消息中间件

随着Hadoop的普及,越来越多的公司开始构建自己的Hadoop系统。有时候,公司内部的不同部门或不同的团队,都有自己的Hadoop集群。这种多集群的方式,既能让每个团队拥有个性化的Hadoop,又能避免大集群的高度其中化运维难度。当数据量不是特别巨大的时候,小型集群会有很多适用的场合。

当然,多个小型集群也有缺点,就是资源配置可能造成浪费。每个团队的Hadoop集群,都要配有服务器和运维人员。有些能力强的团队,构建的hadoop集群,可以达到真正的个性化要求;而有一些能力比较差的团队,搭建的Hadoop集群性能会比较糟糕。

还有一些时候,多个团队需要共同完成一个任务,比如,A团队通过Hadoop集群计算的结果,交给B团队继续工作,B完成了自己任务再交给C团队继续做。这就有点像业务系统的工作流一样,一环一环地传下去,直到最后一部分完成。

在业务系统中,我们经常会用SOA的架构来解决这种问题,每个团队在ESB服务器上部署自己的服务,然后通过消息中间件完成调度任务。对于分步式的多个Hadoop集群系统的协作,同样可以用这种架构来做,只要把消息中间件引擎换成支持分步式的消息中间件的引擎就行了。

Zookeeper就可以做为 分步式消息中间件,来完成上面的说的业务需求。ZooKeeper是Hadoop家族的一款高性能的分布式协作的产品,是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。Zookeeper的安装和使用,请参考文章 ZooKeeper伪分布式集群安装及使用

ZooKeeper提供分布式协作服务,并不需要依赖于Hadoop的环境。

2. 需求分析:业务系统升级方案

下面我将从一个案例出发,来解释如何进行分步式协作平台的系统设计。

2.1 案例介绍

某大型软件公司,从事领域为供应链管理,主要业务包括了 采购管理、应付账款管理、应收账款管理、供应商反复管理、退货管理、销售管理、库存管理、电子商务、系统集成等。

biz

每块业务的逻辑都很复杂,由单独部门进行软件开发和维护,部门之间的系统没有直接通信需求,每个部门完成自己的功能就行了,最后通过数据库来共享数据,实现各功能之间的数据交换。

zk1

随着业务的发展,客户对响应速度要求越来越高,通过数据库来共享数据的方式,已经达不到信息交换的要求,系统进行了第一次升级,通过企业服务总线(ESB)统一管理公司内部所有业务。通过WebServices发布服务,通过Message Queue实现业务功能的调度。

zk2

公司业务规模继续扩大,跨国收购了多家公司。业务系统从原来的一个机房的集中式部署,变成了全球性的多机房的分步式部署。这时,Message Queue已经不能满足多机房跨地域的业务系统的功能需求了,需要一种分步式的消息中间件解决方案,来代替原有消息中间件的服务。

系统进行了第二次升级,采用Zookeeper作为分步式中间件调度引擎。

zk3

通过上面的描述,我们可以看出,当一个公司从小到大,从国内业务发展到全球性业务的时候。
为了配合业务发展,IT系统也是越来越复杂的,从最早的主从数据库设计,到ESB企业系统总线的扩展,再到分步式ESB配合分步式消息系统,每一次的升级都需要软件技术的支撑。

2.2 功能需求

全球性采购业务和全球性销售业务,让公司在市场中处于竞争优势。但由于采购和销售分别是由不同部门进行的软件开发和维护,而且业务往来也在不同的国家和地区。所以在每月底结算时,工作量都特别大。

比如,计算利润表 (请不要纠结于公式的准确性)

当月利润 = 当月销售金额 - 当月采购金额 - 当月其他支出

这样一个非常简单的计算公式,但对于跨国公司和部门来说,一点也不简单的。

从系统角度来看,采购部门要统计采购数据(海量数据),销售部门统计销售数据((海量数据),其他部门统计的其他费用支出(汇总的少量数据),最后系统计算得到当月的利润。

这里要说明的是,采购系统是单独的系统,销售是另外单独的系统,及以其他几十个大大小小的系统,如何能让多个系统,配合起来做这道计算题呢??

3. 架构设计:搭建Zookeeper的分步式协作平台

接下来,我们基于zookeeper来构建一个分步式队列的应用,来解决上面的功能需求。下面内容,排除了ESB的部分,只保留zookeeper进行实现。

  • 采购数据,为海量数据,基于Hadoop存储和分析。
  • 销售数据,为海量数据,基于Hadoop存储和分析。
  • 其他费用支出,为少量数据,基于文件或数据库存储和分析。

我们设计一个同步队列,这个队列有3个条件节点,分别对应采购(purchase),销售(sell),其他费用(other)3个部分。当3个节点都被创建后,程序会自动触发计算利润,并创建利润(profit)节点。上面3个节点的创建,无顺序要求。每个节点只能被创建一次。

zk4

系统环境

  • 2个独立的Hadoop集群
  • 2个独立的Java应用
  • 3个Zookeeper集群节点

图标解释:

  • Hadoop App1,Hadoop App2 是2个独立的Hadoop集群应用
  • Java App3,Java App4 是2个独立的Java应用
  • zk1,zk2,zk3是ZooKeeper集群的3个连接点
  • /queue,是znode的队列目录,假设队列长度为3
  • /queue/purchase,是znode队列中,1号排对者,由Hadoop App1提交,用于统计采购金额。
  • /queue/sell,是znode队列中,2号排对者,由Hadoop App2提交,用于统计销售金额。
  • /queue/other,是znode队列中,3号排对者,由Java App3提交,用于统计其他费用支出金额。
  • /queue/profit,当znode队列中满了,触发创建利润节点。
  • 当/qeueu/profit被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束。

补充说明:

  • 创建/queue/purchase,/queue/sell,/queue/other目录时,没有前后顺序,程序提交后,/queue目录下会生成对应该子目录
  • App1可以通过zk2提交,App2也可通过zk3提交。原则上,找最近路由最近的znode节点提交。
  • 每个应用不能重复提出,直到3个任务都提交,计算利润的任务才会被执行。
  • /queue/profit被创建后,zk的应用会监听到这个事件,通知应用,队列已完成!

这里的同步队列的架构更详细的设计思路,请参考文章 ZooKeeper实现分布式队列Queue

4. 程序开发:基于Zookeeper的程序设计

最终的功能需求:计算2013年01月的利润。

4.1 实验环境

在真正企业开发时,我们的实验环境应该与需求是一致的,但我的硬件条件有限,因些做了一个简化的环境设置。

  • 把zookeeper的完全分步式部署的3台服务器集群节点的,改为一台服务器上3个集群节点。
  • 把2个独立Hadoop集群,改为一个集群的2个独立的MapReduce任务。

开发环境:

  • Win7 64bit
  • JDK 1.6
  • Maven3
  • Juno Service Release 2
  • IP:192.168.1.10

Zookeeper服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Zookeeper: 3.4.5
  • IP: 192.168.1.201
  • 3个集群节点

Hadoop服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Hadoop: 1.0.3
  • IP: 192.168.1.210

4.2 实验数据

3组实验数据:

  • 采购数据,purchase.csv
  • 销售数据,sell.csv
  • 其他费用数据,other.csv

4.2.1 采购数据集

一共4列,分别对应 产品ID,产品数量,产品单价,采购日期。


1,26,1168,2013-01-08
2,49,779,2013-02-12
3,80,850,2013-02-05
4,69,1585,2013-01-26
5,88,1052,2013-01-13
6,84,2363,2013-01-19
7,64,1410,2013-01-12
8,53,910,2013-01-11
9,21,1661,2013-01-19
10,53,2426,2013-02-18
11,64,2022,2013-01-07
12,36,2941,2013-01-28
13,99,3819,2013-01-19
14,64,2563,2013-02-16
15,91,752,2013-02-05
16,65,750,2013-02-04
17,19,2426,2013-02-23
18,19,724,2013-02-05
19,87,137,2013-01-25
20,86,2939,2013-01-14
21,92,159,2013-01-23
22,81,2331,2013-03-01
23,88,998,2013-01-20
24,38,102,2013-02-22
25,32,4813,2013-01-13
26,36,1671,2013-01-19

//省略部分数据

4.2.2 销售数据集

一共4列,分别对应 产品ID,销售数量,销售单价,销售日期。


1,14,1236,2013-01-14
2,19,808,2013-03-06
3,26,886,2013-02-23
4,23,1793,2013-02-09
5,27,1206,2013-01-21
6,27,2648,2013-01-30
7,22,1502,2013-01-19
8,20,1050,2013-01-18
9,13,1778,2013-01-30
10,20,2718,2013-03-14
11,22,2175,2013-01-12
12,16,3284,2013-02-12
13,30,4152,2013-01-30
14,22,2770,2013-03-11
15,28,778,2013-02-23
16,22,874,2013-02-22
17,12,2718,2013-03-22
18,12,747,2013-02-23
19,27,172,2013-02-07
20,27,3282,2013-01-22
21,28,224,2013-02-05
22,26,2613,2013-03-30
23,27,1147,2013-01-31
24,16,141,2013-03-20
25,15,5343,2013-01-21
26,16,1887,2013-01-30
27,12,2535,2013-01-12
28,16,469,2013-01-07
29,29,2395,2013-03-30
30,17,1549,2013-01-30
31,25,4173,2013-03-17

//省略部分数据

4.2.3 其他费用数据集

一共2列,分别对应 发生日期,发生金额


2013-01-02,552
2013-01-03,1092
2013-01-04,1794
2013-01-05,435
2013-01-06,960
2013-01-07,1066
2013-01-08,1354
2013-01-09,880
2013-01-10,1992
2013-01-11,931
2013-01-12,1209
2013-01-13,1491
2013-01-14,804
2013-01-15,480
2013-01-16,1891
2013-01-17,156
2013-01-18,1439
2013-01-19,1018
2013-01-20,1506
2013-01-21,1216
2013-01-22,2045
2013-01-23,400
2013-01-24,1795
2013-01-25,1977
2013-01-26,1002
2013-01-27,226
2013-01-28,1239
2013-01-29,702
2013-01-30,1396

//省略部分数据

4.3 程序设计

我们要编写5个文件:

  • 计算采购金额,Purchase.java
  • 计算销售金额,Sell.java
  • 计算其他费用金额,Other.java
  • 计算利润,Profit.java
  • Zookeeper的调度,ZookeeperJob.java

4.3.1 计算采购金额

采购金额,是基于Hadoop的MapReduce统计计算。


public class Purchase {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class PurchaseMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class PurchaseReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("purchase");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化purchase
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Purchase.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(PurchaseMapper.class);
        job.setReducerClass(PurchaseReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("purchase", "logfile/biz/purchase.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/purchase");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.2 计算销售金额

销售金额,是基于Hadoop的MapReduce统计计算。


public class Sell {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class SellMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class SellReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("sell");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化sell
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Sell.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(SellMapper.class);
        job.setReducerClass(SellReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("sell", "logfile/biz/sell.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.3 计算其他费用金额

其他费用金额,是基于本地文件的统计计算。


public class Other {

    public static String file = "logfile/biz/other.csv";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    private static String month = "2013-01";

    public static void main(String[] args) throws IOException {
        calcOther(file);
    }

    public static int calcOther(String file) throws IOException {
        int money = 0;
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));

        String s = null;
        while ((s = br.readLine()) != null) {
            // System.out.println(s);
            String[] tokens = DELIMITER.split(s);
            if (tokens[0].startsWith(month)) {// 1月的数据
                money += Integer.parseInt(tokens[1]);
            }
        }
        br.close();

        System.out.println("Output:" + month + "," + money);
        return money;
    }
}

4.3.4 计算利润

利润,通过zookeeper分步式自动调度计算利润。


public class Profit {

    public static void main(String[] args) throws Exception {
        profit();
    }

    public static void profit() throws Exception {
        int sell = getSell();
        int purchase = getPurchase();
        int other = getOther();
        int profit = sell - purchase - other;
        System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit);
    }

    public static int getPurchase() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
        return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim());
    }

    public static int getSell() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
        return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim());
    }

    public static int getOther() throws IOException {
        return Other.calcOther(Other.file);
    }

}

4.3.5 Zookeeper调度

调度,通过构建分步式队列系统,自动化程序代替人工操作。


public class ZooKeeperJob {

    final public static String QUEUE = "/queue";
    final public static String PROFIT = "/queue/profit";
    final public static String PURCHASE = "/queue/purchase";
    final public static String SELL = "/queue/sell";
    final public static String OTHER = "/queue/other";

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Please start a task:");
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            doPurchase(zk);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            doSell(zk);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            doOther(zk);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) {
                    System.out.println("Queue has Completed!!!");
                }
            }
        });
        return zk;
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => " + PROFIT);
        zk.exists(PROFIT, true);

        if (zk.exists(QUEUE, false) == null) {
            System.out.println("create " + QUEUE);
            zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(QUEUE + " is exist!");
        }
    }

    public static void doPurchase(ZooKeeper zk) throws Exception {
        if (zk.exists(PURCHASE, false) == null) {

            Purchase.run(Purchase.path());

            System.out.println("create " + PURCHASE);
            zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(PURCHASE + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doSell(ZooKeeper zk) throws Exception {
        if (zk.exists(SELL, false) == null) {

            Sell.run(Sell.path());

            System.out.println("create " + SELL);
            zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(SELL + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doOther(ZooKeeper zk) throws Exception {
        if (zk.exists(OTHER, false) == null) {

            Other.calcOther(Other.file);

            System.out.println("create " + OTHER);
            zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(OTHER + " is exist!");
        }
        isCompleted(zk);
    }

    public static void isCompleted(ZooKeeper zk) throws Exception {
        int size = 3;
        List children = zk.getChildren(QUEUE, true);
        int length = children.size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create " + PROFIT);
            Profit.profit();
            zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

            for (String child : children) {// 清空节点
                zk.delete(QUEUE + "/" + child, -1);
            }
        }
    }
}

5. 运行程序

最后,我们运行整个的程序,包括3个部分。

  • zookeeper服务器
  • hadoop服务器
  • 分步式队列应用

5.1 启动zookeeper服务

启动zookeeper服务器集群:


~ cd toolkit/zookeeper345

# 启动zk集群3个节点
~ bin/zkServer.sh start conf/zk1.cfg
~ bin/zkServer.sh start conf/zk2.cfg
~ bin/zkServer.sh start conf/zk3.cfg

~ jps
4234 QuorumPeerMain
5002 Jps
4275 QuorumPeerMain
4207 QuorumPeerMain

查看zookeeper集群中,各节点的状态


# 查看zk1节点状态
~ bin/zkServer.sh status conf/zk1.cfg
JMX enabled by default
Using config: conf/zk1.cfg
Mode: follower

# 查看zk2节点状态,zk2为leader
~ bin/zkServer.sh status conf/zk2.cfg
JMX enabled by default
Using config: conf/zk2.cfg
Mode: leader

# 查看zk3节点状态
~ bin/zkServer.sh status conf/zk3.cfg
JMX enabled by default
Using config: conf/zk3.cfg
Mode: follower

启动zookeeper客户端:


~ bin/zkCli.sh -server 192.168.1.201:2181

# 查看zk
[zk: 192.168.1.201:2181(CONNECTED) 0] ls /
[queue, queue-fifo, zookeeper]

# /queue路径无子目录
[zk: 192.168.1.201:2181(CONNECTED) 1] ls /queue
[]

5.2 启动Hadoop服务


~ hadoop/hadoop-1.0.3
~ bin/start-all.sh

~ jps
25979 JobTracker
26257 TaskTracker
25576 DataNode
25300 NameNode
12116 Jps
25875 SecondaryNameNode

5.3 启动分步式队列ZookeeperJob

5.3.1 启动统计采购数据程序,设置启动参数1

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
copy from: logfile/biz/purchase.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Output:2013-01,9609887
create /queue/purchase
Queue Complete:1/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 3] ls /queue
[purchase]

5.3.2 启动统计销售数据程序,设置启动参数2

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
copy from: logfile/biz/sell.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Output:2013-01,2950315
create /queue/sell
Queue Complete:2/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[purchase, sell]

5.3.3 启动统计其他费用数据程序,设置启动参数3

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Output:2013-01,34193
create /queue/other
Queue Complete:3/3
create /queue/profit
cat: hdfs://192.168.1.210:9000/user/hdfs/biz/sell/output/part-r-00000
2950315

cat: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase/output/part-r-00000
9609887

Output:2013-01,34193
profit = sell - purchase - other = 2950315 - 9609887 - 34193 = -6693765
Queue has Completed!!!

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[profit]

在最后一步,统计其他费用数据程序运行后,从日志中看到3个条件节点都已满足要求。然后,通过同步的分步式队列自动启动了计算利润的程序,并在日志中打印了2013年1月的利润为-6693765。

本文介绍的源代码,已上传到github: https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myzk/hadoop

通过这个复杂的实验,我们成功地用zookeeper实现了分步式队列,并应用到了业务中。当然,实验中也有一些不是特别的严谨的地方,请同学边做边思考。

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

打赏作者

Passport现实社交网络OAuth登陆

从零开始nodejs系列文章,将介绍如何利Javascript做为服务端脚本,通过Nodejs框架web开发。Nodejs框架是基于V8的引擎,是目前速度最快的Javascript引擎。chrome浏览器就基于V8,同时打开20-30个网页都很流畅。Nodejs标准的web开发框架Express,可以帮助我们迅速建立web站点,比起PHP的开发效率更高,而且学习曲线更低。非常适合小型网站,个性化网站,我们自己的Geek网站!!

关于作者

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/nodejs-oauth-passport/

nodejs-passport-oauth

前言

随着社交网络的发展,开发一个应用门槛越来越低。从一个完整的应用系统,到一个部署在社交网络平台的APP;从数据库–》应用层–》展示层,变成只需要开发展示层。

很多的社交应用,甚至都放弃了用户注册!仅靠大型社交网站的登陆授权,就可以赚到100W以上的用户量。。。

减少用户管理代码开发及维护,更专注于应用本身,个人开发者已经崛起!!

目录

  1. Passport介绍
  2. OAuth介绍
  3. 登陆Github
  4. 登陆LinkedIn

1. Passport介绍

Passport项目,主要是为了解决登陆认证的问题。

Web应用一般有2种登陆认证的形式:

  • 用户名和密码认证登陆
  • OAuth认证登陆

在上一篇文章中,我们介绍了Passport的项目,通过用户名和密码认证登陆。Express结合Passport实现登陆认证

本文将介绍,通过Passport实现OAuth登陆认证。

2. OAuth介绍

OAuth协议为用户资源的授权提供了一个安全的、开放而又简易的标准。与以往的授权方式不同之处是OAuth的授权不会使第三方触及到用户的帐号信息(如用户名与密码),即第三方无需使用用户的用户名与密码就可以申请获得该用户资源的授权,因此OAuth是安全的。

  • 简单:不管是OAUTH服务提供者还是应用开发者,都很易于理解与使用
  • 安全:没有涉及到用户密钥等信息,更安全更灵活
  • 开放:任何服务提供商都可以实现OAUTH,任何软件开发商都可以使用OAUTH

OAuth认证授权就三个步骤,三句话可以概括:

  • 获取未授权的Request Token
  • 获取用户授权的Request Token
  • 用授权的Request Token换取Access Token

OAuth的介绍,摘自:http://baike.baidu.com/view/3948029.htm

3. Github登陆

通过Passport实现Github登陆。我们还使用上一篇文章中的环境:Express结合Passport实现登陆认证

  • 申请开发github应用
  • Passport程序实现
  • 运行Github登陆认证

1). 申请开发github应用

github-app

2). Passport程序实现

  • 安装Passport的github扩展
  • 增加Github认证策略
  • 定义Github认证的路由配置: 登陆,回调,展示

a. 安装Passport的github扩展


D:\workspace\javascript\nodejs-passport>npm install passport-github

b. 增加Github认证策略

修改app.js


passport.use(new GithubStrategy({//对应从Github申请KEY
    clientID: "XXXX",
    clientSecret: "XXXX",
    callbackURL: "http://localhost:3000/auth/github/callback"
},function(accessToken, refreshToken, profile, done) {
    done(null, profile);
}));

c. 定义Github认证的路由配置: 登陆,回调,展示

  • /auth/github: 通过github,登陆
  • /auth/github/callback: github认证成功后,回调
  • /github: 回调验证后,转向展示示

修改app.js


app.all('/github', isLoggedIn);
app.get("/github",user.github);

app.get("/auth/github", passport.authenticate("github",{ scope : "email"}));
app.get("/auth/github/callback",
    passport.authenticate("github",{
        successRedirect: '/github',
        failureRedirect: '/'
    }));

3).运行Github登陆认证

github-auth

程序日志


D:\workspace\javascript\nodejs-passport>node app.js
Express server listening on port 3000
GET / 200 401ms - 594b
GET /stylesheets/style.css 304 9ms
GET /auth/github 302 8ms - 424b
GET /auth/github/callback?code=7cf818c4590e2aacfe90 302 4052ms - 70b
GET /github 200 4ms - 139b
GET /logout 302 2ms - 58b
GET / 200 3ms - 594b
GET /stylesheets/style.css 304 2ms

4. LinkedIn登陆

  • 申请开发LinkedIn应用
  • Passport程序实现
  • 运行LinkedIn登陆认证

1). 申请开发LinkedIn应用

linkedin-app

2). Passport程序实现

  • 安装Passport的LinkedIn扩展
  • 增加LinkedIn认证策略
  • 定义LinkedIn认证的路由配置: 登陆,回调,展示

a. 安装Passport的LinkedIn扩展


D:\workspace\javascript\nodejs-passport>npm install passport-linkedin

b. 增加LinkedIn认证策略

修改app.js


passport.use(new LinkedinStrategy({
    consumerKey: "XXXX",
    consumerSecret: "XXXX",
    callbackURL: "http://localhost:3000/auth/linkedin/callback",
    userAgent: 'localhost'
},function(accessToken, refreshToken, profile, done) {
    done(null, profile);
}));

c. 定义LinkedIn认证的路由配置: 登陆,回调,展示

  • /auth/linkedin: 通过LinkedIn,登陆
  • /auth/linkedin/callback: github认证成功后,回调
  • /linkedin: 回调验证后,转向展示示

修改app.js


app.all('/github', isLoggedIn);
app.get("/github",user.github);

app.all('/linkedin', isLoggedIn);
app.get("/linkedin",user.linkedin);
app.get("/auth/linkedin", passport.authenticate("linkedin",{}));
app.get("/auth/linkedin/callback",
    passport.authenticate("linkedin",{
        successRedirect: '/linkedin',
        failureRedirect: '/'
    }));

3).运行LinkedIn登陆认证

linkedin-auth

程序日志


D:\workspace\javascript\nodejs-passport>node app.js
Express server listening on port 3000
GET / 200 399ms - 638b
GET /stylesheets/style.css 304 4ms
GET /auth/Linkedin 302 3092ms - 256b
GET /auth/linkedin/callback?oauth_token=75--8f032180-afae-489b-bc3c-3326d80bea6f&oauth_verifier=36091 302 3049ms - 74b
GET /linkedin 200 5ms - 76b
GET /logout 302 1ms - 58b
GET / 200 19ms - 638b
GET /stylesheets/style.css 304 2ms

5. 完整的应用

项目代码我已上传到了github, 项目地址:https://github.com/bsspirit/nodejs-passport

下载及安装


git clone https://github.com/bsspirit/nodejs-passport
npm install

我们非常方便地,实现了Github和LinkedIn登陆认证,是否已经体会到Passport的强大了!把权限这种基础功能,进行合理的封装,是会帮我们节省大量的工作量的。

转载请注明出处:
http://blog.fens.me/nodejs-oauth-passport/

打赏作者