近年来,数据工程领域发生了重大变化。数据系统日益复杂,对实时处理的需求,以及对可靠性和性能的不断需求,促使人们寻找更健壮的编程语言。这就是Rust的用武之地,Rust的增长速度非常快:
在这些竞争者中,Rust在2024年作为数据工程师的强大工具出现了,它承诺内存安全性、并发性和高效性。但是,究竟是什么让Rust成为数据工程的合适选择,以及如何在这个领域有效地利用它?
Rust在数据工程领域越来越受欢迎的一些关键原因:
1,内存安全:Rust的所有权系统确保在编译时捕获空指针解引用和缓冲区溢出等内存错误,从而降低运行时崩溃的风险。
2,并发性:凭借其轻量级并发模型和严格的编译时检查,Rust使编写既安全又高效的并发程序变得更容易,这是数据密集型应用程序的关键需求。
3,性能:Rust的性能与C和C++相当,适合高吞吐量的数据处理任务。
4,生态系统:Rust生态系统虽然相对年轻,但随着库和工具的不断成熟,它们越来越支持数据工程任务。
为了在数据工程中有效地利用Rust,有必要了解构建模块以及它们如何适应更广泛的数据管道。
数据摄取是任何数据管道中的第一步,它包括从各种来源收集原始数据,并为进一步处理做好准备。Rust的并发能力在这里派上了用场。像HTTP请求的request和kafka-rust这样的库可以进行高效和安全的数据摄取。
例子:
use reqwest::Client;
use tokio::stream::StreamExt;
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["http://example.com/data1", "http://example.com/data2"];
let fetches = futures::stream::iter(urls)
.map(|url| {
let client = client.clone();
async move {
let resp = client.get(url).send().await?;
let body = resp.text().await?;
Ok(body)
}
})
.buffer_unordered(5);
fetches
.for_each(|res| async {
match res {
Ok(data) => println!("Fetched data: {}", data),
Err(e) => eprintln!("Error: {}", e),
}
})
.await;
}
一旦数据被摄取,下一步就是处理。这可能涉及转换、聚合和过滤。Rust强大的迭代器系统和函数式编程范例支持富有表现力和高效的数据处理。
例子:
let data = vec![1, 2, 3, 4, 5];
let processed_data: Vec<_> = data
.into_iter()
.filter(|&x| x % 2 == 0)
.map(|x| x * 2)
.collect();
println!("Processed data: {:?}", processed_data);
3,数据存储
有效地存储处理过的数据对于任何数据管道都是至关重要的。Rust支持与各种数据库交互,包括SQL和NoSQL,确保数据可以可靠地存储和快速检索。SQL数据库的diesel和MongoDB的mongodb等库提供了必要的抽象。
例子:
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
fn establish_connection() -> SqliteConnection {
SqliteConnection::establish("test.db").expect("Error connecting to database")
}
fn main() {
let connection = establish_connection();
// Insert and query data using Diesel ORM
}
使用serde以及强大而简单的API框架(如axum),可以简单的进行序列化和反序列化操作,以有效地提供数据,使得为数据请求提供API服务变得容易。
例子:
use axum::{routing::get, Json, Router};
use serde::Serialize;
use std::net::SocketAddr;
#[derive(Serialize)]
struct HelloWorld {
message: String,
}
#[tokio::main]
async fn main() {
let app = Router::new().route("/", get(hello_world));
let addr = SocketAddr::from(([127.0.0.1], 3000));
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
async fn hello_world() -> Json<HelloWorld> {
Json(HelloWorld {
message: "Hello, World!".to_string(),
})
}
数据分析通常需要处理大型数据集和执行复杂的计算。Rust的数据操作库(如polars)越来越多地用于这些任务,并且比现有的Python库(如pandas)更加高效和高性能。
例子:
use polars::prelude::*;
use std::fs::File;
use std::io::BufReader;
fn main() -> Result<()> {
let file = File::open("path/to/your/data.csv").expect("Failed to open file");
let reader = BufReader::new(file);
let df = CsvReader::new(reader)
.infer_schema(None)
.has_header(true)
.finish()?;
// Print the first few rows of the DataFrame
println!("DataFrame preview:");
println!("{:?}", df.head(Some(5)));
// Example: Calculate the mean and sum of a column named "value"
let mean_value = df
.lazy()
.select([col("value").mean().alias("mean_value")])
.collect()?
.column("mean_value")?
.get(0);
println!("Mean of 'value' column: {:?}", mean_value);
let sum_value = df
.lazy()
.select([col("value").sum().alias("sum_value")])
.collect()?
.column("sum_value")?
.get(0);
println!("Sum of 'value' column: {:?}", sum_value);
// Example: Group by a column named "category" and calculate aggregate metrics
let grouped_df = df
.lazy()
.groupby([col("category")])
.agg([
col("value").mean().alias("mean_value"),
col("value").sum().alias("sum_value"),
col("value").count().alias("count"),
])
.collect()?;
println!("Grouped DataFrame with aggregate metrics:");
println!("{:?}", grouped_df);
Ok(())
}
DataFusion是Rust数据工程生态系统中最突出的项目之一,它是一个可扩展的查询执行框架。DataFusion为构建高性能的分布式数据处理系统提供了基础,是Apache Arrow生态系统的一部分,它专注于内存中的列数据处理。
DataFusion的主要优点
1,内存处理:DataFusion利用Apache Arrow的列式进行内存处理,与传统的基于行的存储相比,这大大加快了分析查询的速度。
2,SQL支持:DataFusion支持SQL,使熟悉基于SQL数据操作的广泛用户可以访问它。
3,可扩展性:它的模块化设计允许开发人员可以根据他们的特定需求扩展和定制它,添加对新数据源、自定义函数等的支持。
4,并发性和并行性:DataFusion基于Rust,自然继承了Rust在并发性和并行性方面的优势,能够在大型数据集上高效地执行复杂查询。
为了说明如何使用DataFusion,我们看一个需要在大型数据集上执行SQL查询的场景:
use datafusion::prelude::*;
use arrow::record_batch::RecordBatch;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// 创建一个新的执行上下文
let mut ctx = ExecutionContext::new();
// 注册CSV文件
ctx.register_csv("example", "path/to/csv/file", CsvReadOptions::new()).await?;
// 执行SQL查询
let df = ctx.sql("SELECT * FROM example WHERE column_name > 100").await?;
// 收集结果
let batches: Vec<RecordBatch> = df.collect().await?;
// Print the results
for batch in batches {
println!("{:?}", batch);
}
Ok(())
}
由于DataFusion的出现,新的库如ballista将取代Spark在数据处理方面的地位,Ballista是一个用Rust编写的分布式计算平台,专为高性能、大规模数据处理而设计。它利用Apache Arrow实现高效的内存列数据表示,利用DataFusion实现查询执行,允许开发人员以分布式方式执行复杂的数据转换和分析。
Ballista旨在为传统的大数据框架(如Apache Spark)提供一个现代的、可扩展的替代方案,重点关注安全性、并发性和性能。
例子:
use ballista::prelude::*;
use tokio;
#[tokio::main]
async fn main() -> Result<()> {
// 创建一个Ballista上下文
let ctx = BallistaContext::local();
// 注册CSV文件
ctx.register_csv("example", "path/to/your/data.csv", CsvReadOptions::new()).await?;
// 执行SQL查询
let df = ctx.sql("SELECT * FROM example WHERE some_column > 100").await?;
// 收集并打印结果
let batches = df.collect().await?;
for batch in batches {
println!("{:?}", batch);
}
Ok(())
}
Rust提供了令人信服的安全性、高性能和高并发性的结合,使其成为现代数据工程任务的有力候选者,它对于数据工程的各种任务都非常有用。
本文由微信公众号coding到灯火阑珊原创,哈喽比特收录。
文章来源:https://mp.weixin.qq.com/s/9fWRbQaviGQhQBMa0JrFYQ
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。