pip install py-bigdate-util --upgrade
Loading configuration from a local config file
#!/usr/bin/env python3
# -*- coding=utf-8 -*-
import unittest
from bigdata_util.util import get_absolute_path
from bigdata_util.util import BaseConfig, Singleton

'''
# config.test_conf
a = a
b = 1
c = c
'''

class MyConfig(BaseConfig, metaclass=Singleton):
    def __init__(self, cfg: dict = None):
        """
        :type cfg: object
        """
        super(MyConfig, self).__init__(
            cfg,
            # the config file follows the ConfigObj format:
            # https://configobj.readthedocs.io/en/latest/configobj.html#the-config-file-format
            get_absolute_path(__file__, '../config.test_conf')
        )
class ConfigTest(unittest.TestCase):
    def test_config(self):
        config = {
            'aa': 'aa',
            'bb': 2,
            'c': 'cc'
        }
        c = MyConfig(cfg=config)
        # keys only in the file fall through; keys passed via cfg take precedence
        self.assertEqual(c.get('a', 'default'), 'a')
        self.assertEqual(c.get('aa', 'default'), 'aa')
        self.assertEqual(c.get('b', 'default'), 1)
        self.assertEqual(c.get('bb', 'default'), 2)
        self.assertEqual(c.get('c', 'default'), 'cc')
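Since MyConfig is declared with metaclass=Singleton, repeated construction should hand back the first instance, assuming the metaclass follows the usual return-the-cached-instance pattern; a minimal sketch of the expected behavior:

# MyConfig is a Singleton: later constructions return the first instance,
# so the cfg dict only needs to be supplied once
c1 = MyConfig(cfg={'x': 1})
c2 = MyConfig()
assert c1 is c2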
MaxcomputeConnector features
Using ODPS together with Postgres
from bigdata_util.connector import MaxcomputeConnector, PostGreConnector
from .my_config import MyConfig

cfg = MyConfig()
odps = MaxcomputeConnector(cfg.get('connector.odps'))
postgre = PostGreConnector(cfg.get('connector.postgres'))

# fetch and cache data, bypassing any existing cache
data_list = odps.get_table_data('dual_point', partition='dt=1', ignore_cache=True)
# on repeated reads, the remote table's last_modified_time decides whether to re-fetch
data_list = odps.get_table_data('dual_point', partition='dt=1')
postgre.save_geometry_table('dual_point', data_list, col_name_wkt='wkt', geometry_type='point')
Using OTS (TableStore)
from bigdata_util.connector import TableStoreConnector
ots_cli = TableStoreConnector(cfg.get('connector.ots'))
consumed, return_row, next_token = ots_cli.get_row('dual', [('id', 1)])
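get_row returns the TableStore triple of consumed capacity, the row itself, and a pagination token. Assuming return_row follows the TableStore SDK's Row shape (an assumption, not confirmed here), it unpacks like this:

# assumption: return_row is a TableStore SDK Row (primary_key + attribute_columns)
if return_row is not None:
    print(return_row.primary_key)  # e.g. [('id', 1)]
    for name, value, timestamp in return_row.attribute_columns:
        print(name, value, timestamp)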
Calling MR logic already written in Java from Python
define
#!/usr/bin/env python3
# -*- coding=utf-8 -*-
import os
from bigdata_util.util import get_logger, get_absolute_path
from bigdata_util.connector import MaxcomputeConnector
from .my_config import MyConfig
from bigdata_util.mr_launcher import MapReduceLauncher

logger = get_logger(__file__)
cfg = MyConfig()
odps_cst = MaxcomputeConnector(cfg.get('connector.odps_cst'))

class CalculateDistanceOfPools(MapReduceLauncher):
    def init_mr_jar_base_path(self, mr_jar_base_path: str = None):
        return cfg.get('mr_jar_base_path')

    def init_project_base_path(self, mr_jar_base_path: str = None):
        return cfg.get('project_base_path')

    @staticmethod
    def init_odps_conf_file_name():
        return 'odps_cst_conf.ini'

    def init_mapper_class(self, mapper_class: str = None):
        return 'com.aliyun.citybrain.traffic.commonmapper.RawMapper'

    def init_reducer_class(self, reducer_class: str = None):
        return 'com.aliyun.citybrain.traffic.pooldistance.Reducer'

    def init_maxcompute_ins(self, maxcompute_ins: MaxcomputeConnector = None):
        return odps_cst

    def init_mr_jar_name(self, mr_jar_name: str = None):
        return 'traffic-algo-mg-1.0-SNAPSHOT-jar-with-dependencies.jar'

    def init_system_parameters(self, system_parameters: dict = None):
        """
        Only the following three parameters are tunable; their defaults are:
          -splitSize 32
          -reduceCnt 100
          -reduceMem 4096
        :param system_parameters: {'splitSize': '16', 'reduceCnt': '900', 'reduceMem': 4096}
        :return: may be empty
        """
        return {
            'reduceCnt': 900
        }

    def init_ddl_file_path(self, ddl_file_path: str = None):
        return os.path.abspath(os.path.join(
            os.path.split(os.path.realpath(__file__))[0],
            'step_3_pool_clustering_euclidean_distance_between_pool_mr.osql'
        ))

    def run(self):
        self.launch()

if __name__ == '__main__':
    CalculateDistanceOfPools().run()
use
from . import CalculateDistanceOfPools
# TableInfo and cfg come from the project's own modules (import paths omitted here)

mr = CalculateDistanceOfPools()
mr.set_init_parameters({
    'mapper.key': 'pool_id:string',
    'mapper.value': 'pool_id:string,sub_polygon:string',
    'geometry_col_name': 'sub_polygon',
    'geometry_gen_type': 'wkt',
    'input_table': TableInfo('algtmp_tfc_vhcpool_info', partition=cfg.COMMON_PARTITION_WITH_DT),
    'output_table': TableInfo('algtmp_rltn_euclidean_distance_pool_pool', partition=cfg.COMMON_PARTITION_WITH_DT),
    'overlap_length': 1,  # at least 50 m of shared boundary
    'overlap_ratio': 0.1,
    'threshold.MAX_DISTANCE': 0
})
mr.launch()
# the logview URL of the MR run is printed...
Plotting with PlotLine
from . import MyConfig
from bigdata_util.plot import PlotLine
from bigdata_util.connector import MaxcomputeConnector

cfg = MyConfig()
plot_line = PlotLine(MaxcomputeConnector(cfg.get('connector.odps')))
plot_line.plot_line('''
    select
        stat_time x,
        value y,
        name label
    from dual
''')
pip install pyodps==0.9.1
pip install impyla==0.16.3
pip install thrift_sasl==0.4.2 (for installation on macOS see: https://github.com/albin3/book-notes/issues/2)
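These pins cover the MaxCompute/Hive client stack; MaxcomputeConnector presumably builds on pyodps, so a raw pyodps session (all credentials below are placeholders) looks roughly like:

from odps import ODPS

# placeholders: supply your own credentials, project and endpoint
o = ODPS('<access_id>', '<access_key>', '<project>',
         endpoint='http://service.odps.aliyun.com/api')
with o.execute_sql('select 1').open_reader() as reader:
    for record in reader:
        print(record)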
Install dependencies (Hive/Impala with Kerberos; a connection sketch follows the list):
mac: brew install krb5
linux: apt install -y krb5-user
pip install bit_array
pip install thrift
pip install thrift_sasl
pip install impyla
pip install krbcontext
pip install hdfs[kerberos] -i https://mirrors.aliyun.com/pypi/simple
pip install pykerberos -i https://mirrors.aliyun.com/pypi/simple
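With the packages above installed, connecting to a Kerberos-secured HiveServer2 goes through impyla directly; the sketch below uses placeholder host and service names and is not an API of py-bigdate-util itself:

from impala.dbapi import connect

# placeholders: point these at your own HiveServer2
conn = connect(
    host='<hive_host>',
    port=10000,
    auth_mechanism='GSSAPI',          # Kerberos, via thrift_sasl + pykerberos
    kerberos_service_name='hive',
)
cur = conn.cursor()
cur.execute('show tables')
print(cur.fetchall())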
pip install kafka-python==2.0.1
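A minimal kafka-python consumer (topic and broker address are placeholders) is enough to verify the pinned version works:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '<topic>',                                 # placeholder topic name
    bootstrap_servers=['<broker_host>:9092'],  # placeholder broker
    auto_offset_reset='earliest',
    value_deserializer=lambda m: m.decode('utf-8'),
)
for msg in consumer:
    print(msg.topic, msg.partition, msg.offset, msg.value)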
PostgreAsyncConnector (async Postgres)
pip install asyncpg
usage:
from bigdata_util.connector.postgre_async import PostgreAsyncConnector

table_name = 'algtmp_connector_test_pg'
pg_conn = PostgreAsyncConnector(
    'postgresql://<user_name>:<passwd>@<host_name>/<data_base>',
    'socks5://<host_name>:<port>'
)
pg_conn.execute_sql(f'''
    create table if not exists {table_name} (
        a varchar,
        b varchar,
        c varchar,
        d varchar,
        PRIMARY KEY (a, b, c, d)
    )
''')
pg_conn.save_data(table_name, [
    {'a': 'a1', 'b': 'b1', 'c': 'c2', 'd': 'd1'}
])
data_list = pg_conn.run_sql_return_plain_json(f'''
    select * from {table_name}
''')
pg_conn.drop_table(table_name)
MysqlConnector (currently supports queries only; writing data is not supported yet)
pip install PyMySQL
usage:
from bigdata_util.connector.mysql import MysqlConnector

mysql_conn = MysqlConnector(
    host='localhost',
    port=3306,
    user='root',
    password='password',
    database='default',
    proxy='socks5://<host_name>:<port>',
)
data_list = mysql_conn.run_sql_return_plain_json('''
    show tables;
''')
You may need to manually install numpy, pandas and shapely first:
> pip install numpy pandas shapely
> pip install py-bigdate-util
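A quick sanity check that the geo stack imports cleanly (just a sketch; any shapely call would do):

import numpy, pandas, shapely
from shapely.geometry import Point

# a unit-circle buffer: the area should be close to pi
print(Point(0, 0).buffer(1).area)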