2011年5月8日 星期日

Coverting Text file to Binary Sequence File

1.In the Main Class
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);

job.setNumReduceTasks(0); //don't use reduce class

2.In the Map class
public class Map extends
Mapper< LongWritable, Text, BytesWritable, BytesWritable>
{
private BytesWritable one = new BytesWritable("1".getBytes());
private BytesWritable val = new BytesWritable();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
String s = value.toString();
val.set(s.getBytes(),0,s.length());
context.write(one,val);

}
}

Using SequenceFileAsBinaryInputFormat & OutputFormat v0.21.0

1.First you have to make sure your input files are binary sequence files.
To more detail, please see
http://www.hadoop.tw/2008/12/hadoop-uncompressed-sequencefi.html
If you don't know how to Covert Text file to Binary sequence file, please see
http://kuanyuhadoop.blogspot.com/2011/05/coverting-text-file-to-binary-sequence.html

2.In the Main Class, you have to set the following things:
job.setOutputKeyClass(BytesWritable.class)
job.setOutputValueClass(BytesWritable.class)

job.setInputFormatClass(SequenceFileAsBinaryInputFormat.class)
job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class)

3.In the Map Class
public class Map extends Mapper
{
public void map(BytesWritable key, BytesWritable value, Context context)
{
...
}
}

Reduce class is the same as Map Class...

2011年4月27日 星期三

Hadoop v0.21.0 的改變


目前碰到的改變的有
1.Create Job方法

多了Cluster.java
Create Job前 先Create一個Cluster
Cluster cluster = new Cluster(new Configuration());
//會自動load configuration檔 (在conf.get第一次被呼叫時才load)

再用
Job job = job.getInstance(cluster,conf)
//會呼叫 new Job(cluster,conf)

也可用 job.getInstance(cluster)
//會呼叫new Job(cluster) ==> Job(cluster,new Configuration())


JobID給值的時間
Job.waitForCompletion(true) ==> submit() ==> JobSubmitter 的 submitJobInternal
==> JobID 在 JobSubmitter中submitJobInternal給值


2.FileSystem多了 newInstance(conf) method
額外的寫檔用newInstance來寫 寫完關掉 比較安全
不然會不小心關到MapReduce用的FileSystem Object

3.DistributedCache存取方式改變
設定

由原來的DistributedCache.addCacheFile(new URI("/tmp/D_Matrix"),conf);
變成job.addCacheFile(new URI("/tmp/D_Matrix"));

取出

由localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
變成localFiles = context.getLocalCacheFiles();

2011年4月24日 星期日

How to use Hadoop TwoDArrayWritable datatype

以下例子假設我們要建立一個 double的array

首先 我們要建立一個新的 class 名稱可自訂
(如果沒建新的class 在Reducer就會出現 NoSuchMethodException)

假設我們的Class 名稱為DoubleTwoDArrayWritable

我們要Create一個DoubleTwoDArrayWritable.java file
裡面寫
import org.apache.hadoop.io.*;
public class DoubleTwoDArrayWritable extends TwoDArrayWritable
{
public DoubleTwoDArrayWritable()
{
super(DoubleWritable.class);
}
}

這樣我們就建立了一個新的datatype了

之後設定 job.setMapOutputValueClass(DoubleTwoDArrayWritable.class);
(若要設為Key的話 還要implement必較方法 目前似乎沒支援)

在Mapper中,要給值還要過一個Writable[][]

private Text one = new Text("1");
private DoubleTwoDArrayWritable WTW = new DoubleTwoDArrayWritable();
private DoubleWritable [][] Result = null;
if (Result == null)
{
Result = new DoubleWritable[r_int][];
for (i=0;i < r_int ;i++)
{
Result[i] = new DoubleWritable[r_int];
for (j=0;j
<r_int;j++)
Result[i][j] = new DoubleWritable();
}

for ( i=0; i
<r_int ; i++ )
for ( j=i ; j
<r_int ; j++ )
{
tmpR = W[i]*W[j];
Result[i][j].set(tmpR);
Result[j][i].set(tmpR);
}
WTW.set(Result);
context.write(one,WTW);






而在Reducer中,也要過一個Writable[][] 來接值 @@"

private Writable [][] getArray = null;
for (DoubleTwoDArrayWritable value : values)
{
getArray = value.get();
for ( i=0; i<r_int ; i++ )
for ( j=0 ; j<r_int ; j++ )
C[i][j] = C[i][j] + ((DoubleWritable)getArray[i][j]).get();

}

2011年4月18日 星期一

Read a file in HDFS

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;

首先 先建立一個FileSystem物件,在建立此物件時,必需傳入一個Configuration object,因此
我們先Create一個Configuration object
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
之後就可以用這個 FileSystem object來開檔,創出一個FSDataInputStream object
Path inFile = new Path("/tmp/test");
FSDataInputStream in = fs.open(outFile);

之後就用
in.read(long position, byte[] buffer, int offset, int length)
in.readFully(long position, byte[] buffer)

來讀檔


Write a file to HDFS

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;

首先 先建立一個FileSystem物件,在建立此物件時,必需傳入一個Configuration object,因此
我們先Create一個Configuration object
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
之後就可以用這個 FileSystem object來開檔,創出一個FSDataOutputStream object
Path outFile = new Path("/tmp/test");
FSDataOutputStream out = fs.create(outFile);

之後就用
out.write(s.getBytes(),0,s.length()); 來寫入要寫的 String s 資訊

最後別忘了把FileSystem關掉 才會正式寫回去
fs.close();


P.S. 在Map或Reduce中 可以用Context.getConfiguration() 取得 Configuration object
但如果這樣,就不能關掉FileSystem,因為FileSystem 還會被MapReduce用到
MapReduce的一個Job跑完就會自動關掉FileSystem 了~


2011年3月30日 星期三

Map & Reduce class variable

經過測試 Map 和 Reduce 的 Class variable 只會維持在同一個 Job 中
一出 Job 後Map 和 Reduce的Class variable 就不會再存在
For example:
我們寫了一個Reduce class
public class TEST_Reduce extends Reducer{
private double[][] C = null;
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException{
if (C == null){
C = new double[r_int][];
context.write(new Text("Read C"),new Text("1"));
}
}
}
這個程式 在同一個Job中 不管Reduce收到多少keys,都只會output一次(Read C,1)
但是若重覆跑 N 次 Job 就會輸出 N 次(Read C,1)

大家不妨可以試試

2011年3月29日 星期二

How to use MapReduce DistributedCache v0.20.2

首先要先
import org.apache.hadoop.filecache.* ==> 給 DistributedCache用
import java.net.RUI
import org.apache.hadoop.fs.Path

前面已經提到 MapReduce v0.20.2 必需將 Configuration 和 Job分開。
而 DistributedCache 的用法也變成
1. 在main中先告訴Configuration 哪個File要被當成DistributedCache
DistributedCache.addCacheFile(new URI("$(path in HFS)")
,$(configuration object));
For example:
Configuration conf = new Configuration();
DistributedCache.addCacheFile(new URI("/myDis/test.dat"),conf);

2. 而在 map 或 reduce 中
可以用 DistributedCache.getLocalCacheFiles(context.getConfiguration());
取得CacheFile的Path Array;
For example:
Path[] localFiles;
localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
接下來就是Java的正常開檔了,提供一個用法
FileReader fr = new FileReader(localFiles[0].toString());
BufferedReader br = nre bufferedReader(fr);
String fileline = br.readLine(); //Read first line in the file

如果我們把localFile的Path印出來,我們不難發現,Hadoop 會幫我們把檔案偷偷放在我們在core-site.xml (詳細請參考Hadoop建置文件) 裡設定的hadoop.tmp.dir下的
mapred/local/taskTracker/archive/${ServerName}/${path in HFS}

Example Code:
P.S. You have to put the file "test.dat" under the directory "/myDis" first in the HFS before running the program.

2011年3月21日 星期一

New MapReduce API Slide

http://www.slideshare.net/sh1mmer/upgrading-to-the-new-map-reduce-api

How to compile MapReduce file and run it

以下以hadoop-0.20.2為例
Compile:
javac -classpath /{yourPathToHadoop}/hadoop-0.20.2-core.jar yourfile -d {outputDir}
For example:
javac -classpath /usr/local/hadoop/hadoop-0.20.2-core.jar myWordCount.java -d bin/

把bin內的檔案包成 jar檔
jar -cvf {OutputJarName} -C bin/ .
For example:
jar -cvf output.jar -C bin/ .

放到 hadoop上跑
hadoop jar {yourJarName} {MainClassName}
For example:
hadoop jar output.jar myWordCount input output
P.S. 後面的input output為myWordCount程式的參數

2011年3月20日 星期日

MapReduce 0.20.2 passing parameters to Mapper

MapReduce passing parameter

MapReduce 0.20.2把原來的 JobConf改掉,變成要先用Configuration 設好Config再傳入Job中。

For example:

Configuration conf = new Configuration();

Job job = new Job(conf, "wordcount");

因此0.20.2的傳參數方法 和之前用JobConf的也不一樣

傳的方法變成

在Main function中 用conf.set("key","value")來傳

For example:

Configuration conf = new Configuration();

conf.set("round","1000");

Job job = new Job(conf, "wordcount");

而在Mapper中 只要用context.getConfiguration().get("key") 就可以取出相對應的值了~

For example:

in map(...) function

{

String round = context.getConfiguration().get("round");

...

}

參考Source Code:

http://www.cmlab.csie.ntu.edu.tw/~wfuny/MapReduce/myWordCount.java