admission controllers的特点:
- 可定制性:准入功能可针对不同的场景进行调整。
- 可预防性:审计则是为了检测问题,而准入控制器可以预防问题发生
- 可扩展性:在kubernetes自有的验证机制外,增加了另外的防线,弥补了RBAC仅能对资源提供安全保证。
下图,显示了用户操作资源的流程,可以看出 admission controllers 作用是在通过身份验证资源持久化之前起到拦截作用。在准入控制器的加入会使kubernetes增加了更高级的安全功能。
图:Kubernetes API 请求的请求处理步骤图Source:https://kubernetes.io/blog/2019/03/21/a-guide-to-kubernetes-admission-controllers/
这里找到一个大佬博客画的图,通过两张图可以很清晰的了解到admission webhook流程,与官方给出的不一样的地方在于,这里清楚地定位了kubernetes admission webhook 处于准入控制中,RBAC之后,push 之前。
图:Kubernetes API 请求的请求处理步骤图(详细)Source:https://www.armosec.io/blog/kubernetes-admission-controller/
两种控制器有什么区别?#根据官方提供的说法是
Mutating controllers may modify related objects to the requests they admit; validating controllers may not
从结构图中也可以看出,validating 是在持久化之前,而 Mutating 是在结构验证前,根据这些特性我们可以使用 Mutating 修改这个资源对象内容(如增加验证的信息),在 validating 中验证是否合法。
composition of admission controllers#kubernetes中的 admission controllers 由两部分组成:
- 内置在APIServer中的准入控制器 build-in list
- 特殊的控制器;也是内置在APIServer中,但提供一些自定义的功能MutatingAdmissionValidatingAdmission
Mutating 控制器可以修改他们处理的资源对象,Validating 控制器不会。当在任何一个阶段中的任何控制器拒绝这个了请求,则会立即拒绝整个请求,并将错误返回。
admission webhook#由于准入控制器是内置在 kube-apiserver 中的,这种情况下就限制了admission controller的可扩展性。在这种背景下,kubernetes提供了一种可扩展的准入控制器 extensible admission controllers,这种行为叫做动态准入控制 Dynamic Admission Control,而提供这个功能的就是 admission webhook 。
admission webhook 通俗来讲就是 HTTP 回调,通过定义一个http server,接收准入请求并处理。用户可以通过kubernetes提供的两种类型的 admission webhook,validating admission webhook 和 mutating admission webhook。来完成自定义的准入策略的处理。
webhook 就是
如何使用准入控制器#注:从上面的流程图也可以看出,admission webhook 也是有顺序的。首先调用mutating webhook,然后会调用validating webhook。
使用条件:kubernetes v1.16 使用 admissionregistration.k8s.io/v1 ;kubernetes v1.9 使用 admissionregistration.k8s.io/v1beta1。
如何在集群中开启准入控制器? :查看kube-apiserver 的启动参数 --enable-admission-plugins ;通过该参数来配置要启动的准入控制器,如 --enable-admission-plugins=NodeRestriction 多个准入控制器以 , 分割,顺序无关紧要。 反之使用 --disable-admission-plugins 参数可以关闭相应的准入控制器(Refer to apiserver opts)。
通过 kubectl 命令可以看到,当前kubernetes集群所支持准入控制器的版本
$ kubectl api-versions | grep admissionregistration.k8s.io/v1
admissionregistration.k8s.io/v1
admissionregistration.k8s.io/v1beta1
通过上面的学习,已经了解到了两种webhook的工作原理如下所示:
mutating webhook,会在持久化前拦截在 MutatingWebhookConfiguration 中定义的规则匹配的请求。MutatingAdmissionWebhook 通过向 mutating webhook 服务器发送准入请求来执行验证。
validaing webhook,会在持久化前拦截在 ValidatingWebhookConfiguration 中定义的规则匹配的请求。ValidatingAdmissionWebhook 通过将准入请求发送到 validating webhook server来执行验证。
那么接下来将从源码中看这个在这个工作流程中,究竟做了些什么?
资源类型#对于 1.9 版本之后,也就是 v1 版本 ,admission 被定义在 k8s.io\api\admissionregistration\v1\types.go ,大同小异,因为本地只有1.18集群,所以以这个讲解。
对于 Validating Webhook 来讲实现主要都在webhook中
type ValidatingWebhookConfiguration struct {
// 每个api必须包含下列的metadata,这个是kubernetes规范,可以在注释中的url看到相关文档
metav1.Typemeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Webhooks在这里被表示为[]ValidatingWebhook,表示我们可以注册多个
// optional
// patchMergeKey=name
// patchStrategy=merge
Webhooks []ValidatingWebhook `json:"webhooks,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,2,rep,name=Webhooks"`
}
webhook,则是对这种类型的webhook提供的操作、资源等。对于这部分不做过多的注释了,因为这里本身为kubernetes API资源,官网有很详细的例子与说明。这里更多字段的意思的可以参考官方 doc
type ValidatingWebhook struct {
// admission webhook的名词,Required
Name string `json:"name" protobuf:"bytes,1,opt,name=name"`
// ClientConfig 定义了与webhook通讯的方式 Required
ClientConfig WebhookClientConfig `json:"clientConfig" protobuf:"bytes,2,opt,name=clientConfig"`
// rule表示了webhook对于哪些资源及子资源的操作进行关注
Rules []RuleWithOperations `json:"rules,omitempty" protobuf:"bytes,3,rep,name=rules"`
// FailurePolicy 对于无法识别的value将如何处理,allowed/Ignore optional
FailurePolicy *FailurePolicyType `json:"failurePolicy,omitempty" protobuf:"bytes,4,opt,name=failurePolicy,casttype=FailurePolicyType"`
// matchPolicy 定义了如何使用“rules”列表来匹配传入的请求。
MatchPolicy *MatchPolicyType `json:"matchPolicy,omitempty" protobuf:"bytes,9,opt,name=matchPolicy,casttype=MatchPolicyType"`
NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty" protobuf:"bytes,5,opt,name=namespaceSelector"`
SideEffects *SideEffectClass `json:"sideEffects" protobuf:"bytes,6,opt,name=sideEffects,casttype=SideEffectClass"`
AdmissionReviewVersions []string `json:"admissionReviewVersions" protobuf:"bytes,8,rep,name=admissionReviewVersions"`
}
到这里了解了一个webhook资源的定义,那么这个如何使用呢?通过 Find Usages 找到一个 k8s.io/apiserver/pkg/admission/plugin/webhook/accessors.go 在使用它。这里没有注释,但在结构上可以看出,包含客户端与一系列选择器组成
type mutatingWebhookAccessor struct {
*v1.MutatingWebhook
uid string
configurationName string
initObjectSelector sync.Once
objectSelector labels.Selector
objectSelectorErr error
initNamespaceSelector sync.Once
namespaceSelector labels.Selector
namespaceSelectorErr error
initClient sync.Once
client *rest.RESTClient
clientErr error
}
accessor 因为包含了整个webhookconfig定义的一些动作(这里个人这么觉得)。
accessor.go 下面 有一个 GetRESTClient 方法 ,通过这里可以看出,这里做的就是使用根据 accessor 构造一个客户端。
func (m *mutatingWebhookAccessor) GetRESTClient(clientManager *webhookutil.ClientManager) (*rest.RESTClient, error) {
m.initClient.Do(func() {
m.client, m.clientErr = clientManager.HookClient(hookClientConfigForWebhook(m))
})
return m.client, m.clientErr
}
到这步骤已经没必要往下看了,因已经知道这里是请求webhook前的步骤了,下面就是何时请求了。
k8s.io\apiserver\pkg\admission\plugin\webhook\validating\dispatcher.go 下面有两个方法,Dispatch去请求我们自己定义的webhook
func (d *validatingDispatcher) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces, hooks []webhook.WebhookAccessor) error {
var relevantHooks []*generic.WebhookInvocation
// Construct all the versions we need to call our webhooks
versionedAttrs := map[schema.GroupVersionKind]*generic.VersionedAttributes{}
for _, hook := range hooks {
invocation, statusError := d.plugin.ShouldCallhook(hook, attr, o)
if statusError != nil {
return statusError
}
if invocation == nil {
continue
}
relevantHooks = append(relevantHooks, invocation)
// if we already have this version, continue
if _, ok := versionedAttrs[invocation.Kind]; ok {
continue
}
versionedAttr, err := generic.NewVersionedAttributes(attr, invocation.Kind, o)
if err != nil {
return apierrors.NewInternalError(err)
}
versionedAttrs[invocation.Kind] = versionedAttr
}
if len(relevantHooks) == 0 {
// no matching hooks
return nil
}
// Check if the request has already timed out before spawning remote calls
select {
case <-ctx.Done():
// parent context is canceled or timed out, no point in continuing
return apierrors.NewTimeouterror("request did not complete within requested timeout", 0)
default:
}
wg := sync.WaitGroup{}
errCh := make(chan error, len(relevantHooks))
wg.Add(len(relevantHooks))
// 循环所有相关的注册的hook
for i := range relevantHooks {
go func(invocation *generic.WebhookInvocation) {
defer wg.Done()
// invacation 中有一个 Accessor,Accessor注册了一个相关的webhookconfig
// 也就是我们 kubectl -f 注册进来的那个webhook的相关配置
hook, ok := invocation.Webhook.GetValidatingWebhook()
if !ok {
utilruntime.HandleError(fmt.Errorf("validating webhook dispatch requires v1.ValidatingWebhook, but got %T", hook))
return
}
versionedAttr := versionedAttrs[invocation.Kind]
t := time.Now()
// 调用了callHook去请求我们自定义的webhook
err := d.callHook(ctx, hook, invocation, versionedAttr)
ignoreClientCallFailures := hook.FailurePolicy != nil && *hook.FailurePolicy == v1.Ignore
rejected := false
if err != nil {
switch err := err.(type) {
case *webhookutil.ErrCallingWebhook:
if !ignoreClientCallFailures {
rejected = true
admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionCallingWebhookError, 0)
}
case *webhookutil.ErrWebhookRejection:
rejected = true
admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionNoError, int(err.Status.ErrStatus.Code))
default:
rejected = true
admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionAPIServerInternalError, 0)
}
}
admissionmetrics.Metrics.ObserveWebhook(time.Since(t), rejected, versionedAttr.Attributes, "validating", hook.Name)
if err == nil {
return
}
if callErr, ok := err.(*webhookutil.ErrCallingWebhook); ok {
if ignoreClientCallFailures {
klog.Warningf("Failed calling webhook, failing open %v: %v", hook.Name, callErr)
utilruntime.HandleError(callErr)
return
}
klog.Warningf("Failed calling webhook, failing closed %v: %v", hook.Name, err)
errCh <- apierrors.NewInternalError(err)
return
}
if rejectionErr, ok := err.(*webhookutil.ErrWebhookRejection); ok {
err = rejectionErr.Status
}
klog.Warningf("rejected by webhook %q: %#v", hook.Name, err)
errCh <- err
}(relevantHooks[i])
}
wg.Wait()
close(errCh)
var errs []error
for e := range errCh {
errs = append(errs, e)
}
if len(errs) == 0 {
return nil
}
if len(errs) > 1 {
for i := 1; i < len(errs); i {
// TODO: merge status errors; until then, just return the first one.
utilruntime.HandleError(errs[i])
}
}
return errs[0]
}
折叠
callHook 可以理解为真正去请求我们自定义的webhook服务的动作
func (d *validatingDispatcher) callHook(ctx context.Context, h *v1.ValidatingWebhook, invocation *generic.WebhookInvocation, attr *generic.VersionedAttributes) error {
if attr.Attributes.IsDryRun() {
if h.SideEffects == nil {
return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: fmt.Errorf("Webhook SideEffects is nil")}
}
if !(*h.SideEffects == v1.SideEffectClassNone || *h.SideEffects == v1.SideEffectClassNoneOnDryRun) {
return webhookerrors.NewDryRunUnsupportedErr(h.Name)
}
}
uid, request, response, err := webhookrequest.CreateAdmissionObjects(attr, invocation)
if err != nil {
return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
}
// 发生请求,可以看到,这里从上面的讲到的地方获取了一个客户端
client, err := invocation.Webhook.GetRESTClient(d.cm)
if err != nil {
return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
}
trace := utiltrace.New("Call validating webhook",
utiltrace.Field{"configuration", invocation.Webhook.GetConfigurationName()},
utiltrace.Field{"webhook", h.Name},
utiltrace.Field{"resource", attr.GetResource()},
utiltrace.Field{"subresource", attr.GetSubresource()},
utiltrace.Field{"operation", attr.GetOperation()},
utiltrace.Field{"UID", uid})
defer trace.LogIfLong(500 * time.Millisecond)
// 这里设置超时,超时时长就是在yaml资源清单中设置的那个值
if h.TimeoutSeconds != nil {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(*h.TimeoutSeconds)*time.Second)
defer cancel()
}
// 直接用post请求我们自己定义的webhook接口
r := client.Post().Body(request)
// if the context has a deadline, set it as a parameter to inform the backend
if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
// compute the timeout
if timeout := time.Until(deadline); timeout > 0 {
// if it's not an even number of seconds, round up to the nearest second
if truncated := timeout.Truncate(time.Second); truncated != timeout {
timeout = truncated time.Second
}
// set the timeout
r.Timeout(timeout)
}
}
if err := r.Do(ctx).Into(response); err != nil {
return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
}
trace.Step("Request completed")
result, err := webhookrequest.VerifyAdmissionResponse(uid, false, response)
if err != nil {
return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
}
for k, v := range result.AuditAnnotations {
key := h.Name "/" k
if err := attr.Attributes.AddAnnotation(key, v); err != nil {
klog.Warningf("Failed to set admission audit annotation %s to %s for validating webhook %s: %v", key, v, h.Name, err)
}
}
if result.Allowed {
return nil
}
return &webhookutil.ErrWebhookRejection{Status: webhookerrors.ToStatusErr(h.Name, result.Result)}
}
折叠
走到这里基本上对 admission webhook 有了大致的了解,可以知道这个操作是由 apiserver 完成的。下面就实际操作下自定义一个webhook。
这里还有两个概念,就是请求参数 AdmissionRequest 和相应参数 AdmissionResponse,这些可以在 callHook 中看到,这两个参数被定义在 k8s.io\api\admission\v1\types.go ;这两个参数也就是我们在自定义 webhook 时需要处理接收到的body的结构,以及我们响应内容数据结构。
如何编写一个自定义的admission webhook#通过上面的学习了解到了,自定义的webhook就是做为kubernetes提供给用户两种admission controller来验证自定义业务的一个中间件 admission webhook。本质上他是一个HTTP Server,用户可以使用任何语言来完成这部分功能。当然,如果涉及到需要对kubernetes集群资源操作的话,还是建议使用kubernetes官方提供了SDK的编程语言来完成自定义的webhook。
那么完成一个自定义admission webhook需要两个步骤:
- 将相关的webhook config注册给kubernetes,也就是让kubernetes知道你的webhook
- 准备一个http server来处理 apiserver发过来验证的信息
向kubernetes注册webhook对象#注:这里使用go net/http包,本身不区分方法处理HTTP的何种请求,如果用其他框架实现的,如django,需要指定对应方法需要为POST
kubernetes提供的两种类型可自定义的准入控制器,和其他资源一样,可以利用资源清单,动态配置那些资源要被adminssion webhook处理。 kubernetes将这种形式抽象为两种资源:
- ValidatingWebhookConfiguration
- MutatingWebhookConfiguration
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: "pod-policy.example.com"
webhooks:
- name: "pod-policy.example.com"
rules:
- apiGroups: [""] # 拦截资源的Group "" 表示 core。"*" 表示所有。
apiVersions: ["v1"] # 拦截资源的版本
operations: ["CREATE"] # 什么请求下拦截
resources: ["pods"] # 拦截什么资源
scope: "Namespaced" # 生效的范围,cluster还是namespace "*"表示没有范围限制。
clientConfig: # 我们部署的webhook服务,
service: # service是在cluster-in模式下
namespace: "example-namespace"
name: "example-service"
port: 443 # 服务的端口
path: "/validate" # path是对应用于验证的接口
# caBundle是提供给 admission webhook CA证书
caBundle: "Ci0tLS0tQk...<base64-encoded PEM bundle containing the CA that signed the webhook's serving certificate>...tLS0K"
admissionReviewVersions: ["v1", "v1beta1"]
sideEffects: None
timeoutSeconds: 5 # 1-30s直接,表示请求api的超时时间
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: "valipod-policy.example.com"
webhooks:
- name: "valipod-policy.example.com"
rules:
- apiGroups: ["apps"] # 拦截资源的Group "" 表示 core。"*" 表示所有。
apiVersions: ["v1"] # 拦截资源的版本
operations: ["CREATE"] # 什么请求下拦截
resources: ["deployments"] # 拦截什么资源
scope: "Namespaced" # 生效的范围,cluster还是namespace "*"表示没有范围限制。
clientConfig: # 我们部署的webhook服务,
url: "https://10.0.0.1:81/validate" # 这里是外部模式
# service: # service是在cluster-in模式下
# namespace: "default"
# name: "admission-webhook"
# port: 81 # 服务的端口
# path: "/mutate" # path是对应用于验证的接口
# caBundle是提供给 admission webhook CA证书
caBundle: "Ci0tLS0tQk...<base64-encoded PEM bundle containing the CA that signed the webhook's serving certificate>...tLS0K"
admissionReviewVersions: ["v1"]
sideEffects: None
timeoutSeconds: 5 # 1-30s直接,表示请求api的超时时间
注:对于webhook,也可以引入外部的服务,并非必须部署到集群内部
对于外部服务来讲,需要 clientConfig 中的 service , 更换为 url ; 通过 url 参数可以将一个外部的服务引入
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
...
webhooks:
- name: my-webhook.example.com
clientConfig:
url: "https://my-webhook.example.com:9443/my-webhook-path"
...
注:这里的url规则必须准守下列形式:
scheme://host:port/path使用了url 时,这里不应填写集群内的服务scheme 必须是 https,不能为http,这就意味着,引入外部时也需要配置时使用了,?xx=xx 的参数也是不被允许的(官方说法是这样的,通过源码学习了解到因为会发送特定的请求体,所以无需管参数)
更多的配置可以参考kubernetes官方提供的 doc
准备一个webhook#让我们编写我们的 webhook server。将创建两个钩子,/mutate 与 /validate;
- /mutate 将在创建deployment资源时,基于版本,给资源加上注释 webhook.example.com/allow: true
- /validate 将对 /mutate 增加的 allow:true 那么则继续,否则拒绝。
这里为了方便,全部写在一起了,实际上不符合软件的设计。在kubernetes代码库中也提供了一个webhook server,可以参考他这个webhook server来学习具体要做什么
package main
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
v1admission "k8s.io/api/admission/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
appv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
)
type patch struct {
Op string `json:"op"`
Path string `json:"path"`
Value map[string]string `json:"value"`
}
func serve(w http.ResponseWriter, r *http.Request) {
var body []byte
if data, err := ioutil.ReadAll(r.Body); err == nil {
body = data
}
klog.Infof(fmt.Sprintf("receive request: %v....", string(body)[:130]))
if len(body) == 0 {
klog.Error(fmt.Sprintf("admission request body is empty"))
http.Error(w, fmt.Errorf("admission request body is empty").Error(), http.StatusBadRequest)
return
}
var admission v1admission.AdmissionReview
codefc := serializer.NewCodecFactory(runtime.NewScheme())
decoder := codefc.UniversalDeserializer()
_, _, err := decoder.Decode(body, nil, &admission)
if err != nil {
msg := fmt.Sprintf("Request could not be decoded: %v", err)
klog.Error(msg)
http.Error(w, msg, http.StatusBadRequest)
return
}
if admission.Request == nil {
klog.Error(fmt.Sprintf("admission review can't be used: Request field is nil"))
http.Error(w, fmt.Errorf("admission review can't be used: Request field is nil").Error(), http.StatusBadRequest)
return
}
switch strings.Split(r.RequestURI, "?")[0] {
case "/mutate":
req := admission.Request
var admissionResp v1admission.AdmissionReview
admissionResp.APIVersion = admission.APIVersion
admissionResp.Kind = admission.Kind
klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v Operation=%v",
req.Kind.Kind, req.Namespace, req.Name, req.UID, req.Operation)
switch req.Kind.Kind {
case "Deployment":
var (
respstr []byte
err error
deploy appv1.Deployment
)
if err = json.Unmarshal(req.Object.Raw, &deploy); err != nil {
respStructure := v1admission.AdmissionResponse{Result: &metav1.Status{
Message: fmt.Sprintf("could not unmarshal resouces review request: %v", err),
Code: http.StatusInternalServerError,
}}
klog.Error(fmt.Sprintf("could not unmarshal resouces review request: %v", err))
if respstr, err = json.Marshal(respStructure); err != nil {
klog.Error(fmt.Errorf("could not unmarshal resouces review response: %v", err))
http.Error(w, fmt.Errorf("could not unmarshal resouces review response: %v", err).Error(), http.StatusInternalServerError)
return
}
http.Error(w, string(respstr), http.StatusBadRequest)
return
}
current_annotations := deploy.GetAnnotations()
pl := []patch{}
for k, v := range current_annotations {
pl = append(pl, patch{
Op: "add",
Path: "/metadata/annotations",
Value: map[string]string{
k: v,
},
})
}
pl = append(pl, patch{
Op: "add",
Path: "/metadata/annotations",
Value: map[string]string{
deploy.Name "/Allow": "true",
},
})
annotationbyte, err := json.Marshal(pl)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
respStructure := &v1admission.AdmissionResponse{
UID: req.UID,
Allowed: true,
Patch: annotationbyte,
PatchType: func() *v1admission.PatchType {
t := v1admission.PatchTypeJSONPatch
return &t
}(),
Result: &metav1.Status{
Message: fmt.Sprintf("could not unmarshal resouces review request: %v", err),
Code: http.StatusOK,
},
}
admissionResp.Response = respStructure
klog.Infof("sending response: %s....", admissionResp.Response.String()[:130])
respByte, err := json.Marshal(admissionResp)
if err != nil {
klog.Errorf("Can't encode response messages: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
klog.Infof("prepare to write response...")
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(respByte); err != nil {
klog.Errorf("Can't write response: %v", err)
http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
}
default:
klog.Error(fmt.Sprintf("unsupport resouces review request type"))
http.Error(w, "unsupport resouces review request type", http.StatusBadRequest)
}
case "/validate":
req := admission.Request
var admissionResp v1admission.AdmissionReview
admissionResp.APIVersion = admission.APIVersion
admissionResp.Kind = admission.Kind
klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v Operation=%v",
req.Kind.Kind, req.Namespace, req.Name, req.UID, req.Operation)
var (
deploy appv1.Deployment
respstr []byte
)
switch req.Kind.Kind {
case "Deployment":
if err = json.Unmarshal(req.Object.Raw, &deploy); err != nil {
respStructure := v1admission.AdmissionResponse{Result: &metav1.Status{
Message: fmt.Sprintf("could not unmarshal resouces review request: %v", err),
Code: http.StatusInternalServerError,
}}
klog.Error(fmt.Sprintf("could not unmarshal resouces review request: %v", err))
if respstr, err = json.Marshal(respStructure); err != nil {
klog.Error(fmt.Errorf("could not unmarshal resouces review response: %v", err))
http.Error(w, fmt.Errorf("could not unmarshal resouces review response: %v", err).Error(), http.StatusInternalServerError)
return
}
http.Error(w, string(respstr), http.StatusBadRequest)
return
}
}
al := deploy.GetAnnotations()
respStructure := v1admission.AdmissionResponse{
UID: req.UID,
}
if al[fmt.Sprintf("%s/Allow", deploy.Name)] == "true" {
respStructure.Allowed = true
respStructure.Result = &metav1.Status{
Code: http.StatusOK,
}
} else {
respStructure.Allowed = false
respStructure.Result = &metav1.Status{
Code: http.StatusForbidden,
Reason: func() metav1.StatusReason {
return metav1.StatusReasonForbidden
}(),
Message: fmt.Sprintf("the resource %s couldn't to allow entry.", deploy.Kind),
}
}
admissionResp.Response = &respStructure
klog.Infof("sending response: %s....", admissionResp.Response.String()[:130])
respByte, err := json.Marshal(admissionResp)
if err != nil {
klog.Errorf("Can't encode response messages: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
klog.Infof("prepare to write response...")
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(respByte); err != nil {
klog.Errorf("Can't write response: %v", err)
http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
}
}
}
func main() {
var (
cert, key string
)
if cert = os.Getenv("TLS_CERT"); len(cert) == 0 {
cert = "./tls/tls.crt"
}
if key = os.Getenv("TLS_KEY"); len(key) == 0 {
key = "./tls/tls.key"
}
ca, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
klog.Error(err.Error())
return
}
server := &http.Server{
Addr: ":81",
TLSConfig: &tls.Config{
Certificates: []tls.Certificate{
ca,
},
},
}
httpserver := http.NewServeMux()
httpserver.HandleFunc("/validate", serve)
httpserver.HandleFunc("/mutate", serve)
httpserver.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
klog.Info(fmt.Sprintf("%s %s", r.RequestURI, "pong"))
fmt.Fprint(w, "pong")
})
server.Handler = httpserver
go func() {
if err := server.ListenAndServeTLS("", ""); err != nil {
klog.Errorf("Failed to listen and serve webhook server: %v", err)
}
}()
klog.Info("starting serve.")
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
<-signalChan
klog.Infof("Got shut signal, shutting...")
if err := server.Shutdown(context.Background()); err != nil {
klog.Errorf("HTTP server Shutdown: %v", err)
}
}
折叠
对应的Dockerfile
FROM golang:alpine AS builder
MAINTAINER cylon
WORKDIR /admission
COPY ./ /admission
ENV GOPROXY https://goproxy.cn,direct
RUN \
sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories && \
apk add upx && \
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -ldflags "-s -w" -o webhook main.go && \
upx -1 webhook && \
chmod x webhook
FROM alpine AS runner
WORKDIR /go/admission
COPY --from=builder /admission/webhook .
VOLUME ["/admission"]
集群内部部署所需的资源清单
apiVersion: v1
kind: Service
metadata:
name: admission-webhook
labels:
app: admission-webhook
spec:
ports:
- port: 81
targetPort: 81
selector:
app: simple-webhook
---
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: simple-webhook
name: simple-webhook
spec:
replicas: 1
selector:
matchLabels:
app: simple-webhook
template:
metadata:
labels:
app: simple-webhook
spec:
containers:
- image: cylonchau/simple-webhook:v0.0.2
imagePullPolicy: IfNotPresent
name: webhook
command: ["./webhook"]
env:
- name: "TLS_CERT"
value: "./tls/tls.crt"
- name: "TLS_KEY"
value: "./tls/tls.key"
- name: NS_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
ports:
- containerPort: 81
volumeMounts:
- name: tlsdir
mountPath: /go/admission/tls
readOnly: true
volumes:
- name: tlsdir
secret:
secretName: webhook
---
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
name: "pod-policy.example.com"
webhooks:
- name: "pod-policy.example.com"
rules:
- apiGroups: ["apps"] # 拦截资源的Group "" 表示 core。"*" 表示所有。
apiVersions: ["v1"] # 拦截资源的版本
operations: ["CREATE"] # 什么请求下拦截
resources: ["deployments"] # 拦截什么资源
scope: "Namespaced" # 生效的范围,cluster还是namespace "*"表示没有范围限制。
clientConfig: # 我们部署的webhook服务,
url: "https://10.0.0.1:81/mutate"
# service: # service是在cluster-in模式下
# namespace: "default"
# name: "admission-webhook"
# port: 81 # 服务的端口
# path: "/mutate" # path是对应用于验证的接口
# caBundle是提供给 admission webhook CA证书
caBundle: Put you CA (base64 encode) in here
admissionReviewVersions: ["v1"]
sideEffects: None
timeoutSeconds: 5 # 1-30s直接,表示请求api的超时时间
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: "valipod-policy.example.com"
webhooks:
- name: "valipod-policy.example.com"
rules:
- apiGroups: ["apps"] # 拦截资源的Group "" 表示 core。"*" 表示所有。
apiVersions: ["v1"] # 拦截资源的版本
operations: ["CREATE"] # 什么请求下拦截
resources: ["deployments"] # 拦截什么资源
scope: "Namespaced" # 生效的范围,cluster还是namespace "*"表示没有范围限制。
clientConfig: # 我们部署的webhook服务,
# service: # service是在cluster-in模式下
# namespace: "default"
# name: "admission-webhook"
# port: 81 # 服务的端口
# path: "/mutate" # path是对应用于验证的接口
# caBundle是提供给 admission webhook CA证书
caBundle: Put you CA (base64 encode) in here
admissionReviewVersions: ["v1"]
sideEffects: None
timeoutSeconds: 5 # 1-30s直接,表示请求api的超时时间
折叠
证书问题
如果需要 cluster-in ,那么则需要对对应webhookconfig资源配置 service ;如果使用的是外部部署,则需要配置对应访问地址,如:"https://xxxx:port/method"
这两种方式的证书均需要对应的 subjectAltName ,cluster-in 模式 需要对应service名称,如,至少包含serviceName.NS.svc 这一个域名。
下面就是证书类问题的错误
Failed calling webhook, failing closed pod-policy.example.com: failed calling webhook "pod-policy.example.com": Post https://admission-webhook.default.svc:81/mutate?timeout=5s: x509: certificate signed by unknown authority (possibly because of "crypto/rsa: verification error" while trying to verify candidate authority certificate "admission-webhook-ca")
相应信息问题
上面我们了解到的APIServer是去发出 v1admission.AdmissionReview 也就是 Request 和 Response类型的,所以,为了更清晰的表示出问题所在,需要对响应格式中的 Reason 与 Message 配置,这也就是我们在客户端看到的报错信息。
&metav1.Status{
Code: http.StatusForbidden,
Reason: func() metav1.StatusReason {
return metav1.StatusReasonForbidden
}(),
Message: fmt.Sprintf("the resource %s couldn't to allow entry.", deploy.Kind),
}
通过上面的设置用户可以看到下列错误
$ kubectl apply -f nginx.yaml
Error from server (Forbidden): error when creating "nginx.yaml": admission webhook "valipod-policy.example.com" denied the request: the resource Deployment couldn't to allow entry.
注:必须的参数还包含,UID,allowed,这两个是必须的,上面阐述的只是对用户友好的提示信息
下面的报错就是对相应格式设置错误
Error from server (InternalError): error when creating "nginx.yaml": Internal error occurred: failed calling webhook "pod-policy.example.com": the server rejected our request for an unknown reason
相应信息版本问题
相应信息也需要指定一个版本,这个与请求来的结构中拿即可
admissionResp.APIVersion = admission.APIVersion
admissionResp.Kind = admission.Kind
下面是没有为对应相应信息配置对应KV的值出现的报错
Error from server (InternalError): error when creating "nginx.yaml": Internal error occurred: failed calling webhook "pod-policy.example.com": expected webhook response of admission.k8s.io/v1, Kind=AdmissionReview, got /, Kind=
关于patch
kubernetes中patch使用的是特定的规范,如 jsonpatch
kubernetes当前唯一支持的 patchType 是 JSONPatch。 有关更多详细信息,请参见 JSON patch
对于 jsonpatch 是一个固定的类型,在go中必须定义其结构体
{ "op": "add", // 做什么操作 "path": "/spec/replicas", // 操作的路径 "value": 3 // 对应添加的key value }
下面就是字符串类型设置为布尔型产生的报错
Error from server (InternalError): error when creating "nginx.yaml": Internal error occurred: v1.Deployment.ObjectMeta: v1.ObjectMeta.Annotations: ReadString: expects " or n, but found t, error found in #10 byte of ...|t/Allow":true},"crea|..., bigger context ...|tadata":{"annotations":{"nginx-deployment/Allow":true},"creationTimestamp":null,"managedFields":[{"m|..
Ubuntu
touch ./demoCAindex.txt
touch ./demoCA/serial
touch ./demoCA/crlnumber
echo 01 > ./demoCA/serial
mkdir ./demoCA/newcerts
openssl genrsa -out cakey.pem 2048
openssl req -new \
-x509 \
-key cakey.pem \
-out cacert.pem \
-days 3650 \
-subj "/CN=admission webhook ca"
openssl genrsa -out tls.key 2048
openssl req -new \
-key tls.key \
-subj "/CN=admission webhook client" \
-reqexts webhook \
-config <(cat /etc/ssl/openssl.cnf \
<(printf "[webhook]\nsubjectAltName=DNS: admission-webhook, DNS: admission-webhook.default.svc, DNS: admission-webhook.default.svc.cluster.local, IP:10.0.0.1, IP:10.0.0.4")) \
-out tls.csr
sed -i 's/= match/= optional/g' /etc/ssl/openssl.cnf
openssl ca \
-in tls.csr \
-cert cacert.pem \
-keyfile cakey.pem \
-out tls.crt \
-days 300 \
-extensions webhook \
-extfile <(cat /etc/ssl/openssl.cnf \
<(printf "[webhook]\nsubjectAltName=DNS: admission-webhook, DNS: admission-webhook.default.svc, DNS: admission-webhook.default.svc.cluster.local, IP:10.0.0.1, IP:10.0.0.4"))
CentOS
touch /etc/pki/CA/index.txt
touch /etc/pki/CA/serial # 下一个要颁发的编号 16进制
touch /etc/pki/CA/crlnumber
echo 01 > /etc/pki/CA/serial
openssl req -new \
-x509 \
-key cakey.pem \
-out cacert.pem \
-days 3650 \
-subj "/CN=admission webhook ca"
openssl genrsa -out tls.key 2048
openssl req -new \
-key tls.key \
-subj "/CN=admission webhook client" \
-reqexts webhook \
-config <(cat /etc/pki/tls/openssl.cnf \
<(printf "[webhook]\nsubjectAltName=DNS: admission-webhook, DNS: admission-webhook.default.svc, DNS: admission-webhook.default.svc.cluster.local, IP:10.0.0.1, IP:10.0.0.4")) \
-out tls.csr
sed -i 's/= match/= optional/g' /etc/ssl/openssl.cnf
openssl ca \
-in tls.csr \
-cert cacert.pem \
-keyfile cakey.pem \
-out tls.crt \
-days 300 \
-extensions webhook \
-extfile <(cat /etc/pki/tls/openssl.cnf \
<(printf "[webhook]\nsubjectAltName=DNS: admission-webhook, DNS: admission-webhook.default.svc, DNS: admission-webhook.default.svc.cluster.local, IP:10.0.0.1, IP:10.0.0.4"))
可以看到我们自己注入的 annotation nginx-deployment/Allow: true,在该示例中,仅为演示过程,而不是真的策略,实际环境中可以根据情况进行定制自己的策略。
结果可以看出,当在 mutating 中不通过,即缺少对应的 annotation 标签 , 则 validating 会不允许准入
$ kubectl describe deploy nginx-deployment
Name: nginx-deployment
Namespace: default
CreationTimestamp: Mon, 11 Jul 2022 20:25:16 0800
Labels: <none>
Annotations: deployment.kubernetes.io/revision: 1
nginx-deployment/Allow: true
Selector: app=nginx
Replicas: 1 desired | 1 updated | 1 total | 1 available | 0 unavailable
StrategyType: RollingUpdate
MinReadySeconds: 0
RollingUpdateStrategy: 25% max unavailable, 25% max surge
Pod Template:
Labels: app=nginx
Containers:
nginx:
Image: nginx:1.14.2
文章来自https://www.cnblogs.com/Cylon/p/16467991.html
,