From 5d8116248383b95a7cfd75bbefb782fd2e7d2b24 Mon Sep 17 00:00:00 2001 From: movead Date: Wed, 10 Nov 2021 08:13:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9B=9E=E5=90=88postgres=E7=9A=849d6077a?= =?UTF-8?q?=E6=8F=90=E4=BA=A4,=E4=BF=AE=E6=94=B9=E5=BF=AB=E6=8E=92?= =?UTF-8?q?=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../backend/utils/sort/gen_qsort_tuple.pl | 66 ++++++++++++++----- src/common/port/qsort.cpp | 60 ++++++++++++----- src/common/port/qsort_arg.cpp | 60 ++++++++++++----- 3 files changed, 138 insertions(+), 48 deletions(-) diff --git a/src/common/backend/utils/sort/gen_qsort_tuple.pl b/src/common/backend/utils/sort/gen_qsort_tuple.pl index 18dd751b38..75f00efa2f 100644 --- a/src/common/backend/utils/sort/gen_qsort_tuple.pl +++ b/src/common/backend/utils/sort/gen_qsort_tuple.pl @@ -17,6 +17,8 @@ # Remove __inline, _DIAGASSERTs, __P # Remove ill-considered "swap_cnt" switch to insertion sort, # in favor of a simple check for presorted input. +# Take care to recurse on the smaller partition, to bound stack usage. +# # Instead of sorting arbitrary objects, we're always sorting SortTuples # Add CHECK_FOR_INTERRUPTS() # @@ -94,6 +96,13 @@ sub emit_qsort_boilerplate * Software--Practice and Experience 23 (1993) 1249-1265. * We have modified their original by adding a check for already-sorted input, * which seems to be a win per discussions on pgsql-hackers around 2006-03-21. + * + * Also, we recurse on the smaller partition and iterate on the larger one, + * which ensures we cannot recurse more than log(N) levels (since the + * partition recursed to is surely no more than half of the input). Bentley + * and McIlroy explicitly rejected doing this on the grounds that it's "not + * worth the effort", but we have seen crashes in the field due to stack + * overrun, so that judgment seems wrong. */ static void @@ -114,7 +123,7 @@ swapfunc(SortTuple *a, SortTuple *b, size_t n) *(b) = t; \ } while (0); -#define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n)) +#define vecswap(a, b, n) if ((n) > 0) swapfunc(a, b, n) EOM } @@ -141,8 +150,9 @@ qsort_$SUFFIX(SortTuple *a, size_t n$EXTRAARGS) *pl, *pm, *pn; - int d, - r, + size_t d1, + d2; + int r, presorted; loop: @@ -173,7 +183,8 @@ loop: pn = a + (n - 1); if (n > 40) { - d = (n / 8); + size_t d = (n / 8); + pl = med3_$SUFFIX(pl, pl + d, pl + 2 * d$EXTRAPARAMS); pm = med3_$SUFFIX(pm - d, pm, pm + d$EXTRAPARAMS); pn = med3_$SUFFIX(pn - 2 * d, pn - d, pn$EXTRAPARAMS); @@ -187,23 +198,23 @@ loop: { while (pb <= pc && (r = cmp_$SUFFIX(pb, a$CMPPARAMS)) <= 0) { - CHECK_FOR_INTERRUPTS(); if (r == 0) { swap(pa, pb); pa++; } pb++; + CHECK_FOR_INTERRUPTS(); } while (pb <= pc && (r = cmp_$SUFFIX(pc, a$CMPPARAMS)) >= 0) { - CHECK_FOR_INTERRUPTS(); if (r == 0) { swap(pc, pd); pd--; } pc--; + CHECK_FOR_INTERRUPTS(); } if (pb > pc) break; @@ -212,20 +223,39 @@ loop: pc--; } pn = a + n; - r = Min(pa - a, pb - pa); - vecswap(a, pb - r, r); - r = Min(pd - pc, pn - pd - 1); - vecswap(pb, pn - r, r); - if ((r = pb - pa) > 1) - qsort_$SUFFIX(a, r$EXTRAPARAMS); - if ((r = pd - pc) > 1) + d1 = Min(pa - a, pb - pa); + vecswap(a, pb - d1, d1); + d1 = Min(pd - pc, pn - pd - 1); + vecswap(pb, pn - d1, d1); + d1 = pb - pa; + d2 = pd - pc; + if (d1 <= d2) { - /* Iterate rather than recurse to save stack space */ - a = pn - r; - n = r; - goto loop; + /* Recurse on left partition, then iterate on right partition */ + if (d1 > 1) + qsort_$SUFFIX(a, d1$EXTRAPARAMS); + if (d2 > 1) + { + /* Iterate rather than recurse to save stack space */ + /* qsort_$SUFFIX(pn - d2, d2$EXTRAPARAMS); */ + a = pn - d2; + n = d2; + goto loop; + } + } + else + { + /* Recurse on right partition, then iterate on left partition */ + if (d2 > 1) + qsort_$SUFFIX(pn - d2, d2$EXTRAPARAMS); + if (d1 > 1) + { + /* Iterate rather than recurse to save stack space */ + /* qsort_$SUFFIX(a, d1$EXTRAPARAMS); */ + n = d1; + goto loop; + } } -/* qsort_$SUFFIX(pn - r, r$EXTRAPARAMS);*/ } EOM diff --git a/src/common/port/qsort.cpp b/src/common/port/qsort.cpp index e05bbcae6e..4b2f32eeba 100644 --- a/src/common/port/qsort.cpp +++ b/src/common/port/qsort.cpp @@ -6,6 +6,7 @@ * Remove __inline, _DIAGASSERTs, __P * Remove ill-considered "swap_cnt" switch to insertion sort, * in favor of a simple check for presorted input. + * Take care to recurse on the smaller partition, to bound stack usage. * * CAUTION: if you change this file, see also qsort_arg.c, gen_qsort_tuple.pl * @@ -54,6 +55,13 @@ static void swapfunc(char*, char*, size_t, int); * Software--Practice and Experience 23 (1993) 1249-1265. * We have modified their original by adding a check for already-sorted input, * which seems to be a win per discussions on pgsql-hackers around 2006-03-21. + * + * Also, we recurse on the smaller partition and iterate on the larger one, + * which ensures we cannot recurse more than log(N) levels (since the + * partition recursed to is surely no more than half of the input). Bentley + * and McIlroy explicitly rejected doing this on the grounds that it's "not + * worth the effort", but we have seen crashes in the field due to stack + * overrun, so that judgment seems wrong. */ #define swapcode(TYPE, parmi, parmj, n) \ do { \ @@ -89,7 +97,7 @@ static void swapfunc(char* a, char* b, size_t n, int swaptype) #define vecswap(a, b, n) \ if ((n) > 0) \ - swapfunc((a), (b), (size_t)(n), swaptype) + swapfunc((a), (b), (n), swaptype) #define returnifzero(a) \ if (a == 0) { \ @@ -105,7 +113,8 @@ static char* med3(char* a, char* b, char* c, int (*cmp)(const void*, const void* void pg_qsort(void* a, size_t n, size_t es, int (*cmp)(const void*, const void*)) { char *pa = NULL, *pb = NULL, *pc = NULL, *pd = NULL, *pl = NULL, *pm = NULL, *pn = NULL; - int d, r, swaptype, presorted; + size_t d1 = 0, d2 = 0; + int r, swaptype, presorted; returnifzero(es); @@ -134,7 +143,7 @@ loop: pl = (char*)a; pn = (char*)a + (n - 1) * es; if (n > 40) { - d = (n / 8) * es; + size_t d = (n / 8) * es; pl = med3(pl, pl + d, pl + 2 * d, cmp); pm = med3(pm - d, pm, pm + d, cmp); pn = med3(pn - 2 * d, pn - d, pn, cmp); @@ -167,16 +176,37 @@ loop: pc -= es; } pn = (char*)a + n * es; - r = Min(pa - (char*)a, pb - pa); - vecswap((char*)a, pb - r, r); - r = Min((size_t)(pd - pc), (size_t)(pn - pd - es)); - vecswap(pb, pn - r, r); - if ((size_t)(r = pb - pa) > es) - qsort(a, r / es, es, cmp); - if ((size_t)(r = pd - pc) > es) { - /* Iterate rather than recurse to save stack space */ - a = pn - r; - n = r / es; - goto loop; - } + d1 = Min(pa - (char *) a, pb - pa); + vecswap((char*)a, pb - d1, d1); + d1 = Min(pd - pc, pn - pd - es); + vecswap(pb, pn - d1, d1); + d1 = pb - pa; + d2 = pd - pc; + if (d1 <= d2) + { + /* Recurse on left partition, then iterate on right partition */ + if (d1 > es) + pg_qsort(a, d1 / es, es, cmp); + if (d2 > es) + { + /* Iterate rather than recurse to save stack space */ + /* pg_qsort(pn - d2, d2 / es, es, cmp); */ + a = pn - d2; + n = d2 / es; + goto loop; + } + } + else + { + /* Recurse on right partition, then iterate on left partition */ + if (d2 > es) + pg_qsort(pn - d2, d2 / es, es, cmp); + if (d1 > es) + { + /* Iterate rather than recurse to save stack space */ + /* pg_qsort(a, d1 / es, es, cmp); */ + n = d1 / es; + goto loop; + } + } } diff --git a/src/common/port/qsort_arg.cpp b/src/common/port/qsort_arg.cpp index 4d3e2cb601..9802925767 100644 --- a/src/common/port/qsort_arg.cpp +++ b/src/common/port/qsort_arg.cpp @@ -6,6 +6,7 @@ * Remove __inline, _DIAGASSERTs, __P * Remove ill-considered "swap_cnt" switch to insertion sort, * in favor of a simple check for presorted input. + * Take care to recurse on the smaller partition, to bound stack usage. * * CAUTION: if you change this file, see also qsort.c, gen_qsort_tuple.pl * @@ -54,6 +55,13 @@ static void swapfunc(char*, char*, size_t, int); * Software--Practice and Experience 23 (1993) 1249-1265. * We have modified their original by adding a check for already-sorted input, * which seems to be a win per discussions on pgsql-hackers around 2006-03-21. + * + * Also, we recurse on the smaller partition and iterate on the larger one, + * which ensures we cannot recurse more than log(N) levels (since the + * partition recursed to is surely no more than half of the input). Bentley + * and McIlroy explicitly rejected doing this on the grounds that it's "not + * worth the effort", but we have seen crashes in the field due to stack + * overrun, so that judgment seems wrong. */ #define swapcode(TYPE, parmi, parmj, n) \ do { \ @@ -89,7 +97,7 @@ static void swapfunc(char* a, char* b, size_t n, int swaptype) #define vecswap(a, b, n) \ if ((n) > 0) \ - swapfunc((a), (b), (size_t)(n), swaptype) + swapfunc((a), (b), (n), swaptype) #define returnifzero(a) \ if (a == 0) { \ @@ -105,7 +113,8 @@ static char* med3(char* a, char* b, char* c, qsort_arg_comparator cmp, void* arg void qsort_arg(void* a, size_t n, size_t es, qsort_arg_comparator cmp, void* arg) { char *pa = NULL, *pb = NULL, *pc = NULL, *pd = NULL, *pl = NULL, *pm = NULL, *pn = NULL; - int d, r, swaptype, presorted; + size_t d1 = 0, d2 = 0; + int r, swaptype, presorted; returnifzero(es); @@ -132,7 +141,7 @@ loop: pl = (char*)a; pn = (char*)a + (n - 1) * es; if (n > 40) { - d = (n / 8) * es; + size_t d = (n / 8) * es; pl = med3(pl, pl + d, pl + 2 * d, cmp, arg); pm = med3(pm - d, pm, pm + d, cmp, arg); pn = med3(pn - 2 * d, pn - d, pn, cmp, arg); @@ -165,16 +174,37 @@ loop: pc -= es; } pn = (char*)a + n * es; - r = Min(pa - (char*)a, pb - pa); - vecswap((char*)a, pb - r, r); - r = Min((size_t)(pd - pc), (size_t)(pn - pd - es)); - vecswap(pb, pn - r, r); - if ((size_t)(r = pb - pa) > es) - qsort_arg(a, r / es, es, cmp, arg); - if ((size_t)(r = pd - pc) > es) { - /* Iterate rather than recurse to save stack space */ - a = pn - r; - n = r / es; - goto loop; - } + d1 = Min(pa - (char *) a, pb - pa); + vecswap((char*)a, pb - d1, d1); + d1 = Min(pd - pc, pn - pd - es); + vecswap(pb, pn - d1, d1); + d1 = pb - pa; + d2 = pd - pc; + if (d1 <= d2) + { + /* Recurse on left partition, then iterate on right partition */ + if (d1 > es) + qsort_arg(a, d1 / es, es, cmp, arg); + if (d2 > es) + { + /* Iterate rather than recurse to save stack space */ + /* qsort_arg(pn - d2, d2 / es, es, cmp, arg); */ + a = pn - d2; + n = d2 / es; + goto loop; + } + } + else + { + /* Recurse on right partition, then iterate on left partition */ + if (d2 > es) + qsort_arg(pn - d2, d2 / es, es, cmp, arg); + if (d1 > es) + { + /* Iterate rather than recurse to save stack space */ + /* qsort_arg(a, d1 / es, es, cmp, arg); */ + n = d1 / es; + goto loop; + } + } } -- Gitee