Use Case (요구사항정의)
- 모니터링 항목의 데이터들을 수집하기 위해 파이썬 프로그램을 만들어야 한다.
- 이를 위해 나는 '마지막 실행시각', '실행주기', '모니터링 결과값' 등을 저장하는 history 테이블을 만들었다.
- 모니터링 프로그램이 실행되면 history 테이블에 저장된 "설정주기"를 참조하여 모니터링 데이터들을 하나씩 수집해야 한다.
- 모니터링 실행 결과 데이터는 "결과값"과 "마지막 실행시각"을 저장하고 다음 모니터링 데이터를 수집할 항목을 실행한다. 만약 다음 모니터링할 항목이 없다면 가장 마지막에 실행된 항목의 "마지막실행시각"과 "실행주기"를 참조하여 자동으로 그만큼 대기할 수 있다.
- 대기시간이 완료되면 그 다음을 실행한다.
- 모니터링 항목별 데이터를 수집할때 실행 지연시간이 있을 수 있으며, 이런 부분의 영향을 받지 않도록 비동기적으로 실행될 수 있도록 하는 방안이나 더 좋은 방법이 있으면 그 방법에 따라 구현해야한다.
Flow chart
테이블 정보
CREATE TABLE history (
item VARCHAR(255) PRIMARY KEY,
last_execution_time TIMESTAMP NOT NULL,
execution_interval INT NOT NULL,
result FLOAT
);
목업데이터 생성기
import mysql.connector
from datetime import datetime
import random
config = {
'user': 'root',
'password': '비번',
'host': '127.0.0.1',
'database': 'monitoring',
'raise_on_warnings': True
}
conn = mysql.connector.connect(**config)
cursor = conn.cursor()
# 20개의 테스트 데이터 생성
for i in range(20):
item = f"item{i}"
last_execution_time = datetime.now()
execution_interval = 10
cursor.execute("INSERT INTO history (item, last_execution_time, execution_interval) VALUES (%s, %s, %s)",
(item, last_execution_time, execution_interval))
conn.commit()
스케줄러 실행 PoC 프로그램
import asyncio
import mysql.connector
from datetime import datetime, timedelta
import random
# 이 함수는 모니터링 항목을 실행하고 결과를 반환하는 함수를 대신합니다.
async def monitoring_task(item):
random_num = random.random()
await asyncio.sleep(random_num*3) # 실제 모니터링 작업을 수행한다고 가정.
return 100.0+random_num*10 # 모니터링 작업 결과에 대한 임의의 값.
async def main():
config = {
'user': 'root',
'password': '비번',
'host': '127.0.0.1',
'database': 'monitoring',
'raise_on_warnings': True
}
conn = mysql.connector.connect(**config)
cursor = conn.cursor()
while True:
cursor.execute("SELECT * FROM history ORDER BY last_execution_time")
rows = cursor.fetchall()
for row in rows:
item, last_execution_time, execution_interval, _ = row
next_execution_time = last_execution_time.timestamp() + execution_interval
current_time = datetime.now().timestamp()
if next_execution_time <= current_time:
# 이 항목을 모니터링해야 합니다.
result = await monitoring_task(row)
print(f"실행아이템 {item} : {result}, 주기 : {execution_interval}")
try:
cursor.execute("UPDATE history SET last_execution_time = FROM_UNIXTIME(%s), result = %s WHERE item = %s",
(current_time, result, row[0]))
conn.commit()
except Exception as e:
print(e)
else:
# 다음 실행까지 대기
await asyncio.sleep(next_execution_time - current_time)
if __name__ == "__main__":
asyncio.run(main())
- 실행환경은 파이썬 3.10
- 테스트 방법은
- 그냥 실행
- 중간에 주기를 바꿔서 실행
추가 개선의 의지
- 상기 코드에는 큐관리가 되어 있지 않다.
- 만약 큐관리를 추가한다면 더 나은 프로세스로 구현이 가능할 것으로 보인다.
고려할만한 전략
- Job grouping : monitoring pre-run을 통한 사전 process 정보 획득 후, 큰 틀의 느낌으로 실행 주기별 모니터링 항목 그룹화
- Error handling : 실패한 job의 경우, 해당 job의 group정보를 기반으로 재 스케줄링.
- Queue Management : 스케줄링 구현에 python이 이용된다면, ‘asyncio.Queue’를 사용해 queue를 관리. [ asyncio.Queue외의 다른 queue관리 fcn : RabbitMQ, Kafka, Redis, etc… ]