郑州微信网站建设,高德地图搜索不到国外,wordpress宽度,上海科技网站建设注#xff1a;该文用于个人学习记录和知识交流#xff0c;如有不足#xff0c;欢迎指点。连接池有很多种#xff0c;这里介绍的是数据库连接池一、连接池是什么#xff1f;维持管理一定数量连接的池式结构维持#xff1a;不断开连接管理#xff1a;定时发送PING包给Mysq…注该文用于个人学习记录和知识交流如有不足欢迎指点。连接池有很多种这里介绍的是数据库连接池一、连接池是什么维持管理一定数量连接的池式结构维持不断开连接管理定时发送PING包给Mysql防止Mysql断开连接连接的生命周期二、连接池解决了什么问题复用资源提升并发处理sql的能力多个连接在Mysql中对应多个连接处理线程意思就是说多个连接同时发送请求mysql也能处理得过来 - 提升了服务器处理sql的能力。三、连接池的实现同步连接池res db-Query(SQLStr)应用场景服务器初始化时从数据库获取必要的数据。会阻塞发起请求的线程一个连接对应一把锁。发起请求时尝试获取锁利用当前连接访问mysql完毕后释放锁异步连接池db-AsynQuery(SQLStr, callback) 通过callback回调函数接收数据库的返回值应用场景服务器的业务处理不会阻塞发起请求的线程将请求插入消息队列n个连接对应n个线程 io密集型 一般创建 2 * cpu 核心数的连接n个线程从消息队列里面提取请求执行四、Mysql c/c 的驱动接口库驱动实现的内容连接、发送、接收基于mysql协议libmysqlclient 纯 c 实现libmysqlcppconn c 实现注意内部使用了异常机制安装sudo apt-get install libmysqlcppconn-dev阻塞 io五、异步连接池的代码实现知识点1. 调用回调函数的时机future、promise实现2. 工作线程跟连接循环包含的解决头文件中类定义内部含有别的类的时候不要用头文件而是用声明3. 移动构造的使用各种类的实现1. 连接池MySQLConnPool单例相同数据库名连接数组任务队列#pragma once #include vector #include memory #include functional #include cppconn/resultset.h #include unordered_map #include QueryCallback.h namespace sql { class ResultSet; } class MySQLConn; template typename T class BlockingQueue; class SQLOperation; class MySQLConnPool { public: static MySQLConnPool *GetInstance(const std::string db); void InitPool(const std::string url, int pool_size); QueryCallback Query(const std::string sql, std::functionvoid(std::unique_ptrsql::ResultSet) cb); private: MySQLConnPool(const std::string db) : database_(db) {} ~MySQLConnPool(); std::string database_; std::vectorMySQLConn * pool_; static std::unordered_mapstd::string, MySQLConnPool * instances_; // 静态变量确保所有对象都共用一个instances_ BlockingQueueSQLOperation * *task_queue_; };#include MySQLConnPool.h #include MySQLConn.h #include SQLOperation.h #include QueryCallback.h #include cppconn/resultset.h #include BlockingQueue.h std::unordered_mapstd::string, MySQLConnPool * MySQLConnPool::instances_; MySQLConnPool *MySQLConnPool::GetInstance(const std::string db) { if (instances_.find(db) instances_.end()) { instances_[db] new MySQLConnPool(db); } return instances_[db]; } void MySQLConnPool::InitPool(const std::string url, int pool_size) { task_queue_ new BlockingQueueSQLOperation *(); for (int i 0; i pool_size; i) { MySQLConn *conn new MySQLConn(url, database_, *task_queue_); conn-Open(); pool_.push_back(conn); } } MySQLConnPool::~MySQLConnPool() { if (task_queue_) task_queue_-Cancel(); for (auto conn : pool_) { delete conn; } if (task_queue_) { delete task_queue_; task_queue_ nullptr; } pool_.clear(); } QueryCallback MySQLConnPool::Query(const std::string sql, std::functionvoid(std::unique_ptrsql::ResultSet) cb) { SQLOperation *op new SQLOperation(sql); auto future op-GetFuture(); task_queue_-Push(op); return QueryCallback(std::move(future), std::move(cb)); }2. 连接项MySQLConn工作线程连接的信息#pragma once #include SQLOperation.h #include string namespace sql { class Driver; class Connection; class SQLException; class ResultSet; } class MySQLWorker; template typename T class BlockingQueue; class SQLOperation; struct MySQLConnInfo { explicit MySQLConnInfo(const std::string info, const std::string db); std::string user; std::string password; std::string database; std::string url; }; class MySQLConn { public: MySQLConn(const std::string info, const std::string db, BlockingQueueSQLOperation * task_queue); ~MySQLConn(); int Open(); void Close(); sql::ResultSet* Query(const std::string sql); private: void HandlerException(sql::SQLException e); sql::Driver *driver_; sql::Connection *conn_; MySQLWorker *worker_; MySQLConnInfo info_; };#include MySQLConn.h #include QueryCallback.h #include MySQLWorker.h #include BlockingQueue.h #include cppconn/driver.h #include cppconn/connection.h #include cppconn/exception.h #include cppconn/statement.h #include cppconn/resultset.h #include vector #include string // tcp://127.0.0.1:3306;root;123456 static std::vectorstd::string_view Tokenize(std::string_view str, char sep, bool keepEmpty) { std::vectorstd::string_view tokens; size_t start 0; for (size_t end str.find(sep); end ! std::string_view::npos; end str.find(sep, start)) { if (keepEmpty || (start end)) tokens.push_back(str.substr(start, end - start)); start end 1; } if (keepEmpty || (start str.length())) tokens.push_back(str.substr(start)); return tokens; } MySQLConnInfo::MySQLConnInfo(const std::string info, const std::string db) { auto tokens Tokenize(info, ;, false); if (tokens.size() ! 3) return; url.assign(tokens[0]); user.assign(tokens[1]); password.assign(tokens[2]); database.assign(db); } MySQLConn::MySQLConn(const std::string info, const std::string db, BlockingQueueSQLOperation * task_queue) : info_(info, db) { worker_ new MySQLWorker(this, task_queue); worker_-Start(); } MySQLConn::~MySQLConn() { if (worker_) { worker_-Stop(); delete worker_; worker_ nullptr; } if (conn_) { delete conn_; } } int MySQLConn::Open() { int err 0; try { driver_ get_driver_instance(); conn_ driver_-connect(info_.url, info_.user, info_.password); if (!conn_) { return -1; } conn_-setSchema(info_.database); } catch (sql::SQLException e) { HandlerException(e); err e.getErrorCode(); } return err; } void MySQLConn::Close() { if (conn_) { conn_-close(); delete conn_; conn_ nullptr; } } sql::ResultSet* MySQLConn::Query(const std::string sql) { try { sql::Statement *stmt conn_-createStatement(); return stmt-executeQuery(sql); } catch (sql::SQLException e) { HandlerException(e); } return nullptr; } void MySQLConn::HandlerException(sql::SQLException e) { if (e.getErrorCode() ! 0) { std::cerr # ERR: SQLException in __FILE__; std::cerr ( __FUNCTION__ ) on line __LINE__ std::endl; std::cerr # ERR: e.what(); std::cerr (MySQL error code: e.getErrorCode(); std::cerr , SQLState: e.getSQLState() ) std::endl; } }3. 工作线程MySQLWorker连接项任务队列#pragma once #include thread class MySQLConn; template typename T class BlockingQueue; class SQLOperation; class MySQLWorker { public: MySQLWorker(MySQLConn *conn, BlockingQueueSQLOperation * task_queue); ~MySQLWorker(); void Start(); void Stop(); private: void Worker(); MySQLConn *conn_; std::thread worker_; BlockingQueueSQLOperation * task_queue_; };#include MySQLWorker.h #include BlockingQueue.h #include SQLOperation.h #include MySQLConn.h MySQLWorker::MySQLWorker(MySQLConn *conn, BlockingQueueSQLOperation * task_queue) : conn_(conn), task_queue_(task_queue) { } MySQLWorker::~MySQLWorker() { Stop(); } void MySQLWorker::Start() { worker_ std::thread(MySQLWorker::Worker, this); } void MySQLWorker::Stop() { if (worker_.joinable()) { worker_.join(); } } void MySQLWorker::Worker() { while (true) { SQLOperation *op nullptr; if (!task_queue_.Pop(op)) { break; } op-Execute(conn_); delete op; } }4 .请求项SQLOperation插入到任务队列请求字符串promise: 存储请求结果传给future#pragma once #include string #include future #include memory #include cppconn/resultset.h namespace sql { class ResultSet; } class MySQLConn; class SQLOperation { public: explicit SQLOperation(const std::string sql) : sql_(sql) {} void Execute(MySQLConn *conn); std::futurestd::unique_ptrsql::ResultSet GetFuture() { return promise_.get_future(); } private: std::string sql_; std::promisestd::unique_ptrsql::ResultSet promise_; };#include SQLOperation.h #include MySQLConn.h void SQLOperation::Execute(MySQLConn *conn) { auto result conn-Query(sql_); promise_.set_value(std::unique_ptrsql::ResultSet(result)); }5. 回调项QueryCallbackfuture获得sql的执行结果回调函数当future有数据时执行的函数#pragma once #include future #include functional #include memory #include cppconn/resultset.h namespace sql { class ResultSet; } class QueryCallback { public: QueryCallback(std::futurestd::unique_ptrsql::ResultSet future, std::functionvoid(std::unique_ptrsql::ResultSet) cb) : future_(std::move(future)), cb_(std::move(cb)) { } bool InvokeIfReady() { if (future_.wait_for(std::chrono::seconds(0)) std::future_status::ready) { cb_(std::move(future_.get())); return true; } return false; } private: std::futurestd::unique_ptrsql::ResultSet future_; std::functionvoid(std::unique_ptrsql::ResultSet) cb_; };6. 统筹所有回调项的执行AsyncProcessor回调项数组#pragma once #include vector #include mutex class QueryCallback; class AsyncProcessor { public: void AddQueryCallback(QueryCallback query_callback); // QueryCallback含有成员future、function两个成员。前者不可拷贝深拷贝也不行后者采用深拷贝执行时间长。综上这里采用移动拷贝 /* 移动构造的本质是「资源所有权转移」 仅修改指针 / 引用的指向比如把 std::function 内部的可调用对象指针从传入的 query_callback 转移到容器的元素中 几乎无开销O (1) 操作远优于拷贝构造的 O (n) 深拷贝。 */ void InvokeIfReady(); private: std::vectorQueryCallback pending_queries_; };#include AsyncProcessor.h #include QueryCallback.h void AsyncProcessor::AddQueryCallback(QueryCallback query_callback) { pending_queries_.emplace_back(std::move(query_callback)); } void AsyncProcessor::InvokeIfReady() { for (auto it pending_queries_.begin(); it ! pending_queries_.end();) { if (it-InvokeIfReady()) it pending_queries_.erase(it); else it; } }总的运作流程创建连接池传入要连接的数据库初始化连接池任务队列创建多条连接创建、工作线程启动发起请求Query创建请求项插入任务队列基于promise得到future然后创建并返回回调项将回调项插入AsyncProcessor中让其调度回调项中回调函数的执行时机工作线程从任务队列中取出请求项执行并将结果赋值给promise pomise会通知future。即futrue.wait_for()为ready -- 回调项调用回调函数。测试用例#include MySQLConnPool.h #include AsyncProcessor.h #include cppconn/resultset.h #include iostream #include thread #include chrono /* g AsyncProcessor.cpp main.cpp MySQLConn.cpp MySQLConnPool.cpp MySQLWorker.cpp SQLOperation.cpp -o main -lpthread -lmysqlcppconn -stdc17 -g */ void HandleQueryResult(std::unique_ptrsql::ResultSet res) { while (res-next()) { std::cout U_ID: res-getInt(U_ID) U_NAME: res-getString(U_NAME) std::endl; } } int main() { MySQLConnPool *pool1 MySQLConnPool::GetInstance(DCF_DB); pool1-InitPool(tcp://127.0.0.1:3306;root;123456, 10); AsyncProcessor response_handler; auto query_callback1 pool1-Query(SELECT * FROM TBL_USER, HandleQueryResult); response_handler.AddQueryCallback(std::move(query_callback1)); while (true) { response_handler.InvokeIfReady(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } return 0; }