Sunday, May 8, 2011

Converting a Text File to a Binary Sequence File

1. In the Main Class
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);

job.setNumReduceTasks(0); //don't use reduce class

2. In the Map class
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map extends
        Mapper<LongWritable, Text, BytesWritable, BytesWritable>
{
    private BytesWritable one = new BytesWritable("1".getBytes());
    private BytesWritable val = new BytesWritable();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        // use the byte array's own length; s.length() counts characters, not bytes
        byte[] bytes = value.toString().getBytes();
        val.set(bytes, 0, bytes.length);
        context.write(one, val);
    }
}
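Putting steps 1 and 2 together, a minimal driver might look like the sketch below. The class name TextToSeq, the input/output paths, and the Cluster/Job.getInstance call (the v0.21.0 style from the April 27 post below) are illustrative assumptions, and the imports assume the new-API (org.apache.hadoop.mapreduce.lib.*) versions of the input/output formats:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;

public class TextToSeq
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Cluster cluster = new Cluster(conf);
        Job job = Job.getInstance(cluster, conf);
        job.setJarByClass(TextToSeq.class);
        job.setMapperClass(Map.class);              // the Map class from step 2

        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
        job.setNumReduceTasks(0);                   // map-only job, no reduce class

        FileInputFormat.addInputPath(job, new Path("/tmp/input"));        // placeholder path
        FileOutputFormat.setOutputPath(job, new Path("/tmp/seq_output")); // placeholder path
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}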

Using SequenceFileAsBinaryInputFormat & OutputFormat v0.21.0

1. First, make sure your input files are binary sequence files.
For more detail, please see
http://www.hadoop.tw/2008/12/hadoop-uncompressed-sequencefi.html
If you don't know how to convert a text file to a binary sequence file, please see
http://kuanyuhadoop.blogspot.com/2011/05/coverting-text-file-to-binary-sequence.html

2. In the Main Class, you have to set the following:
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);

job.setInputFormatClass(SequenceFileAsBinaryInputFormat.class);
job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);

3. In the Map Class
public class Map extends
        Mapper<BytesWritable, BytesWritable, BytesWritable, BytesWritable>
{
    public void map(BytesWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException
    {
        ...
    }
}

The Reduce class is declared the same way as the Map class...
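As a rough sketch (assuming the reduce output is also raw BytesWritable pairs, which matches the output classes set in step 2):

public class Reduce extends
        Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>
{
    public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException
    {
        // placeholder body: pass every value straight through
        for (BytesWritable value : values)
            context.write(key, value);
    }
}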

Wednesday, April 27, 2011

Changes in Hadoop v0.21.0


The changes I have run into so far:

1. How a Job is created

There is now a Cluster.java.
Before creating a Job, first create a Cluster:
Cluster cluster = new Cluster(new Configuration());
// the configuration files are loaded automatically (lazily, the first time conf.get is called)

Then use
Job job = Job.getInstance(cluster, conf);
// this calls new Job(cluster, conf)

You can also use Job.getInstance(cluster);
// this calls new Job(cluster) ==> Job(cluster, new Configuration())


When the JobID gets its value:
Job.waitForCompletion(true) ==> submit() ==> JobSubmitter's submitJobInternal()
==> the JobID is assigned inside JobSubmitter.submitJobInternal()


2. FileSystem now has a newInstance(conf) method
For any extra file writing, it is safer to use newInstance to get your own FileSystem and close it when you are done;
otherwise you may accidentally close the FileSystem object that MapReduce itself is using.
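For example, a small sketch of a helper that writes an extra file from inside a task; the method name, the path, and the content are made-up placeholders:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper, e.g. called from a Mapper/Reducer with context.getConfiguration()
public static void writeSideFile(Configuration conf, String text) throws IOException
{
    // newInstance() returns our own FileSystem object, separate from the one
    // MapReduce is using, so it is safe to close it afterwards
    FileSystem fs = FileSystem.newInstance(conf);
    FSDataOutputStream out = fs.create(new Path("/tmp/side_output"));  // placeholder path
    byte[] bytes = text.getBytes();
    out.write(bytes, 0, bytes.length);
    out.close();
    fs.close();   // only closes our private instance
}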

3. The way DistributedCache is accessed has changed

Setting it:

What used to be DistributedCache.addCacheFile(new URI("/tmp/D_Matrix"), conf);
becomes job.addCacheFile(new URI("/tmp/D_Matrix"));

Retrieving it:

What used to be localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
becomes localFiles = context.getLocalCacheFiles();
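Put together, a rough sketch of the new style; the setup() placement and the BufferedReader are illustrative assumptions:

// In the main class, when configuring the job:
job.addCacheFile(new URI("/tmp/D_Matrix"));

// In the Mapper/Reducer, e.g. in setup(Context context):
// (needs java.net.URI, java.io.BufferedReader/FileReader and org.apache.hadoop.fs.Path)
Path[] localFiles = context.getLocalCacheFiles();
BufferedReader reader = new BufferedReader(new FileReader(localFiles[0].toString()));
String line = reader.readLine();   // read the local copy of /tmp/D_Matrix
reader.close();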

Sunday, April 24, 2011

How to use Hadoop TwoDArrayWritable datatype

The example below assumes we want to build a 2D array of doubles.

First, we have to create a new class (the name is up to you).
(If you don't create a new class, the Reducer will throw a NoSuchMethodException.)

Suppose our class is named DoubleTwoDArrayWritable.

Create a DoubleTwoDArrayWritable.java file containing:
import org.apache.hadoop.io.*;
public class DoubleTwoDArrayWritable extends TwoDArrayWritable
{
public DoubleTwoDArrayWritable()
{
super(DoubleWritable.class);
}
}

With that, we have created a new data type.

Then set job.setMapOutputValueClass(DoubleTwoDArrayWritable.class);
(To use it as a key you would also have to implement the comparison methods; this doesn't seem to be supported at the moment.)

In the Mapper, the values have to go through a Writable[][] before being set:

private Text one = new Text("1");
private DoubleTwoDArrayWritable WTW = new DoubleTwoDArrayWritable();
private DoubleWritable[][] Result = null;

// inside map(); i, j, r_int, tmpR and the double[] W are assumed to be declared elsewhere
if (Result == null)
{
    Result = new DoubleWritable[r_int][];
    for (i = 0; i < r_int; i++)
    {
        Result[i] = new DoubleWritable[r_int];
        for (j = 0; j < r_int; j++)
            Result[i][j] = new DoubleWritable();
    }
}

for (i = 0; i < r_int; i++)
    for (j = i; j < r_int; j++)
    {
        tmpR = W[i] * W[j];
        Result[i][j].set(tmpR);
        Result[j][i].set(tmpR);
    }

WTW.set(Result);
context.write(one, WTW);

In the Reducer, you also have to receive the values through a Writable[][] @@"

private Writable[][] getArray = null;

// inside reduce(); C, r_int, i and j are assumed to be declared elsewhere
for (DoubleTwoDArrayWritable value : values)
{
    getArray = value.get();
    for (i = 0; i < r_int; i++)
        for (j = 0; j < r_int; j++)
            C[i][j] = C[i][j] + ((DoubleWritable) getArray[i][j]).get();
}

Monday, April 18, 2011

Read a file in HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;

First, create a FileSystem object. Building one requires a Configuration object, so we create a Configuration first:
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Then use this FileSystem object to open the file, which gives an FSDataInputStream object:
Path inFile = new Path("/tmp/test");
FSDataInputStream in = fs.open(inFile);

Then use
in.read(long position, byte[] buffer, int offset, int length)
in.readFully(long position, byte[] buffer)

to read the file.
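A small self-contained sketch that reads the whole file back (the path is just the placeholder used above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadHDFSFile
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path inFile = new Path("/tmp/test");              // placeholder path
        FileStatus status = fs.getFileStatus(inFile);     // to find out the file length
        byte[] buffer = new byte[(int) status.getLen()];

        FSDataInputStream in = fs.open(inFile);
        in.readFully(0, buffer);                          // read the whole file from position 0
        in.close();

        System.out.println(new String(buffer));
    }
}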


Write a file to HDFS

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;

First, create a FileSystem object. Building one requires a Configuration object, so we create a Configuration first:
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Then use this FileSystem object to create the file, which gives an FSDataOutputStream object:
Path outFile = new Path("/tmp/test");
FSDataOutputStream out = fs.create(outFile);

Then write the contents of a String s with
byte[] bytes = s.getBytes();
out.write(bytes, 0, bytes.length);
(using bytes.length instead of s.length(), since the two differ for multi-byte characters)

Finally, don't forget to close the FileSystem so the data is actually written back:
fs.close();
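A small self-contained sketch of the whole write path (placeholder path and sample string):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteHDFSFile
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path outFile = new Path("/tmp/test");       // placeholder path
        FSDataOutputStream out = fs.create(outFile);

        String s = "hello hdfs\n";                  // sample content
        byte[] bytes = s.getBytes();
        out.write(bytes, 0, bytes.length);

        out.close();                                // flush and close the stream
        fs.close();                                 // release the FileSystem
    }
}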


P.S. Inside Map or Reduce you can get the Configuration object with Context.getConfiguration(),
but in that case you must not close the FileSystem, because MapReduce is still using that FileSystem object.
The FileSystem is closed automatically once the MapReduce job finishes.
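For example, inside reduce() a sketch might look like this (the output path is a made-up placeholder):

// Uses the FileSystem object that MapReduce is already holding
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataOutputStream out = fs.create(new Path("/tmp/side_result"));  // placeholder path
byte[] bytes = "partial result\n".getBytes();
out.write(bytes, 0, bytes.length);
out.close();          // close the stream only
// do NOT call fs.close() here; MapReduce still needs this FileSystem object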


Wednesday, March 30, 2011

Map & Reduce class variable

From my tests, the class variables of Map and Reduce only persist within a single Job.
Once the Job ends, the class variables of Map and Reduce no longer exist.
For example:
We write a Reduce class:
public class TEST_Reduce extends Reducer<Text, Text, Text, Text> {   // value type assumed Text for this example
    private double[][] C = null;   // class variable under test
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        if (C == null) {           // r_int is assumed to be set elsewhere
            C = new double[r_int][];
            context.write(new Text("Read C"), new Text("1"));
        }
    }
}
Within a single Job, no matter how many keys the Reducer receives, this program outputs (Read C, 1) only once.
But if you run the Job N times, it outputs (Read C, 1) N times.

Feel free to try it yourself.