What do you do at Continuuity, again? Part 2

Feb 4 2014, 8:54 am

Alex Baranau is a software engineer at Continuuity, where he is responsible for designing and building software that fuels the next generation of Big Data applications. Alex is a contributor to HBase and Flume, has created several open-source projects, and writes frequently about Big Data technologies.

Let’s make it awesome!

In our previous post, we introduced the basics of our Continuuity platform using a simple example of a real-time processing application. In this post, we’ll take a step forward and walk through the details of building a non-trivial, real-time data processing application.

Efficient computation

When scaling up your computations, simply increasing the number of processing units, each performing the same work, is usually not enough. Computations consist of diverse parts, each requiring its own scaling approach. This becomes even more obvious when interactions with external systems are a necessary step of the computation logic.

Continuuity Reactor’s real-time data processing framework, BigFlow, is very flexible and rich in features, allowing you to develop an efficient computational pipeline. Without further ado, let’s dive into the design methodology.

Split processing into multiple steps

One obvious change to our earlier example is to split the logic of detecting active users from the logic of reporting those users to the third-party ContentSystem. There are many reasons to make this change, among them the ability to tolerate unpredictable behavior in a third-party system (slow responses or downtime) and the flexibility to apply different scaling logic to different parts of the computation.

Separating the detection of active users (from reporting) into its own flowlet makes that flowlet a reusable component. We may want to make it even more generic by also separating out the parsing of Stream events, so that input data in different formats from diverse systems can feed the same flow - a sketch of such a split is shown below.

Note that the data transfer between flowlets must be reliable and withstand any failures or issues, including third-party system unavailability or slowness.
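
Here is a rough sketch of that split as a flow wiring, a preview of the full ActiveUsersFlow shown later in this post (imports omitted, as in that listing; instance counts are tuned in the full application):

  class ActiveUsersFlow implements Flow {
    @Override
    public FlowSpecification configure() {
      return FlowSpecification.Builder.with()
        .setName("ActiveUsersFlow").setDescription("Tracks active users by page views")
        // Each step is its own flowlet, so each can be scaled (and can slow down
        // or fail) independently of the others.
        .withFlowlets().add("parser", new Parser(), 1)
                       .add("filter", new ActiveUsersFilter(), 1)
                       .add("reporter", new ActiveUsersReporter(), 1)
        // Data transfer between flowlets is handled by the framework and is
        // reliable even when a downstream step (e.g. reporting) is slow or down.
        .connect().fromStream("pageViews").to("parser")
                  .from("parser").to("filter")
                  .from("filter").to("reporter").build();
    }
  }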

Use in-memory cache per flowlet instance

Another great optimization is to cache “detected active users” in a non-shared, in-memory data structure: an in-memory cache per flowlet instance that provides the same transactional guarantees as persistent data structures. We don’t want the cache to become inconsistent at any time, as that could cause us to miss an active user.
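
A minimal sketch of that pattern, reusing the MemoryTable dataset and the Get/Put calls from the full application shown later in this post (the class name CachedFilterSketch is illustrative, and the actual active-user detection against the persistent table is elided here):

  class CachedFilterSketch extends AbstractFlowlet {
    // Declared like any other dataset, but backed by memory and not shared
    // across instances; reads and writes join the process method's transaction.
    @UseDataSet("activeUsersCache")
    private MemoryTable cache;

    private OutputEmitter<User> output;

    @ProcessInput
    public void process(User user) {
      // Emit a user only the first time this instance sees them; if the
      // transaction fails, the cache put and the emit are rolled back together,
      // so the cache never drifts out of sync with what was emitted.
      if (cache.get(new Get(user.getName())).isEmpty()) {
        cache.put(new Put(user.getName()));
        output.emit(user);
      }
    }
  }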

Configure partitioning of data between flowlet instances

When using a non-shared, in-memory cache, it is important to route the processing of any given user to the same flowlet instance. That is easy when you have only one flowlet instance, but when your use case requires scaling the computation to multiple flowlet instances, the incoming data must be partitioned accordingly. In our example, it makes sense to hash-partition by user name. The system should be able to handle any number of flowlet instances at run-time, distributing the load evenly between them at any point in time.
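
In code, this contract has two halves, both of which appear in the full application below: the emitting flowlet attaches a partition key to each element, and the consuming flowlet declares @HashPartition on the same key (the process body here is left empty for brevity):

  class Parser extends AbstractFlowlet {
    private OutputEmitter<User> output;

    @ProcessInput
    public void processPageView(StreamEvent event) {
      String userName = event.getHeaders().get("user");
      // The second and third arguments name the partition key and its value,
      // which the framework uses to route this element downstream.
      output.emit(new User(userName), "userName", userName);
    }
  }

  class ActiveUsersFilter extends AbstractFlowlet {
    // Hash-partitioning on "userName" guarantees that all events for a given
    // user are processed by the same instance (and hit the same in-memory cache).
    @HashPartition("userName")
    @ProcessInput
    public void process(User user) {
      // per-user detection against this instance's cache goes here
    }
  }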

Buffer events to increase throughput

Based on the computation logic, we may want to process events in small packets to increase the throughput of the real-time stream processing system. This means consuming more than one event per process method invocation (i.e., per transaction) when there are many elements waiting to be consumed. We don’t want to compromise the real-time aspect of the processing: the flowlet should consume more than one event only if multiple elements are already available. For the same reason (and because of other constraints, such as memory), we also want to set a limit on the maximum number of events that can be consumed at once.

In this example, it makes sense to use batching in both scenarios: when detecting active users and when reporting active users to ContentSystem. The latter saves potentially expensive RPC calls to the third-party system by batching users together.
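
As a preview of how batching is expressed in the application below, here is what the reporter side might look like on its own - @Batch sets the upper limit, and the Iterator parameter signals batch consumption:

  class ActiveUsersReporter extends AbstractFlowlet {
    // Consume up to 100 queued users in a single process call (one transaction);
    // if fewer are waiting, only those are consumed, so latency stays low.
    @Batch(100)
    @ProcessInput
    public void process(Iterator<User> users) {
      // One call to the third-party system per batch instead of one per user.
      ContentSystem.addNewActiveUsers(users);
    }
  }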

Application design

Let’s put it all together to see what needs to be done:

  • Split the processing into three flowlets: parsing Stream events, detecting active users, and reporting them to ContentSystem.

  • Keep a non-shared, in-memory, transactional cache of detected active users in each filter flowlet instance.

  • Hash-partition the data by user name, so the same user is always processed by the same flowlet instance.

  • Batch events both when detecting active users and when reporting them, with an upper limit on the batch size.

Seems like a lot of work? Not really, when you apply the right tool!

Reactor Application

Below is the source code of the application (yes, fewer than 100 lines of code!), implemented according to our updated design:

  public class ActiveUsersApp implements Application {
    @Override
    public ApplicationSpecification configure() {
      return ApplicationSpecification.Builder.with()
        .setName("ActiveUsersApp").setDescription("Tracks active users")
        .withStreams().add(new Stream("pageViews"))
        .withDataSets().add(new Table("userActivity"))
                      .add(new MemoryTable("activeUsersCache"))
        .withFlows().add(new ActiveUsersFlow())
        .noProcedure().noMapReduce().noWorkflow().build();
    }
  }

  class ActiveUsersFlow implements Flow {
    @Override
    public FlowSpecification configure() {
      return FlowSpecification.Builder.with()
        .setName("ActiveUsersFlow").setDescription("Tracks active users by page views")
        .withFlowlets().add("parser", new Parser(), 2)
                      .add("filter", new ActiveUsersFilter(), 4)
                      .add("reporter", new ActiveUsersReporter(), 2)
        .connect().fromStream("pageViews").to("parser")
                  .from("parser").to("filter")
                  .from("filter").to("reporter").build();
    }

    class User {
      private final String name;
      public User(String name) { this.name = name; }
      public String getName() { return name; }
    }

    class Parser extends AbstractFlowlet {
      private OutputEmitter<User> output;

      @ProcessInput
      public void processPageView(StreamEvent event) {
        String userName = event.getHeaders().get("user");
        output.emit(new User(userName), "userName", userName);
      }
    }

    class ActiveUsersFilter extends AbstractFlowlet {
      @UseDataSet("activeUsersCache")
      private MemoryTable cache;

      @UseDataSet("userActivity")
      private Table userActivity;

      private OutputEmitter<User> output;

      @Batch(100)
      @HashPartition("userName")
      @ProcessInput
      public void process(Iterator<User> users) {
        while (users.hasNext()) {
          User user = users.next();
          // Skip users this instance has already detected (and cached) as active.
          if (cache.get(new Get(user.getName())).isEmpty()) {
            if (isNewActiveUser(user.getName())) {
              cache.put(new Put(user.getName()));
              output.emit(user);
            }
          }
        }
      }

      // Bumps the user's page-view counter; returns true exactly once, when the
      // counter reaches the "active" threshold of 100 page views.
      private boolean isNewActiveUser(String user) {
        int pageViews = userActivity.get(new Get(user)).getInt("pageViews", 0);

        if (pageViews >= 100) {
          return false;
        }

        pageViews++;
        userActivity.put(new Put(user, "pageViews", pageViews));

        return pageViews == 100;
      }
    }

    class ActiveUsersReporter extends AbstractFlowlet {
      @Batch(100)
      @ProcessInput
      public void processPageView(Iterator<User> users) {
        ContentSystem.addNewActiveUsers(users);
      }
    }
  }

Though the code is simple, let us highlight some of the new concepts:

  • ActiveUsersFlow consists of three flowlets connected together: parser, filter, and reporter.

  • User is a simple Java class that defines the type of the elements transferred between flowlets - the framework does all the serde (serialization/deserialization) magic for you, and more.

  • OutputEmitter is the typed output of your flowlet. You can have many of these with different types, and the framework will handle all the logistics for you.

  • @HashPartition declares the partitioning strategy to use when distributing data across multiple flowlet instances (note that the partition key value is emitted by the preceding flowlet). At the time of writing, you can choose between FIFO (the default), @RoundRobin, and @HashPartition.

  • @Batch gives the framework a hint to process events in small packets when more than one is available for consumption - the process method accepts an Iterator in this case.

  • MemoryTable is an in-memory, transactional table that implements caching and other optimizations.

Conclusion

Doing computation efficiently is hard. Processing massive amounts of data efficiently in a distributed way is even harder. Using the right tools helps a lot. And that is exactly what we are doing at Continuuity - building the right tool for your Big Data application.
