oku_fs/database/posts/
operations.rs1use super::super::core::*;
2use super::core::OkuPost;
3use super::core::POST_INDEX;
4use super::core::POST_INDEX_READER;
5use super::core::POST_INDEX_WRITER;
6use super::core::POST_SCHEMA;
7use crate::fs::util::path_to_entry_key;
8use iroh_docs::AuthorId;
9use log::error;
10use miette::IntoDiagnostic;
11use native_db::*;
12use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
13use std::{collections::HashSet, path::PathBuf};
14use tantivy::{collector::TopDocs, query::QueryParser, TantivyDocument};
15
16impl OkuDatabase {
17 pub fn search_posts(
29 query_string: &str,
30 result_limit: &Option<usize>,
31 ) -> miette::Result<Vec<OkuPost>> {
32 let searcher = POST_INDEX_READER.searcher();
33 let query_parser = QueryParser::for_index(
34 &POST_INDEX,
35 vec![
36 POST_SCHEMA.1["author_id"],
37 POST_SCHEMA.1["path"],
38 POST_SCHEMA.1["title"],
39 POST_SCHEMA.1["body"],
40 POST_SCHEMA.1["tag"],
41 ],
42 );
43 let query = query_parser.parse_query(query_string).into_diagnostic()?;
44 let limit = result_limit.unwrap_or(10);
45 let top_docs = searcher
46 .search(&query, &TopDocs::with_limit(limit))
47 .into_diagnostic()?;
48 Ok(top_docs
49 .par_iter()
50 .filter_map(|x| searcher.doc(x.1).ok())
51 .collect::<Vec<TantivyDocument>>()
52 .into_par_iter()
53 .filter_map(|x| TryInto::try_into(x).ok())
54 .collect())
55 }
56
57 pub fn upsert_post(&self, post: &OkuPost) -> miette::Result<Option<OkuPost>> {
67 let rw: transaction::RwTransaction<'_> =
68 self.database.rw_transaction().into_diagnostic()?;
69 let old_value: Option<OkuPost> = rw.upsert(post.clone()).into_diagnostic()?;
70 rw.commit().into_diagnostic()?;
71
72 let mut index_writer = POST_INDEX_WRITER
73 .clone()
74 .try_lock_owned()
75 .into_diagnostic()?;
76 if let Some(old_post) = old_value.clone() {
77 index_writer.delete_term(old_post.index_term());
78 }
79 index_writer
80 .add_document(post.to_owned().into())
81 .into_diagnostic()?;
82 index_writer.commit().into_diagnostic()?;
83
84 Ok(old_value)
85 }
86
87 pub fn upsert_posts(&self, posts: &Vec<OkuPost>) -> miette::Result<Vec<Option<OkuPost>>> {
97 let rw = self.database.rw_transaction().into_diagnostic()?;
98 let old_posts: Vec<_> = posts
99 .clone()
100 .into_iter()
101 .filter_map(|post| rw.upsert(post).ok())
102 .collect();
103 rw.commit().into_diagnostic()?;
104
105 let mut index_writer = POST_INDEX_WRITER
106 .clone()
107 .try_lock_owned()
108 .into_diagnostic()?;
109 old_posts.par_iter().for_each(|old_post| {
110 if let Some(old_post) = old_post {
111 index_writer.delete_term(old_post.index_term());
112 }
113 });
114 posts.par_iter().for_each(|post| {
115 if let Err(e) = index_writer.add_document(post.clone().into()) {
116 error!("{e}");
117 }
118 });
119 index_writer.commit().into_diagnostic()?;
120
121 Ok(old_posts)
122 }
123
124 pub fn delete_post(&self, post: &OkuPost) -> miette::Result<OkuPost> {
134 let rw = self.database.rw_transaction().into_diagnostic()?;
135 let removed_post = rw.remove(post.to_owned()).into_diagnostic()?;
136 rw.commit().into_diagnostic()?;
137
138 let mut index_writer = POST_INDEX_WRITER
139 .clone()
140 .try_lock_owned()
141 .into_diagnostic()?;
142 index_writer.delete_term(removed_post.index_term());
143 index_writer.commit().into_diagnostic()?;
144
145 Ok(removed_post)
146 }
147
148 pub fn delete_posts(&self, posts: &[OkuPost]) -> miette::Result<Vec<OkuPost>> {
158 let rw = self.database.rw_transaction().into_diagnostic()?;
159 let removed_posts: Vec<_> = posts
160 .iter()
161 .filter_map(|post| rw.remove(post.to_owned()).ok())
162 .collect();
163 rw.commit().into_diagnostic()?;
164
165 let mut index_writer = POST_INDEX_WRITER
166 .clone()
167 .try_lock_owned()
168 .into_diagnostic()?;
169 removed_posts.par_iter().for_each(|removed_post| {
170 index_writer.delete_term(removed_post.index_term());
171 });
172 index_writer.commit().into_diagnostic()?;
173
174 Ok(removed_posts)
175 }
176
177 pub fn get_posts(&self) -> miette::Result<Vec<OkuPost>> {
183 let r = self.database.r_transaction().into_diagnostic()?;
184 r.scan()
185 .primary()
186 .into_diagnostic()?
187 .all()
188 .into_diagnostic()?
189 .collect::<Result<Vec<_>, _>>()
190 .into_diagnostic()
191 }
192
193 pub fn get_posts_by_author(&self, author_id: &AuthorId) -> miette::Result<Vec<OkuPost>> {
203 Ok(self
204 .get_posts()?
205 .into_par_iter()
206 .filter(|x| x.entry.author() == *author_id)
207 .collect())
208 }
209
210 pub fn get_posts_by_tag(&self, tag: &String) -> miette::Result<Vec<OkuPost>> {
220 Ok(self
221 .get_posts()?
222 .into_par_iter()
223 .filter(|x| x.note.tags.contains(tag))
224 .collect())
225 }
226
227 pub fn get_tags(&self) -> miette::Result<HashSet<String>> {
233 Ok(self
234 .get_posts()?
235 .into_iter()
236 .flat_map(|x| x.note.tags)
237 .collect())
238 }
239
240 pub fn get_post(
252 &self,
253 author_id: &AuthorId,
254 path: &PathBuf,
255 ) -> miette::Result<Option<OkuPost>> {
256 let r = self.database.r_transaction().into_diagnostic()?;
257 let entry_key = (
258 author_id.as_bytes().to_vec(),
259 path_to_entry_key(path).to_vec(),
260 );
261 r.get().primary(entry_key).into_diagnostic()
262 }
263}