直接使用JAVA 操作 Tokyo Cabinet

下载 tokyo cabinet: http://fallabs.com/tokyocabinet/tokyocabinet-1.4.47.tar.gz

下载 java api client: http://fallabs.com/tokyocabinet/javapkg/ ,

Specifications of Java API

Tokyo Cabinet Key-Value PPT

1. 安装依赖的库

需要安装bzip2和zlib

2. tokyo cabinet

tar zxvf tokyocabinet-1.4.47.tar.gz

cd tokyocabinet-1.4.47/

./configure

#注:在32位Linux操作系统上编译Tokyo cabinet,请使用./configure --enable-off64代替./configure,可以使数据库文件突破2GB的限制。

#./configure --enable-off64

make

make install

cd ../

3. java client

wget

http://fallabs.com/tokyocabinet/javapkg

/tokyocabinet.24.tar.gz

tar-xzvf tokyocabinet-java-1.24.tar.gz

cdtokyocabinet-java-1.24

./configure--prefix=/usr

#然后报错居然……缺少jni.h,忘记装JDK了……

#记得导出JAVA_HOME,否则一样报错jni.h确实。

exportJAVA_HOME="/usr/java/default"

#再次Configure,可以了。

./configure--prefix=/usr

make

makeinstall

install会将libjtokyocabinet.so 和 tokyocabinet.jar放到/usr/lib64下面。

将生成的tokyocabinet.jar拖到本地,新建项目,引用这个jar包。使用如下测试代码:

package test;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Random;

import java.util.concurrent.atomic.AtomicLong;

import tokyocabinet.*;

public class BenchMark {

private static List<TDB> dbList = new ArrayList<TDB>();

public static class Stat {

private AtomicLong _count = new AtomicLong(0);

private static Stat _instance = new Stat();

public static Stat getInstance() {

return _instance;

}

private Stat() {

_printer = new RatePrinter(_count);

_printer.setDaemon(true);

_printer.start();

}

public void inc() {

_count.incrementAndGet();

}

private RatePrinter _printer;

private static class RatePrinter extends Thread {

private long _last;

private AtomicLong _c;

public RatePrinter(AtomicLong c) {

_c = c;

}

@Override

public void run() {

while (true) {

try {

long current = _c.get();

System.out.println(“Rate: ” + (current – _last) + ” req/s”);

_last = current;

Thread.sleep(1000L);

} catch (Throwable e) {

e.printStackTrace();

}

}

}

}

}

public static class EchoThread extends Thread {

// private TDB tdb;

public EchoThread(String ip, String port, int in, ThreadGroup group) {

super(group, “EchoThread-” + in);

// // create the object

// TDB tdb = new TDB();

//

// // open the database

// if(!tdb.open(“casket”+in+”.tct”, TDB.OWRITER | TDB.OCREAT)){

// int ecode = tdb.ecode();

// System.err.println(“open error: ” + tdb.errmsg(ecode));

// }

}

@Override

public void run() {

int index = 0;

// create the object

Random r = new Random();

// open the database

// if (!tdb.open(“casket” + Thread.currentThread().getId() + “.tct”, TDB.OWRITER | TDB.OCREAT)) {

// int ecode = tdb.ecode();

// System.err.println(“open error: ” + tdb.errmsg(ecode));

// }

while (true) {

try {

TDB tdb = dbList.get(0);

String pkey = index + “asdf”;

Map<String, String> cols = new HashMap<String, String>();

cols.put(“name”, “mikio” + index);

cols.put(“age”, “30″);

cols.put(“lang”, “ja,en,c”);

if (!tdb.put(pkey, cols)) {

int ecode = tdb.ecode();

System.err.println(“put error: ” + tdb.errmsg(ecode) + ” key:” + pkey + ” value:” + cols);

}

// client.insert(“Table1″, “name”+index, “Standard1:name”,

// (“name”+index).getBytes(“UTF-8″),

// System.currentTimeMillis(), true);

// client.get_column(“Table1″, “name0″, “Standard1:name”);

index++;

Stat.getInstance().inc();

} catch (Throwable e) {

e.printStackTrace();

break;

} finally {

// close the database

//if (!tdb.close()) {

//int ecode = tdb.ecode();

// System.err.println(“close error: ” +

// tdb.errmsg(ecode));

//}

}

}

}

}

/**

* @param args

* @throws TTransportException

*/

public static void main(String[] args) {

if (args.length != 1) {

System.out.println(“Usage: Benchmark <concurrent>”);

System.exit(1);

}

String ip = args[0];

String port = args[0];

Integer concurrent = Integer.valueOf(args[0]);

System.out.println(“ip = ” + ip + “,port = ” + port + “,concurrent = ” + concurrent);

ThreadGroup group = new ThreadGroup(“Benchmark”);

List<Thread> threads = new ArrayList<Thread>();

for (int i = 0; i < concurrent; i++) {

TDB db = new TDB();

//db.optimize();

if (!db.open(“./test” + i + “.tdb”, TDB.OCREAT | TDB.OWRITER)) {

int ecode = db.ecode();

System.err.println(“open error: ” + TDB.errmsg(ecode));

continue;

}

dbList.add(db);

}

for (int x = 0; x < concurrent; ++x) {

Thread t = new EchoThread(ip, port, x, group);

threads.add(t);

t.start();

}

}

}

对比上一次的代码,能够发现,1.new TDB的过程扔进了Thread.start之前;2.在thread中使用一个全局的变量来获取当前的对象。

启十个进程,全往第一个里写:

concurrent = 10

Rate: 25 req/s

Rate: 119617 req/s

Rate: 130620 req/s

Rate: 144202 req/s

Rate: 120458 req/s

Rate: 112809 req/s

Rate: 120800 req/s

Rate: 122290 req/s

Rate: 119526 req/s

Rate: 111189 req/s

Rate: 112483 req/s

Rate: 109138 req/s

Rate: 115648 req/s

Rate: 119419 req/s

Rate: 105558 req/s

Rate: 110230 req/s

Rate: 116507 req/s

Rate: 105367 req/s

Rate: 103781 req/s

Rate: 106618 req/s

Rate: 107698 req/s

Rate: 116768 req/s

Rate: 107244 req/s

保持在10w/s的写入速度,到达30s左右以后,数据急转直下:

Rate: 48060 req/s

Rate: 6901 req/s

Rate: 4987 req/s

Rate: 46229 req/s

Rate: 46686 req/s

Rate: 45402 req/s

Rate: 6271 req/s

Rate: 810 req/s

Rate: 33895 req/s

Rate: 46548 req/s

Rate: 47025 req/s

Rate: 6995 req/s

Rate: 860 req/s

这,就是tc的table表在写入一个ArrayList的真实速度(4核8G)。

官方发言的100W只需要0.4s,说的是写入的hash表,而且数据是纯线性的数字。

提升速度和稳定的办法,和张宴兄弟商量,b+tree类型的数据稳定一些,设置tctdbsetxmsiz也能解决燃眉之急。

TCHDB哈希数据库的优化 修改参数后,再测试:

写入100万条:tchtestwrite -xm 536870912 test.tch 1000000 5000000 时间: 0.580秒 速度:1724137条/秒

写入200万条:tchtestwrite -xm 536870912 test.tch 2000000 5000000 时间: 1.105秒 速度:1809954条/秒

写入500万条:tchtestwrite -xm 536870912 test.tch 5000000 5000000 时间: 2.737秒 速度:1826817条/秒

可见,性能提升了不少,随着写入数据量地增加,速度依旧不减.

TCHDB哈希数据库的优化

关键参数(C API):

booltchdbsetxmsiz(TCHDB *hdb, int64_t xmsiz);

Xmsiz指定了TCHDB的扩展MMAP内存大小,默认值为67108864,也就是64M,如果数据库文件超过64M,则只有前部分会映射在内存中,所以写入性能会下降。

其他参数(C API) :

booltchdbtune(TCHDB *hdb, int64_t bnum, int8_t apow, int8_t fpow, uint8_t opts);

bnum指定了bucket array的数量。推荐设置bnum为预计存储总记录数的0.5~4倍,使key的哈希分布更均匀,减少在bucket内二分查找的时间复杂度