ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 429에러 해결하기
    Projects/나만의 무기 2024. 7. 29. 23:44
    발표날 시연을 못하게한 에러를 해결하는 글을 작성해보도록 합시다.

     

    시작하며,

    시연을 못한채 발표를 끝내서 아쉬웠었다. 그래서 이 후회되는 경험을 성장의 경험으로 바꾸고 싶었고. 다음 프로젝트가 있다면 반드시 시연을 해내는 사람이 되기 위해 팀원들과 발표 다음날 버그를 재현을 시도하고 해당 버그가 나도 서비스가 운영되도록 추가했다.

     


    원인 추적

    우리 미코의 서비스는 클라이언트의 짧은 음성녹음파일을 메인서버에 넘기고 메인서버는 nlp서버에 넘겨서 리턴제로라는 stt api 에 보내서 텍스트로 변환된 응답을 받아오는 기능이 있었다. 

     

    그런데 시연날 이 과정에서 에러가 나서 우리 서비스의 stt 기능이 동작을 못했었다.

    MIKO 서비스의 아키텍처

     

    stt 요청 과정

     

    nlp-server에서 'returnzero' 서비스에 `음성파일.wav` 를 보내서 stt 결과를 받아오는데 이 방식이 어떻게 구현되어 있나면

    while True:
        status_resp = requests.get(
            status_url,
            headers={'Authorization': 'bearer ' + access_token}
        )
        status_resp.raise_for_status()
        status_data = status_resp.json()
        print(f"Status Data: {status_data}")
    
        if status_data['status'] == 'completed':
            script = ' '.join([utterance['msg'] for utterance in status_data['results']['utterances']])
            return status_resp.status_code, script
        elif status_data['status'] == 'failed':
            return status_resp.status_code, "Transcription failed"
        else:
            time.sleep(1)

     

    1. 보낸 요청에 대해서 폴링으로 1초마다 전사 결과를 확인하는 요청을 보낸다.

    2. 전사 완료 'completed' 면 결과를 리턴한다.

    왜 이렇게 구현했어요? 리턴제로 기술 블로그에서 이렇게 하래요... 참고로 리턴제로 측에서 추천하는 폴링 주기는 5초 이지만 5초는 우리 서비스에 너무 느린반응이었고 테스트 한뒤 1초로 정했다.

     

     

    저기서 문제는 저 폴링 주기이다. 평소에는 1초, 심지어는 0.1 초도 문제없는데 간혹 리턴제로 측에서 429 에러 너무 잦은 요청이 뜬다.

    1초로 했을때는 하필 시연때 처음 떳다...docs를 안따른 자의 최후

     

    우리는 카페에 가서 당일 에러를 재현해보려고 이 동시에 많은 요청을 보내기도 폴링주기를 0.1 이하로도 내려봤는데 429가 안떴었다. 그래서 우리는 리턴제로 서버의 상태에 따라 5초 미만의 폴링주기는 429가 간혹 뜰 수도 있다고 판단했고 해당 내용은 문의를 보내 확인하지 못한 상태이다. 뭐, 어찌되었든 해당 429 에러가 났을 때의 처리를 해야함에는 변함이 없다. 


    리턴제로 docs

    https://developers.rtzr.ai/docs/stt-file/

    리턴제로의 stt 서비스 api 문서 일부 발췌
    TIP

    일반 STT API의 경우, 긴 음성 파일도 지원하기 위하여 Polling 방식으로 구현되어 있습니다. 전사 요청 API에서 응답받은 {TRANSCRIBE_ID}의 상태 값이 transcribing인 경우, 최종 상태(completed 또는 failed)가 될 때까지 주기적으로 조회하여 변환 결과를 확인할 수 있습니다. 권장하는 Polling 주기는 5초입니다. (Polling 주기가 너무 짧을 경우 HTTP Status 429로 요청 제한 초과 응답이 내려갈 수 있습니다.)

     

    HttpStatusCodeNotes

    HttpStatus Code Notes
    429 A0003 요청 제한 초과

     


    해결방법

    해결 방법은 간단하다 잠시 모든 요청을 중지하고 지연시간을 가지고다시 요청을 보내면된다.

    이 지연 시간은 권장 폴링 주기인 5초면 충분하지만 때로 5초 이상이 필요할때도 있어 5초 지연을 가지고 다시 요청했을때 다시 429를 받는다면 5초씩 지연시간이 증가되어 최대 1분 을 가지도록 했다.

     

    429 에러를 받는 다면 우선 메인서버에 429 코드 넘겨주기. >> 사용자에게 현재 stt 서비스 지연 알림을 보내기 위해서이다.

    stt 요청을 보내는 모든 worker 가 지연시간을 가진뒤 다시 요청을 재개하도록 하기.

    stt 서비스가 동시 처리 10개 까지 가능한데 성능이슈로 나는 worker 5개를 돌려 각각 한개의 요청을 보내게 처리하고 있었다. 모든 worker의 stt 요청이 잠시 쉬어야 한다.

    429 에러가 나 변환되지 못한 음성파일은 큐에 넣고 다시 요청 보내기

     

     

    1. 429 에러 용 변수 추가

    # 마지막 429 에러 시간을 추적하는 전역 변수와 딜레이 초기값
    last_429_time = 0
    current_delay = 5
    max_delay = 60

    `last_429_time`은 429 에러가 발생한 제일 최근 시간이다.

     

    2. 429에러 발생 시간을 업데이트하는 함수만들기,  429에러 발생 시점으로부터 지연시간이 흘렀는지 체크하는 함수 만들기

    5초의 지연을 가지고 다시  요청을 해도 또 429까 뜬다면 `지연시간` +5초를 한다. 최대 지연시간은 1분이다.
    def check_global_delay():
        global last_429_time, current_delay
        with lock:
            if last_429_time and time.time() - last_429_time < current_delay:
                print(f"waiting... {int(time.time() - last_429_time)}/{current_delay} sec")
                return True
        return False
    
    
    def update_global_delay():
        global last_429_time, current_delay
        with lock:
            last_429_time = time.time()
            current_delay = min(current_delay + 5, max_delay)

     

    3. stt 요청 하는 함수 첫 단계에 `check_global_delay` 현재 stt요청 지연을 해야하는지 조건문 추가하기, 429에러를 받으면 `update_global_delay` 실행하기

    @stt.route('/', methods=['POST'])
    def speech_to_text():
        global current_delay
        # 전역 지연 확인
        if check_global_delay(): # <<<<<<< 여기
            return jsonify({'text': 'Too many requests, please try again later.'}), 429
    
        if 'file' not in request.files:
            return jsonify({'text': 'Error: No file part'}), 400
    
        file = request.files['file']
    
        if file.filename == '':
            return jsonify({'text': 'Error: No selected file'}), 400
    
        if file:
            # 파일을 메모리에 임시 저장
            file_content = file.read()
            file.seek(0)
    
            request_id = str(uuid.uuid4())
            request_queue.put((request_id, file_content))
    
            while True:
                response = response_queue.get(request_id)
                if response:
                    status, response_data = response
                    if status == 429:
                        update_global_delay() # <<<<< 여기
                    else:
                        current_delay = 5
                        return jsonify({'text': response_data}), status
                time.sleep(0.1)

     

    4. worker 에 429 응답 받았을시 요청큐에 해당 음성파일 다시 넣기

    def worker():
        while True:
            # 전역 지연 확인
            if check_global_delay():
                time.sleep(5)
                continue
                
           ``` 생략 ```
    
                # 429 에러 발생 시 요청을 다시 큐에 넣음
                if status == 429:
                    update_global_delay()
                    request_queue.put((request_id, file_content))
                else:
                    response_queue.put(request_id, (status, response_data))
    
    		``` 생략 ```

     

    전체 코드

    import os
    from dotenv import load_dotenv
    from flask import Blueprint, request, jsonify
    import stt.returnzero_service as returnzero_service
    import threading
    import queue
    import uuid
    import time
    import torchaudio
    from util.converter import convert_to_wav
    from stt.audio_processing.pre_processing import process_audio
    
    stt = Blueprint('stt', __name__)
    
    # .env 파일 로드
    load_dotenv()
    
    # 마지막 429 에러 시간을 추적하는 전역 변수와 딜레이 초기값
    last_429_time = 0
    current_delay = 5
    max_delay = 60
    
    
    def check_global_delay():
        global last_429_time, current_delay
        with lock:
            if last_429_time and time.time() - last_429_time < current_delay:
                print(f"waiting... {int(time.time() - last_429_time)}/{current_delay} sec")
                return True
        return False
    
    
    def update_global_delay():
        global last_429_time, current_delay
        with lock:
            last_429_time = time.time()
            current_delay = min(current_delay + 5, max_delay)
    
    
    # 동시 요청 수를 관리하기 위한 큐와 변수
    request_queue = queue.Queue()
    MAX_CONCURRENT_REQUESTS = int(os.getenv("MAX_CONCURRENT_REQUESTS", 1))
    lock = threading.Lock()
    
    
    # 응답을 관리하기 위한 큐
    class ResponseQueue:
        def __init__(self):
            self.responses = {}
            self.lock = threading.Lock()
    
        def put(self, request_id, response):
            with self.lock:
                self.responses[request_id] = response
    
        def get(self, request_id):
            with self.lock:
                return self.responses.pop(request_id, None)
    
    
    response_queue = ResponseQueue()
    
    
    def worker():
        while True:
            # 전역 지연 확인
            if check_global_delay():
                time.sleep(5)
                continue
    
            request_id, file_content = request_queue.get()
            try:
                # 파일을 wav로 변환
                wav_stream = convert_to_wav(file_content)
    
                # 메모리에서 torchaudio로 로드
                wav_stream.seek(0)
                waveform, sample_rate = torchaudio.load(wav_stream, format='wav')
    
                processed_waveform = process_audio(waveform, sample_rate)
    
                # 리턴제로 서비스 요청
                status, response_data = returnzero_service.request_text(processed_waveform, max_retries=3)
    
                # 429 에러 발생 시 요청을 다시 큐에 넣음
                if status == 429:
                    update_global_delay()
                    request_queue.put((request_id, file_content))
                else:
                    response_queue.put(request_id, (status, response_data))
    
            except Exception as e:
                print(f"Error: {str(e)}")
                response_queue.put(request_id, (500, str(e)))
            finally:
                request_queue.task_done()
    
    
    # 작업자 스레드 생성 및 시작
    for _ in range(MAX_CONCURRENT_REQUESTS):
        threading.Thread(target=worker, daemon=True).start()
    
    
    @stt.route('/', methods=['POST'])
    def speech_to_text():
        global current_delay
        # 전역 지연 확인
        if check_global_delay():
            return jsonify({'text': 'Too many requests, please try again later.'}), 429
    
        if 'file' not in request.files:
            return jsonify({'text': 'Error: No file part'}), 400
    
        file = request.files['file']
    
        if file.filename == '':
            return jsonify({'text': 'Error: No selected file'}), 400
    
        if file:
            # 파일을 메모리에 임시 저장
            file_content = file.read()
            file.seek(0)
    
            request_id = str(uuid.uuid4())
            request_queue.put((request_id, file_content))
    
            while True:
                response = response_queue.get(request_id)
                if response:
                    status, response_data = response
                    if status == 429:
                        update_global_delay()
                    else:
                        current_delay = 5
                        return jsonify({'text': response_data}), status
                time.sleep(0.1)

     

    429에러 발생시 지연이 되는지 테스트해본 사진

     

    nlp 서버에선 이렇게 처리했고, 메인서버에서 429 상태 코드를 받으면 클라이언트에게 sokect.emit 으로 stt 지연중이라는 메시지가 가게했다. 해당 메시지를 받은 클라이언트에선 alert로 "현재 stt 서비스가 지연중입니다." 라는 알림이 뜨도록 했다.

     

    마치며,

    서비스에 영향을 줄수있는 외부 요인을 개발하면서 고민하고 잘 정리해두는게 중요하다고 느꼈고, 서비스 장애가 있을때 이 상황을 사용자에게 알려주는것이 있는 것과 없는것이 사용경험에 큰차이를 준다는 것을 직접 사용해보고 느꼈다. 시연을 못한 후회에서 뒤늦게 추가한 기능이지만 이렇게라도 해결을 해놓고 나니 후련하다.

    'Projects > 나만의 무기' 카테고리의 다른 글

    나만무 중간 발표 For 멘토님  (0) 2024.06.27
    초안 발표  (1) 2024.06.14
Designed by Tistory.