Debezium发布历史165

news/2024/7/24 6:22:01 标签: 数据库, etl, 大数据, 运维

原文地址: https://debezium.io/blog/2023/10/05/Debezium-JMX-signaling-and-notifications/

欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

Debezium signaling and notifications - Part 3: JMX channel
October 5, 2023 by Fiore Mario Vitale
debezium features notifications signaling integration

欢迎收看我们关于德贝兹信号和通知系列的第三部分。在这篇文章中,我们将继续我们对德贝兹信号和通知的探索。特别是,我们将研究如何使用JMDA通道启用和管理这些特性。

我们还将探索如何利用jlokia通过其他API发送信号和获得通知。

通过jmx与德贝兹的相互作用
jmx代表Java管理扩展,一种用于管理和监控Java应用程序的Java技术。它提供了一种标准化的方法来监控应用程序的性能,配置设置,并与使用各种管理工具和客户端运行的Java应用程序进行交互。对于复杂的、分布式的和企业级的Java应用程序的管理和监控,jmx特别有用。

可通过JMDA通道发出信号
德贝齐斯中的信号是关于在正常执行期间执行操作的触发动作。正如在前几篇文章中所讨论的,Debezum提供了不同的非常规信号通道。在这篇文章中,我们将重点讨论JMDA通道。

若要开始使用jmx信号通道,我们需要:

在卡夫卡连接服务上启用JDB2服务器

加起来jmx 到signal.enabled.channels 连接器配置属性

使用一个jmx客户端连接到jmx服务器发送信号。

德贝兹暴露了名为MBean的信号debezium.:type=management,context=signals,server= .这豆露出来了signal 接受三个参数的业务:

信号的标识。

信号的类型,例如,执行弹弓。

JSON数据字段,包含关于指定信号类型的附加信息。

通过JMDA通道启用通知
通知是告诉你在德贝兹铵中会发生什么的关键。通过JMDA通道访问通知允许您轻松地监视Debezns,例如,增量快照的进程。

要开始使用JDB2通知通道,我们需要:

在卡夫卡连接服务上启用JDB2服务器

加起来jmx 到notification.enabled.channels 连接器配置属性

使用一个JDB2客户端连接到JDB2服务器来访问通知。

通知书上的名称debezium.:type=management,context=notifications,server= .这个豆提供了一个Notification 包含一个jmx列表的BeanCompositeData 具有下列属性的类型:

财产 描述
身份证

分配给通知的唯一标识符。关于增量快照通知,id 是一样的execute-snapshot 信号。

总数_类型

与通知相关的聚合根的数据类型.在域驱动设计中,导出事件总是指聚合。

类型

提供在aggregate_type 场地。

附加_数据

地图<字符串,并附有通知的详细信息。

让我们花点时间,看看如何发送一个增量快照,并通过jmx通道接收关于其进展的通知。

通过jmx通道发送增量快照信号
对于这个例子,我们将使用带有后GERGSQL数据库的DEBeZMR文档图像。

我们可以使用下面的码头组合文件启动所有需要的服务

version: ‘2’
services:
zookeeper:
container_name: zookeeper
image: quay.io/debezium/zookeeper:2.4
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
container_name: kafka
image: quay.io/debezium/kafka:2.4
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
container_name: postgres
image: quay.io/debezium/example-postgres:2.4
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
connect:
container_name: connect
image: quay.io/debezium/connect:2.4
ports:
- 8083:8083
- 9012:9012
- 8778:8778
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- JMXPORT=9012
- JMXHOST=0.0.0.0
- ENABLE_JOLOKIA=true
这将暴露用于连接到jmx服务器的端口9012
启用jmx并指定将用于Jmx的端口号。该值用于指定JVM参数 -Dcom.sun.management.jmxremote.port= J M X P O R T . 这 个 地 址 或 可 解 析 的 主 机 名 的 码 头 主 机 , 使 用 它 来 构 造 一 个 发 送 到 j m x 客 户 端 的 U R L 。 局 部 宿 主 值 或 127.0.0.1 将 不 起 作 用 。 通 常 可 使 用 0.0.0.0 。 该 值 用 于 指 定 J V M 参 数 − D j a v a . r m i . s e r v e r . h o s t n a m e = JMX_PORT . 这个地址或可解析的主机名的码头主机,使用它来构造一个发送到jmx客户端的URL。局部宿主值或127.0.0.1将不起作用。通常可使用0.0.0.0。该值用于指定JVM参数 -Djava.rmi.server.hostname= JMXPORT.,使jmxURL宿127.0.0.1使0.0.0.0JVMDjava.rmi.server.hostname=JMXHOST
在保存文件后debezium.yaml ,所有服务均以:

docker compose -f debezium.yaml up -d
输出会像这样

[+] Running 5/5
✔ Network deploy_default Created 0.1s
✔ Container deploy-zookeeper-1 Started 0.1s
✔ Container deploy-postgres-1 Started 0.1s
✔ Container deploy-kafka-1 Started 0.1s
✔ Container deploy-connect-1 Started
现在我们可以检查所有的服务是否都在运行

docker ps
输出应该与此相似

CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f1d49fb79dba quay.io/debezium/connect:2.4 “/docker-entrypoint.…” 3 seconds ago Up 2 seconds 0.0.0.0:8083->8083/tcp, 0.0.0.0:8778->8778/tcp, 0.0.0.0:9012->9012/tcp, 9092/tcp deploy-connect-1
e164b2651fbf quay.io/debezium/kafka:2.4 “/docker-entrypoint.…” 3 seconds ago Up 2 seconds 0.0.0.0:9092->9092/tcp deploy-kafka-1
e61116f22f9d quay.io/debezium/example-postgres:2.4 “docker-entrypoint.s…” 4 seconds ago Up 2 seconds 0.0.0.0:5432->5432/tcp deploy-postgres-1
ccb502882928 quay.io/debezium/zookeeper:2.4 “/docker-entrypoint.…” 4 seconds ago Up 2 seconds 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp deploy-zookeeper-1
此时所有服务都已启动并运行,因此我们可以通过以下配置注册连接器

{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“tasks.max”: “1”,
“database.hostname”: “postgres”,
“database.port”: “5432”,
“database.user”: “postgres”,
“database.password”: “postgres”,
“database.server.id”: “184054”,
“database.dbname”: “postgres”,
“topic.prefix”: “dbserver1”,
“snapshot.mode”: “NEVER”,
“schema.history.internal.kafka.bootstrap.servers”: “kafka:9092”,
“schema.history.internal.kafka.topic”: “schema-changes.inventory”,
“signal.enabled.channels”: “source,jmx”,
“signal.data.collection”: “inventory.debezium_signal”,
“notification.enabled.channels”: “jmx”
}
}
此配置可使 来源 和 Jmx 通道。即使我们只希望使用JDB2来发送信号来执行增量快照, 来源 仍然需要发送信号,因为Debezns需要使用信号表来水印db日志以进行事件复制。
把用来发信号的表
现在,别担心 notification.enabled.channels 财产。我们稍后会深入研究的
把这个配置保存到一个文件中 后记-jmx.json ,我们可以登记。

注册连接器我们可以使用curl 调用卡夫卡连接API

curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” localhost:8083/connectors/ -d ‘{“name”:“inventory-connector”,“config”:{“connector.class”:“io.debezium.connector.postgresql.PostgresConnector”,“tasks.max”:“1”,“database.hostname”:“postgres”,“database.port”:“5432”,“database.user”:“postgres”,“database.password”:“postgres”,“database.server.id”:“184054”,“database.dbname”:“postgres”,“topic.prefix”:“dbserver1”,“snapshot.mode”:“NEVER”,“schema.history.internal.kafka.bootstrap.servers”:“kafka:9092”,“schema.history.internal.kafka.topic”:“schema-changes.inventory”,“signal.enabled.channels”:“source,jmx”,“signal.data.collection”:“inventory.debezium_signal”,“notification.enabled.channels”:“log,sink,jmx”,“notification.sink.topic.name”:“io.debezium.notification”}}’
或者我建议用 Kcctl 工具与卡夫卡互动连接。它是卡夫卡连接的现代直观的命令行客户端。

首先,我们需要创建一个配置上下文来连接卡夫卡连接

kcctl config set-context local --cluster http://localhost:8083
然后我们可以注册连接器运行以下命令

kcctl apply -f postgres-jmx.json
我们现在可以得到连接容器的日志

docker logs connect
检查连接器是否启动了流事件

INFO Postgres|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
为增量快照准备数据库
因为增量快照需要signal.data.collection 要定义此定义,我们需要在您的服务站数据库中创建信号表。

在使用与GTDS和read.only 准备好了。
要创建信号表,我们需要连接到我们的ESTGres实例。我们可以利用psql 客户在邮政集装箱内。

docker exec -it postgres bash
一旦进入容器,我们就可以连接到

psql -h localhost -d postgres -U postgres
密码是 波斯特格雷斯
我们就可以检查里面有没有桌子 存货 图解

\dt inventory.*
命令应该返回类似的东西

            List of relations

Schema | Name | Type | Owner
-----------±-----------------±------±---------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
(6 rows)
我们需要用以下命令创建信号表:

CREATE TABLE inventory.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
发送增量快照信号
我们必须连接到jmx服务器,才能通过jmx通道发送信号。我们使用 Jmxdg 客户,所以下载后,我们可以运行它

java -jar jmxterm-1.0.4-uber.jar

open localhost:9012

beans -d debezium.postgres

run -b debezium.postgres:context=signals,server=dbserver1,type=management signal 12345 execute-snapshot {“data-collections”:[“inventory.orders”],“type”:“INCREMENTAL”}
帮我查一下客户
打开一个连接到JDB2服务器
搜寻豆下 德贝齐姆。 领域
执行 发信号 执行递增快照的操作 存货. 表
核对数据
在那之后,我们要检查所有来自 命令 表在相应的卡夫卡主题中得到了正确的捕捉。

我们可以通过以下命令输入卡夫卡容器:

docker exec -it kafka bash
一旦进入容器,我们就可以在 dbserver1.inventory.orders 主题:以下命令

kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.inventory.orders --from-beginning
输出应该是这样的

{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10001,
“order_date”: 16816,
“purchaser”: 1001,
“quantity”: 1,
“product_id”: 102
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605203,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10002,
“order_date”: 16817,
“purchaser”: 1002,
“quantity”: 2,
“product_id”: 105
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605204,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10003,
“order_date”: 16850,
“purchaser”: 1002,
“quantity”: 2,
“product_id”: 106
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605204,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“default”: 0,
“field”: “id”
},
{
“type”: “int32”,
“optional”: false,
“name”: “io.debezium.time.Date”,
“version”: 1,
“field”: “order_date”
},
{
“type”: “int32”,
“optional”: false,
“field”: “purchaser”
},
{
“type”: “int32”,
“optional”: false,
“field”: “quantity”
},
{
“type”: “int32”,
“optional”: false,
“field”: “product_id”
}
],
“optional”: true,
“name”: “dbserver1.inventory.orders.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false,incremental”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “sequence”
},
{
“type”: “string”,
“optional”: false,
“field”: “schema”
},
{
“type”: “string”,
“optional”: false,
“field”: “table”
},
{
“type”: “int64”,
“optional”: true,
“field”: “txId”
},
{
“type”: “int64”,
“optional”: true,
“field”: “lsn”
},
{
“type”: “int64”,
“optional”: true,
“field”: “xmin”
}
],
“optional”: false,
“name”: “io.debezium.connector.postgresql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“name”: “event.block”,
“version”: 1,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.orders.Envelope”,
“version”: 1
},
“payload”: {
“before”: null,
“after”: {
“id”: 10004,
“order_date”: 16852,
“purchaser”: 1003,
“quantity”: 1,
“product_id”: 107
},
“source”: {
“version”: “2.4.0-SNAPSHOT”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1695631605204,
“snapshot”: “incremental”,
“db”: “postgres”,
“sequence”: “[“34837776”,“34837776”]”,
“schema”: “inventory”,
“table”: “orders”,
“txId”: null,
“lsn”: null,
“xmin”: null
},
“op”: “r”,
“ts_ms”: 1695631605204,
“transaction”: null
}
}
就这样!我们已经使用jmx通道发送了一个增量快照信号。

通过jmx通道监控增量快照进展
由于我们已经执行了一个增量快照,现在我们可以通过JMDA通道读取Debezns生成的通知。

我们使用下列配置注册连接器

{
“name”: “inventory-connector”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“tasks.max”: “1”,
“database.hostname”: “postgres”,
“database.port”: “5432”,
“database.user”: “postgres”,
“database.password”: “postgres”,
“database.server.id”: “184054”,
“database.dbname”: “postgres”,
“topic.prefix”: “dbserver1”,
“snapshot.mode”: “NEVER”,
“schema.history.internal.kafka.bootstrap.servers”: “kafka:9092”,
“schema.history.internal.kafka.topic”: “schema-changes.inventory”,
“signal.enabled.channels”: “source,jmx”,
“signal.data.collection”: “inventory.debezium_signal”,
“notification.enabled.channels”: “jmx”
}
}
这种配置使 Jmx 通知频道。
要访问该通知,我们需要再次连接到JDB2服务器。就像我们为信号所做的那样,我们将使用jmxterm

java -jar jmxterm-1.0.4-uber.jar

open localhost:9012

beans -d debezium.postgres

get -b debezium.postgres:context=notifications,server=dbserver1,type=management Notifications
帮我查一下客户
打开一个连接到JDB2服务器
搜寻豆下 德贝齐姆。 领域
得到通知。
你应该期待下面的输出

#mbean = debezium.postgres:context=notifications,server=dbserver1,type=management:
Notifications = [ {
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
};
aggregateType = Initial Snapshot;
id = b20bec8d-f21f-4d74-bb75-cdd7f4c7d933;
type = SKIPPED;
},
{
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = STARTED;
},
{
additionalData = {
( current_collection_in_progress ) = {
key = current_collection_in_progress;
value = inventory.orders;
};
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( maximum_key ) = {
key = maximum_key;
value = 10004;
};
( last_processed_key ) = {
key = last_processed_key;
value = 10004;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = IN_PROGRESS;
},
{
additionalData = {
( scanned_collection ) = {
key = scanned_collection;
value = inventory.orders;
};
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( total_rows_scanned ) = {
key = total_rows_scanned;
value = 4;
};
( status ) = {
key = status;
value = SUCCEEDED;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = TABLE_SCAN_COMPLETED;
},
{
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = COMPLETED;
}
];
这是一个来自初始快照的通知,带有状态SKIPPED 因为我们的连接器配置了"snapshot.mode": “NEVER”
这是关于增量快照启动的通知
这个通知告诉我们inventory.orders 快照正在进行中,提供了关于最后处理和最大键的有用信息。在这个例子中,我们只有一个in progress 通知,但取决于你的桌子尺寸snapshot.fetch.size ,你可以得到更多。
此通知告知特定表的快照已经完成,并提供了所处理的全部行的信息。
对于这个示例,这是我们有的最后一个通知,它告诉我们整个增量快照进展已经完成。
jmx还提供了生成自己的通知的可能性。德贝唑也会产生这些通知。您可以订阅这些通知,因此您可以在不投票的情况下立即接收这些通知。 通知书 豆。
利用亚洛基亚
JLOLIA是一个功能强大的工具,可以让您与JDB2服务器进行交互,并通过REST来公开它。使用它,我们可以通过REST与Debezns进行交互,利用信号和通知的jmx豆。这样,您可以无缝地发送信号和接收通知,并使用更熟悉的RESTAPI。

要启用Joloya,我们需要启用它在我们卡夫卡连接容器上的代理。

这是我们示例中使用的码头组合文件

version: ‘2’
services:
zookeeper:
image: quay.io/debezium/zookeeper:2.4
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:2.4
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: quay.io/debezium/example-postgres:2.4
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
connect:
image: quay.io/debezium/connect:2.4
ports:
- 8083:8083
- 9012:9012
- 8778:8778
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- JMXPORT=9012
- JMXHOST=0.0.0.0
- ENABLE_JOLOKIA=true
会暴露出恐怖分子使用的港口
这将使在我们的测试图像中已经装运的Joloia代理。如果您想在您的安装上启用代理,请检查 正式文件
通过约洛基亚发出信号
若要通过JOLOKIA发送信号,我们可以向具有所需信号和参数的JOLOKIA端点发送一个HTTP邮件请求。

要继续使用增量快照示例,要触发它,您可以运行以下命令

curl -X POST ‘http://localhost:8778/jolokia/exec’ -d ‘{“type”:“EXEC”,“mbean”:“debezium.postgres:context=signals,server=dbserver1,type=management”,“operation”:“signal”,“arguments”:[“12345”,“execute-snapshot”,"{“data-collections”: [“inventory.products”], “type”: “INCREMENTAL”}"]}’ | jq
它应该是

{
“request”: {
“mbean”: “debezium.postgres:context=signals,server=dbserver1,type=management”,
“arguments”: [
“12345”,
“execute-snapshot”,
“{“data-collections”: [“inventory.products”], “type”: “INCREMENTAL”}”
],
“type”: “exec”,
“operation”: “signal”
},
“value”: null,
“timestamp”: 1695651387,
“status”: 200
}
接收通知书
还允许您使用httpGET请求从Debezum获取通知。

curl -X GET ‘http://localhost:8778/jolokia/read/debezium.postgres:context=notifications,server=dbserver1,type=management/Notifications’ | jq
它应该是

{
“request”: {
“mbean”: “debezium.postgres:context=notifications,server=dbserver1,type=management”,
“attribute”: “Notifications”,
“type”: “read”
},
“value”: [
{
“additionalData”: {
“connector_name”: “dbserver1”
},
“id”: “b20bec8d-f21f-4d74-bb75-cdd7f4c7d933”,
“type”: “SKIPPED”,
“aggregateType”: “Initial Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”,
“data_collections”: “inventory.orders”
},
“id”: “12345”,
“type”: “STARTED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“last_processed_key”: “10004”,
“current_collection_in_progress”: “inventory.orders”,
“connector_name”: “dbserver1”,
“maximum_key”: “10004”,
“data_collections”: “inventory.orders”
},
“id”: “12345”,
“type”: “IN_PROGRESS”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“scanned_collection”: “inventory.orders”,
“connector_name”: “dbserver1”,
“total_rows_scanned”: “4”,
“status”: “SUCCEEDED”,
“data_collections”: “inventory.orders”
},
“id”: “12345”,
“type”: “TABLE_SCAN_COMPLETED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”
},
“id”: “12345”,
“type”: “COMPLETED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”,
“data_collections”: “inventory.products”
},
“id”: “12345”,
“type”: “STARTED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“last_processed_key”: “109”,
“current_collection_in_progress”: “inventory.products”,
“connector_name”: “dbserver1”,
“maximum_key”: “109”,
“data_collections”: “inventory.products”
},
“id”: “12345”,
“type”: “IN_PROGRESS”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“scanned_collection”: “inventory.products”,
“connector_name”: “dbserver1”,
“total_rows_scanned”: “9”,
“status”: “SUCCEEDED”,
“data_collections”: “inventory.products”
},
“id”: “12345”,
“type”: “TABLE_SCAN_COMPLETED”,
“aggregateType”: “Incremental Snapshot”
},
{
“additionalData”: {
“connector_name”: “dbserver1”
},
“id”: “12345”,
“type”: “COMPLETED”,
“aggregateType”: “Incremental Snapshot”
}
],
“timestamp”: 1695652278,
“status”: 200
}
你也看到了我们也收到了inventory.products 我们通过RESTAPI发送的表增量快照

结论
在我们系列的第三部分中,我们学习了如何启用和管理使用jmx和joloia的信令和通知。信号可以让你动态地控制Debezium的行为,而通知可以让你了解关键事件。通过利用这些功能和JOLOKIA,您可以有效地管理、监视和与您的数据流工作流交互,确保您始终控制Debezium。


http://www.niftyadmin.cn/n/5413066.html

相关文章

【Linux】常见的基本指令(上)

在这篇博客中&#xff0c;将会介绍到Linux操作系统的基本指令。 一.ls指令 ls 指令是显示当前目录的下文件和文件夹。 ls -a 列出目录下的所有文件&#xff0c;包括隐藏文件 ls -l 或者 ll 列出当前目录下的文件和文件夹&#xff0c;并且是列文件的…

前端技术研究越深入,越觉得技术不是决定录用唯一条件。

一、拒绝抬杠 我说技能不是唯一条件&#xff0c;不是说技能不重要&#xff0c;招聘前端条件是1X,其中1是技能&#xff0c;X是其他条件。 如果X条件很优秀&#xff0c;1这个条件可以降格为0.8、0.5&#xff0c;甚至更低。 有人就抬杠&#xff0c;那为啥不招聘清洁工来干前端&…

记一次busybox-mountPath简单但容易忽略的问题open /proc/self/fd: no such file or directory

目录 一.设备条件介绍 二.我的目的 三.问题所在 1.在用以下pod进行测试的时候出现问题 2.报错 3.解决 4.解决完成 一.设备条件介绍 [rootk8s-master pv]# containerd --version containerd containerd.io 1.6.25 d8f198a4ed8892c764191ef7b3b06d8a2eeb5c7f ​ [rootk…

基于YOLOv5的驾驶员疲劳驾驶行为​​​​​​​检测系统

&#x1f4a1;&#x1f4a1;&#x1f4a1;本文主要内容:详细介绍了疲劳驾驶行为检测整个过程&#xff0c;从数据集到训练模型到结果可视化分析。 博主简介 AI小怪兽&#xff0c;YOLO骨灰级玩家&#xff0c;1&#xff09;YOLOv5、v7、v8优化创新&#xff0c;轻松涨点和模型轻量…

腾讯云服务器和阿里云服务器价格测评_2024年费用大PK

2024年阿里云服务器和腾讯云服务器价格战已经打响&#xff0c;阿里云服务器优惠61元一年起&#xff0c;腾讯云服务器61元一年&#xff0c;2核2G3M、2核4G、4核8G、4核16G、8核16G、16核32G、16核64G等配置价格对比&#xff0c;阿腾云atengyun.com整理阿里云和腾讯云服务器详细配…

leetCode刷题 4.寻找两个正序数组的中位数

目录 1. 思路 2. 解题方法 3. 复杂度 4. Code 题目&#xff1a; 给定两个大小分别为 m 和 n 的正序&#xff08;从小到大&#xff09;数组 nums1 和 nums2。请你找出并返回这两个正序数组的 中位数 。 算法的时间复杂度应该为 O(log (mn)) 。 示例 1&#xff1a; 输入&…

突破编程_C++_设计模式(外观模式)

1 外观模式的基本概念 C 外观模式&#xff08;Facade Pattern&#xff09;是一种结构型设计模式&#xff0c;它为子系统中的一组接口提供了一个统一的高级接口&#xff0c;从而使得子系统更容易使用。外观模式定义了一个高层次的接口&#xff0c;这个接口使得这一子系统更加容…

★【二叉搜索树插入新的节点】【二叉搜索树】Leetcode 701. 二叉搜索树中的插入操作

【二叉搜索树插入新的节点】【二叉搜索树】Leetcode 701. 二叉搜索树中的插入操作 关键点&#xff1a;解法 递归法解法 迭代法 ---------------&#x1f388;&#x1f388;题目链接&#x1f388;&#x1f388;------------------- 关键点&#xff1a; 关键点&#xff1a; 二叉…