
How we stagger never-ending effects with Cats Effect

Our solution for pacing the start of never-ending concurrent effects, a use case not covered by open-source rate limiters; it's particularly useful for infinite streams.

At Adevinta, one of our applications runs thousands of concurrent never-ending tasks, each a Kafka consumer sending data to a different destination. These tasks require a warm-up period during which they are both CPU-intensive (consumers need to catch up) and demanding on server-side resources while establishing initial connections. The warm-up period typically lasts a couple of seconds.

If all of these tasks started simultaneously, they would generate a load spike on the server side and put significant strain on application startup. Additionally, stressing the brokers during startup makes the overall warm-up process slower. To mitigate this, the application staggers task execution by limiting the number of concurrently warming-up tasks.

The following picture shows an execution timeline for running five tasks (task 1 through task 5) in a staggered fashion. Execution is configured for a maximum of two concurrent warming-up tasks per one-second period.

Execution timeline for five tasks, staggered with a limit of two concurrent warming-up tasks per second

Implementation

Usage

ExampleApp is an IOApp that runs five concurrent never-ending tasks.

The tasks are started at a maximum rate of two per second.

import cats.effect.{ExitCode, IO, IOApp}
import scala.concurrent.duration._

object ExampleApp extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = for {
    rateLimiter <- RateLimiter[IO](tokensPerPeriod = 2, period = 1.second) // [0]
    tasks = (1 to 5).toList.map(neverEndingTask) // [1]
    _ <- IO.parSequenceN(Int.MaxValue)(tasks.map(rateLimiter.runWhenTokenAvailable)) // [2]
  } yield ExitCode.Success

  private def neverEndingTask(i: Int): IO[Unit] = for {
    time <- IO.realTimeInstant
    _    <- IO.println(s"[$time] Task $i started")
    _    <- IO.never[Unit].onCancel(IO.println(s"Task $i cancelled"))
  } yield ()
}
  • [0] Creates a RateLimiter[IO] with a maximum of two tokens per second. We’ll deep dive into its implementation shortly.
  • [1] Assigns a list of five never-ending effects to the tasks variable. Note that the effects are not run yet.
  • [2] Composes every effect in the tasks list with the rateLimiter.runWhenTokenAvailable function, which limits the number of concurrently starting tasks, and runs them all in parallel.

ExampleApp execution example:

[2024-07-14T10:41:06.322081Z] Task 1 started

[2024-07-14T10:41:06.322064Z] Task 4 started

[2024-07-14T10:41:07.339337Z] Task 3 started

[2024-07-14T10:41:07.339337Z] Task 2 started

[2024-07-14T10:41:08.345294Z] Task 5 started

The same logs, indented by start time:

[10:41:06.322081Z] Task 1 started

[10:41:06.322064Z] Task 4 started

      [10:41:07.339337Z] Task 3 started

      [10:41:07.339337Z] Task 2 started

            [10:41:08.345294Z] Task 5 started

In the previous execution, Task 1 and Task 4 start immediately at 10:41:06, then Task 3 and Task 2 start one second later at 10:41:07. Finally, Task 5 starts at 10:41:08.

You may notice that tasks do not start in their sequence order. This is expected, as all tasks are run in parallel with no ordering affinity. The parTraverseN documentation (the method underlying parSequenceN) states:

“Note that the semantics of this operation aim to maximise fairness: when a spot to execute becomes available, every task has a chance to claim it, and not only the next ‘n’ tasks in ‘ta’.”
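
If strict start ordering were required, one could instead acquire the tokens sequentially and start each task on a background fiber. The following is a hypothetical sketch (startInOrder is not part of the application above); tasks then begin in list order, still at the limited rate:

import cats.effect.IO
import cats.syntax.all._

// Hypothetical sketch: acquire tokens one by one, in list order, and start
// each task on its own fiber. The rate limit still applies because
// traverse_ semantically blocks on every token acquisition.
def startInOrder(limiter: RateLimiter[IO], tasks: List[IO[Unit]]): IO[Unit] =
  tasks.traverse_(task => limiter.runWhenTokenAvailable(task.start.void))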

RateLimiter implementation

The rate limiter exposes three methods: getRatedToken, availableTokens and runWhenTokenAvailable.

Those methods are described in their corresponding Scaladocs.

import cats.Monad
import cats.syntax.all._

sealed abstract class RateLimiter[F[_]: Monad] {

  /** This method gets semantically blocked after [[tokensPerPeriod]] calls, which are reset every [[period]].
    *
    * It's useful when there is a need to synchronize rated access to trigger an action.
    */
  def getRatedToken: F[Unit]

  /** Returns the number of tokens currently available. Always non-negative. */
  def availableTokens: F[Long]

  /** Runs `f` after a token is acquired. */
  def runWhenTokenAvailable[A](f: F[A]): F[A] = getRatedToken >> f
}
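As a quick illustration of using getRatedToken on its own, here is a hypothetical sketch (not from the original codebase) that paces a repeated action, emitting at most two heartbeat messages per second:

import cats.effect.IO
import cats.syntax.all._
import scala.concurrent.duration._

// Hypothetical example: getRatedToken paces any repeated action, not only
// task startup. This loop prints at most two heartbeats per second, forever.
val heartbeats: IO[Nothing] =
  RateLimiter[IO](tokensPerPeriod = 2, period = 1.second).flatMap { limiter =>
    (limiter.getRatedToken >> IO.println("heartbeat")).foreverM
  }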

Let’s dive deeper into the implementation.

import cats.effect.kernel.{GenTemporal, Outcome}
import cats.effect.std.Semaphore
import cats.effect.syntax.all._
import cats.syntax.all._
import scala.concurrent.duration.FiniteDuration

object RateLimiter {

  def apply[F[_]](tokensPerPeriod: Long, period: FiniteDuration)(implicit
      genTemporal: GenTemporal[F, Throwable]
  ): F[RateLimiter[F]] = for {
    _ <- GenTemporal[F, Throwable]
      .raiseWhen(tokensPerPeriod < 1)(new IllegalArgumentException("tokensPerPeriod must be greater than 0"))
    semaphore <- Semaphore[F](tokensPerPeriod)
  } yield new RateLimiter[F] {

    def availableTokens: F[Long] = semaphore.available

    def getRatedToken: F[Unit] =
      semaphore.acquire
        .guaranteeCase {
          case Outcome.Succeeded(_) => semaphore.release.delayBy(period).uncancelable.start.void
          case Outcome.Errored(_)   => GenTemporal[F, Throwable].unit
          case Outcome.Canceled()   => GenTemporal[F, Throwable].unit
        }
  }
}

The RateLimiter implementation relies on a semaphore with tokensPerPeriod permits, making sure that acquired permits are released asynchronously once the specified period has elapsed.

getRatedToken acquires a semaphore permit and uses MonadCancel[F].guaranteeCase to guarantee that the permit is asynchronously released once period has elapsed.

Let’s expand the implementation and analyse each line:

def getRatedToken: F[Unit] =
  semaphore.acquire
    // from [[cats.effect.kernel.MonadCancel.guaranteeCase]]: specifies an effect that is always invoked
    // after evaluation of `fa` completes
    .guaranteeCase {
      case Outcome.Succeeded(_) =>
        semaphore.release
          .delayBy(period) // release the permit after [[period]]
          .uncancelable    // make the permit release uncancelable
          .start           // run the release on a background fiber
          .void            // discard the fiber handle
      // semaphore.acquire cancellation already restores the permits in case of interruption.
      // from [[cats.effect.std.Semaphore.acquire]] docs: acquire is interruptible, and in case of
      // interruption it will take care of restoring any permits it has acquired. If it does succeed,
      // however, managing permits correctly is the user's responsibility
      case Outcome.Errored(_)  => GenTemporal[F, Throwable].unit
      case Outcome.Canceled()  => GenTemporal[F, Throwable].unit
    }



availableTokens returns the number of available semaphore permits. 

runWhenTokenAvailable gets a token before executing the provided effect. 
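
To observe the token mechanics end to end, here is a hypothetical check (assuming the cats-effect-testkit dependency, which is not mentioned in the article) using TestControl to run the program under a mocked clock:

import cats.effect.IO
import cats.effect.testkit.TestControl
import scala.concurrent.duration._

// Hypothetical check: both tokens are consumed immediately, then restored by
// the background release fibers once `period` has elapsed.
val check: IO[(Long, Long)] = TestControl.executeEmbed(
  for {
    limiter <- RateLimiter[IO](tokensPerPeriod = 2, period = 1.second)
    _       <- limiter.getRatedToken
    _       <- limiter.getRatedToken
    before  <- limiter.availableTokens // expected: 0
    _       <- IO.sleep(2.seconds)     // jump the mocked clock past `period`
    after   <- limiter.availableTokens // expected: 2
  } yield (before, after)
)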

Is this a typical rate limiter?

Typical rate limiters focus on restricting the number of effects running at any given time to guarantee throttling, and the restricted effect is assumed to finish at some point. In our use case, concurrency is only controlled during the warm-up period, when the tasks are submitted; there is no concurrency control after that period ends.

For example, Cats Effect’s Backpressure[F] rate-limits the number of concurrent effects. It does this either by semantically blocking until the backpressure is alleviated (lossless configuration) or by discarding effects once the backpressure state is reached (lossy configuration). In the presence of backpressure, the algorithm only runs a supplied effect once another one has finished.
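To make the mismatch concrete, here is a hypothetical sketch using cats.effect.std.Backpressure with a lossy strategy; the point is that Backpressure bounds running effects rather than pacing their start, so a never-ending effect would hold its slot forever:

import cats.effect.IO
import cats.effect.std.Backpressure
import cats.syntax.all._
import scala.concurrent.duration._

// Hypothetical sketch: with Strategy.Lossy and a bound of 2, the third
// concurrent submission is discarded (metered returns None) while the first
// two effects are still running.
val lossyExample: IO[List[Option[Unit]]] = for {
  bp      <- Backpressure[IO](Backpressure.Strategy.Lossy, 2)
  results <- List.fill(3)(bp.metered(IO.sleep(1.second))).parSequence
} yield results // expected: two Some(()) and one None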

In the case of upperbound (a Typelevel ecosystem library providing an interval-based rate limiter), a Limiter allows the following configuration:

  • minInterval: The minimum guaranteed interval between job executions (a job being an effect executed through the Limiter.submit interface). If some jobs take longer than minInterval, multiple jobs will run concurrently. The specified interval between jobs is a minimum and can be longer if the maxConcurrent bound is hit.
  • maxConcurrent: The Limiter will stop executing new jobs when reaching maxConcurrent running jobs.
  • maxQueued: Number of enqueued jobs waiting to be executed. Once this number is reached, submitting new jobs will immediately fail.

Additionally, Limiter.submit will semantically block until the effect is allowed to be executed, returning the result synchronously.

Setting minInterval to the inverse of the token rate (period / tokensPerPeriod) and maxConcurrent to Int.MaxValue could help pace the start of the tasks. However, the semantically blocking Limiter.submit would need to be wrapped into an asynchronous call, as sketched below.
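A hypothetical sketch of that approach (the Limiter.start and submit signatures here follow upperbound’s documentation and are assumptions, not code from the article):

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import scala.concurrent.duration._
import upperbound.Limiter

// Hypothetical sketch: minInterval = period / tokensPerPeriod, i.e. 500ms
// for two starts per second. Each never-ending job is submitted on its own
// fiber via parTraverse_ because submit semantically blocks until the job
// completes.
object UpperboundExample extends IOApp {
  override def run(args: List[String]): IO[ExitCode] =
    Limiter.start[IO](minInterval = 500.millis).use { limiter =>
      val tasks = (1 to 5).toList.map(i => IO.println(s"Task $i started") >> IO.never)
      tasks.parTraverse_(task => limiter.submit(task))
    }.as(ExitCode.Success)
}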

See the upperbound implementation for more details.

Is it a token bucket implementation?

Conceptually, it’s a specific implementation of the token bucket algorithm in which the bucket holds a fixed maximum number of tokens. Running an effect consumes one token, and tokens are always withheld for a predefined duration regardless of the effect’s outcome: with tokensPerPeriod = 2 and period = 1.second, at most two effects can start within any one-second window, and each consumed token reappears one second after it was taken.

Source code 

Check out a project example in the rodrigo-molina/stagger GitHub project.

Final words

The Cats Effect library provides effective synchronisation interfaces: a couple of lines of Cats Effect code can achieve what would take hundreds of lines of vanilla Scala. On the other hand, cancellation semantics and the specific guarantees of each primitive need special care to avoid concurrency nightmares.

Be brave enough to roll up your sleeves and get your hands dirty when you encounter unconventional use cases. This article serves as an example of that.
