2011年4月27日 星期三

Hadoop v0.21.0 的改變


目前碰到的改變的有
1.Create Job方法

多了Cluster.java
Create Job前 先Create一個Cluster
Cluster cluster = new Cluster(new Configuration());
//會自動load configuration檔 (在conf.get第一次被呼叫時才load)

再用
Job job = job.getInstance(cluster,conf)
//會呼叫 new Job(cluster,conf)

也可用 job.getInstance(cluster)
//會呼叫new Job(cluster) ==> Job(cluster,new Configuration())


JobID給值的時間
Job.waitForCompletion(true) ==> submit() ==> JobSubmitter 的 submitJobInternal
==> JobID 在 JobSubmitter中submitJobInternal給值


2.FileSystem多了 newInstance(conf) method
額外的寫檔用newInstance來寫 寫完關掉 比較安全
不然會不小心關到MapReduce用的FileSystem Object

3.DistributedCache存取方式改變
設定

由原來的DistributedCache.addCacheFile(new URI("/tmp/D_Matrix"),conf);
變成job.addCacheFile(new URI("/tmp/D_Matrix"));

取出

由localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
變成localFiles = context.getLocalCacheFiles();

2011年4月24日 星期日

How to use Hadoop TwoDArrayWritable datatype

以下例子假設我們要建立一個 double的array

首先 我們要建立一個新的 class 名稱可自訂
(如果沒建新的class 在Reducer就會出現 NoSuchMethodException)

假設我們的Class 名稱為DoubleTwoDArrayWritable

我們要Create一個DoubleTwoDArrayWritable.java file
裡面寫
import org.apache.hadoop.io.*;
public class DoubleTwoDArrayWritable extends TwoDArrayWritable
{
public DoubleTwoDArrayWritable()
{
super(DoubleWritable.class);
}
}

這樣我們就建立了一個新的datatype了

之後設定 job.setMapOutputValueClass(DoubleTwoDArrayWritable.class);
(若要設為Key的話 還要implement必較方法 目前似乎沒支援)

在Mapper中,要給值還要過一個Writable[][]

private Text one = new Text("1");
private DoubleTwoDArrayWritable WTW = new DoubleTwoDArrayWritable();
private DoubleWritable [][] Result = null;
if (Result == null)
{
Result = new DoubleWritable[r_int][];
for (i=0;i < r_int ;i++)
{
Result[i] = new DoubleWritable[r_int];
for (j=0;j
<r_int;j++)
Result[i][j] = new DoubleWritable();
}

for ( i=0; i
<r_int ; i++ )
for ( j=i ; j
<r_int ; j++ )
{
tmpR = W[i]*W[j];
Result[i][j].set(tmpR);
Result[j][i].set(tmpR);
}
WTW.set(Result);
context.write(one,WTW);






而在Reducer中,也要過一個Writable[][] 來接值 @@"

private Writable [][] getArray = null;
for (DoubleTwoDArrayWritable value : values)
{
getArray = value.get();
for ( i=0; i<r_int ; i++ )
for ( j=0 ; j<r_int ; j++ )
C[i][j] = C[i][j] + ((DoubleWritable)getArray[i][j]).get();

}

2011年4月18日 星期一

Read a file in HDFS

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;

首先 先建立一個FileSystem物件,在建立此物件時,必需傳入一個Configuration object,因此
我們先Create一個Configuration object
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
之後就可以用這個 FileSystem object來開檔,創出一個FSDataInputStream object
Path inFile = new Path("/tmp/test");
FSDataInputStream in = fs.open(outFile);

之後就用
in.read(long position, byte[] buffer, int offset, int length)
in.readFully(long position, byte[] buffer)

來讀檔


Write a file to HDFS

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;

首先 先建立一個FileSystem物件,在建立此物件時,必需傳入一個Configuration object,因此
我們先Create一個Configuration object
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
之後就可以用這個 FileSystem object來開檔,創出一個FSDataOutputStream object
Path outFile = new Path("/tmp/test");
FSDataOutputStream out = fs.create(outFile);

之後就用
out.write(s.getBytes(),0,s.length()); 來寫入要寫的 String s 資訊

最後別忘了把FileSystem關掉 才會正式寫回去
fs.close();


P.S. 在Map或Reduce中 可以用Context.getConfiguration() 取得 Configuration object
但如果這樣,就不能關掉FileSystem,因為FileSystem 還會被MapReduce用到
MapReduce的一個Job跑完就會自動關掉FileSystem 了~