oku_fs/database/posts/
operations.rs1use super::super::core::*;
2use super::core::OkuPost;
3use super::core::POST_INDEX;
4use super::core::POST_INDEX_READER;
5use super::core::POST_INDEX_WRITER;
6use super::core::POST_SCHEMA;
7use crate::fs::util::path_to_entry_key;
8use iroh_docs::AuthorId;
9use miette::IntoDiagnostic;
10use native_db::*;
11use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
12use std::{collections::HashSet, path::PathBuf};
13use tantivy::{collector::TopDocs, query::QueryParser, TantivyDocument};
14
15impl OkuDatabase {
16 pub fn search_posts(
28 query_string: &str,
29 result_limit: &Option<usize>,
30 ) -> miette::Result<Vec<OkuPost>> {
31 let searcher = POST_INDEX_READER.searcher();
32 let query_parser = QueryParser::for_index(
33 &POST_INDEX,
34 vec![
35 POST_SCHEMA.1["author_id"],
36 POST_SCHEMA.1["path"],
37 POST_SCHEMA.1["title"],
38 POST_SCHEMA.1["body"],
39 POST_SCHEMA.1["tag"],
40 ],
41 );
42 let query = query_parser.parse_query(query_string).into_diagnostic()?;
43 let limit = result_limit.unwrap_or(10);
44 let top_docs = searcher
45 .search(&query, &TopDocs::with_limit(limit))
46 .into_diagnostic()?;
47 Ok(top_docs
48 .par_iter()
49 .filter_map(|x| searcher.doc(x.1).ok())
50 .collect::<Vec<TantivyDocument>>()
51 .into_par_iter()
52 .filter_map(|x| TryInto::try_into(x).ok())
53 .collect())
54 }
55
56 pub fn upsert_post(&self, post: &OkuPost) -> miette::Result<Option<OkuPost>> {
66 let rw: transaction::RwTransaction<'_> =
67 self.database.rw_transaction().into_diagnostic()?;
68 let old_value: Option<OkuPost> = rw.upsert(post.clone()).into_diagnostic()?;
69 rw.commit().into_diagnostic()?;
70
71 let mut index_writer = POST_INDEX_WRITER
72 .clone()
73 .try_lock_owned()
74 .into_diagnostic()?;
75 if let Some(old_post) = old_value.clone() {
76 index_writer.delete_term(old_post.index_term());
77 }
78 index_writer
79 .add_document(post.to_owned().into())
80 .into_diagnostic()?;
81 index_writer.commit().into_diagnostic()?;
82
83 Ok(old_value)
84 }
85
86 pub fn upsert_posts(&self, posts: &Vec<OkuPost>) -> miette::Result<Vec<Option<OkuPost>>> {
96 let rw = self.database.rw_transaction().into_diagnostic()?;
97 let old_posts: Vec<_> = posts
98 .clone()
99 .into_iter()
100 .filter_map(|post| rw.upsert(post).ok())
101 .collect();
102 rw.commit().into_diagnostic()?;
103
104 let mut index_writer = POST_INDEX_WRITER
105 .clone()
106 .try_lock_owned()
107 .into_diagnostic()?;
108 old_posts.par_iter().for_each(|old_post| {
109 if let Some(old_post) = old_post {
110 index_writer.delete_term(old_post.index_term());
111 }
112 });
113 posts.par_iter().for_each(|post| {
114 let _ = index_writer.add_document(post.clone().into());
115 });
116 index_writer.commit().into_diagnostic()?;
117
118 Ok(old_posts)
119 }
120
121 pub fn delete_post(&self, post: &OkuPost) -> miette::Result<OkuPost> {
131 let rw = self.database.rw_transaction().into_diagnostic()?;
132 let removed_post = rw.remove(post.to_owned()).into_diagnostic()?;
133 rw.commit().into_diagnostic()?;
134
135 let mut index_writer = POST_INDEX_WRITER
136 .clone()
137 .try_lock_owned()
138 .into_diagnostic()?;
139 index_writer.delete_term(removed_post.index_term());
140 index_writer.commit().into_diagnostic()?;
141
142 Ok(removed_post)
143 }
144
145 pub fn delete_posts(&self, posts: &[OkuPost]) -> miette::Result<Vec<OkuPost>> {
155 let rw = self.database.rw_transaction().into_diagnostic()?;
156 let removed_posts: Vec<_> = posts
157 .iter()
158 .filter_map(|post| rw.remove(post.to_owned()).ok())
159 .collect();
160 rw.commit().into_diagnostic()?;
161
162 let mut index_writer = POST_INDEX_WRITER
163 .clone()
164 .try_lock_owned()
165 .into_diagnostic()?;
166 removed_posts.par_iter().for_each(|removed_post| {
167 index_writer.delete_term(removed_post.index_term());
168 });
169 index_writer.commit().into_diagnostic()?;
170
171 Ok(removed_posts)
172 }
173
174 pub fn get_posts(&self) -> miette::Result<Vec<OkuPost>> {
180 let r = self.database.r_transaction().into_diagnostic()?;
181 r.scan()
182 .primary()
183 .into_diagnostic()?
184 .all()
185 .into_diagnostic()?
186 .collect::<Result<Vec<_>, _>>()
187 .into_diagnostic()
188 }
189
190 pub fn get_posts_by_author(&self, author_id: &AuthorId) -> miette::Result<Vec<OkuPost>> {
200 Ok(self
201 .get_posts()?
202 .into_par_iter()
203 .filter(|x| x.entry.author() == *author_id)
204 .collect())
205 }
206
207 pub fn get_posts_by_tag(&self, tag: &String) -> miette::Result<Vec<OkuPost>> {
217 Ok(self
218 .get_posts()?
219 .into_par_iter()
220 .filter(|x| x.note.tags.contains(tag))
221 .collect())
222 }
223
224 pub fn get_tags(&self) -> miette::Result<HashSet<String>> {
230 Ok(self
231 .get_posts()?
232 .into_iter()
233 .flat_map(|x| x.note.tags)
234 .collect())
235 }
236
237 pub fn get_post(
249 &self,
250 author_id: &AuthorId,
251 path: &PathBuf,
252 ) -> miette::Result<Option<OkuPost>> {
253 let r = self.database.r_transaction().into_diagnostic()?;
254 let entry_key = (
255 author_id.as_bytes().to_vec(),
256 path_to_entry_key(path).to_vec(),
257 );
258 r.get().primary(entry_key).into_diagnostic()
259 }
260}