Flink支持哪些数据类型?

网友投稿 662 2022-10-20


Flink支持哪些数据类型?

一、支持的数据类型

Flink 对可以在 DataSet 或 DataStream 中的元素类型进行了一些限制。这样做的原因是系统会分析类型以确定有效的执行策略。

1.java Tuple 和 Scala Case类;

2.Java POJO;

3.基本类型;

4.通用类;

5.值;

6.Hadoop Writables;

7.特殊类型

二、Flink之Tuple类型

Tuple类型  Tuple 是flink 一个很特殊的类型 (元组类型),是一个抽象类,共26个Tuple子类继承Tuple 他们是 Tuple0一直到Tuple25

package org.apache.flink.api.java.tuple;

import java.io.Serializable;

import org.apache.flink.annotation.Public;

import org.apache.flink.types.NullFieldException;

@Public

public abstract class Tuple implements Serializable {

private static final long serialVersionUID = 1L;

public static final int MAX_ARITY = 25;

private static final Class>[] CLASSES = new Class[]{Tuple0.class, Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class};

public Tuple() {

}

public abstract T getField(int var1);

public T getFieldNotNull(int pos) {

T field = this.getField(pos);

if (field != null) {

return field;

} else {

throw new NullFieldException(pos);

}

}

public abstract void setField(T var1, int var2);

public abstract int getArity();

public abstract T copy();

public static Class extends Tuple> getTupleClass(int arity) {

if (arity >= 0 && arity <= 25) {

return CLASSES[arity];

} else {

throw new IllegalArgumentException("The tuple arity must be in [0, 25].");

}

}

public static Tuple newInstance(int arity) {

switch(arity) {

case 0:

return Tuple0.INSTANCE;

case 1:

return new Tuple1();

case 2:

return new Tuple2();

case 3:

return new Tuple3();

case 4:

return new Tuple4();

case 5:

return new Tuple5();

case 6:

return new Tuple6();

case 7:

return new Tuple7();

case 8:

return new Tuple8();

case 9:

return new Tuple9();

case 10:

return new Tuple10();

case 11:

return new Tuple11();

case 12:

return new Tuple12();

case 13:

return new Tuple13();

case 14:

return new Tuple14();

case 15:

return new Tuple15();

case 16:

return new Tuple16();

case 17:

return new Tuple17();

case 18:

return new Tuple18();

case 19:

return new Tuple19();

case 20:

return new Tuple20();

case 21:

return new Tuple21();

case 22:

return new Tuple22();

case 23:

return new Tuple23();

case 24:

return new Tuple24();

case 25:

return new Tuple25();

default:

throw new IllegalArgumentException("The tuple arity must be in [0, 25].");

}

}

}

查看源码我们看到Tuple0一直到Tuple25

我们看flink为我们为我们构造好了0-25个字段的模板类

ackage org.apache.flink.api.java.tuple;

import java.io.ObjectStreamException;

import org.apache.flink.annotation.Public;

@Public

public class Tuple0 extends Tuple {

private static final long serialVersionUID = 1L;

public static final Tuple0 INSTANCE = new Tuple0();

public Tuple0() {

}

public int getArity() {

return 0;

}

public T getField(int pos) {

throw new IndexOutOfBoundsException(String.valueOf(pos));

}

public void setField(T value, int pos) {

throw new IndexOutOfBoundsException(String.valueOf(pos));

}

public Tuple0 copy() {

return new Tuple0();

}

public String toString() {

return "()";

}

public boolean equals(Object o) {

return this == o || o instanceof Tuple0;

}

public int hashCode() {

return 0;

}

private Object readResolve() throws ObjectStreamException {

return INSTANCE;

}

}

三、Tuple的使用

方式一:初始化元组

可使用静态方法 newInstance进行元组构造 指定元组空间大小;

ex: 1 则元组只有一个空间,则实际使用的Tuple1 字段只有f0

ex: 12 则元组只有两个空间,则实际使用的Tuple2 字段只有f0,f1

指定  Tuple元组空间大小 (可理解为字段个数)

Tuple tuple = Tuple.newInstance(1);

方式一:构造元组

使用Tuple.newInstance(xx),指定元组空间大小的话,这样存取虽然能够实现,但会存在存储索引位置使用不正确的情况,可能由于失误操作编写出索引越界异常,而且使用不太方便,使用Tuplex.of(数据)方法构造Tuple元组

Tuple3 tuple3 = Tuple3.of("test0", "test1", "test2");

System.out.println(tuple3.f0); // test0

System.out.println(tuple3.f1); // test1

System.out.println(tuple3.f2); // test2

四、Flink之POJO类型

Java和Scala的类在满足下列条件时,将会被Flink视作特殊的POJO数据类型专门进行处理:

1.是公共类;

2.无参构造是公共的;

3.所有的属性都是可获得的(声明为公共的,或提供get,set方法);

4.字段的类型必须是Flink支持的。Flink会用Avro来序列化任意的对象。

Flink会分析POJO类型的结构获知POJO的字段。POJO类型要比一般类型好用。此外,Flink访问POJO要比一般类型更高效。

public class WordWithCount {

public String word;

public int count;

public WordWithCount() {}

public WordWithCount(String word, int count) { this.word = word; this.count = count; }

}

DataStream wordCounts = env.fromElements(

new WordWithCount("hello", 1),

new WordWithCount("world", 2));

wordCounts.keyBy("word");

五、Flink之基本类型

Flink支持Java和Scala所有的基本数据类型,比如 Integer,String,和Double。

六、Flink之通用类型

Flink支持大多数的Java,Scala类(API和自定义)。包含不能序列化字段的类在增加一些限制后也可支持。遵循Java Bean规范的类一般都可以使用。

所有不能视为POJO的类Flink都会当做一般类处理。这些数据类型被视作黑箱,其内容是不可见的。通用类使用Kryo进行序列/反序列化。

七、Flink之值类型Values

通过实现org.apache.flinktypes.Value接口的read和write方法提供自定义代码来进行序列化/反序列化,而不是使用通用的序列化框架。

Flink预定义的值类型与原生数据类型是一一对应的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)。这些值类型作为原生数据类型的可变变体,他们的值是可以改变的,允许程序重用对象从而缓解GC的压力。

八、Flink之Hadoop的Writable类

它实现org.apache.hadoop.Writable接口的类型,该类型的序列化逻辑在write()和readFields()方法中实现。

九、Flink之特殊类型

Flink比较特殊的类型有以下两种:

1.Scala的 Either、Option和Try。

2.Java ApI有自己的Either实现。

Java Api 与 Scala 的 类似Either,它表示两种可能类型的值,Left或Right。Either对于错误处理或需要输出两种不同类型的记录的运算符很有用。

类型擦除和类型推理

Java编译器在编译之后会丢弃很多泛型类型信息。这在Java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。

例如,在JVM中,DataStream和DataStream的实例看起来是相同的。

List l1 = new ArrayList();

List l2 = new ArrayList();

System.out.println(l1.getClass() == l2.getClass());

泛型:一种较为准确的说法就是为了参数化类型,或者说可以将类型当作参数传递给一个类或者是方法。Flink 的Java API会试图去重建(可以做类型推理)这些被丢弃的类型信息,并将它们明确地存储在数据集以及操作中。你可以通过DataStream.getType()方法来获取类型,这个方法将返回一个TypeInformation的实例,这个实例是Flink内部表示类型的方式。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:光纤收发器:关于芯片性能指标参数
下一篇:光纤收发器和光电转换器是同一种设备吗?
相关文章

 发表评论

暂时没有评论,来抢沙发吧~