R与大数据——Map/Reduce

在R+Hadoop的模式下工作了也有一个月了,小结一下这段时间以来的心得体会以加深理解。
作为当下最火的大数据解决方案,Hadoop提供了两种方式来实现Map/Reduce: - Hadoop Streaming,允许用户创建和运行可执行程序来作为mapper和reducer。本文主要考虑这种方式。 - Hadoop Pipes,是一个C++ API,也可用于实现Map/Reduce应用。由于它主要面对C++用户,所以不在本文考虑范围之内。

Hadoop Streaming的工作原理

在Hadoop Streaming中,mapper和reducer都是可执行文件,它们从标准输入stdin中读取数据,经过处理再输出到标准输出stdout。Hadoop Streaming会自动创建map/reduce作业(job)并将其提交到合适的集群中,监控任务进程直到完成。

当指定好mapper的可执行文件时,在初始化的过程中每个mapper任务(task)会为该可执行文件单独创建一个进程。在mapper任务运行时,它将输入转化为行并发送到进程的stdin。同时mapper会收集从进程的stdout产生的输出,并将得到的每一行转化为key/value对作为mapper的输出。默认情况下,每行第一个\t字符之前的内容作为key,余下的所有内容(除了\t字符)为value。

当指定好reducer的可执行文件时,在初始化的过程中每个reducer任务(task)也会为该可执行文件单独创建一个进程。在reducer任务运行时,它将输入的key/value对转化为行并发送到进程的stdin。同时收集从进程的stdout产生的输出作为reducer的输出。

R+Hadoop=?

从Hadoop Streaming的工作原理中可以看到只要具有“能够从stdin读入数据,输出数据到stdout”的能力,就可以利用它来执行Map/Reduce任务。显然R也能够在Map/Reduce的框架下发挥它统计和数据挖掘的优势了。

目前在CRAN上比较成熟的和Hadoop有关的包有: - HadoopStreaming 提供了一个编写R Map/Reduce脚本的框架。好处在于它不但能够在集群环境下工作,而且在非集群环境下通过管道也能进行Map/Reduce。这就为算法的调试带来了方便。 - Rhipe 没有用过,具体不好多说。但是它提供了一个类似lapply的函数rhlapply,这点看起来比较方便。 - RHive 提供了R和Hive之间的交互,在它的基础上可以用R实现UDF(User Defined Function)和UDAF(User Defined Aggregate Function)以便在Hive中调用(例子)。这在某些应用场景是非常有用的。 - RHadoop 由三个包rhbase, rhdfs, rmr共同组成,属于Revolution Analytics的项目。实际上这个包的作者就是Rhipe的作者,是他在进入RA之后负责开发的。有了它,Rhipe就是浮云了。

Word Count Example

这里给出一个Map/Reduce中的“Hello World”小例子 —— Word Count。
采用HadoopStreaming包进行实现,改自其自带例子,进行简化,也更实用。

-------------- mapper.R --------------
#!/usr/bin/env Rscript
library(HadoopStreaming)
options(stringsAsFactors=F)
opts <- hsCmdLineArgs()
map <- function(inflow) {
    inflow <- strsplit(paste(inflow,collapse=' '),'[[:punct:][:space:]]+')[[1]]
    inflow <- inflow[!(inflow=='')]
    hsWriteTable(data.frame(word=inflow,cnt=1),
                 file=opts$outcon,sep=opts$outsep)
}
hsLineReader(opts$incon,chunkSize=opts$chunksize,FUN=map)

-------------- reducer.R --------------
#!/usr/bin/env Rscript
library(HadoopStreaming)
options(stringsAsFactors=F)
opts <- hsCmdLineArgs()
reduce <- function(inflow) {
    out <- tapply(inflow[,2],inflow[,1],sum)
    hsWriteTable(data.frame(word=names(out),cnt=unname(out)),
                 file=opts$outcon,sep=opts$outsep)
}
p.key <- NA
p.val <- 0
p.reduce <- function(inflow) {
    if(nrow(inflow)==0) {
        cat(paste(p.key,opts$outsep,p.val,'\n',sep=''),
            file=opts$outcon)
        p.key <<- NA
        p.val <<- 0
    } else {
        if(is.na(p.key)) {
            p.key <<- inflow[1,1]
        }
        p.val <<- p.val+sum(inflow[,2])
    }
}
cols <- list(word='',cnt=0)
hsTableReader(opts$incon,cols,chunkSize=opts$chunksize,skip=opts$skip,sep=opts$insep,keyCol='word',singleKey=F,ignoreKey=F,FUN=reduce,PFUN=p.reduce)

结语

通过Hadoop Streaming机制可以发挥Hadoop的集群能力和R的统计挖掘能力。虽然它并不是那么的elegant,但是确实提供了一种在大数据的情况下解决问题的方式。不过使用HadoopStreaming包来计算的话,自行进行一定的封装还是必要的,否则每次都要写一大堆脚本才能完成任务。毕竟Done in R style还是很重要滴!


Published: March 31 2012

blog comments powered by Disqus