1use super::core::home_replica_filters;
2use crate::fs::util::entry_key_to_path;
3use crate::{
4 database::{
5 core::DATABASE,
6 posts::core::{OkuNote, OkuPost},
7 users::OkuUser,
8 },
9 fs::OkuFs,
10};
11use dashmap::DashMap;
12use iroh_blobs::Hash;
13use iroh_docs::rpc::client::docs::Entry;
14use iroh_docs::AuthorId;
15use log::error;
16use miette::IntoDiagnostic;
17use rayon::iter::{
18 FromParallelIterator, IntoParallelIterator, IntoParallelRefIterator, ParallelIterator,
19};
20use std::{
21 collections::{HashMap, HashSet},
22 path::{Path, PathBuf},
23 sync::atomic::AtomicUsize,
24};
25use url::Url;
26
27impl OkuFs {
28 pub async fn posts(&self) -> Option<Vec<OkuPost>> {
34 let post_files = self
35 .read_directory(&self.home_replica().await?, Path::new("/posts/"))
36 .await
37 .ok()
38 .unwrap_or_default();
39 Some(
40 post_files
41 .par_iter()
42 .filter(|(entry, _)| {
43 entry_key_to_path(entry.key())
44 .map(|x| matches!(x.extension(), Some(y) if y == "okupost"))
45 .unwrap_or(false)
46 })
47 .filter_map(|(entry, bytes)| {
48 toml::from_str::<OkuNote>(String::from_utf8_lossy(bytes).as_ref())
49 .ok()
50 .map(|x| OkuPost {
51 entry: entry.clone(),
52 note: x,
53 })
54 })
55 .collect(),
56 )
57 }
58
59 pub async fn all_posts(&self) -> HashSet<OkuPost> {
65 let mut posts = HashSet::<_>::from_par_iter(self.posts().await.unwrap_or_default());
66 posts.extend(DATABASE.get_posts().unwrap_or_default());
67 posts
68 }
69
70 pub async fn posts_with_tags(&self, posts: &[OkuPost], tags: &HashSet<String>) -> Vec<OkuPost> {
82 posts
83 .to_owned()
84 .into_par_iter()
85 .filter(|x| !x.note.tags.is_disjoint(tags))
86 .collect()
87 }
88
89 pub async fn all_tags(&self, posts: &HashSet<OkuPost>) -> HashSet<String> {
99 HashSet::<_>::from_par_iter(posts.into_par_iter().flat_map(|x| x.note.tags.clone()))
100 }
101
102 pub async fn count_tags(&self, posts: &HashSet<OkuPost>) -> HashMap<String, usize> {
112 let result: DashMap<String, AtomicUsize> = DashMap::new();
113 posts.into_par_iter().for_each(|x| {
114 x.note.tags.par_iter().for_each(|y| match result.get(y) {
115 None => {
116 result.insert(y.to_owned(), AtomicUsize::new(1));
117 }
118 Some(v) => {
119 v.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
120 }
121 });
122 });
123 result
124 .into_par_iter()
125 .map(|(k, v)| (k, v.into_inner()))
126 .collect()
127 }
128
129 pub async fn post(&self, path: &PathBuf) -> miette::Result<OkuPost> {
139 let namespace_id = self
140 .home_replica()
141 .await
142 .ok_or(miette::miette!("Home replica not set … "))?;
143 match self.read_file(&namespace_id, path).await {
144 Ok(bytes) => {
145 let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
146 .into_diagnostic()?;
147 Ok(OkuPost {
148 entry: self.get_entry(&namespace_id, path).await?,
149 note,
150 })
151 }
152 Err(e) => Err(miette::miette!("{}", e)),
153 }
154 }
155
156 pub async fn post_from_entry(&self, entry: &Entry) -> miette::Result<OkuPost> {
166 let bytes = self
167 .content_bytes(entry)
168 .await
169 .map_err(|e| miette::miette!("{}", e))?;
170 let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
171 .into_diagnostic()?;
172 Ok(OkuPost {
173 entry: entry.clone(),
174 note,
175 })
176 }
177
178 pub async fn posts_from_user(&self, user: &OkuUser) -> miette::Result<Vec<OkuPost>> {
188 let mut posts: Vec<_> = Vec::new();
189 for post in user.posts.clone() {
190 posts.push(self.post_from_entry(&post).await?);
191 }
192 Ok(posts)
193 }
194
195 pub async fn create_or_modify_post(
213 &self,
214 path: &Option<PathBuf>,
215 url: &Url,
216 title: &String,
217 body: &String,
218 tags: &HashSet<String>,
219 ) -> miette::Result<Hash> {
220 let home_replica_id = self
221 .home_replica()
222 .await
223 .ok_or(miette::miette!("No home replica set … "))?;
224 let new_note = OkuNote {
225 url: url.clone(),
226 title: title.to_string(),
227 body: body.to_string(),
228 tags: tags.clone(),
229 };
230 let post_path = match path {
231 Some(given_path) => given_path,
232 None => &new_note.suggested_post_path().into(),
233 };
234 self.create_or_modify_file(
235 &home_replica_id,
236 post_path,
237 toml::to_string_pretty(&new_note).into_diagnostic()?,
238 )
239 .await
240 }
241
242 pub async fn delete_post(&self, path: &PathBuf) -> miette::Result<usize> {
252 let home_replica_id = self
253 .home_replica()
254 .await
255 .ok_or(miette::miette!("No home replica set … "))?;
256 self.delete_file(&home_replica_id, path).await
257 }
258
259 pub async fn fetch_post(
271 &self,
272 author_id: &AuthorId,
273 path: &PathBuf,
274 ) -> miette::Result<OkuPost> {
275 let ticket = self
276 .resolve_author_id(author_id)
277 .await
278 .map_err(|e| miette::miette!("{}", e))?;
279 let namespace_id = ticket.capability.id();
280 match self
281 .fetch_file_with_ticket(&ticket, path, &Some(home_replica_filters()))
282 .await
283 {
284 Ok(bytes) => {
285 let note = toml::from_str::<OkuNote>(String::from_utf8_lossy(&bytes).as_ref())
286 .into_diagnostic()?;
287 let mut embedding_path = path.clone();
288 embedding_path.set_extension("okuembed");
289 if let Err(e) = self
290 .fetch_post_embeddings(&ticket, &embedding_path, note.url.as_ref())
291 .await
292 {
293 error!("{e}")
294 }
295 Ok(OkuPost {
296 entry: self.get_entry(&namespace_id, path).await?,
297 note,
298 })
299 }
300 Err(e) => Err(miette::miette!("{}", e)),
301 }
302 }
303
304 pub async fn get_or_fetch_post(
316 &self,
317 author_id: &AuthorId,
318 path: &PathBuf,
319 ) -> miette::Result<OkuPost> {
320 match DATABASE.get_post(author_id, path).ok().flatten() {
321 Some(post) => Ok(post),
322 None => self.fetch_post(author_id, path).await,
323 }
324 }
325}