6.4 HBase Elasticsearch schema 说明
6.6.3 创建数据表开启 Elasticsearch 索引
注意事项
● 注[1]：如果配置文件目录conf已经加入classpath路径中，那么后面加载指定配置文件的代码可以不执行。
6.6.3 创建数据表开启 Elasticsearch 索引
功能简介
建表功能同创建表。在此基础上，需要在表属性中配置开启Elasticsearch索引的字段，请参见HBase Elasticsearch schema说明。
样例代码
public void createTable() {
LOG.info("Entering testCreateTable.");
// Specify the table descriptor.
HTableDescriptor htd = new HTableDescriptor(tableName);
// Set the column family name to info.
HColumnDescriptor hcd = new HColumnDescriptor(CF1);
// Set data encoding methods. HBase provides DIFF,FAST_DIFF,PREFIX // and PREFIX_TREE
hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
// Set compression methods, HBase provides two default compression // methods:GZ and SNAPPY
// GZ has the highest compression rate,but low compression and // decompression efficiency,fit for cold data
// SNAPPY has low compression rate, but high compression and // decompression efficiency,fit for hot data.
// it is advised to use SANPPY
hcd.setCompressionType(Compression.Algorithm.SNAPPY);
HColumnDescriptor hcd2 = new HColumnDescriptor(CF2);
hcd2.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
hcd2.setCompressionType(Compression.Algorithm.SNAPPY);
htd.addFamily(hcd);
htd.addFamily(hcd2);
//add HBase ES schema String ESJsonSchema = "[" +
"{\"name\":\"contentCh\",\"type\":\"text\",\"hbaseQualifier\":\"cf1:contentCh\",\"analyzer\":\"ik_smart
\"}," +
"{\"name\":\"contentEng\",\"type\":\"text\",\"hbaseQualifier\":\"cf2:contentEng\"}," + "{\"name\":\"id\",\"type\":\"long\",\"hbaseQualifier\":\"cf1:id\"}," +
"{\"name\":\"charNum\",\"type\":\"integer\",\"hbaseQualifier\":\"cf1:charNum\"}," + "{\"name\":\"pageNum\",\"type\":\"short\",\"hbaseQualifier\":\"cf1:pageNum\"}," + "{\"name\":\"level\",\"type\":\"byte\",\"hbaseQualifier\":\"cf1:level\"}," +
"{\"name\":\"researchCost\",\"type\":\"double\",\"hbaseQualifier\":\"cf1:researchCost\"}," + "{\"name\":\"score\",\"type\":\"float\",\"hbaseQualifier\":\"cf1:score\"}," +
"{\"name\":\"male\",\"type\":\"boolean\",\"hbaseQualifier\":\"cf1:male\"}" + "]";
htd.setValue(HBaseESConst.HBASE_INDEX_ES_ENABLED, "true");
htd.setValue(HBaseESConst.HBASE_INDEX_ES_ENDPOINT, ESClusterHosts);//(1) htd.setValue(HBaseESConst.HBASE_INDEX_ES_INDEXNAME, ES_INDEX_NAME);//(2)
htd.setValue(HBaseESConst.HBASE_INDEX_ES_SCHEMA, ESJsonSchema);//(3) Admin admin = null;
try {
// Instantiate an Admin object.
admin = conn.getAdmin();
if (!admin.tableExists(tableName)) { LOG.info("Creating table...");
admin.createTable(htd);
LOG.info(admin.getClusterStatus());
LOG.info(admin.listNamespaceDescriptors());
LOG.info("Table created successfully.");
} else {
LOG.warn("table already exists");
}
} catch (IOException e) {
LOG.error("Create table failed.", e);
} finally {
LOG.info("Exiting testCreateTable.");
}
样例代码如下，请将代码中的字符串“***replace_with_Chinese_characters***”替换为中文字符。
public void putData() { LOG.info("Entering testPut.");
Table table = null;
try {
// Instantiate an HTable object.
table = conn.getTable(tableName);
List<Put> puts = new ArrayList<Put>();
// Instantiate a Put object.
Put put = new Put(Bytes.toBytes("rowkey001"));
put.addColumn(CF1, QUA_ARTICLE_CONTENT_CHINESE, Bytes.toBytes("***replace_with_Chinese_characters***"));
put.addColumn(CF2, QUA_ARTICLE_CONTENT_English, Bytes.toBytes("how many apples in the
market"));
put.addColumn(CF1, QUA_ARTICLE_ID, Bytes.toBytes(1111l));
put.addColumn(CF1, QUA_CHARACTER_NUM, Bytes.toBytes(1));
short shortNum = 1;
put.addColumn(CF1, QUA_PAGE_NUM, Bytes.toBytes(shortNum));
char c = 'A';
put.addColumn(CF1, QUA_ARTICLE_LEVEL, new byte[]{(byte)c});
put.addColumn(CF1, QUA_RESEARCH_COST, Bytes.toBytes(111.11d));
put.addColumn(CF1, QUA_ARTICLE_SCORE, Bytes.toBytes(80.5f));
put.addColumn(CF1, QUA_AUTHOR_MALE, Bytes.toBytes(true));
put.addColumn(CF1, QUA_WHATEVER, Bytes.toBytes("happy life and sweet love"));
puts.add(put);
put = new Put(Bytes.toBytes("rowkey002"));
put.addColumn(CF1, QUA_ARTICLE_CONTENT_CHINESE, Bytes.toBytes("***replace_with_Chinese_characters***"));
put.addColumn(CF2, QUA_ARTICLE_CONTENT_English, Bytes.toBytes("how many people in the swimming pool"));
put.addColumn(CF1, QUA_ARTICLE_ID, Bytes.toBytes(2222l));
put.addColumn(CF1, QUA_CHARACTER_NUM, Bytes.toBytes(2));
shortNum = 2;
put.addColumn(CF1, QUA_PAGE_NUM, Bytes.toBytes(shortNum));
c = 'B';
put.addColumn(CF1, QUA_ARTICLE_LEVEL, new byte[]{(byte)c});
put.addColumn(CF1, QUA_RESEARCH_COST, Bytes.toBytes(222.22d));
put.addColumn(CF1, QUA_ARTICLE_SCORE, Bytes.toBytes(170.5f));
put.addColumn(CF1, QUA_AUTHOR_MALE, Bytes.toBytes(true));
put.addColumn(CF1, QUA_WHATEVER, Bytes.toBytes("forever young"));
puts.add(put);
put = new Put(Bytes.toBytes("rowkey003"));
put.addColumn(CF1, QUA_ARTICLE_CONTENT_CHINESE, Bytes.toBytes("***replace_with_Chinese_characters***"));
put.addColumn(CF2, QUA_ARTICLE_CONTENT_English, Bytes.toBytes("we play video game in night"));
put.addColumn(CF1, QUA_ARTICLE_ID, Bytes.toBytes(3333l));
put.addColumn(CF1, QUA_CHARACTER_NUM, Bytes.toBytes(3));
shortNum = 3;
put.addColumn(CF1, QUA_PAGE_NUM, Bytes.toBytes(shortNum));
c = 'C';
put.addColumn(CF1, QUA_ARTICLE_LEVEL, new byte[]{(byte)c});
put.addColumn(CF1, QUA_RESEARCH_COST, Bytes.toBytes(333.33d));
put.addColumn(CF1, QUA_ARTICLE_SCORE, Bytes.toBytes(180.5f));
put.addColumn(CF1, QUA_AUTHOR_MALE, Bytes.toBytes(false));
put.addColumn(CF1, QUA_WHATEVER, Bytes.toBytes("wishes always with you"));
puts.add(put);
// Submit a put request.
try {
table.put(puts);
}
// if your put operation does not contain the all field in your schema, you will get ESDocIndexException.
// ESDocIndexException means data/document indexing to ES failed, but remember data put in HBase success.
// so here you handle this exception depends on your business( you can retry or ignore).
catch (ESDocIndexException e) { //TODO
}
LOG.info("Put successfully.");
} catch (IOException e) {
} }
LOG.info("Exiting testPut.");
}
// Build a client configuration whose connection implementation is set to
// LemonConnectionImplementation, then open a Connection from that configuration.
final String lemonConnectionImpl = "org.apache.hadoop.hbase.client.LemonConnectionImplementation";
conf = HBaseConfiguration.create();
conf.set(HConnection.HBASE_CLIENT_CONNECTION_IMPL, lemonConnectionImpl);
Connection conn = ConnectionFactory.createConnection(conf);
2. 全文检索条件通过ESColumnValueFilter的httpParameters或reqBodyJson参数传入，参数格式与开源Elasticsearch官方文档一致，请分别参见URI Search和Request Body Search。
如下样例代码通过httpParameters来查询(请将代码中的字符串
“***replace_with_Chinese_keywords_01***”替换为中文检索词):
public void testScanDataWithES1() { LOG.info("Entering testScanDataWithES1.");
Table table = null;
// Instantiate a ResultScanner object.
ResultScanner rScanner = null;
try {
// Create the Configuration instance.
table = conn.getTable(tableName);
assert table instanceof LemonWrapperHTable;
// set specified qualifier.
Scan scan = new Scan();
scan.addColumn(CF1, QUA_ARTICLE_CONTENT_CHINESE);
scan.addColumn(CF2, QUA_ARTICLE_CONTENT_English);
scan.addColumn(CF1, QUA_ARTICLE_ID);
// Set the cache size.
scan.setCaching(1000);
// with special filter
Map<String, String> httpParameters = new HashMap<>();
httpParameters.put("q", "contentCh:***replace_with_Chinese_keywords_01***");
List<String> indexNames = new ArrayList();
indexNames.add(ES_INDEX_NAME);
ESColumnValueFilter filter = new ESColumnValueFilter(indexNames, httpParameters, "");
scan.setFilter(filter);
// Submit a scan request.
rScanner = table.getScanner(scan);
// Print query results.
for (Result r = rScanner.next(); r != null; r = rScanner.next()) {
for (Cell cell : r.rawCells()) {
LOG.info("testScanDataWithES1 by text type condition, scan result:" + Bytes.toString(CellUtil.cloneRow(cell)) + ":"
+ Bytes.toString(CellUtil.cloneFamily(cell)) + ","
+ Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
+ convertCellValue(Bytes.toString(CellUtil.cloneQualifier(cell)), CellUtil.cloneValue(cell)));
} }
LOG.info("Scan data successfully.");
} catch (IOException e) {
LOG.info("Exiting testScanDataWithES1.");
}
如下样例代码通过reqBodyJson来查询(请将代码中的字符串
“***replace_with_Chinese_keywords_02***”替换为中文检索词):
public void testScanDataWithES2() { LOG.info("Entering testScanDataWithES2.");
Table table = null;
// Instantiate a ResultScanner object.
ResultScanner rScanner = null;
try {
// Create the Configuration instance.
table = conn.getTable(tableName);
assert table instanceof LemonWrapperHTable;
// set specified qualifier.
Scan scan = new Scan();
scan.addColumn(CF1, QUA_ARTICLE_CONTENT_CHINESE);
scan.addColumn(CF2, QUA_ARTICLE_CONTENT_English);
scan.addColumn(CF1, QUA_ARTICLE_ID);
// Set the cache size.
scan.setCaching(1000);
// with special filter
Map<String, String> httpParameters = Collections.emptyMap();
List<String> indexNames = new ArrayList();
indexNames.add(ES_INDEX_NAME);
String reqBodyJson =
ESColumnValueFilter filter = new ESColumnValueFilter(indexNames, httpParameters, reqBodyJson);
scan.setFilter(filter);
// Submit a scan request.
rScanner = table.getScanner(scan);
// Print query results.
for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) {
LOG.info("testScanDataWithES2 by text type condition, scan result:" + Bytes.toString(CellUtil.cloneRow(cell)) + ":"
+ Bytes.toString(CellUtil.cloneFamily(cell)) + ","
+ Bytes.toString(CellUtil.cloneQualifier(cell)) + ","
+ convertCellValue(Bytes.toString(CellUtil.cloneQualifier(cell)), CellUtil.cloneValue(cell)));
} }
LOG.info("Scan data successfully.");
} catch (IOException e) {
LOG.error("Scan data failed ", e);
} finally {
if (rScanner != null) { // Close the scanner object.
rScanner.close();
}
if (table != null) { try {
// Close the HTable object.
table.close();
} catch (IOException e) {
LOG.error("Close table failed ", e);
} } }
LOG.info("Exiting testScanDataWithES2.");
}