1、添加不同数据源
package com. baidu. keyue. deepsight. memory. test ;
import com. baidu. keyue. deepsight. memory. WordCount ;
import com. baidu. keyue. deepsight. memory. WordCountData ;
import org. apache. flink . api. common. RuntimeExecutionMode ;
import org. apache. flink . api. common. eventtime. WatermarkStrategy ;
import org. apache. flink . api. common. serialization. SimpleStringSchema ;
import org. apache. flink . api. common. typeinfo. Types ;
import org. apache. flink . api. connector. source. util. ratelimit. RateLimiterStrategy ;
import org. apache. flink . api. java. tuple. Tuple2 ;
import org. apache. flink . configuration. Configuration ;
import org. apache. flink . configuration. RestOptions ;
import org. apache. flink . connector. datagen. source. DataGeneratorSource ;
import org. apache. flink . connector. datagen. source. GeneratorFunction ;
import org. apache. flink . connector. file. src. FileSource ;
import org. apache. flink . connector. file. src. reader. TextLineInputFormat ;
import org. apache. flink . connector. kafka. source. KafkaSource ;
import org. apache. flink . connector. kafka. source. enumerator. initializer. OffsetsInitializer ;
import org. apache. flink . core. fs. Path ;
import org. apache. flink . streaming. api. datastream. DataStream ;
import org. apache. flink . streaming. api. environment. StreamExecutionEnvironment ;
/**
 * Shows several ways of building a Flink source (data generator, text file, Kafka) and runs a
 * word count over the data-generator source.
 */
public class EnvDemo {

    /**
     * Entry point: starts a streaming environment whose REST endpoint binds to port 8082,
     * reads from {@link #createDataGeneratorSource()}, tokenizes every record with
     * {@link WordCount.Tokenizer} and prints a running count per word.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        DataStream<String> text = env.fromSource(
                createDataGeneratorSource(), WatermarkStrategy.noWatermarks(), "datagen source");

        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new WordCount.Tokenizer())
                        // Key by the word (field f0). The positional keyBy(0) overload is
                        // deprecated and removed in Flink 2.x; a KeySelector is type-safe.
                        .keyBy(value -> value.f0)
                        .sum(1);

        counts.print();
        env.execute("WordCount");
    }

    /**
     * Creates a file source that reads {@code input/word.txt} (relative to the working
     * directory) line by line as strings.
     *
     * @return the configured file source
     */
    public static FileSource<String> createFileSource() {
        return FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt"))
                .build();
    }

    /**
     * Misspelled alias kept for backward compatibility.
     *
     * @return the same source as {@link #createFileSource()}
     * @deprecated use {@link #createFileSource()}
     */
    @Deprecated
    public static FileSource<String> crateFileSource() {
        return createFileSource();
    }

    /**
     * Creates a Kafka source that reads record values as strings from the earliest offset.
     * The bootstrap servers, topic and group id are placeholders and must be replaced
     * before this source is used.
     *
     * @return the configured Kafka source
     */
    public static KafkaSource<String> createKafkaSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("ip-port")
                .setTopics("topic")
                .setGroupId("groupId")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    /**
     * Misspelled alias kept for backward compatibility.
     *
     * @return the same source as {@link #createKafkaSource()}
     * @deprecated use {@link #createKafkaSource()}
     */
    @Deprecated
    public static KafkaSource<String> crateKafkaSource() {
        return createKafkaSource();
    }

    /**
     * Creates a generator source that emits {@code "hello" + index} for 100 indices,
     * throttled to 5 records per second.
     *
     * @return the configured data-generator source
     */
    public static DataGeneratorSource<String> createDataGeneratorSource() {
        GeneratorFunction<Long, String> generator = value -> "hello" + value;
        return new DataGeneratorSource<>(
                generator,
                100,
                RateLimiterStrategy.perSecond(5),
                Types.STRING);
    }
}
2、数据处理
package com. baidu. keyue. deepsight. memory. test ;
import com. baidu. keyue. deepsight. memory. WordCount ;
import com. baidu. keyue. deepsight. memory. bean. WaterSensor ;
import org. apache. flink . api. common. RuntimeExecutionMode ;
import org. apache. flink . api. common. eventtime. WatermarkStrategy ;
import org. apache. flink . api. common. functions. FilterFunction ;
import org. apache. flink . api. common. functions. FlatMapFunction ;
import org. apache. flink . api. common. functions. MapFunction ;
import org. apache. flink . api. common. functions. ReduceFunction ;
import org. apache. flink . api. common. serialization. SimpleStringSchema ;
import org. apache. flink . api. common. typeinfo. Types ;
import org. apache. flink . api. connector. source. util. ratelimit. RateLimiterStrategy ;
import org. apache. flink . api. java. functions. KeySelector ;
import org. apache. flink . api. java. tuple. Tuple2 ;
import org. apache. flink . configuration. Configuration ;
import org. apache. flink . configuration. RestOptions ;
import org. apache. flink . connector. datagen. source. DataGeneratorSource ;
import org. apache. flink . connector. datagen. source. GeneratorFunction ;
import org. apache. flink . connector. file. src. FileSource ;
import org. apache. flink . connector. file. src. reader. TextLineInputFormat ;
import org. apache. flink . connector. kafka. source. KafkaSource ;
import org. apache. flink . connector. kafka. source. enumerator. initializer. OffsetsInitializer ;
import org. apache. flink . core. fs. Path ;
import org. apache. flink . streaming. api. datastream. DataStream ;
import org. apache. flink . streaming. api. datastream. KeyedStream ;
import org. apache. flink . streaming. api. datastream. SingleOutputStreamOperator ;
import org. apache. flink . streaming. api. environment. StreamExecutionEnvironment ;
import org. apache. flink . util. Collector ;
/**
 * Demonstrates the basic DataStream transformations (map, filter, flatMap, keyBy, reduce) on a
 * small fixed set of {@link WaterSensor} readings.
 */
public class DataProcessDemo {

    /**
     * Entry point: builds a local streaming job (REST endpoint on port 8082, parallelism 1),
     * applies each transformation to the same input and prints every result with a label.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        // map: one output per input, formatted as "<id> : <vc>".
        SingleOutputStreamOperator<String> map = sensorDs.map(new MapFunction<WaterSensor, String>() {
            @Override
            public String map(WaterSensor waterSensor) throws Exception {
                return waterSensor.getId() + " : " + waterSensor.getVc();
            }
        });
        // Without a sink the demo results above were computed and then discarded.
        map.print("map");

        // filter: keep only readings whose vc is greater than 1.
        SingleOutputStreamOperator<WaterSensor> filter = sensorDs.filter(new FilterFunction<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor waterSensor) throws Exception {
                return waterSensor.getVc() > 1;
            }
        });
        filter.print("filter");

        // flatMap: always emit the id; for sensors other than "s1" also emit the vc.
        SingleOutputStreamOperator<String> flatMap = sensorDs.flatMap(new FlatMapFunction<WaterSensor, String>() {
            @Override
            public void flatMap(WaterSensor waterSensor, Collector<String> collector) throws Exception {
                collector.collect(waterSensor.getId());
                if (!"s1".equals(waterSensor.getId())) {
                    collector.collect(waterSensor.getVc().toString());
                }
            }
        });
        flatMap.print("flatMap");

        // keyBy: partition by sensor id.
        KeyedStream<WaterSensor, String> keyBy = sensorDs.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor waterSensor) throws Exception {
                return waterSensor.getId();
            }
        });

        // reduce: per id, sum ts and vc of all readings seen so far.
        keyBy.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor accumulated, WaterSensor incoming) throws Exception {
                // ReduceFunction receives the previously reduced value first and the newly
                // arrived element second; the original code named them t2 and t1 respectively.
                System.out.println("t1=" + incoming);
                System.out.println("t2=" + accumulated);
                return new WaterSensor(
                        incoming.getId(),
                        incoming.getTs() + accumulated.getTs(),
                        incoming.getVc() + accumulated.getVc());
            }
        }).print();

        env.execute("DataProcessDemo");
    }
}
3、分流/合流
package com. baidu. keyue. deepsight. memory. test ;
import com. baidu. keyue. deepsight. memory. bean. WaterSensor ;
import org. apache. flink . api. common. RuntimeExecutionMode ;
import org. apache. flink . api. common. functions. FilterFunction ;
import org. apache. flink . api. common. functions. FlatMapFunction ;
import org. apache. flink . api. common. functions. MapFunction ;
import org. apache. flink . api. common. functions. ReduceFunction ;
import org. apache. flink . api. java. functions. KeySelector ;
import org. apache. flink . configuration. Configuration ;
import org. apache. flink . configuration. RestOptions ;
import org. apache. flink . streaming. api. datastream. DataStream ;
import org. apache. flink . streaming. api. datastream. KeyedStream ;
import org. apache. flink . streaming. api. datastream. SingleOutputStreamOperator ;
import org. apache. flink . streaming. api. environment. StreamExecutionEnvironment ;
import org. apache. flink . util. Collector ;
/**
 * Demonstrates splitting one stream into two with filters (even / odd {@code vc}) and merging
 * them back together with {@code union}.
 */
public class FenliuDemo {

    /**
     * Entry point: builds a local streaming job (REST endpoint on port 8082, parallelism 1),
     * prints the even stream, the odd stream and their union.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        SingleOutputStreamOperator<WaterSensor> oushu =
                sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 == 0);
        // Java's % keeps the sign of the dividend (-3 % 2 == -1), so "== 1" would drop negative
        // odd values from both streams; "!= 0" makes the two filters complementary.
        SingleOutputStreamOperator<WaterSensor> jishu =
                sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 != 0);

        oushu.print("偶数流");
        jishu.print("奇数流");
        oushu.union(jishu).print("合并流");

        env.execute("FenliuDemo");
    }
}
4、输出流 sink
package com.baidu.keyue.deepsight.memory.test;
import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink .api.common.RuntimeExecutionMode;
import org.apache.flink .api.common.serialization.SimpleStringEncoder;
import org.apache.flink .api.connector.sink.Sink;
import org.apache.flink .configuration.Configuration;
import org.apache.flink .configuration.RestOptions;
import org.apache.flink .connector.file.sink.FileSink;
import org.apache.flink .core.fs.Path;
import org.apache.flink .streaming.api.datastream.DataStream;
import org.apache.flink .streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demonstrates writing a stream of {@link WaterSensor} readings to files with {@link FileSink}
 * in row format (one {@code toString()} line per record, UTF-8).
 */
public class SinkDemo {

    /** Checkpoint interval in milliseconds; FileSink commits part files on checkpoints. */
    private static final long CHECKPOINT_INTERVAL_MS = 2000L;

    /** Output directory used when no path is passed on the command line. */
    private static final String DEFAULT_OUTPUT_DIR = "/Users/chengyong03/Downloads/output/flink";

    /**
     * Entry point: builds a local streaming job (REST endpoint on port 8082, parallelism 1)
     * and sinks the readings to files.
     *
     * @param args optional; {@code args[0]} overrides the output directory
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Streaming (not batch) execution.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        // In STREAMING mode FileSink only finalizes part files on successful checkpoints;
        // without checkpointing the files stay in-progress/pending forever.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);

        // Source data.
        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3)
        );

        // The previous literal ended with a stray space ("…/flink "), creating a directory
        // whose name ends in a space.
        String outputDir = args.length > 0 ? args[0] : DEFAULT_OUTPUT_DIR;

        FileSink<WaterSensor> fileSink = FileSink
                .<WaterSensor>forRowFormat(new Path(outputDir), new SimpleStringEncoder<>("UTF-8"))
                .build();
        sensorDs.sinkTo(fileSink);

        env.execute("SinkDemo");
    }
}
package com. baidu. keyue. deepsight. memory. test ;
import com. baidu. keyue. deepsight. memory. bean. WaterSensor ;
import org. apache. flink . streaming. api. datastream. DataStream ;
import org. apache. flink . streaming. api. environment. StreamExecutionEnvironment ;
import org. apache. flink . table. api. EnvironmentSettings ;
import org. apache. flink . table. api. Table ;
import org. apache. flink . table. api. bridge. java. StreamTableEnvironment ;
/**
 * Round-trips a stream through the Table API: registers the sensor readings as a temporary
 * view, filters them with SQL ({@code vc > 10}) and converts the result back into a
 * {@link DataStream} of {@link WaterSensor}.
 */
public class SqlDemo {

    /**
     * Entry point: prints the raw readings and the SQL-filtered readings.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);

        final DataStream<WaterSensor> readings = streamEnv.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3));

        // Echo the unfiltered input so it can be compared with the query output.
        readings.print();

        // Expose the stream to SQL under the name "sensorTable".
        tEnv.createTemporaryView("sensorTable", tEnv.fromDataStream(readings));

        final String query = "select * from sensorTable where vc > 10";
        tEnv.toDataStream(tEnv.sqlQuery(query), WaterSensor.class).print();

        streamEnv.execute();
    }
}