Poll based futures in Rust

Max Bruckner

Callbacks

void download(
    std::string url,
    std::function<void(std::vector<std::byte>)> callback
);

Callbacks

void download(
    std::string url,
    std::function<void(std::vector<std::byte>)> callback
);
void play_song(std::vector<std::byte> song);

download(
    "https://example.com/song.opus",
    [](auto song) {
        play_song(song);
    });

Komplexeres Beispiel

1. Bild herunterladen.

2. Konvertieren

3. Wieder hochladen

Callbacks (komplexes Beispiel)

download(
    "https://example.com/image.png"
    [](auto image) {
        convert(
            image,
            [](auto converted_image) {
                upload(
                    converted_image,
                    "https://example.com/image.webp",
                    []() {
                        // ...
                    });
            });
    });

Future (aka Promise)

Repräsentiert zukünftiges Ergebnis

  • Async IO
  • Arbeit auf anderem Thread
  • Kombinatoren
  • Timer

Beispiel

download("https://example.com/image.png")
    .and_then(|image| convert(image)) // redundant closure, I know
    .and_then(|converted_image|
        upload(converted_image, "https://example.com/image.webp"))
    .map(|()| /* ... */ )

Async/Await

let image = download("https://example.com/image.png").await;
let converted_image = convert(image).await;
upload(converted_image, "https://example.com/image.webp").await;
/* ... */

Typische Implementierung

Serviervorschlag

Sugar free!

template <typename Result, typename Arguments>
struct Future {
    void start(Arguments arguments);
    void schedule(std::function<void(Result)> action);
    /* ... */
}
upload_future.schedule([]() { /* ... */});
conversion_future.schedule([](auto converted_image) {
    upload_future.start(converted_image);
});
download_future.schedule([](auto image) {
    conversion_future.start(image);
});
download_future.start(url);

Schwierigkeiten

  • Cancelation
  • Thread-Synchronisierung (Wo setze ich fort)
  • Heap-Allokationen schwer vermeidbar

std::future::Future

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Self::Output>;
}
pub enum Poll<T> {
    Ready(T),
    Pending,
}

Beispiel: Async IO

fn poll(&mut self, waker: &Waker) -> Poll<Vec<u8>> {
    if not_started {
        syscall(|result| {
            self.is_finished = true;
            self.result = result;
            waker.wake();
        });
        return Pending;
    }

    if self.is_finished {
        return Ready(self.result);
    } else {
        return Pending;
    }
}

Unser Beispiel

download(download_url)
    .and_then(|image| convert(image))
    .and_then(|converted_image| upload(converted_image, upload_url))
    .map(|()| /* ... */ )
let image = download(download_url).await;
let converted_image = convert(image).await;
upload(converted_image, upload_url).await;
/* ... */

Intern (so ungefähr)

enum ImageChain {
    Downloading(DownloadFuture),
    Converting(ConversionFuture),
    Uploading(UploadFuture),
    Finished,
}

Intern (so ungefähr)

fn poll(&mut self, waker: &Waker) -> Poll<()> {
    match self {
        Downloading(download_future) => {
            match download_future.poll(waker) {
                Ready(image) => // transition to Converting,
                Pending => return Pending, }, },
        Converting(conversion_future) => {
            match conversion_future.poll(waker) {
                Ready(image) => // transition to Uploading,
                Pending => return Pending, } },
        Uploading(upload_future) => {
            Ready(()) => /* ... */ // also transition to Finish.,
            Pending => return Pending,
        },
        Finished => panic!("invalid"),
    }
}

Task (in der Regel)

  • Arbeitseinheit aus vielen Teil-Futures
  • Erhält callbacks über Waker
  • Scheduling in Task-Queue
  • Idr. als Einheit allokiert
  • Pollt Top-Level-Future nach wakeup

Ist selbst eine Future

In unserem Beispiel:

Ein Task pro Bild-URL

Futures für z.B.

  • TCP-Verbindung
  • TLS-Handshake
  • Umrechnung
  • usw.

Fazit

Cancelation

  • Einfach nicht mehr pollen

Thread-Synchronisierung

  • Nur wo wirklich nötig (e.g. nicht für Combinators)

Heap-Allokationen

  • Meistens nur einmal pro Task

Und

  • Compiler kann viel weg optimieren

Quellen

Tipp

  • extundelete kann gelöschte Präsentationen wiederherstellen ...