hbase分页查询实现
admin
2023-02-08 03:40:04
0

Hbase本身是没有分页查询的,我在网上找了很多资料来实现一个分页功能,在这里做了一下记录,分享给大家,有什么不足之处,请尽管指出。废话不多说,看代码。

import java.io.IOException;

import java.util.LinkedHashMap;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

 

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.Get;

import org.apache.hadoop.hbase.client.HTableInterface;

import org.apache.hadoop.hbase.client.HTablePool;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;

import org.apache.hadoop.hbase.filter.Filter;

import org.apache.hadoop.hbase.filter.FilterList;

import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;

import org.apache.hadoop.hbase.util.Bytes;

 

publicclass HBaseUtils {

    privatestatic Configuration config = null;

    privatestatic HTablePool tp = null;

    static {

        // 加载集群配置

        config = HBaseConfiguration.create();

        config.set("hbase.zookeeper.quorum", "xx.xx.xx");

        config.set("hbase.zookeeper.property.clientPort", "2181");

        // 创建表池(可伟略提高查询性能,具体说明请百度或官方API)

        tp = new HTablePool(config, 10);

    }

 

    /*

     * 获取hbase的表

     */

    publicstatic HTableInterface getTable(StringtableName) {

 

        if (StringUtils.isEmpty(tableName))

            returnnull;

 

        returntp.getTable(getBytes(tableName));

    }

 

    /* 转换byte数组 */

    publicstaticbyte[] getBytes(String str) {

        if (str == null)

            str= "";

 

        return Bytes.toBytes(str);

    }

 

    /**

     * 查询数据

     * @param tableKey 表标识

     * @param queryKey 查询标识

     * @param startRow 开始行

     * @param paramsMap 参数集合

     * @return结果集

     */

    publicstatic TBData getDataMap(StringtableName, String startRow,

            StringstopRow, Integer currentPage, Integer pageSize)

            throws IOException {

        List>mapList = null;

        mapList = new LinkedList>();

 

        ResultScanner scanner = null;

        // 为分页创建的封装类对象,下面有给出具体属性

        TBData tbData = null;

        try {

            // 获取最大返回结果数量

            if (pageSize == null || pageSize == 0L)

                pageSize = 100;

 

            if (currentPage == null || currentPage == 0)

                currentPage = 1;

 

            // 计算起始页和结束页

            IntegerfirstPage = (currentPage - 1) * pageSize;

 

            IntegerendPage = firstPage + pageSize;

 

            // 从表池中取出HBASE表对象

            HTableInterfacetable = getTable(tableName);

            // 获取筛选对象

            Scanscan = getScan(startRow, stopRow);

            // 给筛选对象放入过滤器(true标识分页,具体方法在下面)

            scan.setFilter(packageFilters(true));

            // 缓存1000条数据

            scan.setCaching(1000);

            scan.setCacheBlocks(false);

            scanner= table.getScanner(scan);

            int i = 0;

            List<byte[]> rowList = new LinkedList<byte[]>();

            // 遍历扫描器对象, 并将需要查询出来的数据row key取出

            for (Result result : scanner) {

                String row = toStr(result.getRow());

                if (i >= firstPage && i< endPage) {

                    rowList.add(getBytes(row));

                }

                i++;

            }

 

            // 获取取出的row key的GET对象

            ListgetList = getList(rowList);

            Result[]results = table.get(getList);

            // 遍历结果

            for (Result result : results) {

                Map<byte[], byte[]> fmap = packFamilyMap(result);

                Map rmap = packRowMap(fmap);

                mapList.add(rmap);

            }

 

            // 封装分页对象

            tbData= new TBData();

            tbData.setCurrentPage(currentPage);

            tbData.setPageSize(pageSize);

            tbData.setTotalCount(i);

            tbData.setTotalPage(getTotalPage(pageSize, i));

            tbData.setResultList(mapList);

        } catch (IOException e) {

            e.printStackTrace();

        } finally {

            closeScanner(scanner);

        }

 

        return tbData;

    }

 

    privatestaticint getTotalPage(int pageSize, int totalCount) {

        int n = totalCount / pageSize;

        if (totalCount % pageSize == 0) {

            return n;

        } else {

            return ((int) n) + 1;

        }

    }

 

    // 获取扫描器对象

    privatestatic Scan getScan(String startRow,String stopRow) {

        Scan scan = new Scan();

        scan.setStartRow(getBytes(startRow));

        scan.setStopRow(getBytes(stopRow));

 

        return scan;

    }

 

    /**

     * 封装查询条件

     */

    privatestatic FilterList packageFilters(boolean isPage) {

        FilterList filterList = null;

        // MUST_PASS_ALL(条件 AND) MUST_PASS_ONE(条件OR)

        filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);

        Filter filter1 = null;

        Filter filter2 = null;

        filter1 = newFilter(getBytes("family1"), getBytes("column1"),

                CompareOp.EQUAL, getBytes("condition1"));

        filter2 = newFilter(getBytes("family2"), getBytes("column1"),

                CompareOp.LESS, getBytes("condition2"));

        filterList.addFilter(filter1);

        filterList.addFilter(filter2);

        if (isPage) {

            filterList.addFilter(new FirstKeyOnlyFilter());

        }

        return filterList;

    }

 

    privatestatic Filter newFilter(byte[] f, byte[] c, CompareOp op, byte[] v) {

        returnnew SingleColumnValueFilter(f, c, op,v);

    }

 

    privatestaticvoid closeScanner(ResultScannerscanner) {

        if (scanner != null)

            scanner.close();

    }

 

    /**

     * 封装每行数据

     */

    privatestatic MappackRowMap(Map<byte[], byte[]> dataMap) {

        Map map = new LinkedHashMap();

 

        for (byte[] key : dataMap.keySet()) {

 

            byte[] value = dataMap.get(key);

 

            map.put(toStr(key), toStr(value));

 

        }

        return map;

    }

 

    /* 根据ROW KEY集合获取GET对象集合 */

    privatestatic List getList(List<byte[]> rowList) {

        List list = new LinkedList();

        for (byte[] row : rowList) {

            Getget = new Get(row);

 

            get.addColumn(getBytes("family1"), getBytes("column1"));

            get.addColumn(getBytes("family1"), getBytes("column2"));

            get.addColumn(getBytes("family2"), getBytes("column1"));

            list.add(get);

        }

        return list;

    }

 

    /**

     * 封装配置的所有字段列族

     */

    privatestatic Map<byte[], byte[]> packFamilyMap(Result result){

        Map<byte[], byte[]> dataMap = null;

        dataMap = new LinkedHashMap<byte[], byte[]>();

        dataMap.putAll(result.getFamilyMap(getBytes("family1")));

        dataMap.putAll(result.getFamilyMap(getBytes("family2")));

        return dataMap;

    }

 

    privatestatic String toStr(byte[] bt) {

        return Bytes.toString(bt);

    }

 

    publicstaticvoid main(String[] args) throws IOException {

        // 拿出row key的起始行和结束行

        // #<0<9<:

        String startRow = "aaaa#";

        String stopRow = "aaaa:";

        int currentPage = 1;

        int pageSize = 20;

        // 执行hbase查询

        getDataMap("table", startRow, stopRow, currentPage,pageSize);

 

    }

}

 

class TBData {

    private Integer currentPage;

    private Integer pageSize;

    private Integer totalCount;

    private Integer totalPage;

    private List> resultList;

 

    public Integer getCurrentPage() {

        returncurrentPage;

    }

 

    publicvoid setCurrentPage(IntegercurrentPage) {

        this.currentPage = currentPage;

    }

 

    public Integer getPageSize() {

        returnpageSize;

    }

 

    publicvoid setPageSize(Integer pageSize) {

        this.pageSize = pageSize;

    }

 

    public Integer getTotalCount() {

        returntotalCount;

    }

 

    publicvoid setTotalCount(Integer totalCount){

        this.totalCount = totalCount;

    }

 

    public Integer getTotalPage() {

        returntotalPage;

    }

 

    publicvoid setTotalPage(Integer totalPage) {

        this.totalPage = totalPage;

    }

 

    public List> getResultList() {

        returnresultList;

    }

 

    publicvoidsetResultList(List> resultList) {

        this.resultList = resultList;

    }

}


相关内容

热门资讯

德国总理:美国正在被伊朗羞辱 德国之声4月27日报道,德国总理默茨在访问一所学校时表示,在当前的持续冲突中,伊朗领导层正试图羞辱美...
理响中国|“长”歌以行,风云激... 光阴如梭,东方潮阔。这里是中国的长三角,世界的长三角。无论过去、现在还是未来,这片土地都因时代而生,...
白宫:特朗普及其国安团队开会讨... 新华社华盛顿4月27日电 美国白宫新闻秘书莱维特27日在记者会上证实,总统特朗普及其国家安全团队当天...
人民日报刊文:日本放开杀伤性武... 日本放开杀伤性武器出口推高地缘冲突风险(国际论坛)常思纯《人民日报》(2026年04月28日 第 0...
医疗保障法草案二审:明确生育保... 满足多样化健康保障需求本报记者 彭 波4月27日,医疗保障法草案二审稿提请十四届全国人大常委会第二十...
天津一景区发生自转旋翼机事故1... 澎湃新闻记者 吕新文中国民用航空华北地区管理局4月22日公布《豪客通航“10•1”天津长芦汉盐旅游区...
卡塔尔埃米尔与美国总统特朗普通... 当地时间24日,卡塔尔埃米尔塔米姆与美国总统特朗普通电话,重点就中东地区局势以及伊朗与美国谈判问题交...
男子30年前被扣押2859克黄... 澎湃新闻记者 王鑫家住辽宁省大连市的潘永嘉近日向澎湃新闻反映称,三十年前,他在大连周水子机场被盖州市...
商务部:取消反制欧盟两家金融机... 中华人民共和国商务部令二〇二六年 第1号鉴于欧盟已取消对中国两家金融机构的制裁措施,现公布《关于取消...
过去24小时共有5艘船只通过霍... 总台记者当地时间24日获悉,过去24小时内,共有5艘船只通过霍尔木兹海峡,其中包括一艘伊朗油轮。(总...