Pub/Sub Stream Processing with Dataflow

  • date 31st May, 2021
  • by Prwatech

Prerequisites

GCP account

Open the GCP Console and click Activate Cloud Shell.

NB: If a permission error arises, grant the Owner role to the default service account (for example, as shown below).
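For example, assuming the error refers to the Compute Engine default service account, you could grant it the Owner role like this (a sketch; adjust the service account name if your project uses a different one):

$ PROJECT=$(gcloud config get-value project)
$ PROJECT_NUMBER=$(gcloud projects describe $PROJECT --format="value(projectNumber)")
$ gcloud projects add-iam-policy-binding $PROJECT \
    --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
    --role="roles/owner"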

Create variables for your bucket, project, Pub/Sub topic, and region. Choose a Dataflow region close to where you run the commands.

$ BUCKET_NAME=your-bucket-name
$ PROJECT_ID=$(gcloud config get-value project)
$ TOPIC_ID=your-topic-id
$ REGION=dataflow-region
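If the bucket, topic, or required APIs are not set up yet, you can create or enable them now (a sketch with default settings; skip anything that already exists in your project):

$ gcloud services enable dataflow.googleapis.com pubsub.googleapis.com cloudscheduler.googleapis.com
$ gsutil mb -l $REGION gs://$BUCKET_NAME
$ gcloud pubsub topics create $TOPIC_ID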

Create a Cloud Scheduler job in this project. The job publishes one "Hello!" message to the Pub/Sub topic every minute.

$ gcloud scheduler jobs create pubsub publisher-job \
    --schedule="* * * * *" \
    --topic=$TOPIC_ID --message-body="Hello!"

Start the job.

$ gcloud scheduler jobs run publisher-job
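To confirm that messages are reaching the topic, you can optionally attach a temporary subscription and pull one message (the subscription name verify-sub is just an example):

$ gcloud pubsub subscriptions create verify-sub --topic=$TOPIC_ID
$ gcloud pubsub subscriptions pull verify-sub --limit=1 --auto-ack
$ gcloud pubsub subscriptions delete verify-sub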

Use the following commands to clone the quickstart repository and navigate to the sample code directory:

$ git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

$ cd python-docs-samples/pubsub/streaming-analytics

$ pip3 install -r requirements.txt  

NB: If pip reports that the pinned apache-beam version is not available, open requirements.txt (nano requirements.txt) and change the version to 2.24.0, or install the pinned version directly, as shown below.
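A minimal sketch of installing the pinned Beam version directly instead of editing the file (assuming the sample only depends on the apache-beam[gcp] package):

$ pip3 install "apache-beam[gcp]==2.24.0"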

Open the file:

$ nano PubSubToGCS.py

Paste the code below:

# Copyright 2019 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START pubsub_to_gcs]
import argparse
from datetime import datetime
import logging
import random

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )


class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode("utf-8"))


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )
# [END pubsub_to_gcs]

To save and exit, press Ctrl+X, then Y to confirm, then Enter.

Run the following command in the shell to launch the pipeline on Dataflow:

$ python3 PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp
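If you want to validate the pipeline before launching it on Dataflow, you can also run it locally with the DirectRunner (a sketch; it still reads from the real topic and writes to the real bucket, so the shell needs the same permissions). Stop the local run with Ctrl+C.

$ python3 PubSubToGCS.py \
  --project=$PROJECT_ID \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DirectRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp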

In the console, open Menu > Dataflow. You can monitor the job's progress in the Dataflow console.
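You can also check the job status from Cloud Shell (a sketch using gcloud):

$ gcloud dataflow jobs list --region=$REGION --status=active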

Wait a few minutes and then check the bucket. It will contain a samples folder with the output files. Alternatively, list them from the shell:

$ gsutil ls gs://${BUCKET_NAME}/samples/

This lists the output files that have been created.
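To inspect the contents of the output files, you can print them with gsutil (a sketch; the names follow the output-<window start>-<window end>-<shard> pattern produced by WriteToGCS):

$ gsutil cat "gs://${BUCKET_NAME}/samples/output-*"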

Delete the Cloud Scheduler job.

$ gcloud scheduler jobs delete publisher-job
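To avoid further charges, you may also want to cancel the streaming Dataflow job and remove the objects written to the bucket (a sketch; replace JOB_ID with the ID shown by the gcloud dataflow jobs list command above):

$ gcloud dataflow jobs cancel JOB_ID --region=$REGION
$ gsutil -m rm -r gs://${BUCKET_NAME}/samples/ gs://${BUCKET_NAME}/temp/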
