• 沒有找到結果。

6.4 HBase Elasticsearch schema 说明

6.6.3 创建数据表开启 Elasticsearch 索引

注意事项

● 注[1] 如果配置文件目录conf已经加入classpath路径中,那么后面加载指定配置文件的代码可以不执行。

6.6.3 创建数据表开启 Elasticsearch 索引

功能简介

建表功能同创建表。在此基础上,通过表属性配置需要在Elasticsearch中开启索引的字段,详情请参见HBase Elasticsearch schema说明。

样例代码

public void createTable() {

LOG.info("Entering testCreateTable.");

// Specify the table descriptor.

HTableDescriptor htd = new HTableDescriptor(tableName);

// Set the column family name to info.

HColumnDescriptor hcd = new HColumnDescriptor(CF1);

// Set data encoding methods. HBase provides DIFF,FAST_DIFF,PREFIX // and PREFIX_TREE

hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);

// Set compression methods, HBase provides two default compression // methods:GZ and SNAPPY

// GZ has the highest compression rate,but low compression and // decompression efficiency,fit for cold data

// SNAPPY has low compression rate, but high compression and // decompression efficiency,fit for hot data.

// it is advised to use SANPPY

hcd.setCompressionType(Compression.Algorithm.SNAPPY);

HColumnDescriptor hcd2 = new HColumnDescriptor(CF2);

hcd2.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);

hcd2.setCompressionType(Compression.Algorithm.SNAPPY);

htd.addFamily(hcd);

htd.addFamily(hcd2);

//add HBase ES schema String ESJsonSchema = "[" +

"{\"name\":\"contentCh\",\"type\":\"text\",\"hbaseQualifier\":\"cf1:contentCh\",\"analyzer\":\"ik_smart

\"}," +

"{\"name\":\"contentEng\",\"type\":\"text\",\"hbaseQualifier\":\"cf2:contentEng\"}," + "{\"name\":\"id\",\"type\":\"long\",\"hbaseQualifier\":\"cf1:id\"}," +

"{\"name\":\"charNum\",\"type\":\"integer\",\"hbaseQualifier\":\"cf1:charNum\"}," + "{\"name\":\"pageNum\",\"type\":\"short\",\"hbaseQualifier\":\"cf1:pageNum\"}," + "{\"name\":\"level\",\"type\":\"byte\",\"hbaseQualifier\":\"cf1:level\"}," +

"{\"name\":\"researchCost\",\"type\":\"double\",\"hbaseQualifier\":\"cf1:researchCost\"}," + "{\"name\":\"score\",\"type\":\"float\",\"hbaseQualifier\":\"cf1:score\"}," +

"{\"name\":\"male\",\"type\":\"boolean\",\"hbaseQualifier\":\"cf1:male\"}" + "]";

htd.setValue(HBaseESConst.HBASE_INDEX_ES_ENABLED, "true");

htd.setValue(HBaseESConst.HBASE_INDEX_ES_ENDPOINT, ESClusterHosts);//(1) htd.setValue(HBaseESConst.HBASE_INDEX_ES_INDEXNAME, ES_INDEX_NAME);//(2)

htd.setValue(HBaseESConst.HBASE_INDEX_ES_SCHEMA, ESJsonSchema);//(3) Admin admin = null;

try {

// Instantiate an Admin object.

admin = conn.getAdmin();

if (!admin.tableExists(tableName)) { LOG.info("Creating table...");

admin.createTable(htd);

LOG.info(admin.getClusterStatus());

LOG.info(admin.listNamespaceDescriptors());

LOG.info("Table created successfully.");

} else {

LOG.warn("table already exists");

}

} catch (IOException e) {

LOG.error("Create table failed.", e);

} finally {

LOG.info("Exiting testCreateTable.");

}

样例代码如下,请将代码中的字符串“***replace_with_Chinese_characters***”替换为中文字符。

public void putData() { LOG.info("Entering testPut.");

Table table = null;

try {

// Instantiate an HTable object.

table = conn.getTable(tableName);

List<Put> puts = new ArrayList<Put>();

// Instantiate a Put object.

Put put = new Put(Bytes.toBytes("rowkey001"));

put.addColumn(CF1, QUA_ARTICLE_CONTENT_CHINESE, Bytes.toBytes("***replace_with_Chinese_characters***"));

put.addColumn(CF2, QUA_ARTICLE_CONTENT_English, Bytes.toBytes("how many apples in the

market"));

put.addColumn(CF1, QUA_ARTICLE_ID, Bytes.toBytes(1111l));

put.addColumn(CF1, QUA_CHARACTER_NUM, Bytes.toBytes(1));

short shortNum = 1;

put.addColumn(CF1, QUA_PAGE_NUM, Bytes.toBytes(shortNum));

char c = 'A';

put.addColumn(CF1, QUA_ARTICLE_LEVEL, new byte[]{(byte)c});

put.addColumn(CF1, QUA_RESEARCH_COST, Bytes.toBytes(111.11d));

put.addColumn(CF1, QUA_ARTICLE_SCORE, Bytes.toBytes(80.5f));

put.addColumn(CF1, QUA_AUTHOR_MALE, Bytes.toBytes(true));

put.addColumn(CF1, QUA_WHATEVER, Bytes.toBytes("happy life and sweet love"));

puts.add(put);

put = new Put(Bytes.toBytes("rowkey002"));

put.addColumn(CF1, QUA_ARTICLE_CONTENT_CHINESE, Bytes.toBytes("***replace_with_Chinese_characters***"));

put.addColumn(CF2, QUA_ARTICLE_CONTENT_English, Bytes.toBytes("how many people in the swimming pool"));

put.addColumn(CF1, QUA_ARTICLE_ID, Bytes.toBytes(2222l));

put.addColumn(CF1, QUA_CHARACTER_NUM, Bytes.toBytes(2));

shortNum = 2;

put.addColumn(CF1, QUA_PAGE_NUM, Bytes.toBytes(shortNum));

c = 'B';

put.addColumn(CF1, QUA_ARTICLE_LEVEL, new byte[]{(byte)c});

put.addColumn(CF1, QUA_RESEARCH_COST, Bytes.toBytes(222.22d));

put.addColumn(CF1, QUA_ARTICLE_SCORE, Bytes.toBytes(170.5f));

put.addColumn(CF1, QUA_AUTHOR_MALE, Bytes.toBytes(true));

put.addColumn(CF1, QUA_WHATEVER, Bytes.toBytes("forever young"));

puts.add(put);

put = new Put(Bytes.toBytes("rowkey003"));

put.addColumn(CF1, QUA_ARTICLE_CONTENT_CHINESE, Bytes.toBytes("***replace_with_Chinese_characters***"));

put.addColumn(CF2, QUA_ARTICLE_CONTENT_English, Bytes.toBytes("we play video game in night"));

put.addColumn(CF1, QUA_ARTICLE_ID, Bytes.toBytes(3333l));

put.addColumn(CF1, QUA_CHARACTER_NUM, Bytes.toBytes(3));

shortNum = 3;

put.addColumn(CF1, QUA_PAGE_NUM, Bytes.toBytes(shortNum));

c = 'C';

put.addColumn(CF1, QUA_ARTICLE_LEVEL, new byte[]{(byte)c});

put.addColumn(CF1, QUA_RESEARCH_COST, Bytes.toBytes(333.33d));

put.addColumn(CF1, QUA_ARTICLE_SCORE, Bytes.toBytes(180.5f));

put.addColumn(CF1, QUA_AUTHOR_MALE, Bytes.toBytes(false));

put.addColumn(CF1, QUA_WHATEVER, Bytes.toBytes("wishes always with you"));

puts.add(put);

// Submit a put request.

try {

table.put(puts);

}

// if your put operation does not contain the all field in your schema, you will get ESDocIndexException.

// ESDocIndexException means data/document indexing to ES failed, but remember data put in HBase success.

// so here you handle this exception depends on your business( you can retry or ignore).

catch (ESDocIndexException e) { //TODO

}

LOG.info("Put successfully.");

} catch (IOException e) {

} }

LOG.info("Exiting testPut.");

}

// Build the client configuration and select the Lemon connection implementation.
// NOTE(review): presumably this is what makes conn.getTable(...) return the
// LemonWrapperHTable that the ES scan samples assert on — confirm.
conf = HBaseConfiguration.create();
String connectionImplClass = "org.apache.hadoop.hbase.client.LemonConnectionImplementation";
conf.set(HConnection.HBASE_CLIENT_CONNECTION_IMPL, connectionImplClass);
Connection conn = ConnectionFactory.createConnection(conf);

2. 全文检索条件通过ESColumnValueFilter的httpParameters或reqBodyJson参数传入,参数格式与开源Elasticsearch官方文档一致,请分别参见URI Search和Request Body Search。

如下样例代码通过httpParameters来查询(请将代码中的字符串“***replace_with_Chinese_keywords_01***”替换为中文检索词):

public void testScanDataWithES1() { LOG.info("Entering testScanDataWithES1.");

Table table = null;

// Instantiate a ResultScanner object.

ResultScanner rScanner = null;

try {

// Create the Configuration instance.

table = conn.getTable(tableName);

assert table instanceof LemonWrapperHTable;

// set specified qualifier.

Scan scan = new Scan();

scan.addColumn(CF1, QUA_ARTICLE_CONTENT_CHINESE);

scan.addColumn(CF2, QUA_ARTICLE_CONTENT_English);

scan.addColumn(CF1, QUA_ARTICLE_ID);

// Set the cache size.

scan.setCaching(1000);

// with special filter

Map<String, String> httpParameters = new HashMap<>();

httpParameters.put("q", "contentCh:***replace_with_Chinese_keywords_01***");

List<String> indexNames = new ArrayList();

indexNames.add(ES_INDEX_NAME);

ESColumnValueFilter filter = new ESColumnValueFilter(indexNames, httpParameters, "");

scan.setFilter(filter);

// Submit a scan request.

rScanner = table.getScanner(scan);

// Print query results.

for (Result r = rScanner.next(); r != null; r = rScanner.next()) {

for (Cell cell : r.rawCells()) {

LOG.info("testScanDataWithES1 by text type condition, scan result:" + Bytes.toString(CellUtil.cloneRow(cell)) + ":"

+ Bytes.toString(CellUtil.cloneFamily(cell)) + ","

+ Bytes.toString(CellUtil.cloneQualifier(cell)) + ","

+ convertCellValue(Bytes.toString(CellUtil.cloneQualifier(cell)), CellUtil.cloneValue(cell)));

} }

LOG.info("Scan data successfully.");

} catch (IOException e) {

LOG.info("Exiting testScanDataWithES1.");

}

如下样例代码通过reqBodyJson来查询(请将代码中的字符串“***replace_with_Chinese_keywords_02***”替换为中文检索词):

public void testScanDataWithES2() { LOG.info("Entering testScanDataWithES2.");

Table table = null;

// Instantiate a ResultScanner object.

ResultScanner rScanner = null;

try {

// Create the Configuration instance.

table = conn.getTable(tableName);

assert table instanceof LemonWrapperHTable;

// set specified qualifier.

Scan scan = new Scan();

scan.addColumn(CF1, QUA_ARTICLE_CONTENT_CHINESE);

scan.addColumn(CF2, QUA_ARTICLE_CONTENT_English);

scan.addColumn(CF1, QUA_ARTICLE_ID);

// Set the cache size.

scan.setCaching(1000);

// with special filter

Map<String, String> httpParameters = Collections.emptyMap();

List<String> indexNames = new ArrayList();

indexNames.add(ES_INDEX_NAME);

String reqBodyJson =

ESColumnValueFilter filter = new ESColumnValueFilter(indexNames, httpParameters, reqBodyJson);

scan.setFilter(filter);

// Submit a scan request.

rScanner = table.getScanner(scan);

// Print query results.

for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) {

LOG.info("testScanDataWithES2 by text type condition, scan result:" + Bytes.toString(CellUtil.cloneRow(cell)) + ":"

+ Bytes.toString(CellUtil.cloneFamily(cell)) + ","

+ Bytes.toString(CellUtil.cloneQualifier(cell)) + ","

+ convertCellValue(Bytes.toString(CellUtil.cloneQualifier(cell)), CellUtil.cloneValue(cell)));

} }

LOG.info("Scan data successfully.");

} catch (IOException e) {

LOG.error("Scan data failed ", e);

} finally {

if (rScanner != null) { // Close the scanner object.

rScanner.close();

}

if (table != null) { try {

// Close the HTable object.

table.close();

} catch (IOException e) {

LOG.error("Close table failed ", e);

} } }

LOG.info("Exiting testScanDataWithES2.");

}

7 调测程序

相關文件