-
엘라스틱서치 update by query 성능 최적화Elasticsearch 2022. 7. 21. 17:44
환경
현재 Elasticsearch의 환경은 다음과 같다.
- Coordinates node : 2대
- Data node : 14대
준비된 index의 조건은 다음과 같다.
- shard 개수 : 14개
- replica 개수 : 1개
- 총 문서 개수 : 약 6억 8천만개
쿼리 조건
약 6억 8천만개의 전체 문서를 대상으로 4,400만건의 문서를 update by query로 업데이트 하려고 한다.
이 검증을 통하여 위와 같은 처리가 운영 환경에서 사용 가능할 지 판단해 보는 것이 목표였다.
일단 문서를 업데이트 하는 데에 얼마나 시간이 소요되는지 확인하는 것이 중요했다.
첫번째 시도 : 일단 요청해보기
테스트를 위해서 다음과 같은 Kotlin API 함수를 정의했다.
@PostMapping("/api/test/update-by-query") fun testUpdateByQuery( @RequestParam indexName: String, @RequestParam field: String, @RequestParam value: String, @RequestParam script: String, ): String { val response = elasticsearchClient.updateByQuery( UpdateByQueryRequest.Builder().index(indexName) .query( Query.Builder() .term(TermQuery.Builder() .field(field) .value(value) .build() ) .build() ) .script( Script.Builder() .inline( InlineScript.Builder() .source(script) .build() ) .build() ) .conflicts(Conflicts.Proceed) .build()) val result = StringBuilder("") result.append("took : ${response.took()}") result.append("updated : ${response.updated()}") result.append("failures size : ${response.failures().size}") result.append("timedOut : ${response.timedOut()}") return result.toString() }
유연한 테스트를 위해서 API Query parameter로 query에 사용될 field와 value, 그리고 update script를 전달하도록 했다.
위 API를 통해서 Elasticsearch로 요청될 쿼리는 다음과 같다.
POST test_index/_update_by_query { "query": { "bool": { "filter": [ { "term": { "field": "value" } } ] } }, "script": { "source": "ctx._source.변경될필드='변경!';", "lang": "painless" } }
Application에서 사용하는 Elasticsearch Java Client의 API Timeout 설정을 10800000ms (3시간)으로 설정해두었다.
얼마나 걸릴 지 예상할 수 없었기 때문에, 시간이 더 소요된다면 작업이 무의미하다고 판단되는 수로 지정하였다.
val restClient = RestClient.builder(*httpHosts) .setRequestConfigCallback { requestConfig -> // 3시간 requestConfig.setConnectTimeout(10800000) .setSocketTimeout(10800000) } .build()
결과는 다음과 같다.
Timeout 이다.
두번째 시도 : 병렬 처리 적용하기
공식문서를 천천히 살펴보면 다음과 같은 부분이 있다.
update by query는 sliced scroll로 병렬 업데이트를 가능하게 한다고 한다.
sliced scroll이 무엇인지 확인해보자.
위의 내용을 보니, Sliced scroll은 client에서 여러개의 요청을 보내야 하고, 각 요청의 id와 최대 몇 개의 요청을 보낼 것인지에 대한 정보 (max)를 기입하도록 되어있다.
Elasticsearch의 index는 여러개의 샤드로 분산되어 저장된다.
샤드는 다시 루씬 검색 엔진으로 확장되기 때문에 그 자체로 검색엔진의 역할을 할 수 있다.
즉, Elasticsearch로 보내진 요청은 Coordinate 서버에 의해서 데이터를 보관하고 있는 샤드가 위치한 데이터 노드로 해당 요청을 분산하여 보내고, 요청을 받은 데이터 노드 내에 샤드는 해당 쿼리를 독자적으로 수행한다.
이렇게 여러 샤드에서 개별적으로 처리된 결과는 통합되어 최종 결과로 리턴하게 된다.
정리하자면, 여러 샤드가 병렬로 해당 쿼리를 수행하도록 만들면 빠르게 처리할 수 있을 것이다.
공식문서는 그 내용을 잘 설명하고 있다.
그대로 따라해 보도록 하자.
Scrolled slice 적용된 update by query API 만들기
앞서 만들었던 일반적인 update by query API 처럼, Scrolled slice가 적용된 update by query API 만들어서 간편하게 테스트를 진행했다.
@PostMapping("/api/test/scrolled-update-by-query") fun testScrolledUpdateByQuery( @RequestParam indexName: String, @RequestParam field: String, @RequestParam value: String, @RequestParam script: String, ): String { val result = StringBuilder("") val max = 14 val apiRequests = mutableListOf<CompletableFuture<Void>>() val threadPool = Executors.newFixedThreadPool(max) for (i in 1..max) { apiRequests.add( CompletableFuture.supplyAsync({ Logger.info { "API 요청 $i 번째" } productElasticsearchClient.updateByQuery( UpdateByQueryRequest.Builder().index(indexName) .query( Query.Builder() .term( TermQuery.Builder() .field(field) .value(value) .build() ) .build() ) .script( Script.Builder() .inline( InlineScript.Builder() .source(script) .build() ) .build() ) .conflicts(Conflicts.Proceed) .scroll( Time.Builder() .time("180m") .build() ) .slice( SlicedScroll.Builder() .id(i - 1) .max(max) .build() ) .scrollSize(10_000L) .build() ) }, threadPool) .thenAcceptAsync { response -> Logger.info { "------------------ API 응답 $i 번째" } result.append("$i 번째 요청 ====> ") result.append("took : ${response.took()} ") result.append("updated : ${response.updated()} ") result.append("failures size : ${response.failures().size} ") result.append("timedOut : ${response.timedOut()} ") result.append("\\n\\n\\n") } ) } CompletableFuture.allOf(*apiRequests.toTypedArray()).join() return result.toString() }
코드에서 주목해야 할 부분은 다음과 같다.
slice의 개수는 샤드의 개수와 동일하게 설정해야 최대 성능이 발휘된다.
sliced scroll을 통해 병렬처리를 할 때에는 slice의 개수 설정이 중요하다. 어떻게 설정하느냐에 따라 성능이 천차만별로 나타나기 때문이다.
쿼리 성능은 slices 개수가 인덱스의 샤드 개수와 동일할 때 최대 성능을 발휘한다.
slices 개수를 샤드의 수보다 더 크게 설정하는 것은 일반적으로 overhead를 더할 뿐, 성능을 향상시키지 못한다.
실제로 500개의 slices로 요청을 보내본 결과 다음과 같은 결과가 나왔다.
- 건수 : 약 394만건
- scroll 조건 : 500 sliced / 180m scroll
- Indexing Rate : 최대 초당 10,000건
- 소요 시간 : 약 8분 3초
반면, 샤드의 개수와 동일하게 14개의 slices로 요청을 보낸 결과는 다음과 같다.
- 건수 : 약 394만건
- scroll 조건 : 14 sliced / 180m scroll
- Indexing Rate : 최대 초당 32,000건
- 소요 시간 : 약 2분 20초
무려 4배의 성능 우위를 보이는 만큼 slices 개수를 shard 개수와 동일하게 설정하는 것이 가장 중요하다.
최종적으로 약 4400만건에 대해 업데이트한 결과는 다음과 같다.
- 건수 : 약 4400만건
- scroll 조건 : 14 sliced / 180m scroll
- Indexing Rate : 최대 초당 30,000건
- 소요 시간 : 약 29분 18초
- scroll size : 1,000 (default)
약 29분 18초만에 4,400만건에 대한 변경 처리를 할 수 있었다.
scroll size를 10,000으로 지정하면 성능이 더 좋아지는 것을 확인할 수 있다.
- 건수 : 약 4400만건
- scroll 조건 : 14 sliced / 180m scroll
- Indexing Rate : 최대 초당 40,000건
- 소요 시간 : 약 18분 18초
- scroll size : 10,000
반응형'Elasticsearch' 카테고리의 다른 글
엘라스틱서치 - Ubuntu 서버에 데이터 노드 추가하기 (0) 2022.01.12