From 14ee70e0da9b03b6a1b8aec4731534a066b67d35 Mon Sep 17 00:00:00 2001 From: RegulusZ <704709463@qq.com> Date: Fri, 31 Jul 2020 01:06:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E6=8F=92=E5=85=A5=E5=92=8C?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E7=9A=84format=5Fdata=EF=BC=8C=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E8=8E=B7=E5=8F=96=E7=9A=84=E7=9A=84data=E5=AD=97?= =?UTF-8?q?=E5=85=B8=E8=A7=A3=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 40 +++++++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index ea3cbb0..6dd0f2c 100644 --- a/README.md +++ b/README.md @@ -76,13 +76,16 @@ from canal.client import Client from canal.protocol import EntryProtocol_pb2 from canal.protocol import CanalProtocol_pb2 +# 建立与canal服务端的连接 client = Client() -client.connect(host='127.0.0.1', port=11111) -client.check_valid(username=b'', password=b'') +client.connect(host='127.0.0.1', port=11111) # canal服务端部署的主机IP与端口 +client.check_valid(username=b'', password=b'') # 自行填写配置的数据库账户密码 +# destination是canal服务端的服务名称, filter即获取数据的过滤规则,采用正则表达式 client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*') while True: message = client.get(100) + # entries是每个循环周期内获取到数据集 entries = message['entries'] for entry in entries: entry_type = entry.entryType @@ -92,27 +95,32 @@ while True: row_change.MergeFromString(entry.storeValue) event_type = row_change.eventType header = entry.header + # 数据库名 database = header.schemaName + # 表名 table = header.tableName event_type = header.eventType + # row是binlog解析出来的行变化记录,一般有三种格式,对应增删改 for row in row_change.rowDatas: format_data = dict() + # 根据增删改的其中一种情况进行数据处理 if event_type == EntryProtocol_pb2.EventType.DELETE: + format_data['before'] = dict() for column in row.beforeColumns: - format_data = { - column.name: column.value - } + format_data['before'][column.name] = column.value elif event_type == 
EntryProtocol_pb2.EventType.INSERT: + format_data['after'] = dict() for column in row.afterColumns: - format_data = { - column.name: column.value - } + format_data['after'][column.name] = column.value else: - format_data['before'] = format_data['after'] = dict() + # 不要写成 format_data['before'] = format_data['after'] = dict():链式赋值会让before和after指向同一个字典,after的值会覆盖before的值 + format_data['before'] = dict() + format_data['after'] = dict() for column in row.beforeColumns: format_data['before'][column.name] = column.value for column in row.afterColumns: format_data['after'][column.name] = column.value + # data即最后获取的数据,包含库名,表名,事务类型,改动数据 data = dict( db=database, table=table, @@ -124,6 +132,20 @@ while True: client.disconnect() ```` +这个demo间隔一秒获取一次服务端的增量数据,并作相应的解析,代码中已做了简单的注释帮助理解,最后获取的data就是某个sql语句改动某一行的完整记录,通常有三种情况: +````python +# 设库test中有表test1,分别有id(int)和name(varchar)字段 +# insert操作:insert into test.test1 values (1,'a') +# 此时data中应是如下情况 +data = {'db':'test', 'table':'test1', 'event_type':1, 'data':{'after':{'id':'1', 'name':'a'}}} +# update操作:update test.test1 set id=2, name='b' where id=1 +# 此时的data +data = {'db':'test', 'table':'test1', 'event_type':2, 'data':{'before':{'id':'1', 'name':'a'}, 'after':{'id':'2', 'name':'b'}}} +# delete操作:delete from test.test1 where id=2 +# 此时的data +data = {'db':'test', 'table':'test1', 'event_type':3, 'data':{'before':{'id':'2', 'name':'b'}}} +```` + 更多详情请查看 [Sample](https://github.com/haozi3156666/canal-python/blob/master/example.py)