11
25

챗봇 앱을 공모전을 준비하면서 개발하고 있다. 응답 전체가 오기까지 기다리면 응답 대기시간이 너무 길어져서 스트리밍으로 받아야했다. 아무 생각없이 레트로핏을 쓰면서 `text/event-stream`으로 사용했는데, 청크 단위로 스트리밍되는게 아니라 덩어리로 날아오는 걸 보고 골머리를 앓았다.

    @POST("stream")
    @Headers("Accept: text/event-stream")
    suspend fun fetchStreamResponse(
        @Body input: CommonRequest,
    ): ResponseBody

레트로핏 객체를 어떻게 만들었는지 보여주겠다.

@Module
@InstallIn(SingletonComponent::class)
object NetworkModule {
    @Provides
    @Singleton
    fun provideGson(): Gson = GsonBuilder().setLenient().create()

    @Singleton
    @Provides
    fun provideRetrofit(
        okHttpClient: OkHttpClient,
        gson: Gson,
    ): Retrofit =
        Retrofit
            .Builder()
            .addConverterFactory(ScalarsConverterFactory.create())
            .addConverterFactory(GsonConverterFactory.create(gson))
            .baseUrl(BuildConfig.BASE_URL)
            .client(okHttpClient)
            .build()

    @Singleton
    @Provides
    fun provideOkHttpClient(logger: HttpLoggingInterceptor) =
        OkHttpClient.Builder().run {
            addInterceptor(logger)
            connectTimeout(NETWORK_TIMEOUT, TimeUnit.SECONDS)
            readTimeout(NETWORK_TIMEOUT, TimeUnit.SECONDS)
            writeTimeout(NETWORK_TIMEOUT, TimeUnit.SECONDS)
            build()
        }

    @Singleton
    @Provides
    fun provideLoggingInterceptor(): HttpLoggingInterceptor {
        val loggingInterceptor =
            HttpLoggingInterceptor {
                when {
                    !it.isJsonArray() && !it.isJsonObject() ->
                        Timber.tag("RETROFIT").d("CONNECTION INFO: $it")

                    else ->
                        try {
                            Timber.tag("RETROFIT").d(
                                GsonBuilder().setPrettyPrinting().create().toJson(
                                    JsonParser().parse(it),
                                ),
                            )
                        } catch (m: JsonSyntaxException) {
                            Timber.tag("RETROFIT").d(it)
                        }
                }
            }
        loggingInterceptor.level = HttpLoggingInterceptor.Level.BODY
        return loggingInterceptor
    }

    const val NETWORK_TIMEOUT = 20L
}

LoggingInterceptor를 OkhttpClient에 넣고, 그걸 Retrofit에서 사용하게하는 코드다.

처음에는 `OkHttpClient()` 형태로 테스트해보며 잘 스트리밍 되는 걸 확인해서, 모듈에 있는 객체를 주입받아 쓰려고했다. 그랬더니 바로 덩어리로 바뀌어서 오는 걸 볼 수 있었다.

 

왜 그럴까? 

# LoggingInterceptor 내부

내부 코드를 보다보니 제일 문제가 되는 부분을 알 수 있었다.

  if (!logBody || !response.promisesBody()) {
    logger.log("<-- END HTTP")
  } else if (bodyHasUnknownEncoding(response.headers)) {
    logger.log("<-- END HTTP (encoded body omitted)")
  } else {
    val source = responseBody.source()
    source.request(Long.MAX_VALUE) // Buffer the entire body.
    var buffer = source.buffer

응답이 null이 아닌 경우, 응답 Body 전체를 버퍼에 로드시키는 과정이 있다... 스트리밍은 BufferedSource에서 바로 끌어다가 와야되는데 여기서 미리 가로채서 로드하는 바람에 응답이 전체 덩어리로 날아오게 되는 것이었다.

 

전체응답이 올 때 까지, 정확히 END BODY가 올 때까지 버퍼에 올리고 있기 때문에 스트리밍을 아예 안해버리는 것이다.

sequenceDiagram
    participant Client
    participant OkHttpClient
    participant LoggingInterceptor
    participant Network
    participant Server

    Note over Client,Server: 기존 방식 (LoggingInterceptor 없이)
    Client->>OkHttpClient: Request Streaming Data
    OkHttpClient->>Network: Forward Request
    Network->>Server: Send Request
    rect rgb(240, 240, 240)
        loop Streaming Response
            Server-->>Network: Chunk 1 전송
            Network-->>OkHttpClient: Chunk 1 포워딩
            OkHttpClient-->>Client: Chunk 1 처리
            Server-->>Network: Chunk 2 전송
            Network-->>OkHttpClient: Chunk 2 포워딩
            OkHttpClient-->>Client: Chunk 2 처리
        end
    end

    Note over Client,Server: LoggingInterceptor
    Client->>OkHttpClient: Request Streaming Data
    OkHttpClient->>LoggingInterceptor: Forward Request
    LoggingInterceptor->>Network: Forward Request
    Network->>Server: Send Request
    rect rgb(240, 240, 240)
        loop Buffering Response
            Server-->>Network: Chunk 1 전송
            Network-->>LoggingInterceptor: Chunk 1 수신
            LoggingInterceptor->>LoggingInterceptor: Chunk 버퍼 처리
            Server-->>Network: Chunk 2 전송
            Network-->>LoggingInterceptor: Chunk 2 수신
            LoggingInterceptor->>LoggingInterceptor: Chunk 버퍼 처리
        end
    end
    LoggingInterceptor->>OkHttpClient: Complete Response 반환
    OkHttpClient->>Client: Complete Response 처리

그림으로 그려봤다.

 

그래서 스트리밍을 정상적으로 하려면 인터셉터를 안쓰거나, 커스텀 인터셉터를 만들어서 스트리밍을 위한 처리를 따로 해줘야한다. 나는 스트림 전용 OkHttpClient를 따로 만들어서 Qualifier 선언해 해결했는데, 스트리밍 커스텀 인텁셉터는 아래와 같이 만들면 된다.

# Streaming 커스텀 인터셉터

방식은 단순하다. Interceptor를 상속받아서 구현할건데, Content-Type에 stream이 있으면 버퍼링을 수행하지않으면 끝난다.

if (!response.headers["Content-Type"].orEmpty().contains("stream")) {
    // 일반 응답 처리하는 곳
} else {
    println("Streaming response body - not logging content")
}

Intercepter 자체가 버퍼링을 수행하는 건 아니라서 잘 처리해주면 끝난다.

override fun intercept(chain: Interceptor.Chain): Response {
    val response = chain.proceed(chain.request())
        
    return response.newBuilder()
        .body(ResponseBody.create(
            response.body?.contentType(),
            response.body?.contentLength() ?: -1,
            object : ForwardingSource(response.body?.source()!!) {
                override fun read(sink: Buffer, byteCount: Long): Long {
                    val bytesRead = super.read(sink, byteCount)
                    // 청크 단위로 로깅
                    if (bytesRead > 0) {
                        println("Received chunk: ${bytesRead} bytes")
                    }
                    return bytesRead
                }
            }
        ))
        .build()
}

 

도움이 됐다면 댓글이나 공감 버튼 한 번씩 누르고 가주세요!

 

반응형
COMMENT