mirror of
https://github.com/fiodarsazanavets/aspire-13-examples.git
synced 2026-06-20 12:23:14 +00:00
Add samples for Azure Queue Storage
This commit is contained in:
@@ -1,29 +1,90 @@
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
|
||||
namespace OnlineShop.ApiService;
|
||||
|
||||
public class LocationUpdater(
|
||||
IHubContext<LocationHub> locationHub) :
|
||||
BackgroundService
|
||||
public sealed class LocationUpdater(
|
||||
IHubContext<LocationHub> locationHub,
|
||||
IConnection rabbitConnection) : BackgroundService
|
||||
{
|
||||
protected async override Task
|
||||
ExecuteAsync(
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
await using var channel =
|
||||
await rabbitConnection.CreateChannelAsync(cancellationToken: stoppingToken);
|
||||
|
||||
const string queueName = "orders.created";
|
||||
|
||||
await channel.QueueDeclareAsync(
|
||||
queue: queueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null,
|
||||
cancellationToken: stoppingToken);
|
||||
|
||||
await channel.BasicQosAsync(
|
||||
prefetchSize: 0,
|
||||
prefetchCount: 1,
|
||||
global: false,
|
||||
cancellationToken: stoppingToken);
|
||||
|
||||
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||
|
||||
consumer.ReceivedAsync += async (sender, args) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
var json = Encoding.UTF8.GetString(args.Body.ToArray());
|
||||
var message = JsonSerializer.Deserialize<OrderCreatedMessage>(json);
|
||||
|
||||
if (message is null)
|
||||
{
|
||||
await channel.BasicNackAsync(
|
||||
args.DeliveryTag, false, false, stoppingToken);
|
||||
return;
|
||||
}
|
||||
|
||||
await SetInitialDeliveryLocation(
|
||||
message.OrderId,
|
||||
stoppingToken);
|
||||
|
||||
await channel.BasicAckAsync(
|
||||
args.DeliveryTag,
|
||||
multiple: false,
|
||||
cancellationToken: stoppingToken);
|
||||
}
|
||||
catch
|
||||
{
|
||||
await channel.BasicNackAsync(
|
||||
args.DeliveryTag,
|
||||
false,
|
||||
requeue: true,
|
||||
cancellationToken: stoppingToken);
|
||||
}
|
||||
};
|
||||
|
||||
await channel.BasicConsumeAsync(
|
||||
queue: queueName,
|
||||
autoAck: false,
|
||||
consumer: consumer,
|
||||
cancellationToken: stoppingToken);
|
||||
|
||||
await Task.Delay(Timeout.Infinite, stoppingToken);
|
||||
}
|
||||
|
||||
private async Task SetInitialDeliveryLocation(
|
||||
int orderId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await Task.Delay(5000, cancellationToken);
|
||||
await UpdateLocation(
|
||||
51.5074, -0.1276, cancellationToken);
|
||||
|
||||
await Task.Delay(5000, cancellationToken);
|
||||
await UpdateLocation(
|
||||
51.5074, -0.13, cancellationToken);
|
||||
|
||||
await Task.Delay(5000, cancellationToken);
|
||||
await UpdateLocation(
|
||||
51.508, -0.14, cancellationToken);
|
||||
await Task.Delay(3000, cancellationToken);
|
||||
await UpdateLocation(orderId, 51.5074, -0.1276, cancellationToken);
|
||||
}
|
||||
|
||||
private async Task UpdateLocation(
|
||||
int orderId,
|
||||
double latitude,
|
||||
double longitude,
|
||||
CancellationToken cancellationToken)
|
||||
@@ -35,4 +96,5 @@ public class LocationUpdater(
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
private sealed record OrderCreatedMessage(int OrderId);
|
||||
}
|
||||
@@ -9,8 +9,12 @@ using MongoDB.Driver;
|
||||
using OnlineShop.ApiService;
|
||||
using OnlineShop.ApiService.Model;
|
||||
using OnlineShop.ServiceDefaults.Dtos;
|
||||
using RabbitMQ.Client;
|
||||
using System.Data;
|
||||
using System.Globalization;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using static MongoDB.Driver.WriteConcern;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
|
||||
@@ -440,6 +444,136 @@ app.MapGet("/product-specs", async (
|
||||
return productSpecs.ToArray();
|
||||
});
|
||||
|
||||
app.MapPost("/api/orders", async (
|
||||
Dictionary<int, int> basket,
|
||||
[FromServices] SqlConnection dbConnection,
|
||||
[FromServices] IConnection rabbitConnection) =>
|
||||
{
|
||||
if (basket is null || basket.Count == 0)
|
||||
return Results.BadRequest("Basket is empty.");
|
||||
|
||||
var items = basket
|
||||
.Where(kvp => kvp.Value > 0)
|
||||
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
|
||||
|
||||
if (items.Count == 0)
|
||||
return Results.BadRequest("Basket contains no items with quantity > 0.");
|
||||
|
||||
if (dbConnection.State != ConnectionState.Open)
|
||||
await dbConnection.OpenAsync();
|
||||
|
||||
int orderId;
|
||||
decimal totalAmount;
|
||||
|
||||
await using var tx =
|
||||
(SqlTransaction)await dbConnection.BeginTransactionAsync(IsolationLevel.ReadCommitted);
|
||||
|
||||
try
|
||||
{
|
||||
var productIds = items.Keys.ToList();
|
||||
var inParams = string.Join(", ", productIds.Select((_, i) => $"@p{i}"));
|
||||
|
||||
var priceLookup = new Dictionary<int, decimal>();
|
||||
|
||||
await using (var cmd = new SqlCommand($@"
|
||||
SELECT Id, Price
|
||||
FROM Products
|
||||
WHERE Id IN ({inParams});",
|
||||
dbConnection, tx))
|
||||
{
|
||||
for (int i = 0; i < productIds.Count; i++)
|
||||
cmd.Parameters.AddWithValue($"@p{i}", productIds[i]);
|
||||
|
||||
await using var reader = await cmd.ExecuteReaderAsync();
|
||||
while (await reader.ReadAsync())
|
||||
priceLookup[reader.GetInt32(0)] = reader.GetDecimal(1);
|
||||
}
|
||||
|
||||
totalAmount = items.Sum(i => priceLookup[i.Key] * i.Value);
|
||||
|
||||
await using (var cmd = new SqlCommand(@"
|
||||
INSERT INTO Orders (TotalAmount)
|
||||
VALUES (@totalAmount);
|
||||
SELECT CAST(SCOPE_IDENTITY() AS INT);",
|
||||
dbConnection, tx))
|
||||
{
|
||||
var p = cmd.Parameters.Add("@totalAmount", SqlDbType.Decimal);
|
||||
p.Precision = 18;
|
||||
p.Scale = 2;
|
||||
p.Value = totalAmount;
|
||||
|
||||
orderId = (int)(await cmd.ExecuteScalarAsync() ?? 0);
|
||||
}
|
||||
|
||||
if (orderId <= 0)
|
||||
return Results.Problem("Failed to create order.");
|
||||
|
||||
await using (var cmd = new SqlCommand(@"
|
||||
INSERT INTO OrderItems (OrderId, ProductId, Quantity)
|
||||
VALUES (@orderId, @productId, @quantity);",
|
||||
dbConnection, tx))
|
||||
{
|
||||
cmd.Parameters.Add("@orderId", SqlDbType.Int).Value = orderId;
|
||||
var pProductId = cmd.Parameters.Add("@productId", SqlDbType.Int);
|
||||
var pQuantity = cmd.Parameters.Add("@quantity", SqlDbType.Int);
|
||||
|
||||
foreach (var (productId, qty) in items)
|
||||
{
|
||||
pProductId.Value = productId;
|
||||
pQuantity.Value = qty;
|
||||
await cmd.ExecuteNonQueryAsync();
|
||||
}
|
||||
}
|
||||
|
||||
await tx.CommitAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await tx.RollbackAsync();
|
||||
return Results.Problem($"Order creation failed: {ex.Message}");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await using var channel = await rabbitConnection.CreateChannelAsync();
|
||||
|
||||
const string queueName = "orders.created";
|
||||
|
||||
await channel.QueueDeclareAsync(
|
||||
queue: queueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null);
|
||||
|
||||
var message = JsonSerializer.Serialize(new { orderId });
|
||||
var body = Encoding.UTF8.GetBytes(message);
|
||||
|
||||
var props = new BasicProperties
|
||||
{
|
||||
Persistent = true
|
||||
};
|
||||
|
||||
await channel.BasicPublishAsync(
|
||||
exchange: "",
|
||||
routingKey: queueName,
|
||||
mandatory: false,
|
||||
basicProperties: props,
|
||||
body: body);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return Results.Problem(
|
||||
$"Order was created (Id={orderId}) but RabbitMQ publish failed: {ex.Message}");
|
||||
}
|
||||
|
||||
return Results.Created($"/api/orders/{orderId}", new
|
||||
{
|
||||
OrderId = orderId,
|
||||
TotalAmount = totalAmount
|
||||
});
|
||||
});
|
||||
|
||||
app.MapDefaultEndpoints();
|
||||
|
||||
app.MapHub<LocationHub>("/locationHub");
|
||||
|
||||
Reference in New Issue
Block a user