카테고리 없음

스프링 워커풀

have a nice day :D 2025. 11. 21. 08:27
반응형

스프링 워커풀 설정
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

@Configuration
class ExportTaskExecutorConfig {

    @Bean
    fun exportTaskExecutor(): ThreadPoolTaskExecutor {
        return ThreadPoolTaskExecutor().apply {
            threadNamePrefix = "export-worker-"

            // 👉 동시에 몇 개까지 무거운 작업을 돌릴지
            corePoolSize = 2          // 동시에 처리할 작업 개수
            maxPoolSize = 2           // 그냥 core랑 같게 고정

            // 👉 대기열에 쌓을 최대 개수 (넘어가면 Rejected)
            queueCapacity = 20

            // 애플리케이션 내려갈 때 작업 다 끝날 때까지 기다릴지 여부
            setWaitForTasksToCompleteOnShutdown(true)

            // Rejected 될 때 기본은 예외 터뜨림 (원하면 커스터마이징 가능)
            // setRejectedExecutionHandler { r, executor -> ... }

            initialize()
        }
    }
}





워커풀 사용 예
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.stereotype.Service

@Service
class ExportWorkerService(
    @Qualifier("exportTaskExecutor")
    private val taskExecutor: ThreadPoolTaskExecutor
) {

    fun submitJob(jobId: Long) {
        // 여기서 바로 무거운 작업 하지 말고 워커 풀에 던짐
        taskExecutor.execute {
            runJob(jobId)
        }
    }

    private fun runJob(jobId: Long) {
        // 👉 여기 안에서 DB 조회 → CSV → MinIO 업로드 등
        // 실제 무거운 작업 수행
        println("start job: $jobId, thread=${Thread.currentThread().name}")

        // 예시)
        // val task = downloadTaskRepository.findById(jobId) ...
        // 상태 RUNNING 변경
        // DB 스트리밍 조회 + MinIO로 CSV 스트리밍 업로드
        // 상태 DONE / FAIL 업데이트
    }
}



kafka
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Service

@Service
class ExportRequestConsumer(
    private val exportWorkerService: ExportWorkerService
) {

    @KafkaListener(
        topics = ["export-request-topic"],
        groupId = "export-consumer-group"
    )
    fun onMessage(message: ExportRequestMessage) {
        // ❌ 여기서 무거운 작업 X
        // ✅ 워커 풀에만 던짐
        exportWorkerService.submitJob(message.jobId)
    }
}

반응형