SOLVED 同步es,索引的字段是一个数组对象的情况,要怎么配置
-
-
@野人 我看你另外开了个提问帖,那边我贴了代码 你可以断点调试下,或者加日志调试下
-
@cloudcanal-万少
验证了一下:
1 如果先插入user表,然后再插入label,label表的数据没有同步到es,刚才那个报id的错误我加上 label标的 user_id 字段后面加上 as _id 没有报错了,但是数据也没有同步过去。2 先插入label表,然后在插入user表,数据是有过去的。
第一种情况是设计这样的吗?我理解第一种情况应该也可以插入数据才对的。
-
@cloudcanal-万少 通过这样的方式:JSON_ARRAYAGG(JSON_OBJECT(‘userId’,user_id,‘label’,label)) ,把多条记录变成一个数组了,并把这个字段设置为object,在插入label表的时候报:
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column ‘_v._id’ in ‘where clause’
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
at com.mysql.jdbc.Util.getInstance(Util.java:408)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:943)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3970)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3906)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2524)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2677)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1962)
at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeQuery(DruidPooledPreparedStatement.java:228)
at com.alibaba.otter.canal.client.adapter.support.Util.sqlRS(Util.java:59)
… 17 common frames omitted -
@野人 我发的已经包含了用法,你配置添加如下的额外选项
objFields: 字段名称: object
-
这个例子就是我发的,这个只是支持数据里面一个数据的,如果我的数据里面是一个对象的,支持的吗?
下面这种格式,表是一对多的,多的那个表需要多个字段的
{
“orderId”:1,
“shopCode”:“店铺编码”,
“extendProds”:{“shopName”:“店铺名称”},
“confirmPackage”:[{
“waybillNo”:“200”,
“packageId”:1
},{“waybillNo”:“100”,
“packageId”:2}]
} -
可以参考这个yml文件,定义下哪些字段是数组或者对象属性
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值 outerAdapterKey: exampleKey # 对应application.yml中es配置的key destination: example # cannal的instance或者MQ的topic groupId: # 对应MQ模式下的groupId, 只会同步对应groupId的数据 esMapping: _index: mytest_user # es 的索引名称 _type: _doc # es 的type名称, es7下无需配置此项 _id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配 # pk: id # 如果不需要_id, 则需要指定一个属性为主键属性 # sql映射 sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name, a.c_time as _c_time, c.labels as _labels from user a left join role b on b.id=a.role_id left join (select user_id, group_concat(label order by id desc separator ';') as labels from label group by user_id) c on c.user_id=a.id" # objFields: # _labels: array:; # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的 # _obj: object # json对象 etlCondition: "where a.c_time>='{0}'" # etl 的条件参数 commitBatch: 3000 # 提交批大小