MySQL, Canal, Kafka Data Sync Series (5): Integrating Canal with Kafka

We have finally reached the last step. As before, we deploy with docker-compose.

1. Modify our earlier docker-compose.yml file

version: '3'
services: 
  canal-server: 
    image: canal/canal-server:v1.1.3
    container_name: canal-server
    ports: 
      - 11111:11111
    environment: 
      - canal.instance.mysql.slaveId=12
      - canal.auto.scan=false
      - canal.destinations=test
      - canal.instance.master.address=10.10.10.10:3304
      - canal.instance.dbUsername=canal
      - canal.instance.dbPassword=canal
      - canal.serverMode=kafka
      - canal.mq.servers=10.10.10.10:9092
      - canal.mq.flatMessage=true
      - canal.mq.compressionType=none
      - canal.mq.acks=all
      - canal.mq.partition=0
      - canal.mq.bufferMemory=33554432
      - canal.mq.retries=0
      - canal.mq.topic=fengpt
      # - canal.instance.filter.regex=test\\.*
    volumes: 
      - ./conf:/canal-server/conf
      - ./logs:/canal-server/logs

2. Notes on the configuration

# Change these to match your own database
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password for the database
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
# send to dynamic topics based on database or table name
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
# database.table: primary key; separate multiple tables with commas
#canal.mq.partitionHash=mytest.person:id,mytest.role:
# ...
# options: tcp (default), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq cluster, e.g. 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9092
canal.mq.retries = 0
# in flatMessage mode this can be raised, but keep it under the MQ message-size limit
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# in flatMessage mode raise this value; 50-200 is recommended
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# canal batch size, default 50K; because of kafka's max message size, do not exceed 1M (stay below 900K)
canal.mq.canalBatchSize = 50
# timeout for canal get, in milliseconds; empty means no timeout
canal.mq.canalGetTimeout = 100
# whether to emit flat json objects
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# whether kafka delivery uses transactions
canal.mq.transaction = false
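The idea behind canal.mq.partitionHash is that each row is routed to a partition by hashing its primary-key value, so all changes to the same row land on the same partition and are consumed in order. The sketch below illustrates the general scheme only; it is not canal's actual implementation, and the class and method names are made up for illustration:

```java
// Illustrative only: routes a primary-key value to a partition the way a
// pk-hash scheme like canal.mq.partitionHash conceptually works.
public class PartitionHashSketch {

    static int partitionFor(String pkValue, int partitionsNum) {
        // mask off the sign bit so the result is always non-negative,
        // then take the remainder to pick a partition
        return (pkValue.hashCode() & Integer.MAX_VALUE) % partitionsNum;
    }

    public static void main(String[] args) {
        int partitionsNum = 3; // matches canal.mq.partitionsNum=3 above
        // every change to the row with id=13 maps to the same partition,
        // preserving per-row ordering for consumers
        System.out.println(partitionFor("13", partitionsNum));
    }
}
```

Because the mapping is deterministic, per-row ordering is preserved even when the topic has multiple partitions.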

3. Start it up

docker-compose up -d

4. Write a test class to try it out

Here we use Spring Boot 2 with spring-kafka.

1) Add the dependency to pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.4.RELEASE</version>
</dependency>
2) Configuration file application.yml
spring:
  kafka:
    bootstrap-servers: 10.10.10.10:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #bootstrap-servers: 10.10.10.10:9092
    consumer:
      group-id: face-group111
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      # deserializers for the message key and value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #bootstrap-servers: 10.10.10.10:9092
3) Write the consumer
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaReceiver {

    // subscribe to the topic configured via canal.mq.topic
    @KafkaListener(topics = {"fengpt"})
    public void listen(ConsumerRecord<?, ?> record) {

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record =" + record);
            System.out.println("------------------ message =" + message);
        }
    }
}
4) Start the project, then run some inserts and updates against the database. In the console you can see the consumer receiving messages:
----------------- record =ConsumerRecord(topic = fengpt, partition = 0, offset = 0, CreateTime = 1585326357800, serialized key size = -1, serialized value size = 277, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"data":[{"id":"13","name":"ww"}],"database":"test","es":1585326357000,"id":6,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(255)"},"old":[{"name":"67676"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12},"table":"user","ts":1585326357800,"type":"UPDATE"})
------------------ message ={"data":[{"id":"13","name":"ww"}],"database":"test","es":1585326357000,"id":6,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(255)"},"old":[{"name":"67676"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12},"table":"user","ts":1585326357800,"type":"UPDATE"}
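Each flatMessage value is plain JSON, so extracting the change type, table, and row data is straightforward. Below is a minimal sketch using Jackson (which Spring Boot already pulls in); the class and method names are made up for illustration, and the sample JSON is the message printed above, trimmed to the fields actually used:

```java
import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FlatMessageParser {

    // the flatMessage printed by the consumer above, trimmed to the fields we use
    static final String SAMPLE = "{\"data\":[{\"id\":\"13\",\"name\":\"ww\"}],"
            + "\"database\":\"test\",\"table\":\"user\",\"type\":\"UPDATE\"}";

    static List<String> describe(String json) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode msg = mapper.readTree(json);
        String type = msg.get("type").asText();   // INSERT / UPDATE / DELETE
        String table = msg.get("database").asText() + "." + msg.get("table").asText();
        List<String> out = new ArrayList<>();
        // "data" holds one JSON object per changed row
        for (JsonNode row : msg.get("data")) {
            out.add(type + " " + table + " id=" + row.get("id").asText());
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe(SAMPLE)); // [UPDATE test.user id=13]
    }
}
```

In a real consumer you would call something like describe() inside the @KafkaListener method instead of just printing the raw value.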

With that, we have MySQL binlog changes flowing through canal into Kafka.

Copyright: licensed under the Creative Commons Attribution 4.0 International license

Links: https://www.fengpt.cn/archives/mysqlcanalkafka数据同步系列五canal整合kafka