【环球报资讯】7.2K Star!把数据库的每一行都监控到的强大数据流平台
2022-09-15 15:59:05来源:开源前哨
Debezium 是一个开源的,为捕获数据变更(CDC)提供低延迟的数据流平台。使用 Debezium 来监控数据库,应用程序可以接收来自数据库的每个行变更事件,只有提交的变更才是可见的,因此无需担心事务或变更的回滚。Debezium 提供了所有变更事件的单一模型,不必担心每种数据库管理系统的复杂性。
同时,Debezium 将历史数据变更记录在了持久化的日志中,因此应用程序可以随时停止并重新启动,启动后能够重新接收在停止运行期间错过的所有事件。
(相关资料图)
监控数据库并在数据变更时收到通知是一件很复杂的事情,关系型数据库的触发器可能很合适,但是局限于某些数据库,并且通常仅限于更新同一个数据库内的状态,不与外部进程通信。一些数据库提供了 API 或者框架来进行监控,但是没有一个统一的标准,因此每个数据库的方法都不同,还需要大量比较专业的代码来实现;与此同时,监听数据变更后,如何保证这些变更事件的有序性并降低对数据库的影响是非常有挑战性的。
Debezium 提供了完成这些工作的模块,一些模块是通用的,可以与多个数据库管理系统一起使用,但在功能和性能方面也有一些限制。其他模块是为特定的数据库管理系统定制的,功能更加强大,并且很好地利用了系统的特定功能。
项目地址:
https://github.com/debezium/debezium
基本信息基础架构Debezium 利用 Kafka 和 Kafka Connect 实现了自己的持久性、可靠性和容错性。每一个部署在 Kafka Connect 服务中的 connector 监控一个上游数据库服务器,捕获所有的数据库变更,然后记录到一个或者多个 Kafka topic(通常一个数据库表对应一个kafka topic)。Kafka 确保所有这些数据更改事件具有多个副本并且总体上有序(Kafka 只能保证一个 topic 的单个分区内有序),这样,更多的客户端可以独立接收同样的数据变更事件而对上游数据库系统造成的影响降到很小(如果 N 个应用都直接去监控数据库,对数据库的压力为 N,而用 debezium 汇报数据库更改事件到 kafka,所有的应用都去接收 kafka,可以把对数据库的压力降到1)。另外,客户端可以随时停止接收,然后重启,从上次停止接收的地方接着接收。
对于不需要或者不想要这种容错级别、性能、可扩展性、可靠性的应用,他们可以使用内嵌的 Debezium connector 引擎来直接在应用内部运行 connector。这种应用仍需要接收数据库更改事件,但更希望 connector 直接传递给它,而不是持久化到 Kafka。
常见使用场景缓存失效
缓存的内容在源头被更改或者被删除的时候立即让缓存中的条目失效。如果缓存在一个独立的进程中运行(例 Redis,Memcache,Infinispan 或者其他的),那么简单的缓存失效逻辑可以放在独立的进程或服务中,从而简化主应用的逻辑。在一些场景中,缓存失效逻辑可以更复杂一点,让它利用更改事件中的更新数据去更新缓存中受影响的条目。
简化单体应用
许多应用更新数据库,然后在数据库中的更改被提交后,做一些额外的工作:更新搜索索引,更新缓存,发送通知,运行业务逻辑,等等。这种情况通常称为双写(dual-writes),因为应用没有在一个事务内写多个系统。这样不仅应用逻辑复杂难以维护,而且双写容易丢失数据或者在一些系统更新成功而另一些系统没有更新成功的时候造成不同系统之间的状态不一致。使用 CDC,在源数据库的数据更改提交后,这些额外的工作可以被放在独立的线程或者进程(服务)中完成。这种实现方式的容错性更好,不会丢失事件,容易扩展,并且更容易支持升级。
共享数据库
当多个应用共用同一个数据库的时候,一个应用提交的更改通常要被另一个应用感知到。一种实现方式是使用消息总线,另一种实现方式,即 Debezium:每个应用可以直接监控数据库的更改,并且响应更改。
数据集成
数据通常被存储在多个地方,尤其是当数据被用于不同的目的的时候,会有不同的形式。保持多系统的同步是很有挑战性的,但是可以通过使用 Debezium 加上简单的事件处理逻辑来实现简单的 ETL 类型的解决方案。
命令查询职责分离
在命令查询职责分离 Command Query Responsibility Separation (CQRS) 架构模式中,更新数据使用了一种数据模型,读数据使用了一种或者多种数据模型。由于数据更改被记录在更新侧,这些更改将被处理以更新各种读展示。所以CQRS应用通常更复杂,尤其是他们需要保证可靠性和全序处理。Debezium 和CDC 使这种方式更可行:写操作被正常记录,但是 Debezium 捕获数据变更,并且持久化到全序流里,然后供那些需要异步更新只读视图的服务接收。
安装使用 Debezium 需要三个独立的服务:ZooKeeper、Kafka 和 Debezium 连接器服务。官方推荐使用 Docker 进行安装,数据库以 MySQL 为例。
启动 ZooKeeper$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:1.9
如果是使用 Podman,运行以下命令:
$ sudo podman pod create --name=dbz --publish "9092,3306,8083"$ sudo podman run -it --rm --name zookeeper --pod dbz quay.io/debezium/zookeeper:1.9
启动后看到如下输出:
Starting up in standalone modeZooKeeper JMX enabled by defaultUsing config: /zookeeper/conf/zoo.cfg2017-09-21 07:15:55,417 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 32017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1...port 0.0.0.0/0.0.0.0:2181启动 Kafka
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9
如果是使用 Podman,运行以下命令:
$ sudo podman run -it --rm --name kafka --pod dbz quay.io/debezium/kafka:1.9
启动后看到如下输出:
...2017-09-21 07:16:59,085 - INFO [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)2017-09-21 07:16:59,218 - INFO [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA...2017-09-21 07:16:59,649 - INFO [main:Logging$class@70] - [Kafka Server 1], started启动 MySQL
该容器运行一个预先配置有 inventory 数据库的 MySQL 数据库服务器:
$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
如果是使用 Podman,运行以下命令:
$ sudo podman run -it --rm --name mysql --pod dbz -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9
启动后看到如下输出:
...[System] [MY-010931] [Server] /usr/sbin/mysqld: ready for connections. Version: "8.0.27" socket: "/var/run/mysqld/mysqld.sock" port: 3306 MySQL Community Server - GPL.[System] [MY-011323] [Server] X Plugin ready for connections. Bind-address: "::" port: 33060, socket: /var/run/mysqld/mysqlx.sock启动 Kafka Connector
该服务公开了一个 REST API 来管理 Debezium MySQL 连接器:
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:1.9
如果是使用 Podman,运行以下命令:
$ sudo podman run -it --rm --name connect --pod dbz -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses quay.io/debezium/connect:1.9
启动后看到如下输出:
...2020-02-06 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]...2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]注册 MySQL 连接器
通过注册 Debezium MySQL 连接器,连接器将开始监控 MySQL 数据库服务器的 binlog,记录数据库的 binlog 所有事务(例如对单个行的变更)。当数据库中的一行发生变更时,Debezium 会生成一个变更事件。
配置如下:
{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.inventory" }}
使用 curl 命令进行注册:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d "{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }"更新数据库并查看变更事件
使用 watch-topic 可以查看 dbserver1.inventory.customers 主题:
$ docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:1.9 watch-topic -a -k dbserver1.inventory.customers
在 MySQL 客户端做一次数据变更:
mysql> UPDATE customers SET first_name="Anne Marie" WHERE id=1004;Query OK, 1 row affected (0.05 sec)Rows matched: 1 Changed: 1 Warnings: 0
查改修改后的值:
mysql> SELECT * FROM customers;+------+------------+-----------+-----------------------+| id | first_name | last_name | email |+------+------------+-----------+-----------------------+| 1001 | Sally | Thomas | sally.thomas@acme.com || 1002 | George | Bailey | gbailey@foobar.com || 1003 | Edward | Walker | ed@walker.com || 1004 | Anne Marie | Kretchmar | annek@noanswer.org |+------+------------+-----------+-----------------------+4 rows in set (0.00 sec)
切换到 watch-topic 终端以查看事件:通过比较 before 和 after 结构,可以确定由于提交而在受影响的行中实际更改的内容。
{ "schema": {...}, "payload": { "before": { "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": { "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { "name": "1.9.5.Final", "name": "dbserver1", "server_id": 223344, "ts_sec": 1486501486, "gtid": null, "file": "mysql-bin.000003", "pos": 364, "row": 0, "snapshot": null, "thread": 3, "db": "inventory", "table": "customers" }, "op": "u", "ts_ms": 1486501486308 }}