跳到主要内容

关系数据库同步 Database

本文介绍如何使用Logstash JDBC输入插件,通过Logstash将关系数据库中的数据同步到 Nasu Elasticsearch Serverless

前置条件提示

请确保你的机器IP已在应用白名单内

准备MySQL数据

  1. 运行以下SQL语句以生成具有三个字段的表:
CREATE DATABASE es_db;
USE es_db;
DROP TABLE IF EXISTS es_table;
CREATE TABLE es_table (
id BIGINT(20) UNSIGNED NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY unique_id (id),
client_name VARCHAR(32) NOT NULL,
modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
  • es_table 存储数据的表的名称。
  • id 记录的主键。
  • client_name 简单的字段。
  • modification_time 插入或上次更新记录的时间戳。
  1. 添加3条记录
use es_db
INSERT INTO es_table (id, client_name)
VALUES (1,"Targaryen"),
(2,"Lannister"),
(3,"Stark");
  1. 验证数据
select * from es_table;
+----+-------------+---------------------+
| id | client_name | modification_time |
+----+-------------+---------------------+
| 1 | Targaryen | 2023-02-26 19:49:22 |
| 2 | Lannister | 2023-02-26 19:49:22 |
| 3 | Stark-aaa | 2023-02-26 19:46:04 |
+----+-------------+---------------------+

配置Logstash管道,使用JDBC输入,控制台输出。

⬇️ 下载Logstash

解压Logstash并创建 jdbc.conf,配置JDBC插件和MySQL数据库中获取数据,输出到控制台

jdbc.conf
input {
jdbc {
jdbc_driver_library => "<driverpath>/mysql-connector-java-<versionNumber>.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
jdbc_user => "<myusername>"
jdbc_password => "<mypassword>"
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *"
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
}
}
output {
stdout { codec => "rubydebug"}
}
  • jdbc_driver_library :jdbc驱动地址,例如/your_home/.m2/repository/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar
  • jdbc_user: 数据库用户名称
  • jdbc_password: 数据库用户密码

使用该配置启动logstash

bin/logstash -f jdbc.conf

Logstash通过命令行接口标准输出(stdout)

[2023-02-26T19:53:11,224][INFO ][logstash.inputs.jdbc     ][main][6582494b5d4a4cc5c92ee3580facb96880402ed2b1081493f217455765a6227a] (0.000919s) SELECT * FROM (SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > 0 AND modification_time < NOW()) ORDER BY modification_time ASC) AS `t1` LIMIT 100000 OFFSET 0
{
"unix_ts_in_secs" => 1677412162,
"id" => 2,
"client_name" => "Lannister",
"modification_time" => 2023-02-27T01:49:22.000Z,
"@version" => "1",
"@timestamp" => 2023-02-26T11:53:11.262Z
}
{
"unix_ts_in_secs" => 1677411964,
"id" => 3,
"client_name" => "Stark-aaa",
"modification_time" => 2023-02-27T01:46:04.000Z,
"@version" => "1",
"@timestamp" => 2023-02-26T11:53:11.255Z
}
{
"unix_ts_in_secs" => 1677412162,
"id" => 1,
"client_name" => "Targaryen",
"modification_time" => 2023-02-27T01:49:22.000Z,
"@version" => "1",
"@timestamp" => 2023-02-26T11:53:11.262Z
}

输出到 Nasu Elasticsearch Serverless

替换output输出到云端 Nasu Elasticsearch Serverless.

input {
jdbc {
...
# 配置输入重新消费
clean_run => true
...
}
}
output {
elasticsearch {
index => "rdbms_idx"
hosts => ["https://router.nasuyun.com:9200"]
user => "your_username"
# 配置纳速云应用的用户名及密码
password => "your_password"
document_id => "%{id}"
}
}

重启logstash,将MySQL数据输出到 Nasu Elasticsearch Serverless

bin/logstash -f jdbc.conf

通过Kibana并验证数据

主键绑定

document_id => "%{id}" 指定了主键,通过更新MySQL数据,我们同时能检查到 Elasticsearch 端保持数据一致。