flink Transformation算子(更新中)

avatar
作者
筋斗云
阅读量:0

flink Transformation算子部分

Transformation算子

map

该方法是将一个DataStream调用map方法返回一个新的DataStream。本质是将该DataStream中对应的每一条数据依次迭代出来,应用map方法传入的计算逻辑,返回一个新的DataStream。原来的DataStream中对应的每一条数据,与新生成的DataStream中数据是一一对应的,也可以说是存在着映射关系的。 
package com.lyj.sx.flink.day03;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates the map transformation: each line read from the socket is
 * passed one-to-one through {@link QueryCategoryNameFromMySQLFunction}
 * and the enriched result is printed.
 */
public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One record per line of text received on the socket.
        DataStreamSource<String> lines = env.socketTextStream("192.168.25.62", 8899);
        lines.map(new QueryCategoryNameFromMySQLFunction()).print();
        env.execute();
    }
}
 package com.lyj.sx.flink.day03;  import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.Configuration;   import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet;  public class QueryCategoryNameFromMySQLFunction extends RichMapFunction<String, Tuple4<String, String, String, Double>> {     private Connection connection;     private PreparedStatement preparedStatement;     @Override     public void open(Configuration parameters) throws Exception {         connection=DriverManager.getConnection("jdbc:mysql://localhost:3306/dy_flink?characterEncoding=utf-8","root","");         preparedStatement=connection.prepareStatement("select name from t_category where id = ?");         super.open(parameters);     }      @Override     public Tuple4<String, String, String, Double> map(String s) throws Exception {          String[] fields = s.split(",");          String cid = fields[0];          preparedStatement.setInt(1,Integer.parseInt(cid));          ResultSet resultSet = preparedStatement.executeQuery();         String name = "未知";          while (resultSet.next()){                name = resultSet.getString(1);          }          resultSet.close();         return  Tuple4.of(fields[0], fields[1], name, Double.parseDouble(fields[3]));     }          @Override     public void close() throws Exception {         if(connection!=null){             connection.close();         }         if(preparedStatement!=null){             preparedStatement.close();         }     } } 

flatMap扁平化映射(DataStream → DataStream)

- 该方法是将一个DataStream调用flatMap方法返回一个新的DataStream,本质上是将该DataStream中的对应的每一条数据依次迭代出来,应用flatMap方法传入的计算逻辑,返回一个新的DataStream。原来的DataStream中输入的一条数据经过flatMap方法传入的计算逻辑后,会返回零到多条数据。所谓的扁平化即将原来的数据压平,返回多条数据。 
package com.lyj.sx.flink.day03;  import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;  public class FlatMapDemo1 {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);          SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapData = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {             @Override             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                 String[] strings = value.split(" ");                 for (String string : strings) {                     out.collect(Tuple2.of(string, 1));                 }             }         });          flatMapData.print();          env.execute("pxj");     } } 
package com.lyj.sx.flink.day03;  import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;  public class FlatMapDemo2 {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8887);          SingleOutputStreamOperator<Tuple2<String, Integer>> myflatMap = source.transform("MyflatMap", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {          }), new MyflatMap());          myflatMap.print();         env.execute("pxj");     } } 
package com.lyj.sx.flink.day03;  import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;  public  class MyflatMap extends AbstractStreamOperator<Tuple2<String, Integer>> implements OneInputStreamOperator<String, Tuple2<String, Integer>> {     @Override     public void processElement(StreamRecord<String> element) throws Exception {          String[] split = element.getValue().split(",");         for (String s : split) {             output.collect(element.replace(Tuple2.of(s,1)));         }     }      @Override     public void setKeyContextElement(StreamRecord<String> record) throws Exception {         System.out.println("StreamRecord..............");         OneInputStreamOperator.super.setKeyContextElement(record);          } } 

keyBy按key分区(DataStream → KeyedStream)

package com.lyj.sx.flink.day03;   import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class KeyByDemo1 {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8888);          MapFunction<String, Tuple2<String, Integer>> mapFunction = new MapFunction<String, Tuple2<String, Integer>>() {             Tuple2<String, Integer> t;              @Override             public Tuple2<String, Integer> map(String s) throws Exception {                 String[] strings = s.split(" ");                 for (String string : strings) {                     t = Tuple2.of(string, 1);                 }                 return t;             }         };          source.map(mapFunction).print();          env.execute("pxj");     } } 
 package com.lyj.sx.flink.day03;   import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class KeyByDemo2 {     public static void main(String[] args) throws  Exception {          StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);          SingleOutputStreamOperator<Tuple3<String, String, Double>> tpStream = source.map(new MapFunction<String, Tuple3<String, String, Double>>() {             @Override             public Tuple3<String, String, Double> map(String s) throws Exception {                 String[] fields = s.split(",");                 return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));             }         });         KeyedStream<Tuple3<String, String, Double>, Tuple> tuple3TupleKeyedStream = tpStream.keyBy("f0", "f1");         tuple3TupleKeyedStream.print();         env.execute("pxj");     } } 
package com.lyj.sx.flink.day03;  import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class KeyByDemo3 {     public static void main(String[] args)throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);          SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() {             Tuple2<String,Integer> t;              @Override             public Tuple2<String, Integer> map(String s) throws Exception {                  for (String string : s.split(",")) {                      t=Tuple2.of(string,1);                  }                 return t;             }         });          map.print();         KeyedStream<Tuple2<String, Integer>, String> keyedStream = map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {             @Override             public String getKey(Tuple2<String, Integer> value) throws Exception {                 return value.f0;             }         });         keyedStream.print();         env.execute("pxj");     } } 
package com.lyj.sx.flink.day03;  import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class KeyByDemo4 {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());         DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);          SingleOutputStreamOperator<Tuple3<String, String, Double>> mapped = source.map(new MapFunction<String, Tuple3<String, String, Double>>() {             @Override             public Tuple3<String, String, Double> map(String s) throws Exception {                 String[] fields = s.split(",");                 return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));             }         });         mapped.keyBy(new KeySelector<Tuple3<String, String, Double>, String>() {             @Override             public String getKey(Tuple3<String, String, Double> key) throws Exception {                 return key.f0+key.f1;             }         }).print();         env.execute("pxj");     } } 
 package com.lyj.sx.flink.day03;   import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class KeyByDemo5 {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);         //山东省,济南市,5000          SingleOutputStreamOperator<Tuple3<String, String, Double>> map = source.map(new MapFunction<String, Tuple3<String, String, Double>>() {             @Override             public Tuple3<String, String, Double> map(String s) throws Exception {                 String[] fields = s.split(",");                 return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));             }         });          KeyedStream<Tuple3<String, String, Double>, Tuple2<String, String>> tuple3Tuple2KeyedStream = map.keyBy(t -> Tuple2.of(t.f0, t.f1), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {         }));         tuple3Tuple2KeyedStream.print();         env.execute("pxj");     } } 
 package com.lyj.sx.flink.day03;  import com.lyj.sx.flink.bean.DataBean; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  import java.beans.beancontext.BeanContext;  public class KeyByDemo6 {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);          SingleOutputStreamOperator<DataBean> beanStream = source.map(new MapFunction<String, DataBean>() {             @Override             public DataBean map(String s) throws Exception {                 String[] fields = s.split(",");                 return DataBean.of(fields[0], fields[1], Double.parseDouble(fields[2]));             }         });         KeyedStream<DataBean, DataBean> keyedStream = beanStream.keyBy(t -> t);          keyedStream.print();              env.execute("pxj");     } } 

filter过滤(DataStream → DataStream)

- 该方法是将一个DataStream调用filter方法返回一个新的DataStream,本质上是将该DataStream中的对应的每一条输入数据依次迭代出来,应用filter方法传入的过滤逻辑,返回一个新的DataStream。原来的DataStream中输入的一条数据经过filter方法传入的过滤逻辑后,返回true就会保留这条数据,返回false就会过滤掉该数据。 
 package com.lyj.sx.flink.day03;  import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class FilterDemo1 {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);          SingleOutputStreamOperator<String> filter = source.filter(new FilterFunction<String>() {             @Override             public boolean filter(String s) throws Exception {                 return s.startsWith("h");             }         }).setParallelism(2);          filter.print();          env.execute("pxj");     } } 
 package com.lyj.sx.flink.day03;  import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.StreamFilter;  public class FilterDemo2 {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);          SingleOutputStreamOperator<String> transform = source.transform("myfilter", TypeInformation.of(String.class), new StreamFilter<>(w -> w.startsWith("h")));          transform.print();          env.execute("pxj");     } } 
package com.lyj.sx.flink.day03;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Same filter again, this time driven by the fully hand-written operator
 * {@link MyFilterFunction} plugged in via transform.
 */
public class FilterDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);

        SingleOutputStreamOperator<String> filtered = source.transform(
                "MyFilterFunction",
                TypeInformation.of(String.class),
                new MyFilterFunction());

        filtered.print();
        env.execute("pxj");
    }
}
 package com.lyj.sx.flink.day03;  import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;  public class MyFilterFunction extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {     @Override     public void processElement(StreamRecord<String> element) throws Exception {          String elementValue = element.getValue();          if(elementValue.startsWith("h")){              output.collect(element);          }     }      @Override     public void setKeyContextElement(StreamRecord<String> record) throws Exception {         OneInputStreamOperator.super.setKeyContextElement(record);         System.out.println("setKeyContextElement.........");     } } 

底层实现filter and map

package com.lyj.sx.flink.day03;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Drives {@link MyFilterAndMap}: a single low-level operator that both
 * filters (keeps even integers) and maps (multiplies by 10.1) in one pass.
 */
public class FilterAndMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8885);

        SingleOutputStreamOperator<Double> result = source.transform(
                "MyFilterAndMap",
                TypeInformation.of(Double.class),
                new MyFilterAndMap());

        result.print();
        env.execute("pxj");
    }
}
package com.lyj.sx.flink.day03;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Combined filter+map operator: parses each line as an int, keeps even
 * values, and emits value * 10.1 as a double. Lines that fail to parse are
 * logged (best-effort) and skipped rather than failing the job.
 */
public class MyFilterAndMap extends AbstractStreamOperator<Double> implements OneInputStreamOperator<String, Double> {
    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        String value = element.getValue();
        try {
            int parsed = Integer.parseInt(value);
            if (parsed % 2 == 0) {
                double scaled = parsed * 10.1;
                output.collect(element.replace(scaled));
            }
        } catch (Exception e) {
            // Deliberate best-effort: malformed input is reported and skipped.
            e.printStackTrace();
        }
    }

    @Override
    public void setKeyContextElement(StreamRecord<String> record) throws Exception {
        OneInputStreamOperator.super.setKeyContextElement(record);
    }
}

reduce

DataStream → KeyedStream
键控数据流(keyBy之后的流)上的"滚动"聚合。将当前元素与上次聚合的值组合并计算出新值输出。
它的滚动聚合逻辑没有写死,而是由用户通过ReduceFunction来传入。返回值必须和输入类型一致

在这里插入图片描述

package com.lyj.sx.flink.day03;  import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;   public class ReduceDemo1 {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886); //         MapFunction<String, Tuple2<String, Integer>> mapFunction = new MapFunction<String, Tuple2<String, Integer>>() { //            Tuple2<String, Integer> tuple2; // //            @Override //            public Tuple2<String, Integer> map(String value) throws Exception { //                String[] strings = value.split(","); //                for (String s : strings) { //                    tuple2 = Tuple2.of(s, 1); //                } //                return tuple2; //            } //        };         FlatMapFunction<String, Tuple2<String, Integer>> mapFunction = new FlatMapFunction<String, Tuple2<String, Integer>>() {             @Override             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                 String[] strings = value.split(",");                 for (String s : strings) {                     out.collect(Tuple2.of(s, 1));                 }             }         };         SingleOutputStreamOperator<Tuple2<String, 
Integer>> map = source.flatMap(mapFunction);          KeyedStream<Tuple2<String, Integer>, Object> keyBy = map.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {             @Override             public Object getKey(Tuple2<String, Integer> value) throws Exception {                 return value.f0;             }         });          SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = keyBy.reduce(new ReduceFunction<Tuple2<String, Integer>>() {             @Override             public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {                 value1.f1 = value1.f1 + value2.f1;                 return value1;             }         });          reduce.print();          env.execute("pxj");     } } 

sum

package com.lyj.sx.flink.day04;  import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;  public class SumDemo {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);         FlatMapFunction<String, Tuple2<String, Integer>> mapFunction = new FlatMapFunction<String, Tuple2<String, Integer>>() {             @Override             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                 String[] strings = value.split(",");                 for (String s : strings) {                     out.collect(Tuple2.of(s, 1));                 }             }         };         SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.flatMap(mapFunction);         KeyedStream<Tuple2<String, Integer>, Object> keyBy = map.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {             @Override             public Object getKey(Tuple2<String, Integer> value) throws Exception {                 return value.f0;             }         });          KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = map.keyBy(t -> t.f0);         SingleOutputStreamOperator<Tuple2<String, Integer>> summed = tuple2StringKeyedStream.sum("f1");          summed.print();          env.execute("pxj");     } } 

union

union 合并(DataStream * → DataStream)
该方法可以将两个或者多个数据类型一致的DataStream合并成一个DataStream。DataStream union(DataStream… streams)可以看出DataStream的union方法的参数为可变参数,即可以合并两个或多个数据类型一致的DataStream。

package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates union: two String streams from different socket ports are
 * merged into a single DataStream and printed.
 */
public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> first = env.socketTextStream("192.168.25.62", 8885);
        DataStreamSource<String> second = env.socketTextStream("192.168.25.62", 8889);

        // Both inputs have the same element type, so union is allowed.
        DataStream<String> merged = first.union(second);
        merged.print();
        env.execute("pxj");
    }
}
package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Union after a map: both socket streams are first parsed to Integer so
 * their element types match, then merged and printed.
 */
public class UnionDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> first = env.socketTextStream("192.168.25.62", 8885);
        SingleOutputStreamOperator<Integer> firstInts = first.map(Integer::parseInt);

        DataStreamSource<String> second = env.socketTextStream("192.168.25.62", 8889);
        SingleOutputStreamOperator<Integer> secondInts = second.map(Integer::parseInt);

        DataStream<Integer> merged = firstInts.union(secondInts);
        merged.print();
        env.execute("pxj");
    }
}
package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Union of a stream with itself: every element then appears twice in the
 * merged stream — this demo makes that visible via print.
 */
public class UnionDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);
        SingleOutputStreamOperator<Integer> ints = source.map(Integer::parseInt);

        // Self-union duplicates each record.
        DataStream<Integer> doubled = ints.union(ints);
        doubled.print();
        env.execute("pxj");
    }
}

sum

package com.lyj.sx.flink.day04;  import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;  public class SumDemo {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);         FlatMapFunction<String, Tuple2<String, Integer>> mapFunction = new FlatMapFunction<String, Tuple2<String, Integer>>() {             @Override             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                 String[] strings = value.split(",");                 for (String s : strings) {                     out.collect(Tuple2.of(s, 1));                 }             }         };         SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.flatMap(mapFunction);         KeyedStream<Tuple2<String, Integer>, Object> keyBy = map.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {             @Override             public Object getKey(Tuple2<String, Integer> value) throws Exception {                 return value.f0;             }         });          KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = map.keyBy(t -> t.f0);         SingleOutputStreamOperator<Tuple2<String, Integer>> summed = tuple2StringKeyedStream.sum("f1");          summed.print();          env.execute("pxj");     } } 

Project

package com.lyj.sx.flink.day04;  import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class ProjectDemo {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); //         DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);         DataStreamSource<String> lines = env.socketTextStream("192.168.25.62", 8888);          SingleOutputStreamOperator<Tuple3<String, String, Double>> tpStream = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {              @Override             public Tuple3<String, String, Double> map(String value) throws Exception {                 String[] fields = value.split(",");                 return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));             }         });          SingleOutputStreamOperator<Tuple> res = tpStream.project(2, 0);         res.print();          env.execute("pxj");  /*山东省,济南市,5000 * 2024-06-10 22:56:14,686 main ERROR Unable to locate appender "Console" for logger config "root" 12> (5000.0,山东省)  * */      } } 

滚动聚合 (组内计算)

滚动聚合算子,是多个聚合算子的统称,有sum、min、minBy、max、maxBy;
这些算子的底层逻辑都是维护一个聚合值,并使用每条流入的数据对聚合值进行滚动更新;
这些算子都只能在KeyedStream上调用(就是必须keyby后调用);
minBy / maxBy 返回的是极值字段所在的那条完整数据(其余字段随该条数据一起返回);
min / max / sum 只保证聚合字段的值正确,其余非分组、非聚合字段保留的是该 key 下第一条数据的值。其计算逻辑流程如下:
(图:滚动聚合算子的计算逻辑流程示意图)

package com.lyj.sx.flink.day04;  import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class MinDemo {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());         DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);         MapFunction<String, Tuple3<String, String, Integer>> mapFunction = new MapFunction<String, Tuple3<String, String, Integer>>() {             @Override             public Tuple3<String, String, Integer> map(String value) throws Exception {                 String[] strings = value.split(",");                 return Tuple3.of(strings[0], strings[1], Integer.parseInt(strings[2]));             }         };         SingleOutputStreamOperator<Tuple3<String, String, Integer>> map = source.map(mapFunction);         KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = map.keyBy(t -> t.f1);         SingleOutputStreamOperator<Tuple3<String, String, Integer>> max = tuple3StringKeyedStream.min(2);         max.print();         env.execute("pxj");     } } /**输入:  * pxj,pxj,2  * sx,pxj,4  * lyj,pxj,8  * sx,pxj,1  *  *输出:  * 2024-06-10 16:17:21,539 main ERROR Unable to locate appender "Console" for logger config "root"  * 4> (lyj,pxj,8)  * 4> (lyj,pxj,2)  * 4> (lyj,pxj,2)  * 4> (lyj,pxj,1)  *  * 结论:  * 返回值,最小值字段以外,其他字段是第一条输入数据的值;  * */ 
package com.lyj.sx.flink.day04;  import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class MinByDemo {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());         DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);         MapFunction<String, Tuple3<String, String, Integer>> mapFunction = new MapFunction<String, Tuple3<String, String, Integer>>() {             @Override             public Tuple3<String, String, Integer> map(String value) throws Exception {                 String[] strings = value.split(",");                 return Tuple3.of(strings[0], strings[1], Integer.parseInt(strings[2]));             }         };         SingleOutputStreamOperator<Tuple3<String, String, Integer>> map = source.map(mapFunction);         KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = map.keyBy(t -> t.f1);         SingleOutputStreamOperator<Tuple3<String, String, Integer>> max = tuple3StringKeyedStream.minBy(2);         max.print();         env.execute("pxj");     } } /**  *输入:  * pxj,pxj,2  * sx,pxj,4  * lyj,pxj,8  * sx,pxj,1  *  * 输出:  * 2024-06-10 21:39:49,788 main ERROR Unable to locate appender "Console" for logger config "root"  * 4> (pxj,pxj,2)  * 4> (pxj,pxj,2)  * 4> (pxj,pxj,2)  * 4> (sx,pxj,1)  * 结论  * :返回值,就是最小值字段所在的那条数据;  */ 
package com.lyj.sx.flink.day04;  import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class MaxDemoCopy {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);          MapFunction<String, Tuple3<String, String, Integer>> mapFunction = new MapFunction<String, Tuple3<String, String, Integer>>() {             @Override             public Tuple3<String, String, Integer> map(String value) throws Exception {                 String[] strings = value.split(",");                 return Tuple3.of(strings[0], strings[1], Integer.parseInt(strings[2]));             }         };          SingleOutputStreamOperator<Tuple3<String, String, Integer>> map = source.map(mapFunction);          KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = map.keyBy(t -> t.f1);          SingleOutputStreamOperator<Tuple3<String, String, Integer>> max = tuple3StringKeyedStream.max(2);          max.print();          env.execute("pxj");     } } /***  * 输入:  * pxj,pxj,2  * sx,pxj,4  * lyj,pxj,8  * sx,pxj,1  *  *  * 输出:  * 2024-06-10 14:42:55,922 main ERROR Unable to locate appender "Console" for logger config "root"  * 4> (pxj,pxj,2)  * 4> (pxj,pxj,4)  * 4> (pxj,pxj,8)  * 4> (pxj,pxj,8)  * 结论:1.返回返回的是第一条数据  *  *  *  *  *  * */ 
package com.lyj.sx.flink.day04;  import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class MaxDemo {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);          MapFunction<String, Tuple2<String, Integer>> mapFunction = new MapFunction<String, Tuple2<String, Integer>>() {             @Override             public Tuple2<String, Integer> map(String value) throws Exception {                 String[] fields = value.split(",");                 return Tuple2.of(fields[0], Integer.parseInt(fields[1]));             }         };          KeyedStream<Tuple2<String, Integer>, String> keyed = source.map(mapFunction).keyBy(t -> t.f0);         SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.max(1);         result.print();         env.execute("pxj");     } } 
package com.lyj.sx.flink.day04;  import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class MaxByDemoCopy {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());         DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);         MapFunction<String, Tuple3<String, String, Integer>> mapFunction = new MapFunction<String, Tuple3<String, String, Integer>>() {             @Override             public Tuple3<String, String, Integer> map(String value) throws Exception {                 String[] strings = value.split(",");                 return Tuple3.of(strings[0], strings[1], Integer.parseInt(strings[2]));             }         };         SingleOutputStreamOperator<Tuple3<String, String, Integer>> map = source.map(mapFunction);         KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = map.keyBy(t -> t.f1);         SingleOutputStreamOperator<Tuple3<String, String, Integer>> max = tuple3StringKeyedStream.maxBy(2);         max.print();         env.execute("pxj");     } } /**  * 输入:  * pxj,pxj,2  * sx,pxj,4  * lyj,pxj,8  * sx,pxj,1  * sx,pxj,20  * lyj,pxj,1  * 输出:   * 2024-06-10 14:48:30,222 main ERROR Unable to locate appender "Console" for logger config "root"  * 4> (pxj,pxj,2)  * 4> (lyj,pxj,8)  * 4> (lyj,pxj,8)  * 4> (lyj,pxj,8)  * 4> (sx,pxj,20)  * 4> (sx,pxj,20)  *结论:返回值,就是最大值字段所在的那条数据;  *  *  *  *  *  * */ 
package com.lyj.sx.flink.day04;  import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class MaxByDemo {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());         DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);         MapFunction<String, Tuple2<String, Integer>> mapFunction = new MapFunction<String, Tuple2<String, Integer>>() {             @Override             public Tuple2<String, Integer> map(String value) throws Exception {                 String[] fields = value.split(",");                 return Tuple2.of(fields[0], Integer.parseInt(fields[1]));             }         };         KeyedStream<Tuple2<String, Integer>, String> keyed = source.map(mapFunction).keyBy(t -> t.f0);         SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.maxBy(1);         result.print();         env.execute("pxj");     } } 

Union

package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Union demo: merges two streams of the SAME element type into one stream
 * containing all elements of both.
 */
public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Two independent socket sources of the same type (String).
        DataStreamSource<String> first = env.socketTextStream("192.168.25.62", 8885);
        DataStreamSource<String> second = env.socketTextStream("192.168.25.62", 8889);

        // union() requires identical element types on both sides.
        DataStream<String> merged = first.union(second);
        merged.print();

        env.execute("pxj");
    }
}
package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Union demo: both socket streams are first mapped to Integer so that the
 * element types match, then merged with union().
 */
public class UnionDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Parse each socket stream into integers before merging.
        DataStreamSource<String> firstLines = env.socketTextStream("192.168.25.62", 8885);
        SingleOutputStreamOperator<Integer> firstNumbers = firstLines.map(Integer::parseInt);

        DataStreamSource<String> secondLines = env.socketTextStream("192.168.25.62", 8889);
        SingleOutputStreamOperator<Integer> secondNumbers = secondLines.map(Integer::parseInt);

        // Both sides are now DataStream<Integer>, so union() is legal.
        DataStream<Integer> merged = firstNumbers.union(secondNumbers);
        merged.print();

        env.execute("pxj");
    }
}
package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Union demo: union-ing a stream with itself duplicates every element in
 * the result.
 */
public class UnionDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Single socket source, parsed to integers.
        DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);
        SingleOutputStreamOperator<Integer> parsed = source.map(Integer::parseInt);

        // Self-union: every parsed value shows up twice downstream.
        DataStream<Integer> selfUnion = parsed.union(parsed);
        selfUnion.print();

        env.execute("pxj");
    }
}

Connect

package com.lyj.sx.flink.day04;  import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction;  public class ConnectDemo {     public static void main(String[] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());          DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8888);          DataStreamSource<String> source1 = env.socketTextStream("192.168.25.62", 9999);          SingleOutputStreamOperator<Integer> map = source1.map(Integer::parseInt);          ConnectedStreams<String, Integer> connect = source.connect(map);          SingleOutputStreamOperator<String> map1 = connect.map(new CoMapFunction<String, Integer, String>() {             @Override             public String map1(String value) throws Exception {                 return value.toUpperCase();             }              @Override             public String map2(Integer value) throws Exception {                 return value * 10 + "";             }         });          map1.print();         env.execute("pxj");     } } 

整理人:pxj_sx(潘陈)
日 期:2024-06-30 15:46:02

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!