Полезные техники Akka

Я использую Akka в течение 3 лет в различных проектах и теперь, я с трудом представляю себе взаимодействие с некоторыми из частей моей работы без нее. Конечно, Scala предоставляет другие мощные парадигмы для работы с параллелизмом, но я нахожу, Акторы, одной из самых элегантных концепций, когда дело доходит до рассуждений о ней.

Есть несколько техник, которые я многократно использовал в проектах, и которыми я хочу поделиться. Однако внимательнее к ошибкам - я не считаю себя экспертом Akka, поэтому некоторые из них могут оказаться неоптимальными техниками или даже анти-паттернами - используйте их на свой страх и риск, и убедитесь, что вы понимаете их ограничения. Кроме того, если вы еще этого не сделали, я всеми средствами рекомендовал бы сесть со свежей чашкой кофе и прочесть замечательную документацию Akka.

Часы с кукушкой

Я использую эту технику в основном в проектах с фреймворком Play. Первая версия фреймворка имела механизм для многократного выполнения задач, который был удален в версии 2 с рекомендацией использовать планировщики Akka.

Основная идея этой техники заключается в инкапсуляции логики задачи, которая должна быть выполнена многократно или в определенное время, в одном акторе, и пробуждении актора отправкой ему сообщения:

CockooClock.scalaview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class ScheduledOrderSynchronizer extends Actor {
private val SYNC_ALL_ORDERS = "SYNC_ALL_ORDERS"
private var scheduler: Cancellable = _
override def preStart(): Unit = {
import scala.concurrent.duration._
scheduler = context.system.scheduler.schedule(
initialDelay = 10 seconds,
interval = 5 minutes,
receiver = self,
message = SYNC_ALL_ORDERS
)
}
override def postStop(): Unit = {
scheduler.cancel()
}
def receive = {
case SYNC_ALL_ORDERS =>
try {
// synchronize all the orders
} catch {
case t: Throwable =>
// report errors
}
}
}

Давайте посмотрим на то, как работает этот механизм в деталях:

  • прежде всего, мы должны инициализировать наш планировщик. Для этого, я считаю, полезно использовать методы жизненного цикла актора preStart и postStop . Можно было бы целиком объявить планировщикагде-нибудь еще и отправлять сообщения актору, однако я считаю, что инкапсуляция логики сильно облегчает техническое обслуживание, особенно, когда есть несколько подобных механизмов в одном и том же приложении.

  • мы создаем планировщик используя метод ActorSystem scheduler.schedule, передав в него начальную задержку (после которой в первый раз будет отправлено сообщение), затем интервал, с которым сообщение должно быть отправлено, получателя (в нашем случае, мы хотим, чтобы оно было отправлено самому актору, поэтому мы используем self), и сообщение для отправки.

  • когда актор прекращает работу, мы также должны отменить планировщик, иначе, отправки сообщений каждые 5 минут, продолжатся в пустоту

  • как для любого актора, реализация того, что должно быть выполнено повторно происходит в методе receive - в нашем случае, мы только обрабатываем сообщения вида SYNC_ALL_ORDERS

  • одинственно важной вещью, в данный момент, является обработка ошибки: я бы рекомендовал выполнять любой код внутри блока try-catch, как показано выше - в данном случае было бы излишним использовать стратегию обработки ошибок Akka, и мы не хотим разрушать наш планировщик, если одна из синхронизаций пойдет не так.

Теперь, единственной вещью, которую мы должны сделать, это привести механизм в движение. Для приложения Play, хорошим местом для этого был бы имеющий дурную репутацию Global:

CockooClockInit.scalaview raw
1
2
3
4
5
object Global extends GlobalSettings {
override def onStart(app: Application) {
Akka.system.actorOf(Props(new ScheduledOrderSynchronizer), name = "orderSynchronizer")
}
}

Вот и все! Наши часы с кукушкой готовы бить каждые 5 минут.

Примечание: API планировщика хорошо подходит для задач, которые подразумевают повторение, и менее для задач, которые должны выполниться в определенное время или дату. Однако, этого можно добиться с API путем расчета времени между инициализацией планировщика и запланированным временем. Для этих случаев, я использую библиотеку nscala-time, которая является оберткой поверх отличной библиотеки Joda Time.

Пакетный процессор

Одной вещью, которую я нахожу себя делающим довольно часто с Akka, является массовое распараллеливание данной задачи, которая часто связана Это может быть простая конфигурация, что включает в себя только одного наблюдателя и много рабочих одного вида, или чем-то более сложным с участием упорядочивающего конвейера.

Один наблюдатель и его армия рабочих

Многоступенчатая структура

В большинстве случаев эта разновидность шаблона включает производителя (база данных, такая как MongoDB, MySQL, BaseX, Amazon SimpleDB; гигабайтный XML файл, и т.д.), из которого элементы должны быть собраны и отправлены для обработки.

Давайте посмотрим на примере простого наблюдателя - цепочки рабочих. Мы сделаем несколько предположений для этого примера:

  • время для получения нового пакета данных несущественно, так что мы не заинтересованы в оптимизации процесса получения новых данных раньше времени

  • наша JVM имеет приличный объем доступной памяти, так что рабочие способны ставить в очередь все элементы своего почтового ящика

Примечание: Код намеренно оставлен ​​абстрактным, так как мы не заинтересованы в том, как данные будут получены из источника или обработаны, но в том, что произойдет с ними.

BatchProcessor.scalaview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import akka.actor.{ Props, Actor }
import akka.event.Logging
import akka.routing.RoundRobinRouter
abstract class BatchProcessor(dataSetId: Long) extends Actor {
val log = Logging(context.system, "application")
val workers = context.actorOf(Props[ItemProcessingWorker].withRouter(RoundRobinRouter(100)))
var totalItemCount = -1
var currentBatchSize: Int = 0
var currentProcessedItemsCount: Int = 0
var currentProcessingErrors: List[ItemProcessingError] = List.empty
var allProcessedItemsCount = 0
var allProcessingErrors: List[ItemProcessingError] = List.empty
def receive = {
case ProcessBatch =>
if (totalItemCount == -1) {
totalItemCount = totalItems
log.info(s"Starting to process set with ID $dataSetId, we have $totalItemCount items to go through")
}
val batch = fetchBatch
processBatch(batch)
case ProcessedOneItem =>
currentProcessedItemsCount = currentProcessedItemsCount + 1
continueProcessing()
case error @ ItemProcessingError(_, _, _) =>
currentProcessingErrors = error :: currentProcessingErrors
continueProcessing()
}
def processBatch(batch: List[BatchItem]) = {
if (batch.isEmpty) {
log.info(s"Done migrating all items for data set $dataSetId. $totalItems processed items, we had ${allProcessingErrors.size} errors in total")
} else {
// reset processing state for the current batch
currentBatchSize = batch.size
allProcessedItemsCount = currentProcessedItemsCount + allProcessedItemsCount
currentProcessedItemsCount = 0
allProcessingErrors = currentProcessingErrors ::: allProcessingErrors
currentProcessingErrors = List.empty
// distribute the work
batch foreach { item =>
workers ! item
}
}
}
def continueProcessing() = {
val itemsProcessed = currentProcessedItemsCount + currentProcessingErrors.size
if (itemsProcessed > 0 && itemsProcessed % 100 == 0) {
log.info(s"Processed $itemsProcessed out of $currentBatchSize with ${currentProcessingErrors.size} errors")
}
if (itemsProcessed == currentBatchSize) {
self ! ProcessBatch
}
}
def totalItems: Int
def fetchBatch: List[BatchItem]
}
abstract class ItemProcessingWorker extends Actor {
def receive = {
case ProcessItem(item) =>
try {
process(item) match {
case None => sender ! ProcessedOneItem
case Some(error) => sender ! error
}
} catch {
case t: Throwable =>
sender ! ItemProcessingError(item.id, "Unhandled error", Some(t))
}
}
def process(item: BatchItem): Option[ItemProcessingError]
}
case object ProcessBatch
trait BatchItem {
val id: Int
}
case class ProcessItem(item: BatchItem)
case object ProcessedOneItem
case class ItemProcessingError(itemId: Int, message: String, error: Option[Throwable])

Давайте пройдемся по примеру шаг за шагом:

  • у нас есть наблюдатель (BatchProcessor), определение рабочего (ItemProcessingWorker) и набор сообщений (ProcessBatch, ProcessItem, ProcessedOneItem, и т.д.)

  • конструктор BatchProcessor принимает в качестве параметра ID набора данных, который мы хотим обработать (при условии, например, что данные живут в базе данных и имеют идентификатор). Новый BatchProcessor должен быть создан для каждого набора, который мы хотим обработать

  • BatchProcessor создает несколько экземпляров дочерних рабочих с помощью RoundRobinRouter

  • наблюдатель имеет несколько членов, которые следят за состоянием обработки. Он также отслеживает все имеющие место ItemProcessingError

    затем механизм работает следующим образом:

    • чтобы начать, BatchProcessor должен получить от клиента сообщение ProcessBatch

    • затем он извлекает первую группу из источника и передает его в метод processBatch. Если это первый раз, когда он запускается он будет так же показывать, как много элементов есть в общем

    • processBatch инициализирует состояние для этой группы сбрасывая счетчики и устанавливая размер пакета, и затем переходит к распределению работы рабочим с помощью batch foreach { item => workers ! item }.

    • когда элемент обработан, актор-рабочий отвечает сообщением ProcessedOneItem или ItemProcessingError в случае неудачи. Мы используем эту информацию, чтобы отслеживать прогресс обработки.

    • метод continueProcessing сообщает прогресс для каждых 100 обработанных элементов (на практике, это должно быть адаптированно к размеру имеющихся данных). когда текущий пакет будет полностью обработан (сумма успешных и провальных элементов равна размеру текущего пакета), он отправляет наблюдателя в другой метод ProcessBatch. Нереализованный метод fetchBatch может использовать счетчики для того, чтобы знать, какое смещение использовать для получения следующей группы элементов.

    • наконец, когда не осталось данных для обработки, метод processBatch сообщает нам об этом и обработка завершается.

Способ, представленный выше, конечно, весьма общий. На практике процесс, часто, должен быть доработан или может быть адаптирован, чтобы быть более производительным для конкретного сценария использования. Природа источника данных часто определяет, как работают составляющие. Тем не менее, это одна из вещей, которая мне нравится в Akka - она только предоставляет минимальную инфраструктуру и оставляет на усмотрение разработчика право выбирать, какой механизм будет использован для реализации конвейера. Некоторые улучшения, которые вероятно нужны в реальном приложении, включают:

  • обработку ошибок использующую стратегии наблюдателя Akka. Прямо сейчас, наш рабочий очень прост, и имеет выражение try - catch вокруг метода process. Но мы не по-настоящему восстанавливаемся после ошибок, мы просто позволяем им упасть и сообщить количество имеющихся у нас ошибок. Это может работать хорошо, если эти шибки являются фатальными, но на практике они могут стоить попытки обработать некоторые элементы из элементов.

  • информирование клиента, что наша работа выполнена. Это может быть простым, как сохранение ссылки на актор , при получении первого сообщения ProcessBatch, и отправка сообщения WorkDone, когда все элементы обработаны (не пытайтесь отправлять это сообщение по ссылке sender потому что, к тому времени когда вся работа выполнится, скорее всего, отправителем сообщения будет один из дочерних акторов)

  • наличие более хороших показателей относительно обработки, такие как скорость обработки. Я часто использую библиотеку метрик Кода Хейла для этой цели.

Обратное давление бедняка

Если вы являетесь членом команды Akka, пропустите этот раздел. Для остальной части человечества, эта тема имеет непосредственное отношение к одному предположению, что мы сделали в предыдущем примере: “наша JVM имеет приличный объем доступной памяти, так что рабочие могут ставить в очередь все элементы своего почтового ящика”. На самом деле, это не всегда так. Тем не менее, если вы будете продолжать получать данные от производителя и передавать их рабочим, вы в конечном итоге исчерпаете всю память: по-умолчанию, Akka использует UnboundedMailbox, который будет продолжать заполняться.

В приведенном выше примере, эту проблему легко исправить, потому что мы только тогда переходим к следующей группе, когда все элементы из текущей были обработаны, что означает, что выбора подходящего размера пакета достаточно. Тем не менее, мы имели право выбрать другую реализацию, которая извлекает данные из источника постоянно, потому что делать как сейчас несколько дороже, или потому, что вы заинтересованы в работе с системой на максимальной мощности.

В такой ситуации, нам нужны средства, чтобы снизить скорость наблюдателя. Эта концепция также известна как обратное давление, и я приглашаю вас прочитать, как правильно ее реализовать. Иногда, однако, вы можете не быть заинтересованы в оптимальном решении проблемы обратного давления, и в этом случае, может быть нормальным нарушить основную директиву Akka: не допускайте блокировок внутри актора.

Оно может быть не оптимальным, не элегантным, но я нашел этот механизм работающим довольно хорошо, при условии, что у вас есть общее представление о ограничении памяти приложения, и вы не заинтересованы в динамичном масштабировании.

Для того, чтобы иметь возможность снизить скорость надлежащим образом, мы должны отделить наблюдателя и производителя данных. Это, то как может выглядеть реализация:

PoorManBackPreassure.scalaview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import akka.actor.{ Props, Actor }
import akka.event.Logging
import akka.routing.RoundRobinRouter
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
abstract class Processor(dataSetId: Long) extends Actor {
val log = Logging(context.system, "application")
val workers = context.actorOf(Props[ItemProcessingWorker].withRouter(RoundRobinRouter(100)))
val producer = context.actorOf(Props[DataProducer], name = "dataProducer")
var totalItemCount = -1
var currentItemCount = 0
var allProcessedItemsCount = 0
var allProcessingErrors: List[ItemProcessingError] = List.empty
val MAX_LOAD = 50
def receive = {
case Process =>
if (totalItemCount == -1) {
totalItemCount = totalItems
log.info(s"Starting to process set with ID $dataSetId, we have $totalItemCount items to go through")
}
val index = allProcessedItemsCount + allProcessingErrors.size
if (currentItemCount < MAX_LOAD) {
producer ! FetchData(index)
}
case Data(items) =>
processBatch(items)
case GetLoad =>
sender ! currentItemCount
case ProcessedOneItem =>
allProcessedItemsCount = allProcessedItemsCount + 1
currentItemCount = currentItemCount - 1
continueProcessing()
case error @ ItemProcessingError(_, _, _) =>
allProcessingErrors = error :: allProcessingErrors
currentItemCount = currentItemCount - 1
continueProcessing()
}
def processBatch(batch: List[Item]) = {
if (batch.isEmpty) {
log.info(s"Done migrating all items for data set $dataSetId. $totalItems processed items, we had ${allProcessingErrors.size} errors in total")
} else {
// distribute the work
batch foreach { item =>
workers ! item
currentItemCount = currentItemCount + 1
}
}
}
def continueProcessing() = {
val itemsProcessed = allProcessedItemsCount + allProcessingErrors.size
if (itemsProcessed > 0 && itemsProcessed % 100 == 0) {
log.info(s"Processed $itemsProcessed out of $totalItems with ${allProcessingErrors.size} errors")
}
self ! Process
}
def totalItems: Int
}
abstract class DataProducer extends Actor {
private val MAX_LOAD = 50
def receive = {
case FetchData(currentIndex) =>
throttleDown()
sender ! Data(fetchBatch(currentIndex))
}
def throttleDown(): Unit = {
implicit val timeout = Timeout(5.seconds)
val eventuallyLoad = context.parent ? GetLoad
try {
val load = Await.result(eventuallyLoad, 5.seconds)
if (load.asInstanceOf[Int] > MAX_LOAD) {
Thread.sleep(5000)
throttleDown()
}
} catch {
case t: Throwable =>
// we most likely have timed out - wait a bit longer
throttleDown()
}
}
def fetchBatch(currentIndex: Int): List[Item]
}
abstract class ItemProcessingWorker extends Actor {
def receive = {
case ProcessItem(item) =>
try {
process(item) match {
case None => sender ! ProcessedOneItem
case Some(error) => sender ! error
}
} catch {
case t: Throwable =>
sender ! ItemProcessingError(item.id, "Unhandled error", Some(t))
}
}
def process(item: Item): Option[ItemProcessingError]
}
case object Process
trait Item {
val id: Int
}
case class FetchData(currentIndex: Int)
case class Data(items: List[Item])
case object GetLoad
case class ProcessItem(item: Item)
case object ProcessedOneItem
case class ItemProcessingError(itemId: Int, message: String, error: Option[Throwable])

Так как же это работает? Давайте взглянем поближе:

  • мы больше не извлекаем данные в группы - или точнее, мы извлекаем их в группы, но мы не ждем завершения обработки группы, чтобы извлечь больше данных. Таким образом, мы не отслеживаем статус группы.

  • мы отслеживаем текущее количество элементов, которые были обработаны через счетчик currentItemCount в Processor, который увеличивается каждый раз, когда посылается элемент и уменьшается каждый раз, когда сообщается успех или неудача.

  • данные больше не извлекаются непосредственно в Processor, но вместо этого извлекаются в его дочерний класс называемый DataProducer . Каждый раз, когда элемент был обработан, Processor посылает себе сообщение Process, который впоследствии спрашивает производителя о дополнительных данных - если счетчик нашего текущего элемента уже не будет выше максимальной нагрузки мы можем справиться

  • теперь к самому интересному: DataProducer не будет получать новые данные сразу же, вместо этого, он проверит вместе с наблюдателем, какова текущая нагрузка. Если она выше, чем максимально допустимая (50 элементов в нашем примере, потому что мы, например, имеем дело с изображениями в высоким разрешении или чем-то подобным, что занимает много памяти) он будет ждать в течение 5 секунд и проверит снова, пока нагрузка снова не станет приемлемой. Только после этого он отправит новые данные в Processor , который продолжит делать свою работу, пока не останется никаких данных.

Некоторые комментарии об этой технике:

  • как говорится, нужно иметь хорошее ощущение того, каким должен быть максимум. Этот механизм в отличии от всех остальных, динамичный и не будет работать, если элементы весьма гетерогенны (в том смысле, что их обработка занимает очень разное количество времени)

  • она лучше подходит, чтобы ждать кучу элементов, которые будут обработаны, прежде чем запросить новую партию, в зависимости от случая

  • обработка ошибок в приведенном выше примере должна быть улучшена для использования в реальной жизни

Есть много других механизмов, для распределения работы, которые могут быть легко реализованы с Akka. Еще одним механизмом, который я бы хотел попробовать, когда получаю возможность работать, является кража работы, где рабочий получает активную роль получения работы, вместо того, чтобы их иметь, она отправляются ему.

Подводные камни

За время моей работы с Akka я натолкнулся на несколько подводных камней. Они в основном - итог не достаточно хорошего чтения документации, но я видел, как другие делают некоторые из этих ошибок, так что я думаю, будет полезным перечислить их здесь.

Создание слишком многих корневых акторов

Корневые акторы, созданные непосредственно через ActorSystem, вместо context, следует использовать с осторожностью: создание таких акторов дорого, т.к. требует синхронизации на защитном уровне. Кроме того, в большинстве случаев, вам не нужно иметь много корневых акторов, а скорее всего один актор для своей задачи, который имеет ряд дочерних.

Заключение future

Это ошибка, которую легко допустить, подробно описанная в этой статье. Давайте рассмотрим следующий пример:

ClosingOverFuture.scalaview raw
1
2
3
4
5
6
7
8
9
def receive = {
case ComputeResult(itemId: Long) =>
computeResult(itemId).map { result =>
// gotcha!
sender ! result
}
}
def computeResult(itemId: Long): Future[Int] = ???

Так в чем же проблема? computeResult производит Future , при этом переключается на другой контекст выполнения и освобождает основной поток. Это означает, что другое сообщение ComputeResult может прийти внутрь, и заменить ссылку sender. Тогда, когда первый future завершится, мы можем ответить не тому отправителю.

Легко исправить это, сохранив отправителя в val вот так:

ClosingOverFutureFix.scalaview raw
1
2
3
4
5
6
7
def receive = {
case ComputeResult(itemId: Long) =>
val originalSender = sender
computeResult(itemId).map { result =>
originalSender ! result
}
}

Неверное понимание смысла Future.andThen

Это в основном связано скорее с futures, чем с акторами, но я столкнулся с этим при смешивании обоих парадигм. Future API имеет метод andThen”, используемый исключительно для целей побочного действия”. Другими словами, он не будет трансформировать future и свяжет себя с ним, но скорее будет выполнен независимо.

Я ошибался полагая, что andThen вернет новый, преобразованный, Future, поэтому я попытался выполнить следующие действия:

FutureAndThen.scalaview raw
1
2
3
4
5
6
7
upload.andThen {
case Success(s) => {
FileUploadSuccess(item, s)
}
case Failure(t) =>
FileUploadFailure(item, Some(t))
} pipeTo originalSender

pipeTo является довольно мощным инструментом в инструментарий Akka, который позволяет отправлять результат future актору (под капотом он создает анонимный актор, который ждет завершения future).

Правильной реализацией будет:

Заключение

В заключении я хотел бы повториться, что Akka реально полезный инструмент, чтобы иметь его вашем наборе, и я думаю, что он собирается остаться таким гораздо дольше, поэтому я могу только посоветовать любому, кто этого еще не сделал, пойти и попробовать его (даже если вы работаете с Java, для вас есть Java API). Как мы и говорили, еще одной приятной парадигмой, которая станет стандартизированной, являются реактивные потоки, которые имеют дело с обратным давлением как с составной частью концепции.

Оригинал