Google Cloud Storage

Google Cloud Storage (GCS) provides cloud storage for a variety of use cases. You can use it for reading and writing data, and for checkpoint storage when using FileSystemCheckpointStorage with the streaming state backends.

You can use GCS objects like regular files by specifying paths in the following format:

  gs://<your-bucket>/<endpoint>

The endpoint can either be a single file or a directory, for example:

  // Read from GCS bucket
  env.readTextFile("gs://<bucket>/<endpoint>");

  // Write to GCS bucket
  stream.writeAsText("gs://<bucket>/<endpoint>");

  // Use GCS as checkpoint storage
  env.getCheckpointConfig().setCheckpointStorage("gs://<bucket>/<endpoint>");
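
To make the path format concrete, here is a minimal sketch that splits a gs:// path into its bucket and endpoint parts using only the JDK. The GcsPath class and its parse method are hypothetical names introduced for illustration; they are not part of Flink or the GCS connector, and the connector performs its own path handling.

```java
import java.net.URI;

// Hypothetical helper for illustration only; not a Flink or gcs-connector API.
final class GcsPath {
    final String bucket;      // the <your-bucket> part of the path
    final String objectPath;  // the <endpoint> part, without the leading slash

    private GcsPath(String bucket, String objectPath) {
        this.bucket = bucket;
        this.objectPath = objectPath;
    }

    // Splits "gs://<bucket>/<endpoint>" into bucket and endpoint.
    // Throws IllegalArgumentException for malformed or non-gs paths.
    static GcsPath parse(String path) {
        URI uri = URI.create(path);
        if (!"gs".equals(uri.getScheme())) {
            throw new IllegalArgumentException("Not a gs:// path: " + path);
        }
        // getAuthority() is used instead of getHost() so that bucket names
        // containing underscores are still returned.
        String bucket = uri.getAuthority();
        if (bucket == null || bucket.isEmpty()) {
            throw new IllegalArgumentException("Missing bucket in: " + path);
        }
        String raw = uri.getPath() == null ? "" : uri.getPath();
        String objectPath = raw.startsWith("/") ? raw.substring(1) : raw;
        return new GcsPath(bucket, objectPath);
    }
}
```

For example, GcsPath.parse("gs://my-bucket/data/input.txt") yields the bucket my-bucket and the endpoint data/input.txt, while a path with a different scheme is rejected.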

Libraries

You must include the following jars in Flink’s lib directory to connect Flink with GCS:

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2-uber</artifactId>
    <version>${flink.shared_hadoop_latest_version}</version>
  </dependency>
  <dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoop2-2.2.0</version>
  </dependency>

We have tested with flink-shaded-hadoop2-uber version >= 2.8.5-1.8.3. You can track the latest version of the gcs-connector for Hadoop 2.

Authentication to access GCS

Most operations on GCS require authentication. Please see the documentation on Google Cloud Storage authentication for more information.

You can use one of the following methods for authentication:

  • Configure via core-site.xml. Add the following properties to core-site.xml:

    <configuration>
      <property>
        <name>google.cloud.auth.service.account.enable</name>
        <value>true</value>
      </property>
      <property>
        <name>google.cloud.auth.service.account.json.keyfile</name>
        <value><PATH TO GOOGLE AUTHENTICATION JSON></value>
      </property>
    </configuration>

    Then add the following to flink-conf.yaml:

    flinkConfiguration:
      fs.hdfs.hadoopconf: <DIRECTORY PATH WHERE core-site.xml IS SAVED>
  • Provide the path to the JSON key file via the GOOGLE_APPLICATION_CREDENTIALS environment variable.
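
When relying on the environment variable, it can help to verify before submitting a job that the variable is set and points to a readable file. The following is a minimal sketch using only the JDK. The GcsCredentialsCheck class and its resolveKeyFile method are hypothetical names for illustration, not part of Flink or the GCS connector, and the check does not validate the contents of the key file.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

// Hypothetical pre-flight check for illustration only; not a Flink API.
final class GcsCredentialsCheck {
    static final String ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS";

    // Returns the key file path if the variable is set and points to a
    // readable regular file; otherwise returns an empty Optional.
    static Optional<Path> resolveKeyFile(Map<String, String> env) {
        String value = env.get(ENV_VAR);
        if (value == null || value.trim().isEmpty()) {
            return Optional.empty();
        }
        Path keyFile = Paths.get(value.trim());
        if (Files.isRegularFile(keyFile) && Files.isReadable(keyFile)) {
            return Optional.of(keyFile);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Optional<Path> keyFile = resolveKeyFile(System.getenv());
        System.out.println(keyFile
                .map(p -> "Using key file: " + p)
                .orElse(ENV_VAR + " is not set or does not point to a readable file"));
    }
}
```

The environment is passed in as a map rather than read inside the method, so the check can be exercised without changing the process environment.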