▪ 客户端配置。
10.3.5 Stream SQL Join 程序
10.3.5.1 场景说明
● 业务2的数据通过socket接收消息记录,可使用netcat命令用户输入模拟数据源。
– 使用Linux命令netcat -l -p <port>,启动一个简易的文本服务器。
– 启动应用程序连接netcat监听的port成功后,向netcat终端输入数据信息。
开发思路
1. 启动Flink Kafka Producer应用向Kafka发送数据。
2. 启动Flink Kafka Consumer应用从Kafka接收数据,构造Table1,保证topic与 producer一致。
3. 从soket中读取数据,构造Table2。
4. 使用Flink SQL对Table1和Table2进行联合查询,并进行打印。
10.3.5.2 Java 样例代码 功能介绍
在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。
代码样例
用户在开发前需要使用对接安全模式的Kafka,则需要引入FusionInsight的kafka-clients-*.jar,该jar包可在Kafka客户端目录下获取。
下面列出producer和consumer,以及Flink Stream SQL Join使用主要逻辑代码作为演 示。
完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和 com.huawei.bigdata.flink.examples.SqlJoinWithSocket
1. 每秒钟往Kafka中生产一条用户信息,用户信息有姓名、年龄、性别组成。
//producer代码
public class WriteIntoKafka {
public static void main(String[] args) throws Exception { // 打印出执行flink run的参考命令
System.out.println("use command as: ");
System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092");
System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar topic topic-test -bootstrap.servers 10.91.8.218:9093 security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL sasl.kerberos.service.name kafka ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
System.out.println("******************************************************************************************");
应用开发指南 10 Flink 开发指南(安全模式)
System.out.println("<topic> is the kafka topic name");
System.out.println("<bootstrap.servers> is the ip:port list of brokers");
System.out.println("******************************************************************************************");
// 构造执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并发度 env.setParallelism(1);
// 解析运行参数
ParameterTool paraTool = ParameterTool.fromArgs(args);
// 构造流图,将自定义Source生成的数据写入Kafka
DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
FlinkKafkaProducer010 producer = new FlinkKafkaProducer010<>(new FlinkKafkaProducer010<>(paraTool.get("topic"),
new SimpleStringSchema(), paraTool.getProperties()));
messageStream.addSink(producer);
// 调用execute触发执行 env.execute();
}
// 自定义Source,每隔1s持续产生消息
public static class SimpleStringGenerator implements SourceFunction<String> {
static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"};
static final String[] SEX = {"MALE", "FEMALE"};
public void run(SourceContext<String> ctx) throws Exception { while (running) {
} }
2. 生成Table1和Table2,并使用Join对Table1和Table2进行联合查询,打印输出结 果。public class SqlJoinWithSocket {
public static void main(String[] args) throws Exception{
final String hostname;
final int port;
System.out.println("use command as: ");
System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" + " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port xxx");
System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" + " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"
+ "--hostname xxx.xxx.xxx.xxx --port xxx");
System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" + " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks "
+ "--ssl.truststore.password huawei --hostname xxx.xxx.xxx.xxx --port xxx");
System.out.println("******************************************************************************************");
System.out.println("<topic> is the kafka topic name");
System.out.println("<bootstrap.servers> is the ip:port list of brokers");
System.out.println("******************************************************************************************");
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " +
"--hostname <hostname> --port <port>', where hostname (localhost by default) " +
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
//基于EventTime进行处理
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
ParameterTool paraTool = ParameterTool.fromArgs(args);
//Stream1,从Kafka中读取数据
DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
new SimpleStringSchema(),
paraTool.getProperties())).map(new MapFunction<String, Tuple3<String, String, String>>() { @Override
应用开发指南 10 Flink 开发指南(安全模式)
public Tuple3<String, String, String> map(String s) throws Exception {
tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime");
//Stream2,从Socket中读取数据
DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port,
"\n").
tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime");
//执行SQL Join进行联合查询
Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime
\n" +
"FROM Table1 AS t1\n" + "JOIN Table2 AS t2\n" + "ON t1.name = t2.name\n" +
"AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND");
//将查询结果转换为Stream,并打印输出
tableEnv.toAppendStream(result, Row.class).print();
env.execute();
步骤1 在IntelliJ IDEA中,在生成Jar包之前配置工程的Artifacts信息。
1. 在IDEA主页面,选择“File > Project Structures...”进入“Project Structure”页 面。
应用开发指南 10 Flink 开发指南(安全模式)