oku_fs/fs/
core.rs

1use super::*;
2use crate::discovery::{INITIAL_PUBLISH_DELAY, REPUBLISH_DELAY};
3use bytes::Bytes;
4use iroh::protocol::ProtocolHandler;
5use iroh_blobs::store::Store;
6use iroh_docs::Author;
7use log::{error, info};
8#[cfg(feature = "fuse")]
9use miette::IntoDiagnostic;
10use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
11use std::path::PathBuf;
12#[cfg(feature = "fuse")]
13use tokio::runtime::Handle;
14use tokio::sync::watch::{self};
15
16impl OkuFs {
17    /// Obtain the private key of the node's authorship credentials.
18    ///
19    /// # Return
20    ///
21    /// The private key of the node's authorship credentials.
22    pub async fn get_author(&self) -> anyhow::Result<Author> {
23        let default_author_id = self.default_author().await;
24
25        self.docs
26            .client()
27            .authors()
28            .export(default_author_id)
29            .await
30            .ok()
31            .flatten()
32            .ok_or(anyhow::anyhow!(
33                "Missing private key for default author ({}).",
34                crate::fs::util::fmt_short(default_author_id)
35            ))
36    }
37
38    /// Starts an instance of an Oku file system.
39    /// In the background, an Iroh node is started if none is running, or is connected to if one is already running.
40    ///
41    /// # Arguments
42    ///
43    /// * `handle` - If compiling with the `fuse` feature, a Tokio runtime handle is required.
44    ///
45    /// # Returns
46    ///
47    /// A running instance of an Oku file system.
48    pub async fn start(#[cfg(feature = "fuse")] handle: &Handle) -> anyhow::Result<Self> {
49        let endpoint = iroh::Endpoint::builder()
50            .discovery_n0()
51            .discovery_dht()
52            .discovery_local_network()
53            .bind()
54            .await?;
55        let blobs = iroh_blobs::net_protocol::Blobs::persistent(NODE_PATH.clone())
56            .await?
57            .build(&endpoint);
58        let gossip = iroh_gossip::net::Gossip::builder()
59            .spawn(endpoint.clone())
60            .await?;
61        let docs = iroh_docs::protocol::Docs::persistent(NODE_PATH.clone())
62            .spawn(&blobs, &gossip)
63            .await?;
64
65        let router = iroh::protocol::Router::builder(endpoint.clone())
66            .accept(iroh_blobs::ALPN, blobs.clone())
67            .accept(iroh_gossip::ALPN, gossip.clone())
68            .accept(iroh_docs::ALPN, docs.clone())
69            .spawn()
70            .await?;
71        info!(
72            "Default author ID is {} … ",
73            crate::fs::util::fmt_short(docs.client().authors().default().await.unwrap_or_default())
74        );
75
76        let (replica_sender, _replica_receiver) = watch::channel(());
77        let (okunet_fetch_sender, _okunet_fetch_receiver) = watch::channel(false);
78
79        let oku_fs = Self {
80            endpoint,
81            blobs,
82            docs,
83            router,
84            replica_sender,
85            okunet_fetch_sender,
86            #[cfg(feature = "fuse")]
87            fuse_handler: DebugIgnore::from(Arc::new(DefaultFuseHandler::new())),
88            #[cfg(feature = "fuse")]
89            handle: handle.clone(),
90            dht: mainline::Dht::server()?.as_async(),
91        };
92        let oku_fs_clone = oku_fs.clone();
93        tokio::spawn(async move {
94            loop {
95                tokio::time::sleep(INITIAL_PUBLISH_DELAY).await;
96                match oku_fs_clone.announce_replicas().await {
97                    Ok(_) => info!("Announced all replicas … "),
98                    Err(e) => error!("{}", e),
99                }
100                match oku_fs_clone.refresh_users().await {
101                    Ok(_) => info!("Refreshed OkuNet database … "),
102                    Err(e) => error!("{}", e),
103                }
104                tokio::time::sleep(REPUBLISH_DELAY - INITIAL_PUBLISH_DELAY).await;
105            }
106        });
107        Ok(oku_fs.clone())
108    }
109
110    /// Shuts down the Oku file system.
111    pub async fn shutdown(self) {
112        let _ = self.endpoint.close().await;
113        let _ = self.router.shutdown().await;
114        self.docs.shutdown().await;
115        self.blobs.shutdown().await;
116        self.blobs.store().shutdown().await;
117    }
118
119    /// Retrieve the content of a document entry.
120    ///
121    /// # Arguments
122    ///
123    /// * `entry` - An entry in an Iroh document.
124    ///
125    /// # Returns
126    ///
127    /// The content of the entry, as raw bytes.
128    pub async fn content_bytes(&self, entry: &iroh_docs::Entry) -> anyhow::Result<Bytes> {
129        self.content_bytes_by_hash(&entry.content_hash()).await
130    }
131
132    /// Retrieve the content of a document entry by its hash.
133    ///
134    /// # Arguments
135    ///
136    /// * `hash` - The content hash of an Iroh document.
137    ///
138    /// # Returns
139    ///
140    /// The content of the entry, as raw bytes.
141    pub async fn content_bytes_by_hash(&self, hash: &iroh_blobs::Hash) -> anyhow::Result<Bytes> {
142        self.blobs.client().read_to_bytes(*hash).await
143    }
144
145    /// Determines the oldest timestamp of a file entry in any replica stored locally.
146    ///
147    /// # Returns
148    ///
149    /// The oldest timestamp in any local replica, in microseconds from the Unix epoch.
150    pub async fn get_oldest_timestamp(&self) -> miette::Result<u64> {
151        let replicas = self.list_replicas().await?;
152        let mut timestamps: Vec<u64> = Vec::new();
153        for (replica, _capability_kind) in replicas {
154            timestamps.push(
155                self.get_oldest_timestamp_in_folder(&replica, &PathBuf::from("/"))
156                    .await?,
157            );
158        }
159        Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
160    }
161
162    /// Determines the latest timestamp of a file entry in any replica stored locally.
163    ///
164    /// # Returns
165    ///
166    /// The latest timestamp in any local replica, in microseconds from the Unix epoch.
167    pub async fn get_newest_timestamp(&self) -> miette::Result<u64> {
168        let replicas = self.list_replicas().await?;
169        let mut timestamps: Vec<u64> = Vec::new();
170        for (replica, _capability_kind) in replicas {
171            timestamps.push(
172                self.get_newest_timestamp_in_folder(&replica, &PathBuf::from("/"))
173                    .await?,
174            );
175        }
176        Ok(*timestamps.par_iter().max().unwrap_or(&u64::MIN))
177    }
178
179    /// Determines the size of the file system.
180    ///
181    /// # Returns
182    ///
183    /// The total size, in bytes, of the files in every replica stored locally.
184    pub async fn get_size(&self) -> miette::Result<u64> {
185        let replicas = self.list_replicas().await?;
186        let mut size = 0;
187        for (replica, _capability_kind) in replicas {
188            size += self.get_folder_size(&replica, &PathBuf::from("/")).await?;
189        }
190        Ok(size)
191    }
192
193    #[cfg(feature = "fuse")]
194    /// Mount the file system.
195    ///
196    /// # Arguments
197    ///
198    /// * `path` - The path to the file system mount point.
199    ///
200    /// # Returns
201    ///
202    /// A handle referencing the mounted file system; joining or dropping the handle will unmount the file system and shutdown the node.
203    pub fn mount(&self, path: PathBuf) -> miette::Result<fuser::BackgroundSession> {
204        easy_fuser::spawn_mount(self.clone(), path, &vec![], 4).into_diagnostic()
205    }
206}