Different windows
Tumbling Window, fixed-size, non-overlapping window. Typical for BI analysis, where we only care about the result within each fixed time bucket.
Sliding Window, has a slide step, so an event can fall into several windows and be reused. Suited to near-real-time analysis that keeps emitting results, e.g. stock trading activity over the most recent period.
Session Window, if no event has arrived for a long time, the next event starts a new window. It works like a timeout.
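A rough sketch of the three window types on the DataStream API; the events stream, the UserAction type and its getUserId() getter are assumptions for illustration:

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Tumbling: fixed, non-overlapping 1-hour buckets (typical BI aggregation).
events.keyBy(e -> e.getUserId())
      .window(TumblingEventTimeWindows.of(Time.hours(1)));

// Sliding: 10-minute window recomputed every minute, so events are reused across windows.
events.keyBy(e -> e.getUserId())
      .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));

// Session: a 30-minute gap without events closes the session; the next event opens a new one.
events.keyBy(e -> e.getUserId())
      .window(EventTimeSessionWindows.withGap(Time.minutes(30)));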
https://www.youtube.com/watch?v=MSp_w5dnhF4
Watermark, TimestampExtractor etc.
Flink Watermark, a watermark can be treated as a special event flowing with the stream. It declares that events with timestamps earlier than the watermark should no longer arrive; events that still arrive after it are treated as late.
https://www.youtube.com/watch?v=vijEi9eGtRo
BoundedOutOfOrdernessTimestampExtractor,
Time.milliseconds(), how long to wait before "the bus departs". If the current max event time in the TaskManager is 5s and the allowed delay is 1s, the watermark is 4s, i.e. all data with timestamps up to 4s is assumed to have already arrived (see the sketch after the assigner notes below).
TimestampAssigner
AssignerWithPeriodicWatermarks, emits a watermark periodically, by default every 200 ms.
AssignerWithPunctuatedWatermarks, emits a watermark per event, based on the event's content.
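A sketch of the legacy timestamp/watermark assignment (UserAction and its getTimestamp() getter are assumptions):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(200L); // how often periodic watermarks are emitted

// The watermark lags 1s behind the max event time seen so far: with max event time 5s,
// the emitted watermark is 4s, so everything up to 4s is assumed to have arrived.
DataStream<UserAction> withTimestamps = events.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<UserAction>(Time.milliseconds(1000)) {
            @Override
            public long extractTimestamp(UserAction element) {
                return element.getTimestamp(); // event time in epoch millis (assumed field)
            }
        });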
sideOutputLateData(), routes data that would otherwise be dropped to a side output so it can still be handled.
Allowed Lateness, the window is not closed right at the watermark; it stays open for the allowed lateness, and late events within that time still update the result (sketch after these notes).
Flink, by default, triggers process() when a window is complete and closes.
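A sketch combining allowedLateness and sideOutputLateData (the window function and its output type are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// Tag for events that arrive after the allowed lateness has expired.
OutputTag<UserAction> lateTag = new OutputTag<UserAction>("late-events") {};

SingleOutputStreamOperator<String> result = withTimestamps
        .keyBy(e -> e.getUserId())
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .allowedLateness(Time.seconds(30))   // keep the window state 30s past the watermark
        .sideOutputLateData(lateTag)         // events later than that go to the side output
        .process(new MyWindowFunction());    // placeholder ProcessWindowFunction

// Retrieve the would-be-dropped events for separate handling.
DataStream<UserAction> lateEvents = result.getSideOutput(lateTag);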
Flink Architecture (Task, Slot, Thread)
JobManager
ResourceManager, has different implementations, e.g. the YARN, Mesos, and Kubernetes resource managers. It allocates the slots of TaskManagers; the standalone implementation cannot start new TaskManagers by itself.
Dispatcher, can start a new JobMaster for a submitted job.
JobMaster, manages a single JobGraph.
TaskManager, the worker process. The unit of resource on a TaskManager is called a slot.
Each task is executed by one thread.
Operators are chained by default. Each chain is a subtask, handled by a single thread and running in a single slot. A slot is a resource group.
Slots isolate managed memory; CPU is not isolated between slots.
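A small sketch of controlling chaining and slot sharing explicitly (the operator functions are placeholders):

env.setParallelism(4);

stream.map(new ParseFn())
      .disableChaining()               // break the chain: this map runs in its own thread
      .filter(new ValidFn())
      .slotSharingGroup("analytics");  // subtasks from here on use a separate slot sharing group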
Event time, Session End trigger
Method 1:
Keep using ProcessWindowFunction(). By default it only fires when the whole window is done, so we customize a ViewEventTrigger that fires the ProcessWindowFunction every time an element arrives. Because the window can grow bigger and bigger, we also customize a ViewEventEvictor to evict elements that are no longer needed.
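A minimal sketch of such a per-element trigger; the ViewEventTrigger name comes from the note above, but the exact behavior is an assumption, not a confirmed implementation. Because session windows merge, the trigger must also declare merging support:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ViewEventTrigger extends Trigger<Object, TimeWindow> {
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Keep the end-of-window timer so the session still closes normally.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // Fire the ProcessWindowFunction for every incoming element.
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Emit the final result and purge state when the session actually ends.
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true; // required because session windows are a merging assigner
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}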
Method 2:
Use KeyedProcessFunction:
ctx.timerService().registerEventTimeTimer(current.lastModified + interval);
@Override public void onTimer() {}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * registerEventTimeTimer registers a timer on event time. It only fires when a later
 * watermark arrives and has passed the registered time (lastModified + interval); if the
 * watermark has not reached that time yet, the timer does not fire, and if no further
 * watermark arrives there is nothing to compare against, so it never fires.
 *
 * This differs from registerProcessingTimeTimer, which is compared against the machine
 * clock and therefore fires as soon as the interval has elapsed.
 */
public class CountWithTimeoutEventTimeFunction
        extends KeyedProcessFunction<Integer, UserAction, Tuple2<Integer, Long>> {

    private ValueState<CountWithTimestamp> state;
    private final long interval = 60000L;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(UserAction value, Context ctx, Collector<Tuple2<Integer, Long>> out)
            throws Exception {
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.getUserId();
        }
        current.count++;
        current.lastModified = ctx.timestamp();
        state.update(current);
        // Register an event-time timer relative to this element's timestamp.
        ctx.timerService().registerEventTimeTimer(current.lastModified + interval);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Integer, Long>> out)
            throws Exception {
        CountWithTimestamp result = state.value();
        if (timestamp == result.lastModified + interval) {
            out.collect(new Tuple2<>(result.key, result.count));
            state.clear();
            ctx.timerService().deleteEventTimeTimer(timestamp);
        }
    }
}
The code above is adapted from https://blog.csdn.net/qq_33689414/article/details/94619525
Checkpoint, savepoint
checkpoint, savepoint difference
A checkpoint is generated automatically by the Flink job and is periodically saved to S3 or similar storage; a savepoint is triggered manually (e.g. with flink savepoint or flink cancel --withSavepoint).
FsStateBackend vs RocksDBStateBackend
FsStateBackend keeps the working state in TaskManager memory and only writes checkpoints to the filesystem; it is used when the state is small.
RocksDBStateBackend stores state on the TaskManager's local disk and then syncs it to S3 or similar storage; it is used when the state is too large to fit in memory (sketch below).
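A sketch of configuring either backend (the bucket path is a placeholder; RocksDBStateBackend needs the flink-statebackend-rocksdb dependency and its constructor declares IOException):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

// Small state: working state on the TaskManager heap, checkpoints written to the filesystem.
env.setStateBackend(new FsStateBackend("s3://my-bucket/checkpoints"));

// Large state: working state in local RocksDB, incremental checkpoints synced to S3.
env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true));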
commands
get EMR yarn application id
application_id=`yarn application -list -appStates RUNNING | awk 'NR == 3 { print $1 }'`  # get current yarn application id
get flink jobId
job_id=`flink list -m yarn-cluster -yid $application_id | awk 'NR==4 {print $4}'`  # get flink job id
cancel flink and create a savepoint
flink cancel --withSavepoint s3://bucket_path/deployment-savepoint $job_id -m yarn-cluster -yid $application_id
kill all yarn application
for x in $(yarn application -list -appStates RUNNING | awk 'NR > 2 { print $1 }'); do yarn application -kill $x; done
Need to set RETAIN_ON_CANCELLATION (externalized checkpoints); otherwise the checkpoint is deleted when the Flink job is cancelled manually.
Even then, the checkpoint is still removed when the job is cancelled with:
flink cancel --withSavepoint
The checkpoint persists on a plain manual cancel or when the YARN application is killed (configuration sketch below).
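A sketch of retaining checkpoints on cancellation (the 60s interval is an assumption):

import org.apache.flink.streaming.api.environment.CheckpointConfig;

env.enableCheckpointing(60_000L); // take a checkpoint every 60s
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);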
parallelism, slot, task manager
If parallelism is 64, the job needs 64 slots. With 2 slots per TaskManager, EMR should give us 32 TaskManagers.
flink WordCount.jar
Flink's default examples directory contains a WordCount.jar; we can simply run it.
1. launch flink locally
./bin/start-cluster.sh
2. run a job
./bin/flink run ./examples/batch/WordCount.jar