
xurr2020/boostkit-mysql

forked from src-oepkgs/boostkit-mysql 
This repository has not declared an open source license file (LICENSE). Before using it, check the project description and the licenses of its upstream code dependencies.
KunpengBoostKit22.0.RC4-CODE-THREADPOOL-FOR-MySQL-8.0.25.patch 91.24 KB
maxiaoqi2020 committed on 2023-03-01 17:48: create boostkit mysql rpm package
diff --git a/plugin/thread_pool/CMakeLists.txt b/plugin/thread_pool/CMakeLists.txt
new file mode 100644
index 00000000000..35cbdff5140
--- /dev/null
+++ b/plugin/thread_pool/CMakeLists.txt
@@ -0,0 +1,27 @@
+# Copyright (c) 2006, 2010, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2022 Huawei Technologies Co., Ltd.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+ADD_COMPILE_DEFINITIONS(
+  MYSQL_DYNAMIC_PLUGIN)
+
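+# MODULE_ONLY builds the plugin only as a dynamically loadable module (thread_pool.so).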
+MYSQL_ADD_PLUGIN(thread_pool
+ threadpool_common.cc
+ threadpool_unix.cc
+ MODULE_ONLY
+ MODULE_OUTPUT_NAME "thread_pool"
+ )
+
diff --git a/plugin/thread_pool/numa_affinity_manager.h b/plugin/thread_pool/numa_affinity_manager.h
new file mode 100644
index 00000000000..3471d328736
--- /dev/null
+++ b/plugin/thread_pool/numa_affinity_manager.h
@@ -0,0 +1,122 @@
+/* Copyright (C) 2012 Monty Program Ab
+ Copyright (C) 2022 Huawei Technologies Co., Ltd
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#ifndef NUMA_AFFINITY_MANAGER_H_
+#define NUMA_AFFINITY_MANAGER_H_
+
+#include <string>
+#include <vector>
+#include <iostream>
+#include <memory>
+#include <numa.h>
+#include <unistd.h>       /* syscall() */
+#include <sys/syscall.h>  /* SYS_gettid */
+
+using namespace std;
+
+class numa_affinity_manager
+{
+public:
+ numa_affinity_manager(){};
+ virtual ~numa_affinity_manager(){};
+
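+  /* Build one cpumask per NUMA node, assuming CPUs are numbered contiguously
+     and spread evenly across nodes (cpu_count % numa_count == 0). */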
+ bool init() {
+ initok = false;
+ cpu_count = get_sys_cpu();
+ numa_count = get_sys_numa();
+ if (cpu_count <= 0 || numa_count <= 0 ||
+ cpu_count % numa_count != 0) {
+ return false;
+ }
+
+ int cpu_per_numa = cpu_count / numa_count;
+ int start = 0;
+ numa_cpu_map.clear();
+ auto delete_cpumask = [](bitmask *ptr) {
+ if (ptr != nullptr) {
+ numa_free_cpumask(ptr);
+ }
+ };
+ for (int i = 0; i < numa_count; i++) {
+ auto msk = numa_allocate_cpumask();
+ if (msk == nullptr) {
+ return false;
+ }
+
+ for (int j = 0; j < cpu_per_numa; j++) {
+ numa_bitmask_setbit(msk, start + j);
+ }
+ numa_cpu_map.emplace_back(msk, delete_cpumask);
+ start += cpu_per_numa;
+ }
+ initok = true;
+ return true;
+ }
+
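+  /* Pin the calling thread to one node's CPUs; group_id wraps modulo the node count. */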
+ bool bind_numa(int group_id) {
+ if (initok) {
+ pid_t pid = gettid();
+ return (numa_sched_setaffinity(
+ pid, numa_cpu_map[group_id%numa_cpu_map.size()].get()) == 0);
+ }
+
+ return false;
+ }
+
+protected:
+ int get_sys_cpu() {
+ return numa_num_configured_cpus();
+ }
+
+ int get_sys_numa() {
+ return numa_num_configured_nodes();
+ }
+
+ pid_t gettid() {
+ return static_cast<pid_t>(syscall(SYS_gettid));
+ }
+
+public:
+ void print_cpumask(const string &name, bitmask *msk) {
+ cout << name << ": ";
+ for (unsigned int i = 0; i < msk->size; i++) {
+ if (numa_bitmask_isbitset(msk, i)) {
+ cout << i << " ";
+ }
+ }
+ cout << endl;
+ }
+ void dump() {
+ cout << "initok: " << initok << endl;
+ cout << "cpu_count: " << cpu_count << endl;
+ cout << "numa_count: " << numa_count << endl;
+
+ for (unsigned int i = 0; i < numa_cpu_map.size(); i++) {
+ string name = "numa_cpu_map[" + to_string(i) + "]";
+ print_cpumask(name, numa_cpu_map[i].get());
+ }
+ }
+
+private:
+ bool initok{false};
+ int cpu_count{0};
+ int numa_count{0};
+ vector<shared_ptr<bitmask>> numa_cpu_map;
+};
+
+#endif // NUMA_AFFINITY_MANAGER_H_
diff --git a/plugin/thread_pool/threadpool.h b/plugin/thread_pool/threadpool.h
new file mode 100644
index 00000000000..f4dd68dc8a9
--- /dev/null
+++ b/plugin/thread_pool/threadpool.h
@@ -0,0 +1,91 @@
+/* Copyright (C) 2012 Monty Program Ab
+ Copyright (C) 2022 Huawei Technologies Co., Ltd
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#ifndef THREADPOOL_H_
+#define THREADPOOL_H_
+
+#include "sql/sql_class.h"
+#include "sql/mysqld_thd_manager.h"
+#include "sql/conn_handler/connection_handler_manager.h"
+#include "sql/conn_handler/channel_info.h"
+
+struct SHOW_VAR;
+
+#define MAX_THREAD_GROUPS 1024
+#define MAX_CONNECTIONS 100000
+
+
+enum tp_high_prio_mode_t {
+ TP_HIGH_PRIO_MODE_TRANSACTIONS,
+ TP_HIGH_PRIO_MODE_STATEMENTS,
+ TP_HIGH_PRIO_MODE_NONE
+};
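+/* See the thread_pool_high_prio_mode system variable in threadpool_common.cc
+   for the semantics of each mode. */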
+
+/* Threadpool parameters */
+extern uint threadpool_idle_timeout; /* Shut down idle worker threads after this timeout */
+extern bool threadpool_dedicated_listener; /* Whether the listener thread is dedicated */
+extern uint threadpool_size; /* Number of thread groups (roughly the max number of concurrently executing threads) */
+extern bool threadpool_sched_affinity; /* Whether thread groups are bound to NUMA nodes */
+extern uint threadpool_max_threads;
+extern uint threadpool_stall_limit; /* Time interval in milliseconds for stall checks */
+extern uint threadpool_oversubscribe; /* Additional active threads allowed in a group */
+extern uint threadpool_toobusy; /* Maximum active and waiting threads in a group */
+
+/* Possible values for thread_pool_high_prio_mode */
+extern const char *threadpool_high_prio_mode_names[];
+
+/* Common thread pool routines, suitable for different implementations */
+extern void threadpool_remove_connection(THD *thd);
+extern int threadpool_process_request(THD *thd);
+extern int threadpool_add_connection(THD *thd);
+
+/*
+ Functions used by scheduler.
+ OS-specific implementations are in
+ threadpool_unix.cc or threadpool_win.cc
+*/
+extern bool tp_init();
+extern void tp_wait_begin(THD *, int);
+extern void tp_wait_end(THD *);
+extern void tp_post_kill_notification(THD *thd) noexcept;
+extern bool tp_add_connection(Channel_info *);
+extern void tp_end(void);
+extern void tp_fake_end(void);
+extern void threadpool_remove_connection(THD *thd);
+extern bool thread_attach(THD *thd);
+
+extern THD_event_functions tp_event_functions;
+
+/*
+ Threadpool statistics
+*/
+struct TP_STATISTICS {
+  /* Current number of worker threads. */
+ std::atomic<int32> num_worker_threads;
+};
+
+extern TP_STATISTICS tp_stats;
+
+/* Functions to set threadpool parameters */
+extern void tp_set_threadpool_size(uint val) noexcept;
+extern void tp_set_threadpool_stall_limit(uint val) noexcept;
+
+extern uint tp_get_thdvar_high_prio_tickets(THD *thd);
+extern uint tp_get_thdvar_high_prio_mode(THD *thd);
+
+#endif // THREADPOOL_H_
+
diff --git a/plugin/thread_pool/threadpool_common.cc b/plugin/thread_pool/threadpool_common.cc
new file mode 100644
index 00000000000..00595fc4b3f
--- /dev/null
+++ b/plugin/thread_pool/threadpool_common.cc
@@ -0,0 +1,775 @@
+/* Copyright (C) 2012 Monty Program Ab
+ Copyright (C) 2022 Huawei Technologies Co., Ltd
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "threadpool.h"
+#include "threadpool_unix.h"
+#include "my_thread_local.h"
+#include "my_sys.h"
+#include "mysql/plugin.h"
+#include "mysql/psi/mysql_idle.h"
+#include "mysql/thread_pool_priv.h"
+#include "sql/debug_sync.h"
+#include "sql/mysqld.h"
+#include "sql/sql_class.h"
+#include "sql/sql_connect.h"
+#include "sql/protocol_classic.h"
+#include "sql/sql_parse.h"
+#include "sql/sql_table.h"
+#include "sql/field.h"
+#include "sql/sql_show.h"
+#include "sql/sql_class.h"
+#include <dlfcn.h>
+#include <memory>
+
+#define MYSQL_SERVER 1
+
+/* Threadpool parameters */
+uint threadpool_idle_timeout;
+bool threadpool_dedicated_listener;
+uint threadpool_size;
+bool threadpool_sched_affinity;
+uint threadpool_stall_limit;
+uint threadpool_max_threads;
+uint threadpool_oversubscribe;
+uint threadpool_toobusy;
+
+/* Stats */
+TP_STATISTICS tp_stats;
+
+/*
+  Worker thread contexts and THD contexts.
+  ========================================
+
+  Both worker threads and connections have their own sets of thread-local
+  variables. At the moment these are mysys_var (holding specific data for
+  dbug, my_error and similar goodies) and the per-client PSI structure.
+
+  Whenever a query is executed, the following needs to be done:
+
+  1. Save the worker thread context.
+  2. Switch the TLS variables to connection-specific ones using
+     thread_attach(THD*). This function does some additional work.
+  3. Process the query.
+  4. Restore the worker thread context.
+
+  Connection login and termination follow a similar scheme w.r.t. saving
+  and restoring contexts.
+
+  For both the worker thread and the connection, mysys variables are
+  created with my_thread_init() and freed with my_thread_end().
+
+*/
+class Worker_thread_context {
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ PSI_thread *const psi_thread;
+#endif
+#ifndef NDEBUG
+ const my_thread_id thread_id;
+#endif
+ public:
+ Worker_thread_context() noexcept
+ :
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ psi_thread(PSI_THREAD_CALL(get_thread)())
+#endif
+#ifndef NDEBUG
+ ,
+ thread_id(my_thread_var_id())
+#endif
+ {
+ }
+
+ ~Worker_thread_context() noexcept {
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ PSI_THREAD_CALL(set_thread)(psi_thread);
+#endif
+#ifndef NDEBUG
+ set_my_thread_var_id(thread_id);
+#endif
+ THR_MALLOC = nullptr;
+ }
+};
+
+/*
+  Attach/associate the connection with the current OS thread.
+*/
+bool thread_attach(THD *thd) {
+#ifndef NDEBUG
+ set_my_thread_var_id(thd->thread_id());
+#endif
+ thd->thread_stack = (char *)&thd;
+ thd->store_globals();
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ PSI_THREAD_CALL(set_thread)(thd->get_psi());
+#endif
+ mysql_socket_set_thread_owner(
+ thd->get_protocol_classic()->get_vio()->mysql_socket);
+  return false;
+}
+
+#ifdef HAVE_PSI_STATEMENT_INTERFACE
+extern PSI_statement_info stmt_info_new_packet;
+#endif
+
+static void threadpool_net_before_header_psi_noop(NET * /* net */,
+ void * /* user_data */,
+ size_t /* count */) {}
+
+static void threadpool_init_net_server_extension(THD *thd) {
+#ifdef HAVE_PSI_INTERFACE
+ // socket_connection.cc:init_net_server_extension should have been called
+ // already for us. We only need to overwrite the "before" callback
+ assert(thd->m_net_server_extension.m_user_data == thd);
+ thd->m_net_server_extension.m_before_header =
+ threadpool_net_before_header_psi_noop;
+#else
+ assert(thd->get_protocol_classic()->get_net()->extension == NULL);
+#endif
+}
+
+int threadpool_add_connection(THD *thd) {
+ int retval = 1;
+ Worker_thread_context worker_context;
+
+ my_thread_init();
+
+ /* Create new PSI thread for use with the THD. */
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ thd->set_psi(PSI_THREAD_CALL(new_thread)(key_thread_one_connection, thd,
+ thd->thread_id()));
+#endif
+
+ /* Login. */
+ thread_attach(thd);
+ thd->start_utime = my_micro_time();
+ thd->store_globals();
+
+ if (thd_prepare_connection(thd)) {
+ goto end;
+ }
+
+ /*
+ Check if THD is ok, as prepare_new_connection_state()
+ can fail, for example if init command failed.
+ */
+ if (thd_connection_alive(thd)) {
+ retval = 0;
+ thd_set_net_read_write(thd, 1);
+ MYSQL_SOCKET_SET_STATE(thd->get_protocol_classic()->get_vio()->mysql_socket,
+ PSI_SOCKET_STATE_IDLE);
+ thd->m_server_idle = true;
+ threadpool_init_net_server_extension(thd);
+ }
+
+end:
+ if (retval) {
+ Connection_handler_manager *handler_manager =
+ Connection_handler_manager::get_instance();
+ handler_manager->inc_aborted_connects();
+ }
+ return retval;
+}
+
+
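+/* The first member is max_threads; 0 is passed here since the pool enforces its
+   own limit via threadpool_max_threads. */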
+static Connection_handler_functions tp_chf = {
+ 0,
+ tp_add_connection,
+ tp_end
+};
+
+THD_event_functions tp_event_functions = {
+ tp_wait_begin,
+ tp_wait_end,
+ tp_post_kill_notification
+};
+
+
+void threadpool_remove_connection(THD *thd) {
+ Worker_thread_context worker_context;
+
+ thread_attach(thd);
+ thd_set_net_read_write(thd, 0);
+
+ end_connection(thd);
+ close_connection(thd, 0);
+
+ thd->release_resources();
+
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ PSI_THREAD_CALL(delete_thread)(thd->get_psi());
+#endif
+
+ Global_THD_manager::get_instance()->remove_thd(thd);
+ Connection_handler_manager::dec_connection_count();
+ delete thd;
+}
+
+/**
+ Process a single client request or a single batch.
+*/
+int threadpool_process_request(THD *thd) {
+ int retval = 0;
+ Worker_thread_context worker_context;
+
+ thread_attach(thd);
+
+ if (thd->killed == THD::KILL_CONNECTION) {
+ /*
+ killed flag was set by timeout handler
+ or KILL command. Return error.
+ */
+ retval = 1;
+ goto end;
+ }
+
+ /*
+    In the loop below, the flow is essentially a copy of the thread-per-connection
+    logic, see do_handle_one_connection() in sql_connect.c
+
+ The goal is to execute a single query, thus the loop is normally executed
+ only once. However for SSL connections, it can be executed multiple times
+ (SSL can preread and cache incoming data, and vio->has_data() checks if it
+ was the case).
+ */
+ for (;;) {
+ Vio *vio;
+ thd_set_net_read_write(thd, 0);
+
+ if ((retval = do_command(thd)) != 0) goto end;
+
+ if (!thd_connection_alive(thd)) {
+ retval = 1;
+ goto end;
+ }
+
+ vio = thd->get_protocol_classic()->get_vio();
+ if (!vio->has_data(vio)) {
+ /* More info on this debug sync is in sql_parse.cc*/
+ DEBUG_SYNC(thd, "before_do_command_net_read");
+ thd_set_net_read_write(thd, 1);
+ goto end;
+ }
+ if (!thd->m_server_idle) {
+ MYSQL_SOCKET_SET_STATE(vio->mysql_socket, PSI_SOCKET_STATE_IDLE);
+ MYSQL_START_IDLE_WAIT(thd->m_idle_psi, &thd->m_idle_state);
+ thd->m_server_idle = true;
+ }
+ }
+
+end:
+ if (!retval && !thd->m_server_idle) {
+ MYSQL_SOCKET_SET_STATE(thd->get_protocol_classic()->get_vio()->mysql_socket,
+ PSI_SOCKET_STATE_IDLE);
+ MYSQL_START_IDLE_WAIT(thd->m_idle_psi, &thd->m_idle_state);
+ thd->m_server_idle = true;
+ }
+
+ return retval;
+}
+
+static void fix_threadpool_size(THD*,
+ struct SYS_VAR *, void*, const void* value)
+{
+ threadpool_size = *static_cast<const uint*>(value);
+ tp_set_threadpool_size(threadpool_size);
+}
+
+static void fix_threadpool_stall_limit(THD*, struct SYS_VAR *, void*, const void* value)
+{
+ threadpool_stall_limit = *static_cast<const uint*>(value);
+ tp_set_threadpool_stall_limit(threadpool_stall_limit);
+}
+
+static inline int my_getncpus() noexcept {
+#ifdef _SC_NPROCESSORS_ONLN
+ return sysconf(_SC_NPROCESSORS_ONLN);
+#else
+ return 2; /* The value returned by the old my_getncpus implementation */
+#endif
+}
+
+static MYSQL_SYSVAR_UINT(idle_timeout, threadpool_idle_timeout,
+ PLUGIN_VAR_RQCMDARG,
+ "Timeout in seconds for an idle thread in the thread pool."
+ "Worker thread will be shut down after timeout",
+ NULL, NULL, 60, 1, UINT_MAX, 1);
+
+static MYSQL_SYSVAR_UINT(oversubscribe, threadpool_oversubscribe,
+ PLUGIN_VAR_RQCMDARG,
+ "How many additional active worker threads in a group are allowed.",
+ NULL, NULL, 3, 1, 1000, 1);
+
+static MYSQL_SYSVAR_UINT(toobusy, threadpool_toobusy,
+ PLUGIN_VAR_RQCMDARG,
+ "How many additional active and waiting worker threads in a group are allowed.",
+ NULL, NULL, 13, 1, 1000, 1);
+
+static MYSQL_SYSVAR_BOOL(dedicated_listener, threadpool_dedicated_listener,
+ PLUGIN_VAR_RQCMDARG,
+ "Control whether listener be dedicated", nullptr,
+ nullptr, false);
+
+static MYSQL_SYSVAR_UINT(size, threadpool_size,
+ PLUGIN_VAR_RQCMDARG,
+ "Number of thread groups in the pool. "
+ "This parameter is roughly equivalent to maximum number of concurrently "
+ "executing threads (threads in a waiting state do not count as executing).",
+ NULL, fix_threadpool_size, (uint)my_getncpus(), 1, MAX_THREAD_GROUPS, 1);
+
+static MYSQL_SYSVAR_BOOL(sched_affinity, threadpool_sched_affinity,
+ PLUGIN_VAR_RQCMDARG,
+ "Control whether thread group scheduling affinity.", nullptr,
+ nullptr, false);
+
+static MYSQL_SYSVAR_UINT(stall_limit, threadpool_stall_limit,
+ PLUGIN_VAR_RQCMDARG,
+ "Maximum query execution time in milliseconds,"
+ "before an executing non-yielding thread is considered stalled."
+ "If a worker thread is stalled, additional worker thread "
+ "may be created to handle remaining clients.",
+ NULL, fix_threadpool_stall_limit, 500, 10, UINT_MAX, 1);
+
+static MYSQL_SYSVAR_UINT(max_threads, threadpool_max_threads,
+ PLUGIN_VAR_RQCMDARG,
+ "Maximum allowed number of worker threads in the thread pool",
+ NULL, NULL, MAX_CONNECTIONS, 1, MAX_CONNECTIONS, 1);
+
+static int threadpool_plugin_init(void *)
+{
+ DBUG_ENTER("threadpool_plugin_init");
+
+ tp_init();
+ my_connection_handler_set(&tp_chf, &tp_event_functions);
+ DBUG_RETURN(0);
+}
+
+static int threadpool_plugin_deinit(void *)
+{
+ DBUG_ENTER("threadpool_plugin_deinit");
+ my_connection_handler_reset();
+ DBUG_RETURN(0);
+}
+
+static MYSQL_THDVAR_UINT(high_prio_tickets,
+ PLUGIN_VAR_RQCMDARG,
+ "Number of tickets to enter the high priority event queue for each "
+ "transaction.",
+ NULL, NULL, UINT_MAX, 0, UINT_MAX, 1);
+
+const char *threadpool_high_prio_mode_names[] = {"transactions", "statements",
+ "none", NullS};
+TYPELIB threadpool_high_prio_mode_typelib = {
+ array_elements(threadpool_high_prio_mode_names) - 1, "",
+ threadpool_high_prio_mode_names, NULL
+};
+
+static MYSQL_THDVAR_ENUM(high_prio_mode,
+ PLUGIN_VAR_RQCMDARG,
+ "High priority queue mode: one of 'transactions', 'statements' or 'none'. "
+ "In the 'transactions' mode the thread pool uses both high- and low-priority "
+ "queues depending on whether an event is generated by an already started "
+ "transaction and whether it has any high priority tickets (see "
+ "thread_pool_high_prio_tickets). In the 'statements' mode all events (i.e. "
+ "individual statements) always go to the high priority queue, regardless of "
+ "the current transaction state and high priority tickets. "
+ "'none' is the opposite of 'statements', i.e. disables the high priority queue "
+ "completely.",
+ NULL, NULL, TP_HIGH_PRIO_MODE_TRANSACTIONS, &threadpool_high_prio_mode_typelib);
+
+static uint &idle_timeout = threadpool_idle_timeout;
+static bool &dedicated_listener = threadpool_dedicated_listener;
+static uint &size = threadpool_size;
+static bool &sched_affinity = threadpool_sched_affinity;
+static uint &stall_limit = threadpool_stall_limit;
+static uint &max_threads = threadpool_max_threads;
+static uint &oversubscribe = threadpool_oversubscribe;
+static uint &toobusy = threadpool_toobusy;
+
+SYS_VAR *system_variables[] = {
+ MYSQL_SYSVAR(idle_timeout),
+ MYSQL_SYSVAR(dedicated_listener),
+ MYSQL_SYSVAR(size),
+ MYSQL_SYSVAR(sched_affinity),
+ MYSQL_SYSVAR(max_threads),
+ MYSQL_SYSVAR(stall_limit),
+ MYSQL_SYSVAR(oversubscribe),
+ MYSQL_SYSVAR(toobusy),
+ MYSQL_SYSVAR(high_prio_tickets),
+ MYSQL_SYSVAR(high_prio_mode),
+ NULL
+};
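+/* The server exposes these variables with the plugin-name prefix, e.g. thread_pool_size. */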
+
+namespace Show {
+
+static ST_FIELD_INFO groups_fields_info[] =
+{
+ {"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"CONNECTIONS", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"THREADS", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"ACTIVE_THREADS", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"STANDBY_THREADS", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"QUEUE_LENGTH", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"HAS_LISTENER", 1, MYSQL_TYPE_TINY, 0, 0, 0, 0},
+ {"IS_STALLED", 1, MYSQL_TYPE_TINY, 0, 0, 0, 0},
+ {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}
+};
+
+} // namespace Show
+
+
+static int groups_fill_table(THD* thd, TABLE_LIST* tables, Item*)
+{
+ if (!all_groups)
+ return 0;
+
+ TABLE* table = tables->table;
+ for (uint i = 0; i < MAX_THREAD_GROUPS && all_groups[i].pollfd != -1; i++)
+ {
+ thread_group_t* group = &all_groups[i];
+
+ mysql_mutex_lock(&group->mutex);
+
+    /* GROUP_ID */
+    table->field[0]->store(i, true);
+    /* CONNECTIONS */
+    table->field[1]->store(group->connection_count, true);
+    /* THREADS */
+    table->field[2]->store(group->thread_count, true);
+    /* ACTIVE_THREADS */
+    table->field[3]->store(group->active_thread_count, true);
+    /* STANDBY_THREADS */
+    table->field[4]->store(group->waiting_thread_count, true);
+    /* QUEUE_LENGTH */
+ uint queue_len = group->high_prio_queue.elements()
+ + group->queue.elements();
+ table->field[5]->store(queue_len, true);
+ /* HAS_LISTENER */
+ table->field[6]->store((longlong)(group->listener != 0), true);
+ /* IS_STALLED */
+ table->field[7]->store(group->stalled, true);
+
+ mysql_mutex_unlock(&group->mutex);
+
+ if (schema_table_store_record(thd, table))
+ return 1;
+ }
+ return 0;
+}
+
+
+static int groups_init(void* p)
+{
+ ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
+ schema->fields_info = Show::groups_fields_info;
+ schema->fill_table = groups_fill_table;
+ return 0;
+}
+
+
+namespace Show {
+
+static ST_FIELD_INFO queues_field_info[] =
+{
+ {"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"POSITION", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"PRIORITY", 1, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"CONNECTION_ID", 19, MYSQL_TYPE_LONGLONG, 0, MY_I_S_UNSIGNED, 0, 0},
+ {"QUEUEING_TIME_MICROSECONDS", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}
+};
+
+} // namespace Show
+
+typedef connection_queue_t::Iterator connection_queue_iterator;
+
+static int queues_fill_table(THD* thd, TABLE_LIST* tables, Item*)
+{
+ if (!all_groups)
+ return 0;
+
+ TABLE* table = tables->table;
+ for (uint group_id = 0;
+ group_id < MAX_THREAD_GROUPS && all_groups[group_id].pollfd != -1;
+ group_id++)
+ {
+ thread_group_t* group = &all_groups[group_id];
+
+ mysql_mutex_lock(&group->mutex);
+ bool err = false;
+ int pos = 0;
+ ulonglong now = my_microsecond_getsystime();
+ connection_queue_t queues[NQUEUES] = {group->high_prio_queue, group->queue};
+ for (uint prio = 0; prio < NQUEUES && !err; prio++)
+ {
+ connection_queue_iterator it(queues[prio]);
+ connection_t* c;
+ while ((c = it++) != nullptr)
+ {
+ /* GROUP_ID */
+ table->field[0]->store(group_id, true);
+ /* POSITION */
+ table->field[1]->store(pos++, true);
+ /* PRIORITY */
+ table->field[2]->store(prio, true);
+ /* CONNECTION_ID */
+ if (c->thd != nullptr) {
+ table->field[3]->store(c->thd->thread_id(), true);
+ } else {
+ table->field[3]->store(0, true);
+ }
+ /* QUEUEING_TIME */
+ table->field[4]->store(now - c->enqueue_time, true);
+
+ err = schema_table_store_record(thd, table);
+ if (err)
+ break;
+ }
+ }
+ mysql_mutex_unlock(&group->mutex);
+ if (err)
+ return 1;
+ }
+ return 0;
+}
+
+static int queues_init(void* p)
+{
+ ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
+ schema->fields_info = Show::queues_field_info;
+ schema->fill_table = queues_fill_table;
+ return 0;
+}
+
+namespace Show {
+
+static ST_FIELD_INFO stats_fields_info[] =
+{
+ {"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"THREAD_CREATIONS", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {"THREAD_CREATIONS_DUE_TO_STALL", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {"WAKES", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {"WAKES_DUE_TO_STALL", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {"THROTTLES", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {"STALLS", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {"POLLS_BY_LISTENER", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {"POLLS_BY_WORKER", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {"DEQUEUES_BY_LISTENER", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {"DEQUEUES_BY_WORKER", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}
+};
+
+} // namespace Show
+
+
+static int stats_fill_table(THD* thd, TABLE_LIST* tables, Item*)
+{
+ if (!all_groups)
+ return 0;
+
+ TABLE* table = tables->table;
+ for (uint i = 0; i < MAX_THREAD_GROUPS && all_groups[i].pollfd != -1; i++)
+ {
+ table->field[0]->store(i, true);
+ thread_group_t* group = &all_groups[i];
+
+ mysql_mutex_lock(&group->mutex);
+ thread_group_counters_t* counters = &group->counters;
+ table->field[1]->store(counters->thread_creations, true);
+ table->field[2]->store(counters->thread_creations_due_to_stall, true);
+ table->field[3]->store(counters->wakes, true);
+ table->field[4]->store(counters->wakes_due_to_stall, true);
+ table->field[5]->store(counters->throttles, true);
+ table->field[6]->store(counters->stalls, true);
+ table->field[7]->store(counters->polls[LISTENER], true);
+ table->field[8]->store(counters->polls[WORKER], true);
+ table->field[9]->store(counters->dequeues[LISTENER], true);
+ table->field[10]->store(counters->dequeues[WORKER], true);
+ mysql_mutex_unlock(&group->mutex);
+ if (schema_table_store_record(thd, table))
+ return 1;
+ }
+ return 0;
+}
+
+static int stats_init(void* p)
+{
+ ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
+ schema->fields_info = Show::stats_fields_info;
+ schema->fill_table = stats_fill_table;
+ return 0;
+}
+
+
+namespace Show {
+
+static ST_FIELD_INFO waits_fields_info[] =
+{
+ {"REASON", 16, MYSQL_TYPE_STRING, 0, 0, 0, 0},
+ {"COUNT", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}
+};
+
+} // namespace Show
+
+/* See thd_wait_type enum for explanation*/
+static const LEX_CSTRING wait_reasons[THD_WAIT_LAST] =
+{
+ {STRING_WITH_LEN("UNKNOWN")},
+ {STRING_WITH_LEN("SLEEP")},
+ {STRING_WITH_LEN("DISKIO")},
+ {STRING_WITH_LEN("ROW_LOCK")},
+ {STRING_WITH_LEN("GLOBAL_LOCK")},
+ {STRING_WITH_LEN("META_DATA_LOCK")},
+ {STRING_WITH_LEN("TABLE_LOCK")},
+ {STRING_WITH_LEN("USER_LOCK")},
+ {STRING_WITH_LEN("BINLOG")},
+ {STRING_WITH_LEN("GROUP_COMMIT")},
+ {STRING_WITH_LEN("SYNC")}
+};
+
+extern std::atomic<uint64_t> tp_waits[THD_WAIT_LAST];
+
+static int waits_fill_table(THD* thd, TABLE_LIST* tables, Item*)
+{
+ if (!all_groups)
+ return 0;
+
+ TABLE* table = tables->table;
+ for (unsigned int i = 0; i < THD_WAIT_LAST; i++)
+ {
+ table->field[0]->store(wait_reasons[i].str, wait_reasons[i].length, system_charset_info);
+ table->field[1]->store(tp_waits[i], true);
+ if (schema_table_store_record(thd, table))
+ return 1;
+ }
+ return 0;
+}
+
+static int waits_init(void* p)
+{
+ ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
+ schema->fields_info = Show::waits_fields_info;
+ schema->fill_table = waits_fill_table;
+ return 0;
+}
+
+struct st_mysql_daemon thread_pool_plugin =
+{ MYSQL_DAEMON_INTERFACE_VERSION };
+
+static struct st_mysql_information_schema plugin_descriptor =
+{ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };
+
+mysql_declare_plugin(thread_pool)
+{
+ MYSQL_DAEMON_PLUGIN,
+ &thread_pool_plugin,
+ "thread_pool",
+ "TEST_TEST",
+ "thread pool plugin extracted from percona server",
+ PLUGIN_LICENSE_GPL,
+ threadpool_plugin_init, /* Plugin Init */
+ nullptr, /* Plugin Check uninstall */
+ threadpool_plugin_deinit, /* Plugin Deinit */
+ 0x0100 /* 1.0 */,
+ nullptr, /* status variables */
+ system_variables, /* system variables */
+ nullptr, /* config options */
+ 0, /* flags */
+},
+{
+ MYSQL_INFORMATION_SCHEMA_PLUGIN,
+ &plugin_descriptor,
+ "THREAD_POOL_GROUPS",
+ "Vladislav Vaintroub",
+ "Provides information about threadpool groups.",
+ PLUGIN_LICENSE_GPL,
+ groups_init,
+ nullptr,
+ nullptr,
+ 0x0100,
+ nullptr,
+ nullptr,
+ nullptr,
+ 0,
+},
+{
+ MYSQL_INFORMATION_SCHEMA_PLUGIN,
+ &plugin_descriptor,
+ "THREAD_POOL_QUEUES",
+ "Vladislav Vaintroub",
+ "Provides information about threadpool queues.",
+ PLUGIN_LICENSE_GPL,
+ queues_init,
+ nullptr,
+ nullptr,
+ 0x0100,
+ nullptr,
+ nullptr,
+ nullptr,
+ 0,
+},
+{
+ MYSQL_INFORMATION_SCHEMA_PLUGIN,
+ &plugin_descriptor,
+ "THREAD_POOL_STATS",
+ "Vladislav Vaintroub",
+ "Provides performance counter information for threadpool.",
+ PLUGIN_LICENSE_GPL,
+ stats_init,
+ nullptr,
+ nullptr,
+ 0x0100,
+ nullptr,
+ nullptr,
+ nullptr,
+ 0,
+},
+{
+ MYSQL_INFORMATION_SCHEMA_PLUGIN,
+ &plugin_descriptor,
+ "THREAD_POOL_WAITS",
+ "Vladislav Vaintroub",
+ "Provides wait counters for threadpool.",
+ PLUGIN_LICENSE_GPL,
+ waits_init,
+ nullptr,
+ nullptr,
+ 0x0100,
+ nullptr,
+ nullptr,
+ nullptr,
+ 0,
+}
+mysql_declare_plugin_end;
+
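+/*
+  Usage sketch (assuming the built thread_pool.so is in plugin_dir; each
+  plugin declared above is installed individually from the same library):
+
+    INSTALL PLUGIN thread_pool SONAME 'thread_pool.so';
+    INSTALL PLUGIN THREAD_POOL_GROUPS SONAME 'thread_pool.so';
+    SELECT * FROM information_schema.THREAD_POOL_GROUPS;
+*/
+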
+uint tp_get_thdvar_high_prio_tickets(THD *thd) {
+ return THDVAR(thd, high_prio_tickets);
+}
+
+uint tp_get_thdvar_high_prio_mode(THD *thd) {
+ return THDVAR(thd, high_prio_mode);
+}
+
diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc
new file mode 100644
index 00000000000..a9fdf3dbfcd
--- /dev/null
+++ b/plugin/thread_pool/threadpool_unix.cc
@@ -0,0 +1,1798 @@
+/* Copyright (C) 2012 Monty Program Ab
+ Copyright (C) 2022 Huawei Technologies Co., Ltd
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include "threadpool_unix.h"
+#include "sql/debug_sync.h"
+#include "sql/log.h"
+#include "sql/protocol_classic.h"
+#include "my_sys.h"
+#include "my_systime.h"
+#include "mysql/thread_pool_priv.h" // thd_is_transaction_active()
+#include "mysql/plugin.h"
+#include "threadpool.h"
+#include <atomic>
+#include <mutex>
+#include <set>
+#include <time.h>
+
+#define MYSQL_SERVER 1
+
+/** Maximum number of native events a listener can read in one go */
+#define MAX_EVENTS 1024
+
+/** Define if wait_begin() should create threads if necessary without waiting
+for stall detection to kick in */
+#define THREADPOOL_CREATE_THREADS_ON_WAIT
+
+/** Indicates that threadpool was initialized*/
+static bool threadpool_started = false;
+
+/*
+ Define PSI Keys for performance schema.
+ We have a mutex per group, worker threads, condition per worker thread,
+ and timer thread with its own mutex and condition.
+*/
+
+#ifdef HAVE_PSI_INTERFACE
+static PSI_mutex_key key_group_mutex;
+static PSI_mutex_key key_timer_mutex;
+static PSI_mutex_info mutex_list[] = {
+ {&key_group_mutex, "group_mutex", 0, 0, PSI_DOCUMENT_ME},
+ {&key_timer_mutex, "timer_mutex", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}};
+
+static PSI_cond_key key_worker_cond;
+static PSI_cond_key key_timer_cond;
+static PSI_cond_info cond_list[] = {
+ {&key_worker_cond, "worker_cond", 0, 0, PSI_DOCUMENT_ME},
+ {&key_timer_cond, "timer_cond", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}};
+
+static PSI_thread_key key_worker_thread;
+static PSI_thread_key key_timer_thread;
+static PSI_thread_info thread_list[] = {
+ {&key_worker_thread, "worker_thread", 0, 0, PSI_DOCUMENT_ME},
+ {&key_timer_thread, "timer_thread", PSI_FLAG_SINGLETON, 0,
+ PSI_DOCUMENT_ME}};
+#endif // HAVE_PSI_INTERFACE
+
+thread_group_t all_groups[MAX_THREAD_GROUPS];
+numa_affinity_manager group_affinity;
+
+static uint group_count;
+
+/**
+ Used for printing "pool blocked" message, see
+ print_pool_blocked_message();
+*/
+static ulonglong pool_block_start;
+
+/* Global timer for all groups */
+struct pool_timer_t {
+ mysql_mutex_t mutex;
+ mysql_cond_t cond;
+ std::atomic<uint64> current_microtime;
+ std::atomic<uint64> next_timeout_check;
+ int tick_interval;
+ bool shutdown;
+};
+
+static pool_timer_t pool_timer;
+
+static void queue_put(thread_group_t *thread_group, connection_t *connection);
+static int wake_thread(thread_group_t *thread_group,
+ bool due_to_stall) noexcept;
+static void handle_event(connection_t *connection);
+static int wake_or_create_thread(thread_group_t *thread_group,
+ bool due_to_stall = false);
+static int create_worker(thread_group_t *thread_group, bool due_to_stall) noexcept;
+static void *admin_port_worker_main(void *param);
+static void *worker_main(void *param);
+static void *connection_detach_worker(void *param);
+static void check_stall(thread_group_t *thread_group);
+static void connection_abort(connection_t *connection);
+static void set_next_timeout_check(ulonglong abstime);
+static void print_pool_blocked_message(bool) noexcept;
+
+THD *thd_to_detach = nullptr;
+
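+/* Registry of pool-managed connections so that killConns() can kill them all in bulk. */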
+class ThreadPoolConnSet {
+public:
+ ThreadPoolConnSet() {};
+ virtual ~ThreadPoolConnSet() {};
+
+ bool empty() {
+ bool ret = false;
+ mtx.lock();
+ ret = conns.empty();
+ mtx.unlock();
+ return ret;
+ }
+
+ void killConns() {
+ mtx.lock();
+ for (auto &it: conns) {
+ THD *thd = it->thd;
+ if (current_thd != thd && thd->killed != THD::KILL_CONNECTION) {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->killed = THD::KILL_CONNECTION;
+ tp_post_kill_notification(thd);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ } else if (current_thd == thd) {
+ thd_to_detach = thd;
+ }
+ }
+ mtx.unlock();
+ }
+
+ void insert(connection_t *c) {
+ mtx.lock();
+ conns.insert(c);
+ mtx.unlock();
+ }
+
+ void erase(connection_t *c) {
+ mtx.lock();
+ conns.erase(c);
+ mtx.unlock();
+ }
+
+public:
+ std::set<connection_t *> conns;
+ std::mutex mtx;
+};
+
+ThreadPoolConnSet threadpool_thds;
+
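+/* Shut down the socket (without closing it) so that a thread blocked in IO on it wakes up. */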
+int vio_cancel(Vio *vio, int how)
+{
+ int r= 0;
+ DBUG_ENTER("vio_cancel");
+
+ if (vio->inactive == false)
+ {
+ assert(vio->type == VIO_TYPE_TCPIP ||
+ vio->type == VIO_TYPE_SOCKET ||
+ vio->type == VIO_TYPE_SSL);
+
+ assert(mysql_socket_getfd(vio->mysql_socket) >= 0);
+ if (mysql_socket_shutdown(vio->mysql_socket, how))
+ r= -1;
+ }
+
+ DBUG_RETURN(r);
+}
+
+/**
+ Asynchronous network IO.
+
+ We use native edge-triggered network IO multiplexing facility.
+ This maps to different APIs on different Unixes.
+
+ Supported are currently Linux with epoll, Solaris with event ports,
+ OSX and BSD with kevent. All those API's are used with one-shot flags
+ (the event is signalled once client has written something into the socket,
+ then socket is removed from the "poll-set" until the command is finished,
+ and we need to re-arm/re-register socket)
+
+ No implementation for poll/select/AIO is currently provided.
+
+ The API closely resembles all of the above mentioned platform APIs
+  and consists of the following functions.
+
+ - io_poll_create()
+ Creates an io_poll descriptor
+ On Linux: epoll_create()
+
+ - io_poll_associate_fd(int poll_fd, int fd, void *data)
+ Associate file descriptor with io poll descriptor
+ On Linux : epoll_ctl(..EPOLL_CTL_ADD))
+
+ - io_poll_disassociate_fd(int pollfd, int fd)
+    Disassociate a file descriptor from the io poll descriptor
+ On Linux: epoll_ctl(..EPOLL_CTL_DEL)
+
+
+ - io_poll_start_read(int poll_fd,int fd, void *data)
+ The same as io_poll_associate_fd(), but cannot be used before
+ io_poll_associate_fd() was called.
+ On Linux : epoll_ctl(..EPOLL_CTL_MOD)
+
+ - io_poll_wait (int pollfd, native_event *native_events, int maxevents,
+ int timeout_ms)
+
+ wait until one or more descriptors added with io_poll_associate_fd()
+ or io_poll_start_read() becomes readable. Data associated with
+ descriptors can be retrieved from native_events array, using
+ native_event_get_userdata() function.
+
+
+ On Linux: epoll_wait()
+*/
+
+#if defined(__linux__)
+#ifndef EPOLLRDHUP
+/* Early 2.6 kernel did not have EPOLLRDHUP */
+#define EPOLLRDHUP 0
+#endif
+static int io_poll_create() noexcept { return epoll_create(1); }
+
+static int io_poll_associate_fd(int pollfd, int fd, void *data) noexcept {
+ struct epoll_event ev;
+ ev.data.u64 = 0; /* Keep valgrind happy */
+ ev.data.ptr = data;
+ ev.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLRDHUP | EPOLLONESHOT;
+ return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+static int io_poll_start_read(int pollfd, int fd, void *data) noexcept {
+ struct epoll_event ev;
+ ev.data.u64 = 0; /* Keep valgrind happy */
+ ev.data.ptr = data;
+ ev.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLRDHUP | EPOLLONESHOT;
+ return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
+}
+
+static int io_poll_disassociate_fd(int pollfd, int fd) noexcept {
+ struct epoll_event ev;
+ return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
+}
+
+/*
+ Wrapper around epoll_wait.
+ NOTE - in case of EINTR, it restarts with original timeout. Since we use
+ either infinite or 0 timeouts, this is not critical
+*/
+static int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
+ int timeout_ms) noexcept {
+ int ret;
+ do {
+ ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms);
+ } while (ret == -1 && errno == EINTR);
+ return ret;
+}
+
+static void *native_event_get_userdata(native_event *event) noexcept {
+ return event->data.ptr;
+}
+
+#elif defined(__FreeBSD__) || defined(__APPLE__)
+static int io_poll_create() noexcept { return kqueue(); }
+
+static int io_poll_start_read(int pollfd, int fd, void *data) noexcept {
+ struct kevent ke;
+ EV_SET(&ke, fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, data);
+ return kevent(pollfd, &ke, 1, 0, 0, 0);
+}
+
+static int io_poll_associate_fd(int pollfd, int fd, void *data) noexcept {
+  /* With kqueue, initial registration and re-arming are the same one-shot
+     EV_ADD operation, so simply delegate to io_poll_start_read(). */
+ return io_poll_start_read(pollfd, fd, data);
+}
+
+static int io_poll_disassociate_fd(int pollfd, int fd) noexcept {
+ struct kevent ke;
+ EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
+ return kevent(pollfd, &ke, 1, 0, 0, 0);
+}
+
+static int io_poll_wait(int pollfd, struct kevent *events, int maxevents,
+ int timeout_ms) noexcept {
+ struct timespec ts;
+ int ret;
+ if (timeout_ms >= 0) {
+ ts.tv_sec = timeout_ms / 1000;
+ ts.tv_nsec = (timeout_ms % 1000) * 1000000;
+ }
+ do {
+ ret = kevent(pollfd, 0, 0, events, maxevents,
+ (timeout_ms >= 0) ? &ts : nullptr);
+ } while (ret == -1 && errno == EINTR);
+ return ret;
+}
+
+static void *native_event_get_userdata(native_event *event) noexcept {
+ return event->udata;
+}
+#else
+#error not ported yet to this OS
+#endif
+
+namespace {
+
+/*
+ Prevent too many active threads executing at the same time, if the workload is
+ not CPU bound.
+*/
+inline bool too_many_active_threads(
+ const thread_group_t &thread_group) noexcept {
+ return (thread_group.active_thread_count >=
+ 1 + (int)threadpool_oversubscribe &&
+ !thread_group.stalled);
+}
+
+/*
+ Limit the number of 'busy' threads by 1 + threadpool_toobusy. A thread
+ is busy if it is in either the active state or the waiting state (i.e. between
+ thd_wait_begin() / thd_wait_end() calls).
+*/
+inline bool too_many_busy_threads(const thread_group_t &thread_group) noexcept {
+ return (thread_group.active_thread_count + thread_group.waiting_thread_count >
+ 1 + (int)threadpool_toobusy);
+}
+
+inline bool too_many_connection(const thread_group_t &thread_group) noexcept {
+ return (thread_group.connection_count > (int)threadpool_toobusy - 1);
+}
+
+/*
+ Checks if a given connection is eligible to enter the high priority queue
+ based on its current thread_pool_high_prio_mode value, available high
+ priority tickets and transactional state and whether any locks are held.
+*/
+inline bool connection_is_high_prio(const connection_t &c) noexcept {
+ const ulong mode = tp_get_thdvar_high_prio_mode(c.thd);
+
+ return (mode == TP_HIGH_PRIO_MODE_STATEMENTS) ||
+ (mode == TP_HIGH_PRIO_MODE_TRANSACTIONS && c.tickets > 0 &&
+ (thd_is_transaction_active(c.thd) ||
+ c.thd->variables.option_bits & OPTION_TABLE_LOCK ||
+ c.thd->locked_tables_mode != LTM_NONE ||
+ c.thd->mdl_context.has_locks() ||
+ c.thd->global_read_lock.is_acquired() ||
+ c.thd->mdl_context.has_locks(MDL_key::USER_LEVEL_LOCK) ||
+ c.thd->mdl_context.has_locks(MDL_key::LOCKING_SERVICE)));
+}
+
+inline bool connection_is_worker_continue(const connection_t &c) noexcept {
+ if (c.thd->is_admin_connection()) {
+ return true;
+ }
+
+ if (c.thread_group != &all_groups[c.thd->thread_id() % group_count]) {
+ return false;
+ }
+
+ if (!too_many_connection(*(c.thread_group))) {
+ return true;
+ }
+
+ const ulong mode = tp_get_thdvar_high_prio_mode(c.thd);
+ bool ret = (mode == TP_HIGH_PRIO_MODE_TRANSACTIONS && c.tickets > 0 &&
+ (thd_is_transaction_active(c.thd) ||
+ c.thd->variables.option_bits & OPTION_TABLE_LOCK ||
+ c.thd->locked_tables_mode != LTM_NONE ||
+ c.thd->mdl_context.has_locks() ||
+ c.thd->global_read_lock.is_acquired() ||
+ c.thd->mdl_context.has_locks(MDL_key::USER_LEVEL_LOCK) ||
+ c.thd->mdl_context.has_locks(MDL_key::LOCKING_SERVICE)));
+ return ret;
+}
+
+} // namespace
+
+/* Dequeue element from a workqueue */
+static connection_t *queue_get(thread_group_t *thread_group) noexcept {
+ DBUG_ENTER("queue_get");
+ thread_group->queue_event_count++;
+ connection_t *c;
+
+ if ((c = thread_group->high_prio_queue.front())) {
+ thread_group->high_prio_queue.remove(c);
+ }
+ /*
+ Don't pick events from the low priority queue if there are too many
+ active + waiting threads.
+ */
+ else if (!too_many_busy_threads(*thread_group) &&
+ (c = thread_group->queue.front())) {
+ thread_group->queue.remove(c);
+ }
+ DBUG_RETURN(c);
+}
+
+static connection_t *queue_get(thread_group_t *group, operation_origin origin) {
+ connection_t *ret = queue_get(group);
+ if (ret != nullptr) {
+ TP_INCREMENT_GROUP_COUNTER(group, dequeues[(int)origin]);
+ }
+ return ret;
+}
+
+static inline void queue_push(thread_group_t *thread_group, connection_t *connection)
+{
+ connection->enqueue_time= pool_timer.current_microtime;
+ thread_group->queue.push_back(connection);
+}
+
+static inline void high_prio_queue_push(thread_group_t *thread_group, connection_t *connection)
+{
+ connection->enqueue_time= pool_timer.current_microtime;
+ thread_group->high_prio_queue.push_back(connection);
+}
+
+class Thd_timeout_checker : public Do_THD_Impl {
+ private:
+ pool_timer_t *const m_timer;
+
+ public:
+ Thd_timeout_checker(pool_timer_t *timer) noexcept : m_timer(timer) {}
+
+ virtual ~Thd_timeout_checker() {}
+
+ virtual void operator()(THD *thd) noexcept {
+ if (thd_get_net_read_write(thd) != 1) return;
+
+ connection_t *connection = (connection_t *)thd->scheduler.data;
+ if (!connection) return;
+
+ if (connection->abs_wait_timeout <
+ m_timer->current_microtime.load(std::memory_order_relaxed)) {
+ /* Wait timeout exceeded, kill connection. */
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->killed = THD::KILL_CONNECTION;
+ tp_post_kill_notification(thd);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ } else {
+ set_next_timeout_check(connection->abs_wait_timeout);
+ }
+ }
+};
+
+/*
+ Handle wait timeout :
+ Find connections that have been idle for too long and kill them.
+ Also, recalculate time when next timeout check should run.
+*/
+static void timeout_check(pool_timer_t *timer) {
+ DBUG_ENTER("timeout_check");
+
+ /* Reset next timeout check, it will be recalculated in the loop below */
+ timer->next_timeout_check.store(ULLONG_MAX, std::memory_order_relaxed);
+
+ Thd_timeout_checker thd_timeout_checker(timer);
+ Global_THD_manager::get_instance()->do_for_all_thd_copy(&thd_timeout_checker);
+
+ DBUG_VOID_RETURN;
+}
+
+/*
+ Timer thread.
+
+  Periodically check whether any thread group is stalled. Stalls happen if
+  events are not being dequeued from the queue or from the network. The
+  primary reason for a stall can be a lengthy non-yielding request. It can
+  also happen that a thread is waiting but wait_begin/wait_end was forgotten
+  by the storage engine. The timer thread will create a new thread in the
+  group in case of a stall.
+
+  Besides checking for stalls, the timer thread is also responsible for
+  terminating clients that have been idle for longer than wait_timeout seconds.
+
+  TODO: Let the timer sleep for a long time if there is no work to be done.
+  Currently it wakes up rather often on an idle server.
+*/
+static void *timer_thread(void *param) noexcept {
+ my_thread_init();
+ DBUG_ENTER("timer_thread");
+
+ pool_timer_t *timer = (pool_timer_t *)param;
+ timer->next_timeout_check.store(ULLONG_MAX, std::memory_order_relaxed);
+ timer->current_microtime.store(my_microsecond_getsystime(),
+ std::memory_order_relaxed);
+
+ for (;;) {
+ struct timespec ts;
+
+ set_timespec_nsec(&ts, timer->tick_interval * 1000000ULL);
+ mysql_mutex_lock(&timer->mutex);
+ int err = mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
+ if (timer->shutdown) {
+ mysql_mutex_unlock(&timer->mutex);
+ break;
+ }
+ if (err == ETIMEDOUT) {
+ timer->current_microtime.store(my_microsecond_getsystime(),
+ std::memory_order_relaxed);
+
+ /* Check stalls in thread groups */
+ for (size_t i = 0; i < array_elements(all_groups); i++) {
+ if (all_groups[i].connection_count) check_stall(&all_groups[i]);
+ }
+
+ /* Check if any client exceeded wait_timeout */
+ if (timer->next_timeout_check.load(std::memory_order_relaxed) <=
+ timer->current_microtime.load(std::memory_order_relaxed))
+ timeout_check(timer);
+ }
+ mysql_mutex_unlock(&timer->mutex);
+ }
+
+ mysql_mutex_destroy(&timer->mutex);
+ my_thread_end();
+ return nullptr;
+}
+
+/*
+ Check if both the high and low priority queues are empty.
+
+ NOTE: we also consider the low priority queue empty in case it has events, but
+ they cannot be processed due to the too_many_busy_threads() limit.
+*/
+static bool queues_are_empty(const thread_group_t &tg) noexcept {
+ return (tg.high_prio_queue.is_empty() &&
+ (tg.queue.is_empty() || too_many_busy_threads(tg)));
+}
+
+static void check_stall(thread_group_t *thread_group) {
+ if (mysql_mutex_trylock(&thread_group->mutex) != 0) {
+ /* Something is going on; don't disturb. */
+ return;
+ }
+
+ /*
+ Check if a listener is present. If not, check whether any IO
+ events were dequeued since last time. If not, this means the
+ listener is either stuck in a tight loop or thd_wait_begin()
+ was forgotten. Create a new worker (it will make itself the listener).
+ */
+ if (!thread_group->listener && !thread_group->io_event_count) {
+ wake_or_create_thread(thread_group, true);
+ mysql_mutex_unlock(&thread_group->mutex);
+ return;
+ }
+
+ /* Reset io event count */
+ thread_group->io_event_count = 0;
+
+ /*
+ Check whether requests from the work queues are being dequeued.
+
+ The stall detection and resolution works as follows:
+
+ 1. There is a counter thread_group->queue_event_count for the number of
+ events removed from the queues. The timer resets the counter to 0 on each
+ run.
+ 2. The timer determines a stall if this counter remained 0 since the last
+ check and at least one of the high and low priority queues is not empty.
+ 3. Once the timer has determined a stall, it sets the
+ thread_group->stalled flag and wakes an idle worker (or creates a new
+ one, subject to throttling).
+ 4. The stalled flag is reset when an event is dequeued.
+
+ Q : Will this handling lead to unbounded growth of threads, if queues
+ stall permanently?
+ A : No. If queues stall permanently, it is an indication of many very long
+ simultaneous queries. The maximum number of simultaneous queries is
+ max_connections; furthermore, we have the threadpool_max_threads limit,
+ above which no worker threads are created. So in case there is a flood of
+ very long queries, the threadpool would slowly approach
+ thread-per-connection behavior.
+ NOTE:
+ If long queries never wait, creation of new threads is done by the timer,
+ so it is slower than in real thread-per-connection. However, if long
+ queries do wait and indicate that via the thd_wait_begin/end callbacks,
+ thread creation will be faster.
+ */
+ if (!thread_group->queue_event_count && !queues_are_empty(*thread_group)) {
+ thread_group->stalled = true;
+ TP_INCREMENT_GROUP_COUNTER(thread_group, stalls);
+ wake_or_create_thread(thread_group, true);
+ }
+
+ /* Reset queue event count */
+ thread_group->queue_event_count = 0;
+
+ mysql_mutex_unlock(&thread_group->mutex);
+}
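+
+/*
+  A minimal, self-contained model of the two per-tick checks above, using
+  hypothetical names; illustration only, deliberately kept out of the build:
+*/
+#if 0
+struct group_model {
+  int io_event_count;     /* IO events seen since the last tick */
+  int queue_event_count;  /* events dequeued since the last tick */
+  bool has_listener;
+  bool queues_empty;      /* as computed by queues_are_empty() */
+};
+
+/* Returns true if the timer should wake or create a worker for the group. */
+static bool tick_needs_worker(group_model *g) {
+  /* No listener and no IO progress: the listener is stuck or missing. */
+  if (!g->has_listener && g->io_event_count == 0) return true;
+  g->io_event_count = 0;
+  /* Work is queued, but nothing was dequeued since the last tick: stall. */
+  const bool stalled = (g->queue_event_count == 0 && !g->queues_empty);
+  g->queue_event_count = 0;
+  return stalled;
+}
+#endif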
+
+static void start_timer(pool_timer_t *timer) noexcept {
+ my_thread_handle thread_id;
+ DBUG_ENTER("start_timer");
+ mysql_mutex_init(key_timer_mutex, &timer->mutex, nullptr);
+ mysql_cond_init(key_timer_cond, &timer->cond);
+ timer->shutdown = false;
+ mysql_thread_create(key_timer_thread, &thread_id, nullptr, timer_thread, timer);
+ DBUG_VOID_RETURN;
+}
+
+static void stop_timer(pool_timer_t *timer) noexcept {
+ DBUG_ENTER("stop_timer");
+ mysql_mutex_lock(&timer->mutex);
+ timer->shutdown = true;
+ mysql_cond_signal(&timer->cond);
+ mysql_mutex_unlock(&timer->mutex);
+ DBUG_VOID_RETURN;
+}
+
+/**
+ Poll for socket events and distribute them to worker threads.
+ In many cases the current thread will handle a single event itself.
+
+ @return a ready connection, or NULL on shutdown
+*/
+static connection_t *listener(thread_group_t *thread_group) {
+ DBUG_ENTER("listener");
+ connection_t *retval = nullptr;
+
+ for (;;) {
+ if (thread_group->shutdown) break;
+
+ native_event ev[MAX_EVENTS];
+ int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
+
+ DBUG_EXECUTE_IF("threadpool_io_poll_wait_at_least_2_events",
+ {
+ while (cnt < 2)
+ {
+ int cnt_again = io_poll_wait(thread_group->pollfd, ev + cnt, MAX_EVENTS - cnt, -1);
+ cnt += cnt_again;
+ }
+ }
+ );
+
+ TP_INCREMENT_GROUP_COUNTER(thread_group, polls[LISTENER]);
+ if (cnt <= 0) {
+ assert(thread_group->shutdown);
+ break;
+ }
+
+ mysql_mutex_lock(&thread_group->mutex);
+
+ if (thread_group->shutdown) {
+ mysql_mutex_unlock(&thread_group->mutex);
+ break;
+ }
+
+ thread_group->io_event_count += cnt;
+
+ /*
+ We got some network events and need to make decisions: whether the
+ listener should handle some events itself, and whether to wake any
+ worker threads to handle the rest.
+
+ Q1 : Should the listener handle an event itself, or put all events into
+ the queue and let workers handle them?
+
+ Solution :
+ Generally, a listener that handles events itself is preferable. We do not
+ want the listener thread to change its state from waiting to running too
+ often; since the listener has just woken from poll, it might as well use
+ its time slice and do some work. Besides, events that are not handled go
+ to the queue, and then another worker often has to wake up to handle
+ them. This is not good, as we want to avoid wakeups.
+
+ The downside of a listener that also handles queries is that we can
+ potentially leave the thread group without a listener for a long time,
+ not picking up new network events. This is not a major problem, because
+ the resulting stall will be detected sooner or later by the timer thread.
+ Still, relying on the timer is not always good, because it may "tick" too
+ slowly (large timer_interval).
+
+ We use the following strategy to solve this problem: if the queue was not
+ empty, we suspect a flood of network events and the listener stays.
+ Otherwise, it handles a query.
+
+
+ Q2: If the queue is not empty, how many workers should we wake?
+
+ Solution:
+ We generally try to keep one thread per group active (threads handling
+ queries are considered active, unless they are stuck inside some "wait").
+ Thus, we wake only one worker, and only if there are no active threads
+ currently and the listener is not going to handle a query. When we don't
+ wake, we hope that the currently active threads will finish fast and
+ handle the queue. If this does not happen, the timer thread will detect a
+ stall and wake a worker.
+
+ NOTE: Currently nothing is done to detect or prevent long queuing times.
+ A solution for the future would be to give up the "one active thread per
+ group" principle if events stay in the queue for too long, and just wake
+ more workers.
+ */
+
+ const bool listener_picks_event = threadpool_dedicated_listener? false :
+ (thread_group->high_prio_queue.is_empty() && thread_group->queue.is_empty());
+
+ /*
+ If listener_picks_event is set, the listener thread will handle the first
+ event and put the rest into the queue. If listener_picks_event is not set,
+ all events go to the queue.
+ */
+ for (int i = (listener_picks_event) ? 1 : 0; i < cnt; i++) {
+ connection_t *c = (connection_t *)native_event_get_userdata(&ev[i]);
+ if (connection_is_high_prio(*c)) {
+ c->tickets--;
+ thread_group->high_prio_queue.push_back(c);
+ } else {
+ c->tickets = tp_get_thdvar_high_prio_tickets(c->thd);
+ queue_push(thread_group, c);
+ }
+ }
+
+ if (listener_picks_event) {
+ /* Handle the first event. */
+ retval = (connection_t *)native_event_get_userdata(&ev[0]);
+ TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues[LISTENER]);
+ mysql_mutex_unlock(&thread_group->mutex);
+ break;
+ }
+
+ /* Upper bound on how many more workers may be woken up or created. */
+ int workers_in_need = (int)threadpool_toobusy -
+ thread_group->active_thread_count - thread_group->waiting_thread_count;
+
+ /* No headroom remains, but no thread is active either: the group would stall, so allow one worker. */
+ if (workers_in_need <= 0 && thread_group->active_thread_count == 0) {
+ workers_in_need = 1;
+ }
+
+ /* Take the smaller of the available headroom and the number of
+ events that actually need handling. */
+ workers_in_need = workers_in_need > cnt ? cnt : workers_in_need;
+
+ /* Wake up or create the required threads */
+ for (int i = 0; i < workers_in_need; i++) {
+ /* We added some work items to queue, now wake a worker. */
+ if (wake_thread(thread_group, false)) {
+ /*
+ Wake failed, hence the group has no idle threads.
+ In order to achieve the best running performance for the
+ number of threads, the conditions for waking up or creating
+ worker threads are relaxed: the queue is not empty and the
+ listener is not going to handle the events, so in order to
+ drain the queue we create a worker here. Alternatively, we
+ could rely on the timer to detect the stall and create a
+ thread, but waiting for the timer would be an inefficient and
+ pointless delay.
+ */
+ create_worker(thread_group, false);
+ }
+ }
+ mysql_mutex_unlock(&thread_group->mutex);
+ }
+ DBUG_RETURN(retval);
+}
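+
+/*
+  The wake-up arithmetic above reduces to "wake at most min(cnt, headroom)
+  workers, but never zero when the group has no active thread". A standalone
+  sketch with hypothetical names (illustration only):
+*/
+#if 0
+static int workers_to_wake(int toobusy_limit, int active_threads,
+                           int waiting_threads, int new_events) {
+  int headroom = toobusy_limit - active_threads - waiting_threads;
+  if (headroom <= 0 && active_threads == 0)
+    headroom = 1; /* otherwise the group would stall */
+  return headroom < new_events ? headroom : new_events;
+}
+#endif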
+
+/**
+ Adjust thread counters in the group and globally
+ whenever a thread is created or is about to exit.
+
+ @param thread_group
+ @param count - 1, when new thread is created
+ -1, when thread is about to exit
+*/
+static void add_thread_count(thread_group_t *thread_group,
+ int32 count) noexcept {
+ thread_group->thread_count += count;
+ /* workers start out and end in the "active" state */
+ thread_group->active_thread_count += count;
+ tp_stats.num_worker_threads.fetch_add(count, std::memory_order_relaxed);
+}
+
+/**
+ Creates a new worker thread.
+ thread_group->mutex must be held when calling this function.
+
+ NOTE: in rare cases, the number of threads can exceed
+ threadpool_max_threads, because we need at least 2 threads
+ per group to prevent deadlocks (one listener + one worker)
+*/
+static int create_worker(thread_group_t *thread_group,
+ bool due_to_stall) noexcept {
+ my_thread_handle thread_id;
+ bool max_threads_reached = false;
+ int err;
+
+ DBUG_ENTER("create_worker");
+ if (tp_stats.num_worker_threads.load(std::memory_order_relaxed) >=
+ (int)threadpool_max_threads &&
+ thread_group->thread_count >= 2) {
+ err = 1;
+ max_threads_reached = true;
+ goto end;
+ }
+
+ err = mysql_thread_create(key_worker_thread, &thread_id,
+ thread_group->pthread_attr, worker_main,
+ thread_group);
+ if (!err) {
+ thread_group->last_thread_creation_time = my_microsecond_getsystime();
+ Global_THD_manager::get_instance()->inc_thread_created();
+ add_thread_count(thread_group, 1);
+ TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations);
+
+ if (due_to_stall) {
+ TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations_due_to_stall);
+ }
+ } else {
+ set_my_errno(errno);
+ }
+
+end:
+ if (err) {
+ print_pool_blocked_message(max_threads_reached);
+ } else {
+ pool_block_start = 0; /* Reset pool blocked timer, if it was set */
+ }
+
+ DBUG_RETURN(err);
+}
+
+/**
+ Calculate microseconds throttling delay for thread creation.
+
+ The value depends on how many threads are already in the group:
+ small number of threads means no delay, the more threads the larger
+ the delay.
+
+ The actual values were not calculated using any scientific methods.
+ They just look right, and behave well in practice.
+
+ TODO: Should throttling depend on thread_pool_stall_limit?
+*/
+static ulonglong microsecond_throttling_interval(
+ const thread_group_t &thread_group) noexcept {
+ const int count = thread_group.thread_count;
+
+ if (count < 4) return 0;
+
+ if (count < 8) return 50 * 1000;
+
+ if (count < 16) return 100 * 1000;
+
+ return 200 * 1000;
+}
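+
+/*
+  For example, with 10 threads already in the group the interval above is
+  100 ms, so wake_or_create_thread() below will refuse to create an 11th
+  worker unless at least 100 ms have passed since the last creation (or the
+  group has too few active threads).
+*/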
+
+/**
+ Wakes a worker thread, or creates a new one.
+
+ Worker creation is throttled, so that too many threads
+ are not created during a short period of time.
+*/
+static int wake_or_create_thread(thread_group_t *thread_group,
+ bool due_to_stall) {
+ DBUG_ENTER("wake_or_create_thread");
+
+ if (thread_group->shutdown) DBUG_RETURN(0);
+
+ if (wake_thread(thread_group, due_to_stall) == 0) DBUG_RETURN(0);
+
+ if (thread_group->thread_count > thread_group->connection_count)
+ DBUG_RETURN(-1);
+
+ /* In order to achieve the best running performance of the
+ number of threads, the conditions for the wake-up or
+ creation of worker threads are relaxed. */
+ if (thread_group->active_thread_count <
+ (1 + (int)threadpool_oversubscribe)) {
+ /*
+ We're better off creating a new thread here without delay: either there
+ are not enough active workers, or they are all blocked and there was no
+ idle thread to wake up. This smells like a potential deadlock or very
+ slowly executing requests, e.g. sleeps or user locks.
+ */
+ DBUG_RETURN(create_worker(thread_group, due_to_stall));
+ }
+
+ const ulonglong now = my_microsecond_getsystime();
+ const ulonglong time_since_last_thread_created =
+ (now - thread_group->last_thread_creation_time);
+
+ /* Throttle thread creation. */
+ if (time_since_last_thread_created >
+ microsecond_throttling_interval(*thread_group)) {
+ DBUG_RETURN(create_worker(thread_group, due_to_stall));
+ }
+
+ TP_INCREMENT_GROUP_COUNTER(thread_group, throttles);
+ DBUG_RETURN(-1);
+}
+
+static int thread_group_init(thread_group_t *thread_group,
+ pthread_attr_t *thread_attr) noexcept {
+ DBUG_ENTER("thread_group_init");
+ thread_group->pthread_attr = thread_attr;
+ mysql_mutex_init(key_group_mutex, &thread_group->mutex, nullptr);
+ thread_group->pollfd = -1;
+ thread_group->shutdown_pipe[0] = -1;
+ thread_group->shutdown_pipe[1] = -1;
+ thread_group->thread_count = 0;
+ thread_group->admin_port_thread_count = 0;
+ thread_group->dump_thread_count = 0;
+ thread_group->active_thread_count = 0;
+ thread_group->connection_count = 0;
+ thread_group->waiting_thread_count = 0;
+ thread_group->io_event_count = 0;
+ thread_group->queue_event_count = 0;
+ thread_group->shutdown = false;
+ thread_group->stalled = false;
+ DBUG_RETURN(0);
+}
+
+static void thread_group_destroy(thread_group_t *thread_group) noexcept {
+ mysql_mutex_destroy(&thread_group->mutex);
+ if (thread_group->pollfd != -1) {
+ close(thread_group->pollfd);
+ thread_group->pollfd = -1;
+ }
+ for (int i = 0; i < 2; i++) {
+ if (thread_group->shutdown_pipe[i] != -1) {
+ close(thread_group->shutdown_pipe[i]);
+ thread_group->shutdown_pipe[i] = -1;
+ }
+ }
+}
+
+/**
+ Wake a sleeping thread from the waiting list.
+*/
+static int wake_thread(thread_group_t *thread_group, bool due_to_stall) noexcept {
+ DBUG_ENTER("wake_thread");
+ worker_thread_t *thread = thread_group->waiting_threads.front();
+ if (thread) {
+ thread->woken = true;
+ thread_group->waiting_threads.remove(thread);
+ mysql_cond_signal(&thread->cond);
+ TP_INCREMENT_GROUP_COUNTER(thread_group, wakes);
+ if (due_to_stall) {
+ TP_INCREMENT_GROUP_COUNTER(thread_group, wakes_due_to_stall);
+ }
+ DBUG_RETURN(0);
+ }
+ DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
+}
+
+/**
+ Shutdown for thread group
+*/
+static void thread_group_close(thread_group_t *thread_group) noexcept {
+ DBUG_ENTER("thread_group_close");
+
+ mysql_mutex_lock(&thread_group->mutex);
+ if (thread_group->thread_count == 0) {
+ mysql_mutex_unlock(&thread_group->mutex);
+ thread_group_destroy(thread_group);
+ DBUG_VOID_RETURN;
+ }
+
+ thread_group->shutdown = true;
+ thread_group->listener = nullptr;
+
+ if (pipe(thread_group->shutdown_pipe)) {
+ mysql_mutex_unlock(&thread_group->mutex);
+ DBUG_VOID_RETURN;
+ }
+
+ /* Wake listener */
+ if (io_poll_associate_fd(thread_group->pollfd,
+ thread_group->shutdown_pipe[0], nullptr)) {
+ mysql_mutex_unlock(&thread_group->mutex);
+ DBUG_VOID_RETURN;
+ }
+ char c = 0;
+ if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) {
+ mysql_mutex_unlock(&thread_group->mutex);
+ DBUG_VOID_RETURN;
+ }
+
+ /* Wake all workers. */
+ while (wake_thread(thread_group, false) == 0) {
+ }
+
+ mysql_mutex_unlock(&thread_group->mutex);
+ DBUG_VOID_RETURN;
+}
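+
+/*
+  The pipe write above is the classic self-pipe trick: the read end was just
+  registered with the poll descriptor, so the single written byte makes
+  io_poll_wait() return and lets the listener observe shutdown == true.
+*/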
+
+/*
+ Add work to the queue. Maybe wake a worker if they are all asleep.
+
+ Currently, this function is only used when new connections need to
+ perform login (this is done in worker threads).
+*/
+static void queue_put(thread_group_t *thread_group, connection_t *connection) {
+ DBUG_ENTER("queue_put");
+
+ mysql_mutex_lock(&thread_group->mutex);
+ connection->tickets = tp_get_thdvar_high_prio_tickets(connection->thd);
+ connection->enqueue_time = pool_timer.current_microtime;
+
+ queue_push(thread_group, connection);
+
+ /* In order to achieve the best running performance of the
+ number of threads, the conditions for the wake-up or
+ creation of worker threads are relaxed. */
+ if (thread_group->active_thread_count <
+ 1 + (int)threadpool_oversubscribe) {
+ wake_or_create_thread(thread_group, false);
+ }
+
+ mysql_mutex_unlock(&thread_group->mutex);
+
+ DBUG_VOID_RETURN;
+}
+
+/**
+ Retrieve a connection with pending event.
+
+ A pending event in our case means that there is either a pending login
+ request (if the connection is not yet logged in), or there are unread
+ bytes on the socket.
+
+ If there are no pending events currently, the thread will wait.
+ If the timeout specified in the abstime parameter passes, the function
+ returns nullptr.
+
+ @param current_thread - current worker thread
+ @param thread_group - current thread group
+ @param abstime - absolute wait timeout
+
+ @return
+ connection with pending event.
+ nullptr is returned if the timeout has expired, or on shutdown.
+*/
+static connection_t *get_event(worker_thread_t *current_thread,
+ thread_group_t *thread_group,
+ struct timespec *abstime) {
+ DBUG_ENTER("get_event");
+ connection_t *connection = nullptr;
+ int err = 0;
+
+ mysql_mutex_lock(&thread_group->mutex);
+ assert(thread_group->active_thread_count >= 0);
+
+ for (;;) {
+ const bool oversubscribed = too_many_active_threads(*thread_group);
+ if (thread_group->shutdown) break;
+
+ /* Check if queue is not empty */
+ if (!oversubscribed) {
+ connection = queue_get(thread_group, WORKER);
+ if (connection) break;
+ }
+
+ /* If there is currently no listener in the group, become one. */
+ if (!thread_group->listener) {
+ thread_group->listener = current_thread;
+ thread_group->active_thread_count--;
+ mysql_mutex_unlock(&thread_group->mutex);
+
+ connection = listener(thread_group);
+
+ mysql_mutex_lock(&thread_group->mutex);
+ thread_group->active_thread_count++;
+ /* There is no listener anymore, it just returned. */
+ thread_group->listener = nullptr;
+ break;
+ }
+
+ /*
+ Last thing we try before going to sleep is to
+ pick a single event via epoll, without waiting (timeout 0)
+ */
+ if (!oversubscribed) {
+ native_event nev;
+ if (io_poll_wait(thread_group->pollfd, &nev, 1, 0) == 1) {
+ thread_group->io_event_count++;
+ TP_INCREMENT_GROUP_COUNTER(thread_group, polls[WORKER]);
+ connection = (connection_t *)native_event_get_userdata(&nev);
+
+ /*
+ Since we are going to perform out-of-order event processing for the
+ connection, first check whether it is eligible for high priority
+ processing. We can get here even if there are queued events, so it
+ must either have a high priority ticket, or there must not be too many
+ busy threads (as if it were coming from the low priority queue).
+ */
+ if (connection_is_high_prio(*connection))
+ connection->tickets--;
+ else if (too_many_busy_threads(*thread_group)) {
+ /*
+ Not eligible for high priority processing. Restore tickets and put
+ it into the low priority queue.
+ */
+ connection->tickets = tp_get_thdvar_high_prio_tickets(connection->thd);
+ thread_group->queue.push_back(connection);
+ connection = nullptr;
+ }
+
+ if (connection) {
+ TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues[WORKER]);
+ thread_group->queue_event_count++;
+ break;
+ }
+ }
+ }
+
+ /* And now, finally sleep */
+ current_thread->woken = false; /* wake() sets this to true */
+
+ /*
+ Add the current thread to the head of the waiting list and wait.
+ It is important to add the thread to the head rather than the tail,
+ as it ensures LIFO wakeup order (hot caches, and the idle timeout can
+ expire for the least recently used threads).
+ */
+ thread_group->waiting_threads.push_front(current_thread);
+
+ thread_group->active_thread_count--;
+ if (abstime) {
+ err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex,
+ abstime);
+ } else {
+ err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex);
+ }
+ thread_group->active_thread_count++;
+
+ if (!current_thread->woken) {
+ /*
+ Thread was not signalled by wake(); it might be a spurious wakeup or
+ a timeout. Anyhow, we need to remove ourselves from the list now.
+ If the thread was explicitly woken, the waker already removed us from
+ the list.
+ */
+ thread_group->waiting_threads.remove(current_thread);
+ }
+
+ if (err) break;
+ }
+
+ thread_group->stalled = false;
+ mysql_mutex_unlock(&thread_group->mutex);
+
+ DBUG_RETURN(connection);
+}
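+
+/*
+  In outline, the loop above tries event sources in a fixed order:
+  1. dequeue from the group queues (unless the group is oversubscribed);
+  2. become the listener if the group currently has none;
+  3. poll the group's epoll descriptor once with a zero timeout;
+  4. park on the waiting list until woken or timed out.
+*/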
+
+/**
+ Tells the pool that a worker starts waiting on IO, a lock, a condition,
+ sleep() or something similar.
+*/
+
+static void wait_begin(thread_group_t *thread_group) noexcept {
+ DBUG_ENTER("wait_begin");
+ mysql_mutex_lock(&thread_group->mutex);
+ thread_group->active_thread_count--;
+ thread_group->waiting_thread_count++;
+
+ assert(thread_group->active_thread_count >= 0);
+ assert(thread_group->connection_count > 0);
+
+#ifdef THREADPOOL_CREATE_THREADS_ON_WAIT
+ /* In order to achieve the best running performance of the
+ number of threads, the conditions for the wake-up or
+ creation of worker threads are relaxed. */
+ if ((thread_group->active_thread_count < (1 + (int)threadpool_oversubscribe)) &&
+ (!queues_are_empty(*thread_group) || !thread_group->listener)) {
+ /*
+ Group might stall while this thread waits, thus wake
+ or create a worker to prevent stall.
+ */
+ wake_or_create_thread(thread_group);
+ }
+#endif
+
+ mysql_mutex_unlock(&thread_group->mutex);
+ DBUG_VOID_RETURN;
+}
+
+/**
+ Tells the pool that a worker has finished waiting.
+*/
+static void wait_end(thread_group_t *thread_group) noexcept {
+ DBUG_ENTER("wait_end");
+ mysql_mutex_lock(&thread_group->mutex);
+ thread_group->active_thread_count++;
+ thread_group->waiting_thread_count--;
+ mysql_mutex_unlock(&thread_group->mutex);
+ DBUG_VOID_RETURN;
+}
+
+/**
+ Allocate/initialize a new connection structure.
+*/
+
+static connection_t *alloc_connection(THD *thd) noexcept {
+ DBUG_ENTER("alloc_connection");
+ DBUG_EXECUTE_IF("simulate_tp_alloc_connection_oom", DBUG_RETURN(nullptr););
+
+ connection_t *connection = (connection_t *)my_malloc(
+ PSI_NOT_INSTRUMENTED /*key_memory_thread_pool_connection*/,
+ sizeof(connection_t), 0);
+ if (connection) {
+ connection->thd = thd;
+ connection->waiting = false;
+ connection->logged_in = false;
+ connection->bound_to_poll_descriptor = false;
+ connection->abs_wait_timeout = ULLONG_MAX;
+ connection->tickets = 0;
+ }
+ DBUG_RETURN(connection);
+}
+
+/**
+ Add a new connection to the thread pool.
+*/
+
+bool tp_add_connection(
+ Channel_info *channel_info) {
+ DBUG_ENTER("Thread_pool_connection_handler::add_connection");
+
+ THD *const thd = channel_info->create_thd();
+
+ if (unlikely(!thd)) {
+ channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false);
+ DBUG_RETURN(true);
+ }
+
+ connection_t *const connection = alloc_connection(thd);
+
+ if (unlikely(!connection)) {
+ thd->get_protocol_classic()->end_net();
+ delete thd;
+ // channel will be closed by send_error_and_close_channel()
+ channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false);
+ DBUG_RETURN(true);
+ }
+
+ delete channel_info;
+
+ thd->set_new_thread_id();
+ thd->start_utime = my_micro_time();
+
+ threadpool_thds.insert(connection);
+ Global_THD_manager::get_instance()->add_thd(thd);
+
+ thd->scheduler.data = connection;
+
+ /* Assign connection to a group. */
+ thread_group_t *group = &all_groups[thd->thread_id() % group_count];
+
+ connection->thread_group = group;
+
+ if (thd->is_admin_connection()) {
+ my_thread_handle thread_id;
+ mysql_mutex_lock(&group->mutex);
+ int err = mysql_thread_create(key_worker_thread, &thread_id,
+ group->pthread_attr, admin_port_worker_main, connection);
+
+ if (err) {
+ set_my_errno(errno);
+ print_pool_blocked_message(false);
+ } else {
+ group->admin_port_thread_count++;
+ }
+ mysql_mutex_unlock(&group->mutex);
+ } else {
+ mysql_mutex_lock(&group->mutex);
+ group->connection_count++;
+ mysql_mutex_unlock(&group->mutex);
+
+ /*
+ Add connection to the work queue. Actual login
+ will be done by a worker thread.
+ */
+ queue_put(group, connection);
+ }
+
+ DBUG_RETURN(false);
+}
+
+/**
+ Terminate connection.
+*/
+static void connection_abort(connection_t *connection) {
+ DBUG_ENTER("connection_abort");
+ threadpool_thds.erase(connection);
+
+ thread_group_t *group = connection->thread_group;
+ bool is_admin_port = connection->thd->is_admin_connection();
+ threadpool_remove_connection(connection->thd);
+
+ if (!is_admin_port) {
+ mysql_mutex_lock(&group->mutex);
+ group->connection_count--;
+ mysql_mutex_unlock(&group->mutex);
+ }
+
+ my_free(connection);
+ DBUG_VOID_RETURN;
+}
+
+/**
+ Detach connection.
+*/
+static void connection_detach(connection_t *connection) {
+ DBUG_ENTER("connection_detach");
+ threadpool_thds.erase(connection);
+
+ thread_group_t *group = connection->thread_group;
+ bool is_admin_port = connection->thd->is_admin_connection();
+ Vio *const vio = connection->thd->get_protocol_classic()->get_vio();
+ const int fd = mysql_socket_getfd(vio->mysql_socket);
+ mysql_mutex_lock(&group->mutex);
+ io_poll_disassociate_fd(group->pollfd, fd);
+ connection->bound_to_poll_descriptor = false;
+ mysql_mutex_unlock(&group->mutex);
+
+ if (!is_admin_port) {
+ mysql_mutex_lock(&group->mutex);
+ group->connection_count--;
+ mysql_mutex_unlock(&group->mutex);
+ }
+
+ my_thread_handle thread_id;
+
+ if (mysql_thread_create(key_worker_thread, &thread_id, group->pthread_attr,
+ connection_detach_worker, connection->thd)) {
+ threadpool_remove_connection(connection->thd);
+ }
+
+ my_free(connection);
+ DBUG_VOID_RETURN;
+}
+
+
+static void *connection_detach_worker(void *param) {
+ my_thread_init();
+ DBUG_ENTER("connection_detach_worker");
+ THD *thd = static_cast<THD *>(param);
+ assert(thd != nullptr);
+ thread_attach(thd);
+
+ while (1) {
+ if (threadpool_process_request(thd)) {
+ break;
+ }
+ }
+
+ threadpool_remove_connection(thd);
+ /* Balance the my_thread_init() at the top of this function. */
+ my_thread_end();
+ return nullptr;
+}
+
+/**
+ MySQL scheduler callback : kill connection
+*/
+
+void tp_post_kill_notification(THD *thd) noexcept {
+ DBUG_ENTER("tp_post_kill_notification");
+ if (current_thd == thd || thd->system_thread) {
+ DBUG_VOID_RETURN;
+ }
+
+ Vio *vio = thd->get_protocol_classic()->get_vio();
+ if (vio) vio_cancel(vio, SHUT_RD);
+ DBUG_VOID_RETURN;
+}
+
+alignas(CPU_LEVEL1_DCACHE_LINESIZE) std::atomic<uint64_t> tp_waits[THD_WAIT_LAST];
+
+/**
+ MySQL scheduler callback: wait begin
+*/
+void tp_wait_begin(THD *thd, int type MY_ATTRIBUTE((unused))) {
+ DBUG_ENTER("tp_wait_begin");
+
+ if (thd == nullptr) {
+ DBUG_VOID_RETURN;
+ }
+
+ connection_t *connection = (connection_t *)thd->scheduler.data;
+
+ if (connection && connection->thd &&
+ !connection->thd->is_admin_connection()) {
+ assert(!connection->waiting);
+ connection->waiting = true;
+ assert(type > 0 && type < THD_WAIT_LAST);
+ tp_waits[type]++;
+ wait_begin(connection->thread_group);
+ }
+ DBUG_VOID_RETURN;
+}
+
+/**
+ MySQL scheduler callback: wait end
+*/
+
+void tp_wait_end(THD *thd) {
+ DBUG_ENTER("tp_wait_end");
+
+ if (thd == nullptr) {
+ DBUG_VOID_RETURN;
+ }
+ connection_t *connection = (connection_t *)thd->scheduler.data;
+
+ if (connection && connection->thd &&
+ !connection->thd->is_admin_connection()) {
+ assert(connection->waiting);
+ connection->waiting = false;
+ wait_end(connection->thread_group);
+ }
+ DBUG_VOID_RETURN;
+}
+
+static void set_next_timeout_check(ulonglong abstime) {
+ DBUG_ENTER("set_next_timeout_check");
+ while (abstime < pool_timer.next_timeout_check.load()) {
+ uint64 old = pool_timer.next_timeout_check.load();
+ pool_timer.next_timeout_check.compare_exchange_weak(old, abstime);
+ }
+ DBUG_VOID_RETURN;
+}
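+
+/*
+  The loop above is effectively an atomic "fetch-min": it only ever lowers
+  next_timeout_check, and a failed compare-exchange retries against the
+  freshly observed value. The same idea in generic form (illustration only):
+*/
+#if 0
+#include <atomic>
+
+static void atomic_store_min(std::atomic<unsigned long long> &target,
+                             unsigned long long value) {
+  unsigned long long cur = target.load();
+  /* compare_exchange_weak reloads cur on failure, so the bound is re-tested. */
+  while (value < cur && !target.compare_exchange_weak(cur, value)) {
+  }
+}
+#endif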
+
+inline ulong get_wait_timeout(THD *thd) noexcept {
+ return thd->variables.net_wait_timeout;
+}
+
+/**
+ Set wait timeout for connection.
+*/
+
+static void set_wait_timeout(connection_t *c) noexcept {
+ DBUG_ENTER("set_wait_timeout");
+ /*
+ Calculate wait deadline for this connection.
+ Instead of using my_microsecond_getsystime() which has a syscall
+ overhead, use pool_timer.current_microtime and take
+ into account that its value could be off by at most
+ one tick interval.
+ */
+
+ c->abs_wait_timeout =
+ pool_timer.current_microtime.load(std::memory_order_relaxed) +
+ 1000LL * pool_timer.tick_interval +
+ 1000000LL * get_wait_timeout(c->thd);
+
+ set_next_timeout_check(c->abs_wait_timeout);
+ DBUG_VOID_RETURN;
+}
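+
+/*
+  Worked example (assumed values): with tick_interval = 500 ms and
+  wait_timeout = 28800 s, the deadline becomes
+  current_microtime + 500 * 1000 + 28800 * 1000000 microseconds, i.e. the
+  connection may be killed up to one tick later than the exact wait_timeout,
+  in exchange for avoiding a syscall on every request.
+*/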
+
+/**
+ Handle a (rare) special case, where a connection needs to
+ migrate to a different group because group_count has changed
+ after the thread_pool_size setting was modified.
+*/
+
+static int change_group(connection_t *c, thread_group_t *old_group,
+ thread_group_t *new_group) {
+ assert(c->thread_group == old_group);
+
+ /* Remove connection from the old group. */
+ if (c->bound_to_poll_descriptor) {
+ Vio *const vio = c->thd->get_protocol_classic()->get_vio();
+ const int fd = mysql_socket_getfd(vio->mysql_socket);
+ mysql_mutex_lock(&old_group->mutex);
+ io_poll_disassociate_fd(old_group->pollfd, fd);
+ c->bound_to_poll_descriptor = false;
+ } else {
+ mysql_mutex_lock(&old_group->mutex);
+ }
+ c->thread_group->connection_count--;
+ mysql_mutex_unlock(&old_group->mutex);
+
+ /* Add connection to the new group. */
+ mysql_mutex_lock(&new_group->mutex);
+ c->thread_group = new_group;
+ new_group->connection_count++;
+ /* Ensure that there is a listener in the new group. */
+ int ret = 0;
+ if (!new_group->thread_count) ret = create_worker(new_group, false);
+ mysql_mutex_unlock(&new_group->mutex);
+ return ret;
+}
+
+static int start_io(connection_t *connection) {
+ /*
+ Usually, a connection will stay in the same group for its entire
+ life. However, we do allow group_count to change at runtime, which
+ means that in rare cases, after such a change, a connection may need
+ to migrate to another group to keep the load between groups equal.
+
+ So we recalculate in which group the connection should be, based
+ on thread_id and the current group count, and migrate if necessary.
+ */
+ thread_group_t *const group =
+ &all_groups[connection->thd->thread_id() % group_count];
+
+ if (group != connection->thread_group) {
+ if (change_group(connection, connection->thread_group, group)) return -1;
+ }
+
+ /*
+ Bind to poll descriptor if not yet done.
+ */
+ Vio *vio = connection->thd->get_protocol_classic()->get_vio();
+ int fd = mysql_socket_getfd(vio->mysql_socket);
+ if (!connection->bound_to_poll_descriptor) {
+ connection->bound_to_poll_descriptor = true;
+ return io_poll_associate_fd(group->pollfd, fd, connection);
+ }
+
+ return io_poll_start_read(group->pollfd, fd, connection);
+}
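+
+/*
+  Group selection is a plain modulo over the current group count:
+  thread_id % group_count. For example, thread_id 7 maps to group 3 while
+  group_count == 4, and to group 1 once group_count is lowered to 2, which
+  is exactly the migration case change_group() handles.
+*/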
+
+static void handle_event(connection_t *connection) {
+ DBUG_ENTER("handle_event");
+ int err = 0;
+
+ while (1) {
+ if (!connection->logged_in) {
+ err = threadpool_add_connection(connection->thd);
+ connection->logged_in = true;
+ } else {
+ err = threadpool_process_request(connection->thd);
+ }
+
+ if (err) {
+ goto end;
+ }
+
+ if (connection->thd == thd_to_detach) {
+ connection_detach(connection);
+ goto end_return;
+ }
+
+ set_wait_timeout(connection);
+
+ if (!connection_is_worker_continue(*connection)) {
+ break;
+ }
+ }
+
+ if (!connection->thd->is_admin_connection()) {
+ err = start_io(connection);
+ }
+
+end:
+ if (err || connection->thd->is_admin_connection()) {
+ connection_abort(connection);
+ }
+
+end_return:
+ DBUG_VOID_RETURN;
+}
+
+static void *admin_port_worker_main(void *param) {
+ my_thread_init();
+ DBUG_ENTER("admin_port_worker_main");
+
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ PSI_THREAD_CALL(set_thread_account)
+ (nullptr, 0, nullptr, 0);
+#endif
+
+ connection_t *connection = static_cast<connection_t *>(param);
+ assert(connection != nullptr);
+ assert(connection->thread_group != nullptr);
+ thread_group_t *group = connection->thread_group;
+
+ handle_event(connection);
+
+ mysql_mutex_lock(&group->mutex);
+ group->admin_port_thread_count--;
+ mysql_mutex_unlock(&group->mutex);
+
+ my_thread_end();
+ return nullptr;
+}
+
+/**
+ Worker thread's main
+*/
+static void *worker_main(void *param) {
+ my_thread_init();
+
+ DBUG_ENTER("worker_main");
+
+ thread_group_t *thread_group = static_cast<thread_group_t *>(param);
+ assert(thread_group != nullptr);
+
+ if (threadpool_sched_affinity) {
+ /* The pointer difference already yields the group index. */
+ group_affinity.bind_numa(thread_group - all_groups);
+ }
+
+ /* Init per-thread structure */
+ worker_thread_t this_thread;
+ mysql_cond_init(key_worker_cond, &this_thread.cond);
+ this_thread.thread_group = thread_group;
+ this_thread.event_count = 0;
+
+#ifdef HAVE_PSI_THREAD_INTERFACE
+ PSI_THREAD_CALL(set_thread_account)
+ (nullptr, 0, nullptr, 0);
+#endif
+
+ /* Run event loop */
+ for (;;) {
+ struct timespec ts;
+ set_timespec(&ts, threadpool_idle_timeout);
+ connection_t *connection = get_event(&this_thread, thread_group, &ts);
+
+ if (!connection) {
+ break;
+ }
+
+ this_thread.event_count++;
+ handle_event(connection);
+ }
+
+ /* Thread shutdown: cleanup per-worker-thread structure. */
+ mysql_cond_destroy(&this_thread.cond);
+
+ bool last_thread = false; /* last thread in group exits */
+ mysql_mutex_lock(&thread_group->mutex);
+ add_thread_count(thread_group, -1);
+ last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown);
+ mysql_mutex_unlock(&thread_group->mutex);
+
+ /* Last thread in group exits and pool is terminating, destroy group.*/
+ if (last_thread) {
+ thread_group_destroy(thread_group);
+ }
+
+ my_thread_end();
+ return nullptr;
+}
+
+bool tp_init() {
+ DBUG_ENTER("tp_init");
+ threadpool_started = true;
+ group_affinity.init();
+
+ for (uint i = 0; i < array_elements(all_groups); i++) {
+ thread_group_init(&all_groups[i], get_connection_attrib());
+ }
+ tp_set_threadpool_size(threadpool_size);
+ if (group_count == 0) {
+ /* Something went wrong */
+ sql_print_error("Can't set threadpool size to %d", threadpool_size);
+ DBUG_RETURN(true);
+ }
+#ifdef HAVE_PSI_INTERFACE
+ mysql_mutex_register("threadpool", mutex_list, array_elements(mutex_list));
+ mysql_cond_register("threadpool", cond_list, array_elements(cond_list));
+ mysql_thread_register("threadpool", thread_list, array_elements(thread_list));
+#endif
+
+ pool_timer.tick_interval = threadpool_stall_limit;
+ start_timer(&pool_timer);
+ DBUG_RETURN(false);
+}
+
+void tp_end_thread() {
+ if (!threadpool_started) {
+ return;
+ }
+
+ while (!threadpool_thds.empty()) {
+ my_sleep(10000);
+ }
+
+ stop_timer(&pool_timer);
+
+ for (uint i = 0; i < array_elements(all_groups); i++) {
+ thread_group_close(&all_groups[i]);
+ }
+
+ threadpool_started = false;
+}
+
+void tp_end() {
+ DBUG_ENTER("tp_end");
+ threadpool_thds.killConns();
+
+ std::thread exit_tp(tp_end_thread);
+ exit_tp.detach();
+ DBUG_VOID_RETURN;
+}
+
+/** Ensure that poll descriptors are created when threadpool_size changes */
+void tp_set_threadpool_size(uint size) noexcept {
+ if (!threadpool_started) return;
+
+ bool success = true;
+ for (uint i = 0; i < size; i++) {
+ thread_group_t *group = &all_groups[i];
+ mysql_mutex_lock(&group->mutex);
+ if (group->pollfd == -1) {
+ group->pollfd = io_poll_create();
+ success = (group->pollfd >= 0);
+ if (!success) {
+ sql_print_error("io_poll_create() failed, errno=%d\n", errno);
+ }
+ }
+ mysql_mutex_unlock(&all_groups[i].mutex);
+ if (!success) {
+ group_count = i;
+ return;
+ }
+ }
+ group_count = size;
+}
+
+void tp_set_threadpool_stall_limit(uint limit) noexcept {
+ if (!threadpool_started) {
+ return;
+ }
+
+ mysql_mutex_lock(&(pool_timer.mutex));
+ pool_timer.tick_interval = limit;
+ mysql_mutex_unlock(&(pool_timer.mutex));
+ mysql_cond_signal(&(pool_timer.cond));
+}
+
+/**
+ Calculate number of idle/waiting threads in the pool.
+
+ Sum idle threads over all groups.
+ Don't do any locking, it is not required for stats.
+*/
+int tp_get_idle_thread_count() noexcept {
+ int sum = 0;
+ for (uint i = 0;
+ i < array_elements(all_groups) && (all_groups[i].pollfd >= 0); i++) {
+ sum += (all_groups[i].thread_count - all_groups[i].active_thread_count);
+ }
+ return sum;
+}
+
+/* Report threadpool problems */
+
+/**
+ Delay in microseconds, after which the "pool blocked" message is printed.
+ (30 sec == 30 million usec)
+*/
+#define BLOCK_MSG_DELAY (30 * 1000000)
+
+#define MAX_THREADS_REACHED_MSG \
+ "Threadpool could not create additional thread to handle queries, because the \
+number of allowed threads was reached. Increasing 'thread_pool_max_threads' \
+parameter can help in this situation.\n \
+If 'admin_port' parameter is set, you can still connect to the database with \
+superuser account (it must be TCP connection using admin_port as TCP port) \
+and troubleshoot the situation. \
+A likely cause of pool blocks are clients that lock resources for long time. \
+'show processlist' or 'show engine innodb status' can give additional hints."
+
+#define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)."
+
+/**
+ Write a message when a blocking situation in the threadpool occurs.
+ The message is written only when the pool has been blocked for
+ BLOCK_MSG_DELAY (30 seconds).
+ It will be just a single message for each blocking situation (to prevent
+ log flood).
+*/
+static void print_pool_blocked_message(bool max_threads_reached) noexcept {
+ ulonglong now = my_microsecond_getsystime();
+ static bool msg_written = false;
+
+ if (pool_block_start == 0) {
+ pool_block_start = now;
+ msg_written = false;
+ }
+
+ if (!msg_written && ((now > pool_block_start + BLOCK_MSG_DELAY) ||
+ (now == pool_block_start))) {
+ if (max_threads_reached)
+ sql_print_error(MAX_THREADS_REACHED_MSG);
+ else
+ sql_print_error(CREATE_THREAD_ERROR_MSG, my_errno);
+
+ if (now > pool_block_start) {
+ sql_print_information("Threadpool has been blocked for %u seconds\n",
+ (uint)((now - pool_block_start) / 1000000));
+ }
+ /* avoid repeated messages for the same blocking situation */
+ msg_written = true;
+ }
+}
diff --git a/plugin/thread_pool/threadpool_unix.h b/plugin/thread_pool/threadpool_unix.h
new file mode 100644
index 00000000000..3c561f2da75
--- /dev/null
+++ b/plugin/thread_pool/threadpool_unix.h
@@ -0,0 +1,135 @@
+/* Copyright (C) 2012 Monty Program Ab
+ Copyright (C) 2022 Huawei Technologies Co., Ltd
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+ USA */
+
+#ifndef THREADPOOL_UNIX_H_
+#define THREADPOOL_UNIX_H_
+
+#include "mysql/service_thd_wait.h"
+#include "sql/sql_plist.h"
+#include "sql/mysqld.h"
+#include "threadpool.h"
+#include "violite.h"
+#include "numa_affinity_manager.h"
+
+#ifdef __linux__
+#include <sys/epoll.h>
+typedef struct epoll_event native_event;
+#endif
+#if defined(__FreeBSD__) || defined(__APPLE__)
+#include <sys/event.h>
+typedef struct kevent native_event;
+#endif
+#if defined(__sun)
+#include <port.h>
+typedef port_event_t native_event;
+#endif
+
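+/* my_getsystime() returns time in 100 ns units, so dividing by 10 yields
+   microseconds. */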
+#define my_microsecond_getsystime() (my_getsystime()/10)
+
+struct thread_group_t;
+
+/* Per-thread structure for workers */
+struct worker_thread_t {
+ ulonglong event_count; /* number of requests handled by this thread */
+ thread_group_t *thread_group;
+ worker_thread_t *next_in_list;
+ worker_thread_t **prev_in_list;
+
+ mysql_cond_t cond;
+ bool woken;
+};
+
+typedef I_P_List<
+ worker_thread_t,
+ I_P_List_adapter<worker_thread_t, &worker_thread_t::next_in_list,
+ &worker_thread_t::prev_in_list>>
+ worker_list_t;
+
+struct connection_t {
+ THD *thd;
+ thread_group_t *thread_group;
+ connection_t *next_in_queue;
+ connection_t **prev_in_queue;
+ ulonglong abs_wait_timeout;
+ ulonglong enqueue_time;
+ bool logged_in;
+ bool bound_to_poll_descriptor;
+ bool waiting;
+ uint tickets;
+};
+
+typedef I_P_List<connection_t,
+ I_P_List_adapter<connection_t, &connection_t::next_in_queue,
+ &connection_t::prev_in_queue>,
+ I_P_List_counter, I_P_List_fast_push_back<connection_t>>
+ connection_queue_t;
+
+const int NQUEUES = 2; /* We have high and low priority queues */
+
+enum operation_origin
+{
+ WORKER,
+ LISTENER
+};
+
+struct thread_group_counters_t
+{
+ ulonglong thread_creations;
+ ulonglong thread_creations_due_to_stall;
+ ulonglong wakes;
+ ulonglong wakes_due_to_stall;
+ ulonglong throttles;
+ ulonglong stalls;
+ ulonglong dequeues[2];
+ ulonglong polls[2];
+};
+
+struct alignas(128) thread_group_t {
+ mysql_mutex_t mutex;
+ connection_queue_t queue;
+ connection_queue_t high_prio_queue;
+ worker_list_t waiting_threads;
+ worker_thread_t *listener;
+ pthread_attr_t *pthread_attr;
+ int pollfd;
+ int thread_count;
+ int admin_port_thread_count;
+ int dump_thread_count;
+ int active_thread_count;
+ int connection_count;
+ int waiting_thread_count;
+ /* Stats for the deadlock detection timer routine.*/
+ int io_event_count;
+ int queue_event_count;
+ ulonglong last_thread_creation_time;
+ int shutdown_pipe[2];
+ bool shutdown;
+ bool stalled;
+ thread_group_counters_t counters;
+ char padding[320 - sizeof(thread_group_counters_t)];
+};
+
+static_assert(sizeof(thread_group_t) == 512,
+ "sizeof(thread_group_t) must be 512 to avoid false sharing");
+
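+/*
+  Sizing note: thread_group_counters_t holds 10 ulonglong counters (80
+  bytes), so counters plus padding occupy exactly 320 bytes, leaving 192
+  bytes for the preceding members; the static_assert above pins the total
+  at 512 bytes, a multiple of both 64- and 128-byte cache lines.
+*/
+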
+#define TP_INCREMENT_GROUP_COUNTER(group, var) do { (group)->counters.var++; } while (0)
+
+extern thread_group_t all_groups[MAX_THREAD_GROUPS];
+extern numa_affinity_manager group_affinity;
+
+#endif // THREADPOOL_UNIX_H_
+