Daily Archives: February 8, 2021

Flink study summary

Different windows

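Tumbling Window: fixed-size, non-overlapping windows. Typical for BI analysis, where only the result within a certain time period matters.
Sliding Window: fixed-size windows that advance by a slide step, so the same data may be used in more than one window. Use it when results are needed in real time, for example the trading activity of a stock over the most recent period.
Session Window: if no event has been received for a long time (the session gap), the next event starts a new window. It works like a timeout.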
https://www.youtube.com/watch?v=MSp_w5dnhF4
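
To make the three window types concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a timestamp could be mapped to windows. The class and method names are made up for illustration, and it assumes non-negative millisecond timestamps, a window offset of 0, and session input that is already sorted. It is not Flink's implementation, only the same arithmetic in a simplified form.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of window assignment; hypothetical names, no Flink dependency. */
final class WindowAssignmentSketch {

    /** Start of the single tumbling window [start, start + size) containing the timestamp. */
    static long tumblingStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Starts of every sliding window [start, start + size) containing the timestamp, latest first. */
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Groups sorted timestamps into sessions; a pause longer than the gap starts a new session. */
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gap) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(7_000, 5_000));                          // 5000
        System.out.println(slidingStarts(7_000, 10_000, 5_000));                  // [5000, 0]
        System.out.println(sessions(List.of(1_000L, 2_000L, 9_000L), 5_000));     // [[1000, 2000], [9000]]
    }
}
```

The sliding case shows the reuse of data: the event at 7s lands in both the window starting at 5s and the window starting at 0s.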

Watermark, TimestampExtractor etc.

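Flink Watermark: a watermark can be treated as a special event that flows through the stream carrying a timestamp. It declares that no more events with an earlier timestamp are expected. Any event that still arrives with a timestamp earlier than the watermark is considered late.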
https://www.youtube.com/watch?v=vijEi9eGtRo

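BoundedOutOfOrdernessTimestampExtractor
The Time.milliseconds() argument is how long to wait for delayed events before "the bus departs". If the largest event time seen in the current TaskManager is 5s and the configured time is 1s, then the watermark is 4s, which means all data up to 4s is assumed to have arrived.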
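
The 5s / 1s / 4s example can be reproduced with a small plain-Java sketch. It has no Flink dependency and the class name is invented for illustration. It only mimics the rule "watermark = largest event time seen minus the allowed out-of-orderness" and treats an event at or behind the current watermark as late.

```java
/** Plain-Java sketch of a bounded-out-of-orderness watermark; hypothetical class, no Flink dependency. */
final class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs = Long.MIN_VALUE;
    private boolean seenAny = false;

    BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Largest event time seen so far minus the bound, or Long.MIN_VALUE before the first event. */
    long currentWatermark() {
        return seenAny ? maxTimestampMs - maxOutOfOrdernessMs : Long.MIN_VALUE;
    }

    /** Records an event and reports whether it was already at or behind the watermark (late). */
    boolean observe(long eventTimestampMs) {
        boolean late = eventTimestampMs <= currentWatermark();
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
        seenAny = true;
        return late;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch w = new BoundedWatermarkSketch(1_000);
        System.out.println(w.observe(5_000));      // false
        System.out.println(w.currentWatermark());  // 4000
        System.out.println(w.observe(3_000));      // true: 3s is behind the 4s watermark
    }
}
```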

TimestampAssigner
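AssignerWithPeriodicWatermarks: generates a watermark periodically, every 200ms by default.
AssignerWithPunctuatedWatermarks: generates a watermark when a particular event in the stream indicates that one should be emitted.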

sideOutputLateData(): sends late data that would otherwise be dropped to a side output, so something can still be done with it.

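Allowed Lateness: the window is not cleaned up as soon as the watermark passes its end. It is kept for the allowed lateness period, so late events that arrive within that period can still be added to it.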
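
How allowed lateness and the late-data side output interact can be sketched in plain Java. This is a simplified model with made-up names, not Flink code. It assumes an event-time window [start, end) whose last valid timestamp is end - 1, and treats the window as gone once the watermark reaches that last timestamp plus the allowed lateness.

```java
/** Plain-Java sketch of routing a late event; hypothetical names, no Flink dependency. */
final class AllowedLatenessSketch {

    enum Outcome { ADDED_TO_WINDOW, SENT_TO_SIDE_OUTPUT }

    /**
     * Decides what happens to an event that belongs to the window ending at windowEnd (exclusive),
     * given the current watermark. Without a side output configured, the second outcome
     * would mean the event is dropped.
     */
    static Outcome route(long windowEnd, long allowedLateness, long watermark) {
        long cleanupTime = (windowEnd - 1) + allowedLateness;
        return cleanupTime <= watermark ? Outcome.SENT_TO_SIDE_OUTPUT : Outcome.ADDED_TO_WINDOW;
    }

    public static void main(String[] args) {
        // Window [0s, 10s) with 2s of allowed lateness.
        System.out.println(route(10_000, 2_000, 11_000));  // ADDED_TO_WINDOW
        System.out.println(route(10_000, 2_000, 12_000));  // SENT_TO_SIDE_OUTPUT
    }
}
```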

Flink, by default, calls process() once per window, when the window closes.

 

Flink Architecture (Task, slot, thread)

JobManager
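ResourceManager: has different implementations, such as the YARN, Mesos, and Kubernetes resource managers. It manages the slots of the TaskManagers. In a standalone setup it can only distribute the slots of existing TaskManagers and can't start new TaskManagers.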

Dispatcher: starts a new JobMaster for each submitted job.

JobMaster: manages the execution of a single JobGraph.

TaskManager: the worker. The unit of resource on a TaskManager is called a slot.

Each task is executed by one thread.

Operators are chained by default. Each chain forms one task and is handled by a single thread. A slot is a group of resources in which subtasks run. With slot sharing (the default), one slot can hold subtasks of different tasks from the same job.

Slots isolate memory. CPU is not isolated between slots.

Event time, Session End trigger

Method1:

Keep using ProcessWindowFunction. By default it is triggered only when a whole window is done. We customize a trigger (ViewEventTrigger) that fires the ProcessWindowFunction every time an element comes in. Because the window can grow bigger and bigger, we also customize an evictor (ViewEventEvictor) to evict redundant elements.

Method2:

Use KeyedProcessFunction. Register an event-time timer in processElement():
ctx.timerService().registerEventTimeTimer(current.lastModified + interval);
and emit the result in onTimer() when the timer fires. Full example:

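import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * registerEventTimeTimer registers a timer based on event time. It can only fire once a later
 * watermark arrives to compare against.
 * The timer fires when the watermark has advanced by at least interval past the last event;
 * while the difference is smaller than interval it does not fire, because the time is not up yet.
 * If no further watermark arrives, the time difference cannot be checked, so the timer never fires.
 * <p>
 * registerProcessingTimeTimer is different: it compares against the machine clock, so it fires
 * as soon as interval has elapsed.
 */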
public class CountWithTimeoutEventTimeFunction
        extends KeyedProcessFunction<Integer, UserAction, Tuple2<Integer, Long>> {

    private ValueState<CountWithTimestamp> state;
    private final long interval = 60000L;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(
            UserAction value,
            Context ctx,
            Collector<Tuple2<Integer, Long>> out) throws Exception {

        CountWithTimestamp current = state.value();

        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.getUserId();
        }

        current.count++;
        current.lastModified = ctx.timestamp();

        state.update(current);
        // Register an event-time timer
        ctx.timerService().registerEventTimeTimer(current.lastModified + interval);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple2<Integer, Long>> out) throws Exception {

        CountWithTimestamp result = state.value();

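        // State may already have been cleared by an earlier timer, so guard against null
        if (result != null && timestamp == result.lastModified + interval) {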
            out.collect(new Tuple2<Integer, Long>(result.key, result.count));
            state.update(null);
            ctx.timerService().deleteEventTimeTimer(timestamp);
        }
    }
}

The code above is taken from https://blog.csdn.net/qq_33689414/article/details/94619525
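
The point that an event-time timer only fires when the watermark moves past it can be shown with a tiny plain-Java simulation. This is a sketch with invented names and no Flink dependency, reduced to a single key. It assumes timers are deduplicated by timestamp and fire once the watermark is at or beyond the timer's timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java sketch of an event-time timer queue for one key; hypothetical class, no Flink dependency. */
final class EventTimeTimerSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long watermark = Long.MIN_VALUE;

    /** Registers a timer; registering the same timestamp twice keeps a single timer. */
    void registerEventTimeTimer(long timestamp) {
        if (!timers.contains(timestamp)) {
            timers.add(timestamp);
        }
    }

    /** Advances the watermark and returns the timers that fire, in timestamp order. */
    List<Long> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        long lastModified = 1_000;
        long interval = 60_000;
        service.registerEventTimeTimer(lastModified + interval);

        // The watermark has not reached 61000 yet, so nothing fires no matter how much wall time passes.
        System.out.println(service.advanceWatermark(60_000));  // []
        // A later event pushes the watermark forward and the timer fires.
        System.out.println(service.advanceWatermark(61_000));  // [61000]
    }
}
```

If no later event ever arrives for any key, advanceWatermark is never called with a large enough value, which is the "never fires" case described in the comment of CountWithTimeoutEventTimeFunction.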

Checkpoint, savepoint

checkpoint, savepoint difference

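A checkpoint is generated automatically by the Flink job, which periodically saves it to S3 or another store. A savepoint is triggered manually by the user, for example with flink cancel --withSavepoint.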

FsStateBackend vs RocksDBStateBackend

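FsStateBackend keeps the working state in TaskManager memory (JVM heap) and writes checkpoints to a file system such as S3. It's used when the state is small enough to fit in memory.

RocksDBStateBackend keeps the state on the TaskManager's local disk, then syncs checkpoints to S3 or another store. It's used when the state is too large to be handled in memory.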

commands

get EMR yarn application id

application_id=`yarn application -list -appStates RUNNING | awk 'NR == 3 { print $1 }'` # get current yarn application id

get flink jobId

job_id=`flink list -m yarn-cluster -yid $application_id | awk 'NR==4 {print $4}'` # get flink job id

cancel flink and create a savepoint

flink cancel --withSavepoint s3://bucket_paty/deployment-savepoint $job_id -m yarn-cluster -yid $application_id

kill all running yarn applications

for x in $(yarn application -list -appStates RUNNING | awk 'NR > 2 { print $1 }'); do yarn application -kill $x; done

Need to set RETAIN_ON_CANCELLATION. Otherwise the checkpoint is lost when the Flink job is cancelled manually.
Even with it set, the checkpoint was still gone after running:

flink cancel --withSavepoint

With it set, the checkpoint persists after a plain manual cancel or after killing the yarn application.

parallelism, slot, task manager

If parallelism is 64, the job needs 64 slots. With 2 slots per TaskManager, EMR should give us 32 TaskManagers.
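
The same arithmetic as a small Java helper, in case the numbers change. This is only a sketch: the class and method names are made up, and it assumes the number of slots needed equals the job's parallelism (slot sharing left at its default) and that a partially used TaskManager still counts as a whole one.

```java
/** Plain-Java sketch of TaskManager sizing; hypothetical helper, no Flink dependency. */
final class SlotMathSketch {

    /** Number of TaskManagers needed to provide at least `parallelism` slots (rounded up). */
    static int taskManagersNeeded(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("parallelism and slotsPerTaskManager must be positive");
        }
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        System.out.println(taskManagersNeeded(64, 2));  // 32
        System.out.println(taskManagersNeeded(65, 2));  // 33
    }
}
```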

Flink WordCount.jar

The Flink distribution ships with a WordCount.jar example. We can simply run it.

1. launch flink locally

./bin/start-cluster.sh

2. run a job

./bin/flink run ./examples/batch/WordCount.jar