Flink: From Getting Started to Loving It (Part 11: Custom Sink Data Output, Using a MySQL Write as the Example)

Goal: have Flink read data from a txt file and write it to MySQL.

Environment setup: if you do not have MySQL yet, you can install it with the commands below.

wget https://repo.mysql.com//mysql80-community-release-el7-3.noarch.rpm

yum -y install mysql80-community-release-el7-3.noarch.rpm
yum install mysql-community-server -y

systemctl start mysqld

Look up the temporary default MySQL root password

[root@localhost ~]# grep 'temporary password' /var/log/mysqld.log
2020-11-04T02:05:26.432219Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: qCk4b_3iEE;V

Change the default MySQL password

mysql -uroot -p'qCk4b_3iEE;V'

mysql> ALTER USER 'root'@'localhost' IDENTIFIED BY 'Mafei@20201104';
Query OK, 0 rows affected (0.03 sec)

Allow all hosts to connect to MySQL

mysql> use mysql;
mysql> update user set host = '%' where user = 'root';
mysql> flush privileges;

systemctl restart mysqld

Create the MySQL database and table

Create the test database:

create database test default character set utf8mb4 collate utf8mb4_unicode_ci;

Create the table

-- ----------------------------
-- Table structure for sensor_temp
-- ----------------------------
USE test;
DROP TABLE IF EXISTS `sensor_temp`;
CREATE TABLE `sensor_temp` (
  `ids` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
  -- plain DOUBLE keeps the fractional part; double(10,0) would round temperatures to whole numbers
  `temp` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Create a new Scala object, JdbcSink

package com.mafei.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

object JdbcSink {
  def main(args: Array[String]): Unit = {
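    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Single parallelism keeps the printed output in order and means only one sink
    // instance runs the update-then-insert logic
    env.setParallelism(1)

    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    inputStream.print()

    // Convert each line to the case class type first.
    // SensorReadingTest5 is not defined in this file; it is assumed to be a case class
    // visible in this package that exposes `id` and `temperature` (both used by the sink below).
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // split the line on commas
        // Build a sensor reading; toLong and toDouble are needed because the split fields are strings
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble)
      })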

    dataStream.addSink(new MyJdbcSinkFunc())
    env.execute()
  }
}

class MyJdbcSinkFunc() extends RichSinkFunction[SensorReadingTest5]{

  // Connection and prepared statements, created in open() and released in close()
  var conn: Connection = _

  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://10.0.83.82:3306/test","root","Mafei@20201104")
    insertStmt = conn.prepareStatement("INSERT INTO `sensor_temp`(`ids`, `temp`) VALUES ( ?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp= ? where ids= ? ")
  }

  // Called for every record: try an update first, and insert only if no row was updated
  override def invoke(in: SensorReadingTest5): Unit = {

    updateStmt.setDouble(1,in.temperature)
    updateStmt.setString(2,in.id)
    updateStmt.execute()
    if (updateStmt.getUpdateCount ==0){
      println("Executed an insert...")
      insertStmt.setString(1,in.id)
      insertStmt.setDouble(2,in.temperature)
      insertStmt.execute()
    }

  }

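  // Release JDBC resources; the null checks cover the case where open() failed part-way
  override def close(): Unit = {
    if (insertStmt != null) insertStmt.close()
    if (updateStmt != null) updateStmt.close()
    if (conn != null) conn.close()
  }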
}
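
The map step in the job above indexes straight into the split array, so a blank or malformed line in sensor.txt would make the job fail with an exception. Below is a minimal sketch of a more defensive parser, assuming each line has the form id,timestamp,temperature as the code above implies. The names Reading, LineParser and parseLine are hypothetical and not part of the original project; Reading merely stands in for SensorReadingTest5, and the sample values in the comments are made up.

```scala
import scala.util.Try

// Hypothetical stand-in for the article's SensorReadingTest5 case class.
final case class Reading(id: String, timestamp: Long, temperature: Double)

object LineParser {
  // Returns None when the line does not split into exactly three fields,
  // when the id is empty, or when a numeric field cannot be parsed.
  def parseLine(line: String): Option[Reading] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp) if id.nonEmpty =>
        // toLong / toDouble throw NumberFormatException on bad input;
        // Try turns that into None instead of failing the caller.
        Try(Reading(id, ts.toLong, temp.toDouble)).toOption
      case _ => None
    }
}

// Made-up sample input, for illustration only:
//   LineParser.parseLine("sensor_1,1547718199,35.8")  yields Some(Reading(...))
//   LineParser.parseLine("sensor_1,oops,35.8")        yields None
```

In the Flink job this could probably replace the map with something like inputStream.flatMap(line => LineParser.parseLine(line).toList), so that bad lines are skipped rather than stopping the job. Whether silently dropping bad lines is acceptable depends on the use case.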

Code structure and run result: (screenshot)

Resulting data in MySQL: (screenshot)