Commit with offset. #56

Merged · 11 commits · Jan 22, 2025
8 changes: 7 additions & 1 deletion src/collection_manager/sides/hooks.rs
@@ -1,6 +1,6 @@
use anyhow::Result;
use axum_openapi3::utoipa::ToSchema;
use axum_openapi3::utoipa::{self, IntoParams};
use axum_openapi3::utoipa::{self};
use chrono::Utc;
use dashmap::DashMap;
use oxc_allocator::Allocator;
@@ -125,6 +125,12 @@ impl Debug for HooksRuntime {
}
}

impl Default for HooksRuntime {
fn default() -> Self {
Self::new()
}
}

impl HooksRuntime {
pub fn new() -> Self {
Self {
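The hooks.rs hunk above adds a `Default` implementation for `HooksRuntime` that simply delegates to `new()`, the usual way to satisfy Clippy's `new_without_default` lint when a no-argument constructor already exists. A minimal sketch of the same pattern on a hypothetical `Runtime` type (the names below are illustrative, not taken from this PR):

```rust
use std::collections::HashMap;

/// Hypothetical runtime type, standing in for `HooksRuntime`.
struct Runtime {
    hooks: HashMap<String, String>,
}

impl Runtime {
    /// The "real" constructor keeps all initialization logic in one place.
    pub fn new() -> Self {
        Self {
            hooks: HashMap::new(),
        }
    }
}

/// `Default` forwards to `new()`, so `Runtime::default()`,
/// `#[derive(Default)]` containers, and generic `Default::default()`
/// callers all go through the same construction path.
impl Default for Runtime {
    fn default() -> Self {
        Self::new()
    }
}

fn main() {
    let _runtime: Runtime = Default::default();
}
```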
114 changes: 65 additions & 49 deletions src/collection_manager/sides/read/collection.rs
@@ -19,7 +19,7 @@ use crate::{
FacetDefinition, FacetResult, FieldId, Filter, Limit, Properties, SearchMode,
SearchParams, TypedField,
},
sides::{CollectionWriteOperation, DocumentFieldIndexOperation},
sides::{CollectionWriteOperation, DocumentFieldIndexOperation, Offset},
},
embeddings::{EmbeddingService, LoadedModel},
file_utils::BufferedFile,
@@ -34,6 +34,7 @@ use crate::{
SEARCH_METRIC,
},
nlp::{locales::Locale, NLPService, TextParser},
offset_storage::OffsetStorage,
types::{CollectionId, DocumentId},
};

@@ -59,6 +60,7 @@ pub struct CollectionReader {
pub(super) number_index: NumberIndex,
pub(super) bool_index: BoolIndex,
// TODO: textparser -> vec<field_id>
offset_storage: OffsetStorage,
}

impl CollectionReader {
@@ -96,6 +98,8 @@ impl CollectionReader {
bool_index,

fields: Default::default(),

offset_storage: Default::default(),
})
}

@@ -126,13 +130,16 @@ impl CollectionReader {
.load(collection_data_dir.join("vectors"))
.context("Cannot load vectors index")?;

let coll_desc_file_path = collection_data_dir.join("desc.json");
let dump: CollectionDescriptorDump = BufferedFile::open(coll_desc_file_path)
let coll_desc_file_path = collection_data_dir.join("info.json");
let dump: CollectionInfo = BufferedFile::open(coll_desc_file_path)
.context("Cannot open collection file")?
.read_json_data()
.with_context(|| {
format!("Cannot deserialize collection descriptor for {:?}", self.id)
})?;
.with_context(|| format!("Cannot deserialize collection info for {:?}", self.id))?;

let dump = match dump {
CollectionInfo::V1(dump) => dump,
};

for (field_name, (field_id, field_type)) in dump.fields {
self.fields.insert(field_name, (field_id, field_type));
}
@@ -162,18 +169,18 @@ impl CollectionReader {
Ok(())
}

pub fn commit(&self, commit_config: CommitConfig) -> Result<()> {
pub fn commit(&self, data_dir: PathBuf) -> Result<()> {
self.string_index
.commit(commit_config.folder_to_commit.join("strings"))
.commit(data_dir.join("strings"))
.context("Cannot commit string index")?;
self.number_index
.commit(commit_config.folder_to_commit.join("numbers"))
.commit(data_dir.join("numbers"))
.context("Cannot commit number index")?;
self.vector_index
.commit(commit_config.folder_to_commit.join("vectors"))
.commit(data_dir.join("vectors"))
.context("Cannot commit vectors index")?;

let dump = CollectionDescriptorDump {
let dump = CollectionInfo::V1(CollectionInfoV1 {
id: self.id.clone(),
fields: self
.fields
@@ -191,13 +198,13 @@
(model.model_name(), field_ids.clone())
})
.collect(),
};
});

let coll_desc_file_path = commit_config.folder_to_commit.join("desc.json");
BufferedFile::create(coll_desc_file_path)
.context("Cannot create desc.json file")?
let coll_desc_file_path = data_dir.join("info.json");
BufferedFile::create_or_overwrite(coll_desc_file_path)
.context("Cannot create info.json file")?
.write_json_data(&dump)
.with_context(|| format!("Cannot serialize collection descriptor for {:?}", self.id))?;
.with_context(|| format!("Cannot serialize collection info for {:?}", self.id))?;

Ok(())
}
@@ -208,6 +215,7 @@

pub(super) async fn update(
&self,
offset: Offset,
collection_operation: CollectionWriteOperation,
) -> Result<()> {
match collection_operation {
@@ -222,6 +230,8 @@
self.fields
.insert(field_name.clone(), (field_id, typed_field.clone()));

self.offset_storage.set_offset(offset);

match typed_field {
TypedField::Embedding(embedding) => {
let loaded_model = self
@@ -230,7 +240,7 @@
.await?;

self.vector_index
.add_field(field_id, loaded_model.dimensions())?;
.add_field(offset, field_id, loaded_model.dimensions())?;

self.fields_per_model
.entry(loaded_model)
@@ -242,34 +252,38 @@
let text_parser = self.nlp_service.get(locale);
self.text_parser_per_field
.insert(field_id, (locale, text_parser));
self.string_index.add_field(offset, field_id);
}
_ => {}
}
}
CollectionWriteOperation::Index(doc_id, field_id, field_op) => match field_op {
DocumentFieldIndexOperation::IndexBoolean { value } => {
self.bool_index.add(doc_id, field_id, value)?;
}
DocumentFieldIndexOperation::IndexNumber { value } => {
self.number_index.add(doc_id, field_id, value)?;
}
DocumentFieldIndexOperation::IndexString {
field_length,
terms,
} => {
self.string_index
.insert(doc_id, field_id, field_length, terms)?;
}
DocumentFieldIndexOperation::IndexEmbedding { value } => {
// `insert_batch` is designed to process multiple values at once
// We are inserting only one value, and this is not good for performance
// We should add an API to accept a single value and avoid the rebuild step
// Instead, we could move the "rebuild" logic to the `VectorIndex`
// TODO: do it.
self.vector_index
.insert_batch(vec![(doc_id, field_id, vec![value])])?;
CollectionWriteOperation::Index(doc_id, field_id, field_op) => {
self.offset_storage.set_offset(offset);
match field_op {
DocumentFieldIndexOperation::IndexBoolean { value } => {
self.bool_index.add(offset, doc_id, field_id, value)?;
}
DocumentFieldIndexOperation::IndexNumber { value } => {
self.number_index.add(offset, doc_id, field_id, value)?;
}
DocumentFieldIndexOperation::IndexString {
field_length,
terms,
} => {
self.string_index
.insert(offset, doc_id, field_id, field_length, terms)?;
}
DocumentFieldIndexOperation::IndexEmbedding { value } => {
// `insert_batch` is designed to process multiple values at once
// We are inserting only one value, and this is not good for performance
// We should add an API to accept a single value and avoid the rebuild step
// Instead, we could move the "rebuild" logic to the `VectorIndex`
// TODO: do it.
self.vector_index
.insert_batch(offset, vec![(doc_id, field_id, vec![value])])?;
}
}
},
}
};

Ok(())
@@ -643,19 +657,21 @@
}
}

pub struct CommitConfig {
pub folder_to_commit: PathBuf,
pub epoch: u64,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Committed {
pub epoch: u64,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CollectionDescriptorDump {
pub id: CollectionId,
pub fields: Vec<(String, (FieldId, TypedField))>,
pub used_models: Vec<(String, Vec<FieldId>)>,
#[serde(tag = "version")]
enum CollectionInfo {
#[serde(rename = "1")]
V1(CollectionInfoV1),
}

#[derive(Debug, Serialize, Deserialize)]
struct CollectionInfoV1 {
id: CollectionId,
fields: Vec<(String, (FieldId, TypedField))>,
used_models: Vec<(String, Vec<FieldId>)>,
}
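
The plain `CollectionDescriptorDump` struct is replaced by a version-tagged `CollectionInfo` enum, and the dump file is renamed from `desc.json` to `info.json`, so the on-disk descriptor now carries an explicit `"version"` discriminator and future layouts can be added as new variants. A rough sketch of how such an internally tagged enum round-trips with `serde_json`, using simplified stand-in fields rather than the real `CollectionId`/`FieldId`/`TypedField` types:

```rust
use serde::{Deserialize, Serialize};

// Simplified stand-ins; only the tagging shape matters for this sketch.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "version")]
enum CollectionInfo {
    #[serde(rename = "1")]
    V1(CollectionInfoV1),
}

#[derive(Debug, Serialize, Deserialize)]
struct CollectionInfoV1 {
    id: String,
    fields: Vec<(String, u32)>,
}

fn main() -> serde_json::Result<()> {
    let dump = CollectionInfo::V1(CollectionInfoV1 {
        id: "my-collection".to_string(),
        fields: vec![("title".to_string(), 0)],
    });

    // Serializes to: {"version":"1","id":"my-collection","fields":[["title",0]]}
    let json = serde_json::to_string(&dump)?;
    println!("{json}");

    // Reading it back matches on the version tag, mirroring what `load` now does.
    let parsed: CollectionInfo = serde_json::from_str(&json)?;
    let v1 = match parsed {
        CollectionInfo::V1(v1) => v1,
    };
    assert_eq!(v1.id, "my-collection");
    Ok(())
}
```

Matching on the single `V1` variant, as `load` now does, keeps the deserialization site honest: an `info.json` written by a newer layout would fail with a clear error instead of being silently misread.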
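Across the read side, every apply path now receives an `Offset` and records it through `offset_storage.set_offset(offset)` before the per-field indexes are touched, and the same offset is threaded into `add_field`, `add`, `insert`, and `insert_batch`. The `OffsetStorage` type itself is not part of this diff; the following is only a sketch, assuming it merely has to remember the highest offset applied so far so that a restarted reader can resume from the right position:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative offset newtype; the real `Offset` comes from the write side.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Offset(pub u64);

/// Hypothetical storage: remembers only the last applied offset, monotonically.
#[derive(Debug, Default)]
pub struct OffsetStorage {
    last: AtomicU64,
}

impl OffsetStorage {
    /// Record `offset` if it is newer than what is already stored.
    pub fn set_offset(&self, offset: Offset) {
        self.last.fetch_max(offset.0, Ordering::SeqCst);
    }

    /// The offset the reader has caught up to, e.g. to persist at commit time.
    pub fn current_offset(&self) -> Offset {
        Offset(self.last.load(Ordering::SeqCst))
    }
}

fn main() {
    let storage = OffsetStorage::default();
    storage.set_offset(Offset(41));
    storage.set_offset(Offset(42));
    // Duplicate or out-of-order deliveries do not move the offset backwards.
    storage.set_offset(Offset(40));
    assert_eq!(storage.current_offset(), Offset(42));
}
```

Persisting that value alongside the committed indexes would let `commit(data_dir)` and the operation-log consumer agree on where to pick up again; whether the real implementation does exactly this is not visible in this section.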