如何使用 TDengine Sink Connector?

小T导读:TDengineKafkaConnector在TDengine的官方文档上放出来已经有一段时间了 , 我们也收到了一些开发者的反馈 。 文档中的教程使用Confluent平台(集成了Kafka)演示了如何使用SourceConnector和SinkConnector , 但是很多开发者在生产环境中并没有使用Confluent , 所以为方便大家 , 本文将使用独立部署的Kafka来演示 。 本文包含以下内容:1.如何使用TDengineSinkConnector , 把数据从Kafka同步到TDengine 。 2.TDengineSinkConnector的实现原理 。 3.一个简单的测试脚本 , 帮助你在自己的环境中快速测试 。 通过更改生成测试数据的程序和配置参数 , 你可以模拟自己的使用场景 。 4.测试同步同一个topic , 使用不同分区数和不同Sink任务数对性能的影响 。
背景知识
如果你对文章开头出现的术语并不陌生 , 那么可以跳过这一部分 。
·什么是Kafka?
Kafka的核心是一个通用的、分布式的、可重复消费的消息队列 。
与之相比 , 作为一款时序数据库(Time-SeriesDatabase) , TDengine也可看作针对结构化的时序数据的消息队列 。
·什么是KafkaConnect?为什么使用KafkaConnect?
KafkaConnect是Kafka的一个组件 , 简化了Kafka与其它数据源的集成 。 用户通过KafkaConnect读写Kafka;通过KafkaConnect插件(也称KafkaConnector)来读写各种数据源 。
为方便集成 , Kafka已经提供了生产者和消费者API以及客户端库 , 那为什么还需要KafkaConnect呢?因为一个好的Kafka客户端程序 , 不是单单生产或消费数据 , 还需要考虑容错、重启、日志、弹性伸缩、序列化以及反序列化等 。 当开发者自己完成了这一切 , 就相当于开发了一个和KafkaConnect类似的东西 。
与Kafka集成是KafkaConnect已经解决的问题 , 用户不需要重复造轮子 , 只有少数边缘场景才需要定制化的集成方案 。
TDengineSinkConnector的实现原理
TDengineSinkConnector用于将Kafka中指定topic的数据(批量或实时)同步到TDengine的database中 。
启动SinkConnector需要一个properties配置文件 。 详细配置见官方文档的配置参考(https://docs.taosdata.com/third-party/kafka/#配置参考) 。
SinkConnector内部的实现非常简单 , 整体工作流程分为以下几个步骤:Connect框架根据配置启动N个消费者线程 。 N个消费者同时订阅数据 , 并用配置文件中指定的key.converter和value.converter做反序列化 。 Connect框架把反序列化后的数据传递给N个SinkTask的实例 。 SinkTask使用TDengine提供的schemaless写入接口来写入数据 。
上述4个步骤 , 只有最后一步写数据是SinkConnector需要关心的 , 其它都是Connect框架自动实现的 。
下面重点讨论几个问题 。
·支持的数据格式
因为使用了schemaless写入接口 , 因此TDengineSinkConnector只支持三种格式的数据:InfluxDB行协议格式、OpenTSDBTelnet协议格式和OpenTSDBJSON协议格式 。 使用配置项db.schemaless来指定写入时使用的数据格式 。 例如:db.schemaless=line
如果Kafka中的数据已经是这三种格式之一 , 那么配置文件中的value.converer , 只需指定为Connnect内置的org.apache.kafka.connect.storage.StringConverter 。 value.converter=org.apache.kafka.connect.storage.StringConverter
如果Kafka中已有的数据不是上述三种之一 , 则需要实现自己的Converter类 , 将其转换为三种格式之一 , 这个链接也许能帮到你 。
·如何指定Consumer的参数?
既然Connect框架已经帮我们做了Consumer要做的事 , 那么我们怎么来控制Consumer的行为呢?比如如何控制Consumer订阅的主题?如何控制Consumer每次poll的消息数和时间间隔?
对于订阅哪些主题 , 可以用配置项topics来指定 。