1 Star 0 Fork 0

py-bigdata / py-bigdata-util

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

Bigdata Utility

Install

pip install py-bigdate-util --upgrade

Usage

Config

读取本地配置文件并加载配置


#!/usr/bin/env python3
# -*- coding=utf-8 -*-
import unittest

from bigdata_util.util import get_absolute_path
from bigdata_util.util import BaseConfig, Singleton

'''
# config.test_conf
a = a
b = 1
c = c
'''

class MyConfig(BaseConfig, metaclass=Singleton):
    """Configuration object backed by the local ``config.test_conf`` file."""

    def __init__(self, cfg: dict = None):
        """
        Hand the optional in-memory settings and the config file path to
        ``BaseConfig``.

        :param cfg: optional dict of settings passed through to ``BaseConfig``
        """
        # The config file follows the ConfigObj file format:
        # https://configobj.readthedocs.io/en/latest/configobj.html#the-config-file-format
        conf_path = get_absolute_path(__file__, '../config.test_conf')
        super().__init__(cfg, conf_path)


class ConfigTest(unittest.TestCase):
    """Checks the values ``MyConfig`` returns for file-only, dict-only and shared keys."""

    def test_config(self):
        """Keys 'a'/'b' exist only in the file, 'aa'/'bb' only in the dict, 'c' in both."""
        overrides = {'aa': 'aa', 'bb': 2, 'c': 'cc'}
        loaded = MyConfig(cfg=overrides)

        # (key, expected value); for 'c' the dict value 'cc' is expected,
        # not the file value 'c'.
        expectations = (
            ('a', 'a'),
            ('aa', 'aa'),
            ('b', 1),
            ('bb', 2),
            ('c', 'cc'),
        )
        for key, expected in expectations:
            self.assertEqual(loaded.get(key, 'default'), expected)

Connectors

MaxcomputeConnector功能点

  • odps_ins = MaxcomputeConnector(cfg.get('connector.odps')) 连接odps,可配置专网logview地址,同时支持http、socks代理配置
  • odps_ins.get_table_data 拉取表数据,并缓存在本目录 .connector_maxcompute_cache 中,监控last_modified_time决定是否更新
  • odps_ins.update_mr_jar 自动从本地java工程目录拿到jar包并上传到odps作为资源,返回资源名
  • odps_ins.update_udf 更新udf
  • odps_ins.run_sql_with_logview_return_plain_json 运行sql,打印logview,并返回结果
  • odps_ins.run_sql_in_file 运行.sql文件,支持文件中传入 ${变量} 或 ${类实例.属性}
  • odps_ins.create_ots_external_table 创建ots外表

odps和postgres配合使用

from bigdata_util.connector import MaxcomputeConnector, PostGreConnector
from .my_config import MyConfig

cfg = MyConfig()
odps = MaxcomputeConnector(cfg.get('connector.odps'))
postgre = PostGreConnector(cfg.get('connector.postgres'))

# Fetch the partition, bypassing any existing cache; the result is cached locally.
data_list = odps.get_table_data('dual_point', partition='dt=1', ignore_cache=True)
# On repeated reads, do not bypass the cache: the remote table's
# last_modified_time then decides whether the data is pulled again.
# NOTE(review): assumes ignore_cache defaults to False -- confirm.
data_list = odps.get_table_data('dual_point', partition='dt=1')
# Write the rows to Postgres as a point-geometry table, taking the WKT from column 'wkt'.
postgre.save_geometry_table('dual_point', data_list, col_name_wkt='wkt', geometry_type='point')

ots使用

from bigdata_util.connector import TableStoreConnector

# NOTE(review): cfg is not defined in this snippet; presumably it is a config
# instance created beforehand, as in the other examples -- confirm.
ots_cli = TableStoreConnector(cfg.get('connector.ots'))
# Read one row from table 'dual' by primary key id=1; the result is unpacked
# into three values.
consumed, return_row, next_token = ots_cli.get_row('dual', [('id', 1)])

运行java mr

已在java代码中写好mr逻辑,然后在python中调用

define

#!/usr/bin/env python3
# -*- coding=utf-8 -*-

import os
from typing import List

from bigdata_util.connector import MaxComputeConnector
from bigdata_util.mr_launcher import MapReduceLauncher
from bigdata_util.util import get_logger, get_absolute_path

from .my_config import CustomConfig


# Module-level objects used by the job definition in this file.
logger = get_logger(__file__)
cfg = CustomConfig()
# Connector built from the 'connector.odps_cst' configuration entry.
odps_cst = MaxComputeConnector(cfg.get('connector.odps_cst'))


class CalculateDistanceOfPools(MapReduceLauncher):
    """Job definition that launches the pool-distance Java MapReduce from Python.

    Every ``init_*`` hook returns a fixed setting for this job and ignores the
    argument it receives.
    NOTE(review): presumably ``MapReduceLauncher`` calls these hooks while
    preparing ``launch()`` -- confirm against the base class.
    """

    def init_mr_jar_base_path(self, mr_jar_base_path: str = None):
        """Return the ``mr_jar_base_path`` entry of the module configuration."""
        return cfg.get('mr_jar_base_path')

    def init_project_base_path(self, mr_jar_base_path: str = None):
        """Return the ``project_base_path`` entry of the module configuration."""
        return cfg.get('project_base_path')

    @staticmethod
    def init_odps_conf_file_name():
        """Return the file name of the ODPS configuration used by this job."""
        return 'odps_cst_conf.ini'

    def init_mapper_class(self, mapper_class: str = None):
        """Return the fully qualified name of the Java mapper class."""
        return 'com.aliyun.citybrain.traffic.commonmapper.RawMapper'

    def init_reducer_class(self, reducer_class: str = None):
        """Return the fully qualified name of the Java reducer class."""
        return 'com.aliyun.citybrain.traffic.pooldistance.Reducer'

    def init_maxcompute_ins(self, maxcompute_ins: MaxComputeConnector = None):
        """Return the module-level MaxCompute connector ``odps_cst``."""
        return odps_cst

    def init_mr_jar_name(self, mr_jar_name: str = None):
        """Return the file name of the jar that contains the MR classes."""
        return 'traffic-algo-mg-1.0-SNAPSHOT-jar-with-dependencies.jar'

    def init_system_parameters(self, system_parameters: dict = None):
        """
        Only the three parameters below are controlled here; their defaults are
          -splitSize 32
          -reduceCnt 100
          -reduceMem 4096
        :param system_parameters: {'splitSize': '16', 'reduceCnt': '900', 'reduceMem': 4096}
        :return: may be empty
        """
        return {
            'reduceCnt': 900
        }

    def init_ddl_file_path(self, ddl_file_path: str = None):
        """Return the absolute path of the DDL script located next to this file."""
        # realpath resolves symlinks, so the script is looked up beside the
        # real location of this module.
        script_dir = os.path.dirname(os.path.realpath(__file__))
        return os.path.abspath(os.path.join(
            script_dir,
            'step_3_pool_clustering_euclidean_distance_between_pool_mr.osql'
        ))

    def run(self):
        """Launch the job by calling ``launch()``."""
        self.launch()

# Run the job when this file is executed as a script.
if __name__ == '__main__':
    CalculateDistanceOfPools().run()

use

        # NOTE(review): TableInfo and cfg are not imported in this snippet; they
        # must be provided by the surrounding module.
        from . import CalculateDistanceOfPools
        mr = CalculateDistanceOfPools()
        # Job parameters are set before the launch() call below.
        mr.set_init_parameters({
            'mapper.key': 'pool_id:string',
            'mapper.value': 'pool_id:string,sub_polygon:string',
            'geometry_col_name': 'sub_polygon',
            'geometry_gen_type': 'wkt',
            'input_table': TableInfo('algtmp_tfc_vhcpool_info', partition=cfg.COMMON_PARTITION_WITH_DT),
            'output_table': TableInfo('algtmp_rltn_euclidean_distance_pool_pool', partition=cfg.COMMON_PARTITION_WITH_DT),
            'overlap_length': 1,       # at least 50 m of overlap -- NOTE(review): the value is 1, confirm the unit
            'overlap_ratio': 0.1,
            'threshold.MAX_DISTANCE': 0
        })
        mr.launch()

# 打印mr运行的logview...

Plot


from . import MyConfig
from bigdata_util.plot import PlotLine
from bigdata_util.connector import MaxcomputeConnector

cfg = MyConfig()
# PlotLine is constructed with a MaxCompute connector.
plot_line = PlotLine(MaxcomputeConnector(cfg.get('connector.odps')))
# The query aliases its columns to x, y and label.
# NOTE(review): presumably PlotLine reads these names as the axes and the
# series label -- confirm.
plot_line.plot_line('''
    select
      stat_time x,
      value y,
      name label
    from dual
''')

fig

connectors依赖

odps

pip install pyodps==0.9.1

hive

pip install impyla==0.16.3
pip install thrift_sasl==0.4.2 (mac安装请参考: https://github.com/albin3/book-notes/issues/2)

hive kerberos

安装依赖:

mac: brew install krb5
linux: apt install -y krb5-user

pip install bit_array
pip install thrift
pip install thrift_sasl
pip install impyla
pip install krbcontext
pip install hdfs[kerberos] -i https://mirrors.aliyun.com/pypi/simple
pip install pykerberos -i https://mirrors.aliyun.com/pypi/simple

kafka

pip install kafka-python==2.0.1

pg with proxy

pip install asyncpg

usage:

from bigdata_util.connector.postgre_async import PostgreAsyncConnector

table_name = 'algtmp_connector_test_pg'
# First argument: PostgreSQL connection URL; second argument: SOCKS5 proxy URL.
pg_conn = PostgreAsyncConnector(
    'postgresql://<user_name>:<passwd>@<host_name>/<data_base>',
    'socks5://<host_name>:<port>'
)
# Create the test table if it does not exist yet.
pg_conn.execute_sql(f'''
    create table if not exists {table_name} (
        a varchar,
        b varchar,
        c varchar,
        d varchar,
        PRIMARY KEY (a, b, c, d)
    )
''')
# Rows are passed as a list of dicts keyed by column name.
pg_conn.save_data(table_name, [
    {'a': 'a1', 'b': 'b1', 'c': 'c2', 'd': 'd1'}
])
# Read the rows back.
data_list = pg_conn.run_sql_return_plain_json(f'''
    select * from {table_name}
''')
# Clean up the test table.
pg_conn.drop_table(table_name)

mysql with proxy

暂时只支持查询,不支持写数据

pip install PyMySQL

usage:

from bigdata_util.connector.mysql import MysqlConnector

# Connection parameters plus a SOCKS5 proxy URL.
mysql_conn = MysqlConnector(
    host='localhost',
    port=3306,
    user='root',
    password='password',
    database='default',
    proxy='socks5://<host_name>:<port>',
)
# Query only: the README states that writing data is not supported yet.
data_list = mysql_conn.run_sql_return_plain_json('''
    show tables;
''')

install in Python 3.6

安装前可能需要先手动安装 numpy pandas shapely

> pip install numpy pandas shapely
> pip install py-bigdate-util

空文件

简介

大数据Python处理套件 包括connector连接器、plot可视化、util常用日期处理及GIS处理函数 展开 收起
Python 等 2 种语言
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Python
1
https://gitee.com/py-bigdata/py-bigdata-util.git
git@gitee.com:py-bigdata/py-bigdata-util.git
py-bigdata
py-bigdata-util
py-bigdata-util
master

搜索帮助