终于到最后一步了。我们依然采用docker-compose部署。
1、我们修改之前的docker-compose.yml文件
version: '3'
services:
canal-server:
image: canal/canal-server:v1.1.3
container_name: canal-server
ports:
- 11111:11111
environment:
- canal.instance.mysql.slaveId=12
- canal.auto.scan=false
- canal.destinations=test
- canal.instance.master.address=10.10.10.10:3304
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- canal.serverMode=kafka
- canal.mq.servers=10.10.10.10:9092
- canal.mq.flatMessage=true
- canal.mq.compressionType=none
- canal.mq.acks=all
- canal.mq.partition=0
- canal.mq.bufferMemory=33554432
- canal.mq.retries=0
- canal.mq.topic=fengpt
# - canal.instance.filter.regex=test\\.*
volumes:
- ./conf:/canal-server/conf
- ./logs:/canal-server/logs
2、一些配置说明
# 按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:
# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9092
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false
3、启动
docker-compose up -d
4、我们编写测试类,测试下,
这里我们采用 springboot2+kafka.
1)加入pom
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
2)配置文件application.yml
spring:
kafka:
bootstrap-servers: 10.10.10.10:9092
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#bootstrap-servers: 10.10.10.10:9092
consumer:
group-id: face-group111
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#bootstrap-servers: 10.10.10.10:9092
3)、编写消费者
@Component
public class KafkaReceiver {
@KafkaListener(topics = {"fengpt"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("----------------- record =" + record);
System.out.println("------------------ message =" + message);
}
}
}
4)、启动项目,在数据库里面insert 和update相关数据。查看控制台,可以看见,消费者在消费消息。
----------------- record =ConsumerRecord(topic = fengpt, partition = 0, offset = 0, CreateTime = 1585326357800, serialized key size = -1, serialized value size = 277, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"data":[{"id":"13","name":"ww"}],"database":"test","es":1585326357000,"id":6,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(255)"},"old":[{"name":"67676"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12},"table":"user","ts":1585326357800,"type":"UPDATE"})
------------------ message ={"data":[{"id":"13","name":"ww"}],"database":"test","es":1585326357000,"id":6,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(255)"},"old":[{"name":"67676"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12},"table":"user","ts":1585326357800,"type":"UPDATE"}
到此,我们就实现了,MySQL同步canal,同步binlog日志到kafka中。
评论区