Skip to main content

oku_fs/fs/
core.rs

1use super::*;
2use crate::config::OkuFsConfig;
3use bytes::Bytes;
4use iroh::protocol::ProtocolHandler;
5use iroh_blobs::{store::fs::FsStore, BlobsProtocol};
6use iroh_docs::Author;
7use iroh_gossip::Gossip;
8use log::{error, info};
9#[cfg(feature = "fuse")]
10use miette::IntoDiagnostic;
11use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
12use std::path::PathBuf;
13#[cfg(feature = "fuse")]
14use tokio::runtime::Handle;
15use tokio::sync::watch::{self};
16
17impl OkuFs {
18    /// Obtain the private key of the node's authorship credentials.
19    ///
20    /// # Return
21    ///
22    /// The private key of the node's authorship credentials.
23    pub async fn get_author(&self) -> anyhow::Result<Author> {
24        let default_author_id = self.default_author().await;
25
26        self.docs
27            .author_export(default_author_id)
28            .await
29            .ok()
30            .flatten()
31            .ok_or(anyhow::anyhow!(
32                "Missing private key for default author ({}).",
33                crate::fs::util::fmt_short(default_author_id)
34            ))
35    }
36
37    /// Starts an instance of an Oku file system.
38    /// In the background, an Iroh node is started if none is running, or is connected to if one is already running.
39    ///
40    /// # Arguments
41    ///
42    /// * `handle` - If compiling with the `fuse` feature, a Tokio runtime handle is required.
43    ///
44    /// # Returns
45    ///
46    /// A running instance of an Oku file system.
47    pub async fn start(#[cfg(feature = "fuse")] handle: &Handle) -> anyhow::Result<Self> {
48        let mdns = iroh::address_lookup::mdns::MdnsAddressLookup::builder();
49        let dht_discovery = iroh::address_lookup::DhtAddressLookup::builder();
50
51        let endpoint = iroh::Endpoint::builder()
52            .address_lookup(mdns)
53            .address_lookup(dht_discovery)
54            .bind()
55            .await?;
56
57        let store = FsStore::load(NODE_PATH.clone()).await?;
58
59        let blobs = BlobsProtocol::new(&store, None);
60
61        let gossip = Gossip::builder().spawn(endpoint.clone());
62        let docs = iroh_docs::protocol::Docs::persistent(NODE_PATH.clone())
63            .spawn(endpoint.clone(), store.into(), gossip.clone())
64            .await?;
65
66        let router = iroh::protocol::Router::builder(endpoint.clone())
67            .accept(iroh_blobs::ALPN, blobs.clone())
68            .accept(iroh_gossip::ALPN, gossip.clone())
69            .accept(iroh_docs::ALPN, docs.clone())
70            .spawn();
71        info!(
72            "Default author ID is {} … ",
73            crate::fs::util::fmt_short(docs.author_default().await.unwrap_or_default())
74        );
75
76        let (replica_sender, _replica_receiver) = watch::channel(());
77        let (okunet_fetch_sender, _okunet_fetch_receiver) = watch::channel(false);
78
79        let oku_fs = Self {
80            endpoint,
81            blobs,
82            docs,
83            router,
84            replica_sender,
85            okunet_fetch_sender,
86            #[cfg(feature = "fuse")]
87            fuse_handler: DebugIgnore::from(Arc::new(DefaultFuseHandler::new())),
88            #[cfg(feature = "fuse")]
89            handle: handle.clone(),
90            dht: mainline::Dht::server()?.as_async(),
91        };
92        let oku_fs_clone = oku_fs.clone();
93        let config = OkuFsConfig::load_or_create_config().unwrap_or_default();
94        let republish_delay = config.get_republish_delay();
95        let initial_publish_delay = config.get_initial_publish_delay();
96
97        tokio::spawn(async move {
98            tokio::time::sleep(initial_publish_delay).await;
99            loop {
100                match oku_fs_clone.announce_replicas().await {
101                    Ok(_) => info!("Announced all replicas … "),
102                    Err(e) => error!("{}", e),
103                }
104                match oku_fs_clone.refresh_users().await {
105                    Ok(_) => info!("Refreshed OkuNet database … "),
106                    Err(e) => error!("{}", e),
107                }
108                tokio::time::sleep(republish_delay).await;
109            }
110        });
111        Ok(oku_fs.clone())
112    }
113
114    /// Shuts down the Oku file system.
115    pub async fn shutdown(self) {
116        info!("Node shutting down … ");
117        self.endpoint.close().await;
118        if let Err(e) = self.router.shutdown().await {
119            error!("{e}");
120        }
121        self.docs.shutdown().await;
122        self.blobs.shutdown().await;
123        if let Err(e) = self.blobs.store().shutdown().await {
124            error!("{e}");
125        }
126    }
127
128    /// Retrieve the content of a document entry.
129    ///
130    /// # Arguments
131    ///
132    /// * `entry` - An entry in an Iroh document.
133    ///
134    /// # Returns
135    ///
136    /// The content of the entry, as raw bytes.
137    pub async fn content_bytes(&self, entry: &iroh_docs::Entry) -> anyhow::Result<Bytes> {
138        self.content_bytes_by_hash(&entry.content_hash()).await
139    }
140
141    /// Retrieve the content of a document entry by its hash.
142    ///
143    /// # Arguments
144    ///
145    /// * `hash` - The content hash of an Iroh document.
146    ///
147    /// # Returns
148    ///
149    /// The content of the entry, as raw bytes.
150    pub async fn content_bytes_by_hash(&self, hash: &iroh_blobs::Hash) -> anyhow::Result<Bytes> {
151        self.blobs
152            .blobs()
153            .get_bytes(*hash)
154            .await
155            .map_err(|e| anyhow::anyhow!(e))
156    }
157
158    /// Determines the oldest timestamp of a file entry in any replica stored locally.
159    ///
160    /// # Returns
161    ///
162    /// The oldest timestamp in any local replica, in microseconds from the Unix epoch.
163    pub async fn get_oldest_timestamp(&self) -> miette::Result<u64> {
164        let replicas = self.list_replicas().await?;
165        let mut timestamps: Vec<u64> = Vec::new();
166        for (replica, _capability_kind, _is_home_replica) in replicas {
167            timestamps.push(
168                self.get_oldest_timestamp_in_folder(&replica, &PathBuf::from("/"))
169                    .await?,
170            );
171        }
172        Ok(*timestamps.par_iter().min().unwrap_or(&u64::MIN))
173    }
174
175    /// Determines the latest timestamp of a file entry in any replica stored locally.
176    ///
177    /// # Returns
178    ///
179    /// The latest timestamp in any local replica, in microseconds from the Unix epoch.
180    pub async fn get_newest_timestamp(&self) -> miette::Result<u64> {
181        let replicas = self.list_replicas().await?;
182        let mut timestamps: Vec<u64> = Vec::new();
183        for (replica, _capability_kind, _is_home_replica) in replicas {
184            timestamps.push(
185                self.get_newest_timestamp_in_folder(&replica, &PathBuf::from("/"))
186                    .await?,
187            );
188        }
189        Ok(*timestamps.par_iter().max().unwrap_or(&u64::MIN))
190    }
191
192    /// Determines the size of the file system.
193    ///
194    /// # Returns
195    ///
196    /// The total size, in bytes, of the files in every replica stored locally.
197    pub async fn get_size(&self) -> miette::Result<u64> {
198        let replicas = self.list_replicas().await?;
199        let mut size = 0;
200        for (replica, _capability_kind, _is_home_replica) in replicas {
201            size += self.get_folder_size(&replica, &PathBuf::from("/")).await?;
202        }
203        Ok(size)
204    }
205
206    #[cfg(feature = "fuse")]
207    /// Mount the file system.
208    ///
209    /// # Arguments
210    ///
211    /// * `path` - The path to the file system mount point.
212    ///
213    /// # Returns
214    ///
215    /// A handle referencing the mounted file system; joining or dropping the handle will unmount the file system and shutdown the node.
216    pub fn mount(&self, path: PathBuf) -> miette::Result<fuser::BackgroundSession> {
217        easy_fuser::spawn_mount(self.clone(), path, &vec![], 4).into_diagnostic()
218    }
219}