A Pitfall with Flink Stream map

Published 2020-09-17


The scenario: during a window aggregation, I told maxBy which field to use by position (positionToMaxBy):

data.window(...).maxBy(1)

This throws an exception:

Cannot reference field by position on PojoType<com.unique.module.biz.beans.Sensor, fields = [id: String, temperature: Double, timestamp: Long]>Referencing a field by position is supported on tuples, case classes, and arrays. Additionally, you can select the 0th field of a primitive/basic type (e.g. int).

The bean's fields and constructors are:

    private String id;
    private Long timestamp;
    private Double temperature;

    public Sensor() {
    }

    public Sensor(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

After tracking the problem down, I found it comes from this map call:

datasource.map(x -> {
    // Each input line is split on commas into id, timestamp, temperature
    String[] obj = x.split(",");
    return new Sensor(obj[0], Long.valueOf(obj[1]), Double.valueOf(obj[2]));
});

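When map returns a POJO type, Flink calls TypeExtractor.analyzePojo, which inspects the class by reflection and creates a new PojoTypeInfo. That constructor sorts the fields by name in ASCII (String.compareTo) order: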

    @PublicEvolving
    public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
        super(typeClass);
        Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()), "POJO %s is not public", new Object[]{typeClass});
        this.fields = (PojoField[])fields.toArray(new PojoField[fields.size()]);
        Arrays.sort(this.fields, new Comparator<PojoField>() {
            public int compare(PojoField o1, PojoField o2) {
                return o1.getField().getName().compareTo(o2.getField().getName());
            }
        });

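In other words, even if the bean declares its fields in the order

id->f1->f3->f14->f31->f2, the field order after map becomes f1->f14->f2->f3->f31->id (the names are compared character by character, so f14 comes before f2 and id comes after every f-name)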
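The reordering can be reproduced with the JDK alone, without Flink. The following is a minimal sketch, not Flink code: FieldOrderDemo and its two helper methods are hypothetical names made up for this illustration, and the nested Sensor class is a stripped-down stand-in for the bean above. It sorts field names with String.compareTo, the same comparison used in the PojoTypeInfo excerpt.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class FieldOrderDemo {

    // Stand-in for the Sensor bean: same field names, same declaration order.
    public static class Sensor {
        private String id;
        private Long timestamp;
        private Double temperature;
    }

    // Sorts names with String.compareTo (natural order), as the
    // comparator in the PojoTypeInfo excerpt does for field names.
    public static List<String> sortedByName(List<String> names) {
        List<String> copy = new ArrayList<>(names);
        copy.sort(Comparator.naturalOrder());
        return copy;
    }

    // Reads the declared field names of a class by reflection, then sorts them.
    public static List<String> sortedFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field f : type.getDeclaredFields()) {
            if (!f.isSynthetic()) {   // skip compiler- or tool-generated fields
                names.add(f.getName());
            }
        }
        return sortedByName(names);
    }

    public static void main(String[] args) {
        // Declared as id, timestamp, temperature
        System.out.println(sortedFieldNames(Sensor.class));
        // prints [id, temperature, timestamp]

        System.out.println(sortedByName(
                Arrays.asList("id", "f1", "f3", "f14", "f31", "f2")));
        // prints [f1, f14, f2, f3, f31, id]
    }
}
```

The first line of output matches the field list in the exception message (id, temperature, timestamp), which is not the order in which the bean declares its fields.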

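Don't take the shortcut of referencing a field by position. As the exception says, that is only supported on tuples, case classes, and arrays; for a POJO, reference the field by name: maxBy("temperature")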