news 2026/6/10 10:20:16

Linux C/C++ 学习日记(50):连接池

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Linux C/C++ 学习日记(50):连接池

注:该文用于个人学习记录和知识交流,如有不足,欢迎指点。

连接池有很多种,这里介绍的是数据库连接池

一、连接池是什么?

  • 维持管理一定数量连接的池式结构
  • 维持:不断开连接
  • 管理:定时发送PING包给Mysql,防止Mysql断开连接

连接的生命周期:

二、连接池解决了什么问题?

  • 复用资源,提升并发处理sql的能力
  • 多个连接在Mysql中对应多个连接处理线程:意思就是说,多个连接同时发送请求,mysql也能处理得过来 -> 提升了服务器处理sql的能力。

三、连接池的实现

同步连接池:

res = db->Query(SQLStr)

应用场景:服务器初始化时(从数据库获取必要的数据)。

  1. 会阻塞发起请求的线程:
  2. 一个连接对应一把锁。
  3. 发起请求时尝试获取锁,利用当前连接访问mysql,完毕后释放锁

异步连接池:

db->AsynQuery(SQLStr, callback) 通过callback回调函数接收数据库的返回值

应用场景:服务器的业务处理

  1. 不会阻塞发起请求的线程
  2. 将请求插入消息队列
  3. n个连接对应n个线程 ( io密集型: 一般创建 2 * cpu 核心数的连接)
  4. n个线程从消息队列里面提取请求执行

四、Mysql c/c++ 的驱动(接口库)

驱动实现的内容:连接、发送、接收(基于mysql协议)

  • libmysqlclient 纯 c 实现
  • libmysqlcppconn c++ 实现(注意:内部使用了异常机制)
    安装:sudo apt-get install libmysqlcppconn-dev
  • 阻塞 io


五、异步连接池的代码实现

知识点:

1. 调用回调函数的时机:

future、promise实现

2. 工作线程跟连接循环包含的解决:
头文件中:类定义内部含有别的类的时候不要用头文件,而是用声明

3. 移动构造的使用

各种类的实现:

1. 连接池:MySQLConnPool

  • 单例(相同数据库名)
  • 连接数组
  • 任务队列
#pragma once #include <vector> #include <memory> #include <functional> #include <cppconn/resultset.h> #include <unordered_map> #include "QueryCallback.h" namespace sql { class ResultSet; } class MySQLConn; template <typename T> class BlockingQueue; class SQLOperation; class MySQLConnPool { public: static MySQLConnPool *GetInstance(const std::string &db); void InitPool(const std::string &url, int pool_size); QueryCallback Query(const std::string &sql, std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb); private: MySQLConnPool(const std::string &db) : database_(db) {} ~MySQLConnPool(); std::string database_; std::vector<MySQLConn *> pool_; static std::unordered_map<std::string, MySQLConnPool *> instances_; // 静态变量:确保所有对象都共用一个instances_ BlockingQueue<SQLOperation *> *task_queue_; };
#include "MySQLConnPool.h" #include "MySQLConn.h" #include "SQLOperation.h" #include "QueryCallback.h" #include <cppconn/resultset.h> #include "BlockingQueue.h" std::unordered_map<std::string, MySQLConnPool *> MySQLConnPool::instances_; MySQLConnPool *MySQLConnPool::GetInstance(const std::string &db) { if (instances_.find(db) == instances_.end()) { instances_[db] = new MySQLConnPool(db); } return instances_[db]; } void MySQLConnPool::InitPool(const std::string &url, int pool_size) { task_queue_ = new BlockingQueue<SQLOperation *>(); for (int i = 0; i < pool_size; ++i) { MySQLConn *conn = new MySQLConn(url, database_, *task_queue_); conn->Open(); pool_.push_back(conn); } } MySQLConnPool::~MySQLConnPool() { if (task_queue_) task_queue_->Cancel(); for (auto conn : pool_) { delete conn; } if (task_queue_) { delete task_queue_; task_queue_ = nullptr; } pool_.clear(); } QueryCallback MySQLConnPool::Query(const std::string &sql, std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb) { SQLOperation *op = new SQLOperation(sql); auto future = op->GetFuture(); task_queue_->Push(op); return QueryCallback(std::move(future), std::move(cb)); }

2. 连接项:MySQLConn

  • 工作线程
  • 连接的信息
#pragma once #include "SQLOperation.h" #include <string> namespace sql { class Driver; class Connection; class SQLException; class ResultSet; } class MySQLWorker; template <typename T> class BlockingQueue; class SQLOperation; struct MySQLConnInfo { explicit MySQLConnInfo(const std::string &info, const std::string &db); std::string user; std::string password; std::string database; std::string url; }; class MySQLConn { public: MySQLConn(const std::string &info, const std::string &db, BlockingQueue<SQLOperation *> &task_queue); ~MySQLConn(); int Open(); void Close(); sql::ResultSet* Query(const std::string &sql); private: void HandlerException(sql::SQLException &e); sql::Driver *driver_; sql::Connection *conn_; MySQLWorker *worker_; MySQLConnInfo info_; };
#include "MySQLConn.h" #include "QueryCallback.h" #include "MySQLWorker.h" #include "BlockingQueue.h" #include <cppconn/driver.h> #include <cppconn/connection.h> #include <cppconn/exception.h> #include <cppconn/statement.h> #include <cppconn/resultset.h> #include <vector> #include <string> // "tcp://127.0.0.1:3306;root;123456" static std::vector<std::string_view> Tokenize(std::string_view str, char sep, bool keepEmpty) { std::vector<std::string_view> tokens; size_t start = 0; for (size_t end = str.find(sep); end != std::string_view::npos; end = str.find(sep, start)) { if (keepEmpty || (start < end)) tokens.push_back(str.substr(start, end - start)); start = end + 1; } if (keepEmpty || (start < str.length())) tokens.push_back(str.substr(start)); return tokens; } MySQLConnInfo::MySQLConnInfo(const std::string &info, const std::string &db) { auto tokens = Tokenize(info, ';', false); if (tokens.size() != 3) return; url.assign(tokens[0]); user.assign(tokens[1]); password.assign(tokens[2]); database.assign(db); } MySQLConn::MySQLConn(const std::string &info, const std::string &db, BlockingQueue<SQLOperation *> &task_queue) : info_(info, db) { worker_ = new MySQLWorker(this, task_queue); worker_->Start(); } MySQLConn::~MySQLConn() { if (worker_) { worker_->Stop(); delete worker_; worker_ = nullptr; } if (conn_) { delete conn_; } } int MySQLConn::Open() { int err = 0; try { driver_ = get_driver_instance(); conn_ = driver_->connect(info_.url, info_.user, info_.password); if (!conn_) { return -1; } conn_->setSchema(info_.database); } catch (sql::SQLException &e) { HandlerException(e); err = e.getErrorCode(); } return err; } void MySQLConn::Close() { if (conn_) { conn_->close(); delete conn_; conn_ = nullptr; } } sql::ResultSet* MySQLConn::Query(const std::string &sql) { try { sql::Statement *stmt = conn_->createStatement(); return stmt->executeQuery(sql); } catch (sql::SQLException &e) { HandlerException(e); } return nullptr; } void MySQLConn::HandlerException(sql::SQLException &e) { if (e.getErrorCode() != 0) { std::cerr << "# ERR: SQLException in " << __FILE__; std::cerr << "(" << __FUNCTION__ << ") on line " << __LINE__ << std::endl; std::cerr << "# ERR: " << e.what(); std::cerr << " (MySQL error code: " << e.getErrorCode(); std::cerr << ", SQLState: " << e.getSQLState() << " )" << std::endl; } }

3. 工作线程:MySQLWorker

  • 连接项
  • 任务队列
#pragma once #include <thread> class MySQLConn; template <typename T> class BlockingQueue; class SQLOperation; class MySQLWorker { public: MySQLWorker(MySQLConn *conn, BlockingQueue<SQLOperation *> &task_queue); ~MySQLWorker(); void Start(); void Stop(); private: void Worker(); MySQLConn *conn_; std::thread worker_; BlockingQueue<SQLOperation *> &task_queue_; };
#include "MySQLWorker.h" #include "BlockingQueue.h" #include "SQLOperation.h" #include "MySQLConn.h" MySQLWorker::MySQLWorker(MySQLConn *conn, BlockingQueue<SQLOperation *> &task_queue) : conn_(conn), task_queue_(task_queue) { } MySQLWorker::~MySQLWorker() { Stop(); } void MySQLWorker::Start() { worker_ = std::thread(&MySQLWorker::Worker, this); } void MySQLWorker::Stop() { if (worker_.joinable()) { worker_.join(); } } void MySQLWorker::Worker() { while (true) { SQLOperation *op = nullptr; if (!task_queue_.Pop(op)) { break; } op->Execute(conn_); delete op; } }

4 .请求项:SQLOperation

插入到任务队列

  • 请求字符串
  • promise: 存储请求结果,传给future
#pragma once #include <string> #include <future> #include <memory> #include <cppconn/resultset.h> namespace sql { class ResultSet; } class MySQLConn; class SQLOperation { public: explicit SQLOperation(const std::string &sql) : sql_(sql) {} void Execute(MySQLConn *conn); std::future<std::unique_ptr<sql::ResultSet>> GetFuture() { return promise_.get_future(); } private: std::string sql_; std::promise<std::unique_ptr<sql::ResultSet>> promise_; };
#include "SQLOperation.h" #include "MySQLConn.h" void SQLOperation::Execute(MySQLConn *conn) { auto result = conn->Query(sql_); promise_.set_value(std::unique_ptr<sql::ResultSet>(result)); }

5. 回调项:QueryCallback

  • future:获得sql的执行结果
  • 回调函数:当future有数据时,执行的函数
#pragma once #include <future> #include <functional> #include <memory> #include <cppconn/resultset.h> namespace sql { class ResultSet; } class QueryCallback { public: QueryCallback(std::future<std::unique_ptr<sql::ResultSet>> &&future, std::function<void(std::unique_ptr<sql::ResultSet>)> &&cb) : future_(std::move(future)), cb_(std::move(cb)) { } bool InvokeIfReady() { if (future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { cb_(std::move(future_.get())); return true; } return false; } private: std::future<std::unique_ptr<sql::ResultSet>> future_; std::function<void(std::unique_ptr<sql::ResultSet>)> cb_; };

6. 统筹所有回调项的执行:AsyncProcessor

  • 回调项数组
#pragma once #include <vector> #include <mutex> class QueryCallback; class AsyncProcessor { public: void AddQueryCallback(QueryCallback &&query_callback); // QueryCallback含有成员future、function两个成员。前者不可拷贝(深拷贝也不行),后者采用深拷贝(执行时间长)。综上这里采用移动拷贝 /* 移动构造的本质是「资源所有权转移」: 仅修改指针 / 引用的指向(比如把 std::function 内部的可调用对象指针从传入的 query_callback 转移到容器的元素中); 几乎无开销(O (1) 操作),远优于拷贝构造的 O (n) 深拷贝。 */ void InvokeIfReady(); private: std::vector<QueryCallback> pending_queries_; };
#include "AsyncProcessor.h" #include "QueryCallback.h" void AsyncProcessor::AddQueryCallback(QueryCallback &&query_callback) { pending_queries_.emplace_back(std::move(query_callback)); } void AsyncProcessor::InvokeIfReady() { for (auto it = pending_queries_.begin(); it != pending_queries_.end();) { if (it->InvokeIfReady()) it = pending_queries_.erase(it); else ++it; } }

总的运作流程:

  1. 创建连接池(传入要连接的数据库)
  2. 初始化连接池:任务队列创建,多条连接创建、工作线程启动
  3. 发起请求Query:创建请求项插入任务队列,基于promise得到future,然后创建并返回回调项
  4. 将回调项插入AsyncProcessor中,让其调度回调项中回调函数的执行时机
  5. 工作线程从任务队列中取出请求项执行,并将结果赋值给promise, pomise会通知future。
    即futrue.wait_for()为ready --> 回调项调用回调函数。

测试用例:

#include "MySQLConnPool.h" #include "AsyncProcessor.h" #include <cppconn/resultset.h> #include <iostream> #include <thread> #include <chrono> /* g++ AsyncProcessor.cpp main.cpp MySQLConn.cpp MySQLConnPool.cpp MySQLWorker.cpp SQLOperation.cpp -o main -lpthread -lmysqlcppconn -std=c++17 -g */ void HandleQueryResult(std::unique_ptr<sql::ResultSet> res) { while (res->next()) { std::cout << "U_ID: " << res->getInt("U_ID") << " U_NAME: " << res->getString("U_NAME") << std::endl; } } int main() { MySQLConnPool *pool1 = MySQLConnPool::GetInstance("DCF_DB"); pool1->InitPool("tcp://127.0.0.1:3306;root;123456", 10); AsyncProcessor response_handler; auto query_callback1 = pool1->Query("SELECT * FROM TBL_USER", HandleQueryResult); response_handler.AddQueryCallback(std::move(query_callback1)); while (true) { response_handler.InvokeIfReady(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } return 0; }
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/5 1:31:11

GSE插件宏编辑终极指南:版本升级完整教程

GSE插件宏编辑终极指南&#xff1a;版本升级完整教程 【免费下载链接】GSE-Advanced-Macro-Compiler GSE is an alternative advanced macro editor and engine for World of Warcraft. It uses Travis for UnitTests, Coveralls to report on test coverage and the Curse pac…

作者头像 李华
网站建设 2026/6/9 1:06:10

DeepSeek-V3推理性能实战调优:从延迟瓶颈到吞吐量巅峰

你是否曾经在深夜盯着监控面板&#xff0c;看着P99延迟曲线不断攀升而束手无策&#xff1f;或者面对昂贵的GPU集群&#xff0c;却发现利用率始终无法突破60%&#xff1f;这些正是大模型推理优化中最常见的痛点。本文将带你深入DeepSeek-V3的性能调优实战&#xff0c;帮你找到那…

作者头像 李华
网站建设 2026/6/8 13:36:36

时间序列数据增强实战:5大技巧让模型性能飙升200%

还在为时间序列数据样本不足而头疼吗&#xff1f;&#x1f914; 作为技术决策者和一线开发工程师&#xff0c;我们都知道数据质量直接影响模型效果。今天就来聊聊Time-Series-Library项目中那些让模型性能翻倍的数据增强黑科技&#xff01; 【免费下载链接】Time-Series-Librar…

作者头像 李华
网站建设 2026/6/9 1:59:00

Wan2.1 GP 视频生成工具完全使用指南

Wan2.1 GP 视频生成工具完全使用指南 【免费下载链接】Wan2GP Wan 2.1 for the GPU Poor 项目地址: https://gitcode.com/gh_mirrors/wa/Wan2GP Wan2.1 GP 是一款专为消费级GPU优化的开源视频生成工具&#xff0c;它让普通用户也能轻松制作AI视频。无论你是内容创作者、…

作者头像 李华
网站建设 2026/6/3 9:47:29

零门槛部署Lucky:让你的设备轻松拥有公网访问能力

零门槛部署Lucky&#xff1a;让你的设备轻松拥有公网访问能力 【免费下载链接】lucky 软硬路由公网神器,ipv6/ipv4 端口转发,反向代理,DDNS,WOL,ipv4 stun内网穿透,cron,acme,阿里云盘,ftp,webdav,filebrowser 项目地址: https://gitcode.com/GitHub_Trending/luc/lucky …

作者头像 李华