XeniosCoin中文网

发布日期:2025-01-03 18:34    点击次数:106
完成数据订阅通道的配置后,您可以使用DTS提供的SDK示例代码来订阅数据变更信息,本文介绍该示例代码的使用说明。本文以IntelliJ IDEA软件(Community Edition 2020.1 Windows版本)为例,介绍如何运行SDK示例代码来消费订阅数据。创建新版数据订阅通道,详情请参见创建RDS MySQL数据订阅通道、创建PolarDB MySQL版数据订阅通道或创建Oracle数据订阅通道。创建一个或多个消费组,详情请参见新增消费组。根据业务需求,使用SDK示例代码。参考使用示例代码使用新版订阅SDK。使用定制修改后的新版订阅SDK下载SDK示例代码文件,然后解压该文件。定位至SDK示例代码解压的目录,使用文本编辑工具打开pom.xml文件,将数据订阅SDK的版本修改为最新版本。打开IntelliJ IDEA软件,然后单击Open or Import。在弹出的对话框中,定位至SDK示例代码解压的目录,依次展开文件夹,找到项目对象模型文件:pom.xml。在弹出对话框中,选择Open as Project。在IntelliJ IDEA软件界面,依次展开文件夹,并根据 SDK客户端的使用模式,选择并双击打开对应的Java文件:DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java。设置Java文件代码中的必填参数。在DTS控制台单击目标订阅实例ID,在基本信息页面的网络区域,您可以获取网络地址及端口号信息。topic数据订阅通道的订阅Topic。在DTS控制台单击目标订阅实例ID,在基本信息页面的基本信息区域,您可以获取到订阅Topic。sid消费组ID。在DTS控制台单击目标订阅实例ID,然后单击数据消费,您可以获取到消费组ID和消费组的账号信息。 userName消费组的账号。password该账号的密码。initCheckpoint消费位点,即SDK客户端消费第一条数据的时间戳,格式为Unix时间戳,例如1620962769。 消费位点必须在订阅实例的数据范围(如图示)之内,并需转化为Unix时间戳。ConsumerContext.ConsumerSubscribeMode subscribeModeSDK客户端的使用模式,取值为:ConsumerContext.ConsumerSubscribeMode.ASSIGN:ASSIGN模式,即一个消费组下仅支持一个SDK客户端消费订阅数据。ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE:SUBSCRIBE模式,即支持在同一个消费组下同时启动多个SDK客户端实现灾备。无在IntelliJ IDEA软件界面的顶部,选择Run > Run运行该客户端。运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。SDK客户端每隔一定时间会统计并显示消费数据的信息,包括数据发送和接受时数据总数、数据总量、每秒请求数接收RPS等。表 2. 消费数据的统计信息参数说明outCountsSDK客户端所消费的数据总数。outBytesSDK客户端所消费的数据总量,单位为Byte。outRpsSDK客户端消费数据时的每秒请求数。outBpsSDK客户端消费数据时每秒传送的比特数。inBytesDTS服务器发送的数据总量,单位为Byte。 DStoreRecordQueueDTS服务器发送数据时,当前数据缓存队列的大小。inCountsDTS服务器发送数据总数。inRpsDTS服务器发送数据时的每秒请求数。 __dtSDK客户端接收到数据的当前时间戳,单位为毫秒。DefaultUserRecordQueue序列化后,当前数据缓存队列的大小。保存和查询消费位点当SDK客户端首次启动、重启或者发生内部重试时,您需要查询并传入 消费位点,开始或重新消费数据。下文将介绍在不同情况下如何管理和查询消费位点,以确保数据不丢失,且尽量不重复,实现按需消费。场景SDK使用模式查询方法查询消费位点ASSIGN模式、SUBSCRIBE模式由于SDK客户端每5秒保存一次消息位点,并提交至DTS服务器,如需查询最近一次消费位点,您可通过以下路径查询:SDK客户端所在服务器的localCheckpointStore文件。订阅通道的数据消费界面。如您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置了外部的持久化共享存储介质(如数据库),该存储介质每5秒会保存一次消息位点,供您查询。首次启动SDK客户端,需传入消费位点,来消费数据。ASSIGN模式、SUBSCRIBE模式根据SDK客户端使用模式,选择Java文件DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java,并配置消费位点initCheckpoint进行消费。配置方式,请参见3和4。SDK客户端因内部重试,需重新传入上一个记录的消费位点,以继续消费数据。ASSIGN模式请按如下顺序,查找上一个记录的消费位点,找到即可返回位点信息:您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存储介质。SDK客户端所在服务器的localCheckpointStore文件。您在DTSConsumerSubscribeDemo.java文件中initCheckpoint传入的开始时间戳(start timestamp)。SUBSCRIBE模式请按如下顺序,查找上一个记录的消费位点,找到即可返回位点信息:您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存储介质。DTS Server(增量数据采集模块)保存的位点。您在DTSConsumerSubscribeDemo.java文件中initCheckpoint传入的开始时间戳(start timestamp)。使用DTS Server(新建增量数据采集模块)的起始位点。已重启SDK客户端,需重新传入上一个记录的消费位点,以继续消费数据。ASSIGN模式根据consumerContext.java文件中setForceUseCheckpoint配置情况,查询消费位点,找到即可返回位点信息:配置为true时,每次重启SDK客户端,都会强制使用传入的initCheckpoint作为消费位点。配置为false或者没有配置时,请按如下顺序,查找上一个记录的消费位点:SDK客户端所在服务器的localCheckpointStore文件。DTS Server(增量数据采集模块)保存的位点。您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存储介质。SUBSCRIBE模式该模式下consumerContext.java文件中setForceUseCheckpoint配置不生效,请按如下顺序,查找上一个记录的消费位点:您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存储介质。DTS Server(增量数据采集模块)保存的位点。您在DTSConsumerSubscribeDemo.java文件中initCheckpoint传入的开始时间戳(start timestamp)。使用DTS Server(新建增量数据采集模块)的起始位点。持久化存储消费位点如果增量数据采集模块触发容灾机制(特别是SUBSCRIBE模式),新建的增量数据采集模块将无法保存客户端上次的消费位点信息,可能会导致客户端从一个较旧的位点开始消费订阅数据,从而造成历史数据的重复消费。例如:增量数据服务切换前,老的增量数据采集模块位点范围为2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,客户端的消费位点为2023年11月12日 08:00:00;增量数据服务切换后,新的增量数据采集模块位点范围为2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那么客户端会从新的增量数据采集模块的起始位点2023年11月08日 10:00:00开始消费,造成重复消费历史数据。为了规避这种切换场景对历史数据的重复消费,建议您在客户端配置一个在客户端保存的消费位点持久化存储方式。示例方法如下,您可以根据实际情况进行修改。创建一个UserMetaStore()方法,继承实现AbstractUserMetaStore()方法。例如使用MySQL数据库存储位点信息,Java示例代码如下:public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); ResultSet rs = pres.executeQuery() String checkpoint = rs.getString("checkpoint"); return checkpoint; } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } } 在consumerContext.java文件中的setUserRegisteredStore(new UserMetaStore())方法,配置外部存储介质。常见问题无法连接订阅实例,如何处理?请根据报错提示进行排查,详情请参见问题排查。持久化后的消费位点是什么格式的数据?消费位点在持久化处理后,将返回JSON格式的数据。其中,持久化后的消费位点的格式为Unix时间戳,您可以直接将其传回SDK进行使用。如下返回数据中,"timestamp"后的1700709977即为持久化后的消费位点。{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":1700709977,"info":""}]}问题排查问题报错提示原因解决方法无法连接ERROR CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009 failed, please check the network and if the brokerUrl is correct'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)brokerUrl填写错误。填入正确的brokerUrl、userName和password,查询方式,请参见必填参数说明。telnet real node *** failed, please check the network无法通过broker地址连接真实的IP地址。ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)用户名和密码错误。com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failedconsumerContext.java文件中 setUseCheckpoint配置为true,但消费位点不在订阅实例的数据范围(如图示)之内。传入在订阅实例的数据范围(如图示)之内的消费位点,查询方式,请参见必填参数说明。消费订阅速度变慢无可通过查询统计信息中的参数DStoreRecordQueue和DefaultUserRecordQueue队列的大小,分析消费数据变慢的原因。查询方式,请参见消费数据的统计信息。如参数DStoreRecordQueue保持为0,则表示DTS服务器拉取数据速度变慢。如参数DefaultUserRecordQueue保持为默认值512,则表示SDK客户端消费数据的速度变慢。根据实际情况,修改代码中的消费位点(initCheckpoint)进行重置位点。



Powered by BIZA 中文站 @2013-2022 RSS地图 HTML地图

Copyright Powered by365站群 © 2013-2024