One: Background
Our project uses Flink's Java client for data processing.
Data source: Kafka
Data sink: Kafka
Originally there was a single business requirement, which can be thought of as accumulating and summing one field of the MQ message.
Now there are two more business requirements, which can be thought of as accumulating and summing other fields of the same message.
This raises a question:
Once a Flink stream has gone through operators such as map, filter, keyBy, and reduce, can it serve only one business requirement?
If one data source can serve several business requirements, there is nothing to worry about. If it can serve only one, we would have to duplicate the data from the same source, or duplicate the client code. Neither sounds elegant.
If the question above is not clear, let's look at the code.
    // Create the data stream
    DataStream<TaskNodeInstance> sourceStream = StreamCommon.getKafkaSourceStream(parameters, env);

    // Filter messages
    DataStream<TaskNodeInstance> filterStream = sourceStream
            .filter(new NotNullFilter())
            .filter(new BasicFilter());

    // Business 1: failure response code statistics
    filterStream.filter(new RespCodeFilter())
            .flatMap(new RespCodeCountMap())
            .keyBy("taskId")
            .timeWindow(Time.seconds(10))
            .reduce(new RespCodeCountReduce(), new RespCodeCountWindowFunction())
            .addSink(new RespCodeStatSink(parameters)).name("RespCodeStatSink:" + profile);

    // Business 2: intelligent recommendation result statistics
    filterStream.filter(new RecommendFilter())
            .map(new RecommendCountMap())
            .keyBy("taskId")
            .timeWindow(Time.seconds(10))
            .reduce(new RecommendCountReduce(), new RecommendCountWindow())
            .addSink(new RecommendSink(parameters)).name("RecommendCountSink:" + profile);
What could be wrong with writing it this way?
If the map, reduce, and other operators in business 1 (failure response code statistics) consumed the data held by the filterStream object, then writing business 2 (intelligent recommendation result statistics) this way would definitely be wrong,
because the data would already have been consumed.
That was my intuition at the time, so I came up with two solutions:
* Write separate client code for the other business requirement
* In the same client code, add another Kafka data source on the same topic; a different group-id is enough
Both approaches would solve the problem, but neither is elegant.
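For reference, the second workaround boils down to giving each source its own consumer group, so that Kafka delivers every message of the topic to both. A minimal sketch, using only java.util.Properties and the standard Kafka consumer keys `bootstrap.servers` and `group.id`; the class name, broker address, and group names are placeholders I made up, not values from the project:

```java
import java.util.Properties;

final class KafkaGroupSketch {
    // Builds consumer properties for one source.
    // The broker address is a placeholder, not the project's real address.
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        // Two sources on the same topic, each in its own (hypothetical) consumer group.
        Properties business1 = consumerProps("resp-code-stat");
        Properties business2 = consumerProps("recommend-stat");
        System.out.println(business1.getProperty("group.id"));
        System.out.println(business2.getProperty("group.id"));
    }
}
```

This works, but the same topic is read twice and every message is deserialized twice, which is part of why it feels inelegant.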
So I started debugging the Flink client source code.
The truth turned out to be reassuring:
every operator call creates and returns a new object,
and it does not affect the stream it was called on.
In other words, the code above is correct.
Two: Debugging the Flink client DataStream source code
Debug the source code of the DataStream class.
Step into the map operator as the entry point.
What? The return value is a newly created object. That means every operator call produces a new object and has no effect on the previous DataStream.
Let's run it in the debugger and look at the memory addresses. Because the code is written as a chain, the intermediate return values are not visible, so here I assign each step of the chain to a temporary variable.
Look at the memory address each variable points to.
Sure enough, every operator call creates a new DataStream.
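The observation above can be modeled with a small self-contained sketch. This is not Flink's source code: `Node`, `MiniEnv`, and `MiniStream` are hypothetical stand-ins I invented to illustrate the idea. It assumes, as the debugging session suggests, that each operator call adds a new node whose input is the stream it was called on and returns a new handle, leaving the original handle free to be branched again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for one node of the job graph (not a Flink class).
final class Node {
    final String name;
    final Node input; // upstream node; null for a source

    Node(String name, Node input) {
        this.name = name;
        this.input = input;
    }
}

// Hypothetical stand-in for the execution environment: it only records operators.
final class MiniEnv {
    final List<Node> operators = new ArrayList<>();
}

// Hypothetical stand-in for DataStream: an immutable handle on one graph node.
final class MiniStream<T> {
    final MiniEnv env;
    final Node node;

    MiniStream(MiniEnv env, Node node) {
        this.env = env;
        this.node = node;
    }

    static <T> MiniStream<T> source(MiniEnv env, String name) {
        return new MiniStream<>(env, new Node(name, null));
    }

    // The user functions are ignored here: only graph construction is modeled.
    MiniStream<T> filter(String name, Predicate<? super T> predicate) {
        return transform(name);
    }

    <R> MiniStream<R> map(String name, Function<? super T, ? extends R> mapper) {
        return transform(name);
    }

    // Every operator call adds a new node and returns a new handle; "this" is untouched.
    private <R> MiniStream<R> transform(String name) {
        Node child = new Node(name, this.node);
        env.operators.add(child);
        return new MiniStream<>(env, child);
    }
}

final class MiniStreamDemo {
    public static void main(String[] args) {
        MiniEnv env = new MiniEnv();
        // Assign each step to a temporary variable, as in the debugging session.
        MiniStream<String> source = MiniStream.source(env, "kafka-source");
        MiniStream<String> filtered = source.filter("filter", s -> s != null);
        MiniStream<Integer> business1 = filtered.map("business-1-map", s -> s.length());
        MiniStream<Integer> business2 = filtered.map("business-2-map", s -> s.hashCode());

        System.out.println("filter returned a new handle: " + (filtered != source));
        System.out.println("both branches read from filter: "
                + (business1.node.input == filtered.node && business2.node.input == filtered.node));
        System.out.println("operators registered: " + env.operators.size());
    }
}
```

In this model, calling an operator never mutates the handle it is called on, so `filtered` can be the starting point of any number of branches.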
Great, so the business code I wrote above is fine.
Three: Verification
Open the Flink web UI and submit the job.
Take a look at the TaskManager overview.
Seeing this basically confirms what I said: the ancestor DataStream, obtained from the Kafka data source, is not polluted and can be used by multiple business requirements.
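Why the shared stream is not "used up" can also be pictured at the record level: when two branches hang off the same upstream operator, each record it emits is handed to both of them. The sketch below is a toy model of that fan-out in plain Java, not Flink's runtime (which also deals with serialization, parallelism, and operator chaining). `Msg`, `FanOut`, `Sums`, and the two field names are hypothetical and stand in for the author's message type and the two summed fields.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical message with two numeric fields (not the author's TaskNodeInstance).
final class Msg {
    final String taskId;
    final int failures;
    final int recommends;

    Msg(String taskId, int failures, int recommends) {
        this.taskId = taskId;
        this.failures = failures;
        this.recommends = recommends;
    }
}

// Models an operator output that forwards each record to every connected downstream.
final class FanOut<T> {
    private final List<Consumer<T>> downstream = new ArrayList<>();

    void connect(Consumer<T> consumer) {
        downstream.add(consumer);
    }

    void emit(T record) {
        for (Consumer<T> consumer : downstream) {
            consumer.accept(record);
        }
    }
}

// Per-taskId sums produced by the two branches.
final class Sums {
    final Map<String, Integer> failures = new HashMap<>();
    final Map<String, Integer> recommends = new HashMap<>();
}

final class FanOutDemo {
    static Sums run(List<Msg> input) {
        FanOut<Msg> filtered = new FanOut<>();
        Sums sums = new Sums();
        // Business 1 and business 2 both attach to the same upstream output.
        filtered.connect(m -> sums.failures.merge(m.taskId, m.failures, Integer::sum));
        filtered.connect(m -> sums.recommends.merge(m.taskId, m.recommends, Integer::sum));
        for (Msg m : input) {
            filtered.emit(m); // every record reaches both branches
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Msg> input = new ArrayList<>();
        input.add(new Msg("t1", 1, 10));
        input.add(new Msg("t1", 2, 20));
        input.add(new Msg("t2", 5, 0));
        Sums sums = run(input);
        System.out.println("failures by task: " + sums.failures);
        System.out.println("recommends by task: " + sums.recommends);
    }
}
```

Each branch sums a different field of the same messages, and neither one takes records away from the other.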
Now look at the TaskManager log:
I'm happy to say everything works.