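Apache Flink is a distributed stream processing framework for processing unbounded and bounded data streams in real time. The examples below show a few basic jobs written with its DataStream API.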
- Simple stream processing:
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SimpleStreamProcessing {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Build a small bounded stream from in-memory elements.
            DataStream<String> source = env.fromElements("Hello", "Flink", "on", "Ubuntu");

            // Upper-case every element.
            DataStream<String> processed = source.map(s -> s.toUpperCase());

            processed.print();
            env.execute("Simple Stream Processing Example");
        }
    }
- Counting word frequencies:
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class WordCountExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> source = env.fromElements("Hello", "Flink", "on", "Ubuntu", "is", "awesome");

            DataStream<Tuple2<String, Integer>> wordCounts = source
                // Split each element on whitespace and emit a (word, 1) pair per word.
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        String[] words = value.split("\\s+");
                        for (String word : words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                // Key by the word itself; the index-based keyBy(0) is deprecated.
                .keyBy(value -> value.f0)
                // Keep a running sum of the counts (tuple field 1) per word.
                .sum(1);

            wordCounts.print();
            env.execute("Word Count Example");
        }
    }
- Computing a rolling average with a window function:
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RollingAverageExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Double> source = env.fromElements(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0);

            DataStream<Double> rollingAverage = source
                // Use a count window of 3 elements instead of a 3-second time window:
                // dividing the sum by 3 is only correct when every window holds exactly
                // 3 elements, and a time window over this tiny in-memory source is not
                // expected to fire before the job finishes.
                .countWindowAll(3)
                .reduce((value1, value2) -> value1 + value2)
                .map(sum -> sum / 3);

            // A count window fires only once it is full, so the trailing 10.0 is not expected to appear.
            rollingAverage.print();
            env.execute("Rolling Average Example");
        }
    }
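Dividing a window's sum by a fixed 3 is only correct when the window holds exactly three elements. A more general approach tracks both the sum and the element count per window. The following is a minimal plain-Java sketch of that accumulator pattern with no Flink dependency; the names `WindowAverageSketch`, `AvgAccumulator`, and `tumblingAverages` are hypothetical and chosen for illustration only. In a Flink job, the same sum-and-count logic would typically live in an `AggregateFunction` passed to the window's `aggregate` call. Note that this sketch also averages a trailing partial window, which is a design choice of the sketch rather than a description of Flink's behavior.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAverageSketch {

    /** Accumulator holding the running sum and element count of one window. */
    public static final class AvgAccumulator {
        double sum;
        long count;

        void add(double value) {
            sum += value;
            count++;
        }

        double result() {
            // Guard against an empty window to avoid dividing by zero.
            return count == 0 ? 0.0 : sum / count;
        }
    }

    /**
     * Splits the values into consecutive windows of windowSize elements and
     * averages each one. A final window with fewer elements is averaged over
     * the elements it actually contains.
     */
    public static List<Double> tumblingAverages(List<Double> values, int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        List<Double> averages = new ArrayList<>();
        AvgAccumulator acc = new AvgAccumulator();
        for (double value : values) {
            acc.add(value);
            if (acc.count == windowSize) {
                // The window is full: emit its average and start a new one.
                averages.add(acc.result());
                acc = new AvgAccumulator();
            }
        }
        if (acc.count > 0) {
            // Emit the trailing partial window, if any.
            averages.add(acc.result());
        }
        return averages;
    }

    public static void main(String[] args) {
        List<Double> values = List.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0);
        // prints [2.0, 5.0, 8.0, 10.0]
        System.out.println(tumblingAverages(values, 3));
    }
}
```

Because the divisor comes from the accumulator rather than a constant, the last window (containing only 10.0) is averaged correctly instead of being divided by 3.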
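To run these examples, make sure the Java Development Kit (JDK) is installed and Flink is set up correctly. Then save the example code as a Java file (for example, SimpleStreamProcessing.java) and compile and run it with the following commands: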
    javac -cp "/path/to/flink/lib/*" SimpleStreamProcessing.java
    java -cp "/path/to/flink/lib/*:." SimpleStreamProcessing
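Note that you need to replace /path/to/flink/lib/ with the actual path to the lib folder in your Flink installation directory. The classpath is quoted so that the shell does not expand the wildcard, and the java command takes the class name (SimpleStreamProcessing), not the .class file name.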