MoEngage is an enterprise SaaS business that helps B2C brands gather valuable insights from their customers’ behavior and use these insights to set up efficient engagement and retention campaigns.
As these campaigns and insights are based on the interactions of end users (consumers), the most critical requirement is the ability to ingest all of this demographic and interaction data and make it available to various internal services in a timely fashion.
As the business has grown over the last few years, we have been fortunate to help customers across the world achieve their marketing goals. With businesses becoming increasingly digitized, our customers have seen their own customer bases grow at an exponential rate. They now expect highly responsive applications, which is only possible if their data can be ingested in real time.
When ingesting data, the database becomes the biggest bottleneck. MoEngage uses MongoDB for most of its database use cases. While some databases can support higher write throughput, they’re unable to support querying with various filters as they’re primarily key-value stores. We have spent considerable time fine-tuning our clusters. Indexing, sharding, and instance right-sizing are among the many optimizations we have in place. However, these aren’t enough to ensure real-time ingestion. Thus, our applications themselves need to be designed to achieve a larger scale of reads and writes.
One technique that can dramatically improve write throughput is the use of bulk writes. However, using it is tricky: a consumer’s activity arrives on the fly, and processing it requires access to that consumer’s latest state in order to make consistent updates. To leverage bulk writes, we therefore need a messaging layer that partitions data such that any given consumer’s data is always processed by exactly one data processor. To achieve that, along with our goal of ordered data processing, we decided to use Kafka as our pub-sub layer. Kafka is a well-known pub-sub system that supports key features such as transactions, high throughput, persistent ordering, horizontal scalability, and a schema registry, all of which are vital to our ability to scale and evolve our use cases.
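To make this concrete, here is a minimal sketch of a producer that keys records by user ID so that Kafka’s default partitioner keeps each consumer’s activity on a single partition, and therefore with a single downstream processor. The topic name and payload format are placeholders, not our actual schema:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class UserActivityPublisher {
    private final KafkaProducer<String, String> producer;

    public UserActivityPublisher(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // acks=all plus idempotence so retries do not reorder or duplicate records
        props.put("acks", "all");
        props.put("enable.idempotence", "true");
        this.producer = new KafkaProducer<>(props);
    }

    public void publish(String userId, String activityJson) {
        // Keying by user ID pins every record for a given user to one partition,
        // so exactly one data processor sees that user's activity, in order.
        producer.send(new ProducerRecord<>("user-activity", userId, activityJson));
    }
}
```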
MoEngage’s real-time data ingestion helps brands deliver personalized messages to their customers at the right time
The next bit of insight was that in order to leverage bulk writes, rather than using the database as the source of truth, we needed a fast caching layer as the source of truth, allowing us to update our databases in bulk. Our experience with DynamoDB and ElastiCache (Redis) taught us that this would be prohibitively expensive. For this reason, the caching layer would have to be an in-memory cache running alongside the application. This would not only lower the cost of running the cache but also lead to large gains in performance. The most prominent key-value store for this use case is RocksDB, which can leverage both the memory of an instance and its disk should the amount of data stored overflow the memory.
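To illustrate why an embedded store fits this role, here is a minimal sketch of RocksDB’s Java bindings used as a local user-state cache. In our pipeline this store is actually managed by Flink’s state backend (described below); the class, key layout, and path here are illustrative only:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class LocalUserStateStore implements AutoCloseable {
    static { RocksDB.loadLibrary(); }

    private final RocksDB db;

    public LocalUserStateStore(String path) throws RocksDBException {
        // RocksDB keeps hot keys in its in-memory memtables and block cache,
        // and spills the rest to local disk, so the store can exceed available RAM.
        Options options = new Options().setCreateIfMissing(true);
        this.db = RocksDB.open(options, path);
    }

    public byte[] get(byte[] userId) throws RocksDBException {
        return db.get(userId);
    }

    public void put(byte[] userId, byte[] latestState) throws RocksDBException {
        db.put(userId, latestState);
    }

    @Override
    public void close() {
        db.close();
    }
}
```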
Our decision to use RocksDB and Kafka introduced new challenges, as what used to be a stateless system would now become a stateful application. Firstly, the size of this RocksDB cache would be in the order of hundreds of gigabytes per deployment, and the application leveraging it could restart for various reasons – new feature releases, instance termination by the cloud provider, and stability issues with the application itself. The only way to reliably run our application would be to track the offsets at which we last read data from Kafka and keep them in alignment with the contents of our cache. This aggregated state would need to be persisted externally to allow for recovery during planned or unplanned restarts. Above all, we would need a high level of configurability for this entire checkpoint process (frequency, the gap between checkpoints, concurrent checkpoints, etc.). Rather than building the entire solution in-house, it was more prudent to leverage existing frameworks, as they would offer better performance, reliability, and community support. We evaluated various streaming frameworks and concluded that Apache Flink would be the one with all the features and the desired performance at our scale. At a high level, a Flink job consists of one or more task managers, which are responsible for executing the various operators that implement the data processing requirements of the application. The job of allocating tasks to task managers, tracking their health, and triggering checkpoints is handled by a separate set of processes called job managers. As the task managers process data, any user state is stored in a finely tuned RocksDB storage engine, which is periodically checkpointed to S3 and ZooKeeper in order to facilitate graceful restarts.
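The configurability mentioned above maps directly onto Flink’s checkpointing API. The snippet below is a sketch of such a setup; the intervals, bucket name, and class are placeholders rather than our production values:

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static StreamExecutionEnvironment configure() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental RocksDB snapshots: only changed SST files are uploaded per checkpoint.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Trigger a checkpoint every 5 minutes with exactly-once semantics.
        env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);

        // Enforce a gap between checkpoints and allow only one in flight,
        // so checkpointing never starves actual data processing.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);

        // Durable checkpoint storage outside the job (bucket name is a placeholder).
        env.getCheckpointConfig().setCheckpointStorage("s3://example-bucket/flink-checkpoints");

        return env;
    }
}
```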
After figuring out the right language, framework, and messaging layer, the time came to start building out the system and migrating all our existing features. Our ingestion layer consists of four steps, which we describe below.
As data validation and schema management aren’t really tied to any particular user but rather to a client, we decided to carve these features out as a dedicated service – the API Unification Layer. Additionally, as we mentioned earlier, data can come from various sources, including the mobile SDKs we provide, a data API that lets clients publish data from their backends, third-party partners such as Segment, ad attribution services, CSV files, and internal APIs. As each of these targets different use cases, the implementations for ingesting data from these sources had diverged over the years, even though the ultimate goal was always to update the state of users and their devices. We took this opportunity to consolidate the behavior across sources within this data validation layer and transform each of these inputs into one consolidated output stream that serves as input to the services that implement the rest of the functionality.
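For illustration, the consolidated output record could look something like the following; the field names and types here are hypothetical and not our actual schema:

```java
import java.util.Map;

// Hypothetical shape of the consolidated record emitted by the validation layer.
public record UnifiedIngestionRecord(
        String clientId,                        // the brand/workspace the data belongs to
        String source,                          // SDK, data API, partner, CSV import, internal API, ...
        Map<String, String> userIdentifiers,    // e.g. customer ID, email hash
        Map<String, String> deviceIdentifiers,  // e.g. push token, device ID
        String eventName,
        Map<String, Object> eventAttributes,
        long eventTimeMillis) {
}
```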
The most critical service, Action Streaming, is the one that deals with user and device creation as well as event processing. With data validation and API variations taken care of in the upstream layer, this service relies on the identifiers of users and devices in the consolidated payload to determine which user and device were involved, which may sometimes require creating their entries and, on other occasions, merging or even removing existing documents. The latter can happen because, in our business domain, both users and devices can have multiple identifiers, and there is no single identifier for either that is used by all input data sources. Once the entities are resolved, the next phase of this Flink job is to process all the events within the payload; this processing can result in a change in the state of the user or the device involved. Rather than updating their states directly, the job determines the change in state and publishes it to Kafka, to be consumed by a downstream service that updates entities in bulk. We are able to determine the change in state because the job relies on RocksDB as the source of truth. Thus, RocksDB not only helps us cut our database reads by more than half but, more importantly, allows us to leverage bulk writes to our databases.
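A simplified sketch of this pattern using Flink’s keyed state is shown below. `Event`, `UserState`, and `UserUpdate` are illustrative types, and the real operator additionally handles entity resolution, merging, and removal:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Operator keyed by user ID: the previous user state lives in RocksDB-backed keyed state,
// so the delta can be computed without a database read.
public class EventProcessor extends KeyedProcessFunction<String, Event, UserUpdate> {

    private transient ValueState<UserState> userState;

    @Override
    public void open(Configuration parameters) {
        userState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("user-state", UserState.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<UserUpdate> out) throws Exception {
        UserState previous = userState.value();               // last known state, from RocksDB (null for new users)
        UserState updated = UserState.apply(previous, event);  // pure business logic, no infrastructure dependency

        if (!updated.equals(previous)) {
            userState.update(updated);
            // Publish only the change in state; a downstream job folds these
            // deltas into bulk database writes.
            out.collect(UserUpdate.diff(previous, updated));
        }
    }
}
```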
The final service in our pipeline, Reaction Streaming, is a relatively simple one that consumes MongoDB update requests from Kafka and applies them in bulk, thereby greatly increasing the write throughput of our database clusters. With RocksDB serving as the source of truth, we can leverage fully non-blocking, asynchronous I/O to perform our updates, which greatly improves our write efficiency. Not only do we write more, but we do so with far fewer resources! We did have to spend some time building a buffering mechanism that ensures any entity has only one update in flight at any given time, without which the order of write operations could never be guaranteed.
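A stripped-down sketch of the bulk-apply step follows, using the synchronous MongoDB Java driver for brevity (our actual service uses non-blocking, asynchronous I/O) and the illustrative `UserUpdate` type from the previous sketch:

```java
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.WriteModel;
import org.bson.Document;

import java.util.List;
import java.util.stream.Collectors;

public class BulkUserWriter {

    private final MongoCollection<Document> users;

    public BulkUserWriter(MongoCollection<Document> users) {
        this.users = users;
    }

    public void flush(List<UserUpdate> batch) {
        List<WriteModel<Document>> ops = batch.stream()
                .map(u -> new UpdateOneModel<Document>(
                        Filters.eq("_id", u.userId()),
                        Updates.combine(
                                Updates.set("attributes", u.attributes()),
                                Updates.set("updated_at", u.timestamp()))))
                .collect(Collectors.toList());

        // Unordered bulk writes let MongoDB apply independent updates in parallel;
        // per-entity ordering is already guaranteed upstream (one in-flight update per entity).
        users.bulkWrite(ops, new BulkWriteOptions().ordered(false));
    }
}
```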
MoEngage’s real-time ingestion infrastructure helps brands drive more ROI from their engagement, retention, and monetization campaigns
Splitting our ingestion layer into three different jobs helped us achieve the efficiency that we wanted, but this came at the cost of a greater chance of failure. Any one of the services could go down due to a change in code or stability issues within our cloud. Checkpoints help us avoid re-processing all of the data in our messaging layers, but they don’t eliminate the chance of duplicate data processing. This is why it was critical to ensure that each service was idempotent.
Reaction Streaming was designed to support only a select set of write operations – set, unset, adding to a set, and removing from a set. Any client intending to use this service needs to express its writes in terms of one or more of these operations. These four operations have one thing in common: repeated application of any of them to a document ultimately produces the same result.
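One way to see why these operations are idempotent is to map them onto MongoDB update operators, as in the sketch below; the mapping is illustrative rather than our exact implementation:

```java
import com.mongodb.client.model.Updates;
import org.bson.conversions.Bson;

// The four supported write operations map onto MongoDB update operators that are
// naturally idempotent: applying any of them twice leaves the document unchanged.
public class IdempotentOps {
    public static Bson set(String field, Object value)          { return Updates.set(field, value); }
    public static Bson unset(String field)                      { return Updates.unset(field); }
    public static Bson addToSet(String field, Object value)     { return Updates.addToSet(field, value); } // no duplicates on replay
    public static Bson removeFromSet(String field, Object value){ return Updates.pull(field, value); }     // removing again is a no-op
}
```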
The API Unification Layer and Action Streaming both rely on Kafka transactions to ensure that even if data gets processed multiple times, it isn’t made available to downstream services until the checkpoint completes. Care is also taken to ensure that all time-based properties have stable values despite restarts and that no record older than these times ever gets re-processed.
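With Flink’s Kafka sink, this behavior comes from the exactly-once delivery guarantee, under which records are written inside a Kafka transaction that commits only when the checkpoint completes. A minimal sketch, with a placeholder topic name and transactional-id prefix:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class TransactionalSink {
    public static KafkaSink<String> build(String brokers) {
        return KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("user-updates")              // placeholder topic name
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // Records are only visible downstream once the Kafka transaction commits,
                // which happens when the Flink checkpoint completes; consumers must read
                // with isolation.level=read_committed to honour this.
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("action-streaming")  // placeholder prefix
                .build();
    }
}
```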
Our system is designed to run both as containerized applications in Kubernetes and on cloud provider virtual machines, which MoEngage has historically relied on. This ensures business continuity while all the kinks of our Kubernetes setup get sorted out and all engineers build a sufficient understanding of it. The ability to spin up containers in milliseconds can’t be matched by virtual machines. Kubernetes manifests for workloads across the world are managed using Kustomize, which makes it easy to avoid any sort of configuration duplication. Deployments outside of Kubernetes are managed using Terraform, Terragrunt, and CodeDeploy, with in-house enhancements that make it easy to spin up new deployments, while configurations are managed using Consul. We use HOCON as the configuration format as it allows for easy composition of multiple configuration files into one, letting us break configuration into small reusable chunks that can be shared across deployments and services and making large-scale configuration changes easy. It also allows configuration values to be expressed with units, removing any ambiguity about what a value means.
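As an example of the composition and units that HOCON gives us, a deployment configuration might be assembled like this; the file names and keys are illustrative, not our actual layout:

```hocon
# base.conf – shared defaults, reused by every deployment
checkpoint {
  interval = 5 minutes        # duration units remove ambiguity (ms vs s vs min)
  min-pause = 1 minute
  max-concurrent = 1
}
kafka {
  consumer.max-poll-records = 500
}

# us-east-1/action-streaming.conf – a deployment only overrides what differs
include "base.conf"
kafka.bootstrap-servers = "kafka.us-east-1.internal:9092"
checkpoint.interval = 3 minutes
```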
We implemented our system by leveraging the principle of layered architecture – business logic completely free of any infrastructure dependencies, a service layer that interacts with external systems and invokes the business logic, and finally, the splitting of this service across various Flink operators tied together by a job graph. Business logic was implemented in Java, as we felt that hiring or training developers in Java would be easier, while the relatively static portions of the system were written in Scala to leverage the benefits of Scala’s type system, function composition, error handling capabilities, and lightweight syntax. However, this decision proved to be a design blunder, as we couldn’t fully leverage the best capabilities of either language.
Had we written our code entirely in Scala, we could have:
Had we written our code entirely in Java, we would have:
While Flink does offer great features that do indeed work at our scale, its feature set is often lacking in various aspects, which results in more developer effort than planned. Some of these weaknesses aren’t well documented, if at all, and some features require quite a bit of experimentation to get right.
MoEngage constantly strives to make improvements to the platform that helps brands deliver seamless experiences to their customers
There are several enhancements that we plan to work on in the coming months, some of which are:
We set out to ensure real-time ingestion of our customers’ data at all times at a scale that exposes the flaws of the best open-source frameworks. It was a great challenge to be able to learn and become adept at multiple languages and frameworks, and we’ve thoroughly enjoyed knocking them off one by one! If you’re interested in solving similar problems, check out our current openings in the Engineering team at MoEngage!