Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add PublishToLocal #1

Merged
merged 1 commit into from
Aug 6, 2021
Merged

add PublishToLocal #1

merged 1 commit into from
Aug 6, 2021

Conversation

sljeff
Copy link

@sljeff sljeff commented Jul 28, 2021

@fenngwd fenngwd self-requested a review August 3, 2021 09:50
@fenngwd fenngwd self-assigned this Aug 3, 2021
Copy link

@fenngwd fenngwd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sljeff 是不是有几个consume写掉了

@fenngwd
Copy link

fenngwd commented Aug 3, 2021

@sljeff PublishToLocal 从broker上暴露出来之后,怎么在liveness和readiness 探针里面去访问呢?

@sljeff
Copy link
Author

sljeff commented Aug 4, 2021

@sljeff 是不是有几个consume写掉了

写漏了?应该只有 gcp_pubsub 的没办法实现就没写。

@sljeff PublishToLocal 从broker上暴露出来之后,怎么在liveness和readiness 探针里面去访问呢?

参考这个描述里面的 checkHealth 方法,在 goaby 或者什么地方实现一下就好 RichardKnop#686

@fenngwd
Copy link

fenngwd commented Aug 4, 2021

@sljeff

func (b *Broker) consume(deliveries <-chan []byte, concurrency int, taskProcessor iface.TaskProcessor) error {

@fenngwd
Copy link

fenngwd commented Aug 4, 2021

参考这个描述里面的 checkHealth 方法,在 goaby 或者什么地方实现一下就好 RichardKnop#686

我的意思时说,比如你在Worker里实现了收到消息健康检查的Task,

	server.RegisterTask(healthCheckTaskName, func(healthCheckUUID string) error {
		select {
		case healthCheckCompleteChan <- healthCheckUUID: // success and send uuid
			return nil
		case <-time.After(5 * time.Second):
			return fmt.Errorf("send health check result error: %v", healthCheckUUID)
		}
	})

但从liveness和readiness的探针怎么调用了?HTTP?还是别的?

@sljeff
Copy link
Author

sljeff commented Aug 4, 2021

@sljeff

func (b *Broker) consume(deliveries <-chan []byte, concurrency int, taskProcessor iface.TaskProcessor) error {

这个逻辑是对的吧,consume 会从 deliveries 拉消息;PublishToLocal 也是往同一个 deliveries 里丢消息。

参考这个描述里面的 checkHealth 方法,在 goaby 或者什么地方实现一下就好 RichardKnop#686

我的意思时说,比如你在Worker里实现了收到消息健康检查的Task,

	server.RegisterTask(healthCheckTaskName, func(healthCheckUUID string) error {
		select {
		case healthCheckCompleteChan <- healthCheckUUID: // success and send uuid
			return nil
		case <-time.After(5 * time.Second):
			return fmt.Errorf("send health check result error: %v", healthCheckUUID)
		}
	})

但从liveness和readiness的探针怎么调用了?HTTP?还是别的?

探针就是有个命令能调用 checkHealth 方法就行,用 HTTP 应该没什么必要。

@fenngwd
Copy link

fenngwd commented Aug 4, 2021

@sljeff 那换个问题,为什么amqp需要一个单独的delivery?

@fenngwd
Copy link

fenngwd commented Aug 4, 2021

@sljeff 命令可以调用吗,不是local的吗?进程间怎么通信?

@sljeff
Copy link
Author

sljeff commented Aug 4, 2021

@sljeff 那换个问题,为什么amqp需要一个单独的delivery?

应该是 amqp 的 deliveries 对象是个 <-chan Delivery(返回值是 amqp 那个库决定的),不能往里丢消息,所以就额外加了一个 channel。

@sljeff 命令可以调用吗,不是local的吗?进程间怎么通信?

确实,没考虑到这个。可能需要做成一个 http 接口或者什么其他的可以被触发的接口,总之是可以放进一个库里然后探针去触发。

@fenngwd fenngwd merged commit 45afbf2 into shanbay:master Aug 6, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants