本例子实现从 HBase 获取数据,并发送到 Kafka。

使用方法如下:


#!/usr/bin/env python
#coding=utf-8
 
import sys
import time
import json

sys.path.append('/usr/local/lib/python3.5/site-packages')
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase1 import Hbase  #调用hbase thrif1
from hbase1.ttypes import *
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError
import unittest

class HbaseOpreator:
    """Thin wrapper around the HBase Thrift1 client.

    Opens a buffered Thrift transport at construction time and closes it
    (best-effort) when the instance is garbage-collected.
    """

    def __init__(self, host, port, table='test'):
        # Default table name; individual methods may still receive an
        # explicit table argument.
        self.tableName = table
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

    def __del__(self):
        # Best-effort cleanup: the transport may never have opened (e.g. if
        # __init__ raised partway through), so swallow close errors.
        try:
            self.transport.close()
        except Exception:
            pass

    def scanTablefilter(self, table, *args):
        """Scan *table* with a fixed SingleColumnValueFilter and return the
        matching rows as a UTF-8 encoded JSON byte string.

        Each matching row contributes a dict with keys 'key', 'name',
        'age', 'phone', read from the 'info' column family.
        NOTE(review): assumes the Thrift layer returns str column keys
        ('info:name' etc.) — on some Python 3 thrift builds these are
        bytes; verify against the generated hbase1 bindings.
        """
        rows = []
        scanner_id = None
        try:
            # Keep rows where info:name equals 'lilei' or 'lily'.
            filter_str = (
                "SingleColumnValueFilter('info','name',=,'binary:lilei')"
                " OR SingleColumnValueFilter('info','name',=,'binary:lily')"
            )
            scan = TScan()
            scan.filterString = filter_str
            scanner_id = self.client.scannerOpenWithScan(table, scan, None)
            result = self.client.scannerGet(scanner_id)
            while result:
                for r in result:
                    # Build a NEW dict for every row. The original code
                    # reused a single dict and appended it repeatedly, so
                    # every list entry aliased the LAST row's values.
                    rows.append({
                        'key': r.row,
                        'name': r.columns.get('info:name').value,
                        'age': r.columns.get('info:age').value,
                        'phone': r.columns.get('info:phone').value,
                    })
                result = self.client.scannerGet(scanner_id)
            return json.dumps(rows).encode(encoding="utf-8")
        finally:
            # Release the server-side scanner instead of leaking it.
            if scanner_id is not None:
                self.client.scannerClose(scanner_id)
            print("scan finish")

def sendKfafkaProduct(data):
    """Publish each item of *data* to the local Kafka topic 'test'.

    data: an iterable of message values (e.g. a list of JSON-encoded
    bytes). Every item is sent with the fixed key b'lxs'.

    The original version fell into an endless ``while True`` loop after
    the first pass, re-sending the entire payload forever; that loop is
    removed. The producer is flushed before returning so buffered
    records are actually delivered.
    """
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    try:
        for item in data:
            producer.send('test', key=b'lxs', value=item)
            print(item)
        # Block until all in-flight records are acknowledged.
        producer.flush()
    finally:
        producer.close()

if __name__ == '__main__':
    # unittest.main()

    # Connect to the HBase Thrift gateway and dump the filtered scan
    # result (UTF-8 encoded JSON bytes) to stdout.
    operator = HbaseOpreator('10.27.1.138', 9090)
    scan_json = operator.scanTablefilter('ns_lbi:test_hbase_student')
    print(scan_json)
    #sendKfafkaProduct(scan_json)




Kafka 开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。