paint-brush
Apache Kafka에서 ksqlDB를 사용하여 관엽 식물 알림 시스템을 구축한 방법~에 의해@thedanicafine
5,604 판독값
5,604 판독값

Apache Kafka에서 ksqlDB를 사용하여 관엽 식물 알림 시스템을 구축한 방법

~에 의해 Danica Fine19m2023/02/09
Read on Terminal Reader
Read this story w/o Javascript

너무 오래; 읽다

Raspberry Pi, Apache Kafka 및 Telegram을 사용하여 관엽 식물에 물을 주어야 할 때 알려주는 시스템을 구축하는 첫 번째 IoT 하드웨어 프로젝트에 참여해 보세요!
featured image - Apache Kafka에서 ksqlDB를 사용하여 관엽 식물 알림 시스템을 구축한 방법
Danica Fine HackerNoon profile picture

2020년에는 너무나 많은 사람들이 팬데믹으로 인한 취미, 즉 봉쇄 조치로 인해 제한을 받는 동안 마음껏 몰두할 수 있는 취미를 선택했습니다. 저는 실내화분을 선택했어요.


팬데믹 이전에 나는 이미 집에 작은 보육원을 갖고 있었습니다. 솔직히 그때도 매일 식물 하나하나를 돌보는 일은 정말 힘든 일이었습니다. 그들 중 누구에게 물을 주어야 하는지 확인하고, 모두 적절한 양의 햇빛을 받고 있는지 확인하고, 그들과 이야기를 나누세요… #justHouseplantThings.


One of my many needy houseplants. 집에 있는 시간이 많다는 것은 식물에 더 많이 투자할 수 있다는 것을 의미했습니다. 그리고 나는 그렇게 했습니다. 내 시간과 노력과 돈이요. 우리 집에는 수십 개의 관엽 식물이 있습니다. 그들 모두는 이름과 성격을 가지고 있고(적어도 내 생각에는 그렇게 생각한다), 몇몇은 심지어 멍청한 눈을 가지고 있다. 물론 하루 종일 집에 있는 동안에는 괜찮았지만, 생활이 천천히 정상으로 돌아오면서 나는 어려운 상황에 처하게 되었습니다. 더 이상 이 세상에서 식물을 관찰할 시간이 없었습니다. 해결책이 필요했습니다. 매일 수동으로 식물을 확인하는 것보다 식물을 모니터링하는 더 좋은 방법이 있어야 했습니다.


Apache Kafka®를 입력하세요. 글쎄요, 또 다른 취미인 하드웨어 프로젝트를 시작하고 싶은 저의 소망을 입력해 보세요.


저는 항상 Raspberry Pi를 사용하여 프로젝트를 구축할 핑계를 찾고 싶었고 이것이 기회라는 것을 알았습니다. 나는 식물을 모니터링하여 주의가 필요할 때만 알림을 주고, 잠시 후가 아닌 알림을 보낼 수 있는 시스템을 구축할 것입니다. 그리고 저는 Kafka를 백본으로 사용하겠습니다.

이것은 실제로 매우 유용한 프로젝트로 판명되었습니다. 그것은 제가 가지고 있던 매우 현실적인 문제를 해결했고, 관엽식물에 대한 집착과 마침내 집에서 Kafka를 사용하고 싶은 가려운 욕구를 결합할 기회를 주었습니다. 이 모든 것은 누구나 스스로 구현할 수 있는 쉽고 접근 가능한 하드웨어 프로젝트로 깔끔하게 마무리되었습니다.


당신이 나와 같고 집을 자동화해야만 해결할 수 있는 화초 문제가 있거나, 나와는 전혀 다르지만 여전히 멋진 프로젝트를 탐구하고 싶다면 이 블로그 게시물이 당신을 위한 것입니다. .


소매를 걷어붙이고 손을 더럽히자!

씨앗 심기

먼저, 나는 이 프로젝트를 통해 내가 이루고 싶은 것이 무엇인지 알아보기 위해 자리에 앉았습니다. 시스템의 첫 번째 단계에서 식물의 수분 수준을 모니터링하고 이에 대한 알림을 받을 수 있으면 매우 도움이 될 것입니다. 결국 식물 관리에서 가장 시간이 많이 걸리는 부분은 어떤 식물을 관리해야 할지 결정하는 것이었습니다. 이 시스템이 의사결정 프로세스를 처리할 수 있다면 엄청난 시간을 절약할 수 있을 것입니다!


높은 수준에서 제가 구상한 기본 시스템은 다음과 같습니다.

데이터를 수집하기 위한 Raspberry Pi, 식물 메타데이터를 저장할 위치, 원시 데이터를 변환하기 위한 Kafka 및 ksqlDB, 출력 경고로 시작하는 식물 모니터링 시스템의 상위 수준 아키텍처입니다.



토양에 수분 센서를 배치하고 이를 Raspberry Pi에 연결합니다. 그런 다음 정기적으로 수분 수치를 측정하여 Kafka에 넣을 수 있었습니다. 수분 측정값 외에도 어떤 식물에 물을 주어야 하는지 결정하기 위해 각 식물에 대한 일부 메타데이터가 필요했습니다. 메타데이터도 Kafka로 생성하겠습니다. Kafka의 두 데이터 세트를 모두 사용하면 스트림 처리를 사용하여 데이터 세트를 서로 결합하고 강화하고 어떤 식물에 물을 주어야 하는지 계산할 수 있습니다. 거기에서 경고를 실행할 수 있습니다.


일련의 기본 요구 사항이 확립된 후 하드웨어 및 조립 단계에 들어갔습니다.

사물을 스토킹하는 중

자존심이 강한 많은 엔지니어들처럼 나도 엄청난 인터넷 검색으로 하드웨어 단계를 시작했습니다. 나는 이 프로젝트를 성공시키기 위해 모든 요소가 존재한다는 것을 알고 있었지만, 물리적 구성 요소를 다루는 것이 처음이었기 때문에 내가 무엇을 하고 있는지 정확히 알고 싶었습니다.


모니터링 시스템의 주요 목표는 식물에 언제 물을 주어야 하는지 알려주는 것이었기 때문에 일종의 수분 센서가 필요했습니다. 저는 토양 수분 센서가 다양한 모양과 크기로 제공되고, 아날로그 또는 디지털 구성 요소로 제공되며, 수분을 측정하는 방식이 다르다는 것을 알게 되었습니다. 결국 저는 이러한 I2C 용량성 센서를 선택했습니다. 하드웨어를 막 시작하는 사람에게는 훌륭한 옵션인 것 같았습니다. 용량성 센서로서 저항성 기반 센서보다 수명이 길고 아날로그에서 디지털로의 변환이 필요하지 않으며 거의 플러그 앤 방식이었습니다. 놀다. 게다가 온도 측정도 무료로 제공했습니다.


여담: 궁금한 분들을 위해 말씀드리자면 I2C는 상호 집적 회로(Inter-Integrated Circuit)를 의미합니다. 이러한 각 센서는 고유한 주소를 통해 통신합니다. 따라서 각 센서에서 데이터를 가져오려면 사용하는 모든 센서의 고유 주소를 설정하고 추적해야 합니다. 나중에 염두에 두어야 할 사항입니다.


센서를 결정하는 것이 물리적 설정에서 가장 큰 부분이었습니다. 하드웨어 측면에서 남은 일은 라즈베리 파이와 몇 가지 장비를 구하는 것뿐이었습니다. 그런 다음 자유롭게 시스템 구축을 시작할 수 있었습니다.


다음 구성 요소를 사용했습니다.

내 프로젝트에 사용된 라즈베리 파이, 브레드보드, 토양 수분 센서입니다.


흙에서부터…

나는 이 프로젝트가 쉽고 초보자 친화적이길 원했지만, 배선과 납땜을 최대한 많이 해보고 싶었습니다. 저보다 앞서 오신 분들을 기리기 위해 저는 전선, 크림퍼, 그리고 꿈을 가지고 이번 조립 여정을 시작했습니다. 첫 번째 단계는 4개의 센서를 브레드보드에 연결하고 브레드보드를 Raspberry Pi에 연결하기에 충분한 리본 와이어를 준비하는 것이었습니다. 설정에서 구성 요소 사이의 간격을 허용하기 위해 24인치 길이를 준비했습니다. 각 전선을 벗겨내고 압착한 후 JST 커넥터(센서를 브레드보드에 연결하는 전선용) 또는 암 소켓(Raspberry Pi 자체에 연결하기 위해)에 연결해야 했습니다. 그러나 물론 시간과 노력, 비용을 절약하고 싶다면 직접 전선을 압착하지 말고 미리 준비된 전선을 구입하는 것이 좋습니다.


여담: 내가 소유한 화초의 수를 고려하면 4개는 내 모니터링 설정에 사용할 수 있는 센서 수가 임의로 적은 것처럼 보일 수 있습니다. 앞서 언급했듯이 이러한 센서는 I2C 장치이므로 통신하는 모든 정보는 고유 주소를 사용하여 전송됩니다. 즉, 제가 구입한 토양 수분 센서는 모두 동일한 기본 주소로 배송되는데, 이는 동일한 장치를 여러 개 사용하려는 이와 같은 설정에 문제가 됩니다. 이 문제를 해결하는 두 가지 주요 방법이 있습니다. 첫 번째 옵션은 장치 자체에 따라 다릅니다. 내 특정 센서에는 후면에 2개의 I2C 주소 점퍼가 있었고 이들의 조합을 납땜하면 I2C 주소를 0x36과 0x39 범위로 변경할 수 있었습니다. 전체적으로 4개의 고유 주소를 가질 수 있으므로 최종 설정에서 4개의 센서를 사용합니다. 장치에 주소 변경을 위한 물리적 수단이 부족한 경우 두 번째 옵션은 정보를 다시 라우팅하고 멀티플렉스를 사용하여 프록시 주소를 설정하는 것입니다. 제가 하드웨어를 처음 접한다는 점을 고려하면 그것이 이 특정 프로젝트의 범위를 벗어난다고 느꼈습니다.


센서를 Raspberry Pi에 연결하기 위한 와이어를 준비한 후 테스트 Python 스크립트를 사용하여 단일 센서에서 판독값을 수집하여 모든 것이 올바르게 설정되었는지 확인했습니다. 좀 더 안심시키기 위해 나머지 세 개의 센서도 같은 방식으로 테스트했습니다. 그리고 이 단계에서 교차 전선이 전자 부품에 어떤 영향을 미치는지, 그리고 이러한 문제를 디버깅하는 것이 얼마나 어려운지를 직접 배웠습니다.


배선이 제대로 작동하면 모든 센서를 Raspberry Pi에 연결할 수 있습니다. 모든 센서는 Raspberry Pi의 동일한 핀(GND, 3V3, SDA 및 SCL)에 연결되어야 했습니다. 모든 센서에는 고유한 I2C 주소가 있으므로 모두 동일한 와이어를 통해 통신하더라도 해당 주소를 사용하여 특정 센서에서 데이터를 얻을 수 있습니다. 제가 해야 할 일은 각 센서를 브레드보드에 연결한 다음 브레드보드를 Raspberry Pi에 연결하는 것뿐이었습니다. 이를 달성하기 위해 남은 전선을 약간 사용하고 납땜을 사용하여 브레드보드의 기둥을 연결했습니다. 그런 다음 센서를 쉽게 연결할 수 있도록 JST 커넥터를 브레드보드에 직접 납땜했습니다.


브레드보드를 Raspberry Pi에 연결하고, 센서를 4개의 식물에 삽입하고, 테스트 스크립트를 통해 모든 센서에서 데이터를 읽을 수 있음을 확인한 후 Kafka에 데이터를 생성하는 작업을 시작할 수 있었습니다.

실시간 데이터

Raspberry Pi 설정 과 모든 수분 센서가 예상대로 작동하면서 Kafka를 혼합하여 일부 데이터 스트리밍을 시작할 차례였습니다.


예상하셨겠지만 Kafka에 데이터를 쓰려면 Kafka 클러스터가 필요했습니다. 이 프로젝트의 소프트웨어 구성 요소를 최대한 가볍고 쉽게 설정하기 위해 Confluent Cloud를 Kafka 공급자로 사용하기로 결정했습니다. 그렇게 하면 인프라를 설정하거나 관리할 필요가 없었고 Kafka 클러스터가 설정 후 몇 분 안에 준비되었습니다.


특히 MQTT가 센서에서 IoT 데이터를 스트리밍하기 위한 사실상의 표준이라는 점을 고려하면 제가 이 프로젝트에 Kafka를 사용하기로 선택한 이유도 언급할 가치가 있습니다. Kafka와 MQTT는 모두 게시/하위 스타일 메시징을 위해 구축되었으므로 그 점에서 비슷합니다. 그러나 이와 같은 데이터 스트리밍 프로젝트를 구축하려는 경우 MQTT는 부족할 것입니다. 스트림 처리, 데이터 지속성 및 다운스트림 통합을 처리하려면 Kafka와 같은 또 다른 기술이 필요합니다. 결론은 MQTT와 Kafka가 정말 잘 작동한다는 것입니다. Kafka 외에도 내 프로젝트의 IoT 구성 요소에 MQTT를 사용할 수 있었습니다. 대신, 저는 Raspberry Pi에서 Python 제작자와 직접 작업하기로 결정했습니다. 즉, IoT에서 영감을 받은 프로젝트에 MQTT 및 Kafka를 사용하려는 경우 MQTT Kafka 소스 커넥터를 사용하여 MQTT 데이터를 Kafka로 가져올 수 있으므로 안심하세요.

데이터를 통한 제적

데이터를 실행에 옮기기 전에 Kafka 주제에 대한 메시지를 어떻게 구성할지 결정하기 위해 한 걸음 물러났습니다. 특히 이와 같은 해킹 프로젝트의 경우 걱정 없이 Kafka 주제에 데이터를 보내기 시작하는 것이 쉽습니다. 하지만 주제 전반에 걸쳐 데이터를 어떻게 구성할지, 어떤 키를 사용할지, 데이터가 어떻게 구성되는지 아는 것이 중요합니다. 필드에 입력합니다.


그럼 주제부터 시작하겠습니다. 그것들은 어떻게 보일까요? 센서에는 수분과 온도를 포착하는 기능이 있었습니다. 이러한 판독값을 단일 주제에 기록해야 합니까, 아니면 여러 주제에 기록해야 합니까? 수분과 온도 판독값이 동시에 식물의 센서에서 캡처되었으므로 동일한 Kafka 메시지에 함께 저장했습니다. 이 두 가지 정보가 함께 이 프로젝트의 목적을 위한 식물 읽기로 구성되었습니다. 그것은 모두 같은 독서 주제에 속할 것입니다.


센서 데이터 외에도 센서가 모니터링하는 식물 유형, 온도 및 습도 경계 등 관엽 식물 메타데이터를 저장할 주제가 필요했습니다. 이 정보는 데이터 처리 단계에서 판독값이 경고를 트리거해야 하는 시기를 결정하는 데 사용됩니다.


나는 houseplants-readingshouseplants-metadata 두 가지 주제를 만들었습니다. 몇 개의 파티션을 사용해야 합니까? 두 주제 모두에 대해 글을 쓰는 시점에서 Confluent Cloud의 기본 파티션 수는 6개를 사용하기로 결정했습니다. 그 번호가 맞나요? 글쎄요, 그렇습니다. 이 경우 제가 다루는 데이터의 양이 적기 때문에 주제당 6개의 파티션이 무리일 수 있지만, 나중에 이 프로젝트를 더 많은 플랜트로 확장할 경우에는 6개의 파티션을 갖는 것이 좋습니다. .


파티션 외에도 주목해야 할 또 다른 중요한 구성 매개변수는 관엽 식물 주제에서 활성화한 로그 압축입니다. 'readings' 이벤트 스트림과 달리 'metadata' 주제에는 참조 데이터 또는 메타데이터가 포함됩니다. 이를 압축된 주제에 보관함으로써 데이터가 만료되지 않도록 보장하고 주어진 키(기억한다면 키는 각 화초의 고유 식별자임)에 대해 마지막으로 알려진 값에 항상 액세스할 수 있습니다.

위의 내용을 바탕으로 판독값과 관엽식물 메타데이터(가독성을 위해 여기에서는 단축됨) 모두에 대해 두 개의 Avro 스키마를 작성했습니다.

읽기 스키마

 { "doc": "Houseplant reading taken from sensors.", "fields": [ {"name": "plant_id", "type": "int"}, {"name": "timestamp", "logicalType": "timestamp-millis", "type": "long"}, {"name": "moisture", "type": "float"}, {"name": "temperature", "type": "float"} ], "name": "reading", "namespace": "com.houseplants", "type": "record" }

관엽식물 메타데이터 스키마

 { "doc": "Houseplant metadata.", "fields": [ {"name": "plant_id", "type": "int"}, {"name": "scientific_name", "type": "string"}, {"name": "common_name", "type": "string"}, {"name": "given_name", "type": "string"}, {"name": "temperature_threshold_low", "type": "float"}, {"name": "temperature_threshold_high", "type": "float"}, {"name": "moisture_threshold_low", "type": "float"}, {"name": "moisture_threshold_high", "type": "float"} ], "name": "houseplant", "namespace": "com.houseplants", "type": "record" }


이전에 Kafka를 사용해 본 적이 있다면 주제를 파악하고 메시지 값이 어떤 모습인지 아는 것이 첫 번째 단계에 불과하다는 것을 알고 계실 것입니다. 각 메시지의 핵심이 무엇인지 아는 것도 마찬가지로 중요합니다. 판독값과 메타데이터 모두에 대해 Kafka에서 키의 기초를 형성해야 하는 엔터티 인스턴스이기 때문에 각 데이터 세트의 인스턴스가 무엇인지 스스로에게 물었습니다. 식물별로 판독값이 수집되고 메타데이터가 식물별로 할당되므로 두 데이터 세트의 엔터티 인스턴스는 개별 식물이었습니다. 나는 두 주제의 논리적 핵심이 식물을 기반으로 하기로 결정했습니다. 각 식물에 숫자 ID를 할당하고 해당 숫자를 판독 메시지와 메타데이터 메시지 모두의 키로 사용합니다.


그래서 내가 이 일을 올바른 방향으로 진행하고 있다는 것을 알았기 때문에 약간 우쭐한 만족감으로 내 센서의 데이터를 Kafka 주제로 스트리밍하는 데 주의를 돌릴 수 있었습니다.

메시지 육성

저는 센서에서 Kafka로 데이터를 보내기 시작하고 싶었습니다. 첫 번째 단계는 Raspberry Pi에 confluent-kafka Python 라이브러리를 설치하는 것이었습니다. 거기에서 센서의 판독값을 캡처하고 Kafka에서 데이터를 생성하는 Python 스크립트를 작성했습니다.


그렇게 쉽다고 말하면 믿으시겠습니까? 단 몇 줄의 코드만으로 내 센서 데이터가 다운스트림 분석에 사용하기 위해 Kafka 주제에 기록되고 유지되었습니다. 아직도 그 생각만 하면 좀 현기증이 나네요.


Confluent Cloud UI에서 볼 수 있듯이 Kafka 주제를 통해 흐르는 관엽 식물 판독 이벤트입니다.



Kafka의 센서 판독을 통해 이제 모든 종류의 다운스트림 분석을 수행하려면 관엽 식물 메타데이터가 필요했습니다. 일반적인 데이터 파이프라인에서 이러한 종류의 데이터는 관계형 데이터베이스나 다른 데이터 저장소에 상주하며 Kafka Connect 및 이에 사용 가능한 많은 커넥터를 사용하여 수집됩니다.


저는 자체 외부 데이터베이스를 가동하는 대신 Kafka를 메타데이터의 영구 저장 계층으로 사용하기로 결정했습니다. 소수의 식물에 대한 메타데이터를 사용하여 다른 Python 스크립트를 사용하여 Kafka에 직접 데이터를 수동으로 썼습니다.

문제의 근원

내 데이터는 Kafka에 있습니다. 이제 정말로 손을 더럽힐 시간이다. 하지만 먼저 제가 이 프로젝트를 통해 달성하고 싶었던 것이 무엇인지 다시 살펴보겠습니다. 전반적인 목표는 식물에 물을 주어야 함을 나타내는 수분 수치가 낮을 때 경고를 보내는 것입니다. 스트림 처리를 사용하여 메타데이터로 판독 데이터를 강화한 다음 새로운 데이터 스트림을 계산하여 알림을 실행할 수 있습니다.


최소한의 코딩으로 데이터를 처리할 수 있도록 이 파이프라인의 데이터 처리 단계에서 ksqlDB를 사용하기로 결정했습니다. Confluent Cloud와 함께 ksqlDB는 설정 및 사용이 쉽습니다. 간단히 애플리케이션 컨텍스트를 프로비저닝하고 간단한 SQL을 작성하여 데이터 로드 및 처리를 시작하면 됩니다.

입력 데이터 정의

데이터 처리를 시작하기 전에 작업에 사용할 수 있도록 ksqlDB 애플리케이션 내에서 데이터 세트를 선언해야 했습니다. 그러기 위해서는 먼저 내 데이터가 TABLE 또는 STREAM 중 어느 것으로 표시되어야 하는지 결정한 다음 CREATE 문을 사용하여 기존 Kafka 주제를 가리켜야 했습니다.


관엽 식물 판독 데이터는 ksqlDB에서 STREAM 으로 표시됩니다. 이는 기본적으로 Kafka 주제(변경할 수 없는 일련의 추가 전용 이벤트)와 정확히 동일하지만 스키마도 포함합니다. 오히려 편리하게도 이전에 스키마를 이미 설계하고 선언했으며 ksqlDB는 스키마 레지스트리에서 직접 이를 가져올 수 있습니다.


 CREATE STREAM houseplant_readings ( id STRING KEY ) WITH ( kafka_topic='houseplant-readings', format='AVRO', partitions=4 );


Kafka 주제에 대해 생성된 스트림을 사용하면 표준 SQL을 사용하여 쿼리하고 필터링하여 다음과 같은 간단한 문을 사용하여 데이터를 탐색할 수 있습니다.


 SELECT plant_id, moisture FROM HOUSEPLANT_READINGS EMIT CHANGES; 


위 쿼리의 출력은 식물 수분 측정값 샘플을 보여줍니다.


관엽식물 메타데이터는 조금 더 고려해야 합니다. 읽기 데이터와 마찬가지로 Kafka 주제로 저장되지만 논리적으로는 다른 유형의 데이터, 즉 상태입니다. 각 식물에는 이름, 위치 등이 있습니다. 이를 압축된 Kafka 주제에 저장하고 ksqlDB에 TABLE 로 표시합니다. 일반 RDBMS와 마찬가지로 테이블은 특정 키의 현재 상태를 알려줍니다. ksqlDB가 스키마 레지스트리에서 스키마 자체를 선택하는 동안 테이블의 기본 키를 나타내는 필드를 명시적으로 선언해야 합니다.


 CREATE TABLE houseplant_metadata ( id INTEGER PRIMARY KEY ) WITH ( kafka_topic='houseplant-metadata', format='AVRO', partitions=4 );

데이터를 풍부하게 하세요

ksqlDB 애플리케이션에 두 데이터세트를 모두 등록한 후 다음 단계는 houseplants 테이블에 포함된 메타데이터로 houseplant_readings 강화하는 것입니다. 그러면 관련 식물에 대한 읽기 및 메타데이터가 모두 포함된 새 스트림(Kafka 주제로 뒷받침됨)이 생성됩니다.


강화 쿼리는 다음과 같습니다.


 CREATE STREAM houseplant_readings_enriched WITH ( kafka_topic='houseplant-readings-enriched', format='AVRO', partitions=4 ) AS SELECT r.id AS plant_id, r.timestamp AS ts, r.moisture AS moisture, r.temperature AS temperature, h.scientific_name AS scientific_name, h.common_name AS common_name, h.given_name AS given_name, h.temperature_low AS temperature_low, h.temperature_high AS temperature_high, h.moisture_low AS moisture_low, h.moisture_high AS moisture_high FROM houseplant_readings AS r LEFT OUTER JOIN houseplants AS h ON houseplant_readings.id = houseplants.id PARTITION BY r.id EMIT CHANGES;


해당 쿼리의 출력은 다음과 같습니다.

플랜트 판독값과 플랜트 메타데이터를 결합한 쿼리의 결과로 출력되는 예제 이벤트입니다.


이벤트 스트림에 대한 경고 만들기

이 기사의 시작 부분을 다시 생각해 보면 이 모든 것의 요점은 식물에 물을 주어야 할 시기를 알려주는 것이었음을 기억할 것입니다. 우리는 일련의 수분(및 온도) 판독값을 얻었고, 각 식물의 수분 수준이 물을 주어야 함을 나타낼 수 있는 임계값을 알려주는 표를 얻었습니다. 하지만 수분 부족 경고를 언제 보낼지 어떻게 결정합니까? 그리고 얼마나 자주 보내나요?


이러한 질문에 답하려고 노력하면서 저는 센서와 센서가 생성하는 데이터에 대해 몇 가지 사실을 발견했습니다. 우선 5초 간격으로 데이터를 캡처하고 있습니다. 수분 수치가 낮을 때마다 알림을 보내면 휴대전화에 알림이 넘쳐날 것입니다. 그건 좋지 않습니다. 최대 한 시간에 한 번씩 알림을 받고 싶습니다. 데이터를 보면서 두 번째로 깨달은 것은 센서가 완벽하지 않다는 것입니다. 시간이 지남에 따라 식물의 수분 수준이 감소하는 일반적인 추세에도 불구하고 정기적으로 잘못된 낮음 또는 잘못된 높은 판독값이 표시되었습니다.


이 두 가지 관찰을 결합하여 저는 주어진 1시간 내에 20분 동안 낮은 수분 수치를 보면 경고를 보내는 것이 충분할 것이라고 결정했습니다. 5초마다 1회 판독하면 시간당 720회 판독이 가능하며… 여기서 약간의 계산을 수행하면 경고를 보내기 전에 1시간 동안 240개의 낮은 판독값을 확인해야 한다는 의미입니다.

이제 우리가 할 일은 1시간마다 식물당 최대 하나의 이벤트를 포함하는 새 스트림을 만드는 것입니다. 나는 다음 쿼리를 작성하여 이를 달성했습니다.


 CREATE TABLE houseplant_low_readings WITH ( kafka_topic='houseplant-low-readings', format='AVRO', partitions=4 ) AS SELECT plant_id, given_name, common_name, scientific_name, CONCAT(given_name, ' the ', common_name, ' (', scientific_name, ') is looking pretty dry...') AS message, COUNT(*) AS low_reading_count FROM houseplant_readings_enriched WINDOW TUMBLING (SIZE 1 HOURS, GRACE PERIOD 30 MINUTES) WHERE moisture < moisture_low GROUP BY plant_id, given_name, common_name, scientific_name HAVING COUNT(*) > 240 EMIT FINAL;


가장 먼저, 윈도우 집계가 눈에 띌 것입니다. 이 쿼리는 겹치지 않는 1시간 기간 동안 작동하므로 지정된 기간 내에서 식물 ID당 데이터를 집계할 수 있습니다. 꽤 직설적 인.


저는 수분 판독값이 해당 식물의 낮은 수분 임계값보다 작은 강화된 판독값 스트림의 행을 구체적으로 필터링하고 계산합니다. 해당 개수가 240개 이상이면 경고의 기초가 되는 결과를 출력하겠습니다.


그런데 왜 이 쿼리의 결과가 테이블에 있는지 궁금할 것입니다. 아시다시피 스트림은 데이터 엔터티의 어느 정도 완전한 기록을 나타내는 반면 테이블은 특정 키에 대한 최신 값을 반영합니다. 이 쿼리는 실제로 내부적으로는 상태 저장 스트리밍 애플리케이션이라는 점을 기억하는 것이 중요합니다. 기본 강화 데이터 스트림을 통해 메시지가 흐르면서 특정 메시지가 필터 요구 사항을 충족하는 경우 1시간 이내에 해당 식물 ID에 대한 낮은 판독값 수를 늘리고 상태 내에서 이를 추적합니다. 그러나 이 쿼리에서 제가 정말로 관심을 갖는 것은 집계의 최종 결과입니다. 즉, 특정 키에 대해 낮은 판독값의 수가 240을 초과하는지 여부입니다. 나는 테이블을 원한다.


여담: 해당 문의 마지막 줄은 `EMIT FINAL`이라는 것을 알 수 있습니다. 이 문구는 스트리밍 애플리케이션을 통해 새 행이 흐를 때마다 잠재적으로 결과를 출력하는 대신 결과가 내보내지기 전에 창이 닫힐 때까지 기다릴 것임을 의미합니다.


이 쿼리의 결과는 특정 1시간 동안 특정 식물 ID에 대해 내가 원하는 대로 최대 하나의 경고 메시지를 출력한다는 것입니다.

분기

이 시점에서 식물의 수분 수준이 적절하고 지속적으로 낮을 때 메시지를 포함하는 ksqlDB로 채워진 Kafka 주제가 있었습니다. 하지만 실제로 Kafka에서 이 데이터를 어떻게 가져오나요? 나에게 가장 편리한 것은 이 정보를 내 휴대폰으로 직접 받는 것입니다.


나는 여기서 바퀴를 재발명할 생각이 없었기 때문에 Telegram 봇을 사용하여 Kafka 주제의 메시지를 읽고 전화로 경고를 보내는 방법을 설명하는 이 블로그 게시물을 활용했습니다. 블로그에 설명된 프로세스에 따라 Telegram 봇을 만들고 내 휴대폰에서 해당 봇과 대화를 시작하여 해당 대화의 고유 ID와 내 봇의 API 키를 기록해 두었습니다. 해당 정보를 사용하여 Telegram 채팅 API를 사용하여 내 봇에서 내 휴대폰으로 메시지를 보낼 수 있었습니다.


좋습니다. 그런데 Kafka에서 Telegram 봇으로 알림을 어떻게 받을 수 있나요? Kafka 주제의 알림을 사용하고 Telegram 채팅 API를 통해 각 메시지를 수동으로 보내는 맞춤형 소비자를 작성하여 메시지 전송을 호출할 수 있습니다. 하지만 그것은 추가 작업처럼 들립니다. 대신, 저는 완전히 관리되는 HTTP 싱크 커넥터를 사용하여 동일한 작업을 수행하기로 결정했습니다. 단, 추가 코드를 직접 작성하지 않았습니다.


몇 분 안에 내 텔레그램 봇이 작동할 준비가 되었고, 나와 봇 사이에 비공개 채팅이 열렸습니다. 채팅 ID를 사용하면 이제 Confluent Cloud의 완전 관리형 HTTP 싱크 커넥터를 사용하여 내 휴대폰으로 직접 메시지를 보낼 수 있습니다.


전체 구성은 다음과 같았습니다.

 { "name": "HttpSinkConnector_Houseplants_Telegram_Bot", "config": { "topics": "houseplant-low-readings", "input.data.format": "AVRO", "connector.class": "HttpSink", "name": "HttpSinkConnector_Houseplants_Telegram_Bot", "kafka.auth.mode": "KAFKA_API_KEY", "http.api.url": "https://api.telegram.org/**********/sendMessage", "request.method": "POST", "headers": "Content-Type: application/json", "request.body.format": "string", "batch.max.size": "1", "batch.prefix": "{\"chat_id\":\"********\",", "batch.suffix": "}", "regex.patterns": ".*MESSAGE=(.*),LOW_READING_COUNT=(.*)}.*", "regex.replacements": "\"text\":\"$1\"", "regex.separator": "~", "tasks.max": "1" } } 



Http Sink Connector의 Confluent Cloud에 대한 상위 수준 요약 대시보드입니다.



커넥터를 출시한 지 며칠 후, 식물에 물을 주어야 한다는 매우 유용한 메시지를 받았습니다. 성공!


내 휴대폰에 Monstera adansonii에 물을 주어야 한다는 텔레그램 알림이 떴습니다.


새 잎사귀를 뒤집는다

이 프로젝트의 초기 단계를 완료한 지 약 1년이 지났습니다. 그 동안 제가 모니터링하고 있는 식물들이 모두 행복하고 건강하다는 소식을 전해드리게 되어 기쁩니다! 더 이상 확인하는 데 추가 시간을 소비할 필요가 없으며 스트리밍 데이터 파이프라인에서 생성된 알림에만 의존할 수 있습니다. 얼마나 멋지나요?


이 시스템으로 일부 관엽 식물을 모니터링하고 있습니다.



이 프로젝트를 구축하는 과정이 흥미를 끌었다면 자신만의 스트리밍 데이터 파이프라인을 시작하는 것이 좋습니다. 자신의 삶에서 실시간 파이프라인을 구축하고 통합하는 데 도전하고 싶은 노련한 Kafka 사용자이든 Kafka를 완전히 처음 접하는 사람이든 관계없이 이러한 종류의 프로젝트가 여러분에게 적합하다는 것을 말씀드리고자 왔습니다.



여기에도 게시되었습니다 .