简简单单 Online zuozuo :本心、输入输出、结果
文章目录
- 使用 Logstash 进行数据摄取:从 PostgreSQL 到 Elasticsearch
- 前言
- 1、什么是 Logstash
- 2、Windows 下安装 Logstash
- 3、安装 JDBC 驱动并创建管道配置
- 4、Filter 与 Output 说明
- 5、运行管道与验证
- 6、方案优点
- 7、方案缺点与适用场景
使用 Logstash 进行数据摄取:从 PostgreSQL 到 Elasticsearch
编辑 | 简简单单 Online zuozuo
地址 | https://blog.csdn.net/qq_15071263
如果觉得本文对你有帮助,欢迎关注、点赞、收藏、评论,谢谢
前言
本文介绍如何使用 Logstash 将 PostgreSQL 中的数据同步到 Elasticsearch,涵盖 Logstash 的基本概念、在 Windows 下的安装步骤、基于 JDBC 的增量摄取管道配置,以及 Input、Filter、Output 各阶段的说明。文末会总结该方案的优缺点与适用场景,便于你在实际项目中选型与落地。
#Logstash #PostgreSQL #Elasticsearch #数据摄取 #JDBC #ELK #数据同步
1、什么是 Logstash
Logstash 是 Elastic 旗下的开源数据处理管道,用于从各类数据源采集、转换并输出到不同目标,例如 Elasticsearch、Kafka、平面文件等。
一条 Logstash 管道通常包含三个阶段:
- Input(输入):数据来源,负责从数据源拉取待摄取的数据。
- Filter(过滤):对数据进行清洗、聚合、解析等转换,常用插件包括 Grok、Mutate、Date 等。
- Output(输出):数据写入的目标,如 Elasticsearch、文件、数据库等。
将数据通过 Logstash 写入 Elastic 前,需满足以下前置条件:
- 本机已安装 Logstash,并配备 PostgreSQL 的 JDBC 驱动。
- 具备可用的 PostgreSQL 数据库,且存在需要同步的表或可通过函数查询的数据。
- 已有运行中的 Elasticsearch 实例。
2、Windows 下安装 Logstash
下面简要说明在本地安装并运行 Logstash 的步骤。
安装 Java
从 Oracle 官网 下载 JDK(Java 8 或更高版本),解压到指定目录。解压完成后,需要配置环境变量,以便系统识别 Java 命令:新建环境变量JAVA_HOME,指向 JDK 安装目录,并在系统Path中追加%JAVA_HOME%\bin。在命令行中执行以下命令可验证是否安装成功:
java -version若配置正确,会输出当前 Java 版本信息。
安装 Logstash
从 Elastic 官网 下载 Logstash 安装包并解压到指定目录。在命令行中进入 Logstash 解压目录下的bin目录,执行:
bin\logstash -e"input { stdin { } } output { stdout { } }"若出现正常启动并等待 stdin 输入的提示,说明 Logstash 已能本地运行。
3、安装 JDBC 驱动并创建管道配置
安装 PostgreSQL JDBC 驱动
从 PostgreSQL 官网 下载 PostgreSQL JDBC 驱动,将得到的.jar文件放到 Logstash 可访问的目录(例如 Logstash 安装目录下的lib或单独目录)。
创建 Logstash 管道配置文件
下面是一份用于增量摄取的示例管道配置:管道会记录上次运行位置,并按调度(如每分钟)只摄取自上次运行以来有变化的数据。
input{jdbc{jdbc_driver_library=>"C:/path/to/postgresql-42.x.x.jar"jdbc_driver_class=>"org.postgresql.Driver"jdbc_connection_string=>"jdbc:postgresql://localhost:5432/your_database"jdbc_user=>"your_username"jdbc_password=>"your_password"schedule=>"* * * * *"statement=>"SELECT * FROM your_table WHERE updated_at > :sql_last_value ORDER BY updated_at"use_column_value=>truetracking_column=>"updated_at"tracking_column_type=>"timestamp"last_run_metadata_path=>"C:/path/to/last_run_metadata"}}filter{mutate{remove_field=>["date"]}mutate{rename=>{"first_name"=>"name"}}}output{elasticsearch{hosts=>["http://localhost:9200"]user=>"elastic"password=>"your_elastic_password"index=>"your_index_name"document_id=>"%{id}"}}Input 部分要点说明:
- jdbc_driver_library:PostgreSQL JDBC 驱动
.jar文件所在路径。 - jdbc_driver_class:使用的驱动类名,PostgreSQL 为
org.postgresql.Driver。 - jdbc_connection_string:PostgreSQL 连接字符串。
- jdbc_user / jdbc_password:数据库用户名与密码。
- paging:可按页拉取数据(如每页 1000 条),有利于性能与进度控制。
- schedule:调度表达式,例如
* * * * *表示每分钟执行一次(与 cron 格式一致)。 - statement:要执行的 SQL。复杂 SQL 可写入单独
.sql文件,并在配置中使用statement_filepath指向该文件。 - 增量相关:
- use_column_value设为
true时,Logstash 使用tracking_column对应列(如updated_at)的实际值作为:sql_last_value,而不是上次执行时间。 - tracking_column/tracking_column_type:用于增量跟踪的列名与类型。
- last_run_metadata_path:保存上次运行位置的元数据文件路径,供下次运行读取。
- use_column_value设为
4、Filter 与 Output 说明
Filter
Filter 为可选阶段,用于在写入目标前对数据进行清洗或转换。上述示例中:使用mutate删除了date字段,并将源数据中的first_name映射为目标中的name字段。
Output
Output 定义数据写入的目标。本例中为 Elasticsearch:需配置hosts、认证信息(如有)、索引名index,以及document_id。
在增量摄取场景下,建议显式设置document_id(如%{id}使用主键)。写入时 Elasticsearch 会按该 ID 查找文档;若已存在则更新同一条文档,避免重复。若不设置document_id,每次都会生成新文档,容易产生重复数据。
5、运行管道与验证
在命令行中进入 Logstash 安装目录,执行(将配置文件路径替换为你的实际路径):
bin\logstash -f C:\path\to\your_pipeline.conf管道启动后,可在控制台看到拉取与写入的日志。在 Kibana 或通过 Elasticsearch API 查询对应索引,即可验证数据是否按预期从 PostgreSQL 同步到 Elasticsearch。
6、方案优点
- Logstash 开源、生态成熟,易于在现有环境中部署和集成。
- 提供大量插件(200+),可通过 Filter 完成解析、转换、聚合等复杂处理。
- 数据源与 Elasticsearch 解耦,便于更换数据源或目标。
- 与 Elasticsearch 集成简单,配置清晰。
7、方案缺点与适用场景
缺点:
- 延迟:不适合对极低延迟或实时性要求很高的场景;管道越复杂,加载、转换和发送耗时越长。
- 错误与监控:若不单独做监控与告警,出错时较难排查,可能造成数据遗漏或重复。
- 重复数据:若未正确设置
document_id或增量逻辑,容易产生重复文档。 - 启动时间:相比部分轻量工具,Logstash 启动较慢。
- 配置维护:使用类 YAML 的配置文件,管道复杂时维护成本会上升。
- 资源占用:在高负载或复杂管道下,CPU、内存占用较高。
适用场景:
该方案适合需要稳定、集中式的批处理/准实时数据管道的场景,例如将业务库(PostgreSQL)数据定期或按增量同步到 Elasticsearch 做检索与分析。若需求是毫秒级实时流或超大规模实时同步,可再评估 Kafka + Connector 或 Elasticsearch 自身 CDC 等方案。
生如逆旅,一苇以航
欢迎关注、欢迎联系交流、欢迎沟通想法、欢迎交换意见、欢迎合作咨询
感谢亲的关注、点赞、收藏、评论,一键三连支持,谢谢