
Flink Job Listener: Run a task After Flink Job is Completed

by Imran Shaikh

Hey, Tea Lovers! This post is about running a task after a Flink job is completed or submitted, using Flink's JobListener. There are many scenarios where you want some additional processing once a job finishes: pushing a notification, updating a database, or writing some kind of log. It's up to the requirement. And it's not only post-completion; sometimes we also need to do something right after the job is submitted to the Flink cluster.

Since Flink executes the job in a different thread, normal code cannot determine when the job has completed. For this reason, Flink provides a listener that gets called after job submission and after job completion. It's an interface, so we need to implement it and register it with the Flink environment, either ExecutionEnvironment or StreamExecutionEnvironment.
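To see why a callback is needed at all, here is a minimal plain-Java sketch (not the Flink API; `CallbackSketch` and `CompletionListener` are made-up names for illustration). The "job" runs on another thread, so the caller cannot simply fall through to the next line to know it is done; instead it hands over a listener that gets invoked on completion:

```java
import java.util.concurrent.CompletableFuture;

public class CallbackSketch {
    // A stand-in "job listener": invoked when the async work finishes.
    interface CompletionListener {
        void onDone(Integer result, Throwable error);
    }

    static void runAsync(CompletionListener listener) {
        // The "job" runs on another thread; the caller cannot just
        // check a flag afterwards to know when it is done.
        CompletableFuture
                .supplyAsync(() -> 21 * 2)
                .whenComplete(listener::onDone);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> seen = new CompletableFuture<>();
        runAsync((result, error) -> seen.complete(result));
        System.out.println("callback saw: " + seen.get()); // prints: callback saw: 42
    }
}
```

Flink's JobListener follows the same shape: you don't poll for completion, Flink calls you.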

Requirements to Understand Flink Job Listener

You don't need to be an expert. But you should know the basics of Flink and its inner workings; knowing how a Flink job runs will be a great help in clearing up any doubts you experience with post-job tasks or the job listener. To get started with Flink, you can see how to install Flink on Mac, Windows, or Linux.

For this example I am using Flink 1.12, but it works with other recent Flink versions as well. I will show you how to register the listener and how it works. The example is pretty straightforward, but as I said earlier, it can do many things: a database call, a simple log line, an API call, or a push notification. Anything you can do via normal code; Flink will simply call the function and it gets executed.

Flink Job Listener Interface

All you need is an implementation of this interface, with your logic in the overridden methods. The Flink JobListener interface has two methods: one for submission and one for completion. The definition of the interface is as follows.

interface JobListener {
  void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable);

  void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable);
}

Both methods take two parameters, and the one they share is a Throwable: the error thrown by the process, if any. Let's look at each method individually.
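The contract can be mimicked in plain Java to make it concrete. This is a sketch, not Flink code: `MiniJobListener` and `MiniRunner` are invented names, and the driver simply fires the callbacks the way the contract describes, with the Throwable non-null only when that phase failed:

```java
// A plain-Java mirror of Flink's JobListener contract -- these are
// made-up illustration classes, not Flink API.
public class ListenerContractSketch {

    interface MiniJobListener {
        void onJobSubmitted(String jobId, Throwable throwable);  // throwable == null => submission succeeded
        void onJobExecuted(String result, Throwable throwable);  // throwable == null => job succeeded
    }

    // Drives a "job" and fires the callbacks per the contract.
    static String submitAndRun(Runnable job, MiniJobListener listener) {
        listener.onJobSubmitted("job-1", null);       // submission went through
        try {
            job.run();
            listener.onJobExecuted("DONE", null);     // success: result set, throwable null
            return "DONE";
        } catch (RuntimeException e) {
            listener.onJobExecuted(null, e);          // failure: result null, throwable set
            return "FAILED";
        }
    }

    public static void main(String[] args) {
        MiniJobListener logOnly = new MiniJobListener() {
            public void onJobSubmitted(String jobId, Throwable t) {
                System.out.println(t == null ? "submitted ok" : "submit failed");
            }
            public void onJobExecuted(String result, Throwable t) {
                System.out.println(t == null ? "finished: " + result : "failed: " + t.getMessage());
            }
        };
        System.out.println(submitAndRun(() -> {}, logOnly));
        System.out.println(submitAndRun(() -> { throw new RuntimeException("boom"); }, logOnly));
    }
}
```

In your real listener, checking the Throwable for null first, as the snippets below do, is the reliable way to branch between the success and failure paths.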

Flink JobListener interface: onJobSubmitted

As the name suggests, onJobSubmitted is called whenever a job is submitted for execution. Suppose you want to be notified via email every time a new job is submitted. You can do that in this method: write the email-sending code, or call an API that does it, and you will be notified. As simple as that.

@Override
public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
  if (throwable != null) {
    log.error("Job failed to submit.", throwable);
    // do something or notify about failed job submission
    return;
  }

  log.info("Job submitted successfully");
  // do something
  // push notification
  // or Call an API
  // or Insert something in DB
}

Flink Job Listener: onJobExecuted

onJobExecuted is the method that gets called when the job finishes executing, either successfully or with an error. Here too we may want to receive an email (similar to the submission example), or log the job details into the DB: start time, end time, the input taken, and more. Whenever a job completes, Flink calls the registered JobListener's onJobExecuted method. Like onJobSubmitted, it has a Throwable parameter, which gets set whenever the job throws an error and completes or terminates abnormally.

@Override
public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
  if (throwable != null) {
    log.error("Job failed to finish", throwable);
    return;
  }
  log.info("Job completed successfully");
  try (
          // get DB connection
          Connection connection = DriverManager.getConnection("url-to-db")
  ) {
    // do something
    // select or insert query
  } catch (SQLException e) {
    log.error("Error communicating with DB", e);
  }
}

Note

You may notice the completion task triggers before the Flink job has completely shut down. That is because it runs on a different thread, and your code triggers when the job (the pipeline) is completed, while Flink still has work left to do, such as shutting down the (local) cluster.

Complete Example of Flink Post Job Tasks

The example below shows a simple Flink batch job. It uses ExecutionEnvironment and registers a JobListener that simply logs whether the submission and the completion succeeded. As the comments show, you can do much more, but for simplicity let's stick to logging.

The main job, or pipeline, takes a list of integers, multiplies each by 2, and stores the result in a temporary file; that's it. The output is generated in the system's temporary folder with flink-job-listener- as a prefix.
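Stripped of Flink, the pipeline's transformation is just a map over the list. A plain-Java sketch of the same math (`PipelineMathSketch` is an illustration name, not part of the post's project):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PipelineMathSketch {
    // Same transformation as the Flink pipeline: double each integer.
    static List<Integer> doubleAll(List<Integer> input) {
        return input.stream()
                .map(x -> x * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(doubleAll(Arrays.asList(1, 2, 3))); // prints [2, 4, 6]
    }
}
```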

You can download the code from GitHub: the full Flink Tutorial project, or the Main class of this post.

log.info("Starting the JobListener Example Code");
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

String outPut = File.createTempFile("1", "2").getParent();
log.info("Output will be stored at folder {}", outPut);

List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
env.fromCollection(integers)
    .map(x -> x * 2) // multiply numbers by 2
    .writeAsText(outPut + "/flink-job-listener-" + UUID.randomUUID());
    
log.info("Registering the JobListener");
env.registerJobListener(new JobListener() {
  @Override
  public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
    if (throwable != null) {
      log.error("Job failed to submit", throwable);
      return;
    }
    log.info("Job submitted successfully");
    // do something
    // push notification
    // or Call an API
    // or Insert something in DB
  }

  @Override
  public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
    if (throwable != null) {
      log.error("Job failed to finish", throwable);
      return;
    }
    log.info("Job completed successfully");
    // do something
    // push notification
    // or Call an API
    // or Insert something in DB
  }
});
    
env.execute();

Conclusion

That's it for this post. I hope it has helped you shape the flow of your Flink project: how to do something after your Flink job is completed, or after it is submitted. You can also read my previous Flink post, How to Select Specific Folders or Files As Input in Flink, or the other posts under Apache Flink.

The full project is on GitHub. See you in the next post.

HAKUNA MATATA!!!
