diff --git a/README.md b/README.md index ea3cbb0..6dd0f2c 100644 --- a/README.md +++ b/README.md @@ -76,13 +76,16 @@ from canal.client import Client from canal.protocol import EntryProtocol_pb2 from canal.protocol import CanalProtocol_pb2 +# 建立与canal服务端的连接 client = Client() -client.connect(host='127.0.0.1', port=11111) -client.check_valid(username=b'', password=b'') +client.connect(host='127.0.0.1', port=11111) # canal服务端部署的主机IP与端口 +client.check_valid(username=b'', password=b'') # 自行填写配置的数据库账户密码 +# destination是canal服务端的服务名称, filter即获取数据的过滤规则,采用正则表达式 client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*') while True: message = client.get(100) + # entries是每个循环周期内获取到数据集 entries = message['entries'] for entry in entries: entry_type = entry.entryType @@ -92,27 +95,32 @@ while True: row_change.MergeFromString(entry.storeValue) event_type = row_change.eventType header = entry.header + # 数据库名 database = header.schemaName + # 表名 table = header.tableName event_type = header.eventType + # row是binlog解析出来的行变化记录,一般有三种格式,对应增删改 for row in row_change.rowDatas: format_data = dict() + # 根据增删改的其中一种情况进行数据处理 if event_type == EntryProtocol_pb2.EventType.DELETE: + format_data['before'] = dict() for column in row.beforeColumns: - format_data = { - column.name: column.value - } + format_data['before'][column.name] = column.value elif event_type == EntryProtocol_pb2.EventType.INSERT: + format_data['after'] = dict() for column in row.afterColumns: - format_data = { - column.name: column.value - } + format_data['after'][column.name] = column.value else: - format_data['before'] = format_data['after'] = dict() + # format_data['before'] = format_data['after'] = dict() 采用下面的写法应该更好 + format_data['before'] = dict() + format_data['after'] = dict() for column in row.beforeColumns: format_data['before'][column.name] = column.value for column in row.afterColumns: format_data['after'][column.name] = column.value + # data即最后获取的数据,包含库名,表明,事务类型,改动数据 data = dict( db=database, table=table, @@ -124,6 +132,20 @@ while True: client.disconnect() ```` +这个demo间隔一秒获取一次服务端的增量数据,并作相应的解析,代码中已做了简单的注释帮助理解,最后获取的data就是某个sql语句改动某一行的完整记录,通常有三种情况: +````python +# 设库test中有表test1,分别有id(int)和name(varchar)字段 +# insert操作:insert into test.test1 values (1,'a') +# 此时data中应是如下情况 +data = {'db':'test', 'table':'test1', 'event_type':1, 'data':{'after':{'id':'1', 'name':'a'}}} +# update操作:update test.test1 set id=2, name='b' where id=1 +# 此时的data +data = {'db':'test', 'table':'test1', 'event_type':2, 'data':{'before':{'id':'1', 'name':'a'}, 'after':{'id':'2', 'name':'b'}}} +# delete操作:delete from test.test1 where id=2 +# 此时的data +data = {'db':'test', 'table':'test1', 'event_type':3, 'data':{'before':{'id':'2', 'name':'b'}}} +```` + 更多详情请查看 [Sample](https://github.com/haozi3156666/canal-python/blob/master/example.py)