java学习

java学习

 

canal简单安装使用

1、数据库配置

首先使用canal需要修改数据库配置

[mysqld] 
log-bin=mysql-bin # 开启
binlog binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

创建canal数据库用户

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON
*.* TO 'canal'@'%';
FLUSH PRIVILEGES;

 

2、安装canal

下载:https://github.com/alibaba/canal/releases

解压(修改版本号):tar zxvf canal.deployer-1.1.4.tar.gz -C ./canal

配置开放服务器端口:11110、11111、11112

修改canal配置文件(这里设置了两个instance,即两个数据库):

vi canal/conf/canal.properties 
canal.destinations
= example1,example2

 配置instance:

cp -R canal/conf/example conf/example1
mv conf/example conf/example2

第一个数据库配置

vi canal/conf/example1/instance.properties
canal.instance.master.address
=32.1.2.140:3306

第二个数据库配置

vi canal/conf/example2/instance.properties
canal.instance.master.address
=32.1.2.140:3307

#如果需要新增一个instance,只需要修改canal.properties文件,并新增一个instance配置即可,无需重启canal。

运行:

sh canal/bin/startup.sh # 查看日志
cat canal/logs/canal/canal

 

3、Java使用样例

引入pom依赖,需要与安装的canal版本一致

复制代码
<dependencies>     <dependency>         <groupId>com.alibaba.otter</groupId>         <artifactId>canal.client</artifactId>         <version>1.1.4</version>     </dependency> </dependencies>
复制代码

示例代码(异步打印两个数据库的修改内容):

复制代码
package cn.spicybar.dblog;  import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.Message;  import java.net.InetSocketAddress; import java.util.List;  public class CanalClient {      public static void main(String[] args) {         new Thread(() -> initConnector("example1")).start();         new Thread(() -> initConnector("example2")).start();     }      private static void initConnector(String destination) {         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("32.1.0.237", 11111), destination, "", "");         try {             connector.connect();             connector.subscribe(".*\\..*");             connector.rollback();             while (true) {                 Message message = connector.getWithoutAck(1000);                 if (message.getId() != -1 && message.getEntries().size() > 0) {                     printEntry(message.getEntries());                 }                 connector.ack(message.getId());             }         } finally {             connector.disconnect();         }     }      private static void printEntry(List<Entry> entries) {         for (Entry entry : entries) {             if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {                 continue;             }             try {                 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());                 System.out.println(rowChange.getSql());             } catch (Exception e) {                 throw new RuntimeException("ERROR ## parser error, data:" + entry.toString(), e);             }         }     }

posted on 2020-08-31 10:55 杨军威 阅读(661) 评论(0)  编辑  收藏


只有注册用户登录后才能发表评论。


网站导航:
 

导航

统计

常用链接

留言簿

随笔档案

搜索

最新评论

阅读排行榜

评论排行榜