oku_fs/database/posts/
operations.rs1use super::super::core::*;
2use super::core::OkuPost;
3use super::core::POST_INDEX;
4use super::core::POST_INDEX_READER;
5use super::core::POST_INDEX_WRITER;
6use super::core::POST_SCHEMA;
7use crate::fs::util::path_to_entry_key;
8use iroh_docs::AuthorId;
9use miette::IntoDiagnostic;
10use native_db::*;
11use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
12use std::{collections::HashSet, path::PathBuf};
13use tantivy::{collector::TopDocs, query::QueryParser, TantivyDocument};
14
15impl OkuDatabase {
16    pub fn search_posts(
28        query_string: &str,
29        result_limit: &Option<usize>,
30    ) -> miette::Result<Vec<OkuPost>> {
31        let searcher = POST_INDEX_READER.searcher();
32        let query_parser = QueryParser::for_index(
33            &POST_INDEX,
34            vec![
35                POST_SCHEMA.1["author_id"],
36                POST_SCHEMA.1["path"],
37                POST_SCHEMA.1["title"],
38                POST_SCHEMA.1["body"],
39                POST_SCHEMA.1["tag"],
40            ],
41        );
42        let query = query_parser.parse_query(query_string).into_diagnostic()?;
43        let limit = result_limit.unwrap_or(10);
44        let top_docs = searcher
45            .search(&query, &TopDocs::with_limit(limit))
46            .into_diagnostic()?;
47        Ok(top_docs
48            .par_iter()
49            .filter_map(|x| searcher.doc(x.1).ok())
50            .collect::<Vec<TantivyDocument>>()
51            .into_par_iter()
52            .filter_map(|x| TryInto::try_into(x).ok())
53            .collect())
54    }
55
56    pub fn upsert_post(&self, post: &OkuPost) -> miette::Result<Option<OkuPost>> {
66        let rw: transaction::RwTransaction<'_> =
67            self.database.rw_transaction().into_diagnostic()?;
68        let old_value: Option<OkuPost> = rw.upsert(post.clone()).into_diagnostic()?;
69        rw.commit().into_diagnostic()?;
70
71        let mut index_writer = POST_INDEX_WRITER
72            .clone()
73            .try_lock_owned()
74            .into_diagnostic()?;
75        if let Some(old_post) = old_value.clone() {
76            index_writer.delete_term(old_post.index_term());
77        }
78        index_writer
79            .add_document(post.to_owned().into())
80            .into_diagnostic()?;
81        index_writer.commit().into_diagnostic()?;
82
83        Ok(old_value)
84    }
85
86    pub fn upsert_posts(&self, posts: &Vec<OkuPost>) -> miette::Result<Vec<Option<OkuPost>>> {
96        let rw = self.database.rw_transaction().into_diagnostic()?;
97        let old_posts: Vec<_> = posts
98            .clone()
99            .into_iter()
100            .filter_map(|post| rw.upsert(post).ok())
101            .collect();
102        rw.commit().into_diagnostic()?;
103
104        let mut index_writer = POST_INDEX_WRITER
105            .clone()
106            .try_lock_owned()
107            .into_diagnostic()?;
108        old_posts.par_iter().for_each(|old_post| {
109            if let Some(old_post) = old_post {
110                index_writer.delete_term(old_post.index_term());
111            }
112        });
113        posts.par_iter().for_each(|post| {
114            let _ = index_writer.add_document(post.clone().into());
115        });
116        index_writer.commit().into_diagnostic()?;
117
118        Ok(old_posts)
119    }
120
121    pub fn delete_post(&self, post: &OkuPost) -> miette::Result<OkuPost> {
131        let rw = self.database.rw_transaction().into_diagnostic()?;
132        let removed_post = rw.remove(post.to_owned()).into_diagnostic()?;
133        rw.commit().into_diagnostic()?;
134
135        let mut index_writer = POST_INDEX_WRITER
136            .clone()
137            .try_lock_owned()
138            .into_diagnostic()?;
139        index_writer.delete_term(removed_post.index_term());
140        index_writer.commit().into_diagnostic()?;
141
142        Ok(removed_post)
143    }
144
145    pub fn delete_posts(&self, posts: &[OkuPost]) -> miette::Result<Vec<OkuPost>> {
155        let rw = self.database.rw_transaction().into_diagnostic()?;
156        let removed_posts: Vec<_> = posts
157            .iter()
158            .filter_map(|post| rw.remove(post.to_owned()).ok())
159            .collect();
160        rw.commit().into_diagnostic()?;
161
162        let mut index_writer = POST_INDEX_WRITER
163            .clone()
164            .try_lock_owned()
165            .into_diagnostic()?;
166        removed_posts.par_iter().for_each(|removed_post| {
167            index_writer.delete_term(removed_post.index_term());
168        });
169        index_writer.commit().into_diagnostic()?;
170
171        Ok(removed_posts)
172    }
173
174    pub fn get_posts(&self) -> miette::Result<Vec<OkuPost>> {
180        let r = self.database.r_transaction().into_diagnostic()?;
181        r.scan()
182            .primary()
183            .into_diagnostic()?
184            .all()
185            .into_diagnostic()?
186            .collect::<Result<Vec<_>, _>>()
187            .into_diagnostic()
188    }
189
190    pub fn get_posts_by_author(&self, author_id: &AuthorId) -> miette::Result<Vec<OkuPost>> {
200        Ok(self
201            .get_posts()?
202            .into_par_iter()
203            .filter(|x| x.entry.author() == *author_id)
204            .collect())
205    }
206
207    pub fn get_posts_by_tag(&self, tag: &String) -> miette::Result<Vec<OkuPost>> {
217        Ok(self
218            .get_posts()?
219            .into_par_iter()
220            .filter(|x| x.note.tags.contains(tag))
221            .collect())
222    }
223
224    pub fn get_tags(&self) -> miette::Result<HashSet<String>> {
230        Ok(self
231            .get_posts()?
232            .into_iter()
233            .flat_map(|x| x.note.tags)
234            .collect())
235    }
236
237    pub fn get_post(
249        &self,
250        author_id: &AuthorId,
251        path: &PathBuf,
252    ) -> miette::Result<Option<OkuPost>> {
253        let r = self.database.r_transaction().into_diagnostic()?;
254        let entry_key = (
255            author_id.as_bytes().to_vec(),
256            path_to_entry_key(path).to_vec(),
257        );
258        r.get().primary(entry_key).into_diagnostic()
259    }
260}