Wednesday, November 9, 2016

20+ Questions You Want To Ask About A Stream Processing Framework

About a year ago I started wandering around in the world of stream processing. Having been what I would call an "average" Java developer, building plain-old Java applications, libraries, desktop and web-based stuff, the transition to the "big data"-ish stack of technologies was definitely an eye-opener, in both positive and sometimes negative ways.

In this post, I would like to present a number of questions you might find useful to ask about the next stream processing framework you are about to adopt. There are also some answers here, based on my personal (and my colleagues') experiences taking the first steps with Apache Flink and Apache Spark Streaming. Obviously, the questions selected here and the answers to many of them are highly subjective and may not reflect your exact needs or previous experience. Nevertheless, if you have only recently started gazing upon the big data landscape and found (near real time) stream processing to be of some interest, you might also be able to take advantage of the experiences shared here.

The questions are divided into five groups, based roughly on the lifecycle of an application, from writing the code to monitoring it in its working environment.

Coding

Where do I start from?

Spark users might grab the Spark Streaming Programming Guide and go through some examples. There's also a quick start guide available for Scala, Python and Java. Flink users can have a look at the Basic API Concepts and the DataStream API Programming Guide. The Maven quickstart example is also a great way to set up your first Flink program.
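If you prefer to start from a complete program, here is a minimal sketch (class and app names are mine) of a Spark Streaming job written against the Java API; it reads lines from a local socket and prints each micro-batch:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingQuickStart {
    public static void main(String[] args) throws Exception {
        // Two local threads: one for the receiver, one for the actual processing.
        SparkConf conf = new SparkConf().setAppName("quickstart").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Read lines from a local socket (feed it with e.g. "nc -lk 9999") and print each batch.
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

Start "nc -lk 9999" in a terminal, run the program and type a few lines to see them echoed back once per second.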

Where does my code run? How does it get there?

In both Spark and Flink, your code runs either on the "master" node (Spark Driver / Flink JobManager) or on one of the "worker" nodes (Spark Worker / Flink TaskManager). The code that sets up your processing topology is usually executed on the master node, while the topology itself is serialized and transported to the worker nodes for execution.
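To make that split visible, here is a rough Flink sketch (Java API, arbitrary class and job names); the comments mark which parts stay on the master side and which parts are shipped to the workers:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WhereDoesItRun {
    public static void main(String[] args) throws Exception {
        // This main() only describes the topology; as noted above, it runs on the master side.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
           // This MapFunction instance is serialized and shipped to the workers,
           // where map() is actually invoked for each element.
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .print();

        // Only now is the serialized topology submitted for execution.
        env.execute("where-does-it-run");
    }
}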

Where do I get my data from?

From an implementation of a Receiver (Spark data source) or a SourceFunction (Flink data source). Many of these are available out of the box or as extensions, e.g. for reading from file systems, sockets, Kafka, Flume, Kinesis, etc.

Nevertheless, you can always implement a custom source. The approach is similar for both Spark (custom receivers) and Flink: you implement a receiver function running on a thread that waits for incoming data and pushes it into the stream upon arrival.
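For Flink, such a custom source could be sketched as follows; fetchNextRecord() is a made-up placeholder for whatever polling logic your external system requires:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PollingSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = fetchNextRecord();
            if (record != null) {
                // Push the received element into the stream.
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String fetchNextRecord() throws InterruptedException {
        // Placeholder for a blocking poll against the external system.
        Thread.sleep(1000);
        return "record @ " + System.currentTimeMillis();
    }
}

Spark's custom receivers follow the same idea: you extend Receiver, start a worker thread in onStart() and call store() for each arriving record.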

If streams are unbounded, how can my streaming operations reach a result?

There aren't many operations that can be applied directly to an unbounded stream; usually some sort of windowing is needed. Spark Streaming applies windows to your data automatically, dividing it by arrival time into micro-batches whose length you provide when setting up your processing topology. In Flink, you need to explicitly specify the window operations to be applied to your data.
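In Spark, that batch length is simply the duration passed to the StreamingContext constructor (as in the quick start sketch above). In Flink, an explicit window declaration could look like this sketch, which sums a counter per key over 10-second processing-time windows (the inline elements merely stand in for a real source):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowedCounts {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 1), Tuple2.of("a", 1))
           .keyBy(0)                          // group the unbounded stream by key...
           .timeWindow(Time.seconds(10))      // ...chop it into 10-second windows...
           .sum(1)                            // ...so an aggregate can actually be emitted
           .print();

        env.execute("windowed-counts");
    }
}

With a real, unbounded source attached, this would emit one count per key every ten seconds.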

How can I make my design modular?

  • Extract complex functions into separate classes.
  • Separate parts of your topology into factory functions. You can use IoC containers, such as Google Guice or Spring IoC, to wire these functions together. This allows you to test subsections of your topology separately, as shown in the sketch below.
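As a small illustration of both points, here is a hypothetical CleansingStage (all names made up) that wires two extracted, individually testable functions into a reusable sub-topology:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

public class CleansingStage {

    // Factory function: takes any DataStream<String> and returns the cleansed stream,
    // so the stage can be wired into the full topology or exercised on its own.
    public static DataStream<String> apply(DataStream<String> lines) {
        return lines
            .filter(new NonEmptyFilter())
            .map(new TrimFunction());
    }

    // Extracted function classes; each can be unit-tested in isolation.
    public static class NonEmptyFilter implements FilterFunction<String> {
        @Override
        public boolean filter(String value) {
            return value != null && !value.trim().isEmpty();
        }
    }

    public static class TrimFunction implements MapFunction<String, String> {
        @Override
        public String map(String value) {
            return value.trim();
        }
    }
}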

How can I prepare for and recover from failures?

  • Do not rely on local state (static variables, caches, etc.); use the state management facilities provided by Spark / Flink.
  • In general, try to avoid recovering into a new version of your application if possible, especially if the new version changes the topology.
  • In Flink, learn about Savepoints and Fault Tolerance to make sure your code is upgradable, in addition to being recoverable (a minimal checkpointing sketch follows this list).
  • In Spark, have a look at the fault-tolerance semantics described in the programming manual. 
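To make the Flink bullet concrete, here is a minimal sketch (interval and names are arbitrary): checkpointing is enabled so state can be recovered after a failure, and a stable operator ID is assigned so savepoints stay usable across upgrades of the job:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint of the job state every 30 seconds.
        env.enableCheckpointing(30000);

        env.fromElements("a", "b", "c")
           .uid("sample-source")   // stable operator IDs keep savepoints matchable after code changes
           .print();

        env.execute("checkpointed-job");
    }
}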

How can I manage state?

In Spark, you use the updateStateByKey or mapWithState functions to map the previous state of your stream to its new state. The state itself is kept directly in the RDDs produced by your topology.

In Flink, you can implement stateful functions (both source and transform functions). The State Backend is responsible for storing and restoring the state held by your stateful functions.
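On the Flink side, a stateful function could look like the following sketch, which keeps a running sum per key; it is meant to be applied to a keyed stream, e.g. stream.keyBy(0).flatMap(new RunningSum()):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RunningSum extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    // Managed keyed state; the configured state backend stores and restores it.
    private transient ValueState<Long> sum;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Long.class, 0L));
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
        long next = sum.value() + in.f1;
        sum.update(next);
        out.collect(Tuple2.of(in.f0, next));
    }
}

Spark's mapWithState plays a similar role, with the state snapshot living in the RDDs mentioned above.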

Testing


What are my “units” for unit testing?

I would recommend separating the processing components (i.e. the functions that act upon data) and the processing topology (i.e. the pipeline) into separate classes / components as much as you can. Although defining your whole stream processing pipeline in one place makes it easier to grasp, it doesn't really give you any benefits when building automated tests. Also, if you separate your processing pipeline into sub-components, you can provide test inputs to each of these separately, without involving real input / output operators. This applies to both Spark and Flink.
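Building on the hypothetical CleansingStage sketch from the Coding section, the extracted functions become plain Java classes that can be unit-tested without any streaming runtime:

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class CleansingStageTest {

    @Test
    public void trimFunctionStripsSurroundingWhitespace() throws Exception {
        assertEquals("abc", new CleansingStage.TrimFunction().map("  abc  "));
    }

    @Test
    public void nonEmptyFilterRejectsBlankLines() throws Exception {
        assertFalse(new CleansingStage.NonEmptyFilter().filter("   "));
        assertTrue(new CleansingStage.NonEmptyFilter().filter("data"));
    }
}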

Can I include the surrounding framework into my tests?

In both Spark and Flink, yes you can (to some extent). For example, you should be able to:

  1. Build streams from collections of objects, to facilitate data-driven testing.
  2. Build ad-hoc stream sources and outputs to test the endpoints of your topology.
  3. Run your whole topology in "local mode", without the need to deploy it to a cluster (see the sketch after this list).
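A sketch combining points 1 and 3 for Flink (class and job names are made up): the input stream is built from a plain collection and the whole topology runs locally in the test JVM:

import java.util.Arrays;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TopologyTestDrive {
    public static void main(String[] args) throws Exception {
        // Local mode: no cluster needed, everything runs inside this JVM.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // Data-driven test input built from a plain collection instead of a real source.
        env.fromCollection(Arrays.asList("ok", "", "also ok"))
           .filter(new FilterFunction<String>() {
               @Override
               public boolean filter(String value) {
                   return !value.isEmpty();
               }
           })
           .print();

        env.execute("topology-test-drive");
    }
}

In a real test you would typically replace print() with an ad-hoc sink that collects the results for assertions.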

How can I test the distributed nature of my application?

Both Spark and Flink allow you to run your processing topology in local mode, with varying degrees of parallelism. However, this usually happens within a single JVM, so certain patterns that are incompatible with the distributed nature of stream processing can go unnoticed. For example:

  • Static variables (e.g. static singletons) do not behave the same way in local mode as they do on a distributed cluster.
  • Serialization and deserialization problems can slip through in Spark's local mode; Flink is usually able to catch unserializable object graphs more reliably when running locally.
As always, the safest way to cover the potential edge cases of cluster-based execution is still to set up a cluster of JVMs and run your integration tests against that setup.

Do I have to test the framework itself as well?

Luckily, not too much in the case of Spark Streaming and Flink; both usually work as intended. Just make sure you are careful with experimental portions of the APIs (usually annotated or documented as such). Also, note that many streaming operations run in-memory for efficiency's sake, so monitor the performance-critical sections of your code well and don't expect any miracles from the framework when dealing with copious amounts of data.

Building


How do I include third-party dependencies?

Either by using the Maven Shade / Assembly plugin to create an uber-jar, or by providing the dependencies separately through a shared file system. You can trade off between quick deployments and self-containment here. The recommended approach seems to be to build one big artifact containing everything except Spark / Flink itself and deploy that.

How do I isolate my code and dependencies?

That can be a challenge in both Spark and Flink. In both cases, your code runs on a JVM together with dependencies included with the framework itself. There's not much built-in modularity support either. Your code (JAR) gets loaded by a separate class loader, with the framework's main class loader as the parent, so the parent's dependencies have most likely already been loaded.

Leaving options like OSGi aside (not because it would be a bad idea; I just don't have any personal experience combining OSGi with Spark / Flink), you can use Maven's Shade plugin to relocate your required dependencies to custom packages. This helps when the versions of your dependencies conflict with what Flink and Spark require.

One big artifact or many small ones?

With Flink, your best bet is to create one big uber-jar for deployment; this also allows for easy deployment from the JobManager UI. It is possible to add your dependencies via a custom classpath, but this requires the additional artifacts to be available on a shared path (e.g. a network share). With Spark, the command-line tools allow for more flexibility: in the case of a YARN deployment, for example, they can upload additional artifacts and share them with the entire cluster automatically. Still, building one big jar tends to be less error-prone.

Deploying


Do I need to deploy any kind of middleware?

You usually need to deploy the framework itself. Both Spark and Flink support standalone cluster mode, YARN and cloud deployments. That is, your programs do not run in isolation, but rather on Spark / Flink nodes that must be configured and deployed prior to the deployment of your application.

Does my application share its sandbox with others?

That depends on how you deploy your application. In Flink, task managers have one task slot each by default, which means that your application's code runs in a separate JVM. However, this does not need to be the case: you can configure task managers to run multiple slots, in which case your code is separated from other applications through class loader isolation only. Have a look at Cluster Execution.

In Spark, executors get their own processes which stay up for the duration of the whole application, i.e. tasks from different applications always run in different JVMs. See Cluster Mode Overview for more details.

Which deployment tools do I have at my disposal?

Out of the box, shell scripts. The Flink JobManager also provides a REST API and an HTML front end for simple uploads.

To deploy the cluster itself, you can use tools like Apache Ambari for more convenient set-up and configuration management.

What kind of infrastructure do I need?

Nothing too spectacular. Standalone clusters need some hardware and an operating system capable of running Java. Common cloud providers are usually supported as well, for example Amazon EMR for both Flink and Spark.

Monitoring


What are the metrics I could monitor?

The most crucial metric tends to be back pressure from downstream: incoming data overwhelms a worker node and the limited throughput of your stream results in a growing queue of unprocessed input. This is especially critical if your input source does not support throttling. There are no direct metrics for back pressure available, however, so you might need to implement your own.

At any rate, there are some great sources of information about monitoring in Spark and Flink available online.

Which tools could (or should) I use?

JMX, Graphite and other "usual suspects" work fine. Ganglia support is also available out of the box for Flink, while Spark requires a custom build due to licensing restrictions (more details here).
It should be noted, though, that the distributed nature of your application can make it tricky to get the most out of monitoring. Also, since the built-in metrics do not necessarily cover all your needs, you will probably need to add your own. Unfortunately, neither Flink nor Spark specifies good implementation patterns for this purpose.
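One reasonably clean option on the Flink side (its metrics API, available since version 1.1) is to register your own metrics inside a rich function; the sketch below adds a hypothetical counter so that processed-record counts show up in whichever reporter you have configured:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processed;

    @Override
    public void open(Configuration parameters) {
        // Registered with the operator's metric group; exposed via the configured reporters.
        processed = getRuntimeContext().getMetricGroup().counter("processedRecords");
    }

    @Override
    public String map(String value) {
        processed.inc();
        return value;
    }
}

On the Spark side, you would typically wire up something similar yourself, for example with accumulators or a metrics library of your choice.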

How should I handle the system log?

If you are using a resource manager (e.g. YARN), you can use its history server and related log management features out of the box. In any case, you need to collect and store the logs on your own; there isn't anything built in for this purpose beyond what Log4J / SLF4J implementations provide. Note that Spark used to be built on top of Log4J, so using a different logging implementation requires a bit more work to set up.