diff --git a/MANIFEST.in b/MANIFEST.in index fcb59edebf84ff2b2b569a7dc1228d3b82f58d25..3c91aa35bb1910db5b4ecd34c9d9d937ed7c020b 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,6 +1,6 @@ include AUTHORS include LICENSE -recursive-include postgresql *.c -recursive-include postgresql *.sql -recursive-include postgresql *.txt -recursive-include postgresql/documentation/sphinx *.rst conf.py +recursive-include py_opengauss *.c +recursive-include py_opengauss *.sql +recursive-include py_opengauss *.txt +recursive-include py_opengauss/documentation/sphinx *.rst conf.py diff --git a/README.md b/README.md index 0f6089c75c1b2c54f9dd20796919c623419cc561..effa8abe2d0378f1a36062b5c795692d5cb787c3 100644 --- a/README.md +++ b/README.md @@ -1,41 +1,42 @@ -### About +### 关于 -py-opengauss is a Python 3 package providing modules for working with openGauss. -Primarily, a high-level driver for querying databases. +该驱动是基于 [py-postgresql](https://github.com/python-postgres/fe) 1.3.0 版本进行修改的,新增了两个特性: +- 支持 [openGauss](https://opengauss.org/) 数据库连接 +- 支持多 IP 连接 -For a high performance async interface, MagicStack's asyncpg -http://github.com/MagicStack/asyncpg should be considered. -py-opengauss, currently, does not have direct support for high-level async -interfaces provided by recent versions of Python. Future versions may change this. +### 安装方式 -### Advisory +通过 pypi.org: -In v1.3, `py_opengauss.driver.dbapi20.connect` will now raise `ClientCannotConnectError` directly. -Exception traps around connect should still function, but the `__context__` attribute -on the error instance will be `None` in the usual failure case as it is no longer -incorrectly chained. Trapping `ClientCannotConnectError` ahead of `Error` should -allow both cases to co-exist in the event that data is being extracted from -the `ClientCannotConnectError`. + $ pip install py-opengauss + +通过源码安装: -In v2.0, support for older versions of PostgreSQL and Python will be removed. 
-If you have automated installations using PyPI, make sure that they specify a major version. + $ git clone https://github.com/vimiix/py-opengauss.git + $ cd py-opengauss + $ python3 setup.py install -### Installation +### 连接方式: -Using PyPI.org: +> 支持的连接协议列表: ['pq', 'postgres', 'postgresql', 'og', 'opengauss'] - $ pip install py-opengauss +```python +>>> import py_opengauss +# General Format: +>>> db = py_opengauss.open('pq://user:password@host:port/database') -From a clone: +# Also support opengauss scheme: +>>> db = py_opengauss.open('opengauss://user:password@host:port/database') - $ git clone https://github.com/vimiix/py-opengauss.git - $ cd py-opengauss - $ python3 ./setup.py install # Or use in-place without installation(PYTHONPATH). +# multi IP support, will return PRIMARY instance connect: +>>> db = py_opengauss.open('opengauss://user:password@host1:123,host2:456/database') -### Basic Usage +# Connect to 'postgres' at localhost. +>>> db = py_opengauss.open('localhost/postgres') +``` -> Support schemes: ['pq', 'postgres', 'postgresql', 'og', 'opengauss'] +### 基本用法 ```python import py_opengauss @@ -50,6 +51,25 @@ with db.xact(): print(x) ``` +### sqlalchemy 多IP连接用法 + +*注:sqlalchemy 目前本身是不支持 py_opengauss 包的* + +由于 sqlalchemy 在内部会解析连接串,且目前仅支持单个IP的连接串。 +所以需下载定制后的 [sqlalchemy](https://github.com/vimiix/sqlalchemy) 手动安装使用 + +https://github.com/vimiix/sqlalchemy + +该定制版本在内部增加了对于 py_opengauss 包的支持,且支持了多IP连接串。 + +##### 使用方式 + +```python +from sqlalchemy import create_engine +# 初始化opengauss数据库多主机连接(适用于没有固定虚拟IP的数据库主备集群): +engine = create_engine('postgresql+pyopengauss://user:password@host1:port1,host2:port2/db') +``` + ### Documentation http://py-postgresql.readthedocs.io diff --git a/py_opengauss/__init__.py b/py_opengauss/__init__.py index 621f9a62efae0afb33d17c8f41fb9687dc17ec1f..9bd5ccd53d3b97e3566f777285d2fcfa5de3ab45 100644 --- a/py_opengauss/__init__.py +++ b/py_opengauss/__init__.py @@ -33,6 +33,16 @@ try: except ImportError: pass +# Check that the given 
def is_primary(c):
    """
    Report whether the connection `c` is attached to a primary instance.

    Executes ``pg_stat_get_stream_replications()`` through ``c.prepare`` and
    inspects the first returned row only. The instance is treated as primary
    when its ``local_role`` is ``'Primary'`` (primary/standby deployment) or
    ``'Normal'`` (single instance) and its ``db_state`` is ``'Normal'``.

    Returns ``False`` when the query yields no rows.
    """
    rows = c.prepare(
        "SELECT local_role,db_state FROM pg_stat_get_stream_replications()"
    )()
    if not rows:
        return False
    first = rows[0]
    # Role is 'Primary' in a primary/standby deployment and 'Normal' for a
    # single instance; the state column is only consulted when the role fits.
    role_ok = first[0] in ('Primary', 'Normal')
    return bool(role_ok and first[1] == 'Normal')
# Check that the given connection is the primary instance
def is_primary(c):
    """
    Return ``True`` when connection `c` reports itself as a primary instance.

    The first row of ``pg_stat_get_stream_replications()`` must have a
    ``local_role`` of ``'Primary'`` or ``'Normal'`` and a ``db_state`` of
    ``'Normal'``; an empty result yields ``False``.
    """
    sql = "select local_role,db_state from pg_stat_get_stream_replications()"
    r = c.prepare(sql)()
    if r and r[0][0] in ('Primary', 'Normal') and r[0][1] == 'Normal':
        return True
    return False


driver = pg_driver.Driver(connection = Connection)


def _split_host_port(addr):
    """
    Split one ``host[:port]`` entry into ``(host, port)``; `port` is an int or ``None``.

    Accepts ``name``, ``name:port``, ``[v6addr]`` and ``[v6addr]:port``. The
    brackets of a bracketed literal are removed. An unbracketed value with
    more than one colon is taken to be a bare IPv6 address without a port.
    Raises ``ValueError`` (from ``int``) when the port text is not a number.
    """
    addr = addr.strip()
    if addr.startswith('['):
        end = addr.find(']')
        if end != -1:
            host, rest = addr[1:end], addr[end + 1:]
            if rest.startswith(':'):
                return host, int(rest[1:])
            return host, None
    if addr.count(':') == 1:
        host, _, str_port = addr.partition(':')
        return host, int(str_port)
    return addr, None


def _expand_hosts(kw):
    """
    Expand a comma separated ``host`` keyword into one parameter dict per address.

    Returns ``(param_dicts, is_multi_host)``. When ``host`` is absent, is not a
    string (for example ``None`` alongside ``unix``), or contains no non-blank
    address, `kw` is returned untouched as the only candidate.
    """
    host = kw.get('host')
    if not isinstance(host, str):
        return [kw], False
    addrs = [a.strip() for a in host.split(',')]
    addrs = [a for a in addrs if a]
    if not addrs:
        return [kw], False

    base = {k: v for k, v in kw.items() if k != 'host'}
    expanded = []
    for addr in addrs:
        param = deepcopy(base)
        name, port = _split_host_port(addr)
        param['host'] = name
        if port is not None:
            # A port embedded in the address overrides any `port` keyword.
            param['port'] = port
        expanded.append(param)
    return expanded, len(expanded) > 1


def connect(**kw):
    """
    Create a DB-API connection using the given keyword parameters.

    Due to the way defaults are populated, when connecting to a local filesystem
    socket using the `unix` keyword parameter, `host` and `port` must also be set
    to ``None``.

    Multi-host support: when the caller has several candidate addresses, e.g.
    ``user:password@host1:123,host2:456/database``, pass them through the
    ``host`` keyword as a comma separated list::

        {'host': 'host1:123,host2:456'}

    Each address is tried in order and the first connection whose instance
    reports itself as primary (see `is_primary`) is returned; connections to
    non-primary instances are closed.

    With a single address (or no ``host`` at all) the connection is returned
    directly without the primary check and connection errors propagate
    unchanged.

    Raises ``ConnectionError`` carrying a list of ``{host: reason}`` entries
    when several hosts were given and none produced a primary connection, and
    ``ValueError`` when a port in ``host`` is not numeric.
    """
    candidates, is_multi_host = _expand_hosts(kw)
    std_params = pg_param.collect(prompt_title = None)

    errs = []
    for candidate in candidates:
        params = pg_param.normalize(
            list(pg_param.denormalize_parameters(std_params)) +
            list(pg_param.denormalize_parameters(candidate))
        )
        pg_param.resolve_password(params)
        if not is_multi_host:
            return driver.connect(**params)

        try:
            c = driver.connect(**params)
        except Exception as e:
            errs.append({params.get('host'): e})
            continue
        try:
            primary = is_primary(c)
        except BaseException:
            # Do not leak the connection when the role query itself fails.
            c.close()
            raise
        if primary:
            return c
        c.close()
        errs.append({params.get('host'): "not primary instance"})
    raise ConnectionError(errs)
""" - scheme = d.get('scheme', 'pq').lower() - if scheme not in {'pq', 'postgres', 'postgresql', 'og', 'opengauss'}: - raise ValueError("PQ-IRI scheme is not 'pq', 'postgres', 'postgresql', 'og' or 'opengauss'") - if scheme in {'og', 'opengauss'}: - # recover opengauss scheme to pq - d['scheme'] = 'pq' - - cpd = { - k : fieldproc(v) for k, v in d.items() - if k not in ('path', 'fragment', 'query', 'host', 'scheme') - } - - path = d.get('path') - frag = d.get('fragment') - query = d.get('query') - host = d.get('host') - - if host is not None: - if host.startswith('[') and host.endswith(']'): - host = host[1:-1] - if host.startswith('unix:'): - cpd['unix'] = host[len('unix:'):].replace(':','/') + res = [] + for d in l: + scheme = d.get('scheme', 'pq').lower() + if scheme not in {'pq', 'postgres', 'postgresql', 'og', 'opengauss'}: + raise ValueError("PQ-IRI scheme is not 'pq', 'postgres', 'postgresql', 'og' or 'opengauss'") + if scheme in {'og', 'opengauss'}: + # recover opengauss scheme to pq + d['scheme'] = 'pq' + + cpd = { + k : fieldproc(v) for k, v in d.items() + if k not in ('path', 'fragment', 'query', 'host', 'scheme') + } + + path = d.get('path') + frag = d.get('fragment') + query = d.get('query') + host = d.get('host') + + if host is not None: + if host.startswith('[') and host.endswith(']'): + host = host[1:-1] + if host.startswith('unix:'): + cpd['unix'] = host[len('unix:'):].replace(':','/') + else: + cpd['host'] = host else: - cpd['host'] = host - else: - cpd['host'] = fieldproc(host) + cpd['host'] = fieldproc(host) - if path: - # Only state the database field's existence if the first path is non-empty. 
- if path[0]: - cpd['database'] = path[0] - path = path[1:] if path: - cpd['path'] = path - - settings = {} - if query: - if hasattr(query, 'items'): - qiter = query.items() - else: - qiter = query - for k, v in qiter: - if k.startswith('[') and k.endswith(']'): - k = k[1:-1] - if k != 'settings' and k not in cpd: - cpd[fieldproc(k)] = fieldproc(v) - elif k: - settings[fieldproc(k)] = fieldproc(v) - # else: ignore empty query keys - - if frag: - settings['search_path'] = [ - fieldproc(x) for x in frag.split(',') - ] - - if settings: - cpd['settings'] = settings - - return cpd + # Only state the database field's existence if the first path is non-empty. + if path[0]: + cpd['database'] = path[0] + path = path[1:] + if path: + cpd['path'] = path + + settings = {} + if query: + if hasattr(query, 'items'): + qiter = query.items() + else: + qiter = query + for k, v in qiter: + if k.startswith('[') and k.endswith(']'): + k = k[1:-1] + if k != 'settings' and k not in cpd: + cpd[fieldproc(k)] = fieldproc(v) + elif k: + settings[fieldproc(k)] = fieldproc(v) + # else: ignore empty query keys + + if frag: + settings['search_path'] = [ + fieldproc(x) for x in frag.split(',') + ] + + if settings: + cpd['settings'] = settings + res.append(cpd) + + return res def construct_path(x, re = escape_path_re): """ @@ -177,7 +180,7 @@ def construct(x, obscure_password = False): def parse(s, fieldproc = ri.unescape): """ - Parse a Postgres IRI into a dictionary object. + Parse a Postgres IRI into a dictionary object list. 
""" return structure( # In ri.parse, don't unescape the parsed values as our sub-structure diff --git a/py_opengauss/project.py b/py_opengauss/project.py index 40f00ca7c0b3d90d92bb9e812b9723f067f1096f..5da1c5fb4dc0dfa952d6e20ace7f5f8d866dcca2 100644 --- a/py_opengauss/project.py +++ b/py_opengauss/project.py @@ -8,5 +8,5 @@ identity = 'http://github.com/vimiix/py-opengauss' meaculpa = 'Python+openGauss' abstract = 'Driver and tools library for openGauss' -version_info = (1, 3, 1) # dev based on py-postgresql version 1.3.0 +version_info = (1, 3, 6) # dev based on py-postgresql version 1.3.0 version = '.'.join(map(str, version_info)) diff --git a/py_opengauss/release/distutils.py b/py_opengauss/release/distutils.py index 836b67f8f6e349d8274852a1420aac55746d62c1..f7073f702a521512ea185770be2ec310b4ecd4a3 100644 --- a/py_opengauss/release/distutils.py +++ b/py_opengauss/release/distutils.py @@ -21,21 +21,22 @@ except ImportError as e: LONG_DESCRIPTION = """ This package is based on py-postgresql upgrades to work with openGauss. -Forked Repo: http://github.com/vimiix/py-opengauss +Repo: http://github.com/vimiix/py-opengauss +Adapted from: http://github.com/python-postgres/fe v1.3.0 .. warning:: - In v1.3, `postgresql.driver.dbapi20.connect` will now raise `ClientCannotConnectError` directly. + In v1.3, `py_opengauss.driver.dbapi20.connect` will now raise `ClientCannotConnectError` directly. Exception traps around connect should still function, but the `__context__` attribute on the error instance will be `None` in the usual failure case as it is no longer incorrectly chained. Trapping `ClientCannotConnectError` ahead of `Error` should allow both cases to co-exist in the event that data is being extracted from the `ClientCannotConnectError`. -py-postgresql is a set of Python modules providing interfaces to various parts -of PostgreSQL. Primarily, it provides a pure-Python driver with some C optimizations for -querying a PostgreSQL database. 
+py-opengauss is a set of Python modules providing interfaces to various parts +of openGauss. Primarily, it provides a pure-Python driver with some C optimizations for +querying an openGauss database. -http://github.com/python-postgres/fe +http://github.com/vimiix/py-opengauss Features: @@ -43,11 +44,12 @@ Features: * Cluster tools for creating and controlling a cluster. * Support for most PostgreSQL types: composites, arrays, numeric, lots more. * COPY support. + * Multiple IP connection support Sample PG-API Code:: - >>> import postgresql - >>> db = postgresql.open('pq://user:password@host:port/database') + >>> import py_opengauss + >>> db = py_opengauss.open('pq://user:password@host:port,host:port/database') >>> db.execute("CREATE TABLE emp (emp_first_name text, emp_last_name text, emp_salary numeric)") >>> make_emp = db.prepare("INSERT INTO emp VALUES ($1, $2, $3)") >>> make_emp("John", "Doe", "75,322") @@ -58,13 +60,13 @@ Sample PG-API Code:: There is a DB-API 2.0 module as well:: - postgresql.driver.dbapi20 + py_opengauss.driver.dbapi20 However, PG-API is recommended as it provides greater utility. Once installed, try out the ``pg_python`` console script:: - $ python3 -m postgresql.bin.pg_python -h localhost -p port -U theuser -d database_name + $ python3 -m py_opengauss.bin.pg_python -h localhost -p port -U theuser -d database_name If a successful connection is made to the remote host, it will provide a Python console with the database connection bound to the `db` name. 
def split_ip(ip, fieldproc = unescape):
    """
    Split a single ``host[:port]`` address into an ``(addr, port)`` pair.

    `ip` is one entry of a net location, without user information. A value
    starting with ``[`` is treated as a bracketed IP literal: the address is
    everything up to and including the closing ``]`` (or the whole value when
    the bracket is unterminated) and is returned as-is, brackets included,
    without being passed through `fieldproc`. Otherwise the value is split
    at the first ``:``.

    Set `fieldproc` to `str` if the components' percent escapes should not be
    decoded.

    Returns ``(addr, port)``; `port` is ``None`` when no port is present, and
    both are ``None`` for an empty `ip` (for example a stray comma in a
    multi-host net location).
    """
    if not ip:
        return None, None

    if ip[0] == '[':
        # IPvN addr
        close = ip.find(']')
        if close == -1:
            # unterminated IPvN block
            close = len(ip) - 1
        addr = ip[:close + 1]
        # The port separator must be searched for in the full input: `addr`
        # ends at the bracket and can never contain the port.
        colon = ip.find(':', close + 1)
        port = None if colon == -1 else fieldproc(ip[colon + 1:])
    else:
        host, sep, rest = ip.partition(':')
        addr = fieldproc(host)
        port = fieldproc(rest) if sep else None
    return addr, port
@@ -237,32 +263,23 @@ def split_netloc(netloc, fieldproc = unescape): pos += 1 if pos >= len(netloc): - return (user, password, None, None) - - pos_chr = netloc[pos] - if pos_chr == '[': - # IPvN addr - next_pos = netloc.find(']', pos) - if next_pos == -1: - # unterminated IPvN block - next_pos = len(netloc) - 1 - addr = netloc[pos:next_pos+1] - pos = next_pos + 1 - next_pos = netloc.find(':', pos) - if next_pos == -1: - port = None - else: - port = fieldproc(netloc[next_pos+1:]) - else: - next_pos = netloc.find(':', pos) - if next_pos == -1: - addr = fieldproc(netloc[pos:]) - port = None + return [(user, password, None, None)] + + res = [] + addr_start_pos = pos + keep_going = True + while keep_going: + next_comma_pos = netloc.find(',', addr_start_pos) + if next_comma_pos == -1: + addr_end_pos = len(netloc) + keep_going = False else: - addr = fieldproc(netloc[pos:next_pos]) - port = fieldproc(netloc[next_pos+1:]) - - return (user, password, addr, port) + addr_end_pos = next_comma_pos + addr, port = split_ip(netloc[addr_start_pos:addr_end_pos], fieldproc) + res.append((user, password, addr, port)) + if keep_going: + addr_start_pos = next_comma_pos+1 + return res def unsplit_netloc(t): """ @@ -288,7 +305,7 @@ def unsplit_netloc(t): def structure(t, fieldproc = unescape): """ - Create a dictionary from a split RI(5-tuple). + Create a dictionary list from a split RI(5-tuple). Set `fieldproc` to `str` if the components' percent escapes should not be decoded. 
@@ -298,17 +315,6 @@ def structure(t, fieldproc = unescape): if t[0] is not None: d['scheme'] = t[0] - if t[1] is not None: - uphp = split_netloc(t[1], fieldproc = fieldproc) - if uphp[0] is not None: - d['user'] = uphp[0] - if uphp[1] is not None: - d['password'] = uphp[1] - if uphp[2] is not None: - d['host'] = uphp[2] - if uphp[3] is not None: - d['port'] = uphp[3] - if t[2] is not None: if t[2]: d['path'] = list(map(fieldproc, t[2].split('/'))) @@ -324,7 +330,24 @@ def structure(t, fieldproc = unescape): if t[4] is not None: d['fragment'] = fieldproc(t[4]) - return d + + if t[1] is None: # netloc + return [d] + else: + res = [] + uphps = split_netloc(t[1], fieldproc = fieldproc) + for uphp in uphps: + tmpd = copy.deepcopy(d) + if uphp[0] is not None: + tmpd['user'] = uphp[0] + if uphp[1] is not None: + tmpd['password'] = uphp[1] + if uphp[2] is not None: + tmpd['host'] = uphp[2] + if uphp[3] is not None: + tmpd['port'] = uphp[3] + res.append(tmpd) + return res def construct_query(x, key_re = escape_query_key_re, @@ -374,7 +397,7 @@ def construct(x): def parse(s, fieldproc = unescape): """ - Parse an RI into a dictionary object. Synonym for ``structure(split(x))``. + Parse an RI into a dictionary object list. Synonym for ``structure(split(x))``. Set `fieldproc` to `str` if the components' percent escapes should not be decoded.