Hbase0.96与Java的交互访问

hadoop:hadoop-2.2.0

hbase:hbase-0.96.0

1.org.apache.hadoop.hbase.client.Put

<1>取消了无参的构造方法

<2>Put类不再继承Writable类

0.94.6时public class Put extends Mutation implements HeapSize, Writable, Comparable<Row>

0.96.0时public class Put extends Mutation implements HeapSize, Comparable<Row>

解决方法:

由public class MonthUserLoginTimeIndexReducer extends Reducer<BytesWritable,MonthUserLoginTimeIndexWritable, ImmutableBytesWritable, Writable> {

改public class MonthUserLoginTimeIndexReducer extends Reducer<BytesWritable,MonthUserLoginTimeIndexWritable, ImmutableBytesWritable, Put> {

2.org.apache.hadoop.hbase.client.Mutation.familyMap

org.apache.hadoop.hbase.client.Mutation.familyMap类型改变:

/**

* 0.94.6

* protected Map<byte[],List<KeyValue>> familyMap

*

* 0.96.*

* protected NavigableMap<byte[],List<Cell>> familyMap

* org.apache.hadoop.hbase.Cell hbase-0.94.*中是没有的

*/

org.apache.hadoop.hbase.KeyValue的改变:

/**

* 0.94.*

* public class KeyValue extends Object implements Writable, HeapSize

*

* 0.96.0

* public class KeyValue extends Object implements Cell, HeapSize, Cloneable

*/

解决方法:将代码中的List<KeyValue>改成List<Cell>

3. org.apache.hadoop.hbase.KeyValue

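0.96.0中方法getFamily已被弃用(Deprecated),改成方法getFamilyArray()。注意getFamilyArray()返回的是整个底层数组,需要配合getFamilyOffset()和getFamilyLength()截取;若只想得到列族的字节拷贝,可直接使用CellUtil.cloneFamily(cell)。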

4.org.apache.hadoop.hbase.HTableDescriptor

类org.apache.hadoop.hbase.HTableDescriptor的构造方法public HTableDescriptor(String name)已被弃用(Deprecated)

解决方法:使用public HTableDescriptor(TableName name)

旧:HTableDescriptor tableDesc = new HTableDescriptor(tableName);

新:HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));

5.org.apache.hadoop.hbase.client.HTablePool

类org.apache.hadoop.hbase.client.HTablePool整个被弃用(Deprecated)

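解决方法:使用HConnection.getTable(String)代替。HConnection是个接口,普通客户端直接通过HConnectionManager.createConnection(conf)获取实例,再调用connection.getTable(tableName)即可,用完后依次关闭table和connection;在公开的API文档中,类CoprocessorHConnection是它唯一列出的实现类(面向RegionServer端的协处理器场景),其构造方式如下: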

HRegionServer hRegionServer = new HRegionServer(conf) ;

HConnection connection = HConnectionManager.createConnection(conf);

hConnection = new CoprocessorHConnection(connection,hRegionServer);

6.org.apache.hadoop.hbase.client.Result

方法public KeyValue[] raw()被弃用(Deprecated),建议使用public Cell[] rawCells()

方法getRow被弃用(Deprecated)

方法getFamily被弃用(Deprecated)

方法getQualifier被弃用(Deprecated)

方法getValue被弃用(Deprecated)

方法public List<KeyValue> getColumn(byte[] family,byte[] qualifier)被弃用(Deprecated)

方法public KeyValue getColumnLatest(byte[] family,byte[] qualifier)被弃用(Deprecated)

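Cell中:上面的getRow、getFamily、getQualifier、getValue(实际是KeyValue上的方法)改成以下方法。它们返回的是底层数组,需要配合对应的getXxxOffset()/getXxxLength()使用,或者直接用CellUtil.cloneRow/cloneFamily/cloneQualifier/cloneValue取得拷贝: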

getRowArray()

getFamilyArray()

getQualifierArray()

getValueArray()

Result中:增加如下方法

public List<Cell> getColumnCells(byte[] family,byte[] qualifier)

public Cell getColumnLatestCell(byte[] family,byte[] qualifier)

改动:所有ipeijian_data中凡是和【新增用户活跃用户流失用户】相关的都做如下变化:

旧代码:if (value.raw().length == 1

新代码:if (value.rawCells().length == 1

7.job中设置TableInputFormat.SCAN

0.96.0中类Scan去掉了方法:public void write(DataOutput out)throws IOException

之前版本使用conf.set(TableInputFormat.SCAN, StatUtils.convertScanToString(scan));进行设置

StatUtils.convertScanToString的具体实现为:

/**
 * Pre-0.96 serialization of a {@code Scan} for use as a job configuration value.
 *
 * <p>The scan writes itself to an in-memory stream through its {@code write}
 * method, and the resulting bytes are Base64-encoded.
 *
 * @param scan the scan to serialize
 * @return the Base64 text of the serialized scan
 * @throws IOException if the scan fails to write itself to the stream
 */
public static String convertScanToString(Scan scan) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    scan.write(new DataOutputStream(buffer));
    byte[] serialized = buffer.toByteArray();
    return Base64.encodeBytes(serialized);
}

该方法的实现与TableMapReduceUtil.convertScanToString(Scan scan)是一样的。

但是当hbase升级到了0.96.*时,类Scan的方法write已被彻底删除(不仅仅是Deprecated,而是Deleted),所以上面的实现无法再使用。

hbase0.96.*中对该方法进行了重新的实现:

/**
 * HBase 0.96 serialization of a {@code Scan} for use as a job configuration value.
 *
 * <p>The scan is converted to its protobuf message with
 * {@code ProtobufUtil.toScan}, and the message bytes are Base64-encoded.
 *
 * @param scan the scan to serialize
 * @return the Base64 text of the protobuf-encoded scan
 * @throws IOException declared by the protobuf conversion
 */
public static String convertScanToString(Scan scan) throws IOException {
    byte[] encoded = ProtobufUtil.toScan(scan).toByteArray();
    return Base64.encodeBytes(encoded);
}

所以做如下更改:

StatUtils类中方法convertScanToString的实现做如上更改以适配hbase0.96.*

8.cn.m15.ipj.db.hbase.MyPut

自定义的Put类,比传统的Put类多一个length,原版和新版代码比较:

原版:(红色字体为API变为新版时报错的地方)

public class MyPut extends Put {

public MyPut(byte[] row, int length) {

//原因是put的无参构造方法已经在新本中消失

if (row == null || length > HConstants.MAX_ROW_LENGTH) {

throw new IllegalArgumentException(“Row key is invalid”);

}

this.row = Arrays.copyOf(row, length);

this.ts = HConstants.LATEST_TIMESTAMP;

}

public MyPut add(byte[] family, byte[] qualifier, long ts, byte[] value,int length) {

List<KeyValue> list = getKeyValueList(family);

KeyValue kv = createPutKeyValue(family, qualifier, ts, value, length);

list.add(kv);

familyMap.put(kv.getFamily(), list);

//familyMap的类型已经改变

return this;

}

private List<KeyValue> getKeyValueList(byte[] family) {

List<KeyValue> list = familyMap.get(family);

//familyMap的类型已经改变

if (list == null) {

list = new ArrayList<KeyValue>(0);

}

return list;

}

private KeyValue createPutKeyValue(byte[] family, byte[] qualifier,long ts, byte[] value, int length) {

return new KeyValue(this.row, 0, this.row.length, family, 0,

family.length, qualifier, 0, qualifier.length, ts,

KeyValue.Type.Put, value, 0, length);

}

}

更改之后:

public MyPut(byte[] row, int length) {

super(row,length);

//新增加

if (row == null || length > HConstants.MAX_ROW_LENGTH) {

throw new IllegalArgumentException(“Row key is invalid”);

}

this.row = Arrays.copyOf(row, length);

this.ts = HConstants.LATEST_TIMESTAMP;

}

public MyPut add(byte[] family, byte[] qualifier, long ts, byte[] value,int length) {

List<Cell> list = getCellsList(family);

KeyValue kv = createPutKeyValue(family, qualifier, ts, value, length);

list.add(kv);

familyMap.put(CellUtil.cloneFamily(kv), list);

return this;

}

private List<Cell> getCellsList(byte[] family) {

List<Cell> list = familyMap.get(family);

if (list == null) {

list = new ArrayList<Cell>(0);

}

return list;

}

private KeyValue createPutKeyValue(byte[] family, byte[] qualifier,long ts, byte[] value, int length) {

return new KeyValue(this.row, 0, this.row.length, family, 0,family.length, qualifier, 0, qualifier.length, ts,

KeyValue.Type.Put, value, 0, length);

}

}

package com.test.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.Cell;

/**
 * Small walkthrough of the HBase 0.96 client API: creating and deleting a table,
 * writing, reading and deleting a single row, and scanning a whole table.
 *
 * <p>Each method opens its own {@link HBaseAdmin} or {@link HTable} and closes it
 * in a {@code finally} block, so the handle is released even when the operation
 * fails.
 */
public class HbaseTest {

    /** Client configuration shared by every operation; created once at class load. */
    private static final Configuration conf = HBaseConfiguration.create();

    /**
     * Creates a table with the given column families, unless it already exists.
     *
     * @param tableName name of the table to create
     * @param cfs       names of the column families to add
     * @throws MasterNotRunningException    propagated from the admin client
     * @throws ZooKeeperConnectionException propagated from the admin client
     * @throws IOException                  propagated from the admin client
     */
    public void createTable(String tableName, String[] cfs)
            throws MasterNotRunningException, ZooKeeperConnectionException,
            IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (admin.tableExists(tableName)) {
                System.out.println("表已经存在!");
                return;
            }
            HTableDescriptor tableDesc = new HTableDescriptor(
                    TableName.valueOf(tableName));
            for (String cf : cfs) {
                tableDesc.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(tableDesc);
            System.out.println("表创建成功!");
        } finally {
            // Runs on every path, including the "already exists" early return.
            admin.close();
        }
    }

    /**
     * Disables and then deletes a table. Failures are printed, not rethrown.
     *
     * @param tableName name of the table to delete
     */
    public void deleteTable(String tableName) {
        try {
            HBaseAdmin admin = new HBaseAdmin(conf);
            try {
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
                System.out.println("表删除成功!");
            } finally {
                admin.close();
            }
        } catch (IOException e) {
            // MasterNotRunningException and ZooKeeperConnectionException are
            // IOException subclasses, so one handler covers all three.
            e.printStackTrace();
        }
    }

    /**
     * Writes one row with key {@code "row1"}: for every family in {@code cfs} it
     * stores qualifier {@code "1"} with value {@code "value_1"}.
     * Failures are printed, not rethrown.
     *
     * @param tableName table to write to
     * @param cfs       column families that receive a cell
     */
    public void writeRow(String tableName, String[] cfs) {
        try {
            HTable table = new HTable(conf, tableName);
            try {
                Put put = new Put(Bytes.toBytes("row1"));
                for (String cf : cfs) {
                    put.add(Bytes.toBytes(cf),
                            Bytes.toBytes(String.valueOf(1)),
                            Bytes.toBytes("value_1"));
                }
                // The Put must be submitted to the table; building it alone
                // writes nothing. With no families there is nothing to submit,
                // so the call is skipped rather than sending an empty Put.
                if (!put.isEmpty()) {
                    table.put(put);
                }
                System.out.println("添加成功!");
            } finally {
                table.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes the whole row identified by {@code rowKey}.
     * Failures are printed, not rethrown.
     *
     * @param tableName table to delete from
     * @param rowKey    key of the row to delete
     */
    public void deleteRow(String tableName, String rowKey) {
        try {
            HTable table = new HTable(conf, tableName);
            try {
                // Bytes.toBytes encodes the key the same way writeRow does,
                // independent of the platform default charset.
                Delete dl = new Delete(Bytes.toBytes(rowKey));
                table.delete(dl);
                System.out.println("删除行成功!");
            } finally {
                table.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Fetches the row identified by {@code rowKey} and prints the result.
     * Failures are printed, not rethrown.
     *
     * @param tableName table to read from
     * @param rowKey    key of the row to fetch
     */
    public static void selectRow(String tableName, String rowKey) {
        try {
            HTable table = new HTable(conf, tableName);
            try {
                Get g = new Get(Bytes.toBytes(rowKey));
                Result rs = table.get(g);
                System.out.println(rs);
                System.out.println("查询完成!");
            } finally {
                table.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans every row of the table and prints, for each cell, its family,
     * qualifier, value and row key. Failures are printed, not rethrown.
     *
     * @param tableName table to scan
     */
    public void scanner(String tableName) {
        try {
            HTable table = new HTable(conf, tableName);
            try {
                ResultScanner rs = table.getScanner(new Scan());
                try {
                    for (Result r : rs) {
                        // rawCells() is the 0.96 replacement for the deprecated raw().
                        Cell[] cells = r.rawCells();
                        System.out.println("长度:" + cells.length);
                        for (Cell cell : cells) {
                            System.out.println("信息:"
                                    + Bytes.toString(CellUtil.cloneFamily(cell)) + " "
                                    + Bytes.toString(CellUtil.cloneQualifier(cell))
                                    + "  " + Bytes.toString(CellUtil.cloneValue(cell))
                                    + " " + Bytes.toString(CellUtil.cloneRow(cell)));
                        }
                        System.out.println("\n-----------------------");
                    }
                } finally {
                    // Close the scanner explicitly instead of relying on table.close().
                    rs.close();
                }
                System.out.println("执行结束!");
            } finally {
                table.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Scans the table named {@code "test"} and prints its contents. */
    public static void main(String[] args) {
        HbaseTest hbase = new HbaseTest();
        String tableName = "test";
        hbase.scanner(tableName);
    }
}