paint-brush
Cómo construir una aplicación de transmisión de eventos en .NETpor@bbejeck
2,959 lecturas
2,959 lecturas

Cómo construir una aplicación de transmisión de eventos en .NET

por Bill Bejeck14m2023/02/13
Read on Terminal Reader

Demasiado Largo; Para Leer

El procesamiento de flujo es un enfoque para el desarrollo de software que ve los eventos como la entrada o salida principal de una aplicación. En esta publicación de blog, crearemos una aplicación de transmisión de eventos utilizando Apache Kafka, los clientes productores y consumidores de .NET, y Task Parallel Library (TPL) de Microsoft. El cliente de Kafka y TPL se encargan de la mayor parte del trabajo pesado; solo necesita concentrarse en la lógica de su negocio.
featured image - Cómo construir una aplicación de transmisión de eventos en .NET
Bill Bejeck HackerNoon profile picture
0-item


Cuando te detienes y piensas en la vida cotidiana, puedes ver fácilmente todo como un evento. Considere la siguiente secuencia:


  1. Se enciende el indicador de "bajo nivel de combustible" de su automóvil
  2. Como resultado, se detiene en la próxima estación de combustible para repostar
  3. Cuando echa gasolina al automóvil, se le solicita que se una al club de recompensas de la compañía para obtener un descuento.
  4. Entras, te registras y obtienes un crédito para tu próxima compra.


Podríamos seguir y seguir aquí, pero he dejado claro mi punto: la vida es una secuencia de eventos. Dado ese hecho, ¿cómo diseñaría un nuevo sistema de software hoy? ¿Recopilaría diferentes resultados y los procesaría en algún intervalo arbitrario o esperaría hasta el final del día para procesarlos? No, no lo harías; querrá actuar en cada evento tan pronto como suceda. Claro, puede haber casos en los que no pueda responder de inmediato a circunstancias individuales... piense en obtener un volcado de las transacciones de un día a la vez. Pero aún así, actuaría tan pronto como reciba los datos, un evento de suma global considerable si lo desea.


Entonces, ¿cómo se implementa un sistema de software para trabajar con eventos? La respuesta es el procesamiento de flujo.


¿Qué es el procesamiento de flujo?

Al convertirse en la tecnología de facto para manejar datos de eventos, el procesamiento de flujo es un enfoque para el desarrollo de software que ve los eventos como la entrada o salida principal de una aplicación. Por ejemplo, no tiene sentido esperar para actuar sobre la información o responder a una posible compra fraudulenta con tarjeta de crédito. Otras veces, podría implicar el manejo de un flujo entrante de registros en un microservicio, y procesarlos de la manera más eficiente es lo mejor para su aplicación.

Cualquiera que sea el caso de uso, es seguro decir que un enfoque de transmisión de eventos es el mejor enfoque para manejar eventos.


En esta publicación de blog, crearemos una aplicación de transmisión de eventos utilizando Apache Kafka®, los clientes productores y consumidores de .NET, y Task Parallel Library (TPL) de Microsoft. A primera vista, es posible que no junte automáticamente los tres como posibles candidatos para trabajar juntos. Claro, Kafka y los clientes .NET son una gran pareja, pero ¿dónde encaja TPL en la imagen?


La mayoría de las veces, el rendimiento es un requisito clave y, para evitar cuellos de botella debido a discrepancias de impedancia entre el consumo de Kafka y el procesamiento posterior, generalmente sugerimos la paralelización en el proceso siempre que surja la oportunidad.


Siga leyendo para ver cómo los tres componentes funcionan juntos para crear una aplicación de transmisión de eventos sólida y eficiente. La mejor parte es que el cliente de Kafka y TPL se encargan de la mayor parte del trabajo pesado; solo necesita concentrarse en la lógica de su negocio.


Antes de sumergirnos en la aplicación, demos una breve descripción de cada componente.

apache kafka

Si el procesamiento de transmisiones es el estándar de facto para manejar transmisiones de eventos, entonces Apache Kafka es el estándar de facto para crear aplicaciones de transmisión de eventos. Apache Kafka es un registro distribuido proporcionado de manera altamente escalable, elástica, tolerante a fallas y segura. En pocas palabras, Kafka utiliza intermediarios (servidores) y clientes. Los intermediarios forman la capa de almacenamiento distribuido del clúster de Kafka, que puede abarcar centros de datos o regiones de la nube. Los clientes brindan la capacidad de leer y escribir datos de eventos desde un clúster de intermediarios. Los clústeres de Kafka son tolerantes a fallas: si algún agente falla, otros agentes se encargarán del trabajo para garantizar operaciones continuas.

Clientes .NET confluentes

Mencioné en el párrafo anterior que los clientes escriben o leen desde un clúster de agentes de Kafka. Apache Kafka se empaqueta con clientes Java, pero hay varios otros clientes disponibles, a saber, el productor y consumidor de .NET Kafka, que es el núcleo de la aplicación en esta publicación de blog. El productor y el consumidor de .NET llevan el poder de la transmisión de eventos con Kafka al desarrollador de .NET. Para más información sobre los clientes .NET consultar la documentación .

Biblioteca paralela de tareas

Task Parallel Library ( TPL ) es "un conjunto de tipos públicos y API en los espacios de nombres System.Threading y System.Threading.Tasks", lo que simplifica el trabajo de escribir aplicaciones simultáneas. El TPL hace que agregar concurrencia sea una tarea más manejable al manejar los siguientes detalles:


1. Manejo de la partición del trabajo 2. Programación de subprocesos en ThreadPool 3. Detalles de bajo nivel como cancelación, administración de estado, etc.


La conclusión es que el uso de TPL puede maximizar el rendimiento de procesamiento de su aplicación mientras le permite concentrarse en la lógica comercial. Específicamente, utilizará el subconjunto Biblioteca de flujo de datos de la TPL.


La biblioteca de flujo de datos es un modelo de programación basado en actores que permite el paso de mensajes en proceso y tareas de canalización. Los componentes de Dataflow se basan en los tipos y la infraestructura de programación de TPL y se integran a la perfección con el lenguaje C#. La lectura de Kafka suele ser bastante rápida, pero el procesamiento (una llamada DB o una llamada RPC) suele ser un cuello de botella. Vale la pena considerar cualquier oportunidad de paralelización que podamos utilizar para lograr un mayor rendimiento sin sacrificar las garantías de pedido.


En esta publicación de blog, aprovecharemos estos componentes de Dataflow junto con los clientes de .NET Kafka para crear una aplicación de procesamiento de flujo que procesará los datos a medida que estén disponibles.

Bloques de flujo de datos

Antes de entrar en la aplicación que va a construir; deberíamos brindar información general sobre lo que constituye la biblioteca de flujo de datos TPL. El enfoque que se detalla aquí es más aplicable cuando tiene tareas intensivas de E/S y CPU que requieren un alto rendimiento. La biblioteca de flujo de datos TPL consta de bloques que pueden almacenar en búfer y procesar datos o registros entrantes, y los bloques se clasifican en una de tres categorías:


  1. Bloques de origen: actúan como una fuente de datos y otros bloques pueden leerlos.

  2. Bloques de destino: un receptor de datos o un sumidero, en el que otros bloques pueden escribir.

  3. Bloques propagadores: se comportan como un bloque de origen y de destino.


Toma los diferentes bloques y los conecta para formar una canalización de procesamiento lineal o un gráfico de procesamiento más complejo. Considere las siguientes ilustraciones:



Cada nodo en el gráfico representa un procesamiento diferente o una tarea computacional.



La biblioteca de flujo de datos proporciona varios tipos de bloques predefinidos que se dividen en tres categorías: almacenamiento en búfer, ejecución y agrupación. Estamos utilizando los tipos de ejecución y almacenamiento en búfer para el proyecto desarrollado para esta publicación de blog. El BufferBlock<T> es una estructura de propósito general que almacena datos en búfer y es ideal para usar en aplicaciones de productor/consumidor. El BufferBlock utiliza una cola de tipo primero en entrar, primero en salir para manejar los datos entrantes.


El BufferBlock (y las clases que lo amplían) es el único tipo de bloque en la biblioteca de flujo de datos que permite escribir y leer mensajes directamente; otros tipos esperan recibir mensajes o enviar mensajes a los bloques. Por esta razón, usamos un BufferBlock como delegado cuando creamos el bloque de origen e implementamos la interfaz ISourceBlock y el bloque receptor implementamos la interfaz ITargetBlock .


El otro tipo de bloque de flujo de datos que se usa en nuestra aplicación es un TransformBlock <TInput, TOutput> . Como la mayoría de los tipos de bloques en la biblioteca de flujo de datos, crea una instancia de TransformBlock al proporcionar un Func<TInput, TOutput> para actuar como un delegado que ejecuta el bloque de transformación para cada registro de entrada que recibe.


Dos características esenciales de los bloques de Dataflow son que puede controlar la cantidad de registros que almacenará en el búfer y el nivel de paralelismo.


Al establecer una capacidad de búfer máxima, su aplicación aplicará automáticamente una contrapresión cuando la aplicación encuentre una espera prolongada en algún punto de la canalización de procesamiento. Esta contrapresión es necesaria para evitar una sobreacumulación de datos. Luego, una vez que el problema desaparezca y el búfer disminuya de tamaño, consumirá datos nuevamente.


La capacidad de establecer la concurrencia de un bloque es fundamental para el rendimiento. Si un bloque realiza una tarea intensiva de E/S o CPU, existe una tendencia natural a paralelizar el trabajo para aumentar el rendimiento. Pero agregar simultaneidad puede causar un problema: procesar el pedido. Si agrega subprocesos a la tarea de un bloque, no puede garantizar el orden de salida de los datos. En algunos casos, el orden no importará, pero cuando sí importa, es una compensación importante a considerar: mayor rendimiento con simultaneidad en comparación con el procesamiento de la salida del pedido. Afortunadamente, no tiene que hacer este compromiso con la Biblioteca de flujo de datos.


Cuando establece el paralelismo de un bloque en más de uno, el marco garantiza que mantendrá el orden original de los registros de entrada (tenga en cuenta que mantener el orden con paralelismo es configurable, siendo verdadero el valor predeterminado). Si el orden original de los datos es A, B, C, entonces el orden de salida será A, B, C. ¿Escéptico? Sé que lo estaba, así que lo probé y descubrí que funcionaba como se anunciaba. Hablaremos de esta prueba un poco más adelante en esta publicación. Tenga en cuenta que solo se debe aumentar el paralelismo con operaciones sin estado o con estado que sean asociativas y conmutativas , lo que significa que cambiar el orden o la agrupación de las operaciones no afectará el resultado.


En este punto, puedes ver a dónde va esto. Tiene un tema de Kafka que representa eventos que necesita manejar de la manera más rápida posible. Por lo tanto, va a crear una aplicación de transmisión compuesta por un bloque de origen con un KafkaConsumer de .NET, bloques de procesamiento para lograr la lógica comercial y un bloque receptor que contiene un KafkaProducer de .NET para volver a escribir los resultados finales en un tema de Kafka. Aquí hay una ilustración de una vista de alto nivel de la aplicación:




La aplicación tendrá la siguiente estructura:


  1. Bloque fuente: Envolviendo un .NET KafkaConsumer y un delegado BufferBlock
  2. Bloque de transformación: deserialización
  3. Bloque de transformación: mapeo de datos JSON entrantes para comprar objetos
  4. Bloque de transformación: tarea de uso intensivo de CPU (simulado)
  5. Bloque de transformación: serialización
  6. Bloque de destino: empaquetar un delegado de .NET KafkaProducer y BufferBlock


A continuación, se incluye una descripción del flujo general de la aplicación y algunos puntos críticos sobre cómo aprovechar Kafka y la biblioteca de flujo de datos para crear una aplicación poderosa de transmisión de eventos.


Una aplicación de transmisión de eventos

Este es nuestro escenario: tiene un tema de Kafka que recibe registros de compras de su tienda en línea y el formato de datos entrantes es JSON. Desea procesar estos eventos de compra aplicando inferencias de ML a los detalles de la compra. Además, le gustaría transformar los registros JSON al formato Protobuf, ya que este es el formato de datos de toda la empresa. Por supuesto, el rendimiento de la aplicación es esencial. Las operaciones de ML consumen mucha CPU, por lo que necesita una forma de maximizar el rendimiento de la aplicación, por lo que aprovechará la paralelización de esa parte de la aplicación.


Consumir datos en la canalización

Recorramos los puntos críticos de la aplicación de streaming, empezando por el bloque fuente. Mencioné antes la implementación de la interfaz ISourceBlock y, dado que BufferBlock también implementa ISourceBlock , la usaremos como delegado para satisfacer todos los métodos de la interfaz. Entonces, la implementación del bloque fuente envolverá un KafkaConsumer y el BufferBlock. Dentro de nuestro bloque fuente, tendremos un hilo separado cuya única responsabilidad es que el consumidor pase los registros que ha consumido al búfer. A partir de ahí, el búfer enviará los registros al siguiente bloque de la canalización.


Antes de reenviar el registro al búfer, el ConsumeRecord (devuelto por la llamada Consumer.consume ) se envuelve con una abstracción Record que, además de la clave y el valor, captura la partición original y el desplazamiento, que es fundamental para la aplicación, y Explicaré por qué en breve. También vale la pena señalar que toda la canalización funciona con la abstracción Record , por lo que cualquier transformación da como resultado un nuevo objeto Record que envuelve la clave, el valor y otros campos esenciales, como el desplazamiento original, que los conserva a lo largo de toda la canalización.


Bloques de procesamiento

La aplicación divide el procesamiento en varios bloques diferentes. Cada bloque se vincula con el siguiente paso en la cadena de procesamiento, por lo que el bloque de origen se vincula con el primer bloque, que maneja la deserialización. Si bien .NET KafkaConsumer puede manejar la deserialización de registros, hacemos que el consumidor pase la carga útil serializada y la deserialice en un bloque Transform. La deserialización puede hacer un uso intensivo de la CPU, por lo que poner esto en su bloque de procesamiento nos permite paralelizar la operación si es necesario.


Después de la deserialización, los registros fluyen hacia otro bloque de transformación que convierte la carga útil de JSON en un objeto de modelo de datos de compra en formato Protobuf. La parte más interesante surge cuando los datos pasan al siguiente bloque, lo que representa una tarea que requiere un uso intensivo de la CPU para completar completamente la transacción de compra. La aplicación simula esta parte y la función suministrada duerme con un tiempo aleatorio de entre uno y tres segundos.


Este bloque de procesamiento simulado es donde aprovechamos el poder del marco de bloques de Dataflow. Cuando crea una instancia de un bloque de Dataflow, proporciona una instancia Func delegada que se aplica a cada registro que encuentra y una instancia ExecutionDataflowBlockOptions . Mencioné la configuración de los bloques de Dataflow antes, pero los revisaremos rápidamente aquí nuevamente. ExecutionDataflowBlockOptions contiene dos propiedades esenciales: el tamaño máximo de búfer para ese bloque y el grado máximo de paralelización.


Si bien establecemos la configuración del tamaño del búfer para todos los bloques en la canalización en 10 000 registros, mantenemos el nivel de paralelización predeterminado de 1, excepto para nuestra CPU simulada intensiva, donde lo establecemos en 4. Tenga en cuenta que el tamaño predeterminado del búfer de Dataflow es ilimitado. Hablaremos de las implicaciones de rendimiento en la siguiente sección, pero por ahora, completaremos la descripción general de la aplicación.


El bloque de procesamiento intensivo reenvía a un bloque de transformación de serialización que alimenta el bloque receptor, que luego envuelve un KafkaProducer de .NET y produce los resultados finales en un tema de Kafka. El bloque sumidero también usa un BufferBlock delegado y un subproceso separado para producir. El subproceso recupera el siguiente registro disponible del búfer. Luego llama al método KafkaProducer.Produce pasando un delegado Action que envuelve el DeliveryReport : el subproceso de E/S del productor ejecutará el delegado de Action una vez que se complete la solicitud de producción.


Eso completa el tutorial de alto nivel de la aplicación. Ahora, analicemos una parte crucial de nuestra configuración: cómo manejar las compensaciones de compromiso, que es vital dado que estamos canalizando registros del consumidor.


Compensación de compensaciones

Al procesar datos con Kafka, periódicamente confirmará compensaciones (una compensación es la posición lógica de un registro en un tema de Kafka) de los registros que su aplicación ha procesado correctamente hasta un punto determinado. Entonces, ¿por qué uno comete las compensaciones? Esa es una pregunta fácil de responder: cuando su consumidor se apaga de manera controlada o por error, reanudará el procesamiento desde la última compensación comprometida conocida. Al comprometer periódicamente las compensaciones, su consumidor no volverá a procesar registros o al menos una cantidad mínima si su aplicación se cierra después de procesar algunos registros pero antes de comprometerse. Este enfoque se conoce como procesamiento al menos una vez, lo que garantiza que los registros se procesen al menos una vez y, en caso de errores, tal vez algunos de ellos se vuelvan a procesar, pero esa es una excelente opción cuando la alternativa es arriesgarse a perder datos. Kafka también proporciona garantías de procesamiento exactamente una vez y, aunque no entraremos en transacciones en esta publicación de blog, puede leer más sobre las transacciones en Kafka en esta entrada de blog .


Si bien hay varias formas diferentes de confirmar compensaciones, la más simple y básica es el enfoque de confirmación automática. El consumidor lee los registros y la aplicación los procesa. Después de que transcurra una cantidad de tiempo configurable (según las marcas de tiempo de los registros), el consumidor confirmará las compensaciones de los registros ya consumidos. Por lo general, la confirmación automática es un enfoque razonable; en un bucle típico de proceso de consumo, no volverá al consumidor hasta que haya procesado con éxito todos los registros consumidos anteriormente. Si hubiera habido un error o un cierre inesperado, el código nunca regresa al consumidor, por lo que no se produce ninguna confirmación. Pero en nuestra aplicación aquí, estamos canalizando: tomamos registros consumidos y los empujamos a un búfer y volvemos a consumir más, no hay que esperar para un procesamiento exitoso.


Con el enfoque de canalización, ¿cómo garantizamos el procesamiento al menos una vez? Aprovecharemos el método IConsumer.StoreOffset , que asigna un solo parámetro, un TopicPartitionOffset , y lo almacena (junto con otras compensaciones) para la siguiente confirmación. Tenga en cuenta que este enfoque de la gestión de compensación contrasta cómo funciona la confirmación automática con la API de Java.


Entonces, el procedimiento de confirmación funciona de esta manera: cuando el bloque receptor recupera un registro para producirlo en Kafka, también se lo proporciona al delegado de Acción. Cuando el productor ejecuta la devolución de llamada, pasa el desplazamiento original al consumidor (la misma instancia en el bloque de origen) y el consumidor usa el método StoreOffset. Todavía tiene la confirmación automática habilitada para el consumidor, pero proporciona las compensaciones para confirmar en lugar de que el consumidor confirme a ciegas las últimas compensaciones consumidas hasta este punto.



Compensación de compensaciones


Por lo tanto, aunque la aplicación utiliza la canalización, solo se compromete después de recibir un acuse de recibo del intermediario, lo que significa que el intermediario y el conjunto mínimo de intermediarios de réplica han almacenado el registro. Trabajar de esta manera permite que la aplicación progrese más rápido, ya que el consumidor puede buscar y alimentar continuamente la canalización mientras los bloques realizan su trabajo. Este enfoque es posible porque el cliente consumidor de .NET es seguro para subprocesos (algunos métodos no lo son y están documentados como tales), por lo que podemos tener nuestro único consumidor trabajando de manera segura en los subprocesos de bloque fuente y receptor.


Para cualquier error durante la etapa de producción, la aplicación registra el error y vuelve a colocar el registro en el BufferBlock anidado para que el productor vuelva a intentar enviar el registro al intermediario. Pero esta lógica de reintento se realiza a ciegas y, en la práctica, probablemente desee una solución más robusta.

Implicaciones de rendimiento

Ahora que hemos cubierto cómo funciona la aplicación, veamos los números de rendimiento. Todas las pruebas se ejecutaron localmente en una computadora portátil macOS Big Sur (11.6), por lo que su kilometraje puede variar en este escenario. La configuración de la prueba de rendimiento es sencilla:


  1. Produzca 1 millón de registros para un tema de Kafka en formato JSON. Este paso se realizó con anticipación y no se incluyó en las mediciones de prueba.

  2. Inicie la aplicación habilitada para Kafka Dataflow y configure la paralelización en todos los bloques en 1 (el valor predeterminado)

  3. La aplicación se ejecuta hasta que haya procesado con éxito 1 millón de registros, luego se apaga

  4. Registre el tiempo que tomó procesar todos los registros


La única diferencia para la segunda ronda fue establecer el MaxDegreeOfParallelism para el bloque intensivo de CPU simulado en cuatro.

Aquí están los resultados:


Número de registros

Factor de concurrencia

Tiempo (minutos)

1M

1

38

1M

4

9


Entonces, simplemente estableciendo una configuración, mejoramos significativamente el rendimiento mientras mantenemos el orden de los eventos. Entonces, al habilitar un grado máximo de paralelismo a cuatro, obtenemos la aceleración esperada por un factor mayor a cuatro. Pero la parte crítica de esta mejora del rendimiento es que no escribió ningún código concurrente, lo que sería difícil de hacer correctamente.


Anteriormente en la publicación del blog, mencioné una prueba para validar que la concurrencia con los bloques de Dataflow preserva el orden de los eventos, así que hablemos de eso ahora. El juicio implicó los siguientes pasos:


  1. Producir 1M de enteros (0-999,999) a un tema de Kafka

  2. Modificar la aplicación de referencia para trabajar con tipos enteros

  3. Ejecute la aplicación con un nivel de simultaneidad de uno para el bloque de proceso remoto simulado: produzca en un tema de Kafka

  4. Vuelva a ejecutar la aplicación con un nivel de concurrencia de cuatro y genere los números para otro tema de Kafka.

  5. Ejecute un programa para consumir los números enteros de ambos temas de resultados y guárdelos en una matriz en la memoria

  6. Compare ambas matrices y confirme que están en el mismo orden


El resultado de esta prueba fue que ambas matrices contenían los números enteros en orden del 0 al 999 999, lo que demuestra que el uso de un bloque de Dataflow con un nivel de paralelismo de más de uno mantuvo el orden de procesamiento de los datos entrantes. Puede encontrar información más detallada sobre el paralelismo de Dataflow en la documentación .

Resumen

En esta publicación, presentamos cómo usar los clientes .NET Kafka y la biblioteca paralela de tareas para crear una aplicación de transmisión de eventos robusta y de alto rendimiento. Kafka proporciona transmisión de eventos de alto rendimiento, y la biblioteca paralela de tareas le brinda los componentes básicos para crear aplicaciones simultáneas con almacenamiento en búfer para manejar todos los detalles, lo que permite a los desarrolladores concentrarse en la lógica comercial. Si bien el escenario de la aplicación es un poco artificial, es de esperar que pueda ver la utilidad de combinar las dos tecnologías. Darle una oportunidad- aquí está el repositorio de GitHub .



También publicado aquí.