Skip to content

Commit d0d7eef

Browse files
authored
async build support for cardinal (zilliztech#873)
* add async build api Signed-off-by: yusheng.ma <yusheng.ma@zilliz.com> * add async build api Signed-off-by: yusheng.ma <yusheng.ma@zilliz.com> * add async build api Signed-off-by: yusheng.ma <yusheng.ma@zilliz.com> * add async build api Signed-off-by: yusheng.ma <yusheng.ma@zilliz.com> --------- Signed-off-by: yusheng.ma <yusheng.ma@zilliz.com>
1 parent dc57f43 commit d0d7eef

File tree

7 files changed

+178
-1
lines changed

7 files changed

+178
-1
lines changed

include/knowhere/expected.h

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ enum class Status {
4545
invalid_index_error = 23,
4646
invalid_cluster_error = 24,
4747
cluster_inner_error = 25,
48+
timeout = 26,
4849
};
4950

5051
inline std::string

include/knowhere/index/index.h

+11-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
#include "knowhere/dataset.h"
1818
#include "knowhere/expected.h"
1919
#include "knowhere/index/index_node.h"
20-
20+
#include "knowhere/index/interrupt.h"
2121
namespace knowhere {
22+
2223
template <typename T1>
2324
class Index {
2425
public:
@@ -141,6 +142,15 @@ class Index {
141142
Status
142143
Build(const DataSetPtr dataset, const Json& json);
143144

145+
// Asynchronous counterpart of Build(). The definitions in src/index/index.cc push the build
// onto the global build thread pool and return an Interrupt handle that stores the task's
// future; calling Get() on the handle waits for the task and yields its Status.
// With KNOWHERE_WITH_CARDINAL the handle is constructed with `timeout` (unbounded by default),
// and Get() reports Status::timeout when the handle was stopped or the timeout has elapsed.
#ifdef KNOWHERE_WITH_CARDINAL
146+
const std::shared_ptr<Interrupt>
147+
BuildAsync(const DataSetPtr dataset, const Json& json,
148+
const std::chrono::seconds timeout = std::chrono::seconds::max());
149+
#else
150+
const std::shared_ptr<Interrupt>
151+
BuildAsync(const DataSetPtr dataset, const Json& json);
152+
#endif
153+
144154
Status
145155
Train(const DataSetPtr dataset, const Json& json);
146156

include/knowhere/index/index_node.h

+12
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434

3535
namespace knowhere {
3636

37+
class Interrupt;
38+
3739
class IndexNode : public Object {
3840
public:
3941
IndexNode(const int32_t ver) : version_(ver) {
@@ -71,6 +73,16 @@ class IndexNode : public Object {
7173
return Add(dataset, std::move(cfg));
7274
}
7375

76+
/*
77+
* @brief Builds the index using the provided dataset, configuration and interrupt handle.
78+
*/
79+
#ifdef KNOWHERE_WITH_CARDINAL
80+
// Default interruptible build: the interrupt handle is ignored and the call is forwarded
// to the regular Build(). Subclasses may override this to honour the handle.
virtual Status
BuildAsync(const DataSetPtr dataset, std::shared_ptr<Config> cfg, const Interrupt* /*interrupt*/ = nullptr) {
    Status build_status = Build(dataset, std::move(cfg));
    return build_status;
}
84+
#endif
85+
7486
/**
7587
* @brief Trains the index model using the provided dataset and configuration.
7688
*

include/knowhere/index/interrupt.h

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright (C) 2019-2023 Zilliz. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
4+
// with the License. You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software distributed under the License
9+
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10+
// or implied. See the License for the specific language governing permissions and limitations under the License.
11+
#ifndef KNOWHERE_INTERRUPT_H
12+
#define KNOWHERE_INTERRUPT_H
13+
#include <atomic>
14+
#include <chrono>
15+
#include <memory>
16+
17+
#include "knowhere/expected.h"
18+
namespace folly {
19+
template <typename T>
20+
class Future;
21+
} // namespace folly
22+
23+
namespace knowhere {
24+
class Interrupt {
25+
public:
26+
#ifdef KNOWHERE_WITH_CARDINAL
27+
explicit Interrupt(const std::chrono::seconds& timeout);
28+
29+
void
30+
Stop();
31+
32+
bool
33+
Flag() const;
34+
35+
bool
36+
IsTimeout() const;
37+
#else
38+
Interrupt();
39+
#endif
40+
41+
Status
42+
Get();
43+
44+
void
45+
Set(folly::Future<Status>&& future);
46+
47+
~Interrupt();
48+
49+
private:
50+
#ifdef KNOWHERE_WITH_CARDINAL
51+
std::chrono::steady_clock::time_point start;
52+
std::chrono::seconds timeout;
53+
mutable std::atomic_bool flag = false;
54+
#endif
55+
std::unique_ptr<folly::Future<Status>> future;
56+
};
57+
} // namespace knowhere
58+
#endif /* KNOWHERE_INTERRUPT_H */

python/knowhere/knowhere.i

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ typedef uint64_t size_t;
1717
%ignore knowhere::IndexNode;
1818
%ignore knowhere::Index;
1919
%ignore knowhere::expected;
20+
%ignore knowhere::Interrupt;
2021
%{
2122
#include <stdint.h>
2223
#include <memory>

src/index/index.cc

+36
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include "knowhere/index/index.h"
1313

1414
#include "fmt/format.h"
15+
#include "folly/futures/Future.h"
16+
#include "knowhere/comp/thread_pool.h"
1517
#include "knowhere/comp/time_recorder.h"
1618
#include "knowhere/dataset.h"
1719
#include "knowhere/expected.h"
@@ -34,6 +36,40 @@ LoadConfig(BaseConfig* cfg, const Json& json, knowhere::PARAM_TYPE param_type, c
3436
return Config::Load(*cfg, json_, param_type, msg);
3537
}
3638

39+
#ifdef KNOWHERE_WITH_CARDINAL
40+
template <typename T>
41+
inline const std::shared_ptr<Interrupt>
42+
Index<T>::BuildAsync(const DataSetPtr dataset, const Json& json, const std::chrono::seconds timeout) {
43+
auto pool = ThreadPool::GetGlobalBuildThreadPool();
44+
auto interrupt = std::make_shared<Interrupt>(timeout);
45+
interrupt->Set(pool->push([this, dataset, &json, &interrupt]() {
46+
auto cfg = this->node->CreateConfig();
47+
RETURN_IF_ERROR(LoadConfig(cfg.get(), json, knowhere::TRAIN, "Build"));
48+
49+
#if defined(NOT_COMPILE_FOR_SWIG) && !defined(KNOWHERE_WITH_LIGHT)
50+
TimeRecorder rc("BuildAsync index ", 2);
51+
auto res = this->node->BuildAsync(dataset, std::move(cfg), interrupt.get());
52+
auto time = rc.ElapseFromBegin("done");
53+
time *= 0.000001; // convert to s
54+
knowhere_build_latency.Observe(time);
55+
#else
56+
auto res = this->node->BuildAsync(dataset, std::move(cfg), Interrupt.get());
57+
#endif
58+
return res;
59+
}));
60+
return interrupt;
61+
}
62+
#else
63+
template <typename T>
64+
inline const std::shared_ptr<Interrupt>
65+
Index<T>::BuildAsync(const DataSetPtr dataset, const Json& json) {
66+
auto pool = ThreadPool::GetGlobalBuildThreadPool();
67+
auto interrupt = std::make_shared<Interrupt>();
68+
interrupt->Set(pool->push([this, &dataset, &json]() { return this->Build(dataset, json); }));
69+
return interrupt;
70+
}
71+
#endif
72+
3773
template <typename T>
3874
inline Status
3975
Index<T>::Build(const DataSetPtr dataset, const Json& json) {

src/index/interrupt.cc

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright (C) 2019-2023 Zilliz. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
4+
// with the License. You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software distributed under the License
9+
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10+
// or implied. See the License for the specific language governing permissions and limitations under the License.
11+
12+
#include "knowhere/index/interrupt.h"
13+
14+
#include "folly/futures/Future.h"
15+
namespace knowhere {
16+
#ifdef KNOWHERE_WITH_CARDINAL
17+
Interrupt::Interrupt(const std::chrono::seconds& timeout) : start(std::chrono::steady_clock::now()), timeout(timeout) {
18+
}
19+
#else
20+
Interrupt::Interrupt() = default;
21+
#endif
22+
23+
#ifdef KNOWHERE_WITH_CARDINAL
24+
void
25+
Interrupt::Stop() {
26+
this->flag.store(true);
27+
};
28+
29+
bool
30+
Interrupt::Flag() const {
31+
return this->flag.load();
32+
}
33+
34+
bool
35+
Interrupt::IsTimeout() const {
36+
auto now = std::chrono::steady_clock::now();
37+
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - start);
38+
return dur.count() > timeout.count();
39+
}
40+
#endif
41+
/**
 * @brief Blocks until the future installed by Set() completes and returns the outcome.
 *
 * In KNOWHERE_WITH_CARDINAL builds Status::timeout is returned when the stop flag is set or
 * the timeout has elapsed, regardless of what the task itself produced. Otherwise the
 * task's own Status is returned.
 *
 * The result is read in place instead of being moved out of the future, so the stored future
 * stays valid and Get() can be called more than once on the same (shared) handle.
 *
 * @pre Set() has been called; there is no future to wait on before that.
 */
Status
Interrupt::Get() {
    future->wait();
#ifdef KNOWHERE_WITH_CARDINAL
    if (this->Flag() || this->IsTimeout()) {
        return Status::timeout;
    }
#endif
    return future->value();
}
50+
51+
void
52+
Interrupt::Set(folly::Future<Status>&& future) {
53+
this->future = std::make_unique<folly::Future<Status>>(std::move(future));
54+
}
55+
56+
// Defined in this translation unit, where folly::Future is a complete type, so the
// unique_ptr member can be destroyed.
Interrupt::~Interrupt() = default;
58+
59+
} // namespace knowhere

0 commit comments

Comments
 (0)