# 安装和使用

安装包下载 (opens new window)

# 设置密码(一般测试情况下可以不用设定密码)

  • redis.windows-service.conf,不是redis.windows.conf(以非系统服务方式启动程序使用的配置文件)
  • 找到含有requirepass字样的地方,追加一行,输入requirepass 12345
  • 在服务中找到Redis名称的服务,重新启动
  • 在Redis的目录,cd C:\Program Files\Redis。输入redis-cli并回车。显示正确端口号,则表示正常
  • 输入“auth 12345”并回车(12345是之前设定的密码)。返回提示OK表示密码验证通过
  • 允许外网访问:在redis的配置文件中把bind 127.0.0.1改为 bind 0.0.0.0

# 消息队列InitQ

  • 获取initQ包
  • 添加中间件(该中间件依赖 StackExchange.Redis)
services.AddInitQ(m=> 
{
    m.SuspendTime = 1000;
    m.IntervalTime = 1000; 
    m.ConnectionString = "127.0.0.1,connectTimeout=15000,syncTimeout=5000,password=123456";
    m.ListSubscribe = new List<Type>() { typeof(RedisSubscribeA), typeof(RedisSubscribeB) };
    m.ShowLog = false;
});
  • 配置说明
public class InitQOptions
{
    /// <summary>
    /// redis连接字符串
    /// </summary>
    public string ConnectionString { get; set; }

    /// <summary>
    /// 没消息时挂起时长(毫秒)
    /// </summary>
    public int SuspendTime { get; set; }

    /// <summary>
    /// 每次消费消息间隔时间(毫秒)
    /// </summary>
    public int IntervalTime { get; set; }

    /// <summary>
    /// 是否显示日志
    /// </summary>
    public bool ShowLog { get; set; }

    /// <summary>
    /// 需要注入的类型
    /// </summary>
    public IList<Type> ListSubscribe { get; set; }

    public InitQOptions()
    {
        ConnectionString = "";
        IntervalTime = 0;
        SuspendTime = 1000;
        ShowLog = false;
    }
}

# 消息发布/订阅

  • 订阅发布者
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
  {
      //redis对象
      var _redis = scope.ServiceProvider.GetService<ICacheService>();
      //循环向 tibos_test_1 队列发送消息
      for (int i = 0; i < 1000; i++)
      {
          await _redis.ListRightPushAsync("tibos_test_1", $"我是消息{i + 1}号");
      }
  }
  • 定义消费者
//Thread.Sleep 按顺序消费
public class RedisSubscribeA: IRedisSubscribe
{
	[Subscribe("tibos_test_1")]
	private async Task SubRedisTest(string msg)
	{
		Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
		Thread.Sleep(3000); //使用堵塞线程模式,同步延时
		Console.WriteLine($"A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
	}
}

Thread.Sleep

//Task.Delay 随机消费
[Subscribe("tibos_test_1")]
private async Task SubRedisTest(string msg)
{
	Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg}");
	await Task.Delay(3000); //使用非堵塞线程模式,异步延时
	Console.WriteLine($"A类<---当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者A消费消息:{msg} 完成");
}

Task.Delay

# 消息广播/订阅

  • 订阅消息通道,订阅者需要在程序初始化的时候启动一个线程侦听通道,这里使用HostedService来实现,并注册到容器
public class ChannelSubscribeA : IHostedService, IDisposable
{
  private readonly IServiceProvider _provider;
  private readonly ILogger _logger;

  public ChannelSubscribeA(ILogger<TestMain> logger, IServiceProvider provider)
  {
	  _logger = logger;
	  _provider = provider;
  }
  public void Dispose()
  {
	  _logger.LogInformation("退出");
  }

  public Task StartAsync(CancellationToken cancellationToken)
  {
	  _logger.LogInformation("程序启动");
	  Task.Run(async () =>
	  {
		  using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
		  {
			  //redis对象
			  var _redis = scope.ServiceProvider.GetService<ICacheService>();
			  await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) =>
			  {
				  Console.WriteLine("test_channel" + " 订阅服务A收到消息:" + message);
			  }));

		  }
	  });
	  return Task.CompletedTask;
  }

  public Task StopAsync(CancellationToken cancellationToken)
  {
	  _logger.LogInformation("结束");
	  return Task.CompletedTask;
  }
}
public class ChannelSubscribeB : IHostedService, IDisposable
{
  private readonly IServiceProvider _provider;
  private readonly ILogger _logger;

  public ChannelSubscribeB(ILogger<TestMain> logger, IServiceProvider provider)
  {
	  _logger = logger;
	  _provider = provider;
  }
  public void Dispose()
  {
	  _logger.LogInformation("退出");
  }

  public Task StartAsync(CancellationToken cancellationToken)
  {
	  _logger.LogInformation("程序启动");
	  Task.Run(async () =>
	  {
		  using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
		  {
			  //redis对象
			  var _redis = scope.ServiceProvider.GetService<ICacheService>();
			  await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) =>
			  {
				  Console.WriteLine("test_channel" + " 订阅服务B收到消息:" + message);
			  }));

		  }
	  });
	  return Task.CompletedTask;
  }

  public Task StopAsync(CancellationToken cancellationToken)
  {
	  _logger.LogInformation("结束");
	  return Task.CompletedTask;
  }
}
  • 将HostedService类注入到容器
services.AddHostedService<ChannelSubscribeA>();
services.AddHostedService<ChannelSubscribeB>();
  • 广播消息
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
{
  //redis对象
  var _redis = scope.ServiceProvider.GetService<ICacheService>();
  for (int i = 0; i < 1000; i++)
  {
	  await _redis.PublishAsync("test_channel", $"往通道发送第{i}条消息");
  }
}

# 延迟消息

延迟消息非常适用处理一些定时任务的场景,如订单15分钟未付款,自动取消, xxx天后,自动续费...... 这里使用zset+redis锁来实现,这里的操作方式,跟发布/订阅非常类似

  • 定义发布者
Task.Run(async () =>
{
  using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope())
  {
	  //redis对象
	  var _redis = scope.ServiceProvider.GetService<ICacheService>();

	  for (int i = 0; i < 100; i++)
	  {
		  var dt = DateTime.Now.AddSeconds(3 * (i + 1));
		  //key:redis里的key,唯一
		  //msg:任务
		  //time:延时执行的时间
		  await _redis.SortedSetAddAsync("test_0625", $"延迟任务,第{i + 1}个元素,执行时间:{dt.ToString("yyyy-MM-dd HH:mm:ss")}", dt);
	  }
  }
});
  • 定义消费者
//延迟队列  SubscribeDelay
[SubscribeDelay("test_0625")]
private async Task SubRedisTest1(string msg)
{
  Console.WriteLine($"A类--->当前时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 订阅者延迟队列消息开始--->{msg}");
  //模拟任务执行耗时
  await Task.Delay(TimeSpan.FromSeconds(3));
  Console.WriteLine($"A类--->{msg} 结束<---");
}

延迟消息