스프링 워커풀 설정
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
@Configuration
class ExportTaskExecutorConfig {
@Bean
fun exportTaskExecutor(): ThreadPoolTaskExecutor {
return ThreadPoolTaskExecutor().apply {
threadNamePrefix = "export-worker-"
// 👉 동시에 몇 개까지 무거운 작업을 돌릴지
corePoolSize = 2 // 동시에 처리할 작업 개수
maxPoolSize = 2 // 그냥 core랑 같게 고정
// 👉 대기열에 쌓을 최대 개수 (넘어가면 Rejected)
queueCapacity = 20
// 애플리케이션 내려갈 때 작업 다 끝날 때까지 기다릴지 여부
setWaitForTasksToCompleteOnShutdown(true)
// Rejected 될 때 기본은 예외 터뜨림 (원하면 커스터마이징 가능)
// setRejectedExecutionHandler { r, executor -> ... }
initialize()
}
}
}
워커풀 사용 예
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.stereotype.Service
@Service
class ExportWorkerService(
@Qualifier("exportTaskExecutor")
private val taskExecutor: ThreadPoolTaskExecutor
) {
fun submitJob(jobId: Long) {
// 여기서 바로 무거운 작업 하지 말고 워커 풀에 던짐
taskExecutor.execute {
runJob(jobId)
}
}
private fun runJob(jobId: Long) {
// 👉 여기 안에서 DB 조회 → CSV → MinIO 업로드 등
// 실제 무거운 작업 수행
println("start job: $jobId, thread=${Thread.currentThread().name}")
// 예시)
// val task = downloadTaskRepository.findById(jobId) ...
// 상태 RUNNING 변경
// DB 스트리밍 조회 + MinIO로 CSV 스트리밍 업로드
// 상태 DONE / FAIL 업데이트
}
}
kafka
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Service
@Service
class ExportRequestConsumer(
private val exportWorkerService: ExportWorkerService
) {
@KafkaListener(
topics = ["export-request-topic"],
groupId = "export-consumer-group"
)
fun onMessage(message: ExportRequestMessage) {
// ❌ 여기서 무거운 작업 X
// ✅ 워커 풀에만 던짐
exportWorkerService.submitJob(message.jobId)
}
}