博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink之修改StreamExecutionEnvironment配置Job
阅读量:2055 次
发布时间:2019-04-28

本文共 5758 字,大约阅读时间需要 19 分钟。

地址:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/execution_configuration.html

对于批处理程序修改配置代码如下:

val env = ExecutionEnvironment.getExecutionEnvironment    val conf = env.getConfig //获取执行环境的配置    //conf进行如下配置    //    conf.setExecutionRetryDelay(5)    //    conf.disableClosureCleaner()    conf.setParallelism(2) //设置并行度

更多配置参考文档,粗体为默认配置:

  • enableClosureCleaner() / disableClosureCleaner(). The closure cleaner is enabled by default. The closure cleaner removes unneeded references to the surrounding class of anonymous functions inside Flink programs. With the closure cleaner disabled, it might happen that an anonymous user function is referencing the surrounding class, which is usually not Serializable. This will lead to exceptions by the serializer.

  • getParallelism() / setParallelism(int parallelism) Set the default parallelism for the job.

  • getMaxParallelism() / setMaxParallelism(int parallelism) Set the default maximum parallelism for the job. This setting determines the maximum degree of parallelism and specifies the upper limit for dynamic scaling.

  • getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries) Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of -1 indicates that the system default value (as defined in the configuration) should be used. This is deprecated, use  instead.

  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay) Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully been stopped on the TaskManagers, and once the delay is past, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution re-tries is one or more. This is deprecated, use  instead.

  • getExecutionMode() / setExecutionMode(). The default execution mode is PIPELINED. Sets the execution mode to execute the program. The execution mode defines whether data exchanges are performed in a batch or on a pipelined manner.

  • enableForceKryo() / disableForceKryo. Kryo is not forced by default. Forces the GenericTypeInformation to use the Kryo serializer for POJOS even though we could analyze them as a POJO. In some cases this might be preferable. For example, when Flink’s internal serializers fail to handle a POJO properly.

  • enableForceAvro() / disableForceAvro(). Avro is not forced by default. Forces the Flink AvroTypeInformation to use the Avro serializer instead of Kryo for serializing Avro POJOs.

  • enableObjectReuse() / disableObjectReuse() By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.

  • enableSysoutLogging() / disableSysoutLogging() JobManager status updates are printed to System.out by default. This setting allows to disable this behavior.

  • getGlobalJobParameters() / setGlobalJobParameters() This method allows users to set custom objects as a global configuration for the job. Since the ExecutionConfig is accessible in all user defined functions, this is an easy method for making configuration globally available in a job.

  • addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer) Register a Kryo serializer instance for the given type.

  • addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) Register a Kryo serializer class for the given type.

  • registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer) Register the given type with Kryo and specify a serializer for it. By registering a type with Kryo, the serialization of the type will be much more efficient.

  • registerKryoType(Class<?> type) If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags (integer IDs) are written. If a type is not registered with Kryo, its entire class-name will be serialized with every instance, leading to much higher I/O costs.

  • registerPojoType(Class<?> type) Registers the given type with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written. If a type is not registered with Kryo, its entire class-name will be serialized with every instance, leading to much higher I/O costs.

Note that types registered with registerKryoType() are not available to Flink’s Kryo serializer instance.

  • disableAutoTypeRegistration() Automatic type registration is enabled by default. The automatic type registration is registering all types (including sub-types) used by usercode with Kryo and the POJO serializer.

  • setTaskCancellationInterval(long interval) Sets the the interval (in milliseconds) to wait between consecutive attempts to cancel a running task. When a task is canceled a new thread is created which periodically calls interrupt() on the task thread, if the task thread does not terminate within a certain time. This parameter refers to the time between consecutive calls to interrupt() and is set by default to 30000 milliseconds, or 30 seconds.

The RuntimeContext which is accessible in Rich* functions through the getRuntimeContext() method also allows to access the ExecutionConfig in all user defined functions.

转载地址:http://gyjlf.baihongyu.com/

你可能感兴趣的文章
【托业】【新东方托业全真模拟】TEST05~06-----P5~6
查看>>
【托业】【新东方托业全真模拟】TEST09~10-----P5~6
查看>>
【托业】【新东方托业全真模拟】TEST07~08-----P5~6
查看>>
solver及其配置
查看>>
JAVA多线程之volatile 与 synchronized 的比较
查看>>
Java集合框架知识梳理
查看>>
笔试题(一)—— java基础
查看>>
Redis学习笔记(二)— 在linux下搭建redis服务器
查看>>
Redis学习笔记(三)—— 使用redis客户端连接windows和linux下的redis并解决无法连接redis的问题
查看>>
Intellij IDEA使用(一)—— 安装Intellij IDEA(ideaIU-2017.2.3)并完成Intellij IDEA的简单配置
查看>>
Intellij IDEA使用(二)—— 在Intellij IDEA中配置JDK(SDK)
查看>>
Intellij IDEA使用(三)——在Intellij IDEA中配置Tomcat服务器
查看>>
Intellij IDEA使用(四)—— 使用Intellij IDEA创建静态的web(HTML)项目
查看>>
Intellij IDEA使用(五)—— Intellij IDEA在使用中的一些其他常用功能或常用配置收集
查看>>
Intellij IDEA使用(六)—— 使用Intellij IDEA创建Java项目并配置jar包
查看>>
Eclipse使用(十)—— 使用Eclipse创建简单的Maven Java项目
查看>>
Eclipse使用(十一)—— 使用Eclipse创建简单的Maven JavaWeb项目
查看>>
Intellij IDEA使用(十三)—— 在Intellij IDEA中配置Maven
查看>>
面试题 —— 关于main方法的十个面试题
查看>>
集成测试(一)—— 使用PHP页面请求Spring项目的Java接口数据
查看>>