阅读量:0
flink Transformation算子部分
Transformation算子
map
该方法是将一个DataStream调用map方法返回一个新的DataStream。本质是将该DataStream中对应的每一条数据依次迭代出来,应用map方法传入的计算逻辑,返回一个新的DataStream。原来的DataStream中对应的每一条数据,与新生成的DataStream中数据是一一对应的,也可以说是存在着映射关系的。
package com.lyj.sx.flink.day03;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates the map transformation: each text line read from the socket is
 * passed through {@link QueryCategoryNameFromMySQLFunction}, producing exactly
 * one output record per input record.
 *
 * <p>Removed the unused {@code RichMapFunction} import.
 */
public class MapDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One record per text line, e.g. "1,f1,f2,100.0"
        DataStreamSource<String> dataSource = env.socketTextStream("192.168.25.62", 8899);
        dataSource.map(new QueryCategoryNameFromMySQLFunction()).print();
        env.execute();
    }
}
package com.lyj.sx.flink.day03;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Enriches an incoming CSV record ("cid,field1,field2,amount") with the
 * category name looked up from MySQL by the category id (field 0).
 *
 * <p>The JDBC connection is opened once per task in {@link #open} and released
 * in {@link #close}, so no connection is created per record.
 */
public class QueryCategoryNameFromMySQLFunction
        extends RichMapFunction<String, Tuple4<String, String, String, Double>> {

    private Connection connection;
    private PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/dy_flink?characterEncoding=utf-8", "root", "");
        preparedStatement = connection.prepareStatement("select name from t_category where id = ?");
        super.open(parameters);
    }

    @Override
    public Tuple4<String, String, String, Double> map(String s) throws Exception {
        String[] fields = s.split(",");
        String cid = fields[0];
        preparedStatement.setInt(1, Integer.parseInt(cid));
        String name = "未知"; // fallback when the id has no matching row
        // try-with-resources: the original leaked the ResultSet if getString threw.
        try (ResultSet resultSet = preparedStatement.executeQuery()) {
            while (resultSet.next()) {
                name = resultSet.getString(1);
            }
        }
        return Tuple4.of(fields[0], fields[1], name, Double.parseDouble(fields[3]));
    }

    @Override
    public void close() throws Exception {
        // Close the statement BEFORE the connection. The original closed the
        // connection first, leaking the statement if connection.close() threw.
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
flatMap扁平化映射(DataStream → DataStream)
- 该方法是将一个DataStream调用flatMap方法返回一个新的DataStream,本质上是将该DataStream中的对应的每一条数据依次迭代出来,应用flatMap方法传入的计算逻辑,返回一个新的DataStream。原来的DataStream中输入的一条数据经过flatMap方法传入的计算逻辑后,会返回零到多条数据。所谓的扁平化即将原来的数据压平,返回多条数据。
package com.lyj.sx.flink.day03; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class FlatMapDemo1 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889); SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapData = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] strings = value.split(" "); for (String string : strings) { out.collect(Tuple2.of(string, 1)); } } }); flatMapData.print(); env.execute("pxj"); } }
package com.lyj.sx.flink.day03; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class FlatMapDemo2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8887); SingleOutputStreamOperator<Tuple2<String, Integer>> myflatMap = source.transform("MyflatMap", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() { }), new MyflatMap()); myflatMap.print(); env.execute("pxj"); } }
package com.lyj.sx.flink.day03; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; public class MyflatMap extends AbstractStreamOperator<Tuple2<String, Integer>> implements OneInputStreamOperator<String, Tuple2<String, Integer>> { @Override public void processElement(StreamRecord<String> element) throws Exception { String[] split = element.getValue().split(","); for (String s : split) { output.collect(element.replace(Tuple2.of(s,1))); } } @Override public void setKeyContextElement(StreamRecord<String> record) throws Exception { System.out.println("StreamRecord.............."); OneInputStreamOperator.super.setKeyContextElement(record); } }
keyBy按key分区(DataStream → KeyedStream)
package com.lyj.sx.flink.day03; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class KeyByDemo1 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8888); MapFunction<String, Tuple2<String, Integer>> mapFunction = new MapFunction<String, Tuple2<String, Integer>>() { Tuple2<String, Integer> t; @Override public Tuple2<String, Integer> map(String s) throws Exception { String[] strings = s.split(" "); for (String string : strings) { t = Tuple2.of(string, 1); } return t; } }; source.map(mapFunction).print(); env.execute("pxj"); } }
package com.lyj.sx.flink.day03; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class KeyByDemo2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886); SingleOutputStreamOperator<Tuple3<String, String, Double>> tpStream = source.map(new MapFunction<String, Tuple3<String, String, Double>>() { @Override public Tuple3<String, String, Double> map(String s) throws Exception { String[] fields = s.split(","); return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2])); } }); KeyedStream<Tuple3<String, String, Double>, Tuple> tuple3TupleKeyedStream = tpStream.keyBy("f0", "f1"); tuple3TupleKeyedStream.print(); env.execute("pxj"); } }
package com.lyj.sx.flink.day03; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class KeyByDemo3 { public static void main(String[] args)throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886); SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() { Tuple2<String,Integer> t; @Override public Tuple2<String, Integer> map(String s) throws Exception { for (String string : s.split(",")) { t=Tuple2.of(string,1); } return t; } }); map.print(); KeyedStream<Tuple2<String, Integer>, String> keyedStream = map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }); keyedStream.print(); env.execute("pxj"); } }
package com.lyj.sx.flink.day03; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class KeyByDemo4 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886); SingleOutputStreamOperator<Tuple3<String, String, Double>> mapped = source.map(new MapFunction<String, Tuple3<String, String, Double>>() { @Override public Tuple3<String, String, Double> map(String s) throws Exception { String[] fields = s.split(","); return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2])); } }); mapped.keyBy(new KeySelector<Tuple3<String, String, Double>, String>() { @Override public String getKey(Tuple3<String, String, Double> key) throws Exception { return key.f0+key.f1; } }).print(); env.execute("pxj"); } }
package com.lyj.sx.flink.day03; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class KeyByDemo5 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886); //山东省,济南市,5000 SingleOutputStreamOperator<Tuple3<String, String, Double>> map = source.map(new MapFunction<String, Tuple3<String, String, Double>>() { @Override public Tuple3<String, String, Double> map(String s) throws Exception { String[] fields = s.split(","); return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2])); } }); KeyedStream<Tuple3<String, String, Double>, Tuple2<String, String>> tuple3Tuple2KeyedStream = map.keyBy(t -> Tuple2.of(t.f0, t.f1), TypeInformation.of(new TypeHint<Tuple2<String, String>>() { })); tuple3Tuple2KeyedStream.print(); env.execute("pxj"); } }
package com.lyj.sx.flink.day03; import com.lyj.sx.flink.bean.DataBean; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.beans.beancontext.BeanContext; public class KeyByDemo6 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886); SingleOutputStreamOperator<DataBean> beanStream = source.map(new MapFunction<String, DataBean>() { @Override public DataBean map(String s) throws Exception { String[] fields = s.split(","); return DataBean.of(fields[0], fields[1], Double.parseDouble(fields[2])); } }); KeyedStream<DataBean, DataBean> keyedStream = beanStream.keyBy(t -> t); keyedStream.print(); env.execute("pxj"); } }
filter过滤(DataStream → DataStream)
- 该方法是将一个DataStream调用filter方法返回一个新的DataStream,本质上是将该DataStream中的对应的每一条输入数据依次迭代出来,应用filter方法传入的过滤逻辑,返回一个新的DataStream。原来的DataStream中输入的一条数据经过filter方法传入的过滤逻辑后,返回true就会保留这条数据,返回false就会过滤掉该数据。
package com.lyj.sx.flink.day03; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class FilterDemo1 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886); SingleOutputStreamOperator<String> filter = source.filter(new FilterFunction<String>() { @Override public boolean filter(String s) throws Exception { return s.startsWith("h"); } }).setParallelism(2); filter.print(); env.execute("pxj"); } }
package com.lyj.sx.flink.day03;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamFilter;

/**
 * filter implemented at the operator level: plugs Flink's built-in
 * StreamFilter operator in via transform(), keeping lines starting with "h".
 */
public class FilterDemo2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> lines = env.socketTextStream("192.168.25.62", 8886);

        StreamFilter<String> filterOperator = new StreamFilter<>(line -> line.startsWith("h"));
        SingleOutputStreamOperator<String> filtered =
                lines.transform("myfilter", TypeInformation.of(String.class), filterOperator);

        filtered.print();
        env.execute("pxj");
    }
}
package com.lyj.sx.flink.day03;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * filter implemented with a hand-written operator: wires the custom
 * {@link MyFilterFunction} operator into the pipeline via transform().
 */
public class FilterDemo3 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> lines = env.socketTextStream("192.168.25.62", 8886);

        SingleOutputStreamOperator<String> filtered = lines.transform(
                "MyFilterFunction", TypeInformation.of(String.class), new MyFilterFunction());

        filtered.print();
        env.execute("pxj");
    }
}
package com.lyj.sx.flink.day03; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; public class MyFilterFunction extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> { @Override public void processElement(StreamRecord<String> element) throws Exception { String elementValue = element.getValue(); if(elementValue.startsWith("h")){ output.collect(element); } } @Override public void setKeyContextElement(StreamRecord<String> record) throws Exception { OneInputStreamOperator.super.setKeyContextElement(record); System.out.println("setKeyContextElement........."); } }
底层实现filter and map
package com.lyj.sx.flink.day03;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Combined filter+map in one custom operator: keeps even integers and maps
 * each of them to value * 10.1.
 */
public class FilterAndMapDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8885);

        SingleOutputStreamOperator<Double> myFilterAndMap = source.transform(
                "MyFilterAndMap", TypeInformation.of(Double.class), new MyFilterAndMap());

        myFilterAndMap.print();
        env.execute("pxj");
    }
}

package com.lyj.sx.flink.day03;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Operator that filters and maps in one pass: non-numeric input is skipped,
 * even integers are emitted as value * 10.1, odd integers are dropped.
 */
public class MyFilterAndMap extends AbstractStreamOperator<Double>
        implements OneInputStreamOperator<String, Double> {

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        String value = element.getValue();
        try {
            int i = Integer.parseInt(value);
            if (i % 2 == 0) {
                double r = i * 10.1;
                output.collect(element.replace(r));
            }
        } catch (NumberFormatException e) {
            // Narrowed from the original broad `catch (Exception)`: only the
            // parse can fail here. Demo code has no logger configured, so the
            // bad record is reported on stderr and skipped.
            e.printStackTrace();
        }
    }

    @Override
    public void setKeyContextElement(StreamRecord<String> record) throws Exception {
        OneInputStreamOperator.super.setKeyContextElement(record);
    }
}
reduce
KeyedStream → DataStream
键控数据流(keyBy之后的流)上的"滚动"聚合。将当前元素与上次聚合的值组合并计算出新值输出。
它的滚动聚合逻辑没有写死,而是由用户通过ReduceFunction来传入。返回值必须和输入类型一致
package com.lyj.sx.flink.day03; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class ReduceDemo1 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886); // MapFunction<String, Tuple2<String, Integer>> mapFunction = new MapFunction<String, Tuple2<String, Integer>>() { // Tuple2<String, Integer> tuple2; // // @Override // public Tuple2<String, Integer> map(String value) throws Exception { // String[] strings = value.split(","); // for (String s : strings) { // tuple2 = Tuple2.of(s, 1); // } // return tuple2; // } // }; FlatMapFunction<String, Tuple2<String, Integer>> mapFunction = new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] strings = value.split(","); for (String s : strings) { out.collect(Tuple2.of(s, 1)); } } }; SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.flatMap(mapFunction); KeyedStream<Tuple2<String, Integer>, Object> keyBy = map.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() { @Override public Object getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }); 
SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = keyBy.reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { value1.f1 = value1.f1 + value2.f1; return value1; } }); reduce.print(); env.execute("pxj"); } }
sum
package com.lyj.sx.flink.day04; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class SumDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886); FlatMapFunction<String, Tuple2<String, Integer>> mapFunction = new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] strings = value.split(","); for (String s : strings) { out.collect(Tuple2.of(s, 1)); } } }; SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.flatMap(mapFunction); KeyedStream<Tuple2<String, Integer>, Object> keyBy = map.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() { @Override public Object getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }); KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = map.keyBy(t -> t.f0); SingleOutputStreamOperator<Tuple2<String, Integer>> summed = tuple2StringKeyedStream.sum("f1"); summed.print(); env.execute("pxj"); } }
union
union 合并(DataStream * → DataStream)
该方法可以将两个或者多个数据类型一致的DataStream合并成一个DataStream。DataStream union(DataStream… streams)可以看出DataStream的union方法的参数为可变参数,即可以合并两个或多个数据类型一致的DataStream。
package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * union demo: merges two String streams (same element type) into one stream.
 */
public class UnionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> first = env.socketTextStream("192.168.25.62", 8885);
        DataStreamSource<String> second = env.socketTextStream("192.168.25.62", 8889);

        DataStream<String> merged = first.union(second);
        merged.print();
        env.execute("pxj");
    }
}
package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * union demo: both socket streams are first parsed to Integer, then merged —
 * union requires identical element types.
 */
public class UnionDemo2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<String> firstLines = env.socketTextStream("192.168.25.62", 8885);
        SingleOutputStreamOperator<Integer> firstNums = firstLines.map(Integer::parseInt);

        DataStreamSource<String> secondLines = env.socketTextStream("192.168.25.62", 8889);
        SingleOutputStreamOperator<Integer> secondNums = secondLines.map(Integer::parseInt);

        DataStream<Integer> merged = firstNums.union(secondNums);
        merged.print();
        env.execute("pxj");
    }
}
package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * union demo: a stream unioned with itself — every element appears twice in
 * the merged stream.
 */
public class UnionDemo3 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> lines = env.socketTextStream("192.168.25.62", 8889);

        SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);
        DataStream<Integer> doubled = nums.union(nums);

        doubled.print();
        env.execute("pxj");
    }
}
sum
package com.lyj.sx.flink.day04; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class SumDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886); FlatMapFunction<String, Tuple2<String, Integer>> mapFunction = new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] strings = value.split(","); for (String s : strings) { out.collect(Tuple2.of(s, 1)); } } }; SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.flatMap(mapFunction); KeyedStream<Tuple2<String, Integer>, Object> keyBy = map.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() { @Override public Object getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }); KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = map.keyBy(t -> t.f0); SingleOutputStreamOperator<Tuple2<String, Integer>> summed = tuple2StringKeyedStream.sum("f1"); summed.print(); env.execute("pxj"); } }
Project
package com.lyj.sx.flink.day04; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class ProjectDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); // DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889); DataStreamSource<String> lines = env.socketTextStream("192.168.25.62", 8888); SingleOutputStreamOperator<Tuple3<String, String, Double>> tpStream = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() { @Override public Tuple3<String, String, Double> map(String value) throws Exception { String[] fields = value.split(","); return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2])); } }); SingleOutputStreamOperator<Tuple> res = tpStream.project(2, 0); res.print(); env.execute("pxj"); /*山东省,济南市,5000 * 2024-06-10 22:56:14,686 main ERROR Unable to locate appender "Console" for logger config "root" 12> (5000.0,山东省) * */ } }
滚动聚合 (组内计算)
滚动聚合算子,是多个聚合算子的统称,有sum、min、minBy、max、maxBy;
这些算子的底层逻辑都是维护一个聚合值,并使用每条流入的数据对聚合值进行滚动更新;
这些算子都只能在KeyedStream上调用(就是必须keyby后调用);
minBy、maxBy 返回的是包含最小/最大值字段的那条完整数据
min、max、sum 的计算逻辑流程如下(返回结果中,除聚合字段外,其余字段保留第一条数据的值)
package com.lyj.sx.flink.day04; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class MinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889); MapFunction<String, Tuple3<String, String, Integer>> mapFunction = new MapFunction<String, Tuple3<String, String, Integer>>() { @Override public Tuple3<String, String, Integer> map(String value) throws Exception { String[] strings = value.split(","); return Tuple3.of(strings[0], strings[1], Integer.parseInt(strings[2])); } }; SingleOutputStreamOperator<Tuple3<String, String, Integer>> map = source.map(mapFunction); KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = map.keyBy(t -> t.f1); SingleOutputStreamOperator<Tuple3<String, String, Integer>> max = tuple3StringKeyedStream.min(2); max.print(); env.execute("pxj"); } } /**输入: * pxj,pxj,2 * sx,pxj,4 * lyj,pxj,8 * sx,pxj,1 * *输出: * 2024-06-10 16:17:21,539 main ERROR Unable to locate appender "Console" for logger config "root" * 4> (lyj,pxj,8) * 4> (lyj,pxj,2) * 4> (lyj,pxj,2) * 4> (lyj,pxj,1) * * 结论: * 返回值,最小值字段以外,其他字段是第一条输入数据的值; * */
package com.lyj.sx.flink.day04; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class MinByDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889); MapFunction<String, Tuple3<String, String, Integer>> mapFunction = new MapFunction<String, Tuple3<String, String, Integer>>() { @Override public Tuple3<String, String, Integer> map(String value) throws Exception { String[] strings = value.split(","); return Tuple3.of(strings[0], strings[1], Integer.parseInt(strings[2])); } }; SingleOutputStreamOperator<Tuple3<String, String, Integer>> map = source.map(mapFunction); KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = map.keyBy(t -> t.f1); SingleOutputStreamOperator<Tuple3<String, String, Integer>> max = tuple3StringKeyedStream.minBy(2); max.print(); env.execute("pxj"); } } /** *输入: * pxj,pxj,2 * sx,pxj,4 * lyj,pxj,8 * sx,pxj,1 * * 输出: * 2024-06-10 21:39:49,788 main ERROR Unable to locate appender "Console" for logger config "root" * 4> (pxj,pxj,2) * 4> (pxj,pxj,2) * 4> (pxj,pxj,2) * 4> (sx,pxj,1) * 结论 * :返回值,就是最小值字段所在的那条数据; */
package com.lyj.sx.flink.day04; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class MaxDemoCopy { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889); MapFunction<String, Tuple3<String, String, Integer>> mapFunction = new MapFunction<String, Tuple3<String, String, Integer>>() { @Override public Tuple3<String, String, Integer> map(String value) throws Exception { String[] strings = value.split(","); return Tuple3.of(strings[0], strings[1], Integer.parseInt(strings[2])); } }; SingleOutputStreamOperator<Tuple3<String, String, Integer>> map = source.map(mapFunction); KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = map.keyBy(t -> t.f1); SingleOutputStreamOperator<Tuple3<String, String, Integer>> max = tuple3StringKeyedStream.max(2); max.print(); env.execute("pxj"); } } /*** * 输入: * pxj,pxj,2 * sx,pxj,4 * lyj,pxj,8 * sx,pxj,1 * * * 输出: * 2024-06-10 14:42:55,922 main ERROR Unable to locate appender "Console" for logger config "root" * 4> (pxj,pxj,2) * 4> (pxj,pxj,4) * 4> (pxj,pxj,8) * 4> (pxj,pxj,8) * 结论:1.返回返回的是第一条数据 * * * * * * */
package com.lyj.sx.flink.day04; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class MaxDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889); MapFunction<String, Tuple2<String, Integer>> mapFunction = new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] fields = value.split(","); return Tuple2.of(fields[0], Integer.parseInt(fields[1])); } }; KeyedStream<Tuple2<String, Integer>, String> keyed = source.map(mapFunction).keyBy(t -> t.f0); SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.max(1); result.print(); env.execute("pxj"); } }
package com.lyj.sx.flink.day04; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class MaxByDemoCopy { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889); MapFunction<String, Tuple3<String, String, Integer>> mapFunction = new MapFunction<String, Tuple3<String, String, Integer>>() { @Override public Tuple3<String, String, Integer> map(String value) throws Exception { String[] strings = value.split(","); return Tuple3.of(strings[0], strings[1], Integer.parseInt(strings[2])); } }; SingleOutputStreamOperator<Tuple3<String, String, Integer>> map = source.map(mapFunction); KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = map.keyBy(t -> t.f1); SingleOutputStreamOperator<Tuple3<String, String, Integer>> max = tuple3StringKeyedStream.maxBy(2); max.print(); env.execute("pxj"); } } /** * 输入: * pxj,pxj,2 * sx,pxj,4 * lyj,pxj,8 * sx,pxj,1 * sx,pxj,20 * lyj,pxj,1 * 输出: * 2024-06-10 14:48:30,222 main ERROR Unable to locate appender "Console" for logger config "root" * 4> (pxj,pxj,2) * 4> (lyj,pxj,8) * 4> (lyj,pxj,8) * 4> (lyj,pxj,8) * 4> (sx,pxj,20) * 4> (sx,pxj,20) *结论:返回值,就是最大值字段所在的那条数据; * * * * * * */
package com.lyj.sx.flink.day04; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class MaxByDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889); MapFunction<String, Tuple2<String, Integer>> mapFunction = new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] fields = value.split(","); return Tuple2.of(fields[0], Integer.parseInt(fields[1])); } }; KeyedStream<Tuple2<String, Integer>, String> keyed = source.map(mapFunction).keyBy(t -> t.f0); SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.maxBy(1); result.print(); env.execute("pxj"); } }
Union
union 算子用于将两个或多个元素类型完全相同的 DataStream 合并成一个新的 DataStream,合并时不做去重,各流中的元素按到达顺序混合输出。
package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates {@code DataStream#union}: merges two socket streams of the SAME
 * element type (String) into one stream, without deduplication.
 */
public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // Two independent socket sources producing String lines.
        DataStreamSource<String> first = env.socketTextStream("192.168.25.62", 8885);
        DataStreamSource<String> second = env.socketTextStream("192.168.25.62", 8889);
        // union requires identical element types; elements of both streams pass through.
        DataStream<String> merged = first.union(second);
        merged.print();
        env.execute("pxj");
    }
}
package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates {@code DataStream#union} after a map: both String socket streams are
 * first parsed to Integer so their element types match, then merged and printed.
 */
public class UnionDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // Parse each socket stream to Integer; union only accepts identical element types.
        DataStreamSource<String> firstSource = env.socketTextStream("192.168.25.62", 8885);
        SingleOutputStreamOperator<Integer> firstNums = firstSource.map(Integer::parseInt);
        DataStreamSource<String> secondSource = env.socketTextStream("192.168.25.62", 8889);
        SingleOutputStreamOperator<Integer> secondNums = secondSource.map(Integer::parseInt);
        DataStream<Integer> merged = firstNums.union(secondNums);
        merged.print();
        env.execute("pxj");
    }
}
package com.lyj.sx.flink.day04;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates that a stream may be unioned with ITSELF: every element of the
 * source then appears twice in the merged stream.
 */
public class UnionDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);
        SingleOutputStreamOperator<Integer> nums = source.map(Integer::parseInt);
        // Self-union: each incoming number is emitted twice downstream.
        DataStream<Integer> doubledStream = nums.union(nums);
        doubledStream.print();
        env.execute("pxj");
    }
}
Connect
connect 算子用于连接两个元素类型可以不同的 DataStream,得到一个 ConnectedStreams;两个流在其中各自保留自己的类型,后续通过 CoMapFunction / CoFlatMapFunction 分别处理两侧的数据。
package com.lyj.sx.flink.day04; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; public class ConnectDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8888); DataStreamSource<String> source1 = env.socketTextStream("192.168.25.62", 9999); SingleOutputStreamOperator<Integer> map = source1.map(Integer::parseInt); ConnectedStreams<String, Integer> connect = source.connect(map); SingleOutputStreamOperator<String> map1 = connect.map(new CoMapFunction<String, Integer, String>() { @Override public String map1(String value) throws Exception { return value.toUpperCase(); } @Override public String map2(Integer value) throws Exception { return value * 10 + ""; } }); map1.print(); env.execute("pxj"); } }
整理人:pxj_sx(潘陈)
日 期:2024-06-30 15:46:02