ฉันต้องการใช้ประโยชน์จากรันไทม์ของ Tokio เพื่อจัดการกับ async futures ในปริมาณที่ผันแปรได้ เนื่องจากไม่ทราบจำนวนฟิวเจอร์สในขณะรวบรวม ดูเหมือนว่าFuturesUnorderedจะเป็นตัวเลือกที่ดีที่สุดของฉัน (มาโครเช่นselect!กำหนดให้ระบุสาขาของคุณ ณ เวลาคอมไพล์; join_allอาจเป็นไปได้ แต่เอกสารแนะนำ FuturesUnordered "ในหลายกรณี" เมื่อคำสั่งซื้อไม่ ไม่เป็นไร)

ตรรกะของตัวอย่างนี้คือลูป recv() ที่ถูกผลักไปที่ฝากข้อมูลของฟิวเจอร์ส ซึ่งควรจะทำงานอยู่เสมอ เมื่อข้อมูลใหม่มาถึง การแยกวิเคราะห์/การประมวลผลจะถูกผลักไปที่ฝากข้อมูลในอนาคตด้วย (แทนที่จะถูกประมวลผลทันที) สิ่งนี้ทำให้มั่นใจได้ว่าผู้รับจะรักษาเวลาแฝงต่ำในการตอบสนองต่อเหตุการณ์ใหม่ และการประมวลผลข้อมูล (การถอดรหัสที่อาจมีราคาแพงในการคำนวณ) เกิดขึ้นพร้อมกันกับบล็อก async การประมวลผลข้อมูลอื่น ๆ ทั้งหมด (รวมถึงตัวรับการรับฟัง)

หัวข้อนี้จะอธิบายว่าทำไมฟิวเจอร์สได้รับ.boxed()โดยวิธีการ

ปัญหาคือข้อผิดพลาดที่คลุมเครือนี้:

error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
  --> src/main.rs:27:8
   |
27 |     }).boxed());
   |        ^^^^^ `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
   |
   = help: the trait `Sync` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
   = note: required because of the requirements on the impl of `Sync` for `Unique<dyn futures::Future<Output = ()> + std::marker::Send>`
   = note: required because it appears within the type `Box<dyn futures::Future<Output = ()> + std::marker::Send>`
   = note: required because it appears within the type `Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>`
   = note: required because of the requirements on the impl of `Sync` for `FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `&FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
   = note: required because it appears within the type `[static [email protected]/main.rs:16:25: 27:6 _]`
   = note: required because it appears within the type `from_generator::GenFuture<[static [email protected]/main.rs:16:25: 27:6 _]>`
   = note: required because it appears within the type `impl futures::Future`

ดูเหมือนว่าการผลักดัน UnorderedFutures "แบบเรียกซ้ำ" (ไม่ใช่ฉันเดาจริงๆแต่คุณจะเรียกมันว่าอะไร) ไม่ทำงาน แต่ฉันไม่แน่ใจว่าทำไม ข้อผิดพลาดนี้บ่งชี้ว่าSyncไม่ตรงตามข้อกำหนดของคุณสมบัติบางอย่างสำหรับบล็อกอะซิงโครนัสของ Box'd & Pin'd ที่มีแนวโน้มโดยFuturesUnordered-- ข้อกำหนดที่ฉันเดาว่าถูกกำหนดไว้เพียงเพราะ&FuturesUnordered(ใช้ในระหว่างfutures.push(...)เพราะวิธีการนั้นยืม &self) ต้องการมันสำหรับSendคุณลักษณะ ... หรือบางสิ่งบางอย่าง?

use std::error::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};
use futures::stream::futures_unordered::FuturesUnordered;
use futures::FutureExt;

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let mut futures = FuturesUnordered::new();
    let (tx, rx) = mpsc::channel(32);
    
    tokio::spawn( foo(tx) );    // Only the receiver is relevant; its transmitter is
                                // elsewhere, occasionally sending data.
    futures.push((async {                               // <--- NOTE: futures.push()
        loop {
            match rx.recv().await {
                Some(data) => {
                    futures.push((async move {          // <--- NOTE: nested futures.push()
                        let _ = data; // TODO: replace with code that processes 'data'
                    }).boxed());
                },
                None => {}
            }
        }
    }).boxed());
    
    while let Some(_) = futures.next().await {}

    Ok(())
}
ตอบ

ฉันจะทิ้งข้อผิดพลาดระดับต่ำไว้เป็นคำตอบอื่น แต่ฉันเชื่อว่าวิธีที่มีสำนวนมากขึ้นในการแก้ปัญหาระดับสูงที่นี่คือการรวมการใช้FuturesUnorderedกับสิ่งtokio::select!ต่อไปนี้:

use tokio::sync::mpsc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;

#[tokio::main]
pub async fn main() {
    let mut futures = FuturesUnordered::new();
    let (tx, mut rx) = mpsc::channel(32);
    
    //turn foo into something more concrete
    tokio::spawn(async move {
        let _ = tx.send(42i32).await;
    });

    loop {
        tokio::select! {
            Some(data) = rx.recv() => {
                futures.push(async move {
                    data.to_string()
                });
            },
            Some(result) = futures.next() => {
                println!("{}", result)
            },
            else => break,
        }
    }
}

คุณสามารถอ่านเพิ่มเติมเกี่ยวกับมาโครที่เลือกได้ที่นี่: https://tokio.rs/tokio/tutorial/select

เมื่อคุณใส่กล่องอนาคตที่สร้างโดยบล็อก async ด้วยboxedเมธอด คุณกำลังพยายามบังคับให้dyn Future + Send:

pub fn boxed<'a>(
    self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>

Sendอย่างไรก็ตามในอนาคตที่สร้างขึ้นไม่ได้ ทำไม? เพราะข้างในนั้น คุณพยายามดันไปที่FuturesUnorderedซึ่งยืมมันมา :

pub fn push(&self, future: Fut)

ซึ่งหมายความว่าasyncบล็อกจะจับไฟล์&FuturesUnordered. สำหรับชนิดที่จะเป็นSendเขตข้อมูลทั้งหมดของมันจะต้องเป็นSendดังนั้นสำหรับอนาคตที่สร้างขึ้นเพื่อเป็นSend, ต้อง&FuturesUnorderedSend

สำหรับการอ้างอิงเป็นSendประเภทจะต้องเป็นSync:

impl<'_, T> Send for &'_ T where
    T: Sync

และสำหรับFuturesUnorderedการเป็นSyncฟิวเจอร์สที่เก็บไว้จะต้องเป็นSync:

impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}

อย่างไรก็ตาม อนาคตที่กลับมาโดยboxedไม่จำเป็นSync:

pub fn boxed<'a>(
    self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>

ซึ่งหมายความว่าตัวสร้าง async ไม่ใช่Sendดังนั้นคุณจึงไม่สามารถบังคับasync ได้dyn Future + Sendและคุณได้รับข้อความแสดงข้อผิดพลาดที่ทำให้สับสน

วิธีแก้ไขคือการเพิ่มSyncขอบเขตในอนาคตและBox::pinด้วยตนเอง:

type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;

let mut futures = FuturesUnordered::<BoxedFuture>::new();

futures.push(Box::pin(async {
    loop {
        match rx.recv().await {
            Some(data) => {
                futures.push(Box::pin(async move {
                    let _ = data;
                }));
            }
            None => {}
        }
    }
}));

อย่างไรก็ตาม คุณจะพบกับปัญหาการกู้ยืมจำนวนมาก ทางออกที่ดีกว่าคือใช้tokio::select!แทน outer pushตามคำตอบของ Michael อธิบาย